Distributed Query Planning

Scaling Queries Across Multiple Machines

When data grows beyond one machine, queries must be fragmented, distributed, and reassembled efficiently.

100 TB of data: too big for one node
1,000 nodes: parallel execution
Shuffle: data movement between nodes
100x faster: versus a single node

What is "Shuffle"?

Shuffle is moving data between nodes to ensure related records end up on the same machine for processing. It's essentially hash partitioning over the network.

Local Hash Partitioning

hash(user_id) % 100 → Local file 7
All user_42 data → Same local file
Process file 7 in RAM

Distributed Shuffle

hash(user_id) % 100 → Network → Node 7
All user_42 data → Same remote node
Process on Node 7

Key Point: Same algorithm, different destination (local disk vs remote node)

Connection to Hash Partitioning: Shuffle is the network version of the hash partitioning algorithm you learned earlier. Instead of writing hash(key) % B to local files, we send hash(key) % nodes over the network. Same concept, distributed execution!
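
To see that it really is the same algorithm, here is a minimal Python sketch (illustrative only; the bucket counts and field names are assumptions) where the only difference between local partitioning and a shuffle is what the bucket index points at:

import hashlib

def bucket(key: str, num_buckets: int) -> int:
    # Stable hash of the key, reduced modulo the bucket/node count.
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % num_buckets

def local_partition(record: dict, num_files: int = 100) -> int:
    # Local hash partitioning: choose which spill file on this machine gets the record.
    return bucket(record["user_id"], num_files)

def shuffle_partition(record: dict, num_nodes: int = 100) -> int:
    # Distributed shuffle: choose which remote node receives the record.
    return bucket(record["user_id"], num_nodes)

row = {"user_id": "user_42", "song_id": "s1"}
# Same hash, same modulus -- only the destination differs (local file vs. remote node).
assert local_partition(row) == shuffle_partition(row)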

Review: Hash Partitioning shows the local version


The Journey: One Query, Many Nodes

Distributed Query Execution: Spotify Top Artists

Original query (single-node view):

SELECT artist, COUNT(*) AS plays
FROM listens l JOIN songs s ON ...
WHERE timestamp > '2024-01-01'
GROUP BY artist
ORDER BY plays DESC

Fragment: the data is hash partitioned across nodes by song_id (Node 0, Node 1, Node 2, ..., Node 999).

Phase 1: Local execution (in parallel on each node). Every node runs the same fragment against its own partition:

SELECT artist, COUNT(*)
FROM local_listens l JOIN local_songs s ON ...
WHERE timestamp > '2024-01-01'
GROUP BY artist
→ partial results on each node

Phase 2: Shuffle (hash partition by artist). Each node sends its partial results to coordinator nodes using hash(artist_name) % 1000, so the same artist always lands on the same coordinator.

Phase 3: Global aggregation (merge results). Each coordinator handles a subset of artists and sums the partial counts from all nodes, e.g. "Taylor Swift" counts from all nodes → 3.5M total, "Drake" counts from all nodes → 3.2M total.

Final: sort and return the top 10:
1. Taylor Swift - 3.5M plays
2. Drake - 3.2M plays
3. Bad Bunny - 2.9M plays
...

Core Concepts

Query Fragmentation

Each node runs the same query fragment on its own data partition; the partial results are then combined (sketched below).
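
The whole journey can be simulated in a few lines. Here is a toy, in-memory version of the three phases for the Spotify example above (plain Python lists stand in for nodes; all data and names are made up for illustration):

from collections import Counter, defaultdict

# Toy per-node partitions of the listens table (already hash partitioned upstream).
node_partitions = [
    [{"artist": "Taylor Swift"}, {"artist": "Drake"}, {"artist": "Taylor Swift"}],
    [{"artist": "Drake"}, {"artist": "Bad Bunny"}],
    [{"artist": "Taylor Swift"}, {"artist": "Bad Bunny"}, {"artist": "Drake"}],
]
NUM_COORDINATORS = 2

# Phase 1: local execution -- each node aggregates only its own partition.
local_results = [Counter(row["artist"] for row in part) for part in node_partitions]

# Phase 2: shuffle -- route each partial count to a coordinator by hash(artist).
coordinator_inbox = defaultdict(list)
for partial in local_results:
    for artist, plays in partial.items():
        coordinator_inbox[hash(artist) % NUM_COORDINATORS].append((artist, plays))

# Phase 3: global aggregation -- each coordinator sums the partials it received.
global_counts = Counter()
for inbox in coordinator_inbox.values():
    for artist, plays in inbox:
        global_counts[artist] += plays

print(global_counts.most_common(3))   # top artists by total play count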

Data Partitioning Strategies

Hash Partition

hash(user_id) % 1000 → Node 42
Best for: Equality joins. Example: GROUP BY user_id

Range Partition

A-F → Node 0, G-M → Node 1
Best for: Range queries. Example: WHERE name BETWEEN 'A' AND 'F'

Broadcast

Small table → Copy to all nodes
Best for: Small lookup tables. Example: Countries (200 rows)

Round Robin

Row 1 → Node 0, Row 2 → Node 1, ...
Best for: Load balancing. Example: Even data distribution
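
For concreteness, here is a compact Python sketch of how a router might assign rows under each of the four strategies (the node count, range boundaries, and key names are illustrative assumptions):

from itertools import count

NUM_NODES = 4

def hash_partition(row: dict, key: str = "user_id") -> int:
    # Same key value always maps to the same node -> good for equality joins and GROUP BY.
    return hash(row[key]) % NUM_NODES

def range_partition(row: dict, key: str = "name", boundaries=("F", "M", "S")) -> int:
    # Route by where the key falls among sorted boundaries -> good for range queries.
    first = row[key][:1].upper()
    for node, upper in enumerate(boundaries):
        if first <= upper:
            return node
    return len(boundaries)

_next_row = count()
def round_robin_partition(_row: dict) -> int:
    # Ignore the row contents entirely -> evenly balances load across nodes.
    return next(_next_row) % NUM_NODES

def broadcast(small_table: list) -> dict:
    # Copy the whole (small) table to every node instead of splitting it.
    return {node: list(small_table) for node in range(NUM_NODES)}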

Shuffle Patterns: Moving Data Efficiently

Broadcast Shuffle

Small table → Copy to all nodes
Large table → Stays put
When: Small × Large joins. Cost: Small table size × nodes. Example: Countries JOIN Users

Hash Shuffle

Both tables → Shuffle by key
Same key → Same node
When: Large × Large joins. Cost: Both table sizes. Example: Users JOIN Orders

Co-located (No Shuffle)

Already partitioned same way
No network transfer needed
When: Pre-partitioned data. Cost: Zero network I/O. Example: Both tables partitioned by user_id
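
A planner typically chooses among these three patterns from table sizes and partitioning metadata. The sketch below is an illustrative decision rule, not any real system's optimizer; the 10 MB broadcast threshold and field names are assumptions:

from dataclasses import dataclass
from typing import Optional

NUM_NODES = 1000
BROADCAST_LIMIT = 10 * 1024 * 1024   # assumed ~10 MB broadcast threshold

@dataclass
class Table:
    name: str
    size_bytes: int
    partition_key: Optional[str] = None   # key the table is already partitioned on, if known

def choose_join_strategy(left: Table, right: Table, join_key: str) -> str:
    # Co-located: both sides already partitioned on the join key -> zero network I/O.
    if left.partition_key == join_key and right.partition_key == join_key:
        return "co-located join (no shuffle)"
    # Broadcast: copy the small side to every node; cost ~ small table size x NUM_NODES.
    small = min(left, right, key=lambda t: t.size_bytes)
    if small.size_bytes <= BROADCAST_LIMIT:
        return f"broadcast {small.name} to all {NUM_NODES} nodes"
    # Hash shuffle: repartition both sides on the join key; cost ~ size(left) + size(right).
    return f"hash-shuffle both tables on {join_key}"

print(choose_join_strategy(Table("countries", 50_000), Table("users", 10**12), "country_id"))
# -> "broadcast countries to all 1000 nodes"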

Two-Phase Aggregation: Minimize Network Traffic

Problem: naively shuffling every raw row for GROUP BY artist across 1,000 nodes (with roughly 10M distinct artists) means billions of rows crossing the network!

Solution: pre-aggregate locally on each node, then shuffle only the partial results.

-- Phase 1: each node aggregates its LOCAL partition first,
--          sending ~10K partial rows instead of ~500K raw rows
SELECT artist, COUNT(*) AS partial_plays FROM local_listens GROUP BY artist;

-- Phase 2: shuffle the partials by hash(artist), then finish the aggregation
-- (shuffled_partials is an illustrative name for the partial rows a node receives)
SELECT artist, SUM(partial_plays) AS plays FROM shuffled_partials GROUP BY artist;
-- Result: roughly 100x less network traffic
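
A quick back-of-the-envelope check of the savings, using the illustrative per-node figures above (assumed numbers, not measurements):

NUM_NODES = 1000
RAW_ROWS_PER_NODE = 500_000       # raw rows each node would shuffle without pre-aggregation
PARTIAL_ROWS_PER_NODE = 10_000    # partial rows per node after the local GROUP BY

rows_naive = NUM_NODES * RAW_ROWS_PER_NODE          # 500,000,000 rows cross the network
rows_two_phase = NUM_NODES * PARTIAL_ROWS_PER_NODE  # 10,000,000 rows cross the network
print(rows_naive // rows_two_phase)  # ~50x fewer rows with these numbers; the byte savings
                                     # are larger still, since (artist, count) pairs are far
                                     # narrower than raw listen rows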

Single vs Distributed Query Planning

Aspect             | Single Node                   | Distributed
Join Algorithms    | Hash, Sort-Merge, Nested Loop | + Broadcast, Shuffle, Co-located
Cost Model         | CPU + Disk I/O                | + Network I/O + Data Skew
Aggregation        | Single pass                   | Two-phase (local + global)
Optimization Goals | Join order, index selection   | + Data placement, shuffle reduction

Real Systems Comparison

System            | Architecture               | Key Optimization              | Shuffle Strategy
Apache Spark      | In-memory RDDs             | Catalyst optimizer + code gen | Hash & broadcast joins
BigQuery (Dremel) | Columnar tree              | Hierarchical aggregation      | Minimize data movement
Snowflake         | Storage/compute separation | Micro-partitions + caching    | Automatic clustering
Presto/Trino      | Pipelined execution        | Dynamic filtering             | Statistics-driven

Performance Problems & Solutions

Data Skew

Problem: One node gets 90% of the data. Example: A celebrity user with 10M tweets. Solution: Pre-split hot keys, use sampling to find them (see the sketch below).
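
One common way to pre-split a hot key is "salting": append a small suffix so the hot key's rows fan out across several nodes, then merge the per-suffix partials in one extra step. A minimal sketch, with an assumed hot-key list and salt factor:

import random

NUM_NODES = 1000
HOT_KEYS = {"celebrity_user"}   # would normally be discovered by sampling the data
SALT_FACTOR = 16                # spread each hot key across 16 nodes (assumed)

def shuffle_target(key: str) -> int:
    if key in HOT_KEYS:
        # Hot key: append a random salt so its rows fan out to several nodes.
        salted = f"{key}#{random.randrange(SALT_FACTOR)}"
        return hash(salted) % NUM_NODES
    # Normal key: plain hash partitioning.
    return hash(key) % NUM_NODES

# Downstream, the partial aggregates for "celebrity_user#0" .. "celebrity_user#15"
# are summed in one extra (cheap) merge step to recover the true total.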

Network Bottleneck

Problem: Shuffle takes longer than compute. Example: Joining two 1TB tables. Solution: Co-locate data, compression.

Stragglers

Problem: The slowest node delays everyone. Example: A bad disk on one machine. Solution: Speculative execution, retries (see the sketch below).
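
Speculative execution simply races a duplicate attempt against the straggler and takes whichever finishes first. A toy illustration with Python threads (real engines only speculate on tasks that fall far behind their peers; node names here are made up):

from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait
import random
import time

def run_fragment(node: str) -> str:
    # Simulate a query fragment that is slow on a node with a bad disk.
    time.sleep(5.0 if node == "node-with-bad-disk" else random.uniform(0.1, 0.3))
    return f"fragment result from {node}"

with ThreadPoolExecutor() as pool:
    primary = pool.submit(run_fragment, "node-with-bad-disk")    # original, straggling attempt
    backup = pool.submit(run_fragment, "node-healthy-spare")     # speculative duplicate
    done, _ = wait([primary, backup], return_when=FIRST_COMPLETED)
    print(done.pop().result())   # whichever attempt finishes first wins
    # (In this toy version the loser still runs to completion; real engines cancel it.)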

Small Files

Problem: Metadata overhead exceeds the data itself. Example: 1,000 files of 1MB each. Solution: Coalesce into fewer, larger files (see the sketch below).
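
Coalescing is usually just a greedy bin-packing of small files into larger output files. A minimal sketch, assuming a hypothetical 128 MB target size:

from pathlib import Path

TARGET_BYTES = 128 * 1024 * 1024   # assumed target size for each coalesced output file

def plan_coalesce(files: list) -> list:
    # Greedily pack small files into groups of roughly TARGET_BYTES each;
    # every group is then rewritten as one larger file.
    groups, current, current_size = [], [], 0
    for f in sorted(files, key=lambda p: Path(p).stat().st_size):
        size = Path(f).stat().st_size
        if current and current_size + size > TARGET_BYTES:
            groups.append(current)
            current, current_size = [], 0
        current.append(f)
        current_size += size
    if current:
        groups.append(current)
    return groups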

Key Takeaways

  1. Fragment queries - Split into parallel pieces

  2. Minimize shuffle - Co-locate when possible

  3. Two-phase aggregation - Reduce network traffic

  4. Broadcast small tables - Avoid shuffling large data

  5. Monitor skew - Balance data distribution

The Art: Minimizing data movement while maximizing parallelism!