| import time
|
| import json
|
| import secrets
|
| import asyncio
|
| import httpx
|
| import requests
|
| import os
|
| import io
|
| import re
|
| import hashlib
|
| import threading
|
| import traceback
|
| import tempfile
|
| from concurrent.futures import ThreadPoolExecutor
|
| from pathlib import Path
|
| from datetime import datetime, timedelta, timezone
|
| from urllib.parse import quote, urljoin, urlparse
|
| from fastapi import FastAPI, Request, Header, Depends, HTTPException, status
|
| from fastapi.responses import JSONResponse, Response, FileResponse, StreamingResponse
|
| from fastapi.staticfiles import StaticFiles
|
| from fastapi.middleware.cors import CORSMiddleware
|
| from fastapi.middleware.gzip import GZipMiddleware
|
| from pydantic import BaseModel
|
| from typing import Optional, List
|
| from config import Config
|
| from cache_manager import cache
|
| from user_manager import user_manager, User, AVAILABLE_BADGES
|
| from proxy_handler import proxy_media, proxy_live_stream_direct, proxy_playback_stream, get_live_m3u8_url
|
| from utils import get_auth, get_channels, get_jst_date, fetch_epg, get_all_epg
|
| from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
| from archiver_manager import archive_manager
|
# Application object; its metadata comes from the project-level Config.
app = FastAPI(
    title=Config.APP_NAME,
    version=Config.APP_VERSION,
    description=Config.APP_DESCRIPTION,
)

scheduler = AsyncIOScheduler()

# CORS is wide open; the exposed headers let browser clients read range and
# download metadata from proxied media responses.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["Content-Length", "Content-Range", "Accept-Ranges", "Content-Disposition"],
)

if Config.ENABLE_GZIP:
    app.add_middleware(GZipMiddleware, minimum_size=1000)

# Static assets are mounted only when the directory ships next to this module.
static_path = Path(__file__).parent / "static"
if static_path.exists():
    app.mount("/static", StaticFiles(directory=str(static_path)), name="static")

# In-memory admin session store: token -> expiry (naive local datetime).
admin_tokens = {}
|
def create_admin_token() -> str:
    """Issue a random admin token valid for 24 hours and register it.

    The token is stored in the module-level ``admin_tokens`` mapping together
    with its expiry time and returned to the caller.
    """
    new_token = secrets.token_urlsafe(32)
    expires_at = datetime.now() + timedelta(hours=24)
    admin_tokens[new_token] = expires_at
    return new_token
|
def verify_admin_token(token: str) -> bool:
    """Return True when ``token`` is a registered, unexpired admin token.

    Each call first removes every entry of the module-level ``admin_tokens``
    mapping whose expiry lies before the current time.
    """
    if not token:
        return False

    now = datetime.now()
    stale_tokens = [t for t, expires_at in admin_tokens.items() if expires_at < now]
    for stale in stale_tokens:
        del admin_tokens[stale]

    # Everything still in the store survived the purge above, so a separate
    # per-token expiry comparison could never trigger and is not needed.
    return token in admin_tokens
|
def get_admin_token(authorization: Optional[str]) -> Optional[str]:
    """Extract the raw token from an ``Authorization`` header value.

    A leading ``"Bearer "`` scheme (case-sensitive) is removed; any other
    non-empty value is returned unchanged. Empty or missing input gives None.
    """
    if not authorization:
        return None
    scheme = "Bearer "
    if authorization.startswith(scheme):
        return authorization[len(scheme):]
    return authorization
|
def get_current_admin_token(authorization: Optional[str] = Header(None)) -> str:
    """FastAPI dependency that returns the caller's valid admin token.

    Raises:
        HTTPException: 401 with "No token provided" when the header carries no
            token, or 401 with "Invalid token" when verification fails.
    """
    token = get_admin_token(authorization)
    if token and verify_admin_token(token):
        return token
    detail = "Invalid token" if token else "No token provided"
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=detail)
|
| def _parse_epg_time(t):
|
| if not t: return None
|
| try:
|
| t_str = str(t).strip()
|
| jst = timezone(timedelta(hours=9))
|
| if len(t_str) == 14 and t_str.isdigit():
|
| return datetime.strptime(t_str, "%Y%m%d%H%M%S").replace(tzinfo=jst).timestamp()
|
| val = float(t_str)
|
| return val / 1000.0 if val > 9999999999 else val
|
| except:
|
| return None
|
# Request body for POST /api/verify-password.
class PasswordVerify(BaseModel):
    username: str
    # Hash string sent by the client; compared as-is against stored values.
    password_hash: str
|
# Request body for POST /api/admin/login.
class AdminLogin(BaseModel):
    username: str
    # Hash string sent by the client; compared against Config.ADMIN_PASSWORD_HASH.
    password_hash: str
|
# Request body for POST /api/admin/users; every field is forwarded unchanged
# to user_manager.create_user.
class CreateUserRequest(BaseModel):
    username: str
    password: Optional[str] = None
    expires_days: Optional[int] = None
    notes: str = ""
    badge: Optional[str] = None
    is_admin: bool = False
|
# Request body for POST /api/admin/users/{username}/extend.
class ExtendExpiryRequest(BaseModel):
    # Number of days passed to user_manager.extend_expiry.
    days: int
|
# Request body for POST /api/admin/users/{username}/badge.
class SetBadgeRequest(BaseModel):
    # Badge value passed to user_manager.set_badge; defaults to None.
    badge: Optional[str] = None
|
# Per-user settings payload; every field is optional and defaults to None.
class UserSettings(BaseModel):
    favorite_channels: Optional[List[str]] = None
    playback_history: Optional[dict] = None
    program_reminders: Optional[List[dict]] = None
    download_concurrency: Optional[int] = None
    batch_download_concurrency: Optional[int] = None
    fab_position: Optional[dict] = None
    other_settings: Optional[dict] = None
|
@app.middleware("http")
async def protocol_middleware(request: Request, call_next):
    # Rewrites the ASGI scope from X-Forwarded-* headers so that URLs built
    # from ``request.url`` reflect the address the client actually used.
    # NOTE(review): the headers are trusted unconditionally - confirm the app
    # is only reachable through a proxy that sets them.
    forwarded_proto = request.headers.get('X-Forwarded-Proto', '')
    forwarded_host = request.headers.get('X-Forwarded-Host', '')
    forwarded_port = request.headers.get('X-Forwarded-Port', '')

    if forwarded_proto:
        request.scope['scheme'] = forwarded_proto

    if forwarded_host:
        # Default port follows the forwarded scheme unless an explicit,
        # parseable X-Forwarded-Port overrides it.
        port = 443 if forwarded_proto == 'https' else 80
        if forwarded_port:
            try:
                port = int(forwarded_port)
            except ValueError:
                # Non-numeric header value: keep the scheme-derived default.
                pass
        request.scope['server'] = (forwarded_host, port)

    return await call_next(request)
|
@app.middleware("http")
async def performance_middleware(request: Request, call_next):
    # Adds an X-Response-Time header plus cache and CORS headers by URL prefix.
    # perf_counter() is monotonic, so the measured duration cannot go negative
    # or jump when the wall clock is adjusted mid-request.
    started = time.perf_counter()
    response = await call_next(request)
    elapsed_ms = int((time.perf_counter() - started) * 1000)
    response.headers["X-Response-Time"] = f"{elapsed_ms}ms"

    path = request.url.path

    if path.startswith('/static/'):
        if '/templates/' in path or path.endswith('.html'):
            # HTML and templates must always be re-fetched.
            response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
            response.headers['Pragma'] = 'no-cache'
            response.headers['Expires'] = '0'
        else:
            # Other static assets may be cached for one day.
            response.headers['Cache-Control'] = 'public, max-age=86400'

    if path.startswith(('/api/', '/live/', '/vod/')):
        response.headers['Access-Control-Allow-Origin'] = '*'
        response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS, DELETE'
        response.headers['Access-Control-Allow-Headers'] = 'Authorization, Content-Type, Range'

    return response
|
@app.get("/")
@app.get("/channels")
@app.get("/player")
@app.get("/epg")
@app.get("/archive")
@app.get("/cache")
@app.get("/api-test")
async def render_frontend():
    # Every client-side route serves the same index.html; a JSON message is
    # returned instead when the file is not present.
    index_file = Path(__file__).parent / "static" / "index.html"
    if not index_file.exists():
        return {"message": "Frontend not found"}
    return FileResponse(index_file)
|
@app.get("/admin")
async def admin_page():
    # Serves static/admin.html, or a JSON message when the file is missing.
    page = Path(__file__).parent / "static" / "admin.html"
    if not page.exists():
        return {"message": "Not found"}
    return FileResponse(page)
|
@app.get("/admin/login")
async def admin_login_page():
    # Serves static/admin-login.html, or a JSON message when the file is missing.
    page = Path(__file__).parent / "static" / "admin-login.html"
    if not page.exists():
        return {"message": "Not found"}
    return FileResponse(page)
|
@app.post("/api/verify-password")
async def verify_password(data: PasswordVerify):
    # Checks the credentials against the configured admin account first, then
    # against user_manager. Failures are reported in the JSON body with
    # "success": False; unexpected errors become a 500 response.
    try:
        admin_hash = Config.ADMIN_PASSWORD_HASH
        # Constant-time comparison so response timing does not reveal how much
        # of the submitted hash matches. A non-string configured hash never
        # matches, which is what plain ``==`` did as well.
        is_admin_login = (
            data.username == Config.ADMIN_USERNAME
            and isinstance(admin_hash, str)
            and secrets.compare_digest(
                data.password_hash.encode("utf-8", "surrogatepass"),
                admin_hash.encode("utf-8", "surrogatepass"),
            )
        )
        if is_admin_login:
            return {
                "success": True,
                "message": "Admin login successful",
                "user": {"username": data.username, "is_admin": True, "badge": None},
            }

        if data.username and user_manager.verify_user(data.username, data.password_hash):
            user = user_manager.get_user(data.username)
            if not user:
                return {"success": False, "message": "User not found"}
            return {
                "success": True,
                "message": "User login successful",
                "user": {"username": data.username, "is_admin": user.is_admin, "badge": user.badge},
                "user_data": user_manager.get_user_data(data.username),
            }

        return {"success": False, "message": "Invalid username or password"}
    except Exception as e:
        return JSONResponse(content={"success": False, "message": str(e)}, status_code=500)
|
@app.get("/api/badges")
async def get_badges():
    # Public listing of the badge catalogue imported from user_manager.
    payload = {"success": True, "badges": AVAILABLE_BADGES}
    return payload
|
@app.post("/api/admin/login")
async def admin_login(data: AdminLogin):
    # Issues a 24-hour admin token when the configured admin credentials
    # match; otherwise answers 401. Unexpected errors become a 500 response.
    try:
        admin_hash = Config.ADMIN_PASSWORD_HASH
        # Constant-time comparison so response timing does not reveal how much
        # of the submitted hash matches. A non-string configured hash never
        # matches, which is what plain ``==`` did as well.
        credentials_ok = (
            data.username == Config.ADMIN_USERNAME
            and isinstance(admin_hash, str)
            and secrets.compare_digest(
                data.password_hash.encode("utf-8", "surrogatepass"),
                admin_hash.encode("utf-8", "surrogatepass"),
            )
        )
        if credentials_ok:
            return {"success": True, "token": create_admin_token(), "message": "Login successful"}
        return JSONResponse(content={"success": False, "message": "Invalid credentials"}, status_code=401)
    except Exception as e:
        return JSONResponse(content={"success": False, "message": str(e)}, status_code=500)
|
@app.get("/api/admin/check")
async def admin_check(authorization: Optional[str] = Header(None)):
    # Reports whether the Authorization header carries a valid admin token.
    token = get_admin_token(authorization)
    if not token or not verify_admin_token(token):
        return JSONResponse(content={"authenticated": False}, status_code=401)
    return {"authenticated": True}
|
@app.get("/api/admin/badges")
async def admin_get_badges(token: str = Depends(get_current_admin_token)):
    # Same badge catalogue as /api/badges, but behind the admin-token dependency.
    payload = {"success": True, "badges": AVAILABLE_BADGES}
    return payload
|
@app.get("/api/admin/stats")
async def admin_stats(token: str = Depends(get_current_admin_token)):
    # Passes through whatever user_manager.get_stats() reports.
    stats = user_manager.get_stats()
    return stats
|
@app.get("/api/admin/users")
async def admin_list_users(token: str = Depends(get_current_admin_token)):
    # Returns every user serialised with .dict(), plus the total count.
    all_users = user_manager.list_users()
    total = len(all_users)
    serialised = [entry.dict() for entry in all_users]
    return {"success": True, "count": total, "users": serialised}
|
@app.post("/api/admin/users")
async def admin_create_user(data: CreateUserRequest, token: str = Depends(get_current_admin_token)):
    # Creates a user unless the Config.MAX_USERS cap is reached (400). The
    # password returned by user_manager.create_user is included in the response.
    # Any exception from user_manager is reported as a 500 with its message.
    try:
        limit_reached = len(user_manager.users) >= Config.MAX_USERS
        if limit_reached:
            return JSONResponse(
                content={"error": f"Maximum {Config.MAX_USERS} users allowed"},
                status_code=400,
            )
        created_user, plain_password = user_manager.create_user(
            username=data.username,
            password=data.password,
            expires_days=data.expires_days,
            notes=data.notes,
            badge=data.badge,
            is_admin=data.is_admin,
        )
        return {"success": True, "user": created_user.dict(), "password": plain_password}
    except Exception as err:
        return JSONResponse(content={"error": str(err)}, status_code=500)
|
@app.delete("/api/admin/users/{username}")
async def admin_delete_user(username: str, token: str = Depends(get_current_admin_token)):
    # Deletes the user and, on success, the user's stored settings as well.
    if not user_manager.delete_user(username):
        return JSONResponse(content={"error": "User not found"}, status_code=404)
    user_manager.delete_user_settings(username)
    return {"success": True, "message": "User deleted"}
|
@app.post("/api/admin/users/{username}/activate")
async def admin_activate_user(username: str, token: str = Depends(get_current_admin_token)):
    # 404 when user_manager.activate_user reports a falsy result.
    if not user_manager.activate_user(username):
        return JSONResponse(content={"error": "User not found"}, status_code=404)
    return {"success": True}
|
@app.post("/api/admin/users/{username}/deactivate")
async def admin_deactivate_user(username: str, token: str = Depends(get_current_admin_token)):
    # 404 when user_manager.deactivate_user reports a falsy result.
    if not user_manager.deactivate_user(username):
        return JSONResponse(content={"error": "User not found"}, status_code=404)
    return {"success": True}
|
@app.post("/api/admin/users/{username}/extend")
async def admin_extend_expiry(username: str, data: ExtendExpiryRequest, token: str = Depends(get_current_admin_token)):
    # 404 when user_manager.extend_expiry reports a falsy result.
    if not user_manager.extend_expiry(username, data.days):
        return JSONResponse(content={"error": "User not found"}, status_code=404)
    return {"success": True}
|
@app.post("/api/admin/users/{username}/badge")
async def admin_set_badge(username: str, data: SetBadgeRequest, token: str = Depends(get_current_admin_token)):
    # 404 when user_manager.set_badge reports a falsy result.
    if not user_manager.set_badge(username, data.badge):
        return JSONResponse(content={"error": "User not found"}, status_code=404)
    return {"success": True}
|
@app.get("/api/user/{username}/settings")
async def get_user_settings(username: str):
    # Returns the stored settings for ``username``.
    # NOTE(review): this route has no authentication dependency, so any caller
    # can read any user's settings - confirm this is intended.
    settings = user_manager.get_user_settings(username)
    return {"success": True, "settings": settings}
|
# Request body for POST /api/user/data/sync.
class UserDataSync(BaseModel):
    username: str
    # Passed unchanged to user_manager.update_user_data.
    data: dict
|
@app.post("/api/user/data/sync")
async def sync_user_data(payload: UserDataSync):
    # Stores the submitted data for the user; 404 when the update is rejected.
    updated = user_manager.update_user_data(payload.username, payload.data)
    if not updated:
        return JSONResponse(content={"success": False, "error": "Not found"}, status_code=404)
    return {"success": True}
|
# Request body for POST /api/user/behavior/track.
class UserBehaviorLog(BaseModel):
    username: str
    # The handler recognises 'play', 'favorite', 'setting_change' and 'reminder'.
    action: str
    # Action-specific payload; the keys read depend on ``action``.
    data: dict
|
@app.post("/api/user/behavior/track")
async def track_user_behavior(payload: UserBehaviorLog):
    # Translates one behaviour event into an update of the user's stored data:
    #   play           -> prepend an entry to playback_history (capped at 100)
    #   favorite       -> replace favorite_channels
    #   setting_change -> copy the whitelisted setting keys
    #   reminder       -> replace program_reminders
    # Responds 404 for unknown users, 400 when nothing was updated and 500 on
    # unexpected errors.
    try:
        user_data = user_manager.get_user_data(payload.username)
        if not user_data:
            return JSONResponse(content={"success": False, "error": "Not found"}, status_code=404)

        update_data = {}
        action = payload.action

        if action == 'play':
            stored_history = user_data.get('playback_history')
            # Work on a copy so the stored list is never mutated in place, and
            # start fresh when the stored value is missing or not a list
            # (which previously made ``.insert`` raise and return a 500).
            history = list(stored_history) if isinstance(stored_history, list) else []
            history.insert(0, {
                'timestamp': datetime.now(timezone(timedelta(hours=9))).isoformat(),
                'channel_id': payload.data.get('channel_id'),
                'channel_name': payload.data.get('channel_name'),
                'duration': payload.data.get('duration', 0),
            })
            update_data['playback_history'] = history[:100]
        elif action == 'favorite':
            update_data['favorite_channels'] = payload.data.get('favorite_channels', [])
        elif action == 'setting_change':
            allowed_keys = ('download_concurrency', 'batch_download_concurrency', 'fab_position')
            for key, value in payload.data.items():
                if key in allowed_keys:
                    update_data[key] = value
        elif action == 'reminder':
            update_data['program_reminders'] = payload.data.get('program_reminders', [])

        if update_data and user_manager.update_user_data(payload.username, update_data):
            return {"success": True}
        return JSONResponse(content={"success": False, "error": "Invalid"}, status_code=400)
    except Exception as e:
        return JSONResponse(content={"success": False, "error": str(e)}, status_code=500)
|
@app.get("/ping")
async def ping():
    """Keep-alive endpoint for UptimeRobot / self-ping; a 200 response is all that is needed."""
    alive = {"ok": True}
    return alive
|
|
|
@app.get("/health")
async def health_check():
    # Summarises configuration validity, user count and cache statistics.
    cache_stats = cache.get_stats()
    config_ok, _missing = Config.validate()
    state = "running" if config_ok else "configuration_error"
    return {
        "status": state,
        "config_valid": config_ok,
        "total_users": len(user_manager.users),
        "cache": cache_stats,
    }
|
@app.get("/api/refresh")
async def refresh_cache(type: str = "all"):
    # Clears the requested cache category, then eagerly re-fetches auth (for
    # 'auth' and 'all') or the cid (for 'cid'). ``type`` is the public query
    # parameter name and is therefore kept despite shadowing the builtin.
    cache.clear_cache(type)

    if type in ('auth', 'all'):
        await get_auth(force=True)
    elif type == 'cid':
        from utils import get_cid
        await get_cid(force=True)

    return {"success": True}
|
@app.get("/api/list")
async def list_channels(request: Request):
    # Returns every channel with an added "playUrl" pointing at this server's
    # /api/live/{no} endpoint.
    auth = await get_auth()
    channels = await get_channels(auth)
    worker_base = f"{request.url.scheme}://{request.url.netloc}"

    enriched = []
    for ch in channels:
        entry = dict(ch)
        entry["playUrl"] = f"{worker_base}/api/live/{ch['no']}"
        enriched.append(entry)

    return {"success": True, "channels": enriched}
|
@app.get("/api/epg")
async def get_epg(vid: str, date: str):
    # Fetches the programme guide for channel ``vid`` on ``date``.
    auth = await get_auth()
    programmes = await fetch_epg(vid, date, auth)
    return {"success": True, "epg": programmes}
|
@app.get("/api/epg/all")
async def get_all_epg_data():
    # Returns the guide for all channels without forcing a refresh.
    auth = await get_auth()
    all_programmes = await get_all_epg(auth, force=False)
    return {"success": True, "data": all_programmes}
|
@app.get("/api/live/{chid}")
async def live_stream_info(chid: str, request: Request):
    # Resolves a channel number to its proxied and direct live-stream URLs.
    auth = await get_auth()
    channels = await get_channels(auth)

    channel = None
    for candidate in channels:
        if str(candidate['no']) == chid:
            channel = candidate
            break
    if not channel:
        return JSONResponse(content={"success": False, "error": "Not found"}, status_code=404)

    worker_base = f"{request.url.scheme}://{request.url.netloc}"
    proxied_url = f"{worker_base}/stream/live/{chid}.m3u8"
    direct_url = await get_live_m3u8_url(chid, auth)
    return {"success": True, "stream": {"m3u8": proxied_url, "direct": direct_url}}
|
@app.get("/stream/live/{chid}.m3u8")
async def live_stream_m3u8(chid: str, request: Request):
    # Delegates entirely to proxy_handler.proxy_live_stream_direct.
    proxied = await proxy_live_stream_direct(chid, request)
    return proxied
|
# Strong references to fire-and-forget tasks started by search_epg. The event
# loop only keeps weak references to tasks, so an unreferenced task may be
# garbage-collected before it finishes.
_search_epg_background_tasks = set()


@app.get("/api/epg/search")
async def search_epg(keyword: str, days: int = 30):
    # Case-insensitive title search over cached EPG data for the last ``days``
    # days. Only cached data is searched; when the per-day cache is mostly
    # cold a background fetch of the full guide is started so later searches
    # hit the cache. Responds 400 for an empty keyword and 500 on errors.
    try:
        if not keyword:
            return JSONResponse(content={"success": False, "error": "Missing keyword"}, status_code=400)

        auth = await get_auth()
        channels_list = await get_channels(auth)
        channel_map = {ch['id']: ch for ch in channels_list}

        now = datetime.now()
        date_list = [get_jst_date(now - timedelta(days=i)) for i in range(days + 1)]
        # Set for O(1) membership tests inside the per-programme loop.
        wanted_dates = set(date_list)

        results = []
        keyword_lower = keyword.lower()
        cache_hits = 0
        cache_misses = 0

        full_cache = cache.get_epg('_all_', 'full')
        if full_cache:
            # Strategy 1: one cache entry holds all programmes per channel.
            for channel_id, programs in full_cache.items():
                channel_info = channel_map.get(channel_id)
                if not channel_info:
                    continue
                for program in programs:
                    program_time = program.get('time', 0)
                    program_date = get_jst_date(datetime.fromtimestamp(program_time))
                    if program_date not in wanted_dates:
                        continue
                    title = program.get('title') or program.get('name') or ''
                    if keyword_lower in title.lower():
                        results.append({
                            'channel_id': channel_id,
                            'channel_name': channel_info['name'],
                            'channel_no': channel_info['no'],
                            'program': program,
                            'date': program_date
                        })
                # NOTE(review): counted once per channel found in the full
                # cache; the nesting level of this counter should be confirmed.
                cache_hits += 1
        else:
            # Strategy 2: probe the per-channel, per-day cache entries.
            for channel_id, channel_info in channel_map.items():
                for date_str in date_list:
                    cached_epg = cache.get_epg(channel_id, date_str)
                    if cached_epg is not None:
                        cache_hits += 1
                        for program in cached_epg:
                            title = program.get('title') or program.get('name') or ''
                            if keyword_lower in title.lower():
                                results.append({
                                    'channel_id': channel_id,
                                    'channel_name': channel_info['name'],
                                    'channel_no': channel_info['no'],
                                    'program': program,
                                    'date': date_str
                                })
                    else:
                        cache_misses += 1

            if cache_hits == 0 or cache_misses > cache_hits:
                # Warm the cache in the background and keep a reference so the
                # task cannot disappear mid-execution.
                task = asyncio.create_task(background_fetch_all_epg(auth))
                _search_epg_background_tasks.add(task)
                task.add_done_callback(_search_epg_background_tasks.discard)

        # Newest first. Programmes without a usable 'time' sort last instead
        # of raising KeyError, matching the tolerant ``.get('time', 0)`` above.
        results.sort(key=lambda item: item['program'].get('time') or 0, reverse=True)

        total_lookups = cache_hits + cache_misses
        hit_percent = cache_hits * 100 // total_lookups if total_lookups > 0 else 0

        return {
            "success": True, "keyword": keyword, "days": days,
            "total": len(results), "results": results,
            "cache_stats": {
                "hits": cache_hits, "misses": cache_misses,
                "strategy": "full_cache" if full_cache else "partial_cache",
                "hit_rate": f"{hit_percent}%"
            },
            "message": "后台正在缓存数据,下次搜索会更快" if not full_cache and cache_misses > 0 else None
        }
    except Exception as e:
        return JSONResponse(content={"success": False, "error": str(e)}, status_code=500)
|
async def background_fetch_all_epg(auth: dict):
    """Warm the EPG cache by fetching the guide for all channels.

    Intended to run as a fire-and-forget task, so failures never propagate;
    they are written to stderr instead of being silently discarded.
    """
    try:
        await get_all_epg(auth, force=False)
    except Exception:
        # Best effort: keep the task from raising, but leave a trace so a
        # permanently failing warm-up is visible in the logs.
        traceback.print_exc()
|
@app.get("/api/playback/{path:path}")
async def playback_stream_info(path: str, request: Request):
    # Builds the proxied playback URL for a recorded programme path. A bare
    # identifier (no '/') gets the "query/" prefix; paths that already contain
    # a directory part are used as given.
    try:
        # Result unused; the call is kept so an auth failure still surfaces
        # as a 500 response here.
        auth = await get_auth()
        worker_base = f"{request.url.scheme}://{request.url.netloc}"

        # strip('/') already removes every leading slash, so no further
        # leading-slash handling is required.
        clean_path = path.strip('/')
        if not clean_path.startswith('query/') and '/' not in clean_path:
            clean_path = f"query/{clean_path}"

        return {
            "success": True,
            "playback": {
                "path": f"/{clean_path}",
                "m3u8": f"{worker_base}/stream/playback/{clean_path}.m3u8",
                "original_path": path
            },
            "info": {"protocol": request.url.scheme, "type": "playback"}
        }
    except Exception as e:
        return JSONResponse(content={"success": False, "error": str(e)}, status_code=500)
|
@app.get("/stream/playback/{path:path}.m3u8")
async def playback_stream_m3u8(path: str, request: Request):
    # Delegates entirely to proxy_handler.proxy_playback_stream.
    proxied = await proxy_playback_stream(path, request)
    return proxied
|
@app.get("/api/download/playback/")
async def download_playback_by_path(request: Request, path: str, channel: str):
    # Streams a recorded programme as one .ts download.
    # Steps: resolve the channel, look the programme up in the last 30 days of
    # EPG to build a descriptive filename, fetch the upstream playlist, then
    # stream its segments in order. Errors raised before streaming starts are
    # reported as a 500 JSON response.

    def normalize_path(raw):
        # Bare playback path: no surrounding whitespace, leading '/',
        # 'query/' prefix or '.m3u8' suffix.
        bare = raw.strip()
        if bare.startswith('/'):
            bare = bare[1:]
        if bare.startswith('query/'):
            bare = bare[6:]
        if bare.endswith('.m3u8'):
            # '.m3u8' is five characters; slicing six also cut off the last
            # character of the path itself.
            bare = bare[:-5]
        return bare

    def clean_text(text):
        # Make ``text`` safe as a filename component: replace reserved
        # characters, drop control characters, collapse underscores and cap
        # the length at 150 characters.
        text = str(text).strip()
        cleaned = re.sub(r'[<>:"/\\|?*]', '_', text)
        cleaned = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', cleaned)
        cleaned = re.sub(r'_+', '_', cleaned)
        cleaned = cleaned.strip('_').strip()
        if len(cleaned) > 150:
            cleaned = cleaned[:150]
        return cleaned if cleaned else "unknown"

    try:
        auth = await get_auth()
        channels = await get_channels(auth)
        target_channel = next((ch for ch in channels if str(ch['no']) == str(channel)), None)
        if not target_channel:
            raise ValueError(f"频道 {channel} 不存在")

        clean_path = normalize_path(path)

        program_title = "Unknown"
        program_time = None
        JST = timezone(timedelta(hours=9))
        now_jst = datetime.now(JST)

        # Walk back day by day until a programme with the same path is found.
        for days_ago in range(0, 30):
            check_date = (now_jst - timedelta(days=days_ago)).strftime('%Y-%m-%d')
            try:
                epg_list = await fetch_epg(target_channel['id'], check_date, auth)
                if not epg_list:
                    continue
                for prog in epg_list:
                    if prog.get('path'):
                        if normalize_path(prog['path']) == clean_path:
                            program_title = prog.get('title') or prog.get('name') or 'Unknown'
                            program_time = datetime.fromtimestamp(prog['time'], tz=JST)
                            break
                if program_time:
                    break
            except Exception:
                # A failing day must not abort the lookup; try the next one.
                continue

        if not program_time:
            # Nothing matched: fall back to a generic, timestamped name.
            program_time = now_jst
            program_title = f"Playback_{target_channel['name']}"

        time_str = program_time.strftime('%Y%m%d_%H%M')
        channel_name = clean_text(target_channel['name'])
        program_name = clean_text(program_title)
        filename = f"{time_str}_{channel_name}_{program_name}.ts"

        # Rebuilt from the normalised path so a caller-supplied '.m3u8' suffix
        # is not doubled when the suffix is appended below.
        playback_path = f"query/{clean_path}"

        vod_host = Config.UPSTREAM_HOSTS['vod']
        access_token = quote(auth['access_token'])
        upstream_m3u8 = f"{vod_host}/{playback_path}.m3u8?type=vod&__cross_domain_user={access_token}"
        headers = {'Referer': Config.REQUIRED_REFERER, 'User-Agent': 'Mozilla/5.0'}

        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.get(upstream_m3u8, headers=headers)
            if resp.status_code != 200:
                raise Exception(f"M3U8获取失败: HTTP {resp.status_code}")
            m3u8_content = resp.text

        from utils import extract_playlist_url
        playlist_url = extract_playlist_url(m3u8_content, upstream_m3u8)

        if not playlist_url or playlist_url == upstream_m3u8:
            # The first document already is the media playlist.
            playlist_content = m3u8_content
            playlist_url = upstream_m3u8
        else:
            async with httpx.AsyncClient(timeout=30.0) as client:
                resp = await client.get(playlist_url, headers=headers)
                if resp.status_code != 200:
                    raise Exception(f"播放列表获取失败: HTTP {resp.status_code}")
                playlist_content = resp.text

        base_url = playlist_url.rsplit('/', 1)[0]
        ts_urls = []
        for line in playlist_content.split('\n'):
            line = line.strip()
            if line and not line.startswith('#'):
                ts_urls.append(line if line.startswith('http') else f"{base_url}/{line}")

        if not ts_urls:
            raise Exception("未找到TS分段")

        async def download_concurrent():
            # Yields segment bodies in playlist order, fetching ten at a time.
            # Stops once more than about 10% of the segments have failed.
            async def fetch_segment(client, url, retries=3):
                for attempt in range(retries):
                    try:
                        r = await client.get(url, headers=headers, timeout=60.0)
                        if r.status_code == 200:
                            return r.content
                        if r.status_code in (429, 503) and attempt < retries - 1:
                            await asyncio.sleep(1.5 * (attempt + 1))
                    except Exception:
                        if attempt < retries - 1:
                            await asyncio.sleep(0.5 * (attempt + 1))
                return None

            batch_size = 10
            fail_count = 0
            fail_limit = max(1, len(ts_urls) // 10)

            async with httpx.AsyncClient(timeout=60.0, limits=httpx.Limits(max_keepalive_connections=20, max_connections=30)) as client:
                for i in range(0, len(ts_urls), batch_size):
                    batch = ts_urls[i:i + batch_size]
                    tasks = [fetch_segment(client, url) for url in batch]
                    results = await asyncio.gather(*tasks)
                    for content in results:
                        if content is None:
                            fail_count += 1
                            if fail_count > fail_limit:
                                return
                        else:
                            yield content

        encoded_filename = quote(filename)
        return StreamingResponse(
            download_concurrent(),
            media_type="video/mp2t",
            headers={
                "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}",
                "Cache-Control": "no-cache",
            }
        )
    except Exception as e:
        return JSONResponse(content={"success": False, "error": str(e)}, status_code=500)
|
@app.options("/live/{path:path}")
@app.options("/vod/{path:path}")
@app.options("/query/{path:path}")
@app.options("/stream/{path:path}")
@app.options("/api/{path:path}")
async def options_handler():
    # Answers CORS preflight requests for all proxied and API paths.
    cors_headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'GET, POST, OPTIONS, DELETE',
        'Access-Control-Allow-Headers': 'Authorization, Content-Type, Range',
    }
    return Response(status_code=200, headers=cors_headers)
|
@app.get("/live/{path:path}")
async def proxy_live_media(path: str, request: Request):
    # Forwards the request to proxy_media under the /live/ prefix.
    upstream_path = f"/live/{path}"
    return await proxy_media(request, upstream_path)
|
@app.get("/vod/{path:path}")
async def proxy_vod_media(path: str, request: Request):
    # Forwards the request to proxy_media under the /vod/ prefix.
    upstream_path = f"/vod/{path}"
    return await proxy_media(request, upstream_path)
|
@app.get("/query/{path:path}")
async def proxy_query_media(path: str, request: Request):
    # Forwards the request to proxy_media under the /query/ prefix.
    upstream_path = f"/query/{path}"
    return await proxy_media(request, upstream_path)
|
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
    # Uniform JSON body for every 404 raised by the application.
    body = {"error": "Not Found"}
    return JSONResponse(content=body, status_code=404)
|
@app.exception_handler(500)
async def server_error_handler(request: Request, exc):
    # Uniform JSON body for every 500 raised by the application.
    body = {"error": "Internal Server Error"}
    return JSONResponse(content=body, status_code=500)
|
async def internal_download_m3u8(m3u8_url: str, output_path: str, progress_dict: dict, _refresh_count: int = 0) -> bool:
    """Download every segment of an HLS playlist and merge them into one file.

    Segments are first written to individual files in ``output_path + ".segs"``
    and then concatenated in playlist order into ``output_path``. Segments
    that could not be fetched are skipped during the merge.

    Args:
        m3u8_url: URL of the master or media playlist.
        output_path: Destination file for the merged stream.
        progress_dict: Optional shared dict. ``current_task``, ``ts_done`` and
            ``ts_total`` are written to it, and ``status == "stopping"`` is
            read from it as a cancellation request. May be None.
        _refresh_count: Internal counter of token refreshes already attempted;
            callers should leave it at the default.

    Returns:
        True when at least one segment was downloaded and the merged file was
        written; False on cancellation, upstream failure or merge failure.
        The function does not raise; errors are reported via ``progress_dict``.
    """
    def update_msg(msg):
        if progress_dict is not None:
            progress_dict["current_task"] = msg

    def update_sub(done, total):
        if progress_dict is not None:
            progress_dict["ts_done"] = done
            progress_dict["ts_total"] = total

    def _is_stopping():
        return progress_dict is not None and progress_dict.get("status") == "stopping"

    async def _refresh_url(url: str) -> str:
        # Force a new auth token and substitute it into every query parameter
        # whose name contains "user" or "token". Returns the URL unchanged on
        # any failure.
        try:
            new_auth = await get_auth(force=True)
            new_token = new_auth.get('access_token', '')
            if not new_token:
                return url
            from urllib.parse import urlparse, urlencode, parse_qs, urlunparse
            parsed = urlparse(url)
            qs = parse_qs(parsed.query, keep_blank_values=True)
            for key in list(qs.keys()):
                if 'user' in key.lower() or 'token' in key.lower():
                    qs[key] = [new_token]
            return urlunparse(parsed._replace(query=urlencode({k: v[0] for k, v in qs.items()})))
        except Exception:
            return url

    try:
        if _is_stopping():
            update_msg("已停止")
            return False

        update_msg("正在请求源站 M3U8...")
        headers = {
            'Referer': getattr(Config, 'REQUIRED_REFERER', 'http://vod.yoitv.com/'),
            'Origin': 'http://vod.yoitv.com',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }

        # NOTE(review): TLS certificate verification is disabled (verify=False)
        # for both HTTP clients in this function - confirm this is intended.
        async with httpx.AsyncClient(timeout=30.0, verify=False, follow_redirects=True) as client:
            resp = await client.get(m3u8_url, headers=headers)

            if resp.status_code in (401, 403):
                # Token rejected: refresh it and restart, at most five times.
                _refresh_count += 1
                if _refresh_count > 5:
                    update_msg(f"Token 持续无效,已达最大刷新次数 ({_refresh_count}),放弃")
                    return False
                update_msg(f"Token 已过期(HTTP {resp.status_code}),第 {_refresh_count} 次刷新重试...")
                await asyncio.sleep(min(2 + _refresh_count, 6))
                m3u8_url = await _refresh_url(m3u8_url)
                return await internal_download_m3u8(m3u8_url, output_path, progress_dict, _refresh_count=_refresh_count)

            if resp.status_code in (429, 500, 502, 503, 504):
                # Transient upstream error: retry twice with growing delays.
                for _ru in range(2):
                    if _is_stopping():
                        update_msg("已停止")
                        return False
                    await asyncio.sleep(5 * (_ru + 1))
                    update_msg(f"上游 HTTP {resp.status_code}, 第 {_ru + 1}/2 次重试 m3u8...")
                    try:
                        resp = await client.get(m3u8_url, headers=headers)
                    except Exception as _re:
                        update_msg(f"重试 m3u8 异常: {str(_re)[:60]}")
                        continue
                    if resp.status_code == 200:
                        break
                if resp.status_code != 200:
                    update_msg(f"下载失败: 源站持续 HTTP {resp.status_code}")
                    return False
            elif resp.status_code != 200:
                update_msg(f"下载失败: 源站拒绝访问 HTTP {resp.status_code}")
                return False

            m3u8_text = resp.text
            current_url = str(resp.url)
            parsed_url = urlparse(current_url)
            query_string = parsed_url.query

            # A master playlist points at a nested .m3u8; follow the first one
            # and carry the query string (auth token) over when it has none.
            playlist_url = current_url
            for line in m3u8_text.splitlines():
                line = line.strip()
                if line and not line.startswith('#') and '.m3u8' in line:
                    playlist_url = urljoin(current_url, line)
                    if query_string and '?' not in playlist_url:
                        playlist_url += f"?{query_string}"
                    break

            if playlist_url != current_url:
                update_msg("正在解析嵌套视频流...")
                resp = await client.get(playlist_url, headers=headers)
                if resp.status_code != 200:
                    update_msg(f"嵌套流获取失败: HTTP {resp.status_code}")
                    return False
                m3u8_text = resp.text
                current_url = str(resp.url)
                query_string = urlparse(current_url).query or query_string

        # Every non-comment line is a segment URI, resolved against the playlist.
        ts_urls = []
        for line in m3u8_text.splitlines():
            line = line.strip()
            if line and not line.startswith('#') and not line.upper().startswith('KEY'):
                ts_url = urljoin(current_url, line)
                if query_string and '?' not in ts_url:
                    ts_url += f"?{query_string}"
                ts_urls.append(ts_url)

        total_ts = len(ts_urls)
        if total_ts == 0:
            update_msg("下载失败: 未在视频流中找到任何画面数据")
            return False

        update_sub(0, total_ts)
        update_msg(f"准备下载 {total_ts} 个切片 (0%)")

        # Each segment gets its own file so memory use stays flat and the
        # merge can run in playlist order regardless of completion order.
        seg_dir = output_path + ".segs"
        try:
            os.makedirs(seg_dir, exist_ok=True)
        except Exception:
            pass
        seg_paths = [os.path.join(seg_dir, f"seg_{i:06d}.ts") for i in range(total_ts)]
        seg_ok = [False] * total_ts

        def _cleanup_segs():
            for _p in seg_paths:
                try:
                    if os.path.exists(_p):
                        os.remove(_p)
                except Exception:
                    pass
            try:
                os.rmdir(seg_dir)
            except Exception:
                pass

        semaphore = asyncio.Semaphore(32)
        downloaded = 0

        async with httpx.AsyncClient(
            timeout=httpx.Timeout(30.0, connect=10.0), verify=False, follow_redirects=True,
            limits=httpx.Limits(max_keepalive_connections=40, max_connections=64, keepalive_expiry=60.0),
        ) as dl_client:
            async def _fetch_ts(index, url):
                nonlocal downloaded
                if _is_stopping():
                    return False
                async with semaphore:
                    if _is_stopping():
                        return False
                    for retry in range(5):
                        if _is_stopping():
                            return False
                        try:
                            async with dl_client.stream("GET", url, headers=headers) as r:
                                if r.status_code == 200:
                                    with open(seg_paths[index], 'wb') as sf:
                                        async for chunk in r.aiter_bytes(64 * 1024):
                                            if chunk:
                                                sf.write(chunk)
                                    seg_ok[index] = True
                                    downloaded += 1
                                    update_sub(downloaded, total_ts)
                                    if downloaded % 10 == 0 or downloaded == total_ts:
                                        update_msg(f"下载中 {downloaded}/{total_ts} ({int(downloaded/total_ts*100)}%)")
                                    return True
                                if r.status_code in (429, 503) and retry < 4:
                                    await asyncio.sleep(1.5 * (retry + 1))
                                    continue
                        except Exception:
                            # Drop a partially written segment before retrying.
                            try:
                                if os.path.exists(seg_paths[index]):
                                    os.remove(seg_paths[index])
                            except Exception:
                                pass
                            if retry < 4:
                                await asyncio.sleep(0.5 * (retry + 1))
                    return False

            await asyncio.gather(*[_fetch_ts(i, u) for i, u in enumerate(ts_urls)])

        if _is_stopping():
            update_msg("已停止")
            _cleanup_segs()
            return False

        downloaded_count = sum(1 for ok in seg_ok if ok)
        if downloaded_count == 0:
            update_msg(f"下载失败: {total_ts} 个切片全部获取失败,文件为空")
            _cleanup_segs()
            return False

        update_msg(f"正在合并 {downloaded_count}/{total_ts} 个切片...")

        def _do_merge():
            # Concatenate the downloaded segments in order, deleting each one
            # after use. Returns False when the output file itself cannot be
            # written; a single unreadable segment is skipped (best effort).
            try:
                with open(output_path, 'wb') as outfile:
                    for i in range(total_ts):
                        if seg_ok[i] and os.path.exists(seg_paths[i]):
                            try:
                                with open(seg_paths[i], 'rb') as sf:
                                    while True:
                                        chunk = sf.read(1024 * 1024)
                                        if not chunk:
                                            break
                                        outfile.write(chunk)
                            except Exception:
                                pass
                        try:
                            if os.path.exists(seg_paths[i]):
                                os.remove(seg_paths[i])
                        except Exception:
                            pass
            except Exception:
                return False
            try:
                os.rmdir(seg_dir)
            except Exception:
                pass
            return True

        # The merge is blocking file I/O, so it runs in the default executor.
        try:
            merged = await asyncio.get_running_loop().run_in_executor(None, _do_merge)
        except Exception:
            merged = False

        if not merged:
            # Previously a failed merge was swallowed and reported as success.
            update_msg("下载失败: 合并切片失败,无法写入输出文件")
            _cleanup_segs()
            return False

        update_sub(total_ts, total_ts)
        update_msg("视频下载完成,正在打包入库...")
        return True
    except Exception as e:
        update_msg(f"后端下载崩溃: {str(e)}")
        return False
|
| def _build_display_name(date_str: str, time_str: str, ch_name_clean: str, prog_name: str) -> str:
|
| return f"{date_str}_{time_str}_{ch_name_clean}_{prog_name}.ts"
|
| def _clean_name(s: str) -> str:
|
| return re.sub(r'[\\/*?:"<>|]', "", s).replace(" ", "_").strip(".")
|
| def _build_meta(date_str: str, time_str: str, ch_id: str, ch_name: str, ch_name_clean: str, prog_name: str, display_name: str, year: str, month: str, day: str, program: dict) -> dict:
|
| return {
|
| "display_name": display_name,
|
| "date_str": date_str,
|
| "time_str": time_str,
|
| "ch_id": ch_id,
|
| "ch_name": ch_name,
|
| "ch_name_clean": ch_name_clean,
|
| "prog_name": prog_name,
|
| "year": year, "month": month, "day": day,
|
| "program": program,
|
| }
|
| def _make_m3u8_url(program: dict, auth: dict) -> str:
|
| if program.get('path'):
|
| prog_path = program['path'].strip().lstrip('/')
|
| if not prog_path.startswith('query/'):
|
| prog_path = f"query/{prog_path}"
|
| if prog_path.endswith('.m3u8'):
|
| prog_path = prog_path[:-5]
|
| vod_host = Config.UPSTREAM_HOSTS.get('vod', '')
|
| access_token = quote(auth.get('access_token', ''), safe='')
|
| if vod_host and access_token:
|
| return f"{vod_host}/{prog_path}.m3u8?type=vod&__cross_domain_user={access_token}"
|
| return program.get('m3u8') or program.get('playUrl') or ""
|
def _split_archived_display_name(display_name: str, date: str, ch_name_clean: str):
    """Recover ``(time_str, prog_name)`` from an archived display name.

    Display names are built as ``<date>_<time>_<channel>_<program>.ts``. The
    cleaned channel name may itself contain underscores (spaces are replaced
    with ``_``), so the channel directory name is used to strip the channel
    part exactly. When that prefix does not match, the plain positional split
    is used. Returns ``("0000", display_name)`` if the name does not start
    with ``date``.
    """
    stem = display_name[:-3] if display_name.endswith('.ts') else display_name
    head = stem.split('_', 2)
    if len(head) == 3 and head[0] == date:
        ch_prefix = f"{ch_name_clean}_"
        if head[2].startswith(ch_prefix):
            return head[1], head[2][len(ch_prefix):]
    stem_parts = stem.split('_', 3)
    if len(stem_parts) >= 4 and stem_parts[0] == date:
        return stem_parts[1], stem_parts[3]
    return "0000", display_name


@app.get("/api/admin/archive/daily_epg")
async def get_daily_epg_archive(date: str):
    """List one day's programs per channel together with their archive state.

    Args:
        date: Target day as ``YYYYMMDD`` (interpreted in JST, UTC+9).

    Returns:
        ``{"success": True, "date": date, "data": [...]}`` where each element
        is ``{"ch_id", "ch_name", "programs"}``. Every program entry says
        whether it is already archived and carries either the archive
        play/download URLs or an upstream m3u8 URL. Archived files of that day
        that match no EPG entry are appended as extra entries.

    Raises:
        HTTPException: 400 when ``date`` is not a valid ``YYYYMMDD`` string.
    """
    # Validate before any network call: the code below slices ``date`` by
    # position and parses it with strptime.
    jst = timezone(timedelta(hours=9))
    if len(date) != 8 or not date.isdigit():
        raise HTTPException(status_code=400, detail="日期格式错误, 应为 YYYYMMDD")
    try:
        target_dt = datetime.strptime(date, "%Y%m%d").replace(tzinfo=jst)
    except ValueError:
        raise HTTPException(status_code=400, detail="日期格式错误, 应为 YYYYMMDD")

    auth = await get_auth()
    channels = await get_channels(auth)
    # Channel display name, addressable by both channel id and channel number.
    cmap = {}
    for c in channels:
        cmap[str(c.get('id'))] = c.get('name', '')
        cmap[str(c.get('no'))] = c.get('name', '')

    try:
        epg_data = await get_all_epg(auth, channels, date=date)
    except Exception:
        epg_data = {}
    if not isinstance(epg_data, dict):
        # The per-channel fallback below assigns by key, so it needs a dict.
        epg_data = {}

    has_data = any(
        (v.get("data") if isinstance(v, dict) else v)
        for v in epg_data.values()
    )
    if not has_data:
        # Bulk EPG was empty: fetch each channel individually, 8 at a time.
        fmt_date = f"{date[:4]}-{date[4:6]}-{date[6:8]}"
        _sem_epg = asyncio.Semaphore(8)

        async def _fetch_one_ch(ch):
            ch_id = str(ch.get('id', ''))
            if not ch_id:
                return None
            async with _sem_epg:
                try:
                    progs = await fetch_epg(ch_id, fmt_date, auth)
                    return (ch_id, progs) if progs else None
                except Exception:
                    return None

        _epg_results = await asyncio.gather(*[_fetch_one_ch(c) for c in channels])
        for _r in _epg_results:
            if _r:
                epg_data[_r[0]] = {"data": _r[1]}

    start_ts = target_dt.timestamp()
    end_ts = (target_dt + timedelta(days=1)).timestamp()
    year, month, day = date[:4], date[4:6], date[6:8]

    existing_paths = {f['path']: f for f in archive_manager.list_by_date(date)}
    results = []
    seen_ch_clean = set()
    for cid, epg in epg_data.items():
        cid_str = str(cid)
        ch_name = cmap.get(cid_str, cid_str)
        ch_name_clean = _clean_name(ch_name)
        programs = epg.get('data', []) if isinstance(epg, dict) else (epg if isinstance(epg, list) else [])
        ch_results = []
        for p in programs or []:
            if not isinstance(p, dict):
                continue
            t_val = _parse_epg_time(p.get('time'))
            # Programs with a known start time outside the target day are dropped.
            if t_val is not None and not (start_ts <= t_val < end_ts):
                continue
            try:
                dt = datetime.fromtimestamp(t_val, tz=jst) if t_val else datetime.now(jst)
                time_str = dt.strftime("%H%M")
            except Exception:
                time_str = "0000"
            prog_name = _clean_name(p.get('name') or p.get('title') or '未命名')
            display_name = _build_display_name(date, time_str, ch_name_clean, prog_name)
            md5 = archive_manager.display_name_to_md5(display_name)
            video_path = archive_manager.make_video_path(year, month, day, ch_name_clean, md5)
            archived_info = existing_paths.get(video_path)
            is_archived = archived_info is not None
            ch_results.append({
                "time": time_str,
                "prog_name": prog_name,
                "display_name": display_name,
                "md5": md5,
                "is_archived": is_archived,
                "video_path": video_path,
                "repo": archived_info.get('repo') if is_archived else None,
                "file_size": int(archived_info.get('file_size') or 0) if is_archived else 0,
                "m3u8": "" if is_archived else _make_m3u8_url(p, auth),
                "play_url": f"/api/archive/play?path={quote(video_path)}" if is_archived else "",
                "download_url": f"/api/archive/download?path={quote(video_path)}&name={quote(display_name)}" if is_archived else "",
                "program": p,
            })
        if ch_results:
            results.append({"ch_id": cid_str, "ch_name": ch_name, "programs": ch_results})
            seen_ch_clean.add(ch_name_clean)

    # Collect archived files stored under this day's prefix, grouped by the
    # channel directory name, so files without a matching EPG entry still show.
    date_prefix = f"video_archives/{year}/{month}/{day}/"
    ch_extra = {}
    for vpath, finfo in existing_paths.items():
        if not vpath.startswith(date_prefix):
            continue
        parts = vpath[len(date_prefix):].split('/')
        if len(parts) != 2 or not parts[1].endswith('.ts'):
            continue
        ch_name_clean = parts[0]
        display_name = finfo.get('display_name') or parts[1]
        try:
            time_str_parsed, prog_name_parsed = _split_archived_display_name(display_name, date, ch_name_clean)
        except Exception:
            time_str_parsed, prog_name_parsed = "0000", display_name
        entry = {
            "time": time_str_parsed,
            "prog_name": prog_name_parsed,
            "display_name": display_name,
            "md5": parts[1][:-3],
            "is_archived": True,
            "video_path": vpath,
            "repo": finfo.get('repo'),
            "file_size": int(finfo.get('file_size') or 0),
            "m3u8": "",
            "play_url": f"/api/archive/play?path={quote(vpath)}",
            "download_url": f"/api/archive/download?path={quote(vpath)}&name={quote(display_name)}",
            "program": {},
        }
        ch_extra.setdefault(ch_name_clean, []).append(entry)

    # Merge extras into channels that already have EPG rows (no duplicates).
    for r in results:
        ch_nc = _clean_name(r['ch_name'])
        if ch_nc not in ch_extra:
            continue
        existing_paths_in_ch = {p['video_path'] for p in r['programs']}
        for entry in ch_extra[ch_nc]:
            if entry['video_path'] not in existing_paths_in_ch:
                r['programs'].append(entry)
        r['programs'].sort(key=lambda x: x.get('time', '0000'))

    # Channels that only exist in the archive get their own group.
    for ch_name_clean, entries in ch_extra.items():
        if ch_name_clean in seen_ch_clean:
            continue
        results.append({
            "ch_id": ch_name_clean,
            "ch_name": ch_name_clean,
            "programs": sorted(entries, key=lambda x: x.get('time', '0000')),
        })
    return {"success": True, "date": date, "data": results}
|
class DownloadSingleReq(BaseModel):
    """Request body for ``POST /api/admin/archive/download_single``."""

    # Day the program belongs to; the handler slices it as YYYYMMDD.
    date_str: str
    # Channel identifier, looked up against both channel ``id`` and ``no``.
    ch_id: str
    # Optional channel name supplied by the client.
    ch_name: Optional[str] = None
    # EPG program entry; the handler reads its ``time`` and ``name``/``title``.
    program: dict
    # Playlist URL to download.
    m3u8: str
|
# Strong references to in-flight single-download tasks. The event loop keeps
# only weak references to tasks, so a task nobody references may be
# garbage-collected before it finishes.
_single_download_tasks = set()


@app.post("/api/admin/archive/download_single")
async def download_single_archive(req: DownloadSingleReq):
    """Start a background download-and-upload of one program and return at once.

    Progress and the final outcome are reported through
    ``archive_manager.progress``; the HTTP response only acknowledges that the
    job was scheduled.

    Args:
        req: Target day, channel id, EPG program entry and m3u8 URL.

    Returns:
        ``{"success": True}``.
    """
    async def _dl():
        archive_manager.progress.update({"status": "running", "total": 1, "current": 1, "success": 0, "fail": 0, "current_task": "正在启动单点下载..."})
        temp_ts = None
        try:
            auth = await get_auth()
            channels = await get_channels(auth)
            cmap = {}
            for c in channels:
                cmap[str(c.get('id'))] = c.get('name', '')
                cmap[str(c.get('no'))] = c.get('name', '')
            ch_name = cmap.get(str(req.ch_id), str(req.ch_id))
            p = req.program
            jst = timezone(timedelta(hours=9))
            time_val = _parse_epg_time(p.get('time'))
            try:
                dt = datetime.fromtimestamp(time_val, tz=jst) if time_val else datetime.now(jst)
                time_str = dt.strftime("%H%M")
            except Exception:
                time_str = "0000"
            ch_name_clean = _clean_name(ch_name)
            prog_name = _clean_name(p.get('name') or p.get('title') or '未命名')
            year, month, day = req.date_str[:4], req.date_str[4:6], req.date_str[6:8]
            display_name = _build_display_name(req.date_str, time_str, ch_name_clean, prog_name)
            md5 = archive_manager.display_name_to_md5(display_name)
            video_path = archive_manager.make_video_path(year, month, day, ch_name_clean, md5)
            if archive_manager.db_exists(video_path):
                archive_manager.progress.update({"status": "idle", "success": 1, "fail": 0, "current_task": f"云端已存在: {display_name}"})
                return
            temp_ts = os.path.join(tempfile.gettempdir(), md5 + ".ts")
            success_dl = await internal_download_m3u8(req.m3u8, temp_ts, archive_manager.progress)
            if success_dl and os.path.exists(temp_ts):
                archive_manager.progress["current_task"] = f"正在上传至云端: {display_name}"
                meta = _build_meta(req.date_str, time_str, str(req.ch_id), ch_name, ch_name_clean, prog_name, display_name, year, month, day, p)
                # upload_video is a blocking call; run it in the default
                # executor (as the batch archiver does) so the event loop
                # keeps serving requests during the upload.
                ok = await asyncio.get_running_loop().run_in_executor(
                    None, archive_manager.upload_video,
                    temp_ts, year, month, day, ch_name_clean, display_name, meta)
                if ok:
                    archive_manager.progress.update({"status": "idle", "success": 1, "fail": 0, "current_task": f"存档成功: {display_name}"})
                    archive_manager.refresh_archived_cache()
                else:
                    archive_manager.progress.update({"status": "idle", "success": 0, "fail": 1, "current_task": "上传失败"})
            else:
                archive_manager.progress.update({"status": "idle", "success": 0, "fail": 1, "current_task": "下载失败"})
        except Exception as e:
            archive_manager.progress.update({"status": "idle", "current_task": f"Error: {str(e)}"})
        finally:
            # Remove the temp file on every exit path, including when the
            # upload raised; a cleanup failure must not mask the outcome.
            if temp_ts:
                try:
                    if os.path.exists(temp_ts):
                        os.remove(temp_ts)
                except OSError:
                    pass

    task = asyncio.create_task(_dl())
    _single_download_tasks.add(task)
    task.add_done_callback(_single_download_tasks.discard)
    return {"success": True}
|
async def run_archive_task():
    """Run the archive job unless one is already marked as running.

    After a run (successful or not) the module-level ``_archive_triggered``
    flag is reset to ``False``. When a job is already running this call
    returns immediately and leaves the flag untouched.
    """
    global _archive_triggered
    already_running = archive_manager.progress["status"] == "running"
    if not already_running:
        try:
            await _run_archive_task_inner()
        finally:
            _archive_triggered = False
|
async def _run_archive_task_inner():
    """Run one full archive pass and report through ``archive_manager.progress``.

    Phases, in order:
      1. Load the existing archive index and the persisted failed set.
      2. Build an EPG-by-date map from the bulk EPG, from EPG JSON stored by
         ``archive_manager``, and from per-channel fetches for any of the last
         28 days still missing; then upload that map.
      3. Derive download tasks for programs not yet archived, keep those whose
         start time is within the last 28 days, and sort them by start time.
      4. Download in batches of 50 (auth refreshed between batches) while one
         upload worker per repo drains its queue.
      5. Retry whatever is still missing from the index, with growing backoff,
         until nothing is pending or a stop is requested.
      6. Persist the failed set, publish a summary and flush the index.

    The progress ``status`` is always reset to ``"idle"`` on exit. Exceptions
    are recorded in ``current_task`` and logged; they are not re-raised.
    """
    archive_manager.progress.update({
        "status": "running", "total": 0, "current": 0,
        "success": 0, "fail": 0, "skip": 0,
        "current_task": "正在初始化...",
        "ts_done": 0, "ts_total": 0,
        "current_file": "", "current_ch": "", "downloading": 0,
    })
    archive_manager.log_task_event("start")
    try:
        auth = await get_auth()
        channels = await get_channels(auth)
        # Channel name addressable by both id and number; unnamed channels are skipped.
        cmap = {}
        for c in channels:
            name = c.get('name', '')
            if name:
                cmap[str(c.get('id'))] = name
                cmap[str(c.get('no'))] = name
        jst = timezone(timedelta(hours=9))
        today_jst = get_jst_date()
        # get_jst_date() is parsed as YYYY-MM-DD first, then as YYYYMMDD.
        try:
            today_obj = datetime.strptime(today_jst, "%Y-%m-%d")
        except Exception:
            today_obj = datetime.strptime(today_jst, "%Y%m%d")

        archive_manager.progress["current_task"] = "正在加载已有存档索引..."
        loop = asyncio.get_running_loop()
        existing_paths, existing_display_names = await loop.run_in_executor(None, archive_manager.load_existing_sets)
        archive_manager.progress["current_task"] = f"已加载 {len(existing_paths)} 条索引, 准备拉取 EPG..."
        repo_id = archive_manager._select_repo()
        failed_set = await loop.run_in_executor(None, archive_manager.load_failed_set, repo_id)
        cutoff_date = (today_obj - timedelta(days=365)).strftime("%Y%m%d")
        # Keep only failed paths whose year/month/day path segments are newer
        # than the 365-day cutoff.
        failed_set = {
            fp for fp, segs in ((fp, fp.split("/")) for fp in failed_set)
            if len(segs) >= 4 and segs[1] + segs[2] + segs[3] > cutoff_date
        }

        archive_manager.progress["current_task"] = "正在拉取全部 EPG..."
        try:
            all_epg = await get_all_epg(auth, channels, date=None, force=True)
        except Exception:
            all_epg = {}

        today_str = today_obj.strftime("%Y%m%d")
        epg_by_date = {}
        for ch_id, epg in all_epg.items():
            programs = epg.get("data", []) if isinstance(epg, dict) else (epg if isinstance(epg, list) else [])
            for p in programs or []:
                if not isinstance(p, dict):
                    continue
                t_val = _parse_epg_time(p.get("time"))
                if t_val is None:
                    continue
                d_str = datetime.fromtimestamp(t_val, tz=jst).strftime("%Y%m%d")
                if d_str > today_str:
                    continue
                epg_by_date.setdefault(d_str, {}).setdefault(str(ch_id), {"data": []})["data"].append(p)

        # Restore days that have stored EPG JSON but are absent from the bulk EPG.
        try:
            hf_epg_dates = await loop.run_in_executor(None, archive_manager.list_epg_dates)
            missing_dates = sorted(hf_epg_dates - set(epg_by_date.keys()))
            missing_dates = [md for md in missing_dates if md <= today_str]
            if missing_dates:
                archive_manager.progress["current_task"] = f"正在恢复 {len(missing_dates)} 天历史 EPG..."

                async def _dl_epg(md):
                    try:
                        return md, await loop.run_in_executor(None, archive_manager.download_epg_json, md)
                    except Exception:
                        return md, {}

                epg_results = await asyncio.gather(*[_dl_epg(md) for md in missing_dates])
                for md, hf_epg in epg_results:
                    if hf_epg:
                        for ch_id, ch_epg in hf_epg.items():
                            progs = ch_epg.get("data", []) if isinstance(ch_epg, dict) else (ch_epg if isinstance(ch_epg, list) else [])
                            if progs:
                                epg_by_date.setdefault(md, {}).setdefault(str(ch_id), {"data": []})["data"].extend(progs)
        except Exception:
            pass

        # Fill any of the last 28 days still missing, channel by channel.
        full_28_dates = {(today_obj - timedelta(days=i)).strftime("%Y%m%d") for i in range(28)}
        missing_dates_28 = sorted(full_28_dates - set(epg_by_date.keys()))
        if missing_dates_28:
            archive_manager.progress["current_task"] = f"正在补全 {len(missing_dates_28)} 天 EPG..."

            async def _fetch_day_epg(md):
                fmt_date = f"{md[:4]}-{md[4:6]}-{md[6:8]}"
                result = {}
                sem = asyncio.Semaphore(5)

                async def _fetch_ch(ch):
                    ch_id = str(ch.get('id', ''))
                    if not ch_id:
                        return
                    if epg_by_date.get(md, {}).get(ch_id, {}).get("data"):
                        return
                    async with sem:
                        try:
                            progs = await fetch_epg(ch_id, fmt_date, auth)
                            if progs:
                                result.setdefault(ch_id, {"data": []})["data"].extend(progs)
                        except Exception:
                            pass

                await asyncio.gather(*[_fetch_ch(ch) for ch in channels])
                return md, result

            day_results = await asyncio.gather(*[_fetch_day_epg(md) for md in missing_dates_28])
            for md, res in day_results:
                for ch_id, ch_data in res.items():
                    epg_by_date.setdefault(md, {}).setdefault(ch_id, {"data": []})["data"].extend(ch_data["data"])

        sorted_dates = sorted(epg_by_date.keys())
        archive_manager.progress["current_task"] = f"正在同步 {len(sorted_dates)} 天 EPG..."
        try:
            await loop.run_in_executor(None, archive_manager.upload_epg_batch, epg_by_date)
        except Exception:
            pass

        # Build the download task list.
        tasks_to_run = []
        seen_video_paths = set()
        for d_str in sorted_dates:
            dt_obj_d = datetime.strptime(d_str, "%Y%m%d").replace(tzinfo=jst)
            start_ts = dt_obj_d.timestamp()
            end_ts = (dt_obj_d + timedelta(days=1)).timestamp()
            year, month, day = d_str[:4], d_str[4:6], d_str[6:8]
            for ch_id, epg in epg_by_date.get(d_str, {}).items():
                cid_str = str(ch_id)
                ch_name = cmap.get(cid_str, cid_str)
                ch_name_clean = _clean_name(ch_name)
                programs = epg.get('data', []) if isinstance(epg, dict) else (epg if isinstance(epg, list) else [])
                # Non-dict entries sort first and are skipped in the loop below.
                programs.sort(key=lambda p: (_parse_epg_time(p.get('time')) if isinstance(p, dict) else None) or 0)
                for program in programs:
                    if not isinstance(program, dict):
                        continue
                    t_val = _parse_epg_time(program.get('time'))
                    if t_val is not None and not (start_ts <= t_val < end_ts):
                        continue
                    m3u8_url = _make_m3u8_url(program, auth)
                    if not m3u8_url:
                        continue
                    try:
                        dt = datetime.fromtimestamp(t_val, tz=jst) if t_val else datetime.now(jst)
                        time_str = dt.strftime("%H%M")
                    except Exception:
                        time_str = "0000"
                    prog_name = _clean_name(program.get('name') or program.get('title') or '未命名')
                    if '休止' in (program.get('name') or program.get('title') or ''):
                        continue
                    display_name = _build_display_name(d_str, time_str, ch_name_clean, prog_name)
                    md5 = archive_manager.display_name_to_md5(display_name)
                    video_path = archive_manager.make_video_path(year, month, day, ch_name_clean, md5)

                    if video_path in existing_paths:
                        continue
                    if video_path in seen_video_paths:
                        continue
                    if display_name in existing_display_names:
                        continue
                    if video_path in failed_set:
                        continue
                    seen_video_paths.add(video_path)
                    tasks_to_run.append({
                        'program': program, 'ch_id': cid_str, 'ch_name': ch_name,
                        'ch_name_clean': ch_name_clean, 'prog_name': prog_name,
                        'date_str': d_str, 'time_str': time_str,
                        'year': year, 'month': month, 'day': day,
                        'display_name': display_name, 'md5': md5,
                        'video_path': video_path, 'm3u8': m3u8_url,
                    })

        now_ts_28 = datetime.now(jst).timestamp()
        cutoff_ts_28 = now_ts_28 - 28 * 86400

        def _task_ts(t):
            """Start timestamp of a task; falls back to noon JST of its date, then 0.0."""
            ts = _parse_epg_time(t['program'].get('time'))
            if ts is not None:
                return ts
            try:
                return datetime.strptime(t['date_str'], "%Y%m%d").replace(tzinfo=jst, hour=12).timestamp()
            except Exception:
                return 0.0

        tasks_to_run = [t for t in tasks_to_run if _task_ts(t) >= cutoff_ts_28]
        tasks_to_run.sort(key=_task_ts)

        total_tasks = len(tasks_to_run)
        archive_manager.progress["total"] = total_tasks
        if not tasks_to_run:
            archive_manager.progress.update({"status": "idle", "current_task": "无新任务"})
            return

        CONCURRENCY = Config.ARCHIVE_DOWNLOAD_CONCURRENCY
        INDEX_SAVE_INTERVAL = 50
        new_failures = set()
        CONSEC_FAIL_LIMIT = 10
        date_fail_counts = {}
        repos = archive_manager.repos if archive_manager.repos else [repo_id]
        repo_queues = {r: asyncio.Queue() for r in repos}
        upload_done = asyncio.Event()
        progress_lock = asyncio.Lock()
        repo_rr_idx = 0

        async def _upload_worker(worker_repo_id, q):
            """Drain one repo's queue into that repo's upload batch until done."""
            pending_temp_files = []
            while True:
                try:
                    item = q.get_nowait()
                except asyncio.QueueEmpty:
                    if upload_done.is_set():
                        break
                    try:
                        item = await asyncio.wait_for(q.get(), timeout=1.0)
                    except asyncio.TimeoutError:
                        if upload_done.is_set():
                            break
                        continue
                if item is None:
                    break
                task_item = item['task']
                added_ok = archive_manager.add_to_repo_batch(
                    worker_repo_id, item['temp_ts'], task_item['year'], task_item['month'],
                    task_item['day'], task_item['ch_name_clean'], task_item['display_name'], item['meta'])
                if added_ok:
                    pending_temp_files.append(item['temp_ts'])
                else:
                    # This repo refused the item: hand it to another available
                    # repo's queue, else fall back to a direct single upload.
                    reput_ok = False
                    try:
                        for _alt in archive_manager.get_available_repos():
                            if _alt == worker_repo_id:
                                continue
                            if _alt not in repo_queues:
                                continue
                            try:
                                await repo_queues[_alt].put(item)
                                reput_ok = True
                                break
                            except Exception:
                                continue
                    except Exception:
                        pass
                    if not reput_ok:
                        def _single_upload():
                            try:
                                return archive_manager.upload_video(
                                    item['temp_ts'], task_item['year'], task_item['month'],
                                    task_item['day'], task_item['ch_name_clean'],
                                    task_item['display_name'], item['meta'], False)
                            except Exception:
                                return False

                        ok_single = await loop.run_in_executor(None, _single_upload)
                        async with progress_lock:
                            if ok_single:
                                archive_manager.progress["success"] += 1
                            else:
                                archive_manager.progress["fail"] += 1
                        try:
                            if os.path.exists(item['temp_ts']):
                                os.remove(item['temp_ts'])
                        except Exception:
                            pass
                if archive_manager.repo_batch_ready(worker_repo_id):
                    batch_n = archive_manager.repo_batch_size(worker_repo_id)
                    ok = await loop.run_in_executor(None, archive_manager.flush_repo_batch, worker_repo_id)
                    for tf in pending_temp_files:
                        try:
                            if os.path.exists(tf):
                                os.remove(tf)
                        except Exception:
                            pass
                    pending_temp_files.clear()
                    async with progress_lock:
                        archive_manager.progress["success"] += ok
                        archive_manager.progress["fail"] += max(0, batch_n - ok)
                    if archive_manager.progress["success"] % INDEX_SAVE_INTERVAL == 0:
                        await loop.run_in_executor(None, archive_manager.flush_index)
                q.task_done()
            # Flush whatever is left in this repo's batch once the queue is drained.
            remaining = archive_manager.repo_batch_size(worker_repo_id)
            if remaining > 0:
                ok = await loop.run_in_executor(None, archive_manager.flush_repo_batch, worker_repo_id)
                async with progress_lock:
                    archive_manager.progress["success"] += ok
                    archive_manager.progress["fail"] += max(0, remaining - ok)
            for tf in pending_temp_files:
                try:
                    if os.path.exists(tf):
                        os.remove(tf)
                except Exception:
                    pass

        upload_workers = [asyncio.create_task(_upload_worker(r, repo_queues[r])) for r in repos]
        dl_semaphore = asyncio.Semaphore(CONCURRENCY)
        completed_count = 0
        completed_lock = asyncio.Lock()

        async def _download_one(idx, task):
            """Download one task and queue it for upload (``idx`` is unused)."""
            nonlocal completed_count, repo_rr_idx
            if archive_manager.progress["status"] == "stopping":
                return
            d_str = task['date_str']
            # NOTE(review): this check runs before the semaphore is acquired,
            # i.e. when the batch's coroutines are first scheduled — confirm
            # it can actually observe failures from the same batch.
            if date_fail_counts.get(d_str, 0) >= CONSEC_FAIL_LIMIT:
                async with progress_lock:
                    archive_manager.progress["skip"] += 1
                async with completed_lock:
                    completed_count += 1
                    archive_manager.progress["current"] = completed_count
                return
            async with dl_semaphore:
                # Hold here while the job is paused.
                while archive_manager.progress["status"] in ("pausing", "paused"):
                    archive_manager.progress["status"] = "paused"
                    await asyncio.sleep(1)
                if archive_manager.progress["status"] == "stopping":
                    return
                async with progress_lock:
                    archive_manager.progress["downloading"] = archive_manager.progress.get("downloading", 0) + 1
                    d_label = f"{task['date_str'][:4]}/{task['date_str'][4:6]}/{task['date_str'][6:8]}"
                    archive_manager.progress["current_task"] = f"[{d_label}] {task['prog_name']}"
                    archive_manager.progress["current_file"] = task['display_name']
                    archive_manager.progress["current_ch"] = task['ch_name']
                try:
                    temp_ts = os.path.join(tempfile.gettempdir(), task['md5'] + ".ts")
                    try:
                        success_dl = await internal_download_m3u8(task['m3u8'], temp_ts, archive_manager.progress)
                    except Exception:
                        success_dl = False
                    async with completed_lock:
                        completed_count += 1
                        archive_manager.progress["current"] = completed_count
                        if completed_count % 25 == 0:
                            try:
                                archive_manager.log_task_event("heartbeat", f"completed={completed_count}/{archive_manager.progress.get('total',0)}")
                            except Exception:
                                pass
                    if success_dl and os.path.exists(temp_ts):
                        meta = _build_meta(
                            task['date_str'], task['time_str'], task['ch_id'], task['ch_name'],
                            task['ch_name_clean'], task['prog_name'], task['display_name'],
                            task['year'], task['month'], task['day'], task['program'])
                        existing_paths.add(task['video_path'])
                        existing_display_names.add(task['display_name'])
                        available_repos = archive_manager.get_available_repos()
                        if not available_repos:
                            # No repo has room: force-refresh every repo once, then re-check.
                            def _refresh_all_repos():
                                for _r in archive_manager.repos:
                                    try:
                                        archive_manager._force_refresh(_r)
                                    except Exception:
                                        pass

                            try:
                                await loop.run_in_executor(None, _refresh_all_repos)
                            except Exception:
                                pass
                            available_repos = archive_manager.get_available_repos()
                            if not available_repos:
                                async with progress_lock:
                                    archive_manager.progress["fail"] += 1
                                    archive_manager.progress["current_task"] = f"所有 dataset 暂时无空位, 跳过单项继续: {task['display_name']}"
                                try:
                                    if os.path.exists(temp_ts):
                                        os.remove(temp_ts)
                                except Exception:
                                    pass
                                return
                        # Round-robin across the repos that currently accept uploads.
                        target_repo = available_repos[repo_rr_idx % len(available_repos)]
                        repo_rr_idx += 1
                        await repo_queues[target_repo].put({'temp_ts': temp_ts, 'task': task, 'meta': meta})
                        date_fail_counts[d_str] = 0
                    else:
                        async with progress_lock:
                            if task['date_str'] <= cutoff_date:
                                new_failures.add(task['video_path'])
                                archive_manager.progress["skip"] += 1
                            else:
                                archive_manager.progress["fail"] += 1
                        date_fail_counts[d_str] = date_fail_counts.get(d_str, 0) + 1
                        try:
                            if os.path.exists(temp_ts):
                                os.remove(temp_ts)
                        except Exception:
                            pass
                finally:
                    # Always undo the "downloading" increment, including on the
                    # early return above and on unexpected errors.
                    async with progress_lock:
                        archive_manager.progress["downloading"] = max(0, archive_manager.progress.get("downloading", 1) - 1)

        BATCH_SIZE = 50
        total_batches = (total_tasks + BATCH_SIZE - 1) // BATCH_SIZE
        for batch_start in range(0, total_tasks, BATCH_SIZE):
            if archive_manager.progress["status"] == "stopping":
                break
            batch = tasks_to_run[batch_start:batch_start + BATCH_SIZE]
            batch_no = batch_start // BATCH_SIZE + 1
            try:
                if batch_no > 1:
                    # Refresh the token and rebuild this batch's URLs with it.
                    archive_manager.progress["current_task"] = f"批次 {batch_no}/{total_batches}: 刷新 auth token..."
                    try:
                        auth = await get_auth(force=True)
                        for t in batch:
                            new_url = _make_m3u8_url(t['program'], auth)
                            if new_url:
                                t['m3u8'] = new_url
                    except Exception:
                        pass
                    for _i in range(2):
                        if archive_manager.progress["status"] == "stopping":
                            break
                        await asyncio.sleep(1)
                if archive_manager.progress["status"] == "stopping":
                    break
                archive_manager.progress["current_task"] = f"批次 {batch_no}/{total_batches}: 下载 {len(batch)} 项"

                date_fail_counts.clear()
                batch_dl_tasks = [_download_one(batch_start + i, t) for i, t in enumerate(batch)]
                await asyncio.gather(*batch_dl_tasks)

                try:
                    archive_manager.log_task_event("batch_done", f"batch={batch_no}/{total_batches}")
                except Exception:
                    pass
            except Exception as _be:
                # A failing batch is logged and skipped; later batches still run.
                traceback.print_exc()
                archive_manager.progress["current_task"] = f"批次 {batch_no} 异常已隔离, 继续下一批: {str(_be)[:80]}"
                continue
        upload_done.set()
        await asyncio.gather(*upload_workers)

        await loop.run_in_executor(None, archive_manager.flush_index)
        try:
            archive_manager.log_task_event("main_loop_done", f"batches={total_batches}")
        except Exception:
            pass

        # Retry rounds: anything still missing from the index is retried.
        retry_round = 0
        retry_total_count = 0
        no_progress_rounds = 0
        prev_pending_count = -1
        while True:
            if archive_manager.progress["status"] == "stopping":
                break

            while archive_manager.progress["status"] in ("pausing", "paused"):
                archive_manager.progress["status"] = "paused"
                archive_manager.progress["current_task"] = "重试间隔已暂停, 等待恢复..."
                await asyncio.sleep(1)
            if archive_manager.progress["status"] == "stopping":
                break
            pending_retry = [t for t in tasks_to_run if not archive_manager.db_exists(t['video_path'])]
            now_ts_r = datetime.now(jst).timestamp()
            cutoff_ts_r = now_ts_r - 28 * 86400
            pending_retry = [t for t in pending_retry if _task_ts(t) >= cutoff_ts_r]
            if not pending_retry:
                break

            # Count rounds in which the pending list did not shrink; this
            # drives the longer backoff below.
            if prev_pending_count >= 0 and len(pending_retry) >= prev_pending_count:
                no_progress_rounds += 1
            else:
                no_progress_rounds = 0
            prev_pending_count = len(pending_retry)
            retry_round += 1
            archive_manager.progress["current_task"] = f"第 {retry_round} 轮重试: 刷新 auth token..."
            try:
                auth = await get_auth(force=True)
                rebuilt = 0
                for t in pending_retry:
                    new_url = _make_m3u8_url(t['program'], auth)
                    if new_url:
                        t['m3u8'] = new_url
                        rebuilt += 1
                archive_manager.progress["current_task"] = f"第 {retry_round} 轮: 已刷新 {rebuilt} 个 URL"
            except Exception as _e:
                archive_manager.progress["current_task"] = f"第 {retry_round} 轮: auth 刷新失败 {_e}"
            pending_retry_with_url = [t for t in pending_retry if t.get('m3u8')]
            if not pending_retry_with_url:
                archive_manager.progress["current_task"] = f"第 {retry_round} 轮: m3u8 全部重建失败, 60s 后再来一轮 ({len(pending_retry)} 项)"
                for _s in range(60):
                    if archive_manager.progress["status"] == "stopping":
                        break
                    await asyncio.sleep(1)
                continue
            pending_retry = pending_retry_with_url

            if no_progress_rounds >= 3:
                backoff_sec = min(120 + 60 * (no_progress_rounds - 2), 600)
            else:
                backoff_sec = min(30 * retry_round, 120)
            for _i in range(backoff_sec):
                if archive_manager.progress["status"] == "stopping":
                    break

                if archive_manager.progress["status"] in ("pausing", "paused"):
                    await asyncio.sleep(1)
                    continue
                archive_manager.progress["current_task"] = f"第 {retry_round} 轮: 等待 {backoff_sec - _i}s 后重试 ({len(pending_retry)} 项)"
                await asyncio.sleep(1)
            if archive_manager.progress["status"] == "stopping":
                break
            n_retry = len(pending_retry)
            retry_total_count += n_retry
            # Take the items being retried back out of the skip/fail counters
            # and add them to the total, since they will be counted again.
            n_retry_skip = sum(1 for t in pending_retry if t['video_path'] in new_failures)
            n_retry_fail = n_retry - n_retry_skip
            archive_manager.progress["skip"] = max(0, archive_manager.progress.get("skip", 0) - n_retry_skip)
            archive_manager.progress["fail"] = max(0, archive_manager.progress["fail"] - n_retry_fail)
            archive_manager.progress["total"] += n_retry
            new_failures -= {t['video_path'] for t in pending_retry}
            archive_manager.progress["current_task"] = f"开始重试 {n_retry} 项 (第 {retry_round} 轮)"

            try:
                # Fresh queues, event and workers for this round; the nested
                # helpers read these names from the enclosing scope.
                upload_done = asyncio.Event()
                repo_queues = {r: asyncio.Queue() for r in repos}
                upload_workers = [asyncio.create_task(_upload_worker(r, repo_queues[r])) for r in repos]
                date_fail_counts.clear()
                completed_count = 0
                archive_manager.progress["current"] = 0
                retry_dl_tasks = [_download_one(0, t) for t in pending_retry]
                await asyncio.gather(*retry_dl_tasks)
                upload_done.set()
                await asyncio.gather(*upload_workers)

                await loop.run_in_executor(None, archive_manager.flush_index)
                try:
                    archive_manager.log_task_event("retry_round_done", f"round={retry_round}, n_retry={n_retry}")
                except Exception:
                    pass
            except Exception as _re:
                traceback.print_exc()
                archive_manager.progress["current_task"] = f"重试第 {retry_round} 轮异常已隔离, 继续下一轮: {str(_re)[:80]}"
                try:
                    upload_done.set()
                except Exception:
                    pass
                continue

        if new_failures or retry_total_count > 0:
            merged = failed_set | new_failures
            # Paths that made it into the index are no longer failures.
            succeeded_now = {t['video_path'] for t in tasks_to_run if archive_manager.db_exists(t['video_path'])}
            merged -= succeeded_now
            for r in repos:
                await loop.run_in_executor(None, archive_manager.save_failed_set, r, merged)

        s = archive_manager.progress
        was_stopped = (s["status"] == "stopping")
        status_word = "已停止" if was_stopped else "完成"
        retry_hint = f" [重试 {retry_total_count}]" if retry_total_count > 0 else ""
        summary = f"{status_word}: 成功 {s['success']}, 失败 {s['fail']}, 过期跳过 {s.get('skip',0)}, 共 {s['total']}{retry_hint}"
        archive_manager.progress["current_task"] = summary
        archive_manager.progress["current_file"] = ""
        archive_manager.progress["ts_done"] = 0
        archive_manager.progress["ts_total"] = 0
        archive_manager.progress["downloading"] = 0
        try:
            archive_manager.log_task_event("end", summary)
        except Exception:
            pass

        await loop.run_in_executor(None, archive_manager.flush_index)
        archive_manager.refresh_archived_cache()
    except Exception as e:
        archive_manager.progress["current_task"] = f"任务崩溃: {str(e)}"
        traceback.print_exc()
        try:
            archive_manager.log_task_event("crash", str(e)[:300])
        except Exception:
            pass
    finally:
        archive_manager.progress.update({"status": "idle"})
|
|
|
@app.get("/api/archive/url")
async def get_archive_url(path: str):
    """Resolve an archived file to its final (post-redirect) download URL.

    Looks the file up by ``path`` (404 when unknown), issues a HEAD request
    that follows redirects, and returns the resolved URL, the reported
    ``content-length`` (empty string when absent) and the stored display
    name. If the HEAD request fails, the unresolved raw URL is returned.
    """
    record = await archive_manager.get_by_path(path)
    if not record:
        raise HTTPException(status_code=404, detail="文件未找到")
    raw_url = archive_manager.get_hf_raw_url(record['repo'], record['path'])

    def _probe():
        # Blocking HTTP call; executed off the event loop below.
        try:
            head_resp = requests.head(raw_url, allow_redirects=True, timeout=15, headers={})
            return head_resp.url, head_resp.headers.get("content-length", "")
        except Exception:
            return raw_url, ""

    resolved_url, size = await asyncio.get_event_loop().run_in_executor(None, _probe)
    payload = {"url": resolved_url, "content_length": size}
    payload["display_name"] = record.get('display_name', '')
    return payload
|
@app.get("/api/archive/play")
async def play_archive(path: str, request: Request):
    """Stream an archived video to the client, relaying ``Range`` requests.

    The file is looked up by ``path`` (404 when unknown) and fetched from its
    raw URL with redirects followed. The upstream status code and a fixed set
    of headers are passed through. Connection-level failures yield a 504 JSON
    body; any other request error yields a 502.
    """
    record = await archive_manager.get_by_path(path)
    if not record:
        raise HTTPException(status_code=404, detail="文件未找到")
    source_url = archive_manager.get_hf_raw_url(record['repo'], record['path'])

    # Forward the client's byte range, if any, so seeking works.
    range_value = request.headers.get("Range")
    outbound = {"Range": range_value} if range_value else {}

    client = httpx.AsyncClient(timeout=httpx.Timeout(60.0, connect=10.0), follow_redirects=True)
    try:
        upstream = await client.send(client.build_request("GET", source_url, headers=outbound), stream=True)
    except (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.ConnectError, httpx.RemoteProtocolError) as e:
        await client.aclose()
        return JSONResponse(content={"success": False, "error": f"上游连接失败: {type(e).__name__}"}, status_code=504)
    except Exception as e:
        await client.aclose()
        return JSONResponse(content={"success": False, "error": f"上游请求异常: {str(e)[:120]}"}, status_code=502)

    passthrough = ("Content-Type", "Content-Range", "Accept-Ranges", "ETag", "Last-Modified")
    resp_h = {name: upstream.headers[name] for name in passthrough if name in upstream.headers}
    # Content-Length is only forwarded when upstream sent the body unencoded.
    encoding = upstream.headers.get("Content-Encoding", "").lower()
    if encoding in ("", "identity") and "Content-Length" in upstream.headers:
        resp_h["Content-Length"] = upstream.headers["Content-Length"]
    resp_h["Accept-Ranges"] = "bytes"
    resp_h["Content-Encoding"] = "identity"

    async def _relay():
        try:
            async for piece in upstream.aiter_bytes(1024 * 1024):
                if piece:
                    yield piece
        except Exception:
            # A broken upstream stream simply ends the response body.
            return
        finally:
            # Close both handles no matter what each close raises.
            try:
                await upstream.aclose()
            except BaseException:
                pass
            try:
                await client.aclose()
            except BaseException:
                pass

    return StreamingResponse(_relay(), status_code=upstream.status_code, headers=resp_h, media_type="video/MP2T")
|
@app.get("/api/archive/download")
async def download_archive(path: str, name: Optional[str] = None):
    """Stream an archived file to the client as an attachment download.

    Args:
        path: Archive path used to look the file up.
        name: Optional download filename; falls back to the archive's
            ``display_name`` and then to the basename of ``path``. ``.ts`` is
            appended when missing.

    Returns:
        A ``StreamingResponse`` carrying a ``Content-Disposition`` header with
        an ASCII fallback name and an RFC 5987 ``filename*`` parameter; a JSON
        504 when the upstream connection times out or fails at the
        connection/protocol level; a JSON 502 for any other error while
        opening the upstream request.

    Raises:
        HTTPException: 404 when no archive matches ``path``.
    """
    item = await archive_manager.get_by_path(path)
    if not item:
        raise HTTPException(status_code=404, detail="文件未找到")
    hf_url = archive_manager.get_hf_raw_url(item['repo'], item['path'])
    filename = name or item.get('display_name') or os.path.basename(path)
    if not filename.endswith('.ts'):
        filename += '.ts'

    # ASCII fallback for the quoted-string form. Besides non-printable and
    # non-ASCII characters, double quotes and backslashes are replaced as
    # well: left in place they would terminate or escape the quoted-string
    # and corrupt the header (``name`` is caller-supplied).
    ascii_filename = re.sub(r'[^\x20-\x7e]|["\\]', '_', filename).strip() or 'archive.ts'
    encoded_name = quote(filename, safe='')

    # The client is closed explicitly: on failure right here, otherwise by the
    # streaming generator once the body has been relayed.
    client = httpx.AsyncClient(timeout=httpx.Timeout(300.0, connect=10.0), follow_redirects=True)
    try:
        upstream = await client.send(client.build_request("GET", hf_url), stream=True)
    except (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.ConnectError, httpx.RemoteProtocolError) as e:
        await client.aclose()
        return JSONResponse(content={"success": False, "error": f"上游连接失败: {type(e).__name__}"}, status_code=504)
    except Exception as e:
        await client.aclose()
        return JSONResponse(content={"success": False, "error": f"上游请求异常: {str(e)[:120]}"}, status_code=502)

    resp_h = {}
    upstream_ce = upstream.headers.get("Content-Encoding", "").lower()
    # Only forward Content-Length when the upstream body is not
    # content-encoded; otherwise the length describes the encoded payload.
    if (not upstream_ce or upstream_ce == "identity") and "Content-Length" in upstream.headers:
        resp_h["Content-Length"] = upstream.headers["Content-Length"]
    if "ETag" in upstream.headers:
        resp_h["ETag"] = upstream.headers["ETag"]
    resp_h["Content-Disposition"] = f"attachment; filename=\"{ascii_filename}\"; filename*=UTF-8''{encoded_name}"
    resp_h["Accept-Ranges"] = "bytes"
    # NOTE(review): presumably set so the GZip middleware leaves the stream
    # untouched — confirm.
    resp_h["Content-Encoding"] = "identity"

    async def _it():
        try:
            async for chunk in upstream.aiter_bytes(1024 * 1024):
                if chunk:
                    yield chunk
        except Exception:
            # Upstream failure mid-stream: end the body quietly.
            return
        finally:
            # Close both resources no matter what; a failure (or cancellation)
            # while closing one must not skip the other.
            try:
                await upstream.aclose()
            except BaseException:
                pass
            try:
                await client.aclose()
            except BaseException:
                pass

    return StreamingResponse(_it(), status_code=upstream.status_code, headers=resp_h, media_type="application/octet-stream")
|
@app.get("/api/admin/archive/list")
async def list_archives():
    """Return everything ``archive_manager.list_all_archives()`` reports."""
    files = await archive_manager.list_all_archives()
    return {"success": True, "files": files}
|
@app.get("/api/admin/archive/stats")
async def archive_stats():
    """Lightweight archive statistics.

    Combines SQLite aggregates with the real size of each HF dataset repo
    (both served from the index/cache, without scanning HF file listings).
    The work runs in a worker thread.

    Returns:
        ``{"success": True, "stats": {...}}`` where ``stats`` holds the total
        count, total sizes (from the DB and from the repos), per-repo sizes,
        the latest date, and per-date / per-channel breakdowns.
    """

    def _collect():
        # All SQL runs while holding the archive manager's DB lock.
        with archive_manager._db_lock:
            db = archive_manager._db
            total_count, total_size_db = db.execute(
                "SELECT COUNT(*), COALESCE(SUM(file_size),0) FROM archives").fetchone()
            date_rows = db.execute(
                "SELECT date, COUNT(*), COALESCE(SUM(file_size),0) FROM archives WHERE date != '' GROUP BY date ORDER BY date DESC LIMIT 30").fetchall()
            by_date = [{"date": d, "count": c, "size": int(sz or 0)} for d, c, sz in date_rows]
            channel_rows = db.execute(
                "SELECT channel, COUNT(*), COALESCE(SUM(file_size),0) FROM archives WHERE channel != '' GROUP BY channel ORDER BY 2 DESC LIMIT 50").fetchall()
            by_channel = [{"channel": ch, "count": c, "size": int(sz or 0)} for ch, c, sz in channel_rows]
            latest_date = db.execute("SELECT MAX(date) FROM archives WHERE date != ''").fetchone()[0] or ""

        # NOTE(review): repo sizes are looked up after the DB lock is
        # released — nesting reconstructed from a de-indented source, confirm.
        repo_names = list(archive_manager.repos)

        def _size_of(repo):
            # A repo whose size cannot be determined counts as 0.
            try:
                return repo, int(archive_manager._ensure_size(repo) or 0)
            except Exception:
                return repo, 0

        repos = []
        if repo_names:
            with ThreadPoolExecutor(max_workers=min(8, len(repo_names))) as pool:
                repos = [{"repo": repo, "size": size} for repo, size in pool.map(_size_of, repo_names)]
        total_size_repos = sum(entry["size"] for entry in repos)
        db_size = int(total_size_db or 0)

        return {
            "total_count": int(total_count or 0),
            # Prefer the repo-reported size; fall back to the DB sum.
            "total_size": total_size_repos if total_size_repos > 0 else db_size,
            "total_size_db": db_size,
            "total_size_repos": total_size_repos,
            "repos": repos,
            "latest_date": latest_date,
            "by_date": by_date,
            "by_channel": by_channel,
        }

    stats = await asyncio.get_event_loop().run_in_executor(None, _collect)
    return {"success": True, "stats": stats}
|
|
|
@app.get("/api/admin/archive/dates")
async def list_archive_dates():
    """List selectable archive dates, sorted ascending.

    The result is the union of the dates reported by
    ``archive_manager.list_dates()`` and the most recent 28 calendar days
    (today included) in JST, formatted as ``YYYYMMDD``.
    """
    known = set(archive_manager.list_dates())
    today_jst = datetime.now(timezone(timedelta(hours=9)))
    recent = {(today_jst - timedelta(days=back)).strftime("%Y%m%d") for back in range(28)}
    return {"success": True, "dates": sorted(known | recent)}
|
@app.post("/api/admin/archive/rebuild_index")
async def rebuild_archive_index():
    """Rebuild the SQLite archive index from the metadata of every dataset repo.

    All repos are scanned in parallel worker threads. When at least one row
    was found, the ``archives`` table is emptied and refilled and the database
    is uploaded. Progress text is published through
    ``archive_manager.progress["current_task"]``.

    A repo whose scan raises contributes no rows, so its entries are absent
    from the rebuilt index. Such repos are no longer swallowed silently: the
    traceback is printed and the repo id is reported in ``failed_repos`` so
    the caller can re-run the rebuild.

    Returns:
        ``{"success": True, "count": <rows written>, "failed_repos": [...]}``,
        or ``{"success": False, "error": ...}`` when the archive manager has
        no API client or no repos.
    """
    if not archive_manager.api or not archive_manager.repos:
        return {"success": False, "error": "archive_manager not configured"}
    loop = asyncio.get_event_loop()
    repos = list(archive_manager.repos)
    archive_manager.progress["current_task"] = f"重建中: 并行扫描 {len(repos)} 个 dataset..."
    repo_done = {"n": 0}
    failed_repos = []
    # Guards repo_done / failed_repos, which the scanner threads update.
    repo_done_lock = threading.Lock()

    def _scan_repo(repo_id):
        """Return the index rows of one repo; a failing scan yields no rows."""
        scan_failed = False
        try:
            idx = archive_manager._build_index_from_meta(repo_id)
        except Exception:
            traceback.print_exc()
            scan_failed = True
            idx = {}
        rs = [
            (
                info.get("repo", repo_id),
                vp,
                info.get("display_name") or os.path.basename(vp),
                int(info.get("file_size") or 0),
            )
            for vp, info in idx.items()
        ]
        with repo_done_lock:
            repo_done["n"] += 1
            if scan_failed:
                failed_repos.append(repo_id)
            archive_manager.progress["current_task"] = f"重建中: {repo_done['n']}/{len(repos)} 仓 完成, 累计 {len(rs)} 条 (本仓) → {repo_id}"
        return rs

    def _rebuild_parallel():
        rows = []
        with ThreadPoolExecutor(max_workers=max(1, min(len(repos), 8))) as ex:
            for rs in ex.map(_scan_repo, repos):
                rows.extend(rs)
        # Appended to the final progress text only when some scan failed.
        failure_note = f", {len(failed_repos)} 个仓扫描失败" if failed_repos else ""
        if not rows:
            # Nothing found: leave the existing table untouched.
            archive_manager.progress["current_task"] = "重建完成: 0 条" + failure_note
            return 0
        archive_manager.progress["current_task"] = f"重建中: 写入 SQLite ({len(rows)} 条)..."
        # NOTE(review): the insert is kept under the same lock as the DELETE;
        # this assumes _db_lock is re-entrant or that _db_insert_many does not
        # acquire it itself — nesting was reconstructed, confirm.
        with archive_manager._db_lock:
            archive_manager._db.execute("DELETE FROM archives")
            archive_manager._db_insert_many(rows)
        archive_manager.progress["current_task"] = "重建中: 上传 archive_index.db..."
        archive_manager.upload_db_to_hf()
        archive_manager.progress["current_task"] = f"重建完成: {len(rows)} 条" + failure_note
        return len(rows)

    count = await loop.run_in_executor(None, _rebuild_parallel)
    return {"success": True, "count": count, "failed_repos": failed_repos}
|
@app.post("/api/admin/archive/cleanup_db")
async def cleanup_db_history():
    """Squash the history of the index repo's main branch.

    Frees the old LFS blobs accumulated by repeated uploads of
    archive_index.db. Call it manually when the 100GB index repo is nearly
    full to reclaim space immediately; flush_index at the end of a task also
    decides automatically whether to trigger it.
    """
    squashed = await asyncio.get_event_loop().run_in_executor(None, archive_manager.squash_db_history)
    return {"success": squashed}
|
@app.get("/api/admin/epg/sync_now")
async def epg_sync_now():
    """Fetch the EPG with ``force=True``, regroup it by JST date and upload it.

    Each program is assigned to the JST calendar date of the timestamp parsed
    from its ``time`` field; programs without a parsable time are skipped.
    One EPG JSON per date is uploaded through
    ``archive_manager.upload_epg_json`` in a worker thread.

    Returns:
        On fetch failure ``{"success": False, "error", "traceback"}``.
        Otherwise a summary with channel/program counts, the dates found, the
        per-date upload result (``"ok"``, ``"failed"`` or
        ``"exception: ..."``) and a sample taken from the first channel.
    """
    try:
        auth = await get_auth()
        channels = await get_channels(auth)
        raw = await get_all_epg(auth, channels, date=None, force=True)
    except Exception as e:
        return {"success": False, "error": str(e), "traceback": traceback.format_exc()}

    jst_tz = timezone(timedelta(hours=9))
    epg_by_date = {}
    total_programs = 0
    empty_channels = []
    if isinstance(raw, dict):
        for ch_id, ch_epg in raw.items():
            programs = ch_epg.get('data', []) if isinstance(ch_epg, dict) else []
            if not programs:
                empty_channels.append(ch_id)
                continue
            for p in programs:
                ts = _parse_epg_time(p.get('time'))
                if ts is None:
                    continue
                d_str = datetime.fromtimestamp(ts, tz=jst_tz).strftime('%Y%m%d')
                day_channels = epg_by_date.setdefault(d_str, {})
                day_channels.setdefault(ch_id, {'data': []})['data'].append(p)
                total_programs += 1

    upload_results = {}
    loop = asyncio.get_event_loop()
    for d_str, d_epg in epg_by_date.items():
        try:
            ok = await loop.run_in_executor(None, archive_manager.upload_epg_json, d_str, d_epg)
            upload_results[d_str] = "ok" if ok else "failed"
        except Exception as e:
            upload_results[d_str] = f"exception: {e}"

    # Sample of the first channel. Guarded like the loop above: a non-dict
    # payload or ``data: None`` yields an empty sample instead of raising.
    sample_channel = None
    if isinstance(raw, dict) and raw:
        first_id, first_epg = next(iter(raw.items()))
        first_programs = (first_epg.get('data') if isinstance(first_epg, dict) else None) or []
        sample_channel = {
            "ch_id": first_id,
            "program_count": len(first_programs),
            "first_program": first_programs[0] if first_programs else None,
        }

    return {
        "success": True,
        "channels_total": len(raw) if isinstance(raw, dict) else 0,
        "channels_empty": len(empty_channels),
        "programs_total": total_programs,
        "dates_found": list(epg_by_date.keys()),
        "upload_results": upload_results,
        "sample_channel": sample_channel,
    }
|
@app.delete("/api/admin/archive/delete")
async def delete_archive(path: str):
    """Delete one archive, identified by ``path``.

    The index entry is looked up first; the deletion itself runs in a worker
    thread. Returns ``{"success": False, "error": "Not found"}`` when the
    path is not in the index, otherwise the deletion result.
    """
    entry = archive_manager.db_get(path)
    if not entry:
        return {"success": False, "error": "Not found"}
    deleted = await asyncio.get_event_loop().run_in_executor(
        None, archive_manager.delete_archive, entry['path'], entry['repo'])
    return {"success": deleted}
|
@app.get("/api/admin/archive/diagnostic")
async def archive_diagnostic():
    """Trace task events across container restarts.

    Call this when the dashboard shows "idle, waiting for a task to start":
    it lets you read, newest first, the timeline of start / batch_done /
    retry_round_done / end / crash events, and so tell apart "never ran",
    "ran to completion" and "swallowed by a container restart midway".
    """
    recent_logs = archive_manager.get_recent_task_logs(100)
    return {"success": True, "logs": recent_logs}
|
|
|
# Strong references to fire-and-forget archive tasks. The event loop only
# keeps weak references to tasks, so a task nobody references may be garbage
# collected before it finishes.
_manual_archive_tasks = set()


@app.post("/api/admin/archive/start")
async def start_manual_archive():
    """Start the archive task in the background.

    The request is rejected while a task is active in any state (running,
    pausing, paused or stopping), so a second task can never run next to a
    paused or stopping one.

    Returns:
        ``{"success": True}`` once the task is scheduled, otherwise
        ``{"success": False, "error": "Already running"}``.
    """
    if archive_manager.progress["status"] in ("running", "pausing", "paused", "stopping"):
        return {"success": False, "error": "Already running"}
    task = asyncio.create_task(run_archive_task())
    # Hold the task until it completes, then drop the reference.
    _manual_archive_tasks.add(task)
    task.add_done_callback(_manual_archive_tasks.discard)
    return {"success": True}
|
@app.post("/api/admin/archive/pause")
async def pause_archive():
    """Request a pause: flips the status from "running" to "pausing"."""
    progress = archive_manager.progress
    if progress["status"] != "running":
        return {"success": False, "error": "Not running"}
    progress["status"] = "pausing"
    return {"success": True}
|
@app.post("/api/admin/archive/resume")
async def resume_archive():
    """Resume a paused task: flips the status from "paused" to "running"."""
    progress = archive_manager.progress
    if progress["status"] != "paused":
        return {"success": False, "error": "Not paused"}
    progress["status"] = "running"
    return {"success": True}
|
@app.post("/api/admin/archive/stop")
async def stop_archive():
    """Request a stop: a running, pausing or paused task becomes "stopping"."""
    progress = archive_manager.progress
    if progress["status"] not in ("running", "pausing", "paused"):
        return {"success": False, "error": "Not running"}
    progress["status"] = "stopping"
    return {"success": True}
|
# NOTE(review): not referenced anywhere in the visible part of this file —
# presumably a leftover flag; confirm before removing.
_archive_triggered = False


@app.get("/api/admin/archive/progress")
async def get_archive_progress():
    """Return the archive manager's current progress dict."""
    current = archive_manager.progress
    return {"success": True, "progress": current}
|
# Keeps the auto-resume task referenced until it completes; the event loop
# only holds weak references to tasks.
_startup_background_tasks = set()


@app.on_event("startup")
async def startup_event():
    """Application startup hook.

    * Schedules ``auto_archive_job`` (cron, hour 0, Asia/Tokyo) and
      ``self_ping_job`` (every 20 minutes) and starts the scheduler.
    * Logs SIGTERM/SIGINT as task events, then hands the signal on to the
      handler that was installed before, so shutdown still works.
    * After a 30 second delay, re-runs the archive task when the archive
      manager reports an unfinished one.
    * Removes ``/tmp/*.ts`` and ``/tmp/*.segs`` leftovers in a daemon thread.
    """
    scheduler.add_job(auto_archive_job, 'cron', hour=0, timezone='Asia/Tokyo')
    scheduler.add_job(self_ping_job, 'interval', minutes=20)
    scheduler.start()

    import signal as _sig

    def _log_then_chain(signum):
        """Install a handler for ``signum`` that logs and then chains."""
        previous = _sig.getsignal(signum)

        def _on_term(received, frame):
            try:
                archive_manager.log_task_event("signal_received", f"signum={received}")
            except Exception:
                pass
            # Replacing the handler must not swallow the signal: forward it to
            # whatever was installed before (e.g. the server's shutdown
            # handler), or fall back to the default action.
            if callable(previous):
                previous(received, frame)
            elif previous == _sig.SIG_DFL:
                _sig.signal(received, _sig.SIG_DFL)
                os.kill(os.getpid(), received)

        _sig.signal(signum, _on_term)

    try:
        _log_then_chain(_sig.SIGTERM)
        _log_then_chain(_sig.SIGINT)
    except Exception:
        # Best effort: signal handlers can only be set from the main thread.
        pass

    async def _maybe_auto_resume():
        """Re-run the archive task if a previous run was left unfinished."""
        await asyncio.sleep(30)
        try:
            if archive_manager.progress["status"] == "running":
                return
            unfinished = archive_manager.find_unfinished_task()
            if not unfinished:
                return
            try:
                archive_manager.log_task_event("auto_resume", f"prev_event={unfinished.get('event')} prev_success={unfinished.get('success')}/{unfinished.get('total')}")
            except Exception:
                pass
            await run_archive_task()
        except Exception as _e:
            try:
                archive_manager.log_task_event("auto_resume_failed", str(_e)[:200])
            except Exception:
                pass

    resume_task = asyncio.create_task(_maybe_auto_resume())
    # Hold the task until it completes, then drop the reference.
    _startup_background_tasks.add(resume_task)
    resume_task.add_done_callback(_startup_background_tasks.discard)

    def _cleanup_orphan_tmp():
        """Delete leftover temp files/directories and log how many were handled."""
        import glob
        cleaned = 0
        for pattern in ["/tmp/*.ts", "/tmp/*.segs"]:
            for path in glob.glob(pattern):
                try:
                    if os.path.isfile(path):
                        os.remove(path)
                    elif os.path.isdir(path):
                        import shutil
                        shutil.rmtree(path, ignore_errors=True)
                    cleaned += 1
                except Exception:
                    pass
        if cleaned:
            try:
                archive_manager.log_task_event("startup_cleanup", f"removed={cleaned} orphan tmp files")
            except Exception:
                pass

    try:
        import threading as _t
        _t.Thread(target=_cleanup_orphan_tmp, daemon=True).start()
    except Exception:
        pass
|
@app.on_event("shutdown")
async def shutdown_event():
    """Force a final cache save on shutdown when the cache is disk-backed."""
    if cache.storage_type != 'disk':
        return
    cache._save_to_disk(force=True)
|
async def auto_archive_job():
    """Scheduled entry point for the archive task.

    Like the manual start endpoint and the auto-resume check, the job does
    nothing while a task is already active (running, pausing, paused or
    stopping), so the scheduler cannot start a second run next to it. The
    skip is recorded as a task event.
    """
    status = archive_manager.progress["status"]
    if status in ("running", "pausing", "paused", "stopping"):
        try:
            archive_manager.log_task_event("auto_archive_skipped", f"status={status}")
        except Exception:
            pass
        return
    await run_archive_task()
|
|
|
async def self_ping_job():
    """Self-ping every 20 minutes so the free HF Space is not put to sleep for lack of traffic."""
    ping_url = "http://localhost:7860/ping"
    try:
        async with httpx.AsyncClient(timeout=10.0) as pinger:
            await pinger.get(ping_url)
    except Exception:
        # Best effort: a failed ping is ignored.
        pass
|
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7860 (the
    # same port self_ping_job targets), logging errors only.
    from uvicorn import run as _serve

    _serve(app, host="0.0.0.0", port=7860, log_level="error")