Distributed Query Planning

Concept. A distributed query splits work across many machines by partitioning the input tables, executing local plans on each shard, and shuffling intermediate results so matching keys land together.

Intuition. When Listens is 100 TB and lives on 1,000 machines, no one machine can join it to Users alone. The optimizer hashes both tables on user_id, ships each user's rows to one shard, then runs the same join algorithm locally on every shard.
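The intuition above can be sketched in a few lines of Python. This is a minimal illustration, not how any particular optimizer implements it: the function names, the tiny sample tables, and the choice of zlib.crc32 as the hash are all assumptions made for the example, and the "network" is just an in-memory dictionary of shards.

```python
import zlib
from collections import defaultdict

def shard_of(user_id, num_shards):
    # Stable hash, so every machine computes the same destination shard.
    return zlib.crc32(str(user_id).encode()) % num_shards

def shuffle(rows, num_shards):
    # Group rows by destination shard (user_id is column 0).
    # A real system would send each group over the network.
    shards = defaultdict(list)
    for row in rows:
        shards[shard_of(row[0], num_shards)].append(row)
    return shards

def local_hash_join(users, listens):
    # Build a hash table on Users, then probe it with Listens.
    by_user = defaultdict(list)
    for user in users:
        by_user[user[0]].append(user)
    return [(u[0], u[1], l[1]) for l in listens for u in by_user.get(l[0], [])]

def distributed_join(users, listens, num_shards):
    user_shards = shuffle(users, num_shards)
    listen_shards = shuffle(listens, num_shards)
    result = []
    for shard in range(num_shards):
        # The same join algorithm runs on every shard.
        result.extend(local_hash_join(user_shards.get(shard, []),
                                      listen_shards.get(shard, [])))
    return result

# Hypothetical sample data: (user_id, name) and (user_id, song).
users = [(1, "ana"), (2, "bo"), (3, "cy")]
listens = [(1, "song_a"), (2, "song_b"), (1, "song_c"), (4, "song_d")]
joined = distributed_join(users, listens, num_shards=4)
```

Because both tables are hashed on user_id with the same function, a user's row and all of that user's listens always meet on the same shard, so the per-shard joins together produce the same rows as a single-machine join.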

Scaling Queries Across Multiple Machines

When your data outgrows a single machine, more storage alone is not enough. The query itself must be split into pieces, run in parallel, and its partial results recombined correctly.

  • 100 TB: beyond single-node capacity

  • 1,000 nodes: parallel execution

  • Shuffle: data movement

  • 100x faster: compared to a single node


What is "Shuffle"?

Shuffle moves records between nodes so that all records sharing a key end up on the same machine for processing. It is essentially hash partitioning over the network.

[Figure: Local Hash Partitioning vs. Distributed Shuffle]

Key Point: Same algorithm, different destination. Whether it's local disk or a remote node, the principle remains unchanged.

Connection to Hash Partitioning: Shuffle is network hash partitioning. Instead of writing each record to local file hash(key) % B, the system sends it to node hash(key) % nodes over the network. The logic is the same; only the destination changes.
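A short sketch makes the "same algorithm, different destination" point concrete. The helper names and sample records are hypothetical. The sketch uses zlib.crc32 rather than Python's built-in hash(), because the built-in hash of a string is salted per process by default, and a shuffle only works if every node computes the same destination for a given key.

```python
import zlib

def stable_hash(key):
    # Deterministic across processes and machines.
    return zlib.crc32(repr(key).encode("utf-8"))

def partition(records, key_of, num_targets):
    # One routine for both cases: a "target" is a local bucket file
    # or a remote node, depending on who calls it.
    targets = [[] for _ in range(num_targets)]
    for record in records:
        targets[stable_hash(key_of(record)) % num_targets].append(record)
    return targets

records = [("u1", "song_a"), ("u2", "song_b"), ("u1", "song_c"), ("u3", "song_d")]

# Local hash partitioning: hash(key) % B with B = 4 bucket files.
local_buckets = partition(records, key_of=lambda r: r[0], num_targets=4)

# Distributed shuffle: hash(key) % nodes with 3 nodes.
node_inboxes = partition(records, key_of=lambda r: r[0], num_targets=3)
```

In both calls, every record for a given user lands in exactly one target, which is the property the downstream join or aggregation relies on.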

Review: Hash Partitioning covers the local approach.



Core Concepts of Distributed Planning

Fragmentation

The optimizer dissects the logical plan into independent fragments. Each node runs the same code but on its own data slice.
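As a rough illustration of "same code, own data slice", the sketch below applies one plan fragment to each node's slice. The fragment (a filter plus projection), the node names, and the rows are all invented for the example; a real engine would ship the fragment to each node and run the copies in parallel.

```python
def fragment(rows):
    # Identical plan fragment on every node: keep listens longer than
    # 30 seconds and project (user_id, artist_id).
    return [(user_id, artist_id)
            for user_id, artist_id, seconds in rows
            if seconds > 30]

# Hypothetical slices of (user_id, artist_id, seconds) held by each node.
node_slices = {
    "node-0": [(1, "a1", 200), (2, "a2", 10)],
    "node-1": [(3, "a1", 45)],
    "node-2": [],
}

# Each node runs the fragment on its own slice only.
per_node_output = {node: fragment(rows) for node, rows in node_slices.items()}
```

For a fragment like this one, which looks at one row at a time, the union of the per-node outputs equals the result of running the fragment over the whole table, so no data needs to move. Operators that combine rows (joins, GROUP BY) are where a shuffle becomes necessary.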


The Shuffle: Moving Data Efficiently

A shuffle is hash partitioning over the network. When records that must be processed together (for example, rows with the same join key) sit on different nodes, at least one side has to move.

Shuffle Strategies

Strategy | Mechanism | Best For | Network Cost
Broadcast Shuffle | Send a copy of the entire table to every node. | Joining a tiny table with a massive one. | Small Table Size × # Nodes
Hash Shuffle | Redistribute both tables by a common hash key. | Joining two large tables. | Total Size of Both Tables
Co-located (Zero Shuffle) | No movement; data is already partitioned correctly. | Tables pre-partitioned on the same key. | Zero Network I/O
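The network-cost column can be turned into a small back-of-the-envelope chooser. This is a simplified sketch that uses only the three formulas from the table; the function names and table sizes are assumptions for illustration, and real optimizers also weigh memory limits, skew, and statistics quality.

```python
MB = 10**6
TB = 10**12

def shuffle_costs(left_bytes, right_bytes, num_nodes, co_located=False):
    # Bytes sent over the network under each strategy, per the table above.
    small = min(left_bytes, right_bytes)
    return {
        "broadcast": small * num_nodes,        # Small Table Size x # Nodes
        "hash": left_bytes + right_bytes,      # Total Size of Both Tables
        # Zero movement is only an option if the data is already
        # partitioned on the join key; None marks "not available".
        "co_located": 0 if co_located else None,
    }

def pick_strategy(left_bytes, right_bytes, num_nodes, co_located=False):
    costs = shuffle_costs(left_bytes, right_bytes, num_nodes, co_located)
    available = {name: cost for name, cost in costs.items() if cost is not None}
    return min(available, key=available.get)

# A 10 MB Users table joined to a 100 TB Listens table on 1,000 nodes.
tiny_vs_huge = pick_strategy(10 * MB, 100 * TB, num_nodes=1000)

# Two large tables: 50 TB and 100 TB.
large_vs_large = pick_strategy(50 * TB, 100 * TB, num_nodes=1000)
```

With these numbers, broadcasting the 10 MB table costs 10 MB × 1,000 = 10 GB of traffic, far less than hashing roughly 100 TB, so broadcast wins. For the two large tables, broadcasting 50 TB to 1,000 nodes would cost 50 PB, so the hash shuffle at 150 TB is cheaper.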

Reaggregation Patterns: The Two-Phase Execution

Problem: A global GROUP BY that shuffles every raw row across 1,000 nodes would flood the network with billions of tiny records.

Phase 1: Local Pre-Aggregation

Each node groups and aggregates its local data.

  • Input: 1,000,000 "listen" rows per node.

  • Output: 1,000 "artist totals" per node.

  • Impact: Shrinks data by 1,000x before network transfer.

Phase 2: Global Merge

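Partial results are shuffled by the grouping key (e.g., artist_id), so that all partial counts for one artist arrive at the same node. In the simplest case that node is a single coordinator.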

  • Action: Coordinator aggregates partial counts to derive the true total.

  • Why: Ensures accuracy while reducing network load.
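The two phases can be sketched as follows. This is an illustrative toy, not a specific engine's implementation: the function names, the three-node sample data, and the use of zlib.crc32 for routing are assumptions. Setting num_mergers=1 corresponds to a single coordinator; a larger value spreads the merge across several nodes by artist_id.

```python
import zlib
from collections import Counter

def local_pre_aggregate(listen_rows):
    # Phase 1: each node emits one partial count per artist it has seen.
    return Counter(artist_id for _user_id, artist_id in listen_rows)

def global_merge(partials_per_node, num_mergers=1):
    # Phase 2: route each partial by artist_id, then sum the partials.
    mergers = [Counter() for _ in range(num_mergers)]
    for partial in partials_per_node:
        for artist_id, count in partial.items():
            target = zlib.crc32(artist_id.encode()) % num_mergers
            mergers[target][artist_id] += count
    # Each artist lives on exactly one merger, so combining is a plain union.
    totals = Counter()
    for merged in mergers:
        totals.update(merged)
    return totals

# Hypothetical (user_id, artist_id) listen rows held by three nodes.
node_rows = [
    [(1, "a1"), (2, "a1"), (3, "a2")],
    [(4, "a1"), (5, "a3")],
    [(6, "a2"), (7, "a2")],
]

partials = [local_pre_aggregate(rows) for rows in node_rows]
totals = global_merge(partials, num_mergers=2)

raw_rows = sum(len(rows) for rows in node_rows)      # rows without pre-aggregation
rows_shuffled = sum(len(p) for p in partials)        # rows actually sent
```

Here 7 raw rows shrink to 5 partial rows before the shuffle; at the scale in the example above (1,000,000 rows down to 1,000 per node) the saving is 1,000x. The trick works directly for counts and sums because partial results can simply be added; an average would need each node to send both a sum and a count.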


Optional: Real Systems Comparison

System | Architecture | Key Optimization | Shuffle Strategy
Apache Spark | In-memory RDDs | Catalyst optimizer + code gen | Hash & broadcast joins
BigQuery (Dremel) | Columnar tree | Hierarchical aggregation | Minimize data movement
Snowflake | Storage/compute separation | Micro-partitions + caching | Automatic clustering
Presto/Trino | Pipelined execution | Dynamic filtering | Statistics-driven