"""
IO Contract — typed schemas for the MOD-OSINT engine.

Every pipeline module must accept ``EngineInput`` and return ``EngineOutput``.
The engine orchestrator builds ``RunContext``, which carries the run's paths
(run directory and database path), its configuration, and per-stage results.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------
class FileType(str, Enum):
    """Input file formats recognised by the engine.

    Because the enum mixes in ``str``, every member compares equal to its
    lowercase string value, e.g. ``FileType.CSV == "csv"``.
    """

    CSV = "csv"
    JSON = "json"
    TXT = "txt"
    HTML = "html"
    LOG = "log"
    # Catch-all for files whose format has not been determined.
    UNKNOWN = "unknown"
class StageStatus(str, Enum):
    """Status values reported for a pipeline stage.

    Members are ``str`` subclasses, so ``StageStatus.SUCCESS == "success"``.
    """

    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
# ---------------------------------------------------------------------------
# Input specs
# ---------------------------------------------------------------------------
class InputFile(BaseModel):
    """Metadata for one file that was ingested into a run."""

    # Location of the file.
    path: Path
    # Detected format; left as UNKNOWN until something classifies it.
    file_type: FileType = Field(default=FileType.UNKNOWN)
    # Size in bytes; 0 when not recorded.
    size_bytes: int = Field(default=0)
    # Content digest (presumably hex-encoded SHA-256 — confirm with the ingest
    # stage); empty string when not computed.
    sha256: str = Field(default="")
class InputSpec(BaseModel):
    """Everything a pipeline run was given as input.

    ``files`` defaults to a fresh empty list per instance (``default_factory``),
    so instances never share the same list object.
    """

    # Directory the run's input files come from (presumably — confirm with
    # the ingest stage).
    input_dir: Path
    # One descriptor per ingested file.
    files: List[InputFile] = Field(default_factory=list)
# ---------------------------------------------------------------------------
# Normalized records
# ---------------------------------------------------------------------------
class NormalizedRecord(BaseModel):
    """
    Canonical record produced by the normalization stage.

    Each record carries a ``row_id`` plus optional entity-linking keys
    (name, phone, email, IP, domain, hash) so downstream modules can
    join/correlate across sources.

    ``row_id`` is NOT deterministic by default. Unless the caller passes one,
    it is the first 12 hex characters (48 random bits) of a fresh ``uuid4``.
    Normalizing the same input twice therefore produces different ids.
    Callers that need ids stable across runs must supply ``row_id``
    explicitly, for example one derived from a hash of the record content.
    """

    # Random per-instance id unless supplied by the caller (see docstring).
    row_id: str = Field(default_factory=lambda: uuid.uuid4().hex[:12])
    # Originating file (as a string) and its detected format.
    source_file: str = ""
    source_type: FileType = FileType.UNKNOWN
    # Optional timestamp; None when unknown.
    timestamp: Optional[datetime] = None
    # Entity-linking keys; each is None when absent from the source record.
    entity_name: Optional[str] = None
    entity_phone: Optional[str] = None
    entity_email: Optional[str] = None
    entity_ip: Optional[str] = None
    entity_domain: Optional[str] = None
    entity_hash: Optional[str] = None
    # Text of the record (presumably the unparsed source line/row — confirm).
    raw_text: str = ""
    # Free-form additional fields (presumably source-specific data with no
    # canonical slot); a fresh dict per instance.
    extra: Dict[str, Any] = Field(default_factory=dict)
# ---------------------------------------------------------------------------
# Artifacts
# ---------------------------------------------------------------------------
class Artifact(BaseModel):
    """One output produced by a pipeline module, identified by name and path."""

    # Short identifier for the artifact.
    name: str
    # Where the artifact is stored.
    path: Path
    # MIME type; defaults to the generic binary type.
    mime_type: str = Field(default="application/octet-stream")
    # Optional human-readable explanation.
    description: str = Field(default="")
# ---------------------------------------------------------------------------
# Engine IO (module contract)
# ---------------------------------------------------------------------------
def _generate_run_id() -> str:
    """Return a new run id of the form ``YYYYmmdd_HHMMSS_<6 hex chars>``.

    The timestamp is taken in UTC. The suffix is the first six hex characters
    of a fresh ``uuid4``, so two runs started in the same second still get
    different ids with high probability.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    suffix = uuid.uuid4().hex[:6]
    return f"{stamp}_{suffix}"


class EngineInput(BaseModel):
    """
    Payload handed to every pipeline module's ``run()`` function.

    It bundles the normalized records produced by earlier stages with the
    run-level context a module needs: the run id, the input description,
    configuration, the run directory, and artifacts emitted so far.
    """

    # Generated per instance unless the caller supplies one.
    run_id: str = Field(default_factory=_generate_run_id)
    input_spec: InputSpec
    # Records from prior stages (empty list by default).
    records: List[NormalizedRecord] = Field(default_factory=list)
    config: Dict[str, Any] = Field(default_factory=dict)
    # NOTE(review): this default does not incorporate run_id, so every run
    # that relies on it shares one directory — confirm the orchestrator
    # always sets run_dir explicitly.
    run_dir: Path = Path("runs/default")
    # Artifacts produced by earlier stages.
    previous_artifacts: List[Artifact] = Field(default_factory=list)
class EngineOutput(BaseModel):
    """
    Result object returned by every pipeline module's ``run()`` function.

    ``status`` defaults to ``StageStatus.SUCCESS`` and is not derived from
    ``error``. A module that fails must set ``status`` explicitly in addition
    to filling in ``error``.
    """

    # Name of the stage that produced this output.
    stage: str
    status: StageStatus = Field(default=StageStatus.SUCCESS)
    # Records emitted by this stage.
    records: List[NormalizedRecord] = Field(default_factory=list)
    # Artifacts written by this stage.
    artifacts: List[Artifact] = Field(default_factory=list)
    # Short human-readable description of what the stage did.
    summary: str = Field(default="")
    # Error description (presumably set alongside status=FAILED); None otherwise.
    error: Optional[str] = Field(default=None)
    # Free-form extra information about the stage's execution.
    metadata: Dict[str, Any] = Field(default_factory=dict)
# ---------------------------------------------------------------------------
# Run context (internal, built by orchestrator)
# ---------------------------------------------------------------------------
class RunContext(BaseModel):
    """
    Per-run state assembled by the orchestrator.

    Holds the run's id and directory, the database file path, the input
    description and configuration, and the outputs of stages that have
    already executed.
    """

    run_id: str
    run_dir: Path
    # Filesystem path of the run's database (a path, not an open handle).
    db_path: Path
    input_spec: InputSpec
    config: Dict[str, Any] = Field(default_factory=dict)
    # Outputs of completed stages, presumably keyed by stage name.
    stage_results: Dict[str, EngineOutput] = Field(default_factory=dict)

    class Config:
        # Pydantic v1-style class config.
        # NOTE(review): every field type above is pydantic-native, so this
        # flag looks unnecessary — confirm before removing.
        arbitrary_types_allowed = True