Spaces:
Sleeping
Sleeping
| import os, json, calendar | |
| from datetime import date, timedelta | |
| # Keras 3 поверх TF | |
| os.environ["KERAS_BACKEND"] = "tensorflow" | |
| import numpy as np | |
| import pandas as pd | |
| import joblib | |
| import gradio as gr | |
| import keras # standalone Keras 3 | |
| # ====== ПУТИ К АРТЕФАКТАМ ====== | |
| MODEL_DIR = "neiro1" # в Space лежит папка neiro1 рядом с app.py | |
| MODEL_PATH = os.path.join(MODEL_DIR, "model_v6.keras") | |
| PP_PATH = os.path.join(MODEL_DIR, "preprocess_v6.joblib") | |
| META_PATH = os.path.join(MODEL_DIR, "meta_v6.json") | |
| # ====== ЗАГРУЗКА ====== | |
| model = keras.models.load_model(MODEL_PATH, compile=False) | |
| ct = joblib.load(PP_PATH) | |
| with open(META_PATH, "r", encoding="utf-8") as f: | |
| META = json.load(f) | |
| # ====== ПАРАМЕТРЫ И МЕТАДАННЫЕ ====== | |
| ROUTE = META.get("route", "SFO_MIA") | |
| ORIGIN, DEST = ROUTE.split("_") | |
| AIRLINES = META.get("airlines", []) | |
| M_BLEND = {int(k): float(v) for k, v in META.get("blend_month", {}).items()} | |
| A_BLEND = {str(k): float(v) for k, v in META.get("blend_airline", {}).items()} | |
| AM_MEANS = {k: float(v) for k, v in META.get("am_means", {}).items()} | |
| AM_COUNTS= {k: int(v) for k, v in META.get("am_counts", {}).items()} | |
| W_MONTH, W_AIR, W_AM = META.get("w_month", 0.0), META.get("w_air", 0.0), META.get("w_am", 0.0) | |
| FLOOR_FRAC = float(META.get("floor_frac", 0.25)) | |
| EXOG = META.get("exog_params", {}) | |
| K_AIRLINE = float(META.get("calibration_k_airline", 1.0)) | |
| K_ROUTE = float(META.get("calibration_k_route", 1.0)) | |
| WHITELIST = META.get("airline_whitelist", AIRLINES) | |
| SHARE = META.get("airline_share_overall", {}) | |
| CAP_TARGET = float(META.get("capacity_target_summer", 1000.0)) | |
| # ====== ХЕЛПЕРЫ ====== | |
| def nth_weekday(year, month, weekday, n): | |
| d = date(year, month, 1) | |
| add = (weekday - d.weekday()) % 7 | |
| return d + timedelta(days=add) + timedelta(weeks=n-1) | |
| def last_weekday(year, month, weekday): | |
| d = date(year, month, calendar.monthrange(year, month)[1]) | |
| sub = (d.weekday() - weekday) % 7 | |
| return d - timedelta(days=sub) | |
| def daterange(d0, d1): | |
| cur, out = d0, [] | |
| while cur <= d1: | |
| out.append(cur) | |
| cur += timedelta(days=1) | |
| return out | |
| def holiday_windows_for_year(y): | |
| mem = set(daterange(last_weekday(y,5,0)-timedelta(days=3), last_weekday(y,5,0))) | |
| july = set(daterange(date(y,6,30), date(y,7,7))) | |
| labor= set(daterange(nth_weekday(y,9,0,1)-timedelta(days=3), nth_weekday(y,9,0,1))) | |
| tg = set(daterange(nth_weekday(y,11,3,4)-timedelta(days=2), nth_weekday(y,11,3,4)+timedelta(days=3))) | |
| xny = set(daterange(date(y,12,20), date(y,12,31))) | set(daterange(date(y+1,1,1), date(y+1,1,5))) | |
| return {"memorial":mem, "july4":july, "labor":labor, "thanksgiving":tg, "xmas_newyear":xny} | |
| def exog_multiplier(ts: pd.Timestamp) -> float: | |
| y = int(ts.year); m = int(ts.month); dow = int(ts.weekday()) | |
| P = {k: float(EXOG.get(k, v)) for k, v in dict( | |
| SUMMER_MULT=1.60, FRI_MULT=1.10, SAT_MULT=1.15, SUN_MULT=1.10, | |
| SPRING_BREAK_MULT=1.12, MEMORIAL_MULT=1.35, JULY4_MULT=1.30, | |
| LABOR_MULT=1.25, THANKSGIVING_MULT=1.60, XMAS_NEWYEAR_MULT=1.70 | |
| ).items()} | |
| mult = 1.0 | |
| if m in (6,7,8): mult *= P["SUMMER_MULT"] | |
| if pd.Timestamp(year=y, month=3, day=10) <= ts <= pd.Timestamp(year=y, month=4, day=10): | |
| mult *= P["SPRING_BREAK_MULT"] | |
| if dow == 4: mult *= P["FRI_MULT"] | |
| if dow == 5: mult *= P["SAT_MULT"] | |
| if dow == 6: mult *= P["SUN_MULT"] | |
| hw = holiday_windows_for_year(y); hw_prev = holiday_windows_for_year(y-1) | |
| if ts.date() in hw["memorial"]: mult *= P["MEMORIAL_MULT"] | |
| if ts.date() in hw["july4"]: mult *= P["JULY4_MULT"] | |
| if ts.date() in hw["labor"]: mult *= P["LABOR_MULT"] | |
| if ts.date() in hw["thanksgiving"]: mult *= P["THANKSGIVING_MULT"] | |
| if ts.date() in hw["xmas_newyear"] or ts.date() in hw_prev["xmas_newyear"]: | |
| mult *= P["XMAS_NEWYEAR_MULT"] | |
| return float(mult) | |
| def _blend(child_mean, child_n, prior_mean, prior_w): | |
| child_n = float(child_n) | |
| return float((child_n*child_mean + prior_w*prior_mean) / (child_n + prior_w)) | |
| def baseline_pair(a, m): | |
| gmean = np.mean(list(A_BLEND.values())) if A_BLEND else 0.0 | |
| a0 = A_BLEND.get(str(a), gmean) | |
| m0 = M_BLEND.get(int(m), gmean) | |
| prior = 0.5*a0 + 0.5*m0 | |
| key = f"{a}_{int(m)}" | |
| if key in AM_MEANS: | |
| mean = AM_MEANS[key]; cnt = AM_COUNTS.get(key, 0) | |
| return _blend(mean, cnt, prior, W_AM) | |
| return float(prior) | |
| def make_inputs(airline: str, date_str: str): | |
| d = pd.to_datetime(date_str) | |
| dow, month = int(d.weekday()), int(d.month) | |
| weekofyear = int(d.isocalendar().week) | |
| is_weekend = int(dow in [5,6]) | |
| sin_dow, cos_dow = np.sin(2*np.pi*dow/7), np.cos(2*np.pi*dow/7) | |
| sin_mon, cos_mon = np.sin(2*np.pi*(month-1)/12), np.cos(2*np.pi*(month-1)/12) | |
| exog = exog_multiplier(d) | |
| X_df = pd.DataFrame([{ | |
| "airline": airline, | |
| "sin_dow": sin_dow, "cos_dow": cos_dow, | |
| "sin_month": sin_mon, "cos_month": cos_mon, | |
| "weekofyear": weekofyear, "is_weekend": is_weekend, | |
| "exog_mult": exog, | |
| }]) | |
| base = baseline_pair(airline, month) * exog | |
| return ct.transform(X_df), np.array([[base]], dtype=float), dict( | |
| dow=dow, month=month, weekofyear=weekofyear, is_weekend=bool(is_weekend), | |
| exog=exog, baseline=float(base) | |
| ) | |
| def predict_one(airline, date_str): | |
| X, B, info = make_inputs(airline, date_str) | |
| y_raw = float(model.predict([X, B], verbose=0)[0,0]) | |
| y_cal = y_raw * K_AIRLINE | |
| # safety floor: минимум 25% от baseline (или META['floor_frac']) | |
| floor_val = float(B[0,0]) * max(FLOOR_FRAC, 0.25) | |
| y_cal = max(y_cal, floor_val) | |
| return y_cal, info, y_raw | |
| # ====== UI ====== | |
| with gr.Blocks(title="Прогноз пассажиров (v6.2)") as demo: | |
| gr.Markdown("### Прогноз числа пассажиров (учтены сезонность/праздники)") | |
| with gr.Row(): | |
| gr.Textbox(label="Маршрут", value=ROUTE.replace('_', '→'), interactive=False) | |
| dd_airline = gr.Dropdown( | |
| choices=(WHITELIST or AIRLINES or ["American airlines","United"]), | |
| value=(WHITELIST[0] if WHITELIST else (AIRLINES[0] if AIRLINES else "American airlines")), | |
| label="Авиакомпания", | |
| allow_custom_value=False | |
| ) | |
| tb_date = gr.Textbox(label="Дата (YYYY-MM-DD)", value=str(pd.Timestamp.today().date())) | |
| sum_all = gr.Checkbox(label="Суммарно по всем авиакомпаниям", value=True) | |
| def predict_ui(airline, date_str, sum_all): | |
| # простая валидация даты | |
| try: | |
| _ = pd.to_datetime(date_str) | |
| except Exception: | |
| return 0, "Некорректная дата, формат YYYY-MM-DD." | |
| if sum_all: | |
| total_raw, lines = 0.0, [] | |
| for a in (WHITELIST or AIRLINES or [airline]): | |
| X, B, info = make_inputs(a, date_str) | |
| y_raw = float(model.predict([X, B], verbose=0)[0,0]) | |
| total_raw += y_raw | |
| lines.append(f"- {a}: base×M={info['baseline']:.1f} (M={info['exog']:.2f}) → raw {y_raw:.1f}") | |
| total = total_raw * K_ROUTE | |
| msg = ( | |
| f"Маршрут: **{ORIGIN}→{DEST}**, дата: **{date_str}** \n" | |
| f"Суммарный прогноз: **{round(total)}** " | |
| f"(raw_sum={total_raw:.1f}, k_route={K_ROUTE:.2f}; целевой летний≈{int(CAP_TARGET)})\n" | |
| + "\n".join(lines) | |
| ) | |
| return round(total), msg | |
| y, info, y_raw = predict_one(airline, date_str) | |
| warn = "" | |
| if SHARE and SHARE.get(airline, 0.0) < 0.05: | |
| warn = ( | |
| "\n\n_Примечание: доля этой авиакомпании на маршруте <5%, " | |
| "прогноз может быть занижен; попробуйте «Суммарно по всем»._" | |
| ) | |
| msg = ( | |
| f"Маршрут: **{ORIGIN}→{DEST}**, авиакомпания: **{airline}**, дата: **{date_str}** \n" | |
| f"M(date)={info['exog']:.2f}; baseline×M={info['baseline']:.1f} \n" | |
| f"Прогноз: **{round(y)}** (raw={y_raw:.1f}, k_airline={K_AIRLINE:.2f}){warn}" | |
| ) | |
| return round(y), msg | |
| btn = gr.Button("Рассчитать") | |
| out_pred = gr.Number(label="Прогноз пассажиров", interactive=False) | |
| out_expl = gr.Markdown() | |
| btn.click(predict_ui, [dd_airline, tb_date, sum_all], [out_pred, out_expl]) | |
| # очередь без спорных параметров (кросс-версийно) | |
| try: | |
| demo.queue(max_size=32) | |
| except TypeError: | |
| demo.queue() | |
| # HF Spaces сам запускает; локально — так: | |
| if __name__ == "__main__": | |
| demo.launch(server_name="0.0.0.0", server_port=7860) | |