# pixeldrain / app.py
# nexacore's picture
# Update app.py
# 19b40c0 verified
import re
from urllib.parse import unquote, urlsplit

from fastapi import FastAPI, HTTPException
from playwright.sync_api import sync_playwright
# FastAPI application instance; the route decorators in this file register on it.
app = FastAPI()
# --- Configuration ---
# Playwright selector for the page's download button: a <button> element
# whose text contains "Download".
# NOTE(review): the button text is an assumption - confirm it against the live page.
DOWNLOAD_BUTTON_SELECTOR = 'button:has-text("Download")'
# Timeout in milliseconds, used both as the browser context's default timeout
# and as the page navigation timeout.
PLAYWRIGHT_TIMEOUT_MS = 60000
def _filename_from_url(url: str) -> str:
    """Return the percent-decoded last path segment of *url*, ignoring query and fragment."""
    return unquote(urlsplit(url).path.rsplit("/", 1)[-1])


def get_pixeldrain_info(pixeldrain_url: str):
    """
    Resolve a Pixeldrain page URL to its final download URL with headless Chromium.

    Opens the page in Playwright, clicks the download button and records the
    URL of the matching network response, together with the browser context's
    cookies.

    Args:
        pixeldrain_url: URL of the page to open in the browser.

    Returns:
        A dict with the keys ``download_url``, ``cookies`` (a
        ``"name=value; ..."`` string), ``user_agent`` and ``filename``, or
        ``None`` if any step fails (the error is printed).
    """
    # 1. Initialization and Stealth Headers
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    final_download_url = None

    try:
        with sync_playwright() as p:
            # 2. Launch Browser with Stealth Context
            # slow_mo slows every operation down to give the page a chance to react.
            browser = p.chromium.launch(headless=True, slow_mo=50)
            try:
                # Use a custom context with realistic viewport and locale
                context = browser.new_context(
                    user_agent=user_agent,
                    viewport={"width": 1920, "height": 1080},
                    locale="en-US",
                )
                # Default timeout for every operation on this context
                context.set_default_timeout(PLAYWRIGHT_TIMEOUT_MS)
                page = context.new_page()

                # 3. Setup Network Listener
                def handle_response(response):
                    nonlocal final_download_url
                    # Look for the final redirect URL pattern
                    # (https://pd.pixeldrain.com/api/file/HASH/FILENAME):
                    # '/api/file/' in the URL, a 200 OK status and a URL longer
                    # than 50 characters. A later match overwrites an earlier one.
                    if (
                        "/api/file/" in response.url
                        and response.status == 200
                        and len(response.url) > 50
                    ):
                        print(f"Captured final link via network: {response.url}")
                        final_download_url = response.url

                page.on("response", handle_response)

                # 4. Navigate (Using the least strict wait condition: "load")
                page.goto(pixeldrain_url, wait_until="load", timeout=PLAYWRIGHT_TIMEOUT_MS)
                print("Successfully navigated to Pixeldrain page (waiting until 'load').")

                # 5. Wait for the button and click it (to trigger the download request)
                try:
                    # Allow 30 seconds for the button to appear after the page has loaded
                    page.wait_for_selector(DOWNLOAD_BUTTON_SELECTOR, timeout=30000)
                    print(f"Found button with selector: {DOWNLOAD_BUTTON_SELECTOR}. Clicking...")
                    page.click(DOWNLOAD_BUTTON_SELECTOR)
                except Exception as e:
                    # Keep the original exception attached as the cause.
                    raise ValueError(f"Button not found/Timeout during click: {e}") from e

                # 6. Wait for the final URL to be captured
                # Fixed 15 second window: the listener keeps the most recent
                # matching response, so the whole window is observed.
                page.wait_for_timeout(15000)
                if not final_download_url:
                    raise ValueError("Could not capture the final tokenized download URL after clicking.")

                # 7. Extract necessary data
                cookie_string = "; ".join(
                    f"{c['name']}={c['value']}" for c in context.cookies()
                )
                return {
                    "download_url": final_download_url,
                    "cookies": cookie_string,
                    "user_agent": user_agent,
                    # Last path segment only: query string dropped, %-escapes decoded.
                    "filename": _filename_from_url(final_download_url),
                }
            finally:
                # Close the browser on the failure paths too, not only on success.
                browser.close()
    except Exception as e:
        print(f"Error during Playwright execution: {e}")
        # Signal failure with None; the route handler turns this into an HTTP 500.
        return None
# --- FastAPI Routes ---
@app.get("/resolve_pixeldrain")
def resolve_url(url: str):
    """
    Resolve a Pixeldrain page URL to its download URL, cookies, user agent and filename.

    Args:
        url: Page URL, taken from the ``url`` query parameter.

    Returns:
        The dict produced by ``get_pixeldrain_info``.

    Raises:
        HTTPException: 400 if ``url`` is not an absolute http(s) URL;
            500 if the URL could not be resolved.
    """
    # The value comes straight from the query string and is handed to a real
    # browser, so only plain web URLs are accepted (no file://, javascript:, ...).
    try:
        parts = urlsplit(url)
    except ValueError:
        parts = None
    if parts is None or parts.scheme not in ("http", "https") or not parts.netloc:
        raise HTTPException(status_code=400, detail="url must be an absolute http(s) URL.")

    data = get_pixeldrain_info(url)
    if not data or not data["download_url"]:
        # Raise 500 to send the failure detail back to the Cloudflare Worker
        raise HTTPException(status_code=500, detail="Failed to resolve URL via Playwright.")
    return data
@app.get("/")
def home():
    """Root endpoint: report that the resolver service is running."""
    status_payload = dict(status="Pixeldrain Resolver Running")
    return status_payload