diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,2113 +1,376 @@ import streamlit as st -import pandas as pd -import numpy as np import requests import time -from collections import defaultdict import json import os -import re -from datetime import datetime, timedelta, time as dt_time -import io -import warnings -import matplotlib.pyplot as plt -import matplotlib.font_manager as font_manager -from matplotlib.lines import Line2D -from matplotlib.backends.backend_pdf import PdfPages -import matplotlib.gridspec as gridspec -import base64 -import math -from pypinyin import lazy_pinyin, Style -from itertools import combinations -# --- 新增:加载环境变量 --- -from dotenv import load_dotenv -load_dotenv() # 加载本地 .env 文件 +import threading +from datetime import datetime, timedelta, timezone, time as dt_time +from collections import deque - -# --- 全局配置和常量 --- +# --- 1. 配置与常量 --- +PUSHOVER_TOKEN = "a5cynwsr19y8bdvn2y447bg3vc51fr" +PUSHOVER_USER = "uc86mdifsboas7bczuxmnoyq6qyouw" TOKEN_FILE = 'token_data.json' -# --- 环境变量获取 (替代硬编码) --- -# 使用 os.getenv 获取,如果获取不到默认为空字符串或特定默认值 -GAODE_API_KEY = os.getenv("GAODE_API_KEY", "") -ADCODE = os.getenv("ADCODE", "440114") -CINEMA_ID = os.getenv("CINEMA_ID", "44001291") - -# --- 打印功能相关常量 --- -BUSINESS_START = "09:30" -BUSINESS_END = "01:30" -BORDER_COLOR = 'grey' -DATE_COLOR = '#A9A9A9' -A5_WIDTH_IN = 5.83 -A5_HEIGHT_IN = 8.27 -NUM_COLS = 3 - -# --- 打印字体清单 --- -ALL_FONTS = { - "思源黑体-常规 (推荐 LED 屏)": "SimHei.ttf", - "思源黑体-重体 (推荐散场表)": "SourceHanSansOLD-Heavy-2.otf", - "思源黑体-粗体": "SourceHanSansOLD-Bold-2.otf", - "思源宋体-常规": "SourceHanSansCN-Normal.otf", - "苹方-中黑": "PingFangSC-Medium.otf", - "苹方-半粗": "PingFangSC-Semibold.otf", - "苹方-极细": "PingFangSC-Ultralight.otf", - "阿里巴巴普惠体-常规": "Alibaba-PuHuiTi.ttf", - "阿里巴巴普惠体-粗体": "AlibabaPuHuiTi-Bold.otf", - "阿里巴巴普惠体-重体": "AlibabaPuHuiTi-Heavy.otf", -} -# 检查可用字体 -AVAILABLE_FONTS = {name: fname for name, fname in ALL_FONTS.items() if os.path.exists(fname)} +CINEMA_ID = "44001291" -# --- 忽略特定警告 --- -# 忽略 openpyxl 的样式警告 -warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl") -# 忽略 pandas 日期解析的警告 (针对无法推断格式的情况) -warnings.filterwarnings("ignore", message="Could not infer format") +# 页面配置 +st.set_page_config(page_title="影城防撤场监控系统 Pro", page_icon="🛡️", layout="wide") -# --- 页面基础设置 --- -st.set_page_config(layout="wide", page_title="影城工作便捷工具") +# --- 2. 核心时间处理 --- +def get_beijing_now(): + """获取当前的北京时间 (Naive)""" + utc_now = datetime.now(timezone.utc) + beijing_now = utc_now.astimezone(timezone(timedelta(hours=8))) + return beijing_now.replace(tzinfo=None) -# --- 1. API 数据获取模块 --- +def get_business_date(): + """获取当前的营业日日期""" + now = get_beijing_now() + if now.time() < dt_time(6, 0): + return (now - timedelta(days=1)).strftime("%Y-%m-%d") + return now.strftime("%Y-%m-%d") -# --- 1.1 Token 管理 --- -def save_token(token_data): - """将Token数据保存到JSON文件""" +def parse_show_datetime(date_str, time_str): + """解析排片时间""" try: - with open(TOKEN_FILE, 'w', encoding='utf-8') as f: - json.dump(token_data, f, ensure_ascii=False, indent=4) - return True - except Exception as e: - st.error(f"保存Token失败: {e}") - return False - - -def load_token(): - """从JSON文件加载Token数据""" - if os.path.exists(TOKEN_FILE): - try: - with open(TOKEN_FILE, 'r', encoding='utf-8') as f: - return json.load(f) - except (json.JSONDecodeError, FileNotFoundError): - return None - return None - - -def login_and_get_token(): - """执行登录操作并获取新的Token""" - st.write("Token无效或已过期,正在尝试重新登录...") - - - # 获取环境变量 - username = os.getenv("CINEMA_USERNAME") - password = os.getenv("CINEMA_PASSWORD") - res_code = os.getenv("CINEMA_RES_CODE") - device_id = os.getenv("CINEMA_DEVICE_ID") - - # 简单检查,防止未配置环境变量导致后续请求莫名报错 - if not all([username, password, res_code]): - st.error("登录失败:未配置用户名、密码或影院编码环境变量。") + show_t = datetime.strptime(time_str, "%H:%M").time() + base_date = datetime.strptime(date_str, "%Y-%m-%d") + if show_t < dt_time(6, 0): + base_date += timedelta(days=1) + return datetime.combine(base_date.date(), show_t) + except: return None - session = requests.Session() - session.headers.update({ - 'Host': 'app.bi.piao51.cn', - 'Accept': 'application/json, text/javascript, */*; q=0.01', - 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148', - }) - - login_url = 'https://app.bi.piao51.cn/cinema-app/credential/login.action' - login_headers = { - 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', - 'Origin': 'https://app.bi.piao51.cn', +def send_pushover(title, message, priority=0): + url = "https://api.pushover.net/1/messages.json" + data = { + "token": PUSHOVER_TOKEN, + "user": PUSHOVER_USER, + "title": title, + "message": message, + "sound": "pushover", + "priority": priority } - # 使用变量 - login_data = { - 'username': username, - 'password': password, - 'type': '1', - 'resCode': res_code, - 'deviceid': device_id, - 'dtype': 'ios', - } - try: - response_login = session.post(login_url, headers=login_headers, data=login_data, allow_redirects=False, - timeout=15) - if not (300 <= response_login.status_code < 400 and 'token' in session.cookies): - st.error(f"登录步骤 1 失败,未能获取 Session Token。状态码: {response_login.status_code}") - return None - - user_info_url = 'https://app.bi.piao51.cn/cinema-app/security/logined.action' - response_user_info = session.get(user_info_url, timeout=10) - response_user_info.raise_for_status() - - user_info = response_user_info.json() - if user_info.get("success") and user_info.get("data", {}).get("token"): - token_data = user_info['data'] - if save_token(token_data): st.toast("登录成功,已获取并保存新 Token!", icon="🔑") - return token_data - else: - st.error(f"登录步骤 2 失败,未能从 JSON 中提取 Token。响应: {user_info.get('msg')}") - return None + requests.post(url, data=data, timeout=5) + except: + pass - except requests.exceptions.RequestException as e: - st.error(f"登录请求过程中发生网络错误: {e}") - return None - - -# --- 1.2 API 数据抓取 (排片相关) --- -def fetch_hall_info(token): - url = 'https://cawapi.yinghezhong.com/showInfo/getShowHallInfo' - params = {'token': token, '_': int(time.time() * 1000)} - headers = {'Origin': 'https://caw.yinghezhong.com', 'User-Agent': 'Mozilla/5.0'} - response = requests.get(url, params=params, headers=headers, timeout=10) - response.raise_for_status() - data = response.json() - if data.get('code') == 1 and data.get('data'): - return {item['hallId']: item['seatNum'] for item in data['data']} - else: - raise Exception(f"获取影厅信息失败: {data.get('msg', '未知错误')}") +# --- 3. API 管理模块 --- +class APIManager: + def __init__(self, logger): + self.last_login_fail_time = 0 + self.logger = logger -def fetch_schedule_data(token, show_date): - url = 'https://cawapi.yinghezhong.com/showInfo/getHallShowInfo' - params = {'showDate': show_date, 'token': token, '_': int(time.time() * 1000)} - headers = {'Origin': 'https://caw.yinghezhong.com', 'User-Agent': 'Mozilla/5.0'} - response = requests.get(url, params=params, headers=headers, timeout=15) - response.raise_for_status() - data = response.json() - if data.get('code') == 1: - return data.get('data', []) - elif data.get('code') == 500: - raise ValueError("Token 可能已失效") - else: - raise Exception(f"获取排片数据失败: {data.get('msg', '未知错误')}") + def save_token(self, token_data): + try: + with open(TOKEN_FILE, 'w', encoding='utf-8') as f: + json.dump(token_data, f, ensure_ascii=False, indent=4) + except: + pass + def load_token(self): + if os.path.exists(TOKEN_FILE): + try: + with open(TOKEN_FILE, 'r', encoding='utf-8') as f: + return json.load(f).get('token') + except: + pass + return None -def get_api_data_with_token_management(show_date): - token_data = load_token() - token = token_data.get('token') if token_data else None - if not token: - token_data = login_and_get_token() - if not token_data: return None, None - token = token_data.get('token') + def login(self): + if time.time() - self.last_login_fail_time < 600: + self.logger("⏳ 登录冷却中,跳过重试。") + return None - try: - schedule_list = fetch_schedule_data(token, show_date) - hall_seat_map = fetch_hall_info(token) - return schedule_list, hall_seat_map - except ValueError: - st.toast("Token 已失效,正在尝试重新登录并重试...", icon="🔄") - token_data = login_and_get_token() - if not token_data: return None, None - token = token_data.get('token') + self.logger("🔄 尝试后台自动登��...") + session = requests.Session() + session.headers.update({'User-Agent': 'Mozilla/5.0'}) + login_url = 'https://app.bi.piao51.cn/cinema-app/credential/login.action' + login_data = { + 'username': 'chenhy', 'password': '123456', 'type': '1', + 'resCode': '44001291', 'deviceid': '1517bfd3f6e7fef1333', 'dtype': 'ios', + } + try: - schedule_list = fetch_schedule_data(token, show_date) - hall_seat_map = fetch_hall_info(token) - return schedule_list, hall_seat_map + session.post(login_url, data=login_data, timeout=15) + resp = session.get('https://app.bi.piao51.cn/cinema-app/security/logined.action', timeout=10) + info = resp.json() + if info.get("success") and info.get("data", {}).get("token"): + self.save_token(info['data']) + self.logger("✅ 登录成功。") + return info['data']['token'] + else: + raise Exception("未获取到Token") except Exception as e: - st.error(f"重试获取数据失败: {e}"); - return None, None - except Exception as e: - st.error(f"获取 API 数据时发生错误: {e}"); - return None, None - - -# --- 1.3 新增:电影名称API (获取标准电影名) --- -@st.cache_data(show_spinner=False, ttl=600) -def fetch_canonical_movie_names(token, date_str): - """ - 获取指定日期的官方电影名称列表(唯一名称)。 - 用于后续对原始排片数据中的电影名进行标准化清洗。 - """ - url = 'https://app.bi.piao51.cn/cinema-app/mycinema/movieSellGross.action' - params = { - 'token': token, - 'startDate': date_str, - 'endDate': date_str, - 'dateType': 'day', - 'cinemaId': CINEMA_ID - } - headers = { - 'Host': 'app.bi.piao51.cn', 'X-Requested-With': 'XMLHttpRequest', 'jwt': '0', - 'Accept': 'application/json, text/javascript, */*; q=0.01', - 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148', - } - - try: - response = requests.get(url, params=params, headers=headers, timeout=10) - response.raise_for_status() - data = response.json() - if data.get('code') == 'A00000' and data.get('results'): - # 提取 results 列表中的 movieName,排除 "总计" 等非电影项 - names = [item['movieName'] for item in data['results'] if - item.get('movieName') and item['movieName'] != '总计'] - return names - except Exception as e: - # 这里的错误不打断主流程,返回空列表,后续会回退到基础清洗逻辑 - print(f"获取标准电影名称失败: {e}") - return [] - - -def process_api_data(schedule_list, hall_seat_map, token=None, show_date=None): - if not schedule_list: - st.warning("未获取到任何排片数据。"); - return pd.DataFrame() - df = pd.DataFrame(schedule_list) - df['座位数'] = df['hallId'].map(hall_seat_map).fillna(0).astype(int) - df.rename(columns={'movieName': '影片名称', 'showStartTime': '放映时间', 'soldBoxOffice': '总收入', - 'soldTicketNum': '总人次'}, inplace=True) - - # 获取标准电影名列表并进行清洗 - canonical_names = [] - if token and show_date: - canonical_names = fetch_canonical_movie_names(token, show_date) - - df['影片名称'] = df['影片名称'].apply(lambda x: clean_movie_title(x, canonical_names)) - - required_cols = ['影片名称', '放映时间', '座位数', '总收入', '总人次'] - df = df[required_cols] - df.dropna(subset=['影片名称', '放映时间'], inplace=True) - for col in ['座位数', '总收入', '总人次']: - df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0) - df['放映时间'] = pd.to_datetime(df['放映时间'], format='%H:%M', errors='coerce').dt.time - df.dropna(subset=['放映时间'], inplace=True) - return df - - -# --- 1.4 API 数据抓取 (销售相关) --- -def fetch_sales_data_from_api(token, selected_date): - """从API获取指定日期的销售数据""" - url = 'https://app.bi.piao51.cn/cinema-app/mycinema/retailTop.action' - date_str = selected_date.strftime('%Y-%m-%d') - headers = { - 'Host': 'app.bi.piao51.cn', 'X-Requested-With': 'XMLHttpRequest', 'jwt': '0', - 'Accept': 'application/json, text/javascript, */*; q=0.01', - 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148', - } - params = { - 'dateType': 'day', 'startDate': date_str, 'endDate': date_str, 'noEvent': '1', - 'token': token, 'qTime': date_str, 'cinemaId': CINEMA_ID, - } - try: - response = requests.get(url, params=params, headers=headers, timeout=15) - response.raise_for_status() - data = response.json() - if data.get('code') == 'A00000': - return data.get('results', []) - elif "login" in response.text: - raise ValueError("Token可能已失效") - else: - st.error(f"获取 API 数据失败: {data.get('msg', '未知错误')}") + self.last_login_fail_time = time.time() + msg = f"登录失败: {e}" + self.logger(f"❌ {msg}") + send_pushover("监控异常", msg, priority=0) return None - except requests.exceptions.RequestException as e: - st.error(f"API 请求网络错误: {e}") - return None + def fetch_schedule(self, date_str): + token = self.load_token() + if not token: + token = self.login() + if not token: return None -def get_sales_data_with_token_management(selected_date): - """带Token管理的API销售数据获取流程""" - token_data = load_token() - token = token_data.get('token') if token_data else None - if not token: - token_data = login_and_get_token() - if not token_data: return None - token = token_data.get('token') - - try: - api_results = fetch_sales_data_from_api(token, selected_date) - return api_results - except ValueError: - st.toast("Token 已失效,正在尝试重新登录并重试...", icon="🔄") - token_data = login_and_get_token() - if not token_data: return None - token = token_data.get('token') + url = 'https://cawapi.yinghezhong.com/showInfo/getHallShowInfo' + params = {'showDate': date_str, 'token': token, '_': int(time.time() * 1000)} + headers = {'Origin': 'https://caw.yinghezhong.com', 'User-Agent': 'Mozilla/5.0'} + try: - api_results = fetch_sales_data_from_api(token, selected_date) - return api_results + response = requests.get(url, params=params, headers=headers, timeout=15) + data = response.json() + if data.get('code') == 1: + return data.get('data', []) + elif data.get('code') == 500: + self.logger("⚠️ Token失效,重试中...") + token = self.login() + if token: + params['token'] = token + response = requests.get(url, params=params, headers=headers, timeout=15) + return response.json().get('data', []) + return None except Exception as e: - st.error(f"重试获取数据失败: {e}") + self.logger(f"API请求异常: {e}") return None - except Exception as e: - st.error(f"获取 API 数据时发生错误: {e}") - return None - - -# --- 2. 核心数据分析模块 --- -def clean_movie_title(raw_title, canonical_names=None): - """ - 电影名称标准化清洗函数 - """ - if not isinstance(raw_title, str): - return raw_title - - base_name = None - - # 1. 尝试匹配标准名称 - if canonical_names: - # 按长度倒序排序,确保最长匹配优先(解决"你好"vs"你好明天"的问题) - sorted_names = sorted(canonical_names, key=len, reverse=True) - for name in sorted_names: - if name in raw_title: - base_name = name - break - - # 2. 回退逻辑:如果没传列表或没匹配到,使用空格分割 - if not base_name: - base_name = raw_title.split(' ', 1)[0] - - # 3. 后缀追加逻辑 - raw_upper = raw_title.upper() - suffix = "" - - if "HDR LED" in raw_upper: - suffix = "(HDR LED)" - elif "CINITY" in raw_upper: - suffix = "(CINITY)" - elif "杜比" in raw_upper or "DOLBY" in raw_upper: - suffix = "(杜比视界)" - elif "IMAX" in raw_upper: - if "3D" in raw_upper: - suffix = "(数字IMAX3D)" - else: - suffix = "(数字IMAX)" - elif "巨幕" in raw_upper: - if "立体" in raw_upper: - suffix = "(中国巨幕立体)" - else: - suffix = "(中国巨幕)" - elif "3D" in raw_upper: - suffix = "(数字3D)" - - # **修复**: 只有当 base_name 自身不包含该后缀时才添加 - if suffix and suffix not in base_name: - return f"{base_name}{suffix}" - - return base_name - - -def style_efficiency(row): - green, red = 'background-color: #E6F5E6;', 'background-color: #FFE5E5;' - seat_eff, session_eff = row.get('座次效率', 0), row.get('场次效率', 0) - if seat_eff > 1.5 or session_eff > 1.5: return [green] * len(row) - if seat_eff < 0.5 or session_eff < 0.5: return [red] * len(row) - return [''] * len(row) - - -def style_summary_efficiency(row): - green, red = 'background-color: #E6F5E6;', 'background-color: #FFE5E5;' - if (row.get('全部座次效率', 0) > 1.5 or row.get('全部场次效率', 0) > 1.5 or - row.get('黄金时段座次效率', 0) > 1.5 or row.get('黄金时段场次效率', 0) > 1.5): - return [green] * len(row) - if (row.get('全部座次效率', 0) < 0.5 or row.get('全部场次效率', 0) < 0.5 or - row.get('黄金时段座次效率', 0) < 0.5 or row.get('黄金时段场次效率', 0) < 0.5): - return [red] * len(row) - return [''] * len(row) - - -def process_and_analyze_data(df): - if df.empty: return pd.DataFrame() - - # 确保有清洗后的列名 - if '影片名称_清理后' not in df.columns and '影片名称' in df.columns: - df['影片名称_清理后'] = df['影片名称'] - - analysis_df = df.groupby('影片名称_清理后').agg(座位数=('座位数', 'sum'), 场次=('影片名称_清理后', 'size'), - 票房=('总收入', 'sum'), 人次=('总人次', 'sum')).reset_index() - analysis_df.rename(columns={'影片名称_清理后': '影片'}, inplace=True) - analysis_df = analysis_df.sort_values(by='票房', ascending=False).reset_index(drop=True) - total_seats, total_sessions, total_revenue = analysis_df['座位数'].sum(), analysis_df['场次'].sum(), analysis_df[ - '票房'].sum() - with np.errstate(divide='ignore', invalid='ignore'): - analysis_df['均价'] = np.divide(analysis_df['票房'], analysis_df['人次']).fillna(0) - analysis_df['座次比'] = np.divide(analysis_df['座位数'], total_seats).fillna(0) - analysis_df['场次比'] = np.divide(analysis_df['场次'], total_sessions).fillna(0) - analysis_df['票房比'] = np.divide(analysis_df['票房'], total_revenue).fillna(0) - analysis_df['座次效率'] = np.divide(analysis_df['票房比'], analysis_df['座次比']).fillna(0) - analysis_df['场次效率'] = np.divide(analysis_df['票房比'], analysis_df['场次比']).fillna(0) - final_cols = ['影片', '座位数', '场次', '票房', '人次', '均价', '座次比', '场次比', '票房比', '座次效率', - '场次效率'] - return analysis_df[final_cols] - - -# --- 2.1 销售数据处理与分析模块 --- -def transform_api_data_to_df(api_results): - """将API返回的JSON列表转换为与Excel格式一致的DataFrame""" - if not api_results: return pd.DataFrame() - records = [] - for item in api_results: - is_package = item.get('goodsAllName') and str(item.get('goodsAllName')).strip() != "" - record = { - '售卖位置': '线上渠道', - '一级分类': '套餐' if is_package else '单品', - '售卖键名称': item.get('goodsName'), - '数量': item.get('goodsSoldNums', 0), - '实收总金额': item.get('goodsSoldIncomes', 0) - } - records.append(record) - return pd.DataFrame(records) - - -def process_sales_data(df): - """核心分析函数,处理DataFrame并返回最终结果""" - if df.empty: - st.warning("没有可供分析的数据。") - return None, "" - required_columns = ['售卖位置', '一级分类', '售卖键名称', '数量', '实收总金额'] - if not all(col in df.columns for col in required_columns): - missing_cols = [col for col in required_columns if col not in df.columns] - st.error(f"数据缺少必要的列: {', '.join(missing_cols)}。") - return None, "" - for col in ['数量', '实收总金额']: - df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0) - df.dropna(subset=['售卖键名称', '一级分类'], inplace=True) - df['售卖位置'] = df['售卖位置'].fillna('未知渠道') - - df_package = df[df['一级分类'] == '套餐'].copy() - df_non_package = df[df['一级分类'] != '套餐'].copy() - - if not df_package.empty: - package_summary = df_package.groupby(['售卖位置', '售卖键名称']).agg( - {'数量': 'sum', '实收总金额': 'sum'}).reset_index() - package_summary = package_summary[package_summary['数量'] != 0] - top_10_package = package_summary.sort_values(by='实收总金额', ascending=False).head(10)[ - ['售卖键名称', '数量', '实收总金额']] - else: - top_10_package = pd.DataFrame(columns=['售卖键名称', '数量', '实收总金额']) - - if not df_non_package.empty: - non_package_summary = df_non_package.groupby(['售卖位置', '售卖键名称']).agg( - {'数量': 'sum', '实收总金额': 'sum'}).reset_index() - non_package_summary = non_package_summary[non_package_summary['数量'] != 0] - top_5_non_package = non_package_summary.sort_values(by='实收总金额', ascending=False).head(5)[ - ['售卖键名称', '数量', '实收总金额']] - else: - top_5_non_package = pd.DataFrame(columns=['售卖键名称', '数量', '实收总金额']) - - summary_df = pd.concat([top_10_package, top_5_non_package], ignore_index=True) - final_display_df = pd.DataFrame(index=range(15), columns=['售卖键名称', '数量', '实收总金额']) - final_display_df['售卖键名称'] = '' - final_display_df['数量'] = np.nan - final_display_df['实收总金额'] = np.nan - - if not summary_df.empty: - final_display_df.iloc[:len(summary_df)] = summary_df.to_numpy() - - copy_df = final_display_df.copy() - # **修改**: 先转换为 object 类型再填充,避免 FutureWarning - copy_df = copy_df.astype(object) - copy_df.fillna('', inplace=True) - copy_text = copy_df.to_csv(sep='\t', index=False, header=False) - return final_display_df, copy_text - - -# --- 2.2 影片映出日累计报表数据处理模块 --- -def process_and_filter_data_for_report(schedule_list, hall_seat_map, selected_date_str, token=None): - """核心数据处理函数 for 影片映出日累计报表""" - if not schedule_list: - st.warning("未获取到任何排片数据。") - return pd.DataFrame() - - df = pd.DataFrame(schedule_list) - # 过滤掉观影人数为0的场次 - df['soldTicketNum'] = pd.to_numeric(df['soldTicketNum'], errors='coerce').fillna(0) - df = df[df['soldTicketNum'] > 0].copy() - - if df.empty: - st.info("所有场次的观影人数均为 0,没有可显示的数据。") - return pd.DataFrame() - - # 获取标准电影名并清洗 - canonical_names = [] - if token and selected_date_str: - canonical_names = fetch_canonical_movie_names(token, selected_date_str) - - df['影片'] = df['movieName'].apply(lambda x: clean_movie_title(x, canonical_names)) - df['座位数'] = df['hallId'].map(hall_seat_map).fillna(0).astype(int) - - # 计算上座率 - with np.errstate(divide='ignore', invalid='ignore'): - df['上座率%'] = np.divide(df['soldTicketNum'], df['座位数']) * 100 - df['上座率%'] = df['上座率%'].fillna(0) - - df.rename(columns={'showStartTime': '放映时间', 'hallName': '影厅', 'soldTicketNum': '人数合计'}, inplace=True) - df['放映日期'] = selected_date_str - final_cols = ['影片', '放映日期', '放映时间', '影厅', '人数合计', '座位数', '上座率%'] - result_df = df[final_cols] - result_df = result_df.sort_values(by='放映时间').reset_index(drop=True) - - return result_df - - -# --- 2.3 打印功能数据处理与布局模块 --- - -def get_font_properties(font_path, size=14): - """通用字体加载函数""" - if font_path and os.path.exists(font_path): - return font_manager.FontProperties(fname=font_path, size=size) - else: - st.warning(f"警告:未找到字体文件 '{font_path}',显示可能不正确。将使用默认字体。") - return font_manager.FontProperties(family='sans-serif', size=size) - - -def get_pinyin_abbr(text): - """获取中文文本前两个字的拼音首字母""" - if not text: return "" - chars = [c for c in text if '\u4e00' <= c <= '\u9fff'][:2] - if not chars: return "" - pinyin_list = lazy_pinyin(chars, style=Style.FIRST_LETTER) - return ''.join(pinyin_list).upper() - - -def format_seq(n): - """将数字或字符转换为带圈序号 (①, ②, ③...),非数字则直接返回""" - try: - n = int(n) - except (ValueError, TypeError): - return str(n) - if n <= 0: return str(n) - circled_chars = "①②③④⑤⑥⑦⑧⑨⑩⑪⑫⑬⑭⑮⑯⑰⑱⑲⑳㉑㉒㉓㉔㉕㉖㉗㉘㉙㉚㉛㉜㉝㉞㉟㊱㊲㊳㊴㊵㊶㊷㊸㊹㊺㊻㊼㊽㊾㊿" - if 1 <= n <= 50: return circled_chars[n - 1] - return f'({n})' - - -def process_schedule_df(df, base_date, split_time_str, time_adjustment_minutes=0): - """ - 处理排片DataFrame,生成LED屏数据和散场时间数据 - """ - if df is None or df.empty: - return None, None, None - - # 'LED屏排片表' 数据处理 - led_df = df.copy() - try: - # 优先提取 'X号',失败则取第一个字符 + '号' - extracted = led_df['Hall'].astype(str).str.extract(r'(\d+号)') - fallback = led_df['Hall'].astype(str).str[0] + '号' - led_df['Hall'] = extracted[0].fillna(fallback) - - led_df['StartTime_dt'] = pd.to_datetime(led_df['StartTime'], format='%H:%M', errors='coerce').apply( - lambda t: t.replace(year=base_date.year, month=base_date.month, day=base_date.day) if pd.notnull(t) else t) - led_df['EndTime_dt'] = pd.to_datetime(led_df['EndTime'], format='%H:%M', errors='coerce').apply( - lambda t: t.replace(year=base_date.year, month=base_date.month, day=base_date.day) if pd.notnull(t) else t) - led_df.loc[led_df['EndTime_dt'] < led_df['StartTime_dt'], 'EndTime_dt'] += timedelta(days=1) - led_df = led_df.sort_values(['Hall', 'StartTime_dt']) - merged_rows = [] - for _, group in led_df.groupby('Hall'): - current = None - for _, row in group.sort_values('StartTime_dt').iterrows(): - if current is None: - current = row.copy() - elif row['Movie'] == current['Movie']: - current['EndTime_dt'] = row['EndTime_dt'] - else: - merged_rows.append(current) - current = row.copy() - if current is not None: merged_rows.append(current) - merged_df = pd.DataFrame(merged_rows) - merged_df['StartTime_dt'] -= timedelta(minutes=10) - merged_df['EndTime_dt'] -= timedelta(minutes=5) - merged_df['Seq'] = merged_df.groupby('Hall').cumcount() + 1 - merged_df['StartTime_str'] = merged_df['StartTime_dt'].dt.strftime('%H:%M') - merged_df['EndTime_str'] = merged_df['EndTime_dt'].dt.strftime('%H:%M') - led_schedule_df = merged_df[['Hall', 'Seq', 'Movie', 'StartTime_str', 'EndTime_str']] - except Exception as e: - st.error(f"处理 'LED 屏打印表' 数据时出错: {e}") - led_schedule_df = None - - # '散场时间快捷打印' 数据处理 - times_df = df.copy() - try: - # 优先提取数字,失败则取第一个字符 - num_part = times_df['Hall'].str.extract(r'(\d+)')[0] - char_part = times_df['Hall'].astype(str).str[0] - times_df['Hall'] = num_part.fillna(char_part) - times_df.dropna(subset=['Hall', 'StartTime', 'EndTime'], inplace=True) - times_df['StartTime_dt'] = pd.to_datetime(times_df['StartTime'], format='%H:%M', errors='coerce').apply( - lambda t: datetime.combine(base_date, t.time()) if pd.notnull(t) else pd.NaT) - times_df['EndTime_dt'] = pd.to_datetime(times_df['EndTime'], format='%H:%M', errors='coerce').apply( - lambda t: datetime.combine(base_date, t.time()) if pd.notnull(t) else pd.NaT) - times_df.loc[times_df['EndTime_dt'] < times_df['StartTime_dt'], 'EndTime_dt'] += timedelta(days=1) - - # 应用时间提前量 - if time_adjustment_minutes > 0: - times_df['EndTime_dt'] -= timedelta(minutes=time_adjustment_minutes) - - business_start_dt = datetime.combine(base_date, datetime.strptime(BUSINESS_START, "%H:%M").time()) - business_end_dt = datetime.combine(base_date, datetime.strptime(BUSINESS_END, "%H:%M").time()) - if business_end_dt < business_start_dt: business_end_dt += timedelta(days=1) - times_df = times_df[(times_df['EndTime_dt'] >= business_start_dt) & (times_df['EndTime_dt'] <= business_end_dt)] - times_df = times_df.sort_values('EndTime_dt') - split_dt = datetime.combine(base_date, split_time_str) - part1 = times_df[times_df['EndTime_dt'] <= split_dt].copy() - part2 = times_df[times_df['EndTime_dt'] > split_dt].copy() - - # 使用 %H:%M 保证两位小时 - part1['EndTime'] = part1['EndTime_dt'].dt.strftime('%H:%M') - part2['EndTime'] = part2['EndTime_dt'].dt.strftime('%H:%M') - - times_part1_df = part1[['Hall', 'EndTime']] - times_part2_df = part2[['Hall', 'EndTime']] - except Exception as e: - st.error(f"处理 '散场时间表' 数据时出错: {e}") - times_part1_df, times_part2_df = None, None - - return led_schedule_df, times_part1_df, times_part2_df - - -def process_file_upload(file, split_time_str, time_adjustment_minutes=0): - """ - Handles file upload, reads excel and calls the core processing function. - """ - try: - date_df = pd.read_excel(file, header=None, skiprows=7, nrows=1, usecols=[3]) - date_str = pd.to_datetime(date_df.iloc[0, 0]).strftime('%Y-%m-%d') - base_date = pd.to_datetime(date_str).date() - except Exception: - date_str = datetime.today().strftime('%Y-%m-%d') - base_date = datetime.today().date() - - try: - df = pd.read_excel(file, header=9, usecols=[1, 2, 4, 5]) - df.columns = ['Hall', 'StartTime', 'EndTime', 'Movie'] - df['Hall'] = df['Hall'].ffill() - df.dropna(subset=['StartTime', 'EndTime', 'Movie'], inplace=True) - except Exception as e: - st.error(f"读取数据时出错: {e}。请检查文件格式是否为'放映时间核对表'。") - return None, None, None, date_str - - led_data, times_p1, times_p2 = process_schedule_df(df, base_date, split_time_str, time_adjustment_minutes) - return led_data, times_p1, times_p2, date_str - - -def create_print_layout_led(data, date_str, font_path, generate_png=False): - """生成LED屏排片表的PDF/PNG""" - if data is None or data.empty: return None - A4_width_in, A4_height_in = 8.27, 11.69 - dpi = 300 - total_content_rows = len(data) - layout_rows = max(total_content_rows, 25) - totalA = layout_rows + 2 - row_height = A4_height_in / totalA - data = data.reset_index(drop=True) - data['hall_str'] = '$' + data['Hall'].str.replace('号', '') + '^{\\#}$' - data['seq_str'] = data['Seq'].apply(format_seq) - data['pinyin_abbr'] = data['Movie'].apply(get_pinyin_abbr) - data['time_str'] = data['StartTime_str'] + ' - ' + data['EndTime_str'] - temp_fig = plt.figure(figsize=(A4_width_in, A4_height_in), dpi=dpi) - renderer = temp_fig.canvas.get_renderer() - base_font_size_pt = (row_height * 0.9) * 72 - seq_font_size_pt = (row_height * 0.5) * 72 - - def get_col_width_in(series, font_size_pt, is_math=False): - if series.empty: return 0 - font_prop = get_font_properties(font_path, font_size_pt) - longest_str_idx = series.astype(str).str.len().idxmax() - max_content = str(series.loc[longest_str_idx]) - text_width_px, _, _ = renderer.get_text_width_height_descent(max_content, font_prop, ismath=is_math) - return (text_width_px / dpi) * 1.1 - - margin_col_width = row_height - hall_col_width = get_col_width_in(data['hall_str'], base_font_size_pt, is_math=True) - seq_col_width = get_col_width_in(data['seq_str'], seq_font_size_pt) - pinyin_col_width = get_col_width_in(data['pinyin_abbr'], base_font_size_pt) - time_col_width = get_col_width_in(data['time_str'], base_font_size_pt) - movie_col_width = A4_width_in - ( - margin_col_width * 2 + hall_col_width + seq_col_width + pinyin_col_width + time_col_width) - plt.close(temp_fig) - col_widths = {'hall': hall_col_width, 'seq': seq_col_width, 'movie': movie_col_width, 'pinyin': pinyin_col_width, - 'time': time_col_width} - col_x_starts = {} - current_x = margin_col_width - for col_name in ['hall', 'seq', 'movie', 'pinyin', 'time']: - col_x_starts[col_name] = current_x - current_x += col_widths[col_name] - - def draw_figure(fig, ax): - renderer = fig.canvas.get_renderer() - for col_name in ['hall', 'seq', 'movie', 'pinyin']: - x_line = col_x_starts[col_name] + col_widths[col_name] - line_top_y, line_bottom_y = A4_height_in - row_height, row_height - ax.add_line( - Line2D([x_line, x_line], [line_bottom_y, line_top_y], color='gray', linestyle=':', linewidth=0.5)) - last_hall_drawn = None - for i, row in data.iterrows(): - y_bottom = A4_height_in - (i + 2) * row_height - y_center = y_bottom + row_height / 2 - if row['Hall'] != last_hall_drawn: - ax.text(col_x_starts['hall'] + col_widths['hall'] / 2, y_center, row['hall_str'], - fontproperties=get_font_properties(font_path, base_font_size_pt), ha='center', va='center') - last_hall_drawn = row['Hall'] - ax.text(col_x_starts['seq'] + col_widths['seq'] / 2, y_center, row['seq_str'], - fontproperties=get_font_properties(font_path, seq_font_size_pt), ha='center', va='center') - ax.text(col_x_starts['pinyin'] + col_widths['pinyin'] / 2, y_center, row['pinyin_abbr'], - fontproperties=get_font_properties(font_path, base_font_size_pt), ha='center', va='center') - ax.text(col_x_starts['time'] + col_widths['time'] / 2, y_center, row['time_str'], - fontproperties=get_font_properties(font_path, base_font_size_pt), ha='center', va='center') - movie_font_size = base_font_size_pt - movie_font_prop = get_font_properties(font_path, movie_font_size) - text_w_px, _, _ = renderer.get_text_width_height_descent(row['Movie'], movie_font_prop, ismath=False) - text_w_in = text_w_px / dpi - max_width_in = col_widths['movie'] * 0.9 - if text_w_in > max_width_in: - movie_font_size *= (max_width_in / text_w_in) - movie_font_prop = get_font_properties(font_path, movie_font_size) - ax.text(col_x_starts['movie'] + 0.05, y_center, row['Movie'], fontproperties=movie_font_prop, ha='left', - va='center') - is_last_in_hall = (i == len(data) - 1) or (row['Hall'] != data.loc[i + 1, 'Hall']) - line_start_x, line_end_x = margin_col_width, A4_width_in - margin_col_width - if is_last_in_hall: - ax.add_line(Line2D([line_start_x, line_end_x], [y_bottom, y_bottom], color='black', linestyle='-', - linewidth=0.8)) - else: - ax.add_line(Line2D([line_start_x, line_end_x], [y_bottom, y_bottom], color='gray', linestyle=':', - linewidth=0.5)) - - outputs = {} - fig = plt.figure(figsize=(A4_width_in, A4_height_in), dpi=300) - ax = fig.add_axes([0, 0, 1, 1]) - ax.set_axis_off(); - ax.set_xlim(0, A4_width_in); - ax.set_ylim(0, A4_height_in) - ax.text(margin_col_width, A4_height_in - row_height, date_str, fontproperties=get_font_properties(font_path, 10), - color=DATE_COLOR, ha='left', va='bottom', transform=ax.transData) - draw_figure(fig, ax) - pdf_buf = io.BytesIO() - fig.savefig(pdf_buf, format='pdf', dpi=dpi, bbox_inches='tight', pad_inches=0) - pdf_buf.seek(0) - outputs['pdf'] = f"data:application/pdf;base64,{base64.b64encode(pdf_buf.getvalue()).decode()}" - if generate_png: - png_buf = io.BytesIO() - fig.savefig(png_buf, format='png', dpi=dpi, bbox_inches='tight', pad_inches=0) - png_buf.seek(0) - outputs['png'] = f"data:image/png;base64,{base64.b64encode(png_buf.getvalue()).decode()}" - plt.close(fig) - return outputs - - -def create_print_layout_times(data, title, date_str, font_path, size_multiplier=1.1, hall_format='Default', - generate_png=False): - """生成散场时间���的PDF/PNG""" - if data is None or data.empty: return None - - def generate_figure(): - total_items = len(data) - num_rows = math.ceil(total_items / NUM_COLS) if total_items > 0 else 1 - data_area_height_in, cell_width_in = A5_HEIGHT_IN, A5_WIDTH_IN / NUM_COLS - cell_height_in = data_area_height_in / num_rows - target_width_pt, target_height_pt = (cell_width_in * 0.9) * 72, (cell_height_in * 0.9) * 72 - font_size_based_on_width = target_width_pt / (8 * 0.6) - base_fontsize = min(font_size_based_on_width, target_height_pt) * size_multiplier - fig = plt.figure(figsize=(A5_WIDTH_IN, A5_HEIGHT_IN), dpi=300) - fig.subplots_adjust(left=0, right=1, top=1, bottom=0) - gs = gridspec.GridSpec(num_rows, NUM_COLS, hspace=0, wspace=0, figure=fig) - data_values = data.values.tolist() - while len(data_values) % NUM_COLS != 0: data_values.append(['', '']) - rows_per_col_layout = math.ceil(len(data_values) / NUM_COLS) - sorted_data = [['', '']] * len(data_values) - for i, item in enumerate(data_values): - if item[0] and item[1]: - row_in_col, col_idx = i % rows_per_col_layout, i // rows_per_col_layout - new_index = row_in_col * NUM_COLS + col_idx - if new_index < len(sorted_data): sorted_data[new_index] = item - is_first_cell_with_data = True - for idx, (hall, end_time) in enumerate(sorted_data): - if hall and end_time: - row_grid, col_grid = idx // NUM_COLS, idx % NUM_COLS - ax = fig.add_subplot(gs[row_grid, col_grid]) - for spine in ax.spines.values(): - spine.set_visible(True); - spine.set_linestyle((0, (1, 2))) - spine.set_color(BORDER_COLOR); - spine.set_linewidth(0.75) - if is_first_cell_with_data: - ax.text(0.05, 0.95, f"{date_str} {title}", - fontproperties=get_font_properties(font_path, base_fontsize * 0.5), color=DATE_COLOR, - ha='left', va='top', transform=ax.transAxes) - is_first_cell_with_data = False - if hall_format == 'Superscript': - display_text = f'${str(hall)}^{{\\#}}$ {end_time}' - elif hall_format == 'Circled': - display_text = f'{format_seq(hall)} {end_time}' - else: # Default - display_text = f"{str(hall)} {end_time}" - ax.text(0.5, 0.5, display_text, fontproperties=get_font_properties(font_path, base_fontsize), - ha='center', va='center', transform=ax.transAxes) - ax.set_xticks([]); - ax.set_yticks([]); - ax.set_facecolor('none') - return fig - - fig_for_output = generate_figure() - outputs = {} - pdf_buffer = io.BytesIO() - with PdfPages(pdf_buffer) as pdf: - pdf.savefig(fig_for_output) - pdf_buffer.seek(0) - outputs['pdf'] = f"data:application/pdf;base64,{base64.b64encode(pdf_buffer.getvalue()).decode()}" - if generate_png: - png_buffer = io.BytesIO() - fig_for_output.savefig(png_buffer, format='png') - png_buffer.seek(0) - outputs['png'] = f'data:image/png;base64,{base64.b64encode(png_buffer.getvalue()).decode()}' - plt.close(fig_for_output) - return outputs - - -def display_pdf(base64_pdf): - """在Streamlit中嵌入显示PDF""" - return f'' - - -# --- 3. TMS 及天气查询模块 --- -@st.cache_data(show_spinner=False, ttl=600) -def fetch_and_process_server_movies(priority_movie_titles=None): - if priority_movie_titles is None: priority_movie_titles = [] - - # 获取环境变量 - app_secret = os.getenv("TMS_APP_SECRET") - ticket = os.getenv("TMS_TICKET") - theater_id_str = os.getenv("TMS_THEATER_ID") - x_session_id = os.getenv("TMS_X_SESSION_ID") - - # 转换 ID 为整数 - try: - theater_id = int(theater_id_str) if theater_id_str else 0 - except ValueError: - st.error("环境变量 TMS_THEATER_ID 格式错误,应为数字。") - return {}, [] - - token_headers = { - 'Host': 'oa.hengdianfilm.com:7080', 'Content-Type': 'application/json', - 'Origin': 'http://115.239.253.233:7080', 'Connection': 'keep-alive', - 'Accept': 'application/json, text/javascript, */*; q=0.01', - 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_5_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/138.0.7204.156 Mobile/15E148 Safari/604.1', - 'Accept-Language': 'zh-CN,zh-Hans;q=0.9', - } - - # 使用变量 - token_json_data = {'appId': 'hd', 'appSecret': app_secret, 'timeStamp': int(time.time() * 1000)} - # 动态构建 URL - token_url = f'http://oa.hengdianfilm.com:7080/cinema-api/admin/generateToken?token=hd&murl=?token=hd&murl=ticket={ticket}' - - response = requests.post(token_url, headers=token_headers, json=token_json_data, timeout=10) - - response.raise_for_status() - token_data = response.json() - if token_data.get('error_code') != '0000': raise Exception(f"获取Token失败: {token_data.get('error_desc')}") - auth_token = token_data['param'] - all_movies, page_index = [], 1 - while True: - list_headers = { - 'Accept': 'application/json, text/javascript, */*; q=0.01', - 'Content-Type': 'application/json; charset=UTF-8', - 'Origin': 'http://115.239.253.233:7080', 'Proxy-Connection': 'keep-alive', - 'Token': auth_token, - 'User-Agent': 'Mozilla/5.0 ...', - 'X-SESSIONID': x_session_id, # 使用变量 - } - list_params = {'token': 'hd', 'murl': 'ContentMovie'} - # 使用变量 - list_json_data = {'THEATER_ID': theater_id, 'SOURCE': 'SERVER', 'ASSERT_TYPE': 2, 'PAGE_CAPACITY': 20, - 'PAGE_INDEX': page_index} - - list_url = 'http://oa.hengdianfilm.com:7080/cinema-api/cinema/server/dcp/list' - response = requests.post(list_url, params=list_params, headers=list_headers, json=list_json_data, verify=False) - response.raise_for_status() - movie_data = response.json() - if movie_data.get("RSPCD") != "000000": raise Exception(f"获取影片列表失败: {movie_data.get('RSPMSG')}") - body = movie_data.get("BODY", {}); - movies_on_page = body.get("LIST", []) - if not movies_on_page: break - all_movies.extend(movies_on_page) - if len(all_movies) >= body.get("COUNT", 0): break - page_index += 1; - time.sleep(1) - movie_details = {m.get('CONTENT_NAME'): {'assert_name': m.get('ASSERT_NAME'), - 'halls': sorted([h.get('HALL_NAME') for h in m.get('HALL_INFO', [])]), - 'play_time': m.get('PLAY_TIME')} for m in all_movies if - m.get('CONTENT_NAME')} - by_hall = defaultdict(list) - for content_name, details in movie_details.items(): - for hall_name in details['halls']: - by_hall[hall_name].append({'content_name': content_name, 'details': details}) - for hall_name in by_hall: - by_hall[hall_name].sort( - key=lambda item: (item['details']['assert_name'] is None or item['details']['assert_name'] == '', - item['details']['assert_name'] or item['content_name'])) - view2_list = [{'assert_name': d['assert_name'], 'content_name': c, 'halls': d['halls'], 'play_time': d['play_time']} - for c, d in movie_details.items() if d.get('assert_name')] - priority_list = [item for item in view2_list if - any(p_title in item['assert_name'] for p_title in priority_movie_titles)] - other_list_items = [item for item in view2_list if item not in priority_list] - priority_list.sort(key=lambda x: x['assert_name']); - other_list_items.sort(key=lambda x: x['assert_name']) - final_sorted_list = priority_list + other_list_items - return dict(sorted(by_hall.items())), final_sorted_list - - -@st.cache_data(show_spinner=False, ttl=600) -def get_weather_forecast(target_date): - """获取指定日期的天气信息并格式化为标题字符串""" - if not target_date: - return "当日放映影片" - url = "https://restapi.amap.com/v3/weather/weatherInfo" - params = {'key': GAODE_API_KEY, 'city': ADCODE, 'extensions': 'all', 'output': 'JSON'} - try: - response = requests.get(url, params=params, timeout=5) - response.raise_for_status() - data = response.json() - if data.get('status') == '1' and data.get('forecasts'): - target_date_str = target_date.strftime('%Y-%m-%d') - for day_cast in data['forecasts'][0].get('casts', []): - if day_cast.get('date') == target_date_str: - weekday_map = {'1': '一', '2': '二', '3': '三', '4': '四', '5': '五', '6': '六', '7': '日'} - week = weekday_map.get(day_cast.get('week'), '') - weather = day_cast.get('dayweather', '未知') - max_temp = day_cast.get('daytemp', 'N/A') - min_temp = day_cast.get('nighttemp', 'N/A') - return f"今日放映影片({target_date_str},星期{week},{weather},{max_temp}℃ / {min_temp}℃)" - except Exception as e: - print(f"天气 API 请求失败: {e}") - return f"今日放映影片({target_date.strftime('%Y-%m-%d')},天气未知)" - - -def get_circled_number(hall_name): - mapping = {'1': '①', '2': '②', '3': '③', '4': '④', '5': '⑤', '6': '⑥', '7': '⑦', '8': '⑧', '9': '⑨'} - num_str = ''.join(filter(str.isdigit, hall_name)); - return mapping.get(num_str, '') - - -def format_play_time(time_str): - if not time_str or not isinstance(time_str, str): return None - try: - parts = time_str.split(':'); - hours = int(parts[0]); - minutes = int(parts[1]) - return hours * 60 + minutes - except (ValueError, IndexError): - return None - - -def add_tms_locations_to_analysis(analysis_df, tms_movie_list): - locations = [] - for _, row in analysis_df.iterrows(): - movie_title = row['影片'] - found_versions = [] - for tms_movie in tms_movie_list: - if tms_movie['assert_name'].startswith(movie_title): - version_name = tms_movie['assert_name'].replace(movie_title, '').strip() - circled_halls = " ".join(sorted([get_circled_number(h) for h in tms_movie['halls']])) - found_versions.append(f"{version_name}:{circled_halls}" if version_name else circled_halls) - locations.append('|'.join(found_versions)) - analysis_df['影片所在影厅位置'] = locations - return analysis_df - - -# --- 4. 新增的经营数据API函数 --- -def fetch_income_data(token, date_str): - """获取指定日期的票房、卖品收入和卖品占比""" - url = 'https://app.bi.piao51.cn/cinema-app/mycinema/incomeProportion.action' - params = {'cinemaId': CINEMA_ID, 'token': token, 'qTimeStart': date_str, 'qTimeEnd': date_str} - headers = {'Host': 'app.bi.piao51.cn', 'X-Requested-With': 'XMLHttpRequest', 'User-Agent': 'Mozilla/5.0'} - try: - response = requests.get(url, params=params, headers=headers, timeout=10) - response.raise_for_status() - data = response.json() - if data.get('code') == 'A00000' and data.get('results'): - day_data = data['results'][0] - return { - "ticket_income": float(day_data.get('ticketIncome') or 0), - "goods_income": float(day_data.get('goodsIncome') or 0), - "sold_incomes_zb": float(day_data.get('soldIncomesZb') or 0) - } - except Exception as e: - st.warning(f"获取经营收入数据失败: {e}") - return None - -def fetch_membership_data(token, date_str): - """获取指定日期的文旅卡开卡数""" - url = 'https://app.bi.piao51.cn/cinema-app/mycinema/membership.action' - params = {'token': token, 'cinemaId': CINEMA_ID, 'startDate': date_str, 'endDate': date_str} - headers = {'Host': 'app.bi.piao51.cn', 'X-Requested-With': 'XMLHttpRequest', 'User-Agent': 'Mozilla/5.0'} - try: - response = requests.get(url, params=params, headers=headers, timeout=10) - response.raise_for_status() - data = response.json() - if data.get('code') == 'A00000' and data.get('results'): - for card_type in data['results']: - if card_type.get('cardLevelName') == '文旅消费卡': - return int(card_type.get('sendCardNums', 0)) - return 0 # 没找到文旅卡 - except Exception as e: - st.warning(f"获取会员开卡数据失败: {e}") - return 0 - - -# --- 6. 新增:场次合理性检查日志函数 --- -def generate_schedule_check_logs(schedule_list, date_str): - """ - 生成场次合理性检查日志 - :param schedule_list: 排片数据列表 (list of dicts) - :param date_str: 排片日期字符串 (YYYY-MM-DD) - :return: 格式化后的日志文本 (str) - """ - if not schedule_list: - return "无排片数据,无法进行合理性检查。" - - # 转换为 DataFrame 方便处理 - df_original = pd.DataFrame(schedule_list) - # 重命名列以符合逻辑处理习惯 - df_original.rename( - columns={'hallName': 'Hall', 'movieName': 'filmName', 'showStartTime': 'StartTime', 'showEndTime': 'EndTime'}, - inplace=True) - - # 预处理:转换时间列,简化影厅名 - df_original['startTime'] = pd.to_datetime(df_original['StartTime'], format='%H:%M', errors='coerce').apply( - lambda t: datetime.combine(datetime.strptime(date_str, '%Y-%m-%d').date(), t.time()) if pd.notnull( - t) else pd.NaT) - df_original['endTime'] = pd.to_datetime(df_original['EndTime'], format='%H:%M', errors='coerce').apply( - lambda t: datetime.combine(datetime.strptime(date_str, '%Y-%m-%d').date(), t.time()) if pd.notnull( - t) else pd.NaT) - # 处理跨天时间 - df_original.loc[df_original['endTime'] < df_original['startTime'], 'endTime'] += timedelta(days=1) - - # 简单影厅名处理 (仅提取数字或主要标识) - def simplify_hall(name): - import re - match = re.search(r'(\d+号?)', str(name)) - return match.group(1) if match else str(name)[:2] - - df_original['simpleHallName'] = df_original['Hall'].apply(simplify_hall) - - df_check = df_original.sort_values(by='startTime').reset_index(drop=True) - final_log_parts = [] - - # --- Rule 1: 同影片场次间隔过近 --- - logs_r1 = [] - for film_name in df_check['filmName'].unique(): - film_schedules = df_check[df_check['filmName'] == film_name].sort_values(by='startTime').reset_index() - if len(film_schedules) > 1: - for i in range(len(film_schedules) - 1): - s1, s2 = film_schedules.iloc[i], film_schedules.iloc[i + 1] - interval = (s2['startTime'] - s1['startTime']).total_seconds() / 60 - if interval < 30: - log_entry = f"《{s1['filmName']}》{s1['simpleHallName']}【{s1['startTime'].strftime('%H:%M')}】和 {s2['simpleHallName']}【{s2['startTime'].strftime('%H:%M')}】开场时间距离 {int(interval)} 分钟" - logs_r1.append(log_entry) - - final_log_parts.append("规则一:同影片场次间隔过近(少于 30 分钟)") - if logs_r1: - for i, log in enumerate(logs_r1, 1): - final_log_parts.append(f"{i}. {log}") - else: - final_log_parts.append("(无)") - - # --- Rule 2: 30 分钟内影片开场超过 4 场 --- - logs_r2 = [] - i = 0 - processed_indices_r2 = set() - while i < len(df_check): - if i in processed_indices_r2: - i += 1 - continue - window_start_time = df_check.iloc[i]['startTime'] - window_end_time_30min = window_start_time + timedelta(minutes=30) - window_df = df_check[ - (df_check['startTime'] >= window_start_time) & (df_check['startTime'] < window_end_time_30min)] - - if len(window_df) > 4: - start_t_str = window_df.iloc[0]['startTime'].strftime('%H:%M') - end_t_str = window_df.iloc[-1]['startTime'].strftime('%H:%M') - log_message_lines = [f"【{start_t_str} - {end_t_str}】开场时间比较集中:"] - for _, row in window_df.iterrows(): - log_message_lines.append( - f" {row['simpleHallName']}《{row['filmName']}》> {row['startTime'].strftime('%H:%M')}") - processed_indices_r2.add(row.name) - logs_r2.append("\n".join(log_message_lines)) - i += 1 - - final_log_parts.append("\n规则二:30 分钟内影片开场超过 4 场") - if logs_r2: - for i, log in enumerate(logs_r2, 1): - final_log_parts.append(f"{i}. {log}") - else: - final_log_parts.append("(无)") - - # --- Rule 3: 场次开场间隔超过 30 分钟 --- - logs_r3 = [] - if len(df_check) > 1: - for i in range(len(df_check) - 1): - s1_start, s2_start = df_check.iloc[i]['startTime'], df_check.iloc[i + 1]['startTime'] - gap = (s2_start - s1_start).total_seconds() / 60 - if gap > 30: - log_entry = f"【{s1_start.strftime('%H:%M')} ~ {s2_start.strftime('%H:%M')}】缺少影片开场,间隔 {int(gap)} 分钟" - logs_r3.append(log_entry) - - final_log_parts.append("\n规则三:场次开场间隔超过 30 分钟") - if logs_r3: - for i, log in enumerate(logs_r3, 1): - final_log_parts.append(f"{i}. {log}") - else: - final_log_parts.append("(无)") - - # --- Rule 4: 最早/最晚场次时间检查 --- - logs_r4 = [] - if not df_check.empty: - first_sched = df_check.iloc[0] - last_sched = df_check.iloc[-1] - if first_sched['startTime'].time() > dt_time(10, 0): - logs_r4.append( - f"最早一场 {first_sched['simpleHallName']}《{first_sched['filmName']}》{first_sched['startTime'].strftime('%H:%M')} 晚于 10:00") - if last_sched['startTime'].time() < dt_time(22, 30): - logs_r4.append( - f"最晚一场 {last_sched['simpleHallName']}《{last_sched['filmName']}》{last_sched['startTime'].strftime('%H:%M')} 早于 22:30") - - final_log_parts.append("\n规则四:最早一场晚于 10:00,最晚一场早于 22:30") - if logs_r4: - for i, log in enumerate(logs_r4, 1): - final_log_parts.append(f"{i}. {log}") - else: - final_log_parts.append("(无)") - - # --- Rule 5: 影厅空闲时间超过 1 小时 (10:00-23:00) --- - logs_r5 = [] - today_date = datetime.strptime(date_str, '%Y-%m-%d').date() - window_start_time_limit_r5 = datetime.combine(today_date, dt_time(10, 0)) - window_end_time_limit_r5 = datetime.combine(today_date, dt_time(23, 0)) - - unique_halls_r5 = df_original['simpleHallName'].unique() - for hall_name in unique_halls_r5: - hall_df = df_original[df_original['simpleHallName'] == hall_name].sort_values(by='startTime') - if len(hall_df) > 1: - for i in range(len(hall_df) - 1): - prev_sched_end = hall_df.iloc[i]['endTime'] - curr_sched_start = hall_df.iloc[i + 1]['startTime'] - if prev_sched_end < window_end_time_limit_r5 and curr_sched_start > window_start_time_limit_r5: - idle_duration_minutes = (curr_sched_start - prev_sched_end).total_seconds() / 60 - if idle_duration_minutes > 60: - log_entry = f"{hall_name} 【{prev_sched_end.strftime('%H:%M')} ~ {curr_sched_start.strftime('%H:%M')}】无影片在播,时长 {int(idle_duration_minutes)} 分钟" - logs_r5.append(log_entry) - - final_log_parts.append("\n规则五:影厅空闲时间超过 1 小时(10:00-23:00)") - if logs_r5: - for i, log in enumerate(logs_r5, 1): - final_log_parts.append(f"{i}. {log}") - else: - final_log_parts.append("(无)") - - # --- Rule 6: 影厅场次转换时间检查 --- - logs_r6 = [] - for hall_name in df_original['simpleHallName'].unique(): - hall_df = df_original[df_original['simpleHallName'] == hall_name].sort_values(by='startTime') - if len(hall_df) > 1: - for i in range(len(hall_df) - 1): - prev_sched = hall_df.iloc[i] - next_sched = hall_df.iloc[i + 1] - conversion_time = (next_sched['startTime'] - prev_sched['endTime']).total_seconds() / 60 - if conversion_time < 10: - logs_r6.append( - f"{hall_name} {prev_sched['endTime'].strftime('%H:%M')} 《{prev_sched['filmName']}》结束后影厅空闲时间仅为 {int(conversion_time)} 分钟") - - final_log_parts.append("\n规则六:影厅场次转换时间检查") - if logs_r6: - for i, log in enumerate(logs_r6, 1): - final_log_parts.append(f"{i}. {log}") - else: - final_log_parts.append("(无)") - - # --- Rule 7: 动态散场和入场高峰预警 --- - logs_r7 = [] - final_log_parts.append("\n规则七:动态散场和入场高峰预警") - - if not df_check.empty: - start_time = df_check.iloc[0]['startTime'].replace(second=0, microsecond=0) - end_time = df_check.iloc[-1]['endTime'] - current_time = start_time - reported_windows = set() - - while current_time < end_time: - window_end = current_time + timedelta(minutes=10) - starts_in_window = df_check[(df_check['startTime'] >= current_time) & (df_check['startTime'] < window_end)] - ends_in_window = df_check[(df_check['endTime'] > current_time) & (df_check['endTime'] <= window_end)] - - if len(starts_in_window) + len(ends_in_window) > 5: - window_tuple = (current_time.strftime('%H:%M'), window_end.strftime('%H:%M')) - if window_tuple not in reported_windows: - exit_halls = "、".join(ends_in_window['simpleHallName']) - entry_halls = "、".join(starts_in_window['simpleHallName']) - log_msg = f"【{current_time.strftime('%H:%M')} ~ {window_end.strftime('%H:%M')}】" - if not ends_in_window.empty: - log_msg += f",{exit_halls}集中散场" - if not starts_in_window.empty: - if not ends_in_window.empty: - log_msg += ",同时" - else: - log_msg += "," - log_msg += f"{entry_halls}即将入场" - log_msg += ",预计人流瞬时压力过大。" - logs_r7.append(log_msg) - reported_windows.add(window_tuple) - current_time += timedelta(minutes=5) - - # Part 2: Simultaneous start - start_groups = df_check.groupby('startTime').filter(lambda x: len(x) > 3) - for time_val, group in start_groups.groupby('startTime'): - halls = "、".join(group['simpleHallName']) - logs_r7.append(f"{time_val.strftime('%H:%M')},{halls}电影同时开场,注意预计人流瞬时压力过大。") - - # Part 3: Simultaneous end - end_groups = df_check.groupby('endTime').filter(lambda x: len(x) > 3) - for time_val, group in end_groups.groupby('endTime'): - halls = "、".join(group['simpleHallName']) - logs_r7.append(f"{time_val.strftime('%H:%M')},{halls}电影同时散场,注意预计人流瞬时压力过大。") - - if logs_r7: - for i, log in enumerate(logs_r7, 1): - final_log_parts.append(f"{i}. {log}") - else: - final_log_parts.append("(无)") - - # --- Rule 8: “幽灵厅”预警 --- - logs_r8 = [] - final_log_parts.append("\n规则八:影厅结束运营过早预警") - for hall_name in df_original['simpleHallName'].unique(): - last_sched = df_original[df_original['simpleHallName'] == hall_name].nlargest(1, 'endTime').iloc[0] - # 简单判断:如果最后一场结束时间早于 22:30 且就是当天(不是跨天到凌晨) - if last_sched['endTime'].date() == today_date and last_sched['endTime'].time() < dt_time(22, 30): - logs_r8.append(f"{hall_name} 最后一场于【{last_sched['endTime'].strftime('%H:%M')}】结束,过早停运。") - if logs_r8: - for i, log in enumerate(logs_r8, 1): - final_log_parts.append(f"{i}. {log}") - else: - final_log_parts.append("(无)") - - # --- Rule 9: 真正意义的黄金时段热门影片排片密度检查 --- - logs_r9 = [] - final_log_parts.append("\n规则九:黄金时段热门影片排片密度检查") - if not df_check.empty: - weekday = today_date.weekday() - golden_hours_r9 = [ - [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))], - [(dt_time(14, 0), dt_time(15, 30)), (dt_time(19, 0), dt_time(22, 20))], - [(dt_time(14, 30), dt_time(16, 0)), (dt_time(19, 0), dt_time(21, 40))], - [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))], - [(dt_time(14, 0), dt_time(15, 0)), (dt_time(19, 0), dt_time(22, 0))], - [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))], - [(dt_time(14, 0), dt_time(17, 0)), (dt_time(19, 0), dt_time(21, 30))] - ][weekday] - - film_counts = df_check['filmName'].value_counts() - if not film_counts.empty: - max_count = film_counts.iloc[0] - # 定义热门影片:排片量接近第一名(95%以上)的影片 - hot_films = film_counts[film_counts >= max_count * 0.95].index.tolist() - - golden_hour_schedules = df_check[ - df_check['startTime'].apply(lambda dt: any(start <= dt.time() < end for start, end in golden_hours_r9))] - - for film in hot_films: - hot_film_total_in_golden = len(golden_hour_schedules[golden_hour_schedules['filmName'] == film]) - golden_total = len(golden_hour_schedules) - if golden_total > 0: - ratio = hot_film_total_in_golden / golden_total - if ratio < 0.3: # 热门影片黄金时段占比低于30%预警 - period_str = " 和 ".join( - [f"{s.strftime('%H:%M')}-{e.strftime('%H:%M')}" for s, e in golden_hours_r9]) - logs_r9.append(f"《{film}》在核心黄金时段 {period_str} 排片占比仅为{ratio:.1%},低于 30%。") - - if logs_r9: - for i, log in enumerate(logs_r9, 1): - final_log_parts.append(f"{i}. {log}") - else: - final_log_parts.append("(无)") - - return "\n".join(final_log_parts) - - -def check_tms_file_availability(schedule_list, tms_data, date_str): - """ - 对比排片表和TMS数据,检查影厅是否缺失对应的影片文件 - 优化:仅匹配核心片名(去除版本后缀),优化影厅名显示,合并日志输出 - """ - if not schedule_list: - return "未获取到排片数据,无法检查。" - - if not tms_data: - return "未获取到 TMS 数据,无法检查。" - - # --- 内部辅助函数 --- - def get_core_movie_name(raw_name): - """ - 获取核心片名用于匹配: - 1. 先执行标准的 clean_movie_title (统一命名) - 2. 再去除所有括号及括号内的内容 (去除版本/制式信息) - 例如:'疯狂动物城2(数字3D)' -> '疯狂动物城2' - """ - # 1. 基础清洗 (利用现有的逻辑处理中英文/特殊后缀) - # 注意:这里我们不传入 canonical_names,只做规则清洗 - name = clean_movie_title(raw_name) - # 2. 正则去除中文全角括号及内容 (...) - name = re.sub(r'(.*?)', '', name) - # 3. 正则去除英文半角括号及内容 (...) - name = re.sub(r'\(.*?\)', '', name) - return name.strip() - - def clean_hall_display_name(raw_name): - """去除影厅名两端多余的 【】 [] 符号""" - return str(raw_name).strip('【】[] ') - - def get_hall_key_num(name): - """提取影厅数字ID用于数据匹配 (如 '1号厅' -> '1')""" - nums = re.findall(r'\d+', str(name)) - return nums[0] if nums else str(name) - - # ------------------ - - # 1. 预处理 TMS 数据 - # 结构: {'1': ['Zootopia2', '疯狂动物城2', ...], ...} - tms_map = defaultdict(set) # 使用 set 提高查找效率 - - for hall_name, movies in tms_data.items(): - hall_key = get_hall_key_num(hall_name) - for movie in movies: - # 收集 Assert Name (显示名) - if movie.get('details', {}).get('assert_name'): - # 同样对 TMS 里的名字取核心名,提高匹配率 - core_tms_name = get_core_movie_name(str(movie['details']['assert_name'])) - tms_map[hall_key].add(core_tms_name.upper()) - # 保留原始 Assert Name 用于兜底匹配 - tms_map[hall_key].add(str(movie['details']['assert_name']).upper()) - - # 收集 Content Name (文件名/UUID) - if movie.get('content_name'): - tms_map[hall_key].add(str(movie['content_name']).upper()) - - # 2. 遍历排片数据进行检查 - missing_logs = [] - checked_combinations = set() - - for item in schedule_list: - hall_raw = item.get('hallName') or item.get('Hall') - movie_raw = item.get('movieName') or item.get('Movie') - - if not hall_raw or not movie_raw: - continue - - # 准备数据 - hall_num = get_hall_key_num(hall_raw) - hall_display = clean_hall_display_name(hall_raw) # 清洗后的影厅名 - - # 获取排片的核心片名 (去掉版本后缀) - target_movie_core = get_core_movie_name(movie_raw).upper() - - # 组合键去重 (同一厅同一部片只报一次) - combo_key = (hall_num, target_movie_core) - if combo_key in checked_combinations: - continue - checked_combinations.add(combo_key) - - # 检查逻辑 - if hall_num not in tms_map: - # 找不到影厅数据暂不报错,可能是未映射或设备离线,避免刷屏 - continue - - # 核心匹配:检查 TMS 集合中是否包含核心片名 - # 方式A:精确匹配核心名 (推荐,最准) - # 方式B:模糊包含 (target in tms_file) - - has_file = False - tms_files = tms_map[hall_num] - - # 策略:只要 TMS 中有一个文件名 包含 我们的核心排片名,就视为有片 - # 例如:排片 core='疯狂动物城2',TMS='疯狂动物城2_IMAX' -> 匹配成功 - for tms_file in tms_files: - if target_movie_core in tms_file: - has_file = True - break - - if not has_file: - # 记录日志,使用清洗后的影厅名和排片原名 - missing_logs.append(f"【{hall_display}】排映《{movie_raw}》,但服务器未检测到包含“{target_movie_core}”的文件。") - - # 3. 格式化输出 - if not missing_logs: - return None # 返回 None 表示一切正常 - - # 生成带编号的字符串 - formatted_output = [] - for idx, log in enumerate(missing_logs, 1): - formatted_output.append(f"{idx}. ❌ 缺片警告:{log}") - - return "\n".join(formatted_output) - -# --- 5. UI 渲染与交互逻辑 --- - -def display_analysis_results(df_raw, data_source_name, date_for_display, query_tms_enabled): - if df_raw.empty: - st.info(f"请先从 {data_source_name} 加载数据。"); - return - - if data_source_name == "文件": - token_data = load_token() - if not token_data: - token_data = login_and_get_token() - - token = token_data.get('token') if token_data else None - date_str = date_for_display.strftime('%Y-%m-%d') if date_for_display else None - - canonical_names = [] - if token and date_str: - canonical_names = fetch_canonical_movie_names(token, date_str) - - df_raw['影片名称_清理后'] = df_raw['影片名称'].apply(lambda x: clean_movie_title(x, canonical_names)) - else: - df_raw['影片名称_清理后'] = df_raw['影片名称'] - - date_str = f"{date_for_display} " if date_for_display else "" - total_revenue, total_attendance, total_sessions = df_raw['总收入'].sum(), df_raw['总人次'].sum(), len(df_raw) - st.markdown( - f"> {date_str}数据总览:总票房 **¥{total_revenue:,.2f}** | 总人次 **{total_attendance:,.0f}** | 总场次 **{total_sessions:,.0f}**") - - format_config = {'座位数': '{:,.0f}', '场次': '{:,.0f}', '人次': '{:,.0f}', '票房': '{:,.2f}', '均价': '{:.2f}', - '座次比': '{:.2%}', '场次比': '{:.2%}', '票房比': '{:.2%}', '座次效率': '{:.2f}', - '场次效率': '{:.2f}'} - full_day_analysis, prime_time_analysis = process_and_analyze_data(df_raw.copy()), process_and_analyze_data( - df_raw[df_raw['放映时间'].between(dt_time(14, 0), dt_time(21, 0))].copy()) - - if query_tms_enabled: - with st.spinner("正在关联查询 TMS 服务器..."): +# --- 4. 监控服务主逻辑 --- + +class CinemaMonitor: + def __init__(self): + self.logs = deque(maxlen=50) + self.status_text = "初始化中..." + self.next_wakeup = None + self.monitored_sessions = [] + + self.api = APIManager(self.log) + self.current_business_date = None + self.daily_schedule_cache = None + + self.zero_ticket_candidates = set() + self.alerted_sessions = set() + + # 新增:记录上一次发送每日报告的日期 + self.last_daily_report_date = None + + self.simulation_mode = False + self.simulation_data = [] + + self.thread = threading.Thread(target=self._run_loop, daemon=True) + self.thread.start() + + def log(self, msg): + timestamp = get_beijing_now().strftime("%H:%M:%S") + entry = f"[{timestamp}] {msg}" + print(entry) + self.logs.appendleft(entry) + + def _run_loop(self): + self.log("🚀 监控服务已启动") + + while True: try: - priority_titles = full_day_analysis['影片'].unique().tolist() - _, tms_movie_list = fetch_and_process_server_movies(priority_titles) - full_day_analysis = add_tms_locations_to_analysis(full_day_analysis, tms_movie_list) - prime_time_analysis = add_tms_locations_to_analysis(prime_time_analysis, tms_movie_list) - if '影片所在影厅位置' in full_day_analysis.columns: - cols = full_day_analysis.columns.tolist(); - full_day_analysis = full_day_analysis[cols[:1] + ['影片所在影厅位置'] + cols[1:-1]] - if '影片所在影厅位置' in prime_time_analysis.columns: - cols = prime_time_analysis.columns.tolist(); - prime_time_analysis = prime_time_analysis[cols[:1] + ['影片所在影厅位置'] + cols[1:-1]] - st.toast("TMS 影片位置关联成功!", icon="🔗") - except Exception as e: - st.error(f"关联TMS失败: {e}") - - st.markdown("#### 全天排片效率分析"); - st.dataframe(full_day_analysis.style.format(format_config).apply(style_efficiency, axis=1), - use_container_width=True, hide_index=True) - st.markdown("#### 黄金时段排片效率分析 (14:00-21:00)"); - st.dataframe(prime_time_analysis.style.format(format_config).apply(style_efficiency, axis=1), - use_container_width=True, hide_index=True) - if not full_day_analysis.empty: - st.markdown("### 排片效率汇总") - full_day_summary = full_day_analysis.rename( - columns={'场次': '全部场次', '座次效率': '全部座次效率', '场次效率': '全部场次效率'}) - full_day_cols_to_keep = ['影片', '票房', '全部场次', '全部座次效率', '全部场次效率'] - if '影片所在影厅位置' in full_day_summary.columns: full_day_cols_to_keep.insert(1, '影片所在影厅位置') - full_day_summary = full_day_summary[full_day_cols_to_keep] - prime_time_summary = prime_time_analysis.rename( - columns={'场次': '黄金时段场次', '座次效率': '黄金时段座次效率', '场次效率': '黄金时段场次效率'})[ - ['影片', '黄金时段场次', '黄金时段座次效率', '黄金时段场次效率']] - summary_df = pd.merge(full_day_summary, prime_time_summary, on='影片', how='left').fillna(0) - summary_df['黄金时段场次'] = summary_df['黄金时段场次'].astype(int) - summary_format_config = {'票房': '{:,.2f}', '全部场次': '{:,.0f}', '黄金时段场次': '{:,.0f}', - '全部座次效率': '{:.2f}', '全部场次效率': '{:.2f}', '黄金时段座次效率': '{:.2f}', - '黄金时段场次效率': '{:.2f}'} - st.dataframe(summary_df.style.format(summary_format_config).apply(style_summary_efficiency, axis=1), - use_container_width=True, hide_index=True) - - -def fetch_and_process_daily_sessions(date_str, quiet=False): - """获取并处理指定日期的排片场次,返回(场次字典, 总场次数)""" - if not quiet: st.write(f"正在查询 {date_str} 的排片数据...") - - token_data = load_token() - token = token_data.get('token') if token_data else None - if not token: - token_data = login_and_get_token() - token = token_data.get('token') if token_data else None - - schedule, _ = get_api_data_with_token_management(date_str) - - if not schedule: - if not quiet: st.warning(f"未能获取到 {date_str} 的排片数据。") - return None, None, None # 修改返回值,增加 raw_schedule - - total_sessions = len(schedule) - df = pd.DataFrame(schedule) - - canonical_names = [] - if token: - canonical_names = fetch_canonical_movie_names(token, date_str) - - df['影片名称_清理后'] = df['movieName'].apply(lambda x: clean_movie_title(x, canonical_names)) - - sessions_map = df.groupby('影片名称_清理后').size().to_dict() - # 返回 (映射表, 总场次, 原始排片列表) - return sessions_map, total_sessions, schedule - - -def generate_efficiency_report_df(analysis_df, next_day_sessions_map=None, next_day_total_sessions=None): - if analysis_df.empty: return pd.DataFrame() - report_df = analysis_df[ - ['影片', '座位数', '场次', '票房', '人次', '均价', '座次比', '场次比', '票房比', '座次效率', '场次效率']].copy() - report_df['情况说明'] = ''; - report_df['次日调整方案'] = '' - if next_day_sessions_map is not None: - report_df['次日场数'] = report_df['影片'].map(next_day_sessions_map).fillna(0).astype(int) - else: - report_df['次日场数'] = report_df['场次'] - totals = report_df[['座位数', '场次', '票房', '人次']].sum() - totals['影片'] = '' - totals['次日场数'] = next_day_total_sessions if next_day_total_sessions is not None else report_df['次日场数'].sum() - report_df = pd.concat([report_df, pd.DataFrame(totals).T], ignore_index=True) - return report_df - - -def generate_excel_paste_data(df): - if df.empty: return "" - lines, total_row_num = [], len(df) + 1 - for i, row in df.iterrows(): - excel_row_num = i + 2 - line = [row['影片'], row['座位数'], row['场次'], row['票房'], row['人次']] - if i == len(df) - 1: # 合计行 - line.extend(['', '', '', '', '', '', '', '', row['次日场数']]) - else: # 数据行 - line.extend([f"=IFERROR(F{excel_row_num}/G{excel_row_num},0)", f"=D{excel_row_num}/D${total_row_num}", - f"=E{excel_row_num}/E${total_row_num}", f"=F{excel_row_num}/F${total_row_num}", - f"=IFERROR(K{excel_row_num}/I{excel_row_num},0)", - f"=IFERROR(K{excel_row_num}/J{excel_row_num},0)", row['情况说明'], row['次日调整方案'], - row['次日场数']]) - lines.append("\t".join(map(str, line))) - return "\n".join(lines) - - -def get_business_date(df_with_datetime): - """根据包含完整日期时间列的DataFrame计算营业日""" - crossover_time = dt_time(6, 0) - df_with_datetime['business_date'] = df_with_datetime['datetime'].apply( - lambda dt: (dt - timedelta(days=1)).date() if dt.time() < crossover_time else dt.date() - ) - return df_with_datetime['business_date'].mode()[0] - - -# --- 主应用 --- -def main(): - st.title('影城工作便捷工具') - - # 初始化 session_state - if 'file_df' not in st.session_state: st.session_state.file_df, st.session_state.api_df = pd.DataFrame(), pd.DataFrame() - if 'api_date' not in st.session_state: st.session_state.api_date = datetime.now().date() - if 'file_date' not in st.session_state: st.session_state.file_date = None - if 'today_movie_count' not in st.session_state: st.session_state.today_movie_count = 0 - if 'previous_day_movie_count' not in st.session_state: st.session_state.previous_day_movie_count = 0 - if 'daily_report_df' not in st.session_state: st.session_state.daily_report_df = pd.DataFrame() - if 'processed_print_data' not in st.session_state: st.session_state.processed_print_data = None - if 'check_logs' not in st.session_state: st.session_state.check_logs = "" - - tab1, tab2, tab_sales, tab_report, tab_print, tab3 = st.tabs( - ["🔍 排片效率分析", "📋 次日排片效率分析报表", "🍿 卖品品类分析表", "📑 影片映出日累计表", "🖨️ 场次与散场打印", - "🎬 TMS 影片查询"]) - - with tab1: - # 顶部控制区 - col_a, col_b = st.columns(2) - with col_a: - import_from_file = st.checkbox("从`影片映出日累计报表.xlsx`导入数据") - with col_b: - query_tms_for_location = st.checkbox("查询 TMS 找影片所在影厅") - - if import_from_file: - st.header("从本地文件导入数据") - st.write("上传 `影片映出日累计报表.xlsx`,程序将自动处理数据。") - uploaded_file = st.file_uploader("上传 Excel 文件", type=['xlsx', 'xls'], label_visibility="collapsed") - if uploaded_file is not None: - with st.spinner("正在处理文件..."): - try: - df = pd.read_excel(uploaded_file, skiprows=3, header=None) - df.rename(columns={0: '影片名称', 1: '放映日期', 2: '放映时间', 5: '总人次', 6: '总收入', - 7: '座位数'}, inplace=True) - df_for_date_calc = df[['放映日期', '放映时间']].copy() - df_for_date_calc['datetime_str'] = df_for_date_calc['放映日期'].astype(str).str.split(' ').str[ - 0] + ' ' + df_for_date_calc['放映时间'].astype(str) - df_for_date_calc['datetime'] = pd.to_datetime(df_for_date_calc['datetime_str'], errors='coerce') - df_for_date_calc.dropna(subset=['datetime'], inplace=True) - business_date = get_business_date(df_for_date_calc) - st.session_state.file_date = business_date - st.toast(f"文件营业日识别为: {business_date}", icon="🗓️") - df = df[['影片名称', '放映时间', '座位数', '总收入', '总人次']] - df.dropna(subset=['影片名称', '放映时间'], inplace=True) - for col in ['座位数', '总收入', '总人次']: df[col] = pd.to_numeric(df[col], - errors='coerce').fillna(0) - df['放映时间'] = pd.to_datetime(df['放映时间'], format='%H:%M:%S', errors='coerce').dt.time - df.dropna(subset=['放映时间'], inplace=True) - st.session_state.file_df = df - except Exception as e: - st.error(f"处理文件或识别日期时出错: {e}"); - st.session_state.file_df, st.session_state.file_date = pd.DataFrame(), None - # 展示分析结果 (传递 query_tms_for_location 的状态) - display_analysis_results(st.session_state.file_df, "文件", st.session_state.file_date, - query_tms_for_location) - - else: - st.header("使用 API 获取数据") - st.session_state.api_date = st.date_input("选择要查询的排片日期", value=st.session_state.api_date, - key="api_date_picker") - if st.button("获取排片数据", key="fetch_api_data", icon="🫵"): - with st.spinner(f"正在获取 {st.session_state.api_date} 的排片数据..."): - - token_data = load_token() - if not token_data: - token_data = login_and_get_token() - token = token_data.get('token') if token_data else None - - schedule, halls = get_api_data_with_token_management(st.session_state.api_date.strftime('%Y-%m-%d')) - - if schedule is not None and halls is not None: - # 传入 token 和 date 以进行标准名清洗 - processed_df = process_api_data(schedule, halls, token, - st.session_state.api_date.strftime('%Y-%m-%d')) - st.session_state.api_df = processed_df - if not processed_df.empty: st.toast(f"成功获取并处理了 {len(processed_df)} 条排片数据!", - icon="✅") + now = get_beijing_now() + biz_date = get_business_date() + today_str = now.strftime("%Y-%m-%d") + + # --- 0. 每日健康检查 (09:00) --- + # 如果当前时间超过 09:00,且今天还没发过报告,发一条 + if now.time() >= dt_time(9, 0) and self.last_daily_report_date != today_str: + msg = f"日期: {today_str}\n状态: 监控服务正常运行中\nToken: 有效" + self.log(f"🔔 发送每日健康检查通知: {today_str}") + send_pushover("✅ 影城脚本今日运行正常", msg, priority=0) + self.last_daily_report_date = today_str + + # --- 1. 非营业时间休眠 (06:00 - 09:30) --- + start_check_time = now.replace(hour=9, minute=30, second=0, microsecond=0) + is_early_morning = (dt_time(6,0) <= now.time() < dt_time(9,30)) + + if is_early_morning and not self.simulation_mode: + self.status_text = "非监控时段 (等待 09:30)" + self.next_wakeup = start_check_time + self.zero_ticket_candidates.clear() + self.alerted_sessions.clear() + + sleep_sec = (start_check_time - now).total_seconds() + # 最多睡5分钟醒来一次,确保能准时触发09:00的健康检查 + self.log(f"💤 系统休眠中... 预计唤醒: {start_check_time.strftime('%H:%M')}") + time.sleep(min(sleep_sec, 300)) + continue + + # --- 2. 缓存刷新 --- + if self.daily_schedule_cache is None or self.current_business_date != biz_date: + self.log(f"📅 获取 {biz_date} 全天排片...") + schedule = self.api.fetch_schedule(biz_date) + if schedule is None: + self.log("❌ 初始化获取失败,1分钟后重试") + time.sleep(60) + continue + self.daily_schedule_cache = schedule + self.current_business_date = biz_date + self.zero_ticket_candidates.clear() + self.alerted_sessions.clear() + self.log(f"✅ 数据已更新,共 {len(schedule)} 场") + + # --- 3. 筛选窗口期场次 --- + active_check_needed = False + min_sleep_seconds = 3600 + sessions_in_window = [] + current_schedule = self.daily_schedule_cache + self.simulation_data + + for item in current_schedule: + start_time_str = item.get('showStartTime') + if not start_time_str: continue + show_dt = parse_show_datetime(biz_date, start_time_str) + if not show_dt: continue + + if now >= show_dt: continue + + monitor_start_dt = show_dt - timedelta(minutes=11) + + if now >= monitor_start_dt: + sessions_in_window.append({'data': item, 'dt': show_dt}) + active_check_needed = True else: - st.session_state.api_df = pd.DataFrame() - # 展示分析结果 (传递 query_tms_for_location 的状态) - display_analysis_results(st.session_state.api_df, "API", st.session_state.api_date, query_tms_for_location) - - with tab2: - # 确定数据源 (逻辑保持不变,API > 文件) - source_df_raw, source_date, source_date_str = pd.DataFrame(), None, "" - if not st.session_state.api_df.empty: - source_df_raw, source_date, source_date_str = st.session_state.api_df, st.session_state.api_date, f"{st.session_state.api_date}" - st.toast("正在使用来自 **API 获取** 的最新数据。", icon="☁️") - elif not st.session_state.file_df.empty: - source_df_raw, source_date, source_date_str = st.session_state.file_df, st.session_state.file_date, f"{st.session_state.file_date} (文件)" - st.toast("API数据为空,正在使用来自 **文件导入** 的数据。", icon="📃") - else: - st.warning('没有可用的数据。请先在 "🔍 排片效率分析" 标签页加载数据。') - - st.header(f"{source_date_str} 排片效率分析与调整建议") - if not source_df_raw.empty: - if st.button("生成分析报表", icon="🫵"): - with st.spinner("正在生成分析报表 (含跨日数据查询)..."): - next_day_sessions_map, next_day_total_sessions = None, None - previous_day_sessions_map = None - token_data = load_token() - - if source_date and token_data: - token = token_data.get('token') - # 获取次日数据,并拿到原始排片表用于检查 - next_day = source_date + timedelta(days=1) - next_day_str = next_day.strftime('%Y-%m-%d') - next_day_sessions_map, next_day_total_sessions, next_day_raw_schedule = fetch_and_process_daily_sessions( - next_day_str, quiet=True) - - # 生成合理性检查日志 - if next_day_raw_schedule: - logs = generate_schedule_check_logs(next_day_raw_schedule, next_day_str) - st.session_state.check_logs = logs + seconds_until_window = (monitor_start_dt - now).total_seconds() + if seconds_until_window < min_sleep_seconds: + min_sleep_seconds = seconds_until_window + + # --- 4. 严格执行逻辑 --- + if active_check_needed: + self.status_text = "🔥 监控进行中 (高频轮询)" + self.next_wakeup = now + timedelta(seconds=60) + + # 获取实时数据 + realtime_schedule = self.api.fetch_schedule(biz_date) + + if realtime_schedule is None and not self.simulation_mode: + self.log("⚠️ 网络/API请求失败,跳过本次判定,防止误报!") + time.sleep(60) + continue + + if realtime_schedule is None: realtime_schedule = [] + if self.simulation_mode: realtime_schedule += self.simulation_data + + realtime_map = {f"{x.get('hallId')}_{x.get('showStartTime')}": x for x in realtime_schedule} + + display_monitors = [] + + for session in sessions_in_window: + unique_id = f"{biz_date}_{session['data'].get('hallId')}_{session['data'].get('showStartTime')}" + key_for_map = f"{session['data'].get('hallId')}_{session['data'].get('showStartTime')}" + + latest = realtime_map.get(key_for_map) + if not latest: continue + + movie_name = latest.get('movieName') + hall_name = latest.get('hallName') + sold = int(latest.get('soldTicketNum', 0)) + start_str = latest.get('showStartTime') + mins_left = (session['dt'] - now).total_seconds() / 60 + + if sold == 0: + self.zero_ticket_candidates.add(unique_id) + display_monitors.append(f"👁️ {start_str} {movie_name} (0票, 监控中)") else: - st.session_state.check_logs = "无法获取次日排片详情,跳过检查。" - - # 获取前一日数据 - previous_day = source_date - timedelta(days=1) - previous_day_sessions_map, _, _ = fetch_and_process_daily_sessions( - previous_day.strftime('%Y-%m-%d'), quiet=True) - - # 获取经营摘要数据 - date_str = source_date.strftime('%Y-%m-%d') - income_data = fetch_income_data(token, date_str) - wenlv_cards = fetch_membership_data(token, date_str) - attendance = int(source_df_raw['总人次'].sum()) - - st.session_state.daily_summary_data = { - "ticket_income": income_data.get('ticket_income', 0.0) if income_data else 0.0, - "attendance": attendance, - "goods_income": income_data.get('goods_income', 0.0) if income_data else 0.0, - "sold_incomes_zb": income_data.get('sold_incomes_zb', 0.0) if income_data else 0.0, - "wenlv_cards": wenlv_cards - } - else: - st.error("无法确定源数据日期或Token,无法获取跨日及经营数据。") - - if '影片名称_清理后' not in source_df_raw.columns: - source_df_raw['影片名称_清理后'] = source_df_raw['影片名称'] - - analysis_df = process_and_analyze_data(source_df_raw.copy()) - st.session_state.today_movie_count = len(analysis_df) - st.session_state.previous_day_movie_count = len( - previous_day_sessions_map) if previous_day_sessions_map else 0 - - report_df = generate_efficiency_report_df(analysis_df, next_day_sessions_map, - next_day_total_sessions) - st.session_state.report_df = report_df - st.session_state.excel_paste_data = generate_excel_paste_data(report_df) - - if 'report_df' in st.session_state and not st.session_state.report_df.empty: - st.markdown("#### 排片效率分析表"); - display_df = st.session_state.report_df.copy() - display_df.insert(0, '序号', range(2, len(display_df) + 2)) - report_format = {'座位数': '{:,.0f}', '场次': '{:,.0f}', '人次': '{:,.0f}', '票房': '{:,.2f}', - '均价': '{:.2f}', '座次比': '{:.2%}', '场次比': '{:.2%}', '票房比': '{:.2%}', - '座次效率': '{:.2f}', '场次效率': '{:.2f}', '次日场数': '{:,.0f}'} - display_df['均价'] = pd.to_numeric(display_df['均价'], errors='coerce').replace([np.inf, -np.inf], - np.nan) - st.dataframe(display_df.style.format(report_format, na_rep="#DIV/0!"), use_container_width=True, - hide_index=True) - - n_diff = st.session_state.today_movie_count - st.session_state.previous_day_movie_count - if n_diff > 0: - excel_copy_title = f"复制到 Excel (需要增加 {n_diff} 行,从第一个电影名字开始粘贴)" - elif n_diff < 0: - excel_copy_title = f"复制到 Excel (需要减少 {abs(n_diff)} 行,从第一个电影名字开始粘贴)" - else: - excel_copy_title = "复制到 Excel (行数保持不变,从第一个电影名字开始粘贴)" - st.markdown(f"##### {excel_copy_title}"); - st.code(st.session_state.excel_paste_data, language='text') - - - # 复制列表 - report_df = st.session_state.report_df - movie_titles = report_df.iloc[:-1]['影片'].tolist() - weather_title = get_weather_forecast(source_date) - st.markdown(f"##### {weather_title}") - st.code(''.join([f'《{title}》' for title in movie_titles]), language='text') - - # 经营摘要 - if 'daily_summary_data' in st.session_state: - summary_data = st.session_state.daily_summary_data - summary_text = ( - f"花都店,今日票房:{summary_data.get('ticket_income', 0.0):.2f}元," - f"观影人次:{summary_data.get('attendance', 0)}," - f"卖品收入:{summary_data.get('goods_income', 0.0):.2f}元," - f"卖品占比:{summary_data.get('sold_incomes_zb', 0.0):.2f}%," - f"文旅卡:{summary_data.get('wenlv_cards', 0)}张。" - ) - st.markdown(f"##### 今日经营数据概览") - st.code(summary_text) - st.markdown(f"> 抽奖券计算方法:先在鼎新报表系统里查询`卡发行`当日开卡数量然后查询`卡充值`当日详细的充值金额,**充值金额整除 200 加上卡发行数量即为抽奖券数量**。") - - st.markdown("#### 🔍 场次合理性检查日志") - if st.session_state.check_logs: - st.code(st.session_state.check_logs) - st.markdown("#### 📡 次日排片 TMS 文件核对") - st.info( - "此功能将查询 TMS 服务器,检查次日排程的影厅是否有对应的影片文件(不区分语言和制式版本,仅匹配片名)。") - - if st.button("开始核对 TMS 文件", key="check_tms_files_btn", icon="🕵️‍♂️"): - # 确定次日日期 - check_date = source_date + timedelta(days=1) - check_date_str = check_date.strftime('%Y-%m-%d') - - with st.spinner(f"正在获取 {check_date_str} 的排片数据并连接 TMS 服务器..."): - # 1. 获取次日排片 - schedule_data, _ = get_api_data_with_token_management(check_date_str) - - if not schedule_data: - st.error(f"无法获取 {check_date_str} 的排片数据,请检查网络或 Token。") + if unique_id in self.zero_ticket_candidates: + if unique_id not in self.alerted_sessions: + self.log(f"🚨 突发购票检测!{hall_name}《{movie_name}》") + send_pushover( + f"禁止撤场:{hall_name}有票!", + f"《{movie_name}》{start_str}\n倒计时 {mins_left:.1f}分\n⚠️ 从0票突增至{sold}张", + priority=0 + ) + self.alerted_sessions.add(unique_id) + self.zero_ticket_candidates.remove(unique_id) + + display_monitors.append(f"✅ {start_str} {movie_name} (新售出{sold}张, 已报警)") else: - try: - # 2. 获取 TMS 数据 - df_sched = pd.DataFrame(schedule_data) - priority_titles = df_sched[ - 'movieName'].unique().tolist() if 'movieName' in df_sched.columns else [] - - tms_hall_data, _ = fetch_and_process_server_movies(priority_titles) - - # 3. 执行比对 (使用新函数) - logs_text = check_tms_file_availability(schedule_data, tms_hall_data, - check_date_str) - - if logs_text is None: - st.success( - f"✅ 核对完成:{check_date_str} 所有排映影片在对应影厅服务器中均存在关联文件。") - else: - st.warning("⚠️ 发现潜在缺片风险!请检查以下影厅服务器:") - # 这里使用 st.code 展示多行带编号的文本 - st.code(logs_text, language="text") - - except Exception as e: - st.error(f"核对过程中发生错误: {e}") - - with tab_sales: - # 卖品品类分析表 UI - col_1, col_2 = st.columns(2) - with col_1: - sales_from_file = st.checkbox("从`商品销售汇总报表-已退减.xlsx`导入数据", key="sales_file_cb") - - if sales_from_file: - st.header("从本地文件导入数据") - st.write("请上传 `商品销售汇总报表-已退减.xlsx` 文件。") - uploaded_file = st.file_uploader("上传 Excel 文件", type=['xlsx', 'xls'], label_visibility="collapsed", - key="sales_file_uploader") - if uploaded_file is not None: - with st.spinner("正在读取并分析文件..."): - try: - df = pd.read_excel(uploaded_file, skiprows=3) - final_summary, copy_text = process_sales_data(df) - if final_summary is not None: - st.markdown("#### 销售总览 (套餐 Top 10 + 单品 Top 5)") - st.dataframe(final_summary, use_container_width=True, hide_index=True) - st.markdown("##### 复制到 Excel") - st.code(copy_text, language='text') - except Exception as e: - st.error(f"处理文件时发生错误: {e}") - else: - st.header("从服务器API获取实时数据") - sales_date = st.date_input("选择要查询的日期", value=datetime.now().date(), key="sales_date_picker") - if st.button("获取卖品销售数据", key="fetch_sales_data", icon="🫵"): - with st.spinner(f"正在获取 {sales_date} 的销售数据..."): - api_results = get_sales_data_with_token_management(sales_date) - if api_results is not None: - st.toast(f"成功获取到 {len(api_results)} 条销售记录!", icon="✅") - df = transform_api_data_to_df(api_results) - final_summary, copy_text = process_sales_data(df) - if final_summary is not None: - st.markdown("#### 销售总览 (套餐 Top 10 + 单品 Top 5)") - st.dataframe(final_summary, use_container_width=True, hide_index=True) - st.markdown("##### 复制到 Excel") - st.code(copy_text, language='text') - else: - st.warning("未能从 API 获取到数据,请检查登录或网络连接。") - - with tab_report: - # 影片映出日累计表 UI - st.header("影片映出日累计报表生成") - report_date = st.date_input("选择要查询的排片日期", value=datetime.now().date(), key="daily_report_date") - - if st.button("获取并生成报表", key="fetch_daily_report", icon="🫵"): - report_date_str = report_date.strftime('%Y-%m-%d') - with st.spinner(f"正在获取 {report_date_str} 的排片数据..."): - - token_data = load_token() - if not token_data: token_data = login_and_get_token() - token = token_data.get('token') if token_data else None + display_monitors.append(f"🛡️ {start_str} {movie_name} (原有{sold}张, 忽略)") + + self.monitored_sessions = display_monitors + time.sleep(60) - schedule, halls_map = get_api_data_with_token_management(report_date_str) - if schedule is not None and halls_map is not None: - processed_df = process_and_filter_data_for_report(schedule, halls_map, report_date_str, token) - st.session_state.daily_report_df = processed_df - if not processed_df.empty: - st.toast(f"成功获取并处理了 {len(processed_df)} 条有效场次数据!", icon="✅") else: - st.session_state.daily_report_df = pd.DataFrame() - - if not st.session_state.daily_report_df.empty: - st.markdown(f"#### {report_date.strftime('%Y-%m-%d')} 影片映出日累计报表") - st.dataframe( - st.session_state.daily_report_df.style.format({ - '人数合计': '{:,.0f}', '座位数': '{:,.0f}', '上座率%': '{:.2f}%' - }), - use_container_width=True, hide_index=True - ) - import io - output_buffer = io.BytesIO() - st.session_state.daily_report_df.to_excel(output_buffer, index=False, engine='openpyxl') - excel_data = output_buffer.getvalue() - st.download_button( - label="📥 下载 XLSX 报表文件", - data=excel_data, - file_name=f"{report_date.strftime('%Y-%m-%d')}_影片映出日累计报表.xlsx", - mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", - ) - else: - st.info("请选择日期并点击“获取并生成报表”以生成数据。") - - with tab_print: - # 场次与散场时间快捷打印 UI - st.header("场次与散场时间快捷打印") - - with st.expander("⚙️ 显示与打印设置", expanded=False): - col1, col2 = st.columns(2) - with col1: - st.subheader("⚙️ 修改 LED 屏排片表设置") - led_font_name = st.selectbox("字体选择", options=list(AVAILABLE_FONTS.keys()), index=0, key="led_font") - led_font_path = AVAILABLE_FONTS.get(led_font_name) - generate_png_led = st.checkbox("生成 PNG 图片", value=False, key="png_led") - with col2: - st.subheader("⚙️ 散场时间表设置") - times_font_name = st.selectbox("字体选择", options=list(AVAILABLE_FONTS.keys()), index=1, - key="times_font") - times_font_path = AVAILABLE_FONTS.get(times_font_name) - font_size_multiplier = st.slider("字体大小调节", min_value=0.8, max_value=1.5, value=1.2, step=0.05, - help="调整字体在单元格内的相对大小") - split_time = st.time_input("白班 / 晚班分割时间", value=dt_time(17, 0), - help="散场时间在此时间点之前(含)的为白班") - time_adjustment = st.slider("时间提前 (分钟)", min_value=0, max_value=10, value=0, - help="将所有散场时间提前 N 分钟显示") - hall_display_format = st.radio("影厅号格式", options=['Default', 'Superscript', 'Circled'], - format_func=lambda x: - {'Default': '默认 (2 18:28)', 'Superscript': '上标 (2# 18:28)', - 'Circled': '带圈 (② 18:28)'}[x], horizontal=True) - generate_png_times = st.checkbox("生成 PNG 图片", value=False, key="png_times") - - with st.expander("💡 使用帮助", expanded=False): - st.markdown(""" - #### 🖨️ 功能简介 - 本工具用于将影院的排期数据快速转换为两种形式的打印页: - 1. **修改 LED 屏幕排片表打印**:A4 竖版,详细列出影厅、场次、影片名、拼音缩写和时间范围,方便员工在修改 LED 屏幕时快速查阅和输入。 - 2. **散场时间打印**:A5 竖版,以大字体分栏显示各影厅的散场时间,方便员工在疏散人群和清洁影厅时查阅。 - - #### ⬇️ 操作步骤 - 1. **选择数据源**: - * **从文件导入**:导出 `放映时间核对表.xls` 后,点击 "Browse files" 按钮上传。 - * **从 API 获取**:选择日期,点击 "获取排片数据" 按钮,程序将自动登录并拉取最新数据。 - 2. **调整设置 (可选)**:点击上方的 "显示与打印设置" 打开设置面板,根据需要调整字体、大小、格式等。 - 3. **预览与打印**: - * 数据加载成功后,下方会自动生成预览。 - * 默认显示 **PDF 预览**,这是最适合打印的格式。可以直接在预览界面点击 🖨️ 打印按钮。 - """) - - print_tab1, print_tab2 = st.tabs(["☁️ 从 API 获取", "📁 从文件导入"]) - - with print_tab2: - uploaded_file_print = st.file_uploader("请上传 `放映时间核对表.xls` 文件", type=["xls"], - key="print_file_uploader") - if uploaded_file_print: - with st.spinner("正在处理文件,请稍候..."): - led_data, times_part1, times_part2, date_str = process_file_upload(uploaded_file_print, split_time, - time_adjustment) - st.session_state.processed_print_data = { - "led_data": led_data, - "times_part1": times_part1, - "times_part2": times_part2, - "date_str": date_str - } - if date_str: - st.toast(f"文件处理完成!排期日期:**{date_str}**", icon="🎉") - - with print_tab1: - # 修改 value 为当前日期 + 1天 - print_api_date = st.date_input("选择要查询的排片日期", value=datetime.now().date() + timedelta(days=1), - key="print_api_date_picker") - if st.button("获取排片数据", key="fetch_print_api_data", icon="🫵"): - with st.spinner(f"正在获取 {print_api_date} 的排片数据..."): - date_str_api = print_api_date.strftime('%Y-%m-%d') - # 重用 app2.py 中现有的 API 获取逻辑,不需要重复写 fetch_schedule_data - schedule_list, _ = get_api_data_with_token_management(date_str_api) - - if schedule_list is not None and len(schedule_list) > 0: - df_api = pd.DataFrame(schedule_list) - df_api.rename(columns={'hallName': 'Hall', 'showStartTime': 'StartTime', - 'showEndTime': 'EndTime', 'movieName': 'Movie'}, inplace=True) - df_api = df_api[['Hall', 'StartTime', 'EndTime', 'Movie']] - - led_data, times_part1, times_part2 = process_schedule_df(df_api, print_api_date, split_time, - time_adjustment) - st.session_state.processed_print_data = { - "led_data": led_data, - "times_part1": times_part1, - "times_part2": times_part2, - "date_str": date_str_api - } - st.toast(f"成功获取 {len(schedule_list)} 条排片数据!", icon="✅") - elif schedule_list is not None: - st.warning("成功连接API,但当天没有排片数据。") - st.session_state.processed_print_data = None + if 0 < min_sleep_seconds < 86400: + wakeup_dt = now + timedelta(seconds=min_sleep_seconds) + self.status_text = "💤 智能休眠中" + self.next_wakeup = wakeup_dt + self.monitored_sessions = [] + self.log(f"休眠 {min_sleep_seconds/60:.1f} 分钟,直到 {wakeup_dt.strftime('%H:%M')}") + time.sleep(min_sleep_seconds) else: - st.error("获取API数据失败。") - st.session_state.processed_print_data = None - - # --- 显示打印预览结果 --- - if st.session_state.processed_print_data: - data = st.session_state.processed_print_data - led_data, times_part1, times_part2, date_str = data["led_data"], data["times_part1"], data["times_part2"], \ - data["date_str"] - - - # 显示 LED 屏排片表 - st.header("🖥️ 修改 LED 屏幕排片表打印") - if led_data is not None and not led_data.empty: - led_output = create_print_layout_led(led_data, date_str, led_font_path, generate_png_led) - if led_output: - tabs = ["PDF 预览"] - if 'png' in led_output: tabs.append("PNG 预览") - tab_views = st.tabs(tabs) - with tab_views[0]: - st.markdown(display_pdf(led_output['pdf']), unsafe_allow_html=True) - if 'png' in led_output: - with tab_views[1]: st.image(led_output['png'], use_container_width=True) - else: - st.error("未能成功生成 '修改 LED 屏排片表'。请检查数据源。") + self.log("今日无更多监控场次,休眠 5 分钟") + time.sleep(300) - # 显示散场时间快捷打印 - st.header("🔚 散场时间打印") - col1, col2 = st.columns(2) - with col1: - if times_part1 is not None and not times_part1.empty: - part1_output = create_print_layout_times(times_part1, "A", date_str, times_font_path, - font_size_multiplier, hall_display_format, - generate_png_times) - if part1_output: - tabs1 = [f"白班 (≤ {split_time.strftime('%H:%M')}) PDF 预览"] - if 'png' in part1_output: tabs1.append(f"白班 (≤ {split_time.strftime('%H:%M')}) PNG 预览") - tab_views1 = st.tabs(tabs1) - with tab_views1[0]: - st.markdown(display_pdf(part1_output['pdf']), unsafe_allow_html=True) - if 'png' in part1_output: - with tab_views1[1]: st.image(part1_output['png']) - else: - st.info(f"白班 (≤ {split_time.strftime('%H:%M')}) 没有排期数据。") + except Exception as e: + self.log(f"主循环异常: {e}") + time.sleep(60) + + def trigger_simulation(self): + self.simulation_mode = True + fake_time = (get_beijing_now() + timedelta(minutes=5)).strftime("%H:%M") + + self.simulation_data = [{ + 'movieName': '测试影片-防误报验证', + 'hallName': '模拟厅', + 'hallId': 'SIM_SAFE', + 'showStartTime': fake_time, + 'soldTicketNum': '0' + }] + self.log(f"已注入 0票 模拟场次: {fake_time} 开场") + + def simulate_buy(): + time.sleep(10) + self.simulation_data[0]['soldTicketNum'] = '1' + self.log("⚡ [测试] 模拟动作:购票成功!应触发报警。") + + threading.Thread(target=simulate_buy).start() + +# --- 5. Streamlit 前端 --- + +@st.cache_resource +def get_monitor(): + return CinemaMonitor() - with col2: - if times_part2 is not None and not times_part2.empty: - part2_output = create_print_layout_times(times_part2, "C", date_str, times_font_path, - font_size_multiplier, hall_display_format, - generate_png_times) - if part2_output: - tabs2 = [f"晚班 (> {split_time.strftime('%H:%M')}) PDF 预览"] - if 'png' in part2_output: tabs2.append(f"晚班 (> {split_time.strftime('%H:%M')}) PNG 预览") - tab_views2 = st.tabs(tabs2) - with tab_views2[0]: - st.markdown(display_pdf(part2_output['pdf']), unsafe_allow_html=True) - if 'png' in part2_output: - with tab_views2[1]: st.image(part2_output['png']) - else: - st.info(f"晚班 (> {split_time.strftime('%H:%M')}) 没有排期数据。") +def main(): + monitor = get_monitor() + st.title("📽️ 影城排程防撤场监控终端 Pro") + + utc_now = datetime.now(timezone.utc).strftime("%H:%M:%S") + bj_now = get_beijing_now().strftime("%H:%M:%S") + + c1, c2, c3 = st.columns(3) + with c1: st.metric("当前状态", monitor.status_text) + with c2: st.metric("下次唤醒 (北京时间)", monitor.next_wakeup.strftime("%H:%M:%S") if monitor.next_wakeup else "--") + with c3: st.metric("系统时间 (UTC / BJ)", f"{utc_now} / {bj_now}") + + st.divider() + + col_logs, col_list = st.columns([3, 2]) + with col_logs: + st.subheader("📜 系统运行日志") + if st.button("刷新日志"): pass + st.text_area("Logs", "\n".join(list(monitor.logs)), height=450, disabled=True) + with col_list: + st.subheader("🎯 实时监控列表") + st.info("说明:👁️=0票监控中 | ✅=突发购票(报警) | 🛡️=原有票(安全)") + if monitor.monitored_sessions: + for s in monitor.monitored_sessions: + if "✅" in s: st.success(s) + elif "👁️" in s: st.error(s) + else: st.caption(s) else: - st.info("👆 请先从文件或API加载数据以生成预览。") - - with tab3: - st.header("TMS 服务器影片内容查询") - if st.button('点击查询 TMS 服务器', key="query_tms", icon="🫵"): - with st.spinner("正在从 TMS 服务器获取数据中..."): - try: - priority_titles, df_for_tms = [], pd.DataFrame() - if not st.session_state.api_df.empty: - df_for_tms = st.session_state.api_df - elif not st.session_state.file_df.empty: - df_for_tms = st.session_state.file_df - if not df_for_tms.empty: - # 优先使用清洗后的名字 - if '影片名称_清理后' in df_for_tms.columns: - priority_titles = df_for_tms['影片名称_清理后'].unique().tolist() - else: - priority_titles = df_for_tms['影片名称'].apply( - lambda x: clean_movie_title(x)).unique().tolist() - - halls_data, movie_list_sorted = fetch_and_process_server_movies(priority_titles) - st.toast("TMS 服务器数据获取成功!", icon="🎉") - st.markdown("#### 按影片查看所在影厅") - view2_data = [{'影片名称': item['assert_name'], - '所在影厅': " ".join(sorted([get_circled_number(h) for h in item['halls']])), - '文件名': item['content_name'], '时长(分钟)': format_play_time(item['play_time'])} - for item in movie_list_sorted] - st.dataframe(pd.DataFrame(view2_data), hide_index=True, use_container_width=True) - st.markdown("#### 按影厅查看影片内容") - hall_tabs = st.tabs(list(halls_data.keys())) - for tab, hall_name in zip(hall_tabs, halls_data.keys()): - with tab: - view1_data = [{'影片名称': item['details']['assert_name'], '所在影厅': " ".join( - sorted([get_circled_number(h) for h in item['details']['halls']])), - '文件名': item['content_name'], - '时长(分钟)': format_play_time(item['details']['play_time'])} for item in - halls_data[hall_name]] - st.dataframe(pd.DataFrame(view1_data), hide_index=True, use_container_width=True) - except Exception as e: - st.error(f"查询TMS服务器时出错: {e}") - + st.info("当前休眠中,无临近场次") + + st.write("---") + if st.button("模拟 '0票 -> 1票' 验证"): + monitor.trigger_simulation() + st.toast("测试已触发") if __name__ == "__main__": - main() - + main() \ No newline at end of file