Fill in the blanks
Mastering PySpark: Setting Up and Reading Data (online version)
1
Example: PySpark Code for a Simple ETL Task
To give you a practical insight, let's go through a simple example of reading, transforming, and writing data:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("Walmart Data Engineer Interview Preparation") \
    .enableHiveSupport() \
    .getOrCreate()

# Read data from a Hive table
df = spark.table("sample_database.sample_table")

# Perform transformation: add a new column with transformed data
df_transformed = df.withColumn("new_column", df["existing_column"] * 2)

# Write transformed data back to a new Hive table
df_transformed.write.mode("overwrite").saveAsTable("sample_database.transformed_table")
This code snippet provides a basic framework for reading data from a Hive table, performing a transformation, and writing the results back to Hive. For your interview, it's important to adapt these concepts to more complex scenarios and demonstrate an understanding of performance considerations and best practices.
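As a hedged sketch of how this pattern might be adapted to a slightly more complex scenario, the snippet below filters the source rows, derives a column, and writes the result partitioned by a key to reduce downstream scan costs. The table and column names (sales, amount, region) are illustrative assumptions, not part of the original example.

from pyspark.sql import functions as F

# Read a source Hive table (illustrative name)
sales_df = spark.table("sample_database.sales")

# Keep only higher-value rows and add a derived column
high_value_df = (
    sales_df
    .filter(F.col("amount") > 100)
    .withColumn("amount_with_tax", F.col("amount") * 1.08)
)

# Partitioning the output by region can make later reads cheaper
high_value_df.write.mode("overwrite").partitionBy("region") \
    .saveAsTable("sample_database.high_value_sales")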
2
Scenario 3: Reading CSV Files with a Specific Schema
Problem: You need to read CSV files and enforce a specific schema to ensure data types are correct.
Solution:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

df = spark.read.format("csv").option("header", "true").schema(schema).load("/path/to/csv/files/")
Explanation: Specifying a schema with StructType and StructField ensures that each column in the CSV is read with the correct data type, preventing data type issues during data processing. The option("header", "true") setting indicates that the first line of the files defines the column names, ensuring columns are correctly named.
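One quick, optional way to confirm that the schema was enforced as intended is to inspect the loaded DataFrame; the printed schema shown in the comments is indicative of what this particular StructType should produce.

# Verify the enforced schema and preview a few rows
df.printSchema()
# root
#  |-- id: integer (nullable = true)
#  |-- name: string (nullable = true)
df.show(5)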
3
Problem: You need to set up a PySpark environment that can interact with a Hive database for batch data processing.
Solution:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Walmart ETL Job") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()
Explanation:
SparkSession: The entry point to programming Spark with the Dataset and DataFrame API. This setup initializes a SparkSession with configurations tailored for Hive interaction.
appName("Walmart ETL Job"): Names the application, making it easier to identify in the Spark web UI.
config("spark.sql.warehouse.dir", "/user/hive/warehouse"): Specifies the directory where the Hive data is stored, ensuring that Spark and Hive can work together effectively.
enableHiveSupport(): Enables support for Hive features, including the ability to write queries using HiveQL and access Hive tables directly.
getOrCreate(): Returns an existing SparkSession if there's one running; otherwise, it creates a new one based on the options set.
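To make the Hive integration concrete, here is a minimal, hedged sketch of what such a session allows; the database and table names are assumptions for illustration only.

# Browse the Hive metastore from the session
spark.sql("SHOW DATABASES").show()
print(spark.catalog.listTables("default"))

# Run a HiveQL-style query against an existing table (name is hypothetical)
spark.sql("SELECT COUNT(*) AS row_count FROM default.some_table").show()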
SparkContext: The Spark Context (SparkContext) was the main entry point for Spark functionality before the introduction of Spark 2.0. It was used to connect to the Spark execution environment, manage Spark job configurations, and orchestrate the distribution of data and computations across the Spark cluster. When you start a Spark application, a SparkContext is created to enable your Spark application to access the cluster through a resource manager (like YARN, Mesos, or Spark's own cluster manager).
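In Spark 2.0 and later the SparkContext still exists but is reached through the SparkSession; a minimal sketch of accessing it with standard PySpark properties:

# The underlying SparkContext is exposed on the session
sc = spark.sparkContext
print(sc.appName)             # application name set via appName(...)
print(sc.applicationId)       # ID assigned by the cluster/resource manager
print(sc.defaultParallelism)  # default partition count for distributed operations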
4
Scenario 1: Reading Data from a Hive Table
Problem: You need to read data from a Hive table for further processing.
Solution:
df = spark.sql("SELECT * FROM your_hive_table")
Explanation: Using spark.sql(), you can execute SQL queries directly on Hive tables within your Spark application. This method leverages Spark's ability to integrate seamlessly with Hive, allowing for complex queries and integration into your ETL pipelines.
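As a hedged illustration of a more complex query in the same style, the aggregation below assumes hypothetical columns store_id and sales_amount on your_hive_table; the result comes back as a regular DataFrame that can feed the rest of the pipeline.

# Aggregate with Spark SQL and keep the result as a DataFrame
store_totals = spark.sql("""
    SELECT store_id, SUM(sales_amount) AS total_sales
    FROM your_hive_table
    GROUP BY store_id
""")
store_totals.show()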
5
Scenario 2: Reading Parquet Files
Problem: You have a directory of Parquet files that you need to read into a DataFrame.
Solution:
df = spark.read.parquet("/path/to/parquet/files/")
Explanation: spark.read.parquet() efficiently reads from Parquet files, a columnar storage format, which is ideal for high-performance data processing. Spark's built-in support for Parquet allows for automatic schema inference and pushdown optimizations, improving performance and reducing I/O.
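A short, hedged sketch of how those optimizations are typically exercised: selecting only the needed columns enables column pruning, and filtering early lets predicates be pushed down to the Parquet scan. The column names (user_id, event_date) are assumed for illustration.

from pyspark.sql import functions as F

# Column pruning (select) and predicate pushdown (filter) both reduce I/O on Parquet
recent_events = (
    spark.read.parquet("/path/to/parquet/files/")
    .select("user_id", "event_date")
    .filter(F.col("event_date") >= "2024-01-01")
)
recent_events.explain()  # the physical plan should list PushedFilters on the Parquet scan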
6
Scenario 4: Reading JSON Files with Options
Problem: Load JSON files, considering multiline JSON records.
Solution:
df = spark.read.option("multiline", "true").json("/path/to/json/files/")
Explanation: JSON files can sometimes contain multiline records. Setting the multiline option to true enables Spark to interpret each multiline record as a single row in the DataFrame. This is crucial for correctly parsing files where JSON objects are formatted over multiple lines.
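To make the multiline case concrete, here is a hedged example of a record that spans several lines and how the option handles it; the file contents and field names are assumptions for illustration.

# Example contents of /path/to/json/files/record.json (one object over several lines):
# {
#   "id": 1,
#   "name": "sample"
# }

# With multiline enabled, the whole object is parsed as a single DataFrame row
df = spark.read.option("multiline", "true").json("/path/to/json/files/")
df.show()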