Production logs without context are just meaningless noise. When a user request traverses an API Gateway, flows through three microservices, and culminates in a state change in DynamoDB, a subtle error can be a disaster to debug. If you’re faced with four separate, differently formatted, and uncorrelated log streams, troubleshooting becomes a nightmare. This is the exact pain point we faced: fragmented logs and a complete loss of context.
Our goal is to build a system where any request can be tracked end-to-end. More than that, we need to tightly couple the technical trace ID (trace_id) with the business-level transaction ID (transaction_id). This way, whether you’re in tech support or development, you can take a business order number, instantly pull up every related technical log, and see the complete lifecycle of that interaction.
Architectural Vision and Technical Trade-offs
To achieve this, we need a clean architecture and a set of cooperating components.
- Unified Entrypoint & Trace Initiation: All traffic must pass through an API Gateway, which is the natural place to generate the end-to-end `trace_id`. We’ll leverage the gateway’s capabilities to inject a unique `X-Trace-ID` header into every incoming request.
- Service Framework & Context Propagation: Our backend services are built with Go and the Echo framework. We must implement a generic Echo middleware that parses the `X-Trace-ID` header and injects it into the request’s context, so every subsequent log call can extract the ID automatically. Outbound calls to other services must forward the same header (a propagation sketch follows this list).
- Structured Logging: Say goodbye to plain-text logs. All services must output JSON, following a single, global logging schema that includes `timestamp`, `level`, `service`, `trace_id`, `transaction_id`, `message`, and a `payload` field for structured business data.
- Log Aggregation & Routing: Fluentd is the core of this stage: it’s lightweight and has a rich plugin ecosystem. We’ll deploy it as a daemon on each service node or as a sidecar container, responsible for collecting local log streams, performing the necessary parsing and buffering, and then reliably forwarding them to the backend.
- Storage & Querying: OpenSearch provides powerful full-text search and aggregation capabilities, making it the ideal choice for storing and querying structured logs. By pre-defining an index template, we can ensure log data is indexed correctly, optimizing query performance.
- Business Context Correlation: DynamoDB is our primary business database. When a core business process (e.g., creating an order) begins, a unique business `transaction_id` is generated and stored in DynamoDB. Services handling this process must log not only the `trace_id` but also this `transaction_id`. This is the critical bridge between the technical and business domains.
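Context propagation also has to survive service-to-service hops. Below is a minimal sketch of one way to do that, assuming services call each other over plain HTTP and reusing the `logger.TraceIDContextKey` defined in section 2: an `http.RoundTripper` that copies the trace ID from the outgoing request’s context into the `X-Trace-ID` header.

```go
package client

import (
	"net/http"

	"your-project/pkg/logger" // context keys are defined in section 2 below
)

// tracePropagationTransport forwards the current trace ID to the next service
// by copying it from the request context into the X-Trace-ID header.
type tracePropagationTransport struct {
	base http.RoundTripper
}

func (t tracePropagationTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	if traceID, ok := req.Context().Value(logger.TraceIDContextKey).(string); ok && traceID != "" {
		req = req.Clone(req.Context()) // RoundTrippers must not mutate the caller's request
		req.Header.Set("X-Trace-ID", traceID)
	}
	base := t.base
	if base == nil {
		base = http.DefaultTransport
	}
	return base.RoundTrip(req)
}

// HTTPClient is used for all outbound calls; build requests with
// http.NewRequestWithContext so the trace-carrying context reaches the transport.
var HTTPClient = &http.Client{Transport: tracePropagationTransport{}}
```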
The complete data flow is as follows:
graph TD
A[Client] --> B(API Gateway);
B -- "Adds X-Trace-ID header" --> C{Echo Service};
C -- "Reads X-Trace-ID, adds to context" --> D[Business Logic];
D -- "Generates transaction_id" --> E(DynamoDB);
D -- "Logs with trace_id & transaction_id" --> F((Log Stream));
F --> G[Fluentd Agent];
G -- "Forwards structured logs" --> H(OpenSearch);
I[Developer/SRE] -- "Queries by trace_id or transaction_id" --> H;
Core Implementation Details
1. Defining a Unified Log Structure
This is the foundation for everything else. A poor log structure will handicap all subsequent efforts. Our standard structure is:
{
"timestamp": "2023-10-27T10:30:00.123Z",
"level": "info",
"service": "order-service",
"trace_id": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
"transaction_id": "txn-987654321",
"user_id": "usr-abcdef123",
"message": "Order successfully created",
"payload": {
"order_id": "ord-fedcba987",
"item_count": 3,
"total_amount": 99.99
},
"source_ip": "192.168.1.100",
"duration_ms": 15.6
}
- `trace_id`: Generated by the API Gateway; persists throughout the request’s lifecycle.
- `transaction_id`: Generated by the business logic; correlates all logs belonging to a specific business process.
- `payload`: A flexible JSON object for carrying rich business context (the snippet below shows the logging call that would produce such a record).
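As a reference point, a record like the sample above could be emitted with a call along these lines. This is only a sketch: it uses the context-aware `logger.FromContext` built in the next section, and the values are the illustrative ones from the sample.

```go
package handlers

import (
	"context"
	"log/slog"

	"your-project/pkg/logger" // FromContext is defined in the next section
)

// logOrderCreated emits a record shaped like the sample above. ctx is the
// request context, already carrying trace_id and transaction_id.
func logOrderCreated(ctx context.Context) {
	logger.FromContext(ctx).Info("Order successfully created",
		slog.String("user_id", "usr-abcdef123"),
		slog.Any("payload", map[string]any{
			"order_id":     "ord-fedcba987",
			"item_count":   3,
			"total_amount": 99.99,
		}),
		slog.String("source_ip", "192.168.1.100"),
		slog.Float64("duration_ms", 15.6),
	)
}
```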
2. Echo Middleware: Context Injection and Structured Logging
We’ll use slog, the structured logging package in the standard library since Go 1.21, and write an Echo middleware to automate context handling.
pkg/logger/logger.go:
package logger
import (
"context"
"log/slog"
"os"
"sync"
)
// customContextKey is a private type to avoid key collisions in context.
type customContextKey string
const (
TraceIDContextKey customContextKey = "trace_id"
TransactionIDContextKey customContextKey = "transaction_id"
)
var (
defaultLogger *slog.Logger
once sync.Once
)
// Init initializes the singleton structured logger.
func Init(serviceName string) {
once.Do(func() {
		handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
			Level: slog.LevelDebug,
			// Rename slog's default keys ("time", "msg") so the output matches
			// the unified log schema above ("timestamp", "message").
			ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
				switch a.Key {
				case slog.TimeKey:
					a.Key = "timestamp"
				case slog.MessageKey:
					a.Key = "message"
				}
				return a
			},
		}).WithAttrs([]slog.Attr{slog.String("service", serviceName)})
		defaultLogger = slog.New(handler)
		// Also install it as the process-wide default so plain slog calls
		// (e.g. slog.Error in main) share the same JSON format.
		slog.SetDefault(defaultLogger)
})
}
// FromContext returns a logger that automatically includes fields from the context.
// In a real application, you might use a more sophisticated handler that
// inspects the context given to log methods, but this approach is simpler for demonstration.
func FromContext(ctx context.Context) *slog.Logger {
if defaultLogger == nil {
Init("unknown-service") // Fallback
}
attrs := []slog.Attr{}
if traceID, ok := ctx.Value(TraceIDContextKey).(string); ok && traceID != "" {
attrs = append(attrs, slog.String("trace_id", traceID))
}
if transactionID, ok := ctx.Value(TransactionIDContextKey).(string); ok && transactionID != "" {
attrs = append(attrs, slog.String("transaction_id", transactionID))
}
if len(attrs) > 0 {
return defaultLogger.With(attrs...)
}
return defaultLogger
}
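The comment above hints at a "more sophisticated handler". As a minimal sketch of that idea (not wired into the examples that follow), a wrapper handler can read the IDs from the context passed to slog’s `*Context` methods, so call sites wouldn’t need `FromContext` at all:

```go
package logger

import (
	"context"
	"log/slog"
)

// contextHandler wraps another slog.Handler and copies the correlation IDs
// from the context passed to log calls (slog.InfoContext(ctx, ...), etc.)
// into every record.
type contextHandler struct {
	slog.Handler
}

func (h contextHandler) Handle(ctx context.Context, r slog.Record) error {
	if traceID, ok := ctx.Value(TraceIDContextKey).(string); ok && traceID != "" {
		r.AddAttrs(slog.String("trace_id", traceID))
	}
	if transactionID, ok := ctx.Value(TransactionIDContextKey).(string); ok && transactionID != "" {
		r.AddAttrs(slog.String("transaction_id", transactionID))
	}
	return h.Handler.Handle(ctx, r)
}

// To adopt it, wrap the JSON handler in Init:
//   defaultLogger = slog.New(contextHandler{Handler: handler})
// and log with the context-aware methods, e.g. slog.InfoContext(ctx, "...").
```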
internal/middleware/logging.go:
package middleware
import (
"context"
"github.com/google/uuid"
"github.com/labstack/echo/v4"
"your-project/pkg/logger" // Import your logger package
)
const TraceIDHeader = "X-Trace-ID"
// ContextualLoggingMiddleware injects trace_id into the request context
// and ensures all subsequent logs for this request are structured and contextual.
func ContextualLoggingMiddleware() echo.MiddlewareFunc {
return func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
req := c.Request()
ctx := req.Context()
// 1. Extract or generate Trace ID
traceID := req.Header.Get(TraceIDHeader)
if traceID == "" {
traceID = uuid.New().String()
// In a real scenario, you might want to log that a trace ID was generated here.
}
c.Response().Header().Set(TraceIDHeader, traceID) // Echo it back in the response
// 2. Inject into context
ctx = context.WithValue(ctx, logger.TraceIDContextKey, traceID)
c.SetRequest(req.WithContext(ctx))
// Use the context-aware logger for the request log
reqLogger := logger.FromContext(ctx)
reqLogger.Info("Request started",
"method", req.Method,
"uri", req.RequestURI,
"remote_ip", c.RealIP(),
)
err := next(c)
// After request processing, log the final status
reqLogger.Info("Request finished",
"status", c.Response().Status,
"size", c.Response().Size,
)
return err
}
}
}
Enable it in main.go:
package main
import (
"context"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/google/uuid"
"github.com/labstack/echo/v4"
"log/slog"
"net/http"
"your-project/internal/middleware"
"your-project/pkg/logger"
)
// Mock handler demonstrating context usage
func createOrderHandler(dbClient *dynamodb.Client) echo.HandlerFunc {
return func(c echo.Context) error {
ctx := c.Request().Context()
// 1. Generate a business transaction ID
transactionID := "txn-" + uuid.New().String()
// 2. Add it to a new context for this specific transaction's scope
txnCtx := context.WithValue(ctx, logger.TransactionIDContextKey, transactionID)
// 3. Use the new context for logging
log := logger.FromContext(txnCtx)
log.Info("Starting order creation process")
// --- MOCK DYNAMODB INTERACTION ---
// In a real application, you'd use the dbClient to write to DynamoDB.
// For example:
// _, err := dbClient.PutItem(txnCtx, &dynamodb.PutItemInput{...})
// if err != nil {
// log.Error("Failed to save order state to DynamoDB", "error", err)
// return c.JSON(http.StatusInternalServerError, map[string]string{"error": "db failure"})
// }
log.Info("Order state persisted to DynamoDB", "payload", map[string]string{
"order_id": "ord-12345",
"status": "PENDING",
})
// --- END MOCK ---
log.Info("Order creation process finished successfully")
return c.JSON(http.StatusOK, map[string]string{
"status": "success",
"transaction_id": transactionID,
})
}
}
func main() {
// Initialize logger
logger.Init("order-service")
e := echo.New()
// Apply middlewares
e.Use(middleware.ContextualLoggingMiddleware())
// Setup AWS SDK config
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
if err != nil {
slog.Error("Unable to load AWS SDK config", "error", err)
return
}
dbClient := dynamodb.NewFromConfig(cfg)
// Define routes
e.POST("/orders", createOrderHandler(dbClient))
// Start server
e.Logger.Fatal(e.Start(":1323"))
}
The key here is that the middleware handles the trace_id, while the business logic handler is responsible for generating and injecting the transaction_id. The logger.FromContext function ensures that any logger instance derived from the context automatically carries these critical IDs.
3. Fluentd Configuration: Collect, Parse, and Forward
The Fluentd configuration is the artery of this pipeline. A misconfigured Fluentd node can become a performance bottleneck or a source of data loss during traffic spikes.
fluent.conf
# INPUT: Listen for logs from applications via forward protocol
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
# FILTER: Parse the JSON string our services emit into structured fields.
# slog already writes JSON to stdout; each line is assumed to arrive wrapped as
# {"log":"{...json...}"} (as produced, e.g., by Docker's fluentd logging driver),
# so we parse the "log" key.
<filter **>
  @type parser
  key_name log
reserve_data true
<parse>
@type json
</parse>
</filter>
# FILTER: Add Kubernetes metadata if running in K8s.
# This adds valuable context like pod_name, namespace, etc.
<filter **>
@type kubernetes_metadata
</filter>
# OUTPUT: Buffer logs and send to OpenSearch. This is the critical part.
<match **>
@type opensearch
host "opensearch-node1.my-domain.com"
port 9200
scheme https
ssl_verify true
# For production, use proper authentication
# user "admin"
# password "secret"
  logstash_format true
  logstash_dateformat "%Y.%m.%d"
  # Index per service and per day, e.g. logs-order-service-2023.10.27, matching
  # the "logs-*" index template below. The ${service} placeholder is resolved
  # from the "service" chunk key declared in the <buffer> section.
  logstash_prefix logs-${service}
# --- Resiliency and Performance Tuning ---
# This buffer configuration is critical for production.
<buffer tag, service>
@type file
path /var/log/fluentd/buffer/opensearch
# Total buffer size on disk. Adjust based on available disk space
# and expected downtime of OpenSearch.
total_limit_size 2g
# Break down data into chunks.
chunk_limit_size 16m
# How often to try to send data to OpenSearch.
flush_interval 5s
# Use multiple threads to send data.
flush_thread_count 4
# Retry logic for network or OpenSearch issues.
retry_type exponential_backoff
retry_wait 1s
retry_max_interval 60s
    retry_timeout 12h # After 12 hours of failed retries, hand chunks to <secondary> instead of dropping them.
  </buffer>
  # Back up chunks that exhaust their retries so they can be inspected or replayed later.
  <secondary>
    @type secondary_file
    directory /var/log/fluentd/error
    basename opensearch-error
  </secondary>
</match>
The <buffer> block is the key here. It stages logs to local files, preventing data loss if OpenSearch becomes unavailable. With an exponential_backoff retry strategy, it gracefully handles network jitter or temporary failures in the target cluster. This is the core of a production-grade configuration.
4. OpenSearch Index Template
Without a proper mapping, OpenSearch will dynamically guess field types, which often leads to problems (e.g., mapping a numeric ID as long instead of keyword). We must define an index template beforehand.
Apply this template via the OpenSearch API (e.g., PUT _index_template/logs_template):
{
"index_patterns": ["logs-*"],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"timestamp": { "type": "date" },
"level": { "type": "keyword" },
"service": { "type": "keyword" },
"trace_id": { "type": "keyword" },
"transaction_id": { "type": "keyword" },
"user_id": { "type": "keyword" },
"message": { "type": "text" },
"source_ip": { "type": "ip" },
"duration_ms": { "type": "float" },
"payload": { "type": "object", "enabled": false }
}
}
},
"priority": 500,
"composed_of": []
}
Key Decisions:
- `trace_id` and `transaction_id` are defined as `keyword`. This is crucial because we need exact matches and aggregations on them, not full-text search.
- `message` is a `text` type, enabling full-text search.
- `payload` is set to `"enabled": false`, so OpenSearch will not index any of the fields inside the `payload` object. This is a trade-off: it saves significant storage space and indexing overhead, at the cost of being unable to search directly on fields within the payload (e.g., `payload.order_id`). For our scenario, where logs are retrieved primarily by IDs and the payload is inspected afterward, this is a reasonable setting (see the example query below).
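With this mapping in place, pulling up every log line for a business transaction is a single term query against the `_search` API (the index pattern and ID are just the examples used throughout this post):

```
GET logs-order-service-*/_search
{
  "query": {
    "term": { "transaction_id": "txn-987654321" }
  },
  "sort": [
    { "timestamp": { "order": "asc" } }
  ]
}
```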
Limitations and Future Path
This architecture solves the core log correlation problem, but in a true large-scale production environment, several other aspects need consideration:
- Fluentd High Availability: A single Fluentd node is still a single point of failure. In production, you would deploy a Fluentd aggregation layer, forming a highly available forwarding cluster that receives logs from Fluentd agents on individual application nodes.
- Cost and Sampling: Ingesting and storing DEBUG-level logs from all services can be prohibitively expensive. The next step is a dynamic sampling strategy: collect INFO-level logs and above by default, but allow a special header from the API Gateway (e.g., `X-Debug-Trace: true`) to enable full logging for a specific `trace_id`. This is incredibly effective for live issue investigation (a minimal sketch follows this list).
- Integration with Distributed Tracing: A `trace_id` only correlates logs. A complete observability system also requires distributed tracing data (Traces). A future iteration would unify this `trace_id` with a standard Trace ID from a system like OpenTelemetry, seamlessly linking Logs and Traces for deeper system insight.
- Index Lifecycle Management: Log data grows rapidly. You must configure lifecycle policies (Index State Management in OpenSearch) to automatically migrate older indices from hot nodes to warm/cold nodes, and eventually delete them after a set retention period (e.g., 90 days) to control storage costs.
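The per-trace debug idea can be sketched roughly as follows. Everything here is an assumption rather than part of the current implementation: the `X-Debug-Trace` header name, the extra middleware, and the requirement to log with slog’s `*Context` methods (e.g. `InfoContext`) so the request context reaches the handler’s `Enabled` check.

```go
package middleware

import (
	"context"
	"log/slog"

	"github.com/labstack/echo/v4"
)

// debugFlagKey marks requests whose logs should bypass the default sampling level.
type debugFlagKey struct{}

// DebugTraceMiddleware flags the request context when the gateway forwards
// an X-Debug-Trace: true header (a hypothetical header name).
func DebugTraceMiddleware(next echo.HandlerFunc) echo.HandlerFunc {
	return func(c echo.Context) error {
		if c.Request().Header.Get("X-Debug-Trace") == "true" {
			ctx := context.WithValue(c.Request().Context(), debugFlagKey{}, true)
			c.SetRequest(c.Request().WithContext(ctx))
		}
		return next(c)
	}
}

// samplingHandler wraps a slog.Handler: INFO and above by default, everything
// (including DEBUG) for requests carrying the debug flag in their context.
// It only sees the request context when logging via slog's *Context methods.
type samplingHandler struct {
	slog.Handler
	defaultLevel slog.Level
}

func (h samplingHandler) Enabled(ctx context.Context, level slog.Level) bool {
	if debug, _ := ctx.Value(debugFlagKey{}).(bool); debug {
		return true
	}
	return level >= h.defaultLevel
}
```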