import streamlit as st import requests import time import json import os import threading from datetime import datetime, timedelta, timezone, time as dt_time from collections import deque # --- 1. 配置与常量 --- PUSHOVER_TOKEN = "a5cynwsr19y8bdvn2y447bg3vc51fr" PUSHOVER_USER = "uc86mdifsboas7bczuxmnoyq6qyouw" TOKEN_FILE = 'token_data.json' CINEMA_ID = "44001291" # 页面配置 st.set_page_config(page_title="影城防撤场监控系统 Pro", page_icon="🛡️", layout="wide") # --- 2. 核心时间处理 --- def get_beijing_now(): """获取当前的北京时间 (Naive)""" utc_now = datetime.now(timezone.utc) beijing_now = utc_now.astimezone(timezone(timedelta(hours=8))) return beijing_now.replace(tzinfo=None) def get_business_date(): """获取当前的营业日日期""" now = get_beijing_now() if now.time() < dt_time(6, 0): return (now - timedelta(days=1)).strftime("%Y-%m-%d") return now.strftime("%Y-%m-%d") def parse_show_datetime(date_str, time_str): """解析排片时间""" try: show_t = datetime.strptime(time_str, "%H:%M").time() base_date = datetime.strptime(date_str, "%Y-%m-%d") if show_t < dt_time(6, 0): base_date += timedelta(days=1) return datetime.combine(base_date.date(), show_t) except: return None def send_pushover(title, message, priority=0): url = "https://api.pushover.net/1/messages.json" data = { "token": PUSHOVER_TOKEN, "user": PUSHOVER_USER, "title": title, "message": message, "sound": "pushover", "priority": priority } try: requests.post(url, data=data, timeout=5) except: pass # --- 3. API 管理模块 --- class APIManager: def __init__(self, logger): self.last_login_fail_time = 0 self.logger = logger def save_token(self, token_data): try: with open(TOKEN_FILE, 'w', encoding='utf-8') as f: json.dump(token_data, f, ensure_ascii=False, indent=4) except: pass def load_token(self): if os.path.exists(TOKEN_FILE): try: with open(TOKEN_FILE, 'r', encoding='utf-8') as f: return json.load(f).get('token') except: pass return None def login(self): if time.time() - self.last_login_fail_time < 600: self.logger("⏳ 登录冷却中,跳过重试。") return None self.logger("🔄 尝试后台自动登录...") session = requests.Session() session.headers.update({'User-Agent': 'Mozilla/5.0'}) login_url = 'https://app.bi.piao51.cn/cinema-app/credential/login.action' login_data = { 'username': 'chenhy', 'password': '123456', 'type': '1', 'resCode': '44001291', 'deviceid': '1517bfd3f6e7fef1333', 'dtype': 'ios', } try: session.post(login_url, data=login_data, timeout=15) resp = session.get('https://app.bi.piao51.cn/cinema-app/security/logined.action', timeout=10) info = resp.json() if info.get("success") and info.get("data", {}).get("token"): self.save_token(info['data']) self.logger("✅ 登录成功。") return info['data']['token'] else: raise Exception("未获取到Token") except Exception as e: self.last_login_fail_time = time.time() msg = f"登录失败: {e}" self.logger(f"❌ {msg}") send_pushover("监控异常", msg, priority=0) return None def fetch_schedule(self, date_str): """ 获取排片数据。 返回 None 表示网络或API错误 (需要处理)。 返回 [] 表示当天没排片或数据为空。 """ token = self.load_token() if not token: token = self.login() if not token: return None url = 'https://cawapi.yinghezhong.com/showInfo/getHallShowInfo' params = {'showDate': date_str, 'token': token, '_': int(time.time() * 1000)} headers = {'Origin': 'https://caw.yinghezhong.com', 'User-Agent': 'Mozilla/5.0'} try: response = requests.get(url, params=params, headers=headers, timeout=15) data = response.json() if data.get('code') == 1: return data.get('data', []) elif data.get('code') == 500: self.logger("⚠️ Token失效,重试中...") token = self.login() if token: params['token'] = token response = requests.get(url, params=params, headers=headers, timeout=15) return response.json().get('data', []) return None # 其他错误码视为失败 except Exception as e: self.logger(f"API请求异常: {e}") return None # 网络异常返回 None # --- 4. 监控服务主逻辑 (修复版) --- class CinemaMonitor: def __init__(self): self.logs = deque(maxlen=50) self.status_text = "初始化中..." self.next_wakeup = None self.monitored_sessions = [] self.api = APIManager(self.log) self.current_business_date = None self.daily_schedule_cache = None self.zero_ticket_candidates = set() self.alerted_sessions = set() self.simulation_mode = False self.simulation_data = [] self.thread = threading.Thread(target=self._run_loop, daemon=True) self.thread.start() def log(self, msg): timestamp = get_beijing_now().strftime("%H:%M:%S") entry = f"[{timestamp}] {msg}" print(entry) self.logs.appendleft(entry) def _run_loop(self): self.log("🚀 监控服务已启动 (严格实时数据版)") while True: try: now = get_beijing_now() biz_date = get_business_date() # --- 1. 非营业时间休眠 --- start_check_time = now.replace(hour=9, minute=30, second=0, microsecond=0) is_early_morning = (dt_time(6,0) <= now.time() < dt_time(9,30)) if is_early_morning and not self.simulation_mode: self.status_text = "非监控时段 (等待 09:30)" self.next_wakeup = start_check_time self.zero_ticket_candidates.clear() self.alerted_sessions.clear() sleep_sec = (start_check_time - now).total_seconds() self.log(f"💤 系统休眠中... 预计唤醒: {start_check_time.strftime('%H:%M')}") time.sleep(min(sleep_sec, 300)) continue # --- 2. 缓存刷新 --- if self.daily_schedule_cache is None or self.current_business_date != biz_date: self.log(f"📅 获取 {biz_date} 全天排片...") schedule = self.api.fetch_schedule(biz_date) if schedule is None: self.log("❌ 初始化获取失败,1分钟后重试") time.sleep(60) continue self.daily_schedule_cache = schedule self.current_business_date = biz_date self.zero_ticket_candidates.clear() self.alerted_sessions.clear() self.log(f"✅ 数据已更新,共 {len(schedule)} 场") # --- 3. 筛选窗口期场次 --- active_check_needed = False min_sleep_seconds = 3600 sessions_in_window = [] current_schedule = self.daily_schedule_cache + self.simulation_data for item in current_schedule: start_time_str = item.get('showStartTime') if not start_time_str: continue show_dt = parse_show_datetime(biz_date, start_time_str) if not show_dt: continue if now >= show_dt: continue monitor_start_dt = show_dt - timedelta(minutes=11) if now >= monitor_start_dt: sessions_in_window.append({'data': item, 'dt': show_dt}) active_check_needed = True else: seconds_until_window = (monitor_start_dt - now).total_seconds() if seconds_until_window < min_sleep_seconds: min_sleep_seconds = seconds_until_window # --- 4. 严格执行逻辑 --- if active_check_needed: self.status_text = "🔥 监控进行中 (高频轮询)" self.next_wakeup = now + timedelta(seconds=60) # 关键修复:获取实时数据 realtime_schedule = self.api.fetch_schedule(biz_date) # 如果网络失败,realtime_schedule 为 None if realtime_schedule is None and not self.simulation_mode: self.log("⚠️ 网络/API请求失败,跳过本次判定,防止误报!") time.sleep(60) continue if realtime_schedule is None: realtime_schedule = [] if self.simulation_mode: realtime_schedule += self.simulation_data realtime_map = {f"{x.get('hallId')}_{x.get('showStartTime')}": x for x in realtime_schedule} display_monitors = [] for session in sessions_in_window: unique_id = f"{biz_date}_{session['data'].get('hallId')}_{session['data'].get('showStartTime')}" key_for_map = f"{session['data'].get('hallId')}_{session['data'].get('showStartTime')}" # 关键修复:绝对不能使用 fallback 回退到缓存数据! latest = realtime_map.get(key_for_map) if not latest: # 如果实时数据里找不到这一场(可能被删除了,或者数据没拉全),跳过判断 continue movie_name = latest.get('movieName') hall_name = latest.get('hallName') sold = int(latest.get('soldTicketNum', 0)) start_str = latest.get('showStartTime') mins_left = (session['dt'] - now).total_seconds() / 60 # === 判定逻辑 === if sold == 0: # 确认为0票,加入候选 self.zero_ticket_candidates.add(unique_id) display_monitors.append(f"👁️ {start_str} {movie_name} (0票, 监控中)") else: # 现在的票数 > 0 if unique_id in self.zero_ticket_candidates: # 之前我们确认过它是0票,现在变了 -> 报警 if unique_id not in self.alerted_sessions: self.log(f"🚨 突发购票检测!{hall_name}《{movie_name}》") send_pushover( f"禁止撤场:{hall_name}有票!", f"《{movie_name}》{start_str}\n倒计时 {mins_left:.1f}分\n⚠️ 从0票突增至{sold}张", priority=0 ) self.alerted_sessions.add(unique_id) self.zero_ticket_candidates.remove(unique_id) # 移出监控 display_monitors.append(f"✅ {start_str} {movie_name} (新售出{sold}张, 已报警)") else: # 它从未进入过0票名单,说明一开始就有票 -> 忽略 display_monitors.append(f"🛡️ {start_str} {movie_name} (原有{sold}张, 忽略)") self.monitored_sessions = display_monitors time.sleep(60) else: if 0 < min_sleep_seconds < 86400: wakeup_dt = now + timedelta(seconds=min_sleep_seconds) self.status_text = "💤 智能休眠中" self.next_wakeup = wakeup_dt self.monitored_sessions = [] self.log(f"休眠 {min_sleep_seconds/60:.1f} 分钟,直到 {wakeup_dt.strftime('%H:%M')}") time.sleep(min_sleep_seconds) else: self.log("今日无更多监控场次,休眠 5 分钟") time.sleep(300) except Exception as e: self.log(f"主循环异常: {e}") time.sleep(60) def trigger_simulation(self): self.simulation_mode = True fake_time = (get_beijing_now() + timedelta(minutes=5)).strftime("%H:%M") # 1. 先注入 0 票 self.simulation_data = [{ 'movieName': '测试影片-防误报验证', 'hallName': '模拟厅', 'hallId': 'SIM_SAFE', 'showStartTime': fake_time, 'soldTicketNum': '0' }] self.log(f"已注入 0票 模拟场次: {fake_time} 开场") # 2. 10秒后改为 1 票 def simulate_buy(): time.sleep(10) self.simulation_data[0]['soldTicketNum'] = '1' self.log("⚡ [测试] 模拟动作:购票成功!应触发报警。") threading.Thread(target=simulate_buy).start() # --- 5. Streamlit 前端 --- @st.cache_resource def get_monitor(): return CinemaMonitor() def main(): monitor = get_monitor() st.title("📽️ 影城排程防撤场监控终端 Pro") utc_now = datetime.now(timezone.utc).strftime("%H:%M:%S") bj_now = get_beijing_now().strftime("%H:%M:%S") c1, c2, c3 = st.columns(3) with c1: st.metric("当前状态", monitor.status_text) with c2: st.metric("下次唤醒 (北京时间)", monitor.next_wakeup.strftime("%H:%M:%S") if monitor.next_wakeup else "--") with c3: st.metric("系统时间 (UTC / BJ)", f"{utc_now} / {bj_now}") st.divider() col_logs, col_list = st.columns([3, 2]) with col_logs: st.subheader("📜 系统运行日志") if st.button("刷新日志"): pass st.text_area("Logs", "\n".join(list(monitor.logs)), height=450, disabled=True) with col_list: st.subheader("🎯 实时监控列表") st.info("说明:👁️=0票监控中 | ✅=突发购票(报警) | 🛡️=原有票(安全)") if monitor.monitored_sessions: for s in monitor.monitored_sessions: if "✅" in s: st.success(s) elif "👁️" in s: st.error(s) else: st.caption(s) else: st.info("当前休眠中,无临近场次") st.write("---") if st.button("模拟 '0票 -> 1票' 验证"): monitor.trigger_simulation() st.toast("测试已触发") if __name__ == "__main__": main()