"""Deep-scan training utilities.

Combines a strict keyword trigger list (for explainable hits) with an
optional DistilBERT fine-tune on the nvidia/Nemotron-PII dataset.  The
heavy dependencies (``datasets``, ``transformers``/``torch``) are optional:
availability flags let callers degrade gracefully when they are missing.
"""

import os
import re

# NOTE(review): joblib is imported but not referenced in this module's
# visible code (presumably used by sibling shallow-scan persistence code —
# confirm).  Guarded like the other optional dependencies so a missing
# install degrades gracefully instead of failing at module import.
try:
    import joblib
except ImportError:
    joblib = None

try:
    from datasets import load_dataset, Dataset
    DATASETS_AVAILABLE = True
except ImportError:
    DATASETS_AVAILABLE = False

try:
    from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
    import torch  # noqa: F401 -- required at runtime by the transformers Trainer
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False

# ----------------- DEEP SCAN TRAINING DATA -----------------

# STRICT Sensitive Keywords for Deep Content Scans
SENSITIVE_KEYWORDS = [
    "internal use only", "confidential", "strictly private", "personal & confidential",
    "private", "restricted", "internal", "not for distribution", "do not share",
    "proprietary", "trade secret", "classified", "sensitive",
    "bank statement", "invoice", "salary", "contract", "agreement", "non disclosure",
    "passport", "social security", "ssn", "date of birth", "credit card", "identity",
    "id number", "company confidential", "staff only", "management only", "internal only",
]


def build_dataset_synthetic():
    """Build (X, y) training lists of (text, label) pairs.

    Uses ONLY the custom strict keyword list (positives), a small benign word
    list (negatives), and — when the ``datasets`` package is installed — up to
    30,000 samples from the HuggingFace nvidia/Nemotron-PII dataset.

    Returns:
        tuple[list[str], list[int]]: parallel lists of texts and 0/1 labels
        (1 = sensitive / contains PII).
    """
    X, y = [], []

    # 1. Add strict sensitive keywords directly as positive examples.
    for kw in SENSITIVE_KEYWORDS:
        X.append(kw.lower())
        y.append(1)

    # 2. Add safe standard words to offset generic text (no longer huge
    #    dictionaries of random safe words).
    benign_words = [
        "app", "main", "index", "style", "script", "logo", "banner", "test",
        "data", "assets", "public", "docs", "src", "build", "report",
        "presentation", "meeting", "minutes", "faq", "help", "support",
        "contact", "about", "info", "general", "misc", "other", "unknown",
        "untitled", "new", "old",
    ]
    for bw in benign_words:
        X.append(bw)
        y.append(0)

    # 3. Integrate the HuggingFace Nemotron-PII dataset if the package is
    #    installed.  Any download/parse failure is non-fatal: the synthetic
    #    portion built above is still returned.
    if DATASETS_AVAILABLE:
        try:
            print("Downloading/Loading the FULL nvidia/Nemotron-PII dataset for deep training...")
            dataset = load_dataset('nvidia/Nemotron-PII', split='train')
            # The dataset often has 'text' or 'tokens' and a 'labels' or
            # 'pii_spans' column.  We add samples containing PII spans
            # (positive) and those without (negative).
            count = 0
            for row in dataset:
                # Cap at 30,000 to keep laptop memory from ballooning
                # during vectorization (>= so the cap is exact).
                if count >= 30000:
                    break
                has_pii = False
                text = ""
                # Token/label structure is common in NER HuggingFace datasets.
                if 'tokens' in row and 'labels' in row:
                    text = " ".join(row['tokens'])
                    # Any label other than 0 / 'O' means the row contains PII.
                    for label in row['labels']:
                        if label != 0 and str(label) != 'O':
                            has_pii = True
                            break
                elif 'text' in row:
                    text = row['text']
                    # Fall back to span lists or a standard NER label column.
                    has_pii = len(row.get('spans', [])) > 0 or len(row.get('labels', [])) > 0
                if not text:
                    continue
                X.append(text[:1000])  # cap length to avoid massive TF-IDF feature spaces
                y.append(1 if has_pii else 0)
                count += 1
            print(f"Successfully loaded {count} HuggingFace dataset samples.")
        except Exception as e:
            print(f"Skipping HuggingFace dataset loading due to error: {e}")

    return X, y


# ----------------- LOGIC -----------------

# Pre-compiled word-boundary patterns, one per keyword: the keyword must be
# delimited by non-alphanumerics (or string edges) to count as a hit.
# Keywords beginning with '.' (extension-style triggers, if ever added) are
# matched by plain substring in get_deep_trigger_explanation instead.
COMPILED_REGEXES = {
    kw.lower(): re.compile(r'(?:^|[^a-z0-9])' + re.escape(kw.lower()) + r'(?:[^a-z0-9]|$)')
    for kw in SENSITIVE_KEYWORDS
    if not kw.startswith('.')
}


def get_deep_trigger_explanation(text_chunk):
    """Scan a chunk of text for explicit keyword triggers.

    Args:
        text_chunk: raw text to inspect (any case).

    Returns:
        The first matching keyword (as listed in SENSITIVE_KEYWORDS) when a
        whole-word hit is found; otherwise the generic ML fallback string.
    """
    text_lower = text_chunk.lower()
    for kw in SENSITIVE_KEYWORDS:
        kw_lower = kw.lower()
        # Cheap substring pre-check before the boundary-aware regex.
        if kw_lower in text_lower:
            if kw_lower.startswith('.'):
                # Extension-style keywords match by plain substring.
                return kw
            if COMPILED_REGEXES.get(kw_lower) and COMPILED_REGEXES[kw_lower].search(text_lower):
                return kw
    return "Sensitive content detected by AI analysis"


def train_deep_model(model_path):
    """Fine-tune a DistilBERT classifier on the deep-scan dataset.

    Args:
        model_path: destination path; a trailing '.joblib' is rewritten to a
            '_hf' directory since HuggingFace models save as directories.

    Returns:
        None.  Prints progress; returns early (with instructions) when the
        required optional dependencies are not installed.
    """
    if not TRANSFORMERS_AVAILABLE:
        print("Error: The 'transformers' library is required to train the Deep ML Model.")
        print("Please run: pip install transformers torch")
        return
    # BUG FIX: Dataset.from_dict below comes from the 'datasets' package, so
    # its availability must be checked too (previously a NameError if only
    # transformers was installed).
    if not DATASETS_AVAILABLE:
        print("Error: The 'datasets' library is required to train the Deep ML Model.")
        print("Please run: pip install datasets")
        return

    print("Building Deep Scanner dataset...")
    X, y = build_dataset_synthetic()

    # HuggingFace Transformer models take significantly longer to train, so we
    # cap the dataset at an absolute maximum of 1,000 samples for a quick local
    # CPU-style fine-tune; otherwise training would take literally hours.
    X = X[:1000]
    y = y[:1000]
    print(f"Deep Scanner Dataset generated: {len(X)} samples for fine-tuning.")

    # Ensure the model path is a directory, as HuggingFace saves directories.
    if model_path.endswith('.joblib'):
        model_path = model_path.replace('.joblib', '_hf')
    os.makedirs(model_path, exist_ok=True)

    print("Initializing HuggingFace DistilBERT Transformer...")
    model_name = "distilbert-base-uncased"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

    # Wrap the raw lists in a HuggingFace Dataset and tokenize in batches.
    hf_dataset = Dataset.from_dict({"text": X, "label": y})

    def tokenize_function(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=512)

    tokenized_dataset = hf_dataset.map(tokenize_function, batched=True)

    training_args = TrainingArguments(
        output_dir=os.path.join(model_path, "checkpoints"),
        learning_rate=2e-5,
        per_device_train_batch_size=8,
        num_train_epochs=1,
        weight_decay=0.01,
        save_strategy="no",  # single final save below; no per-step checkpoints
        logging_steps=10,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
    )

    print("Beginning Transformer Fine-Tuning (This will take a bit of time)...")
    trainer.train()

    print("Training complete! Saving transformer locally...")
    model.save_pretrained(model_path)
    tokenizer.save_pretrained(model_path)
    print(f"Deep Model successfully saved to -> {model_path}.")