| """PhantomAPI β Browser automation engine.
|
|
|
| Launches a persistent headless Chrome instance via Playwright
|
| and interacts with chatgpt.com to generate responses.
|
| """
|
|
|
| import asyncio
|
| import threading
|
| import os
|
| import random
|
| from app.config import settings
|
| from playwright_stealth import stealth_async
|
|
|
|
|
| class BrowserEngine(threading.Thread):
|
| """A dedicated thread that runs an async Playwright browser.
|
|
|
| This avoids blocking the FastAPI event loop while still giving
|
| us a persistent browser instance that can handle sequential requests.
|
| """
|
|
|
| def __init__(self) -> None:
|
| super().__init__(daemon=True)
|
| self.loop = asyncio.new_event_loop()
|
| self.ready = threading.Event()
|
| self.browser = None
|
| self.playwright = None
|
|
|
|
|
|
|
|
|
|
|
| def run(self) -> None:
|
| """Thread entry point β start browser and run the event loop forever."""
|
| asyncio.set_event_loop(self.loop)
|
| self.loop.run_until_complete(self._launch())
|
| self.ready.set()
|
| print("[PhantomAPI] β‘ Browser engine ready.")
|
| self.loop.run_forever()
|
|
|
| async def _launch(self) -> None:
|
| """Launch a stealth Chromium browser."""
|
| from playwright.async_api import async_playwright
|
|
|
| print("[PhantomAPI] π Launching browser...")
|
| launcher_args = {
|
| "headless": settings.HEADLESS,
|
| "args": [
|
| "--disable-blink-features=AutomationControlled",
|
| "--no-sandbox",
|
| "--disable-gpu",
|
| "--disable-dev-shm-usage",
|
| "--disable-setuid-sandbox",
|
| ]
|
| }
|
|
|
| if settings.PROXY_URL:
|
| print(f"[PhantomAPI] π Using proxy: {settings.PROXY_URL}")
|
| launcher_args["proxy"] = {"server": settings.PROXY_URL}
|
|
|
| self.playwright = await async_playwright().start()
|
| self.browser = await self.playwright.chromium.launch(**launcher_args)
|
|
|
|
|
|
|
|
|
|
|
| def chat(self, prompt: str) -> str:
|
| """Send a prompt to ChatGPT and return the response text.
|
|
|
| This is a blocking call that schedules work on the browser
|
| thread's event loop and waits for the result.
|
| """
|
| if not self.ready.wait(timeout=30) or self.browser is None:
|
| raise RuntimeError("Browser engine is not ready. Is Chrome installed?")
|
|
|
| future = asyncio.run_coroutine_threadsafe(
|
| self._interact(prompt), self.loop
|
| )
|
| return future.result(timeout=settings.BROWSER_TIMEOUT // 1000 + 30)
|
|
|
|
|
|
|
|
|
|
|
| async def _interact(self, prompt: str) -> str:
|
| """Open a new ChatGPT session, send the prompt, and scrape the reply."""
|
| context = await self.browser.new_context(
|
| user_agent=(
|
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
| "AppleWebKit/537.36 (KHTML, like Gecko) "
|
| "Chrome/124.0.0.0 Safari/537.36"
|
| ),
|
| viewport={"width": 1280, "height": 800},
|
| device_scale_factor=1,
|
| has_touch=False,
|
| is_mobile=False,
|
| )
|
|
|
|
|
| await context.add_init_script("""
|
| Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
|
| Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
|
| Object.defineProperty(navigator, 'platform', {get: () => 'Win32'});
|
| Object.defineProperty(navigator, 'vendor', {get: () => 'Google Inc.'});
|
| """)
|
|
|
|
|
| if settings.CHATGPT_SESSION_TOKEN:
|
| print("[PhantomAPI] π Injecting session token...")
|
| await context.add_cookies([{
|
| "name": "__Secure-next-auth.session-token",
|
| "value": settings.CHATGPT_SESSION_TOKEN,
|
| "domain": ".chatgpt.com",
|
| "path": "/",
|
| "httpOnly": True,
|
| "secure": True,
|
| "sameSite": "Lax"
|
| }])
|
|
|
| page = await context.new_page()
|
|
|
| try:
|
| page.set_default_timeout(settings.BROWSER_TIMEOUT)
|
|
|
|
|
| print(f"[PhantomAPI] π Navigating to ChatGPT...")
|
| await page.goto("https://chatgpt.com/", wait_until="load")
|
|
|
|
|
| await self._save_debug_screenshot(page)
|
|
|
|
|
| title = await page.title()
|
| current_url = page.url
|
| print(f"[PhantomAPI] π Page Title: '{title}'")
|
| print(f"[PhantomAPI] π Current URL: {current_url}")
|
|
|
| if "auth0" in current_url or "login" in current_url:
|
| print("[PhantomAPI] β οΈ Detected Login/Auth wall.")
|
| if not settings.CHATGPT_SESSION_TOKEN:
|
| print("[PhantomAPI] β ERROR: Not logged in. Please set CHATGPT_SESSION_TOKEN.")
|
|
|
| if "cloudflare" in title.lower() or "hcaptcha" in await page.content():
|
| print("[PhantomAPI] β οΈ Detected Cloudflare/CAPTCHA wall.")
|
|
|
|
|
| print("[PhantomAPI] β¨οΈ Waiting for input box...")
|
| await page.wait_for_selector("#prompt-textarea", timeout=45000)
|
| await page.fill("#prompt-textarea", prompt)
|
| await asyncio.sleep(1.0)
|
|
|
|
|
| print("[PhantomAPI] π€ Sending prompt...")
|
| await asyncio.sleep(random.uniform(0.5, 1.5))
|
| await page.press("#prompt-textarea", "Enter")
|
|
|
|
|
| try:
|
| btn = await page.wait_for_selector('[data-testid="send-button"]', timeout=3000)
|
| if btn and await btn.is_enabled():
|
|
|
| box = await btn.bounding_box()
|
| if box:
|
| await page.mouse.click(box['x'] + box['width']/2, box['y'] + box['height']/2)
|
| print("[PhantomAPI] π±οΈ Mouse-clicked Send button.")
|
| except Exception:
|
| pass
|
|
|
|
|
| print("[PhantomAPI] π€ Waiting for response...")
|
| try:
|
|
|
| await page.wait_for_selector(
|
| '[data-message-author-role="assistant"]',
|
| timeout=settings.BROWSER_TIMEOUT,
|
| )
|
| print("[PhantomAPI] β
Assistant bubble appeared.")
|
|
|
|
|
| phase1_start = asyncio.get_event_loop().time()
|
| while True:
|
| if asyncio.get_event_loop().time() - phase1_start > 90:
|
| print("[PhantomAPI] β Timeout waiting for first character.")
|
| break
|
|
|
|
|
| await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
|
|
|
| bubble = await page.query_selector('[data-message-author-role="assistant"]')
|
| target = await bubble.query_selector(".markdown, .prose, pre") or bubble
|
| content = await target.inner_text()
|
|
|
| if content.strip():
|
| print(f"[PhantomAPI] π’ Detected typing start! ({len(content)} chars)")
|
| break
|
|
|
| await asyncio.sleep(2.0)
|
|
|
| except Exception as e:
|
|
|
| print(f"[PhantomAPI] β Response timeout/error: {e}")
|
| await self._save_debug_screenshot(page)
|
|
|
|
|
| page_text = await page.evaluate("document.body.innerText")
|
| if "Something went wrong" in page_text:
|
| print("[PhantomAPI] β detected: 'Something went wrong'")
|
| elif "Rate limit" in page_text:
|
| print("[PhantomAPI] β detected: 'Rate limit'")
|
| elif "Verify you are human" in page_text:
|
| print("[PhantomAPI] β detected: 'Cloudflare / CAPTCHA'")
|
| else:
|
| print(f"[PhantomAPI] π Diagnostic Text (first 300 chars): {page_text[:300]}")
|
|
|
| raise
|
|
|
|
|
| print("[PhantomAPI] β³ Phase 2: Monitoring completion...")
|
| last_text = ""
|
| unchanged_count = 0
|
| start_polling = asyncio.get_event_loop().time()
|
|
|
| while True:
|
| if asyncio.get_event_loop().time() - start_polling > 120:
|
| print("[PhantomAPI] β οΈ Hard timeout reached.")
|
| break
|
|
|
|
|
| await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
|
|
|
| bubble = await page.query_selector('[data-message-author-role="assistant"]')
|
| if not bubble: break
|
|
|
|
|
| is_busy = await bubble.get_attribute("aria-busy")
|
|
|
| target = await bubble.query_selector(".markdown, .prose, pre") or bubble
|
| current_text = await target.inner_text()
|
| current_text = current_text.strip()
|
|
|
| if current_text != last_text:
|
| if len(current_text) > len(last_text):
|
| print(f"[PhantomAPI] β³ Generating... ({len(current_text)} chars)")
|
| last_text = current_text
|
| unchanged_count = 0
|
| else:
|
|
|
| if is_busy != "true":
|
| unchanged_count += 1
|
|
|
| if unchanged_count >= 5:
|
| print("[PhantomAPI] β
Generation finished.")
|
| break
|
|
|
| await asyncio.sleep(1.0)
|
|
|
| await self._save_debug_screenshot(page)
|
| print(f"[PhantomAPI] β¨ Response complete ({len(last_text)} chars).")
|
| return last_text.strip()
|
|
|
| except Exception as exc:
|
| print(f"[PhantomAPI] β Browser error: {exc}")
|
| raise
|
| finally:
|
| await page.close()
|
| await context.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
| async def _save_debug_screenshot(self, page) -> None:
|
| """Save a screenshot to the static folder for visual debugging."""
|
| try:
|
| static_dir = os.path.join(os.getcwd(), "static")
|
| if not os.path.exists(static_dir):
|
| os.makedirs(static_dir)
|
|
|
| path = os.path.join(static_dir, "debug.png")
|
| await page.screenshot(path=path, full_page=False)
|
| print(f"[PhantomAPI] πΈ Debug screenshot saved to static/debug.png")
|
| except Exception as e:
|
| print(f"[PhantomAPI] β οΈ Failed to save screenshot: {e}")
|
|
|
|
|
|
|
|
|
|
|
| engine = BrowserEngine()
|
|
|