Spaces:
Running
Running
| import streamlit as st | |
| import requests | |
| import time | |
| import json | |
| import os | |
| import threading | |
| import re | |
| import urllib3 | |
| from datetime import datetime, timedelta, timezone, time as dt_time | |
| from collections import deque | |
| from dotenv import load_dotenv | |
| # --- 0. 基础配置 --- | |
| st.set_page_config(page_title="影城防撤场监控系统", page_icon="🛡️", layout="wide") | |
| # 屏蔽 HTTPS 证书警告 | |
| urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
| # 尝试导入自动刷新组件 | |
| try: | |
| from streamlit_autorefresh import st_autorefresh | |
| except ImportError: | |
| st_autorefresh = None | |
| # --- 1. 配置与常量 --- | |
| load_dotenv() | |
| # Pushover 配置 | |
| PUSHOVER_TOKEN = os.getenv("PUSHOVER_TOKEN") | |
| PUSHOVER_USER = os.getenv("PUSHOVER_USER") | |
| # 微信配置 | |
| WX_APP_ID = os.getenv("WX_APP_ID") | |
| WX_APP_SECRET = os.getenv("WX_APP_SECRET") | |
| # 支持多个用户,逗号分隔 | |
| WX_USER_OPEN_IDS = os.getenv("WX_USER_OPEN_ID", "").split(',') | |
| WX_TEMPLATE_ID_TICKET = os.getenv("WX_TEMPLATE_ID_TICKET") | |
| WX_TEMPLATE_ID_DAILY = os.getenv("WX_TEMPLATE_ID_DAILY") | |
| # 影城配置 | |
| CINEMA_ID = os.getenv("CINEMA_ID") | |
| # 获取 Token 文件路径 | |
| CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| ROOT_DIR = os.path.dirname(CURRENT_DIR) | |
| TOKEN_FILE = os.path.join(ROOT_DIR, 'token_data.json') | |
| # --- 2. 工具函数 --- | |
| def get_beijing_now(): | |
| """获取当前的北京时间""" | |
| utc_now = datetime.now(timezone.utc) | |
| beijing_now = utc_now.astimezone(timezone(timedelta(hours=8))) | |
| return beijing_now.replace(tzinfo=None) | |
| def get_business_date(): | |
| """获取当前的营业日日期""" | |
| now = get_beijing_now() | |
| if now.time() < dt_time(6, 0): | |
| return (now - timedelta(days=1)).strftime("%Y-%m-%d") | |
| return now.strftime("%Y-%m-%d") | |
| def parse_show_datetime(date_str, time_str): | |
| """解析排片时间 (处理跨天)""" | |
| try: | |
| show_t = datetime.strptime(time_str, "%H:%M").time() | |
| base_date = datetime.strptime(date_str, "%Y-%m-%d") | |
| if show_t < dt_time(6, 0): | |
| base_date += timedelta(days=1) | |
| return datetime.combine(base_date.date(), show_t) | |
| except: | |
| return None | |
| def simplify_hall_name(raw_name): | |
| """ | |
| 影厅名称简化 | |
| 例如:'【和成天下1号厅】...' -> '1号厅' | |
| '6号激光厅' -> '6号厅' | |
| """ | |
| if not raw_name: | |
| return "未知影厅" | |
| match = re.search(r'(\d+)号', raw_name) | |
| if match: | |
| return f"{match.group(1)}号厅" | |
| return raw_name | |
| # --- 3. 微信推送模块 (含自动重试) --- | |
| class WeChatPusher: | |
| def __init__(self, app_id, app_secret): | |
| self.app_id = app_id | |
| self.app_secret = app_secret | |
| self.token = None | |
| self.token_expires_time = 0 | |
| def get_access_token(self, force_refresh=False): | |
| """获取并缓存微信 Access Token | |
| :param force_refresh: 是否强制刷新 Token | |
| """ | |
| if not force_refresh and self.token and datetime.now().timestamp() < self.token_expires_time: | |
| return self.token | |
| if not self.app_id or not self.app_secret: | |
| return None | |
| url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={self.app_id}&secret={self.app_secret}" | |
| try: | |
| resp = requests.get(url, timeout=10) | |
| data = resp.json() | |
| if 'access_token' in data: | |
| self.token = data['access_token'] | |
| # 提前 200 秒过期,防止临界点问题 | |
| self.token_expires_time = datetime.now().timestamp() + data['expires_in'] - 200 | |
| return self.token | |
| else: | |
| print(f"❌ 微信 Token 获取失败: {data}") | |
| except Exception as e: | |
| print(f"❌ 微信 API 请求异常: {e}") | |
| return None | |
| def send_template_message(self, user_ids, template_id, data_map): | |
| """发送模板消息给多个用户 (带 40001 错误重试机制)""" | |
| # 组装微信数据格式 | |
| formatted_data = {} | |
| for key, value in data_map.items(): | |
| formatted_data[key] = {"value": value, "color": "#173177"} | |
| # 清洗 user_ids | |
| valid_ids = [uid.strip() for uid in user_ids if uid and uid.strip()] | |
| if not valid_ids: | |
| return False | |
| success_any = False | |
| for user_id in valid_ids: | |
| # 每个用户最多尝试 2 次(1次正常,1次刷新 Token 后重试) | |
| for attempt in range(2): | |
| # 如果是重试(attempt > 0),强制刷新 Token | |
| token = self.get_access_token(force_refresh=(attempt > 0)) | |
| if not token: | |
| break # Token 都拿不到,不用重试了 | |
| url = f"https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={token}" | |
| payload = { | |
| "touser": user_id, | |
| "template_id": template_id, | |
| "data": formatted_data | |
| } | |
| try: | |
| resp = requests.post(url, json=payload, timeout=10) | |
| result = resp.json() | |
| errcode = result.get("errcode") | |
| if errcode == 0: | |
| # 发送成功 | |
| # print(f"✅ 微信发送成功 [{user_id}]") | |
| success_any = True | |
| break # 跳出重试循环,处理下一个用户 | |
| # 40001: Token 无效 | |
| # 40014: Token 不合法 | |
| # 42001: Token 过期 | |
| elif errcode in [40001, 40014, 42001]: | |
| print(f"🔄 微信 Token 失效 ({errcode}),正在刷新重试 [{user_id}]...") | |
| continue # 进入下一次循环,强制刷新 Token | |
| else: | |
| print(f"❌ 微信发送失败 [{user_id}]: {result}") | |
| break # 其他错误,不重试 | |
| except Exception as e: | |
| print(f"❌ 微信发送异常 [{user_id}]: {e}") | |
| break | |
| return success_any | |
| # --- 4. API 管理模块 --- | |
| class APIManager: | |
| def __init__(self, logger): | |
| self.last_login_fail_time = 0 | |
| self.logger = logger | |
| def save_token(self, token_data): | |
| try: | |
| with open(TOKEN_FILE, 'w', encoding='utf-8') as f: | |
| json.dump(token_data, f, ensure_ascii=False, indent=4) | |
| except: | |
| pass | |
| def load_token(self): | |
| if os.path.exists(TOKEN_FILE): | |
| try: | |
| with open(TOKEN_FILE, 'r', encoding='utf-8') as f: | |
| return json.load(f).get('token') | |
| except: | |
| pass | |
| return None | |
| def login(self): | |
| if time.time() - self.last_login_fail_time < 600: | |
| self.logger("⏳ 登录冷却中,跳过重试。") | |
| return None | |
| self.logger("🔄 尝试后台自动登录...") | |
| username = os.getenv("CINEMA_USERNAME") | |
| password = os.getenv("CINEMA_PASSWORD") | |
| res_code = os.getenv("CINEMA_RES_CODE") | |
| device_id = os.getenv("CINEMA_DEVICE_ID") | |
| if not all([username, password, res_code]): | |
| self.logger("❌ 环境变量缺失,无法自动登录") | |
| return None | |
| session = requests.Session() | |
| session.headers.update({'User-Agent': 'Mozilla/5.0'}) | |
| login_url = 'https://app.bi.piao51.cn/cinema-app/credential/login.action' | |
| login_data = { | |
| 'username': username, 'password': password, 'type': '1', | |
| 'resCode': res_code, 'deviceid': device_id, 'dtype': 'ios', | |
| } | |
| try: | |
| session.post(login_url, data=login_data, timeout=15) | |
| resp = session.get('https://app.bi.piao51.cn/cinema-app/security/logined.action', timeout=10) | |
| info = resp.json() | |
| if info.get("success") and info.get("data", {}).get("token"): | |
| self.save_token(info['data']) | |
| self.logger("✅ 登录成功。") | |
| return info['data']['token'] | |
| else: | |
| raise Exception("未获取到Token") | |
| except Exception as e: | |
| self.last_login_fail_time = time.time() | |
| self.logger(f"❌ 登录失败: {e}") | |
| return None | |
| def fetch_schedule(self, date_str): | |
| token = self.load_token() | |
| if not token: | |
| token = self.login() | |
| if not token: return None | |
| url = 'https://cawapi.yinghezhong.com/showInfo/getHallShowInfo' | |
| params = {'showDate': date_str, 'token': token, '_': int(time.time() * 1000)} | |
| headers = {'Origin': 'https://caw.yinghezhong.com', 'User-Agent': 'Mozilla/5.0'} | |
| try: | |
| response = requests.get(url, params=params, headers=headers, timeout=15) | |
| data = response.json() | |
| if data.get('code') == 1: | |
| return data.get('data', []) | |
| elif data.get('code') == 500: | |
| self.logger("⚠️ Token失效,重试中...") | |
| token = self.login() | |
| if token: | |
| params['token'] = token | |
| response = requests.get(url, params=params, headers=headers, timeout=15) | |
| return response.json().get('data', []) | |
| return None | |
| except Exception as e: | |
| self.logger(f"API请求异常: {e}") | |
| return None | |
| # --- 5. 监控服务主逻辑 --- | |
| class CinemaMonitor: | |
| def __init__(self): | |
| self.logs = deque(maxlen=50) | |
| self.status_text = "初始化中..." | |
| self.next_wakeup = None | |
| self.monitored_sessions = [] | |
| self.api = APIManager(self.log) | |
| self.wx_pusher = WeChatPusher(WX_APP_ID, WX_APP_SECRET) | |
| self.current_business_date = None | |
| self.daily_schedule_cache = None | |
| self.zero_ticket_candidates = set() | |
| self.alerted_sessions = set() | |
| self.last_daily_report_date = None | |
| # 统计数据 (用于每日报告) | |
| self.stats = { | |
| "zero_total": 0, # 纳入监控的0票总场次 | |
| "alert_count": 0, # 触发通知次数 | |
| "api_fails": 0, # API获取失败次数 | |
| "notify_fails": 0 # 发送通知失败次数 | |
| } | |
| self.thread = threading.Thread(target=self._run_loop, daemon=True) | |
| self.thread.start() | |
| def log(self, msg): | |
| timestamp = get_beijing_now().strftime("%H:%M:%S") | |
| entry = f"[{timestamp}] {msg}" | |
| print(entry) | |
| self.logs.appendleft(entry) | |
| def _send_pushover(self, title, message, priority=0): | |
| """发送 Pushover 通知""" | |
| if not PUSHOVER_TOKEN or not PUSHOVER_USER: | |
| return False | |
| url = "https://api.pushover.net/1/messages.json" | |
| data = { | |
| "token": PUSHOVER_TOKEN, | |
| "user": PUSHOVER_USER, | |
| "title": title, | |
| "message": message, | |
| "sound": "pushover", | |
| "priority": priority | |
| } | |
| try: | |
| response = requests.post(url, data=data, timeout=10) | |
| if response.status_code == 200: | |
| return True | |
| else: | |
| print(f"❌ Pushover 拒绝: {response.status_code}") | |
| return False | |
| except Exception as e: | |
| print(f"❌ Pushover 网络异常: {e}") | |
| return False | |
| def send_ticket_alert(self, hall_name, movie_name, show_time, ticket_count): | |
| """ | |
| 发送突发购票告警 (Pushover + WeChat) | |
| """ | |
| # 1. Pushover | |
| title = f"🎟 发现 {hall_name} {show_time} 场次有票!" | |
| msg = f"{hall_name} {show_time}《{movie_name}》\n⚠️ 突增至 {ticket_count} 张" | |
| if not self._send_pushover(title, msg, priority=1): | |
| self.stats["notify_fails"] += 1 | |
| # 2. WeChat | |
| if WX_TEMPLATE_ID_TICKET: | |
| data = { | |
| "first": f"发现 {hall_name} {show_time} 场次有票!\n{hall_name} {show_time}《{movie_name}》\n⚠️ 突增至 {ticket_count} 张\n", | |
| "keyword1": hall_name, | |
| "keyword2": movie_name, | |
| "keyword3": show_time, | |
| "keyword4": f"{ticket_count} 张", | |
| "remark": "\n请尽快处理。" | |
| } | |
| # 如果所有用户发送都失败才算失败 | |
| if not self.wx_pusher.send_template_message(WX_USER_OPEN_IDS, WX_TEMPLATE_ID_TICKET, data): | |
| self.stats["notify_fails"] += 1 | |
| def send_daily_report(self, date_str): | |
| """ | |
| 发送每日统计报告 (Pushover + WeChat) | |
| """ | |
| # 1. Pushover | |
| stats_text_full = ( | |
| f"0票总场次:{self.stats['zero_total']}\n" | |
| f"触发通知次数:{self.stats['alert_count']}\n" | |
| f"发送通知失败次数:{self.stats['notify_fails']}\n" | |
| f"API获取失败次数:{self.stats['api_fails']}" | |
| ) | |
| title = "🟢 0票场次开场前10分钟内突发购票检查就绪" | |
| msg = ( | |
| f"{date_str},今日排片数据已加载,开始智能检查。\n" | |
| f"昨日情况:\n{stats_text_full}" | |
| ) | |
| if not self._send_pushover(title, msg, priority=0): | |
| self.stats["notify_fails"] += 1 | |
| # 2. WeChat | |
| if WX_TEMPLATE_ID_DAILY: | |
| # 格式化简短的昨日数据,避免微信字段超长 | |
| stats_short = ( | |
| f"0票场次({self.stats['zero_total']}) " | |
| f"触发({self.stats['alert_count']}) " | |
| f"s失败({self.stats['notify_fails']}) " | |
| f"g失败({self.stats['api_fails']})" | |
| ) | |
| data = { | |
| "first": f"0票场次开场前10分钟内突发购票检查就绪\n{date_str},今日排片数据已加载,开始智能检查。\n", | |
| "keyword1": "0票场次开场前10分钟内突发购票", | |
| "keyword2": date_str, | |
| "keyword3": stats_short, | |
| "remark": "\n服务正常运行中。" | |
| } | |
| if not self.wx_pusher.send_template_message(WX_USER_OPEN_IDS, WX_TEMPLATE_ID_DAILY, data): | |
| self.stats["notify_fails"] += 1 | |
| def _run_loop(self): | |
| self.log("🚀 监控服务已启动 (WeChat + Pushover)") | |
| while True: | |
| try: | |
| now = get_beijing_now() | |
| biz_date = get_business_date() | |
| today_str = now.strftime("%Y-%m-%d") | |
| # --- 每日健康检查 (09:00) --- | |
| if now.time() >= dt_time(9, 0) and self.last_daily_report_date != today_str: | |
| report_date_str = now.strftime("%Y年%m月%d日") | |
| self.log(f"🔔 发送每日统计报告: {today_str}") | |
| # 发送报告 | |
| self.send_daily_report(report_date_str) | |
| # 重置统计数据 & 更新日期 | |
| self.last_daily_report_date = today_str | |
| self.stats = { | |
| "zero_total": 0, "alert_count": 0, | |
| "api_fails": 0, "notify_fails": 0 | |
| } | |
| # 休眠逻辑 (06:00 - 09:30) | |
| start_check_time = now.replace(hour=9, minute=30, second=0, microsecond=0) | |
| is_early_morning = (dt_time(6, 0) <= now.time() < dt_time(9, 30)) | |
| if is_early_morning: | |
| self.status_text = "非监控时段 (等待 09:30)" | |
| self.next_wakeup = start_check_time | |
| self.zero_ticket_candidates.clear() | |
| self.alerted_sessions.clear() | |
| time.sleep(60) | |
| continue | |
| # 缓存刷新 | |
| if self.daily_schedule_cache is None or self.current_business_date != biz_date: | |
| self.log(f"📅 获取 {biz_date} 全天排片...") | |
| schedule = self.api.fetch_schedule(biz_date) | |
| if schedule is None: | |
| self.log("❌ 初始化获取失败,1分钟后重试") | |
| self.stats["api_fails"] += 1 | |
| time.sleep(60) | |
| continue | |
| self.daily_schedule_cache = schedule | |
| self.current_business_date = biz_date | |
| self.zero_ticket_candidates.clear() | |
| self.alerted_sessions.clear() | |
| self.log(f"✅ 数据已更新,共 {len(schedule)} 场") | |
| # 筛选窗口期 | |
| active_check_needed = False | |
| min_sleep_seconds = 3600 | |
| sessions_in_window = [] | |
| current_schedule = self.daily_schedule_cache | |
| for item in current_schedule: | |
| start_time_str = item.get('showStartTime') | |
| if not start_time_str: continue | |
| show_dt = parse_show_datetime(biz_date, start_time_str) | |
| if not show_dt: continue | |
| if now >= show_dt: continue # 已开场 | |
| monitor_start_dt = show_dt - timedelta(minutes=11) | |
| if now >= monitor_start_dt: | |
| sessions_in_window.append({'data': item, 'dt': show_dt}) | |
| active_check_needed = True | |
| else: | |
| seconds_until_window = (monitor_start_dt - now).total_seconds() | |
| if seconds_until_window < min_sleep_seconds: | |
| min_sleep_seconds = seconds_until_window | |
| # 执行监控 | |
| if active_check_needed: | |
| self.status_text = "🔥 监控进行中" | |
| self.next_wakeup = now + timedelta(seconds=60) | |
| realtime_schedule = self.api.fetch_schedule(biz_date) | |
| if realtime_schedule is None: | |
| self.log("⚠️ API请求失败,跳过本次判定") | |
| self.stats["api_fails"] += 1 | |
| time.sleep(60) | |
| continue | |
| realtime_map = {f"{x.get('hallId')}_{x.get('showStartTime')}": x for x in realtime_schedule} | |
| display_monitors = [] | |
| for session in sessions_in_window: | |
| key = f"{session['data'].get('hallId')}_{session['data'].get('showStartTime')}" | |
| unique_id = f"{biz_date}_{key}" | |
| latest = realtime_map.get(key) | |
| if not latest: continue | |
| movie = latest.get('movieName') | |
| hall_raw = latest.get('hallName') | |
| # 影厅名简化 | |
| hall_short = simplify_hall_name(hall_raw) | |
| sold = int(latest.get('soldTicketNum', 0)) | |
| start = latest.get('showStartTime') | |
| if sold == 0: | |
| # 统计新加入的0票场次 | |
| if unique_id not in self.zero_ticket_candidates: | |
| self.stats["zero_total"] += 1 | |
| self.zero_ticket_candidates.add(unique_id) | |
| display_monitors.append(f"👁️ {start} {movie} (0票)") | |
| else: | |
| if unique_id in self.zero_ticket_candidates: | |
| if unique_id not in self.alerted_sessions: | |
| self.log(f"🚨 突发购票!{hall_short}《{movie}》") | |
| # 统计触发次数 | |
| self.stats["alert_count"] += 1 | |
| # 发送通知 (Pushover + WeChat) | |
| self.send_ticket_alert(hall_short, movie, start, sold) | |
| self.alerted_sessions.add(unique_id) | |
| self.zero_ticket_candidates.remove(unique_id) | |
| display_monitors.append(f"✅ {start} {movie} (新售出)") | |
| else: | |
| display_monitors.append(f"🛡️ {start} {movie} (原有票)") | |
| self.monitored_sessions = display_monitors | |
| time.sleep(60) | |
| else: | |
| if 0 < min_sleep_seconds < 86400: | |
| wakeup_dt = now + timedelta(seconds=min_sleep_seconds) | |
| self.status_text = "💤 智能休眠中" | |
| self.next_wakeup = wakeup_dt | |
| self.monitored_sessions = [] | |
| self.log(f"休眠 {min_sleep_seconds / 60:.1f} 分钟,直到 {wakeup_dt.strftime('%H:%M')}") | |
| time.sleep(min_sleep_seconds) | |
| else: | |
| self.log("今日监控结束,长休眠") | |
| time.sleep(300) | |
| except Exception as e: | |
| self.log(f"主循环异常: {e}") | |
| time.sleep(60) | |
| # --- 6. Streamlit 前端 --- | |
| def get_monitor(): | |
| return CinemaMonitor() | |
| def main(): | |
| monitor = get_monitor() | |
| # 自动刷新:30秒 | |
| if st_autorefresh: | |
| st_autorefresh(interval=30 * 1000, key="monitor_refresh") | |
| st.title("📢 零票影前十分钟内突发购票检查") | |
| c1, c2, c3 = st.columns(3) | |
| with c1: | |
| st.metric("运行状态", monitor.status_text) | |
| with c2: | |
| wakeup_str = monitor.next_wakeup.strftime("%H:%M:%S") if monitor.next_wakeup else "--" | |
| st.metric("下次唤醒", wakeup_str) | |
| # --- 统计逻辑 --- | |
| remaining_zero_count = 0 | |
| remaining_total_count = 0 | |
| if monitor.daily_schedule_cache: | |
| now = get_beijing_now() | |
| biz_date = monitor.current_business_date or get_business_date() | |
| for item in monitor.daily_schedule_cache: | |
| start_str = item.get('showStartTime') | |
| if not start_str: continue | |
| # 解析时间 | |
| show_dt = parse_show_datetime(biz_date, start_str) | |
| # 只统计“当前时间之后”的场次 | |
| if show_dt and show_dt > now: | |
| remaining_total_count += 1 | |
| if int(item.get('soldTicketNum', 0)) == 0: | |
| remaining_zero_count += 1 | |
| with c3: | |
| st.metric("剩余 0 票场次 / 剩余总场次", f"{remaining_zero_count} / {remaining_total_count}") | |
| st.divider() | |
| col_logs, col_list = st.columns([3, 2]) | |
| with col_logs: | |
| st.subheader("📜 运行日志") | |
| log_text = "\n".join(list(monitor.logs)) | |
| st.text_area("Logs", log_text, height=500, disabled=True) | |
| with col_list: | |
| st.subheader("🎯 当前监控窗口 (开场前10分)") | |
| if monitor.monitored_sessions: | |
| for s in monitor.monitored_sessions: | |
| if "✅" in s: | |
| st.success(s) | |
| elif "👁️" in s: | |
| st.error(s) | |
| else: | |
| st.info(s) | |
| else: | |
| st.caption("暂无临近开场的监控目标。") | |
| if __name__ == "__main__": | |
| main() |