File size: 20,260 Bytes
b7c31ab
 
b8cffa7
edc8356
b8cffa7
b7c31ab
edc8356
b7c31ab
 
 
 
 
 
 
 
 
 
 
 
 
 
edc8356
 
 
 
 
 
b7c31ab
 
edc8356
 
 
 
 
b7c31ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b8cffa7
 
edc8356
b7c31ab
 
 
 
 
 
 
adf8222
edc8356
 
 
 
 
b7c31ab
 
 
 
 
 
 
 
edc8356
 
 
 
b7c31ab
 
 
 
 
 
 
 
 
 
 
32134ac
b7c31ab
 
 
 
 
 
32134ac
 
 
 
edc8356
 
 
b7c31ab
 
edc8356
 
b7c31ab
 
 
edc8356
aea4b1f
 
b7c31ab
 
 
 
 
 
 
 
edc8356
 
 
b7c31ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
edc8356
 
 
 
b7c31ab
 
 
 
 
 
 
 
 
adf8222
b7c31ab
 
 
edc8356
 
b7c31ab
edc8356
 
b7c31ab
edc8356
b7c31ab
adf8222
 
 
 
b7c31ab
 
 
adf8222
 
edc8356
 
 
adf8222
b7c31ab
adf8222
edc8356
 
 
 
b7c31ab
adf8222
 
 
b7c31ab
adf8222
 
edc8356
 
 
 
adf8222
b7c31ab
adf8222
 
 
edc8356
 
 
 
 
 
 
 
adf8222
b7c31ab
32134ac
 
 
 
 
 
edc8356
adf8222
 
 
edc8356
adf8222
 
 
edc8356
adf8222
32134ac
 
 
edc8356
adf8222
32134ac
edc8356
 
 
32134ac
edc8356
adf8222
edc8356
 
 
 
adf8222
edc8356
 
adf8222
32134ac
edc8356
adf8222
 
 
 
 
edc8356
 
 
 
 
 
b7c31ab
 
 
 
 
edc8356
b7c31ab
 
 
 
edc8356
 
 
b7c31ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
edc8356
 
 
 
b7c31ab
 
 
 
 
 
edc8356
 
aea4b1f
edc8356
 
b7c31ab
 
 
 
edc8356
b7c31ab
 
 
 
edc8356
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b7c31ab
edc8356
 
b7c31ab
 
 
edc8356
 
 
b7c31ab
 
 
edc8356
adf8222
edc8356
b7c31ab
edc8356
b7c31ab
edc8356
b7c31ab
 
 
 
 
edc8356
 
 
b7c31ab
 
 
 
 
 
 
 
 
 
 
 
edc8356
 
 
 
b7c31ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32134ac
 
b7c31ab
 
 
 
 
 
 
 
 
 
 
 
b8cffa7
 
b7c31ab
b8cffa7
b7c31ab
edc8356
5407d4c
 
b8cffa7
 
 
 
edc8356
5407d4c
b7c31ab
edc8356
b7c31ab
 
b8cffa7
5407d4c
b8cffa7
edc8356
 
 
 
b8cffa7
 
edc8356
b8cffa7
 
 
edc8356
b8cffa7
 
 
 
edc8356
b8cffa7
 
 
edc8356
b8cffa7
 
edc8356
b8cffa7
 
 
5407d4c
edc8356
 
 
 
5407d4c
 
edc8356
5407d4c
 
edc8356
5407d4c
 
 
edc8356
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
import re
import spacy
from typing import List, Dict, Tuple, Any, Optional

from database import EmailDatabase


class Entity:
    """One detected PII span: its offsets in the text, its category and its text."""

    def __init__(self, start: int, end: int, entity_type: str, value: str):
        # Offsets of the span inside the scanned text.
        self.start, self.end = start, end
        # Category label (e.g. "email") and the exact matched substring.
        self.entity_type, self.value = entity_type, value

    def to_dict(self):
        """Return the entity as a plain dict (position, classification, entity)."""
        return dict(
            position=[self.start, self.end],
            classification=self.entity_type,
            entity=self.value,
        )

    def __repr__(self):
        """Return a compact, human-readable form for debugging."""
        fields = (
            f"type='{self.entity_type}'",
            f"value='{self.value}'",
            f"start={self.start}",
            f"end={self.end}",
        )
        return "Entity(" + ", ".join(fields) + ")"


class PIIMasker:
    def __init__(
        self,
        spacy_model_name: str = "xx_ent_wiki_sm",
        db_path: str = None
    ):
        """Load a spaCy pipeline, open the email store and build the regex table.

        Model loading falls back in stages: load ``spacy_model_name``; if that
        raises ``OSError``, download it and load again; if that fails for any
        reason, load ``en_core_web_sm``, downloading it first when it is
        missing too. Errors from the very last step propagate to the caller.

        Args:
            spacy_model_name: Name of the spaCy model to try first.
            db_path: Forwarded to ``EmailDatabase`` as ``connection_string``.
        """
        english_fallback = "en_core_web_sm"
        try:
            pipeline = spacy.load(spacy_model_name)
        except OSError:
            print(f"SpaCy model '{spacy_model_name}' not found. Downloading...")
            try:
                spacy.cli.download(spacy_model_name)
                pipeline = spacy.load(spacy_model_name)
            except Exception as err:
                print(f"Failed to download or load {spacy_model_name}. Error: {err}")
                print("Attempting to load 'en_core_web_sm' as a fallback for English.")
                try:
                    pipeline = spacy.load(english_fallback)
                except OSError:
                    print("Downloading 'en_core_web_sm'...")
                    spacy.cli.download(english_fallback)
                    pipeline = spacy.load(english_fallback)
        self.nlp = pipeline

        # Storage backend for original and masked emails.
        self.db = EmailDatabase(connection_string=db_path)

        # Regex table consumed by detect_regex_entities().
        self._initialize_patterns()

    def _initialize_patterns(self):
        # Define regex patterns for different entity types
        self.patterns = {
            "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            # Simplified phone regex to capture both standard and international formats
            "phone_number": (
                r'\b(?:(?:\+|00)[1-9]\d{0,3}[-\s.]?)?'
                r'(?:\(?\d{1,5}\)?[-\s.]?)?\d{1,5}'
                r'(?:[-\s.]\d{1,5}){1,4}\b'
            ),
            # Card number regex: common formats, allows optional spaces/hyphens
            "credit_debit_no": r'\b(?:(?:\d{4}[\s-]?){3}\d{4}|\d{13,19})\b',
            # CVV: 3 or 4 digits, ensuring it's a standalone number (word boundary)
            "cvv_no": r'\b\d{3,4}\b',
            # Expiry: MM/YY or MM/YYYY, common separators
            "expiry_no": r'\b(0[1-9]|1[0-2])[/\s-]([0-9]{2}|20[0-9]{2})\b',
            "aadhar_num": r'\b\d{4}\s?\d{4}\s?\d{4}\b',
            # DOB: DD/MM/YYYY or DD-MM-YYYY etc.
            "dob": (
                r'\b(0[1-9]|[12][0-9]|3[01])[/\s-]'
                r'(0[1-9]|1[0-2])[/\s-](?:19|20)\d\d\b'
            )
        }

    def detect_regex_entities(self, text: str) -> List[Entity]:
        """Detect entities using regex patterns"""
        entities = []

        for entity_type, pattern in self.patterns.items():
            for match in re.finditer(pattern, text):
                start, end = match.span()
                value = match.group()

                # Specific verifications for each entity type
                if entity_type == "credit_debit_no":
                    if not self.verify_credit_card(text, match):
                        continue
                elif entity_type == "cvv_no":
                    if not self.verify_cvv(text, match):
                        continue
                elif entity_type == "phone_number":
                    if not self.verify_phone_number(text, match):
                        continue
                elif entity_type == "dob":
                    if not self._verify_with_context(
                        text, start, end, ["birth", "dob", "born"]
                    ):
                        continue

                # Avoid detecting parts of already matched longer entities
                # (e.g. year within a DOB)
                # This is a simple check; more robust overlap handling is done later
                is_substring_of_existing = False
                for existing_entity in entities:
                    if (existing_entity.start <= start
                            and existing_entity.end >= end
                            and existing_entity.value != value):
                        is_substring_of_existing = True
                        break
                if is_substring_of_existing:
                    continue

                entities.append(Entity(start, end, entity_type, value))
        return entities

    def _verify_with_context(
        self, text: str, start: int, end: int, keywords: List[str], window: int = 50
    ) -> bool:
        """Verify an entity match using surrounding context"""
        context_before = text[max(0, start - window):start].lower()
        context_after = text[end:min(len(text), end + window)].lower()

        for keyword in keywords:
            if keyword in context_before or keyword in context_after:
                return True
        return False

    def verify_credit_card(self, text: str, match: re.Match) -> bool:
        """Verify if a match is actually a credit card number using contextual clues"""
        context_window = 50
        start, end = match.span()

        context_before = text[max(0, start - context_window):start].lower()
        context_after = text[end:min(len(text), end + context_window)].lower()

        card_keywords = [
            "card", "credit", "debit", "visa", "mastercard",
            "payment", "amex", "account no", "card no"
        ]
        for keyword in card_keywords:
            if keyword in context_before or keyword in context_after:
                return True
        # Basic Luhn algorithm check (optional, can be computationally more intensive)
        # For simplicity, we'll rely on context here. If needed, Luhn can be added.
        return False

    def verify_cvv(self, text: str, match: re.Match) -> bool:
        """Verify if a 3-4 digit number is actually a CVV using contextual clues"""
        context_window = 50
        start, end = match.span()
        value = match.group()

        # If it's part of a longer number sequence (like a phone number or ID),
        # it's likely not a CVV
        # Check character immediately before and after
        char_before = text[start - 1:start] if start > 0 else ""
        char_after = text[end:end + 1] if end < len(text) else ""
        if char_before.isdigit() or char_after.isdigit():
            return False  # It's part of a larger number

        # Only consider 3-4 digit numbers
        if not value.isdigit() or len(value) < 3 or len(value) > 4:
            return False

        context_before = text[max(0, start - context_window):start].lower()
        context_after = text[end:min(len(text), end + context_window)].lower()

        # Expanded list of CVV-related keywords to improve detection
        cvv_keywords = [
            "cvv", "cvc", "csc", "security code", "card verification",
            "verification no", "security", "security number", "cv2",
            "card code", "security value"
        ]

        # Look for CVV context clues
        is_cvv_context = any(
            keyword in context_before or keyword in context_after
            for keyword in cvv_keywords
        )

        # If explicitly mentioned as a CVV, immediately return true
        if is_cvv_context:
            return True

        # If it looks like a year, reject it
        if len(value) == 4 and 1900 <= int(value) <= 2100:
            if any(
                k in context_before or k in context_after
                for k in ["year", "born", "established", "since"]
            ):
                return False

        # If in expiry date context, reject it
        if re.search(r'\b(0[1-9]|1[0-2])[/\s-]$', context_before.strip()):
            return False

        # If no context clues but we have a credit card mention nearby,
        # it could be a CVV
        card_context = any(
            k in context_before or k in context_after for k in
            ["card", "credit", "visa", "mastercard", "amex", "discover"]
        )

        return is_cvv_context or (card_context and len(value) in [3, 4])

    def verify_phone_number(self, text: str, match: re.Match) -> bool:
        """
        Verify if a match is actually a phone number using validation rules and context.
        """
        value = match.group()
        start, end = match.span()

        # Extract only digits to count them
        digits = ''.join(c for c in value if c.isdigit())
        digit_count = len(digits)

        # Most phone numbers worldwide have between 7 and 15 digits
        if digit_count < 7 or digit_count > 15:
            return False

        # Check for common phone number indicators
        context_window = 50
        context_before = text[max(0, start - context_window):start].lower()
        context_after = text[end:min(len(text), end + context_window)].lower()

        # Expanded phone keywords
        phone_keywords = [
            "phone", "call", "tel", "telephone", "contact", "dial", "mobile",
            "cell", "number", "direct", "office", "fax", "reach me at",
            "call me", "contact me", "line", "extension", "ext", "phone number"
        ]

        # Check for phone context
        has_phone_context = any(
            kw in context_before or kw in context_after for kw in phone_keywords
        )

        # Check for formatting that indicates a phone number
        has_phone_formatting = bool(re.search(r'[-\s.()\\+]', value))

        # Check for international prefix
        has_intl_prefix = value.startswith('+') or value.startswith('00')

        # Return true if any of these conditions are met:
        # 1. Has explicit phone context
        # 2. Has phone-like formatting AND reasonable digit count
        # 3. Has international prefix AND reasonable digit count
        # 4. Has 10 digits exactly (common in many countries) with formatting
        return (
            has_phone_context
            or (has_phone_formatting and digit_count >= 7)
            or (has_intl_prefix)
            or (digit_count == 10 and has_phone_formatting)
        )

    def detect_name_entities(self, text: str) -> List[Entity]:
        """Detect name entities using SpaCy NER"""
        entities = []
        doc = self.nlp(text)

        for ent in doc.ents:
            # Use PER for person, common in many models like xx_ent_wiki_sm
            # Also checking for PERSON as some models might use it.
            if ent.label_ in ["PER", "PERSON"]:
                entities.append(
                    Entity(ent.start_char, ent.end_char, "full_name", ent.text)
                )
        return entities

    def detect_all_entities(self, text: str) -> List[Entity]:
        """Detect all types of entities in the text"""
        # Get regex-based entities first
        entities = self.detect_regex_entities(text)

        # Add SpaCy-based name entities
        # We add them second and let overlap resolution handle conflicts
        # This is because NER for names can be more reliable than a generic regex
        name_entities = self.detect_name_entities(text)
        entities.extend(name_entities)

        # Sort entities by their starting position
        entities.sort(key=lambda x: x.start)

        # Resolve overlaps: prioritize NER entities (like names) or longer regex matches
        entities = self._resolve_overlaps(entities)
        return entities

    def _resolve_overlaps(self, entities: List[Entity]) -> List[Entity]:
        """Resolve overlapping entities.
        Prioritize:
        1. NER entities (e.g., "full_name") if they overlap with regex.
        2. Longer entities over shorter ones.
        3. If same length and type, no change (first one encountered).
        """
        if not entities:
            return []

        # A simple greedy approach: iterate and remove/adjust overlaps
        # This can be made more sophisticated
        resolved_entities: List[Entity] = []
        # Process by start, then by longest
        for current_entity in sorted(
            entities, key=lambda e: (e.start, -(e.end - e.start))
        ):
            is_overlapped_or_contained = False
            temp_resolved = []
            for i, res_entity in enumerate(resolved_entities):
                # Check for overlap:
                # Current: |----|
                # Res:         |----|   or |----| or   |--|  or  |------|
                overlap = max(
                    0,
                    min(current_entity.end, res_entity.end)
                    - max(current_entity.start, res_entity.start)
                )

                if overlap > 0:
                    is_overlapped_or_contained = True
                    # Preference:
                    # 1. NER often trump regex if they are the ones causing overlap
                    # 2. Longer entity wins
                    current_len = current_entity.end - current_entity.start
                    res_len = res_entity.end - res_entity.start

                    # If current is a name and overlaps, and previous is not a name,
                    # prefer current if it's not fully contained
                    if (current_entity.entity_type == "full_name"  # E501 corrected
                            and res_entity.entity_type != "full_name"):
                        # current not fully contained by res
                        if not (res_entity.start <= current_entity.start
                                and res_entity.end >= current_entity.end):
                            # remove res_entity, current will be added later
                            continue  # go to next res_entity, marked for removal
                    elif (res_entity.entity_type == "full_name"
                          and current_entity.entity_type != "full_name"):
                        # res_entity is a name, current is not. Prefer res_entity
                        # if it's not fully contained
                        if not (current_entity.start <= res_entity.start
                                and current_entity.end >= res_entity.end):
                            # current entity is subsumed or less important,
                            # so don't add current and keep res_entity
                            temp_resolved.append(res_entity)
                            is_overlapped_or_contained = True  # Mark current as handled
                            break  # Current is dominated

                    # General case: longer entity wins
                    if current_len > res_len:
                        # current is longer, res_entity is removed from
                        # consideration for this current_entity
                        pass  # res_entity not added to temp_resolved if fully replaced
                    elif res_len > current_len:
                        # res is longer, current is dominated
                        temp_resolved.append(res_entity)
                        is_overlapped_or_contained = True  # Mark current as handled
                        break
                    else:  # Same length, keep existing one (res_entity)
                        temp_resolved.append(res_entity)
                        is_overlapped_or_contained = True  # Mark current as handled
                        break
                else:  # No overlap
                    temp_resolved.append(res_entity)

            if not is_overlapped_or_contained:
                temp_resolved.append(current_entity)

            resolved_entities = sorted(
                temp_resolved, key=lambda e: (e.start, -(e.end - e.start))
            )

        # Final pass to remove fully contained entities if a larger one exists
        final_entities = []
        if not resolved_entities:
            return []

        for i, entity in enumerate(resolved_entities):
            is_contained = False
            for j, other_entity in enumerate(resolved_entities):
                if i == j:
                    continue
                # If 'entity' is strictly contained within 'other_entity'
                if (other_entity.start <= entity.start
                        and other_entity.end >= entity.end
                        and (other_entity.end - other_entity.start
                             > entity.end - entity.start)):
                    is_contained = True
                    break
            if not is_contained:
                final_entities.append(entity)

        return final_entities

    def mask_text(self, text: str) -> Tuple[str, List[Dict[str, Any]]]:
        """
        Mask PII entities in the text and return masked text and entity information
        """
        entities = self.detect_all_entities(text)
        entity_info = [entity.to_dict() for entity in entities]

        # Sort entities by start position to ensure correct masking,
        # longest first at same start to prevent partial masking by shorter entities
        entities.sort(key=lambda x: (x.start, -(x.end - x.start)))

        new_text_parts = []
        current_pos = 0

        for entity in entities:
            # Add text before the entity
            if entity.start > current_pos:
                new_text_parts.append(text[current_pos:entity.start])

            # Add the mask with entity type in uppercase for better visibility
            mask = f"[{entity.entity_type.upper()}]"
            new_text_parts.append(mask)

            current_pos = entity.end

        # Add any remaining text after the last entity
        if current_pos < len(text):
            new_text_parts.append(text[current_pos:])

        return "".join(new_text_parts), entity_info

    def process_email(self, email_text: str) -> Dict[str, Any]:
        """Mask the PII in an email, store both versions and build the response.

        The original text, the masked text and the entity list are handed to
        ``self.db.store_email``; whatever it returns is reported as
        ``email_id``.

        Args:
            email_text: The raw email body.

        Returns:
            A dict with the keys ``input_email_body``,
            ``list_of_masked_entities``, ``masked_email``,
            ``category_of_the_email`` (always an empty string here) and
            ``email_id``.
        """
        masked_body, found_entities = self.mask_text(email_text)

        # Store first; the response needs the identifier the store returns.
        stored_id = self.db.store_email(
            original_email=email_text,
            masked_email=masked_body,
            masked_entities=found_entities,
        )

        response: Dict[str, Any] = {}
        # The original body is echoed back for API compatibility.
        response["input_email_body"] = email_text
        response["list_of_masked_entities"] = found_entities
        response["masked_email"] = masked_body
        response["category_of_the_email"] = ""
        response["email_id"] = stored_id
        return response

    def get_original_email(
        self, email_id: str, access_key: str
    ) -> Optional[Dict[str, Any]]:
        """Fetch the unmasked email for ``email_id``, guarded by ``access_key``.

        This delegates to ``self.db.get_original_email``. Validation of the
        access key is left to the database layer, which is not visible here.

        Args:
            email_id: The ID of the stored email.
            access_key: The security key for accessing the original email.

        Returns:
            Whatever the database layer returns. It is documented as the
            original email data, or None when the email is not found or the
            key is invalid.
        """
        original_record = self.db.get_original_email(email_id, access_key)
        return original_record

    def get_masked_email_by_id(self, email_id: str) -> Optional[Dict[str, Any]]:
        """Look up a stored email by ID via ``self.db.get_email_by_id``.

        Args:
            email_id: The ID of the stored email.

        Returns:
            Whatever the database layer returns. It is documented as the
            masked email data (without the PII-containing original), or None
            when nothing is found.
        """
        masked_record = self.db.get_email_by_id(email_id)
        return masked_record

    def get_original_by_masked_email(
        self, masked_email: str
    ) -> Optional[Dict[str, Any]]:
        """Find a stored email from its masked text.

        This delegates to ``self.db.get_email_by_masked_content``.

        Args:
            masked_email: The masked version of the email to search for.

        Returns:
            Whatever the database layer returns. It is documented as the
            original email data, or None when nothing is found.
        """
        matching_record = self.db.get_email_by_masked_content(masked_email)
        return matching_record