import streamlit as st import requests import time import json import os import threading import re import urllib3 from datetime import datetime, timedelta, timezone, time as dt_time from collections import deque from dotenv import load_dotenv # --- 0. 基础配置 --- st.set_page_config(page_title="影城空场防空转监控", page_icon="💸", layout="wide") # 屏蔽 HTTPS 证书警告 (TMS 系统通常使用自签名证书) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) try: from streamlit_autorefresh import st_autorefresh except ImportError: st_autorefresh = None # --- 1. 配置与常量 --- load_dotenv() # Pushover 配置 PUSHOVER_TOKEN = os.getenv("PUSHOVER_TOKEN") PUSHOVER_USER = os.getenv("PUSHOVER_USER") # 微信配置 WX_APP_ID = os.getenv("WX_APP_ID") WX_APP_SECRET = os.getenv("WX_APP_SECRET") # 支持多个用户,逗号分隔 WX_USER_OPEN_IDS = os.getenv("WX_USER_OPEN_ID", "").split(',') WX_TEMPLATE_ID_IDLE = os.getenv("WX_TEMPLATE_ID_IDLE") WX_TEMPLATE_ID_DAILY = os.getenv("WX_TEMPLATE_ID_DAILY") # 影城系统配置 CINEMA_ID = os.getenv("CINEMA_ID") TMS_APP_SECRET = os.getenv("TMS_APP_SECRET") TMS_TICKET = os.getenv("TMS_TICKET") TMS_X_SESSION_ID = os.getenv("TMS_X_SESSION_ID") TMS_THEATER_ID = os.getenv("TMS_THEATER_ID") # 新增:影院ID HALL_ID_MAP = { "1": "79181753", "2": "87350725", "3": "93340931", "4": "98009245", "5": "02194530", "6": "07183751", "7": "11314566", "8": "15532561", "9": "20079450" } CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) ROOT_DIR = os.path.dirname(CURRENT_DIR) TOKEN_FILE = os.path.join(ROOT_DIR, 'token_data.json') # --- 2. 统计与工具类 --- class DailyStats: """每日运行数据统计""" def __init__(self): self.reset() self.yesterday_stats = { 'zero_sessions': 0, 'triggers': 0, 'notify_fails': 0, 'api_fails': 0, 'date': '未知' } def reset(self): self.zero_sessions = 0 # 0票总场次 self.triggers = 0 # 触发通知次数 self.notify_fails = 0 # 发送通知失败次数 self.api_fails = 0 # API获取失败次数 def snapshot_as_yesterday(self, date_str): self.yesterday_stats = { 'zero_sessions': self.zero_sessions, 'triggers': self.triggers, 'notify_fails': self.notify_fails, 'api_fails': self.api_fails, 'date': date_str } self.reset() # 全局统计实例 daily_stats = DailyStats() def get_beijing_now(): utc_now = datetime.now(timezone.utc) return utc_now.astimezone(timezone(timedelta(hours=8))).replace(tzinfo=None) def get_business_date(): now = get_beijing_now() if now.time() < dt_time(6, 0): return (now - timedelta(days=1)).strftime("%Y-%m-%d") return now.strftime("%Y-%m-%d") def parse_show_datetime(date_str, time_str): try: show_t = datetime.strptime(time_str, "%H:%M").time() base_date = datetime.strptime(date_str, "%Y-%m-%d") if show_t < dt_time(6, 0): base_date += timedelta(days=1) return datetime.combine(base_date.date(), show_t) except: return None def extract_hall_number_raw(hall_name): """仅提取数字ID,用于API映射""" match = re.search(r'(\d+)', str(hall_name)) return match.group(1) if match else str(hall_name) def format_hall_name(hall_name): """格式化影厅名称:【和成天下1号厅】 -> 1号厅""" match = re.search(r'(\d+)号', str(hall_name)) if match: return f"{match.group(1)}号厅" return str(hall_name) # --- 3. 微信推送模块 (含自动重试) --- class WeChatPusher: def __init__(self, app_id, app_secret): self.app_id = app_id self.app_secret = app_secret self.token = None self.token_expires_time = 0 def get_access_token(self, force_refresh=False): """获取并缓存微信 Access Token :param force_refresh: 是否强制刷新 Token """ if not force_refresh and self.token and datetime.now().timestamp() < self.token_expires_time: return self.token if not self.app_id or not self.app_secret: return None url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={self.app_id}&secret={self.app_secret}" try: resp = requests.get(url, timeout=10) data = resp.json() if 'access_token' in data: self.token = data['access_token'] # 提前 200 秒过期,防止临界点问题 self.token_expires_time = datetime.now().timestamp() + data['expires_in'] - 200 return self.token else: print(f"❌ 微信 Token 获取失败: {data}") except Exception as e: print(f"❌ 微信 API 请求异常: {e}") return None def send_template_message(self, user_ids, template_id, data_map): """发送模板消息给多个用户 (带 40001 错误重试机制)""" # 组装微信数据格式 formatted_data = {} for key, value in data_map.items(): formatted_data[key] = {"value": value, "color": "#173177"} # 清洗 user_ids valid_ids = [uid.strip() for uid in user_ids if uid and uid.strip()] if not valid_ids: return for user_id in valid_ids: # 每个用户最多尝试 2 次(1次正常,1次刷新 Token 后重试) for attempt in range(2): # 如果是重试(attempt > 0),强制刷新 Token token = self.get_access_token(force_refresh=(attempt > 0)) if not token: daily_stats.notify_fails += 1 break # Token 都拿不到,不用重试了 url = f"https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={token}" payload = { "touser": user_id, "template_id": template_id, "data": formatted_data } try: resp = requests.post(url, json=payload, timeout=10) result = resp.json() errcode = result.get("errcode") if errcode == 0: break # 成功 elif errcode in [40001, 40014, 42001]: print(f"🔄 微信 Token 失效 ({errcode}),正在刷新重试 [{user_id}]...") continue # 强制刷新 Token else: print(f"❌ 微信发送失败 [{user_id}]: {result}") daily_stats.notify_fails += 1 break # 其他错误,不重试 except Exception as e: print(f"❌ 微信发送异常 [{user_id}]: {e}") daily_stats.notify_fails += 1 break # --- 4. 通知管理系统 --- class NotificationManager: def __init__(self): self.wx_pusher = WeChatPusher(WX_APP_ID, WX_APP_SECRET) def send_pushover(self, title, message, priority=0): if not PUSHOVER_TOKEN or not PUSHOVER_USER: return url = "https://api.pushover.net/1/messages.json" data = { "token": PUSHOVER_TOKEN, "user": PUSHOVER_USER, "title": title, "message": message, "priority": priority } try: requests.post(url, data=data, timeout=10) except Exception as e: print(f"❌ Pushover 发送异常: {e}") daily_stats.notify_fails += 1 def send_idle_alert(self, hall_name, movie_name, show_time, count_info, remark_text=""): """发送空转告警 (Pushover + WeChat)""" daily_stats.triggers += 1 # 1. Pushover 内容 title = f"发现 {hall_name} 空场“空转”!" msg = ( f"{hall_name} {show_time}《{movie_name}》\n" f"无人购票但服务器有排程,未撤场,或许正在播放,请检查。\n" f"{count_info},{remark_text}" ) self.send_pushover(title, msg, priority=1) # 2. 微信内容 if WX_TEMPLATE_ID_IDLE: data_map = { "first": f"发现 {hall_name} 空场“空转”!\n{hall_name} {show_time}《{movie_name}》\n无人购票但服务器有排程,未撤场,或许正在播放,请检查。\n", "keyword1": hall_name, "keyword2": movie_name, "keyword3": show_time, "keyword4": count_info, "remark": f"\n{remark_text}" } self.wx_pusher.send_template_message(WX_USER_OPEN_IDS, WX_TEMPLATE_ID_IDLE, data_map) def send_daily_report(self, today_date_cn): """发送每日报告 (Pushover + WeChat)""" y_stats = daily_stats.yesterday_stats # 统计数据文本 stats_text = ( f"0票场次({y_stats['zero_sessions']}) " f"触发({y_stats['triggers']}) " f"s失败({y_stats['notify_fails']}) " f"g失败({y_stats['api_fails']})" ) # 1. Pushover title = "影城“空转”检查服务就绪" msg = ( f"{today_date_cn},今日排片数据已加载,开始智能检查。\n\n" f"昨日情况:\n" f"0票总场次:{y_stats['zero_sessions']}\n" f"触发通知次数:{y_stats['triggers']}\n" f"发送通知失败次数:{y_stats['notify_fails']}\n" f"API获取失败次数:{y_stats['api_fails']}\n" ) self.send_pushover(title, msg, priority=0) # 2. 微信 if WX_TEMPLATE_ID_DAILY: data_map = { "first": f"影城“空转”检查服务就绪\n{today_date_cn},今日排片数据已加载,开始智能检查。\n", "keyword1": "影城“空转”检查服务", "keyword2": today_date_cn, "keyword3": stats_text, "remark": "\n服务正常运行中。" } self.wx_pusher.send_template_message(WX_USER_OPEN_IDS, WX_TEMPLATE_ID_DAILY, data_map) notifier = NotificationManager() # --- 5. API 管理模块 --- class TicketAPIManager: def __init__(self, logger_func): self.logger = logger_func self.last_login_fail = 0 def load_token(self): if os.path.exists(TOKEN_FILE): try: with open(TOKEN_FILE, 'r', encoding='utf-8') as f: return json.load(f).get('token') except: pass return None def login(self): if time.time() - self.last_login_fail < 300: return None username = os.getenv("CINEMA_USERNAME") password = os.getenv("CINEMA_PASSWORD") res_code = os.getenv("CINEMA_RES_CODE") device_id = os.getenv("CINEMA_DEVICE_ID") if not all([username, password, res_code]): self.logger("❌ 票务系统环境变量缺失") daily_stats.api_fails += 1 return None try: session = requests.Session() login_url = 'https://app.bi.piao51.cn/cinema-app/credential/login.action' login_data = {'username': username, 'password': password, 'type': '1', 'resCode': res_code, 'deviceid': device_id, 'dtype': 'ios'} session.post(login_url, data=login_data, timeout=15) resp = session.get('https://app.bi.piao51.cn/cinema-app/security/logined.action', timeout=10) info = resp.json() if info.get("success") and info.get("data", {}).get("token"): with open(TOKEN_FILE, 'w', encoding='utf-8') as f: json.dump(info['data'], f) return info['data']['token'] except Exception as e: self.last_login_fail = time.time() self.logger(f"❌ 票务登录失败: {e}") daily_stats.api_fails += 1 return None def fetch_schedule(self, date_str): token = self.load_token() if not token: token = self.login() if not token: daily_stats.api_fails += 1 return None url = 'https://cawapi.yinghezhong.com/showInfo/getHallShowInfo' params = {'showDate': date_str, 'token': token, '_': int(time.time() * 1000)} headers = {'User-Agent': 'Mozilla/5.0'} try: res = requests.get(url, params=params, headers=headers, timeout=10) data = res.json() if data.get('code') == 1: return data.get('data', []) elif data.get('code') == 500: token = self.login() if token: params['token'] = token res = requests.get(url, params=params, headers=headers, timeout=10) return res.json().get('data', []) except Exception as e: self.logger(f"⚠️ 票务API异常: {e}") daily_stats.api_fails += 1 return None class TMSAPIManager: def __init__(self, logger_func): self.logger = logger_func self.auth_token = None self.last_token_time = 0 def get_token(self): if self.auth_token and (time.time() - self.last_token_time < 1800): return self.auth_token if not all([TMS_APP_SECRET, TMS_TICKET]): return None # 获取 OA 系统 Token url = f'http://oa.hengdianfilm.com:7080/cinema-api/admin/generateToken?token=hd&murl=?token=hd&murl=ticket={TMS_TICKET}' headers = {'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0'} payload = {'appId': 'hd', 'appSecret': TMS_APP_SECRET, 'timeStamp': int(time.time() * 1000)} try: res = requests.post(url, json=payload, headers=headers, timeout=10) data = res.json() if data.get('error_code') == '0000': self.auth_token = data['param'] self.last_token_time = time.time() return self.auth_token except: pass daily_stats.api_fails += 1 return None def fetch_hall_schedule_list(self, hall_number): token = self.get_token() if not token: return None tms_hall_id = HALL_ID_MAP.get(str(hall_number)) if not tms_hall_id: return None url = 'https://tms.hengdianfilm.com/cinema-api/cinema/schedule/server/list' session_id = TMS_X_SESSION_ID or '' cookies = {'JSESSIONID': session_id} headers = { 'accept': 'application/json, text/javascript, */*; q=0.01', 'content-type': 'application/json; charset=UTF-8', 'origin': 'https://tms.hengdianfilm.com', 'referer': f'https://tms.hengdianfilm.com/hd/index?CinemaMonitorEdit&HALL_ID%3D{tms_hall_id}', 'token': token, 'user-agent': 'Mozilla/5.0', 'x-requested-with': 'XMLHttpRequest', 'x-sessionid': session_id } params = {'token': 'hd', 'murl': 'CinemaMonitor'} json_data = { 'THEATER_ID': 38205954, 'STATE': 0, 'HALL_ID': tms_hall_id, 'START_TIME': int(time.time() * 1000), 'PAGE_CAPACITY': 20, 'PAGE_INDEX': 1, } try: res = requests.post(url, params=params, cookies=cookies, headers=headers, json=json_data, timeout=10, verify=False) data = res.json() if data.get("RSPCD") == "000000": return data.get("BODY", {}).get("LIST", []) else: return None except Exception as e: self.logger(f"⚠️ TMS 请求异常: {e}") daily_stats.api_fails += 1 return None def trigger_schedule_refresh(self): """触发 TMS 排期刷新""" token = self.get_token() if not token or not TMS_THEATER_ID: return try: # 动态生成时间戳 now_tm = time.localtime() today_midnight = time.mktime((now_tm.tm_year, now_tm.tm_mon, now_tm.tm_mday, 0, 0, 0, 0, 0, 0)) start_date = int(today_midnight * 1000) end_date = start_date + (24 * 60 * 60 * 1000) url = 'https://tms.hengdianfilm.com/cinema-api/tms/cmd/show/schedule' session_id = TMS_X_SESSION_ID or '' headers = { 'accept': 'application/json, text/javascript, */*; q=0.01', 'content-type': 'application/json; charset=UTF-8', 'origin': 'https://tms.hengdianfilm.com', 'referer': f'https://tms.hengdianfilm.com/hd/index?Scheduling&THEATER_ID={TMS_THEATER_ID}&DATE={start_date}&PAGE_CAPACITY=20&PAGE_INDEX=1', 'token': token, # 这里的 token 由 get_token() 动态获取 'user-agent': 'Mozilla/5.0', 'x-requested-with': 'XMLHttpRequest', 'x-sessionid': session_id } params = { 'token': 'hd', 'murl': 'Scheduling', } cookies = {'JSESSIONID': session_id} json_data = { 'THEATER_LIST': [ { 'THEATER_ID': int(TMS_THEATER_ID), 'START_DATE': start_date, 'END_DATE': end_date, }, ], } # 发送请求,非阻塞,忽略证书错误 requests.post(url, params=params, headers=headers, cookies=cookies, json=json_data, timeout=5, verify=False) self.logger("🔄 TMS 排期刷新指令已发送") except Exception as e: # 刷新失败不影响主程序 self.logger(f"⚠️ TMS 排期刷新失败: {e}") # --- 6. 智能监控主逻辑 --- class PlaybackMonitor: def __init__(self): self.logs = deque(maxlen=50) self.status_text = "初始化中..." self.next_wakeup_str = "--:--:--" self.active_monitors = [] self.cleared_sessions = set() self.processed_checks = set() self.ticket_api = TicketAPIManager(self.log) self.tms_api = TMSAPIManager(self.log) self.current_business_date = None self.daily_schedule_cache = None self.thread = threading.Thread(target=self._run_loop, daemon=True) self.thread.start() def log(self, msg): ts = get_beijing_now().strftime("%H:%M:%S") entry = f"[{ts}] {msg}" self.logs.appendleft(entry) print(entry) def _get_target_check_points(self, schedule_list, business_date): check_points = [] zero_count = 0 for item in schedule_list: sold = int(item.get('soldTicketNum', 0)) if sold == 0: zero_count += 1 start_str = item.get('showStartTime') if not start_str: continue show_dt = parse_show_datetime(business_date, start_str) if not show_dt: continue for i in range(3): check_time = show_dt + timedelta(minutes=i * 5) check_points.append({ 'time': check_time, 'type': 'CHECK', 'check_index': i, 'data': item }) daily_stats.zero_sessions = zero_count check_points.sort(key=lambda x: x['time']) return check_points def _run_loop(self): self.log("🚀 智能防空转监控已启动 (WeChat+Pushover)") while True: try: now = get_beijing_now() biz_date = get_business_date() # --- 初始化与日期变更 --- need_refresh = False if self.daily_schedule_cache is None: need_refresh = True self.log("🆕 初始化:获取全天排片...") self.current_business_date = biz_date elif self.current_business_date != biz_date: if now.time() >= dt_time(9, 0): need_refresh = True daily_stats.snapshot_as_yesterday(self.current_business_date) self.cleared_sessions.clear() self.processed_checks.clear() self.log(f"🌞 新营业日 ({biz_date}):刷新排片...") if need_refresh: schedule = self.ticket_api.fetch_schedule(biz_date) if schedule: self.daily_schedule_cache = schedule self.current_business_date = biz_date self.log(f"✅ 排片已更新,共 {len(schedule)} 场。") try: d_obj = datetime.strptime(biz_date, "%Y-%m-%d") date_cn = f"{d_obj.year}年{d_obj.month}月{d_obj.day}日" except: date_cn = biz_date notifier.send_daily_report(date_cn) else: self.log("⚠️ 获取排片失败,5分钟后重试") daily_stats.api_fails += 1 time.sleep(300) continue # --- 任务计算 --- check_points = self._get_target_check_points(self.daily_schedule_cache, biz_date) next_target = None for cp in check_points: hall_id = extract_hall_number_raw(cp['data']['hallName']) start_str = cp['data']['showStartTime'] check_idx = cp['check_index'] dedup_key = f"{biz_date}_{hall_id}_{start_str}_{check_idx}" if dedup_key in self.processed_checks: continue if cp['time'] > (now - timedelta(seconds=30)): next_target = cp break if next_target: target_time = next_target['time'] # 09:00 拦截器 nine_am_today = datetime.combine(now.date(), dt_time(9, 0)) force_wake_for_daily_reset = False if now < nine_am_today and target_time > nine_am_today: target_time = nine_am_today force_wake_for_daily_reset = True sleep_seconds = max(0, (target_time - now).total_seconds()) self.status_text = f"💤 休眠中" self.next_wakeup_str = target_time.strftime('%H:%M:%S') if force_wake_for_daily_reset: self.active_monitors = ["等待 09:00 日报刷新..."] else: raw_hall = next_target['data']['hallName'] clean_hall = format_hall_name(raw_hall) self.active_monitors = [ f"下个任务: {clean_hall} {next_target['data']['showStartTime']} (第{next_target['check_index'] + 1}次检查)"] self.log(f"💤 智能休眠 {int(sleep_seconds)}秒,将在 {self.next_wakeup_str} 唤醒...") time.sleep(sleep_seconds) if force_wake_for_daily_reset: continue # --- 正常唤醒检查 --- self.status_text = "🔥 正在执行检查..." self.log("⏰ 唤醒!正在同步最新票务数据...") latest_schedule = self.ticket_api.fetch_schedule(biz_date) if latest_schedule: self.daily_schedule_cache = latest_schedule else: self.log("⚠️ 同步排片失败,使用旧数据") daily_stats.api_fails += 1 latest_schedule = self.daily_schedule_cache check_now = get_beijing_now() current_targets = [] latest_check_points = self._get_target_check_points(latest_schedule, biz_date) for cp in latest_check_points: time_diff = abs((cp['time'] - check_now).total_seconds()) if time_diff < 120: current_targets.append(cp) if current_targets: self._process_targets(current_targets, biz_date) time.sleep(5) else: self.status_text = "🌙 今日监控结束" self.active_monitors = [] tmr_9am = datetime.combine(datetime.strptime(biz_date, "%Y-%m-%d").date() + timedelta(days=1), dt_time(9, 0)) seconds_to_tmr = (tmr_9am - now).total_seconds() if seconds_to_tmr > 3600: self.log("暂无目标,休眠 1 小时...") time.sleep(3600) else: self.log(f"休眠至明日 09:00...") time.sleep(seconds_to_tmr) except Exception as e: self.log(f"❌ 主循环异常: {e}") time.sleep(60) def _process_targets(self, targets, biz_date): halls_to_check = {} for t in targets: hall_id = extract_hall_number_raw(t['data']['hallName']) start_str = t['data']['showStartTime'] check_idx = t['check_index'] dedup_key = f"{biz_date}_{hall_id}_{start_str}_{check_idx}" if dedup_key in self.processed_checks: continue if hall_id not in halls_to_check: halls_to_check[hall_id] = [] halls_to_check[hall_id].append(t) if not halls_to_check: return self.log(f"🔍 核查 {len(halls_to_check)} 个影厅 TMS 状态...") display_results = [] for hall_id, target_list in halls_to_check.items(): all_cleared = True for target in target_list: start_str = target['data'].get('showStartTime') key = f"{biz_date}_{hall_id}_{start_str}" if key not in self.cleared_sessions: all_cleared = False break if all_cleared: for target in target_list: dedup_key = f"{biz_date}_{hall_id}_{target['data']['showStartTime']}_{target['check_index']}" self.processed_checks.add(dedup_key) clean_name = format_hall_name(target_list[0]['data']['hallName']) display_results.append(f"{clean_name} ✅ [已跳过 (已确认撤场)]") continue tms_list = self.tms_api.fetch_hall_schedule_list(hall_id) for target in target_list: session = target['data'] check_idx = target['check_index'] hall_name_raw = session.get('hallName') movie_name = session.get('movieName') ticket_start_str = session.get('showStartTime') ticket_start_dt = parse_show_datetime(biz_date, ticket_start_str) unique_key = f"{biz_date}_{hall_id}_{ticket_start_str}" dedup_key = f"{biz_date}_{hall_id}_{ticket_start_str}_{check_idx}" if dedup_key in self.processed_checks: continue if unique_key in self.cleared_sessions: self.processed_checks.add(dedup_key) continue hall_name_clean = format_hall_name(hall_name_raw) status_desc = f"{hall_name_clean} {ticket_start_str} (第{check_idx + 1}/3次)" if tms_list is not None: match_found = False for tms_item in tms_list: t_start_str = tms_item.get('START_TIME') if not t_start_str: continue t_start_dt = parse_show_datetime(biz_date, t_start_str) if not t_start_dt: continue if abs((t_start_dt - ticket_start_dt).total_seconds()) < 1800: match_found = True break if match_found: status_desc += " ⚠️ [空转! TMS未撤]" self.log(f"🚨 发现空转: {hall_name_clean}《{movie_name}》") # 调用新的告警接口 count_info = f"第 {check_idx + 1}/3 次检查发现" remark = "仅停止播放未撤排程下次检查依然会触发通知。" notifier.send_idle_alert(hall_name_clean, movie_name, ticket_start_str, count_info, remark) # --- NEW: 第一次发现后,触发TMS刷新 --- if check_idx == 0: self.tms_api.trigger_schedule_refresh() else: status_desc += " ✅ [正常 (TMS无排期)]" self.cleared_sessions.add(unique_key) else: status_desc += " ❓ [TMS查询失败]" daily_stats.api_fails += 1 self.processed_checks.add(dedup_key) display_results.append(status_desc) self.active_monitors = display_results # --- 7. Streamlit 前端 --- @st.cache_resource def get_monitor(): return PlaybackMonitor() def main(): monitor = get_monitor() if st_autorefresh: st_autorefresh(interval=30 * 1000, key="pb_refresh") st.title("💸 影厅空场防空转监控") remaining_zero_count = 0 remaining_total_count = 0 if monitor.daily_schedule_cache: now = get_beijing_now() biz_date = monitor.current_business_date or get_business_date() for item in monitor.daily_schedule_cache: start_str = item.get('showStartTime') if not start_str: continue show_dt = parse_show_datetime(biz_date, start_str) if show_dt and show_dt > now: remaining_total_count += 1 if int(item.get('soldTicketNum', 0)) == 0: remaining_zero_count += 1 c1, c2, c3 = st.columns(3) with c1: st.metric("当前状态", monitor.status_text) with c2: st.metric("下次唤醒", monitor.next_wakeup_str) with c3: st.metric("剩余 0 票场次 / 剩余总场次", f"{remaining_zero_count} / {remaining_total_count}") st.divider() col_logs, col_mon = st.columns([3, 2]) with col_logs: st.subheader("📜 运行日志") st.text_area("Logs", "\n".join(list(monitor.logs)), height=450, disabled=True) with col_mon: st.subheader("🕵️ 最近一次检查结果") if monitor.active_monitors: for m in monitor.active_monitors: if "⚠️" in m: st.error(m) elif "✅" in m: st.success(m) else: st.info(m) else: st.caption("等待下一次唤醒...") if __name__ == "__main__": main()