# DuckDuckGoBackend / bot_detector_api.py
# sheltonmaharesh's picture
# Deploy backend Flask app
# 70dd390 verified
import numpy as np
import pandas as pd
import joblib
import shap
import traceback
from flask import Flask, request, jsonify
from urllib.parse import urlparse, parse_qs
# Initialize Flask
# Module-level application object; the route decorators in this file
# (`@app.get`, `@app.post`) register their handlers on it.
# NOTE(review): the string passed here is used as Flask's import name, which
# Flask's documentation recommends setting to `__name__` — confirm that
# "Bot detector" is intentional.
app = Flask("Bot detector")
@app.get('/')
def home():
    """Landing route: answer ``GET /`` with a fixed welcome banner."""
    welcome_banner = "✅ Welcome to the Bot Prediction API!"
    return welcome_banner
# Load models and utilities
# Every artifact below is read once, at import time, from a relative path.
# NOTE(review): joblib.load unpickles arbitrary Python objects — these files
# must only ever come from a trusted source.

# Classifier queried through `predict_proba` / `classes_` in the predict route.
model = joblib.load("model.joblib")
# Looked up by column name; each entry exposes `transform` / `inverse_transform`.
encoders = joblib.load("encoders.joblib")
# Applied to the encoded feature frame before the two anomaly models.
scaler = joblib.load("scaler.joblib")
# Anomaly models, used through `decision_function` on the scaled features.
# (presumably an Isolation Forest and a one-class SVM, judging by the names —
# TODO confirm)
if_model = joblib.load("best_if_model.joblib")
svm_model = joblib.load("best_svm_model.joblib")
# Applied to the raw anomaly scores; the result is subtracted from 1 to obtain
# the `iso_anomaly_prob` / `svm_anomaly_prob` features.
iso_scaler = joblib.load("iso_scaler.joblib")
svm_scaler = joblib.load("svm_scaler.joblib")
# Column names (and order) of the frame handed to `model`.
feature_names = joblib.load("feature_names.joblib")
# SHAP explainer built around the classifier; used for per-request explanations.
explainer = shap.TreeExplainer(model)
def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` of *x*.

    Works on scalars and on NumPy arrays (element-wise).

    The naive formula evaluates ``exp(-x)``, which overflows for large
    negative *x* (a ``RuntimeWarning``, or a ``FloatingPointError`` under a
    strict ``np.errstate``).  This version only ever exponentiates a
    non-positive number:

    * ``x >= 0``: ``1 / (1 + exp(-x))``
    * ``x <  0``: ``exp(x) / (1 + exp(x))``

    Both branches are mathematically identical to the original expression;
    results for negative inputs may differ from it in the last floating-point
    digit.
    """
    # exp(-|x|) lies in (0, 1], so it can never overflow.
    decay = np.exp(-np.abs(x))
    # Numerator is 1 on the non-negative branch and exp(x) on the negative one.
    numerator = np.where(x >= 0, 1.0, decay)
    return numerator / (1.0 + decay)
def parse_url_params(url):
    """Flatten the query string of *url* into a ``{name: value}`` dict.

    When a parameter occurs more than once only its first value is kept.
    Any failure while parsing (malformed URL, non-string input, ...) yields
    an empty dict instead of an exception.
    """
    flattened = {}
    try:
        parsed_query = parse_qs(urlparse(url).query)
        for name, values in parsed_query.items():
            # parse_qs groups repeated parameters into a list; keep the first.
            flattened[name] = values[0] if isinstance(values, list) else values
    except Exception:
        return {}
    return flattened
def prepare_features(row_dict):
    """Turn one request payload into the single-row frame fed to ``model``.

    Steps:

    1. Read ``region`` / ``browser`` / ``device`` from *row_dict* (default
       ``"unknown"``) and merge in the query-string parameters of the URL
       stored under ``d``; query-string values win on a name clash.
    2. Fill every remaining column of ``feature_names`` (except the two
       anomaly columns) with ``"unknown"``.
    3. Encode columns that have an entry in ``encoders``; coerce all other
       columns to numbers (unparseable values become 0).
    4. Scale the encoded features, score them with the two anomaly models,
       and store ``1 - scaled_score`` as ``iso_anomaly_prob`` and
       ``svm_anomaly_prob``.

    Args:
        row_dict: mapping with optional keys ``region``, ``browser``,
            ``device`` and ``d`` (a URL whose query string carries further
            features).

    Returns:
        A one-row ``pandas.DataFrame`` with exactly the columns in
        ``feature_names``, in that order.  The non-anomaly columns hold the
        encoded (unscaled) values.

    Raises:
        Whatever the encoders, scalers or anomaly models raise for input
        they cannot handle (e.g. an encoder with no ``"unknown"`` class).
    """
    anomaly_cols = ("iso_anomaly_prob", "svm_anomaly_prob")

    base = {
        'region': row_dict.get('region', 'unknown'),
        'browser': row_dict.get('browser', 'unknown'),
        'device': row_dict.get('device', 'unknown'),
    }
    # Query-string parameters are merged over the top-level fields; a
    # parameter literally called 'd' is discarded, as before.
    combined = {**base, **parse_url_params(row_dict.get('d', ''))}
    combined.pop('d', None)

    for col in feature_names:
        if col not in combined and col not in anomaly_cols:
            combined[col] = "unknown"

    df = pd.DataFrame([combined])
    for col in df.columns:
        if col in encoders:
            try:
                df[col] = encoders[col].transform(df[col].astype(str))
            except Exception:
                # Best effort for categories unseen at training time: fall
                # back to the encoder's "unknown" class.  (Was a bare
                # `except:`, which also swallowed KeyboardInterrupt/SystemExit.)
                df[col] = encoders[col].transform(["unknown"])[0]
        else:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)

    # The query string is client-controlled, so it must not decide which
    # columns reach the scaler, nor in which order.
    scaler_cols = getattr(scaler, "feature_names_in_", None)
    if scaler_cols is not None:
        # The scaler recorded its training columns: use exactly those, in
        # exactly that order (a missing column raises KeyError).
        scaler_input = df[list(scaler_cols)]
    else:
        # No recorded names: keep the original column order but drop
        # anything that is neither a base field nor a known feature, plus any
        # client-supplied value masquerading as an anomaly column.
        # NOTE(review): this assumes the scaler was fitted on the base fields
        # plus the non-anomaly entries of `feature_names` — confirm.
        expected = (set(base) | set(feature_names)) - set(anomaly_cols)
        scaler_input = df[[col for col in df.columns if col in expected]]
    df_scaled = scaler.transform(scaler_input)

    iso_score = if_model.decision_function(df_scaled).reshape(-1, 1)
    svm_score = svm_model.decision_function(df_scaled).reshape(-1, 1)
    # The score scalers turn raw decision scores into a value that is flipped
    # with `1 - x`, so that a higher number means "more anomalous"
    # (presumably they map into [0, 1] — TODO confirm).
    iso_prob = float(1 - iso_scaler.transform(iso_score)[0][0])
    svm_prob = float(1 - svm_scaler.transform(svm_score)[0][0])
    df['iso_anomaly_prob'] = iso_prob
    df['svm_anomaly_prob'] = svm_prob

    return df[feature_names]
def generate_shap_bot_attack_paragraph(
    index,
    shap_values,
    X,
    encoders=None,
    class_index=1,
    top_n=10,
    expected_value=None,
):
    """Render a plain-text SHAP explanation for one row of *X*.

    Args:
        index: positional row of *X* (and of *shap_values*) to explain.
        shap_values: SHAP output in any of these layouts:
            a list with one ``(n_samples, n_features)`` array per class;
            a single ``(n_samples, n_features)`` array; or
            a single ``(n_samples, n_features, n_classes)`` array.
        X: feature frame the SHAP values were computed for.
        encoders: optional mapping ``column -> encoder``; when a column has
            an entry, its value is shown decoded via ``inverse_transform``.
        class_index: class whose contributions and base value are reported.
        top_n: number of features (largest absolute contribution first)
            to list.
        expected_value: SHAP base value(s).  Defaults to the module-level
            ``explainer.expected_value``; pass it explicitly to use this
            function without that global.

    Returns:
        The explanation as a multi-line string.
    """
    if expected_value is None:
        expected_value = explainer.expected_value

    # --- per-feature contributions for the requested class -----------------
    if isinstance(shap_values, list):
        row_vals = shap_values[class_index][index]
    else:
        row_vals = shap_values[index]
    contribs = np.asarray(row_vals, dtype=float)
    if contribs.ndim == 2:
        # One value per class for every feature.  Select the SAME class the
        # base value is taken from; the previous code always took class 0
        # here, which reports the wrong class (and flips every sign for a
        # binary model).
        class_col = class_index if contribs.shape[1] > class_index else 0
        contribs = contribs[:, class_col]
    shap_scalar_vals = contribs.tolist()

    # --- base value for the requested class --------------------------------
    base_val = expected_value
    if not np.isscalar(base_val):
        per_class = np.asarray(base_val, dtype=float).ravel()
        base_val = per_class[class_index] if per_class.size > class_index else per_class[0]
    base_val = float(base_val)

    # --- human-readable feature values -------------------------------------
    x_vals = X.iloc[index]
    columns = list(X.columns)
    shown_vals = []
    for col in columns:
        raw = x_vals[col]
        shown = raw
        if encoders and col in encoders:
            try:
                shown = encoders[col].inverse_transform([int(raw)])[0]
            except Exception:
                # Best effort: if the value cannot be decoded, display the
                # raw encoded value.  (Was a bare `except:`.)
                shown = raw
        shown_vals.append(shown)

    ranked = sorted(
        zip(columns, shown_vals, shap_scalar_vals),
        key=lambda item: abs(item[2]),
        reverse=True,
    )[:top_n]

    positive_impacts = []
    negative_impacts = []
    for fname, fval, sval in ranked:
        line = f" - {fname:20} = {str(fval):<20} contributed {sval:.4f}"
        if sval > 0:
            positive_impacts.append(line)
        elif sval < 0:
            negative_impacts.append(line)
        # Contributions of exactly zero are not listed.

    # The prediction sums ALL contributions, not only the top_n shown.
    final_log_odds = base_val + np.sum(shap_scalar_vals)

    parts = [
        "\n==== SHAP Explanation for Bot Attack Classification ====\n",
        f"Base value (log-odds for class 1) : {base_val:.4f}\n",
        f"Predicted log-odds (class 1) : {final_log_odds:.4f}\n\n",
    ]
    if positive_impacts:
        parts.append("🔺 Factors that INCREASED Bot Likelihood:\n" + "\n".join(positive_impacts) + "\n\n")
    if negative_impacts:
        parts.append("🔻 Factors that DECREASED Bot Likelihood:\n" + "\n".join(negative_impacts) + "\n\n")
    parts.append("📝 These features collectively explain the model's decision.\n")
    return "".join(parts)
@app.post('/v1/predict')
def predict():
    """Classify one request as bot or legitimate and explain the decision.

    Expects a JSON object in the request body (see ``prepare_features`` for
    the keys that are read).

    Returns:
        * 200 with ``{"Prediction": "Bot Attack" | "Legitimate",
          "SHAP Explanation": <text>}`` on success;
        * 400 with ``{"error": ...}`` when the body is missing, is not valid
          JSON, or is not a JSON object;
        * 500 with ``{"error": ...}`` when feature preparation, prediction
          or explanation fails.
    """
    # silent=True makes get_json return None instead of raising for a missing
    # or malformed body.  Previously that exception (or the AttributeError
    # from a non-object body) fell into the blanket handler below and was
    # reported as a server error (500) although the client was at fault.
    row = request.get_json(silent=True)
    if not isinstance(row, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    try:
        X = prepare_features(row)
        probs = model.predict_proba(X)[0]
        # Map the most probable column back to its class label.
        pred_label = int(model.classes_[np.argmax(probs)])
        shap_values = explainer.shap_values(X)
        explanation = generate_shap_bot_attack_paragraph(0, shap_values, X, encoders)
        return jsonify({
            "Prediction": "Bot Attack" if pred_label == 1 else "Legitimate",
            "SHAP Explanation": explanation
        })
    except Exception as e:
        # Top-level boundary: log the traceback and answer with a JSON error.
        # NOTE(review): str(e) exposes internal error text to the client —
        # consider a generic message once no consumer depends on it.
        traceback.print_exc()
        return jsonify({"error": str(e)}), 500