Primer on Spark Join strategy

--

How joins are performed in Spark

The Join operation is the most commonly used transformation in Spark. Behind the scene, spark uses some algorithms to decide how to implement the join. The algorithms are defined as strategies in the Physical planning phase of the catalyst engine.

In this blog, we will see how spark decides which strategy is used for Join.

Factors affecting Spark Join strategy

Data Size
Join Hint
Type of join
Sortability of rows

Data Size

Spark picks a join strategy that avoids shuffle and sort operations as they are expensive. Therefore, hash-based join strategies are preferred if data can be broadcasted.

Type of Join

Equi Join — “=”
Non Equi Join — “<,>,≥, ≤”

As the Non-Equi-Join needs to make comparisons to a range of unspecific values, the nested loop is required. Therefore, Spark only supports Broadcast nested loop join and Cartesian product join these join types.

Join hints

If end-user want more control over the join strategy selection then they supply Join Hints like /*+ BROADCAST(table name)*/*

BROADCAST | BROADCASTJOIN | MAPJOIN
MERGE | SHUFFLE_MERGE | MERGEJOIN
SHUFFLE_HASH
SHUFFLE_REPLICATE_NL

Order of join hints in if multiple hints are passed, highest to lowest.

BROADCAST > MERGE > SHUFFLE_HASH > SHUFFLE_REPLICATE_NL

Spark Join strategies

Decision flow for join strategy

Broadcast Hash Join

In Broadcast hash join, a hash table is created for the join key of the smaller dataset, and then this hashtable is broadcasted to all nodes where the bigger dataset’s partitions lie. Then from the larger dataset lookup is performed on the hashtable for the presence of the key. Since it is a hashtable lookup, only the equality join condition is supported. Spark provides a configuration to decide the size threshold for the smaller dataset. spark.sql.autoBroadcastJoinThreshold=10MB . While setting this configuration value we need to keep in mind that the hashtable needs to fit in the driver, as it is cached there and then broadcasted to nodes. To disable this join set the conf value to -1. This strategy is preferred when at least one side of the dataset is small enough to collect to the driver and then broadcast to each executor.

BHJ neither causes a shuffle nor any sorting operation and hence it is the fastest join strategy in most of the scenarios.

Shuffle Hash Join

As the name suggests this join first shuffles the data and then applies the hash. In the shuffle phase, datasets with the same join keys are moved to the same executor, and then on the executor node, a hash table is created for the smaller dataset, and then from the bigger dataset lookup is performed.

partition → shuffle → Hashing → In memory Join → Result aggregation

Since this is an expensive join (due to shuffle), spark provides a configuration to switch it on/off spark.sql.join.preferSortMergeJoin=false/true . This join strategy is preferred when at least one of the datasets is small enough for building a hash table (smaller than the product of the broadcast threshold and the number of shuffle partitions). Also, the smaller dataset should be at least three times smaller, or else sort-based join will be used.

Shuffle Sort Merge Join

This is a three-stage join approach. First datasets with the same join keys are moved to the same executor, and then on the executor node, the dataset partitions on the node are sorted based on the join keys and then merged based on the join keys. Since sorting is applied the keys need to be sortable.

For merging the data Spark uses two pointers one in each table and moves the pointer to match the keys. Since the dataset is sorted the merge or the join operation is stopped for an element as soon as a key mismatch is encountered. In case we have multiple keys with the same value (duplicates) it creates groups and does a cartesian product.

Broadcast Nested Loop Join

This is the fallback join strategy for Spark. If no join condition and hint type are not mentioned, this will be used. Broadcast Nested Loop join works by broadcasting one of the entire datasets and performing a nested loop to join the data. So essentially every record from dataset 1 is attempted to join with every record from dataset 2.

for record_d1 in dataset_1:
for record_d2 in dataset_2:
do check for join

When we give broadcast hints on a non-equi join, the spark query optimizer always chooses this strategy.

It supports both Equi-Joins and Non-Equi-Joins. The implementation is optimized when:

  • The Left side is broadcasted in the right outer Join.
  • The Right side is broadcasted in a left outer, left semi, and left anti-Join.
  • either side in an inner-like Join.

In other cases, we need to scan the data multiple times, which can be rather slow.

Cartesian Join

For an equi-join type, if there are no join keys mentioned, Cartesian Join will be selected. To use this we need to set spark.sql.crossJoin.enabled=true. In this join, the entire partition of the dataset is sent over or replicated to all the partitions for a full cross or nested loop join. Join hint SHUFFLE_REPLICATE_NL

# Necessary import
from pyspark.sql.functions import broadcast

# Get spark Session object
spark = SparkSession.builder.appName('Join Strategy').getOrCreate()

# Prepare data
integer_values1 = [10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50]
df1 = spark.createDataFrame([(value) for value in integer_values1], ["id1"])

integer_values1 = [30, 20, 40, 50]
df2 = spark.createDataFrame([(value) for value in integer_values2], ["id2"])

# Set conf
spark.conf.set("spark.sql.autoBroadcastJoinThreshold0", 10*1024*1024)
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")

# Helper function
def show_plan(joined_df):
execution_plan = joined_df._jdf.queryExecution().executedPlan()
print(execution_plan.toString())
return

# Broadcast Join
joined_df = df1.join(broadcast(df2), df1.id1 == df2.id2)
show_plan(joined_df)


spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold0", 2)

# Shuffle hash Join
joined_df = df1.join(df2.hint("SHUFFLE_HASH"), df1.id1 == df2.id2)
show_plan(joined_df)

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
# Disable broadcast join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold0", -1)

# SM Join
joined_df = df1.join(df2.hint("MERGE"), df1.id1 == df2.id2)
show_plan(joined_df)

# Set Broadcast Join to default 10Mb
spark.conf.set("spark.sql.autoBroadcastJoinThreshold0", 10*1024*1024)

# Broadcast Nested lookup Join (Non equi)
joined_df = df1.join(broadcast(df2), df1.id1 >= df2.id2)
show_plan(joined_df)

# Cartesian Join (Non equi no broadcast)
joined_df = df1.join(df2.hint("SHUFFLE_REPLICATE_NL"), df1.id1 >= df2.id2)
show_plan(joined_df)

Note: Broadcast Nested Loop Join doesn’t result in an all-to-all join as opposed to Cartesian Join. The number of output partitions is always equal to the product of the number of partitions of the input data sets

Symmetric Hash Join: Stream-stream join in spark structured streaming uses this join strategy by default. Currently, this is the best we have for streaming. If we use the foreachBatch we can influence Spark to use other join strategies in streaming cases.

Note:

We can convert a non-equi-join to equi-join for range-based queries.

points = [2, 5, 13, 21, 27, 30, 45,]
lines = [("line1", 1, 5), ("line2", 10, 18), ("line3", 25, 34),] # Max 10 unit line

points_df = spark.createDataFrame(points, "int").toDF("point")
lines_df = spark.createDataFrame(lines, ["string", "int", "int"]).toDF("id", "start", "end")

# Approach 1 brute force, (Cartesiann Join)
result = points_df.join(lines_df, [points_df.point >= lines_df.start, points_df.point <= lines_df.end])

# Approach 2, creating range buckets of 10
range_points_df = points_df.withColumn("point_bin", f.floor(points_df.point/10))
range_lines_df = lines_df.withColumn("line_bins", f.array_distinct(f.array([f.floor(lines_df.start/10), f.floor(lines_df.end/10)])))
range_lines_df = range_lines_df.select("id", "start", "end", f.explode(f.col("line_bins")).alias("line_bin"))

result = range_points_df.join(range_lines_df,
[range_points_df.point_bin == range_lines_df.line_bin,
range_points_df.point >= range_lines_df.start,
range_points_df.point <= range_lines_df.end])

# Databricks has in build range join hint
result = points_df.join(lines_df.hint("range_join", 10),
[points_df.point >= lines_df.start,
points_df.point <= lines_df.end])

That is all how Spark internally decides which strategy is used in joins. Happy joins!!

Join FAUN: Website 💻|Podcast 🎙️|Twitter 🐦|Facebook 👥|Instagram 📷|Facebook Group 🗣️|Linkedin Group 💬| Slack 📱|Cloud Native News 📰|More.

If this post was helpful, please click the clap 👏 button below a few times to show your support for the author 👇

--

--