File size: 3,452 Bytes
0cb7559
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import logging
import os
import uuid
from typing import Dict, List

import spacy

from .csv_export import export_findings_csv
from .detectors import merge_findings, ner_findings, redact_value, regex_findings
from .parsers import parse_content
from .zip_handler import ExtractedFile

# Name of the spaCy pipeline to load. Read once, at import time, from the
# SPACY_MODEL environment variable; "en_core_web_trf" is used when it is unset.
SPACY_MODEL = os.environ.get("SPACY_MODEL", "en_core_web_trf")


class PIIScannerEngine:
    """Scan extracted files for PII and assemble a report.

    Each record produced by ``parse_content`` is passed through
    ``regex_findings`` and ``ner_findings``; the combined detections are
    merged with ``merge_findings`` and turned into flat result rows.
    """

    # Maximum number of leading characters of a record's text that is handed
    # to the detectors as the "context" argument.
    CONTEXT_CHARS = 500

    def __init__(self):
        """Create the engine and immediately try to load the spaCy pipeline."""
        # True only when SPACY_MODEL itself loaded; False when the blank
        # English fallback pipeline is in use.
        self.model_loaded = False
        # spaCy pipeline object passed to ner_findings.
        self.nlp = None
        self._load_model()

    def _load_model(self):
        """Load the pipeline named by ``SPACY_MODEL``, else a blank ``"en"`` one.

        A failure of ``spacy.load`` is not propagated: it is logged (with the
        traceback) and the engine falls back to ``spacy.blank("en")``.
        ``model_loaded`` records which of the two pipelines ended up in use.
        """
        try:
            self.nlp = spacy.load(SPACY_MODEL)
        except Exception:
            # Deliberate best-effort: keep scanning with the fallback pipeline,
            # but leave a trace so a missing/misconfigured model is diagnosable.
            logging.getLogger(__name__).warning(
                "Could not load spaCy model %r; falling back to a blank 'en' pipeline.",
                SPACY_MODEL,
                exc_info=True,
            )
            self.nlp = spacy.blank("en")
            self.model_loaded = False
        else:
            self.model_loaded = True

    def scan_files(self, files: List[ExtractedFile], redact: bool = True) -> Dict:
        """Scan every file in *files* and return the full scan report.

        Args:
            files: Items exposing ``filename`` and ``content``; both are
                forwarded to ``parse_content``.
            redact: When true, each row's ``"redacted_value"`` is
                ``redact_value(matched_value)``; when false it is the raw
                match. ``"matched_value"`` always holds the raw match.

        Returns:
            A dict with the keys ``"scan_id"`` (a fresh UUID4 string shared by
            all rows), ``"summary"`` (see ``_build_summary``), ``"files"``
            (one per-file summary dict, in input order), ``"findings"`` (all
            result rows), ``"csv"`` (whatever ``export_findings_csv`` returns
            for the rows) and ``"model_loaded"``.
        """
        scan_id = str(uuid.uuid4())
        findings: List[Dict] = []
        file_summaries: List[Dict] = []

        for item in files:
            file_type, records = parse_content(item.filename, item.content)
            finding_count = 0
            for record in records:
                for finding in self._detect_record(record):
                    findings.append(
                        self._build_result(scan_id, item.filename, file_type, finding, redact)
                    )
                    finding_count += 1
            file_summaries.append(
                {
                    "filename": item.filename,
                    "file_type": file_type,
                    "record_count": len(records),
                    "finding_count": finding_count,
                }
            )

        csv_report = export_findings_csv(findings)
        summary = self._build_summary(files, findings)
        return {
            "scan_id": scan_id,
            "summary": summary,
            "files": file_summaries,
            "findings": findings,
            "csv": csv_report,
            "model_loaded": self.model_loaded,
        }

    def _detect_record(self, record: Dict):
        """Run the regex and NER detectors on one record and merge their output."""
        # A missing/None text is treated as empty.
        text = record["text"] or ""
        # Slicing already returns the whole string when it is short enough.
        context = text[: self.CONTEXT_CHARS]
        field = record.get("field")
        line = record.get("line")
        column = record.get("column")

        detected = regex_findings(text, context, field, line, column)
        # NER is skipped for blank/whitespace-only text.
        if self.nlp is not None and text.strip():
            detected.extend(ner_findings(self.nlp, text, context, field, line, column))
        return merge_findings(detected)

    def _build_result(self, scan_id: str, filename, file_type, finding: Dict, redact: bool) -> Dict:
        """Convert one merged finding into the flat row stored in the report."""
        matched_value = finding["matched_value"]
        redacted_value = redact_value(matched_value) if redact else matched_value
        return {
            "scan_id": scan_id,
            "filename": filename,
            "file_type": file_type,
            "pii_type": finding["pii_type"],
            "matched_value": matched_value,
            "redacted_value": redacted_value,
            # Confidence is normalised to a float with two decimals.
            "confidence": round(float(finding["confidence"]), 2),
            "line": finding.get("line"),
            "column": finding.get("column"),
            "field": finding.get("field"),
        }

    def _build_summary(self, files: List[ExtractedFile], findings: List[Dict]) -> Dict:
        """Aggregate scan-wide totals.

        Args:
            files: The scanned inputs; only their count is used.
            findings: Result rows; each must carry a ``"pii_type"`` key.

        Returns:
            A dict with ``"uploaded_file_count"``, ``"total_findings"`` and
            ``"pii_type_counts"`` (pii_type -> number of rows, in order of
            first occurrence).
        """
        pii_counts: Dict[str, int] = {}
        for finding in findings:
            pii_type = finding["pii_type"]
            pii_counts[pii_type] = pii_counts.get(pii_type, 0) + 1
        return {
            "uploaded_file_count": len(files),
            "total_findings": len(findings),
            "pii_type_counts": pii_counts,
        }