# app.py from __future__ import annotations import os import time import logging import threading import asyncio from typing import Optional, Dict, Any, Tuple from concurrent.futures import ThreadPoolExecutor from urllib.parse import quote_plus, urljoin from fastapi import FastAPI, HTTPException, Query, Body from pydantic import BaseModel from starlette.responses import JSONResponse from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from selenium.common.exceptions import WebDriverException, SessionNotCreatedException from webdriver_manager.chrome import ChromeDriverManager from bs4 import BeautifulSoup from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException # virtual display from pyvirtualdisplay import Display # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("fast_fetcher") # ---------------- BrowserManager ---------------- class BrowserManager: def __init__( self, headless: bool = True, user_agent: Optional[str] = None, window_size: str = "1366,768", disable_images: bool = True, block_resource_urls: Optional[list[str]] = None, ): self.headless = headless self.user_agent = user_agent self.window_size = window_size self.disable_images = disable_images self.block_resource_urls = block_resource_urls or [ "*.doubleclick.net/*", "*.google-analytics.com/*", "*.googlesyndication.com/*", "*.adservice.google.com/*", ] self._driver_lock = threading.Lock() self._driver: Optional[webdriver.Chrome] = None self._display: Optional[Display] = None self._start_driver_with_retries() def _build_options(self) -> Options: opts = Options() # If CHROME_BIN is present, point to it chrome_bin = os.environ.get("CHROME_BIN", "/usr/bin/google-chrome-stable") if os.path.exists(chrome_bin): opts.binary_location = chrome_bin logger.debug("Using chrome binary: %s", chrome_bin) else: logger.warning("Chrome binary not found at %s (will rely on system/browser manager).", chrome_bin) if self.headless: opts.add_argument("--headless=new") opts.add_argument("--headless") # container-friendly flags (and stable fallback) opts.add_argument("--no-sandbox") opts.add_argument("--disable-setuid-sandbox") opts.add_argument("--disable-dev-shm-usage") opts.add_argument("--disable-gpu") opts.add_argument("--disable-extensions") opts.add_argument("--disable-blink-features=AutomationControlled") opts.add_argument("--disable-software-rasterizer") opts.add_argument(f"--window-size={self.window_size}") opts.add_argument("--remote-debugging-port=0") if self.user_agent: opts.add_argument(f"--user-agent={self.user_agent}") if self.disable_images: prefs = { "profile.managed_default_content_settings.images": 2, "profile.managed_default_content_settings.stylesheets": 2, "profile.managed_default_content_settings.fonts": 2, } opts.add_experimental_option("prefs", prefs) opts.add_experimental_option("excludeSwitches", ["enable-logging"]) opts.add_experimental_option("useAutomationExtension", False) return opts def _start_driver_with_retries(self, attempts: int = 3, delay_seconds: float = 1.0): last_exc = None for attempt in range(1, attempts + 1): try: logger.info("Starting Chrome driver (attempt %d/%d)...", attempt, attempts) self._start_driver() logger.info("Chrome driver started successfully.") return except Exception as exc: logger.exception("Failed to start driver on attempt %d: %s", attempt, exc) last_exc = exc time.sleep(delay_seconds) raise RuntimeError(f"Unable to start Chrome driver after {attempts} attempts: {last_exc}") from last_exc def _start_xvfb_if_needed(self): # If headless=False AND no DISPLAY, start Xvfb via pyvirtualdisplay if not self.headless and os.environ.get("DISPLAY", "") == "": try: logger.info("No DISPLAY found and headless=False — starting virtual X display (Xvfb).") self._display = Display(visible=0, size=(int(self.window_size.split(",")[0]), int(self.window_size.split(",")[1]))) self._display.start() logger.info("Virtual X display started (DISPLAY=%s).", os.environ.get("DISPLAY")) except Exception as e: logger.exception("Failed to start virtual display: %s", e) raise def _stop_xvfb_if_started(self): if self._display: try: self._display.stop() logger.info("Virtual X display stopped.") except Exception: pass self._display = None def _start_driver(self): # start virtual display if required BEFORE launching Chrome self._start_xvfb_if_needed() opts = self._build_options() # 1) Try Selenium Manager (webdriver.Chrome(options=opts)). Selenium >=4.14 may use driver manager itself. primary_exc = None fallback_exc = None try: logger.debug("Attempting to start Chrome via Selenium Manager (webdriver.Chrome(options=opts))") self._driver = webdriver.Chrome(options=opts) # quick smoke test: ensure browser is responsive (may throw) try: self._driver.execute_script("return navigator.userAgent") except Exception as e: # browser started but died quickly raise RuntimeError("Browser started by Selenium Manager but crashed immediately.") from e self._post_start_setup() return except Exception as e_primary: primary_exc = e_primary logger.warning("Selenium Manager attempt failed: %s", e_primary) # 2) Fallback: use webdriver-manager to download driver and start with the explicit Service try: driver_path = ChromeDriverManager().install() logger.info("webdriver-manager installed chromedriver: %s", driver_path) try: os.chmod(driver_path, 0o755) except Exception: logger.debug("chmod on chromedriver failed or unnecessary.") service = Service(driver_path) self._driver = webdriver.Chrome(service=service, options=opts) self._post_start_setup() return except Exception as e_fallback: fallback_exc = e_fallback logger.exception("webdriver-manager fallback failed: %s", e_fallback) # 3) Final fallback: attempt system /usr/bin/chromedriver if available try: sys_path = "/usr/bin/chromedriver" if os.path.exists(sys_path): logger.info("Trying system chromedriver at %s", sys_path) try: os.chmod(sys_path, 0o755) except Exception: pass service = Service(sys_path) self._driver = webdriver.Chrome(service=service, options=opts) self._post_start_setup() return except Exception as e_sys: logger.exception("System chromedriver attempt failed: %s", e_sys) # If all failed, stop virtual display (if started) and raise a helpful error self._stop_xvfb_if_started() # Include both primary and fallback messages in the raised exception raise RuntimeError(f"Failed to start Chrome driver. primary_error={primary_exc}, fallback_error={fallback_exc}") def _post_start_setup(self): try: self._driver.set_page_load_timeout(60) # best-effort CDP network blocking try: self._driver.execute_cdp_cmd("Network.enable", {}) if self.block_resource_urls: self._driver.execute_cdp_cmd("Network.setBlockedURLs", {"urls": self.block_resource_urls}) except Exception: pass except Exception: pass def fetch_html( self, url: str, wait_seconds: Optional[float] = 10.0, wait_for_selector: Optional[str] = None, ) -> str: if self._driver is None: self._start_driver_with_retries() with self._driver_lock: driver = self._driver try: driver.get(url) if wait_for_selector and wait_seconds: try: WebDriverWait(driver, wait_seconds).until( EC.presence_of_element_located((By.CSS_SELECTOR, wait_for_selector)) ) except TimeoutException: pass else: if wait_seconds: try: WebDriverWait(driver, min(wait_seconds, 3)).until( lambda d: d.execute_script("return document.readyState") == "complete" ) except Exception: time.sleep(0.5) return driver.page_source except WebDriverException as e: logger.exception("WebDriver exception during fetch: %s", e) # restart driver and raise try: self._safe_quit_driver() except Exception: pass self._start_driver_with_retries() raise RuntimeError(f"WebDriver error during fetch: {e}") def _safe_quit_driver(self): if self._driver: try: self._driver.quit() except Exception: pass self._driver = None # stop display if we started one self._stop_xvfb_if_started() def close(self): self._safe_quit_driver() # ---------------- EXTRACT_DATA (same as your earlier implementation) ---------------- def EXTRACT_DATA(html: str) -> Dict[str, Any]: soup = BeautifulSoup(html, "html.parser") BASE_URL = "https://www.google.com" def safe_text(el): return el.get_text(strip=True) if el else "" def safe_attr(el, attr): return el.get(attr) if el and el.has_attr(attr) else "" def abs_url(url): return urljoin(BASE_URL, url) if url else "" def clean_thumb(src): if src and not src.startswith("data:"): return abs_url(src) return None def is_ad_element(element): for parent in element.parents: if parent.get("id") in ["tads", "tadsb"] or "ads-ad" in parent.get("class", []): return True return False web_results = [] for result in soup.select(".tF2Cxc"): if is_ad_element(result): continue title_tag = result.select_one("h3") link_tag = result.select_one("a") cite_tag = result.select_one("cite") snippet_tag = result.select_one(".VwiC3b") read_more_tag = result.select_one(".vzmbzf") if title_tag and link_tag: entry = { "no": len(web_results) + 1, "title": safe_text(title_tag), "link": abs_url(safe_attr(link_tag, "href")), "displayed_url": safe_text(cite_tag), "snippet": safe_text(snippet_tag) } extra = [] if read_more_tag: read_more_url = abs_url(safe_attr(read_more_tag, "href")) if read_more_url: extra.append({"read_more": read_more_url}) if extra: entry["extra"] = extra web_results.append(entry) image_results = [] for img_item in soup.select(".eA0Zlc"): img_tag = img_item.select_one("img") link_tag = img_item.select_one("a") source_tag = img_item.select_one(".s0fJje span") src = safe_attr(img_tag, "data-src") or safe_attr(img_tag, "src") thumb = clean_thumb(src) if thumb: image_results.append({ "thumbnail": thumb, "alt": safe_attr(img_tag, "alt"), "source": safe_text(source_tag), "link": abs_url(safe_attr(link_tag, "href")) }) video_results = [] for video in soup.select(".KYaZsb"): title_tag = video.select_one(".tNxQIb.ynAwRc") link_tag = video.select_one("a.rIRoqf") thumb_img = video.select_one(".AZJdrc img") duration_tag = video.select_one(".c8rnLc") channel_tag = video.select_one(".Sg4azc span:first-child") date_tag = video.select_one(".rbYSKb span") desc_tag = video.select_one(".wNifxf .p4wth") thumb_src = safe_attr(thumb_img, "data-src") or safe_attr(thumb_img, "src") thumb = clean_thumb(thumb_src) if title_tag and link_tag: video_results.append({ "title": safe_text(title_tag), "link": abs_url(safe_attr(link_tag, "href")), "thumbnail": thumb, "duration": safe_text(duration_tag), "channel": safe_text(channel_tag), "date": safe_text(date_tag), "description_snippet": safe_text(desc_tag) }) news_results = [] for news in soup.select(".m7jPZ"): title_tag = news.select_one(".n0jPhd") link_tag = news.select_one("a") source_tag = news.select_one(".MgUUmf span") time_tag = news.select_one(".rbYSKb span") thumb_img = news.select_one(".uhHOwf img") thumb_src = safe_attr(thumb_img, "data-src") or safe_attr(thumb_img, "src") thumb = clean_thumb(thumb_src) if title_tag and link_tag: news_results.append({ "title": safe_text(title_tag), "link": abs_url(safe_attr(link_tag, "href")), "source": safe_text(source_tag), "time": safe_text(time_tag), "thumbnail": thumb }) knowledge_panel = {} rhs = soup.find(id="rhs") if rhs: title_tag = rhs.select_one(".PZPZlf.ssJ7i") subtitle_tag = rhs.select_one(".iAIpCb span") if title_tag: knowledge_panel["title"] = safe_text(title_tag) if subtitle_tag: knowledge_panel["subtitle"] = safe_text(subtitle_tag) desc_tag = rhs.select_one(".kno-rdesc span") if desc_tag: knowledge_panel["description"] = safe_text(desc_tag) facts = {} for fact in rhs.select(".zloOqf"): label_tag = fact.select_one(".w8qArf") value_tag = fact.select_one(".LrzXr") if label_tag and value_tag: label = safe_text(label_tag).replace(":", "").strip() links = value_tag.find_all("a") if links and len(links) > 1: names = [safe_text(a) for a in links if safe_text(a)] if names: facts[label] = names else: text = safe_text(value_tag) if text: facts[label] = text if facts: knowledge_panel["facts"] = facts profiles = [] for profile in rhs.select(".dRrfkf a"): name_tag = profile.select_one(".CtCigf") link = safe_attr(profile, "href") if name_tag and link: profiles.append({ "platform": safe_text(name_tag), "link": abs_url(link) }) if profiles: knowledge_panel["profiles"] = profiles if not knowledge_panel: knowledge_panel = None ai_overview = None ai_container = soup.select_one(".p2M1Qe .f5cPye") if ai_container: text = safe_text(ai_container) if text: ai_overview = text thumbnails = set() for img in soup.select("img[data-src], img[src]"): src = safe_attr(img, "data-src") or safe_attr(img, "src") clean = clean_thumb(src) if clean: thumbnails.add(clean) all_thumbnails = sorted(thumbnails) if thumbnails else None data = {} if web_results: data["web_results"] = web_results if image_results: data["image_results"] = image_results if video_results: data["video_results"] = video_results if news_results: data["news_results"] = news_results if knowledge_panel: data["knowledge_panel"] = knowledge_panel if ai_overview: data["ai_overview"] = ai_overview if all_thumbnails: data["all_thumbnail_urls"] = all_thumbnails return data # ---------------- BrowserPool and API ---------------- class BrowserPool: def __init__(self, pool_size: int = 1, headless: bool = True): self.pool_size = max(1, pool_size) self.managers = [BrowserManager(headless=headless) for _ in range(self.pool_size)] self._rr_index = 0 self._rr_lock = threading.Lock() def pick_manager(self) -> BrowserManager: with self._rr_lock: idx = self._rr_index self._rr_index = (self._rr_index + 1) % self.pool_size return self.managers[idx] def close_all(self): for m in self.managers: try: m.close() except Exception: pass class SimpleTTLCache: def __init__(self, ttl_seconds: int = 20): self.ttl = ttl_seconds self._cache: Dict[str, Tuple[float, Any]] = {} self._lock = threading.Lock() def get(self, key: str): with self._lock: item = self._cache.get(key) if not item: return None ts, value = item if time.time() - ts > self.ttl: del self._cache[key] return None return value def set(self, key: str, value: Any): with self._lock: self._cache[key] = (time.time(), value) class SearchRequest(BaseModel): query: Optional[str] = None url: Optional[str] = None wait_for_selector: Optional[str] = None headless: Optional[bool] = True app = FastAPI(title="fast_fetcher_api", version="0.1") POOL: Optional[BrowserPool] = None EXECUTOR: Optional[ThreadPoolExecutor] = None CACHE = SimpleTTLCache(ttl_seconds=25) @app.on_event("startup") async def startup_event(): global POOL, EXECUTOR # Switch headless here to False as you asked. The BrowserManager will start an Xvfb display automatically. POOL = BrowserPool(pool_size=1, headless=False) EXECUTOR = ThreadPoolExecutor(max_workers=2) app.state.executor = EXECUTOR app.state.pool = POOL logger.info("Startup: browser pool created (size=%d).", 1) @app.on_event("shutdown") async def shutdown_event(): global POOL, EXECUTOR if POOL: POOL.close_all() if EXECUTOR: EXECUTOR.shutdown(wait=True) logger.info("Shutdown: browsers closed and executor stopped.") def _blocking_fetch_and_extract(manager: BrowserManager, url: str, wait_for_selector: Optional[str], wait_seconds: Optional[float]): start = time.time() html = manager.fetch_html(url, wait_seconds=wait_seconds, wait_for_selector=wait_for_selector) extracted = EXTRACT_DATA(html) duration = time.time() - start return {"url": url, "duration": duration, "data": extracted} @app.get("/health") async def health(): return {"status": "ok"} @app.get("/search") async def search(query: str = Query(..., min_length=1), wait_for_selector: Optional[str] = None): q = query.strip() if not q: raise HTTPException(status_code=400, detail="query parameter required") url = f"https://www.google.com/search?q={quote_plus(q)}" cache_key = f"search:{q}:{wait_for_selector}" cached = CACHE.get(cache_key) if cached: return JSONResponse(content={"cached": True, **cached}) manager = app.state.pool.pick_manager() loop = asyncio.get_event_loop() fut = loop.run_in_executor(app.state.executor, _blocking_fetch_and_extract, manager, url, wait_for_selector, 5.0) result = await fut CACHE.set(cache_key, result) return JSONResponse(content={"cached": False, **result}) @app.get("/fetch") async def fetch(url: str = Query(..., min_length=5), wait_for_selector: Optional[str] = None): manager = app.state.pool.pick_manager() loop = asyncio.get_event_loop() fut = loop.run_in_executor(app.state.executor, _blocking_fetch_and_extract, manager, url, wait_for_selector, 6.0) result = await fut return JSONResponse(content=result) @app.post("/search") async def post_search(body: SearchRequest = Body(...)): if not (body.query or body.url): raise HTTPException(status_code=400, detail="Either query or url must be provided") if body.url: target = body.url else: target = f"https://www.google.com/search?q={quote_plus(body.query)}" cache_key = f"search_post:{target}:{body.wait_for_selector}" cached = CACHE.get(cache_key) if cached: return JSONResponse(content={"cached": True, **cached}) manager = app.state.pool.pick_manager() loop = asyncio.get_event_loop() fut = loop.run_in_executor(app.state.executor, _blocking_fetch_and_extract, manager, target, body.wait_for_selector, 6.0) result = await fut CACHE.set(cache_key, result) return JSONResponse(content={"cached": False, **result})