Spaces:
Running
Running
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import requests | |
| import time | |
| from collections import defaultdict | |
| import json | |
| import os | |
| import re | |
| from datetime import datetime, timedelta, time as dt_time | |
| import io | |
| import warnings | |
| import matplotlib.pyplot as plt | |
| import matplotlib.font_manager as font_manager | |
| from matplotlib.lines import Line2D | |
| from matplotlib.backends.backend_pdf import PdfPages | |
| import matplotlib.gridspec as gridspec | |
| import base64 | |
| import math | |
| from pypinyin import lazy_pinyin, Style | |
| from itertools import combinations | |
| # --- 新增:加载环境变量 --- | |
| from dotenv import load_dotenv | |
| load_dotenv() # 加载本地 .env 文件 | |
| # --- 全局配置和常量 --- | |
| TOKEN_FILE = 'token_data.json' | |
| # --- 环境变量获取 (替代硬编码) --- | |
| # 使用 os.getenv 获取,如果获取不到默认为空字符串或特定默认值 | |
| GAODE_API_KEY = os.getenv("GAODE_API_KEY", "") | |
| ADCODE = os.getenv("ADCODE") | |
| CINEMA_ID = os.getenv("CINEMA_ID") | |
| # --- 打印功能相关常量 --- | |
| BUSINESS_START = "09:30" | |
| BUSINESS_END = "01:30" | |
| BORDER_COLOR = 'grey' | |
| DATE_COLOR = '#A9A9A9' | |
| A5_WIDTH_IN = 5.83 | |
| A5_HEIGHT_IN = 8.27 | |
| NUM_COLS = 3 | |
| # --- 打印字体清单 --- | |
| ALL_FONTS = { | |
| "思源黑体-常规 (推荐 LED 屏)": "SimHei.ttf", | |
| "思源黑体-重体 (推荐散场表)": "SourceHanSansOLD-Heavy-2.otf", | |
| "思源黑体-粗体": "SourceHanSansOLD-Bold-2.otf", | |
| "思源宋体-常规": "SourceHanSansCN-Normal.otf", | |
| "苹方-中黑": "PingFangSC-Medium.otf", | |
| "苹方-半粗": "PingFangSC-Semibold.otf", | |
| "苹方-极细": "PingFangSC-Ultralight.otf", | |
| "阿里巴巴普惠体-常规": "Alibaba-PuHuiTi.ttf", | |
| "阿里巴巴普惠体-粗体": "AlibabaPuHuiTi-Bold.otf", | |
| "阿里巴巴普惠体-重体": "AlibabaPuHuiTi-Heavy.otf", | |
| } | |
| # 检查可用字体 | |
| AVAILABLE_FONTS = {name: fname for name, fname in ALL_FONTS.items() if os.path.exists(fname)} | |
| # --- 忽略特定警告 --- | |
| # 忽略 openpyxl 的样式警告 | |
| warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl") | |
| # 忽略 pandas 日期解析的警告 (针对无法推断格式的情况) | |
| warnings.filterwarnings("ignore", message="Could not infer format") | |
| # --- 页面基础设置 --- | |
| st.set_page_config(layout="wide", page_title="影城工作便捷工具") | |
| # --- 1. API 数据获取模块 --- | |
| # --- 1.1 Token 管理 --- | |
| def save_token(token_data): | |
| """将Token数据保存到JSON文件""" | |
| try: | |
| with open(TOKEN_FILE, 'w', encoding='utf-8') as f: | |
| json.dump(token_data, f, ensure_ascii=False, indent=4) | |
| return True | |
| except Exception as e: | |
| st.error(f"保存Token失败: {e}") | |
| return False | |
| def load_token(): | |
| """从JSON文件加载Token数据""" | |
| if os.path.exists(TOKEN_FILE): | |
| try: | |
| with open(TOKEN_FILE, 'r', encoding='utf-8') as f: | |
| return json.load(f) | |
| except (json.JSONDecodeError, FileNotFoundError): | |
| return None | |
| return None | |
| def login_and_get_token(): | |
| """执行登录操作并获取新的Token""" | |
| st.write("Token无效或已过期,正在尝试重新登录...") | |
| # 获取环境变量 | |
| username = os.getenv("CINEMA_USERNAME") | |
| password = os.getenv("CINEMA_PASSWORD") | |
| res_code = os.getenv("CINEMA_RES_CODE") | |
| device_id = os.getenv("CINEMA_DEVICE_ID") | |
| # 简单检查,防止未配置环境变量导致后续请求莫名报错 | |
| if not all([username, password, res_code]): | |
| st.error("登录失败:未配置用户名、密码或影院编码环境变量。") | |
| return None | |
| session = requests.Session() | |
| session.headers.update({ | |
| 'Host': 'app.bi.piao51.cn', | |
| 'Accept': 'application/json, text/javascript, */*; q=0.01', | |
| 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148', | |
| }) | |
| login_url = 'https://app.bi.piao51.cn/cinema-app/credential/login.action' | |
| login_headers = { | |
| 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', | |
| 'Origin': 'https://app.bi.piao51.cn', | |
| } | |
| # 使用变量 | |
| login_data = { | |
| 'username': username, | |
| 'password': password, | |
| 'type': '1', | |
| 'resCode': res_code, | |
| 'deviceid': device_id, | |
| 'dtype': 'ios', | |
| } | |
| try: | |
| response_login = session.post(login_url, headers=login_headers, data=login_data, allow_redirects=False, | |
| timeout=15) | |
| if not (300 <= response_login.status_code < 400 and 'token' in session.cookies): | |
| st.error(f"登录步骤 1 失败,未能获取 Session Token。状态码: {response_login.status_code}") | |
| return None | |
| user_info_url = 'https://app.bi.piao51.cn/cinema-app/security/logined.action' | |
| response_user_info = session.get(user_info_url, timeout=10) | |
| response_user_info.raise_for_status() | |
| user_info = response_user_info.json() | |
| if user_info.get("success") and user_info.get("data", {}).get("token"): | |
| token_data = user_info['data'] | |
| if save_token(token_data): st.toast("登录成功,已获取并保存新 Token!", icon="🔑") | |
| return token_data | |
| else: | |
| st.error(f"登录步骤 2 失败,未能从 JSON 中提取 Token。响应: {user_info.get('msg')}") | |
| return None | |
| except requests.exceptions.RequestException as e: | |
| st.error(f"登录请求过程中发生网络错误: {e}") | |
| return None | |
| # --- 1.2 API 数据抓取 (排片相关) --- | |
| def fetch_hall_info(token): | |
| url = 'https://cawapi.yinghezhong.com/showInfo/getShowHallInfo' | |
| params = {'token': token, '_': int(time.time() * 1000)} | |
| headers = {'Origin': 'https://caw.yinghezhong.com', 'User-Agent': 'Mozilla/5.0'} | |
| response = requests.get(url, params=params, headers=headers, timeout=10) | |
| response.raise_for_status() | |
| data = response.json() | |
| if data.get('code') == 1 and data.get('data'): | |
| return {item['hallId']: item['seatNum'] for item in data['data']} | |
| else: | |
| raise Exception(f"获取影厅信息失败: {data.get('msg', '未知错误')}") | |
| def fetch_schedule_data(token, show_date): | |
| url = 'https://cawapi.yinghezhong.com/showInfo/getHallShowInfo' | |
| params = {'showDate': show_date, 'token': token, '_': int(time.time() * 1000)} | |
| headers = {'Origin': 'https://caw.yinghezhong.com', 'User-Agent': 'Mozilla/5.0'} | |
| response = requests.get(url, params=params, headers=headers, timeout=15) | |
| response.raise_for_status() | |
| data = response.json() | |
| if data.get('code') == 1: | |
| return data.get('data', []) | |
| elif data.get('code') == 500: | |
| raise ValueError("Token 可能已失效") | |
| else: | |
| raise Exception(f"获取排片数据失败: {data.get('msg', '未知错误')}") | |
| def get_api_data_with_token_management(show_date): | |
| token_data = load_token() | |
| token = token_data.get('token') if token_data else None | |
| if not token: | |
| token_data = login_and_get_token() | |
| if not token_data: return None, None | |
| token = token_data.get('token') | |
| try: | |
| schedule_list = fetch_schedule_data(token, show_date) | |
| hall_seat_map = fetch_hall_info(token) | |
| return schedule_list, hall_seat_map | |
| except ValueError: | |
| st.toast("Token 已失效,正在尝试重新登录并重试...", icon="🔄") | |
| token_data = login_and_get_token() | |
| if not token_data: return None, None | |
| token = token_data.get('token') | |
| try: | |
| schedule_list = fetch_schedule_data(token, show_date) | |
| hall_seat_map = fetch_hall_info(token) | |
| return schedule_list, hall_seat_map | |
| except Exception as e: | |
| st.error(f"重试获取数据失败: {e}"); | |
| return None, None | |
| except Exception as e: | |
| st.error(f"获取 API 数据时发生错误: {e}"); | |
| return None, None | |
| # --- 1.3 新增:电影名称API (获取标准电影名) --- | |
| def fetch_canonical_movie_names(token, date_str): | |
| """ | |
| 获取指定日期的官方电影名称列表(唯一名称)。 | |
| 用于后续对原始排片数据中的电影名进行标准化清洗。 | |
| """ | |
| url = 'https://app.bi.piao51.cn/cinema-app/mycinema/movieSellGross.action' | |
| params = { | |
| 'token': token, | |
| 'startDate': date_str, | |
| 'endDate': date_str, | |
| 'dateType': 'day', | |
| 'cinemaId': CINEMA_ID | |
| } | |
| headers = { | |
| 'Host': 'app.bi.piao51.cn', 'X-Requested-With': 'XMLHttpRequest', 'jwt': '0', | |
| 'Accept': 'application/json, text/javascript, */*; q=0.01', | |
| 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148', | |
| } | |
| try: | |
| response = requests.get(url, params=params, headers=headers, timeout=10) | |
| response.raise_for_status() | |
| data = response.json() | |
| if data.get('code') == 'A00000' and data.get('results'): | |
| # 提取 results 列表中的 movieName,排除 "总计" 等非电影项 | |
| names = [item['movieName'] for item in data['results'] if | |
| item.get('movieName') and item['movieName'] != '总计'] | |
| return names | |
| except Exception as e: | |
| # 这里的错误不打断主流程,返回空列表,后续会回退到基础清洗逻辑 | |
| print(f"获取标准电影名称失败: {e}") | |
| return [] | |
| def process_api_data(schedule_list, hall_seat_map, token=None, show_date=None): | |
| if not schedule_list: | |
| st.warning("未获取到任何排片数据。"); | |
| return pd.DataFrame() | |
| df = pd.DataFrame(schedule_list) | |
| df['座位数'] = df['hallId'].map(hall_seat_map).fillna(0).astype(int) | |
| df.rename(columns={'movieName': '影片名称', 'showStartTime': '放映时间', 'soldBoxOffice': '总收入', | |
| 'soldTicketNum': '总人次'}, inplace=True) | |
| # 获取标准电影名列表并进行清洗 | |
| canonical_names = [] | |
| if token and show_date: | |
| canonical_names = fetch_canonical_movie_names(token, show_date) | |
| df['影片名称'] = df['影片名称'].apply(lambda x: clean_movie_title(x, canonical_names)) | |
| required_cols = ['影片名称', '放映时间', '座位数', '总收入', '总人次'] | |
| df = df[required_cols] | |
| df.dropna(subset=['影片名称', '放映时间'], inplace=True) | |
| for col in ['座位数', '总收入', '总人次']: | |
| df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0) | |
| df['放映时间'] = pd.to_datetime(df['放映时间'], format='%H:%M', errors='coerce').dt.time | |
| df.dropna(subset=['放映时间'], inplace=True) | |
| return df | |
| # --- 1.4 API 数据抓取 (销售相关) --- | |
| def fetch_sales_data_from_api(token, selected_date): | |
| """从API获取指定日期的销售数据""" | |
| url = 'https://app.bi.piao51.cn/cinema-app/mycinema/retailTop.action' | |
| date_str = selected_date.strftime('%Y-%m-%d') | |
| headers = { | |
| 'Host': 'app.bi.piao51.cn', 'X-Requested-With': 'XMLHttpRequest', 'jwt': '0', | |
| 'Accept': 'application/json, text/javascript, */*; q=0.01', | |
| 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148', | |
| } | |
| params = { | |
| 'dateType': 'day', 'startDate': date_str, 'endDate': date_str, 'noEvent': '1', | |
| 'token': token, 'qTime': date_str, 'cinemaId': CINEMA_ID, | |
| } | |
| try: | |
| response = requests.get(url, params=params, headers=headers, timeout=15) | |
| response.raise_for_status() | |
| data = response.json() | |
| if data.get('code') == 'A00000': | |
| return data.get('results', []) | |
| elif "login" in response.text: | |
| raise ValueError("Token可能已失效") | |
| else: | |
| st.error(f"获取 API 数据失败: {data.get('msg', '未知错误')}") | |
| return None | |
| except requests.exceptions.RequestException as e: | |
| st.error(f"API 请求网络错误: {e}") | |
| return None | |
| def get_sales_data_with_token_management(selected_date): | |
| """带Token管理的API销售数据获取流程""" | |
| token_data = load_token() | |
| token = token_data.get('token') if token_data else None | |
| if not token: | |
| token_data = login_and_get_token() | |
| if not token_data: return None | |
| token = token_data.get('token') | |
| try: | |
| api_results = fetch_sales_data_from_api(token, selected_date) | |
| return api_results | |
| except ValueError: | |
| st.toast("Token 已失效,正在尝试重新登录并重试...", icon="🔄") | |
| token_data = login_and_get_token() | |
| if not token_data: return None | |
| token = token_data.get('token') | |
| try: | |
| api_results = fetch_sales_data_from_api(token, selected_date) | |
| return api_results | |
| except Exception as e: | |
| st.error(f"重试获取数据失败: {e}") | |
| return None | |
| except Exception as e: | |
| st.error(f"获取 API 数据时发生错误: {e}") | |
| return None | |
| # --- 2. 核心数据分析模块 --- | |
| def clean_movie_title(raw_title, canonical_names=None): | |
| """ | |
| 电影名称标准化清洗函数 | |
| """ | |
| if not isinstance(raw_title, str): | |
| return raw_title | |
| base_name = None | |
| # 1. 尝试匹配标准名称 | |
| if canonical_names: | |
| # 按长度倒序排序,确保最长匹配优先(解决"你好"vs"你好明天"的问题) | |
| sorted_names = sorted(canonical_names, key=len, reverse=True) | |
| for name in sorted_names: | |
| if name in raw_title: | |
| base_name = name | |
| break | |
| # 2. 回退逻辑:如果没传列表或没匹配到,使用空格分割 | |
| if not base_name: | |
| base_name = raw_title.split(' ', 1)[0] | |
| # 3. 后缀追加逻辑 | |
| raw_upper = raw_title.upper() | |
| suffix = "" | |
| if "HDR LED" in raw_upper: | |
| suffix = "(HDR LED)" | |
| elif "CINITY" in raw_upper: | |
| suffix = "(CINITY)" | |
| elif "杜比" in raw_upper or "DOLBY" in raw_upper: | |
| suffix = "(杜比视界)" | |
| elif "IMAX" in raw_upper: | |
| if "3D" in raw_upper: | |
| suffix = "(数字IMAX3D)" | |
| else: | |
| suffix = "(数字IMAX)" | |
| elif "巨幕" in raw_upper: | |
| if "立体" in raw_upper: | |
| suffix = "(中国巨幕立体)" | |
| else: | |
| suffix = "(中国巨幕)" | |
| elif "3D" in raw_upper: | |
| suffix = "(数字3D)" | |
| # **修复**: 只有当 base_name 自身不包含该后缀时才添加 | |
| if suffix and suffix not in base_name: | |
| return f"{base_name}{suffix}" | |
| return base_name | |
| def style_efficiency(row): | |
| green, red = 'background-color: #E6F5E6;', 'background-color: #FFE5E5;' | |
| seat_eff, session_eff = row.get('座次效率', 0), row.get('场次效率', 0) | |
| if seat_eff > 1.5 or session_eff > 1.5: return [green] * len(row) | |
| if seat_eff < 0.5 or session_eff < 0.5: return [red] * len(row) | |
| return [''] * len(row) | |
| def style_summary_efficiency(row): | |
| green, red = 'background-color: #E6F5E6;', 'background-color: #FFE5E5;' | |
| if (row.get('全部座次效率', 0) > 1.5 or row.get('全部场次效率', 0) > 1.5 or | |
| row.get('黄金时段座次效率', 0) > 1.5 or row.get('黄金时段场次效率', 0) > 1.5): | |
| return [green] * len(row) | |
| if (row.get('全部座次效率', 0) < 0.5 or row.get('全部场次效率', 0) < 0.5 or | |
| row.get('黄金时段座次效率', 0) < 0.5 or row.get('黄金时段场次效率', 0) < 0.5): | |
| return [red] * len(row) | |
| return [''] * len(row) | |
| def process_and_analyze_data(df): | |
| if df.empty: return pd.DataFrame() | |
| # 确保有清洗后的列名 | |
| if '影片名称_清理后' not in df.columns and '影片名称' in df.columns: | |
| df['影片名称_清理后'] = df['影片名称'] | |
| analysis_df = df.groupby('影片名称_清理后').agg(座位数=('座位数', 'sum'), 场次=('影片名称_清理后', 'size'), | |
| 票房=('总收入', 'sum'), 人次=('总人次', 'sum')).reset_index() | |
| analysis_df.rename(columns={'影片名称_清理后': '影片'}, inplace=True) | |
| analysis_df = analysis_df.sort_values(by='票房', ascending=False).reset_index(drop=True) | |
| total_seats, total_sessions, total_revenue = analysis_df['座位数'].sum(), analysis_df['场次'].sum(), analysis_df[ | |
| '票房'].sum() | |
| with np.errstate(divide='ignore', invalid='ignore'): | |
| analysis_df['均价'] = np.divide(analysis_df['票房'], analysis_df['人次']).fillna(0) | |
| analysis_df['座次比'] = np.divide(analysis_df['座位数'], total_seats).fillna(0) | |
| analysis_df['场次比'] = np.divide(analysis_df['场次'], total_sessions).fillna(0) | |
| analysis_df['票房比'] = np.divide(analysis_df['票房'], total_revenue).fillna(0) | |
| analysis_df['座次效率'] = np.divide(analysis_df['票房比'], analysis_df['座次比']).fillna(0) | |
| analysis_df['场次效率'] = np.divide(analysis_df['票房比'], analysis_df['场次比']).fillna(0) | |
| final_cols = ['影片', '座位数', '场次', '票房', '人次', '均价', '座次比', '场次比', '票房比', '座次效率', | |
| '场次效率'] | |
| return analysis_df[final_cols] | |
| # --- 2.1 销售数据处理与分析模块 --- | |
| def transform_api_data_to_df(api_results): | |
| """将API返回的JSON列表转换为与Excel格式一致的DataFrame""" | |
| if not api_results: return pd.DataFrame() | |
| records = [] | |
| for item in api_results: | |
| is_package = item.get('goodsAllName') and str(item.get('goodsAllName')).strip() != "" | |
| record = { | |
| '售卖位置': '线上渠道', | |
| '一级分类': '套餐' if is_package else '单品', | |
| '售卖键名称': item.get('goodsName'), | |
| '数量': item.get('goodsSoldNums', 0), | |
| '实收总金额': item.get('goodsSoldIncomes', 0) | |
| } | |
| records.append(record) | |
| return pd.DataFrame(records) | |
| def process_sales_data(df): | |
| """核心分析函数,处理DataFrame并返回最终结果""" | |
| if df.empty: | |
| st.warning("没有可供分析的数据。") | |
| return None, "" | |
| required_columns = ['售卖位置', '一级分类', '售卖键名称', '数量', '实收总金额'] | |
| if not all(col in df.columns for col in required_columns): | |
| missing_cols = [col for col in required_columns if col not in df.columns] | |
| st.error(f"数据缺少必要的列: {', '.join(missing_cols)}。") | |
| return None, "" | |
| for col in ['数量', '实收总金额']: | |
| df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0) | |
| df.dropna(subset=['售卖键名称', '一级分类'], inplace=True) | |
| df['售卖位置'] = df['售卖位置'].fillna('未知渠道') | |
| df_package = df[df['一级分类'] == '套餐'].copy() | |
| df_non_package = df[df['一级分类'] != '套餐'].copy() | |
| if not df_package.empty: | |
| package_summary = df_package.groupby(['售卖位置', '售卖键名称']).agg( | |
| {'数量': 'sum', '实收总金额': 'sum'}).reset_index() | |
| package_summary = package_summary[package_summary['数量'] != 0] | |
| top_10_package = package_summary.sort_values(by='实收总金额', ascending=False).head(10)[ | |
| ['售卖键名称', '数量', '实收总金额']] | |
| else: | |
| top_10_package = pd.DataFrame(columns=['售卖键名称', '数量', '实收总金额']) | |
| if not df_non_package.empty: | |
| non_package_summary = df_non_package.groupby(['售卖位置', '售卖键名称']).agg( | |
| {'数量': 'sum', '实收总金额': 'sum'}).reset_index() | |
| non_package_summary = non_package_summary[non_package_summary['数量'] != 0] | |
| top_5_non_package = non_package_summary.sort_values(by='实收总金额', ascending=False).head(5)[ | |
| ['售卖键名称', '数量', '实收总金额']] | |
| else: | |
| top_5_non_package = pd.DataFrame(columns=['售卖键名称', '数量', '实收总金额']) | |
| summary_df = pd.concat([top_10_package, top_5_non_package], ignore_index=True) | |
| final_display_df = pd.DataFrame(index=range(15), columns=['售卖键名称', '数量', '实收总金额']) | |
| final_display_df['售卖键名称'] = '' | |
| final_display_df['数量'] = np.nan | |
| final_display_df['实收总金额'] = np.nan | |
| if not summary_df.empty: | |
| final_display_df.iloc[:len(summary_df)] = summary_df.to_numpy() | |
| copy_df = final_display_df.copy() | |
| # **修改**: 先转换为 object 类型再填充,避免 FutureWarning | |
| copy_df = copy_df.astype(object) | |
| copy_df.fillna('', inplace=True) | |
| copy_text = copy_df.to_csv(sep='\t', index=False, header=False) | |
| return final_display_df, copy_text | |
| # --- 2.2 影片映出日累计报表数据处理模块 --- | |
| def process_and_filter_data_for_report(schedule_list, hall_seat_map, selected_date_str, token=None): | |
| """核心数据处理函数 for 影片映出日累计报表""" | |
| if not schedule_list: | |
| st.warning("未获取到任何排片数据。") | |
| return pd.DataFrame() | |
| df = pd.DataFrame(schedule_list) | |
| # 过滤掉观影人数为0的场次 | |
| df['soldTicketNum'] = pd.to_numeric(df['soldTicketNum'], errors='coerce').fillna(0) | |
| df = df[df['soldTicketNum'] > 0].copy() | |
| if df.empty: | |
| st.info("所有场次的观影人数均为 0,没有可显示的数据。") | |
| return pd.DataFrame() | |
| # 获取标准电影名并清洗 | |
| canonical_names = [] | |
| if token and selected_date_str: | |
| canonical_names = fetch_canonical_movie_names(token, selected_date_str) | |
| df['影片'] = df['movieName'].apply(lambda x: clean_movie_title(x, canonical_names)) | |
| df['座位数'] = df['hallId'].map(hall_seat_map).fillna(0).astype(int) | |
| # 计算上座率 | |
| with np.errstate(divide='ignore', invalid='ignore'): | |
| df['上座率%'] = np.divide(df['soldTicketNum'], df['座位数']) * 100 | |
| df['上座率%'] = df['上座率%'].fillna(0) | |
| df.rename(columns={'showStartTime': '放映时间', 'hallName': '影厅', 'soldTicketNum': '人数合计'}, inplace=True) | |
| df['放映日期'] = selected_date_str | |
| final_cols = ['影片', '放映日期', '放映时间', '影厅', '人数合计', '座位数', '上座率%'] | |
| result_df = df[final_cols] | |
| result_df = result_df.sort_values(by='放映时间').reset_index(drop=True) | |
| return result_df | |
| # --- 2.3 打印功能数据处理与布局模块 --- | |
| def get_font_properties(font_path, size=14): | |
| """通用字体加载函数""" | |
| if font_path and os.path.exists(font_path): | |
| return font_manager.FontProperties(fname=font_path, size=size) | |
| else: | |
| st.warning(f"警告:未找到字体文件 '{font_path}',显示可能不正确。将使用默认字体。") | |
| return font_manager.FontProperties(family='sans-serif', size=size) | |
| def get_pinyin_abbr(text): | |
| """获取中文文本前两个字的拼音首字母""" | |
| if not text: return "" | |
| chars = [c for c in text if '\u4e00' <= c <= '\u9fff'][:2] | |
| if not chars: return "" | |
| pinyin_list = lazy_pinyin(chars, style=Style.FIRST_LETTER) | |
| return ''.join(pinyin_list).upper() | |
| def format_seq(n): | |
| """将数字或字符转换为带圈序号 (①, ②, ③...),非数字则直接返回""" | |
| try: | |
| n = int(n) | |
| except (ValueError, TypeError): | |
| return str(n) | |
| if n <= 0: return str(n) | |
| circled_chars = "①②③④⑤⑥⑦⑧⑨⑩⑪⑫⑬⑭⑮⑯⑰⑱⑲⑳㉑㉒㉓㉔㉕㉖㉗㉘㉙㉚㉛㉜㉝㉞㉟㊱㊲㊳㊴㊵㊶㊷㊸㊹㊺㊻㊼㊽㊾㊿" | |
| if 1 <= n <= 50: return circled_chars[n - 1] | |
| return f'({n})' | |
| def process_schedule_df(df, base_date, split_time_str, time_adjustment_minutes=0): | |
| """ | |
| 处理排片DataFrame,生成LED屏数据和散场时间数据 | |
| """ | |
| if df is None or df.empty: | |
| return None, None, None | |
| # 'LED屏排片表' 数据处理 | |
| led_df = df.copy() | |
| try: | |
| # 优先提取 'X号',失败则取第一个字符 + '号' | |
| extracted = led_df['Hall'].astype(str).str.extract(r'(\d+号)') | |
| fallback = led_df['Hall'].astype(str).str[0] + '号' | |
| led_df['Hall'] = extracted[0].fillna(fallback) | |
| led_df['StartTime_dt'] = pd.to_datetime(led_df['StartTime'], format='%H:%M', errors='coerce').apply( | |
| lambda t: t.replace(year=base_date.year, month=base_date.month, day=base_date.day) if pd.notnull(t) else t) | |
| led_df['EndTime_dt'] = pd.to_datetime(led_df['EndTime'], format='%H:%M', errors='coerce').apply( | |
| lambda t: t.replace(year=base_date.year, month=base_date.month, day=base_date.day) if pd.notnull(t) else t) | |
| led_df.loc[led_df['EndTime_dt'] < led_df['StartTime_dt'], 'EndTime_dt'] += timedelta(days=1) | |
| led_df = led_df.sort_values(['Hall', 'StartTime_dt']) | |
| merged_rows = [] | |
| for _, group in led_df.groupby('Hall'): | |
| current = None | |
| for _, row in group.sort_values('StartTime_dt').iterrows(): | |
| if current is None: | |
| current = row.copy() | |
| elif row['Movie'] == current['Movie']: | |
| current['EndTime_dt'] = row['EndTime_dt'] | |
| else: | |
| merged_rows.append(current) | |
| current = row.copy() | |
| if current is not None: merged_rows.append(current) | |
| merged_df = pd.DataFrame(merged_rows) | |
| merged_df['StartTime_dt'] -= timedelta(minutes=10) | |
| merged_df['EndTime_dt'] -= timedelta(minutes=5) | |
| merged_df['Seq'] = merged_df.groupby('Hall').cumcount() + 1 | |
| merged_df['StartTime_str'] = merged_df['StartTime_dt'].dt.strftime('%H:%M') | |
| merged_df['EndTime_str'] = merged_df['EndTime_dt'].dt.strftime('%H:%M') | |
| led_schedule_df = merged_df[['Hall', 'Seq', 'Movie', 'StartTime_str', 'EndTime_str']] | |
| except Exception as e: | |
| st.error(f"处理 'LED 屏打印表' 数据时出错: {e}") | |
| led_schedule_df = None | |
| # '散场时间快捷打印' 数据处理 | |
| times_df = df.copy() | |
| try: | |
| # 优先提取数字,失败则取第一个字符 | |
| num_part = times_df['Hall'].str.extract(r'(\d+)')[0] | |
| char_part = times_df['Hall'].astype(str).str[0] | |
| times_df['Hall'] = num_part.fillna(char_part) | |
| times_df.dropna(subset=['Hall', 'StartTime', 'EndTime'], inplace=True) | |
| times_df['StartTime_dt'] = pd.to_datetime(times_df['StartTime'], format='%H:%M', errors='coerce').apply( | |
| lambda t: datetime.combine(base_date, t.time()) if pd.notnull(t) else pd.NaT) | |
| times_df['EndTime_dt'] = pd.to_datetime(times_df['EndTime'], format='%H:%M', errors='coerce').apply( | |
| lambda t: datetime.combine(base_date, t.time()) if pd.notnull(t) else pd.NaT) | |
| times_df.loc[times_df['EndTime_dt'] < times_df['StartTime_dt'], 'EndTime_dt'] += timedelta(days=1) | |
| # 应用时间提前量 | |
| if time_adjustment_minutes > 0: | |
| times_df['EndTime_dt'] -= timedelta(minutes=time_adjustment_minutes) | |
| business_start_dt = datetime.combine(base_date, datetime.strptime(BUSINESS_START, "%H:%M").time()) | |
| business_end_dt = datetime.combine(base_date, datetime.strptime(BUSINESS_END, "%H:%M").time()) | |
| if business_end_dt < business_start_dt: business_end_dt += timedelta(days=1) | |
| times_df = times_df[(times_df['EndTime_dt'] >= business_start_dt) & (times_df['EndTime_dt'] <= business_end_dt)] | |
| times_df = times_df.sort_values('EndTime_dt') | |
| split_dt = datetime.combine(base_date, split_time_str) | |
| part1 = times_df[times_df['EndTime_dt'] <= split_dt].copy() | |
| part2 = times_df[times_df['EndTime_dt'] > split_dt].copy() | |
| # 使用 %H:%M 保证两位小时 | |
| part1['EndTime'] = part1['EndTime_dt'].dt.strftime('%H:%M') | |
| part2['EndTime'] = part2['EndTime_dt'].dt.strftime('%H:%M') | |
| times_part1_df = part1[['Hall', 'EndTime']] | |
| times_part2_df = part2[['Hall', 'EndTime']] | |
| except Exception as e: | |
| st.error(f"处理 '散场时间表' 数据时出错: {e}") | |
| times_part1_df, times_part2_df = None, None | |
| return led_schedule_df, times_part1_df, times_part2_df | |
| def process_file_upload(file, split_time_str, time_adjustment_minutes=0): | |
| """ | |
| Handles file upload, reads excel and calls the core processing function. | |
| """ | |
| try: | |
| date_df = pd.read_excel(file, header=None, skiprows=7, nrows=1, usecols=[3]) | |
| date_str = pd.to_datetime(date_df.iloc[0, 0]).strftime('%Y-%m-%d') | |
| base_date = pd.to_datetime(date_str).date() | |
| except Exception: | |
| date_str = datetime.today().strftime('%Y-%m-%d') | |
| base_date = datetime.today().date() | |
| try: | |
| df = pd.read_excel(file, header=9, usecols=[1, 2, 4, 5]) | |
| df.columns = ['Hall', 'StartTime', 'EndTime', 'Movie'] | |
| df['Hall'] = df['Hall'].ffill() | |
| df.dropna(subset=['StartTime', 'EndTime', 'Movie'], inplace=True) | |
| except Exception as e: | |
| st.error(f"读取数据时出错: {e}。请检查文件格式是否为'放映时间核对表'。") | |
| return None, None, None, date_str | |
| led_data, times_p1, times_p2 = process_schedule_df(df, base_date, split_time_str, time_adjustment_minutes) | |
| return led_data, times_p1, times_p2, date_str | |
| def create_print_layout_led(data, date_str, font_path, generate_png=False): | |
| """生成LED屏排片表的PDF/PNG""" | |
| if data is None or data.empty: return None | |
| A4_width_in, A4_height_in = 8.27, 11.69 | |
| dpi = 300 | |
| total_content_rows = len(data) | |
| layout_rows = max(total_content_rows, 25) | |
| totalA = layout_rows + 2 | |
| row_height = A4_height_in / totalA | |
| data = data.reset_index(drop=True) | |
| data['hall_str'] = '$' + data['Hall'].str.replace('号', '') + '^{\\#}$' | |
| data['seq_str'] = data['Seq'].apply(format_seq) | |
| data['pinyin_abbr'] = data['Movie'].apply(get_pinyin_abbr) | |
| data['time_str'] = data['StartTime_str'] + ' - ' + data['EndTime_str'] | |
| temp_fig = plt.figure(figsize=(A4_width_in, A4_height_in), dpi=dpi) | |
| renderer = temp_fig.canvas.get_renderer() | |
| base_font_size_pt = (row_height * 0.9) * 72 | |
| seq_font_size_pt = (row_height * 0.5) * 72 | |
| def get_col_width_in(series, font_size_pt, is_math=False): | |
| if series.empty: return 0 | |
| font_prop = get_font_properties(font_path, font_size_pt) | |
| longest_str_idx = series.astype(str).str.len().idxmax() | |
| max_content = str(series.loc[longest_str_idx]) | |
| text_width_px, _, _ = renderer.get_text_width_height_descent(max_content, font_prop, ismath=is_math) | |
| return (text_width_px / dpi) * 1.1 | |
| margin_col_width = row_height | |
| hall_col_width = get_col_width_in(data['hall_str'], base_font_size_pt, is_math=True) | |
| seq_col_width = get_col_width_in(data['seq_str'], seq_font_size_pt) | |
| pinyin_col_width = get_col_width_in(data['pinyin_abbr'], base_font_size_pt) | |
| time_col_width = get_col_width_in(data['time_str'], base_font_size_pt) | |
| movie_col_width = A4_width_in - ( | |
| margin_col_width * 2 + hall_col_width + seq_col_width + pinyin_col_width + time_col_width) | |
| plt.close(temp_fig) | |
| col_widths = {'hall': hall_col_width, 'seq': seq_col_width, 'movie': movie_col_width, 'pinyin': pinyin_col_width, | |
| 'time': time_col_width} | |
| col_x_starts = {} | |
| current_x = margin_col_width | |
| for col_name in ['hall', 'seq', 'movie', 'pinyin', 'time']: | |
| col_x_starts[col_name] = current_x | |
| current_x += col_widths[col_name] | |
| def draw_figure(fig, ax): | |
| renderer = fig.canvas.get_renderer() | |
| for col_name in ['hall', 'seq', 'movie', 'pinyin']: | |
| x_line = col_x_starts[col_name] + col_widths[col_name] | |
| line_top_y, line_bottom_y = A4_height_in - row_height, row_height | |
| ax.add_line( | |
| Line2D([x_line, x_line], [line_bottom_y, line_top_y], color='gray', linestyle=':', linewidth=0.5)) | |
| last_hall_drawn = None | |
| for i, row in data.iterrows(): | |
| y_bottom = A4_height_in - (i + 2) * row_height | |
| y_center = y_bottom + row_height / 2 | |
| if row['Hall'] != last_hall_drawn: | |
| ax.text(col_x_starts['hall'] + col_widths['hall'] / 2, y_center, row['hall_str'], | |
| fontproperties=get_font_properties(font_path, base_font_size_pt), ha='center', va='center') | |
| last_hall_drawn = row['Hall'] | |
| ax.text(col_x_starts['seq'] + col_widths['seq'] / 2, y_center, row['seq_str'], | |
| fontproperties=get_font_properties(font_path, seq_font_size_pt), ha='center', va='center') | |
| ax.text(col_x_starts['pinyin'] + col_widths['pinyin'] / 2, y_center, row['pinyin_abbr'], | |
| fontproperties=get_font_properties(font_path, base_font_size_pt), ha='center', va='center') | |
| ax.text(col_x_starts['time'] + col_widths['time'] / 2, y_center, row['time_str'], | |
| fontproperties=get_font_properties(font_path, base_font_size_pt), ha='center', va='center') | |
| movie_font_size = base_font_size_pt | |
| movie_font_prop = get_font_properties(font_path, movie_font_size) | |
| text_w_px, _, _ = renderer.get_text_width_height_descent(row['Movie'], movie_font_prop, ismath=False) | |
| text_w_in = text_w_px / dpi | |
| max_width_in = col_widths['movie'] * 0.9 | |
| if text_w_in > max_width_in: | |
| movie_font_size *= (max_width_in / text_w_in) | |
| movie_font_prop = get_font_properties(font_path, movie_font_size) | |
| ax.text(col_x_starts['movie'] + 0.05, y_center, row['Movie'], fontproperties=movie_font_prop, ha='left', | |
| va='center') | |
| is_last_in_hall = (i == len(data) - 1) or (row['Hall'] != data.loc[i + 1, 'Hall']) | |
| line_start_x, line_end_x = margin_col_width, A4_width_in - margin_col_width | |
| if is_last_in_hall: | |
| ax.add_line(Line2D([line_start_x, line_end_x], [y_bottom, y_bottom], color='black', linestyle='-', | |
| linewidth=0.8)) | |
| else: | |
| ax.add_line(Line2D([line_start_x, line_end_x], [y_bottom, y_bottom], color='gray', linestyle=':', | |
| linewidth=0.5)) | |
| outputs = {} | |
| fig = plt.figure(figsize=(A4_width_in, A4_height_in), dpi=300) | |
| ax = fig.add_axes([0, 0, 1, 1]) | |
| ax.set_axis_off(); | |
| ax.set_xlim(0, A4_width_in); | |
| ax.set_ylim(0, A4_height_in) | |
| ax.text(margin_col_width, A4_height_in - row_height, date_str, fontproperties=get_font_properties(font_path, 10), | |
| color=DATE_COLOR, ha='left', va='bottom', transform=ax.transData) | |
| draw_figure(fig, ax) | |
| pdf_buf = io.BytesIO() | |
| fig.savefig(pdf_buf, format='pdf', dpi=dpi, bbox_inches='tight', pad_inches=0) | |
| pdf_buf.seek(0) | |
| outputs['pdf'] = f"data:application/pdf;base64,{base64.b64encode(pdf_buf.getvalue()).decode()}" | |
| if generate_png: | |
| png_buf = io.BytesIO() | |
| fig.savefig(png_buf, format='png', dpi=dpi, bbox_inches='tight', pad_inches=0) | |
| png_buf.seek(0) | |
| outputs['png'] = f"data:image/png;base64,{base64.b64encode(png_buf.getvalue()).decode()}" | |
| plt.close(fig) | |
| return outputs | |
| def create_print_layout_times(data, title, date_str, font_path, size_multiplier=1.1, hall_format='Default', | |
| generate_png=False): | |
| """生成散场时间表的PDF/PNG""" | |
| if data is None or data.empty: return None | |
| def generate_figure(): | |
| total_items = len(data) | |
| num_rows = math.ceil(total_items / NUM_COLS) if total_items > 0 else 1 | |
| data_area_height_in, cell_width_in = A5_HEIGHT_IN, A5_WIDTH_IN / NUM_COLS | |
| cell_height_in = data_area_height_in / num_rows | |
| target_width_pt, target_height_pt = (cell_width_in * 0.9) * 72, (cell_height_in * 0.9) * 72 | |
| font_size_based_on_width = target_width_pt / (8 * 0.6) | |
| base_fontsize = min(font_size_based_on_width, target_height_pt) * size_multiplier | |
| fig = plt.figure(figsize=(A5_WIDTH_IN, A5_HEIGHT_IN), dpi=300) | |
| fig.subplots_adjust(left=0, right=1, top=1, bottom=0) | |
| gs = gridspec.GridSpec(num_rows, NUM_COLS, hspace=0, wspace=0, figure=fig) | |
| data_values = data.values.tolist() | |
| while len(data_values) % NUM_COLS != 0: data_values.append(['', '']) | |
| rows_per_col_layout = math.ceil(len(data_values) / NUM_COLS) | |
| sorted_data = [['', '']] * len(data_values) | |
| for i, item in enumerate(data_values): | |
| if item[0] and item[1]: | |
| row_in_col, col_idx = i % rows_per_col_layout, i // rows_per_col_layout | |
| new_index = row_in_col * NUM_COLS + col_idx | |
| if new_index < len(sorted_data): sorted_data[new_index] = item | |
| is_first_cell_with_data = True | |
| for idx, (hall, end_time) in enumerate(sorted_data): | |
| if hall and end_time: | |
| row_grid, col_grid = idx // NUM_COLS, idx % NUM_COLS | |
| ax = fig.add_subplot(gs[row_grid, col_grid]) | |
| for spine in ax.spines.values(): | |
| spine.set_visible(True); | |
| spine.set_linestyle((0, (1, 2))) | |
| spine.set_color(BORDER_COLOR); | |
| spine.set_linewidth(0.75) | |
| if is_first_cell_with_data: | |
| ax.text(0.05, 0.95, f"{date_str} {title}", | |
| fontproperties=get_font_properties(font_path, base_fontsize * 0.5), color=DATE_COLOR, | |
| ha='left', va='top', transform=ax.transAxes) | |
| is_first_cell_with_data = False | |
| if hall_format == 'Superscript': | |
| display_text = f'${str(hall)}^{{\\#}}$ {end_time}' | |
| elif hall_format == 'Circled': | |
| display_text = f'{format_seq(hall)} {end_time}' | |
| else: # Default | |
| display_text = f"{str(hall)} {end_time}" | |
| ax.text(0.5, 0.5, display_text, fontproperties=get_font_properties(font_path, base_fontsize), | |
| ha='center', va='center', transform=ax.transAxes) | |
| ax.set_xticks([]); | |
| ax.set_yticks([]); | |
| ax.set_facecolor('none') | |
| return fig | |
| fig_for_output = generate_figure() | |
| outputs = {} | |
| pdf_buffer = io.BytesIO() | |
| with PdfPages(pdf_buffer) as pdf: | |
| pdf.savefig(fig_for_output) | |
| pdf_buffer.seek(0) | |
| outputs['pdf'] = f"data:application/pdf;base64,{base64.b64encode(pdf_buffer.getvalue()).decode()}" | |
| if generate_png: | |
| png_buffer = io.BytesIO() | |
| fig_for_output.savefig(png_buffer, format='png') | |
| png_buffer.seek(0) | |
| outputs['png'] = f'data:image/png;base64,{base64.b64encode(png_buffer.getvalue()).decode()}' | |
| plt.close(fig_for_output) | |
| return outputs | |
| def display_pdf(base64_pdf): | |
| """在Streamlit中嵌入显示PDF""" | |
| return f'<iframe src="{base64_pdf}" width="100%" height="800" type="application/pdf"></iframe>' | |
| # --- 3. TMS 及天气查询模块 --- | |
| def fetch_and_process_server_movies(priority_movie_titles=None): | |
| if priority_movie_titles is None: priority_movie_titles = [] | |
| # 获取环境变量 | |
| app_secret = os.getenv("TMS_APP_SECRET") | |
| ticket = os.getenv("TMS_TICKET") | |
| theater_id_str = os.getenv("TMS_THEATER_ID") | |
| x_session_id = os.getenv("TMS_X_SESSION_ID") | |
| # 转换 ID 为整数 | |
| try: | |
| theater_id = int(theater_id_str) if theater_id_str else 0 | |
| except ValueError: | |
| st.error("环境变量 TMS_THEATER_ID 格式错误,应为数字。") | |
| return {}, [] | |
| token_headers = { | |
| 'Host': 'oa.hengdianfilm.com:7080', 'Content-Type': 'application/json', | |
| 'Origin': 'http://115.239.253.233:7080', 'Connection': 'keep-alive', | |
| 'Accept': 'application/json, text/javascript, */*; q=0.01', | |
| 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_5_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/138.0.7204.156 Mobile/15E148 Safari/604.1', | |
| 'Accept-Language': 'zh-CN,zh-Hans;q=0.9', | |
| } | |
| # 使用变量 | |
| token_json_data = {'appId': 'hd', 'appSecret': app_secret, 'timeStamp': int(time.time() * 1000)} | |
| # 动态构建 URL | |
| token_url = f'http://oa.hengdianfilm.com:7080/cinema-api/admin/generateToken?token=hd&murl=?token=hd&murl=ticket={ticket}' | |
| response = requests.post(token_url, headers=token_headers, json=token_json_data, timeout=10) | |
| response.raise_for_status() | |
| token_data = response.json() | |
| if token_data.get('error_code') != '0000': raise Exception(f"获取Token失败: {token_data.get('error_desc')}") | |
| auth_token = token_data['param'] | |
| all_movies, page_index = [], 1 | |
| while True: | |
| list_headers = { | |
| 'Accept': 'application/json, text/javascript, */*; q=0.01', | |
| 'Content-Type': 'application/json; charset=UTF-8', | |
| 'Origin': 'http://115.239.253.233:7080', 'Proxy-Connection': 'keep-alive', | |
| 'Token': auth_token, | |
| 'User-Agent': 'Mozilla/5.0 ...', | |
| 'X-SESSIONID': x_session_id, # 使用变量 | |
| } | |
| list_params = {'token': 'hd', 'murl': 'ContentMovie'} | |
| # 使用变量 | |
| list_json_data = {'THEATER_ID': theater_id, 'SOURCE': 'SERVER', 'ASSERT_TYPE': 2, 'PAGE_CAPACITY': 20, | |
| 'PAGE_INDEX': page_index} | |
| list_url = 'http://oa.hengdianfilm.com:7080/cinema-api/cinema/server/dcp/list' | |
| response = requests.post(list_url, params=list_params, headers=list_headers, json=list_json_data, verify=False) | |
| response.raise_for_status() | |
| movie_data = response.json() | |
| if movie_data.get("RSPCD") != "000000": raise Exception(f"获取影片列表失败: {movie_data.get('RSPMSG')}") | |
| body = movie_data.get("BODY", {}); | |
| movies_on_page = body.get("LIST", []) | |
| if not movies_on_page: break | |
| all_movies.extend(movies_on_page) | |
| if len(all_movies) >= body.get("COUNT", 0): break | |
| page_index += 1; | |
| time.sleep(1) | |
| movie_details = {m.get('CONTENT_NAME'): {'assert_name': m.get('ASSERT_NAME'), | |
| 'halls': sorted([h.get('HALL_NAME') for h in m.get('HALL_INFO', [])]), | |
| 'play_time': m.get('PLAY_TIME')} for m in all_movies if | |
| m.get('CONTENT_NAME')} | |
| by_hall = defaultdict(list) | |
| for content_name, details in movie_details.items(): | |
| for hall_name in details['halls']: | |
| by_hall[hall_name].append({'content_name': content_name, 'details': details}) | |
| for hall_name in by_hall: | |
| by_hall[hall_name].sort( | |
| key=lambda item: (item['details']['assert_name'] is None or item['details']['assert_name'] == '', | |
| item['details']['assert_name'] or item['content_name'])) | |
| view2_list = [{'assert_name': d['assert_name'], 'content_name': c, 'halls': d['halls'], 'play_time': d['play_time']} | |
| for c, d in movie_details.items() if d.get('assert_name')] | |
| priority_list = [item for item in view2_list if | |
| any(p_title in item['assert_name'] for p_title in priority_movie_titles)] | |
| other_list_items = [item for item in view2_list if item not in priority_list] | |
| priority_list.sort(key=lambda x: x['assert_name']); | |
| other_list_items.sort(key=lambda x: x['assert_name']) | |
| final_sorted_list = priority_list + other_list_items | |
| return dict(sorted(by_hall.items())), final_sorted_list | |
| def get_weather_forecast(target_date): | |
| """获取指定日期的天气信息并格式化为标题字符串""" | |
| if not target_date: | |
| return "当日放映影片" | |
| url = "https://restapi.amap.com/v3/weather/weatherInfo" | |
| params = {'key': GAODE_API_KEY, 'city': ADCODE, 'extensions': 'all', 'output': 'JSON'} | |
| try: | |
| response = requests.get(url, params=params, timeout=5) | |
| response.raise_for_status() | |
| data = response.json() | |
| if data.get('status') == '1' and data.get('forecasts'): | |
| target_date_str = target_date.strftime('%Y-%m-%d') | |
| for day_cast in data['forecasts'][0].get('casts', []): | |
| if day_cast.get('date') == target_date_str: | |
| weekday_map = {'1': '一', '2': '二', '3': '三', '4': '四', '5': '五', '6': '六', '7': '日'} | |
| week = weekday_map.get(day_cast.get('week'), '') | |
| weather = day_cast.get('dayweather', '未知') | |
| max_temp = day_cast.get('daytemp', 'N/A') | |
| min_temp = day_cast.get('nighttemp', 'N/A') | |
| return f"今日放映影片({target_date_str},星期{week},{weather},{max_temp}℃ / {min_temp}℃)" | |
| except Exception as e: | |
| print(f"天气 API 请求失败: {e}") | |
| return f"今日放映影片({target_date.strftime('%Y-%m-%d')},天气未知)" | |
| def get_circled_number(hall_name): | |
| mapping = {'1': '①', '2': '②', '3': '③', '4': '④', '5': '⑤', '6': '⑥', '7': '⑦', '8': '⑧', '9': '⑨'} | |
| num_str = ''.join(filter(str.isdigit, hall_name)); | |
| return mapping.get(num_str, '') | |
| def format_play_time(time_str): | |
| if not time_str or not isinstance(time_str, str): return None | |
| try: | |
| parts = time_str.split(':'); | |
| hours = int(parts[0]); | |
| minutes = int(parts[1]) | |
| return hours * 60 + minutes | |
| except (ValueError, IndexError): | |
| return None | |
| def add_tms_locations_to_analysis(analysis_df, tms_movie_list): | |
| locations = [] | |
| for _, row in analysis_df.iterrows(): | |
| movie_title = row['影片'] | |
| found_versions = [] | |
| for tms_movie in tms_movie_list: | |
| if tms_movie['assert_name'].startswith(movie_title): | |
| version_name = tms_movie['assert_name'].replace(movie_title, '').strip() | |
| circled_halls = " ".join(sorted([get_circled_number(h) for h in tms_movie['halls']])) | |
| found_versions.append(f"{version_name}:{circled_halls}" if version_name else circled_halls) | |
| locations.append('|'.join(found_versions)) | |
| analysis_df['影片所在影厅位置'] = locations | |
| return analysis_df | |
| # --- 4. 新增的经营数据API函数 --- | |
| def fetch_income_data(token, date_str): | |
| """获取指定日期的票房、卖品收入和卖品占比""" | |
| url = 'https://app.bi.piao51.cn/cinema-app/mycinema/incomeProportion.action' | |
| params = {'cinemaId': CINEMA_ID, 'token': token, 'qTimeStart': date_str, 'qTimeEnd': date_str} | |
| headers = {'Host': 'app.bi.piao51.cn', 'X-Requested-With': 'XMLHttpRequest', 'User-Agent': 'Mozilla/5.0'} | |
| try: | |
| response = requests.get(url, params=params, headers=headers, timeout=10) | |
| response.raise_for_status() | |
| data = response.json() | |
| if data.get('code') == 'A00000' and data.get('results'): | |
| day_data = data['results'][0] | |
| return { | |
| "ticket_income": float(day_data.get('ticketIncome') or 0), | |
| "goods_income": float(day_data.get('goodsIncome') or 0), | |
| "sold_incomes_zb": float(day_data.get('soldIncomesZb') or 0) | |
| } | |
| except Exception as e: | |
| st.warning(f"获取经营收入数据失败: {e}") | |
| return None | |
| def fetch_membership_data(token, date_str): | |
| """获取指定日期的文旅卡开卡数""" | |
| url = 'https://app.bi.piao51.cn/cinema-app/mycinema/membership.action' | |
| params = {'token': token, 'cinemaId': CINEMA_ID, 'startDate': date_str, 'endDate': date_str} | |
| headers = {'Host': 'app.bi.piao51.cn', 'X-Requested-With': 'XMLHttpRequest', 'User-Agent': 'Mozilla/5.0'} | |
| try: | |
| response = requests.get(url, params=params, headers=headers, timeout=10) | |
| response.raise_for_status() | |
| data = response.json() | |
| if data.get('code') == 'A00000' and data.get('results'): | |
| for card_type in data['results']: | |
| if card_type.get('cardLevelName') == '文旅消费卡': | |
| return int(card_type.get('sendCardNums', 0)) | |
| return 0 # 没找到文旅卡 | |
| except Exception as e: | |
| st.warning(f"获取会员开卡数据失败: {e}") | |
| return 0 | |
| # --- 6. 新增:场次合理性检查日志函数 --- | |
| def generate_schedule_check_logs(schedule_list, date_str): | |
| """ | |
| 生成场次合理性检查日志 | |
| :param schedule_list: 排片数据列表 (list of dicts) | |
| :param date_str: 排片日期字符串 (YYYY-MM-DD) | |
| :return: 格式化后的日志文本 (str) | |
| """ | |
| if not schedule_list: | |
| return "无排片数据,无法进行合理性检查。" | |
| # 转换为 DataFrame 方便处理 | |
| df_original = pd.DataFrame(schedule_list) | |
| # 重命名列以符合逻辑处理习惯 | |
| df_original.rename( | |
| columns={'hallName': 'Hall', 'movieName': 'filmName', 'showStartTime': 'StartTime', 'showEndTime': 'EndTime'}, | |
| inplace=True) | |
| # 预处理:转换时间列,简化影厅名 | |
| df_original['startTime'] = pd.to_datetime(df_original['StartTime'], format='%H:%M', errors='coerce').apply( | |
| lambda t: datetime.combine(datetime.strptime(date_str, '%Y-%m-%d').date(), t.time()) if pd.notnull( | |
| t) else pd.NaT) | |
| df_original['endTime'] = pd.to_datetime(df_original['EndTime'], format='%H:%M', errors='coerce').apply( | |
| lambda t: datetime.combine(datetime.strptime(date_str, '%Y-%m-%d').date(), t.time()) if pd.notnull( | |
| t) else pd.NaT) | |
| # 处理跨天时间 | |
| df_original.loc[df_original['endTime'] < df_original['startTime'], 'endTime'] += timedelta(days=1) | |
| # 简单影厅名处理 (仅提取数字或主要标识) | |
| def simplify_hall(name): | |
| import re | |
| match = re.search(r'(\d+号?)', str(name)) | |
| return match.group(1) if match else str(name)[:2] | |
| df_original['simpleHallName'] = df_original['Hall'].apply(simplify_hall) | |
| df_check = df_original.sort_values(by='startTime').reset_index(drop=True) | |
| final_log_parts = [] | |
| # --- Rule 1: 同影片场次间隔过近 --- | |
| logs_r1 = [] | |
| for film_name in df_check['filmName'].unique(): | |
| film_schedules = df_check[df_check['filmName'] == film_name].sort_values(by='startTime').reset_index() | |
| if len(film_schedules) > 1: | |
| for i in range(len(film_schedules) - 1): | |
| s1, s2 = film_schedules.iloc[i], film_schedules.iloc[i + 1] | |
| interval = (s2['startTime'] - s1['startTime']).total_seconds() / 60 | |
| if interval < 30: | |
| log_entry = f"《{s1['filmName']}》{s1['simpleHallName']}【{s1['startTime'].strftime('%H:%M')}】和 {s2['simpleHallName']}【{s2['startTime'].strftime('%H:%M')}】开场时间距离 {int(interval)} 分钟" | |
| logs_r1.append(log_entry) | |
| final_log_parts.append("规则一:同影片场次间隔过近(少于 30 分钟)") | |
| if logs_r1: | |
| for i, log in enumerate(logs_r1, 1): | |
| final_log_parts.append(f"{i}. {log}") | |
| else: | |
| final_log_parts.append("(无)") | |
| # --- Rule 2: 30 分钟内影片开场超过 4 场 --- | |
| logs_r2 = [] | |
| i = 0 | |
| processed_indices_r2 = set() | |
| while i < len(df_check): | |
| if i in processed_indices_r2: | |
| i += 1 | |
| continue | |
| window_start_time = df_check.iloc[i]['startTime'] | |
| window_end_time_30min = window_start_time + timedelta(minutes=30) | |
| window_df = df_check[ | |
| (df_check['startTime'] >= window_start_time) & (df_check['startTime'] < window_end_time_30min)] | |
| if len(window_df) > 4: | |
| start_t_str = window_df.iloc[0]['startTime'].strftime('%H:%M') | |
| end_t_str = window_df.iloc[-1]['startTime'].strftime('%H:%M') | |
| log_message_lines = [f"【{start_t_str} - {end_t_str}】开场时间比较集中:"] | |
| for _, row in window_df.iterrows(): | |
| log_message_lines.append( | |
| f" {row['simpleHallName']}《{row['filmName']}》> {row['startTime'].strftime('%H:%M')}") | |
| processed_indices_r2.add(row.name) | |
| logs_r2.append("\n".join(log_message_lines)) | |
| i += 1 | |
| final_log_parts.append("\n规则二:30 分钟内影片开场超过 4 场") | |
| if logs_r2: | |
| for i, log in enumerate(logs_r2, 1): | |
| final_log_parts.append(f"{i}. {log}") | |
| else: | |
| final_log_parts.append("(无)") | |
| # --- Rule 3: 场次开场间隔超过 30 分钟 --- | |
| logs_r3 = [] | |
| if len(df_check) > 1: | |
| for i in range(len(df_check) - 1): | |
| s1_start, s2_start = df_check.iloc[i]['startTime'], df_check.iloc[i + 1]['startTime'] | |
| gap = (s2_start - s1_start).total_seconds() / 60 | |
| if gap > 30: | |
| log_entry = f"【{s1_start.strftime('%H:%M')} ~ {s2_start.strftime('%H:%M')}】缺少影片开场,间隔 {int(gap)} 分钟" | |
| logs_r3.append(log_entry) | |
| final_log_parts.append("\n规则三:场次开场间隔超过 30 分钟") | |
| if logs_r3: | |
| for i, log in enumerate(logs_r3, 1): | |
| final_log_parts.append(f"{i}. {log}") | |
| else: | |
| final_log_parts.append("(无)") | |
| # --- Rule 4: 最早/最晚场次时间检查 --- | |
| logs_r4 = [] | |
| if not df_check.empty: | |
| first_sched = df_check.iloc[0] | |
| last_sched = df_check.iloc[-1] | |
| if first_sched['startTime'].time() > dt_time(10, 0): | |
| logs_r4.append( | |
| f"最早一场 {first_sched['simpleHallName']}《{first_sched['filmName']}》{first_sched['startTime'].strftime('%H:%M')} 晚于 10:00") | |
| if last_sched['startTime'].time() < dt_time(22, 30): | |
| logs_r4.append( | |
| f"最晚一场 {last_sched['simpleHallName']}《{last_sched['filmName']}》{last_sched['startTime'].strftime('%H:%M')} 早于 22:30") | |
| final_log_parts.append("\n规则四:最早一场晚于 10:00,最晚一场早于 22:30") | |
| if logs_r4: | |
| for i, log in enumerate(logs_r4, 1): | |
| final_log_parts.append(f"{i}. {log}") | |
| else: | |
| final_log_parts.append("(无)") | |
| # --- Rule 5: 影厅空闲时间超过 1 小时 (10:00-23:00) --- | |
| logs_r5 = [] | |
| today_date = datetime.strptime(date_str, '%Y-%m-%d').date() | |
| window_start_time_limit_r5 = datetime.combine(today_date, dt_time(10, 0)) | |
| window_end_time_limit_r5 = datetime.combine(today_date, dt_time(23, 0)) | |
| unique_halls_r5 = df_original['simpleHallName'].unique() | |
| for hall_name in unique_halls_r5: | |
| hall_df = df_original[df_original['simpleHallName'] == hall_name].sort_values(by='startTime') | |
| if len(hall_df) > 1: | |
| for i in range(len(hall_df) - 1): | |
| prev_sched_end = hall_df.iloc[i]['endTime'] | |
| curr_sched_start = hall_df.iloc[i + 1]['startTime'] | |
| if prev_sched_end < window_end_time_limit_r5 and curr_sched_start > window_start_time_limit_r5: | |
| idle_duration_minutes = (curr_sched_start - prev_sched_end).total_seconds() / 60 | |
| if idle_duration_minutes > 60: | |
| log_entry = f"{hall_name} 【{prev_sched_end.strftime('%H:%M')} ~ {curr_sched_start.strftime('%H:%M')}】无影片在播,时长 {int(idle_duration_minutes)} 分钟" | |
| logs_r5.append(log_entry) | |
| final_log_parts.append("\n规则五:影厅空闲时间超过 1 小时(10:00-23:00)") | |
| if logs_r5: | |
| for i, log in enumerate(logs_r5, 1): | |
| final_log_parts.append(f"{i}. {log}") | |
| else: | |
| final_log_parts.append("(无)") | |
| # --- Rule 6: 影厅场次转换时间检查 --- | |
| logs_r6 = [] | |
| for hall_name in df_original['simpleHallName'].unique(): | |
| hall_df = df_original[df_original['simpleHallName'] == hall_name].sort_values(by='startTime') | |
| if len(hall_df) > 1: | |
| for i in range(len(hall_df) - 1): | |
| prev_sched = hall_df.iloc[i] | |
| next_sched = hall_df.iloc[i + 1] | |
| conversion_time = (next_sched['startTime'] - prev_sched['endTime']).total_seconds() / 60 | |
| if conversion_time < 10: | |
| logs_r6.append( | |
| f"{hall_name} {prev_sched['endTime'].strftime('%H:%M')} 《{prev_sched['filmName']}》结束后影厅空闲时间仅为 {int(conversion_time)} 分钟") | |
| final_log_parts.append("\n规则六:影厅场次转换时间检查") | |
| if logs_r6: | |
| for i, log in enumerate(logs_r6, 1): | |
| final_log_parts.append(f"{i}. {log}") | |
| else: | |
| final_log_parts.append("(无)") | |
| # --- Rule 7: 动态散场和入场高峰预警 --- | |
| logs_r7 = [] | |
| final_log_parts.append("\n规则七:动态散场和入场高峰预警") | |
| if not df_check.empty: | |
| start_time = df_check.iloc[0]['startTime'].replace(second=0, microsecond=0) | |
| end_time = df_check.iloc[-1]['endTime'] | |
| current_time = start_time | |
| reported_windows = set() | |
| while current_time < end_time: | |
| window_end = current_time + timedelta(minutes=10) | |
| starts_in_window = df_check[(df_check['startTime'] >= current_time) & (df_check['startTime'] < window_end)] | |
| ends_in_window = df_check[(df_check['endTime'] > current_time) & (df_check['endTime'] <= window_end)] | |
| if len(starts_in_window) + len(ends_in_window) > 5: | |
| window_tuple = (current_time.strftime('%H:%M'), window_end.strftime('%H:%M')) | |
| if window_tuple not in reported_windows: | |
| exit_halls = "、".join(ends_in_window['simpleHallName']) | |
| entry_halls = "、".join(starts_in_window['simpleHallName']) | |
| log_msg = f"【{current_time.strftime('%H:%M')} ~ {window_end.strftime('%H:%M')}】" | |
| if not ends_in_window.empty: | |
| log_msg += f",{exit_halls}集中散场" | |
| if not starts_in_window.empty: | |
| if not ends_in_window.empty: | |
| log_msg += ",同时" | |
| else: | |
| log_msg += "," | |
| log_msg += f"{entry_halls}即将入场" | |
| log_msg += ",预计人流瞬时压力过大。" | |
| logs_r7.append(log_msg) | |
| reported_windows.add(window_tuple) | |
| current_time += timedelta(minutes=5) | |
| # Part 2: Simultaneous start | |
| start_groups = df_check.groupby('startTime').filter(lambda x: len(x) > 3) | |
| for time_val, group in start_groups.groupby('startTime'): | |
| halls = "、".join(group['simpleHallName']) | |
| logs_r7.append(f"{time_val.strftime('%H:%M')},{halls}电影同时开场,注意预计人流瞬时压力过大。") | |
| # Part 3: Simultaneous end | |
| end_groups = df_check.groupby('endTime').filter(lambda x: len(x) > 3) | |
| for time_val, group in end_groups.groupby('endTime'): | |
| halls = "、".join(group['simpleHallName']) | |
| logs_r7.append(f"{time_val.strftime('%H:%M')},{halls}电影同时散场,注意预计人流瞬时压力过大。") | |
| if logs_r7: | |
| for i, log in enumerate(logs_r7, 1): | |
| final_log_parts.append(f"{i}. {log}") | |
| else: | |
| final_log_parts.append("(无)") | |
| # --- Rule 8: “幽灵厅”预警 --- | |
| logs_r8 = [] | |
| final_log_parts.append("\n规则八:影厅结束运营过早预警") | |
| for hall_name in df_original['simpleHallName'].unique(): | |
| last_sched = df_original[df_original['simpleHallName'] == hall_name].nlargest(1, 'endTime').iloc[0] | |
| # 简单判断:如果最后一场结束时间早于 22:30 且就是当天(不是跨天到凌晨) | |
| if last_sched['endTime'].date() == today_date and last_sched['endTime'].time() < dt_time(22, 30): | |
| logs_r8.append(f"{hall_name} 最后一场于【{last_sched['endTime'].strftime('%H:%M')}】结束,过早停运。") | |
| if logs_r8: | |
| for i, log in enumerate(logs_r8, 1): | |
| final_log_parts.append(f"{i}. {log}") | |
| else: | |
| final_log_parts.append("(无)") | |
| # --- Rule 9: 真正意义的黄金时段热门影片排片密度检查 --- | |
| logs_r9 = [] | |
| final_log_parts.append("\n规则九:黄金时段热门影片排片密度检查") | |
| if not df_check.empty: | |
| weekday = today_date.weekday() | |
| golden_hours_r9 = [ | |
| [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))], | |
| [(dt_time(14, 0), dt_time(15, 30)), (dt_time(19, 0), dt_time(22, 20))], | |
| [(dt_time(14, 30), dt_time(16, 0)), (dt_time(19, 0), dt_time(21, 40))], | |
| [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))], | |
| [(dt_time(14, 0), dt_time(15, 0)), (dt_time(19, 0), dt_time(22, 0))], | |
| [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))], | |
| [(dt_time(14, 0), dt_time(17, 0)), (dt_time(19, 0), dt_time(21, 30))] | |
| ][weekday] | |
| film_counts = df_check['filmName'].value_counts() | |
| if not film_counts.empty: | |
| max_count = film_counts.iloc[0] | |
| # 定义热门影片:排片量接近第一名(95%以上)的影片 | |
| hot_films = film_counts[film_counts >= max_count * 0.95].index.tolist() | |
| golden_hour_schedules = df_check[ | |
| df_check['startTime'].apply(lambda dt: any(start <= dt.time() < end for start, end in golden_hours_r9))] | |
| for film in hot_films: | |
| hot_film_total_in_golden = len(golden_hour_schedules[golden_hour_schedules['filmName'] == film]) | |
| golden_total = len(golden_hour_schedules) | |
| if golden_total > 0: | |
| ratio = hot_film_total_in_golden / golden_total | |
| if ratio < 0.3: # 热门影片黄金时段占比低于30%预警 | |
| period_str = " 和 ".join( | |
| [f"{s.strftime('%H:%M')}-{e.strftime('%H:%M')}" for s, e in golden_hours_r9]) | |
| logs_r9.append(f"《{film}》在核心黄金时段 {period_str} 排片占比仅为{ratio:.1%},低于 30%。") | |
| if logs_r9: | |
| for i, log in enumerate(logs_r9, 1): | |
| final_log_parts.append(f"{i}. {log}") | |
| else: | |
| final_log_parts.append("(无)") | |
| return "\n".join(final_log_parts) | |
| def check_tms_file_availability(schedule_list, tms_data, date_str): | |
| """ | |
| 对比排片表和TMS数据,检查影厅是否缺失对应的影片文件 | |
| 优化:仅匹配核心片名(去除版本后缀),优化影厅名显示,合并日志输出 | |
| """ | |
| if not schedule_list: | |
| return "未获取到排片数据,无法检查。" | |
| if not tms_data: | |
| return "未获取到 TMS 数据,无法检查。" | |
| # --- 内部辅助函数 --- | |
| def get_core_movie_name(raw_name): | |
| """ | |
| 获取核心片名用于匹配: | |
| 1. 先执行标准的 clean_movie_title (统一命名) | |
| 2. 再去除所有括号及括号内的内容 (去除版本/制式信息) | |
| 例如:'疯狂动物城2(数字3D)' -> '疯狂动物城2' | |
| """ | |
| # 1. 基础清洗 (利用现有的逻辑处理中英文/特殊后缀) | |
| # 注意:这里我们不传入 canonical_names,只做规则清洗 | |
| name = clean_movie_title(raw_name) | |
| # 2. 正则去除中文全角括号及内容 (...) | |
| name = re.sub(r'(.*?)', '', name) | |
| # 3. 正则去除英文半角括号及内容 (...) | |
| name = re.sub(r'\(.*?\)', '', name) | |
| return name.strip() | |
| def clean_hall_display_name(raw_name): | |
| """去除影厅名两端多余的 【】 [] 符号""" | |
| return str(raw_name).strip('【】[] ') | |
| def get_hall_key_num(name): | |
| """提取影厅数字ID用于数据匹配 (如 '1号厅' -> '1')""" | |
| nums = re.findall(r'\d+', str(name)) | |
| return nums[0] if nums else str(name) | |
| # ------------------ | |
| # 1. 预处理 TMS 数据 | |
| # 结构: {'1': ['Zootopia2', '疯狂动物城2', ...], ...} | |
| tms_map = defaultdict(set) # 使用 set 提高查找效率 | |
| for hall_name, movies in tms_data.items(): | |
| hall_key = get_hall_key_num(hall_name) | |
| for movie in movies: | |
| # 收集 Assert Name (显示名) | |
| if movie.get('details', {}).get('assert_name'): | |
| # 同样对 TMS 里的名字取核心名,提高匹配率 | |
| core_tms_name = get_core_movie_name(str(movie['details']['assert_name'])) | |
| tms_map[hall_key].add(core_tms_name.upper()) | |
| # 保留原始 Assert Name 用于兜底匹配 | |
| tms_map[hall_key].add(str(movie['details']['assert_name']).upper()) | |
| # 收集 Content Name (文件名/UUID) | |
| if movie.get('content_name'): | |
| tms_map[hall_key].add(str(movie['content_name']).upper()) | |
| # 2. 遍历排片数据进行检查 | |
| missing_logs = [] | |
| checked_combinations = set() | |
| for item in schedule_list: | |
| hall_raw = item.get('hallName') or item.get('Hall') | |
| movie_raw = item.get('movieName') or item.get('Movie') | |
| if not hall_raw or not movie_raw: | |
| continue | |
| # 准备数据 | |
| hall_num = get_hall_key_num(hall_raw) | |
| hall_display = clean_hall_display_name(hall_raw) # 清洗后的影厅名 | |
| # 获取排片的核心片名 (去掉版本后缀) | |
| target_movie_core = get_core_movie_name(movie_raw).upper() | |
| # 组合键去重 (同一厅同一部片只报一次) | |
| combo_key = (hall_num, target_movie_core) | |
| if combo_key in checked_combinations: | |
| continue | |
| checked_combinations.add(combo_key) | |
| # 检查逻辑 | |
| if hall_num not in tms_map: | |
| # 找不到影厅数据暂不报错,可能是未映射或设备离线,避免刷屏 | |
| continue | |
| # 核心匹配:检查 TMS 集合中是否包含核心片名 | |
| # 方式A:精确匹配核心名 (推荐,最准) | |
| # 方式B:模糊包含 (target in tms_file) | |
| has_file = False | |
| tms_files = tms_map[hall_num] | |
| # 策略:只要 TMS 中有一个文件名 包含 我们的核心排片名,就视为有片 | |
| # 例如:排片 core='疯狂动物城2',TMS='疯狂动物城2_IMAX' -> 匹配成功 | |
| for tms_file in tms_files: | |
| if target_movie_core in tms_file: | |
| has_file = True | |
| break | |
| if not has_file: | |
| # 记录日志,使用清洗后的影厅名和排片原名 | |
| missing_logs.append(f"【{hall_display}】排映《{movie_raw}》,但服务器未检测到包含“{target_movie_core}”的文件。") | |
| # 3. 格式化输出 | |
| if not missing_logs: | |
| return None # 返回 None 表示一切正常 | |
| # 生成带编号的字符串 | |
| formatted_output = [] | |
| for idx, log in enumerate(missing_logs, 1): | |
| formatted_output.append(f"{idx}. ❌ 缺片警告:{log}") | |
| return "\n".join(formatted_output) | |
| # --- 5. UI 渲染与交互逻辑 --- | |
| def display_analysis_results(df_raw, data_source_name, date_for_display, query_tms_enabled): | |
| if df_raw.empty: | |
| st.info(f"请先从 {data_source_name} 加载数据。"); | |
| return | |
| if data_source_name == "文件": | |
| token_data = load_token() | |
| if not token_data: | |
| token_data = login_and_get_token() | |
| token = token_data.get('token') if token_data else None | |
| date_str = date_for_display.strftime('%Y-%m-%d') if date_for_display else None | |
| canonical_names = [] | |
| if token and date_str: | |
| canonical_names = fetch_canonical_movie_names(token, date_str) | |
| df_raw['影片名称_清理后'] = df_raw['影片名称'].apply(lambda x: clean_movie_title(x, canonical_names)) | |
| else: | |
| df_raw['影片名称_清理后'] = df_raw['影片名称'] | |
| date_str = f"{date_for_display} " if date_for_display else "" | |
| total_revenue, total_attendance, total_sessions = df_raw['总收入'].sum(), df_raw['总人次'].sum(), len(df_raw) | |
| st.markdown( | |
| f"> {date_str}数据总览:总票房 **¥{total_revenue:,.2f}** | 总人次 **{total_attendance:,.0f}** | 总场次 **{total_sessions:,.0f}**") | |
| format_config = {'座位数': '{:,.0f}', '场次': '{:,.0f}', '人次': '{:,.0f}', '票房': '{:,.2f}', '均价': '{:.2f}', | |
| '座次比': '{:.2%}', '场次比': '{:.2%}', '票房比': '{:.2%}', '座次效率': '{:.2f}', | |
| '场次效率': '{:.2f}'} | |
| full_day_analysis, prime_time_analysis = process_and_analyze_data(df_raw.copy()), process_and_analyze_data( | |
| df_raw[df_raw['放映时间'].between(dt_time(14, 0), dt_time(21, 0))].copy()) | |
| if query_tms_enabled: | |
| with st.spinner("正在关联查询 TMS 服务器..."): | |
| try: | |
| priority_titles = full_day_analysis['影片'].unique().tolist() | |
| _, tms_movie_list = fetch_and_process_server_movies(priority_titles) | |
| full_day_analysis = add_tms_locations_to_analysis(full_day_analysis, tms_movie_list) | |
| prime_time_analysis = add_tms_locations_to_analysis(prime_time_analysis, tms_movie_list) | |
| if '影片所在影厅位置' in full_day_analysis.columns: | |
| cols = full_day_analysis.columns.tolist(); | |
| full_day_analysis = full_day_analysis[cols[:1] + ['影片所在影厅位置'] + cols[1:-1]] | |
| if '影片所在影厅位置' in prime_time_analysis.columns: | |
| cols = prime_time_analysis.columns.tolist(); | |
| prime_time_analysis = prime_time_analysis[cols[:1] + ['影片所在影厅位置'] + cols[1:-1]] | |
| st.toast("TMS 影片位置关联成功!", icon="🔗") | |
| except Exception as e: | |
| st.error(f"关联TMS失败: {e}") | |
| st.markdown("#### 全天排片效率分析"); | |
| st.dataframe(full_day_analysis.style.format(format_config).apply(style_efficiency, axis=1), | |
| use_container_width=True, hide_index=True) | |
| st.markdown("#### 黄金时段排片效率分析 (14:00-21:00)"); | |
| st.dataframe(prime_time_analysis.style.format(format_config).apply(style_efficiency, axis=1), | |
| use_container_width=True, hide_index=True) | |
| if not full_day_analysis.empty: | |
| st.markdown("### 排片效率汇总") | |
| full_day_summary = full_day_analysis.rename( | |
| columns={'场次': '全部场次', '座次效率': '全部座次效率', '场次效率': '全部场次效率'}) | |
| full_day_cols_to_keep = ['影片', '票房', '全部场次', '全部座次效率', '全部场次效率'] | |
| if '影片所在影厅位置' in full_day_summary.columns: full_day_cols_to_keep.insert(1, '影片所在影厅位置') | |
| full_day_summary = full_day_summary[full_day_cols_to_keep] | |
| prime_time_summary = prime_time_analysis.rename( | |
| columns={'场次': '黄金时段场次', '座次效率': '黄金时段座次效率', '场次效率': '黄金时段场次效率'})[ | |
| ['影片', '黄金时段场次', '黄金时段座次效率', '黄金时段场次效率']] | |
| summary_df = pd.merge(full_day_summary, prime_time_summary, on='影片', how='left').fillna(0) | |
| summary_df['黄金时段场次'] = summary_df['黄金时段场次'].astype(int) | |
| summary_format_config = {'票房': '{:,.2f}', '全部场次': '{:,.0f}', '黄金时段场次': '{:,.0f}', | |
| '全部座次效率': '{:.2f}', '全部场次效率': '{:.2f}', '黄金时段座次效率': '{:.2f}', | |
| '黄金时段场次效率': '{:.2f}'} | |
| st.dataframe(summary_df.style.format(summary_format_config).apply(style_summary_efficiency, axis=1), | |
| use_container_width=True, hide_index=True) | |
| def fetch_and_process_daily_sessions(date_str, quiet=False): | |
| """获取并处理指定日期的排片场次,返回(场次字典, 总场次数)""" | |
| if not quiet: st.write(f"正在查询 {date_str} 的排片数据...") | |
| token_data = load_token() | |
| token = token_data.get('token') if token_data else None | |
| if not token: | |
| token_data = login_and_get_token() | |
| token = token_data.get('token') if token_data else None | |
| schedule, _ = get_api_data_with_token_management(date_str) | |
| if not schedule: | |
| if not quiet: st.warning(f"未能获取到 {date_str} 的排片数据。") | |
| return None, None, None # 修改返回值,增加 raw_schedule | |
| total_sessions = len(schedule) | |
| df = pd.DataFrame(schedule) | |
| canonical_names = [] | |
| if token: | |
| canonical_names = fetch_canonical_movie_names(token, date_str) | |
| df['影片名称_清理后'] = df['movieName'].apply(lambda x: clean_movie_title(x, canonical_names)) | |
| sessions_map = df.groupby('影片名称_清理后').size().to_dict() | |
| # 返回 (映射表, 总场次, 原始排片列表) | |
| return sessions_map, total_sessions, schedule | |
| def generate_efficiency_report_df(analysis_df, next_day_sessions_map=None, next_day_total_sessions=None): | |
| if analysis_df.empty: return pd.DataFrame() | |
| report_df = analysis_df[ | |
| ['影片', '座位数', '场次', '票房', '人次', '均价', '座次比', '场次比', '票房比', '座次效率', '场次效率']].copy() | |
| report_df['情况说明'] = ''; | |
| report_df['次日调整方案'] = '' | |
| if next_day_sessions_map is not None: | |
| report_df['次日场数'] = report_df['影片'].map(next_day_sessions_map).fillna(0).astype(int) | |
| else: | |
| report_df['次日场数'] = report_df['场次'] | |
| totals = report_df[['座位数', '场次', '票房', '人次']].sum() | |
| totals['影片'] = '' | |
| totals['次日场数'] = next_day_total_sessions if next_day_total_sessions is not None else report_df['次日场数'].sum() | |
| report_df = pd.concat([report_df, pd.DataFrame(totals).T], ignore_index=True) | |
| return report_df | |
| def generate_excel_paste_data(df): | |
| if df.empty: return "" | |
| lines, total_row_num = [], len(df) + 1 | |
| for i, row in df.iterrows(): | |
| excel_row_num = i + 2 | |
| line = [row['影片'], row['座位数'], row['场次'], row['票房'], row['人次']] | |
| if i == len(df) - 1: # 合计行 | |
| line.extend(['', '', '', '', '', '', '', '', row['次日场数']]) | |
| else: # 数据行 | |
| line.extend([f"=IFERROR(F{excel_row_num}/G{excel_row_num},0)", f"=D{excel_row_num}/D${total_row_num}", | |
| f"=E{excel_row_num}/E${total_row_num}", f"=F{excel_row_num}/F${total_row_num}", | |
| f"=IFERROR(K{excel_row_num}/I{excel_row_num},0)", | |
| f"=IFERROR(K{excel_row_num}/J{excel_row_num},0)", row['情况说明'], row['次日调整方案'], | |
| row['次日场数']]) | |
| lines.append("\t".join(map(str, line))) | |
| return "\n".join(lines) | |
| def get_business_date(df_with_datetime): | |
| """根据包含完整日期时间列的DataFrame计算营业日""" | |
| crossover_time = dt_time(6, 0) | |
| df_with_datetime['business_date'] = df_with_datetime['datetime'].apply( | |
| lambda dt: (dt - timedelta(days=1)).date() if dt.time() < crossover_time else dt.date() | |
| ) | |
| return df_with_datetime['business_date'].mode()[0] | |
| # --- 主应用 --- | |
| def main(): | |
| st.title('影城工作便捷工具') | |
| # 初始化 session_state | |
| if 'file_df' not in st.session_state: st.session_state.file_df, st.session_state.api_df = pd.DataFrame(), pd.DataFrame() | |
| if 'api_date' not in st.session_state: st.session_state.api_date = datetime.now().date() | |
| if 'file_date' not in st.session_state: st.session_state.file_date = None | |
| if 'today_movie_count' not in st.session_state: st.session_state.today_movie_count = 0 | |
| if 'previous_day_movie_count' not in st.session_state: st.session_state.previous_day_movie_count = 0 | |
| if 'daily_report_df' not in st.session_state: st.session_state.daily_report_df = pd.DataFrame() | |
| if 'processed_print_data' not in st.session_state: st.session_state.processed_print_data = None | |
| if 'check_logs' not in st.session_state: st.session_state.check_logs = "" | |
| tab1, tab2, tab_sales, tab_report, tab_print, tab3 = st.tabs( | |
| ["🔍 排片效率分析", "📋 次日排片效率分析报表", "🍿 卖品品类分析表", "📑 影片映出日累计表", "🖨️ 场次与散场打印", | |
| "🎬 TMS 影片查询"]) | |
| with tab1: | |
| # 顶部控制区 | |
| col_a, col_b = st.columns(2) | |
| with col_a: | |
| import_from_file = st.checkbox("从`影片映出日累计报表.xlsx`导入数据") | |
| with col_b: | |
| query_tms_for_location = st.checkbox("查询 TMS 找影片所在影厅") | |
| if import_from_file: | |
| st.header("从本地文件导入数据") | |
| st.write("上传 `影片映出日累计报表.xlsx`,程序将自动处理数据。") | |
| uploaded_file = st.file_uploader("上传 Excel 文件", type=['xlsx', 'xls'], label_visibility="collapsed") | |
| if uploaded_file is not None: | |
| with st.spinner("正在处理文件..."): | |
| try: | |
| df = pd.read_excel(uploaded_file, skiprows=3, header=None) | |
| df.rename(columns={0: '影片名称', 1: '放映日期', 2: '放映时间', 5: '总人次', 6: '总收入', | |
| 7: '座位数'}, inplace=True) | |
| df_for_date_calc = df[['放映日期', '放映时间']].copy() | |
| df_for_date_calc['datetime_str'] = df_for_date_calc['放映日期'].astype(str).str.split(' ').str[ | |
| 0] + ' ' + df_for_date_calc['放映时间'].astype(str) | |
| df_for_date_calc['datetime'] = pd.to_datetime(df_for_date_calc['datetime_str'], errors='coerce') | |
| df_for_date_calc.dropna(subset=['datetime'], inplace=True) | |
| business_date = get_business_date(df_for_date_calc) | |
| st.session_state.file_date = business_date | |
| st.toast(f"文件营业日识别为: {business_date}", icon="🗓️") | |
| df = df[['影片名称', '放映时间', '座位数', '总收入', '总人次']] | |
| df.dropna(subset=['影片名称', '放映时间'], inplace=True) | |
| for col in ['座位数', '总收入', '总人次']: df[col] = pd.to_numeric(df[col], | |
| errors='coerce').fillna(0) | |
| df['放映时间'] = pd.to_datetime(df['放映时间'], format='%H:%M:%S', errors='coerce').dt.time | |
| df.dropna(subset=['放映时间'], inplace=True) | |
| st.session_state.file_df = df | |
| except Exception as e: | |
| st.error(f"处理文件或识别日期时出错: {e}"); | |
| st.session_state.file_df, st.session_state.file_date = pd.DataFrame(), None | |
| # 展示分析结果 (传递 query_tms_for_location 的状态) | |
| display_analysis_results(st.session_state.file_df, "文件", st.session_state.file_date, | |
| query_tms_for_location) | |
| else: | |
| st.header("使用 API 获取数据") | |
| st.session_state.api_date = st.date_input("选择要查询的排片日期", value=st.session_state.api_date, | |
| key="api_date_picker") | |
| if st.button("获取排片数据", key="fetch_api_data", icon="🫵"): | |
| with st.spinner(f"正在获取 {st.session_state.api_date} 的排片数据..."): | |
| token_data = load_token() | |
| if not token_data: | |
| token_data = login_and_get_token() | |
| token = token_data.get('token') if token_data else None | |
| schedule, halls = get_api_data_with_token_management(st.session_state.api_date.strftime('%Y-%m-%d')) | |
| if schedule is not None and halls is not None: | |
| # 传入 token 和 date 以进行标准名清洗 | |
| processed_df = process_api_data(schedule, halls, token, | |
| st.session_state.api_date.strftime('%Y-%m-%d')) | |
| st.session_state.api_df = processed_df | |
| if not processed_df.empty: st.toast(f"成功获取并处理了 {len(processed_df)} 条排片数据!", | |
| icon="✅") | |
| else: | |
| st.session_state.api_df = pd.DataFrame() | |
| # 展示分析结果 (传递 query_tms_for_location 的状态) | |
| display_analysis_results(st.session_state.api_df, "API", st.session_state.api_date, query_tms_for_location) | |
| with tab2: | |
| # 确定数据源 (逻辑保持不变,API > 文件) | |
| source_df_raw, source_date, source_date_str = pd.DataFrame(), None, "" | |
| if not st.session_state.api_df.empty: | |
| source_df_raw, source_date, source_date_str = st.session_state.api_df, st.session_state.api_date, f"{st.session_state.api_date}" | |
| st.toast("正在使用来自 **API 获取** 的最新数据。", icon="☁️") | |
| elif not st.session_state.file_df.empty: | |
| source_df_raw, source_date, source_date_str = st.session_state.file_df, st.session_state.file_date, f"{st.session_state.file_date} (文件)" | |
| st.toast("API数据为空,正在使用来自 **文件导入** 的数据。", icon="📃") | |
| else: | |
| st.warning('没有可用的数据。请先在 "🔍 排片效率分析" 标签页加载数据。') | |
| st.header(f"{source_date_str} 排片效率分析与调整建议") | |
| if not source_df_raw.empty: | |
| if st.button("生成分析报表", icon="🫵"): | |
| with st.spinner("正在生成分析报表 (含跨日数据查询)..."): | |
| next_day_sessions_map, next_day_total_sessions = None, None | |
| previous_day_sessions_map = None | |
| token_data = load_token() | |
| if source_date and token_data: | |
| token = token_data.get('token') | |
| # 获取次日数据,并拿到原始排片表用于检查 | |
| next_day = source_date + timedelta(days=1) | |
| next_day_str = next_day.strftime('%Y-%m-%d') | |
| next_day_sessions_map, next_day_total_sessions, next_day_raw_schedule = fetch_and_process_daily_sessions( | |
| next_day_str, quiet=True) | |
| # 生成合理性检查日志 | |
| if next_day_raw_schedule: | |
| logs = generate_schedule_check_logs(next_day_raw_schedule, next_day_str) | |
| st.session_state.check_logs = logs | |
| else: | |
| st.session_state.check_logs = "无法获取次日排片详情,跳过检查。" | |
| # 获取前一日数据 | |
| previous_day = source_date - timedelta(days=1) | |
| previous_day_sessions_map, _, _ = fetch_and_process_daily_sessions( | |
| previous_day.strftime('%Y-%m-%d'), quiet=True) | |
| # 获取经营摘要数据 | |
| date_str = source_date.strftime('%Y-%m-%d') | |
| income_data = fetch_income_data(token, date_str) | |
| wenlv_cards = fetch_membership_data(token, date_str) | |
| attendance = int(source_df_raw['总人次'].sum()) | |
| st.session_state.daily_summary_data = { | |
| "ticket_income": income_data.get('ticket_income', 0.0) if income_data else 0.0, | |
| "attendance": attendance, | |
| "goods_income": income_data.get('goods_income', 0.0) if income_data else 0.0, | |
| "sold_incomes_zb": income_data.get('sold_incomes_zb', 0.0) if income_data else 0.0, | |
| "wenlv_cards": wenlv_cards | |
| } | |
| else: | |
| st.error("无法确定源数据日期或Token,无法获取跨日及经营数据。") | |
| if '影片名称_清理后' not in source_df_raw.columns: | |
| source_df_raw['影片名称_清理后'] = source_df_raw['影片名称'] | |
| analysis_df = process_and_analyze_data(source_df_raw.copy()) | |
| st.session_state.today_movie_count = len(analysis_df) | |
| st.session_state.previous_day_movie_count = len( | |
| previous_day_sessions_map) if previous_day_sessions_map else 0 | |
| report_df = generate_efficiency_report_df(analysis_df, next_day_sessions_map, | |
| next_day_total_sessions) | |
| st.session_state.report_df = report_df | |
| st.session_state.excel_paste_data = generate_excel_paste_data(report_df) | |
| if 'report_df' in st.session_state and not st.session_state.report_df.empty: | |
| st.markdown("#### 排片效率分析表"); | |
| display_df = st.session_state.report_df.copy() | |
| display_df.insert(0, '序号', range(2, len(display_df) + 2)) | |
| report_format = {'座位数': '{:,.0f}', '场次': '{:,.0f}', '人次': '{:,.0f}', '票房': '{:,.2f}', | |
| '均价': '{:.2f}', '座次比': '{:.2%}', '场次比': '{:.2%}', '票房比': '{:.2%}', | |
| '座次效率': '{:.2f}', '场次效率': '{:.2f}', '次日场数': '{:,.0f}'} | |
| display_df['均价'] = pd.to_numeric(display_df['均价'], errors='coerce').replace([np.inf, -np.inf], | |
| np.nan) | |
| st.dataframe(display_df.style.format(report_format, na_rep="#DIV/0!"), use_container_width=True, | |
| hide_index=True) | |
| n_diff = st.session_state.today_movie_count - st.session_state.previous_day_movie_count | |
| if n_diff > 0: | |
| excel_copy_title = f"复制到 Excel (需要增加 {n_diff} 行,从第一个电影名字开始粘贴)" | |
| elif n_diff < 0: | |
| excel_copy_title = f"复制到 Excel (需要减少 {abs(n_diff)} 行,从第一个电影名字开始粘贴)" | |
| else: | |
| excel_copy_title = "复制到 Excel (行数保持不变,从第一个电影名字开始粘贴)" | |
| st.markdown(f"##### {excel_copy_title}"); | |
| st.code(st.session_state.excel_paste_data, language='text') | |
| # 复制列表 | |
| report_df = st.session_state.report_df | |
| movie_titles = report_df.iloc[:-1]['影片'].tolist() | |
| weather_title = get_weather_forecast(source_date) | |
| st.markdown(f"##### {weather_title}") | |
| st.code(''.join([f'《{title}》' for title in movie_titles]), language='text') | |
| # 经营摘要 | |
| if 'daily_summary_data' in st.session_state: | |
| summary_data = st.session_state.daily_summary_data | |
| summary_text = ( | |
| f"花都店,今日票房:{summary_data.get('ticket_income', 0.0):.2f}元," | |
| f"观影人次:{summary_data.get('attendance', 0)}," | |
| f"卖品收入:{summary_data.get('goods_income', 0.0):.2f}元," | |
| f"卖品占比:{summary_data.get('sold_incomes_zb', 0.0):.2f}%," | |
| f"文旅卡:{summary_data.get('wenlv_cards', 0)}张。" | |
| ) | |
| st.markdown(f"##### 今日经营数据概览") | |
| st.code(summary_text) | |
| st.markdown(f"> 上述文旅卡数量不是实际数据,API 获取的数据是需要次日 10 点更新数据,请查询鼎新报表系统里查询`卡发行`报表。") | |
| st.markdown(f"> 抽奖券计算方法:先在鼎新报表系统里查询`卡发行`当日开卡数量然后查询`卡充值`当日详细的充值金额,**充值金额整除 200 加上卡发行数量即为抽奖券数量**。") | |
| st.markdown("#### 🔍 场次合理性检查日志") | |
| if st.session_state.check_logs: | |
| st.code(st.session_state.check_logs) | |
| st.markdown("#### 📡 次日排片 TMS 文件核对") | |
| st.info( | |
| "此功能将查询 TMS 服务器,检查次日排程的影厅是否有对应的影片文件(不区分语言和制式版本,仅匹配片名)。") | |
| if st.button("开始核对 TMS 文件", key="check_tms_files_btn", icon="🕵️♂️"): | |
| # 确定次日日期 | |
| check_date = source_date + timedelta(days=1) | |
| check_date_str = check_date.strftime('%Y-%m-%d') | |
| with st.spinner(f"正在获取 {check_date_str} 的排片数据并连接 TMS 服务器..."): | |
| # 1. 获取次日排片 | |
| schedule_data, _ = get_api_data_with_token_management(check_date_str) | |
| if not schedule_data: | |
| st.error(f"无法获取 {check_date_str} 的排片数据,请检查网络或 Token。") | |
| else: | |
| try: | |
| # 2. 获取 TMS 数据 | |
| df_sched = pd.DataFrame(schedule_data) | |
| priority_titles = df_sched[ | |
| 'movieName'].unique().tolist() if 'movieName' in df_sched.columns else [] | |
| tms_hall_data, _ = fetch_and_process_server_movies(priority_titles) | |
| # 3. 执行比对 (使用新函数) | |
| logs_text = check_tms_file_availability(schedule_data, tms_hall_data, | |
| check_date_str) | |
| if logs_text is None: | |
| st.success( | |
| f"✅ 核对完成:{check_date_str} 所有排映影片在对应影厅服务器中均存在关联文件。") | |
| else: | |
| st.warning("⚠️ 发现潜在缺片风险!请检查以下影厅服务器:") | |
| # 这里使用 st.code 展示多行带编号的文本 | |
| st.code(logs_text, language="text") | |
| except Exception as e: | |
| st.error(f"核对过程中发生错误: {e}") | |
| with tab_sales: | |
| # 卖品品类分析表 UI | |
| col_1, col_2 = st.columns(2) | |
| with col_1: | |
| sales_from_file = st.checkbox("从`商品销售汇总报表-已退减.xlsx`导入数据", key="sales_file_cb") | |
| if sales_from_file: | |
| st.header("从本地文件导入数据") | |
| st.write("请上传 `商品销售汇总报表-已退减.xlsx` 文件。") | |
| uploaded_file = st.file_uploader("上传 Excel 文件", type=['xlsx', 'xls'], label_visibility="collapsed", | |
| key="sales_file_uploader") | |
| if uploaded_file is not None: | |
| with st.spinner("正在读取并分析文件..."): | |
| try: | |
| df = pd.read_excel(uploaded_file, skiprows=3) | |
| final_summary, copy_text = process_sales_data(df) | |
| if final_summary is not None: | |
| st.markdown("#### 销售总览 (套餐 Top 10 + 单品 Top 5)") | |
| st.dataframe(final_summary, use_container_width=True, hide_index=True) | |
| st.markdown("##### 复制到 Excel") | |
| st.code(copy_text, language='text') | |
| except Exception as e: | |
| st.error(f"处理文件时发生错误: {e}") | |
| else: | |
| st.header("从服务器API获取实时数据") | |
| sales_date = st.date_input("选择要查询的日期", value=datetime.now().date(), key="sales_date_picker") | |
| if st.button("获取卖品销售数据", key="fetch_sales_data", icon="🫵"): | |
| with st.spinner(f"正在获取 {sales_date} 的销售数据..."): | |
| api_results = get_sales_data_with_token_management(sales_date) | |
| if api_results is not None: | |
| st.toast(f"成功获取到 {len(api_results)} 条销售记录!", icon="✅") | |
| df = transform_api_data_to_df(api_results) | |
| final_summary, copy_text = process_sales_data(df) | |
| if final_summary is not None: | |
| st.markdown("#### 销售总览 (套餐 Top 10 + 单品 Top 5)") | |
| st.dataframe(final_summary, use_container_width=True, hide_index=True) | |
| st.markdown("##### 复制到 Excel") | |
| st.code(copy_text, language='text') | |
| else: | |
| st.warning("未能从 API 获取到数据,请检查登录或网络连接。") | |
| with tab_report: | |
| # 影片映出日累计表 UI | |
| st.header("影片映出日累计报表生成") | |
| report_date = st.date_input("选择要查询的排片日期", value=datetime.now().date(), key="daily_report_date") | |
| if st.button("获取并生成报表", key="fetch_daily_report", icon="🫵"): | |
| report_date_str = report_date.strftime('%Y-%m-%d') | |
| with st.spinner(f"正在获取 {report_date_str} 的排片数据..."): | |
| token_data = load_token() | |
| if not token_data: token_data = login_and_get_token() | |
| token = token_data.get('token') if token_data else None | |
| schedule, halls_map = get_api_data_with_token_management(report_date_str) | |
| if schedule is not None and halls_map is not None: | |
| processed_df = process_and_filter_data_for_report(schedule, halls_map, report_date_str, token) | |
| st.session_state.daily_report_df = processed_df | |
| if not processed_df.empty: | |
| st.toast(f"成功获取并处理了 {len(processed_df)} 条有效场次数据!", icon="✅") | |
| else: | |
| st.session_state.daily_report_df = pd.DataFrame() | |
| if not st.session_state.daily_report_df.empty: | |
| st.markdown(f"#### {report_date.strftime('%Y-%m-%d')} 影片映出日累计报表") | |
| st.dataframe( | |
| st.session_state.daily_report_df.style.format({ | |
| '人数合计': '{:,.0f}', '座位数': '{:,.0f}', '上座率%': '{:.2f}%' | |
| }), | |
| use_container_width=True, hide_index=True | |
| ) | |
| import io | |
| output_buffer = io.BytesIO() | |
| st.session_state.daily_report_df.to_excel(output_buffer, index=False, engine='openpyxl') | |
| excel_data = output_buffer.getvalue() | |
| st.download_button( | |
| label="📥 下载 XLSX 报表文件", | |
| data=excel_data, | |
| file_name=f"{report_date.strftime('%Y-%m-%d')}_影片映出日累计报表.xlsx", | |
| mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", | |
| ) | |
| else: | |
| st.info("请选择日期并点击“获取并生成报表”以生成数据。") | |
| with tab_print: | |
| # 场次与散场时间快捷打印 UI | |
| st.header("场次与散场时间快捷打印") | |
| with st.expander("⚙️ 显示与打印设置", expanded=False): | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.subheader("⚙️ 修改 LED 屏排片表设置") | |
| led_font_name = st.selectbox("字体选择", options=list(AVAILABLE_FONTS.keys()), index=0, key="led_font") | |
| led_font_path = AVAILABLE_FONTS.get(led_font_name) | |
| generate_png_led = st.checkbox("生成 PNG 图片", value=False, key="png_led") | |
| with col2: | |
| st.subheader("⚙️ 散场时间表设置") | |
| times_font_name = st.selectbox("字体选择", options=list(AVAILABLE_FONTS.keys()), index=1, | |
| key="times_font") | |
| times_font_path = AVAILABLE_FONTS.get(times_font_name) | |
| font_size_multiplier = st.slider("字体大小调节", min_value=0.8, max_value=1.5, value=1.2, step=0.05, | |
| help="调整字体在单元格内的相对大小") | |
| split_time = st.time_input("白班 / 晚班分割时间", value=dt_time(17, 0), | |
| help="散场时间在此时间点之前(含)的为白班") | |
| time_adjustment = st.slider("时间提前 (分钟)", min_value=0, max_value=10, value=0, | |
| help="将所有散场时间提前 N 分钟显示") | |
| hall_display_format = st.radio("影厅号格式", options=['Default', 'Superscript', 'Circled'], | |
| format_func=lambda x: | |
| {'Default': '默认 (2 18:28)', 'Superscript': '上标 (2# 18:28)', | |
| 'Circled': '带圈 (② 18:28)'}[x], horizontal=True) | |
| generate_png_times = st.checkbox("生成 PNG 图片", value=False, key="png_times") | |
| with st.expander("💡 使用帮助", expanded=False): | |
| st.markdown(""" | |
| #### 🖨️ 功能简介 | |
| 本工具用于将影院的排期数据快速转换为两种形式的打印页: | |
| 1. **修改 LED 屏幕排片表打印**:A4 竖版,详细列出影厅、场次、影片名、拼音缩写和时间范围,方便员工在修改 LED 屏幕时快速查阅和输入。 | |
| 2. **散场时间打印**:A5 竖版,以大字体分栏显示各影厅的散场时间,方便员工在疏散人群和清洁影厅时查阅。 | |
| #### ⬇️ 操作步骤 | |
| 1. **选择数据源**: | |
| * **从文件导入**:导出 `放映时间核对表.xls` 后,点击 "Browse files" 按钮上传。 | |
| * **从 API 获取**:选择日期,点击 "获取排片数据" 按钮,程序将自动登录并拉取最新数据。 | |
| 2. **调整设置 (可选)**:点击上方的 "显示与打印设置" 打开设置面板,根据需要调整字体、大小、格式等。 | |
| 3. **预览与打印**: | |
| * 数据加载成功后,下方会自动生成预览。 | |
| * 默认显示 **PDF 预览**,这是最适合打印的格式。可以直接在预览界面点击 🖨️ 打印按钮。 | |
| """) | |
| print_tab1, print_tab2 = st.tabs(["☁️ 从 API 获取", "📁 从文件导入"]) | |
| with print_tab2: | |
| uploaded_file_print = st.file_uploader("请上传 `放映时间核对表.xls` 文件", type=["xls"], | |
| key="print_file_uploader") | |
| if uploaded_file_print: | |
| with st.spinner("正在处理文件,请稍候..."): | |
| led_data, times_part1, times_part2, date_str = process_file_upload(uploaded_file_print, split_time, | |
| time_adjustment) | |
| st.session_state.processed_print_data = { | |
| "led_data": led_data, | |
| "times_part1": times_part1, | |
| "times_part2": times_part2, | |
| "date_str": date_str | |
| } | |
| if date_str: | |
| st.toast(f"文件处理完成!排期日期:**{date_str}**", icon="🎉") | |
| with print_tab1: | |
| # 修改 value 为当前日期 + 1天 | |
| print_api_date = st.date_input("选择要查询的排片日期", value=datetime.now().date() + timedelta(days=1), | |
| key="print_api_date_picker") | |
| if st.button("获取排片数据", key="fetch_print_api_data", icon="🫵"): | |
| with st.spinner(f"正在获取 {print_api_date} 的排片数据..."): | |
| date_str_api = print_api_date.strftime('%Y-%m-%d') | |
| # 重用 app2.py 中现有的 API 获取逻辑,不需要重复写 fetch_schedule_data | |
| schedule_list, _ = get_api_data_with_token_management(date_str_api) | |
| if schedule_list is not None and len(schedule_list) > 0: | |
| df_api = pd.DataFrame(schedule_list) | |
| df_api.rename(columns={'hallName': 'Hall', 'showStartTime': 'StartTime', | |
| 'showEndTime': 'EndTime', 'movieName': 'Movie'}, inplace=True) | |
| df_api = df_api[['Hall', 'StartTime', 'EndTime', 'Movie']] | |
| led_data, times_part1, times_part2 = process_schedule_df(df_api, print_api_date, split_time, | |
| time_adjustment) | |
| st.session_state.processed_print_data = { | |
| "led_data": led_data, | |
| "times_part1": times_part1, | |
| "times_part2": times_part2, | |
| "date_str": date_str_api | |
| } | |
| st.toast(f"成功获取 {len(schedule_list)} 条排片数据!", icon="✅") | |
| elif schedule_list is not None: | |
| st.warning("成功连接API,但当天没有排片数据。") | |
| st.session_state.processed_print_data = None | |
| else: | |
| st.error("获取API数据失败。") | |
| st.session_state.processed_print_data = None | |
| # --- 显示打印预览结果 --- | |
| if st.session_state.processed_print_data: | |
| data = st.session_state.processed_print_data | |
| led_data, times_part1, times_part2, date_str = data["led_data"], data["times_part1"], data["times_part2"], \ | |
| data["date_str"] | |
| # 显示 LED 屏排片表 | |
| st.header("🖥️ 修改 LED 屏幕排片表打印") | |
| if led_data is not None and not led_data.empty: | |
| led_output = create_print_layout_led(led_data, date_str, led_font_path, generate_png_led) | |
| if led_output: | |
| tabs = ["PDF 预览"] | |
| if 'png' in led_output: tabs.append("PNG 预览") | |
| tab_views = st.tabs(tabs) | |
| with tab_views[0]: | |
| st.markdown(display_pdf(led_output['pdf']), unsafe_allow_html=True) | |
| if 'png' in led_output: | |
| with tab_views[1]: st.image(led_output['png'], use_container_width=True) | |
| else: | |
| st.error("未能成功生成 '修改 LED 屏排片表'。请检查数据源。") | |
| # 显示散场时间快捷打印 | |
| st.header("🔚 散场时间打印") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| if times_part1 is not None and not times_part1.empty: | |
| part1_output = create_print_layout_times(times_part1, "A", date_str, times_font_path, | |
| font_size_multiplier, hall_display_format, | |
| generate_png_times) | |
| if part1_output: | |
| tabs1 = [f"白班 (≤ {split_time.strftime('%H:%M')}) PDF 预览"] | |
| if 'png' in part1_output: tabs1.append(f"白班 (≤ {split_time.strftime('%H:%M')}) PNG 预览") | |
| tab_views1 = st.tabs(tabs1) | |
| with tab_views1[0]: | |
| st.markdown(display_pdf(part1_output['pdf']), unsafe_allow_html=True) | |
| if 'png' in part1_output: | |
| with tab_views1[1]: st.image(part1_output['png']) | |
| else: | |
| st.info(f"白班 (≤ {split_time.strftime('%H:%M')}) 没有排期数据。") | |
| with col2: | |
| if times_part2 is not None and not times_part2.empty: | |
| part2_output = create_print_layout_times(times_part2, "C", date_str, times_font_path, | |
| font_size_multiplier, hall_display_format, | |
| generate_png_times) | |
| if part2_output: | |
| tabs2 = [f"晚班 (> {split_time.strftime('%H:%M')}) PDF 预览"] | |
| if 'png' in part2_output: tabs2.append(f"晚班 (> {split_time.strftime('%H:%M')}) PNG 预览") | |
| tab_views2 = st.tabs(tabs2) | |
| with tab_views2[0]: | |
| st.markdown(display_pdf(part2_output['pdf']), unsafe_allow_html=True) | |
| if 'png' in part2_output: | |
| with tab_views2[1]: st.image(part2_output['png']) | |
| else: | |
| st.info(f"晚班 (> {split_time.strftime('%H:%M')}) 没有排期数据。") | |
| else: | |
| st.info("👆 请先从文件或API加载数据以生成预览。") | |
| with tab3: | |
| st.header("TMS 服务器影片内容查询") | |
| if st.button('点击查询 TMS 服务器', key="query_tms", icon="🫵"): | |
| with st.spinner("正在从 TMS 服务器获取数据中..."): | |
| try: | |
| priority_titles, df_for_tms = [], pd.DataFrame() | |
| if not st.session_state.api_df.empty: | |
| df_for_tms = st.session_state.api_df | |
| elif not st.session_state.file_df.empty: | |
| df_for_tms = st.session_state.file_df | |
| if not df_for_tms.empty: | |
| # 优先使用清洗后的名字 | |
| if '影片名称_清理后' in df_for_tms.columns: | |
| priority_titles = df_for_tms['影片名称_清理后'].unique().tolist() | |
| else: | |
| priority_titles = df_for_tms['影片名称'].apply( | |
| lambda x: clean_movie_title(x)).unique().tolist() | |
| halls_data, movie_list_sorted = fetch_and_process_server_movies(priority_titles) | |
| st.toast("TMS 服务器数据获取成功!", icon="🎉") | |
| st.markdown("#### 按影片查看所在影厅") | |
| view2_data = [{'影片名称': item['assert_name'], | |
| '所在影厅': " ".join(sorted([get_circled_number(h) for h in item['halls']])), | |
| '文件名': item['content_name'], '时长(分钟)': format_play_time(item['play_time'])} | |
| for item in movie_list_sorted] | |
| st.dataframe(pd.DataFrame(view2_data), hide_index=True, use_container_width=True) | |
| st.markdown("#### 按影厅查看影片内容") | |
| hall_tabs = st.tabs(list(halls_data.keys())) | |
| for tab, hall_name in zip(hall_tabs, halls_data.keys()): | |
| with tab: | |
| view1_data = [{'影片名称': item['details']['assert_name'], '所在影厅': " ".join( | |
| sorted([get_circled_number(h) for h in item['details']['halls']])), | |
| '文件名': item['content_name'], | |
| '时长(分钟)': format_play_time(item['details']['play_time'])} for item in | |
| halls_data[hall_name]] | |
| st.dataframe(pd.DataFrame(view1_data), hide_index=True, use_container_width=True) | |
| except Exception as e: | |
| st.error(f"查询TMS服务器时出错: {e}") | |
| if __name__ == "__main__": | |
| main() | |