Building a Transaction-Aware Logging Architecture with Fluentd, OpenSearch, and DynamoDB


Production logs without context are just meaningless noise. When a user request traverses an API Gateway, flows through three microservices, and culminates in a state change in DynamoDB, a subtle error can be a nightmare to debug: you are left with four separate, differently formatted, uncorrelated log streams. This is exactly the pain point we faced: fragmented logs and a complete loss of context.

Our goal is to build a system where any request can be tracked end-to-end. More than that, we need to tightly couple the technical trace ID (trace_id) with the business-level transaction ID (transaction_id). This way, whether you’re in tech support or development, you can take a business order number, instantly pull up every related technical log, and see the complete lifecycle of that interaction.

Architectural Vision and Technical Trade-offs

To achieve this, we need a clean architecture and a set of cooperating components.

  1. Unified Entrypoint & Trace Initiation: All traffic must pass through an API Gateway. This is the perfect place to generate the end-to-end trace_id. We’ll leverage the gateway’s capabilities to inject a unique X-Trace-ID header into every incoming request.
  2. Service Framework & Context Propagation: Our backend services are built with Go and the Echo framework. We must implement a generic Echo middleware that automatically parses the X-Trace-ID header and injects it into the request’s context. All subsequent log generation must be able to automatically extract this ID from the context.
  3. Structured Logging: Say goodbye to plain text logs. All services must output logs in JSON format. We’ll define a single, global logging schema that includes timestamp, level, service_name, trace_id, transaction_id, message, and a payload field for structured business-related data.
  4. Log Aggregation & Routing: Fluentd is the core of this stage. It’s lightweight with a rich plugin ecosystem. We’ll deploy it as a daemon on each service node or as a sidecar container, responsible for collecting local log streams, performing necessary parsing and buffering, and then reliably forwarding them to the backend.
  5. Storage & Querying: OpenSearch provides powerful full-text search and aggregation capabilities, making it the ideal choice for storing and querying structured logs. By pre-defining an index template, we can ensure log data is indexed correctly, optimizing query performance.
  6. Business Context Correlation: DynamoDB is our primary business database. When a core business process (e.g., creating an order) begins, a unique business transaction_id is generated and stored in DynamoDB. When services handle this process, they must log not only the trace_id but also this transaction_id. This is the critical bridge between the technical and business domains.

The complete data flow is as follows:

graph TD
    A[Client] --> B(API Gateway);
    B -- "Adds X-Trace-ID header" --> C{Echo Service};
    C -- "Reads X-Trace-ID, adds to context" --> D[Business Logic];
    D -- "Generates transaction_id" --> E(DynamoDB);
    D -- "Logs with trace_id & transaction_id" --> F((Log Stream));
    F --> G[Fluentd Agent];
    G -- "Forwards structured logs" --> H(OpenSearch);
    I[Developer/SRE] -- "Queries by trace_id or transaction_id" --> H;

Core Implementation Details

1. Defining a Unified Log Structure

This is the foundation for everything else. A poor log structure will handicap all subsequent efforts. Our standard structure is:

{
  "timestamp": "2023-10-27T10:30:00.123Z",
  "level": "info",
  "service": "order-service",
  "trace_id": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
  "transaction_id": "txn-987654321",
  "user_id": "usr-abcdef123",
  "message": "Order successfully created",
  "payload": {
    "order_id": "ord-fedcba987",
    "item_count": 3,
    "total_amount": 99.99
  },
  "source_ip": "192.168.1.100",
  "duration_ms": 15.6
}
  • trace_id: Generated by the API Gateway, persists throughout the request’s lifecycle.
  • transaction_id: Generated by business logic, correlates a specific business process.
  • payload: A flexible JSON object for carrying rich business context.
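
For reference, here is one way this schema could be expressed as a Go type. The struct is purely illustrative (not part of the original services, which emit these fields through slog rather than marshaling a struct by hand); it needs "time" from the standard library:

// LogEntry is an illustrative Go mirror of the unified log schema.
type LogEntry struct {
	Timestamp     time.Time      `json:"timestamp"`
	Level         string         `json:"level"`
	Service       string         `json:"service"`
	TraceID       string         `json:"trace_id"`
	TransactionID string         `json:"transaction_id,omitempty"`
	UserID        string         `json:"user_id,omitempty"`
	Message       string         `json:"message"`
	Payload       map[string]any `json:"payload,omitempty"`
	SourceIP      string         `json:"source_ip,omitempty"`
	DurationMs    float64        `json:"duration_ms,omitempty"`
}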

2. Echo Middleware: Context Injection and Structured Logging

We’ll use slog, the structured logging package in the standard library as of Go 1.21, and write an Echo middleware to automate context handling.

pkg/logger/logger.go:

package logger

import (
	"context"
	"log/slog"
	"os"
	"sync"
)

// customContextKey is a private type to avoid key collisions in context.
type customContextKey string

const (
	TraceIDContextKey       customContextKey = "trace_id"
	TransactionIDContextKey customContextKey = "transaction_id"
)

var (
	defaultLogger *slog.Logger
	once          sync.Once
)

// Init initializes the singleton structured logger.
func Init(serviceName string) {
	once.Do(func() {
		handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
			Level: slog.LevelDebug,
			// Rename slog's default output keys ("time", "msg") so the
			// emitted JSON matches the unified schema ("timestamp", "message").
			ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
				if len(groups) == 0 {
					switch a.Key {
					case slog.TimeKey:
						a.Key = "timestamp"
					case slog.MessageKey:
						a.Key = "message"
					}
				}
				return a
			},
		}).WithAttrs([]slog.Attr{slog.String("service", serviceName)})
		defaultLogger = slog.New(handler)
	})
}

// FromContext returns a logger that automatically includes fields from the context.
// In a real application, you might use a more sophisticated handler that
// inspects the context given to log methods, but this approach is simpler for demonstration.
func FromContext(ctx context.Context) *slog.Logger {
	if defaultLogger == nil {
		Init("unknown-service") // Fallback
	}

	// slog.Logger.With takes ...any, so the attrs are collected as []any.
	args := []any{}
	if traceID, ok := ctx.Value(TraceIDContextKey).(string); ok && traceID != "" {
		args = append(args, slog.String("trace_id", traceID))
	}
	if transactionID, ok := ctx.Value(TransactionIDContextKey).(string); ok && transactionID != "" {
		args = append(args, slog.String("transaction_id", transactionID))
	}

	if len(args) > 0 {
		return defaultLogger.With(args...)
	}

	return defaultLogger
}
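
The doc comment on FromContext alludes to a more sophisticated, context-aware handler. Here is a minimal sketch of that idea, assuming it lives in the same logger package: wrap the JSON handler and enrich each record inside Handle, so call sites can use slog.InfoContext(ctx, ...) without fetching a logger first.

// contextHandler wraps another slog.Handler and copies the trace and
// transaction IDs from the context passed to each log call into the record.
type contextHandler struct {
	slog.Handler
}

func (h contextHandler) Handle(ctx context.Context, r slog.Record) error {
	if traceID, ok := ctx.Value(TraceIDContextKey).(string); ok && traceID != "" {
		r.AddAttrs(slog.String("trace_id", traceID))
	}
	if transactionID, ok := ctx.Value(TransactionIDContextKey).(string); ok && transactionID != "" {
		r.AddAttrs(slog.String("transaction_id", transactionID))
	}
	return h.Handler.Handle(ctx, r)
}

// Wiring it up inside Init would then look like:
//   defaultLogger = slog.New(contextHandler{Handler: handler})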

internal/middleware/logging.go:

package middleware

import (
	"context"
	"github.com/google/uuid"
	"github.com/labstack/echo/v4"
	"your-project/pkg/logger" // Import your logger package
)

const TraceIDHeader = "X-Trace-ID"

// ContextualLoggingMiddleware injects trace_id into the request context
// and ensures all subsequent logs for this request are structured and contextual.
func ContextualLoggingMiddleware() echo.MiddlewareFunc {
	return func(next echo.HandlerFunc) echo.HandlerFunc {
		return func(c echo.Context) error {
			req := c.Request()
			ctx := req.Context()

			// 1. Extract or generate Trace ID
			traceID := req.Header.Get(TraceIDHeader)
			if traceID == "" {
				traceID = uuid.New().String()
				// In a real scenario, you might want to log that a trace ID was generated here.
			}
			c.Response().Header().Set(TraceIDHeader, traceID) // Echo it back in the response

			// 2. Inject into context
			ctx = context.WithValue(ctx, logger.TraceIDContextKey, traceID)
			c.SetRequest(req.WithContext(ctx))

			// Use the context-aware logger for the request log
			reqLogger := logger.FromContext(ctx)
			reqLogger.Info("Request started",
				"method", req.Method,
				"uri", req.RequestURI,
				"remote_ip", c.RealIP(),
			)

			err := next(c)

			// After request processing, log the final status
			reqLogger.Info("Request finished",
				"status", c.Response().Status,
				"size", c.Response().Size,
			)

			return err
		}
	}
}
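
One thing this middleware does not cover is propagation to downstream services: when this service calls the next microservice, the X-Trace-ID header has to travel with the outgoing request. Below is a minimal sketch of one way to do that (the httpclient package and NewClient helper are illustrative, not part of the original code):

pkg/httpclient/httpclient.go:

package httpclient

import (
	"net/http"

	"your-project/pkg/logger"
)

// tracePropagatingTransport copies the trace_id from the request context
// into the outgoing X-Trace-ID header before delegating to the base transport.
type tracePropagatingTransport struct {
	base http.RoundTripper
}

func (t tracePropagatingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	if traceID, ok := req.Context().Value(logger.TraceIDContextKey).(string); ok && traceID != "" {
		req = req.Clone(req.Context())
		req.Header.Set("X-Trace-ID", traceID)
	}
	return t.base.RoundTrip(req)
}

// NewClient returns an *http.Client that propagates trace IDs automatically,
// as long as callers build requests with http.NewRequestWithContext.
func NewClient() *http.Client {
	return &http.Client{Transport: tracePropagatingTransport{base: http.DefaultTransport}}
}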

Enable the middleware in main.go:

package main

import (
	"context"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/google/uuid"
	"github.com/labstack/echo/v4"
	"log/slog"
	"net/http"
	"your-project/internal/middleware"
	"your-project/pkg/logger"
)


// Mock handler demonstrating context usage
func createOrderHandler(dbClient *dynamodb.Client) echo.HandlerFunc {
	return func(c echo.Context) error {
		ctx := c.Request().Context()
		
		// 1. Generate a business transaction ID
		transactionID := "txn-" + uuid.New().String()
		
		// 2. Add it to a new context for this specific transaction's scope
		txnCtx := context.WithValue(ctx, logger.TransactionIDContextKey, transactionID)
		
		// 3. Use the new context for logging
		log := logger.FromContext(txnCtx)

		log.Info("Starting order creation process")

		// --- MOCK DYNAMODB INTERACTION ---
		// In a real application, you'd use the dbClient to write to DynamoDB.
		// For example:
		// _, err := dbClient.PutItem(txnCtx, &dynamodb.PutItemInput{...})
		// if err != nil {
		//     log.Error("Failed to save order state to DynamoDB", "error", err)
		//     return c.JSON(http.StatusInternalServerError, map[string]string{"error": "db failure"})
		// }
		log.Info("Order state persisted to DynamoDB", "payload", map[string]string{
			"order_id": "ord-12345",
			"status": "PENDING",
		})
		// --- END MOCK ---
		
		log.Info("Order creation process finished successfully")

		return c.JSON(http.StatusOK, map[string]string{
			"status": "success",
			"transaction_id": transactionID,
		})
	}
}

func main() {
	// Initialize logger
	logger.Init("order-service")

	e := echo.New()

	// Apply middlewares
	e.Use(middleware.ContextualLoggingMiddleware())

	// Setup AWS SDK config
	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
	if err != nil {
		slog.Error("Unable to load AWS SDK config", "error", err)
		return
	}
	dbClient := dynamodb.NewFromConfig(cfg)
	
	// Define routes
	e.POST("/orders", createOrderHandler(dbClient))

	// Start server
	e.Logger.Fatal(e.Start(":1323"))
}

The key here is that the middleware handles the trace_id, while the business logic handler is responsible for generating and injecting the transaction_id. The logger.FromContext function ensures that any logger instance derived from the context automatically carries these critical IDs.
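
For completeness, the DynamoDB call that the mock stands in for could look like the sketch below, dropped into createOrderHandler in place of the comment block. The "orders" table name and item layout are assumptions for illustration; the extra imports are "github.com/aws/aws-sdk-go-v2/aws" and "github.com/aws/aws-sdk-go-v2/service/dynamodb/types":

// Persist the transaction record; txnCtx carries both IDs, so any
// SDK-level hooks observe the same request context.
_, err := dbClient.PutItem(txnCtx, &dynamodb.PutItemInput{
	TableName: aws.String("orders"), // assumed table name
	Item: map[string]types.AttributeValue{
		"transaction_id": &types.AttributeValueMemberS{Value: transactionID},
		"order_id":       &types.AttributeValueMemberS{Value: "ord-12345"},
		"status":         &types.AttributeValueMemberS{Value: "PENDING"},
	},
})
if err != nil {
	log.Error("Failed to save order state to DynamoDB", "error", err)
	return c.JSON(http.StatusInternalServerError, map[string]string{"error": "db failure"})
}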

3. Fluentd Configuration: Collect, Parse, and Forward

The Fluentd configuration is the backbone of this pipeline. A misconfigured Fluentd node can become a performance bottleneck or a source of data loss during traffic spikes.

fluent.conf

# INPUT: Listen for logs from applications via forward protocol
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

# FILTER: Ensure the log field is a parsed JSON object.
# The Go slog library already outputs JSON, so we just need to parse it.
<filter **>
  @type parser
  key_name log # Assumes logs arrive in the format: {"log":"{...json...}"}
  reserve_data true
  <parse>
    @type json
  </parse>
</filter>

# FILTER: Add Kubernetes metadata if running in K8s.
# This adds valuable context like pod_name, namespace, etc.
<filter **>
  @type kubernetes_metadata
</filter>

# OUTPUT: Buffer logs and send to OpenSearch. This is the critical part.
<match **>
  @type opensearch
  
  host "opensearch-node1.my-domain.com"
  port 9200
  scheme https
  ssl_verify true
  # For production, use proper authentication
  # user "admin"
  # password "secret"

  logstash_format true
  # With logstash_format, the index name becomes "<logstash_prefix>-<date>".
  # The ${service} placeholder works because "service" is declared as a
  # buffer chunk key below. Example index: logs-order-service-2023.10.27
  logstash_prefix "logs-${service}"
  logstash_dateformat "%Y.%m.%d"

  # --- Resiliency and Performance Tuning ---
  # This buffer configuration is critical for production.
  <buffer tag, service>
    @type file
    path /var/log/fluentd/buffer/opensearch
    
    # Total buffer size on disk. Adjust based on available disk space
    # and expected downtime of OpenSearch.
    total_limit_size 2g 

    # Break down data into chunks.
    chunk_limit_size 16m

    # How often to try to send data to OpenSearch.
    flush_interval 5s
    # Use multiple threads to send data.
    flush_thread_count 4
    
    # Retry logic for network or OpenSearch issues.
    retry_type exponential_backoff
    retry_wait 1s
    retry_max_interval 60s
    retry_timeout 12h # Keep retrying for 12 hours before dropping data.
  </buffer>
  
  # Back up chunks that exhaust all retries to local files instead of silently dropping them.
  <secondary>
    @type file
    path /var/log/fluentd/error/opensearch-error
  </secondary>
</match>

The <buffer> block is the key here. It stages logs to local files, preventing data loss if OpenSearch becomes unavailable. With an exponential_backoff retry strategy, it gracefully handles network jitter or temporary failures in the target cluster. This is the core of a production-grade configuration.
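
It also pays to sanity-check the sizing. With total_limit_size 2g and an assumed sustained ingest rate of 5 MB/s, the file buffer absorbs roughly 2048 MB / 5 MB/s ≈ 410 seconds, i.e. just under seven minutes of complete OpenSearch downtime before Fluentd starts rejecting new records. Size the buffer against your worst expected outage, not the average one.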

4. OpenSearch Index Template

Without a proper mapping, OpenSearch will dynamically guess field types, which often leads to problems (e.g., mapping a numeric ID as long instead of keyword). We must define an index template beforehand.

Apply this template via the OpenSearch API (e.g., PUT _index_template/logs_template):

{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1
    },
    "mappings": {
      "properties": {
        "timestamp": { "type": "date" },
        "level": { "type": "keyword" },
        "service": { "type": "keyword" },
        "trace_id": { "type": "keyword" },
        "transaction_id": { "type": "keyword" },
        "user_id": { "type": "keyword" },
        "message": { "type": "text" },
        "source_ip": { "type": "ip" },
        "duration_ms": { "type": "float" },
        "payload": { "type": "object", "enabled": false }
      }
    }
  },
  "priority": 500,
  "composed_of": []
}

Key Decisions:

  • trace_id and transaction_id are defined as keyword. This is crucial because we need to perform exact matches and aggregations on them, not full-text searches.
  • message is a text type, enabling full-text search.
  • payload is set to "enabled": false. This means OpenSearch will not create an index for any fields inside the payload object. This is a trade-off: it saves significant storage space and indexing overhead, but at the cost of being unable to directly search for specific fields within the payload (e.g., payload.order_id). For our scenario, where we assume logs are retrieved primarily by IDs and the payload is inspected afterward, this is a reasonable setting.
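
With this mapping in place, the payoff is a one-line query. Pulling up every log line for the sample transaction from section 1 (via GET logs-*/_search, using the example transaction ID from the schema above) looks like this:

{
  "query": {
    "term": { "transaction_id": "txn-987654321" }
  },
  "sort": [
    { "timestamp": { "order": "asc" } }
  ]
}

Because transaction_id is a keyword field, this is an exact, index-backed match rather than a full-text search.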

Limitations and Future Path

This architecture solves the core log correlation problem, but in a true large-scale production environment, several other aspects need consideration:

  1. Fluentd High Availability: A single Fluentd node is still a single point of failure. In production, you would deploy a Fluentd aggregation layer, forming a highly available forwarding cluster that receives logs from Fluentd agents on individual application nodes.
  2. Cost and Sampling: Ingesting and storing DEBUG-level logs from all services can be prohibitively expensive. The next step is to implement a dynamic sampling strategy. For instance, you could default to collecting INFO-level logs and above, but allow a special header (e.g., X-Debug-Trace: true) from the API Gateway to enable full logging for a specific trace_id. This is incredibly effective for live issue investigation.
  3. Integration with Distributed Tracing: A trace_id only correlates logs. A complete observability system also requires distributed tracing data (Traces). A future iteration would be to unify this trace_id with a standard Trace ID from a system like OpenTelemetry, seamlessly linking Logs and Traces for deeper system insights.
  4. Index Lifecycle Management: Log data grows rapidly. OpenSearch implements this as Index State Management (ISM); you must configure policies to automatically migrate older indices from hot nodes to warm/cold nodes, and eventually delete them after a set period (e.g., 90 days) to control storage costs. A minimal policy sketch follows this list.
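
As a starting point for item 4, a deletion-only ISM policy (the 90-day threshold and logs-* pattern are assumptions matching the examples above) could be applied via PUT _plugins/_ism/policies/logs_cleanup:

{
  "policy": {
    "description": "Delete log indices after 90 days",
    "default_state": "hot",
    "states": [
      {
        "name": "hot",
        "actions": [],
        "transitions": [
          { "state_name": "delete", "conditions": { "min_index_age": "90d" } }
        ]
      },
      {
        "name": "delete",
        "actions": [ { "delete": {} } ],
        "transitions": []
      }
    ],
    "ism_template": [
      { "index_patterns": ["logs-*"], "priority": 100 }
    ]
  }
}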
