23 Spark Join Strategies and Performance Tuning
Topic focus: performance tuning for joins in Spark, including partitioning, join strategies, and practical optimization techniques tested in clusters.
Context: different join types (inner, left, cross, full) and how Spark internally chooses a strategy when joining two DataFrames/tables.
Key takeaway: three main internal join strategies and how to influence their usage for better performance: Shuffle Hash Join, Sort Merge Join, and Broadcast Join. Also introduces bucketing as an advanced technique for big-big joins.
Internal join strategies in Spark
Spark uses three main strategies for joining datasets:
Shuffle Hash Join (SHJ)
Sort Merge Join
Broadcast Join
You can steer Spark toward one strategy using configuration:
spark.sql.join.preferSortMergeJoin (boolean, default true): if true, Spark prefers Sort Merge Join; if false, Spark may choose Shuffle Hash Join where appropriate. In the speaker’s terms: setting this to false steers Spark toward Shuffle Hash Join; leaving it at true keeps Sort Merge Join as the preferred strategy.
Broadcast join is chosen automatically when one side is small enough to broadcast, controlled by the threshold spark.sql.autoBroadcastJoinThreshold (default: 10 MB). It can be overridden by explicit hints or by disabling the threshold.
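As a rough illustration of the two settings above, the following sketch applies them to an existing SparkSession. The helper name apply_join_config and its parameters are assumptions made for this example; only the two configuration keys come from the notes.

```python
def apply_join_config(spark, prefer_sort_merge=True,
                      broadcast_threshold_bytes=10 * 1024 * 1024):
    """Apply the two join-related session settings to an existing SparkSession.

    `spark` is assumed to be a pyspark.sql.SparkSession created elsewhere.
    """
    # true: prefer Sort Merge Join; false: allow Shuffle Hash Join where it qualifies
    spark.conf.set("spark.sql.join.preferSortMergeJoin",
                   str(prefer_sort_merge).lower())
    # Size limit (in bytes) for automatic broadcasting; -1 disables it
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold",
                   str(broadcast_threshold_bytes))
```

Calling apply_join_config(spark, prefer_sort_merge=False, broadcast_threshold_bytes=-1) would turn off automatic broadcasting and let Spark consider Shuffle Hash Join; the planner still makes the final choice.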
Shuffle Hash Join (SHJ)
How it works:
Spark shuffles both inputs so that rows with the same join key end up in the same partition (or executor).
After shuffling, it hashes the keys of the smaller dataset to build an in-memory hash table (RAM) of the join keys.
The join is performed by probing this hash table for matching keys from the larger dataset.
No sorting is performed during this process.
When to use SHJ:
Effective when there is a clearly smaller table to hash into memory (this “build” side must be small enough to fit in RAM; the larger table is the “probe” side).
Suitable when one table is large and the other is small; the smaller table is hashed in memory on each partition/executor after shuffling.
Important nuance from the transcript:
The smaller table is hashed (loaded into RAM) to speed up lookups; both tables are shuffled, so the larger table accounts for most of the shuffle volume, but the in-memory work focuses on hashing the smaller side.
SHJ does not involve a sort step; it relies on the in-memory hash indices for fast lookups.
Practical note:
If the smaller table does not fit in memory, SHJ can fail or degrade (RAM pressure).
No sorting means SHJ can be faster in appropriate cases, but may be less predictable in ordering than Sort Merge Join.
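The shuffle, build, and probe steps described for SHJ can be sketched in plain Python. This is a conceptual model only, not Spark's implementation: rows are (key, value) tuples, partitions are Python lists, and the function names are made up for the example.

```python
from collections import defaultdict

def shuffle(rows, num_partitions):
    """Route each (key, value) row to a partition chosen by the hash of its key."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in rows:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions

def shuffle_hash_join(small_rows, large_rows, num_partitions=4):
    """Inner join: build a hash table on the small side, probe with the large side."""
    small_parts = shuffle(small_rows, num_partitions)
    large_parts = shuffle(large_rows, num_partitions)
    joined = []
    for small_part, large_part in zip(small_parts, large_parts):
        # Build phase: in-memory hash table over the smaller side of this partition
        table = defaultdict(list)
        for key, value in small_part:
            table[key].append(value)
        # Probe phase: look up each row of the larger side; nothing is sorted
        for key, large_value in large_part:
            for small_value in table.get(key, []):
                joined.append((key, large_value, small_value))
    return joined
```

Because both sides are routed with the same hash, matching keys always meet in the same partition, which is why each partition can be joined independently.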
Sort Merge Join (SMJ)
How it works:
Spark shuffles data so that keys line up in the same partition on both sides, then sorts both inputs by the join key.
After sorting, Spark performs the join by sweeping the aligned, sorted keys.
When to use SMJ:
Effective when both sides are large and there isn’t a small enough table to broadcast or hash efficiently.
Can be faster when data is already pre-sorted or when sort operations are inexpensive relative to the shuffle cost.
Trade-offs:
Involves sort operations (costly), so SMJ can be slower if sorting dominates runtime.
Tuning note:
Sort Merge Join is preferred by default (spark.sql.join.preferSortMergeJoin = true); setting the option to false can push Spark toward SHJ where appropriate.
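The sort-then-sweep idea can be illustrated with a small, self-contained sketch for a single partition. It assumes rows are (key, value) tuples that have already been shuffled so that matching keys sit together; it is a teaching model, not Spark's code.

```python
def sort_merge_join(left_rows, right_rows):
    """Inner join of (key, value) rows: sort both sides, then merge in one sweep."""
    left = sorted(left_rows, key=lambda row: row[0])
    right = sorted(right_rows, key=lambda row: row[0])
    joined = []
    i = j = 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1          # left key has no partner yet; advance left
        elif left_key > right_key:
            j += 1          # right key has no partner yet; advance right
        else:
            # Find the run of equal keys on the right side
            j_end = j
            while j_end < len(right) and right[j_end][0] == left_key:
                j_end += 1
            # Pair every left row carrying this key with the whole right run
            while i < len(left) and left[i][0] == left_key:
                for _, right_value in right[j:j_end]:
                    joined.append((left_key, left[i][1], right_value))
                i += 1
            j = j_end
    return joined
```

The two sorted() calls are where the extra cost of SMJ comes from; the merge itself is a single pass over both inputs.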
Broadcast Join (BJ)
Core idea:
Replicate the smaller dataset on every executor/partition, so there is no need to shuffle the larger dataset for the join.
The join then proceeds by simple probing against the replicated small table on each partition.
Why it speeds things up:
Eliminates or greatly reduces shuffle and sort costs since the small table is available locally on each worker.
When to use BJ:
Most effective when one dataset is small enough to fit into memory on every executor (the smaller side).
Particularly useful in star schemas or fact-to-dimension style joins where the dimension/master table is small.
Limitations and caveats:
Memory pressure: replicating the small table on all executors consumes memory; if it does not fit, you get memory errors.
The default auto-broadcast threshold governs automatic broadcasting: by default, spark.sql.autoBroadcastJoinThreshold = 10 MB (the smaller dataset must be under this size to be auto-broadcast).
Broadcast hash joins apply to equi-joins (equality conditions) and do not cover every join type; a full outer join, for example, cannot be executed as a broadcast hash join.
Runtime control and examples:
Default behavior can trigger a broadcast join if the small table is under the threshold; this can be overridden by setting the threshold to a different value or by explicitly using a broadcast hint.
Explicit broadcast control: use the function broadcast(smallDf) (in PySpark) or a SQL hint like /*+ BROADCAST(smallTable) */ to force broadcasting of a specific dataset.
Practical example from the transcript:
A 2.6 GB sales dataset joined with a tiny customer master (size << 10 MB) triggers an automatic broadcast join by default.
To disable automatic broadcasting, set spark.sql.autoBroadcastJoinThreshold = -1 (the transcript notes using -1 to disable). Then rerun to observe different join strategy (likely SHJ or SMJ).
Explicitly broadcasting the small dataset using broadcast(customerDf) can ensure no shuffling and faster join when appropriate.
Important config values:
Default: 10 MB (spark.sql.autoBroadcastJoinThreshold)
Raising the limit: set spark.sql.autoBroadcastJoinThreshold manually to a larger value
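A minimal sketch of an explicit broadcast request in PySpark follows. It uses the DataFrame method hint("broadcast"), which marks a DataFrame for broadcasting in the same way as wrapping it in broadcast() from pyspark.sql.functions. The helper name broadcast_join and the column name in the usage note are assumptions for illustration.

```python
def broadcast_join(large_df, small_df, key, how="inner"):
    """Join two PySpark DataFrames, asking Spark to broadcast the small side.

    Both arguments are assumed to be pyspark.sql.DataFrame objects that share
    the join column `key`.
    """
    # hint("broadcast") marks small_df for replication to every executor,
    # so the large side can stay where it is instead of being shuffled
    return large_df.join(small_df.hint("broadcast"), on=key, how=how)
```

For the sales and customer data in the demo, a call might look like broadcast_join(sales_df, customer_df, "customer_id"), where customer_id is a hypothetical column name. The hint only pays off when the small side genuinely fits in executor memory.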
AQE (Adaptive Query Execution) and dynamic partitioning hints
Observation from the demo:
Adaptive Query Execution can reduce shuffle partitions on-the-fly (e.g., from a default of 200 shuffle partitions down to around 65 partitions).
AQE can optimize the physical plan during runtime by coalescing or repartitioning shuffle outputs to better match the data.
Practical impact:
Fewer partitions after AQE can reduce shuffle overhead and improve overall join performance, especially for large datasets.
AQE interacts with join strategies; the final plan may switch to more efficient strategies based on runtime statistics.
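To make the partition-coalescing idea concrete, here is a deliberately simplified model: adjacent shuffle partitions are merged greedily until a group would exceed a target size. Spark's real logic is more involved; the function name and the sizes below are illustrative assumptions. In Spark itself this behavior is governed by spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled.

```python
def coalesce_partitions(sizes_bytes, target_bytes):
    """Greedily merge adjacent shuffle partitions into groups of about target_bytes.

    Returns a list of groups, each a list of original partition indices.
    """
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes_bytes):
        # Close the current group if adding this partition would overshoot the target
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups
```

With many small shuffle partitions, the number of groups (and therefore tasks) falls well below the configured partition count, which matches the kind of reduction seen in the demo.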
Bucketing (bucketed joins) for big-big joins
Core concept:
Bucketing partitions data by a join key into a fixed number of buckets (N buckets) per partition, creating a controlled, hash-based distribution.
Example: bucket on ctid into 4 buckets; data is distributed so that all records with the same ctid land in the same bucket across partitions.
How bucketing helps joins:
When both datasets are bucketed on the same join column, Spark can join corresponding buckets, potentially reducing or eliminating cross-partition shuffles.
A join is performed bucket-by-bucket (e.g., bucket 1 with bucket 1, bucket 2 with bucket 2, etc.) rather than shuffling entire datasets.
How data is laid out and read:
Each partition is split into multiple bucket files (e.g., 8 partitions × 4 buckets = 32 files for a given dataset).
During a join, Spark reads corresponding bucket files from both sides and joins them bucket-by-bucket.
Data layout example (conceptual):
Bucketing on ctid with 4 buckets across 8 partitions yields 32 files per dataset (partitions × buckets).
For example, bucket 1 of table A is joined with bucket 1 of table B, then bucket 2 with bucket 2, and so on.
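The layout and the bucket-by-bucket join can be modeled in a few lines of plain Python. This is a conceptual sketch: Spark uses its own hash function for bucket assignment, and Python's built-in hash() stands in for it here. All function names are assumptions for the example.

```python
def bucket_id(key, num_buckets):
    """Stand-in for Spark's bucket hash: the same key always maps to the same bucket."""
    return hash(key) % num_buckets

def bucketize(rows, num_buckets):
    """Distribute (key, value) rows into num_buckets lists by key."""
    buckets = [[] for _ in range(num_buckets)]
    for key, value in rows:
        buckets[bucket_id(key, num_buckets)].append((key, value))
    return buckets

def bucketed_join(left_rows, right_rows, num_buckets=4):
    """Inner join where bucket i of the left side only meets bucket i of the right."""
    joined = []
    left_buckets = bucketize(left_rows, num_buckets)
    right_buckets = bucketize(right_rows, num_buckets)
    for left_bucket, right_bucket in zip(left_buckets, right_buckets):
        lookup = {}
        for key, value in right_bucket:
            lookup.setdefault(key, []).append(value)
        for key, left_value in left_bucket:
            for right_value in lookup.get(key, []):
                joined.append((key, left_value, right_value))
    return joined

def bucket_file_count(num_partitions, num_buckets):
    """Each write partition emits one file per bucket."""
    return num_partitions * num_buckets
```

Since both sides use the same bucket function and bucket count, a key can only find its match in the bucket with the same index, so no rows need to cross buckets during the join.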
Practical notes from the transcript:
Bucketing reduces the volume of data shuffled for the join, but some shuffling may still occur if a bucket on one side doesn’t perfectly align with the corresponding bucket on the other side (e.g., a bucket contains data for ctid values that appear in other partitions on the other side).
You can choose the number of buckets (e.g., 4, 8) based on data cardinality and distribution. The speaker demonstrates improving performance by bucketing and shows how a 30-second join can drop to around 19 seconds with bucketing in practice.
How to implement bucketing in Spark:
DataFrameWriter approach (example):
df.write.bucketBy(4, "ctid").sortBy("ctid").saveAsTable("sales_bucket")
This creates a bucketed table stored in the warehouse directory.
Reading back: spark.read.table("sales_bucket")
You can also bucket data while loading from existing files by bucketing then saving as a bucketed table.
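Putting the write and read steps together, a hedged sketch for bucketing two tables the same way and joining them might look like this. The helper names, the overwrite mode, and the table name city_bucket are assumptions; using the same column and bucket count on both sides is the simplest way to let the buckets line up.

```python
def write_bucketed(df, table_name, column="ctid", num_buckets=4):
    """Save a PySpark DataFrame as a bucketed, sorted table in the warehouse."""
    (df.write
       .bucketBy(num_buckets, column)   # hash rows on `column` into num_buckets buckets
       .sortBy(column)                  # keep rows sorted within each bucket file
       .mode("overwrite")               # assumption: replace any existing table
       .saveAsTable(table_name))

def join_bucketed_tables(spark, left_table, right_table, column="ctid"):
    """Read two bucketed tables back and join them on the bucketing column."""
    left = spark.table(left_table)
    right = spark.table(right_table)
    return left.join(right, on=column)
```

A typical sequence would be write_bucketed(sales_df, "sales_bucket"), write_bucketed(city_df, "city_bucket"), then join_bucketed_tables(spark, "sales_bucket", "city_bucket"). Checking the plan with explain() afterwards shows whether the shuffle was actually avoided.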
Partition vs bucket reminders:
Partitioning creates a directory per distinct value of the partition column (e.g., city=value); good for low-cardinality columns but not practical for high cardinality (too many folders).
Bucketing uses a fixed number of buckets and stores data within each partition in bucket files, enabling more controlled, hash-based distribution for joins with high cardinality keys.
Practical guidance:
Bucketing is most beneficial when you frequently join large fact tables with large dimension tables on a column with moderate cardinality and where you can predefine a reasonable number of buckets.
Write-time bucketing (creating a bucketed table as part of table creation) is recommended so subsequent loads automatically maintain bucket structure.
Bucketing adds write-time overhead and complexity; weigh benefits against write performance and maintenance costs.
How to decide which strategy to use
Small table vs big table (one big, one small):
Prefer Broadcast Join if the small table fits in memory (under the auto-broadcast threshold). You can enforce via broadcast hints if needed.
If you don’t want automatic broadcasting or data is too large for RAM, consider SHJ or SMJ depending on data characteristics and whether you can afford shuffle cost.
Both tables big: bucketing can help
If you can bucket both datasets on the join key into a reasonable number of buckets, you can reduce the shuffle cost and potentially avoid some cross-partition shuffles by processing the join bucket by bucket.
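The guidance above can be restated as a small decision helper. This is only a paraphrase of the notes in code form, not a model of Spark's planner; the function name, parameters, and returned labels are all assumptions.

```python
def suggest_join_strategy(smaller_side_bytes,
                          broadcast_threshold_bytes=10 * 1024 * 1024,
                          both_bucketed_on_key=False):
    """Suggest a join approach following the rules of thumb in these notes."""
    # A threshold of -1 means automatic broadcasting is disabled
    if 0 <= smaller_side_bytes <= broadcast_threshold_bytes:
        return "broadcast"
    # Both sides large but pre-bucketed on the join key: join bucket by bucket
    if both_bucketed_on_key:
        return "bucketed join"
    # Otherwise a shuffle-based join; Spark picks between the two
    return "shuffle hash or sort merge"
```

For example, a 5 MB dimension table with the default threshold suggests a broadcast, while two multi-gigabyte tables fall through to a bucketed or shuffle-based join.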
When to use SQL vs PySpark for joins:
Both ultimately use the same Spark engine; Spark SQL and DataFrame APIs are interchangeable in terms of core join logic.
You can apply the same strategies via SQL hints (e.g., BROADCAST) or DataFrame APIs (e.g., df.join(…), df.write.bucketBy(…)).
Hints and configuration recap:
Broadcast hints in SQL: use a hint such as /*+ BROADCAST(city) */ in your SELECT to force broadcasting of the city table.
PySpark: use from pyspark.sql.functions import broadcast; df2 = broadcast(smallDf); joined = df1.join(df2, "key")
Disable automatic broadcasting: set spark.sql.autoBroadcastJoinThreshold = -1
Enable/disable Sort Merge vs Shuffle Hash: spark.sql.join.preferSortMergeJoin = true|false
AQE can adjust shuffle partitions at runtime to optimize performance.
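For the SQL route, the hint is simply part of the query text. The sketch below builds such a query as a string; the helper name and the table and column names in the usage note are hypothetical, and the tables are assumed to be registered in the session already.

```python
def broadcast_hint_query(large_table, small_table, key):
    """Build a Spark SQL query that asks for small_table to be broadcast."""
    return (
        f"SELECT /*+ BROADCAST({small_table}) */ * "
        f"FROM {large_table} JOIN {small_table} "
        f"ON {large_table}.{key} = {small_table}.{key}"
    )
```

The resulting string can be passed to spark.sql(...), for instance spark.sql(broadcast_hint_query("sales", "city", "ctid")), which corresponds to the DataFrame form with broadcast().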
Practical tips and observations from the demo
Example setup used in the talk:
A 2.6 GB sales file (large) and a very small customer master dataset.
Default behavior after loading often triggers a broadcast join due to the small size of the master data.
You can observe the join plan and the type of join in Spark UI or via df.explain().
Observed behaviors:
Turning off auto-broadcast (spark.sql.autoBroadcastJoinThreshold = -1) causes Spark to skip automatic broadcasting; the join then uses a different strategy (shuffle-based, which can be slower).
Explicitly broadcasting the small dataset with broadcast() results in a no-shuffle join across partitions, and the runtime can be significantly reduced.
Adaptive Query Execution (AQE) can reduce shuffle partitions (e.g., from 200 to about 65) to optimize performance.
When using sort-merge join, Spark performs sorting after shuffling, which adds cost; if you can avoid sorting (e.g., with SHJ or BJ), you may gain speed.
How to verify the join strategy in practice:
Use df.explain() to view the physical plan and see whether SHJ, SMJ, or BJ is used, and whether bucketing is applied.
For bucketing, you can inspect the underlying warehouse directory to confirm bucketed files per partition (e.g., partition0/bucket0, partition0/bucket1, etc.).
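One way to check the strategy programmatically is to capture the text that explain() prints and look for the physical operator names. This is a sketch under the assumption that PySpark's explain() writes the plan to standard output; the helper names and the label mapping are made up for the example.

```python
import io
from contextlib import redirect_stdout

# Physical plan operator names mapped to the strategy names used in these notes
JOIN_OPERATORS = {
    "BroadcastHashJoin": "Broadcast Join",
    "SortMergeJoin": "Sort Merge Join",
    "ShuffledHashJoin": "Shuffle Hash Join",
}

def plan_text(df):
    """Capture what df.explain() prints for a PySpark DataFrame."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()

def detect_join_strategies(text):
    """Return the join strategies whose operator names appear in a plan string."""
    return [label for operator, label in JOIN_OPERATORS.items() if operator in text]
```

Running detect_join_strategies(plan_text(joined_df)) before and after changing spark.sql.autoBroadcastJoinThreshold makes the switch in strategy easy to see without reading the whole plan.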
Summary of practical implications
Use cases at a glance:
Small-dimension/large-fact joins: prefer Broadcast Join if the dimension truly fits in memory; otherwise, let Spark pick SHJ or SMJ based on configuration and data.
Large-dimension joins with a measurable degree of skew or high cardinality: consider bucketing to control shuffle and improve join locality.
When write performance matters (loads, ETL pipelines): weigh bucketing against the extra write-time overhead; pre-bucketing at table creation can help long-term performance.
Final tip: always validate with an action (e.g., write or show) to trigger execution and observe actual performance, and use explain() to understand the plan and confirm the chosen join strategy.
Quick reference
Broadcast threshold (default): spark.sql.autoBroadcastJoinThreshold = 10 MB
Maximum threshold (manual adjustment): set spark.sql.autoBroadcastJoinThreshold to a larger value
Disable automatic broadcast: set spark.sql.autoBroadcastJoinThreshold = -1
Join strategies (summary):
Shuffle Hash Join: shuffle data by join keys, build in-memory hash on smaller side, probe with larger side, no sort.
Sort Merge Join: shuffle by keys, sort both sides, then join.
Broadcast Join: replicate smaller table across executors, no shuffle for large table.
Bucketing relationship (conceptual):
Bucketing by column C into B buckets partitions data into B files per partition; join then processes per bucket across partitions to reduce cross-partition shuffles.