Slowly Changing Dimension (SCD) Type 2 operations on Delta tables in Spark
Implementing Slowly Changing Dimension (SCD) Type 2 operations with Delta tables in Apache Spark involves handling historical data changes and maintaining a full history of records. Delta Lake, an open-source storage layer that brings ACID transactions to Apache Spark, provides features to manage SCD Type 2 operations effectively. Here’s how you can implement SCD Type 2 with Delta tables:
Prerequisites
- Install Delta Lake: Make sure you have Delta Lake integrated with your Spark environment.
- Delta Table Setup: Your Delta table should include columns to manage SCD Type 2, such as `valid_from`, `valid_to`, and `current_flag`.
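For the open-source distribution, Delta Lake is installed as the `delta-spark` package (`pip install delta-spark`) and the Spark session is configured with Delta's SQL extension and catalog. A minimal local-mode sketch (this assumes open-source Delta Lake rather than a managed platform such as Databricks, where the session comes pre-configured):

```python
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# Enable Delta Lake's SQL extension and catalog on a local session
builder = SparkSession.builder \
    .appName("delta-setup") \
    .config("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# configure_spark_with_delta_pip adds the delta-spark JARs to the session
spark = configure_spark_with_delta_pip(builder).getOrCreate()
```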
1. Schema Design for SCD Type 2
Design your Delta table schema to include the following columns:
- Business Key: A unique identifier for the dimension (e.g., `id`).
- Attribute Columns: The attributes you want to track (e.g., `name`, `address`).
- Effective Date Columns: Columns to manage validity (e.g., `valid_from`, `valid_to`).
- Current Flag: A flag to identify the current record (e.g., `current_flag`).
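Before writing any Spark code, it helps to see what this schema produces. Here is a plain-Python sketch (no Spark required; the dates are made up for illustration) of one business key's history after a single address change:

```python
from datetime import date

# Two versions of business key "1": the superseded row is closed out
# (valid_to set, current_flag False); the current row stays open-ended.
history = [
    {"id": "1", "name": "John Doe", "address": "123 Elm St",
     "valid_from": date(2023, 1, 1), "valid_to": date(2024, 6, 1),
     "current_flag": False},
    {"id": "1", "name": "John Doe", "address": "789 Pine St",
     "valid_from": date(2024, 6, 1), "valid_to": None,
     "current_flag": True},
]

# The current version is the single row with an open-ended valid_to
current = [r for r in history if r["current_flag"]]
```

Every change appends a new row rather than overwriting the old one, which is what preserves the full history.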
```python
# Define the schema for the Delta table
schema = """
    id STRING,
    name STRING,
    address STRING,
    valid_from DATE,
    valid_to DATE,
    current_flag BOOLEAN
"""
```
2. Create Delta Table
Create the initial Delta table with the appropriate schema:

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SCD Type 2 Example") \
    .getOrCreate()

# Create the initial Delta table
spark.sql("""
    CREATE TABLE IF NOT EXISTS delta_table (
        id STRING,
        name STRING,
        address STRING,
        valid_from DATE,
        valid_to DATE,
        current_flag BOOLEAN
    )
    USING DELTA
""")
```
3. Merge Operation for SCD Type 2
To handle SCD Type 2 changes, use the Delta Lake `merge` operation. It lets you update existing records, insert new records, and manage historical data in a single atomic transaction.
Sample DataFrames
Assume you have two DataFrames: `new_data` (the incoming data) and `delta_table` (the existing Delta table).
```python
# Sample incoming data
new_data = spark.createDataFrame([
    ("1", "John Doe", "123 Elm St"),
    ("2", "Jane Smith", "456 Oak St")
], ["id", "name", "address"])

# Load the Delta table created above
# (use DeltaTable.forPath for path-based tables)
delta_table = DeltaTable.forName(spark, "delta_table")
```
Merge Operation
- Perform the Merge: Close out changed records and insert new ones. A changed record needs two actions in one pass: its old row is closed out, and a fresh row is inserted as the new current version. The standard pattern stages each changed row twice, once with a NULL merge key so it always falls through to the insert clause:

```python
# Incoming rows whose tracked attributes differ from the current version
changed_rows = new_data.alias("updates") \
    .join(delta_table.toDF().alias("current"), "id") \
    .where("current.current_flag = true AND "
           "(updates.name <> current.name OR updates.address <> current.address)") \
    .selectExpr("updates.id", "updates.name", "updates.address")

# Stage changed rows under a NULL merge key (always inserted) alongside
# all incoming rows keyed on id (close out matches, insert brand-new keys)
staged_updates = changed_rows.selectExpr("CAST(NULL AS STRING) AS merge_key", "*") \
    .union(new_data.selectExpr("id AS merge_key", "*"))

delta_table.alias("old_data").merge(
    source=staged_updates.alias("new_data"),
    condition="old_data.id = new_data.merge_key AND old_data.current_flag = true"
).whenMatchedUpdate(
    # Close out the old version of a changed record
    condition="old_data.name <> new_data.name OR old_data.address <> new_data.address",
    set={
        "valid_to": "current_date()",
        "current_flag": "false"
    }
).whenNotMatchedInsert(
    values={
        "id": "new_data.id",
        "name": "new_data.name",
        "address": "new_data.address",
        "valid_from": "current_date()",
        "valid_to": "CAST(NULL AS DATE)",
        "current_flag": "true"
    }
).execute()
```
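The merge semantics can be reasoned about without a cluster. Here is a pure-Python sketch of the same logic over lists of dicts (the `scd2_merge` helper is illustrative only, not part of any library):

```python
from datetime import date

def scd2_merge(table, incoming, today):
    """Illustrative SCD2 merge: close out changed current rows, append
    their new versions, and insert brand-new business keys."""
    out, by_id, seen = [], {r["id"]: r for r in incoming}, set()
    for row in table:
        new = by_id.get(row["id"])
        changed = (row["current_flag"] and new is not None and
                   (row["name"], row["address"]) != (new["name"], new["address"]))
        if changed:
            # Close out the old version and append the new current version
            out.append({**row, "valid_to": today, "current_flag": False})
            out.append({**new, "valid_from": today, "valid_to": None,
                        "current_flag": True})
        else:
            out.append(dict(row))
        if row["current_flag"] and new is not None:
            seen.add(row["id"])
    # Brand-new business keys are inserted as current rows
    for r in incoming:
        if r["id"] not in seen:
            out.append({**r, "valid_from": today, "valid_to": None,
                        "current_flag": True})
    return out

existing = [{"id": "1", "name": "John Doe", "address": "123 Elm St",
             "valid_from": date(2023, 1, 1), "valid_to": None,
             "current_flag": True}]
incoming = [{"id": "1", "name": "John Doe", "address": "789 Pine St"},
            {"id": "2", "name": "Jane Smith", "address": "456 Oak St"}]
result = scd2_merge(existing, incoming, date(2024, 6, 1))
```

After the merge, key `1` has a closed row and a new current row, and key `2` appears once as a current row.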
Close Out Old Records: The merge already sets `valid_to` to the current date and `current_flag` to `false` for superseded rows. If you ever need to expire current records outside of a merge (for example, for keys deleted from the source), `DeltaTable.update` can do it directly:

```python
from pyspark.sql.functions import col, current_date, lit

# Expire the current rows for a known set of business keys
ids_to_close = ["1", "2"]
delta_table.update(
    condition=col("current_flag") & col("id").isin(ids_to_close),
    set={
        "valid_to": current_date(),
        "current_flag": lit(False)
    }
)
```
4. Verify and Optimize
After performing the merge, verify the results by querying the Delta table:

```python
# Verify the results
df = spark.table("delta_table")
df.orderBy("id", "valid_from").show(truncate=False)
```
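A useful sanity check on SCD Type 2 output is that every business key has at most one open (current) row and every closed row has an end date. Sketched in plain Python, where the `rows` list stands in for `df.collect()`:

```python
from collections import Counter

# Stand-in for df.collect(): one closed and one current row for key 1,
# one current row for key 2
rows = [
    {"id": "1", "valid_to": "2024-06-01", "current_flag": False},
    {"id": "1", "valid_to": None, "current_flag": True},
    {"id": "2", "valid_to": None, "current_flag": True},
]

# Invariant 1: exactly one current row per business key
open_counts = Counter(r["id"] for r in rows if r["current_flag"])
assert all(n == 1 for n in open_counts.values()), "duplicate current rows"

# Invariant 2: every closed row has an end date
assert all(r["valid_to"] is not None
           for r in rows if not r["current_flag"]), "closed row missing valid_to"
```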
Optimize the Delta table to compact small files and improve query performance (available in Delta Lake 2.0 and later):

```python
# Compact small files
delta_table.optimize().executeCompaction()
```
Summary
- Schema Design: Include `valid_from`, `valid_to`, and `current_flag`.
- Create Delta Table: Use Delta Lake to create the table.
- Merge Operation: Use Delta Lake's `merge` operation to handle SCD Type 2 logic.
- Update Old Records: Manage historical data effectively.
- Verify and Optimize: Check the results and optimize for performance.
By following these steps, you can efficiently manage SCD Type 2 operations in Delta tables with Apache Spark.