| import json | |
| import httpx | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional, Dict, Any | |
| from urllib.parse import urlparse | |
| from config import Config | |
| from cache_manager import cache | |
| async def get_cid(force: bool = False) -> str: | |
| if not force: | |
| cached = cache.get_cid() | |
| if cached: | |
| return cached | |
| try: | |
| url = Config.get_cid_url() | |
| async with httpx.AsyncClient(timeout=Config.TIMEOUT) as client: | |
| response = await client.get(url) | |
| response.raise_for_status() | |
| data = response.json() | |
| if 'cid' not in data: | |
| raise ValueError("CID not found in response") | |
| cid = data['cid'] | |
| cache.set_cid(cid) | |
| return cid | |
| except Exception as e: | |
| if cache.cid: | |
| return cache.cid | |
| raise e | |
| async def get_auth(force: bool = False, retry_count: int = 0) -> Dict[str, Any]: | |
| if not force: | |
| cached = cache.get_auth() | |
| if cached: | |
| return cached | |
| try: | |
| cid = await get_cid(force=(retry_count > 0)) | |
| login_url = Config.get_login_url(cid) | |
| async with httpx.AsyncClient(timeout=Config.TIMEOUT) as client: | |
| response = await client.get(login_url) | |
| response.raise_for_status() | |
| data = response.json() | |
| if data.get('code') != 'OK': | |
| error_msg = data.get('message', 'Unknown error') | |
| if 'cid' in error_msg.lower() and retry_count < 2: | |
| return await get_auth(force=True, retry_count=retry_count + 1) | |
| raise ValueError(f"Login failed: {error_msg}") | |
| product_config = json.loads(data.get('product_config', '{}')) | |
| auth = { | |
| 'access_token': data['access_token'], | |
| 'vms_host': product_config['vms_host'].rstrip('/'), | |
| 'vms_uid': product_config['vms_uid'] | |
| } | |
| if not all(auth.values()): | |
| raise ValueError("Incomplete auth data") | |
| cache.set_auth(auth) | |
| return auth | |
| except Exception as e: | |
| if cache.auth and retry_count == 0: | |
| return cache.auth | |
| raise e | |
| async def get_channels(auth: Dict[str, Any], force: bool = False) -> list: | |
| if not force: | |
| cached = cache.get_channels() | |
| if cached: | |
| return cached | |
| try: | |
| url = Config.get_list_url(auth['vms_uid'], with_epg=False) | |
| headers = { | |
| 'Referer': Config.REQUIRED_REFERER, | |
| 'User-Agent': 'Mozilla/5.0' | |
| } | |
| async with httpx.AsyncClient(timeout=Config.TIMEOUT) as client: | |
| response = await client.get(url, headers=headers) | |
| response.raise_for_status() | |
| data = response.json() | |
| channels = [ | |
| ch for ch in data.get('result', []) | |
| if ch.get('id') and ch.get('no') and ch.get('name') and ch.get('playpath') | |
| ] | |
| if not channels: | |
| raise ValueError("No channels found") | |
| cache.set_channels(channels) | |
| return channels | |
| except httpx.HTTPStatusError as e: | |
| if e.response.status_code in [401, 403]: | |
| new_auth = await get_auth(force=True) | |
| return await get_channels(new_auth, force=True) | |
| raise e | |
| except Exception as e: | |
| if cache.channels: | |
| return cache.channels | |
| raise e | |
| async def fetch_epg(vid: str, date: str, auth: dict, retry_count: int = 0) -> list: | |
| cached = cache.get_epg(vid, date) | |
| if cached is not None: | |
| return cached | |
| try: | |
| url = Config.get_epg_url(auth['vms_uid'], vid) | |
| headers = { | |
| 'Referer': Config.REQUIRED_REFERER, | |
| 'User-Agent': 'Mozilla/5.0' | |
| } | |
| async with httpx.AsyncClient(timeout=Config.TIMEOUT) as client: | |
| response = await client.get(url, headers=headers) | |
| if response.status_code in [401, 403] and retry_count < 2: | |
| new_auth = await get_auth(force=True) | |
| return await fetch_epg(vid, date, new_auth, retry_count + 1) | |
| response.raise_for_status() | |
| data = response.json() | |
| if not data.get('result') or not data['result'][0].get('record_epg'): | |
| cache.set_epg(vid, date, []) | |
| return [] | |
| full_epg = json.loads(data['result'][0]['record_epg']) | |
| processed_epg = [] | |
| for i, program in enumerate(full_epg): | |
| if not program.get('time'): | |
| continue | |
| if 'time_end' not in program or not program['time_end']: | |
| if i + 1 < len(full_epg) and full_epg[i + 1].get('time'): | |
| program['time_end'] = full_epg[i + 1]['time'] | |
| else: | |
| continue | |
| processed_epg.append(program) | |
| jst = timezone(timedelta(hours=9)) | |
| daily_epg = {} | |
| for program in processed_epg: | |
| ts = float(program['time']) | |
| ts = ts / 1000.0 if ts > 9999999999 else ts | |
| dt = datetime.fromtimestamp(ts, tz=jst) | |
| date_str = dt.strftime('%Y-%m-%d') | |
| if date_str not in daily_epg: | |
| daily_epg[date_str] = [] | |
| daily_epg[date_str].append(program) | |
| for d, programs in daily_epg.items(): | |
| sorted_programs = sorted(programs, key=lambda x: x['time']) | |
| cache.set_epg(vid, d, sorted_programs) | |
| result = daily_epg.get(date, []) | |
| if result: | |
| return sorted(result, key=lambda x: x['time']) | |
| else: | |
| if date not in daily_epg: | |
| cache.set_epg(vid, date, []) | |
| return [] | |
| except Exception as e: | |
| raise e | |
| def _ts_normalize(t_val) -> Optional[float]: | |
| if t_val is None: | |
| return None | |
| try: | |
| val = float(str(t_val).strip()) | |
| return val / 1000.0 if val > 9999999999 else val | |
| except Exception: | |
| return None | |
| async def get_all_epg(auth: Dict[str, Any], channels: list = None, date: str = None, force: bool = False) -> Dict[str, Any]: | |
| cache_key_date = date if date else 'full' | |
| if not force: | |
| cached = cache.get_epg('_all_', cache_key_date) | |
| if cached: | |
| return cached | |
| try: | |
| url = Config.get_list_url(auth['vms_uid'], with_epg=True) | |
| headers = { | |
| 'Referer': Config.REQUIRED_REFERER, | |
| 'User-Agent': 'Mozilla/5.0' | |
| } | |
| async with httpx.AsyncClient(timeout=Config.TIMEOUT) as client: | |
| response = await client.get(url, headers=headers) | |
| response.raise_for_status() | |
| data = response.json() | |
| result = {} | |
| jst = timezone(timedelta(hours=9)) | |
| if date: | |
| try: | |
| target_dt = datetime.strptime(date, "%Y%m%d").replace(tzinfo=jst) | |
| except ValueError: | |
| target_dt = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=jst) | |
| start_ts = target_dt.timestamp() | |
| end_ts = (target_dt + timedelta(days=1)).timestamp() | |
| else: | |
| start_ts = None | |
| end_ts = None | |
| source_channels = data.get('result', []) | |
| for channel in source_channels: | |
| channel_id = channel.get('id') | |
| record_epg = channel.get('record_epg') | |
| if not channel_id: | |
| continue | |
| if not record_epg: | |
| result[channel_id] = {'data': []} | |
| continue | |
| try: | |
| epg_list = json.loads(record_epg) | |
| processed_programs = [] | |
| for i, program in enumerate(epg_list): | |
| if not program.get('time'): | |
| continue | |
| if 'time_end' not in program or not program['time_end']: | |
| if i + 1 < len(epg_list) and epg_list[i + 1].get('time'): | |
| program['time_end'] = epg_list[i + 1]['time'] | |
| else: | |
| continue | |
| processed_programs.append(program) | |
| daily_epg = {} | |
| for program in processed_programs: | |
| ts = _ts_normalize(program['time']) | |
| if ts is None: | |
| continue | |
| dt = datetime.fromtimestamp(ts, tz=jst) | |
| date_str = dt.strftime('%Y-%m-%d') | |
| if date_str not in daily_epg: | |
| daily_epg[date_str] = [] | |
| daily_epg[date_str].append(program) | |
| for d, programs in daily_epg.items(): | |
| sorted_programs = sorted(programs, key=lambda x: x['time']) | |
| cache.set_epg(channel_id, d, sorted_programs) | |
| if start_ts is not None and end_ts is not None: | |
| filtered = [ | |
| p for p in processed_programs | |
| if start_ts <= (_ts_normalize(p.get('time')) or 0) < end_ts | |
| ] | |
| result[channel_id] = {'data': sorted(filtered, key=lambda x: x['time'])} | |
| else: | |
| result[channel_id] = {'data': processed_programs} | |
| except json.JSONDecodeError: | |
| result[channel_id] = {'data': []} | |
| continue | |
| cache.set_epg('_all_', cache_key_date, result) | |
| return result | |
| except Exception as e: | |
| cached = cache.get_epg('_all_', cache_key_date) | |
| if cached: | |
| return cached | |
| return {} | |
| def get_jst_date(dt: Optional[datetime] = None) -> str: | |
| if dt is None: | |
| dt = datetime.now(timezone.utc) | |
| jst = timezone(timedelta(hours=9)) | |
| jst_time = dt.astimezone(jst) | |
| return jst_time.strftime('%Y-%m-%d') | |
| def rewrite_m3u8(content: str, current_path: str, worker_base: str) -> str: | |
| lines = content.split('\n') | |
| output = [] | |
| if '?' in current_path: | |
| base_path_part, query_part = current_path.rsplit('?', 1) | |
| base_dir = base_path_part[:base_path_part.rfind('/') + 1] | |
| else: | |
| base_dir = current_path[:current_path.rfind('/') + 1] | |
| query_part = '' | |
| for line in lines: | |
| trimmed = line.strip() | |
| if trimmed.startswith('#') or not trimmed: | |
| output.append(line) | |
| continue | |
| if trimmed.startswith('http://') or trimmed.startswith('https://'): | |
| parsed = urlparse(trimmed) | |
| target_path = parsed.path | |
| if parsed.query: | |
| target_path += f"?{parsed.query}" | |
| elif trimmed.startswith('/'): | |
| target_path = trimmed | |
| else: | |
| target_path = base_dir + trimmed | |
| if '?' not in target_path and query_part: | |
| target_path += f"?{query_part}" | |
| output.append(worker_base + target_path) | |
| return '\n'.join(output) | |
| def extract_playlist_url(content: str, base_url: str) -> Optional[str]: | |
| for line in content.split('\n'): | |
| trimmed = line.strip() | |
| if not trimmed or trimmed.startswith('#'): | |
| continue | |
| if trimmed.startswith('http'): | |
| return trimmed | |
| if trimmed.endswith('.m3u8') or trimmed.endswith('.M3U8'): | |
| parsed = urlparse(base_url) | |
| if trimmed.startswith('/'): | |
| return f"{parsed.scheme}://{parsed.netloc}{trimmed}" | |
| else: | |
| base_path = base_url[:base_url.rfind('/') + 1] | |
| return base_path + trimmed | |
| return None |