In our system’s core transaction pipeline, the order ID generation logic initially relied on MySQL’s AUTO_INCREMENT. This solution was simple and reliable, but during traffic spikes, the database’s single-point-of-write bottleneck became the entire system’s Achilles’ heel. Migrating to a Redis INCR-based solution temporarily alleviated the throughput problem, but Redis’s persistence strategies (AOF/RDB) still carried a risk of data loss in extreme failure scenarios. This could lead to ID rollbacks, which are absolutely unacceptable in a transaction system.
The community-favorite Snowflake algorithm was our next target. Its structure of timestamp + data center ID + machine ID + sequence number is ideal—fully decentralized and extremely high-performance. However, the core challenge shifted to the allocation and management of the “machine ID” (Worker ID). Manual configuration is high-cost and error-prone for operations, while using a coordination service like Zookeeper to handle dynamic allocation introduces a new external dependency and potential point of failure.
The real pain point was this: we needed a component that could manage the lifecycle of Snowflake Worker IDs automatically, fault-tolerantly, and without a single point of failure. Rather than bringing in a heavyweight Zookeeper cluster, why not solve the problem at its root by implementing a lightweight, specialized Worker ID manager ourselves using a consensus protocol? This would not only give us complete control over a core component’s logic but also allow our team to build practical engineering experience with distributed consensus protocols. We decided to build this manager using the Raft protocol, a consensus algorithm that is generally easier to understand and implement than Paxos.
Architectural Vision and State Machine Definition
Our goal is to build a Raft cluster of 3 or 5 nodes. This cluster’s sole responsibility is to maintain a globally consistent state that records which Worker ID is held by which service instance (Node) and the expiration time of its lease.
The system’s core interaction flow is as follows:
- When a service instance starts, it sends an AcquireWorkerID request to the Raft cluster.
- The Raft cluster’s Leader node processes the request, allocates an ID from the available pool, and sets a lease for it.
- The Leader replicates a log entry containing {workerId, nodeId, leaseExpiry} to a majority of the Follower nodes.
- Once the log entry is committed by the majority, the state machine applies the log, and the Leader returns the allocated Worker ID to the service instance.
- The service instance holding the ID must periodically send a RenewLease request to the Raft cluster to extend its lease, which is essentially committing a log entry that updates the leaseExpiry.
- If a service instance crashes or is network-partitioned, its lease will eventually expire. The Leader node, through an internal periodic check, will reclaim the expired Worker ID, making it available for reallocation.
Here is the UML sequence diagram for this process:
sequenceDiagram
participant Client as Service Instance
participant Leader
participant Follower1
participant Follower2
Client->>+Leader: AcquireWorkerID(nodeId)
Leader->>Leader: Choose available workerId
Leader->>Follower1: AppendEntries({workerId, nodeId, lease})
Leader->>Follower2: AppendEntries({workerId, nodeId, lease})
Follower1-->>Leader: ACK
Follower2-->>Leader: ACK
Note right of Leader: Log committed by majority
Leader->>Leader: Apply to state machine
Leader-->>-Client: Return workerId
loop Heartbeat
Client->>+Leader: RenewLease(workerId, nodeId)
Leader->>Follower1: AppendEntries(update lease)
Leader->>Follower2: AppendEntries(update lease)
Follower1-->>Leader: ACK
Follower2-->>Leader: ACK
Leader-->>-Client: Lease Renewed
end
The core of this architecture is that Raft guarantees serialization and consistency for all worker ID allocation and reclamation operations. Even if the Leader node fails or a network partition occurs, as long as a majority of the cluster nodes are alive, the system can perform a proper failover and continue to provide service, ensuring that a single Worker ID is never held by two instances simultaneously.
Core Implementation: From Raft Node to Application Layer
We’ll use Go to implement this system; its powerful concurrency primitives and standard library are well-suited for building this kind of network service. We’ll skip the implementation details of a full Raft library and instead focus on how to integrate the Raft protocol with our Worker ID management business logic. We’ll assume we have a basic Raft implementation that exposes a Propose(data []byte) method for submitting data to the cluster.
1. The Snowflake Generator Itself
First, here’s a standard implementation of a Snowflake ID generator. This is the “ammunition” we’ll ultimately be using.
// snowflake.go
package main
import (
"errors"
"sync"
"time"
)
const (
workerIDBits = 10
sequenceBits = 12
timestampBits = 41
workerIDShift = sequenceBits
timestampShift = sequenceBits + workerIDBits
sequenceMask = int64(-1) ^ (int64(-1) << sequenceBits)
maxWorkerID = int64(-1) ^ (int64(-1) << workerIDBits)
	twepoch = int64(1672531200000) // 2023-01-01 00:00:00 UTC
)
var (
	ErrInvalidWorkerID = errors.New("worker id must be between 0 and maxWorkerID inclusive")
ErrTimestampBackwards = errors.New("clock is moving backwards")
)
// Generator is the Snowflake ID generator.
type Generator struct {
mu sync.Mutex
lastTimestamp int64
workerID int64
sequence int64
}
// NewGenerator creates a new Snowflake generator.
func NewGenerator(workerID int64) (*Generator, error) {
if workerID < 0 || workerID > maxWorkerID {
return nil, ErrInvalidWorkerID
}
return &Generator{
workerID: workerID,
}, nil
}
// NextID generates a new unique ID.
func (g *Generator) NextID() (int64, error) {
g.mu.Lock()
defer g.mu.Unlock()
timestamp := time.Now().UnixMilli()
if timestamp < g.lastTimestamp {
// In a real production system, you might wait for the clock to catch up,
// or if the skew is small, borrow from the future.
// For simplicity, we return an error.
return 0, ErrTimestampBackwards
}
if g.lastTimestamp == timestamp {
g.sequence = (g.sequence + 1) & sequenceMask
if g.sequence == 0 {
// Sequence overflow, wait for next millisecond.
for timestamp <= g.lastTimestamp {
timestamp = time.Now().UnixMilli()
}
}
} else {
g.sequence = 0
}
g.lastTimestamp = timestamp
id := ((timestamp - twepoch) << timestampShift) |
(g.workerID << workerIDShift) |
g.sequence
return id, nil
}
This is a solid Snowflake implementation: a mutex guards concurrent callers, backward clock movement is detected and rejected, and sequence overflow within the same millisecond is handled by spinning until the next millisecond.
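To sanity-check the bit layout, it helps to decode a generated ID back into its components. Here is a small hypothetical helper (decodeID and exampleDecode are not part of the generator above, just an illustration using the same constants):
// snowflake_decode.go (hypothetical helper for illustration)
package main
import "fmt"
// decodeID splits a Snowflake ID back into its timestamp (ms since Unix epoch),
// worker ID, and sequence, reversing the shifts used in NextID.
func decodeID(id int64) (timestampMs, workerID, sequence int64) {
	sequence = id & sequenceMask
	workerID = (id >> workerIDShift) & maxWorkerID
	timestampMs = (id >> timestampShift) + twepoch
	return
}
func exampleDecode() {
	gen, _ := NewGenerator(42)
	id, _ := gen.NextID()
	ts, wid, seq := decodeID(id)
	fmt.Printf("id=%d timestamp=%d workerID=%d sequence=%d\n", id, ts, wid, seq)
}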
2. Defining the Raft State Machine and Commands
The data that needs to be kept consistent across the Raft cluster is a map[int64]LeaseInfo. We need to define the commands that operate on this map; these commands will be replicated as the content of the Raft log.
// state.go
package main
import (
	"encoding/json"
	"errors"
	"sync"
	"time"
)
const (
CmdAcquire = "ACQUIRE"
CmdRenew = "RENEW"
CmdRelease = "RELEASE" // For graceful shutdown
)
// LeaseInfo holds the information about a worker ID lease.
type LeaseInfo struct {
WorkerID int64 `json:"worker_id"`
NodeID string `json:"node_id"`
ExpiresAt time.Time `json:"expires_at"`
}
// Command represents a command to be applied to the state machine.
type Command struct {
Op string `json:"op"`
NodeID string `json:"node_id"`
WorkerID int64 `json:"worker_id,omitempty"` // omitempty for acquire
}
// WorkerIDManager is our state machine.
type WorkerIDManager struct {
mu sync.RWMutex
leases map[int64]LeaseInfo // The consensus state: workerID -> LeaseInfo
nodeMap map[string]int64 // Helper map: nodeID -> workerID
leaseDuration time.Duration
}
// NewWorkerIDManager creates a new state machine.
func NewWorkerIDManager(leaseDuration time.Duration) *WorkerIDManager {
return &WorkerIDManager{
leases: make(map[int64]LeaseInfo),
nodeMap: make(map[string]int64),
leaseDuration: leaseDuration,
}
}
// Apply applies a command from the Raft log to the state machine.
// This function MUST be deterministic.
func (m *WorkerIDManager) Apply(cmdBytes []byte) interface{} {
var cmd Command
if err := json.Unmarshal(cmdBytes, &cmd); err != nil {
// In production, this should be logged seriously.
// A command that can't be unmarshalled is a critical bug.
return err
}
m.mu.Lock()
defer m.mu.Unlock()
switch cmd.Op {
case CmdAcquire:
// Check if this node already has a lease.
if existingID, ok := m.nodeMap[cmd.NodeID]; ok {
// If lease is still valid, just return the existing ID.
if m.leases[existingID].ExpiresAt.After(time.Now()) {
return existingID
}
}
// Find an available ID
for i := int64(0); i <= maxWorkerID; i++ {
if _, exists := m.leases[i]; !exists {
lease := LeaseInfo{
WorkerID: i,
NodeID: cmd.NodeID,
ExpiresAt: time.Now().Add(m.leaseDuration),
}
m.leases[i] = lease
m.nodeMap[cmd.NodeID] = i
return i
}
}
return errors.New("no available worker ids")
case CmdRenew:
if existingID, ok := m.nodeMap[cmd.NodeID]; ok && existingID == cmd.WorkerID {
if lease, leaseExists := m.leases[cmd.WorkerID]; leaseExists {
lease.ExpiresAt = time.Now().Add(m.leaseDuration)
m.leases[cmd.WorkerID] = lease
return true
}
}
return errors.New("lease not found or node mismatch")
case CmdRelease:
if existingID, ok := m.nodeMap[cmd.NodeID]; ok && existingID == cmd.WorkerID {
delete(m.leases, cmd.WorkerID)
delete(m.nodeMap, cmd.NodeID)
}
return nil
default:
return errors.New("unknown command op")
}
}
The Apply function here is critical. It must be deterministic, meaning that given the same initial state and the same sequence of logs, all nodes must reach the exact same final state after execution. Note that Apply uses time.Now() internally, which is a common pitfall. In a real Raft implementation, the Leader node should determine the specific ExpiresAt timestamp when creating the log entry and include it in the Command struct. This guarantees that all Followers use the identical timestamp when applying the log. For simplicity, we’ll accept this minor inconsistency for now, but it must be fixed in a production system.
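A minimal sketch of that fix, under the assumption that we extend the command with an ExpiresAt field which the Leader fills in exactly once before proposing (CommandV2 and buildAcquireCommand are hypothetical names):
// command_v2.go (hypothetical sketch of the determinism fix)
package main
import (
	"encoding/json"
	"time"
)
// CommandV2 carries the lease expiry chosen by the Leader at propose time,
// so Apply never has to call time.Now() itself.
type CommandV2 struct {
	Op        string    `json:"op"`
	NodeID    string    `json:"node_id"`
	WorkerID  int64     `json:"worker_id,omitempty"`
	ExpiresAt time.Time `json:"expires_at"` // decided once on the Leader, replicated verbatim
}
// buildAcquireCommand marshals an acquire command with the expiry fixed up front.
// Apply would then use cmd.ExpiresAt instead of calling time.Now().Add(leaseDuration).
func buildAcquireCommand(nodeID string, leaseDuration time.Duration) ([]byte, error) {
	cmd := CommandV2{
		Op:        CmdAcquire,
		NodeID:    nodeID,
		ExpiresAt: time.Now().Add(leaseDuration),
	}
	return json.Marshal(cmd)
}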
3. Service Layer Encapsulation
The service layer translates HTTP requests into Raft commands and submits them to the Raft cluster.
// server.go
package main
import (
	"encoding/json"
	"log"
	"net/http"
)
// RaftNode is an interface representing our underlying Raft implementation.
type RaftNode interface {
Propose(cmd []byte) (interface{}, error)
IsLeader() bool
}
type Server struct {
raftNode RaftNode
manager *WorkerIDManager // The server holds a reference to the state machine
nodeID string
}
func NewServer(nodeID string, raftNode RaftNode, manager *WorkerIDManager) *Server {
return &Server{
raftNode: raftNode,
manager: manager,
nodeID: nodeID,
}
}
// handleAcquire handles the HTTP request to get a worker ID.
func (s *Server) handleAcquire(w http.ResponseWriter, r *http.Request) {
if !s.raftNode.IsLeader() {
// In a real system, we should redirect the client to the current leader.
http.Error(w, "not the leader", http.StatusBadGateway)
return
}
reqNodeID := r.URL.Query().Get("nodeId")
if reqNodeID == "" {
http.Error(w, "nodeId is required", http.StatusBadRequest)
return
}
cmd := Command{
Op: CmdAcquire,
NodeID: reqNodeID,
}
	cmdBytes, err := json.Marshal(cmd)
	if err != nil {
		http.Error(w, "failed to encode command", http.StatusInternalServerError)
		return
	}
result, err := s.raftNode.Propose(cmdBytes)
if err != nil {
log.Printf("Failed to propose command: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
	// The state machine may return an error value (e.g., no IDs left); don't report it as a worker ID.
	if applyErr, ok := result.(error); ok {
		http.Error(w, applyErr.Error(), http.StatusConflict)
		return
	}
	// Respond to the client
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{"worker_id": result})
}
// ... handleRenew and handleRelease would follow a similar pattern ...
This service layer code demonstrates how to decouple the business logic (HTTP requests) from the underlying Raft consensus module. Only the Leader node can handle write requests (Acquire, Renew). A Follower node receiving such a request should return an error or a redirect.
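On the consuming side, a service instance acquires its ID once at startup and then keeps the lease alive from a background goroutine, renewing at a fraction (say one third) of the lease duration. The sketch below is hypothetical: the /acquire and /renew paths, query parameters, and JSON shape are assumptions, since routing wasn't shown above.
// client.go (hypothetical client-side sketch)
package main
import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"
)
// acquireWorkerID asks the manager cluster's Leader for a worker ID.
func acquireWorkerID(leaderAddr, nodeID string) (int64, error) {
	resp, err := http.Get(fmt.Sprintf("http://%s/acquire?nodeId=%s", leaderAddr, nodeID))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	var body struct {
		WorkerID int64 `json:"worker_id"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		return 0, err
	}
	return body.WorkerID, nil
}
// keepLeaseAlive renews the lease periodically until the process exits.
func keepLeaseAlive(leaderAddr, nodeID string, workerID int64, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		url := fmt.Sprintf("http://%s/renew?nodeId=%s&workerId=%d", leaderAddr, nodeID, workerID)
		resp, err := http.Get(url)
		if err != nil {
			log.Printf("lease renewal failed: %v", err)
			continue
		}
		resp.Body.Close()
	}
}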
4. High-Throughput Audit Logging
The Snowflake algorithm can generate millions of IDs per second. If we wanted to audit every ID allocation (or every block of consumed sequence numbers), a traditional RDBMS would be immediately overwhelmed by the write load. This is a perfect use case for a columnar NoSQL database.
We’ll introduce a simple mechanism: whenever a service instance successfully acquires or renews a Worker ID, it will asynchronously send an audit record to a dedicated logging channel. The backend for this channel is a columnar database that supports high-concurrency writes, such as ClickHouse.
graph TD
subgraph SI[Service Instance]
A[ID Generator] -- generates ID --> B{Audit?}
end
subgraph Raft Cluster
C[Leader]
D[Follower]
E[Follower]
C <--> D
C <--> E
end
SI -- 1. Acquire/Renew WorkerID --> C
B -- Yes --> F[Async Log Channel]
F -- Batch Write --> G[(Columnar DB: ClickHouse)]
style G fill:#f9f,stroke:#333,stroke-width:2px
The structure of an audit log might look like this: { event_timestamp, event_type, node_id, worker_id, trace_id }
The code for writing to ClickHouse might look something like this:
// audit_logger.go
package main
import (
	"context"
	"log"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)
type AuditLog struct {
Timestamp time.Time `ch:"timestamp"`
EventType string `ch:"event_type"`
NodeID string `ch:"node_id"`
WorkerID int64 `ch:"worker_id"`
}
type ClickHouseLogger struct {
	conn driver.Conn
ch chan AuditLog
}
func NewClickHouseLogger(addr string) (*ClickHouseLogger, error) {
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{addr},
Auth: clickhouse.Auth{
Database: "default",
Username: "default",
Password: "",
},
})
if err != nil {
return nil, err
}
// This is a simplified example. In production, you'd use a more robust batching mechanism.
logger := &ClickHouseLogger{
conn: conn,
ch: make(chan AuditLog, 10000), // Buffered channel
}
go logger.batchWriter() // Start the background writer
return logger, nil
}
func (l *ClickHouseLogger) Log(event AuditLog) {
select {
case l.ch <- event:
// Logged successfully to buffer
default:
// Buffer is full. In a real system, you'd handle this case
// (e.g., drop, block, or write to a fallback file).
log.Println("Audit log channel is full, dropping log.")
}
}
func (l *ClickHouseLogger) batchWriter() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
batch := make([]AuditLog, 0, 1000)
for {
select {
case logEntry := <-l.ch:
batch = append(batch, logEntry)
if len(batch) >= 1000 {
l.flush(batch)
batch = make([]AuditLog, 0, 1000) // Reset batch
}
case <-ticker.C:
if len(batch) > 0 {
l.flush(batch)
batch = make([]AuditLog, 0, 1000) // Reset batch
}
}
}
}
func (l *ClickHouseLogger) flush(batch []AuditLog) {
ctx := context.Background()
statement, err := l.conn.PrepareBatch(ctx, "INSERT INTO audit_logs")
if err != nil {
log.Printf("Failed to prepare batch: %v", err)
return
}
for _, logEntry := range batch {
err := statement.AppendStruct(&logEntry)
if err != nil {
log.Printf("Failed to append to batch: %v", err)
return // Abort this batch
}
}
if err := statement.Send(); err != nil {
log.Printf("Failed to send batch: %v", err)
} else {
log.Printf("Flushed %d audit logs to ClickHouse", len(batch))
}
}
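For completeness, the target table has to exist before the first flush. The schema below is a hypothetical assumption, using a MergeTree engine and column names matching the ch tags above; ensureTable is not part of the logger shown earlier and would be called once at startup.
// ensureTable creates the audit_logs table if it does not exist yet.
// The schema is an assumption matching the AuditLog struct above.
func (l *ClickHouseLogger) ensureTable(ctx context.Context) error {
	return l.conn.Exec(ctx, `
		CREATE TABLE IF NOT EXISTS audit_logs (
			timestamp  DateTime64(3),
			event_type String,
			node_id    String,
			worker_id  Int64
		) ENGINE = MergeTree()
		ORDER BY (timestamp, node_id)
	`)
}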
This audit module’s design adheres to several important principles:
- Asynchronicity: The Log method is non-blocking. It returns immediately after writing to a buffered channel, minimizing the performance impact on the main business logic (ID generation).
- Batch Writes: A background goroutine collects logs and writes them to ClickHouse in batches, governed by a strategy of every 5 seconds or every 1,000 entries. This dramatically improves the database’s write throughput and is a best practice for using columnar stores.
- Separation of Concerns: The core logic for ID generation and management is completely decoupled from audit logging. If the ClickHouse cluster has issues, only auditing will be affected, not the core ID service.
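Wiring the logger into the ID-management paths then amounts to one non-blocking call at each success point. A hypothetical call-site helper (auditAcquire and its field values are illustrative):
// audit_wiring.go (hypothetical call-site sketch)
package main
import "time"
// auditAcquire records a successful worker ID acquisition.
func auditAcquire(logger *ClickHouseLogger, nodeID string, workerID int64) {
	logger.Log(AuditLog{
		Timestamp: time.Now(),
		EventType: "ACQUIRE",
		NodeID:    nodeID,
		WorkerID:  workerID,
	})
}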
Limitations and Future Iterations
While the manager we’ve built solves the core problem, there are several areas that need improvement for a production environment.
First, our Raft implementation is pedagogical. A production-grade Raft library (like hashicorp/raft or etcd/raft) would handle many more complex scenarios, such as log compaction, snapshotting, and membership changes (dynamically adding/removing nodes). Using one of these mature libraries directly is the wiser choice.
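As an illustration of what that migration could look like, here is a hedged sketch of adapting WorkerIDManager to hashicorp/raft's FSM interface (managerFSM and managerSnapshot are hypothetical names; cluster bootstrapping and snapshot configuration are omitted):
// fsm_adapter.go (hypothetical sketch, assuming the hashicorp/raft FSM interface)
package main
import (
	"encoding/json"
	"io"

	"github.com/hashicorp/raft"
)
// managerFSM adapts WorkerIDManager to hashicorp/raft's FSM interface.
type managerFSM struct {
	manager *WorkerIDManager
}
// Apply forwards the replicated log entry to our existing state machine.
func (f *managerFSM) Apply(l *raft.Log) interface{} {
	return f.manager.Apply(l.Data)
}
// Snapshot serializes the current lease table for log compaction.
func (f *managerFSM) Snapshot() (raft.FSMSnapshot, error) {
	f.manager.mu.RLock()
	defer f.manager.mu.RUnlock()
	data, err := json.Marshal(f.manager.leases)
	if err != nil {
		return nil, err
	}
	return &managerSnapshot{data: data}, nil
}
// Restore rebuilds the state machine from a snapshot.
func (f *managerFSM) Restore(rc io.ReadCloser) error {
	defer rc.Close()
	var leases map[int64]LeaseInfo
	if err := json.NewDecoder(rc).Decode(&leases); err != nil {
		return err
	}
	f.manager.mu.Lock()
	defer f.manager.mu.Unlock()
	f.manager.leases = leases
	f.manager.nodeMap = make(map[string]int64)
	for id, lease := range leases {
		f.manager.nodeMap[lease.NodeID] = id
	}
	return nil
}
type managerSnapshot struct{ data []byte }
func (s *managerSnapshot) Persist(sink raft.SnapshotSink) error {
	if _, err := sink.Write(s.data); err != nil {
		sink.Cancel()
		return err
	}
	return sink.Close()
}
func (s *managerSnapshot) Release() {}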
Second, the lease reclamation mechanism currently relies on the Leader node performing periodic scans. When the number of Worker IDs becomes very large, this could create a performance overhead. A more optimized data structure, such as a min-heap ordered by expiration time, could be considered to quickly find leases that need to be reclaimed.
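One possible shape for that optimization, sketched with Go's container/heap (the heap would live alongside the leases map on the Leader; entries made stale by renewals would still need to be skipped by re-checking against the authoritative map):
// lease_heap.go (hypothetical sketch of expiry-ordered reclamation)
package main
import (
	"container/heap"
	"time"
)
// leaseHeap orders leases by expiration time, earliest first.
type leaseHeap []LeaseInfo
func (h leaseHeap) Len() int            { return len(h) }
func (h leaseHeap) Less(i, j int) bool  { return h[i].ExpiresAt.Before(h[j].ExpiresAt) }
func (h leaseHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *leaseHeap) Push(x interface{}) { *h = append(*h, x.(LeaseInfo)) }
func (h *leaseHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[:n-1]
	return item
}
// popExpired removes and returns every lease whose expiry is at or before now.
func popExpired(h *leaseHeap, now time.Time) []LeaseInfo {
	var expired []LeaseInfo
	for h.Len() > 0 && !(*h)[0].ExpiresAt.After(now) {
		expired = append(expired, heap.Pop(h).(LeaseInfo))
	}
	return expired
}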
Finally, in the current design, the service instances holding Worker IDs are responsible for proactively renewing their leases. This works well when an instance is healthy, but if an instance becomes unresponsive (the process is running but not functioning correctly), the lease might still be renewed. A more robust solution might require the Raft manager cluster to perform health checks on the service instances in reverse, combining this with the instance’s active heartbeats to jointly determine a lease’s validity. This adds complexity to the system but further enhances its robustness.