# LiveTv_API / app.py
# Sanjeev1412's picture
# Update app.py
# 144a976 verified
# Raw
# History Blame Contribute Delete
# 28 kB
"""
LiveTV Scraper - Production Ready Backend
Aggregates channel lists, EPG, and extracts direct stream URLs.
"""
import os
import re
import base64
import logging
import threading
import urllib.parse
from datetime import datetime, timezone, timedelta
import requests
import urllib3
import dns.resolver
import json
import asyncio
from playwright.async_api import async_playwright
from flask import Flask, request, jsonify
from flask_cors import CORS
import urllib3.util.connection as urllib3_connection
# ── CONFIGURATION ────────────────────────────────────────────────────────────
class Config:
    """Static settings for the service; PORT and DEBUG come from the environment."""

    # Listening port; the PORT environment variable overrides the 7860 default.
    PORT = int(os.getenv("PORT", 7860))
    # True only when DEBUG is (case-insensitively) "true"; unset counts as "true".
    # Set DEBUG to any other value to take the non-debug branch of the runner.
    DEBUG = os.getenv("DEBUG", "true").lower() == "true"
    # Local JSON file holding the channel catalogue.
    CHANNELS_FILE = "channels.json"
    # EPG endpoint template; fill with .format(id=..., date=...).
    # The timezone value is the base64 encoding of "Asia/Kolkata".
    EPG_API = (
        "https://epg.pw/api/epg.json"
        "?channel_id={id}&date={date}&lang=en&timezone=QXNpYS9Lb2xrYXRh"
    )
    # Upstream sports-events listing.
    EVENTS_API = "https://api.cdnlivetv.tv/api/v1/events/sports/?user=cdnlivetv&plan=free"
    # Desktop user agent sent by the plain HTTP fetchers.
    DEFAULT_UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    # Mobile user agent used for the headless browser and returned to clients for playback.
    PLAYBACK_UA = "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Mobile Safari/537.36"
# ── LOGGING ──────────────────────────────────────────────────────────────────
# Configure root logging once: INFO level, timestamped lines, one stream handler.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
# Named logger shared by the helpers and route handlers in this module.
logger = logging.getLogger("LiveTV")
# ── SSL & DNS HANDLING ───────────────────────────────────────────────────────
# Outbound requests in this module pass verify=False, so silence urllib3's
# InsecureRequestWarning instead of emitting it on every call.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# hostname -> resolved IP string, filled by resolve_host(); kept for the process lifetime.
_dns_cache = {}
# Guards reads and writes of _dns_cache.
_dns_lock = threading.Lock()
def resolve_host(hostname):
    """Look up the first A record for *hostname* via 8.8.8.8 / 1.1.1.1.

    Successful answers are stored in ``_dns_cache`` and served from there on
    later calls. Returns the IP as a string, or ``None`` when the lookup
    raises (the failure is logged and not cached).
    """
    with _dns_lock:
        try:
            return _dns_cache[hostname]
        except KeyError:
            pass  # not cached yet; resolve below

    try:
        custom_resolver = dns.resolver.Resolver()
        custom_resolver.nameservers = ['8.8.8.8', '1.1.1.1']  # Use reliable DNS
        first_record = custom_resolver.resolve(hostname, 'A')[0]
        address = str(first_record)
        with _dns_lock:
            _dns_cache[hostname] = address
        return address
    except Exception as e:
        logger.error(f"DNS resolution failed for {hostname}: {e}")
        return None
# Monkey-patch urllib3 so connections work where the system resolver is broken.
# Only the socket address is swapped for the resolved IP; the caller's
# arguments are otherwise passed through untouched.
_original_create_connection = urllib3_connection.create_connection


def patched_create_connection(address, *args, **kwargs):
    """Drop-in for urllib3's ``create_connection`` that resolves names via ``resolve_host``.

    Dotted-quad hosts, and hosts that ``resolve_host`` cannot resolve, are
    handed to the original implementation unchanged.
    """
    host, port = address
    looks_like_ipv4 = re.match(r'^\d{1,3}(\.\d{1,3}){3}$', host) is not None
    if looks_like_ipv4:
        return _original_create_connection(address, *args, **kwargs)

    resolved_ip = resolve_host(host)
    target = (resolved_ip, port) if resolved_ip else address
    return _original_create_connection(target, *args, **kwargs)


urllib3_connection.create_connection = patched_create_connection
# ── APP SETUP ────────────────────────────────────────────────────────────────
# Flask application object; the @app.route handlers in this module register on it.
app = Flask(__name__)
CORS(app) # Allow cross-origin requests for the API
# Channel slug (lower-cased name with spaces replaced by "-") -> epg.pw channel id.
# Consulted first by /details; names missing here fall back to an online search.
EPG_IDS = {
    "cnn": 464857,
    "fox-news": 465372,
    "msnbc": 465087,
    "cbc-news": 470695,
    "cp24": 470621,
    "bbc-news": 12162,
    "sky-news": 12069,
    "france-24": 55716,
    "tf1": 443174,
    "al-jazeera": 12532,
    "al-arabiya": 55875,
    "ndtv": 464202,
    "republic-tv": 463873,
    "aaj-tak": 464262,
    "geo-news": 12349,
    "ary-news": 12348,
    "pogo": 464673,
    "nick": 465135,
    "willow-cricket": 465010,
    "espn": 464947,
    "sky-sports-cricket": 12059,
    "sky-sports-f1": 12061,
}
# ── HELPERS ──────────────────────────────────────────────────────────────────
def load_channels():
    """Return the parsed contents of ``Config.CHANNELS_FILE``.

    When the file does not exist, or reading/parsing raises, a fresh
    ``{"channels": []}`` is returned instead (errors are logged).
    """
    empty_payload = {"channels": []}
    try:
        if not os.path.exists(Config.CHANNELS_FILE):
            return empty_payload
        with open(Config.CHANNELS_FILE, 'r', encoding='utf-8') as fh:
            return json.load(fh)
    except Exception as e:
        logger.error(f"Error loading {Config.CHANNELS_FILE}: {e}")
        return empty_payload
def fetch_url(url, timeout=15, extra_headers=None):
    """GET *url* with browser-like headers and return the ``requests`` response.

    *extra_headers*, when given, are merged over the defaults. TLS
    verification is disabled. The status code is logged on success; any
    exception from the request is logged and re-raised to the caller.
    """
    request_headers = {
        "User-Agent": Config.DEFAULT_UA,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Upgrade-Insecure-Requests": "1",
    }
    if extra_headers:
        request_headers.update(extra_headers)

    try:
        response = requests.get(url, headers=request_headers, verify=False, timeout=timeout)
    except Exception as e:
        logger.error(f"FETCH ERROR {url}: {e}")
        raise
    logger.info(f"FETCH {url} -> {response.status_code}")
    return response
def fmt_time(dt_str, tz_offset=5.5):
    """Render an ISO-8601 timestamp as a 12-hour clock time in a fixed UTC offset.

    *dt_str* may end in ``Z``; a timestamp without an offset is taken as UTC.
    *tz_offset* is in hours. The label is ``IST`` for 5.5, otherwise
    ``UTC<sign><offset>``. Returns ``"N/A"`` for empty or unparseable input.
    """
    if not dt_str:
        return "N/A"
    try:
        parsed = datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        shifted = parsed.astimezone(timezone(timedelta(hours=float(tz_offset))))
        # %I is zero-padded; strip the pad so 05:30 reads as 5:30.
        clock = shifted.strftime("%I:%M %p").lstrip('0')
        if tz_offset == 5.5:
            zone = "IST"
        else:
            sign = '+' if tz_offset >= 0 else ''
            zone = f"UTC{sign}{tz_offset}"
        return f"{clock} {zone}"
    except Exception:
        return "N/A"
def unpack_js(h, n_str, t, e):
    """Decode the packed payload *h* back into text.

    ``n_str[e]`` is the token separator. Each non-empty token is read as a
    base-*e* number whose digits are character positions in *n_str*
    (characters not in *n_str* are skipped); subtracting *t* gives the code
    point of one output character. Returns ``""`` on any error.
    """
    try:
        separator = n_str[e]
        pieces = []
        for token in h.split(separator):
            if not token:
                continue
            number = 0
            for symbol in token:
                digit = n_str.find(symbol)
                if digit != -1:
                    number = number * e + digit
            pieces.append(chr(number - t))
        return "".join(pieces)
    except Exception:
        return ""
def decode_custom_b64(s):
    """Decode a URL-safe, possibly unpadded Base64 string to text.

    ``-`` and ``_`` are mapped to ``+`` and ``/``, padding is restored, and
    the bytes are decoded as UTF-8 with undecodable bytes dropped. Returns
    ``""`` if anything goes wrong.
    """
    try:
        standard = s.translate(str.maketrans('-_', '+/'))
        # Pad up to the next multiple of four characters.
        padded = standard + '=' * (-len(standard) % 4)
        return base64.b64decode(padded).decode('utf-8', errors='ignore')
    except Exception:
        return ""
# channel name -> EPG id string (or None for a confirmed miss); guarded by _epg_id_lock.
_epg_id_cache = {}
_epg_id_lock = threading.Lock()


def get_first_epg_id(channel_name):
    """Search epg.pw for *channel_name* and return the first channel id found.

    The raw name is tried first, then the name with a trailing
    TV/HD/SD/CHANNEL word removed. The id is returned as the digit string
    captured from the first ``/last/<id>.html`` link in the search page, or
    ``None`` when nothing matches.

    Results are cached per channel name. A ``None`` result is cached only
    when every attempted search completed normally; if a request raised or
    the server answered 5xx/429, the miss is not cached so that a later call
    can retry instead of being stuck with ``None`` for the process lifetime.
    """
    if not channel_name:
        return None
    with _epg_id_lock:
        if channel_name in _epg_id_cache:
            return _epg_id_cache[channel_name]

    headers = {"User-Agent": Config.DEFAULT_UA}
    variations = [channel_name]
    cleaned = re.sub(r'\s+(TV|HD|SD|CHANNEL)\b', '', channel_name, flags=re.IGNORECASE).strip()
    if cleaned and cleaned != channel_name:
        variations.append(cleaned)

    result_id = None
    transient_failure = False
    for name_var in variations:
        try:
            # The search endpoint takes the channel name base64-encoded in the path.
            encoded = base64.b64encode(name_var.encode()).decode()
            url = f"https://epg.pw/search/channel/{urllib.parse.quote(encoded)}.html?lang=en"
            logger.info(f"Searching EPG ID for '{name_var}'...")
            r = requests.get(url, headers=headers, timeout=15, verify=False)
            if r.status_code == 200:
                match = re.search(r'/last/(\d+)\.html', r.text)
                if match:
                    result_id = match.group(1)
                    logger.info(f"Found EPG ID {result_id} for '{name_var}'")
                    break
            elif r.status_code >= 500 or r.status_code == 429:
                # Server-side trouble or throttling: this miss says nothing about the channel.
                transient_failure = True
        except Exception as e:
            transient_failure = True
            logger.error(f"EPG search variation '{name_var}' failed: {e}")

    # Remember hits always; remember misses only when they are trustworthy.
    if result_id is not None or not transient_failure:
        with _epg_id_lock:
            _epg_id_cache[channel_name] = result_id
    return result_id
def extract_stream_url(html: str) -> str | None:
"""Extract and reconstruct the .m3u8 URL from the player HTML or raw text."""
raw_match = re.search(r'(https?://[^\s"\'\`]+\.m3u8[^\s"\'\`]*)', html)
if raw_match:
logger.info("Found direct raw .m3u8 link in response payload")
return raw_match.group(1)
pattern = r'\}\s*\(\s*(["\'])(?P<h>.*?)\1\s*,\s*(?P<u>\d+)\s*,\s*(["\'])(?P<n>.*?)\4\s*,\s*(?P<t>\d+)\s*,\s*(?P<e>\d+)'
match = re.search(pattern, html, re.DOTALL)
if not match: return None
h, n, t, e = match.group('h'), match.group('n'), int(match.group('t')), int(match.group('e'))
unpacked = unpack_js(h, n, t, e)
if not unpacked: return None
consts = {}
for m in re.finditer(r"const\s+(\w+)\s*=\s*'([^']+)';", unpacked):
consts[m.group(1)] = m.group(2)
source_match = re.search(r"source:\s*{\s*src:\s*(\w+)", unpacked)
if not source_match: return None
var_name = source_match.group(1)
def_match = re.search(rf"(?:const|let|var)\s+{var_name}\s*=\s*(.+?);", unpacked)
if not def_match: return None
parts_line = def_match.group(1)
decoder_match = re.search(r"(\w+)\(\w+\)", parts_line)
if not decoder_match: return None
decoder_name = decoder_match.group(1)
var_matches = re.findall(rf"{decoder_name}\((\w+)\)", parts_line)
stream_url = ""
for v in var_matches:
if v in consts:
stream_url += decode_custom_b64(consts[v])
return stream_url if stream_url.startswith("http") else None
async def fallback_extract_m3u8(stream_url: str) -> str | None:
    """Load *stream_url* in headless Chromium and return the first .m3u8 URL seen.

    The page is opened with the playback user agent and a fixed
    streamsports99.su Referer. Every outgoing request is watched; the first
    URL containing ``.m3u8`` is taken. If none appears within 15 seconds of
    DOMContentLoaded, the ``src`` of a ``<video>`` element is tried instead.

    Returns the URL, or ``None`` when nothing was found. Playwright errors
    are logged rather than raised; a URL intercepted before the error is
    still returned.
    """
    m3u8_found = asyncio.Event()
    result = {}
    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context(
                    user_agent=Config.PLAYBACK_UA,
                    extra_http_headers={
                        'Accept-Language': 'en-US,en;q=0.9',
                        'Referer': 'https://streamsports99.su/',
                    },
                )
                page = await context.new_page()

                async def on_request(request):
                    # Keep only the first playlist request.
                    if '.m3u8' in request.url and not m3u8_found.is_set():
                        logger.info(f"FALLBACK Intercepted: {request.url}")
                        result['m3u8'] = request.url
                        m3u8_found.set()

                page.on('request', on_request)
                logger.info(f"FALLBACK Loading: {stream_url}")
                await page.goto(stream_url, wait_until='domcontentloaded', timeout=30000)
                try:
                    await asyncio.wait_for(m3u8_found.wait(), timeout=15)
                except asyncio.TimeoutError:
                    pass  # no playlist request seen in time; try the DOM next

                if 'm3u8' not in result:
                    logger.info("FALLBACK No network intercept β€” scanning DOM...")
                    src = await page.evaluate('''() => {
                        const v = document.querySelector("video[src]");
                        return v ? v.src : null;
                    }''')
                    if src and '.m3u8' in src:
                        result['m3u8'] = src
            finally:
                # Close the browser even when navigation or evaluation raised.
                await browser.close()
    except Exception as e:
        logger.error(f"FALLBACK Playwright error: {e}")
    return result.get('m3u8')
async def extract_videasy_sources(url: str) -> dict:
    """Drive the Videasy player at *url* in headless Chromium and collect stream URLs.

    Responses are watched for ``index.m3u8`` playlists and ``.vtt`` subtitle
    files. After the page loads, the player is clicked, the quality tab is
    opened if present, and each quality button is clicked in turn so that the
    playlist it triggers can be recorded under that quality label.

    Returns a dict with:
      * ``status``: ``"success"`` if at least one playlist was captured, else ``"failed"``
      * ``qualities``: quality label -> playlist URL (``"auto"`` when no label could be matched)
      * ``subtitles``: list of ``{"label", "src_url"}``
      * ``headers``: headers a client should send when playing the stream

    Playwright errors are logged, not raised; whatever was captured before
    the error is still returned.
    """
    logger.info(f"πŸš€ Starting Videasy extraction for: {url}")
    playback_headers = {
        "User-Agent": Config.PLAYBACK_UA,
        "Origin": "https://player.videasy.net",
        "Referer": "https://player.videasy.net/"
    }
    result = {
        "status": "pending",
        "qualities": {},
        "subtitles": [],
        "headers": playback_headers
    }
    captured_m3u8s = []
    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context(
                    user_agent=playback_headers["User-Agent"],
                    is_mobile=True,
                    viewport={'width': 360, 'height': 800}
                )
                page = await context.new_page()

                async def handle_response(response):
                    res_url = response.url
                    if ".m3u8" in res_url and "index.m3u8" in res_url:
                        if res_url not in captured_m3u8s:
                            logger.info(f"βœ… Captured m3u8: {res_url}")
                            captured_m3u8s.append(res_url)
                    if ".vtt" in res_url:
                        # Label is the file name without its extension.
                        label = res_url.split('/')[-1].replace('.vtt', '')
                        if not any(s["src_url"] == res_url for s in result["subtitles"]):
                            logger.info(f"βœ… Captured subtitle: {res_url}")
                            result["subtitles"].append({"label": label, "src_url": res_url})

                page.on("response", handle_response)
                await page.goto(url, wait_until="networkidle", timeout=60000)
                # NOTE(review): fixed coordinates within the 360x800 viewport; presumably the
                # first click starts playback and the second opens the settings menu. Confirm.
                await page.mouse.click(180, 400)
                await asyncio.sleep(4)
                await page.mouse.click(320, 750)
                await asyncio.sleep(1)
                try:
                    trigger = await page.wait_for_selector("button[role='tab']:has-text('Quality')", timeout=3000)
                    if trigger:
                        await page.evaluate("(el) => el.click()", trigger)
                except Exception as tab_err:
                    # Best effort: the tab may not exist. `except Exception` (not a bare
                    # except) lets task cancellation propagate.
                    logger.debug(f"Quality tab not opened: {tab_err}")

                quality_buttons = await page.evaluate("""() => {
                    const buttons = Array.from(document.querySelectorAll('button'));
                    return buttons
                        .filter(b => /^\\d{3,4}p$/.test(b.innerText.trim().split('\\n')[0]))
                        .map(b => ({
                            text: b.innerText.trim().split('\\n')[0],
                            isActive: b.className.includes('bg-primary')
                        }));
                }""")

                # The quality that is already active is served by the first captured playlist.
                for b in quality_buttons:
                    if b['isActive']:
                        q = b['text']
                        if captured_m3u8s:
                            result["qualities"][q] = captured_m3u8s[0]
                        break

                # Click each remaining quality and wait up to ~6 s (15 x 0.4 s) for a new playlist.
                for b in quality_buttons:
                    q = b['text']
                    if q in result["qualities"]:
                        continue
                    count_before = len(captured_m3u8s)
                    await page.evaluate("""(text) => {
                        const btn = Array.from(document.querySelectorAll('button')).find(b => b.innerText.includes(text));
                        if (btn) btn.click();
                    }""", q)
                    for _ in range(15):
                        if len(captured_m3u8s) > count_before:
                            result["qualities"][q] = captured_m3u8s[-1]
                            break
                        await asyncio.sleep(0.4)
            finally:
                # Close the browser even when navigation or a click raised.
                await browser.close()
    except Exception as e:
        logger.error(f"Videasy extraction anomaly: {e}")

    if captured_m3u8s and not result["qualities"]:
        result["qualities"]["auto"] = captured_m3u8s[0]
    result["status"] = "success" if captured_m3u8s else "failed"
    return result
# ── ROUTES ───────────────────────────────────────────────────────────────────
@app.route("/health")
def health():
    """Liveness probe: report a fixed status and the current time.

    The timestamp is timezone-aware UTC (ISO 8601 with a ``+00:00`` offset)
    so it is unambiguous regardless of the host's local timezone.
    """
    return jsonify({"status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat()})
def _extract_epg_list(resp_data):
    """Return the ``epg_list`` array from an EPG payload (list- or dict-shaped), else ``[]``."""
    if not resp_data:
        return []
    if isinstance(resp_data, list):
        # `or []` covers a payload where the key is present but null.
        return resp_data[0].get("epg_list") or []
    if isinstance(resp_data, dict):
        return resp_data.get("epg_list") or []
    return []


def _parse_dt_utc(s):
    """Parse an ISO-8601 string to an aware UTC datetime; unparseable input sorts first.

    A timestamp without an offset is taken as UTC, matching ``fmt_time``.
    """
    try:
        dt = datetime.fromisoformat(s.replace('Z', '+00:00'))
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)
    except Exception:
        return datetime.min.replace(tzinfo=timezone.utc)


def _populate_epg(epg, epg_id, tz_offset):
    """Fill *epg* in place with the live programme and up to five upcoming ones.

    *epg* is mutated rather than returned so that whatever was filled in
    before an exception is still available to the caller.
    """
    ist_tz = timezone(timedelta(hours=5, minutes=30))
    today_str = datetime.now(ist_tz).strftime("%Y%m%d")
    e_res = fetch_url(Config.EPG_API.format(id=epg_id, date=today_str))
    e_res.raise_for_status()
    raw = _extract_epg_list(e_res.json())
    logger.info(f"Today EPG ({today_str}): {len(raw)} programs for id={epg_id}")

    # The following day's listing is best effort: a failure keeps today's data.
    try:
        if raw:
            # Continue from the calendar day after the last programme we already have.
            last_start = raw[-1].get("start_date", "")
            last_date = datetime.fromisoformat(last_start.replace('Z', '+00:00')).date()
            tomorrow_str = (last_date + timedelta(days=1)).strftime("%Y%m%d")
        else:
            tomorrow_str = (datetime.now(ist_tz) + timedelta(days=1)).strftime("%Y%m%d")
        t_res = fetch_url(Config.EPG_API.format(id=epg_id, date=tomorrow_str), timeout=20)
        tomorrow_list = _extract_epg_list(t_res.json()) if t_res.status_code == 200 else []
        logger.info(f"Tomorrow EPG ({tomorrow_str}): {len(tomorrow_list)} programs for id={epg_id}")
        raw.extend(tomorrow_list)
    except Exception as te:
        logger.warning(f"Tomorrow EPG fetch failed for id={epg_id}: {te}")

    if not raw:
        return

    raw.sort(key=lambda x: _parse_dt_utc(x.get("start_date", "1970-01-01T00:00:00+00:00")))
    now = datetime.now(timezone.utc)

    # The list is sorted, so the last entry that has already started is on air.
    current_idx = -1
    for i, item in enumerate(raw):
        start_str = item.get("start_date")
        if not start_str:
            continue
        if _parse_dt_utc(start_str) <= now:
            current_idx = i
    logger.info(f"current_idx={current_idx}, total programs={len(raw)}")

    if current_idx != -1:
        curr = raw[current_idx]
        next_item = raw[current_idx + 1] if (current_idx + 1) < len(raw) else None
        # The feed's stop time is not used; a programme ends where the next one starts.
        stop_time_str = next_item.get("start_date") if next_item else None
        progress_pct = 0
        try:
            s_dt = _parse_dt_utc(curr.get("start_date"))
            if stop_time_str:
                e_dt = _parse_dt_utc(stop_time_str)
                total = (e_dt - s_dt).total_seconds()
                elapsed = (now - s_dt).total_seconds()
                if total > 0:
                    progress_pct = min(100, max(0, int((elapsed / total) * 100)))
        except Exception:
            progress_pct = 0
        epg["live_now"] = {
            "title": curr.get("title"),
            "description": curr.get("desc"),
            "start": curr.get("start_date"),
            "stop": stop_time_str,
            "time": f"{fmt_time(curr.get('start_date'), tz_offset)} - {fmt_time(stop_time_str, tz_offset)}",
            "progress_pct": progress_pct
        }

    # Next five entries after the live one (the first five when nothing has started yet).
    for i in range(current_idx + 1, min(current_idx + 6, len(raw))):
        item = raw[i]
        item_stop = raw[i + 1].get("start_date") if (i + 1) < len(raw) else None
        epg["upcoming"].append({
            "title": item.get("title"),
            "description": item.get("desc"),
            "start": item.get("start_date"),
            "stop": item_stop,
            "time": f"{fmt_time(item.get('start_date'), tz_offset)} - {fmt_time(item_stop, tz_offset)}"
        })


@app.route("/details", methods=["GET"])
def get_details():
    """Return metadata and programme guide for one channel.

    Query parameters:
      * ``name`` (required): channel name
      * ``code``: country code, lower-cased before use
      * ``tz``: display offset from UTC in hours (default 5.5); values that
        are not a number strictly between -24 and 24 fall back to 5.5

    Responds with the channel's logo (from the local channel list), the
    upstream player URL, the EPG id, the live/upcoming programmes and the
    timezone label. Responds 400 when ``name`` is missing. EPG failures are
    logged and leave the guide empty or partial rather than failing the
    request.
    """
    name = request.args.get("name", "").strip()
    code = request.args.get("code", "").strip().lower()
    try:
        tz_offset = float(request.args.get("tz", 5.5))
    except Exception:
        tz_offset = 5.5
    # datetime.timezone only accepts offsets strictly inside (-24h, 24h); this
    # comparison is also False for NaN and infinities.
    if not -24 < tz_offset < 24:
        tz_offset = 5.5
    if not name:
        return jsonify({"error": "Missing 'name' parameter."}), 400

    # Both values come from the query string, so escape them before embedding.
    player_url = (
        f"https://cdnlivetv.tv/api/v1/channels/player/"
        f"?name={urllib.parse.quote(name)}&code={urllib.parse.quote(code, safe='')}"
        f"&user=cdnlivetv&plan=free"
    )

    logo_url = None
    try:
        channels_data = load_channels().get("channels", [])
        for ch in channels_data:
            ch_name = ch.get("name") or ch.get("channel_name") or ""
            if ch_name.lower() == name.lower():
                logo_url = ch.get("image")
                break
    except Exception as e:
        logger.warning(f"Could not load channel metadata: {e}")

    epg = {"live_now": None, "upcoming": []}
    epg_slug = name.lower().replace(" ", "-")
    epg_id = EPG_IDS.get(epg_slug)
    if not epg_id:
        logger.info(f"EPG ID not in mapping for '{name}', trying fallback search...")
        epg_id = get_first_epg_id(name)

    if epg_id:
        try:
            _populate_epg(epg, epg_id, tz_offset)
        except Exception as e:
            logger.error(f"EPG Fetch error for id={epg_id}: {e}")

    return jsonify({
        "name": name,
        "country": code.upper() if code else None,
        "logo_url": logo_url,
        "player_url": player_url,
        "epg_id": epg_id,
        "epg": epg,
        "timezone": "IST" if tz_offset == 5.5 else f"UTC{'+' if tz_offset >= 0 else ''}{tz_offset}"
    })
@app.route("/stream", methods=["GET"])
def get_stream():
    """Resolve a playable stream URL for one channel.

    Query parameters: ``name`` (required) and ``code`` (country code).

    The upstream player endpoint is tried first: a JSON answer is searched
    for ``stream_url`` / ``stream`` / ``url`` / ``data.url``, any other
    answer is passed to ``extract_stream_url``. If that yields nothing, the
    channel page on streamsports99.su is opened in a headless browser. For
    that fallback a missing ``code`` is looked up in the local channel list
    and finally defaults to ``"ca"``.

    Responds with the URL and the headers a client should send for
    playback, 400 when ``name`` is missing, or 500 when neither method finds
    a stream.
    """
    name = request.args.get("name", "").strip()
    code = request.args.get("code", "").strip().lower()
    if not name:
        return jsonify({"error": "Missing 'name' parameter."}), 400

    # `code` comes from the query string and this URL is fetched server-side,
    # so escape it; otherwise a value like "us&plan=x" would inject parameters.
    player_url = (
        "https://cdnlivetv.tv/api/v1/channels/player/"
        f"?name={urllib.parse.quote(name)}&code={urllib.parse.quote(code, safe='')}"
        "&user=cdnlivetv&plan=free"
    )
    stream_url = None
    try:
        r = fetch_url(player_url, timeout=15)
        if r.status_code == 200:
            content_type = r.headers.get("Content-Type", "").lower()
            if "application/json" in content_type:
                logger.info("Primary API returned a JSON payload. Parsing directly...")
                json_data = r.json()
                # Only dict-shaped payloads carry the keys we know about.
                if isinstance(json_data, dict):
                    nested = json_data.get("data")
                    stream_url = (
                        json_data.get("stream_url") or
                        json_data.get("stream") or
                        json_data.get("url") or
                        (nested.get("url") if isinstance(nested, dict) else None)
                    )
            else:
                stream_url = extract_stream_url(r.text)
            if stream_url:
                logger.info(f"Successfully extracted stream URL via primary channel logic: {stream_url}")
        else:
            logger.warning(f"Primary API returned status {r.status_code}")
    except Exception as e:
        logger.warning(f"Primary extraction failed with error: {e}")

    # ── Fallback Method (Playwright) ──────────────────────────────────────
    if not stream_url:
        logger.info(f"Primary method failed for '{name}', processing Playwright fallback parameters...")
        # The fallback page URL ends in "<name>__<code>", so a code is required.
        if not code:
            try:
                channels_data = load_channels().get("channels", [])
                for ch in channels_data:
                    ch_name = ch.get("name") or ch.get("channel_name") or ""
                    if ch_name.lower() == name.lower():
                        code = (ch.get("code") or ch.get("country_code") or "").strip().lower()
                        if code:
                            logger.info(f"Dynamically loaded country code variant '{code}' from internal channels registry for '{name}'")
                            break
            except Exception as ec:
                logger.warning(f"Could not load fallback channel configuration metadata map: {ec}")
        # Last resort when the code is neither supplied nor found locally.
        if not code:
            code = "ca"
            logger.info(f"Defaulting country variable syntax to baseline variant '{code}' for target channel '{name}'")

        site_url = (
            "https://streamsports99.su/live-tv/"
            f"{urllib.parse.quote(name)}__{urllib.parse.quote(code, safe='')}"
        )
        try:
            stream_url = asyncio.run(fallback_extract_m3u8(site_url))
        except Exception as fe:
            logger.error(f"Playwright fallback failed: {fe}")

    if stream_url:
        return jsonify({
            "name": name,
            "stream_url": stream_url,
            "headers": {
                "Referer": "https://cdnlivetv.tv/",
                "User-Agent": Config.PLAYBACK_UA,
                "sec-ch-ua-platform": '"Android"',
                "sec-ch-ua": '"Chromium";v="148", "Google Chrome";v="148", "Not/A)Brand";v="99"',
                "sec-ch-ua-mobile": "?1"
            }
        })
    return jsonify({"error": "Could not extract stream URL via primary or fallback methods."}), 500
@app.route("/home", methods=["GET"])
def get_home():
    """Serve the whole channel catalogue as JSON; respond 500 with the error text on failure."""
    try:
        return jsonify(load_channels())
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/event", methods=["GET"])
def get_event():
    """Relay the upstream sports-events JSON.

    Any failure (network error, non-2xx status, invalid JSON) produces a 500
    response carrying the error text.
    """
    try:
        upstream = fetch_url(Config.EVENTS_API, timeout=15)
        upstream.raise_for_status()
        payload = upstream.json()
        return jsonify(payload)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/search", methods=["GET"])
def search():
    """Case-insensitive substring search over channel names.

    Takes the query from ``q``; responds 400 when it is empty, otherwise
    with the lower-cased query, the matching channel entries and their count.
    """
    query = request.args.get("q", "").strip().lower()
    if not query:
        return jsonify({"error": "Missing 'q' parameter for search."}), 400
    try:
        channels = load_channels().get("channels", [])
        # An entry's name lives under "name" or, failing that, "channel_name".
        results = [
            ch for ch in channels
            if query in (ch.get("name") or ch.get("channel_name") or "").lower()
        ]
        return jsonify({
            "query": query,
            "results": results,
            "total": len(results)
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/videasy", methods=["GET"])
def get_videasy():
    """Extract stream qualities and subtitles for a movie on player.videasy.net.

    Takes the movie id from ``tmdbId``; responds 400 when it is missing.
    Otherwise returns the dict produced by ``extract_videasy_sources``, or a
    500 response with the error text if the extraction itself raises.
    """
    tmdb_id = request.args.get("tmdbId", "").strip()
    if not tmdb_id:
        return jsonify({"error": "Missing 'tmdbId' parameter."}), 400
    # The id comes from the query string and ends up in a URL the server-side
    # browser navigates to; escape it so it stays a single path segment.
    target_url = f"https://player.videasy.net/movie/{urllib.parse.quote(tmdb_id, safe='')}"
    try:
        data = asyncio.run(extract_videasy_sources(target_url))
        return jsonify(data)
    except Exception as e:
        logger.error(f"Videasy endpoint error: {e}")
        return jsonify({"status": "failed", "error": str(e)}), 500
# ── SERVER RUNNER ────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Config.DEBUG picks the server: Waitress when it is off, Flask's built-in
    # development server (debug=True) when it is on. Both listen on all interfaces.
    if not Config.DEBUG:
        # Imported here so waitress is only needed on the production path.
        from waitress import serve
        logger.info(f"Starting server in PRODUCTION mode (Waitress) on port {Config.PORT}")
        serve(app, host="0.0.0.0", port=Config.PORT)
    else:
        logger.info(f"Starting server in DEBUG mode on port {Config.PORT}")
        app.run(host="0.0.0.0", port=Config.PORT, debug=True)