Slowly Changing Dimension (SCD) Type 2 operations on Delta tables in Spark

Pinjari Akbar
3 min read · Jul 27, 2024


Implementing Slowly Changing Dimension (SCD) Type 2 with Delta tables in Apache Spark means tracking attribute changes over time while preserving the full history of every record. Delta Lake, an open-source storage layer that brings ACID transactions to Apache Spark, provides the features needed to manage SCD Type 2 effectively. Here’s how to implement it:

Prerequisites

  1. Install Delta Lake: Make sure Delta Lake is integrated with your Spark environment (see the setup sketch after this list).
  2. Delta Table Setup: Your Delta table should include columns to manage SCD Type 2, such as valid_from, valid_to, and current_flag.
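
If you run Spark outside a platform that bundles Delta Lake (such as Databricks), a minimal setup sketch using the delta-spark pip package looks like this; the app name is just a placeholder:

# pip install delta-spark
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# Register the Delta SQL extension and catalog so that "USING DELTA"
# tables and the DeltaTable API work in this session.
builder = (
    SparkSession.builder
    .appName("SCD Type 2 Example")  # placeholder app name
    .config("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaSparkSessionCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()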

1. Schema Design for SCD Type 2

Design your Delta table schema to include the following columns:

  • Business Key: A unique identifier for the dimension (e.g., id).
  • Attribute Columns: The attributes you want to track (e.g., name, address).
  • Effective Date Columns: Columns to manage validity (e.g., valid_from, valid_to).
  • Current Flag: A flag to identify the current record (e.g., current_flag).

In DDL form:

from pyspark.sql.functions import col, current_date, lit

# DDL-style schema with the SCD Type 2 bookkeeping columns
schema = """
    id STRING,
    name STRING,
    address STRING,
    valid_from DATE,
    valid_to DATE,
    current_flag BOOLEAN
"""

2. Create Delta Table

Create the initial Delta table with this schema, reusing the Delta-enabled SparkSession from the prerequisites:

from delta.tables import DeltaTable

# Create the initial Delta table in the metastore
spark.sql("""
    CREATE TABLE IF NOT EXISTS delta_table (
        id STRING,
        name STRING,
        address STRING,
        valid_from DATE,
        valid_to DATE,
        current_flag BOOLEAN
    )
    USING DELTA
""")

3. Merge Operation for SCD Type 2

To handle SCD Type 2 changes, use the Delta Lake merge operation, which lets you update existing records and insert new ones in a single atomic transaction.

Sample Data

Assume the incoming data is in a DataFrame called new_data, and delta_table is a DeltaTable handle to the existing table.

# Sample new data: record 1 carries a changed address, record 2 is brand new
new_data = spark.createDataFrame([
    ("1", "John Doe", "123 Elm St"),
    ("2", "Jane Smith", "456 Oak St")
], ["id", "name", "address"])

# Load the Delta table created above by name
delta_table = DeltaTable.forName(spark, "delta_table")
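
For the walkthrough to demonstrate an actual change, assume the table already holds a current row for id 1. A hypothetical seed (the old address and start date below are made up purely for illustration) could be written like this:

# Hypothetical seed: one existing "current" version of record 1
seed = (
    spark.createDataFrame(
        [("1", "John Doe", "99 Old Rd")],
        ["id", "name", "address"]
    )
    .withColumn("valid_from", lit("2024-01-01").cast("date"))
    .withColumn("valid_to", lit(None).cast("date"))
    .withColumn("current_flag", lit(True))
)

seed.write.format("delta").mode("append").saveAsTable("delta_table")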

Merge Operation

  1. Perform the Merge: Close out the current version of each matched record and insert rows for brand-new keys. Note that the matched branch only sets valid_to and current_flag; overwriting name or address here would destroy the history you are trying to keep.

# Perform the merge: close out matched current rows, insert brand-new keys
delta_table.alias("old_data").merge(
    source=new_data.alias("new_data"),
    condition="old_data.id = new_data.id AND old_data.current_flag = true"
).whenMatchedUpdate(set={
    "valid_to": "current_date()",
    "current_flag": "false"
}).whenNotMatchedInsert(values={
    "id": "new_data.id",
    "name": "new_data.name",
    "address": "new_data.address",
    "valid_from": "current_date()",
    "valid_to": "CAST(NULL AS DATE)",
    "current_flag": "true"
}).execute()

Close Out Old Records: the merge above already closes matched records, but you can also run the close-out as a separate, explicit update: set valid_to to the current date and current_flag to false. The update condition must be a plain column expression (it cannot embed a subquery over the new_data DataFrame), so collect the incoming keys first:

# Collect the incoming business keys (fine for modest batch sizes)
new_ids = [row.id for row in new_data.select("id").distinct().collect()]

# Close out the current version of every key present in the new data
delta_table.update(
    condition=col("current_flag") & col("id").isin(new_ids),
    set={
        "valid_to": current_date(),
        "current_flag": lit(False)
    }
)
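
A single merge like the one above cannot both close out a changed record and insert its replacement, because each source row is consumed by at most one clause. A common workaround, sketched below under the same schema, is to stage the source: union the incoming rows with null-keyed copies of the rows that actually changed, so those copies can never match and fall through to the insert clause. The mergeKey column is an artificial name introduced only for this staging trick.

# Rows keyed by id: these can match a current row and close it out
updates = new_data.selectExpr("id AS mergeKey", "*")

# Null-keyed copies of rows whose attributes changed: these never match,
# so they fall through to the insert clause as the new current version
inserts = new_data.alias("n").join(
    delta_table.toDF().alias("o"),
    (col("n.id") == col("o.id")) & col("o.current_flag")
).where("n.name <> o.name OR n.address <> o.address") \
 .selectExpr("CAST(NULL AS STRING) AS mergeKey", "n.*")

staged = updates.unionByName(inserts)

delta_table.alias("old").merge(
    source=staged.alias("new"),
    condition="old.id = new.mergeKey AND old.current_flag = true"
).whenMatchedUpdate(
    condition="old.name <> new.name OR old.address <> new.address",
    set={"valid_to": "current_date()", "current_flag": "false"}
).whenNotMatchedInsert(values={
    "id": "new.id",
    "name": "new.name",
    "address": "new.address",
    "valid_from": "current_date()",
    "valid_to": "CAST(NULL AS DATE)",
    "current_flag": "true"
}).execute()

With the seed row from earlier, id 1 ends up with two rows after this merge: the closed-out 99 Old Rd version (valid_to set, current_flag false) and a new current 123 Elm St version, while the brand-new id 2 gets a single current row.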

4. Verify and Optimize

After performing the merge, verify the results by querying the Delta table:

# Verify the results
df = spark.read.table("delta_table")
df.orderBy("id", "valid_from").show(truncate=False)
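
In practice you will usually want either the current snapshot or the full history of one key; both are simple filters over the same table:

# Current snapshot of the dimension
df.where("current_flag = true").show(truncate=False)

# Full history of a single business key, oldest version first
df.where("id = '1'").orderBy("valid_from").show(truncate=False)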

Optimize the Delta table to compact small files and improve query performance:

# Compact small files in the Delta table
delta_table.optimize().executeCompaction()
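
If queries filter heavily on the business key, you can compact and co-locate files in one pass with Z-ordering, and once the history is stable you can clean up unreferenced files; the 168 hours below is simply Delta’s default 7-day retention:

# Compact and co-locate files by business key (Z-ordering)
delta_table.optimize().executeZOrderBy("id")

# Remove files no longer referenced by the table
delta_table.vacuum(168)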

Summary

  • Schema Design: Include valid_from, valid_to, and current_flag.
  • Create Delta Table: Use Delta Lake to create the table.
  • Merge Operation: Use Delta Lake’s merge operation to handle SCD Type 2 logic.
  • Close Out Old Records: Set valid_to and clear current_flag so history is preserved.
  • Verify and Optimize: Check the results and optimize for performance.

By following these steps, you can efficiently manage SCD Type 2 operations in Delta tables with Apache Spark.

