| import os |
| import joblib |
| import re |
|
|
| try: |
| from datasets import load_dataset, Dataset |
| DATASETS_AVAILABLE = True |
| except ImportError: |
| DATASETS_AVAILABLE = False |
|
|
| try: |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments |
| import torch |
| TRANSFORMERS_AVAILABLE = True |
| except ImportError: |
| TRANSFORMERS_AVAILABLE = False |
|
|
| |
| |
# Keywords whose presence marks text/filenames as sensitive.
# Order matters: get_deep_trigger_explanation() returns the FIRST keyword that
# matches, so more specific phrases (e.g. "internal use only") are listed
# before their broader substrings (e.g. "internal").
# Entries are matched case-insensitively; word-boundary regexes are
# precompiled from this list in COMPILED_REGEXES below.
SENSITIVE_KEYWORDS = [
    "internal use only",
    "confidential",
    "strictly private",
    "personal & confidential",
    "private",
    "restricted",
    "internal",
    "not for distribution",
    "do not share",
    "proprietary",
    "trade secret",
    "classified",
    "sensitive",
    "bank statement",
    "invoice",
    "salary",
    "contract",
    "agreement",
    "non disclosure",
    "passport",
    "social security",
    "ssn",
    "date of birth",
    "credit card",
    "identity",
    "id number",
    "company confidential",
    "staff only",
    "management only",
    "internal only"
]
|
|
def build_dataset_synthetic():
    """Build a (texts, labels) dataset from the strict keyword lists and,
    when available, the HuggingFace nvidia/Nemotron-PII dataset.

    Returns:
        tuple[list[str], list[int]]: parallel lists of text samples and
        binary labels (1 = sensitive/PII, 0 = benign).
    """
    X, y = [], []

    # Seed with the curated sensitive keywords as positive examples.
    for kw in SENSITIVE_KEYWORDS:
        X.append(kw.lower())
        y.append(1)

    # Common benign file/document words as negative examples.
    benign_words = [
        "app", "main", "index", "style", "script", "logo", "banner", "test", "data", "assets", "public",
        "docs", "src", "build", "report", "presentation", "meeting", "minutes", "faq", "help", "support",
        "contact", "about", "info", "general", "misc", "other", "unknown", "untitled", "new", "old"
    ]
    X.extend(benign_words)
    y.extend([0] * len(benign_words))

    # Optionally augment with real PII-annotated samples from HuggingFace.
    if DATASETS_AVAILABLE:
        try:
            print("Downloading/Loading the FULL nvidia/Nemotron-PII dataset for deep training...")
            dataset = load_dataset('nvidia/Nemotron-PII', split='train')

            max_samples = 30000  # cap to keep dataset construction tractable
            count = 0
            for row in dataset:
                # BUGFIX: original used `count > 30000`, admitting 30,001 samples.
                if count >= max_samples:
                    break

                has_pii = False
                text = ""

                if 'tokens' in row and 'labels' in row:
                    # Token-classification layout: any non-zero / non-"O" label
                    # means the row contains PII.
                    text = " ".join(row['tokens'])
                    for label in row['labels']:
                        if label != 0 and str(label) != 'O':
                            has_pii = True
                            break
                elif 'text' in row:
                    # Span-annotation layout: non-empty spans/labels mean PII.
                    text = row['text']
                    has_pii = bool(row.get('spans')) or bool(row.get('labels'))

                if not text:
                    continue

                # Truncate long rows so downstream tokenization stays bounded.
                X.append(text[:1000])
                y.append(1 if has_pii else 0)
                count += 1

            print(f"Successfully loaded {count} HuggingFace dataset samples.")
        except Exception as e:
            # Best-effort augmentation: the keyword dataset alone is still usable.
            print(f"Skipping HuggingFace dataset loading due to error: {e}")

    return X, y
|
|
| |
|
|
def _compile_keyword_pattern(word):
    """Compile a case-normalized pattern matching *word* only when it is
    delimited by non-alphanumeric characters (or start/end of text)."""
    lowered = word.lower()
    return re.compile(r'(?:^|[^a-z0-9])' + re.escape(lowered) + r'(?:[^a-z0-9]|$)')


# Precompiled boundary-aware patterns, keyed by lowercase keyword.
# Dot-prefixed entries (treated as file extensions by the trigger scanner)
# are skipped here and matched by plain substring containment instead.
COMPILED_REGEXES = {
    kw.lower(): _compile_keyword_pattern(kw)
    for kw in SENSITIVE_KEYWORDS
    if not kw.startswith('.')
}
|
|
def get_deep_trigger_explanation(text_chunk):
    """
    Find the first explicit sensitive-keyword trigger inside a text chunk.

    Returns the matching keyword when one is found; otherwise a generic
    description indicating the ML model (not a keyword) flagged the text.
    """
    haystack = text_chunk.lower()
    for keyword in SENSITIVE_KEYWORDS:
        needle = keyword.lower()
        # Cheap containment test first; the boundary regex only runs on hits.
        if needle not in haystack:
            continue
        # Dot-prefixed entries (file extensions) match on containment alone.
        if needle.startswith('.'):
            return keyword
        pattern = COMPILED_REGEXES.get(needle)
        if pattern is not None and pattern.search(haystack):
            return keyword
    return "Sensitive content detected by AI analysis"
|
|
def train_deep_model(model_path: str) -> None:
    """Fine-tune a DistilBERT sequence classifier on the sensitivity dataset.

    Builds the keyword + Nemotron-PII dataset via build_dataset_synthetic(),
    keeps only the first 1000 samples, fine-tunes for one epoch, and saves
    the model and tokenizer to ``model_path``.

    Args:
        model_path: Destination path for the saved model. A ``.joblib``
            suffix is rewritten to a ``_hf`` directory, since HuggingFace
            models are saved as directories rather than pickles.

    Returns:
        None. Progress is reported via print(); exits early if the
        ``transformers`` library is not installed.
    """
    if not TRANSFORMERS_AVAILABLE:
        print("Error: The 'transformers' library is required to train the Deep ML Model.")
        print("Please run: pip install transformers torch")
        return

    print("Building Deep Scanner dataset...")
    X, y = build_dataset_synthetic()

    # NOTE(review): only the first 1000 samples are kept even though up to
    # ~30k may have been loaded — presumably to keep fine-tuning fast on
    # modest hardware; confirm this cap is intentional.
    X = X[:1000]
    y = y[:1000]

    print(f"Deep Scanner Dataset generated: {len(X)} samples for fine-tuning.")

    # Convert a scikit-style .joblib target into an HF-style directory name.
    if model_path.endswith('.joblib'):
        model_path = model_path.replace('.joblib', '_hf')

    os.makedirs(model_path, exist_ok=True)

    print("Initializing HuggingFace DistilBERT Transformer...")
    model_name = "distilbert-base-uncased"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # num_labels=2: binary classification (sensitive vs. benign).
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

    # Wrap the raw python lists so Trainer can consume them.
    hf_dataset = Dataset.from_dict({"text": X, "label": y})

    def tokenize_function(examples):
        # Pad/truncate every sample to DistilBERT's 512-token limit.
        return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=512)

    tokenized_dataset = hf_dataset.map(tokenize_function, batched=True)

    training_args = TrainingArguments(
        output_dir=os.path.join(model_path, "checkpoints"),
        learning_rate=2e-5,
        per_device_train_batch_size=8,
        num_train_epochs=1,
        weight_decay=0.01,
        save_strategy="no",  # no mid-training checkpoints; weights saved manually below
        logging_steps=10
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
    )

    print("Beginning Transformer Fine-Tuning (This will take a bit of time)...")
    trainer.train()

    # Persist both model weights and tokenizer so the directory is
    # self-contained for later from_pretrained() loading.
    print("Training complete! Saving transformer locally...")
    model.save_pretrained(model_path)
    tokenizer.save_pretrained(model_path)

    print(f"Deep Model successfully saved to -> {model_path}.")
|
|
|
|