# PII-Scan/scanner/engine.py
# Adisri99's picture
# Upload 9 files
# 0cb7559 verified
import logging
import os
import uuid
from typing import Dict, List

import spacy

from .csv_export import export_findings_csv
from .detectors import merge_findings, ner_findings, redact_value, regex_findings
from .parsers import parse_content
from .zip_handler import ExtractedFile
# Name of the spaCy pipeline handed to spacy.load(); taken from the
# SPACY_MODEL environment variable, defaulting to "en_core_web_trf".
SPACY_MODEL = os.getenv("SPACY_MODEL", "en_core_web_trf")
class PIIScannerEngine:
    """Scan parsed file records for PII with regex and spaCy-based detectors.

    On construction the engine tries to load the pipeline named by
    ``SPACY_MODEL``. If loading fails it falls back to ``spacy.blank("en")``
    and leaves ``model_loaded`` set to ``False`` so callers can tell that the
    configured model is not in use.
    """

    # Maximum number of characters of a record's text handed to the
    # detectors as the ``context`` argument.
    CONTEXT_CHARS = 500

    def __init__(self):
        """Create the engine and attempt to load the spaCy pipeline."""
        self.model_loaded = False
        self.nlp = None
        self._load_model()

    def _load_model(self):
        """Load ``SPACY_MODEL``, falling back to a blank English pipeline.

        Sets ``self.nlp`` in both cases; ``self.model_loaded`` is ``True``
        only when ``spacy.load`` succeeded.
        """
        try:
            self.nlp = spacy.load(SPACY_MODEL)
            self.model_loaded = True
        except Exception:
            # Deliberate best-effort: scanning continues with a blank
            # pipeline, but the failure is logged instead of being hidden.
            logging.getLogger(__name__).warning(
                "Could not load spaCy model %r; falling back to spacy.blank('en')",
                SPACY_MODEL,
                exc_info=True,
            )
            self.nlp = spacy.blank("en")
            self.model_loaded = False

    def _detect(self, record: Dict):
        """Run the regex and NER detectors on one record and merge the hits.

        Args:
            record: Parsed record; ``record["text"]`` is required, while
                ``field``, ``line`` and ``column`` are read with ``.get``.

        Returns:
            The result of ``merge_findings`` over all detector output.
        """
        text = record["text"] or ""
        context = text[: self.CONTEXT_CHARS]
        location = (record.get("field"), record.get("line"), record.get("column"))

        detected = regex_findings(text, context, *location)
        # NER is skipped for empty or whitespace-only text.
        if self.nlp is not None and text.strip():
            detected.extend(ner_findings(self.nlp, text, context, *location))
        return merge_findings(detected)

    def scan_files(self, files: "List[ExtractedFile]", redact: bool = True) -> Dict:
        """Scan every file and return findings, per-file stats and a CSV report.

        Args:
            files: Items exposing ``filename`` and ``content``; each is parsed
                with ``parse_content``.
            redact: When true, ``redacted_value`` is ``redact_value(match)``;
                otherwise it repeats the raw match.

        Returns:
            A dict with the keys ``scan_id``, ``summary``, ``files``,
            ``findings``, ``csv`` and ``model_loaded``.
        """
        # NOTE(review): "matched_value" always carries the raw match, even
        # when redact=True -- confirm downstream consumers expect that.
        scan_id = str(uuid.uuid4())
        findings: List[Dict] = []
        file_summaries: List[Dict] = []

        for item in files:
            file_type, records = parse_content(item.filename, item.content)
            finding_count = 0
            for record in records:
                for finding in self._detect(record):
                    matched_value = finding["matched_value"]
                    findings.append(
                        {
                            "scan_id": scan_id,
                            "filename": item.filename,
                            "file_type": file_type,
                            "pii_type": finding["pii_type"],
                            "matched_value": matched_value,
                            "redacted_value": (
                                redact_value(matched_value) if redact else matched_value
                            ),
                            "confidence": round(float(finding["confidence"]), 2),
                            "line": finding.get("line"),
                            "column": finding.get("column"),
                            "field": finding.get("field"),
                        }
                    )
                    finding_count += 1
            file_summaries.append(
                {
                    "filename": item.filename,
                    "file_type": file_type,
                    "record_count": len(records),
                    "finding_count": finding_count,
                }
            )

        return {
            "scan_id": scan_id,
            "summary": self._build_summary(files, findings),
            "files": file_summaries,
            "findings": findings,
            "csv": export_findings_csv(findings),
            "model_loaded": self.model_loaded,
        }

    def _build_summary(self, files: "List[ExtractedFile]", findings: List[Dict]) -> Dict:
        """Aggregate totals for a scan.

        Args:
            files: The scanned files; only their count is used.
            findings: Finding dicts, each with a ``pii_type`` key.

        Returns:
            A dict with ``uploaded_file_count``, ``total_findings`` and
            ``pii_type_counts`` (occurrences per ``pii_type``).
        """
        pii_counts: Dict[str, int] = {}
        for finding in findings:
            pii_type = finding["pii_type"]
            pii_counts[pii_type] = pii_counts.get(pii_type, 0) + 1
        return {
            "uploaded_file_count": len(files),
            "total_findings": len(findings),
            "pii_type_counts": pii_counts,
        }