# nextday / app.py
# Ethscriptions's picture
# Upload 3 files
# c16c1d7 verified
import html
import os
import re
import secrets
import sys
import threading
import uuid
from collections import Counter
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from flask import Flask, jsonify, render_template, request, session
# Put this file's directory on sys.path so ``optimizer_core`` can be imported
# even when the module is executed directly as a script.
BASE_DIR = Path(__file__).resolve().parent
if str(BASE_DIR) not in sys.path:
    sys.path.insert(0, str(BASE_DIR))

import optimizer_core as opt  # noqa: E402

# Templates and static assets are looked up next to this file.
app = Flask(
    __name__,
    template_folder=str(BASE_DIR / "templates"),
    static_folder=str(BASE_DIR / "static"),
)
# When NEXTDAY_OPTIMIZER_WEB_SECRET is unset a fresh random key is generated
# on every start of the process.
app.secret_key = os.getenv("NEXTDAY_OPTIMIZER_WEB_SECRET", secrets.token_hex(32))

# In-memory per-session state keyed by session id; _get_user_state accesses it
# under _STATE_LOCK.
_STATE_LOCK = threading.Lock()
_SESSION_STATE: Dict[str, Dict[str, Any]] = {}
# Column names (and their order) of the tuning editor table.
# _normalize_tuning_rows and _tuning_df_to_rows select exactly these columns.
TUNING_COLUMNS = [
    "选中",
    "影片",
    "今日场次",
    "今日黄金场次",
    "今日全天效率",
    "今日黄金效率",
    "最少场次",
    "最多场次",
    "固定场次",
    "最少黄金场次",
    "最多黄金场次",
    "最低场次占比",
    "最高场次占比",
]
def _get_sid() -> str:
    """Return the optimizer session id from the Flask session, creating it if absent.

    A new id is a random UUID4 hex string and is stored in the Flask session
    under ``nextday_optimizer_sid`` before being returned.
    """
    existing = session.get("nextday_optimizer_sid")
    if existing:
        return existing
    fresh = uuid.uuid4().hex
    session["nextday_optimizer_sid"] = fresh
    return fresh
def _get_user_state() -> Dict[str, Any]:
    """Return the mutable state dict of the current session, creating it on first use.

    The lookup/insert into ``_SESSION_STATE`` happens under ``_STATE_LOCK``;
    the returned dict itself is shared and not copied.
    """
    sid = _get_sid()
    with _STATE_LOCK:
        if sid not in _SESSION_STATE:
            _SESSION_STATE[sid] = {}
        return _SESSION_STATE[sid]
def _to_bool(v: Any, default: bool = False) -> bool:
if isinstance(v, bool):
return v
if v in (None, ""):
return default
if isinstance(v, (int, float)):
return bool(v)
s = str(v).strip().lower()
if s in {"1", "true", "yes", "y", "on"}:
return True
if s in {"0", "false", "no", "n", "off"}:
return False
return default
def _to_int(v: Any, default: int = 0) -> int:
try:
if v in (None, "", "None"):
return int(default)
return int(float(v))
except Exception:
return int(default)
def _to_float(v: Any, default: float = 0.0) -> float:
try:
if v in (None, "", "None"):
return float(default)
return float(v)
except Exception:
return float(default)
def _safe_value(v: Any) -> Any:
if v is None:
return None
if isinstance(v, (datetime, date)):
return v.isoformat()
if isinstance(v, bool):
return v
if isinstance(v, (int, float, str)):
if isinstance(v, float) and (pd.isna(v) or v in (float("inf"), float("-inf"))):
return None
return v
try:
if pd.isna(v):
return None
except Exception:
pass
if isinstance(v, (list, tuple)):
return [_safe_value(x) for x in v]
if isinstance(v, dict):
return {str(k): _safe_value(val) for k, val in v.items()}
return str(v)
def _df_to_records(df: Optional[pd.DataFrame]) -> List[Dict[str, Any]]:
if df is None or not isinstance(df, pd.DataFrame) or df.empty:
return []
out: List[Dict[str, Any]] = []
for _, row in df.iterrows():
item: Dict[str, Any] = {}
for col in df.columns:
item[str(col)] = _safe_value(row.get(col))
out.append(item)
return out
def _normalize_tuning_rows(rows: List[Dict[str, Any]]) -> pd.DataFrame:
    """Build the tuning DataFrame from editor rows.

    Empty input yields an empty frame with ``TUNING_COLUMNS``. Otherwise any
    missing tuning column is added as ``None``, the frame is restricted to
    ``TUNING_COLUMNS`` (in that order) and passed through
    ``opt.coerce_tuning_editor_df``.
    """
    if not rows:
        return pd.DataFrame(columns=TUNING_COLUMNS)
    frame = pd.DataFrame(rows)
    absent = [name for name in TUNING_COLUMNS if name not in frame.columns]
    for name in absent:
        frame[name] = None
    return opt.coerce_tuning_editor_df(frame[TUNING_COLUMNS])
def _tuning_df_to_rows(df: Optional[pd.DataFrame]) -> List[Dict[str, Any]]:
if df is None or df.empty:
return []
return _df_to_records(df[TUNING_COLUMNS].copy())
def _build_runtime_cfg(cfg: Dict[str, Any], payload: Dict[str, Any]) -> Dict[str, Any]:
    """Merge a request payload over the stored config and build the runtime config.

    For every setting the value is taken from ``payload`` when present,
    otherwise from ``cfg``, otherwise from a built-in fallback. A key sent as
    JSON ``null`` is treated like a missing key for all settings (previously
    only ``maintenance_blocks`` behaved that way and the rest bypassed ``cfg``).

    Args:
        cfg: Stored configuration used as the fallback layer.
        payload: Settings submitted by the client; a non-dict is treated as empty.

    Returns:
        The result of ``opt.build_runtime_config_from_widgets(cfg, widgets)``.
    """
    raw = payload if isinstance(payload, dict) else {}

    def pick(key: str, fallback: Any) -> Any:
        """Return payload value, else stored value, else ``fallback``."""
        value = raw.get(key)
        if value is None:
            value = cfg.get(key)
        return fallback if value is None else value

    def hm(key: str, fallback: str) -> Any:
        return opt.parse_hm(str(pick(key, fallback)), fallback)

    def as_int(key: str, fallback: int) -> int:
        return _to_int(pick(key, fallback), fallback)

    def as_float(key: str, fallback: float) -> float:
        return _to_float(pick(key, fallback), fallback)

    def as_bool(key: str, fallback: bool) -> bool:
        return _to_bool(pick(key, fallback), fallback)

    def joined(key: str, sep: str) -> str:
        """Payload text as-is, else the stored list joined with ``sep``."""
        value = raw.get(key)
        if value is not None:
            return str(value)
        stored = cfg.get(key) or []
        if isinstance(stored, str):
            return stored
        # str() each item so a list holding non-string entries cannot break join().
        return sep.join(str(x) for x in stored)

    # Maintenance blocks keep list records; the parser in opt handles stricter
    # normalization.
    maintenance_blocks = raw.get("maintenance_blocks")
    if maintenance_blocks is None:
        maintenance_blocks = cfg.get("maintenance_blocks", [])

    widgets = {
        "business_start": hm("business_start", "09:30"),
        "business_end": hm("business_end", "01:30"),
        "turnaround_base": as_int("turnaround_base", 10),
        "golden_start": hm("golden_start", "14:00"),
        "golden_end": hm("golden_end", "21:00"),
        "efficiency_enabled": as_bool("efficiency_enabled", True),
        "efficiency_penalty_coef": as_float("efficiency_penalty_coef", 1.0),
        "eff_daily_delta_cap": as_int("eff_daily_delta_cap", 5),
        "rule1_enabled": as_bool("rule1_enabled", True),
        "rule1_gap": as_int("rule1_gap", 30),
        "rule2_enabled": as_bool("rule2_enabled", True),
        "rule2_threshold": as_int("rule2_threshold", 4),
        "rule2_window_minutes": as_int("rule2_window_minutes", 30),
        "rule2_penalty": as_float("rule2_penalty", 15.0),
        "rule2_exempt_ranges": joined("rule2_exempt_ranges", ", "),
        "rule3_enabled": as_bool("rule3_enabled", True),
        "rule3_gap_minutes": as_int("rule3_gap_minutes", 30),
        "rule3_penalty": as_float("rule3_penalty", 12.0),
        "rule4_enabled": as_bool("rule4_enabled", True),
        "rule4_earliest": hm("rule4_earliest", "10:00"),
        "rule4_latest": hm("rule4_latest", "22:30"),
        "rule9_enabled": as_bool("rule9_enabled", True),
        "rule9_hot_top_n": as_int("rule9_hot_top_n", 3),
        "rule9_min_ratio": as_float("rule9_min_ratio", 0.30),
        "rule9_penalty": as_float("rule9_penalty", 20.0),
        "rule11_enabled": as_bool("rule11_enabled", True),
        "rule11_after_time": hm("rule11_after_time", "22:00"),
        "rule11_penalty": as_float("rule11_penalty", 30.0),
        "rule12_enabled": as_bool("rule12_enabled", True),
        "rule12_penalty_each": as_float("rule12_penalty_each", 25.0),
        "rule13_enabled": as_bool("rule13_enabled", True),
        "rule13_forbidden_halls": joined("rule13_forbidden_halls", ","),
        "tms_allowance": as_int("tms_allowance", 0),
        "maintenance_blocks": maintenance_blocks,
        "iterations": as_int("iterations", 300),
        "random_seed": as_int("random_seed", 20260331),
    }
    return opt.build_runtime_config_from_widgets(cfg, widgets)
def _runtime_cfg_for_json(cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Project a runtime config onto the flat, JSON-friendly shape sent to the client.

    Times are stringified, numbers and flags are coerced with the ``_to_*``
    helpers, and list settings (``rule2_exempt_ranges``,
    ``rule13_forbidden_halls``) are joined into comma-separated text.
    Missing keys use the defaults visible below.
    """

    def joined(key: str, sep: str, default: List[str]) -> str:
        """Join a stored list into text; tolerate None, plain text and non-str items."""
        stored = cfg.get(key, default)
        if stored is None:
            stored = default
        if isinstance(stored, str):
            return stored
        # str() each item so both list settings behave the same way and a
        # non-string entry cannot break join().
        return sep.join(str(x) for x in stored)

    return {
        "business_start": str(cfg.get("business_start", "09:30")),
        "business_end": str(cfg.get("business_end", "01:30")),
        "turnaround_base": _to_int(cfg.get("turnaround_base", 10), 10),
        "golden_start": str(cfg.get("golden_start", "14:00")),
        "golden_end": str(cfg.get("golden_end", "21:00")),
        "efficiency_enabled": _to_bool(cfg.get("efficiency_enabled", True), True),
        "efficiency_penalty_coef": _to_float(cfg.get("efficiency_penalty_coef", 1.0), 1.0),
        "eff_daily_delta_cap": _to_int(cfg.get("eff_daily_delta_cap", 5), 5),
        "rule1_enabled": _to_bool(cfg.get("rule1_enabled", True), True),
        "rule1_gap": _to_int(cfg.get("rule1_gap", 30), 30),
        "rule2_enabled": _to_bool(cfg.get("rule2_enabled", True), True),
        "rule2_threshold": _to_int(cfg.get("rule2_threshold", 4), 4),
        "rule2_window_minutes": _to_int(cfg.get("rule2_window_minutes", 30), 30),
        "rule2_penalty": _to_float(cfg.get("rule2_penalty", 15.0), 15.0),
        "rule2_exempt_ranges": joined("rule2_exempt_ranges", ", ", []),
        "rule3_enabled": _to_bool(cfg.get("rule3_enabled", True), True),
        "rule3_gap_minutes": _to_int(cfg.get("rule3_gap_minutes", 30), 30),
        "rule3_penalty": _to_float(cfg.get("rule3_penalty", 12.0), 12.0),
        "rule4_enabled": _to_bool(cfg.get("rule4_enabled", True), True),
        "rule4_earliest": str(cfg.get("rule4_earliest", "10:00")),
        "rule4_latest": str(cfg.get("rule4_latest", "22:30")),
        "rule9_enabled": _to_bool(cfg.get("rule9_enabled", True), True),
        "rule9_hot_top_n": _to_int(cfg.get("rule9_hot_top_n", 3), 3),
        "rule9_min_ratio": _to_float(cfg.get("rule9_min_ratio", 0.30), 0.30),
        "rule9_penalty": _to_float(cfg.get("rule9_penalty", 20.0), 20.0),
        "rule11_enabled": _to_bool(cfg.get("rule11_enabled", True), True),
        "rule11_after_time": str(cfg.get("rule11_after_time", "22:00")),
        "rule11_penalty": _to_float(cfg.get("rule11_penalty", 30.0), 30.0),
        "rule12_enabled": _to_bool(cfg.get("rule12_enabled", True), True),
        "rule12_penalty_each": _to_float(cfg.get("rule12_penalty_each", 25.0), 25.0),
        "rule13_enabled": _to_bool(cfg.get("rule13_enabled", True), True),
        "rule13_forbidden_halls": joined("rule13_forbidden_halls", ",", ["2", "8", "9"]),
        "tms_allowance": _to_int(cfg.get("tms_allowance", 0), 0),
        # ``or []`` keeps a stored None from breaking list().
        "maintenance_blocks": list(cfg.get("maintenance_blocks") or []),
        "iterations": _to_int(cfg.get("iterations", 300), 300),
        "random_seed": _to_int(cfg.get("random_seed", 20260331), 20260331),
    }
def _base_and_target(base_str: str) -> Tuple[date, date, str, str]:
try:
base_date = datetime.strptime(base_str, "%Y-%m-%d").date()
except Exception:
base_date = date.today()
target_date = base_date + timedelta(days=1)
return base_date, target_date, base_date.strftime("%Y-%m-%d"), target_date.strftime("%Y-%m-%d")
def _normalize_string_list(raw: Any) -> List[str]:
if not isinstance(raw, list):
return []
out: List[str] = []
seen: set[str] = set()
for x in raw:
v = str(x).strip()
if not v or v in seen:
continue
seen.add(v)
out.append(v)
return out
def _normalize_exclude_rules(raw: Any) -> Dict[str, bool]:
    """Return the ``zero_sales`` / ``early_morning`` rule flags as booleans.

    Non-dict input is treated as empty; a missing flag defaults to ``False``.
    """
    source = raw if isinstance(raw, dict) else {}
    return {
        rule: _to_bool(source.get(rule, False), False)
        for rule in ("zero_sales", "early_morning")
    }
def _session_start_text(session: Dict[str, Any]) -> str:
return str(session.get("showStartTime") or session.get("startTime") or "").strip()
def _session_end_text(session: Dict[str, Any]) -> str:
return str(session.get("showEndTime") or session.get("endTime") or "").strip()
def _session_hall_name(session: Dict[str, Any]) -> str:
return str(session.get("hallName") or session.get("hallId") or "").strip()
def _session_movie_name(session: Dict[str, Any]) -> str:
return str(session.get("movieName") or "").strip()
def _session_sold_tickets(session: Dict[str, Any]) -> int:
    """Return the sold-ticket count of a session as an int.

    The first key present among ``soldTicketNum``, ``buyTicketNum`` and
    ``ticketNum`` is used (even if its value is empty); otherwise 0.
    """
    for field in ("soldTicketNum", "buyTicketNum", "ticketNum"):
        if field in session:
            return _to_int(session[field], 0)
    return _to_int(0, 0)
def _session_box_office(session: Dict[str, Any]) -> float:
    """Return the session's ``soldBoxOffice`` as a float (0.0 when missing or invalid)."""
    raw_amount = session.get("soldBoxOffice", 0.0)
    return _to_float(raw_amount, 0.0)
def _hm_to_minutes(text: str) -> Optional[int]:
if not text:
return None
try:
t = datetime.strptime(str(text), "%H:%M")
except Exception:
return None
return t.hour * 60 + t.minute
def _is_zero_sales_session(session: Dict[str, Any]) -> bool:
    """Return True when the session has no sold tickets and no box office."""
    if _session_sold_tickets(session) > 0:
        return False
    return _session_box_office(session) <= 0
def _is_early_morning_session(session: Dict[str, Any]) -> bool:
    """Return True when the session's start time parses and is before 10:00."""
    start_minutes = _hm_to_minutes(_session_start_text(session))
    if start_minutes is None:
        return False
    return start_minutes < 10 * 60
def _session_special_tags(session: Dict[str, Any]) -> List[str]:
    """Return the display tags that mark a session as a likely exclusion candidate.

    Tags, in order: zero tickets and zero box office; start before 10:00;
    at most one ticket with box office in (0, 20].
    """
    sold = _session_sold_tickets(session)
    box_office = _session_box_office(session)
    checks = (
        (_is_zero_sales_session(session), "零票零票房"),
        (_is_early_morning_session(session), "早场"),
        (sold <= 1 and 0 < box_office <= 20, "低票低收"),
    )
    return [label for matched, label in checks if matched]
def _build_exclusion_options(
raw_today_schedule: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
options: List[Dict[str, Any]] = []
movie_counter: Counter[str] = Counter()
hall_counter: Counter[str] = Counter()
for idx, session in enumerate(raw_today_schedule):
movie = _session_movie_name(session) or "未知影片"
hall = _session_hall_name(session) or "未知影厅"
start = _session_start_text(session)
end = _session_end_text(session)
sold = _session_sold_tickets(session)
box_office = _session_box_office(session)
tags = _session_special_tags(session)
tag_text = f" | {'/'.join(tags)}" if tags else ""
time_span = f"{start}-{end}" if end else start
options.append(
{
"key": str(idx),
"label": f"{time_span} | {hall} | {movie} | 票{sold} 票房{box_office:.0f}{tag_text}",
"movie": movie,
"hall": hall,
"suspected": bool(tags),
"legacy_label": opt.session_display_label(session),
}
)
movie_counter[movie] += 1
hall_counter[hall] += 1
movie_options = [
{"value": name, "label": f"{name}{cnt}场)", "count": int(cnt)}
for name, cnt in sorted(movie_counter.items(), key=lambda kv: (-kv[1], kv[0]))
]
hall_options = [
{"value": name, "label": f"{name}{cnt}场)", "count": int(cnt)}
for name, cnt in sorted(hall_counter.items(), key=lambda kv: (-kv[1], _hall_sort_key(kv[0])))
]
return options, movie_options, hall_options
def _apply_exclusion_filters(
schedule_list: List[Dict[str, Any]],
excluded_session_keys: List[str],
excluded_movies: List[str],
excluded_halls: List[str],
exclude_rules: Dict[str, bool],
) -> Tuple[List[Dict[str, Any]], Dict[str, Any], List[str]]:
if not schedule_list:
return [], {"total_count": 0, "removed_count": 0, "remaining_count": 0, "affected_movies": 0, "reason_breakdown": {}}, []
key_set = {int(x) for x in excluded_session_keys if str(x).isdigit()}
movie_set = set(excluded_movies)
hall_set = set(excluded_halls)
rules = _normalize_exclude_rules(exclude_rules)
filtered: List[Dict[str, Any]] = []
removed_labels: List[str] = []
removed_movies: set[str] = set()
reason_counter: Counter[str] = Counter()
for idx, session in enumerate(schedule_list):
reasons: List[str] = []
movie = _session_movie_name(session)
hall = _session_hall_name(session)
if idx in key_set:
reasons.append("手动场次")
if movie and movie in movie_set:
reasons.append("按影片剔除")
if hall and hall in hall_set:
reasons.append("按影厅剔除")
if rules["zero_sales"] and _is_zero_sales_session(session):
reasons.append("零票零票房")
if rules["early_morning"] and _is_early_morning_session(session):
reasons.append("早场(10:00前)")
if reasons:
removed_labels.append(opt.session_display_label(session))
if movie:
removed_movies.add(movie)
for reason in sorted(set(reasons)):
reason_counter[reason] += 1
else:
filtered.append(session)
effect = {
"total_count": len(schedule_list),
"removed_count": len(schedule_list) - len(filtered),
"remaining_count": len(filtered),
"affected_movies": len(removed_movies),
"reason_breakdown": dict(reason_counter.most_common()),
}
return filtered, effect, removed_labels
def _bundle_payload(state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
bundle = state.get("bundle")
if not isinstance(bundle, dict):
return None
raw_today_schedule = bundle.get("today_schedule_raw") or bundle.get("today_schedule") or []
exclude_options, exclude_movie_options, exclude_hall_options = _build_exclusion_options(raw_today_schedule)
key_to_legacy = {item["key"]: item["legacy_label"] for item in exclude_options}
legacy_to_keys: Dict[str, List[str]] = {}
for item in exclude_options:
legacy_to_keys.setdefault(item["legacy_label"], []).append(item["key"])
valid_key_set = {item["key"] for item in exclude_options}
valid_movie_set = {item["value"] for item in exclude_movie_options}
valid_hall_set = {item["value"] for item in exclude_hall_options}
excluded_session_keys = _normalize_string_list(bundle.get("today_schedule_excluded_keys") or [])
if not excluded_session_keys:
legacy_labels = _normalize_string_list(bundle.get("today_schedule_excluded_labels") or [])
mapped: List[str] = []
for label in legacy_labels:
mapped.extend(legacy_to_keys.get(label, []))
excluded_session_keys = _normalize_string_list(mapped)
excluded_session_keys = [x for x in excluded_session_keys if x in valid_key_set]
excluded_movies = [x for x in _normalize_string_list(bundle.get("today_schedule_excluded_movies") or []) if x in valid_movie_set]
excluded_halls = [x for x in _normalize_string_list(bundle.get("today_schedule_excluded_halls") or []) if x in valid_hall_set]
exclude_rules = _normalize_exclude_rules(bundle.get("today_schedule_exclude_rules") or {})
exclude_effect = bundle.get("today_schedule_exclusion_effect") or {
"total_count": len(raw_today_schedule),
"removed_count": 0,
"remaining_count": len(raw_today_schedule),
"affected_movies": 0,
"reason_breakdown": {},
}
return {
"target_str": str(bundle.get("target_str", "")),
"today_str": str(bundle.get("today_str", "")),
"excluded_labels": [key_to_legacy.get(k, "") for k in excluded_session_keys if key_to_legacy.get(k, "")],
"excluded_session_keys": excluded_session_keys,
"excluded_movies": excluded_movies,
"excluded_halls": excluded_halls,
"exclude_rules": exclude_rules,
"exclude_options": [{k: v for k, v in item.items() if k != "legacy_label"} for item in exclude_options],
"exclude_movie_options": exclude_movie_options,
"exclude_hall_options": exclude_hall_options,
"exclude_effect": _safe_value(exclude_effect),
"movies_count": len(bundle.get("movies", [])),
"locked_count": len(bundle.get("locked_sessions", [])),
"tuning_rows": _tuning_df_to_rows(state.get("tuning_df")),
"today_eff_rows": _df_to_records(bundle.get("today_eff")),
}
def _repair_broken_state() -> Dict[str, Any]:
    """Return the job state, marking an orphaned unfinished job as failed.

    If the stored status is ``running`` or ``paused`` but
    ``opt._find_live_worker()`` finds no worker and fewer iterations are done
    than planned, the state is rewritten as ``failed``.
    """
    state = opt.read_job_state()
    if state.get("status") not in {"running", "paused"}:
        return state
    if opt._find_live_worker() is not None:  # noqa: SLF001
        return state
    done = _to_int(state.get("iter_done", 0), 0)
    planned = _to_int(state.get("iterations", 0), 0)
    if done < planned:
        state = opt.write_job_state(status="failed", control="run", message="后台任务已中断,请重新启动。")
    return state
def _job_state_payload(state: Dict[str, Any]) -> Dict[str, Any]:
    """Project the job state onto the typed, JSON-safe shape sent to the client.

    Text fields are stringified, counters go through ``_to_int``, ratios and
    durations through ``_to_float``, and the reject statistics through
    ``_safe_value``.
    """

    def text(field: str, default: str = "") -> str:
        return str(state.get(field, default))

    def count(field: str) -> int:
        return _to_int(state.get(field, 0), 0)

    def real(field: str) -> float:
        return _to_float(state.get(field, 0.0), 0.0)

    def stats(field: str) -> Any:
        return _safe_value(state.get(field) or {})

    return {
        "status": text("status", "idle"),
        "control": text("control", "run"),
        "job_id": text("job_id"),
        "started_at": text("started_at"),
        "ended_at": text("ended_at"),
        "target_date": text("target_date"),
        "iterations": count("iterations"),
        "iter_done": count("iter_done"),
        "progress": real("progress"),
        "elapsed_seconds": real("elapsed_seconds"),
        "feasible_count": count("feasible_count"),
        "hard_reject": count("hard_reject"),
        "build_reject": count("build_reject"),
        "rule_reject": count("rule_reject"),
        "reject_reason_top": stats("reject_reason_top"),
        "reject_detail_top": stats("reject_detail_top"),
        "message": text("message"),
        "result_count": count("result_count"),
    }
def _hall_sort_key(hall_name: str) -> Tuple[int, str]:
nums = re.findall(r"\d+", str(hall_name))
return (int(nums[0]), str(hall_name)) if nums else (9999, str(hall_name))
# Background colors for Gantt blocks; render_gantt_html assigns them to the
# sorted movie color keys, cycling (index modulo length) when there are more
# keys than colors.
GANTT_MOVIE_COLORS = [
    "#9D6BFF", "#FF6699", "#FF6666", "#2CD52C", "#99CCFF", "#FFCC99",
    "#FF6600", "#CCFF99", "#CC9999", "#FF9900", "#2CB5FB", "#CC66CC",
    "#9981B1", "#FFFF66", "#009999", "#3366CC", "#996633",
]
# Text color used for the ``.gantt-block`` CSS rule in render_gantt_html.
GANTT_TEXT_COLOR = "#000000"
def _gantt_color_key(movie_name: Any, movie_num: Any, media_type: Any, movie_language: Any) -> str:
movie_num_value = str(movie_num or "").strip()
base_key = movie_num_value if movie_num_value else str(movie_name or "").strip()
language_value = str(movie_language or "").strip()
imagery_value = str(media_type or "").strip()
return "||".join([base_key, language_value, imagery_value])
def render_gantt_html(schedule: List[Dict[str, Any]], date_str: str) -> str:
    """Render a schedule as a self-contained HTML Gantt chart (markup plus inline CSS).

    One row is drawn per hall (ordered by ``_hall_sort_key``) and one block
    per session, positioned as a percentage of the hour-aligned time range
    covering all sessions. Sessions whose end is not after their start are
    treated as ending the next day.

    Args:
        schedule: Session dicts; ``hallName``, ``movieName``, ``startTime``
            and ``endTime`` are required, rows lacking any of them are dropped.
        date_str: Chart date as ``YYYY-MM-DD``; if it cannot be parsed the raw
            text is shown in the title instead.

    Returns:
        The chart HTML, or a short "no data" ``<div>`` when there is nothing
        valid to draw.
    """
    if not schedule:
        return '<div class="empty-msg">无排片数据</div>'
    df = pd.DataFrame(schedule).copy()
    if df.empty:
        return '<div class="empty-msg">无排片数据</div>'
    required_columns = ["hallName", "movieName", "startTime", "endTime"]
    if any(col not in df.columns for col in required_columns):
        # Without these fields no session can be placed on the chart.
        return '<div class="empty-msg">无有效排片数据</div>'

    def present(value: Any) -> Any:
        """Map NaN (a cell missing from some rows) to None; pass everything else through."""
        if isinstance(value, float) and value != value:
            return None
        return value

    df["startTime"] = pd.to_datetime(df["startTime"], errors="coerce")
    df["endTime"] = pd.to_datetime(df["endTime"], errors="coerce")
    df = df.dropna(subset=required_columns).copy()
    if df.empty:
        return '<div class="empty-msg">无有效排片数据</div>'
    # An end at or before the start means the session runs past midnight.
    overnight_mask = df["endTime"] <= df["startTime"]
    if overnight_mask.any():
        df.loc[overnight_mask, "endTime"] = df.loc[overnight_mask, "endTime"] + timedelta(days=1)

    df["movieColorKey"] = df.apply(
        lambda r: _gantt_color_key(
            r.get("movieName"),
            r.get("movieNum"),
            r.get("movieMediaType"),
            r.get("movieLanguage"),
        ),
        axis=1,
    )
    hall_order = sorted(df["hallName"].astype(str).unique().tolist(), key=_hall_sort_key)

    # Hour-aligned range: from the hour of the first start to the hour after
    # the last end.
    t_min = df["startTime"].min().replace(minute=0, second=0, microsecond=0)
    t_max = (df["endTime"].max() + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
    total_minutes = max(60.0, (t_max - t_min).total_seconds() / 60.0)
    total_hours = max(1, int((t_max - t_min).total_seconds() / 3600))

    color_keys = sorted(df["movieColorKey"].astype(str).unique().tolist())
    color_map = {k: GANTT_MOVIE_COLORS[i % len(GANTT_MOVIE_COLORS)] for i, k in enumerate(color_keys)}

    # Legend text per color key, taken from the first session carrying that key.
    legend_map: Dict[str, str] = {}
    for _, r in df.iterrows():
        key = str(r.get("movieColorKey") or "")
        if key in legend_map:
            continue
        film_name = str(r.get("movieName") or "")
        media_type = str(present(r.get("movieMediaType")) or "").strip()
        legend_map[key] = f"{film_name} {media_type}".strip()
    legend_items = []
    for k in color_keys[:16]:
        swatch = color_map.get(k, "#778899")
        caption = html.escape(legend_map.get(k, k))
        legend_items.append(
            f'<span class="gantt-legend-item"><i style="background:{swatch}"></i>{caption}</span>'
        )
    if len(color_keys) > 16:
        legend_items.append(f'<span class="gantt-legend-more">+{len(color_keys) - 16} 个制式/影片</span>')
    legend_html = "".join(legend_items)

    # One equal-width axis cell per hour of the range. Emitting total_hours + 1
    # cells made every cell narrower than an hour of the timeline, so the
    # labels drifted away from the blocks and grid lines.
    time_labels_html = "".join(
        f'<div class="gantt-time-label">{(t_min + timedelta(hours=i)).strftime("%H:%M")}</div>'
        for i in range(total_hours)
    )

    hall_rows = []
    for hall in hall_order:
        row_df = df[df["hallName"].astype(str) == hall].sort_values("startTime")
        blocks = []
        for _, r in row_df.iterrows():
            start = r["startTime"]
            end = r["endTime"]
            left = ((start - t_min).total_seconds() / 60.0 / total_minutes) * 100.0
            width = ((end - start).total_seconds() / 60.0 / total_minutes) * 100.0
            if left < 0:
                width += left
                left = 0
            # Keep very short sessions visible and clamp to the right edge.
            width = max(0.4, width)
            if left + width > 100:
                width = max(0.4, 100 - left)
            movie_name = str(r.get("movieName") or "")
            media_type = str(present(r.get("movieMediaType")) or "").strip()
            film_title = f"{movie_name} {media_type}".strip()
            hall_name = str(r.get("hallName") or "")
            sold = _to_int(r.get("sold", r.get("soldTicketNum", 0)), 0)
            duration_min = int((end - start).total_seconds() / 60)
            tooltip = html.escape(
                f"{film_title}\n{hall_name}\n{start.strftime('%H:%M')} - {end.strftime('%H:%M')} ({duration_min}min)\n已售:{sold}",
                quote=True,
            ).replace("\n", "&#10;")
            # A missing is_presold cell (NaN) must not count as "locked".
            is_presold = _to_bool(present(r.get("is_presold", False)), False)
            lock_tag = '<span class="gantt-lock">已售锁定</span>' if is_presold else ""
            block_color = color_map.get(str(r.get("movieColorKey") or ""), "#778899")
            blocks.append(
                f'<div class="gantt-block" style="left:{left:.3f}%;width:{width:.3f}%;'
                f'background-color:{block_color};" title="{tooltip}">'
                f'<div class="gantt-film">{html.escape(movie_name)}</div>'
                f'<div class="gantt-meta">{start.strftime("%H:%M")}-{end.strftime("%H:%M")} · {duration_min}m</div>'
                f"{lock_tag}"
                "</div>"
            )
        hall_rows.append(
            f'<div class="gantt-hall-row"><div class="gantt-hall-name">{html.escape(str(hall))}</div>'
            f'<div class="gantt-timeline">{"".join(blocks)}</div></div>'
        )
    halls_html = "".join(hall_rows)

    weekdays = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]
    try:
        dt_obj = datetime.strptime(date_str, "%Y-%m-%d")
        date_label = f"{dt_obj.strftime('%Y.%m.%d')} {weekdays[dt_obj.weekday()]}"
    except (TypeError, ValueError):
        # Do not discard the finished chart over an unparsable title date.
        date_label = str(date_str)
    date_display = f"{date_label} · 场次 {len(df)}"
    half_hour_grid_size = 100 / max(1, total_hours * 2)
    min_width = max(1320, int(total_hours * 96))
    return f"""
<div class="gantt-scroll-wrapper">
<div class="gantt-container" style="min-width:{min_width}px">
<div class="gantt-header">
<div class="gantt-title">{html.escape(date_display)}</div>
<div class="gantt-legend">{legend_html}</div>
</div>
<div class="gantt-grid">
<div class="gantt-corner"></div>
<div class="gantt-time-axis">{time_labels_html}</div>
{halls_html}
</div>
</div>
</div>
<style>
.gantt-scroll-wrapper {{width:100%;overflow-x:auto;border:1px solid #dbe2ec;border-radius:10px;margin-bottom:1rem;background:#fff;}}
.gantt-container {{font-family:"PingFang SC","Noto Sans CJK SC","Microsoft YaHei",sans-serif;background:#fff;}}
.gantt-header {{display:flex;justify-content:space-between;gap:12px;align-items:flex-start;padding:10px 14px;background:#f7fbff;border-bottom:1px solid #dee6f2;}}
.gantt-title {{font-size:14px;font-weight:700;color:#173b72;white-space:nowrap;}}
.gantt-legend {{display:flex;gap:8px;flex-wrap:wrap;justify-content:flex-end;}}
.gantt-legend-item {{display:inline-flex;align-items:center;gap:4px;padding:2px 8px;border:1px solid #d8e3f0;border-radius:999px;font-size:11px;color:#305275;background:#fff;max-width:280px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap;}}
.gantt-legend-item i {{display:inline-block;width:9px;height:9px;border-radius:2px;flex:0 0 9px;}}
.gantt-legend-more {{font-size:11px;color:#4e6786;align-self:center;}}
.gantt-grid {{display:grid;grid-template-columns:146px 1fr;}}
.gantt-corner {{grid-column:1;grid-row:1;border-bottom:1px solid #dee6f2;border-right:1px solid #dee6f2;background:#f7fbff;}}
.gantt-time-axis {{grid-column:2;display:flex;background:#f7fbff;border-bottom:1px solid #dee6f2;}}
.gantt-time-label {{flex:1;text-align:center;padding:8px 0;font-size:12px;color:#4f6684;border-left:1px solid #d3dbe7;}}
.gantt-hall-row {{display:contents;}}
.gantt-hall-name {{grid-column:1;padding:10px 8px;font-size:13px;font-weight:600;border-right:1px solid #e3eaf5;border-top:1px solid #e3eaf5;background:#fbfdff;text-align:center;display:flex;align-items:center;justify-content:center;line-height:1.2;}}
.gantt-timeline {{grid-column:2;position:relative;border-top:1px solid #e3eaf5;background-image:linear-gradient(to right,#eef3fa 1px,transparent 1px),linear-gradient(to right,#d7dfeb 1px,transparent 1px);background-size:{half_hour_grid_size}% 100%, {100 / max(1, total_hours)}% 100%;min-height:68px;}}
.gantt-block {{position:absolute;top:5px;bottom:5px;border-radius:6px;padding:5px 8px;color:{GANTT_TEXT_COLOR};overflow:hidden;display:flex;flex-direction:column;justify-content:center;align-items:flex-start;box-sizing:border-box;border:1px solid rgba(0,0,0,0.16);box-shadow:0 1px 2px rgba(0,0,0,0.08);}}
.gantt-film,.gantt-meta {{width:100%;white-space:nowrap;overflow:hidden;text-overflow:ellipsis;}}
.gantt-film {{font-weight:700;font-size:12px;line-height:1.2;}}
.gantt-meta {{font-size:11px;opacity:0.9;line-height:1.2;margin-top:2px;}}
.gantt-lock {{display:inline-block;margin-top:2px;padding:1px 5px;border-radius:10px;font-size:10px;background:rgba(255,255,255,0.55);border:1px solid rgba(0,0,0,0.15);}}
.empty-msg {{padding:12px;color:#666;}}
</style>
"""
@app.route("/")
def index() -> str:
    """Serve the application's main page (``index.html``)."""
    page = render_template("index.html")
    return page
@app.get("/api/session")
def api_session() -> Any:
    """Return the current session snapshot: dates, runtime config, bundle and job state.

    The base date is the loaded bundle's ``today_str`` when it is present and
    parseable, otherwise today's date; the target date is the day after.
    """
    cfg = opt.load_config()
    state = _get_user_state()

    base_date = date.today()
    bundle = state.get("bundle")
    if isinstance(bundle, dict) and bundle.get("today_str"):
        try:
            base_date = datetime.strptime(str(bundle.get("today_str")), "%Y-%m-%d").date()
        except Exception:
            # Keep today's date when the stored text is not a valid date.
            pass

    _, _, base_str, target_str = _base_and_target(base_date.strftime("%Y-%m-%d"))
    job_state = _repair_broken_state()
    response = {
        "success": True,
        "base_date": base_str,
        "target_date": target_str,
        "runtime_cfg": _runtime_cfg_for_json(state.get("runtime_cfg") or cfg),
        "bundle": _bundle_payload(state),
        "job_state": _job_state_payload(job_state),
    }
    return jsonify(response)
@app.post("/api/config/save")
def api_config_save() -> Any:
    """Persist the submitted runtime configuration and remember it for this session.

    Expects a JSON object with an optional ``runtime_cfg`` object. A body (or
    ``runtime_cfg``) that is not a JSON object is treated as empty instead of
    raising ``AttributeError``, so the stored configuration is simply re-saved.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}
    cfg_payload = body.get("runtime_cfg")
    if not isinstance(cfg_payload, dict):
        cfg_payload = {}

    cfg = opt.load_config()
    runtime_cfg = _build_runtime_cfg(cfg, cfg_payload)
    opt.save_config(runtime_cfg)
    state = _get_user_state()
    state["runtime_cfg"] = runtime_cfg
    return jsonify({"success": True, "runtime_cfg": _runtime_cfg_for_json(runtime_cfg)})
@app.post("/api/config/reset")
def api_config_reset() -> Any:
    """Overwrite the saved configuration with a copy of ``opt.DEFAULT_CONFIG``.

    The configuration is then re-loaded, stored in the session state and
    returned in its JSON form.
    """
    defaults = dict(opt.DEFAULT_CONFIG)
    opt.save_config(defaults)
    reloaded = opt.load_config()
    _get_user_state()["runtime_cfg"] = reloaded
    return jsonify({"success": True, "runtime_cfg": _runtime_cfg_for_json(reloaded)})
@app.post("/api/load-data")
def api_load_data() -> Any:
    """Load everything needed to optimize the target day and store it in the session.

    Request JSON (all optional): ``base_date`` (``YYYY-MM-DD``, defaults to
    today) and ``runtime_cfg`` (settings merged over the saved configuration).
    The target date is the day after the base date.

    Steps: save the merged runtime config, fetch the target-day and base-day
    schedules, derive locked sessions, movies, blockouts, operating and
    golden-hour windows, box office and TMS data, then build the efficiency
    table, movie targets/weights and the default tuning table. The result is
    stored as the session's ``bundle`` and ``tuning_df``.

    Returns:
        A JSON success payload, or an error payload with HTTP 400 when a
        schedule fetch fails or no movies are available.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        # A non-object JSON body would otherwise fail on ``.get``.
        body = {}
    requested_base = str(body.get("base_date", date.today().strftime("%Y-%m-%d")))
    _, target_date, today_str, target_str = _base_and_target(requested_base)

    cfg_payload = body.get("runtime_cfg")
    if not isinstance(cfg_payload, dict):
        cfg_payload = {}
    current_cfg = opt.load_config()
    runtime_cfg = _build_runtime_cfg(current_cfg, cfg_payload)
    opt.save_config(runtime_cfg)

    # Fail fast: skip the second upstream fetch when the first already failed.
    next_day_schedule, hall_seat_map, err_next = opt.fetch_schedule_and_halls(target_str)
    if err_next:
        return jsonify({"success": False, "error": f"次日排片拉取失败:{err_next}"}), 400
    today_schedule, _, err_today = opt.fetch_schedule_and_halls(today_str)
    if err_today:
        return jsonify({"success": False, "error": f"当日排片拉取失败:{err_today}"}), 400

    hall_name_map = opt.build_hall_name_map(next_day_schedule, hall_seat_map)
    locked_sessions = opt.build_locked_sessions(next_day_schedule, target_date)
    movies = opt.fetch_movie_info_for_date(target_str)
    if not movies:
        return jsonify({"success": False, "error": "getMovieInfo 接口未返回可放映电影,无法生成排片。"}), 400
    movies = opt.dedupe_movies_by_policy_key(movies)
    preview_windows_by_identity = opt.build_preview_windows_for_movies(target_date, movies)
    blockouts = opt.parse_blockouts_from_config(target_date, runtime_cfg.get("maintenance_blocks", []))
    blockouts_by_hall = opt.build_hall_blockouts(blockouts, hall_name_map)

    biz_start_t = opt.parse_hm(runtime_cfg["business_start"], "09:30")
    biz_end_t = opt.parse_hm(runtime_cfg["business_end"], "01:30")
    golden_start_t = opt.parse_hm(runtime_cfg["golden_start"], "14:00")
    golden_end_t = opt.parse_hm(runtime_cfg["golden_end"], "21:00")
    biz_start_dt = opt.parse_operating_dt(target_date, biz_start_t)
    biz_end_dt = opt.parse_operating_dt(target_date, biz_end_t)
    # A closing time not after the opening time rolls over to the next day.
    if biz_end_dt <= biz_start_dt:
        biz_end_dt += timedelta(days=1)
    golden_start_dt = opt.parse_operating_dt(target_date, golden_start_t)
    golden_end_dt = opt.parse_operating_dt(target_date, golden_end_t)
    if golden_end_dt < golden_start_dt:
        golden_end_dt += timedelta(days=1)

    # Use the base day's box office when none is returned for the target day.
    box_office_data = opt.fetch_realtime_box_office(target_str)
    if not box_office_data:
        box_office_data = opt.fetch_realtime_box_office(today_str)
    tms_rows = opt.fetch_tms_server_movies_raw()
    tms_by_hall = opt.build_tms_index_by_hall(tms_rows)

    today_eff = opt.build_today_efficiency(today_schedule, hall_seat_map, golden_start_t, golden_end_t)
    movie_targets = opt.build_movie_targets(
        movies=movies,
        today_eff=today_eff,
        locked_sessions=locked_sessions,
        box_office_data=box_office_data,
        rule12_enabled=bool(runtime_cfg["rule12_enabled"]),
    )
    movie_weights = opt.build_movie_weights(movies, movie_targets, box_office_data)
    tuning_df = opt.build_default_tuning_table(
        movies=movies,
        movie_targets=movie_targets,
        today_eff=today_eff,
        next_day_schedule=next_day_schedule,
        box_office_data=box_office_data,
        efficiency_enabled=bool(runtime_cfg["efficiency_enabled"]),
        rule12_enabled=bool(runtime_cfg["rule12_enabled"]),
        daily_delta_cap=int(runtime_cfg.get("eff_daily_delta_cap", 5)),
    )

    state = _get_user_state()
    state["runtime_cfg"] = runtime_cfg
    state["bundle"] = {
        "target_date": target_date,
        "target_str": target_str,
        "today_str": today_str,
        "next_day_schedule": next_day_schedule,
        # Unfiltered copy; exclusions are always re-applied from this list.
        "today_schedule_raw": list(today_schedule),
        "today_schedule_excluded_labels": [],
        "today_schedule_excluded_keys": [],
        "today_schedule_excluded_movies": [],
        "today_schedule_excluded_halls": [],
        "today_schedule_exclude_rules": {"zero_sales": False, "early_morning": False},
        "today_schedule_exclusion_effect": {
            "total_count": len(today_schedule),
            "removed_count": 0,
            "remaining_count": len(today_schedule),
            "affected_movies": 0,
            "reason_breakdown": {},
        },
        "today_schedule": today_schedule,
        "hall_seat_map": hall_seat_map,
        "hall_name_map": hall_name_map,
        "locked_sessions": locked_sessions,
        "movies": movies,
        "preview_windows_by_identity": preview_windows_by_identity,
        "blockouts_by_hall": blockouts_by_hall,
        "biz_start_dt": biz_start_dt,
        "biz_end_dt": biz_end_dt,
        "golden_start_dt": golden_start_dt,
        "golden_end_dt": golden_end_dt,
        "today_eff": today_eff,
        "movie_targets": movie_targets,
        "movie_weights": movie_weights,
        "box_office_data": box_office_data,
        "tms_by_hall": tms_by_hall,
    }
    state["tuning_df"] = opt.coerce_tuning_editor_df(tuning_df)
    return jsonify(
        {
            "success": True,
            "runtime_cfg": _runtime_cfg_for_json(runtime_cfg),
            "bundle": _bundle_payload(state),
            "message": f"数据加载完成,目标日期 {target_str}。",
        }
    )
@app.post("/api/update-exclusions")
def api_update_exclusions() -> Any:
    """Apply exclusions to today's schedule and rebuild the derived data.

    Optional JSON body fields: ``excluded_labels`` (legacy labels, only used
    when no ``excluded_session_keys`` are given), ``excluded_session_keys``,
    ``excluded_movies``, ``excluded_halls`` and ``exclude_rules``.

    The raw schedule stored in the caller's bundle is filtered, then today's
    efficiency, movie targets, movie weights and the default tuning table are
    rebuilt from the filtered schedule and written back to the per-session
    state.

    Returns:
        A JSON response with the refreshed bundle payload, the exclusion
        effect and a summary message; or a 400 JSON error when the body is
        not a JSON object or no bundle has been loaded yet.
    """
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        # get_json() may return any JSON type; a list/str/number has no
        # .get() and would otherwise surface as an unhandled 500.
        return jsonify({"success": False, "error": "请求体必须是 JSON 对象。"}), 400

    labels = _normalize_string_list(body.get("excluded_labels") or [])
    excluded_session_keys = _normalize_string_list(body.get("excluded_session_keys") or [])
    excluded_movies = _normalize_string_list(body.get("excluded_movies") or [])
    excluded_halls = _normalize_string_list(body.get("excluded_halls") or [])
    exclude_rules = _normalize_exclude_rules(body.get("exclude_rules") or {})

    state = _get_user_state()
    bundle = state.get("bundle")
    runtime_cfg = state.get("runtime_cfg") or opt.load_config()
    if not isinstance(bundle, dict):
        return jsonify({"success": False, "error": "请先加载数据。"}), 400

    # Always filter from the unfiltered schedule so repeated calls do not
    # compound on top of a previously filtered list.
    raw_today_schedule = bundle.get("today_schedule_raw") or bundle.get("today_schedule") or []
    option_rows, movie_rows, hall_rows = _build_exclusion_options(raw_today_schedule)
    valid_key_set = {str(item["key"]) for item in option_rows}
    valid_movie_set = {str(item["value"]) for item in movie_rows}
    valid_hall_set = {str(item["value"]) for item in hall_rows}

    if labels and not excluded_session_keys:
        # Legacy clients send labels only: translate each label to every
        # session key that carries it.
        legacy_to_keys: Dict[str, List[str]] = {}
        for item in option_rows:
            legacy_to_keys.setdefault(str(item["legacy_label"]), []).append(str(item["key"]))
        mapped_keys: List[str] = []
        for label in labels:
            mapped_keys.extend(legacy_to_keys.get(label, []))
        excluded_session_keys = _normalize_string_list(mapped_keys)

    # Drop anything that does not correspond to an option in today's schedule.
    excluded_session_keys = [x for x in excluded_session_keys if x in valid_key_set]
    excluded_movies = [x for x in excluded_movies if x in valid_movie_set]
    excluded_halls = [x for x in excluded_halls if x in valid_hall_set]

    filtered_today_schedule, exclusion_effect, removed_legacy_labels = _apply_exclusion_filters(
        raw_today_schedule,
        excluded_session_keys,
        excluded_movies,
        excluded_halls,
        exclude_rules,
    )

    golden_start_t = opt.parse_hm(runtime_cfg["golden_start"], "14:00")
    golden_end_t = opt.parse_hm(runtime_cfg["golden_end"], "21:00")
    today_eff = opt.build_today_efficiency(
        filtered_today_schedule,
        bundle["hall_seat_map"],
        golden_start_t,
        golden_end_t,
    )
    movie_targets = opt.build_movie_targets(
        movies=bundle["movies"],
        today_eff=today_eff,
        locked_sessions=bundle["locked_sessions"],
        box_office_data=bundle["box_office_data"],
        rule12_enabled=bool(runtime_cfg["rule12_enabled"]),
    )
    movie_weights = opt.build_movie_weights(bundle["movies"], movie_targets, bundle["box_office_data"])
    tuning_df = opt.build_default_tuning_table(
        movies=bundle["movies"],
        movie_targets=movie_targets,
        today_eff=today_eff,
        next_day_schedule=bundle["next_day_schedule"],
        box_office_data=bundle["box_office_data"],
        efficiency_enabled=bool(runtime_cfg["efficiency_enabled"]),
        rule12_enabled=bool(runtime_cfg["rule12_enabled"]),
        daily_delta_cap=int(runtime_cfg.get("eff_daily_delta_cap", 5)),
    )

    # Persist the applied exclusions together with everything derived from them.
    bundle["today_schedule_excluded_labels"] = removed_legacy_labels
    bundle["today_schedule_excluded_keys"] = excluded_session_keys
    bundle["today_schedule_excluded_movies"] = excluded_movies
    bundle["today_schedule_excluded_halls"] = excluded_halls
    bundle["today_schedule_exclude_rules"] = exclude_rules
    bundle["today_schedule_exclusion_effect"] = exclusion_effect
    bundle["today_schedule"] = filtered_today_schedule
    bundle["today_eff"] = today_eff
    bundle["movie_targets"] = movie_targets
    bundle["movie_weights"] = movie_weights
    state["bundle"] = bundle
    state["tuning_df"] = opt.coerce_tuning_editor_df(tuning_df)

    msg = (
        f"剔除已应用:共剔除 {exclusion_effect['removed_count']}/{exclusion_effect['total_count']} 场,"
        f"影响 {exclusion_effect['affected_movies']} 部影片。"
    )
    return jsonify({"success": True, "bundle": _bundle_payload(state), "effect": _safe_value(exclusion_effect), "message": msg})
@app.post("/api/job/start")
def api_job_start() -> Any:
    """Start a background optimization job from the submitted tuning rows.

    Expects a JSON object whose optional ``tuning_rows`` field is an array of
    row objects. The rows are normalized, stored in the per-session state,
    turned into manual constraints and an allowed-movie set (extended with
    the locked-session policy set), and handed to the optimizer as a job
    payload.

    Returns:
        A JSON response carrying the start result, its message and the
        current job state (HTTP 200 on success, 400 when the job could not
        be started); or a 400 JSON error when the body is not a JSON object,
        ``tuning_rows`` is not an array, or no bundle has been loaded yet.
    """
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        # get_json() may return any JSON type; a list/str/number has no
        # .get() and would otherwise surface as an unhandled 500.
        return jsonify({"success": False, "error": "请求体必须是 JSON 对象。"}), 400

    state = _get_user_state()
    bundle = state.get("bundle")
    runtime_cfg = state.get("runtime_cfg") or opt.load_config()
    if not isinstance(bundle, dict):
        return jsonify({"success": False, "error": "请先加载数据并生成微调约束。"}), 400

    rows = body.get("tuning_rows") or []
    if not isinstance(rows, list):
        # list() over a number raises, and over a string/dict silently yields
        # characters/keys instead of rows - reject both up front.
        return jsonify({"success": False, "error": "tuning_rows 必须是数组。"}), 400

    tuning_df = _normalize_tuning_rows(list(rows))
    state["tuning_df"] = tuning_df

    manual_constraints = opt.parse_movie_tuning_constraints(tuning_df)
    allowed_movies = opt.extract_allowed_movies_from_tuning_df(tuning_df)
    # Movies from locked sessions are added to the allowed set as well.
    allowed_movies |= opt.build_locked_movie_policy_set(bundle.get("locked_sessions", []))

    payload = opt.build_job_payload(
        bundle=bundle,
        runtime_cfg=runtime_cfg,
        manual_constraints=manual_constraints,
        allowed_movies=allowed_movies,
    )
    ok, msg = opt.start_background_job(payload)
    code = 200 if ok else 400
    return jsonify({"success": bool(ok), "message": msg, "job_state": _job_state_payload(opt.read_job_state())}), code
@app.post("/api/job/control")
def api_job_control() -> Any:
    """Pause, resume or stop the background job.

    Reads ``action`` from the JSON body (case-insensitive, surrounding
    whitespace ignored) and writes the matching control fields through
    ``opt.write_job_state``.

    Returns:
        A JSON response with the job state returned by the write; or a 400
        JSON error when the action is missing or not one of
        ``pause`` / ``resume`` / ``stop``.
    """
    body = request.get_json(silent=True) or {}
    # get_json() may return any JSON type. A non-object body has no .get(),
    # so treat it as "no action" and let it fall through to the 400 below
    # instead of raising.
    raw_action = body.get("action") if isinstance(body, dict) else None
    action = str(raw_action or "").strip().lower()

    # Keyword arguments passed to opt.write_job_state for each action.
    transitions = {
        "pause": {"control": "pause", "message": "收到暂停请求"},
        "resume": {"control": "run", "status": "running", "message": "任务继续"},
        "stop": {"control": "stop", "message": "收到停止请求"},
    }
    write_kwargs = transitions.get(action)
    if write_kwargs is None:
        return jsonify({"success": False, "error": "不支持的动作"}), 400

    state = opt.write_job_state(**write_kwargs)
    return jsonify({"success": True, "job_state": _job_state_payload(state)})
@app.get("/api/job/state")
def api_job_state() -> Any:
    """Return the current job state (as produced by ``_repair_broken_state``) as JSON."""
    payload = {
        "success": True,
        "job_state": _job_state_payload(_repair_broken_state()),
    }
    return jsonify(payload)
@app.get("/api/results")
def api_results() -> Any:
    """Return the optimizer results for a target date as JSON.

    The target date string comes from the ``target_str`` query argument,
    falling back to the one in the caller's loaded bundle. Three response
    shapes are produced, all with ``success: True``:

    * a reject summary when the stored result matches the target, the job
      status is ``failed``/``stopped`` and reject statistics exist;
    * ``has_results: False`` with the current status when there is no
      completed result for the target;
    * the run summary plus one entry per candidate schedule otherwise.
    """
    state = _get_user_state()
    bundle = state.get("bundle") or {}
    target_str = str(request.args.get("target_str") or bundle.get("target_str") or "")
    job_state = _repair_broken_state()
    # NOTE(review): the helper name suggests this unpickles the job result
    # file; that file must only ever be written by this application.
    job_result = opt._read_pickle(opt.JOB_RESULT_FILE, {})  # noqa: SLF001
    if not isinstance(job_result, dict):
        job_result = {}

    same_target = job_result.get("target_str") == target_str

    # Failed/stopped summary
    if (
        same_target
        and job_state.get("status") in {"failed", "stopped"}
        and job_result.get("reject_reason_stats")
    ):
        return jsonify(
            {
                "success": True,
                "target_str": target_str,
                "status": str(job_state.get("status")),
                "has_results": False,
                "reject_summary": {
                    "elapsed_seconds": _to_float(job_result.get("elapsed_seconds", 0.0), 0.0),
                    "build_reject": _to_int(job_result.get("build_reject", 0), 0),
                    "rule_reject": _to_int(job_result.get("rule_reject", 0), 0),
                    "reject_reason_stats": _safe_value(job_result.get("reject_reason_stats") or {}),
                    "reject_detail_stats": _safe_value(job_result.get("reject_detail_stats") or {}),
                },
            }
        )

    if not (same_target and job_state.get("status") == "completed"):
        return jsonify(
            {
                "success": True,
                "target_str": target_str,
                "status": str(job_state.get("status", "idle")),
                "has_results": False,
            }
        )

    raw_results = list(job_result.get("results") or [])
    results = [c for c in map(opt.deserialize_candidate, raw_results) if c is not None]

    # Normalize once so every consumer below receives a DataFrame.
    today_eff = job_result.get("today_eff", pd.DataFrame())
    if not isinstance(today_eff, pd.DataFrame):
        today_eff = pd.DataFrame()

    g_st = job_result.get("golden_start_dt")
    g_et = job_result.get("golden_end_dt")
    runtime_cfg = job_result.get("runtime_cfg", state.get("runtime_cfg") or opt.load_config())
    box_office_data = job_result.get("box_office_data", [])

    # The target date is the same for every candidate: parse it a single
    # time, and only when there is at least one candidate that needs it.
    log_target_date = datetime.strptime(target_str, "%Y-%m-%d").date() if results else None

    candidates: List[Dict[str, Any]] = []
    for idx, cand in enumerate(results):
        schedule_table = opt.df_schedule_for_display(cand.schedule)
        summary_df = opt.build_candidate_summary_table(cand.schedule, today_eff, g_st, g_et)
        log_text = opt.generate_schedule_check_logs_text(
            schedule=cand.schedule,
            target_date=log_target_date,
            params=runtime_cfg,
            today_eff=today_eff,
            box_office_data=box_office_data,
        )
        breakdown = [
            {"规则": str(name), "分值": _to_float(delta, 0.0), "说明": str(msg)}
            for name, delta, msg in (cand.score_breakdown or [])
        ]
        # Coerce first so the title formatting cannot fail on a missing or
        # non-numeric score; the title and the "score" field then agree.
        score = _to_float(cand.score, 0.0)
        candidates.append(
            {
                "index": idx,
                "title": f"方案{idx + 1}|分数 {score:.1f}",
                "score": score,
                "score_breakdown": breakdown,
                "schedule_table": _df_to_records(schedule_table),
                "summary_table": _df_to_records(summary_df),
                "gantt_html": render_gantt_html(cand.schedule, target_str),
                "log_text": str(log_text or ""),
                "hard_violations": [str(x) for x in (cand.hard_violations or [])],
            }
        )

    return jsonify(
        {
            "success": True,
            "target_str": target_str,
            "status": "completed",
            "has_results": True,
            "summary": {
                "total_feasible": _to_int(job_result.get("all_results_count", 0), 0),
                "hard_reject": _to_int(job_result.get("hard_reject", 0), 0),
                "build_reject": _to_int(job_result.get("build_reject", 0), 0),
                "rule_reject": _to_int(job_result.get("rule_reject", 0), 0),
                "elapsed_seconds": _to_float(job_result.get("elapsed_seconds", 0.0), 0.0),
                "locked_count": _to_int(job_result.get("locked_count", 0), 0),
                "reject_reason_stats": _safe_value(job_result.get("reject_reason_stats") or {}),
                "reject_detail_stats": _safe_value(job_result.get("reject_detail_stats") or {}),
                "reject_phase_stats": _safe_value(job_result.get("reject_phase_stats") or {}),
                "reject_examples": _safe_value(job_result.get("reject_examples") or {}),
                "movie_targets": _safe_value(job_result.get("movie_targets") or {}),
                "today_eff_rows": _df_to_records(today_eff),
            },
            "candidates": candidates,
        }
    )
if __name__ == "__main__":
    # Script entry point: the bind address and port are read from the
    # NEXTDAY_OPT_HOST / NEXTDAY_OPT_PORT environment variables, with
    # 0.0.0.0 and 8502 as the defaults.
    app.run(
        host=os.getenv("NEXTDAY_OPT_HOST", "0.0.0.0"),
        port=_to_int(os.getenv("NEXTDAY_OPT_PORT", "8502"), 8502),
        debug=False,
        threaded=True,
    )