import logging
from collections.abc import Mapping
from datetime import datetime

import numpy as np
import pandas as pd
from sqlalchemy.orm import Session

from app.db.session import SessionLocal
from app.db.models import Job, CustomerProfile, Insight
from app.pipeline.schema_detection import detect_schema
from app.pipeline.prediction import predict_hvr
from app.pipeline.rfm_extraction import extract_rfm
from app.pipeline.segmentation import score_and_segment
from app.pipeline.clv import compute_clv
from app.pipeline.anomaly import detect_anomalies
from app.pipeline.insights import generate_all_insights

logger = logging.getLogger(__name__)

# Jobs with fewer scored customers than this skip the HVR model and get
# empty (None) hvr_probability / hvr_potential columns instead.
HVR_MIN_CUSTOMERS = 100

# Columns taken from the CLV frame and left-joined onto the scored frame.
_CLV_COLUMNS = [
    "CustomerID",
    "clv_12months",
    "clv_segment",
    "prob_alive",
    "predicted_purchases_90d",
]


def standardize_customer_id(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise the ``CustomerID`` column to stripped strings, in place.

    IDs that were parsed as floats (e.g. ``12345.0``) lose their trailing
    ``.0`` so they compare equal to the same ID parsed as an int/string.
    Only a *trailing* ``.0`` is removed; a ``.0`` elsewhere in the ID is
    part of the ID and is kept (``"A.01"`` stays ``"A.01"``).

    Args:
        df: Frame with a ``CustomerID`` column. It is modified in place.

    Returns:
        The same frame, so calls can be chained/reassigned.
    """
    df["CustomerID"] = (
        df["CustomerID"]
        .astype(str)
        .str.strip()
        .str.replace(r"\.0$", "", regex=True)
    )
    return df


def _optional_float(value):
    """Return ``float(value)``, or ``None`` when *value* is missing/NaN."""
    return float(value) if pd.notna(value) else None


def _optional_str(value):
    """Return ``str(value)``, or ``None`` when *value* is missing/NaN."""
    return str(value) if pd.notna(value) else None


def _build_scored_frame(df_raw: pd.DataFrame) -> pd.DataFrame:
    """Run schema detection, RFM, segmentation, HVR, CLV and anomaly steps.

    Raises:
        ValueError: if schema detection reports missing columns.
    """
    schema = detect_schema(df_raw)
    if not schema["success"]:
        raise ValueError(f"Schema detection failed: {schema['missing']}")
    mapping = schema["mapping"]

    rfm_df = standardize_customer_id(extract_rfm(df_raw, mapping))
    scored_df = standardize_customer_id(score_and_segment(rfm_df))

    if len(scored_df) >= HVR_MIN_CUSTOMERS:
        scored_df = predict_hvr(scored_df)
    else:
        scored_df["hvr_probability"] = None
        scored_df["hvr_potential"] = None

    # IDs are standardised on both sides so the join key formats agree.
    clv_df = standardize_customer_id(compute_clv(df_raw, mapping))
    scored_df = scored_df.merge(clv_df[_CLV_COLUMNS], on="CustomerID", how="left")

    return detect_anomalies(scored_df)


def _build_profile(job_id: str, row: pd.Series) -> CustomerProfile:
    """Map one scored-frame row onto a ``CustomerProfile`` ORM object."""
    return CustomerProfile(
        job_id=job_id,
        customer_id=str(row["CustomerID"]),
        recency=float(row["Recency"]),
        frequency=float(row["Frequency"]),
        monetary=float(row["Monetary"]),
        avg_order_value=float(row.get("AvgOrderValue", 0)),
        total_items=float(row.get("TotalItems", 0)),
        distinct_products=float(row.get("DistinctProducts", 0)),
        tenure_days=float(row.get("TenureDays", 0)),
        avg_items_per_order=float(row.get("AvgItemsPerOrder", 0)),
        r_score=int(row.get("R_score", 0)),
        f_score=int(row.get("F_score", 0)),
        m_score=int(row.get("M_score", 0)),
        segment=str(row.get("Segment", "Unknown")),
        # Columns produced by the left CLV merge / optional HVR step may be
        # absent or NaN; those are stored as NULL.
        clv_12months=_optional_float(row.get("clv_12months")),
        clv_segment=_optional_str(row.get("clv_segment")),
        prob_alive=_optional_float(row.get("prob_alive")),
        predicted_purchases_90d=_optional_float(row.get("predicted_purchases_90d")),
        hvr_probability=_optional_float(row.get("hvr_probability")),
        hvr_potential=_optional_str(row.get("hvr_potential")),
        anomaly_score=float(row.get("anomaly_score", 0)),
        is_anomaly=bool(row.get("is_anomaly", False)),
        anomaly_severity=str(row.get("anomaly_severity", "Normal")),
        anomaly_type=str(row.get("anomaly_type", "Normal")),
    )


def _build_insight(job_id: str, category, content) -> Insight:
    """Map one ``generate_all_insights`` entry onto an ``Insight`` object.

    Mapping payloads supply ``title``/``body``/``priority`` (with defaults);
    any other payload is stored verbatim as the body.
    """
    if isinstance(content, Mapping):
        title = content.get("title", category)
        body = content.get("body", str(content))
        priority = content.get("priority", 3)
    else:
        title, body, priority = category, str(content), 3
    return Insight(
        job_id=job_id,
        category=category,
        title=title,
        body=body,
        priority=priority,
    )


def _mark_job_failed(db: Session, job_id: str, error: Exception) -> None:
    """Best-effort: set the job's status to ``"failed"`` with *error*'s text.

    Problems while recording the failure are logged, not raised, so they
    cannot mask the original pipeline error.
    """
    try:
        # A failed flush/commit leaves the session unusable until it is
        # rolled back; without this the query below would raise.
        db.rollback()
        job = db.query(Job).filter(Job.id == job_id).first()
        if job is None:
            logger.error("Cannot mark job %s as failed: job not found", job_id)
            return
        job.status = "failed"
        job.error_message = str(error)
        db.commit()
    except Exception:
        logger.exception("Could not record failure for job %s", job_id)


def run_full_pipeline(job_id: str, filepath: str):
    """Run the full customer-analytics pipeline for one job.

    Reads the CSV at *filepath*, records its row count on the ``Job`` row,
    scores customers (RFM, segmentation, HVR, CLV, anomalies), then saves
    one ``CustomerProfile`` per customer plus the generated ``Insight``
    rows, and finally marks the job ``"complete"``.

    Any exception is logged with its traceback and recorded on the job
    (``status="failed"``, ``error_message``); it is not re-raised. Rows
    committed before the failure (e.g. profiles) are left in place.

    Args:
        job_id: Primary key of the ``Job`` row to update.
        filepath: Path of the uploaded CSV file.
    """
    db = SessionLocal()
    try:
        df_raw = pd.read_csv(filepath)

        job = db.query(Job).filter(Job.id == job_id).first()
        if job is None:
            raise ValueError(f"Job {job_id} not found")
        job.row_count = len(df_raw)
        db.commit()

        scored_df = _build_scored_frame(df_raw)
        job.customer_count = len(scored_df)
        db.commit()

        profiles = [_build_profile(job_id, row) for _, row in scored_df.iterrows()]
        db.bulk_save_objects(profiles)
        db.commit()

        insights = generate_all_insights(scored_df)
        insight_objects = [
            _build_insight(job_id, category, content)
            for category, content in insights.items()
        ]
        db.bulk_save_objects(insight_objects)
        db.commit()

        job.status = "complete"
        # NOTE(review): naive UTC timestamp; datetime.utcnow() is deprecated
        # since Python 3.12 — switch to datetime.now(timezone.utc) if the
        # completed_at column is timezone-aware.
        job.completed_at = datetime.utcnow()
        db.commit()
        logger.info("Pipeline complete for job %s", job_id)
    except Exception as e:
        logger.exception("Pipeline failed for job %s: %s", job_id, e)
        _mark_job_failed(db, job_id, e)
    finally:
        db.close()