"""PII scanning engine that runs regex and spaCy NER detectors over parsed files."""

import logging
import os
import uuid
from collections import Counter
from typing import Dict, List

import spacy

from .csv_export import export_findings_csv
from .detectors import merge_findings, ner_findings, redact_value, regex_findings
from .parsers import parse_content
from .zip_handler import ExtractedFile

SPACY_MODEL = os.getenv("SPACY_MODEL", "en_core_web_trf")

# Maximum number of characters of a record's text handed to detectors as context.
_CONTEXT_MAX_CHARS = 500

_logger = logging.getLogger(__name__)


class PIIScannerEngine:
    """Scan extracted files for PII using regex and spaCy-based detectors.

    Attributes:
        nlp: The spaCy pipeline used for NER; a blank English pipeline when
            the configured model could not be loaded.
        model_loaded: True only when ``SPACY_MODEL`` was loaded successfully.
    """

    def __init__(self):
        self.model_loaded = False
        self.nlp = None
        self._load_model()

    def _load_model(self):
        """Load ``SPACY_MODEL``, falling back to a blank English pipeline.

        The fallback is deliberate best-effort behavior: scanning keeps
        working when the model is unavailable. The failure is logged so the
        degraded mode is not silent; callers can also inspect
        ``model_loaded``.
        """
        try:
            self.nlp = spacy.load(SPACY_MODEL)
        except Exception:
            # Broad on purpose: any load failure should degrade, not crash.
            _logger.warning(
                "Could not load spaCy model %r; falling back to a blank 'en' pipeline",
                SPACY_MODEL,
                exc_info=True,
            )
            self.nlp = spacy.blank("en")
            self.model_loaded = False
        else:
            self.model_loaded = True

    def _detect(self, record) -> List[Dict]:
        """Run every detector over one parsed record and return merged findings."""
        text = record["text"] or ""
        # Slicing already handles text shorter than the limit.
        context = text[:_CONTEXT_MAX_CHARS]
        field = record.get("field")
        line = record.get("line")
        column = record.get("column")

        detected = regex_findings(text, context, field, line, column)
        # Skip NER for empty or whitespace-only text.
        if self.nlp is not None and text.strip():
            detected.extend(ner_findings(self.nlp, text, context, field, line, column))
        return merge_findings(detected)

    @staticmethod
    def _build_result(scan_id: str, filename, file_type, finding: Dict, redact: bool) -> Dict:
        """Shape one merged finding into the result row reported to callers."""
        matched_value = finding["matched_value"]
        # NOTE(review): "matched_value" carries the raw value even when
        # redact=True; only "redacted_value" is affected. Confirm intended.
        redacted_value = redact_value(matched_value) if redact else matched_value
        return {
            "scan_id": scan_id,
            "filename": filename,
            "file_type": file_type,
            "pii_type": finding["pii_type"],
            "matched_value": matched_value,
            "redacted_value": redacted_value,
            "confidence": round(float(finding["confidence"]), 2),
            "line": finding.get("line"),
            "column": finding.get("column"),
            "field": finding.get("field"),
        }

    def scan_files(self, files: List[ExtractedFile], redact: bool = True) -> Dict:
        """Scan ``files`` and return findings, per-file summaries and a CSV report.

        Args:
            files: Extracted files; each item's ``filename`` and ``content``
                are passed to ``parse_content``.
            redact: When True, ``redacted_value`` is produced by
                ``redact_value``; otherwise it equals the matched value.

        Returns:
            A dict with keys ``scan_id`` (a fresh UUID4 string), ``summary``,
            ``files``, ``findings``, ``csv`` and ``model_loaded``.
        """
        scan_id = str(uuid.uuid4())
        findings: List[Dict] = []
        file_summaries: List[Dict] = []

        for item in files:
            file_type, records = parse_content(item.filename, item.content)
            finding_count = 0
            for record in records:
                for finding in self._detect(record):
                    findings.append(
                        self._build_result(scan_id, item.filename, file_type, finding, redact)
                    )
                    finding_count += 1
            file_summaries.append(
                {
                    "filename": item.filename,
                    "file_type": file_type,
                    "record_count": len(records),
                    "finding_count": finding_count,
                }
            )

        csv_report = export_findings_csv(findings)
        summary = self._build_summary(files, findings)
        return {
            "scan_id": scan_id,
            "summary": summary,
            "files": file_summaries,
            "findings": findings,
            "csv": csv_report,
            "model_loaded": self.model_loaded,
        }

    def _build_summary(self, files: List[ExtractedFile], findings: List[Dict]) -> Dict:
        """Return the file count, total finding count and per-``pii_type`` counts."""
        # dict(...) keeps the return value a plain dict; Counter preserves
        # first-seen key order, matching a manual tally.
        pii_counts: Dict[str, int] = dict(Counter(finding["pii_type"] for finding in findings))
        return {
            "uploaded_file_count": len(files),
            "total_findings": len(findings),
            "pii_type_counts": pii_counts,
        }