Pipeline Design
Design Philosophy: Rich Hickey’s “Simple Made Easy”
Core Principles
The pipeline is a linear composition of simple functions. Each stage:
- Takes plain data in, returns plain data out
- Is connected to the next stage via a queue (not a direct call)
- Has no shared mutable state with any other stage
- Returns errors as data (a Result/Either shape), never raises exceptions
- Can be tested, deployed, and scaled in complete isolation
- Receives only the keys it needs — the runner selects from the cumulative context
The queue between stages is the contract. The producer does not know who consumes. The consumer does not know who produced. This is the essence of decoupling order from processing.
Data Shape Conventions
All data flowing through the pipeline is plain values — no objects, no methods, no hidden state.
Runner Context and Stage Payloads
The runner maintains a cumulative context dict that accretes results as the message travels through the pipeline. Each stage declares which keys it requires (input) and which keys it produces (output). The runner selects the required keys from the context to build the stage’s payload, and merges the stage’s output back into the context.
This separation prevents stages from depending on keys they should not see. A stage cannot accidentally couple to upstream data it has no business reading.
```python
# Runner context (managed by the runner, invisible to stages)
{
    "trace_id": str,            # UUID generated by the inbound adapter
    "operation": str,           # "SEARCH" | "VERIFY" | "ENROL" | "DELETE"
    "subject_id": str | None,
    "partition": str,
    "filters": dict,
    "top_k": int,
    "threshold": float | None,
    "received_at": str,
    # Accumulated stage results (accreted, never overwritten):
    "image_bytes": bytes | None,
    "pad": dict | None,
    "morphing": dict | None,
    "detections": list | None,
    "crop": bytes | None,
    "quality": dict | None,
    "template": dict | None,
    "result": dict | None,
}
```

`trace_id`: A UUID generated by the inbound adapter (not by Receive). The runner propagates it unchanged across all stage boundaries. No stage function reads, writes, or forwards it — this is the runner’s responsibility.
Result Envelope
When a stage succeeds, it returns a plain dict containing its produced keys. When a stage fails, it returns an error-as-data envelope:
```python
# Success — stage returns a plain dict of its produced keys
<stage-output-dict>

# Failure (error as data, not exception)
{"ok": False, "error": {"code": str, "reason": str, "stage": str}}
```

- The runner checks for error envelopes before calling the next stage. Stages never see `{"ok": False}` payloads. When the runner receives an error envelope from a stage, it short-circuits the remaining pipeline and routes the context directly to the Respond queue. This means stage functions never contain error pass-through logic — they can assume their input is always valid stage data.
- The runner attaches `trace_id` to the context. The failure dict never contains `trace_id` — it is always on the outer context managed by the runner.
Operation Type
Operations are plain string discriminants, not polymorphic types:
```python
"operation": "SEARCH" | "VERIFY" | "ENROL" | "DELETE"
```
Pipeline Dict Schemas
Each stage boundary has a typed contract. Stages declare their requirements and productions via `requires` and `produces` on the Stage dataclass. For documentation and runtime validation, the schemas are also expressed as TypedDicts:
```python
from typing import TypedDict

# --- Receive output ---
class ReceiveOut(TypedDict):
    operation: str
    image_bytes: bytes | None
    subject_id: str | None
    partition: str
    filters: dict
    top_k: int
    threshold: float | None
    received_at: str

# --- PAD output (what PAD produces, merged into context) ---
class PadOut(TypedDict):
    pad: dict  # {"spoof_score": float, "attack_type": str, "confidence": float}

# --- MAD output ---
class MadOut(TypedDict):
    morphing: dict  # {"morph_score": float, "confidence": float}

# --- Detect output ---
class DetectOut(TypedDict):
    detections: list  # [{bbox, score, landmarks}, ...]

# --- Align output ---
class AlignOut(TypedDict):
    crop: bytes  # 112x112 aligned face crop

# --- Quality output ---
class QualityOut(TypedDict):
    quality: dict  # {"score": float, "blur": float, ...}

# --- Extract output ---
class ExtractOut(TypedDict):
    template: dict  # {"vector": [float32 x 512], "model_id": str}

# --- Executor outputs ---
class SearchResultOut(TypedDict):
    operation: str
    result: dict

class VerifyResultOut(TypedDict):
    operation: str
    result: dict

class EnrolResultOut(TypedDict):
    operation: str
    result: dict

class DeleteResultOut(TypedDict):
    operation: str
    result: dict

# --- Respond output ---
class RespondOut(TypedDict):
    http_status: int
    response_body: bytes
    content_type: str
    latency_ms: float
```

The runner uses `requires` and `produces` on each Stage to select and merge keys. The TypedDicts above are the reference contracts — they can also serve as runtime validators in development mode (e.g., via TypeAdapter from pydantic or a simple key-set assertion).
Accretion, not replacement: Each stage adds keys to the context. No stage overwrites another stage’s keys (with the deliberate exception of image_bytes being dropped by Align). The context grows monotonically.
Accretion Policy
Stages may add new keys to their produced dicts but must never rename or remove existing keys. This follows the “provide more, require less” principle: downstream consumers that depend on a key will always find it, and new keys are invisible to stages that do not declare them in `requires`. Adding a key is a compatible change; renaming or removing is a breaking change.
Schema Derivation
The TypedDict schemas above are maintained manually for documentation, but in implementation the canonical source of truth is the `requires` and `produces` declarations on each Stage. A build-time or startup-time check can derive the expected key sets from the Stage list and validate that the TypedDicts remain consistent, eliminating redundant specification drift.
Stage Definitions
Stage 1: Receive
Purpose: Parse the raw inbound XML payload into plain Python values. All XML parsing complexity is contained here and nowhere else. The Receive stage does not generate `trace_id` — the inbound adapter or runner does that before the message enters the pipeline.
Requires: raw_payload, source_ip, received_at
Produces: operation, image_bytes, subject_id, partition, filters, top_k, threshold, received_at
Input (from inbound adapter / HTTP POST body):
```python
{
    "raw_payload": bytes,  # Raw inbound XML body
    "source_ip": str,      # For audit log only
    "received_at": str     # ISO-8601 timestamp (UTC)
}
```
Output (on success):
```python
{
    "operation": str,             # "SEARCH" | "VERIFY" | "ENROL" | "DELETE"
    "image_bytes": bytes | None,  # Raw JPEG/PNG, present for SEARCH/VERIFY/ENROL
    "subject_id": str | None,     # Present for VERIFY/DELETE
    "partition": str,             # Gallery partition ID e.g. "IABS", "IDENT1"
    "filters": dict,              # Metadata filters: {"gender": "M", ...} or {}
    "top_k": int,                 # For SEARCH: max candidates (default 50)
    "threshold": float | None,    # Optional override; None means use system default
    "received_at": str            # Propagated from input
}
```
The inbound adapter generates a `trace_id` UUID and places it on the runner context before the message reaches Receive. Receive is purely XML parsing — it has no transport concerns.
Output (on failure):
```python
{
    "ok": False,
    "error": {
        "code": "PARSE_ERROR" | "SCHEMA_VIOLATION" | "UNSUPPORTED_OPERATION",
        "reason": str,
        "stage": "receive"
    }
}
```
Error handling:
- Malformed XML -> `PARSE_ERROR`
- Missing required fields -> `SCHEMA_VIOLATION`
- Unknown operation type -> `UNSUPPORTED_OPERATION`
- No exception propagates beyond this function boundary
Routing (embedded in Stage):
- Successes: the Stage’s gate routes `DELETE` operations directly to the Route queue (bypassing PAD/MAD/Detect/Align/Quality/Extract) and all other operations to the PAD queue
- Failures: the runner short-circuits to the Respond queue
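The error-as-data discipline of Receive can be sketched with the standard-library XML parser. This is a hedged illustration, not the production parser: the element names (`<Operation>`, `<Image>`) are invented for the sketch — the real tag names live in the XML contract — and every failure mode maps to one of the error codes above instead of raising.

```python
import base64
import xml.etree.ElementTree as ET

SUPPORTED = {"SEARCH", "VERIFY", "ENROL", "DELETE"}


def parse_inbound(raw_payload: bytes) -> dict:
    """Receive-stage core: every failure mode becomes an error envelope.
    No exception propagates beyond this function boundary."""
    try:
        root = ET.fromstring(raw_payload)
    except ET.ParseError as exc:
        return {"ok": False, "error": {"code": "PARSE_ERROR",
                                       "reason": str(exc), "stage": "receive"}}
    operation = root.findtext("Operation")  # Illustrative tag name
    if operation is None:
        return {"ok": False, "error": {"code": "SCHEMA_VIOLATION",
                                       "reason": "missing <Operation>", "stage": "receive"}}
    if operation not in SUPPORTED:
        return {"ok": False, "error": {"code": "UNSUPPORTED_OPERATION",
                                       "reason": operation, "stage": "receive"}}
    image_b64 = root.findtext("Image")  # Illustrative tag name
    return {
        "operation": operation,
        "image_bytes": base64.b64decode(image_b64) if image_b64 else None,
    }
```

Note that the caller never needs a try/except: success and failure are both plain dicts, distinguished by the `"ok": False` marker.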
Stage 1.5: PAD (Presentation Attack Detection)
Purpose: Run unified physical+digital attack detection on the raw image before any other processing. Produces a spoof score as data. Does not make the rejection decision — that is a configurable gate on the Stage, applied by the runner after this function returns.
Covers: print attacks, replay attacks, 3D masks, deepfakes, face swaps, GAN-generated faces. Uses a ViT-based unified model (e.g., S-Adapter or UniAttack approach) exported via ONNX.
Requires: image_bytes
Produces: pad
Input (payload selected by runner):
```python
{
    "image_bytes": bytes  # Raw image
}
```
Output (on success):
```python
{
    "pad": {
        "spoof_score": float,  # [0.0, 1.0]; higher = more likely spoof
        "attack_type": str,    # e.g. "print", "replay", "deepfake", "live"
        "confidence": float    # Model confidence in classification [0.0, 1.0]
    }
}
```
Output (on failure):
```python
{
    "ok": False,
    "error": {
        "code": "PAD_INFERENCE_ERROR",
        "reason": str,
        "stage": "pad"
    }
}
```
Error handling:
- ONNX Runtime fault -> `PAD_INFERENCE_ERROR`
- The stage function always returns the spoof score as data if inference succeeds
- `PAD_REJECTED` is emitted by the gate on the PAD Stage (see below), not by the stage function itself
Gate (configurable policy, embedded in Stage):
The gate is a pure routing predicate applied by the runner after the stage function returns. It reads from the runner context (which now includes the PAD result):
```python
from functools import partial

def pad_gate(ctx: dict, *, spoof_score_threshold: float) -> str:
    """Single-concern predicate: spoof score above threshold?
    Config injected via partial. Does NOT inspect operation type."""
    if ctx.get("pad", {}).get("spoof_score", 0.0) > spoof_score_threshold:
        return "respond_q"  # Runner emits PAD_REJECTED error envelope
    return "enrol_router_q"

# Construction: config values bound at startup, not closed over
pad_gate_configured = partial(pad_gate, spoof_score_threshold=0.85)
```
The threshold is injected via `functools.partial` at construction time. No global config dict is captured in a closure. Changing the threshold requires only changing the construction site. The threshold parameter is keyword-only so the partially applied gate can still be called as `gate(ctx)` without a positional-argument collision.
After PAD passes, the context is routed to a separate enrol_router stage that handles operation-type branching. This keeps pad_gate as a single-concern predicate (spoof detection) without entangling it with operation routing logic.
```python
def enrol_router(ctx: dict) -> str:
    """Routes ENROL operations to MAD, all others to Detect."""
    if ctx["operation"] == "ENROL":
        return "mad_q"
    return "detect_q"
```
Routing (PAD + Enrol Router combined):
- Spoof score > threshold -> Respond queue (runner wraps as `PAD_REJECTED` error envelope)
- Spoof score <= threshold, ENROL operation -> MAD queue (via enrol_router)
- Spoof score <= threshold, all other operations -> Detect queue (via enrol_router)
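Because gates are pure functions of the context, they can be unit-tested with no runner, queue, or model in sight. A minimal sketch — the gate is redefined inline for self-containment, with the threshold as a keyword-only parameter so the partial-bound gate is callable as `gate(ctx)`:

```python
from functools import partial


def pad_gate(ctx: dict, *, spoof_score_threshold: float) -> str:
    """Pure routing predicate: context in, queue name out."""
    if ctx.get("pad", {}).get("spoof_score", 0.0) > spoof_score_threshold:
        return "respond_q"
    return "enrol_router_q"


# Bind the policy value exactly as the pipeline construction site does
gate = partial(pad_gate, spoof_score_threshold=0.85)
```

Three asserts cover the whole decision surface: clear spoof, clear live, and a defensively handled missing key.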
Stage 1.6: MAD (Morphing Attack Detection)
Purpose: Detect morphing attacks in enrollment images. Morphing attacks blend two identities into a single passport/ID photo; they are the primary document-based attack vector in border control contexts. This stage runs only for ENROL operations.
Uses a CLIP+LoRA foundation model approach (e.g., MADation) for strong cross-domain generalisation. Exported via ONNX.
Requires: image_bytes
Produces: morphing
Input (payload selected by runner):
```python
{
    "image_bytes": bytes
}
```
Output (on success):
```python
{
    "morphing": {
        "morph_score": float,  # [0.0, 1.0]; higher = more likely morphed
        "confidence": float    # Model confidence [0.0, 1.0]
    }
}
```
Output (on failure):
```python
{
    "ok": False,
    "error": {
        "code": "MAD_INFERENCE_ERROR",
        "reason": str,
        "stage": "mad"
    }
}
```
Error handling:
- ONNX Runtime fault -> `MAD_INFERENCE_ERROR`
- `MORPHING_DETECTED` is emitted by the gate on the MAD Stage, not by the stage function itself
- The stage function always returns the morph score as data if inference succeeds
Gate (configurable policy, embedded in Stage):
```python
from functools import partial

def mad_gate(ctx: dict, *, morph_score_threshold: float) -> str:
    if ctx.get("morphing", {}).get("morph_score", 0.0) > morph_score_threshold:
        return "respond_q"  # Runner emits MORPHING_DETECTED error envelope
    return "detect_q"

mad_gate_configured = partial(mad_gate, morph_score_threshold=0.75)
```
Routing:
- Morph score <= threshold -> Detect queue
- Morph score > threshold -> Respond queue (runner wraps as `MORPHING_DETECTED` error envelope)
Stage 2: Detect
Purpose: Run face detection on the raw image via DetectionPort.detect(). Produce face bounding boxes, five-point landmarks, and confidence scores. Nothing more.
Note: SCRFD_10G (InsightFace) replaces RetinaFace as the detection model. It achieves better accuracy on the WiderFace Hard split (~1% gain on occluded/small faces) using the same ONNX export path and 5-point landmark format. The DetectionPort adapter wraps the ONNX model; the stage function calls DetectionPort.detect(image_bytes) and translates the returned list[Detection] domain types into pipeline dicts.
Requires: image_bytes
Produces: detections
Input (payload selected by runner):
```python
{
    "image_bytes": bytes  # Raw image data
}
```
Output (on success):
```python
{
    "detections": [                    # List of detected faces, ordered by confidence desc
        {
            "bbox": [x1, y1, x2, y2],  # Normalised to [0, 1], float32
            "score": float,            # Detection confidence [0.0, 1.0]
            "landmarks": [             # Five landmarks: left_eye, right_eye, nose,
                [x, y],                #   mouth_left, mouth_right
                [x, y],
                [x, y],
                [x, y],
                [x, y]
            ]
        }
    ]
}
```
Output (on failure):
```python
{
    "ok": False,
    "error": {
        "code": "NO_FACE_DETECTED" | "MULTIPLE_FACES" | "INFERENCE_ERROR",
        "reason": str,
        "stage": "detect"
    }
}
```
Error handling:
- Zero faces found -> `NO_FACE_DETECTED`
- More than one face above confidence threshold (configurable, default 0.85) -> `MULTIPLE_FACES` (policy: reject ambiguous inputs; the caller must provide a single-face image)
- ONNX Runtime fault -> `INFERENCE_ERROR`
Routing: Detect -> Align queue (fixed queue_out)
Scaling note: This stage is CPU/GPU-bound. Scale horizontally by adding more Detect workers consuming from the same queue.
Stage 3: Align
Purpose: Apply an affine similarity transform to produce a single, normalised 112x112 pixel crop. All spatial variation is eliminated here.
Requires: image_bytes, detections
Produces: crop
Input (payload selected by runner):
```python
{
    "image_bytes": bytes,
    "detections": [ ... ]  # From Detect stage
}
```
Output (on success):
```python
{
    "crop": bytes  # PNG-encoded 112x112 aligned face crop
}
```
After Align produces its output, the runner drops `image_bytes` from the context. This enforces the “no raw imagery persisted” policy structurally, not procedurally. Only the 112x112 crop continues downstream.
Output (on failure):
```python
{
    "ok": False,
    "error": {
        "code": "ALIGNMENT_FAILED",
        "reason": str,
        "stage": "align"
    }
}
```
Error handling:
- Degenerate landmark geometry (collinear points, out-of-bounds) -> `ALIGNMENT_FAILED`
- The function selects the highest-confidence detection from the `detections` list. Multi-face rejection is already handled by Detect.
Privacy note: image_bytes (the full original image) is explicitly dropped from the context at this stage boundary. The runner handles this via a drops declaration on the Stage dataclass.
Routing: Align -> Quality queue (fixed queue_out)
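The similarity transform at the heart of Align can be sketched as a linear least-squares fit of scale, rotation, and translation from the five detected landmarks onto a fixed destination template. This is an illustrative numpy-only sketch, assuming the widely used ArcFace 112x112 reference landmark coordinates; the production stage would follow it with the actual warp of the image.

```python
import numpy as np

# Canonical 5-point destination template for 112x112 ArcFace-style crops
# (left_eye, right_eye, nose, mouth_left, mouth_right).
ARCFACE_DST = np.array([
    [38.2946, 51.6963], [73.5318, 51.5014], [56.0252, 71.7366],
    [41.5493, 92.3655], [70.7299, 92.2041]], dtype=np.float64)


def estimate_similarity(src: np.ndarray, dst: np.ndarray = ARCFACE_DST) -> np.ndarray:
    """Least-squares similarity transform (scale+rotation+translation)
    mapping src landmarks onto dst. Returns a 2x3 affine matrix."""
    # Parametrise the transform as [[a, -b, tx], [b, a, ty]] and solve
    # the resulting linear system over all landmark coordinates.
    n = src.shape[0]
    A = np.zeros((2 * n, 4))
    b = np.zeros(2 * n)
    A[0::2] = np.column_stack([src[:, 0], -src[:, 1], np.ones(n), np.zeros(n)])
    A[1::2] = np.column_stack([src[:, 1], src[:, 0], np.zeros(n), np.ones(n)])
    b[0::2] = dst[:, 0]
    b[1::2] = dst[:, 1]
    a_, b_, tx, ty = np.linalg.lstsq(A, b, rcond=None)[0]
    return np.array([[a_, -b_, tx], [b_, a_, ty]])
```

A degenerate landmark set (e.g., collinear points) yields an ill-conditioned system; in the real stage that condition maps to the `ALIGNMENT_FAILED` envelope.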
Stage 4: Assess Quality
Purpose: Score the aligned crop for biometric quality. The stage produces a quality score as data. The rejection decision is made by the gate on this Stage, not by the stage function itself.
Reference implementation: OFIQ (Open Source Face Image Quality), the BSI/eu-LISA reference implementation for ISO/IEC 29794-5. OFIQ is the mandatory quality standard for European border control and identity systems. It is a pure measurement function that satisfies the QualityPort protocol.
Requires: crop
Produces: quality
Input (payload selected by runner):
```python
{
    "crop": bytes  # 112x112 aligned face crop
}
```
Output (on success):
```python
{
    "quality": {
        "score": float,         # Overall quality [0.0, 1.0] per ISO 29794-5
        "blur": float,          # Component scores for diagnostics
        "illumination": float,
        "pose_yaw": float,      # Degrees
        "pose_pitch": float,
        "pose_roll": float
    }
}
```
Output (on failure):
```python
{
    "ok": False,
    "error": {
        "code": "INFERENCE_ERROR",
        "reason": str,
        "stage": "quality"
    }
}
```
Error handling:
- ONNX Runtime fault -> `INFERENCE_ERROR`
- Quality rejection (`QUALITY_REJECTED`) is handled by the gate, not the stage function
Gate (configurable policy, embedded in Stage):
```python
from functools import partial

def quality_gate(ctx: dict, *, min_score: float) -> str:
    if ctx.get("quality", {}).get("score", 0.0) < min_score:
        return "respond_q"  # Runner emits QUALITY_REJECTED error envelope
    return "extract_q"

quality_gate_configured = partial(quality_gate, min_score=0.40)
```
When the gate routes to respond_q, the runner constructs the error envelope including the full quality dict for diagnostics:
```python
{
    "ok": False,
    "error": {
        "code": "QUALITY_REJECTED",
        "reason": str,       # e.g. "blur=0.12 below threshold=0.40"
        "stage": "quality",
        "quality": { ... }   # Full quality dict included for diagnostics/feedback
    }
}
```
Routing:
- Quality score >= threshold -> Extract queue
- Quality score < threshold -> Respond queue (as `QUALITY_REJECTED` error envelope)
Stage 5: Extract
Purpose: Run the ViT model on the aligned crop. Produce the 512-dimensional, L2-normalised float32 template vector. This is the most compute-intensive stage.
Requires: crop
Produces: template
Input (payload selected by runner):
```python
{
    "crop": bytes  # 112x112 aligned face crop
}
```
Output (on success):
```python
{
    "template": {
        "vector": [float32 x 512],  # L2-normalised; norm == 1.0 is an invariant
        "model_id": str             # e.g. "vit-base-adaface-v1.0"
    }
}
```
After Extract, the runner drops `crop` from the context. No raw imagery remains.
Output (on failure):
```python
{
    "ok": False,
    "error": {
        "code": "EXTRACTION_FAILED" | "INFERENCE_ERROR",
        "reason": str,
        "stage": "extract"
    }
}
```
Error handling:
- Output vector norm deviates from 1.0 by more than 1e-5 -> `EXTRACTION_FAILED` (invariant violation; the function self-validates its output)
- ONNX Runtime fault -> `INFERENCE_ERROR`
- `model_id` is stamped on every template so future schema migrations are traceable
Routing: Extract -> route_q (fixed queue_out)
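The norm self-validation described above is a few lines: measure the L2 norm, and return either the template dict or an `EXTRACTION_FAILED` envelope. A minimal sketch — the helper name `validate_template` is illustrative:

```python
import math


def validate_template(vector: list[float], *, model_id: str,
                      tolerance: float = 1e-5) -> dict:
    """Self-validate the L2-norm invariant; return error-as-data on violation."""
    norm = math.sqrt(sum(v * v for v in vector))
    if abs(norm - 1.0) > tolerance:
        return {"ok": False, "error": {
            "code": "EXTRACTION_FAILED",
            "reason": f"vector norm {norm:.6f} deviates from 1.0 by more than {tolerance}",
            "stage": "extract",
        }}
    # Stamp model_id so future schema migrations remain traceable
    return {"template": {"vector": vector, "model_id": model_id}}
```

Because the invariant check lives inside the stage, no downstream consumer ever has to re-verify normalisation.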
Stage 6a: Route (Operation Router)
Purpose: A pure routing gate embedded in the Stage dataclass. It inspects the operation discriminant and returns the target queue name. It does not execute any FAISS operations — it only decides where the message goes next. This is the pipeline’s branching point into operation-specific executor stages.
DELETE operations arrive here directly from Receive (bypassing the image pipeline). SEARCH, VERIFY, and ENROL arrive after extraction.
The router is implemented as a gate on the Stage:
```python
def operation_router(ctx: dict) -> str:
    """Pure routing function. Returns the name of the target queue."""
    return {
        "SEARCH": "search_q",
        "VERIFY": "verify_q",
        "ENROL": "enrol_q",
        "DELETE": "delete_q",
    }[ctx["operation"]]
```
Routing: route_q -> search_q | verify_q | enrol_q | delete_q
Stage 6b: Search Executor
Purpose: Execute a 1:N FAISS search. Vector in, ranked candidates out. This is the boundary where pipeline dicts become domain types (see “Translation Boundary” below).
Requires: operation, partition, filters, top_k, threshold, template
Produces: operation, result
Input (payload selected by runner):
```python
{
    "operation": "SEARCH",
    "partition": str,
    "filters": dict,
    "top_k": int,
    "threshold": float | None,
    "template": { "vector": [...], "model_id": str }
}
```
Output (success):
```python
{
    "operation": "SEARCH",
    "result": {
        "candidates": [
            {
                "subject_id": str,
                "score": float,  # Inner product similarity [0.0, 1.0]
                "rank": int      # 1-indexed
            }
        ],
        "searched_partition": str,
        "filters_applied": dict,
        "shards_queried": int,
        "latency_ms": float
    }
}
```
Error codes: SHARD_UNAVAILABLE | FAISS_ERROR
Routing: search_q -> respond_q (fixed queue_out)
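The ranking semantics of the Search executor — inner product over L2-normalised vectors, descending score, 1-indexed ranks — can be sketched in plain Python. This is an illustrative sketch of the candidate-shape contract, not the production path, which goes through the FAISS port:

```python
def rank_candidates(probe: list[float],
                    gallery: dict[str, list[float]],
                    top_k: int) -> list[dict]:
    """1:N inner-product ranking over L2-normalised vectors.
    Mirrors the candidate shape in the Search result."""
    scored = sorted(
        ((sum(p * g for p, g in zip(probe, vec)), sid)
         for sid, vec in gallery.items()),
        reverse=True,  # Highest similarity first
    )
    return [
        {"subject_id": sid, "score": score, "rank": rank}
        for rank, (score, sid) in enumerate(scored[:top_k], start=1)
    ]
```

For unit vectors the inner product equals cosine similarity, which is why the L2-norm invariant on templates matters to every consumer downstream of Extract.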
Stage 6c: Verify Executor
Purpose: Execute a 1:1 FAISS comparison. Probe template vs enrolled template for a given subject_id. Returns the score and a threshold-based decision as data.
Requires: operation, subject_id, partition, threshold, template
Produces: operation, result
Input (payload selected by runner):
```python
{
    "operation": "VERIFY",
    "subject_id": str,
    "partition": str,
    "threshold": float | None,
    "template": { "vector": [...], "model_id": str }
}
```
Output (success):
```python
{
    "operation": "VERIFY",
    "result": {
        "subject_id": str,
        "score": float,
        "decision": "MATCH" | "NON_MATCH",  # Applied threshold decision as data
        "threshold_used": float
    }
}
```
Error codes: SUBJECT_NOT_FOUND | SHARD_UNAVAILABLE | FAISS_ERROR
Routing: verify_q -> respond_q (fixed queue_out)
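The MATCH/NON_MATCH decision is plain data derived from the score and the threshold actually applied, so downstream consumers can audit exactly how the decision was made. A minimal sketch of the result construction (the helper name is illustrative):

```python
def verify_decision(subject_id: str, score: float, threshold_used: float) -> dict:
    """Build the Verify result dict: the decision is data, not a hidden branch,
    and the threshold that produced it travels with the result."""
    return {
        "subject_id": subject_id,
        "score": score,
        "decision": "MATCH" if score >= threshold_used else "NON_MATCH",
        "threshold_used": threshold_used,
    }
```

Carrying `threshold_used` alongside the decision means a logged result can always be re-derived and audited, even after the system default threshold changes.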
Stage 6d: Enrol Executor
Purpose: Add a new template vector to the FAISS index. Returns the assigned shard and vector index.
Requires: operation, subject_id, partition, template
Produces: operation, result
Input (payload selected by runner):
```python
{
    "operation": "ENROL",
    "subject_id": str | None,
    "partition": str,
    "template": { "vector": [...], "model_id": str }
}
```
Output (success):
```python
{
    "operation": "ENROL",
    "result": {
        "subject_id": str,    # Assigned or confirmed
        "shard_id": str,      # Which shard holds this vector
        "vector_index": int   # FAISS internal ID, for Delete
    }
}
```
Error codes: DUPLICATE_ENROL | SHARD_UNAVAILABLE | FAISS_ERROR
Routing: enrol_q -> respond_q (fixed queue_out)
Stage 6e: Delete Executor
Purpose: Remove a subject’s template from the FAISS index. Arrives directly from Receive (no image processing needed).
Requires: operation, subject_id, partition
Produces: operation, result
Input (payload selected by runner):
```python
{
    "operation": "DELETE",
    "subject_id": str,
    "partition": str
}
```
Output (success):
```python
{
    "operation": "DELETE",
    "result": {
        "subject_id": str,
        "deleted": True
    }
}
```
Error codes: SUBJECT_NOT_FOUND | FAISS_ERROR
Routing: delete_q -> respond_q (fixed queue_out)
Shared executor error handling:
- SEARCH: if fewer than `min_shards` (configurable) respond within timeout -> `SHARD_UNAVAILABLE` (partial results are not surfaced; a degraded answer is worse than an honest error in a law enforcement context)
- DELETE for unknown `subject_id` -> `SUBJECT_NOT_FOUND`
- ENROL for already-present `subject_id` -> `DUPLICATE_ENROL` (idempotent enrol is a separate operation flag, not the default)
All executor error outputs follow the standard envelope:
```python
{
    "ok": False,
    "error": {
        "code": str,
        "reason": str,
        "stage": "search" | "verify" | "enrol" | "delete"
    }
}
```
Stage 7: Respond
Purpose: Format the plain result dict back into the outbound XML response format. All serialisation complexity is contained here and nowhere else.
Requires: operation, received_at (always present); result, error, ok (present depending on success/error path)
Produces: http_status, response_body, content_type, latency_ms
The Respond stage handles both success and error paths. On the success path, `result` is present and `ok`/`error` are absent. On the error path (runner short-circuit or gate rejection), `ok` is False and `error` contains the error envelope. The runner selects only the keys that exist in the context (the `if k in ctx` guard in payload construction), so Respond receives whichever subset is available.
The runner injects trace_id into the Respond stage’s payload via the inject_envelope_keys declaration on the Stage dataclass. This is the only stage boundary where envelope keys cross into the payload — and it is explicit in the pipeline definition, not hidden in runner logic.
Input (payload selected by runner, with trace_id injected):
```python
{
    "trace_id": str,        # Injected by runner via inject_envelope_keys
    "operation": str,       # Always present
    "received_at": str,     # Always present, for latency calculation
    "ok": bool | None,      # Present on error path (False); absent on success
    "result": dict | None,  # Present on success path
    "error": dict | None    # Present on error path
}
```
The Respond function checks `ok` to decide between success and error response construction:
```python
async def respond(payload: dict) -> dict:
    if payload.get("ok") is False:
        # Error path: format error response from payload["error"]
        return build_error_response(payload)
    else:
        # Success path: format result response from payload["result"]
        return build_success_response(payload)
```
Output:
```python
{
    "http_status": int,      # 200 | 400 | 422 | 500 | 503
    "response_body": bytes,  # XML-encoded response (includes trace_id)
    "content_type": str,     # "application/xml"
    "latency_ms": float      # End-to-end: received_at -> now
}
```
Error handling:
- All failure envelopes from any prior stage are mapped to appropriate error response codes here
- Serialisation fault -> 500 with a minimal valid error XML
Routing: Respond -> output (HTTP response / gRPC reply)
This stage is the only place that knows about the XML response format. No other stage touches serialisation.
Queue Topology
```text
Inbound (adapter generates trace_id, builds initial context)
  |
  v
[receive_q] --> receive --> gate --> (DELETE) ----------------------> [route_q]
                                 \-> (others) --> [pad_q]
                                                     |
                                                     v
              pad --> gate --> (PAD_REJECTED) ----> [respond_q]
                           \-> (passed) ----------> [enrol_router_q]
                                                     |
                                                     v
              enrol_router --> (ENROL)  --> [mad_q]
                           \-> (others) --> [detect_q]
                                                     |
  [mad_q] (ENROL only)                               |
     |                                               |
     v                                               |
    mad --> gate --> (MORPHING_DETECTED) --> [respond_q]
                 \-> (clean) ---------------> [detect_q]
                                                     |
                                                     v
              detect (fixed -> align_q)
                 |
                 v
              align (fixed -> quality_q)
                 |
                 v
              quality --> gate --> (REJECTED) --> [respond_q]
                              \-> (accepted) ---> [extract_q]
                                                     |
                                                     v
              extract (fixed -> route_q)
                 |
                 v
[route_q] --> gate --> [search_q] --> search_executor (fixed -> respond_q)
                  \--> [verify_q] --> verify_executor (fixed -> respond_q)
                  \--> [enrol_q]  --> enrol_executor  (fixed -> respond_q)
                  \--> [delete_q] --> delete_executor (fixed -> respond_q)

[respond_q] --> respond (runner injects trace_id) --> Outbound (HTTP/gRPC)

Runner short-circuit: any stage error envelope --> [respond_q] (no further stages called)
```
Queue implementation:
- In-process: `asyncio.Queue` (single-host development / small deployments)
- Production: Redis Streams or Kafka topics (one topic per stage boundary)
- The queue abstraction is a port. Swapping implementations requires no changes to any stage function.
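The queue port can be sketched as a small protocol plus the development-mode `asyncio.Queue` implementation. The names `QueuePort` and `InProcessQueues` are illustrative, not from the source; the point is that stage functions never see either class — only the runner does.

```python
import asyncio
from typing import Protocol


class QueuePort(Protocol):
    """The contract between runner and queue backend."""
    async def put(self, queue_name: str, ctx: dict) -> None: ...
    async def get(self, queue_name: str) -> dict: ...


class InProcessQueues:
    """asyncio.Queue-backed implementation: one bounded queue per stage boundary.
    A full queue makes put() await, which is exactly the backpressure behaviour
    described in the next section."""

    def __init__(self, sizes: dict[str, int]):
        self._queues = {name: asyncio.Queue(maxsize=n) for name, n in sizes.items()}

    async def put(self, queue_name: str, ctx: dict) -> None:
        await self._queues[queue_name].put(ctx)  # Suspends when full

    async def get(self, queue_name: str) -> dict:
        return await self._queues[queue_name].get()
```

A Redis Streams or Kafka adapter would satisfy the same two-method protocol, which is why swapping backends touches no stage function.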
Backpressure
Extract is the pipeline bottleneck at ~120ms p99. At 1,900 searches/min (~32/sec), each queue must absorb burst traffic without unbounded growth. Bounded queues enforce backpressure: producers block when the downstream queue is full, propagating load signals upstream to the inbound adapter.
Queue size defaults:
| Queue | maxsize | Rationale |
|---|---|---|
| receive_q | 128 | Inbound buffer; sized for ~4s burst at peak rate |
| pad_q | 64 | PAD at 30ms can drain ~33/sec; 2s buffer |
| mad_q | 16 | ENROL-only path; low traffic |
| detect_q | 64 | Detect at 80ms can drain ~12/sec; 5s buffer |
| align_q | 64 | Align at 2ms is never the bottleneck |
| quality_q | 64 | Quality at 15ms drains fast |
| extract_q | 32 | Bottleneck stage; smaller buffer to propagate backpressure quickly |
| enrol_router_q | 64 | Pure routing after PAD, near-instant |
| route_q | 64 | Pure routing, near-instant |
| search_q | 32 | FAISS search at 50ms; ~1s buffer |
| verify_q | 32 | Similar to search |
| enrol_q | 16 | Lower volume than search |
| delete_q | 16 | Lowest volume path |
| respond_q | 128 | Must not drop responses; generous buffer |
Saturation behavior:
- Development (`asyncio.Queue`): `asyncio.Queue(maxsize=N)` — `put()` awaits (non-blocking coroutine yield) when the queue is full. The producing stage’s runner loop naturally pauses until space is available.
- Production (Redis Streams): `XADD` with `MAXLEN ~N` provides approximate trimming. For strict backpressure, the adapter checks `XLEN` before submitting; if at capacity, it rejects the request.
- Inbound adapter under saturation: When `receive_q` is full, the inbound adapter returns HTTP 503 Service Unavailable (REST) or gRPC `RESOURCE_EXHAUSTED` (gRPC). The upstream biometric provider is expected to retry with exponential backoff. This is an honest signal — a degraded acceptance is worse than a transparent rejection.
Stage Dataclass and Composition
Routing is embedded in the Stage dataclass. A stage either has a fixed `queue_out` (unconditional routing) or a `gate` callable (dynamic routing). There is no separate Gate type — gates are attributes of stages.
```python
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from functools import partial


@dataclass(frozen=True)
class Stage:
    name: str
    queue_in: str
    fn: Callable[[dict], Awaitable[dict]]       # All stage functions are async
    requires: frozenset[str]                    # Keys selected from context for payload
    produces: frozenset[str]                    # Keys merged back into context
    queue_out: str | None = None                # Fixed routing (mutually exclusive with gate)
    gate: Callable[[dict], str] | None = None   # Dynamic routing
    drops: frozenset[str] = frozenset()         # Keys removed from context after this stage
    inject_envelope_keys: frozenset[str] = frozenset()  # Envelope keys injected into payload (e.g. trace_id for Respond)
```

All stage functions are async. Pure computation stages (e.g., receive, align) use `async def` for type uniformity — they don’t need to await anything internally but conform to the single `await stage.fn(payload)` calling convention used by the runner. I/O-bound stages (e.g., executors calling orchestrators, PAD/MAD calling ONNX) naturally use `await` for their async operations.
Invariant: Exactly one of queue_out or gate must be set. The runner validates this at startup.
Gate Construction with Explicit Config
Gates receive config values as arguments via `functools.partial` at construction time. No gate captures a global config dict in a closure. This makes dependencies visible and testable:
```python
# --- Gate functions: config params are explicit arguments ---
# Config parameters are keyword-only so partial-bound gates are still
# callable as gate(ctx).

# pad_gate is a single-concern predicate: does the spoof score exceed the threshold?
# Routing by operation type is a separate concern handled by enrol_router.
def pad_gate(ctx: dict, *, spoof_score_threshold: float) -> str:
    if ctx.get("pad", {}).get("spoof_score", 0.0) > spoof_score_threshold:
        return "respond_q"
    return "enrol_router_q"


def enrol_router(ctx: dict) -> str:
    """Routes ENROL operations to MAD, all others to Detect."""
    if ctx["operation"] == "ENROL":
        return "mad_q"
    return "detect_q"


def mad_gate(ctx: dict, *, morph_score_threshold: float) -> str:
    if ctx.get("morphing", {}).get("morph_score", 0.0) > morph_score_threshold:
        return "respond_q"
    return "detect_q"


def quality_gate(ctx: dict, *, min_score: float) -> str:
    if ctx.get("quality", {}).get("score", 0.0) < min_score:
        return "respond_q"
    return "extract_q"


def receive_gate(ctx: dict) -> str:
    if ctx["operation"] == "DELETE":
        return "route_q"
    return "pad_q"


def operation_router(ctx: dict) -> str:
    return {
        "SEARCH": "search_q",
        "VERIFY": "verify_q",
        "ENROL": "enrol_q",
        "DELETE": "delete_q",
    }[ctx["operation"]]
```
Pipeline Definition
```python
# Each stage is an async function: dict -> Awaitable[dict]
# Pure computation stages use async def for type uniformity.
async def receive(payload: dict) -> dict: ...
async def pad(payload: dict) -> dict: ...
async def mad(payload: dict) -> dict: ...
async def detect(payload: dict) -> dict: ...
async def align(payload: dict) -> dict: ...
async def assess_quality(payload: dict) -> dict: ...
async def extract(payload: dict) -> dict: ...
async def search_executor(payload: dict) -> dict: ...
async def verify_executor(payload: dict) -> dict: ...
async def enrol_executor(payload: dict) -> dict: ...
async def delete_executor(payload: dict) -> dict: ...
async def respond(payload: dict) -> dict: ...
```
```python
async def _identity(payload: dict) -> dict:
    """Async identity function for routing-only stages."""
    return payload
```
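The Stage record itself is not defined in this section. A minimal sketch, assuming a frozen dataclass whose field names are inferred from the pipeline definition and runner below (the real definition may differ):

```python
# Hypothetical Stage record — field names inferred from how the runner
# and pipeline definition use them; illustrative, not canonical.
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass(frozen=True)
class Stage:
    name: str
    queue_in: str
    fn: Callable[[dict], Awaitable[dict]]
    requires: frozenset = frozenset()
    produces: frozenset = frozenset()
    gate: Optional[Callable[[dict], str]] = None   # returns the target queue name
    queue_out: Optional[str] = None                # static successor when no gate
    drops: frozenset = frozenset()                 # keys removed from context after merge
    inject_envelope_keys: frozenset = frozenset()  # runner-owned keys copied into payload
```

Because the dataclass is frozen and every field is plain data, a Stage is itself a value: the pipeline definition stays a data structure that can be inspected, diffed, and tested without running anything.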
```python
# Construction: bind config values at startup
cfg = load_config()
```
```python
PIPELINE = [
    Stage(
        name="receive",
        queue_in="receive_q",
        fn=receive,
        requires=frozenset({"raw_payload", "source_ip", "received_at"}),
        produces=frozenset({"operation", "image_bytes", "subject_id", "partition",
                            "filters", "top_k", "threshold", "received_at"}),
        gate=receive_gate,
    ),
    Stage(
        name="pad",
        queue_in="pad_q",
        fn=pad,
        requires=frozenset({"image_bytes"}),
        produces=frozenset({"pad"}),
        gate=partial(pad_gate, spoof_score_threshold=cfg["pad"]["spoof_score_threshold"]),
    ),
    Stage(
        name="enrol_router",
        queue_in="enrol_router_q",
        fn=_identity,  # async identity; routing only
        requires=frozenset({"operation"}),
        produces=frozenset(),
        gate=enrol_router,
    ),
    Stage(
        name="mad",
        queue_in="mad_q",
        fn=mad,
        requires=frozenset({"image_bytes"}),
        produces=frozenset({"morphing"}),
        gate=partial(mad_gate, morph_score_threshold=cfg["mad"]["morph_score_threshold"]),
    ),
    Stage(
        name="detect",
        queue_in="detect_q",
        fn=detect,
        requires=frozenset({"image_bytes"}),
        produces=frozenset({"detections"}),
        queue_out="align_q",
    ),
    Stage(
        name="align",
        queue_in="align_q",
        fn=align,
        requires=frozenset({"image_bytes", "detections"}),
        produces=frozenset({"crop"}),
        queue_out="quality_q",
        drops=frozenset({"image_bytes"}),
    ),
    Stage(
        name="quality",
        queue_in="quality_q",
        fn=assess_quality,
        requires=frozenset({"crop"}),
        produces=frozenset({"quality"}),
        gate=partial(quality_gate, min_score=cfg["quality"]["min_score"]),
    ),
    Stage(
        name="extract",
        queue_in="extract_q",
        fn=extract,
        requires=frozenset({"crop"}),
        produces=frozenset({"template"}),
        queue_out="route_q",
        drops=frozenset({"crop"}),
    ),
    Stage(
        name="route",
        queue_in="route_q",
        fn=_identity,  # async identity; routing only
        requires=frozenset({"operation"}),
        produces=frozenset(),
        gate=operation_router,
    ),
    Stage(
        name="search",
        queue_in="search_q",
        fn=search_executor,
        requires=frozenset({"operation", "partition", "filters", "top_k",
                            "threshold", "template"}),
        produces=frozenset({"operation", "result"}),
        queue_out="respond_q",
    ),
    Stage(
        name="verify",
        queue_in="verify_q",
        fn=verify_executor,
        requires=frozenset({"operation", "subject_id", "partition", "threshold", "template"}),
        produces=frozenset({"operation", "result"}),
        queue_out="respond_q",
    ),
    Stage(
        name="enrol",
        queue_in="enrol_q",
        fn=enrol_executor,
        requires=frozenset({"operation", "subject_id", "partition", "template"}),
        produces=frozenset({"operation", "result"}),
        queue_out="respond_q",
    ),
    Stage(
        name="delete",
        queue_in="delete_q",
        fn=delete_executor,
        requires=frozenset({"operation", "subject_id", "partition"}),
        produces=frozenset({"operation", "result"}),
        queue_out="respond_q",
    ),
    Stage(
        name="respond",
        queue_in="respond_q",
        fn=respond,
        requires=frozenset({"operation", "received_at", "result", "error", "ok"}),
        produces=frozenset({"http_status", "response_body", "content_type", "latency_ms"}),
        queue_out="output_q",
        inject_envelope_keys=frozenset({"trace_id"}),
    ),
]
```

Runner
The runner manages the context, selects payloads, merges outputs, checks for errors, and handles routing. Stages are oblivious to all of this.
The context is treated as a value: every mutation produces a new dict rather than modifying the existing one in place. This follows the “values over state” principle — a context dict, once placed on a queue, is never modified by the producer. Each stage boundary yields a fresh context value.
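The principle can be seen in a few lines of plain Python (a standalone illustration with made-up keys, not runner code): merging and dropping both produce fresh dicts, and the source value is never touched.

```python
# "Values over state": each context mutation is a new dict.
ctx_v1 = {"trace_id": "t-1", "pad": {"spoof_score": 0.1}}
output = {"detections": [{"bbox": [0, 0, 10, 10]}]}

ctx_v2 = {**ctx_v1, **output}                                # accretion -> fresh value
ctx_v3 = {k: v for k, v in ctx_v2.items() if k != "pad"}     # drop -> fresh value

assert "detections" not in ctx_v1      # the queued original is unchanged
assert "pad" in ctx_v2 and "pad" not in ctx_v3
```

A consumer holding `ctx_v1` (say, a stage still reading it off a queue) can never observe the later accretion or drop.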
```python
async def run_stage(stage: Stage, queues: dict[str, asyncio.Queue]):
    # asyncio.Queue is not an async iterator, so we loop on get() explicitly.
    while True:
        ctx = await queues[stage.queue_in].get()

        # --- Error short-circuit: runner checks BEFORE calling stage ---
        if ctx.get("ok") is False:
            await queues["respond_q"].put(ctx)
            continue

        # --- Build payload: select only the keys this stage requires ---
        payload = {k: ctx[k] for k in stage.requires if k in ctx}

        # --- Inject envelope keys if declared (e.g. trace_id for Respond) ---
        for key in stage.inject_envelope_keys:
            payload[key] = ctx[key]

        # --- Call stage function (all stages are async) ---
        output = await stage.fn(payload)

        # --- Check for stage error ---
        if output.get("ok") is False:
            # New context value with error — original ctx is not mutated
            ctx = {**ctx, "error": output["error"], "ok": False}
            await queues["respond_q"].put(ctx)
            continue

        # --- Accrete output into context (new dict, not in-place mutation) ---
        ctx = {**ctx, **output}

        # --- Drop keys if declared (e.g. image_bytes after Align) ---
        if stage.drops:
            ctx = {k: v for k, v in ctx.items() if k not in stage.drops}

        # --- Route to next queue ---
        if stage.gate is not None:
            target = stage.gate(ctx)
            await queues[target].put(ctx)
        elif stage.queue_out is not None:
            await queues[stage.queue_out].put(ctx)
```
```python
# Boot: start one (or N) worker tasks per stage
for stage in PIPELINE:
    asyncio.create_task(run_stage(stage, queues))
```

Each stage function is a pure function from payload dict to output dict. The runner manages the context (selecting keys, accreting results, dropping keys, checking errors, routing). The pipeline definition is a plain data structure. None of these concerns are entangled with each other. The value-oriented context means that no stage can observe the side effects of another stage’s runner logic — the dict on the queue is always a fresh snapshot.
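To see the loop behave end to end, here is a self-contained toy run: a hypothetical two-stage pipeline (`double` then `stringify`, with a simplified Stage record and runner, not the production definitions) pushes one context through both stages and reads the result off the output queue.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass(frozen=True)
class Stage:  # simplified sketch of the Stage record used in this document
    name: str
    queue_in: str
    fn: Callable[[dict], Awaitable[dict]]
    requires: frozenset = frozenset()
    gate: Optional[Callable[[dict], str]] = None
    queue_out: Optional[str] = None
    drops: frozenset = frozenset()


async def run_stage(stage: Stage, queues: dict[str, asyncio.Queue]):
    while True:
        ctx = await queues[stage.queue_in].get()
        payload = {k: ctx[k] for k in stage.requires if k in ctx}  # key selection
        output = await stage.fn(payload)
        ctx = {**ctx, **output}                                    # accrete (fresh dict)
        ctx = {k: v for k, v in ctx.items() if k not in stage.drops}
        target = stage.gate(ctx) if stage.gate else stage.queue_out
        await queues[target].put(ctx)


async def double(p: dict) -> dict:
    return {"doubled": p["n"] * 2}


async def stringify(p: dict) -> dict:
    return {"text": str(p["doubled"])}


async def main() -> dict:
    queues = {q: asyncio.Queue() for q in ("a_q", "b_q", "out_q")}
    stages = [
        Stage("double", "a_q", double, requires=frozenset({"n"}), queue_out="b_q"),
        Stage("stringify", "b_q", stringify, requires=frozenset({"doubled"}),
              queue_out="out_q", drops=frozenset({"n"})),
    ]
    tasks = [asyncio.create_task(run_stage(s, queues)) for s in stages]
    await queues["a_q"].put({"n": 21})
    result = await queues["out_q"].get()
    for t in tasks:
        t.cancel()
    return result


result = asyncio.run(main())
print(result)  # {'doubled': 42, 'text': '42'}
```

Note that `n` never reaches the output: the second stage's `drops` set removes it after the merge, exactly as `image_bytes` is dropped after Align.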
Translation Boundary: Pipeline Dicts to Domain Types
The executor stages (search_executor, verify_executor, enrol_executor, delete_executor) are the translation boundary between the dict-oriented pipeline and the typed domain layer.
Upstream of the executors, all data flows as plain dicts — no domain types, no imports from core.domain. This keeps the pipeline stages simple, testable, and decoupled from the domain model.
Inside each executor, the incoming payload dict is translated into domain types (e.g., EmbeddingVector, SearchRequest, VerifyRequest) before calling into the orchestrator. The orchestrator response is then translated back into a plain dict for the pipeline’s Respond stage.
Dependency Injection via Closures
Executor stage functions need access to their orchestrator instance, but the pipeline Stage.fn signature is Callable[[dict], Awaitable[dict]] — a stage function receives only its payload. The orchestrator dependency is injected at construction time via functools.partial, consistent with how gates receive their config:
```python
from functools import partial
from dataclasses import asdict

import numpy as np

from core.domain.types import EmbeddingVector, SearchRequest, BiometricTemplate
from core.orchestration.search import SearchOrchestrator


async def search_executor(orchestrator: SearchOrchestrator, payload: dict) -> dict:
    """Translation boundary: dict -> domain types -> orchestrator -> dict.

    The orchestrator parameter is bound at construction via partial.
    At runtime the runner calls search_executor(payload) — the orchestrator
    is already captured in the closure.
    """
    # --- Translation boundary: dict -> domain types ---
    embedding = EmbeddingVector(
        vector=np.array(payload["template"]["vector"], dtype=np.float32),
        model_id=payload["template"]["model_id"],
        expected_dim=512,
    )
    request = SearchRequest(
        # ... build typed request from payload dict
    )

    # --- Domain call (typed, via injected orchestrator) ---
    result = await orchestrator.search(request)

    # --- Translation boundary: domain types -> dict ---
    return {
        "operation": "SEARCH",
        "result": {
            "candidates": [asdict(c) for c in result.candidates],
            "searched_partition": str(request.partitions),
            "hit": result.hit,
        },
    }
```
```python
# --- Construction: bind orchestrator at startup, same pattern as gates ---
search_orch = SearchOrchestrator(vectors=faiss_adapter, clock=clock_adapter)
search_executor_fn = partial(search_executor, search_orch)

# The Stage receives the partially-applied function:
# Stage(name="search", ..., fn=search_executor_fn, ...)
```

The same pattern applies to all four executors:

```python
verify_executor_fn = partial(verify_executor, verify_orch)
enrol_executor_fn = partial(enrol_executor, enrol_orch)
delete_executor_fn = partial(delete_executor, delete_orch)
```

This pattern ensures:
- Pipeline stages (Receive through Extract) never import domain types. They operate on plain dicts and can be tested with dict literals.
- Domain orchestrators (in core.orchestration) never see pipeline dicts. They receive typed, validated domain objects.
- The boundary is explicit and narrow. Each executor is a thin adapter — it does not contain business logic. The logic lives in the orchestrators.
- Dependencies are visible. The orchestrator is an explicit argument bound via partial, not a hidden global or module-level import. Testing an executor with a stub orchestrator is trivial.
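A sketch of such a stub-orchestrator test, under stated assumptions: StubSearchOrchestrator, FakeResult, and the simplified executor body here are all illustrative stand-ins (the real executor builds typed domain requests as shown above).

```python
import asyncio
from dataclasses import dataclass
from functools import partial


@dataclass
class FakeResult:  # illustrative stand-in for the domain search result
    candidates: list
    hit: bool


class StubSearchOrchestrator:
    def __init__(self, result: FakeResult):
        self.result = result
        self.requests = []          # record what the executor sent us

    async def search(self, request):
        self.requests.append(request)
        return self.result


async def search_executor(orchestrator, payload: dict) -> dict:
    # Simplified translation boundary: dict in, dict out
    result = await orchestrator.search({"template": payload["template"]})
    return {"operation": "SEARCH",
            "result": {"candidates": result.candidates, "hit": result.hit}}


def test_search_executor_translates_result():
    stub = StubSearchOrchestrator(FakeResult(candidates=[{"id": "s1"}], hit=True))
    fn = partial(search_executor, stub)       # same binding as production wiring
    out = asyncio.run(fn({"template": {"vector": [0.0], "model_id": "m"}}))
    assert out["result"]["hit"] is True
    assert stub.requests[0]["template"]["model_id"] == "m"


test_search_executor_translates_result()
```

The test never touches FAISS, queues, or the runner: the executor is exercised exactly as the runner would call it, via the partially-applied function.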
Orchestration Layer
The executor stages (search, verify, enrol, delete) translate pipeline dicts into domain types and delegate to orchestrators that compose port calls with pure domain functions. The orchestrators live in src/core/orchestration/ and form the bridge between the dict-oriented pipeline and the typed domain layer.
Canonical definition: The full orchestrator implementations, port splits, and design rationale are in domain-design.md, Section 4: Service Orchestrators. This section summarises only the port wiring relevant to pipeline construction.
Orchestrator Port Summary
Each orchestrator receives only the split ports it needs — no god-object that holds every adapter:
| Orchestrator | Ports injected |
|---|---|
| SearchOrchestrator | VectorSearchPort, ClockPort |
| VerifyOrchestrator | VectorLookupPort, ClockPort |
| EnrolOrchestrator | VectorMutationPort, ClockPort |
| DeleteOrchestrator | VectorMutationPort, ClockPort |
A single FAISS adapter may implement all three vector ports. That is fine — the adapter composes; the domain does not know.
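The "one adapter, three ports" point can be sketched with typing.Protocol. The port method names below are illustrative assumptions (the real port definitions live in domain-design.md); what matters is that one concrete class structurally satisfies all three protocols without inheriting from any of them.

```python
from typing import Protocol


# Hypothetical port shapes — method names are illustrative, not canonical.
class VectorSearchPort(Protocol):
    def search(self, vector: list, top_k: int) -> list: ...


class VectorLookupPort(Protocol):
    def lookup(self, subject_id: str) -> list: ...


class VectorMutationPort(Protocol):
    def add(self, subject_id: str, vector: list) -> None: ...
    def remove(self, subject_id: str) -> None: ...


class FaissAdapter:
    """Toy stand-in: one adapter satisfies all three ports structurally."""

    def __init__(self):
        self._store: dict[str, list] = {}

    def search(self, vector, top_k):
        return list(self._store)[:top_k]

    def lookup(self, subject_id):
        return self._store.get(subject_id, [])

    def add(self, subject_id, vector):
        self._store[subject_id] = vector

    def remove(self, subject_id):
        self._store.pop(subject_id, None)


adapter = FaissAdapter()
# Structural typing: the same instance passes wherever any one port is expected.
searcher: VectorSearchPort = adapter
mutator: VectorMutationPort = adapter
```

Each orchestrator only ever sees the narrow port it was handed; the fact that all three annotations point at the same object is invisible to the domain.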
Wiring Orchestrators into Executor Stages
Orchestrator instances are constructed at startup with their port dependencies, then injected into executor stage functions via functools.partial (see “Translation Boundary” above for the full pattern):
```python
from functools import partial
from core.orchestration.search import SearchOrchestrator
from core.orchestration.verify import VerifyOrchestrator
from core.orchestration.enrol import EnrolOrchestrator
from core.orchestration.delete import DeleteOrchestrator

# --- Construct orchestrators with real adapters ---
search_orch = SearchOrchestrator(vectors=faiss_adapter, clock=clock_adapter)
verify_orch = VerifyOrchestrator(lookup=faiss_adapter, clock=clock_adapter)
enrol_orch = EnrolOrchestrator(mutations=faiss_adapter, clock=clock_adapter)
delete_orch = DeleteOrchestrator(mutations=faiss_adapter, clock=clock_adapter)

# --- Bind into stage functions ---
search_executor_fn = partial(search_executor, search_orch)
verify_executor_fn = partial(verify_executor, verify_orch)
enrol_executor_fn = partial(enrol_executor, enrol_orch)
delete_executor_fn = partial(delete_executor, delete_orch)
```

These partially-applied functions satisfy the Callable[[dict], Awaitable[dict]] contract expected by Stage.fn.
Testing Strategy
Because each stage is a pure function that receives only the keys it needs, unit testing is focused and minimal:
```python
import asyncio

# Stage 3 (Align) unit test — no ONNX, no queue, no HTTP.
# The stage receives ONLY image_bytes and detections (its requires set).
def test_align_produces_crop():
    payload = {
        "image_bytes": b"<fake raw image>",
        "detections": [{
            "bbox": [10, 10, 110, 110],
            "score": 0.97,
            "landmarks": [[30, 40], [80, 40], [55, 65], [35, 90], [75, 90]],
        }],
    }
    result = asyncio.run(align(payload))  # align is async; drive it to completion
    assert "crop" in result                 # Output invariant
    assert "image_bytes" not in result      # Stage only produces what it declares
    assert "trace_id" not in result         # trace_id is on the runner context, not the payload


def test_align_returns_error_as_data_on_bad_landmarks():
    payload = {
        "image_bytes": b"<fake>",
        "detections": [{"bbox": [0, 0, 10, 10], "score": 0.9,
                        "landmarks": [[0, 0], [0, 0], [0, 0], [0, 0], [0, 0]]}],
    }
    result = asyncio.run(align(payload))
    assert result["ok"] is False
    assert result["error"]["stage"] == "align"
    assert result["error"]["code"] == "ALIGNMENT_FAILED"
```
```python
# Gate unit test — pad_gate is a single-concern spoof predicate
def test_pad_gate_rejects_spoof():
    ctx = {"pad": {"spoof_score": 0.95}, "operation": "SEARCH"}
    assert pad_gate(spoof_score_threshold=0.85, ctx=ctx) == "respond_q"


def test_pad_gate_passes_to_enrol_router():
    ctx = {"pad": {"spoof_score": 0.10}, "operation": "ENROL"}
    assert pad_gate(spoof_score_threshold=0.85, ctx=ctx) == "enrol_router_q"


# enrol_router handles operation-type branching (separate concern from spoof detection)
def test_enrol_router_routes_enrol_to_mad():
    ctx = {"operation": "ENROL"}
    assert enrol_router(ctx) == "mad_q"


def test_enrol_router_routes_search_to_detect():
    ctx = {"operation": "SEARCH"}
    assert enrol_router(ctx) == "detect_q"
```
```python
# Runner error short-circuit test
def test_runner_skips_stage_on_error():
    """The runner routes error envelopes directly to respond_q.
    Stages never see {"ok": False} payloads."""
    error_ctx = {"ok": False,
                 "error": {"code": "PAD_INFERENCE_ERROR", "reason": "...", "stage": "pad"}}
    # Simulate: run_stage should route to respond_q without calling stage.fn
    ...
```

Integration tests cover queue wiring. Benchmark tests cover per-stage latency targets:
| Stage | Target p99 latency |
|---|---|
| Receive | < 5 ms |
| PAD | < 30 ms (ViT-based unified model) |
| MAD | < 50 ms (CLIP+LoRA, ENROL only) |
| Detect | < 80 ms (CPU) |
| Align | < 2 ms |
| Assess Quality | < 15 ms (OFIQ, ISO 29794-5) |
| Extract | < 120 ms (CPU) |
| Search Executor | < 50 ms |
| Verify Executor | < 20 ms |
| Enrol Executor | < 30 ms |
| Delete Executor | < 10 ms |
| Respond | < 5 ms |
| End-to-end | < 350 ms p99 (SEARCH/VERIFY) |
| End-to-end | < 400 ms p99 (ENROL, includes MAD) |
What Is Not in This Document
- FAISS shard internals -> see FAISS Design
- Inbound XML schema specifics -> see adapter documentation
- Infrastructure and deployment -> see deploy/
- ISO 30107-3 PAD compliance metrics (APCER/BPCER/ACER) -> evaluated through NIST FATE PAD program
- ISO 29794-5 quality compliance -> OFIQ reference implementation; see Stage 4 (Quality) above