diff --git "a/app.py" "b/app.py" new file mode 100644--- /dev/null +++ "b/app.py" @@ -0,0 +1,2074 @@ +import streamlit as st +import pandas as pd +import numpy as np +import requests +import time +from collections import defaultdict +import json +import os +from datetime import datetime, timedelta, time as dt_time +import io +import warnings +import matplotlib.pyplot as plt +import matplotlib.font_manager as font_manager +from matplotlib.lines import Line2D +from matplotlib.backends.backend_pdf import PdfPages +import matplotlib.gridspec as gridspec +import base64 +import math +from pypinyin import lazy_pinyin, Style +from itertools import combinations +# --- 新增:加载环境变量 --- +from dotenv import load_dotenv +load_dotenv() # 加载本地 .env 文件 + +# --- 全局配置和常量 --- +TOKEN_FILE = 'token_data.json' +# --- 环境变量获取 (替代硬编码) --- +# 使用 os.getenv 获取,如果获取不到默认为空字符串或特定默认值 +GAODE_API_KEY = os.getenv("GAODE_API_KEY", "") +ADCODE = os.getenv("ADCODE", "440114") +CINEMA_ID = os.getenv("CINEMA_ID", "44001291") + +# --- 打印功能相关常量 --- +BUSINESS_START = "09:30" +BUSINESS_END = "01:30" +BORDER_COLOR = 'grey' +DATE_COLOR = '#A9A9A9' +A5_WIDTH_IN = 5.83 +A5_HEIGHT_IN = 8.27 +NUM_COLS = 3 + +# --- 打印字体清单 --- +ALL_FONTS = { + "思源黑体-常规 (推荐 LED 屏)": "SimHei.ttf", + "思源黑体-重体 (推荐散场表)": "SourceHanSansOLD-Heavy-2.otf", + "思源黑体-粗体": "SourceHanSansOLD-Bold-2.otf", + "思源宋体-常规": "SourceHanSansCN-Normal.otf", + "苹方-中黑": "PingFangSC-Medium.otf", + "苹方-半粗": "PingFangSC-Semibold.otf", + "苹方-极细": "PingFangSC-Ultralight.otf", + "阿里巴巴普惠体-常规": "Alibaba-PuHuiTi.ttf", + "阿里巴巴普惠体-粗体": "AlibabaPuHuiTi-Bold.otf", + "阿里巴巴普惠体-重体": "AlibabaPuHuiTi-Heavy.otf", +} +# 检查可用字体 +AVAILABLE_FONTS = {name: fname for name, fname in ALL_FONTS.items() if os.path.exists(fname)} + +# --- 忽略特定警告 --- +# 忽略 openpyxl 的样式警告 +warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl") +# 忽略 pandas 日期解析的警告 (针对无法推断格式的情况) +warnings.filterwarnings("ignore", message="Could not infer format") + +# --- 页面基础设置 --- 
st.set_page_config(layout="wide", page_title="影城工作便捷工具")


# --- 1. API data-fetching module ---

# --- 1.1 Token management ---
def save_token(token_data):
    """Persist the token payload to TOKEN_FILE as pretty-printed JSON; True on success."""
    try:
        with open(TOKEN_FILE, 'w', encoding='utf-8') as fh:
            json.dump(token_data, fh, ensure_ascii=False, indent=4)
    except Exception as exc:
        st.error(f"保存Token失败: {exc}")
        return False
    return True


def load_token():
    """Load the cached token payload from TOKEN_FILE; None when missing or unparsable."""
    if not os.path.exists(TOKEN_FILE):
        return None
    try:
        with open(TOKEN_FILE, 'r', encoding='utf-8') as fh:
            return json.load(fh)
    except (json.JSONDecodeError, FileNotFoundError):
        return None


def login_and_get_token():
    """Log in with credentials from the environment and return a fresh token payload.

    Returns the token dict on success, None on any failure. All failures are
    reported through Streamlit widgets rather than raised.
    """
    st.write("Token无效或已过期,正在尝试重新登录...")

    username = os.getenv("CINEMA_USERNAME")
    password = os.getenv("CINEMA_PASSWORD")
    res_code = os.getenv("CINEMA_RES_CODE")
    device_id = os.getenv("CINEMA_DEVICE_ID")

    # Fail fast when mandatory credentials are not configured.
    if not (username and password and res_code):
        st.error("登录失败:未配置用户名、密码或影院编码环境变量。")
        return None

    session = requests.Session()
    session.headers.update({
        'Host': 'app.bi.piao51.cn',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148',
    })

    login_url = 'https://app.bi.piao51.cn/cinema-app/credential/login.action'
    login_headers = {
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'Origin': 'https://app.bi.piao51.cn',
    }
    login_data = {
        'username': username,
        'password': password,
        'type': '1',
        'resCode': res_code,
        'deviceid': device_id,
        'dtype': 'ios',
    }

    try:
        login_resp = session.post(login_url, headers=login_headers, data=login_data,
                                  allow_redirects=False, timeout=15)
        # Step 1 succeeds with a 3xx redirect that sets a 'token' session cookie.
        if not (300 <= login_resp.status_code < 400 and 'token' in session.cookies):
            st.error(f"登录步骤 1 失败,未能获取 Session Token。状态码: {login_resp.status_code}")
            return None

        user_info_url = 'https://app.bi.piao51.cn/cinema-app/security/logined.action'
        info_resp = session.get(user_info_url, timeout=10)
        info_resp.raise_for_status()

        user_info = info_resp.json()
        if user_info.get("success") and user_info.get("data", {}).get("token"):
            token_data = user_info['data']
            if save_token(token_data):
                st.toast("登录成功,已获取并保存新 Token!", icon="🔑")
            return token_data
        st.error(f"登录步骤 2 失败,未能从 JSON 中提取 Token。响应: {user_info.get('msg')}")
        return None

    except requests.exceptions.RequestException as exc:
        st.error(f"登录请求过程中发生网络错误: {exc}")
        return None


# --- 1.2 Schedule-related API fetchers ---
def fetch_hall_info(token):
    """Return a {hallId: seatNum} mapping for every hall; raises on API failure."""
    url = 'https://cawapi.yinghezhong.com/showInfo/getShowHallInfo'
    params = {'token': token, '_': int(time.time() * 1000)}
    headers = {'Origin': 'https://caw.yinghezhong.com', 'User-Agent': 'Mozilla/5.0'}
    resp = requests.get(url, params=params, headers=headers, timeout=10)
    resp.raise_for_status()
    payload = resp.json()
    if payload.get('code') != 1 or not payload.get('data'):
        raise Exception(f"获取影厅信息失败: {payload.get('msg', '未知错误')}")
    return {hall['hallId']: hall['seatNum'] for hall in payload['data']}


def fetch_schedule_data(token, show_date):
    """Fetch the raw show list for one date.

    Raises ValueError when the API hints at an expired token (code 500) so the
    caller can re-login, and a generic Exception for other API errors.
    """
    url = 'https://cawapi.yinghezhong.com/showInfo/getHallShowInfo'
    params = {'showDate': show_date, 'token': token, '_': int(time.time() * 1000)}
    headers = {'Origin': 'https://caw.yinghezhong.com', 'User-Agent': 'Mozilla/5.0'}
    resp = requests.get(url, params=params, headers=headers, timeout=15)
    resp.raise_for_status()
    payload = resp.json()
    code = payload.get('code')
    if code == 1:
        return payload.get('data', [])
    if code == 500:
        raise ValueError("Token 可能已失效")
    raise Exception(f"获取排片数据失败: {payload.get('msg', '未知错误')}")


def get_api_data_with_token_management(show_date):
    """Fetch schedule and hall data, transparently re-logging in once on token expiry.

    Returns (schedule_list, hall_seat_map), or (None, None) when the data
    cannot be obtained.
    """
    token_data = load_token()
    token = token_data.get('token') if token_data else None
    if not token:
        token_data = login_and_get_token()
        if not token_data:
            return None, None
        token = token_data.get('token')

    try:
        return fetch_schedule_data(token, show_date), fetch_hall_info(token)
    except ValueError:
        # Expired token: log in again and retry exactly once.
        st.toast("Token 已失效,正在尝试重新登录并重试...", icon="🔄")
        token_data = login_and_get_token()
        if not token_data:
            return None, None
        token = token_data.get('token')
        try:
            return fetch_schedule_data(token, show_date), fetch_hall_info(token)
        except Exception as exc:
            st.error(f"重试获取数据失败: {exc}")
            return None, None
    except Exception as exc:
        st.error(f"获取 API 数据时发生错误: {exc}")
        return None, None


# --- 1.3 Canonical movie-name API ---
@st.cache_data(show_spinner=False, ttl=600)
def fetch_canonical_movie_names(token, date_str):
    """Return the official movie-name list for one date (used to normalise titles).

    Errors are non-fatal: an empty list is returned so callers can fall back
    to the basic title-cleaning heuristics.
    """
    url = 'https://app.bi.piao51.cn/cinema-app/mycinema/movieSellGross.action'
    params = {
        'token': token,
        'startDate': date_str,
        'endDate': date_str,
        'dateType': 'day',
        'cinemaId': CINEMA_ID
    }
    headers = {
        'Host': 'app.bi.piao51.cn', 'X-Requested-With': 'XMLHttpRequest', 'jwt': '0',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148',
    }

    try:
        resp = requests.get(url, params=params, headers=headers, timeout=10)
        resp.raise_for_status()
        payload = resp.json()
        if payload.get('code') == 'A00000' and payload.get('results'):
            # Drop aggregate rows such as "总计" which are not real titles.
            return [row['movieName'] for row in payload['results']
                    if row.get('movieName') and row['movieName'] != '总计']
    except Exception as exc:
        # Non-fatal: log to stdout and let the caller fall back.
        print(f"获取标准电影名称失败: {exc}")
    return []
def process_api_data(schedule_list, hall_seat_map, token=None, show_date=None):
    """Turn the raw schedule JSON into a tidy DataFrame with normalised titles.

    Columns of the result: 影片名称, 放映时间 (datetime.time), 座位数, 总收入, 总人次.
    Returns an empty DataFrame (with a Streamlit warning) when there is no data.
    """
    if not schedule_list:
        st.warning("未获取到任何排片数据。")
        return pd.DataFrame()

    df = pd.DataFrame(schedule_list)
    df['座位数'] = df['hallId'].map(hall_seat_map).fillna(0).astype(int)
    df.rename(columns={'movieName': '影片名称', 'showStartTime': '放映时间',
                       'soldBoxOffice': '总收入', 'soldTicketNum': '总人次'}, inplace=True)

    # Normalise titles against the official name list when it is available.
    canonical_names = fetch_canonical_movie_names(token, show_date) if (token and show_date) else []
    df['影片名称'] = df['影片名称'].apply(lambda name: clean_movie_title(name, canonical_names))

    df = df[['影片名称', '放映时间', '座位数', '总收入', '总人次']]
    df.dropna(subset=['影片名称', '放映时间'], inplace=True)
    for numeric_col in ('座位数', '总收入', '总人次'):
        df[numeric_col] = pd.to_numeric(df[numeric_col], errors='coerce').fillna(0)
    df['放映时间'] = pd.to_datetime(df['放映时间'], format='%H:%M', errors='coerce').dt.time
    df.dropna(subset=['放映时间'], inplace=True)
    return df


# --- 1.4 Sales data API ---
def fetch_sales_data_from_api(token, selected_date):
    """Fetch the retail top-sellers list for one date; None on failure.

    Raises ValueError when the response suggests the token has expired so the
    caller can re-login.
    """
    url = 'https://app.bi.piao51.cn/cinema-app/mycinema/retailTop.action'
    date_str = selected_date.strftime('%Y-%m-%d')
    headers = {
        'Host': 'app.bi.piao51.cn', 'X-Requested-With': 'XMLHttpRequest', 'jwt': '0',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148',
    }
    params = {
        'dateType': 'day', 'startDate': date_str, 'endDate': date_str, 'noEvent': '1',
        'token': token, 'qTime': date_str, 'cinemaId': CINEMA_ID,
    }
    try:
        response = requests.get(url, params=params, headers=headers, timeout=15)
        response.raise_for_status()
        payload = response.json()
        if payload.get('code') == 'A00000':
            return payload.get('results', [])
        if "login" in response.text:
            raise ValueError("Token可能已失效")
        st.error(f"获取 API 数据失败: {payload.get('msg', '未知错误')}")
        return None
    except requests.exceptions.RequestException as exc:
        st.error(f"API 请求网络错误: {exc}")
        return None


def get_sales_data_with_token_management(selected_date):
    """Fetch sales data, transparently re-logging in once on token expiry.

    Returns the API result list, or None when the data cannot be obtained.
    """
    token_data = load_token()
    token = token_data.get('token') if token_data else None
    if not token:
        token_data = login_and_get_token()
        if not token_data:
            return None
        token = token_data.get('token')

    try:
        return fetch_sales_data_from_api(token, selected_date)
    except ValueError:
        # Expired token: log in again and retry exactly once.
        st.toast("Token 已失效,正在尝试重新登录并重试...", icon="🔄")
        token_data = login_and_get_token()
        if not token_data:
            return None
        token = token_data.get('token')
        try:
            return fetch_sales_data_from_api(token, selected_date)
        except Exception as exc:
            st.error(f"重试获取数据失败: {exc}")
            return None
    except Exception as exc:
        st.error(f"获取 API 数据时发生错误: {exc}")
        return None
# --- 2. Core data-analysis module ---
def clean_movie_title(raw_title, canonical_names=None):
    """Normalise a raw schedule title to a canonical movie name plus a format suffix.

    The base name comes from the longest matching entry of ``canonical_names``
    (so "你好明天" wins over "你好"); otherwise everything before the first
    space is used. A format suffix such as (数字IMAX3D) or (杜比视界) is then
    appended, unless the base name already contains it.

    Non-string input is returned unchanged.
    """
    if not isinstance(raw_title, str):
        return raw_title

    base_name = None
    if canonical_names:
        # Longest match first so the most specific canonical title wins.
        for name in sorted(canonical_names, key=len, reverse=True):
            if name in raw_title:
                base_name = name
                break
    if not base_name:
        # Fallback: the first space-delimited token of the raw title.
        base_name = raw_title.split(' ', 1)[0]

    raw_upper = raw_title.upper()
    suffix = ""
    if "HDR LED" in raw_upper:
        suffix = "(HDR LED)"
    elif "CINITY" in raw_upper:
        suffix = "(CINITY)"
    elif "杜比" in raw_upper or "DOLBY" in raw_upper:
        suffix = "(杜比视界)"
    elif "IMAX" in raw_upper:
        suffix = "(数字IMAX3D)" if "3D" in raw_upper else "(数字IMAX)"
    elif "巨幕" in raw_upper:
        suffix = "(中国巨幕立体)" if "立体" in raw_upper else "(中国巨幕)"
    elif "3D" in raw_upper:
        suffix = "(数字3D)"

    # Only append when the base name does not already carry the suffix.
    if suffix and suffix not in base_name:
        return f"{base_name}{suffix}"
    return base_name


def style_efficiency(row):
    """Row styler: green when either efficiency > 1.5, red when either < 0.5."""
    green, red = 'background-color: #E6F5E6;', 'background-color: #FFE5E5;'
    seat_eff, session_eff = row.get('座次效率', 0), row.get('场次效率', 0)
    if seat_eff > 1.5 or session_eff > 1.5:
        return [green] * len(row)
    if seat_eff < 0.5 or session_eff < 0.5:
        return [red] * len(row)
    return [''] * len(row)


def style_summary_efficiency(row):
    """Row styler for the summary table: green/red on any efficiency outlier."""
    green, red = 'background-color: #E6F5E6;', 'background-color: #FFE5E5;'
    if (row.get('全部座次效率', 0) > 1.5 or row.get('全部场次效率', 0) > 1.5 or
            row.get('黄金时段座次效率', 0) > 1.5 or row.get('黄金时段场次效率', 0) > 1.5):
        return [green] * len(row)
    if (row.get('全部座次效率', 0) < 0.5 or row.get('全部场次效率', 0) < 0.5 or
            row.get('黄金时段座次效率', 0) < 0.5 or row.get('黄金时段场次效率', 0) < 0.5):
        return [red] * len(row)
    return [''] * len(row)


def process_and_analyze_data(df):
    """Aggregate per-movie seats/sessions/revenue/admissions and derive ratios.

    Returns a DataFrame sorted by 票房 (descending) with 均价, 座次比, 场次比,
    票房比, 座次效率 and 场次效率 columns.

    Bug fix: ratios were computed with ``np.divide(...).fillna(0)``, which only
    zeroes NaN (from 0/0). A positive numerator over a zero denominator (e.g.
    revenue > 0 with 0 admissions) produced ``inf``, which ``fillna`` does not
    touch. Infinities are now mapped to 0 as well.
    """
    if df.empty:
        return pd.DataFrame()

    # Ensure a cleaned-title column exists (callers may pass raw 影片名称 only).
    if '影片名称_清理后' not in df.columns and '影片名称' in df.columns:
        df['影片名称_清理后'] = df['影片名称']

    analysis_df = df.groupby('影片名称_清理后').agg(
        座位数=('座位数', 'sum'), 场次=('影片名称_清理后', 'size'),
        票房=('总收入', 'sum'), 人次=('总人次', 'sum')).reset_index()
    analysis_df.rename(columns={'影片名称_清理后': '影片'}, inplace=True)
    analysis_df = analysis_df.sort_values(by='票房', ascending=False).reset_index(drop=True)

    total_seats = analysis_df['座位数'].sum()
    total_sessions = analysis_df['场次'].sum()
    total_revenue = analysis_df['票房'].sum()

    def _ratio(numerator, denominator):
        # Division where both NaN (0/0) and ±inf (x/0) collapse to 0.
        with np.errstate(divide='ignore', invalid='ignore'):
            result = np.divide(numerator, denominator)
        return pd.Series(result).replace([np.inf, -np.inf], np.nan).fillna(0)

    analysis_df['均价'] = _ratio(analysis_df['票房'], analysis_df['人次'])
    analysis_df['座次比'] = _ratio(analysis_df['座位数'], total_seats)
    analysis_df['场次比'] = _ratio(analysis_df['场次'], total_sessions)
    analysis_df['票房比'] = _ratio(analysis_df['票房'], total_revenue)
    analysis_df['座次效率'] = _ratio(analysis_df['票房比'], analysis_df['座次比'])
    analysis_df['场次效率'] = _ratio(analysis_df['票房比'], analysis_df['场次比'])

    return analysis_df[['影片', '座位数', '场次', '票房', '人次', '均价',
                        '座次比', '场次比', '票房比', '座次效率', '场次效率']]


# --- 2.1 Sales-data processing ---
def transform_api_data_to_df(api_results):
    """Convert the retail API JSON list into the Excel-compatible sales DataFrame."""
    if not api_results:
        return pd.DataFrame()
    records = []
    for item in api_results:
        # A non-empty goodsAllName marks the row as a combo (套餐).
        is_package = item.get('goodsAllName') and str(item.get('goodsAllName')).strip() != ""
        records.append({
            '售卖位置': '线上渠道',
            '一级分类': '套餐' if is_package else '单品',
            '售卖键名称': item.get('goodsName'),
            '数量': item.get('goodsSoldNums', 0),
            '实收总金额': item.get('goodsSoldIncomes', 0)
        })
    return pd.DataFrame(records)
def process_sales_data(df):
    """Summarise sales into the 15-row display grid (top 10 combos + top 5 singles).

    Returns ``(final_display_df, copy_text)`` where ``copy_text`` is a
    tab-separated dump suitable for pasting into a spreadsheet; ``(None, "")``
    when the input is empty or misses required columns.
    """
    if df.empty:
        st.warning("没有可供分析的数据。")
        return None, ""
    required_columns = ['售卖位置', '一级分类', '售卖键名称', '数量', '实收总金额']
    missing_cols = [col for col in required_columns if col not in df.columns]
    if missing_cols:
        st.error(f"数据缺少必要的列: {', '.join(missing_cols)}。")
        return None, ""
    for col in ('数量', '实收总金额'):
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
    df.dropna(subset=['售卖键名称', '一级分类'], inplace=True)
    df['售卖位置'] = df['售卖位置'].fillna('未知渠道')

    def _top_sellers(subset, limit):
        # Aggregate by (channel, key), drop zero-quantity rows, keep top-N by revenue.
        if subset.empty:
            return pd.DataFrame(columns=['售卖键名称', '数量', '实收总金额'])
        grouped = subset.groupby(['售卖位置', '售卖键名称']).agg(
            {'数量': 'sum', '实收总金额': 'sum'}).reset_index()
        grouped = grouped[grouped['数量'] != 0]
        return grouped.sort_values(by='实收总金额', ascending=False).head(limit)[
            ['售卖键名称', '数量', '实收总金额']]

    top_10_package = _top_sellers(df[df['一级分类'] == '套餐'].copy(), 10)
    top_5_non_package = _top_sellers(df[df['一级分类'] != '套餐'].copy(), 5)

    summary_df = pd.concat([top_10_package, top_5_non_package], ignore_index=True)
    final_display_df = pd.DataFrame(index=range(15), columns=['售卖键名称', '数量', '实收总金额'])
    final_display_df['售卖键名称'] = ''
    final_display_df['数量'] = np.nan
    final_display_df['实收总金额'] = np.nan
    if not summary_df.empty:
        final_display_df.iloc[:len(summary_df)] = summary_df.to_numpy()

    # Cast to object before fillna('') to avoid the pandas FutureWarning on
    # implicit dtype changes.
    copy_df = final_display_df.copy().astype(object)
    copy_df.fillna('', inplace=True)
    copy_text = copy_df.to_csv(sep='\t', index=False, header=False)
    return final_display_df, copy_text


# --- 2.2 Daily cumulative screening report ---
def process_and_filter_data_for_report(schedule_list, hall_seat_map, selected_date_str, token=None):
    """Build the per-show occupancy report for one day, dropping zero-audience shows."""
    if not schedule_list:
        st.warning("未获取到任何排片数据。")
        return pd.DataFrame()

    df = pd.DataFrame(schedule_list)
    # Remove shows nobody attended.
    df['soldTicketNum'] = pd.to_numeric(df['soldTicketNum'], errors='coerce').fillna(0)
    df = df[df['soldTicketNum'] > 0].copy()
    if df.empty:
        st.info("所有场次的观影人数均为 0,没有可显示的数据。")
        return pd.DataFrame()

    # Normalise titles against the official name list when it is available.
    canonical_names = fetch_canonical_movie_names(token, selected_date_str) if (token and selected_date_str) else []
    df['影片'] = df['movieName'].apply(lambda name: clean_movie_title(name, canonical_names))
    df['座位数'] = df['hallId'].map(hall_seat_map).fillna(0).astype(int)

    # Occupancy percentage; 0/0 yields NaN which is zeroed below.
    with np.errstate(divide='ignore', invalid='ignore'):
        df['上座率%'] = np.divide(df['soldTicketNum'], df['座位数']) * 100
    df['上座率%'] = df['上座率%'].fillna(0)

    df.rename(columns={'showStartTime': '放映时间', 'hallName': '影厅', 'soldTicketNum': '人数合计'}, inplace=True)
    df['放映日期'] = selected_date_str
    report = df[['影片', '放映日期', '放映时间', '影厅', '人数合计', '座位数', '上座率%']]
    return report.sort_values(by='放映时间').reset_index(drop=True)


# --- 2.3 Print-layout helpers ---

def get_font_properties(font_path, size=14):
    """Load FontProperties from font_path, falling back to sans-serif with a warning."""
    if font_path and os.path.exists(font_path):
        return font_manager.FontProperties(fname=font_path, size=size)
    st.warning(f"警告:未找到字体文件 '{font_path}',显示可能不正确。将使用默认字体。")
    return font_manager.FontProperties(family='sans-serif', size=size)


def get_pinyin_abbr(text):
    """Return uppercase pinyin initials of the first two Chinese characters in text."""
    if not text:
        return ""
    han_chars = [ch for ch in text if '\u4e00' <= ch <= '\u9fff'][:2]
    if not han_chars:
        return ""
    return ''.join(lazy_pinyin(han_chars, style=Style.FIRST_LETTER)).upper()


def format_seq(n):
    """Map 1..50 to circled glyphs (①..㊿), other positives to '(n)'; non-ints pass through."""
    try:
        value = int(n)
    except (ValueError, TypeError):
        return str(n)
    if value <= 0:
        return str(value)
    circled = "①②③④⑤⑥⑦⑧⑨⑩⑪⑫⑬⑭⑮⑯⑰⑱⑲⑳㉑㉒㉓㉔㉕㉖㉗㉘㉙㉚㉛㉜㉝㉞㉟㊱㊲㊳㊴㊵㊶㊷㊸㊹㊺㊻㊼㊽㊾㊿"
    return circled[value - 1] if value <= 50 else f'({value})'
def process_schedule_df(df, base_date, split_time_str, time_adjustment_minutes=0):
    """Derive the LED-screen schedule and the two end-time print lists.

    Parameters
    ----------
    df : DataFrame with Hall/StartTime/EndTime/Movie columns (HH:MM strings).
    base_date : the calendar date of the schedule (overnight shows roll over).
    split_time_str : datetime.time splitting the end-time list into two parts.
    time_adjustment_minutes : minutes subtracted from every end time when > 0.

    Returns (led_schedule_df, times_part1_df, times_part2_df); any element may
    be None when its branch fails (the error is shown via Streamlit).
    """
    if df is None or df.empty:
        return None, None, None

    # --- LED screen table: merge consecutive showings of the same movie per hall ---
    led_df = df.copy()
    try:
        # Prefer an explicit "X号" token; otherwise first character + '号'.
        extracted = led_df['Hall'].astype(str).str.extract(r'(\d+号)')
        fallback = led_df['Hall'].astype(str).str[0] + '号'
        led_df['Hall'] = extracted[0].fillna(fallback)

        def _anchor_to_base_date(series):
            parsed = pd.to_datetime(series, format='%H:%M', errors='coerce')
            return parsed.apply(
                lambda t: t.replace(year=base_date.year, month=base_date.month,
                                    day=base_date.day) if pd.notnull(t) else t)

        led_df['StartTime_dt'] = _anchor_to_base_date(led_df['StartTime'])
        led_df['EndTime_dt'] = _anchor_to_base_date(led_df['EndTime'])
        # A show ending "before" it starts wraps past midnight.
        led_df.loc[led_df['EndTime_dt'] < led_df['StartTime_dt'], 'EndTime_dt'] += timedelta(days=1)
        led_df = led_df.sort_values(['Hall', 'StartTime_dt'])

        merged_rows = []
        for _, hall_group in led_df.groupby('Hall'):
            pending = None
            for _, show in hall_group.sort_values('StartTime_dt').iterrows():
                if pending is None:
                    pending = show.copy()
                elif show['Movie'] == pending['Movie']:
                    # Same movie back-to-back: extend the merged run's end time.
                    pending['EndTime_dt'] = show['EndTime_dt']
                else:
                    merged_rows.append(pending)
                    pending = show.copy()
            if pending is not None:
                merged_rows.append(pending)

        merged_df = pd.DataFrame(merged_rows)
        # Each merged block is shifted: start 10 min earlier, end 5 min earlier.
        merged_df['StartTime_dt'] -= timedelta(minutes=10)
        merged_df['EndTime_dt'] -= timedelta(minutes=5)
        merged_df['Seq'] = merged_df.groupby('Hall').cumcount() + 1
        merged_df['StartTime_str'] = merged_df['StartTime_dt'].dt.strftime('%H:%M')
        merged_df['EndTime_str'] = merged_df['EndTime_dt'].dt.strftime('%H:%M')
        led_schedule_df = merged_df[['Hall', 'Seq', 'Movie', 'StartTime_str', 'EndTime_str']]
    except Exception as e:
        st.error(f"处理 'LED 屏打印表' 数据时出错: {e}")
        led_schedule_df = None

    # --- End-of-show time sheets ---
    times_df = df.copy()
    try:
        # Prefer the digits of the hall name; otherwise its first character.
        num_part = times_df['Hall'].str.extract(r'(\d+)')[0]
        char_part = times_df['Hall'].astype(str).str[0]
        times_df['Hall'] = num_part.fillna(char_part)
        times_df.dropna(subset=['Hall', 'StartTime', 'EndTime'], inplace=True)
        times_df['StartTime_dt'] = pd.to_datetime(times_df['StartTime'], format='%H:%M', errors='coerce').apply(
            lambda t: datetime.combine(base_date, t.time()) if pd.notnull(t) else pd.NaT)
        times_df['EndTime_dt'] = pd.to_datetime(times_df['EndTime'], format='%H:%M', errors='coerce').apply(
            lambda t: datetime.combine(base_date, t.time()) if pd.notnull(t) else pd.NaT)
        times_df.loc[times_df['EndTime_dt'] < times_df['StartTime_dt'], 'EndTime_dt'] += timedelta(days=1)

        # Optional early-call adjustment.
        if time_adjustment_minutes > 0:
            times_df['EndTime_dt'] -= timedelta(minutes=time_adjustment_minutes)

        # Keep only end times inside business hours (the window may span midnight).
        business_start_dt = datetime.combine(base_date, datetime.strptime(BUSINESS_START, "%H:%M").time())
        business_end_dt = datetime.combine(base_date, datetime.strptime(BUSINESS_END, "%H:%M").time())
        if business_end_dt < business_start_dt:
            business_end_dt += timedelta(days=1)
        in_hours = (times_df['EndTime_dt'] >= business_start_dt) & (times_df['EndTime_dt'] <= business_end_dt)
        times_df = times_df[in_hours].sort_values('EndTime_dt')

        split_dt = datetime.combine(base_date, split_time_str)
        part1 = times_df[times_df['EndTime_dt'] <= split_dt].copy()
        part2 = times_df[times_df['EndTime_dt'] > split_dt].copy()

        # %H:%M guarantees two-digit hours.
        part1['EndTime'] = part1['EndTime_dt'].dt.strftime('%H:%M')
        part2['EndTime'] = part2['EndTime_dt'].dt.strftime('%H:%M')

        times_part1_df = part1[['Hall', 'EndTime']]
        times_part2_df = part2[['Hall', 'EndTime']]
    except Exception as e:
        st.error(f"处理 '散场时间表' 数据时出错: {e}")
        times_part1_df, times_part2_df = None, None

    return led_schedule_df, times_part1_df, times_part2_df


def process_file_upload(file, split_time_str, time_adjustment_minutes=0):
    """Read an uploaded '放映时间核对表' Excel file and run process_schedule_df on it.

    Returns (led_data, times_part1, times_part2, date_str); the first three are
    None when the sheet cannot be parsed.
    """
    try:
        # The schedule date lives in a fixed cell (row 8, column D).
        date_cell = pd.read_excel(file, header=None, skiprows=7, nrows=1, usecols=[3])
        date_str = pd.to_datetime(date_cell.iloc[0, 0]).strftime('%Y-%m-%d')
        base_date = pd.to_datetime(date_str).date()
    except Exception:
        # Fall back to today when the date cell is missing or unparsable.
        date_str = datetime.today().strftime('%Y-%m-%d')
        base_date = datetime.today().date()

    try:
        df = pd.read_excel(file, header=9, usecols=[1, 2, 4, 5])
        df.columns = ['Hall', 'StartTime', 'EndTime', 'Movie']
        df['Hall'] = df['Hall'].ffill()  # hall name appears only on its first row
        df.dropna(subset=['StartTime', 'EndTime', 'Movie'], inplace=True)
    except Exception as e:
        st.error(f"读取数据时出错: {e}。请检查文件格式是否为'放映时间核对表'。")
        return None, None, None, date_str

    led_data, times_p1, times_p2 = process_schedule_df(df, base_date, split_time_str, time_adjustment_minutes)
    return led_data, times_p1, times_p2, date_str
def create_print_layout_led(data, date_str, font_path, generate_png=False):
    """Render the LED-screen schedule as an A4 PDF (and optionally PNG).

    Returns a dict of data-URIs ({'pdf': ..., 'png': ...}) or None when there
    is nothing to print.
    """
    if data is None or data.empty:
        return None

    page_w, page_h = 8.27, 11.69  # A4 portrait, inches
    dpi = 300
    # At least 25 content rows; +2 leaves a header and a footer band.
    layout_rows = max(len(data), 25)
    row_h = page_h / (layout_rows + 2)

    data = data.reset_index(drop=True)
    data['hall_str'] = '$' + data['Hall'].str.replace('号', '') + '^{\\#}$'
    data['seq_str'] = data['Seq'].apply(format_seq)
    data['pinyin_abbr'] = data['Movie'].apply(get_pinyin_abbr)
    data['time_str'] = data['StartTime_str'] + ' - ' + data['EndTime_str']

    # A throw-away figure supplies a renderer for text measurement.
    measure_fig = plt.figure(figsize=(page_w, page_h), dpi=dpi)
    measure_renderer = measure_fig.canvas.get_renderer()
    base_font_pt = (row_h * 0.9) * 72
    seq_font_pt = (row_h * 0.5) * 72

    def measure_col_width(series, font_pt, is_math=False):
        # Width (inches) of the column's longest string, with 10% slack.
        if series.empty:
            return 0
        prop = get_font_properties(font_path, font_pt)
        widest = str(series.loc[series.astype(str).str.len().idxmax()])
        w_px, _, _ = measure_renderer.get_text_width_height_descent(widest, prop, ismath=is_math)
        return (w_px / dpi) * 1.1

    margin_w = row_h
    widths = {
        'hall': measure_col_width(data['hall_str'], base_font_pt, is_math=True),
        'seq': measure_col_width(data['seq_str'], seq_font_pt),
        'pinyin': measure_col_width(data['pinyin_abbr'], base_font_pt),
        'time': measure_col_width(data['time_str'], base_font_pt),
    }
    # The movie column absorbs whatever width remains.
    widths['movie'] = page_w - (margin_w * 2 + widths['hall'] + widths['seq'] +
                                widths['pinyin'] + widths['time'])
    plt.close(measure_fig)

    x_starts = {}
    cursor = margin_w
    for key in ('hall', 'seq', 'movie', 'pinyin', 'time'):
        x_starts[key] = cursor
        cursor += widths[key]

    def paint(fig, ax):
        renderer = fig.canvas.get_renderer()
        # Dotted vertical separators between columns.
        for key in ('hall', 'seq', 'movie', 'pinyin'):
            x = x_starts[key] + widths[key]
            ax.add_line(Line2D([x, x], [row_h, page_h - row_h],
                               color='gray', linestyle=':', linewidth=0.5))
        previous_hall = None
        for idx, rec in data.iterrows():
            y_bottom = page_h - (idx + 2) * row_h
            y_mid = y_bottom + row_h / 2
            if rec['Hall'] != previous_hall:
                # Print the hall label only on its first row.
                ax.text(x_starts['hall'] + widths['hall'] / 2, y_mid, rec['hall_str'],
                        fontproperties=get_font_properties(font_path, base_font_pt),
                        ha='center', va='center')
                previous_hall = rec['Hall']
            ax.text(x_starts['seq'] + widths['seq'] / 2, y_mid, rec['seq_str'],
                    fontproperties=get_font_properties(font_path, seq_font_pt),
                    ha='center', va='center')
            ax.text(x_starts['pinyin'] + widths['pinyin'] / 2, y_mid, rec['pinyin_abbr'],
                    fontproperties=get_font_properties(font_path, base_font_pt),
                    ha='center', va='center')
            ax.text(x_starts['time'] + widths['time'] / 2, y_mid, rec['time_str'],
                    fontproperties=get_font_properties(font_path, base_font_pt),
                    ha='center', va='center')
            # Shrink the movie title so it never exceeds 90% of its column.
            title_pt = base_font_pt
            title_prop = get_font_properties(font_path, title_pt)
            w_px, _, _ = renderer.get_text_width_height_descent(rec['Movie'], title_prop, ismath=False)
            w_in = w_px / dpi
            max_w_in = widths['movie'] * 0.9
            if w_in > max_w_in:
                title_pt *= (max_w_in / w_in)
                title_prop = get_font_properties(font_path, title_pt)
            ax.text(x_starts['movie'] + 0.05, y_mid, rec['Movie'], fontproperties=title_prop,
                    ha='left', va='center')
            # Solid black rule after a hall's last show, dotted gray otherwise.
            last_of_hall = (idx == len(data) - 1) or (rec['Hall'] != data.loc[idx + 1, 'Hall'])
            if last_of_hall:
                ax.add_line(Line2D([margin_w, page_w - margin_w], [y_bottom, y_bottom],
                                   color='black', linestyle='-', linewidth=0.8))
            else:
                ax.add_line(Line2D([margin_w, page_w - margin_w], [y_bottom, y_bottom],
                                   color='gray', linestyle=':', linewidth=0.5))

    outputs = {}
    fig = plt.figure(figsize=(page_w, page_h), dpi=300)
    ax = fig.add_axes([0, 0, 1, 1])
    ax.set_axis_off()
    ax.set_xlim(0, page_w)
    ax.set_ylim(0, page_h)
    ax.text(margin_w, page_h - row_h, date_str,
            fontproperties=get_font_properties(font_path, 10),
            color=DATE_COLOR, ha='left', va='bottom', transform=ax.transData)
    paint(fig, ax)
    pdf_buf = io.BytesIO()
    fig.savefig(pdf_buf, format='pdf', dpi=dpi, bbox_inches='tight', pad_inches=0)
    pdf_buf.seek(0)
    outputs['pdf'] = f"data:application/pdf;base64,{base64.b64encode(pdf_buf.getvalue()).decode()}"
    if generate_png:
        png_buf = io.BytesIO()
        fig.savefig(png_buf, format='png', dpi=dpi, bbox_inches='tight', pad_inches=0)
        png_buf.seek(0)
        outputs['png'] = f"data:image/png;base64,{base64.b64encode(png_buf.getvalue()).decode()}"
    plt.close(fig)
    return outputs


def create_print_layout_times(data, title, date_str, font_path, size_multiplier=1.1, hall_format='Default',
                              generate_png=False):
    """Render an A5 end-of-show sheet: one (hall, end time) pair per grid cell.

    Returns a dict of data-URIs ({'pdf': ..., 'png': ...}) or None when there
    is nothing to print.
    """
    if data is None or data.empty:
        return None

    def build_figure():
        total = len(data)
        grid_rows = math.ceil(total / NUM_COLS) if total > 0 else 1
        cell_w_in = A5_WIDTH_IN / NUM_COLS
        cell_h_in = A5_HEIGHT_IN / grid_rows
        target_w_pt = (cell_w_in * 0.9) * 72
        target_h_pt = (cell_h_in * 0.9) * 72
        # Width-based sizing assumes ~8 characters at 0.6 em each.
        width_limited = target_w_pt / (8 * 0.6)
        font_size = min(width_limited, target_h_pt) * size_multiplier

        fig = plt.figure(figsize=(A5_WIDTH_IN, A5_HEIGHT_IN), dpi=300)
        fig.subplots_adjust(left=0, right=1, top=1, bottom=0)
        grid = gridspec.GridSpec(grid_rows, NUM_COLS, hspace=0, wspace=0, figure=fig)

        entries = data.values.tolist()
        while len(entries) % NUM_COLS != 0:
            entries.append(['', ''])
        rows_per_column = math.ceil(len(entries) / NUM_COLS)
        # Re-order so entries run down each column instead of across rows.
        placed = [['', '']] * len(entries)
        for pos, entry in enumerate(entries):
            if entry[0] and entry[1]:
                row_in_col, col_idx = pos % rows_per_column, pos // rows_per_column
                target = row_in_col * NUM_COLS + col_idx
                if target < len(placed):
                    placed[target] = entry

        date_label_pending = True
        for cell_idx, (hall, end_time) in enumerate(placed):
            if not (hall and end_time):
                continue
            grid_r, grid_c = divmod(cell_idx, NUM_COLS)
            ax = fig.add_subplot(grid[grid_r, grid_c])
            for spine in ax.spines.values():
                spine.set_visible(True)
                spine.set_linestyle((0, (1, 2)))
                spine.set_color(BORDER_COLOR)
                spine.set_linewidth(0.75)
            if date_label_pending:
                # Stamp the date/title once, in the first populated cell.
                ax.text(0.05, 0.95, f"{date_str} {title}",
                        fontproperties=get_font_properties(font_path, font_size * 0.5),
                        color=DATE_COLOR, ha='left', va='top', transform=ax.transAxes)
                date_label_pending = False
            if hall_format == 'Superscript':
                cell_text = f'${str(hall)}^{{\\#}}$ {end_time}'
            elif hall_format == 'Circled':
                cell_text = f'{format_seq(hall)} {end_time}'
            else:  # Default
                cell_text = f"{str(hall)} {end_time}"
            ax.text(0.5, 0.5, cell_text,
                    fontproperties=get_font_properties(font_path, font_size),
                    ha='center', va='center', transform=ax.transAxes)
            ax.set_xticks([])
            ax.set_yticks([])
            ax.set_facecolor('none')
        return fig

    fig_for_output = build_figure()
    outputs = {}
    pdf_buffer = io.BytesIO()
    with PdfPages(pdf_buffer) as pdf:
        pdf.savefig(fig_for_output)
    pdf_buffer.seek(0)
    outputs['pdf'] = f"data:application/pdf;base64,{base64.b64encode(pdf_buffer.getvalue()).decode()}"
    if generate_png:
        png_buffer = io.BytesIO()
        fig_for_output.savefig(png_buffer, format='png')
        png_buffer.seek(0)
        outputs['png'] = f'data:image/png;base64,{base64.b64encode(png_buffer.getvalue()).decode()}'
    plt.close(fig_for_output)
    return outputs


def display_pdf(base64_pdf):
    """Return the HTML snippet used to embed a PDF in Streamlit.

    NOTE(review): the f-string body is empty in the source as seen here — the
    iframe markup appears to have been lost in transit; confirm against the
    original file. Behavior is preserved as-is.
    """
    return f''
# --- 3. TMS 及天气查询模块 ---
@st.cache_data(show_spinner=False, ttl=600)
def fetch_and_process_server_movies(priority_movie_titles=None):
    """Fetch the TMS server DCP content list and regroup it.

    Returns ``(by_hall, movie_list)``: a hall-name -> entry-list mapping sorted
    by hall name, and a flat list of named assets with entries matching
    ``priority_movie_titles`` sorted to the front.
    """
    if priority_movie_titles is None:
        priority_movie_titles = []

    app_secret = os.getenv("TMS_APP_SECRET")
    ticket = os.getenv("TMS_TICKET")
    theater_id_str = os.getenv("TMS_THEATER_ID")
    x_session_id = os.getenv("TMS_X_SESSION_ID")

    try:
        theater_id = int(theater_id_str) if theater_id_str else 0
    except ValueError:
        st.error("环境变量 TMS_THEATER_ID 格式错误,应为数字。")
        return {}, []

    token_headers = {
        'Host': 'oa.hengdianfilm.com:7080', 'Content-Type': 'application/json',
        'Origin': 'http://115.239.253.233:7080', 'Connection': 'keep-alive',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_5_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/138.0.7204.156 Mobile/15E148 Safari/604.1',
        'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
    }
    token_json_data = {'appId': 'hd', 'appSecret': app_secret, 'timeStamp': int(time.time() * 1000)}
    token_url = f'http://oa.hengdianfilm.com:7080/cinema-api/admin/generateToken?token=hd&murl=?token=hd&murl=ticket={ticket}'

    response = requests.post(token_url, headers=token_headers, json=token_json_data, timeout=10)
    response.raise_for_status()
    token_payload = response.json()
    if token_payload.get('error_code') != '0000':
        raise Exception(f"获取Token失败: {token_payload.get('error_desc')}")
    auth_token = token_payload['param']

    # Page through the server DCP list until COUNT entries are collected.
    all_movies = []
    page_index = 1
    while True:
        list_headers = {
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'Content-Type': 'application/json; charset=UTF-8',
            'Origin': 'http://115.239.253.233:7080', 'Proxy-Connection': 'keep-alive',
            'Token': auth_token,
            'User-Agent': 'Mozilla/5.0 ...',
            'X-SESSIONID': x_session_id,
        }
        list_params = {'token': 'hd', 'murl': 'ContentMovie'}
        list_json_data = {'THEATER_ID': theater_id, 'SOURCE': 'SERVER', 'ASSERT_TYPE': 2,
                          'PAGE_CAPACITY': 20, 'PAGE_INDEX': page_index}

        list_url = 'http://oa.hengdianfilm.com:7080/cinema-api/cinema/server/dcp/list'
        response = requests.post(list_url, params=list_params, headers=list_headers,
                                 json=list_json_data, verify=False)
        response.raise_for_status()
        movie_data = response.json()
        if movie_data.get("RSPCD") != "000000":
            raise Exception(f"获取影片列表失败: {movie_data.get('RSPMSG')}")
        body = movie_data.get("BODY", {})
        movies_on_page = body.get("LIST", [])
        if not movies_on_page:
            break
        all_movies.extend(movies_on_page)
        if len(all_movies) >= body.get("COUNT", 0):
            break
        page_index += 1
        time.sleep(1)  # throttle between pages

    # Index by content name (later duplicates overwrite earlier ones).
    movie_details = {}
    for movie in all_movies:
        content_name = movie.get('CONTENT_NAME')
        if not content_name:
            continue
        movie_details[content_name] = {
            'assert_name': movie.get('ASSERT_NAME'),
            'halls': sorted([h.get('HALL_NAME') for h in movie.get('HALL_INFO', [])]),
            'play_time': movie.get('PLAY_TIME'),
        }

    by_hall = defaultdict(list)
    for content_name, details in movie_details.items():
        for hall_name in details['halls']:
            by_hall[hall_name].append({'content_name': content_name, 'details': details})
    for hall_name in by_hall:
        # Entries without an asset name sink to the bottom of each hall's list.
        by_hall[hall_name].sort(
            key=lambda item: (item['details']['assert_name'] is None or item['details']['assert_name'] == '',
                              item['details']['assert_name'] or item['content_name']))

    named_assets = [{'assert_name': d['assert_name'], 'content_name': c, 'halls': d['halls'],
                     'play_time': d['play_time']}
                    for c, d in movie_details.items() if d.get('assert_name')]
    priority_list = [item for item in named_assets
                     if any(p_title in item['assert_name'] for p_title in priority_movie_titles)]
    other_list_items = [item for item in named_assets if item not in priority_list]
    priority_list.sort(key=lambda x: x['assert_name'])
    other_list_items.sort(key=lambda x: x['assert_name'])
    return dict(sorted(by_hall.items())), priority_list + other_list_items
def get_weather_forecast(target_date):
    """Build the daily headline with AMap weather for *target_date*.

    Queries the AMap forecast API (extensions='all') for the configured
    ADCODE and picks the cast matching *target_date*.

    :param target_date: a date object, or falsy for a generic headline.
    :return: "今日放映影片(date,weekday,weather,hi/lo)" on success; a
        "天气未知" variant when the date is missing from the forecast or
        the request fails.
    """
    if not target_date:
        return "当日放映影片"
    url = "https://restapi.amap.com/v3/weather/weatherInfo"
    params = {'key': GAODE_API_KEY, 'city': ADCODE, 'extensions': 'all', 'output': 'JSON'}
    try:
        response = requests.get(url, params=params, timeout=5)
        response.raise_for_status()
        data = response.json()
        if data.get('status') == '1' and data.get('forecasts'):
            target_date_str = target_date.strftime('%Y-%m-%d')
            for day_cast in data['forecasts'][0].get('casts', []):
                if day_cast.get('date') == target_date_str:
                    # AMap encodes the weekday as '1'..'7' (Mon..Sun).
                    weekday_map = {'1': '一', '2': '二', '3': '三', '4': '四', '5': '五', '6': '六', '7': '日'}
                    week = weekday_map.get(day_cast.get('week'), '')
                    weather = day_cast.get('dayweather', '未知')
                    max_temp = day_cast.get('daytemp', 'N/A')
                    min_temp = day_cast.get('nighttemp', 'N/A')
                    return f"今日放映影片({target_date_str},星期{week},{weather},{max_temp}℃ / {min_temp}℃)"
    except Exception as e:
        # Best-effort feature: log and fall through to the "unknown" headline.
        print(f"天气 API 请求失败: {e}")
    return f"今日放映影片({target_date.strftime('%Y-%m-%d')},天气未知)"


def get_circled_number(hall_name):
    """Map a hall name to its circled-numeral glyph.

    Extracts the digits from *hall_name* (e.g. "3号厅" -> "3") and returns the
    matching enclosed numeral. Generalized from the old ①-⑨ lookup table to
    the full contiguous Unicode range ①-⑳ (U+2460..U+2473), so halls 10-20
    now render instead of silently mapping to ''.

    :param hall_name: hall display name, e.g. "10号厅".
    :return: circled numeral, or '' when the name has no digits or the
        number falls outside 1-20 (same fallback as before).
    """
    num_str = ''.join(filter(str.isdigit, hall_name))
    if not num_str:
        return ''
    num = int(num_str)
    # U+2460 is '①'; the enclosed numbers 1-20 are contiguous code points.
    return chr(0x2460 + num - 1) if 1 <= num <= 20 else ''


def format_play_time(time_str):
    """Convert an "HH:MM[:SS]" runtime string to whole minutes.

    :param time_str: e.g. "01:45" or "01:45:00"; seconds are ignored.
    :return: minutes as int, or None for non-strings / unparseable input.
    """
    if not time_str or not isinstance(time_str, str):
        return None
    try:
        parts = time_str.split(':')
        hours = int(parts[0])
        minutes = int(parts[1])
        return hours * 60 + minutes
    except (ValueError, IndexError):
        return None


def add_tms_locations_to_analysis(analysis_df, tms_movie_list):
    """Append a '影片所在影厅位置' column to an analysis DataFrame.

    For every film row, collects all TMS versions whose display name starts
    with the film title and renders them as "version:①②|..." (version prefix
    omitted when the display name equals the title).

    :param analysis_df: DataFrame with an '影片' column; mutated in place.
    :param tms_movie_list: version dicts from fetch_and_process_server_movies.
    :return: the same DataFrame, for chaining.
    """
    locations = []
    for _, row in analysis_df.iterrows():
        movie_title = row['影片']
        found_versions = []
        for tms_movie in tms_movie_list:
            if tms_movie['assert_name'].startswith(movie_title):
                # Whatever follows the title is the version label, e.g. "(国语 2D)".
                version_name = tms_movie['assert_name'].replace(movie_title, '').strip()
                circled_halls = " ".join(sorted([get_circled_number(h) for h in tms_movie['halls']]))
                found_versions.append(f"{version_name}:{circled_halls}" if version_name else circled_halls)
        locations.append('|'.join(found_versions))
    analysis_df['影片所在影厅位置'] = locations
    return analysis_df


# --- 4. Business-data API helpers ---
def fetch_income_data(token, date_str):
    """Fetch one day's ticket income, concession income and concession share.

    :param token: BI session token.
    :param date_str: 'YYYY-MM-DD', used as both range start and end.
    :return: dict with ticket_income / goods_income / sold_incomes_zb floats,
        or None when the request fails or returns no results.
    """
    url = 'https://app.bi.piao51.cn/cinema-app/mycinema/incomeProportion.action'
    params = {'cinemaId': CINEMA_ID, 'token': token, 'qTimeStart': date_str, 'qTimeEnd': date_str}
    headers = {'Host': 'app.bi.piao51.cn', 'X-Requested-With': 'XMLHttpRequest', 'User-Agent': 'Mozilla/5.0'}
    try:
        response = requests.get(url, params=params, headers=headers, timeout=10)
        response.raise_for_status()
        data = response.json()
        if data.get('code') == 'A00000' and data.get('results'):
            day_data = data['results'][0]
            # The API may return None for missing figures; coerce to 0.
            return {
                "ticket_income": float(day_data.get('ticketIncome') or 0),
                "goods_income": float(day_data.get('goodsIncome') or 0),
                "sold_incomes_zb": float(day_data.get('soldIncomesZb') or 0)
            }
    except Exception as e:
        st.warning(f"获取经营收入数据失败: {e}")
    return None


def fetch_membership_data(token, date_str):
    """Fetch the number of 文旅消费卡 cards issued on *date_str*.

    :return: issued-card count as int; 0 when the card type is absent or the
        request fails.
    """
    url = 'https://app.bi.piao51.cn/cinema-app/mycinema/membership.action'
    params = {'token': token, 'cinemaId': CINEMA_ID, 'startDate': date_str, 'endDate': date_str}
    headers = {'Host': 'app.bi.piao51.cn', 'X-Requested-With': 'XMLHttpRequest', 'User-Agent': 'Mozilla/5.0'}
    try:
        response = requests.get(url, params=params, headers=headers, timeout=10)
        response.raise_for_status()
        data = response.json()
        if data.get('code') == 'A00000' and data.get('results'):
            for card_type in data['results']:
                if card_type.get('cardLevelName') == '文旅消费卡':
                    return int(card_type.get('sendCardNums', 0))
            return 0  # card type not present in results
    except Exception as e:
        st.warning(f"获取会员开卡数据失败: {e}")
    return 0


# --- 6.
schedule sanity-check log generator ---
def generate_schedule_check_logs(schedule_list, date_str):
    """
    Run a battery of rule-based sanity checks over one day's schedule.
    :param schedule_list: schedule rows (list of dicts with hallName,
        movieName, showStartTime, showEndTime)
    :param date_str: schedule date string (YYYY-MM-DD)
    :return: formatted report text (str), one numbered section per rule
    """
    if not schedule_list:
        return "无排片数据,无法进行合理性检查。"

    # Work on a DataFrame with normalized column names.
    df_original = pd.DataFrame(schedule_list)
    df_original.rename(
        columns={'hallName': 'Hall', 'movieName': 'filmName', 'showStartTime': 'StartTime', 'showEndTime': 'EndTime'},
        inplace=True)

    # Preprocess: parse HH:MM strings into datetimes anchored on date_str.
    df_original['startTime'] = pd.to_datetime(df_original['StartTime'], format='%H:%M', errors='coerce').apply(
        lambda t: datetime.combine(datetime.strptime(date_str, '%Y-%m-%d').date(), t.time()) if pd.notnull(
            t) else pd.NaT)
    df_original['endTime'] = pd.to_datetime(df_original['EndTime'], format='%H:%M', errors='coerce').apply(
        lambda t: datetime.combine(datetime.strptime(date_str, '%Y-%m-%d').date(), t.time()) if pd.notnull(
            t) else pd.NaT)
    # Sessions ending "before" they start ran past midnight: roll end forward.
    df_original.loc[df_original['endTime'] < df_original['startTime'], 'endTime'] += timedelta(days=1)

    # Shorten hall names to their numeric tag (fallback: first two chars).
    def simplify_hall(name):
        import re
        match = re.search(r'(\d+号?)', str(name))
        return match.group(1) if match else str(name)[:2]

    df_original['simpleHallName'] = df_original['Hall'].apply(simplify_hall)

    df_check = df_original.sort_values(by='startTime').reset_index(drop=True)
    final_log_parts = []

    # --- Rule 1: same film scheduled less than 30 minutes apart ---
    logs_r1 = []
    for film_name in df_check['filmName'].unique():
        film_schedules = df_check[df_check['filmName'] == film_name].sort_values(by='startTime').reset_index()
        if len(film_schedules) > 1:
            for i in range(len(film_schedules) - 1):
                s1, s2 = film_schedules.iloc[i], film_schedules.iloc[i + 1]
                interval = (s2['startTime'] - s1['startTime']).total_seconds() / 60
                if interval < 30:
                    log_entry = f"《{s1['filmName']}》{s1['simpleHallName']}【{s1['startTime'].strftime('%H:%M')}】和 {s2['simpleHallName']}【{s2['startTime'].strftime('%H:%M')}】开场时间距离 {int(interval)} 分钟"
                    logs_r1.append(log_entry)

    final_log_parts.append("规则一:同影片场次间隔过近(少于 30 分钟)")
    if logs_r1:
        for i, log in enumerate(logs_r1, 1):
            final_log_parts.append(f"{i}. {log}")
    else:
        final_log_parts.append("(无)")

    # --- Rule 2: more than 4 sessions starting inside one 30-minute window ---
    logs_r2 = []
    i = 0
    processed_indices_r2 = set()
    while i < len(df_check):
        # Skip sessions already reported as part of an earlier window.
        if i in processed_indices_r2:
            i += 1
            continue
        window_start_time = df_check.iloc[i]['startTime']
        window_end_time_30min = window_start_time + timedelta(minutes=30)
        window_df = df_check[
            (df_check['startTime'] >= window_start_time) & (df_check['startTime'] < window_end_time_30min)]

        if len(window_df) > 4:
            start_t_str = window_df.iloc[0]['startTime'].strftime('%H:%M')
            end_t_str = window_df.iloc[-1]['startTime'].strftime('%H:%M')
            log_message_lines = [f"【{start_t_str} - {end_t_str}】开场时间比较集中:"]
            for _, row in window_df.iterrows():
                log_message_lines.append(
                    f"    {row['simpleHallName']}《{row['filmName']}》> {row['startTime'].strftime('%H:%M')}")
                processed_indices_r2.add(row.name)
            logs_r2.append("\n".join(log_message_lines))
        i += 1

    final_log_parts.append("\n规则二:30 分钟内影片开场超过 4 场")
    if logs_r2:
        for i, log in enumerate(logs_r2, 1):
            final_log_parts.append(f"{i}. {log}")
    else:
        final_log_parts.append("(无)")

    # --- Rule 3: more than 30 minutes between consecutive session starts ---
    logs_r3 = []
    if len(df_check) > 1:
        for i in range(len(df_check) - 1):
            s1_start, s2_start = df_check.iloc[i]['startTime'], df_check.iloc[i + 1]['startTime']
            gap = (s2_start - s1_start).total_seconds() / 60
            if gap > 30:
                log_entry = f"【{s1_start.strftime('%H:%M')} ~ {s2_start.strftime('%H:%M')}】缺少影片开场,间隔 {int(gap)} 分钟"
                logs_r3.append(log_entry)

    final_log_parts.append("\n规则三:场次开场间隔超过 30 分钟")
    if logs_r3:
        for i, log in enumerate(logs_r3, 1):
            final_log_parts.append(f"{i}. {log}")
    else:
        final_log_parts.append("(无)")

    # --- Rule 4: earliest session after 10:00 / latest session before 22:30 ---
    logs_r4 = []
    if not df_check.empty:
        first_sched = df_check.iloc[0]
        last_sched = df_check.iloc[-1]
        if first_sched['startTime'].time() > dt_time(10, 0):
            logs_r4.append(
                f"最早一场 {first_sched['simpleHallName']}《{first_sched['filmName']}》{first_sched['startTime'].strftime('%H:%M')} 晚于 10:00")
        if last_sched['startTime'].time() < dt_time(22, 30):
            logs_r4.append(
                f"最晚一场 {last_sched['simpleHallName']}《{last_sched['filmName']}》{last_sched['startTime'].strftime('%H:%M')} 早于 22:30")

    final_log_parts.append("\n规则四:最早一场晚于 10:00,最晚一场早于 22:30")
    if logs_r4:
        for i, log in enumerate(logs_r4, 1):
            final_log_parts.append(f"{i}. {log}")
    else:
        final_log_parts.append("(无)")

    # --- Rule 5: hall idle for over 1 hour inside the 10:00-23:00 window ---
    logs_r5 = []
    today_date = datetime.strptime(date_str, '%Y-%m-%d').date()
    window_start_time_limit_r5 = datetime.combine(today_date, dt_time(10, 0))
    window_end_time_limit_r5 = datetime.combine(today_date, dt_time(23, 0))

    unique_halls_r5 = df_original['simpleHallName'].unique()
    for hall_name in unique_halls_r5:
        hall_df = df_original[df_original['simpleHallName'] == hall_name].sort_values(by='startTime')
        if len(hall_df) > 1:
            for i in range(len(hall_df) - 1):
                prev_sched_end = hall_df.iloc[i]['endTime']
                curr_sched_start = hall_df.iloc[i + 1]['startTime']
                # Only gaps overlapping the operating window are of interest.
                if prev_sched_end < window_end_time_limit_r5 and curr_sched_start > window_start_time_limit_r5:
                    idle_duration_minutes = (curr_sched_start - prev_sched_end).total_seconds() / 60
                    if idle_duration_minutes > 60:
                        log_entry = f"{hall_name} 【{prev_sched_end.strftime('%H:%M')} ~ {curr_sched_start.strftime('%H:%M')}】无影片在播,时长 {int(idle_duration_minutes)} 分钟"
                        logs_r5.append(log_entry)

    final_log_parts.append("\n规则五:影厅空闲时间超过 1 小时(10:00-23:00)")
    if logs_r5:
        for i, log in enumerate(logs_r5, 1):
            final_log_parts.append(f"{i}. {log}")
    else:
        final_log_parts.append("(无)")

    # --- Rule 6: hall turnaround (cleaning) time below 10 minutes ---
    logs_r6 = []
    for hall_name in df_original['simpleHallName'].unique():
        hall_df = df_original[df_original['simpleHallName'] == hall_name].sort_values(by='startTime')
        if len(hall_df) > 1:
            for i in range(len(hall_df) - 1):
                prev_sched = hall_df.iloc[i]
                next_sched = hall_df.iloc[i + 1]
                conversion_time = (next_sched['startTime'] - prev_sched['endTime']).total_seconds() / 60
                if conversion_time < 10:
                    logs_r6.append(
                        f"{hall_name} {prev_sched['endTime'].strftime('%H:%M')} 《{prev_sched['filmName']}》结束后影厅空闲时间仅为 {int(conversion_time)} 分钟")

    final_log_parts.append("\n规则六:影厅场次转换时间检查")
    if logs_r6:
        for i, log in enumerate(logs_r6, 1):
            final_log_parts.append(f"{i}. {log}")
    else:
        final_log_parts.append("(无)")

    # --- Rule 7: dynamic crowd-peak warning (exits + entries per window) ---
    logs_r7 = []
    final_log_parts.append("\n规则七:动态散场和入场高峰预警")

    if not df_check.empty:
        start_time = df_check.iloc[0]['startTime'].replace(second=0, microsecond=0)
        end_time = df_check.iloc[-1]['endTime']
        current_time = start_time
        reported_windows = set()

        # Slide a 10-minute window across the day in 5-minute steps; flag any
        # window with more than 5 combined starts + ends.
        while current_time < end_time:
            window_end = current_time + timedelta(minutes=10)
            starts_in_window = df_check[(df_check['startTime'] >= current_time) & (df_check['startTime'] < window_end)]
            ends_in_window = df_check[(df_check['endTime'] > current_time) & (df_check['endTime'] <= window_end)]

            if len(starts_in_window) + len(ends_in_window) > 5:
                window_tuple = (current_time.strftime('%H:%M'), window_end.strftime('%H:%M'))
                if window_tuple not in reported_windows:
                    exit_halls = "、".join(ends_in_window['simpleHallName'])
                    entry_halls = "、".join(starts_in_window['simpleHallName'])
                    log_msg = f"【{current_time.strftime('%H:%M')} ~ {window_end.strftime('%H:%M')}】"
                    if not ends_in_window.empty:
                        log_msg += f",{exit_halls}集中散场"
                    if not starts_in_window.empty:
                        if not ends_in_window.empty:
                            log_msg += ",同时"
                        else:
                            log_msg += ","
                        log_msg += f"{entry_halls}即将入场"
                    log_msg += ",预计人流瞬时压力过大。"
                    logs_r7.append(log_msg)
                    reported_windows.add(window_tuple)
            current_time += timedelta(minutes=5)

        # Part 2: more than 3 sessions starting at exactly the same minute.
        start_groups = df_check.groupby('startTime').filter(lambda x: len(x) > 3)
        for time_val, group in start_groups.groupby('startTime'):
            halls = "、".join(group['simpleHallName'])
            logs_r7.append(f"{time_val.strftime('%H:%M')},{halls}电影同时开场,注意预计人流瞬时压力过大。")

        # Part 3: more than 3 sessions ending at exactly the same minute.
        end_groups = df_check.groupby('endTime').filter(lambda x: len(x) > 3)
        for time_val, group in end_groups.groupby('endTime'):
            halls = "、".join(group['simpleHallName'])
            logs_r7.append(f"{time_val.strftime('%H:%M')},{halls}电影同时散场,注意预计人流瞬时压力过大。")

    if logs_r7:
        for i, log in enumerate(logs_r7, 1):
            final_log_parts.append(f"{i}. {log}")
    else:
        final_log_parts.append("(无)")

    # --- Rule 8: "ghost hall" — hall stops operating too early ---
    logs_r8 = []
    final_log_parts.append("\n规则八:影厅结束运营过早预警")
    for hall_name in df_original['simpleHallName'].unique():
        last_sched = df_original[df_original['simpleHallName'] == hall_name].nlargest(1, 'endTime').iloc[0]
        # Flag only same-day endings before 22:30 (past-midnight shows are fine).
        if last_sched['endTime'].date() == today_date and last_sched['endTime'].time() < dt_time(22, 30):
            logs_r8.append(f"{hall_name} 最后一场于【{last_sched['endTime'].strftime('%H:%M')}】结束,过早停运。")
    if logs_r8:
        for i, log in enumerate(logs_r8, 1):
            final_log_parts.append(f"{i}. {log}")
    else:
        final_log_parts.append("(无)")

    # --- Rule 9: hot-film density inside the day-of-week golden hours ---
    logs_r9 = []
    final_log_parts.append("\n规则九:黄金时段热门影片排片密度检查")
    if not df_check.empty:
        weekday = today_date.weekday()
        # Two golden windows per weekday, indexed Mon(0)..Sun(6).
        golden_hours_r9 = [
            [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))],
            [(dt_time(14, 0), dt_time(15, 30)), (dt_time(19, 0), dt_time(22, 20))],
            [(dt_time(14, 30), dt_time(16, 0)), (dt_time(19, 0), dt_time(21, 40))],
            [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))],
            [(dt_time(14, 0), dt_time(15, 0)), (dt_time(19, 0), dt_time(22, 0))],
            [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))],
            [(dt_time(14, 0), dt_time(17, 0)), (dt_time(19, 0), dt_time(21, 30))]
        ][weekday]

        film_counts = df_check['filmName'].value_counts()
        if not film_counts.empty:
            max_count = film_counts.iloc[0]
            # "Hot" films: within 95% of the most-scheduled film's count.
            hot_films = film_counts[film_counts >= max_count * 0.95].index.tolist()

            golden_hour_schedules = df_check[
                df_check['startTime'].apply(lambda dt: any(start <= dt.time() < end for start, end in golden_hours_r9))]

            for film in hot_films:
                hot_film_total_in_golden = len(golden_hour_schedules[golden_hour_schedules['filmName'] == film])
                golden_total = len(golden_hour_schedules)
                if golden_total > 0:
                    ratio = hot_film_total_in_golden / golden_total
                    if ratio < 0.3:  # warn when a hot film takes <30% of golden slots
                        period_str = " 和 ".join(
                            [f"{s.strftime('%H:%M')}-{e.strftime('%H:%M')}" for s, e in golden_hours_r9])
                        logs_r9.append(f"《{film}》在核心黄金时段 {period_str} 排片占比仅为{ratio:.1%},低于 30%。")

    if logs_r9:
        for i, log in enumerate(logs_r9, 1):
            final_log_parts.append(f"{i}. {log}")
    else:
        final_log_parts.append("(无)")

    return "\n".join(final_log_parts)


def check_tms_file_availability(schedule_list, tms_data, date_str):
    """
    Cross-check the schedule against TMS server content and report halls that
    lack a file for a scheduled film (title match only, ignoring
    language/format version).
    """
    if not schedule_list:
        return ["未获取到排片数据,无法检查。"]

    if not tms_data:
        return ["未获取到 TMS 数据,无法检查。"]

    # 1. Build { hall number (digits only): [uppercased movie names] }.
    # tms_data shape: {'1号厅': [{'content_name':..., 'details': {'assert_name':...}}], ...}
    tms_map = defaultdict(list)

    import re
    def get_hall_num(name):
        # "1号厅" -> "1"; names without digits fall back to the raw name.
        nums = re.findall(r'\d+', str(name))
        return nums[0] if nums else str(name)

    for hall_name, movies in tms_data.items():
        hall_key = get_hall_num(hall_name)
        for movie in movies:
            # Collect both assert_name (display name) and content_name (file
            # name); uppercase for case-insensitive matching.
            if movie.get('details', {}).get('assert_name'):
                tms_map[hall_key].append(str(movie['details']['assert_name']).upper())
            if movie.get('content_name'):
                tms_map[hall_key].append(str(movie['content_name']).upper())

    # 2. Walk the schedule and check each (hall, film) pair once.
    missing_files_log = []
    # Cache checked combinations so a pair is reported at most once.
    checked_combinations = set()

    for item in schedule_list:
        # Field names vary between schedule sources; accept both spellings.
        hall_raw = item.get('hallName') or item.get('Hall')
        movie_raw = item.get('movieName') or item.get('Movie')

        if not hall_raw or not movie_raw:
            continue

        hall_num = get_hall_num(hall_raw)

        # Strip version suffixes down to the core title. Canonical API names
        # are deliberately not used here: pre-sale titles may not be
        # retrievable from the API yet, so a looser match is wanted.
        movie_clean = clean_movie_title(movie_raw).upper()

        combo_key = (hall_num, movie_clean)
        if combo_key in checked_combinations:
            continue
        checked_combinations.add(combo_key)

        if hall_num not in tms_map:
            # TMS returned nothing for this hall (down? network?) — skip
            # rather than raise a false missing-file alarm.
            # missing_files_log.append(f"⚠️ 影厅异常:TMS 中未找到【{hall_raw}】的数据,无法检查该厅《{movie_raw}》。")
            continue

        # Core match: the hall has the film if any TMS name contains the
        # cleaned core title, e.g. schedule "抓娃娃" matches "抓娃娃(数字2D)...".
        has_file = False
        tms_files = tms_map[hall_num]

        for tms_file in tms_files:
            if movie_clean in tms_file:
                has_file = True
                break

        if not has_file:
            missing_files_log.append(
                f"❌ 缺片警告:【{hall_raw}】排映《{movie_raw}》,但服务器未检测到包含“{movie_clean}”的文件。")

    return missing_files_log

# --- 5.
UI rendering & interaction logic ---

def display_analysis_results(df_raw, data_source_name, date_for_display, query_tms_enabled):
    """Render the full-day / prime-time efficiency tables for one day's data.

    :param df_raw: raw session DataFrame (影片名称/放映时间/座位数/总收入/总人次);
        a cleaned-title column is added in place.
    :param data_source_name: "文件" or "API" — file data gets its titles
        normalized against canonical names from the ticketing API.
    :param date_for_display: business date (date or None).
    :param query_tms_enabled: when True, join TMS hall locations into the
        tables.
    """
    if df_raw.empty:
        st.info(f"请先从 {data_source_name} 加载数据。");
        return

    if data_source_name == "文件":
        token_data = load_token()
        if not token_data:
            token_data = login_and_get_token()

        token = token_data.get('token') if token_data else None
        date_str = date_for_display.strftime('%Y-%m-%d') if date_for_display else None

        canonical_names = []
        if token and date_str:
            canonical_names = fetch_canonical_movie_names(token, date_str)

        df_raw['影片名称_清理后'] = df_raw['影片名称'].apply(lambda x: clean_movie_title(x, canonical_names))
    else:
        # API data already carries usable titles.
        df_raw['影片名称_清理后'] = df_raw['影片名称']

    date_str = f"{date_for_display} " if date_for_display else ""
    total_revenue, total_attendance, total_sessions = df_raw['总收入'].sum(), df_raw['总人次'].sum(), len(df_raw)
    st.markdown(
        f"> {date_str}数据总览:总票房 **¥{total_revenue:,.2f}** | 总人次 **{total_attendance:,.0f}** | 总场次 **{total_sessions:,.0f}**")

    format_config = {'座位数': '{:,.0f}', '场次': '{:,.0f}', '人次': '{:,.0f}', '票房': '{:,.2f}', '均价': '{:.2f}',
                     '座次比': '{:.2%}', '场次比': '{:.2%}', '票房比': '{:.2%}', '座次效率': '{:.2f}',
                     '场次效率': '{:.2f}'}
    # Prime time = sessions starting between 14:00 and 21:00.
    full_day_analysis, prime_time_analysis = process_and_analyze_data(df_raw.copy()), process_and_analyze_data(
        df_raw[df_raw['放映时间'].between(dt_time(14, 0), dt_time(21, 0))].copy())

    if query_tms_enabled:
        with st.spinner("正在关联查询 TMS 服务器..."):
            try:
                priority_titles = full_day_analysis['影片'].unique().tolist()
                _, tms_movie_list = fetch_and_process_server_movies(priority_titles)
                full_day_analysis = add_tms_locations_to_analysis(full_day_analysis, tms_movie_list)
                prime_time_analysis = add_tms_locations_to_analysis(prime_time_analysis, tms_movie_list)
                # Move the hall-location column right after the title column.
                if '影片所在影厅位置' in full_day_analysis.columns:
                    cols = full_day_analysis.columns.tolist();
                    full_day_analysis = full_day_analysis[cols[:1] + ['影片所在影厅位置'] + cols[1:-1]]
                if '影片所在影厅位置' in prime_time_analysis.columns:
                    cols = prime_time_analysis.columns.tolist();
                    prime_time_analysis = prime_time_analysis[cols[:1] + ['影片所在影厅位置'] + cols[1:-1]]
                st.toast("TMS 影片位置关联成功!", icon="🔗")
            except Exception as e:
                st.error(f"关联TMS失败: {e}")

    st.markdown("#### 全天排片效率分析");
    st.dataframe(full_day_analysis.style.format(format_config).apply(style_efficiency, axis=1),
                 use_container_width=True, hide_index=True)
    st.markdown("#### 黄金时段排片效率分析 (14:00-21:00)");
    st.dataframe(prime_time_analysis.style.format(format_config).apply(style_efficiency, axis=1),
                 use_container_width=True, hide_index=True)
    if not full_day_analysis.empty:
        st.markdown("### 排片效率汇总")
        full_day_summary = full_day_analysis.rename(
            columns={'场次': '全部场次', '座次效率': '全部座次效率', '场次效率': '全部场次效率'})
        full_day_cols_to_keep = ['影片', '票房', '全部场次', '全部座次效率', '全部场次效率']
        if '影片所在影厅位置' in full_day_summary.columns: full_day_cols_to_keep.insert(1, '影片所在影厅位置')
        full_day_summary = full_day_summary[full_day_cols_to_keep]
        prime_time_summary = prime_time_analysis.rename(
            columns={'场次': '黄金时段场次', '座次效率': '黄金时段座次效率', '场次效率': '黄金时段场次效率'})[
            ['影片', '黄金时段场次', '黄金时段座次效率', '黄金时段场次效率']]
        # Left join on title: films absent from prime time show 0s.
        summary_df = pd.merge(full_day_summary, prime_time_summary, on='影片', how='left').fillna(0)
        summary_df['黄金时段场次'] = summary_df['黄金时段场次'].astype(int)
        summary_format_config = {'票房': '{:,.2f}', '全部场次': '{:,.0f}', '黄金时段场次': '{:,.0f}',
                                 '全部座次效率': '{:.2f}', '全部场次效率': '{:.2f}', '黄金时段座次效率': '{:.2f}',
                                 '黄金时段场次效率': '{:.2f}'}
        st.dataframe(summary_df.style.format(summary_format_config).apply(style_summary_efficiency, axis=1),
                     use_container_width=True, hide_index=True)


def fetch_and_process_daily_sessions(date_str, quiet=False):
    """Fetch and process one day's schedule.

    :param date_str: 'YYYY-MM-DD'.
    :param quiet: suppress the Streamlit progress/warning messages.
    :return: tuple ``(sessions_map, total_sessions, raw_schedule)`` — session
        counts keyed by cleaned title, total count, and the raw schedule
        list; all three are None when nothing could be fetched.
    """
    if not quiet: st.write(f"正在查询 {date_str} 的排片数据...")

    token_data = load_token()
    token = token_data.get('token') if token_data else None
    if not token:
        # No cached token: log in once and retry.
        token_data = login_and_get_token()
        token = token_data.get('token') if token_data else None

    schedule, _ = get_api_data_with_token_management(date_str)

    if not schedule:
        if not quiet: st.warning(f"未能获取到 {date_str} 的排片数据。")
        return None, None, None

    total_sessions = len(schedule)
    df = pd.DataFrame(schedule)

    canonical_names = []
    if token:
        canonical_names = fetch_canonical_movie_names(token, date_str)

    df['影片名称_清理后'] = df['movieName'].apply(lambda x: clean_movie_title(x, canonical_names))

    sessions_map = df.groupby('影片名称_清理后').size().to_dict()
    return sessions_map, total_sessions, schedule


def generate_efficiency_report_df(analysis_df, next_day_sessions_map=None, next_day_total_sessions=None):
    """Build the editable efficiency report: per-film rows plus a totals row.

    When next-day session data is unavailable, today's session counts are
    reused as the 次日场数 placeholder.
    """
    if analysis_df.empty: return pd.DataFrame()
    report_df = analysis_df[
        ['影片', '座位数', '场次', '票房', '人次', '均价', '座次比', '场次比', '票房比', '座次效率', '场次效率']].copy()
    # Free-text columns filled in by hand later.
    report_df['情况说明'] = '';
    report_df['次日调整方案'] = ''
    if next_day_sessions_map is not None:
        report_df['次日场数'] = report_df['影片'].map(next_day_sessions_map).fillna(0).astype(int)
    else:
        report_df['次日场数'] = report_df['场次']
    totals = report_df[['座位数', '场次', '票房', '人次']].sum()
    totals['影片'] = ''
    totals['次日场数'] = next_day_total_sessions if next_day_total_sessions is not None else report_df['次日场数'].sum()
    report_df = pd.concat([report_df, pd.DataFrame(totals).T], ignore_index=True)
    return report_df


def generate_excel_paste_data(df):
    """Serialize the report into tab-separated rows (with Excel formulas) for
    clipboard pasting; the last row is the totals row without formulas."""
    if df.empty: return ""
    lines, total_row_num = [], len(df) + 1
    for i, row in df.iterrows():
        # +2: Excel rows are 1-based and row 1 holds the header.
        excel_row_num = i + 2
        line = [row['影片'], row['座位数'], row['场次'], row['票房'], row['人次']]
        if i == len(df) - 1:  # totals row
            line.extend(['', '', '', '', '', '', '', '', row['次日场数']])
        else:  # data row
            line.extend([f"=IFERROR(F{excel_row_num}/G{excel_row_num},0)", f"=D{excel_row_num}/D${total_row_num}",
                         f"=E{excel_row_num}/E${total_row_num}", f"=F{excel_row_num}/F${total_row_num}",
                         f"=IFERROR(K{excel_row_num}/I{excel_row_num},0)",
                         f"=IFERROR(K{excel_row_num}/J{excel_row_num},0)", row['情况说明'], row['次日调整方案'], 
row['次日场数']]) + lines.append("\t".join(map(str, line))) + return "\n".join(lines) + + +def get_business_date(df_with_datetime): + """根据包含完整日期时间列的DataFrame计算营业日""" + crossover_time = dt_time(6, 0) + df_with_datetime['business_date'] = df_with_datetime['datetime'].apply( + lambda dt: (dt - timedelta(days=1)).date() if dt.time() < crossover_time else dt.date() + ) + return df_with_datetime['business_date'].mode()[0] + + +# --- 主应用 --- +def main(): + st.title('影城工作便捷工具') + + # 初始化 session_state + if 'file_df' not in st.session_state: st.session_state.file_df, st.session_state.api_df = pd.DataFrame(), pd.DataFrame() + if 'api_date' not in st.session_state: st.session_state.api_date = datetime.now().date() + if 'file_date' not in st.session_state: st.session_state.file_date = None + if 'today_movie_count' not in st.session_state: st.session_state.today_movie_count = 0 + if 'previous_day_movie_count' not in st.session_state: st.session_state.previous_day_movie_count = 0 + if 'daily_report_df' not in st.session_state: st.session_state.daily_report_df = pd.DataFrame() + if 'processed_print_data' not in st.session_state: st.session_state.processed_print_data = None + if 'check_logs' not in st.session_state: st.session_state.check_logs = "" + + tab1, tab2, tab_sales, tab_report, tab_print, tab3 = st.tabs( + ["🔍 排片效率分析", "📋 次日排片效率分析报表", "🍿 卖品品类分析表", "📑 影片��出日累计表", "🖨️ 场次与散场打印", + "🎬 TMS 影片查询"]) + + with tab1: + # 顶部控制区 + col_a, col_b = st.columns(2) + with col_a: + import_from_file = st.checkbox("从`影片映出日累计报表.xlsx`导入数据") + with col_b: + query_tms_for_location = st.checkbox("查询 TMS 找影片所在影厅") + + if import_from_file: + st.header("从本地文件导入数据") + st.write("上传 `影片映出日累计报表.xlsx`,程序将自动处理数据。") + uploaded_file = st.file_uploader("上传 Excel 文件", type=['xlsx', 'xls'], label_visibility="collapsed") + if uploaded_file is not None: + with st.spinner("正在处理文件..."): + try: + df = pd.read_excel(uploaded_file, skiprows=3, header=None) + df.rename(columns={0: '影片名称', 1: '放映日期', 2: '放映时间', 5: '总人次', 
6: '总收入', + 7: '座位数'}, inplace=True) + df_for_date_calc = df[['放映日期', '放映时间']].copy() + df_for_date_calc['datetime_str'] = df_for_date_calc['放映日期'].astype(str).str.split(' ').str[ + 0] + ' ' + df_for_date_calc['放映时间'].astype(str) + df_for_date_calc['datetime'] = pd.to_datetime(df_for_date_calc['datetime_str'], errors='coerce') + df_for_date_calc.dropna(subset=['datetime'], inplace=True) + business_date = get_business_date(df_for_date_calc) + st.session_state.file_date = business_date + st.toast(f"文件营业日识别为: {business_date}", icon="🗓️") + df = df[['影片名称', '放映时间', '座位数', '总收入', '总人次']] + df.dropna(subset=['影片名称', '放映时间'], inplace=True) + for col in ['座位数', '总收入', '总人次']: df[col] = pd.to_numeric(df[col], + errors='coerce').fillna(0) + df['放映时间'] = pd.to_datetime(df['放映时间'], format='%H:%M:%S', errors='coerce').dt.time + df.dropna(subset=['放映时间'], inplace=True) + st.session_state.file_df = df + except Exception as e: + st.error(f"处理文件或识别日期时出错: {e}"); + st.session_state.file_df, st.session_state.file_date = pd.DataFrame(), None + # 展示分析结果 (传递 query_tms_for_location 的状态) + display_analysis_results(st.session_state.file_df, "文件", st.session_state.file_date, + query_tms_for_location) + + else: + st.header("使用 API 获取数据") + st.session_state.api_date = st.date_input("选择要查询的排片日期", value=st.session_state.api_date, + key="api_date_picker") + if st.button("获取排片数据", key="fetch_api_data", icon="🫵"): + with st.spinner(f"正在获取 {st.session_state.api_date} 的排片数据..."): + + token_data = load_token() + if not token_data: + token_data = login_and_get_token() + token = token_data.get('token') if token_data else None + + schedule, halls = get_api_data_with_token_management(st.session_state.api_date.strftime('%Y-%m-%d')) + + if schedule is not None and halls is not None: + # 传入 token 和 date 以进行标准名清洗 + processed_df = process_api_data(schedule, halls, token, + st.session_state.api_date.strftime('%Y-%m-%d')) + st.session_state.api_df = processed_df + if not processed_df.empty: st.toast(f"成功获取并处理了 
{len(processed_df)} 条排片数据!", + icon="✅") + else: + st.session_state.api_df = pd.DataFrame() + # 展示分析结果 (传递 query_tms_for_location 的状态) + display_analysis_results(st.session_state.api_df, "API", st.session_state.api_date, query_tms_for_location) + + with tab2: + # 确定数据源 (逻辑保持不变,API > 文件) + source_df_raw, source_date, source_date_str = pd.DataFrame(), None, "" + if not st.session_state.api_df.empty: + source_df_raw, source_date, source_date_str = st.session_state.api_df, st.session_state.api_date, f"{st.session_state.api_date}" + st.toast("正在使用��自 **API 获取** 的最新数据。", icon="☁️") + elif not st.session_state.file_df.empty: + source_df_raw, source_date, source_date_str = st.session_state.file_df, st.session_state.file_date, f"{st.session_state.file_date} (文件)" + st.toast("API数据为空,正在使用来自 **文件导入** 的数据。", icon="📃") + else: + st.warning('没有可用的数据。请先在 "🔍 排片效率分析" 标签页加载数据。') + + st.header(f"{source_date_str} 排片效率分析与调整建议") + if not source_df_raw.empty: + if st.button("生成分析报表", icon="🫵"): + with st.spinner("正在生成分析报表 (含跨日数据查询)..."): + next_day_sessions_map, next_day_total_sessions = None, None + previous_day_sessions_map = None + token_data = load_token() + + if source_date and token_data: + token = token_data.get('token') + # 获取次日数据,并拿到原始排片表用于检查 + next_day = source_date + timedelta(days=1) + next_day_str = next_day.strftime('%Y-%m-%d') + next_day_sessions_map, next_day_total_sessions, next_day_raw_schedule = fetch_and_process_daily_sessions( + next_day_str, quiet=True) + + # 生成合理性检查日志 + if next_day_raw_schedule: + logs = generate_schedule_check_logs(next_day_raw_schedule, next_day_str) + st.session_state.check_logs = logs + else: + st.session_state.check_logs = "无法获取次日排片详情,跳过检查。" + + # 获取前一日数据 + previous_day = source_date - timedelta(days=1) + previous_day_sessions_map, _, _ = fetch_and_process_daily_sessions( + previous_day.strftime('%Y-%m-%d'), quiet=True) + + # 获取经营摘要数据 + date_str = source_date.strftime('%Y-%m-%d') + income_data = fetch_income_data(token, date_str) + 
wenlv_cards = fetch_membership_data(token, date_str) + attendance = int(source_df_raw['总人次'].sum()) + + st.session_state.daily_summary_data = { + "ticket_income": income_data.get('ticket_income', 0.0) if income_data else 0.0, + "attendance": attendance, + "goods_income": income_data.get('goods_income', 0.0) if income_data else 0.0, + "sold_incomes_zb": income_data.get('sold_incomes_zb', 0.0) if income_data else 0.0, + "wenlv_cards": wenlv_cards + } + else: + st.error("无法确定源数据日期或Token,无法获取跨日及经营数据。") + + if '影片名称_清理后' not in source_df_raw.columns: + source_df_raw['影片名称_清理后'] = source_df_raw['影片名称'] + + analysis_df = process_and_analyze_data(source_df_raw.copy()) + st.session_state.today_movie_count = len(analysis_df) + st.session_state.previous_day_movie_count = len( + previous_day_sessions_map) if previous_day_sessions_map else 0 + + report_df = generate_efficiency_report_df(analysis_df, next_day_sessions_map, + next_day_total_sessions) + st.session_state.report_df = report_df + st.session_state.excel_paste_data = generate_excel_paste_data(report_df) + + if 'report_df' in st.session_state and not st.session_state.report_df.empty: + st.markdown("#### 排片效率分析表"); + display_df = st.session_state.report_df.copy() + display_df.insert(0, '序号', range(2, len(display_df) + 2)) + report_format = {'座位数': '{:,.0f}', '场次': '{:,.0f}', '人次': '{:,.0f}', '票房': '{:,.2f}', + '均价': '{:.2f}', '座次比': '{:.2%}', '场次比': '{:.2%}', '票房比': '{:.2%}', + '座次效率': '{:.2f}', '场次效率': '{:.2f}', '次日场数': '{:,.0f}'} + display_df['均价'] = pd.to_numeric(display_df['均价'], errors='coerce').replace([np.inf, -np.inf], + np.nan) + st.dataframe(display_df.style.format(report_format, na_rep="#DIV/0!"), use_container_width=True, + hide_index=True) + + n_diff = st.session_state.today_movie_count - st.session_state.previous_day_movie_count + if n_diff > 0: + excel_copy_title = f"复制到 Excel (需要增加 {n_diff} 行,从第一个电影名字开始粘贴)" + elif n_diff < 0: + excel_copy_title = f"复制到 Excel (需要减少 {abs(n_diff)} 行,从第一个电影名字开始粘贴)" + else: + 
excel_copy_title = "复制到 Excel (行数保持不变,从第一个电影名字开始粘贴)" + st.markdown(f"##### {excel_copy_title}"); + st.code(st.session_state.excel_paste_data, language='text') + + + # 复制列表 + report_df = st.session_state.report_df + movie_titles = report_df.iloc[:-1]['影片'].tolist() + weather_title = get_weather_forecast(source_date) + st.markdown(f"##### {weather_title}") + st.code(''.join([f'《{title}》' for title in movie_titles]), language='text') + + # 经营摘要 + if 'daily_summary_data' in st.session_state: + summary_data = st.session_state.daily_summary_data + summary_text = ( + f"花都店,今日票房:{summary_data.get('ticket_income', 0.0):.2f}元," + f"观影人次:{summary_data.get('attendance', 0)}," + f"卖品收入:{summary_data.get('goods_income', 0.0):.2f}元," + f"卖品占比:{summary_data.get('sold_incomes_zb', 0.0):.2f}%," + f"文旅卡:{summary_data.get('wenlv_cards', 0)}张。" + ) + st.markdown(f"##### 今日经营数据概览") + st.code(summary_text) + st.markdown(f"> 抽奖券计算方法:先在鼎新报表系统里查询`卡发行`当日开卡数量然后查询`卡充值`当日详细的充值金额,**充值金额整除 200 加上卡发行数量即为抽奖券数量**。") + + st.markdown("#### 🔍 场次合理性检查日志") + if st.session_state.check_logs: + st.code(st.session_state.check_logs) + st.markdown("#### 📡 次日排片 TMS 文件核对") + st.info( + "此功能将查询 TMS 服务器,检查次日排程的影厅是否有对应的影片文件(不区分语言和制式版本,仅匹配片名)。") + + if st.button("开始核对 TMS 文件", key="check_tms_files_btn", icon="🕵️‍♂️"): + # 确定次日日期 + check_date = source_date + timedelta(days=1) + check_date_str = check_date.strftime('%Y-%m-%d') + + with st.spinner(f"正在获取 {check_date_str} 的排片数据并连接 TMS 服务器..."): + # 1. 获取次日排片 (如果之前没获取过) + # 注意:为了确保数据最新,这里重新快速获取一次原始数据 + schedule_data, _ = get_api_data_with_token_management(check_date_str) + + if not schedule_data: + st.error(f"无法获取 {check_date_str} 的排片数据,请检查网络或 Token。") + else: + try: + # 2. 获取 TMS 数据 + # 提取排片中出现的所有影片名作为优先查询关键词,加快 TMS 搜索速度 (虽然后台是全量拉取) + df_sched = pd.DataFrame(schedule_data) + priority_titles = df_sched[ + 'movieName'].unique().tolist() if 'movieName' in df_sched.columns else [] + + tms_hall_data, _ = fetch_and_process_server_movies(priority_titles) + + # 3. 
执行比对 + missing_logs = check_tms_file_availability(schedule_data, tms_hall_data, + check_date_str) + + if not missing_logs: + st.success( + f"✅ 核对完成:{check_date_str} 所有排映影片在对应影厅服务器中均存在关联文件。") + else: + st.error(f"⚠️ 发现 {len(missing_logs)} 个潜在缺片风险!") + for log in missing_logs: + st.code(log) + + except Exception as e: + st.error(f"核对过程中发生错误: {e}") + + with tab_sales: + # 卖品品类分析表 UI + col_1, col_2 = st.columns(2) + with col_1: + sales_from_file = st.checkbox("从`商品销售汇总报表-已退减.xlsx`导入数据", key="sales_file_cb") + + if sales_from_file: + st.header("从本地文件导入数据") + st.write("请上传 `商品销售汇总报表-已退减.xlsx` 文件。") + uploaded_file = st.file_uploader("上传 Excel 文件", type=['xlsx', 'xls'], label_visibility="collapsed", + key="sales_file_uploader") + if uploaded_file is not None: + with st.spinner("正在读取并分析文件..."): + try: + df = pd.read_excel(uploaded_file, skiprows=3) + final_summary, copy_text = process_sales_data(df) + if final_summary is not None: + st.markdown("#### 销售总览 (套餐 Top 10 + 单品 Top 5)") + st.dataframe(final_summary, use_container_width=True, hide_index=True) + st.markdown("##### 复制到 Excel") + st.code(copy_text, language='text') + except Exception as e: + st.error(f"处理文件时发生错误: {e}") + else: + st.header("从服务器API获取实时数据") + sales_date = st.date_input("选择要查询的日期", value=datetime.now().date(), key="sales_date_picker") + if st.button("获取卖品销售数据", key="fetch_sales_data", icon="🫵"): + with st.spinner(f"正在获取 {sales_date} 的销售数据..."): + api_results = get_sales_data_with_token_management(sales_date) + if api_results is not None: + st.toast(f"成功获取到 {len(api_results)} 条销售记录!", icon="✅") + df = transform_api_data_to_df(api_results) + final_summary, copy_text = process_sales_data(df) + if final_summary is not None: + st.markdown("#### 销售总览 (套餐 Top 10 + 单品 Top 5)") + st.dataframe(final_summary, use_container_width=True, hide_index=True) + st.markdown("##### 复制到 Excel") + st.code(copy_text, language='text') + else: + st.warning("未能从 API 获取到数据,请检查登录或网络连接。") + + with tab_report: + # 影片映出日累计表 UI + 
st.header("影片映出日累计报表生成") + report_date = st.date_input("选择要查询的排片日期", value=datetime.now().date(), key="daily_report_date") + + if st.button("获取并生成报表", key="fetch_daily_report", icon="🫵"): + report_date_str = report_date.strftime('%Y-%m-%d') + with st.spinner(f"正在获取 {report_date_str} 的排片数据..."): + + token_data = load_token() + if not token_data: token_data = login_and_get_token() + token = token_data.get('token') if token_data else None + + schedule, halls_map = get_api_data_with_token_management(report_date_str) + if schedule is not None and halls_map is not None: + processed_df = process_and_filter_data_for_report(schedule, halls_map, report_date_str, token) + st.session_state.daily_report_df = processed_df + if not processed_df.empty: + st.toast(f"成功获取并处理了 {len(processed_df)} 条有效场次数据!", icon="✅") + else: + st.session_state.daily_report_df = pd.DataFrame() + + if not st.session_state.daily_report_df.empty: + st.markdown(f"#### {report_date.strftime('%Y-%m-%d')} 影片映出日累计报表") + st.dataframe( + st.session_state.daily_report_df.style.format({ + '人数合计': '{:,.0f}', '座位数': '{:,.0f}', '上座率%': '{:.2f}%' + }), + use_container_width=True, hide_index=True + ) + import io + output_buffer = io.BytesIO() + st.session_state.daily_report_df.to_excel(output_buffer, index=False, engine='openpyxl') + excel_data = output_buffer.getvalue() + st.download_button( + label="📥 下载 XLSX 报表文件", + data=excel_data, + file_name=f"{report_date.strftime('%Y-%m-%d')}_影片映出日累计报表.xlsx", + mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ) + else: + st.info("请选择日期并点击“获取并生成报表”以生成数据。") + + with tab_print: + # 场次与散场时间快捷打印 UI + st.header("场次与散场时间快捷打印") + + with st.expander("⚙️ 显示与��印设置", expanded=False): + col1, col2 = st.columns(2) + with col1: + st.subheader("⚙️ 修改 LED 屏排片表设置") + led_font_name = st.selectbox("字体选择", options=list(AVAILABLE_FONTS.keys()), index=0, key="led_font") + led_font_path = AVAILABLE_FONTS.get(led_font_name) + generate_png_led = st.checkbox("生成 PNG 图片", 
value=False, key="png_led") + with col2: + st.subheader("⚙️ 散场时间表设置") + times_font_name = st.selectbox("字体选择", options=list(AVAILABLE_FONTS.keys()), index=1, + key="times_font") + times_font_path = AVAILABLE_FONTS.get(times_font_name) + font_size_multiplier = st.slider("字体大小调节", min_value=0.8, max_value=1.5, value=1.2, step=0.05, + help="调整字体在单元格内的相对大小") + split_time = st.time_input("白班 / 晚班分割时间", value=dt_time(17, 0), + help="散场时间在此时间点之前(含)的为白班") + time_adjustment = st.slider("时间提前 (分钟)", min_value=0, max_value=10, value=0, + help="将所有散场时间提前 N 分钟显示") + hall_display_format = st.radio("影厅号格式", options=['Default', 'Superscript', 'Circled'], + format_func=lambda x: + {'Default': '默认 (2 18:28)', 'Superscript': '上标 (2# 18:28)', + 'Circled': '带圈 (② 18:28)'}[x], horizontal=True) + generate_png_times = st.checkbox("生成 PNG 图片", value=False, key="png_times") + + with st.expander("💡 使用帮助", expanded=False): + st.markdown(""" + #### 🖨️ 功能简介 + 本工具用于将影院的排期数据快速转换为两种形式的打印页: + 1. **修改 LED 屏幕排片表打印**:A4 竖版,详细列出影厅、场次、影片名、拼音缩写和时间范围,方便员工在修改 LED 屏幕时快速查阅和输入。 + 2. **散场时间打印**:A5 竖版,以大字体分栏显示各影厅的散场时间,方便员工在疏散人群和清洁影厅时查阅。 + + #### ⬇️ 操作步骤 + 1. **选择数据源**: + * **从文件导入**:导出 `放映时间核对表.xls` 后,点击 "Browse files" 按钮上传。 + * **从 API 获取**:选择日期,点击 "获取排片数据" 按钮,程序将自动登录并拉取最新数据。 + 2. **调整设置 (可选)**:点击上方的 "显示与打印设置" 打开设置面板,根据需要调整字体、大小、格式等。 + 3. 
**预览与打印**: + * 数据加载成功后,下方会自动生成预览。 + * 默认显示 **PDF 预览**,这是最适合打印的格式。可以直接在预览界面点击 🖨️ 打印按钮。 + """) + + print_tab1, print_tab2 = st.tabs(["☁️ 从 API 获取", "📁 从文件导入"]) + + with print_tab2: + uploaded_file_print = st.file_uploader("请上传 `放映时间核对表.xls` 文件", type=["xls"], + key="print_file_uploader") + if uploaded_file_print: + with st.spinner("正在处理文件,请稍候..."): + led_data, times_part1, times_part2, date_str = process_file_upload(uploaded_file_print, split_time, + time_adjustment) + st.session_state.processed_print_data = { + "led_data": led_data, + "times_part1": times_part1, + "times_part2": times_part2, + "date_str": date_str + } + if date_str: + st.toast(f"文件处理完成!排期日期:**{date_str}**", icon="🎉") + + with print_tab1: + # 修改 value 为当前日期 + 1天 + print_api_date = st.date_input("选择要查询的排片日期", value=datetime.now().date() + timedelta(days=1), + key="print_api_date_picker") + if st.button("获取排片数据", key="fetch_print_api_data", icon="🫵"): + with st.spinner(f"正在获取 {print_api_date} 的排片数据..."): + date_str_api = print_api_date.strftime('%Y-%m-%d') + # 重用 app2.py 中现有的 API 获取逻辑,不需要重复写 fetch_schedule_data + schedule_list, _ = get_api_data_with_token_management(date_str_api) + + if schedule_list is not None and len(schedule_list) > 0: + df_api = pd.DataFrame(schedule_list) + df_api.rename(columns={'hallName': 'Hall', 'showStartTime': 'StartTime', + 'showEndTime': 'EndTime', 'movieName': 'Movie'}, inplace=True) + df_api = df_api[['Hall', 'StartTime', 'EndTime', 'Movie']] + + led_data, times_part1, times_part2 = process_schedule_df(df_api, print_api_date, split_time, + time_adjustment) + st.session_state.processed_print_data = { + "led_data": led_data, + "times_part1": times_part1, + "times_part2": times_part2, + "date_str": date_str_api + } + st.toast(f"成功获取 {len(schedule_list)} 条排片数据!", icon="✅") + elif schedule_list is not None: + st.warning("成功连接API,但当天没有排片数据。") + st.session_state.processed_print_data = None + else: + st.error("获取API数据失败。") + st.session_state.processed_print_data = None + + # 
--- 显示打印预览结果 --- + if st.session_state.processed_print_data: + data = st.session_state.processed_print_data + led_data, times_part1, times_part2, date_str = data["led_data"], data["times_part1"], data["times_part2"], \ + data["date_str"] + + + # 显示 LED 屏排片表 + st.header("🖥️ 修改 LED 屏幕排片表打印") + if led_data is not None and not led_data.empty: + led_output = create_print_layout_led(led_data, date_str, led_font_path, generate_png_led) + if led_output: + tabs = ["PDF 预览"] + if 'png' in led_output: tabs.append("PNG 预览") + tab_views = st.tabs(tabs) + with tab_views[0]: + st.markdown(display_pdf(led_output['pdf']), unsafe_allow_html=True) + if 'png' in led_output: + with tab_views[1]: st.image(led_output['png'], use_container_width=True) + else: + st.error("未能成功生成 '修改 LED 屏排片表'。请检查数据源。") + + # 显示散场时间快捷打印 + st.header("🔚 散场时间打印") + col1, col2 = st.columns(2) + with col1: + if times_part1 is not None and not times_part1.empty: + part1_output = create_print_layout_times(times_part1, "A", date_str, times_font_path, + font_size_multiplier, hall_display_format, + generate_png_times) + if part1_output: + tabs1 = [f"白班 (≤ {split_time.strftime('%H:%M')}) PDF 预览"] + if 'png' in part1_output: tabs1.append(f"白班 (≤ {split_time.strftime('%H:%M')}) PNG 预览") + tab_views1 = st.tabs(tabs1) + with tab_views1[0]: + st.markdown(display_pdf(part1_output['pdf']), unsafe_allow_html=True) + if 'png' in part1_output: + with tab_views1[1]: st.image(part1_output['png']) + else: + st.info(f"白班 (≤ {split_time.strftime('%H:%M')}) 没有排期数据。") + + with col2: + if times_part2 is not None and not times_part2.empty: + part2_output = create_print_layout_times(times_part2, "C", date_str, times_font_path, + font_size_multiplier, hall_display_format, + generate_png_times) + if part2_output: + tabs2 = [f"晚班 (> {split_time.strftime('%H:%M')}) PDF 预览"] + if 'png' in part2_output: tabs2.append(f"晚班 (> {split_time.strftime('%H:%M')}) PNG 预览") + tab_views2 = st.tabs(tabs2) + with tab_views2[0]: + 
st.markdown(display_pdf(part2_output['pdf']), unsafe_allow_html=True) + if 'png' in part2_output: + with tab_views2[1]: st.image(part2_output['png']) + else: + st.info(f"晚班 (> {split_time.strftime('%H:%M')}) 没有排期数据。") + else: + st.info("👆 请先从文件或API加载数据以生成预览。") + + with tab3: + st.header("TMS 服务器影片内容查询") + if st.button('点击查询 TMS 服务器', key="query_tms", icon="🫵"): + with st.spinner("正在从 TMS 服务器获取数据中..."): + try: + priority_titles, df_for_tms = [], pd.DataFrame() + if not st.session_state.api_df.empty: + df_for_tms = st.session_state.api_df + elif not st.session_state.file_df.empty: + df_for_tms = st.session_state.file_df + if not df_for_tms.empty: + # 优先使用清洗后的名字 + if '影片名称_清理后' in df_for_tms.columns: + priority_titles = df_for_tms['影片名称_清理后'].unique().tolist() + else: + priority_titles = df_for_tms['影片名称'].apply( + lambda x: clean_movie_title(x)).unique().tolist() + + halls_data, movie_list_sorted = fetch_and_process_server_movies(priority_titles) + st.toast("TMS 服务器数据获取成功!", icon="🎉") + st.markdown("#### 按影片查看所在影厅") + view2_data = [{'影片名称': item['assert_name'], + '所在影厅': " ".join(sorted([get_circled_number(h) for h in item['halls']])), + '文件名': item['content_name'], '时长(分钟)': format_play_time(item['play_time'])} + for item in movie_list_sorted] + st.dataframe(pd.DataFrame(view2_data), hide_index=True, use_container_width=True) + st.markdown("#### 按影厅查看影片内容") + hall_tabs = st.tabs(list(halls_data.keys())) + for tab, hall_name in zip(hall_tabs, halls_data.keys()): + with tab: + view1_data = [{'影片名称': item['details']['assert_name'], '所在影厅': " ".join( + sorted([get_circled_number(h) for h in item['details']['halls']])), + '文件名': item['content_name'], + '时长(分钟)': format_play_time(item['details']['play_time'])} for item in + halls_data[hall_name]] + st.dataframe(pd.DataFrame(view1_data), hide_index=True, use_container_width=True) + except Exception as e: + st.error(f"查询TMS服务器时出错: {e}") + + +if __name__ == "__main__": + main() +