File size: 2,411 Bytes
b75c637
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
"""
Preprocessing module — cleans and normalizes raw text in records.

Engine contract:
    run(EngineInput) -> EngineOutput

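Applies basic text cleaning to each record's ``raw_text`` field:
NFKC unicode normalization, stripping of leading/trailing whitespace, and
collapsing of internal whitespace runs to a single space. The entity
fields are normalized as well: ``entity_name`` is stripped, and
``entity_email`` / ``entity_domain`` are stripped and lowercased.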
"""

from __future__ import annotations

import logging
import re
import unicodedata
from typing import Any, Dict, List

from engine.io_contract import EngineInput, EngineOutput, NormalizedRecord, StageStatus

logger = logging.getLogger("modules.preprocessing")


def _clean_text(text: str) -> str:
    """Basic text normalization."""
    # Unicode NFKC normalization
    text = unicodedata.normalize("NFKC", text)
    # Strip leading/trailing whitespace
    text = text.strip()
    # Collapse multiple whitespace to single space
    text = re.sub(r"\s+", " ", text)
    return text


def _normalize_entity_field(value: "str | None", *, lowercase: bool = False) -> "str | None":
    """Strip an optional entity field, optionally lowercasing it.

    Returns ``None`` when *value* is ``None``, empty, or whitespace-only, so a
    missing value has a single representation regardless of whether the
    source supplied nothing or only blanks.
    """
    if not value:
        return None
    cleaned = value.strip()
    if lowercase:
        cleaned = cleaned.lower()
    return cleaned or None


def run(engine_input: EngineInput) -> EngineOutput:
    """
    Preprocess all records: clean ``raw_text``, normalize entity fields.

    For each record in ``engine_input.records`` a new record is built with
    ``model_copy`` in which:

    * ``raw_text`` is passed through ``_clean_text`` (NFKC normalization,
      strip, collapse whitespace runs);
    * ``entity_name`` is stripped;
    * ``entity_email`` and ``entity_domain`` are stripped and lowercased;
    * an entity field that is missing, empty, or whitespace-only becomes
      ``None``.

    Returns:
        An ``EngineOutput`` for stage ``"preprocessing"``: status ``SUCCESS``
        with the cleaned records, or status ``FAILED`` with ``error`` set to
        the exception message if anything raises. Exceptions are logged and
        reported through the return value, not re-raised.
    """
    try:
        cleaned: List[NormalizedRecord] = [
            # model_copy yields a new record object; the input record is not
            # modified in place.
            record.model_copy(update={
                "raw_text": _clean_text(record.raw_text),
                "entity_name": _normalize_entity_field(record.entity_name),
                "entity_email": _normalize_entity_field(record.entity_email, lowercase=True),
                "entity_domain": _normalize_entity_field(record.entity_domain, lowercase=True),
            })
            for record in engine_input.records
        ]

        return EngineOutput(
            stage="preprocessing",
            status=StageStatus.SUCCESS,
            records=cleaned,
            summary=f"Preprocessed {len(cleaned)} records",
        )
    except Exception as exc:
        # Stage boundary: failures are reported via the returned EngineOutput
        # rather than propagated to the caller.
        logger.exception("Preprocessing failed: %s", exc)
        return EngineOutput(
            stage="preprocessing",
            status=StageStatus.FAILED,
            error=str(exc),
        )


# ---------------------------------------------------------------------------
# Legacy compatibility
# ---------------------------------------------------------------------------

def preprocess(text: str) -> str:
    """Deprecated legacy helper; prefer ``run()``.

    Only strips leading/trailing whitespace and lowercases *text*. It does
    not apply Unicode normalization or collapse internal whitespace runs,
    and ``run()`` does not lowercase ``raw_text``, so the two produce
    different results for the same input.
    """
    stripped = text.strip()
    return stripped.lower()


if __name__ == "__main__":
    # Manual smoke check of the legacy helper when run as a script.
    sample = "  This is RAW DATA.  "
    print(preprocess(sample))