# NOTE: the three lines above this file's code ("Spaces: Sleeping Sleeping")
# were Hugging Face Spaces page-status residue from a web scrape, not code.
# app.py
import os

import joblib
import numpy as np
import pandas as pd
from flask import Flask, request, jsonify

# -----------------------
# Load serialized artifact
# -----------------------
# The artifact bundles the fitted model together with the metadata needed to
# reproduce the training-time feature pipeline at inference time.
ARTIFACT_PATH = os.environ.get("ARTIFACT_PATH", "model_artifact.joblib")
artifact = joblib.load(ARTIFACT_PATH)
model = artifact["model"]                    # fitted estimator
feature_order = artifact["feature_order"]    # exact column order the model expects
cap_bounds = artifact.get("cap_bounds", {})  # per-column IQR clip bounds (may be absent)

# Optional security (only enforced if API_KEY is set)
API_KEY = os.environ.get("API_KEY", None)

app = Flask(__name__)
def apply_feature_engineering(df_raw: pd.DataFrame) -> pd.DataFrame:
    """Create the engineered columns used at training time.

    Adds:
      - profile_completed_score: Low/Medium/High -> 1/2/3
      - media_exposure_count: count of "Yes" flags across media channels
      - time_per_visit: time_spent_on_website / website_visits (0 when no visits)
      - total_page_views_est: website_visits * page_views_per_visit
    Then drops profile_completed (training used the score instead).

    Args:
        df_raw: raw request frame with the original input columns.

    Returns:
        A new DataFrame (input is not mutated) with engineered columns added.
    """
    df = df_raw.copy()

    # Ordinal encoding of profile completion level. Strip stray whitespace so
    # values like " High" still map; unknown levels become NaN.
    profile_map = {"Low": 1, "Medium": 2, "High": 3}
    df["profile_completed_score"] = (
        df["profile_completed"].astype(str).str.strip().map(profile_map)
    )

    # Media exposure count from the Yes/No channel flags.
    flag_cols = [
        "print_media_type1",
        "print_media_type2",
        "digital_media",
        "educational_channels",
        "referral",
    ]
    yesno_map = {"Yes": 1, "No": 0}
    for c in flag_cols:
        df[c] = df[c].astype(str).str.strip()
    # fillna(0): an unrecognized flag value counts as "not exposed" instead of
    # turning the whole sum into NaN (NaN propagation was the original behavior).
    df["media_exposure_count"] = sum(
        df[c].map(yesno_map).fillna(0) for c in flag_cols
    )

    # Engagement features; coerce so non-numeric strings become NaN.
    df["website_visits"] = pd.to_numeric(df["website_visits"], errors="coerce")
    df["time_spent_on_website"] = pd.to_numeric(df["time_spent_on_website"], errors="coerce")
    df["page_views_per_visit"] = pd.to_numeric(df["page_views_per_visit"], errors="coerce")
    # Guard against division by zero: no visits -> 0 time per visit.
    df["time_per_visit"] = np.where(
        df["website_visits"] > 0,
        df["time_spent_on_website"] / df["website_visits"],
        0,
    )
    df["total_page_views_est"] = df["website_visits"] * df["page_views_per_visit"]

    # Drop the original ordinal source column (training used the score).
    if "profile_completed" in df.columns:
        df = df.drop(columns=["profile_completed"])
    return df
def apply_iqr_capping(df: pd.DataFrame, bounds=None) -> pd.DataFrame:
    """Clip selected numeric columns to training-time IQR bounds.

    Args:
        df: frame whose columns may need capping.
        bounds: optional {column: {"low": float, "high": float}} mapping;
            defaults to the module-level ``cap_bounds`` loaded from the
            artifact (backward-compatible with the original no-arg usage).

    Returns:
        A new DataFrame with each listed column coerced to numeric and
        clipped to [low, high]; columns absent from ``df`` are ignored.
    """
    if bounds is None:
        bounds = cap_bounds  # training-time bounds from the serialized artifact
    df2 = df.copy()
    for col, b in bounds.items():
        if col in df2.columns:
            # Coerce first so string inputs clip correctly (bad values -> NaN).
            df2[col] = pd.to_numeric(df2[col], errors="coerce")
            df2[col] = df2[col].clip(lower=b["low"], upper=b["high"])
    return df2
def validate_required_columns(df: pd.DataFrame) -> None:
    """Raise ValueError if any raw input field the pipeline needs is absent.

    Checks the original (pre-engineering) request columns and reports every
    missing name at once, so a client can repair its payload in one pass.
    """
    required = (
        "age",
        "current_occupation",
        "first_interaction",
        "profile_completed",
        "website_visits",
        "time_spent_on_website",
        "page_views_per_visit",
        "last_activity",
        "print_media_type1",
        "print_media_type2",
        "digital_media",
        "educational_channels",
        "referral",
    )
    present = set(df.columns)
    missing = [name for name in required if name not in present]
    if missing:
        raise ValueError(f"Missing required fields: {missing}")
def build_model_input(df_raw: pd.DataFrame) -> pd.DataFrame:
    """Raw JSON frame -> engineered -> capped -> columns ordered for the model.

    Args:
        df_raw: frame built directly from the request payload.

    Returns:
        DataFrame restricted to ``feature_order`` columns, in that order.

    Raises:
        ValueError: if required raw fields are absent, or if feature
            engineering did not produce every feature the model expects
            (instead of letting ``reindex`` silently insert all-NaN columns).
    """
    validate_required_columns(df_raw)
    df_fe = apply_feature_engineering(df_raw)
    df_fe = apply_iqr_capping(df_fe)
    # Fail fast on missing engineered features; bare reindex would quietly
    # feed NaN columns to the estimator and produce an opaque model error.
    missing = [c for c in feature_order if c not in df_fe.columns]
    if missing:
        raise ValueError(f"Missing engineered features: {missing}")
    # Keep only expected features, in the exact training-time order.
    return df_fe.reindex(columns=feature_order)
def check_api_key(req) -> bool:
    """Return True when the request is authorized.

    Auth is opt-in: with API_KEY unset every request passes; otherwise the
    "x-api-key" header must match. Uses a constant-time comparison so the
    key cannot be probed via response-timing differences.
    """
    if API_KEY is None:
        return True
    import hmac  # local import: only needed when auth is enabled
    supplied = req.headers.get("x-api-key") or ""
    return hmac.compare_digest(supplied, API_KEY)
# -----------------------
# Routes
# -----------------------
@app.route("/health", methods=["GET"])  # was defined but never registered
def health():
    """Liveness probe: always 200 with a static status body."""
    return jsonify({"status": "ok"}), 200
@app.route("/predict", methods=["POST"])  # was defined but never registered
def predict():
    """Score one record (JSON object) or many (JSON array of objects).

    Responses:
        200: {"predictions": [{"converted_prediction", "conversion_probability"}, ...]}
        400: malformed JSON, wrong payload shape, or missing fields
        401: invalid API key (only when API_KEY is configured)
        500: unexpected failure (details echoed for debugging)
    """
    if not check_api_key(request):
        return jsonify({"error": "Unauthorized (invalid API key)"}), 401

    payload = request.get_json(silent=True)
    if payload is None:
        return jsonify({"error": "Invalid JSON"}), 400

    # Support single record (dict) OR multiple records (list of dicts)
    if isinstance(payload, dict):
        records = [payload]
    elif isinstance(payload, list):
        records = payload
    else:
        return jsonify({"error": "Payload must be a dict or list of dicts"}), 400

    df_raw = pd.DataFrame(records)
    try:
        X_in = build_model_input(df_raw)
        # Probability of the positive class when the estimator supports it.
        if hasattr(model, "predict_proba"):
            proba = model.predict_proba(X_in)[:, 1]
        else:
            # Fallback: treat hard labels as degenerate probabilities (0.0/1.0).
            proba = model.predict(X_in).astype(float)
        pred = (proba >= 0.5).astype(int)
        out = [
            {
                "converted_prediction": int(p),
                "conversion_probability": float(pr),
            }
            for p, pr in zip(pred, proba)
        ]
        return jsonify({"predictions": out}), 200
    except ValueError as ve:
        # Validation problems surface as client errors.
        return jsonify({"error": str(ve)}), 400
    except Exception as e:  # top-level route boundary: report, don't crash
        return jsonify({"error": "Internal server error", "details": str(e)}), 500
if __name__ == "__main__":
    # Bind to all interfaces; PORT is injected by the hosting platform
    # (defaults to 7860, the Hugging Face Spaces convention seen here).
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", "7860")), debug=False)