We were faced with a seemingly straightforward requirement: build a service to receive unstructured text documents, process them through a Natural Language Processing (NLP) model for tasks like entity recognition and sentiment analysis, and store the structured results for downstream queries. However, this came with several strict, production-grade constraints. First, the entire processing pipeline needed to be financially auditable, meaning every single process, model update, or even failed attempt had to be traceable. Second, the system had to be highly available; failures or performance bottlenecks in the NLP module could not block the document ingestion flow. Third, the architecture had to support future business expansion, such as reprocessing historical data with new NLP models or adding new data consumers.
A simple approach using a synchronous RESTful API to call a spaCy model and write the results to PostgreSQL was rejected in the first design review. This design is too tightly coupled. A synchronous call means frontend latency is dictated by the slowest link in the chain—the NLP processing. More importantly, it completely fails to meet the requirements for auditability and traceability. If data were incorrectly updated, we would permanently lose the pre-change state and the reason for the change.
A second proposal introduced an asynchronous processing model with a message queue. An API service would receive documents, place them on a queue, and a separate NLP processing service would consume them. This solved the issues of service decoupling and asynchronous processing. However, it still didn’t address the core audit problem. The records in the database would remain a snapshot of the “current state.” We would know what the data is, but not how it got that way. When the need arose to reprocess millions of historical documents with a new model, the operation would become incredibly complex and risky—essentially a massive, destructive database update.
Ultimately, we chose an architecture based on Event Sourcing (ES) and Command Query Responsibility Segregation (CQRS). This isn’t a popular silver bullet but a deliberate trade-off. Its core idea is that the single source of truth in the system is not the state itself, but an immutable log of “events” that describe state changes. This inherently satisfies the audit requirement. PostgreSQL‘s transactional capabilities and powerful JSONB type make it an ideal choice for the Event Store. NATS JetStream serves as a high-performance, reliable bus for passing events between microservices. spaCy handles the core domain logic—the NLP processing.
Architectural Decisions & Data Flow
Our system is broken down into several microservices with distinct responsibilities. They communicate via events rather than direct API calls.
graph TD
subgraph "Client"
Client[External Caller]
end
subgraph "Write Model (Commands)"
Client -- 1. HTTP POST /documents --> CommandService[Command Service (Go)]
CommandService -- 2. Validate & generate DocumentReceived event --> PG_EventStore[(PostgreSQL Event Store)]
PG_EventStore -- 3. AFTER COMMIT Trigger --> NATS[NATS JetStream]
end
subgraph "NLP Processing (Business Logic)"
NATS -- 4. Consume DocumentReceived event --> NlpProcessor[NLP Processor (Python/spaCy)]
NlpProcessor -- 5. Perform NLP analysis --> NlpProcessor
NlpProcessor -- 6. Generate NlpCompleted/NlpFailed event --> PG_EventStore
end
subgraph "Read Model (Queries)"
NATS -- 7. Consume all relevant events --> Projector[Projector Service (Go)]
Projector -- 8. Update/Insert read model --> PG_ReadModel[(PostgreSQL Read Model)]
QueryService[Query Service (Go)] -- 9. HTTP GET /documents/:id --> PG_ReadModel
Client -- 10. Query result --> QueryService
end
style PG_EventStore fill:#f9f,stroke:#333,stroke-width:2px
style PG_ReadModel fill:#ccf,stroke:#333,stroke-width:2px
- Command Service: The sole entry point for external write requests. It validates the legitimacy of a command but performs no business logic. Its only responsibility is to generate an event representing the intent (e.g.,
DocumentReceived) and atomically write it to thePostgreSQLEvent Store. - Event Store: The heart of the system. We opted to implement this with a simple table in
PostgreSQL. This table only allows appends; modifications and deletions are forbidden. Each time we write an event, we check the version number of the business entity (a document, in this case) to implement optimistic concurrency control. - Event Bus (NATS JetStream): After an event is successfully persisted to the database, we need to notify other services. Rather than having the command service write to the database and publish a message (which introduces the complexity of distributed transactions), we use the more reliable Transactional Outbox pattern. The event is written to an
outboxtable within the same transaction, and a separate process or database trigger reliably pushes new events toNATS. - NLP Processor: A background worker that subscribes to events of interest on
NATS(e.g.,DocumentReceived). Upon receiving an event, it performs the time-consumingspaCyanalysis and then wraps the result in a new event (e.g.,NlpCompletedorNlpFailed), which is also written back to the Event Store. - Projector Service: This service subscribes to all relevant event streams and “projects” them into a “read model” optimized for queries. This read model can be one or more denormalized
PostgreSQLtables. Its purpose is to decouple the write model (the event stream) from the read model (the state view). - Query Service: A very lightweight service that reads data directly from the read model and returns it to the client. It contains no business logic.
Core Implementation: PostgreSQL as an Event Store
The table structure for the event store is critical. It must be efficient, scalable, and easy to query.
-- events.sql: Core table structure for the Event Store
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- A single table to store all events
CREATE TABLE events (
-- Globally unique event ID
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
-- Aggregate root ID, which is the document's ID
stream_id UUID NOT NULL,
-- The version of the event within the stream, for optimistic locking
version BIGINT NOT NULL,
-- The type of event, e.g., 'DocumentReceived', 'NlpCompleted'
event_type VARCHAR(255) NOT NULL,
-- The event payload, storing specific data
payload JSONB NOT NULL,
-- Event metadata, e.g., correlation_id, causation_id
metadata JSONB DEFAULT '{}'::jsonb,
-- The timestamp when the event occurred
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Ensures that the version number for each aggregate root is unique
CONSTRAINT events_stream_id_version_unique UNIQUE (stream_id, version)
);
-- Indexes for common queries
CREATE INDEX idx_events_stream_id ON events (stream_id);
CREATE INDEX idx_events_event_type ON events (event_type);
CREATE INDEX idx_events_created_at ON events (created_at);
COMMENT ON COLUMN events.stream_id IS 'The ID of the associated business entity, i.e., the aggregate root ID.';
COMMENT ON COLUMN events.version IS 'The event version number, used for optimistic concurrency control.';
COMMENT ON COLUMN events.payload IS 'The specific data for the event, stored in JSON format.';
COMMENT ON COLUMN events.metadata IS 'Metadata for tracing and debugging purposes.';
Here, stream_id represents a business entity (like a document), and the version field is key to implementing optimistic locking. When a service wants to append a new event for a document, it must first read the document’s current version number and then increment it when writing the new event. If another process has already written a new event in the meantime, the database’s UNIQUE constraint will trigger, the operation will fail, and the application layer will need to retry.
Command Service: Appending Events Atomically
The implementation of the command service (we used Go) must guarantee the atomicity of event writes.
// command-service/main.go
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/google/uuid"
_ "github.com/lib/pq"
"github.com/nats-io/nats.go"
)
// Global variables, should be managed via configuration in production
var db *sql.DB
var nc *nats.Conn
var js nats.JetStreamContext
const eventStoreTableName = "events"
// Event defines the basic structure of an event
type Event struct {
StreamID uuid.UUID `json:"stream_id"`
Version int64 `json:"version"`
EventType string `json:"event_type"`
Payload json.RawMessage `json:"payload"`
}
// DocumentReceivedPayload is the specific content for a DocumentReceived event
type DocumentReceivedPayload struct {
Content string `json:"content"`
Source string `json:"source"`
ReceivedAt time.Time `json:"received_at"`
}
func main() {
// ... Initialize DB and NATS connections ...
var err error
db, err = sql.Open("postgres", "user=... password=... dbname=... sslmode=disable")
if err != nil {
log.Fatalf("Failed to connect to PostgreSQL: %v", err)
}
// ... NATS JetStream setup ...
nc, err = nats.Connect("nats://localhost:4222")
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
js, err = nc.JetStream()
if err != nil {
log.Fatalf("Failed to get JetStream context: %v", err)
}
// Create a stream named 'EVENTS'
js.AddStream(&nats.StreamConfig{
Name: "EVENTS",
Subjects: []string{"events.>"},
})
http.HandleFunc("/documents", handleReceiveDocument)
log.Println("Command service listening on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
func handleReceiveDocument(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var reqBody struct {
Content string `json:"content"`
Source string `json:"source"`
}
if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
// This is a new document, so we generate a new UUID
docID := uuid.New()
payload, _ := json.Marshal(DocumentReceivedPayload{
Content: reqBody.Content,
Source: reqBody.Source,
ReceivedAt: time.Now().UTC(),
})
event := Event{
StreamID: docID,
Version: 1, // The first event for a new document, so version is 1
EventType: "DocumentReceived",
Payload: payload,
}
if err := appendToStream(context.Background(), event); err != nil {
// Error handling here is crucial
// Could be a version conflict or a DB connection issue
log.Printf("Error appending event: %v", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(map[string]string{"document_id": docID.String()})
}
// appendToStream atomically writes an event to the DB and publishes to NATS
func appendToStream(ctx context.Context, event Event) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("could not begin transaction: %w", err)
}
defer tx.Rollback() // Rollback is a no-op if Commit succeeds
query := fmt.Sprintf(
`INSERT INTO %s (stream_id, version, event_type, payload) VALUES ($1, $2, $3, $4)`,
eventStoreTableName,
)
_, err = tx.ExecContext(ctx, query, event.StreamID, event.Version, event.EventType, event.Payload)
if err != nil {
// Error could be caused by the UNIQUE constraint (version conflict)
return fmt.Errorf("could not insert event: %w", err)
}
// Publish the event to NATS
eventData, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("could not marshal event for NATS: %w", err)
}
// Subject structure: events.<event_type>.<stream_id>
subject := fmt.Sprintf("events.%s.%s", event.EventType, event.StreamID.String())
_, err = js.Publish(subject, eventData)
if err != nil {
// If NATS publish fails, the transaction is rolled back, ensuring data consistency
return fmt.Errorf("could not publish event to NATS: %w", err)
}
// Only commit the transaction if all operations succeed
return tx.Commit()
}
Note the appendToStream function. It executes the INSERT and js.Publish within a single database transaction. This is a simplified implementation. In a mission-critical project, for maximum reliability, we wouldn’t publish directly to NATS inside the transaction. We would write to an outbox table and have another process poll that table to publish the events (the Transactional Outbox pattern). This guarantees that even if the NATS service is temporarily unavailable, the event will eventually be published. However, for many scenarios, placing the publish operation at the end of the transaction and rolling back on failure is sufficiently robust.
NLP Processor: An Idempotent Event Consumer
The NLP processor carries the core business logic. It must be designed to be idempotent, as messaging systems like NATS typically provide “at-least-once” delivery guarantees, meaning the same event could be consumed multiple times.
# nlp-processor/main.py
import asyncio
import json
import logging
import os
import spacy
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from uuid import UUID
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Load spaCy model. This is a time-consuming operation and should be done at startup.
try:
nlp = spacy.load("en_core_web_sm")
logging.info("spaCy model 'en_core_web_sm' loaded successfully.")
except OSError:
logging.error("Model 'en_core_web_sm' not found. Please run 'python -m spacy download en_core_web_sm'")
exit(1)
# Database connection
DB_URL = os.getenv("DATABASE_URL", "postgresql://user:password@localhost/db_name")
engine = create_engine(DB_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
class EventStore:
"""A simple class for interacting with the event store."""
def get_current_version(self, session, stream_id: UUID) -> int:
query = text("SELECT COALESCE(MAX(version), 0) FROM events WHERE stream_id = :stream_id")
result = session.execute(query, {"stream_id": stream_id}).scalar_one()
return int(result)
def append_event(self, session, stream_id: UUID, event_type: str, payload: dict):
current_version = self.get_current_version(session, stream_id)
next_version = current_version + 1
# This is a key step for ensuring idempotency.
# If an event of the same type already exists after the current version,
# we assume it has already been processed.
check_query = text("""
SELECT 1 FROM events
WHERE stream_id = :stream_id AND version > :current_version AND event_type = :event_type
""")
exists = session.execute(check_query, {
"stream_id": stream_id,
"current_version": current_version,
"event_type": event_type
}).first()
if exists:
logging.warning(f"Event '{event_type}' for stream {stream_id} seems to be already processed. Skipping.")
return
query = text("""
INSERT INTO events (stream_id, version, event_type, payload)
VALUES (:stream_id, :version, :event_type, :payload)
""")
session.execute(query, {
"stream_id": stream_id,
"version": next_version,
"event_type": event_type,
"payload": json.dumps(payload)
})
logging.info(f"Appended event '{event_type}' v{next_version} for stream {stream_id}")
event_store = EventStore()
async def message_handler(msg):
subject = msg.subject
data = json.loads(msg.data.decode())
logging.info(f"Received a message on '{subject}'")
stream_id = UUID(data['StreamID'])
content = data['Payload']['content']
db_session = SessionLocal()
try:
# Perform NLP processing
doc = nlp(content)
entities = [{"text": ent.text, "label": ent.label_} for ent in doc.ents]
result_payload = {
"entities": entities,
"model_version": nlp.meta['version']
}
# Write the processing result as a new event
event_store.append_event(db_session, stream_id, "NlpCompleted", result_payload)
db_session.commit()
except Exception as e:
logging.error(f"Failed to process message for stream {stream_id}: {e}", exc_info=True)
db_session.rollback()
# Generate a failure event
failure_payload = {"error": str(e), "original_event": data}
try:
event_store.append_event(db_session, stream_id, "NlpFailed", failure_payload)
db_session.commit()
except Exception as commit_e:
logging.error(f"Failed to even commit failure event for stream {stream_id}: {commit_e}")
db_session.rollback()
finally:
db_session.close()
async def main():
nc = NATS()
try:
await nc.connect(servers=["nats://localhost:4222"])
logging.info("Connected to NATS.")
except ErrNoServers as e:
logging.error(f"Could not connect to NATS: {e}")
return
# Subscribe to 'DocumentReceived' events
# Use a durable queue subscription to resume from where it left off after a restart
await nc.jetstream().subscribe("events.DocumentReceived.>", durable="nlp-processor", cb=message_handler)
logging.info("Subscribed to 'events.DocumentReceived.>'")
# Keep running
try:
await asyncio.Future()
except asyncio.CancelledError:
await nc.close()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
The idempotency check in EventStore.append_event is a straightforward implementation. In more complex scenarios, you might maintain a set of processed message IDs in a separate table and check against it before processing. Error handling is also critical: when NLP processing fails, we don’t just drop the message; we generate an NlpFailed event. This ensures that the system’s state changes remain traceable, allowing us to analyze or retry these failed documents later.
Projector Service: Building a Query-Friendly Read Model
The job of the projector is to “flatten” the event stream. It consumes events and updates one or more denormalized tables designed specifically for fast queries.
-- read_models.sql: Read model table structure
CREATE TABLE documents_summary (
id UUID PRIMARY KEY,
source VARCHAR(255),
content TEXT,
status VARCHAR(50) NOT NULL DEFAULT 'PENDING',
entities JSONB,
model_version VARCHAR(50),
last_processed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX idx_documents_summary_status ON documents_summary(status);
This documents_summary table is our read model. It contains all the information we want to query quickly. The projector’s Go implementation is as follows:
// projector-service/main.go
package main
import (
"context"
"database/sql"
"encoding/json"
"log"
"time"
"github.com/google/uuid"
_ "github.com/lib/pq"
"github.com/nats-io/nats.go"
)
// ... DB and NATS connection setup ...
type Event struct {
StreamID uuid.UUID `json:"StreamID"`
EventType string `json:"EventType"`
Payload json.RawMessage `json:"Payload"`
}
func main() {
// ... Initialize DB and NATS ...
js.Subscribe("events.>", func(msg *nats.Msg) {
var event Event
if err := json.Unmarshal(msg.Data, &event); err != nil {
log.Printf("Error unmarshalling event: %v", err)
msg.Nak() // Tell NATS the message failed processing; retry later
return
}
ctx := context.Background()
if err := handleEvent(ctx, event); err != nil {
log.Printf("Error handling event %s for stream %s: %v", event.EventType, event.StreamID, err)
msg.Nak()
return
}
msg.Ack() // Acknowledge successful processing
}, nats.Durable("document-projector"), nats.ManualAck())
log.Println("Projector service running...")
select {} // Block forever
}
func handleEvent(ctx context.Context, event Event) error {
switch event.EventType {
case "DocumentReceived":
return handleDocumentReceived(ctx, event)
case "NlpCompleted":
return handleNlpCompleted(ctx, event)
case "NlpFailed":
return handleNlpFailed(ctx, event)
default:
log.Printf("Ignoring unknown event type: %s", event.EventType)
return nil
}
}
func handleDocumentReceived(ctx context.Context, event Event) error {
var payload struct {
Content string `json:"content"`
Source string `json:"source"`
ReceivedAt time.Time `json:"received_at"`
}
if err := json.Unmarshal(event.Payload, &payload); err != nil {
return err
}
query := `
INSERT INTO documents_summary (id, source, content, status, created_at, updated_at)
VALUES ($1, $2, $3, 'PROCESSING', $4, $4)
ON CONFLICT (id) DO NOTHING;`
_, err := db.ExecContext(ctx, query, event.StreamID, payload.Source, payload.Content, payload.ReceivedAt)
return err
}
func handleNlpCompleted(ctx context.tocontext, event Event) error {
var payload struct {
Entities json.RawMessage `json:"entities"`
ModelVersion string `json:"model_version"`
}
if err := json.Unmarshal(event.Payload, &payload); err != nil {
return err
}
query := `
UPDATE documents_summary
SET status = 'COMPLETED', entities = $2, model_version = $3, last_processed_at = NOW(), updated_at = NOW()
WHERE id = $1;`
_, err := db.ExecContext(ctx, query, event.StreamID, payload.Entities, payload.ModelVersion)
return err
}
// ... handleNlpFailed is similar ...
The projector’s logic is very direct: listen for all events and execute the corresponding INSERT or UPDATE based on the event type. Using ON CONFLICT (id) DO NOTHING is a simple and effective way to handle duplicate events, guaranteeing the idempotency of the projection operation.
Architectural Boundaries and Future Outlook
While this event-sourcing architecture solves the core problems of auditability, decoupling, and extensibility, it also introduces new complexities. The first is eventual consistency. The read model will always lag slightly behind the write model. A client that queries immediately after a write may not see the latest state, which can be unacceptable for certain business scenarios.
Second, event schema management is a long-term challenge. As the business evolves, the structure of events will inevitably change. Handling old event versions (event upcasting) and ensuring compatibility between old and new code requires a comprehensive versioning and migration strategy.
Finally, for business entities with very long lifecycles, their event streams can become enormous. Rebuilding state by reading all events from the beginning can become a performance bottleneck. The common solution is to introduce “snapshots”—periodically saving the current state of an entity. To rebuild, you only need to apply events that occurred after the most recent snapshot.
Despite these challenges, this architecture gives us tremendous flexibility. When we need to reprocess all historical data with a more advanced spaCy model, we don’t need to perform a risky database migration. We can simply launch a new projector service that reads all DocumentReceived events from the events table from the beginning, republishes them to a new NATS subject, and has them consumed by an NLP processor configured with the new model. The old read model can remain online, untouched, until the new one is fully built. We can then seamlessly cut over traffic to the new model, all with zero impact on the live query service. This is the true power of an event-sourced architecture.