# s3shastra / deep_ml_engine.py
# Author: Atharv834
# Deploy S3Shastra backend - FastAPI + scanners + ML models (commit 6a4dcb6)
import os
import joblib
import re
try:
from datasets import load_dataset, Dataset
DATASETS_AVAILABLE = True
except ImportError:
DATASETS_AVAILABLE = False
try:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
import torch
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
# ----------------- DEEP SCAN TRAINING DATA -----------------
# STRICT Sensitive Keywords for Deep Content Scans.
# Each phrase serves two roles in this module:
#   1. a positive (label=1) training sample in build_dataset_synthetic(), and
#   2. an explicit trigger in get_deep_trigger_explanation(), matched via the
#      whole-word regexes pre-compiled into COMPILED_REGEXES.
# Entries are matched case-insensitively (lowercased at the use sites).
SENSITIVE_KEYWORDS = [
"internal use only",
"confidential",
"strictly private",
"personal & confidential",
"private",
"restricted",
"internal",
"not for distribution",
"do not share",
"proprietary",
"trade secret",
"classified",
"sensitive",
"bank statement",
"invoice",
"salary",
"contract",
"agreement",
"non disclosure",
"passport",
"social security",
"ssn",
"date of birth",
"credit card",
"identity",
"id number",
"company confidential",
"staff only",
"management only",
"internal only"
]
def build_dataset_synthetic():
    """Build the deep-scan training corpus as parallel (texts, labels) lists.

    Returns:
        tuple[list[str], list[int]]: ``X`` holds text samples, ``y`` the
        binary labels — 1 for sensitive/PII content, 0 for benign.

    Sources, in order:
      1. Every phrase in SENSITIVE_KEYWORDS as a positive sample.
      2. A small fixed list of benign filename-style words as negatives.
      3. Up to 30,000 rows of the HuggingFace nvidia/Nemotron-PII dataset,
         when the ``datasets`` package is installed. Loading is best-effort:
         any download or schema error is reported and skipped so the
         keyword-only dataset is still returned.
    """
    X, y = [], []
    # 1. Add strict sensitive keywords directly as positive examples.
    for kw in SENSITIVE_KEYWORDS:
        X.append(kw.lower())
        y.append(1)
    # 2. Add safe standard words to offset generic text (no longer huge
    #    dictionaries of random safe words).
    benign_words = [
        "app", "main", "index", "style", "script", "logo", "banner", "test", "data", "assets", "public",
        "docs", "src", "build", "report", "presentation", "meeting", "minutes", "faq", "help", "support",
        "contact", "about", "info", "general", "misc", "other", "unknown", "untitled", "new", "old"
    ]
    for bw in benign_words:
        X.append(bw)
        y.append(0)
    # 3. Integrate the HuggingFace Nemotron-PII dataset if the package is installed.
    if DATASETS_AVAILABLE:
        try:
            print("Downloading/Loading the FULL nvidia/Nemotron-PII dataset for deep training...")
            dataset = load_dataset('nvidia/Nemotron-PII', split='train')
            # The dataset often has 'text' or 'tokens' and a 'labels' or
            # 'pii_spans' column. We add samples containing PII spans
            # (positive) and those without (negative).
            # Cap at 30,000 samples to keep laptop memory from ballooning
            # during vectorization.
            max_samples = 30000
            count = 0
            for row in dataset:
                # FIX: was `count > 30000`, which admitted 30,001 samples.
                if count >= max_samples:
                    break
                has_pii = False
                text = ""
                if 'tokens' in row and 'labels' in row:
                    # Token/label pairs are the common NER layout: any label
                    # other than 0 / 'O' marks a PII token.
                    text = " ".join(row['tokens'])
                    has_pii = any(
                        label != 0 and str(label) != 'O'
                        for label in row['labels']
                    )
                elif 'text' in row:
                    text = row['text']
                    # Look for span lists or standard NER format.
                    # FIX: `or []` also guards rows that store an explicit
                    # None, which would previously raise inside len() and
                    # abort the entire dataset load via the except below.
                    has_pii = (
                        len(row.get('spans') or []) > 0
                        or len(row.get('labels') or []) > 0
                    )
                if not text:
                    continue
                # Cap length to avoid massive feature spaces in TF-IDF.
                X.append(text[:1000])
                y.append(1 if has_pii else 0)
                count += 1
            print(f"Successfully loaded {count} HuggingFace dataset samples.")
        except Exception as e:
            print(f"Skipping HuggingFace dataset loading due to error: {e}")
    return X, y
# ----------------- LOGIC -----------------
# Whole-word regex per keyword, keyed by the lowercased keyword. The
# (?:^|[^a-z0-9]) / (?:[^a-z0-9]|$) guards require a non-alphanumeric
# boundary on each side, so e.g. "ssn" will not match inside "assns".
# NOTE(review): no entry in SENSITIVE_KEYWORDS starts with '.', so the
# filter below is currently a no-op — presumably carried over from an
# extension-based (".pdf"-style) keyword list; confirm before removing.
COMPILED_REGEXES = {
kw.lower(): re.compile(r'(?:^|[^a-z0-9])' + re.escape(kw.lower()) + r'(?:[^a-z0-9]|$)')
for kw in SENSITIVE_KEYWORDS
if not kw.startswith('.')
}
def get_deep_trigger_explanation(text_chunk):
    """Name the explicit keyword that triggered a sensitive-content hit.

    Runs a cheap lowercase substring pre-check per keyword, then confirms
    with the pre-compiled whole-word regex so partial-word hits are
    rejected. Keywords beginning with '.' (extension-style entries) are
    accepted on the substring match alone.

    Returns the matched keyword in its original casing, or a generic
    AI-analysis message when no explicit keyword is present.
    """
    haystack = text_chunk.lower()
    for keyword in SENSITIVE_KEYWORDS:
        needle = keyword.lower()
        # Fast path: skip the regex entirely when the raw substring
        # is absent.
        if needle not in haystack:
            continue
        if needle.startswith('.'):
            return keyword
        pattern = COMPILED_REGEXES.get(needle)
        if pattern is not None and pattern.search(haystack):
            return keyword
    return "Sensitive content detected by AI analysis"
def train_deep_model(model_path):
    """Fine-tune a DistilBERT binary classifier on the deep-scan dataset.

    Args:
        model_path (str): Destination for the saved model. HuggingFace
            models save as directories, so a legacy '.joblib' suffix is
            rewritten to a '_hf' directory.

    Builds the dataset via build_dataset_synthetic(), caps it at 1,000
    samples for a quick local CPU fine-tune, trains for one epoch, and
    saves both model and tokenizer to ``model_path``. Prints an error and
    returns early if the required libraries are missing.
    """
    if not TRANSFORMERS_AVAILABLE:
        print("Error: The 'transformers' library is required to train the Deep ML Model.")
        print("Please run: pip install transformers torch")
        return
    # BUG FIX: Dataset.from_dict below comes from the `datasets` package,
    # which is import-guarded separately from `transformers`. Without this
    # check, having transformers but not datasets raised a NameError.
    if not DATASETS_AVAILABLE:
        print("Error: The 'datasets' library is required to train the Deep ML Model.")
        print("Please run: pip install datasets")
        return
    print("Building Deep Scanner dataset...")
    X, y = build_dataset_synthetic()
    # HuggingFace Transformer models take significantly longer to train, so
    # cap the dataset at an absolute maximum of 1,000 samples for a quick
    # local CPU-style fine-tune; otherwise training takes literally hours.
    X = X[:1000]
    y = y[:1000]
    print(f"Deep Scanner Dataset generated: {len(X)} samples for fine-tuning.")
    # Ensure model path is a directory for HuggingFace.
    if model_path.endswith('.joblib'):
        model_path = model_path.replace('.joblib', '_hf')
    os.makedirs(model_path, exist_ok=True)
    print("Initializing HuggingFace DistilBERT Transformer...")
    model_name = "distilbert-base-uncased"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # num_labels=2: binary sensitive/benign classification head.
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)
    # Create HuggingFace dataset structures.
    hf_dataset = Dataset.from_dict({"text": X, "label": y})
    def tokenize_function(examples):
        # Pad/truncate to DistilBERT's 512-token context window.
        return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=512)
    tokenized_dataset = hf_dataset.map(tokenize_function, batched=True)
    training_args = TrainingArguments(
        output_dir=os.path.join(model_path, "checkpoints"),
        learning_rate=2e-5,
        per_device_train_batch_size=8,
        num_train_epochs=1,
        weight_decay=0.01,
        # No intermediate checkpoints: the model is saved explicitly below.
        save_strategy="no",
        logging_steps=10
    )
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
    )
    print("Beginning Transformer Fine-Tuning (This will take a bit of time)...")
    trainer.train()
    print("Training complete! Saving transformer locally...")
    model.save_pretrained(model_path)
    tokenizer.save_pretrained(model_path)
    print(f"Deep Model successfully saved to -> {model_path}.")