eeee / backend /adapters.py
harii88's picture
Update backend/adapters.py
acd16fb verified
import httpx
import hashlib
import logging
import os
import asyncio
import time
import json
from datetime import datetime, timezone
from pathlib import Path
from huggingface_hub import HfApi
from fastapi import HTTPException
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directory in which the adapters below store downloaded/recorded files.
REC_DIR = Path("/app/recordings")
# Created at import time (including parents); a pre-existing directory is fine.
REC_DIR.mkdir(parents=True, exist_ok=True)
def md5_name(source_id, title, start_ts):
    """Build the on-disk file name for a recording.

    The three arguments are joined with underscores, hashed with MD5 and the
    hex digest is suffixed with ``.ts``.

    Args:
        source_id: Identifier of the source the recording belongs to.
        title: Programme title.
        start_ts: Start timestamp of the programme.

    Returns:
        A string of the form ``"<32 hex chars>.ts"``.
    """
    key = "_".join(str(part) for part in (source_id, title, start_ts))
    digest = hashlib.md5(key.encode()).hexdigest()
    return f"{digest}.ts"
def to_seconds(ts):
    """Normalise a Unix timestamp to whole seconds.

    Values greater than ``1e12`` are treated as milliseconds and divided by
    1000; anything else is assumed to be seconds already. Numeric strings are
    parsed first so they receive the same unit detection as numbers.

    Args:
        ts: Timestamp as ``int``, ``float`` or numeric ``str``.

    Returns:
        The timestamp in seconds, truncated to ``int``.

    Raises:
        ValueError: If ``ts`` is a string that is not numeric.
        TypeError: If ``ts`` cannot be converted to ``int`` (e.g. ``None``).
    """
    if isinstance(ts, str):
        # Prefer exact integer parsing; fall back to float for "123.4".
        try:
            ts = int(ts)
        except ValueError:
            ts = float(ts)
    if isinstance(ts, (int, float)) and ts > 1e12:
        return int(ts / 1000)
    return int(ts)
def to_milliseconds(ts):
    """Normalise a Unix timestamp to whole milliseconds.

    Values below ``1e12`` are treated as seconds and multiplied by 1000;
    anything else is assumed to be milliseconds already. Numeric strings are
    parsed first so they receive the same unit detection as numbers.

    Args:
        ts: Timestamp as ``int``, ``float`` or numeric ``str``.

    Returns:
        The timestamp in milliseconds, truncated to ``int``.

    Raises:
        ValueError: If ``ts`` is a string that is not numeric.
        TypeError: If ``ts`` cannot be converted to ``int`` (e.g. ``None``).
    """
    if isinstance(ts, str):
        # Prefer exact integer parsing; fall back to float for "123.4".
        try:
            ts = int(ts)
        except ValueError:
            ts = float(ts)
    if isinstance(ts, (int, float)) and ts < 1e12:
        return int(ts * 1000)
    return int(ts)
class BaseAdapter:
    """Shared plumbing for the recorder backend adapters.

    Subclasses implement ``get_epg``, ``schedule_record`` and
    ``wait_and_download``. This base class stores the connection settings,
    builds the ``Authorization`` header and provides helpers to download a
    file over HTTP and to upload it to a Hugging Face dataset repository.
    """

    def __init__(self, url, api_key, source_id):
        """Store connection settings and pre-compute the request headers.

        Args:
            url: Base URL of the backend; trailing slashes are stripped.
            api_key: Optional token. Sent unchanged when it already starts
                with a ``Bearer`` scheme (case-insensitive), otherwise
                prefixed with ``"Bearer "``.
            source_id: Identifier of the configured source.
        """
        self.base_url = url.rstrip("/")
        self.api_key = api_key
        self.source_id = source_id
        self.headers = {}
        if api_key:
            # Only a *leading* scheme means the key is already a full header
            # value; a key that merely contains the word must still be
            # prefixed.
            if api_key.lstrip().lower().startswith("bearer "):
                self.headers["Authorization"] = api_key
            else:
                self.headers["Authorization"] = f"Bearer {api_key}"

    async def get_epg(self, hours=24):
        """Return programme entries for the next ``hours`` hours (abstract)."""
        raise NotImplementedError

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Ask the backend to record a time slot (abstract)."""
        raise NotImplementedError

    async def wait_and_download(self, record_id, start_ts, end_ts, title, dataset_repo, hf_token, on_progress=None):
        """Wait for a recording, download it and upload it (abstract)."""
        raise NotImplementedError

    async def _download_file(self, url, save_path, headers=None, on_progress=None):
        """Stream ``url`` to ``save_path`` in 64 KiB chunks.

        Args:
            url: Address to fetch with GET.
            save_path: Destination file path; overwritten if present.
            headers: Request headers; falls back to ``self.headers``.
            on_progress: Optional callable receiving the downloaded fraction
                (0..1). Only called when the response has a Content-Length.

        Raises:
            httpx.HTTPStatusError: On a 4xx/5xx response.
            httpx.HTTPError: On transport failures.
        """
        file_started = False
        try:
            # NOTE(review): TLS verification is disabled here, as in the rest
            # of this module — confirm this is intended for the deployment.
            async with httpx.AsyncClient(timeout=300.0, verify=False) as client:
                async with client.stream("GET", url, headers=headers or self.headers) as res:
                    res.raise_for_status()
                    total = int(res.headers.get("content-length", 0))
                    downloaded = 0
                    file_started = True
                    with open(save_path, "wb") as f:
                        async for chunk in res.aiter_bytes(65536):
                            f.write(chunk)
                            downloaded += len(chunk)
                            if on_progress and total:
                                on_progress(downloaded / total)
        except BaseException:
            # Includes task cancellation: never leave a truncated recording
            # behind. Files that were not opened by this call are untouched.
            if file_started:
                try:
                    os.remove(save_path)
                except OSError:
                    pass
            raise

    async def _upload_to_dataset(self, local_path, md5_filename, original_name, repo_id, token):
        """Upload a local file to a dataset repo, then delete the local copy.

        Best effort: failures are logged and swallowed, and in that case the
        local file is kept. Does nothing when ``token`` is empty or
        ``local_path`` does not exist.

        Args:
            local_path: File to upload.
            md5_filename: Path of the file inside the repository.
            original_name: Human-readable name used in the commit message.
            repo_id: Target dataset repository.
            token: Hugging Face access token.
        """
        if not token or not os.path.exists(local_path):
            return
        try:
            api = HfApi()
            # upload_file is blocking; run it in a worker thread.
            await asyncio.to_thread(
                api.upload_file,
                path_or_fileobj=local_path,
                path_in_repo=md5_filename,
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Upload: {original_name}",
                token=token,
            )
            os.remove(local_path)
            logger.info(f"Uploaded {original_name} to {repo_id}")
        except Exception as e:
            logger.error(f"Upload failed: {e}")
class EPGStationAdapter(BaseAdapter):
    """Adapter for an EPGStation backend."""

    async def get_epg(self, hours=24):
        """Fetch the programme guide for the next ``hours`` hours.

        Calls ``GET /api/schedules`` with a millisecond time window and
        flattens the per-channel schedules into one list.

        Args:
            hours: Size of the look-ahead window.

        Returns:
            A list of dicts with keys ``id``, ``title``, ``start``, ``end``
            (both in seconds), ``channel_id``, ``channel_name``,
            ``description`` and ``extended``. Empty on a non-200 response,
            invalid JSON or a non-list payload.

        Raises:
            HTTPException: 503 on connection failure, 504 on timeout, 500 on
                any other error.
        """
        now = int(time.time() * 1000)
        end = now + int(hours * 3600 * 1000)
        url = f"{self.base_url}/api/schedules"
        logger.info(f"EPGStation: GET {url} (startAt={now}, endAt={end})")
        try:
            async with httpx.AsyncClient(timeout=60.0, verify=False) as c:
                res = await c.get(
                    url,
                    headers=self.headers,
                    params={
                        "startAt": now,
                        "endAt": end,
                        "isHalfWidth": "true",
                        "GR": "true",
                        "BS": "true",
                        "CS": "true",
                        "SKY": "true",
                    },
                )
            logger.info(f"EPGStation: Response status={res.status_code}")
            if res.status_code != 200:
                logger.error(f"EPGStation API error {res.status_code}: {res.text[:300]}")
                return []
            try:
                schedules = res.json()
            except json.JSONDecodeError:
                logger.error(f"EPGStation: Invalid JSON: {res.text[:200]}")
                return []
            if not isinstance(schedules, list):
                logger.warning(f"EPGStation: Expected list, got {type(schedules)}")
                return []
            result = []
            for sch in schedules:
                # "or" guards against explicit nulls, which .get()'s default
                # does not cover.
                channel = sch.get("channel") or {}
                cid = str(channel.get("id", ""))
                cname = channel.get("name") or channel.get("displayName") or "Unknown"
                for p in sch.get("programs") or []:
                    result.append({
                        "id": str(p.get("id", "")),
                        "title": p.get("name", "Unknown"),
                        "start": to_seconds(p.get("startAt") or 0),
                        "end": to_seconds(p.get("endAt") or 0),
                        "channel_id": cid,
                        "channel_name": cname,
                        "description": p.get("description", ""),
                        "extended": p.get("extended", ""),
                    })
            logger.info(f"EPGStation: Fetched {len(result)} programs")
            return result
        except httpx.ConnectError as e:
            logger.error(f"EPGStation: Connection failed - {str(e)}")
            raise HTTPException(503, f"Cannot connect to EPGStation: {self.base_url}") from e
        except httpx.TimeoutException as e:
            logger.error(f"EPGStation: Request timeout - {str(e)}")
            raise HTTPException(504, "EPGStation request timeout") from e
        except Exception as e:
            logger.error(f"EPGStation: Unexpected error - {type(e).__name__}: {str(e)}", exc_info=True)
            raise HTTPException(500, f"EPG fetch failed: {str(e)}") from e

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a time-specified reservation via ``POST /api/reserves``.

        Args:
            channel_id: Channel identifier; sent as ``int`` when all digits.
            start_ts: Start time, seconds or milliseconds.
            end_ts: End time, seconds or milliseconds.
            title: Name given to the reservation.

        Returns:
            The ``reserveId`` from the response as a string, or ``""`` when
            the request was not accepted.
        """
        start_ms = to_milliseconds(start_ts)
        end_ms = to_milliseconds(end_ts)
        payload = {
            "programId": None,
            "timeSpecifiedOption": {
                "name": title,
                "channelId": int(channel_id) if str(channel_id).isdigit() else channel_id,
                "startAt": start_ms,
                "endAt": end_ms,
            },
            "allowEndLack": True,
            "tags": [],
            "saveOption": {"parentDirectoryName": "", "directory": "", "recordedFormat": ""},
            "encodeOption": {"mode1": None, "mode2": None, "mode3": None, "isDeleteOriginalAfterEncode": False},
        }
        async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
            res = await c.post(f"{self.base_url}/api/reserves", json=payload, headers=self.headers)
        # 201 Created is accepted as success too, matching KonomiTVAdapter.
        if res.status_code in (200, 201):
            return str(res.json().get("reserveId", ""))
        logger.error(f"EPGStation: reserve failed {res.status_code}: {res.text[:300]}")
        return ""

    async def wait_and_download(self, reserve_id, start_ts, end_ts, title, dataset_repo, hf_token, on_progress=None):
        """Poll for the finished recording, then download and upload it.

        Polls ``GET /api/recorded`` once a minute until 300 seconds after
        ``end_ts``. The first entry whose ``reserveId`` or ``programId``
        equals ``reserve_id`` is streamed to ``REC_DIR`` and handed to
        ``_upload_to_dataset``.

        Args:
            reserve_id: Identifier returned by ``schedule_record``.
            start_ts: Programme start; used for the output file name.
            end_ts: Programme end, seconds or milliseconds.
            title: Programme title; used for file name and commit message.
            dataset_repo: Target dataset repository.
            hf_token: Hugging Face access token.
            on_progress: Optional download-progress callback.

        Returns:
            ``True`` once a matching recording was downloaded, ``False`` if
            none appeared before the deadline.
        """
        # Accept millisecond input like the sibling methods do; otherwise the
        # loop would keep polling for a very long time.
        deadline = to_seconds(end_ts) + 300
        # schedule_record returns a string while the JSON ids may be ints, so
        # compare on string form.
        wanted = str(reserve_id)
        while time.time() < deadline:
            await asyncio.sleep(60)
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.get(f"{self.base_url}/api/recorded", headers=self.headers, params={"limit": 100})
            if res.status_code != 200:
                continue
            data = res.json()
            # NOTE(review): tolerate an object wrapping the list under
            # "records" as well as a bare list — confirm the actual shape
            # against the deployed EPGStation version.
            items = data.get("records") or [] if isinstance(data, dict) else data
            for item in items:
                if not isinstance(item, dict):
                    continue
                ids = {
                    str(value)
                    for value in (item.get("reserveId"), item.get("programId"))
                    if value is not None
                }
                if wanted not in ids:
                    continue
                recorded_id = item.get("id")
                video_url = f"{self.base_url}/api/videos/{recorded_id}/stream"
                save_path = str(REC_DIR / md5_name(self.source_id, title, start_ts))
                await self._download_file(video_url, save_path, on_progress=on_progress)
                await self._upload_to_dataset(save_path, os.path.basename(save_path), title, dataset_repo, hf_token)
                return True
        return False
class KonomiTVAdapter(BaseAdapter):
    """Adapter for a KonomiTV backend."""

    async def _get_headers(self):
        """Return the request headers for this backend.

        Kept as a coroutine for interface compatibility; every call site in
        this class awaits it. The headers are the ones ``BaseAdapter``
        already derived from the API key, returned as a copy.
        """
        return dict(self.headers)

    async def get_epg(self, hours=24):
        """Fetch up to 500 programmes from ``GET /api/programs``.

        Args:
            hours: Accepted for interface parity; not sent to the backend.

        Returns:
            A list of dicts with keys ``id``, ``title``, ``start``, ``end``
            (both in seconds), ``channel_id``, ``channel_name``,
            ``description`` and ``extended``. Empty on any failure, which is
            logged rather than raised.
        """
        try:
            async with httpx.AsyncClient(timeout=60.0, verify=False) as c:
                res = await c.get(
                    f"{self.base_url}/api/programs",
                    # The coroutine must be awaited; passing it un-awaited
                    # made every request fail.
                    headers=await self._get_headers(),
                    params={"limit": 500},
                )
            if res.status_code != 200:
                logger.error(f"KonomiTV API error {res.status_code}: {res.text[:200]}")
                return []
            programs = res.json()
            if not isinstance(programs, list):
                return []
            result = []
            for p in programs:
                channel = p.get("channel") or {}
                result.append({
                    "id": str(p.get("id", "")),
                    "title": p.get("title", "Unknown"),
                    "start": to_seconds(p.get("start_at") or 0),
                    "end": to_seconds(p.get("end_at") or 0),
                    "channel_id": str(channel.get("id", "")),
                    "channel_name": channel.get("name") or channel.get("display_name") or "Unknown",
                    "description": p.get("description", ""),
                    "extended": p.get("extended", ""),
                })
            logger.info(f"Fetched {len(result)} programs from KonomiTV")
            return result
        except httpx.ConnectError as e:
            logger.error(f"Connection failed to {self.base_url}: {str(e)}")
            return []
        except httpx.TimeoutException as e:
            logger.error(f"Request timeout: {str(e)}")
            return []
        except Exception as e:
            logger.error(f"EPG fetch error: {type(e).__name__} - {str(e)}")
            return []

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a reservation via ``POST /api/reservations``.

        Args:
            channel_id: Channel identifier; sent as ``int`` when all digits.
            start_ts: Start time, seconds or milliseconds.
            end_ts: End time, seconds or milliseconds.
            title: Name given to the reservation.

        Returns:
            The ``id`` from the response as a string, or ``""`` when the
            backend did not answer 200/201.
        """
        payload = {
            "channel_id": int(channel_id) if str(channel_id).isdigit() else channel_id,
            "start_at": to_seconds(start_ts),
            "end_at": to_seconds(end_ts),
            "name": title,
        }
        async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
            res = await c.post(
                f"{self.base_url}/api/reservations",
                json=payload,
                headers=await self._get_headers(),
            )
        if res.status_code in (200, 201):
            return str(res.json().get("id", ""))
        return ""

    async def wait_and_download(self, reservation_id, start_ts, end_ts, title, dataset_repo, hf_token, on_progress=None):
        """Poll the reservation until completed, then download and upload.

        Polls ``GET /api/reservations/{id}`` once a minute until 300 seconds
        after ``end_ts``. When the reservation reports ``is_completed`` with
        a ``file_path``, the recording stream is saved under ``REC_DIR`` and
        handed to ``_upload_to_dataset``.

        Args:
            reservation_id: Identifier returned by ``schedule_record``.
            start_ts: Programme start; used for the output file name.
            end_ts: Programme end, seconds or milliseconds.
            title: Programme title; used for file name and commit message.
            dataset_repo: Target dataset repository.
            hf_token: Hugging Face access token.
            on_progress: Optional download-progress callback.

        Returns:
            ``True`` once the recording was downloaded, ``False`` if it did
            not complete before the deadline.
        """
        # Accept millisecond input like schedule_record does.
        deadline = to_seconds(end_ts) + 300
        while time.time() < deadline:
            await asyncio.sleep(60)
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.get(
                    f"{self.base_url}/api/reservations/{reservation_id}",
                    headers=await self._get_headers(),
                )
            if res.status_code != 200:
                continue
            data = res.json()
            if not (data.get("is_completed") and data.get("file_path")):
                continue
            stream_url = f"{self.base_url}/api/recordings/{reservation_id}/stream"
            save_path = str(REC_DIR / md5_name(self.source_id, title, start_ts))
            await self._download_file(stream_url, save_path, on_progress=on_progress)
            await self._upload_to_dataset(save_path, os.path.basename(save_path), title, dataset_repo, hf_token)
            return True
        return False
class MirakurunAdapter(BaseAdapter):
    """Adapter for a Mirakurun-style backend."""

    async def get_epg(self, hours=24):
        """Fetch programmes for the next ``hours`` hours.

        Calls ``GET /api/programs`` with a ``startAt``/``endAt`` window in
        seconds.

        Args:
            hours: Size of the look-ahead window.

        Returns:
            A list of dicts with keys ``id``, ``title``, ``start``, ``end``,
            ``channel_id`` and ``channel_name``; ``start``/``end`` are passed
            through as the backend sent them. Empty on any failure, which is
            logged rather than raised.
        """
        now = int(datetime.now(timezone.utc).timestamp())
        try:
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.get(
                    f"{self.base_url}/api/programs",
                    headers=self.headers,
                    params={"startAt": now, "endAt": now + hours * 3600},
                )
            if res.status_code != 200:
                logger.error(f"Mirakurun API error {res.status_code}: {res.text[:200]}")
                return []
            items = res.json()
            return [{
                "id": str(p["id"]),
                "title": p.get("name", "Unknown"),
                "start": p.get("startAt", 0),
                "end": p.get("endAt", 0),
                "channel_id": str(p.get("channelId", "")),
                "channel_name": p.get("channelName", "Unknown"),
            } for p in items]
        except Exception as e:
            # Not a bare "except:", which would also swallow task
            # cancellation (asyncio.CancelledError).
            logger.error(f"Mirakurun EPG fetch error: {type(e).__name__} - {e}")
            return []

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a reservation via ``POST /api/reserves``.

        Args:
            channel_id: Channel identifier, sent as ``channel``.
            start_ts: Start time, truncated to ``int``.
            end_ts: End time, truncated to ``int``.
            title: Unused by this backend call.

        Returns:
            The ``id`` from the response as a string, or ``""`` on a non-200
            response or any error (which is logged).
        """
        try:
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.post(
                    f"{self.base_url}/api/reserves",
                    json={"channelType": "GR", "channel": channel_id, "startAt": int(start_ts), "endAt": int(end_ts)},
                    headers=self.headers,
                )
            if res.status_code != 200:
                return ""
            return str(res.json().get("id", ""))
        except Exception as e:
            logger.error(f"Mirakurun reserve error: {type(e).__name__} - {e}")
            return ""

    async def wait_and_download(self, reserve_id, start_ts, end_ts, title, dataset_repo, hf_token, on_progress=None):
        """Sleep until 300 s after the programme ends, then fetch it.

        Checks ``GET /api/recorded/{reserve_id}``; on 200 the file at
        ``.../stream.mp4`` is saved under ``REC_DIR`` and handed to
        ``_upload_to_dataset``.

        Args:
            reserve_id: Identifier returned by ``schedule_record``.
            start_ts: Programme start; used for the output file name.
            end_ts: Programme end, seconds or milliseconds.
            title: Programme title; used for file name and commit message.
            dataset_repo: Target dataset repository.
            hf_token: Hugging Face access token.
            on_progress: Optional download-progress callback.

        Returns:
            ``True`` if the recording was downloaded, ``False`` otherwise.
        """
        # to_seconds keeps a millisecond end_ts from turning into a sleep of
        # many years.
        delay = to_seconds(end_ts) - time.time() + 300
        await asyncio.sleep(max(delay, 0))
        async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
            res = await c.get(f"{self.base_url}/api/recorded/{reserve_id}", headers=self.headers)
        if res.status_code != 200:
            return False
        video_url = f"{self.base_url}/api/recorded/{reserve_id}/stream.mp4"
        save_path = str(REC_DIR / md5_name(self.source_id, title, start_ts))
        await self._download_file(video_url, save_path, on_progress=on_progress)
        await self._upload_to_dataset(save_path, os.path.basename(save_path), title, dataset_repo, hf_token)
        return True
class EDCBAdapter(BaseAdapter):
    """Adapter for an EDCB-style backend."""

    async def get_epg(self, hours=24):
        """Fetch programmes for the next ``hours`` hours.

        Calls ``GET /api/programs`` with a ``startAt``/``endAt`` window in
        seconds.

        Args:
            hours: Size of the look-ahead window.

        Returns:
            A list of dicts with keys ``id``, ``title``, ``start``, ``end``,
            ``channel_id`` and ``channel_name``; ``start``/``end`` are passed
            through as the backend sent them. Empty on any failure, which is
            logged rather than raised.
        """
        now = int(datetime.now(timezone.utc).timestamp())
        try:
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.get(
                    f"{self.base_url}/api/programs",
                    headers=self.headers,
                    params={"startAt": now, "endAt": now + hours * 3600},
                )
            if res.status_code != 200:
                logger.error(f"EDCB API error {res.status_code}: {res.text[:200]}")
                return []
            items = res.json()
            return [{
                "id": str(p["id"]),
                "title": p.get("title", "Unknown"),
                "start": p.get("startTime", 0),
                "end": p.get("endTime", 0),
                "channel_id": str(p.get("channelID", "")),
                "channel_name": p.get("channelName", "Unknown"),
            } for p in items]
        except Exception as e:
            # Not a bare "except:", which would also swallow task
            # cancellation (asyncio.CancelledError).
            logger.error(f"EDCB EPG fetch error: {type(e).__name__} - {e}")
            return []

    async def schedule_record(self, channel_id, start_ts, end_ts, title):
        """Create a reservation via ``POST /api/reserve``.

        Args:
            channel_id: Channel identifier, sent as ``channelID``.
            start_ts: Start time, truncated to ``int``.
            end_ts: End time, truncated to ``int``.
            title: Unused by this backend call.

        Returns:
            The ``id`` from the response as a string, or ``""`` on a non-200
            response or any error (which is logged).
        """
        try:
            async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
                res = await c.post(
                    f"{self.base_url}/api/reserve",
                    json={"channelID": channel_id, "startTime": int(start_ts), "endTime": int(end_ts), "recMode": 4},
                    headers=self.headers,
                )
            if res.status_code != 200:
                return ""
            return str(res.json().get("id", ""))
        except Exception as e:
            logger.error(f"EDCB reserve error: {type(e).__name__} - {e}")
            return ""

    async def wait_and_download(self, reserve_id, start_ts, end_ts, title, dataset_repo, hf_token, on_progress=None):
        """Sleep until 300 s after the programme ends, then fetch it.

        Probes ``GET /api/recfile/{reserve_id}``; on 200 the same URL is
        streamed to ``REC_DIR`` and handed to ``_upload_to_dataset``.

        Args:
            reserve_id: Identifier returned by ``schedule_record``.
            start_ts: Programme start; used for the output file name.
            end_ts: Programme end, seconds or milliseconds.
            title: Programme title; used for file name and commit message.
            dataset_repo: Target dataset repository.
            hf_token: Hugging Face access token.
            on_progress: Optional download-progress callback.

        Returns:
            ``True`` if the recording was downloaded, ``False`` otherwise.
        """
        # to_seconds keeps a millisecond end_ts from turning into a sleep of
        # many years.
        delay = to_seconds(end_ts) - time.time() + 300
        await asyncio.sleep(max(delay, 0))
        probe_url = f"{self.base_url}/api/recfile/{reserve_id}"
        async with httpx.AsyncClient(timeout=30.0, verify=False) as c:
            # Stream the probe so only the status line and headers are read;
            # a plain GET would buffer the entire recording in memory before
            # it is downloaded a second time below.
            async with c.stream("GET", probe_url, headers=self.headers) as res:
                available = res.status_code == 200
                file_url = str(res.url)
        if not available:
            return False
        save_path = str(REC_DIR / md5_name(self.source_id, title, start_ts))
        await self._download_file(file_url, save_path, on_progress=on_progress)
        await self._upload_to_dataset(save_path, os.path.basename(save_path), title, dataset_repo, hf_token)
        return True
async def start_local_stream_record(stream_url, start_ts, end_ts, title, source_id, dataset_repo, hf_token, on_progress=None):
    """Record a live stream locally with ffmpeg, then upload the result.

    ffmpeg is started immediately and copies ``stream_url`` into an MPEG-TS
    file under ``REC_DIR`` for the programme's duration (at least 60 s).
    While it runs, ``on_progress`` receives a rough estimate derived from the
    output file size.

    Args:
        stream_url: Input URL handed to ffmpeg's ``-i``.
        start_ts: Programme start, seconds or milliseconds.
        end_ts: Programme end, seconds or milliseconds.
        title: Programme title; used for file name and commit message.
        source_id: Source identifier; used for the file name.
        dataset_repo: Target dataset repository.
        hf_token: Hugging Face access token.
        on_progress: Optional callable receiving a fraction in 0..1.

    Returns:
        ``True`` if ffmpeg exited with code 0 and produced the file,
        ``False`` otherwise.
    """
    # Normalise both ends so millisecond input does not yield a duration
    # 1000x too long.
    duration = max(to_seconds(end_ts) - to_seconds(start_ts), 60)
    filename = md5_name(source_id, title, start_ts)
    save_path = str(REC_DIR / filename)
    proc = await asyncio.create_subprocess_exec(
        "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
        "-i", stream_url, "-t", str(duration), "-c", "copy", "-f", "mpegts", save_path,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )

    async def monitor():
        # Size-based estimate only; the divisor is a rough bytes-per-second
        # guess, hence the clamp to 1.0.
        while proc.returncode is None:
            await asyncio.sleep(2)
            if os.path.exists(save_path) and on_progress:
                size = os.path.getsize(save_path)
                on_progress(min(size / (duration * 100000), 1.0))

    # Keep a reference so the task cannot be garbage-collected mid-flight.
    monitor_task = asyncio.create_task(monitor())
    try:
        # communicate() drains the pipes; wait() alone can deadlock once
        # ffmpeg fills the stderr pipe buffer.
        _, stderr_data = await proc.communicate()
    finally:
        monitor_task.cancel()

    if proc.returncode != 0:
        detail = (stderr_data or b"").decode(errors="replace").strip()[:500]
        logger.error(f"ffmpeg exited with code {proc.returncode}: {detail}")
        return False
    if not os.path.exists(save_path):
        return False
    # Only the upload helper is needed, so connection settings stay empty.
    adapter = BaseAdapter("", "", "")
    await adapter._upload_to_dataset(save_path, filename, title, dataset_repo, hf_token)
    return True
def get_adapter(source_type, url, api_key, source_id):
    """Instantiate the adapter class matching ``source_type``.

    The lookup ignores case and surrounding whitespace. Unknown, empty or
    ``None`` types fall back to ``EPGStationAdapter``.

    Args:
        source_type: One of ``"epgstation"``, ``"mirakurun"``,
            ``"konomitv"`` or ``"edcb"``.
        url: Base URL of the backend.
        api_key: Optional API token.
        source_id: Identifier of the configured source.

    Returns:
        An adapter instance constructed with ``(url, api_key, source_id)``.
    """
    mapping = {
        "epgstation": EPGStationAdapter,
        "mirakurun": MirakurunAdapter,
        "konomitv": KonomiTVAdapter,
        "edcb": EDCBAdapter,
    }
    # Coerce first so None (or a non-string) takes the fallback instead of
    # raising AttributeError on .lower().
    key = str(source_type or "").strip().lower()
    cls = mapping.get(key)
    if cls is None:
        if key:
            logger.warning(f"Unknown source type '{source_type}', falling back to EPGStation")
        cls = EPGStationAdapter
    return cls(url, api_key, source_id)