# Repository page header captured together with the file (not code):
# gt / app.py
# harii66's picture
# Update app.py
# 1b0a797 verified
import time
import json
import secrets
import asyncio
import httpx
import requests
import os
import io
import re
import hashlib
import threading
import traceback
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from datetime import datetime, timedelta, timezone
from urllib.parse import quote, urljoin, urlparse
from fastapi import FastAPI, Request, Header, Depends, HTTPException, status
from fastapi.responses import JSONResponse, Response, FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from pydantic import BaseModel
from typing import Optional, List
from config import Config
from cache_manager import cache
from user_manager import user_manager, User, AVAILABLE_BADGES
from proxy_handler import proxy_media, proxy_live_stream_direct, proxy_playback_stream, get_live_m3u8_url
from utils import get_auth, get_channels, get_jst_date, fetch_epg, get_all_epg
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from archiver_manager import archive_manager
# ASGI application object; title/version/description are taken from Config.
app = FastAPI(title=Config.APP_NAME, version=Config.APP_VERSION, description=Config.APP_DESCRIPTION)
# Scheduler instance; it is only created here (where it is started is not visible in this part of the file).
scheduler = AsyncIOScheduler()
# CORS: every origin, method and request header is allowed, and the listed
# response headers are exposed to browser scripts.
# NOTE(review): wildcard origins together with allow_credentials=True is very permissive — confirm this is intended.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], expose_headers=["Content-Length", "Content-Range", "Accept-Ranges", "Content-Disposition"])
# Optional gzip compression for responses of at least 1000 bytes.
if Config.ENABLE_GZIP: app.add_middleware(GZipMiddleware, minimum_size=1000)
# Serve the "static" directory next to this file under /static, when it exists.
static_path = Path(__file__).parent / "static"
if static_path.exists(): app.mount("/static", StaticFiles(directory=str(static_path)), name="static")
# In-memory admin sessions: token string -> expiry datetime. Written by
# create_admin_token() and pruned by verify_admin_token().
admin_tokens = {}
def create_admin_token() -> str:
    """Issue a new admin session token that is valid for 24 hours.

    The token is stored in the module-level ``admin_tokens`` mapping together
    with its expiry (a naive local ``datetime``).

    Returns:
        The newly generated URL-safe token string.
    """
    new_token = secrets.token_urlsafe(32)
    expires_at = datetime.now() + timedelta(hours=24)
    admin_tokens[new_token] = expires_at
    return new_token
def verify_admin_token(token: str) -> bool:
    """Return True when ``token`` is a known, unexpired admin token.

    As a side effect every expired entry is removed from ``admin_tokens``.
    """
    if not token:
        return False
    current = datetime.now()
    # Collect first, delete afterwards: the dict must not change size while
    # it is being iterated.
    stale_tokens = [key for key, expiry in admin_tokens.items() if expiry < current]
    for key in stale_tokens:
        del admin_tokens[key]
    try:
        expiry = admin_tokens[token]
    except KeyError:
        return False
    if current > expiry:
        del admin_tokens[token]
        return False
    return True
def get_admin_token(authorization: Optional[str]) -> Optional[str]:
    """Extract the raw token from an ``Authorization`` header value.

    A leading ``"Bearer "`` prefix is removed; any other non-empty value is
    returned unchanged. A missing or empty header yields ``None``.
    """
    if not authorization:
        return None
    prefix = "Bearer "
    if authorization.startswith(prefix):
        return authorization[len(prefix):]
    return authorization
def get_current_admin_token(authorization: Optional[str] = Header(None)) -> str:
    """FastAPI dependency that enforces a valid admin token.

    Returns:
        The token taken from the ``Authorization`` header.

    Raises:
        HTTPException: 401 with detail "No token provided" when the header
            carries no token, or "Invalid token" when verification fails.
    """
    token = get_admin_token(authorization)
    if token and verify_admin_token(token):
        return token
    detail = "Invalid token" if token else "No token provided"
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=detail)
def _parse_epg_time(t):
if not t: return None
try:
t_str = str(t).strip()
jst = timezone(timedelta(hours=9))
if len(t_str) == 14 and t_str.isdigit():
return datetime.strptime(t_str, "%Y%m%d%H%M%S").replace(tzinfo=jst).timestamp()
val = float(t_str)
return val / 1000.0 if val > 9999999999 else val
except:
return None
class PasswordVerify(BaseModel):
    """Request body of ``POST /api/verify-password``."""
    username: str
    # Compared as-is with Config.ADMIN_PASSWORD_HASH and passed to
    # user_manager.verify_user; this module does no hashing itself.
    password_hash: str
class AdminLogin(BaseModel):
    """Request body of ``POST /api/admin/login``."""
    username: str
    # Compared as-is with Config.ADMIN_PASSWORD_HASH.
    password_hash: str
class CreateUserRequest(BaseModel):
    """Request body of ``POST /api/admin/users``.

    Every field is forwarded unchanged to ``user_manager.create_user``.
    """
    username: str
    # Optional; presumably user_manager generates a password when omitted — verify against user_manager.
    password: Optional[str] = None
    expires_days: Optional[int] = None
    notes: str = ""
    badge: Optional[str] = None
    is_admin: bool = False
# Request body of POST /api/admin/users/{username}/extend; ``days`` is passed to user_manager.extend_expiry.
class ExtendExpiryRequest(BaseModel): days: int
# Request body of POST /api/admin/users/{username}/badge; ``badge`` (may be None) is passed to user_manager.set_badge.
class SetBadgeRequest(BaseModel): badge: Optional[str] = None
class UserSettings(BaseModel):
    """Partial user-settings payload; every field is optional (default None).

    Not referenced by any handler in the visible part of this file.
    """
    favorite_channels: Optional[List[str]] = None
    # NOTE(review): declared as a dict here, while track_user_behavior treats
    # the stored playback_history as a list — confirm which shape is intended.
    playback_history: Optional[dict] = None
    program_reminders: Optional[List[dict]] = None
    download_concurrency: Optional[int] = None
    batch_download_concurrency: Optional[int] = None
    fab_position: Optional[dict] = None
    other_settings: Optional[dict] = None
@app.middleware("http")
async def protocol_middleware(request: Request, call_next):
    """Apply reverse-proxy ``X-Forwarded-*`` headers to the request scope.

    ``X-Forwarded-Proto`` overrides the scheme. ``X-Forwarded-Host`` overrides
    the server tuple; its port is ``X-Forwarded-Port`` when that is a valid
    integer, otherwise 443 for https and 80 for anything else.

    NOTE(review): the headers are trusted unconditionally — confirm the app
    is only reachable through a trusted proxy.
    """
    forwarded_proto = request.headers.get('X-Forwarded-Proto', '')
    forwarded_host = request.headers.get('X-Forwarded-Host', '')
    forwarded_port = request.headers.get('X-Forwarded-Port', '')
    if forwarded_proto:
        request.scope['scheme'] = forwarded_proto
    if forwarded_host:
        port = 443 if forwarded_proto == 'https' else 80
        if forwarded_port:
            try:
                port = int(forwarded_port)
            except ValueError:
                # Malformed port header: keep the scheme-based default.
                pass
        request.scope['server'] = (forwarded_host, port)
    return await call_next(request)
@app.middleware("http")
async def performance_middleware(request: Request, call_next):
    """Add timing, cache-control and CORS headers to every response.

    * ``X-Response-Time``: handler duration in whole milliseconds.
    * ``/static/`` paths: HTML and template files are never cached, all
      other static assets are cached for one day.
    * ``/api/``, ``/live/`` and ``/vod/`` paths: permissive CORS headers.
    """
    started = time.time()
    response = await call_next(request)
    elapsed_ms = int((time.time() - started) * 1000)
    response.headers["X-Response-Time"] = f"{elapsed_ms}ms"

    path = request.url.path
    if path.startswith('/static/'):
        # Templates (HTML) must not be cached, otherwise front-end changes
        # (e.g. a modified button) would not show up; CSS/JS/images keep a
        # long cache lifetime.
        is_template = '/templates/' in path or path.endswith('.html')
        if not is_template:
            response.headers['Cache-Control'] = 'public, max-age=86400'
        else:
            response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
            response.headers['Pragma'] = 'no-cache'
            response.headers['Expires'] = '0'

    if path.startswith(('/api/', '/live/', '/vod/')):
        response.headers['Access-Control-Allow-Origin'] = '*'
        response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS, DELETE'
        response.headers['Access-Control-Allow-Headers'] = 'Authorization, Content-Type, Range'
    return response
@app.get("/")
@app.get("/channels")
@app.get("/player")
@app.get("/epg")
@app.get("/archive")
@app.get("/cache")
@app.get("/api-test")
async def render_frontend():
    """Serve ``static/index.html`` for every front-end route.

    Falls back to a small JSON message when the file does not exist.
    """
    index_file = Path(__file__).parent / "static" / "index.html"
    if not index_file.exists():
        return {"message": "Frontend not found"}
    return FileResponse(index_file)
@app.get("/admin")
async def admin_page():
    """Serve ``static/admin.html``, or a JSON "Not found" message if it is missing."""
    page = Path(__file__).parent / "static" / "admin.html"
    if not page.exists():
        return {"message": "Not found"}
    return FileResponse(page)
@app.get("/admin/login")
async def admin_login_page():
    """Serve ``static/admin-login.html``, or a JSON "Not found" message if it is missing."""
    page = Path(__file__).parent / "static" / "admin-login.html"
    if not page.exists():
        return {"message": "Not found"}
    return FileResponse(page)
@app.post("/api/verify-password")
async def verify_password(data: PasswordVerify):
    """Check a username / password-hash pair for the front-end login.

    The configured admin account is tried first, then regular users via
    ``user_manager``. Failed logins are reported with ``success: False``
    (HTTP 200); unexpected errors produce a 500 JSON response.
    """
    try:
        # Constant-time comparison so response timing does not reveal how
        # much of the admin hash matched. Non-string config values never match.
        expected_hash = Config.ADMIN_PASSWORD_HASH
        admin_hash_ok = isinstance(expected_hash, str) and secrets.compare_digest(
            data.password_hash.encode("utf-8", "surrogatepass"),
            expected_hash.encode("utf-8", "surrogatepass"),
        )
        if data.username == Config.ADMIN_USERNAME and admin_hash_ok:
            return {"success": True, "message": "Admin login successful", "user": {"username": data.username, "is_admin": True, "badge": None}}
        if data.username and user_manager.verify_user(data.username, data.password_hash):
            user = user_manager.get_user(data.username)
            if not user:
                return {"success": False, "message": "User not found"}
            return {
                "success": True,
                "message": "User login successful",
                "user": {"username": data.username, "is_admin": user.is_admin, "badge": user.badge},
                "user_data": user_manager.get_user_data(data.username),
            }
        return {"success": False, "message": "Invalid username or password"}
    except Exception as e:
        return JSONResponse(content={"success": False, "message": str(e)}, status_code=500)
@app.get("/api/badges")
async def get_badges():
    """Return the list of available badges (public, no authentication)."""
    payload = {"success": True, "badges": AVAILABLE_BADGES}
    return payload
@app.post("/api/admin/login")
async def admin_login(data: AdminLogin):
    """Authenticate the configured admin account and issue a session token.

    Returns a JSON body with a fresh token on success, a 401 JSON response
    for wrong credentials and a 500 JSON response on unexpected errors.
    """
    try:
        # Constant-time comparison so response timing does not reveal how
        # much of the admin hash matched. Non-string config values never match.
        expected_hash = Config.ADMIN_PASSWORD_HASH
        hash_ok = isinstance(expected_hash, str) and secrets.compare_digest(
            data.password_hash.encode("utf-8", "surrogatepass"),
            expected_hash.encode("utf-8", "surrogatepass"),
        )
        if data.username == Config.ADMIN_USERNAME and hash_ok:
            return {"success": True, "token": create_admin_token(), "message": "Login successful"}
        return JSONResponse(content={"success": False, "message": "Invalid credentials"}, status_code=401)
    except Exception as e:
        return JSONResponse(content={"success": False, "message": str(e)}, status_code=500)
@app.get("/api/admin/check")
async def admin_check(authorization: Optional[str] = Header(None)):
    """Report whether the ``Authorization`` header carries a valid admin token.

    Responds with ``{"authenticated": True}``, or a 401 JSON response with
    ``{"authenticated": False}``.
    """
    token = get_admin_token(authorization)
    if not (token and verify_admin_token(token)):
        return JSONResponse(content={"authenticated": False}, status_code=401)
    return {"authenticated": True}
@app.get("/api/admin/badges")
async def admin_get_badges(token: str = Depends(get_current_admin_token)):
    """Return the list of available badges (admin token required)."""
    payload = {"success": True, "badges": AVAILABLE_BADGES}
    return payload
@app.get("/api/admin/stats")
async def admin_stats(token: str = Depends(get_current_admin_token)):
    """Return ``user_manager.get_stats()`` unchanged (admin token required)."""
    stats = user_manager.get_stats()
    return stats
@app.get("/api/admin/users")
async def admin_list_users(token: str = Depends(get_current_admin_token)):
    """List every user as a dict, together with the user count (admin token required)."""
    accounts = user_manager.list_users()
    total = len(accounts)
    serialized = [account.dict() for account in accounts]
    return {"success": True, "count": total, "users": serialized}
@app.post("/api/admin/users")
async def admin_create_user(data: CreateUserRequest, token: str = Depends(get_current_admin_token)):
    """Create a user (admin token required).

    Responds with 400 when ``Config.MAX_USERS`` is already reached, 500 with
    the error text when creation raises, otherwise the new user plus the
    plain password returned by ``user_manager.create_user``.
    """
    try:
        limit_reached = len(user_manager.users) >= Config.MAX_USERS
        if limit_reached:
            return JSONResponse(content={"error": f"Maximum {Config.MAX_USERS} users allowed"}, status_code=400)
        created, plain_password = user_manager.create_user(
            username=data.username,
            password=data.password,
            expires_days=data.expires_days,
            notes=data.notes,
            badge=data.badge,
            is_admin=data.is_admin,
        )
        return {"success": True, "user": created.dict(), "password": plain_password}
    except Exception as err:
        return JSONResponse(content={"error": str(err)}, status_code=500)
@app.delete("/api/admin/users/{username}")
async def admin_delete_user(username: str, token: str = Depends(get_current_admin_token)):
    """Delete a user and that user's stored settings (admin token required).

    Responds with 404 when ``user_manager.delete_user`` reports failure.
    """
    if not user_manager.delete_user(username):
        return JSONResponse(content={"error": "User not found"}, status_code=404)
    # Settings are only removed once the account itself is gone.
    user_manager.delete_user_settings(username)
    return {"success": True, "message": "User deleted"}
@app.post("/api/admin/users/{username}/activate")
async def admin_activate_user(username: str, token: str = Depends(get_current_admin_token)):
    """Activate a user (admin token required); 404 when ``user_manager`` reports failure."""
    if not user_manager.activate_user(username):
        return JSONResponse(content={"error": "User not found"}, status_code=404)
    return {"success": True}
@app.post("/api/admin/users/{username}/deactivate")
async def admin_deactivate_user(username: str, token: str = Depends(get_current_admin_token)):
    """Deactivate a user (admin token required); 404 when ``user_manager`` reports failure."""
    if not user_manager.deactivate_user(username):
        return JSONResponse(content={"error": "User not found"}, status_code=404)
    return {"success": True}
@app.post("/api/admin/users/{username}/extend")
async def admin_extend_expiry(username: str, data: ExtendExpiryRequest, token: str = Depends(get_current_admin_token)):
    """Extend a user's expiry by ``data.days`` (admin token required); 404 on failure."""
    extended = user_manager.extend_expiry(username, data.days)
    if not extended:
        return JSONResponse(content={"error": "User not found"}, status_code=404)
    return {"success": True}
@app.post("/api/admin/users/{username}/badge")
async def admin_set_badge(username: str, data: SetBadgeRequest, token: str = Depends(get_current_admin_token)):
    """Set (or clear, with ``None``) a user's badge (admin token required); 404 on failure."""
    updated = user_manager.set_badge(username, data.badge)
    if not updated:
        return JSONResponse(content={"error": "User not found"}, status_code=404)
    return {"success": True}
@app.get("/api/user/{username}/settings")
async def get_user_settings(username: str):
    """Return the stored settings of ``username``.

    NOTE(review): no authentication dependency is declared on this route,
    so any caller can read any user's settings — confirm this is intended.
    """
    settings = user_manager.get_user_settings(username)
    return {"success": True, "settings": settings}
class UserDataSync(BaseModel):
    """Request body of ``POST /api/user/data/sync``."""
    username: str
    # Passed unchanged to user_manager.update_user_data.
    data: dict
@app.post("/api/user/data/sync")
async def sync_user_data(payload: UserDataSync):
    """Store ``payload.data`` for ``payload.username``.

    Responds with 404 when ``user_manager.update_user_data`` reports failure.

    NOTE(review): no authentication dependency is declared on this route —
    confirm this is intended.
    """
    stored = user_manager.update_user_data(payload.username, payload.data)
    if not stored:
        return JSONResponse(content={"success": False, "error": "Not found"}, status_code=404)
    return {"success": True}
class UserBehaviorLog(BaseModel):
    """Request body of ``POST /api/user/behavior/track``."""
    username: str
    # Handled values: 'play', 'favorite', 'setting_change', 'reminder'.
    action: str
    # Action-specific payload; the keys read depend on ``action``.
    data: dict
@app.post("/api/user/behavior/track")
async def track_user_behavior(payload: UserBehaviorLog):
    """Record one user action in that user's stored data.

    Supported ``payload.action`` values:
      * ``'play'``: prepend an entry to ``playback_history`` (newest first,
        capped at 100 entries);
      * ``'favorite'``: replace ``favorite_channels``;
      * ``'setting_change'``: copy the whitelisted setting keys;
      * ``'reminder'``: replace ``program_reminders``.

    Responds with 404 for an unknown user, 400 when nothing was updated
    (unknown action, empty update or rejected write), and 500 on errors.
    """
    try:
        user_data = user_manager.get_user_data(payload.username)
        if not user_data:
            return JSONResponse(content={"success": False, "error": "Not found"}, status_code=404)
        update_data = {}
        action = payload.action
        if action == 'play':
            # The stored value may be missing, None or (legacy) not a list.
            # Build a fresh list instead of mutating the stored one in place.
            stored_history = user_data.get('playback_history')
            if not isinstance(stored_history, list):
                stored_history = []
            entry = {
                'timestamp': datetime.now(timezone(timedelta(hours=9))).isoformat(),
                'channel_id': payload.data.get('channel_id'),
                'channel_name': payload.data.get('channel_name'),
                'duration': payload.data.get('duration', 0),
            }
            update_data['playback_history'] = [entry, *stored_history[:99]]
        elif action == 'favorite':
            update_data['favorite_channels'] = payload.data.get('favorite_channels', [])
        elif action == 'setting_change':
            allowed_keys = ('download_concurrency', 'batch_download_concurrency', 'fab_position')
            update_data.update({k: v for k, v in payload.data.items() if k in allowed_keys})
        elif action == 'reminder':
            update_data['program_reminders'] = payload.data.get('program_reminders', [])
        if update_data and user_manager.update_user_data(payload.username, update_data):
            return {"success": True}
        return JSONResponse(content={"success": False, "error": "Invalid"}, status_code=400)
    except Exception as e:
        return JSONResponse(content={"success": False, "error": str(e)}, status_code=500)
@app.get("/ping")
async def ping():
    """Keep-alive endpoint for UptimeRobot / self-ping; it only needs to answer 200."""
    return dict(ok=True)
@app.get("/health")
async def health_check():
    """Report configuration validity, user count and cache statistics."""
    cache_stats = cache.get_stats()
    # Config.validate() also returns the missing settings; they are not reported.
    config_ok, _missing = Config.validate()
    state = "running" if config_ok else "configuration_error"
    return {
        "status": state,
        "config_valid": config_ok,
        "total_users": len(user_manager.users),
        "cache": cache_stats,
    }
@app.get("/api/refresh")
async def refresh_cache(type: str = "all"):
    """Clear the cache selected by ``type`` and re-fetch where applicable.

    ``'auth'`` and ``'all'`` force a new auth fetch; ``'cid'`` forces a new
    cid fetch; any other value only clears the cache.
    (``type`` shadows the builtin but is the public query-parameter name.)
    """
    cache.clear_cache(type)
    if type in ('auth', 'all'):
        await get_auth(force=True)
    elif type == 'cid':
        from utils import get_cid
        await get_cid(force=True)
    return {"success": True}
@app.get("/api/list")
async def list_channels(request: Request):
    """Return every channel, each extended with a ``playUrl`` on this server."""
    auth = await get_auth()
    channels = await get_channels(auth)
    worker_base = f"{request.url.scheme}://{request.url.netloc}"
    enriched = []
    for ch in channels:
        entry = dict(ch)
        entry["playUrl"] = f"{worker_base}/api/live/{ch['no']}"
        enriched.append(entry)
    return {"success": True, "channels": enriched}
@app.get("/api/epg")
async def get_epg(vid: str, date: str):
    """Return the EPG of channel ``vid`` for ``date`` as fetched by ``fetch_epg``."""
    auth = await get_auth()
    programmes = await fetch_epg(vid, date, auth)
    return {"success": True, "epg": programmes}
@app.get("/api/epg/all")
async def get_all_epg_data():
    """Return the EPG of all channels via ``get_all_epg`` (no forced refresh)."""
    auth = await get_auth()
    all_epg = await get_all_epg(auth, force=False)
    return {"success": True, "data": all_epg}
@app.get("/api/live/{chid}")
async def live_stream_info(chid: str, request: Request):
    """Return the proxied and the direct m3u8 URL of live channel ``chid``.

    Responds with 404 when no channel has ``no`` equal to ``chid``.
    """
    auth = await get_auth()
    channels = await get_channels(auth)
    channel_known = any(str(ch['no']) == chid for ch in channels)
    if not channel_known:
        return JSONResponse(content={"success": False, "error": "Not found"}, status_code=404)
    worker_base = f"{request.url.scheme}://{request.url.netloc}"
    proxied_url = f"{worker_base}/stream/live/{chid}.m3u8"
    direct_url = await get_live_m3u8_url(chid, auth)
    return {"success": True, "stream": {"m3u8": proxied_url, "direct": direct_url}}
@app.get("/stream/live/{chid}.m3u8")
async def live_stream_m3u8(chid: str, request: Request):
    """Proxy the live playlist of channel ``chid`` via ``proxy_live_stream_direct``."""
    proxied = await proxy_live_stream_direct(chid, request)
    return proxied
# Strong references to in-flight background EPG refresh tasks. The event loop
# only keeps weak references to tasks, so a task nobody references can be
# garbage-collected before it finishes.
_epg_refresh_tasks = set()


@app.get("/api/epg/search")
async def search_epg(keyword: str, days: int = 30):
    """Search cached EPG data for programmes whose title contains ``keyword``.

    The match is a case-insensitive substring test on ``title`` (falling back
    to ``name``) and is restricted to today and the previous ``days`` days.
    When the combined ``('_all_', 'full')`` cache entry exists it is searched;
    otherwise the per-channel/per-day cache entries are searched, and a
    background refresh is scheduled if that cache is mostly cold.

    Returns:
        A JSON body with the matches (newest first) and cache statistics,
        400 when ``keyword`` is empty, or 500 with the error text.
    """
    try:
        if not keyword:
            return JSONResponse(content={"success": False, "error": "Missing keyword"}, status_code=400)
        auth = await get_auth()
        channels_list = await get_channels(auth)
        channel_map = {ch['id']: ch for ch in channels_list}
        now = datetime.now()
        date_list = [get_jst_date(now - timedelta(days=i)) for i in range(days + 1)]
        # Set for O(1) membership tests inside the per-programme loop.
        wanted_dates = set(date_list)
        results = []
        keyword_lower = keyword.lower()
        cache_hits = 0
        cache_misses = 0

        def _title_matches(program):
            title = program.get('title') or program.get('name') or ''
            return keyword_lower in title.lower()

        def _make_result(channel_id, channel_info, program, date_str):
            return {
                'channel_id': channel_id,
                'channel_name': channel_info['name'],
                'channel_no': channel_info['no'],
                'program': program,
                'date': date_str
            }

        full_cache = cache.get_epg('_all_', 'full')
        if full_cache:
            for channel_id, programs in full_cache.items():
                channel_info = channel_map.get(channel_id)
                if not channel_info:
                    continue
                for program in programs:
                    program_time = program.get('time', 0)
                    program_date = get_jst_date(datetime.fromtimestamp(program_time))
                    if program_date not in wanted_dates:
                        continue
                    if _title_matches(program):
                        results.append(_make_result(channel_id, channel_info, program, program_date))
                cache_hits += 1
        else:
            for channel_id, channel_info in channel_map.items():
                for date_str in date_list:
                    cached_epg = cache.get_epg(channel_id, date_str)
                    if cached_epg is not None:
                        cache_hits += 1
                        for program in cached_epg:
                            if _title_matches(program):
                                results.append(_make_result(channel_id, channel_info, program, date_str))
                    else:
                        cache_misses += 1
            if cache_hits == 0 or cache_misses > cache_hits:
                # Mostly cold cache: warm it in the background. Keep a
                # reference until the task is done so it cannot be collected.
                refresh_task = asyncio.create_task(background_fetch_all_epg(auth))
                _epg_refresh_tasks.add(refresh_task)
                refresh_task.add_done_callback(_epg_refresh_tasks.discard)
        # Programmes without a 'time' sort as 0 instead of raising KeyError.
        results.sort(key=lambda item: item['program'].get('time') or 0, reverse=True)
        lookups = cache_hits + cache_misses
        hit_rate = cache_hits * 100 // lookups if lookups > 0 else 0
        return {
            "success": True, "keyword": keyword, "days": days,
            "total": len(results), "results": results,
            "cache_stats": {
                "hits": cache_hits, "misses": cache_misses,
                "strategy": "full_cache" if full_cache else "partial_cache",
                "hit_rate": f"{hit_rate}%"
            },
            "message": "后台正在缓存数据,下次搜索会更快" if not full_cache and cache_misses > 0 else None
        }
    except Exception as e:
        return JSONResponse(content={"success": False, "error": str(e)}, status_code=500)
async def background_fetch_all_epg(auth: dict):
    """Warm the EPG cache in the background; never raises.

    Failures are still non-fatal (the task is fire-and-forget), but they are
    written to stderr instead of being dropped silently.
    """
    try:
        await get_all_epg(auth, force=False)
    except Exception:
        traceback.print_exc()
@app.get("/api/playback/{path:path}")
async def playback_stream_info(path: str, request: Request):
    """Describe where the playback stream for ``path`` can be fetched.

    Surrounding slashes are removed from ``path``; a bare id (no ``/`` left)
    is prefixed with ``query/``. The response carries the normalised path
    and the proxied m3u8 URL on this server.
    """
    try:
        # The result is not used here; the call is kept so that an auth
        # failure still surfaces as a 500 response.
        await get_auth()
        worker_base = f"{request.url.scheme}://{request.url.netloc}"
        # strip('/') already removes every leading slash, so no further
        # leading-slash handling is needed.
        clean_path = path.strip('/')
        # A value without any '/' cannot already start with 'query/'.
        if '/' not in clean_path:
            clean_path = f"query/{clean_path}"
        return {
            "success": True,
            "playback": {
                "path": f"/{clean_path}",
                "m3u8": f"{worker_base}/stream/playback/{clean_path}.m3u8",
                "original_path": path
            },
            "info": {"protocol": request.url.scheme, "type": "playback"}
        }
    except Exception as e:
        return JSONResponse(content={"success": False, "error": str(e)}, status_code=500)
@app.get("/stream/playback/{path:path}.m3u8")
async def playback_stream_m3u8(path: str, request: Request):
    """Proxy the playback playlist for ``path`` via ``proxy_playback_stream``."""
    proxied = await proxy_playback_stream(path, request)
    return proxied
def _normalize_playback_path(raw_path: str) -> str:
    """Reduce a playback path to its bare id: no leading '/', 'query/' prefix or '.m3u8' suffix."""
    cleaned = raw_path.strip()
    if cleaned.startswith('/'):
        cleaned = cleaned[1:]
    if cleaned.startswith('query/'):
        cleaned = cleaned[len('query/'):]
    if cleaned.endswith('.m3u8'):
        # '.m3u8' is five characters long; slicing by its real length avoids
        # eating the last character of the id.
        cleaned = cleaned[:-len('.m3u8')]
    return cleaned


@app.get("/api/download/playback/")
async def download_playback_by_path(request: Request, path: str, channel: str):
    """Stream a recorded programme to the client as one ``.ts`` download.

    Steps:
      1. Resolve ``channel`` (matched against each channel's ``no``).
      2. Look through the last 30 days of EPG data for the programme whose
         path matches ``path`` to derive title and start time for the
         file name; fall back to "now" and a generic title.
      3. Fetch the upstream m3u8 (and its nested playlist, if any), collect
         the segment URLs and stream the segments in order, downloading
         them in batches of 10.

    The stream is cut short once more than 10% of the segments (at least
    one) have failed. Any error before streaming starts yields a 500 JSON
    response with the error text.
    """
    try:
        auth = await get_auth()
        channels = await get_channels(auth)
        target_channel = next((ch for ch in channels if str(ch['no']) == str(channel)), None)
        if not target_channel:
            raise ValueError(f"频道 {channel} 不存在")
        clean_path = _normalize_playback_path(path)
        program_title = "Unknown"
        program_time = None
        JST = timezone(timedelta(hours=9))
        now_jst = datetime.now(JST)
        for days_ago in range(0, 30):
            check_date = (now_jst - timedelta(days=days_ago)).strftime('%Y-%m-%d')
            try:
                epg_list = await fetch_epg(target_channel['id'], check_date, auth)
                if not epg_list:
                    continue
                for prog in epg_list:
                    if prog.get('path') and _normalize_playback_path(prog['path']) == clean_path:
                        program_title = prog.get('title') or prog.get('name') or 'Unknown'
                        program_time = datetime.fromtimestamp(prog['time'], tz=JST)
                        break
                if program_time:
                    break
            except Exception:
                # A failing day must not abort the lookup; try the next one.
                continue
        if not program_time:
            program_time = now_jst
            program_title = f"Playback_{target_channel['name']}"

        def clean_text(text):
            """Make ``text`` safe as a file-name component (max. 150 characters)."""
            text = str(text).strip()
            cleaned = re.sub(r'[<>:"/\\|?*]', '_', text)
            cleaned = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', cleaned)
            cleaned = re.sub(r'_+', '_', cleaned)
            cleaned = cleaned.strip('_').strip()
            if len(cleaned) > 150:
                cleaned = cleaned[:150]
            return cleaned if cleaned else "unknown"

        time_str = program_time.strftime('%Y%m%d_%H%M')
        channel_name = clean_text(target_channel['name'])
        program_name = clean_text(program_title)
        filename = f"{time_str}_{channel_name}_{program_name}.ts"
        # Rebuild the upstream path from the normalised id so that a request
        # path already ending in '.m3u8' does not become '….m3u8.m3u8'.
        playback_path = f"query/{clean_path}"
        vod_host = Config.UPSTREAM_HOSTS['vod']
        access_token = quote(auth['access_token'])
        upstream_m3u8 = f"{vod_host}/{playback_path}.m3u8?type=vod&__cross_domain_user={access_token}"
        headers = {'Referer': Config.REQUIRED_REFERER, 'User-Agent': 'Mozilla/5.0'}
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.get(upstream_m3u8, headers=headers)
            if resp.status_code != 200:
                raise Exception(f"M3U8获取失败: HTTP {resp.status_code}")
            m3u8_content = resp.text
            from utils import extract_playlist_url
            playlist_url = extract_playlist_url(m3u8_content, upstream_m3u8)
            if not playlist_url or playlist_url == upstream_m3u8:
                # The first document already is the media playlist.
                playlist_content = m3u8_content
                playlist_url = upstream_m3u8
            else:
                resp = await client.get(playlist_url, headers=headers)
                if resp.status_code != 200:
                    raise Exception(f"播放列表获取失败: HTTP {resp.status_code}")
                playlist_content = resp.text
        base_url = playlist_url.rsplit('/', 1)[0]
        ts_urls = []
        for line in playlist_content.split('\n'):
            line = line.strip()
            if line and not line.startswith('#'):
                ts_urls.append(line if line.startswith('http') else f"{base_url}/{line}")
        if not ts_urls:
            raise Exception("未找到TS分段")

        async def download_concurrent():
            """Yield segment bodies in playlist order, fetching 10 at a time."""
            async def fetch_segment(client, url, retries=3):
                for attempt in range(retries):
                    try:
                        r = await client.get(url, headers=headers, timeout=60.0)
                        if r.status_code == 200:
                            return r.content
                        if r.status_code in (429, 503) and attempt < retries - 1:
                            await asyncio.sleep(1.5 * (attempt + 1))
                    except Exception:
                        if attempt < retries - 1:
                            await asyncio.sleep(0.5 * (attempt + 1))
                return None

            batch_size = 10
            fail_count = 0
            fail_limit = max(1, len(ts_urls) // 10)
            async with httpx.AsyncClient(timeout=60.0, limits=httpx.Limits(max_keepalive_connections=20, max_connections=30)) as client:
                for i in range(0, len(ts_urls), batch_size):
                    batch = ts_urls[i:i+batch_size]
                    tasks = [fetch_segment(client, url) for url in batch]
                    results = await asyncio.gather(*tasks)
                    for content in results:
                        if content is None:
                            fail_count += 1
                            if fail_count > fail_limit:
                                # Too many gaps: stop streaming.
                                return
                        else:
                            yield content

        encoded_filename = quote(filename)
        return StreamingResponse(
            download_concurrent(),
            media_type="video/mp2t",
            headers={
                "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}",
                "Cache-Control": "no-cache",
            }
        )
    except Exception as e:
        return JSONResponse(content={"success": False, "error": str(e)}, status_code=500)
@app.options("/live/{path:path}")
@app.options("/vod/{path:path}")
@app.options("/query/{path:path}")
@app.options("/stream/{path:path}")
@app.options("/api/{path:path}")
async def options_handler():
    """Answer CORS preflight requests for the media and API routes with 200."""
    cors_headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'GET, POST, OPTIONS, DELETE',
        'Access-Control-Allow-Headers': 'Authorization, Content-Type, Range',
    }
    return Response(status_code=200, headers=cors_headers)
@app.get("/live/{path:path}")
async def proxy_live_media(path: str, request: Request):
    """Forward ``/live/<path>`` to ``proxy_media``."""
    upstream_path = f"/live/{path}"
    return await proxy_media(request, upstream_path)
@app.get("/vod/{path:path}")
async def proxy_vod_media(path: str, request: Request):
    """Forward ``/vod/<path>`` to ``proxy_media``."""
    upstream_path = f"/vod/{path}"
    return await proxy_media(request, upstream_path)
@app.get("/query/{path:path}")
async def proxy_query_media(path: str, request: Request):
    """Forward ``/query/<path>`` to ``proxy_media``."""
    upstream_path = f"/query/{path}"
    return await proxy_media(request, upstream_path)
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
    """Render every 404 as a uniform JSON body."""
    body = {"error": "Not Found"}
    return JSONResponse(status_code=404, content=body)
@app.exception_handler(500)
async def server_error_handler(request: Request, exc):
    """Render every 500 as a uniform JSON body (no error details are exposed)."""
    body = {"error": "Internal Server Error"}
    return JSONResponse(status_code=500, content=body)
async def internal_download_m3u8(m3u8_url: str, output_path: str, progress_dict: dict, _refresh_count: int = 0) -> bool:
    """Download an HLS (m3u8) stream into a single file at ``output_path``.

    Steps:
      1. Fetch the playlist. On 401/403 the token in the URL is refreshed and
         the whole download restarts (at most 5 refreshes); on 429/5xx the
         request is retried twice with a growing delay.
      2. Follow one nested ``.m3u8`` entry, if present.
      3. Stream every segment into ``<output_path>.segs/`` (up to 32
         concurrent requests, 5 attempts each).
      4. Concatenate the downloaded segments in playlist order in a worker
         thread, deleting each segment file as it is merged.

    Args:
        m3u8_url: Upstream playlist URL.
        output_path: Destination file; ``<output_path>.segs`` is used as
            scratch directory.
        progress_dict: Optional shared progress mapping. ``current_task``,
            ``ts_done`` and ``ts_total`` are written; ``status == "stopping"``
            is read as a cancellation request. May be ``None``.
        _refresh_count: Internal counter of token refreshes done so far.

    Returns:
        True when at least one segment was downloaded and merged. Missing
        segments are tolerated, so the result may have gaps. False on
        cancellation, upstream failure, an empty playlist or a failed merge.
    """
    def update_msg(msg):
        if progress_dict is not None:
            progress_dict["current_task"] = msg

    def update_sub(done, total):
        if progress_dict is not None:
            progress_dict["ts_done"] = done
            progress_dict["ts_total"] = total

    def _is_stopping():
        # After the user clicks "stop", archive_manager.progress["status"] is
        # flipped to "stopping". Probing it here lets a running m3u8 download
        # notice quickly and exit, instead of finishing hundreds of segments.
        return progress_dict is not None and progress_dict.get("status") == "stopping"

    async def _refresh_url(url: str) -> str:
        """Return ``url`` with its user/token query values replaced by a fresh access token."""
        try:
            new_auth = await get_auth(force=True)
            new_token = new_auth.get('access_token', '')
            if not new_token:
                return url
            from urllib.parse import urlparse, urlencode, parse_qs, urlunparse
            parsed = urlparse(url)
            qs = parse_qs(parsed.query, keep_blank_values=True)
            for key in list(qs.keys()):
                if 'user' in key.lower() or 'token' in key.lower():
                    qs[key] = [new_token]
            return urlunparse(parsed._replace(query=urlencode({k: v[0] for k, v in qs.items()})))
        except Exception:
            # Best effort: keep the old URL when the refresh fails.
            return url

    try:
        if _is_stopping():
            update_msg("已停止")
            return False
        update_msg("正在请求源站 M3U8...")
        headers = {
            'Referer': getattr(Config, 'REQUIRED_REFERER', 'http://vod.yoitv.com/'),
            'Origin': 'http://vod.yoitv.com',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        # NOTE(review): verify=False disables TLS certificate verification — confirm this is required for the upstream.
        async with httpx.AsyncClient(timeout=30.0, verify=False, follow_redirects=True) as client:
            resp = await client.get(m3u8_url, headers=headers)
            if resp.status_code in (401, 403):
                _refresh_count += 1
                if _refresh_count > 5:
                    update_msg(f"Token 持续无效,已达最大刷新次数 ({_refresh_count}),放弃")
                    return False
                update_msg(f"Token 已过期(HTTP {resp.status_code}),第 {_refresh_count} 次刷新重试...")
                await asyncio.sleep(min(2 + _refresh_count, 6))
                m3u8_url = await _refresh_url(m3u8_url)
                return await internal_download_m3u8(m3u8_url, output_path, progress_dict, _refresh_count=_refresh_count)
            if resp.status_code in (429, 500, 502, 503, 504):
                # Upstream throttling / jitter: do not give up on the first
                # failed m3u8 fetch; wait briefly and retry twice.
                for _ru in range(2):
                    if _is_stopping():
                        update_msg("已停止")
                        return False
                    await asyncio.sleep(5 * (_ru + 1))
                    update_msg(f"上游 HTTP {resp.status_code}, 第 {_ru + 1}/2 次重试 m3u8...")
                    try:
                        resp = await client.get(m3u8_url, headers=headers)
                    except Exception as _re:
                        update_msg(f"重试 m3u8 异常: {str(_re)[:60]}")
                        continue
                    if resp.status_code == 200:
                        break
                if resp.status_code != 200:
                    update_msg(f"下载失败: 源站持续 HTTP {resp.status_code}")
                    return False
            elif resp.status_code != 200:
                update_msg(f"下载失败: 源站拒绝访问 HTTP {resp.status_code}")
                return False
            m3u8_text = resp.text
            current_url = str(resp.url)
            parsed_url = urlparse(current_url)
            query_string = parsed_url.query
            playlist_url = current_url
            # Follow the first nested playlist entry, carrying the query
            # string (token) over when the entry has none of its own.
            for line in m3u8_text.splitlines():
                line = line.strip()
                if line and not line.startswith('#') and '.m3u8' in line:
                    playlist_url = urljoin(current_url, line)
                    if query_string and '?' not in playlist_url:
                        playlist_url += f"?{query_string}"
                    break
            if playlist_url != current_url:
                update_msg("正在解析嵌套视频流...")
                resp = await client.get(playlist_url, headers=headers)
                if resp.status_code != 200:
                    update_msg(f"嵌套流获取失败: HTTP {resp.status_code}")
                    return False
                m3u8_text = resp.text
                current_url = str(resp.url)
                query_string = urlparse(current_url).query or query_string
        ts_urls = []
        for line in m3u8_text.splitlines():
            line = line.strip()
            if line and not line.startswith('#') and not line.upper().startswith('KEY'):
                ts_url = urljoin(current_url, line)
                if query_string and '?' not in ts_url:
                    ts_url += f"?{query_string}"
                ts_urls.append(ts_url)
        total_ts = len(ts_urls)
        if total_ts == 0:
            update_msg("下载失败: 未在视频流中找到任何画面数据")
            return False
        update_sub(0, total_ts)
        update_msg(f"准备下载 {total_ts} 个切片 (0%)")
        # ====================================================================
        # OOM culprit (previously): ts_buffers = [None]*N loaded every segment
        # of a programme (commonly 1-2 GB) into memory, while the outer
        # dl_semaphore=Semaphore(CONCURRENCY) let 16 videos run concurrently,
        # so one instance easily peaked at 20-30 GB. The HF Space limit is
        # 16 GB → OOM kill → container restart. Python try/except never sees
        # that signal (SIGKILL cannot be caught), which showed up as "stops
        # silently after 50-200 items, logs lost, current_task back to its
        # initial idle value".
        # Now: each segment is streamed into its own seg file; once all are
        # downloaded they are merged in order, deleting each seg as it goes.
        # Per-video memory drops from the whole file to ~3 MB, and the peak
        # for 16-way concurrency from 20-30 GB to < 200 MB.
        # ====================================================================
        seg_dir = output_path + ".segs"
        try:
            os.makedirs(seg_dir, exist_ok=True)
        except Exception:
            pass
        seg_paths = [os.path.join(seg_dir, f"seg_{i:06d}.ts") for i in range(total_ts)]
        seg_ok = [False] * total_ts

        def _cleanup_segs():
            for _p in seg_paths:
                try:
                    if os.path.exists(_p):
                        os.remove(_p)
                except Exception:
                    pass
            try:
                os.rmdir(seg_dir)
            except Exception:
                pass

        # Segment download concurrency: 16 → 32; connection pool 32 → 64.
        # Roughly doubles per-video segment throughput; empirically the
        # upstream vod site sustains it.
        semaphore = asyncio.Semaphore(32)
        downloaded = 0
        async with httpx.AsyncClient(timeout=httpx.Timeout(30.0, connect=10.0), verify=False, follow_redirects=True,
                                     limits=httpx.Limits(max_keepalive_connections=40, max_connections=64, keepalive_expiry=60.0)) as dl_client:
            async def _fetch_ts(index, url):
                nonlocal downloaded
                if _is_stopping():
                    return False
                async with semaphore:
                    if _is_stopping():
                        return False
                    for retry in range(5):
                        if _is_stopping():
                            return False
                        try:
                            # Stream to disk: segments are 1-3 MB, 64 KB
                            # chunks give enough throughput and memory never
                            # exceeds one chunk.
                            async with dl_client.stream("GET", url, headers=headers) as r:
                                if r.status_code == 200:
                                    with open(seg_paths[index], 'wb') as sf:
                                        async for chunk in r.aiter_bytes(64 * 1024):
                                            if chunk:
                                                sf.write(chunk)
                                    seg_ok[index] = True
                                    downloaded += 1
                                    update_sub(downloaded, total_ts)
                                    if downloaded % 10 == 0 or downloaded == total_ts:
                                        update_msg(f"下载中 {downloaded}/{total_ts} ({int(downloaded/total_ts*100)}%)")
                                    return True
                                if r.status_code in (429, 503) and retry < 4:
                                    await asyncio.sleep(1.5 * (retry + 1))
                                    continue
                        except Exception:
                            # A failure mid-write can leave a partial seg
                            # file; remove it so it cannot pollute the merge.
                            try:
                                if os.path.exists(seg_paths[index]):
                                    os.remove(seg_paths[index])
                            except Exception:
                                pass
                            if retry < 4:
                                await asyncio.sleep(0.5 * (retry + 1))
                    return False
            await asyncio.gather(*[_fetch_ts(i, u) for i, u in enumerate(ts_urls)])
        if _is_stopping():
            update_msg("已停止")
            _cleanup_segs()
            return False
        downloaded_count = sum(1 for ok in seg_ok if ok)
        if downloaded_count == 0:
            update_msg(f"下载失败: {total_ts} 个切片全部获取失败,文件为空")
            _cleanup_segs()
            return False
        update_msg(f"正在合并 {downloaded_count}/{total_ts} 个切片...")

        # Merging 1-2 GB of segments is pure synchronous IO; running it
        # directly in the coroutine would block the event loop for 5-10 s,
        # uvicorn /health might not answer and the HF healthcheck could kill
        # the container → restart → dashboard back to "idle". It is therefore
        # run in the thread pool so the loop keeps serving other downloads
        # and health checks.
        def _do_merge() -> int:
            """Concatenate the downloaded segments; return how many were merged."""
            merged = 0
            with open(output_path, 'wb') as outfile:
                for i in range(total_ts):
                    if seg_ok[i] and os.path.exists(seg_paths[i]):
                        try:
                            with open(seg_paths[i], 'rb') as sf:
                                # Copy in 1 MB chunks rather than read()-ing
                                # a whole segment at once.
                                while True:
                                    chunk = sf.read(1024 * 1024)
                                    if not chunk:
                                        break
                                    outfile.write(chunk)
                            merged += 1
                        except Exception:
                            # Best effort: skip a segment that cannot be copied.
                            pass
                    # Delete while merging so the disk used by segs is freed
                    # right away (keeps the temporary peak inside the Space
                    # disk quota).
                    try:
                        if os.path.exists(seg_paths[i]):
                            os.remove(seg_paths[i])
                    except Exception:
                        pass
            try:
                os.rmdir(seg_dir)
            except Exception:
                pass
            return merged

        try:
            merged_count = await asyncio.get_running_loop().run_in_executor(None, _do_merge)
        except Exception as merge_err:
            # e.g. the output file could not be created: report the failure
            # instead of announcing a finished download.
            update_msg(f"下载失败: 切片合并异常: {str(merge_err)[:60]}")
            _cleanup_segs()
            return False
        if merged_count == 0:
            update_msg("下载失败: 没有任何切片被成功合并")
            _cleanup_segs()
            return False
        update_sub(total_ts, total_ts)
        update_msg("视频下载完成,正在打包入库...")
        return True
    except Exception as e:
        update_msg(f"后端下载崩溃: {str(e)}")
        return False
def _build_display_name(date_str: str, time_str: str, ch_name_clean: str, prog_name: str) -> str:
return f"{date_str}_{time_str}_{ch_name_clean}_{prog_name}.ts"
def _clean_name(s: str) -> str:
return re.sub(r'[\\/*?:"<>|]', "", s).replace(" ", "_").strip(".")
def _build_meta(date_str: str, time_str: str, ch_id: str, ch_name: str, ch_name_clean: str, prog_name: str, display_name: str, year: str, month: str, day: str, program: dict) -> dict:
return {
"display_name": display_name,
"date_str": date_str,
"time_str": time_str,
"ch_id": ch_id,
"ch_name": ch_name,
"ch_name_clean": ch_name_clean,
"prog_name": prog_name,
"year": year, "month": month, "day": day,
"program": program,
}
def _make_m3u8_url(program: dict, auth: dict) -> str:
    """Build the upstream VOD playlist URL of ``program``.

    When the programme has a ``path`` and both the VOD host and an access
    token are available, the URL is assembled from them. Otherwise the
    programme's own ``m3u8`` or ``playUrl`` value is returned, or ``""``.
    """
    raw_path = program.get('path')
    if raw_path:
        relative = raw_path.strip().lstrip('/')
        if not relative.startswith('query/'):
            relative = f"query/{relative}"
        if relative.endswith('.m3u8'):
            # Drop the extension here; it is appended again below.
            relative = relative[:-5]
        host = Config.UPSTREAM_HOSTS.get('vod', '')
        token = quote(auth.get('access_token', ''), safe='')
        if host and token:
            return f"{host}/{relative}.m3u8?type=vod&__cross_domain_user={token}"
    fallback = program.get('m3u8') or program.get('playUrl')
    return fallback or ""
@app.get("/api/admin/archive/daily_epg")
async def get_daily_epg_archive(date: str):
    """Build the admin archive view for one JST calendar day.

    The programme guide for ``date`` is merged with the archive index rows of
    the same day. Each guide entry is reported either as archived (with play
    and download URLs) or as not archived (with whatever upstream playlist URL
    ``_make_m3u8_url`` yields). Archived files that match no guide entry are
    listed as well, so nothing in the index is hidden.

    Args:
        date: Day to inspect, formatted ``YYYYMMDD``.

    Returns:
        ``{"success": True, "date": date, "data": [...]}`` where each item is
        ``{"ch_id", "ch_name", "programs"}``.

    Raises:
        HTTPException: 400 when ``date`` is not a valid ``YYYYMMDD`` day.
    """
    jst = timezone(timedelta(hours=9))
    # Validate before touching upstream: the value is sliced into path parts
    # and parsed below, and a malformed date used to surface as a 500 only
    # after the (expensive) EPG fetch had already run.
    try:
        if not re.fullmatch(r"[0-9]{8}", date or ""):
            raise ValueError(date)
        target_dt = datetime.strptime(date, "%Y%m%d").replace(tzinfo=jst)
    except ValueError:
        raise HTTPException(status_code=400, detail="日期格式应为 YYYYMMDD") from None
    start_ts = target_dt.timestamp()
    end_ts = (target_dt + timedelta(days=1)).timestamp()
    year, month, day = date[:4], date[4:6], date[6:8]

    auth = await get_auth()
    channels = await get_channels(auth)
    # Map both the channel id and the channel number to its name. Channels
    # without a name are left out so the lookup below falls back to the id,
    # which is the same convention the batch archive task uses; otherwise the
    # video paths derived here would not match the paths that task stored.
    cmap = {}
    for c in channels:
        name = c.get('name', '')
        if name:
            cmap[str(c.get('id'))] = name
            cmap[str(c.get('no'))] = name

    try:
        epg_data = await get_all_epg(auth, channels, date=date)
    except Exception:
        epg_data = {}
    if not isinstance(epg_data, dict):
        # Anything but a mapping cannot be merged into below.
        epg_data = {}
    has_data = any(
        (v.get("data") if isinstance(v, dict) else v)
        for v in epg_data.values()
    )
    if not has_data:
        # Fallback: query each channel's guide for that day, at most 8 at a time.
        fmt_date = f"{year}-{month}-{day}"
        sem_epg = asyncio.Semaphore(8)

        async def _fetch_one_ch(ch):
            ch_id = str(ch.get('id', ''))
            if not ch_id:
                return None
            async with sem_epg:
                try:
                    progs = await fetch_epg(ch_id, fmt_date, auth)
                except Exception:
                    return None
            return (ch_id, progs) if progs else None

        for fetched in await asyncio.gather(*[_fetch_one_ch(c) for c in channels]):
            if fetched:
                epg_data[fetched[0]] = {"data": fetched[1]}

    # Only this date's archive rows are loaded (the original note says this is
    # served from the SQLite index); the full remote index is deliberately not
    # refreshed on every request.
    existing_paths = {f['path']: f for f in archive_manager.list_by_date(date)}

    results = []
    seen_ch_clean = set()
    for cid, epg in epg_data.items():
        cid_str = str(cid)
        ch_name = cmap.get(cid_str, cid_str)
        ch_name_clean = _clean_name(ch_name)
        if isinstance(epg, dict):
            programs = epg.get('data') or []
        elif isinstance(epg, list):
            programs = epg
        else:
            programs = []
        ch_results = []
        for p in programs:
            if not isinstance(p, dict):
                continue
            t_val = _parse_epg_time(p.get('time'))
            # Drop programmes that start outside the requested JST day.
            if t_val is not None and not (start_ts <= t_val < end_ts):
                continue
            try:
                dt = datetime.fromtimestamp(t_val, tz=jst) if t_val else datetime.now(jst)
                time_str = dt.strftime("%H%M")
            except Exception:
                time_str = "0000"
            prog_name = _clean_name(p.get('name') or p.get('title') or '未命名')
            display_name = _build_display_name(date, time_str, ch_name_clean, prog_name)
            md5 = archive_manager.display_name_to_md5(display_name)
            video_path = archive_manager.make_video_path(year, month, day, ch_name_clean, md5)
            archived = existing_paths.get(video_path)
            is_archived = archived is not None
            if is_archived:
                quoted_path = quote(video_path)
                repo = archived['repo']
                # `or 0` guards against a NULL size in the index row.
                file_size = int(archived.get('file_size') or 0)
                play_url = f"/api/archive/play?path={quoted_path}"
                download_url = f"/api/archive/download?path={quoted_path}&name={quote(display_name)}"
                m3u8_url = ""
            else:
                repo = None
                file_size = 0
                play_url = ""
                download_url = ""
                m3u8_url = _make_m3u8_url(p, auth)
            ch_results.append({
                "time": time_str,
                "prog_name": prog_name,
                "display_name": display_name,
                "md5": md5,
                "is_archived": is_archived,
                "video_path": video_path,
                "repo": repo,
                "file_size": file_size,
                "m3u8": m3u8_url,
                "play_url": play_url,
                "download_url": download_url,
                "program": p,
            })
        if ch_results:
            results.append({"ch_id": cid_str, "ch_name": ch_name, "programs": ch_results})
            seen_ch_clean.add(ch_name_clean)

    # Collect every archived file of the day, grouped by the channel directory
    # found in its path, so files without a matching guide entry still show up.
    date_prefix = f"video_archives/{year}/{month}/{day}/"
    ch_extra = {}
    for vpath, finfo in existing_paths.items():
        if not vpath.startswith(date_prefix):
            continue
        parts = vpath[len(date_prefix):].split('/')
        if len(parts) != 2 or not parts[1].endswith('.ts'):
            continue
        ch_dir = parts[0]
        display_name = finfo.get('display_name') or parts[1]
        prog_name_parsed = display_name
        time_str_parsed = "0000"
        stem = display_name[:-3] if display_name.endswith('.ts') else display_name
        stem_parts = stem.split('_', 3)
        if len(stem_parts) >= 4 and stem_parts[0] == date:
            time_str_parsed = stem_parts[1]
            prog_name_parsed = stem_parts[3]
            # A cleaned channel name may itself contain "_" (spaces are turned
            # into underscores), in which case the naive split leaves part of
            # the channel name in the programme name. Prefer stripping the
            # exact "<date>_<time>_<channel>_" prefix when it matches.
            known_prefix = f"{date}_{time_str_parsed}_{ch_dir}_"
            if stem.startswith(known_prefix) and len(stem) > len(known_prefix):
                prog_name_parsed = stem[len(known_prefix):]
        quoted_path = quote(vpath)
        ch_extra.setdefault(ch_dir, []).append({
            "time": time_str_parsed,
            "prog_name": prog_name_parsed,
            "display_name": display_name,
            "md5": parts[1][:-3],
            "is_archived": True,
            "video_path": vpath,
            "repo": finfo.get('repo'),
            "file_size": int(finfo.get('file_size') or 0),
            "m3u8": "",
            "play_url": f"/api/archive/play?path={quoted_path}",
            "download_url": f"/api/archive/download?path={quoted_path}&name={quote(display_name)}",
            "program": {},
        })

    # Merge archived-only entries into channels that already have guide rows.
    for r in results:
        extras = ch_extra.get(_clean_name(r['ch_name']))
        if not extras:
            continue
        listed_paths = {p['video_path'] for p in r['programs']}
        r['programs'].extend(e for e in extras if e['video_path'] not in listed_paths)
        r['programs'].sort(key=lambda x: x.get('time', '0000'))

    # Channels that exist only in the archive get their own entry.
    for ch_dir, entries in ch_extra.items():
        if ch_dir in seen_ch_clean:
            continue
        results.append({
            "ch_id": ch_dir,
            "ch_name": ch_dir,
            "programs": sorted(entries, key=lambda x: x.get('time', '0000')),
        })
    return {"success": True, "date": date, "data": results}
class DownloadSingleReq(BaseModel):
    # Request body of POST /api/admin/archive/download_single.

    # Programme date; the handler slices it as [:4] / [4:6] / [6:8], i.e. it
    # is expected to be formatted YYYYMMDD.
    date_str: str
    # Channel identifier; the handler resolves the channel name from it via
    # the channel list and falls back to the raw id.
    ch_id: str
    # Optional channel name. NOTE(review): the handler in this file does not
    # read this field.
    ch_name: Optional[str] = None
    # EPG programme entry; the handler reads its 'time', 'name' and 'title'.
    program: dict
    # Playlist URL handed to the internal m3u8 downloader.
    m3u8: str
# Strong references to in-flight single-download tasks. The event loop keeps
# only weak references to tasks, so an otherwise unreferenced task may be
# garbage-collected before it finishes.
_single_download_tasks = set()


@app.post("/api/admin/archive/download_single")
async def download_single_archive(req: DownloadSingleReq):
    """Start a background download-and-upload of a single programme.

    The request returns immediately; progress and the final outcome are
    reported through ``archive_manager.progress``.

    Args:
        req: Programme to archive (date, channel id, EPG entry, playlist URL).

    Returns:
        ``{"success": True}`` once the background task has been scheduled.
    """
    async def _dl():
        archive_manager.progress.update({"status": "running", "total": 1, "current": 1, "success": 0, "fail": 0, "current_task": "正在启动单点下载..."})
        temp_ts = None
        try:
            auth = await get_auth()
            channels = await get_channels(auth)
            # Channels without a name are skipped so the lookup falls back to
            # the channel id, matching how the batch archive task names
            # channels (and therefore derives the same storage path).
            cmap = {}
            for c in channels:
                name = c.get('name', '')
                if name:
                    cmap[str(c.get('id'))] = name
                    cmap[str(c.get('no'))] = name
            ch_name = cmap.get(str(req.ch_id), str(req.ch_id))
            p = req.program
            jst = timezone(timedelta(hours=9))
            time_val = _parse_epg_time(p.get('time'))
            try:
                dt = datetime.fromtimestamp(time_val, tz=jst) if time_val else datetime.now(jst)
                time_str = dt.strftime("%H%M")
            except Exception:
                time_str = "0000"
            ch_name_clean = _clean_name(ch_name)
            prog_name = _clean_name(p.get('name') or p.get('title') or '未命名')
            year, month, day = req.date_str[:4], req.date_str[4:6], req.date_str[6:8]
            display_name = _build_display_name(req.date_str, time_str, ch_name_clean, prog_name)
            md5 = archive_manager.display_name_to_md5(display_name)
            video_path = archive_manager.make_video_path(year, month, day, ch_name_clean, md5)
            if archive_manager.db_exists(video_path):
                archive_manager.progress.update({"status": "idle", "success": 1, "fail": 0, "current_task": f"云端已存在: {display_name}"})
                return
            temp_ts = os.path.join(tempfile.gettempdir(), md5 + ".ts")
            success_dl = await internal_download_m3u8(req.m3u8, temp_ts, archive_manager.progress)
            if success_dl and os.path.exists(temp_ts):
                archive_manager.progress["current_task"] = f"正在上传至云端: {display_name}"
                meta = _build_meta(req.date_str, time_str, str(req.ch_id), ch_name, ch_name_clean, prog_name, display_name, year, month, day, p)
                # upload_video is a blocking call; run it in the default
                # executor so the event loop keeps serving other requests
                # while the file is uploaded.
                ok = await asyncio.get_running_loop().run_in_executor(
                    None, archive_manager.upload_video,
                    temp_ts, year, month, day, ch_name_clean, display_name, meta)
                if ok:
                    archive_manager.progress.update({"status": "idle", "success": 1, "fail": 0, "current_task": f"存档成功: {display_name}"})
                    archive_manager.refresh_archived_cache()
                else:
                    archive_manager.progress.update({"status": "idle", "success": 0, "fail": 1, "current_task": "上传失败"})
            else:
                archive_manager.progress.update({"status": "idle", "success": 0, "fail": 1, "current_task": "下载失败"})
        except Exception as e:
            archive_manager.progress.update({"status": "idle", "current_task": f"Error: {str(e)}"})
        finally:
            # Always drop the temporary file, including when the download or
            # the upload raised.
            if temp_ts is not None:
                try:
                    if os.path.exists(temp_ts):
                        os.remove(temp_ts)
                except OSError:
                    pass

    bg_task = asyncio.create_task(_dl())
    _single_download_tasks.add(bg_task)
    bg_task.add_done_callback(_single_download_tasks.discard)
    return {"success": True}
async def run_archive_task():
    """Run the full archive job unless one is already marked as running.

    When the job is executed, the module-level ``_archive_triggered`` flag is
    reset to ``False`` afterwards, whether or not the job raised.
    """
    global _archive_triggered
    already_running = archive_manager.progress["status"] == "running"
    if not already_running:
        try:
            await _run_archive_task_inner()
        finally:
            _archive_triggered = False
async def _run_archive_task_inner():
    """Run one full archive pass and report it through ``archive_manager.progress``.

    Steps, in order:
      1. Reset the progress dict to "running" and log a ``start`` event.
      2. Load the already-archived paths / display names and the failed set.
      3. Collect EPG data (live fetch, stored EPG JSON, per-day backfill for the
         last 28 days) and upload the merged EPG.
      4. Build the list of programmes that are not archived yet and still
         inside the 28-day window, oldest first.
      5. Download them in batches of ``BATCH_SIZE`` while one upload worker per
         repo consumes the finished files.
      6. Keep retrying everything that is still missing from the DB until
         nothing is pending or the status becomes "stopping".
      7. Persist the failed set, write the summary, flush the index.

    Any exception is recorded as a ``crash`` event; the status is always set
    back to "idle" at the end.
    """
    archive_manager.progress.update({
        "status": "running", "total": 0, "current": 0,
        "success": 0, "fail": 0, "skip": 0,
        "current_task": "正在初始化...",
        "ts_done": 0, "ts_total": 0,
        "current_file": "", "current_ch": "", "downloading": 0,
    })
    # Write a "start" event (which also forces a push of the DB to HF). Even if the
    # container is killed later, /api/admin/archive/diagnostic will still show after
    # a restart that the task really ran and when it started.
    archive_manager.log_task_event("start")
    try:
        auth = await get_auth()
        channels = await get_channels(auth)
        # Channel id / channel number -> channel name (unnamed channels are skipped,
        # so lookups fall back to the id).
        cmap = {}
        for c in channels:
            name = c.get('name', '')
            if name:
                cmap[str(c.get('id'))] = name
                cmap[str(c.get('no'))] = name
        jst = timezone(timedelta(hours=9))
        # NOTE(review): now_jst is not read anywhere below.
        now_jst = datetime.now(jst)
        today_jst = get_jst_date()
        # get_jst_date() is parsed as "YYYY-MM-DD" first, then as "YYYYMMDD".
        try:
            today_obj = datetime.strptime(today_jst, "%Y-%m-%d")
        except Exception:
            today_obj = datetime.strptime(today_jst, "%Y%m%d")
        # Previously only two empty sets were created here and the real dedup relied on
        # calling is_already_archived for every programme in the collection loop (two
        # SQLite SELECTs each). 28 days x dozens of channels x dozens of programmes is
        # roughly 100k+ SQLite queries; even with indexes, file locking / Python callbacks
        # add up to tens of seconds, which looked like a hang at the "loading existing
        # archive index" stage. Now the whole table's path + display_name is read into
        # sets once and every later check is an O(1) set lookup; preloading ~100k rows
        # usually finishes in under a second.
        archive_manager.progress["current_task"] = "正在加载已有存档索引..."
        loop = asyncio.get_event_loop()
        existing_paths, existing_display_names = await loop.run_in_executor(None, archive_manager.load_existing_sets)
        archive_manager.progress["current_task"] = f"已加载 {len(existing_paths)} 条索引, 准备拉取 EPG..."
        repo_id = archive_manager._select_repo()
        failed_set = await loop.run_in_executor(None, archive_manager.load_failed_set, repo_id)
        cutoff_date = (today_obj - timedelta(days=365)).strftime("%Y%m%d")
        # Keep only failed paths whose 2nd-4th path segments (concatenated and compared
        # as a date string) are newer than the 365-day cutoff.
        failed_set = {fp for fp in failed_set if len(fp.split("/"))>=4 and fp.split("/")[1]+fp.split("/")[2]+fp.split("/")[3] > cutoff_date}
        archive_manager.progress["current_task"] = "正在拉取全部 EPG..."
        try:
            all_epg = await get_all_epg(auth, channels, date=None, force=True)
        except Exception:
            all_epg = {}
        jst_tz = timezone(timedelta(hours=9))
        today_str = today_obj.strftime("%Y%m%d")
        # date (YYYYMMDD) -> channel id -> {"data": [programmes]}
        epg_by_date = {}
        for ch_id, epg in all_epg.items():
            programs = epg.get("data", []) if isinstance(epg, dict) else (epg if isinstance(epg, list) else [])
            for p in programs:
                t_val = _parse_epg_time(p.get("time"))
                if t_val is None:
                    continue
                d_str = datetime.fromtimestamp(t_val, tz=jst_tz).strftime("%Y%m%d")
                # Programmes dated after today are ignored.
                if d_str > today_str:
                    continue
                epg_by_date.setdefault(d_str, {}).setdefault(str(ch_id), {"data": []})["data"].append(p)
        # Restore historical EPG concurrently.
        try:
            hf_epg_dates = await loop.run_in_executor(None, archive_manager.list_epg_dates)
            missing_dates = sorted(hf_epg_dates - set(epg_by_date.keys()))
            missing_dates = [md for md in missing_dates if md <= today_str]
            if missing_dates:
                archive_manager.progress["current_task"] = f"正在恢复 {len(missing_dates)} 天历史 EPG..."
                async def _dl_epg(md):
                    try:
                        return md, await loop.run_in_executor(None, archive_manager.download_epg_json, md)
                    except: return md, {}
                epg_results = await asyncio.gather(*[_dl_epg(md) for md in missing_dates])
                for md, hf_epg in epg_results:
                    if hf_epg:
                        for ch_id, ch_epg in hf_epg.items():
                            progs = ch_epg.get("data", []) if isinstance(ch_epg, dict) else (ch_epg if isinstance(ch_epg, list) else [])
                            if progs:
                                epg_by_date.setdefault(md, {}).setdefault(str(ch_id), {"data": []})["data"].extend(progs)
        except Exception:
            pass
        # Fill in the 28-day EPG window concurrently.
        full_28_dates = {(today_obj - timedelta(days=i)).strftime("%Y%m%d") for i in range(28)}
        missing_dates_28 = sorted(full_28_dates - set(epg_by_date.keys()))
        if missing_dates_28:
            archive_manager.progress["current_task"] = f"正在补全 {len(missing_dates_28)} 天 EPG..."
            async def _fetch_day_epg(md):
                fmt_date = f"{md[:4]}-{md[4:6]}-{md[6:8]}"
                result = {}
                # At most 5 channel requests in flight per day.
                sem = asyncio.Semaphore(5)
                async def _fetch_ch(ch):
                    ch_id = str(ch.get('id', ''))
                    if not ch_id: return
                    if epg_by_date.get(md, {}).get(ch_id, {}).get("data"): return
                    async with sem:
                        try:
                            progs = await fetch_epg(ch_id, fmt_date, auth)
                            if progs: result.setdefault(ch_id, {"data": []})["data"].extend(progs)
                        except: pass
                await asyncio.gather(*[_fetch_ch(ch) for ch in channels])
                return md, result
            day_results = await asyncio.gather(*[_fetch_day_epg(md) for md in missing_dates_28])
            for md, res in day_results:
                for ch_id, ch_data in res.items():
                    epg_by_date.setdefault(md, {}).setdefault(ch_id, {"data": []})["data"].extend(ch_data["data"])
        sorted_dates = sorted(epg_by_date.keys())
        archive_manager.progress["current_task"] = f"正在同步 {len(sorted_dates)} 天 EPG..."
        # Best effort: a failed EPG upload does not abort the task.
        try:
            await loop.run_in_executor(None, archive_manager.upload_epg_batch, epg_by_date)
        except Exception:
            pass
        # Build the download task list: one entry per programme not archived yet.
        tasks_to_run = []
        seen_video_paths = set()
        for d_str in sorted_dates:
            dt_obj_d = datetime.strptime(d_str, "%Y%m%d").replace(tzinfo=jst)
            start_ts = dt_obj_d.timestamp()
            end_ts = (dt_obj_d + timedelta(days=1)).timestamp()
            year, month, day = d_str[:4], d_str[4:6], d_str[6:8]
            for ch_id, epg in epg_by_date.get(d_str, {}).items():
                cid_str = str(ch_id)
                ch_name = cmap.get(cid_str, cid_str)
                ch_name_clean = _clean_name(ch_name)
                programs = epg.get('data', []) if isinstance(epg, dict) else (epg if isinstance(epg, list) else [])
                programs.sort(key=lambda p: _parse_epg_time(p.get('time')) or 0)
                for program in programs:
                    if not isinstance(program, dict): continue
                    t_val = _parse_epg_time(program.get('time'))
                    # Skip programmes whose start time falls outside this JST day.
                    if t_val is not None and not (start_ts <= t_val < end_ts): continue
                    m3u8_url = _make_m3u8_url(program, auth)
                    if not m3u8_url: continue
                    try:
                        dt = datetime.fromtimestamp(t_val, tz=jst) if t_val else datetime.now(jst)
                        time_str = dt.strftime("%H%M")
                    except: time_str = "0000"
                    prog_name = _clean_name(program.get('name') or program.get('title') or '未命名')
                    # Programmes whose title contains '休止' are not archived.
                    if '休止' in (program.get('name') or program.get('title') or ''): continue
                    display_name = _build_display_name(d_str, time_str, ch_name_clean, prog_name)
                    md5 = archive_manager.display_name_to_md5(display_name)
                    video_path = archive_manager.make_video_path(year, month, day, ch_name_clean, md5)
                    # Dedup uses only the in-memory sets (loaded from SQLite once at start);
                    # no per-item SQLite queries any more.
                    if video_path in existing_paths: continue
                    if video_path in seen_video_paths: continue
                    if display_name in existing_display_names: continue
                    if video_path in failed_set: continue
                    seen_video_paths.add(video_path)
                    tasks_to_run.append({
                        'program': program, 'ch_id': cid_str, 'ch_name': ch_name,
                        'ch_name_clean': ch_name_clean, 'prog_name': prog_name,
                        'date_str': d_str, 'time_str': time_str,
                        'year': year, 'month': month, 'day': day,
                        'display_name': display_name, 'md5': md5,
                        'video_path': video_path, 'm3u8': m3u8_url,
                    })
        # The upstream segment source expires after 28 days, so the cutoff must be judged
        # with second-level timestamps (28*86400): truncating by day would treat every
        # programme of the "today 00:00 - 28d" day as still inside the window, although its
        # early-morning programmes expired long ago and would only produce failures;
        # conversely, evening programmes from 28 days ago that still have a few hours left
        # could be wrongly dropped.
        now_ts_28 = datetime.now(jst).timestamp()
        cutoff_ts_28 = now_ts_28 - 28 * 86400
        def _task_ts(t):
            ts = _parse_epg_time(t['program'].get('time'))
            if ts is not None: return ts
            # When the programme time cannot be parsed, fall back to 12:00 JST of that
            # day so the whole day is not misplaced / wrongly dropped.
            try:
                return datetime.strptime(t['date_str'], "%Y%m%d").replace(tzinfo=jst, hour=12).timestamp()
            except Exception: return 0.0
        tasks_to_run = [t for t in tasks_to_run if _task_ts(t) >= cutoff_ts_28]
        # Sort by actual air time, oldest first, to rescue the programmes closest to the
        # 28-day expiry threshold first.
        tasks_to_run.sort(key=_task_ts)
        total_tasks = len(tasks_to_run)
        archive_manager.progress["total"] = total_tasks
        if not tasks_to_run:
            archive_manager.progress.update({"status": "idle", "current_task": "无新任务"})
            return
        CONCURRENCY = Config.ARCHIVE_DOWNLOAD_CONCURRENCY
        INDEX_SAVE_INTERVAL = 50
        new_failures = set()
        CONSEC_FAIL_LIMIT = 10
        # date -> failures since the last success on that date (per-date circuit breaker).
        date_fail_counts = {}
        repos = archive_manager.repos if archive_manager.repos else [repo_id]
        # One queue and one upload worker per repo; upload_done tells the workers that
        # no more items will be queued.
        repo_queues = {r: asyncio.Queue() for r in repos}
        upload_done = asyncio.Event()
        progress_lock = asyncio.Lock()
        repo_rr_idx = 0
        async def _upload_worker(worker_repo_id, q):
            # Temp files added to the current batch; removed after the batch is flushed.
            pending_temp_files = []
            while True:
                try: item = q.get_nowait()
                except asyncio.QueueEmpty:
                    if upload_done.is_set(): break
                    try: item = await asyncio.wait_for(q.get(), timeout=1.0)
                    except asyncio.TimeoutError:
                        if upload_done.is_set(): break
                        continue
                if item is None: break
                task_item = item['task']
                added_ok = archive_manager.add_to_repo_batch(
                    worker_repo_id, item['temp_ts'], task_item['year'], task_item['month'],
                    task_item['day'], task_item['ch_name_clean'], task_item['display_name'], item['meta'])
                if added_ok:
                    pending_temp_files.append(item['temp_ts'])
                else:
                    # Previously the return value of add_to_repo_batch was ignored: when a repo
                    # was full the item was silently dropped and its temp file was removed with
                    # the next flush, so the video vanished. Now: (1) first try to re-queue the
                    # item on another repo; (2) if that fails, upload the single item through
                    # upload_video (which picks an available repo itself and has 3 built-in
                    # retries). The temp file is never thrown away unprocessed.
                    reput_ok = False
                    try:
                        for _alt in archive_manager.get_available_repos():
                            if _alt == worker_repo_id: continue
                            if _alt not in repo_queues: continue
                            try:
                                await repo_queues[_alt].put(item)
                                reput_ok = True
                                break
                            except Exception:
                                continue
                    except Exception:
                        pass
                    if not reput_ok:
                        def _single_upload():
                            try:
                                return archive_manager.upload_video(
                                    item['temp_ts'], task_item['year'], task_item['month'],
                                    task_item['day'], task_item['ch_name_clean'],
                                    task_item['display_name'], item['meta'], False)
                            except Exception:
                                return False
                        ok_single = await loop.run_in_executor(None, _single_upload)
                        async with progress_lock:
                            if ok_single:
                                archive_manager.progress["success"] += 1
                            else:
                                archive_manager.progress["fail"] += 1
                        try:
                            if os.path.exists(item['temp_ts']): os.remove(item['temp_ts'])
                        except Exception:
                            pass
                # Flush the repo batch once the manager reports it ready; the flush result
                # is added to "success" and the rest of the batch counts as failed.
                if archive_manager.repo_batch_ready(worker_repo_id):
                    batch_n = archive_manager.repo_batch_size(worker_repo_id)
                    ok = await loop.run_in_executor(None, archive_manager.flush_repo_batch, worker_repo_id)
                    for tf in pending_temp_files:
                        try:
                            if os.path.exists(tf): os.remove(tf)
                        except Exception:
                            pass
                    pending_temp_files.clear()
                    async with progress_lock:
                        archive_manager.progress["success"] += ok
                        archive_manager.progress["fail"] += max(0, batch_n - ok)
                        if archive_manager.progress["success"] % INDEX_SAVE_INTERVAL == 0:
                            await loop.run_in_executor(None, archive_manager.flush_index)
                q.task_done()
            # Worker is exiting: flush whatever is still batched for this repo.
            remaining = archive_manager.repo_batch_size(worker_repo_id)
            if remaining > 0:
                ok = await loop.run_in_executor(None, archive_manager.flush_repo_batch, worker_repo_id)
                async with progress_lock:
                    archive_manager.progress["success"] += ok
                    archive_manager.progress["fail"] += max(0, remaining - ok)
            for tf in pending_temp_files:
                try:
                    if os.path.exists(tf): os.remove(tf)
                except Exception:
                    pass
        upload_workers = [asyncio.create_task(_upload_worker(r, repo_queues[r])) for r in repos]
        dl_semaphore = asyncio.Semaphore(CONCURRENCY)
        completed_count = 0
        completed_lock = asyncio.Lock()
        # NOTE(review): the idx parameter is not used inside _download_one.
        async def _download_one(idx, task):
            nonlocal completed_count, repo_rr_idx
            if archive_manager.progress["status"] == "stopping": return
            d_str = task['date_str']
            if date_fail_counts.get(d_str, 0) >= CONSEC_FAIL_LIMIT:
                async with progress_lock:
                    archive_manager.progress["skip"] += 1
                # Do not write circuit-breaker skips into failed.json.
                # Programmes inside the 28-day window were only cut off by the circuit
                # breaker because of upstream / rate-limit jitter in this round; writing them
                # to failed.json would make the next task (`if video_path in failed_set:
                # continue`) filter them out forever - the root cause of "only a few hundred
                # get downloaded, fewer each time, failures are never retried".
                # The retry loop of this run still picks them up via `archive_manager.db_exists`.
                async with completed_lock:
                    completed_count += 1; archive_manager.progress["current"] = completed_count
                return
            async with dl_semaphore:
                # The pause/stop check must be inside the semaphore: asyncio.gather runs every
                # coroutine through the function head right away, after which they all queue
                # on dl_semaphore. A check in the function head / before the semaphore has no
                # effect; only after acquiring the semaphore can the (up to ~2000) queued
                # tasks check the pause/stop state one by one.
                while archive_manager.progress["status"] in ("pausing", "paused"):
                    archive_manager.progress["status"] = "paused"
                    await asyncio.sleep(1)
                if archive_manager.progress["status"] == "stopping": return
                async with progress_lock:
                    archive_manager.progress["downloading"] = archive_manager.progress.get("downloading", 0) + 1
                    d_label = f"{task['date_str'][:4]}/{task['date_str'][4:6]}/{task['date_str'][6:8]}"
                    archive_manager.progress["current_task"] = f"[{d_label}] {task['prog_name']}"
                    archive_manager.progress["current_file"] = task['display_name']
                    archive_manager.progress["current_ch"] = task['ch_name']
                temp_ts = os.path.join(tempfile.gettempdir(), task['md5'] + ".ts")
                try:
                    success_dl = await internal_download_m3u8(task['m3u8'], temp_ts, archive_manager.progress)
                except Exception:
                    success_dl = False
                async with completed_lock:
                    completed_count += 1; archive_manager.progress["current"] = completed_count
                    # Every 25 completed items, log a heartbeat (with rss/disk + forced DB push
                    # to HF). After the container is SIGKILLed, /diagnostic shows the rss/disk
                    # of the last heartbeat, which points at the cause of death: rss near
                    # 16GB -> OOM; disk near 50GB -> disk full; neither -> infrastructure kill.
                    if completed_count % 25 == 0:
                        try: archive_manager.log_task_event("heartbeat", f"completed={completed_count}/{archive_manager.progress.get('total',0)}")
                        except Exception: pass
                if success_dl and os.path.exists(temp_ts):
                    meta = _build_meta(
                        task['date_str'], task['time_str'], task['ch_id'], task['ch_name'],
                        task['ch_name_clean'], task['prog_name'], task['display_name'],
                        task['year'], task['month'], task['day'], task['program'])
                    existing_paths.add(task['video_path'])
                    existing_display_names.add(task['display_name'])
                    available_repos = archive_manager.get_available_repos()
                    if not available_repos:
                        # Previously this set status="stopping" and cut the whole task off; any
                        # momentary jitter in the size calculation / rate limiting that made
                        # every repo look full ended both the main loop and the retry loop,
                        # which users saw as "stops silently after 50-200 items". Now: force a
                        # refresh of the real repo sizes first; if there is still no room,
                        # count only this item as failed (without writing failed.json), keep
                        # running the following items, and leave this video_path to the retry
                        # loop, which re-checks db_exists.
                        def _refresh_all_repos():
                            for _r in archive_manager.repos:
                                try: archive_manager._force_refresh(_r)
                                except Exception: pass
                        try:
                            # dataset_info for 24 repos x ~2s each is a synchronous HTTP call;
                            # running it directly in async code would block the event loop for
                            # ~48s, the HF Space healthcheck (usually a 30s timeout) would
                            # declare the app dead and restart the container - seen as the
                            # dashboard suddenly going back to "idle, waiting for a task".
                            # So it is pushed to the thread pool.
                            await loop.run_in_executor(None, _refresh_all_repos)
                        except Exception:
                            pass
                        available_repos = archive_manager.get_available_repos()
                        if not available_repos:
                            async with progress_lock:
                                archive_manager.progress["fail"] += 1
                                archive_manager.progress["current_task"] = f"所有 dataset 暂时无空位, 跳过单项继续: {task['display_name']}"
                            try:
                                if os.path.exists(temp_ts): os.remove(temp_ts)
                            except: pass
                            return
                    # Round-robin over the repos that currently have room.
                    target_repo = available_repos[repo_rr_idx % len(available_repos)]
                    repo_rr_idx += 1
                    await repo_queues[target_repo].put({'temp_ts': temp_ts, 'task': task, 'meta': meta})
                    # A success resets the failure counter of that date.
                    date_fail_counts[d_str] = 0
                else:
                    async with progress_lock:
                        # Items at or before the 365-day cutoff are recorded as permanent
                        # failures and counted as skipped; everything else counts as failed.
                        if task['date_str'] <= cutoff_date:
                            new_failures.add(task['video_path'])
                            archive_manager.progress["skip"] += 1
                        else:
                            archive_manager.progress["fail"] += 1
                    date_fail_counts[d_str] = date_fail_counts.get(d_str, 0) + 1
                    try:
                        if os.path.exists(temp_ts): os.remove(temp_ts)
                    except: pass
                async with progress_lock:
                    archive_manager.progress["downloading"] = max(0, archive_manager.progress.get("downloading", 1) - 1)
        # Download in batches: scheduling thousands of tasks with one asyncio.gather, even
        # with semaphore=16 limiting concurrency, caused these problems: (1) by the time later
        # tasks run the auth token is close to expiry and many tasks stall on token refresh;
        # (2) hammering the upstream vod site for 30+ minutes easily triggers throttling and
        # raises the overall failure rate; (3) the whole run takes longer, so a second token
        # expiry or a short upstream hiccup affects thousands of items.
        # Now each batch has BATCH_SIZE items; auth + m3u8 URLs are refreshed between batches
        # and there is a short cooldown, which clearly lowers the in-batch failure rate.
        # 200 -> 50: a free-tier HF Space container may be restarted by the infrastructure at
        # any time (observed: killed after batch 1/11 with no crash event = hard SIGKILL).
        # A smaller batch means flush_index persists 4x as often, so a kill loses at most 50
        # items of work; after the restart the retry loop picks unpersisted items up from
        # SQLite and continues.
        BATCH_SIZE = 50
        total_batches = (total_tasks + BATCH_SIZE - 1) // BATCH_SIZE
        for batch_start in range(0, total_tasks, BATCH_SIZE):
            if archive_manager.progress["status"] == "stopping": break
            batch = tasks_to_run[batch_start:batch_start + BATCH_SIZE]
            batch_no = batch_start // BATCH_SIZE + 1
            # Per-batch exception isolation: previously an error in any batch bubbled up to
            # the outer try/except and ended the whole task. Now a failing batch is only
            # logged and the remaining batches continue; unfinished items are left to the
            # retry loop, which re-checks db_exists.
            try:
                # The first batch uses the initial auth; every later batch refreshes the token
                # and rebuilds the m3u8 URLs first, so a batch does not hit 401 as a whole with
                # an old token and cause a serialized token-refresh storm.
                if batch_no > 1:
                    archive_manager.progress["current_task"] = f"批次 {batch_no}/{total_batches}: 刷新 auth token..."
                    try:
                        auth = await get_auth(force=True)
                        for t in batch:
                            new_url = _make_m3u8_url(t['program'], auth)
                            if new_url: t['m3u8'] = new_url
                    except Exception: pass
                    # Cooldown between batches reduced from 5s to 2s: the upstream rate-limit
                    # counter does not need 5s to decay, and 1s steps keep "stop" responsive.
                    for _i in range(2):
                        if archive_manager.progress["status"] == "stopping": break
                        await asyncio.sleep(1)
                if archive_manager.progress["status"] == "stopping": break
                archive_manager.progress["current_task"] = f"批次 {batch_no}/{total_batches}: 下载 {len(batch)} 项"
                # Independent circuit-breaker counters per batch: failures a date accumulated
                # in the previous batch must not affect attempts for that date in this batch.
                date_fail_counts.clear()
                batch_dl_tasks = [_download_one(batch_start + i, t) for i, t in enumerate(batch)]
                await asyncio.gather(*batch_dl_tasks)
                # Heartbeat: log a batch_done event per batch so the diagnostic endpoint shows
                # how far the run got after a container restart.
                try: archive_manager.log_task_event("batch_done", f"batch={batch_no}/{total_batches}")
                except Exception: pass
            except Exception as _be:
                import traceback as _tb; _tb.print_exc()
                archive_manager.progress["current_task"] = f"批次 {batch_no} 异常已隔离, 继续下一批: {str(_be)[:80]}"
                continue
        upload_done.set()
        await asyncio.gather(*upload_workers)
        # flush_index internally calls flush_repo_batch for 24 repos serially, plus
        # upload_db_to_hf and possibly squash_history - all synchronous HF API calls taking
        # 30-60s in total. Calling it directly in async code blocks the event loop, the
        # healthcheck times out, HF kills the container and the dashboard goes back to
        # "idle, waiting for a task". So it is pushed to the thread pool.
        await loop.run_in_executor(None, archive_manager.flush_index)
        try: archive_manager.log_task_event("main_loop_done", f"batches={total_batches}")
        except Exception: pass
        # ====================================================================
        # Automatic retry: after the main pass, everything that did not land in SQLite is
        # run again. "Is it in the DB" is the single source of truth and covers every
        # failure branch: download fail / batch commit fail / 10-consecutive-failures skip /
        # >365d-old skip; date_fail_counts is reset so false-positive skips of the previous
        # round are retried too.
        # ====================================================================
        # Unbounded retry: as long as there are items that are neither archived nor expired,
        # keep starting another round.
        # NOTE(review): the original comment here also listed "2 consecutive rounds without
        # progress" as an exit condition; the loop below only breaks when nothing is pending
        # or the status is "stopping" (no-progress rounds merely lengthen the backoff).
        retry_round = 0
        retry_total_count = 0
        no_progress_rounds = 0
        prev_pending_count = -1
        while True:
            if archive_manager.progress["status"] == "stopping": break
            # A user pause must also take effect while waiting between retry rounds.
            while archive_manager.progress["status"] in ("pausing", "paused"):
                archive_manager.progress["status"] = "paused"
                archive_manager.progress["current_task"] = "重试间隔已暂停, 等待恢复..."
                await asyncio.sleep(1)
            if archive_manager.progress["status"] == "stopping": break
            pending_retry = [t for t in tasks_to_run if not archive_manager.db_exists(t['video_path'])]
            now_ts_r = datetime.now(jst).timestamp()
            cutoff_ts_r = now_ts_r - 28 * 86400
            pending_retry = [t for t in pending_retry if _task_ts(t) >= cutoff_ts_r]
            if not pending_retry: break
            # Requirement: never stop early. Even after several rounds without progress the
            # backoff is only lengthened so the upstream can recover; the loop never breaks on
            # its own - it exits only when (1) everything is archived (`if not pending_retry:
            # break` above) or (2) the user pressed stop (`if status == "stopping": break`
            # above). The former "break after 4 rounds" gave up hundreds of unprocessed items
            # whenever the upstream vod site was flaky for >= 10 minutes, which showed up as
            # "done: success X, fail Y" with X+Y far below the total.
            if prev_pending_count >= 0 and len(pending_retry) >= prev_pending_count:
                no_progress_rounds += 1
            else:
                no_progress_rounds = 0
            prev_pending_count = len(pending_retry)
            retry_round += 1
            archive_manager.progress["current_task"] = f"第 {retry_round} 轮重试: 刷新 auth token..."
            try:
                auth = await get_auth(force=True)
                rebuilt = 0
                for t in pending_retry:
                    new_url = _make_m3u8_url(t['program'], auth)
                    if new_url:
                        t['m3u8'] = new_url
                        rebuilt += 1
                archive_manager.progress["current_task"] = f"第 {retry_round} 轮: 已刷新 {rebuilt} 个 URL"
            except Exception as _e:
                archive_manager.progress["current_task"] = f"第 {retry_round} 轮: auth 刷新失败 {_e}"
            pending_retry_with_url = [t for t in pending_retry if t.get('m3u8')]
            if not pending_retry_with_url:
                # Before: when every m3u8 rebuild failed (usually auth temporarily unavailable
                # or the upstream API down) the loop broke and the whole task exited although
                # these items were not in SQLite - seen by users as "stops after a few dozen
                # items". Now: wait 60s and start the next round, giving auth / upstream time
                # to recover; unarchived items are never given up.
                archive_manager.progress["current_task"] = f"第 {retry_round} 轮: m3u8 全部重建失败, 60s 后再来一轮 ({len(pending_retry)} 项)"
                for _s in range(60):
                    if archive_manager.progress["status"] == "stopping": break
                    await asyncio.sleep(1)
                continue
            pending_retry = pending_retry_with_url
            # Let the upstream rate limit recover. Normal pace: 30 / 60 / 90 / 120s, steady at
            # 120s from round 4 on. After >= 3 consecutive rounds without progress (the
            # upstream may be down entirely): 120s plus 60s per additional round, capped at
            # 600s (10 min), giving the upstream ample time to recover without ever giving the
            # task up - every video must be tried to the end.
            if no_progress_rounds >= 3:
                backoff_sec = min(120 + 60 * (no_progress_rounds - 2), 600)
            else:
                backoff_sec = min(30 * retry_round, 120)
            for _i in range(backoff_sec):
                if archive_manager.progress["status"] == "stopping": break
                # While paused, the backoff countdown message is not updated.
                if archive_manager.progress["status"] in ("pausing", "paused"):
                    await asyncio.sleep(1)
                    continue
                archive_manager.progress["current_task"] = f"第 {retry_round} 轮: 等待 {backoff_sec - _i}s 后重试 ({len(pending_retry)} 项)"
                await asyncio.sleep(1)
            if archive_manager.progress["status"] == "stopping": break
            n_retry = len(pending_retry)
            retry_total_count += n_retry
            # Items about to be retried are taken back out of the skip / fail counters and
            # out of new_failures; this round will count them again.
            n_retry_skip = sum(1 for t in pending_retry if t['video_path'] in new_failures)
            n_retry_fail = n_retry - n_retry_skip
            archive_manager.progress["skip"] = max(0, archive_manager.progress.get("skip", 0) - n_retry_skip)
            archive_manager.progress["fail"] = max(0, archive_manager.progress["fail"] - n_retry_fail)
            archive_manager.progress["total"] += n_retry
            new_failures -= {t['video_path'] for t in pending_retry}
            archive_manager.progress["current_task"] = f"开始重试 {n_retry} 项 (第 {retry_round} 轮)"
            # Per-round exception isolation: an error in one round must not end the retry loop.
            try:
                # Fresh event, queues and upload workers for this round; _download_one and
                # _upload_worker read these names from the enclosing scope.
                upload_done = asyncio.Event()
                repo_queues = {r: asyncio.Queue() for r in repos}
                upload_workers = [asyncio.create_task(_upload_worker(r, repo_queues[r])) for r in repos]
                date_fail_counts.clear()
                completed_count = 0
                archive_manager.progress["current"] = 0
                retry_dl_tasks = [_download_one(0, t) for t in pending_retry]
                await asyncio.gather(*retry_dl_tasks)
                upload_done.set()
                await asyncio.gather(*upload_workers)
                # As in the main loop, flush_index must go to the thread pool (synchronous HF
                # API calls for 24 repos would block the event loop).
                await loop.run_in_executor(None, archive_manager.flush_index)
                try: archive_manager.log_task_event("retry_round_done", f"round={retry_round}, n_retry={n_retry}")
                except Exception: pass
            except Exception as _re:
                import traceback as _tb; _tb.print_exc()
                archive_manager.progress["current_task"] = f"重试第 {retry_round} 轮异常已隔离, 继续下一轮: {str(_re)[:80]}"
                try: upload_done.set()
                except Exception: pass
                continue
        if new_failures or retry_total_count > 0:
            merged = failed_set | new_failures
            # Anything that is in the DB must not remain in failed.json (items that succeeded
            # on retry / stale historical entries).
            succeeded_now = {t['video_path'] for t in tasks_to_run if archive_manager.db_exists(t['video_path'])}
            merged -= succeeded_now
            for r in repos:
                await loop.run_in_executor(None, archive_manager.save_failed_set, r, merged)
        # Final summary and reset of the per-file progress fields.
        s = archive_manager.progress
        was_stopped = (s["status"] == "stopping")
        status_word = "已停止" if was_stopped else "完成"
        retry_hint = f" [重试 {retry_total_count}]" if retry_total_count > 0 else ""
        summary = f"{status_word}: 成功 {s['success']}, 失败 {s['fail']}, 过期跳过 {s.get('skip',0)}, 共 {s['total']}{retry_hint}"
        archive_manager.progress["current_task"] = summary
        archive_manager.progress["current_file"] = ""
        archive_manager.progress["ts_done"] = 0
        archive_manager.progress["ts_total"] = 0
        archive_manager.progress["downloading"] = 0
        try: archive_manager.log_task_event("end", summary)
        except Exception: pass
        # The last full flush also goes through the thread pool so the event loop is not
        # blocked by synchronous HF API calls during wrap-up.
        await loop.run_in_executor(None, archive_manager.flush_index)
        archive_manager.refresh_archived_cache()
    except Exception as e:
        archive_manager.progress["current_task"] = f"任务崩溃: {str(e)}"
        import traceback as _tb; _tb.print_exc()
        try: archive_manager.log_task_event("crash", str(e)[:300])
        except Exception: pass
    finally:
        # Whatever happened, the task is no longer running.
        archive_manager.progress.update({"status": "idle"})
@app.get("/api/archive/url")
async def get_archive_url(path: str):
    """Resolve an archived file to its final (post-redirect) download URL.

    Looks the file up by ``path`` (404 when unknown), issues a HEAD request
    against its raw HF URL in a worker thread, and returns the URL reached
    after redirects together with the reported content length. If the HEAD
    request fails, the raw URL and an empty length are returned instead.
    """
    record = await archive_manager.get_by_path(path)
    if not record:
        raise HTTPException(status_code=404, detail="文件未找到")
    hf_url = archive_manager.get_hf_raw_url(record['repo'], record['path'])

    def _probe():
        # Blocking HTTP call; executed via run_in_executor below.
        try:
            head_resp = requests.head(hf_url, allow_redirects=True, timeout=15, headers={})
            return head_resp.url, head_resp.headers.get("content-length", "")
        except Exception:
            return hf_url, ""

    resolved = await asyncio.get_event_loop().run_in_executor(None, _probe)
    final_url, content_length = resolved
    payload = {"url": final_url, "content_length": content_length}
    payload["display_name"] = record.get('display_name', '')
    return payload
@app.get("/api/archive/play")
async def play_archive(path: str, request: Request):
    """Stream an archived video from HF to the client, honouring Range requests.

    The upstream response is relayed chunk by chunk with httpx. Failures while
    opening the upstream connection are answered with a JSON error (504 for
    connect/read/protocol errors, 502 otherwise) instead of an unhandled 500.
    """
    record = await archive_manager.get_by_path(path)
    if not record:
        raise HTTPException(status_code=404, detail="文件未找到")
    hf_url = archive_manager.get_hf_raw_url(record['repo'], record['path'])

    # Forward the client's Range header, if any, so seeking works.
    range_value = request.headers.get("Range")
    hf_h = {"Range": range_value} if range_value else {}

    client = httpx.AsyncClient(timeout=httpx.Timeout(60.0, connect=10.0), follow_redirects=True)
    try:
        upstream = await client.send(client.build_request("GET", hf_url, headers=hf_h), stream=True)
    except Exception as e:
        await client.aclose()
        connection_errors = (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.ConnectError, httpx.RemoteProtocolError)
        if isinstance(e, connection_errors):
            return JSONResponse(content={"success": False, "error": f"上游连接失败: {type(e).__name__}"}, status_code=504)
        return JSONResponse(content={"success": False, "error": f"上游请求异常: {str(e)[:120]}"}, status_code=502)

    passthrough = ["Content-Type", "Content-Range", "Accept-Ranges", "ETag", "Last-Modified"]
    resp_h = {name: upstream.headers[name] for name in passthrough if name in upstream.headers}
    # Content-Length is only forwarded when the upstream body is not encoded.
    upstream_ce = upstream.headers.get("Content-Encoding", "").lower()
    body_is_plain = upstream_ce in ("", "identity")
    if body_is_plain and "Content-Length" in upstream.headers:
        resp_h["Content-Length"] = upstream.headers["Content-Length"]
    resp_h["Accept-Ranges"] = "bytes"
    # Important: mark the body as identity so GZipMiddleware does not compress it
    # again; otherwise the browser sees chunked + content-encoding: gzip +
    # attachment, treats the download as incomplete and does not save it.
    resp_h["Content-Encoding"] = "identity"

    async def _relay():
        try:
            async for piece in upstream.aiter_bytes(1024 * 1024):
                if not piece:
                    continue
                yield piece
        except Exception:
            return
        finally:
            # Always release the upstream response and the client; errors
            # raised while closing are ignored.
            for closer in (upstream.aclose, client.aclose):
                try:
                    await closer()
                except BaseException:
                    pass

    return StreamingResponse(_relay(), status_code=upstream.status_code, headers=resp_h, media_type="video/MP2T")
@app.get("/api/archive/download")
async def download_archive(path: str, name: Optional[str] = None):
    """Stream an archived recording to the client as a file download.

    Args:
        path: Archive path used to look the item up.
        name: Optional download file name supplied by the caller. Falls back
            to the item's ``display_name`` and then to the basename of
            ``path``. ``.ts`` is appended when missing.

    Returns:
        A ``StreamingResponse`` with a ``Content-Disposition: attachment``
        header mirroring the upstream status code, or a JSON error body with
        status 504 (connect/read timeout, connect error, remote protocol
        error) or 502 (any other failure while opening the upstream request).

    Raises:
        HTTPException: 404 when no archive matches ``path``.
    """
    item = await archive_manager.get_by_path(path)
    if not item:
        raise HTTPException(status_code=404, detail="文件未找到")
    hf_url = archive_manager.get_hf_raw_url(item['repo'], item['path'])
    filename = name or item.get('display_name') or os.path.basename(path)
    if not filename.endswith('.ts'):
        filename += '.ts'
    # HTTP headers must be latin-1 encodable, so the plain `filename=` value is
    # reduced to printable ASCII and the real name travels in the RFC 5987
    # `filename*` parameter. Double quotes and backslashes are replaced as
    # well: `name` is caller-controlled and either character would terminate
    # or escape the quoted-string and corrupt the header.
    ascii_filename = re.sub(r'[^\x20-\x7e]|["\\]', '_', filename).strip() or 'archive.ts'
    encoded_name = quote(filename, safe='')
    client = httpx.AsyncClient(timeout=httpx.Timeout(300.0, connect=10.0), follow_redirects=True)
    try:
        upstream = await client.send(client.build_request("GET", hf_url), stream=True)
    except (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.ConnectError, httpx.RemoteProtocolError) as e:
        await client.aclose()
        return JSONResponse(content={"success": False, "error": f"上游连接失败: {type(e).__name__}"}, status_code=504)
    except Exception as e:
        await client.aclose()
        return JSONResponse(content={"success": False, "error": f"上游请求异常: {str(e)[:120]}"}, status_code=502)
    resp_h = {}
    upstream_ce = upstream.headers.get("Content-Encoding", "").lower()
    # If the upstream body is content-encoded, its Content-Length would not
    # match the number of bytes actually delivered, so it must be dropped.
    if (not upstream_ce or upstream_ce == "identity") and "Content-Length" in upstream.headers:
        resp_h["Content-Length"] = upstream.headers["Content-Length"]
    if "ETag" in upstream.headers:
        resp_h["ETag"] = upstream.headers["ETag"]
    resp_h["Content-Disposition"] = f"attachment; filename=\"{ascii_filename}\"; filename*=UTF-8''{encoded_name}"
    resp_h["Accept-Ranges"] = "bytes"
    # Key point (original author's note): marking the response as "identity"
    # makes GZipMiddleware skip it, so the binary .ts is not re-compressed.
    resp_h["Content-Encoding"] = "identity"

    async def _it():
        try:
            async for chunk in upstream.aiter_bytes(1024 * 1024):
                if chunk:
                    yield chunk
        except Exception:
            # Best effort: end the stream quietly if the upstream read fails.
            return
        finally:
            # Deliberately broad so the second close is attempted even when
            # the first one is interrupted.
            for handle in (upstream, client):
                try:
                    await handle.aclose()
                except BaseException:
                    pass

    return StreamingResponse(_it(), status_code=upstream.status_code, headers=resp_h, media_type="application/octet-stream")
@app.get("/api/admin/archive/list")
async def list_archives():
    """Return every archive entry reported by the archive manager."""
    files = await archive_manager.list_all_archives()
    return {"success": True, "files": files}
@app.get("/api/admin/archive/stats")
async def archive_stats():
    """Lightweight archive statistics.

    Combines SQLite aggregates over the ``archives`` table with the size
    reported for each configured repo. The blocking work runs in the default
    executor so the event loop is not stalled.
    """

    def _repo_size(repo_id):
        # A repo whose size lookup fails is reported with size 0.
        try:
            return repo_id, int(archive_manager._ensure_size(repo_id) or 0)
        except Exception:
            return repo_id, 0

    def _collect():
        # NOTE(review): source indentation was lost; only the four SQL queries
        # are assumed to run under the DB lock — confirm.
        with archive_manager._db_lock:
            db = archive_manager._db
            total_count, total_size_db = db.execute(
                "SELECT COUNT(*), COALESCE(SUM(file_size),0) FROM archives").fetchone()
            date_rows = db.execute(
                "SELECT date, COUNT(*), COALESCE(SUM(file_size),0) FROM archives WHERE date != '' GROUP BY date ORDER BY date DESC LIMIT 30").fetchall()
            channel_rows = db.execute(
                "SELECT channel, COUNT(*), COALESCE(SUM(file_size),0) FROM archives WHERE channel != '' GROUP BY channel ORDER BY 2 DESC LIMIT 50").fetchall()
            latest_date = db.execute(
                "SELECT MAX(date) FROM archives WHERE date != ''").fetchone()[0] or ""

        by_date = [
            {"date": day, "count": count, "size": int(size or 0)}
            for day, count, size in date_rows
        ]
        by_channel = [
            {"channel": channel, "count": count, "size": int(size or 0)}
            for channel, count, size in channel_rows
        ]

        # Per-repo sizes are fetched through a thread pool (at most 8 workers)
        # because, per the original author's note, a serial pass over many
        # repos is slow.
        repo_ids = list(archive_manager.repos)
        repos = []
        if repo_ids:
            with ThreadPoolExecutor(max_workers=min(8, len(repo_ids))) as pool:
                for repo_id, size in pool.map(_repo_size, repo_ids):
                    repos.append({"repo": repo_id, "size": size})
        total_size_repos = sum(entry["size"] for entry in repos)

        db_size = int(total_size_db or 0)
        return {
            "total_count": int(total_count or 0),
            # Prefer the repo-reported total; fall back to the SQLite sum.
            "total_size": total_size_repos if total_size_repos > 0 else db_size,
            "total_size_db": db_size,
            "total_size_repos": total_size_repos,
            "repos": repos,
            "latest_date": latest_date,
            "by_date": by_date,
            "by_channel": by_channel,
        }

    loop = asyncio.get_event_loop()
    stats = await loop.run_in_executor(None, _collect)
    return {"success": True, "stats": stats}
@app.get("/api/admin/archive/dates")
async def list_archive_dates():
    """Return the sorted union of indexed dates and the last 28 JST days.

    Dates are ``YYYYMMDD`` strings; today (JST) and the 27 days before it are
    always included, whether or not anything is archived for them.
    """
    dates = set(archive_manager.list_dates())
    today_jst = datetime.now(timezone(timedelta(hours=9)))
    dates.update(
        (today_jst - timedelta(days=offset)).strftime("%Y%m%d")
        for offset in range(28)
    )
    return {"success": True, "dates": sorted(dates)}
@app.post("/api/admin/archive/rebuild_index")
async def rebuild_archive_index():
    """Rebuild the SQLite archive index by rescanning every configured repo.

    Repos are scanned in parallel (at most 8 threads). If at least one row is
    found, the ``archives`` table is emptied, refilled with the scanned rows
    and the database is uploaded. Progress text is published through
    ``archive_manager.progress["current_task"]``.

    Returns:
        ``{"success": True, "count": <rows written>, "failed_repos": [...]}``
        where ``failed_repos`` lists repos whose scan raised (their rows are
        therefore missing from the rebuilt index), or
        ``{"success": False, "error": ...}`` when the manager is not
        configured.
    """
    if not archive_manager.api or not archive_manager.repos:
        return {"success": False, "error": "archive_manager not configured"}
    loop = asyncio.get_running_loop()
    repos = list(archive_manager.repos)
    archive_manager.progress["current_task"] = f"重建中: 并行扫描 {len(repos)} 个 dataset..."
    repo_done = {"n": 0}
    failed_repos = []
    repo_done_lock = threading.Lock()

    def _scan_repo(repo_id):
        # One thread per repo; a failing repo contributes no rows but is
        # recorded so the caller can see that the index is incomplete.
        scan_failed = False
        try:
            idx = archive_manager._build_index_from_meta(repo_id) or {}
        except Exception:
            idx = {}
            scan_failed = True
        # file_size is written during the rebuild so SQLite SUM() is usable.
        rs = [
            (
                info.get("repo", repo_id),
                vp,
                info.get("display_name") or os.path.basename(vp),
                int(info.get("file_size") or 0),
            )
            for vp, info in idx.items()
        ]
        with repo_done_lock:
            repo_done["n"] += 1
            if scan_failed:
                failed_repos.append(repo_id)
            archive_manager.progress["current_task"] = f"重建中: {repo_done['n']}/{len(repos)} 仓 完成, 累计 {len(rs)} 条 (本仓) → {repo_id}"
        return rs

    def _rebuild_parallel():
        rows = []
        with ThreadPoolExecutor(max_workers=max(1, min(len(repos), 8))) as ex:
            for rs in ex.map(_scan_repo, repos):
                rows.extend(rs)
        if not rows:
            # Nothing scanned: leave the existing table untouched.
            archive_manager.progress["current_task"] = "重建完成: 0 条"
            return 0
        archive_manager.progress["current_task"] = f"重建中: 写入 SQLite ({len(rows)} 条)..."
        with archive_manager._db_lock:
            archive_manager._db.execute("DELETE FROM archives")
        # NOTE(review): source indentation was lost. The insert is placed
        # outside the lock so it cannot deadlock if _db_insert_many takes
        # _db_lock itself — confirm the intended scope.
        archive_manager._db_insert_many(rows)
        archive_manager.progress["current_task"] = "重建中: 上传 archive_index.db..."
        archive_manager.upload_db_to_hf()
        archive_manager.progress["current_task"] = f"重建完成: {len(rows)} 条"
        return len(rows)

    count = await loop.run_in_executor(None, _rebuild_parallel)
    return {"success": True, "count": count, "failed_repos": failed_repos}
@app.post("/api/admin/archive/cleanup_db")
async def cleanup_db_history():
    """Squash the index repo's history via ``archive_manager.squash_db_history``.

    Per the original author's notes, this frees old LFS blobs accumulated by
    repeated ``archive_index.db`` uploads and is meant to be called manually
    when the index repo is close to full. The call runs in the default
    executor; its result is returned as ``success``.
    """
    squashed = await asyncio.get_event_loop().run_in_executor(
        None, archive_manager.squash_db_history
    )
    return {"success": squashed}
@app.get("/api/admin/epg/sync_now")
async def epg_sync_now():
    """Force-fetch the EPG, group programs by JST date and upload each date.

    Returns:
        On fetch failure, ``{"success": False, "error", "traceback"}``.
        Otherwise a summary with channel/program counts, the dates found, the
        per-date upload result (``"ok"``, ``"failed"`` or ``"exception: ..."``)
        and a sample built from the first channel in the raw result.
    """
    try:
        auth = await get_auth()
        channels = await get_channels(auth)
        raw = await get_all_epg(auth, channels, date=None, force=True)
    except Exception as e:
        return {"success": False, "error": str(e), "traceback": traceback.format_exc()}

    jst_tz = timezone(timedelta(hours=9))
    raw_is_dict = isinstance(raw, dict)
    epg_by_date = {}
    total_programs = 0
    empty_channels = []
    if raw_is_dict:
        for ch_id, ch_epg in raw.items():
            programs = ch_epg.get('data', []) if isinstance(ch_epg, dict) else []
            if not programs:
                empty_channels.append(ch_id)
                continue
            for p in programs:
                ts = _parse_epg_time(p.get('time'))
                if ts is None:
                    # Programs without a parseable time cannot be bucketed.
                    continue
                d_str = datetime.fromtimestamp(ts, tz=jst_tz).strftime('%Y%m%d')
                epg_by_date.setdefault(d_str, {}).setdefault(ch_id, {'data': []})['data'].append(p)
                total_programs += 1

    upload_results = {}
    loop = asyncio.get_running_loop()
    for d_str, d_epg in epg_by_date.items():
        try:
            ok = await loop.run_in_executor(None, archive_manager.upload_epg_json, d_str, d_epg)
            upload_results[d_str] = "ok" if ok else "failed"
        except Exception as e:
            upload_results[d_str] = f"exception: {e}"

    # Build the sample with the same tolerance as the grouping loop: a non-dict
    # channel payload or a missing/None 'data' must not raise after the
    # uploads have already been performed.
    sample_channel = None
    if raw_is_dict and raw:
        first_id, first_epg = next(iter(raw.items()))
        first_programs = (first_epg.get('data') if isinstance(first_epg, dict) else None) or []
        sample_channel = {
            "ch_id": first_id,
            "program_count": len(first_programs),
            "first_program": first_programs[0] if first_programs else None,
        }

    return {
        "success": True,
        "channels_total": len(raw) if raw_is_dict else 0,
        "channels_empty": len(empty_channels),
        "programs_total": total_programs,
        "dates_found": list(epg_by_date.keys()),
        "upload_results": upload_results,
        "sample_channel": sample_channel,
    }
@app.delete("/api/admin/archive/delete")
async def delete_archive(path: str):
    """Delete the archive identified by ``path``.

    Returns ``{"success": False, "error": "Not found"}`` when the index has no
    such entry; otherwise the result of ``archive_manager.delete_archive``,
    which is run in the default executor.
    """
    item = archive_manager.db_get(path)
    if item:
        deleted = await asyncio.get_event_loop().run_in_executor(
            None, archive_manager.delete_archive, item['path'], item['repo']
        )
        return {"success": deleted}
    return {"success": False, "error": "Not found"}
@app.get("/api/admin/archive/diagnostic")
async def archive_diagnostic():
    """Return the 100 most recent task log events from the archive manager.

    Per the original author's notes, these events survive container restarts
    and are meant for diagnosing an idle dashboard: the timeline of
    start / batch_done / retry_round_done / end / crash events shows whether
    a task never ran, finished, or was cut short by a restart.
    """
    recent_logs = archive_manager.get_recent_task_logs(100)
    return {"success": True, "logs": recent_logs}
# Strong references to manually started archive tasks. The event loop only
# keeps weak references to tasks, so an unreferenced task may be garbage
# collected before it finishes.
_manual_archive_tasks = set()


@app.post("/api/admin/archive/start")
async def start_manual_archive():
    """Start an archive run in the background unless one is already live.

    A run counts as live while the status is ``running``, ``pausing``,
    ``paused`` or ``stopping``: the first three are the states ``stop_archive``
    treats as an active task, and ``stopping`` is the state it moves them to.
    Starting a second task in any of them would run two archive tasks at once.

    Returns:
        ``{"success": True}`` when a task was scheduled, otherwise
        ``{"success": False, "error": "Already running"}``.
    """
    if archive_manager.progress["status"] in ("running", "pausing", "paused", "stopping"):
        return {"success": False, "error": "Already running"}
    task = asyncio.create_task(run_archive_task())
    _manual_archive_tasks.add(task)
    # Drop the reference once the task completes so the set does not grow.
    task.add_done_callback(_manual_archive_tasks.discard)
    return {"success": True}
@app.post("/api/admin/archive/pause")
async def pause_archive():
    """Request a pause: move the status from ``running`` to ``pausing``.

    Returns an error payload when the status is anything other than
    ``running``.
    """
    progress = archive_manager.progress
    if progress["status"] != "running":
        return {"success": False, "error": "Not running"}
    progress["status"] = "pausing"
    return {"success": True}
@app.post("/api/admin/archive/resume")
async def resume_archive():
    """Resume a paused run: move the status from ``paused`` to ``running``.

    Returns an error payload when the status is anything other than
    ``paused``.
    """
    progress = archive_manager.progress
    if progress["status"] != "paused":
        return {"success": False, "error": "Not paused"}
    progress["status"] = "running"
    return {"success": True}
@app.post("/api/admin/archive/stop")
async def stop_archive():
    """Request a stop: move a running, pausing or paused run to ``stopping``.

    Returns an error payload when the status is none of those three.
    """
    progress = archive_manager.progress
    if progress["status"] not in ("running", "pausing", "paused"):
        return {"success": False, "error": "Not running"}
    progress["status"] = "stopping"
    return {"success": True}
# Module-level flag, initialised to False. It is not read or written anywhere
# in the visible part of this file.
# NOTE(review): confirm it is still used elsewhere before relying on or removing it.
_archive_triggered = False
@app.get("/api/admin/archive/progress")
async def get_archive_progress():
    """Return the archive manager's current progress object."""
    current_progress = archive_manager.progress
    return {"success": True, "progress": current_progress}
# Strong references to background tasks created at startup. The event loop
# only keeps weak references, so an unreferenced task may be garbage collected
# before it finishes.
_startup_background_tasks = set()


@app.on_event("startup")
async def startup_event():
    """Application startup hook.

    In order, it:
      1. schedules the daily archive job (00:00 Asia/Tokyo) and the 20-minute
         self-ping, then starts the scheduler;
      2. wraps the SIGTERM / SIGINT handlers so a ``signal_received`` task
         event is logged before the previously installed handler runs;
      3. schedules an auto-resume check 30 seconds after startup;
      4. removes leftover ``/tmp/*.ts`` and ``/tmp/*.segs`` entries in a
         daemon thread.
    """
    scheduler.add_job(auto_archive_job, 'cron', hour=0, timezone='Asia/Tokyo')
    scheduler.add_job(self_ping_job, 'interval', minutes=20)
    scheduler.start()

    # Signal logging. Per the original author's notes, the host sends SIGTERM
    # before SIGKILL on a graceful stop, so seeing `signal_received` in the
    # task log after a restart means a graceful stop; a task that ends with no
    # such event was most likely killed hard.
    import signal as _sig

    def _install_logging_handler(sig):
        previous = _sig.getsignal(sig)

        def _on_term(signum, frame):
            try:
                archive_manager.log_task_event("signal_received", f"signum={signum}")
            except Exception:
                pass
            # Chain to whatever was installed before us (e.g. the server's own
            # shutdown handler). Replacing it outright would stop the signal
            # from triggering a graceful shutdown.
            if callable(previous):
                previous(signum, frame)
            elif previous == _sig.SIG_DFL:
                # Restore the default action and re-deliver the signal.
                _sig.signal(signum, _sig.SIG_DFL)
                os.kill(os.getpid(), signum)

        _sig.signal(sig, _on_term)

    for _signum in (_sig.SIGTERM, _sig.SIGINT):
        try:
            _install_logging_handler(_signum)
        except Exception:
            # e.g. not running in the main thread; logging is best effort.
            pass

    # Auto-resume. Per the original author's notes, the host restarts the
    # container regularly, so an interrupted run is picked up again after
    # startup; items already in SQLite are expected to be skipped by the task.
    async def _maybe_auto_resume():
        # 30 s buffer so hub initialisation, the DB download and the scheduler
        # have settled before a task is started.
        await asyncio.sleep(30)
        try:
            if archive_manager.progress["status"] == "running":
                return  # another path already started a run
            unfinished = archive_manager.find_unfinished_task()
            if not unfinished:
                return
            try:
                archive_manager.log_task_event("auto_resume", f"prev_event={unfinished.get('event')} prev_success={unfinished.get('success')}/{unfinished.get('total')}")
            except Exception:
                pass
            await run_archive_task()
        except Exception as _e:
            try:
                archive_manager.log_task_event("auto_resume_failed", str(_e)[:200])
            except Exception:
                pass

    resume_task = asyncio.create_task(_maybe_auto_resume())
    _startup_background_tasks.add(resume_task)
    resume_task.add_done_callback(_startup_background_tasks.discard)

    # Remove temp files orphaned by a previous crash / hard kill so the disk
    # does not slowly fill up.
    def _cleanup_orphan_tmp():
        import glob
        import shutil
        cleaned = 0
        for pattern in ["/tmp/*.ts", "/tmp/*.segs"]:
            for path in glob.glob(pattern):
                try:
                    if os.path.isfile(path):
                        os.remove(path)
                    elif os.path.isdir(path):
                        shutil.rmtree(path, ignore_errors=True)
                    # NOTE(review): source indentation was lost; the counter is
                    # assumed to count every matched entry that did not raise.
                    cleaned += 1
                except Exception:
                    pass
        if cleaned:
            try:
                archive_manager.log_task_event("startup_cleanup", f"removed={cleaned} orphan tmp files")
            except Exception:
                pass

    try:
        threading.Thread(target=_cleanup_orphan_tmp, daemon=True).start()
    except Exception:
        pass
@app.on_event("shutdown")
async def shutdown_event():
    """Application shutdown hook: force-save the cache when it is disk-backed."""
    if cache.storage_type != 'disk':
        return
    cache._save_to_disk(force=True)
async def auto_archive_job():
    """Scheduler entry point: run one archive task and wait for it to finish."""
    await run_archive_task()
async def self_ping_job():
    """Request the local ``/ping`` endpoint once; scheduled every 20 minutes.

    Per the original author's note, this keeps the hosting platform from
    putting the app to sleep for lack of traffic. Any failure is ignored.
    """
    ping_url = "http://localhost:7860/ping"
    try:
        async with httpx.AsyncClient(timeout=10.0) as session:
            await session.get(ping_url)
    except Exception:
        # Best-effort keep-alive: a failed ping must never break the scheduler.
        pass
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 7860, logging errors only.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="error",
    )