# PutuAPI / app / services / browser.py
# suzmen's picture
# Upload 64 files
# 55d3bfa verified
"""PhantomAPI — Browser automation engine.

Launches a persistent headless Chrome instance via Playwright
and interacts with chatgpt.com to generate responses.
"""
import asyncio
import threading
import os
import random
from app.config import settings
from playwright_stealth import stealth_async
class BrowserEngine(threading.Thread):
    """A dedicated thread that runs an async Playwright browser.

    This avoids blocking the FastAPI event loop while still giving
    us a persistent browser instance that can handle sequential requests.

    The thread owns a private asyncio event loop. ``chat()`` is the only
    public entry point: it schedules ``_interact()`` on that loop from the
    caller's thread and blocks until the reply text is available.
    """

    # Selectors / scripts for the chatgpt.com DOM elements this engine drives.
    _PROMPT_SELECTOR = "#prompt-textarea"
    _SEND_BUTTON_SELECTOR = '[data-testid="send-button"]'
    _ASSISTANT_SELECTOR = '[data-message-author-role="assistant"]'
    _REPLY_TEXT_SELECTOR = ".markdown, .prose, pre"
    _SCROLL_TO_BOTTOM_JS = "window.scrollTo(0, document.body.scrollHeight)"

    # Polling budgets (seconds) and the number of consecutive unchanged,
    # non-busy polls required before a reply is considered complete.
    _FIRST_TEXT_TIMEOUT_S = 90
    _COMPLETION_TIMEOUT_S = 120
    _STABLE_POLLS_REQUIRED = 5

    def __init__(self) -> None:
        super().__init__(daemon=True)
        # Event loop that is driven exclusively by this thread (see run()).
        self.loop = asyncio.new_event_loop()
        # Set once the browser has been launched successfully.
        self.ready = threading.Event()
        # Playwright browser / driver handles; populated by _launch().
        self.browser = None
        self.playwright = None
        # Exception raised by a failed launch, so chat() can fail fast
        # instead of waiting out the readiness timeout on every call.
        self._launch_error = None

    # ------------------------------------------------------------------
    # Thread lifecycle
    # ------------------------------------------------------------------
    def run(self) -> None:
        """Thread entry point — start browser and run the event loop forever.

        Raises:
            Exception: whatever ``_launch()`` raised. The error is recorded
                first so that ``chat()`` can report it, then re-raised so the
                normal thread exception hook still prints the traceback.
        """
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_until_complete(self._launch())
        except Exception as exc:
            self._launch_error = exc
            print(f"[PhantomAPI] Browser launch failed: {exc}")
            raise
        self.ready.set()
        print("[PhantomAPI] ⚑ Browser engine ready.")
        self.loop.run_forever()

    async def _launch(self) -> None:
        """Launch a stealth Chromium browser.

        Reads ``settings.HEADLESS`` and, when set, ``settings.PROXY_URL``.
        On success ``self.playwright`` and ``self.browser`` are populated.
        """
        from playwright.async_api import async_playwright

        print("[PhantomAPI] πŸš€ Launching browser...")
        launcher_args = {
            "headless": settings.HEADLESS,
            "args": [
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
                "--disable-gpu",
                "--disable-dev-shm-usage",
                "--disable-setuid-sandbox",
            ],
        }
        if settings.PROXY_URL:
            print(f"[PhantomAPI] 🌐 Using proxy: {settings.PROXY_URL}")
            launcher_args["proxy"] = {"server": settings.PROXY_URL}

        self.playwright = await async_playwright().start()
        try:
            self.browser = await self.playwright.chromium.launch(**launcher_args)
        except Exception:
            # Do not leave the Playwright driver running when the browser
            # itself could not be started.
            await self.playwright.stop()
            self.playwright = None
            raise

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def chat(self, prompt: str) -> str:
        """Send a prompt to ChatGPT and return the response text.

        This is a blocking call that schedules work on the browser
        thread's event loop and waits for the result.

        Args:
            prompt: Text to type into the ChatGPT prompt box.

        Returns:
            The scraped assistant reply (stripped; may be empty if no text
            was ever rendered).

        Raises:
            RuntimeError: the browser is not ready (launch failed, or did
                not finish within 30 seconds).
            concurrent.futures.TimeoutError: the interaction did not finish
                within ``settings.BROWSER_TIMEOUT // 1000 + 30`` seconds.
        """
        import concurrent.futures

        # A recorded launch failure can never recover, so skip the wait.
        launched = self._launch_error is None and self.ready.wait(timeout=30)
        if not launched or self.browser is None:
            raise RuntimeError(
                "Browser engine is not ready. Is Chrome installed?"
            ) from self._launch_error

        future = asyncio.run_coroutine_threadsafe(
            self._interact(prompt), self.loop
        )
        try:
            return future.result(timeout=settings.BROWSER_TIMEOUT // 1000 + 30)
        except concurrent.futures.TimeoutError:
            # Without this the coroutine keeps running on the browser loop
            # (holding its page/context open) after the caller gave up.
            future.cancel()
            raise

    # ------------------------------------------------------------------
    # Private — browser interaction
    # ------------------------------------------------------------------
    async def _interact(self, prompt: str) -> str:
        """Open a new ChatGPT session, send the prompt, and scrape the reply.

        A fresh browser context is created per call and always closed on
        exit, including on error or cancellation.
        """
        context = await self.browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1280, "height": 800},
            device_scale_factor=1,
            has_touch=False,
            is_mobile=False,
        )
        try:
            await self._prepare_context(context)
            page = await context.new_page()
            page.set_default_timeout(settings.BROWSER_TIMEOUT)

            await self._open_chat(page)
            await self._send_prompt(page, prompt)

            # Wait for assistant response OR error message
            print("[PhantomAPI] πŸ€– Waiting for response...")
            try:
                await self._wait_for_first_text(page)
            except Exception as e:
                await self._log_failure_diagnostics(page, e)
                raise

            reply = await self._wait_for_completion(page)
            await self._save_debug_screenshot(page)
            print(f"[PhantomAPI] ✨ Response complete ({len(reply)} chars).")
            return reply
        except Exception as exc:
            print(f"[PhantomAPI] ❌ Browser error: {exc}")
            raise
        finally:
            # Closing the context also closes every page that belongs to it.
            await context.close()

    async def _prepare_context(self, context) -> None:
        """Install the stealth init script and, if configured, the session cookie."""
        # Advanced Stealth Overrides
        await context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
Object.defineProperty(navigator, 'platform', {get: () => 'Win32'});
Object.defineProperty(navigator, 'vendor', {get: () => 'Google Inc.'});
""")
        # Inject session token if provided
        if settings.CHATGPT_SESSION_TOKEN:
            print("[PhantomAPI] πŸ”‘ Injecting session token...")
            await context.add_cookies([{
                "name": "__Secure-next-auth.session-token",
                "value": settings.CHATGPT_SESSION_TOKEN,
                "domain": ".chatgpt.com",
                "path": "/",
                "httpOnly": True,
                "secure": True,
                "sameSite": "Lax",
            }])

    async def _open_chat(self, page) -> None:
        """Navigate to chatgpt.com and log what the browser landed on."""
        print("[PhantomAPI] 🌐 Navigating to ChatGPT...")
        await page.goto("https://chatgpt.com/", wait_until="load")

        # Diagnostic Screenshot (See what the browser sees)
        await self._save_debug_screenshot(page)

        # --- Diagnostic Logging ---
        title = await page.title()
        current_url = page.url
        print(f"[PhantomAPI] πŸ“ Page Title: '{title}'")
        print(f"[PhantomAPI] πŸ“ Current URL: {current_url}")
        if "auth0" in current_url or "login" in current_url:
            print("[PhantomAPI] ⚠️ Detected Login/Auth wall.")
            if not settings.CHATGPT_SESSION_TOKEN:
                print("[PhantomAPI] ❌ ERROR: Not logged in. Please set CHATGPT_SESSION_TOKEN.")
        if "cloudflare" in title.lower() or "hcaptcha" in await page.content():
            print("[PhantomAPI] ⚠️ Detected Cloudflare/CAPTCHA wall.")

    async def _send_prompt(self, page, prompt: str) -> None:
        """Type the prompt and submit it (Enter, then a best-effort button click)."""
        print("[PhantomAPI] ⌨️ Waiting for input box...")
        await page.wait_for_selector(self._PROMPT_SELECTOR, timeout=45000)
        await page.fill(self._PROMPT_SELECTOR, prompt)
        await asyncio.sleep(1.0)

        # Robust send: Press Enter AND mouse-click the send button
        print("[PhantomAPI] πŸ“€ Sending prompt...")
        await asyncio.sleep(random.uniform(0.5, 1.5))
        await page.press(self._PROMPT_SELECTOR, "Enter")

        # Fallback: Real mouse click on the send button. This is purely
        # best-effort: a failure here must never abort the request.
        try:
            btn = await page.wait_for_selector(self._SEND_BUTTON_SELECTOR, timeout=3000)
            if btn and await btn.is_enabled():
                # Get button coordinates and click with mouse
                box = await btn.bounding_box()
                if box:
                    await page.mouse.click(
                        box['x'] + box['width'] / 2,
                        box['y'] + box['height'] / 2,
                    )
                    print("[PhantomAPI] πŸ–±οΈ Mouse-clicked Send button.")
        except Exception as click_error:
            print(f"[PhantomAPI] Send-button fallback skipped: {click_error}")

    async def _bubble_text(self, bubble) -> str:
        """Return the raw inner text of an assistant bubble's content node."""
        target = await bubble.query_selector(self._REPLY_TEXT_SELECTOR) or bubble
        return await target.inner_text()

    async def _wait_for_first_text(self, page) -> None:
        """Phase 1: wait for the assistant bubble, then for its first text.

        Gives up silently (after logging) once the first-text budget is
        spent; the caller then proceeds to completion monitoring.
        """
        # Wait for the first assistant bubble or an error
        await page.wait_for_selector(
            self._ASSISTANT_SELECTOR,
            timeout=settings.BROWSER_TIMEOUT,
        )
        print("[PhantomAPI] βœ… Assistant bubble appeared.")

        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._FIRST_TEXT_TIMEOUT_S
        while True:
            if loop.time() > deadline:
                print("[PhantomAPI] ❌ Timeout waiting for first character.")
                return
            # Scroll to bottom to trigger rendering
            await page.evaluate(self._SCROLL_TO_BOTTOM_JS)
            bubble = await page.query_selector(self._ASSISTANT_SELECTOR)
            # The bubble can be momentarily absent (e.g. during a re-render);
            # treat that as "no text yet" rather than crashing on None.
            if bubble is not None:
                content = await self._bubble_text(bubble)
                if content.strip():
                    print(f"[PhantomAPI] πŸ“’ Detected typing start! ({len(content)} chars)")
                    return
            await asyncio.sleep(2.0)

    async def _log_failure_diagnostics(self, page, error) -> None:
        """Log why waiting for the reply failed; never raises on its own."""
        # Diagnostics: What is actually on the page?
        print(f"[PhantomAPI] ❌ Response timeout/error: {error}")
        await self._save_debug_screenshot(page)
        try:
            page_text = await page.evaluate("document.body.innerText")
        except Exception as diag_error:
            # Keep the original failure as the one the caller sees.
            print(f"[PhantomAPI] Could not read page text for diagnostics: {diag_error}")
            return
        # Check for common error messages
        if "Something went wrong" in page_text:
            print("[PhantomAPI] β›” detected: 'Something went wrong'")
        elif "Rate limit" in page_text:
            print("[PhantomAPI] β›” detected: 'Rate limit'")
        elif "Verify you are human" in page_text:
            print("[PhantomAPI] β›” detected: 'Cloudflare / CAPTCHA'")
        else:
            print(f"[PhantomAPI] πŸ“ Diagnostic Text (first 300 chars): {page_text[:300]}")

    async def _wait_for_completion(self, page) -> str:
        """Phase 2: poll the reply until it is stable and not busy.

        Returns the last observed (stripped) reply text. Stops when the text
        has been unchanged for the required number of non-busy polls, when
        the bubble disappears, or when the completion budget is spent.
        """
        # --- Phase 2: Wait for stability (aria-busy=false) ---
        print("[PhantomAPI] ⏳ Phase 2: Monitoring completion...")
        last_text = ""
        stable_polls = 0
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._COMPLETION_TIMEOUT_S
        while True:
            if loop.time() > deadline:
                print("[PhantomAPI] ⚠️ Hard timeout reached.")
                break
            # Scroll to bottom regularly
            await page.evaluate(self._SCROLL_TO_BOTTOM_JS)
            bubble = await page.query_selector(self._ASSISTANT_SELECTOR)
            if bubble is None:
                break
            # Check busy status
            is_busy = await bubble.get_attribute("aria-busy")
            current_text = (await self._bubble_text(bubble)).strip()
            if current_text != last_text:
                if len(current_text) > len(last_text):
                    print(f"[PhantomAPI] ⏳ Generating... ({len(current_text)} chars)")
                last_text = current_text
                stable_polls = 0
            elif is_busy != "true":
                # If text is stable AND not busy, it's done
                stable_polls += 1
                if stable_polls >= self._STABLE_POLLS_REQUIRED:
                    print("[PhantomAPI] βœ… Generation finished.")
                    break
            await asyncio.sleep(1.0)
        return last_text

    # ------------------------------------------------------------------
    # Debugging
    # ------------------------------------------------------------------
    async def _save_debug_screenshot(self, page) -> None:
        """Save a screenshot to the static folder for visual debugging.

        Writes ``static/debug.png`` under the current working directory,
        overwriting any previous capture. Failures are logged, not raised.
        """
        try:
            static_dir = os.path.join(os.getcwd(), "static")
            os.makedirs(static_dir, exist_ok=True)
            path = os.path.join(static_dir, "debug.png")
            await page.screenshot(path=path, full_page=False)
            print("[PhantomAPI] πŸ“Έ Debug screenshot saved to static/debug.png")
        except Exception as e:
            print(f"[PhantomAPI] ⚠️ Failed to save screenshot: {e}")
# ---------------------------------------------------------------------------
# Singleton — created once at import time, started in app lifespan
# ---------------------------------------------------------------------------
# Constructing the engine here only builds the thread object and its event
# loop; the browser is launched when the thread runs. Per the note above,
# start() is expected to be called from the application's lifespan hook
# (that caller is not visible in this file).
engine = BrowserEngine()