hengdian / pages /monitor.py
Ethscriptions's picture
Update pages/monitor.py
e00dc2a verified
import streamlit as st
import requests
import time
import json
import os
import threading
import re
import urllib3
from datetime import datetime, timedelta, timezone, time as dt_time
from collections import deque
from dotenv import load_dotenv
# --- 0. 基础配置 ---
st.set_page_config(page_title="影城防撤场监控系统", page_icon="🛡️", layout="wide")
# 屏蔽 HTTPS 证书警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 尝试导入自动刷新组件
try:
from streamlit_autorefresh import st_autorefresh
except ImportError:
st_autorefresh = None
# --- 1. 配置与常量 ---
load_dotenv()
# Pushover 配置
PUSHOVER_TOKEN = os.getenv("PUSHOVER_TOKEN")
PUSHOVER_USER = os.getenv("PUSHOVER_USER")
# 微信配置
WX_APP_ID = os.getenv("WX_APP_ID")
WX_APP_SECRET = os.getenv("WX_APP_SECRET")
# 支持多个用户,逗号分隔
WX_USER_OPEN_IDS = os.getenv("WX_USER_OPEN_ID", "").split(',')
WX_TEMPLATE_ID_TICKET = os.getenv("WX_TEMPLATE_ID_TICKET")
WX_TEMPLATE_ID_DAILY = os.getenv("WX_TEMPLATE_ID_DAILY")
# 影城配置
CINEMA_ID = os.getenv("CINEMA_ID")
# 获取 Token 文件路径
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(CURRENT_DIR)
TOKEN_FILE = os.path.join(ROOT_DIR, 'token_data.json')
# --- 2. 工具函数 ---
def get_beijing_now():
"""获取当前的北京时间"""
utc_now = datetime.now(timezone.utc)
beijing_now = utc_now.astimezone(timezone(timedelta(hours=8)))
return beijing_now.replace(tzinfo=None)
def get_business_date():
"""获取当前的营业日日期"""
now = get_beijing_now()
if now.time() < dt_time(6, 0):
return (now - timedelta(days=1)).strftime("%Y-%m-%d")
return now.strftime("%Y-%m-%d")
def parse_show_datetime(date_str, time_str):
"""解析排片时间 (处理跨天)"""
try:
show_t = datetime.strptime(time_str, "%H:%M").time()
base_date = datetime.strptime(date_str, "%Y-%m-%d")
if show_t < dt_time(6, 0):
base_date += timedelta(days=1)
return datetime.combine(base_date.date(), show_t)
except:
return None
def simplify_hall_name(raw_name):
"""
影厅名称简化
例如:'【和成天下1号厅】...' -> '1号厅'
'6号激光厅' -> '6号厅'
"""
if not raw_name:
return "未知影厅"
match = re.search(r'(\d+)号', raw_name)
if match:
return f"{match.group(1)}号厅"
return raw_name
# --- 3. 微信推送模块 (含自动重试) ---
class WeChatPusher:
def __init__(self, app_id, app_secret):
self.app_id = app_id
self.app_secret = app_secret
self.token = None
self.token_expires_time = 0
def get_access_token(self, force_refresh=False):
"""获取并缓存微信 Access Token
:param force_refresh: 是否强制刷新 Token
"""
if not force_refresh and self.token and datetime.now().timestamp() < self.token_expires_time:
return self.token
if not self.app_id or not self.app_secret:
return None
url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={self.app_id}&secret={self.app_secret}"
try:
resp = requests.get(url, timeout=10)
data = resp.json()
if 'access_token' in data:
self.token = data['access_token']
# 提前 200 秒过期,防止临界点问题
self.token_expires_time = datetime.now().timestamp() + data['expires_in'] - 200
return self.token
else:
print(f"❌ 微信 Token 获取失败: {data}")
except Exception as e:
print(f"❌ 微信 API 请求异常: {e}")
return None
def send_template_message(self, user_ids, template_id, data_map):
"""发送模板消息给多个用户 (带 40001 错误重试机制)"""
# 组装微信数据格式
formatted_data = {}
for key, value in data_map.items():
formatted_data[key] = {"value": value, "color": "#173177"}
# 清洗 user_ids
valid_ids = [uid.strip() for uid in user_ids if uid and uid.strip()]
if not valid_ids:
return False
success_any = False
for user_id in valid_ids:
# 每个用户最多尝试 2 次(1次正常,1次刷新 Token 后重试)
for attempt in range(2):
# 如果是重试(attempt > 0),强制刷新 Token
token = self.get_access_token(force_refresh=(attempt > 0))
if not token:
break # Token 都拿不到,不用重试了
url = f"https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={token}"
payload = {
"touser": user_id,
"template_id": template_id,
"data": formatted_data
}
try:
resp = requests.post(url, json=payload, timeout=10)
result = resp.json()
errcode = result.get("errcode")
if errcode == 0:
# 发送成功
# print(f"✅ 微信发送成功 [{user_id}]")
success_any = True
break # 跳出重试循环,处理下一个用户
# 40001: Token 无效
# 40014: Token 不合法
# 42001: Token 过期
elif errcode in [40001, 40014, 42001]:
print(f"🔄 微信 Token 失效 ({errcode}),正在刷新重试 [{user_id}]...")
continue # 进入下一次循环,强制刷新 Token
else:
print(f"❌ 微信发送失败 [{user_id}]: {result}")
break # 其他错误,不重试
except Exception as e:
print(f"❌ 微信发送异常 [{user_id}]: {e}")
break
return success_any
# --- 4. API 管理模块 ---
class APIManager:
def __init__(self, logger):
self.last_login_fail_time = 0
self.logger = logger
def save_token(self, token_data):
try:
with open(TOKEN_FILE, 'w', encoding='utf-8') as f:
json.dump(token_data, f, ensure_ascii=False, indent=4)
except:
pass
def load_token(self):
if os.path.exists(TOKEN_FILE):
try:
with open(TOKEN_FILE, 'r', encoding='utf-8') as f:
return json.load(f).get('token')
except:
pass
return None
def login(self):
if time.time() - self.last_login_fail_time < 600:
self.logger("⏳ 登录冷却中,跳过重试。")
return None
self.logger("🔄 尝试后台自动登录...")
username = os.getenv("CINEMA_USERNAME")
password = os.getenv("CINEMA_PASSWORD")
res_code = os.getenv("CINEMA_RES_CODE")
device_id = os.getenv("CINEMA_DEVICE_ID")
if not all([username, password, res_code]):
self.logger("❌ 环境变量缺失,无法自动登录")
return None
session = requests.Session()
session.headers.update({'User-Agent': 'Mozilla/5.0'})
login_url = 'https://app.bi.piao51.cn/cinema-app/credential/login.action'
login_data = {
'username': username, 'password': password, 'type': '1',
'resCode': res_code, 'deviceid': device_id, 'dtype': 'ios',
}
try:
session.post(login_url, data=login_data, timeout=15)
resp = session.get('https://app.bi.piao51.cn/cinema-app/security/logined.action', timeout=10)
info = resp.json()
if info.get("success") and info.get("data", {}).get("token"):
self.save_token(info['data'])
self.logger("✅ 登录成功。")
return info['data']['token']
else:
raise Exception("未获取到Token")
except Exception as e:
self.last_login_fail_time = time.time()
self.logger(f"❌ 登录失败: {e}")
return None
def fetch_schedule(self, date_str):
token = self.load_token()
if not token:
token = self.login()
if not token: return None
url = 'https://cawapi.yinghezhong.com/showInfo/getHallShowInfo'
params = {'showDate': date_str, 'token': token, '_': int(time.time() * 1000)}
headers = {'Origin': 'https://caw.yinghezhong.com', 'User-Agent': 'Mozilla/5.0'}
try:
response = requests.get(url, params=params, headers=headers, timeout=15)
data = response.json()
if data.get('code') == 1:
return data.get('data', [])
elif data.get('code') == 500:
self.logger("⚠️ Token失效,重试中...")
token = self.login()
if token:
params['token'] = token
response = requests.get(url, params=params, headers=headers, timeout=15)
return response.json().get('data', [])
return None
except Exception as e:
self.logger(f"API请求异常: {e}")
return None
# --- 5. 监控服务主逻辑 ---
class CinemaMonitor:
def __init__(self):
self.logs = deque(maxlen=50)
self.status_text = "初始化中..."
self.next_wakeup = None
self.monitored_sessions = []
self.api = APIManager(self.log)
self.wx_pusher = WeChatPusher(WX_APP_ID, WX_APP_SECRET)
self.current_business_date = None
self.daily_schedule_cache = None
self.zero_ticket_candidates = set()
self.alerted_sessions = set()
self.last_daily_report_date = None
# 统计数据 (用于每日报告)
self.stats = {
"zero_total": 0, # 纳入监控的0票总场次
"alert_count": 0, # 触发通知次数
"api_fails": 0, # API获取失败次数
"notify_fails": 0 # 发送通知失败次数
}
self.thread = threading.Thread(target=self._run_loop, daemon=True)
self.thread.start()
def log(self, msg):
timestamp = get_beijing_now().strftime("%H:%M:%S")
entry = f"[{timestamp}] {msg}"
print(entry)
self.logs.appendleft(entry)
def _send_pushover(self, title, message, priority=0):
"""发送 Pushover 通知"""
if not PUSHOVER_TOKEN or not PUSHOVER_USER:
return False
url = "https://api.pushover.net/1/messages.json"
data = {
"token": PUSHOVER_TOKEN,
"user": PUSHOVER_USER,
"title": title,
"message": message,
"sound": "pushover",
"priority": priority
}
try:
response = requests.post(url, data=data, timeout=10)
if response.status_code == 200:
return True
else:
print(f"❌ Pushover 拒绝: {response.status_code}")
return False
except Exception as e:
print(f"❌ Pushover 网络异常: {e}")
return False
def send_ticket_alert(self, hall_name, movie_name, show_time, ticket_count):
"""
发送突发购票告警 (Pushover + WeChat)
"""
# 1. Pushover
title = f"🎟 发现 {hall_name} {show_time} 场次有票!"
msg = f"{hall_name} {show_time}{movie_name}》\n⚠️ 突增至 {ticket_count} 张"
if not self._send_pushover(title, msg, priority=1):
self.stats["notify_fails"] += 1
# 2. WeChat
if WX_TEMPLATE_ID_TICKET:
data = {
"first": f"发现 {hall_name} {show_time} 场次有票!\n{hall_name} {show_time}{movie_name}》\n⚠️ 突增至 {ticket_count} 张\n",
"keyword1": hall_name,
"keyword2": movie_name,
"keyword3": show_time,
"keyword4": f"{ticket_count} 张",
"remark": "\n请尽快处理。"
}
# 如果所有用户发送都失败才算失败
if not self.wx_pusher.send_template_message(WX_USER_OPEN_IDS, WX_TEMPLATE_ID_TICKET, data):
self.stats["notify_fails"] += 1
def send_daily_report(self, date_str):
"""
发送每日统计报告 (Pushover + WeChat)
"""
# 1. Pushover
stats_text_full = (
f"0票总场次:{self.stats['zero_total']}\n"
f"触发通知次数:{self.stats['alert_count']}\n"
f"发送通知失败次数:{self.stats['notify_fails']}\n"
f"API获取失败次数:{self.stats['api_fails']}"
)
title = "🟢 0票场次开场前10分钟内突发购票检查就绪"
msg = (
f"{date_str},今日排片数据已加载,开始智能检查。\n"
f"昨日情况:\n{stats_text_full}"
)
if not self._send_pushover(title, msg, priority=0):
self.stats["notify_fails"] += 1
# 2. WeChat
if WX_TEMPLATE_ID_DAILY:
# 格式化简短的昨日数据,避免微信字段超长
stats_short = (
f"0票场次({self.stats['zero_total']}) "
f"触发({self.stats['alert_count']}) "
f"s失败({self.stats['notify_fails']}) "
f"g失败({self.stats['api_fails']})"
)
data = {
"first": f"0票场次开场前10分钟内突发购票检查就绪\n{date_str},今日排片数据已加载,开始智能检查。\n",
"keyword1": "0票场次开场前10分钟内突发购票",
"keyword2": date_str,
"keyword3": stats_short,
"remark": "\n服务正常运行中。"
}
if not self.wx_pusher.send_template_message(WX_USER_OPEN_IDS, WX_TEMPLATE_ID_DAILY, data):
self.stats["notify_fails"] += 1
def _run_loop(self):
self.log("🚀 监控服务已启动 (WeChat + Pushover)")
while True:
try:
now = get_beijing_now()
biz_date = get_business_date()
today_str = now.strftime("%Y-%m-%d")
# --- 每日健康检查 (09:00) ---
if now.time() >= dt_time(9, 0) and self.last_daily_report_date != today_str:
report_date_str = now.strftime("%Y年%m月%d日")
self.log(f"🔔 发送每日统计报告: {today_str}")
# 发送报告
self.send_daily_report(report_date_str)
# 重置统计数据 & 更新日期
self.last_daily_report_date = today_str
self.stats = {
"zero_total": 0, "alert_count": 0,
"api_fails": 0, "notify_fails": 0
}
# 休眠逻辑 (06:00 - 09:30)
start_check_time = now.replace(hour=9, minute=30, second=0, microsecond=0)
is_early_morning = (dt_time(6, 0) <= now.time() < dt_time(9, 30))
if is_early_morning:
self.status_text = "非监控时段 (等待 09:30)"
self.next_wakeup = start_check_time
self.zero_ticket_candidates.clear()
self.alerted_sessions.clear()
time.sleep(60)
continue
# 缓存刷新
if self.daily_schedule_cache is None or self.current_business_date != biz_date:
self.log(f"📅 获取 {biz_date} 全天排片...")
schedule = self.api.fetch_schedule(biz_date)
if schedule is None:
self.log("❌ 初始化获取失败,1分钟后重试")
self.stats["api_fails"] += 1
time.sleep(60)
continue
self.daily_schedule_cache = schedule
self.current_business_date = biz_date
self.zero_ticket_candidates.clear()
self.alerted_sessions.clear()
self.log(f"✅ 数据已更新,共 {len(schedule)} 场")
# 筛选窗口期
active_check_needed = False
min_sleep_seconds = 3600
sessions_in_window = []
current_schedule = self.daily_schedule_cache
for item in current_schedule:
start_time_str = item.get('showStartTime')
if not start_time_str: continue
show_dt = parse_show_datetime(biz_date, start_time_str)
if not show_dt: continue
if now >= show_dt: continue # 已开场
monitor_start_dt = show_dt - timedelta(minutes=11)
if now >= monitor_start_dt:
sessions_in_window.append({'data': item, 'dt': show_dt})
active_check_needed = True
else:
seconds_until_window = (monitor_start_dt - now).total_seconds()
if seconds_until_window < min_sleep_seconds:
min_sleep_seconds = seconds_until_window
# 执行监控
if active_check_needed:
self.status_text = "🔥 监控进行中"
self.next_wakeup = now + timedelta(seconds=60)
realtime_schedule = self.api.fetch_schedule(biz_date)
if realtime_schedule is None:
self.log("⚠️ API请求失败,跳过本次判定")
self.stats["api_fails"] += 1
time.sleep(60)
continue
realtime_map = {f"{x.get('hallId')}_{x.get('showStartTime')}": x for x in realtime_schedule}
display_monitors = []
for session in sessions_in_window:
key = f"{session['data'].get('hallId')}_{session['data'].get('showStartTime')}"
unique_id = f"{biz_date}_{key}"
latest = realtime_map.get(key)
if not latest: continue
movie = latest.get('movieName')
hall_raw = latest.get('hallName')
# 影厅名简化
hall_short = simplify_hall_name(hall_raw)
sold = int(latest.get('soldTicketNum', 0))
start = latest.get('showStartTime')
if sold == 0:
# 统计新加入的0票场次
if unique_id not in self.zero_ticket_candidates:
self.stats["zero_total"] += 1
self.zero_ticket_candidates.add(unique_id)
display_monitors.append(f"👁️ {start} {movie} (0票)")
else:
if unique_id in self.zero_ticket_candidates:
if unique_id not in self.alerted_sessions:
self.log(f"🚨 突发购票!{hall_short}{movie}》")
# 统计触发次数
self.stats["alert_count"] += 1
# 发送通知 (Pushover + WeChat)
self.send_ticket_alert(hall_short, movie, start, sold)
self.alerted_sessions.add(unique_id)
self.zero_ticket_candidates.remove(unique_id)
display_monitors.append(f"✅ {start} {movie} (新售出)")
else:
display_monitors.append(f"🛡️ {start} {movie} (原有票)")
self.monitored_sessions = display_monitors
time.sleep(60)
else:
if 0 < min_sleep_seconds < 86400:
wakeup_dt = now + timedelta(seconds=min_sleep_seconds)
self.status_text = "💤 智能休眠中"
self.next_wakeup = wakeup_dt
self.monitored_sessions = []
self.log(f"休眠 {min_sleep_seconds / 60:.1f} 分钟,直到 {wakeup_dt.strftime('%H:%M')}")
time.sleep(min_sleep_seconds)
else:
self.log("今日监控结束,长休眠")
time.sleep(300)
except Exception as e:
self.log(f"主循环异常: {e}")
time.sleep(60)
# --- 6. Streamlit 前端 ---
@st.cache_resource
def get_monitor():
return CinemaMonitor()
def main():
monitor = get_monitor()
# 自动刷新:30秒
if st_autorefresh:
st_autorefresh(interval=30 * 1000, key="monitor_refresh")
st.title("📢 零票影前十分钟内突发购票检查")
c1, c2, c3 = st.columns(3)
with c1:
st.metric("运行状态", monitor.status_text)
with c2:
wakeup_str = monitor.next_wakeup.strftime("%H:%M:%S") if monitor.next_wakeup else "--"
st.metric("下次唤醒", wakeup_str)
# --- 统计逻辑 ---
remaining_zero_count = 0
remaining_total_count = 0
if monitor.daily_schedule_cache:
now = get_beijing_now()
biz_date = monitor.current_business_date or get_business_date()
for item in monitor.daily_schedule_cache:
start_str = item.get('showStartTime')
if not start_str: continue
# 解析时间
show_dt = parse_show_datetime(biz_date, start_str)
# 只统计“当前时间之后”的场次
if show_dt and show_dt > now:
remaining_total_count += 1
if int(item.get('soldTicketNum', 0)) == 0:
remaining_zero_count += 1
with c3:
st.metric("剩余 0 票场次 / 剩余总场次", f"{remaining_zero_count} / {remaining_total_count}")
st.divider()
col_logs, col_list = st.columns([3, 2])
with col_logs:
st.subheader("📜 运行日志")
log_text = "\n".join(list(monitor.logs))
st.text_area("Logs", log_text, height=500, disabled=True)
with col_list:
st.subheader("🎯 当前监控窗口 (开场前10分)")
if monitor.monitored_sessions:
for s in monitor.monitored_sessions:
if "✅" in s:
st.success(s)
elif "👁️" in s:
st.error(s)
else:
st.info(s)
else:
st.caption("暂无临近开场的监控目标。")
if __name__ == "__main__":
main()