Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException | |
| from playwright.sync_api import sync_playwright | |
| import re | |
| app = FastAPI() | |
# --- Configuration ---
# Playwright selector used to find and click the download button.
# It assumes the button label is exactly "Download"; confirm this against
# the live page before relying on it.
DOWNLOAD_BUTTON_SELECTOR = 'button:has-text("Download")'
# Default timeout, in milliseconds, applied to Playwright operations.
PLAYWRIGHT_TIMEOUT_MS = 60_000
def get_pixeldrain_info(pixeldrain_url: str):
    """
    Resolve a Pixeldrain page URL to its final download URL via headless Chromium.

    Opens the page with Playwright, clicks the download button and records the
    URL of any response that looks like the final file request (URL contains
    ``/api/file/``, status 200, URL longer than 50 characters). The most
    recent matching response wins.

    Args:
        pixeldrain_url: URL of the Pixeldrain page to open.

    Returns:
        A dict with the keys ``download_url``, ``cookies`` (the browser
        context's cookies joined as ``name=value; name=value``),
        ``user_agent`` and ``filename`` (last path segment of the download
        URL, without query string or fragment). Returns ``None`` if any step
        fails; the error is printed and never propagated to the caller.
    """
    # Local import so the block does not depend on a new module-level import.
    from urllib.parse import urlsplit

    # 1. Initialization and Stealth Headers
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    final_download_url = None

    try:
        with sync_playwright() as p:
            # 2. Launch Browser with Stealth Context
            # slow_mo delays each Playwright operation by 50 ms to give the
            # page a chance to react.
            browser = p.chromium.launch(headless=True, slow_mo=50)
            try:
                # Use a custom context with realistic viewport and locale
                context = browser.new_context(
                    user_agent=user_agent,
                    viewport={"width": 1920, "height": 1080},
                    locale="en-US",
                )
                # Set the overall operation timeout
                context.set_default_timeout(PLAYWRIGHT_TIMEOUT_MS)
                page = context.new_page()

                # 3. Setup Network Listener
                def handle_response(response):
                    nonlocal final_download_url
                    # Look for the final redirect URL pattern
                    # (https://pd.pixeldrain.com/api/file/HASH/FILENAME):
                    # '/api/file/' in the URL, a 200 OK status, and a URL
                    # long enough to carry the hash and file name.
                    if (
                        "/api/file/" in response.url
                        and response.status == 200
                        and len(response.url) > 50
                    ):
                        print(f"Captured final link via network: {response.url}")
                        final_download_url = response.url

                page.on("response", handle_response)

                # 4. Navigate (Using the least strict wait condition: "load")
                page.goto(pixeldrain_url, wait_until="load", timeout=PLAYWRIGHT_TIMEOUT_MS)
                print("Successfully navigated to Pixeldrain page (waiting until 'load').")

                # 5. Wait for the button and click it (to trigger s("download"))
                try:
                    # Wait up to 30 seconds for the button to appear after load
                    page.wait_for_selector(DOWNLOAD_BUTTON_SELECTOR, timeout=30000)
                    print(f"Found button with selector: {DOWNLOAD_BUTTON_SELECTOR}. Clicking...")
                    page.click(DOWNLOAD_BUTTON_SELECTOR)
                except Exception as e:
                    # Keep the original failure attached as the cause.
                    raise ValueError(f"Button not found/Timeout during click: {e}") from e

                # 6. Wait for the final URL to be captured
                # The listener keeps the most recent matching response, so the
                # full 15-second window is observed instead of stopping at the
                # first match.
                page.wait_for_timeout(15000)
                if not final_download_url:
                    raise ValueError("Could not capture the final tokenized download URL after clicking.")

                # 7. Extract necessary data
                cookies = context.cookies()
            finally:
                # Close the browser on every path so a failed run does not
                # skip cleanup.
                browser.close()

        cookie_string = "; ".join(f"{c['name']}={c['value']}" for c in cookies)
        # Best-guess file name: last segment of the URL path. Parsing the URL
        # first keeps a query string or fragment out of the name.
        url_path = urlsplit(final_download_url).path
        filename = url_path.rstrip("/").rsplit("/", 1)[-1]

        return {
            "download_url": final_download_url,
            "cookies": cookie_string,
            "user_agent": user_agent,
            "filename": filename,
        }
    except Exception as e:
        print(f"Error during Playwright execution: {e}")
        # Signal failure with None; the caller decides how to report it.
        return None
| # --- FastAPI Routes --- | |
def resolve_url(url: str):
    """
    Resolve ``url`` through ``get_pixeldrain_info`` and return its result dict.

    Raises:
        HTTPException: status 500 when the resolver returns nothing or the
            result has an empty ``download_url``.

    NOTE(review): no route decorator is visible on this function in this
    view; confirm it is registered with ``app``.
    """
    info = get_pixeldrain_info(url)
    resolved = bool(info) and bool(info["download_url"])
    if resolved:
        return info
    # Report the failure as a 500 so the calling client sees the detail.
    raise HTTPException(status_code=500, detail="Failed to resolve URL via Playwright.")
def home():
    """
    Return a static status payload indicating the resolver is running.

    NOTE(review): no route decorator is visible on this function in this
    view; confirm it is registered with ``app``.
    """
    payload = dict(status="Pixeldrain Resolver Running")
    return payload