import html import os import re import secrets import sys import threading import uuid from collections import Counter from datetime import date, datetime, timedelta from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import pandas as pd from flask import Flask, jsonify, render_template, request, session # Ensure project root is importable when this file runs directly. BASE_DIR = Path(__file__).resolve().parent if str(BASE_DIR) not in sys.path: sys.path.insert(0, str(BASE_DIR)) import optimizer_core as opt # noqa: E402 app = Flask( __name__, template_folder=str(Path(__file__).resolve().parent / "templates"), static_folder=str(Path(__file__).resolve().parent / "static"), ) app.secret_key = os.getenv("NEXTDAY_OPTIMIZER_WEB_SECRET", secrets.token_hex(32)) _STATE_LOCK = threading.Lock() _SESSION_STATE: Dict[str, Dict[str, Any]] = {} TUNING_COLUMNS = [ "选中", "影片", "今日场次", "今日黄金场次", "今日全天效率", "今日黄金效率", "最少场次", "最多场次", "固定场次", "最少黄金场次", "最多黄金场次", "最低场次占比", "最高场次占比", ] def _get_sid() -> str: sid = session.get("nextday_optimizer_sid") if not sid: sid = uuid.uuid4().hex session["nextday_optimizer_sid"] = sid return sid def _get_user_state() -> Dict[str, Any]: sid = _get_sid() with _STATE_LOCK: return _SESSION_STATE.setdefault(sid, {}) def _to_bool(v: Any, default: bool = False) -> bool: if isinstance(v, bool): return v if v in (None, ""): return default if isinstance(v, (int, float)): return bool(v) s = str(v).strip().lower() if s in {"1", "true", "yes", "y", "on"}: return True if s in {"0", "false", "no", "n", "off"}: return False return default def _to_int(v: Any, default: int = 0) -> int: try: if v in (None, "", "None"): return int(default) return int(float(v)) except Exception: return int(default) def _to_float(v: Any, default: float = 0.0) -> float: try: if v in (None, "", "None"): return float(default) return float(v) except Exception: return float(default) def _safe_value(v: Any) -> Any: if v is None: return None if isinstance(v, (datetime, date)): return v.isoformat() if isinstance(v, bool): return v if isinstance(v, (int, float, str)): if isinstance(v, float) and (pd.isna(v) or v in (float("inf"), float("-inf"))): return None return v try: if pd.isna(v): return None except Exception: pass if isinstance(v, (list, tuple)): return [_safe_value(x) for x in v] if isinstance(v, dict): return {str(k): _safe_value(val) for k, val in v.items()} return str(v) def _df_to_records(df: Optional[pd.DataFrame]) -> List[Dict[str, Any]]: if df is None or not isinstance(df, pd.DataFrame) or df.empty: return [] out: List[Dict[str, Any]] = [] for _, row in df.iterrows(): item: Dict[str, Any] = {} for col in df.columns: item[str(col)] = _safe_value(row.get(col)) out.append(item) return out def _normalize_tuning_rows(rows: List[Dict[str, Any]]) -> pd.DataFrame: if not rows: return pd.DataFrame(columns=TUNING_COLUMNS) df = pd.DataFrame(rows) for c in TUNING_COLUMNS: if c not in df.columns: df[c] = None df = df[TUNING_COLUMNS] return opt.coerce_tuning_editor_df(df) def _tuning_df_to_rows(df: Optional[pd.DataFrame]) -> List[Dict[str, Any]]: if df is None or df.empty: return [] return _df_to_records(df[TUNING_COLUMNS].copy()) def _build_runtime_cfg(cfg: Dict[str, Any], payload: Dict[str, Any]) -> Dict[str, Any]: raw = payload or {} # maintenance blocks keep list records; parser in opt handles stricter normalization. maintenance_blocks = raw.get("maintenance_blocks") if maintenance_blocks is None: maintenance_blocks = cfg.get("maintenance_blocks", []) widgets = { "business_start": opt.parse_hm(str(raw.get("business_start", cfg.get("business_start", "09:30"))), "09:30"), "business_end": opt.parse_hm(str(raw.get("business_end", cfg.get("business_end", "01:30"))), "01:30"), "turnaround_base": _to_int(raw.get("turnaround_base", cfg.get("turnaround_base", 10)), 10), "golden_start": opt.parse_hm(str(raw.get("golden_start", cfg.get("golden_start", "14:00"))), "14:00"), "golden_end": opt.parse_hm(str(raw.get("golden_end", cfg.get("golden_end", "21:00"))), "21:00"), "efficiency_enabled": _to_bool(raw.get("efficiency_enabled", cfg.get("efficiency_enabled", True)), True), "efficiency_penalty_coef": _to_float(raw.get("efficiency_penalty_coef", cfg.get("efficiency_penalty_coef", 1.0)), 1.0), "eff_daily_delta_cap": _to_int(raw.get("eff_daily_delta_cap", cfg.get("eff_daily_delta_cap", 5)), 5), "rule1_enabled": _to_bool(raw.get("rule1_enabled", cfg.get("rule1_enabled", True)), True), "rule1_gap": _to_int(raw.get("rule1_gap", cfg.get("rule1_gap", 30)), 30), "rule2_enabled": _to_bool(raw.get("rule2_enabled", cfg.get("rule2_enabled", True)), True), "rule2_threshold": _to_int(raw.get("rule2_threshold", cfg.get("rule2_threshold", 4)), 4), "rule2_window_minutes": _to_int(raw.get("rule2_window_minutes", cfg.get("rule2_window_minutes", 30)), 30), "rule2_penalty": _to_float(raw.get("rule2_penalty", cfg.get("rule2_penalty", 15.0)), 15.0), "rule2_exempt_ranges": str(raw.get("rule2_exempt_ranges", ", ".join(cfg.get("rule2_exempt_ranges", [])))), "rule3_enabled": _to_bool(raw.get("rule3_enabled", cfg.get("rule3_enabled", True)), True), "rule3_gap_minutes": _to_int(raw.get("rule3_gap_minutes", cfg.get("rule3_gap_minutes", 30)), 30), "rule3_penalty": _to_float(raw.get("rule3_penalty", cfg.get("rule3_penalty", 12.0)), 12.0), "rule4_enabled": _to_bool(raw.get("rule4_enabled", cfg.get("rule4_enabled", True)), True), "rule4_earliest": opt.parse_hm(str(raw.get("rule4_earliest", cfg.get("rule4_earliest", "10:00"))), "10:00"), "rule4_latest": opt.parse_hm(str(raw.get("rule4_latest", cfg.get("rule4_latest", "22:30"))), "22:30"), "rule9_enabled": _to_bool(raw.get("rule9_enabled", cfg.get("rule9_enabled", True)), True), "rule9_hot_top_n": _to_int(raw.get("rule9_hot_top_n", cfg.get("rule9_hot_top_n", 3)), 3), "rule9_min_ratio": _to_float(raw.get("rule9_min_ratio", cfg.get("rule9_min_ratio", 0.30)), 0.30), "rule9_penalty": _to_float(raw.get("rule9_penalty", cfg.get("rule9_penalty", 20.0)), 20.0), "rule11_enabled": _to_bool(raw.get("rule11_enabled", cfg.get("rule11_enabled", True)), True), "rule11_after_time": opt.parse_hm(str(raw.get("rule11_after_time", cfg.get("rule11_after_time", "22:00"))), "22:00"), "rule11_penalty": _to_float(raw.get("rule11_penalty", cfg.get("rule11_penalty", 30.0)), 30.0), "rule12_enabled": _to_bool(raw.get("rule12_enabled", cfg.get("rule12_enabled", True)), True), "rule12_penalty_each": _to_float(raw.get("rule12_penalty_each", cfg.get("rule12_penalty_each", 25.0)), 25.0), "rule13_enabled": _to_bool(raw.get("rule13_enabled", cfg.get("rule13_enabled", True)), True), "rule13_forbidden_halls": str(raw.get("rule13_forbidden_halls", ",".join(cfg.get("rule13_forbidden_halls", [])))), "tms_allowance": _to_int(raw.get("tms_allowance", cfg.get("tms_allowance", 0)), 0), "maintenance_blocks": maintenance_blocks, "iterations": _to_int(raw.get("iterations", cfg.get("iterations", 300)), 300), "random_seed": _to_int(raw.get("random_seed", cfg.get("random_seed", 20260331)), 20260331), } return opt.build_runtime_config_from_widgets(cfg, widgets) def _runtime_cfg_for_json(cfg: Dict[str, Any]) -> Dict[str, Any]: return { "business_start": str(cfg.get("business_start", "09:30")), "business_end": str(cfg.get("business_end", "01:30")), "turnaround_base": _to_int(cfg.get("turnaround_base", 10), 10), "golden_start": str(cfg.get("golden_start", "14:00")), "golden_end": str(cfg.get("golden_end", "21:00")), "efficiency_enabled": _to_bool(cfg.get("efficiency_enabled", True), True), "efficiency_penalty_coef": _to_float(cfg.get("efficiency_penalty_coef", 1.0), 1.0), "eff_daily_delta_cap": _to_int(cfg.get("eff_daily_delta_cap", 5), 5), "rule1_enabled": _to_bool(cfg.get("rule1_enabled", True), True), "rule1_gap": _to_int(cfg.get("rule1_gap", 30), 30), "rule2_enabled": _to_bool(cfg.get("rule2_enabled", True), True), "rule2_threshold": _to_int(cfg.get("rule2_threshold", 4), 4), "rule2_window_minutes": _to_int(cfg.get("rule2_window_minutes", 30), 30), "rule2_penalty": _to_float(cfg.get("rule2_penalty", 15.0), 15.0), "rule2_exempt_ranges": ", ".join(cfg.get("rule2_exempt_ranges", [])), "rule3_enabled": _to_bool(cfg.get("rule3_enabled", True), True), "rule3_gap_minutes": _to_int(cfg.get("rule3_gap_minutes", 30), 30), "rule3_penalty": _to_float(cfg.get("rule3_penalty", 12.0), 12.0), "rule4_enabled": _to_bool(cfg.get("rule4_enabled", True), True), "rule4_earliest": str(cfg.get("rule4_earliest", "10:00")), "rule4_latest": str(cfg.get("rule4_latest", "22:30")), "rule9_enabled": _to_bool(cfg.get("rule9_enabled", True), True), "rule9_hot_top_n": _to_int(cfg.get("rule9_hot_top_n", 3), 3), "rule9_min_ratio": _to_float(cfg.get("rule9_min_ratio", 0.30), 0.30), "rule9_penalty": _to_float(cfg.get("rule9_penalty", 20.0), 20.0), "rule11_enabled": _to_bool(cfg.get("rule11_enabled", True), True), "rule11_after_time": str(cfg.get("rule11_after_time", "22:00")), "rule11_penalty": _to_float(cfg.get("rule11_penalty", 30.0), 30.0), "rule12_enabled": _to_bool(cfg.get("rule12_enabled", True), True), "rule12_penalty_each": _to_float(cfg.get("rule12_penalty_each", 25.0), 25.0), "rule13_enabled": _to_bool(cfg.get("rule13_enabled", True), True), "rule13_forbidden_halls": ",".join(str(x) for x in cfg.get("rule13_forbidden_halls", ["2", "8", "9"])), "tms_allowance": _to_int(cfg.get("tms_allowance", 0), 0), "maintenance_blocks": list(cfg.get("maintenance_blocks", [])), "iterations": _to_int(cfg.get("iterations", 300), 300), "random_seed": _to_int(cfg.get("random_seed", 20260331), 20260331), } def _base_and_target(base_str: str) -> Tuple[date, date, str, str]: try: base_date = datetime.strptime(base_str, "%Y-%m-%d").date() except Exception: base_date = date.today() target_date = base_date + timedelta(days=1) return base_date, target_date, base_date.strftime("%Y-%m-%d"), target_date.strftime("%Y-%m-%d") def _normalize_string_list(raw: Any) -> List[str]: if not isinstance(raw, list): return [] out: List[str] = [] seen: set[str] = set() for x in raw: v = str(x).strip() if not v or v in seen: continue seen.add(v) out.append(v) return out def _normalize_exclude_rules(raw: Any) -> Dict[str, bool]: obj = raw if isinstance(raw, dict) else {} return { "zero_sales": _to_bool(obj.get("zero_sales", False), False), "early_morning": _to_bool(obj.get("early_morning", False), False), } def _session_start_text(session: Dict[str, Any]) -> str: return str(session.get("showStartTime") or session.get("startTime") or "").strip() def _session_end_text(session: Dict[str, Any]) -> str: return str(session.get("showEndTime") or session.get("endTime") or "").strip() def _session_hall_name(session: Dict[str, Any]) -> str: return str(session.get("hallName") or session.get("hallId") or "").strip() def _session_movie_name(session: Dict[str, Any]) -> str: return str(session.get("movieName") or "").strip() def _session_sold_tickets(session: Dict[str, Any]) -> int: return _to_int( session.get("soldTicketNum", session.get("buyTicketNum", session.get("ticketNum", 0))), 0, ) def _session_box_office(session: Dict[str, Any]) -> float: return _to_float(session.get("soldBoxOffice", 0.0), 0.0) def _hm_to_minutes(text: str) -> Optional[int]: if not text: return None try: t = datetime.strptime(str(text), "%H:%M") except Exception: return None return t.hour * 60 + t.minute def _is_zero_sales_session(session: Dict[str, Any]) -> bool: return _session_sold_tickets(session) <= 0 and _session_box_office(session) <= 0 def _is_early_morning_session(session: Dict[str, Any]) -> bool: mins = _hm_to_minutes(_session_start_text(session)) return mins is not None and mins < 10 * 60 def _session_special_tags(session: Dict[str, Any]) -> List[str]: tags: List[str] = [] if _is_zero_sales_session(session): tags.append("零票零票房") if _is_early_morning_session(session): tags.append("早场") sold = _session_sold_tickets(session) box_office = _session_box_office(session) if sold <= 1 and 0 < box_office <= 20: tags.append("低票低收") return tags def _build_exclusion_options( raw_today_schedule: List[Dict[str, Any]], ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: options: List[Dict[str, Any]] = [] movie_counter: Counter[str] = Counter() hall_counter: Counter[str] = Counter() for idx, session in enumerate(raw_today_schedule): movie = _session_movie_name(session) or "未知影片" hall = _session_hall_name(session) or "未知影厅" start = _session_start_text(session) end = _session_end_text(session) sold = _session_sold_tickets(session) box_office = _session_box_office(session) tags = _session_special_tags(session) tag_text = f" | {'/'.join(tags)}" if tags else "" time_span = f"{start}-{end}" if end else start options.append( { "key": str(idx), "label": f"{time_span} | {hall} | {movie} | 票{sold} 票房{box_office:.0f}{tag_text}", "movie": movie, "hall": hall, "suspected": bool(tags), "legacy_label": opt.session_display_label(session), } ) movie_counter[movie] += 1 hall_counter[hall] += 1 movie_options = [ {"value": name, "label": f"{name}({cnt}场)", "count": int(cnt)} for name, cnt in sorted(movie_counter.items(), key=lambda kv: (-kv[1], kv[0])) ] hall_options = [ {"value": name, "label": f"{name}({cnt}场)", "count": int(cnt)} for name, cnt in sorted(hall_counter.items(), key=lambda kv: (-kv[1], _hall_sort_key(kv[0]))) ] return options, movie_options, hall_options def _apply_exclusion_filters( schedule_list: List[Dict[str, Any]], excluded_session_keys: List[str], excluded_movies: List[str], excluded_halls: List[str], exclude_rules: Dict[str, bool], ) -> Tuple[List[Dict[str, Any]], Dict[str, Any], List[str]]: if not schedule_list: return [], {"total_count": 0, "removed_count": 0, "remaining_count": 0, "affected_movies": 0, "reason_breakdown": {}}, [] key_set = {int(x) for x in excluded_session_keys if str(x).isdigit()} movie_set = set(excluded_movies) hall_set = set(excluded_halls) rules = _normalize_exclude_rules(exclude_rules) filtered: List[Dict[str, Any]] = [] removed_labels: List[str] = [] removed_movies: set[str] = set() reason_counter: Counter[str] = Counter() for idx, session in enumerate(schedule_list): reasons: List[str] = [] movie = _session_movie_name(session) hall = _session_hall_name(session) if idx in key_set: reasons.append("手动场次") if movie and movie in movie_set: reasons.append("按影片剔除") if hall and hall in hall_set: reasons.append("按影厅剔除") if rules["zero_sales"] and _is_zero_sales_session(session): reasons.append("零票零票房") if rules["early_morning"] and _is_early_morning_session(session): reasons.append("早场(10:00前)") if reasons: removed_labels.append(opt.session_display_label(session)) if movie: removed_movies.add(movie) for reason in sorted(set(reasons)): reason_counter[reason] += 1 else: filtered.append(session) effect = { "total_count": len(schedule_list), "removed_count": len(schedule_list) - len(filtered), "remaining_count": len(filtered), "affected_movies": len(removed_movies), "reason_breakdown": dict(reason_counter.most_common()), } return filtered, effect, removed_labels def _bundle_payload(state: Dict[str, Any]) -> Optional[Dict[str, Any]]: bundle = state.get("bundle") if not isinstance(bundle, dict): return None raw_today_schedule = bundle.get("today_schedule_raw") or bundle.get("today_schedule") or [] exclude_options, exclude_movie_options, exclude_hall_options = _build_exclusion_options(raw_today_schedule) key_to_legacy = {item["key"]: item["legacy_label"] for item in exclude_options} legacy_to_keys: Dict[str, List[str]] = {} for item in exclude_options: legacy_to_keys.setdefault(item["legacy_label"], []).append(item["key"]) valid_key_set = {item["key"] for item in exclude_options} valid_movie_set = {item["value"] for item in exclude_movie_options} valid_hall_set = {item["value"] for item in exclude_hall_options} excluded_session_keys = _normalize_string_list(bundle.get("today_schedule_excluded_keys") or []) if not excluded_session_keys: legacy_labels = _normalize_string_list(bundle.get("today_schedule_excluded_labels") or []) mapped: List[str] = [] for label in legacy_labels: mapped.extend(legacy_to_keys.get(label, [])) excluded_session_keys = _normalize_string_list(mapped) excluded_session_keys = [x for x in excluded_session_keys if x in valid_key_set] excluded_movies = [x for x in _normalize_string_list(bundle.get("today_schedule_excluded_movies") or []) if x in valid_movie_set] excluded_halls = [x for x in _normalize_string_list(bundle.get("today_schedule_excluded_halls") or []) if x in valid_hall_set] exclude_rules = _normalize_exclude_rules(bundle.get("today_schedule_exclude_rules") or {}) exclude_effect = bundle.get("today_schedule_exclusion_effect") or { "total_count": len(raw_today_schedule), "removed_count": 0, "remaining_count": len(raw_today_schedule), "affected_movies": 0, "reason_breakdown": {}, } return { "target_str": str(bundle.get("target_str", "")), "today_str": str(bundle.get("today_str", "")), "excluded_labels": [key_to_legacy.get(k, "") for k in excluded_session_keys if key_to_legacy.get(k, "")], "excluded_session_keys": excluded_session_keys, "excluded_movies": excluded_movies, "excluded_halls": excluded_halls, "exclude_rules": exclude_rules, "exclude_options": [{k: v for k, v in item.items() if k != "legacy_label"} for item in exclude_options], "exclude_movie_options": exclude_movie_options, "exclude_hall_options": exclude_hall_options, "exclude_effect": _safe_value(exclude_effect), "movies_count": len(bundle.get("movies", [])), "locked_count": len(bundle.get("locked_sessions", [])), "tuning_rows": _tuning_df_to_rows(state.get("tuning_df")), "today_eff_rows": _df_to_records(bundle.get("today_eff")), } def _repair_broken_state() -> Dict[str, Any]: state = opt.read_job_state() if state.get("status") in {"running", "paused"} and opt._find_live_worker() is None: # noqa: SLF001 if _to_int(state.get("iter_done", 0), 0) < _to_int(state.get("iterations", 0), 0): state = opt.write_job_state(status="failed", control="run", message="后台任务已中断,请重新启动。") return state def _job_state_payload(state: Dict[str, Any]) -> Dict[str, Any]: return { "status": str(state.get("status", "idle")), "control": str(state.get("control", "run")), "job_id": str(state.get("job_id", "")), "started_at": str(state.get("started_at", "")), "ended_at": str(state.get("ended_at", "")), "target_date": str(state.get("target_date", "")), "iterations": _to_int(state.get("iterations", 0), 0), "iter_done": _to_int(state.get("iter_done", 0), 0), "progress": _to_float(state.get("progress", 0.0), 0.0), "elapsed_seconds": _to_float(state.get("elapsed_seconds", 0.0), 0.0), "feasible_count": _to_int(state.get("feasible_count", 0), 0), "hard_reject": _to_int(state.get("hard_reject", 0), 0), "build_reject": _to_int(state.get("build_reject", 0), 0), "rule_reject": _to_int(state.get("rule_reject", 0), 0), "reject_reason_top": _safe_value(state.get("reject_reason_top") or {}), "reject_detail_top": _safe_value(state.get("reject_detail_top") or {}), "message": str(state.get("message", "")), "result_count": _to_int(state.get("result_count", 0), 0), } def _hall_sort_key(hall_name: str) -> Tuple[int, str]: nums = re.findall(r"\d+", str(hall_name)) return (int(nums[0]), str(hall_name)) if nums else (9999, str(hall_name)) GANTT_MOVIE_COLORS = [ "#9D6BFF", "#FF6699", "#FF6666", "#2CD52C", "#99CCFF", "#FFCC99", "#FF6600", "#CCFF99", "#CC9999", "#FF9900", "#2CB5FB", "#CC66CC", "#9981B1", "#FFFF66", "#009999", "#3366CC", "#996633", ] GANTT_TEXT_COLOR = "#000000" def _gantt_color_key(movie_name: Any, movie_num: Any, media_type: Any, movie_language: Any) -> str: movie_num_value = str(movie_num or "").strip() base_key = movie_num_value if movie_num_value else str(movie_name or "").strip() language_value = str(movie_language or "").strip() imagery_value = str(media_type or "").strip() return "||".join([base_key, language_value, imagery_value]) def render_gantt_html(schedule: List[Dict[str, Any]], date_str: str) -> str: if not schedule: return '