Spaces:
Sleeping
Sleeping
| import random | |
| from faker import Faker | |
| import pandas as pd | |
| import numpy as np | |
| from collections import Counter | |
| import torch | |
class PIIDataAugmenter:
    """
    Generates synthetic PII examples to augment training data.

    Produces realistic examples of several PII types (student names, emails,
    phone numbers, street addresses, ID numbers, personal URLs, usernames)
    as parallel (tokens, BIO-labels) lists, and can augment an existing
    dataset so that every PII class reaches a target number of samples.
    """

    def __init__(self, seed=42):
        """Initialize the augmenter with random seeds for reproducibility.

        Args:
            seed: Seed applied to ``random``, ``numpy`` and ``Faker`` so
                repeated runs generate identical synthetic data.
        """
        # Set random seeds for consistent results
        random.seed(seed)
        np.random.seed(seed)
        self.fake = Faker()
        Faker.seed(seed)
        # Initialize templates, context phrases and dispatch tables
        self._init_templates()
        self._init_context_phrases()
        self._init_generators()

    def _init_templates(self):
        """Initialize sentence templates for each supported PII type."""
        # Templates for generating sentences with PII
        self.templates = {
            'NAME_STUDENT': [
                "My name is {name}",
                "I am {name}",
                "This is {name} speaking",
                "Student: {name}",
                "{name} here",
                "Submitted by {name}",
                "Author: {name}",
                "Contact {name} for more information",
                "Please call {name}",
                "{name} is my name"
            ],
            'EMAIL': [
                "Email me at {email}",
                "My email is {email}",
                "Contact: {email}",
                "Send to {email}",
                "Reach me at {email}",
                "Email address: {email}",
                "You can email {email}",
                "Write to {email}",
                "My contact email is {email}",
                "Send your response to {email}"
            ],
            'PHONE_NUM': [
                "Call me at {phone}",
                "My phone number is {phone}",
                "Phone: {phone}",
                "Contact number: {phone}",
                "Reach me at {phone}",
                "My number is {phone}",
                "You can call {phone}",
                "Mobile: {phone}",
                "Tel: {phone}",
                "Phone contact: {phone}"
            ],
            'STREET_ADDRESS': [
                "I live at {address}",
                "My address is {address}",
                "Located at {address}",
                "Address: {address}",
                "Find me at {address}",
                "Residence: {address}",
                "Mail to {address}",
                "Home address: {address}",
                "Visit us at {address}",
                "Ship to {address}"
            ],
            'ID_NUM': [
                "ID: {id_num}",
                "Student ID: {id_num}",
                "ID number {id_num}",
                "Reference number: {id_num}",
                "Account: {id_num}",
                "Member ID: {id_num}",
                "Registration: {id_num}",
                "Code: {id_num}",
                "Identification: {id_num}",
                "Number: {id_num}"
            ],
            'URL_PERSONAL': [
                "Visit my website at {url}",
                "Check out {url}",
                "My portfolio: {url}",
                "Website: {url}",
                "Link: {url}",
                "Find me online at {url}",
                "Personal site: {url}",
                "URL: {url}",
                "Web: {url}",
                "Online at {url}"
            ],
            'USERNAME': [
                "Username: {username}",
                "User: {username}",
                "Handle: {username}",
                "My username is {username}",
                "Find me as {username}",
                "Account: {username}",
                "Login: {username}",
                "Profile: {username}",
                "Known as {username}",
                "Tag me @{username}"
            ]
        }

    def _init_context_phrases(self):
        """Initialize context phrases for more natural text generation."""
        # Opening phrases for generated text
        self.context_prefix = [
            "Hello everyone,",
            "Dear Sir/Madam,",
            "To whom it may concern,",
            "Please note that",
            "For your reference,",
            "As requested,",
            "I would like to inform you that",
            "This is to confirm that",
            "Please be advised that",
            "I am writing to tell you that"
        ]
        # Closing phrases for generated text
        self.context_suffix = [
            "Thank you.",
            "Best regards.",
            "Please let me know if you need anything else.",
            "Looking forward to your response.",
            "Have a great day!",
            "Thanks for your attention.",
            "Feel free to contact me.",
            "I appreciate your help.",
            "Hope this helps.",
            "Let me know if you have questions."
        ]
        # Words to connect multiple PII elements
        self.connectors = [
            " and ", " or ", ", ", ". Also, ", ". Additionally, "
        ]

    def _init_generators(self):
        """Initialize PII generator and template-placeholder dispatch tables."""
        # Map PII types to their generator functions
        self.generators = {
            'NAME_STUDENT': self.generate_name,
            'EMAIL': self.generate_email,
            'PHONE_NUM': self.generate_phone,
            'STREET_ADDRESS': self.generate_address,
            'ID_NUM': self.generate_id_num,
            'URL_PERSONAL': self.generate_url,
            'USERNAME': self.generate_username
        }
        # Map PII types to template placeholder keys
        self.format_keys = {
            'NAME_STUDENT': 'name',
            'EMAIL': 'email',
            'PHONE_NUM': 'phone',
            'STREET_ADDRESS': 'address',
            'ID_NUM': 'id_num',
            'URL_PERSONAL': 'url',
            'USERNAME': 'username'
        }

    def generate_name(self):
        """Generate a realistic person name via Faker."""
        return self.fake.name()

    def generate_email(self):
        """Generate a realistic email address via Faker."""
        return self.fake.email()

    def generate_phone(self):
        """Generate a realistic phone number in one of several formats."""
        # Different phone number formats (all use a fixed 555 area code)
        formats = [
            "555-{:03d}-{:04d}",
            "(555) {:03d}-{:04d}",
            "555.{:03d}.{:04d}",
            "+1-555-{:03d}-{:04d}",
            "555{:03d}{:04d}"
        ]
        # Pick a random format and fill with random exchange/line numbers
        format_choice = random.choice(formats)
        area = random.randint(100, 999)
        number = random.randint(1000, 9999)
        return format_choice.format(area, number)

    def generate_address(self):
        """Generate a realistic single-line street address."""
        # Faker emits multi-line addresses; flatten newlines to commas
        return self.fake.address().replace('\n', ', ')

    def generate_id_num(self):
        """Generate an ID number in one of several common formats."""
        formats = [
            "{:06d}",        # 6-digit ID
            "{:08d}",        # 8-digit ID
            "ID{:05d}",      # ID prefix
            "STU{:06d}",     # Student ID
            "{:04d}-{:04d}", # Hyphenated
            "A{:07d}",       # Letter prefix
        ]
        format_choice = random.choice(formats)
        # Hyphenated format needs two independent 4-digit groups
        if '-' in format_choice:
            return format_choice.format(
                random.randint(1000, 9999),
                random.randint(1000, 9999)
            )
        else:
            return format_choice.format(random.randint(10000, 9999999))

    def generate_url(self):
        """Generate a personal website URL."""
        # Common personal website domains
        domains = ['github.com', 'linkedin.com', 'portfolio.com',
                   'personal.com', 'website.com']
        username = self.fake.user_name()
        domain = random.choice(domains)
        return f"https://{domain}/{username}"

    def generate_username(self):
        """Generate a username via Faker."""
        return self.fake.user_name()

    def create_synthetic_example(self, pii_type, add_context=True):
        """Create one synthetic (tokens, labels) example with BIO labeling.

        Args:
            pii_type: One of the keys of ``self.templates``.
            add_context: When True, a context prefix/suffix is added with
                70% probability for more natural text.

        Returns:
            Tuple of (tokens, labels) lists of equal length.
        """
        # Generate the PII value and render it into a random template
        pii_value = self.generators[pii_type]()
        template = random.choice(self.templates[pii_type])
        format_key = self.format_keys[pii_type]
        sentence = template.format(**{format_key: pii_value})
        # Optionally add context for more natural text
        if add_context and random.random() > 0.3:
            sentence = self._add_context(sentence)
        # Create tokens and BIO labels
        tokens, labels = self._tokenize_and_label(sentence, pii_value, pii_type)
        return tokens, labels

    def create_mixed_example(self, pii_types, num_pii=2):
        """Create an example containing multiple PII types in one text.

        Args:
            pii_types: Candidate PII type names to sample from.
            num_pii: Maximum number of distinct PII types to include.

        Returns:
            Tuple of (tokens, labels) covering all generated sentences.
        """
        selected_types = random.sample(pii_types, min(num_pii, len(pii_types)))
        all_tokens = []
        all_labels = []
        # Add opening context with 70% probability
        if random.random() > 0.3:
            prefix = random.choice(self.context_prefix)
            all_tokens.extend(prefix.split())
            all_labels.extend(['O'] * len(prefix.split()))
        # Add each PII entity
        for i, pii_type in enumerate(selected_types):
            # Add a connector between PII entities half the time
            if i > 0 and random.random() > 0.5:
                connector = random.choice(self.connectors)
                all_tokens.extend(connector.strip().split())
                all_labels.extend(['O'] * len(connector.strip().split()))
            # Generate PII example (no extra context inside the mix)
            tokens, labels = self.create_synthetic_example(pii_type, add_context=False)
            all_tokens.extend(tokens)
            all_labels.extend(labels)
        # Add closing context with 70% probability
        if random.random() > 0.3:
            suffix = random.choice(self.context_suffix)
            all_tokens.extend(suffix.split())
            all_labels.extend(['O'] * len(suffix.split()))
        return all_tokens, all_labels

    def _add_context(self, sentence):
        """Wrap *sentence* with random prefix/suffix phrases (50% each)."""
        if random.random() > 0.5:
            sentence = random.choice(self.context_prefix) + " " + sentence
        if random.random() > 0.5:
            sentence = sentence + " " + random.choice(self.context_suffix)
        return sentence

    def _tokenize_and_label(self, sentence, pii_value, pii_type):
        """Tokenize *sentence* and BIO-label the span matching *pii_value*.

        Matching is tolerant of punctuation that a template glues onto a
        token (e.g. the "Tag me @{username}" template yields the token
        "@jdoe", which a strict equality check would miss, silently leaving
        the PII unlabeled). Comparison is case-insensitive after stripping
        punctuation from both ends of each token.

        Returns:
            Tuple of (tokens, labels); labels stay all-'O' if no match.
        """
        punct = '.,;:!?@#()[]{}<>"\''

        def norm(tok):
            # Normalize a token for comparison: drop edge punctuation, lowercase.
            return tok.strip(punct).lower()

        tokens = sentence.split()
        labels = ['O'] * len(tokens)
        pii_tokens = pii_value.split()
        if not pii_tokens:
            return tokens, labels
        target = [norm(t) for t in pii_tokens]
        # Slide a window over the sentence looking for the PII span
        for i in range(len(tokens) - len(pii_tokens) + 1):
            window = tokens[i:i + len(pii_tokens)]
            if window == pii_tokens or [norm(t) for t in window] == target:
                # Apply BIO tagging: B- on the first token, I- on the rest
                labels[i] = f'B-{pii_type}'
                for j in range(1, len(pii_tokens)):
                    labels[i + j] = f'I-{pii_type}'
                break
        return tokens, labels

    def augment_dataset(self, original_data, target_samples_per_class=1000, mix_ratio=0.3):
        """Augment dataset with synthetic examples to balance PII classes.

        Args:
            original_data: DataFrame with 'tokens' and 'labels' columns
                (lists of strings per row).
            target_samples_per_class: Desired token count per PII class.
            mix_ratio: Fraction of generated examples that mix several
                PII types in one text.

        Returns:
            Shuffled DataFrame combining original and synthetic rows.
        """
        # Check current distribution
        label_counts = self._analyze_label_distribution(original_data)
        print("\nOriginal label distribution:")
        self._print_distribution(label_counts)
        # Generate synthetic data to fill each class up to the target
        synthetic_tokens, synthetic_labels = self._generate_synthetic_data(
            label_counts, target_samples_per_class, mix_ratio
        )
        # Add some non-PII examples for balance
        synthetic_tokens, synthetic_labels = self._add_non_pii_examples(
            synthetic_tokens, synthetic_labels
        )
        # Combine original and synthetic data
        augmented_df = self._combine_and_shuffle(
            original_data, synthetic_tokens, synthetic_labels
        )
        # Check new distribution
        new_label_counts = self._analyze_label_distribution(augmented_df)
        print("\nAugmented label distribution:")
        self._print_distribution(new_label_counts)
        return augmented_df

    def _analyze_label_distribution(self, data):
        """Count occurrences of each base PII label (B-/I- prefix removed)."""
        label_counts = Counter()
        for labels in data['labels']:
            for label in labels:
                if label != 'O':
                    # Remove B- or I- prefix to get the base label
                    base_label = label.split('-')[1] if '-' in label else label
                    label_counts[base_label] += 1
        return label_counts

    def _print_distribution(self, label_counts):
        """Print label counts and percentages, most common first."""
        total = sum(label_counts.values())
        for label, count in label_counts.most_common():
            percentage = (count / total * 100) if total > 0 else 0
            print(f"  {label:15} : {count:6,} ({percentage:5.2f}%)")

    def _generate_synthetic_data(self, label_counts, target_samples, mix_ratio):
        """Generate synthetic examples to top each PII class up to target.

        Returns:
            Tuple of (list of token lists, list of label lists).
        """
        synthetic_tokens = []
        synthetic_labels = []
        for pii_type in self.templates.keys():
            current_count = label_counts.get(pii_type, 0)
            needed = max(0, target_samples - current_count)
            if needed == 0:
                continue
            print(f"\nGenerating {needed} synthetic examples for {pii_type}")
            # Split `needed` into single-PII and mixed-PII examples.
            single_count = int(needed * (1 - mix_ratio))
            # Use the remainder so counts always sum to `needed`
            # (two independent int() truncations could drop one example).
            mixed_count = needed - single_count
            for _ in range(single_count):
                tokens, labels = self.create_synthetic_example(pii_type)
                synthetic_tokens.append(tokens)
                synthetic_labels.append(labels)
            for _ in range(mixed_count):
                # Make sure the current PII type is always included
                other_types = [t for t in self.templates.keys() if t != pii_type]
                selected_types = [pii_type] + random.sample(
                    other_types, min(1, len(other_types))
                )
                tokens, labels = self.create_mixed_example(selected_types, num_pii=2)
                synthetic_tokens.append(tokens)
                synthetic_labels.append(labels)
        return synthetic_tokens, synthetic_labels

    def _add_non_pii_examples(self, synthetic_tokens, synthetic_labels):
        """Append ~10% examples without PII (all-'O' labels) for balance."""
        num_non_pii = int(len(synthetic_tokens) * 0.1)
        for _ in range(num_non_pii):
            # Generate random filler text without PII
            sentence = self.fake.text(max_nb_chars=100)
            tokens = sentence.split()
            labels = ['O'] * len(tokens)
            synthetic_tokens.append(tokens)
            synthetic_labels.append(labels)
        return synthetic_tokens, synthetic_labels

    def _combine_and_shuffle(self, original_data, synthetic_tokens, synthetic_labels):
        """Combine original and synthetic rows into one shuffled DataFrame."""
        all_tokens = original_data['tokens'].tolist() + synthetic_tokens
        all_labels = original_data['labels'].tolist() + synthetic_labels
        augmented_data = pd.DataFrame({
            'tokens': all_tokens,
            'labels': all_labels
        })
        # Shuffle deterministically so runs are reproducible
        augmented_data = augmented_data.sample(frac=1, random_state=42).reset_index(drop=True)
        print(f"\nTotal augmented samples: {len(augmented_data):,}")
        return augmented_data
def calculate_class_weights(data, label_vocab):
    """Compute per-class loss weights from inverse label frequency.

    Args:
        data: Mapping/DataFrame with a 'labels' entry holding lists of
            label strings per example.
        label_vocab: Vocabulary with a ``word2idx`` dict (lowercase label
            -> class id, unknown labels fall back to id 0) supporting
            ``len()``.

    Returns:
        1-D float tensor of length ``len(label_vocab)``; rare classes get
        larger weights, clamped to [0.1, 10.0], with index 0 (padding)
        forced to 0.
    """
    # Tally how often each class id occurs across every label sequence.
    id_frequencies = Counter(
        label_vocab.word2idx.get(tag.lower(), 0)
        for sequence in data['labels']
        for tag in sequence
    )
    num_classes = len(label_vocab)
    total = sum(id_frequencies.values())
    weights = torch.zeros(num_classes)
    for class_id, freq in id_frequencies.items():
        if freq > 0:
            # Inverse-frequency weighting: rarer classes weigh more.
            weights[class_id] = total / (num_classes * freq)
    # Rescale so the weights sum to num_classes, then bound extremes.
    weights = weights / weights.sum() * num_classes
    weights = torch.clamp(weights, min=0.1, max=10.0)
    # Padding tokens (id 0) must never contribute to the loss.
    weights[0] = 0.0
    return weights
if __name__ == '__main__':
    # Example usage: load the raw training set, balance it with synthetic
    # PII examples, and write the augmented set back out as JSON lines.
    print("Loading original training data...")
    source_df = pd.read_json('train.json')
    print(f"Original dataset size: {len(source_df):,}")

    # Fixed seed keeps augmentation runs reproducible.
    augmenter = PIIDataAugmenter(seed=42)

    banner = "=" * 60
    print("\n" + banner)
    print("Starting data augmentation...")
    print(banner)

    augmented = augmenter.augment_dataset(
        source_df,
        target_samples_per_class=2000,
        mix_ratio=0.3,
    )

    # Persist one JSON record per line.
    output_path = './train_augmented.json'
    augmented.to_json(output_path, orient='records', lines=True)
    print(f"\nSaved augmented data to {output_path}")