File size: 3,559 Bytes
b75c637
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""
ML Analysis module — entity extraction and text classification.

Engine contract:
    run(EngineInput) -> EngineOutput

This is a lightweight placeholder that performs regex-based entity
extraction.  When ML dependencies (torch, transformers, spacy) are
available, it can be extended to use real models.
"""

from __future__ import annotations

import logging
import re
from typing import Any, Dict, List, Optional

from engine.io_contract import (
    EngineInput,
    EngineOutput,
    NormalizedRecord,
    StageStatus,
)

logger = logging.getLogger("modules.ml_analysis")

# ---------------------------------------------------------------------------
# Lightweight entity extraction (no ML deps required)
# ---------------------------------------------------------------------------
# These patterns are loose heuristics for pulling a candidate value out of
# free text; they are not strict validators.

_EMAIL_RE = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+")

# Dotted-quad IPv4.  Each octet is limited to 0-255 (no leading zeros) so
# strings like "999.999.999.999" are not reported as addresses.
_IP_RE = re.compile(
    r"\b(?:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}"
    r"(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\b"
)

# Optional "+" and optional leading "1", then 9-15 digits.  A negative
# look-behind is used instead of ``\b``: ``\b\+`` only matches when the "+"
# follows a word character, so the "+" of " +15551234567" used to be lost.
_PHONE_RE = re.compile(r"(?<!\w)\+?1?\d{9,15}\b")

# Dotted hostname ending in an alphabetic TLD of 2+ letters.
_DOMAIN_RE = re.compile(r"\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}\b")

# Hex run of 32-64 characters (covers MD5 through SHA-256 lengths).
_HASH_RE = re.compile(r"\b[a-fA-F0-9]{32,64}\b")


def _enrich_record(record: NormalizedRecord) -> NormalizedRecord:
    """
    Fill in missing ``entity_*`` fields on *record* from its ``raw_text``.

    Every entity field that is currently falsy receives the first regex match
    found in ``raw_text``; fields that already hold a value are never
    overwritten.  Email, domain and hash matches are lower-cased; IP and
    phone matches are stored exactly as found.

    Returns:
        ``record.model_copy(update=...)`` carrying the extracted values, or
        the *same* ``record`` object when nothing new was extracted (callers
        can use identity to tell whether enrichment happened).
    """
    # NOTE(review): raw_text's declared type lives in engine.io_contract.
    # Treating None like "" avoids a TypeError from re.search that would
    # otherwise abort the whole stage for a single text-less record.
    text = record.raw_text or ""
    if not text:
        return record

    # (target field, pattern, lower-case the match?) -- iterated in this
    # order so the update dict keeps the original field ordering.
    extractors = (
        ("entity_email", _EMAIL_RE, True),
        ("entity_ip", _IP_RE, False),
        ("entity_phone", _PHONE_RE, False),
        ("entity_domain", _DOMAIN_RE, True),
        ("entity_hash", _HASH_RE, True),
    )

    updates: Dict[str, Any] = {}
    for field, pattern, lowercase in extractors:
        if getattr(record, field):
            continue  # already populated upstream; keep the existing value
        match = pattern.search(text)
        if match:
            value = match.group(0)
            updates[field] = value.lower() if lowercase else value

    if updates:
        return record.model_copy(update=updates)
    return record


# ---------------------------------------------------------------------------
# Engine contract
# ---------------------------------------------------------------------------

def run(engine_input: EngineInput) -> EngineOutput:
    """
    Entry point for the analysis stage.

    Passes every input record through ``_enrich_record`` and counts how many
    came back as a different object (i.e. were enriched).  Any exception is
    logged with its traceback and reported as a FAILED stage output rather
    than propagated to the caller.
    """
    try:
        output_records: List[NormalizedRecord] = []
        changed = 0
        for original in engine_input.records:
            updated = _enrich_record(original)
            # _enrich_record returns the very same object when it had nothing
            # to add, so identity tells us whether the record was enriched.
            if updated is not original:
                changed += 1
            output_records.append(updated)

        total = len(output_records)
        return EngineOutput(
            stage="analysis",
            status=StageStatus.SUCCESS,
            records=output_records,
            summary=f"Analyzed {total} records, enriched {changed}",
            metadata={"enriched_count": changed},
        )
    except Exception as exc:
        # Stage boundary: convert any failure into a FAILED output.
        logger.exception("ML analysis failed: %s", exc)
        return EngineOutput(
            stage="analysis",
            status=StageStatus.FAILED,
            error=str(exc),
        )


# ---------------------------------------------------------------------------
# Legacy compatibility
# ---------------------------------------------------------------------------

def analyze(data: Any) -> Dict[str, Any]:
    """
    Deprecated legacy entry point kept for older callers; prefer ``run()``.

    Performs no analysis: echoes *data* back under ``"input"`` next to a
    fixed ``"prediction"`` value of ``"none"``.
    """
    result: Dict[str, Any] = dict(input=data, prediction="none")
    return result


if __name__ == "__main__":
    # Smoke-run the legacy wrapper when the module is executed directly.
    demo_result = analyze("test")
    print(demo_result)