
Pinjari Akbar
Dec 15, 2024


Databricks Workflows in Real-Time Use with Spark

Databricks Workflows

Databricks Workflows is a robust orchestration tool for managing ETL pipelines, machine learning (ML) tasks, and data engineering workloads in near real time. It supports event-driven pipelines and scheduling, with rich integration into the Databricks Lakehouse Platform.

Here’s how to create, manage, and utilize Databricks Workflows with PySpark code examples.

Features of Databricks Workflows

Task Orchestration:

  • Chain multiple tasks, such as notebooks, Python scripts, or Spark jobs, into a single workflow.
  • Manage dependencies between tasks.

Event-Driven Workflows:

  • Trigger workflows based on events like file arrivals in cloud storage (S3, Azure Blob, etc.).

Scalable Execution:

  • Automatically scale clusters for parallel execution of tasks (see the autoscaling sketch after this list).

Error Handling:

  • Includes retry mechanisms and task-failure notifications.

Built-In Monitoring:

  • Visualize execution with a UI to monitor task status and logs.
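
As an illustration of the scaling feature, a job cluster can be given an autoscaling range instead of a fixed worker count. The sketch below shows what such a cluster definition might look like in a Jobs API payload; the runtime version, node type, and worker counts are placeholder assumptions.

# Hypothetical autoscaling cluster definition for a workflow task (values are placeholders)
autoscaling_cluster = {
    "new_cluster": {
        "spark_version": "11.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "autoscale": {
            "min_workers": 2,   # Databricks adds or removes workers within this range
            "max_workers": 8
        }
    }
}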

Example Use Case: Real-Time ETL Pipeline

Scenario:

  • Data arrives in Azure Data Lake or AWS S3 in near real-time.
  • You need to process this data using Spark, perform transformations, and load it into a Delta Lake table.

Step-by-Step Implementation

1. Create a Workflow

  • In Databricks, navigate to the Workflows tab.
  • Click Create Workflow and define the name of your workflow.

2. Define Workflow Tasks

Each task can be a notebook, JAR, Python script, or SQL command.
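
For illustration, here is a rough sketch of what a Python-script task and a SQL task could look like in a Jobs API payload; the file path, parameters, query ID, and warehouse ID are placeholder assumptions.

# Hypothetical task definitions (paths and IDs are placeholders)
python_script_task = {
    "task_key": "IngestWithScript",
    "spark_python_task": {
        "python_file": "dbfs:/scripts/ingest.py",
        "parameters": ["--date", "2024-12-15"]
    }
}

sql_task = {
    "task_key": "AggregateWithSQL",
    "sql_task": {
        "query": {"query_id": "<query-id>"},
        "warehouse_id": "<warehouse-id>"
    }
}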

3. Use Event Triggers

Configure the workflow to trigger on events such as a file upload in cloud storage. Example:

  • AWS S3 Event: Configure an S3 bucket to send notifications to an AWS Lambda function that invokes your Databricks Workflow (a sketch of such a handler follows this list).
  • Azure Event Grid: Configure Event Grid to trigger on blob creation and call the Databricks REST API.
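
As a sketch of the AWS S3 option, a Lambda handler that forwards the bucket event to the Jobs run-now endpoint could look roughly like this; the workspace URL, token handling, and job ID are assumptions, and in practice the token should come from a secrets manager rather than an environment variable.

import json
import os
import urllib.request

# Placeholder configuration -- supplied via Lambda environment variables in this sketch
DATABRICKS_HOST = os.environ.get("DATABRICKS_HOST", "https://<databricks-instance>")
DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN", "<access-token>")
JOB_ID = os.environ.get("DATABRICKS_JOB_ID", "<job-id>")

def lambda_handler(event, context):
    """Triggered by an S3 ObjectCreated notification; starts the Databricks job."""
    payload = json.dumps({"job_id": JOB_ID}).encode("utf-8")
    request = urllib.request.Request(
        url=f"{DATABRICKS_HOST}/api/2.1/jobs/run-now",
        data=payload,
        headers={
            "Authorization": f"Bearer {DATABRICKS_TOKEN}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())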

Real-Time ETL with PySpark

Here’s an example pipeline to process JSON data uploaded to a cloud bucket.

PySpark Code for Data Processing

  1. Read Real-Time Data from Cloud Storage:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder \
    .appName("RealTimeETL") \
    .getOrCreate()

# Define the cloud storage path
source_path = "s3a://your-bucket-name/raw-data/"

# Read JSON data as a stream
df = spark.readStream \
    .format("json") \
    .schema("id STRING, timestamp STRING, value DOUBLE") \
    .load(source_path)

  2. Perform Transformations:

# Transform data (example: filter out rows with null values)
transformed_df = df.filter(col("value").isNotNull())

  3. Write Data to Delta Lake:

# Delta table path
delta_table_path = "s3a://your-bucket-name/processed-data/"

# Write transformed data to Delta Lake
transformed_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3a://your-bucket-name/checkpoints/") \
    .start(delta_table_path)
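
If you want the stream to run on a fixed micro-batch interval and keep the job alive until it is stopped, a small variation of the write above adds a processing-time trigger and blocks on the query; the one-minute interval is an assumption.

# Variation: micro-batch every minute and keep the stream running
query = transformed_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3a://your-bucket-name/checkpoints/") \
    .trigger(processingTime="1 minute") \
    .start(delta_table_path)

query.awaitTermination()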

Schedule the Workflow

In the Databricks Workflows UI:

  • Set the schedule (e.g., every 5 minutes).
  • Configure retries and alerts for failure (the equivalent API fields are sketched below).
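
The same schedule, retry, and alerting choices can also be expressed in a Jobs API payload. A rough sketch follows; the cron expression, retry counts, and e-mail address are placeholder assumptions.

# Hypothetical job-level settings matching the UI choices above (placeholders)
job_settings = {
    "schedule": {
        "quartz_cron_expression": "0 0/5 * * * ?",   # run every 5 minutes
        "timezone_id": "UTC"
    },
    "email_notifications": {
        "on_failure": ["you@example.com"]            # alert on task failure
    }
}

# Task-level retry settings (attached to each entry in the "tasks" array)
task_retry_settings = {
    "max_retries": 3,
    "min_retry_interval_millis": 60000               # wait one minute between retries
}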

Trigger-Based Real-Time Execution

Use event-driven triggers:

  1. Configure your cloud storage (e.g., S3 or Azure Blob) to send a notification to Databricks.
  2. Use the Databricks REST API to programmatically start the workflow:
import requests

url = "https://<databricks-instance>/api/2.0/jobs/run-now"
headers = {"Authorization": "Bearer <access-token>"}

payload = {
    "job_id": "<job-id>"
}

response = requests.post(url, headers=headers, json=payload)
print(response.json())

Complete Workflow Example

Workflow Setup in JSON

You can use the Databricks REST API to define workflows programmatically.

{
  "name": "RealTimeETLWorkflow",
  "tasks": [
    {
      "task_key": "LoadRawData",
      "notebook_task": {
        "notebook_path": "/Users/your_user/load_raw_data"
      },
      "existing_cluster_id": "<cluster-id>"
    },
    {
      "task_key": "TransformData",
      "depends_on": [
        {"task_key": "LoadRawData"}
      ],
      "notebook_task": {
        "notebook_path": "/Users/your_user/transform_data"
      },
      "new_cluster": {
        "spark_version": "11.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 2
      }
    },
    {
      "task_key": "WriteToDelta",
      "depends_on": [
        {"task_key": "TransformData"}
      ],
      "notebook_task": {
        "notebook_path": "/Users/your_user/write_to_delta"
      }
    }
  ]
}

Submit the workflow using the Databricks REST API:

curl -X POST https://<databricks-instance>/api/2.1/jobs/create \
-H "Authorization: Bearer <access-token>" \
-H "Content-Type: application/json" \
-d @workflow.json
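
Equivalently, you can submit the same definition from Python and capture the returned job_id, which is what the run-now example shown earlier expects. A minimal sketch, assuming the definition is saved as workflow.json:

import json
import requests

# Placeholders -- substitute your workspace URL and access token
base_url = "https://<databricks-instance>"
headers = {"Authorization": "Bearer <access-token>"}

# Create the job from the JSON definition and capture its job_id
with open("workflow.json") as f:
    job_definition = json.load(f)

response = requests.post(f"{base_url}/api/2.1/jobs/create", headers=headers, json=job_definition)
job_id = response.json()["job_id"]
print(job_id)  # pass this job_id to the run-now call shown earlier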

Monitoring and Debugging

  • Task Logs: Check logs for each task in the workflow UI.
  • Metrics: Use Spark UI to monitor job performance.
  • Retries: Tasks are automatically retried upon failure, if configured.
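
Run status can also be checked programmatically against the Jobs runs/get endpoint. A minimal sketch, with the workspace URL, token, and run ID as placeholders:

import requests

# Placeholders -- substitute your workspace URL, access token, and run ID
url = "https://<databricks-instance>/api/2.1/jobs/runs/get"
headers = {"Authorization": "Bearer <access-token>"}
params = {"run_id": "<run-id>"}

response = requests.get(url, headers=headers, params=params)
run = response.json()
print(run.get("state", {}))  # includes life_cycle_state and result_state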

Benefits of Using Databricks Workflows

  1. End-to-End Management: Automate data ingestion, transformation, and output.
  2. Scalability: Dynamically scale clusters based on workload.
  3. Flexibility: Easily integrate Python, SQL, and Spark tasks.
  4. Robust Error Handling: Automatically retries failed tasks.
  5. Real-Time Insights: Continuous processing with Spark Structured Streaming.
