# mod-osint/modules/ml_analysis/ml_analysis.py
# Repository page residue (not part of the module): "moddux's picture"
# Commit b75c637: "deploy: HF sanitized GUI snapshot"
"""
ML Analysis module — entity extraction and text classification.

Engine contract:
    run(EngineInput) -> EngineOutput

This is a lightweight placeholder that performs regex-based entity
extraction only. When the optional ML dependencies (torch, transformers,
spacy) are available, it can be extended to use real models.
"""
from __future__ import annotations
import logging
import re
from typing import Any, Dict, List, Optional
from engine.io_contract import (
EngineInput,
EngineOutput,
NormalizedRecord,
StageStatus,
)
# Module-level logger, registered under the fixed name "modules.ml_analysis".
logger = logging.getLogger("modules.ml_analysis")
# ---------------------------------------------------------------------------
# Lightweight entity extraction (no ML deps required)
# ---------------------------------------------------------------------------
# Email address. The domain part is a sequence of dot-separated labels, so a
# sentence-ending period after the address is not swallowed into the match.
_EMAIL_RE = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)+")
# Dotted-quad IPv4 address. Every octet is limited to 0-255 (leading zeros are
# tolerated), so strings such as "999.10.10.10" are rejected.
_IP_RE = re.compile(
    r"\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d?\d)\.){3}"
    r"(?:25[0-5]|2[0-4]\d|[01]?\d?\d)\b"
)
# Unformatted phone number: optional "+", optional leading "1", 9-15 digits.
# A look-behind replaces the leading \b because "\b\+" can only match when the
# "+" follows a word character, which silently dropped the "+" prefix.
_PHONE_RE = re.compile(r"(?<!\w)\+?1?\d{9,15}\b")
# Host/domain name ending in an alphabetic TLD. The look-ahead skips candidates
# that run into an "@", i.e. the local part of an email address
# ("john.doe" in "john.doe@example.com"); the part after the "@" still matches.
_DOMAIN_RE = re.compile(r"\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}\b(?![\w.+-]*@)")
# Hex string of 32 to 64 characters (covers MD5, SHA-1 and SHA-256 lengths).
_HASH_RE = re.compile(r"\b[a-fA-F0-9]{32,64}\b")
def _enrich_record(record: NormalizedRecord) -> NormalizedRecord:
    """
    Fill in missing entity fields of a record from its ``raw_text``.

    For each of ``entity_email``, ``entity_ip``, ``entity_phone``,
    ``entity_domain`` and ``entity_hash`` that is currently falsy, the first
    match of the corresponding module-level pattern in ``raw_text`` is used.
    Email, domain and hash values are lower-cased; IP and phone values are
    kept verbatim. Fields that are already set are never overwritten.

    Args:
        record: The record to enrich. It is not mutated.

    Returns:
        The same ``record`` object when nothing was extracted (callers rely
        on identity to count enrichments), otherwise a copy produced by
        ``record.model_copy(update=...)`` carrying the new values.
    """
    text = record.raw_text
    # A missing or empty text cannot yield matches; returning early also keeps
    # ``pattern.search(None)`` from raising TypeError for such records.
    if not text:
        return record

    # (target field, compiled pattern, lower-case the match?)
    extractors = (
        ("entity_email", _EMAIL_RE, True),
        ("entity_ip", _IP_RE, False),
        ("entity_phone", _PHONE_RE, False),
        ("entity_domain", _DOMAIN_RE, True),
        ("entity_hash", _HASH_RE, True),
    )

    updates: Dict[str, Any] = {}
    for field_name, pattern, lowercase in extractors:
        if getattr(record, field_name):
            continue  # never overwrite a value that is already present
        match = pattern.search(text)
        if match:
            value = match.group(0)
            updates[field_name] = value.lower() if lowercase else value

    return record.model_copy(update=updates) if updates else record
# ---------------------------------------------------------------------------
# Engine contract
# ---------------------------------------------------------------------------
def run(engine_input: EngineInput) -> EngineOutput:
    """
    Engine entry point: apply entity enrichment to every input record.

    Each record in ``engine_input.records`` is passed through
    ``_enrich_record``. A record counts as enriched when the helper returned
    a different object than it was given.

    Returns:
        On success, an ``EngineOutput`` for stage ``"analysis"`` holding all
        processed records, a human-readable summary and the enriched count in
        ``metadata``. If anything raises, the error is logged with its
        traceback and a ``FAILED`` output carrying ``str(exc)`` is returned
        instead of propagating the exception.
    """
    try:
        # Single pass over the input; keep (before, after) pairs so the
        # identity comparison can be done once all records are processed.
        pairs = [(item, _enrich_record(item)) for item in engine_input.records]
        processed: List[NormalizedRecord] = [after for _, after in pairs]
        changed = sum(after is not before for before, after in pairs)
        total = len(processed)
    except Exception as exc:
        logger.exception("ML analysis failed: %s", exc)
        return EngineOutput(
            stage="analysis",
            status=StageStatus.FAILED,
            error=str(exc),
        )
    else:
        return EngineOutput(
            stage="analysis",
            status=StageStatus.SUCCESS,
            records=processed,
            summary=f"Analyzed {total} records, enriched {changed}",
            metadata={"enriched_count": changed},
        )
# ---------------------------------------------------------------------------
# Legacy compatibility
# ---------------------------------------------------------------------------
def analyze(data: Any) -> Dict[str, Any]:
    """Return a stub result for ``data`` (deprecated legacy wrapper).

    Kept for backward compatibility only; use ``run()`` instead. The input is
    echoed back under ``"input"`` and ``"prediction"`` is always ``"none"``.
    """
    legacy_result: Dict[str, Any] = dict(input=data, prediction="none")
    return legacy_result
if __name__ == "__main__":
    # Smoke-run the legacy wrapper when the module is executed as a script.
    demo_result = analyze("test")
    print(demo_result)