""" Normalize — canonical schema builder + entity linking keys. Reads raw files from the input directory, detects file types, parses them into ``NormalizedRecord`` instances with deterministic row IDs and entity-linking fields. """ from __future__ import annotations import csv import hashlib import io import json import logging import re from pathlib import Path from typing import Any, Dict, List, Optional from engine.io_contract import FileType, InputFile, InputSpec, NormalizedRecord logger = logging.getLogger("engine.normalize") # --------------------------------------------------------------------------- # File-type detection # --------------------------------------------------------------------------- _EXT_MAP: Dict[str, FileType] = { ".csv": FileType.CSV, ".json": FileType.JSON, ".txt": FileType.TXT, ".html": FileType.HTML, ".htm": FileType.HTML, ".log": FileType.LOG, } def detect_file_type(path: Path) -> FileType: """Detect file type from extension.""" return _EXT_MAP.get(path.suffix.lower(), FileType.UNKNOWN) def _file_sha256(path: Path) -> str: """Compute SHA-256 hex digest of a file.""" h = hashlib.sha256() with open(path, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): h.update(chunk) return h.hexdigest() # --------------------------------------------------------------------------- # Build InputSpec from a directory # --------------------------------------------------------------------------- def build_input_spec(input_dir: Path) -> InputSpec: """ Scan *input_dir* for supported files and return an ``InputSpec``. """ input_dir = Path(input_dir) files: List[InputFile] = [] if input_dir.is_file(): # Single file mode ft = detect_file_type(input_dir) files.append(InputFile( path=input_dir, file_type=ft, size_bytes=input_dir.stat().st_size, sha256=_file_sha256(input_dir), )) input_dir = input_dir.parent else: for p in sorted(input_dir.iterdir()): if p.is_file() and not p.name.startswith("."): ft = detect_file_type(p) files.append(InputFile( path=p, file_type=ft, size_bytes=p.stat().st_size, sha256=_file_sha256(p), )) logger.info("InputSpec: %d files from %s", len(files), input_dir) return InputSpec(input_dir=input_dir, files=files) # --------------------------------------------------------------------------- # Deterministic row ID # --------------------------------------------------------------------------- def _make_row_id(source_file: str, index: int, content_hash: str) -> str: """ Deterministic row ID = first 12 hex chars of SHA-256(source_file + index + content). 
""" raw = f"{source_file}:{index}:{content_hash}" return hashlib.sha256(raw.encode()).hexdigest()[:12] # --------------------------------------------------------------------------- # Entity extraction helpers (lightweight, no ML) # --------------------------------------------------------------------------- _EMAIL_RE = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+") _IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b") _PHONE_RE = re.compile(r"\b\+?1?\d{9,15}\b") _DOMAIN_RE = re.compile(r"\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}\b") _HASH_RE = re.compile(r"\b[a-fA-F0-9]{32,64}\b") def _extract_entities(text: str) -> Dict[str, Optional[str]]: """Extract first occurrence of common entity types from text.""" email_m = _EMAIL_RE.search(text) ip_m = _IP_RE.search(text) phone_m = _PHONE_RE.search(text) domain_m = _DOMAIN_RE.search(text) hash_m = _HASH_RE.search(text) return { "entity_email": email_m.group(0) if email_m else None, "entity_ip": ip_m.group(0) if ip_m else None, "entity_phone": phone_m.group(0) if phone_m else None, "entity_domain": domain_m.group(0) if domain_m else None, "entity_hash": hash_m.group(0) if hash_m else None, } # --------------------------------------------------------------------------- # Parsers per file type # --------------------------------------------------------------------------- def _parse_csv(path: Path) -> List[NormalizedRecord]: records: List[NormalizedRecord] = [] with open(path, newline="", encoding="utf-8", errors="replace") as f: reader = csv.DictReader(f) for idx, row in enumerate(reader): text = json.dumps(row, ensure_ascii=False) content_hash = hashlib.sha256(text.encode()).hexdigest()[:16] entities = _extract_entities(text) # Try to pick up common column names name = row.get("name") or row.get("Name") or row.get("entity_name") phone = row.get("phone") or row.get("Phone") or row.get("entity_phone") email = row.get("email") or row.get("Email") or row.get("entity_email") records.append(NormalizedRecord( row_id=_make_row_id(str(path), idx, content_hash), source_file=str(path.name), source_type=FileType.CSV, entity_name=name or entities.get("entity_name"), entity_phone=phone or entities.get("entity_phone"), entity_email=email or entities.get("entity_email"), entity_ip=entities.get("entity_ip"), entity_domain=entities.get("entity_domain"), entity_hash=entities.get("entity_hash"), raw_text=text, extra=dict(row), )) return records def _parse_json(path: Path) -> List[NormalizedRecord]: records: List[NormalizedRecord] = [] with open(path, encoding="utf-8", errors="replace") as f: data = json.load(f) # Handle both single object and list of objects items = data if isinstance(data, list) else [data] for idx, item in enumerate(items): text = json.dumps(item, ensure_ascii=False) if isinstance(item, dict) else str(item) content_hash = hashlib.sha256(text.encode()).hexdigest()[:16] entities = _extract_entities(text) extra = item if isinstance(item, dict) else {"value": item} records.append(NormalizedRecord( row_id=_make_row_id(str(path), idx, content_hash), source_file=str(path.name), source_type=FileType.JSON, entity_name=extra.get("name") if isinstance(extra, dict) else None, entity_email=entities.get("entity_email"), entity_ip=entities.get("entity_ip"), entity_phone=entities.get("entity_phone"), entity_domain=entities.get("entity_domain"), entity_hash=entities.get("entity_hash"), raw_text=text, extra=extra, )) return records def _parse_text(path: Path, file_type: FileType = FileType.TXT) -> List[NormalizedRecord]: """Parse plain text / HTML / log files — 

# ---------------------------------------------------------------------------
# Parsers per file type
# ---------------------------------------------------------------------------

def _parse_csv(path: Path) -> List[NormalizedRecord]:
    records: List[NormalizedRecord] = []
    with open(path, newline="", encoding="utf-8", errors="replace") as f:
        reader = csv.DictReader(f)
        for idx, row in enumerate(reader):
            text = json.dumps(row, ensure_ascii=False)
            content_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
            entities = _extract_entities(text)
            # Prefer explicit columns over regex hits for linkable fields.
            name = row.get("name") or row.get("Name") or row.get("entity_name")
            phone = row.get("phone") or row.get("Phone") or row.get("entity_phone")
            email = row.get("email") or row.get("Email") or row.get("entity_email")
            records.append(NormalizedRecord(
                row_id=_make_row_id(str(path), idx, content_hash),
                source_file=path.name,
                source_type=FileType.CSV,
                entity_name=name or None,  # _extract_entities has no name heuristic
                entity_phone=phone or entities.get("entity_phone"),
                entity_email=email or entities.get("entity_email"),
                entity_ip=entities.get("entity_ip"),
                entity_domain=entities.get("entity_domain"),
                entity_hash=entities.get("entity_hash"),
                raw_text=text,
                extra=dict(row),
            ))
    return records


def _parse_json(path: Path) -> List[NormalizedRecord]:
    records: List[NormalizedRecord] = []
    with open(path, encoding="utf-8", errors="replace") as f:
        data = json.load(f)
    # Handle both a single object and a list of objects.
    items = data if isinstance(data, list) else [data]
    for idx, item in enumerate(items):
        text = json.dumps(item, ensure_ascii=False) if isinstance(item, dict) else str(item)
        content_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
        entities = _extract_entities(text)
        # Non-dict items (strings, numbers) are wrapped so extra stays a dict.
        extra = item if isinstance(item, dict) else {"value": item}
        records.append(NormalizedRecord(
            row_id=_make_row_id(str(path), idx, content_hash),
            source_file=path.name,
            source_type=FileType.JSON,
            entity_name=extra.get("name"),
            entity_email=entities.get("entity_email"),
            entity_ip=entities.get("entity_ip"),
            entity_phone=entities.get("entity_phone"),
            entity_domain=entities.get("entity_domain"),
            entity_hash=entities.get("entity_hash"),
            raw_text=text,
            extra=extra,
        ))
    return records


def _parse_text(path: Path, file_type: FileType = FileType.TXT) -> List[NormalizedRecord]:
    """Parse plain text / HTML / log files: one record per non-empty line."""
    records: List[NormalizedRecord] = []
    with open(path, encoding="utf-8", errors="replace") as f:
        for idx, line in enumerate(f):
            line = line.strip()
            if not line:
                continue  # blank lines are skipped but still advance idx
            content_hash = hashlib.sha256(line.encode()).hexdigest()[:16]
            entities = _extract_entities(line)
            records.append(NormalizedRecord(
                row_id=_make_row_id(str(path), idx, content_hash),
                source_file=path.name,
                source_type=file_type,
                entity_email=entities.get("entity_email"),
                entity_ip=entities.get("entity_ip"),
                entity_phone=entities.get("entity_phone"),
                entity_domain=entities.get("entity_domain"),
                entity_hash=entities.get("entity_hash"),
                raw_text=line,
            ))
    return records


# ---------------------------------------------------------------------------
# Main normalization entry point
# ---------------------------------------------------------------------------

def normalize_files(input_spec: InputSpec) -> List[NormalizedRecord]:
    """
    Parse all files in *input_spec* and return a flat list of
    ``NormalizedRecord`` instances. Files that fail to parse are logged
    and skipped rather than aborting the run.
    """
    all_records: List[NormalizedRecord] = []
    for f in input_spec.files:
        try:
            if f.file_type == FileType.CSV:
                recs = _parse_csv(f.path)
            elif f.file_type == FileType.JSON:
                recs = _parse_json(f.path)
            elif f.file_type in (FileType.TXT, FileType.LOG):
                recs = _parse_text(f.path, f.file_type)
            elif f.file_type == FileType.HTML:
                recs = _parse_text(f.path, FileType.HTML)
            else:
                # Fallback: treat unknown types as plain text.
                recs = _parse_text(f.path, FileType.UNKNOWN)
            logger.info("Parsed %d records from %s (%s)",
                        len(recs), f.path.name, f.file_type.value)
            all_records.extend(recs)
        except Exception as exc:
            logger.error("Failed to parse %s: %s", f.path, exc)
    logger.info("Total normalized records: %d", len(all_records))
    return all_records
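
if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the pipeline proper: run as
    # ``python -m engine.normalize <path>`` against a file or directory.
    # The printed fields assume NormalizedRecord exposes row_id, source_file,
    # and entity_email as attributes, as the constructor calls above imply.
    import sys

    logging.basicConfig(level=logging.INFO)
    spec = build_input_spec(Path(sys.argv[1]))
    for rec in normalize_files(spec):
        print(rec.row_id, rec.source_file, rec.entity_email or "-")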