Scenario 20: Handling Data Skew in Joins

Problem: You are experiencing slow join operations due to data skew, where certain keys dominate the dataset.
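Before reaching for a fix, it helps to confirm the skew. A minimal check on the join-key distribution (df1 and the column name "key" are the scenario's assumptions):

from pyspark.sql.functions import desc

# Show the heaviest join keys; a few keys with outsized counts confirm skew
df1.groupBy("key").count().orderBy(desc("count")).show(10)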
Solution:
from pyspark.sql.functions import col, concat, explode, expr, lit, rand

# Add a random suffix (0-9) to the key on the skewed side
df1_skewed = df1.withColumn("key", concat(df1["key"], lit("_"), (rand() * 10).cast("int")))

# Replicate the other side once per possible suffix so every salted key can find
# its match; salting both sides with independent random suffixes would almost
# never line up, silently dropping most matches
df2_skewed = (
    df2.withColumn("suffix", explode(expr("sequence(0, 9)")))
       .withColumn("key", concat(col("key"), lit("_"), col("suffix")))
       .drop("suffix")
)

# Perform the join on the salted keys; joining on the column name avoids a
# duplicate, ambiguous "key" column in the result
skewed_join_df = df1_skewed.join(df2_skewed, on="key")

# Remove the two-character suffix ("_" plus one digit) to recover the original key
final_df = skewed_join_df.withColumn("key", expr("substring(key, 1, length(key) - 2)"))
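The snippet above assumes two DataFrames, df1 and df2, sharing a join column named key. As a minimal, hypothetical way to build skewed demo inputs (the SparkSession, sizes, and column names are illustrative, not part of the original scenario):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("skew-demo").getOrCreate()

# "hot" dominates df1, so the join key is heavily skewed
df1 = spark.createDataFrame(
    [("hot", i) for i in range(100_000)] + [("cold", 0)],
    ["key", "value1"],
)
df2 = spark.createDataFrame([("hot", "a"), ("cold", "b")], ["key", "value2"])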
Explanation: By adding a random suffix to the join key, you distribute the data more evenly across the cluster, mitigating the effect of skewed keys. Because the two sides must still match on the salted key, the other side is replicated once per suffix value rather than salted randomly; otherwise the suffixes would rarely line up and most matches would be lost. This reduces the workload on any single node and balances the processing load, enhancing performance.
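A related option: on Spark 3.x, adaptive query execution can split skewed join partitions automatically, which often makes manual salting unnecessary. A minimal sketch of the standard configuration flags (defaults vary by Spark version):

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")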