Spaces:
Building
Building
| import os | |
| import base64 | |
| import asyncio # Required for sleep | |
| import uuid | |
| from pathlib import Path | |
| from fastapi import APIRouter | |
| from fastapi.responses import HTMLResponse | |
| from playwright.async_api import async_playwright | |
| from playwright_stealth import Stealth | |
| router = APIRouter() | |
async def download_deduplicated(tiktok_url):
    """Load a TikTok page headlessly, sniff its video stream URLs, and save one.

    The page is opened in headless Chromium with stealth patches applied.
    Every outgoing request whose URL contains both ``"video/tos"`` and
    ``"tiktok"`` is recorded once. After a fixed 6 second listening window the
    first recorded URL is re-fetched from inside the page, base64-encoded in
    the browser, decoded here, and written to ``outputs/input_<id>.mp4`` if it
    is larger than 0.1 MB. A full-page debug screenshot is saved to
    ``outputs/debug_tiktok.png`` right after navigation.

    Args:
        tiktok_url: URL of the TikTok video page to open.

    Returns:
        None. Progress and results are reported with ``print``.

    Raises:
        Errors from Playwright (launch, navigation, screenshot, evaluate) and
        from the file write propagate to the caller; the browser is closed
        first in every case.
    """
    OUTPUT_DIR = Path("outputs")
    # Create the directory explicitly so saving the video never depends on a
    # previous step having created it as a side effect.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # A dict is used as an insertion-ordered set: keys are the unique stream
    # URLs in the order they were first seen.
    captured_links = {}

    def handle_request(request):
        """Record each distinct TikTok video-stream request URL."""
        print(f"Request: {request.url[:60]}...")
        url = request.url
        if "video/tos" in url and "tiktok" in url:
            captured_links.setdefault(url, False)

    # The URL is handed to the page function as an argument instead of being
    # interpolated into the script text, so quotes or backslashes in a URL
    # cannot break (or inject into) the JavaScript.
    js_fetch = """
    async (url) => {
        try {
            const r = await fetch(url);
            const b = await r.blob();
            if (b.size === 0) return "EMPTY";
            return new Promise(res => {
                const reader = new FileReader();
                reader.onloadend = () => res(reader.result.split(',')[1]);
                reader.readAsDataURL(b);
            });
        } catch { return null; }
    }
    """

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True, args=["--no-sandbox"])
        try:
            context = await browser.new_context()
            page = await context.new_page()
            await Stealth().apply_stealth_async(page)
            page.on("request", handle_request)

            response = await page.goto(tiktok_url)
            # goto() can return None (no main-resource response), so do not
            # dereference it unconditionally.
            status = response.status if response is not None else "unknown"
            print(f"Page loaded with status: {status}")

            DEBUG_PATH = OUTPUT_DIR / "debug_tiktok.png"
            await page.screenshot(path=str(DEBUG_PATH), full_page=True)
            print(f"Debug screenshot saved to: {DEBUG_PATH}")

            print("Listening for unique streams... (6s)")
            # Non-blocking sleep: the event loop keeps serving other tasks
            # while the request handler collects stream URLs.
            await asyncio.sleep(6)

            unique_urls = list(captured_links)
            print(f"Found {len(unique_urls)} unique candidate links.")

            # Only the first captured candidate is fetched; later ones are
            # intentionally ignored.
            for i, url in enumerate(unique_urls[:1]):
                print(f"[{i+1}/{len(unique_urls)}] Checking: {url}...")
                b64_result = await page.evaluate(js_fetch, url)

                if not b64_result or b64_result == "EMPTY":
                    print(" -> Skipped: Empty or Failed response.")
                    continue

                data = base64.b64decode(b64_result)
                size_mb = len(data) / 1024 / 1024
                if size_mb <= 0.1:
                    print(f" -> Skipped: File too small ({size_mb:.4f} MB)")
                    continue

                job_id = str(uuid.uuid4())[:8]
                filename = f"input_{job_id}.mp4"
                filepath = OUTPUT_DIR / filename
                # Synchronous write; acceptable for the small files handled here.
                filepath.write_bytes(data)
                print(f" -> SUCCESS: Saved {filename} ({size_mb:.2f} MB)")
        finally:
            # Always release the browser, even when navigation, the
            # screenshot, or the in-page fetch raises.
            await browser.close()

    print("\nDone!")
# Video page that read_tt() downloads from; swap with the alternate below to
# test against a different clip.
target = "https://www.tiktok.com/@_luna.rayne_/video/7582251394883718422"
# target = "https://www.tiktok.com/@_luna.rayne_/video/7597442279355223318"


async def read_tt():
    """Run the download for the module-level ``target`` and return an HTML notice.

    Awaits ``download_deduplicated(target)`` to completion, then returns a
    fixed HTML snippet. Download details are only visible in the printed logs.
    """
    await download_deduplicated(target)
    completion_html = "<h1>Processing complete. Check your logs!</h1>"
    return completion_html