Domain Design
Philosophy: Rich Hickey’s “Simple Made Easy” — simplicity through composability, decoupling, and pure data.
0. Guiding Principles
Section titled “0. Guiding Principles”

Hickey distinguishes simple (one braid — not interleaved with other concerns) from easy (familiar, near-at-hand). The domain layer must be simple:
- Embeddings are values — immutable float32 arrays whose dimensionality comes from model configuration. They carry no behaviour, no identity, no mutable fields.
- Operations are pure functions — same inputs always yield same outputs. No side effects, no I/O.
- Ports are protocols — structural interfaces (Python
Protocol), not abstract base classes. Adapters need not inherit anything; they just satisfy the shape. - One port, one concern — each protocol covers exactly one capability. When a port aggregates unrelated responsibilities (e.g. search + mutation), split it. The cost of an extra protocol is near zero; the cost of a complected interface compounds.
- State lives at the boundary — the domain knows nothing about databases, caches, or request lifecycles.
- No complecting — detection, alignment, quality, extraction, and matching are separate concerns composed at the service layer, never merged inside a single object.
1. Core Data Types
Section titled “1. Core Data Types”

All domain types are frozen dataclasses or named tuples. frozen=True enforces value semantics at the Python level by raising FrozenInstanceError on attribute assignment. slots=True eliminates the __dict__ overhead and prevents new attributes from ever being added. (Note: neither mechanism stops a determined caller — object.__setattr__ can still write to a declared slot, bypassing the frozen guard — so these enforce immutability against accidental, not adversarial, mutation.) Additionally, numpy arrays held inside frozen dataclasses have their writeable flag set to False in __post_init__, ensuring deep immutability beyond what Python’s frozen=True can enforce on its own.
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import FrozenSet, NewType, Tuple, Union

import numpy as np
from numpy.typing import NDArray
# ---------------------------------------------------------------------------# Primitive scalars — strong types prevent unit confusion# ---------------------------------------------------------------------------
class SimilarityScore(float):
    """Cosine similarity in [-1.0, 1.0]. L2-normalised vectors → IP == cosine."""

    def __new__(cls, value: float) -> "SimilarityScore":
        # Validate eagerly so an out-of-range score can never propagate.
        if not (-1.0 <= value <= 1.0):
            raise ValueError(f"SimilarityScore must be in [-1, 1], got {value}")
        return super().__new__(cls, value)


class QualityScore(float):
    """Quality in [0.0, 1.0]. 0 = unusable image; 1 = perfect."""

    def __new__(cls, value: float) -> "QualityScore":
        if not (0.0 <= value <= 1.0):
            raise ValueError(f"QualityScore must be in [0, 1], got {value}")
        return super().__new__(cls, value)


class Threshold(float):
    """Decision threshold in [0.0, 1.0]."""

    def __new__(cls, value: float) -> "Threshold":
        if not (0.0 <= value <= 1.0):
            raise ValueError(f"Threshold must be in [0, 1], got {value}")
        return super().__new__(cls, value)
# ---------------------------------------------------------------------------# Identifiers — typed wrappers prevent mixing gallery IDs with request IDs# ---------------------------------------------------------------------------
class SubjectId(str):
    """Unique identifier for an enrolled subject in the gallery."""


class RequestId(str):
    """Trace/correlation ID for a single biometric transaction."""


class PartitionId(str):
    """Logical partition label (e.g. 'IABS', 'IDENT1')."""
# ---------------------------------------------------------------------------# EmbeddingVector — the raw numeric value extracted by a model# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)class EmbeddingVector: """ An L2-normalised float32 embedding produced by a specific model. This is a *value* — it has no identity, no behaviour, and no mutable fields. The numpy array is made read-only in __post_init__ for deep immutability.
Invariant: np.linalg.norm(vector) ≈ 1.0 (enforced by `template_normalize`) """
vector: NDArray[np.float32] # shape (expected_dim,), dtype float32 model_id: str # e.g. "adaface-vit-base-v2" expected_dim: int # dimensionality from model config (e.g. 512)
def __post_init__(self) -> None: if self.vector.shape != (self.expected_dim,): raise ValueError( f"EmbeddingVector must be shape ({self.expected_dim},), " f"got {self.vector.shape}" ) if self.vector.dtype != np.float32: raise ValueError(f"EmbeddingVector must be float32, got {self.vector.dtype}") # Deep immutability: prevent mutation of the underlying numpy buffer. # This mutates the array's flags (not a dataclass field), so it works # even though the dataclass is frozen. self.vector.flags.writeable = False
# ---------------------------------------------------------------------------# BiometricTemplate — the central record of the domain# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)class BiometricTemplate: """ A biometric record linking an EmbeddingVector to a subject and its provenance metadata. This is a *value* — it has no identity, no behaviour, and no mutable fields. """
embedding: EmbeddingVector # the numeric embedding + model info subject_id: SubjectId partition: PartitionId captured_at: datetime # UTC; when the probe was captured quality: QualityScore source_image_hash: str # SHA-256 of the source image (audit only)
def __post_init__(self) -> None: if not self.captured_at.tzinfo: raise ValueError("captured_at must be timezone-aware (UTC)")
# ---------------------------------------------------------------------------# Search — 1:N probe-against-gallery# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)class SearchRequest: """ A 1:N search request. The probe template is compared against every template in the nominated partitions. """
request_id: RequestId probe: BiometricTemplate partitions: FrozenSet[PartitionId] # which gallery segments to scan top_k: int # maximum candidates to return threshold: Threshold # minimum score to include a hit submitted_at: datetime # UTC
@dataclass(frozen=True, slots=True)class SearchCandidate: """A single ranked result from a 1:N search."""
subject_id: SubjectId score: SimilarityScore rank: int # 1-indexed; 1 == best match
@dataclass(frozen=True, slots=True)class SearchResult: request_id: RequestId candidates: Tuple[SearchCandidate, ...] # ordered by score descending hit: bool # True if any candidate meets threshold completed_at: datetime
# ---------------------------------------------------------------------------# Verify — 1:1 probe-vs-enrolled-template# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)class VerifyRequest: """ A 1:1 verification request. The probe is compared against the single enrolled template for the claimed subject. """
request_id: RequestId probe: BiometricTemplate claimed_subject_id: SubjectId threshold: Threshold submitted_at: datetime
@dataclass(frozen=True, slots=True)class VerifyResult: request_id: RequestId claimed_subject_id: SubjectId score: SimilarityScore accepted: bool # score >= threshold completed_at: datetime
# ---------------------------------------------------------------------------# Enrol — add a subject template to the gallery# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)class EnrolRequest: request_id: RequestId template: BiometricTemplate # already extracted and normalised replace_existing: bool # overwrite if subject_id already present submitted_at: datetime
@dataclass(frozen=True, slots=True)class EnrolResult: request_id: RequestId subject_id: SubjectId enrolled_at: datetime replaced: bool # True if this overwrote a prior template
# ---------------------------------------------------------------------------# Delete — remove a subject from the gallery# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)class DeleteRequest: request_id: RequestId subject_id: SubjectId partition: PartitionId submitted_at: datetime
@dataclass(frozen=True, slots=True)class DeleteResult: request_id: RequestId subject_id: SubjectId deleted: bool # False if subject was not found completed_at: datetime
# ---------------------------------------------------------------------------# Detection — face bounding boxes and landmarks from raw image# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)class Detection: """ A single face detection result from a raw image. Produced by DetectionPort, consumed by the pipeline detect stage. """ bbox: tuple[float, float, float, float] # (x1, y1, x2, y2) normalised to [0, 1] landmarks: NDArray[np.float32] # 5-point facial landmarks, shape (5, 2) confidence: float # detection confidence [0, 1]
def __post_init__(self) -> None: if self.landmarks.shape != (5, 2): raise ValueError( f"Detection landmarks must be shape (5, 2), got {self.landmarks.shape}" ) if self.landmarks.dtype != np.float32: raise ValueError(f"Detection landmarks must be float32, got {self.landmarks.dtype}") self.landmarks.flags.writeable = False
# ---------------------------------------------------------------------------# Presentation Attack Detection (PAD)# ---------------------------------------------------------------------------
class PadScore(float):
    """Spoof likelihood in [0.0, 1.0]. 0 = bona fide; 1 = attack."""

    def __new__(cls, value: float) -> "PadScore":
        if not (0.0 <= value <= 1.0):
            raise ValueError(f"PadScore must be in [0, 1], got {value}")
        return super().__new__(cls, value)
AttackType = NewType("AttackType", str)"""Open categorisation of detected presentation attack.
Convention strings (not exhaustive — new attack types do not requiremodifying domain code): "BONA_FIDE", "PRINT", "REPLAY", "MASK_3D", "DEEPFAKE", "FACE_SWAP", "MORPHING", "UNKNOWN_ATTACK""""
@dataclass(frozen=True, slots=True)class PadResult: """Result of presentation attack detection — measurement only, no decision.""" spoof_score: PadScore attack_type: AttackType confidence: float # [0.0, 1.0] confidence in the classification
@dataclass(frozen=True, slots=True)class MorphingResult: """Result of morphing attack detection — measurement only.""" morph_score: PadScore # reuses [0, 1] scale: 0 = genuine, 1 = morphed confidence: float
# ---------------------------------------------------------------------------# Vector mutation acknowledgements — returned by split mutation ports# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)class EnrolAck: """ Acknowledgement from the vector store that an enrolment mutation has been accepted. Carries the event_id for correlation with the event log. """ event_id: str subject_id: SubjectId partition: PartitionId replaced: bool # True if a prior template existed for this subject
@dataclass(frozen=True, slots=True)class DeleteAck: """ Acknowledgement from the vector store that a deletion mutation has been accepted. `deleted` is False if the subject was not found. """ event_id: str subject_id: SubjectId deleted: bool
# ---------------------------------------------------------------------------# Event log entries — immutable records of gallery mutations# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)class EnrolEvent: """Immutable record that a template was enrolled into the gallery.""" event_id: str subject_id: SubjectId partition: PartitionId vector: NDArray[np.float32] # L2-normalised 512-dim template; needed for index rebuilds from event log enrolled_at: datetime source_ref: str # upstream transaction reference (e.g. SHA-256 of source image)
@dataclass(frozen=True, slots=True)class DeleteEvent: """Immutable record that a template was deleted from the gallery.""" event_id: str subject_id: SubjectId partition: PartitionId deleted_at: datetime reason_code: str # e.g. "SUBJECT_REQUEST", "RETENTION_EXPIRY", "ADMIN"
Event = Union[EnrolEvent, DeleteEvent]"""Union of all gallery mutation events for the event log.
These domain event types are **canonical**. The Rust/FAISS layer derives itsown event structs from these definitions via protobuf translation. Seefaiss-design.md Section 5 for the Rust-side representation."""Why frozen dataclasses, not Pydantic models?
Section titled “Why frozen dataclasses, not Pydantic models?”

- Pydantic models carry validation state and .model_fields, which are implementation details. Frozen dataclasses are leaner and communicate value semantics more directly. slots=True prevents any accidental attribute assignment that frozen=True alone would only catch at runtime via FrozenInstanceError.
- If JSON (de)serialisation is needed at the boundary, that is the adapter’s concern, not the domain’s.
2. Port Definitions
Section titled “2. Port Definitions”

Ports are structural protocols (PEP 544). An adapter satisfies a port by having the right method signatures. No isinstance checks, no registration, no base classes — pure duck typing verified by mypy.
Each protocol covers exactly one concern. The previous VectorStorePort aggregated four unrelated capabilities (search, point lookup, enrolment, deletion) behind a single interface. That is the classic Hickey “complecting” smell — interleaving independent braids into one strand. The split below gives each orchestrator exactly the capability it needs and nothing more.
from __future__ import annotations
from collections.abc import AsyncIterator
from datetime import datetime
from typing import Protocol, runtime_checkable

import numpy as np

from core.domain.types import (
    BiometricTemplate,
    DeleteAck,
    DeleteEvent,
    DeleteRequest,
    DeleteResult,
    Detection,
    EmbeddingVector,
    EnrolAck,
    EnrolEvent,
    EnrolRequest,
    EnrolResult,
    Event,
    MorphingResult,
    NDArray,  # NOTE(review): re-exported via domain types; consider importing from numpy.typing directly
    PadResult,
    PartitionId,
    QualityScore,
    SearchRequest,
    SearchResult,
    SimilarityScore,
    SubjectId,
    VerifyRequest,
    VerifyResult,
)
# ---------------------------------------------------------------------------# Inbound operation ports — define WHAT the application can do# ---------------------------------------------------------------------------
@runtime_checkable
class SearchPort(Protocol):
    """Execute a 1:N biometric search."""

    async def search(self, request: "SearchRequest") -> "SearchResult": ...


@runtime_checkable
class VerifyPort(Protocol):
    """Execute a 1:1 biometric verification."""

    async def verify(self, request: "VerifyRequest") -> "VerifyResult": ...


@runtime_checkable
class EnrolPort(Protocol):
    """Enrol a subject template into the gallery."""

    async def enrol(self, request: "EnrolRequest") -> "EnrolResult": ...


@runtime_checkable
class DeletePort(Protocol):
    """Remove a subject template from the gallery."""

    async def delete(self, request: "DeleteRequest") -> "DeleteResult": ...
# ---------------------------------------------------------------------------# Outbound infrastructure ports — define HOW the domain reaches the outside# ---------------------------------------------------------------------------
@runtime_checkable
class InferencePort(Protocol):
    """
    Run the ViT feature-extraction model to produce a raw (unnormalised)
    EmbeddingVector from a pre-aligned face crop.

    The domain does not know whether this is ONNX, TensorRT, or a stub.
    """

    async def extract(self, aligned_face: "NDArray[np.uint8]") -> "EmbeddingVector":
        """
        Parameters
        ----------
        aligned_face:
            112x112 RGB uint8 array produced by the alignment stage.

        Returns
        -------
        EmbeddingVector
            Raw (unnormalised) embedding with model_id and expected_dim set.
            The caller is responsible for normalisation via `template_normalize`
            before creating a `BiometricTemplate`.
        """
        ...


@runtime_checkable
class DetectionPort(Protocol):
    """
    Detects faces in a raw image. Returns zero or more Detection results
    ordered by confidence descending. The adapter wraps SCRFD_10G or
    equivalent via ONNX Runtime.
    """

    async def detect(self, image_bytes: bytes) -> "list[Detection]": ...
# ---------------------------------------------------------------------------# Vector store — split into three single-concern protocols## The old VectorStorePort aggregated search, lookup, and mutation behind# one interface. That complects read and write paths that have entirely# different scaling, consistency, and deployment characteristics.## Split:# VectorSearchPort — ANN search across partitions (read, scatter-gather)# VectorLookupPort — fetch a single template by ID (read, point lookup)# VectorMutationPort — enrol and delete (write, event-sourced)# ---------------------------------------------------------------------------
@runtime_checkable
class VectorSearchPort(Protocol):
    """
    Approximate nearest-neighbour search across gallery partitions.

    This is the hot read path — scatter-gather across FAISS shards. The
    domain does not know about sharding, IVF probes, or PQ compression.
    """

    async def search_ann(
        self,
        query: "NDArray[np.float32]",
        partitions: "frozenset[PartitionId]",
        top_k: int,
    ) -> "list[tuple[SubjectId, SimilarityScore]]":
        """
        Returns up to `top_k` (subject_id, raw_score) pairs, highest score
        first. The domain service applies threshold filtering on top of this.
        """
        ...


@runtime_checkable
class VectorLookupPort(Protocol):
    """
    Point-lookup of a single enrolled template by subject ID.

    Used by VerifyOrchestrator to fetch the gallery template for 1:1
    comparison. Intentionally separate from search — the adapter may use a
    flat key-value store rather than FAISS for this.
    """

    async def fetch(self, subject_id: "SubjectId") -> "BiometricTemplate | None":
        """Return the enrolled template, or None if not present."""
        ...


@runtime_checkable
class VectorMutationPort(Protocol):
    """
    Write path for gallery mutations: enrolment and deletion.

    Returns acknowledgement values (EnrolAck, DeleteAck) that carry event_ids
    for correlation with the EventLogPort. The adapter decides whether this
    is synchronous or queue-backed.
    """

    async def enrol(self, template: "BiometricTemplate") -> "EnrolAck":
        """Insert or replace a template for subject_id in its partition."""
        ...

    async def delete(self, subject_id: "SubjectId", partition: "PartitionId") -> "DeleteAck":
        """Delete the template. Returns ack with deleted=True if it existed."""
        ...
# ---------------------------------------------------------------------------# Quality, PAD, and morphing detection# ---------------------------------------------------------------------------
@runtime_checkable
class QualityPort(Protocol):
    """
    Assess whether a face image meets minimum quality standards for
    extraction. Sits *before* InferencePort in the pipeline to reject bad
    probes cheaply.

    Reference implementation: OFIQ (ISO/IEC 29794-5:2025), the BSI/eu-LISA
    open-source implementation of the ISO face image quality standard.
    """

    async def assess(self, aligned_face: "NDArray[np.uint8]") -> "QualityScore":
        """
        Parameters
        ----------
        aligned_face:
            112x112 RGB uint8 array.

        Returns
        -------
        QualityScore
            A float in [0, 1]. The pipeline uses this to gate extraction.
        """
        ...


@runtime_checkable
class PadPort(Protocol):
    """
    Unified physical-digital presentation attack detection.
    Covers: print, replay, mask, deepfake, face swap.
    Produces a measurement only — the decision threshold is policy.
    """

    async def detect_attack(self, image: "NDArray[np.uint8]") -> "PadResult": ...


@runtime_checkable
class MorphingDetectionPort(Protocol):
    """
    Morphing attack detection for document-based enrollment.
    Detects blended identities in passport/ID photos.
    """

    async def detect_morphing(self, image: "NDArray[np.uint8]") -> "MorphingResult": ...
# ---------------------------------------------------------------------------# Event log — append-only record of gallery mutations# ---------------------------------------------------------------------------
@runtime_checkable
class EventLogPort(Protocol):
    """
    Append-only log of gallery mutation events.

    Every enrol and delete flows through this log before (or alongside) the
    vector store mutation. This enables:
    - Event sourcing: rebuild gallery state from the log
    - Audit: complete history of who was enrolled/deleted and when
    - Replication: followers consume the log to stay in sync

    The adapter may be backed by Kafka, a WAL file, or an in-memory list.
    """

    async def append(self, event: "EnrolEvent | DeleteEvent") -> str:
        """Append an event and return its event_id."""
        ...

    # NOTE(review): declared `async def` returning an AsyncIterator, so an
    # implementation must be a coroutine that *returns* an async iterator
    # (a plain async-generator method would not match) — confirm intended.
    async def read_from(self, offset: int) -> "AsyncIterator[Event]":
        """Yield events starting from the given offset."""
        ...

    async def current_offset(self) -> int:
        """Return the current log tail offset."""
        ...
# ---------------------------------------------------------------------------# Queue — inter-stage message passing# ---------------------------------------------------------------------------
@runtime_checkable
class QueuePort(Protocol):
    """
    Minimal async queue for passing envelopes between pipeline stages.

    The envelope is a plain dict — no domain types leak into the transport.
    This keeps pipeline stages decoupled: each stage reads the keys it needs
    and ignores the rest. Hickey: "queues are the mechanism that lets you get
    at the independent things independently."

    The adapter may be an asyncio.Queue, Redis Stream, or Kafka topic.
    """

    async def put(self, envelope: dict) -> None:
        """Enqueue an envelope for the next stage."""
        ...

    def __aiter__(self) -> "AsyncIterator[dict]":
        """Iterate over incoming envelopes."""
        ...
# ---------------------------------------------------------------------------# Clock — injectable time source# ---------------------------------------------------------------------------
@runtime_checkableclass ClockPort(Protocol): """ Injectable time source for timestamps.
Production uses `datetime.now(timezone.utc)`. Tests inject a deterministic clock that returns fixed or advancing timestamps. This eliminates time-dependent flakiness from domain tests. """
def now(self) -> datetime: """Return the current UTC datetime.""" ...Why Protocol, not ABC?
Section titled “Why Protocol, not ABC?”Abstract base classes complect interface definition with inheritance. A Protocol defines only the structural contract. An adapter written in Rust (via PyO3) or in a third-party library satisfies the protocol without ever knowing the protocol exists — Hickey would call this composing without complecting.
A note on @runtime_checkable
Section titled “A note on @runtime_checkable”All ports are decorated with @runtime_checkable to enable isinstance checks at service startup — specifically in the composition root where adapters are wired to ports. This catches wiring errors (e.g. passing a FAISSAdapter where an EventLogPort was expected) before the first request arrives.
@runtime_checkable is not intended for production dispatch. It verifies method names exist but cannot check argument types or return types at runtime. Mypy is the primary verification mechanism — structural subtyping via Protocol is checked statically during development and CI. The runtime check is a safety net for deployment, not a substitute for static analysis.
3. Pure Domain Functions
Section titled “3. Pure Domain Functions”

These functions are stateless and free of I/O. They can be unit-tested with plain numpy arrays and no mocks.
from __future__ import annotations
import numpy as np
from numpy.typing import NDArray

from core.domain.types import (
    EmbeddingVector,
    MorphingResult,
    PadResult,
    QualityScore,
    SearchCandidate,
    SimilarityScore,
    SubjectId,
    Threshold,
)
# ---------------------------------------------------------------------------# Template arithmetic# ---------------------------------------------------------------------------
def template_normalize(embedding: "EmbeddingVector") -> "EmbeddingVector":
    """
    L2-normalise an EmbeddingVector onto the unit hypersphere, returning a
    new EmbeddingVector with the same model_id and expected_dim.

    This is the single mandatory post-processing step after
    InferencePort.extract(). After normalisation, Inner Product == Cosine
    Similarity.

    Parameters
    ----------
    embedding:
        EmbeddingVector with raw (unnormalised) float32 data from the ViT.

    Returns
    -------
    EmbeddingVector
        L2-normalised copy. Input is not mutated.

    Raises
    ------
    ValueError
        If the vector is the zero vector (degenerate model output).
    """
    magnitude = np.linalg.norm(embedding.vector)
    # A (near-)zero vector has no direction; normalising it would divide by ~0.
    if magnitude < 1e-10:
        raise ValueError("Cannot normalise a zero vector — degenerate extraction output.")
    unit = (embedding.vector / magnitude).astype(np.float32)
    return EmbeddingVector(
        vector=unit,
        model_id=embedding.model_id,
        expected_dim=embedding.expected_dim,
    )
def cosine_similarity(
    a: "EmbeddingVector",
    b: "EmbeddingVector",
) -> "SimilarityScore":
    """
    Compute cosine similarity between two L2-normalised EmbeddingVectors.

    For L2-normalised vectors this reduces to a single dot product —
    mathematically identical to the Inner Product FAISS uses, so scores are
    directly comparable across the domain boundary.

    Parameters
    ----------
    a, b:
        L2-normalised EmbeddingVectors from the same model.

    Returns
    -------
    SimilarityScore
        Scalar in [-1.0, 1.0].

    Raises
    ------
    ValueError
        If the two embeddings come from different models.
    """
    if a.model_id != b.model_id:
        raise ValueError(
            f"Cannot compare embeddings from different models: "
            f"{a.model_id!r} vs {b.model_id!r}"
        )
    dot = float(np.dot(a.vector, b.vector))
    # Clamp to [-1, 1] to guard against floating-point overshoot on
    # near-identical vectors (e.g. 1.0000001 from fp32 rounding).
    return SimilarityScore(min(1.0, max(-1.0, dot)))
def score_fusion(
    scores: "tuple[SimilarityScore, ...]",
    weights: "tuple[float, ...] | None" = None,
) -> "SimilarityScore":
    """
    Fuse multiple per-shard or per-model similarity scores into one score.

    Default strategy: weighted average (uniform weights if none provided).
    Alternative strategies (max, product) can be composed by the caller —
    this function does not decide policy, it implements one explicit
    mechanism.

    Parameters
    ----------
    scores:
        Tuple of SimilarityScore values from individual shards or models.
    weights:
        Optional matching-length tuple of non-negative floats. Must sum
        to > 0.

    Returns
    -------
    SimilarityScore
        Fused score in [-1, 1].

    Raises
    ------
    ValueError
        On empty input, mismatched lengths, or zero total weight.
    """
    if not scores:
        raise ValueError("score_fusion requires at least one score.")
    if weights is None:
        weights = (1.0,) * len(scores)
    if len(weights) != len(scores):
        raise ValueError("scores and weights must have the same length.")
    total = sum(weights)
    if total < 1e-10:
        raise ValueError("Total weight must be > 0.")
    fused = sum(value * weight for value, weight in zip(scores, weights)) / total
    # Clamp against floating-point drift before re-wrapping as a score.
    return SimilarityScore(min(1.0, max(-1.0, fused)))
# ---------------------------------------------------------------------------# Decision functions — pure policy, no I/O# ---------------------------------------------------------------------------
def threshold_check(score: "SimilarityScore", threshold: "Threshold") -> bool:
    """
    Apply the configured decision threshold.

    Returns True (accept / hit) when the similarity score meets or exceeds
    the threshold. This is the *only* place in the domain where the binary
    accept/reject decision is made. The threshold value itself is passed in
    by the caller (the service orchestrator), keeping policy out of this
    function.

    Parameters
    ----------
    score:
        Similarity between probe and gallery template.
    threshold:
        Decision boundary configured per use-case.

    Returns
    -------
    bool
        True = accept (score >= threshold).
    """
    # Inclusive comparison: a score exactly at the threshold is accepted.
    return float(score) >= float(threshold)
def quality_gate(score: "QualityScore", minimum: "QualityScore") -> bool:
    """
    Decide whether a probe image meets the minimum quality bar for
    extraction.

    Called *before* InferencePort.extract() to reject poor-quality probes
    cheaply. Returns True if the image is acceptable.

    Parameters
    ----------
    score:
        Quality assessment from QualityPort.assess().
    minimum:
        Configured minimum acceptable quality (e.g. 0.5 for operational use).

    Returns
    -------
    bool
        True = image is acceptable for extraction.
    """
    # Inclusive: exactly meeting the bar counts as acceptable.
    return float(score) >= float(minimum)
def pad_gate(result: "PadResult", threshold: "Threshold") -> bool:
    """Decide whether an image passes PAD. True = bona fide (not an attack)."""
    # Strict inequality: a spoof score exactly at the threshold is rejected.
    return float(result.spoof_score) < float(threshold)
def morphing_gate(result: "MorphingResult", threshold: "Threshold") -> bool:
    """Decide whether an image passes morphing detection. True = genuine."""
    # Strict inequality: a morph score exactly at the threshold is rejected.
    return float(result.morph_score) < float(threshold)
# ---------------------------------------------------------------------------# Result construction helpers — pure functions over data# ---------------------------------------------------------------------------
def rank_candidates( raw_results: list[tuple[SubjectId, SimilarityScore]], threshold: Threshold, top_k: int,) -> tuple[SearchCandidate, ...]: """ Convert raw ANN results into ranked, threshold-filtered SearchCandidates.
This function encapsulates the aggregation policy for 1:N search: sort by score descending, apply threshold, cap at top_k.
Parameters ---------- raw_results: List of (subject_id, score) pairs from VectorSearchPort.search_ann(). May contain results from multiple shards already merged. threshold: Minimum score for a result to be included. top_k: Maximum number of candidates in the output.
Returns ------- tuple[SearchCandidate, ...] Immutable, ranked tuple. Empty if no result meets the threshold. """ accepted = [ (sid, score) for sid, score in raw_results if threshold_check(score, threshold) ] accepted.sort(key=lambda x: float(x[1]), reverse=True) return tuple( SearchCandidate(subject_id=sid, score=score, rank=i + 1) for i, (sid, score) in enumerate(accepted[:top_k]) )4. Service Orchestrators
Section titled “4. Service Orchestrators”Orchestrators compose ports and pure functions to implement the four inbound operations. Each orchestrator receives only the ports it needs — no god-object service that holds every adapter.
from core.domain.ops import rank_candidates
from core.domain.types import SearchRequest, SearchResult
from core.ports.biometric import ClockPort, VectorSearchPort
class SearchOrchestrator:
    """1:N search — scatter the query across partitions, rank the results."""

    def __init__(self, vectors: VectorSearchPort, clock: ClockPort) -> None:
        self._vectors = vectors
        self._clock = clock

    async def search(self, request: SearchRequest) -> SearchResult:
        """Run ANN search via the port, then apply the domain ranking policy."""
        raw_hits = await self._vectors.search_ann(
            query=request.probe.embedding.vector,
            partitions=request.partitions,
            top_k=request.top_k,
        )
        ranked = rank_candidates(raw_hits, request.threshold, request.top_k)
        return SearchResult(
            request_id=request.request_id,
            candidates=ranked,
            hit=bool(ranked),
            completed_at=self._clock.now(),
        )


from core.domain.ops import cosine_similarity, threshold_check
from core.domain.types import VerifyRequest, VerifyResult
from core.ports.biometric import ClockPort, VectorLookupPort
class VerifyOrchestrator:
    """1:1 verification — fetch enrolled template, compute similarity."""

    def __init__(self, lookup: VectorLookupPort, clock: ClockPort) -> None:
        self._lookup = lookup
        self._clock = clock

    async def verify(self, request: VerifyRequest) -> VerifyResult:
        """
        Compare the probe against the enrolled template for the claimed
        subject. An unknown subject scores 0.0 and is therefore always
        rejected by the threshold check.
        """
        # Hoisted out of the not-found branch: an import buried inside a
        # conditional hides the dependency from readers and static tools.
        from core.domain.types import SimilarityScore

        enrolled = await self._lookup.fetch(request.claimed_subject_id)
        if enrolled is None:
            # Subject not found — score 0, always reject
            score = SimilarityScore(0.0)
        else:
            score = cosine_similarity(request.probe.embedding, enrolled.embedding)
        return VerifyResult(
            request_id=request.request_id,
            claimed_subject_id=request.claimed_subject_id,
            score=score,
            accepted=threshold_check(score, request.threshold),
            completed_at=self._clock.now(),
        )


from core.domain.types import EnrolRequest, EnrolResult
from core.ports.biometric import ClockPort, VectorMutationPort
class EnrolOrchestrator:
    """Enrol a template into the gallery via the mutation port."""

    def __init__(self, mutations: VectorMutationPort, clock: ClockPort) -> None:
        self._mutations = mutations
        self._clock = clock

    async def enrol(self, request: EnrolRequest) -> EnrolResult:
        """Forward the template to the write path and report the outcome."""
        ack = await self._mutations.enrol(request.template)
        return EnrolResult(
            request_id=request.request_id,
            subject_id=request.template.subject_id,
            enrolled_at=self._clock.now(),
            replaced=ack.replaced,
        )


from core.domain.types import DeleteRequest, DeleteResult
from core.ports.biometric import ClockPort, VectorMutationPort
class DeleteOrchestrator: """Delete a template from the gallery via the mutation port."""
def __init__(self, mutations: VectorMutationPort, clock: ClockPort) -> None: self._mutations = mutations self._clock = clock
async def delete(self, request: DeleteRequest) -> DeleteResult: ack = await self._mutations.delete(request.subject_id, request.partition) return DeleteResult( request_id=request.request_id, subject_id=request.subject_id, deleted=ack.deleted, completed_at=self._clock.now(), )Each orchestrator takes the narrowest possible set of ports:
| Orchestrator | Ports injected |
|---|---|
| SearchOrchestrator | VectorSearchPort, ClockPort |
| VerifyOrchestrator | VectorLookupPort, ClockPort |
| EnrolOrchestrator | VectorMutationPort, ClockPort |
| DeleteOrchestrator | VectorMutationPort, ClockPort |
A single FAISS adapter may implement all three vector ports. That is fine — the adapter composes; the domain does not know.
5. Separation of Concerns Map
Section titled “5. Separation of Concerns Map” ┌──────────────────────────────────────────┐ │ Service Orchestrators │ │ (Search, Verify, Enrol, Delete) │ │ Call ports. Compose pure functions. │ │ Inject dependencies. Own the workflow. │ └──────────────┬───────────────────────────┘ │ uses ┌─────────────────────┼─────────────────────────┐ │ │ │ ┌───────▼──────┐ ┌──────────▼───────────┐ ┌────────▼───────┐ │ domain/ │ │ ports/ │ │ adapters/ │ │ types.py │ │ biometric.py │ │ (outbound) │ │ │ │ │ │ │ │ Frozen │ │ Inbound: │ │ FAISSAdapter │ │ dataclasses │ │ SearchPort │ │ (implements │ │ Typed │ │ VerifyPort │ │ VectorSearch │ │ scalars │ │ EnrolPort │ │ + Lookup │ │ Ack types │ │ DeletePort │ │ + Mutation) │ │ Event types │ │ │ │ ONNXAdapter │ │ │ │ Vector (split): │ │ QualityAdapter│ │ domain/ │ │ VectorSearchPort │ │ PadAdapter │ │ ops.py │ │ VectorLookupPort │ │ MorphAdapter │ │ │ │ VectorMutationPort │ │ EventLogAdapt │ │ Pure fns: │ │ │ │ QueueAdapter │ │ normalize │ │ Inference: │ │ ClockAdapter │ │ cosine_sim │ │ InferencePort │ │ │ │ threshold │ │ DetectionPort │ │ Satisfy ports │ │ quality_gate│ │ PadPort │ │ via duck │ │ pad_gate │ │ MorphingDetection- │ │ typing only │ │ morph_gate │ │ Port │ │ │ │ │ │ │ │ No domain │ │ │ │ Infrastructure: │ │ types leak │ │ │ │ EventLogPort │ │ into adapters │ │ │ │ QueuePort │ │ │ │ │ │ ClockPort │ │ │ │ │ │ │ │ │ │ │ │ Python Protocol │ │ │ │ │ │ (structural, │ │ │ │ │ │ not nominal) │ │ │ └──────────────┘ └──────────────────────┘ └────────────────┘
Zero external dependencies in src/core/domain/
Zero I/O in src/core/domain/ops.py
Zero mutable state anywhere in core/

6. What This Design Avoids (The “No Complecting” List)
Section titled “6. What This Design Avoids (The “No Complecting” List)”| Complected Anti-Pattern | Simple Alternative Used Here |
|---|---|
| Template class with .match(other) method | cosine_similarity(a, b) pure function over EmbeddingVector values |
| VectorStorePort with search + lookup + mutation | Three split ports: VectorSearchPort, VectorLookupPort, VectorMutationPort |
| Searcher base class -> FAISSSearcher subclass | VectorSearchPort Protocol; FAISSAdapter satisfies it structurally |
| GalleryService with mutable self._cache | Stateless orchestrator calls ports; state lives in FAISS cluster |
| QualityChecker that also extracts | QualityPort and InferencePort are separate protocols |
| Result object with .accept() side effect | threshold_check(score, threshold) -> bool — data in, data out |
| TemplateRepository with ORM model | VectorLookupPort.fetch() returns a plain BiometricTemplate value |
| Datetime strings in domain types | datetime with tzinfo — single canonical time representation |
| datetime.now() called inside domain | ClockPort injected — deterministic, testable |
| PadDetector that also rejects | PadPort produces measurement; pad_gate decides |
| MorphingDetector coupled to PAD | Separate port, separate model, composable |
| Direct function calls between pipeline stages | QueuePort decouples stages; each consumes independently |
| Mutable gallery state without audit trail | EventLogPort provides append-only event history |
7. File Layout
Section titled “7. File Layout”src/core/├── domain/│ ├── __init__.py│ ├── types.py # All frozen dataclasses and typed scalars│ │ # (EmbeddingVector, BiometricTemplate, PadResult,│ │ # EnrolAck, DeleteAck, EnrolEvent, DeleteEvent, etc.)│ └── ops.py # All pure functions (template_normalize, cosine_similarity,│ # pad_gate, morphing_gate, rank_candidates, etc.)├── ports/│ ├── __init__.py│ └── biometric.py # Protocol definitions for all ports:│ # Inbound: SearchPort, VerifyPort, EnrolPort, DeletePort│ # Vector: VectorSearchPort, VectorLookupPort, VectorMutationPort│ # Infra: InferencePort, DetectionPort, QualityPort,│ # PadPort, MorphingDetectionPort, EventLogPort,│ # QueuePort, ClockPort└── orchestration/ ├── __init__.py ├── search.py # SearchOrchestrator(VectorSearchPort, ClockPort) ├── verify.py # VerifyOrchestrator(VectorLookupPort, ClockPort) ├── enrol.py # EnrolOrchestrator(VectorMutationPort, ClockPort) └── delete.py # DeleteOrchestrator(VectorMutationPort, ClockPort)src/core/ has zero third-party dependencies beyond numpy (needed for the vector type annotations). All I/O, all network calls, and all framework code live in src/adapters/.
8. Testing Strategy for the Domain Layer
Section titled “8. Testing Strategy for the Domain Layer”Because every function in ops.py is pure and every type in types.py is a frozen value, tests need no mocks, no fixtures, and no test databases:
# tests/unit/test_ops.py (illustrative)
import numpy as npimport pytestfrom core.domain.ops import cosine_similarity, template_normalize, threshold_checkfrom core.domain.types import EmbeddingVector, SimilarityScore, Threshold
# Model identifier shared by every embedding constructed in these tests.
MODEL_ID = "adaface-vit-base-v2"
def _make_embedding(vec: np.ndarray) -> EmbeddingVector:
    """Wrap a raw array in a 512-dim EmbeddingVector for the test model."""
    return EmbeddingVector(expected_dim=512, model_id=MODEL_ID, vector=vec)
def test_identical_vectors_score_one():
    """A normalized vector compared with itself scores ~1.0."""
    raw = np.random.randn(512).astype(np.float32)
    emb = template_normalize(_make_embedding(raw))
    score = cosine_similarity(emb, emb)
    assert score == pytest.approx(1.0, abs=1e-6)
def test_orthogonal_vectors_score_zero():
    """Disjoint one-hot vectors are orthogonal, so similarity is ~0."""
    a = np.zeros(512, dtype=np.float32)
    b = np.zeros(512, dtype=np.float32)
    a[0], b[1] = 1.0, 1.0
    score = cosine_similarity(_make_embedding(a), _make_embedding(b))
    assert score == pytest.approx(0.0, abs=1e-6)
def test_threshold_accept():
    """A score above the threshold is accepted."""
    accepted = threshold_check(SimilarityScore(0.85), Threshold(0.80))
    assert accepted is True
def test_threshold_reject():
    """A score below the threshold is rejected."""
    accepted = threshold_check(SimilarityScore(0.75), Threshold(0.80))
    assert accepted is False
def test_normalize_zero_vector_raises(): with pytest.raises(ValueError, match="zero vector"): template_normalize(_make_embedding(np.zeros(512, dtype=np.float32)))Property-based tests using Hypothesis can verify the normalisation invariant (np.linalg.norm(template_normalize(e).vector) ≈ 1.0) across thousands of random EmbeddingVector instances — no domain-specific fixtures needed.
Orchestrator tests inject stub adapters that implement the split ports and a deterministic ClockPort:
# tests/unit/test_search_orchestrator.py (illustrative)
from datetime import datetime, timezonefrom core.domain.types import SimilarityScore, SubjectId, PartitionIdfrom core.orchestration.search import SearchOrchestrator
class StubVectorSearch:
    """Satisfies VectorSearchPort structurally — no inheritance needed."""

    async def search_ann(self, query, partitions, top_k):
        # One fixed high-confidence candidate, regardless of input.
        candidate = (SubjectId("S001"), SimilarityScore(0.95))
        return [candidate]
class StubClock:
    """Satisfies ClockPort structurally."""

    def now(self):
        # Fixed, timezone-aware instant keeps orchestrator output deterministic.
        return datetime(2025, 1, 1, tzinfo=timezone.utc)
# Illustrative: stubs satisfy the ports structurally, so no mocking framework is needed.
async def test_search_returns_ranked_candidates():
    orch = SearchOrchestrator(vectors=StubVectorSearch(), clock=StubClock())
    # ... build SearchRequest, assert on SearchResult