
Pipeline Design

Design Philosophy: Rich Hickey’s “Simple Made Easy”


The pipeline is a linear composition of simple functions. Each stage:

  1. Takes plain data in, returns plain data out
  2. Is connected to the next stage via a queue (not a direct call)
  3. Has no shared mutable state with any other stage
  4. Returns errors as data (a Result/Either shape), never raises exceptions
  5. Can be tested, deployed, and scaled in complete isolation
  6. Receives only the keys it needs — the runner selects from the cumulative context

The queue between stages is the contract. The producer does not know who consumes. The consumer does not know who produced. This is the essence of decoupling order from processing.


All data flowing through the pipeline is plain values — no objects, no methods, no hidden state.

The runner maintains a cumulative context dict that accretes results as the message travels through the pipeline. Each stage declares which keys it requires (input) and which keys it produces (output). The runner selects the required keys from the context to build the stage’s payload, and merges the stage’s output back into the context.

This separation prevents stages from depending on keys they should not see. A stage cannot accidentally couple to upstream data it has no business reading.
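The select-and-merge step can be sketched in a few lines; the helper names `select_payload` and `merge_output` are illustrative, not part of the design:

```python
def select_payload(ctx: dict, requires: frozenset[str]) -> dict:
    # Build the stage payload from only the declared keys.
    # The `if k in ctx` guard allows optional keys (e.g. on the error path).
    return {k: ctx[k] for k in requires if k in ctx}

def merge_output(ctx: dict, output: dict, produces: frozenset[str]) -> dict:
    # Accrete produced keys into a new context; never mutate in place.
    return {**ctx, **{k: v for k, v in output.items() if k in produces}}
```

A stage that never declared a key in `requires` simply never sees it in its payload, which is what makes accidental upstream coupling structurally impossible.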

# Runner context (managed by the runner, invisible to stages)
{
    "trace_id": str,              # UUID generated by the inbound adapter
    "operation": str,             # "SEARCH" | "VERIFY" | "ENROL" | "DELETE"
    "subject_id": str | None,
    "partition": str,
    "filters": dict,
    "top_k": int,
    "threshold": float | None,
    "received_at": str,
    # Accumulated stage results (accreted, never overwritten):
    "image_bytes": bytes | None,
    "pad": dict | None,
    "morphing": dict | None,
    "detections": list | None,
    "crop": bytes | None,
    "quality": dict | None,
    "template": dict | None,
    "result": dict | None,
}
  • trace_id: A UUID generated by the inbound adapter (not by Receive). The runner propagates it unchanged across all stage boundaries. No stage function reads, writes, or forwards it — this is the runner’s responsibility.

When a stage succeeds, it returns a plain dict containing its produced keys. When a stage fails, it returns an error-as-data envelope:

# Success — stage returns a plain dict of its produced keys
<stage-output-dict>
# Failure (error as data, not exception)
{"ok": False, "error": {"code": str, "reason": str, "stage": str}}
  • The runner checks for error envelopes before calling the next stage. Stages never see {"ok": False} payloads. When the runner receives an error envelope from a stage, it short-circuits the remaining pipeline and routes the context directly to the Respond queue. This means stage functions never contain error pass-through logic — they can assume their input is always valid stage data.
  • The runner attaches trace_id to the context. The failure dict never contains trace_id — it is always on the outer context managed by the runner.
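The runner-side check reduces to a small predicate; `is_error_envelope` and `next_queue` are hypothetical helper names for a sketch of this dispatch:

```python
def is_error_envelope(output: dict) -> bool:
    # An error envelope is any stage output with ok == False.
    return output.get("ok") is False

def next_queue(output: dict, normal_queue: str) -> str:
    # Short-circuit: errors route straight to Respond, skipping remaining stages.
    return "respond_q" if is_error_envelope(output) else normal_queue
```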

Operations are plain string discriminants, not polymorphic types:

"operation": "SEARCH" | "VERIFY" | "ENROL" | "DELETE"

Each stage boundary has a typed contract. Stages declare their requirements and productions via requires and produces on the Stage dataclass. For documentation and runtime validation, the schemas are also expressed as TypedDicts:

from typing import TypedDict

# --- Receive output ---
class ReceiveOut(TypedDict):
    operation: str
    image_bytes: bytes | None
    subject_id: str | None
    partition: str
    filters: dict
    top_k: int
    threshold: float | None
    received_at: str

# --- PAD output (what PAD produces, merged into context) ---
class PadOut(TypedDict):
    pad: dict         # {"spoof_score": float, "attack_type": str, "confidence": float}

# --- MAD output ---
class MadOut(TypedDict):
    morphing: dict    # {"morph_score": float, "confidence": float}

# --- Detect output ---
class DetectOut(TypedDict):
    detections: list  # [{bbox, score, landmarks}, ...]

# --- Align output ---
class AlignOut(TypedDict):
    crop: bytes       # 112x112 aligned face crop

# --- Quality output ---
class QualityOut(TypedDict):
    quality: dict     # {"score": float, "blur": float, ...}

# --- Extract output ---
class ExtractOut(TypedDict):
    template: dict    # {"vector": [float32 x 512], "model_id": str}

# --- Executor outputs ---
class SearchResultOut(TypedDict):
    operation: str
    result: dict

class VerifyResultOut(TypedDict):
    operation: str
    result: dict

class EnrolResultOut(TypedDict):
    operation: str
    result: dict

class DeleteResultOut(TypedDict):
    operation: str
    result: dict

# --- Respond output ---
class RespondOut(TypedDict):
    http_status: int
    response_body: bytes
    content_type: str
    latency_ms: float

The runner uses requires and produces on each Stage to select and merge keys. The TypedDicts above are the reference contracts — they can also serve as runtime validators in development mode (e.g., via TypeAdapter from pydantic or a simple key-set assertion).

Accretion, not replacement: Each stage adds keys to the context. No stage overwrites another stage’s keys (with the deliberate exception of image_bytes being dropped by Align). The context grows monotonically.

Stages may add new keys to their produced dicts but must never rename or remove existing keys. This follows the “provide more, require less” principle: downstream consumers that depend on a key will always find it, and new keys are invisible to stages that do not declare them in requires. Adding a key is a compatible change; renaming or removing is a breaking change.

The TypedDict schemas above are maintained manually for documentation, but in implementation the canonical source of truth is the requires and produces declarations on each Stage. A build-time or startup-time check can derive the expected key sets from the Stage list and validate that the TypedDicts remain consistent, eliminating redundant specification drift.
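A sketch of that startup check, assuming the reference TypedDicts are registered alongside each stage's `produces` set (the `check_contract` name and the triple shape are illustrative):

```python
from typing import TypedDict

class PadOut(TypedDict):
    pad: dict

def check_contract(name: str, produces: frozenset[str], schema: type) -> None:
    # Derive the expected key set from the TypedDict and compare it
    # against the Stage's produces declaration; fail fast on drift.
    expected = frozenset(schema.__annotations__)
    if produces != expected:
        raise RuntimeError(
            f"stage {name!r}: produces {sorted(produces)} != schema {sorted(expected)}"
        )

check_contract("pad", frozenset({"pad"}), PadOut)  # consistent: passes silently
```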


Purpose: Parse the raw inbound XML payload into plain Python values. All XML parsing complexity is contained here and nowhere else. The Receive stage does not generate trace_id — the inbound adapter or runner does that before the message enters the pipeline.

Requires: raw_payload, source_ip, received_at

Produces: operation, image_bytes, subject_id, partition, filters, top_k, threshold, received_at

Input (from inbound adapter / HTTP POST body):

{
    "raw_payload": bytes,   # Raw inbound XML body
    "source_ip": str,       # For audit log only
    "received_at": str      # ISO-8601 timestamp (UTC)
}

Output (on success):

{
    "operation": str,             # "SEARCH" | "VERIFY" | "ENROL" | "DELETE"
    "image_bytes": bytes | None,  # Raw JPEG/PNG, present for SEARCH/VERIFY/ENROL
    "subject_id": str | None,     # Present for VERIFY/DELETE
    "partition": str,             # Gallery partition ID e.g. "IABS", "IDENT1"
    "filters": dict,              # Metadata filters: {"gender": "M", ...} or {}
    "top_k": int,                 # For SEARCH: max candidates (default 50)
    "threshold": float | None,    # Optional override; None means use system default
    "received_at": str            # Propagated from input
}

The inbound adapter generates a trace_id UUID and places it on the runner context before the message reaches Receive. Receive is purely XML parsing — it has no transport concerns.

Output (on failure):

{
    "ok": False,
    "error": {
        "code": "PARSE_ERROR" | "SCHEMA_VIOLATION" | "UNSUPPORTED_OPERATION",
        "reason": str,
        "stage": "receive"
    }
}

Error handling:

  • Malformed XML -> PARSE_ERROR
  • Missing required fields -> SCHEMA_VIOLATION
  • Unknown operation type -> UNSUPPORTED_OPERATION
  • No exception propagates beyond this function boundary
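The error-as-data discipline for Receive can be sketched with the stdlib XML parser. The element name `operation` and the `parse_request` helper are illustrative — the actual XML schema is not specified here:

```python
import xml.etree.ElementTree as ET

def parse_request(raw_payload: bytes) -> dict:
    # All XML parsing contained here; every failure is returned as data.
    try:
        root = ET.fromstring(raw_payload)
    except ET.ParseError as exc:
        return {"ok": False, "error": {"code": "PARSE_ERROR",
                                       "reason": str(exc), "stage": "receive"}}
    op = root.findtext("operation")
    if op is None:
        return {"ok": False, "error": {"code": "SCHEMA_VIOLATION",
                                       "reason": "missing <operation> element",
                                       "stage": "receive"}}
    if op not in {"SEARCH", "VERIFY", "ENROL", "DELETE"}:
        return {"ok": False, "error": {"code": "UNSUPPORTED_OPERATION",
                                       "reason": f"unknown operation {op!r}",
                                       "stage": "receive"}}
    return {"operation": op}  # a real parser would extract the remaining fields
```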

Routing (embedded in Stage):

  • Successes: the Stage’s gate routes DELETE operations directly to the Route queue (bypassing PAD/MAD/Detect/Align/Quality/Extract) and all other operations to the PAD queue
  • Failures: the runner short-circuits to the Respond queue

Stage 1.5: PAD (Presentation Attack Detection)


Purpose: Run unified physical+digital attack detection on the raw image before any other processing. Produces a spoof score as data. Does not make the rejection decision — that is a configurable gate on the Stage, applied by the runner after this function returns.

Covers: print attacks, replay attacks, 3D masks, deepfakes, face swaps, GAN-generated faces. Uses a ViT-based unified model (e.g., S-Adapter or UniAttack approach) exported via ONNX.

Requires: image_bytes

Produces: pad

Input (payload selected by runner):

{
    "image_bytes": bytes    # Raw image
}

Output (on success):

{
    "pad": {
        "spoof_score": float,  # [0.0, 1.0]; higher = more likely spoof
        "attack_type": str,    # e.g. "print", "replay", "deepfake", "live"
        "confidence": float    # Model confidence in classification [0.0, 1.0]
    }
}

Output (on failure):

{
    "ok": False,
    "error": {
        "code": "PAD_INFERENCE_ERROR",
        "reason": str,
        "stage": "pad"
    }
}

Error handling:

  • ONNX Runtime fault -> PAD_INFERENCE_ERROR
  • The stage function always returns the spoof score as data if inference succeeds
  • PAD_REJECTED is emitted by the gate on the PAD Stage (see below), not by the stage function itself

Gate (configurable policy, embedded in Stage):

The gate is a pure routing predicate applied by the runner after the stage function returns. It reads from the runner context (which now includes the PAD result):

from functools import partial

def pad_gate(spoof_score_threshold: float, ctx: dict) -> str:
    """Single-concern predicate: spoof score above threshold?
    Config injected via partial. Does NOT inspect operation type."""
    if ctx.get("pad", {}).get("spoof_score", 0.0) > spoof_score_threshold:
        return "respond_q"  # Runner emits PAD_REJECTED error envelope
    return "enrol_router_q"

# Construction: config bound positionally at startup, not closed over.
# (Binding the threshold by keyword would break the runner's positional
# gate(ctx) call — ctx would collide with the first parameter slot.)
pad_gate_configured = partial(pad_gate, 0.85)

The threshold is injected via functools.partial at construction time. No global config dict is captured in a closure. Changing the threshold requires only changing the construction site.
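Because the threshold is an explicit parameter, a gate can be unit-tested with no config machinery at all. A self-contained sketch (the predicate is repeated here so the example stands alone):

```python
from functools import partial

def pad_gate(spoof_score_threshold: float, ctx: dict) -> str:
    # Same single-concern predicate as above.
    if ctx.get("pad", {}).get("spoof_score", 0.0) > spoof_score_threshold:
        return "respond_q"
    return "enrol_router_q"

gate = partial(pad_gate, 0.85)  # bind threshold positionally; gate(ctx) still works
assert gate({"pad": {"spoof_score": 0.99}}) == "respond_q"
assert gate({"pad": {"spoof_score": 0.10}}) == "enrol_router_q"
assert gate({}) == "enrol_router_q"  # missing PAD result defaults to 0.0
```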

After PAD passes, the context is routed to a separate enrol_router stage that handles operation-type branching. This keeps pad_gate as a single-concern predicate (spoof detection) without entangling it with operation routing logic.

def enrol_router(ctx: dict) -> str:
    """Routes ENROL operations to MAD, all others to Detect."""
    if ctx["operation"] == "ENROL":
        return "mad_q"
    return "detect_q"

Routing (PAD + Enrol Router combined):

  • Spoof score > threshold -> Respond queue (runner wraps as PAD_REJECTED error envelope)
  • Spoof score <= threshold, ENROL operation -> MAD queue (via enrol_router)
  • Spoof score <= threshold, all other operations -> Detect queue (via enrol_router)

Stage 1.6: MAD (Morphing Attack Detection)


Purpose: Detect morphing attacks in enrollment images. Morphing attacks blend two identities into a single passport/ID photo; they are the primary document-based attack vector in border control contexts. This stage runs only for ENROL operations.

Uses a CLIP+LoRA foundation model approach (e.g., MADation) for strong cross-domain generalisation. Exported via ONNX.

Requires: image_bytes

Produces: morphing

Input (payload selected by runner):

{
    "image_bytes": bytes
}

Output (on success):

{
    "morphing": {
        "morph_score": float,  # [0.0, 1.0]; higher = more likely morphed
        "confidence": float    # Model confidence [0.0, 1.0]
    }
}

Output (on failure):

{
    "ok": False,
    "error": {
        "code": "MAD_INFERENCE_ERROR",
        "reason": str,
        "stage": "mad"
    }
}

Error handling:

  • ONNX Runtime fault -> MAD_INFERENCE_ERROR
  • MORPHING_DETECTED is emitted by the gate on the MAD Stage, not by the stage function itself
  • The stage function always returns the morph score as data if inference succeeds

Gate (configurable policy, embedded in Stage):

from functools import partial

def mad_gate(morph_score_threshold: float, ctx: dict) -> str:
    if ctx.get("morphing", {}).get("morph_score", 0.0) > morph_score_threshold:
        return "respond_q"  # Runner emits MORPHING_DETECTED error envelope
    return "detect_q"

# Bind positionally so the runner's positional gate(ctx) call remains valid.
mad_gate_configured = partial(mad_gate, 0.75)

Routing:

  • Morph score <= threshold -> Detect queue
  • Morph score > threshold -> Respond queue (runner wraps as MORPHING_DETECTED error envelope)

Purpose: Run face detection on the raw image via DetectionPort.detect(). Produce face bounding boxes, five-point landmarks, and confidence scores. Nothing more.

Note: SCRFD_10G (InsightFace) replaces RetinaFace as the detection model. It achieves better accuracy on the WiderFace Hard split (~1% gain on occluded/small faces) using the same ONNX export path and 5-point landmark format. The DetectionPort adapter wraps the ONNX model; the stage function calls DetectionPort.detect(image_bytes) and translates the returned list[Detection] domain types into pipeline dicts.

Requires: image_bytes

Produces: detections

Input (payload selected by runner):

{
    "image_bytes": bytes    # Raw image data
}

Output (on success):

{
    "detections": [                    # List of detected faces, ordered by confidence desc
        {
            "bbox": [x1, y1, x2, y2],  # Normalised to [0, 1], float32
            "score": float,            # Detection confidence [0.0, 1.0]
            "landmarks": [             # Five landmarks: left_eye, right_eye, nose,
                [x, y],                # mouth_left, mouth_right
                [x, y],
                [x, y],
                [x, y],
                [x, y]
            ]
        }
    ]
}

Output (on failure):

{
    "ok": False,
    "error": {
        "code": "NO_FACE_DETECTED" | "MULTIPLE_FACES" | "INFERENCE_ERROR",
        "reason": str,
        "stage": "detect"
    }
}

Error handling:

  • Zero faces found -> NO_FACE_DETECTED
  • More than one face above confidence threshold (configurable, default 0.85) -> MULTIPLE_FACES (policy: reject ambiguous inputs; the caller must provide a single-face image)
  • ONNX Runtime fault -> INFERENCE_ERROR

Routing: Detect -> Align queue (fixed queue_out)

Scaling note: This stage is CPU/GPU-bound. Scale horizontally by adding more Detect workers consuming from the same queue.


Purpose: Apply an affine similarity transform to produce a single, normalised 112x112 pixel crop. All spatial variation is eliminated here.

Requires: image_bytes, detections

Produces: crop

Input (payload selected by runner):

{
    "image_bytes": bytes,
    "detections": [ ... ]   # From Detect stage
}

Output (on success):

{
    "crop": bytes   # PNG-encoded 112x112 aligned face crop
}

After Align produces its output, the runner drops image_bytes from the context. This enforces the “no raw imagery persisted” policy structurally, not procedurally. Only the 112x112 crop continues downstream.

Output (on failure):

{
    "ok": False,
    "error": {
        "code": "ALIGNMENT_FAILED",
        "reason": str,
        "stage": "align"
    }
}

Error handling:

  • Degenerate landmark geometry (colinear points, out-of-bounds) -> ALIGNMENT_FAILED
  • The function selects the highest-confidence detection from the detections list. Multi-face rejection is already handled by Detect.

Privacy note: image_bytes (the full original image) is explicitly dropped from the context at this stage boundary. The runner handles this via a drops declaration on the Stage dataclass.

Routing: Align -> Quality queue (fixed queue_out)


Purpose: Score the aligned crop for biometric quality. The stage produces a quality score as data. The rejection decision is made by the gate on this Stage, not by the stage function itself.

Reference implementation: OFIQ (Open Source Face Image Quality), the BSI/eu-LISA reference implementation for ISO/IEC 29794-5. OFIQ is the mandatory quality standard for European border control and identity systems. It is a pure measurement function that satisfies the QualityPort protocol.

Requires: crop

Produces: quality

Input (payload selected by runner):

{
    "crop": bytes   # 112x112 aligned face crop
}

Output (on success):

{
    "quality": {
        "score": float,         # Overall quality [0.0, 1.0] per ISO 29794-5
        "blur": float,          # Component scores for diagnostics
        "illumination": float,
        "pose_yaw": float,      # Degrees
        "pose_pitch": float,
        "pose_roll": float
    }
}

Output (on failure):

{
    "ok": False,
    "error": {
        "code": "INFERENCE_ERROR",
        "reason": str,
        "stage": "quality"
    }
}

Error handling:

  • ONNX Runtime fault -> INFERENCE_ERROR
  • Quality rejection (QUALITY_REJECTED) is handled by the gate, not the stage function

Gate (configurable policy, embedded in Stage):

from functools import partial

def quality_gate(min_score: float, ctx: dict) -> str:
    if ctx.get("quality", {}).get("score", 0.0) < min_score:
        return "respond_q"  # Runner emits QUALITY_REJECTED error envelope
    return "extract_q"

# Bind positionally so the runner's positional gate(ctx) call remains valid.
quality_gate_configured = partial(quality_gate, 0.40)

When the gate routes to respond_q, the runner constructs the error envelope including the full quality dict for diagnostics:

{
    "ok": False,
    "error": {
        "code": "QUALITY_REJECTED",
        "reason": str,      # e.g. "blur=0.12 below threshold=0.40"
        "stage": "quality",
        "quality": { ... }  # Full quality dict included for diagnostics/feedback
    }
}

Routing:

  • Quality score >= threshold -> Extract queue
  • Quality score < threshold -> Respond queue (as QUALITY_REJECTED error envelope)

Purpose: Run the ViT model on the aligned crop. Produce the 512-dimensional, L2-normalised float32 template vector. This is the most compute-intensive stage.

Requires: crop

Produces: template

Input (payload selected by runner):

{
    "crop": bytes   # 112x112 aligned face crop
}

Output (on success):

{
    "template": {
        "vector": [float32 x 512],  # L2-normalised; norm == 1.0 is an invariant
        "model_id": str             # e.g. "vit-base-adaface-v1.0"
    }
}

After Extract, the runner drops crop from the context. No raw imagery remains.

Output (on failure):

{
    "ok": False,
    "error": {
        "code": "EXTRACTION_FAILED" | "INFERENCE_ERROR",
        "reason": str,
        "stage": "extract"
    }
}

Error handling:

  • Output vector norm deviates from 1.0 by more than 1e-5 -> EXTRACTION_FAILED (invariant violation; the function self-validates its output)
  • ONNX Runtime fault -> INFERENCE_ERROR
  • model_id is stamped on every template so future schema migrations are traceable

Routing: Extract -> route_q (fixed queue_out)


Purpose: A pure routing gate embedded in the Stage dataclass. It inspects the operation discriminant and returns the target queue name. It does not execute any FAISS operations — it only decides where the message goes next. This is the pipeline’s branching point into operation-specific executor stages.

DELETE operations arrive here directly from Receive (bypassing the image pipeline). SEARCH, VERIFY, and ENROL arrive after extraction.

The router is implemented as a gate on the Stage:

def operation_router(ctx: dict) -> str:
    """Pure routing function. Returns the name of the target queue."""
    return {
        "SEARCH": "search_q",
        "VERIFY": "verify_q",
        "ENROL": "enrol_q",
        "DELETE": "delete_q",
    }[ctx["operation"]]

Routing: route_q -> search_q | verify_q | enrol_q | delete_q


Purpose: Execute a 1:N FAISS search. Vector in, ranked candidates out. This is the boundary where pipeline dicts become domain types (see “Translation Boundary” below).

Requires: operation, partition, filters, top_k, threshold, template

Produces: operation, result

Input (payload selected by runner):

{
    "operation": "SEARCH",
    "partition": str,
    "filters": dict,
    "top_k": int,
    "threshold": float | None,
    "template": { "vector": [...], "model_id": str }
}

Output (success):

{
    "operation": "SEARCH",
    "result": {
        "candidates": [
            {
                "subject_id": str,
                "score": float,  # Inner product similarity [0.0, 1.0]
                "rank": int      # 1-indexed
            }
        ],
        "searched_partition": str,
        "filters_applied": dict,
        "shards_queried": int,
        "latency_ms": float
    }
}

Error codes: SHARD_UNAVAILABLE | FAISS_ERROR

Routing: search_q -> respond_q (fixed queue_out)


Purpose: Execute a 1:1 FAISS comparison. Probe template vs enrolled template for a given subject_id. Returns the score and a threshold-based decision as data.

Requires: operation, subject_id, partition, threshold, template

Produces: operation, result

Input (payload selected by runner):

{
    "operation": "VERIFY",
    "subject_id": str,
    "partition": str,
    "threshold": float | None,
    "template": { "vector": [...], "model_id": str }
}

Output (success):

{
    "operation": "VERIFY",
    "result": {
        "subject_id": str,
        "score": float,
        "decision": "MATCH" | "NON_MATCH",  # Applied threshold decision as data
        "threshold_used": float
    }
}

Error codes: SUBJECT_NOT_FOUND | SHARD_UNAVAILABLE | FAISS_ERROR

Routing: verify_q -> respond_q (fixed queue_out)


Purpose: Add a new template vector to the FAISS index. Returns the assigned shard and vector index.

Requires: operation, subject_id, partition, template

Produces: operation, result

Input (payload selected by runner):

{
    "operation": "ENROL",
    "subject_id": str | None,
    "partition": str,
    "template": { "vector": [...], "model_id": str }
}

Output (success):

{
    "operation": "ENROL",
    "result": {
        "subject_id": str,    # Assigned or confirmed
        "shard_id": str,      # Which shard holds this vector
        "vector_index": int   # FAISS internal ID, for Delete
    }
}

Error codes: DUPLICATE_ENROL | SHARD_UNAVAILABLE | FAISS_ERROR

Routing: enrol_q -> respond_q (fixed queue_out)


Purpose: Remove a subject’s template from the FAISS index. Arrives directly from Receive (no image processing needed).

Requires: operation, subject_id, partition

Produces: operation, result

Input (payload selected by runner):

{
    "operation": "DELETE",
    "subject_id": str,
    "partition": str
}

Output (success):

{
    "operation": "DELETE",
    "result": {
        "subject_id": str,
        "deleted": True
    }
}

Error codes: SUBJECT_NOT_FOUND | FAISS_ERROR

Routing: delete_q -> respond_q (fixed queue_out)


Shared executor error handling:

  • SEARCH: if fewer than min_shards (configurable) respond within timeout -> SHARD_UNAVAILABLE (partial results are not surfaced; a degraded answer is worse than an honest error in a law enforcement context)
  • DELETE for unknown subject_id -> SUBJECT_NOT_FOUND
  • ENROL for already-present subject_id -> DUPLICATE_ENROL (idempotent enrol is a separate operation flag, not the default)

All executor error outputs follow the standard envelope:

{
    "ok": False,
    "error": {
        "code": str,
        "reason": str,
        "stage": "search" | "verify" | "enrol" | "delete"
    }
}

Purpose: Format the plain result dict back into the outbound XML response format. All serialisation complexity is contained here and nowhere else.

Requires: operation, received_at (always present); result, error, ok (present depending on success/error path)

Produces: http_status, response_body, content_type, latency_ms

The Respond stage handles both success and error paths. On the success path, result is present and ok/error are absent. On the error path (runner short-circuit or gate rejection), ok is False and error contains the error envelope. The runner selects only the keys that exist in the context (the if k in ctx guard in payload construction), so Respond receives whichever subset is available.

The runner injects trace_id into the Respond stage’s payload via the inject_envelope_keys declaration on the Stage dataclass. This is the only stage boundary where envelope keys cross into the payload — and it is explicit in the pipeline definition, not hidden in runner logic.

Input (payload selected by runner, with trace_id injected):

{
    "trace_id": str,        # Injected by runner via inject_envelope_keys
    "operation": str,       # Always present
    "received_at": str,     # Always present, for latency calculation
    "ok": bool | None,      # Present on error path (False); absent on success
    "result": dict | None,  # Present on success path
    "error": dict | None    # Present on error path
}

The Respond function checks ok to decide between success and error response construction:

async def respond(payload: dict) -> dict:
    if payload.get("ok") is False:
        # Error path: format error response from payload["error"]
        return build_error_response(payload)
    # Success path: format result response from payload["result"]
    return build_success_response(payload)

Output:

{
    "http_status": int,      # 200 | 400 | 422 | 500 | 503
    "response_body": bytes,  # XML-encoded response (includes trace_id)
    "content_type": str,     # "application/xml"
    "latency_ms": float      # End-to-end: received_at -> now
}

Error handling:

  • All failure envelopes from any prior stage are mapped to appropriate error response codes here
  • Serialisation fault -> 500 with a minimal valid error XML

Routing: Respond -> output (HTTP response / gRPC reply)

This stage is the only place that knows about the XML response format. No other stage touches serialisation.


Inbound (adapter generates trace_id, builds initial context)
    |
    v
[receive_q] --> receive --> gate --> (DELETE) --> [route_q]
                                 \-> (others) --> [pad_q]

[pad_q] --> pad --> gate --> (PAD_REJECTED) --> [respond_q]
                         \-> (passed) --------> [enrol_router_q]

[enrol_router_q] --> enrol_router --> (ENROL)  --> [mad_q]
                                  \-> (others) --> [detect_q]

[mad_q] --> mad --> gate --> (MORPHING_DETECTED) --> [respond_q]
                         \-> (clean) --------------> [detect_q]

[detect_q]  --> detect  --> [align_q]    (fixed queue_out)
[align_q]   --> align   --> [quality_q]  (fixed queue_out)

[quality_q] --> quality --> gate --> (REJECTED) --> [respond_q]
                                 \-> (accepted) --> [extract_q]

[extract_q] --> extract --> [route_q]    (fixed queue_out)

[route_q] --> gate --> [search_q] --> search_executor (fixed -> respond_q)
                   \-> [verify_q] --> verify_executor (fixed -> respond_q)
                   \-> [enrol_q]  --> enrol_executor  (fixed -> respond_q)
                   \-> [delete_q] --> delete_executor (fixed -> respond_q)

[respond_q] --> respond (runner injects trace_id) --> Outbound (HTTP/gRPC)

Runner short-circuit: any stage error envelope --> [respond_q] (no further stages called)

Queue implementation:

  • In-process: asyncio.Queue (single-host development / small deployments)
  • Production: Redis Streams or Kafka topics (one topic per stage boundary)
  • The queue abstraction is a port. Swapping implementations requires no changes to any stage function.
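A minimal sketch of that port and its in-process implementation. The `QueuePort` protocol and `InProcessQueues` class names are illustrative; a Redis Streams adapter would satisfy the same two-method surface:

```python
import asyncio
from typing import Protocol

class QueuePort(Protocol):
    # Minimal port: the runner only ever sees put/get by queue name.
    async def put(self, name: str, item: dict) -> None: ...
    async def get(self, name: str) -> dict: ...

class InProcessQueues:
    """asyncio.Queue-backed implementation for single-host deployments."""

    def __init__(self, maxsize: int = 64) -> None:
        self._queues: dict[str, asyncio.Queue] = {}
        self._maxsize = maxsize

    def _q(self, name: str) -> asyncio.Queue:
        return self._queues.setdefault(name, asyncio.Queue(self._maxsize))

    async def put(self, name: str, item: dict) -> None:
        await self._q(name).put(item)  # awaits when full -> backpressure

    async def get(self, name: str) -> dict:
        return await self._q(name).get()
```

Swapping in a distributed implementation changes only the construction site, never the stage functions or the runner loop.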

Extract is the pipeline bottleneck at ~120ms p99. At 1,900 searches/min (~32/sec), each queue must absorb burst traffic without unbounded growth. Bounded queues enforce backpressure: producers block when the downstream queue is full, propagating load signals upstream to the inbound adapter.

Queue size defaults:

Queue          | maxsize | Rationale
---------------|---------|----------------------------------------------------------------
receive_q      | 128     | Inbound buffer; sized for ~4s burst at peak rate
pad_q          | 64      | PAD at 30ms can drain ~33/sec; 2s buffer
mad_q          | 16      | ENROL-only path; low traffic
detect_q       | 64      | Detect at 80ms can drain ~12/sec; 5s buffer
align_q        | 64      | Align at 2ms is never the bottleneck
quality_q      | 64      | Quality at 15ms drains fast
extract_q      | 32      | Bottleneck stage; smaller buffer to propagate backpressure quickly
enrol_router_q | 64      | Pure routing after PAD, near-instant
route_q        | 64      | Pure routing, near-instant
search_q       | 32      | FAISS search at 50ms; ~1s buffer
verify_q       | 32      | Similar to search
enrol_q        | 16      | Lower volume than search
delete_q       | 16      | Lowest volume path
respond_q      | 128     | Must not drop responses; generous buffer

Saturation behavior:

  • Development (asyncio.Queue): with asyncio.Queue(maxsize=N), put() awaits when the queue is full (a coroutine yield, not a thread block). The producing stage’s runner loop naturally pauses until space is available.
  • Production (Redis Streams): XADD with MAXLEN ~N provides approximate trimming. For strict backpressure, the adapter checks XLEN before submitting; if at capacity, it rejects the request.
  • Inbound adapter under saturation: When receive_q is full, the inbound adapter returns HTTP 503 Service Unavailable (REST) or gRPC RESOURCE_EXHAUSTED (gRPC). The upstream biometric provider is expected to retry with exponential backoff. This is an honest signal — a degraded acceptance is worse than a transparent rejection.

Routing is embedded in the Stage dataclass. A stage either has a fixed queue_out (unconditional routing) or a gate callable (dynamic routing). There is no separate Gate type — gates are attributes of stages.

from collections.abc import Awaitable, Callable
from dataclasses import dataclass

@dataclass(frozen=True)
class Stage:
    name: str
    queue_in: str
    fn: Callable[[dict], Awaitable[dict]]      # All stage functions are async
    requires: frozenset[str]                   # Keys selected from context for payload
    produces: frozenset[str]                   # Keys merged back into context
    queue_out: str | None = None               # Fixed routing (mutually exclusive with gate)
    gate: Callable[[dict], str] | None = None  # Dynamic routing
    drops: frozenset[str] = frozenset()        # Keys removed from context after this stage
    inject_envelope_keys: frozenset[str] = frozenset()  # Envelope keys injected into payload (e.g. trace_id for Respond)

All stage functions are async. Pure computation stages (e.g., receive, align) use async def for type uniformity — they don’t need to await anything internally but conform to the single await stage.fn(payload) calling convention used by the runner. I/O-bound stages (e.g., executors calling orchestrators, PAD/MAD calling ONNX) naturally use await for their async operations.

Invariant: Exactly one of queue_out or gate must be set. The runner validates this at startup.

Gates receive config values as arguments via functools.partial at construction time. No gate captures a global config dict in a closure. This makes dependencies visible and testable:

# --- Gate functions: config params are explicit arguments ---
# pad_gate is a single-concern predicate: does the spoof score exceed the threshold?
# Routing by operation type is a separate concern handled by enrol_router.
def pad_gate(spoof_score_threshold: float, ctx: dict) -> str:
    if ctx.get("pad", {}).get("spoof_score", 0.0) > spoof_score_threshold:
        return "respond_q"
    return "enrol_router_q"

def enrol_router(ctx: dict) -> str:
    """Routes ENROL operations to MAD, all others to Detect."""
    if ctx["operation"] == "ENROL":
        return "mad_q"
    return "detect_q"

def mad_gate(morph_score_threshold: float, ctx: dict) -> str:
    if ctx.get("morphing", {}).get("morph_score", 0.0) > morph_score_threshold:
        return "respond_q"
    return "detect_q"

def quality_gate(min_score: float, ctx: dict) -> str:
    if ctx.get("quality", {}).get("score", 0.0) < min_score:
        return "respond_q"
    return "extract_q"

def receive_gate(ctx: dict) -> str:
    if ctx["operation"] == "DELETE":
        return "route_q"
    return "pad_q"

def operation_router(ctx: dict) -> str:
    return {
        "SEARCH": "search_q",
        "VERIFY": "verify_q",
        "ENROL": "enrol_q",
        "DELETE": "delete_q",
    }[ctx["operation"]]

# Each stage is an async function: dict -> Awaitable[dict]
# Pure computation stages use async def for type uniformity.
async def receive(payload: dict) -> dict: ...
async def pad(payload: dict) -> dict: ...
async def mad(payload: dict) -> dict: ...
async def detect(payload: dict) -> dict: ...
async def align(payload: dict) -> dict: ...
async def assess_quality(payload: dict) -> dict: ...
async def extract(payload: dict) -> dict: ...
async def search_executor(payload: dict) -> dict: ...
async def verify_executor(payload: dict) -> dict: ...
async def enrol_executor(payload: dict) -> dict: ...
async def delete_executor(payload: dict) -> dict: ...
async def respond(payload: dict) -> dict: ...

async def _identity(payload: dict) -> dict:
    """Async identity function for routing-only stages."""
    return payload
# Construction: bind config values at startup.
# Thresholds are bound positionally so the runner's positional gate(ctx)
# call fills the remaining parameter.
cfg = load_config()
PIPELINE = [
    Stage(
        name="receive", queue_in="receive_q", fn=receive,
        requires=frozenset({"raw_payload", "source_ip", "received_at"}),
        produces=frozenset({"operation", "image_bytes", "subject_id", "partition", "filters", "top_k", "threshold", "received_at"}),
        gate=receive_gate,
    ),
    Stage(
        name="pad", queue_in="pad_q", fn=pad,
        requires=frozenset({"image_bytes"}),
        produces=frozenset({"pad"}),
        gate=partial(pad_gate, cfg["pad"]["spoof_score_threshold"]),
    ),
    Stage(
        name="enrol_router", queue_in="enrol_router_q", fn=_identity,  # async identity; routing only
        requires=frozenset({"operation"}),
        produces=frozenset(),
        gate=enrol_router,
    ),
    Stage(
        name="mad", queue_in="mad_q", fn=mad,
        requires=frozenset({"image_bytes"}),
        produces=frozenset({"morphing"}),
        gate=partial(mad_gate, cfg["mad"]["morph_score_threshold"]),
    ),
    Stage(
        name="detect", queue_in="detect_q", fn=detect,
        requires=frozenset({"image_bytes"}),
        produces=frozenset({"detections"}),
        queue_out="align_q",
    ),
    Stage(
        name="align", queue_in="align_q", fn=align,
        requires=frozenset({"image_bytes", "detections"}),
        produces=frozenset({"crop"}),
        queue_out="quality_q",
        drops=frozenset({"image_bytes"}),
    ),
    Stage(
        name="quality", queue_in="quality_q", fn=assess_quality,
        requires=frozenset({"crop"}),
        produces=frozenset({"quality"}),
        gate=partial(quality_gate, cfg["quality"]["min_score"]),
    ),
    Stage(
        name="extract", queue_in="extract_q", fn=extract,
        requires=frozenset({"crop"}),
        produces=frozenset({"template"}),
        queue_out="route_q",
        drops=frozenset({"crop"}),
    ),
    Stage(
        name="route", queue_in="route_q", fn=_identity,  # async identity; routing only
        requires=frozenset({"operation"}),
        produces=frozenset(),
        gate=operation_router,
    ),
    Stage(
        name="search", queue_in="search_q", fn=search_executor,
        requires=frozenset({"operation", "partition", "filters", "top_k", "threshold", "template"}),
        produces=frozenset({"operation", "result"}),
        queue_out="respond_q",
    ),
    Stage(
        name="verify", queue_in="verify_q", fn=verify_executor,
        requires=frozenset({"operation", "subject_id", "partition", "threshold", "template"}),
        produces=frozenset({"operation", "result"}),
        queue_out="respond_q",
    ),
    Stage(
        name="enrol", queue_in="enrol_q", fn=enrol_executor,
        requires=frozenset({"operation", "subject_id", "partition", "template"}),
        produces=frozenset({"operation", "result"}),
        queue_out="respond_q",
    ),
    Stage(
        name="delete", queue_in="delete_q", fn=delete_executor,
        requires=frozenset({"operation", "subject_id", "partition"}),
        produces=frozenset({"operation", "result"}),
        queue_out="respond_q",
    ),
    Stage(
        name="respond", queue_in="respond_q", fn=respond,
        requires=frozenset({"operation", "received_at", "result", "error", "ok"}),
        produces=frozenset({"http_status", "response_body", "content_type", "latency_ms"}),
        queue_out="output_q",
        inject_envelope_keys=frozenset({"trace_id"}),
    ),
]

The runner manages the context, selects payloads, merges outputs, checks for errors, and handles routing. Stages are oblivious to all of this.

The context is treated as a value: every mutation produces a new dict rather than modifying the existing one in place. This follows the “values over state” principle — a context dict, once placed on a queue, is never modified by the producer. Each stage boundary yields a fresh context value.
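This discipline is ordinary dict spreading; a tiny illustrative snippet (the values are made up):

```python
ctx = {"trace_id": "abc", "operation": "SEARCH"}
output = {"pad": {"spoof_score": 0.1}}

# Accretion produces a NEW dict; the producer's ctx is untouched.
new_ctx = {**ctx, **output}

assert "pad" not in ctx                         # original value unchanged
assert new_ctx["pad"]["spoof_score"] == 0.1     # stage output accreted
assert new_ctx["trace_id"] == "abc"             # envelope keys carried forward
```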

async def run_stage(stage: Stage, queues: dict[str, asyncio.Queue]):
    while True:  # asyncio.Queue is not an async iterator; loop over get()
        ctx = await queues[stage.queue_in].get()
        # --- Error short-circuit: runner checks BEFORE calling stage ---
        if ctx.get("ok") is False:
            await queues["respond_q"].put(ctx)
            continue
        # --- Build payload: select only the keys this stage requires ---
        payload = {k: ctx[k] for k in stage.requires if k in ctx}
        # --- Inject envelope keys if declared (e.g. trace_id for Respond) ---
        for key in stage.inject_envelope_keys:
            payload[key] = ctx[key]
        # --- Call stage function (all stages are async) ---
        output = await stage.fn(payload)
        # --- Check for stage error ---
        if output.get("ok") is False:
            # New context value with error — original ctx is not mutated
            ctx = {**ctx, "error": output["error"], "ok": False}
            await queues["respond_q"].put(ctx)
            continue
        # --- Accrete output into context (new dict, not in-place mutation) ---
        ctx = {**ctx, **output}
        # --- Drop keys if declared (e.g. image_bytes after Align) ---
        if stage.drops:
            ctx = {k: v for k, v in ctx.items() if k not in stage.drops}
        # --- Route to next queue ---
        if stage.gate is not None:
            target = stage.gate(ctx)
            await queues[target].put(ctx)
        elif stage.queue_out is not None:
            await queues[stage.queue_out].put(ctx)

# Boot: start one (or N) worker tasks per stage
for stage in PIPELINE:
    asyncio.create_task(run_stage(stage, queues))

Each stage function is a pure function from payload dict to output dict. The runner manages the context (selecting keys, accreting results, dropping keys, checking errors, routing). The pipeline definition is a plain data structure. None of these concerns are entangled with each other. The value-oriented context means that no stage can observe the side effects of another stage’s runner logic — the dict on the queue is always a fresh snapshot.


Translation Boundary: Pipeline Dicts to Domain Types

Section titled “Translation Boundary: Pipeline Dicts to Domain Types”

The executor stages (search_executor, verify_executor, enrol_executor, delete_executor) are the translation boundary between the dict-oriented pipeline and the typed domain layer.

Upstream of the executors, all data flows as plain dicts — no domain types, no imports from core.domain. This keeps the pipeline stages simple, testable, and decoupled from the domain model.

Inside each executor, the incoming payload dict is translated into domain types (e.g., EmbeddingVector, SearchRequest, VerifyRequest) before calling into the orchestrator. The orchestrator response is then translated back into a plain dict for the pipeline’s Respond stage.

Executor stage functions need access to their orchestrator instance, but the pipeline Stage.fn signature is Callable[[dict], Awaitable[dict]] — a stage function receives only its payload. The orchestrator dependency is injected at construction time via functools.partial, consistent with how gates receive their config:

from functools import partial
from dataclasses import asdict
import numpy as np

from core.domain.types import EmbeddingVector, SearchRequest, BiometricTemplate
from core.orchestration.search import SearchOrchestrator

async def search_executor(orchestrator: SearchOrchestrator, payload: dict) -> dict:
    """Translation boundary: dict -> domain types -> orchestrator -> dict.

    The orchestrator parameter is bound at construction via partial.
    At runtime the runner calls search_executor(payload) — the orchestrator
    is already captured in the closure.
    """
    # --- Translation boundary: dict -> domain types ---
    embedding = EmbeddingVector(
        vector=np.array(payload["template"]["vector"], dtype=np.float32),
        model_id=payload["template"]["model_id"],
        expected_dim=512,
    )
    request = SearchRequest(
        # ... build typed request from payload dict
    )
    # --- Domain call (typed, via injected orchestrator) ---
    result = await orchestrator.search(request)
    # --- Translation boundary: domain types -> dict ---
    return {
        "operation": "SEARCH",
        "result": {
            "candidates": [asdict(c) for c in result.candidates],
            "searched_partition": str(request.partitions),
            "hit": result.hit,
        },
    }

# --- Construction: bind orchestrator at startup, same pattern as gates ---
search_orch = SearchOrchestrator(vectors=faiss_adapter, clock=clock_adapter)
search_executor_fn = partial(search_executor, search_orch)
# The Stage receives the partially-applied function:
# Stage(name="search", ..., fn=search_executor_fn, ...)

The same pattern applies to all four executors:

verify_executor_fn = partial(verify_executor, verify_orch)
enrol_executor_fn = partial(enrol_executor, enrol_orch)
delete_executor_fn = partial(delete_executor, delete_orch)

This pattern ensures:

  • Pipeline stages (Receive through Extract) never import domain types. They operate on plain dicts and can be tested with dict literals.
  • Domain orchestrators (in core.orchestration) never see pipeline dicts. They receive typed, validated domain objects.
  • The boundary is explicit and narrow. Each executor is a thin adapter — it does not contain business logic. The logic lives in the orchestrators.
  • Dependencies are visible. The orchestrator is an explicit argument bound via partial, not a hidden global or module-level import. Testing an executor with a stub orchestrator is trivial.
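As an illustration of that last point, here is a hedged sketch of an executor test with a stub orchestrator. The executor shown is a simplified stand-in (the real `search_executor` also builds domain types); the stub and result names are illustrative, not the project's actual test doubles:

```python
import asyncio
from functools import partial

class _StubResult:
    """Canned orchestrator result: no candidates, no hit."""
    candidates: list = []
    hit: bool = False

class StubSearchOrchestrator:
    """Test double: ignores the request and returns a canned result."""
    async def search(self, request) -> _StubResult:
        return _StubResult()

async def fake_search_executor(orchestrator, payload: dict) -> dict:
    # Stand-in for the real executor: delegate, then re-wrap as a plain dict.
    result = await orchestrator.search(payload)
    return {"operation": "SEARCH", "result": {"hit": result.hit}}

# Same construction pattern as production: bind the dependency via partial.
stage_fn = partial(fake_search_executor, StubSearchOrchestrator())
out = asyncio.run(stage_fn({"template": {"vector": [0.0], "model_id": "m"}}))
assert out == {"operation": "SEARCH", "result": {"hit": False}}
```

No queues, no runner, no FAISS: the dependency is just an argument.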

The executor stages (search, verify, enrol, delete) translate pipeline dicts into domain types and delegate to orchestrators that compose port calls with pure domain functions. The orchestrators live in src/core/orchestration/ and form the bridge between the dict-oriented pipeline and the typed domain layer.

Canonical definition: The full orchestrator implementations, port splits, and design rationale are in domain-design.md, Section 4: Service Orchestrators. This section summarises only the port wiring relevant to pipeline construction.

Each orchestrator receives only the split ports it needs — no god-object that holds every adapter:

| Orchestrator | Ports injected |
| --- | --- |
| SearchOrchestrator | VectorSearchPort, ClockPort |
| VerifyOrchestrator | VectorLookupPort, ClockPort |
| EnrolOrchestrator | VectorMutationPort, ClockPort |
| DeleteOrchestrator | VectorMutationPort, ClockPort |
A single FAISS adapter may implement all three vector ports. That is fine — the adapter composes; the domain does not know.
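One way to express this in Python is structural typing: if the ports are `typing.Protocol`s, a single adapter satisfies every port whose methods it implements, without inheriting from any of them. The sketch below is illustrative — the port method signatures and the in-memory adapter are assumptions, not the real FAISS adapter:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class VectorSearchPort(Protocol):
    def search(self, vector: list[float], top_k: int) -> list[dict]: ...

@runtime_checkable
class VectorMutationPort(Protocol):
    def add(self, subject_id: str, vector: list[float]) -> None: ...
    def remove(self, subject_id: str) -> None: ...

class FaissAdapter:
    """One concrete adapter that happens to satisfy both ports."""
    def __init__(self) -> None:
        self._store: dict[str, list[float]] = {}

    def search(self, vector: list[float], top_k: int) -> list[dict]:
        # Toy stand-in for a real index lookup.
        return [{"subject_id": sid} for sid in list(self._store)[:top_k]]

    def add(self, subject_id: str, vector: list[float]) -> None:
        self._store[subject_id] = vector

    def remove(self, subject_id: str) -> None:
        self._store.pop(subject_id, None)

adapter = FaissAdapter()
# Structural typing: each orchestrator sees only the port it was given.
assert isinstance(adapter, VectorSearchPort)
assert isinstance(adapter, VectorMutationPort)
```

Each orchestrator's constructor annotates its parameter with one port type, so the shared concrete adapter stays invisible to the domain.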

Orchestrator instances are constructed at startup with their port dependencies, then injected into executor stage functions via functools.partial (see “Translation Boundary” above for the full pattern):

from functools import partial
from core.orchestration.search import SearchOrchestrator
from core.orchestration.verify import VerifyOrchestrator
from core.orchestration.enrol import EnrolOrchestrator
from core.orchestration.delete import DeleteOrchestrator
# --- Construct orchestrators with real adapters ---
search_orch = SearchOrchestrator(vectors=faiss_adapter, clock=clock_adapter)
verify_orch = VerifyOrchestrator(lookup=faiss_adapter, clock=clock_adapter)
enrol_orch = EnrolOrchestrator(mutations=faiss_adapter, clock=clock_adapter)
delete_orch = DeleteOrchestrator(mutations=faiss_adapter, clock=clock_adapter)
# --- Bind into stage functions ---
search_executor_fn = partial(search_executor, search_orch)
verify_executor_fn = partial(verify_executor, verify_orch)
enrol_executor_fn = partial(enrol_executor, enrol_orch)
delete_executor_fn = partial(delete_executor, delete_orch)

These partially-applied functions satisfy the Callable[[dict], Awaitable[dict]] contract expected by Stage.fn.


Because each stage is a pure function that receives only the keys it needs, unit testing is focused and minimal:

# Stage 3 (Align) unit test — no ONNX, no queue, no HTTP
# The stage receives ONLY image_bytes and detections (its requires set)
def test_align_produces_crop():
    payload = {
        "image_bytes": b"<fake raw image>",
        "detections": [{
            "bbox": [10, 10, 110, 110],
            "score": 0.97,
            "landmarks": [[30, 40], [80, 40], [55, 65], [35, 90], [75, 90]],
        }],
    }
    result = asyncio.run(align(payload))  # align is async, like every stage
    assert "crop" in result               # Output invariant
    assert "image_bytes" not in result    # Stage only produces what it declares
    assert "trace_id" not in result       # trace_id is on the runner context, not the payload

def test_align_returns_error_as_data_on_bad_landmarks():
    payload = {
        "image_bytes": b"<fake>",
        "detections": [{"bbox": [0, 0, 10, 10], "score": 0.9,
                        "landmarks": [[0, 0], [0, 0], [0, 0], [0, 0], [0, 0]]}],
    }
    result = asyncio.run(align(payload))
    assert result["ok"] is False
    assert result["error"]["stage"] == "align"
    assert result["error"]["code"] == "ALIGNMENT_FAILED"

# Gate unit test — pad_gate is a single-concern spoof predicate
def test_pad_gate_rejects_spoof():
    ctx = {"pad": {"spoof_score": 0.95}, "operation": "SEARCH"}
    assert pad_gate(spoof_score_threshold=0.85, ctx=ctx) == "respond_q"

def test_pad_gate_passes_to_enrol_router():
    ctx = {"pad": {"spoof_score": 0.10}, "operation": "ENROL"}
    assert pad_gate(spoof_score_threshold=0.85, ctx=ctx) == "enrol_router_q"

# enrol_router handles operation-type branching (separate concern from spoof detection)
def test_enrol_router_routes_enrol_to_mad():
    ctx = {"operation": "ENROL"}
    assert enrol_router(ctx) == "mad_q"

def test_enrol_router_routes_search_to_detect():
    ctx = {"operation": "SEARCH"}
    assert enrol_router(ctx) == "detect_q"

# Runner error short-circuit test
def test_runner_skips_stage_on_error():
    """The runner routes error envelopes directly to respond_q.
    Stages never see {"ok": False} payloads."""
    error_ctx = {"ok": False, "error": {"code": "PAD_INFERENCE_ERROR", "reason": "...", "stage": "pad"}}
    # Simulate: run_stage should route to respond_q without calling stage.fn
    ...

Integration tests cover queue wiring. Benchmark tests cover per-stage latency targets:

| Stage | Target p99 latency |
| --- | --- |
| Receive | < 5 ms |
| PAD | < 30 ms (ViT-based unified model) |
| MAD | < 50 ms (CLIP+LoRA, ENROL only) |
| Detect | < 80 ms (CPU) |
| Align | < 2 ms |
| Assess Quality | < 15 ms (OFIQ, ISO 29794-5) |
| Extract | < 120 ms (CPU) |
| Search Executor | < 50 ms |
| Verify Executor | < 20 ms |
| Enrol Executor | < 30 ms |
| Delete Executor | < 10 ms |
| Respond | < 5 ms |
| End-to-end | < 350 ms p99 (SEARCH/VERIFY) |
| End-to-end | < 400 ms p99 (ENROL, includes MAD) |
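A per-stage benchmark can stay as simple as the stages themselves: call the stage function in a loop and take the 99th-percentile sample. The sketch below uses a fake stage function as a stand-in (a real benchmark would import the actual stage and a representative payload); the helper name and run count are illustrative:

```python
import asyncio
import time

async def measure_p99_ms(stage_fn, payload: dict, runs: int = 200) -> float:
    """Call a stage repeatedly; return the 99th-percentile latency in ms."""
    samples = []
    for _ in range(runs):
        start = time.perf_counter()
        await stage_fn(payload)
        samples.append((time.perf_counter() - start) * 1000.0)
    samples.sort()
    return samples[min(len(samples) - 1, int(len(samples) * 0.99))]

async def _fake_align(payload: dict) -> dict:
    # Stand-in for the real align stage function.
    return {"crop": b"<crop>"}

p99 = asyncio.run(measure_p99_ms(_fake_align, {"image_bytes": b"", "detections": []}))
# A benchmark test would then assert against the table, e.g. p99 < 2.0 for Align.
```

Because stages are plain async functions, the same harness works for every row of the table.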

  • FAISS shard internals -> see FAISS Design
  • Inbound XML schema specifics -> see adapter documentation
  • Infrastructure and deployment -> see deploy/
  • ISO 30107-3 PAD compliance metrics (APCER/BPCER/ACER) -> evaluated through NIST FATE PAD program
  • ISO 29794-5 quality compliance -> OFIQ reference implementation; see Stage 4 (Quality) above