| import httpx |
| import hashlib |
| import logging |
| import os |
| import asyncio |
| import time |
| import json |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from huggingface_hub import HfApi |
| from fastapi import HTTPException |
|
|
# Logger shared by every adapter in this module.
logger = logging.getLogger(__name__)

# Local staging directory for recordings before they are uploaded.
# Created (with any missing parents) as a side effect of importing the module.
REC_DIR = Path("/app") / "recordings"
REC_DIR.mkdir(exist_ok=True, parents=True)
|
|
def md5_name(source_id, title, start_ts):
    """Return a deterministic ``.ts`` filename for a recording.

    The name is the hex MD5 digest of ``"{source_id}_{title}_{start_ts}"``
    (UTF-8 encoded) followed by ``.ts``. It is used as a stable identifier,
    not for security.
    """
    key = f"{source_id}_{title}_{start_ts}"
    digest = hashlib.md5(key.encode()).hexdigest()
    return f"{digest}.ts"
|
|
def to_seconds(ts):
    """Normalise an epoch timestamp to whole seconds.

    Numeric values strictly greater than ``1e12`` are treated as milliseconds
    and divided by 1000. Every other value (smaller numbers, numeric strings)
    is converted with ``int()`` as-is.
    """
    is_millis = isinstance(ts, (int, float)) and ts > 1e12
    if is_millis:
        return int(ts / 1000)
    return int(ts)
|
|
def to_milliseconds(ts):
    """Normalise an epoch timestamp to whole milliseconds.

    Numeric values strictly below ``1e12`` are treated as seconds and
    multiplied by 1000. Every other value (larger numbers, numeric strings)
    is converted with ``int()`` as-is.
    """
    is_seconds = isinstance(ts, (int, float)) and ts < 1e12
    if not is_seconds:
        return int(ts)
    return int(ts * 1000)
|
|
class BaseAdapter:
    """Common base for recorder-backend adapters.

    Subclasses implement ``get_epg``, ``schedule_record`` and
    ``wait_and_download``. This base supplies the shared ``Authorization``
    header plus the streaming-download and Hugging Face upload helpers.
    """

    def __init__(self, url, api_key, source_id):
        """Store connection settings for one backend.

        Args:
            url: Base URL of the backend. Trailing slashes are stripped so
                paths can be appended as ``f"{base_url}/api/..."``.
            api_key: Optional credential. A value that already starts with a
                ``Bearer `` scheme (any case) is sent as-is. Anything else is
                prefixed with ``Bearer ``. Falsy values send no header.
            source_id: Identifier mixed into recording filenames.
        """
        self.base_url = url.rstrip("/")
        self.api_key = api_key
        self.source_id = source_id
        self.headers = {}
        if api_key:
            # Test for a leading auth scheme rather than a substring, so a raw
            # key that merely contains "Bearer" still gets the prefix.
            if api_key.lower().startswith("bearer "):
                self.headers["Authorization"] = api_key
            else:
                self.headers["Authorization"] = f"Bearer {api_key}"

    async def get_epg(self, hours=24):
        """Return programme dicts for the next *hours* hours (subclass hook)."""
        raise NotImplementedError

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a reservation and return its id as a string (subclass hook)."""
        raise NotImplementedError

    async def wait_and_download(self, record_id, start_ts, end_ts, title, dataset_repo, hf_token, on_progress=None):
        """Wait for a recording, download it and upload it (subclass hook)."""
        raise NotImplementedError

    async def _download_file(self, url, save_path, headers=None, on_progress=None):
        """Stream *url* to *save_path* in 64 KiB chunks.

        Args:
            url: Absolute URL to fetch.
            save_path: Local destination; overwritten if it exists.
            headers: Request headers; ``self.headers`` is used when falsy.
            on_progress: Optional callback receiving ``downloaded / total``.
                Only called when the response carries ``Content-Length``.

        Raises:
            httpx.HTTPStatusError: On a non-success status (raised before
                *save_path* is opened).
            Exception: Any error while streaming propagates after the
                partially written file has been removed.
        """
        # NOTE(review): verify=False disables TLS certificate checks,
        # presumably for self-hosted backends with self-signed certificates.
        async with httpx.AsyncClient(timeout=300.0, verify=False) as client:
            async with client.stream("GET", url, headers=headers or self.headers) as res:
                res.raise_for_status()
                total = int(res.headers.get("content-length", 0))
                downloaded = 0
                try:
                    with open(save_path, "wb") as f:
                        async for chunk in res.aiter_bytes(65536):
                            f.write(chunk)
                            downloaded += len(chunk)
                            if on_progress and total:
                                on_progress(downloaded / total)
                except BaseException:
                    # Don't leave a truncated recording that a later upload or
                    # retry could mistake for a complete file.
                    try:
                        os.remove(save_path)
                    except OSError:
                        pass
                    raise

    async def _upload_to_dataset(self, local_path, md5_filename, original_name, repo_id, token):
        """Upload *local_path* to a Hugging Face dataset repo, then delete it.

        This is best-effort. It returns silently when *token* is empty or the
        file does not exist. An upload failure is logged rather than raised,
        and the local file is kept.

        Args:
            local_path: File to upload.
            md5_filename: Destination path inside the repo.
            original_name: Human-readable title, used in the commit message.
            repo_id: Target dataset repository id.
            token: Hugging Face access token.
        """
        if not token or not os.path.exists(local_path):
            return
        try:
            api = HfApi()
            # upload_file is synchronous; run it off the event loop.
            await asyncio.to_thread(
                api.upload_file,
                path_or_fileobj=local_path,
                path_in_repo=md5_filename,
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Upload: {original_name}",
                token=token,
            )
        except Exception as e:
            logger.error("Upload failed: %s", e, exc_info=True)
            return
        logger.info("Uploaded %s to %s", original_name, repo_id)
        # Cleanup is separate so a removal error isn't reported as a failed upload.
        try:
            os.remove(local_path)
        except OSError as e:
            logger.warning("Uploaded %s but could not remove %s: %s", original_name, local_path, e)
|
|
class EPGStationAdapter(BaseAdapter):
    """Adapter for an EPGStation server.

    Uses ``/api/schedules`` for the guide, ``/api/reserves`` for reservations
    and ``/api/recorded`` to find finished recordings.
    """

    async def get_epg(self, hours=24):
        """Fetch the programme guide for the next *hours* hours.

        Returns:
            A list of dicts with ``id``, ``title``, ``start``/``end`` (epoch
            seconds), ``channel_id``, ``channel_name``, ``description`` and
            ``extended``. Returns ``[]`` on a non-200 status, an undecodable
            body, or a non-list payload.

        Raises:
            HTTPException: 503 when the server is unreachable, 504 on timeout,
                500 on any other unexpected error.
        """
        now = int(time.time() * 1000)
        end = now + hours * 3600 * 1000
        url = f"{self.base_url}/api/schedules"
        logger.info("EPGStation: GET %s (startAt=%s, endAt=%s)", url, now, end)
        try:
            async with httpx.AsyncClient(timeout=60.0, verify=False) as c:
                res = await c.get(
                    url,
                    headers=self.headers,
                    params={
                        "startAt": now,
                        "endAt": end,
                        "isHalfWidth": "true",
                        "GR": "true",
                        "BS": "true",
                        "CS": "true",
                        "SKY": "true",
                    },
                )
            logger.info("EPGStation: Response status=%s", res.status_code)
            if res.status_code != 200:
                logger.error("EPGStation API error %s: %s", res.status_code, res.text[:300])
                return []
            try:
                schedules = res.json()
            except ValueError:  # JSONDecodeError and UnicodeDecodeError both subclass it
                logger.error("EPGStation: Invalid JSON: %s", res.text[:200])
                return []
            if not isinstance(schedules, list):
                logger.warning("EPGStation: Expected list, got %s", type(schedules))
                return []
            result = []
            for sch in schedules:
                if not isinstance(sch, dict):
                    continue
                # `or {}` / `or []` also cover explicit nulls in the payload.
                channel = sch.get("channel") or {}
                cid = str(channel.get("id", ""))
                cname = channel.get("name") or channel.get("displayName") or "Unknown"
                for p in sch.get("programs") or []:
                    result.append({
                        "id": str(p.get("id", "")),
                        "title": p.get("name", "Unknown"),
                        "start": to_seconds(p.get("startAt", 0)),
                        "end": to_seconds(p.get("endAt", 0)),
                        "channel_id": cid,
                        "channel_name": cname,
                        "description": p.get("description", ""),
                        "extended": p.get("extended", ""),
                    })
            logger.info("EPGStation: Fetched %s programs", len(result))
            return result
        except httpx.ConnectError as e:
            logger.error("EPGStation: Connection failed - %s", e)
            raise HTTPException(503, f"Cannot connect to EPGStation: {self.base_url}") from e
        except httpx.TimeoutException as e:
            logger.error("EPGStation: Request timeout - %s", e)
            raise HTTPException(504, "EPGStation request timeout") from e
        except Exception as e:
            logger.error("EPGStation: Unexpected error - %s: %s", type(e).__name__, e, exc_info=True)
            raise HTTPException(500, f"EPG fetch failed: {str(e)}") from e

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a time-specified reservation.

        *start_ts* / *end_ts* may be epoch seconds or milliseconds; they are
        sent as milliseconds. Returns the reservation id as a string, or
        ``""`` on failure.
        """
        payload = {
            "programId": None,
            "timeSpecifiedOption": {
                "name": title,
                "channelId": int(channel_id) if str(channel_id).isdigit() else channel_id,
                "startAt": to_milliseconds(start_ts),
                "endAt": to_milliseconds(end_ts),
            },
            "allowEndLack": True,
            "tags": [],
            "saveOption": {"parentDirectoryName": "", "directory": "", "recordedFormat": ""},
            "encodeOption": {"mode1": None, "mode2": None, "mode3": None, "isDeleteOriginalAfterEncode": False},
        }
        async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
            res = await c.post(f"{self.base_url}/api/reserves", json=payload, headers=self.headers)
        # NOTE(review): EPGStation is believed to answer 201 Created for a new
        # reservation; accept 200 as well (same as KonomiTVAdapter).
        if res.status_code in (200, 201):
            try:
                body = res.json()
            except ValueError:
                body = None
            if isinstance(body, dict) and body.get("reserveId") is not None:
                return str(body["reserveId"])
        logger.error("EPGStation: reservation failed %s: %s", res.status_code, res.text[:300])
        return ""

    async def wait_and_download(self, reserve_id, start_ts, end_ts, title, dataset_repo, hf_token, on_progress=None):
        """Poll ``/api/recorded`` for the finished recording, then download and upload it.

        The server is polled once a minute until 5 minutes past *end_ts*
        (epoch seconds or milliseconds). Polling errors are logged and the
        next poll is attempted.

        Returns:
            True once the file has been downloaded (and handed to the
            uploader), False if it never appeared before the deadline.
        """
        target = str(reserve_id)
        wait_until = to_seconds(end_ts) + 300
        while time.time() < wait_until:
            await asyncio.sleep(60)
            try:
                async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                    res = await c.get(f"{self.base_url}/api/recorded", headers=self.headers, params={"limit": 100})
            except httpx.HTTPError as e:
                # A transient network failure shouldn't abandon the recording.
                logger.warning("EPGStation: polling recorded list failed: %s", e)
                continue
            if res.status_code != 200:
                continue
            try:
                payload = res.json()
            except ValueError:
                continue
            item = self._find_recorded(payload, target)
            if item is None:
                continue
            video_url = f"{self.base_url}/api/videos/{item.get('id')}/stream"
            save_path = str(REC_DIR / md5_name(self.source_id, title, start_ts))
            await self._download_file(video_url, save_path, on_progress=on_progress)
            await self._upload_to_dataset(save_path, os.path.basename(save_path), title, dataset_repo, hf_token)
            return True
        return False

    @staticmethod
    def _find_recorded(payload, target):
        """Return the finished recorded item whose reserve/program id equals *target*, else None."""
        # Accept a bare list or an object wrapping the list under "records".
        # NOTE(review): EPGStation v2 is believed to use the wrapped form.
        items = payload.get("records", []) if isinstance(payload, dict) else payload
        if not isinstance(items, list):
            return None
        for item in items:
            if not isinstance(item, dict) or item.get("isRecording"):
                # Skip malformed entries and recordings still in progress.
                continue
            # JSON ids are typically ints while *target* is a str; compare as str.
            ids = {str(v) for v in (item.get("reserveId"), item.get("programId")) if v is not None}
            if target in ids:
                return item
        return None
|
|
class KonomiTVAdapter(BaseAdapter):
    """Adapter for a KonomiTV server (``/api/programs``, ``/api/reservations``)."""

    async def _get_headers(self):
        """Return a fresh copy of the auth headers built in ``BaseAdapter.__init__``.

        This stays a coroutine for compatibility with existing callers, so it
        must be awaited.
        """
        return dict(self.headers)

    async def get_epg(self, hours=24):
        """Fetch up to 500 programmes.

        NOTE(review): *hours* is not used; the endpoint is queried with
        ``limit=500`` only.

        Returns:
            A list of programme dicts (``start``/``end`` in epoch seconds).
            Returns ``[]`` on any error, after logging it.
        """
        try:
            headers = await self._get_headers()
            async with httpx.AsyncClient(timeout=60.0, verify=False) as c:
                res = await c.get(
                    f"{self.base_url}/api/programs",
                    headers=headers,
                    params={"limit": 500},
                )
            if res.status_code != 200:
                logger.error("KonomiTV API error %s: %s", res.status_code, res.text[:200])
                return []
            programs = res.json()
            if not isinstance(programs, list):
                return []
            result = []
            for p in programs:
                if not isinstance(p, dict):
                    continue
                channel = p.get("channel") or {}
                result.append({
                    "id": str(p.get("id", "")),
                    "title": p.get("title", "Unknown"),
                    "start": to_seconds(p.get("start_at", 0)),
                    "end": to_seconds(p.get("end_at", 0)),
                    "channel_id": str(channel.get("id", "")),
                    "channel_name": channel.get("name") or channel.get("display_name") or "Unknown",
                    "description": p.get("description", ""),
                    "extended": p.get("extended", ""),
                })
            logger.info("Fetched %s programs from KonomiTV", len(result))
            return result
        except httpx.ConnectError as e:
            logger.error("Connection failed to %s: %s", self.base_url, e)
            return []
        except httpx.TimeoutException as e:
            logger.error("Request timeout: %s", e)
            return []
        except Exception as e:
            logger.error("EPG fetch error: %s - %s", type(e).__name__, e)
            return []

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a reservation; timestamps may be seconds or milliseconds.

        Returns the reservation id as a string, or ``""`` on failure.
        """
        payload = {
            "channel_id": int(channel_id) if str(channel_id).isdigit() else channel_id,
            "start_at": to_seconds(start_ts),
            "end_at": to_seconds(end_ts),
            "name": title,
        }
        headers = await self._get_headers()
        async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
            res = await c.post(
                f"{self.base_url}/api/reservations",
                json=payload,
                headers=headers,
            )
        if res.status_code in (200, 201):
            try:
                body = res.json()
            except ValueError:
                body = None
            if isinstance(body, dict) and body.get("id") is not None:
                return str(body["id"])
        logger.error("KonomiTV: reservation failed %s: %s", res.status_code, res.text[:200])
        return ""

    async def wait_and_download(self, reservation_id, start_ts, end_ts, title, dataset_repo, hf_token, on_progress=None):
        """Poll the reservation until it is completed, then download and upload it.

        The reservation is polled once a minute until 5 minutes past *end_ts*
        (epoch seconds or milliseconds). Polling errors are logged and the next
        poll is attempted.

        Returns:
            True once downloaded, False on timeout.
        """
        wait_until = to_seconds(end_ts) + 300
        while time.time() < wait_until:
            await asyncio.sleep(60)
            try:
                headers = await self._get_headers()
                async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                    res = await c.get(
                        f"{self.base_url}/api/reservations/{reservation_id}",
                        headers=headers,
                    )
            except httpx.HTTPError as e:
                # A transient network failure shouldn't abandon the recording.
                logger.warning("KonomiTV: polling reservation %s failed: %s", reservation_id, e)
                continue
            if res.status_code != 200:
                continue
            try:
                data = res.json()
            except ValueError:
                continue
            if not isinstance(data, dict) or not data.get("is_completed") or not data.get("file_path"):
                continue
            stream_url = f"{self.base_url}/api/recordings/{reservation_id}/stream"
            save_path = str(REC_DIR / md5_name(self.source_id, title, start_ts))
            await self._download_file(stream_url, save_path, on_progress=on_progress)
            await self._upload_to_dataset(save_path, os.path.basename(save_path), title, dataset_repo, hf_token)
            return True
        return False
|
|
class MirakurunAdapter(BaseAdapter):
    """Adapter for a Mirakurun-style backend.

    NOTE(review): the ``/api/reserves`` and ``/api/recorded`` endpoints used
    below are assumed to exist on the target server. Confirm this before
    relying on recording.
    """

    async def get_epg(self, hours=24):
        """Fetch programmes for the next *hours* hours.

        Returns:
            A list of dicts with ``id``, ``title``, ``start``/``end`` (epoch
            seconds), ``channel_id`` and ``channel_name``. Returns ``[]`` on
            any error, after logging it.
        """
        now = int(datetime.now(timezone.utc).timestamp())
        try:
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.get(
                    f"{self.base_url}/api/programs",
                    headers=self.headers,
                    params={"startAt": now, "endAt": now + hours * 3600},
                )
            if res.status_code != 200:
                logger.error("Mirakurun API error %s: %s", res.status_code, res.text[:200])
                return []
            items = res.json()
            if not isinstance(items, list):
                return []
            return [self._to_program(p) for p in items]
        except Exception as e:
            # `except Exception` rather than a bare except, so that
            # asyncio.CancelledError (a BaseException) still propagates.
            logger.error("Mirakurun EPG fetch error: %s - %s", type(e).__name__, e)
            return []

    @staticmethod
    def _to_program(p):
        """Map one programme object to the adapter-neutral programme dict."""
        start_raw = p.get("startAt") or 0
        end_raw = p.get("endAt")
        duration = p.get("duration")
        if end_raw is None and start_raw and isinstance(start_raw, (int, float)) and isinstance(duration, (int, float)):
            # NOTE(review): Mirakurun's Program schema carries startAt +
            # duration (both ms) instead of endAt.
            end_raw = start_raw + duration
        return {
            "id": str(p["id"]),
            "title": p.get("name", "Unknown"),
            # Normalised to seconds, like the other adapters' EPG output.
            "start": to_seconds(start_raw),
            "end": to_seconds(end_raw or 0),
            "channel_id": str(p.get("channelId", "")),
            "channel_name": p.get("channelName", "Unknown"),
        }

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a reservation; return its id as a string, or ``""`` on failure."""
        try:
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.post(
                    f"{self.base_url}/api/reserves",
                    json={"channelType": "GR", "channel": channel_id, "startAt": int(start_ts), "endAt": int(end_ts)},
                    headers=self.headers,
                )
            return str(res.json().get("id", "")) if res.status_code == 200 else ""
        except Exception as e:
            logger.error("Mirakurun reservation error: %s - %s", type(e).__name__, e)
            return ""

    async def wait_and_download(self, reserve_id, start_ts, end_ts, title, dataset_repo, hf_token, on_progress=None):
        """Sleep until 5 minutes after *end_ts*, then download and upload the recording.

        *end_ts* may be epoch seconds or milliseconds. Returns True when the
        recording exists and was downloaded, otherwise False.
        """
        await asyncio.sleep(max(to_seconds(end_ts) - time.time() + 300, 0))
        async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
            res = await c.get(f"{self.base_url}/api/recorded/{reserve_id}", headers=self.headers)
        if res.status_code != 200:
            return False
        video_url = f"{self.base_url}/api/recorded/{reserve_id}/stream.mp4"
        save_path = str(REC_DIR / md5_name(self.source_id, title, start_ts))
        await self._download_file(video_url, save_path, on_progress=on_progress)
        await self._upload_to_dataset(save_path, os.path.basename(save_path), title, dataset_repo, hf_token)
        return True
|
|
class EDCBAdapter(BaseAdapter):
    """Adapter for an EDCB backend exposing a JSON ``/api`` surface.

    NOTE(review): the endpoint paths and field names (``startTime``,
    ``channelID``, ``recMode``) are used as-is. Confirm them against the
    target server.
    """

    async def get_epg(self, hours=24):
        """Fetch programmes for the next *hours* hours.

        ``start``/``end`` are passed through from ``startTime``/``endTime``
        unchanged. NOTE(review): their unit is not visible here. Returns
        ``[]`` on any error, after logging it.
        """
        now = int(datetime.now(timezone.utc).timestamp())
        try:
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.get(
                    f"{self.base_url}/api/programs",
                    headers=self.headers,
                    params={"startAt": now, "endAt": now + hours * 3600},
                )
            if res.status_code != 200:
                logger.error("EDCB API error %s: %s", res.status_code, res.text[:200])
                return []
            items = res.json()
            if not isinstance(items, list):
                return []
            return [{
                "id": str(p["id"]),
                "title": p.get("title", "Unknown"),
                "start": p.get("startTime", 0),
                "end": p.get("endTime", 0),
                "channel_id": str(p.get("channelID", "")),
                "channel_name": p.get("channelName", "Unknown"),
            } for p in items]
        except Exception as e:
            # `except Exception` rather than a bare except, so that
            # asyncio.CancelledError (a BaseException) still propagates.
            logger.error("EDCB EPG fetch error: %s - %s", type(e).__name__, e)
            return []

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a reservation; return its id as a string, or ``""`` on failure."""
        try:
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.post(
                    f"{self.base_url}/api/reserve",
                    json={"channelID": channel_id, "startTime": int(start_ts), "endTime": int(end_ts), "recMode": 4},
                    headers=self.headers,
                )
            return str(res.json().get("id", "")) if res.status_code == 200 else ""
        except Exception as e:
            logger.error("EDCB reservation error: %s - %s", type(e).__name__, e)
            return ""

    async def wait_and_download(self, reserve_id, start_ts, end_ts, title, dataset_repo, hf_token, on_progress=None):
        """Sleep until 5 minutes after *end_ts*, then download and upload the recording.

        *end_ts* may be epoch seconds or milliseconds. Returns False if the
        server answers the download with a non-success status, otherwise
        True after downloading.
        """
        await asyncio.sleep(max(to_seconds(end_ts) - time.time() + 300, 0))
        url = f"{self.base_url}/api/recfile/{reserve_id}"
        save_path = str(REC_DIR / md5_name(self.source_id, title, start_ts))
        try:
            # Stream straight to disk. A separate non-streamed "probe" GET
            # would buffer the whole recording in memory just to check the status.
            await self._download_file(url, save_path, on_progress=on_progress)
        except httpx.HTTPStatusError as e:
            logger.warning("EDCB: recording %s not available (status %s)", reserve_id, e.response.status_code)
            return False
        await self._upload_to_dataset(save_path, os.path.basename(save_path), title, dataset_repo, hf_token)
        return True
|
|
async def start_local_stream_record(stream_url, start_ts, end_ts, title, source_id, dataset_repo, hf_token, on_progress=None):
    """Record *stream_url* to an MPEG-TS file with ffmpeg, then upload it.

    Recording starts immediately; it does not wait for *start_ts*. It lasts
    ``end_ts - start_ts`` seconds, with a minimum of 60. Timestamps may be
    epoch seconds or milliseconds. The file is named with ``md5_name`` and
    written under ``REC_DIR``.

    Args:
        stream_url: Input URL passed to ``ffmpeg -i``.
        start_ts, end_ts: Programme window; only their difference is used.
        title: Programme title (filename seed and commit message).
        source_id: Source identifier (filename seed).
        dataset_repo, hf_token: Upload target; the upload is skipped when
            *hf_token* is empty.
        on_progress: Optional callback receiving the elapsed fraction of
            ``duration`` (capped at 1.0), roughly every 2 seconds.

    Returns:
        True when ffmpeg exits with 0 and the output file exists, else False.
    """
    duration = max(int(to_seconds(end_ts) - to_seconds(start_ts)), 60)
    filename = md5_name(source_id, title, start_ts)
    save_path = str(REC_DIR / filename)
    proc = await asyncio.create_subprocess_exec(
        "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
        "-i", stream_url, "-t", str(duration), "-c", "copy", "-f", "mpegts", save_path,
        stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE,
    )
    loop = asyncio.get_running_loop()
    started = loop.time()

    async def monitor():
        # ffmpeg stops after `duration` seconds regardless of bitrate, so
        # elapsed time is a better progress estimate than file size.
        while proc.returncode is None:
            await asyncio.sleep(2)
            if on_progress and os.path.exists(save_path):
                on_progress(min((loop.time() - started) / duration, 1.0))

    # Keep a reference so the task isn't garbage-collected mid-flight.
    monitor_task = asyncio.create_task(monitor())
    try:
        # communicate() drains stderr, so a chatty ffmpeg can't block on a full pipe.
        _, stderr = await proc.communicate()
    except BaseException:
        # Don't orphan ffmpeg if this coroutine is cancelled.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
        raise
    finally:
        monitor_task.cancel()

    if proc.returncode == 0 and os.path.exists(save_path):
        adapter = BaseAdapter("", "", "")
        await adapter._upload_to_dataset(save_path, filename, title, dataset_repo, hf_token)
        return True
    logger.error(
        "ffmpeg recording failed (rc=%s): %s",
        proc.returncode,
        (stderr or b"").decode(errors="replace")[-500:],
    )
    return False
|
|
def get_adapter(source_type, url, api_key, source_id):
    """Instantiate the adapter class for *source_type*.

    Matching is case-insensitive and ignores surrounding whitespace. Known
    types are ``epgstation``, ``mirakurun``, ``konomitv`` and ``edcb``. Any
    other value, including ``None``, falls back to ``EPGStationAdapter``,
    and a warning is logged so a misconfiguration is visible.
    """
    mapping = {
        "epgstation": EPGStationAdapter,
        "mirakurun": MirakurunAdapter,
        "konomitv": KonomiTVAdapter,
        "edcb": EDCBAdapter,
    }
    cls = mapping.get((source_type or "").strip().lower())
    if cls is None:
        logger.warning("Unknown source type %r; falling back to EPGStation", source_type)
        cls = EPGStationAdapter
    return cls(url, api_key, source_id)