Building a Highly Available, Distributed Snowflake Worker ID Manager with Raft


In our system’s core transaction pipeline, the order ID generation logic initially relied on MySQL’s AUTO_INCREMENT. This solution was simple and reliable, but during traffic spikes, the database’s single-point-of-write bottleneck became the entire system’s Achilles’ heel. Migrating to a Redis INCR-based solution temporarily alleviated the throughput problem, but Redis’s persistence strategies (AOF/RDB) still carried a risk of data loss in extreme failure scenarios. This could lead to ID rollbacks, which are absolutely unacceptable in a transaction system.

The community-favorite Snowflake algorithm was our next target. Its structure of timestamp + data center ID + machine ID + sequence number is ideal—fully decentralized and extremely high-performance. However, the core challenge shifted to the allocation and management of the “machine ID” (Worker ID). Manual configuration is high-cost and error-prone for operations, while using a coordination service like Zookeeper to handle dynamic allocation introduces a new external dependency and potential point of failure.

The real pain point was this: we needed a component that could manage the lifecycle of Snowflake Worker IDs automatically, fault-tolerantly, and without a single point of failure. Rather than bringing in a heavyweight Zookeeper cluster, why not solve the problem at its root by implementing a lightweight, specialized Worker ID manager ourselves using a consensus protocol? This would not only give us complete control over a core component’s logic but also allow our team to build practical engineering experience with distributed consensus protocols. We decided to build this manager using the Raft protocol, a consensus algorithm that is generally easier to understand and implement than Paxos.

Architectural Vision and State Machine Definition

Our goal is to build a Raft cluster of 3 or 5 nodes. This cluster’s sole responsibility is to maintain a globally consistent state that records which Worker ID is held by which service instance (Node) and the expiration time of its lease.

The system’s core interaction flow is as follows:

  1. When a service instance starts, it sends an AcquireWorkerID request to the Raft cluster.
  2. The Raft cluster’s Leader node processes the request, allocates an ID from the available pool, and sets a lease for it.
  3. The Leader replicates a log entry containing {workerId, nodeId, leaseExpiry} to a majority of the Follower nodes.
  4. Once the log entry is committed by the majority, the state machine applies the log, and the Leader returns the allocated Worker ID to the service instance.
  5. The service instance holding the ID must periodically send a RenewLease request to the Raft cluster to extend its lease, which is essentially committing a log entry that updates the leaseExpiry.
  6. If a service instance crashes or is network-partitioned, its lease will eventually expire. The Leader node, through an internal periodic check, will reclaim the expired Worker ID, making it available for reallocation.
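Seen from the service instance, steps 1 and 5 reduce to one acquire call followed by a renewal loop. The following is a minimal sketch of that loop. The `LeaseClient` interface, `HoldLease`, and the `fakeClient` test double are hypothetical names introduced for illustration; the transport behind them is left open.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// LeaseClient is a hypothetical client for the manager cluster (an assumption
// of this sketch, not an API defined elsewhere in the article).
type LeaseClient interface {
	Acquire(ctx context.Context, nodeID string) (int64, error)
	Renew(ctx context.Context, nodeID string, workerID int64) error
}

// HoldLease acquires a Worker ID (step 1) and renews it every interval
// (step 5) until ctx is cancelled. If a renewal fails, onLost is called once
// so the caller can stop generating IDs.
func HoldLease(ctx context.Context, c LeaseClient, nodeID string,
	interval time.Duration, onLost func(error)) (int64, error) {
	id, err := c.Acquire(ctx, nodeID)
	if err != nil {
		return 0, fmt.Errorf("acquire worker id: %w", err)
	}
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := c.Renew(ctx, nodeID, id); err != nil {
					onLost(err)
					return
				}
			}
		}
	}()
	return id, nil
}

// fakeClient is a test double whose Renew fails after okRenewals calls.
type fakeClient struct {
	okRenewals int
	calls      int
}

func (f *fakeClient) Acquire(ctx context.Context, nodeID string) (int64, error) {
	return 7, nil
}

func (f *fakeClient) Renew(ctx context.Context, nodeID string, workerID int64) error {
	f.calls++
	if f.calls > f.okRenewals {
		return errors.New("lease not found or node mismatch")
	}
	return nil
}

// runDemo holds a lease against the fake client and reports the acquired ID
// and whether the loss callback fired before a 2-second timeout.
func runDemo() (int64, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	lost := make(chan error, 1)
	id, err := HoldLease(ctx, &fakeClient{okRenewals: 2}, "node-a",
		time.Millisecond, func(e error) { lost <- e })
	if err != nil {
		return 0, false
	}
	select {
	case <-lost:
		return id, true
	case <-time.After(2 * time.Second):
		return id, false
	}
}

func main() {
	id, lost := runDemo()
	fmt.Println("worker id:", id, "lease lost:", lost)
}
```

In practice the renewal interval would be a fraction of the lease duration (a third is a common choice) so that a few failed attempts can be retried before the lease lapses.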

Here is the UML sequence diagram for this process:

sequenceDiagram
    participant Client as Service Instance
    participant Leader
    participant Follower1
    participant Follower2

    Client->>+Leader: AcquireWorkerID(nodeId)
    Leader->>Leader: Choose available workerId
    Leader->>Follower1: AppendEntries({workerId, nodeId, lease})
    Leader->>Follower2: AppendEntries({workerId, nodeId, lease})
    Follower1-->>Leader: ACK
    Follower2-->>Leader: ACK
    Note right of Leader: Log committed by majority
    Leader->>Leader: Apply to state machine
    Leader-->>-Client: Return workerId

    loop Heartbeat
        Client->>+Leader: RenewLease(workerId, nodeId)
        Leader->>Follower1: AppendEntries(update lease)
        Leader->>Follower2: AppendEntries(update lease)
        Follower1-->>Leader: ACK
        Follower2-->>Leader: ACK
        Leader-->>-Client: Lease Renewed
    end

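The core of this architecture is that Raft serializes all Worker ID allocation and reclamation operations and applies them in the same order on every node. Even if the Leader fails or a network partition occurs, the cluster can elect a new Leader and keep serving requests as long as a majority of nodes can still communicate. Because every allocation goes through the committed log, the state machine never assigns one Worker ID to two instances at once. The guarantee holds end to end only if an instance stops generating IDs once it can no longer renew its lease, since a partitioned instance has no way to observe that its ID has been reclaimed.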
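On the instance side, one way to respect the lease is a small guard that the generator consults before issuing an ID. This is a sketch under stated assumptions: `LeaseGuard` and its methods are hypothetical, time is passed in as Unix milliseconds to keep the logic testable, and a safety margin is subtracted to absorb clock drift between the instance and the cluster. The deadline is measured from the moment the renew request was sent, which is the conservative choice.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// LeaseGuard tracks, on the client, until when the Worker ID may be used.
// It is a hypothetical helper, not part of the manager itself.
type LeaseGuard struct {
	mu         sync.Mutex
	deadlineMs int64 // local Unix ms after which the ID must not be used
	marginMs   int64 // safety margin for clock drift and network delay
}

func NewLeaseGuard(marginMs int64) *LeaseGuard {
	return &LeaseGuard{marginMs: marginMs}
}

// Renewed records a successful acquire or renewal. sentAtMs is the local time
// at which the request was sent, so the local deadline can only be earlier
// than the one the cluster computed.
func (g *LeaseGuard) Renewed(sentAtMs, leaseMs int64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if d := sentAtMs + leaseMs - g.marginMs; d > g.deadlineMs {
		g.deadlineMs = d
	}
}

// Usable reports whether IDs may still be generated at nowMs.
func (g *LeaseGuard) Usable(nowMs int64) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return nowMs < g.deadlineMs
}

func main() {
	g := NewLeaseGuard(2000)
	now := time.Now().UnixMilli()
	g.Renewed(now, 30000)
	fmt.Println("usable now:", g.Usable(now))
	fmt.Println("usable in 29s:", g.Usable(now+29000))
}
```

A generator would call `Usable` before `NextID` and refuse to issue IDs once it returns false. Wall-clock jumps on the instance can still defeat this check, so a monotonic time source is preferable where one is available.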

Core Implementation: From Raft Node to Application Layer

We’ll use Go to implement this system; its powerful concurrency primitives and standard library are well-suited for building this kind of network service. We’ll skip the implementation details of a full Raft library and instead focus on how to integrate the Raft protocol with our Worker ID management business logic. We’ll assume we have a basic Raft implementation that exposes a Propose(data []byte) method for submitting data to the cluster.

1. The Snowflake Generator Itself

First, here’s a standard implementation of a Snowflake ID generator. This is the “ammunition” we’ll ultimately be using.

// snowflake.go
package main

import (
	"errors"
	"sync"
	"time"
)

const (
	workerIDBits     = 10
	sequenceBits     = 12
	timestampBits    = 41
	workerIDShift    = sequenceBits
	timestampShift   = sequenceBits + workerIDBits
	sequenceMask     = int64(-1) ^ (int64(-1) << sequenceBits)
	maxWorkerID      = int64(-1) ^ (int64(-1) << workerIDBits)
	twepoch          = int64(1672531200000) // 2023-01-01 00:00:00 UTC
)

var (
	ErrInvalidWorkerID      = errors.New("worker id must be between 0 and maxWorkerID (inclusive)")
	ErrTimestampBackwards   = errors.New("clock is moving backwards")
)

// Generator is the Snowflake ID generator.
type Generator struct {
	mu            sync.Mutex
	lastTimestamp int64
	workerID      int64
	sequence      int64
}

// NewGenerator creates a new Snowflake generator.
func NewGenerator(workerID int64) (*Generator, error) {
	if workerID < 0 || workerID > maxWorkerID {
		return nil, ErrInvalidWorkerID
	}
	return &Generator{
		workerID: workerID,
	}, nil
}

// NextID generates a new unique ID.
func (g *Generator) NextID() (int64, error) {
	g.mu.Lock()
	defer g.mu.Unlock()

	timestamp := time.Now().UnixMilli()

	if timestamp < g.lastTimestamp {
		// In a real production system, you might wait for the clock to catch up,
		// or if the skew is small, borrow from the future.
		// For simplicity, we return an error.
		return 0, ErrTimestampBackwards
	}

	if g.lastTimestamp == timestamp {
		g.sequence = (g.sequence + 1) & sequenceMask
		if g.sequence == 0 {
			// Sequence overflow, wait for next millisecond.
			for timestamp <= g.lastTimestamp {
				timestamp = time.Now().UnixMilli()
			}
		}
	} else {
		g.sequence = 0
	}

	g.lastTimestamp = timestamp

	id := ((timestamp - twepoch) << timestampShift) |
		(g.workerID << workerIDShift) |
		g.sequence

	return id, nil
}

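This is a compact but complete Snowflake implementation: a mutex makes NextID safe for concurrent use, a clock that moves backwards is detected and reported as an error, and sequence overflow within a single millisecond is handled by spinning until the next millisecond. A production deployment would likely also want a policy for tolerating small clock regressions, as the comment in NextID notes.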
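To make the bit layout concrete, the sketch below packs and unpacks an ID using the same constants as snowflake.go (12 sequence bits, 10 worker bits, the 2023-01-01 epoch). `Compose`, `Decompose`, and `Parts` are illustrative names that do not appear in the generator itself; decoding like this is mainly useful for debugging and auditing.

```go
package main

import "fmt"

const (
	workerIDBits = 10
	sequenceBits = 12
	twepoch      = int64(1672531200000) // 2023-01-01 00:00:00 UTC
)

// Parts holds the three fields packed into a Snowflake ID.
type Parts struct {
	TimestampMs int64 // Unix milliseconds
	WorkerID    int64
	Sequence    int64
}

// Compose packs the fields the same way Generator.NextID does.
func Compose(timestampMs, workerID, sequence int64) int64 {
	return ((timestampMs - twepoch) << (workerIDBits + sequenceBits)) |
		(workerID << sequenceBits) |
		sequence
}

// Decompose reverses Compose by shifting and masking each field.
func Decompose(id int64) Parts {
	return Parts{
		TimestampMs: (id >> (workerIDBits + sequenceBits)) + twepoch,
		WorkerID:    (id >> sequenceBits) & ((1 << workerIDBits) - 1),
		Sequence:    id & ((1 << sequenceBits) - 1),
	}
}

func main() {
	id := Compose(1672531200123, 42, 7)
	fmt.Printf("id=%d parts=%+v\n", id, Decompose(id))
}
```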

2. Defining the Raft State Machine and Commands

The data that needs to be kept consistent across the Raft cluster is a map[int64]LeaseInfo. We need to define the commands that operate on this map; these commands will be replicated as the content of the Raft log.

// state.go
package main

import (
	"encoding/json"
	"errors"
	"sync"
	"time"
)

const (
	CmdAcquire = "ACQUIRE"
	CmdRenew   = "RENEW"
	CmdRelease = "RELEASE" // For graceful shutdown
)

// LeaseInfo holds the information about a worker ID lease.
type LeaseInfo struct {
	WorkerID  int64     `json:"worker_id"`
	NodeID    string    `json:"node_id"`
	ExpiresAt time.Time `json:"expires_at"`
}

// Command represents a command to be applied to the state machine.
type Command struct {
	Op       string `json:"op"`
	NodeID   string `json:"node_id"`
	WorkerID int64  `json:"worker_id,omitempty"` // omitempty for acquire
}

// WorkerIDManager is our state machine.
type WorkerIDManager struct {
	mu      sync.RWMutex
	leases  map[int64]LeaseInfo // The consensus state: workerID -> LeaseInfo
	nodeMap map[string]int64    // Helper map: nodeID -> workerID
	
	leaseDuration time.Duration
}

// NewWorkerIDManager creates a new state machine.
func NewWorkerIDManager(leaseDuration time.Duration) *WorkerIDManager {
	return &WorkerIDManager{
		leases:        make(map[int64]LeaseInfo),
		nodeMap:       make(map[string]int64),
		leaseDuration: leaseDuration,
	}
}

// Apply applies a command from the Raft log to the state machine.
// This function MUST be deterministic.
func (m *WorkerIDManager) Apply(cmdBytes []byte) interface{} {
	var cmd Command
	if err := json.Unmarshal(cmdBytes, &cmd); err != nil {
		// In production, this should be logged seriously.
		// A command that can't be unmarshalled is a critical bug.
		return err
	}
	
	m.mu.Lock()
	defer m.mu.Unlock()

	switch cmd.Op {
	case CmdAcquire:
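		// Check if this node already has a lease.
		if existingID, ok := m.nodeMap[cmd.NodeID]; ok {
			lease, held := m.leases[existingID]
			if held && lease.NodeID == cmd.NodeID {
				// If lease is still valid, just return the existing ID.
				if lease.ExpiresAt.After(time.Now()) {
					return existingID
				}
				// The lease has expired: free the ID so it is not leaked
				// when nodeMap is overwritten by the allocation below.
				delete(m.leases, existingID)
			}
			delete(m.nodeMap, cmd.NodeID)
		}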
		
		// Find an available ID
		for i := int64(0); i <= maxWorkerID; i++ {
			if _, exists := m.leases[i]; !exists {
				lease := LeaseInfo{
					WorkerID:  i,
					NodeID:    cmd.NodeID,
					ExpiresAt: time.Now().Add(m.leaseDuration),
				}
				m.leases[i] = lease
				m.nodeMap[cmd.NodeID] = i
				return i
			}
		}
		return errors.New("no available worker ids")

	case CmdRenew:
		if existingID, ok := m.nodeMap[cmd.NodeID]; ok && existingID == cmd.WorkerID {
			if lease, leaseExists := m.leases[cmd.WorkerID]; leaseExists {
				lease.ExpiresAt = time.Now().Add(m.leaseDuration)
				m.leases[cmd.WorkerID] = lease
				return true
			}
		}
		return errors.New("lease not found or node mismatch")

	case CmdRelease:
		if existingID, ok := m.nodeMap[cmd.NodeID]; ok && existingID == cmd.WorkerID {
			delete(m.leases, cmd.WorkerID)
			delete(m.nodeMap, cmd.NodeID)
		}
		return nil
	
	default:
		return errors.New("unknown command op")
	}
}

The Apply function here is critical. It must be deterministic: given the same initial state and the same sequence of log entries, every node must reach exactly the same final state. The version above violates this rule because it calls time.Now() internally, which is a common pitfall. Each replica reads its own clock at a different moment, so replicas can store different ExpiresAt values and can even disagree on whether a lease has expired. The fix is for the Leader to choose the timestamp when it creates the log entry and carry it in the Command struct, so that every node applies the identical value. We keep time.Now() here only to keep the listing short; this is a correctness bug, not a cosmetic one, and it must be fixed before production use.
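A minimal sketch of the leader-stamped variant follows. It is deliberately separate from state.go: `TimedCommand`, `Machine`, and `NewAcquire` are hypothetical names, only the acquire path is shown, and timestamps are carried as Unix milliseconds. The point is that Apply reads time only from the command, so two replicas applying the same bytes end in the same state.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

const maxWorkerID = int64(1023)

// TimedCommand carries the leader's clock reading inside the log entry.
type TimedCommand struct {
	Op            string `json:"op"`
	NodeID        string `json:"node_id"`
	NowUnixMs     int64  `json:"now_unix_ms"`     // leader time at propose
	ExpiresUnixMs int64  `json:"expires_unix_ms"` // lease expiry chosen by leader
}

type lease struct {
	NodeID        string
	ExpiresUnixMs int64
}

// Machine is a reduced state machine: workerID -> lease.
type Machine struct{ leases map[int64]lease }

func NewMachine() *Machine { return &Machine{leases: make(map[int64]lease)} }

// NewAcquire is what the leader would call before proposing.
func NewAcquire(nodeID string, nowMs, leaseMs int64) ([]byte, error) {
	return json.Marshal(TimedCommand{
		Op: "ACQUIRE", NodeID: nodeID,
		NowUnixMs: nowMs, ExpiresUnixMs: nowMs + leaseMs,
	})
}

// Apply never reads the local clock; all time comes from the command.
func (m *Machine) Apply(b []byte) (int64, error) {
	var c TimedCommand
	if err := json.Unmarshal(b, &c); err != nil {
		return 0, err
	}
	if c.Op != "ACQUIRE" {
		return 0, errors.New("only ACQUIRE is implemented in this sketch")
	}
	// Reuse a live lease already held by this node. IDs are scanned in
	// ascending order because Go map iteration order is not deterministic.
	for id := int64(0); id <= maxWorkerID; id++ {
		if l, ok := m.leases[id]; ok && l.NodeID == c.NodeID && l.ExpiresUnixMs > c.NowUnixMs {
			return id, nil
		}
	}
	// Otherwise take the lowest ID that is free or expired as of NowUnixMs.
	for id := int64(0); id <= maxWorkerID; id++ {
		if l, ok := m.leases[id]; !ok || l.ExpiresUnixMs <= c.NowUnixMs {
			m.leases[id] = lease{NodeID: c.NodeID, ExpiresUnixMs: c.ExpiresUnixMs}
			return id, nil
		}
	}
	return 0, errors.New("no available worker ids")
}

func main() {
	cmd, err := NewAcquire("node-a", time.Now().UnixMilli(), 30000)
	if err != nil {
		panic(err)
	}
	a, b := NewMachine(), NewMachine()
	idA, _ := a.Apply(cmd)
	idB, _ := b.Apply(cmd)
	fmt.Println(idA, idB, a.leases[idA] == b.leases[idB])
}
```

Treating an expired lease as free inside Apply is one possible design; the article's Leader-driven reclamation would work equally well as long as the reclaim decision is itself a log entry stamped with the Leader's time.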

3. Service Layer Encapsulation

The service layer translates HTTP requests into Raft commands and submits them to the Raft cluster.

// server.go
package main

import (
	"encoding/json"
	"log"
	"net/http"
	"time"
)

// RaftNode is an interface representing our underlying Raft implementation.
type RaftNode interface {
	Propose(cmd []byte) (interface{}, error)
	IsLeader() bool
}

type Server struct {
	raftNode RaftNode
	manager  *WorkerIDManager // The server holds a reference to the state machine
	nodeID   string
}

func NewServer(nodeID string, raftNode RaftNode, manager *WorkerIDManager) *Server {
	return &Server{
		raftNode: raftNode,
		manager:  manager,
		nodeID:   nodeID,
	}
}

// handleAcquire handles the HTTP request to get a worker ID.
func (s *Server) handleAcquire(w http.ResponseWriter, r *http.Request) {
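	if !s.raftNode.IsLeader() {
		// In a real system, we should redirect the client to the current leader.
		// 503 marks a retryable condition; 502 is meant for proxies relaying
		// an invalid upstream response, which is not the case here.
		http.Error(w, "not the leader", http.StatusServiceUnavailable)
		return
	}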

	reqNodeID := r.URL.Query().Get("nodeId")
	if reqNodeID == "" {
		http.Error(w, "nodeId is required", http.StatusBadRequest)
		return
	}

	cmd := Command{
		Op:     CmdAcquire,
		NodeID: reqNodeID,
	}
	cmdBytes, err := json.Marshal(cmd)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	result, err := s.raftNode.Propose(cmdBytes)
	if err != nil {
		log.Printf("Failed to propose command: %v", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	
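	// Apply reports business failures (e.g. an exhausted ID pool) as an error
	// value in its result. Assuming Propose hands that result back unchanged,
	// check for it so an error is never encoded as a worker_id.
	if applyErr, ok := result.(error); ok {
		http.Error(w, applyErr.Error(), http.StatusConflict)
		return
	}

	// Respond to the client
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{"worker_id": result})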
}
// ... handleRenew and handleRelease would follow a similar pattern ...

This service layer code shows how the HTTP API is decoupled from the underlying Raft consensus module: a handler only builds a Command and hands it to Propose. Every request that changes state (Acquire, Renew, Release) must go through the Leader. A Follower that receives such a request should return an error or redirect the client to the Leader.
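The redirect can be factored into a small middleware. The sketch below assumes the Raft layer can report the Leader's base URL through a `LeaderAddr()` method; that method, the `LeaderInfo` interface, and `RequireLeader` are hypothetical and not part of the RaftNode interface shown above. It uses 307 so that clients repeat the request with the same method and body.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// LeaderInfo is a hypothetical view of the Raft node used for routing.
type LeaderInfo interface {
	IsLeader() bool
	LeaderAddr() string // base URL of the current leader, "" if unknown
}

// RequireLeader serves the request locally on the leader, redirects to the
// leader on a follower, and answers 503 while no leader is known.
func RequireLeader(n LeaderInfo, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if n.IsLeader() {
			next.ServeHTTP(w, r)
			return
		}
		addr := n.LeaderAddr()
		if addr == "" {
			w.Header().Set("Retry-After", "1")
			http.Error(w, "no leader elected", http.StatusServiceUnavailable)
			return
		}
		http.Redirect(w, r, addr+r.URL.RequestURI(), http.StatusTemporaryRedirect)
	})
}

// staticNode is a fixed-answer LeaderInfo for demonstration.
type staticNode struct {
	leader bool
	addr   string
}

func (s staticNode) IsLeader() bool     { return s.leader }
func (s staticNode) LeaderAddr() string { return s.addr }

// probe sends one GET through the middleware and returns status and Location.
func probe(n LeaderInfo, target string) (int, string) {
	ok := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	rec := httptest.NewRecorder()
	RequireLeader(n, ok).ServeHTTP(rec, httptest.NewRequest("GET", target, nil))
	return rec.Code, rec.Header().Get("Location")
}

func main() {
	code, loc := probe(staticNode{addr: "http://10.0.0.2:8080"}, "/acquire?nodeId=a")
	fmt.Println(code, loc)
}
```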

4. High-Throughput Audit Logging

The Snowflake algorithm can generate millions of IDs per second: with the 12-bit sequence used above, a single worker can issue up to 4,096 IDs per millisecond. If we wanted to audit every ID allocation (or every block of consumed sequence numbers), a traditional row-oriented RDBMS would struggle with the write load. This is a good fit for a column-oriented analytical database designed for high-volume batched inserts.
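The throughput figure can be checked against the bit layout in snowflake.go. The following sketch just evaluates the limits implied by 41 timestamp bits, 10 worker bits, and 12 sequence bits; `Capacity` is an illustrative helper, and the year count ignores leap days.

```go
package main

import "fmt"

const (
	timestampBits = 41
	workerIDBits  = 10
	sequenceBits  = 12
)

// Capacity returns the theoretical limits implied by the bit layout:
// the number of distinct workers, the peak IDs per second for one worker,
// and the whole years of millisecond timestamps available after the epoch.
func Capacity() (workers, idsPerSecPerWorker, years int64) {
	const msPerYear = 365 * 24 * 60 * 60 * 1000
	workers = 1 << workerIDBits
	idsPerSecPerWorker = (1 << sequenceBits) * 1000
	years = (1 << timestampBits) / msPerYear
	return workers, idsPerSecPerWorker, years
}

func main() {
	w, perSec, y := Capacity()
	fmt.Printf("workers=%d ids/sec/worker=%d years=%d\n", w, perSec, y)
	// prints "workers=1024 ids/sec/worker=4096000 years=69"
}
```

These are upper bounds; the mutex in NextID and the cost of reading the clock will keep real throughput per worker below the theoretical peak.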

We’ll introduce a simple mechanism: whenever a service instance successfully acquires or renews a Worker ID, it will asynchronously send an audit record to a dedicated logging channel. The backend for this channel is a columnar database that supports high-concurrency writes, such as ClickHouse.

graph TD
    subgraph ServiceInstance [Service Instance]
        A[ID Generator] -- generates ID --> B{Audit?}
    end
    subgraph Raft Cluster
        C[Leader]
        D[Follower]
        E[Follower]
        C <--> D
        C <--> E
    end

    ServiceInstance -- 1. Acquire/Renew WorkerID --> C
    
    B -- Yes --> F[Async Log Channel]
    F -- Batch Write --> G[(Columnar DB: ClickHouse)]

    style G fill:#f9f,stroke:#333,stroke-width:2px

The structure of an audit log might look like this:
{ event_timestamp, event_type, node_id, worker_id, trace_id }

The code for writing to ClickHouse might look something like this:

// audit_logger.go
package main

import (
	"context"
	"log"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
)

type AuditLog struct {
	Timestamp time.Time `ch:"timestamp"`
	EventType string    `ch:"event_type"`
	NodeID    string    `ch:"node_id"`
	WorkerID  int64     `ch:"worker_id"`
}

type ClickHouseLogger struct {
	conn clickhouse.Conn
	ch   chan AuditLog
}

func NewClickHouseLogger(addr string) (*ClickHouseLogger, error) {
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{addr},
		Auth: clickhouse.Auth{
			Database: "default",
			Username: "default",
			Password: "",
		},
	})
	if err != nil {
		return nil, err
	}

	// This is a simplified example. In production, you'd use a more robust batching mechanism.
	logger := &ClickHouseLogger{
		conn: conn,
		ch:   make(chan AuditLog, 10000), // Buffered channel
	}

	go logger.batchWriter() // Start the background writer

	return logger, nil
}

func (l *ClickHouseLogger) Log(event AuditLog) {
	select {
	case l.ch <- event:
		// Logged successfully to buffer
	default:
		// Buffer is full. In a real system, you'd handle this case
		// (e.g., drop, block, or write to a fallback file).
		log.Println("Audit log channel is full, dropping log.")
	}
}

func (l *ClickHouseLogger) batchWriter() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	
	batch := make([]AuditLog, 0, 1000)

	for {
		select {
		case logEntry := <-l.ch:
			batch = append(batch, logEntry)
			if len(batch) >= 1000 {
				l.flush(batch)
				batch = make([]AuditLog, 0, 1000) // Reset batch
			}
		case <-ticker.C:
			if len(batch) > 0 {
				l.flush(batch)
				batch = make([]AuditLog, 0, 1000) // Reset batch
			}
		}
	}
}

func (l *ClickHouseLogger) flush(batch []AuditLog) {
	ctx := context.Background()
	statement, err := l.conn.PrepareBatch(ctx, "INSERT INTO audit_logs")
	if err != nil {
		log.Printf("Failed to prepare batch: %v", err)
		return
	}
	
	for _, logEntry := range batch {
		err := statement.AppendStruct(&logEntry)
		if err != nil {
			log.Printf("Failed to append to batch: %v", err)
			return // Abort this batch
		}
	}

	if err := statement.Send(); err != nil {
		log.Printf("Failed to send batch: %v", err)
	} else {
		log.Printf("Flushed %d audit logs to ClickHouse", len(batch))
	}
}

This audit module’s design adheres to several important principles:

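  1. Asynchronicity: The Log method is non-blocking. It returns immediately after writing to a buffered channel, minimizing performance impact on the main business logic (ID generation). The trade-off is that entries are dropped when the buffer is full, so the audit trail is best-effort unless a fallback is added.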
  2. Batch Writes: A background goroutine collects logs and writes them to ClickHouse in batches, governed by a strategy of every 5 seconds or every 1000 entries. This dramatically improves the database’s write throughput and is a best practice for using columnar stores.
  3. Separation of Concerns: The core logic for ID generation and management is completely decoupled from audit logging. If the ClickHouse cluster has issues, only auditing will be affected, not the core ID service.
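One gap in the listing above is that batchWriter never exits, so entries still in the buffer are lost on shutdown. The sketch below shows the same size-or-interval batching with a drain step: closing the input channel flushes whatever is pending. `runBatcher` and `demoBatchSizes` are hypothetical helpers with an injected flush function, so no ClickHouse connection is assumed; generics require Go 1.18 or later.

```go
package main

import (
	"fmt"
	"time"
)

// runBatcher collects items from in and calls flush when the batch reaches
// maxSize or when interval elapses. When in is closed it flushes the
// remainder and returns.
func runBatcher[T any](in <-chan T, maxSize int, interval time.Duration, flush func([]T)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	batch := make([]T, 0, maxSize)
	emit := func() {
		if len(batch) == 0 {
			return
		}
		flush(batch)
		// Allocate a fresh slice in case flush retained the old one.
		batch = make([]T, 0, maxSize)
	}

	for {
		select {
		case item, ok := <-in:
			if !ok {
				emit() // drain on shutdown
				return
			}
			batch = append(batch, item)
			if len(batch) >= maxSize {
				emit()
			}
		case <-ticker.C:
			emit()
		}
	}
}

// demoBatchSizes pushes n items through the batcher and returns the size of
// each flushed batch. The interval is long enough that only the size limit
// and the final drain trigger a flush.
func demoBatchSizes(n, maxSize int) []int {
	in := make(chan int, n)
	for i := 0; i < n; i++ {
		in <- i
	}
	close(in)

	var sizes []int
	runBatcher(in, maxSize, time.Hour, func(b []int) {
		sizes = append(sizes, len(b))
	})
	return sizes
}

func main() {
	fmt.Println(demoBatchSizes(2500, 1000))
}
```

With this shape, a Close method on the logger would close the channel and wait for the batcher goroutine to return before closing the database connection.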

Limitations and Future Iterations

While the manager we’ve built solves the core problem, there are several areas that need improvement for a production environment.

First, the Raft layer assumed throughout this article is pedagogical. A production-grade Raft library (like hashicorp/raft or etcd/raft) also handles the harder parts of the protocol, such as log compaction, snapshotting, and membership changes (dynamically adding and removing nodes). Using one of these mature libraries directly is the wiser choice.

Second, the lease reclamation mechanism currently relies on the Leader node performing periodic scans. When the number of Worker IDs becomes very large, this could create a performance overhead. A more optimized data structure, such as a min-heap ordered by expiration time, could be considered to quickly find leases that need to be reclaimed.
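A min-heap keyed by expiration time can be built on the standard container/heap package. The sketch below is one possible shape, with hypothetical names (`leaseItem`, `expiryHeap`, `popExpired`) and timestamps as Unix milliseconds. It only finds candidates: the actual reclamation should still be proposed as a log entry so that every replica removes the lease at the same point in the log. Renewals are not handled here; they would need either heap.Fix with a tracked index or lazy deletion of stale entries.

```go
package main

import (
	"container/heap"
	"fmt"
)

// leaseItem is one entry in the expiry index.
type leaseItem struct {
	workerID  int64
	expiresMs int64
}

// expiryHeap orders leases by expiry, earliest first.
type expiryHeap []leaseItem

func (h expiryHeap) Len() int           { return len(h) }
func (h expiryHeap) Less(i, j int) bool { return h[i].expiresMs < h[j].expiresMs }
func (h expiryHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *expiryHeap) Push(x any)        { *h = append(*h, x.(leaseItem)) }
func (h *expiryHeap) Pop() any {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[:n-1]
	return item
}

// addLease inserts a lease into the index in O(log n).
func addLease(h *expiryHeap, workerID, expiresMs int64) {
	heap.Push(h, leaseItem{workerID: workerID, expiresMs: expiresMs})
}

// popExpired removes and returns, in expiry order, the worker IDs whose
// leases have expired as of nowMs. It stops at the first live lease, so the
// cost depends on the number of expired leases, not the total.
func popExpired(h *expiryHeap, nowMs int64) []int64 {
	var expired []int64
	for h.Len() > 0 && (*h)[0].expiresMs <= nowMs {
		expired = append(expired, heap.Pop(h).(leaseItem).workerID)
	}
	return expired
}

func main() {
	h := &expiryHeap{}
	addLease(h, 1, 300)
	addLease(h, 2, 100)
	addLease(h, 3, 200)
	fmt.Println(popExpired(h, 250), "remaining:", h.Len())
}
```

With at most 1,024 Worker IDs in the layout used here, a linear scan is already cheap, so the heap mainly pays off if the ID space is widened.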

Finally, in the current design, the service instances holding Worker IDs are responsible for proactively renewing their leases. This works well when an instance is healthy, but if an instance becomes unresponsive (the process is running but not functioning correctly), the lease might still be renewed. A more robust solution might require the Raft manager cluster to perform health checks on the service instances in reverse, combining this with the instance’s active heartbeats to jointly determine a lease’s validity. This adds complexity to the system but further enhances its robustness.

