rongduan / app.py
Ethscriptions's picture
Update app.py
04141b7 verified
import streamlit as st
import requests
import time
import json
import os
import threading
from datetime import datetime, timedelta, timezone, time as dt_time
from collections import deque
# --- 1. 配置与常量 ---
PUSHOVER_TOKEN = "a5cynwsr19y8bdvn2y447bg3vc51fr"
PUSHOVER_USER = "uc86mdifsboas7bczuxmnoyq6qyouw"
TOKEN_FILE = 'token_data.json'
CINEMA_ID = "44001291"
# 页面配置
st.set_page_config(page_title="影城防撤场监控系统 Pro", page_icon="🛡️", layout="wide")
# --- 2. 核心时间处理 ---
def get_beijing_now():
"""获取当前的北京时间 (Naive)"""
utc_now = datetime.now(timezone.utc)
beijing_now = utc_now.astimezone(timezone(timedelta(hours=8)))
return beijing_now.replace(tzinfo=None)
def get_business_date():
"""获取当前的营业日日期"""
now = get_beijing_now()
if now.time() < dt_time(6, 0):
return (now - timedelta(days=1)).strftime("%Y-%m-%d")
return now.strftime("%Y-%m-%d")
def parse_show_datetime(date_str, time_str):
"""解析排片时间"""
try:
show_t = datetime.strptime(time_str, "%H:%M").time()
base_date = datetime.strptime(date_str, "%Y-%m-%d")
if show_t < dt_time(6, 0):
base_date += timedelta(days=1)
return datetime.combine(base_date.date(), show_t)
except:
return None
def send_pushover(title, message, priority=0):
url = "https://api.pushover.net/1/messages.json"
data = {
"token": PUSHOVER_TOKEN,
"user": PUSHOVER_USER,
"title": title,
"message": message,
"sound": "pushover",
"priority": priority
}
try:
requests.post(url, data=data, timeout=5)
except:
pass
# --- 3. API 管理模块 ---
class APIManager:
def __init__(self, logger):
self.last_login_fail_time = 0
self.logger = logger
def save_token(self, token_data):
try:
with open(TOKEN_FILE, 'w', encoding='utf-8') as f:
json.dump(token_data, f, ensure_ascii=False, indent=4)
except:
pass
def load_token(self):
if os.path.exists(TOKEN_FILE):
try:
with open(TOKEN_FILE, 'r', encoding='utf-8') as f:
return json.load(f).get('token')
except:
pass
return None
def login(self):
if time.time() - self.last_login_fail_time < 600:
self.logger("⏳ 登录冷却中,跳过重试。")
return None
self.logger("🔄 尝试后台自动登录...")
session = requests.Session()
session.headers.update({'User-Agent': 'Mozilla/5.0'})
login_url = 'https://app.bi.piao51.cn/cinema-app/credential/login.action'
login_data = {
'username': 'chenhy', 'password': '123456', 'type': '1',
'resCode': '44001291', 'deviceid': '1517bfd3f6e7fef1333', 'dtype': 'ios',
}
try:
session.post(login_url, data=login_data, timeout=15)
resp = session.get('https://app.bi.piao51.cn/cinema-app/security/logined.action', timeout=10)
info = resp.json()
if info.get("success") and info.get("data", {}).get("token"):
self.save_token(info['data'])
self.logger("✅ 登录成功。")
return info['data']['token']
else:
raise Exception("未获取到Token")
except Exception as e:
self.last_login_fail_time = time.time()
msg = f"登录失败: {e}"
self.logger(f"❌ {msg}")
send_pushover("监控异常", msg, priority=0)
return None
def fetch_schedule(self, date_str):
"""
获取排片数据。
返回 None 表示网络或API错误 (需要处理)。
返回 [] 表示当天没排片或数据为空。
"""
token = self.load_token()
if not token:
token = self.login()
if not token: return None
url = 'https://cawapi.yinghezhong.com/showInfo/getHallShowInfo'
params = {'showDate': date_str, 'token': token, '_': int(time.time() * 1000)}
headers = {'Origin': 'https://caw.yinghezhong.com', 'User-Agent': 'Mozilla/5.0'}
try:
response = requests.get(url, params=params, headers=headers, timeout=15)
data = response.json()
if data.get('code') == 1:
return data.get('data', [])
elif data.get('code') == 500:
self.logger("⚠️ Token失效,重试中...")
token = self.login()
if token:
params['token'] = token
response = requests.get(url, params=params, headers=headers, timeout=15)
return response.json().get('data', [])
return None # 其他错误码视为失败
except Exception as e:
self.logger(f"API请求异常: {e}")
return None # 网络异常返回 None
# --- 4. 监控服务主逻辑 (修复版) ---
class CinemaMonitor:
def __init__(self):
self.logs = deque(maxlen=50)
self.status_text = "初始化中..."
self.next_wakeup = None
self.monitored_sessions = []
self.api = APIManager(self.log)
self.current_business_date = None
self.daily_schedule_cache = None
self.zero_ticket_candidates = set()
self.alerted_sessions = set()
self.simulation_mode = False
self.simulation_data = []
self.thread = threading.Thread(target=self._run_loop, daemon=True)
self.thread.start()
def log(self, msg):
timestamp = get_beijing_now().strftime("%H:%M:%S")
entry = f"[{timestamp}] {msg}"
print(entry)
self.logs.appendleft(entry)
def _run_loop(self):
self.log("🚀 监控服务已启动 (严格实时数据版)")
while True:
try:
now = get_beijing_now()
biz_date = get_business_date()
# --- 1. 非营业时间休眠 ---
start_check_time = now.replace(hour=9, minute=30, second=0, microsecond=0)
is_early_morning = (dt_time(6,0) <= now.time() < dt_time(9,30))
if is_early_morning and not self.simulation_mode:
self.status_text = "非监控时段 (等待 09:30)"
self.next_wakeup = start_check_time
self.zero_ticket_candidates.clear()
self.alerted_sessions.clear()
sleep_sec = (start_check_time - now).total_seconds()
self.log(f"💤 系统休眠中... 预计唤醒: {start_check_time.strftime('%H:%M')}")
time.sleep(min(sleep_sec, 300))
continue
# --- 2. 缓存刷新 ---
if self.daily_schedule_cache is None or self.current_business_date != biz_date:
self.log(f"📅 获取 {biz_date} 全天排片...")
schedule = self.api.fetch_schedule(biz_date)
if schedule is None:
self.log("❌ 初始化获取失败,1分钟后重试")
time.sleep(60)
continue
self.daily_schedule_cache = schedule
self.current_business_date = biz_date
self.zero_ticket_candidates.clear()
self.alerted_sessions.clear()
self.log(f"✅ 数据已更新,共 {len(schedule)} 场")
# --- 3. 筛选窗口期场次 ---
active_check_needed = False
min_sleep_seconds = 3600
sessions_in_window = []
current_schedule = self.daily_schedule_cache + self.simulation_data
for item in current_schedule:
start_time_str = item.get('showStartTime')
if not start_time_str: continue
show_dt = parse_show_datetime(biz_date, start_time_str)
if not show_dt: continue
if now >= show_dt: continue
monitor_start_dt = show_dt - timedelta(minutes=11)
if now >= monitor_start_dt:
sessions_in_window.append({'data': item, 'dt': show_dt})
active_check_needed = True
else:
seconds_until_window = (monitor_start_dt - now).total_seconds()
if seconds_until_window < min_sleep_seconds:
min_sleep_seconds = seconds_until_window
# --- 4. 严格执行逻辑 ---
if active_check_needed:
self.status_text = "🔥 监控进行中 (高频轮询)"
self.next_wakeup = now + timedelta(seconds=60)
# 关键修复:获取实时数据
realtime_schedule = self.api.fetch_schedule(biz_date)
# 如果网络失败,realtime_schedule 为 None
if realtime_schedule is None and not self.simulation_mode:
self.log("⚠️ 网络/API请求失败,跳过本次判定,防止误报!")
time.sleep(60)
continue
if realtime_schedule is None: realtime_schedule = []
if self.simulation_mode: realtime_schedule += self.simulation_data
realtime_map = {f"{x.get('hallId')}_{x.get('showStartTime')}": x for x in realtime_schedule}
display_monitors = []
for session in sessions_in_window:
unique_id = f"{biz_date}_{session['data'].get('hallId')}_{session['data'].get('showStartTime')}"
key_for_map = f"{session['data'].get('hallId')}_{session['data'].get('showStartTime')}"
# 关键修复:绝对不能使用 fallback 回退到缓存数据!
latest = realtime_map.get(key_for_map)
if not latest:
# 如果实时数据里找不到这一场(可能被删除了,或者数据没拉全),跳过判断
continue
movie_name = latest.get('movieName')
hall_name = latest.get('hallName')
sold = int(latest.get('soldTicketNum', 0))
start_str = latest.get('showStartTime')
mins_left = (session['dt'] - now).total_seconds() / 60
# === 判定逻辑 ===
if sold == 0:
# 确认为0票,加入候选
self.zero_ticket_candidates.add(unique_id)
display_monitors.append(f"👁️ {start_str} {movie_name} (0票, 监控中)")
else:
# 现在的票数 > 0
if unique_id in self.zero_ticket_candidates:
# 之前我们确认过它是0票,现在变了 -> 报警
if unique_id not in self.alerted_sessions:
self.log(f"🚨 突发购票检测!{hall_name}{movie_name}》")
send_pushover(
f"禁止撤场:{hall_name}有票!",
f"《{movie_name}{start_str}\n倒计时 {mins_left:.1f}分\n⚠️ 从0票突增至{sold}张",
priority=0
)
self.alerted_sessions.add(unique_id)
self.zero_ticket_candidates.remove(unique_id) # 移出监控
display_monitors.append(f"✅ {start_str} {movie_name} (新售出{sold}张, 已报警)")
else:
# 它从未进入过0票名单,说明一开始就有票 -> 忽略
display_monitors.append(f"🛡️ {start_str} {movie_name} (原有{sold}张, 忽略)")
self.monitored_sessions = display_monitors
time.sleep(60)
else:
if 0 < min_sleep_seconds < 86400:
wakeup_dt = now + timedelta(seconds=min_sleep_seconds)
self.status_text = "💤 智能休眠中"
self.next_wakeup = wakeup_dt
self.monitored_sessions = []
self.log(f"休眠 {min_sleep_seconds/60:.1f} 分钟,直到 {wakeup_dt.strftime('%H:%M')}")
time.sleep(min_sleep_seconds)
else:
self.log("今日无更多监控场次,休眠 5 分钟")
time.sleep(300)
except Exception as e:
self.log(f"主循环异常: {e}")
time.sleep(60)
def trigger_simulation(self):
self.simulation_mode = True
fake_time = (get_beijing_now() + timedelta(minutes=5)).strftime("%H:%M")
# 1. 先注入 0 票
self.simulation_data = [{
'movieName': '测试影片-防误报验证',
'hallName': '模拟厅',
'hallId': 'SIM_SAFE',
'showStartTime': fake_time,
'soldTicketNum': '0'
}]
self.log(f"已注入 0票 模拟场次: {fake_time} 开场")
# 2. 10秒后改为 1 票
def simulate_buy():
time.sleep(10)
self.simulation_data[0]['soldTicketNum'] = '1'
self.log("⚡ [测试] 模拟动作:购票成功!应触发报警。")
threading.Thread(target=simulate_buy).start()
# --- 5. Streamlit 前端 ---
@st.cache_resource
def get_monitor():
return CinemaMonitor()
def main():
monitor = get_monitor()
st.title("📽️ 影城排程防撤场监控终端 Pro")
utc_now = datetime.now(timezone.utc).strftime("%H:%M:%S")
bj_now = get_beijing_now().strftime("%H:%M:%S")
c1, c2, c3 = st.columns(3)
with c1: st.metric("当前状态", monitor.status_text)
with c2: st.metric("下次唤醒 (北京时间)", monitor.next_wakeup.strftime("%H:%M:%S") if monitor.next_wakeup else "--")
with c3: st.metric("系统时间 (UTC / BJ)", f"{utc_now} / {bj_now}")
st.divider()
col_logs, col_list = st.columns([3, 2])
with col_logs:
st.subheader("📜 系统运行日志")
if st.button("刷新日志"): pass
st.text_area("Logs", "\n".join(list(monitor.logs)), height=450, disabled=True)
with col_list:
st.subheader("🎯 实时监控列表")
st.info("说明:👁️=0票监控中 | ✅=突发购票(报警) | 🛡️=原有票(安全)")
if monitor.monitored_sessions:
for s in monitor.monitored_sessions:
if "✅" in s: st.success(s)
elif "👁️" in s: st.error(s)
else: st.caption(s)
else:
st.info("当前休眠中,无临近场次")
st.write("---")
if st.button("模拟 '0票 -> 1票' 验证"):
monitor.trigger_simulation()
st.toast("测试已触发")
if __name__ == "__main__":
main()