Building a Real-time Feature Store with Automated Quality Monitoring using GitOps and Pulsar


The initial motivation for building a machine learning feature store is to solve the classic problem of inconsistency between feature calculations for model training and online inference. However, a feature store that only solves for “consistency” is far from sufficient in a production environment; it can quickly devolve into an unmaintainable, low-quality data swamp. The real challenge lies in constructing a system that is trustworthy, observable, and highly automated. When business requirements demand feature latency within seconds, coupled with strict governance over feature iteration and quality, traditional batch processing architectures fall short.

The core technical challenge we faced was: how to design a system that not only provides low-latency, real-time features but also seamlessly integrates feature definition, iteration, quality monitoring, and end-to-end validation into the team’s existing development workflow.

Option A: Traditional Batch ETL Architecture

Before discussing the final solution, it’s worth examining the common initial approach in the industry: a feature engineering pipeline based on batch processing.

Its typical architecture usually consists of:

  1. Data Source: Business data resides in a data lake (e.g., Parquet files on S3) or a data warehouse.
  2. Compute Engine: A scheduled Spark or Flink batch job runs daily or hourly, reading raw data from the source and executing complex feature computation logic.
  3. Storage: The computed features are written to a key-value store (like Redis or DynamoDB) for fast online lookups and potentially to the data lake for offline training.
  4. Governance & Monitoring: Often missing or manual. Feature definitions are scattered across various Spark job codebases. Quality checks rely on data engineers writing ad-hoc SQL queries or reactively investigating after a model’s performance degrades.

Pros:

  • Mature Ecosystem: The Spark ecosystem is powerful and naturally suited for processing massive datasets and complex batch transformations.
  • Simple Implementation: For teams already equipped with a data lake and a batch processing platform, the initial setup cost for such a pipeline is relatively low.

Cons:

  • High Latency: Feature update frequency is limited by the batch job’s schedule, failing to meet the needs of scenarios requiring real-time data, such as real-time fraud detection or recommendations.
  • Lack of Governance: Feature definitions are tightly coupled with the code, with no central registry. When multiple teams need to reuse or modify features, conflicts and chaos easily arise. In this scenario, Code Review is limited to verifying the correctness of the computation logic, rather than ensuring feature clarity and consistency at an architectural level.
  • Reactive Quality Monitoring: Data quality issues are typically discovered only after downstream model applications fail, making troubleshooting extremely costly. It lacks the capability for real-time alerting when problems occur.
  • Difficult to Test: The entire pipeline is asynchronous and fragmented, making automated end-to-end testing incredibly difficult and often reliant on manual verification.

In our context, sub-second latency and strict feature governance were non-negotiable requirements, so Option A was immediately ruled out.

Option B: An Event-Driven, Real-time, and Observable Feature Architecture

To overcome the shortcomings of the batch architecture, we designed a solution centered around an event stream, deeply integrated with GitOps, automated monitoring, and end-to-end testing.

Its core components and workflow are as follows:

graph TD
    subgraph Git Repository [GitOps: Feature Definition as Code]
        A[feature.yaml] -- PR & Code Review --> B(CI/CD Pipeline)
    end

    subgraph Real-time Ingestion & Processing
        C[Upstream Services] -- Raw Events --> D[Apache Pulsar: Raw Topics]
        D -- "Schema-driven Feature Engineering" --> E[Feature Generation Service]
        E -- "Typed Feature Events" --> F[Apache Pulsar: Feature Topics]
    end

    subgraph Online & Monitoring Consumers
        F -- "Shared Subscription" --> G[Feature Materializer Service]
        G -- "Write" --> H[Online Store: Redis]
        F -- "Shared Subscription" --> I[Data Quality Monitor]
        I -- "Calculate Stats & Generate Plots" --> J[Seaborn Engine]
        J -- "Save Report Artifacts" --> K[Monitoring Dashboard / S3]
    end

    subgraph Serving & Testing
        L[ML Model Service] -- "Read Features" --> H
        M[Cypress E2E Test] -- "1. Trigger Event" --> C
        M -- "2. Validate UI/API" --> N[Feature Store UI]
        N -- "Reads Data From" --> H & K
    end

    B -- "Deploy/Configure Consumers" --> G & I
    B -- "Register Pulsar Schema" --> F

Here’s the rationale behind our architectural decisions:

  1. Apache Pulsar as the Messaging Core: We chose Pulsar over Kafka. The key differentiators were Pulsar’s architecture that separates compute from storage, its built-in Schema Registry, and tiered storage. This provides immense flexibility: hot data is stored in BookKeeper for real-time consumption, while cold data can be seamlessly offloaded to object storage like S3. This naturally unifies online and offline storage, simplifying the process of generating datasets for model training.
  2. Feature Governance via GitOps and Code Review: All feature definitions (name, type, description, owner, etc.) are stored as YAML files in a Git repository. Any new feature or modification must go through a Pull Request and a rigorous Code Review. This is not just a code review; it’s a confirmation of business semantics, data contracts, and ownership. The CI pipeline automatically validates the YAML format and registers the feature metadata in the system.
  3. Automated Quality Monitoring Driven by Seaborn: We introduced a separate monitoring service that, like the feature materialization service, consumes the feature event stream from Pulsar. It performs in-memory micro-batch aggregation, using Pandas to compute statistical summaries (e.g., null rate, cardinality, quantiles) and Seaborn to generate distribution plots (histograms and kernel density estimates). These plots and stats are persisted, enabling real-time monitoring of feature distribution drift. Seaborn is thus transformed from a data analyst’s tool into a component of an automated production system.
  4. End-to-End Closed-Loop Validation with Cypress: To ensure the reliability of the entire system, we use Cypress for end-to-end testing. Test cases don’t just check an API or UI; they simulate a complete business flow: triggering an upstream event via a script -> waiting for the event to flow through Pulsar and the processing services -> verifying that the feature correctly appears in Redis -> and finally, checking if the corresponding quality report has been generated on the monitoring UI. This type of test covers the entire data pipeline and dramatically increases our confidence in system changes.

Although this solution has a higher initial complexity, it elevates feature lifecycle management, quality assurance, and system reliability to a whole new level.

Core Implementation Overview

Let’s dive into the code implementation of a few key components.

1. GitOps-driven Feature Definition

Everything starts with a Git repository dedicated to managing feature definitions.

Directory Structure:

feature_repository/
├── features/
│   ├── user_profile/
│   │   ├── user_daily_spend.yaml
│   │   └── user_login_frequency.yaml
│   └── item_profile/
│       └── item_popularity.yaml
└── ci/
    ├── validate_schema.py
    └── register_feature.py

An example feature definition file user_daily_spend.yaml:

# features/user_profile/user_daily_spend.yaml
feature_name: user_daily_spend_7d
version: 1
owner: risk-team@example.com
description: "User's average daily spend over the last 7 days"
value_type: FLOAT
entity_id: user_id
topic_name: "persistent://public/default/feature-user_profile"
tags:
  - risk_control
  - user_behavior

validation_rules:
  - rule: not_null
  - rule: range
    min: 0.0
    max: 100000.0

# Baseline profile for data quality monitoring and drift detection
baseline_profile:
  mean: 150.75
  std_dev: 45.5
  p95: 350.0

When an engineer submits a PR to add or modify this file, the Code Review process begins. Reviewers assess not just the code but also the soundness of the business logic: Is the feature definition clear? Is the owner correct? Are the validation rules reasonable?

The CI pipeline (e.g., GitHub Actions) executes the ci/validate_schema.py script to ensure the YAML file conforms to a predefined schema, then calls ci/register_feature.py to register the metadata with the Pulsar Schema Registry or a separate metadata service.
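
For illustration, here is a minimal sketch of what ci/validate_schema.py could look like. The required-field list and the use of PyYAML plus jsonschema are assumptions for this example, not the actual CI implementation.

# ci/validate_schema.py  (illustrative sketch, not the actual CI script)
# Assumes PyYAML and jsonschema are available in the CI environment.
import sys
import glob
import yaml
from jsonschema import validate, ValidationError

# Hypothetical contract that every feature definition must satisfy
FEATURE_SCHEMA = {
    "type": "object",
    "required": ["feature_name", "version", "owner", "description",
                 "value_type", "entity_id", "topic_name"],
    "properties": {
        "feature_name": {"type": "string", "pattern": "^[a-z][a-z0-9_]+$"},
        "version": {"type": "integer", "minimum": 1},
        "value_type": {"enum": ["FLOAT", "INT", "STRING", "BOOL"]},
        "validation_rules": {"type": "array"},
    },
}

def main() -> int:
    errors = 0
    for path in glob.glob("features/**/*.yaml", recursive=True):
        with open(path) as f:
            definition = yaml.safe_load(f)
        try:
            validate(instance=definition, schema=FEATURE_SCHEMA)
            print(f"OK   {path}")
        except ValidationError as e:
            print(f"FAIL {path}: {e.message}")
            errors += 1
    return 1 if errors else 0

if __name__ == "__main__":
    sys.exit(main())

A failed validation makes the CI job (and therefore the PR) fail, so malformed feature definitions never reach the registry.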

2. Pulsar Real-time Data Pipeline

Feature Generation Service (Producer):
This service consumes raw upstream events (like order events) and produces feature events.

# feature_generation_service.py
import pulsar
import json
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Pulsar client configuration
PULSAR_SERVICE_URL = 'pulsar://localhost:6650'
RAW_TOPIC = 'persistent://public/default/raw-order-events'
FEATURE_TOPIC = 'persistent://public/default/feature-user_profile'
SUBSCRIPTION_NAME = 'feature-generator-sub'

class UserProfileSchema(pulsar.schema.Record):
    entity_id = pulsar.schema.String()
    feature_name = pulsar.schema.String()
    feature_value = pulsar.schema.Float()
    event_timestamp = pulsar.schema.Long()

def main():
    client = None
    producer = None
    consumer = None
    try:
        client = pulsar.Client(PULSAR_SERVICE_URL)
        
        # Create a typed producer to ensure data conforms to the schema
        producer = client.create_producer(
            topic=FEATURE_TOPIC,
            schema=pulsar.schema.JsonSchema(UserProfileSchema)
        )

        # Raw upstream events are plain JSON, so we consume raw bytes and parse them manually.
        # (pulsar.schema.JsonSchema requires a Record class, so it cannot wrap a plain dict.)
        consumer = client.subscribe(
            topic=RAW_TOPIC,
            subscription_name=SUBSCRIPTION_NAME
        )

        logging.info("Service started, listening for raw events...")

        while True:
            msg = consumer.receive()
            try:
                raw_event = json.loads(msg.data())
                logging.info(f"Received raw event: {raw_event}")

                # This is a simplified business logic
                # In a real project, this might involve complex stateful computation or windowed aggregation
                if raw_event.get('event_type') == 'payment_success':
                    feature_record = UserProfileSchema(
                        entity_id=str(raw_event['user_id']),
                        feature_name='user_last_payment_amount', # Example feature
                        feature_value=float(raw_event['amount']),
                        event_timestamp=int(raw_event['timestamp'])
                    )
                    
                    producer.send(
                        feature_record,
                        partition_key=feature_record.entity_id # Ensure features for the same user go to the same partition
                    )
                    logging.info(f"Generated and sent feature event: {feature_record}")

                consumer.acknowledge(msg)
            except Exception as e:
                logging.error(f"Failed to process message: {e}", exc_info=True)
                # Failed to process message, will negatively acknowledge for retry
                consumer.negative_acknowledge(msg) 

    except KeyboardInterrupt:
        logging.info("Shutting down service...")
    except Exception as e:
        logging.error(f"A critical error occurred: {e}", exc_info=True)
    finally:
        if producer:
            producer.close()
        if consumer:
            consumer.close()
        if client:
            client.close()
        logging.info("Service has been shut down.")

if __name__ == '__main__':
    main()

The key here is using Pulsar’s typed Schema, which enforces the structure of the data the producer sends and serves as the first line of defense in data governance. Additionally, using a partition_key ensures that feature messages for the same entity (e.g., a user) land on the same partition and remain ordered there, which facilitates stateful computations downstream.
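
For completeness, here is a hedged sketch of the Feature Materializer Service from the architecture diagram (component G). The Redis hash layout (features:{entity_id}) and the Key_Shared subscription are illustrative choices for this sketch, not details of the production service.

# feature_materializer_service.py  (illustrative sketch)
import json
import logging

import pulsar
import redis  # assumes the redis-py client and a local Redis instance

PULSAR_SERVICE_URL = 'pulsar://localhost:6650'
FEATURE_TOPIC = 'persistent://public/default/feature-user_profile'
SUBSCRIPTION_NAME = 'feature-materializer-sub'

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def main():
    client = pulsar.Client(PULSAR_SERVICE_URL)
    # Key_Shared keeps events with the same partition key (entity_id) ordered per consumer,
    # while still letting the materializer scale horizontally.
    consumer = client.subscribe(
        topic=FEATURE_TOPIC,
        subscription_name=SUBSCRIPTION_NAME,
        consumer_type=pulsar.ConsumerType.KeyShared
    )
    store = redis.Redis(host='localhost', port=6379, decode_responses=True)

    try:
        while True:
            msg = consumer.receive()
            try:
                event = json.loads(msg.data())
                # One Redis hash per entity; each field is a feature name.
                store.hset(f"features:{event['entity_id']}",
                           event['feature_name'], event['feature_value'])
                consumer.acknowledge(msg)
            except Exception as e:
                logging.error(f"Failed to materialize feature event: {e}", exc_info=True)
                consumer.negative_acknowledge(msg)
    except KeyboardInterrupt:
        logging.info("Shutting down materializer...")
    finally:
        consumer.close()
        client.close()

if __name__ == '__main__':
    main()

With a hash per entity, the ML model service can fetch all of a user’s online features in a single HGETALL features:{user_id} call.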

3. Seaborn-driven Automated Quality Monitoring

This is one of the most innovative aspects of the architecture. A separate consumer service is dedicated to being the “measurement standard” for data quality.

# quality_monitor_service.py
import pulsar
import json
import pandas as pd
import matplotlib
matplotlib.use('Agg')  # Non-interactive backend: plots are rendered from a background timer thread
import matplotlib.pyplot as plt
import seaborn as sns
import logging
import os
from collections import defaultdict
from threading import Timer, Lock

# Configuration
PULSAR_SERVICE_URL = 'pulsar://localhost:6650'
FEATURE_TOPIC = 'persistent://public/default/feature-user_profile'
SUBSCRIPTION_NAME = 'quality-monitor-sub'  # A separate subscription name from the materializer, so this service receives its own full copy of the feature stream
REPORT_DIR = "/var/reports/feature_store"
AGGREGATION_WINDOW_SECONDS = 300 # Aggregate every 5 minutes

# A thread-safe data buffer
class FeatureBuffer:
    def __init__(self):
        self.buffer = defaultdict(list)
        self.lock = Lock()

    def add(self, feature_name, value):
        with self.lock:
            self.buffer[feature_name].append(value)

    def drain(self):
        with self.lock:
            data = self.buffer
            self.buffer = defaultdict(list)
            return data

feature_buffer = FeatureBuffer()

# Ensure the report directory exists
os.makedirs(REPORT_DIR, exist_ok=True)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def generate_report():
    """
    Periodically triggered to drain the buffer and generate statistical reports and visualizations.
    """
    try:
        data_batch = feature_buffer.drain()
        if not data_batch:
            logging.info("No data in buffer, skipping report generation.")
            return

        for feature_name, values in data_batch.items():
            if not values:
                continue

            df = pd.DataFrame(values, columns=['value'])
            
            # 1. Calculate statistical summary
            stats = {
                'count': len(df),
                'null_count': int(df['value'].isnull().sum()),
                'mean': float(df['value'].mean()),
                'std': float(df['value'].std()),
                'min': float(df['value'].min()),
                'max': float(df['value'].max()),
                'p50': float(df['value'].quantile(0.5)),
                'p95': float(df['value'].quantile(0.95)),
                'p99': float(df['value'].quantile(0.99)),
            }
            logging.info(f"Statistical report for feature '{feature_name}': {stats}")
            # In a real project, these stats would be written to a time-series database (e.g., Prometheus) for alerting

            # 2. Generate distribution plot using Seaborn
            plt.figure(figsize=(10, 6))
            sns.histplot(df['value'], kde=True, bins=30)
            plt.title(f'Feature Distribution: {feature_name}')
            plt.xlabel('Value')
            plt.ylabel('Frequency')
            
            report_path = os.path.join(REPORT_DIR, f"{feature_name}_dist_{pd.Timestamp.now().strftime('%Y%m%d%H%M%S')}.png")
            plt.savefig(report_path)
            plt.close() # Release memory
            logging.info(f"Generated feature distribution plot: {report_path}")

    except Exception as e:
        logging.error(f"Report generation failed: {e}", exc_info=True)
    finally:
        # Re-arm the timer to create a periodic loop; daemon=True so it never blocks process shutdown
        report_timer = Timer(AGGREGATION_WINDOW_SECONDS, generate_report)
        report_timer.daemon = True
        report_timer.start()


def main():
    # Start the periodic report generator as a daemon timer so Ctrl+C can still exit the process
    report_timer = Timer(AGGREGATION_WINDOW_SECONDS, generate_report)
    report_timer.daemon = True
    report_timer.start()
    logging.info(f"Quality monitoring report generator started, runs every {AGGREGATION_WINDOW_SECONDS} seconds.")

    client = None
    consumer = None
    try:
        client = pulsar.Client(PULSAR_SERVICE_URL)
        # Feature events are JSON-encoded by the producer, so consume raw bytes and parse them manually
        consumer = client.subscribe(
            topic=FEATURE_TOPIC,
            subscription_name=SUBSCRIPTION_NAME,
            consumer_type=pulsar.ConsumerType.Shared  # Shared mode allows multiple consumers on the same subscription
        )
        logging.info("Monitoring service started, consuming feature events...")
        
        while True:
            msg = consumer.receive()
            try:
                feature_event = json.loads(msg.data())
                feature_buffer.add(feature_event['feature_name'], feature_event['feature_value'])
                consumer.acknowledge(msg)
            except Exception as e:
                logging.error(f"Failed to process feature event: {e}", exc_info=True)
                consumer.negative_acknowledge(msg)

    except KeyboardInterrupt:
        logging.info("Shutting down service...")
    except Exception as e:
        logging.error(f"A critical error occurred: {e}", exc_info=True)
    finally:
        if consumer:
            consumer.close()
        if client:
            client.close()
        logging.info("Service has been shut down.")

if __name__ == '__main__':
    main()

The core of this code is the generate_report function. It periodically converts collected feature values into a Pandas DataFrame and then effortlessly plots a histogram with a kernel density estimate using Seaborn’s histplot function. This is a classic example of engineering a data science tool for production use. In a production system, you could also compare the statistics of the current window against the baseline_profile defined in feature.yaml to implement automated drift detection and alerting.
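
To make that last point concrete, here is a rough sketch of such a comparison. The thresholds, the baseline_profiles lookup, and the alert() hook are hypothetical and would need tuning per feature.

# drift_check.py  (illustrative sketch of comparing window stats to the baseline_profile)
def check_drift(feature_name: str, window_stats: dict, baseline_profile: dict,
                mean_z_threshold: float = 3.0, p95_rel_threshold: float = 0.5) -> list:
    """Return a list of human-readable drift findings for one aggregation window."""
    findings = []

    # 1. Mean shift, expressed in units of the baseline standard deviation
    std = baseline_profile.get('std_dev') or 1e-9
    mean_z = abs(window_stats['mean'] - baseline_profile['mean']) / std
    if mean_z > mean_z_threshold:
        findings.append(f"{feature_name}: mean shifted by {mean_z:.1f} baseline std devs")

    # 2. Relative change of the 95th percentile
    baseline_p95 = baseline_profile.get('p95') or 1e-9
    p95_rel = abs(window_stats['p95'] - baseline_p95) / baseline_p95
    if p95_rel > p95_rel_threshold:
        findings.append(f"{feature_name}: p95 changed by {p95_rel:.0%} vs baseline")

    # 3. Hard validation rules from the YAML (e.g., not_null) can also be checked here
    if window_stats.get('null_count', 0) > 0:
        findings.append(f"{feature_name}: {window_stats['null_count']} null values in window")

    return findings

# Example usage inside generate_report(), after `stats` has been computed:
#   findings = check_drift(feature_name, stats, baseline_profiles[feature_name])
#   for finding in findings:
#       alert(finding)   # hypothetical hook into Slack / PagerDuty / Prometheus Alertmanager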

4. Cypress End-to-End Validation

Finally, we need a powerful mechanism to guarantee the correctness of the entire data flow. Cypress plays the role of the “final inspector” here.

Assume we have a simple front-end page that displays the latest feature values for a given user_id.

Cypress test file (feature_e2e.cy.js):

// cypress/e2e/feature_e2e.cy.js

describe('Real-time Feature Store E2E Test', () => {
  it('should correctly process a new event and update the feature value in the UI', () => {
    // Step 0: Define test data
    const testUserId = `e2e-user-${Date.now()}`;
    const testAmount = Math.round(Math.random() * 1000 * 100) / 100; // Random amount
    const featureName = 'user_last_payment_amount';

    // Step 1: Invoke a Cypress task to send a raw event to Pulsar.
    // This is cleaner than interacting with Pulsar directly in the test code; 
    // Cypress tasks run in the Node.js backend.
    cy.task('sendRawPulsarEvent', {
      topic: 'persistent://public/default/raw-order-events',
      payload: {
        event_type: 'payment_success',
        user_id: testUserId,
        amount: testAmount,
        timestamp: Date.now(),
      },
    }).then((result) => {
      expect(result.success).to.be.true;
      cy.log(`Pulsar event sent for user ${testUserId}`);
    });

    // Step 2: Wait for a reasonable processing time.
    // In a real-world scenario, you might poll an API until the value is updated,
    // but we'll use a fixed wait for simplicity.
    cy.wait(5000); // Wait 5 seconds for the event to flow through the system

    // Step 3: Visit the feature store's front-end UI page
    cy.visit(`/features/user/${testUserId}`);

    // Step 4: Assert that the feature value displayed on the page is correct
    cy.get(`[data-testid="feature-${featureName}"] .feature-value`)
      .should('contain.text', testAmount.toString());

    // Step 5: (Optional but highly recommended) Validate that the monitoring report has been updated.
    // This requires a place in the UI to display the latest monitoring report.
    cy.get(`[data-testid="feature-${featureName}"] .quality-report-link`).click();
    cy.get('.report-image').should('be.visible'); // Verify the report image has loaded
  });
});

// Task configuration is needed in cypress.config.js
// const { defineConfig } = require('cypress')
// const Pulsar = require('pulsar-client'); // Assuming the official Node.js Pulsar client is installed
//
// module.exports = defineConfig({
//   e2e: {
//     setupNodeEvents(on, config) {
//       on('task', {
//         async sendRawPulsarEvent({ topic, payload }) {
//           // This code executes in the Node environment
//           const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650' });
//           const producer = await client.createProducer({ topic });
//           await producer.send({ data: Buffer.from(JSON.stringify(payload)) });
//           await producer.close();
//           await client.close();
//           return { success: true };
//         },
//       })
//     },
//   },
// })

The value of this test case lies in its ability to cross multiple system boundaries: it starts from the earliest point of event injection, travels through the message queue, processing services, and the online store, and is finally validated on the user-facing UI. This level of testing is crucial for ensuring the stability of complex distributed systems.

Architecture Extensibility and Limitations

Extensibility:

  • Offline Dataset Generation: Pulsar’s tiered storage feature simplifies dataset generation for training. A batch job (e.g., Spark) can directly read historical data from the Pulsar topic (including parts offloaded to S3), perform point-in-time joins, and generate training samples (a minimal point-in-time join sketch follows this list).
  • New Feature Types: Adding new data types (like strings or arrays) only requires extending the schema definitions and processing logic.
  • More Sophisticated Monitoring: The Seaborn component can be replaced or augmented with more specialized data quality tools (like Great Expectations) to perform more complex assertions and validations.
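
To illustrate the point-in-time join from the first bullet, here is a minimal pandas sketch; the label table and its columns are assumptions that mirror the feature event schema used earlier, while in practice the feature rows would come from replaying the Pulsar topic (including tiers offloaded to S3).

# point_in_time_join.py  (illustrative sketch)
import pandas as pd

features = pd.DataFrame({
    'entity_id': ['u1', 'u1', 'u2'],
    'feature_value': [120.0, 180.0, 42.0],
    'event_timestamp': pd.to_datetime(['2024-01-01 10:00', '2024-01-03 09:00', '2024-01-02 15:00']),
})

labels = pd.DataFrame({
    'entity_id': ['u1', 'u2'],
    'label': [1, 0],
    'observation_timestamp': pd.to_datetime(['2024-01-02 00:00', '2024-01-05 00:00']),
})

# merge_asof picks, for each label row, the latest feature value whose event_timestamp
# is <= the observation_timestamp, i.e. no leakage of future information into training.
training_set = pd.merge_asof(
    labels.sort_values('observation_timestamp'),
    features.sort_values('event_timestamp'),
    left_on='observation_timestamp',
    right_on='event_timestamp',
    by='entity_id',
    direction='backward',
)
print(training_set[['entity_id', 'label', 'feature_value']])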

Limitations:

  • Implementation Complexity: Compared to a simple batch system, this solution introduces multiple components like Pulsar, microservices, and CI/CD, placing higher demands on the team’s DevOps and distributed systems operational capabilities.
  • State Management: For features requiring complex windowed calculations or long-term state (e.g., “number of categories a user has purchased from in the last 90 days”), a simple stateless generation service is insufficient. It would require introducing a stream processing engine (like Flink) or implementing more complex state management within the service.
  • Cost: The operational cost of maintaining a highly available Pulsar cluster and multiple real-time services will be higher than that of periodically running batch jobs. This represents a classic trade-off between latency, reliability, and cost.

This architecture is not a silver bullet. It is best suited for scenarios where real-time features are considered a core asset, and the organization is willing to invest engineering resources in their quality, governance, and reliability. It embodies an engineering philosophy: systematically managing and constraining the uncertainties in the data science workflow through software engineering best practices such as version control, automated testing, and continuous monitoring.

