| """ |
| ML Analysis module — entity extraction and text classification. |
| |
| Engine contract: |
| run(EngineInput) -> EngineOutput |
| |
| This is a lightweight placeholder that performs regex-based entity |
| extraction. When ML dependencies (torch, transformers, spacy) are |
| available, it can be extended to use real models. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| import re |
| from typing import Any, Dict, List, Optional |
|
|
| from engine.io_contract import ( |
| EngineInput, |
| EngineOutput, |
| NormalizedRecord, |
| StageStatus, |
| ) |
|
|
| logger = logging.getLogger("modules.ml_analysis") |
|
|
| |
| |
| |
|
|
# Heuristic, regex-only entity patterns. They favour simplicity over strict
# validation; callers should treat matches as candidates, not verified values.

# Local part, "@", then one or more dot-separated labels. Requiring at least
# one label character after every dot keeps trailing sentence punctuation
# ("mail foo@example.com.") out of the match.
_EMAIL_RE = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)+")

# A single IPv4 octet restricted to 0-255, so strings such as "999.1.1.1"
# are not reported as addresses.
_IPV4_OCTET = r"(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)"
_IP_RE = re.compile(r"\b(?:" + _IPV4_OCTET + r"\.){3}" + _IPV4_OCTET + r"\b")

# 9-15 digits, optionally preceded by "+" and/or a leading "1". A negative
# lookbehind replaces the leading ``\b``: ``\b`` cannot match between a space
# (or start of text) and "+", which silently dropped the plus sign.
_PHONE_RE = re.compile(r"(?<!\w)\+?1?\d{9,15}\b")

# Dot-separated labels ending in an alphabetic TLD of two or more letters.
_DOMAIN_RE = re.compile(r"\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}\b")

# Standalone hex strings of 32-64 characters (spans MD5, SHA-1 and SHA-256).
_HASH_RE = re.compile(r"\b[a-fA-F0-9]{32,64}\b")
|
|
|
|
def _enrich_record(record: NormalizedRecord) -> NormalizedRecord:
    """
    Fill empty ``entity_*`` fields of *record* from regex matches in its raw text.

    Only fields that are currently falsy are filled; each takes the first
    match of its pattern in ``record.raw_text``. Emails, domains and hashes
    are lower-cased, while IPs and phone numbers are stored exactly as matched.

    Args:
        record: The record to enrich. This function does not modify it.

    Returns:
        ``record.model_copy(update=...)`` when at least one field was filled,
        otherwise the very same *record* object. ``run()`` compares identities
        to count enriched records, so the no-change path must return *record*.
    """
    # NOTE(review): raw_text is treated as possibly None/empty (schema not
    # visible here); a missing text yields no entities instead of a TypeError
    # from re.search.
    text = record.raw_text or ""
    if not text:
        return record

    # (field name, pattern, lower-case the match?) in the original order.
    # Built per call so the module-level patterns are resolved lazily.
    extractors = (
        ("entity_email", _EMAIL_RE, True),
        ("entity_ip", _IP_RE, False),
        ("entity_phone", _PHONE_RE, False),
        ("entity_domain", _DOMAIN_RE, True),
        ("entity_hash", _HASH_RE, True),
    )

    updates: Dict[str, Any] = {}
    for field, pattern, lowercase in extractors:
        if getattr(record, field):
            continue  # never overwrite a value the record already carries
        match = pattern.search(text)
        if match is not None:
            value = match.group(0)
            updates[field] = value.lower() if lowercase else value

    return record.model_copy(update=updates) if updates else record
|
|
|
|
| |
| |
| |
|
|
def run(engine_input: EngineInput) -> EngineOutput:
    """
    Engine entry point: apply regex entity enrichment to every input record.

    Each record is passed through ``_enrich_record``. A record counts as
    enriched when a different object comes back, because the helper returns
    the input object unchanged when it has nothing to add.

    Any exception aborts the whole stage. It is logged with its traceback and
    reported as a FAILED ``EngineOutput`` carrying the error text, instead of
    propagating to the caller.
    """
    try:
        # Keep (before, after) together so the input is iterated exactly once.
        pairs = [(before, _enrich_record(before)) for before in engine_input.records]
        enriched: List[NormalizedRecord] = [after for _before, after in pairs]
        enrichment_count = sum(after is not before for before, after in pairs)

        return EngineOutput(
            stage="analysis",
            status=StageStatus.SUCCESS,
            records=enriched,
            summary=f"Analyzed {len(enriched)} records, enriched {enrichment_count}",
            metadata={"enriched_count": enrichment_count},
        )
    except Exception as exc:
        # Stage boundary: convert any failure into a FAILED result.
        logger.exception("ML analysis failed: %s", exc)
        return EngineOutput(
            stage="analysis",
            status=StageStatus.FAILED,
            error=str(exc),
        )
|
|
|
|
| |
| |
| |
|
|
def analyze(data: Any) -> Dict[str, Any]:
    """
    Deprecated legacy entry point kept for backward compatibility; use ``run()``.

    Performs no analysis. It echoes *data* back under ``"input"`` alongside a
    fixed ``"prediction"`` of ``"none"``.
    """
    placeholder: Dict[str, Any] = {"input": data}
    placeholder["prediction"] = "none"
    return placeholder
|
|
|
|
if __name__ == "__main__":
    # Manual smoke check: run the legacy wrapper and show its placeholder output.
    demo_result = analyze("test")
    print(demo_result)
|
|