|
|
|
|
|
""" |
|
|
PII (Personally Identifiable Information) Detection Extension |
|
|
Integrates with dual-mode content moderation |
|
|
""" |
|
|
|
|
|
import re
import unicodedata
from enum import Enum
from typing import Dict, List, Tuple
|
|
|
|
|
class PIILabel(Enum):
    """Categories a piece of scanned text can be tagged with.

    Each member's value is the lowercase string that is emitted in scan
    results (for example in the ``pii_types`` list).
    """

    # No PII present.
    SAFE = "safe"

    # Direct contact / identity data.
    EMAIL = "email"
    PHONE = "phone"
    ADDRESS = "address"
    CREDIT_CARD = "credit_card"
    SSN = "ssn"

    # Links and handles.
    SOCIAL_MEDIA = "social_media"
    URL = "url"
|
|
|
|
|
class UnicodeDeobfuscator:
    """Detect and normalize unicode obfuscation attempts.

    Look-alike characters (circled letters, double-struck capitals,
    fullwidth forms, mathematical alphanumerics) are reported and folded
    back to their plain counterparts so that downstream pattern matching
    sees ordinary text.
    """

    # Code point ranges (end exclusive) for commonly abused blocks.
    CIRCLED_LETTERS = range(0x24B6, 0x24EA)
    MATHEMATICAL_CHARS = range(0x1D400, 0x1D800)
    FULLWIDTH_CHARS = range(0xFF01, 0xFF5F)
    DOUBLE_STRUCK = range(0x2100, 0x2150)
    BOX_DRAWING = range(0x2500, 0x2580)
    BLOCK_ELEMENTS = range(0x2580, 0x25A0)

    # Circled capital letters start at U+24B6 and circled small letters at
    # U+24D0. The table is built from code points rather than literal
    # glyphs so it survives any re-encoding of this source file.
    CIRCLED_MAP = {
        **{chr(0x24B6 + i): chr(ord('A') + i) for i in range(26)},
        **{chr(0x24D0 + i): chr(ord('a') + i) for i in range(26)},
    }

    # Double-struck capitals that have a plain-letter replacement. Any other
    # character in DOUBLE_STRUCK is still reported but left unchanged.
    _DOUBLE_STRUCK_MAP = {
        '\u2102': 'C',
        '\u210D': 'H',
        '\u2115': 'N',
        '\u2119': 'P',
        '\u211A': 'Q',
        '\u211D': 'R',
        '\u2124': 'Z',
    }

    @classmethod
    def detect_obfuscation(cls, text: str) -> Tuple[bool, List[Tuple[str, str]], str]:
        """Detect unicode obfuscation and build a normalized copy of *text*.

        Args:
            text: The raw input string.

        Returns:
            ``(is_obfuscated, [(char, type), ...], normalized_text)`` where
            ``type`` is one of ``'circled'``, ``'double-struck'``,
            ``'fullwidth'`` or ``'mathematical'``. ``normalized_text`` equals
            *text* when nothing suspicious was found.
        """
        suspicious = []
        normalized = []

        for char in text:
            code = ord(char)

            if char in cls.CIRCLED_MAP:
                suspicious.append((char, 'circled'))
                normalized.append(cls.CIRCLED_MAP[char])
            elif code in cls.DOUBLE_STRUCK:
                suspicious.append((char, 'double-struck'))
                normalized.append(cls._DOUBLE_STRUCK_MAP.get(char, char))
            elif code in cls.FULLWIDTH_CHARS:
                suspicious.append((char, 'fullwidth'))
                # Fullwidth forms sit at a fixed offset from printable ASCII.
                normalized.append(chr(code - 0xFEE0))
            elif code in cls.MATHEMATICAL_CHARS:
                suspicious.append((char, 'mathematical'))
                # Styled mathematical letters/digits carry a compatibility
                # mapping to the plain character; code points without one
                # come back from NFKC unchanged.
                normalized.append(unicodedata.normalize('NFKC', char))
            else:
                normalized.append(char)

        return bool(suspicious), suspicious, ''.join(normalized)

    @classmethod
    def normalize(cls, text: str) -> str:
        """Return only the normalized text, discarding detection details."""
        _, _, normalized = cls.detect_obfuscation(text)
        return normalized
|
|
|
|
|
|
|
|
class PIIDetector:
    """Detect PII in text with context awareness.

    Regex-based detectors cover emails, phone numbers, street addresses,
    credit card numbers, SSNs and social media links/handles. ``scan``
    combines them with an age rule and a keyword-based grooming check.
    """

    def __init__(self):
        """Compile every pattern once so the detectors can reuse them."""
        # The TLD class is letters only; a '|' inside a character class is a
        # literal pipe, not alternation.
        self.email_pattern = re.compile(
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        )

        # '(' and '+' are not word characters, so a leading \b would demand a
        # word character right before them; a negative look-behind lets these
        # forms match at the start of the text or after whitespace.
        self.phone_patterns = [
            re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'),
            re.compile(r'(?<!\w)\(\d{3}\)\s?\d{3}[-.]?\d{4}\b'),
            re.compile(r'(?<!\w)\+?\d{1,3}[-.\s]?\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b'),
            re.compile(r'\b\d{4}\s?\d{3}\s?\d{3}\b'),
            re.compile(r'\b\d{3}[-.]?\d{4}\b'),
            re.compile(r'\b\d{7,10}\b'),
        ]

        self.address_patterns = [
            re.compile(r'\b\d+\s+\d*[A-Za-z]+(?:\s+[A-Za-z]+)?\s+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Way|Place|Pl|Circle|Cir|Trail|Trl|Parkway|Pkwy)\b', re.IGNORECASE),
            re.compile(r'\b(?:PO|P\.O\.)\s*Box\s*\d+\b', re.IGNORECASE),
            re.compile(r'\b\d+\s+[A-Za-z]+\s+(?:Street|St|Ave|Road|Rd)\b', re.IGNORECASE),
        ]

        self.cc_pattern = re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b|\b\d{16}\b')

        self.ssn_pattern = re.compile(r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b')

        self.social_media_domains = [
            'instagram.com', 'instagr.am',
            'twitter.com', 'x.com',
            'tiktok.com',
            'snapchat.com', 'snap.com',
            'discord.com', 'discord.gg',
            'facebook.com', 'fb.com',
            'reddit.com',
            'youtube.com', 'youtu.be',
            'twitch.tv',
            'steamcommunity.com',
            'roblox.com',
        ]

        # Keywords are stored without apostrophes; detect_grooming_context
        # strips apostrophes from the input before matching.
        self.grooming_keywords = [
            'dm me', 'message me privately', 'private chat', 'secret',
            'dont tell your parents', 'our little secret', 'just between us',
            'send me pics', 'send pictures', 'photo of you', 'what do you look like',
            'how old are you', 'where do you live', 'home alone', 'parents gone',
            'meet up', 'meet in person', 'come over', 'visit you',
            'boyfriend', 'girlfriend', 'dating', 'relationship',
            'trust me', 'special friend', 'mature for your age',
            'youre different', 'understand you', 'only one who gets you',
        ]

        self.url_pattern = re.compile(
            r'https?://(?:[-\w.])+(?:[:\d]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:#(?:[\w.])*)?)?',
            re.IGNORECASE
        )

        # (platform, pattern) pairs for "platform: @handle" style mentions.
        # The platform alternatives are grouped so the leading \b and the
        # handle part apply to every alternative.
        self._username_patterns = [
            ('instagram', re.compile(r'\b(?:instagram|ig|insta)[:\s]*@?(\w+)\b', re.IGNORECASE)),
            ('twitter', re.compile(r'\b(?:twitter|x)[:\s]*@?(\w+)\b', re.IGNORECASE)),
            ('discord', re.compile(r'\bdiscord[:\s]*@?(\w+)\b', re.IGNORECASE)),
            ('snapchat', re.compile(r'\b(?:snapchat|snap)[:\s]*@?(\w+)\b', re.IGNORECASE)),
            ('tiktok', re.compile(r'\btiktok[:\s]*@?(\w+)\b', re.IGNORECASE)),
        ]

    @staticmethod
    def _collect(patterns, text: str) -> List[Tuple[str, int, int]]:
        """Return ``(value, start, end)`` for every match, without exact duplicates."""
        spans = (
            (match.group(), match.start(), match.end())
            for pattern in patterns
            for match in pattern.finditer(text)
        )
        # dict.fromkeys keeps first-seen order while dropping repeated spans
        # produced by overlapping patterns.
        return list(dict.fromkeys(spans))

    @staticmethod
    def _url_host(url: str) -> str:
        """Extract the lowercase host from a URL matched by ``url_pattern``."""
        authority = url.split('://', 1)[-1].split('/', 1)[0]
        return authority.split(':', 1)[0].lower().strip('.')

    def detect_emails(self, text: str) -> List[Tuple[str, int, int]]:
        """Find all emails in *text* as ``(value, start, end)`` tuples."""
        return self._collect([self.email_pattern], text)

    def detect_phones(self, text: str) -> List[Tuple[str, int, int]]:
        """Find phone-number-like spans as ``(value, start, end)`` tuples.

        Several patterns may match overlapping parts of the same number;
        only spans that are exactly identical are merged.
        """
        return self._collect(self.phone_patterns, text)

    def detect_addresses(self, text: str) -> List[Tuple[str, int, int]]:
        """Find street / PO box addresses as ``(value, start, end)`` tuples."""
        return self._collect(self.address_patterns, text)

    def detect_credit_cards(self, text: str) -> List[Tuple[str, int, int]]:
        """Find credit card numbers as ``(value, start, end)`` tuples."""
        matches = []
        for match in self.cc_pattern.finditer(text):
            digits = match.group().replace('-', '').replace(' ', '')
            if 13 <= len(digits) <= 19:
                matches.append((match.group(), match.start(), match.end()))
        return matches

    def detect_ssn(self, text: str) -> List[Tuple[str, int, int]]:
        """Find SSN-shaped numbers as ``(value, start, end)`` tuples."""
        return self._collect([self.ssn_pattern], text)

    def detect_social_media(self, text: str) -> List[Tuple[str, int, int, str]]:
        """Find social media links and handle mentions.

        Returns:
            ``(value, start, end, platform)`` tuples. For URLs ``platform`` is
            the matching entry of ``social_media_domains``; for handle
            mentions it is the platform name.
        """
        matches = []

        for url_match in self.url_pattern.finditer(text):
            url = url_match.group()
            host = self._url_host(url)
            for domain in self.social_media_domains:
                known = domain.lower()
                # Compare against the host (exact or sub-domain) so that e.g.
                # "dropbox.com" is not mistaken for "x.com".
                if host == known or host.endswith('.' + known):
                    matches.append((url, url_match.start(), url_match.end(), domain))
                    break

        for platform, pattern in self._username_patterns:
            for match in pattern.finditer(text):
                matches.append((match.group(), match.start(), match.end(), platform))

        return matches

    def detect_grooming_context(self, text: str) -> Tuple[bool, float, List[str]]:
        """Detect if social media sharing has grooming context.

        Returns:
            ``(is_suspicious, risk_score, found_keywords)``. The score is the
            number of matched keywords divided by three, capped at 1.0; a
            single keyword is enough to be suspicious.
        """
        # Drop apostrophes so "don't" / "you're" match the stored keywords.
        text_lower = text.lower().replace("'", "").replace("\u2019", "")
        found_keywords = [kw for kw in self.grooming_keywords if kw in text_lower]

        risk_score = min(len(found_keywords) / 3.0, 1.0)
        is_suspicious = risk_score >= 0.33

        return is_suspicious, risk_score, found_keywords

    def scan(self, text: str, age: int) -> Dict:
        """Run the full PII scan with age-appropriate rules.

        The text is first passed through ``UnicodeDeobfuscator``; all
        detectors run on the normalized text, so reported offsets refer to
        ``normalized_text``.

        Args:
            text: The message to inspect.
            age: The user's age; social media sharing is blocked under 13.

        Returns:
            {
                "has_pii": bool,
                "pii_types": list of label strings (sorted),
                "details": list of per-match dicts,
                "social_media_allowed": bool,
                "grooming_risk": float,
                "grooming_keywords": list,
                "action": "allow" | "block",
                "reason": str,
                "age": int,
                "obfuscation_detected": bool,
                "obfuscation_chars": list of (char, type),
                "normalized_text": str
            }
        """
        is_obfuscated, suspicious_chars, normalized_text = (
            UnicodeDeobfuscator.detect_obfuscation(text)
        )
        # Identical to ``text`` whenever nothing was normalized.
        detection_text = normalized_text

        pii_found = []
        pii_types = set()

        detectors = (
            (PIILabel.EMAIL, self.detect_emails),
            (PIILabel.PHONE, self.detect_phones),
            (PIILabel.ADDRESS, self.detect_addresses),
            (PIILabel.CREDIT_CARD, self.detect_credit_cards),
            (PIILabel.SSN, self.detect_ssn),
        )
        for label, detect in detectors:
            for value, start, end in detect(detection_text):
                pii_types.add(label)
                pii_found.append(
                    {"type": label.value, "value": value, "start": start, "end": end}
                )

        social_links = self.detect_social_media(detection_text)
        has_social_media = bool(social_links)
        for link, start, end, platform in social_links:
            pii_types.add(PIILabel.SOCIAL_MEDIA)
            pii_found.append({
                "type": "social_media",
                "value": link,
                "platform": platform,
                "start": start,
                "end": end,
            })

        grooming_risk = 0.0
        grooming_keywords = []

        critical_pii = pii_types & {
            PIILabel.EMAIL,
            PIILabel.PHONE,
            PIILabel.ADDRESS,
            PIILabel.CREDIT_CARD,
            PIILabel.SSN,
        }

        if critical_pii:
            action = "block"
            # Sorted so the message does not depend on set iteration order.
            reason = f"PII detected: {', '.join(sorted(p.value for p in critical_pii))}"
        elif has_social_media:
            # The grooming check only runs when social media is being shared.
            is_grooming, grooming_risk, grooming_keywords = (
                self.detect_grooming_context(detection_text)
            )
            if age < 13:
                action = "block"
                reason = "Social media sharing not permitted under 13"
            elif is_grooming:
                action = "block"
                reason = f"Potential grooming detected (risk: {grooming_risk:.0%})"
            else:
                action = "allow"
                reason = "Social media permitted for 13+ (no grooming signals)"
        else:
            action = "allow"
            reason = "No PII detected"

        social_media_allowed = not (
            has_social_media and (age < 13 or grooming_risk > 0)
        )

        if is_obfuscated and action == "allow":
            reason = f"Unicode obfuscation detected and normalized. {reason}"

        return {
            "has_pii": bool(pii_types),
            "pii_types": sorted(p.value for p in pii_types),
            "details": pii_found,
            "social_media_allowed": social_media_allowed,
            "grooming_risk": grooming_risk,
            "grooming_keywords": grooming_keywords,
            "action": action,
            "reason": reason,
            "age": age,
            "obfuscation_detected": is_obfuscated,
            "obfuscation_chars": list(suspicious_chars),
            "normalized_text": normalized_text,
        }
|
|
|
|
|
|
|
|
|
|
|
class CombinedModerationFilter:
    """Combines content moderation + PII detection.

    The PII scan runs first and short-circuits on a block; otherwise the
    content model decides, using a per-age list of blocked label values.
    """

    def __init__(self, content_model_path="./moderation_model_v2.pkl"):
        """Load the content model and create the PII detector.

        Args:
            content_model_path: Path handed to the content moderator's
                ``load``. NOTE(review): the ``.pkl`` suffix suggests a pickle
                file - only load model files from a trusted source.
        """
        # Imported here rather than at module level, so importing this module
        # does not itself require ``enhanced_moderation``.
        from enhanced_moderation import EnhancedContentModerator

        self.content_moderator = EnhancedContentModerator()
        self.content_moderator.load(content_model_path)
        self.pii_detector = PIIDetector()

        # Content label values that are blocked per age group. The numbers
        # are compared against ``content_label.value`` in ``check``;
        # presumably they are ContentLabel enum values - verify against
        # enhanced_moderation.
        self.under_13_blocked_content = [1, 2, 3, 4, 5]
        self.teen_plus_blocked_content = [1, 3, 4, 5]

    def check(self, text: str, age: int) -> Dict:
        """Run the full check: PII first, then content.

        Args:
            text: The message to moderate.
            age: The user's age; 13 and over uses the teen-plus block list.

        Returns:
            A dict with ``allowed``, ``violation`` (``"PII"``, ``"CONTENT"``
            or ``None``), ``pii_details``, ``content_details``, ``reason``
            and ``age``. ``content_details`` is ``None`` when the PII scan
            already blocked the text.
        """
        pii_result = self.pii_detector.scan(text, age)

        if pii_result["action"] == "block":
            return {
                "allowed": False,
                "violation": "PII",
                "pii_details": pii_result,
                "content_details": None,
                "reason": pii_result["reason"],
                "age": age,
            }

        # Classify the de-obfuscated text produced by the PII scan so that
        # look-alike unicode characters cannot hide content from the model.
        # Falls back to the raw text if the scan result lacks the key.
        moderation_text = pii_result.get("normalized_text", text)
        content_label, confidence = self.content_moderator.predict(moderation_text)

        is_teen_plus = age >= 13
        blocked_values = (
            self.teen_plus_blocked_content
            if is_teen_plus
            else self.under_13_blocked_content
        )
        content_allowed = content_label.value not in blocked_values

        # Label value 2 is always let through for 13+, even if the teen-plus
        # list was reconfigured to contain it.
        if not content_allowed and is_teen_plus and content_label.value == 2:
            content_allowed = True

        content_details = {
            "label": content_label.name,
            "confidence": confidence,
        }

        if not content_allowed:
            return {
                "allowed": False,
                "violation": "CONTENT",
                "pii_details": pii_result,
                "content_details": content_details,
                "reason": f"{content_label.name} detected",
                "age": age,
            }

        return {
            "allowed": True,
            "violation": None,
            "pii_details": pii_result,
            "content_details": content_details,
            "reason": "Content and PII checks passed",
            "age": age,
        }
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke run: scan a few sample messages and print the verdicts.
    detector = PIIDetector()

    # (message, age) pairs covering direct PII, social media sharing on both
    # sides of the age-13 rule, and grooming-style phrasing.
    test_cases = [
        ("My email is john@example.com", 15),
        ("Call me at 555-123-4567", 16),
        ("I'm at 123 Main Street", 14),
        ("Follow me on instagram @cooluser", 10),
        ("Follow me on instagram @cooluser", 15),
        ("DM me on instagram, don't tell your parents", 15),
        ("Check my tiktok @user", 14),
        ("Send me pics on snapchat, it's our secret", 13),
    ]

    print("PII Detection Tests")
    print("=" * 70)

    for text, age in test_cases:
        result = detector.scan(text, age)
        # Status icons are written as escapes (check mark / cross mark) so
        # they survive any re-encoding of this source file.
        status = "\u2705 ALLOW" if result["action"] == "allow" else "\u274c BLOCK"

        print(f"\nAge {age}: '{text}'")
        print(f"  {status} - {result['reason']}")
        if result["grooming_risk"] > 0:
            print(f"  Grooming risk: {result['grooming_risk']:.0%}")
            print(f"  Keywords: {result['grooming_keywords']}")
|
|