What is a broadcast join, and how do you perform one in PySpark?

Pinjari Akbar
Nov 18, 2023


A broadcast join is a join optimization used in distributed computing frameworks such as Apache Spark. It is designed to make joining a large DataFrame with a small one more efficient.

In a typical join operation, data from two DataFrames is matched based on a specified condition, and the result is a new DataFrame. However, when one of the DataFrames is significantly smaller than the other, broadcasting that smaller DataFrame to all nodes in the cluster can be more efficient than the traditional shuffle-based join.
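A rough back-of-envelope calculation shows why this can pay off. The sketch below is a simplified model, not Spark code: it assumes a shuffle moves roughly all of both inputs across the network, while a broadcast sends one full copy of the small input to each executor. The function name `estimate_network_mb` and the cluster sizes are hypothetical.

```python
def estimate_network_mb(large_mb, small_mb, num_executors):
    """Back-of-envelope network volume (in MB) for two join strategies.

    Shuffle join: both inputs are repartitioned by the join key, so in the
    worst case roughly all of their data crosses the network.
    Broadcast join: the large input stays where it is and every executor
    receives one full copy of the small input.
    Ignores compression, serialization overhead and rows that stay local.
    """
    shuffle_mb = large_mb + small_mb
    broadcast_mb = small_mb * num_executors
    return shuffle_mb, broadcast_mb


# Hypothetical cluster: 100 GB fact table, 20 MB lookup table, 50 executors
shuffle_mb, broadcast_mb = estimate_network_mb(100_000, 20, 50)
print(shuffle_mb, broadcast_mb)  # 100020 1000
```

Under this model, broadcasting wins as long as the small table's size multiplied by the number of executors stays well below the combined size of the two inputs. In Spark the small side is also collected to the driver before it is broadcast, so it has to fit there too.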

Here’s a more detailed explanation of a broadcast join:

Large DataFrame and Small DataFrame:

  • The large DataFrame is the one that contains a substantial amount of data.
  • The small DataFrame is the one with significantly fewer rows or a smaller size.

1. Traditional Join:

  • In a traditional (shuffle-based) join, both DataFrames are repartitioned by the join key so that rows with the same key end up on the same node. This shuffle sends data across the network and is expensive, mainly because the large DataFrame has to be moved.

2. Broadcast Join:

  • Instead of shuffling the large DataFrame, Spark sends the small DataFrame (the broadcast DataFrame) to every node in the cluster.
  • Each node holds a full copy of the broadcast DataFrame, so the large DataFrame can stay where it is and far less data crosses the network.

3. Efficiency Gains:

  • Since the smaller DataFrame is available on each node, the join can be performed locally on each node without extensive data movement.
  • This reduces network traffic and can significantly speed up the join, provided the small DataFrame fits in the memory of each node.

4. Use Cases:

  • Broadcast joins are most effective when one of the DataFrames is small enough to fit in the memory of each worker node.
  • Examples of small DataFrames include lookup tables, reference tables, or DataFrames resulting from aggregations.

5. Syntax in PySpark:

  • In PySpark, the broadcast() function from pyspark.sql.functions explicitly marks a DataFrame to be broadcast during a join: you wrap the smaller DataFrame in broadcast() inside the join call.
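The mechanics in items 1 to 3 can be illustrated with a toy, single-process Python model. It is not how Spark is implemented: the partition layout, the `hash(key) % num_partitions` routing rule and the function names are assumptions made for illustration. Spark's default strategy for two large tables is actually a sort-merge join rather than the hash join modelled here, but both require the same shuffle of the data.

```python
from collections import defaultdict


def build_lookup(rows):
    """Hash table mapping each join key to the list of matching values."""
    lookup = defaultdict(list)
    for key, value in rows:
        lookup[key].append(value)
    return lookup


def probe(lookup, rows):
    """Inner-join rows against a prebuilt lookup table."""
    return [(key, value, other) for key, value in rows for other in lookup.get(key, [])]


def shuffle_join(large_parts, small_parts, num_partitions):
    """Toy shuffle join: route every row of BOTH inputs to partition hash(key) % n."""
    rows_moved = 0
    large_by_part = defaultdict(list)
    small_by_part = defaultdict(list)
    for parts, target in ((large_parts, large_by_part), (small_parts, small_by_part)):
        for src, part in enumerate(parts):
            for key, value in part:
                dest = hash(key) % num_partitions
                if dest != src:
                    rows_moved += 1  # this row has to cross the network
                target[dest].append((key, value))
    joined = []
    for p in range(num_partitions):
        joined += probe(build_lookup(small_by_part[p]), large_by_part[p])
    return joined, rows_moved


def broadcast_join(large_parts, small_rows, num_executors):
    """Toy broadcast join: copy the whole small table to every executor."""
    lookup = build_lookup(small_rows)  # in a real cluster, each executor holds its own copy
    rows_sent = len(small_rows) * num_executors  # rows of the large table never move
    joined = []
    for part in large_parts:
        joined += probe(lookup, part)  # each partition is joined where it lives
    return joined, rows_sent


# Hypothetical data: 100 large rows in 2 partitions, a 10-row lookup table
large_parts = [[(k, f"L{k}") for k in range(0, 50)], [(k, f"L{k}") for k in range(50, 100)]]
small_rows = [(k, f"S{k}") for k in range(10)]

shuffled, shuffle_moves = shuffle_join(large_parts, [small_rows], num_partitions=2)
broadcasted, broadcast_moves = broadcast_join(large_parts, small_rows, num_executors=2)

print(sorted(shuffled) == sorted(broadcasted))  # True: same join result
print(shuffle_moves, broadcast_moves)           # 55 20
```

Both strategies return the same rows; what differs is the traffic. The shuffle moves rows of the large table as well as the small one, while the broadcast only copies the small table once per executor, which is why it stops paying off once the "small" side is no longer small.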

Here’s a simplified example in PySpark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("BroadcastJoinExample").getOrCreate()

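# Illustrative data (hypothetical): replace with your own large and small DataFrames
large_df = spark.range(1_000_000).withColumnRenamed("id", "join_key")
small_df = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c")], ["join_key", "label"])

# broadcast() asks Spark to copy small_df to every executor instead of shuffling large_df
result_df = large_df.join(broadcast(small_df), "join_key", "inner")
result_df.show()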

In this example, broadcast(small_df) tells Spark that small_df should be broadcast during the join operation.

Remember that while broadcast joins can be highly efficient in certain scenarios, they are not suitable for every situation, particularly when both DataFrames are large. Whether to use one depends on the size of the DataFrames, the memory available on the driver and executors, and the join type (for example, in a left outer join only the right-hand DataFrame can be broadcast).

One more example, broadcasting several DataFrames in a chain of joins:

from pyspark.sql.functions import broadcast

# Assumes df_races and the other df_* DataFrames are already loaded; some tables
# name the key raceId rather than race_Id. Broadcast only tables that are small
# enough to fit in executor memory.
final_df = (
    df_races
    .join(broadcast(df_sprint_results), df_races.race_Id == df_sprint_results.race_Id, "left")
    .join(broadcast(df_qualifying), df_races.race_Id == df_qualifying.race_Id, "left")
    .join(broadcast(df_results), df_races.race_Id == df_results.raceId, "left")
    .join(broadcast(df_driver_standings), df_races.race_Id == df_driver_standings.race_Id, "left")
    .join(broadcast(df_constructor_standings), df_races.race_Id == df_constructor_standings.raceId, "left")
    .join(broadcast(df1_constructor_results), df_races.race_Id == df1_constructor_results.race_Id, "left")
    .join(broadcast(df_pit_stops), df_races.race_Id == df_pit_stops.race_Id, "left")
    .join(broadcast(df_lap_times), df_races.race_Id == df_lap_times.race_Id, "left")
    .join(broadcast(df1_circuits), df_races.circuit_Id == df1_circuits.circuit_Id, "left")
    .join(broadcast(df_seasons), df_races.year == df_seasons.year, "left")
)
# show() accepts truncate=False to print full column values
final_df.show(truncate=False)
