File size: 6,217 Bytes
df57c85 ba3a9cc df57c85 ba3a9cc df57c85 ba3a9cc df57c85 ba3a9cc df57c85 ba3a9cc 0fa3414 ba3a9cc 0fa3414 ba3a9cc 0fa3414 ba3a9cc 0fa3414 ba3a9cc df57c85 ba3a9cc df57c85 ba3a9cc 0fa3414 df57c85 0fa3414 ba3a9cc 0fa3414 ba3a9cc a37b010 ba3a9cc a37b010 ba3a9cc a37b010 804e7ad a37b010 ba3a9cc a37b010 ba3a9cc a37b010 ba3a9cc a37b010 ba3a9cc a37b010 ba3a9cc dd2d50e 53398ef dd2d50e ba3a9cc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 | import asyncio
import urllib.parse
import json
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
import re
from xray_manager import setup_and_start_xray
DOMENA = "https://mrkaj.me"
class XrayBrowserManager:
_instance = None
def __init__(self):
self.playwright = None
self.context = None
self._setup_done = False
self._lock = asyncio.Lock()
@classmethod
def get_instance(cls):
if cls._instance is None: cls._instance = cls()
return cls._instance
async def start(self):
async with self._lock:
if self._setup_done: return
setup_and_start_xray()
self.playwright = await async_playwright().start()
self.context = await self.playwright.chromium.launch_persistent_context(
"/tmp/xray_profile",
headless=True,
proxy={"server": "socks5://127.0.0.1:10808"},
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
)
# --- OVERENIE IP ADRESY ---
test_page = await self.context.new_page()
try:
print("[DEBUG] Overujem IP cez tunel...")
await test_page.goto("https://api.ipify.org", timeout=15000)
ip = await test_page.inner_text("body")
print(f"[DEBUG] Tvoja IP v tuneli je: {ip}")
except Exception as e:
print(f"[DEBUG ERROR] Tunel nefunguje: {e}")
finally:
await test_page.close()
# ---------------------------
self._setup_done = True
async def get_page(self):
if not self._setup_done: await self.start()
return await self.context.new_page()
manager = XrayBrowserManager.get_instance()
async def search_movies(query):
page = await manager.get_page()
try:
url = f"{DOMENA}/se/j/json?q={urllib.parse.quote(query)}"
print(f"[TUNNEL] Vyhľadávam: {query}")
# Otestujeme aj klasickú HTML stránku, či nás pustí
await page.goto(url, wait_until="networkidle", timeout=30000)
# Získame surový obsah
raw_content = await page.content()
print(f"[DEBUG] Surová dĺžka obsahu: {len(raw_content)}")
if len(raw_content) < 200:
print(f"[DEBUG] POZOR! Web vrátil podozrivo málo dát: {raw_content}")
content = await page.evaluate("() => document.body.innerText")
return json.loads(content)
except Exception as e:
print(f"[SEARCH ERROR] {e}")
return []
finally: await page.close()
async def get_details(slug, media_type):
page = await manager.get_page()
url = f"{DOMENA}/p/{slug}" if media_type in ["movie", "film"] else f"{DOMENA}/tv/{slug}/S01E01"
res = {"languages": [], "seasons": []}
try:
await page.goto(url, timeout=30000)
soup = BeautifulSoup(await page.content(), "html.parser")
kw = soup.find("div", class_="movie-keyword")
if kw: res["languages"] = [p.get_text(strip=True) for p in kw.find_all(recursive=False)]
matches = re.findall(r'Séria\s+(\d+)', soup.get_text())
for s in sorted(set(matches), key=int): res["seasons"].append({"season": int(s), "episodes": 0})
except: pass
finally: await page.close()
return res
async def get_stream_url(slug, media_type, season=None, episode=None, lang=None, source_idx=0):
print(f"[STREAM] Skenujem stream: {slug} (Zdroj: {source_idx})")
page = await manager.get_page()
# Príprava URL
url = f"{DOMENA}/p/{slug}" if media_type in ["movie", "film"] else f"{DOMENA}/tv/{slug}/S{int(season):02d}E{int(episode):02d}"
url += f"?source={source_idx}"
if lang: url += f"&lng={urllib.parse.quote(lang)}"
found = {"stream": None, "vtt": None}
# Interceptor na zachytávanie sieťovej komunikácie
async def handle_request(request):
u = request.url
if (".m3u8" in u or ".mp4" in u) and not found["stream"]:
if "master.m3u8" not in u:
# Filtrujeme reklamy, chceme len reálne streamy
if "ads" not in u and "google" not in u:
print(f"[ZACHYTENÉ VIDEO] {u[:80]}...")
found["stream"] = u
if u.endswith(".vtt") and not found["vtt"]:
found["vtt"] = u
page.on("request", handle_request)
try:
# 1. Načítame stránku (predĺžený timeout na 60s kvôli tunelu)
await page.goto(url, wait_until="domcontentloaded", timeout=60000)
# 2. Skúsime počkať, či sa stream nespustí sám
for _ in range(15):
if found["stream"]: break
await asyncio.sleep(1)
# 3. Ak stále nič, skúsime KLIKNÚŤ na stred prehrávača
if not found["stream"]:
print("[STREAM] Video sa nenačítalo samo, skúšam KLIK na stred...")
# Hľadáme iframe alebo video element
locators = ["iframe#frm", "iframe[src*='api']", "video", ".play-button"]
for loc in locators:
target = page.locator(loc).first
if await target.count() > 0:
try:
# Klikneme presne na stred elementu
await target.click(force=True, timeout=5000)
await asyncio.sleep(2)
if found["stream"]: break
except: continue
# 4. Finálne čakanie
for _ in range(15):
if found["stream"]: break
await asyncio.sleep(1)
except Exception as e:
print(f"[STREAM ERROR] {e}")
finally:
await page.close()
return found
def zachyt(req, found):
if ".m3u8" in req.url and not found["stream"]:
print("[Zachytený stream neoverený]",req.url)
print("[stav zoznamu]",found)
if "master.m3u8" not in req.url:
found["stream"] = req.url
if req.url.endswith(".vtt") and not found["vtt"]: found["vtt"] = req.url
|