""" Expense prediction model: suggests next expenses based on 6-month history. - Input: JSON array of 300 expense records - Output: Top 3 predicted expenses (date, sum, supplier, user) """ from datetime import datetime from collections import defaultdict import math import statistics from sklearn.ensemble import ExtraTreesRegressor, GradientBoostingRegressor, RandomForestRegressor def _quantile(values: list[float], q: float) -> float: """Returns quantile in [0,1] without numpy.""" if not values: return 0.0 if len(values) == 1: return float(values[0]) data = sorted(float(v) for v in values) q = max(0.0, min(1.0, q)) pos = q * (len(data) - 1) lo = int(math.floor(pos)) hi = int(math.ceil(pos)) if lo == hi: return data[lo] weight = pos - lo return data[lo] * (1.0 - weight) + data[hi] * weight def _time_split_xy(X: list[list[float]], y: list[float]) -> tuple[list[list[float]], list[float], list[list[float]], list[float]]: """Splits sequence into train/validation by time order (last 20% for validation).""" holdout_size = max(1, int(len(X) * 0.2)) if len(X) - holdout_size >= 5: return X[:-holdout_size], y[:-holdout_size], X[-holdout_size:], y[-holdout_size:] return X, y, [], [] def _build_candidates(seed: int = 42) -> list[tuple[str, object]]: """Returns candidate regressors to compare on validation MAE.""" return [ ("rf", RandomForestRegressor(n_estimators=200, min_samples_leaf=3, random_state=seed)), ("extra_trees", ExtraTreesRegressor(n_estimators=200, min_samples_leaf=3, random_state=seed)), ("gbr", GradientBoostingRegressor(n_estimators=100, max_depth=3, random_state=seed)), ] def _train_global_model( samples: list[tuple], supplier_to_idx: dict, user_to_idx: dict, debug: bool = False, ) -> tuple[object | None, float, str]: """Trains ONE global model on ALL records. Each sample: (date, supplier_id, user_id, amount) Features per row: [supplier_idx, user_idx, day, weekday, month, rolling_mean_3 for supplier, rolling_mean_month for supplier] Returns: (fitted_model, global_confidence, model_name) """ # Sort all samples by date to build rolling features correctly. samples_sorted = sorted(samples, key=lambda s: s[0]) # Running histories per supplier and per (supplier, user) pair. supplier_hist_running: dict = defaultdict(list) user_supplier_hist_running: dict = defaultdict(list) supplier_last_date: dict = {} user_supplier_last_date: dict = {} user_supplier_last_sum: dict = {} X_all: list[list[float]] = [] y_all: list[float] = [] for tx_date, supplier_id, user_id, amount in samples_sorted: s_idx = supplier_to_idx.get(supplier_id, -1) u_idx = user_to_idx.get(user_id, -1) s_hist = supplier_hist_running[supplier_id] us_hist = user_supplier_hist_running[(user_id, supplier_id)] # Supplier-wide rolling features. s_rolling3 = statistics.mean(s_hist[-3:]) if s_hist else amount s_rolling_all = statistics.mean(s_hist) if s_hist else amount s_median = statistics.median(s_hist) if s_hist else amount # User×supplier specific rolling features (strongest signal). us_rolling3 = statistics.mean(us_hist[-3:]) if us_hist else s_rolling3 us_rolling_all = statistics.mean(us_hist) if us_hist else s_rolling_all us_median = statistics.median(us_hist) if us_hist else s_median last_s_date = supplier_last_date.get(supplier_id) last_us_date = user_supplier_last_date.get((user_id, supplier_id)) us_last_sum = user_supplier_last_sum.get((user_id, supplier_id), us_rolling3) s_gap_days = max(0, (tx_date - last_s_date).days) if last_s_date else 0 us_gap_days = max(0, (tx_date - last_us_date).days) if last_us_date else 0 X_all.append([ s_idx, u_idx, tx_date.day, tx_date.weekday(), tx_date.month, s_rolling3, s_rolling_all, s_median, us_rolling3, us_rolling_all, us_median, us_last_sum, s_gap_days, us_gap_days, ]) y_all.append(amount) s_hist.append(amount) us_hist.append(amount) supplier_last_date[supplier_id] = tx_date user_supplier_last_date[(user_id, supplier_id)] = tx_date user_supplier_last_sum[(user_id, supplier_id)] = amount if len(X_all) < 10: return None, 0.5, "fallback" X_fit, y_fit, X_val, y_val = _time_split_xy(X_all, y_all) candidates = _build_candidates() best_name = "fallback" best_model = None best_mae = float("inf") for name, model in candidates: model.fit(X_fit, y_fit) if X_val: val_pred = model.predict(X_val) mae = statistics.mean([abs(float(p) - float(t)) for p, t in zip(val_pred, y_val)]) else: train_pred = model.predict(X_fit) mae = statistics.mean([abs(float(p) - float(t)) for p, t in zip(train_pred, y_fit)]) if debug: print(f"[PREDICT] global model={name}, val_mae={mae:.2f}") if mae < best_mae: best_mae = mae best_name = name best_model = model if best_model is None: return None, 0.5, "fallback" baseline_scale = max(1.0, statistics.mean([abs(v) for v in (y_val if y_val else y_fit)])) global_conf = math.exp(-(best_mae / baseline_scale)) if debug: print( f"[PREDICT] best global model={best_name}, mae={best_mae:.2f}, " f"avg_target={baseline_scale:.2f}, global_model_conf={global_conf:.2f}" ) return best_model, max(0.0, min(1.0, global_conf)), best_name def predict_expenses(expenses: list[dict], target_user_id, debug: bool = False) -> list[dict]: if not expenses or len(expenses) < 2: if debug: print(f"[PREDICT] Not enough records: {len(expenses) if expenses else 0}") return [] # Group by supplier_id (top-3 different suppliers) supplier_history = defaultdict(list) supplier_freq = defaultdict(int) total_records = len(expenses) if debug: print(f"[PREDICT] Total records received: {total_records}") for i, exp in enumerate(expenses): print(f"[PREDICT] [{i+1}] date={exp.get('date')}, sum={exp.get('sum')}, supplier_id={exp.get('supplier_id')}, user_id={exp.get('user_id')}") for exp in expenses: supplier_id = exp["supplier_id"] supplier_history[supplier_id].append(exp) supplier_freq[supplier_id] += 1 if debug: print(f"[PREDICT] Unique suppliers: {len(supplier_history)}") for supplier_id, count in supplier_freq.items(): pct = count / total_records * 100 print(f"[PREDICT] supplier_id={supplier_id} -> {count} records ({pct:.1f}%)") # Select top 10 most popular suppliers by frequency candidate_items = sorted( supplier_history.items(), key=lambda item: supplier_freq[item[0]], reverse=True, )[:10] if debug: print(f"[PREDICT] Processing top {len(candidate_items)} suppliers") if not candidate_items: if debug: print("[PREDICT] No suppliers found. Returning empty.") return [] now = datetime.now() # Build shared encoders for categorical features. supplier_to_idx = {sid: idx for idx, sid in enumerate(supplier_history.keys())} user_values = [exp.get("user_id") for exp in expenses if exp.get("user_id") is not None] user_to_idx = {uid: idx for idx, uid in enumerate(sorted(set(user_values), key=str))} # Collect all valid samples for the global model. all_samples: list[tuple] = [] for exp in expenses: try: tx_date = datetime.fromisoformat(exp["date"]) tx_sum = float(exp["sum"]) tx_supplier = exp["supplier_id"] tx_user = exp["user_id"] all_samples.append((tx_date, tx_supplier, tx_user, tx_sum)) except Exception: continue global_model, global_model_conf, model_name = _train_global_model( all_samples, supplier_to_idx, user_to_idx, debug=debug ) # Compute per-supplier and per user×supplier histories for inference features. supplier_amounts_sorted: dict = defaultdict(list) user_supplier_amounts_sorted: dict = defaultdict(list) supplier_last_date: dict = {} user_supplier_last_date: dict = {} user_supplier_last_sum: dict = {} for tx_date, tx_supplier, tx_user, tx_sum in sorted(all_samples, key=lambda s: s[0]): supplier_amounts_sorted[tx_supplier].append(tx_sum) user_supplier_amounts_sorted[(tx_user, tx_supplier)].append(tx_sum) supplier_last_date[tx_supplier] = tx_date user_supplier_last_date[(tx_user, tx_supplier)] = tx_date user_supplier_last_sum[(tx_user, tx_supplier)] = tx_sum # Predict amount for each selected supplier. predictions = [] for supplier_id, _records in candidate_items: s_hist = supplier_amounts_sorted.get(supplier_id, []) us_hist = user_supplier_amounts_sorted.get((target_user_id, supplier_id), []) avg_amount = statistics.mean(s_hist) if s_hist else 0.0 s_rolling3 = statistics.mean(s_hist[-3:]) if s_hist else avg_amount s_rolling_all = avg_amount s_median = statistics.median(s_hist) if s_hist else avg_amount us_rolling3 = statistics.mean(us_hist[-3:]) if us_hist else s_rolling3 us_rolling_all = statistics.mean(us_hist) if us_hist else s_rolling_all us_median = statistics.median(us_hist) if us_hist else s_median last_s_date = supplier_last_date.get(supplier_id) last_us_date = user_supplier_last_date.get((target_user_id, supplier_id)) us_last_sum = user_supplier_last_sum.get((target_user_id, supplier_id), us_rolling3) s_gap_days = max(0, (now - last_s_date).days) if last_s_date else 0 us_gap_days = max(0, (now - last_us_date).days) if last_us_date else s_gap_days next_features = [[ supplier_to_idx.get(supplier_id, -1), user_to_idx.get(target_user_id, -1), now.day, now.weekday(), now.month, s_rolling3, s_rolling_all, s_median, us_rolling3, us_rolling_all, us_median, us_last_sum, s_gap_days, us_gap_days, ]] if global_model is not None: predicted_amount = float(global_model.predict(next_features)[0]) # Local confidence: disagreement between trees. if hasattr(global_model, "estimators_"): tree_preds = [float(tree.predict(next_features)[0]) for tree in global_model.estimators_] tree_std = statistics.stdev(tree_preds) if len(tree_preds) > 1 else 0.0 amount_scale = max(1.0, abs(predicted_amount)) local_model_conf = math.exp(-(tree_std / amount_scale)) else: local_model_conf = 0.7 model_conf = (0.6 * global_model_conf) + (0.4 * local_model_conf) model_conf = max(0.0, min(1.0, model_conf)) else: predicted_amount = avg_amount model_conf = 0.5 # Calibrate prediction toward robust historical center for this user/supplier. # This usually stabilizes noisy forecasts and reduces MAE on small histories. us_count = len(us_hist) w_user_hist = min(1.0, us_count / 8.0) robust_center = (w_user_hist * us_median) + ((1.0 - w_user_hist) * s_median) blend_weight = 0.7 if global_model is not None else 0.0 predicted_amount = (blend_weight * predicted_amount) + ((1.0 - blend_weight) * robust_center) # Clamp into realistic historical range to avoid extreme outputs. hist_for_bounds = us_hist if len(us_hist) >= 5 else s_hist if hist_for_bounds: lower_bound = _quantile(hist_for_bounds, 0.1) upper_bound = _quantile(hist_for_bounds, 0.9) predicted_amount = max(lower_bound, min(predicted_amount, upper_bound)) next_predicted_date = now.strftime("%Y-%m-%d") predicted_user = target_user_id amount_std = statistics.stdev(s_hist) if len(s_hist) > 1 else 0 consistency = max(0, 1 - (amount_std / avg_amount)) if avg_amount > 0 else 0.5 frequency_score = min(supplier_freq[supplier_id] / total_records, 1.0) confidence = (0.4 * consistency) + (0.3 * frequency_score) + (0.3 * model_conf) if debug: print( f"[PREDICT] supplier_id={supplier_id}, user_id={predicted_user} | " f"avg_amount={avg_amount:.2f}, s_rolling3={s_rolling3:.2f}, " f"us_rolling3={us_rolling3:.2f}, pred_sum={predicted_amount:.2f}, " f"target_date={next_predicted_date}, " f"us_count={us_count}, us_gap={us_gap_days}d, " f"consistency={consistency:.2f}, freq_score={frequency_score:.2f}, " f"model={model_name if global_model is not None else 'fallback'}, " f"model_conf={model_conf:.2f}, confidence={confidence:.2f}" ) predictions.append({ "date": next_predicted_date, "sum": round(max(0.0, predicted_amount), 2), "supplier_id": supplier_id, "user_id": predicted_user, "show": True, "confidence": round(confidence, 2) }) # Return all selected suppliers sorted by frequency desc. result = sorted( predictions, key=lambda x: supplier_freq.get(x["supplier_id"], 0), reverse=True, ) if debug: print(f"[PREDICT] Final top {len(result)} predictions:") for i, p in enumerate(result, 1): print(f"[PREDICT] #{i}: supplier_id={p['supplier_id']}, user_id={p['user_id']}, date={p['date']}, sum={p['sum']}, confidence={p['confidence']}") return result