Spaces:
Running
Running
| """ | |
| LiveTV Scraper - Production Ready Backend | |
| Aggregates channel lists, EPG, and extracts direct stream URLs. | |
| """ | |
| import os | |
| import re | |
| import base64 | |
| import logging | |
| import threading | |
| import urllib.parse | |
| from datetime import datetime, timezone, timedelta | |
| import requests | |
| import urllib3 | |
| import dns.resolver | |
| import json | |
| import asyncio | |
| from playwright.async_api import async_playwright | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| import urllib3.util.connection as urllib3_connection | |
# ── CONFIGURATION ────────────────────────────────────────────────────────────
class Config:
    """Static service settings; PORT and DEBUG can be overridden via the environment."""

    # TCP port the HTTP server binds to (env var PORT, default 7860).
    PORT = int(os.getenv("PORT", "7860"))
    # Debug mode stays on unless the DEBUG env var is set to something other
    # than "true" (case-insensitive); set DEBUG=false for production.
    DEBUG = os.getenv("DEBUG", "true").lower() == "true"
    # Local JSON file holding the channel registry.
    CHANNELS_FILE = "channels.json"
    # Programme-guide endpoint; {id} and {date} are filled in with str.format().
    # The timezone parameter is the base64 encoding of "Asia/Kolkata".
    EPG_API = (
        "https://epg.pw/api/epg.json"
        "?channel_id={id}&date={date}&lang=en&timezone=QXNpYS9Lb2xrYXRh"
    )
    # Sports events feed.
    EVENTS_API = "https://api.cdnlivetv.tv/api/v1/events/sports/?user=cdnlivetv&plan=free"
    # Desktop browser identity used for ordinary HTTP fetches.
    DEFAULT_UA = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )
    # Mobile browser identity used for Playwright sessions and playback headers.
    PLAYBACK_UA = (
        "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/148.0.0.0 Mobile Safari/537.36"
    )
# ── LOGGING ──────────────────────────────────────────────────────────────────
# Root logging: INFO and above, timestamped, written through a StreamHandler.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    level=logging.INFO,
)
# Logger shared by every function in this module.
logger = logging.getLogger("LiveTV")
# ── SSL & DNS HANDLING ───────────────────────────────────────────────────────
# Outbound requests in this module are sent with verify=False, so mute the
# InsecureRequestWarning urllib3 would otherwise emit for each of them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Hostname -> IP memo filled by resolve_host(); access is guarded by _dns_lock.
_dns_lock = threading.Lock()
_dns_cache = {}
def resolve_host(hostname):
    """Return an IPv4 address for *hostname*, or None if the lookup fails.

    The query goes to the public resolvers 8.8.8.8 and 1.1.1.1 instead of the
    system resolver. The first A record of a successful answer is stored in
    ``_dns_cache`` and reused for every later call; failures are logged and
    not cached.
    """
    with _dns_lock:
        try:
            return _dns_cache[hostname]
        except KeyError:
            pass  # not resolved yet; fall through to a live lookup

    try:
        lookup = dns.resolver.Resolver()
        lookup.nameservers = ['8.8.8.8', '1.1.1.1']  # Use reliable DNS
        address = str(lookup.resolve(hostname, 'A')[0])
        with _dns_lock:
            _dns_cache[hostname] = address
    except Exception as e:
        logger.error(f"DNS resolution failed for {hostname}: {e}")
        return None
    return address
# Monkey-patch urllib3 to handle environments with broken system DNS.
# Only the socket target is replaced by the resolved IP; the caller still holds
# the original hostname, which is what keeps SNI intact.
_original_create_connection = urllib3_connection.create_connection


def patched_create_connection(address, *args, **kwargs):
    """Drop-in replacement for urllib3's ``create_connection``.

    Args:
        address: ``(host, port)`` pair as passed by urllib3.
        *args, **kwargs: forwarded unchanged to the original function.

    Returns:
        Whatever the original ``create_connection`` returns.

    Hostnames are resolved through ``resolve_host``. IP literals (IPv4 or
    IPv6, with or without brackets) are passed straight through, and so is any
    hostname the custom resolver could not resolve.
    """
    # Local import keeps this block self-contained; it is cached after first use.
    import ipaddress

    host, port = address
    try:
        # ip_address() accepts both address families, unlike an IPv4-only
        # regex, so IPv6 literals no longer trigger a pointless A-record query.
        ipaddress.ip_address(host.strip("[]") if isinstance(host, str) else host)
    except ValueError:
        ip = resolve_host(host)
        if ip:
            return _original_create_connection((ip, port), *args, **kwargs)
    return _original_create_connection(address, *args, **kwargs)


urllib3_connection.create_connection = patched_create_connection
# ── APP SETUP ────────────────────────────────────────────────────────────────
# The Flask application object used by the server runner at the bottom of the file.
app = Flask(__name__)
# Enable cross-origin requests for the API, using flask_cors defaults.
CORS(app)
# Known EPG channel ids keyed by channel slug (lower-cased channel name with
# spaces replaced by hyphens). Names missing here fall back to a live search.
EPG_IDS = {
    "cnn": 464857,
    "fox-news": 465372,
    "msnbc": 465087,
    "cbc-news": 470695,
    "cp24": 470621,
    "bbc-news": 12162,
    "sky-news": 12069,
    "france-24": 55716,
    "tf1": 443174,
    "al-jazeera": 12532,
    "al-arabiya": 55875,
    "ndtv": 464202,
    "republic-tv": 463873,
    "aaj-tak": 464262,
    "geo-news": 12349,
    "ary-news": 12348,
    "pogo": 464673,
    "nick": 465135,
    "willow-cricket": 465010,
    "espn": 464947,
    "sky-sports-cricket": 12059,
    "sky-sports-f1": 12061,
}
# ── HELPERS ──────────────────────────────────────────────────────────────────
def load_channels():
    """Read the channel registry from ``Config.CHANNELS_FILE``.

    Returns:
        The parsed JSON document, or ``{"channels": []}`` when the file does
        not exist or cannot be read or parsed (that failure is logged).
    """
    path = Config.CHANNELS_FILE
    try:
        if not os.path.exists(path):
            return {"channels": []}
        with open(path, 'r', encoding='utf-8') as fh:
            return json.load(fh)
    except Exception as e:
        logger.error(f"Error loading {Config.CHANNELS_FILE}: {e}")
        return {"channels": []}
def fetch_url(url, timeout=15, extra_headers=None):
    """GET *url* with browser-like headers and log the outcome.

    Args:
        url: Address to fetch.
        timeout: Seconds passed to ``requests.get``.
        extra_headers: Optional headers merged over the defaults.

    Returns:
        The ``requests`` response, whatever its status code.

    Raises:
        Any exception ``requests.get`` raises, re-raised after being logged.
    """
    request_headers = {
        "User-Agent": Config.DEFAULT_UA,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Upgrade-Insecure-Requests": "1",
    }
    request_headers.update(extra_headers or {})

    try:
        # Certificate verification is deliberately off for these upstreams.
        response = requests.get(url, headers=request_headers, verify=False, timeout=timeout)
    except Exception as e:
        logger.error(f"FETCH ERROR {url}: {e}")
        raise
    else:
        logger.info(f"FETCH {url} -> {response.status_code}")
        return response
def fmt_time(dt_str, tz_offset=5.5):
    """Render an ISO-8601 timestamp as a 12-hour clock time in a fixed offset.

    Args:
        dt_str: ISO-8601 string; a trailing ``Z`` is accepted and a value
            without an offset is taken to be UTC.
        tz_offset: Target offset from UTC in hours (5.5 is labelled "IST").

    Returns:
        e.g. ``"5:30 AM IST"`` or ``"7:00 PM UTC-5"``; ``"N/A"`` for empty or
        unparseable input.
    """
    if not dt_str:
        return "N/A"
    try:
        moment = datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=timezone.utc)

        zone = timezone(timedelta(hours=float(tz_offset)))
        # %I zero-pads the hour; strip it so "05:30" reads "5:30".
        clock = moment.astimezone(zone).strftime("%I:%M %p").lstrip('0')

        if tz_offset == 5.5:
            label = "IST"
        else:
            sign = '+' if tz_offset >= 0 else ''
            label = f"UTC{sign}{tz_offset}"
        return f"{clock} {label}"
    except Exception:
        return "N/A"
def unpack_js(h, n_str, t, e):
    """Decode the custom-packed payload embedded in the player page.

    Args:
        h: Packed text; tokens are separated by the character ``n_str[e]``.
        n_str: Alphabet; a character's position in it is its digit value.
        t: Amount subtracted from each decoded number to get the code point.
        e: Numeric base of the tokens (and index of the separator in *n_str*).

    Returns:
        The decoded text, or ``""`` if anything goes wrong while decoding.
    """
    try:
        separator = n_str[e]
        pieces = []
        for token in h.split(separator):
            if not token:
                continue
            number = 0
            for symbol in token:
                digit = n_str.find(symbol)
                # Characters outside the alphabet are skipped, not treated as errors.
                if digit != -1:
                    number = number * e + digit
            pieces.append(chr(number - t))
        return "".join(pieces)
    except Exception:
        return ""
def decode_custom_b64(s):
    """Decode the URL-safe, possibly unpadded Base64 used by the provider's scripts.

    ``-``/``_`` are mapped back to ``+``/``/``, missing ``=`` padding is
    restored, and the bytes are decoded as UTF-8 with undecodable sequences
    dropped. Returns ``""`` on any failure.
    """
    try:
        standard = s.translate(str.maketrans('-_', '+/'))
        standard += '=' * (-len(standard) % 4)
        return base64.b64decode(standard).decode('utf-8', errors='ignore')
    except Exception:
        return ""
# channel name -> EPG id found by search (None = searched conclusively, no match).
_epg_id_cache = {}
_epg_id_lock = threading.Lock()


def get_first_epg_id(channel_name):
    """Look up an EPG id for *channel_name* through the epg.pw search page.

    The exact name is tried first, then the name with a trailing
    TV/HD/SD/CHANNEL word removed. The first ``/last/<id>.html`` link in a
    result page wins.

    Args:
        channel_name: Human-readable channel name.

    Returns:
        The id as a string of digits, or None when nothing was found.

    Results are memoised per channel name. A "not found" is cached only when
    every search request succeeded; a network error or non-200 reply leaves
    the cache untouched so a later call can retry.
    """
    if not channel_name:
        return None
    with _epg_id_lock:
        if channel_name in _epg_id_cache:
            return _epg_id_cache[channel_name]

    headers = {"User-Agent": Config.DEFAULT_UA}
    variations = [channel_name]
    cleaned = re.sub(r'\s+(TV|HD|SD|CHANNEL)\b', '', channel_name, flags=re.IGNORECASE).strip()
    if cleaned and cleaned != channel_name:
        variations.append(cleaned)

    result_id = None
    conclusive = True  # flips to False as soon as any request fails
    for name_var in variations:
        try:
            encoded = base64.b64encode(name_var.encode()).decode()
            url = f"https://epg.pw/search/channel/{urllib.parse.quote(encoded)}.html?lang=en"
            logger.info(f"Searching EPG ID for '{name_var}'...")
            r = requests.get(url, headers=headers, timeout=15, verify=False)
            if r.status_code != 200:
                conclusive = False
                logger.warning(f"EPG search for '{name_var}' returned status {r.status_code}")
                continue
            match = re.search(r'/last/(\d+)\.html', r.text)
            if match:
                result_id = match.group(1)
                logger.info(f"Found EPG ID {result_id} for '{name_var}'")
                break
        except Exception as e:
            conclusive = False
            logger.error(f"EPG search variation '{name_var}' failed: {e}")

    # Never pin a miss caused by a transient failure.
    if result_id is not None or conclusive:
        with _epg_id_lock:
            _epg_id_cache[channel_name] = result_id
    return result_id
| def extract_stream_url(html: str) -> str | None: | |
| """Extract and reconstruct the .m3u8 URL from the player HTML or raw text.""" | |
| raw_match = re.search(r'(https?://[^\s"\'\`]+\.m3u8[^\s"\'\`]*)', html) | |
| if raw_match: | |
| logger.info("Found direct raw .m3u8 link in response payload") | |
| return raw_match.group(1) | |
| pattern = r'\}\s*\(\s*(["\'])(?P<h>.*?)\1\s*,\s*(?P<u>\d+)\s*,\s*(["\'])(?P<n>.*?)\4\s*,\s*(?P<t>\d+)\s*,\s*(?P<e>\d+)' | |
| match = re.search(pattern, html, re.DOTALL) | |
| if not match: return None | |
| h, n, t, e = match.group('h'), match.group('n'), int(match.group('t')), int(match.group('e')) | |
| unpacked = unpack_js(h, n, t, e) | |
| if not unpacked: return None | |
| consts = {} | |
| for m in re.finditer(r"const\s+(\w+)\s*=\s*'([^']+)';", unpacked): | |
| consts[m.group(1)] = m.group(2) | |
| source_match = re.search(r"source:\s*{\s*src:\s*(\w+)", unpacked) | |
| if not source_match: return None | |
| var_name = source_match.group(1) | |
| def_match = re.search(rf"(?:const|let|var)\s+{var_name}\s*=\s*(.+?);", unpacked) | |
| if not def_match: return None | |
| parts_line = def_match.group(1) | |
| decoder_match = re.search(r"(\w+)\(\w+\)", parts_line) | |
| if not decoder_match: return None | |
| decoder_name = decoder_match.group(1) | |
| var_matches = re.findall(rf"{decoder_name}\((\w+)\)", parts_line) | |
| stream_url = "" | |
| for v in var_matches: | |
| if v in consts: | |
| stream_url += decode_custom_b64(consts[v]) | |
| return stream_url if stream_url.startswith("http") else None | |
async def fallback_extract_m3u8(stream_url: str) -> str | None:
    """Load *stream_url* in headless Chromium and capture the first .m3u8 request.

    The page is opened with the playback user agent and a fixed Referer. After
    navigation the function waits up to 15 seconds for a request whose URL
    contains ``.m3u8``; failing that, it checks the ``src`` of a
    ``<video src>`` element.

    Returns:
        The captured URL, or None. Playwright errors are logged, not raised; a
        URL captured before the error is still returned.
    """
    m3u8_found = asyncio.Event()
    result = {}

    async def on_request(req):
        # Only the first matching request is recorded.
        if '.m3u8' in req.url and not m3u8_found.is_set():
            logger.info(f"FALLBACK Intercepted: {req.url}")
            result['m3u8'] = req.url
            m3u8_found.set()

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context(
                    user_agent=Config.PLAYBACK_UA,
                    extra_http_headers={
                        'Accept-Language': 'en-US,en;q=0.9',
                        'Referer': 'https://streamsports99.su/',
                    }
                )
                page = await context.new_page()
                page.on('request', on_request)

                logger.info(f"FALLBACK Loading: {stream_url}")
                await page.goto(stream_url, wait_until='domcontentloaded', timeout=30000)
                try:
                    await asyncio.wait_for(m3u8_found.wait(), timeout=15)
                except asyncio.TimeoutError:
                    pass  # no network hit; try the DOM below

                if 'm3u8' not in result:
                    logger.info("FALLBACK No network intercept β scanning DOM...")
                    src = await page.evaluate('''() => {
                        const v = document.querySelector("video[src]");
                        return v ? v.src : null;
                    }''')
                    if src and '.m3u8' in src:
                        result['m3u8'] = src
            finally:
                # Close the browser even when navigation or evaluation raised.
                await browser.close()
    except Exception as e:
        logger.error(f"FALLBACK Playwright error: {e}")
    return result.get('m3u8')
async def extract_videasy_sources(url: str) -> dict:
    """Collect per-quality .m3u8 URLs and subtitle tracks from a Videasy player page.

    The page is driven in a mobile-sized headless Chromium: clicked to start
    playback, the quality tab opened, and each ``<digits>p`` button clicked
    while responses are watched for ``index.m3u8`` and ``.vtt`` URLs.

    Args:
        url: Player page to load.

    Returns:
        A dict with keys ``status`` ("success" if any m3u8 was captured, else
        "failed"), ``qualities`` (label -> URL, or ``{"auto": url}`` when no
        label could be matched), ``subtitles`` (list of ``{"label", "src_url"}``)
        and ``headers`` (headers to use for playback). Errors are logged and
        reflected in ``status`` rather than raised.
    """
    logger.info(f"π Starting Videasy extraction for: {url}")
    playback_headers = {
        "User-Agent": Config.PLAYBACK_UA,
        "Origin": "https://player.videasy.net",
        "Referer": "https://player.videasy.net/"
    }
    result = {
        "status": "pending",
        "qualities": {},
        "subtitles": [],
        "headers": playback_headers
    }
    captured_m3u8s = []

    async def handle_response(response):
        res_url = response.url
        if ".m3u8" in res_url and "index.m3u8" in res_url:
            if res_url not in captured_m3u8s:
                logger.info(f"β Captured m3u8: {res_url}")
                captured_m3u8s.append(res_url)
        if ".vtt" in res_url:
            label = res_url.split('/')[-1].replace('.vtt', '')
            if not any(s["src_url"] == res_url for s in result["subtitles"]):
                logger.info(f"β Captured subtitle: {res_url}")
                result["subtitles"].append({"label": label, "src_url": res_url})

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context(
                    user_agent=playback_headers["User-Agent"],
                    is_mobile=True,
                    viewport={'width': 360, 'height': 800}
                )
                page = await context.new_page()
                page.on("response", handle_response)

                await page.goto(url, wait_until="networkidle", timeout=60000)
                # Click mid-screen, then a point near the bottom-right corner,
                # pausing between them (coordinates are for the 360x800 viewport).
                await page.mouse.click(180, 400)
                await asyncio.sleep(4)
                await page.mouse.click(320, 750)
                await asyncio.sleep(1)

                try:
                    trigger = await page.wait_for_selector("button[role='tab']:has-text('Quality')", timeout=3000)
                    if trigger:
                        await page.evaluate("(el) => el.click()", trigger)
                except Exception as tab_err:
                    # Best effort: the quality tab is optional. Unlike a bare
                    # `except:`, task cancellation still propagates.
                    logger.debug(f"Quality tab not available: {tab_err}")

                quality_buttons = await page.evaluate("""() => {
                    const buttons = Array.from(document.querySelectorAll('button'));
                    return buttons
                        .filter(b => /^\\d{3,4}p$/.test(b.innerText.trim().split('\\n')[0]))
                        .map(b => ({
                            text: b.innerText.trim().split('\\n')[0],
                            isActive: b.className.includes('bg-primary')
                        }));
                }""")

                # The already-active quality corresponds to the first playlist seen.
                for b in quality_buttons:
                    if b['isActive']:
                        q = b['text']
                        if captured_m3u8s:
                            result["qualities"][q] = captured_m3u8s[0]
                        break

                for b in quality_buttons:
                    q = b['text']
                    if q in result["qualities"]:
                        continue
                    count_before = len(captured_m3u8s)
                    await page.evaluate("""(text) => {
                        const btn = Array.from(document.querySelectorAll('button')).find(b => b.innerText.includes(text));
                        if (btn) btn.click();
                    }""", q)
                    # Poll up to 15 x 0.4 s for a new playlist after the click.
                    for _ in range(15):
                        if len(captured_m3u8s) > count_before:
                            result["qualities"][q] = captured_m3u8s[-1]
                            break
                        await asyncio.sleep(0.4)
            finally:
                # Close the browser even when a step above raised.
                await browser.close()
    except Exception as e:
        logger.error(f"Videasy extraction anomaly: {e}")

    if captured_m3u8s and not result["qualities"]:
        result["qualities"]["auto"] = captured_m3u8s[0]
    result["status"] = "success" if captured_m3u8s else "failed"
    return result
# ── ROUTES ───────────────────────────────────────────────────────────────────
def health():
    """Liveness probe: a healthy status plus the server's current local timestamp."""
    payload = {"status": "healthy", "timestamp": datetime.now().isoformat()}
    return jsonify(payload)
def _epg_programs_from_payload(resp_data):
    """Return the ``epg_list`` array from an EPG response (a dict, or a list wrapping one)."""
    if not resp_data:
        return []
    if isinstance(resp_data, list):
        resp_data = resp_data[0]
    if isinstance(resp_data, dict):
        # `or []` also covers an explicit null epg_list.
        return resp_data.get("epg_list") or []
    return []


def _parse_epg_dt(value):
    """Parse an ISO-8601 timestamp into aware UTC; unparseable values become datetime.min."""
    try:
        dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
    except Exception:
        return datetime.min.replace(tzinfo=timezone.utc)
    if dt.tzinfo is None:
        # Treat offset-less timestamps as UTC, as fmt_time does, instead of
        # letting astimezone() assume the server's local zone.
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def _fetch_epg_programs(epg_id):
    """Fetch today's programmes plus the following day's for *epg_id*.

    "Today" is the current date in IST. A failure of the first request
    propagates; a failure of the second is logged and ignored.
    """
    ist_tz = timezone(timedelta(hours=5, minutes=30))
    today_str = datetime.now(ist_tz).strftime("%Y%m%d")
    e_res = fetch_url(Config.EPG_API.format(id=epg_id, date=today_str))
    e_res.raise_for_status()
    raw = _epg_programs_from_payload(e_res.json())
    logger.info(f"Today EPG ({today_str}): {len(raw)} programs for id={epg_id}")

    try:
        if raw:
            # Continue from the day after the last programme already received.
            last_start = raw[-1].get("start_date", "")
            last_date = datetime.fromisoformat(last_start.replace('Z', '+00:00')).date()
            tomorrow_str = (last_date + timedelta(days=1)).strftime("%Y%m%d")
        else:
            tomorrow_str = (datetime.now(ist_tz) + timedelta(days=1)).strftime("%Y%m%d")
        t_res = fetch_url(Config.EPG_API.format(id=epg_id, date=tomorrow_str), timeout=20)
        tomorrow_list = _epg_programs_from_payload(t_res.json()) if t_res.status_code == 200 else []
        logger.info(f"Tomorrow EPG ({tomorrow_str}): {len(tomorrow_list)} programs for id={epg_id}")
        raw.extend(tomorrow_list)
    except Exception as te:
        logger.warning(f"Tomorrow EPG fetch failed for id={epg_id}: {te}")
    return raw


def _fill_epg(epg, raw, tz_offset):
    """Populate ``epg['live_now']`` and ``epg['upcoming']`` in place from *raw* programmes.

    A programme's stop time is taken to be the next programme's start time.
    """
    raw.sort(key=lambda x: _parse_epg_dt(x.get("start_date", "1970-01-01T00:00:00+00:00")))
    now = datetime.now(timezone.utc)

    # The current programme is the last one that has already started.
    current_idx = -1
    for i, item in enumerate(raw):
        start_str = item.get("start_date")
        if not start_str:
            continue
        if _parse_epg_dt(start_str) <= now:
            current_idx = i
    logger.info(f"current_idx={current_idx}, total programs={len(raw)}")
    if current_idx == -1:
        return

    curr = raw[current_idx]
    next_item = raw[current_idx + 1] if (current_idx + 1) < len(raw) else None
    stop_time_str = next_item.get("start_date") if next_item else None

    progress_pct = 0
    try:
        s_dt = _parse_epg_dt(curr.get("start_date"))
        if stop_time_str:
            e_dt = _parse_epg_dt(stop_time_str)
            total = (e_dt - s_dt).total_seconds()
            elapsed = (now - s_dt).total_seconds()
            if total > 0:
                progress_pct = min(100, max(0, int((elapsed / total) * 100)))
    except Exception:
        progress_pct = 0

    epg["live_now"] = {
        "title": curr.get("title"),
        "description": curr.get("desc"),
        "start": curr.get("start_date"),
        "stop": stop_time_str,
        "time": f"{fmt_time(curr.get('start_date'), tz_offset)} - {fmt_time(stop_time_str, tz_offset)}",
        "progress_pct": progress_pct
    }

    # Up to five programmes after the current one.
    for i in range(current_idx + 1, min(current_idx + 6, len(raw))):
        item = raw[i]
        item_stop = raw[i + 1].get("start_date") if (i + 1) < len(raw) else None
        epg["upcoming"].append({
            "title": item.get("title"),
            "description": item.get("desc"),
            "start": item.get("start_date"),
            "stop": item_stop,
            "time": f"{fmt_time(item.get('start_date'), tz_offset)} - {fmt_time(item_stop, tz_offset)}"
        })


def get_details():
    """Return channel metadata and programme-guide data for one channel as JSON.

    Query parameters:
        name: Channel name (required; HTTP 400 when missing).
        code: Country code (optional).
        tz: Display offset from UTC in hours (default 5.5). Values that are
            not a number strictly between -24 and 24 fall back to 5.5.

    The response carries the player URL, the logo from the local registry when
    the name matches, the EPG id used, the current programme with a progress
    percentage, and up to five upcoming programmes. EPG failures are logged
    and yield an empty guide rather than an error response.
    """
    name = request.args.get("name", "").strip()
    code = request.args.get("code", "").strip().lower()
    try:
        tz_offset = float(request.args.get("tz", 5.5))
    except Exception:
        tz_offset = 5.5
    # NaN, infinities and offsets of 24h or more cannot form a timezone and
    # would make every formatted time come out as "N/A".
    if not (-24 < tz_offset < 24):
        tz_offset = 5.5
    if not name:
        return jsonify({"error": "Missing 'name' parameter."}), 400

    # Both user-supplied values are percent-encoded so they cannot add or
    # override query parameters.
    player_url = (
        f"https://cdnlivetv.tv/api/v1/channels/player/"
        f"?name={urllib.parse.quote(name)}&code={urllib.parse.quote(code)}&user=cdnlivetv&plan=free"
    )

    logo_url = None
    try:
        channels_data = load_channels().get("channels", [])
        for ch in channels_data:
            ch_name = ch.get("name") or ch.get("channel_name") or ""
            if ch_name.lower() == name.lower():
                logo_url = ch.get("image")
                break
    except Exception as e:
        logger.warning(f"Could not load channel metadata: {e}")

    epg = {"live_now": None, "upcoming": []}
    epg_slug = name.lower().replace(" ", "-")
    epg_id = EPG_IDS.get(epg_slug)
    if not epg_id:
        logger.info(f"EPG ID not in mapping for '{name}', trying fallback search...")
        epg_id = get_first_epg_id(name)

    if epg_id:
        try:
            programs = _fetch_epg_programs(epg_id)
            if programs:
                _fill_epg(epg, programs, tz_offset)
        except Exception as e:
            logger.error(f"EPG Fetch error for id={epg_id}: {e}")

    return jsonify({
        "name": name,
        "country": code.upper() if code else None,
        "logo_url": logo_url,
        "player_url": player_url,
        "epg_id": epg_id,
        "epg": epg,
        "timezone": "IST" if tz_offset == 5.5 else f"UTC{'+' if tz_offset >= 0 else ''}{tz_offset}"
    })
def _stream_url_from_payload(json_data):
    """Pick the stream URL out of the player API's JSON payload, or return None."""
    if not isinstance(json_data, dict):
        return None
    nested = json_data.get("data")
    candidate = (
        json_data.get("stream_url")
        or json_data.get("stream")
        or json_data.get("url")
        or (nested.get("url") if isinstance(nested, dict) else None)
    )
    return candidate if isinstance(candidate, str) else None


def _registry_country_code(name):
    """Return the country code of the first registry entry named *name* ('' if none)."""
    for ch in load_channels().get("channels", []):
        ch_name = ch.get("name") or ch.get("channel_name") or ""
        if ch_name.lower() == name.lower():
            return (ch.get("code") or ch.get("country_code") or "").strip().lower()
    return ""


def get_stream():
    """Resolve a direct stream URL for a channel and return it with playback headers.

    Query parameters:
        name: Channel name (required; HTTP 400 when missing).
        code: Country code (optional).

    The player API is tried first, reading either its JSON payload or its
    HTML. If that yields nothing, the channel page is opened with Playwright;
    a missing country code is taken from the local registry, else "ca".
    Responds with HTTP 500 when neither method produces a URL.
    """
    name = request.args.get("name", "").strip()
    code = request.args.get("code", "").strip().lower()
    if not name:
        return jsonify({"error": "Missing 'name' parameter."}), 400

    # Percent-encode both user-supplied values so they stay inside their
    # own query parameters.
    player_url = (
        "https://cdnlivetv.tv/api/v1/channels/player/"
        f"?name={urllib.parse.quote(name)}&code={urllib.parse.quote(code)}"
        "&user=cdnlivetv&plan=free"
    )

    stream_url = None
    try:
        r = fetch_url(player_url, timeout=15)
        if r.status_code == 200:
            content_type = r.headers.get("Content-Type", "").lower()
            if "application/json" in content_type:
                logger.info("Primary API returned a JSON payload. Parsing directly...")
                stream_url = _stream_url_from_payload(r.json())
            else:
                stream_url = extract_stream_url(r.text)
            if stream_url:
                logger.info(f"Successfully extracted stream URL via primary channel logic: {stream_url}")
        else:
            logger.warning(f"Primary API returned status {r.status_code}")
    except Exception as e:
        logger.warning(f"Primary extraction failed with error: {e}")

    # ── Fallback Method (Playwright) ──────────────────────────────────────
    if not stream_url:
        logger.info(f"Primary method failed for '{name}', processing Playwright fallback parameters...")

        # The site URL ends in "<name>__<code>", so the code must not be empty.
        if not code:
            try:
                code = _registry_country_code(name)
                if code:
                    logger.info(f"Dynamically loaded country code variant '{code}' from internal channels registry for '{name}'")
            except Exception as ec:
                logger.warning(f"Could not load fallback channel configuration metadata map: {ec}")

        # Last resort when the code was neither supplied nor found in the registry.
        if not code:
            code = "ca"
            logger.info(f"Defaulting country variable syntax to baseline variant '{code}' for target channel '{name}'")

        site_url = (
            "https://streamsports99.su/live-tv/"
            f"{urllib.parse.quote(name)}__{urllib.parse.quote(code)}"
        )
        try:
            stream_url = asyncio.run(fallback_extract_m3u8(site_url))
        except Exception as fe:
            logger.error(f"Playwright fallback failed: {fe}")

    if stream_url:
        return jsonify({
            "name": name,
            "stream_url": stream_url,
            "headers": {
                "Referer": "https://cdnlivetv.tv/",
                "User-Agent": Config.PLAYBACK_UA,
                "sec-ch-ua-platform": '"Android"',
                "sec-ch-ua": '"Chromium";v="148", "Google Chrome";v="148", "Not/A)Brand";v="99"',
                "sec-ch-ua-mobile": "?1"
            }
        })
    return jsonify({"error": "Could not extract stream URL via primary or fallback methods."}), 500
def get_home():
    """Return the whole channel registry as JSON (HTTP 500 with the error text on failure)."""
    try:
        return jsonify(load_channels())
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def get_event():
    """Relay the upstream sports-events feed as JSON.

    Any failure (network error, non-2xx status, invalid JSON) becomes an
    HTTP 500 response carrying the error text.
    """
    try:
        upstream = fetch_url(Config.EVENTS_API, timeout=15)
        upstream.raise_for_status()
        payload = upstream.json()
        return jsonify(payload)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def search():
    """Case-insensitive substring search over channel names in the local registry.

    Query parameters:
        q: Search text (required; HTTP 400 when missing or blank).

    Returns the lower-cased query, the matching channel entries, and their count.
    """
    query = request.args.get("q", "").strip().lower()
    if not query:
        return jsonify({"error": "Missing 'q' parameter for search."}), 400
    try:
        channels = load_channels().get("channels", [])
        results = [
            ch for ch in channels
            if query in (ch.get("name") or ch.get("channel_name") or "").lower()
        ]
        return jsonify({
            "query": query,
            "results": results,
            "total": len(results)
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def get_videasy():
    """Extract stream sources for a movie on player.videasy.net.

    Query parameters:
        tmdbId: Movie identifier (required; HTTP 400 when missing).

    Returns the result of ``extract_videasy_sources`` as JSON, or HTTP 500
    with ``status: failed`` if the extraction call itself raises.
    """
    tmdb_id = request.args.get("tmdbId", "").strip()
    if not tmdb_id:
        return jsonify({"error": "Missing 'tmdbId' parameter."}), 400

    # safe='' also encodes "/", so the value stays a single path segment and
    # cannot steer the headless browser to another path or add a query string.
    target_url = f"https://player.videasy.net/movie/{urllib.parse.quote(tmdb_id, safe='')}"
    try:
        data = asyncio.run(extract_videasy_sources(target_url))
        return jsonify(data)
    except Exception as e:
        logger.error(f"Videasy endpoint error: {e}")
        return jsonify({"status": "failed", "error": str(e)}), 500
# ── SERVER RUNNER ────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Production path: serve through Waitress. It is imported lazily, so it is
    # only needed when DEBUG is off.
    if not Config.DEBUG:
        from waitress import serve
        logger.info(f"Starting server in PRODUCTION mode (Waitress) on port {Config.PORT}")
        serve(app, host="0.0.0.0", port=Config.PORT)
    else:
        # Development path: Flask's built-in server with the debugger enabled.
        logger.info(f"Starting server in DEBUG mode on port {Config.PORT}")
        app.run(host="0.0.0.0", port=Config.PORT, debug=True)