"""Adapters that schedule recordings on TV backends and archive the results.

Each adapter wraps one backend's HTTP API (EPGStation, KonomiTV, Mirakurun,
EDCB) behind the same three coroutines: ``get_epg``, ``schedule_record`` and
``wait_and_download``.  Finished recordings are downloaded into ``REC_DIR``
and then uploaded to a Hugging Face dataset repository.

NOTE(review): every HTTP client here is created with ``verify=False``, i.e.
TLS certificate verification is disabled.  Presumably the backends are
self-hosted with self-signed certificates -- confirm this is intended.
"""

import asyncio
import hashlib
import json
import logging
import os
import time
from datetime import datetime, timezone
from pathlib import Path

import httpx
from fastapi import HTTPException
from huggingface_hub import HfApi

logger = logging.getLogger(__name__)

# Local staging directory for downloaded / recorded files.
REC_DIR = Path("/app/recordings")
REC_DIR.mkdir(parents=True, exist_ok=True)


def md5_name(source_id, title, start_ts):
    """Return a deterministic ``<md5 hex>.ts`` file name for a recording.

    The digest is taken over ``"{source_id}_{title}_{start_ts}"``; it is used
    only as a stable file name, not for any security purpose.
    """
    return hashlib.md5(f"{source_id}_{title}_{start_ts}".encode()).hexdigest() + ".ts"


def to_seconds(ts):
    """Coerce a timestamp to integer seconds.

    Numeric values above ``1e12`` are treated as milliseconds and divided by
    1000; everything else is passed through ``int()`` unchanged.
    """
    return int(ts / 1000) if isinstance(ts, (int, float)) and ts > 1e12 else int(ts)


def to_milliseconds(ts):
    """Coerce a timestamp to integer milliseconds.

    Numeric values below ``1e12`` are treated as seconds and multiplied by
    1000; everything else is passed through ``int()`` unchanged.
    """
    return int(ts * 1000) if isinstance(ts, (int, float)) and ts < 1e12 else int(ts)


def _same_id(left, right):
    """Compare two backend identifiers that may arrive as ``int`` or ``str``.

    ``None`` never matches anything, so a missing id on either side cannot
    accidentally select an unrelated recording.
    """
    if left is None or right is None:
        return False
    return str(left) == str(right)


class BaseAdapter:
    """Common state and helpers shared by all backend adapters."""

    def __init__(self, url, api_key, source_id):
        """Store connection details and pre-build the ``Authorization`` header.

        Args:
            url: Backend base URL; a trailing ``/`` is stripped.
            api_key: Optional token.  Used verbatim when it already contains
                ``"Bearer"``, otherwise prefixed with ``"Bearer "``.
            source_id: Identifier of the source, used in recording file names.
        """
        self.base_url = url.rstrip("/")
        self.api_key = api_key
        self.source_id = source_id
        self.headers = {}
        if api_key:
            if "Bearer" in api_key:
                self.headers["Authorization"] = api_key
            else:
                self.headers["Authorization"] = f"Bearer {api_key}"

    async def get_epg(self, hours=24):
        """Return the programme guide for the next ``hours`` hours."""
        raise NotImplementedError

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a recording reservation and return its id as a string."""
        raise NotImplementedError

    async def wait_and_download(self, record_id, start_ts, end_ts, title,
                                dataset_repo, hf_token, on_progress=None):
        """Wait for a recording to finish, download it and upload it."""
        raise NotImplementedError

    async def _download_file(self, url, save_path, headers=None, on_progress=None):
        """Stream ``url`` into ``save_path`` in 64 KiB chunks.

        Args:
            url: Resource to download.
            save_path: Destination file path (overwritten).
            headers: Request headers; defaults to ``self.headers`` when falsy.
            on_progress: Optional callable receiving the completed fraction
                (0..1).  Only invoked when the response carries a non-zero
                ``content-length``.

        Raises:
            httpx.HTTPStatusError: If the response status is an error.
        """
        async with httpx.AsyncClient(timeout=300.0, verify=False) as client:
            async with client.stream("GET", url, headers=headers or self.headers) as res:
                res.raise_for_status()
                total = int(res.headers.get("content-length", 0))
                downloaded = 0
                with open(save_path, "wb") as f:
                    async for chunk in res.aiter_bytes(65536):
                        f.write(chunk)
                        downloaded += len(chunk)
                        if on_progress and total:
                            on_progress(downloaded / total)

    async def _upload_to_dataset(self, local_path, md5_filename, original_name, repo_id, token):
        """Upload ``local_path`` to a Hugging Face dataset and delete it locally.

        Best-effort: does nothing when ``token`` is empty or the file is
        missing, and logs (rather than raises) any upload failure.  The local
        file is removed only after a successful upload.
        """
        if not token or not os.path.exists(local_path):
            return
        try:
            api = HfApi()
            # upload_file is blocking; run it in a worker thread.
            await asyncio.to_thread(
                api.upload_file,
                path_or_fileobj=local_path,
                path_in_repo=md5_filename,
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Upload: {original_name}",
                token=token,
            )
            os.remove(local_path)
            logger.info(f"Uploaded {original_name} to {repo_id}")
        except Exception as e:
            logger.error(f"Upload failed: {e}")


class EPGStationAdapter(BaseAdapter):
    """Adapter for an EPGStation backend."""

    async def get_epg(self, hours=24):
        """Fetch schedules from ``/api/schedules`` and flatten them.

        Returns:
            A list of programme dicts with keys ``id``, ``title``, ``start``,
            ``end`` (seconds), ``channel_id``, ``channel_name``,
            ``description`` and ``extended``.  An empty list is returned for
            non-200 responses, invalid JSON or a non-list payload.

        Raises:
            HTTPException: 503 on connection failure, 504 on timeout, 500 on
                any other unexpected error.
        """
        now = int(time.time() * 1000)
        end = now + hours * 3600 * 1000
        url = f"{self.base_url}/api/schedules"
        logger.info(f"EPGStation: GET {url} (startAt={now}, endAt={end})")
        try:
            async with httpx.AsyncClient(timeout=60.0, verify=False) as c:
                res = await c.get(
                    url,
                    headers=self.headers,
                    params={
                        "startAt": now,
                        "endAt": end,
                        "isHalfWidth": "true",
                        "GR": "true",
                        "BS": "true",
                        "CS": "true",
                        "SKY": "true",
                    },
                )
            logger.info(f"EPGStation: Response status={res.status_code}")
            if res.status_code != 200:
                logger.error(f"EPGStation API error {res.status_code}: {res.text[:300]}")
                return []
            try:
                schedules = res.json()
            except json.JSONDecodeError:
                logger.error(f"EPGStation: Invalid JSON: {res.text[:200]}")
                return []
            if not isinstance(schedules, list):
                logger.warning(f"EPGStation: Expected list, got {type(schedules)}")
                return []
            result = []
            for sch in schedules:
                # Tolerate an explicit ``"channel": null`` in the payload.
                channel = sch.get("channel") or {}
                cid = str(channel.get("id", ""))
                cname = channel.get("name") or channel.get("displayName") or "Unknown"
                for p in sch.get("programs", []):
                    result.append({
                        "id": str(p.get("id", "")),
                        "title": p.get("name", "Unknown"),
                        "start": to_seconds(p.get("startAt", 0)),
                        "end": to_seconds(p.get("endAt", 0)),
                        "channel_id": cid,
                        "channel_name": cname,
                        "description": p.get("description", ""),
                        "extended": p.get("extended", ""),
                    })
            logger.info(f"EPGStation: Fetched {len(result)} programs")
            return result
        except httpx.ConnectError as e:
            logger.error(f"EPGStation: Connection failed - {str(e)}")
            raise HTTPException(503, f"Cannot connect to EPGStation: {self.base_url}") from e
        except httpx.TimeoutException as e:
            logger.error(f"EPGStation: Request timeout - {str(e)}")
            raise HTTPException(504, "EPGStation request timeout") from e
        except Exception as e:
            logger.error(f"EPGStation: Unexpected error - {type(e).__name__}: {str(e)}", exc_info=True)
            raise HTTPException(500, f"EPG fetch failed: {str(e)}") from e

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a time-specified reservation via ``POST /api/reserves``.

        Returns:
            The ``reserveId`` as a string, or ``""`` when the request is not
            answered with 200/201.
        """
        start_ms = to_milliseconds(start_ts)
        end_ms = to_milliseconds(end_ts)
        payload = {
            "programId": None,
            "timeSpecifiedOption": {
                "name": title,
                "channelId": int(channel_id) if str(channel_id).isdigit() else channel_id,
                "startAt": start_ms,
                "endAt": end_ms,
            },
            "allowEndLack": True,
            "tags": [],
            "saveOption": {"parentDirectoryName": "", "directory": "", "recordedFormat": ""},
            "encodeOption": {"mode1": None, "mode2": None, "mode3": None, "isDeleteOriginalAfterEncode": False},
        }
        async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
            res = await c.post(f"{self.base_url}/api/reserves", json=payload, headers=self.headers)
        # Accept 201 Created as well as 200, matching the KonomiTV adapter.
        if res.status_code in (200, 201):
            return str(res.json().get("reserveId", ""))
        return ""

    async def wait_and_download(self, reserve_id, start_ts, end_ts, title,
                                dataset_repo, hf_token, on_progress=None):
        """Poll ``/api/recorded`` once a minute until the recording shows up.

        Polling stops 300 seconds after ``end_ts``.  The first matching item
        is downloaded into ``REC_DIR`` and uploaded to ``dataset_repo``.

        Returns:
            ``True`` once a matching recording was downloaded, ``False`` if
            none appeared before the deadline.
        """
        wait_until = end_ts + 300
        while time.time() < wait_until:
            await asyncio.sleep(60)
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.get(
                    f"{self.base_url}/api/recorded",
                    headers=self.headers,
                    params={"limit": 100},
                )
            if res.status_code != 200:
                continue
            data = res.json()
            # Accept both a bare list and a ``{"records": [...]}`` wrapper.
            items = data.get("records", []) if isinstance(data, dict) else data
            for item in items:
                if not isinstance(item, dict):
                    continue
                # NOTE(review): assumes the backend flags in-progress
                # recordings with ``isRecording`` -- confirm against the API.
                # Skipping them avoids downloading a still-growing file.
                if item.get("isRecording"):
                    continue
                # ids may be ints in JSON while reserve_id is a string.
                if _same_id(item.get("reserveId"), reserve_id) or _same_id(item.get("programId"), reserve_id):
                    recorded_id = item.get("id")
                    video_url = f"{self.base_url}/api/videos/{recorded_id}/stream"
                    save_path = str(REC_DIR / md5_name(self.source_id, title, start_ts))
                    await self._download_file(video_url, save_path, on_progress=on_progress)
                    await self._upload_to_dataset(
                        save_path, os.path.basename(save_path), title, dataset_repo, hf_token
                    )
                    return True
        return False


class KonomiTVAdapter(BaseAdapter):
    """Adapter for a KonomiTV backend."""

    async def _get_headers(self):
        """Build request headers from the current ``self.api_key``.

        This is a coroutine; callers must ``await`` it before handing the
        result to httpx.
        """
        headers = {}
        if self.api_key:
            if "Bearer" in self.api_key:
                headers["Authorization"] = self.api_key
            else:
                headers["Authorization"] = f"Bearer {self.api_key}"
        return headers

    async def get_epg(self, hours=24):
        """Fetch up to 500 programmes from ``/api/programs``.

        Returns:
            A list of programme dicts (same keys as the EPGStation adapter),
            or ``[]`` on any HTTP, connection, timeout or parsing error.
        """
        try:
            async with httpx.AsyncClient(timeout=60.0, verify=False) as c:
                res = await c.get(
                    f"{self.base_url}/api/programs",
                    headers=await self._get_headers(),
                    params={"limit": 500},
                )
            if res.status_code != 200:
                logger.error(f"KonomiTV API error {res.status_code}: {res.text[:200]}")
                return []
            programs = res.json()
            if not isinstance(programs, list):
                return []
            result = []
            for p in programs:
                channel = p.get("channel") or {}
                result.append({
                    "id": str(p.get("id", "")),
                    "title": p.get("title", "Unknown"),
                    "start": to_seconds(p.get("start_at", 0)),
                    "end": to_seconds(p.get("end_at", 0)),
                    "channel_id": str(channel.get("id", "")),
                    "channel_name": channel.get("name") or channel.get("display_name") or "Unknown",
                    "description": p.get("description", ""),
                    "extended": p.get("extended", ""),
                })
            logger.info(f"Fetched {len(result)} programs from KonomiTV")
            return result
        except httpx.ConnectError as e:
            logger.error(f"Connection failed to {self.base_url}: {str(e)}")
            return []
        except httpx.TimeoutException as e:
            logger.error(f"Request timeout: {str(e)}")
            return []
        except Exception as e:
            logger.error(f"EPG fetch error: {type(e).__name__} - {str(e)}")
            return []

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a reservation via ``POST /api/reservations``.

        Returns:
            The reservation ``id`` as a string, or ``""`` when the request is
            not answered with 200/201.
        """
        start_sec = to_seconds(start_ts)
        end_sec = to_seconds(end_ts)
        payload = {
            "channel_id": int(channel_id) if str(channel_id).isdigit() else channel_id,
            "start_at": start_sec,
            "end_at": end_sec,
            "name": title,
        }
        async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
            res = await c.post(
                f"{self.base_url}/api/reservations",
                json=payload,
                headers=await self._get_headers(),
            )
        if res.status_code in (200, 201):
            return str(res.json().get("id", ""))
        return ""

    async def wait_and_download(self, reservation_id, start_ts, end_ts, title,
                                dataset_repo, hf_token, on_progress=None):
        """Poll the reservation once a minute until it reports completion.

        Polling stops 300 seconds after ``end_ts``.  When the reservation has
        ``is_completed`` set and a ``file_path``, the recording stream is
        downloaded into ``REC_DIR`` and uploaded to ``dataset_repo``.

        Returns:
            ``True`` after a successful download, ``False`` on deadline.
        """
        wait_until = end_ts + 300
        while time.time() < wait_until:
            await asyncio.sleep(60)
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.get(
                    f"{self.base_url}/api/reservations/{reservation_id}",
                    headers=await self._get_headers(),
                )
            if res.status_code != 200:
                continue
            data = res.json()
            if data.get("is_completed") and data.get("file_path"):
                stream_url = f"{self.base_url}/api/recordings/{reservation_id}/stream"
                save_path = str(REC_DIR / md5_name(self.source_id, title, start_ts))
                await self._download_file(stream_url, save_path, on_progress=on_progress)
                await self._upload_to_dataset(
                    save_path, os.path.basename(save_path), title, dataset_repo, hf_token
                )
                return True
        return False


class MirakurunAdapter(BaseAdapter):
    """Adapter for a Mirakurun backend."""

    async def get_epg(self, hours=24):
        """Fetch programmes from ``/api/programs``.

        Returns:
            A list of programme dicts (``id``, ``title``, ``start``, ``end``,
            ``channel_id``, ``channel_name``), or ``[]`` on any error.
            ``start``/``end`` are passed through as received.
        """
        now = int(datetime.now(timezone.utc).timestamp())
        try:
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.get(
                    f"{self.base_url}/api/programs",
                    headers=self.headers,
                    params={"startAt": now, "endAt": now + hours * 3600},
                )
            if res.status_code != 200:
                return []
            items = res.json()
            return [{
                "id": str(p["id"]),
                "title": p.get("name", "Unknown"),
                "start": p.get("startAt", 0),
                "end": p.get("endAt", 0),
                "channel_id": str(p.get("channelId", "")),
                "channel_name": p.get("channelName", "Unknown"),
            } for p in items]
        except Exception as e:
            # ``Exception`` (not a bare except) so task cancellation still
            # propagates; the failure is logged instead of silently dropped.
            logger.error(f"Mirakurun: EPG fetch failed - {type(e).__name__}: {e}")
            return []

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a reservation via ``POST /api/reserves``.

        Returns:
            The reservation ``id`` as a string, or ``""`` on a non-200
            response or any error.
        """
        try:
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.post(
                    f"{self.base_url}/api/reserves",
                    json={
                        "channelType": "GR",
                        "channel": channel_id,
                        "startAt": int(start_ts),
                        "endAt": int(end_ts),
                    },
                    headers=self.headers,
                )
            return str(res.json().get("id", "")) if res.status_code == 200 else ""
        except Exception as e:
            logger.error(f"Mirakurun: schedule failed - {type(e).__name__}: {e}")
            return ""

    async def wait_and_download(self, reserve_id, start_ts, end_ts, title,
                                dataset_repo, hf_token, on_progress=None):
        """Sleep until 300 s after ``end_ts``, then download the recording.

        Returns:
            ``True`` if ``/api/recorded/{reserve_id}`` answered 200 and the
            file was downloaded, otherwise ``False``.
        """
        await asyncio.sleep(max(end_ts - time.time() + 300, 0))
        async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
            res = await c.get(f"{self.base_url}/api/recorded/{reserve_id}", headers=self.headers)
        if res.status_code != 200:
            return False
        video_url = f"{self.base_url}/api/recorded/{reserve_id}/stream.mp4"
        save_path = str(REC_DIR / md5_name(self.source_id, title, start_ts))
        await self._download_file(video_url, save_path, on_progress=on_progress)
        await self._upload_to_dataset(save_path, os.path.basename(save_path), title, dataset_repo, hf_token)
        return True


class EDCBAdapter(BaseAdapter):
    """Adapter for an EDCB backend."""

    async def get_epg(self, hours=24):
        """Fetch programmes from ``/api/programs``.

        Returns:
            A list of programme dicts (``id``, ``title``, ``start``, ``end``,
            ``channel_id``, ``channel_name``), or ``[]`` on any error.
            ``start``/``end`` are passed through as received.
        """
        now = int(datetime.now(timezone.utc).timestamp())
        try:
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.get(
                    f"{self.base_url}/api/programs",
                    headers=self.headers,
                    params={"startAt": now, "endAt": now + hours * 3600},
                )
            if res.status_code != 200:
                return []
            items = res.json()
            return [{
                "id": str(p["id"]),
                "title": p.get("title", "Unknown"),
                "start": p.get("startTime", 0),
                "end": p.get("endTime", 0),
                "channel_id": str(p.get("channelID", "")),
                "channel_name": p.get("channelName", "Unknown"),
            } for p in items]
        except Exception as e:
            # ``Exception`` (not a bare except) so task cancellation still
            # propagates; the failure is logged instead of silently dropped.
            logger.error(f"EDCB: EPG fetch failed - {type(e).__name__}: {e}")
            return []

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a reservation via ``POST /api/reserve``.

        Returns:
            The reservation ``id`` as a string, or ``""`` on a non-200
            response or any error.
        """
        try:
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.post(
                    f"{self.base_url}/api/reserve",
                    json={
                        "channelID": channel_id,
                        "startTime": int(start_ts),
                        "endTime": int(end_ts),
                        "recMode": 4,
                    },
                    headers=self.headers,
                )
            return str(res.json().get("id", "")) if res.status_code == 200 else ""
        except Exception as e:
            logger.error(f"EDCB: schedule failed - {type(e).__name__}: {e}")
            return ""

    async def wait_and_download(self, reserve_id, start_ts, end_ts, title,
                                dataset_repo, hf_token, on_progress=None):
        """Sleep until 300 s after ``end_ts``, then download the recording.

        The file is fetched from the final URL of the
        ``/api/recfile/{reserve_id}`` probe request.

        Returns:
            ``True`` if the probe answered 200 and the file was downloaded,
            otherwise ``False``.
        """
        await asyncio.sleep(max(end_ts - time.time() + 300, 0))
        async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
            res = await c.get(f"{self.base_url}/api/recfile/{reserve_id}", headers=self.headers)
        if res.status_code != 200:
            return False
        save_path = str(REC_DIR / md5_name(self.source_id, title, start_ts))
        await self._download_file(str(res.url), save_path, on_progress=on_progress)
        await self._upload_to_dataset(save_path, os.path.basename(save_path), title, dataset_repo, hf_token)
        return True


async def start_local_stream_record(stream_url, start_ts, end_ts, title, source_id,
                                    dataset_repo, hf_token, on_progress=None):
    """Record ``stream_url`` locally with ffmpeg, then upload the result.

    The stream is copied (no re-encode) into an MPEG-TS file in ``REC_DIR``
    for ``end_ts - start_ts`` seconds, with a minimum of 60 seconds.

    Args:
        on_progress: Optional callable receiving a rough completed fraction,
            estimated from the output size at 100000 bytes per second and
            capped at 1.0.

    Returns:
        ``True`` if ffmpeg exited with status 0 and produced the output
        file, otherwise ``False``.
    """
    duration = max(int(end_ts - start_ts), 60)
    filename = md5_name(source_id, title, start_ts)
    save_path = str(REC_DIR / filename)
    proc = await asyncio.create_subprocess_exec(
        "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
        "-i", stream_url,
        "-t", str(duration),
        "-c", "copy",
        "-f", "mpegts",
        save_path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    async def monitor():
        # Report progress every 2 s while ffmpeg is still running.
        while proc.returncode is None:
            await asyncio.sleep(2)
            if os.path.exists(save_path) and on_progress:
                size = os.path.getsize(save_path)
                on_progress(min(size / (duration * 100000), 1.0))

    # Keep a reference so the task cannot be garbage-collected mid-flight.
    monitor_task = asyncio.create_task(monitor())
    try:
        # communicate() drains both pipes; wait() alone can deadlock once
        # ffmpeg fills the pipe buffer with diagnostics.
        _, stderr_data = await proc.communicate()
    finally:
        monitor_task.cancel()

    if proc.returncode == 0 and os.path.exists(save_path):
        # BaseAdapter is used only for its upload helper here.
        adapter = BaseAdapter("", "", "")
        await adapter._upload_to_dataset(save_path, filename, title, dataset_repo, hf_token)
        return True

    detail = (stderr_data or b"").decode(errors="replace")[:500]
    logger.error(f"ffmpeg exited with code {proc.returncode}: {detail}")
    return False


def get_adapter(source_type, url, api_key, source_id):
    """Instantiate the adapter class registered for ``source_type``.

    Matching is case-insensitive.  Unknown or missing types fall back to
    ``EPGStationAdapter``.
    """
    mapping = {
        "epgstation": EPGStationAdapter,
        "mirakurun": MirakurunAdapter,
        "konomitv": KonomiTVAdapter,
        "edcb": EDCBAdapter,
    }
    # ``or ""`` keeps a ``None`` source type on the default path instead of
    # raising AttributeError.
    cls = mapping.get((source_type or "").lower(), EPGStationAdapter)
    return cls(url, api_key, source_id)