The Art of Sharding — Part III: The Expert Zone (Hard)

Table of Contents

4. Rebalancing Strategies
5. Cross-Shard Operations
6. Shard Management
7. High Availability per Shard
8. Data Locality & Co-location
9. Indexing Strategies
10. Transaction Management
11. Monitoring & Observability

Rebalancing Process

sequenceDiagram
    participant Admin
    participant Controller
    participant OldShard
    participant NewShard
    participant Clients

    Admin->>Controller: Add new shard
    Controller->>Controller: Calculate data to move
    Controller->>Clients: Start dual-write mode

    Note over OldShard,NewShard: Phase 1: Copy Data
    Controller->>OldShard: Read data range
    OldShard->>Controller: Return data
    Controller->>NewShard: Write data

    Note over OldShard,NewShard: Phase 2: Sync Recent Changes
    Controller->>OldShard: Get delta changes
    OldShard->>Controller: Return changes
    Controller->>NewShard: Apply changes

    Controller->>Clients: Update routing<br/>(switch to new shard)
    Controller->>OldShard: Delete migrated data
    OldShard->>Controller: Confirm deletion
    Controller->>Admin: Rebalancing complete
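The copy / delta-sync / switchover flow above can be sketched in a few lines of Python. This is a single-process illustration, not a real migration controller: `ShardMigrator`, `delta_log`, and the dict-backed shards are all hypothetical names.

```python
class ShardMigrator:
    """Sketch of the copy / delta-sync / switchover flow (illustrative only)."""

    def __init__(self, old_shard, new_shard):
        self.old, self.new = old_shard, new_shard
        self.dual_write = False
        self.delta_log = []                # writes arriving during the copy

    def write(self, key, value):
        """Client writes go through here; dual-write mode captures deltas."""
        self.old[key] = value
        if self.dual_write:
            self.delta_log.append((key, value))

    def rebalance(self, keys_to_move):
        self.dual_write = True                         # start capturing deltas
        snapshot = {k: self.old[k] for k in keys_to_move}
        self.new.update(snapshot)                      # Phase 1: bulk copy
        for k, v in self.delta_log:                    # Phase 2: apply deltas
            if k in keys_to_move:
                self.new[k] = v
        for k in keys_to_move:                         # switchover + cleanup
            del self.old[k]
        self.dual_write = False

old, new = {'a': 1, 'b': 2, 'c': 3}, {}
m = ShardMigrator(old, new)
m.rebalance(['a', 'b'])
print(old, new)   # {'c': 3} {'a': 1, 'b': 2}
```

A production migrator would throttle the copy, stream deltas continuously, and make the routing switch atomic; here those concerns are collapsed into one synchronous call.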

Fixed Partitions Strategy

graph TB
    subgraph "Initial: 2 Physical Servers, 8 Partitions"
        S1[Server 1<br/>P0 P1 P2 P3]
        S2[Server 2<br/>P4 P5 P6 P7]
    end

    subgraph "After Adding Server 3"
        S3[Server 1<br/>P0 P1 P2]
        S4[Server 2<br/>P3 P4 P5]
        S5[Server 3<br/>P6 P7]
    end

    S1 --> S3
    S2 --> S4
    S2 --> S5

    style S5 fill:#9f9,stroke:#333,stroke-width:3px

When Rebalancing Is Needed

  • Adding new shards (scaling out)
  • Removing failed/decommissioned shards
  • Fixing hot partitions
  • Optimizing uneven data distribution

Techniques

Fixed Number of Partitions

  • Create many more partitions than nodes upfront
  • Assign multiple partitions to each node
  • Move entire partitions when rebalancing
  • Example: Elasticsearch, Kafka (partition reassignment)
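The diagram above ("Fixed Partitions Strategy") can be reproduced with a small sketch that splits a fixed partition list into contiguous runs, one per node; `assign_partitions` is a hypothetical helper, not any database's API.

```python
def assign_partitions(num_partitions, nodes):
    """Fixed-partition scheme: split the (fixed) partition list into
    contiguous runs, one per node. Rebalancing moves whole partitions;
    keys inside a partition never re-hash."""
    per_node, extra = divmod(num_partitions, len(nodes))
    assignment, start = {}, 0
    for i, node in enumerate(nodes):
        count = per_node + (1 if i < extra else 0)   # spread the remainder
        assignment[node] = list(range(start, start + count))
        start += count
    return assignment

print(assign_partitions(8, ['server1', 'server2']))
print(assign_partitions(8, ['server1', 'server2', 'server3']))
# After adding server3, only P3, P6, and P7 change owner,
# matching the arrows in the diagram above.
```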

Virtual Partitioning

  • Shard keys map to virtual shards
  • Virtual shards map to fewer physical servers
  • Change mappings without code modifications
  • Reduces impact during rebalancing
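The two-level mapping can be sketched as follows. `virtual_to_physical`, `locate`, and the server names are assumptions for illustration; the key property is that a key's virtual shard is fixed while the virtual-to-physical table is freely remappable.

```python
import hashlib

NUM_VIRTUAL = 16          # many more virtual shards than physical servers

# Mutable virtual-to-physical map; rebalancing edits ONLY this table.
virtual_to_physical = {v: f'server_{v % 2}' for v in range(NUM_VIRTUAL)}

def locate(key):
    """Key -> virtual shard is stable; virtual shard -> server can change."""
    v = int(hashlib.md5(key.encode()).hexdigest(), 16) % NUM_VIRTUAL
    return v, virtual_to_physical[v]

v_before, server_before = locate('user_42')

# Rebalance: hand half of server_1's virtual shards to a new server_2.
for v in range(NUM_VIRTUAL):
    if virtual_to_physical[v] == 'server_1' and v >= NUM_VIRTUAL // 2:
        virtual_to_physical[v] = 'server_2'

v_after, server_after = locate('user_42')
print(v_before == v_after)   # True: the key's virtual shard never changed
```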

Automatic Rebalancing

  • Database handles migration automatically
  • Consistent hashing enables self-organization
  • Examples: Cassandra, DynamoDB, Vitess
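A toy consistent-hash ring shows why adding a node moves only a fraction of keys. This sketch uses a single hash point per node; real systems (e.g. Cassandra) add many virtual nodes per server to smooth the distribution.

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Tiny consistent-hash ring (simplified: one point per node)."""

    def __init__(self, nodes=()):
        self.ring = []                     # sorted (hash, node) points
        for n in nodes:
            self.add(n)

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add(self, node):
        bisect.insort(self.ring, (self._hash(node), node))

    def locate(self, key):
        """A key belongs to the first node clockwise from its hash."""
        h = self._hash(key)
        i = bisect.bisect(self.ring, (h, '')) % len(self.ring)
        return self.ring[i][1]

ring = ConsistentHashRing(['node_a', 'node_b', 'node_c'])
keys = [f'user_{i}' for i in range(1000)]
before = {k: ring.locate(k) for k in keys}
ring.add('node_d')                          # splits exactly one arc
moved = sum(1 for k in keys if ring.locate(k) != before[k])
print(f"{moved}/1000 keys moved after adding a node")
```

With naive `hash(key) % N` routing, changing N would remap almost every key; here only the keys in the arc claimed by `node_d` move.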

Rebalancing Challenges

graph TB
    subgraph "Rebalancing Risks"
        R1[Heavy I/O<br/>Operations]
        R2[Downtime<br/>Risk]
        R3[Consistency<br/>Concerns]
        R4[Time<br/>Consuming]
    end

    subgraph "Mitigation Strategies"
        M1[Throttle<br/>Migration Rate]
        M2[Dual-Write<br/>Mode]
        M3[Atomic<br/>Switchover]
        M4[Background<br/>Migration]
    end

    R1 --> M1
    R2 --> M2
    R3 --> M3
    R4 --> M4

    style R1 fill:#f99,stroke:#333
    style R2 fill:#f99,stroke:#333
    style R3 fill:#f99,stroke:#333
    style R4 fill:#f99,stroke:#333
    style M1 fill:#9f9,stroke:#333
    style M2 fill:#9f9,stroke:#333
    style M3 fill:#9f9,stroke:#333
    style M4 fill:#9f9,stroke:#333

5. Cross-Shard Operations

The Challenge

Queries requiring data from multiple shards are complex and expensive.

Scatter-Gather Pattern

sequenceDiagram
    participant Client
    participant Router
    participant S1 as Shard 1
    participant S2 as Shard 2
    participant S3 as Shard 3
    participant S4 as Shard 4

    Client->>Router: SELECT * FROM users<br/>WHERE age > 25

    Note over Router,S4: Scatter Phase (Parallel)
    par Query All Shards
        Router->>S1: Query
        Router->>S2: Query
        Router->>S3: Query
        Router->>S4: Query
    end

    par Responses
        S1-->>Router: 100 results
        S2-->>Router: 150 results
        S3-->>Router: 120 results
        S4-->>Router: 130 results
    end

    Note over Router: Gather Phase<br/>Merge & Sort Results
    Router->>Router: Aggregate 500 results
    Router->>Client: Return merged results

Code Example

from concurrent.futures import ThreadPoolExecutor
import time

class ScatterGatherQuery:
    def __init__(self, num_shards):
        self.shards = {f'shard_{i}': self._create_sample_data(i)
                      for i in range(num_shards)}

    def _create_sample_data(self, shard_id):
        """Create sample user data for each shard"""
        return [
            {'user_id': f's{shard_id}_u{i}', 'age': 20 + (i % 40),
             'name': f'User_{shard_id}_{i}'}
            for i in range(100)
        ]

    def query_shard(self, shard_name, condition):
        """Query a single shard (simulates network latency)"""
        time.sleep(0.1)  # Simulate network latency
        results = [user for user in self.shards[shard_name]
                  if condition(user)]
        print(f"{shard_name}: found {len(results)} results")
        return results

    def scatter_gather(self, condition):
        """Execute query across all shards in parallel"""
        print(f"Executing scatter-gather across {len(self.shards)} shards...")
        start_time = time.time()

        # Scatter: Query all shards in parallel
        with ThreadPoolExecutor(max_workers=len(self.shards)) as executor:
            futures = {
                shard_name: executor.submit(self.query_shard, shard_name, condition)
                for shard_name in self.shards
            }

            # Gather: Collect results
            all_results = []
            for shard_name, future in futures.items():
                all_results.extend(future.result())

        elapsed_time = time.time() - start_time
        print(f"Gathered {len(all_results)} total results in {elapsed_time:.2f}s")
        return all_results

# Example usage
sg = ScatterGatherQuery(num_shards=4)

# Query: Find all users older than 30
results = sg.scatter_gather(lambda user: user['age'] > 30)
print(f"\nFound {len(results)} users older than 30")

# Show some results
print("\nSample results:")
for user in results[:5]:
    print(f"  {user['user_id']}: {user['name']}, age {user['age']}")

Output:

Executing scatter-gather across 4 shards...
shard_0: found 67 results
shard_1: found 67 results
shard_2: found 67 results
shard_3: found 67 results
Gathered 268 total results in 0.11s

Found 268 users older than 30

Sample results:
  s0_u11: User_0_11, age 31
  s0_u12: User_0_12, age 32
  s0_u13: User_0_13, age 33
  s0_u14: User_0_14, age 34
  s0_u15: User_0_15, age 35

Patterns

Scatter-Gather

  • Query all relevant shards in parallel
  • Aggregate results at application layer
  • Trade-off: Higher latency, more network traffic

Denormalization

  • Duplicate data across shards to avoid joins
  • Store complete records where needed
  • Trade-off: Storage overhead, consistency complexity

Application-Level Joins

  • Fetch data from multiple shards separately
  • Join in application memory
  • Trade-off: Application complexity, memory usage
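An application-level join can be sketched as an in-memory hash join. The two dicts below stand in for result sets fetched separately from a users shard and an orders shard; the names and sample rows are illustrative.

```python
# Stand-ins for rows fetched from two different shards.
users_shard = {1: {'user_id': 1, 'name': 'Alice'},
               2: {'user_id': 2, 'name': 'Bob'}}
orders_shard = [{'order_id': 10, 'user_id': 1, 'total': 99.0},
                {'order_id': 11, 'user_id': 2, 'total': 25.0},
                {'order_id': 12, 'user_id': 1, 'total': 42.0}]

def app_level_join(users, orders):
    """Fetch both sides separately, then hash-join in application memory."""
    by_user = {u['user_id']: u for u in users.values()}   # build side
    return [{**o, 'name': by_user[o['user_id']]['name']}  # probe side
            for o in orders if o['user_id'] in by_user]

joined = app_level_join(users_shard, orders_shard)
print(joined[0])  # {'order_id': 10, 'user_id': 1, 'total': 99.0, 'name': 'Alice'}
```

The memory cost is the build side (all fetched users); for large result sets this is exactly the "memory usage" trade-off noted above.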

Cross-Shard Join Example

graph TB
    subgraph "Problem: Cross-Shard Join"
        P1[Users Shard 1]
        P2[Users Shard 2]
        P3[Orders Shard 1]
        P4[Orders Shard 2]

        P5[Want to JOIN<br/>users and orders<br/>by user_id]
    end

    subgraph "Solution: Denormalization"
        S1[Orders Shard 1<br/>Includes user data:<br/>user_id, user_name, user_email]
        S2[Orders Shard 2<br/>Includes user data:<br/>user_id, user_name, user_email]
    end

    P5 --> S1
    P5 --> S2

    style P5 fill:#f99,stroke:#333
    style S1 fill:#9f9,stroke:#333
    style S2 fill:#9f9,stroke:#333

6. Shard Management

Routing Architectures

graph TB
    subgraph "Client-Side Routing"
        C1[Smart Client]
        C1 -->|Knows shard locations| S1[(Shard 1)]
        C1 -->|Direct connection| S2[(Shard 2)]
        C1 -->|Direct connection| S3[(Shard 3)]
    end

    subgraph "Proxy-Based Routing"
        C2[Thin Client]
        C2 --> Proxy[Query Router/Proxy]
        Proxy --> S4[(Shard 1)]
        Proxy --> S5[(Shard 2)]
        Proxy --> S6[(Shard 3)]
    end

    style C1 fill:#faa,stroke:#333
    style Proxy fill:#9f9,stroke:#333,stroke-width:3px

Configuration Service

Tracks shard topology and routing information:

  • What it stores: Shard locations, mappings, health status
  • Technologies: ZooKeeper, etcd, Consul
  • Purpose: Service discovery, configuration management

Routing Layer (Query Router)

sequenceDiagram
    participant C as Client
    participant R as Router
    participant CS as Config Service
    participant S1 as Shard 1
    participant S2 as Shard 2

    Note over C,S2: Initialization
    R->>CS: Get shard topology
    CS->>R: Return shard map

    Note over C,S2: Query Routing
    C->>R: INSERT user_id=12345
    R->>R: hash(12345) % 2 = 1
    R->>S2: Route to Shard 2
    S2->>R: Success
    R->>C: Success

    Note over C,S2: Config Changes
    CS->>R: Shard topology updated
    R->>R: Refresh routing table

Client-Side Routing

  • Application knows shard topology
  • No proxy overhead
  • Example: Cassandra native drivers

Proxy-Based Routing (Server-Side)

  • Centralized routing logic
  • Client unaware of sharding
  • Examples: ProxySQL, Vitess, MongoDB mongos, MySQL Router
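A proxy-based router can be sketched as below, mirroring the `hash(user_id) % N` step in the sequence diagram above. `QueryRouter` and the dict-backed shards are illustrative stand-ins for a proxy and its backend connections.

```python
class QueryRouter:
    """Proxy-style router sketch: the client talks only to the router
    and stays unaware of sharding."""

    def __init__(self, shards):
        self.shards = shards                       # backend "connections"

    def shard_for(self, user_id):
        return self.shards[user_id % len(self.shards)]

    def insert(self, user_id, row):
        self.shard_for(user_id)[user_id] = row     # route, then execute

shard_1, shard_2 = {}, {}
router = QueryRouter([shard_1, shard_2])
router.insert(12345, {'name': 'Alice'})            # 12345 % 2 = 1 -> shard 2
print(12345 in shard_2)  # True
```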

7. High Availability per Shard

Shard Replication Architecture

graph TB
    subgraph "Shard 1 Replica Set"
        S1P[Primary]
        S1R1[Replica 1]
        S1R2[Replica 2]

        S1P -->|Async<br/>Replication| S1R1
        S1P -->|Async<br/>Replication| S1R2
    end

    subgraph "Shard 2 Replica Set"
        S2P[Primary]
        S2R1[Replica 1]
        S2R2[Replica 2]

        S2P -->|Async<br/>Replication| S2R1
        S2P -->|Async<br/>Replication| S2R2
    end

    C[Client] -->|Writes| S1P
    C -->|Writes| S2P
    C -->|Reads| S1R1
    C -->|Reads| S2R1

    style S1P fill:#f96,stroke:#333,stroke-width:3px
    style S2P fill:#f96,stroke:#333,stroke-width:3px
    style S1R1 fill:#9cf,stroke:#333
    style S1R2 fill:#9cf,stroke:#333
    style S2R1 fill:#9cf,stroke:#333
    style S2R2 fill:#9cf,stroke:#333

Auto-Failover Process

sequenceDiagram
    participant C as Coordinator
    participant P as Primary
    participant R1 as Replica 1
    participant R2 as Replica 2
    participant M as Monitor

    Note over P: Primary is healthy
    C->>P: Write request
    P->>C: Success

    Note over P: Primary fails!
    P-xC: Connection lost
    M->>P: Health check
    P-xM: No response
    M->>R1: Health check
    R1->>M: Healthy
    M->>R2: Health check
    R2->>M: Healthy

    Note over M,R1: Promote Replica 1
    M->>R1: Promote to Primary
    R1->>M: Promotion complete

    C->>R1: Retry write request
    R1->>C: Success

Replication

  • Primary-Replica (Master-Slave): Writes to primary, reads from replicas
  • Multi-Primary: Multiple write-capable nodes
  • Quorum-based: Require majority consensus for operations
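The quorum idea can be sketched with R + W > N in a few lines. `quorum_write` and `quorum_read` are hypothetical names, and version reconciliation (timestamps, vector clocks) is deliberately omitted.

```python
def quorum_write(replicas, key, value, w):
    """A write succeeds only if at least `w` of the N replicas acknowledge."""
    acks = 0
    for replica in replicas:
        if replica is not None:       # None models a down replica
            replica[key] = value
            acks += 1
    return acks >= w

def quorum_read(replicas, key, r):
    """A read succeeds if at least `r` replicas answer. With R + W > N the
    read set always overlaps the last successful write set. (A real system
    would reconcile divergent versions before returning.)"""
    answers = [rep[key] for rep in replicas if rep is not None and key in rep]
    return answers[0] if len(answers) >= r else None

replicas = [{}, {}, None]             # N = 3, one replica down
ok = quorum_write(replicas, 'balance', 100, w=2)
print(ok, quorum_read(replicas, 'balance', r=2))  # True 100
```

With N = 3, W = 2, R = 2, the system tolerates one failed replica for both reads and writes, which is why 3-replica quorums are a common default.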

Consensus Protocols

  • Raft: Leader election, log replication
  • Paxos: Distributed consensus
  • Purpose: Maintain consistency across replicas

8. Data Locality & Co-location

Tenant-Based Sharding

graph TB
    subgraph "Multi-Tenant Sharding"
        T1[Tenant A<br/>Small startup]
        T2[Tenant B<br/>Small startup]
        T3[Tenant C<br/>Enterprise]
        T4[Tenant D<br/>Medium company]
    end

    subgraph "Shards"
        S1[(Shard 1<br/>Tenant A + B<br/>All their data)]
        S2[(Shard 2<br/>Tenant C<br/>Dedicated)]
        S3[(Shard 3<br/>Tenant D<br/>All their data)]
    end

    T1 --> S1
    T2 --> S1
    T3 --> S2
    T4 --> S3

    style S2 fill:#faa,stroke:#333,stroke-width:3px

Entity Group Co-location

graph LR
    subgraph "Co-located: Good ✓"
        G1[(Shard 1<br/>user_123)]
        G2[User Profile<br/>user_123]
        G3[Orders<br/>for user_123]
        G4[Preferences<br/>for user_123]

        G2 --> G1
        G3 --> G1
        G4 --> G1
    end

    subgraph "Scattered: Bad ✗"
        B1[(Shard 1<br/>User Profile)]
        B2[(Shard 2<br/>Orders)]
        B3[(Shard 3<br/>Preferences)]

        B4[Need JOIN<br/>across 3 shards!]
        B4 -.-> B1
        B4 -.-> B2
        B4 -.-> B3
    end

    style G1 fill:#9f9,stroke:#333
    style B4 fill:#f99,stroke:#333

Benefits

  • Performance: Minimize network hops
  • Transactions: Enable ACID within shard
  • Cost: Reduce cross-shard query overhead
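Co-location falls out of one routing rule: shard every entity by its owning user_id, never by its own primary key. The sketch below illustrates this; `shard_for` and the md5-based hash are illustrative choices.

```python
import hashlib

NUM_SHARDS = 4

def shard_for(user_id):
    """Route EVERY entity by its owning user_id, not by its own primary key."""
    return int(hashlib.md5(str(user_id).encode()).hexdigest(), 16) % NUM_SHARDS

# Profile, orders, and preferences for user 123 all hash the same key.
entities = [
    {'type': 'profile', 'user_id': 123},
    {'type': 'order', 'order_id': 9876, 'user_id': 123},   # NOT routed by order_id
    {'type': 'prefs', 'user_id': 123},
]
target_shards = {shard_for(e['user_id']) for e in entities}
print(len(target_shards))   # 1: the whole entity group lands on one shard
```

Had the order been routed by `order_id` instead, the group would scatter across shards, which is exactly the "Bad" case in the diagram above.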

9. Indexing Strategies

Local vs Global Secondary Indexes

graph TB
    subgraph "Local Secondary Index"
        L1[(Shard 1<br/>Users A-M<br/>+ Local Index)]
        L2[(Shard 2<br/>Users N-Z<br/>+ Local Index)]

        LQ[Query: email=alice@email.com<br/>Must check ALL shards]
        LQ -.->|Scan| L1
        LQ -.->|Scan| L2
    end

    subgraph "Global Secondary Index"
        G1[(Shard 1<br/>Users A-M)]
        G2[(Shard 2<br/>Users N-Z)]
        G3[(Global Email Index<br/>Separate sharding)]

        GQ[Query: email=alice@email.com<br/>Check index only]
        GQ -->|Direct lookup| G3
        G3 -->|Found in Shard 1| G1
    end

    style LQ fill:#faa,stroke:#333
    style GQ fill:#9f9,stroke:#333

Local Secondary Indexes

How it works: Each shard maintains its own indexes

Advantages:

  • Fast writes (update only one shard)
  • Simpler consistency model
  • Lower coordination overhead

Disadvantages:

  • Queries may need to hit all shards (scatter-gather)
  • Slower reads for non-shard-key queries

Use case: Write-heavy workloads

Global Secondary Indexes

How it works: Index spans across all shards, separately partitioned

Advantages:

  • Fast reads (query only relevant index shards)
  • Efficient for non-shard-key queries

Disadvantages:

  • Slower writes (update multiple index shards)
  • Complex consistency management
  • Higher coordination overhead

Use case: Read-heavy workloads with diverse query patterns

Example: DynamoDB Global Secondary Indexes
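A minimal in-memory sketch of a global secondary index, partitioned separately by email (this is not the DynamoDB API; all names here are illustrative). It shows both the extra write and the scatter-free read:

```python
import hashlib

NUM_SHARDS, NUM_INDEX_SHARDS = 2, 2

def data_shard(user_id):
    return user_id % NUM_SHARDS

def index_shard(email):
    """The email index is partitioned by email, independently of the data."""
    return int(hashlib.md5(email.encode()).hexdigest(), 16) % NUM_INDEX_SHARDS

data = [dict() for _ in range(NUM_SHARDS)]
email_index = [dict() for _ in range(NUM_INDEX_SHARDS)]

def insert_user(user_id, email):
    data[data_shard(user_id)][user_id] = {'user_id': user_id, 'email': email}
    # Second write: the index entry usually lives on a DIFFERENT node,
    # which is the write overhead of a global index.
    email_index[index_shard(email)][email] = user_id

def find_by_email(email):
    """One index lookup + one data lookup — no scatter-gather over all shards."""
    user_id = email_index[index_shard(email)].get(email)
    return None if user_id is None else data[data_shard(user_id)][user_id]

insert_user(7, 'alice@email.com')
print(find_by_email('alice@email.com'))  # {'user_id': 7, 'email': 'alice@email.com'}
```

A local index would instead require querying every data shard for the email; here the lookup touches exactly two nodes regardless of shard count.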


10. Transaction Management

Single-Shard vs Cross-Shard Transactions

graph TB
    subgraph "Single-Shard Transaction (Fast)"
        ST1[BEGIN]
        ST2[UPDATE account_123<br/>balance -= 100]
        ST3[INSERT transaction_log]
        ST4[COMMIT]

        ST1 --> ST2 --> ST3 --> ST4

        STS[(Shard 1<br/>All in one shard<br/>ACID guaranteed)]
        ST2 -.-> STS
        ST3 -.-> STS
    end

    subgraph "Cross-Shard Transaction (Complex)"
        CT1[BEGIN]
        CT2[UPDATE account_123<br/>in Shard 1]
        CT3[UPDATE account_456<br/>in Shard 2]
        CT4[2-Phase Commit<br/>or Saga Pattern]
        CT5[COMMIT]

        CT1 --> CT2 --> CT3 --> CT4 --> CT5

        CTS1[(Shard 1)]
        CTS2[(Shard 2)]
        CT2 -.-> CTS1
        CT3 -.-> CTS2
    end

    style STS fill:#9f9,stroke:#333,stroke-width:3px
    style CTS1 fill:#faa,stroke:#333
    style CTS2 fill:#faa,stroke:#333

Two-Phase Commit (2PC)

sequenceDiagram
    participant C as Coordinator
    participant S1 as Shard 1
    participant S2 as Shard 2
    participant S3 as Shard 3

    Note over C,S3: Phase 1: Prepare
    C->>S1: Prepare transaction
    C->>S2: Prepare transaction
    C->>S3: Prepare transaction

    S1->>C: Ready to commit
    S2->>C: Ready to commit
    S3->>C: Ready to commit

    Note over C: All shards ready

    Note over C,S3: Phase 2: Commit
    C->>S1: Commit
    C->>S2: Commit
    C->>S3: Commit

    S1->>C: Committed
    S2->>C: Committed
    S3->>C: Committed

Saga Pattern

sequenceDiagram
    participant O as Order Service
    participant P as Payment Service
    participant I as Inventory Service
    participant S as Shipping Service

    Note over O,S: Happy Path
    O->>O: Create order (local tx)
    O->>P: Process payment (local tx)
    P->>O: Payment successful
    O->>I: Reserve inventory (local tx)
    I->>O: Inventory reserved
    O->>S: Schedule shipping (local tx)
    S->>O: Shipping scheduled

    Note over O,S: Failure & Compensation
    O->>O: Create order
    O->>P: Process payment
    P->>O: Payment successful
    O->>I: Reserve inventory
    I->>O: Inventory failed (out of stock)

    Note over O,S: Compensating Transactions
    O->>P: Refund payment
    O->>O: Cancel order

Single-Shard Transactions

  • Full ACID guarantees within a shard
  • Use traditional database transactions
  • Fast and simple
  • Design principle: Shard key should enable single-shard transactions

Cross-Shard Transactions

Two-Phase Commit (2PC)

  • Coordinator ensures atomic commits across shards
  • Disadvantages: Blocking, coordinator is a single point of failure, high latency
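The 2PC decision rule from the sequence diagram can be sketched in-process (durability, locking, and timeouts omitted; `Participant` and `two_phase_commit` are illustrative names):

```python
class Participant:
    """One shard's side of 2PC: stage on prepare, apply on commit."""

    def __init__(self, name, healthy=True):
        self.name, self.healthy = name, healthy
        self.staged, self.committed = None, []

    def prepare(self, op):
        if not self.healthy:
            return False              # vote "no"
        self.staged = op              # durably stage the op, hold locks
        return True                   # vote "ready to commit"

    def commit(self):
        self.committed.append(self.staged)
        self.staged = None

    def abort(self):
        self.staged = None

def two_phase_commit(participants, op):
    """Phase 1: prepare everywhere; Phase 2: commit only if ALL voted yes."""
    if all(p.prepare(op) for p in participants):
        for p in participants:
            p.commit()
        return True
    for p in participants:            # any "no" vote aborts the transaction
        p.abort()
    return False

shards = [Participant('s1'), Participant('s2'), Participant('s3')]
ok1 = two_phase_commit(shards, 'transfer 100')
shards[1].healthy = False
ok2 = two_phase_commit(shards, 'transfer 50')
print(ok1, ok2)  # True False
```

The blocking problem shows up between the phases: a participant that voted yes must hold its locks until the coordinator's decision arrives, and if the coordinator dies there, the participant is stuck.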

Saga Pattern

  • Break transaction into local transactions per shard
  • Compensating transactions for rollback
  • Eventual consistency model
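The saga flow above can be sketched as an ordered list of (action, compensation) pairs; all step names are illustrative, and each action stands in for a local transaction on one shard or service.

```python
def fail(msg):
    raise RuntimeError(msg)

def run_saga(steps):
    """Run local transactions in order; on failure, undo the completed
    steps in reverse via their compensating transactions."""
    done = []
    for action, compensate in steps:
        try:
            action()
        except Exception:
            for comp in reversed(done):   # compensating transactions
                comp()
            return False
        done.append(compensate)
    return True

log = []
steps = [
    (lambda: log.append('order created'),   lambda: log.append('order cancelled')),
    (lambda: log.append('payment charged'), lambda: log.append('payment refunded')),
    (lambda: fail('out of stock'),          lambda: log.append('never runs')),
]
ok = run_saga(steps)
print(ok, log)
# False ['order created', 'payment charged', 'payment refunded', 'order cancelled']
```

Note that compensations are new forward actions (a refund), not rollbacks; between failure and compensation the system is visibly inconsistent, which is the eventual-consistency trade-off listed above.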

11. Monitoring & Observability

Monitoring Dashboard

graph TB
    subgraph "Metrics Collection"
        M1[Data Distribution<br/>Size, row count per shard]
        M2[Performance<br/>Latency, throughput]
        M3[Health<br/>Uptime, replication lag]
        M4[Hotspot Detection<br/>Query patterns]
    end

    subgraph "Alerting"
        A1["Imbalanced Shards<br/>(>20% difference)"]
        A2["High Latency<br/>(>100ms p99)"]
        A3["Replication Lag<br/>(>5 seconds)"]
        A4["Hot Partition<br/>(>2x avg load)"]
    end

    M1 --> A1
    M2 --> A2
    M3 --> A3
    M4 --> A4

    style A1 fill:#f99,stroke:#333
    style A2 fill:#f99,stroke:#333
    style A3 fill:#f99,stroke:#333
    style A4 fill:#f99,stroke:#333

Key Metrics

Data Distribution

  • Size per shard
  • Row count per shard
  • Growth rate per shard
  • Identify imbalanced shards
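Imbalance detection from these metrics can be as simple as comparing each shard's size against the mean; `check_balance` and its default threshold are illustrative, mirroring the ">20% difference" alert in the dashboard sketch above.

```python
def check_balance(shard_sizes, threshold=0.20):
    """Flag shards whose size deviates from the mean by more than
    `threshold`, returning each offender's relative deviation."""
    avg = sum(shard_sizes.values()) / len(shard_sizes)
    return {name: round((size - avg) / avg, 2)
            for name, size in shard_sizes.items()
            if abs(size - avg) / avg > threshold}

sizes_gb = {'shard_1': 100, 'shard_2': 110, 'shard_3': 105, 'shard_4': 185}
print(check_balance(sizes_gb))   # {'shard_4': 0.48} -> 48% above the mean
```

In practice the same check runs on row counts and growth rates too, so a shard trending toward imbalance is flagged before it becomes a hotspot.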

Performance Metrics

  • Query latency per shard
  • Throughput (reads/writes per second)
  • CPU, memory, disk I/O per shard
  • Cache hit rates

Health Monitoring

  • Shard availability/uptime
  • Replication lag
  • Failed queries
  • Connection pool status

Tools

  • Prometheus + Grafana for metrics
  • Distributed tracing (Jaeger, Zipkin)
  • Database-specific tools (MongoDB Atlas, Vitess VTGate metrics)