# PII-Detection / data_augmentation.py
# (Hugging Face Space file; commit f53fac9 "add comments" by AmitHirpara)
import random
from faker import Faker
import pandas as pd
import numpy as np
from collections import Counter
import torch
class PIIDataAugmenter:
    """
    Generates synthetic PII examples to augment training data.

    This class creates realistic examples of various PII types including names,
    emails, phone numbers, addresses, IDs, URLs, and usernames.
    """
    def __init__(self, seed=42):
        """Initialize the augmenter with random seeds for reproducibility.

        Args:
            seed (int): Seed applied to the stdlib ``random`` module, NumPy,
                and Faker so repeated runs produce identical synthetic data.
        """
        # Seed every RNG source the generators draw from (stdlib random,
        # NumPy, Faker) so augmentation output is deterministic per seed.
        random.seed(seed)
        np.random.seed(seed)
        self.fake = Faker()
        Faker.seed(seed)
        # Build the sentence templates, surrounding context phrases, and the
        # PII-type -> generator / placeholder-key lookup tables.
        self._init_templates()
        self._init_context_phrases()
        self._init_generators()
def _init_templates(self):
    """Initialize sentence templates for each supported PII type.

    Each entry maps a PII label to ten format strings. The single
    placeholder name inside each template (``{name}``, ``{email}``, ...)
    must match the key registered for that label in ``self.format_keys``
    (populated by ``_init_generators``).
    """
    # Templates for generating sentences with PII
    self.templates = {
        'NAME_STUDENT': [
            "My name is {name}",
            "I am {name}",
            "This is {name} speaking",
            "Student: {name}",
            "{name} here",
            "Submitted by {name}",
            "Author: {name}",
            "Contact {name} for more information",
            "Please call {name}",
            "{name} is my name"
        ],
        'EMAIL': [
            "Email me at {email}",
            "My email is {email}",
            "Contact: {email}",
            "Send to {email}",
            "Reach me at {email}",
            "Email address: {email}",
            "You can email {email}",
            "Write to {email}",
            "My contact email is {email}",
            "Send your response to {email}"
        ],
        'PHONE_NUM': [
            "Call me at {phone}",
            "My phone number is {phone}",
            "Phone: {phone}",
            "Contact number: {phone}",
            "Reach me at {phone}",
            "My number is {phone}",
            "You can call {phone}",
            "Mobile: {phone}",
            "Tel: {phone}",
            "Phone contact: {phone}"
        ],
        'STREET_ADDRESS': [
            "I live at {address}",
            "My address is {address}",
            "Located at {address}",
            "Address: {address}",
            "Find me at {address}",
            "Residence: {address}",
            "Mail to {address}",
            "Home address: {address}",
            "Visit us at {address}",
            "Ship to {address}"
        ],
        'ID_NUM': [
            "ID: {id_num}",
            "Student ID: {id_num}",
            "ID number {id_num}",
            "Reference number: {id_num}",
            "Account: {id_num}",
            "Member ID: {id_num}",
            "Registration: {id_num}",
            "Code: {id_num}",
            "Identification: {id_num}",
            "Number: {id_num}"
        ],
        'URL_PERSONAL': [
            "Visit my website at {url}",
            "Check out {url}",
            "My portfolio: {url}",
            "Website: {url}",
            "Link: {url}",
            "Find me online at {url}",
            "Personal site: {url}",
            "URL: {url}",
            "Web: {url}",
            "Online at {url}"
        ],
        'USERNAME': [
            "Username: {username}",
            "User: {username}",
            "Handle: {username}",
            "My username is {username}",
            "Find me as {username}",
            "Account: {username}",
            "Login: {username}",
            "Profile: {username}",
            "Known as {username}",
            # NOTE(review): "@{username}" yields a whitespace token like
            # "@jdoe"; exact-match labelling can miss it — verify that
            # _tokenize_and_label tolerates punctuation attached to PII.
            "Tag me @{username}"
        ]
    }
def _init_context_phrases(self):
    """Initialize context phrases for more natural text generation.

    These phrases are wrapped around (or placed between) generated PII
    sentences; their tokens are always labelled 'O' (non-PII) when
    examples are assembled.
    """
    # Opening phrases for generated text
    self.context_prefix = [
        "Hello everyone,",
        "Dear Sir/Madam,",
        "To whom it may concern,",
        "Please note that",
        "For your reference,",
        "As requested,",
        "I would like to inform you that",
        "This is to confirm that",
        "Please be advised that",
        "I am writing to tell you that"
    ]
    # Closing phrases for generated text
    self.context_suffix = [
        "Thank you.",
        "Best regards.",
        "Please let me know if you need anything else.",
        "Looking forward to your response.",
        "Have a great day!",
        "Thanks for your attention.",
        "Feel free to contact me.",
        "I appreciate your help.",
        "Hope this helps.",
        "Let me know if you have questions."
    ]
    # Words to connect multiple PII elements inside one mixed example
    self.connectors = [
        " and ", " or ", ", ", ". Also, ", ". Additionally, "
    ]
def _init_generators(self):
"""Initialize PII generators mapping."""
# Map PII types to their generator functions
self.generators = {
'NAME_STUDENT': self.generate_name,
'EMAIL': self.generate_email,
'PHONE_NUM': self.generate_phone,
'STREET_ADDRESS': self.generate_address,
'ID_NUM': self.generate_id_num,
'URL_PERSONAL': self.generate_url,
'USERNAME': self.generate_username
}
# Map PII types to template placeholder keys
self.format_keys = {
'NAME_STUDENT': 'name',
'EMAIL': 'email',
'PHONE_NUM': 'phone',
'STREET_ADDRESS': 'address',
'ID_NUM': 'id_num',
'URL_PERSONAL': 'url',
'USERNAME': 'username'
}
def generate_name(self):
"""Generate realistic person names."""
return self.fake.name()
def generate_email(self):
"""Generate realistic email addresses."""
return self.fake.email()
def generate_phone(self):
    """Produce a synthetic phone number in one of several common US layouts.

    All patterns use the fictional 555 prefix; the two random fields are
    the 3-digit exchange and the 4-digit line number.
    """
    patterns = [
        "555-{:03d}-{:04d}",
        "(555) {:03d}-{:04d}",
        "555.{:03d}.{:04d}",
        "+1-555-{:03d}-{:04d}",
        "555{:03d}{:04d}",
    ]
    # Same draw order as before: pattern, then exchange, then line.
    pattern = random.choice(patterns)
    exchange = random.randint(100, 999)
    line = random.randint(1000, 9999)
    return pattern.format(exchange, line)
def generate_address(self):
"""Generate realistic street addresses."""
# Get address and replace newlines with commas
return self.fake.address().replace('\n', ', ')
def generate_id_num(self):
    """Generate a synthetic ID number in one of several common formats.

    Returns:
        str: An identifier such as '042187', 'STU003512', '1234-5678' or
        'A0419366'.

    The numeric part is sampled to fit each format's declared digit width.
    (Previously a single randint(10000, 9999999) fed every non-hyphenated
    format, so e.g. the "6-digit ID" pattern could receive a 7-digit value
    and silently overflow its zero-padded width.)
    """
    # (format string, digit width of the zero-padded numeric field)
    formats = [
        ("{:06d}", 6),         # 6-digit ID
        ("{:08d}", 8),         # 8-digit ID
        ("ID{:05d}", 5),       # ID prefix
        ("STU{:06d}", 6),      # Student ID
        ("{:04d}-{:04d}", 4),  # Hyphenated pair
        ("A{:07d}", 7),        # Letter prefix
    ]
    fmt, width = random.choice(formats)
    # Hyphenated IDs take two independent 4-digit halves.
    if '-' in fmt:
        return fmt.format(
            random.randint(1000, 9999),
            random.randint(1000, 9999)
        )
    # Sample inside the field width so zero-padding never overflows.
    return fmt.format(random.randint(0, 10 ** width - 1))
def generate_url(self):
"""Generate personal website URLs."""
# Common personal website domains
domains = ['github.com', 'linkedin.com', 'portfolio.com',
'personal.com', 'website.com']
username = self.fake.user_name()
domain = random.choice(domains)
return f"https://{domain}/{username}"
def generate_username(self):
"""Generate usernames."""
return self.fake.user_name()
def create_synthetic_example(self, pii_type, add_context=True):
"""Create a synthetic example with proper BIO labeling."""
# Generate the PII value
pii_value = self.generators[pii_type]()
# Choose a template and insert the PII
template = random.choice(self.templates[pii_type])
format_key = self.format_keys[pii_type]
sentence = template.format(**{format_key: pii_value})
# Optionally add context for more natural text
if add_context and random.random() > 0.3:
sentence = self._add_context(sentence)
# Create tokens and labels
tokens, labels = self._tokenize_and_label(sentence, pii_value, pii_type)
return tokens, labels
def create_mixed_example(self, pii_types, num_pii=2):
"""Create examples with multiple PII types."""
# Select which PII types to include
selected_types = random.sample(pii_types, min(num_pii, len(pii_types)))
all_tokens = []
all_labels = []
# Add opening context
if random.random() > 0.3:
prefix = random.choice(self.context_prefix)
all_tokens.extend(prefix.split())
all_labels.extend(['O'] * len(prefix.split()))
# Add each PII entity
for i, pii_type in enumerate(selected_types):
# Add connector between PII entities
if i > 0 and random.random() > 0.5:
connector = random.choice(self.connectors)
all_tokens.extend(connector.strip().split())
all_labels.extend(['O'] * len(connector.strip().split()))
# Generate PII example
tokens, labels = self.create_synthetic_example(pii_type, add_context=False)
all_tokens.extend(tokens)
all_labels.extend(labels)
# Add closing context
if random.random() > 0.3:
suffix = random.choice(self.context_suffix)
all_tokens.extend(suffix.split())
all_labels.extend(['O'] * len(suffix.split()))
return all_tokens, all_labels
def _add_context(self, sentence):
"""Add context phrases to make text more natural."""
# Randomly add prefix
if random.random() > 0.5:
sentence = random.choice(self.context_prefix) + " " + sentence
# Randomly add suffix
if random.random() > 0.5:
sentence = sentence + " " + random.choice(self.context_suffix)
return sentence
def _tokenize_and_label(self, sentence, pii_value, pii_type):
"""Tokenize sentence and apply BIO labels for PII."""
# Split sentence into tokens
tokens = sentence.split()
labels = ['O'] * len(tokens)
# Split PII value into tokens
pii_tokens = pii_value.split()
# Find where PII appears in the sentence
for i in range(len(tokens) - len(pii_tokens) + 1):
# Check if tokens match the PII value
if (tokens[i:i+len(pii_tokens)] == pii_tokens or
' '.join(tokens[i:i+len(pii_tokens)]).lower() == pii_value.lower()):
# Apply BIO tagging
labels[i] = f'B-{pii_type}' # Beginning
for j in range(1, len(pii_tokens)):
labels[i+j] = f'I-{pii_type}' # Inside
break
return tokens, labels
def augment_dataset(self, original_data, target_samples_per_class=1000, mix_ratio=0.3):
"""Augment dataset with synthetic examples to balance PII classes."""
# Check current distribution
label_counts = self._analyze_label_distribution(original_data)
print("\nOriginal label distribution:")
self._print_distribution(label_counts)
# Generate synthetic data
synthetic_tokens, synthetic_labels = self._generate_synthetic_data(
label_counts, target_samples_per_class, mix_ratio
)
# Add some non-PII examples for balance
synthetic_tokens, synthetic_labels = self._add_non_pii_examples(
synthetic_tokens, synthetic_labels
)
# Combine original and synthetic data
augmented_df = self._combine_and_shuffle(
original_data, synthetic_tokens, synthetic_labels
)
# Check new distribution
new_label_counts = self._analyze_label_distribution(augmented_df)
print("\nAugmented label distribution:")
self._print_distribution(new_label_counts)
return augmented_df
def _analyze_label_distribution(self, data):
"""Analyze the distribution of PII labels in the dataset."""
label_counts = Counter()
# Count each PII type
for labels in data['labels']:
for label in labels:
if label != 'O':
# Remove B- or I- prefix to get base label
base_label = label.split('-')[1] if '-' in label else label
label_counts[base_label] += 1
return label_counts
def _print_distribution(self, label_counts):
"""Print label distribution statistics."""
total = sum(label_counts.values())
# Print each label count and percentage
for label, count in label_counts.most_common():
percentage = (count / total * 100) if total > 0 else 0
print(f" {label:15} : {count:6,} ({percentage:5.2f}%)")
def _generate_synthetic_data(self, label_counts, target_samples, mix_ratio):
"""Generate synthetic PII examples based on current distribution."""
synthetic_tokens = []
synthetic_labels = []
# Generate examples for each PII type
for pii_type in self.templates.keys():
current_count = label_counts.get(pii_type, 0)
needed = max(0, target_samples - current_count)
if needed == 0:
continue
print(f"\nGenerating {needed} synthetic examples for {pii_type}")
# Generate single PII examples
single_count = int(needed * (1 - mix_ratio))
for _ in range(single_count):
tokens, labels = self.create_synthetic_example(pii_type)
synthetic_tokens.append(tokens)
synthetic_labels.append(labels)
# Generate mixed PII examples
mixed_count = int(needed * mix_ratio)
for _ in range(mixed_count):
# Make sure current PII type is included
other_types = [t for t in self.templates.keys() if t != pii_type]
selected_types = [pii_type] + random.sample(
other_types, min(1, len(other_types))
)
tokens, labels = self.create_mixed_example(selected_types, num_pii=2)
synthetic_tokens.append(tokens)
synthetic_labels.append(labels)
return synthetic_tokens, synthetic_labels
def _add_non_pii_examples(self, synthetic_tokens, synthetic_labels):
"""Add examples without PII (all 'O' labels) for balance."""
# Add 10% non-PII examples
num_non_pii = int(len(synthetic_tokens) * 0.1)
for _ in range(num_non_pii):
# Generate random text without PII
sentence = self.fake.text(max_nb_chars=100)
tokens = sentence.split()
labels = ['O'] * len(tokens)
synthetic_tokens.append(tokens)
synthetic_labels.append(labels)
return synthetic_tokens, synthetic_labels
def _combine_and_shuffle(self, original_data, synthetic_tokens, synthetic_labels):
"""Combine original and synthetic data, then shuffle."""
# Merge all data
all_tokens = original_data['tokens'].tolist() + synthetic_tokens
all_labels = original_data['labels'].tolist() + synthetic_labels
# Create new dataframe
augmented_data = pd.DataFrame({
'tokens': all_tokens,
'labels': all_labels
})
# Shuffle the data
augmented_data = augmented_data.sample(frac=1, random_state=42).reset_index(drop=True)
print(f"\nTotal augmented samples: {len(augmented_data):,}")
return augmented_data
def calculate_class_weights(data, label_vocab):
    """Compute inverse-frequency class weights for a balanced loss.

    Args:
        data: mapping/DataFrame with a 'labels' column of label lists.
        label_vocab: vocabulary exposing `word2idx` (lowercased label ->
            id, unknown -> 0) and `__len__` (number of classes).

    Returns:
        torch.Tensor: per-class weights, normalized to sum ~num_classes,
        clamped to [0.1, 10.0], with index 0 (padding) forced to 0.
    """
    # Tally label ids across every example.
    counts = Counter(
        label_vocab.word2idx.get(label.lower(), 0)
        for labels in data['labels']
        for label in labels
    )
    total = sum(counts.values())
    num_classes = len(label_vocab)
    weights = torch.zeros(num_classes)
    # Inverse-frequency weight for every observed class.
    for cls, n in counts.items():
        if n:
            weights[cls] = total / (num_classes * n)
    # Rescale so the weights sum to num_classes, then bound extremes.
    weights = weights / weights.sum() * num_classes
    weights = torch.clamp(weights, min=0.1, max=10.0)
    # Padding tokens must never contribute to the loss.
    weights[0] = 0.0
    return weights
if __name__ == '__main__':
    # Demo: augment the training split and persist the result.
    print("Loading original training data...")
    train_df = pd.read_json('train.json')
    print(f"Original dataset size: {len(train_df):,}")

    # Fixed seed keeps the augmentation reproducible.
    augmenter = PIIDataAugmenter(seed=42)

    banner = "=" * 60
    print("\n" + banner)
    print("Starting data augmentation...")
    print(banner)

    result = augmenter.augment_dataset(
        train_df,
        target_samples_per_class=2000,
        mix_ratio=0.3
    )

    out_file = './train_augmented.json'
    result.to_json(out_file, orient='records', lines=True)
    print(f"\nSaved augmented data to {out_file}")