Building a Real-Time CDC Ingestion Gateway from SQL Server to a Data Lake with Go Echo


Our team maintains a core business system with SQL Server as its OLTP database. For a long time, our data analysis needs were met by T+1 ETL batch jobs that extracted data into our data warehouse every morning. As business demands for real-time insights grew, this latency, measured in hours or even a full day, became unacceptable. Stakeholders needed to see changes in user behavior within minutes to guide operational decisions.

One approach considered was directly polling an UPDATE_TIMESTAMP column in SQL Server. However, on a high-concurrency transactional system, this would have placed catastrophic pressure on the primary database. Our initial thinking then shifted towards leveraging the database logs. SQL Server has offered a built-in Change Data Capture (CDC) feature since version 2008, which opened the door for a much more efficient solution.

Our goal was to build a pipeline that could stream real-time data changes (INSERT, UPDATE, DELETE) from SQL Server to our data lake (which is based on object storage). Here are the technology choices we made:

  1. Change Capture: Debezium. It’s a mature, open-source distributed CDC platform. Its SQL Server connector does not parse the transaction log itself; it polls the change tables that SQL Server’s own CDC capture job fills from the log, which keeps the extra load on the source database low. It publishes change events to Kafka as standardized JSON messages.
  2. Message Bus: Kafka. It serves as a buffer in our data pipeline, providing high throughput and absorbing load spikes, which decouples the capture and consumption ends.
  3. Ingestion Gateway: A custom Go service. Why not just use the Kafka Connect S3 Sink? Because in a real-world project, our requirements were more complex:
    • Custom Data Processing: We needed to mask certain fields or perform simple structural transformations before the data landed.
    • Write Format Optimization: The best practice for data lakes is to use columnar storage formats (like Parquet) and generate large files (to avoid the “small file problem”). The default behavior of Kafka Connect might not provide the fine-grained control over file size and partitioning strategies we needed.
    • Resource Efficiency: Go’s strengths in concurrency and memory management make it ideal for building a lightweight, high-performance, low-footprint data processing service. This is critical for cost control.
    • Observability: We needed a service that could easily expose Prometheus metrics and health check endpoints. Go’s Echo framework is extremely efficient for building this type of lightweight API service.
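To make the "Custom Data Processing" requirement concrete, here is a minimal sketch of field masking applied to a decoded record before it is buffered. The function name maskFields and the choice of an unsalted SHA-256 digest are assumptions for illustration, not part of the gateway shown later; an unsalted hash of low-entropy values such as phone numbers can be reversed with a dictionary, so a real deployment would more likely use a keyed hash or tokenization.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// maskFields returns a copy of record in which every field listed in
// sensitive is replaced by the hex SHA-256 digest of its string form.
// Nil values stay nil so that NULLs remain distinguishable downstream.
func maskFields(record map[string]interface{}, sensitive map[string]bool) map[string]interface{} {
	out := make(map[string]interface{}, len(record))
	for k, v := range record {
		if sensitive[k] && v != nil {
			sum := sha256.Sum256([]byte(fmt.Sprint(v)))
			out[k] = hex.EncodeToString(sum[:])
			continue
		}
		out[k] = v
	}
	return out
}

func main() {
	rec := map[string]interface{}{"order_id": 42.0, "email": "a@example.com", "phone": nil}
	masked := maskFields(rec, map[string]bool{"email": true, "phone": true})
	// order_id is untouched, email becomes a 64-character digest, phone stays nil
	fmt.Println(masked["order_id"], len(masked["email"].(string)), masked["phone"])
}
```

Returning a copy rather than mutating in place keeps the original record available for logging or a dead-letter queue.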

The final architecture is quite clear:

graph TD
    A[SQL Server] -- CDC Log --> B(Debezium Connector);
    B -- JSON Events --> C[Kafka Topic];
    C -- Consume --> D{Go Ingestion Gateway};
    D -- Batch & Convert --> E(Parquet Files);
    E -- Write --> F[Object Storage / Data Lake];

    subgraph "Go Ingestion Gateway (Echo)"
        D
    end

Step 1: Configuring the Upstream (SQL Server & Debezium)

This part isn’t the main focus of the article, but the context is essential. First, ensure CDC is enabled on the SQL Server database and the target tables.

-- Enable database-level CDC
USE MyDatabase;
GO
EXEC sys.sp_cdc_enable_db;
GO

-- Enable table-level CDC
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'orders',
    @role_name     = NULL,
    @supports_net_changes = 0;
GO

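Next, deploy the Debezium SQL Server connector in Kafka Connect. A common mistake here is insufficient permissions. Enabling CDC itself needs elevated rights (sysadmin for sp_cdc_enable_db, db_owner for sp_cdc_enable_table), and the account Debezium connects with must be able to read both the captured tables and the generated cdc change tables. Granting sysadmin, or db_owner plus VIEW SERVER STATE, works but is broader than strictly necessary; check the Debezium documentation for the exact grants your version needs. Here are the key parts of the connector configuration. The property names are those of Debezium 1.x; Debezium 2.x renamed several of them (for example, database.server.name became topic.prefix, and the database.history.* settings became schema.history.internal.*).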

{
  "name": "sqlserver-orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "your-sql-server.domain.com",
    "database.port": "1433",
    "database.user": "debezium_user",
    "database.password": "debezium_password",
    "database.dbname": "MyDatabase",
    "database.server.name": "myserver",
    "table.include.list": "dbo.orders",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.myserver"
  }
}

Once configured, any changes to the dbo.orders table will be captured by Debezium and sent to a Kafka topic named myserver.dbo.orders.

Step 2: Building the Core Skeleton of the Go Ingestion Gateway

Our Go service has several core responsibilities: consuming from Kafka, parsing Debezium messages, buffering data in memory per table, and flushing each buffer to object storage as a Parquet file once it reaches a size limit or a time limit.

The project structure looks like this:

cdc-ingestion-gateway/
├── cmd/
│   └── main.go
├── internal/
│   ├── config/
│   │   └── config.go
│   ├── consumer/
│   │   └── consumer.go
│   ├── writer/
│   │   └── parquet_writer.go
│   └── service/
│       └── processor.go
├── go.mod
├── go.sum
└── config.yaml

We’ll use viper for configuration management.

config.yaml:

server:
  port: "8080"

kafka:
  brokers:
    - "kafka1:9092"
    - "kafka2:9092"
  topic: "myserver.dbo.orders"
  groupID: "cdc-ingestion-group"

writer:
  # Max number of records in the buffer
  maxBatchSize: 10000
  # Max buffer wait time (seconds)
  maxFlushIntervalSeconds: 60
  # Output path template, supports {table} and {date} placeholders
  outputPath: "s3://my-datalake-bucket/raw/{table}/{date}/"

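# Object storage configuration
s3:
  endpoint: "s3.amazonaws.com"
  region: "us-east-1"
  # Must match the bucket in writer.outputPath; the writer reads it as cfg.S3.Bucket
  bucket: "my-datalake-bucket"
  accessKey: "YOUR_ACCESS_KEY"
  secretKey: "YOUR_SECRET_KEY"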
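The article never shows internal/config/config.go, so the following is only a sketch of the structs this YAML could be loaded into. The field names follow the way the later code reads the configuration (cfg.Kafka.Brokers, cfg.Writer.MaxBatchSize, cfg.S3.Bucket, and so on); the Validate method is an addition of ours. With viper, loading would typically be a ReadInConfig followed by Unmarshal into a Config value, which is omitted here so the example runs with the standard library alone (errors.Join needs Go 1.20 or later).

```go
package main

import (
	"errors"
	"fmt"
)

// Field names mirror the keys in config.yaml; the exact types are assumptions.
type ServerConfig struct {
	Port string
}

type KafkaConfig struct {
	Brokers []string
	Topic   string
	GroupID string
}

type WriterConfig struct {
	MaxBatchSize            int
	MaxFlushIntervalSeconds int
	OutputPath              string
}

type S3Config struct {
	Endpoint  string
	Region    string
	Bucket    string
	AccessKey string
	SecretKey string
}

type Config struct {
	Server ServerConfig
	Kafka  KafkaConfig
	Writer WriterConfig
	S3     S3Config
}

// Validate collects every problem it finds instead of stopping at the first.
func (c *Config) Validate() error {
	var problems []error
	if len(c.Kafka.Brokers) == 0 {
		problems = append(problems, errors.New("kafka.brokers must list at least one broker"))
	}
	if c.Kafka.Topic == "" || c.Kafka.GroupID == "" {
		problems = append(problems, errors.New("kafka.topic and kafka.groupID are required"))
	}
	if c.Writer.MaxBatchSize <= 0 {
		problems = append(problems, errors.New("writer.maxBatchSize must be positive"))
	}
	// time.NewTicker panics on a non-positive interval, so reject it up front
	if c.Writer.MaxFlushIntervalSeconds <= 0 {
		problems = append(problems, errors.New("writer.maxFlushIntervalSeconds must be positive"))
	}
	if c.S3.Bucket == "" {
		problems = append(problems, errors.New("s3.bucket is required"))
	}
	return errors.Join(problems...) // nil when the slice is empty
}

func main() {
	cfg := Config{}
	cfg.Writer.MaxBatchSize = 10000
	fmt.Println(cfg.Validate())
}
```

Failing fast at startup is cheaper than discovering a missing bucket name on the first flush.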

Step 3: Implementing a Robust Kafka Consumer

We chose the segmentio/kafka-go library for its clean Reader API and its excellent handling of consumer group rebalancing.

internal/consumer/consumer.go:

package consumer

import (
	"context"
	"cdc-ingestion-gateway/internal/config"
	"github.com/segmentio/kafka-go"
	"log"
	"time"
)

// MessageHandler is a function type for processing messages received from Kafka
type MessageHandler func(ctx context.Context, msg kafka.Message) error

// StartConsumer initializes and starts a Kafka consumer group
func StartConsumer(ctx context.Context, cfg *config.Config, handler MessageHandler) {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        cfg.Kafka.Brokers,
		GroupID:        cfg.Kafka.GroupID,
		Topic:          cfg.Kafka.Topic,
		MinBytes:       10e3, // 10KB
		MaxBytes:       10e6, // 10MB
		CommitInterval: time.Second,
		// In a real project, error handling here needs to be more robust,
		// e.g., logging to a dedicated logging system.
		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			log.Printf(msg, args...)
		}),
	})

	log.Printf("Starting Kafka consumer for topic: %s, group: %s", cfg.Kafka.Topic, cfg.Kafka.GroupID)

	// Start a goroutine to continuously consume messages
	go func() {
		// Close the reader on every exit path so the group membership is released
		defer func() {
			log.Println("Closing Kafka reader...")
			if err := reader.Close(); err != nil {
				log.Printf("Failed to close Kafka reader: %v", err)
			}
		}()

		for {
			// FetchMessage blocks until a new message arrives or the context is cancelled
			msg, err := reader.FetchMessage(ctx)
			if err != nil {
				// A cancelled context means this is part of a normal shutdown
				if ctx.Err() != nil {
					return
				}
				log.Printf("Error fetching message: %v", err)
				continue
			}

			if err := handler(ctx, msg); err != nil {
				log.Printf("Error processing message (offset %d): %v", msg.Offset, err)
				// We skip the commit for this message. Note that commits are positional:
				// committing a later offset on the same partition also covers this one,
				// so production code must retry here or route the message to a
				// dead-letter queue (DLQ) before moving on.
				continue
			}

			// Only commit the offset after successful processing
			if err := reader.CommitMessages(ctx, msg); err != nil {
				log.Printf("Failed to commit message (offset %d): %v", msg.Offset, err)
			}
		}
	}()
}

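The key here is the error handling. When our handler returns an error, we skip the commit for that message. Be aware that this alone does not guarantee redelivery: Kafka commits are positional, so committing any later offset on the same partition also marks the failed message as consumed. The message is only re-consumed if the consumer restarts or a rebalance occurs before a later commit goes through. For real at-least-once behavior, a failed message has to be retried in place, or parked in a dead-letter queue (DLQ), before the loop moves on; a retry counter keeps a “poison pill” message from blocking an entire partition.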
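As a rough sketch of the retry counter and DLQ idea, the wrapper below retries a handler a fixed number of times and then hands the message to a dead-letter callback. Everything here is hypothetical naming (handleFunc, withRetry, errPoison); it works on raw bytes rather than kafka.Message so it runs with the standard library only, and it omits the backoff delay a real implementation would want between attempts.

```go
package main

import (
	"errors"
	"fmt"
)

// handleFunc processes one message payload.
type handleFunc func(value []byte) error

// errPoison is a stand-in error used by the demo below.
var errPoison = errors.New("poison pill")

// withRetry wraps h so that a failing message is attempted up to maxAttempts
// times and then passed to deadLetter. It returns an error only when the
// dead-letter step itself fails, so the caller may commit whenever it gets nil.
func withRetry(h handleFunc, maxAttempts int, deadLetter func(value []byte, cause error) error) handleFunc {
	return func(value []byte) error {
		var lastErr error
		for attempt := 1; attempt <= maxAttempts; attempt++ {
			if lastErr = h(value); lastErr == nil {
				return nil
			}
		}
		if err := deadLetter(value, lastErr); err != nil {
			return fmt.Errorf("dead-letter failed after %d attempts: %w", maxAttempts, err)
		}
		return nil
	}
}

func main() {
	attempts := 0
	alwaysFails := func(value []byte) error {
		attempts++
		return errPoison
	}
	toDLQ := func(value []byte, cause error) error {
		fmt.Printf("dead-lettering %q: %v\n", value, cause)
		return nil
	}
	err := withRetry(alwaysFails, 3, toDLQ)([]byte(`{"bad":`))
	fmt.Println("attempts:", attempts, "err:", err)
}
```

Because the wrapper returns nil once the message is safely in the DLQ, the consume loop can commit and move on without losing track of the failure.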

Step 4: Parsing Debezium Messages and Implementing Buffering Logic

Debezium’s message structure is standardized, containing the operation type and data images before and after the operation. We need to define Go structs to deserialize it.
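To see what that structure looks like in practice, here is a small self-contained sketch that decodes a hand-written update event. The sample JSON is illustrative and heavily trimmed (real events carry a schema section and much more source metadata), and the "payload" wrapper assumes the Kafka Connect JSON converter is running with schemas enabled. The helper name rowImage is ours.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope models only the parts of a Debezium event the gateway needs.
type envelope struct {
	Payload struct {
		Before map[string]interface{} `json:"before"`
		After  map[string]interface{} `json:"after"`
		Source struct {
			Table string `json:"table"`
		} `json:"source"`
		Op   string `json:"op"` // "c" create, "u" update, "d" delete
		TsMs int64  `json:"ts_ms"`
	} `json:"payload"`
}

// sampleUpdate is a trimmed, hand-written example event.
const sampleUpdate = `{"payload":{"before":{"id":7,"status":"NEW"},"after":{"id":7,"status":"PAID"},"source":{"table":"orders"},"op":"u","ts_ms":1698389400000}}`

// rowImage returns the operation, table, and the row image worth storing:
// "before" for deletes, "after" otherwise. An empty value (a tombstone)
// yields a nil row and no error.
func rowImage(raw []byte) (op, table string, row map[string]interface{}, err error) {
	if len(raw) == 0 {
		return "", "", nil, nil
	}
	var e envelope
	if err = json.Unmarshal(raw, &e); err != nil {
		return "", "", nil, err
	}
	row = e.Payload.After
	if e.Payload.Op == "d" {
		row = e.Payload.Before
	}
	return e.Payload.Op, e.Payload.Source.Table, row, nil
}

func main() {
	op, table, row, err := rowImage([]byte(sampleUpdate))
	fmt.Println(op, table, row["status"], err)
}
```

Note that encoding/json decodes every number inside a map[string]interface{} as float64, which matters later when a Parquet type has to be chosen for each column.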

internal/service/processor.go:

package service

import (
	"context"
	"encoding/json"
	"log"
	"sync"
	"time"

	"cdc-ingestion-gateway/internal/config"
	"cdc-ingestion-gateway/internal/writer"
)

// DebeziumPayload corresponds to the 'payload' section of a Debezium event
type DebeziumPayload struct {
	Before map[string]interface{} `json:"before"`
	After  map[string]interface{} `json:"after"`
	Source struct {
		Table string `json:"table"`
	} `json:"source"`
	Op string `json:"op"` // 'c' for create, 'u' for update, 'd' for delete
	TsMs int64 `json:"ts_ms"`
}

// DebeziumMessage is the complete structure of a Debezium Kafka message
type DebeziumMessage struct {
	Payload DebeziumPayload `json:"payload"`
}

// RecordBuffer is our in-memory data buffer
type RecordBuffer struct {
	sync.Mutex
	records    []map[string]interface{}
	lastFlush  time.Time
	table      string
	writer     *writer.ParquetWriter
	cfg        *config.WriterConfig
}

// Processor handles all incoming CDC events
type Processor struct {
	sync.Mutex
	buffers map[string]*RecordBuffer
	cfg     *config.Config
}

func NewProcessor(cfg *config.Config) *Processor {
	return &Processor{
		buffers: make(map[string]*RecordBuffer),
		cfg:     cfg,
	}
}

// ProcessMessage is the entry point for handling Kafka messages
func (p *Processor) ProcessMessage(ctx context.Context, key, value []byte) error {
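	// Debezium follows each delete with a tombstone (a message with a null value);
	// there is nothing to store for it
	if len(value) == 0 {
		return nil
	}

	var msg DebeziumMessage
	if err := json.Unmarshal(value, &msg); err != nil {
		// Ignore unparseable messages and log them
		log.Printf("Could not unmarshal JSON: %v. Raw message: %s", err, string(value))
		return nil // Return nil because we've handled this error; no retry needed
	}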
    
    // We only care about the 'after' state for create/update events, and 'before' for deletes
    var record map[string]interface{}
	if msg.Payload.Op == "d" {
		record = msg.Payload.Before
	} else {
		record = msg.Payload.After
	}

	if record == nil {
		log.Printf("Record is nil for op %s, skipping", msg.Payload.Op)
		return nil
	}
    
    // Add metadata fields, which are very useful in a data lake
    record["__op"] = msg.Payload.Op
    record["__source_ts_ms"] = msg.Payload.TsMs
    record["__ingested_at_utc"] = time.Now().UTC().Format(time.RFC3339)

	table := msg.Payload.Source.Table
	p.getOrCreateBuffer(table).Add(record)

	return nil
}

// getOrCreateBuffer gets or creates a buffer for a given table name
func (p *Processor) getOrCreateBuffer(table string) *RecordBuffer {
	p.Lock()
	defer p.Unlock()

	if buffer, exists := p.buffers[table]; exists {
		return buffer
	}

	buffer := &RecordBuffer{
		records:   make([]map[string]interface{}, 0, p.cfg.Writer.MaxBatchSize),
		lastFlush: time.Now(),
		table:     table,
		writer:    writer.NewParquetWriter(p.cfg),
		cfg:       &p.cfg.Writer,
	}
	p.buffers[table] = buffer
	log.Printf("Created new buffer for table: %s", table)

	return buffer
}

// Add appends a record to the buffer and triggers a flush if conditions are met
func (b *RecordBuffer) Add(record map[string]interface{}) {
	b.Lock()
	defer b.Unlock()

	b.records = append(b.records, record)
	
	// Check if flush conditions are met
	if len(b.records) >= b.cfg.MaxBatchSize {
		log.Printf("Buffer for table '%s' reached max size (%d), flushing...", b.table, len(b.records))
		b.flush()
	}
}

// PeriodicFlush should be called from a separate goroutine to handle time-based flushes
func (p *Processor) PeriodicFlush(ctx context.Context) {
	ticker := time.NewTicker(time.Duration(p.cfg.Writer.MaxFlushIntervalSeconds) * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			log.Println("Periodic flusher shutting down. Performing final flush...")
            p.FlushAll()
			return
		case <-ticker.C:
            p.FlushAll()
		}
	}
}

// FlushAll flushes every non-empty buffer. It runs on each tick and once more at shutdown.
func (p *Processor) FlushAll() {
	// Snapshot the buffers so the processor lock is not held during slow writes
	p.Lock()
	buffersToFlush := make([]*RecordBuffer, 0, len(p.buffers))
	for _, buffer := range p.buffers {
		buffersToFlush = append(buffersToFlush, buffer)
	}
	p.Unlock()

	for _, buffer := range buffersToFlush {
		buffer.Lock()
		// No elapsed-time check here: the ticker already fires once per interval, and
		// comparing against lastFlush would skip ticks and leave records behind at shutdown.
		if len(buffer.records) > 0 {
			log.Printf("Flushing %d buffered records for table '%s'...", len(buffer.records), buffer.table)
			buffer.flush()
		}
		buffer.Unlock()
	}
}


// flush performs the actual write operation.
// NOTE: The caller must hold the lock. flush releases it during the write
// and re-acquires it before returning.
func (b *RecordBuffer) flush() {
	if len(b.records) == 0 {
		return
	}

	recordsToFlush := b.records
	b.records = make([]map[string]interface{}, 0, b.cfg.MaxBatchSize)
	b.lastFlush = time.Now()

	// Release the lock during the slow write so other goroutines can keep adding records
	b.Unlock()
	err := b.writer.Write(b.table, recordsToFlush)
	b.Lock()

	if err != nil {
		log.Printf("ERROR: Failed to flush buffer for table %s: %v. %d records are lost.", b.table, err, len(recordsToFlush))
		// In a real project, this requires a more robust retry or persistence-to-local-disk mechanism.
		// For simplicity, we're just logging the error. One strategy is to put the batch back at the
		// front of the buffer (safe here because the lock is held again), at the risk of unbounded growth:
		// b.records = append(recordsToFlush, b.records...)
		return
	}
	log.Printf("Successfully flushed %d records for table %s", len(recordsToFlush), b.table)
}

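The core design here is the RecordBuffer. It addresses the small file problem in data lakes. Instead of writing a file for every message, we accumulate a batch in memory. When the record count hits maxBatchSize, or the maxFlushIntervalSeconds timer comes around, we write the entire batch to a single Parquet file. Note where the write actually runs: a size-triggered flush executes inside Add, on the Kafka consumer goroutine, so consumption pauses until the upload finishes. Only the interval-triggered flush runs on the separate PeriodicFlush goroutine, and releasing the buffer lock during the write is what lets the consumer keep appending while that flush is in flight. To take I/O off the consumption path entirely, full batches would have to be handed to a dedicated writer goroutine over a channel.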
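One way to move the write off the consumer goroutine is a channel-fed batcher, sketched below under our own naming (record, runBatcher). A single goroutine owns the batch, so no mutex is needed, and it flushes when the batch is full, when the ticker fires, or when the input channel is closed. This is an alternative design for illustration, not the RecordBuffer code above; maxIntervalSeconds must be positive because time.NewTicker panics otherwise.

```go
package main

import (
	"fmt"
	"time"
)

type record = map[string]interface{}

// runBatcher reads records from in and calls flush with a batch when maxSize
// records have accumulated, when the interval ticker fires, or when in is
// closed (final flush). It blocks until in is closed.
func runBatcher(in <-chan record, maxSize, maxIntervalSeconds int, flush func([]record)) {
	ticker := time.NewTicker(time.Duration(maxIntervalSeconds) * time.Second)
	defer ticker.Stop()

	batch := make([]record, 0, maxSize)
	emit := func() {
		if len(batch) == 0 {
			return
		}
		flush(batch)
		batch = make([]record, 0, maxSize) // fresh slice: flush may keep the old one
	}

	for {
		select {
		case rec, ok := <-in:
			if !ok {
				emit() // input closed: write what is left, then stop
				return
			}
			batch = append(batch, rec)
			if len(batch) >= maxSize {
				emit()
			}
		case <-ticker.C:
			emit()
		}
	}
}

func main() {
	in := make(chan record, 100) // bounded: a slow flush eventually blocks the producer
	go func() {
		for i := 0; i < 5; i++ {
			in <- record{"id": i}
		}
		close(in)
	}()
	runBatcher(in, 2, 60, func(batch []record) {
		fmt.Printf("flushing %d records\n", len(batch))
	})
}
```

The bounded channel doubles as a simple form of backpressure: while a flush is running the producer can only get ahead by the channel's capacity.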

Step 5: Implementing the Parquet File Writer

We’ll use the xitongsys/parquet-go library to create Parquet files. This library is powerful, but its API can be a bit complex. We’ll encapsulate it in a simple writer.

internal/writer/parquet_writer.go:

package writer

import (
	"encoding/json"
	"fmt"
	"log"
	"strings"
	"time"

	"cdc-ingestion-gateway/internal/config"

	"github.com/xitongsys/parquet-go-source/s3"
	"github.com/xitongsys/parquet-go/parquet"
	"github.com/xitongsys/parquet-go/writer"
)

type ParquetWriter struct {
	cfg *config.Config
}

func NewParquetWriter(cfg *config.Config) *ParquetWriter {
	return &ParquetWriter{cfg: cfg}
}

// Write writes a batch of records to a Parquet file in object storage
func (pw *ParquetWriter) Write(table string, records []map[string]interface{}) (err error) {
	if len(records) == 0 {
		return nil
	}

	// 1. Generate the file path
	filePath := pw.generateFilePath(table)
	log.Printf("Writing %d records to %s", len(records), filePath)

	// 2. Create an S3 file writer
	// NOTE: the NewS3FileWriter argument list differs between parquet-go-source releases;
	// verify this call against the version pinned in go.mod.
	fw, err := s3.NewS3FileWriter(pw.cfg.S3.Region, pw.cfg.S3.AccessKey, pw.cfg.S3.SecretKey, pw.cfg.S3.Bucket, strings.TrimPrefix(filePath, "s3://"+pw.cfg.S3.Bucket+"/"))
	if err != nil {
		return fmt.Errorf("failed to create s3 file writer: %w", err)
	}
	// The upload is only completed on Close, so a Close error must fail the whole write
	defer func() {
		if cerr := fw.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("failed to close s3 file writer: %w", cerr)
		}
	}()

	// 3. Infer a JSON schema from the first record (this is a simplified implementation).
	// In a production environment, the schema should be managed and cached, not inferred every time.
	// This assumes all records in a batch share the same fields and value types; a column that is
	// NULL in the first record falls back to a string column.
	fields := make([]string, 0, len(records[0]))
	for k, v := range records[0] {
		colType := "type=BYTE_ARRAY, convertedtype=UTF8"
		switch v.(type) {
		case float64:
			// JSON unmarshaling defaults numbers to float64
			colType = "type=DOUBLE"
		case int64:
			// e.g. the __source_ts_ms metadata field added by the processor
			colType = "type=INT64"
		case bool:
			colType = "type=BOOLEAN"
		}
		fields = append(fields, fmt.Sprintf(`{"Tag": "name=%s, %s, repetitiontype=OPTIONAL"}`, k, colType))
	}
	schemaDef := fmt.Sprintf(`{"Tag": "name=parquet_go_root, repetitiontype=REQUIRED", "Fields": [%s]}`, strings.Join(fields, ","))

	// 4. Create the Parquet writer. The library's JSON writer takes rows as JSON strings,
	// which avoids converting each map into a generated struct.
	pWriter, err := writer.NewJSONWriter(schemaDef, fw, 4)
	if err != nil {
		return fmt.Errorf("failed to create parquet writer: %w", err)
	}

	// Set some recommended compression and performance parameters
	pWriter.RowGroupSize = 128 * 1024 * 1024 // 128MB
	pWriter.CompressionType = parquet.CompressionCodec_SNAPPY

	// 5. Write records one by one
	for _, rec := range records {
		row, err := json.Marshal(rec)
		if err != nil {
			return fmt.Errorf("failed to encode record as JSON: %w", err)
		}
		if err := pWriter.Write(string(row)); err != nil {
			return fmt.Errorf("parquet write error: %w", err)
		}
	}

	if err := pWriter.WriteStop(); err != nil {
		return fmt.Errorf("parquet write stop error: %w", err)
	}

	return nil
}

// generateFilePath creates a file path with Hive-style partitioning
func (pw *ParquetWriter) generateFilePath(table string) string {
	now := time.Now().UTC()
	datePartition := fmt.Sprintf("date=%s", now.Format("2006-01-02"))
	
	// e.g., s3://bucket/raw/orders/date=2023-10-27/
	pathPrefix := strings.Replace(pw.cfg.Writer.OutputPath, "{table}", table, 1)
	pathPrefix = strings.Replace(pathPrefix, "{date}", datePartition, 1)

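	// e.g., 1698389400000000000-some-unique-id.parquet
	// NOTE: "some-unique-id" is a placeholder; use a UUID or the Kafka partition and offset
	// so that two gateway instances can never produce the same file name.
	fileName := fmt.Sprintf("%d-%s.parquet", now.UnixNano(), "some-unique-id")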
	
	return pathPrefix + fileName
}

A common pitfall: the default writer in xitongsys/parquet-go derives the Parquet schema from Go struct tags, so pushing map[string]interface{} records through it means converting each map into a concrete struct first, typically with code generation or reflection (reflect.StructOf can build a struct type at runtime). The library also ships a JSON writer (writer.NewJSONWriter) that takes a JSON schema plus JSON-encoded rows, which sidesteps the struct conversion and suits dynamic CDC payloads better. Whichever route you take, inferring the schema for every batch is fragile. A viable strategy is to maintain a schema cache for each table: build the schema the first time a table is seen, reuse it for later batches, and rebuild it only when the table changes.
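A per-table schema cache can be quite small. The sketch below uses hypothetical names (schemaCache, describeColumns) and stores the schema as a plain string; the builder is pluggable, so in the gateway it would return whatever schema representation the Parquet writer needs. The describeColumns builder shown here only lists column names with a coarse inferred type, sorted for a stable result.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// schemaCache builds a schema once per table and reuses it afterwards.
type schemaCache struct {
	mu      sync.RWMutex
	schemas map[string]string
	build   func(sample map[string]interface{}) string
}

func newSchemaCache(build func(map[string]interface{}) string) *schemaCache {
	return &schemaCache{schemas: make(map[string]string), build: build}
}

// get returns the cached schema for table, building it from sample on first use.
func (c *schemaCache) get(table string, sample map[string]interface{}) string {
	c.mu.RLock()
	s, ok := c.schemas[table]
	c.mu.RUnlock()
	if ok {
		return s
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if s, ok := c.schemas[table]; ok { // another goroutine may have built it meanwhile
		return s
	}
	s = c.build(sample)
	c.schemas[table] = s
	return s
}

// invalidate drops the cached schema, e.g. after an upstream ALTER TABLE.
func (c *schemaCache) invalidate(table string) {
	c.mu.Lock()
	delete(c.schemas, table)
	c.mu.Unlock()
}

// describeColumns is a toy builder: "name:TYPE" pairs in sorted column order.
func describeColumns(sample map[string]interface{}) string {
	cols := make([]string, 0, len(sample))
	for k, v := range sample {
		t := "BYTE_ARRAY"
		switch v.(type) {
		case float64:
			t = "DOUBLE"
		case bool:
			t = "BOOLEAN"
		}
		cols = append(cols, k+":"+t)
	}
	sort.Strings(cols)
	return strings.Join(cols, ",")
}

func main() {
	cache := newSchemaCache(describeColumns)
	fmt.Println(cache.get("orders", map[string]interface{}{"id": 1.0, "status": "NEW"}))
}
```

Sorting the columns matters because Go randomizes map iteration order; without it, two files for the same table could end up with different column orders.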

Step 6: Integrating the Service and Implementing Graceful Shutdown

Finally, in main.go, we’ll tie all the components together, use the Echo framework to provide HTTP endpoints, and handle OS interrupt signals for a graceful shutdown.

cmd/main.go:

package main

import (
	"cdc-ingestion-gateway/internal/config"
	"cdc-ingestion-gateway/internal/consumer"
	"cdc-ingestion-gateway/internal/service"
	"context"
	"github.com/labstack/echo/v4"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// 1. Load configuration
	cfg, err := config.LoadConfig("./config.yaml")
	if err != nil {
		log.Fatalf("Failed to load config: %v", err)
	}

	// 2. Create a context to control the lifecycle of all goroutines
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 3. Initialize the core processor
	processor := service.NewProcessor(cfg)

	// 4. Define the Kafka message handler function
	messageHandler := func(c context.Context, msg kafka.Message) error {
		return processor.ProcessMessage(c, msg.Key, msg.Value)
	}

	// 5. Start the Kafka consumer
	consumer.StartConsumer(ctx, cfg, messageHandler)

	// 6. Start the periodic flush goroutine
	go processor.PeriodicFlush(ctx)

	// 7. Start the Echo HTTP server for health checks and metrics
	e := echo.New()
	e.GET("/health", func(c echo.Context) error {
		return c.String(http.StatusOK, "OK")
	})
	// You can integrate Prometheus middleware here to expose metrics

	go func() {
		if err := e.Start(":" + cfg.Server.Port); err != nil && err != http.ErrServerClosed {
			log.Fatalf("Shutting down the server: %v", err)
		}
	}()

	// 8. Listen for system signals to implement graceful shutdown
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
	<-quit
	log.Println("Shutdown signal received, starting graceful shutdown...")

	// Cancel the context to notify all goroutines to stop their work
	cancel()
	
    // Give the Echo server some time to shut down existing connections
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer shutdownCancel()
	if err := e.Shutdown(shutdownCtx); err != nil {
		log.Printf("Echo server shutdown error: %v", err)
	}

	// Before exiting, ensure all buffers are flushed one last time
	log.Println("Performing final flush of all buffers...")
	processor.FlushAll()
	log.Println("Final flush completed. Exiting.")
}

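This main.go file is the glue for the entire application. It uses a context to manage the lifecycle of all long-running goroutines. When it receives a SIGINT or SIGTERM signal, it cancels the context, which triggers the shutdown logic in the Kafka consumer and the periodic flusher. Before the program exits, it also explicitly calls processor.FlushAll() as a last attempt to write whatever is still in memory to the data lake. One gap remains: main does not wait for the consumer and flusher goroutines to finish, so a write that is still in flight when main returns can be cut short. Tracking those goroutines with a sync.WaitGroup and waiting on it before exiting closes that gap.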
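A shutdown sequence that waits for background goroutines might look like the following sketch. The workers here are stand-ins for the consumer and the flusher, and the helper names (startWorkers, waitTimeout, shutdownDemo) are ours; the point is only the pattern of cancel, then wait with a deadline, then exit.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// startWorkers launches n goroutines that block until ctx is cancelled and
// then run cleanup, standing in for "final flush" or "close reader".
func startWorkers(ctx context.Context, wg *sync.WaitGroup, n int, cleanup func(id int)) {
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-ctx.Done()
			cleanup(id)
		}(i)
	}
}

// waitTimeout waits for wg for at most d and reports whether it finished in time.
func waitTimeout(wg *sync.WaitGroup, d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

// shutdownDemo starts n workers, cancels them, and waits for their cleanup.
// It returns how many cleanups ran and whether all finished before the deadline.
func shutdownDemo(n int) (int, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	var mu sync.Mutex
	finished := 0
	startWorkers(ctx, &wg, n, func(id int) {
		mu.Lock()
		finished++
		mu.Unlock()
	})
	cancel()
	clean := waitTimeout(&wg, 5*time.Second)
	mu.Lock()
	defer mu.Unlock()
	return finished, clean
}

func main() {
	finished, clean := shutdownDemo(3)
	fmt.Println("cleanups:", finished, "clean shutdown:", clean)
}
```

The deadline keeps a stuck upload from hanging the process forever, which matters when an orchestrator will send SIGKILL after its own grace period anyway.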

Limitations and Future Iterations

This implementation is a working prototype, but several areas would need to be hardened before it is ready for production.

First, Schema Evolution. Our current handling of the Parquet schema is simplified. When an ALTER TABLE operation occurs on the upstream SQL Server table, Debezium will capture a schema change event. Our gateway would need to be able to parse this type of event and update the Parquet schema accordingly, or write data to a new partition/directory to avoid write failures. This typically involves integrating with an external Schema Registry system.
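Short of a full Schema Registry integration, the gateway could at least notice drift by comparing the columns of each incoming row image with the columns it already knows for that table. The sketch below is one hypothetical way to do that (diffColumns is our name). It relies on the assumption that a row image lists every captured column, including those that are NULL, and it skips the gateway's own metadata fields that start with "__".

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// diffColumns compares the known source columns of a table with the keys of
// an incoming record and returns the columns that appeared or disappeared.
func diffColumns(known []string, record map[string]interface{}) (added, removed []string) {
	seen := make(map[string]bool, len(known))
	for _, c := range known {
		seen[c] = false
	}
	for c := range record {
		if strings.HasPrefix(c, "__") {
			continue // metadata added by the gateway, not a source column
		}
		if _, ok := seen[c]; ok {
			seen[c] = true
		} else {
			added = append(added, c)
		}
	}
	for c, hit := range seen {
		if !hit {
			removed = append(removed, c)
		}
	}
	sort.Strings(added)
	sort.Strings(removed)
	return added, removed
}

func main() {
	known := []string{"id", "status"}
	rec := map[string]interface{}{"id": 7.0, "total": 99.5, "__op": "u"}
	added, removed := diffColumns(known, rec)
	fmt.Println("added:", added, "removed:", removed)
}
```

On a non-empty diff, a reasonable reaction is to flush the current buffer with the old schema and start a new file with the new one, so that a single Parquet file never mixes two layouts.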

Second, End-to-End Consistency. The target is At-Least-Once delivery, but the code as shown falls short of it: the consumer commits an offset as soon as a record has been buffered in memory, not when its Parquet file has been written, so a crash between the two loses the buffered records. Deferring the commit until the flush succeeds restores At-Least-Once, at the price that certain failure scenarios (network issues, service restarts) write a small amount of data to the data lake twice. Achieving Exactly-Once semantics would require placing the Kafka offset commit and the data lake write into a single atomic transaction. This usually requires data lake table formats that support transactions (like Apache Iceberg or Delta Lake) and more complex offset management strategies.
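Committing only after a successful flush needs some bookkeeping: for each partition, the gateway has to remember the highest offset that went into the batch being written. The following is a minimal sketch of such a tracker with hypothetical names (offsetTracker, buffered, drain). In the real gateway one would probably keep the last kafka.Message per partition instead of a bare offset, so it can be passed to the reader's commit call once the write succeeds.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetTracker remembers, per partition, the highest offset that has been
// buffered since the last drain.
type offsetTracker struct {
	mu      sync.Mutex
	pending map[int]int64
}

func newOffsetTracker() *offsetTracker {
	return &offsetTracker{pending: make(map[int]int64)}
}

// buffered records that the message at (partition, offset) is now in memory.
func (t *offsetTracker) buffered(partition int, offset int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if cur, ok := t.pending[partition]; !ok || offset > cur {
		t.pending[partition] = offset
	}
}

// drain returns the highest buffered offset per partition and resets the
// tracker. Call it at the moment the batch is swapped out for writing, and
// commit the returned offsets only if that write succeeds.
func (t *offsetTracker) drain() map[int]int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	out := t.pending
	t.pending = make(map[int]int64)
	return out
}

func main() {
	tr := newOffsetTracker()
	tr.buffered(0, 5)
	tr.buffered(0, 9)
	tr.buffered(1, 3)
	toCommit := tr.drain() // taken together with the batch swap
	// ... write the batch; on success commit toCommit, on failure keep it for the retry
	fmt.Println(toCommit[0], toCommit[1], len(tr.drain()))
}
```

With this ordering a crash can still cause a batch to be written twice (written, but not yet committed), which is exactly the At-Least-Once trade-off described above.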

Finally, Backpressure. If the downstream object storage experiences write delays or if a Kafka traffic spike far exceeds our processing capacity, the in-memory buffers could grow indefinitely, leading to an OOM (Out Of Memory) error. A more robust system would implement backpressure. When a buffer reaches a high-water mark, it should proactively pause consumption from Kafka until the pressure subsides. kafka-go’s Reader doesn’t directly offer a pause/resume feature, but this can be implemented indirectly by controlling the frequency of calls to FetchMessage.
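One simple way to control how often FetchMessage is called is a counting gate: the consumer takes a slot for every record it buffers and blocks once the high-water mark is reached, and the flusher gives slots back after a successful write. The sketch below uses a buffered channel as the counter; the gate type and its method names are our own.

```go
package main

import (
	"context"
	"fmt"
)

// gate limits the number of records held in memory to its capacity.
type gate struct {
	slots chan struct{}
}

func newGate(highWater int) *gate {
	return &gate{slots: make(chan struct{}, highWater)}
}

// acquire takes one slot, blocking while the gate is full or until ctx ends.
// The consumer would call it before fetching the next message.
func (g *gate) acquire(ctx context.Context) error {
	select {
	case g.slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// tryAcquire takes a slot without blocking and reports whether it succeeded.
func (g *gate) tryAcquire() bool {
	select {
	case g.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees up to n slots; the flusher calls it after writing n records.
func (g *gate) release(n int) {
	for i := 0; i < n; i++ {
		select {
		case <-g.slots:
		default:
			return // nothing left to release
		}
	}
}

// inFlight reports how many slots are currently taken, useful as a metric.
func (g *gate) inFlight() int {
	return len(g.slots)
}

func main() {
	g := newGate(2)
	ctx := context.Background()
	_ = g.acquire(ctx)
	_ = g.acquire(ctx)
	fmt.Println("full:", !g.tryAcquire(), "in flight:", g.inFlight())
	g.release(2) // a flush wrote two records
	fmt.Println("in flight after flush:", g.inFlight())
}
```

While the consumer is blocked in acquire it simply stops fetching, and Kafka retains the unread messages, so the pressure is pushed back to the broker rather than into process memory.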

