Distributed Query Planning
Scaling Queries Across Multiple Machines
When data grows beyond one machine, queries must be fragmented, distributed, and reassembled efficiently.
What is "Shuffle"?
Shuffle is moving data between nodes to ensure related records end up on the same machine for processing. It's essentially hash partitioning over the network.
Local Hash Partitioning
hash(user_id) % 100 → Local file 7
All user_42 data → Same local file
Process file 7 in RAM
Distributed Shuffle
hash(user_id) % 100 → Network → Node 7
All user_42 data → Same remote node
Process on Node 7
Key Point: Same algorithm, different destination (local disk vs remote node)
Connection to Hash Partitioning: Shuffle is the network version of the hash partitioning algorithm we learned earlier. Instead of writing `hash(key) % B` to local files, we send `hash(key) % nodes` over the network. Same concept, distributed execution! (Review: Hash Partitioning shows the local version.)
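To make the parallel concrete, here is a minimal Python sketch (not tied to any particular engine) of both destinations. The names `local_partition`, `shuffle_destination`, `NUM_PARTITIONS`, and `NUM_NODES` are hypothetical; the point is that both paths use the same hash-and-modulo step.

```python
# A minimal sketch: same hash, different destination (local file vs remote node).

NUM_PARTITIONS = 100   # local spill files (B) -- illustrative value
NUM_NODES = 8          # cluster size -- illustrative value

def local_partition(record: dict) -> int:
    """Local version: hash(key) % B picks a spill file on this machine."""
    return hash(record["user_id"]) % NUM_PARTITIONS

def shuffle_destination(record: dict) -> int:
    """Distributed version: hash(key) % nodes picks the node to send to."""
    return hash(record["user_id"]) % NUM_NODES

record = {"user_id": 42, "artist_id": 7}
print("local file:", local_partition(record))       # written to a local spill file
print("target node:", shuffle_destination(record))  # sent over the network to a node
```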
The Journey: One Query, Many Nodes
Core Concepts of Distributed Planning
Fragmentation
The optimizer splits the logical plan into independent fragments. Each node executes the same code but on its own slice of data.
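As a rough illustration of "same code, different slice," the sketch below runs a hypothetical `fragment` function over two made-up node partitions; in a real engine these calls would execute in parallel on separate machines.

```python
# A minimal sketch of fragment execution: every node runs the identical
# fragment code, but only over its own partition of the data.
# The partition contents and fragment logic are hypothetical.

def fragment(rows):
    """One plan fragment: filter then project, applied to a local slice."""
    return [(r["artist_id"], r["ms_played"]) for r in rows if r["ms_played"] > 30_000]

partitions = {
    "node_0": [{"artist_id": 1, "ms_played": 45_000}, {"artist_id": 2, "ms_played": 10_000}],
    "node_1": [{"artist_id": 1, "ms_played": 60_000}],
}

# In a real cluster these calls happen in parallel on separate machines.
partial_results = {node: fragment(rows) for node, rows in partitions.items()}
print(partial_results)  # each node produces results for its own slice only
```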
The Shuffle: Moving Data Efficiently
Shuffle is the network version of a hash partition. When two records need to be processed together (e.g., for a Join or a Group By) but live on different nodes, we must move them.
Shuffle Strategies
| Strategy | Mechanism | Best For | Network Cost |
|---|---|---|---|
| Broadcast Shuffle | Send a copy of the entire table to every node. | Joining a tiny table with a massive one. | Small Table Size × # Nodes |
| Hash Shuffle | Redistribute both tables by a common hash key. | Joining two large tables. | Total Size of Both Tables |
| Co-located (Zero Shuffle) | No movement; data is already partitioned correctly. | Tables pre-partitioned on the same key. | Zero Network I/O |
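Using the cost column above, a planner can estimate which strategy moves fewer bytes. The sketch below is a simplified cost comparison with made-up table sizes and helper names (`broadcast_cost`, `hash_shuffle_cost`); it is not any real optimizer's logic.

```python
# A rough cost sketch based on the table above: compare the bytes each
# strategy would put on the network, then pick the cheaper one.

def broadcast_cost(small_table_bytes: int, num_nodes: int) -> int:
    # Broadcast: every node receives a full copy of the small table.
    return small_table_bytes * num_nodes

def hash_shuffle_cost(left_bytes: int, right_bytes: int) -> int:
    # Hash shuffle: both tables are redistributed across the cluster.
    return left_bytes + right_bytes

left, right, nodes = 500 * 10**9, 2 * 10**6, 100   # 500 GB fact table, 2 MB dimension table

if broadcast_cost(right, nodes) < hash_shuffle_cost(left, right):
    plan = "broadcast the small table"
else:
    plan = "hash-shuffle both tables"
print(plan)  # -> "broadcast the small table"
```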
Reaggregation Patterns: The Two-Phase Execution
Problem: Attempting to group data globally across 1,000 nodes would overwhelm the network with billions of tiny records.
Phase 1: Local Pre-Aggregation
Each node performs the grouping and aggregation on its local data first.
- Input: 1,000,000 "listen" rows per node.
- Output: 1,000 "artist totals" per node.
- Impact: Reduces data size by 1,000x before it ever hits the network.
Phase 2: Global Merge
The partial results are shuffled by the grouping key (e.g., artist_id) to a coordinator node.
- Action: Coordinator sums the partial counts from all nodes to get the true total.
- Why: Ensures correctness while minimizing network congestion.
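Putting the two phases together, here is a minimal Python sketch: the hypothetical `local_preaggregate` stands in for Phase 1 on each node, and `global_merge` stands in for the coordinator's Phase 2. The data and function names are illustrative only.

```python
# A minimal sketch of two-phase aggregation.
# Phase 1 runs on every node; Phase 2 merges the partials on a coordinator.
from collections import Counter

def local_preaggregate(rows):
    """Phase 1: collapse this node's rows into per-artist partial counts."""
    counts = Counter()
    for r in rows:
        counts[r["artist_id"]] += 1
    return counts  # e.g. 1,000,000 rows -> ~1,000 partial totals

def global_merge(partials):
    """Phase 2: sum the partial counts shuffled in from every node."""
    total = Counter()
    for p in partials:
        total.update(p)
    return total

node_rows = [
    [{"artist_id": "a1"}, {"artist_id": "a1"}, {"artist_id": "a2"}],  # node 0
    [{"artist_id": "a1"}, {"artist_id": "a2"}, {"artist_id": "a2"}],  # node 1
]
partials = [local_preaggregate(rows) for rows in node_rows]  # runs per node, in parallel
print(global_merge(partials))  # Counter({'a1': 3, 'a2': 3})
```

Only the small per-key partials cross the network, which is why the pattern scales: the expensive reduction happens before the shuffle, not after.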
Optional: Real Systems Comparison
| System | Architecture | Key Optimization | Shuffle Strategy |
|---|---|---|---|
| Apache Spark | In-memory RDDs | Catalyst optimizer + code gen | Hash & broadcast joins |
| BigQuery (Dremel) | Columnar tree | Hierarchical aggregation | Minimize data movement |
| Snowflake | Storage/compute separation | Micro-partitions + caching | Automatic clustering |
| Presto/Trino | Pipelined execution | Dynamic filtering | Statistics-driven |