File size: 4,588 Bytes
b75c637
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""
IO Contract — typed schemas for the MOD-OSINT engine.

Every pipeline module must accept ``EngineInput`` and return ``EngineOutput``.
The engine orchestrator builds ``RunContext``, which carries the run paths
(run directory and database path), the config, and the per-stage results.
"""

from __future__ import annotations

import uuid
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field


# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------

class FileType(str, Enum):
    """
    Kinds of source file the engine distinguishes.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase value (e.g. ``FileType.CSV == "csv"``).
    """

    # Recognised formats.
    CSV = "csv"
    JSON = "json"
    TXT = "txt"
    HTML = "html"
    LOG = "log"

    # Fallback member; used as the default ``file_type`` / ``source_type``
    # by the models in this file.
    UNKNOWN = "unknown"


class StageStatus(str, Enum):
    """
    Status values for a pipeline stage, as carried by ``EngineOutput.status``.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase value (e.g. ``StageStatus.FAILED == "failed"``).
    """

    PENDING = "pending"
    RUNNING = "running"

    # ``SUCCESS`` is the default status of an ``EngineOutput``.
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"


# ---------------------------------------------------------------------------
# Input specs
# ---------------------------------------------------------------------------

class InputFile(BaseModel):
    """
    Descriptor for a single ingested file.

    Only ``path`` is required; the other fields fall back to placeholder
    defaults (``FileType.UNKNOWN``, ``0``, ``""``) when not supplied.
    """
    # Location of the file (required).
    path: Path
    # File format; defaults to UNKNOWN.
    file_type: FileType = FileType.UNKNOWN
    # Size in bytes; defaults to 0.
    size_bytes: int = 0
    # NOTE(review): presumably the hex SHA-256 digest of the file contents,
    # with "" meaning "not computed" — confirm against the ingest stage.
    sha256: str = ""


class InputSpec(BaseModel):
    """
    Describes the full set of input data for a pipeline run.

    Pairs the input directory with the descriptors of the files in the run.
    """
    # Directory the input data comes from (required).
    input_dir: Path
    # One descriptor per input file; a fresh empty list per instance by default.
    files: List[InputFile] = Field(default_factory=list)


# ---------------------------------------------------------------------------
# Normalized records
# ---------------------------------------------------------------------------

def _new_row_id() -> str:
    """Return a 12-character hex identifier cut from a random UUID4."""
    return uuid.uuid4().hex[:12]


class NormalizedRecord(BaseModel):
    """
    Canonical record produced by the normalization stage.

    ``row_id`` defaults to a *random* 12-hex-character identifier (a
    truncated UUID4). It is therefore not reproducible across runs: a caller
    that needs a deterministic ID must pass ``row_id`` explicitly.

    The optional ``entity_*`` fields are entity-linking keys so downstream
    modules can join/correlate records across sources. All of them default
    to ``None``.
    """
    # Record identifier; random unless the caller supplies one.
    row_id: str = Field(default_factory=_new_row_id)

    # Provenance of the record.
    source_file: str = ""
    source_type: FileType = FileType.UNKNOWN

    # Optional timestamp; None when not supplied.
    timestamp: Optional[datetime] = None

    # Entity-linking keys (all optional).
    entity_name: Optional[str] = None
    entity_phone: Optional[str] = None
    entity_email: Optional[str] = None
    entity_ip: Optional[str] = None
    entity_domain: Optional[str] = None
    entity_hash: Optional[str] = None

    # Free-form text of the record; defaults to "".
    raw_text: str = ""
    # Additional key/value data; a fresh empty dict per instance by default.
    extra: Dict[str, Any] = Field(default_factory=dict)


# ---------------------------------------------------------------------------
# Artifacts
# ---------------------------------------------------------------------------

class Artifact(BaseModel):
    """
    A single output artifact produced by a module.

    ``name`` and ``path`` are required; ``mime_type`` and ``description``
    have defaults.
    """
    # Name of the artifact (required).
    name: str
    # Filesystem path of the artifact (required).
    path: Path
    # MIME type; defaults to the generic binary type.
    mime_type: str = "application/octet-stream"
    # Optional free-text description; defaults to "".
    description: str = ""


# ---------------------------------------------------------------------------
# Engine IO (module contract)
# ---------------------------------------------------------------------------

def _default_run_id() -> str:
    """Build a run id: UTC ``YYYYMMDD_HHMMSS`` stamp, ``_``, 6 random hex chars."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    suffix = uuid.uuid4().hex[:6]
    return stamp + "_" + suffix


class EngineInput(BaseModel):
    """
    Standard input passed to every pipeline module's ``run()`` function.

    Bundles the normalized records from prior stages with the run context
    a module needs (paths, config). Only ``input_spec`` is required; every
    other field has a default.
    """
    # Generated per instance unless the caller supplies one.
    run_id: str = Field(default_factory=_default_run_id)
    # Description of the input data for this run (required).
    input_spec: InputSpec
    # Normalized records from prior stages.
    records: List[NormalizedRecord] = Field(default_factory=list)
    # Free-form configuration mapping.
    config: Dict[str, Any] = Field(default_factory=dict)
    # Run directory; defaults to the relative path "runs/default".
    run_dir: Path = Path("runs/default")
    # Artifacts made available from earlier stages.
    previous_artifacts: List[Artifact] = Field(default_factory=list)


class EngineOutput(BaseModel):
    """
    Standard output returned by every pipeline module's ``run()`` function.

    Only ``stage`` is required. Note that ``status`` defaults to
    ``StageStatus.SUCCESS``, so a module that fails must set it explicitly.
    """
    # Name of the stage that produced this output (required).
    # NOTE(review): presumably the key used in RunContext.stage_results — confirm.
    stage: str
    # Outcome of the stage; defaults to SUCCESS.
    status: StageStatus = StageStatus.SUCCESS
    # Records returned by the stage.
    records: List[NormalizedRecord] = Field(default_factory=list)
    # Artifacts returned by the stage.
    artifacts: List[Artifact] = Field(default_factory=list)
    # Free-text summary; defaults to "".
    summary: str = ""
    # Error text; None when not set.
    error: Optional[str] = None
    # Additional key/value data about the stage.
    metadata: Dict[str, Any] = Field(default_factory=dict)


# ---------------------------------------------------------------------------
# Run context (internal, built by orchestrator)
# ---------------------------------------------------------------------------

class RunContext(BaseModel):
    """
    Internal context object built by the orchestrator for a single run.
    Carries the run directory, DB path, and accumulated state.

    ``run_id``, ``run_dir``, ``db_path`` and ``input_spec`` are required;
    ``config`` and ``stage_results`` start out empty.
    """
    # Identifier of the run (required; no default is generated here).
    run_id: str
    # Directory for this run.
    run_dir: Path
    # Path to the database (a path, not an open handle).
    db_path: Path
    # Description of the input data for this run.
    input_spec: InputSpec
    # Free-form configuration mapping.
    config: Dict[str, Any] = Field(default_factory=dict)
    # Accumulated stage outputs, keyed by a string.
    # NOTE(review): presumably keyed by stage name — confirm with the orchestrator.
    stage_results: Dict[str, EngineOutput] = Field(default_factory=dict)

    # Inner ``Config`` class (pydantic v1-style model configuration).
    # NOTE(review): no field declared above looks like it needs
    # ``arbitrary_types_allowed`` — confirm whether this is still needed.
    class Config:
        arbitrary_types_allowed = True