import traceback
from collections.abc import Mapping
from urllib.parse import parse_qs, urlparse

import joblib
import numpy as np
import pandas as pd
import shap
from flask import Flask, jsonify, request

# Initialize Flask
app = Flask("Bot detector")


@app.get('/')
def home():
    """Return the plain-text greeting served at the API root."""
    return "✅ Welcome to the Bot Prediction API!"


# Load models and utilities
# NOTE: joblib artifacts are pickles; only load files that come from a
# trusted source.
model = joblib.load("model.joblib")
encoders = joblib.load("encoders.joblib")
scaler = joblib.load("scaler.joblib")
if_model = joblib.load("best_if_model.joblib")
svm_model = joblib.load("best_svm_model.joblib")
iso_scaler = joblib.load("iso_scaler.joblib")
svm_scaler = joblib.load("svm_scaler.joblib")
feature_names = joblib.load("feature_names.joblib")
explainer = shap.TreeExplainer(model)

# Columns that prepare_features() computes itself from the two anomaly
# detectors; they are never read from the request.
_ANOMALY_FEATURES = ("iso_anomaly_prob", "svm_anomaly_prob")


def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` of *x*."""
    return 1 / (1 + np.exp(-x))


def parse_url_params(url):
    """Extract the query-string parameters of *url* as a flat dict.

    When a parameter is repeated, only its first value is kept.

    Args:
        url: The URL to parse.

    Returns:
        A dict mapping each query parameter name to its first value, or an
        empty dict when *url* cannot be parsed.
    """
    try:
        query = urlparse(url).query
        # parse_qs always maps each name to a non-empty list of values.
        return {name: values[0] for name, values in parse_qs(query).items()}
    except (ValueError, TypeError, AttributeError):
        # Best effort: a malformed or non-string URL contributes no params.
        return {}


def prepare_features(row_dict):
    """Build the single-row feature frame the classifier expects.

    The row is assembled from the ``region``, ``browser`` and ``device``
    fields of *row_dict* plus the query parameters of the URL stored under
    ``d`` (query parameters win on a name clash). Features that are absent
    default to ``"unknown"``. Columns that have an entry in ``encoders`` are
    encoded with it; all other columns are coerced to numbers, with
    unparseable values becoming 0. Two extra columns, ``iso_anomaly_prob``
    and ``svm_anomaly_prob``, are derived from the decision functions of
    ``if_model`` and ``svm_model``.

    Args:
        row_dict: Mapping holding the request fields.

    Returns:
        A one-row ``pandas.DataFrame`` whose columns are ``feature_names``.

    Raises:
        TypeError: If *row_dict* is not a mapping.
    """
    if not isinstance(row_dict, Mapping):
        raise TypeError(
            "row_dict must be a mapping of request fields, "
            f"got {type(row_dict).__name__}"
        )

    combined = {
        'region': row_dict.get('region', 'unknown'),
        'browser': row_dict.get('browser', 'unknown'),
        'device': row_dict.get('device', 'unknown'),
        **parse_url_params(row_dict.get('d', '')),
    }
    combined.pop('d', None)

    # Build the frame from a fixed column list instead of from whatever the
    # request happened to contain: unexpected query parameters, or known
    # ones sent in a different order, must not change what the scaler sees.
    base_columns = [col for col in feature_names if col not in _ANOMALY_FEATURES]
    # Prefer the columns recorded on the fitted scaler when it exposes them.
    # NOTE(review): the fallback assumes the scaler was fitted on the
    # non-anomaly entries of feature_names, in that order — confirm against
    # the training code.
    scaler_columns = list(getattr(scaler, "feature_names_in_", base_columns))
    needed_columns = list(dict.fromkeys([*scaler_columns, *base_columns]))
    df = pd.DataFrame(
        [{col: combined.get(col, "unknown") for col in needed_columns}]
    )

    for col in df.columns:
        if col in encoders:
            encoder = encoders[col]
            try:
                df[col] = encoder.transform(df[col].astype(str))
            except Exception:
                # The encoder objects are opaque here, so any failure
                # (typically an unseen label) falls back to the "unknown"
                # category rather than aborting the request.
                df[col] = encoder.transform(["unknown"])[0]
        else:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)

    df_scaled = scaler.transform(df[scaler_columns])

    iso_score = if_model.decision_function(df_scaled).reshape(-1, 1)
    svm_score = svm_model.decision_function(df_scaled).reshape(-1, 1)

    # The rescaled decision scores are inverted (1 - value) to obtain the
    # anomaly columns.
    df['iso_anomaly_prob'] = float(1 - iso_scaler.transform(iso_score)[0][0])
    df['svm_anomaly_prob'] = float(1 - svm_scaler.transform(svm_score)[0][0])

    return df[feature_names]
def _class_contribution(value, class_index):
    """Reduce one feature's SHAP entry to a float for *class_index*."""
    if isinstance(value, np.ndarray) and value.ndim > 0:
        per_class = value.ravel()
        # Per-class entries: pick the requested class, falling back to the
        # first entry when the explainer produced fewer outputs.
        position = class_index if per_class.size > class_index else 0
        return float(per_class[position])
    return float(value)


def generate_shap_bot_attack_paragraph(index, shap_values, X, encoders=None, class_index=1, top_n=10):
    """Render a text explanation of one prediction from its SHAP values.

    Args:
        index: Row position in *X* (and in *shap_values*) to explain.
        shap_values: Output of ``explainer.shap_values(X)``. Either a list
            with one array per class, or a single array indexed by row
            whose per-feature entries are scalars or per-class vectors.
        X: DataFrame of model inputs the SHAP values were computed for.
        encoders: Optional mapping of column name to an encoder exposing
            ``inverse_transform``; used to show original labels instead of
            encoded integers.
        class_index: Class whose contributions and base value are reported.
        top_n: Number of features, ranked by absolute contribution, to list.

    Returns:
        A multi-line string with the base value, the summed prediction and
        the top features that pushed it up or down.
    """
    if isinstance(shap_values, list):
        shap_vals = shap_values[class_index][index]
        base_val = explainer.expected_value[class_index]
    else:
        shap_vals = shap_values[index]
        base_val = explainer.expected_value

    # expected_value may hold one entry per class; pick the matching one.
    if not np.isscalar(base_val) and len(np.shape(base_val)) > 0:
        base_val = base_val[class_index] if len(base_val) > class_index else base_val[0]
    base_val = float(base_val)

    # Use the same class for every feature as for the base value. Taking
    # entry 0 of a per-class vector would report class 0 contributions
    # against the class `class_index` base value.
    shap_scalar_vals = [_class_contribution(s, class_index) for s in shap_vals]

    row = X.iloc[index]
    columns = X.columns

    decoded_vals = []
    for col in columns:
        raw = row[col]
        shown = raw
        if encoders and col in encoders:
            try:
                shown = encoders[col].inverse_transform([int(raw)])[0]
            except Exception:
                # Best effort: show the encoded value when it cannot be
                # mapped back to a label.
                shown = raw
        decoded_vals.append(shown)

    ranked = sorted(
        zip(columns, decoded_vals, shap_scalar_vals),
        key=lambda item: abs(item[2]),
        reverse=True,
    )[:top_n]

    positive_impacts = []
    negative_impacts = []
    for fname, fval, sval in ranked:
        line = f" - {fname:20} = {str(fval):<20} contributed {sval:.4f}"
        if sval > 0:
            positive_impacts.append(line)
        elif sval < 0:
            negative_impacts.append(line)

    final_log_odds = base_val + np.sum(shap_scalar_vals)

    explanation = "\n==== SHAP Explanation for Bot Attack Classification ====\n"
    explanation += f"Base value (log-odds for class {class_index}) : {base_val:.4f}\n"
    explanation += f"Predicted log-odds (class {class_index}) : {final_log_odds:.4f}\n\n"

    if positive_impacts:
        explanation += "🔺 Factors that INCREASED Bot Likelihood:\n" + "\n".join(positive_impacts) + "\n\n"
    if negative_impacts:
        explanation += "🔻 Factors that DECREASED Bot Likelihood:\n" + "\n".join(negative_impacts) + "\n\n"

    explanation += "📝 These features collectively explain the model's decision.\n"
    return explanation


@app.post('/v1/predict')
def predict():
    """Classify one request as bot or legitimate and explain the decision.

    Expects a JSON object in the request body. Responds with the predicted
    label and a SHAP-based text explanation. A body that is missing, is not
    valid JSON, or is not a JSON object yields HTTP 400; any failure during
    prediction yields HTTP 500 with the error message.
    """
    # silent=True turns content-type and parse errors into None, so client
    # mistakes are reported as 400 here instead of surfacing as a 500 below.
    row = request.get_json(silent=True)
    if not isinstance(row, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    try:
        X = prepare_features(row)
        probs = model.predict_proba(X)[0]
        pred_label = int(model.classes_[np.argmax(probs)])

        shap_values = explainer.shap_values(X)
        explanation = generate_shap_bot_attack_paragraph(0, shap_values, X, encoders)

        return jsonify({
            "Prediction": "Bot Attack" if pred_label == 1 else "Legitimate",
            "SHAP Explanation": explanation
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({"error": str(e)}), 500