The primary bottleneck our team faced on a core data platform wasn’t compute or storage, but metadata management. Specifically, we were dealing with high-concurrency write contention on the same Delta Lake table. Our business scenario requires multiple independent ETL jobs, streaming applications, and even some ad-hoc data fix scripts to append data to a central fact table simultaneously. Under this extreme write pressure, Delta Lake’s built-in optimistic concurrency control led to a high volume of transaction retries and failures, wasting valuable compute resources and increasing data latency.
Conventional optimizations, like tuning Spark’s retry configurations or reducing batch sizes, only treated the symptoms, not the root cause. The fundamental problem was that multiple writers were attempting to modify the _delta_log simultaneously without any coordination, a classic “thundering herd problem.” We needed a coordination layer before the Delta Lake transaction commit—an external, strongly consistent locking mechanism to queue the write operations.
Our initial idea was to introduce a distributed lock manager. Among the many options, we chose Apache ZooKeeper for three main reasons. First, its hierarchical, filesystem-like node structure and ephemeral nodes provide a classic and extremely reliable pattern for implementing distributed locks. Second, it has a mature ecosystem with stable and dependable Python client libraries like kazoo. Finally, its operational overhead is relatively manageable.
The entire infrastructure, including the S3 bucket for Delta Lake storage, the ZooKeeper cluster itself, and the corresponding network and security configurations, had to be managed as code. Here, Pulumi was the clear choice. Compared to Terraform’s HCL, defining infrastructure with Python means we can use the same language for our application logic and our infrastructure code, even sharing configurations and components. This is crucial for building a cohesive and maintainable system.
Step 1: Defining the Infrastructure Stack with Pulumi
Our goal is to create an infrastructure stack comprising an S3 bucket and a three-node ZooKeeper cluster. In a real-world project, deploying and managing a ZooKeeper cluster directly on EC2 is tedious and error-prone. A more robust approach would be to use ECS, EKS, or a managed cloud service. However, to clearly demonstrate the core logic, we’ll use EC2 instances with user_data scripts for automated configuration, which sufficiently showcases Pulumi’s power.
First, let’s set up the networking and security groups to ensure secure communication between ZooKeeper nodes and between clients and the cluster.
# pulumi_stack/networking.py
import pulumi
import pulumi_aws as aws
def setup_networking():
"""
Configures the VPC, subnets, and security groups required for the ZooKeeper cluster.
"""
# Create a new VPC
vpc = aws.ec2.Vpc("zookeeper-vpc",
cidr_block="10.100.0.0/16",
enable_dns_hostnames=True,
tags={"Name": "zookeeper-vpc"})
# Create a public subnet
subnet = aws.ec2.Subnet("zookeeper-subnet",
vpc_id=vpc.id,
cidr_block="10.100.1.0/24",
map_public_ip_on_launch=True,
availability_zone="us-west-2a",
tags={"Name": "zookeeper-subnet"})
# Create a security group for the ZooKeeper cluster
# Allows intra-cluster communication (2888, 3888)
# Allows client connections (2181)
# Allows SSH access (22)
zk_security_group = aws.ec2.SecurityGroup("zookeeper-sg",
vpc_id=vpc.id,
description="Allow traffic for ZooKeeper cluster",
ingress=[
aws.ec2.SecurityGroupIngressArgs(
protocol="tcp",
from_port=2181,
to_port=2181,
cidr_blocks=["0.0.0.0/0"], # In production, restrict to VPC or specific IPs
description="ZooKeeper client port"
),
aws.ec2.SecurityGroupIngressArgs(
protocol="tcp",
from_port=2888,
to_port=2888,
self=True,
description="ZooKeeper follower to leader port"
),
aws.ec2.SecurityGroupIngressArgs(
protocol="tcp",
from_port=3888,
to_port=3888,
self=True,
description="ZooKeeper leader election port"
),
aws.ec2.SecurityGroupIngressArgs(
protocol="tcp",
from_port=22,
to_port=22,
cidr_blocks=["0.0.0.0/0"], # Again, restrict this strictly in production
description="SSH access"
)
],
egress=[
aws.ec2.SecurityGroupEgressArgs(
protocol="-1",
from_port=0,
to_port=0,
cidr_blocks=["0.0.0.0/0"],
)
],
tags={"Name": "zookeeper-sg"})
return {
"vpc_id": vpc.id,
"subnet_id": subnet.id,
"security_group_id": zk_security_group.id
}
Next, we define the ZooKeeper cluster itself and the S3 bucket. The key part here is the user_data script, which automatically runs on each EC2 instance at boot to install and configure ZooKeeper.
# pulumi_stack/__main__.py
import pulumi
import pulumi_aws as aws
import base64
from .networking import setup_networking
# --- Infrastructure Configuration ---
# Choose a suitable AMI, here we use Amazon Linux 2
AMI_ID = "ami-0c55b159cbfafe1f0"
INSTANCE_TYPE = "t2.micro"
CLUSTER_SIZE = 3
STACK_NAME = pulumi.get_stack()
# --- 1. Delta Lake Storage Bucket ---
delta_lake_bucket = aws.s3.Bucket("delta-lake-store",
bucket=f"delta-lake-locking-demo-{STACK_NAME}",
acl="private"
)
# --- 2. Network Infrastructure ---
net_config = setup_networking()
# --- 3. ZooKeeper Cluster ---
zk_instance_ips = []
zk_instances = []
# User data script template
# This script runs on each EC2 instance at boot.
# It downloads, extracts, and configures ZooKeeper.
def get_user_data(node_id: int, zk_servers_config: str) -> str:
zk_version = "3.7.1"
# NOTE: In a production environment, config files should be managed via templates, not hardcoded.
zoo_cfg_content = f"""
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
{zk_servers_config}
"""
user_data_script = f"""#!/bin/bash
# Update and install Java
sudo yum update -y
sudo yum install -y java-1.8.0-openjdk
# Download and install ZooKeeper
wget https://archive.apache.org/dist/zookeeper/zookeeper-{zk_version}/apache-zookeeper-{zk_version}-bin.tar.gz
tar -xzf apache-zookeeper-{zk_version}-bin.tar.gz
sudo mv apache-zookeeper-{zk_version}-bin /opt/zookeeper
# Create data directory
sudo mkdir -p /var/lib/zookeeper
sudo chown -R ec2-user:ec2-user /var/lib/zookeeper
# Write myid file
echo "{node_id}" | sudo tee /var/lib/zookeeper/myid
# Write configuration file
echo '{zoo_cfg_content}' | sudo tee /opt/zookeeper/conf/zoo.cfg
# Start ZooKeeper service
/opt/zookeeper/bin/zkServer.sh start
"""
return base64.b64encode(user_data_script.encode('utf-8')).decode('utf-8')
# To generate the complete server.X=... configuration, we need the private IPs of the instances.
# This creates a circular dependency, which Pulumi handles gracefully with Apply.
# For simplicity here, we'll create Elastic IPs first and assume they are static.
# A better approach would be to use internal DNS or service discovery.
elastic_ips = [aws.ec2.Eip(f"zk-eip-{i}", vpc=True) for i in range(CLUSTER_SIZE)]
zk_servers_config_str = pulumi.Output.all(*[eip.private_ip for eip in elastic_ips]).apply(
lambda ips: "\n".join([f"server.{i+1}={ips[i]}:2888:3888" for i in range(len(ips))])
)
for i in range(CLUSTER_SIZE):
instance_name = f"zookeeper-node-{i+1}"
# Each instance depends on the complete cluster configuration string.
user_data = zk_servers_config_str.apply(lambda cfg: get_user_data(i + 1, cfg))
instance = aws.ec2.Instance(instance_name,
instance_type=INSTANCE_TYPE,
ami=AMI_ID,
subnet_id=net_config["subnet_id"],
vpc_security_group_ids=[net_config["security_group_id"]],
user_data=user_data,
tags={"Name": instance_name})
# Associate the Elastic IP
aws.ec2.EipAssociation(f"zk-eip-assoc-{i}",
instance_id=instance.id,
allocation_id=elastic_ips[i].id)
zk_instances.append(instance)
zk_instance_ips.append(elastic_ips[i].public_ip)
# --- 4. Export Key Outputs ---
zookeeper_connection_string = pulumi.Output.all(*zk_instance_ips).apply(
lambda ips: ",".join([f"{ip}:2181" for ip in ips])
)
pulumi.export("delta_lake_bucket_name", delta_lake_bucket.id)
pulumi.export("zookeeper_connection_string", zookeeper_connection_string)
After running pulumi up, we have a fully functional environment. A common mistake is to overlook the dynamic nature of the ZooKeeper cluster configuration. The pulumi.Output.all(...).apply(...) construct is a core feature of Pulumi that allows us to handle dependencies whose values are unknown until the deployment is complete, such as the private IPs of the instances.
Step 2: Implementing a Robust ZooKeeper Distributed Lock
With the infrastructure in place, the next step is to write the client-side locking logic. Using kazoo‘s low-level API directly can be error-prone, especially when handling connection jitter and session expiration. A best practice is to encapsulate it within a context manager, ensuring that lock acquisition and release occur in pairs, even if exceptions are raised within the code block.
# lock_manager/zookeeper_lock.py
import logging
from kazoo.client import KazooClient, KazooState
from kazoo.exceptions import LockTimeout, NoNodeError
from kazoo.recipe.lock import Lock
import time
import hashlib
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class ZookeeperLockManager:
"""
A ZooKeeper-based distributed lock manager using Kazoo that implements the context protocol.
"""
def __init__(self, zk_hosts: str, timeout_seconds: int = 60):
"""
Initializes the client and establishes a connection.
:param zk_hosts: ZooKeeper connection string, e.g., "host1:2181,host2:2181"
:param timeout_seconds: Timeout for acquiring the lock.
"""
self._zk_hosts = zk_hosts
self._timeout = timeout_seconds
self._zk_client = KazooClient(hosts=self._zk_hosts, timeout=15.0)
self._lock_handles = {}
try:
self._zk_client.start(timeout=20)
self._zk_client.add_listener(self._state_listener)
logging.info(f"Successfully connected to ZooKeeper cluster: {self._zk_hosts}")
except Exception as e:
logging.error(f"Failed to connect to ZooKeeper: {e}")
raise ConnectionError(f"Failed to connect to ZooKeeper: {self._zk_hosts}") from e
def _state_listener(self, state):
if state == KazooState.LOST:
logging.warning("ZooKeeper session lost. Any held locks have been released.")
elif state == KazooState.SUSPENDED:
logging.warning("Connection to ZooKeeper has been suspended.")
else:
logging.info(f"ZooKeeper connection state changed: {state}")
def _get_lock_path(self, resource_id: str) -> str:
"""
Generates a ZooKeeper-friendly lock path from a resource ID.
:param resource_id: A unique identifier for the resource, e.g., a Delta Table's S3 path.
:return: The lock path in ZooKeeper.
"""
# Use a hash to avoid illegal characters from the S3 path.
hashed_id = hashlib.sha256(resource_id.encode('utf-8')).hexdigest()
# The '/locks' root node should be pre-created with proper ACLs in production.
return f"/locks/delta_tables/{hashed_id}"
def acquire(self, resource_id: str):
"""
Acquires a lock for the specified resource. This is a blocking operation.
:param resource_id: The ID of the resource to lock.
"""
lock_path = self._get_lock_path(resource_id)
# Ensure the parent path for the lock exists.
self._zk_client.ensure_path(lock_path)
lock = Lock(self._zk_client, lock_path)
self._lock_handles[resource_id] = lock
logging.info(f"Attempting to acquire lock for resource '{resource_id}' (path: {lock_path})...")
try:
# Block until the lock is acquired or timeout occurs.
acquired = lock.acquire(timeout=self._timeout)
if not acquired:
raise LockTimeout(f"Timed out acquiring lock for resource '{resource_id}' ({self._timeout}s)")
logging.info(f"Successfully acquired lock for resource '{resource_id}'.")
except Exception as e:
# Clean up the handle and re-raise the exception.
del self._lock_handles[resource_id]
logging.error(f"Failed to acquire lock for '{resource_id}': {e}")
raise
def release(self, resource_id: str):
"""
Releases the lock for the specified resource.
:param resource_id: The ID of the locked resource.
"""
if resource_id in self._lock_handles:
try:
self._lock_handles[resource_id].release()
logging.info(f"Released lock for resource '{resource_id}'.")
except Exception as e:
# Log the error even if release fails.
# The catch here is that if release fails due to connection issues,
# ZooKeeper's ephemeral node mechanism will eventually clean it up.
logging.error(f"Failed to release lock for '{resource_id}': {e}")
finally:
del self._lock_handles[resource_id]
else:
logging.warning(f"Attempted to release a lock not held by this instance: '{resource_id}'")
def close(self):
"""
Closes the ZooKeeper client connection.
"""
if self._zk_client.state != 'CLOSED':
self._zk_client.stop()
self._zk_client.close()
logging.info("ZooKeeper client connection closed.")
def __enter__(self):
# The instance itself is returned but not typically used directly in the 'with' statement.
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# Ensure all held locks are released as a safety net.
# Normal logic should explicitly release locks within the 'with' block.
for resource_id in list(self._lock_handles.keys()):
self.release(resource_id)
self.close()
This wrapper addresses several key production considerations:
- Connection State Listening: Knowing when a session is lost (
KazooState.LOST) is critical. A lost session means all ephemeral nodes created by that client (including lock nodes) have been automatically deleted by ZooKeeper, implicitly releasing the lock. - Resource ID to ZNode Path Mapping: A Delta Table path (e.g.,
s3://...) cannot be used directly as a ZNode path. We convert it to a safe string via hashing. - Timeout Mechanism: Waiting indefinitely for a lock is dangerous; a reasonable timeout must be set.
- Context Management:
__enter__and__exit__ensure that connection closing and lock cleanup logic are attempted even if an exception occurs during processing.
Step 3: Integrating the Lock Mechanism into the Write Flow
Now, we can apply this lock manager to our PySpark data writing job. The job’s logic becomes very clear: acquire the lock before executing df.write.save(), and release the lock after the operation completes (whether it succeeds or fails).
# writer_job/main.py
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os
import sys
import time
# Add the lock_manager module to sys.path
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from lock_manager.zookeeper_lock import ZookeeperLockManager
def run_concurrent_writer(writer_id: int, zk_hosts: str, table_path: str):
"""
A simulated writer job that acquires a lock before writing to a Delta table.
"""
spark = (
SparkSession.builder
.appName(f"ConcurrentWriter-{writer_id}")
.master("local[*]") # In a real cluster, configure for YARN/K8s
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# S3 access credentials need to be properly configured, omitted here for brevity
.getOrCreate()
)
lock_manager = None
try:
lock_manager = ZookeeperLockManager(zk_hosts=zk_hosts, timeout_seconds=120)
# Acquire the lock before any Delta table operations
lock_manager.acquire(table_path)
# ---- Core business logic starts ----
print(f"Writer {writer_id}: Lock acquired. Starting data write...")
# Simulate generating some data
data = [(writer_id, f"event_{i}", "processed", time.time()) for i in range(10)]
columns = ["writer_id", "event_id", "status", "timestamp"]
df = spark.createDataFrame(data, columns)
# Write to the Delta Lake table
(
df.write
.format("delta")
.mode("append")
.save(table_path)
)
print(f"Writer {writer_id}: Data written successfully.")
# ---- Core business logic ends ----
except Exception as e:
print(f"Writer {writer_id}: Job execution failed: {e}")
finally:
if lock_manager:
# Ensure the lock is released
lock_manager.release(table_path)
lock_manager.close()
spark.stop()
if __name__ == "__main__":
# These values would typically be passed by a scheduler (like Airflow) or environment variables.
# We hardcode them here, assuming they come from pulumi stack output.
ZK_CONNECTION_STRING = "YOUR_PULUMI_ZK_OUTPUT" # e.g., "54.x.x.x:2181,35.x.x.x:2181"
DELTA_TABLE_PATH = "s3a://YOUR_PULUMI_S3_BUCKET_OUTPUT/my_delta_table"
# Unit testing strategies:
# 1. Mock ZookeeperLockManager to verify that acquire and release are called correctly.
# 2. Spin up a local test ZooKeeper container.
# 3. Write a multi-process/multi-threaded test script that launches multiple run_concurrent_writer
# instances and verifies (via logs or unique IDs in the data) that only one writer
# is executing the write logic at any given time.
# Simple concurrency test
from multiprocessing import Process
if ZK_CONNECTION_STRING == "YOUR_PULUMI_ZK_OUTPUT":
print("Please replace ZK_CONNECTION_STRING and DELTA_TABLE_PATH with your Pulumi stack outputs.")
else:
processes = []
for i in range(5): # Simulate 5 concurrent writers
p = Process(target=run_concurrent_writer, args=(i, ZK_CONNECTION_STRING, DELTA_TABLE_PATH))
processes.append(p)
p.start()
for p in processes:
p.join()
print("All concurrent writer jobs have completed.")
This integration code demonstrates the final architecture. Each writer adheres to the “acquire lock, then operate” principle. This effectively shifts concurrency conflicts from Delta Lake’s commit phase to ZooKeeper’s lock acquisition phase, transforming chaotic contention into an orderly queue.
Visualizing the Architecture and Flow
A Mermaid diagram helps visualize the entire process more intuitively.
sequenceDiagram
participant Writer A
participant Writer B
participant ZooKeeper
participant Delta Lake (S3)
Writer A->>+ZooKeeper: acquireLock("/locks/delta/.../...")
Note right of Writer A: Blocking, waiting for lock
ZooKeeper-->>-Writer A: Lock Acquired (Ephemeral ZNode Created)
Writer B->>+ZooKeeper: acquireLock("/locks/delta/.../...")
Note right of Writer B: Blocking, waiting, as lock is held by A
Writer A->>+Delta Lake (S3): Read _delta_log/_last_checkpoint
Delta Lake (S3)-->>-Writer A: Latest Table Version
Writer A->>+Delta Lake (S3): Write new data files (Parquet)
Delta Lake (S3)-->>-Writer A: Files Written
Writer A->>+Delta Lake (S3): Commit new version to _delta_log (e.g., 0001.json)
Note over Writer A, Delta Lake (S3): With the external lock held,
the probability of an internal Delta Lake optimistic conflict is extremely low
Delta Lake (S3)-->>-Writer A: Commit Successful
Writer A->>+ZooKeeper: releaseLock()
ZooKeeper-->>-Writer A: Lock Released (Ephemeral ZNode Deleted)
ZooKeeper-->>-Writer B: Lock Acquired
Writer B->>+Delta Lake (S3): Read _delta_log (now includes A's commit)
Delta Lake (S3)-->>-Writer B: Latest Table Version
Writer B->>...: (Performs write and commit)
Writer B->>+ZooKeeper: releaseLock()
ZooKeeper-->>-Writer B: Lock Released
This sequence diagram clearly shows how ZooKeeper acts as the coordinator between multiple writers, ensuring that write operations to the Delta Lake _delta_log are serialized.
Limitations and Future Iterations
While this solution effectively solved our initial high-concurrency write problem, it’s not a silver bullet. In a real-world project, we must be aware of its limitations and potential trade-offs.
First, introducing ZooKeeper as a centralized coordination service could become a new performance bottleneck or single point of failure. Although a ZooKeeper ensemble is highly available, its write throughput is finite. If the frequency of write requests exceeds ZooKeeper’s capacity, lock acquisition latency will increase significantly. This solution is therefore better suited for medium-to-high concurrency scenarios where the write transactions themselves are relatively long-running, rather than for use cases requiring thousands of ultra-low-latency writes per second.
Second, the current lock is a coarse-grained, table-level lock. This means that even if two jobs are writing to different partitions, they must wait for each other. A significant optimization would be to implement partition-level locking. This would require modifying the _get_lock_path logic to include partition information, such as f"/locks/{table_hash}/{partition_hash}". However, this introduces new complexities, such as how to handle transactions that span multiple partitions and the potential risk of deadlocks.
Finally, self-hosting and maintaining a ZooKeeper cluster requires dedicated operational effort. Teams unwilling to take on this burden could consider managed alternatives from cloud providers, such as AWS Step Functions or a Redis-based distributed lock implementation (like Redlock), although their consistency guarantees differ subtly from ZooKeeper and require careful evaluation.
In summary, by using Pulumi to declaratively manage the entire infrastructure as code and combining it with a custom, ZooKeeper-based distributed lock manager, we built a robust external concurrency control layer for Delta Lake. At the cost of a small amount of lock acquisition latency, it dramatically reduces transaction conflict rates, improving the stability and resource utilization of our entire data pipeline.