import html import os import re import secrets import sys import threading import uuid from collections import Counter from datetime import date, datetime, timedelta from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import pandas as pd from flask import Flask, jsonify, render_template, request, session # Ensure project root is importable when this file runs directly. BASE_DIR = Path(__file__).resolve().parent if str(BASE_DIR) not in sys.path: sys.path.insert(0, str(BASE_DIR)) import optimizer_core as opt # noqa: E402 app = Flask( __name__, template_folder=str(Path(__file__).resolve().parent / "templates"), static_folder=str(Path(__file__).resolve().parent / "static"), ) app.secret_key = os.getenv("NEXTDAY_OPTIMIZER_WEB_SECRET", secrets.token_hex(32)) _STATE_LOCK = threading.Lock() _SESSION_STATE: Dict[str, Dict[str, Any]] = {} TUNING_COLUMNS = [ "选中", "影片", "今日场次", "今日黄金场次", "今日全天效率", "今日黄金效率", "最少场次", "最多场次", "固定场次", "最少黄金场次", "最多黄金场次", "最低场次占比", "最高场次占比", ] def _get_sid() -> str: sid = session.get("nextday_optimizer_sid") if not sid: sid = uuid.uuid4().hex session["nextday_optimizer_sid"] = sid return sid def _get_user_state() -> Dict[str, Any]: sid = _get_sid() with _STATE_LOCK: return _SESSION_STATE.setdefault(sid, {}) def _to_bool(v: Any, default: bool = False) -> bool: if isinstance(v, bool): return v if v in (None, ""): return default if isinstance(v, (int, float)): return bool(v) s = str(v).strip().lower() if s in {"1", "true", "yes", "y", "on"}: return True if s in {"0", "false", "no", "n", "off"}: return False return default def _to_int(v: Any, default: int = 0) -> int: try: if v in (None, "", "None"): return int(default) return int(float(v)) except Exception: return int(default) def _to_float(v: Any, default: float = 0.0) -> float: try: if v in (None, "", "None"): return float(default) return float(v) except Exception: return float(default) def _safe_value(v: Any) -> Any: if v is None: return None if isinstance(v, (datetime, date)): return v.isoformat() if isinstance(v, bool): return v if isinstance(v, (int, float, str)): if isinstance(v, float) and (pd.isna(v) or v in (float("inf"), float("-inf"))): return None return v try: if pd.isna(v): return None except Exception: pass if isinstance(v, (list, tuple)): return [_safe_value(x) for x in v] if isinstance(v, dict): return {str(k): _safe_value(val) for k, val in v.items()} return str(v) def _df_to_records(df: Optional[pd.DataFrame]) -> List[Dict[str, Any]]: if df is None or not isinstance(df, pd.DataFrame) or df.empty: return [] out: List[Dict[str, Any]] = [] for _, row in df.iterrows(): item: Dict[str, Any] = {} for col in df.columns: item[str(col)] = _safe_value(row.get(col)) out.append(item) return out def _normalize_tuning_rows(rows: List[Dict[str, Any]]) -> pd.DataFrame: if not rows: return pd.DataFrame(columns=TUNING_COLUMNS) df = pd.DataFrame(rows) for c in TUNING_COLUMNS: if c not in df.columns: df[c] = None df = df[TUNING_COLUMNS] return opt.coerce_tuning_editor_df(df) def _tuning_df_to_rows(df: Optional[pd.DataFrame]) -> List[Dict[str, Any]]: if df is None or df.empty: return [] return _df_to_records(df[TUNING_COLUMNS].copy()) def _build_runtime_cfg(cfg: Dict[str, Any], payload: Dict[str, Any]) -> Dict[str, Any]: raw = payload or {} # maintenance blocks keep list records; parser in opt handles stricter normalization. maintenance_blocks = raw.get("maintenance_blocks") if maintenance_blocks is None: maintenance_blocks = cfg.get("maintenance_blocks", []) widgets = { "business_start": opt.parse_hm(str(raw.get("business_start", cfg.get("business_start", "09:30"))), "09:30"), "business_end": opt.parse_hm(str(raw.get("business_end", cfg.get("business_end", "01:30"))), "01:30"), "turnaround_base": _to_int(raw.get("turnaround_base", cfg.get("turnaround_base", 10)), 10), "golden_start": opt.parse_hm(str(raw.get("golden_start", cfg.get("golden_start", "14:00"))), "14:00"), "golden_end": opt.parse_hm(str(raw.get("golden_end", cfg.get("golden_end", "21:00"))), "21:00"), "efficiency_enabled": _to_bool(raw.get("efficiency_enabled", cfg.get("efficiency_enabled", True)), True), "efficiency_penalty_coef": _to_float(raw.get("efficiency_penalty_coef", cfg.get("efficiency_penalty_coef", 1.0)), 1.0), "eff_daily_delta_cap": _to_int(raw.get("eff_daily_delta_cap", cfg.get("eff_daily_delta_cap", 5)), 5), "rule1_enabled": _to_bool(raw.get("rule1_enabled", cfg.get("rule1_enabled", True)), True), "rule1_gap": _to_int(raw.get("rule1_gap", cfg.get("rule1_gap", 30)), 30), "rule2_enabled": _to_bool(raw.get("rule2_enabled", cfg.get("rule2_enabled", True)), True), "rule2_threshold": _to_int(raw.get("rule2_threshold", cfg.get("rule2_threshold", 4)), 4), "rule2_window_minutes": _to_int(raw.get("rule2_window_minutes", cfg.get("rule2_window_minutes", 30)), 30), "rule2_penalty": _to_float(raw.get("rule2_penalty", cfg.get("rule2_penalty", 15.0)), 15.0), "rule2_exempt_ranges": str(raw.get("rule2_exempt_ranges", ", ".join(cfg.get("rule2_exempt_ranges", [])))), "rule3_enabled": _to_bool(raw.get("rule3_enabled", cfg.get("rule3_enabled", True)), True), "rule3_gap_minutes": _to_int(raw.get("rule3_gap_minutes", cfg.get("rule3_gap_minutes", 30)), 30), "rule3_penalty": _to_float(raw.get("rule3_penalty", cfg.get("rule3_penalty", 12.0)), 12.0), "rule4_enabled": _to_bool(raw.get("rule4_enabled", cfg.get("rule4_enabled", True)), True), "rule4_earliest": opt.parse_hm(str(raw.get("rule4_earliest", cfg.get("rule4_earliest", "10:00"))), "10:00"), "rule4_latest": opt.parse_hm(str(raw.get("rule4_latest", cfg.get("rule4_latest", "22:30"))), "22:30"), "rule9_enabled": _to_bool(raw.get("rule9_enabled", cfg.get("rule9_enabled", True)), True), "rule9_hot_top_n": _to_int(raw.get("rule9_hot_top_n", cfg.get("rule9_hot_top_n", 3)), 3), "rule9_min_ratio": _to_float(raw.get("rule9_min_ratio", cfg.get("rule9_min_ratio", 0.30)), 0.30), "rule9_penalty": _to_float(raw.get("rule9_penalty", cfg.get("rule9_penalty", 20.0)), 20.0), "rule11_enabled": _to_bool(raw.get("rule11_enabled", cfg.get("rule11_enabled", True)), True), "rule11_after_time": opt.parse_hm(str(raw.get("rule11_after_time", cfg.get("rule11_after_time", "22:00"))), "22:00"), "rule11_penalty": _to_float(raw.get("rule11_penalty", cfg.get("rule11_penalty", 30.0)), 30.0), "rule12_enabled": _to_bool(raw.get("rule12_enabled", cfg.get("rule12_enabled", True)), True), "rule12_penalty_each": _to_float(raw.get("rule12_penalty_each", cfg.get("rule12_penalty_each", 25.0)), 25.0), "rule13_enabled": _to_bool(raw.get("rule13_enabled", cfg.get("rule13_enabled", True)), True), "rule13_forbidden_halls": str(raw.get("rule13_forbidden_halls", ",".join(cfg.get("rule13_forbidden_halls", [])))), "tms_allowance": _to_int(raw.get("tms_allowance", cfg.get("tms_allowance", 0)), 0), "maintenance_blocks": maintenance_blocks, "iterations": _to_int(raw.get("iterations", cfg.get("iterations", 300)), 300), "random_seed": _to_int(raw.get("random_seed", cfg.get("random_seed", 20260331)), 20260331), } return opt.build_runtime_config_from_widgets(cfg, widgets) def _runtime_cfg_for_json(cfg: Dict[str, Any]) -> Dict[str, Any]: return { "business_start": str(cfg.get("business_start", "09:30")), "business_end": str(cfg.get("business_end", "01:30")), "turnaround_base": _to_int(cfg.get("turnaround_base", 10), 10), "golden_start": str(cfg.get("golden_start", "14:00")), "golden_end": str(cfg.get("golden_end", "21:00")), "efficiency_enabled": _to_bool(cfg.get("efficiency_enabled", True), True), "efficiency_penalty_coef": _to_float(cfg.get("efficiency_penalty_coef", 1.0), 1.0), "eff_daily_delta_cap": _to_int(cfg.get("eff_daily_delta_cap", 5), 5), "rule1_enabled": _to_bool(cfg.get("rule1_enabled", True), True), "rule1_gap": _to_int(cfg.get("rule1_gap", 30), 30), "rule2_enabled": _to_bool(cfg.get("rule2_enabled", True), True), "rule2_threshold": _to_int(cfg.get("rule2_threshold", 4), 4), "rule2_window_minutes": _to_int(cfg.get("rule2_window_minutes", 30), 30), "rule2_penalty": _to_float(cfg.get("rule2_penalty", 15.0), 15.0), "rule2_exempt_ranges": ", ".join(cfg.get("rule2_exempt_ranges", [])), "rule3_enabled": _to_bool(cfg.get("rule3_enabled", True), True), "rule3_gap_minutes": _to_int(cfg.get("rule3_gap_minutes", 30), 30), "rule3_penalty": _to_float(cfg.get("rule3_penalty", 12.0), 12.0), "rule4_enabled": _to_bool(cfg.get("rule4_enabled", True), True), "rule4_earliest": str(cfg.get("rule4_earliest", "10:00")), "rule4_latest": str(cfg.get("rule4_latest", "22:30")), "rule9_enabled": _to_bool(cfg.get("rule9_enabled", True), True), "rule9_hot_top_n": _to_int(cfg.get("rule9_hot_top_n", 3), 3), "rule9_min_ratio": _to_float(cfg.get("rule9_min_ratio", 0.30), 0.30), "rule9_penalty": _to_float(cfg.get("rule9_penalty", 20.0), 20.0), "rule11_enabled": _to_bool(cfg.get("rule11_enabled", True), True), "rule11_after_time": str(cfg.get("rule11_after_time", "22:00")), "rule11_penalty": _to_float(cfg.get("rule11_penalty", 30.0), 30.0), "rule12_enabled": _to_bool(cfg.get("rule12_enabled", True), True), "rule12_penalty_each": _to_float(cfg.get("rule12_penalty_each", 25.0), 25.0), "rule13_enabled": _to_bool(cfg.get("rule13_enabled", True), True), "rule13_forbidden_halls": ",".join(str(x) for x in cfg.get("rule13_forbidden_halls", ["2", "8", "9"])), "tms_allowance": _to_int(cfg.get("tms_allowance", 0), 0), "maintenance_blocks": list(cfg.get("maintenance_blocks", [])), "iterations": _to_int(cfg.get("iterations", 300), 300), "random_seed": _to_int(cfg.get("random_seed", 20260331), 20260331), } def _base_and_target(base_str: str) -> Tuple[date, date, str, str]: try: base_date = datetime.strptime(base_str, "%Y-%m-%d").date() except Exception: base_date = date.today() target_date = base_date + timedelta(days=1) return base_date, target_date, base_date.strftime("%Y-%m-%d"), target_date.strftime("%Y-%m-%d") def _normalize_string_list(raw: Any) -> List[str]: if not isinstance(raw, list): return [] out: List[str] = [] seen: set[str] = set() for x in raw: v = str(x).strip() if not v or v in seen: continue seen.add(v) out.append(v) return out def _normalize_exclude_rules(raw: Any) -> Dict[str, bool]: obj = raw if isinstance(raw, dict) else {} return { "zero_sales": _to_bool(obj.get("zero_sales", False), False), "early_morning": _to_bool(obj.get("early_morning", False), False), } def _session_start_text(session: Dict[str, Any]) -> str: return str(session.get("showStartTime") or session.get("startTime") or "").strip() def _session_end_text(session: Dict[str, Any]) -> str: return str(session.get("showEndTime") or session.get("endTime") or "").strip() def _session_hall_name(session: Dict[str, Any]) -> str: return str(session.get("hallName") or session.get("hallId") or "").strip() def _session_movie_name(session: Dict[str, Any]) -> str: return str(session.get("movieName") or "").strip() def _session_sold_tickets(session: Dict[str, Any]) -> int: return _to_int( session.get("soldTicketNum", session.get("buyTicketNum", session.get("ticketNum", 0))), 0, ) def _session_box_office(session: Dict[str, Any]) -> float: return _to_float(session.get("soldBoxOffice", 0.0), 0.0) def _hm_to_minutes(text: str) -> Optional[int]: if not text: return None try: t = datetime.strptime(str(text), "%H:%M") except Exception: return None return t.hour * 60 + t.minute def _is_zero_sales_session(session: Dict[str, Any]) -> bool: return _session_sold_tickets(session) <= 0 and _session_box_office(session) <= 0 def _is_early_morning_session(session: Dict[str, Any]) -> bool: mins = _hm_to_minutes(_session_start_text(session)) return mins is not None and mins < 10 * 60 def _session_special_tags(session: Dict[str, Any]) -> List[str]: tags: List[str] = [] if _is_zero_sales_session(session): tags.append("零票零票房") if _is_early_morning_session(session): tags.append("早场") sold = _session_sold_tickets(session) box_office = _session_box_office(session) if sold <= 1 and 0 < box_office <= 20: tags.append("低票低收") return tags def _build_exclusion_options( raw_today_schedule: List[Dict[str, Any]], ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: options: List[Dict[str, Any]] = [] movie_counter: Counter[str] = Counter() hall_counter: Counter[str] = Counter() for idx, session in enumerate(raw_today_schedule): movie = _session_movie_name(session) or "未知影片" hall = _session_hall_name(session) or "未知影厅" start = _session_start_text(session) end = _session_end_text(session) sold = _session_sold_tickets(session) box_office = _session_box_office(session) tags = _session_special_tags(session) tag_text = f" | {'/'.join(tags)}" if tags else "" time_span = f"{start}-{end}" if end else start options.append( { "key": str(idx), "label": f"{time_span} | {hall} | {movie} | 票{sold} 票房{box_office:.0f}{tag_text}", "movie": movie, "hall": hall, "suspected": bool(tags), "legacy_label": opt.session_display_label(session), } ) movie_counter[movie] += 1 hall_counter[hall] += 1 movie_options = [ {"value": name, "label": f"{name}({cnt}场)", "count": int(cnt)} for name, cnt in sorted(movie_counter.items(), key=lambda kv: (-kv[1], kv[0])) ] hall_options = [ {"value": name, "label": f"{name}({cnt}场)", "count": int(cnt)} for name, cnt in sorted(hall_counter.items(), key=lambda kv: (-kv[1], _hall_sort_key(kv[0]))) ] return options, movie_options, hall_options def _apply_exclusion_filters( schedule_list: List[Dict[str, Any]], excluded_session_keys: List[str], excluded_movies: List[str], excluded_halls: List[str], exclude_rules: Dict[str, bool], ) -> Tuple[List[Dict[str, Any]], Dict[str, Any], List[str]]: if not schedule_list: return [], {"total_count": 0, "removed_count": 0, "remaining_count": 0, "affected_movies": 0, "reason_breakdown": {}}, [] key_set = {int(x) for x in excluded_session_keys if str(x).isdigit()} movie_set = set(excluded_movies) hall_set = set(excluded_halls) rules = _normalize_exclude_rules(exclude_rules) filtered: List[Dict[str, Any]] = [] removed_labels: List[str] = [] removed_movies: set[str] = set() reason_counter: Counter[str] = Counter() for idx, session in enumerate(schedule_list): reasons: List[str] = [] movie = _session_movie_name(session) hall = _session_hall_name(session) if idx in key_set: reasons.append("手动场次") if movie and movie in movie_set: reasons.append("按影片剔除") if hall and hall in hall_set: reasons.append("按影厅剔除") if rules["zero_sales"] and _is_zero_sales_session(session): reasons.append("零票零票房") if rules["early_morning"] and _is_early_morning_session(session): reasons.append("早场(10:00前)") if reasons: removed_labels.append(opt.session_display_label(session)) if movie: removed_movies.add(movie) for reason in sorted(set(reasons)): reason_counter[reason] += 1 else: filtered.append(session) effect = { "total_count": len(schedule_list), "removed_count": len(schedule_list) - len(filtered), "remaining_count": len(filtered), "affected_movies": len(removed_movies), "reason_breakdown": dict(reason_counter.most_common()), } return filtered, effect, removed_labels def _bundle_payload(state: Dict[str, Any]) -> Optional[Dict[str, Any]]: bundle = state.get("bundle") if not isinstance(bundle, dict): return None raw_today_schedule = bundle.get("today_schedule_raw") or bundle.get("today_schedule") or [] exclude_options, exclude_movie_options, exclude_hall_options = _build_exclusion_options(raw_today_schedule) key_to_legacy = {item["key"]: item["legacy_label"] for item in exclude_options} legacy_to_keys: Dict[str, List[str]] = {} for item in exclude_options: legacy_to_keys.setdefault(item["legacy_label"], []).append(item["key"]) valid_key_set = {item["key"] for item in exclude_options} valid_movie_set = {item["value"] for item in exclude_movie_options} valid_hall_set = {item["value"] for item in exclude_hall_options} excluded_session_keys = _normalize_string_list(bundle.get("today_schedule_excluded_keys") or []) if not excluded_session_keys: legacy_labels = _normalize_string_list(bundle.get("today_schedule_excluded_labels") or []) mapped: List[str] = [] for label in legacy_labels: mapped.extend(legacy_to_keys.get(label, [])) excluded_session_keys = _normalize_string_list(mapped) excluded_session_keys = [x for x in excluded_session_keys if x in valid_key_set] excluded_movies = [x for x in _normalize_string_list(bundle.get("today_schedule_excluded_movies") or []) if x in valid_movie_set] excluded_halls = [x for x in _normalize_string_list(bundle.get("today_schedule_excluded_halls") or []) if x in valid_hall_set] exclude_rules = _normalize_exclude_rules(bundle.get("today_schedule_exclude_rules") or {}) exclude_effect = bundle.get("today_schedule_exclusion_effect") or { "total_count": len(raw_today_schedule), "removed_count": 0, "remaining_count": len(raw_today_schedule), "affected_movies": 0, "reason_breakdown": {}, } return { "target_str": str(bundle.get("target_str", "")), "today_str": str(bundle.get("today_str", "")), "excluded_labels": [key_to_legacy.get(k, "") for k in excluded_session_keys if key_to_legacy.get(k, "")], "excluded_session_keys": excluded_session_keys, "excluded_movies": excluded_movies, "excluded_halls": excluded_halls, "exclude_rules": exclude_rules, "exclude_options": [{k: v for k, v in item.items() if k != "legacy_label"} for item in exclude_options], "exclude_movie_options": exclude_movie_options, "exclude_hall_options": exclude_hall_options, "exclude_effect": _safe_value(exclude_effect), "movies_count": len(bundle.get("movies", [])), "locked_count": len(bundle.get("locked_sessions", [])), "tuning_rows": _tuning_df_to_rows(state.get("tuning_df")), "today_eff_rows": _df_to_records(bundle.get("today_eff")), } def _repair_broken_state() -> Dict[str, Any]: state = opt.read_job_state() if state.get("status") in {"running", "paused"} and opt._find_live_worker() is None: # noqa: SLF001 if _to_int(state.get("iter_done", 0), 0) < _to_int(state.get("iterations", 0), 0): state = opt.write_job_state(status="failed", control="run", message="后台任务已中断,请重新启动。") return state def _job_state_payload(state: Dict[str, Any]) -> Dict[str, Any]: return { "status": str(state.get("status", "idle")), "control": str(state.get("control", "run")), "job_id": str(state.get("job_id", "")), "started_at": str(state.get("started_at", "")), "ended_at": str(state.get("ended_at", "")), "target_date": str(state.get("target_date", "")), "iterations": _to_int(state.get("iterations", 0), 0), "iter_done": _to_int(state.get("iter_done", 0), 0), "progress": _to_float(state.get("progress", 0.0), 0.0), "elapsed_seconds": _to_float(state.get("elapsed_seconds", 0.0), 0.0), "feasible_count": _to_int(state.get("feasible_count", 0), 0), "hard_reject": _to_int(state.get("hard_reject", 0), 0), "build_reject": _to_int(state.get("build_reject", 0), 0), "rule_reject": _to_int(state.get("rule_reject", 0), 0), "reject_reason_top": _safe_value(state.get("reject_reason_top") or {}), "reject_detail_top": _safe_value(state.get("reject_detail_top") or {}), "message": str(state.get("message", "")), "result_count": _to_int(state.get("result_count", 0), 0), } def _hall_sort_key(hall_name: str) -> Tuple[int, str]: nums = re.findall(r"\d+", str(hall_name)) return (int(nums[0]), str(hall_name)) if nums else (9999, str(hall_name)) GANTT_MOVIE_COLORS = [ "#9D6BFF", "#FF6699", "#FF6666", "#2CD52C", "#99CCFF", "#FFCC99", "#FF6600", "#CCFF99", "#CC9999", "#FF9900", "#2CB5FB", "#CC66CC", "#9981B1", "#FFFF66", "#009999", "#3366CC", "#996633", ] GANTT_TEXT_COLOR = "#000000" def _gantt_color_key(movie_name: Any, movie_num: Any, media_type: Any, movie_language: Any) -> str: movie_num_value = str(movie_num or "").strip() base_key = movie_num_value if movie_num_value else str(movie_name or "").strip() language_value = str(movie_language or "").strip() imagery_value = str(media_type or "").strip() return "||".join([base_key, language_value, imagery_value]) def render_gantt_html(schedule: List[Dict[str, Any]], date_str: str) -> str: if not schedule: return '
无排片数据
' df = pd.DataFrame(schedule).copy() if df.empty: return '
无排片数据
' df["startTime"] = pd.to_datetime(df["startTime"], errors="coerce") df["endTime"] = pd.to_datetime(df["endTime"], errors="coerce") df = df.dropna(subset=["hallName", "movieName", "startTime", "endTime"]).copy() overnight_mask = df["endTime"] <= df["startTime"] if overnight_mask.any(): df.loc[overnight_mask, "endTime"] = df.loc[overnight_mask, "endTime"] + timedelta(days=1) if df.empty: return '
无有效排片数据
' df["movieColorKey"] = df.apply( lambda r: _gantt_color_key( r.get("movieName"), r.get("movieNum"), r.get("movieMediaType"), r.get("movieLanguage"), ), axis=1, ) hall_order = sorted(df["hallName"].astype(str).unique().tolist(), key=_hall_sort_key) t_min = df["startTime"].min().replace(minute=0, second=0, microsecond=0) t_max = (df["endTime"].max() + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) total_minutes = max(60.0, (t_max - t_min).total_seconds() / 60.0) total_hours = max(1, int((t_max - t_min).total_seconds() / 3600)) color_keys = sorted(df["movieColorKey"].astype(str).unique().tolist()) color_map = {k: GANTT_MOVIE_COLORS[i % len(GANTT_MOVIE_COLORS)] for i, k in enumerate(color_keys)} legend_map: Dict[str, str] = {} for _, r in df.iterrows(): key = str(r.get("movieColorKey") or "") film_name = str(r.get("movieName") or "") media_type = str(r.get("movieMediaType") or "").strip() if key not in legend_map: legend_map[key] = f"{film_name} {media_type}".strip() legend_items = [ f'{html.escape(legend_map.get(k, k))}' for k in color_keys[:16] ] if len(color_keys) > 16: legend_items.append(f'+{len(color_keys) - 16} 个制式/影片') legend_html = "".join(legend_items) labels = [] for i in range(total_hours + 1): labels.append(f'
{(t_min + timedelta(hours=i)).strftime("%H:%M")}
') time_labels_html = "".join(labels) halls_html = "" for hall in hall_order: row_df = df[df["hallName"].astype(str) == hall].sort_values("startTime") blocks = "" for _, r in row_df.iterrows(): start = r["startTime"] end = r["endTime"] left = ((start - t_min).total_seconds() / 60.0 / total_minutes) * 100.0 width = ((end - start).total_seconds() / 60.0 / total_minutes) * 100.0 if left < 0: width += left left = 0 width = max(0.4, width) if left + width > 100: width = max(0.4, 100 - left) movie_name = str(r.get("movieName") or "") media_type = str(r.get("movieMediaType") or "").strip() film_title = f"{movie_name} {media_type}".strip() hall_name = str(r.get("hallName") or "") sold = _to_int(r.get("sold", r.get("soldTicketNum", 0)), 0) duration_min = int((end - start).total_seconds() / 60) tooltip = html.escape( f"{film_title}\n{hall_name}\n{start.strftime('%H:%M')} - {end.strftime('%H:%M')} ({duration_min}min)\n已售:{sold}", quote=True, ).replace("\n", " ") lock_tag = '已售锁定' if _to_bool(r.get("is_presold", False), False) else "" blocks += ( f'
' f'
{html.escape(movie_name)}
' f'
{start.strftime("%H:%M")}-{end.strftime("%H:%M")} · {duration_min}m
' f"{lock_tag}" "
" ) halls_html += ( f'
{html.escape(str(hall))}
' f'
{blocks}
' ) dt_obj = datetime.strptime(date_str, "%Y-%m-%d") weekdays = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"] date_display = f"{dt_obj.strftime('%Y.%m.%d')} {weekdays[dt_obj.weekday()]} · 场次 {len(df)}" half_hour_grid_size = 100 / max(1, total_hours * 2) min_width = max(1320, int(total_hours * 96)) return f"""
{html.escape(date_display)}
{legend_html}
{time_labels_html}
{halls_html}
""" @app.route("/") def index() -> str: return render_template("index.html") @app.get("/api/session") def api_session() -> Any: cfg = opt.load_config() state = _get_user_state() today = date.today() bundle = state.get("bundle") if isinstance(bundle, dict) and bundle.get("today_str"): try: base_date = datetime.strptime(str(bundle.get("today_str")), "%Y-%m-%d").date() except Exception: base_date = today else: base_date = today _, _, base_str, target_str = _base_and_target(base_date.strftime("%Y-%m-%d")) job_state = _repair_broken_state() return jsonify( { "success": True, "base_date": base_str, "target_date": target_str, "runtime_cfg": _runtime_cfg_for_json(state.get("runtime_cfg") or cfg), "bundle": _bundle_payload(state), "job_state": _job_state_payload(job_state), } ) @app.post("/api/config/save") def api_config_save() -> Any: body = request.get_json(silent=True) or {} cfg = opt.load_config() runtime_cfg = _build_runtime_cfg(cfg, body.get("runtime_cfg") or {}) opt.save_config(runtime_cfg) state = _get_user_state() state["runtime_cfg"] = runtime_cfg return jsonify({"success": True, "runtime_cfg": _runtime_cfg_for_json(runtime_cfg)}) @app.post("/api/config/reset") def api_config_reset() -> Any: opt.save_config(dict(opt.DEFAULT_CONFIG)) cfg = opt.load_config() state = _get_user_state() state["runtime_cfg"] = cfg return jsonify({"success": True, "runtime_cfg": _runtime_cfg_for_json(cfg)}) @app.post("/api/load-data") def api_load_data() -> Any: body = request.get_json(silent=True) or {} base_date, target_date, today_str, target_str = _base_and_target(str(body.get("base_date", date.today().strftime("%Y-%m-%d")))) current_cfg = opt.load_config() runtime_cfg = _build_runtime_cfg(current_cfg, body.get("runtime_cfg") or {}) opt.save_config(runtime_cfg) next_day_schedule, hall_seat_map, err_next = opt.fetch_schedule_and_halls(target_str) today_schedule, _, err_today = opt.fetch_schedule_and_halls(today_str) if err_next: return jsonify({"success": False, "error": f"次日排片拉取失败:{err_next}"}), 400 if err_today: return jsonify({"success": False, "error": f"当日排片拉取失败:{err_today}"}), 400 hall_name_map = opt.build_hall_name_map(next_day_schedule, hall_seat_map) locked_sessions = opt.build_locked_sessions(next_day_schedule, target_date) movies = opt.fetch_movie_info_for_date(target_str) if not movies: return jsonify({"success": False, "error": "getMovieInfo 接口未返回可放映电影,无法生成排片。"}), 400 movies = opt.dedupe_movies_by_policy_key(movies) preview_windows_by_identity = opt.build_preview_windows_for_movies(target_date, movies) blockouts = opt.parse_blockouts_from_config(target_date, runtime_cfg.get("maintenance_blocks", [])) blockouts_by_hall = opt.build_hall_blockouts(blockouts, hall_name_map) biz_start_t = opt.parse_hm(runtime_cfg["business_start"], "09:30") biz_end_t = opt.parse_hm(runtime_cfg["business_end"], "01:30") golden_start_t = opt.parse_hm(runtime_cfg["golden_start"], "14:00") golden_end_t = opt.parse_hm(runtime_cfg["golden_end"], "21:00") biz_start_dt = opt.parse_operating_dt(target_date, biz_start_t) biz_end_dt = opt.parse_operating_dt(target_date, biz_end_t) if biz_end_dt <= biz_start_dt: biz_end_dt += timedelta(days=1) golden_start_dt = opt.parse_operating_dt(target_date, golden_start_t) golden_end_dt = opt.parse_operating_dt(target_date, golden_end_t) if golden_end_dt < golden_start_dt: golden_end_dt += timedelta(days=1) box_office_data = opt.fetch_realtime_box_office(target_str) if not box_office_data: box_office_data = opt.fetch_realtime_box_office(today_str) tms_rows = opt.fetch_tms_server_movies_raw() tms_by_hall = opt.build_tms_index_by_hall(tms_rows) today_eff = opt.build_today_efficiency(today_schedule, hall_seat_map, golden_start_t, golden_end_t) movie_targets = opt.build_movie_targets( movies=movies, today_eff=today_eff, locked_sessions=locked_sessions, box_office_data=box_office_data, rule12_enabled=bool(runtime_cfg["rule12_enabled"]), ) movie_weights = opt.build_movie_weights(movies, movie_targets, box_office_data) tuning_df = opt.build_default_tuning_table( movies=movies, movie_targets=movie_targets, today_eff=today_eff, next_day_schedule=next_day_schedule, box_office_data=box_office_data, efficiency_enabled=bool(runtime_cfg["efficiency_enabled"]), rule12_enabled=bool(runtime_cfg["rule12_enabled"]), daily_delta_cap=int(runtime_cfg.get("eff_daily_delta_cap", 5)), ) state = _get_user_state() state["runtime_cfg"] = runtime_cfg state["bundle"] = { "target_date": target_date, "target_str": target_str, "today_str": today_str, "next_day_schedule": next_day_schedule, "today_schedule_raw": list(today_schedule), "today_schedule_excluded_labels": [], "today_schedule_excluded_keys": [], "today_schedule_excluded_movies": [], "today_schedule_excluded_halls": [], "today_schedule_exclude_rules": {"zero_sales": False, "early_morning": False}, "today_schedule_exclusion_effect": { "total_count": len(today_schedule), "removed_count": 0, "remaining_count": len(today_schedule), "affected_movies": 0, "reason_breakdown": {}, }, "today_schedule": today_schedule, "hall_seat_map": hall_seat_map, "hall_name_map": hall_name_map, "locked_sessions": locked_sessions, "movies": movies, "preview_windows_by_identity": preview_windows_by_identity, "blockouts_by_hall": blockouts_by_hall, "biz_start_dt": biz_start_dt, "biz_end_dt": biz_end_dt, "golden_start_dt": golden_start_dt, "golden_end_dt": golden_end_dt, "today_eff": today_eff, "movie_targets": movie_targets, "movie_weights": movie_weights, "box_office_data": box_office_data, "tms_by_hall": tms_by_hall, } state["tuning_df"] = opt.coerce_tuning_editor_df(tuning_df) return jsonify( { "success": True, "runtime_cfg": _runtime_cfg_for_json(runtime_cfg), "bundle": _bundle_payload(state), "message": f"数据加载完成,目标日期 {target_str}。", } ) @app.post("/api/update-exclusions") def api_update_exclusions() -> Any: body = request.get_json(silent=True) or {} labels = _normalize_string_list(body.get("excluded_labels") or []) excluded_session_keys = _normalize_string_list(body.get("excluded_session_keys") or []) excluded_movies = _normalize_string_list(body.get("excluded_movies") or []) excluded_halls = _normalize_string_list(body.get("excluded_halls") or []) exclude_rules = _normalize_exclude_rules(body.get("exclude_rules") or {}) state = _get_user_state() bundle = state.get("bundle") runtime_cfg = state.get("runtime_cfg") or opt.load_config() if not isinstance(bundle, dict): return jsonify({"success": False, "error": "请先加载数据。"}), 400 raw_today_schedule = bundle.get("today_schedule_raw") or bundle.get("today_schedule") or [] option_rows, movie_rows, hall_rows = _build_exclusion_options(raw_today_schedule) valid_key_set = {str(item["key"]) for item in option_rows} valid_movie_set = {str(item["value"]) for item in movie_rows} valid_hall_set = {str(item["value"]) for item in hall_rows} if labels and not excluded_session_keys: legacy_to_keys: Dict[str, List[str]] = {} for item in option_rows: legacy_to_keys.setdefault(str(item["legacy_label"]), []).append(str(item["key"])) mapped_keys: List[str] = [] for label in labels: mapped_keys.extend(legacy_to_keys.get(label, [])) excluded_session_keys = _normalize_string_list(mapped_keys) excluded_session_keys = [x for x in excluded_session_keys if x in valid_key_set] excluded_movies = [x for x in excluded_movies if x in valid_movie_set] excluded_halls = [x for x in excluded_halls if x in valid_hall_set] filtered_today_schedule, exclusion_effect, removed_legacy_labels = _apply_exclusion_filters( raw_today_schedule, excluded_session_keys, excluded_movies, excluded_halls, exclude_rules, ) golden_start_t = opt.parse_hm(runtime_cfg["golden_start"], "14:00") golden_end_t = opt.parse_hm(runtime_cfg["golden_end"], "21:00") today_eff = opt.build_today_efficiency( filtered_today_schedule, bundle["hall_seat_map"], golden_start_t, golden_end_t, ) movie_targets = opt.build_movie_targets( movies=bundle["movies"], today_eff=today_eff, locked_sessions=bundle["locked_sessions"], box_office_data=bundle["box_office_data"], rule12_enabled=bool(runtime_cfg["rule12_enabled"]), ) movie_weights = opt.build_movie_weights(bundle["movies"], movie_targets, bundle["box_office_data"]) tuning_df = opt.build_default_tuning_table( movies=bundle["movies"], movie_targets=movie_targets, today_eff=today_eff, next_day_schedule=bundle["next_day_schedule"], box_office_data=bundle["box_office_data"], efficiency_enabled=bool(runtime_cfg["efficiency_enabled"]), rule12_enabled=bool(runtime_cfg["rule12_enabled"]), daily_delta_cap=int(runtime_cfg.get("eff_daily_delta_cap", 5)), ) bundle["today_schedule_excluded_labels"] = removed_legacy_labels bundle["today_schedule_excluded_keys"] = excluded_session_keys bundle["today_schedule_excluded_movies"] = excluded_movies bundle["today_schedule_excluded_halls"] = excluded_halls bundle["today_schedule_exclude_rules"] = exclude_rules bundle["today_schedule_exclusion_effect"] = exclusion_effect bundle["today_schedule"] = filtered_today_schedule bundle["today_eff"] = today_eff bundle["movie_targets"] = movie_targets bundle["movie_weights"] = movie_weights state["bundle"] = bundle state["tuning_df"] = opt.coerce_tuning_editor_df(tuning_df) msg = ( f"剔除已应用:共剔除 {exclusion_effect['removed_count']}/{exclusion_effect['total_count']} 场," f"影响 {exclusion_effect['affected_movies']} 部影片。" ) return jsonify({"success": True, "bundle": _bundle_payload(state), "effect": _safe_value(exclusion_effect), "message": msg}) @app.post("/api/job/start") def api_job_start() -> Any: body = request.get_json(silent=True) or {} state = _get_user_state() bundle = state.get("bundle") runtime_cfg = state.get("runtime_cfg") or opt.load_config() if not isinstance(bundle, dict): return jsonify({"success": False, "error": "请先加载数据并生成微调约束。"}), 400 rows = list(body.get("tuning_rows") or []) tuning_df = _normalize_tuning_rows(rows) state["tuning_df"] = tuning_df manual_constraints = opt.parse_movie_tuning_constraints(tuning_df) allowed_movies = opt.extract_allowed_movies_from_tuning_df(tuning_df) allowed_movies |= opt.build_locked_movie_policy_set(bundle.get("locked_sessions", [])) payload = opt.build_job_payload( bundle=bundle, runtime_cfg=runtime_cfg, manual_constraints=manual_constraints, allowed_movies=allowed_movies, ) ok, msg = opt.start_background_job(payload) code = 200 if ok else 400 return jsonify({"success": bool(ok), "message": msg, "job_state": _job_state_payload(opt.read_job_state())}), code @app.post("/api/job/control") def api_job_control() -> Any: body = request.get_json(silent=True) or {} action = str(body.get("action") or "").strip().lower() if action == "pause": state = opt.write_job_state(control="pause", message="收到暂停请求") elif action == "resume": state = opt.write_job_state(control="run", status="running", message="任务继续") elif action == "stop": state = opt.write_job_state(control="stop", message="收到停止请求") else: return jsonify({"success": False, "error": "不支持的动作"}), 400 return jsonify({"success": True, "job_state": _job_state_payload(state)}) @app.get("/api/job/state") def api_job_state() -> Any: state = _repair_broken_state() return jsonify({"success": True, "job_state": _job_state_payload(state)}) @app.get("/api/results") def api_results() -> Any: state = _get_user_state() bundle = state.get("bundle") or {} target_str = str(request.args.get("target_str") or bundle.get("target_str") or "") job_state = _repair_broken_state() job_result = opt._read_pickle(opt.JOB_RESULT_FILE, {}) # noqa: SLF001 if not isinstance(job_result, dict): job_result = {} # Failed/stopped summary if ( job_result.get("target_str") == target_str and job_state.get("status") in {"failed", "stopped"} and job_result.get("reject_reason_stats") ): return jsonify( { "success": True, "target_str": target_str, "status": str(job_state.get("status")), "has_results": False, "reject_summary": { "elapsed_seconds": _to_float(job_result.get("elapsed_seconds", 0.0), 0.0), "build_reject": _to_int(job_result.get("build_reject", 0), 0), "rule_reject": _to_int(job_result.get("rule_reject", 0), 0), "reject_reason_stats": _safe_value(job_result.get("reject_reason_stats") or {}), "reject_detail_stats": _safe_value(job_result.get("reject_detail_stats") or {}), }, } ) if not ( isinstance(job_result, dict) and job_result.get("target_str") == target_str and job_state.get("status") == "completed" ): return jsonify( { "success": True, "target_str": target_str, "status": str(job_state.get("status", "idle")), "has_results": False, } ) raw_results = list(job_result.get("results") or []) results = [x for x in (opt.deserialize_candidate(r) for r in raw_results) if x is not None] today_eff = job_result.get("today_eff", pd.DataFrame()) g_st = job_result.get("golden_start_dt") g_et = job_result.get("golden_end_dt") runtime_cfg = job_result.get("runtime_cfg", state.get("runtime_cfg") or opt.load_config()) box_office_data = job_result.get("box_office_data", []) candidates: List[Dict[str, Any]] = [] for idx, cand in enumerate(results): schedule_table = opt.df_schedule_for_display(cand.schedule) summary_df = opt.build_candidate_summary_table(cand.schedule, today_eff, g_st, g_et) log_target_date = datetime.strptime(target_str, "%Y-%m-%d").date() log_text = opt.generate_schedule_check_logs_text( schedule=cand.schedule, target_date=log_target_date, params=runtime_cfg, today_eff=today_eff if isinstance(today_eff, pd.DataFrame) else pd.DataFrame(), box_office_data=box_office_data, ) breakdown = [ {"规则": str(name), "分值": _to_float(delta, 0.0), "说明": str(msg)} for name, delta, msg in (cand.score_breakdown or []) ] candidates.append( { "index": idx, "title": f"方案{idx + 1}|分数 {cand.score:.1f}", "score": _to_float(cand.score, 0.0), "score_breakdown": breakdown, "schedule_table": _df_to_records(schedule_table), "summary_table": _df_to_records(summary_df), "gantt_html": render_gantt_html(cand.schedule, target_str), "log_text": str(log_text or ""), "hard_violations": [str(x) for x in (cand.hard_violations or [])], } ) return jsonify( { "success": True, "target_str": target_str, "status": "completed", "has_results": True, "summary": { "total_feasible": _to_int(job_result.get("all_results_count", 0), 0), "hard_reject": _to_int(job_result.get("hard_reject", 0), 0), "build_reject": _to_int(job_result.get("build_reject", 0), 0), "rule_reject": _to_int(job_result.get("rule_reject", 0), 0), "elapsed_seconds": _to_float(job_result.get("elapsed_seconds", 0.0), 0.0), "locked_count": _to_int(job_result.get("locked_count", 0), 0), "reject_reason_stats": _safe_value(job_result.get("reject_reason_stats") or {}), "reject_detail_stats": _safe_value(job_result.get("reject_detail_stats") or {}), "reject_phase_stats": _safe_value(job_result.get("reject_phase_stats") or {}), "reject_examples": _safe_value(job_result.get("reject_examples") or {}), "movie_targets": _safe_value(job_result.get("movie_targets") or {}), "today_eff_rows": _df_to_records(today_eff if isinstance(today_eff, pd.DataFrame) else pd.DataFrame()), }, "candidates": candidates, } ) if __name__ == "__main__": host = os.getenv("NEXTDAY_OPT_HOST", "0.0.0.0") port = _to_int(os.getenv("NEXTDAY_OPT_PORT", "8502"), 8502) app.run(host=host, port=port, debug=False, threaded=True)