Mastering PySpark Data Transformation Techniques
Drills to master data transformation techniques in PySpark
1. Scenario 5: Calculating Rolling Averages for Time-Series Data
Data Transformation Techniques: Using Window Functions

Problem: You need to calculate a 7-day rolling average of sales figures for each product in your dataset to smooth out daily fluctuations and highlight longer-term trends.

Solution:

from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col

# Define a window specification
windowSpec = Window.partitionBy("product_id").orderBy("date").rowsBetween(-6, 0)

# Calculate the rolling average using the defined window specification
df = df.withColumn("rolling_avg", avg(col("sales")).over(windowSpec))

Explanation:
Window Specification: Window.partitionBy("product_id") groups the data by product, ensuring the average is computed per product. orderBy("date") ensures that the rows within each group are processed in chronological order. rowsBetween(-6, 0) defines the window as the current row plus the 6 preceding rows (7 rows in total, i.e. 7 days when there is one row per product per day), which is what makes this a rolling average.
Rolling Average Calculation: avg(col("sales")).over(windowSpec) computes the average sales over the defined window for each row.
This technique is particularly useful in time-series analysis where you want to analyze trends over a moving time window.
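
For reference, here is a minimal, self-contained sketch of the same window pattern; the SparkSession setup and the toy daily-sales data are assumptions added purely for illustration.

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col

spark = SparkSession.builder.appName("rolling_avg_demo").getOrCreate()

# Toy data: one row per product per day (ISO date strings sort chronologically)
data = [
    ("p1", "2024-01-01", 10.0),
    ("p1", "2024-01-02", 20.0),
    ("p1", "2024-01-03", 30.0),
    ("p2", "2024-01-01", 5.0),
    ("p2", "2024-01-02", 15.0),
]
df = spark.createDataFrame(data, ["product_id", "date", "sales"])

# Current row plus the 6 preceding rows within each product, in date order
windowSpec = Window.partitionBy("product_id").orderBy("date").rowsBetween(-6, 0)
df.withColumn("rolling_avg", avg(col("sales")).over(windowSpec)).show()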
2. Scenario 1: Cleaning Text Data

Problem: You need to clean text data in a DataFrame column by converting the text to lowercase and removing punctuation and other special characters, keeping only alphanumeric characters and spaces.

Solution:

from pyspark.sql.functions import lower, regexp_replace

cleaned_df = df.withColumn("clean_text", lower(regexp_replace(df["text"], r"[^a-zA-Z0-9\s]", "")))

Explanation:
lower() converts all text to lowercase.
regexp_replace() removes all characters except alphanumeric characters and spaces.
This method is used to clean up text data by standardizing its format, making it easier to process or analyze.
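
A small usage sketch of this cleaning pattern is shown below; the sample rows and the extra trim() call (which also strips leading and trailing whitespace) are assumptions added for illustration.

from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, regexp_replace, trim

spark = SparkSession.builder.appName("clean_text_demo").getOrCreate()
df = spark.createDataFrame([("  Hello, World!  ",), ("PySpark ROCKS!!!",)], ["text"])

# Same pattern as the solution, plus trim() to drop leading/trailing spaces
cleaned_df = df.withColumn(
    "clean_text",
    trim(lower(regexp_replace(df["text"], r"[^a-zA-Z0-9\s]", "")))
)
cleaned_df.show(truncate=False)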
3. Scenario 3: Handling Null Values in Numeric Data

Problem: You want to replace null values in a numeric column with the column's average value.

Solution:

from pyspark.sql.functions import avg, coalesce, lit

# Calculate the average of the column; nulls are ignored in the calculation
avg_value = df.select(avg(df["numeric_column"]).alias("avg")).collect()[0]["avg"]

# Replace nulls with the average value
filled_df = df.withColumn("numeric_column", coalesce(df["numeric_column"], lit(avg_value)))

Explanation:
avg() calculates the average, collect() retrieves it as a scalar value, and coalesce() replaces nulls with this average.
This approach ensures that your dataset is free of nulls, which can skew analysis or cause errors during further data processing.
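
The following is a sketch of how this behaves end to end; the three-row sample DataFrame is an assumption, and DataFrame.fillna is shown as an equivalent shortcut once the average is known.

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, coalesce, lit

spark = SparkSession.builder.appName("fill_nulls_demo").getOrCreate()
df = spark.createDataFrame([(1.0,), (None,), (3.0,)], ["numeric_column"])

# avg() ignores nulls, so the average here is 2.0
avg_value = df.select(avg(df["numeric_column"]).alias("avg")).collect()[0]["avg"]

filled_df = df.withColumn("numeric_column", coalesce(df["numeric_column"], lit(avg_value)))

# Equivalent shortcut: fillna takes a dict mapping column names to replacement values
filled_df_alt = df.fillna({"numeric_column": avg_value})
filled_df.show()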
4. Scenario 2: Converting Data Types

Problem: You have a DataFrame with a column 'date_string' in string format that you need to convert to a Date type.

Solution:

from pyspark.sql.functions import col, to_date

converted_df = df.withColumn("date", to_date(col("date_string"), "MM/dd/yyyy"))

Explanation:
to_date() converts a string column to a date, using the date pattern "MM/dd/yyyy".
This conversion is essential for proper date handling in time-series analysis or date-based filtering.
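
A minimal sketch of this conversion, with toy date strings assumed for illustration:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

spark = SparkSession.builder.appName("to_date_demo").getOrCreate()
df = spark.createDataFrame([("01/15/2024",), ("12/31/2023",)], ["date_string"])

converted_df = df.withColumn("date", to_date(col("date_string"), "MM/dd/yyyy"))
converted_df.printSchema()  # the new "date" column is DateType
converted_df.show()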
5. Scenario 4: Removing Duplicates Based on Specific Columns

Problem: You need to de-duplicate records in a DataFrame, considering only a subset of columns for identifying duplicates.

Solution:

unique_df = df.dropDuplicates(["column1", "column2"])

Explanation:
dropDuplicates() with specific column names ensures that rows are considered duplicates only if the specified columns match.
This method is useful when you want to remove duplicates based on key attributes without considering the entire row.
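
A minimal sketch with assumed sample data: the first and third rows share column1 and column2, so only one of them survives (which one is kept is not guaranteed).

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dedup_demo").getOrCreate()
df = spark.createDataFrame(
    [("a", 1, "first"), ("b", 2, "second"), ("a", 1, "third")],
    ["column1", "column2", "other"],
)

# De-duplicate on the key columns only; the "other" column is ignored when comparing rows
unique_df = df.dropDuplicates(["column1", "column2"])
unique_df.show()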
6. Scenario 6: Optimizing Joins with Broadcast Variables
Best Practices for Scalable Data Processing

Problem: You need to join a large DataFrame with a small DataFrame efficiently in a Spark job to prevent excessive shuffling and optimize the processing time.

Solution:

from pyspark.sql.functions import broadcast

# Assuming df_large is the large DataFrame and df_small is the small DataFrame
optimized_join_df = df_large.join(broadcast(df_small), df_large["key"] == df_small["key"])

Explanation:
Broadcast Join: broadcast(df_small) sends a copy of the small DataFrame to every worker node in the cluster. Because each node holds a local copy of the smaller DataFrame, Spark does not need to shuffle the larger DataFrame across the network to perform the join.
Performance Benefit: Broadcast joins are most beneficial when one side of the join is significantly smaller than the other. They minimize the volume of data transferred and can drastically improve the performance of the join operation.
This method is an example of leveraging Spark's broadcast capabilities to optimize join operations in large-scale data environments. It is a powerful technique for improving the efficiency and speed of data processing tasks that combine datasets of very different sizes.
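
A self-contained sketch of the broadcast join, with tiny assumed DataFrames standing in for df_large and df_small; explain() can be used to confirm that Spark planned a broadcast hash join.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("broadcast_join_demo").getOrCreate()

df_large = spark.createDataFrame([(1, "x"), (2, "y"), (3, "z")], ["key", "value"])
df_small = spark.createDataFrame([(1, "one"), (2, "two")], ["key", "label"])

# Hint Spark to ship df_small to every executor instead of shuffling df_large
optimized_join_df = df_large.join(broadcast(df_small), df_large["key"] == df_small["key"])
optimized_join_df.explain()  # the physical plan should show a BroadcastHashJoin
optimized_join_df.show()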