import numpy as np
import pandas as pd
import joblib
import shap
import traceback
from flask import Flask, request, jsonify
from urllib.parse import urlparse, parse_qs

# Initialize Flask
app = Flask("Bot detector")

@app.route("/")  # index route; the path is assumed, the extracted source omitted the decorator
def home():
    return "✅ Welcome to the Bot Prediction API!"

# Load models and utilities
model = joblib.load("model.joblib")
encoders = joblib.load("encoders.joblib")
scaler = joblib.load("scaler.joblib")
if_model = joblib.load("best_if_model.joblib")
svm_model = joblib.load("best_svm_model.joblib")
iso_scaler = joblib.load("iso_scaler.joblib")
svm_scaler = joblib.load("svm_scaler.joblib")
feature_names = joblib.load("feature_names.joblib")
explainer = shap.TreeExplainer(model)
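
# NOTE: inferred from the code below, not stated in the source. This is a
# hybrid pipeline: an Isolation Forest (if_model) and an SVM-based anomaly
# detector (svm_model) each score the request, their decision scores are
# rescaled into pseudo-probabilities, and those values are appended as the
# "iso_anomaly_prob" / "svm_anomaly_prob" features consumed by the supervised
# model that makes the final Bot Attack / Legitimate call.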

def sigmoid(x):
    return 1 / (1 + np.exp(-x))
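# sigmoid(x) maps log-odds x to a probability in (0, 1); applying it to the
# final log-odds computed further below would yield P(class 1). Illustrative
# only: the extracted code never actually calls this helper.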

def parse_url_params(url):
    try:
        query = urlparse(url).query
        return {k: v[0] if isinstance(v, list) else v for k, v in parse_qs(query).items()}
    except Exception:
        return {}
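# Example (illustrative input): parse_url_params("https://x.example/?a=1&b=2")
# returns {'a': '1', 'b': '2'}. parse_qs yields list values, so the dict
# comprehension in parse_url_params keeps only the first value per key.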

def prepare_features(row_dict):
    base = {
        'region': row_dict.get('region', 'unknown'),
        'browser': row_dict.get('browser', 'unknown'),
        'device': row_dict.get('device', 'unknown'),
        'd': row_dict.get('d', '')
    }
    # Merge query-string parameters parsed out of the URL field 'd'
    query_params = parse_url_params(base['d'])
    combined = {**base, **query_params}
    combined.pop('d', None)
    # Backfill expected features; the two anomaly-probability columns are
    # computed below, so they are skipped here
    for col in feature_names:
        if col not in combined and col not in ["iso_anomaly_prob", "svm_anomaly_prob"]:
            combined[col] = "unknown"
    df = pd.DataFrame([combined])
    # Label-encode known categorical columns; coerce the rest to numeric
    for col in df.columns:
        if col in encoders:
            try:
                df[col] = encoders[col].transform(df[col].astype(str))
            except Exception:
                # Unseen category: fall back to the encoding of "unknown"
                df[col] = encoders[col].transform(["unknown"])[0]
        else:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
    df_scaled = scaler.transform(df)
    # Turn the detectors' decision scores into anomaly probabilities: the
    # scalers map scores into [0, 1], and 1 - score flips them so that
    # higher values mean "more anomalous"
    iso_score = if_model.decision_function(df_scaled).reshape(-1, 1)
    svm_score = svm_model.decision_function(df_scaled).reshape(-1, 1)
    iso_prob = float(1 - iso_scaler.transform(iso_score)[0][0])
    svm_prob = float(1 - svm_scaler.transform(svm_score)[0][0])
    df['iso_anomaly_prob'] = iso_prob
    df['svm_anomaly_prob'] = svm_prob
    return df[feature_names]

def generate_shap_bot_attack_paragraph(index, shap_values, X, encoders=None, class_index=1, top_n=10):
    # Multiclass explainers return a list of per-class SHAP arrays
    if isinstance(shap_values, list):
        shap_vals = shap_values[class_index][index]
        base_val = explainer.expected_value[class_index]
    else:
        shap_vals = shap_values[index]
        base_val = explainer.expected_value
        if not np.isscalar(base_val) and len(np.shape(base_val)) > 0:
            base_val = base_val[class_index] if len(base_val) > class_index else base_val[0]
    shap_scalar_vals = [float(s[0]) if isinstance(s, np.ndarray) else float(s) for s in shap_vals]
    x_vals = X.iloc[index]
    feature_names = X.columns
    # Decode label-encoded values back to their original strings where possible
    decoded_vals = {}
    for col in feature_names:
        val = x_vals[col]
        try:
            if encoders and col in encoders:
                decoded_vals[col] = encoders[col].inverse_transform([int(val)])[0]
            else:
                decoded_vals[col] = val
        except Exception:
            decoded_vals[col] = val
    # Rank features by absolute SHAP contribution and keep the top_n
    feature_contribs = list(zip(feature_names, decoded_vals.values(), shap_scalar_vals))
    feature_contribs = sorted(feature_contribs, key=lambda x: abs(x[2]), reverse=True)[:top_n]
    positive_impacts = []
    negative_impacts = []
    for fname, fval, sval in feature_contribs:
        line = f" - {fname:20} = {str(fval):<20} contributed {sval:.4f}"
        if sval > 0:
            positive_impacts.append(line)
        elif sval < 0:
            negative_impacts.append(line)
    # The prediction in log-odds is the base value plus all SHAP contributions
    final_log_odds = base_val + np.sum(shap_scalar_vals)
    explanation = "\n==== SHAP Explanation for Bot Attack Classification ====\n"
    explanation += f"Base value (log-odds for class 1) : {base_val:.4f}\n"
    explanation += f"Predicted log-odds (class 1)      : {final_log_odds:.4f}\n\n"
    if positive_impacts:
        explanation += "🔺 Factors that INCREASED Bot Likelihood:\n" + "\n".join(positive_impacts) + "\n\n"
    if negative_impacts:
        explanation += "🔻 Factors that DECREASED Bot Likelihood:\n" + "\n".join(negative_impacts) + "\n\n"
    explanation += "📝 These features collectively explain the model's decision.\n"
    return explanation

@app.route("/predict", methods=["POST"])  # endpoint path assumed; the extracted source omitted the decorator
def predict():
    try:
        row = request.get_json()
        X = prepare_features(row)
        probs = model.predict_proba(X)[0]
        pred_label = int(model.classes_[np.argmax(probs)])
        shap_values = explainer.shap_values(X)
        explanation = generate_shap_bot_attack_paragraph(0, shap_values, X, encoders)
        return jsonify({
            "Prediction": "Bot Attack" if pred_label == 1 else "Legitimate",
            "SHAP Explanation": explanation
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({"error": str(e)}), 500
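
# Entry-point sketch (not present in the extracted source). Hugging Face
# Spaces conventionally serve on port 7860; the actual host/port used by
# this Space is an assumption here.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)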