| import asyncio |
| import urllib.parse |
| import json |
| from playwright.async_api import async_playwright |
| from bs4 import BeautifulSoup |
| import re |
| import os |
| import shutil |
| from pyvirtualdisplay import Display |
|
|
# Base URL that every scraping request in this module is built on.
DOMENA = "https://mrkaj.me"
# Directory of the unpacked extension passed to Chrome via --load-extension.
EXT_PATH = "/app/dew_vpn"
# Extension ID used to address the extension popup (chrome-extension://<id>/popup.html).
EXT_ID = "pgpnoemmehpnealclfbkejocclofdbon"
|
|
| class AsyncBrowserManager: |
| _instance = None |
|
|
| def __init__(self): |
| self.playwright = None |
| self.context = None |
| self.display = None |
| self._setup_done = False |
| self._lock = asyncio.Lock() |
|
|
| @classmethod |
| def get_instance(cls): |
| if cls._instance is None: |
| cls._instance = cls() |
| return cls._instance |
|
|
| async def get_screenshot(self): |
| if not self.context: return None |
| pages = self.context.pages |
| if pages: |
| for p in pages: |
| if "extension" in p.url: return await p.screenshot(type="png") |
| return await pages[-1].screenshot(type="png") |
| return None |
|
|
| async def start(self): |
| async with self._lock: |
| if self._setup_done: return |
| |
| user_data_dir = "/tmp/browser_profile" |
| shutil.rmtree(user_data_dir, ignore_errors=True) |
|
|
| print("[BROWSER] Spúšťam virtuálnu obrazovku...") |
| try: |
| self.display = Display(visible=0, size=(1280, 1024)) |
| self.display.start() |
| except Exception as e: |
| print(f"[Xvfb ERROR] {e}") |
|
|
| print("[BROWSER] Spúšťam Chrome...") |
| self.playwright = await async_playwright().start() |
| print("[BROWSER] Spúšťam Chrome krok 1...") |
| launch_args = [ |
| "--no-sandbox", |
| "--disable-setuid-sandbox", |
| "--disable-dev-shm-usage", |
| "--disable-blink-features=AutomationControlled", |
| f"--disable-extensions-except={EXT_PATH}", |
| f"--load-extension={EXT_PATH}" |
| ] |
|
|
| self.context = await self.playwright.chromium.launch_persistent_context( |
| user_data_dir, |
| headless=False, |
| args=launch_args, |
| ignore_default_args=["--enable-automation"], |
| bypass_csp=True, |
| user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36" |
| ) |
| print("[BROWSER] Chrome spustený") |
| await asyncio.sleep(12) |
| await self.activate_dewvpn() |
| self._setup_done = True |
|
|
| async def activate_dewvpn(self): |
| print("[VPN] Aktivujem DewVPN...") |
| page = await self.context.new_page() |
| try: |
| await page.goto(f"chrome-extension://{EXT_ID}/popup.html", timeout=30000) |
| await asyncio.sleep(5) |
| |
| |
| |
| try: |
| |
| await page.click(".current-location, .location-selector") |
| await asyncio.sleep(2) |
| except: pass |
| await page.fill("input[placeholder*='Search']", "Slovakia") |
| await asyncio.sleep(2) |
| |
| await page.click("text=Slovakia") |
| print("[VPN] DewVPN: Kliknuté na Slovensko!") |
| await asyncio.sleep(8) |
| |
| except Exception as e: |
| print(f"[VPN ERROR] {e}") |
| |
| finally: |
| await page.close() |
|
|
| async def get_page(self): |
| if not self._setup_done: await self.start() |
| return await self.context.new_page() |
|
|
# Shared browser manager used by every scraping function below.
manager = AsyncBrowserManager.get_instance()
|
|
| |
|
|
async def search_movies(query):
    """Run a title search against the site's JSON search endpoint.

    Args:
        query: Search text; it is URL-quoted into the ``q`` parameter.

    Returns:
        The decoded JSON body of the response, or ``[]`` when opening the
        page, navigating, or decoding the body fails (the error is printed).
    """
    print(f"[SEARCH] Hľadám: {query}")
    page = None
    try:
        # Acquire the page inside the try so a browser-start failure follows
        # the same "print and return []" path as a navigation failure.
        page = await manager.get_page()
        url = f"{DOMENA}/se/j/json?q={urllib.parse.quote(query)}"
        await page.goto(url, wait_until="networkidle", timeout=45000)
        content = await page.inner_text("body")
        return json.loads(content)
    except Exception as e:
        print(f"[SEARCH ERROR] {e}")
        return []
    finally:
        if page is not None:
            await page.close()
|
|
async def get_details(slug, media_type):
    """Scrape available languages and season numbers for a title.

    Args:
        slug: Title identifier used in the site URL.
        media_type: ``"movie"`` or ``"film"`` loads ``/p/<slug>``; any other
            value loads ``/tv/<slug>/S01E01``.

    Returns:
        ``{"languages": [...], "seasons": [...]}``. ``languages`` holds the
        text of each direct child of the ``div.movie-keyword`` element.
        ``seasons`` holds one ``{"season": n, "episodes": 0}`` entry per
        distinct number following "Séria" in the page text, in ascending
        order. Both lists stay empty when the page cannot be loaded; the
        error is printed.
    """
    print(f"[DETAILS] Info: {slug}")
    if media_type in ["movie", "film"]:
        url = f"{DOMENA}/p/{slug}"
    else:
        url = f"{DOMENA}/tv/{slug}/S01E01"
    res = {"languages": [], "seasons": []}
    page = None
    try:
        # Acquire the page inside the try so a browser-start failure is
        # reported the same way as a navigation failure.
        page = await manager.get_page()
        await page.goto(url, wait_until="domcontentloaded", timeout=45000)
        soup = BeautifulSoup(await page.content(), "html.parser")

        kw = soup.find("div", class_="movie-keyword")
        if kw:
            res["languages"] = [
                child.get_text(strip=True) for child in kw.find_all(recursive=False)
            ]

        matches = re.findall(r'Séria\s+(\d+)', soup.get_text())
        # Deduplicate on the integer value so "1" and "01" count as one season.
        res["seasons"] = [
            {"season": number, "episodes": 0}
            for number in sorted({int(m) for m in matches})
        ]
    except Exception as e:
        print(f"[DETAILS ERROR] {e}")
    finally:
        if page is not None:
            await page.close()
    return res
|
|
async def get_stream_url(slug, media_type, season=None, episode=None, lang=None, source_idx=0):
    """Load a title's player page and capture its stream and subtitle URLs.

    Args:
        slug: Title identifier used in the site URL.
        media_type: ``"movie"`` or ``"film"`` loads ``/p/<slug>``; any other
            value loads ``/tv/<slug>/SxxEyy``.
        season: Season number; must be int-convertible for non-movie lookups.
        episode: Episode number; must be int-convertible for non-movie lookups.
        lang: Optional value for the ``lng`` query parameter (URL-quoted).
        source_idx: Value for the ``source`` query parameter.

    Returns:
        ``{"stream": url_or_None, "vtt": url_or_None}`` holding the first
        request URL containing ``.m3u8`` and the first ``.vtt`` request URL
        seen while the page was open. Browser errors are printed and leave
        the corresponding values ``None``.

    Raises:
        TypeError, ValueError: If ``season`` or ``episode`` cannot be
            converted to ``int`` for a non-movie lookup.
    """
    print(f"[STREAM] Skenujem: {slug}")

    # Build the URL before any tab exists: int() raises on a missing or
    # non-numeric season/episode, and raising with a tab already open would
    # leak that tab.
    if media_type in ["movie", "film"]:
        url = f"{DOMENA}/p/{slug}"
    else:
        url = f"{DOMENA}/tv/{slug}/S{int(season):02d}E{int(episode):02d}"

    params = [f"source={source_idx}"]
    if lang:
        params.append(f"lng={urllib.parse.quote(lang)}")
    url += "?" + "&".join(params)

    found = {"stream": None, "vtt": None}

    async def zachyt_request(request):
        """Record the first playlist and subtitle request URLs."""
        req_url = request.url
        if ".m3u8" in req_url and not found["stream"]:
            found["stream"] = req_url
        # Also match subtitle URLs that carry a query string or fragment
        # after the ".vtt" path, as the ".m3u8" substring check already does.
        without_suffix = req_url.split("#", 1)[0].split("?", 1)[0]
        is_vtt = req_url.endswith(".vtt") or without_suffix.endswith(".vtt")
        if is_vtt and not found["vtt"]:
            found["vtt"] = req_url

    async def wait_for_stream(seconds):
        """Poll once per second until a stream URL is seen or time runs out."""
        for _ in range(seconds):
            if found["stream"]:
                break
            await asyncio.sleep(1)

    page = None
    try:
        page = await manager.get_page()
        page.on("request", zachyt_request)
        await page.goto(url, wait_until="domcontentloaded", timeout=60000)
        await wait_for_stream(20)

        if not found["stream"]:
            # Nothing requested yet: click the player element, then give it
            # another 10 seconds to request the playlist.
            iframe = page.locator("iframe#frm, video").first
            if await iframe.count() > 0:
                await iframe.click(force=True)
                await wait_for_stream(10)
    except Exception as e:
        print(f"[STREAM ERROR] {e}")
    finally:
        if page is not None:
            await page.close()
    return found
|
|