| """ |
| Normalize — canonical schema builder + entity linking keys. |
| |
| Reads raw files from the input directory, detects file types, parses |
| them into ``NormalizedRecord`` instances with deterministic row IDs |
| and entity-linking fields. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import csv |
| import hashlib |
| import io |
| import json |
| import logging |
| import re |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional |
|
|
| from engine.io_contract import FileType, InputFile, InputSpec, NormalizedRecord |
|
|
| logger = logging.getLogger("engine.normalize") |
|
|
| |
| |
| |
|
|
# Extension → FileType lookup used by detect_file_type(). Keys are lowercase
# suffixes (including the dot); anything not listed here is treated as UNKNOWN.
_EXT_MAP: Dict[str, FileType] = {
    ".csv": FileType.CSV,
    ".json": FileType.JSON,
    ".txt": FileType.TXT,
    ".html": FileType.HTML,
    ".htm": FileType.HTML,
    ".log": FileType.LOG,
}
|
|
|
|
def detect_file_type(path: Path) -> FileType:
    """Map *path*'s extension (case-insensitive) to a ``FileType``.

    Unrecognized extensions fall back to ``FileType.UNKNOWN``.
    """
    suffix = path.suffix.lower()
    return _EXT_MAP.get(suffix, FileType.UNKNOWN)
|
|
|
|
| def _file_sha256(path: Path) -> str: |
| """Compute SHA-256 hex digest of a file.""" |
| h = hashlib.sha256() |
| with open(path, "rb") as f: |
| for chunk in iter(lambda: f.read(8192), b""): |
| h.update(chunk) |
| return h.hexdigest() |
|
|
|
|
| |
| |
| |
|
|
def build_input_spec(input_dir: Path) -> InputSpec:
    """
    Scan *input_dir* for supported files and return an ``InputSpec``.

    A single-file path is also accepted: that file alone is described and
    its parent directory is recorded as the spec's input directory.
    """
    root = Path(input_dir)

    def describe(p: Path) -> InputFile:
        # Capture type, size, and content hash for one on-disk file.
        return InputFile(
            path=p,
            file_type=detect_file_type(p),
            size_bytes=p.stat().st_size,
            sha256=_file_sha256(p),
        )

    if root.is_file():
        files = [describe(root)]
        root = root.parent
    else:
        # sorted() gives a deterministic order; dotfiles and subdirectories
        # are skipped.
        files = [
            describe(p)
            for p in sorted(root.iterdir())
            if p.is_file() and not p.name.startswith(".")
        ]
    logger.info("InputSpec: %d files from %s", len(files), root)
    return InputSpec(input_dir=root, files=files)
|
|
|
|
| |
| |
| |
|
|
| def _make_row_id(source_file: str, index: int, content_hash: str) -> str: |
| """ |
| Deterministic row ID = first 12 hex chars of SHA-256(source_file + index + content). |
| """ |
| raw = f"{source_file}:{index}:{content_hash}" |
| return hashlib.sha256(raw.encode()).hexdigest()[:12] |
|
|
|
|
| |
| |
| |
|
|
| _EMAIL_RE = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+") |
| _IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b") |
| _PHONE_RE = re.compile(r"\b\+?1?\d{9,15}\b") |
| _DOMAIN_RE = re.compile(r"\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}\b") |
| _HASH_RE = re.compile(r"\b[a-fA-F0-9]{32,64}\b") |
|
|
|
|
| def _extract_entities(text: str) -> Dict[str, Optional[str]]: |
| """Extract first occurrence of common entity types from text.""" |
| email_m = _EMAIL_RE.search(text) |
| ip_m = _IP_RE.search(text) |
| phone_m = _PHONE_RE.search(text) |
| domain_m = _DOMAIN_RE.search(text) |
| hash_m = _HASH_RE.search(text) |
| return { |
| "entity_email": email_m.group(0) if email_m else None, |
| "entity_ip": ip_m.group(0) if ip_m else None, |
| "entity_phone": phone_m.group(0) if phone_m else None, |
| "entity_domain": domain_m.group(0) if domain_m else None, |
| "entity_hash": hash_m.group(0) if hash_m else None, |
| } |
|
|
|
|
| |
| |
| |
|
|
def _parse_csv(path: Path) -> List[NormalizedRecord]:
    """Parse a CSV file into one ``NormalizedRecord`` per data row.

    The header row (consumed by ``csv.DictReader``) names the columns;
    each data row is JSON-serialized to form the record's raw text, the
    content hash for the row ID, and the input for entity extraction.
    """
    records: List[NormalizedRecord] = []
    with open(path, newline="", encoding="utf-8", errors="replace") as f:
        reader = csv.DictReader(f)
        for idx, row in enumerate(reader):
            # One canonical string per row so hashing and entity scanning
            # operate on the same representation.
            text = json.dumps(row, ensure_ascii=False)
            content_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
            entities = _extract_entities(text)
            # Prefer explicit columns (common header spellings) over regex hits.
            name = row.get("name") or row.get("Name") or row.get("entity_name")
            phone = row.get("phone") or row.get("Phone") or row.get("entity_phone")
            email = row.get("email") or row.get("Email") or row.get("entity_email")
            records.append(NormalizedRecord(
                row_id=_make_row_id(str(path), idx, content_hash),
                source_file=str(path.name),
                source_type=FileType.CSV,
                # NOTE: the previous `name or entities.get("entity_name")`
                # fallback was dead code — _extract_entities never produces
                # an "entity_name" key, so it always evaluated to None.
                entity_name=name,
                entity_phone=phone or entities.get("entity_phone"),
                entity_email=email or entities.get("entity_email"),
                entity_ip=entities.get("entity_ip"),
                entity_domain=entities.get("entity_domain"),
                entity_hash=entities.get("entity_hash"),
                raw_text=text,
                extra=dict(row),
            ))
    return records
|
|
|
|
def _parse_json(path: Path) -> List[NormalizedRecord]:
    """Parse a JSON file into ``NormalizedRecord`` instances.

    A top-level array yields one record per element; any other top-level
    value yields a single record. Dict items are JSON-serialized for the
    raw text; non-dict items are stringified and wrapped as
    ``{"value": item}`` in ``extra``.
    """
    records: List[NormalizedRecord] = []
    with open(path, encoding="utf-8", errors="replace") as f:
        data = json.load(f)
    items = data if isinstance(data, list) else [data]
    for idx, item in enumerate(items):
        text = json.dumps(item, ensure_ascii=False) if isinstance(item, dict) else str(item)
        content_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
        entities = _extract_entities(text)
        # `extra` is always a dict by construction, so the old
        # `if isinstance(extra, dict)` guard on the name lookup was redundant.
        extra = item if isinstance(item, dict) else {"value": item}
        records.append(NormalizedRecord(
            row_id=_make_row_id(str(path), idx, content_hash),
            source_file=str(path.name),
            source_type=FileType.JSON,
            entity_name=extra.get("name"),
            entity_email=entities.get("entity_email"),
            entity_ip=entities.get("entity_ip"),
            entity_phone=entities.get("entity_phone"),
            entity_domain=entities.get("entity_domain"),
            entity_hash=entities.get("entity_hash"),
            raw_text=text,
            extra=extra,
        ))
    return records
|
|
|
|
def _parse_text(path: Path, file_type: FileType = FileType.TXT) -> List[NormalizedRecord]:
    """Parse plain text / HTML / log files — one record per non-empty line.

    Blank lines are skipped but still consume a line index, so row IDs
    reflect the original file positions.
    """
    records: List[NormalizedRecord] = []
    with open(path, encoding="utf-8", errors="replace") as fh:
        for lineno, raw_line in enumerate(fh):
            stripped = raw_line.strip()
            if not stripped:
                continue
            content_hash = hashlib.sha256(stripped.encode()).hexdigest()[:16]
            entities = _extract_entities(stripped)
            records.append(NormalizedRecord(
                row_id=_make_row_id(str(path), lineno, content_hash),
                source_file=str(path.name),
                source_type=file_type,
                entity_email=entities.get("entity_email"),
                entity_ip=entities.get("entity_ip"),
                entity_phone=entities.get("entity_phone"),
                entity_domain=entities.get("entity_domain"),
                entity_hash=entities.get("entity_hash"),
                raw_text=stripped,
            ))
    return records
|
|
|
|
| |
| |
| |
|
|
def normalize_files(input_spec: InputSpec) -> List[NormalizedRecord]:
    """
    Parse all files in *input_spec* and return a flat list of
    ``NormalizedRecord`` instances.

    Files are dispatched to the parser matching their detected type;
    unknown types fall back to line-oriented text parsing. A failure in
    one file is logged (with traceback) and does not abort the rest.
    """
    all_records: List[NormalizedRecord] = []
    for f in input_spec.files:
        try:
            if f.file_type == FileType.CSV:
                recs = _parse_csv(f.path)
            elif f.file_type == FileType.JSON:
                recs = _parse_json(f.path)
            elif f.file_type in (FileType.TXT, FileType.LOG):
                recs = _parse_text(f.path, f.file_type)
            elif f.file_type == FileType.HTML:
                recs = _parse_text(f.path, FileType.HTML)
            else:
                # Best-effort: treat anything unrecognized as plain text.
                recs = _parse_text(f.path, FileType.UNKNOWN)
            logger.info("Parsed %d records from %s (%s)", len(recs), f.path.name, f.file_type.value)
            all_records.extend(recs)
        except Exception:
            # logger.error("...: %s", exc) dropped the traceback; .exception
            # records it so parse failures are diagnosable from the logs.
            logger.exception("Failed to parse %s", f.path)
    logger.info("Total normalized records: %d", len(all_records))
    return all_records
|
|