# cloudflare-ai / app.py
# vikarshana's picture
# Update app.py
# cd2b06f verified
import asyncio
import tempfile
import time
import logging
import os
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.core.os_manager import ChromeType
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import gradio as gr
# Configure logging: the root logger is set to emit INFO and above, and this
# module logs through its own logger named after the module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def capture_direct_download_link(url, opts=None):
    """Open *url* in Chrome, click "Direct Download 2", and capture the resulting URL.

    The Selenium workflow is blocking, so it is executed in a worker thread
    to keep the event loop free while the browser works.

    Args:
        url: Page to open. Must be a non-blank string.
        opts: Optional dict. Recognised keys:
            ``chrome_path`` - Chrome binary (default ``/opt/google/chrome/chrome``);
            ``headless`` - run Chrome headless (default ``True``);
            ``wait_for_button_ms`` - how long to keep looking for the
            "Direct Download 2" button, in milliseconds (default ``60000``).

    Returns:
        ``{"success": True, "url": <captured url>}`` on success, otherwise
        ``{"success": False, "error": <message>}``. Failures are reported
        through the return value rather than raised.
    """
    if opts is None:
        opts = {}
    # Fail fast on blank input instead of starting a browser just to have
    # the navigation rejected.
    if not isinstance(url, str) or not url.strip():
        return {"success": False, "error": "No URL provided."}
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, _capture_direct_download_link_blocking, url, opts
    )


def _find_direct_download_button(driver):
    """Return the first <button> whose text is "Direct Download 2", or None."""
    from selenium.common.exceptions import StaleElementReferenceException

    for btn in driver.find_elements(By.TAG_NAME, "button"):
        try:
            if btn.text.strip() == "Direct Download 2":
                return btn
        except StaleElementReferenceException:
            # The element was replaced while scanning; the caller polls again.
            continue
    return None


def _capture_popup_url(driver, original_window, timeout_s=25):
    """Poll for a tab other than *original_window* and return its URL, or None on timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        for handle in driver.window_handles:
            if handle == original_window:
                continue
            driver.switch_to.window(handle)
            try:
                WebDriverWait(driver, 30).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )
                popup_url = driver.current_url
                logger.info('🎉 Captured Download URL from NEW TAB: %s', popup_url)
                driver.close()
                driver.switch_to.window(original_window)
                return popup_url
            except Exception as e:
                logger.error("Error handling popup: %s", e)
                driver.switch_to.window(original_window)
                break
        time.sleep(0.5)
    return None


def _capture_direct_download_link_blocking(url, opts):
    """Run the whole browser workflow synchronously and return the result dict."""
    from selenium.common.exceptions import WebDriverException
    from webdriver_manager.core.driver_cache import DriverCacheManager

    chrome_path = opts.get('chrome_path', '/opt/google/chrome/chrome')
    headless = opts.get('headless', True)
    wait_for_button_ms = opts.get('wait_for_button_ms', 60000)
    driver = None
    try:
        chrome_options = Options()
        if headless:
            chrome_options.add_argument("--headless=new")
        chrome_options.binary_location = chrome_path
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-setuid-sandbox")
        chrome_options.add_argument("--disable-web-security")
        chrome_options.add_argument("--disable-gpu")
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--disable-features=IsolateOrigins,site-per-process")
        chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36")

        # Keep the downloaded chromedriver in one stable temp location so it
        # is reused across calls. WDM_LOCAL is a boolean switch, not a path,
        # so the cache directory has to be passed via a cache manager.
        cache_dir = os.path.join(tempfile.gettempdir(), "wdm_cache")
        service = Service(
            ChromeDriverManager(
                chrome_type=ChromeType.GOOGLE,
                cache_manager=DriverCacheManager(cache_dir),
            ).install()
        )
        driver = webdriver.Chrome(service=service, options=chrome_options)
        driver.set_window_size(1280, 900)

        # Stealth script, injected before any page script runs
        stealth_script = """
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });
window.chrome = { runtime: {} };
window.DisableDevtool = function() {};
window.qajblusk = false;
"""
        driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {'source': stealth_script})

        # Block images/fonts
        driver.execute_cdp_cmd('Network.enable', {})
        driver.execute_cdp_cmd('Network.setBlockedURLs', {
            'urls': ['*.jpg', '*.jpeg', '*.png', '*.gif', '*.webp', '*.woff', '*.woff2', '*.ttf']
        })

        # Navigate and wait for the document to finish loading
        driver.get(url)
        WebDriverWait(driver, 30).until(
            lambda d: d.execute_script("return document.readyState") == "complete"
        )
        time.sleep(2)

        # Find button, polling for up to wait_for_button_ms
        try:
            target_button = WebDriverWait(driver, wait_for_button_ms / 1000).until(
                _find_direct_download_button
            )
        except TimeoutException:
            raise RuntimeError('No "Direct Download 2" button found on page!') from None

        button_id = target_button.get_attribute("id")
        logger.info('✅ Found "Direct Download 2" button with ID: %s', button_id)
        target_button.click()

        # Wait for processing to end
        def processing_finished(d):
            try:
                # Re-locate by id when there is one; otherwise reuse the
                # element we clicked.
                el = d.find_element(By.ID, button_id) if button_id else target_button
                span = el.find_element(By.CLASS_NAME, "download-text")
                return span.text.strip() != "Processing..."
            except WebDriverException:
                # Button or label is missing/stale: treat as finished.
                return True

        try:
            WebDriverWait(driver, 45).until(processing_finished)
        except TimeoutException:
            # Best effort: carry on and look for the download anyway.
            logger.warning('Button still shows "Processing..." after 45s; continuing.')
        logger.info('✅ Processing finished. Listening for download...')

        original_window = driver.current_window_handle

        # Check for popup first
        final_url = _capture_popup_url(driver, original_window)
        if final_url is None:
            # Fallback: check whether the main tab navigated instead
            time.sleep(3)
            current_url = driver.current_url
            if current_url != url and "fordev.jpg" not in current_url and not current_url.startswith("about:"):
                final_url = current_url
                logger.info('✅ Fallback: Download URL from MAIN TAB: %s', final_url)

        if not final_url or final_url == "about:blank" or "fordev.jpg" in final_url:
            raise RuntimeError("❌ Download URL is invalid or blocked.")
        return {"success": True, "url": final_url}
    except Exception as e:
        logger.error("❌ capture_direct_download_link error: %s", e)
        return {"success": False, "error": str(e)}
    finally:
        if driver:
            try:
                driver.quit()
            except Exception as e:
                logger.error("Error closing driver: %s", e)
# --- GRADIO UI WRAPPER ---
async def gradio_wrapper(url_input):
    """Coroutine click handler: resolve the download link and format the outcome.

    Args:
        url_input: Text from the URL box; may be blank or ``None``.

    Returns:
        A human-readable message: the captured download URL on success, or
        the failure reason. Exceptions are converted to a message rather
        than raised.
    """
    try:
        url = (url_input or "").strip()
        # Nothing to look up: answer immediately without starting a browser.
        if not url:
            return "❌ Failed: Please enter a URL."
        result = await capture_direct_download_link(url, {
            "chrome_path": "/opt/google/chrome/chrome",
            "headless": True,
            "wait_for_button_ms": 60000,
        })
        if result["success"]:
            return f"✅ Success!\nDownload URL: {result['url']}"
        return f"❌ Failed: {result['error']}"
    except Exception as e:
        return f"💥 Unexpected Error: {str(e)}"
def create_interface():
    """Build the Gradio Blocks UI and return it (not yet launched).

    Layout: a heading, a row holding the URL textbox and the submit button,
    a multi-line result textbox, and a short usage note. Clicking the button
    calls ``gradio_wrapper`` with the URL text and shows its return value in
    the result box.
    """
    with gr.Blocks(title="Download Link Bypasser") as demo:
        gr.Markdown("# 🚀 Direct Download Link Bypasser")
        with gr.Row():
            url_input = gr.Textbox(label="Enter Target URL", placeholder="https://example.com/download")
            submit_btn = gr.Button("Bypass & Get Link", variant="primary")
        output = gr.Textbox(label="Result", lines=5)
        submit_btn.click(fn=gradio_wrapper, inputs=url_input, outputs=output)
        gr.Markdown("ℹ️ This tool finds and clicks 'Direct Download 2' button, then captures the final download URL.")
    return demo
# --- MAIN ---
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it on every interface at
    # port 7860. share stays False (MUST be False on Hugging Face Spaces).
    demo = create_interface()
    demo.launch(share=False, server_port=7860, server_name="0.0.0.0")