Spark Tutorial
Introduction
Apache Spark is a distributed data processing engine designed for fast and efficient large-scale data processing.
Resilient Distributed Datasets (RDDs)
RDDs are the fundamental data abstraction in Spark. They represent distributed collections of data partitioned across the cluster and support parallel processing.
RDDs are immutable, resilient to failures, and support transformations (e.g., map, filter, reduceByKey) and actions (e.g., collect, count, reduce, saveAsTextFile).
Directed Acyclic Graph (DAG) Scheduler
Spark utilizes Directed Acyclic Graphs (DAGs) for data processing. When applying transformations to an RDD, Spark constructs a DAG that depicts the series of operations to be performed. This enables Spark to optimize the execution plan and execute operations with greater efficiency.
Architecture
Apache Spark follows a master-slave architecture, consisting of the Driver, Cluster Manager, Worker Nodes, and Executors. Here's a breakdown of each component and key concepts in the Spark architecture:
1. Driver
The driver is the master process of a Spark application. It orchestrates the whole application: it creates the Spark session, converts transformations into an optimized physical execution plan, and divides the work into jobs, stages, and tasks using a Directed Acyclic Graph (DAG). It then requests resources from the cluster manager, schedules the tasks on executors, monitors their execution, and handles failures.
2. Cluster Manager
The cluster manager is responsible for managing resources and allocating CPU and memory across all nodes by coordinating with the driver node. There are a few types of cluster managers: Standalone (built into Spark), YARN, Kubernetes, and Mesos (deprecated since Spark 3.2). YARN, which stands for Yet Another Resource Negotiator, is common in Hadoop-based deployments.
3. Worker Node
Worker Nodes (or simply Workers) are the slave nodes that perform the actual data processing in Spark. Each Worker node runs on an individual machine (physical or virtual) in a cluster.
Each Worker node hosts one or more Executors. Executors are responsible for executing the code sent by the Driver, as well as storing data in memory or on disk (caching).
4. Job
- A Job is a sequence of Stages triggered by an action (e.g., count(), collect()) in a Spark application.
- Each action in Spark triggers a separate job. The Driver breaks down a job into stages based on shuffle boundaries (operations like reduceByKey() that require repartitioning of data).
5. Stages
- A Stage is a collection of tasks executed as part of a single job. Stages are determined by the DAG Scheduler in Spark.
- There are two types of stages:
- ShuffleMapStage: Performs a shuffle operation, where data needs to be redistributed across different nodes.
- ResultStage: Computes the final result for an action.
- Note: Each Spark job typically has n + 1 stages, where n is the number of shuffle (wide) transformations. The extra stage handles the final result and basic operations.
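The n + 1 rule can be checked with a small pure-Python sketch. It does not call Spark: the `WIDE_OPS` set and the `count_stages` helper are illustrative assumptions, and real plans can deviate from the rule (for example, when Spark reuses shuffle output and skips a stage).

```python
# Illustrative only: a hand-picked set of transformations that cause a shuffle.
WIDE_OPS = {"reduceByKey", "groupByKey", "repartition", "distinct"}

def count_stages(operations):
    """Estimate the stages of one job: one per shuffle boundary, plus the final stage."""
    shuffles = sum(1 for op in operations if op in WIDE_OPS)
    return shuffles + 1

pipeline = ["map", "filter", "reduceByKey", "map", "distinct"]
print(count_stages(pipeline))  # 3
```

Two shuffles split the pipeline into three stages; a job with only narrow transformations runs as a single stage.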
6. Tasks
- A Task is the smallest unit of execution in Spark and represents a single unit of work that will be sent to an Executor.
- A task performs operations on a portion of data.
- Tasks are executed in parallel across multiple Executors, allowing for distributed processing.
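- Each task processes exactly one partition of data, so a stage has as many tasks as it has partitions. The two terms are not interchangeable: a partition is a chunk of data, while a task is the work performed on it. Multiple tasks can run in parallel on different worker nodes.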
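As a rough illustration of how tasks map onto executors, the sketch below estimates how many "waves" of tasks a stage needs. The function and parameter names are hypothetical, and it assumes each task occupies one core (Spark's default) and that all tasks take about the same time.

```python
import math

def task_waves(num_partitions, num_executors, cores_per_executor):
    """Number of rounds needed to run one task per partition."""
    slots = num_executors * cores_per_executor  # tasks that can run at the same time
    return math.ceil(num_partitions / slots)

print(task_waves(200, num_executors=5, cores_per_executor=4))  # 10
```

With 20 task slots, 200 partitions are processed in 10 rounds; 201 partitions would need an 11th round for a single task.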
Distribution and Processing Methods in Spark
Distribution
Spark distributes data across a cluster using Resilient Distributed Datasets (RDDs), which are partitioned across multiple nodes. Each partition of an RDD can be processed independently on different nodes, enabling parallel and efficient processing.
Processing
Spark utilizes Directed Acyclic Graphs (DAGs) for data processing. When transformations are applied to an RDD, Spark builds a DAG that represents the sequence of operations to be executed. This DAG allows Spark to:
- Optimize the physical execution plan
- Execute operations more efficiently
- Minimize unnecessary computations
Memory Types in Spark:
- JVM Heap Memory:
- This is the memory allocated to the Spark application running on the Java Virtual Machine (JVM). It is split into 3 main components: Spark Memory, User Memory, and Reserved memory.
- Overhead Memory:
- Overhead memory is additional memory used for tasks outside of heap memory, such as network buffers and other JVM-level operations.
- The overhead memory is typically 10% of the executor memory or 384 MB, whichever is larger.
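The overhead rule above is simple arithmetic. A minimal sketch, with a hypothetical helper name and memory expressed in MB:

```python
def executor_overhead_mb(executor_memory_mb, factor=0.10, minimum_mb=384):
    """Overhead is 10% of executor memory, but never less than 384 MB."""
    return max(int(executor_memory_mb * factor), minimum_mb)

print(executor_overhead_mb(2048))  # 384 (10% would be only 204 MB)
print(executor_overhead_mb(8192))  # 819
```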
JVM Heap Memory Breakdown:
- Reserved Memory:
- Reserved for Spark internal operations, typically around 300 MB.
- This memory is used for basic Spark infrastructure and is not available for general execution or storage tasks.
- Spark Memory (by default 60% of the heap left after reserved memory, set by spark.memory.fraction):
- Divided into execution memory and storage memory pools. Spark dynamically adjusts the boundary between the two pools based on task demands, allowing for more efficient use of resources.
- Execution Memory Pool:
- Used for computation such as joins, aggregations, and shuffles. It holds intermediate data during transformations.
- Storage Memory Pool:
- Used to cache and persist data (RDDs, DataFrames, Datasets) in memory.
- User Memory (the remaining 40%):
- Used for operations not managed by Spark, such as:
- User-defined functions (UDFs)
- Broadcast variables
- Objects created by the user that aren’t part of Spark’s internal memory management.
- Also handles RDD conversion operations that are not optimized by Spark.
Additional Notes:
- If the executor memory is less than 1.5 times the reserved memory, Spark will fail with an error: "Please use a larger heap size".
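The heap breakdown can be worked through numerically. The sketch below is an approximation, not Spark's own code: `heap_breakdown` is a hypothetical helper, it assumes the 60/40 split applies to the heap that remains after the 300 MB reserve (the behavior of spark.memory.fraction), and it assumes the storage pool initially gets half of Spark memory (the default of spark.memory.storageFraction, which the text above does not mention).

```python
RESERVED_MB = 300  # reserved memory, as described above

def heap_breakdown(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Split an executor heap (in MB) into the regions described above."""
    if heap_mb < 1.5 * RESERVED_MB:
        raise ValueError("Please use a larger heap size")
    usable = heap_mb - RESERVED_MB
    spark_memory = round(usable * memory_fraction)
    storage = round(spark_memory * storage_fraction)
    return {
        "reserved": RESERVED_MB,
        "spark": spark_memory,
        "storage": storage,
        "execution": spark_memory - storage,
        "user": usable - spark_memory,
    }

print(heap_breakdown(4300))
```

For a 4300 MB heap this gives 2400 MB of Spark memory (1200 MB storage, 1200 MB execution) and 1600 MB of user memory. A 400 MB heap raises the error because it is below 1.5 × 300 MB.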
Key Concepts
RDD (Resilient Distributed Dataset):
RDDs provide low-level APIs in Scala, Java, and Python for manipulating distributed data, offering fine-grained control over transformations and actions. They can be created from external data sources (e.g., files, databases) or by transforming existing RDDs using operations like map and filter.
Disadvantages of RDDs:
- Manual Optimization: Each RDD requires manual optimization, which can be time-consuming and complex.
- Lack of Schema Inference: Unlike Datasets and DataFrames, RDDs do not automatically infer the schema of ingested data, necessitating manual specification.
- No R Support: The RDD API is not available in R.
From an existing collection (parallelize):
from pyspark import SparkContext
sc = SparkContext("local", "Example App")
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
From an external file (textFile):
rdd = sc.textFile("path/to/your/file.txt")
RDD ➝ DataFrame:
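# toDF() needs an active SparkSession and an RDD of tuples (or Rows), one element per column
df = sc.parallelize([(1, "a"), (2, "b")]).toDF(["col1", "col2"])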
DataFrame:
A DataFrame is a distributed collection of data organized into named columns, similar to a table in a relational database. It is an abstraction built on top of Resilient Distributed Datasets (RDDs) in Apache Spark, making it suitable for large-scale data processing. DataFrames are optimized for performance and can handle both structured and semi-structured data. They also provide a domain-specific language (DSL) API to manipulate distributed data. (The DSL API is like a specialized "language" that makes it easier for you to query and process large datasets across multiple computers, especially in tools like Spark, Flink, etc.)
A DataFrame can be created from various external sources like CSV files, Parquet, JSON, JDBC, or from existing RDDs or other DataFrames. It is well-suited for performing SQL queries and data manipulation tasks such as filtering, grouping, joining, and aggregating data.
One disadvantage of DataFrames is the lack of compile-time type safety. Since DataFrames in Spark are dynamically typed, errors related to data types might not be caught until runtime, which could lead to runtime exceptions if there is a mismatch in expected data types.
From a collection (using createDataFrame):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Example App").getOrCreate()
data = [("Alice", 25), ("Bob", 30), ("Cathy", 28)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
df.show()
From a CSV file (using read.csv):
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df.show()
DataFrame ➝ RDD:
rdd = df.rdd
Dataset:
A Dataset is an abstraction in Apache Spark that is like a DataFrame. It is a distributed collection of data organized into named columns. A Dataset is strongly-typed, meaning it provides compile-time type safety and offers the benefits of both RDDs (Resilient Distributed Datasets) and DataFrames.
A Dataset can be created from various external data sources (such as CSV, JSON, Parquet, etc.), or from an existing RDD or DataFrame.
One disadvantage of Datasets is that they are only supported in Java and Scala, as Python does not support compile-time type safety. Because of this, Datasets are not available in Python.
Clusters:
In Databricks, Spark clusters can generally be categorized into interactive clusters and job clusters.
Interactive Clusters:
- High concurrency: Optimized for resource allocation and management across multiple notebooks or jobs. It's suitable for scenarios where multiple users or applications share the same cluster resources.
- Standard: A balance between performance and resource utilization that's used for general-purpose data processing and analytics tasks. It's suitable for a moderate level of concurrency.
- Single node: A single virtual machine (VM) node that's used for small-scale development, testing, or debugging. It's not suitable for large production workloads or scenarios that require parallel processing.
Job Clusters:
A job cluster is created when a job starts and is terminated automatically when the job completes. It is designed for scheduled or batch-oriented jobs in production. It cannot be restarted once terminated, and because it runs only for the duration of the job, it saves costs.
Garbage Collector in Spark
Garbage Collector (GC) is responsible for automatically cleaning up unused or unnecessary objects in memory to free up space. In Spark, GC helps manage memory efficiently by clearing out objects that are no longer needed during task execution.
Speculative Execution:
Speculative execution is a feature in Apache Spark that identifies tasks running slower than expected in a job. It launches a duplicate copy of the slow task on a different worker node. Spark will accept the result from whichever task (either the original or the duplicate) finishes first and will kill the other one, ensuring that the overall job completes faster.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("SpeculativeExecutionExample")
    .config("spark.speculation", "true")
    # Fraction of a stage's tasks that must finish before speculation starts (75%)
    .config("spark.speculation.quantile", "0.75")
    # A task counts as "slow" when it runs longer than 1.5x the median task duration
    .config("spark.speculation.multiplier", "1.5")
    .getOrCreate()
)
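To make the two settings concrete, here is a simplified pure-Python model of the decision. It is a sketch under assumptions, not Spark's scheduler: the function name and the data shapes are hypothetical, and real Spark applies further conditions (such as a minimum task runtime) that are omitted here.

```python
from statistics import median

def speculation_candidates(finished, running, total_tasks, quantile=0.75, multiplier=1.5):
    """finished: durations of completed tasks; running: {task_id: elapsed time so far}."""
    # Speculation is only considered once enough of the stage has completed.
    if len(finished) < quantile * total_tasks:
        return []
    threshold = multiplier * median(finished)
    return sorted(task for task, elapsed in running.items() if elapsed > threshold)

finished = [10, 11, 9, 10, 12, 10]          # seconds
running = {"task-6": 14, "task-7": 40}      # seconds elapsed so far
print(speculation_candidates(finished, running, total_tasks=8))  # ['task-7']
```

Six of eight tasks (75%) have finished with a median of 10 seconds, so any running task past 15 seconds becomes a candidate for a duplicate attempt.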
Types of Partitioning:
Hash Partitioning:
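Distributes rows across partitions by applying a hash function to one or more columns, so rows with the same key always land in the same partition. The distribution is even only when the key values themselves are well spread.
df = df.repartition(4, "gender")  # hash-partition the DataFrame into 4 partitions by gender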
Range Partitioning:
Range partitioning in PySpark divides data into partitions based on ranges of a column's values, which is useful for querying specific ranges of values within each partition.
df = df.repartitionByRange(4, "age")  # each of the 4 partitions holds a contiguous range of ages
List Partitioning:
List partitioning in PySpark divides data based on specific values in a designated column, suitable for partitioning by discrete categories or values.
df.write.partitionBy("department").mode("overwrite").parquet("/path/to/output")
Custom Partitioning:
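Allows defining custom partitioning logic using an expression. Because partitionBy() accepts only column names, compute the expression as a column first.
# age_group is a hypothetical derived column; F is pyspark.sql.functions
df.withColumn("age_group", F.floor(F.col("age") / 10)).write.partitionBy("age_group").mode("overwrite").parquet("/path/to/output")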
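The difference between hash and range partitioning comes down to how a row's partition number is chosen. The sketch below illustrates both rules in plain Python. It is not Spark's implementation: `crc32` stands in for Spark's internal hash function, and the boundary values are made up for the example.

```python
from bisect import bisect_left
from zlib import crc32

def hash_partition(key, num_partitions):
    # crc32 is a stand-in for Spark's hash; equal keys always map to the same partition.
    return crc32(str(key).encode("utf-8")) % num_partitions

def range_partition(value, upper_bounds):
    # upper_bounds: sorted, inclusive upper limits of every partition except the last.
    return bisect_left(upper_bounds, value)

ages = [18, 25, 31, 47, 52, 70]
bounds = [30, 50]  # partition 0: <= 30, partition 1: 31..50, partition 2: > 50
print([range_partition(a, bounds) for a in ages])  # [0, 0, 1, 1, 2, 2]
print(hash_partition("F", 4) == hash_partition("F", 4))  # True
```

Range partitioning keeps neighboring values together, which helps range filters; hash partitioning only guarantees that equal keys meet in the same partition.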
Partition Functions:
repartition(numPartitions, cols):
Repartitioning helps distribute unevenly distributed data more evenly across partitions, enabling better parallel processing. Repartitioning is used to increase or decrease the number of partitions, and it also involves shuffling the data as part of the redistribution process. Shuffling is computationally expensive, so it’s better to avoid it when not necessary.
For instance, when reading large files into Spark, repartitioning the data will distribute it evenly across the defined number of partitions.
Parameters:
- numPartitions: Specifies the desired number of partitions for the DataFrame.
- *cols: Optional parameter to specify one or more columns to partition by. If provided, the data will be partitioned based on the values in these columns.
Syntax: DataFrame.repartition(numPartitions, *cols)
coalesce(numPartitions):
Coalescing reduces the number of partitions in the DataFrame by merging existing partitions. Unlike repartition(), coalesce() does not perform a full shuffle of the data, which can lead to better performance for reducing the number of partitions. However, it does not provide control over the distribution of data across partitions.
Parameters:
- numPartitions: Specifies the desired number of partitions for the DataFrame after coalescing.
Syntax: DataFrame.coalesce(numPartitions)
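The trade-off between the two functions can be seen in a toy model where partitions are plain Python lists. This is a conceptual sketch only: the grouping and round-robin rules below are simplifications chosen for the example, not the algorithms Spark uses.

```python
def coalesce(partitions, n):
    """Merge whole partitions into n groups; no record is moved individually."""
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i * n // len(partitions)].extend(part)
    return merged

def repartition(partitions, n):
    """Full shuffle: every record is redistributed across n new partitions."""
    records = [r for part in partitions for r in part]
    return [records[i::n] for i in range(n)]

parts = [[1, 2, 3, 4, 5, 6], [7], [8], [9]]
print([len(p) for p in coalesce(parts, 2)])     # [7, 2]
print([len(p) for p in repartition(parts, 2)])  # [5, 4]
```

Coalescing is cheap but inherits the existing imbalance, while repartitioning pays for a shuffle and ends up with evenly sized partitions.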
getNumPartitions():
In Apache Spark, getNumPartitions() is an RDD method that returns the number of partitions. A PySpark DataFrame does not expose it directly, so call it on the underlying RDD: df.rdd.getNumPartitions().
RDD Usage
Accumulator:
In Spark, an accumulator is a shared variable used to aggregate values, such as sums or counts, from tasks running on worker nodes back to the driver program. Accumulators provide a way for tasks to incrementally update a shared variable safely during distributed computations. Tasks can only add to an accumulator; only the driver can read its value.
Sum:
data = [(1,), (2,), (3,), (4,), (5,)]
df = spark.createDataFrame(data, ["Number"])
# Create an accumulator for sum
sum_acc = spark.sparkContext.accumulator(0)
# Define a function to add numbers to the accumulator
def add_to_sum(row):
    global sum_acc
    sum_acc += row["Number"]
df.rdd.foreach(add_to_sum)
# Print the total sum
print(f"Total Sum: {sum_acc.value}")
Total Sum: 15
Count of Empty:
# Each row must be a tuple, so single values need a trailing comma
data = [("This is a sentence.",),  # Not empty
        (None,),  # Empty
        ("Another sentence here.",),  # Not empty
        ("",),  # Empty (empty string)
        ("Third sentence.",),  # Not empty
        (None,)]  # Empty
df = spark.createDataFrame(data, ["Sentence"])
# Create an accumulator for counting empty lines
empty_line_acc = spark.sparkContext.accumulator(0)
# Define a function to check and count empty lines
def count_empty(row):
    global empty_line_acc
    if row["Sentence"] is None or row["Sentence"] == "":
        empty_line_acc += 1
# Apply the function using RDD
df.rdd.foreach(count_empty)
print(f"Total Empty Lines: {empty_line_acc.value}")
Total Empty Lines: 3
Performance Optimization Techniques:
Predicate Pushdown:
When working with large datasets, applying filters directly in the data source using a WHERE clause in the source query helps prevent Spark from loading unnecessary records.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost/test")
    .option("dbtable", "sales")
    .option("user", "username")
    .option("password", "password")
    .load()
)
# Predicate Pushdown: Spark sends this filter to the database as a WHERE clause
df = df.filter("date > '2023-01-01'")
Project Pushdown:
Retrieving only the required columns instead of using SELECT * helps reduce resource consumption, avoid unnecessary data transfer, and improve performance.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost/test")
    .option("dbtable", "sales")
    .option("user", "username")
    .option("password", "password")
    .load()
)
# Project Pushdown: only these two columns are requested from the database
df = df.select("sales_amount", "transaction_date")
Broadcast Join:
When one dataset is significantly smaller than the other, broadcasting the smaller dataset across all nodes reduces shuffling and improves join performance. Spark automatically uses a broadcast join when the estimated size of one side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default). With AQE enabled, it can also switch to a broadcast join at runtime once the actual sizes are known.
For example, when joining a large sales dataset with a small product details table, broadcasting the product table to all nodes ensures that each node has access to the product information without shuffling the large sales dataset, speeding up the join.
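from pyspark.sql.functions import broadcast
# header=true so that column names such as product_id are read from the files
large_df = spark.read.format("csv").option("header", "true").load("large_sales_data.csv")
small_df = spark.read.format("csv").option("header", "true").load("product_details.csv")
# Ship the small table to every executor so the large one is not shuffled
joined_df = large_df.join(broadcast(small_df), "product_id")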
Broadcast Variable:
A broadcast variable is used to distribute a variable across all nodes, reducing I/O operations and enhancing performance.
For example, if you're processing large datasets but need a small reference dataset, like a mapping of state codes to state names, broadcasting the reference dataset ensures faster computation without sending it repeatedly.
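reference_data = {"CA": "California", "NY": "New York"}  # example mapping of state codes to names
broadcast_var = sc.broadcast(reference_data)  # Broadcasting a variable; read it in tasks via broadcast_var.value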
Repartition:
Repartitioning is used to increase or decrease the number of partitions by redistributing data more evenly across them. This process also involves shuffling the data, which is computationally expensive. Therefore, it is better to avoid repartitioning when it is not necessary.
For instance, when reading large files into Spark, repartitioning the data will distribute it evenly across the defined number of partitions.
df = df.repartition(10) # Repartition the dataframe into 10 partitions
Bucketing:
Bucketing works based on hash keys. It distributes the data into a specified number of buckets by applying a hash function to one or more columns. The hash value determines which bucket a row is placed in. Each bucket is written as one or more files inside the table's directory, unlike partitionBy, which creates a separate folder per value.
Bucketing helps optimize joins and queries on the bucketed columns, as data with the same bucketed column value will be placed together in the same bucket, minimizing the need for shuffling during joins.
df.write.bucketBy(10, "customer_id").sortBy("customer_id").saveAsTable("customer_data")
Bucketing is especially helpful when you know you will be performing operations on a specific column frequently (such as joins).
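The reason bucketing helps joins can be shown with a small pure-Python model. It is a sketch under assumptions: `crc32` stands in for Spark's hash function, the bucket count and the sample rows are invented, and both tables are assumed to be bucketed on the same key with the same number of buckets.

```python
from zlib import crc32

NUM_BUCKETS = 4

def bucket_id(key, num_buckets=NUM_BUCKETS):
    # Stand-in for Spark's hash: the same key always yields the same bucket number.
    return crc32(str(key).encode("utf-8")) % num_buckets

def bucketize(rows):
    """Group (key, value) rows by the bucket of their key."""
    buckets = {b: [] for b in range(NUM_BUCKETS)}
    for row in rows:
        buckets[bucket_id(row[0])].append(row)
    return buckets

customers = [(101, "Alice"), (102, "Bob"), (103, "Cathy")]
orders = [(101, 25.0), (103, 12.5), (101, 8.0)]
customer_buckets = bucketize(customers)
order_buckets = bucketize(orders)

# Matching keys share a bucket number, so the join runs bucket by bucket
# without comparing rows from different buckets.
joined = [
    (cid, name, amount)
    for b in range(NUM_BUCKETS)
    for cid, name in customer_buckets[b]
    for oid, amount in order_buckets[b]
    if cid == oid
]
print(sorted(joined))  # [(101, 'Alice', 8.0), (101, 'Alice', 25.0), (103, 'Cathy', 12.5)]
```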
Optimize (OPTIMIZE):
The OPTIMIZE command is used to consolidate small files into larger files, commonly around 1 GB, although this may vary depending on the cluster configuration. This process reduces I/O operations and improves performance for Delta Lake tables. It is particularly useful when there are numerous small files, which can increase I/O overhead and degrade performance. The OPTIMIZE command effectively addresses these small file issues.
OPTIMIZE delta.`/path/to/delta/table`
GZIP and Snappy Compression:
GZIP provides high compression and is used for archival data, while Snappy offers lower compression compared to GZIP but enables faster data processing tasks.
Snappy compression speeds up read operations for real-time analytics, while GZIP is used for long-term storage of rarely accessed data to save on storage costs.
# Using Snappy compression for fast read/write operations
df.write.option("compression", "snappy").parquet("/path/to/output")
# Using GZIP compression for long-term storage
df.write.option("compression", "gzip").parquet("/path/to/output")
Vacuum (Delta Lake):
The VACUUM command is used to delete outdated and removed files, reclaiming storage space and enhancing performance. By default, it operates with a retention period of 168 hours (7 days). This retention period can be modified as per requirements.
For example, in a data lake with frequently updated transaction data, running VACUUM periodically ensures that deleted data does not clutter the storage, reducing overhead and improving query performance.
VACUUM delta.`/path/to/delta/table` RETAIN 168 HOURS
DataFrame Checkpoint:
DataFrame checkpointing materializes a DataFrame’s result, stores it on disk, and truncates the lineage, so further transformations do not need to recompute the earlier DataFrame stages. Checkpointing is especially beneficial when a particular DataFrame is reused across many iterations. It is useful in machine learning algorithms or iterative workflows where the transformation plan can grow exponentially, leading to large memory usage.
Example of Applying Checkpoint:
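spark.sparkContext.setCheckpointDir("/path/to/checkpoint/dir")  # must be set before the first checkpoint
df = df.checkpoint()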
Types of Checkpoints:
- Eager Checkpoint: Saves the checkpoint immediately when called. This is the default behavior, equivalent to .checkpoint(eager=True).
- Lazy Checkpoint: With .checkpoint(eager=False), the checkpoint is deferred until the DataFrame is actually used in an action, such as .collect() or .count().
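Although cache and checkpoint might seem similar, they serve different purposes:
- Cache: Stores the DataFrame in memory and retains the data lineage (dependencies for recomputation).
- Checkpoint: Writes the DataFrame to the checkpoint directory on disk and removes the data lineage.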
Cache vs. Persist:
Cache and persist both store intermediate results (in memory, on disk, or both) to speed up subsequent operations. Both methods help reuse the results of expensive computations without recalculating them.
Cache:
The default storage level is MEMORY_AND_DISK. It stores the data in memory and, if memory is insufficient, spills it to disk.
df.cache()
Persist:
The persist() method is more flexible than cache(), allowing you to specify different storage levels for storing data.
- MEMORY_ONLY: Stores data only in memory.
- MEMORY_AND_DISK: Stores data in memory, and spills to disk if there’s insufficient memory.
- DISK_ONLY: Stores data only on disk.
- MEMORY_ONLY_SER: Stores data in memory in a serialized format (space-efficient).
- MEMORY_AND_DISK_SER: Stores data in a serialized format in memory and spills to disk when needed.
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_ONLY)
Note: cache() is simply persist() with the default storage level. In both cases the data stays available until you call unpersist() or the Spark application ends, and it is not shared with other applications. As a best practice, call unpersist() once you are done with the data to free up resources.
df.unpersist()
Serialized or Serialization:
Serialization is the process of transforming an object into a binary format, which takes up less memory and is more efficient for storage and transmission.
PartitionBy:
The partitionBy() method of the DataFrame writer (PARTITIONED BY in SQL) splits the output into partitions based on specific columns, improving query performance by limiting the data scanned during query execution.
For example, partitioning customer data by region ensures that only the relevant partition is read when querying for customers in a specific region, reducing I/O and speeding up query performance.
df.write.partitionBy("region").parquet("/path/to/output")
Salting:
Salting is a technique used to distribute data more evenly across partitions. It involves adding a 'salt' value to the key column to create a unique key for the same data, ensuring that the data is spread across multiple partitions or processing nodes. By creating unique keys, salting prevents excessive clustering of data under a single key or partition, reducing processing time and improving parallelism.
Code Example:
from pyspark.sql import functions as F
# Original DataFrame with CustomerID
df = spark.table("customer_purchases")
# Adding a random salt column to balance partitioning
salted_df = df.withColumn("salt", F.expr("floor(rand() * 10)")) # Adding a salt range from 0 to 9
# Creating a salted key for partitioning
salted_df = salted_df.withColumn("salted_key", F.concat(F.col("CustomerID"), F.col("salt")))
# Now use the salted_key for partitioning or joining to reduce data skew
Use Case:
Especially useful for large distributed joins and aggregations on skewed columns.
Pros:
- Distributes data evenly across partitions, preventing data skew.
- Helps balance the processing load in operations on distributed systems.
Cons:
- Adds complexity because the salted keys need to be "unsalted" (e.g., aggregated back to the original keys) in some queries.
- Increases the data size slightly due to the extra "salt" column.
Unsalt:
# Strip the trailing salt digit; this works because the salt is a single digit (0 to 9)
unsalted_df = salted_df.withColumn("unsalted_key", F.expr("substring(salted_key, 1, length(salted_key) - 1)"))
# Drop the 'salt' and 'salted_key' columns if no longer needed
unsalted_df = unsalted_df.drop("salt", "salted_key")
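Salting pays off in a two-stage aggregation: first aggregate on the salted key so that a hot key is spread over several groups, then drop the salt and combine the partial results. The pure-Python sketch below models that flow without Spark. The function name, the sample rows, and the fixed random seed are assumptions made for the example.

```python
import random
from collections import Counter

NUM_SALTS = 10  # salt values 0 to 9, as in the example above

def salted_sum(rows, seed=0):
    rng = random.Random(seed)
    # Stage 1: aggregate per (key, salt); a hot key is split across up to NUM_SALTS groups.
    partial = Counter()
    for key, amount in rows:
        partial[(key, rng.randrange(NUM_SALTS))] += amount
    # Stage 2: "unsalt" by dropping the salt and combining the partial sums per original key.
    total = Counter()
    for (key, _salt), subtotal in partial.items():
        total[key] += subtotal
    return dict(total), len(partial)

rows = [("C1", 1)] * 1000 + [("C2", 5)] * 3  # C1 is the skewed key
totals, groups = salted_sum(rows)
print(totals)  # {'C1': 1000, 'C2': 15}
```

The totals match an unsalted aggregation, but the 1000 rows of C1 were processed as several smaller groups instead of one large one.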
Z-Ordering (Delta Lake):
Z-Ordering is an advanced optimization technique that builds upon the OPTIMIZE command. The OPTIMIZE command consolidates small files into larger ones (typically around 1 GB) to reduce I/O overhead caused by numerous small files, but on its own it does not reorder the data. Rows with similar values can therefore remain scattered across many files, so a selective query still has to read most of them.
To overcome this, Z-Order is used. Z-Ordering co-locates rows with similar values of the specified columns in the same files as part of the optimization process. Delta Lake can then use file-level statistics to skip files that cannot match a filter, which reduces I/O and improves performance for queries that involve range filters or aggregations on the specified columns.
The OPTIMIZE command can be followed by ZORDER BY to ensure data is not only optimized but also sorted for efficient query execution:
OPTIMIZE delta.`/path/to/customer_data` ZORDER BY (customer_id)
Note: The SORT BY command is used to temporarily sort data during transformations or at the DataFrame level. Z-Order is used to permanently sort data at the file level, optimizing it for query performance in Delta Lake tables.
Data Skew Hint:
This is a hint, offered by Databricks rather than open-source Apache Spark, indicating that a relation (optionally narrowed to specific columns) contains skewed data. It helps the optimizer adjust its query plan and processing strategy to handle the skewed data more effectively.
Where the hint is not available, techniques such as salting and bucketing can be used to manage data skew.
Syntax:
SELECT /*+ SKEW('table1', 'join_key') */ * FROM table1 JOIN table2 ON table1.join_key = table2.join_key
Sort-Merge Join (equi-joins):
Sort merge join is particularly useful for processing large datasets. It operates by sorting the join keys from both tables and then merging the data to perform equi-joins efficiently.
result_df = df1.join(df2, on="id", how="inner")
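By default, Spark prefers Sort-Merge Join for equi-joins that are too large to broadcast (spark.sql.join.preferSortMergeJoin defaults to true). By setting spark.sql.join.preferSortMergeJoin to false, we tell Spark to avoid Sort-Merge Join where possible. Sort-Merge Join only supports equi-joins (=).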
Spark automatically chooses Sort-Merge Join if the dataset sizes exceed the broadcast join threshold (default: 10 MB).
If you want to ensure the data is explicitly prepared for the Sort-Merge Join (e.g., for performance tuning or to avoid redundant processing), you can manually repartition and sort the datasets as follows:
df1_sorted = df1.repartition("key").sortWithinPartitions("key")
df2_sorted = df2.repartition("key").sortWithinPartitions("key")
# Perform the join
result = df1_sorted.join(df2_sorted, on="key", how="inner")
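The sort-then-merge idea can be illustrated on plain Python lists. This is a single-machine sketch of the algorithm for an inner equi-join, not Spark's implementation; in Spark the same steps run per partition after both sides have been shuffled by the join key.

```python
def sort_merge_join(left, right):
    """Inner equi-join of two lists of (key, value) pairs."""
    left = sorted(left, key=lambda r: r[0])
    right = sorted(right, key=lambda r: r[0])
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1          # left key has no partner yet, advance left
        elif lk > rk:
            j += 1          # right key has no partner yet, advance right
        else:
            # Find the run of right rows sharing this key, then pair it with each left row.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            while i < len(left) and left[i][0] == lk:
                out.extend((lk, left[i][1], right[k][1]) for k in range(j, j_end))
                i += 1
            j = j_end
    return out

left = [(2, "b"), (1, "a"), (2, "c")]
right = [(2, "x"), (3, "y"), (2, "z")]
print(sort_merge_join(left, right))
# [(2, 'b', 'x'), (2, 'b', 'z'), (2, 'c', 'x'), (2, 'c', 'z')]
```

Because both inputs are sorted, each side is scanned once, which is why the approach scales to large datasets.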
Shuffle Hash Join
Shuffle Hash Join is useful when Sort-Merge Join cannot be performed effectively due to expensive sorting. Shuffle Hash Join redistributes both datasets across partitions based on the hash value of the join key, then builds an in-memory hash table from the smaller side of each partition and probes it with the other side. Like Sort-Merge Join, it supports only equi-joins.
result_df = df1.join(df2.hint("shuffle_hash"), on="id", how="inner")
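The two phases of a shuffle hash join, shuffling by key hash and then building and probing a hash table per partition, can be sketched in plain Python. This is a conceptual model only: Python's built-in `hash` on integer keys stands in for Spark's hash function, and the sample rows are invented.

```python
from collections import defaultdict

def shuffle(rows, num_partitions):
    """Send every (key, value) row to the partition chosen by its key's hash."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in rows:
        parts[hash(key) % num_partitions].append((key, value))
    return parts

def shuffle_hash_join(small, large, num_partitions=3):
    out = []
    for small_part, large_part in zip(shuffle(small, num_partitions),
                                      shuffle(large, num_partitions)):
        # Build phase: hash table over the smaller side of this partition.
        table = defaultdict(list)
        for key, value in small_part:
            table[key].append(value)
        # Probe phase: look up each row of the larger side; no sorting is needed.
        for key, value in large_part:
            out.extend((key, value, match) for match in table.get(key, []))
    return out

products = [(1, "pen"), (2, "ink")]
sales = [(1, 3), (2, 7), (1, 4), (9, 1)]
print(sorted(shuffle_hash_join(products, sales)))
# [(1, 3, 'pen'), (1, 4, 'pen'), (2, 7, 'ink')]
```

The hash lookup only finds exact key matches, which is why this strategy is limited to equality conditions.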
Shuffle Partition:
Adjusting the number of shuffle partitions (spark.sql.shuffle.partitions, 200 by default) is important for operations like joins or aggregations, especially in large-scale data processing. Tuning it based on cluster size and workload minimizes shuffle overhead.
For example, adjusting shuffle partitions helps balance the data load across workers and reduces the time spent on shuffling data between nodes.
spark.conf.set("spark.sql.shuffle.partitions", "200") # Adjust shuffle partitions
AQE & Catalyst Optimizer:
Catalyst Optimizer: This is the primary query optimizer in Spark SQL. It performs a series of rule-based and cost-based optimizations on a query's execution plan before the query is executed. These optimizations include predicate pushdown, column pruning, and other logical plan optimizations.
Adaptive Query Execution (AQE): This optimization technique is applied during the execution phase of a query. It adapts and dynamically optimizes the query execution plan based on runtime statistics. For example, AQE can optimize join strategies, reduce shuffle partitions, and handle skewed data better, making the execution more efficient.
Both work together to ensure efficient query execution in Spark, with Catalyst handling static optimizations and AQE addressing runtime variations and data characteristics.
spark.conf.set("spark.sql.adaptive.enabled", "true")
Advanced Features
Managed and Unmanaged tables
In Spark (including Databricks), managed tables and unmanaged (external) tables differ in how data and metadata are stored and handled.
Managed Tables:
Spark manages both the data and metadata. When you drop the table, Spark deletes both the table and the data. The data is stored in Spark's default storage.
Example:
CREATE TABLE my_managed_table USING DELTA;
Unmanaged Tables:
Spark only manages the metadata (table structure), while the data is stored externally (e.g., in Azure Data Lake). Dropping the table only removes metadata, not the data. This is ideal for using data that is shared or needs to be stored in a custom location.
Example:
CREATE TABLE my_unmanaged_table USING DELTA LOCATION '/path/to/external/storage';
Change Data Feed (CDF)
Change Data Feed (CDF) is a Delta Lake feature that records row-level changes (inserts, updates, and deletes) between versions of a table, so those changes can be delivered to a target system for further processing, analysis, or storage. CDF is commonly used in data pipelines to handle real-time or incremental updates without having to reload the entire dataset.
Practical Benefits:
- Efficiency: Processes only relevant data, reducing resource usage.
- Flexibility: Supports batch and streaming workloads.
- Real-Time Updates: Facilitates near-real-time data synchronization with downstream systems.
Structured Streaming
Structured Streaming provides a powerful, high-level API for building scalable and fault-tolerant stream processing applications. It's designed to handle real-time data and supports various data sources and sinks, including Kafka, files, and many more. Structured Streaming benefits from Spark's optimizations, so you can treat real-time data like batch data and apply the same operations you would in a batch processing scenario. The built-in support for fault tolerance and checkpointing ensures data consistency, even in case of failures.
Delta Live Tables
Delta Live Tables (DLT) is a managed service that automates the creation and management of data pipelines, including data quality enforcement, transformation, and orchestration. It handles both data ingestion and transformations in a structured, declarative way, and it supports both batch and streaming data processing. Data quality rules (such as constraints, expectations, and error handling) are enforced directly within the pipeline, making it easier to ensure the correctness of the ingested data. This integrated approach to managing the entire pipeline makes DLT well suited to larger, more intricate data workflows. DLT also ensures the benefits of Delta Lake (such as ACID transactions and schema enforcement) are utilized throughout the pipeline.
Key Features:
- DLT automatically creates checkpoints, so a pipeline that fails can restart from the last saved point without losing progress, which lets you handle issues as they arise.
- You can trigger pipelines when new files are dropped into a designated location.
- You can define data quality rules, and DLT automatically applies and enforces them as data is ingested.
- DLT allows schema changes (e.g., adding or modifying columns) to be applied automatically as new data arrives, without breaking the pipeline.
- DLT optimizes your data pipelines and manages the compute clusters required to run them.
- You can integrate and process data from on-premises systems.
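The data quality rules mentioned above can be pictured with a plain-Python sketch. It does not use the DLT library: apply_expectation, its on_violation policies, and the sample orders are hypothetical names chosen for illustration. The three policies are loosely modeled on the warn, drop, and fail behaviors that DLT expectations offer.

```python
def apply_expectation(rows, name, predicate, on_violation="warn"):
    """Check one data quality rule and return (kept_rows, violation_count)."""
    if on_violation not in ("warn", "drop", "fail"):
        raise ValueError(f"unknown policy: {on_violation}")
    kept, violations = [], 0
    for row in rows:
        if predicate(row):
            kept.append(row)
            continue
        violations += 1
        if on_violation == "fail":
            raise ValueError(f"expectation '{name}' violated by {row}")
        if on_violation == "warn":
            kept.append(row)  # keep the row and only count the violation
        # "drop": the invalid row is discarded
    return kept, violations


def non_negative_amount(row):
    return row["amount"] >= 0


orders = [
    {"id": 1, "amount": 10.0},
    {"id": 2, "amount": -5.0},
    {"id": 3, "amount": 7.5},
]

warned, n_warn = apply_expectation(orders, "non_negative_amount", non_negative_amount, "warn")
dropped, n_drop = apply_expectation(orders, "non_negative_amount", non_negative_amount, "drop")
print(len(warned), n_warn)   # 3 1
print(len(dropped), n_drop)  # 2 1
```

Declaring the rule once and choosing a policy per rule is what keeps quality checks inside the pipeline definition rather than in separate validation jobs.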
Limitations:
- DLT only supports programming in Python and SQL for pipeline creation. Each DLT notebook should only use one programming language (either Python or SQL).
- DLT notebooks cannot be run interactively: you cannot execute cells one by one as you can in regular notebooks. Magic commands (like %sql or %fs) are not supported in DLT notebooks.
- DLT pipelines require careful configuration, such as cluster sizing and resource allocation. They are supported in premium workspaces with at least 20 cores, which may lead to higher costs when processing large volumes of data.
Unity Catalog
Unity Catalog is a centralized metadata repository for Databricks that provides robust data governance, lineage, and discovery capabilities. It is designed to manage data across multiple workspaces, users, and applications, offering a unified approach to data management.
Unity Catalog empowers administrators to manage access at various levels, including workspaces, schemas, and tables, while efficiently handling metadata such as file names, types, and authorship. Additionally, it enables organizations to enforce policies and rules that ensure proper data usage, delivering a scalable solution for consistency, control, and comprehensive governance across the Databricks ecosystem.
Unity Catalog Setup:
In a real-world scenario, an admin and a developer typically set up Unity Catalog together. The steps include:
- Creating a new user with Global Admin permissions.
- Adding the new user to the subscription with Owner permissions.
- Setting up Data Lake Storage (with a metastore container) and the Databricks service.
- Configuring the Access Connector for Azure Databricks.
- Granting the Access Connector for Azure Databricks the Storage Blob Data Contributor role on the Azure Data Lake Storage account.
Next Steps:
- Collect the Azure Data Lake container name and resource ID.
- In Azure Databricks, navigate to Manage Account > Data > Create Metastore, then assign the appropriate workspaces.
Once created, you will see four default catalogs: hive_metastore, main, samples, and system. Additionally, you can create your own catalog.
Additional Information:
- A catalog can be made available to multiple Databricks workspaces through the shared metastore, allowing resource sharing between workspaces.
- A catalog provides a layer of abstraction and control, enabling administrators to manage and share data resources across multiple workspaces.
Auto Loader
Auto Loader automates the detection and processing of new files, reducing the need for manual intervention. It can process both the files that already exist in a directory and new files as they arrive. To ensure that each file is processed exactly once, it stores file metadata in a scalable key-value store (RocksDB) at the checkpoint location. It supports both streaming and batch workloads. However, data quality enforcement (such as constraints and rules) needs to be handled separately. It is suitable for simple scenarios where you need to process new files as they arrive.
RocksDB is an embedded, high-performance, key-value store designed for fast storage, especially on flash drives and SSDs. It was developed by Facebook as a fork of LevelDB, enhancing it with advanced features and optimizations for high-speed, low-latency workloads.
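The file-tracking idea can be sketched in plain Python. This is a conceptual model rather than the Auto Loader API: discover_new_files, the JSON state file (standing in for the RocksDB key-value store), and the file names are all hypothetical. Each call lists the input directory, returns only the files not yet recorded, and records them so they are not picked up again.

```python
import json
import os
import tempfile


def discover_new_files(input_dir, state_path):
    """Return files in `input_dir` that are not yet in the state store, then record them."""
    seen = {}
    if os.path.exists(state_path):
        with open(state_path) as f:
            seen = json.load(f)  # key: file name, value: file size in bytes
    new_files = []
    for name in sorted(os.listdir(input_dir)):
        if name not in seen:
            new_files.append(name)
            seen[name] = os.path.getsize(os.path.join(input_dir, name))
    with open(state_path, "w") as f:
        json.dump(seen, f)  # persist the state so a restart does not reprocess files
    return new_files


def write_file(directory, name, text):
    with open(os.path.join(directory, name), "w") as f:
        f.write(text)


workdir = tempfile.mkdtemp()
input_dir = os.path.join(workdir, "landing")
os.makedirs(input_dir)
state_path = os.path.join(workdir, "seen_files.json")

write_file(input_dir, "a.csv", "id,name\n1,Ann\n")
write_file(input_dir, "b.csv", "id,name\n2,Bob\n")
first = discover_new_files(input_dir, state_path)   # existing files are picked up

write_file(input_dir, "c.csv", "id,name\n3,Cy\n")
second = discover_new_files(input_dir, state_path)  # only the new arrival
third = discover_new_files(input_dir, state_path)   # nothing left to process

print(first, second, third)  # ['a.csv', 'b.csv'] ['c.csv'] []
```

Keeping the state next to the checkpoint, outside the job's memory, is what allows a restarted job to skip files it has already handled.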
Supported Formats:
- JSON
- CSV
- Parquet
- Avro
- ORC
- Text
- Binary files
Advantages & Disadvantages:
Advantages:
- In-memory Processing: Spark processes data in-memory, which makes it significantly faster than Hadoop's disk-based processing model. This makes Spark more suitable for iterative algorithms, interactive analytics, and real-time processing.
- Unified Framework: Spark provides a unified framework for batch processing, interactive queries, streaming, and machine learning, whereas Hadoop's MapReduce is primarily designed for batch processing.
- Expressive API: Spark offers a more expressive API compared to Hadoop's MapReduce, making it easier to write complex data processing pipelines. Spark also provides high-level libraries for SQL, streaming, machine learning, and graph processing.
- Fault Tolerance: Spark provides built-in fault tolerance through RDDs, allowing it to recover from failures automatically without needing to write custom recovery logic.
Disadvantages:
- Spark's in-memory processing is memory-intensive, so clusters need a large amount of RAM to run it well.
- Serialization (converting objects into a byte stream so they can be sent over the network or written to disk) can be a performance bottleneck in Spark.
- Spark SQL does not support UPDATE statements with subqueries.