# Spaces: Paused
| import streamlit as st | |
| import requests | |
| import time | |
| import json | |
| import os | |
| import threading | |
| from datetime import datetime, timedelta, timezone, time as dt_time | |
| from collections import deque | |
# --- 1. Configuration and constants ---
# Pushover credentials. Each value can be overridden through an environment
# variable of the same name; the literals are the previously hard-coded values
# and are kept only as fallbacks so existing deployments keep working.
# NOTE(review): the fallback secrets are committed in plain text - rotate them
# and drop the defaults once the environment variables are configured.
PUSHOVER_TOKEN = os.environ.get("PUSHOVER_TOKEN") or "a5cynwsr19y8bdvn2y447bg3vc51fr"
PUSHOVER_USER = os.environ.get("PUSHOVER_USER") or "uc86mdifsboas7bczuxmnoyq6qyouw"
# JSON file in which the login token data is persisted between runs.
TOKEN_FILE = 'token_data.json'
# Cinema identifier; the same value is sent as 'resCode' by APIManager.login.
CINEMA_ID = "44001291"
# Page configuration: title, icon and wide layout for the Streamlit app.
st.set_page_config(
    page_title="影城防撤场监控系统 Pro",
    page_icon="🛡️",
    layout="wide",
)
# --- 2. Core time handling ---
def get_beijing_now():
    """Return the current Beijing (UTC+8) wall-clock time as a naive datetime.

    The fixed +8h offset is applied explicitly, so the result does not depend
    on the host's local time zone. ``tzinfo`` is stripped so the value can be
    compared with the naive datetimes built from schedule strings.
    """
    beijing_tz = timezone(timedelta(hours=8))
    return datetime.now(beijing_tz).replace(tzinfo=None)
def get_business_date():
    """Return the current business date as a ``YYYY-MM-DD`` string.

    The business day rolls over at 06:00 Beijing time: any moment before
    06:00 is attributed to the previous calendar day.
    """
    current = get_beijing_now()
    business_day = current.date()
    if current.time() < dt_time(6, 0):
        business_day -= timedelta(days=1)
    return business_day.strftime("%Y-%m-%d")
def parse_show_datetime(date_str, time_str):
    """Combine a business date and a show start time into a naive datetime.

    Args:
        date_str: Business date formatted as ``YYYY-MM-DD``.
        time_str: Show start time formatted as ``HH:MM``.

    Returns:
        The show's start as a naive ``datetime``. Start times before 06:00
        are placed on the calendar day after ``date_str``. Returns ``None``
        when either argument is missing or malformed.
    """
    try:
        show_t = datetime.strptime(time_str, "%H:%M").time()
        show_date = datetime.strptime(date_str, "%Y-%m-%d").date()
        if show_t < dt_time(6, 0):
            # Early-morning shows belong to the previous business day, so the
            # real calendar date is one day later than the business date.
            show_date += timedelta(days=1)
    except (TypeError, ValueError, OverflowError):
        # Only parsing problems are swallowed; a bare ``except`` would also
        # hide KeyboardInterrupt/SystemExit.
        return None
    return datetime.combine(show_date, show_t)
def send_pushover(title, message, priority=0):
    """Send a push notification through the Pushover HTTP API (best effort).

    Args:
        title: Notification title.
        message: Notification body.
        priority: Value sent as the request's ``priority`` field (default 0).

    Returns:
        None. Delivery failures are reported on stdout and never raised, so a
        notification problem cannot interrupt the caller.
    """
    url = "https://api.pushover.net/1/messages.json"
    payload = {
        "token": PUSHOVER_TOKEN,
        "user": PUSHOVER_USER,
        "title": title,
        "message": message,
        "sound": "pushover",
        "priority": priority,
    }
    try:
        response = requests.post(url, data=payload, timeout=5)
        # Surface HTTP-level rejections (4xx/5xx) instead of treating every
        # completed request as a delivered notification.
        response.raise_for_status()
    except requests.RequestException as exc:
        # Still best-effort, but no longer silent: a dropped alert is visible
        # in the process output.
        print(f"[pushover] 推送失败: {exc}")
# --- 3. API management module ---
class APIManager:
    """Handle login, token persistence and schedule retrieval.

    Args:
        logger: Callable taking one message string; every status or error
            message produced by this class is passed to it.
    """

    # Seconds to wait after a failed login before another attempt is made.
    LOGIN_COOLDOWN_SEC = 600
    SCHEDULE_URL = 'https://cawapi.yinghezhong.com/showInfo/getHallShowInfo'
    SCHEDULE_HEADERS = {'Origin': 'https://caw.yinghezhong.com', 'User-Agent': 'Mozilla/5.0'}

    def __init__(self, logger):
        # Epoch seconds of the most recent failed login (0 = never failed).
        self.last_login_fail_time = 0
        self.logger = logger

    def save_token(self, token_data):
        """Persist ``token_data`` as JSON in ``TOKEN_FILE`` (best effort).

        A failure is logged instead of raised: the token is still usable for
        the current request even when it cannot be cached on disk.
        """
        try:
            with open(TOKEN_FILE, 'w', encoding='utf-8') as f:
                json.dump(token_data, f, ensure_ascii=False, indent=4)
        except (OSError, TypeError, ValueError) as e:
            self.logger(f"⚠️ Token保存失败: {e}")

    def load_token(self):
        """Return the ``token`` field stored in ``TOKEN_FILE``.

        Returns:
            The stored token, or ``None`` when the file does not exist, cannot
            be read, or does not contain a JSON object.
        """
        try:
            with open(TOKEN_FILE, 'r', encoding='utf-8') as f:
                return json.load(f).get('token')
        except FileNotFoundError:
            # No cached token yet - the normal state before the first login.
            return None
        except (OSError, ValueError, AttributeError) as e:
            # ValueError covers malformed JSON / bad encoding; AttributeError
            # covers valid JSON that is not an object.
            self.logger(f"⚠️ Token读取失败: {e}")
            return None

    def login(self):
        """Log in and return a fresh token, or ``None`` on failure.

        After a failed attempt further logins are skipped for
        ``LOGIN_COOLDOWN_SEC`` seconds. A failure is logged and also reported
        through ``send_pushover``.
        """
        if time.time() - self.last_login_fail_time < self.LOGIN_COOLDOWN_SEC:
            self.logger("⏳ 登录冷却中,跳过重试。")
            return None
        self.logger("🔄 尝试后台自动登录...")
        login_url = 'https://app.bi.piao51.cn/cinema-app/credential/login.action'
        # NOTE(review): account credentials are hard-coded in source; consider
        # moving them to configuration.
        login_data = {
            'username': 'chenhy', 'password': '123456', 'type': '1',
            # CINEMA_ID holds the same value that used to be inlined here.
            'resCode': CINEMA_ID, 'deviceid': '1517bfd3f6e7fef1333', 'dtype': 'ios',
        }
        try:
            # The context manager closes the session's connections even when
            # one of the requests raises.
            with requests.Session() as session:
                session.headers.update({'User-Agent': 'Mozilla/5.0'})
                session.post(login_url, data=login_data, timeout=15)
                resp = session.get(
                    'https://app.bi.piao51.cn/cinema-app/security/logined.action',
                    timeout=10,
                )
                info = resp.json()
            # "data" may be present but null; treat that like a missing token
            # rather than failing with an unrelated AttributeError message.
            token_data = info.get("data") or {}
            token = token_data.get("token")
            if not (info.get("success") and token):
                raise RuntimeError("未获取到Token")
        except Exception as e:
            self.last_login_fail_time = time.time()
            msg = f"登录失败: {e}"
            self.logger(f"❌ {msg}")
            send_pushover("监控异常", msg, priority=0)
            return None
        self.save_token(token_data)
        self.logger("✅ 登录成功。")
        return token

    def _request_schedule(self, date_str, token):
        """Issue one schedule request and return the decoded JSON payload."""
        params = {'showDate': date_str, 'token': token, '_': int(time.time() * 1000)}
        response = requests.get(
            self.SCHEDULE_URL, params=params, headers=self.SCHEDULE_HEADERS, timeout=15
        )
        return response.json()

    def fetch_schedule(self, date_str):
        """Fetch the show schedule for ``date_str``.

        Returns:
            ``None`` on a network or API error (callers must not treat this as
            "no shows"); otherwise the payload's ``data`` value, which is
            ``[]`` when the key is absent.
        """
        token = self.load_token()
        if not token:
            token = self.login()
        if not token:
            return None
        try:
            data = self._request_schedule(date_str, token)
            if data.get('code') == 500:
                self.logger("⚠️ Token失效,重试中...")
                token = self.login()
                if not token:
                    return None
                data = self._request_schedule(date_str, token)
            # The success code is checked for the retried response as well, so
            # a failed retry is reported as an error instead of leaking
            # through as an empty schedule.
            if data.get('code') == 1:
                return data.get('data', [])
            self.logger(f"⚠️ API返回异常状态码: {data.get('code')}")
            return None  # any other code counts as a failure
        except Exception as e:
            self.logger(f"API请求异常: {e}")
            return None  # network/decoding problems are reported as None
# --- 4. Monitoring service main logic ---
class CinemaMonitor:
    """Background watcher that alerts when a zero-ticket show sells tickets.

    A daemon thread started by the constructor repeatedly:
      1. idles between 06:00 and 09:30 Beijing time (unless simulating);
      2. loads the business day's schedule once and caches it;
      3. picks the shows that start within ``MONITOR_WINDOW_MIN`` minutes;
      4. re-fetches live data for them and sends a Pushover alert when a show
         previously observed with 0 sold tickets now reports more than 0.

    Attributes read by the UI: ``logs``, ``status_text``, ``next_wakeup`` and
    ``monitored_sessions``.
    """

    POLL_INTERVAL_SEC = 60      # delay between polls while a show is in its window
    MONITOR_WINDOW_MIN = 11     # minutes before start at which watching begins
    MAX_IDLE_SLEEP_SEC = 3600   # upper bound for a single idle sleep
    NO_SESSION_SLEEP_SEC = 300  # idle sleep when no upcoming show is left

    def __init__(self):
        self.logs = deque(maxlen=50)
        self.status_text = "初始化中..."
        self.next_wakeup = None
        self.monitored_sessions = []
        self.api = APIManager(self.log)
        self.current_business_date = None
        self.daily_schedule_cache = None
        # Ids of shows that were observed with exactly 0 sold tickets.
        self.zero_ticket_candidates = set()
        # Ids of shows for which an alert has already been sent.
        self.alerted_sessions = set()
        self.simulation_mode = False
        self.simulation_data = []
        # Lets other threads cut a sleep of the monitoring loop short.
        self._wake_event = threading.Event()
        self.thread = threading.Thread(target=self._run_loop, daemon=True)
        self.thread.start()

    def log(self, msg):
        """Print ``msg`` with a Beijing-time stamp and keep it in ``logs``.

        Entries are added on the left, so ``logs`` is ordered newest first.
        """
        timestamp = get_beijing_now().strftime("%H:%M:%S")
        entry = f"[{timestamp}] {msg}"
        print(entry)
        self.logs.appendleft(entry)

    def _sleep(self, seconds):
        """Sleep up to ``seconds``; return early when the wake event is set."""
        if self._wake_event.wait(timeout=max(0, seconds)):
            self._wake_event.clear()

    @staticmethod
    def _parse_sold(raw):
        """Convert a sold-ticket value to ``int``; ``None`` if not convertible."""
        try:
            return int(raw)
        except (TypeError, ValueError):
            return None

    def _classify_session(self, unique_id, sold):
        """Update the tracking sets for one show and return its verdict.

        Returns:
            ``"watching"``    - 0 tickets sold; the show is now a candidate.
            ``"new_sale"``    - first transition from 0 to >0; alert is due.
            ``"alerted"``     - tickets sold and an alert was already sent.
            ``"preexisting"`` - tickets sold and never seen at 0; ignored.
        """
        if sold == 0:
            self.zero_ticket_candidates.add(unique_id)
            return "watching"
        if unique_id in self.zero_ticket_candidates:
            # Seen at zero earlier and now sold: stop watching it.
            self.zero_ticket_candidates.remove(unique_id)
            if unique_id in self.alerted_sessions:
                return "alerted"
            self.alerted_sessions.add(unique_id)
            return "new_sale"
        if unique_id in self.alerted_sessions:
            # Keep showing an alerted show as alerted on later polls instead
            # of relabelling it as "had tickets from the start".
            return "alerted"
        return "preexisting"

    def _run_loop(self):
        """Thread entry point: run monitoring cycles forever."""
        self.log("🚀 监控服务已启动 (严格实时数据版)")
        while True:
            try:
                self._run_cycle()
            except Exception as e:
                # Top-level boundary of the worker thread: log and keep going.
                self.log(f"主循环异常: {e}")
                self._sleep(60)

    def _run_cycle(self):
        """Execute one monitoring iteration, including its trailing sleep."""
        now = get_beijing_now()
        biz_date = get_business_date()

        # --- 1. Sleep outside monitoring hours ---
        is_early_morning = dt_time(6, 0) <= now.time() < dt_time(9, 30)
        if is_early_morning and not self.simulation_mode:
            start_check_time = now.replace(hour=9, minute=30, second=0, microsecond=0)
            self.status_text = "非监控时段 (等待 09:30)"
            self.next_wakeup = start_check_time
            self.zero_ticket_candidates.clear()
            self.alerted_sessions.clear()
            sleep_sec = (start_check_time - now).total_seconds()
            self.log(f"💤 系统休眠中... 预计唤醒: {start_check_time.strftime('%H:%M')}")
            self._sleep(min(sleep_sec, 300))
            return

        # --- 2. Refresh the cached schedule on first run / date change ---
        if self.daily_schedule_cache is None or self.current_business_date != biz_date:
            self.log(f"📅 获取 {biz_date} 全天排片...")
            schedule = self.api.fetch_schedule(biz_date)
            if schedule is None:
                self.log("❌ 初始化获取失败,1分钟后重试")
                self._sleep(60)
                return
            self.daily_schedule_cache = schedule
            self.current_business_date = biz_date
            self.zero_ticket_candidates.clear()
            self.alerted_sessions.clear()
            self.log(f"✅ 数据已更新,共 {len(schedule)} 场")

        # --- 3. Select the shows inside the monitoring window ---
        current_schedule = self.daily_schedule_cache + self.simulation_data
        sessions_in_window, next_window_in = self._select_sessions(
            current_schedule, biz_date, now
        )

        # --- 4. Poll live data, or idle until the next window opens ---
        if sessions_in_window:
            self._check_sessions(sessions_in_window, biz_date, now)
        else:
            self._idle(now, next_window_in)

    def _select_sessions(self, schedule, biz_date, now):
        """Split upcoming shows into "in window now" and "later".

        Returns:
            ``(in_window, next_window_in)``: ``in_window`` is a list of
            ``{'data': item, 'dt': start}`` dicts for shows that have not
            started and are at most ``MONITOR_WINDOW_MIN`` minutes away.
            ``next_window_in`` is the number of seconds until the earliest
            later window opens, or ``None`` when no such show exists.
        """
        in_window = []
        next_window_in = None
        for item in schedule:
            start_time_str = item.get('showStartTime')
            if not start_time_str:
                continue
            show_dt = parse_show_datetime(biz_date, start_time_str)
            if not show_dt or now >= show_dt:
                continue
            monitor_start_dt = show_dt - timedelta(minutes=self.MONITOR_WINDOW_MIN)
            if now >= monitor_start_dt:
                in_window.append({'data': item, 'dt': show_dt})
                continue
            wait = (monitor_start_dt - now).total_seconds()
            if next_window_in is None or wait < next_window_in:
                next_window_in = wait
        return in_window, next_window_in

    def _check_sessions(self, sessions_in_window, biz_date, now):
        """Compare live ticket counts with earlier observations and alert."""
        self.status_text = "🔥 监控进行中 (高频轮询)"
        self.next_wakeup = now + timedelta(seconds=self.POLL_INTERVAL_SEC)

        # Decisions are made on freshly fetched data only.
        realtime_schedule = self.api.fetch_schedule(biz_date)
        if realtime_schedule is None:
            if not self.simulation_mode:
                self.log("⚠️ 网络/API请求失败,跳过本次判定,防止误报!")
                self._sleep(self.POLL_INTERVAL_SEC)
                return
            realtime_schedule = []
        if self.simulation_mode:
            # Build a new list so the fetched list is not mutated in place.
            realtime_schedule = realtime_schedule + self.simulation_data
        realtime_map = {
            f"{x.get('hallId')}_{x.get('showStartTime')}": x for x in realtime_schedule
        }

        display_monitors = []
        for session in sessions_in_window:
            item = session['data']
            key_for_map = f"{item.get('hallId')}_{item.get('showStartTime')}"
            unique_id = f"{biz_date}_{key_for_map}"
            # Never fall back to cached data: a show absent from the live
            # response is skipped for this poll.
            latest = realtime_map.get(key_for_map)
            if not latest:
                continue
            movie_name = latest.get('movieName')
            hall_name = latest.get('hallName')
            start_str = latest.get('showStartTime')
            raw_sold = latest.get('soldTicketNum', 0)
            sold = self._parse_sold(raw_sold)
            if sold is None:
                # An unreadable count must neither abort the whole cycle nor
                # be mistaken for zero; skip only this show.
                self.log(f"⚠️ 无法解析票数,跳过: {hall_name}《{movie_name}》 soldTicketNum={raw_sold!r}")
                continue
            mins_left = (session['dt'] - now).total_seconds() / 60

            verdict = self._classify_session(unique_id, sold)
            if verdict == "watching":
                display_monitors.append(f"👁️ {start_str} {movie_name} (0票, 监控中)")
            elif verdict == "preexisting":
                display_monitors.append(f"🛡️ {start_str} {movie_name} (原有{sold}张, 忽略)")
            else:
                if verdict == "new_sale":
                    self.log(f"🚨 突发购票检测!{hall_name}《{movie_name}》")
                    send_pushover(
                        f"禁止撤场:{hall_name}有票!",
                        f"《{movie_name}》{start_str}\n倒计时 {mins_left:.1f}分\n⚠️ 从0票突增至{sold}张",
                        priority=0
                    )
                display_monitors.append(f"✅ {start_str} {movie_name} (新售出{sold}张, 已报警)")

        self.monitored_sessions = display_monitors
        self._sleep(self.POLL_INTERVAL_SEC)

    def _idle(self, now, next_window_in):
        """Sleep until the next monitoring window, or briefly if none is left."""
        self.status_text = "💤 智能休眠中"
        self.monitored_sessions = []
        if next_window_in is None:
            # No upcoming show today. This branch used to be unreachable
            # because the sleep length was pre-seeded with one hour.
            self.next_wakeup = now + timedelta(seconds=self.NO_SESSION_SLEEP_SEC)
            self.log("今日无更多监控场次,休眠 5 分钟")
            self._sleep(self.NO_SESSION_SLEEP_SEC)
            return
        min_sleep_seconds = min(next_window_in, self.MAX_IDLE_SLEEP_SEC)
        wakeup_dt = now + timedelta(seconds=min_sleep_seconds)
        self.next_wakeup = wakeup_dt
        self.log(f"休眠 {min_sleep_seconds/60:.1f} 分钟,直到 {wakeup_dt.strftime('%H:%M')}")
        self._sleep(min_sleep_seconds)

    def trigger_simulation(self):
        """Inject a fake show that goes from 0 to 1 sold ticket.

        The fake show starts five minutes from now. The monitoring loop is
        woken immediately, and the simulated purchase happens only after the
        loop has recorded the zero-ticket state (or after a timeout), so the
        0 -> 1 transition can actually be observed.
        """
        self.simulation_mode = True
        fake_time = (get_beijing_now() + timedelta(minutes=5)).strftime("%H:%M")
        # 1. Inject the show with 0 tickets first.
        sim_session = {
            'movieName': '测试影片-防误报验证',
            'hallName': '模拟厅',
            'hallId': 'SIM_SAFE',
            'showStartTime': fake_time,
            'soldTicketNum': '0'
        }
        self.simulation_data = [sim_session]
        self.log(f"已注入 0票 模拟场次: {fake_time} 开场")
        self._wake_event.set()

        # Same id format the monitoring loop uses for this show.
        unique_id = f"{get_business_date()}_SIM_SAFE_{fake_time}"

        # 2. Switch to 1 ticket once the zero state has been seen.
        def simulate_buy():
            time.sleep(10)
            # A fixed 10 s delay alone is not enough: the loop may be asleep
            # much longer and would then only ever see the "1" state.
            deadline = time.monotonic() + 120
            while unique_id not in self.zero_ticket_candidates and time.monotonic() < deadline:
                time.sleep(1)
            sim_session['soldTicketNum'] = '1'
            self.log("⚡ [测试] 模拟动作:购票成功!应触发报警。")
            self._wake_event.set()

        threading.Thread(target=simulate_buy, daemon=True).start()
# --- 5. Streamlit front end ---
@st.cache_resource
def get_monitor():
    """Return the shared CinemaMonitor, creating it on first use.

    Streamlit re-executes the script on every interaction. Without caching,
    each rerun would construct a new CinemaMonitor - and its constructor
    starts a new background thread - while the page would only ever show the
    freshly initialised state. ``st.cache_resource`` keeps one instance alive
    across reruns and sessions.
    """
    return CinemaMonitor()
def main():
    """Render the dashboard: status metrics, log view, watch list, test button."""
    monitor = get_monitor()
    st.title("📽️ 影城排程防撤场监控终端 Pro")

    clock_fmt = "%H:%M:%S"
    utc_now = datetime.now(timezone.utc).strftime(clock_fmt)
    bj_now = get_beijing_now().strftime(clock_fmt)

    # Top row: current state, next wake-up time and the two clocks.
    status_col, wakeup_col, clock_col = st.columns(3)
    status_col.metric("当前状态", monitor.status_text)
    wakeup_label = monitor.next_wakeup.strftime(clock_fmt) if monitor.next_wakeup else "--"
    wakeup_col.metric("下次唤醒 (北京时间)", wakeup_label)
    clock_col.metric("系统时间 (UTC / BJ)", f"{utc_now} / {bj_now}")
    st.divider()

    col_logs, col_list = st.columns([3, 2])

    with col_logs:
        st.subheader("📜 系统运行日志")
        # The button has no handler: clicking it just makes Streamlit rerun
        # the script, which re-reads the log buffer.
        st.button("刷新日志")
        log_lines = list(monitor.logs)
        st.text_area("Logs", "\n".join(log_lines), height=450, disabled=True)

    with col_list:
        st.subheader("🎯 实时监控列表")
        st.info("说明:👁️=0票监控中 | ✅=突发购票(报警) | 🛡️=原有票(安全)")
        sessions = monitor.monitored_sessions
        if not sessions:
            st.info("当前休眠中,无临近场次")
        for entry in sessions:
            # The leading marker of each entry selects how it is highlighted.
            if "✅" in entry:
                st.success(entry)
            elif "👁️" in entry:
                st.error(entry)
            else:
                st.caption(entry)

        # NOTE(review): the original indentation was lost; the separator and
        # the test button are assumed to belong to this column - confirm.
        st.write("---")
        if st.button("模拟 '0票 -> 1票' 验证"):
            monitor.trigger_simulation()
            st.toast("测试已触发")


if __name__ == "__main__":
    main()