File size: 2,411 Bytes
"""
Preprocessing module — cleans and normalizes raw text in records.

Engine contract:
    run(EngineInput) -> EngineOutput

Applies basic text cleaning to each record's ``raw_text`` field:
normalize unicode (NFKC), strip leading/trailing whitespace, and collapse
whitespace runs to a single space. The entity name is also stripped, and
the entity email and domain are stripped and lowercased.
"""
from __future__ import annotations
import logging
import re
import unicodedata
from typing import Any, Dict, List
from engine.io_contract import EngineInput, EngineOutput, NormalizedRecord, StageStatus
logger = logging.getLogger("modules.preprocessing")
def _clean_text(text: str) -> str:
"""Basic text normalization."""
# Unicode NFKC normalization
text = unicodedata.normalize("NFKC", text)
# Strip leading/trailing whitespace
text = text.strip()
# Collapse multiple whitespace to single space
text = re.sub(r"\s+", " ", text)
return text
def run(engine_input: EngineInput) -> EngineOutput:
    """
    Run the preprocessing stage over every record in ``engine_input.records``.

    For each record a copy is produced through ``model_copy(update=...)``
    with:

    * ``raw_text`` passed through ``_clean_text``;
    * ``entity_name`` stripped;
    * ``entity_email`` and ``entity_domain`` stripped and lowercased.

    An entity field that is falsy (``None`` or an empty string) is stored
    as ``None``.

    Returns:
        An ``EngineOutput`` for stage ``"preprocessing"``. On success it
        carries the copied records and a summary with their count. If any
        exception is raised while processing, the failure is logged with its
        traceback and a ``FAILED`` output holding ``str(exc)`` is returned
        instead of propagating the exception.
    """

    def _strip_or_none(value: Any, lowercase: bool) -> Any:
        # Shared guard for the optional entity fields: falsy -> None.
        if not value:
            return None
        stripped = value.strip()
        return stripped.lower() if lowercase else stripped

    try:
        processed: List[NormalizedRecord] = []
        for rec in engine_input.records:
            changes: Dict[str, Any] = {}
            changes["raw_text"] = _clean_text(rec.raw_text)
            changes["entity_name"] = _strip_or_none(rec.entity_name, False)
            changes["entity_email"] = _strip_or_none(rec.entity_email, True)
            changes["entity_domain"] = _strip_or_none(rec.entity_domain, True)
            # Work on a copy so the update is applied to a new record object.
            processed.append(rec.model_copy(update=changes))
    except Exception as exc:
        # Stage boundary: report the failure in the output rather than raising.
        logger.exception("Preprocessing failed: %s", exc)
        return EngineOutput(
            stage="preprocessing",
            status=StageStatus.FAILED,
            error=str(exc),
        )
    else:
        return EngineOutput(
            stage="preprocessing",
            status=StageStatus.SUCCESS,
            records=processed,
            summary=f"Preprocessed {len(processed)} records",
        )
# ---------------------------------------------------------------------------
# Legacy compatibility
# ---------------------------------------------------------------------------
def preprocess(text: str) -> str:
    """Legacy wrapper (deprecated); prefer ``run()``.

    Returns *text* with leading/trailing whitespace removed and all
    characters lowercased. Interior whitespace is left untouched.
    """
    trimmed = text.strip()
    return trimmed.lower()
if __name__ == "__main__":
    # Manual smoke check: print the legacy helper's output for a sample string.
    sample = " This is RAW DATA. "
    print(preprocess(sample))
|