# Web page header captured with the file (not part of the program):
#   hengdian / pages/monitor_playback.py
#   Ethscriptions's picture
#   Update pages/monitor_playback.py
#   b6ec259 verified
import streamlit as st
import requests
import time
import json
import os
import threading
import re
import urllib3
from datetime import datetime, timedelta, timezone, time as dt_time
from collections import deque
from dotenv import load_dotenv
# --- 0. Basic setup ---
# Page-level Streamlit configuration (title, icon, wide layout).
st.set_page_config(page_title="影城空场防空转监控", page_icon="💸", layout="wide")
# Silence HTTPS certificate warnings (author's note: TMS systems usually use
# self-signed certificates; requests to TMS below are sent with verify=False).
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# streamlit_autorefresh is an optional dependency: when it is not installed,
# st_autorefresh is set to None and main() skips the auto-refresh call.
try:
    from streamlit_autorefresh import st_autorefresh
except ImportError:
    st_autorefresh = None
# --- 1. Configuration and constants ---
# Load variables from a .env file (if any) before the os.getenv() calls below.
load_dotenv()
# Pushover credentials
PUSHOVER_TOKEN = os.getenv("PUSHOVER_TOKEN")
PUSHOVER_USER = os.getenv("PUSHOVER_USER")
# WeChat credentials and template IDs
WX_APP_ID = os.getenv("WX_APP_ID")
WX_APP_SECRET = os.getenv("WX_APP_SECRET")
# Multiple recipients are supported, separated by commas.
# An unset variable yields [""]; blank entries are filtered out at send time.
WX_USER_OPEN_IDS = os.getenv("WX_USER_OPEN_ID", "").split(',')
WX_TEMPLATE_ID_IDLE = os.getenv("WX_TEMPLATE_ID_IDLE")
WX_TEMPLATE_ID_DAILY = os.getenv("WX_TEMPLATE_ID_DAILY")
# Cinema / TMS system settings
CINEMA_ID = os.getenv("CINEMA_ID")
TMS_APP_SECRET = os.getenv("TMS_APP_SECRET")
TMS_TICKET = os.getenv("TMS_TICKET")
TMS_X_SESSION_ID = os.getenv("TMS_X_SESSION_ID")
TMS_THEATER_ID = os.getenv("TMS_THEATER_ID")  # Added: theater ID
# Maps the hall number extracted from a hall name to the HALL_ID sent to TMS.
HALL_ID_MAP = {
    "1": "79181753", "2": "87350725", "3": "93340931",
    "4": "98009245", "5": "02194530", "6": "07183751",
    "7": "11314566", "8": "15532561", "9": "20079450"
}
# token_data.json lives in the parent directory of this file's directory.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(CURRENT_DIR)
TOKEN_FILE = os.path.join(ROOT_DIR, 'token_data.json')
# --- 2. 统计与工具类 ---
class DailyStats:
    """Counters for the current business day plus a frozen copy of the previous day.

    Attributes:
        zero_sessions: number of sessions with zero tickets sold.
        triggers: number of idle alerts triggered.
        notify_fails: number of failed notification sends.
        api_fails: number of failed API fetches.
        yesterday_stats: dict holding the four counters above as captured by
            the last snapshot, plus a 'date' entry.
    """

    # Names of the per-day counters, in the order they appear in snapshots.
    _COUNTERS = ('zero_sessions', 'triggers', 'notify_fails', 'api_fails')

    def __init__(self):
        self.reset()
        # Until the first snapshot is taken, yesterday is all zeros with a
        # placeholder date.
        placeholder = dict.fromkeys(self._COUNTERS, 0)
        placeholder['date'] = '未知'
        self.yesterday_stats = placeholder

    def reset(self):
        """Set every per-day counter back to zero."""
        for counter_name in self._COUNTERS:
            setattr(self, counter_name, 0)

    def snapshot_as_yesterday(self, date_str):
        """Store the current counters as yesterday's stats, then reset them.

        Args:
            date_str: value recorded under the 'date' key of the snapshot.
        """
        frozen = {counter_name: getattr(self, counter_name) for counter_name in self._COUNTERS}
        frozen['date'] = date_str
        self.yesterday_stats = frozen
        self.reset()
# Module-wide statistics instance; the notifier, API managers and monitor
# below all update its counters directly.
daily_stats = DailyStats()
def get_beijing_now():
    """Return the current UTC+8 wall-clock time as a naive datetime.

    The timezone info is stripped so the result can be compared with the
    naive datetimes produced elsewhere in this module.
    """
    utc_plus_8 = timezone(timedelta(hours=8))
    aware_now = datetime.now(tz=utc_plus_8)
    return aware_now.replace(tzinfo=None)
def get_business_date():
    """Return the current business date as a 'YYYY-MM-DD' string.

    The business day rolls over at 06:00 (UTC+8): any time before 06:00 is
    attributed to the previous calendar day.
    """
    current = get_beijing_now()
    before_rollover = current.time() < dt_time(6, 0)
    business_day = current - timedelta(days=1) if before_rollover else current
    return business_day.strftime("%Y-%m-%d")
def parse_show_datetime(date_str, time_str):
    """Combine a business date and an 'HH:MM' show time into a naive datetime.

    Show times earlier than 06:00 belong to the small hours after midnight of
    the business day, so they are placed on the following calendar day.

    Args:
        date_str: business date formatted as 'YYYY-MM-DD'.
        time_str: show start time formatted as 'HH:MM'.

    Returns:
        The resulting datetime, or None when either argument is missing, is
        not a string, or does not match the expected format.
    """
    try:
        show_t = datetime.strptime(time_str, "%H:%M").time()
        base_date = datetime.strptime(date_str, "%Y-%m-%d")
        if show_t < dt_time(6, 0):
            base_date += timedelta(days=1)
        return datetime.combine(base_date.date(), show_t)
    except (ValueError, TypeError, OverflowError):
        # ValueError: wrong format; TypeError: non-string input (e.g. None or
        # a number); OverflowError: date arithmetic past datetime.max.
        # Only these are treated as "unparseable" -- a bare except would also
        # swallow KeyboardInterrupt / SystemExit.
        return None
def extract_hall_number_raw(hall_name):
    """Return the first run of digits in the hall name, used for API ID mapping.

    When the name contains no digits, the name itself (as a string) is returned.
    """
    text = str(hall_name)
    digit_runs = re.findall(r'\d+', text)
    return digit_runs[0] if digit_runs else text
def format_hall_name(hall_name):
    """Shorten a hall name for display, e.g. '【和成天下1号厅】' -> '1号厅'.

    Names that do not contain '<digits>号' are returned unchanged (as a string).
    """
    text = str(hall_name)
    found = re.search(r'(\d+)号', text)
    if found is None:
        return text
    return found.group(1) + "号厅"
# --- 3. 微信推送模块 (含自动重试) ---
class WeChatPusher:
    """Sends WeChat template messages, caching the access token between calls."""

    # errcode values that this class treats as "access token invalid or
    # expired" and answers with one forced token refresh.
    _TOKEN_ERRCODES = (40001, 40014, 42001)

    def __init__(self, app_id, app_secret):
        self.app_id = app_id
        self.app_secret = app_secret
        self.token = None
        # Epoch seconds after which the cached token is no longer used.
        self.token_expires_time = 0

    def get_access_token(self, force_refresh=False):
        """Return a WeChat access token, using the cached one when still valid.

        :param force_refresh: when True, ignore the cache and request a new token.
        :return: the token string, or None when credentials are missing or the
            request fails.
        """
        if not force_refresh and self.token and time.time() < self.token_expires_time:
            return self.token
        if not self.app_id or not self.app_secret:
            return None
        try:
            resp = requests.get(
                "https://api.weixin.qq.com/cgi-bin/token",
                params={
                    "grant_type": "client_credential",
                    "appid": self.app_id,
                    "secret": self.app_secret,
                },
                timeout=10,
            )
            data = resp.json()
            if 'access_token' in data:
                self.token = data['access_token']
                # Treat the token as expired 200 seconds early to avoid using
                # it right at the boundary. Fall back to 7200 s if the
                # response carries no expires_in.
                self.token_expires_time = time.time() + data.get('expires_in', 7200) - 200
                return self.token
            print(f"❌ 微信 Token 获取失败: {data}")
        except Exception as e:
            print(f"❌ 微信 API 请求异常: {e}")
        return None

    def send_template_message(self, user_ids, template_id, data_map):
        """Send one template message to every non-blank user id.

        Each failed delivery (no token, request error, API error, or token
        still rejected after a refresh) adds one to daily_stats.notify_fails.

        :param user_ids: iterable of OpenID strings; blank entries are skipped.
        :param template_id: WeChat template id.
        :param data_map: mapping of template field name -> text value.
        """
        # WeChat expects each field as {"value": ..., "color": ...}.
        formatted_data = {
            key: {"value": value, "color": "#173177"} for key, value in data_map.items()
        }
        valid_ids = [uid.strip() for uid in user_ids if uid and uid.strip()]
        if not valid_ids:
            return
        for user_id in valid_ids:
            if not self._send_to_user(user_id, template_id, formatted_data):
                daily_stats.notify_fails += 1

    def _send_to_user(self, user_id, template_id, formatted_data):
        """Deliver to one user with at most one token-refresh retry; return success."""
        payload = {
            "touser": user_id,
            "template_id": template_id,
            "data": formatted_data
        }
        # Two attempts at most: a normal one, then one with a freshly fetched token.
        for attempt in range(2):
            token = self.get_access_token(force_refresh=(attempt > 0))
            if not token:
                return False  # no token available, retrying cannot help
            url = f"https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={token}"
            try:
                resp = requests.post(url, json=payload, timeout=10)
                result = resp.json()
                errcode = result.get("errcode")
            except Exception as e:
                print(f"❌ 微信发送异常 [{user_id}]: {e}")
                return False
            if errcode == 0:
                return True
            if errcode in self._TOKEN_ERRCODES:
                print(f"🔄 微信 Token 失效 ({errcode}),正在刷新重试 [{user_id}]...")
                continue
            print(f"❌ 微信发送失败 [{user_id}]: {result}")
            return False
        # Both attempts were rejected for token reasons. Previously this case
        # ended silently and was not counted as a failure.
        print(f"❌ 微信发送失败 [{user_id}]: 刷新 Token 后仍然失效")
        return False
# --- 4. 通知管理系统 ---
class NotificationManager:
    """Fans notifications out to Pushover and to WeChat template messages."""

    def __init__(self):
        self.wx_pusher = WeChatPusher(WX_APP_ID, WX_APP_SECRET)

    def send_pushover(self, title, message, priority=0):
        """Send one Pushover message; does nothing when credentials are unset.

        A request error or an HTTP error status adds one to
        daily_stats.notify_fails.

        Args:
            title: message title.
            message: message body.
            priority: Pushover priority value passed through unchanged.
        """
        if not PUSHOVER_TOKEN or not PUSHOVER_USER:
            return
        url = "https://api.pushover.net/1/messages.json"
        data = {
            "token": PUSHOVER_TOKEN,
            "user": PUSHOVER_USER,
            "title": title,
            "message": message,
            "priority": priority
        }
        try:
            resp = requests.post(url, data=data, timeout=10)
            # A 4xx/5xx answer means the message was not accepted; without
            # this check such rejections were never counted as failures.
            resp.raise_for_status()
        except Exception as e:
            print(f"❌ Pushover 发送异常: {e}")
            daily_stats.notify_fails += 1

    def send_idle_alert(self, hall_name, movie_name, show_time, count_info, remark_text=""):
        """Send an idle-playback alert (Pushover + WeChat).

        Increments daily_stats.triggers once per call.

        Args:
            hall_name: display name of the hall.
            movie_name: film title.
            show_time: show start time text.
            count_info: text describing which check found the problem.
            remark_text: optional trailing remark.
        """
        daily_stats.triggers += 1
        # 1. Pushover content
        title = f"发现 {hall_name} 空场“空转”!"
        msg = (
            f"{hall_name} {show_time} 《{movie_name}》\n"
            f"无人购票但服务器有排程,未撤场,或许正在播放,请检查。\n"
            f"{count_info}{remark_text}"
        )
        self.send_pushover(title, msg, priority=1)
        # 2. WeChat content (only when an idle-alert template is configured)
        if WX_TEMPLATE_ID_IDLE:
            data_map = {
                "first": f"发现 {hall_name} 空场“空转”!\n{hall_name} {show_time} 《{movie_name}》\n无人购票但服务器有排程,未撤场,或许正在播放,请检查。\n",
                "keyword1": hall_name,
                "keyword2": movie_name,
                "keyword3": show_time,
                "keyword4": count_info,
                "remark": f"\n{remark_text}"
            }
            self.wx_pusher.send_template_message(WX_USER_OPEN_IDS, WX_TEMPLATE_ID_IDLE, data_map)

    def send_daily_report(self, today_date_cn):
        """Send the daily "service ready" report (Pushover + WeChat).

        The figures reported are those stored in daily_stats.yesterday_stats.

        Args:
            today_date_cn: today's date as display text.
        """
        y_stats = daily_stats.yesterday_stats
        # Compact one-line statistics used in the WeChat template.
        stats_text = (
            f"0票场次({y_stats['zero_sessions']}) "
            f"触发({y_stats['triggers']}) "
            f"s失败({y_stats['notify_fails']}) "
            f"g失败({y_stats['api_fails']})"
        )
        # 1. Pushover
        title = "影城“空转”检查服务就绪"
        msg = (
            f"{today_date_cn},今日排片数据已加载,开始智能检查。\n\n"
            f"昨日情况:\n"
            f"0票总场次:{y_stats['zero_sessions']}\n"
            f"触发通知次数:{y_stats['triggers']}\n"
            f"发送通知失败次数:{y_stats['notify_fails']}\n"
            f"API获取失败次数:{y_stats['api_fails']}\n"
        )
        self.send_pushover(title, msg, priority=0)
        # 2. WeChat (only when a daily-report template is configured)
        if WX_TEMPLATE_ID_DAILY:
            data_map = {
                "first": f"影城“空转”检查服务就绪\n{today_date_cn},今日排片数据已加载,开始智能检查。\n",
                "keyword1": "影城“空转”检查服务",
                "keyword2": today_date_cn,
                "keyword3": stats_text,
                "remark": "\n服务正常运行中。"
            }
            self.wx_pusher.send_template_message(WX_USER_OPEN_IDS, WX_TEMPLATE_ID_DAILY, data_map)
# Module-wide notifier instance used by the monitor below.
notifier = NotificationManager()
# --- 5. API 管理模块 ---
class TicketAPIManager:
    """Client for the ticketing system: token handling and schedule fetch."""

    def __init__(self, logger_func):
        """
        Args:
            logger_func: callable taking one message string.
        """
        self.logger = logger_func
        # Epoch seconds of the last failed login; used for a 300 s cooldown.
        self.last_login_fail = 0

    def load_token(self):
        """Return the token stored in TOKEN_FILE, or None when unavailable."""
        try:
            with open(TOKEN_FILE, 'r', encoding='utf-8') as f:
                return json.load(f).get('token')
        except FileNotFoundError:
            # No token saved yet: the caller falls back to login().
            return None
        except (OSError, ValueError, AttributeError) as e:
            # Unreadable file, invalid JSON/encoding, or JSON that is not an object.
            self.logger(f"⚠️ Token 文件读取失败: {e}")
            return None

    def login(self):
        """Log in to the ticketing system and persist the returned token data.

        Returns:
            The new token, or None when still in the cooldown after a failed
            login, when required environment variables are missing, or when
            the login fails.
        """
        if time.time() - self.last_login_fail < 300:
            return None
        username = os.getenv("CINEMA_USERNAME")
        password = os.getenv("CINEMA_PASSWORD")
        res_code = os.getenv("CINEMA_RES_CODE")
        device_id = os.getenv("CINEMA_DEVICE_ID")
        if not all([username, password, res_code]):
            self.logger("❌ 票务系统环境变量缺失")
            daily_stats.api_fails += 1
            return None
        try:
            # The context manager closes the session's connections afterwards.
            with requests.Session() as session:
                login_url = 'https://app.bi.piao51.cn/cinema-app/credential/login.action'
                login_data = {'username': username, 'password': password, 'type': '1', 'resCode': res_code,
                              'deviceid': device_id, 'dtype': 'ios'}
                session.post(login_url, data=login_data, timeout=15)
                resp = session.get('https://app.bi.piao51.cn/cinema-app/security/logined.action', timeout=10)
                info = resp.json()
        except Exception as e:
            self.last_login_fail = time.time()
            self.logger(f"❌ 票务登录失败: {e}")
            daily_stats.api_fails += 1
            return None

        token_data = info.get("data") or {}
        token = token_data.get("token") if info.get("success") else None
        if not token:
            # A rejected login used to pass silently and without cooldown;
            # handle it like any other login failure.
            self.last_login_fail = time.time()
            self.logger("❌ 票务登录失败: 登录响应中没有有效 Token")
            daily_stats.api_fails += 1
            return None
        try:
            with open(TOKEN_FILE, 'w', encoding='utf-8') as f:
                json.dump(token_data, f)
        except OSError as e:
            # The token is still usable for this run even if it cannot be saved.
            self.logger(f"⚠️ Token 文件写入失败: {e}")
        return token

    def fetch_schedule(self, date_str):
        """Fetch the hall schedule for one date.

        When the API answers with code 500, one re-login and retry is made.

        Args:
            date_str: date passed to the API as 'showDate'.

        Returns:
            The 'data' payload of a code-1 response, or None on any failure.
        """
        token = self.load_token() or self.login()
        if not token:
            daily_stats.api_fails += 1
            return None
        url = 'https://cawapi.yinghezhong.com/showInfo/getHallShowInfo'
        params = {'showDate': date_str, 'token': token, '_': int(time.time() * 1000)}
        headers = {'User-Agent': 'Mozilla/5.0'}
        try:
            res = requests.get(url, params=params, headers=headers, timeout=10)
            data = res.json()
            if data.get('code') == 500:
                # The original code re-logs in on 500, i.e. treats it as a
                # stale token.
                token = self.login()
                if not token:
                    return None
                params['token'] = token
                res = requests.get(url, params=params, headers=headers, timeout=10)
                data = res.json()
            if data.get('code') == 1:
                return data.get('data', [])
            # Any other code (including a failed retry) is reported instead
            # of being dropped silently.
            self.logger(f"⚠️ 票务API返回异常状态: code={data.get('code')}")
        except Exception as e:
            self.logger(f"⚠️ 票务API异常: {e}")
            daily_stats.api_fails += 1
        return None
class TMSAPIManager:
    """Client for the TMS: auth token, per-hall schedule list, refresh command."""

    # Theater id that was previously hard-coded in the schedule-list request;
    # kept as a fallback when TMS_THEATER_ID is unset or not numeric.
    _DEFAULT_THEATER_ID = 38205954

    def __init__(self, logger_func):
        """
        Args:
            logger_func: callable taking one message string.
        """
        self.logger = logger_func
        self.auth_token = None
        # Epoch seconds at which auth_token was obtained.
        self.last_token_time = 0

    def _theater_id(self):
        """Return the configured theater id as int, or the legacy default."""
        try:
            return int(TMS_THEATER_ID)
        except (TypeError, ValueError):
            return self._DEFAULT_THEATER_ID

    def get_token(self):
        """Return an auth token, reusing the cached one for up to 1800 seconds.

        Returns:
            The token, or None when configuration is missing or the request
            fails (a failed request adds one to daily_stats.api_fails).
        """
        if self.auth_token and (time.time() - self.last_token_time < 1800):
            return self.auth_token
        if not all([TMS_APP_SECRET, TMS_TICKET]):
            return None
        # Obtain the token from the OA system.
        # NOTE(review): the query string repeats 'token=hd&murl=' -- kept
        # byte-for-byte; confirm against the OA API before changing it.
        url = f'http://oa.hengdianfilm.com:7080/cinema-api/admin/generateToken?token=hd&murl=?token=hd&murl=ticket={TMS_TICKET}'
        headers = {'Content-Type': 'application/json', 'User-Agent': 'Mozilla/5.0'}
        payload = {'appId': 'hd', 'appSecret': TMS_APP_SECRET, 'timeStamp': int(time.time() * 1000)}
        try:
            res = requests.post(url, json=payload, headers=headers, timeout=10)
            data = res.json()
            if data.get('error_code') == '0000':
                self.auth_token = data['param']
                self.last_token_time = time.time()
                return self.auth_token
            self.logger(f"⚠️ TMS Token 获取失败: error_code={data.get('error_code')}")
        except Exception as e:
            # Previously swallowed silently; log so the cause is visible.
            self.logger(f"⚠️ TMS Token 请求异常: {e}")
        daily_stats.api_fails += 1
        return None

    def fetch_hall_schedule_list(self, hall_number):
        """Fetch the TMS schedule entries of one hall, starting from now.

        Args:
            hall_number: hall number; looked up in HALL_ID_MAP as a string.

        Returns:
            The list under BODY.LIST of a successful response, or None when
            no token is available, the hall is unknown, the response is not
            successful, or the request fails.
        """
        token = self.get_token()
        if not token:
            return None
        tms_hall_id = HALL_ID_MAP.get(str(hall_number))
        if not tms_hall_id:
            return None
        url = 'https://tms.hengdianfilm.com/cinema-api/cinema/schedule/server/list'
        session_id = TMS_X_SESSION_ID or ''
        cookies = {'JSESSIONID': session_id}
        headers = {
            'accept': 'application/json, text/javascript, */*; q=0.01',
            'content-type': 'application/json; charset=UTF-8',
            'origin': 'https://tms.hengdianfilm.com',
            'referer': f'https://tms.hengdianfilm.com/hd/index?CinemaMonitorEdit&HALL_ID%3D{tms_hall_id}',
            'token': token,
            'user-agent': 'Mozilla/5.0',
            'x-requested-with': 'XMLHttpRequest',
            'x-sessionid': session_id
        }
        params = {'token': 'hd', 'murl': 'CinemaMonitor'}
        json_data = {
            # Use the configured theater id so this request agrees with
            # trigger_schedule_refresh().
            'THEATER_ID': self._theater_id(),
            'STATE': 0,
            'HALL_ID': tms_hall_id,
            'START_TIME': int(time.time() * 1000),
            'PAGE_CAPACITY': 20,
            'PAGE_INDEX': 1,
        }
        try:
            # verify=False: certificate verification is deliberately disabled
            # for TMS requests.
            res = requests.post(url, params=params, cookies=cookies, headers=headers, json=json_data, timeout=10,
                                verify=False)
            data = res.json()
            if data.get("RSPCD") == "000000":
                return data.get("BODY", {}).get("LIST", [])
            self.logger(f"⚠️ TMS 返回异常状态: RSPCD={data.get('RSPCD')}")
            return None
        except Exception as e:
            self.logger(f"⚠️ TMS 请求异常: {e}")
            daily_stats.api_fails += 1
            return None

    def trigger_schedule_refresh(self):
        """Ask the TMS to refresh today's schedule (best effort).

        Does nothing without a token or TMS_THEATER_ID. Failures are only
        logged so that they never interrupt the monitor.
        """
        token = self.get_token()
        if not token or not TMS_THEATER_ID:
            return
        try:
            # Millisecond timestamps for [today 00:00, tomorrow 00:00).
            # NOTE(review): time.localtime()/mktime() use the server's local
            # timezone, while the rest of this file works in UTC+8 -- confirm
            # which midnight the TMS expects.
            now_tm = time.localtime()
            today_midnight = time.mktime((now_tm.tm_year, now_tm.tm_mon, now_tm.tm_mday, 0, 0, 0, 0, 0, 0))
            start_date = int(today_midnight * 1000)
            end_date = start_date + (24 * 60 * 60 * 1000)
            url = 'https://tms.hengdianfilm.com/cinema-api/tms/cmd/show/schedule'
            session_id = TMS_X_SESSION_ID or ''
            headers = {
                'accept': 'application/json, text/javascript, */*; q=0.01',
                'content-type': 'application/json; charset=UTF-8',
                'origin': 'https://tms.hengdianfilm.com',
                'referer': f'https://tms.hengdianfilm.com/hd/index?Scheduling&THEATER_ID={TMS_THEATER_ID}&DATE={start_date}&PAGE_CAPACITY=20&PAGE_INDEX=1',
                'token': token,  # obtained dynamically from get_token()
                'user-agent': 'Mozilla/5.0',
                'x-requested-with': 'XMLHttpRequest',
                'x-sessionid': session_id
            }
            params = {
                'token': 'hd',
                'murl': 'Scheduling',
            }
            cookies = {'JSESSIONID': session_id}
            json_data = {
                'THEATER_LIST': [
                    {
                        'THEATER_ID': int(TMS_THEATER_ID),
                        'START_DATE': start_date,
                        'END_DATE': end_date,
                    },
                ],
            }
            # Fire-and-forget: the response is not inspected. The call itself
            # blocks for at most the 5-second timeout; certificate
            # verification is disabled.
            requests.post(url, params=params, headers=headers, cookies=cookies, json=json_data, timeout=5, verify=False)
            self.logger("🔄 TMS 排期刷新指令已发送")
        except Exception as e:
            # A failed refresh must not affect the main program.
            self.logger(f"⚠️ TMS 排期刷新失败: {e}")
# --- 6. 智能监控主逻辑 ---
class PlaybackMonitor:
    """Background monitor that checks zero-ticket sessions against the TMS.

    For every session with no tickets sold, three checks are scheduled at
    0, 5 and 10 minutes after the show start. At each check the hall's TMS
    schedule is queried; if a TMS entry starts within 30 minutes of the show,
    an idle alert is sent.

    State read by the Streamlit front end: logs, status_text,
    next_wakeup_str, active_monitors, daily_schedule_cache,
    current_business_date.
    """

    def __init__(self):
        self.logs = deque(maxlen=50)  # newest entry first
        self.status_text = "初始化中..."
        self.next_wakeup_str = "--:--:--"
        self.active_monitors = []
        # Keys '<date>_<hall>_<start>' of sessions confirmed absent from TMS.
        self.cleared_sessions = set()
        # Keys '<date>_<hall>_<start>_<check index>' of checks already handled.
        self.processed_checks = set()
        self.ticket_api = TicketAPIManager(self.log)
        self.tms_api = TMSAPIManager(self.log)
        self.current_business_date = None
        self.daily_schedule_cache = None
        # Business date for which the day-rollover bookkeeping already ran.
        self._rollover_done_for = None
        self.thread = threading.Thread(target=self._run_loop, daemon=True)
        self.thread.start()

    def log(self, msg):
        """Prepend a timestamped entry to the in-memory log and print it."""
        ts = get_beijing_now().strftime("%H:%M:%S")
        entry = f"[{ts}] {msg}"
        self.logs.appendleft(entry)
        print(entry)

    def _get_target_check_points(self, schedule_list, business_date):
        """Build the time-sorted list of checks for all zero-ticket sessions.

        Side effect: daily_stats.zero_sessions is overwritten with the number
        of zero-ticket sessions found in schedule_list.

        Returns:
            A list of dicts with keys 'time', 'type', 'check_index', 'data'.
        """
        check_points = []
        zero_count = 0
        for item in schedule_list:
            # A missing or null sold count is treated as 0 instead of raising.
            sold = int(item.get('soldTicketNum') or 0)
            if sold != 0:
                continue
            zero_count += 1
            start_str = item.get('showStartTime')
            if not start_str:
                continue
            show_dt = parse_show_datetime(business_date, start_str)
            if not show_dt:
                continue
            # Three checks: at the start, +5 min and +10 min.
            for i in range(3):
                check_points.append({
                    'time': show_dt + timedelta(minutes=i * 5),
                    'type': 'CHECK',
                    'check_index': i,
                    'data': item
                })
        daily_stats.zero_sessions = zero_count
        check_points.sort(key=lambda x: x['time'])
        return check_points

    def _run_loop(self):
        """Worker loop: refresh the schedule, sleep until the next check, run it.

        Runs forever in the daemon thread; any exception is logged and the
        loop resumes after 60 seconds.
        """
        self.log("🚀 智能防空转监控已启动 (WeChat+Pushover)")
        while True:
            try:
                now = get_beijing_now()
                biz_date = get_business_date()

                # --- Initialisation and business-date change ---
                need_refresh = False
                if self.daily_schedule_cache is None:
                    need_refresh = True
                    self.log("🆕 初始化:获取全天排片...")
                    self.current_business_date = biz_date
                elif self.current_business_date != biz_date:
                    # The new day's schedule is only loaded from 09:00 on.
                    if now.time() >= dt_time(9, 0):
                        need_refresh = True
                        # Run the rollover bookkeeping once per business
                        # date. If the fetch below fails and is retried,
                        # taking another snapshot would overwrite yesterday's
                        # figures with the freshly reset counters.
                        if self._rollover_done_for != biz_date:
                            daily_stats.snapshot_as_yesterday(self.current_business_date)
                            self.cleared_sessions.clear()
                            self.processed_checks.clear()
                            self._rollover_done_for = biz_date
                        self.log(f"🌞 新营业日 ({biz_date}):刷新排片...")

                if need_refresh:
                    schedule = self.ticket_api.fetch_schedule(biz_date)
                    if schedule:
                        self.daily_schedule_cache = schedule
                        self.current_business_date = biz_date
                        self.log(f"✅ 排片已更新,共 {len(schedule)} 场。")
                        try:
                            d_obj = datetime.strptime(biz_date, "%Y-%m-%d")
                            date_cn = f"{d_obj.year}年{d_obj.month}月{d_obj.day}日"
                        except ValueError:
                            date_cn = biz_date
                        notifier.send_daily_report(date_cn)
                    else:
                        self.log("⚠️ 获取排片失败,5分钟后重试")
                        daily_stats.api_fails += 1
                        time.sleep(300)
                        continue

                # --- Find the next pending check ---
                check_points = self._get_target_check_points(self.daily_schedule_cache, biz_date)
                next_target = None
                for cp in check_points:
                    hall_id = extract_hall_number_raw(cp['data']['hallName'])
                    start_str = cp['data']['showStartTime']
                    check_idx = cp['check_index']
                    dedup_key = f"{biz_date}_{hall_id}_{start_str}_{check_idx}"
                    if dedup_key in self.processed_checks:
                        continue
                    # Checks that are at most 30 seconds overdue still count.
                    if cp['time'] > (now - timedelta(seconds=30)):
                        next_target = cp
                        break

                if next_target:
                    target_time = next_target['time']
                    # 09:00 interceptor: never sleep across 09:00, so that the
                    # daily refresh above gets a chance to run.
                    nine_am_today = datetime.combine(now.date(), dt_time(9, 0))
                    force_wake_for_daily_reset = False
                    if now < nine_am_today and target_time > nine_am_today:
                        target_time = nine_am_today
                        force_wake_for_daily_reset = True
                    sleep_seconds = max(0, (target_time - now).total_seconds())
                    self.status_text = "💤 休眠中"
                    self.next_wakeup_str = target_time.strftime('%H:%M:%S')
                    if force_wake_for_daily_reset:
                        self.active_monitors = ["等待 09:00 日报刷新..."]
                    else:
                        raw_hall = next_target['data']['hallName']
                        clean_hall = format_hall_name(raw_hall)
                        self.active_monitors = [
                            f"下个任务: {clean_hall} {next_target['data']['showStartTime']} (第{next_target['check_index'] + 1}次检查)"]
                    self.log(f"💤 智能休眠 {int(sleep_seconds)}秒,将在 {self.next_wakeup_str} 唤醒...")
                    time.sleep(sleep_seconds)
                    if force_wake_for_daily_reset:
                        continue

                    # --- Regular wake-up: run the due checks ---
                    self.status_text = "🔥 正在执行检查..."
                    self.log("⏰ 唤醒!正在同步最新票务数据...")
                    latest_schedule = self.ticket_api.fetch_schedule(biz_date)
                    if latest_schedule:
                        self.daily_schedule_cache = latest_schedule
                    else:
                        self.log("⚠️ 同步排片失败,使用旧数据")
                        daily_stats.api_fails += 1
                        latest_schedule = self.daily_schedule_cache
                    check_now = get_beijing_now()
                    latest_check_points = self._get_target_check_points(latest_schedule, biz_date)
                    # Every check within two minutes of now is handled in this pass.
                    current_targets = [
                        cp for cp in latest_check_points
                        if abs((cp['time'] - check_now).total_seconds()) < 120
                    ]
                    if current_targets:
                        self._process_targets(current_targets, biz_date)
                    time.sleep(5)
                else:
                    self.status_text = "🌙 今日监控结束"
                    self.active_monitors = []
                    # Sleep towards the next 09:00 wall-clock time. Deriving
                    # it from biz_date + 1 day pointed at tomorrow's 09:00
                    # between 06:00 and 09:00, so the one-hour naps could
                    # overshoot the refresh time by up to an hour.
                    next_9am = datetime.combine(now.date(), dt_time(9, 0))
                    if next_9am <= now:
                        next_9am += timedelta(days=1)
                    seconds_to_9am = (next_9am - now).total_seconds()
                    if seconds_to_9am > 3600:
                        self.log("暂无目标,休眠 1 小时...")
                        time.sleep(3600)
                    else:
                        self.log("休眠至明日 09:00...")
                        time.sleep(max(0, seconds_to_9am))
            except Exception as e:
                self.log(f"❌ 主循环异常: {e}")
                time.sleep(60)

    def _process_targets(self, targets, biz_date):
        """Run the due checks, grouped by hall, and publish the results.

        For each hall, the TMS schedule list is fetched once and every due
        check of that hall is evaluated against it. Results are written to
        self.active_monitors.

        Args:
            targets: check-point dicts as built by _get_target_check_points.
            biz_date: business date used in the de-duplication keys.
        """
        # Group the not-yet-processed checks by hall number.
        halls_to_check = {}
        for t in targets:
            hall_id = extract_hall_number_raw(t['data']['hallName'])
            start_str = t['data']['showStartTime']
            check_idx = t['check_index']
            dedup_key = f"{biz_date}_{hall_id}_{start_str}_{check_idx}"
            if dedup_key in self.processed_checks:
                continue
            halls_to_check.setdefault(hall_id, []).append(t)
        if not halls_to_check:
            return

        self.log(f"🔍 核查 {len(halls_to_check)} 个影厅 TMS 状态...")
        display_results = []
        for hall_id, target_list in halls_to_check.items():
            # Skip the TMS request when every session of this hall was
            # already confirmed as removed.
            all_cleared = all(
                f"{biz_date}_{hall_id}_{target['data'].get('showStartTime')}" in self.cleared_sessions
                for target in target_list
            )
            if all_cleared:
                for target in target_list:
                    dedup_key = f"{biz_date}_{hall_id}_{target['data']['showStartTime']}_{target['check_index']}"
                    self.processed_checks.add(dedup_key)
                clean_name = format_hall_name(target_list[0]['data']['hallName'])
                display_results.append(f"{clean_name} ✅ [已跳过 (已确认撤场)]")
                continue

            tms_list = self.tms_api.fetch_hall_schedule_list(hall_id)
            for target in target_list:
                session = target['data']
                check_idx = target['check_index']
                hall_name_raw = session.get('hallName')
                movie_name = session.get('movieName')
                ticket_start_str = session.get('showStartTime')
                ticket_start_dt = parse_show_datetime(biz_date, ticket_start_str)
                unique_key = f"{biz_date}_{hall_id}_{ticket_start_str}"
                dedup_key = f"{biz_date}_{hall_id}_{ticket_start_str}_{check_idx}"
                if dedup_key in self.processed_checks:
                    continue
                if unique_key in self.cleared_sessions:
                    self.processed_checks.add(dedup_key)
                    continue
                hall_name_clean = format_hall_name(hall_name_raw)
                status_desc = f"{hall_name_clean} {ticket_start_str} (第{check_idx + 1}/3次)"
                if tms_list is not None:
                    # A TMS entry starting within 30 minutes of the ticketed
                    # show counts as "still scheduled".
                    # NOTE(review): START_TIME is parsed as 'HH:MM'; entries
                    # in any other format are skipped -- confirm the format
                    # the TMS actually returns.
                    match_found = False
                    for tms_item in tms_list:
                        t_start_str = tms_item.get('START_TIME')
                        if not t_start_str:
                            continue
                        t_start_dt = parse_show_datetime(biz_date, t_start_str)
                        if not t_start_dt:
                            continue
                        if abs((t_start_dt - ticket_start_dt).total_seconds()) < 1800:
                            match_found = True
                            break
                    if match_found:
                        status_desc += " ⚠️ [空转! TMS未撤]"
                        self.log(f"🚨 发现空转: {hall_name_clean}《{movie_name}》")
                        count_info = f"第 {check_idx + 1}/3 次检查发现"
                        remark = "仅停止播放未撤排程下次检查依然会触发通知。"
                        notifier.send_idle_alert(hall_name_clean, movie_name, ticket_start_str, count_info, remark)
                        # On the first detection, also ask the TMS to refresh
                        # its schedule.
                        if check_idx == 0:
                            self.tms_api.trigger_schedule_refresh()
                    else:
                        status_desc += " ✅ [正常 (TMS无排期)]"
                        self.cleared_sessions.add(unique_key)
                else:
                    status_desc += " ❓ [TMS查询失败]"
                    daily_stats.api_fails += 1
                self.processed_checks.add(dedup_key)
                display_results.append(status_desc)
        self.active_monitors = display_results
# --- 7. Streamlit 前端 ---
@st.cache_resource
def get_monitor():
    """Build the PlaybackMonitor; st.cache_resource caches the returned instance."""
    monitor = PlaybackMonitor()
    return monitor
def main():
    """Render the dashboard: status metrics, the run log and the latest check results."""
    monitor = get_monitor()
    # Re-run the page every 30 seconds when the optional component is available.
    if st_autorefresh:
        st_autorefresh(interval=30 * 1000, key="pb_refresh")
    st.title("💸 影厅空场防空转监控")

    # Collect the cached sessions that have not started yet.
    upcoming = []
    cached_schedule = monitor.daily_schedule_cache
    if cached_schedule:
        now = get_beijing_now()
        biz_date = monitor.current_business_date or get_business_date()
        for entry in cached_schedule:
            start_text = entry.get('showStartTime')
            if not start_text:
                continue
            starts_at = parse_show_datetime(biz_date, start_text)
            if starts_at and starts_at > now:
                upcoming.append(entry)
    remaining_total_count = len(upcoming)
    remaining_zero_count = sum(
        1 for entry in upcoming if int(entry.get('soldTicketNum', 0)) == 0
    )

    status_col, wakeup_col, count_col = st.columns(3)
    with status_col:
        st.metric("当前状态", monitor.status_text)
    with wakeup_col:
        st.metric("下次唤醒", monitor.next_wakeup_str)
    with count_col:
        st.metric("剩余 0 票场次 / 剩余总场次", f"{remaining_zero_count} / {remaining_total_count}")
    st.divider()

    col_logs, col_mon = st.columns([3, 2])
    with col_logs:
        st.subheader("📜 运行日志")
        st.text_area("Logs", "\n".join(list(monitor.logs)), height=450, disabled=True)
    with col_mon:
        st.subheader("🕵️ 最近一次检查结果")
        if not monitor.active_monitors:
            st.caption("等待下一次唤醒...")
        else:
            for line in monitor.active_monitors:
                # Warning lines are shown as errors, confirmed-OK lines as
                # successes, everything else as info.
                if "⚠️" in line:
                    render = st.error
                elif "✅" in line:
                    render = st.success
                else:
                    render = st.info
                render(line)
# Script entry point: render the page when this file is run as the main module.
if __name__ == "__main__":
    main()