import gradio as gr
import asyncio
from pyppeteer import launch
from pyppeteer.errors import TimeoutError
import nest_asyncio
from multiprocessing import Pool, cpu_count
import json
import os


# Gradio runs its own asyncio event loop; nest_asyncio patches asyncio so a
# nested run_until_complete() call (see scrape_single_url_worker) does not
# raise "This event loop is already running".
nest_asyncio.apply()
|
|
async def _scrape_single_url_async(url: str) -> dict:
    """Scrape a single URL in a freshly launched headless browser.

    Returns a result dict with keys:
        url     -- the URL that was scraped
        status  -- "SUCCESS", "CAPTCHA_DETECTED", "TIMEOUT_ERROR", or "ERROR"
        content -- scraped data on success (list of book dicts for
                   books.toscrape.com, rendered HTML otherwise), else None
        error   -- human-readable error message, or None on success
    """
    timeout_ms = 60000  # navigation timeout; keep the TIMEOUT_ERROR message in sync
    browser = None
    try:
        print(f"[{os.getpid()}] Launching browser for {url}")
        # --no-sandbox flags are needed when Chromium runs as root (e.g. Docker).
        browser = await launch(
            headless=True,
            args=['--no-sandbox', '--disable-setuid-sandbox']
        )
        page = await browser.newPage()
        page.setDefaultNavigationTimeout(timeout_ms)

        await page.goto(url, {'waitUntil': 'networkidle2'})

        # Heuristic CAPTCHA detection. The result MUST be coerced to a boolean
        # inside the page: page.evaluate() cannot serialize a DOM element, so
        # returning the raw querySelector() result would arrive in Python as
        # None and silently defeat the selector-based checks.
        captcha_detected = await page.evaluate('''() => {
            const captchaKeywords = ['captcha', 'reCAPTCHA', 'hCaptcha', 'I am not a robot', 'verify you are human'];
            const pageText = document.body.innerText;
            return Boolean(
                captchaKeywords.some(keyword => pageText.includes(keyword)) ||
                document.querySelector('iframe[src*="captcha"]') ||
                document.querySelector('div[data-sitekey]')
            );
        }''')

        if captcha_detected:
            return {"url": url, "status": "CAPTCHA_DETECTED", "content": None, "error": "CAPTCHA detected, manual intervention or solver needed."}

        if "books.toscrape.com" in url:
            # Site-specific extraction: structured book data instead of raw HTML.
            books = await page.evaluate('''() => {
                return Array.from(document.querySelectorAll('article.product_pod')).map(book => ({
                    title: book.querySelector('h3 a').title,
                    price: book.querySelector('p.price_color').textContent,
                    availability: book.querySelector('p.instock').textContent.trim()
                }))
            }''')
            return {"url": url, "status": "SUCCESS", "content": books, "error": None}
        else:
            # Generic fallback: return the fully rendered page HTML.
            content = await page.content()
            return {"url": url, "status": "SUCCESS", "content": content, "error": None}

    except TimeoutError:
        # pyppeteer's TimeoutError (imported above), raised by page.goto.
        print(f"[{os.getpid()}] Timeout error for {url}")
        return {"url": url, "status": "TIMEOUT_ERROR", "content": None, "error": f"Navigation timed out after 60 seconds for {url}"}
    except Exception as e:
        print(f"[{os.getpid()}] General error for {url}: {e}")
        return {"url": url, "status": "ERROR", "content": None, "error": str(e)}
    finally:
        # Always release the Chromium process, even on error paths.
        if browser:
            print(f"[{os.getpid()}] Closing browser for {url}")
            await browser.close()
|
|
def scrape_single_url_worker(url: str) -> dict:
    """Run the async scraper to completion inside this worker process.

    Each multiprocessing worker gets its own fresh event loop. The loop is
    closed in a finally block so that an exception escaping the scrape does
    not leak a loop in a reused pool process (the original skipped close()
    on error).
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_scrape_single_url_async(url))
    finally:
        loop.close()
|
|
def concurrent_scrape_urls(urls_input: str) -> str:
    """Gradio handler: scrape many URLs in parallel worker processes.

    Parameters:
        urls_input -- textarea contents, one URL per line; blank lines and
                      surrounding whitespace are ignored.
    Returns:
        Pretty-printed JSON result objects separated by blank lines, or an
        error message when no URLs were supplied.
    """
    # splitlines() handles \n, \r\n and \r endings uniformly.
    urls = [url.strip() for url in urls_input.splitlines() if url.strip()]
    if not urls:
        return "Error: No URLs provided."

    # One process per URL, capped at the machine's core count.
    num_processes = min(len(urls), cpu_count())
    print(f"Starting concurrent scraping with {num_processes} processes for {len(urls)} URLs.")

    with Pool(processes=num_processes) as pool:
        # pool.map preserves input order, so results line up with urls.
        results = pool.map(scrape_single_url_worker, urls)

    return "\n\n".join(json.dumps(result, indent=2) for result in results)
|
|
# Gradio UI: one multiline textbox in, one textbox of JSON results out.
iface = gr.Interface(
    fn=concurrent_scrape_urls,
    inputs=gr.Textbox(label="Enter URLs to scrape (one per line)", lines=5, placeholder="e.g.,\nhttps://books.toscrape.com/\nhttps://example.com"),
    outputs=gr.Textbox(label="Scraped Content / Error Messages"),
    title="๐ Concurrent Web Scraper with CAPTCHA Detection",
    description="Enter multiple URLs (one per line) to scrape their content concurrently. Basic CAPTCHA detection is included.",
    # NOTE(review): allow_flagging is deprecated in Gradio 4.x in favor of
    # flagging_mode -- confirm against the installed gradio version.
    allow_flagging="never"
)
|
|
| if __name__ == "__main__": |
| iface.launch(server_name="0.0.0.0", server_port=7860) |