"""PhantomAPI — Browser automation engine. Launches a persistent headless Chrome instance via Playwright and interacts with chatgpt.com to generate responses. """ import asyncio import threading import os import random from app.config import settings from playwright_stealth import stealth_async class BrowserEngine(threading.Thread): """A dedicated thread that runs an async Playwright browser. This avoids blocking the FastAPI event loop while still giving us a persistent browser instance that can handle sequential requests. """ def __init__(self) -> None: super().__init__(daemon=True) self.loop = asyncio.new_event_loop() self.ready = threading.Event() self.browser = None self.playwright = None # ------------------------------------------------------------------ # Thread lifecycle # ------------------------------------------------------------------ def run(self) -> None: """Thread entry point — start browser and run the event loop forever.""" asyncio.set_event_loop(self.loop) self.loop.run_until_complete(self._launch()) self.ready.set() print("[PhantomAPI] ⚡ Browser engine ready.") self.loop.run_forever() async def _launch(self) -> None: """Launch a stealth Chromium browser.""" from playwright.async_api import async_playwright print("[PhantomAPI] 🚀 Launching browser...") launcher_args = { "headless": settings.HEADLESS, "args": [ "--disable-blink-features=AutomationControlled", "--no-sandbox", "--disable-gpu", "--disable-dev-shm-usage", "--disable-setuid-sandbox", ] } if settings.PROXY_URL: print(f"[PhantomAPI] 🌐 Using proxy: {settings.PROXY_URL}") launcher_args["proxy"] = {"server": settings.PROXY_URL} self.playwright = await async_playwright().start() self.browser = await self.playwright.chromium.launch(**launcher_args) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def chat(self, prompt: str) -> str: """Send a prompt to ChatGPT and return the response text. This is a blocking call that schedules work on the browser thread's event loop and waits for the result. """ if not self.ready.wait(timeout=30) or self.browser is None: raise RuntimeError("Browser engine is not ready. Is Chrome installed?") future = asyncio.run_coroutine_threadsafe( self._interact(prompt), self.loop ) return future.result(timeout=settings.BROWSER_TIMEOUT // 1000 + 30) # ------------------------------------------------------------------ # Private — browser interaction # ------------------------------------------------------------------ async def _interact(self, prompt: str) -> str: """Open a new ChatGPT session, send the prompt, and scrape the reply.""" context = await self.browser.new_context( user_agent=( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/124.0.0.0 Safari/537.36" ), viewport={"width": 1280, "height": 800}, device_scale_factor=1, has_touch=False, is_mobile=False, ) # Advanced Stealth Overrides await context.add_init_script(""" Object.defineProperty(navigator, 'webdriver', {get: () => undefined}); Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']}); Object.defineProperty(navigator, 'platform', {get: () => 'Win32'}); Object.defineProperty(navigator, 'vendor', {get: () => 'Google Inc.'}); """) # Inject session token if provided if settings.CHATGPT_SESSION_TOKEN: print("[PhantomAPI] 🔑 Injecting session token...") await context.add_cookies([{ "name": "__Secure-next-auth.session-token", "value": settings.CHATGPT_SESSION_TOKEN, "domain": ".chatgpt.com", "path": "/", "httpOnly": True, "secure": True, "sameSite": "Lax" }]) page = await context.new_page() try: page.set_default_timeout(settings.BROWSER_TIMEOUT) # Navigate to ChatGPT print(f"[PhantomAPI] 🌐 Navigating to ChatGPT...") await page.goto("https://chatgpt.com/", wait_until="load") # Diagnostic Screenshot (See what the browser sees) await self._save_debug_screenshot(page) # --- Diagnostic Logging --- title = await page.title() current_url = page.url print(f"[PhantomAPI] 📍 Page Title: '{title}'") print(f"[PhantomAPI] 📍 Current URL: {current_url}") if "auth0" in current_url or "login" in current_url: print("[PhantomAPI] ⚠️ Detected Login/Auth wall.") if not settings.CHATGPT_SESSION_TOKEN: print("[PhantomAPI] ❌ ERROR: Not logged in. Please set CHATGPT_SESSION_TOKEN.") if "cloudflare" in title.lower() or "hcaptcha" in await page.content(): print("[PhantomAPI] ⚠️ Detected Cloudflare/CAPTCHA wall.") # Type the prompt print("[PhantomAPI] ⌨️ Waiting for input box...") await page.wait_for_selector("#prompt-textarea", timeout=45000) await page.fill("#prompt-textarea", prompt) await asyncio.sleep(1.0) # Robust send: Press Enter AND mouse-click the send button print("[PhantomAPI] 📤 Sending prompt...") await asyncio.sleep(random.uniform(0.5, 1.5)) await page.press("#prompt-textarea", "Enter") # Fallback: Real mouse click on the send button try: btn = await page.wait_for_selector('[data-testid="send-button"]', timeout=3000) if btn and await btn.is_enabled(): # Get button coordinates and click with mouse box = await btn.bounding_box() if box: await page.mouse.click(box['x'] + box['width']/2, box['y'] + box['height']/2) print("[PhantomAPI] 🖱️ Mouse-clicked Send button.") except Exception: pass # Wait for assistant response OR error message print("[PhantomAPI] 🤖 Waiting for response...") try: # Wait for the first assistant bubble or an error await page.wait_for_selector( '[data-message-author-role="assistant"]', timeout=settings.BROWSER_TIMEOUT, ) print("[PhantomAPI] ✅ Assistant bubble appeared.") # Phase 1: Wait for text to appear (Up to 90s) phase1_start = asyncio.get_event_loop().time() while True: if asyncio.get_event_loop().time() - phase1_start > 90: print("[PhantomAPI] ❌ Timeout waiting for first character.") break # Scroll to bottom to trigger rendering await page.evaluate("window.scrollTo(0, document.body.scrollHeight)") bubble = await page.query_selector('[data-message-author-role="assistant"]') target = await bubble.query_selector(".markdown, .prose, pre") or bubble content = await target.inner_text() if content.strip(): print(f"[PhantomAPI] 📢 Detected typing start! ({len(content)} chars)") break await asyncio.sleep(2.0) except Exception as e: # Diagnostics: What is actually on the page? print(f"[PhantomAPI] ❌ Response timeout/error: {e}") await self._save_debug_screenshot(page) # Check for common error messages page_text = await page.evaluate("document.body.innerText") if "Something went wrong" in page_text: print("[PhantomAPI] ⛔ detected: 'Something went wrong'") elif "Rate limit" in page_text: print("[PhantomAPI] ⛔ detected: 'Rate limit'") elif "Verify you are human" in page_text: print("[PhantomAPI] ⛔ detected: 'Cloudflare / CAPTCHA'") else: print(f"[PhantomAPI] 📍 Diagnostic Text (first 300 chars): {page_text[:300]}") raise # --- Phase 2: Wait for stability (aria-busy=false) --- print("[PhantomAPI] ⏳ Phase 2: Monitoring completion...") last_text = "" unchanged_count = 0 start_polling = asyncio.get_event_loop().time() while True: if asyncio.get_event_loop().time() - start_polling > 120: print("[PhantomAPI] ⚠️ Hard timeout reached.") break # Scroll to bottom regularly await page.evaluate("window.scrollTo(0, document.body.scrollHeight)") bubble = await page.query_selector('[data-message-author-role="assistant"]') if not bubble: break # Check busy status is_busy = await bubble.get_attribute("aria-busy") target = await bubble.query_selector(".markdown, .prose, pre") or bubble current_text = await target.inner_text() current_text = current_text.strip() if current_text != last_text: if len(current_text) > len(last_text): print(f"[PhantomAPI] ⏳ Generating... ({len(current_text)} chars)") last_text = current_text unchanged_count = 0 else: # If text is stable AND not busy, it's done if is_busy != "true": unchanged_count += 1 if unchanged_count >= 5: # Slightly longer stability check print("[PhantomAPI] ✅ Generation finished.") break await asyncio.sleep(1.0) await self._save_debug_screenshot(page) print(f"[PhantomAPI] ✨ Response complete ({len(last_text)} chars).") return last_text.strip() except Exception as exc: print(f"[PhantomAPI] ❌ Browser error: {exc}") raise finally: await page.close() await context.close() # ------------------------------------------------------------------ # Debugging # ------------------------------------------------------------------ async def _save_debug_screenshot(self, page) -> None: """Save a screenshot to the static folder for visual debugging.""" try: static_dir = os.path.join(os.getcwd(), "static") if not os.path.exists(static_dir): os.makedirs(static_dir) path = os.path.join(static_dir, "debug.png") await page.screenshot(path=path, full_page=False) print(f"[PhantomAPI] 📸 Debug screenshot saved to static/debug.png") except Exception as e: print(f"[PhantomAPI] ⚠️ Failed to save screenshot: {e}") # --------------------------------------------------------------------------- # Singleton — created once at import time, started in app lifespan # --------------------------------------------------------------------------- engine = BrowserEngine()