| import os |
| import uuid |
| from typing import Dict, List |
|
|
| import spacy |
|
|
| from .csv_export import export_findings_csv |
| from .detectors import merge_findings, ner_findings, redact_value, regex_findings |
| from .parsers import parse_content |
| from .zip_handler import ExtractedFile |
|
|
# Name of the spaCy pipeline to load. Read once at import time from the
# SPACY_MODEL environment variable; "en_core_web_trf" is used when it is unset.
SPACY_MODEL = os.environ.get("SPACY_MODEL", "en_core_web_trf")
|
|
|
|
class PIIScannerEngine:
    """Scan extracted files for PII with regex detectors and a spaCy pipeline.

    The spaCy pipeline named by ``SPACY_MODEL`` is loaded once at construction
    time. If loading fails, a blank English pipeline is used instead and
    ``model_loaded`` stays ``False``; that flag is echoed in every scan result
    so callers can tell which mode produced the findings.

    Attributes:
        nlp: The spaCy pipeline handed to ``ner_findings``.
        model_loaded: ``True`` only when ``SPACY_MODEL`` loaded successfully.
    """

    # Upper bound on the number of characters of a record's text that is
    # passed to the detectors as the ``context`` argument.
    CONTEXT_MAX_CHARS = 500

    def __init__(self):
        """Initialise the engine and eagerly load the spaCy pipeline."""
        self.model_loaded = False
        self.nlp = None
        self._load_model()

    def _load_model(self):
        """Load ``SPACY_MODEL``, falling back to ``spacy.blank("en")`` on failure.

        The fallback is deliberate best-effort behaviour, so the exception is
        not re-raised; it is logged so the degraded mode is diagnosable.
        """
        # Function-scope import keeps this block independent of the file's
        # top-level import list.
        import logging

        try:
            self.nlp = spacy.load(SPACY_MODEL)
        except Exception:
            logging.getLogger(__name__).warning(
                "Could not load spaCy model %r; falling back to a blank 'en' pipeline",
                SPACY_MODEL,
                exc_info=True,
            )
            self.nlp = spacy.blank("en")
            self.model_loaded = False
        else:
            self.model_loaded = True

    def scan_files(self, files: List[ExtractedFile], redact: bool = True) -> Dict:
        """Scan every file and return findings, per-file summaries and a CSV report.

        Args:
            files: Extracted files; each item's ``filename`` and ``content``
                are passed to ``parse_content``.
            redact: When true, ``redacted_value`` is produced by
                ``redact_value``; otherwise it equals the matched value.

        Returns:
            A dict with the keys ``scan_id`` (a fresh UUID4 string),
            ``summary``, ``files``, ``findings``, ``csv`` and ``model_loaded``.
        """
        scan_id = str(uuid.uuid4())
        findings: List[Dict] = []
        file_summaries: List[Dict] = []

        for item in files:
            file_type, records = parse_content(item.filename, item.content)
            file_findings: List[Dict] = []
            for record in records:
                for finding in self._detect(record):
                    file_findings.append(
                        self._build_result(scan_id, item.filename, file_type, finding, redact)
                    )
            findings.extend(file_findings)
            file_summaries.append(
                {
                    "filename": item.filename,
                    "file_type": file_type,
                    "record_count": len(records),
                    "finding_count": len(file_findings),
                }
            )

        csv_report = export_findings_csv(findings)
        summary = self._build_summary(files, findings)
        return {
            "scan_id": scan_id,
            "summary": summary,
            "files": file_summaries,
            "findings": findings,
            "csv": csv_report,
            "model_loaded": self.model_loaded,
        }

    def _detect(self, record: Dict):
        """Run the regex and NER detectors on one record and merge their output.

        Returns whatever ``merge_findings`` produces for the combined list.
        """
        text = record["text"] or ""
        # Slicing already leaves shorter strings untouched, so no length check
        # is needed.
        context = text[: self.CONTEXT_MAX_CHARS]
        location = (record.get("field"), record.get("line"), record.get("column"))

        detected = regex_findings(text, context, *location)
        # NER is skipped for empty or whitespace-only text.
        if self.nlp is not None and text.strip():
            detected.extend(ner_findings(self.nlp, text, context, *location))
        return merge_findings(detected)

    @staticmethod
    def _build_result(scan_id: str, filename, file_type, finding: Dict, redact: bool) -> Dict:
        """Shape one merged finding into the flat result row emitted by a scan."""
        matched_value = finding["matched_value"]
        redacted_value = redact_value(matched_value) if redact else matched_value
        # NOTE(review): the raw matched value is emitted even when redact is
        # true -- presumably intentional for downstream consumers; confirm.
        return {
            "scan_id": scan_id,
            "filename": filename,
            "file_type": file_type,
            "pii_type": finding["pii_type"],
            "matched_value": matched_value,
            "redacted_value": redacted_value,
            "confidence": round(float(finding["confidence"]), 2),
            "line": finding.get("line"),
            "column": finding.get("column"),
            "field": finding.get("field"),
        }

    def _build_summary(self, files: List[ExtractedFile], findings: List[Dict]) -> Dict:
        """Aggregate scan-wide totals.

        Returns:
            A dict with ``uploaded_file_count``, ``total_findings`` and
            ``pii_type_counts`` (a mapping of each ``pii_type`` to its number
            of findings, in first-seen order).
        """
        pii_counts: Dict[str, int] = {}
        for finding in findings:
            pii_type = finding["pii_type"]
            pii_counts[pii_type] = pii_counts.get(pii_type, 0) + 1
        return {
            "uploaded_file_count": len(files),
            "total_findings": len(findings),
            "pii_type_counts": pii_counts,
        }
|
|