Spaces:
Sleeping
Sleeping
| import traceback | |
| from fastapi import FastAPI | |
| import pickle | |
| import numpy as np | |
| import pandas as pd | |
| import joblib | |
| import re | |
| from datetime import datetime, time | |
| import pytz | |
# Load the trained XGBoost model (joblib-serialized pickle on local disk)
model = joblib.load("xgb_model.pkl")
# Feature names recorded by the trained booster — used at inference time to
# align the engineered columns to the exact training order
model_features = model.get_booster().feature_names
# FastAPI application; root_path="/" means no path prefix is stripped by a proxy
app = FastAPI(root_path="/")
def home():
    """Liveness payload confirming the scoring service is up.

    NOTE(review): no route decorator is visible here — confirm an
    ``@app.get("/")`` wasn't lost when this file was exported.
    """
    status_payload = {"message": "Lead Scoring Model is Live!"}
    return status_payload
def should_send_to_ai_caller(estimated_volume, country):
    """Decide whether a lead should be routed to the AI caller right now.

    The decision depends on the lead's country, its estimated yearly
    transaction volume (TPV), and the CURRENT local time in that country,
    so results vary with the wall clock.

    Parameters:
        estimated_volume: raw TPV string (e.g. "£0 – £35K"); may be None/empty.
        country: lead country name; must be a key of country_tz to qualify.

    Returns:
        dict with "send" (bool) and "max_attempts" (always 3).
        Any unexpected error is logged and treated as "do not send".
    """
    try:
        max_attempts = 3  # Business rule: dial-attempt cap is fixed at 3 in all cases
        # --- Country to timezone mapping (doubles as the eligibility whitelist) ---
        country_tz = {
            "France": "Europe/Paris",
            # NOTE(review): Italy uses Europe/Paris, not Europe/Rome — same UTC
            # offset in practice, but confirm this is intentional
            "Italy": "Europe/Paris",
            "United Kingdom": "Europe/London",
            "Spain": "Europe/Madrid",
            "Germany": "Europe/Berlin"
        }
        # --- Reject any country outside the mapping above ---
        if country not in country_tz:
            print(f"Country not supported for AI routing: {country}")
            return {"send": False, "max_attempts": max_attempts}
        # Resolve current local time in the lead's country. The "UTC" fallback
        # is unreachable here because membership was already checked above.
        timezone_str = country_tz.get(country, "UTC")
        tz = pytz.timezone(timezone_str)
        now = datetime.now(tz)
        current_time = now.time()
        weekday = now.weekday()  # 0 = Monday, 6 = Sunday
        # --- Time Windows (all in the lead's local time) ---
        early_morning = time(7, 0)
        late_morning = time(9, 0)
        evening_start = time(18, 0)
        evening_end = time(21, 0)
        business_start = time(9, 0)
        business_end = time(18, 0)
        weekend_end = time(21, 0)
        is_weekend = weekday >= 5  # Saturday or Sunday

        # Normalize a raw TPV string such as "£0 – £35K" into a canonical
        # "low-high" range string; returns 'None' when missing or unparseable.
        def parse_tpv_range(value):
            if not value or str(value).strip().lower() in {'none', ''}:
                return 'None'
            value = value.upper().replace('K', '000').replace('M', '000000')
            value = re.sub(r'[£$€¥]', '', value)  # strip currency symbols
            value = value.replace(' – ', '-').replace(' ', '').strip()  # en dash -> dash
            if '-' in value:
                low, high = map(int, re.findall(r'\d+', value))
                return f"{low}-{high}"
            elif value.isdigit():
                return f"{value}-{value}"  # single number -> degenerate range
            return 'None'

        parsed_tpv = parse_tpv_range(estimated_volume)
        # --- Rule 1: weekdays — send ALL leads during 7-9 AM or 6-9 PM local time ---
        if not is_weekend and (
            early_morning <= current_time <= late_morning or
            evening_start <= current_time <= evening_end
        ):
            return {"send": True, "max_attempts": max_attempts}
        # --- Rule 2: weekends between 7 AM and 9 PM local time ---
        if early_morning <= current_time <= weekend_end:
            if weekday == 5:  # Saturday: all five supported countries qualify
                if country in {"France", "United Kingdom", "Italy", "Spain", "Germany"}:
                    return {"send": True, "max_attempts": max_attempts}
            elif weekday == 6:  # Sunday: Germany is excluded
                if country in {"France", "United Kingdom", "Italy", "Spain"}:
                    return {"send": True, "max_attempts": max_attempts}
        # --- Rule 3: weekday 9 AM - 6 PM — only low-TPV (or, for FR/UK, unknown-TPV) leads ---
        if not is_weekend and business_start <= current_time <= business_end:
            if country in {"France", "United Kingdom"}:
                if parsed_tpv in {'None', '0-35000'}:  # unknown TPV also qualifies
                    return {"send": True, "max_attempts": max_attempts}
            elif country in {"Italy", "Spain"}:
                if parsed_tpv == '0-35000':  # unknown TPV does NOT qualify here
                    return {"send": True, "max_attempts": max_attempts}
        # Otherwise, don't send
        return {"send": False, "max_attempts": max_attempts}
    except Exception as e:
        # Fail closed: any parsing or timezone error means "do not route to AI caller"
        print(f"[AI Caller Decision Error] {str(e)}")
        return {"send": False, "max_attempts": 3}
# Function to preprocess a single lead
def preprocess_single_lead(data):
    """Transform one raw lead dict into the single-row feature frame the model expects.

    Mirrors the training-time feature engineering:
      * availability flags for contact / source / campaign fields,
      * product-interest defaulting and derived flags,
      * normalization of TPV / turnover strings into "low-high" ranges,
      * binning of the combined volume midpoint into predefined buckets,
      * one-hot encoding, then alignment to ``model_features`` order.

    Parameters:
        data: mapping of CRM field names to raw values for a single lead.

    Returns:
        pd.DataFrame with exactly the columns in ``model_features``, in order.
    """
    df = pd.DataFrame([data])  # Convert dict to a one-row DataFrame
    # Clean column names coming from CSV-ish upstream payloads
    df.columns = df.columns.str.strip().str.replace('"', '').str.replace(';', ',')
    # Handle missing values (re-filling with pd.NA keeps missingness uniform;
    # effectively a no-op for values that are already missing)
    cat_cols = df.select_dtypes(include=['object']).columns
    df[cat_cols] = df[cat_cols].fillna(pd.NA)
    num_cols = df.select_dtypes(include=['float64', 'int64']).columns
    df[num_cols] = df[num_cols].fillna(pd.NA)
    # Guarantee every column used below exists even if absent from the payload
    expected_columns = ['Email', 'Phone', 'GA Campaign', 'LP: Campaign', 'Lead Source', 'GA Source', 'Prospect product interest', 'Estimated Yearly Transaction Volume', 'Estimated Turnover']
    for col in expected_columns:
        if col not in df.columns:
            df[col] = pd.NA
    # Availability flags: 1 when the raw field is present, else 0
    df['Email Available'] = df['Email'].apply(lambda x: 1 if pd.notna(x) else 0)
    df['Phone Available'] = df['Phone'].apply(lambda x: 1 if pd.notna(x) else 0)
    df['GA Campaign Available'] = df['GA Campaign'].apply(lambda x: 1 if pd.notna(x) else 0)
    df['LP: Campaign Available'] = df['LP: Campaign'].apply(lambda x: 1 if pd.notna(x) else 0)
    df['Lead Source Available'] = df['Lead Source'].apply(lambda x: 1 if pd.notna(x) else 0)
    df['GA Source Available'] = df['GA Source'].apply(lambda x: 1 if pd.notna(x) else 0)

    # Default the product interest from the estimated yearly transaction volume
    # when it is missing: midpoint < 60000 -> entry-level products, else pro.
    def assign_product_interest(row):
        if pd.isna(row['Prospect product interest']) or row['Prospect product interest'] == '':
            if pd.notna(row['Estimated Yearly Transaction Volume']):
                try:
                    value = row['Estimated Yearly Transaction Volume']
                    if '-' in value:
                        low, high = map(int, value.split('-'))
                        midpoint = (low + high) / 2
                    else:
                        midpoint = int(value)
                    return 'Payment, POS Lite' if midpoint < 60000 else 'POS Pro, Kiosk'
                except Exception:
                    return 'Payment, POS Pro'  # Fallback value when transaction volume is invalid
            else:
                return ''  # Default when transaction volume is missing
        return row['Prospect product interest']

    df['Prospect product interest'] = df.apply(assign_product_interest, axis=1)
    # Derived product-interest flags (substring checks on the combined string)
    df['POS Pro Available'] = df['Prospect product interest'].apply(lambda x: 1 if 'POS Pro' in str(x) else 0)
    df['Payment Available'] = df['Prospect product interest'].apply(lambda x: 1 if 'Payment' in str(x) else 0)
    df['Product Interest Available'] = df['Prospect product interest'].apply(lambda x: 1 if pd.notna(x) and x != '' else 0)
    # Aggregate counts of available contact / source / campaign signals
    df['Contacts Available'] = df[['Email Available', 'Phone Available']].sum(axis=1)
    df['Sources Available'] = df[['Lead Source Available', 'GA Source Available']].sum(axis=1)
    df['Campaigns Available'] = df[['GA Campaign Available', 'LP: Campaign Available']].sum(axis=1)
    # Drop the raw columns and per-field flags now that they are encoded
    df = df.drop(columns=['Email', 'Phone', 'GA Campaign', 'LP: Campaign', 'Lead Source', 'GA Source',
                          'Email Available', 'Phone Available', 'GA Campaign Available', 'LP: Campaign Available',
                          'Lead Source Available', 'GA Source Available', 'Prospect product interest'])

    # Normalize a raw monetary string to a canonical "low-high" numeric range
    # (same cleaning as used when training the model).
    def convert_to_numeric_range(value):
        if pd.isna(value) or value == '':
            return "60000-100000"  # Default range when the value is missing
        value = value.replace('\xa0', '').replace("'", '').replace(',', '').strip()  # strip NBSP / separators
        value = re.sub(r'[£$€¥]', '', value)  # remove currency symbols
        value = value.replace(' – ', ' - ')  # en dash -> regular dash
        value = value.replace('K', '000').replace('M', '000000')
        if '+' in value:
            value = value.replace('+', '')
            if value.isdigit():
                return f"{value}-{int(value) * 2}"  # e.g. "5M+" -> "5000000-10000000"
        if '-' in value:
            low, high = value.split('-')
            low = ''.join(re.findall(r'\d+', low))  # keep only digits
            high = ''.join(re.findall(r'\d+', high))  # keep only digits
            return f"{low}-{high}" if low and high else None
        value = ''.join(re.findall(r'\d+', value))  # keep only digits
        return value if value else "60000-100000"  # default when not interpretable

    df['Estimated Yearly Transaction Volume'] = df['Estimated Yearly Transaction Volume'].apply(convert_to_numeric_range)
    df['Estimated Turnover'] = df['Estimated Turnover'].apply(convert_to_numeric_range)

    # Snap each turnover range onto the closest (by midpoint) transaction-volume
    # range so both columns share the same vocabulary of ranges.
    def align_turnover_to_transaction(value, transaction_values):
        if pd.isna(value):
            return value  # Handle missing values
        transaction_midpoints = []
        for transaction_range in transaction_values:
            if '-' in transaction_range:
                low, high = map(int, transaction_range.split('-'))
                transaction_midpoints.append((low + high) // 2)
            else:
                transaction_midpoints.append(int(transaction_range))
        if '-' in value:
            low, high = map(int, value.split('-'))
            turnover_midpoint = (low + high) // 2
        else:
            turnover_midpoint = int(value)
        closest_index = min(range(len(transaction_midpoints)), key=lambda i: abs(transaction_midpoints[i] - turnover_midpoint))
        return transaction_values[closest_index]

    unique_transaction_values = df['Estimated Yearly Transaction Volume'].dropna().unique()
    df['Estimated Turnover'] = df['Estimated Turnover'].apply(lambda x: align_turnover_to_transaction(x, unique_transaction_values))

    # Prefer transaction volume; fall back to turnover; "0-0" when both are absent.
    def combine_transaction_and_turnover(row):
        if pd.notna(row["Estimated Yearly Transaction Volume"]) and row["Estimated Yearly Transaction Volume"] != "0-0":
            return row["Estimated Yearly Transaction Volume"]
        elif pd.notna(row["Estimated Turnover"]) and row["Estimated Turnover"] != "0-0":
            return row["Estimated Turnover"]
        return "0-0"  # Default if both are missing

    df["Combined Volume and Turnover"] = df.apply(combine_transaction_and_turnover, axis=1)

    # Midpoint of a "low-high" range string; None when unparseable.
    def calculate_midpoint(value):
        if pd.isna(value) or '-' not in value:
            return None
        try:
            start, end = map(int, value.split('-'))
            return (start + end) / 2
        except ValueError:
            return None

    df['Combined Midpoint'] = df['Combined Volume and Turnover'].apply(calculate_midpoint)

    # Fixed bin edges matching the training-time volume buckets.
    # (A dynamic-binning implementation previously lived here as dead,
    # commented-out code; removed — see version control if needed.)
    def generate_bins_and_labels():
        bins = [0, 35000, 60000, 100000, 200000, 400000, 600000, 1000000, 2000000, 5000000, float('inf')]
        labels = [f'Bin {i+1}' for i in range(len(bins) - 1)]
        return bins, labels

    bins_combined, labels_combined = generate_bins_and_labels()
    df['Combined Volume Category'] = pd.cut(df['Combined Midpoint'], bins=bins_combined, labels=labels_combined, include_lowest=True)
    # One-hot encode categorical columns.
    # NOTE(review): 'Pos Pro Segmentation Level 3' and 'Sourcing Direction' are NOT
    # back-filled above, so get_dummies raises KeyError if the payload omits them —
    # confirm all callers always supply these fields.
    cat_columns = ['Pos Pro Segmentation Level 3', 'Combined Volume Category', 'Sourcing Direction']
    df_encoded = pd.get_dummies(df, columns=cat_columns, drop_first=True)
    # Drop intermediate string columns the model never sees
    drop_columns = ['Estimated Yearly Transaction Volume', 'Estimated Turnover', 'Combined Volume and Turnover']
    df_encoded = df_encoded.drop(columns=drop_columns)
    # Align to the model's expected feature set: add missing columns as 0, fix order
    for col in model_features:
        if col not in df_encoded.columns:
            df_encoded[col] = 0
    df_encoded = df_encoded[model_features]
    return df_encoded
| low_score_counter = {"POS Pro": 0, "Payment": 0, "Payment or POS Lite": 0, "global_count": 0, "score_1_count": 0, "score_2_count": 0, "score_3_count": 0} | |
async def predict(lead: dict):
    """Score a lead with the XGBoost model and decide AI-caller routing.

    NOTE(review): no route decorator is visible here — confirm an
    ``@app.post(...)`` wasn't lost when this file was exported.

    Parameters:
        lead: raw lead payload keyed by CRM field names.

    Returns:
        dict with:
          - "score": 1-10 model score (mid-range scores shifted up by 2),
          - "conversion_probability": "Hot" / "Warm" / "Cold" category string,
          - "send_to_ai_caller": routing decision (bool),
          - "max_attempts": dial-attempt cap (always 3 today).
        On any error, returns a default score of 8 ("Hot") with the error
        message and stack trace included in the response.
    """
    try:
        global low_score_counter  # declared for bookkeeping; never mutated here
        # Preprocess the lead data into the model's feature frame
        processed_lead = preprocess_single_lead(lead)
        # Predict probability of conversion (class-1 probability), cast from
        # numpy.float32 to a regular JSON-serializable float
        probability = float(model.predict_proba(processed_lead)[0, 1])
        # Map probability to a 1-10 score
        score = int(np.ceil(probability * 10))
        # Shift mid-range scores up by 2 to avoid clustering away from the extremes
        score = score + 2 if 4 <= score < 9 else score
        estimated_volume = lead.get("Estimated Yearly Transaction Volume")
        product_interest = str(lead.get("Prospect product interest", "")).strip()
        sourcing_direction = str(lead.get("Sourcing Direction", "")).strip()
        lp_campaign = str(lead.get("LP: Campaign", "")).strip()
        Leads_source_website = str(lead.get("Leads_source_Website__c", "")).strip()
        # Split ";"-delimited product interests into a set (currently unused below)
        if product_interest:
            product_interest = set(map(str.strip, product_interest.split(";")))
        else:
            product_interest = set()
        # Determine if the lead should be sent to the AI caller
        country = lead.get("Country")
        if "SAP-" in Leads_source_website:
            # SAP-sourced leads are never routed to the AI caller
            send_to_ai_caller = False
            max_attempts = 3
        elif sourcing_direction.lower() == 'inbound' and lp_campaign != 'London-Coffee-Festival-2025':
            ai_caller_decision = should_send_to_ai_caller(estimated_volume, country)
            send_to_ai_caller = ai_caller_decision["send"]
            max_attempts = ai_caller_decision["max_attempts"]
        else:
            send_to_ai_caller = False
            max_attempts = 3
        # Map score to conversion category
        if score >= 7:
            conversion_category = "Hot"
        elif 4 <= score <= 6:
            conversion_category = "Warm"
        else:
            conversion_category = "Cold"
        return {
            "score": score,
            "conversion_probability": conversion_category,  # string category, not a number
            # BUG FIX: this was hardcoded to False, silently discarding the
            # routing decision computed above; now reports the actual decision.
            "send_to_ai_caller": send_to_ai_caller,
            "max_attempts": max_attempts
        }
    except Exception as e:
        # Fail open with a high default score so the lead is still worked by a human;
        # capture and return detailed error information for debugging
        error_message = str(e)
        stack_trace = traceback.format_exc()
        return {
            "score": 8,
            "conversion_probability": "Hot",  # score 8 falls in the "Hot" band (score >= 7)
            "error": f"An error occurred, defaulting score to 8. {error_message}",
            "stack_trace": stack_trace
        }