import streamlit as st import requests import time import json import os import threading import re import urllib3 from datetime import datetime, timedelta, timezone, time as dt_time from collections import deque from dotenv import load_dotenv # --- 0. 基础配置 --- st.set_page_config(page_title="影城防撤场监控系统", page_icon="🛡️", layout="wide") # 屏蔽 HTTPS 证书警告 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # 尝试导入自动刷新组件 try: from streamlit_autorefresh import st_autorefresh except ImportError: st_autorefresh = None # --- 1. 配置与常量 --- load_dotenv() # Pushover 配置 PUSHOVER_TOKEN = os.getenv("PUSHOVER_TOKEN") PUSHOVER_USER = os.getenv("PUSHOVER_USER") # 微信配置 WX_APP_ID = os.getenv("WX_APP_ID") WX_APP_SECRET = os.getenv("WX_APP_SECRET") # 支持多个用户,逗号分隔 WX_USER_OPEN_IDS = os.getenv("WX_USER_OPEN_ID", "").split(',') WX_TEMPLATE_ID_TICKET = os.getenv("WX_TEMPLATE_ID_TICKET") WX_TEMPLATE_ID_DAILY = os.getenv("WX_TEMPLATE_ID_DAILY") # 影城配置 CINEMA_ID = os.getenv("CINEMA_ID") # 获取 Token 文件路径 CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) ROOT_DIR = os.path.dirname(CURRENT_DIR) TOKEN_FILE = os.path.join(ROOT_DIR, 'token_data.json') # --- 2. 工具函数 --- def get_beijing_now(): """获取当前的北京时间""" utc_now = datetime.now(timezone.utc) beijing_now = utc_now.astimezone(timezone(timedelta(hours=8))) return beijing_now.replace(tzinfo=None) def get_business_date(): """获取当前的营业日日期""" now = get_beijing_now() if now.time() < dt_time(6, 0): return (now - timedelta(days=1)).strftime("%Y-%m-%d") return now.strftime("%Y-%m-%d") def parse_show_datetime(date_str, time_str): """解析排片时间 (处理跨天)""" try: show_t = datetime.strptime(time_str, "%H:%M").time() base_date = datetime.strptime(date_str, "%Y-%m-%d") if show_t < dt_time(6, 0): base_date += timedelta(days=1) return datetime.combine(base_date.date(), show_t) except: return None def simplify_hall_name(raw_name): """ 影厅名称简化 例如:'【和成天下1号厅】...' -> '1号厅' '6号激光厅' -> '6号厅' """ if not raw_name: return "未知影厅" match = re.search(r'(\d+)号', raw_name) if match: return f"{match.group(1)}号厅" return raw_name # --- 3. 微信推送模块 (含自动重试) --- class WeChatPusher: def __init__(self, app_id, app_secret): self.app_id = app_id self.app_secret = app_secret self.token = None self.token_expires_time = 0 def get_access_token(self, force_refresh=False): """获取并缓存微信 Access Token :param force_refresh: 是否强制刷新 Token """ if not force_refresh and self.token and datetime.now().timestamp() < self.token_expires_time: return self.token if not self.app_id or not self.app_secret: return None url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={self.app_id}&secret={self.app_secret}" try: resp = requests.get(url, timeout=10) data = resp.json() if 'access_token' in data: self.token = data['access_token'] # 提前 200 秒过期,防止临界点问题 self.token_expires_time = datetime.now().timestamp() + data['expires_in'] - 200 return self.token else: print(f"❌ 微信 Token 获取失败: {data}") except Exception as e: print(f"❌ 微信 API 请求异常: {e}") return None def send_template_message(self, user_ids, template_id, data_map): """发送模板消息给多个用户 (带 40001 错误重试机制)""" # 组装微信数据格式 formatted_data = {} for key, value in data_map.items(): formatted_data[key] = {"value": value, "color": "#173177"} # 清洗 user_ids valid_ids = [uid.strip() for uid in user_ids if uid and uid.strip()] if not valid_ids: return False success_any = False for user_id in valid_ids: # 每个用户最多尝试 2 次(1次正常,1次刷新 Token 后重试) for attempt in range(2): # 如果是重试(attempt > 0),强制刷新 Token token = self.get_access_token(force_refresh=(attempt > 0)) if not token: break # Token 都拿不到,不用重试了 url = f"https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={token}" payload = { "touser": user_id, "template_id": template_id, "data": formatted_data } try: resp = requests.post(url, json=payload, timeout=10) result = resp.json() errcode = result.get("errcode") if errcode == 0: # 发送成功 # print(f"✅ 微信发送成功 [{user_id}]") success_any = True break # 跳出重试循环,处理下一个用户 # 40001: Token 无效 # 40014: Token 不合法 # 42001: Token 过期 elif errcode in [40001, 40014, 42001]: print(f"🔄 微信 Token 失效 ({errcode}),正在刷新重试 [{user_id}]...") continue # 进入下一次循环,强制刷新 Token else: print(f"❌ 微信发送失败 [{user_id}]: {result}") break # 其他错误,不重试 except Exception as e: print(f"❌ 微信发送异常 [{user_id}]: {e}") break return success_any # --- 4. API 管理模块 --- class APIManager: def __init__(self, logger): self.last_login_fail_time = 0 self.logger = logger def save_token(self, token_data): try: with open(TOKEN_FILE, 'w', encoding='utf-8') as f: json.dump(token_data, f, ensure_ascii=False, indent=4) except: pass def load_token(self): if os.path.exists(TOKEN_FILE): try: with open(TOKEN_FILE, 'r', encoding='utf-8') as f: return json.load(f).get('token') except: pass return None def login(self): if time.time() - self.last_login_fail_time < 600: self.logger("⏳ 登录冷却中,跳过重试。") return None self.logger("🔄 尝试后台自动登录...") username = os.getenv("CINEMA_USERNAME") password = os.getenv("CINEMA_PASSWORD") res_code = os.getenv("CINEMA_RES_CODE") device_id = os.getenv("CINEMA_DEVICE_ID") if not all([username, password, res_code]): self.logger("❌ 环境变量缺失,无法自动登录") return None session = requests.Session() session.headers.update({'User-Agent': 'Mozilla/5.0'}) login_url = 'https://app.bi.piao51.cn/cinema-app/credential/login.action' login_data = { 'username': username, 'password': password, 'type': '1', 'resCode': res_code, 'deviceid': device_id, 'dtype': 'ios', } try: session.post(login_url, data=login_data, timeout=15) resp = session.get('https://app.bi.piao51.cn/cinema-app/security/logined.action', timeout=10) info = resp.json() if info.get("success") and info.get("data", {}).get("token"): self.save_token(info['data']) self.logger("✅ 登录成功。") return info['data']['token'] else: raise Exception("未获取到Token") except Exception as e: self.last_login_fail_time = time.time() self.logger(f"❌ 登录失败: {e}") return None def fetch_schedule(self, date_str): token = self.load_token() if not token: token = self.login() if not token: return None url = 'https://cawapi.yinghezhong.com/showInfo/getHallShowInfo' params = {'showDate': date_str, 'token': token, '_': int(time.time() * 1000)} headers = {'Origin': 'https://caw.yinghezhong.com', 'User-Agent': 'Mozilla/5.0'} try: response = requests.get(url, params=params, headers=headers, timeout=15) data = response.json() if data.get('code') == 1: return data.get('data', []) elif data.get('code') == 500: self.logger("⚠️ Token失效,重试中...") token = self.login() if token: params['token'] = token response = requests.get(url, params=params, headers=headers, timeout=15) return response.json().get('data', []) return None except Exception as e: self.logger(f"API请求异常: {e}") return None # --- 5. 监控服务主逻辑 --- class CinemaMonitor: def __init__(self): self.logs = deque(maxlen=50) self.status_text = "初始化中..." self.next_wakeup = None self.monitored_sessions = [] self.api = APIManager(self.log) self.wx_pusher = WeChatPusher(WX_APP_ID, WX_APP_SECRET) self.current_business_date = None self.daily_schedule_cache = None self.zero_ticket_candidates = set() self.alerted_sessions = set() self.last_daily_report_date = None # 统计数据 (用于每日报告) self.stats = { "zero_total": 0, # 纳入监控的0票总场次 "alert_count": 0, # 触发通知次数 "api_fails": 0, # API获取失败次数 "notify_fails": 0 # 发送通知失败次数 } self.thread = threading.Thread(target=self._run_loop, daemon=True) self.thread.start() def log(self, msg): timestamp = get_beijing_now().strftime("%H:%M:%S") entry = f"[{timestamp}] {msg}" print(entry) self.logs.appendleft(entry) def _send_pushover(self, title, message, priority=0): """发送 Pushover 通知""" if not PUSHOVER_TOKEN or not PUSHOVER_USER: return False url = "https://api.pushover.net/1/messages.json" data = { "token": PUSHOVER_TOKEN, "user": PUSHOVER_USER, "title": title, "message": message, "sound": "pushover", "priority": priority } try: response = requests.post(url, data=data, timeout=10) if response.status_code == 200: return True else: print(f"❌ Pushover 拒绝: {response.status_code}") return False except Exception as e: print(f"❌ Pushover 网络异常: {e}") return False def send_ticket_alert(self, hall_name, movie_name, show_time, ticket_count): """ 发送突发购票告警 (Pushover + WeChat) """ # 1. Pushover title = f"🎟 发现 {hall_name} {show_time} 场次有票!" msg = f"{hall_name} {show_time}《{movie_name}》\n⚠️ 突增至 {ticket_count} 张" if not self._send_pushover(title, msg, priority=1): self.stats["notify_fails"] += 1 # 2. WeChat if WX_TEMPLATE_ID_TICKET: data = { "first": f"发现 {hall_name} {show_time} 场次有票!\n{hall_name} {show_time}《{movie_name}》\n⚠️ 突增至 {ticket_count} 张\n", "keyword1": hall_name, "keyword2": movie_name, "keyword3": show_time, "keyword4": f"{ticket_count} 张", "remark": "\n请尽快处理。" } # 如果所有用户发送都失败才算失败 if not self.wx_pusher.send_template_message(WX_USER_OPEN_IDS, WX_TEMPLATE_ID_TICKET, data): self.stats["notify_fails"] += 1 def send_daily_report(self, date_str): """ 发送每日统计报告 (Pushover + WeChat) """ # 1. Pushover stats_text_full = ( f"0票总场次:{self.stats['zero_total']}\n" f"触发通知次数:{self.stats['alert_count']}\n" f"发送通知失败次数:{self.stats['notify_fails']}\n" f"API获取失败次数:{self.stats['api_fails']}" ) title = "🟢 0票场次开场前10分钟内突发购票检查就绪" msg = ( f"{date_str},今日排片数据已加载,开始智能检查。\n" f"昨日情况:\n{stats_text_full}" ) if not self._send_pushover(title, msg, priority=0): self.stats["notify_fails"] += 1 # 2. WeChat if WX_TEMPLATE_ID_DAILY: # 格式化简短的昨日数据,避免微信字段超长 stats_short = ( f"0票场次({self.stats['zero_total']}) " f"触发({self.stats['alert_count']}) " f"s失败({self.stats['notify_fails']}) " f"g失败({self.stats['api_fails']})" ) data = { "first": f"0票场次开场前10分钟内突发购票检查就绪\n{date_str},今日排片数据已加载,开始智能检查。\n", "keyword1": "0票场次开场前10分钟内突发购票", "keyword2": date_str, "keyword3": stats_short, "remark": "\n服务正常运行中。" } if not self.wx_pusher.send_template_message(WX_USER_OPEN_IDS, WX_TEMPLATE_ID_DAILY, data): self.stats["notify_fails"] += 1 def _run_loop(self): self.log("🚀 监控服务已启动 (WeChat + Pushover)") while True: try: now = get_beijing_now() biz_date = get_business_date() today_str = now.strftime("%Y-%m-%d") # --- 每日健康检查 (09:00) --- if now.time() >= dt_time(9, 0) and self.last_daily_report_date != today_str: report_date_str = now.strftime("%Y年%m月%d日") self.log(f"🔔 发送每日统计报告: {today_str}") # 发送报告 self.send_daily_report(report_date_str) # 重置统计数据 & 更新日期 self.last_daily_report_date = today_str self.stats = { "zero_total": 0, "alert_count": 0, "api_fails": 0, "notify_fails": 0 } # 休眠逻辑 (06:00 - 09:30) start_check_time = now.replace(hour=9, minute=30, second=0, microsecond=0) is_early_morning = (dt_time(6, 0) <= now.time() < dt_time(9, 30)) if is_early_morning: self.status_text = "非监控时段 (等待 09:30)" self.next_wakeup = start_check_time self.zero_ticket_candidates.clear() self.alerted_sessions.clear() time.sleep(60) continue # 缓存刷新 if self.daily_schedule_cache is None or self.current_business_date != biz_date: self.log(f"📅 获取 {biz_date} 全天排片...") schedule = self.api.fetch_schedule(biz_date) if schedule is None: self.log("❌ 初始化获取失败,1分钟后重试") self.stats["api_fails"] += 1 time.sleep(60) continue self.daily_schedule_cache = schedule self.current_business_date = biz_date self.zero_ticket_candidates.clear() self.alerted_sessions.clear() self.log(f"✅ 数据已更新,共 {len(schedule)} 场") # 筛选窗口期 active_check_needed = False min_sleep_seconds = 3600 sessions_in_window = [] current_schedule = self.daily_schedule_cache for item in current_schedule: start_time_str = item.get('showStartTime') if not start_time_str: continue show_dt = parse_show_datetime(biz_date, start_time_str) if not show_dt: continue if now >= show_dt: continue # 已开场 monitor_start_dt = show_dt - timedelta(minutes=11) if now >= monitor_start_dt: sessions_in_window.append({'data': item, 'dt': show_dt}) active_check_needed = True else: seconds_until_window = (monitor_start_dt - now).total_seconds() if seconds_until_window < min_sleep_seconds: min_sleep_seconds = seconds_until_window # 执行监控 if active_check_needed: self.status_text = "🔥 监控进行中" self.next_wakeup = now + timedelta(seconds=60) realtime_schedule = self.api.fetch_schedule(biz_date) if realtime_schedule is None: self.log("⚠️ API请求失败,跳过本次判定") self.stats["api_fails"] += 1 time.sleep(60) continue realtime_map = {f"{x.get('hallId')}_{x.get('showStartTime')}": x for x in realtime_schedule} display_monitors = [] for session in sessions_in_window: key = f"{session['data'].get('hallId')}_{session['data'].get('showStartTime')}" unique_id = f"{biz_date}_{key}" latest = realtime_map.get(key) if not latest: continue movie = latest.get('movieName') hall_raw = latest.get('hallName') # 影厅名简化 hall_short = simplify_hall_name(hall_raw) sold = int(latest.get('soldTicketNum', 0)) start = latest.get('showStartTime') if sold == 0: # 统计新加入的0票场次 if unique_id not in self.zero_ticket_candidates: self.stats["zero_total"] += 1 self.zero_ticket_candidates.add(unique_id) display_monitors.append(f"👁️ {start} {movie} (0票)") else: if unique_id in self.zero_ticket_candidates: if unique_id not in self.alerted_sessions: self.log(f"🚨 突发购票!{hall_short}《{movie}》") # 统计触发次数 self.stats["alert_count"] += 1 # 发送通知 (Pushover + WeChat) self.send_ticket_alert(hall_short, movie, start, sold) self.alerted_sessions.add(unique_id) self.zero_ticket_candidates.remove(unique_id) display_monitors.append(f"✅ {start} {movie} (新售出)") else: display_monitors.append(f"🛡️ {start} {movie} (原有票)") self.monitored_sessions = display_monitors time.sleep(60) else: if 0 < min_sleep_seconds < 86400: wakeup_dt = now + timedelta(seconds=min_sleep_seconds) self.status_text = "💤 智能休眠中" self.next_wakeup = wakeup_dt self.monitored_sessions = [] self.log(f"休眠 {min_sleep_seconds / 60:.1f} 分钟,直到 {wakeup_dt.strftime('%H:%M')}") time.sleep(min_sleep_seconds) else: self.log("今日监控结束,长休眠") time.sleep(300) except Exception as e: self.log(f"主循环异常: {e}") time.sleep(60) # --- 6. Streamlit 前端 --- @st.cache_resource def get_monitor(): return CinemaMonitor() def main(): monitor = get_monitor() # 自动刷新:30秒 if st_autorefresh: st_autorefresh(interval=30 * 1000, key="monitor_refresh") st.title("📢 零票影前十分钟内突发购票检查") c1, c2, c3 = st.columns(3) with c1: st.metric("运行状态", monitor.status_text) with c2: wakeup_str = monitor.next_wakeup.strftime("%H:%M:%S") if monitor.next_wakeup else "--" st.metric("下次唤醒", wakeup_str) # --- 统计逻辑 --- remaining_zero_count = 0 remaining_total_count = 0 if monitor.daily_schedule_cache: now = get_beijing_now() biz_date = monitor.current_business_date or get_business_date() for item in monitor.daily_schedule_cache: start_str = item.get('showStartTime') if not start_str: continue # 解析时间 show_dt = parse_show_datetime(biz_date, start_str) # 只统计“当前时间之后”的场次 if show_dt and show_dt > now: remaining_total_count += 1 if int(item.get('soldTicketNum', 0)) == 0: remaining_zero_count += 1 with c3: st.metric("剩余 0 票场次 / 剩余总场次", f"{remaining_zero_count} / {remaining_total_count}") st.divider() col_logs, col_list = st.columns([3, 2]) with col_logs: st.subheader("📜 运行日志") log_text = "\n".join(list(monitor.logs)) st.text_area("Logs", log_text, height=500, disabled=True) with col_list: st.subheader("🎯 当前监控窗口 (开场前10分)") if monitor.monitored_sessions: for s in monitor.monitored_sessions: if "✅" in s: st.success(s) elif "👁️" in s: st.error(s) else: st.info(s) else: st.caption("暂无临近开场的监控目标。") if __name__ == "__main__": main()