Distopia22 committed on
Commit
d8473b6
·
1 Parent(s): d37f590

Add lightweight regex-based PII removal for file uploads

Browse files
Dockerfile CHANGED
@@ -5,7 +5,7 @@ WORKDIR /app
5
  # Copy requirements
6
  COPY requirements.txt .
7
 
8
- # Install dependencies (now includes spaCy model)
9
  RUN pip install --no-cache-dir --upgrade pip && \
10
  pip install --no-cache-dir -r requirements.txt
11
 
 
5
  # Copy requirements
6
  COPY requirements.txt .
7
 
8
+ # Install dependencies
9
  RUN pip install --no-cache-dir --upgrade pip && \
10
  pip install --no-cache-dir -r requirements.txt
11
 
requirements.txt CHANGED
@@ -3,8 +3,4 @@ uvicorn==0.24.0
3
  python-dotenv==1.0.0
4
  groq==0.11.0
5
  pydantic==2.5.0
6
- python-multipart==0.0.6
7
- presidio-analyzer==2.2.354
8
- presidio-anonymizer==2.2.354
9
- spacy==3.7.2
10
- en-core-web-sm @ https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl
 
3
  python-dotenv==1.0.0
4
  groq==0.11.0
5
  pydantic==2.5.0
6
+ python-multipart==0.0.6
 
 
 
 
src/api/routes.py CHANGED
@@ -20,7 +20,6 @@ async def analyze_provider_notes(request: ProviderNotesRequest):
20
  try:
21
  logger.info("Received coding request")
22
 
23
- # Get provider notes from request
24
  provider_notes = request.provider_notes
25
 
26
  if not provider_notes or len(provider_notes.strip()) < 10:
@@ -29,12 +28,10 @@ async def analyze_provider_notes(request: ProviderNotesRequest):
29
  detail="Provider notes must be at least 10 characters long"
30
  )
31
 
32
- # Process through Groq service
33
  result = await groq_service.analyze_provider_notes(provider_notes)
34
 
35
  logger.info("Successfully processed coding request")
36
 
37
- # Return response matching CodingResponse model
38
  return CodingResponse(
39
  cpt_codes=result.get("CPT", []),
40
  cpt_explanation=result.get("CPT_explanation", ""),
@@ -52,7 +49,7 @@ async def analyze_provider_notes(request: ProviderNotesRequest):
52
  )
53
 
54
 
55
- # UPDATED ENDPOINT - File Upload with PII Removal
56
  @router.post("/upload-file", response_model=FileUploadResponse)
57
  async def upload_provider_notes_file(file: UploadFile = File(...)):
58
  """
@@ -60,7 +57,7 @@ async def upload_provider_notes_file(file: UploadFile = File(...)):
60
 
61
  This endpoint:
62
  1. Extracts text from uploaded TXT file
63
- 2. Automatically detects and removes patient personal information (PII)
64
  3. Processes sanitized text through LLM
65
  4. Returns ICD-10 and CPT codes
66
 
@@ -73,10 +70,10 @@ async def upload_provider_notes_file(file: UploadFile = File(...)):
73
  try:
74
  logger.info(f"📁 Received file upload request: {file.filename}")
75
 
76
- # Step 1: Extract text from file with automatic PII removal
77
  extraction_result = await file_service.extract_text_from_file(
78
  file=file,
79
- remove_pii=True # Always remove PII for safety
80
  )
81
 
82
  extracted_text = extraction_result["text"]
@@ -87,7 +84,7 @@ async def upload_provider_notes_file(file: UploadFile = File(...)):
87
  logger.info(f"✅ Extracted {text_length} characters from {filename}")
88
 
89
  if pii_info["pii_removed"]:
90
- logger.info(f"🔒 Removed {pii_info['pii_count']} PII entities before processing")
91
 
92
  # Step 2: Process sanitized text through Groq LLM
93
  coding_result = await groq_service.analyze_provider_notes(extracted_text)
 
20
  try:
21
  logger.info("Received coding request")
22
 
 
23
  provider_notes = request.provider_notes
24
 
25
  if not provider_notes or len(provider_notes.strip()) < 10:
 
28
  detail="Provider notes must be at least 10 characters long"
29
  )
30
 
 
31
  result = await groq_service.analyze_provider_notes(provider_notes)
32
 
33
  logger.info("Successfully processed coding request")
34
 
 
35
  return CodingResponse(
36
  cpt_codes=result.get("CPT", []),
37
  cpt_explanation=result.get("CPT_explanation", ""),
 
49
  )
50
 
51
 
52
+ # FILE UPLOAD ENDPOINT WITH REGEX-BASED PII REMOVAL
53
  @router.post("/upload-file", response_model=FileUploadResponse)
54
  async def upload_provider_notes_file(file: UploadFile = File(...)):
55
  """
 
57
 
58
  This endpoint:
59
  1. Extracts text from uploaded TXT file
60
+ 2. Automatically detects and removes patient personal information using regex patterns
61
  3. Processes sanitized text through LLM
62
  4. Returns ICD-10 and CPT codes
63
 
 
70
  try:
71
  logger.info(f"📁 Received file upload request: {file.filename}")
72
 
73
+ # Step 1: Extract text from file with automatic regex-based PII removal
74
  extraction_result = await file_service.extract_text_from_file(
75
  file=file,
76
+ remove_pii=True # Always remove PII using regex patterns
77
  )
78
 
79
  extracted_text = extraction_result["text"]
 
84
  logger.info(f"✅ Extracted {text_length} characters from {filename}")
85
 
86
  if pii_info["pii_removed"]:
87
+ logger.info(f"🔒 Removed {pii_info['pii_count']} PII entities using regex before processing")
88
 
89
  # Step 2: Process sanitized text through Groq LLM
90
  coding_result = await groq_service.analyze_provider_notes(extracted_text)
src/services/file_service.py CHANGED
@@ -2,13 +2,13 @@ from fastapi import UploadFile, HTTPException
2
  import os
3
  from typing import Dict
4
  import logging
5
- from services.pii_detector import pii_detector
6
 
7
  logger = logging.getLogger(__name__)
8
 
9
 
10
  class FileService:
11
- """Service to handle file uploads and text extraction"""
12
 
13
  ALLOWED_EXTENSIONS = {'.txt'}
14
  MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB
@@ -24,11 +24,9 @@ class FileService:
24
  Raises:
25
  HTTPException: If file is invalid
26
  """
27
- # Check if file exists
28
  if not file:
29
  raise HTTPException(status_code=400, detail="No file provided")
30
 
31
- # Check file extension
32
  file_ext = os.path.splitext(file.filename)[1].lower()
33
  if file_ext not in FileService.ALLOWED_EXTENSIONS:
34
  raise HTTPException(
@@ -39,7 +37,7 @@ class FileService:
39
  @staticmethod
40
  async def extract_text_from_file(file: UploadFile, remove_pii: bool = True) -> Dict[str, any]:
41
  """
42
- Extract text content from uploaded file and optionally remove PII
43
 
44
  Args:
45
  file: Uploaded file object
@@ -88,9 +86,9 @@ class FileService:
88
  detail="Extracted text is too short. Please provide more detailed provider notes"
89
  )
90
 
91
- logger.info(f" Successfully extracted {len(text)} characters from {file.filename}")
92
 
93
- # Remove PII if requested
94
  pii_info = {
95
  "pii_removed": False,
96
  "pii_count": 0,
@@ -98,18 +96,18 @@ class FileService:
98
  }
99
 
100
  if remove_pii:
101
- logger.info("🔒 Removing PII from extracted text...")
102
- pii_result = pii_detector.remove_pii(text)
103
 
104
- text = pii_result["sanitized_text"]
105
  pii_info = {
106
- "pii_removed": pii_result["was_pii_removed"],
107
- "pii_count": pii_result["pii_count"],
108
- "pii_details": pii_result["pii_detected"]
109
  }
110
 
111
- if pii_result["was_pii_removed"]:
112
- logger.info(f"✅ Removed {pii_result['pii_count']} PII entities")
113
  else:
114
  logger.info("✅ No PII detected in text")
115
 
 
2
  import os
3
  from typing import Dict
4
  import logging
5
+ from services.regex_pii_remover import regex_pii_remover
6
 
7
  logger = logging.getLogger(__name__)
8
 
9
 
10
  class FileService:
11
+ """Service to handle file uploads and text extraction with PII removal"""
12
 
13
  ALLOWED_EXTENSIONS = {'.txt'}
14
  MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB
 
24
  Raises:
25
  HTTPException: If file is invalid
26
  """
 
27
  if not file:
28
  raise HTTPException(status_code=400, detail="No file provided")
29
 
 
30
  file_ext = os.path.splitext(file.filename)[1].lower()
31
  if file_ext not in FileService.ALLOWED_EXTENSIONS:
32
  raise HTTPException(
 
37
  @staticmethod
38
  async def extract_text_from_file(file: UploadFile, remove_pii: bool = True) -> Dict[str, any]:
39
  """
40
+ Extract text content from uploaded file and optionally remove PII using regex
41
 
42
  Args:
43
  file: Uploaded file object
 
86
  detail="Extracted text is too short. Please provide more detailed provider notes"
87
  )
88
 
89
+ logger.info(f"📄 Successfully extracted {len(text)} characters from {file.filename}")
90
 
91
+ # Remove PII using regex if requested
92
  pii_info = {
93
  "pii_removed": False,
94
  "pii_count": 0,
 
96
  }
97
 
98
  if remove_pii:
99
+ logger.info("🔒 Removing PII from extracted text using regex patterns...")
100
+ pii_result = regex_pii_remover.sanitize_provider_notes(text)
101
 
102
+ text = pii_result["sanitized_notes"]
103
  pii_info = {
104
+ "pii_removed": pii_result["was_pii_found"],
105
+ "pii_count": pii_result["pii_removed_count"],
106
+ "pii_details": pii_result["pii_details"]
107
  }
108
 
109
+ if pii_result["was_pii_found"]:
110
+ logger.info(f"✅ Removed {pii_result['pii_removed_count']} PII entities using regex")
111
  else:
112
  logger.info("✅ No PII detected in text")
113
 
src/services/pii_detector.py DELETED
@@ -1,197 +0,0 @@
1
- from presidio_analyzer import AnalyzerEngine
2
- from presidio_anonymizer import AnonymizerEngine
3
- from typing import Dict, List
4
- import re
5
- import logging
6
-
7
- logger = logging.getLogger(__name__)
8
-
9
-
10
- class PIIDetector:
11
- """Service to detect and remove Personal Identifiable Information from medical notes"""
12
-
13
- def __init__(self):
14
- """Initialize PII detection engines"""
15
- try:
16
- self.analyzer = AnalyzerEngine()
17
- self.anonymizer = AnonymizerEngine()
18
-
19
- # Entities to detect (common in medical notes)
20
- self.entities_to_detect = [
21
- "PERSON", # Names
22
- "EMAIL_ADDRESS", # Email
23
- "PHONE_NUMBER", # Phone numbers
24
- "US_SSN", # Social Security Number
25
- "CREDIT_CARD", # Credit card numbers
26
- "US_DRIVER_LICENSE", # Driver's license
27
- "LOCATION", # Addresses, cities
28
- "DATE_TIME", # Birth dates, appointment dates
29
- "US_PASSPORT", # Passport numbers
30
- "MEDICAL_LICENSE", # Medical license numbers
31
- "IP_ADDRESS", # IP addresses
32
- "URL" # URLs
33
- ]
34
-
35
- logger.info("✅ PII Detector initialized successfully")
36
- except Exception as e:
37
- logger.error(f"❌ Failed to initialize PII Detector: {str(e)}")
38
- raise
39
-
40
- def detect_pii(self, text: str) -> List[Dict]:
41
- """
42
- Detect PII entities in text
43
-
44
- Args:
45
- text: Input text to analyze
46
-
47
- Returns:
48
- List of detected PII entities with details
49
- """
50
- try:
51
- results = self.analyzer.analyze(
52
- text=text,
53
- entities=self.entities_to_detect,
54
- language='en'
55
- )
56
-
57
- pii_findings = []
58
- for result in results:
59
- pii_findings.append({
60
- "entity_type": result.entity_type,
61
- "start": result.start,
62
- "end": result.end,
63
- "score": result.score,
64
- "text": text[result.start:result.end]
65
- })
66
-
67
- logger.info(f"🔍 Detected {len(pii_findings)} PII entities")
68
- return pii_findings
69
-
70
- except Exception as e:
71
- logger.error(f"❌ Error detecting PII: {str(e)}")
72
- return []
73
-
74
- def remove_pii(self, text: str) -> Dict[str, any]:
75
- """
76
- Remove PII from text while preserving medical information
77
-
78
- Args:
79
- text: Input text containing potential PII
80
-
81
- Returns:
82
- Dictionary with sanitized text and PII removal report
83
- """
84
- try:
85
- # Step 1: Detect PII
86
- analyzer_results = self.analyzer.analyze(
87
- text=text,
88
- entities=self.entities_to_detect,
89
- language='en'
90
- )
91
-
92
- if not analyzer_results:
93
- logger.info("✅ No PII detected in text")
94
- return {
95
- "sanitized_text": text,
96
- "pii_detected": [],
97
- "pii_count": 0,
98
- "was_pii_removed": False
99
- }
100
-
101
- # Step 2: Anonymize detected PII
102
- anonymized_result = self.anonymizer.anonymize(
103
- text=text,
104
- analyzer_results=analyzer_results
105
- )
106
-
107
- sanitized_text = anonymized_result.text
108
-
109
- # Step 3: Additional pattern-based cleaning for medical notes
110
- # Replace common medical note PII patterns
111
- sanitized_text = self._clean_medical_patterns(sanitized_text)
112
-
113
- # Step 4: Collect PII detection details
114
- pii_detected = []
115
- for result in analyzer_results:
116
- pii_detected.append({
117
- "entity_type": result.entity_type,
118
- "start": result.start,
119
- "end": result.end,
120
- "score": result.score
121
- })
122
-
123
- logger.info(f"✅ Removed {len(pii_detected)} PII entities from text")
124
-
125
- return {
126
- "sanitized_text": sanitized_text,
127
- "pii_detected": pii_detected,
128
- "pii_count": len(pii_detected),
129
- "was_pii_removed": True
130
- }
131
-
132
- except Exception as e:
133
- logger.error(f"❌ Error removing PII: {str(e)}")
134
- # Return original text if PII removal fails
135
- return {
136
- "sanitized_text": text,
137
- "pii_detected": [],
138
- "pii_count": 0,
139
- "was_pii_removed": False,
140
- "error": str(e)
141
- }
142
-
143
- def _clean_medical_patterns(self, text: str) -> str:
144
- """
145
- Clean common medical note PII patterns that might be missed
146
-
147
- Args:
148
- text: Text to clean
149
-
150
- Returns:
151
- Cleaned text
152
- """
153
- # Pattern 1: "Patient: <NAME>" or "Pt: <NAME>"
154
- text = re.sub(
155
- r'(Patient|Pt|Patient Name):\s*<[A-Z_]+>',
156
- r'\1: [REDACTED]',
157
- text,
158
- flags=re.IGNORECASE
159
- )
160
-
161
- # Pattern 2: "DOB: <DATE>"
162
- text = re.sub(
163
- r'(DOB|Date of Birth|Birth Date):\s*<[A-Z_]+>',
164
- r'\1: [REDACTED]',
165
- text,
166
- flags=re.IGNORECASE
167
- )
168
-
169
- # Pattern 3: "Address: <LOCATION>"
170
- text = re.sub(
171
- r'(Address|Addr|Home Address):\s*<[A-Z_]+>',
172
- r'\1: [REDACTED]',
173
- text,
174
- flags=re.IGNORECASE
175
- )
176
-
177
- # Pattern 4: "Phone: <PHONE_NUMBER>"
178
- text = re.sub(
179
- r'(Phone|Tel|Telephone|Cell|Mobile):\s*<[A-Z_]+>',
180
- r'\1: [REDACTED]',
181
- text,
182
- flags=re.IGNORECASE
183
- )
184
-
185
- # Pattern 5: "MRN: <NUMBER>" (Medical Record Number)
186
- text = re.sub(
187
- r'(MRN|Medical Record Number|Record #):\s*<[A-Z_]+>',
188
- r'\1: [REDACTED]',
189
- text,
190
- flags=re.IGNORECASE
191
- )
192
-
193
- return text
194
-
195
-
196
- # Singleton instance
197
- pii_detector = PIIDetector()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/services/regex_pii_remover.py ADDED
@@ -0,0 +1,229 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import logging
import re
from typing import Any, Dict, List, Tuple
4
+
5
logger = logging.getLogger(__name__)


class RegexPIIRemover:
    """
    Lightweight regex-based PII detection and removal service.

    Two pattern tables are applied in order: ``patterns`` (format-based
    identifiers such as phone numbers, e-mails and SSNs) followed by
    ``medical_patterns`` (values that follow a label such as ``Patient:`` or
    ``MRN:``). Every table entry is a dict with the keys ``pattern``
    (regex source), ``replacement`` (substitution template) and
    ``description`` (human-readable name used in reports and logs).

    Regex matching is heuristic: unlabeled names and free-form addresses are
    not detected, so this is a best-effort filter rather than a guarantee.
    """

    def __init__(self):
        """Initialize regex patterns for PII detection."""

        # General, format-based patterns. Dict order is the application order
        # in remove_pii(): STREET_ADDRESS is deliberately listed before
        # ZIP_CODE so that a five-digit house number is redacted together
        # with its street instead of being consumed as a ZIP code first.
        self.patterns = {
            'PHONE': {
                # (?<!\w) instead of a leading \b so that a leading "+" or
                # "(" is part of the match; separators may be "-", "." or a
                # space, and the area code may be wrapped in parentheses.
                'pattern': (
                    r'(?<!\w)(?:\+?1[-. ]?)?'
                    r'(?:\(\d{3}\)[-. ]?|\d{3}[-. ]?)'
                    r'\d{3}[-. ]?\d{4}\b'
                ),
                'replacement': '[PHONE_REDACTED]',
                'description': 'Phone numbers'
            },
            'EMAIL': {
                'pattern': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
                'replacement': '[EMAIL_REDACTED]',
                'description': 'Email addresses'
            },
            'SSN': {
                'pattern': r'\b\d{3}-\d{2}-\d{4}\b',
                'replacement': '[SSN_REDACTED]',
                'description': 'Social Security Numbers'
            },
            'DATE_OF_BIRTH': {
                # Matches every MM/DD/YYYY or MM-DD-YYYY date, not only
                # dates that are labeled as a birth date.
                'pattern': r'\b(0?[1-9]|1[0-2])[/-](0?[1-9]|[12][0-9]|3[01])[/-](19|20)\d{2}\b',
                'replacement': '[DOB_REDACTED]',
                'description': 'Dates of birth'
            },
            'STREET_ADDRESS': {
                'pattern': (
                    r'\b\d{1,5}\s+(?:[A-Z][a-z]+\s*){1,3}'
                    r'(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Way)\b'
                ),
                'replacement': '[ADDRESS_REDACTED]',
                'description': 'Street addresses'
            },
            'ZIP_CODE': {
                # NOTE(review): this matches ANY standalone five-digit number,
                # which includes five-digit procedure codes written in the
                # notes. Left unchanged; tighten if that proves too eager.
                'pattern': r'\b\d{5}(?:-\d{4})?\b',
                'replacement': '[ZIP_REDACTED]',
                'description': 'ZIP codes'
            },
            'CREDIT_CARD': {
                'pattern': r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
                'replacement': '[CARD_REDACTED]',
                'description': 'Credit card numbers'
            },
            'IP_ADDRESS': {
                'pattern': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
                'replacement': '[IP_REDACTED]',
                'description': 'IP addresses'
            }
        }

        # A person-name word: capitalized or all-caps, allowing apostrophes
        # and hyphens (O'Brien, Smith-Jones). Kept case-sensitive on purpose
        # so that lowercase clinical prose after a label is never treated as
        # a name.
        name_word = r"[A-Z][A-Za-z'\-]+"
        # One or more name words on the SAME line, optionally comma-separated
        # ("Smith, John"). The lookahead stops before a word that is itself a
        # label ("... John Smith Age: 45").
        person_name = (
            name_word
            + r"(?:,?[ \t]+(?![A-Za-z'\-]+:)" + name_word + r")*"
        )

        # Label-driven patterns. Each label is anchored with \b and made
        # case-insensitive with a scoped (?i:...) group, so the flag does not
        # leak into the value part of the pattern. Values are only consumed
        # from the label's own line ([ \t]* rather than \s*).
        self.medical_patterns = {
            'PATIENT_NAME_LABEL': {
                'pattern': r'\b((?i:Patient Name|Patient|Pt|Name)):[ \t]*' + person_name,
                'replacement': r'\1: [NAME_REDACTED]',
                'description': 'Patient names after labels'
            },
            'DOB_LABEL': {
                'pattern': r'\b((?i:DOB|Date of Birth|Birth Date|Birthdate)):[ \t]*[\d/\-]+',
                'replacement': r'\1: [DOB_REDACTED]',
                'description': 'DOB after labels'
            },
            'PHONE_LABEL': {
                # The value must start with a digit/"(" and end with a digit,
                # so a label followed by text (or by an already inserted
                # placeholder) is left alone.
                'pattern': (
                    r'\b((?i:Phone|Telephone|Tel|Cell|Mobile|Contact)):[ \t]*'
                    r'\+?[\d(][\d \t().\-]{5,}\d'
                ),
                'replacement': r'\1: [PHONE_REDACTED]',
                'description': 'Phone numbers after labels'
            },
            'ADDRESS_LABEL': {
                'pattern': r'\b((?i:Home Address|Mailing Address|Address|Addr)):[ \t]*[^\r\n]+',
                'replacement': r'\1: [ADDRESS_REDACTED]',
                'description': 'Addresses after labels'
            },
            'MRN_LABEL': {
                'pattern': r'\b((?i:MRN|Medical Record Number|Record #|Patient ID|ID)):[ \t]*[\w\-]+',
                'replacement': r'\1: [MRN_REDACTED]',
                'description': 'Medical record numbers'
            },
            'GUARDIAN_INFO': {
                'pattern': r'\b((?i:Guardian|Emergency Contact|Next of Kin)):[ \t]*' + person_name,
                'replacement': r'\1: [CONTACT_REDACTED]',
                'description': 'Guardian/emergency contact names'
            }
        }

        logger.info("✅ Regex PII Remover initialized with pattern-based detection")

    def _iter_patterns(self):
        """Yield ``(entity_type, config)`` pairs: general patterns first, then label patterns."""
        yield from self.patterns.items()
        yield from self.medical_patterns.items()

    def detect_pii(self, text: str) -> List[Dict]:
        """
        Detect PII entities in text using regex patterns.

        Every pattern is matched against the unmodified input, so one span
        may be reported by more than one pattern.

        Args:
            text: Input text to analyze

        Returns:
            List of dicts with the keys ``entity_type``, ``text``, ``start``,
            ``end`` and ``description``, grouped by pattern in table order
        """
        findings = [
            {
                'entity_type': entity_type,
                'text': match.group(),
                'start': match.start(),
                'end': match.end(),
                'description': config['description']
            }
            for entity_type, config in self._iter_patterns()
            for match in re.finditer(config['pattern'], text)
        ]

        logger.info(f"🔍 Detected {len(findings)} PII entities using regex patterns")
        return findings

    def remove_pii(self, text: str) -> Dict[str, Any]:
        """
        Remove PII from text using regex patterns.

        Patterns are applied sequentially, each one to the output of the
        previous substitution.

        Args:
            text: Input text containing potential PII

        Returns:
            Dictionary with the keys ``sanitized_text``, ``original_text``,
            ``was_pii_removed``, ``pii_count`` (total substitutions) and
            ``pii_detected`` (one ``{'type', 'count', 'description'}`` entry
            per pattern that matched). An extra ``error`` key is present when
            processing failed.
        """
        try:
            sanitized_text = text
            total_replacements = 0
            replacement_details = []

            for entity_type, config in self._iter_patterns():
                # subn substitutes and counts in a single scan.
                sanitized_text, count = re.subn(
                    config['pattern'], config['replacement'], sanitized_text
                )
                if not count:
                    continue
                total_replacements += count
                replacement_details.append({
                    'type': entity_type,
                    'count': count,
                    'description': config['description']
                })
                logger.info(f"  🔒 Removed {count} {config['description']}")

            was_pii_removed = sanitized_text != text

            if was_pii_removed:
                logger.info(f"✅ Total PII removals: {total_replacements} entities")
            else:
                logger.info("✅ No PII detected in text")

            return {
                'sanitized_text': sanitized_text,
                'original_text': text,
                'was_pii_removed': was_pii_removed,
                'pii_count': total_replacements,
                'pii_detected': replacement_details
            }

        except Exception as e:
            # NOTE(review): this is a fail-open path -- the caller receives
            # the UNSANITIZED text. Kept for backward compatibility; callers
            # that must not forward raw PII should check for the 'error' key.
            logger.error(f"❌ Error removing PII: {str(e)}")
            return {
                'sanitized_text': text,
                'original_text': text,
                'was_pii_removed': False,
                'pii_count': 0,
                'pii_detected': [],
                'error': str(e)
            }

    def sanitize_provider_notes(self, notes: str) -> Dict[str, Any]:
        """
        Sanitize provider notes by removing all PII.

        Main entry point for file processing; a thin wrapper around
        remove_pii() that renames the result keys.

        Args:
            notes: Provider notes text

        Returns:
            Dictionary with the keys ``sanitized_notes``,
            ``pii_removed_count``, ``pii_details`` and ``was_pii_found``
        """
        logger.info("🔒 Starting PII sanitization of provider notes...")
        result = self.remove_pii(notes)

        return {
            'sanitized_notes': result['sanitized_text'],
            'pii_removed_count': result['pii_count'],
            'pii_details': result['pii_detected'],
            'was_pii_found': result['was_pii_removed']
        }


# Module-level singleton instance imported by other services
regex_pii_remover = RegexPIIRemover()