File size: 5,407 Bytes
d4398e6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""

PII (Personally Identifiable Information) Filter Module

=========================================================

Regex-based detection and masking for emails, phone numbers,

CNIC/SSN-like patterns, API keys, and addresses.

"""

import re
from dataclasses import dataclass
from typing import List, Dict, Tuple
import pandas as pd


@dataclass
class PIIFilterConfig:
    """Configuration for PII filtering.

    Each ``filter_*`` flag switches on one masking step in
    ``apply_pii_filter``. Every flag defaults to ``False``, so a
    default-constructed config leaves text unchanged.
    """
    # Mask e-mail addresses.
    filter_emails: bool = False
    # Mask phone numbers.
    filter_phones: bool = False
    # Mask national ID numbers.
    filter_id_numbers: bool = False   # CNIC / SSN patterns
    # Mask API keys / secrets.
    filter_api_keys: bool = False
    # Mask street addresses, PO boxes and zip codes.
    filter_addresses: bool = False
    # Replacement text handed to every enabled masker by apply_pii_filter.
    # Despite the name this is a whole string, not a single character.
    mask_char: str = "[REDACTED]"


# ---------------------------------------------------------------------------
# Detection + Masking patterns
# ---------------------------------------------------------------------------
# All patterns use only non-capturing groups, so ``findall`` returns the full
# matched substrings rather than group tuples.

# local-part @ domain . TLD (two or more ASCII letters).
# The TLD class is [A-Za-z]; a '|' inside a character class is a literal pipe,
# not alternation, and would let "a@b.co|x" match as one address.
_EMAIL_PATTERN = re.compile(
    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
)

# Optional country code ("+" and 1-3 digits), an optionally parenthesised
# group of 2-4 digits, then two groups of 3-4 digits; each separator is an
# optional "-", "." or whitespace character.
_PHONE_PATTERN = re.compile(
    r'(?:\+?\d{1,3}[-.\s]?)?\(?\d{2,4}\)?[-.\s]?\d{3,4}[-.\s]?\d{3,4}'
)

# SSN: 123-45-6789, CNIC: 12345-1234567-1
_ID_NUMBER_PATTERN = re.compile(
    r'\b\d{3}-\d{2}-\d{4}\b'        # US SSN
    r'|\b\d{5}-\d{7}-\d{1}\b'       # PK CNIC
    r'|\b\d{13}\b'                   # 13-digit ID
)

# Long hex or base64 strings that look like API keys / secrets:
#   1. a keyword prefix (sk, pk, api, key, secret, token), an optional "_" or
#      "-", then at least 20 alphanumerics;
#   2. a run of 32 or more hex digits;
#   3. a run of 40 or more base64 characters with up to two "=" of padding.
# Matching is case-insensitive.
_API_KEY_PATTERN = re.compile(
    r'\b(?:sk|pk|api|key|secret|token)[_-]?[A-Za-z0-9]{20,}\b'
    r'|[A-Fa-f0-9]{32,}'
    r'|[A-Za-z0-9+/]{40,}={0,2}',
    re.IGNORECASE,
)

# Basic address patterns (US-style zip, PO Box, street numbers):
# "<1-5 digit number> <one word> <street suffix>", "PO Box <number>", or a
# 5-digit zip code with an optional "-NNNN" extension. Case-insensitive.
_ADDRESS_PATTERN = re.compile(
    r'\b\d{1,5}\s+\w+\s+(?:St|Street|Ave|Avenue|Blvd|Boulevard|Dr|Drive|Rd|Road|Ln|Lane|Way|Ct|Court)\b'
    r'|\bP\.?O\.?\s*Box\s+\d+\b'
    r'|\b\d{5}(?:-\d{4})?\b',         # Zip code
    re.IGNORECASE,
)


def detect_emails(text: str) -> List[str]:
    """Return every e-mail address matched in ``text``.

    Anything that is not a ``str`` (``None``, numbers, ...) yields an empty
    list instead of raising.
    """
    if not isinstance(text, str):
        return []
    return _EMAIL_PATTERN.findall(text)


def mask_emails(text: str, mask: str = "[REDACTED_EMAIL]") -> str:
    """Replace every e-mail address in ``text`` with ``mask``.

    Args:
        text: Text to scrub. Non-string values are returned unchanged.
        mask: Literal replacement text for each match.

    Returns:
        The masked string, or ``text`` itself when it is not a ``str``.
    """
    if not isinstance(text, str):
        return text
    # A callable replacement inserts ``mask`` verbatim. Passing the string
    # directly would make re.sub treat it as a template, expanding backslash
    # escapes and group references such as "\g<0>" (which would put the
    # matched PII straight back).
    return _EMAIL_PATTERN.sub(lambda _match: mask, text)


def detect_phones(text: str) -> List[str]:
    """Return every phone-number-like substring matched in ``text``.

    Anything that is not a ``str`` yields an empty list instead of raising.
    """
    if not isinstance(text, str):
        return []
    return _PHONE_PATTERN.findall(text)


def mask_phones(text: str, mask: str = "[REDACTED_PHONE]") -> str:
    """Replace every phone-number-like substring in ``text`` with ``mask``.

    Args:
        text: Text to scrub. Non-string values are returned unchanged.
        mask: Literal replacement text for each match.

    Returns:
        The masked string, or ``text`` itself when it is not a ``str``.
    """
    if not isinstance(text, str):
        return text
    # Callable replacement: ``mask`` is inserted verbatim instead of being
    # parsed by re.sub as a template (backslash escapes, group references).
    return _PHONE_PATTERN.sub(lambda _match: mask, text)


def detect_id_numbers(text: str) -> List[str]:
    """Return every SSN-, CNIC- or 13-digit-ID-like substring in ``text``.

    Anything that is not a ``str`` yields an empty list instead of raising.
    """
    if not isinstance(text, str):
        return []
    return _ID_NUMBER_PATTERN.findall(text)


def mask_id_numbers(text: str, mask: str = "[REDACTED_ID]") -> str:
    """Replace every SSN/CNIC/13-digit ID pattern in ``text`` with ``mask``.

    Args:
        text: Text to scrub. Non-string values are returned unchanged.
        mask: Literal replacement text for each match.

    Returns:
        The masked string, or ``text`` itself when it is not a ``str``.
    """
    if not isinstance(text, str):
        return text
    # Callable replacement: ``mask`` is inserted verbatim instead of being
    # parsed by re.sub as a template (backslash escapes, group references).
    return _ID_NUMBER_PATTERN.sub(lambda _match: mask, text)


def detect_api_keys(text: str) -> List[str]:
    """Return every API-key / secret-like substring matched in ``text``.

    Anything that is not a ``str`` yields an empty list instead of raising.
    """
    if not isinstance(text, str):
        return []
    return _API_KEY_PATTERN.findall(text)


def mask_api_keys(text: str, mask: str = "[REDACTED_KEY]") -> str:
    """Replace every API-key / secret-like substring in ``text`` with ``mask``.

    Args:
        text: Text to scrub. Non-string values are returned unchanged.
        mask: Literal replacement text for each match.

    Returns:
        The masked string, or ``text`` itself when it is not a ``str``.
    """
    if not isinstance(text, str):
        return text
    # Callable replacement: ``mask`` is inserted verbatim instead of being
    # parsed by re.sub as a template (backslash escapes, group references).
    return _API_KEY_PATTERN.sub(lambda _match: mask, text)


def detect_addresses(text: str) -> List[str]:
    """Return every street-address, PO-box or zip-code match in ``text``.

    Anything that is not a ``str`` yields an empty list instead of raising.
    """
    if not isinstance(text, str):
        return []
    return _ADDRESS_PATTERN.findall(text)


def mask_addresses(text: str, mask: str = "[REDACTED_ADDR]") -> str:
    """Replace every address-like substring in ``text`` with ``mask``.

    Args:
        text: Text to scrub. Non-string values are returned unchanged.
        mask: Literal replacement text for each match.

    Returns:
        The masked string, or ``text`` itself when it is not a ``str``.
    """
    if not isinstance(text, str):
        return text
    # Callable replacement: ``mask`` is inserted verbatim instead of being
    # parsed by re.sub as a template (backslash escapes, group references).
    return _ADDRESS_PATTERN.sub(lambda _match: mask, text)


def apply_pii_filter(
    text: str,
    config: PIIFilterConfig,
) -> str:
    """Run every masker that ``config`` enables over ``text``.

    Maskers run in a fixed order (e-mails, phones, ID numbers, API keys,
    addresses), each one receiving the previous one's output. All of them use
    ``config.mask_char`` as the replacement text.

    Args:
        text: The string to scrub.
        config: Flags selecting the maskers, plus the replacement text.

    Returns:
        ``text`` after all enabled maskers have been applied.
    """
    replacement = config.mask_char

    # (enabled?, masker) pairs, listed in the order they must run.
    stages = (
        (config.filter_emails, mask_emails),
        (config.filter_phones, mask_phones),
        (config.filter_id_numbers, mask_id_numbers),
        (config.filter_api_keys, mask_api_keys),
        (config.filter_addresses, mask_addresses),
    )
    for enabled, masker in stages:
        if enabled:
            text = masker(text, replacement)

    return text


def apply_pii_filter_df(
    df: pd.DataFrame,
    columns: List[str],
    config: PIIFilterConfig,
) -> pd.DataFrame:
    """Apply PII filtering to the specified columns of a DataFrame.

    Args:
        df: Source frame; it is copied, never modified in place.
        columns: Names of the columns to scrub. Names that are not present in
            ``df`` are skipped silently.
        config: Flags selecting the maskers, plus the replacement text.

    Returns:
        A copy of ``df`` in which every non-missing cell of the selected
        columns has been converted to ``str`` and passed through
        ``apply_pii_filter``. Missing cells (NaN/None) stay missing.
    """
    result = df.copy()
    for col in columns:
        if col not in result.columns:
            continue
        # na_action="ignore" skips missing values; without it str() would
        # turn NaN/None into the literal text "nan"/"None".
        result[col] = result[col].map(
            lambda value: apply_pii_filter(str(value), config),
            na_action="ignore",
        )
    return result


def detect_pii_summary(
    df: pd.DataFrame,
    columns: List[str],
) -> Dict[str, int]:
    """Count the PII matches of each category in the given columns.

    Every cell of each listed column is converted to ``str`` and scanned by
    all five detectors. Column names missing from ``df`` are ignored.

    Returns:
        A dict with the keys ``"emails"``, ``"phones"``, ``"id_numbers"``,
        ``"api_keys"`` and ``"addresses"``, each mapped to the total number of
        matches found (0 when there were none).
    """
    totals = dict.fromkeys(
        ("emails", "phones", "id_numbers", "api_keys", "addresses"), 0
    )

    scanned = [name for name in columns if name in df.columns]
    for name in scanned:
        for cell in df[name].astype(str):
            hits = {
                "emails": detect_emails(cell),
                "phones": detect_phones(cell),
                "id_numbers": detect_id_numbers(cell),
                "api_keys": detect_api_keys(cell),
                "addresses": detect_addresses(cell),
            }
            for category, found in hits.items():
                totals[category] += len(found)

    return totals