The traditional perimeter security model is failing. In the past, we relied on VPNs and firewalls; once a user authenticated and entered the internal network, they were granted excessive implicit trust. The fundamental flaw in this model is its one-time, static authorization. If a user’s credentials are stolen or their device is compromised after authentication, the entire internal network is at risk. This was the core pain point our team faced: how to implement dynamic, continuous access control for our iOS clients, ensuring that every API request is authorized based on the latest user and device state, rather than relying on a JWT issued hours ago.
Our initial idea was to place an API gateway in front of every service, which would synchronously call our IAM (Identity and Access Management) and device management services for real-time validation on every request. This plan was quickly rejected. It introduced significant latency and turned the IAM service into a system-wide performance bottleneck. We needed a solution that could achieve near-real-time policy updates without sacrificing performance. Ultimately, we decided to build an event-streaming-based continuous authorization proxy. The tech stack might seem unconventional, but every choice was a deliberate trade-off.
- Sanic: The core of our authorization proxy. Built on asyncio, it delivers extremely high performance, making it ideal for I/O-bound proxy tasks. We didn't need a monolithic framework, just a lightweight engine capable of handling requests with minimal latency and running asynchronous background tasks (like message consumption).
- Apache Pulsar: Our event bus. When any security-policy-related event occurs (a user's role is changed in IAM, a device's health status changes from "secure" to "at-risk," or a user triggers risky behavior), the relevant system immediately publishes an event to Pulsar. Its multi-tenancy, persistence, and flexible subscription models ensure the reliable delivery of these critical events.
- iOS Client: As the request originator, it must not only carry user credentials but also include proof of the device's state with every request. We leverage the DeviceCheck framework to generate hardware-bound attestations, preventing replay attacks.
- IAM System: The "source of truth" for identities and policies. It manages users, roles, and permissions, acting as an event producer when policies change.
- Buildah: Responsible for building our Sanic proxy’s container image. In a Zero Trust architecture, the identity and integrity of the service itself are just as important. With Buildah, we can construct a minimal, unprivileged, and verifiable container image, drastically reducing the attack surface.
The overall architecture is depicted below. Its core idea is to transform synchronous, blocking authorization checks into asynchronous, event-based policy cache updates.
graph TD
subgraph "Client Environment"
A[iOS App]
end
subgraph "Proxy & Backend"
B[Sanic Continuous Auth Proxy]
C[Backend Business Services]
end
subgraph "Security Infrastructure"
D[Central IAM Service]
E[Device Health Service]
F[Apache Pulsar Cluster]
end
A -- "API Request (with User Token + Device Attestation)" --> B
B -- "1. Check local policy cache" --> B
B -- "2. If authorized, proxy request" --> C
C -- "Response" --> B
B -- "Response" --> A
D -- "User Policy Change Event" --> F
E -- "Device State Change Event" --> F
F -- "Subscribe to Policy Updates" --> B
Core Implementation of the Sanic Proxy
The Sanic proxy is the heart of this solution. It must efficiently handle all incoming API requests while silently consuming policy update events from Pulsar in the background to maintain a local authorization policy cache.
1. Startup and Background Task Setup
We use Sanic’s startup and shutdown hooks to manage the Pulsar client’s lifecycle and app.add_task to launch a long-running background coroutine for message consumption.
# file: proxy_server.py
import asyncio
import logging
import json
import time
from sanic import Sanic, response
from sanic.request import Request
from httpx import AsyncClient
import pulsar
# Configuration
PULSAR_SERVICE_URL = 'pulsar://localhost:6650'
POLICY_UPDATE_TOPIC = 'persistent://public/default/iam-policy-updates'
BACKEND_SERVICE_URL = 'http://localhost:9000'
CACHE_TTL = 3600  # Local cache TTL in seconds, to guard against missed or lost update events
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# A simple in-memory cache to store authorization decisions
# In a production environment, this should be a more robust implementation like Redis
# key: f"{user_id}:{device_id}"
# value: {"authorized": True/False, "timestamp": unix_timestamp}
policy_cache = {}
app = Sanic("ContinuousAuthProxy")
http_client = AsyncClient()
# Pulsar client and consumer instances
pulsar_client = None
pulsar_consumer = None
@app.before_server_start
async def setup_pulsar(app_instance, loop):
"""
Initializes the Pulsar client and consumer before the server starts.
"""
global pulsar_client, pulsar_consumer
try:
logging.info(f"Connecting to Pulsar at {PULSAR_SERVICE_URL}...")
pulsar_client = pulsar.Client(PULSAR_SERVICE_URL, logger=logging.getLogger())
pulsar_consumer = pulsar_client.subscribe(
POLICY_UPDATE_TOPIC,
subscription_name='sanic-proxy-subscription',
            consumer_type=pulsar.ConsumerType.Shared,
            # Caution: a Shared subscription delivers each event to only one of its consumers,
            # so with per-instance in-memory caches every proxy instance (and every Sanic worker)
            # needs its own subscription_name to see every event; Shared is the right fit once the
            # cache is shared (see "Limitations and Future Iterations").
schema=pulsar.schema.JsonSchema(PolicyUpdateEvent) # Use a Schema to define message structure
)
logging.info("Pulsar client and consumer initialized successfully.")
# Add the Pulsar consumption task to Sanic's background tasks
app_instance.add_task(consume_policy_updates())
except Exception as e:
logging.critical(f"Failed to connect to Pulsar or subscribe to topic: {e}")
        # Re-raise so that server startup fails rather than serving requests without policy updates
        raise
@app.after_server_stop
async def teardown_pulsar(app_instance, loop):
"""
Gracefully closes the Pulsar client when the server shuts down.
"""
if pulsar_consumer:
pulsar_consumer.close()
if pulsar_client:
pulsar_client.close()
logging.info("Pulsar client and consumer closed.")
# Define the Pulsar message schema for better type safety.
# (Referenced in setup_pulsar above; that hook only runs after the module has been
# fully imported, so defining the class here still works.)
class PolicyUpdateEvent(pulsar.schema.Record):
    user_id = pulsar.schema.String()
    device_id = pulsar.schema.String()
    # Expected values: "SESSION_REVOKED", "DEVICE_UNHEALTHY", "ROLE_UPDATED"
    event_type = pulsar.schema.String()
    is_authorized = pulsar.schema.Boolean()
    timestamp = pulsar.schema.Long()
The key here is running the Pulsar consumption logic as a background task. This ensures that message processing doesn’t block the main event loop that handles HTTP requests—the core advantage of using an async framework like Sanic.
2. Policy Update Consumption Logic
The background task needs to continuously receive messages from Pulsar and update the local policy_cache.
# file: proxy_server.py (continued)
async def consume_policy_updates():
"""
A background coroutine that continuously consumes policy update events from Pulsar.
"""
logging.info("Policy update consumer task started.")
while True:
try:
            msg = await asyncio.get_running_loop().run_in_executor(None, pulsar_consumer.receive)
event_data = msg.value()
cache_key = f"{event_data.user_id}:{event_data.device_id}"
logging.info(f"Received policy update event: {event_data.event_type} for {cache_key}. New state: authorized={event_data.is_authorized}")
# Update the local cache
policy_cache[cache_key] = {
"authorized": event_data.is_authorized,
"timestamp": event_data.timestamp
}
# Acknowledge the message was processed successfully
pulsar_consumer.acknowledge(msg)
except Exception as e:
logging.error(f"Error while consuming Pulsar message: {e}")
# Add a backoff strategy to avoid high CPU usage on persistent errors
await asyncio.sleep(5)
A common pitfall is to use blocking SDK calls directly in the main async loop. The pulsar-client's receive() method is blocking, so we must wrap it with run_in_executor to run it in a thread pool, thereby preventing it from stalling the entire Sanic service.
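A related refinement: receive() with no timeout blocks its executor thread forever, which makes graceful shutdown awkward. Below is a sketch of a shutdown-aware variant, under two assumptions: a module-level asyncio.Event named shutdown_event that the after_server_stop hook sets before closing the consumer, and that the client raises pulsar.Timeout when receive(timeout_millis=...) expires (adjust the except clause if your client version names it differently).
# Sketch: a shutdown-aware variant of consume_policy_updates.
# Assumes a module-level `shutdown_event` that after_server_stop sets.
import functools

shutdown_event = asyncio.Event()

async def consume_policy_updates_with_shutdown():
    loop = asyncio.get_running_loop()
    receive_once = functools.partial(pulsar_consumer.receive, timeout_millis=1000)
    while not shutdown_event.is_set():
        try:
            msg = await loop.run_in_executor(None, receive_once)
        except pulsar.Timeout:
            continue  # No message within 1s; loop around and re-check the shutdown flag
        except Exception as e:
            logging.error(f"Error while consuming Pulsar message: {e}")
            await asyncio.sleep(5)
            continue
        event = msg.value()
        policy_cache[f"{event.user_id}:{event.device_id}"] = {
            "authorized": event.is_authorized,
            "timestamp": event.timestamp,
        }
        pulsar_consumer.acknowledge(msg)
The teardown hook would then call shutdown_event.set() and await the task before closing the consumer.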
3. Request Interception and Authorization Middleware
This is the proxy’s core functionality. We use Sanic middleware to intercept all requests, perform the authorization check, and then decide whether to proxy the request or deny access.
# file: proxy_server.py (continued)
@app.middleware('request')
async def continuous_authorization(request: Request):
"""
Performs a continuous authorization check on every request.
"""
# Extract identity and device info from headers
# In a real project, this would involve parsing a JWT and verifying a device attestation
user_id = request.headers.get('X-User-ID')
device_id = request.headers.get('X-Device-ID')
if not user_id or not device_id:
return response.json({"error": "Missing user or device identity"}, status=401)
cache_key = f"{user_id}:{device_id}"
cached_policy = policy_cache.get(cache_key)
# Cache hit and authorized
if cached_policy and cached_policy["authorized"]:
logging.info(f"[CACHE HIT] Access granted for {cache_key}")
return # Continue to process the request
# Cache hit but denied
if cached_policy and not cached_policy["authorized"]:
logging.warning(f"[CACHE HIT] Access denied for {cache_key} due to policy.")
return response.json({"error": "Access denied by policy"}, status=403)
# Cache miss, requires a synchronous fallback check
# This is a fallback strategy for new users/devices or after a proxy restart
# In a real project, this would be a call to the IAM service
logging.warning(f"[CACHE MISS] Performing fallback check for {cache_key}")
is_valid_fallback = await fallback_auth_check(user_id, device_id)
if is_valid_fallback:
# Update cache and allow the request
policy_cache[cache_key] = {"authorized": True, "timestamp": asyncio.get_event_loop().time()}
return
else:
# Update cache and deny the request
policy_cache[cache_key] = {"authorized": False, "timestamp": asyncio.get_event_loop().time()}
return response.json({"error": "Access denied by fallback check"}, status=403)
async def fallback_auth_check(user_id: str, device_id: str) -> bool:
"""
Simulates a synchronous fallback check to an IAM service.
"""
# In a real project, this would be an RPC or HTTP call to the IAM service
await asyncio.sleep(0.05) # Simulate network latency
# Default to allowing unknown users; a real implementation would be more complex
if user_id == "user-blocked":
return False
return True
# Proxy all authorized requests to the backend service
@app.route('/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
async def proxy_to_backend(request: Request, path: str):
"""
Proxies the request to the actual backend service.
"""
url = f"{BACKEND_SERVICE_URL}/{path}"
req_headers = {k: v for k, v in request.headers.items() if k.lower() not in ['host']}
try:
resp = await http_client.request(
method=request.method,
url=url,
headers=req_headers,
params=request.args,
            content=request.body,  # httpx expects raw bytes via content=, not data=
            timeout=10.0
        )
        # Strip headers that no longer apply: httpx has already decoded the body,
        # so content-length/encoding and hop-by-hop headers must not be forwarded
        resp_headers = {
            k: v for k, v in resp.headers.items()
            if k.lower() not in ('content-length', 'content-encoding', 'transfer-encoding', 'connection')
        }
        return response.raw(resp.content, status=resp.status_code, headers=resp_headers)
except Exception as e:
logging.error(f"Failed to proxy request to backend: {e}")
return response.json({"error": "Backend service unavailable"}, status=503)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000, workers=4)
This middleware implements a "cache-first" logic. The vast majority of requests should complete their authorization check in milliseconds against the local cache. Only when a record is missing from the cache (e.g., a user's first request through this proxy instance) is a synchronous fallback check triggered. This fallback is a critical graceful-degradation and fault-tolerance mechanism.
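One detail worth making explicit: CACHE_TTL from the configuration above is meant to bound how long a cached decision can be trusted if update events are missed, but the middleware as written never enforces it. A minimal sketch of that check follows, assuming cache timestamps are epoch seconds (as the fallback path stores).
# Sketch: treat cache entries older than CACHE_TTL as misses, so a proxy that
# silently stops receiving events cannot keep honoring a stale "authorized" entry.
import time

def get_fresh_policy(cache_key: str):
    entry = policy_cache.get(cache_key)
    if entry is None:
        return None
    if time.time() - entry["timestamp"] > CACHE_TTL:
        policy_cache.pop(cache_key, None)  # Evict and force the fallback check
        return None
    return entry
The middleware's policy_cache.get(cache_key) lookup would then become get_fresh_policy(cache_key).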
Device Attestation on the iOS Client
On the iOS side, we can't just trust a self-reported device ID. We need a way to verify that the request genuinely comes from a legitimate, untampered device instance. Apple's DeviceCheck framework, specifically its App Attest service, provides the solution.
// file: AttestationService.swift
import Foundation
import DeviceCheck
import CryptoKit
// This is a simplified example to illustrate the core flow.
// Production code requires more robust error handling and key management.
class AttestationService {
static let shared = AttestationService()
func generateAttestationHeader(for userId: String, completion: @escaping (String?) -> Void) {
        let service = DCAppAttestService.shared
        // App Attest is unavailable on some devices (and on the simulator)
        guard service.isSupported else {
            completion(nil)
            return
        }
        // 1. Generate a device-bound key pair. This only needs to happen once;
        //    persist the returned key ID securely (e.g., in the Keychain) and reuse it.
        service.generateKey { (generatedKeyId, error) in
            guard let keyId = generatedKeyId, error == nil else {
                print("Error generating key: \(error!.localizedDescription)")
                completion(nil)
                return
            }
            // 2. Use the user ID and other request data as a challenge
            //    to prevent replay attacks.
            let challengeData = "\(userId)-\(Date().timeIntervalSince1970)".data(using: .utf8)!
            let clientDataHash = Data(SHA256.hash(data: challengeData))
            // 3. Sign the challenge to generate the attestation object.
            service.attestKey(keyId, clientDataHash: clientDataHash) { (attestationObject, error) in
guard let attestation = attestationObject, error == nil else {
print("Error attesting key: \(error!.localizedDescription)")
completion(nil)
return
}
// 4. Base64-encode the attestation object to be sent in a request header.
                // The backend validates this object (certificate chain and challenge) before trusting the device.
let base64Attestation = attestation.base64EncodedString()
completion(base64Attestation)
}
}
}
}
// Example usage:
// var request = URLRequest(url: someURL)
// AttestationService.shared.generateAttestationHeader(for: "user-123") { attestationHeaderValue in
// if let headerValue = attestationHeaderValue {
// request.setValue(headerValue, forHTTPHeaderField: "X-Device-Attestation")
// // ... send the request
// }
// }
This attestation object is then sent to our backend (either the IAM service or a dedicated verification service), which validates it by checking its certificate chain against Apple's App Attest root certificate and verifying the embedded challenge. Only after successful validation can we trust the device's identity. In our architecture, this validation is part of the fallback check.
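To tie this back to the proxy, the fallback path could forward the attestation header to that verification service before trusting the identity. The sketch below is illustrative only: the /verify-attestation endpoint, its request and response shape, and the X-Device-Attestation header name are assumptions, and the cryptographic validation itself lives in that service, not in the proxy.
# Sketch: extending the fallback check to require a verified attestation.
# The verification endpoint below is hypothetical.
ATTESTATION_VERIFY_URL = 'http://localhost:9100/verify-attestation'

async def fallback_auth_check_with_attestation(user_id: str, device_id: str, attestation_b64: str) -> bool:
    if not attestation_b64:
        return False  # No attestation header at all: deny
    try:
        resp = await http_client.post(
            ATTESTATION_VERIFY_URL,
            json={"user_id": user_id, "device_id": device_id, "attestation": attestation_b64},
            timeout=5.0,
        )
        return resp.status_code == 200 and resp.json().get("valid", False)
    except Exception as e:
        logging.error(f"Attestation verification failed: {e}")
        return False  # Fail closed when the verifier is unreachable
The middleware would read the header with request.headers.get('X-Device-Attestation') and pass it through alongside the user and device IDs.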
Building a Secure Proxy Image with Buildah
Zero Trust isn’t just about users and devices; it’s also about the workloads themselves. We must ensure that the container running our Sanic proxy is minimal and trustworthy. Compared to a Dockerfile, Buildah provides a more granular and secure build process because it doesn’t depend on a Docker daemon and makes it easier to build “from scratch” images.
#!/bin/bash
# file: build.sh
set -euo pipefail
IMAGE_NAME="continuous-auth-proxy"
TAG="1.0.0"
FINAL_IMAGE="quay.io/my-org/${IMAGE_NAME}:${TAG}"
# 1. Start from a minimal base image. We use ubi-minimal here.
# For production, consider distroless or scratch.
container=$(buildah from registry.access.redhat.com/ubi8/ubi-minimal)
echo "Working container: ${container}"
# 2. Install Python and dependencies
# Note: commands are run inside the container context
buildah run $container -- microdnf install -y python39 python39-pip shadow-utils  # ubi-minimal ships microdnf; shadow-utils provides groupadd/useradd
buildah run $container -- microdnf clean all
buildah run $container -- python3.9 -m pip install --no-cache-dir sanic httpx pulsar-client
# 3. Copy the application code
# We copy only the necessary files, not the entire working directory
buildah copy $container 'proxy_server.py' '/app/proxy_server.py'
# 4. Create an unprivileged user to run the application
# Split into two runs: a shell '&&' here would execute the second command on the host, not in the container
buildah run $container -- groupadd -r appuser
buildah run $container -- useradd -r -g appuser appuser
buildah run $container -- chown -R appuser:appuser /app
# 5. Configure image metadata
buildah config --workingdir /app $container
buildah config --port 8000 $container
buildah config --user appuser $container
buildah config --entrypoint '["python3.9", "/app/proxy_server.py"]' $container
# 6. Commit the image
buildah commit --squash $container $FINAL_IMAGE
echo "Successfully built ${FINAL_IMAGE}"
# 7. Clean up the working container
buildah rm $container
The value of this script lies in:
- Minimal Dependencies: Only the packages essential for runtime are installed.
- Unprivileged Execution: A dedicated appuser is created and used to run the application, adhering to the principle of least privilege.
- Repeatable and Automated: The entire process is scripted and can be seamlessly integrated into a CI/CD pipeline.
Limitations and Future Iterations
This architecture is not without its trade-offs. First, the local cache in the Sanic proxy is a component that requires careful management. In a multi-instance deployment, each instance maintains its own cache, which can lead to brief periods of policy inconsistency. A viable optimization is to introduce a shared distributed cache like Redis; the Pulsar consumer would write policy updates directly to Redis, and all Sanic instances would read from it. However, this adds complexity and another potential point of failure to the system.
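As a sketch of that direction, assuming a Redis instance on localhost:6379 and the redis Python package (4.2+ bundles asyncio support), the Pulsar consumer would write decisions into Redis and the middleware would read them back. This also pairs well with the Shared subscription: whichever instance consumes an event updates the cache for all of them.
# Sketch: a Redis-backed policy cache shared by every proxy instance.
# Assumes `pip install redis` and a Redis server on localhost; the key layout
# mirrors the in-memory policy_cache above.
import json
import redis.asyncio as aioredis

redis_cache = aioredis.Redis(host='localhost', port=6379, decode_responses=True)

async def write_policy(user_id: str, device_id: str, authorized: bool, timestamp: float):
    # Called from the Pulsar consumer instead of mutating the in-memory policy_cache
    await redis_cache.set(
        f"policy:{user_id}:{device_id}",
        json.dumps({"authorized": authorized, "timestamp": timestamp}),
        ex=CACHE_TTL,  # Expire entries so a missed revocation cannot grant access forever
    )

async def read_policy(user_id: str, device_id: str):
    # Called from the middleware instead of policy_cache.get(...)
    raw = await redis_cache.get(f"policy:{user_id}:{device_id}")
    return json.loads(raw) if raw else None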
Second, the policy expression capability is currently very simple—a boolean “allow/deny.” More complex scenarios may require Attribute-Based Access Control (ABAC). Integrating a policy engine like Open Policy Agent (OPA) would be a natural next step. The Sanic proxy could pass the request context to an OPA sidecar for a decision, while Pulsar would be responsible for distributing policy and data updates to OPA.
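A minimal sketch of that integration, assuming an OPA sidecar on localhost:8181 with a policy package named authz exposing an allow rule (the package name and input shape are illustrative), using OPA's Data API:
# Sketch: delegating the decision to an OPA sidecar via its Data API.
OPA_DECISION_URL = 'http://localhost:8181/v1/data/authz/allow'

async def opa_authorize(user_id: str, device_id: str, method: str, path: str) -> bool:
    payload = {
        "input": {
            "user_id": user_id,
            "device_id": device_id,
            "method": method,
            "path": path,
        }
    }
    try:
        resp = await http_client.post(OPA_DECISION_URL, json=payload, timeout=2.0)
        return resp.status_code == 200 and resp.json().get("result", False)
    except Exception as e:
        logging.error(f"OPA decision request failed: {e}")
        return False  # Fail closed if the policy engine is unreachable
In that setup, the Pulsar events would feed OPA's data documents rather than the proxy's local cache, keeping the decision logic in one place.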
Finally, assessing device health and generating corresponding events is a complex domain in itself. It requires deep integration with MDM (Mobile Device Management) systems and endpoint security software to gather signals like whether a device is jailbroken, if its OS is outdated, or if malicious apps are present. Our current solution only defines the consumption side of these events; implementing the production side is another significant engineering challenge.