""" LiveTV Scraper - Production Ready Backend Aggregates channel lists, EPG, and extracts direct stream URLs. """ import os import re import base64 import logging import threading import urllib.parse from datetime import datetime, timezone, timedelta import requests import urllib3 import dns.resolver import json import asyncio from playwright.async_api import async_playwright from flask import Flask, request, jsonify from flask_cors import CORS import urllib3.util.connection as urllib3_connection # ── CONFIGURATION ──────────────────────────────────────────────────────────── class Config: PORT = int(os.environ.get("PORT", 7860)) # Change to False for true production deployment DEBUG = os.environ.get("DEBUG", "true").lower() == "true" CHANNELS_FILE = "channels.json" EPG_API = "https://epg.pw/api/epg.json?channel_id={id}&date={date}&lang=en&timezone=QXNpYS9Lb2xrYXRh" EVENTS_API = "https://api.cdnlivetv.tv/api/v1/events/sports/?user=cdnlivetv&plan=free" DEFAULT_UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" PLAYBACK_UA = "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Mobile Safari/537.36" # ── LOGGING ────────────────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', handlers=[logging.StreamHandler()] ) logger = logging.getLogger("LiveTV") # ── SSL & DNS HANDLING ─────────────────────────────────────────────────────── urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) _dns_cache = {} _dns_lock = threading.Lock() def resolve_host(hostname): """Resolve hostname using custom DNS resolvers with local caching.""" with _dns_lock: if hostname in _dns_cache: return _dns_cache[hostname] try: resolver = dns.resolver.Resolver() resolver.nameservers = ['8.8.8.8', '1.1.1.1'] # Use reliable DNS answers = resolver.resolve(hostname, 'A') ip = str(answers[0]) with _dns_lock: _dns_cache[hostname] = ip return ip except Exception as e: logger.error(f"DNS resolution failed for {hostname}: {e}") return None # Monkey-patch urllib3 to handle environments with broken system DNS # This preserves SNI while forcing connection to the resolved IP _original_create_connection = urllib3_connection.create_connection def patched_create_connection(address, *args, **kwargs): host, port = address # If host is not an IP, resolve it manually if not re.match(r'^\d{1,3}(\.\d{1,3}){3}$', host): ip = resolve_host(host) if ip: return _original_create_connection((ip, port), *args, **kwargs) return _original_create_connection(address, *args, **kwargs) urllib3_connection.create_connection = patched_create_connection # ── APP SETUP ──────────────────────────────────────────────────────────────── app = Flask(__name__) CORS(app) # Allow cross-origin requests for the API EPG_IDS = { "cnn": 464857, "fox-news": 465372, "msnbc": 465087, "cbc-news": 470695, "cp24": 470621, "bbc-news": 12162, "sky-news": 12069, "france-24": 55716, "tf1": 443174, "al-jazeera": 12532, "al-arabiya": 55875, "ndtv": 464202, "republic-tv": 463873, "aaj-tak": 464262, "geo-news": 12349, "ary-news": 12348, "pogo": 464673, "nick": 465135, "willow-cricket": 465010, "espn": 464947, "sky-sports-cricket": 12059, "sky-sports-f1": 12061, } # ── HELPERS ────────────────────────────────────────────────────────────────── def load_channels(): """Load channel data from the local JSON file.""" try: if os.path.exists(Config.CHANNELS_FILE): with open(Config.CHANNELS_FILE, 'r', encoding='utf-8') as f: return json.load(f) return {"channels": []} except Exception as e: logger.error(f"Error loading {Config.CHANNELS_FILE}: {e}") return {"channels": []} def fetch_url(url, timeout=15, extra_headers=None): """Unified fetcher with production headers and logging.""" headers = { "User-Agent": Config.DEFAULT_UA, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9", "Upgrade-Insecure-Requests": "1", } if extra_headers: headers.update(extra_headers) try: r = requests.get(url, headers=headers, verify=False, timeout=timeout) logger.info(f"FETCH {url} -> {r.status_code}") return r except Exception as e: logger.error(f"FETCH ERROR {url}: {e}") raise def fmt_time(dt_str, tz_offset=5.5): """Format UTC timestamp to 12h format with a specific timezone offset.""" if not dt_str: return "N/A" try: clean_dt = dt_str.replace('Z', '+00:00') dt = datetime.fromisoformat(clean_dt) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) target_tz = timezone(timedelta(hours=float(tz_offset))) local_dt = dt.astimezone(target_tz) time_part = local_dt.strftime("%I:%M %p").lstrip('0') tz_label = "IST" if tz_offset == 5.5 else f"UTC{'+' if tz_offset >= 0 else ''}{tz_offset}" return f"{time_part} {tz_label}" except Exception: return "N/A" def unpack_js(h, n_str, t, e): """De-obfuscate the custom-packed string used in the player page.""" try: delimiter = n_str[e] parts = h.split(delimiter) decoded = "" for s in parts: if not s: continue val = 0 for char in s: idx = n_str.find(char) if idx != -1: val = val * e + idx decoded += chr(val - t) return decoded except Exception: return "" def decode_custom_b64(s): """Mimic the Base64 decoding function used in the provider's obfuscated scripts.""" try: s = s.replace('-', '+').replace('_', '/') while len(s) % 4: s += '=' return base64.b64decode(s).decode('utf-8', errors='ignore') except Exception: return "" _epg_id_cache = {} _epg_id_lock = threading.Lock() def get_first_epg_id(channel_name): """Fallback method to search for an EPG ID on epg.pw.""" if not channel_name: return None with _epg_id_lock: if channel_name in _epg_id_cache: return _epg_id_cache[channel_name] headers = {"User-Agent": Config.DEFAULT_UA} variations = [channel_name] cleaned = re.sub(r'\s+(TV|HD|SD|CHANNEL)\b', '', channel_name, flags=re.IGNORECASE).strip() if cleaned and cleaned != channel_name: variations.append(cleaned) result_id = None for name_var in variations: try: encoded = base64.b64encode(name_var.encode()).decode() url = f"https://epg.pw/search/channel/{urllib.parse.quote(encoded)}.html?lang=en" logger.info(f"Searching EPG ID for '{name_var}'...") r = requests.get(url, headers=headers, timeout=15, verify=False) if r.status_code == 200: match = re.search(r'/last/(\d+)\.html', r.text) if match: result_id = match.group(1) logger.info(f"Found EPG ID {result_id} for '{name_var}'") break except Exception as e: logger.error(f"EPG search variation '{name_var}' failed: {e}") with _epg_id_lock: _epg_id_cache[channel_name] = result_id return result_id def extract_stream_url(html: str) -> str | None: """Extract and reconstruct the .m3u8 URL from the player HTML or raw text.""" raw_match = re.search(r'(https?://[^\s"\'\`]+\.m3u8[^\s"\'\`]*)', html) if raw_match: logger.info("Found direct raw .m3u8 link in response payload") return raw_match.group(1) pattern = r'\}\s*\(\s*(["\'])(?P.*?)\1\s*,\s*(?P\d+)\s*,\s*(["\'])(?P.*?)\4\s*,\s*(?P\d+)\s*,\s*(?P\d+)' match = re.search(pattern, html, re.DOTALL) if not match: return None h, n, t, e = match.group('h'), match.group('n'), int(match.group('t')), int(match.group('e')) unpacked = unpack_js(h, n, t, e) if not unpacked: return None consts = {} for m in re.finditer(r"const\s+(\w+)\s*=\s*'([^']+)';", unpacked): consts[m.group(1)] = m.group(2) source_match = re.search(r"source:\s*{\s*src:\s*(\w+)", unpacked) if not source_match: return None var_name = source_match.group(1) def_match = re.search(rf"(?:const|let|var)\s+{var_name}\s*=\s*(.+?);", unpacked) if not def_match: return None parts_line = def_match.group(1) decoder_match = re.search(r"(\w+)\(\w+\)", parts_line) if not decoder_match: return None decoder_name = decoder_match.group(1) var_matches = re.findall(rf"{decoder_name}\((\w+)\)", parts_line) stream_url = "" for v in var_matches: if v in consts: stream_url += decode_custom_b64(consts[v]) return stream_url if stream_url.startswith("http") else None async def fallback_extract_m3u8(stream_url: str) -> str | None: """Fallback method using Playwright to intercept m3u8 requests from the site.""" m3u8_found = asyncio.Event() result = {} try: async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context( user_agent=Config.PLAYBACK_UA, extra_http_headers={ 'Accept-Language': 'en-US,en;q=0.9', 'Referer': 'https://streamsports99.su/', } ) page = await context.new_page() async def on_request(request): if '.m3u8' in request.url and not m3u8_found.is_set(): logger.info(f"FALLBACK Intercepted: {request.url}") result['m3u8'] = request.url m3u8_found.set() page.on('request', on_request) logger.info(f"FALLBACK Loading: {stream_url}") await page.goto(stream_url, wait_until='domcontentloaded', timeout=30000) try: await asyncio.wait_for(m3u8_found.wait(), timeout=15) except asyncio.TimeoutError: pass if 'm3u8' not in result: logger.info("FALLBACK No network intercept — scanning DOM...") src = await page.evaluate('''() => { const v = document.querySelector("video[src]"); return v ? v.src : null; }''') if src and '.m3u8' in src: result['m3u8'] = src await browser.close() except Exception as e: logger.error(f"FALLBACK Playwright error: {e}") return result.get('m3u8') async def extract_videasy_sources(url: str) -> dict: """Multi-quality extraction for player.videasy.net targets.""" logger.info(f"🚀 Starting Videasy extraction for: {url}") playback_headers = { "User-Agent": Config.PLAYBACK_UA, "Origin": "https://player.videasy.net", "Referer": "https://player.videasy.net/" } result = { "status": "pending", "qualities": {}, "subtitles": [], "headers": playback_headers } captured_m3u8s = [] try: async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context( user_agent=playback_headers["User-Agent"], is_mobile=True, viewport={'width': 360, 'height': 800} ) page = await context.new_page() async def handle_response(response): res_url = response.url if ".m3u8" in res_url and "index.m3u8" in res_url: if res_url not in captured_m3u8s: logger.info(f"✅ Captured m3u8: {res_url}") captured_m3u8s.append(res_url) if ".vtt" in res_url: label = res_url.split('/')[-1].replace('.vtt', '') if not any(s["src_url"] == res_url for s in result["subtitles"]): logger.info(f"✅ Captured subtitle: {res_url}") result["subtitles"].append({"label": label, "src_url": res_url}) page.on("response", handle_response) await page.goto(url, wait_until="networkidle", timeout=60000) await page.mouse.click(180, 400) await asyncio.sleep(4) await page.mouse.click(320, 750) await asyncio.sleep(1) try: trigger = await page.wait_for_selector("button[role='tab']:has-text('Quality')", timeout=3000) if trigger: await page.evaluate("(el) => el.click()", trigger) except: pass quality_buttons = await page.evaluate("""() => { const buttons = Array.from(document.querySelectorAll('button')); return buttons .filter(b => /^\\d{3,4}p$/.test(b.innerText.trim().split('\\n')[0])) .map(b => ({ text: b.innerText.trim().split('\\n')[0], isActive: b.className.includes('bg-primary') })); }""") for b in quality_buttons: if b['isActive']: q = b['text'] if captured_m3u8s: result["qualities"][q] = captured_m3u8s[0] break for b in quality_buttons: q = b['text'] if q in result["qualities"]: continue count_before = len(captured_m3u8s) await page.evaluate(f"""(text) => {{ const btn = Array.from(document.querySelectorAll('button')).find(b => b.innerText.includes(text)); if (btn) btn.click(); }}""", q) for _ in range(15): if len(captured_m3u8s) > count_before: result["qualities"][q] = captured_m3u8s[-1] break await asyncio.sleep(0.4) await browser.close() except Exception as e: logger.error(f"Videasy extraction anomaly: {e}") if captured_m3u8s and not result["qualities"]: result["qualities"]["auto"] = captured_m3u8s[0] result["status"] = "success" if captured_m3u8s else "failed" return result # ── ROUTES ─────────────────────────────────────────────────────────────────── @app.route("/health") def health(): return jsonify({"status": "healthy", "timestamp": datetime.now().isoformat()}) @app.route("/details", methods=["GET"]) def get_details(): name = request.args.get("name", "").strip() code = request.args.get("code", "").strip().lower() try: tz_offset = float(request.args.get("tz", 5.5)) except Exception: tz_offset = 5.5 if not name: return jsonify({"error": "Missing 'name' parameter."}), 400 player_url = ( f"https://cdnlivetv.tv/api/v1/channels/player/" f"?name={urllib.parse.quote(name)}&code={code}&user=cdnlivetv&plan=free" ) logo_url = None try: channels_data = load_channels().get("channels", []) for ch in channels_data: ch_name = ch.get("name") or ch.get("channel_name") or "" if ch_name.lower() == name.lower(): logo_url = ch.get("image") break except Exception as e: logger.warning(f"Could not load channel metadata: {e}") epg = {"live_now": None, "upcoming": []} epg_slug = name.lower().replace(" ", "-") epg_id = EPG_IDS.get(epg_slug) if not epg_id: logger.info(f"EPG ID not in mapping for '{name}', trying fallback search...") epg_id = get_first_epg_id(name) if epg_id: def _extract_epg_list(resp_data): if not resp_data: return [] if isinstance(resp_data, list) and len(resp_data) > 0: return resp_data[0].get("epg_list", []) if isinstance(resp_data, dict): return resp_data.get("epg_list", []) return [] def parse_dt_utc(s): try: dt = datetime.fromisoformat(s.replace('Z', '+00:00')) return dt.astimezone(timezone.utc) except Exception: return datetime.min.replace(tzinfo=timezone.utc) try: ist_tz = timezone(timedelta(hours=5, minutes=30)) today_str = datetime.now(ist_tz).strftime("%Y%m%d") e_res = fetch_url(Config.EPG_API.format(id=epg_id, date=today_str)) e_res.raise_for_status() raw = _extract_epg_list(e_res.json()) logger.info(f"Today EPG ({today_str}): {len(raw)} programs for id={epg_id}") try: if raw: last_start = raw[-1].get("start_date", "") last_date = datetime.fromisoformat(last_start.replace('Z', '+00:00')).date() tomorrow_str = (last_date + timedelta(days=1)).strftime("%Y%m%d") else: tomorrow_str = (datetime.now(ist_tz) + timedelta(days=1)).strftime("%Y%m%d") t_res = fetch_url(Config.EPG_API.format(id=epg_id, date=tomorrow_str), timeout=20) tomorrow_list = _extract_epg_list(t_res.json()) if t_res.status_code == 200 else [] logger.info(f"Tomorrow EPG ({tomorrow_str}): {len(tomorrow_list)} programs for id={epg_id}") raw.extend(tomorrow_list) except Exception as te: logger.warning(f"Tomorrow EPG fetch failed for id={epg_id}: {te}") if raw: raw.sort(key=lambda x: parse_dt_utc(x.get("start_date", "1970-01-01T00:00:00+00:00"))) now = datetime.now(timezone.utc) current_idx = -1 for i, item in enumerate(raw): start_str = item.get("start_date") if not start_str: continue if parse_dt_utc(start_str) <= now: current_idx = i logger.info(f"current_idx={current_idx}, total programs={len(raw)}") if current_idx != -1: curr = raw[current_idx] next_item = raw[current_idx + 1] if (current_idx + 1) < len(raw) else None stop_time_str = next_item.get("start_date") if next_item else None progress_pct = 0 try: s_dt = parse_dt_utc(curr.get("start_date")) if stop_time_str: e_dt = parse_dt_utc(stop_time_str) total = (e_dt - s_dt).total_seconds() elapsed = (now - s_dt).total_seconds() if total > 0: progress_pct = min(100, max(0, int((elapsed / total) * 100))) except Exception: progress_pct = 0 epg["live_now"] = { "title": curr.get("title"), "description": curr.get("desc"), "start": curr.get("start_date"), "stop": stop_time_str, "time": f"{fmt_time(curr.get('start_date'), tz_offset)} - {fmt_time(stop_time_str, tz_offset)}", "progress_pct": progress_pct } for i in range(current_idx + 1, min(current_idx + 6, len(raw))): item = raw[i] item_stop = raw[i + 1].get("start_date") if (i + 1) < len(raw) else None epg["upcoming"].append({ "title": item.get("title"), "description": item.get("desc"), "start": item.get("start_date"), "stop": item_stop, "time": f"{fmt_time(item.get('start_date'), tz_offset)} - {fmt_time(item_stop, tz_offset)}" }) except Exception as e: logger.error(f"EPG Fetch error for id={epg_id}: {e}") return jsonify({ "name": name, "country": code.upper() if code else None, "logo_url": logo_url, "player_url": player_url, "epg_id": epg_id, "epg": epg, "timezone": "IST" if tz_offset == 5.5 else f"UTC{'+' if tz_offset >= 0 else ''}{tz_offset}" }) @app.route("/stream", methods=["GET"]) def get_stream(): name = request.args.get("name", "").strip() code = request.args.get("code", "").strip().lower() if not name: return jsonify({"error": "Missing 'name' parameter."}), 400 player_url = ( "https://cdnlivetv.tv/api/v1/channels/player/" f"?name={urllib.parse.quote(name)}&code={code}" "&user=cdnlivetv&plan=free" ) stream_url = None try: r = fetch_url(player_url, timeout=15) if r.status_code == 200: content_type = r.headers.get("Content-Type", "").lower() if "application/json" in content_type: logger.info("Primary API returned a JSON payload. Parsing directly...") json_data = r.json() stream_url = ( json_data.get("stream_url") or json_data.get("stream") or json_data.get("url") or json_data.get("data", {}).get("url") ) else: stream_url = extract_stream_url(r.text) if stream_url: logger.info(f"Successfully extracted stream URL via primary channel logic: {stream_url}") else: logger.warning(f"Primary API returned status {r.status_code}") except Exception as e: logger.warning(f"Primary extraction failed with error: {e}") # ── Fallback Method (Playwright) ────────────────────────────────────── if not stream_url: logger.info(f"Primary method failed for '{name}', processing Playwright fallback parameters...") # Ensure code is populated if missing from request args to avoid trailing '__' structure crash if not code: try: channels_data = load_channels().get("channels", []) for ch in channels_data: ch_name = ch.get("name") or ch.get("channel_name") or "" if ch_name.lower() == name.lower(): # Extract mapped code property from json schema matching the target name code = (ch.get("code") or ch.get("country_code") or "").strip().lower() if code: logger.info(f"Dynamically loaded country code variant '{code}' from internal channels registry for '{name}'") break except Exception as ec: logger.warning(f"Could not load fallback channel configuration metadata map: {ec}") # Ultimate baseline fallback assignment if code isn't provided or discovered in local registry if not code: code = "ca" logger.info(f"Defaulting country variable syntax to baseline variant '{code}' for target channel '{name}'") site_url = f"https://streamsports99.su/live-tv/{urllib.parse.quote(name)}__{code}" try: stream_url = asyncio.run(fallback_extract_m3u8(site_url)) except Exception as fe: logger.error(f"Playwright fallback failed: {fe}") if stream_url: return jsonify({ "name": name, "stream_url": stream_url, "headers": { "Referer": "https://cdnlivetv.tv/", "User-Agent": Config.PLAYBACK_UA, "sec-ch-ua-platform": '"Android"', "sec-ch-ua": '"Chromium";v="148", "Google Chrome";v="148", "Not/A)Brand";v="99"', "sec-ch-ua-mobile": "?1" } }) else: return jsonify({"error": "Could not extract stream URL via primary or fallback methods."}), 500 @app.route("/home", methods=["GET"]) def get_home(): try: data = load_channels() return jsonify(data) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/event", methods=["GET"]) def get_event(): try: r = fetch_url(Config.EVENTS_API, timeout=15) r.raise_for_status() return jsonify(r.json()) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/search", methods=["GET"]) def search(): query = request.args.get("q", "").strip().lower() if not query: return jsonify({"error": "Missing 'q' parameter for search."}), 400 try: channels = load_channels().get("channels", []) results = [] for ch in channels: ch_name = ch.get("name") or ch.get("channel_name") or "" if query in ch_name.lower(): results.append(ch) return jsonify({ "query": query, "results": results, "total": len(results) }) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/videasy", methods=["GET"]) def get_videasy(): tmdb_id = request.args.get("tmdbId", "").strip() if not tmdb_id: return jsonify({"error": "Missing 'tmdbId' parameter."}), 400 target_url = f"https://player.videasy.net/movie/{tmdb_id}" try: data = asyncio.run(extract_videasy_sources(target_url)) return jsonify(data) except Exception as e: logger.error(f"Videasy endpoint error: {e}") return jsonify({"status": "failed", "error": str(e)}), 500 # ── SERVER RUNNER ──────────────────────────────────────────────────────────── if __name__ == "__main__": if Config.DEBUG: logger.info(f"Starting server in DEBUG mode on port {Config.PORT}") app.run(host="0.0.0.0", port=Config.PORT, debug=True) else: from waitress import serve logger.info(f"Starting server in PRODUCTION mode (Waitress) on port {Config.PORT}") serve(app, host="0.0.0.0", port=Config.PORT)