Databases

Sharding & Partitioning

Your single PostgreSQL is drowning under 50K writes/sec. Read replicas don't help — writes still hit one machine. The fix: split the data itself across multiple databases. Instagram does it. Discord does it. Vitess does it for YouTube. Here's exactly how, with real math, real commands, and real trade-offs.

Section 1

TL;DR — The One-Minute Version

Mental Model: Imagine a library with 10 million books crammed into a single building. The hallways are jammed, the checkout line wraps around the block, and the librarian can't shelve returns fast enough. The fix isn't a bigger building. It's splitting the collection across 10 buildings, each holding one slice of the alphabet: authors A–C in the first, D–F in the second, and so on. Every building has its own checkout desk, its own shelving crew, its own capacity. That's sharding. Each building is a shard: an independent database holding a slice of the total data.

Sharding: Splitting a database's rows across multiple independent database servers (called "shards"), where each shard holds a subset of the data. Each shard is a fully functional database that can handle reads AND writes independently. The key difference from read replicas: replicas copy ALL data and only handle reads. Shards each hold DIFFERENT data and handle both reads and writes.

Here's the core idea in one sentence: when a single database can't handle the write load no matter how big you make it, you split the data across multiple databases so each one handles a fraction of the traffic. Read replicas solve read bottlenecks. Sharding solves write bottlenecks. That's the distinction that matters.

Instagram shards PostgreSQL by user_id: 1 billion users across 64 shards, about 15 million users per shard. Discord shards messages by (channel_id, time_bucket) across thousands of Cassandra nodes. Shopify shards MySQL through Vitess, with each merchant's data living on a single shard. These aren't theoretical examples. They're running right now, serving billions of requests per day.

Instagram: Shards PostgreSQL by user_id. Each shard is a separate PostgreSQL instance holding roughly 10–15 million users. The shard ID is embedded directly into their Snowflake-style IDs, so given any photo or comment ID, they can extract which shard it lives on without a lookup. As of their 2012 engineering blog post, they used about 64 logical shards mapped across fewer physical servers.

Discord: Shards messages in Cassandra by (channel_id, bucket), where a bucket represents 10 days of messages. Old buckets get archived to cold storage. At scale, they manage ~120 billion messages across thousands of Cassandra nodes. In 2022, they migrated from Cassandra to ScyllaDB for better tail latency.

Shopify: Uses Vitess (originally built by YouTube) to shard MySQL. Each merchant's data lives on a single shard, so all queries for one store hit one shard. They manage thousands of shards across their fleet. The Vitess proxy (vtgate) routes queries transparently, so the application code doesn't know sharding exists.
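The idea of embedding the shard ID in every object ID can be sketched in a few lines. This is a minimal illustration, not Instagram's actual code: the bit widths (13 bits of shard ID, 10 bits of sequence), the custom epoch, and the function names are all assumptions chosen for the example.

```python
# Hypothetical 64-bit layout (assumed for illustration):
#   [ milliseconds since a custom epoch | 13-bit shard ID | 10-bit sequence ]
SHARD_BITS = 13
SEQ_BITS = 10
CUSTOM_EPOCH_MS = 1_300_000_000_000  # assumed epoch, not from the source


def make_id(timestamp_ms: int, shard_id: int, sequence: int) -> int:
    """Pack time, shard, and a per-shard sequence number into one integer."""
    if not 0 <= shard_id < (1 << SHARD_BITS):
        raise ValueError("shard_id does not fit in SHARD_BITS")
    elapsed = timestamp_ms - CUSTOM_EPOCH_MS
    seq = sequence % (1 << SEQ_BITS)
    return (elapsed << (SHARD_BITS + SEQ_BITS)) | (shard_id << SEQ_BITS) | seq


def shard_of(object_id: int) -> int:
    """Recover the shard ID from an object ID with no directory lookup."""
    return (object_id >> SEQ_BITS) & ((1 << SHARD_BITS) - 1)


photo_id = make_id(timestamp_ms=1_700_000_000_000, shard_id=42, sequence=7)
```

Because the time bits sit at the top, IDs generated this way still sort roughly by creation time, and `shard_of(photo_id)` tells the router where the row lives.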

[Diagram: Before, three app servers write to a single PostgreSQL at 50K writes/sec (CPU 100%, write bottleneck). After, a shard router spreads the load over 4 shards (Users A–F, G–M, N–S, T–Z) at 12.5K writes/sec each. 50K writes ÷ 4 shards = 12.5K writes per shard, so each shard is comfortable. Need more capacity? Add shard 4, 5, 6, and so on. Write throughput scales linearly.]
Real-world sharding in one command. In Citus (PostgreSQL extension): run SELECT create_distributed_table('orders', 'user_id'); and your orders table is now sharded across worker nodes. In MongoDB: sh.shardCollection("mydb.orders", { "user_id": "hashed" }). In Vitess: define a VSchema and apply it with vtctl ApplyVSchema. Sharding isn't just a concept. It's a real operation you can run today.
Section 2

The Scenario — 500 GB, 50K Writes/Sec, and Nowhere Left to Go

You're the database engineer for a fast-growing e-commerce platform. Your single PostgreSQL instance holds 500 GB of data across three main tables: users (80 million rows), orders (600 million rows), and products (2 million rows). You've already done everything the "Scaling Databases" playbook told you to do: scaled vertically to the largest instance, added read replicas, set up connection pooling, and optimized your queries.

PostgreSQL: A free, open-source relational database used by Instagram, Stripe, Reddit, Notion, and thousands of other companies. You can spin one up right now: docker run -e POSTGRES_PASSWORD=test -p 5432:5432 postgres:16. It stores data in 8 KB pages, supports streaming replication, and can handle tens of thousands of queries per second on modern hardware.

And yet your database is still dying. Here's why: the bottleneck isn't reads anymore — it's writes. Your platform processes 50,000 order writes per second during peak hours (flash sales, holiday events). Every single one of those writes — INSERT, UPDATE, DELETE — must go through the single primary database. Read replicas don't help with writes. A bigger machine doesn't exist. You're stuck.

[Diagram: Your Current Setup: Reads Are Fine, Writes Are the Problem. The primary (db.r6g.16xlarge, 64 vCPUs, 512 GB RAM, 500 GB data) takes all 50K writes/sec and sits at 98% CPU. Three read replicas serve 50K reads/sec each. Reads: solved (replicas). Writes: unsolvable with the current architecture.]
Think First

You have one database handling 50K writes/sec. You can't buy a bigger machine. Read replicas only help with reads. What's the only option left? Hint: if one kitchen can't cook 500 meals/hour, and you can't make the kitchen bigger, what do you do?

You open a second kitchen. And a third. Each one cooks a portion of the orders.

This is the exact moment where every fast-growing company reaches the same conclusion. Instagram hit it when their single PostgreSQL couldn't handle the write volume from 100 million users liking and commenting simultaneously. Discord hit it when message writes across millions of channels overwhelmed a single Cassandra cluster. The answer is always the same: stop trying to push all writes through one machine. Split the data.

Section 3

The First Attempt — "Just Buy a Bigger Server" and "Add More Replicas"

Before we reach the breakthrough, let's trace the two obvious ideas that don't work — because understanding why they fail makes the real solution click instantly.

Idea 1: Vertical Scaling (Bigger Machine)

You're already on the biggest db.r6g instance AWS sells for RDS: the db.r6g.16xlarge with 64 vCPUs and 512 GB RAM. There is no db.r6g.32xlarge. You've hit the ceiling. Even if a bigger machine existed, the gains are sub-linear: doubling cores doesn't double write throughput because of PostgreSQL's internal lock contention on shared memory.

Lock contention: When multiple database processes try to modify the same data structure at the same time, they have to wait for each other. PostgreSQL uses lightweight locks (LWLocks) and heavyweight locks to coordinate. More cores means more processes competing for these locks, which means more time spent waiting instead of working. This is why 64 cores don't give you 16x the write throughput of 4 cores. You might get 8–10x at best.

Instance | vCPUs | RAM | Max writes/sec | Cost/mo | Verdict
db.r6g.4xlarge | 16 | 128 GB | ~15K | $1,800 | Not close
db.r6g.8xlarge | 32 | 256 GB | ~28K | $3,600 | Still short
db.r6g.16xlarge | 64 | 512 GB | ~45K | $7,200 | Barely, no headroom
db.r6g.32xlarge? | 128 | 1 TB | ~70K (theory) | $14K+ | Doesn't exist

Even at the maximum, you're running at 98% capacity with zero headroom for traffic spikes. And your traffic is growing. In 3 months, you'll need 80K writes/sec. No single machine on Earth can handle that for a PostgreSQL process.

Think First

You doubled your RDS instance from 32 to 64 vCPUs. Write throughput went from 28K to 45K — a 1.6x improvement for a 2x cost increase. If you could somehow get 128 vCPUs, what write throughput would you predict? Linear (90K)? Or something worse?

Worse. Lock contention grows super-linearly with cores. You'd likely see ~55–60K writes/sec — barely a 1.3x improvement for another 2x cost. This is Amdahl's Law: the serial portion (WAL writes, lock manager) limits total speedup no matter how many cores you add.
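To see where a number like that comes from, here is a rough Amdahl's Law sketch that fits a serial fraction to the two measured points above (28K writes/sec at 32 vCPUs, 45K at 64) and extrapolates to 128. Treat it as a back-of-the-envelope model under a strong assumption: the serial fraction stays constant as cores are added, which real lock contention does not respect. The function names are made up for this example.

```python
def amdahl_speedup(serial_fraction: float, cores: int) -> float:
    """Amdahl's Law: speedup over one core when a fixed fraction is serial."""
    return 1.0 / (serial_fraction + (1.0 - serial_fraction) / cores)


def fit_serial_fraction(cores: int, ratio_when_doubled: float) -> float:
    """Solve speedup(2n) / speedup(n) = r for the serial fraction s.

    Rearranging the ratio of the two Amdahl expressions gives
    s * (1 - 1/n - r + r/(2n)) = r/(2n) - 1/n.
    """
    n, r = cores, ratio_when_doubled
    return (r / (2 * n) - 1 / n) / (1 - 1 / n - r + r / (2 * n))


# Measured points from the table: 32 vCPUs -> 28K, 64 vCPUs -> 45K writes/sec.
serial = fit_serial_fraction(32, 45_000 / 28_000)

# Extrapolate from the 64-vCPU measurement to a hypothetical 128-vCPU box.
predicted_128 = 45_000 * amdahl_speedup(serial, 128) / amdahl_speedup(serial, 64)

print(f"fitted serial fraction: {serial:.4f}")
print(f"predicted writes/sec at 128 vCPUs: {predicted_128:,.0f}")
```

A serial fraction of only about 1% is already enough to land the prediction in the mid-60K range instead of 90K. The 55–60K estimate above is lower still because contention itself gets worse as cores are added, which this simple model ignores.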

Idea 2: More Read Replicas

This is the most common mistake engineers make when they first see write bottlenecks. "We added read replicas to handle more reads — can't we add write replicas too?" The answer is no, and understanding why is fundamental to understanding sharding.

A read replica receives changes FROM the primary. It doesn't generate changes. Think of it like a TV broadcast: the studio (primary) creates the content, and TV sets (replicas) display it. Adding more TVs doesn't help the studio produce content faster. Every write still goes through one machine.

Read replica: A copy of your primary database that receives a stream of changes (via WAL shipping or logical replication) and applies them locally. It can serve SELECT queries but CANNOT accept INSERT, UPDATE, or DELETE. All writes must go to the primary, which then streams the changes to all replicas. Adding more replicas gives you more read capacity but zero additional write capacity.

[Diagram: Why Read Replicas Don't Fix Write Bottlenecks. The app server sends all 50K writes/sec to the primary (98% CPU), which feeds a replication stream to Replicas 1–3 (reads only). Adding 10 more replicas = zero extra write capacity. Replicas scale reads. Nothing in the replica model scales writes. That's sharding's job.]
The Dead End

You've exhausted every tool in the single-database playbook: vertical scaling (maxed out), read replicas (writes are the problem), connection pooling (already done), query optimization (already done). The architecture itself is the limit. No amount of tuning can make one machine handle writes it physically cannot process. You need a fundamentally different approach.

Section 4

Where It Breaks — The Single-Writer Math That Forces Your Hand

Let's put real numbers to the pain. This isn't "the database is slow" — this is exact arithmetic that proves a single machine physically cannot keep up.

The Write Math

Each write operation (INSERT, UPDATE, DELETE) in PostgreSQL involves several steps: parse the SQL, acquire row locks, write to the WAL (write-ahead log), update the data page in shared buffers, and eventually flush to disk. On a well-tuned PostgreSQL, a typical write takes about 0.2 ms (200 microseconds) under low load. But as concurrency increases, lock contention adds up.

WAL (Write-Ahead Log): Before PostgreSQL changes any actual data page, it first writes a description of the change to the WAL. This guarantees durability: if the server crashes mid-write, PostgreSQL replays the WAL on startup to recover. The WAL is an append-only file on disk. You can see your WAL location right now: SELECT pg_current_wal_lsn();. Every replica streams this WAL to stay in sync.

At 50,000 writes per second, here's what happens on a single machine:

write-bottleneck-math.txt
# Each write: ~0.2ms base + lock contention overhead at high concurrency
# At 50K writes/sec with contention, effective time per write ≈ 0.8ms

50,000 writes/sec × 0.8 ms/write = 40,000 ms of write work per second
                                   = 40 seconds of work per 1 second of wall time

# PostgreSQL can use multiple cores, but WAL insertion only parallelizes so far.
# A small, fixed set of WAL insert locks guards the log, so backends queue behind them.

# On 64 cores, you can parallelize some work, but:
# - WAL insert: limited by the WAL insert locks, not by core count
# - Lock manager: shared (contention grows with cores)
# - Buffer pool: shared (contention grows with cores)

# Real measured throughput ceiling on db.r6g.16xlarge:
# ~45K simple INSERTs/sec (pgbench, 64 clients)
# ~30K mixed writes with UPDATEs and foreign keys
# ~20K writes with complex transactions (BEGIN...multiple statements...COMMIT)

# Your 50K writes/sec with real transactions?
# You need roughly 1.5x the capacity of the biggest machine that exists.

The WAL insert lock is the key insight. PostgreSQL writes every change to the WAL before confirming the transaction. At low volume, this is instant. At 50K writes/sec, processes start queuing up to write to the WAL. It's like a highway narrowing at a bridge: no matter how many lanes (cores) lead up to it, everyone has to merge into the few lanes that cross.

WAL insert lock: PostgreSQL 9.4+ uses a WAL insert lock array (8 locks by default) instead of a single lock, but at extremely high write rates, these locks still become a bottleneck. The WAL is fundamentally an append-only sequential log, and you can't parallelize sequential writes beyond a certain point. To see processes waiting on this lock right now, run SELECT * FROM pg_stat_activity WHERE wait_event = 'WALInsert'; (the name PostgreSQL 13 and later report; older releases call it wal_insert).

[Diagram: The WAL Bottleneck. Cores 1 through 64 all funnel into the WAL insert lock before the sequential append to the WAL on disk. At 50K writes/sec, processes queue for the lock, so latency spikes and requests time out (pg_stat_activity shows the WAL insert lock waits). Adding more cores doesn't help because you're bottlenecked on a serial operation. This is Amdahl's Law in action: the serial portion limits your total speedup.]
Think First

If a single WAL is the bottleneck, and you can't make one WAL faster, what's the logical next step? Think about it in terms of highways: if one bridge causes a traffic jam, what's the civil engineering solution?

You build more bridges. Each bridge (WAL) handles a fraction of the traffic. That's what sharding does — each shard has its own WAL.
Arpit's 2×2 Matrix

Arpit Bhayani describes four levels of database architecture as a 2×2 grid: No partition + No shard = a monolith (single DB, single server). Partition + No shard = the same server with the data split into chunks (like PostgreSQL table partitioning). No partition + Shard = multiple servers each holding a full copy (read replicas). Partition + Shard = the full distributed setup where data is both split AND spread across machines. We're about to move from the third box to the fourth.

Section 5

The Breakthrough — Split the Data, Multiply the Capacity

The insight is almost embarrassingly simple once you see it. If one database can handle 30K writes/sec, and you need 50K writes/sec, then two databases can handle 60K writes/sec — as long as each one only receives half the writes. The trick is making sure each write goes to the right database.
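The same arithmetic generalizes to a small capacity-planning helper. This is only a sketch: the 70% utilization target is an assumption added here to leave headroom for spikes, and `shards_needed` is a made-up name.

```python
import math


def shards_needed(target_writes_per_sec: int,
                  per_shard_capacity: int,
                  max_utilization: float = 0.7) -> int:
    """Smallest shard count that keeps every shard under max_utilization.

    Assumes writes spread evenly across shards, which depends entirely
    on choosing a good shard key.
    """
    usable_per_shard = per_shard_capacity * max_utilization
    return math.ceil(target_writes_per_sec / usable_per_shard)


print(shards_needed(50_000, 30_000, max_utilization=1.0))  # no headroom at all
print(shards_needed(50_000, 30_000))                       # with 30% headroom
print(shards_needed(80_000, 30_000))                       # the 3-month forecast
```

Two shards are enough on paper, but only if you are willing to run them flat out. With headroom the answer is three today and four once traffic reaches 80K writes/sec.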

Think First

You split 80M users across 4 databases. User 25,000,000 writes an order. How does the application know which of the 4 databases to send that write to? Think about what information you'd need and where that routing logic lives.

You need a function: given a user_id, return a shard number. The simplest: user_id % 4. But range-based works too: IDs 1–20M = Shard 0, 20M–40M = Shard 1, etc. The routing logic lives in a "shard router" — either in your app code, a proxy (Vitess vtgate, Citus coordinator), or the database driver.
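Both routing functions fit in a few lines. The sketch below assumes 4 shards and the range boundaries used in this article; `route_modulo` and `route_range` are illustrative names, not part of any library.

```python
from bisect import bisect_left

NUM_SHARDS = 4

# Inclusive upper bound of each range except the last one:
# 1..20M -> shard 0, 20M+1..40M -> shard 1, 40M+1..60M -> shard 2, rest -> shard 3
RANGE_UPPER_BOUNDS = [20_000_000, 40_000_000, 60_000_000]


def route_modulo(user_id: int) -> int:
    """Modulo routing: cheap and even, but neighbors land on different shards."""
    return user_id % NUM_SHARDS


def route_range(user_id: int) -> int:
    """Range routing: binary search for the first range that contains the ID."""
    return bisect_left(RANGE_UPPER_BOUNDS, user_id)


print(route_modulo(25_000_000), route_range(25_000_000))
```

Note that the two schemes disagree about where user 25,000,000 lives. Whichever function you pick has to be the only one in use, everywhere, for the lifetime of the data.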

This is sharding. You pick a column, say user_id, and use it to decide which database a row lives on. User IDs 1–40M go to Shard 0. User IDs above 40M, up to 80M, go to Shard 1. When a write comes in for user 25,000,000, it goes to Shard 0. When a write comes in for user 65,000,000, it goes to Shard 1. Each shard gets roughly half the traffic, and each shard has its own CPU, its own RAM, its own WAL, its own everything.

Sharding: The practice of horizontally splitting a database's rows across multiple independent database instances (shards). Each shard holds a different subset of the data and operates independently with its own CPU, RAM, WAL, and disk. Unlike replicas (which hold copies of ALL data for read scaling), shards hold DIFFERENT data for write scaling. The word "shard" comes from the MMORPG Ultima Online (1997), where the game world was split into parallel "shards" to handle more players.

The column you pick is called the shard key (sometimes called the partition key). And that choice is the single most important decision in sharding: get it right and everything flows smoothly; get it wrong and you create hotspots that defeat the entire purpose.

Shard key: The column (or combination of columns) used to determine which shard a row belongs to. A good shard key distributes data evenly AND ensures that related data lives on the same shard. Instagram uses user_id (all of a user's data on one shard). Discord uses (channel_id, time_bucket). A bad shard key creates hotspots, with one shard getting far more traffic than others.

[Diagram: The Breakthrough: Each Shard = Independent Database. A single DB with one CPU pool, one WAL, and one lock manager is overloaded at 50K writes/sec. Sharded, Shards 0–3 hold users 1–20M, 20M–40M, 40M–60M, and 60M–80M, each with its own CPU, WAL, locks, and disk, at 12.5K writes/sec each. Near-linear scaling: 4 shards × 30K each = ~110–120K total capacity; not perfectly linear (router overhead, cross-shard queries) but with no ceiling. Examples: Instagram, 1B users ÷ 64 shards = 15.6M/shard, each shard ~500 GB PostgreSQL; Discord, 120B messages across thousands of nodes, sharded by (channel_id, 10-day bucket); Shopify / Vitess, MySQL sharded per merchant, vtgate routes queries transparently. The only way to scale writes beyond one machine: split the data. Vertical scaling has a ceiling. Sharding doesn't.]

But here's the trade-off nobody mentions in the YouTube videos: once you shard, your database isn't "one thing" anymore. It's N separate things. A query that used to scan one table now has to scan N shards. A JOIN between orders and users that used to be instant now requires a scatter-gather across all shards. Transactions that span multiple shards require two-phase commit, which is slow and fragile.

Scatter-gather: A query pattern where the router sends the same query to ALL shards (scatter), waits for all responses, then merges the results (gather). If orders are on shard 0 and users are on shard 3, the JOIN can't happen inside a single database, so the router has to fetch from both shards and combine them in memory. This is O(N) where N is the number of shards, and it's the primary reason sharding makes some queries dramatically slower.

Two-phase commit (2PC): A distributed transaction protocol where a coordinator asks all participants "can you commit?" (Phase 1: Prepare). If ALL say yes, the coordinator sends "commit" (Phase 2: Commit). If ANY says no, the coordinator sends "abort" to everyone. 2PC guarantees atomicity across shards but adds latency (2 round trips instead of 1) and can block if the coordinator crashes between phases.
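Here is what scatter-gather looks like from the router's side, reduced to a toy. The shards are plain in-memory lists standing in for real database connections, and the query is "top N orders by total"; every name and row below is invented for the example.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor

# Stand-ins for four independent databases (hypothetical sample rows).
SHARDS = [
    [{"order_id": 1, "total": 40}, {"order_id": 5, "total": 310}],
    [{"order_id": 2, "total": 95}],
    [{"order_id": 3, "total": 720}, {"order_id": 7, "total": 15}],
    [{"order_id": 4, "total": 260}],
]


def query_shard(rows, limit):
    """What each shard does locally: ORDER BY total DESC LIMIT n."""
    return heapq.nlargest(limit, rows, key=lambda row: row["total"])


def top_orders(limit):
    """Scatter the query to every shard in parallel, then merge the partials."""
    with ThreadPoolExecutor(max_workers=len(SHARDS)) as pool:
        partials = list(pool.map(lambda rows: query_shard(rows, limit), SHARDS))
    merged = [row for partial in partials for row in partial]
    return heapq.nlargest(limit, merged, key=lambda row: row["total"])


print(top_orders(2))
```

Even in this toy version the cost is visible: every shard does work for every query, and the response is only as fast as the slowest shard.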

This is why sharding is called "the weapon of last resort." It gives you unlimited write scaling, but it takes away things you used to get for free: JOINs, transactions, and global queries. Every company that shards wishes they didn't have to. But every company that needs to handle 50K+ writes/sec has no other option.

The Anti-Lesson — Premature Sharding Kills Startups: Don't shard until you MUST. Instagram ran on a single PostgreSQL for their first million users. Discord didn't shard until they had billions of messages. Foursquare sharded MongoDB early (2010) and spent 2 engineers full-time for 6 months fixing shard key issues, hotspots, and cross-shard query performance — time that could have gone to features. Sharding adds permanent operational complexity: on-call engineers need to understand shard routing, every new feature must consider "does this query cross shards?", and migrations that used to be one ALTER TABLE now touch 64 independent databases. If read replicas, caching, and query optimization can keep you alive, stay unsharded.
Section 6

How It Works — 5 Sharding Strategies

You've decided to shard. The next question is: how do you decide which rows go to which shard? There are five main strategies, each with different trade-offs. The right choice depends on your data shape and your query patterns — and getting it wrong means hotspots, scattered queries, or painful migrations.

Range Sharding

The simplest approach: split data into contiguous ranges based on the shard key. Users 1–20M go to Shard 0, users 20M–40M to Shard 1, and so on. This is how you'd organize a library — books A–F on shelf 1, G–M on shelf 2. Easy to understand, easy to implement.

How it works: The router checks the shard key value and maps it to a range. Given user_id = 35,000,000, the router knows that falls in the 20M–40M range, so it sends the query to Shard 1.

range-routing.sql
-- Range sharding logic (conceptual)
-- Shard 0: user_id 1 to 20,000,000
-- Shard 1: user_id 20,000,001 to 40,000,000
-- Shard 2: user_id 40,000,001 to 60,000,000
-- Shard 3: user_id 60,000,001 to 80,000,000

-- Query for user 35M → goes to Shard 1
SELECT * FROM orders WHERE user_id = 35000000;

-- Range scan for users 10M-15M → goes to Shard 0 only (efficient!)
SELECT * FROM orders WHERE user_id BETWEEN 10000000 AND 15000000;

-- In MongoDB, this is automatic with range-based sharding:
-- sh.shardCollection("mydb.orders", { "user_id": 1 })
-- MongoDB's balancer automatically creates chunks and moves them
[Diagram: Range Sharding: Contiguous Ranges. The key space 0–80M is cut at 20M, 40M, and 60M into Shards 0–3. Strengths: range queries are fast, simple to understand, easy to add shards. Weaknesses: new users all hit the last shard (hotspots), and distribution grows uneven over time.]
The Hotspot Problem

With range sharding by user_id, all new users get IDs at the high end. If your newest shard holds users 60M–80M and you're getting 10K new signups/hour, ALL those signups hit Shard 3 while Shards 0–2 never see one. The signups themselves are only about 3 writes/sec, but every order and profile write from those new accounts also lands on Shard 3, on top of its normal traffic. That is how Shard 3 can end up at maybe 15K writes/sec while Shards 0–2 handle just 3K–5K writes/sec each. With hash sharding, those same 10K signups spread evenly: ~2,500/hour per shard. This is the #1 reason range sharding fails in practice for auto-incrementing keys.
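You can reproduce the imbalance with a short simulation: take the 10,000 newest user IDs and count where each routing scheme puts them. This sketch uses MD5 purely as a convenient deterministic hash from the standard library; the boundaries match the 4-shard layout above and the function names are made up.

```python
import hashlib
from bisect import bisect_left
from collections import Counter

NUM_SHARDS = 4
RANGE_UPPER_BOUNDS = [20_000_000, 40_000_000, 60_000_000]


def range_shard(user_id: int) -> int:
    """Range routing: IDs above 60M all belong to the last shard."""
    return bisect_left(RANGE_UPPER_BOUNDS, user_id)


def hash_shard(user_id: int) -> int:
    """Hash routing: a stable hash of the ID, reduced modulo the shard count."""
    digest = hashlib.md5(str(user_id).encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_SHARDS


# The 10,000 most recent signups under an auto-incrementing key.
new_user_ids = range(79_990_001, 80_000_001)

range_counts = Counter(range_shard(uid) for uid in new_user_ids)
hash_counts = Counter(hash_shard(uid) for uid in new_user_ids)

print("range:", dict(range_counts))
print("hash: ", dict(sorted(hash_counts.items())))
```

Range routing sends every one of the new IDs to shard 3, while hash routing gives each shard close to a quarter of them.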

Pros | Cons
Range queries are efficient (scan one shard) | Hotspots on auto-increment keys
Easy to understand and implement | Uneven distribution as data grows
Adding shards is straightforward | New shards get all new traffic

Hash Sharding

Instead of putting user_id 1–20M on Shard 0, you run every user_id through a hash function and use the result to pick a shard: shard = hash(user_id) % num_shards. This distributes data evenly regardless of the key's value. User 1 might go to Shard 3, user 2 to Shard 0, user 3 to Shard 1. It looks random, but it's deterministic: the same user_id always maps to the same shard.

Hash function: A function that takes any input (like a user_id) and produces a fixed-size number that looks random but is deterministic, so the same input always produces the same output. Common hash functions: MD5, SHA-256, MurmurHash, xxHash. For sharding, you typically do hash(key) % num_shards to get a shard number. MurmurHash is popular because it's fast and distributes evenly.

hash-sharding.sql
-- Hash sharding: shard = hash(user_id) % 4

-- In MongoDB (hashed shard key):
-- sh.shardCollection("mydb.orders", { "user_id": "hashed" })
-- MongoDB automatically hashes user_id and distributes chunks evenly

-- In Citus (PostgreSQL):
-- SELECT create_distributed_table('orders', 'user_id');
-- Citus uses a hash function internally, creates 32 shards by default

-- In Vitess (MySQL):
-- VSchema defines the sharding function:
-- { "sharded": true,
--   "vindexes": { "hash": { "type": "hash" } },
--   "tables": { "orders": { "column_vindexes": [
--     { "column": "user_id", "name": "hash" }
--   ]}}
-- }
-- vtctl ApplyVSchema -vschema_file=vschema.json commerce

-- Manual hash routing (pseudocode):
-- shard_id = murmur3_hash(user_id) % 4
-- connection = shard_connections[shard_id]
-- connection.execute("INSERT INTO orders ...")
[Diagram: Hash Sharding: Even Distribution. user_ids 1 through 80,000,000 pass through hash(key) % 4 and land on Shards 0–3 at ~20M each. Why hash beats range for a new user (id = 80,000,001): with range sharding it ALWAYS goes to the last shard; with hash sharding new users spread evenly across all 4. No hotspots. Trade-off: range queries hit ALL shards.]
Pros | Cons
Even distribution, no hotspots from sequential keys | Range queries require scatter-gather (all shards)
Works with any key type (strings, UUIDs, integers) | Adding shards requires rehashing: most data moves (~80% going from 4 to 5 shards)
Most widely used strategy (Instagram, Discord, Citus) | Can't easily rebalance without consistent hashing
The rehashing problem: With 4 shards, hash(key) % 4 puts each key on a specific shard. When you add a 5th shard and switch to % 5, roughly 80% of keys change which shard they belong to. That means moving 80% of your data between shards. This is why consistent hashing exists — we'll cover it in Section 7.

Directory-Based Sharding

Instead of computing the shard from the key (range or hash), you maintain a lookup table that explicitly maps each key (or key range) to a shard. Think of it as a phone book: to find which shard user 42,000,000 is on, you look it up in the directory. This gives you maximum flexibility: you can move any user to any shard at any time by updating one row in the directory.

This is how Vitess works. The VSchema is essentially a directory that tells vtgate "this key maps to this shard." When you need to rebalance, you update the VSchema and Vitess handles the data migration.

Vitess: An open-source database clustering system for horizontal scaling of MySQL. Originally built by YouTube in 2010 to handle their explosive growth. Now used by Shopify, Slack, GitHub, Square, and others. The key components: vtgate (the proxy that routes queries), vttablet (manages each MySQL shard), VSchema (defines which tables are sharded and how). vtctl ApplyVSchema changes routing rules without downtime.

directory-lookup.sql
-- The directory: a separate table (or service) mapping keys to shards
CREATE TABLE shard_directory (
    user_id_start  BIGINT,
    user_id_end    BIGINT,
    shard_id       INT,
    shard_host     VARCHAR(255),
    PRIMARY KEY (user_id_start)
);

-- Example entries:
-- | user_id_start | user_id_end | shard_id | shard_host          |
-- |          1    | 20,000,000  |    0     | shard-0.db.internal |
-- | 20,000,001    | 40,000,000  |    1     | shard-1.db.internal |
-- | 40,000,001    | 55,000,000  |    2     | shard-2.db.internal |
-- | 55,000,001    | 80,000,000  |    3     | shard-3.db.internal |
-- Note: ranges can be UNEQUAL — you can split a hot shard without touching others

-- Routing query:
SELECT shard_host FROM shard_directory
WHERE user_id_start <= 35000000 AND user_id_end >= 35000000;
-- Returns: shard-1.db.internal
Pros | Cons
Maximum flexibility: move any key group to any shard | Directory itself becomes a single point of failure
Unequal shard sizes are easy (split hot shards) | Every query requires a directory lookup (added latency)
Rebalancing is just updating the directory | Directory must be cached and replicated carefully
Think First

Your shard directory (the lookup table mapping keys to shards) is stored in a single PostgreSQL instance. Every query first hits this directory to find the right shard. You're doing 50K queries/sec. What's the problem? How would you solve it?

The directory becomes a single point of failure AND a throughput bottleneck — 50K lookups/sec on one machine. Solutions: (1) cache the directory in each app server's memory (most common), (2) replicate it with read replicas, (3) use Redis as a fast directory cache. The trade-off: cached directories can be stale during rebalancing.
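Option (1), the in-process cache, might look something like the sketch below. It assumes the directory rows have already been loaded from the shard_directory table shown earlier; the class name, the `refresh` method, and the decision to raise KeyError for unmapped IDs are all choices made for this example.

```python
from bisect import bisect_right


class CachedDirectory:
    """In-memory copy of shard_directory, searched with binary search."""

    def __init__(self, rows):
        self.refresh(rows)

    def refresh(self, rows):
        """Replace the cached rows, e.g. after a rebalance changes the table.

        rows: iterable of (user_id_start, user_id_end, shard_host) tuples.
        """
        self._rows = sorted(rows)
        self._starts = [row[0] for row in self._rows]

    def lookup(self, user_id: int) -> str:
        """Return the host for user_id without touching the directory database."""
        index = bisect_right(self._starts, user_id) - 1
        if index < 0:
            raise KeyError(f"no shard range covers user_id {user_id}")
        _start, end, host = self._rows[index]
        if user_id > end:
            raise KeyError(f"no shard range covers user_id {user_id}")
        return host


directory = CachedDirectory([
    (1, 20_000_000, "shard-0.db.internal"),
    (20_000_001, 40_000_000, "shard-1.db.internal"),
    (40_000_001, 55_000_000, "shard-2.db.internal"),
    (55_000_001, 80_000_000, "shard-3.db.internal"),
])
print(directory.lookup(35_000_000))
```

The lookup is now a local binary search instead of a network round trip. What this sketch leaves out is the hard part: deciding when to call `refresh`, and what to do with requests that were routed using a stale copy during a rebalance.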

Geographic Sharding

Shard by physical location: all US users on a shard in us-east-1, all EU users on a shard in eu-west-1, all APAC users in ap-southeast-1. The shard key is the user's region. This strategy isn't just about scaling. It also reduces latency (users talk to nearby servers) and helps with data residency compliance (EU data stays in EU data centers for GDPR).

Data residency: Laws that require user data to be stored in specific geographic locations. The EU's GDPR requires that EU citizen data either stays in the EU or is transferred under strict legal frameworks. Brazil's LGPD, India's DPDP Act, and China's PIPL have similar requirements. Geographic sharding naturally satisfies these laws by keeping each region's data in that region's data center.

Who uses it: Netflix stores user profiles in the nearest region. Uber shards ride data by city. WhatsApp assigns users to the nearest data center. Shopify lets merchants choose their data region.

[Diagram: Geographic Sharding: Data Near Users. US shard (us-east-1): US users, 30M rows, 20 ms from New York, no GDPR concerns, 20K writes/sec. EU shard (eu-west-1): EU users, 25M rows, 15 ms from Frankfurt, GDPR compliant by design, 18K writes/sec. APAC shard (ap-southeast-1): APAC users, 25M rows, 10 ms from Singapore, India/China data residency, 12K writes/sec.]
Pros | Cons
Low latency (data center near users) | Uneven distribution (US/EU may be much larger than APAC)
Natural GDPR/data residency compliance | Cross-region queries are expensive (200ms+ round trip)
Regional failures are isolated | Users who travel may hit the "wrong" shard

Composite Key Sharding

Sometimes a single column isn't enough to shard well. Discord uses (channel_id, bucket) as their shard key. The channel_id groups all messages for a channel together, and the bucket (a 10-day time window) prevents any single channel from growing unbounded. This way, a popular channel with millions of messages is spread across many buckets on different shards, while all messages within a 10-day window for one channel stay on the same shard for fast retrieval.

composite-key-discord.sql
-- Discord's composite shard key: (channel_id, bucket)
-- bucket = message_timestamp / (10 days in ms)
-- This ensures:
--   1. Recent messages for a channel are on the SAME shard (fast reads)
--   2. Old messages "age out" to different shards (cold storage)
--   3. No single shard grows forever

-- Cassandra (what Discord originally used):
CREATE TABLE messages (
    channel_id   BIGINT,
    bucket       INT,          -- epoch_ms / (10 * 86400 * 1000)
    message_id   BIGINT,       -- Snowflake ID (contains timestamp)
    author_id    BIGINT,
    content      TEXT,
    PRIMARY KEY ((channel_id, bucket), message_id)
) WITH CLUSTERING ORDER BY (message_id DESC);

-- The PRIMARY KEY ((channel_id, bucket), ...) means:
-- (channel_id, bucket) is the PARTITION KEY → determines which shard
-- message_id is the CLUSTERING KEY → determines sort order within partition

-- Query: "get latest 50 messages in channel 12345"
-- 1. Compute current bucket: now_ms / (10 * 86400 * 1000)
-- 2. Route to shard holding (12345, current_bucket)
-- 3. Read 50 messages sorted by message_id DESC
-- → Hits exactly ONE shard. Fast.
Why Composite Keys Are Powerful

A single-column shard key forces you to choose: group by user (all user data together, but one celebrity user creates a hotspot) or group by time (even distribution, but a user's data is scattered everywhere). A composite key gives you both: group related data together AND distribute load evenly. Discord gets fast per-channel reads AND even shard distribution — the holy grail of sharding.
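The bucket arithmetic from the CQL comments above is easy to get wrong by a factor of 1,000, so here it is spelled out. This is a sketch of the calculation only: it measures time from the Unix epoch in milliseconds, as the schema comment does, and the helper names are invented.

```python
BUCKET_MS = 10 * 86_400 * 1000  # 10 days in milliseconds


def bucket_for(timestamp_ms: int) -> int:
    """Which 10-day bucket a message timestamp falls into."""
    return timestamp_ms // BUCKET_MS


def partition_key(channel_id: int, timestamp_ms: int) -> tuple:
    """The composite (channel_id, bucket) key that decides the shard."""
    return (channel_id, bucket_for(timestamp_ms))


def buckets_between(start_ms: int, end_ms: int) -> list:
    """Every bucket a time-range query must read, oldest first."""
    return list(range(bucket_for(start_ms), bucket_for(end_ms) + 1))


now_ms = 1_700_000_000_000  # fixed sample timestamp so the output is repeatable
print(partition_key(12345, now_ms))
print(buckets_between(now_ms, now_ms + 25 * 86_400 * 1000))
```

Reading the latest messages touches one partition. A query covering 25 days touches four, which is the "queries spanning multiple buckets hit multiple shards" cost listed in the table below.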

Pros | Cons
Best of both worlds (locality + distribution) | More complex to design and reason about
Prevents single-entity hotspots | Queries spanning multiple buckets hit multiple shards
Natural time-based data lifecycle (archive old buckets) | Requires careful bucket sizing (too small = too many, too large = hotspots)
Think First

You're building a social media platform. Users post content, follow each other, and have a timeline feed. What would you choose as the shard key? Consider: user_id (all of a user's data on one shard), post_id (evenly distributed but a user's posts are scattered), or something else?

Instagram chose user_id because most queries are "show me this user's photos" or "show me this user's profile." If all of a user's data is on one shard, those queries hit exactly one shard. The trade-off: celebrity users create hotspots. Instagram handles this with caching in front of the shard.
Section 7

Going Deeper — The Hard Problems of Sharding

You've split 80M users across 4 shards. Traffic doubles in 6 months and you need 8 shards. With naive hash(key) % 4 becoming % 8, roughly 50% of all keys change shards — that's 40 million rows physically moving between machines. And what about that JOIN you used to run in 5ms? It now scatter-gathers across every shard, ballooning to 80ms. These are the hard problems that separate "I know what sharding is" from "I can operate a sharded system in production."

Think First

You have 4 shards. Keys are routed via hash(key) % 4. Now you add a 5th shard and switch to hash(key) % 5. A key that previously hashed to 17 went to shard 1 (17 % 4 = 1). Where does it go now? What percentage of ALL keys change shards?

17 % 5 = 2, so it moves to shard 2. In general, going from N to N+1 shards with naive modulo moves roughly N/(N+1) of all keys, because a key stays put only when hash % N happens to equal hash % (N+1). From 4 to 5: ~80% of keys move. That's why consistent hashing exists: it moves only ~20%.

With naive hash sharding (hash(key) % N), adding one shard changes where most keys land. Going from 4 shards to 5: hash(key) % 4 becomes hash(key) % 5. A key that hashed to 17 went to shard 1 (17 % 4 = 1) but now goes to shard 2 (17 % 5 = 2). Roughly 80% of keys change shards. That means moving 80% of your data between machines — a migration nightmare.
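The 80% figure is easy to check by brute force. The sketch below assumes hash values are uniformly spread and simply uses the integers 0..n_keys-1 as stand-ins for hash(key); fraction_moved is a hypothetical helper name.

```python
def fraction_moved(old_n: int, new_n: int, n_keys: int) -> float:
    """Fraction of hash values 0..n_keys-1 whose shard changes under hash % N."""
    moved = sum(1 for h in range(n_keys) if h % old_n != h % new_n)
    return moved / n_keys


if __name__ == "__main__":
    print(fraction_moved(4, 5, 20_000))    # 4 shards -> 5 shards
    print(fraction_moved(4, 8, 20_000))    # 4 shards -> 8 shards (doubling)
    print(fraction_moved(64, 65, 41_600))  # 64 shards -> 65 shards
```

Doubling the shard count is the gentlest case for naive modulo (half the keys move), which is one reason double-and-split schemes are popular.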

Consistent hashing fixes this. Instead of % N, imagine a circle (ring) from 0 to 2^32. Both shards and keys are hashed onto this ring. Each key is assigned to the nearest shard clockwise. When you add a new shard, it lands at one point on the ring and only "steals" keys from its immediate neighbor, not from everyone: only the keys between the new shard and its predecessor are affected, so on average just K/N keys move (K = total keys, N = total nodes). The technique was originally described by Karger et al. in 1997 for web caching and is used by DynamoDB, Cassandra, Riak, and Akka Cluster.

Figure: consistent hashing ring before and after adding a shard. Before (S0 to S3): k1→S1, k2→S2, k3→S3. After adding S4: k1→S4, while k2→S2 and k3→S3 stay where they were. Naive % N moves ~80% of keys; consistent hashing moves only ~20%.

The math: with consistent hashing, growing from N to N+1 shards moves only about K/(N+1) keys (where K = total keys), because the new shard takes over just its own share of the ring. With 4 shards and 80M keys, adding a 5th shard moves ~16M keys (20%) instead of ~64M keys (80%). That's a 4x reduction in data movement.

Virtual nodes make it even better. Instead of placing each shard at one point on the ring, you place it at 100–200 points (virtual nodes). This keeps the distribution even with only a few physical shards. Cassandra and Riak use virtual nodes in production, and the original Dynamo design did the same. DynamoDB, Amazon's fully managed NoSQL database, hashes each table's partition key to decide placement and splits partitions automatically as load and data grow, targeting single-digit-millisecond latency at any scale. Its partition count is managed internally and is not reported by aws dynamodb describe-table.
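To make the ring and its virtual nodes concrete, here is a minimal sketch. It is not how any particular database implements it: the HashRing class, the "shard#i" vnode naming, the choice of MD5, and the 128 vnodes per shard are all assumptions made for illustration.

```python
import bisect
import hashlib


def ring_hash(value: str) -> int:
    """Map a string to a point on a 0..2^32-1 ring (MD5 is an arbitrary choice)."""
    return int.from_bytes(hashlib.md5(value.encode()).digest()[:4], "big")


class HashRing:
    def __init__(self, shards, vnodes=128):
        self.vnodes = vnodes
        self._points = []  # sorted ring positions of all virtual nodes
        self._owner = {}   # ring position -> shard name
        for shard in shards:
            self.add(shard)

    def add(self, shard):
        """Place `vnodes` virtual nodes for this shard on the ring."""
        for i in range(self.vnodes):
            point = ring_hash(f"{shard}#{i}")
            if point in self._owner:  # extremely rare collision: keep the existing owner
                continue
            bisect.insort(self._points, point)
            self._owner[point] = shard

    def lookup(self, key: str) -> str:
        """Return the shard owning the first virtual node clockwise from the key."""
        idx = bisect.bisect_right(self._points, ring_hash(key)) % len(self._points)
        return self._owner[self._points[idx]]


def simulate_add_shard(n_keys=20_000):
    """Add a 5th shard to a 4-shard ring; report what fraction of keys moved and where."""
    keys = [f"user:{i}" for i in range(n_keys)]
    ring = HashRing(["S0", "S1", "S2", "S3"])
    before = {k: ring.lookup(k) for k in keys}
    ring.add("S4")
    moved = [k for k in keys if ring.lookup(k) != before[k]]
    destinations = {ring.lookup(k) for k in moved}
    return len(moved) / n_keys, destinations


if __name__ == "__main__":
    fraction, destinations = simulate_add_shard()
    print(f"moved {fraction:.1%} of keys, all to {destinations}")
```

Two properties are worth noticing: roughly a fifth of the keys move, and every key that moves lands on the new shard. No data is shuffled between the existing four.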

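Real command: In Cassandra, virtual nodes (vnodes) are configured in cassandra.yaml: num_tokens: 128. Each physical node gets 128 positions on the ring. When you add a node, Cassandra automatically streams the right data to it. nodetool status shows each node's token count and ownership percentage. nodetool move reassigns a token manually, but it only applies to single-token (non-vnode) nodes; with vnodes, run nodetool cleanup on the existing nodes after the new node has joined so they drop data they no longer own.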

Before sharding, this query was instant:

cross-shard-problem.sql
-- BEFORE SHARDING: One database, one JOIN, ~5ms
SELECT o.order_id, o.total, u.name, u.email
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE o.created_at > '2025-01-01'
ORDER BY o.total DESC
LIMIT 100;

-- AFTER SHARDING: orders and users might be on DIFFERENT shards
-- The router must:
-- 1. Send the query to ALL N shards (scatter)
-- 2. Wait for ALL N responses
-- 3. Merge and sort ALL results in memory (gather)
-- 4. Return top 100

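-- With 64 shards:
-- Best case (single shard, key in WHERE): ~5ms (same as before)
-- Worst case (scatter-gather, no shard key filter):
--   shards queried one after another: ~5ms × 64 = 320ms
--   shards queried in parallel: bounded by the slowest shard,
--   plus merge overhead + network latency per shard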

-- Vitess handles this automatically:
-- vtgate parses the query, sees it needs scatter-gather,
-- fans out to all vttablets, merges results
-- But it's still O(N) — no way around the physics

The key insight: queries that include the shard key are fast (they hit exactly one shard). Queries that don't include the shard key require scatter-gather across all shards. This is why choosing the right shard key is so critical — it determines which queries are fast (single-shard) and which are slow (scatter-gather).
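The scatter and gather steps for an ORDER BY total DESC LIMIT n query can be sketched as follows. This is an in-memory stand-in rather than a real router: each "shard" is just a list of row dicts, and query_shard and scatter_gather are hypothetical names.

```python
import heapq
import itertools
from concurrent.futures import ThreadPoolExecutor


def query_shard(rows, limit):
    """Stand-in for one shard running ORDER BY total DESC LIMIT n locally."""
    return sorted(rows, key=lambda r: r["total"], reverse=True)[:limit]


def scatter_gather(shards, limit):
    """Fan out to every shard in parallel, then merge the pre-sorted partial results."""
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        partials = list(pool.map(lambda rows: query_shard(rows, limit), shards))
    # Each partial list is already sorted descending, so a k-way merge is enough.
    merged = heapq.merge(*partials, key=lambda r: r["total"], reverse=True)
    return list(itertools.islice(merged, limit))
```

Each shard must return its own top n rows, because the router cannot know in advance which shard holds the global winners. The router therefore handles up to n × N rows to produce n.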

Figure: single-shard query vs. scatter-gather on 4 shards. Single-shard: WHERE user_id=35M routes to Shard 1 only (1 shard, ~5ms). Scatter-gather: ORDER BY total fans out to Shards 0 through 3 and merges the results (~80ms). Rule: include the shard key in every query; scatter-gather work is O(N), where N = shard count.

Mitigation strategies: (1) Denormalize: embed user data in the orders table so you don't need JOINs. (2) Maintain a global index: a separate unsharded table for cross-shard lookups. (3) Use Citus reference tables for small lookup tables (countries, categories): they're replicated to every shard so JOINs are local. Citus is a PostgreSQL extension for distributed databases. SELECT create_distributed_table('orders', 'user_id'); shards a table automatically, and SELECT create_reference_table('countries'); replicates a small table to every shard. Citus can also co-locate tables: if orders and users are both sharded by user_id, JOINs between them stay on one shard. Run EXPLAIN to see which shards a query hits.

Your 4-shard cluster has been running for a year. Shard 2 is 80% full while Shard 0 is only 40% full. Or you need to grow from 4 to 8 shards because write volume doubled. Either way, you need to move data between shards without taking the system offline. This is rebalancing, and it's one of the hardest operational challenges in distributed databases.

There are three main approaches:

Strategy | How It Works | Downtime | Used By
Double-and-split | Double shard count (4→8). Each old shard splits into two and streams half its data to the new partner. | Zero (dual-writes during migration) | Vitess, Instagram
Virtual shard migration | Create many virtual shards (e.g., 1024) mapped to few physical nodes. Move virtual shards between nodes. | Zero (move one virtual shard at a time) | DynamoDB, Cassandra
Background copy + cutover | Copy data in background, dual-write during copy, switch routing once caught up. | Brief cutover pause (~seconds) | Custom solutions, Citus
The consistent hashing advantage: With hash(key) % N, going from 4 to 5 shards moves ~80% of data. With consistent hashing, it moves ~20%. With 1024 virtual shards mapped to 4 physical nodes, adding a 5th node just reassigns ~200 virtual shards — and the data streams in the background. This is how DynamoDB scales to millions of requests/sec without downtime.
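The virtual-shard arithmetic can be sketched directly. The code below is an illustration under simple assumptions: the mapping is a plain list where assignment[v] is the physical node owning virtual shard v, and add_node is a hypothetical helper that moves only as many virtual shards as needed to even out the load.

```python
from collections import Counter


def add_node(assignment: list, new_node: int) -> int:
    """Reassign just enough virtual shards to new_node to balance the cluster.

    Returns the number of virtual shards that moved.
    """
    counts = Counter(assignment)
    n_nodes = len(counts) + 1
    base, extra = divmod(len(assignment), n_nodes)
    # Existing nodes keep `base` shards each (+1 for the first `extra` nodes);
    # whatever they hold beyond that goes to the new node.
    target = {node: base + (1 if i < extra else 0)
              for i, node in enumerate(sorted(counts))}
    moved = 0
    for v, node in enumerate(assignment):
        if counts[node] > target[node]:
            counts[node] -= 1
            assignment[v] = new_node
            moved += 1
    return moved


if __name__ == "__main__":
    mapping = [v % 4 for v in range(1024)]  # 1024 virtual shards on 4 nodes
    print(add_node(mapping, 4), "virtual shards moved")
    print(sorted(Counter(mapping).items()))
```

With 1024 virtual shards, adding a fifth node moves 204 of them (about 20%), and the other 820 never leave their current node.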
vitess-rebalance.sh
# Vitess: Split shard "-80" into "-40" and "40-80"
# This is how YouTube/Shopify scale their MySQL clusters

# 1. Create the target shards (empty MySQL instances)
vtctl CreateShard commerce/-40
vtctl CreateShard commerce/40-80

# 2. Start vttablets for new shards
# (launches MySQL + Vitess agent on each)

# 3. Begin the split: Vitess streams data from source to targets.
#    The last argument is <keyspace>.<workflow>; "reshard1" is an example name.
#    (Legacy vtctl syntax; newer releases use vtctldclient.)
vtctl Reshard -- --source_shards='-80' --target_shards='-40,40-80' Create commerce.reshard1
vtctl Reshard -- --source_shards='-80' --target_shards='-40,40-80' SwitchTraffic commerce.reshard1

# 4. During SwitchTraffic:
#    - Reads switch first (safe, idempotent)
#    - Writes switch second (brief pause, usually <1 second)
#    - vtgate automatically routes to new shards

# 5. Clean up the old shard once verified
vtctl Reshard -- --source_shards='-80' --target_shards='-40,40-80' Complete commerce.reshard1

The gold standard for sharded systems is shared-nothing: every shard is a completely independent database with its own CPU, its own RAM, its own disk, and its own network. No node shares any hardware resource with any other node. Contrast this with "shared-disk" (Oracle RAC, where multiple nodes share a SAN) or "shared-memory" (a single multi-core server). Shared-nothing is what PostgreSQL+Citus, Cassandra, CockroachDB, and virtually all modern distributed databases use. This means:

  • Fault isolation: If Shard 2 crashes, Shards 0, 1, and 3 continue serving traffic. Only users on Shard 2 are affected. With 64 shards, a single shard failure impacts roughly 1.5% of users.
  • Linear scaling: Adding a shard adds proportional capacity. No shared bottleneck (like a shared SAN or shared lock manager) limits growth.
  • Independent maintenance: You can upgrade Shard 0 to PostgreSQL 17 while Shard 1 stays on 16. Rolling upgrades, zero downtime.
Figure: shared-nothing vs. shared-disk. Shared-nothing (modern): Shards 0, 1, and 2 each have independent CPU+RAM+Disk; if Shard 1 dies, the others keep running, and scaling is linear with no shared bottleneck. Shared-disk (legacy, Oracle RAC): Nodes 0, 1, and 2 each have their own CPU+RAM but share SAN storage; if the SAN dies, all nodes go down, and SAN bandwidth is the scaling ceiling.
This is why nearly every modern sharded system is shared-nothing. PostgreSQL + Citus, MongoDB, Cassandra, CockroachDB, TiDB, Vitess: all shared-nothing. Oracle RAC is the best-known shared-disk database still in wide production use, and even Oracle now offers a shared-nothing option of its own (Oracle Sharding).
Section 8

Variations — Not All Splitting Is Sharding

Interviews love this question: "what's the difference between partitioning and sharding?" Most candidates get it wrong — and the confusion costs real architectural decisions. You can split a 600M-row table into 4 chunks on the same server (partitioning) or across 4 different servers (sharding). Same split, wildly different trade-offs. Let's untangle them.

Horizontal vs. Vertical Partitioning

Horizontal partitioning splits rows. Table has 80M rows? Split into 4 partitions of 20M rows each. Every partition has the same columns, just different rows. This is what sharding does across servers, but you can also do it within a single server (PostgreSQL table partitioning).

Vertical partitioning splits columns. Your users table has 50 columns, but most queries only need 5 of them (id, name, email, created_at, status). Move the rarely-used 45 columns (address, bio, preferences, avatar_url, etc.) to a separate table. Now the "hot" table fits entirely in RAM, and queries on common columns are faster because each row is smaller.

Figure: horizontal vs. vertical partitioning. Horizontal (split rows): users (80M rows, all columns) becomes Partition 0 (rows 1–20M), Partition 1 (rows 20M–40M), Partition 2 (rows 40M–60M), and so on, each with the same columns. Used for sharding and table partitioning. Vertical (split columns): users (50 columns) becomes users_core (hot: id, name, email, status) and users_profile (cold: id, bio, addr, prefs, avatar), each with the same rows. Used for hot/cold splits and microservices.
Key distinction: Horizontal partitioning within one server is called partitioning. Horizontal partitioning across multiple servers is called sharding. The split logic is similar but the operational implications are completely different — sharding adds network hops, distributed transactions, and cross-shard query complexity.

PostgreSQL 10+ supports declarative partitioning natively: you define a parent table, and PostgreSQL automatically routes rows to the correct child partition based on the partition key. It supports RANGE, LIST, and (from PostgreSQL 11) HASH partitioning, and the query planner uses "partition pruning" to skip irrelevant partitions during queries. This is NOT sharding, because all partitions live on the same server. It solves a different problem: query performance on very large tables. Instead of scanning 600M rows, PostgreSQL can prune partitions and scan only the relevant 50M. Check it with EXPLAIN SELECT * FROM orders WHERE created_at > '2025-01-01'; and you'll see only the relevant partitions scanned.

pg-partition.sql
-- Create a partitioned orders table by date range
CREATE TABLE orders (
    id          BIGSERIAL,
    user_id     BIGINT NOT NULL,
    total       DECIMAL(10,2),
    created_at  TIMESTAMPTZ NOT NULL,
    status      VARCHAR(20)
) PARTITION BY RANGE (created_at);

-- Create partitions for each quarter
CREATE TABLE orders_2025_q1 PARTITION OF orders
    FOR VALUES FROM ('2025-01-01') TO ('2025-04-01');
CREATE TABLE orders_2025_q2 PARTITION OF orders
    FOR VALUES FROM ('2025-04-01') TO ('2025-07-01');
CREATE TABLE orders_2025_q3 PARTITION OF orders
    FOR VALUES FROM ('2025-07-01') TO ('2025-10-01');
CREATE TABLE orders_2025_q4 PARTITION OF orders
    FOR VALUES FROM ('2025-10-01') TO ('2026-01-01');

-- Query automatically uses partition pruning:
EXPLAIN SELECT * FROM orders WHERE created_at >= '2025-07-01';
-- Output shows: only orders_2025_q3 and orders_2025_q4 scanned
-- orders_2025_q1 and q2 are SKIPPED entirely

-- Hash partitioning (PostgreSQL 11+):
CREATE TABLE users (
    id      BIGSERIAL,
    name    TEXT,
    email   TEXT
) PARTITION BY HASH (id);

CREATE TABLE users_p0 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE users_p1 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 1);
CREATE TABLE users_p2 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 2);
CREATE TABLE users_p3 PARTITION OF users FOR VALUES WITH (MODULUS 4, REMAINDER 3);
-- Each partition holds ~25% of users, evenly distributed
Feature | PostgreSQL Partitioning | Sharding (Citus/Vitess)
Where data lives | Same server, different files | Different servers entirely
Write scaling | No (same CPU/WAL) | Yes (each shard = own CPU/WAL)
Read scaling | Marginal (partition pruning) | Yes (parallel across shards)
JOINs | Normal (same database) | Cross-shard = scatter-gather
Transactions | Normal ACID | Distributed 2PC needed
Maintenance | VACUUM per partition, DROP old partitions | Per-shard maintenance, rolling upgrades
Complexity | Low (just DDL) | High (routing, rebalancing, monitoring)
Start with partitioning, graduate to sharding. If your problem is "queries are slow on a 600M row table" but write volume is fine, PostgreSQL partitioning solves it without any distributed systems complexity. Only move to sharding when write volume exceeds what a single server can handle.

Logs, events, metrics, and audit trails have a unique property: they're append-only and time-ordered. You almost never update old logs. You almost always query recent ones. This makes time-based partitioning a perfect fit: each partition holds one day (or one week, or one month) of data. Queries for "last 7 days" only scan 7 partitions. Old partitions can be archived or deleted with a single DROP TABLE — no DELETE needed, no VACUUM needed.

This is how TimescaleDB works, and Discord's message buckets apply the same idea. Each bucket (10 days of messages) can be independently archived, moved to cold storage, or deleted when it ages out. TimescaleDB is a PostgreSQL extension for time-series data that automatically creates and manages time-based partitions (called "chunks"). SELECT create_hypertable('metrics', 'time', chunk_time_interval => INTERVAL '1 day'); turns a regular table into an auto-partitioned hypertable. Each chunk is a separate PostgreSQL table under the hood, and queries automatically prune to the relevant chunks. TimescaleDB also supports data retention policies: SELECT add_retention_policy('metrics', INTERVAL '90 days'); automatically drops chunks older than 90 days.

time-partition.sql
-- Time-based partitioning for application logs
CREATE TABLE app_logs (
    id          BIGSERIAL,
    timestamp   TIMESTAMPTZ NOT NULL,
    level       VARCHAR(10),
    message     TEXT,
    service     VARCHAR(50)
) PARTITION BY RANGE (timestamp);

-- Monthly partitions
CREATE TABLE app_logs_2025_01 PARTITION OF app_logs
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE app_logs_2025_02 PARTITION OF app_logs
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
-- ... one per month

-- Retention: drop data older than 90 days (instant, no DELETE + VACUUM)
DROP TABLE app_logs_2024_12;  -- Gone. Instantly. No IO overhead.

-- TimescaleDB makes this automatic:
-- CREATE EXTENSION timescaledb;
-- SELECT create_hypertable('app_logs', 'timestamp',
--     chunk_time_interval => INTERVAL '1 day');
-- SELECT add_retention_policy('app_logs', INTERVAL '90 days');
-- That's it. Chunks auto-create and auto-delete.

-- Elasticsearch (the most common log store) does the same with Index Lifecycle Management:
-- PUT _ilm/policy/logs_policy
-- { "phases": {
--     "hot": { "actions": { "rollover": { "max_age": "1d" }}},
--     "delete": { "min_age": "90d", "actions": { "delete": {} }}
-- }}
Why DROP TABLE beats DELETE: Deleting 100M rows from a table takes minutes, generates WAL, and requires VACUUM to reclaim space. Dropping a partition is instant — PostgreSQL just removes the file from disk. For time-series data, this is the single biggest operational advantage of partitioning. Companies like Datadog and Grafana Labs rely on this for managing petabytes of metrics data.
Think First

You're building an analytics dashboard that queries the last 30 days of user events. The events table has 2 billion rows spanning 3 years of data. Would you use sharding or partitioning? What's the shard/partition key?

Writes are moderate (~5K/sec), so a single server can handle them. The problem is query speed on 2B rows. Time-based partitioning by month means "last 30 days" touches only the two or three most recent partitions instead of scanning 2B rows. No need for the complexity of sharding here.
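To see how few partitions a 30-day window touches, the sketch below lists the monthly partitions that overlap a date range. The events_YYYY_MM naming and both function names are assumptions for illustration; PostgreSQL does this pruning itself from the partition bounds.

```python
from datetime import date, timedelta


def monthly_partitions(start: date, end: date, prefix: str = "events") -> list:
    """Names of the monthly partitions overlapped by the inclusive range [start, end]."""
    names = []
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        names.append(f"{prefix}_{year}_{month:02d}")
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return names


def last_n_days(today: date, n: int = 30) -> list:
    """Partitions a 'last n days' query has to scan when run on `today`."""
    return monthly_partitions(today - timedelta(days=n), today)


if __name__ == "__main__":
    print(last_n_days(date(2025, 6, 20)))  # window inside May and June
    print(last_n_days(date(2025, 3, 1)))   # window reaches back across February
```

Most days the window spans two monthly partitions. Around a short February it can span three, which is still a tiny slice of three years of data.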
Section 9

At Scale — How the Giants Actually Shard

Instagram routes 1 billion users across 64 PostgreSQL shards using a Snowflake ID trick. Discord stores 120 billion messages across thousands of ScyllaDB nodes using a composite key. Shopify hides sharding entirely behind Vitess so developers never see it. MongoDB automates everything with an internal balancer. Four companies, four different architectures — each one shaped by their specific data access patterns.

Instagram — 64 PostgreSQL Shards, Snowflake IDs

Instagram needed to handle over a billion users, and they chose the most straightforward approach: shard PostgreSQL by user_id. Every user's photos, comments, likes, and stories live on the same shard. No cross-shard queries needed for "show me this user's feed."

Here's the math: 1 billion users across 64 logical shards = roughly 15.6 million users per shard. Each shard is a separate PostgreSQL instance running on its own hardware with its own connection pool, its own pg_partman config for time-based sub-partitioning, and its own replication setup.

The clever part is how they route requests. Instead of looking up "which shard is user 482,391 on?" in some mapping table, they embed the shard ID directly into every ID they generate. Instagram uses a Snowflake-style ID generator where the shard ID is baked into the bits of the ID itself. Given any photo ID or comment ID, you can extract the shard number with a bit shift — no lookup required.

instagram-routing.sql
-- Instagram's routing logic (simplified)
-- Shard = user_id % 64
-- user_id = 482391 → 482391 % 64 = 23 → Shard 23

-- Snowflake ID structure (64 bits):
-- [timestamp 41 bits][shard_id 13 bits][sequence 10 bits]

-- Given photo_id = 1375782301427_23_0042 (fields shown separately for readability):
--   shard_id = 23 → route to shard-23.instagram.internal

-- pg_partman handles sub-partitioning within each shard:
SELECT partman.create_parent(
  'public.photos',
  'created_at',
  'native',
  'monthly'
);
-- Result: photos_2024_01, photos_2024_02, ... auto-created

-- Migration: split shard_03 into shard_03a + shard_03b
-- Step 1: Create new shard with logical replication
-- Step 2: Start replicating shard_03 → shard_03b
-- Step 3: Once caught up, update routing: user_id % 128
-- Step 4: Cut over, stop old replication
Figure: Instagram shard routing with user_id % 64. GET /user/482391 → the shard router computes 482391 % 64 = 23 → Shard 23 (out of Shards 0 through 63). Snowflake ID (64 bits): timestamp (41 bits) | shard_id (13 bits) | seq (10 bits).
Why 64? Power of two makes modulo operations fast (just a bitwise AND). 64 shards at ~15M users each means each PostgreSQL instance holds about 60–80 GB of data — small enough to fit the working set in RAM, big enough that you don't drown in coordination overhead.
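The bit layout and the power-of-two trick can both be shown in a few lines. This is a sketch of the 41/13/10 layout described above, not Instagram's generator: make_id and shard_of are hypothetical names, and the timestamp is treated as an opaque millisecond count (real generators count from a custom epoch).

```python
TIMESTAMP_BITS, SHARD_BITS, SEQUENCE_BITS = 41, 13, 10


def make_id(timestamp_ms: int, shard_id: int, sequence: int) -> int:
    """Pack timestamp, shard ID, and sequence number into one 64-bit integer."""
    if not 0 <= timestamp_ms < (1 << TIMESTAMP_BITS):
        raise ValueError("timestamp does not fit in 41 bits")
    if not 0 <= shard_id < (1 << SHARD_BITS):
        raise ValueError("shard_id does not fit in 13 bits")
    if not 0 <= sequence < (1 << SEQUENCE_BITS):
        raise ValueError("sequence does not fit in 10 bits")
    return (timestamp_ms << (SHARD_BITS + SEQUENCE_BITS)) | (shard_id << SEQUENCE_BITS) | sequence


def shard_of(snowflake_id: int) -> int:
    """Extract the shard ID with a shift and a mask; no lookup table needed."""
    return (snowflake_id >> SEQUENCE_BITS) & ((1 << SHARD_BITS) - 1)


if __name__ == "__main__":
    photo_id = make_id(1375782301427, 23, 42)
    print(shard_of(photo_id))
    # With a power-of-two shard count, modulo is the same as a bitwise AND:
    print(482391 % 64, 482391 & 63)
```

Because the timestamp occupies the high bits, IDs generated later sort after earlier ones, so the same ID doubles as a rough creation-time ordering.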

Discord's problem is different from Instagram's. Instagram shards by user: neat and clean. Discord has to shard messages, and messages belong to channels, not users. A single server like MrBeast's has 40+ million members, and every message in #general goes to the same channel_id. If you shard only by channel_id, that one channel becomes a massive hotspot: one shard receiving disproportionately more traffic than the others. A popular #general channel would receive thousands of writes per second while most channels get near zero, which defeats the entire purpose of sharding: even distribution of load.

Discord's solution: a composite shard key of (channel_id, bucket), where a bucket represents a 10-day time window. Old messages from January 2023 in #general live on a different partition than this week's messages. This means the "hot" partition (current bucket) for a massive channel might still be heavy, but at least the historical data is spread across many partitions and can be archived to cold storage.

In 2022, Discord migrated from Cassandra to ScyllaDB because Cassandra's Java garbage collection pauses caused unpredictable latency spikes: a read that should take 5ms would sometimes take 500ms. ScyllaDB is a drop-in replacement for Cassandra written in C++ instead of Java, so it has no garbage collector and its tail latencies are much more predictable. It keeps the same data model and the same query language (CQL), with dramatically better P99 latency. The shard key stayed the same: they swapped the engine, not the architecture.

discord-messages.cql
-- Discord's message table (simplified CQL)
CREATE TABLE messages (
  channel_id  bigint,
  bucket      int,        -- 10-day window: floor(epoch_days / 10)
  message_id  bigint,     -- Snowflake ID (contains timestamp)
  author_id   bigint,
  content     text,
  PRIMARY KEY ((channel_id, bucket), message_id)
) WITH CLUSTERING ORDER BY (message_id DESC);

-- Partition key = (channel_id, bucket)
-- Each partition = one channel's messages for 10 days
-- "Show latest messages in #general" → hits 1 partition (current bucket)
-- "Search last 90 days" → hits 9 or 10 partitions (10 when the window is not bucket-aligned)

-- Hot channels (servers with 1M+ members) concentrate load on a few partitions.
-- Partition placement follows the token hash, so a partition cannot simply be
-- pinned to a beefier node; the spike has to be absorbed in front of the database.

-- Total scale: 120+ billion messages across thousands of nodes
The Taylor Swift Problem. When Taylor Swift drops an album and her Discord server explodes with activity, one channel might get 10,000 writes/sec while the average channel gets 0.1 writes/sec. The composite key cannot spread that load, because every current message for the channel lands in the same partition. Discord has described absorbing such spikes with a service layer in front of the database that coalesces concurrent requests for the same data. You can't prevent hotspots entirely. You can only detect and react to them.

Vitess gives Shopify something powerful: the application code doesn't know sharding exists. Vitess is an open-source database clustering system originally built at YouTube (Google) to shard MySQL; it handles sharding, replication, schema migrations, and failover, and is used by Shopify, Slack, GitHub, and Square. The app connects to vtgate (the Vitess proxy), sends normal SQL, and thinks it's talking to a single MySQL. vtgate reads the VSchema (a JSON config that defines the shard key for each table, how values are hashed to keyspace IDs, and which tables are "reference tables" replicated to every shard) and routes the query to the correct shard.

Shopify's model: each merchant is a logical shard. All of Kylie Cosmetics' data (orders, products, customers, inventory) lives on one shard. Nike's data lives on another. This means queries within one store never cross shards — a massive simplification. They group shards into "pods" (clusters of shards) for operational isolation, so a misbehaving merchant can't affect others.

vitess-commands.sh
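# 1. Define how to shard the 'orders' table by merchant_id
#    ("sharded" and "vindexes" declare the hash vindex the table refers to)
vtctl ApplyVSchema -- --vschema='{
  "sharded": true,
  "vindexes": {
    "hash": { "type": "hash" }
  },
  "tables": {
    "orders": {
      "column_vindexes": [{
        "column": "merchant_id",
        "name": "hash"
      }]
    }
  }
}' commerce

# 2. Reshard: split 2 shards into 4 (when load doubles)
#    "Create" starts the workflow named commerce.reshard1
vtctl Reshard -- --source_shards='-80,80-' \
  --target_shards='-40,40-80,80-c0,c0-' Create commerce.reshard1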

# 3. vtgate routes transparently — app sees one "database"
mysql -h vtgate.shopify.internal -u app -p commerce \
  -e "SELECT * FROM orders WHERE merchant_id = 12345;"
# vtgate: hash(12345) → keyspace_id → shard '-40' → route there

# 4. Check shard health
vtctl ListAllTablets | grep commerce

When a merchant grows massive (think Gymshark during a flash sale), Vitess can reshard on the fly: split one shard into two using the vtctl Reshard command. It creates the new shards, replicates data over, and cuts traffic with near-zero downtime. No application changes required.
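Shard names like '-40' and '40-80' are hexadecimal key ranges: '-40' covers keyspace IDs below 0x40, '40-80' covers 0x40 up to (but not including) 0x80, and 'c0-' covers everything from 0xc0 upward. The sketch below shows that range lookup. The helper names are hypothetical, and toy_keyspace_id uses SHA-256 purely as a stand-in; Vitess's own hash vindex uses a different function.

```python
import hashlib
from typing import List, Optional, Tuple


def parse_range(shard: str) -> Tuple[bytes, Optional[bytes]]:
    """Turn a shard name like '40-80' into (start, end) byte strings; end=None means open."""
    start_hex, end_hex = shard.split("-")
    start = bytes.fromhex(start_hex)
    end = bytes.fromhex(end_hex) if end_hex else None
    return start, end


def shard_for(keyspace_id: bytes, shards: List[str]) -> str:
    """Find the shard whose [start, end) range contains the keyspace ID."""
    for shard in shards:
        start, end = parse_range(shard)
        if keyspace_id >= start and (end is None or keyspace_id < end):
            return shard
    raise LookupError("no shard covers this keyspace ID")


def toy_keyspace_id(merchant_id: int) -> bytes:
    """Stand-in hash (an assumption, NOT the Vitess hash vindex)."""
    return hashlib.sha256(str(merchant_id).encode()).digest()[:8]


if __name__ == "__main__":
    shards = ["-40", "40-80", "80-c0", "c0-"]
    print(shard_for(toy_keyspace_id(12345), shards))
```

Splitting '-80' into '-40' and '40-80' simply cuts one range in half, so a row's keyspace ID never changes during a reshard. Only the range boundaries do.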

MongoDB has sharding built right into the database — you don't need an external proxy. It uses three components: mongos (the router your app connects to), config servers (store the mapping of which chunks live on which shard), and the shards themselves (each a replica set). When you issue a query, mongos checks the config servers to find out which shard has the data, then routes the query there.

The data is split into chunks (default 128 MB each since MongoDB 6.0; 64 MB in earlier versions). MongoDB's balancer, a background process that runs on the config server primary, monitors how data is distributed across shards and automatically migrates chunks from heavier shards to lighter ones. Before 6.0 it compared chunk counts (migrating once the difference reached a threshold of up to 8 chunks, depending on collection size); since 6.0 it compares data size. You can see it in action with sh.status(). You don't manually assign data to shards. MongoDB handles it.

mongodb-sharding.js
// Connect to mongos (the router) from your shell first:
//   mongosh --host mongos1.prod.internal

// Enable sharding on the database
sh.enableSharding("ecommerce")

// Shard the orders collection by hashed user_id
sh.shardCollection("ecommerce.orders", { "user_id": "hashed" })
// "hashed" = MongoDB hashes user_id for even distribution
// vs { "user_id": 1 } = range-based (good for range queries)

// Check shard distribution
sh.status()
// --- Sharding Status ---
// shards:
//   { "_id": "shard0", "host": "shard0/rs0-0:27017,rs0-1:27017" }
//   { "_id": "shard1", "host": "shard1/rs1-0:27017,rs1-1:27017" }
//   { "_id": "shard2", "host": "shard2/rs2-0:27017,rs2-1:27017" }
// databases:
//   { "_id": "ecommerce", "partitioned": true }
//     ecommerce.orders
//       shard key: { "user_id": "hashed" }
//       chunks:
//         shard0: 42
//         shard1: 41
//         shard2: 43
//       → balanced ✓

// Switch to the sharded database before querying it
use ecommerce

// When you query WITH the shard key, mongos routes to 1 shard:
db.orders.find({ user_id: 12345 })  // → targeted query (fast)

// When you query WITHOUT the shard key, mongos asks ALL shards:
db.orders.find({ status: "pending" })  // → scatter-gather (slow)
Figure: MongoDB sharded cluster. App server → mongos (router, stateless proxy) → Shard 0 (replica set, 42 chunks, 5.4 GB), Shard 1 (replica set, 41 chunks, 5.3 GB), Shard 2 (replica set, 43 chunks, 5.5 GB). Config servers hold the chunk → shard map. The balancer runs automatically and moves chunks when the imbalance exceeds its threshold (shown as Δ > 8).
Targeted vs. Scatter-Gather. If your query includes the shard key (user_id), mongos sends it to exactly one shard — that's a targeted query and it's fast. If your query doesn't include the shard key (WHERE status = 'pending'), mongos has to ask every shard and merge results — that's scatter-gather and it's slow. This is why picking the right shard key matters so much.
Section 10

Anti-Lessons — Things That Sound Smart but Aren't

These are mistakes that look reasonable on a whiteboard but cause real pain in production. Each one comes from a company that learned the hard way.

Sounds smart: "We're going to be huge. Let's shard now so we don't have to migrate later."

Reality: Shopify ran on a single MySQL server for years before they needed Vitess. Instagram launched on one PostgreSQL and didn't shard until they had over a million users. Premature sharding means every feature now has to consider cross-shard queries, every migration touches multiple databases, and your team spends time on shard routing instead of product features.

Foursquare sharded MongoDB early in 2010 and spent two engineers full-time for six months fixing shard key issues, rebalancing hotspots, and debugging cross-shard queries — time that could have built features to compete with Yelp.

The Rule: If a single vertically-scaled database with read replicas, connection pooling, and proper indexing can handle your load — don't shard. Sharding is a 10× complexity multiplier for a problem you might not have yet. A $7,200/month RDS instance handles a LOT of traffic.

Sounds smart: "Shard by created_at so recent data is together."

Reality: All new data goes to the same shard — the one handling "today." Shards holding last year's data sit idle while the current shard melts down. You've created the worst kind of hotspot: one that gets worse every day.

This is exactly the problem Discord would've had if they sharded only by timestamp. Instead, they use a composite key: (channel_id, time_bucket). The channel_id spreads writes across many partitions, and the time bucket keeps each partition from growing forever. The time component helps with range queries ("last 10 days"), but it's not the only component — that's the key insight.

Figure: timestamp-only sharding, where all writes hit one shard. Jan 2024 shard: 0 writes/sec. Feb 2024 shard: 0 writes/sec. ... Jan 2026 shard: 2 writes/sec. Mar 2026 shard: 50K writes/sec, CPU at 100% (hotspot). Fix: a composite key (entity_id + time_bucket) distributes writes across shards.
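A small simulation makes the difference visible. It is a toy model under stated assumptions: 8 shards, 1,000 equally active channels, SHA-256 as the placement hash, and hypothetical names shard_index and simulate. All writes happen in the same time bucket, as they would on any given day.

```python
import hashlib
from collections import Counter

N_SHARDS = 8


def shard_index(*key_parts) -> int:
    """Hash the shard key (one or more parts) to a shard number."""
    raw = ":".join(str(p) for p in key_parts).encode()
    return int.from_bytes(hashlib.sha256(raw).digest()[:8], "big") % N_SHARDS


def simulate(n_channels=1000, writes_per_channel=10, bucket=2030):
    """Count writes per shard for a time-only key vs. a (channel_id, bucket) key."""
    by_time, by_composite = Counter(), Counter()
    for channel_id in range(n_channels):
        for _ in range(writes_per_channel):
            by_time[shard_index(bucket)] += 1
            by_composite[shard_index(channel_id, bucket)] += 1
    return by_time, by_composite


if __name__ == "__main__":
    by_time, by_composite = simulate()
    print("time-only key:", dict(by_time))
    print("composite key:", dict(sorted(by_composite.items())))
```

With the time-only key, every write lands on whichever single shard owns the current bucket. With the composite key, the same writes spread across all shards, because channel_id varies even though the bucket does not.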

Sounds smart: "We'll just JOIN across shards — the proxy handles it."

Reality: A cross-shard JOIN means the router sends a query to every shard, waits for every response, then merges the results in memory. With 64 shards, that's 64 parallel queries. If each shard takes 5ms, the best case is 5ms (perfect parallelism). The real case? The slowest shard determines your latency, and with 64 shards, P99 is much worse than any single shard's P99.
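The tail-latency effect follows from simple probability. Assuming shard latencies are independent (a simplification), the chance that at least one of N shards lands in its own slowest 1% is 1 - 0.99^N. The helper name p_any_slow is hypothetical.

```python
def p_any_slow(n_shards: int, quantile: float = 0.99) -> float:
    """Probability that at least one of n independent shards exceeds its `quantile` latency."""
    return 1 - quantile ** n_shards


if __name__ == "__main__":
    for n in (1, 4, 16, 64):
        print(f"{n:>2} shards: {p_any_slow(n):.1%} of queries wait on a P99-slow shard")
```

At 64 shards, roughly 47% of scatter-gather queries wait on at least one shard that is having a P99-slow moment, so a single shard's P99 becomes close to the median of the fan-out query.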

Real-world numbers: once aggregation, sorting, or LIMIT is involved, a scatter-gather query across 64 shards can take anywhere from hundreds of milliseconds to a couple of seconds. That's fine for a nightly analytics job. It's not fine for a page load.

The fix: Denormalize (store redundant data so JOINs aren't needed) or co-locate (put related data on the same shard). Instagram puts a user's photos, comments, and likes all on the user's shard — so "show me this user's feed" never crosses shards. The cost: if you need "all comments on photo X from different users," that's a cross-shard query. You pick your trade-off.
Section 11

Common Mistakes — 6 Traps That Bite Real Teams

These aren't theoretical pitfalls. Every one of these has caused production incidents, midnight pages, and painful migrations at real companies.

The mistake: Sharding by country. There are ~200 countries, but the US, India, and Brazil together handle 80% of your traffic. You end up with 3 overloaded shards and 197 nearly empty ones.

Why it hurts: Low-cardinality shard keys create permanent imbalance. You can't fix it by adding shards — the data is inherently skewed. The US shard will always be 50× bigger than the Iceland shard.

The fix: Use a high-cardinality key like user_id (billions of unique values) with a hash function for even distribution. If you must include geography, make it a secondary index, not the shard key.

The mistake: Using shard = user_id % N where N is the number of shards. When you add a shard (N changes from 64 to 65), almost every user's shard assignment changes. You'd have to migrate ~98% of your data.

Why it hurts: Naive modulo-based routing makes adding capacity a massive, risky operation. You can't just "add a shard" — you have to reshuffle billions of rows.

The fix: Use consistent hashing (only about 1/(N+1) of the data moves when you grow from N to N+1 shards) or a proxy like Vitess that handles resharding for you. Vitess's Reshard command creates new shards, replicates data via VReplication, and cuts over with minimal downtime. Plan for growth before you need it.
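The ~98% figure for naive modulo routing is easy to check. This sketch assumes sequential integer user IDs and uses a hypothetical helper name `moved_fraction`; it simply counts how many keys get a different shard number when N changes.

```python
def moved_fraction(keys, old_n: int, new_n: int) -> float:
    """Fraction of keys whose shard changes when routing goes from
    key % old_n to key % new_n."""
    keys = list(keys)
    moved = sum(1 for k in keys if k % old_n != k % new_n)
    return moved / len(keys)


frac = moved_fraction(range(1_000_000), 64, 65)
print(f"64 -> 65 shards: {frac:.1%} of keys change shard")

# Doubling is the one friendly case for modulo routing: exactly half the keys move.
print(f"64 -> 128 shards: {moved_fraction(range(12_800), 64, 128):.1%}")
```

A key stays put only when `k % 64 == k % 65`, which is true for just 64 out of every 4,160 consecutive IDs.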

The mistake: Assuming BEGIN; UPDATE shard_1...; UPDATE shard_2...; COMMIT; will work like a normal transaction.

Why it hurts: There's no single transaction manager across independent databases. Two-phase commit (2PC) exists, but it's slow (extra round-trip, lock holding) and fragile (coordinator failure blocks everything). In 2PC, a coordinator asks all participants to "prepare" (lock resources and promise to commit). If all say yes, the coordinator says "commit." If any says no, everyone rolls back. The problem: if the coordinator crashes after "prepare" but before "commit," all participants are stuck holding locks until the coordinator recovers, which can block other transactions for minutes or hours. The Saga pattern provides eventual consistency but adds complexity. Instead of one big transaction, a saga breaks the work into a chain of local transactions with compensating actions: "Deduct inventory" → "Charge payment" → "Create shipment." If "Charge payment" fails, run a compensating action: "Restore inventory." That gives eventual consistency, not strict ACID, but it is much simpler than 2PC at scale.

The fix: Design your shard key so that related data lives on the same shard. If an order always belongs to one user, shard by user_id and keep orders on the user's shard. The goal: 99% of transactions should be shard-local.
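For the remaining transactions that cannot be made shard-local, the saga idea can be sketched in a few lines. This is an in-memory illustration with hypothetical names (`run_saga`, `charge_payment`); a production saga would also need durable state and retries, which are omitted here.

```python
from typing import Callable

# Each step pairs a local action with the compensation that undoes it.
Step = tuple[Callable[[], None], Callable[[], None]]


def run_saga(steps: list[Step]) -> bool:
    """Run steps in order; on failure, undo completed steps in reverse order."""
    completed: list[Callable[[], None]] = []
    for action, compensate in steps:
        try:
            action()
        except Exception:
            for undo in reversed(completed):
                undo()
            return False
        completed.append(compensate)
    return True


log: list[str] = []


def charge_payment() -> None:
    raise RuntimeError("card declined")  # simulated failure on the payment shard


ok = run_saga([
    (lambda: log.append("deduct inventory"), lambda: log.append("restore inventory")),
    (charge_payment, lambda: log.append("refund payment")),
    (lambda: log.append("create shipment"), lambda: log.append("cancel shipment")),
])
print(ok, log)
```

Only the steps that actually completed are compensated, so the failed payment triggers "restore inventory" and nothing else.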

The mistake: You shard orders by user_id, then need to look up an order by order_id. Your index on order_id is local to each shard — it only knows about orders on that shard. To find order #ABC123, you'd have to query all 64 shards.

Why it hurts: Every "lookup by non-shard-key" becomes a scatter-gather query. If this happens on every page load, your P99 latency skyrockets.

The fix: Maintain a small global lookup table (order_id → shard_id) in a shared service or cache. Or embed the shard_id in the order_id itself (Instagram's Snowflake approach). Or use Vitess vindexes — secondary indexes that map a column value to a keyspace ID, letting vtgate route non-shard-key lookups to the right shard.
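Embedding the shard in the ID can be sketched as plain bit packing. The layout below (timestamp, then 13 shard bits, then 10 sequence bits) follows the scheme Instagram described publicly; the epoch value and the function names `make_id` and `shard_of` are assumptions for illustration.

```python
SHARD_BITS = 13   # up to 8,192 logical shards
SEQ_BITS = 10     # up to 1,024 IDs per shard per millisecond
CUSTOM_EPOCH_MS = 1_600_000_000_000  # hypothetical epoch, chosen for this example


def make_id(timestamp_ms: int, shard_id: int, sequence: int) -> int:
    """Pack (time, shard, sequence) into one sortable integer."""
    if not 0 <= shard_id < (1 << SHARD_BITS):
        raise ValueError("shard_id out of range")
    millis = timestamp_ms - CUSTOM_EPOCH_MS
    seq = sequence % (1 << SEQ_BITS)
    return (millis << (SHARD_BITS + SEQ_BITS)) | (shard_id << SEQ_BITS) | seq


def shard_of(entity_id: int) -> int:
    """Recover the shard from the ID alone: no lookup table, no scatter-gather."""
    return (entity_id >> SEQ_BITS) & ((1 << SHARD_BITS) - 1)


order_id = make_id(1_700_000_000_000, shard_id=37, sequence=5)
print(order_id, "lives on shard", shard_of(order_id))
```

Because the timestamp occupies the high bits, IDs generated later always sort after earlier ones, regardless of shard.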

The mistake: You set up sharding and forget about it. Six months later, shard_03 is at 450 GB while shard_12 has 80 GB. One shard is slow, others are wasting resources.

Why it hurts: Uneven shard sizes mean uneven performance. The overloaded shard becomes the bottleneck for any query that touches it, and it fills up disk faster than others.

monitor-shards.sql
-- PostgreSQL: check shard sizes (run on each shard's server)
SELECT pg_size_pretty(pg_database_size('shard_01'));  -- 120 GB
SELECT pg_size_pretty(pg_database_size('shard_03'));  -- 450 GB ← problem!

-- MongoDB (run in the mongo shell, not SQL): check chunk distribution
sh.status()
-- shard0: 42 chunks
-- shard1: 41 chunks
-- shard2: 180 chunks  ← balancer might be stuck!

-- Vitess (run in a terminal, not SQL): check tablet health
-- The output below is illustrative; exact columns depend on the Vitess version.
vtctl ListAllTablets | awk '{print $1, $4, $5}'
-- shard-00 SERVING master
-- shard-01 SERVING master
-- shard-02 NOT_SERVING master  ← investigate!

The fix: Set up alerts for shard size divergence. If any shard is 2× the median size, trigger a resharding review. Automate the check — don't rely on engineers remembering to look.
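The 2× median rule is simple enough to automate. A minimal sketch, assuming shard sizes have already been collected into a dict; the helper name `oversized_shards` and the `shard_07` figure are made up for the example.

```python
from statistics import median


def oversized_shards(sizes_gb: dict[str, float], factor: float = 2.0) -> list[str]:
    """Return shards whose size is at least `factor` times the median size."""
    mid = median(sizes_gb.values())
    return sorted(name for name, gb in sizes_gb.items() if gb >= factor * mid)


sizes = {"shard_01": 120, "shard_03": 450, "shard_07": 140, "shard_12": 80}
print(oversized_shards(sizes))
```

The median is used instead of the mean so that one huge shard does not drag the baseline up and hide itself.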

The mistake: Every shard gets the same hardware, same connection limits, same resource allocation. But shard_07 holds Taylor Swift's channel with 40M followers, while shard_42 holds channels with 50 members each.

Why it hurts: Hot shards need more CPU, more RAM, and more network bandwidth. Treating them the same as cold shards means the hot ones are constantly throttled.

The fix: Discord dedicates extra nodes to hot shards: they detect hotspots with real-time monitoring and dynamically reassign resources. In Vitess, you can place hot shards on beefier tablets. In MongoDB, you can use zones to pin ranges of shard-key values (and so the hot data in them) to specific shard hardware. The key: observe, then adapt, not "set and forget."

Section 12

Interview Playbook — "Design the Sharding Strategy"

The prompt: "Design the sharding strategy for a social media platform with 500M users, 10B posts, and 50K writes/sec." Here's how a junior, mid-level, and senior engineer would answer — and what the interviewer is looking for at each level.

Junior Answer: The Basics

"I'd shard the users table by user_id using hash-based sharding. user_id % N where N is the number of shards. Posts would be on the same shard as their author, since posts.user_id is the foreign key."

What the interviewer notices:

  • Good: Chose user_id (high cardinality), co-located posts with users
  • Missing: No mention of what happens when you add shards (modulo problem), no discussion of hotspots (celebrity users), no consideration of cross-shard queries (feeds from multiple users)

Upgrade tip: After stating the basic approach, immediately say: "The risk with user_id % N is that adding shards requires reshuffling data. I'd use consistent hashing instead to minimize data movement."

Mid-Level Answer: Trade-offs and Nuance

"I'd shard by user_id with consistent hashing so we can add shards without massive data migration. Posts and comments go on the author's shard. For the news feed — which pulls posts from many users — I'd pre-compute a materialized feed per user, stored on the user's shard. This way, reading your feed is a single-shard query."

"For celebrity users with millions of followers, I'd handle them differently: their posts get written to a fanout service that pushes to followers' feeds asynchronously, rather than doing scatter-gather at read time."

What the interviewer notices:

  • Good: Consistent hashing, fan-out-on-write for feeds, celebrity hotspot awareness
  • Good: Materialized feeds avoid cross-shard reads
  • Missing: No mention of operational concerns (monitoring, rebalancing), no discussion of the shard split procedure, no global secondary indexes

Senior Answer: Full Operational Picture

"Let me walk through the full strategy."

Shard key: "user_id with consistent hashing (virtual nodes for balance). Posts co-located with author. Feeds are pre-computed and stored on the follower's shard using fan-out-on-write with async workers."

Capacity planning: "500M users and 50K writes/sec. A single PostgreSQL handles ~15K writes/sec. That's a minimum of 4 shards for writes. But I'd plan for 16 shards to give 4× headroom, keep each shard under 200 GB, and leave room for rebalancing without hitting capacity."

Hotspot mitigation: "Top 0.01% users (celebrities) get flagged. Their follower fan-out is rate-limited and processed by a dedicated queue, not the main write path. Shard health is monitored per-shard: if any shard's P99 exceeds 2× median, alert fires and we evaluate a split."

Cross-shard concerns: "Lookup by post_id needs a global index: either a small Redis mapping (post_id → shard_id) or embed shard_id in the post_id (Snowflake approach). Search and analytics go through a separate pipeline (Elasticsearch/ClickHouse), not the sharded OLTP database."

Rebalancing plan: "Using Vitess or similar proxy. When we need to split shard_03: start VReplication to a new shard, verify data consistency, cut traffic, retire old shard. Zero downtime. Runbook documented, tested quarterly."

What the interviewer notices:

  • Excellent: Complete picture — from math to monitoring to migration
  • Excellent: Quantified the capacity (15K writes/shard, 16 shards for headroom)
  • Excellent: Separated OLTP from analytics, addressed operational concerns
Section 13

Practice Exercises

Try these yourself before opening the solutions. The best way to internalize sharding is to work through the math and the trade-offs on your own.

Easy Exercise 1: Calculate Shard Count

Your e-commerce platform has 500 GB of order data and handles 50,000 writes/sec during peak. A single database node can comfortably handle 200 GB of data and 15,000 writes/sec. How many shards do you need? Which constraint is the bottleneck — storage or throughput?

Calculate independently for each constraint: ceil(500 / 200) = ? for storage, ceil(50000 / 15000) = ? for throughput. The answer is the larger of the two.

Storage: 500 GB / 200 GB per node = 3 shards (rounded up from 2.5).

Throughput: 50,000 writes/sec / 15,000 writes/sec per node = 4 shards (rounded up from 3.33).

Answer: 4 shards minimum, bottlenecked by write throughput. In practice, you'd use 8 shards for 2× headroom — because you don't want to be at 100% capacity when the next flash sale hits. Each shard: ~62.5 GB data, ~6,250 writes/sec — comfortable on standard hardware.
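The same arithmetic as a small function, in case you want to rerun it with your own numbers. The function name `plan_shards` and the `headroom` parameter are illustrative, not taken from any library.

```python
import math


def plan_shards(total_gb: float, gb_per_node: float,
                peak_wps: float, wps_per_node: float,
                headroom: int = 2) -> dict:
    """Size a cluster by storage and by write throughput, then apply headroom."""
    by_storage = math.ceil(total_gb / gb_per_node)
    by_writes = math.ceil(peak_wps / wps_per_node)
    minimum = max(by_storage, by_writes)
    planned = minimum * headroom
    return {
        "by_storage": by_storage,
        "by_writes": by_writes,
        "bottleneck": "writes" if by_writes >= by_storage else "storage",
        "minimum": minimum,
        "planned": planned,
        "gb_per_shard": total_gb / planned,
        "wps_per_shard": peak_wps / planned,
    }


plan = plan_shards(total_gb=500, gb_per_node=200, peak_wps=50_000, wps_per_node=15_000)
print(plan)
```

With the exercise's inputs this reproduces the answer above: 3 shards by storage, 4 by writes, 8 planned, at 62.5 GB and 6,250 writes/sec each.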

Medium Exercise 2: Design a Shard Key for E-Commerce

Your platform has three main entities: users (80M), orders (600M), and products (2M). Users browse products and place orders. The most common queries are: (a) "show me my orders" (by user), (b) "show me this order" (by order_id), (c) "show me this product's reviews" (by product). Design the shard key. Which entities get co-located? What's the trade-off?

Think about which queries must be fast (single-shard) vs. which can tolerate scatter-gather. With 2M products and 80M users, which table is small enough to replicate everywhere?

Shard key: user_id. Users and their orders co-locate on the same shard. Query (a) "show me my orders" is a single-shard query — fast.

Products: reference table. At 2M rows (~500 MB), products are small enough to replicate to every shard. This is what Vitess calls a "reference table." Every shard has a full copy, so product lookups are always local.

Query (b) "show me this order" by order_id: embed the shard_id in the order_id (Snowflake style), or maintain a small global index in Redis.

Query (c) "product reviews": reviews are scattered across user shards. Options: (1) denormalize — store a reviews summary on the products reference table, (2) use a separate reviews service that aggregates from all shards, or (3) accept the scatter-gather cost since product pages are cacheable.

Trade-off: Optimized for user-centric queries (most common), at the cost of cross-shard queries for product-centric aggregations (less common, cacheable).

Medium Exercise 3: Implement Consistent Hashing

Implement a basic consistent hash ring in Python. Your ring should support: (a) adding a node, (b) removing a node, (c) finding which node a key maps to. Use 150 virtual nodes per physical node for balance. Test with 10,000 keys and verify that adding a 4th node to a 3-node ring only moves ~25% of keys.

consistent_hash.py
import hashlib, bisect

class ConsistentHashRing:
    def __init__(self, virtual_nodes=150):
        self.virtual_nodes = virtual_nodes
        self.ring = {}         # hash_value → node_name
        self.sorted_keys = []  # sorted list of hash values

    def _hash(self, key: str) -> int:
        """Hash a string to a position on the ring (0 to 2^32-1)."""
        # TODO: use hashlib.md5 or sha256, return int

    def add_node(self, node: str):
        """Add a node with virtual_nodes copies on the ring."""
        # TODO: for each virtual node, compute hash and insert

    def remove_node(self, node: str):
        """Remove all virtual nodes for this node."""
        # TODO: remove entries from ring and sorted_keys

    def get_node(self, key: str) -> str:
        """Find which node a key maps to."""
        # TODO: hash the key, bisect into sorted_keys, return node

# Test: add 3 nodes, hash 10K keys, add 4th node, count moves
consistent_hash_solution.py
import bisect
import hashlib
from typing import Optional

class ConsistentHashRing:
    def __init__(self, virtual_nodes=150):
        self.virtual_nodes = virtual_nodes
        self.ring = {}         # hash_value → node_name
        self.sorted_keys = []  # sorted list of hash values on the ring

    def _hash(self, key: str) -> int:
        """Map a string to a ring position in [0, 2^32)."""
        digest = hashlib.md5(key.encode()).hexdigest()
        return int(digest, 16) % (2**32)

    def add_node(self, node: str):
        for i in range(self.virtual_nodes):
            h = self._hash(f"{node}#vn{i}")
            if h in self.ring:
                continue  # position already taken (rare collision): skip it
            self.ring[h] = node
            bisect.insort(self.sorted_keys, h)

    def remove_node(self, node: str):
        for i in range(self.virtual_nodes):
            h = self._hash(f"{node}#vn{i}")
            if self.ring.get(h) != node:
                continue  # never added, or owned by another node
            del self.ring[h]
            # sorted_keys is sorted, so locate h by binary search
            del self.sorted_keys[bisect.bisect_left(self.sorted_keys, h)]

    def get_node(self, key: str) -> Optional[str]:
        if not self.ring:
            return None
        h = self._hash(key)
        # first virtual node clockwise from the key's position
        idx = bisect.bisect_right(self.sorted_keys, h)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around the ring
        return self.ring[self.sorted_keys[idx]]

# Test
ring = ConsistentHashRing(virtual_nodes=150)
for node in ["shard-0", "shard-1", "shard-2"]:
    ring.add_node(node)

# Hash 10K keys with 3 nodes
before = {f"key-{i}": ring.get_node(f"key-{i}") for i in range(10000)}

# Add 4th node
ring.add_node("shard-3")
after = {f"key-{i}": ring.get_node(f"key-{i}") for i in range(10000)}

# Count moves
moves = sum(1 for k in before if before[k] != after[k])
print(f"Keys moved: {moves}/10000 = {moves/100:.1f}%")
# Expected: ~25% (1/4 of keys move to the new node)
Hard Exercise 4: Plan a Shard Split

Shard_03 in your Vitess cluster is at 90% capacity (380 GB data, 14,000 writes/sec out of a 15K max). Walk through the full Vitess Reshard process to split it into two shards. What's the sequence of steps? What's the risk at each stage? How do you validate data consistency before cutting over?

Step 1: Provision new tablets. Create shard_03a and shard_03b with fresh MySQL instances. Risk: misconfiguration. Validate: vtctl ListAllTablets shows both as NOT_SERVING.

Step 2: Start VReplication. vtctl Reshard --source_shards='30-40' --target_shards='30-38,38-40' commerce.split03. Vitess starts copying data from shard_03 to both new shards using binary log replication. Risk: replication lag if shard_03 is under heavy write load. Monitor with vtctl VReplicationExec.

Step 3: Verify data consistency. Once replication catches up (lag < 1 sec), run vtctl VDiff commerce.split03. This compares row-by-row between source and targets. Risk: false positives from in-flight writes. Run during low-traffic window.

Step 4: Cut over reads. vtctl SwitchReads --tablet_types=rdonly,replica commerce.split03. Read traffic moves to new shards. Write traffic still goes to old shard. Risk: low (reads are idempotent). Validate: monitor P99 latency on new shards.

Step 5: Cut over writes. vtctl SwitchWrites commerce.split03. Brief write pause (~1-2 seconds) while Vitess updates the routing. Risk: highest-risk step. Have a rollback command ready. Validate: write success rate returns to 100% within 5 seconds.

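Step 6: Clean up. Once satisfied, finalize the workflow. With the legacy SwitchReads/SwitchWrites commands used in the earlier steps, that is vtctl DropSources commerce.split03 (newer Vitess releases expose the same step as the Complete action of Reshard). This removes the workflow's VReplication streams and retires the old shard_03.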

Expert Exercise 5: Cross-Shard Query Strategy

Your orders are sharded by user_id across 64 shards. The business team needs: "Find all orders from users in California placed in the last 30 days." The orders table has no column for the user's state. Design a query strategy that returns results in under 500ms. Consider: where does the state data live? Can you avoid scatter-gather?

Option A: Denormalize. Add user_state to the orders table. Now the query is: SELECT * FROM orders WHERE user_state = 'CA' AND created_at > NOW() - INTERVAL '30 days'. Still a scatter-gather (64 shards), but each shard query is fast (indexed on user_state, created_at). Expected: ~50ms per shard, ~80ms total with parallelism. Under 500ms.

Option B: Pre-computed view. Maintain a separate (unsharded) table: california_orders_30d updated by a change data capture (CDC) pipeline. The business query hits this one table, not 64 shards. Near-instant, but adds pipeline complexity and ~30 second data staleness.

Option C: Analytics sidecar. Stream all order events to ClickHouse or Elasticsearch via Kafka. The analytics query runs there, not on the OLTP shards. Best for complex queries, but adds a full analytics pipeline.

Recommended: Option A for this specific query (low effort, fast enough). Option C if the business team has many ad-hoc queries. Never Option "just scatter-gather with JOINs across 64 shards" — that's the path to 2-second page loads.
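A rough sketch of what Option A's scatter-gather looks like from the application side. Shards are modeled as in-memory lists and `query_shard` stands in for the per-shard SQL; all names and sample rows are hypothetical.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone


def query_shard(rows, state, since):
    """Stand-in for the per-shard query: filter on the denormalized
    user_state column and return hits sorted by created_at."""
    hits = [r for r in rows if r["user_state"] == state and r["created_at"] > since]
    return sorted(hits, key=lambda r: r["created_at"])


def scatter_gather(shards, state, since):
    """Query every shard in parallel, then merge the sorted partial results."""
    with ThreadPoolExecutor(max_workers=max(1, len(shards))) as pool:
        partials = list(pool.map(lambda rows: query_shard(rows, state, since), shards))
    return list(heapq.merge(*partials, key=lambda r: r["created_at"]))


now = datetime(2026, 3, 1, tzinfo=timezone.utc)
shards = [
    [{"order_id": 1, "user_state": "CA", "created_at": now - timedelta(days=3)},
     {"order_id": 2, "user_state": "NY", "created_at": now - timedelta(days=1)}],
    [{"order_id": 3, "user_state": "CA", "created_at": now - timedelta(days=45)}],
    [{"order_id": 4, "user_state": "CA", "created_at": now - timedelta(days=10)}],
]

result = scatter_gather(shards, "CA", now - timedelta(days=30))
print([r["order_id"] for r in result])
```

Each shard filters and sorts locally, so the router only merges small pre-sorted lists. Total latency is still bounded by the slowest shard, which is why the per-shard index matters.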

Section 14

Cheat Cards — Quick Reference

Shard Key Rules
High cardinality (user_id, order_id)
Even distribution (hash for balance)
Query-aligned (most queries include it)
Immutable (never changes after insert)

Bad keys: country, status, boolean
Capacity Math
shards_storage = ceil(total_GB / GB_per_node)
shards_writes  = ceil(peak_wps / wps_per_node)
shards_needed  = max(storage, writes)
shards_actual  = shards_needed × 2  (headroom)

Instagram: 1B users / 64 = 15.6M/shard
Routing Strategies
Hash:      shard = hash(key) % N
Range:     shard = lookup(key_range)
Directory: shard = lookup_table[key]
Consistent hash: minimal reshuffling

Best for most: hash + consistent hashing
Rebalancing Tools
Vitess:   vtctl Reshard (VReplication)
MongoDB:  balancer (auto, chunk-based)
Citus:    rebalance_table_shards()
Manual:   logical replication + cutover

Goal: zero downtime, no data loss (verify before cutover)
Hotspot Fixes
Composite key (entity + time bucket)
Dedicated nodes for hot shards
Rate-limit writes to hot keys
Shard splitting (Vitess Reshard)
Cache hot reads (Redis in front)

Discord: detect + migrate hot partitions
Cross-Shard Survival
Avoid: co-locate related data
Denormalize: duplicate for local reads
Reference tables: small, replicated everywhere
Global index: Redis lookup (key → shard)
Analytics: separate pipeline (ClickHouse)

2PC = last resort, Saga = eventual consistency
Section 15

Connected Topics — Where to Go Next