# ConvertAudioToJSON / expense_predictor.py
# Author: VladGeekPro — PredictSupplierSumTop10 (commit 719b265)
"""
Expense prediction model: suggests next expenses based on 6-month history.
- Input: JSON array of 300 expense records
- Output: Top 3 predicted expenses (date, sum, supplier, user)
"""
from datetime import datetime
from collections import defaultdict
import math
import statistics
from sklearn.ensemble import ExtraTreesRegressor, GradientBoostingRegressor, RandomForestRegressor
def _quantile(values: list[float], q: float) -> float:
"""Returns quantile in [0,1] without numpy."""
if not values:
return 0.0
if len(values) == 1:
return float(values[0])
data = sorted(float(v) for v in values)
q = max(0.0, min(1.0, q))
pos = q * (len(data) - 1)
lo = int(math.floor(pos))
hi = int(math.ceil(pos))
if lo == hi:
return data[lo]
weight = pos - lo
return data[lo] * (1.0 - weight) + data[hi] * weight
def _time_split_xy(X: list[list[float]], y: list[float]) -> tuple[list[list[float]], list[float], list[list[float]], list[float]]:
"""Splits sequence into train/validation by time order (last 20% for validation)."""
holdout_size = max(1, int(len(X) * 0.2))
if len(X) - holdout_size >= 5:
return X[:-holdout_size], y[:-holdout_size], X[-holdout_size:], y[-holdout_size:]
return X, y, [], []
def _build_candidates(seed: int = 42) -> list[tuple[str, object]]:
    """Candidate regressors compared against each other on validation MAE."""
    shared = {"random_state": seed}
    return [
        ("rf", RandomForestRegressor(n_estimators=200, min_samples_leaf=3, **shared)),
        ("extra_trees", ExtraTreesRegressor(n_estimators=200, min_samples_leaf=3, **shared)),
        ("gbr", GradientBoostingRegressor(n_estimators=100, max_depth=3, **shared)),
    ]
def _train_global_model(
    samples: list[tuple],
    supplier_to_idx: dict,
    user_to_idx: dict,
    debug: bool = False,
) -> tuple[object | None, float, str]:
    """Trains ONE global model on ALL records.

    Args:
        samples: tuples of (date, supplier_id, user_id, amount).
        supplier_to_idx: supplier_id -> integer index encoding.
        user_to_idx: user_id -> integer index encoding.
        debug: when True, prints per-candidate validation MAE.

    Features per row (14): [supplier_idx, user_idx, day, weekday, month,
    supplier rolling mean(3), supplier mean, supplier median,
    user-supplier rolling mean(3), user-supplier mean, user-supplier median,
    user-supplier last amount, days since last supplier tx,
    days since last user-supplier tx].

    Returns:
        (fitted_model | None, global_confidence in [0, 1], model_name).
        With fewer than 10 usable rows returns (None, 0.5, "fallback").
    """
    # Sort all samples by date to build rolling features correctly.
    samples_sorted = sorted(samples, key=lambda s: s[0])
    # Running histories per supplier and per (supplier, user) pair.
    supplier_hist_running: dict = defaultdict(list)
    user_supplier_hist_running: dict = defaultdict(list)
    supplier_last_date: dict = {}
    user_supplier_last_date: dict = {}
    user_supplier_last_sum: dict = {}
    X_all: list[list[float]] = []
    y_all: list[float] = []
    for tx_date, supplier_id, user_id, amount in samples_sorted:
        # -1 encodes a category missing from the shared encoders.
        s_idx = supplier_to_idx.get(supplier_id, -1)
        u_idx = user_to_idx.get(user_id, -1)
        s_hist = supplier_hist_running[supplier_id]
        us_hist = user_supplier_hist_running[(user_id, supplier_id)]
        # Supplier-wide rolling features; the current amount is the
        # cold-start fallback when there is no prior history.
        s_rolling3 = statistics.mean(s_hist[-3:]) if s_hist else amount
        s_rolling_all = statistics.mean(s_hist) if s_hist else amount
        s_median = statistics.median(s_hist) if s_hist else amount
        # User×supplier specific rolling features (strongest signal).
        us_rolling3 = statistics.mean(us_hist[-3:]) if us_hist else s_rolling3
        us_rolling_all = statistics.mean(us_hist) if us_hist else s_rolling_all
        us_median = statistics.median(us_hist) if us_hist else s_median
        last_s_date = supplier_last_date.get(supplier_id)
        last_us_date = user_supplier_last_date.get((user_id, supplier_id))
        us_last_sum = user_supplier_last_sum.get((user_id, supplier_id), us_rolling3)
        # Recency gaps in days; 0 when this is the first observation.
        s_gap_days = max(0, (tx_date - last_s_date).days) if last_s_date else 0
        us_gap_days = max(0, (tx_date - last_us_date).days) if last_us_date else 0
        X_all.append([
            s_idx,
            u_idx,
            tx_date.day,
            tx_date.weekday(),
            tx_date.month,
            s_rolling3,
            s_rolling_all,
            s_median,
            us_rolling3,
            us_rolling_all,
            us_median,
            us_last_sum,
            s_gap_days,
            us_gap_days,
        ])
        y_all.append(amount)
        # Update running state AFTER building the row so features never
        # leak the current target value.
        s_hist.append(amount)
        us_hist.append(amount)
        supplier_last_date[supplier_id] = tx_date
        user_supplier_last_date[(user_id, supplier_id)] = tx_date
        user_supplier_last_sum[(user_id, supplier_id)] = amount
    if len(X_all) < 10:
        # Too little data to train anything meaningful.
        return None, 0.5, "fallback"
    X_fit, y_fit, X_val, y_val = _time_split_xy(X_all, y_all)
    candidates = _build_candidates()
    best_name = "fallback"
    best_model = None
    best_mae = float("inf")
    for name, model in candidates:
        model.fit(X_fit, y_fit)
        if X_val:
            val_pred = model.predict(X_val)
            mae = statistics.mean([abs(float(p) - float(t)) for p, t in zip(val_pred, y_val)])
        else:
            # No holdout available: fall back to (optimistic) training MAE.
            train_pred = model.predict(X_fit)
            mae = statistics.mean([abs(float(p) - float(t)) for p, t in zip(train_pred, y_fit)])
        if debug:
            print(f"[PREDICT] global model={name}, val_mae={mae:.2f}")
        if mae < best_mae:
            best_mae = mae
            best_name = name
            best_model = model
    if best_model is None:
        return None, 0.5, "fallback"
    # Confidence decays exponentially as MAE grows relative to the mean
    # absolute target value (floored at 1.0 to avoid division blow-up).
    baseline_scale = max(1.0, statistics.mean([abs(v) for v in (y_val if y_val else y_fit)]))
    global_conf = math.exp(-(best_mae / baseline_scale))
    if debug:
        print(
            f"[PREDICT] best global model={best_name}, mae={best_mae:.2f}, "
            f"avg_target={baseline_scale:.2f}, global_model_conf={global_conf:.2f}"
        )
    return best_model, max(0.0, min(1.0, global_conf)), best_name
def predict_expenses(expenses: list[dict], target_user_id, debug: bool = False) -> list[dict]:
    """Predicts upcoming expenses for *target_user_id* from expense history.

    Args:
        expenses: records with keys "date" (ISO string), "sum",
            "supplier_id", "user_id".
        target_user_id: user the predictions are generated for.
        debug: when True, prints detailed diagnostics.

    Returns:
        One prediction per selected supplier (top 10 by frequency), sorted by
        supplier frequency descending. Each prediction is a dict with keys:
        date, sum, supplier_id, user_id, show, confidence. Empty list when
        there are fewer than 2 records or no suppliers.
    """
    if not expenses or len(expenses) < 2:
        if debug:
            print(f"[PREDICT] Not enough records: {len(expenses) if expenses else 0}")
        return []
    # Group records by supplier_id and count per-supplier frequency.
    supplier_history = defaultdict(list)
    supplier_freq = defaultdict(int)
    total_records = len(expenses)
    if debug:
        print(f"[PREDICT] Total records received: {total_records}")
        for i, exp in enumerate(expenses):
            print(f"[PREDICT] [{i+1}] date={exp.get('date')}, sum={exp.get('sum')}, supplier_id={exp.get('supplier_id')}, user_id={exp.get('user_id')}")
    for exp in expenses:
        supplier_id = exp["supplier_id"]
        supplier_history[supplier_id].append(exp)
        supplier_freq[supplier_id] += 1
    if debug:
        print(f"[PREDICT] Unique suppliers: {len(supplier_history)}")
        for supplier_id, count in supplier_freq.items():
            pct = count / total_records * 100
            print(f"[PREDICT] supplier_id={supplier_id} -> {count} records ({pct:.1f}%)")
    # Select top 10 most popular suppliers by frequency.
    candidate_items = sorted(
        supplier_history.items(),
        key=lambda item: supplier_freq[item[0]],
        reverse=True,
    )[:10]
    if debug:
        print(f"[PREDICT] Processing top {len(candidate_items)} suppliers")
    if not candidate_items:
        if debug:
            print("[PREDICT] No suppliers found. Returning empty.")
        return []
    now = datetime.now()
    # Build shared encoders for categorical features.
    supplier_to_idx = {sid: idx for idx, sid in enumerate(supplier_history.keys())}
    user_values = [exp.get("user_id") for exp in expenses if exp.get("user_id") is not None]
    user_to_idx = {uid: idx for idx, uid in enumerate(sorted(set(user_values), key=str))}
    # Collect all valid samples for the global model; malformed records
    # (bad date/sum, missing keys) are silently skipped by design.
    all_samples: list[tuple] = []
    for exp in expenses:
        try:
            tx_date = datetime.fromisoformat(exp["date"])
            tx_sum = float(exp["sum"])
            tx_supplier = exp["supplier_id"]
            tx_user = exp["user_id"]
            all_samples.append((tx_date, tx_supplier, tx_user, tx_sum))
        except Exception:
            continue
    global_model, global_model_conf, model_name = _train_global_model(
        all_samples, supplier_to_idx, user_to_idx, debug=debug
    )
    # Compute per-supplier and per user×supplier histories for inference features.
    supplier_amounts_sorted: dict = defaultdict(list)
    user_supplier_amounts_sorted: dict = defaultdict(list)
    supplier_last_date: dict = {}
    user_supplier_last_date: dict = {}
    user_supplier_last_sum: dict = {}
    for tx_date, tx_supplier, tx_user, tx_sum in sorted(all_samples, key=lambda s: s[0]):
        supplier_amounts_sorted[tx_supplier].append(tx_sum)
        user_supplier_amounts_sorted[(tx_user, tx_supplier)].append(tx_sum)
        supplier_last_date[tx_supplier] = tx_date
        user_supplier_last_date[(tx_user, tx_supplier)] = tx_date
        user_supplier_last_sum[(tx_user, tx_supplier)] = tx_sum
    # Predict amount for each selected supplier.
    predictions = []
    for supplier_id, _records in candidate_items:
        s_hist = supplier_amounts_sorted.get(supplier_id, [])
        us_hist = user_supplier_amounts_sorted.get((target_user_id, supplier_id), [])
        avg_amount = statistics.mean(s_hist) if s_hist else 0.0
        s_rolling3 = statistics.mean(s_hist[-3:]) if s_hist else avg_amount
        s_rolling_all = avg_amount
        s_median = statistics.median(s_hist) if s_hist else avg_amount
        us_rolling3 = statistics.mean(us_hist[-3:]) if us_hist else s_rolling3
        us_rolling_all = statistics.mean(us_hist) if us_hist else s_rolling_all
        us_median = statistics.median(us_hist) if us_hist else s_median
        last_s_date = supplier_last_date.get(supplier_id)
        last_us_date = user_supplier_last_date.get((target_user_id, supplier_id))
        us_last_sum = user_supplier_last_sum.get((target_user_id, supplier_id), us_rolling3)
        s_gap_days = max(0, (now - last_s_date).days) if last_s_date else 0
        # NOTE(review): training uses 0 when there is no user×supplier history,
        # while inference falls back to s_gap_days — confirm whether this
        # train/serve feature mismatch is intentional.
        us_gap_days = max(0, (now - last_us_date).days) if last_us_date else s_gap_days
        # Feature order must match _train_global_model exactly.
        next_features = [[
            supplier_to_idx.get(supplier_id, -1),
            user_to_idx.get(target_user_id, -1),
            now.day,
            now.weekday(),
            now.month,
            s_rolling3,
            s_rolling_all,
            s_median,
            us_rolling3,
            us_rolling_all,
            us_median,
            us_last_sum,
            s_gap_days,
            us_gap_days,
        ]]
        if global_model is not None:
            predicted_amount = float(global_model.predict(next_features)[0])
            # Local confidence: disagreement between ensemble trees.
            # BUGFIX: GradientBoostingRegressor exposes `estimators_` as a 2-D
            # numpy array of stage trees (which predict residuals, not the
            # target); iterating it and calling .predict on each element raised
            # AttributeError whenever "gbr" won model selection. Only use
            # elements that are themselves predictors (RF / ExtraTrees case)
            # and fall back to the neutral default otherwise.
            tree_preds = [
                float(tree.predict(next_features)[0])
                for tree in getattr(global_model, "estimators_", [])
                if hasattr(tree, "predict")
            ]
            if tree_preds:
                tree_std = statistics.stdev(tree_preds) if len(tree_preds) > 1 else 0.0
                amount_scale = max(1.0, abs(predicted_amount))
                local_model_conf = math.exp(-(tree_std / amount_scale))
            else:
                local_model_conf = 0.7
            model_conf = (0.6 * global_model_conf) + (0.4 * local_model_conf)
            model_conf = max(0.0, min(1.0, model_conf))
        else:
            # No trained model: fall back to the supplier's historical mean.
            predicted_amount = avg_amount
            model_conf = 0.5
        # Calibrate prediction toward robust historical center for this user/supplier.
        # This usually stabilizes noisy forecasts and reduces MAE on small histories.
        us_count = len(us_hist)
        w_user_hist = min(1.0, us_count / 8.0)
        robust_center = (w_user_hist * us_median) + ((1.0 - w_user_hist) * s_median)
        blend_weight = 0.7 if global_model is not None else 0.0
        predicted_amount = (blend_weight * predicted_amount) + ((1.0 - blend_weight) * robust_center)
        # Clamp into realistic historical range to avoid extreme outputs.
        hist_for_bounds = us_hist if len(us_hist) >= 5 else s_hist
        if hist_for_bounds:
            lower_bound = _quantile(hist_for_bounds, 0.1)
            upper_bound = _quantile(hist_for_bounds, 0.9)
            predicted_amount = max(lower_bound, min(predicted_amount, upper_bound))
        next_predicted_date = now.strftime("%Y-%m-%d")
        predicted_user = target_user_id
        # Heuristic confidence: amount consistency + supplier frequency + model confidence.
        amount_std = statistics.stdev(s_hist) if len(s_hist) > 1 else 0
        consistency = max(0, 1 - (amount_std / avg_amount)) if avg_amount > 0 else 0.5
        frequency_score = min(supplier_freq[supplier_id] / total_records, 1.0)
        confidence = (0.4 * consistency) + (0.3 * frequency_score) + (0.3 * model_conf)
        if debug:
            print(
                f"[PREDICT] supplier_id={supplier_id}, user_id={predicted_user} | "
                f"avg_amount={avg_amount:.2f}, s_rolling3={s_rolling3:.2f}, "
                f"us_rolling3={us_rolling3:.2f}, pred_sum={predicted_amount:.2f}, "
                f"target_date={next_predicted_date}, "
                f"us_count={us_count}, us_gap={us_gap_days}d, "
                f"consistency={consistency:.2f}, freq_score={frequency_score:.2f}, "
                f"model={model_name if global_model is not None else 'fallback'}, "
                f"model_conf={model_conf:.2f}, confidence={confidence:.2f}"
            )
        predictions.append({
            "date": next_predicted_date,
            "sum": round(max(0.0, predicted_amount), 2),
            "supplier_id": supplier_id,
            "user_id": predicted_user,
            "show": True,
            "confidence": round(confidence, 2)
        })
    # Return all selected suppliers sorted by frequency desc.
    result = sorted(
        predictions,
        key=lambda x: supplier_freq.get(x["supplier_id"], 0),
        reverse=True,
    )
    if debug:
        print(f"[PREDICT] Final top {len(result)} predictions:")
        for i, p in enumerate(result, 1):
            print(f"[PREDICT] #{i}: supplier_id={p['supplier_id']}, user_id={p['user_id']}, date={p['date']}, sum={p['sum']}, confidence={p['confidence']}")
    return result