# File: mod-osint/engine/io_contract.py
# Commit: b75c637 — "deploy: HF sanitized GUI snapshot" (moddux)
"""
IO Contract — typed schemas for the MOD-OSINT engine.
Every pipeline module must accept ``EngineInput`` and return ``EngineOutput``.
The engine orchestrator builds ``RunContext``, which carries the run paths (run directory and database path) and the per-stage results.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------
class FileType(str, Enum):
    """File formats recognised for ingested input files.

    Members subclass ``str``, so each one compares equal to its lowercase
    value (``FileType.CSV == "csv"``). ``UNKNOWN`` is the default used by the
    models in this module when no type has been set.
    """

    CSV = "csv"
    JSON = "json"
    TXT = "txt"
    HTML = "html"
    LOG = "log"
    UNKNOWN = "unknown"
class StageStatus(str, Enum):
    """Lifecycle states reported for a pipeline stage.

    Members subclass ``str``, so each one compares equal to its lowercase
    value (``StageStatus.FAILED == "failed"``).
    """

    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
# ---------------------------------------------------------------------------
# Input specs
# ---------------------------------------------------------------------------
class InputFile(BaseModel):
    """Descriptor for a single ingested file.

    Only ``path`` is required; the remaining fields fall back to
    placeholder defaults when the caller does not supply them.
    """

    # Location of the file.
    path: Path
    # Detected format; UNKNOWN until something classifies the file.
    file_type: FileType = FileType.UNKNOWN
    # Defaults to 0 when not provided; this class does not measure the file.
    size_bytes: int = 0
    # Defaults to "" when not provided; presumably a hex digest of the file
    # contents — TODO confirm against the code that fills it in.
    sha256: str = ""
class InputSpec(BaseModel):
    """Describes the full set of input data for a pipeline run."""

    # Directory the run's input is associated with (required).
    input_dir: Path
    # Per-file descriptors; a fresh empty list is created for each instance.
    files: List[InputFile] = Field(default_factory=list)
# ---------------------------------------------------------------------------
# Normalized records
# ---------------------------------------------------------------------------
class NormalizedRecord(BaseModel):
    """
    Canonical record produced by the normalization stage.

    ``row_id`` defaults to a random 12-character hex string (a truncated
    UUID4). It is therefore NOT reproducible across runs; a producer that
    needs a stable, deterministic identifier must pass ``row_id`` explicitly.
    The optional ``entity_*`` keys let downstream modules join/correlate
    records across sources.
    """

    # Random by default (uuid4-based); pass a value to get a stable id.
    row_id: str = Field(default_factory=lambda: uuid.uuid4().hex[:12])

    # Provenance of the record.
    source_file: str = ""
    source_type: FileType = FileType.UNKNOWN
    timestamp: Optional[datetime] = None

    # Entity-linking keys; each stays None when not set.
    entity_name: Optional[str] = None
    entity_phone: Optional[str] = None
    entity_email: Optional[str] = None
    entity_ip: Optional[str] = None
    entity_domain: Optional[str] = None
    entity_hash: Optional[str] = None

    # Free-form payload: text plus an open-ended mapping for anything else.
    raw_text: str = ""
    extra: Dict[str, Any] = Field(default_factory=dict)
# ---------------------------------------------------------------------------
# Artifacts
# ---------------------------------------------------------------------------
class Artifact(BaseModel):
    """A single output artifact produced by a module.

    ``name`` and ``path`` are required; ``mime_type`` falls back to the
    generic binary type and ``description`` to an empty string.
    """

    name: str
    path: Path
    # Generic "arbitrary bytes" type unless the producer states otherwise.
    mime_type: str = "application/octet-stream"
    description: str = ""
# ---------------------------------------------------------------------------
# Engine IO (module contract)
# ---------------------------------------------------------------------------
class EngineInput(BaseModel):
    """
    Standard input passed to every pipeline module's ``run()`` function.

    Modules receive the normalized records from prior stages plus the
    run context (paths, config). Only ``input_spec`` is required; every
    other field has a default.
    """

    # Default: UTC timestamp "YYYYMMDD_HHMMSS", an underscore, then six random
    # hex characters, generated anew for each instance that omits run_id.
    run_id: str = Field(
        default_factory=lambda: (
            datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            + "_"
            + uuid.uuid4().hex[:6]
        )
    )
    input_spec: InputSpec
    records: List[NormalizedRecord] = Field(default_factory=list)
    config: Dict[str, Any] = Field(default_factory=dict)
    # Relative path; it resolves against the process's working directory.
    run_dir: Path = Path("runs/default")
    previous_artifacts: List[Artifact] = Field(default_factory=list)
class EngineOutput(BaseModel):
    """
    Standard output returned by every pipeline module's ``run()`` function.

    Only ``stage`` is required; every other field has a default.
    """

    stage: str
    # NOTE(review): defaults to SUCCESS and nothing in this class links it to
    # ``error`` — a module that sets ``error`` must also set ``status`` itself.
    status: StageStatus = StageStatus.SUCCESS
    records: List[NormalizedRecord] = Field(default_factory=list)
    artifacts: List[Artifact] = Field(default_factory=list)
    summary: str = ""
    # None when no error text has been supplied.
    error: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)
# ---------------------------------------------------------------------------
# Run context (internal, built by orchestrator)
# ---------------------------------------------------------------------------
class RunContext(BaseModel):
    """
    Internal context object built by the orchestrator for a single run.

    Carries the run directory, DB path, and accumulated state. ``run_id``,
    ``run_dir``, ``db_path`` and ``input_spec`` are required.
    """

    run_id: str
    run_dir: Path
    # Filesystem path to the database; no connection object is stored here.
    db_path: Path
    input_spec: InputSpec
    config: Dict[str, Any] = Field(default_factory=dict)
    # Accumulated stage outputs; presumably keyed by stage name — TODO confirm
    # against the orchestrator.
    stage_results: Dict[str, EngineOutput] = Field(default_factory=dict)

    class Config:
        # Lets pydantic accept field types it has no built-in validator for.
        # NOTE(review): this is the pydantic v1-style inner ``Config``; confirm
        # the installed pydantic version before migrating to ``model_config``.
        arbitrary_types_allowed = True