# app.py
"""
UEBA Risk Scoring demo (Gradio + Hugging Face Spaces)
- Train an unsupervised anomaly detector (IsolationForest) on historical logs
- Build user baseline profiles (devices, IPs, common country, frequent actions)
- Score new events with a blended risk score (model anomaly + rule signals)

Expected CSV schema for both training and scoring:
    user,timestamp,action,success,country,device,ip
Where:
- user: string identifier
- timestamp: ISO8601 or any pandas-parsable datetime
- action: free-form string (e.g., 'login', 'file_download', 'admin_change')
- success: 1 or 0 (e.g., login success flag; use 1 for non-login actions)
- country: two-letter code or name, free-form string
- device: string identifier
- ip: string identifier

This is a simplified educational demo -- not production security tooling.
"""

import os
import pickle
import json
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import MinMaxScaler
import gradio as gr

ARTIFACT_DIR = "artifacts"
MODEL_PATH = os.path.join(ARTIFACT_DIR, "isolation_forest.pkl")
PROFILE_PATH = os.path.join(ARTIFACT_DIR, "baseline_profiles.json")
SCALER_PATH = os.path.join(ARTIFACT_DIR, "feature_scaler.pkl")
ANOMALY_RANGE_PATH = os.path.join(ARTIFACT_DIR, "anomaly_range.json")
FEATURES_JSON = os.path.join(ARTIFACT_DIR, "features.json")

os.makedirs(ARTIFACT_DIR, exist_ok=True)

FEATURE_COLUMNS = [
    "hour",
    "time_since_last_minutes",
    "failed_login",
    "is_night",
    "location_change",
    "new_device",
    "new_ip",
    "rare_action",
    "impossible_travel",
]
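
# Column order matters: the scaler and IsolationForest are fit on features in
# exactly this order, and scoring reuses the same order.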

RULE_WEIGHTS = {
    "failed_login": 25,
    "is_night": 10,
    "location_change": 20,
    "new_device": 15,
    "new_ip": 10,
    "rare_action": 10,
    "impossible_travel": 25,
}
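
# The weights above sum to 115, so the per-event rule risk is clipped to 100
# when several signals fire together (see score_events).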

# -----------------
# Feature Engineering
# -----------------
def _parse_time(series):
    """Parse a timestamp column; unparsable values become NaT."""
    # errors="coerce" already swallows bad values, so no try/except is needed
    return pd.to_datetime(series, errors="coerce")

def build_baseline_profiles(df: pd.DataFrame):
    """Create per-user baseline: common_country, known_devices, known_ips, action_counts."""
    profiles = {}
    for user, g in df.groupby("user"):
        # Most frequent country is treated as the user's home location
        country_mode = g["country"].mode()
        common_country = country_mode.iloc[0] if not country_mode.empty else None
        devices = sorted(set(g["device"].dropna().astype(str)))
        ips = sorted(set(g["ip"].dropna().astype(str)))
        action_counts = g["action"].value_counts().to_dict()
        # Keys are stored as strings so lookups still work after the
        # JSON round-trip (json.dump stringifies dict keys)
        profiles[str(user)] = {
            "common_country": common_country,
            "devices": devices,
            "ips": ips,
            "action_counts": action_counts,
            "total_actions": int(g.shape[0]),
        }
    return profiles
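
# Example of one baseline entry (illustrative values only):
#   profiles["alice"] = {
#       "common_country": "US",
#       "devices": ["laptop-01"],
#       "ips": ["203.0.113.7"],
#       "action_counts": {"login": 42, "file_download": 3},
#       "total_actions": 45,
#   }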

def extract_features(df: pd.DataFrame, profiles: dict):
    df = df.copy()
    df["timestamp"] = _parse_time(df["timestamp"])
    df.sort_values(["user", "timestamp"], inplace=True)

    # Basic fields
    df["hour"] = df["timestamp"].dt.hour.fillna(0).astype(int)
    df["is_night"] = df["hour"].apply(lambda h: 1 if (h <= 5 or h >= 22) else 0)
    # Normalize the success flag to a lowercase string before comparing; the
    # original mixed-type check (str(x) in ["0", 0, False, "False"]) could
    # never match its non-string entries
    df["failed_login"] = df["success"].apply(
        lambda x: 1 if str(x).strip().lower() in ("0", "false") else 0
    )

    # Time since last event per user
    df["time_since_last_minutes"] = 0.0
    last_time = {}
    for idx, row in df.iterrows():
        u = row["user"]
        t = row["timestamp"]
        if pd.isna(t):
            df.at[idx, "time_since_last_minutes"] = 0.0
        else:
            if u in last_time and not pd.isna(last_time[u]):
                delta = (t - last_time[u]).total_seconds() / 60.0
                df.at[idx, "time_since_last_minutes"] = max(0.0, min(delta, 1440.0))  # clip to 0..1 day
            else:
                # First observed event for this user: treat as a long gap
                df.at[idx, "time_since_last_minutes"] = 1440.0
        last_time[u] = t

    # Profile-derived flags
    df["location_change"] = 0
    df["new_device"] = 0
    df["new_ip"] = 0
    df["rare_action"] = 0
    for idx, row in df.iterrows():
        u = row["user"]
        # Guard against NaN cells: str(NaN) would yield the string "nan" and
        # trip the novelty checks below
        country = str(row["country"]) if pd.notna(row["country"]) else ""
        device = str(row["device"]) if pd.notna(row["device"]) else ""
        ip = str(row["ip"]) if pd.notna(row["ip"]) else ""
        action = str(row["action"]) if pd.notna(row["action"]) else ""
        # Profiles are keyed by string (see build_baseline_profiles)
        prof = profiles.get(str(u), {
            "common_country": None,
            "devices": [],
            "ips": [],
            "action_counts": {},
            "total_actions": 0,
        })
        if prof.get("common_country") and country and country != prof.get("common_country"):
            df.at[idx, "location_change"] = 1
        if device and device not in set(prof.get("devices", [])):
            df.at[idx, "new_device"] = 1
        if ip and ip not in set(prof.get("ips", [])):
            df.at[idx, "new_ip"] = 1
        # An action is "rare" if it makes up <=5% of the user's history
        total = max(1, prof.get("total_actions", 0))
        count = prof.get("action_counts", {}).get(action, 0)
        rarity = count / total
        if rarity <= 0.05:
            df.at[idx, "rare_action"] = 1

    # Impossible travel (simplified): location change with a very short time gap
    df["impossible_travel"] = df.apply(
        lambda r: 1 if (r["location_change"] == 1 and r["time_since_last_minutes"] < 120) else 0,
        axis=1,
    )

    # Keep only the expected columns; fill NaNs in the numeric features only,
    # so the datetime column is left untouched
    feature_df = df[["user", "timestamp"] + FEATURE_COLUMNS].copy()
    feature_df[FEATURE_COLUMNS] = feature_df[FEATURE_COLUMNS].fillna(0)
    return feature_df
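
# Sketch of the feature-building flow (hypothetical file name):
#   hist = pd.read_csv("historical_logs.csv")
#   profiles = build_baseline_profiles(hist)
#   feats = extract_features(hist, profiles)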

# -----------------
# Training & Scoring
# -----------------
REQUIRED_COLS = {"user", "timestamp", "action", "success", "country", "device", "ip"}

def _read_events(path):
    """Load an uploaded CSV (or .xlsx) file and validate the expected schema."""
    if str(path).lower().endswith(".xlsx"):
        df = pd.read_excel(path, engine="openpyxl")
    else:
        df = pd.read_csv(path)
    missing = REQUIRED_COLS - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {sorted(missing)}")
    return df

def train_baseline(csv_file):
    df = _read_events(csv_file)

    # Build profiles and per-event features
    profiles = build_baseline_profiles(df)
    feature_df = extract_features(df, profiles)

    # Fit scaler and model
    X = feature_df[FEATURE_COLUMNS].astype(float).values
    scaler = MinMaxScaler()
    X_scaled = scaler.fit_transform(X)
    iso = IsolationForest(
        n_estimators=200,
        contamination=0.02,  # assume ~2% anomalies in the baseline data
        random_state=42,
        n_jobs=-1,
    )
    iso.fit(X_scaled)

    # Record the training-time score range so new scores can be normalized.
    # Lower decision_function values are more anomalous, so invert the sign.
    decision_scores = iso.decision_function(X_scaled)
    anomaly_raw = -decision_scores
    anom_min = float(np.min(anomaly_raw))
    anom_max = float(np.max(anomaly_raw))

    # Persist artifacts
    with open(MODEL_PATH, "wb") as f:
        pickle.dump(iso, f)
    with open(SCALER_PATH, "wb") as f:
        pickle.dump(scaler, f)
    with open(PROFILE_PATH, "w") as f:
        json.dump(profiles, f)
    with open(ANOMALY_RANGE_PATH, "w") as f:
        json.dump({"min": anom_min, "max": anom_max}, f)
    with open(FEATURES_JSON, "w") as f:
        json.dump(FEATURE_COLUMNS, f)

    summary = {
        "users": len(profiles),
        "events": int(df.shape[0]),
        "features_shape": list(X.shape),
        "anomaly_range": {"min": anom_min, "max": anom_max},
    }
    return "Baseline trained ✅", pd.DataFrame(feature_df.head(10)), json.dumps(summary, indent=2)

def _load_artifacts():
    needed = (MODEL_PATH, SCALER_PATH, PROFILE_PATH, ANOMALY_RANGE_PATH)
    if not all(os.path.exists(p) for p in needed):
        raise RuntimeError("Artifacts not found. Please train the baseline first.")
    with open(MODEL_PATH, "rb") as f:
        iso = pickle.load(f)
    with open(SCALER_PATH, "rb") as f:
        scaler = pickle.load(f)
    with open(PROFILE_PATH, "r") as f:
        profiles = json.load(f)
    with open(ANOMALY_RANGE_PATH, "r") as f:
        anomaly_range = json.load(f)
    return iso, scaler, profiles, anomaly_range

def _blend_risk(anomaly_raw, rule_risk, anomaly_range):
    """Blend the normalized model anomaly score with the rule risk."""
    # Normalize anomaly_raw to 0..100 using the training-time range, which the
    # caller already loaded via _load_artifacts (no need to re-read the JSON
    # file for every event)
    mn, mx = anomaly_range["min"], anomaly_range["max"]
    if mx <= mn:
        anom_norm = 50.0
    else:
        anom_norm = 100.0 * (anomaly_raw - mn) / (mx - mn)
    anom_norm = float(np.clip(anom_norm, 0, 100))
    # Blend: 60% model, 40% rules
    final = 0.6 * anom_norm + 0.4 * rule_risk
    return float(np.clip(final, 0, 100)), float(anom_norm)
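
# Worked example: with a normalized anomaly of 80 and a rule risk of 50,
# the blended score is 0.6 * 80 + 0.4 * 50 = 68.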

def score_events(csv_file):
    iso, scaler, profiles, anomaly_range = _load_artifacts()
    df = _read_events(csv_file)

    feats = extract_features(df, profiles)
    X = feats[FEATURE_COLUMNS].astype(float).values
    X_scaled = scaler.transform(X)
    decision_scores = iso.decision_function(X_scaled)
    anomaly_raw = -decision_scores

    # Compute rule risk and human-readable reasons per event
    rule_risks = []
    reasons = []
    for idx, row in feats.iterrows():
        rr = 0.0
        rs = []
        for k, w in RULE_WEIGHTS.items():
            if row[k] == 1:
                rr += w
                rs.append(f"{k.replace('_', ' ').title()} (+{w})")
        rr = float(np.clip(rr, 0, 100))
        rule_risks.append(rr)
        reasons.append("; ".join(rs) if rs else "None")

    final_scores = []
    anom_norms = []
    for a, rr in zip(anomaly_raw, rule_risks):
        final, anorm = _blend_risk(a, rr, anomaly_range)
        final_scores.append(final)
        anom_norms.append(anorm)

    out = pd.DataFrame({
        "user": feats["user"],
        "timestamp": feats["timestamp"],
        "risk_score": final_scores,
        "model_anomaly": anom_norms,
        "rule_risk": rule_risks,
        "reasons": reasons,
        "failed_login": feats["failed_login"],
        "is_night": feats["is_night"],
        "location_change": feats["location_change"],
        "new_device": feats["new_device"],
        "new_ip": feats["new_ip"],
        "rare_action": feats["rare_action"],
        "impossible_travel": feats["impossible_travel"],
    })
    # Sort by highest risk first
    out.sort_values("risk_score", ascending=False, inplace=True)
    return out
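
# Example (assumes a baseline was trained first; hypothetical file name):
#   scored = score_events("new_events.csv")
#   print(scored.head())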

# -----------------
# Gradio UI
# -----------------
def _uploaded_path(file):
    # Depending on the Gradio version, gr.File may return a file path string
    # or a tempfile-like object with a .name attribute
    return file if isinstance(file, str) else file.name

def ui_train(file):
    if file is None:
        return "Please upload a CSV.", None, None
    status, head_df, summary = train_baseline(_uploaded_path(file))
    return status, head_df, summary

def ui_score(file):
    if file is None:
        return None
    return score_events(_uploaded_path(file))

with gr.Blocks(title="UEBA Risk Scoring (Demo)") as demo:
    gr.Markdown(
        """
# UEBA Risk Scoring (Demo)
Train an unsupervised anomaly detector on historical logs and score new events with a blended risk score.

**Note:** This demo is simplified for illustration; tailor features, weights, and thresholds to your environment.
"""
    )
    with gr.Tab("1) Train Baseline"):
        gr.Markdown("Upload historical logs (CSV) to learn normal behavior.")
        train_file = gr.File(file_types=[".csv", ".xlsx"], label="Training data")
        train_btn = gr.Button("Train Baseline")
        train_status = gr.Markdown()
        train_head = gr.Dataframe(interactive=False)
        train_summary = gr.JSON()
        train_btn.click(ui_train, inputs=[train_file], outputs=[train_status, train_head, train_summary])
    with gr.Tab("2) Score Events"):
        gr.Markdown("Upload new events (CSV) to get risk scores.")
        score_file = gr.File(file_types=[".csv", ".xlsx"], label="Events to score")
        score_btn = gr.Button("Score")
        score_df = gr.Dataframe(interactive=False)
        score_btn.click(ui_score, inputs=[score_file], outputs=[score_df])

if __name__ == "__main__":
    demo.launch()