Spaces:
Running
Running
| import streamlit as st | |
| import requests | |
| import time | |
| import json | |
| import os | |
| import threading | |
| import re | |
| import urllib3 | |
| from datetime import datetime, timedelta, timezone, time as dt_time | |
| from collections import deque | |
| from dotenv import load_dotenv | |
# --- 0. Basic setup ---
# set_page_config is the first Streamlit call issued by this script.
st.set_page_config(page_title="影城空场防空转监控", page_icon="💸", layout="wide")
# Silence HTTPS certificate warnings (the TMS system typically uses a self-signed certificate)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
try:
    from streamlit_autorefresh import st_autorefresh
except ImportError:
    # Optional dependency: main() skips auto-refresh when this is None.
    st_autorefresh = None
# --- 1. Configuration and constants ---
# Load variables from a .env file (if any) into the process environment.
load_dotenv()
# Pushover configuration
PUSHOVER_TOKEN = os.getenv("PUSHOVER_TOKEN")
PUSHOVER_USER = os.getenv("PUSHOVER_USER")
# WeChat configuration
WX_APP_ID = os.getenv("WX_APP_ID")
WX_APP_SECRET = os.getenv("WX_APP_SECRET")
# Multiple recipients are supported, comma-separated.
# When the variable is unset this is [''], which the sender filters out.
WX_USER_OPEN_IDS = os.getenv("WX_USER_OPEN_ID", "").split(',')
WX_TEMPLATE_ID_IDLE = os.getenv("WX_TEMPLATE_ID_IDLE")
WX_TEMPLATE_ID_DAILY = os.getenv("WX_TEMPLATE_ID_DAILY")
# Cinema system configuration
CINEMA_ID = os.getenv("CINEMA_ID")
TMS_APP_SECRET = os.getenv("TMS_APP_SECRET")
TMS_TICKET = os.getenv("TMS_TICKET")
TMS_X_SESSION_ID = os.getenv("TMS_X_SESSION_ID")
TMS_THEATER_ID = os.getenv("TMS_THEATER_ID")  # Added: theater ID
# Hall number (as a string) -> value sent as HALL_ID in TMS schedule queries.
HALL_ID_MAP = {
    "1": "79181753", "2": "87350725", "3": "93340931",
    "4": "98009245", "5": "02194530", "6": "07183751",
    "7": "11314566", "8": "15532561", "9": "20079450"
}
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(CURRENT_DIR)
# The ticketing-system login payload is persisted here, one directory above this file.
TOKEN_FILE = os.path.join(ROOT_DIR, 'token_data.json')
| # --- 2. 统计与工具类 --- | |
class DailyStats:
    """Counters for the current business day plus a frozen copy of the previous day.

    Live counters (all integers, reset to 0 by ``reset``):
      * ``zero_sessions`` - number of shows with zero tickets sold
      * ``triggers``      - number of idle alerts raised
      * ``notify_fails``  - number of failed notification deliveries
      * ``api_fails``     - number of failed API calls

    ``yesterday_stats`` is a dict holding the same four keys plus ``'date'``.
    """

    # Names of the live counters, in the order they appear in yesterday_stats.
    _COUNTERS = ('zero_sessions', 'triggers', 'notify_fails', 'api_fails')

    def __init__(self):
        self.reset()
        placeholder = dict.fromkeys(self._COUNTERS, 0)
        placeholder['date'] = '未知'
        self.yesterday_stats = placeholder

    def reset(self):
        """Set every live counter back to zero."""
        for counter in self._COUNTERS:
            setattr(self, counter, 0)

    def snapshot_as_yesterday(self, date_str):
        """Copy the live counters into ``yesterday_stats`` under *date_str*, then reset them."""
        frozen = {counter: getattr(self, counter) for counter in self._COUNTERS}
        frozen['date'] = date_str
        self.yesterday_stats = frozen
        self.reset()


# Module-wide statistics instance
daily_stats = DailyStats()
def get_beijing_now():
    """Return the current UTC+8 wall-clock time as a naive ``datetime`` (tzinfo stripped)."""
    beijing_tz = timezone(timedelta(hours=8))
    return datetime.now(beijing_tz).replace(tzinfo=None)
def get_business_date():
    """Return the current business date as ``'YYYY-MM-DD'``.

    The business day rolls over at 06:00 (time taken from ``get_beijing_now``):
    anything before 06:00 still belongs to the previous calendar date.
    """
    # Shifting the clock back six hours maps 00:00-05:59 onto the previous
    # date and leaves 06:00-23:59 on the same date.
    shifted = get_beijing_now() - timedelta(hours=6)
    return shifted.strftime("%Y-%m-%d")
def parse_show_datetime(date_str, time_str):
    """Combine a business date and an ``HH:MM`` show time into a ``datetime``.

    Show times earlier than 06:00 are treated as belonging to the calendar day
    after *date_str* (late-night shows of the same business day).

    :param date_str: business date formatted as ``YYYY-MM-DD``
    :param time_str: show start time formatted as ``HH:MM``
    :return: a naive ``datetime``, or ``None`` when either argument is not a
             string in the expected format
    """
    try:
        show_t = datetime.strptime(time_str, "%H:%M").time()
        base_date = datetime.strptime(date_str, "%Y-%m-%d").date()
        if show_t < dt_time(6, 0):
            base_date += timedelta(days=1)
        return datetime.combine(base_date, show_t)
    except (TypeError, ValueError, OverflowError):
        # TypeError: non-string input; ValueError: wrong format;
        # OverflowError: date arithmetic past the supported range.
        # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
        return None
def extract_hall_number_raw(hall_name):
    """Return the first run of digits in *hall_name* (used as the key for API mapping).

    When the name contains no digits, the name itself is returned as a string.
    """
    text = str(hall_name)
    found = re.search(r'(\d+)', text)
    if found is None:
        return text
    return found.group(1)
def format_hall_name(hall_name):
    """Shorten a hall name for display, e.g. 【和成天下1号厅】 -> 1号厅.

    Names that do not contain "<digits>号" are returned unchanged (as a string).
    """
    text = str(hall_name)
    found = re.search(r'(\d+)号', text)
    return f"{found.group(1)}号厅" if found else text
| # --- 3. 微信推送模块 (含自动重试) --- | |
class WeChatPusher:
    """Sends WeChat template messages and caches the access token between calls."""

    # errcodes on which the token is force-refreshed and the send retried once
    _TOKEN_ERRCODES = (40001, 40014, 42001)

    def __init__(self, app_id, app_secret):
        self.app_id = app_id
        self.app_secret = app_secret
        self.token = None
        # Epoch seconds after which the cached token is considered stale.
        self.token_expires_time = 0

    def get_access_token(self, force_refresh=False):
        """Return a WeChat access token, fetching a new one when needed.

        :param force_refresh: when True, ignore the cached token and fetch a new one
        :return: the token string, or ``None`` when credentials are missing or
                 the request fails
        """
        if not force_refresh and self.token and datetime.now().timestamp() < self.token_expires_time:
            return self.token
        if not self.app_id or not self.app_secret:
            return None
        url = "https://api.weixin.qq.com/cgi-bin/token"
        # Passed as params so the credentials are URL-encoded by requests.
        params = {
            "grant_type": "client_credential",
            "appid": self.app_id,
            "secret": self.app_secret,
        }
        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
            if 'access_token' in data:
                self.token = data['access_token']
                # Expire 200 seconds early to avoid using a token right at its deadline
                self.token_expires_time = datetime.now().timestamp() + data['expires_in'] - 200
                return self.token
            print(f"❌ 微信 Token 获取失败: {data}")
        except Exception as e:
            # Best-effort notification path: report and carry on.
            print(f"❌ 微信 API 请求异常: {e}")
        return None

    def send_template_message(self, user_ids, template_id, data_map):
        """Send one template message to each user id.

        Blank ids are ignored. Every user whose message could not be delivered
        adds one to ``daily_stats.notify_fails``.

        :param user_ids: iterable of WeChat open ids (may contain blanks)
        :param template_id: WeChat template id
        :param data_map: mapping of template field name -> text value
        :return: None
        """
        # Wrap each value in the structure the template API expects
        formatted_data = {
            key: {"value": value, "color": "#173177"}
            for key, value in data_map.items()
        }
        # Drop empty / whitespace-only ids
        valid_ids = [uid.strip() for uid in user_ids if uid and uid.strip()]
        for user_id in valid_ids:
            if not self._send_to_user(user_id, template_id, formatted_data):
                daily_stats.notify_fails += 1

    def _send_to_user(self, user_id, template_id, formatted_data):
        """Deliver one message; return True on success, False on any failure.

        At most two attempts are made: a normal one, and one with a freshly
        fetched token if the first was rejected with a token-related errcode.
        """
        url = "https://api.weixin.qq.com/cgi-bin/message/template/send"
        payload = {
            "touser": user_id,
            "template_id": template_id,
            "data": formatted_data,
        }
        max_attempts = 2
        for attempt in range(max_attempts):
            # On the retry (attempt > 0) the token is force-refreshed
            token = self.get_access_token(force_refresh=(attempt > 0))
            if not token:
                return False  # no token available, retrying cannot help
            try:
                resp = requests.post(url, params={"access_token": token}, json=payload, timeout=10)
                result = resp.json()
                errcode = result.get("errcode")
            except Exception as e:
                print(f"❌ 微信发送异常 [{user_id}]: {e}")
                return False
            if errcode == 0:
                return True
            if errcode in self._TOKEN_ERRCODES and attempt + 1 < max_attempts:
                print(f"🔄 微信 Token 失效 ({errcode}),正在刷新重试 [{user_id}]...")
                continue
            # Either a non-token error, or the token was still rejected after
            # the refresh: report it so the caller counts the failure.
            print(f"❌ 微信发送失败 [{user_id}]: {result}")
            return False
        return False
| # --- 4. 通知管理系统 --- | |
class NotificationManager:
    """Fans notifications out to Pushover and to WeChat template messages."""

    def __init__(self):
        self.wx_pusher = WeChatPusher(WX_APP_ID, WX_APP_SECRET)

    def send_pushover(self, title, message, priority=0):
        """Post one message to Pushover; does nothing when Pushover is not configured.

        Network errors and non-2xx responses are logged and counted in
        ``daily_stats.notify_fails``.
        """
        if not PUSHOVER_TOKEN or not PUSHOVER_USER:
            return
        url = "https://api.pushover.net/1/messages.json"
        data = {
            "token": PUSHOVER_TOKEN,
            "user": PUSHOVER_USER,
            "title": title,
            "message": message,
            "priority": priority
        }
        try:
            resp = requests.post(url, data=data, timeout=10)
            # A 4xx/5xx answer means the message was not accepted; surface it
            # so it is counted like any other delivery failure.
            resp.raise_for_status()
        except Exception as e:
            print(f"❌ Pushover 发送异常: {e}")
            daily_stats.notify_fails += 1

    def send_idle_alert(self, hall_name, movie_name, show_time, count_info, remark_text=""):
        """Send an idle-playback alert (Pushover + WeChat) and bump ``daily_stats.triggers``.

        :param hall_name: display name of the hall
        :param movie_name: title of the scheduled movie
        :param show_time: show start time text
        :param count_info: text describing which check found the problem
        :param remark_text: extra remark appended to the message
        """
        daily_stats.triggers += 1
        # 1. Pushover content
        title = f"发现 {hall_name} 空场“空转”!"
        msg = (
            f"{hall_name} {show_time}《{movie_name}》\n"
            f"无人购票但服务器有排程,未撤场,或许正在播放,请检查。\n"
            f"{count_info},{remark_text}"
        )
        self.send_pushover(title, msg, priority=1)
        # 2. WeChat content (only when a template id is configured)
        if WX_TEMPLATE_ID_IDLE:
            data_map = {
                "first": f"发现 {hall_name} 空场“空转”!\n{hall_name} {show_time}《{movie_name}》\n无人购票但服务器有排程,未撤场,或许正在播放,请检查。\n",
                "keyword1": hall_name,
                "keyword2": movie_name,
                "keyword3": show_time,
                "keyword4": count_info,
                "remark": f"\n{remark_text}"
            }
            self.wx_pusher.send_template_message(WX_USER_OPEN_IDS, WX_TEMPLATE_ID_IDLE, data_map)

    def send_daily_report(self, today_date_cn):
        """Send the daily "service ready" report (Pushover + WeChat).

        The report body carries the counters stored in
        ``daily_stats.yesterday_stats``.

        :param today_date_cn: today's date, already formatted for display
        """
        y_stats = daily_stats.yesterday_stats
        # One-line summary of the counters (used in the WeChat template)
        stats_text = (
            f"0票场次({y_stats['zero_sessions']}) "
            f"触发({y_stats['triggers']}) "
            f"s失败({y_stats['notify_fails']}) "
            f"g失败({y_stats['api_fails']})"
        )
        # 1. Pushover
        title = "影城“空转”检查服务就绪"
        msg = (
            f"{today_date_cn},今日排片数据已加载,开始智能检查。\n\n"
            f"昨日情况:\n"
            f"0票总场次:{y_stats['zero_sessions']}\n"
            f"触发通知次数:{y_stats['triggers']}\n"
            f"发送通知失败次数:{y_stats['notify_fails']}\n"
            f"API获取失败次数:{y_stats['api_fails']}\n"
        )
        self.send_pushover(title, msg, priority=0)
        # 2. WeChat (only when a template id is configured)
        if WX_TEMPLATE_ID_DAILY:
            data_map = {
                "first": f"影城“空转”检查服务就绪\n{today_date_cn},今日排片数据已加载,开始智能检查。\n",
                "keyword1": "影城“空转”检查服务",
                "keyword2": today_date_cn,
                "keyword3": stats_text,
                "remark": "\n服务正常运行中。"
            }
            self.wx_pusher.send_template_message(WX_USER_OPEN_IDS, WX_TEMPLATE_ID_DAILY, data_map)


notifier = NotificationManager()
| # --- 5. API 管理模块 --- | |
class TicketAPIManager:
    """Client for the ticketing system: login, token persistence and daily schedule fetch."""

    def __init__(self, logger_func):
        """:param logger_func: callable taking one message string"""
        self.logger = logger_func
        # time.time() of the last failed login; used to throttle login attempts.
        self.last_login_fail = 0

    def load_token(self):
        """Return the token stored in TOKEN_FILE, or ``None`` if it cannot be read."""
        try:
            with open(TOKEN_FILE, 'r', encoding='utf-8') as f:
                return json.load(f).get('token')
        except FileNotFoundError:
            # No token has been saved yet; the caller will log in.
            return None
        except (OSError, ValueError, AttributeError) as e:
            # Unreadable file, invalid JSON, or JSON that is not an object.
            self.logger(f"⚠️ 读取本地 Token 失败: {e}")
            return None

    def login(self):
        """Log in to the ticketing system and persist the returned payload.

        No attempt is made within 300 seconds of a failed login. Credentials
        are read from the ``CINEMA_*`` environment variables.

        :return: the new token, or ``None`` on failure / while throttled
        """
        if time.time() - self.last_login_fail < 300:
            return None
        username = os.getenv("CINEMA_USERNAME")
        password = os.getenv("CINEMA_PASSWORD")
        res_code = os.getenv("CINEMA_RES_CODE")
        device_id = os.getenv("CINEMA_DEVICE_ID")
        if not all([username, password, res_code]):
            self.logger("❌ 票务系统环境变量缺失")
            daily_stats.api_fails += 1
            return None
        try:
            login_url = 'https://app.bi.piao51.cn/cinema-app/credential/login.action'
            login_data = {'username': username, 'password': password, 'type': '1', 'resCode': res_code,
                          'deviceid': device_id, 'dtype': 'ios'}
            # The session carries the login cookies over to the follow-up
            # request; the context manager closes it afterwards.
            with requests.Session() as session:
                session.post(login_url, data=login_data, timeout=15)
                resp = session.get('https://app.bi.piao51.cn/cinema-app/security/logined.action', timeout=10)
            info = resp.json()
            data = info.get("data") or {}
            if info.get("success") and data.get("token"):
                with open(TOKEN_FILE, 'w', encoding='utf-8') as f:
                    json.dump(data, f)
                return data['token']
            # The server answered but did not hand out a token.
            failure = "响应中没有 token"
        except Exception as e:
            failure = e
        # Every failed attempt starts the cool-down, not only raised exceptions,
        # so rejected credentials do not trigger a login on every fetch.
        self.last_login_fail = time.time()
        self.logger(f"❌ 票务登录失败: {failure}")
        daily_stats.api_fails += 1
        return None

    def fetch_schedule(self, date_str):
        """Fetch the hall schedule for *date_str* (``YYYY-MM-DD``).

        Uses the stored token, logging in when none is stored. When the API
        answers with code 500 the client logs in again and retries once
        (presumably 500 signals an expired token - the API contract is not
        visible here).

        :return: the ``data`` list on success, ``None`` on any failure
                 (each failure adds one to ``daily_stats.api_fails``)
        """
        token = self.load_token() or self.login()
        if not token:
            daily_stats.api_fails += 1
            return None
        url = 'https://cawapi.yinghezhong.com/showInfo/getHallShowInfo'
        params = {'showDate': date_str, 'token': token, '_': int(time.time() * 1000)}
        headers = {'User-Agent': 'Mozilla/5.0'}
        try:
            data = requests.get(url, params=params, headers=headers, timeout=10).json()
            if data.get('code') == 500:
                token = self.login()
                if token:
                    params['token'] = token
                    data = requests.get(url, params=params, headers=headers, timeout=10).json()
            # The retried response is validated exactly like the first one.
            if data.get('code') == 1:
                return data.get('data', [])
        except Exception as e:
            self.logger(f"⚠️ 票务API异常: {e}")
        daily_stats.api_fails += 1
        return None
class TMSAPIManager:
    """Client for the TMS (theater management) web API: token, hall schedule query, refresh command."""

    # Theater id sent with schedule queries when TMS_THEATER_ID is not configured.
    _DEFAULT_THEATER_ID = 38205954

    def __init__(self, logger_func):
        """:param logger_func: callable taking one message string"""
        self.logger = logger_func
        self.auth_token = None
        # time.time() at which auth_token was obtained.
        self.last_token_time = 0

    def get_token(self):
        """Return an API token, reusing the cached one for up to 1800 seconds.

        :return: the token, or ``None`` when credentials are missing or the
                 request fails (a failed request adds one to ``daily_stats.api_fails``)
        """
        if self.auth_token and (time.time() - self.last_token_time < 1800):
            return self.auth_token
        if not all([TMS_APP_SECRET, TMS_TICKET]):
            return None
        # Request a token from the OA system.
        # NOTE(review): this endpoint is plain HTTP, so the app secret is sent
        # unencrypted - confirm whether an HTTPS endpoint is available.
        url = f'http://oa.hengdianfilm.com:7080/cinema-api/admin/generateToken?token=hd&murl=?token=hd&murl=ticket={TMS_TICKET}'
        headers = {'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0'}
        payload = {'appId': 'hd', 'appSecret': TMS_APP_SECRET, 'timeStamp': int(time.time() * 1000)}
        try:
            res = requests.post(url, json=payload, headers=headers, timeout=10)
            data = res.json()
            if data.get('error_code') == '0000':
                self.auth_token = data['param']
                self.last_token_time = time.time()
                return self.auth_token
            self.logger(f"⚠️ TMS Token 获取失败: error_code={data.get('error_code')}")
        except Exception as e:
            # Previously swallowed silently; log so the cause is visible.
            self.logger(f"⚠️ TMS Token 请求异常: {e}")
        daily_stats.api_fails += 1
        return None

    @staticmethod
    def _browser_headers(token, referer, session_id):
        """Build the header set shared by the TMS JSON endpoints."""
        return {
            'accept': 'application/json, text/javascript, */*; q=0.01',
            'content-type': 'application/json; charset=UTF-8',
            'origin': 'https://tms.hengdianfilm.com',
            'referer': referer,
            'token': token,
            'user-agent': 'Mozilla/5.0',
            'x-requested-with': 'XMLHttpRequest',
            'x-sessionid': session_id
        }

    def _theater_id(self):
        """Return the configured theater id as an int, or the built-in default."""
        try:
            return int(TMS_THEATER_ID)
        except (TypeError, ValueError):
            # Unset or non-numeric configuration.
            return self._DEFAULT_THEATER_ID

    def fetch_hall_schedule_list(self, hall_number):
        """Query the schedule entries TMS currently holds for one hall.

        :param hall_number: hall number; looked up in HALL_ID_MAP as a string
        :return: the ``BODY.LIST`` list on success; ``None`` when no token is
                 available, the hall is unknown, the server reports a
                 non-success code, or the request fails
        """
        token = self.get_token()
        if not token:
            return None
        tms_hall_id = HALL_ID_MAP.get(str(hall_number))
        if not tms_hall_id:
            return None
        url = 'https://tms.hengdianfilm.com/cinema-api/cinema/schedule/server/list'
        session_id = TMS_X_SESSION_ID or ''
        cookies = {'JSESSIONID': session_id}
        headers = self._browser_headers(
            token,
            f'https://tms.hengdianfilm.com/hd/index?CinemaMonitorEdit&HALL_ID%3D{tms_hall_id}',
            session_id,
        )
        params = {'token': 'hd', 'murl': 'CinemaMonitor'}
        json_data = {
            # Same configuration source as trigger_schedule_refresh.
            'THEATER_ID': self._theater_id(),
            'STATE': 0,
            'HALL_ID': tms_hall_id,
            'START_TIME': int(time.time() * 1000),
            'PAGE_CAPACITY': 20,
            'PAGE_INDEX': 1,
        }
        try:
            # verify=False: certificate validation is disabled for this host
            # (see the urllib3 warning suppression at the top of the file).
            res = requests.post(url, params=params, cookies=cookies, headers=headers, json=json_data, timeout=10,
                                verify=False)
            data = res.json()
            if data.get("RSPCD") == "000000":
                return data.get("BODY", {}).get("LIST", [])
            self.logger(f"⚠️ TMS 返回异常: RSPCD={data.get('RSPCD')}")
            return None
        except Exception as e:
            self.logger(f"⚠️ TMS 请求异常: {e}")
            daily_stats.api_fails += 1
            return None

    def trigger_schedule_refresh(self):
        """Ask TMS to refresh today's schedule for the configured theater.

        Best effort: does nothing without a token or TMS_THEATER_ID, and any
        error is logged rather than raised.
        """
        token = self.get_token()
        if not token or not TMS_THEATER_ID:
            return
        try:
            # Today's 00:00 in UTC+8, independent of the server's local time
            # zone, to match the Beijing-time clock used everywhere else.
            beijing_now = datetime.now(timezone(timedelta(hours=8)))
            beijing_midnight = beijing_now.replace(hour=0, minute=0, second=0, microsecond=0)
            start_date = int(beijing_midnight.timestamp() * 1000)
            end_date = start_date + (24 * 60 * 60 * 1000)
            url = 'https://tms.hengdianfilm.com/cinema-api/tms/cmd/show/schedule'
            session_id = TMS_X_SESSION_ID or ''
            # The token header value comes from get_token()
            headers = self._browser_headers(
                token,
                f'https://tms.hengdianfilm.com/hd/index?Scheduling&THEATER_ID={TMS_THEATER_ID}&DATE={start_date}&PAGE_CAPACITY=20&PAGE_INDEX=1',
                session_id,
            )
            params = {
                'token': 'hd',
                'murl': 'Scheduling',
            }
            cookies = {'JSESSIONID': session_id}
            json_data = {
                'THEATER_LIST': [
                    {
                        'THEATER_ID': int(TMS_THEATER_ID),
                        'START_DATE': start_date,
                        'END_DATE': end_date,
                    },
                ],
            }
            # Short timeout, certificate errors ignored; the response body is not inspected.
            requests.post(url, params=params, headers=headers, cookies=cookies, json=json_data, timeout=5, verify=False)
            self.logger("🔄 TMS 排期刷新指令已发送")
        except Exception as e:
            # A failed refresh must not affect the main program
            self.logger(f"⚠️ TMS 排期刷新失败: {e}")
| # --- 6. 智能监控主逻辑 --- | |
class PlaybackMonitor:
    """Background monitor that looks for zero-ticket shows still scheduled in TMS.

    A daemon thread (started by ``__init__``) loads the day's schedule, then for
    every show with zero tickets sold performs three checks: at the show start,
    5 minutes later and 10 minutes later. At each check the TMS schedule of the
    hall is queried; a TMS entry starting within 30 minutes of the show counts
    as "still scheduled" and raises an alert.

    Attributes read by the Streamlit front end: ``logs``, ``status_text``,
    ``next_wakeup_str``, ``active_monitors``, ``daily_schedule_cache`` and
    ``current_business_date``.
    """

    def __init__(self):
        self.logs = deque(maxlen=50)
        self.status_text = "初始化中..."
        self.next_wakeup_str = "--:--:--"
        self.active_monitors = []
        # "<biz_date>_<hall>_<start>" keys of shows confirmed absent from TMS.
        self.cleared_sessions = set()
        # "<biz_date>_<hall>_<start>_<check index>" keys of checks already done.
        self.processed_checks = set()
        self.ticket_api = TicketAPIManager(self.log)
        self.tms_api = TMSAPIManager(self.log)
        self.current_business_date = None
        self.daily_schedule_cache = None
        # True from the day rollover until the new day's schedule is loaded.
        self._refresh_pending = False
        self.thread = threading.Thread(target=self._run_loop, daemon=True)
        self.thread.start()

    def log(self, msg):
        """Prepend a timestamped entry to ``logs`` and echo it to stdout."""
        ts = get_beijing_now().strftime("%H:%M:%S")
        entry = f"[{ts}] {msg}"
        self.logs.appendleft(entry)
        print(entry)

    @staticmethod
    def _check_key(biz_date, hall_id, start_str, check_idx):
        """Build the de-duplication key identifying one check of one show."""
        return f"{biz_date}_{hall_id}_{start_str}_{check_idx}"

    def _get_target_check_points(self, schedule_list, business_date):
        """Build the time-ordered list of checks for all zero-ticket shows.

        Also stores the number of zero-ticket shows in ``daily_stats.zero_sessions``.

        :return: list of dicts with keys ``time``, ``type``, ``check_index``, ``data``
        """
        check_points = []
        zero_count = 0
        for item in schedule_list:
            if int(item.get('soldTicketNum', 0)) != 0:
                continue
            zero_count += 1
            start_str = item.get('showStartTime')
            if not start_str:
                continue
            show_dt = parse_show_datetime(business_date, start_str)
            if not show_dt:
                continue
            # Three checks: at start, +5 min, +10 min
            for i in range(3):
                check_points.append({
                    'time': show_dt + timedelta(minutes=i * 5),
                    'type': 'CHECK',
                    'check_index': i,
                    'data': item
                })
        daily_stats.zero_sessions = zero_count
        check_points.sort(key=lambda x: x['time'])
        return check_points

    def _run_loop(self):
        """Main loop of the monitor thread; never returns."""
        self.log("🚀 智能防空转监控已启动 (WeChat+Pushover)")
        while True:
            try:
                now = get_beijing_now()
                biz_date = get_business_date()
                # --- Initialisation and day rollover ---
                need_refresh = self._refresh_pending
                if self.daily_schedule_cache is None:
                    need_refresh = True
                    self.log("🆕 初始化:获取全天排片...")
                    self.current_business_date = biz_date
                elif self.current_business_date != biz_date:
                    if now.time() >= dt_time(9, 0):
                        need_refresh = True
                        daily_stats.snapshot_as_yesterday(self.current_business_date)
                        self.cleared_sessions.clear()
                        self.processed_checks.clear()
                        # Roll the date over immediately and remember that the
                        # schedule is still owed: if the fetch below fails, the
                        # retry must not snapshot again, which would overwrite
                        # yesterday's stats with the freshly reset counters.
                        self.current_business_date = biz_date
                        self._refresh_pending = True
                        self.log(f"🌞 新营业日 ({biz_date}):刷新排片...")
                if need_refresh:
                    schedule = self.ticket_api.fetch_schedule(biz_date)
                    if schedule:
                        self.daily_schedule_cache = schedule
                        self.current_business_date = biz_date
                        self._refresh_pending = False
                        self.log(f"✅ 排片已更新,共 {len(schedule)} 场。")
                        try:
                            d_obj = datetime.strptime(biz_date, "%Y-%m-%d")
                            date_cn = f"{d_obj.year}年{d_obj.month}月{d_obj.day}日"
                        except ValueError:
                            date_cn = biz_date
                        notifier.send_daily_report(date_cn)
                    else:
                        self.log("⚠️ 获取排片失败,5分钟后重试")
                        daily_stats.api_fails += 1
                        time.sleep(300)
                        continue
                # --- Work out the next check ---
                check_points = self._get_target_check_points(self.daily_schedule_cache, biz_date)
                next_target = None
                for cp in check_points:
                    hall_id = extract_hall_number_raw(cp['data']['hallName'])
                    dedup_key = self._check_key(biz_date, hall_id, cp['data']['showStartTime'], cp['check_index'])
                    if dedup_key in self.processed_checks:
                        continue
                    # Checks up to 30 seconds in the past are still taken.
                    if cp['time'] > (now - timedelta(seconds=30)):
                        next_target = cp
                        break
                if next_target:
                    target_time = next_target['time']
                    # 09:00 interceptor: never sleep across 09:00, so the
                    # daily rollover above gets a chance to run.
                    nine_am_today = datetime.combine(now.date(), dt_time(9, 0))
                    force_wake_for_daily_reset = False
                    if now < nine_am_today and target_time > nine_am_today:
                        target_time = nine_am_today
                        force_wake_for_daily_reset = True
                    sleep_seconds = max(0, (target_time - now).total_seconds())
                    self.status_text = f"💤 休眠中"
                    self.next_wakeup_str = target_time.strftime('%H:%M:%S')
                    if force_wake_for_daily_reset:
                        self.active_monitors = ["等待 09:00 日报刷新..."]
                    else:
                        raw_hall = next_target['data']['hallName']
                        clean_hall = format_hall_name(raw_hall)
                        self.active_monitors = [
                            f"下个任务: {clean_hall} {next_target['data']['showStartTime']} (第{next_target['check_index'] + 1}次检查)"]
                    self.log(f"💤 智能休眠 {int(sleep_seconds)}秒,将在 {self.next_wakeup_str} 唤醒...")
                    time.sleep(sleep_seconds)
                    if force_wake_for_daily_reset:
                        continue
                    # --- Regular wake-up: run the check ---
                    self.status_text = "🔥 正在执行检查..."
                    self.log("⏰ 唤醒!正在同步最新票务数据...")
                    latest_schedule = self.ticket_api.fetch_schedule(biz_date)
                    if latest_schedule:
                        self.daily_schedule_cache = latest_schedule
                    else:
                        self.log("⚠️ 同步排片失败,使用旧数据")
                        daily_stats.api_fails += 1
                        latest_schedule = self.daily_schedule_cache
                    check_now = get_beijing_now()
                    latest_check_points = self._get_target_check_points(latest_schedule, biz_date)
                    # Every check due within two minutes of now is handled in this pass.
                    current_targets = [
                        cp for cp in latest_check_points
                        if abs((cp['time'] - check_now).total_seconds()) < 120
                    ]
                    if current_targets:
                        self._process_targets(current_targets, biz_date)
                    # Short pause before recomputing, also when nothing was due.
                    time.sleep(5)
                else:
                    self.status_text = "🌙 今日监控结束"
                    self.active_monitors = []
                    # Sleep towards the next 09:00 that lies ahead of *now*
                    # (not business date + 1 day, which after the 06:00 date
                    # flip is a day too far), in steps of at most one hour.
                    next_9am = datetime.combine(now.date(), dt_time(9, 0))
                    if next_9am <= now:
                        next_9am += timedelta(days=1)
                    seconds_to_9am = (next_9am - now).total_seconds()
                    if seconds_to_9am > 3600:
                        self.log("暂无目标,休眠 1 小时...")
                        time.sleep(3600)
                    else:
                        self.log(f"休眠至明日 09:00...")
                        time.sleep(seconds_to_9am)
            except Exception as e:
                # Keep the thread alive whatever happens; retry after a minute.
                self.log(f"❌ 主循环异常: {e}")
                time.sleep(60)

    @staticmethod
    def _tms_has_show_near(tms_list, ticket_start_dt, biz_date):
        """Return True when a TMS entry starts within 30 minutes of *ticket_start_dt*."""
        for tms_item in tms_list:
            t_start_str = tms_item.get('START_TIME')
            if not t_start_str:
                continue
            t_start_dt = parse_show_datetime(biz_date, t_start_str)
            if not t_start_dt:
                continue
            if abs((t_start_dt - ticket_start_dt).total_seconds()) < 1800:
                return True
        return False

    def _process_targets(self, targets, biz_date):
        """Run the due checks, alert on idle playback and publish the results.

        :param targets: check-point dicts as built by ``_get_target_check_points``
        :param biz_date: business date string used in the de-duplication keys
        """
        # Group the not-yet-processed checks by hall: one TMS query per hall.
        halls_to_check = {}
        for t in targets:
            hall_id = extract_hall_number_raw(t['data']['hallName'])
            dedup_key = self._check_key(biz_date, hall_id, t['data']['showStartTime'], t['check_index'])
            if dedup_key in self.processed_checks:
                continue
            halls_to_check.setdefault(hall_id, []).append(t)
        if not halls_to_check:
            return
        self.log(f"🔍 核查 {len(halls_to_check)} 个影厅 TMS 状态...")
        display_results = []
        for hall_id, target_list in halls_to_check.items():
            all_cleared = all(
                f"{biz_date}_{hall_id}_{target['data'].get('showStartTime')}" in self.cleared_sessions
                for target in target_list
            )
            if all_cleared:
                # Every show of this hall was already confirmed removed: skip the TMS query.
                for target in target_list:
                    self.processed_checks.add(
                        self._check_key(biz_date, hall_id, target['data']['showStartTime'], target['check_index']))
                clean_name = format_hall_name(target_list[0]['data']['hallName'])
                display_results.append(f"{clean_name} ✅ [已跳过 (已确认撤场)]")
                continue
            tms_list = self.tms_api.fetch_hall_schedule_list(hall_id)
            for target in target_list:
                session = target['data']
                check_idx = target['check_index']
                hall_name_raw = session.get('hallName')
                movie_name = session.get('movieName')
                ticket_start_str = session.get('showStartTime')
                ticket_start_dt = parse_show_datetime(biz_date, ticket_start_str)
                unique_key = f"{biz_date}_{hall_id}_{ticket_start_str}"
                dedup_key = self._check_key(biz_date, hall_id, ticket_start_str, check_idx)
                if dedup_key in self.processed_checks:
                    continue
                if unique_key in self.cleared_sessions:
                    self.processed_checks.add(dedup_key)
                    continue
                hall_name_clean = format_hall_name(hall_name_raw)
                status_desc = f"{hall_name_clean} {ticket_start_str} (第{check_idx + 1}/3次)"
                if tms_list is None:
                    status_desc += " ❓ [TMS查询失败]"
                    daily_stats.api_fails += 1
                elif self._tms_has_show_near(tms_list, ticket_start_dt, biz_date):
                    status_desc += " ⚠️ [空转! TMS未撤]"
                    self.log(f"🚨 发现空转: {hall_name_clean}《{movie_name}》")
                    count_info = f"第 {check_idx + 1}/3 次检查发现"
                    remark = "仅停止播放未撤排程下次检查依然会触发通知。"
                    notifier.send_idle_alert(hall_name_clean, movie_name, ticket_start_str, count_info, remark)
                    # On the first detection, also ask TMS to refresh its schedule
                    if check_idx == 0:
                        self.tms_api.trigger_schedule_refresh()
                else:
                    status_desc += " ✅ [正常 (TMS无排期)]"
                    self.cleared_sessions.add(unique_key)
                self.processed_checks.add(dedup_key)
                display_results.append(status_desc)
        self.active_monitors = display_results
| # --- 7. Streamlit 前端 --- | |
@st.cache_resource
def get_monitor():
    """Return the shared PlaybackMonitor, creating it on first use.

    Streamlit re-executes this script on every rerun (every 30 seconds when
    auto-refresh is active). Caching the instance keeps a single monitor and a
    single background thread alive instead of starting a new one per rerun.
    """
    return PlaybackMonitor()
def main():
    """Render the dashboard: status metrics, the log pane and the latest check results."""
    monitor = get_monitor()
    if st_autorefresh:
        st_autorefresh(interval=30 * 1000, key="pb_refresh")
    st.title("💸 影厅空场防空转监控")

    # Count the shows that have not started yet, and how many of those have sold nothing.
    remaining_zero_count = 0
    remaining_total_count = 0
    schedule = monitor.daily_schedule_cache
    if schedule:
        now = get_beijing_now()
        biz_date = monitor.current_business_date or get_business_date()
        for show in schedule:
            start_str = show.get('showStartTime')
            if not start_str:
                continue
            show_dt = parse_show_datetime(biz_date, start_str)
            if not show_dt or show_dt <= now:
                continue
            remaining_total_count += 1
            if int(show.get('soldTicketNum', 0)) == 0:
                remaining_zero_count += 1

    status_col, wakeup_col, count_col = st.columns(3)
    status_col.metric("当前状态", monitor.status_text)
    wakeup_col.metric("下次唤醒", monitor.next_wakeup_str)
    count_col.metric("剩余 0 票场次 / 剩余总场次", f"{remaining_zero_count} / {remaining_total_count}")
    st.divider()

    col_logs, col_mon = st.columns([3, 2])
    with col_logs:
        st.subheader("📜 运行日志")
        st.text_area("Logs", "\n".join(monitor.logs), height=450, disabled=True)
    with col_mon:
        st.subheader("🕵️ 最近一次检查结果")
        results = monitor.active_monitors
        if not results:
            st.caption("等待下一次唤醒...")
        for line in results:
            # The marker embedded in each line selects the box style.
            if "⚠️" in line:
                render = st.error
            elif "✅" in line:
                render = st.success
            else:
                render = st.info
            render(line)


if __name__ == "__main__":
    main()