Skip to content

Domain Design

Philosophy: Rich Hickey’s “Simple Made Easy” — simplicity through composability, decoupling, and pure data.


Hickey distinguishes simple (one braid — not interleaved with other concerns) from easy (familiar, near-at-hand). The domain layer must be simple:

  • Embeddings are values — immutable float32 arrays whose dimensionality comes from model configuration. They carry no behaviour, no identity, no mutable fields.
  • Operations are pure functions — same inputs always yield same outputs. No side effects, no I/O.
  • Ports are protocols — structural interfaces (Python Protocol), not abstract base classes. Adapters need not inherit anything; they just satisfy the shape.
  • One port, one concern — each protocol covers exactly one capability. When a port aggregates unrelated responsibilities (e.g. search + mutation), split it. The cost of an extra protocol is near zero; the cost of a complected interface compounds.
  • State lives at the boundary — the domain knows nothing about databases, caches, or request lifecycles.
  • No complecting — detection, alignment, quality, extraction, and matching are separate concerns composed at the service layer, never merged inside a single object.

All domain types are frozen dataclasses or named tuples. frozen=True enforces value semantics at the Python level: assignment to a declared field raises FrozenInstanceError. slots=True eliminates the __dict__ overhead and additionally prevents the creation of new attributes — note, however, that neither flag stops a deliberate bypass such as object.__setattr__ writing directly to a slot. Additionally, numpy arrays held inside frozen dataclasses have their writeable flag set to False in __post_init__, ensuring deep immutability beyond what Python’s frozen=True can enforce on its own.

src/core/domain/types.py
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import FrozenSet, NewType, Tuple, Union
import numpy as np
from numpy.typing import NDArray
# ---------------------------------------------------------------------------
# Primitive scalars — strong types prevent unit confusion
# ---------------------------------------------------------------------------
class SimilarityScore(float):
    """Cosine similarity in [-1.0, 1.0]. L2-normalised vectors → IP == cosine."""

    def __new__(cls, value: float) -> "SimilarityScore":
        # Chained comparison is False for NaN as well, so NaN is rejected too.
        if -1.0 <= value <= 1.0:
            return super().__new__(cls, value)
        raise ValueError(f"SimilarityScore must be in [-1, 1], got {value}")
class QualityScore(float):
    """Quality in [0.0, 1.0]. 0 = unusable image; 1 = perfect."""

    def __new__(cls, value: float) -> "QualityScore":
        # Chained comparison is False for NaN as well, so NaN is rejected too.
        if 0.0 <= value <= 1.0:
            return super().__new__(cls, value)
        raise ValueError(f"QualityScore must be in [0, 1], got {value}")
class Threshold(float):
    """Decision threshold in [0.0, 1.0]."""

    def __new__(cls, value: float) -> "Threshold":
        # Chained comparison is False for NaN as well, so NaN is rejected too.
        if 0.0 <= value <= 1.0:
            return super().__new__(cls, value)
        raise ValueError(f"Threshold must be in [0, 1], got {value}")
# ---------------------------------------------------------------------------
# Identifiers — typed wrappers prevent mixing gallery IDs with request IDs
# ---------------------------------------------------------------------------
class SubjectId(str):
    """Unique identifier for an enrolled subject in the gallery.

    A plain ``str`` subclass used only for static type safety — no extra
    behaviour or validation is added.
    """
class RequestId(str):
    """Trace/correlation ID for a single biometric transaction.

    A plain ``str`` subclass used only for static type safety — no extra
    behaviour or validation is added.
    """
class PartitionId(str):
    """Logical partition label (e.g. 'IABS', 'IDENT1').

    A plain ``str`` subclass used only for static type safety — no extra
    behaviour or validation is added.
    """
# ---------------------------------------------------------------------------
# EmbeddingVector — the raw numeric value extracted by a model
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class EmbeddingVector:
"""
An L2-normalised float32 embedding produced by a specific model.
This is a *value* — it has no identity, no behaviour, and no
mutable fields. The numpy array is made read-only in __post_init__
for deep immutability.
Invariant: np.linalg.norm(vector) ≈ 1.0 (enforced by `template_normalize`)
"""
vector: NDArray[np.float32] # shape (expected_dim,), dtype float32
model_id: str # e.g. "adaface-vit-base-v2"
expected_dim: int # dimensionality from model config (e.g. 512)
def __post_init__(self) -> None:
if self.vector.shape != (self.expected_dim,):
raise ValueError(
f"EmbeddingVector must be shape ({self.expected_dim},), "
f"got {self.vector.shape}"
)
if self.vector.dtype != np.float32:
raise ValueError(f"EmbeddingVector must be float32, got {self.vector.dtype}")
# Deep immutability: prevent mutation of the underlying numpy buffer.
# This mutates the array's flags (not a dataclass field), so it works
# even though the dataclass is frozen.
self.vector.flags.writeable = False
# ---------------------------------------------------------------------------
# BiometricTemplate — the central record of the domain
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class BiometricTemplate:
"""
A biometric record linking an EmbeddingVector to a subject and its
provenance metadata. This is a *value* — it has no identity, no
behaviour, and no mutable fields.
"""
embedding: EmbeddingVector # the numeric embedding + model info
subject_id: SubjectId
partition: PartitionId
captured_at: datetime # UTC; when the probe was captured
quality: QualityScore
source_image_hash: str # SHA-256 of the source image (audit only)
def __post_init__(self) -> None:
if not self.captured_at.tzinfo:
raise ValueError("captured_at must be timezone-aware (UTC)")
# ---------------------------------------------------------------------------
# Search — 1:N probe-against-gallery
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class SearchRequest:
    """
    A 1:N search request. The probe template is compared against every
    template in the nominated partitions.

    Fields are not range-validated here (e.g. top_k may be any int);
    callers are expected to construct sensible requests.
    """
    request_id: RequestId
    probe: BiometricTemplate
    partitions: FrozenSet[PartitionId]  # which gallery segments to scan
    top_k: int                          # maximum candidates to return
    threshold: Threshold                # minimum score to include a hit
    submitted_at: datetime              # UTC
@dataclass(frozen=True, slots=True)
class SearchCandidate:
    """A single ranked result from a 1:N search.

    Produced by `rank_candidates`; candidates it emits already satisfy the
    request threshold.
    """
    subject_id: SubjectId
    score: SimilarityScore
    rank: int  # 1-indexed; 1 == best match
@dataclass(frozen=True, slots=True)
class SearchResult:
    """Outcome of a 1:N search: ranked candidates plus the hit decision."""
    request_id: RequestId                    # echoes SearchRequest.request_id
    candidates: Tuple[SearchCandidate, ...]  # ordered by score descending
    hit: bool                                # True if any candidate meets threshold
    completed_at: datetime                   # presumably UTC like the other timestamps — confirm
# ---------------------------------------------------------------------------
# Verify — 1:1 probe-vs-enrolled-template
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class VerifyRequest:
    """
    A 1:1 verification request. The probe is compared against the single
    enrolled template for the claimed subject.
    """
    request_id: RequestId
    probe: BiometricTemplate
    claimed_subject_id: SubjectId  # the identity being asserted by the caller
    threshold: Threshold           # accept iff score >= threshold
    submitted_at: datetime         # presumably UTC like the other timestamps — confirm
@dataclass(frozen=True, slots=True)
class VerifyResult:
    """Outcome of a 1:1 verification against the claimed subject."""
    request_id: RequestId
    claimed_subject_id: SubjectId
    score: SimilarityScore  # 0.0 when the claimed subject is not enrolled
    accepted: bool          # score >= threshold
    completed_at: datetime
# ---------------------------------------------------------------------------
# Enrol — add a subject template to the gallery
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class EnrolRequest:
    """Request to add a subject template to the gallery."""
    request_id: RequestId
    template: BiometricTemplate  # already extracted and normalised
    replace_existing: bool       # overwrite if subject_id already present
    submitted_at: datetime
@dataclass(frozen=True, slots=True)
class EnrolResult:
    """Outcome of an enrolment transaction."""
    request_id: RequestId
    subject_id: SubjectId
    enrolled_at: datetime
    replaced: bool  # True if this overwrote a prior template
# ---------------------------------------------------------------------------
# Delete — remove a subject from the gallery
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class DeleteRequest:
    """Request to remove a subject's template from a gallery partition."""
    request_id: RequestId
    subject_id: SubjectId
    partition: PartitionId
    submitted_at: datetime
@dataclass(frozen=True, slots=True)
class DeleteResult:
    """Outcome of a deletion transaction."""
    request_id: RequestId
    subject_id: SubjectId
    deleted: bool  # False if subject was not found
    completed_at: datetime
# ---------------------------------------------------------------------------
# Detection — face bounding boxes and landmarks from raw image
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class Detection:
"""
A single face detection result from a raw image.
Produced by DetectionPort, consumed by the pipeline detect stage.
"""
bbox: tuple[float, float, float, float] # (x1, y1, x2, y2) normalised to [0, 1]
landmarks: NDArray[np.float32] # 5-point facial landmarks, shape (5, 2)
confidence: float # detection confidence [0, 1]
def __post_init__(self) -> None:
if self.landmarks.shape != (5, 2):
raise ValueError(
f"Detection landmarks must be shape (5, 2), got {self.landmarks.shape}"
)
if self.landmarks.dtype != np.float32:
raise ValueError(f"Detection landmarks must be float32, got {self.landmarks.dtype}")
self.landmarks.flags.writeable = False
# ---------------------------------------------------------------------------
# Presentation Attack Detection (PAD)
# ---------------------------------------------------------------------------
class PadScore(float):
    """Spoof likelihood in [0.0, 1.0]. 0 = bona fide; 1 = attack."""

    def __new__(cls, value: float) -> "PadScore":
        # Chained comparison is False for NaN as well, so NaN is rejected too.
        if 0.0 <= value <= 1.0:
            return super().__new__(cls, value)
        raise ValueError(f"PadScore must be in [0, 1], got {value}")
# NewType gives mypy a distinct nominal type at zero runtime cost:
# AttackType("PRINT") is just the plain str at runtime.
AttackType = NewType("AttackType", str)
"""
Open categorisation of detected presentation attack.
Convention strings (not exhaustive — new attack types do not require
modifying domain code):
"BONA_FIDE", "PRINT", "REPLAY", "MASK_3D", "DEEPFAKE",
"FACE_SWAP", "MORPHING", "UNKNOWN_ATTACK"
"""
@dataclass(frozen=True, slots=True)
class PadResult:
    """Result of presentation attack detection — measurement only, no decision.

    The accept/reject decision is applied later by `pad_gate` with a
    policy-supplied threshold.
    """
    spoof_score: PadScore
    attack_type: AttackType
    confidence: float  # [0.0, 1.0] confidence in the classification (not validated here)
@dataclass(frozen=True, slots=True)
class MorphingResult:
    """Result of morphing attack detection — measurement only.

    The accept/reject decision is applied later by `morphing_gate`.
    """
    morph_score: PadScore  # reuses [0, 1] scale: 0 = genuine, 1 = morphed
    confidence: float      # [0.0, 1.0] confidence (not validated here)
# ---------------------------------------------------------------------------
# Vector mutation acknowledgements — returned by split mutation ports
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class EnrolAck:
    """
    Acknowledgement from the vector store that an enrolment mutation has
    been accepted. Carries the event_id for correlation with the event log.
    """
    event_id: str          # correlates with the EventLogPort entry
    subject_id: SubjectId
    partition: PartitionId
    replaced: bool         # True if a prior template existed for this subject
@dataclass(frozen=True, slots=True)
class DeleteAck:
    """
    Acknowledgement from the vector store that a deletion mutation has
    been accepted. `deleted` is False if the subject was not found.
    """
    event_id: str          # correlates with the EventLogPort entry
    subject_id: SubjectId
    deleted: bool
# ---------------------------------------------------------------------------
# Event log entries — immutable records of gallery mutations
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class EnrolEvent:
"""Immutable record that a template was enrolled into the gallery."""
event_id: str
subject_id: SubjectId
partition: PartitionId
vector: NDArray[np.float32] # L2-normalised 512-dim template; needed for index rebuilds from event log
enrolled_at: datetime
source_ref: str # upstream transaction reference (e.g. SHA-256 of source image)
@dataclass(frozen=True, slots=True)
class DeleteEvent:
    """Immutable record that a template was deleted from the gallery."""
    event_id: str
    subject_id: SubjectId
    partition: PartitionId
    deleted_at: datetime  # presumably UTC like the other timestamps — confirm
    reason_code: str      # e.g. "SUBJECT_REQUEST", "RETENTION_EXPIRY", "ADMIN"
# Closed union used by EventLogPort.read_from; extend here when a new
# mutation event type is introduced.
Event = Union[EnrolEvent, DeleteEvent]
"""
Union of all gallery mutation events for the event log.
These domain event types are **canonical**. The Rust/FAISS layer derives its
own event structs from these definitions via protobuf translation. See
faiss-design.md Section 5 for the Rust-side representation.
"""

Why frozen dataclasses, not Pydantic models?

Section titled “Why frozen dataclasses, not Pydantic models?”
  • Pydantic models carry validation state and .model_fields, which are implementation details. Frozen dataclasses are leaner and communicate value semantics more directly.
  • slots=True closes the attribute namespace, so assigning to a misspelled or new attribute fails immediately — something frozen=True alone would not catch, since FrozenInstanceError only guards the declared fields.
  • If JSON (de)serialisation is needed at the boundary, that is the adapter’s concern, not the domain’s.

Ports are structural protocols (PEP 544). An adapter satisfies a port by having the right method signatures. No isinstance checks, no registration, no base classes — pure duck typing verified by mypy.

Each protocol covers exactly one concern. The previous VectorStorePort aggregated four unrelated capabilities (search, lookup, enrolment, deletion) behind a single interface. That is the classic Hickey “complecting” smell — interleaving independent braids into one strand. The split below gives each orchestrator exactly the capability it needs and nothing more: search and lookup each get a dedicated read port, while enrolment and deletion share the single write-path mutation port.

src/core/ports/biometric.py
from __future__ import annotations
from collections.abc import AsyncIterator
from datetime import datetime
from typing import Protocol, runtime_checkable
from core.domain.types import (
BiometricTemplate,
DeleteAck,
DeleteRequest, DeleteResult,
Detection,
EmbeddingVector,
EnrolAck,
EnrolEvent, DeleteEvent, Event,
EnrolRequest, EnrolResult,
MorphingResult,
NDArray,
PadResult,
PartitionId,
QualityScore,
SearchRequest, SearchResult,
SimilarityScore,
SubjectId,
VerifyRequest, VerifyResult,
)
import numpy as np
# ---------------------------------------------------------------------------
# Inbound operation ports — define WHAT the application can do
# ---------------------------------------------------------------------------
@runtime_checkable
class SearchPort(Protocol):
    """Execute a 1:N biometric search.

    Inbound port — SearchOrchestrator satisfies this shape structurally.
    """

    async def search(self, request: SearchRequest) -> SearchResult:
        """Compare the probe against the nominated partitions; return ranked hits."""
        ...
@runtime_checkable
class VerifyPort(Protocol):
    """Execute a 1:1 biometric verification.

    Inbound port — VerifyOrchestrator satisfies this shape structurally.
    """

    async def verify(self, request: VerifyRequest) -> VerifyResult:
        """Score the probe against the claimed subject's enrolled template."""
        ...
@runtime_checkable
class EnrolPort(Protocol):
    """Enrol a subject template into the gallery.

    Inbound port — EnrolOrchestrator satisfies this shape structurally.
    """

    async def enrol(self, request: EnrolRequest) -> EnrolResult:
        """Persist the (already extracted and normalised) template."""
        ...
@runtime_checkable
class DeletePort(Protocol):
    """Remove a subject template from the gallery.

    Inbound port — DeleteOrchestrator satisfies this shape structurally.
    """

    async def delete(self, request: DeleteRequest) -> DeleteResult:
        """Delete the subject's template; result reports whether it existed."""
        ...
# ---------------------------------------------------------------------------
# Outbound infrastructure ports — define HOW the domain reaches the outside
# ---------------------------------------------------------------------------
@runtime_checkable
class InferencePort(Protocol):
    """
    Run the ViT feature-extraction model to produce a raw (unnormalised)
    EmbeddingVector from a pre-aligned face crop.
    The domain does not know whether this is ONNX, TensorRT, or a stub.
    """

    async def extract(self, aligned_face: NDArray[np.uint8]) -> EmbeddingVector:
        """
        Parameters
        ----------
        aligned_face:
            112x112 RGB uint8 array produced by the alignment stage.

        Returns
        -------
        EmbeddingVector
            Raw (unnormalised) embedding with model_id and expected_dim set
            (presumably from the adapter's own model configuration — confirm).
            The caller is responsible for normalisation via `template_normalize`
            before creating a `BiometricTemplate`.
        """
        ...
@runtime_checkable
class DetectionPort(Protocol):
    """
    Detects faces in a raw image. Returns zero or more Detection results
    ordered by confidence descending (an empty list means no face found).
    The adapter wraps SCRFD_10G or equivalent via ONNX Runtime.
    """

    async def detect(self, image_bytes: bytes) -> list[Detection]:
        """Decode and scan `image_bytes`; never raises for a face-free image."""
        ...
# ---------------------------------------------------------------------------
# Vector store — split into three single-concern protocols
#
# The old VectorStorePort aggregated search, lookup, and mutation behind
# one interface. That complects read and write paths that have entirely
# different scaling, consistency, and deployment characteristics.
#
# Split:
# VectorSearchPort — ANN search across partitions (read, scatter-gather)
# VectorLookupPort — fetch a single template by ID (read, point lookup)
# VectorMutationPort — enrol and delete (write, event-sourced)
# ---------------------------------------------------------------------------
@runtime_checkable
class VectorSearchPort(Protocol):
    """
    Approximate nearest-neighbour search across gallery partitions.
    This is the hot read path — scatter-gather across FAISS shards.
    The domain does not know about sharding, IVF probes, or PQ compression.
    """

    async def search_ann(
        self,
        query: NDArray[np.float32],  # assumed L2-normalised, shape (dim,) — confirm adapter contract
        partitions: frozenset[PartitionId],
        top_k: int,
    ) -> list[tuple[SubjectId, SimilarityScore]]:
        """
        Returns up to `top_k` (subject_id, raw_score) pairs, highest score first.
        The domain service applies threshold filtering on top of this
        (see `rank_candidates`).
        """
        ...
@runtime_checkable
class VectorLookupPort(Protocol):
    """
    Point-lookup of a single enrolled template by subject ID.
    Used by VerifyOrchestrator to fetch the gallery template for 1:1
    comparison. Intentionally separate from search — the adapter may
    use a flat key-value store rather than FAISS for this.
    """

    async def fetch(self, subject_id: SubjectId) -> BiometricTemplate | None:
        """Return the enrolled template, or None if not present."""
        ...
@runtime_checkable
class VectorMutationPort(Protocol):
    """
    Write path for gallery mutations: enrolment and deletion.
    Returns acknowledgement values (EnrolAck, DeleteAck) that carry
    event_ids for correlation with the EventLogPort. The adapter
    decides whether this is synchronous or queue-backed.
    """

    async def enrol(self, template: BiometricTemplate) -> EnrolAck:
        """Insert or replace a template for subject_id in its partition
        (the partition is taken from the template itself)."""
        ...

    async def delete(self, subject_id: SubjectId, partition: PartitionId) -> DeleteAck:
        """Delete the template. Returns ack with deleted=True if it existed."""
        ...
# ---------------------------------------------------------------------------
# Quality, PAD, and morphing detection
# ---------------------------------------------------------------------------
@runtime_checkable
class QualityPort(Protocol):
    """
    Assess whether a face image meets minimum quality standards for extraction.
    Sits *before* InferencePort in the pipeline to reject bad probes cheaply.
    Reference implementation: OFIQ (ISO/IEC 29794-5:2025), the BSI/eu-LISA
    open-source implementation of the ISO face image quality standard.
    """

    async def assess(self, aligned_face: NDArray[np.uint8]) -> QualityScore:
        """
        Parameters
        ----------
        aligned_face:
            112x112 RGB uint8 array.

        Returns
        -------
        QualityScore
            A float in [0, 1]. The pipeline uses this to gate extraction
            (see `quality_gate`).
        """
        ...
@runtime_checkable
class PadPort(Protocol):
    """
    Unified physical-digital presentation attack detection.
    Covers: print, replay, mask, deepfake, face swap.
    Produces a measurement only — the decision threshold is policy
    (applied later by `pad_gate`).
    """

    async def detect_attack(self, image: NDArray[np.uint8]) -> PadResult:
        """Score the image for spoof likelihood and classify the attack type."""
        ...
@runtime_checkable
class MorphingDetectionPort(Protocol):
    """
    Morphing attack detection for document-based enrollment.
    Detects blended identities in passport/ID photos. Measurement only;
    the decision is applied later by `morphing_gate`.
    """

    async def detect_morphing(self, image: NDArray[np.uint8]) -> MorphingResult:
        """Score the image for morphing likelihood."""
        ...
# ---------------------------------------------------------------------------
# Event log — append-only record of gallery mutations
# ---------------------------------------------------------------------------
@runtime_checkable
class EventLogPort(Protocol):
    """
    Append-only log of gallery mutation events.
    Every enrol and delete flows through this log before (or alongside)
    the vector store mutation. This enables:
    - Event sourcing: rebuild gallery state from the log
    - Audit: complete history of who was enrolled/deleted and when
    - Replication: followers consume the log to stay in sync
    The adapter may be backed by Kafka, a WAL file, or an in-memory list.
    """

    async def append(self, event: EnrolEvent | DeleteEvent) -> str:
        """Append an event and return its event_id."""
        ...

    async def read_from(self, offset: int) -> AsyncIterator[Event]:
        """Yield events starting from the given offset.

        NOTE(review): `async def` returning AsyncIterator is the async-generator
        signature convention — implementations are expected to be async
        generators consumed with plain `async for`; confirm call sites do not
        `await` first.
        """
        ...

    async def current_offset(self) -> int:
        """Return the current log tail offset."""
        ...
# ---------------------------------------------------------------------------
# Queue — inter-stage message passing
# ---------------------------------------------------------------------------
@runtime_checkable
class QueuePort(Protocol):
    """
    Minimal async queue for passing envelopes between pipeline stages.
    The envelope is a plain dict — no domain types leak into the transport.
    This keeps pipeline stages decoupled: each stage reads the keys it needs
    and ignores the rest. Hickey: "queues are the mechanism that lets you
    get at the independent things independently."
    The adapter may be an asyncio.Queue, Redis Stream, or Kafka topic.
    """

    async def put(self, envelope: dict) -> None:
        """Enqueue an envelope for the next stage."""
        ...

    def __aiter__(self) -> AsyncIterator[dict]:
        """Iterate over incoming envelopes (supports `async for env in queue:`)."""
        ...
# ---------------------------------------------------------------------------
# Clock — injectable time source
# ---------------------------------------------------------------------------
@runtime_checkable
class ClockPort(Protocol):
    """
    Injectable time source for timestamps.
    Production uses `datetime.now(timezone.utc)`. Tests inject a
    deterministic clock that returns fixed or advancing timestamps.
    This eliminates time-dependent flakiness from domain tests.
    """

    def now(self) -> datetime:
        """Return the current UTC datetime.

        NOTE(review): the UTC/aware requirement is convention only — the
        return type cannot enforce it; confirm adapters comply.
        """
        ...

Abstract base classes complect interface definition with inheritance. A Protocol defines only the structural contract. An adapter written in Rust (via PyO3) or in a third-party library satisfies the protocol without ever knowing the protocol exists — Hickey would call this composing without complecting.

All ports are decorated with @runtime_checkable to enable isinstance checks at service startup — specifically in the composition root where adapters are wired to ports. This catches wiring errors (e.g. passing a FAISSAdapter where an EventLogPort was expected) before the first request arrives.

@runtime_checkable is not intended for production dispatch. It verifies method names exist but cannot check argument types or return types at runtime. Mypy is the primary verification mechanism — structural subtyping via Protocol is checked statically during development and CI. The runtime check is a safety net for deployment, not a substitute for static analysis.


These functions are stateless and free of I/O. They can be unit-tested with plain numpy arrays and no mocks.

src/core/domain/ops.py
from __future__ import annotations
import numpy as np
from numpy.typing import NDArray
from core.domain.types import (
EmbeddingVector,
MorphingResult,
PadResult,
QualityScore,
SearchCandidate,
SimilarityScore,
SubjectId,
Threshold,
)
# ---------------------------------------------------------------------------
# Template arithmetic
# ---------------------------------------------------------------------------
def template_normalize(embedding: EmbeddingVector) -> EmbeddingVector:
    """
    Project an EmbeddingVector onto the unit hypersphere (L2 norm == 1).

    The single mandatory post-processing step after InferencePort.extract().
    Once vectors are unit-length, Inner Product and Cosine Similarity
    coincide, so FAISS IP scores match domain scores exactly.

    Parameters
    ----------
    embedding:
        Raw (unnormalised) float32 embedding from the ViT.

    Returns
    -------
    EmbeddingVector
        A fresh, normalised value — the input is left untouched.

    Raises
    ------
    ValueError
        For a (near-)zero vector, which signals degenerate model output.
    """
    magnitude = np.linalg.norm(embedding.vector)
    if magnitude < 1e-10:
        raise ValueError("Cannot normalise a zero vector — degenerate extraction output.")
    unit_vector = (embedding.vector / magnitude).astype(np.float32)
    return EmbeddingVector(
        vector=unit_vector,
        model_id=embedding.model_id,
        expected_dim=embedding.expected_dim,
    )
def cosine_similarity(
    a: EmbeddingVector,
    b: EmbeddingVector,
) -> SimilarityScore:
    """
    Dot product of two L2-normalised embeddings — i.e. their cosine.

    Because both inputs are unit-length this is exactly the Inner Product
    FAISS computes, so scores cross the domain boundary unchanged.

    Parameters
    ----------
    a, b:
        L2-normalised EmbeddingVectors produced by the same model.

    Returns
    -------
    SimilarityScore
        Scalar in [-1.0, 1.0].

    Raises
    ------
    ValueError
        When the embeddings were produced by different models.
    """
    if a.model_id != b.model_id:
        raise ValueError(
            f"Cannot compare embeddings from different models: "
            f"{a.model_id!r} vs {b.model_id!r}"
        )
    dot_product = float(np.dot(a.vector, b.vector))
    # fp32 rounding can push near-identical vectors just past 1.0; clamp so
    # SimilarityScore's range check never trips.
    clamped = max(-1.0, min(1.0, dot_product))
    return SimilarityScore(clamped)
def score_fusion(
    scores: tuple[SimilarityScore, ...],
    weights: tuple[float, ...] | None = None,
) -> SimilarityScore:
    """
    Fuse multiple per-shard or per-model similarity scores into a single score.

    Default strategy: weighted average (uniform weights if none provided).
    Alternative strategies (max, product) can be composed by the caller —
    this function does not decide policy, it implements one explicit mechanism.

    Parameters
    ----------
    scores:
        Tuple of SimilarityScore values from individual shards or models.
    weights:
        Optional matching-length tuple of non-negative floats.
        Must sum to > 0.

    Returns
    -------
    SimilarityScore
        Fused score in [-1, 1].

    Raises
    ------
    ValueError
        On empty input, mismatched lengths, a negative weight, or zero
        total weight.
    """
    if not scores:
        raise ValueError("score_fusion requires at least one score.")
    if weights is None:
        weights = tuple(1.0 for _ in scores)
    if len(weights) != len(scores):
        raise ValueError("scores and weights must have the same length.")
    # The docstring has always required non-negative weights; enforce it so a
    # negative weight cannot silently distort the fused score.
    if any(w < 0.0 for w in weights):
        raise ValueError("Weights must be non-negative.")
    total_weight = sum(weights)
    if total_weight < 1e-10:
        raise ValueError("Total weight must be > 0.")
    fused = sum(s * w for s, w in zip(scores, weights)) / total_weight
    # Clamp to guard against floating-point overshoot before re-validation.
    return SimilarityScore(max(-1.0, min(1.0, fused)))
# ---------------------------------------------------------------------------
# Decision functions — pure policy, no I/O
# ---------------------------------------------------------------------------
def threshold_check(score: SimilarityScore, threshold: Threshold) -> bool:
    """
    Apply the configured decision threshold: True means accept / hit.

    This is the *only* place in the domain where the binary accept/reject
    decision is made. The threshold value itself is supplied by the caller
    (the service orchestrator), keeping policy out of this function.

    Parameters
    ----------
    score:
        Similarity between probe and gallery template.
    threshold:
        Decision boundary configured per use-case.

    Returns
    -------
    bool
        True = accept (score >= threshold).
    """
    score_value = float(score)
    boundary = float(threshold)
    return score_value >= boundary
def quality_gate(score: QualityScore, minimum: QualityScore) -> bool:
    """
    Gate extraction on image quality: True means the probe is good enough.

    Called *before* InferencePort.extract() to reject poor-quality probes
    cheaply. `minimum` is configuration supplied by the caller.

    Parameters
    ----------
    score:
        Quality assessment from QualityPort.assess().
    minimum:
        Configured minimum acceptable quality (e.g. 0.5 for operational use).

    Returns
    -------
    bool
        True = image is acceptable for extraction.
    """
    measured = float(score)
    required = float(minimum)
    return measured >= required
def pad_gate(result: PadResult, threshold: Threshold) -> bool:
    """True when the image passes PAD (bona fide): spoof score strictly below threshold."""
    spoof = float(result.spoof_score)
    return spoof < float(threshold)
def morphing_gate(result: MorphingResult, threshold: Threshold) -> bool:
    """True when the image passes morphing detection (genuine): morph score strictly below threshold."""
    morph = float(result.morph_score)
    return morph < float(threshold)
# ---------------------------------------------------------------------------
# Result construction helpers — pure functions over data
# ---------------------------------------------------------------------------
def rank_candidates(
    raw_results: list[tuple[SubjectId, SimilarityScore]],
    threshold: Threshold,
    top_k: int,
) -> tuple[SearchCandidate, ...]:
    """
    Turn raw ANN output into ranked, threshold-filtered SearchCandidates.

    Encapsulates the 1:N aggregation policy in one place: keep only scores
    that pass `threshold_check`, order best-first, cap at `top_k`, and
    assign 1-indexed ranks.

    Parameters
    ----------
    raw_results:
        (subject_id, score) pairs from VectorSearchPort.search_ann(); results
        from multiple shards are assumed to be merged already.
    threshold:
        Minimum score for inclusion.
    top_k:
        Maximum number of candidates in the output.

    Returns
    -------
    tuple[SearchCandidate, ...]
        Immutable, ranked tuple — empty when nothing meets the threshold.
    """
    passing = [pair for pair in raw_results if threshold_check(pair[1], threshold)]
    ordered = sorted(passing, key=lambda pair: float(pair[1]), reverse=True)
    return tuple(
        SearchCandidate(subject_id=subject, score=value, rank=position)
        for position, (subject, value) in enumerate(ordered[:top_k], start=1)
    )

Orchestrators compose ports and pure functions to implement the four inbound operations. Each orchestrator receives only the ports it needs — no god-object service that holds every adapter.

src/core/orchestration/search.py
from core.domain.ops import rank_candidates
from core.domain.types import SearchRequest, SearchResult
from core.ports.biometric import ClockPort, VectorSearchPort
class SearchOrchestrator:
    """1:N search — scatter query across partitions, rank results."""

    def __init__(self, vectors: VectorSearchPort, clock: ClockPort) -> None:
        # Narrowest possible ports: the ANN read path plus a time source.
        self._vectors = vectors
        self._clock = clock

    async def search(self, request: SearchRequest) -> SearchResult:
        """Run the ANN scatter, then rank and threshold-filter into a result."""
        raw_hits = await self._vectors.search_ann(
            query=request.probe.embedding.vector,
            partitions=request.partitions,
            top_k=request.top_k,
        )
        ranked = rank_candidates(raw_hits, request.threshold, request.top_k)
        return SearchResult(
            request_id=request.request_id,
            candidates=ranked,
            hit=bool(ranked),
            completed_at=self._clock.now(),
        )
src/core/orchestration/verify.py
from core.domain.ops import cosine_similarity, threshold_check
from core.domain.types import VerifyRequest, VerifyResult
from core.ports.biometric import ClockPort, VectorLookupPort
class VerifyOrchestrator:
    """1:1 verification — fetch enrolled template, compute similarity."""

    def __init__(self, lookup: VectorLookupPort, clock: ClockPort) -> None:
        # Narrowest possible ports: point-lookup read path plus a time source.
        self._lookup = lookup
        self._clock = clock

    async def verify(self, request: VerifyRequest) -> VerifyResult:
        """
        Fetch the enrolled template for the claimed subject and score the
        probe against it.

        An unknown subject yields score 0.0 and a rejection rather than an
        error: 1:1 verification answers "is this the claimed person?", and
        "no such enrolment" is a definitive "no".
        """
        # Hoisted out of the `if` branch so the dependency is visible at the
        # top of the method instead of buried in one code path.
        from core.domain.types import SimilarityScore

        enrolled = await self._lookup.fetch(request.claimed_subject_id)
        if enrolled is None:
            # Subject not found — score 0, always reject.
            score = SimilarityScore(0.0)
        else:
            score = cosine_similarity(request.probe.embedding, enrolled.embedding)
        return VerifyResult(
            request_id=request.request_id,
            claimed_subject_id=request.claimed_subject_id,
            score=score,
            accepted=threshold_check(score, request.threshold),
            completed_at=self._clock.now(),
        )
src/core/orchestration/enrol.py
from core.domain.types import EnrolRequest, EnrolResult
from core.ports.biometric import ClockPort, VectorMutationPort
class EnrolOrchestrator:
    """Enrol a template into the gallery via the mutation port."""

    def __init__(self, mutations: VectorMutationPort, clock: ClockPort) -> None:
        # Write-path port plus an injectable time source — nothing more.
        self._mutations = mutations
        self._clock = clock

    async def enrol(self, request: EnrolRequest) -> EnrolResult:
        """Forward the template to the mutation port and wrap its acknowledgement."""
        acknowledgement = await self._mutations.enrol(request.template)
        return EnrolResult(
            request_id=request.request_id,
            subject_id=request.template.subject_id,
            enrolled_at=self._clock.now(),
            replaced=acknowledgement.replaced,
        )
src/core/orchestration/delete.py
from core.domain.types import DeleteRequest, DeleteResult
from core.ports.biometric import ClockPort, VectorMutationPort
class DeleteOrchestrator:
    """Delete a template from the gallery via the mutation port."""

    def __init__(self, mutations: VectorMutationPort, clock: ClockPort) -> None:
        # State lives behind the mutation port; this object holds only ports.
        self._mutations = mutations
        self._clock = clock

    async def delete(self, request: DeleteRequest) -> DeleteResult:
        """Remove the subject's template and report whether anything was deleted."""
        outcome = await self._mutations.delete(request.subject_id, request.partition)
        return DeleteResult(
            request_id=request.request_id,
            subject_id=request.subject_id,
            deleted=outcome.deleted,
            completed_at=self._clock.now(),
        )

Each orchestrator takes the narrowest possible set of ports:

| Orchestrator | Ports injected |
| --- | --- |
| SearchOrchestrator | VectorSearchPort, ClockPort |
| VerifyOrchestrator | VectorLookupPort, ClockPort |
| EnrolOrchestrator | VectorMutationPort, ClockPort |
| DeleteOrchestrator | VectorMutationPort, ClockPort |

A single FAISS adapter may implement all three vector ports. That is fine — the adapter composes; the domain does not know.


┌──────────────────────────────────────────┐
│ Service Orchestrators │
│ (Search, Verify, Enrol, Delete) │
│ Call ports. Compose pure functions. │
│ Inject dependencies. Own the workflow. │
└──────────────┬───────────────────────────┘
│ uses
┌─────────────────────┼─────────────────────────┐
│ │ │
┌───────▼──────┐ ┌──────────▼───────────┐ ┌────────▼───────┐
│ domain/ │ │ ports/ │ │ adapters/ │
│ types.py │ │ biometric.py │ │ (outbound) │
│ │ │ │ │ │
│ Frozen │ │ Inbound: │ │ FAISSAdapter │
│ dataclasses │ │ SearchPort │ │ (implements │
│ Typed │ │ VerifyPort │ │ VectorSearch │
│ scalars │ │ EnrolPort │ │ + Lookup │
│ Ack types │ │ DeletePort │ │ + Mutation) │
│ Event types │ │ │ │ ONNXAdapter │
│ │ │ Vector (split): │ │ QualityAdapter│
│ domain/ │ │ VectorSearchPort │ │ PadAdapter │
│ ops.py │ │ VectorLookupPort │ │ MorphAdapter │
│ │ │ VectorMutationPort │ │ EventLogAdapt │
│ Pure fns: │ │ │ │ QueueAdapter │
│ normalize │ │ Inference: │ │ ClockAdapter │
│ cosine_sim │ │ InferencePort │ │ │
│ threshold │ │ DetectionPort │ │ Satisfy ports │
│ quality_gate│ │ PadPort │ │ via duck │
│ pad_gate │ │ MorphingDetection- │ │ typing only │
│ morph_gate │ │ Port │ │ │
│ │ │ │ │ No domain │
│ │ │ Infrastructure: │ │ types leak │
│ │ │ EventLogPort │ │ into adapters │
│ │ │ QueuePort │ │ │
│ │ │ ClockPort │ │ │
│ │ │ │ │ │
│ │ │ Python Protocol │ │ │
│ │ │ (structural, │ │ │
│ │ │ not nominal) │ │ │
└──────────────┘ └──────────────────────┘ └────────────────┘
Zero external dependencies in src/core/domain/
Zero I/O in src/core/domain/ops.py
Zero mutable state anywhere in core/

6. What This Design Avoids (The “No Complecting” List)

Section titled “6. What This Design Avoids (The “No Complecting” List)”
| Complected Anti-Pattern | Simple Alternative Used Here |
| --- | --- |
| Template class with .match(other) method | cosine_similarity(a, b) pure function over EmbeddingVector values |
| VectorStorePort with search + lookup + mutation | Three split ports: VectorSearchPort, VectorLookupPort, VectorMutationPort |
| Searcher base class -> FAISSSearcher subclass | VectorSearchPort Protocol; FAISSAdapter satisfies it structurally |
| GalleryService with mutable self._cache | Stateless orchestrator calls ports; state lives in FAISS cluster |
| QualityChecker that also extracts | QualityPort and InferencePort are separate protocols |
| Result object with .accept() side effect | threshold_check(score, threshold) -> bool — data in, data out |
| TemplateRepository with ORM model | VectorLookupPort.fetch() returns a plain BiometricTemplate value |
| Datetime strings in domain types | datetime with tzinfo — single canonical time representation |
| datetime.now() called inside domain | ClockPort injected — deterministic, testable |
| PadDetector that also rejects | PadPort produces measurement; pad_gate decides |
| MorphingDetector coupled to PAD | Separate port, separate model, composable |
| Direct function calls between pipeline stages | QueuePort decouples stages; each consumes independently |
| Mutable gallery state without audit trail | EventLogPort provides append-only event history |

src/core/
├── domain/
│ ├── __init__.py
│ ├── types.py # All frozen dataclasses and typed scalars
│ │ # (EmbeddingVector, BiometricTemplate, PadResult,
│ │ # EnrolAck, DeleteAck, EnrolEvent, DeleteEvent, etc.)
│ └── ops.py # All pure functions (template_normalize, cosine_similarity,
│ # pad_gate, morphing_gate, rank_candidates, etc.)
├── ports/
│ ├── __init__.py
│ └── biometric.py # Protocol definitions for all ports:
│ # Inbound: SearchPort, VerifyPort, EnrolPort, DeletePort
│ # Vector: VectorSearchPort, VectorLookupPort, VectorMutationPort
│ # Infra: InferencePort, DetectionPort, QualityPort,
│ # PadPort, MorphingDetectionPort, EventLogPort,
│ # QueuePort, ClockPort
└── orchestration/
├── __init__.py
├── search.py # SearchOrchestrator(VectorSearchPort, ClockPort)
├── verify.py # VerifyOrchestrator(VectorLookupPort, ClockPort)
├── enrol.py # EnrolOrchestrator(VectorMutationPort, ClockPort)
└── delete.py # DeleteOrchestrator(VectorMutationPort, ClockPort)

src/core/ has zero third-party dependencies beyond numpy (needed for the vector type annotations). All I/O, all network calls, and all framework code live in src/adapters/.


Because every function in ops.py is pure and every type in types.py is a frozen value, tests need no mocks, no fixtures, and no test databases:

# tests/unit/test_ops.py (illustrative)
import numpy as np
import pytest
from core.domain.ops import cosine_similarity, template_normalize, threshold_check
from core.domain.types import EmbeddingVector, SimilarityScore, Threshold
MODEL_ID = "adaface-vit-base-v2"


def _make_embedding(vec: np.ndarray) -> EmbeddingVector:
    """Wrap a raw array as a 512-d EmbeddingVector for the test model."""
    return EmbeddingVector(
        vector=vec,
        model_id=MODEL_ID,
        expected_dim=512,
    )
def test_identical_vectors_score_one():
    """A normalised embedding compared with itself scores ~1.0."""
    raw = np.random.randn(512).astype(np.float32)
    emb = template_normalize(_make_embedding(raw))
    assert cosine_similarity(emb, emb) == pytest.approx(1.0, abs=1e-6)


def test_orthogonal_vectors_score_zero():
    """Embeddings on distinct basis axes score ~0.0."""
    first = np.zeros(512, dtype=np.float32)
    second = np.zeros(512, dtype=np.float32)
    first[0] = 1.0
    second[1] = 1.0
    score = cosine_similarity(_make_embedding(first), _make_embedding(second))
    assert score == pytest.approx(0.0, abs=1e-6)
def test_threshold_accept():
    """A score above the threshold is accepted."""
    assert threshold_check(SimilarityScore(0.85), Threshold(0.80)) is True


def test_threshold_reject():
    """A score below the threshold is rejected."""
    assert threshold_check(SimilarityScore(0.75), Threshold(0.80)) is False
def test_normalize_zero_vector_raises():
    """Normalising the zero vector is undefined and must raise ValueError."""
    zero = np.zeros(512, dtype=np.float32)
    with pytest.raises(ValueError, match="zero vector"):
        template_normalize(_make_embedding(zero))

Property-based tests using Hypothesis can verify the normalisation invariant (np.linalg.norm(template_normalize(e).vector) ≈ 1.0) across thousands of random EmbeddingVector instances — no domain-specific fixtures needed.

Orchestrator tests inject stub adapters that implement the split ports and a deterministic ClockPort:

# tests/unit/test_search_orchestrator.py (illustrative)
from datetime import datetime, timezone
from core.domain.types import SimilarityScore, SubjectId, PartitionId
from core.orchestration.search import SearchOrchestrator
class StubVectorSearch:
    """Satisfies VectorSearchPort structurally — no inheritance needed."""

    async def search_ann(self, query, partitions, top_k):
        # Always report one confident hit, regardless of the query.
        hit = (SubjectId("S001"), SimilarityScore(0.95))
        return [hit]
class StubClock:
    """Satisfies ClockPort structurally."""

    def now(self):
        # Fixed, timezone-aware instant keeps orchestrator output deterministic.
        frozen = datetime(2025, 1, 1, tzinfo=timezone.utc)
        return frozen
async def test_search_returns_ranked_candidates():
    orchestrator = SearchOrchestrator(vectors=StubVectorSearch(), clock=StubClock())
    # ... build SearchRequest, assert on SearchResult