We faced a thorny engineering problem: a core business system uses MongoDB as its production database, featuring a highly dynamic schema where fields are frequently added to accommodate rapid business iterations. Concurrently, the analytics team requires this data to be synced in near real-time to a Snowflake data warehouse for complex OLAP analytics. The initial daily batch ETL solution suffered from high latency, failing to meet the business’s demand for data freshness. More critically, the batch jobs couldn’t capture intermediate state changes that occurred multiple times within a day, resulting in the loss of valuable business information.
We needed a solution that could satisfy all of the following stringent requirements:
- Low Latency: End-to-end latency, from a write in MongoDB to its visibility in Snowflake, should be within the minute-level.
- Data Integrity: Exactly-once semantics must be guaranteed. Every change in MongoDB (
insert,update,delete) must be processed once and only once. - Schema Evolution: The solution must automatically adapt to the addition of new fields in the source MongoDB tables without requiring manual intervention or pipeline restarts.
- Maintainability and Robustness: As transformation logic within the data pipeline could become complex, its correctness must be ensured through automated testing, and it must exhibit strong fault tolerance.
Solution Evaluation and Trade-offs
In a real-world project, we wouldn’t jump directly to the final solution. The evaluation process is paramount.
Option A: The Kafka Connect Playbook
A common approach in modern data stacks is to use the Debezium MongoDB Connector to capture Change Data Capture (CDC) streams into Kafka, followed by the Snowflake Kafka Connector to sync data from Kafka to Snowflake.
- Pros:
- Mature, well-supported components. Both Debezium and the Snowflake Connector are production-hardened.
- Configuration-driven, leading to relatively low development effort, primarily focused on connector deployment and configuration.
- Cons:
- Limited Transformation Capabilities: If complex transformations, cleaning, or data enrichment are required before landing data in Snowflake, Kafka Connect’s Single Message Transforms (SMT) are limited and difficult to test. More complex logic might necessitate introducing KSQLdb or another stream processing application, increasing architectural complexity.
- Black-Box Testing: The core logic of the pipeline is scattered across YAML configurations, making it extremely difficult to unit or integration test the transformation logic. We cannot validate its behavior with the same precision as a piece of Java code.
- Operational Overhead: Requires maintaining separate Kafka and Kafka Connect clusters, adding more moving parts and potential failure points to the system.
Option B: Apache Flink as a Unified Engine
This approach uses Flink as the core processing engine. It directly integrates the Debezium CDC Connector to read MongoDB’s oplog, leverages Flink’s powerful DataStream API for data processing, and finally writes data via Flink’s native Snowflake Sink.
- Pros:
- Powerful Processing Capabilities: Flink offers unparalleled stream processing and state management capabilities. Any complex transformation logic, from stateless
mapoperations to statefulkeyedProcessFunctions, can be implemented with ease. - Unified Architecture: From source to transformation to sink, all logic resides within a single Flink application. Operationally, we only need to monitor the health of the Flink cluster.
- Precise Delivery Guarantees: Flink’s checkpointing mechanism, combined with a sink that supports two-phase commit (2PC) like the Snowflake Sink, can provide end-to-end exactly-once guarantees.
- Excellent Testability: This is the decisive advantage. A Flink job is essentially a Java/Scala program. We can use tools like
flink-test-utils, along with JUnit5 and Mockito, to conduct fine-grained unit and integration tests on the entire data processing pipeline. This aligns perfectly with our requirement for Test-Driven Development (TDD).
- Powerful Processing Capabilities: Flink offers unparalleled stream processing and state management capabilities. Any complex transformation logic, from stateless
The Decision: We chose Option B. Although it has a slightly steeper initial development curve, the long-term maintainability, robustness, and powerful extensibility it offers are fully aligned with our engineering standards for a core data pipeline. The adoption of TDD ensures that the pipeline’s quality will not degrade when faced with complex business logic and frequent changes.
Architecture Overview
The overall data flow is clean and straightforward, with all core work handled within the Flink job.
graph TD
subgraph MongoDB Replica Set
A[Primary Node] -- Oplog --> B(Debezium Engine);
end
subgraph Apache Flink Job
B -- CDC Events --> C[DataStream API: Source];
C -- Raw JSON String --> D{ProcessFunction: Schema-Aware Deserialization & Transformation};
D -- Structured Data --> E[Snowflake Sink];
end
subgraph Snowflake
E -- Two-Phase Commit --> F[Staging Area];
F -- COPY INTO --> G[Target Table];
end
style B fill:#f9f,stroke:#333,stroke-width:2px
style E fill:#f9f,stroke:#333,stroke-width:2px
Building the Core Processing Logic with TDD
We’ll start with a test case and incrementally build our Flink job. TDD ensures every feature we implement is validated.
Step 1: Project Setup and Dependencies
First, we need a standard Maven project with the necessary dependencies.
pom.xml:
<dependencies>
<!-- Flink Core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
<!-- MongoDB CDC Connector -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>2.4.1</version>
</dependency>
<!-- Snowflake Sink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-snowflake</artifactId>
<version>1.0.1</version>
</dependency>
<!-- Jackson for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Flink Testing Utilities -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>1.17.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.9.3</version>
<scope>test</scope>
</dependency>
</dependencies>
Step 2: Define the First Test - Processing an Insert Event
Our goal is to create a MongoCdcProcessor function that can transform a JSON-formatted CDC string from Debezium into a structured Map<String, Object>, augmented with metadata (like the operation type).
Test class MongoCdcProcessorTest.java:
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
import java.util.Map;
class MongoCdcProcessorTest {
@Test
void shouldProcessInsertEventCorrectly() throws Exception {
// 1. Arrange: Simulate a Debezium insert event JSON string.
// In a real project, this JSON should be captured from actual Debezium output.
String insertEvent = "{\"payload\": {\"after\": \"{\\\"_id\\\":{\\\"$oid\\\":\\\"653b4b5e8b7c4a1e3f8b0f7a\\\"},\\\"name\\\":\\\"Alice\\\",\\\"age\\\":30}\", \"op\": \"c\"}}";
// 2. Act: Invoke the function under test.
MongoCdcProcessor processor = new MongoCdcProcessor();
TestCollector<Map<String, Object>> collector = new TestCollector<>();
processor.processElement(insertEvent, null, collector); // context is null for this test
// 3. Assert: Verify the output matches expectations.
assertEquals(1, collector.getOutputs().size());
Map<String, Object> result = collector.getOutputs().get(0);
assertEquals("c", result.get("__op"));
assertEquals("Alice", result.get("name"));
assertEquals(30, result.get("age"));
assertNotNull(result.get("_id"));
}
// TestCollector is a simple helper class to collect operator output during tests.
private static class TestCollector<T> implements org.apache.flink.util.Collector<T> {
private final java.util.List<T> outputs = new java.util.ArrayList<>();
@Override public void collect(T record) { outputs.add(record); }
@Override public void close() {}
public java.util.List<T> getOutputs() { return outputs; }
}
}
This test will fail initially because the MongoCdcProcessor class doesn’t exist yet.
Step 3: Implement MongoCdcProcessor to Pass the Test
Now, we’ll write the minimal implementation to make the test pass.
MongoCdcProcessor.java:
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.Map;
public class MongoCdcProcessor extends ProcessFunction<String, Map<String, Object>> {
private transient ObjectMapper objectMapper;
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
// Initialize ObjectMapper here; it's good practice to avoid re-creation in processElement.
objectMapper = new ObjectMapper();
}
@Override
public void processElement(String value, Context ctx, Collector<Map<String, Object>> out) throws Exception {
try {
JsonNode root = objectMapper.readTree(value);
JsonNode payload = root.path("payload");
String op = payload.path("op").asText();
if (op.isEmpty()) {
// In a production system, log a warning or send to a dead-letter queue.
return;
}
String afterJson = payload.path("after").asText();
if (afterJson.isEmpty() || "null".equals(afterJson)) {
// For delete events, the 'after' field is null.
if ("d".equals(op)) {
// We'll ignore deletes for now; a dedicated test will drive this implementation later.
}
return;
}
TypeReference<Map<String, Object>> typeRef = new TypeReference<>() {};
Map<String, Object> data = objectMapper.readValue(afterJson, typeRef);
// Inject metadata fields, which are crucial for downstream processing.
data.put("__op", op);
out.collect(data);
} catch (Exception e) {
// Robust error handling: log the error and potentially send bad data to a side output.
// ctx.output(new OutputTag<String>("malformed-records"){}, value);
}
}
}
Running the shouldProcessInsertEventCorrectly test again should now result in a pass.
Step 4: The TDD Cycle - Adding Tests for Update and Schema Evolution
In the real world, data is updated and schemas evolve. Our next test will simulate an update operation that also introduces a new field, city.
Add a new test to MongoCdcProcessorTest.java:
@Test
void shouldHandleUpdateWithNewField() throws Exception {
// 1. Arrange: Simulate an update event where the 'after' state includes a new 'city' field.
String updateEvent = "{\"payload\": {\"after\": \"{\\\"_id\\\":{\\\"$oid\\\":\\\"653b4b5e8b7c4a1e3f8b0f7a\\\"},\\\"name\\\":\\\"Alice\\\",\\\"age\\\":31,\\\"city\\\":\\\"New York\\\"}\", \"op\": \"u\"}}";
// 2. Act
MongoCdcProcessor processor = new MongoCdcProcessor();
TestCollector<Map<String, Object>> collector = new TestCollector<>();
processor.processElement(updateEvent, null, collector);
// 3. Assert
assertEquals(1, collector.getOutputs().size());
Map<String, Object> result = collector.getOutputs().get(0);
assertEquals("u", result.get("__op"));
assertEquals(31, result.get("age"));
assertEquals("New York", result.get("city")); // Verify the new field is parsed correctly
}
Because our implementation is based on a Map, this test passes out-of-the-box. This demonstrates that our chosen data structure inherently adapts well to schema evolution. If the target table in Snowflake doesn’t have a city column yet, the Snowflake Sink’s COPY command will typically ignore this new field (depending on configuration), or we could configure ALTER TABLE behavior. We’ve opted to use a VARIANT type on the Snowflake side to ingest all fields, providing maximum flexibility.
Assembling the Complete Flink Job
With the core processing logic tested, we can now assemble it into a complete Flink job.
MongoToSnowflakeSyncJob.java:
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.snowflake.sink.SnowflakeSink;
import org.apache.flink.connector.snowflake.sink.SnowflakeSinkOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.*;
import java.util.Map;
import java.util.Properties;
public class MongoToSnowflakeSyncJob {
public static void main(String[] args) throws Exception {
// 1. Initialize the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Critical configuration: Enable checkpointing for exactly-once semantics
// In production, intervals, timeouts, and storage backends need careful tuning.
env.enableCheckpointing(60000); // Checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // Minimum 30s between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(120000); // Checkpoints must complete within 2 minutes
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // Only one checkpoint at a time
// 2. Configure the MongoDB CDC Source
MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
.hosts("mongo-replica-set:27017")
.databaseList("users_db")
.collectionList("users_db.profiles")
.username("your_user")
.password("your_password")
.deserializer(new JsonDebeziumDeserializationSchema()) // Output Debezium JSON directly
.build();
// 3. Create the data stream and apply our transformation logic
DataStream<String> cdcStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source");
DataStream<Map<String, Object>> processedStream = cdcStream
.process(new MongoCdcProcessor())
.name("CDC Event Processor");
// 4. Convert Map<String, Object> to RowData, which the Snowflake Sink requires
// We design our Snowflake target table with two columns:
// METADATA (VARCHAR): Stores metadata like __op
// PAYLOAD (VARIANT): Stores the entire business data JSON
LogicalType[] logicalTypes = new LogicalType[]{new VarCharType(16), new VarCharType(Integer.MAX_VALUE)};
DataStream<RowData> snowflakeStream = processedStream.map(map -> {
String op = (String) map.getOrDefault("__op", "i");
// For simplicity, we serialize the entire map to a JSON string for the VARIANT column
String payloadJson = new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(map);
return GenericRowData.of(StringData.fromString(op), StringData.fromString(payloadJson));
}).setParallelism(env.getParallelism());
// 5. Configure the Snowflake Sink
Properties props = new Properties();
props.put(SnowflakeSinkOptions.SNOWFLAKE_URL, "your_org-your_account.snowflakecomputing.com");
props.put(SnowflakeSinkOptions.SNOWFLAKE_USER, "your_sf_user");
props.put(SnowflakeSinkOptions.SNOWFLAKE_PRIVATE_KEY, "your_private_key_content"); // Key pair authentication is recommended
props.put(SnowflakeSinkOptions.SNOWFLAKE_DATABASE, "ANALYTICS_DB");
props.put(SnowflakeSinkOptions.SNOWFLAKE_SCHEMA, "CDC_SCHEMA");
// Use Snowflake's MERGE INTO statement to implement UPSERT logic
String mergeSql = "MERGE INTO ANALYTICS_DB.CDC_SCHEMA.PROFILES_TARGET t " +
"USING (SELECT PARSE_JSON(stg.PAYLOAD):_id::_string AS join_key, stg.* FROM IDENTIFIER(?) stg) s " +
"ON t.ID = s.join_key " +
"WHEN MATCHED AND s.METADATA = 'd' THEN DELETE " +
"WHEN MATCHED AND s.METADATA IN ('u', 'c') THEN UPDATE SET t.RAW_DATA = PARSE_JSON(s.PAYLOAD) " +
"WHEN NOT MATCHED AND s.METADATA IN ('c', 'u') THEN INSERT (ID, RAW_DATA) VALUES (s.join_key, PARSE_JSON(s.PAYLOAD))";
SnowflakeSink<RowData> sink = SnowflakeSink.<RowData>builder()
.setSnowflakeConnectionProperties(props)
.setTableName("PROFILES_STAGING") // Sink writes to a temporary staging table first
.setTableSchema(logicalTypes)
.setMerge(true)
.setMergeQuery(mergeSql)
.build();
snowflakeStream.sinkTo(sink).name("Snowflake Sink");
// 6. Execute the job
env.execute("MongoDB to Snowflake TDD Sync Job");
}
}
Preparatory Steps in Snowflake:
In Snowflake, you need to create the target and staging tables for the sink.
-- Staging table for Flink to write into
CREATE OR REPLACE TABLE ANALYTICS_DB.CDC_SCHEMA.PROFILES_STAGING (
METADATA VARCHAR(16),
PAYLOAD VARCHAR -- Flink sink will write the JSON string here
);
-- Final target table with a VARIANT column for flexibility
CREATE OR REPLACE TABLE ANALYTICS_DB.CDC_SCHEMA.PROFILES_TARGET (
ID VARCHAR PRIMARY KEY, -- Extracted from the JSON payload
RAW_DATA VARIANT,
LAST_UPDATED_AT TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
The MERGE statement is key to implementing UPSERT and DELETE logic. The Flink Snowflake Sink writes micro-batches of data into PROFILES_STAGING and then executes this MERGE statement transactionally, applying the changes to the final PROFILES_TARGET table. The transactional nature of this process is a crucial part of the exactly-once guarantee.
Limitations and Future Iterations
While this architecture is robust and testable, it’s not without trade-offs.
First, Operational Complexity. Self-hosting and maintaining a highly available Flink cluster requires specialized knowledge, including resource management (YARN/Kubernetes), checkpoint and savepoint management, and failure recovery. This is a significant investment compared to fully managed cloud services.
Second, Cost Considerations. A long-running Flink cluster and Snowflake’s compute warehouses both incur continuous costs. Especially with large data volumes, a careful cost-benefit analysis of Flink’s parallelism and the Snowflake warehouse size is necessary.
Third, Handling Complex Schema Changes. The current solution handles field additions well. However, dealing with field renames, type changes, or deletions can become complex. A more advanced strategy involves introducing a Schema Registry (like Confluent Schema Registry) to enforce schema validation and evolution management within the data pipeline, but this would further increase architectural complexity.
Finally, End-to-End Latency. Although it’s a streaming process, the Flink Sink to Snowflake is micro-batch-based. Actual latency depends on Flink’s checkpoint interval, the Snowflake Sink’s buffer size, and its flush interval. While minute-level latency is achievable, sub-second real-time performance is not. For use cases requiring lower latency, evaluating other databases designed for real-time analytics might be necessary.