Fill in the blanks
Mastering PySpark Data Processing
Drills to master data processing in PySpark

1. Scenario 10: Aggregating Elements in an Array

Problem: Compute the average of numeric values stored in an array column.

Solution:

from pyspark.sql.functions import explode, avg

df = df.withColumn("exploded", explode(df["array_column"]))
aggregated_df = df.groupBy("id").agg(avg("exploded").alias("average_value"))

Explanation: explode() transforms each element of an array into a separate row, replicating all other column values. The exploded values are then grouped and averaged. This technique is useful for performing aggregations on data stored in arrays.
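
As a quick illustration, the sketch below builds a tiny hypothetical DataFrame (the sample data and the SparkSession variable spark are assumptions) and runs the same explode-and-average pipeline:

from pyspark.sql.functions import avg, explode

# Hypothetical sample data: two ids, each with a small array of numbers
sample_df = spark.createDataFrame([(1, [10, 20, 30]), (2, [5, 15])], ["id", "array_column"])
exploded_df = sample_df.withColumn("exploded", explode(sample_df["array_column"]))
exploded_df.groupBy("id").agg(avg("exploded").alias("average_value")).show()
# Expected result: id 1 -> 20.0, id 2 -> 10.0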

2. Scenario 19: Performing an Efficient Join (Join Operations and Optimizations)

Problem: You need to join two large DataFrames on a common key but want to avoid performance issues like data skew or excessive shuffling.

Solution:

from pyspark.sql.functions import broadcast

# Assuming df_small is the smaller DataFrame and df_large is the larger one
joined_df = df_large.join(broadcast(df_small), df_large["key"] == df_small["key"])

Explanation: The broadcast() function is used in joins where one DataFrame is significantly smaller than the other. It broadcasts the smaller DataFrame to all nodes in the cluster, reducing the need to shuffle the larger DataFrame across the network and speeding up the join.
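
As a hedged follow-up, you can check that the broadcast actually happens and, if needed, adjust Spark's automatic broadcast threshold; the 10 MB value below is only an illustrative assumption:

# Inspect the physical plan; a successful broadcast shows a BroadcastHashJoin node
joined_df.explain()

# Spark also auto-broadcasts relations smaller than this threshold (in bytes); -1 disables it
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))  # assumed value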

3. Scenario 13: Aggregating Data Using GROUP BY in SQL

Problem: You need to calculate the average sales by department from a sales DataFrame using SQL.

Solution:

df.createOrReplaceTempView("sales")
avg_sales_df = spark.sql("SELECT department, AVG(sales) AS average_sales FROM sales GROUP BY department")

Explanation: SQL's GROUP BY clause is used here to aggregate sales data by department, calculating the average sales per department. This approach leverages SQL's powerful aggregation capabilities, making the query easy to understand and maintain.
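
For comparison, the same aggregation can be written with the DataFrame API (a sketch equivalent to the SQL above, assuming the same df):

from pyspark.sql import functions as F

avg_sales_df = df.groupBy("department").agg(F.avg("sales").alias("average_sales"))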

4. Scenario 12: Joining DataFrames Using SQL (Using SQL Queries in PySpark)

Problem: You have two DataFrames and need to perform an inner join on them using SQL.

Solution:

df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")
joined_df = spark.sql("SELECT * FROM table1 INNER JOIN table2 ON table1.id = table2.id")

Explanation: Temp views allow you to run SQL queries on DataFrame objects. After creating views for each DataFrame, you can perform joins and other SQL operations just as you would in a database query environment, leveraging SQL's expressive and familiar syntax.
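
One practical caveat: SELECT * keeps the id column from both tables, which can later trigger ambiguous-column errors. A sketch that qualifies the projection instead (t2.value_column is a hypothetical column name):

joined_df = spark.sql("""
    SELECT t1.*, t2.value_column
    FROM table1 t1
    INNER JOIN table2 t2 ON t1.id = t2.id
""")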

5. Scenario 21: Optimizing Joins on Large Datasets

Problem: You need to join two large DataFrames and ensure the operation is as efficient as possible.

Solution:

# Ensure both DataFrames are partitioned on the join key
df1 = df1.repartition("join_key")
df2 = df2.repartition("join_key")

# Perform the join
optimized_join_df = df1.join(df2, "join_key")

Explanation: Repartitioning both DataFrames on the join key before performing the join colocates rows with the same key on the same node, reducing the data that needs to be shuffled across the cluster when the join runs. This significantly improves join efficiency and scalability.
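
If you also want to control how many partitions are used, repartition() accepts an explicit partition count; the number below is an assumption to tune for your cluster and data volume:

num_partitions = 200  # assumed value; tune to data size and cluster resources
df1 = df1.repartition(num_partitions, "join_key")
df2 = df2.repartition(num_partitions, "join_key")
optimized_join_df = df1.join(df2, "join_key")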

6. Scenario 15: Filtering Out Null Values (Handling Nulls and Dirty Data)

Problem: You need to exclude rows where any key column contains a null value to maintain data integrity for analysis.

Solution:

clean_df = df.filter(df["key_column"].isNotNull())

Explanation: The .isNotNull() method checks for non-null values in a DataFrame column. Used inside filter(), it removes rows containing nulls in critical columns, ensuring the robustness of subsequent data processing.
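
The same filter extends to several key columns by combining conditions with &; the second column name here is an assumption:

from pyspark.sql.functions import col

clean_df = df.filter(col("key_column").isNotNull() & col("other_key_column").isNotNull())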

7. Scenario 14: Filtering and Sorting Data in SQL

Problem: Filter the data to include only entries from the last year and then sort these entries by date in descending order.

Solution:

df.createOrReplaceTempView("events")
filtered_sorted_df = spark.sql("""
    SELECT *
    FROM events
    WHERE event_date >= date_sub(current_date(), 365)
    ORDER BY event_date DESC
""")

Explanation: The SQL query filters events from the last 365 days using date_sub(current_date(), 365) and sorts them in descending order by event_date. This shows how to integrate date calculations and sorting in SQL, which is useful for time-series analyses.
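
The equivalent DataFrame API version, as a sketch using the same column names:

from pyspark.sql.functions import col, current_date, date_sub

filtered_sorted_df = (
    df.filter(col("event_date") >= date_sub(current_date(), 365))
      .orderBy(col("event_date").desc())
)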

8. Scenario 22: Using Semi-Joins to Filter Data

Problem: You want to filter rows in one DataFrame based on the presence of keys in another DataFrame, without needing the columns from the second DataFrame.

Solution:

# Use a semi join to filter df1 based on the presence of keys in df2
semi_joined_df = df1.join(df2, df1["key"] == df2["key"], "left_semi")

Explanation: A left_semi join keeps all rows from the first DataFrame that have a match in the second DataFrame, effectively filtering the first DataFrame. This is useful when you only need to check for the existence of a key and do not require data from the second DataFrame.
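
The complementary pattern is a left_anti join, which keeps only the rows of df1 whose key does not appear in df2:

anti_joined_df = df1.join(df2, df1["key"] == df2["key"], "left_anti")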

9. Scenario 11: Creating Temporary Views (Using SQL Queries in PySpark)

Problem: You need to perform several complex SQL operations on a DataFrame and prefer to use SQL syntax.

Solution:

df.createOrReplaceTempView("temp_view")
result_df = spark.sql("SELECT * FROM temp_view WHERE age > 30")

Explanation: By creating a temporary view with createOrReplaceTempView("temp_view"), you can execute SQL queries directly on the data stored in the DataFrame. This is particularly useful for users familiar with SQL, allowing complex queries without extensive DataFrame manipulation code.

10. Scenario 18: Handling Missing or Incomplete Data

Problem: You discover that some entries in your DataFrame are missing values in multiple columns that are required for a specific analysis.

Solution:

# Define columns that must not be null for your analysis
required_columns = ["col1", "col2", "col3"]

# Filter out rows where any of the required columns are null
complete_data_df = df.dropna(subset=required_columns)

Explanation: dropna() removes rows with null values in the specified columns. Setting subset=required_columns ensures that only rows with complete data in the required fields are retained, which is essential for analyses that need complete records to produce valid results.
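
A looser variant, in case partially complete rows are still useful: dropna() also accepts a thresh argument giving the minimum number of non-null values a row must have among the listed columns (the threshold of 2 is an assumption):

# Keep rows that have at least 2 non-null values among the required columns
partially_complete_df = df.dropna(thresh=2, subset=required_columns)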

11. Scenario 16: Replacing Nulls with Default Values

Problem: In a customer data DataFrame, replace nulls in the 'age' column with the average age and nulls in the 'country' column with 'Unknown'.

Solution:

from pyspark.sql.functions import coalesce, lit, mean

avg_age = df.select(mean(df["age"])).collect()[0][0]
df = df.withColumn("age", coalesce(df["age"], lit(avg_age)))
df = df.withColumn("country", coalesce(df["country"], lit("Unknown")))

Explanation: coalesce() returns the first non-null value among its arguments. Here it is used along with lit(), which creates a literal column. For 'age', it replaces nulls with the average age calculated from the dataset; for 'country', it substitutes nulls with the string "Unknown".
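
An equivalent, more compact sketch uses fillna() with a dict of per-column defaults; replacement values are cast to the column's type, assuming 'age' is numeric and 'country' is a string column:

df = df.fillna({"age": avg_age, "country": "Unknown"})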

12. Scenario 17: Cleaning Dirty Data (Invalid Entries)

Problem: Entries in the 'email' column of a customer data DataFrame contain some invalid emails that you want to filter out.

Solution:

from pyspark.sql.functions import col, regexp_extract

# Define a regular expression for valid emails
email_regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'

# Filter the data to include only rows with valid emails
valid_emails_df = df.filter(regexp_extract(col("email"), email_regex, 0) != "")

Explanation: Using regexp_extract() with a regular expression for email validation lets you filter rows based on whether the 'email' column contains a valid email. This method is effective for ensuring data quality by removing rows with invalid email formats.
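
An alternative sketch uses rlike(), which returns a boolean match directly instead of extracting the matched text:

from pyspark.sql.functions import col

valid_emails_df = df.filter(col("email").rlike(email_regex))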

13. Scenario 1: Selecting Specific Columns

Problem: You want to reduce the dataset to only a few relevant columns for analysis.

Solution:

selected_df = df.select("column1", "column2", "column3")

Explanation: The select() function specifies a subset of columns from the DataFrame. This is useful for focusing on relevant data and can improve performance by reducing the amount of data processed in subsequent transformations.

14. Scenario 2: Filtering Data Based on Conditions

Problem: You need to filter the dataset to include only records where the age is greater than 30.

Solution:

filtered_df = df.filter(df["age"] > 30)

Explanation: filter() applies a condition to each row in the DataFrame and retains the rows that meet the criteria. This is essential for narrowing the data down to relevant records before performing more intensive operations.

15. Scenario 20: Handling Data Skew in Joins

Problem: You are experiencing slow join operations due to data skew, where certain keys dominate the dataset.

Solution:

from pyspark.sql.functions import concat, expr, lit, rand

# Add a random suffix to the key in both DataFrames
df1_skewed = df1.withColumn("key", concat(df1["key"], lit("_"), (rand() * 10).cast("int")))
df2_skewed = df2.withColumn("key", concat(df2["key"], lit("_"), (rand() * 10).cast("int")))

# Perform the join
skewed_join_df = df1_skewed.join(df2_skewed, df1_skewed["key"] == df2_skewed["key"])

# Remove the suffix to recover the original key
final_df = skewed_join_df.withColumn("key", expr("substring(key, 1, length(key)-2)"))

Explanation: By adding a random suffix to the join key, you distribute the data more evenly across the cluster, mitigating the effect of skewed keys. This reduces the workload on any single node and balances the processing load, enhancing performance.
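
Note that salting both DataFrames with independent random suffixes, as above, only matches rows whose random suffixes happen to coincide; a common production variant salts the large side randomly and replicates the other side across every salt value so that no matches are lost. A minimal sketch of that variant, with the bucket count N as an assumption:

from pyspark.sql.functions import array, col, concat, explode, lit, rand

N = 10  # number of salt buckets, assumed; tune to the degree of skew

# Salt the skewed (large) side with a random bucket
df1_salted = df1.withColumn("salted_key", concat(col("key"), lit("_"), (rand() * N).cast("int")))

# Replicate each row of the other side once per salt bucket so every match survives
df2_salted = (
    df2.withColumn("salt", explode(array(*[lit(i) for i in range(N)])))
       .withColumn("salted_key", concat(col("key"), lit("_"), col("salt")))
)

# Join on the salted key; both original 'key' columns remain, so alias the inputs if needed
salted_join_df = df1_salted.join(df2_salted, "salted_key")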

16. Scenario 3: Grouping and Aggregating Data

Problem: Calculate the average sales by department in a retail dataset.

Solution:

from pyspark.sql import functions as F

aggregated_df = df.groupBy("department").agg(F.avg("sales").alias("average_sales"))

Explanation: groupBy() followed by agg() allows for complex aggregations, such as computing averages, sums, and counts, grouped by specific fields. This is pivotal in generating insights and summaries from large datasets.
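
The same pattern scales to several aggregates computed in one pass; the output aliases below are illustrative:

from pyspark.sql import functions as F

summary_df = df.groupBy("department").agg(
    F.avg("sales").alias("average_sales"),
    F.sum("sales").alias("total_sales"),
    F.count("*").alias("row_count"),
)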

17. Scenario 4: Adding a Computed Column

Problem: Add a new column that shows the total cost, calculated as the product of quantity and price per unit.

Solution:

df = df.withColumn("total_cost", df["quantity"] * df["price_per_unit"])

Explanation: withColumn() adds a new column to the DataFrame, computed from an expression involving existing columns. This method is commonly used to enrich data with additional metrics or to prepare data for further analysis.

18. Scenario 5: Removing Duplicates

Problem: Remove duplicate records in the dataset based on specific columns.

Solution:

unique_df = df.dropDuplicates(["column1", "column2"])

Explanation: dropDuplicates() removes duplicate rows from a DataFrame based on all columns or a subset of them. This is especially useful when processing datasets where records have been entered more than once.

19. Scenario 6: Renaming Columns

Problem: Rename a column from "old_name" to "new_name" for clarity or standardization.

Solution:

df = df.withColumnRenamed("old_name", "new_name")

Explanation: withColumnRenamed() changes the name of a DataFrame column, which is particularly useful for standardizing column names across different data sources or making them more descriptive.

20. Scenario 7: Working with Array Types

Problem: You need to filter the dataset down to records where an array column contains a specific value.

Solution:

from pyspark.sql.functions import array_contains

filtered_df = df.filter(array_contains(df["array_column"], "value_to_check"))

Explanation: array_contains() checks whether the specified value exists within an array column. This function is particularly useful when you want to filter rows based on the content of an array.

21. Scenario 8: Extracting Data from Structs

Problem: You have a DataFrame with a struct column info that contains fields age and name. You need to access these fields to create new columns.

Solution:

df = df.withColumn("age", df["info.age"]).withColumn("name", df["info.name"])

Explanation: Fields in a struct are accessed using dot notation (struct_name.field_name). This allows you to flatten nested structures for easier analysis and manipulation.
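
Two related sketches: select("info.*") expands every struct field at once, and getField() is an explicit equivalent of the dot notation:

from pyspark.sql.functions import col

flattened_df = df.select("*", "info.*")                     # all top-level columns plus the struct's fields
age_df = df.withColumn("age", col("info").getField("age"))  # same as df["info.age"]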

22. Scenario 9: Using Maps

Problem: You want to retrieve a value from a map column based on a specific key.

Solution:

from pyspark.sql.functions import col

df = df.withColumn("specific_value", col("map_column")["key"])

Explanation: In PySpark, accessing a map's value by its key is straightforward using bracket notation (map_column["key"]). This is efficient for retrieving related data grouped as key-value pairs within a single DataFrame column.
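
An equivalent sketch uses getItem(); map_keys() and map_values() are also available when you need to inspect the whole map:

from pyspark.sql.functions import col, map_keys, map_values

df = df.withColumn("specific_value", col("map_column").getItem("key"))
df = df.withColumn("all_keys", map_keys(col("map_column")))
df = df.withColumn("all_values", map_values(col("map_column")))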