""" ML Analysis module — entity extraction and text classification. Engine contract: run(EngineInput) -> EngineOutput This is a lightweight placeholder that performs regex-based entity extraction. When ML dependencies (torch, transformers, spacy) are available, it can be extended to use real models. """ from __future__ import annotations import logging import re from typing import Any, Dict, List, Optional from engine.io_contract import ( EngineInput, EngineOutput, NormalizedRecord, StageStatus, ) logger = logging.getLogger("modules.ml_analysis") # --------------------------------------------------------------------------- # Lightweight entity extraction (no ML deps required) # --------------------------------------------------------------------------- _EMAIL_RE = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+") _IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b") _PHONE_RE = re.compile(r"\b\+?1?\d{9,15}\b") _DOMAIN_RE = re.compile(r"\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}\b") _HASH_RE = re.compile(r"\b[a-fA-F0-9]{32,64}\b") def _enrich_record(record: NormalizedRecord) -> NormalizedRecord: """ Enrich a record by extracting entities from raw_text if not already set. """ text = record.raw_text updates: Dict[str, Any] = {} if not record.entity_email: m = _EMAIL_RE.search(text) if m: updates["entity_email"] = m.group(0).lower() if not record.entity_ip: m = _IP_RE.search(text) if m: updates["entity_ip"] = m.group(0) if not record.entity_phone: m = _PHONE_RE.search(text) if m: updates["entity_phone"] = m.group(0) if not record.entity_domain: m = _DOMAIN_RE.search(text) if m: updates["entity_domain"] = m.group(0).lower() if not record.entity_hash: m = _HASH_RE.search(text) if m: updates["entity_hash"] = m.group(0).lower() if updates: return record.model_copy(update=updates) return record # --------------------------------------------------------------------------- # Engine contract # --------------------------------------------------------------------------- def run(engine_input: EngineInput) -> EngineOutput: """ Run ML analysis / entity enrichment on all records. """ try: enriched: List[NormalizedRecord] = [] enrichment_count = 0 for record in engine_input.records: new_record = _enrich_record(record) if new_record is not record: enrichment_count += 1 enriched.append(new_record) return EngineOutput( stage="analysis", status=StageStatus.SUCCESS, records=enriched, summary=f"Analyzed {len(enriched)} records, enriched {enrichment_count}", metadata={"enriched_count": enrichment_count}, ) except Exception as exc: logger.error("ML analysis failed: %s", exc, exc_info=True) return EngineOutput( stage="analysis", status=StageStatus.FAILED, error=str(exc), ) # --------------------------------------------------------------------------- # Legacy compatibility # --------------------------------------------------------------------------- def analyze(data: Any) -> Dict[str, Any]: """Legacy wrapper (deprecated). Use ``run()`` instead.""" return {"input": data, "prediction": "none"} if __name__ == "__main__": print(analyze("test"))