Pyspark For Data Engineering

Introduction

Have you ever wondered how companies like Netflix, Amazon, or Flipkart process massive amounts of data every day? That’s where big data tools come in. One powerful tool that makes working with big data easier—especially for Python lovers—is PySpark.

What is PySpark?

PySpark is the Python API for Apache Spark. Apache Spark is an open-source big data processing engine known for its speed, scalability, and ease of use. PySpark allows you to write Spark jobs using Python instead of Scala or Java, making big data accessible to Python developers.

Why Use PySpark?

  • Fast Processing: PySpark works in memory, making it faster than traditional tools like Hadoop.
  • Handles Large Datasets: Can process terabytes or even petabytes of data efficiently.
  • Supports DataFrames: Similar to Pandas, but designed for distributed data.
  • Easy to Scale: Runs on your laptop or on a massive cluster with ease.
  • Integrates with Other Tools: Works with Hadoop, Hive, SQL, and more.

Key Components of PySpark

  • RDD (Resilient Distributed Dataset): The core data structure, like a big list spread across multiple computers.
  • DataFrame: Similar to a table or Pandas DataFrame, optimized for large-scale data operations.
  • Spark SQL: Enables SQL-style querying over your big data.
  • MLlib: A built-in library for scalable machine learning tasks.
  • Spark Streaming: Handles real-time data processing like live tweets or logs.

What Can You Do With PySpark?

With PySpark, you can:

  • Clean and analyze large datasets.
  • Run SQL-like queries on distributed data.
  • Build machine learning models on massive datasets.
  • Process real-time streaming data for analytics and alerting.

Example Use Case

Imagine a company has 1 billion customer records stored in different systems. PySpark helps:

  • Load and merge all that data efficiently.
  • Clean and prepare it for analysis.
  • Identify useful patterns, like buying habits.
  • Build recommendation engines based on user behavior.

Getting Started with PySpark

To try PySpark locally, install it using pip:

pip install pyspark

You can also use it in platforms like Databricks, Jupyter Notebooks, or Google Colab.

Final Thoughts

If you’re familiar with Python and want to explore the world of big data, PySpark is the perfect place to start. It gives you the power of distributed computing with the simplicity of Python, making big data fun and approachable.

Functions: Reading & Writing Data

In data engineering workflows, reading from and writing to storage systems is a fundamental task. Apache Spark provides flexible APIs to handle a variety of formats — CSV, JSON, Parquet, JDBC, and more — across both DataFrames and RDDs.

Reading Data in Spark

Spark allows you to read data from various sources using the spark.read API:


# Reading common file formats
spark.read.csv("path/to/file")           # Read CSV file
spark.read.parquet("path/to/file")       # Read Parquet file
spark.read.json("path/to/file")          # Read JSON file
spark.read.orc("path/to/file")           # Read ORC file

# Reading from a relational database using JDBC
spark.read.jdbc(url, table, properties)  # Read from JDBC source

These return a DataFrame, which you can then process using SQL-like operations.

Writing Data in Spark

Spark also provides powerful APIs to write the processed data back to storage:


# Writing DataFrames to various formats
df.write.csv("path/to/output")            # Write as CSV
df.write.parquet("path/to/output")        # Write as Parquet
df.write.json("path/to/output")           # Write as JSON
df.write.orc("path/to/output")            # Write as ORC
df.write.text("path/to/output")           # Write plain text
df.write.jdbc(url, table, properties)     # Write to a relational database
df.write.saveAsTable("table_name")        # Save as a managed table (Hive-compatible)

Use mode("overwrite") or mode("append") if needed to control how data is written.

Writing RDDs (Resilient Distributed Datasets)

Though DataFrames are preferred in most use cases, you might still use RDDs for low-level transformations or custom serialization:


# Save RDD as different file types
rdd.saveAsTextFile("path/to/output")          # Save RDD as text
rdd.saveAsObjectFile("path/to/output")        # Save serialized objects
rdd.saveAsSequenceFile("path/to/output")      # Save in Hadoop Sequence File format (key-value)

Data Transformations

select

Definition: Selects a specific column (or multiple columns) from the DataFrame.

Syntax: df.select("col")

Explanation: Retrieves only the column named "col" from the DataFrame df.

drop

Definition: Removes the specified column from the DataFrame.

Syntax: df.drop("col")

Explanation: Drops the column "col" from the DataFrame df.

withColumnRenamed

Definition: Renames an existing column in the DataFrame.

Syntax: df.withColumnRenamed("old", "new")

Explanation: Renames the column "old" to "new" in the DataFrame df.

withColumn

Definition: Adds a new column or updates an existing column by applying a transformation.

Syntax: df.withColumn("year", year(df["date_col"]))

Explanation: Creates a new column "year" by extracting the year from the "date_col" column.

filter

Definition: Filters rows based on the given condition.

Syntax: df.filter(df["col"] > 100)

Explanation: Returns rows where the value in "col" is greater than 100.

where

Definition: An alternative to filter() for row filtering.

Syntax: df.where(df["col"] == "value")

Explanation: Returns rows where "col" is equal to "value".

orderBy

Definition: Sorts the DataFrame by one or more columns.

Syntax: df.orderBy("col", ascending=False)

Explanation: Sorts the DataFrame by "col" in descending order.

fillna

Definition: Fills null or missing values with a specified default value.

Syntax: df.fillna("default")

Explanation: Replaces all null values in the DataFrame with "default".

dropna

Definition: Removes rows that contain any null values.

Syntax: df.dropna()

Explanation: Drops all rows where at least one column contains a null value.

dropDuplicates

Definition: Removes duplicate rows based on specified columns.

Syntax: df.dropDuplicates(["col1", "col2"])

Explanation: Removes rows that have duplicate values in both "col1" and "col2".

distinct

Definition: Returns unique values from the selected column.

Syntax: df.select("col").distinct()

Explanation: Selects only distinct (non-duplicate) values from the "col" column.

pivot

Definition: Rotates rows into columns based on a pivot column.

Syntax:

df.groupBy("col").pivot("category").sum("amount")

Explanation: Groups by "col", and for each unique "category", creates a column with the sum of "amount".

stack (Unpivoting)

Definition: Converts columns into rows (unpivot).

Syntax:

df.selectExpr("id", "stack(2, 'col1', col1, 'col2', col2) as (category, value)")

Explanation: Unpivots col1 and col2 into two rows with "category" and "value" columns.

Aggregation

groupBy + agg with sum and alias

Definition: Groups the data by a column and applies aggregation functions.

Syntax: df.groupBy("col").agg(sum("amount").alias("total"))

Explanation: Groups the DataFrame by "col" and calculates the total sum of "amount", renaming the result column as "total".

groupBy + multiple aggregation functions

Definition: Applies multiple aggregation functions on grouped data.

Syntax:

df.groupBy("col").agg(
    avg("amount"),
    sum("amount"),
    min("amount"),
    max("amount"),
    count("amount")
)

Explanation: Groups the data by "col" and calculates the average, sum, minimum, maximum, and count of the "amount" column.

groupBy + collect_list and collect_set

Definition: Aggregates data by collecting values into a list or a set.

Syntax: df.groupBy("col").agg(collect_list("amount"), collect_set("amount"))

Explanation:

  • collect_list("amount") gathers all "amount" values into a list (including duplicates).
  • collect_set("amount") gathers unique "amount" values into a set (removing duplicates), grouped by "col".

Joins

Inner Join

Definition: Returns rows with matching keys in both DataFrames.

Syntax: df1.join(df2, "id", "inner")

Explanation: Performs an inner join on the "id" column, keeping only rows that exist in both df1 and df2.

Left Join

Definition: Returns all rows from the left DataFrame and matched rows from the right.

Syntax: df1.join(df2, "id", "left")

Explanation: Performs a left outer join on the "id" column, returning all rows from df1 and matching rows from df2.

Right Join

Definition: Returns all rows from the right DataFrame and matched rows from the left.

Syntax: df1.join(df2, "id", "right")

Explanation: Performs a right outer join on the "id" column, returning all rows from df2 and matching rows from df1.

Full Join

Definition: Returns all rows when there is a match in either left or right DataFrame.

Syntax: df1.join(df2, "id", "full")

Explanation: Performs a full outer join on the "id" column, including all rows from both df1 and df2, with nulls where no match is found.

Union

Definition: Combines the rows of two DataFrames with the same schema.

Syntax: df1.union(df2)

Explanation: Merges all rows from df2 into df1. Both must have the same column structure.

Cross Join

Definition: Returns the Cartesian product of two DataFrames.

Syntax: df1.crossJoin(df2)

Explanation: Joins each row of df1 with every row of df2, producing all possible row combinations.

Date Functions

current_date

Definition: Returns the current system date (no time).

Syntax:

df = df.withColumn("current_date", current_date())

current_timestamp

Definition: Returns the current date and time.

Syntax:

df = df.withColumn("current_timestamp", current_timestamp())

date_add & date_sub

Definition: Adds or subtracts days from a date column.

Syntax:

df = df.withColumn("date_plus_5", date_add("date_column", 5)) \
       .withColumn("date_minus_5", date_sub("date_column", 5))

datediff

Definition: Calculates difference in days between two dates.

Syntax:

df = df.withColumn("days_diff", datediff("end_date", "start_date"))

year, month, dayofmonth, dayofweek

Definition: Extracts parts from a date.

Syntax:

df = df.withColumn("year", year("date_column")) \
       .withColumn("month", month("date_column")) \
       .withColumn("day", dayofmonth("date_column")) \
       .withColumn("weekday", dayofweek("date_column"))

hour, minute, second

Definition: Extracts parts from a timestamp.

Syntax:

df = df.withColumn("hour", hour("timestamp_column")) \
       .withColumn("minute", minute("timestamp_column")) \
       .withColumn("second", second("timestamp_column"))

to_date

Definition: Converts string to date using a format.

Syntax:

df = df.withColumn("as_date", to_date("string_col", "yyyy-MM-dd"))

to_timestamp

Definition: Converts string to timestamp using a format.

Syntax:

df = df.withColumn("as_timestamp", to_timestamp("string_col", "yyyy-MM-dd HH:mm:ss"))

date_format

Definition: Formats a date/timestamp as a string.

Syntax:

df = df.withColumn("formatted", date_format("date_column", "dd-MM-yyyy"))

add_months

Definition: Adds a specified number of months to a date.

Syntax:

df = df.withColumn("plus_2_months", add_months("date_column", 2))

last_day

Definition: Returns the last day of the month for a given date.

Syntax:

df = df.withColumn("month_end", last_day("date_column"))

next_day

Definition: Returns the next occurrence of a specific weekday after a date.

Syntax:

df = df.withColumn("next_sunday", next_day("date_column", "Sunday"))

trunc

Definition: Truncates a date to the beginning of a specified unit (e.g., month).

Syntax:

df = df.withColumn("trunc_month", trunc("date_column", "MM"))

Window Functions

Window Specification

Definition: Defines how rows are grouped and ordered for window operations.

Syntax: window_spec = Window.partitionBy("dept").orderBy("salary")

Explanation: Creates a window specification that groups rows by "dept" and orders them by "salary".

rank

Definition: Assigns a rank to each row within a partition, with gaps for ties.

Syntax: df.withColumn("rank", rank().over(window_spec))

Explanation: Adds a "rank" column where ranks are assigned based on "salary" within each "dept", skipping numbers for ties.

lead

Definition: Retrieves the value from the next row in the window.

Syntax: df.withColumn("lead", lead("col").over(Window.orderBy("col2")))

Explanation: Adds a "lead" column with the next row's "col" value, ordered by "col2".

lag

Definition: Retrieves the value from the previous row in the window.

Syntax: df.withColumn("lag", lag("col").over(Window.orderBy("col2")))

Explanation: Adds a "lag" column with the previous row's "col" value, ordered by "col2".

dense_rank

Definition: Similar to rank but without gaps between ranks.

Syntax: df.withColumn("dense_rank", dense_rank().over(window_spec))

Explanation: Adds a "dense_rank" column where rows with the same value get the same rank, and the next rank is incremented by 1.

row_number

Definition: Assigns a unique sequential number to each row within a window.

Syntax: df.withColumn("row_number", row_number().over(window_spec))

Explanation: Adds a "row_number" column that provides a unique number for each row ordered by "salary" within each "dept".

RDD Functions

map

Definition: Applies a function to each element in the RDD.

Syntax:

rdd.map(lambda x: (x[0], x[1] * 2))

Explanation: Multiplies the second element of each tuple by 2.

flatMap

Definition: Similar to map, but flattens the results.

Syntax:

rdd.flatMap(lambda x: (x[0], x[1] * 2))

Explanation: Returns each item in the resulting tuple as a separate record.

reduceByKey

Definition: Merges values with the same key using a specified function.

Syntax:

rdd.reduceByKey(lambda a, b: a + b)

Explanation: Sums values for each key (e.g., "a" → 1 + 3 = 4).

groupByKey

Definition: Groups values by key.

Syntax:

rdd.groupByKey().mapValues(list)

Explanation: Groups all values with the same key into a list.

sortByKey

Definition: Sorts the RDD by key.

Syntax:

rdd.sortByKey(ascending=True)

Explanation: Sorts the key-value pairs in ascending order of keys.

collect

Definition: Returns all elements of the RDD to the driver.

Syntax:

rdd.collect()

Explanation: Retrieves all RDD data as a list.

take

Definition: Returns the first n elements of the RDD.

Syntax:

rdd.take(5)

Explanation: Returns the first 5 records from the RDD.

count

Definition: Returns the number of elements in the RDD.

Syntax:

rdd.count()

Explanation: Counts the total number of records in the RDD.

df.rdd

Definition: Converts a DataFrame to an RDD.

Syntax:

df.rdd

Explanation: Accesses the underlying RDD of a DataFrame.

toDF

Definition: Converts an RDD to a DataFrame.

Syntax:

rdd.toDF(schema=["col1", "col2"])

Explanation: Creates a DataFrame from the RDD with specified column names.

String Functions

substring

Definition: Extracts a substring from a column.

Syntax:

df.withColumn("name_substring", substring(col("name"), 1, 3))

Explanation: Extracts the first 3 characters from the "name" column.

instr

Definition: Finds the position of a substring.

Syntax:

df.withColumn("address_instr", instr(col("address"), "Apple"))

Explanation: Returns the position of the word "Apple" in "address".

trim

Definition: Removes leading and trailing whitespaces.

Syntax:

df.withColumn("address_trim", trim(col("address")))

Explanation: Trims whitespaces from the "address" column.

concat

Definition: Concatenates multiple columns or strings.

Syntax:

df.withColumn("full_name", concat(col("name"), lit(" - "), col("address")))

Explanation: Combines "name" and "address" with " - " in between.

upper

Definition: Converts strings to uppercase.

Syntax:

df.withColumn("name_upper", upper(col("name")))

Explanation: Converts the "name" column to uppercase letters.

regexp_replace

Definition: Replaces substrings matching a regex pattern.

Syntax:

df.withColumn("clean_address", regexp_replace(col("address"), "[0-9]", ""))

Explanation: Removes all digits from the "address" column.

concat_ws

Definition: Concatenates strings with a separator.

Syntax:

df.withColumn("address_concat", concat_ws(", ", col("address"), col("birthdate")))

Explanation: Combines "address" and "birthdate" using ", " as a separator.

coalesce

Definition: Returns the first non-null value.

Syntax:

df.withColumn("score_non_null", coalesce(col("score"), lit(0)))

Explanation: Replaces null values in "score" with 0.

isnan

Definition: Checks for NaN (Not a Number) values.

Syntax:

df.withColumn("is_nan_score", isnan(col("score")))

Explanation: Returns True if "score" is NaN.

JSON & Array Functions

from_json

Definition: Parses a JSON string column into a struct using a schema.

Syntax:

from_json(df["json_col"], schema)

Explanation: Converts the JSON string in "json_col" into a structured column based on the provided schema.

to_json

Definition: Converts a struct or map column into a JSON string.

Syntax:

to_json(df["struct_col"])

Explanation: Serializes the contents of "struct_col" into a JSON string.

explode

Definition: Creates a new row for each element in an array column.

Syntax:

df.withColumn("exploded", explode(df["array_col"]))

Explanation: Flattens the array in "array_col" into multiple rows—one per array element.

array_contains

Definition: Checks if an array column contains a specific value.

Syntax:

df.withColumn("contains_value", array_contains(df["array_col"], "value"))

Explanation: Returns True if "array_col" contains "value".

size

Definition: Returns the number of elements in an array column.

Syntax:

df.withColumn("array_size", size(df["array_col"]))

Explanation: Adds a column showing the length of each array in "array_col".

Debugging & Execution Plans

Explain:

The explain() function in PySpark is used to display the execution plan of a DataFrame. This plan shows the logical and physical steps that Spark will take to execute the operations defined on a DataFrame.

df.explain() Usage

  • df.explain() or df.explain("simple"): By default, this shows only the physical plan, which is the optimized plan Spark will use to execute the DataFrame operations.
  • df.explain(mode="extended"): This provides a detailed plan including:
    • Parsed Logical Plan: Initial representation of the query.
    • Analyzed Logical Plan: Logical plan with resolved references.
    • Optimized Logical Plan: Optimized version of the logical plan after applying optimization rules.
    • Physical Plan: Actual steps Spark will take to execute the operations.
  • df.explain(mode="formatted"): Provides the same detailed information as "extended" but in a more human-readable format.
  • df.explain(mode="cost"): This mode shows the cost-based optimization plan (if available). It provides insights into the cost of different operations based on data statistics.

Important Notes

  • Reading Execution Plans: When analyzing the execution plan, we should read from bottom to top. The bottom represents the last operations performed (usually the actual data reads), while the top shows the final result or output stage.
  • The physical plan is usually the most crucial for performance tuning, as it tells us what Spark will actually do (like SortMergeJoin, BroadcastHashJoin, Exchange for shuffles, etc.).

Other Useful Commands

df.show(truncate=False)
df.printSchema()

Performance Optimization

cache

Definition: Caches the DataFrame in memory for faster access.

Syntax:

df.cache()

Explanation: Stores the DataFrame in memory on first computation to speed up future actions.

persist

Definition: Persists the DataFrame with a specified storage level (default is memory and disk).

Syntax:

df.persist()

Explanation: Similar to cache(), but allows custom storage levels like disk-only, memory-only, etc.

unpersist

Definition: Removes a cached/persisted DataFrame from memory/storage.

Syntax:

df.unpersist()

Explanation: Frees up memory or disk by removing previously cached/persisted data.

repartition

Definition: Increases or decreases the number of partitions with a full shuffle.

Syntax:

df.repartition(4)

Explanation: Redistributes the DataFrame into 4 partitions, useful for parallelism.

coalesce

Definition: Reduces the number of partitions without a full shuffle.

Syntax:

df.coalesce(1)

Explanation: Merges partitions into fewer (e.g., 1) for optimized output writing.

partitionBy

Definition: Writes the DataFrame to disk partitioned by column(s).

Syntax:

df.write.partitionBy("col").parquet("path")

Explanation: Saves data in folders by unique values of "col" to improve query performance.

bucketBy

Definition: Buckets and optionally sorts the data before saving as a table.

Syntax:

df.write.bucketBy(4, "col").sortBy("col").saveAsTable("table_name")

Explanation: Buckets data into 4 files by "col" and sorts it to optimize joins and reads.

broadcast

Definition: Broadcasts a small DataFrame to all worker nodes to optimize joins.

Syntax:

broadcast(df2)

Explanation: Broadcasts df2 to avoid shuffling during joins with a large DataFrame.

Share This :
Scroll to Top