diff --git "a/optimizer_core.py" "b/optimizer_core.py" new file mode 100644--- /dev/null +++ "b/optimizer_core.py" @@ -0,0 +1,4077 @@ +import json +import os +import pickle +import random +import re +import threading +import time +from collections import Counter, defaultdict +from dataclasses import dataclass +from datetime import date, datetime, time as dt_time, timedelta +from typing import Any, Dict, List, Optional, Set, Tuple + +import pandas as pd +import requests +import urllib3 +from dotenv import load_dotenv + +from schedule_api_client import clean_movie_title, fetch_hall_info, fetch_schedule_data, get_valid_token + + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +load_dotenv() + + + +CONFIG_FILE = os.path.join("cinema_cache", "nextday_schedule_optimizer_config.json") +JOB_STATE_FILE = os.path.join("cinema_cache", "nextday_optimizer_job_state.json") +JOB_PAYLOAD_FILE = os.path.join("cinema_cache", "nextday_optimizer_job_payload.pkl") +JOB_RESULT_FILE = os.path.join("cinema_cache", "nextday_optimizer_job_result.pkl") + +_JOB_THREAD: Optional[threading.Thread] = None + +DEFAULT_CONFIG: Dict[str, Any] = { + "business_start": "09:30", + "business_end": "01:30", + "turnaround_base": 10, + "golden_start": "14:00", + "golden_end": "21:00", + "efficiency_enabled": True, + "efficiency_penalty_coef": 1.0, + "eff_daily_delta_cap": 5, + "rule1_enabled": True, + "rule1_gap": 30, + "rule2_enabled": True, + "rule2_threshold": 4, + "rule2_window_minutes": 30, + "rule2_penalty": 15.0, + "rule2_exempt_ranges": ["14:00-15:00", "19:00-20:00"], + "rule3_enabled": True, + "rule3_gap_minutes": 30, + "rule3_penalty": 12.0, + "rule4_enabled": True, + "rule4_earliest": "10:00", + "rule4_latest": "22:30", + "rule9_enabled": True, + "rule9_hot_top_n": 3, + "rule9_min_ratio": 0.30, + "rule9_penalty": 20.0, + "rule11_enabled": True, + "rule11_after_time": "22:00", + "rule11_penalty": 30.0, + "rule12_enabled": True, + "rule12_penalty_each": 25.0, + "rule13_enabled": True, + "rule13_forbidden_halls": ["2", "8", "9"], + "tms_allowance": 0, + "maintenance_blocks": [], + "iterations": 300, + "random_seed": 20260331, +} + + +@dataclass +class RuleContext: + target_date: date + business_start_dt: datetime + business_end_dt: datetime + golden_start_dt: datetime + golden_end_dt: datetime + params: Dict[str, Any] + blockouts_by_hall: Dict[str, List[Tuple[datetime, datetime]]] + movie_targets: Dict[str, Dict[str, Any]] + movie_weights: Dict[str, float] + tms_by_hall: Dict[str, List[Dict[str, Any]]] + manual_constraints: Dict[str, Dict[str, Optional[float]]] + allowed_movies: Set[str] + preview_windows_by_identity: Dict[str, List[Tuple[datetime, datetime]]] + + +@dataclass +class CandidateResult: + schedule: List[Dict[str, Any]] + score: float + score_breakdown: List[Tuple[str, float, str]] + hard_violations: List[str] + + +def serialize_candidate(cand: CandidateResult) -> Dict[str, Any]: + return { + "schedule": cand.schedule, + "score": float(cand.score), + "score_breakdown": [list(x) for x in (cand.score_breakdown or [])], + "hard_violations": list(cand.hard_violations or []), + } + + +def deserialize_candidate(obj: Any) -> Optional[CandidateResult]: + if isinstance(obj, CandidateResult): + return obj + if not isinstance(obj, dict): + return None + score_breakdown = obj.get("score_breakdown") or [] + parsed_bd: List[Tuple[str, float, str]] = [] + for x in score_breakdown: + if isinstance(x, (list, tuple)) and len(x) >= 3: + parsed_bd.append((str(x[0]), float(x[1]), str(x[2]))) + return CandidateResult( + schedule=list(obj.get("schedule") or []), + score=float(obj.get("score") or 0.0), + score_breakdown=parsed_bd, + hard_violations=list(obj.get("hard_violations") or []), + ) + + +def ensure_cache_dir() -> None: + os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True) + + +def load_config() -> Dict[str, Any]: + ensure_cache_dir() + if not os.path.exists(CONFIG_FILE): + return dict(DEFAULT_CONFIG) + try: + with open(CONFIG_FILE, "r", encoding="utf-8") as f: + loaded = json.load(f) + cfg = dict(DEFAULT_CONFIG) + cfg.update(loaded) + return cfg + except Exception: + return dict(DEFAULT_CONFIG) + + +def save_config(cfg: Dict[str, Any]) -> None: + ensure_cache_dir() + with open(CONFIG_FILE, "w", encoding="utf-8") as f: + json.dump(cfg, f, ensure_ascii=False, indent=2) + + +def _atomic_write_json(path: str, payload: Dict[str, Any]) -> None: + ensure_cache_dir() + tmp = f"{path}.tmp" + with open(tmp, "w", encoding="utf-8") as f: + json.dump(payload, f, ensure_ascii=False, indent=2) + os.replace(tmp, path) + + +def _read_json(path: str, default: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + if default is None: + default = {} + if not os.path.exists(path): + return dict(default) + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + if isinstance(data, dict): + out = dict(default) + out.update(data) + return out + return dict(default) + except Exception: + return dict(default) + + +def _now_text() -> str: + return datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + +def _default_job_state() -> Dict[str, Any]: + return { + "status": "idle", + "control": "run", # run | pause | stop + "job_id": "", + "started_at": "", + "started_ts": 0.0, + "ended_at": "", + "updated_at": "", + "target_date": "", + "iterations": 0, + "iter_done": 0, + "progress": 0.0, + "elapsed_seconds": 0.0, + "feasible_count": 0, + "hard_reject": 0, + "build_reject": 0, + "rule_reject": 0, + "reject_reason_top": {}, + "reject_detail_top": {}, + "message": "", + "result_count": 0, + } + + +def _atomic_write_pickle(path: str, payload: Any) -> None: + ensure_cache_dir() + tmp = f"{path}.tmp" + with open(tmp, "wb") as f: + pickle.dump(payload, f) + os.replace(tmp, path) + + +def _read_pickle(path: str, default: Any = None) -> Any: + if not os.path.exists(path): + return default + try: + with open(path, "rb") as f: + return pickle.load(f) + except Exception: + return default + + +def _find_live_worker() -> Optional[threading.Thread]: + global _JOB_THREAD + if _JOB_THREAD is not None and _JOB_THREAD.is_alive(): + return _JOB_THREAD + for t in threading.enumerate(): + if t.name == "nextday-opt-worker" and t.is_alive(): + _JOB_THREAD = t + return t + _JOB_THREAD = None + return None + + +def read_job_state() -> Dict[str, Any]: + return _read_json(JOB_STATE_FILE, _default_job_state()) + + +def write_job_state(**kwargs: Any) -> Dict[str, Any]: + state = read_job_state() + state.update(kwargs) + state["updated_at"] = _now_text() + _atomic_write_json(JOB_STATE_FILE, state) + return state + + +def parse_hm(hm: str, fallback: str) -> dt_time: + raw = str(hm or "").strip() + if not raw: + raw = fallback + try: + return datetime.strptime(raw, "%H:%M").time() + except Exception: + return datetime.strptime(fallback, "%H:%M").time() + + +def hm_str(t: dt_time) -> str: + return t.strftime("%H:%M") + + +def parse_operating_dt(d: date, t: dt_time) -> datetime: + dt = datetime.combine(d, t) + if t < dt_time(6, 0): + dt += timedelta(days=1) + return dt + + +def ceil_datetime_to_step(dt: datetime, step_minutes: int = 5) -> datetime: + aligned = dt.replace(second=0, microsecond=0) + if aligned.minute % step_minutes == 0 and dt.second == 0 and dt.microsecond == 0: + return aligned + add_minutes = (step_minutes - (aligned.minute % step_minutes)) % step_minutes + if add_minutes == 0: + add_minutes = step_minutes + return aligned + timedelta(minutes=add_minutes) + + +def normalize_hall_key(hall_id: Any, hall_name: Any) -> str: + if hall_id not in (None, ""): + return str(hall_id) + if hall_name in (None, ""): + return "" + nums = re.findall(r"\d+", str(hall_name)) + return nums[0] if nums else str(hall_name) + + +def extract_hall_no(raw: Any) -> str: + nums = re.findall(r"\d+", str(raw or "")) + return nums[0] if nums else str(raw or "") + + +def normalize_media_type(media: Any) -> str: + text = str(media or "").upper() + if "3D" in text: + return "3D" + if "2D" in text: + return "2D" + return "" + + +def movie_policy_key(movie_name: Any, movie_media_type: Any = "") -> str: + """ + 片名策略键: + - 同片不同语言归并(依赖 clean_movie_title 规则) + - 2D/3D 分开(若 clean 后未体现 3D,则追加) + """ + base = clean_movie_title(movie_name or "") + media = normalize_media_type(movie_media_type or movie_name) + if media == "3D" and "3D" not in str(base).upper(): + return f"{base}(数字3D)" + return str(base) + + +def tms_missing_pair_key(session: Dict[str, Any]) -> Tuple[str, str, str]: + hall_no = extract_hall_no(session.get("hallName") or session.get("hallId")) + policy = movie_policy_key(session.get("movieName", ""), session.get("movieMediaType", "")) + media = normalize_media_type(session.get("movieMediaType", "")) + return hall_no, policy, media + + +def extract_allowed_movies_from_tuning_df(df: pd.DataFrame) -> Set[str]: + if df is None or df.empty: + return set() + out: Set[str] = set() + for _, row in df.iterrows(): + selected = row.get("选中", False) + if pd.notna(selected) and bool(selected): + key = movie_policy_key(row.get("影片", "")) + if key: + out.add(key) + return out + + +def normalize_text_token(text: Any) -> str: + s = str(text or "") + s = clean_movie_title(s) + s = re.sub(r"\s+", "", s) + s = re.sub(r"[\[\]【】()()·,.,::!!??'\"-]", "", s) + return s.upper() + + +def to_float(v: Any, default: float = 0.0) -> float: + try: + if v in (None, "", "None"): + return default + return float(v) + except Exception: + return default + + +def extract_movie_serial_5_8(movie_num: Any) -> str: + movie_num_norm = re.sub(r"[^A-Z0-9]", "", str(movie_num or "").upper()) + if len(movie_num_norm) >= 8: + return movie_num_norm[4:8] + return "" + + +def movie_identity_key(movie_num: Any, movie_name: Any) -> str: + serial = extract_movie_serial_5_8(movie_num) + if serial: + return f"serial:{serial}" + return f"name:{clean_movie_title(movie_name or '')}" + + +def is_3d_by_movie_num_or_media(movie_num: Any, media: Any) -> bool: + movie_num_norm = re.sub(r"[^A-Z0-9]", "", str(movie_num or "").upper()) + if len(movie_num_norm) >= 4 and movie_num_norm[3] == "2": + return True + return "3D" in str(media or "").upper() + + +def extract_box_office_value(item: Dict[str, Any]) -> float: + for key in ( + "ticketIncome", + "splitTicketIncome", + "todayTicketIncome", + "todayBoxOffice", + "boxOffice", + "box", + "income", + "今日票房", + "今日票房(不含费)", + ): + if key not in item: + continue + raw = item.get(key) + if isinstance(raw, str): + raw = raw.replace(",", "").strip() + try: + val = float(raw) + if val >= 0: + return val + except Exception: + continue + return 0.0 + + +def sort_movies_by_box_office(box_office_data: List[Dict[str, Any]]) -> List[Tuple[str, float]]: + score_map: Dict[str, float] = {} + order_map: Dict[str, int] = {} + + for idx, item in enumerate(box_office_data): + name = clean_movie_title(item.get("movieName") or item.get("影片名称") or "") + if not name: + continue + val = extract_box_office_value(item) + if name not in order_map: + order_map[name] = idx + score_map[name] = max(score_map.get(name, 0.0), val) + + if not score_map: + return [] + + if max(score_map.values()) > 0: + ranked = sorted(score_map.items(), key=lambda x: x[1], reverse=True) + else: + ranked = sorted(score_map.items(), key=lambda x: order_map.get(x[0], 99999)) + return ranked + + +def resolve_hot_movies( + df: pd.DataFrame, + box_office_data: List[Dict[str, Any]], + top_n: int, +) -> Tuple[List[str], str, List[Tuple[str, float]]]: + bo_ranked = sort_movies_by_box_office(box_office_data) + if bo_ranked: + top_val = bo_ranked[0][1] + if top_val > 0: + hot = [m for m, v in bo_ranked if v >= top_val * 0.95] + else: + hot = [m for m, _ in bo_ranked[:top_n]] + if not hot: + hot = [m for m, _ in bo_ranked[:top_n]] + return hot[: max(top_n, len(hot))], "全国大盘票房", bo_ranked + + counts = df["movieClean"].value_counts() + if counts.empty: + return [], "无可用数据", [] + max_count = int(counts.iloc[0]) + hot = counts[counts >= max_count * 0.95].index.tolist() + if not hot: + hot = counts.head(top_n).index.tolist() + fallback_ranked = [(m, float(c)) for m, c in counts.items()] + return hot[: max(top_n, len(hot))], "场次数量", fallback_ranked + + +def rule9_core_windows(d: date) -> List[Tuple[dt_time, dt_time]]: + weekday = d.weekday() + windows = [ + [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))], + [(dt_time(14, 0), dt_time(15, 30)), (dt_time(19, 0), dt_time(22, 20))], + [(dt_time(14, 30), dt_time(16, 0)), (dt_time(19, 0), dt_time(21, 40))], + [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))], + [(dt_time(14, 0), dt_time(15, 0)), (dt_time(19, 0), dt_time(22, 0))], + [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))], + [(dt_time(14, 0), dt_time(17, 0)), (dt_time(19, 0), dt_time(21, 30))], + ] + return windows[weekday] + + +def time_in_ranges(t: dt_time, ranges: List[Tuple[dt_time, dt_time]]) -> bool: + for st_t, et_t in ranges: + if st_t <= et_t: + if st_t <= t < et_t: + return True + else: + if t >= st_t or t < et_t: + return True + return False + + +def gap_intersects_any_blockout( + g_st: datetime, + g_et: datetime, + blockouts_by_hall: Dict[str, List[Tuple[datetime, datetime]]], +) -> bool: + for _, ranges in blockouts_by_hall.items(): + for b_st, b_et in ranges: + if interval_overlaps(g_st, g_et, b_st, b_et): + return True + return False + + +def parse_exempt_ranges(items: List[str]) -> List[Tuple[dt_time, dt_time]]: + out: List[Tuple[dt_time, dt_time]] = [] + for item in items: + s = str(item or "").strip() + if not s: + continue + if "-" not in s: + continue + try: + st_s, et_s = s.split("-", 1) + out.append((datetime.strptime(st_s.strip(), "%H:%M").time(), datetime.strptime(et_s.strip(), "%H:%M").time())) + except Exception: + continue + return out + + +def in_any_exempt(ts: datetime, ranges: List[Tuple[dt_time, dt_time]]) -> bool: + t = ts.time() + for st_t, et_t in ranges: + if st_t <= et_t: + if st_t <= t <= et_t: + return True + else: + if t >= st_t or t <= et_t: + return True + return False + + +def interval_overlaps(a_st: datetime, a_et: datetime, b_st: datetime, b_et: datetime) -> bool: + return not (a_et <= b_st or a_st >= b_et) + + +def gap_intersects_blockout( + hall_key: str, + g_st: datetime, + g_et: datetime, + blockouts_by_hall: Dict[str, List[Tuple[datetime, datetime]]], +) -> bool: + for b_st, b_et in blockouts_by_hall.get(hall_key, []): + if interval_overlaps(g_st, g_et, b_st, b_et): + return True + return False + + +def parse_blockouts_from_config(target_date: date, raw: Any) -> List[Dict[str, Any]]: + if raw in (None, "", []): + return [] + + parsed: List[Dict[str, Any]] + if isinstance(raw, str): + try: + payload = json.loads(raw) + parsed = payload if isinstance(payload, list) else [] + except Exception: + parsed = [] + elif isinstance(raw, list): + parsed = raw + else: + parsed = [] + + result: List[Dict[str, Any]] = [] + for item in parsed: + if not isinstance(item, dict): + continue + hall_token = str(item.get("hall") or item.get("hallId") or item.get("hallName") or "").strip() + st_s = str(item.get("start") or "").strip() + et_s = str(item.get("end") or "").strip() + if not hall_token or not st_s or not et_s: + continue + try: + st_t = datetime.strptime(st_s, "%H:%M").time() + et_t = datetime.strptime(et_s, "%H:%M").time() + st_dt = parse_operating_dt(target_date, st_t) + et_dt = parse_operating_dt(target_date, et_t) + if et_dt <= st_dt: + et_dt += timedelta(days=1) + result.append( + { + "hall_token": hall_token, + "start": st_dt, + "end": et_dt, + } + ) + except Exception: + continue + return result + + +def build_hall_blockouts( + blockouts: List[Dict[str, Any]], + hall_name_map: Dict[Any, str], +) -> Dict[str, List[Tuple[datetime, datetime]]]: + out: Dict[str, List[Tuple[datetime, datetime]]] = {str(hid): [] for hid in hall_name_map.keys()} + + for hid, hname in hall_name_map.items(): + hall_key = str(hid) + hall_no = extract_hall_no(hname) + for b in blockouts: + token = str(b["hall_token"]) + token_no = extract_hall_no(token) + if token in (hall_key, str(hname), hall_no, f"{hall_no}号厅") or token_no == hall_no: + out.setdefault(hall_key, []).append((b["start"], b["end"])) + + for hall_key in out: + out[hall_key].sort(key=lambda x: x[0]) + return out + + +def is_3d_movie(movie: Dict[str, Any]) -> bool: + text = f"{movie.get('movieMediaType', '')} {movie.get('movieName', '')}".upper() + return "3D" in text + + +def fetch_movie_info_for_date(show_date: str) -> List[Dict[str, Any]]: + token = get_valid_token(force_refresh=False) + if not token: + return [] + + def _call(tok: str) -> Tuple[int, Dict[str, Any]]: + url = "https://cawapi.yinghezhong.com/show/getMovieInfo" + params = {"showDate": show_date, "token": tok, "_": int(time.time() * 1000)} + headers = { + "Origin": "https://caw.yinghezhong.com", + "Referer": "https://caw.yinghezhong.com/", + "User-Agent": "Mozilla/5.0", + } + resp = requests.get(url, params=params, headers=headers, timeout=15) + resp.raise_for_status() + payload = resp.json() + return int(payload.get("code", -1)), payload + + try: + code, payload = _call(token) + if code == 1: + return payload.get("data", []) or [] + if code == 500: + token = get_valid_token(force_refresh=True) + if not token: + return [] + code2, payload2 = _call(token) + return payload2.get("data", []) if code2 == 1 else [] + except Exception: + return [] + return [] + + +def dedupe_movies_by_policy_key(movies: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + 去重规则: + - 同片不同语言按同一条处理 + - 不同制式(2D/3D)保留 + """ + out: List[Dict[str, Any]] = [] + seen: Set[str] = set() + for m in movies: + key = movie_policy_key(m.get("movieName", ""), m.get("movieMediaType", "")) + if not key or key in seen: + continue + seen.add(key) + out.append(m) + return out + + +def build_preview_windows_for_movies( + target_date: date, + movies: List[Dict[str, Any]], +) -> Dict[str, List[Tuple[datetime, datetime]]]: + """ + previewShowTime 规则: + - previewShowTime 为空:不限时段 + - previewShowTime 有值且命中 target_date:仅允许落在该日对应时段内开场 + - previewShowTime 有值但未命中 target_date:视为该日不限时段 + """ + out: Dict[str, List[Tuple[datetime, datetime]]] = {} + for m in movies: + identity = movie_policy_key(m.get("movieName", ""), m.get("movieMediaType", "")) + if not identity: + continue + raw_windows = m.get("previewShowTime") or [] + if not isinstance(raw_windows, list) or not raw_windows: + continue + + matched_target_date = False + allowed: List[Tuple[datetime, datetime]] = [] + for w in raw_windows: + if not isinstance(w, dict): + continue + sd_s = str(w.get("startDate") or "").strip() + ed_s = str(w.get("endDate") or "").strip() + st_s = str(w.get("startTime") or "").strip() + et_s = str(w.get("endTime") or "").strip() + if not sd_s or not ed_s or not st_s or not et_s: + continue + try: + sd = datetime.strptime(sd_s, "%Y-%m-%d").date() + ed = datetime.strptime(ed_s, "%Y-%m-%d").date() + if not (sd <= target_date <= ed): + continue + matched_target_date = True + st_t = datetime.strptime(st_s, "%H:%M").time() + et_t = datetime.strptime(et_s, "%H:%M").time() + st_dt = parse_operating_dt(target_date, st_t) + et_dt = parse_operating_dt(target_date, et_t) + if et_dt <= st_dt: + et_dt += timedelta(days=1) + allowed.append((st_dt, et_dt)) + except Exception: + continue + + if matched_target_date and allowed: + out[identity] = allowed + return out + + +def fetch_realtime_box_office(date_str: str) -> List[Dict[str, Any]]: + token = get_valid_token(force_refresh=False) + if not token: + return [] + url = "https://app.bi.piao51.cn/cinema-app/market/realtimeDailyBoxOffice.action" + params = {"qTime": date_str, "token": token} + headers = {"Host": "app.bi.piao51.cn", "User-Agent": "Mozilla/5.0"} + + try: + resp = requests.get(url, params=params, headers=headers, timeout=10) + resp.raise_for_status() + data = resp.json() + if data.get("code") == "A00000": + return data.get("results", {}).get("movieDatalist", []) or [] + except Exception: + return [] + return [] + + +def fetch_tms_server_movies_raw() -> List[Dict[str, Any]]: + app_secret = os.getenv("TMS_APP_SECRET") + ticket = os.getenv("TMS_TICKET") + theater_id = int(os.getenv("TMS_THEATER_ID", "0")) + x_session_id = os.getenv("TMS_X_SESSION_ID") + if not all([app_secret, ticket, theater_id, x_session_id]): + return [] + + try: + token_url = f"https://tms.hengdianfilm.com/cinema-api/admin/generateToken?token=hd&murl=ticket={ticket}" + token_headers = {"Cookie": f"JSESSIONID={x_session_id}", "Content-Type": "application/json"} + token_payload = {"appId": "hd", "appSecret": app_secret, "timeStamp": int(time.time() * 1000)} + token_resp = requests.post(token_url, headers=token_headers, json=token_payload, timeout=10) + token_resp.raise_for_status() + token_data = token_resp.json() + auth_token = token_data.get("param") + if not auth_token: + return [] + + list_url = "https://tms.hengdianfilm.com/cinema-api/cinema/server/dcp/list" + list_headers = {"Token": auth_token, "X-SESSIONID": x_session_id} + all_rows: List[Dict[str, Any]] = [] + page_index = 1 + + while True: + payload = { + "THEATER_ID": theater_id, + "SOURCE": "SERVER", + "ASSERT_TYPE": 2, + "PAGE_CAPACITY": 200, + "PAGE_INDEX": page_index, + } + movie_resp = requests.post( + list_url, + params={"token": "hd", "murl": "ContentMovie"}, + headers=list_headers, + json=payload, + verify=False, + timeout=20, + ) + movie_resp.raise_for_status() + body = movie_resp.json().get("BODY", {}) + rows = body.get("LIST", []) or [] + if not rows: + break + all_rows.extend(rows) + count = int(body.get("COUNT") or len(all_rows)) + if len(all_rows) >= count: + break + page_index += 1 + time.sleep(0.2) + + return all_rows + except Exception: + return [] + + +def fetch_schedule_and_halls(show_date: str) -> Tuple[List[Dict[str, Any]], Dict[Any, Any], Optional[str]]: + token = get_valid_token(force_refresh=False) + if not token: + return [], {}, "未获取到有效 token" + + try: + schedule = fetch_schedule_data(token, show_date) + halls = fetch_hall_info(token) + return schedule or [], halls or {}, None + except ValueError: + token = get_valid_token(force_refresh=True) + if not token: + return [], {}, "token 刷新失败" + try: + schedule = fetch_schedule_data(token, show_date) + halls = fetch_hall_info(token) + return schedule or [], halls or {}, None + except Exception as e: + return [], {}, f"重试后仍失败: {e}" + except Exception as e: + return [], {}, str(e) + + +def build_hall_name_map(next_day_schedule: List[Dict[str, Any]], hall_seat_map: Dict[Any, Any]) -> Dict[Any, str]: + hall_name_map: Dict[Any, str] = {} + for s in next_day_schedule: + hid = s.get("hallId") + hname = s.get("hallName") + if hid not in (None, "") and hname: + hall_name_map[hid] = str(hname) + + if hall_name_map: + return hall_name_map + + for hid in hall_seat_map.keys(): + hall_name_map[hid] = f"{hid}号厅" + + if not hall_name_map: + hall_name_map = {1: "1号厅", 2: "2号厅", 3: "3号厅", 4: "4号厅"} + return hall_name_map + + +def session_display_label(session: Dict[str, Any]) -> str: + start = str(session.get("showStartTime") or session.get("startTime") or "").strip() + hall = str(session.get("hallName") or session.get("hallId") or "").strip() + movie = str(session.get("movieName") or "").strip() + return f"{start} | {hall} | {movie}" + + +def apply_session_exclusions( + schedule_list: List[Dict[str, Any]], + excluded_labels: List[str], +) -> List[Dict[str, Any]]: + if not schedule_list or not excluded_labels: + return list(schedule_list or []) + exclude_set = set(str(x).strip() for x in excluded_labels if str(x).strip()) + return [s for s in schedule_list if session_display_label(s) not in exclude_set] + + +def build_today_efficiency( + today_schedule: List[Dict[str, Any]], + hall_seat_map: Dict[Any, Any], + golden_start: dt_time, + golden_end: dt_time, +) -> pd.DataFrame: + if not today_schedule: + return pd.DataFrame(columns=["影片", "场次", "场次效率", "黄金场次", "黄金效率", "票房"]) + + df = pd.DataFrame(today_schedule) + if df.empty: + return pd.DataFrame(columns=["影片", "场次", "场次效率", "黄金场次", "黄金效率", "票房"]) + + df["影片"] = df.get("movieName", "").apply(clean_movie_title) + df["总收入"] = pd.to_numeric(df.get("soldBoxOffice", 0), errors="coerce").fillna(0) + df["放映时间"] = pd.to_datetime(df.get("showStartTime", "00:00"), format="%H:%M", errors="coerce").dt.time + + by_movie = ( + df.groupby("影片", dropna=False) + .agg(场次=("影片", "size"), 票房=("总收入", "sum")) + .reset_index() + ) + + total_revenue = float(by_movie["票房"].sum()) + total_sessions = int(by_movie["场次"].sum()) + by_movie["场次效率"] = 0.0 + + if total_revenue > 0 and total_sessions > 0: + by_movie["票房比"] = by_movie["票房"] / total_revenue + by_movie["场次比"] = by_movie["场次"] / total_sessions + by_movie["场次效率"] = ( + (by_movie["票房比"] / by_movie["场次比"]) + .replace([float("inf"), -float("inf")], 0) + .fillna(0) + ) + + golden_df = df[df["放映时间"].between(golden_start, golden_end, inclusive="both")].copy() + if golden_df.empty: + by_movie["黄金场次"] = 0 + by_movie["黄金效率"] = 0.0 + else: + g = ( + golden_df.groupby("影片", dropna=False) + .agg(黄金场次=("影片", "size"), 黄金票房=("总收入", "sum")) + .reset_index() + ) + g_total_revenue = float(g["黄金票房"].sum()) + g_total_count = int(g["黄金场次"].sum()) + g["黄金效率"] = 0.0 + + if g_total_revenue > 0 and g_total_count > 0: + g["黄金票房比"] = g["黄金票房"] / g_total_revenue + g["黄金场次比"] = g["黄金场次"] / g_total_count + g["黄金效率"] = ( + (g["黄金票房比"] / g["黄金场次比"]) + .replace([float("inf"), -float("inf")], 0) + .fillna(0) + ) + + by_movie = by_movie.merge(g[["影片", "黄金场次", "黄金效率"]], on="影片", how="left") + by_movie["黄金场次"] = by_movie["黄金场次"].fillna(0).astype(int) + by_movie["黄金效率"] = by_movie["黄金效率"].fillna(0.0) + + return by_movie[["影片", "场次", "场次效率", "黄金场次", "黄金效率", "票房"]] + + +def build_locked_sessions(raw_next_day_schedule: List[Dict[str, Any]], target_date: date) -> List[Dict[str, Any]]: + locked: List[Dict[str, Any]] = [] + for s in raw_next_day_schedule: + sold = int(s.get("soldTicketNum") or s.get("buyTicketNum") or 0) + if sold <= 0: + continue + try: + st_t = datetime.strptime(str(s.get("showStartTime", "00:00")), "%H:%M").time() + et_t = datetime.strptime(str(s.get("showEndTime", "00:00")), "%H:%M").time() + except Exception: + continue + + st_dt = parse_operating_dt(target_date, st_t) + et_dt = parse_operating_dt(target_date, et_t) + if et_dt <= st_dt: + et_dt += timedelta(days=1) + + locked.append( + { + "hallId": s.get("hallId"), + "hallName": s.get("hallName") or f"{s.get('hallId')}号厅", + "movieId": s.get("movieId"), + "movieNum": s.get("movieNum"), + "movieName": s.get("movieName", "未知影片"), + "movieDuration": int(s.get("movieLength") or s.get("movieDuration") or 120), + "movieMediaType": s.get("movieMediaType", ""), + "startTime": st_dt, + "endTime": et_dt, + "is_presold": True, + "sold": sold, + } + ) + return locked + + +def build_tms_index_by_hall(tms_rows: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: + by_hall: Dict[str, List[Dict[str, Any]]] = defaultdict(list) + + for row in tms_rows: + halls = row.get("HALL_INFO") or [] + if not isinstance(halls, list) or not halls: + continue + + content_name = str(row.get("CONTENT_NAME") or "") + assert_name = str(row.get("ASSERT_NAME") or "") + assert_id = str(row.get("ASSERT_ID") or "") + source_format = str(row.get("SOURCE_FORMAT") or "") + + entry = { + "assert_12": re.sub(r"[^A-Za-z0-9]", "", assert_id).upper()[:12], + "name_norm": normalize_text_token(assert_name or content_name), + "media": normalize_media_type(source_format), + } + + for hall in halls: + hall_key = extract_hall_no(hall.get("HALL_NAME") or hall.get("HALL_ID")) + by_hall[hall_key].append(entry) + + return dict(by_hall) + + +def session_in_tms(session: Dict[str, Any], hall_key: str, tms_by_hall: Dict[str, List[Dict[str, Any]]]) -> bool: + if not tms_by_hall: + return True + + entries = tms_by_hall.get(extract_hall_no(hall_key), []) + if not entries: + return False + + movie_name_norm = normalize_text_token(session.get("movieName")) + movie_num_12 = re.sub(r"[^A-Za-z0-9]", "", str(session.get("movieNum") or "")).upper()[:12] + media = normalize_media_type(session.get("movieMediaType")) + + for e in entries: + id_ok = bool(movie_num_12) and movie_num_12 == e.get("assert_12") + name_norm = e.get("name_norm") or "" + name_ok = movie_name_norm and ( + movie_name_norm == name_norm + or (movie_name_norm in name_norm) + or (name_norm in movie_name_norm) + ) + media_ok = (not media) or (not e.get("media")) or media == e.get("media") + if media_ok and (id_ok or name_ok): + return True + + return False + + +def build_movie_targets( + movies: List[Dict[str, Any]], + today_eff: pd.DataFrame, + locked_sessions: List[Dict[str, Any]], + box_office_data: List[Dict[str, Any]], + rule12_enabled: bool = True, +) -> Dict[str, Dict[str, Any]]: + locked_total = Counter(movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", "")) for s in locked_sessions) + locked_golden = Counter() + + targets: Dict[str, Dict[str, Any]] = {} + eff_map = { + movie_policy_key(r["影片"]): r + for _, r in (today_eff.iterrows() if not today_eff.empty else []) + } + + rank_boost: Dict[str, float] = {} + bo_ranked = sort_movies_by_box_office(box_office_data) + top10 = {mv for mv, _ in bo_ranked[:10]} + top5 = {mv for mv, _ in bo_ranked[:5]} if rule12_enabled else set() + for i, (mv, _) in enumerate(bo_ranked[:10], start=1): + rank_boost[mv] = max(0.6, 1.6 - 0.1 * i) + + for m in movies: + mv = movie_policy_key(m.get("movieName", ""), m.get("movieMediaType", "")) + if not mv: + continue + + eff = eff_map.get(mv) + today_total = int(eff.get("场次", 0)) if eff is not None else 0 + today_golden = int(eff.get("黄金场次", 0)) if eff is not None else 0 + fe = float(eff.get("场次效率", 1.0)) if eff is not None else 1.0 + ge = float(eff.get("黄金效率", 1.0)) if eff is not None else 1.0 + + if today_total <= 0: + if mv in top10: + min_total, max_total = 0, 1 + else: + min_total, max_total = 0, 0 + min_golden = 0 + else: + if fe > 1.5: + min_total, max_total = today_total + 1, today_total + 4 + elif fe < 0.5: + min_total, max_total = max(0, today_total - 1), max(today_total, 1) + else: + min_total, max_total = max(0, today_total - 1), today_total + 2 + + if ge > 1.5: + min_golden = today_golden + 1 + elif ge < 0.5: + min_golden = max(0, today_golden - 1) + else: + min_golden = max(0, today_golden) + + if today_golden == 0 and fe > 1.5: + min_golden = max(1, min_golden) + + # 规则十二优先:票房Top5至少给1个黄金场,并保证总场次可容纳 + if mv in top5: + min_golden = max(1, min_golden) + min_total = max(min_total, 1) + max_total = max(max_total, 1) + + lt = int(locked_total.get(mv, 0)) + lg = int(locked_golden.get(mv, 0)) + + min_total = max(min_total, lt) + min_golden = max(min_golden, lg) + max_total = max(max_total, min_total) + + targets[mv] = { + "min_total": int(min_total), + "max_total": int(max_total), + "min_golden": int(min_golden), + "today_total": int(today_total), + "today_golden": int(today_golden), + "fe": float(fe), + "ge": float(ge), + "base_weight": float(rank_boost.get(mv, 1.0)), + } + + return targets + + +def build_movie_weights( + movies: List[Dict[str, Any]], + movie_targets: Dict[str, Dict[str, Any]], + box_office_data: List[Dict[str, Any]], +) -> Dict[str, float]: + weights: Dict[str, float] = {} + + rank_map: Dict[str, int] = {} + bo_ranked = sort_movies_by_box_office(box_office_data) + for i, (mv, _) in enumerate(bo_ranked[:20], start=1): + rank_map[mv] = i + + for m in movies: + mv = movie_policy_key(m.get("movieName", ""), m.get("movieMediaType", "")) + if not mv: + continue + + w = 1.0 + rank = rank_map.get(mv) + if rank is not None: + w *= max(0.7, 1.8 - 0.08 * rank) + + target = movie_targets.get(mv, {}) + fe = float(target.get("fe", 1.0) or 1.0) + ge = float(target.get("ge", 1.0) or 1.0) + + if fe > 1.5: + w *= 1.2 + elif fe < 0.5: + w *= 0.85 + + if ge > 1.5: + w *= 1.1 + elif ge < 0.5: + w *= 0.92 + + weights[mv] = max(0.1, w) + + return weights + + +def can_place( + session: Dict[str, Any], + hall_sessions: List[Dict[str, Any]], + all_sessions: List[Dict[str, Any]], + turn_min: int, + turn_max: int, + hall_key: str, + ctx: RuleContext, +) -> bool: + st_dt = session["startTime"] + et_dt = session["endTime"] + if et_dt <= st_dt: + return False + + blockouts = ctx.blockouts_by_hall.get(hall_key, []) + for b_st, b_et in blockouts: + if interval_overlaps(st_dt, et_dt, b_st, b_et): + return False + + same_hall = sorted(hall_sessions, key=lambda x: x["startTime"]) + for s in same_hall: + if interval_overlaps(st_dt, et_dt, s["startTime"], s["endTime"]): + return False + + prev_session: Optional[Dict[str, Any]] = None + next_session: Optional[Dict[str, Any]] = None + + for s in same_hall: + if s["endTime"] <= st_dt: + prev_session = s + elif s["startTime"] >= et_dt: + next_session = s + break + + if prev_session is not None: + gap = (st_dt - prev_session["endTime"]).total_seconds() / 60 + if gap < turn_min: + return False + if gap > turn_max and not gap_intersects_blockout(hall_key, prev_session["endTime"], st_dt, ctx.blockouts_by_hall): + return False + + if next_session is not None: + gap = (next_session["startTime"] - et_dt).total_seconds() / 60 + if gap < turn_min: + return False + if gap > turn_max and not gap_intersects_blockout(hall_key, et_dt, next_session["startTime"], ctx.blockouts_by_hall): + return False + + if ctx.params["rule1_enabled"]: + identity = movie_identity_key(session.get("movieNum"), session.get("movieName")) + for s in all_sessions: + if movie_identity_key(s.get("movieNum"), s.get("movieName")) != identity: + continue + gap = abs((s["startTime"] - st_dt).total_seconds()) / 60 + if gap < int(ctx.params["rule1_gap"]): + return False + + # 点映时段限制(previewShowTime) + identity = movie_policy_key(session.get("movieName", ""), session.get("movieMediaType", "")) + if identity in ctx.preview_windows_by_identity: + allowed_windows = ctx.preview_windows_by_identity.get(identity, []) + if not allowed_windows: + return False + if not any(w_st <= st_dt <= w_et for w_st, w_et in allowed_windows): + return False + + return True + + +def construct_weight( + movie: Dict[str, Any], + start_dt: datetime, + in_tms: bool, + total_counter: Counter, + golden_counter: Counter, + ctx: RuleContext, +) -> float: + mv = movie_policy_key(movie.get("movieName", ""), movie.get("movieMediaType", "")) + target = ctx.movie_targets.get(mv, {"min_total": 1, "max_total": 6, "min_golden": 0}) + mc = ctx.manual_constraints.get(mv, {}) + + cur_total = int(total_counter.get(mv, 0)) + cur_golden = int(golden_counter.get(mv, 0)) + + deficit_total = max(0, int(target.get("min_total", 0)) - cur_total) + deficit_golden = max(0, int(target.get("min_golden", 0)) - cur_golden) + over_total = max(0, cur_total - int(target.get("max_total", cur_total + 10))) + + is_golden = ctx.golden_start_dt <= start_dt <= ctx.golden_end_dt + + w = float(ctx.movie_weights.get(mv, target.get("base_weight", 1.0))) + w *= 1.0 + deficit_total * 0.7 + + fixed_sessions = mc.get("fixed_sessions") + min_sessions = mc.get("min_sessions") + max_sessions = mc.get("max_sessions") + min_golden_sessions = mc.get("min_golden_sessions") + max_golden_sessions = mc.get("max_golden_sessions") + + if fixed_sessions is not None: + if cur_total < int(fixed_sessions): + w *= 1.6 + else: + w *= 0.12 + else: + if min_sessions is not None and cur_total < int(min_sessions): + w *= 1.3 + max(0, int(min_sessions) - cur_total) * 0.2 + if max_sessions is not None and cur_total >= int(max_sessions): + w *= 0.1 + + if is_golden: + w *= 1.05 + deficit_golden * 0.65 + if min_golden_sessions is not None and cur_golden < int(min_golden_sessions): + w *= 1.25 + if max_golden_sessions is not None and cur_golden >= int(max_golden_sessions): + w *= 0.2 + elif deficit_golden > 0: + w *= 0.85 + + if over_total > 0: + w *= max(0.2, 0.8 - over_total * 0.15) + + if in_tms: + w *= 1.06 + else: + w *= 0.75 + + w *= random.uniform(0.90, 1.15) + return max(0.01, w) + + +def simulate_one_candidate( + movies: List[Dict[str, Any]], + hall_name_map: Dict[Any, str], + locked_sessions: List[Dict[str, Any]], + ctx: RuleContext, + fail_reason_out: Optional[List[str]] = None, +) -> Optional[List[Dict[str, Any]]]: + turn_base = int(ctx.params["turnaround_base"]) + turn_min = max(1, turn_base - 3) + turn_max = max(turn_min, turn_base + 5) + + schedule = [dict(s) for s in locked_sessions] + by_hall: Dict[str, List[Dict[str, Any]]] = defaultdict(list) + for s in schedule: + by_hall[str(s["hallId"])].append(s) + + total_counter = Counter(movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", "")) for s in schedule) + golden_counter = Counter( + movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", "")) + for s in schedule + if ctx.golden_start_dt <= s["startTime"] <= ctx.golden_end_dt + ) + + forbidden_set = {extract_hall_no(h) for h in ctx.params["rule13_forbidden_halls"]} + missing_tms_pairs: Set[Tuple[str, str, str]] = set() + + for s in schedule: + hall_key = extract_hall_no(s.get("hallId") or s.get("hallName")) + if not session_in_tms(s, hall_key, ctx.tms_by_hall): + missing_tms_pairs.add(tms_missing_pair_key(s)) + + if len(missing_tms_pairs) > int(ctx.params["tms_allowance"]): + if fail_reason_out is not None: + fail_reason_out.append( + f"构造前失败:已售锁定场次导致TMS缺片去重 {len(missing_tms_pairs)} 超过允许值 {int(ctx.params['tms_allowance'])}" + ) + return None + + min_duration = min( + [int(m.get("movieDuration") or 9999) for m in movies if int(m.get("movieDuration") or 0) > 0] or [90] + ) + + hall_items = list(hall_name_map.items()) + random.shuffle(hall_items) + + density_window = int(ctx.params.get("rule2_window_minutes", 30)) + density_threshold = int(ctx.params.get("rule2_threshold", 4)) + spread_step = max(5, min(20, int(density_window / max(2, density_threshold + 1)))) + + for hall_idx, (hall_id, hall_name) in enumerate(hall_items): + hall_key = str(hall_id) + hall_no = extract_hall_no(hall_name or hall_id) + hall_sessions = by_hall.get(hall_key, []) + blockouts = ctx.blockouts_by_hall.get(hall_key, []) + + # 各厅首场按步长错峰启动,避免 10:00~10:20 集中扎堆 + base_offset = hall_idx * spread_step + jitter = random.choice([0, 5, 10]) + cursor = ceil_datetime_to_step(ctx.business_start_dt + timedelta(minutes=base_offset + jitter), 5) + attempts = 0 + + while cursor < ctx.business_end_dt and attempts < 1000: + attempts += 1 + cursor = ceil_datetime_to_step(cursor, 5) + + occupied = sorted( + hall_sessions + [{"startTime": b[0], "endTime": b[1], "is_block": True} for b in blockouts], + key=lambda x: x["startTime"], + ) + + next_anchor = None + moved = False + for item in occupied: + if item["endTime"] <= cursor: + continue + if item["startTime"] <= cursor < item["endTime"]: + cursor = item["endTime"] + moved = True + break + if item["startTime"] > cursor: + next_anchor = item + break + if moved: + continue + + if cursor >= ctx.business_end_dt: + break + + gap_end = next_anchor["startTime"] if next_anchor else ctx.business_end_dt + if (gap_end - cursor).total_seconds() / 60 < min_duration: + cursor += timedelta(minutes=5) + continue + + candidates: List[Tuple[Dict[str, Any], float, bool]] = [] + offsets = [0, 5, 10, 15, 20, 25, 30] + random.shuffle(offsets) + + # 所有算法生成场次的开场时间统一按 5 分钟粒度对齐 + + for movie in movies: + mv_policy = movie_policy_key(movie.get("movieName", ""), movie.get("movieMediaType", "")) + if ctx.allowed_movies and mv_policy not in ctx.allowed_movies: + continue + + dur = int(movie.get("movieDuration") or 0) + if dur <= 0: + continue + + media = movie.get("movieMediaType", "") + if ctx.params["rule13_enabled"] and hall_no in forbidden_set and is_3d_by_movie_num_or_media(movie.get("movieNum"), media): + continue + + for off in offsets: + st_dt = cursor + timedelta(minutes=off) + et_dt = st_dt + timedelta(minutes=dur) + if et_dt > gap_end or et_dt > ctx.business_end_dt: + continue + + cand = { + "hallId": hall_id, + "hallName": hall_name, + "movieId": movie.get("movieId"), + "movieNum": movie.get("movieNum"), + "movieName": movie.get("movieName", "未知影片"), + "movieDuration": dur, + "movieMediaType": movie.get("movieMediaType", ""), + "startTime": st_dt, + "endTime": et_dt, + "is_presold": False, + "sold": 0, + } + + if not can_place( + session=cand, + hall_sessions=hall_sessions, + all_sessions=schedule, + turn_min=turn_min, + turn_max=turn_max, + hall_key=hall_key, + ctx=ctx, + ): + continue + + in_tms = session_in_tms(cand, hall_no or hall_key, ctx.tms_by_hall) + if not in_tms: + cand_key = tms_missing_pair_key(cand) + if cand_key not in missing_tms_pairs and len(missing_tms_pairs) >= int(ctx.params["tms_allowance"]): + continue + + w = construct_weight(cand, st_dt, in_tms, total_counter, golden_counter, ctx) + + # 全局开场密度抑制:优先抑制“前30分钟内已过密”的候选 + existing_in_window = int( + sum( + 1 + for s in schedule + if 0 <= (st_dt - s["startTime"]).total_seconds() / 60 < density_window + ) + ) + if existing_in_window >= density_threshold and ctx.params.get("rule2_enabled", True): + continue + if existing_in_window > 0: + w *= max(0.30, 1.0 - 0.10 * existing_in_window) + if existing_in_window >= max(0, density_threshold - 1): + overflow = existing_in_window - density_threshold + 1 + w *= max(0.05, 1.0 - 0.22 * overflow) + + candidates.append((cand, w, in_tms)) + + if not candidates: + cursor += timedelta(minutes=5) + continue + + chosen, _, in_tms = random.choices( + population=[c[0] for c in candidates], + weights=[c[1] for c in candidates], + k=1, + )[0], None, None + + for c in candidates: + if c[0] is chosen: + in_tms = c[2] + break + + schedule.append(chosen) + hall_sessions.append(chosen) + by_hall[hall_key] = hall_sessions + + mv_clean = movie_policy_key(chosen.get("movieName", ""), chosen.get("movieMediaType", "")) + total_counter[mv_clean] += 1 + if ctx.golden_start_dt <= chosen["startTime"] <= ctx.golden_end_dt: + golden_counter[mv_clean] += 1 + + if in_tms is False: + missing_tms_pairs.add(tms_missing_pair_key(chosen)) + + cursor = ceil_datetime_to_step(chosen["endTime"] + timedelta(minutes=turn_min), 5) + + return schedule + + +def validate_manual_movie_constraints( + schedule: List[Dict[str, Any]], + constraints: Dict[str, Dict[str, Optional[float]]], + ctx: RuleContext, + locked_sessions: Optional[List[Dict[str, Any]]] = None, +) -> List[str]: + if not constraints: + return [] + + df = pd.DataFrame(schedule).copy() + if df.empty: + return [] + df["movieClean"] = df.apply(lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1) + + total_sessions = len(df) + violations: List[str] = [] + locked_total: Counter = Counter() + locked_golden: Counter = Counter() + if locked_sessions: + locked_df = pd.DataFrame(locked_sessions).copy() + if not locked_df.empty: + locked_df["movieClean"] = locked_df.apply( + lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1 + ) + locked_total = Counter(locked_df["movieClean"].tolist()) + locked_golden = Counter( + locked_df[ + (locked_df["startTime"] >= ctx.golden_start_dt) & (locked_df["startTime"] <= ctx.golden_end_dt) + ]["movieClean"].tolist() + ) + + for mv, c in constraints.items(): + sub = df[df["movieClean"] == mv] + total = int(len(sub)) + golden = int( + ((sub["startTime"] >= ctx.golden_start_dt) & (sub["startTime"] <= ctx.golden_end_dt)).sum() + ) + share_pct = (total / total_sessions * 100.0) if total_sessions > 0 else 0.0 + golden_ratio_pct = (golden / total * 100.0) if total > 0 else 0.0 + + fixed_sessions = c.get("fixed_sessions") + min_sessions = c.get("min_sessions") + max_sessions = c.get("max_sessions") + min_share_pct = c.get("min_share_pct") + max_share_pct = c.get("max_share_pct") + min_golden_sessions = c.get("min_golden_sessions") + max_golden_sessions = c.get("max_golden_sessions") + min_golden_ratio_pct = c.get("min_golden_ratio_pct") + max_golden_ratio_pct = c.get("max_golden_ratio_pct") + locked_total_mv = int(locked_total.get(mv, 0)) + locked_golden_mv = int(locked_golden.get(mv, 0)) + + if max_sessions is not None: + max_sessions = max(float(max_sessions), float(locked_total_mv)) + if max_golden_sessions is not None: + max_golden_sessions = max(float(max_golden_sessions), float(locked_golden_mv)) + if fixed_sessions is not None and locked_total_mv > int(fixed_sessions): + # 预售锁定优先:固定场次不可低于已售锁定 + fixed_sessions = float(locked_total_mv) + + if fixed_sessions is not None and total != int(fixed_sessions): + violations.append(f"《{mv}》固定场次要求 {int(fixed_sessions)},当前 {total}") + continue + + if min_sessions is not None and total < int(min_sessions): + violations.append(f"《{mv}》次日场次 {total} 低于最少场次 {int(min_sessions)}") + if min_share_pct is not None and share_pct < float(min_share_pct): + violations.append(f"《{mv}》排片占比 {share_pct:.1f}% 低于 {float(min_share_pct):.1f}%") + if max_share_pct is not None and share_pct > float(max_share_pct): + violations.append(f"《{mv}》排片占比 {share_pct:.1f}% 高于 {float(max_share_pct):.1f}%") + if min_golden_sessions is not None and golden < int(min_golden_sessions): + violations.append(f"《{mv}》次日黄金场次 {golden} 低于 {int(min_golden_sessions)}") + if min_golden_ratio_pct is not None and total > 0 and golden_ratio_pct < float(min_golden_ratio_pct): + violations.append(f"《{mv}》黄金占比 {golden_ratio_pct:.1f}% 低于 {float(min_golden_ratio_pct):.1f}%") + if max_golden_ratio_pct is not None and total > 0 and golden_ratio_pct > float(max_golden_ratio_pct): + violations.append(f"《{mv}》黄金占比 {golden_ratio_pct:.1f}% 高于 {float(max_golden_ratio_pct):.1f}%") + + return violations + + +def validate_hard_rules( + schedule: List[Dict[str, Any]], + locked_sessions: List[Dict[str, Any]], + ctx: RuleContext, +) -> List[str]: + if not schedule: + return ["方案为空"] + + p = ctx.params + turn_base = int(p["turnaround_base"]) + turn_min = max(1, turn_base - 3) + turn_max = max(turn_min, turn_base + 5) + + violations: List[str] = [] + + for s in schedule: + st_dt = s["startTime"] + et_dt = s["endTime"] + if et_dt <= st_dt: + violations.append("存在结束时间早于开始时间的场次") + break + if st_dt < ctx.business_start_dt or et_dt > ctx.business_end_dt: + violations.append("存在场次超出营业时间") + break + + by_hall: Dict[str, List[Dict[str, Any]]] = defaultdict(list) + for s in schedule: + by_hall[str(s.get("hallId"))].append(s) + + for hall_key, sessions in by_hall.items(): + sessions = sorted(sessions, key=lambda x: x["startTime"]) + + for i in range(1, len(sessions)): + a = sessions[i - 1] + b = sessions[i] + if interval_overlaps(a["startTime"], a["endTime"], b["startTime"], b["endTime"]): + violations.append(f"影厅{hall_key}存在场次重叠") + break + + gap = (b["startTime"] - a["endTime"]).total_seconds() / 60 + if gap < turn_min: + violations.append(f"影厅{hall_key}存在小于{turn_min}分钟的转换间隔") + break + if gap > turn_max and not gap_intersects_blockout(hall_key, a["endTime"], b["startTime"], ctx.blockouts_by_hall): + violations.append(f"影厅{hall_key}存在大于{turn_max}分钟的转换间隔") + break + + if p["rule1_enabled"]: + movie_slots: Dict[str, List[datetime]] = defaultdict(list) + for s in schedule: + identity = movie_identity_key(s.get("movieNum"), s.get("movieName")) + movie_slots[identity].append(s["startTime"]) + for identity, starts in movie_slots.items(): + starts = sorted(starts) + for i in range(1, len(starts)): + gap = (starts[i] - starts[i - 1]).total_seconds() / 60 + if gap < int(p["rule1_gap"]): + violations.append(f"同影片开场间隔小于{int(p['rule1_gap'])}分钟({identity})") + break + + if p["rule4_enabled"]: + earliest = min(s["startTime"] for s in schedule).time() + latest = max(s["startTime"] for s in schedule).time() + if earliest > parse_hm(p["rule4_earliest"], "10:00"): + violations.append("最早一场晚于规则四阈值") + if latest < parse_hm(p["rule4_latest"], "22:30"): + violations.append("最晚一场早于规则四阈值") + + if p["rule13_enabled"]: + forbidden_set = {extract_hall_no(h) for h in p["rule13_forbidden_halls"]} + for s in schedule: + hall_no = extract_hall_no(s.get("hallName") or s.get("hallId")) + if hall_no in forbidden_set and is_3d_by_movie_num_or_media(s.get("movieNum"), s.get("movieMediaType", "")): + violations.append(f"规则十三违规:{hall_no}号厅出现3D") + break + + if ctx.tms_by_hall: + missing_pairs: Set[Tuple[str, str, str]] = set() + for s in schedule: + hall_no = extract_hall_no(s.get("hallName") or s.get("hallId")) + if not session_in_tms(s, hall_no, ctx.tms_by_hall): + missing_pairs.add(tms_missing_pair_key(s)) + if len(missing_pairs) > int(p["tms_allowance"]): + violations.append(f"TMS 缺片场次(同片同厅去重) {len(missing_pairs)},超过允许值 {int(p['tms_allowance'])}") + + locked_keys = { + ( + str(s.get("hallId")), + movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", "")), + s.get("startTime"), + s.get("endTime"), + ) + for s in locked_sessions + } + cand_keys = { + ( + str(s.get("hallId")), + movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", "")), + s.get("startTime"), + s.get("endTime"), + ) + for s in schedule + if s.get("is_presold") + } + if not locked_keys.issubset(cand_keys): + violations.append("存在已售锁定场次被改动") + + manual_violations = validate_manual_movie_constraints(schedule, ctx.manual_constraints, ctx, locked_sessions) + if manual_violations: + violations.extend(manual_violations[:20]) + + return violations + + +def normalize_reject_reason(msg: str) -> str: + text = str(msg or "") + if not text: + return "其他淘汰原因" + if "构造失败" in text: + return "构造阶段失败" + if "存在场次重叠" in text: + return "硬规则:影厅场次重叠" + if "转换间隔" in text: + return "硬规则:影厅场次转换间隔不符" + if "同影片开场间隔" in text: + return "硬规则:规则一同影片间隔不足" + if "最早一场晚于" in text: + return "硬规则:规则四最早场过晚" + if "最晚一场早于" in text: + return "硬规则:规则四最晚场过早" + if "规则十三违规" in text: + return "硬规则:规则十三禁3D违规" + if "TMS 缺片场次" in text: + return "硬规则:TMS缺片超限" + if "已售锁定场次被改动" in text: + return "硬规则:预售锁定场次被改动" + if "固定场次要求" in text: + return "微调约束:固定场次不满足" + if "低于最少场次" in text: + return "微调约束:低于最少场次" + if "高于最多场次" in text: + return "微调约束:高于最多场次" + if "排片占比" in text and "低于" in text: + return "微调约束:低于最低场次占比" + if "排片占比" in text and "高于" in text: + return "微调约束:高于最���场次占比" + if "黄金场次" in text and "低于" in text: + return "微调约束:低于最少黄金场次" + if "黄金场次" in text and "高于" in text: + return "微调约束:高于最多黄金场次" + if "黄金占比" in text and "低于" in text: + return "微调约束:低于最低黄金占比" + if "黄金占比" in text and "高于" in text: + return "微调约束:高于最高黄金占比" + if "超出营业时间" in text: + return "硬规则:场次超出营业时间" + if "结束时间早于开始时间" in text: + return "硬规则:结束时间早于开始时间" + if "方案为空" in text: + return "硬规则:空方案" + return "其他淘汰原因" + + +def score_efficiency_rules( + sched_df: pd.DataFrame, + today_eff: pd.DataFrame, + locked_sessions: List[Dict[str, Any]], + ctx: RuleContext, +) -> Tuple[float, str]: + if today_eff.empty: + return 0.0, "无今日效率数据" + + bonus = 0.0 + reason_parts: List[str] = [] + + golden_mask = (sched_df["startTime"] >= ctx.golden_start_dt) & (sched_df["startTime"] <= ctx.golden_end_dt) + sim_total = sched_df.groupby("movieClean").size().to_dict() + sim_golden = sched_df[golden_mask].groupby("movieClean").size().to_dict() + + if locked_sessions: + locked_df = pd.DataFrame(locked_sessions) + locked_df["movieClean"] = locked_df.apply(lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1) + locked_total = locked_df.groupby("movieClean").size().to_dict() + else: + locked_total = {} + + for _, row in today_eff.iterrows(): + mv = movie_policy_key(row["影片"]) + today_total = int(row.get("场次", 0)) + today_golden = int(row.get("黄金场次", 0) or 0) + fe = float(row.get("场次效率", 0) or 0) + ge = float(row.get("黄金效率", 0) or 0) + + t_total = int(sim_total.get(mv, 0)) + t_golden = int(sim_golden.get(mv, 0)) + locked_cnt = int(locked_total.get(mv, 0)) + + if t_total < locked_cnt: + t_total = locked_cnt + + if today_total == 1: + if today_golden == 0: + if fe > 1.5: + bonus += 30 if (t_total >= 2 and t_golden >= 1) else -35 + elif fe < 0.5: + bonus += 8 if t_total <= 1 else -8 + else: + bonus += 4 + else: + if ge > 1.5: + bonus += 30 if (t_total >= 2 and t_golden >= 2) else -35 + elif ge < 0.5: + if locked_cnt >= today_total: + reason_parts.append(f"{mv}: 锁定场次不可减黄金场,跳过扣分") + else: + bonus += 12 if t_golden <= 0 else -16 + else: + bonus += 5 + else: + if today_golden == 0: + if fe > 1.5: + bonus += 22 if (t_total >= today_total + 1 and t_golden >= 1) else -24 + elif fe < 0.5: + if locked_cnt >= today_total: + reason_parts.append(f"{mv}: 锁定场次不可降总量,跳过扣分") + else: + bonus += 16 if t_total <= max(0, today_total - 1) else -18 + else: + bonus += 4 + else: + if fe > 1.5 and ge > 1.5: + bonus += 24 if (t_total >= today_total + 1 and t_golden >= today_golden + 1) else -25 + elif fe > 1.5 and 0.5 <= ge <= 1.5: + bonus += 18 if t_total >= today_total + 1 else -16 + elif fe > 1.5 and ge < 0.5: + if locked_cnt >= today_golden: + reason_parts.append(f"{mv}: 黄金低效但锁定场次不可减,跳过扣分") + else: + bonus += 12 if (t_total >= today_total + 1 and t_golden <= max(0, today_golden - 1)) else -20 + elif 0.5 <= fe <= 1.5 and ge > 1.5: + bonus += 14 if t_golden >= today_golden + 1 else -12 + elif 0.5 <= fe <= 1.5 and ge < 0.5: + if locked_cnt >= today_total: + reason_parts.append(f"{mv}: 锁定场次不可减,跳过扣分") + else: + bonus += 10 if (t_total <= today_total - 1 and t_golden <= max(0, today_golden - 1)) else -14 + elif fe < 0.5 and ge > 1.5: + if locked_cnt >= today_total: + reason_parts.append(f"{mv}: 锁定场次不可降总场,跳过扣分") + else: + bonus += 9 if (t_total <= max(1, today_total - 1) and t_golden >= today_golden + 1) else -12 + elif fe < 0.5 and 0.5 <= ge <= 1.5: + if locked_cnt >= today_total: + reason_parts.append(f"{mv}: 锁定场次不可降总场,跳���扣分") + else: + bonus += 8 if t_total <= max(1, today_total - 1) else -10 + elif fe < 0.5 and ge < 0.5: + if locked_cnt >= today_total: + reason_parts.append(f"{mv}: 锁定场次不可降总场,跳过扣分") + else: + bonus += 12 if (t_total <= max(1, today_total - 1) and t_golden <= max(0, today_golden - 1)) else -15 + + return bonus, ";".join(reason_parts[:8]) + + +def score_rule2_density(df: pd.DataFrame, ctx: RuleContext) -> Tuple[float, str]: + p = ctx.params + if not p["rule2_enabled"]: + return 0.0, "未启用" + + deduct = 0.0 + starts = sorted(df["startTime"].tolist()) + exempt_ranges = parse_exempt_ranges(p["rule2_exempt_ranges"]) + + for st_dt in starts: + we = st_dt + timedelta(minutes=int(p["rule2_window_minutes"])) + cnt = int(((df["startTime"] >= st_dt) & (df["startTime"] < we)).sum()) + overflow = cnt - int(p["rule2_threshold"]) + if overflow > 0 and not in_any_exempt(st_dt, exempt_ranges): + deduct += overflow * float(p["rule2_penalty"]) + + return -deduct, f"过密窗口扣分 {deduct:.1f}" + + +def score_rule3_gap(df: pd.DataFrame, ctx: RuleContext) -> Tuple[float, str]: + p = ctx.params + if not p["rule3_enabled"]: + return 0.0, "未启用" + + deduct = 0.0 + starts = sorted(df["startTime"].tolist()) + if len(starts) <= 1: + return 0.0, "场次不足" + + for i in range(len(starts) - 1): + gap = (starts[i + 1] - starts[i]).total_seconds() / 60 + if gap > int(p["rule3_gap_minutes"]): + if gap_intersects_any_blockout(starts[i], starts[i + 1], ctx.blockouts_by_hall): + continue + overflow = max(1.0, gap - int(p["rule3_gap_minutes"])) + deduct += (overflow / 10.0) * float(p["rule3_penalty"]) + + return -deduct, f"全局开场断档扣分 {deduct:.1f}" + + +def score_rule9_hot_density(df: pd.DataFrame, ctx: RuleContext, box_office_data: List[Dict[str, Any]]) -> Tuple[float, str]: + p = ctx.params + if not p["rule9_enabled"]: + return 0.0, "未启用" + + windows = rule9_core_windows(ctx.target_date) + golden_df = df[df["startTime"].dt.time.apply(lambda t: time_in_ranges(t, windows))] + if golden_df.empty: + return -float(p["rule9_penalty"]), "核心黄金窗口无场次" + + hot_movies, source, _ = resolve_hot_movies(df, box_office_data, int(p["rule9_hot_top_n"])) + if not hot_movies: + return -float(p["rule9_penalty"]), "无热门片可评估" + + total = len(golden_df) + miss = 0 + for mv in hot_movies: + ratio = float((golden_df["movieClean"] == mv).sum()) / total + if ratio < float(p["rule9_min_ratio"]): + miss += 1 + + deduct = miss * float(p["rule9_penalty"]) + return -deduct, f"热门片来源:{source},密度不足 {miss} 部" + + +def score_rule11_late_hot(df: pd.DataFrame, ctx: RuleContext, box_office_data: List[Dict[str, Any]]) -> Tuple[float, str]: + p = ctx.params + if not p["rule11_enabled"]: + return 0.0, "未启用" + + hot_movies, source, bo_ranked = resolve_hot_movies(df, box_office_data, int(p["rule9_hot_top_n"])) + top_movies = hot_movies[:3] if hot_movies else [] + if bo_ranked and source == "全国大盘票房": + top_movies = [m for m, _ in bo_ranked[:3]] + + hot_movies = top_movies + if not hot_movies: + return 0.0, "无热门片" + + after_t = parse_hm(p["rule11_after_time"], "22:00") + late_df = df[df["startTime"].dt.time.apply(lambda t: t >= after_t or t < dt_time(6, 0))] + if late_df.empty: + return -float(p["rule11_penalty"]), "22:00后无场次" + + late_movies = set(late_df["movieClean"]) + if any(m in late_movies for m in hot_movies): + return 0.0, f"热门片来源:{source},符合" + return -float(p["rule11_penalty"]), f"热门片来源:{source},22:00后无热门片" + + +def score_rule12_top5_golden(df: pd.DataFrame, ctx: RuleContext, box_office_data: List[Dict[str, Any]]) -> Tuple[float, str]: + p = ctx.params + if not p["rule12_enabled"]: + return 0.0, "未启用" + + bo_ranked = sort_movies_by_box_office(box_office_data) + if not bo_ranked: + return 0.0, "未获取到次日票房数据" + + top5 = [m for m, _ in bo_ranked[:5]] + golden_movies = set(df[df["startTime"].dt.time.apply(lambda t: dt_time(14, 0) <= t <= dt_time(21, 0))]["movieClean"]) + miss = [m for m in top5 if m and m not in golden_movies] + deduct = len(miss) * float(p["rule12_penalty_each"]) + return -deduct, f"缺黄金场影片 {len(miss)}" + + +def score_manual_upper_constraints( + schedule: List[Dict[str, Any]], + constraints: Dict[str, Dict[str, Optional[float]]], + locked_sessions: List[Dict[str, Any]], +) -> Tuple[float, str]: + """ + 将“最多场次 / 最多黄金场次”作为软扣分项,不再作为硬淘汰项。 + 预售锁定优先:若已售锁定本身超过���限,则不对该部分扣分。 + """ + if not constraints or not schedule: + return 0.0, "无" + + df = pd.DataFrame(schedule).copy() + if df.empty: + return 0.0, "无" + df["movieClean"] = df.apply(lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1) + totals = Counter(df["movieClean"].tolist()) + golden = Counter( + df[df["startTime"].dt.time.apply(lambda t: dt_time(14, 0) <= t <= dt_time(21, 0))]["movieClean"].tolist() + ) + + locked_total: Counter = Counter() + locked_golden: Counter = Counter() + if locked_sessions: + ldf = pd.DataFrame(locked_sessions).copy() + if not ldf.empty: + ldf["movieClean"] = ldf.apply(lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1) + locked_total = Counter(ldf["movieClean"].tolist()) + locked_golden = Counter( + ldf[ldf["startTime"].dt.time.apply(lambda t: dt_time(14, 0) <= t <= dt_time(21, 0))]["movieClean"].tolist() + ) + + # 扣分力度:每超1场分别扣 8 / 10 分 + penalty = 0.0 + lines: List[str] = [] + for mv, c in constraints.items(): + cur_total = int(totals.get(mv, 0)) + cur_golden = int(golden.get(mv, 0)) + max_total = c.get("max_sessions") + max_golden = c.get("max_golden_sessions") + + if max_total is not None: + eff_max_total = max(int(max_total), int(locked_total.get(mv, 0))) + overflow = max(0, cur_total - eff_max_total) + if overflow > 0: + d = overflow * 8.0 + penalty += d + lines.append(f"{mv} 超总场 {overflow} 场(-{d:.0f})") + + if max_golden is not None: + eff_max_golden = max(int(max_golden), int(locked_golden.get(mv, 0))) + overflow_g = max(0, cur_golden - eff_max_golden) + if overflow_g > 0: + d = overflow_g * 10.0 + penalty += d + lines.append(f"{mv} 超黄金场 {overflow_g} 场(-{d:.0f})") + + return (-penalty, ";".join(lines[:8]) if lines else "满足上限约束") + + +def score_candidate( + schedule: List[Dict[str, Any]], + ctx: RuleContext, + today_eff: pd.DataFrame, + locked_sessions: List[Dict[str, Any]], + box_office_data: List[Dict[str, Any]], +) -> CandidateResult: + if not schedule: + return CandidateResult(schedule=[], score=0.0, score_breakdown=[], hard_violations=["空方案"]) + + df = pd.DataFrame(schedule).sort_values(["startTime", "hallId"]).copy() + df["movieClean"] = df.apply(lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1) + + score = 1000.0 + breakdown: List[Tuple[str, float, str]] = [] + + if ctx.params["efficiency_enabled"]: + delta, msg = score_efficiency_rules(df, today_eff, locked_sessions, ctx) + penalty_coef = float(ctx.params.get("efficiency_penalty_coef", 1.0) or 1.0) + if delta < 0: + delta *= max(0.0, penalty_coef) + score += delta + breakdown.append(("效率分析表", delta, msg or "按18种情况评估")) + + d2, m2 = score_rule2_density(df, ctx) + score += d2 + breakdown.append(("规则二", d2, m2)) + + d3, m3 = score_rule3_gap(df, ctx) + score += d3 + breakdown.append(("规则三", d3, m3)) + + d9, m9 = score_rule9_hot_density(df, ctx, box_office_data) + score += d9 + breakdown.append(("规则九", d9, m9)) + + d11, m11 = score_rule11_late_hot(df, ctx, box_office_data) + score += d11 + breakdown.append(("规则十一", d11, m11)) + + d12, m12 = score_rule12_top5_golden(df, ctx, box_office_data) + score += d12 + breakdown.append(("规则十二", d12, m12)) + + d_manual_max, m_manual_max = score_manual_upper_constraints(schedule, ctx.manual_constraints, locked_sessions) + score += d_manual_max + breakdown.append(("微调上限扣分", d_manual_max, m_manual_max)) + + return CandidateResult(schedule=schedule, score=score, score_breakdown=breakdown, hard_violations=[]) + + +def _append_rule_logs(parts: List[str], title: str, logs: List[str]) -> None: + parts.append(title) + if logs: + for i, log in enumerate(logs, 1): + parts.append(f"{i}. {log}") + else: + parts.append("(无)") + + +def _hall_display(raw: Any, with_ting: bool = True) -> str: + hall_no = extract_hall_no(raw) + if not hall_no: + return str(raw or "") + return f"{hall_no}号厅" if with_ting else f"{hall_no}号" + + +def generate_schedule_check_logs_text( + schedule: List[Dict[str, Any]], + target_date: date, + params: Dict[str, Any], + today_eff: pd.DataFrame, + box_office_data: List[Dict[str, Any]], +) -> str: + if not schedule: + return "无排片数据,无法进行合理性检查。" + + df = pd.DataFrame(schedule).copy() + if df.empty: + return "无排片数据,无法进行合理性检查。" + + df["startTime"] = pd.to_datetime(df["startTime"], errors="coerce") + df["endTime"] = pd.to_datetime(df["endTime"], errors="coerce") + df = df.dropna(subset=["startTime", "endTime"]).sort_values("startTime").reset_index(drop=True) + if df.empty: + return "无有效排片时间数据,无法进行合理性检查。" + + df["filmName"] = df["movieName"].astype(str) + df["clean_filmName"] = df["filmName"].apply(clean_movie_title) + df["simpleHallName"] = df["hallName"].apply(lambda x: _hall_display(x, with_ting=True)) + + bo_ranked = sort_movies_by_box_office(box_office_data) + bo_sorted_movies = [m for m, _ in bo_ranked] + movie_box_office = {m: float(v) for m, v in bo_ranked} + + final_log_parts: List[str] = [] + + # Rule 1 + logs_r1: List[str] = [] + gap_limit = int(params.get("rule1_gap", 30)) + movie_num_series = df["movieNum"] if "movieNum" in df.columns else pd.Series([""] * len(df), index=df.index) + df["movieSerial_5_8"] = movie_num_series.apply(extract_movie_serial_5_8) + serial_values = [s for s in df["movieSerial_5_8"].dropna().unique() if str(s).strip()] + for serial in serial_values: + film_schedules = df[df["movieSerial_5_8"] == serial].sort_values("startTime").reset_index(drop=True) + for i in range(len(film_schedules) - 1): + s1, s2 = film_schedules.iloc[i], film_schedules.iloc[i + 1] + interval = (s2["startTime"] - s1["startTime"]).total_seconds() / 60 + if interval < gap_limit: + logs_r1.append( + f"《{s1['filmName']}》{s1['simpleHallName']}【{s1['startTime'].strftime('%H:%M')}】和 " + f"《{s2['filmName']}》{s2['simpleHallName']}【{s2['startTime'].strftime('%H:%M')}】" + f"开场时间距离 {int(interval)} 分钟(年度顺序号:{serial})" + ) + _append_rule_logs( + final_log_parts, + f"规则一:同影片场次间隔过近(按 movieNum 第5~8位年度顺序号,少于 {gap_limit} 分钟)", + logs_r1, + ) + + # Rule 2 + logs_r2: List[str] = [] + window_minutes = int(params.get("rule2_window_minutes", 30)) + threshold = int(params.get("rule2_threshold", 4)) + i = 0 + processed_indices_r2 = set() + while i < len(df): + if i in processed_indices_r2: + i += 1 + continue + window_start_time = df.iloc[i]["startTime"] + window_end_time = window_start_time + timedelta(minutes=window_minutes) + window_df = df[(df["startTime"] >= window_start_time) & (df["startTime"] < window_end_time)] + if len(window_df) > threshold: + start_t_str = window_df.iloc[0]["startTime"].strftime("%H:%M") + end_t_str = window_df.iloc[-1]["startTime"].strftime("%H:%M") + lines = [f"【{start_t_str} - {end_t_str}】开场时间比较集中:"] + for _, row in window_df.iterrows(): + lines.append(f" {row['simpleHallName']}《{row['filmName']}》> {row['startTime'].strftime('%H:%M')}") + processed_indices_r2.add(int(row.name)) + logs_r2.append("\n".join(lines)) + i += 1 + _append_rule_logs(final_log_parts, f"\n规则二:{window_minutes} 分钟内影片开场超过 {threshold} 场", logs_r2) + + # Rule 3 + logs_r3: List[str] = [] + gap_minutes = int(params.get("rule3_gap_minutes", 30)) + if len(df) > 1: + for i in range(len(df) - 1): + s1_start, s2_start = df.iloc[i]["startTime"], df.iloc[i + 1]["startTime"] + gap = (s2_start - s1_start).total_seconds() / 60 + if gap > gap_minutes: + logs_r3.append(f"【{s1_start.strftime('%H:%M')} ~ {s2_start.strftime('%H:%M')}】缺少影片开场,间隔 {int(gap)} 分钟") + _append_rule_logs(final_log_parts, f"\n规则三:场次开场间隔超过 {gap_minutes} 分钟", logs_r3) + + # Rule 4 + logs_r4: List[str] = [] + if not df.empty: + first_sched = df.iloc[0] + last_sched = df.iloc[-1] + earliest_limit = parse_hm(params.get("rule4_earliest", "10:00"), "10:00") + latest_limit = parse_hm(params.get("rule4_latest", "22:30"), "22:30") + if first_sched["startTime"].time() > earliest_limit: + logs_r4.append( + f"最早一场 {first_sched['simpleHallName']}《{first_sched['filmName']}》{first_sched['startTime'].strftime('%H:%M')} 晚于 {earliest_limit.strftime('%H:%M')}" + ) + if last_sched["startTime"].time() < latest_limit: + logs_r4.append( + f"最晚一场 {last_sched['simpleHallName']}《{last_sched['filmName']}》{last_sched['startTime'].strftime('%H:%M')} 早于 {latest_limit.strftime('%H:%M')}" + ) + _append_rule_logs(final_log_parts, "\n规则四:最早一场晚于 10:00,最晚一场早于 22:30", logs_r4) + + # Rule 5 + logs_r5: List[str] = [] + w5_start = datetime.combine(target_date, dt_time(10, 0)) + w5_end = datetime.combine(target_date, dt_time(23, 0)) + for hall_name in df["simpleHallName"].unique(): + hall_df = df[df["simpleHallName"] == hall_name].sort_values("startTime") + for i in range(len(hall_df) - 1): + prev_end = hall_df.iloc[i]["endTime"] + curr_start = hall_df.iloc[i + 1]["startTime"] + if prev_end < w5_end and curr_start > w5_start: + idle_mins = (curr_start - prev_end).total_seconds() / 60 + if idle_mins > 60: + logs_r5.append(f"{hall_name.replace('厅', '')} 【{prev_end.strftime('%H:%M')} ~ {curr_start.strftime('%H:%M')}】无影片在播,时长 {int(idle_mins)} 分钟") + _append_rule_logs(final_log_parts, "\n规则五:影厅空闲时间超过 1 小时(10:00-23:00)", logs_r5) + + # Rule 6 + logs_r6: List[str] = [] + convert_limit = int(params.get("turnaround_base", 10)) + for hall_name in df["simpleHallName"].unique(): + hall_df = df[df["simpleHallName"] == hall_name].sort_values("startTime") + for i in range(len(hall_df) - 1): + prev_sched = hall_df.iloc[i] + next_sched = hall_df.iloc[i + 1] + conversion = (next_sched["startTime"] - prev_sched["endTime"]).total_seconds() / 60 + if conversion < convert_limit: + logs_r6.append( + f"{hall_name.replace('厅', '')} {prev_sched['endTime'].strftime('%H:%M')} 《{prev_sched['filmName']}》结束后影厅空闲时间仅为 {int(conversion)} 分钟" + ) + _append_rule_logs(final_log_parts, "\n规则六:影厅场次转换时间检查", logs_r6) + + # Rule 7 + logs_r7: List[str] = [] + if not df.empty: + current_time = df.iloc[0]["startTime"].replace(second=0, microsecond=0) + end_time = df.iloc[-1]["endTime"] + reported_windows = set() + while current_time < end_time: + window_end = current_time + timedelta(minutes=10) + starts_in_window = df[(df["startTime"] >= current_time) & (df["startTime"] < window_end)] + ends_in_window = df[(df["endTime"] > current_time) & (df["endTime"] <= window_end)] + if len(starts_in_window) + len(ends_in_window) > 5: + window_tuple = (current_time.strftime("%H:%M"), window_end.strftime("%H:%M")) + if window_tuple not in reported_windows: + exit_halls = "、".join(sorted(set(ends_in_window["simpleHallName"].tolist()))) + entry_halls = "、".join(sorted(set(starts_in_window["simpleHallName"].tolist()))) + log_msg = f"【{current_time.strftime('%H:%M')} ~ {window_end.strftime('%H:%M')}】" + if exit_halls: + log_msg += f",{exit_halls}集中散场" + if entry_halls: + log_msg += ",同时" if exit_halls else "," + log_msg += f"{entry_halls}即将入场" + log_msg += ",预计人流瞬时压力过大。" + logs_r7.append(log_msg) + reported_windows.add(window_tuple) + current_time += timedelta(minutes=5) + + start_groups = df.groupby("startTime").filter(lambda x: len(x) > 3) + for time_val, group in start_groups.groupby("startTime"): + halls = "、".join(sorted(set(group["simpleHallName"].tolist()))) + logs_r7.append(f"{time_val.strftime('%H:%M')},{halls}电影同时开场,注意预计人流瞬时压力过大。") + + end_groups = df.groupby("endTime").filter(lambda x: len(x) > 3) + for time_val, group in end_groups.groupby("endTime"): + halls = "、".join(sorted(set(group["simpleHallName"].tolist()))) + logs_r7.append(f"{time_val.strftime('%H:%M')},{halls}电影同时散场,注意预计人流瞬时压力过大。") + _append_rule_logs(final_log_parts, "\n规则七:动态散场和入场高峰预警", logs_r7) + + # Rule 8 + logs_r8: List[str] = [] + for hall_name in df["simpleHallName"].unique(): + hall_df = df[df["simpleHallName"] == hall_name] + last_sched = hall_df.nlargest(1, "endTime").iloc[0] + if last_sched["endTime"].date() == target_date and last_sched["endTime"].time() < dt_time(22, 30): + logs_r8.append(f"{hall_name.replace('厅', '')} 最后一场于【{last_sched['endTime'].strftime('%H:%M')}】结束,过早停运。") + _append_rule_logs(final_log_parts, "\n规则八:影厅结束运营过早预警", logs_r8) + + # Rule 9 + logs_r9: List[str] = [] + windows = rule9_core_windows(target_date) + period_str = " 和 ".join([f"{s.strftime('%H:%M')}-{e.strftime('%H:%M')}" for s, e in windows]) + golden_df = df[df["startTime"].apply(lambda x: time_in_ranges(x.time(), windows))] + if not golden_df.empty: + if bo_sorted_movies: + max_bo = float(movie_box_office.get(bo_sorted_movies[0], 0)) + if max_bo > 0: + hot_films = [m for m, v in movie_box_office.items() if v >= max_bo * 0.95] + else: + hot_films = bo_sorted_movies[: int(params.get("rule9_hot_top_n", 3))] + else: + counts = df["clean_filmName"].value_counts() + max_count = int(counts.iloc[0]) if not counts.empty else 0 + hot_films = counts[counts >= max_count * 0.95].index.tolist() if max_count > 0 else [] + + min_ratio = float(params.get("rule9_min_ratio", 0.3)) + for film in hot_films: + ratio = float((golden_df["clean_filmName"] == film).sum()) / max(1, len(golden_df)) + if ratio < min_ratio: + logs_r9.append(f"《{film}》在核心黄金时段 {period_str} 排片占比仅为{ratio:.1%},低于 {min_ratio:.0%}。") + _append_rule_logs(final_log_parts, "\n规则九:黄金时段热门影片排片密度检查", logs_r9) + + # Rule 10 + logs_r10: List[str] = [] + if today_eff is not None and not today_eff.empty: + tomorrow_stats: Dict[str, Dict[str, int]] = {} + for film in df["clean_filmName"].unique(): + fdf = df[df["clean_filmName"] == film] + tom_total = len(fdf) + tom_golden = len(fdf[fdf["startTime"].apply(lambda x: dt_time(14, 0) <= x.time() <= dt_time(21, 0))]) + tomorrow_stats[film] = {"total": int(tom_total), "golden": int(tom_golden)} + + for _, row in today_eff.iterrows(): + film = clean_movie_title(row.get("影片", "")) + if film not in tomorrow_stats: + continue + + today_total = int(row.get("场次", 0) or 0) + today_golden = int(row.get("黄金场次", 0) or 0) + fe = float(row.get("场次效率", 0) or 0) + ge = float(row.get("黄金效率", 0) or 0) + tom_total = int(tomorrow_stats[film]["total"]) + tom_golden = int(tomorrow_stats[film]["golden"]) + is_valid = True + + if today_total == 1: + if today_golden == 0: + if fe > 1.5: + is_valid = tom_golden >= 1 and tom_total >= 2 + elif fe < 0.5: + is_valid = tom_total in [0, 1] + else: + is_valid = tom_total in [0, 1, 2] + else: + if ge > 1.5: + is_valid = tom_golden >= 2 and tom_total >= 2 + elif ge < 0.5: + is_valid = tom_golden == 0 and tom_total in [0, 1] + else: + is_valid = (tom_total, tom_golden) in [(1, 1), (2, 1), (1, 0)] + else: + if today_golden == 0: + if fe > 1.5: + is_valid = tom_total > today_total and tom_golden >= 1 + elif fe < 0.5: + is_valid = tom_total < today_total and tom_golden == 0 + else: + if fe > 1.5 and ge > 1.5: + is_valid = tom_total > today_total and tom_golden >= today_golden + 1 + elif fe > 1.5 and 0.5 <= ge <= 1.5: + is_valid = tom_total > today_total + elif fe > 1.5 and ge < 0.5: + is_valid = tom_total > today_total and tom_golden <= max(0, today_golden - 1) + elif 0.5 <= fe <= 1.5 and ge > 1.5: + is_valid = tom_golden >= today_golden + 1 + elif 0.5 <= fe <= 1.5 and ge < 0.5: + is_valid = tom_total < today_total and tom_golden <= max(0, today_golden - 1) + elif fe < 0.5 and ge > 1.5: + is_valid = tom_total <= max(1, today_total - 1) and tom_golden >= today_golden + elif fe < 0.5 and 0.5 <= ge <= 1.5: + is_valid = tom_total <= max(1, today_total - 1) + elif fe < 0.5 and ge < 0.5: + is_valid = tom_total <= max(1, today_total - 1) and tom_golden <= max(0, today_golden - 1) + + if not is_valid: + film_rows = df[df["clean_filmName"] == film] + locked_cnt = int(film_rows["is_presold"].fillna(False).sum()) if ("is_presold" in film_rows.columns) else 0 + # 预售优先:若次日已有预售锁定场次,与效率建议冲突时可忽略 + if locked_cnt > 0: + continue + logs_r10.append(f"《{film}》全天场次效率:{fe:.2f} 黄金时段场次效率:{ge:.2f} 次日的排片不满足要求。") + _append_rule_logs(final_log_parts, "\n规则十:次日排片效率匹配度检查", logs_r10) + + # Rule 11 + logs_r11: List[str] = [] + if bo_sorted_movies: + top_movies = bo_sorted_movies[:3] + top_movies_type = "票房排行前三" + else: + top_movies = df["clean_filmName"].value_counts().head(3).index.tolist() + top_movies_type = "排片量前三" + + if top_movies: + after_t = parse_hm(params.get("rule11_after_time", "22:00"), "22:00") + late_sessions = df[df["startTime"].apply(lambda t: t.time() >= after_t or t.time() < dt_time(6, 0))] + late_movies = set(late_sessions["clean_filmName"].unique()) if not late_sessions.empty else set() + if not any(m in late_movies for m in top_movies): + top_movies_str = "、".join([f"《{m}》" for m in top_movies]) + logs_r11.append(f"{top_movies_type}的影片 {top_movies_str} 在 22:00 后均无场次,建议增加热门影片晚场。") + _append_rule_logs(final_log_parts, "\n规则十一:22:00 后热门影片排片检查", logs_r11) + + # Rule 12 + logs_r12: List[str] = [] + if bo_sorted_movies: + for movie in bo_sorted_movies[:5]: + movie_df = df[df["clean_filmName"] == movie] + if movie_df.empty: + logs_r12.append(f"《{movie}》为次日票房排行前五的影片,但目前未排片。") + continue + golden_sessions = movie_df[movie_df["startTime"].apply(lambda x: dt_time(14, 0) <= x.time() <= dt_time(21, 0))] + if golden_sessions.empty: + logs_r12.append(f"《{movie}》为次日票房排行前五的影片,但没有安排黄金场(14:00-21:00)。") + else: + logs_r12.append("未获取到次日票房数据,无法检查规则十二。") + _append_rule_logs(final_log_parts, "\n规则十二:次日票房前五的影片必须有一场黄金场", logs_r12) + + # Rule 13 + logs_r13: List[str] = [] + restricted = {extract_hall_no(x) for x in params.get("rule13_forbidden_halls", ["2", "8", "9"])} + for _, row in df.iterrows(): + hall_no = extract_hall_no(row.get("hallName")) + if hall_no in restricted and is_3d_by_movie_num_or_media(row.get("movieNum"), row.get("movieMediaType", "")): + logs_r13.append(f"{hall_no}号厅《{row.get('filmName', '未知影片')}》疑似3D排片(movieNum第4位为2)") + _append_rule_logs(final_log_parts, "\n规则十三:2号、8号、9号厅禁止3D排片检查(movieNum第4位为2)", logs_r13) + + return "\n".join(final_log_parts) + + +def schedule_signature(schedule: List[Dict[str, Any]]) -> str: + tokens: List[str] = [] + for s in sorted( + schedule, + key=lambda x: ( + str(x.get("hallId")), + x.get("startTime"), + movie_policy_key(x.get("movieName", ""), x.get("movieMediaType", "")), + ), + ): + tokens.append( + f"{s.get('hallId')}|{movie_policy_key(s.get('movieName',''), s.get('movieMediaType',''))}|" + f"{s.get('startTime').strftime('%H:%M')}|{s.get('endTime').strftime('%H:%M')}" + ) + return "#".join(tokens) + + +def render_gantt(schedule: List[Dict[str, Any]], date_str: str, tab_key: str) -> None: + if not schedule: + st.info("无排片数据") + return + + df = pd.DataFrame(schedule).copy() + if df.empty: + st.info("无排片数据") + return + + df["startTime"] = pd.to_datetime(df["startTime"], errors="coerce") + df["endTime"] = pd.to_datetime(df["endTime"], errors="coerce") + df = df.dropna(subset=["hallName", "movieName", "startTime", "endTime"]).copy() + if df.empty: + st.info("无有效排片数据") + return + + def _hall_sort_key(h: Any) -> Tuple[int, str]: + nums = re.findall(r"\d+", str(h)) + return (int(nums[0]), str(h)) if nums else (9999, str(h)) + + hall_order = sorted(df["hallName"].astype(str).unique().tolist(), key=_hall_sort_key) + t_min = df["startTime"].min().replace(minute=0, second=0, microsecond=0) + t_max = (df["endTime"].max() + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) + total_minutes = max(60.0, (t_max - t_min).total_seconds() / 60.0) + total_hours = max(1, int((t_max - t_min).total_seconds() / 3600)) + + palette = [ + "#2A9D8F", + "#E76F51", + "#264653", + "#F4A261", + "#5B8FF9", + "#6DC8EC", + "#5D7092", + "#9270CA", + "#FF9D4D", + "#269A99", + ] + movies = sorted(df["movieName"].astype(str).unique().tolist()) + color_map = {m: palette[i % len(palette)] for i, m in enumerate(movies)} + + labels: List[str] = [] + for i in range(total_hours + 1): + labels.append(f'