"""PhantomAPI — Browser automation engine.

Launches a persistent headless Chrome instance via Playwright
and interacts with chatgpt.com to generate responses.
"""
import asyncio
import threading
import os
import random
from app.config import settings
from playwright_stealth import stealth_async
class BrowserEngine(threading.Thread):
    """Daemon thread that owns a persistent Playwright browser.

    The thread runs its own asyncio event loop, so browser work never
    blocks the FastAPI event loop.  Callers use the blocking
    :meth:`chat` method, which schedules a coroutine on this thread's
    loop and waits for the result.

    Instance fields:
        loop: the private event loop driven by :meth:`run`.
        ready: ``threading.Event`` set once :meth:`_launch` has returned.
        browser / playwright: ``None`` until :meth:`_launch` assigns them.
    """

    # CSS selectors for the page elements this engine drives.
    _PROMPT_SELECTOR = "#prompt-textarea"
    _SEND_BUTTON_SELECTOR = '[data-testid="send-button"]'
    _ASSISTANT_SELECTOR = '[data-message-author-role="assistant"]'
    _REPLY_TEXT_SELECTOR = ".markdown, .prose, pre"

    # Timing budgets used while talking to the page.
    _INPUT_TIMEOUT_MS = 45000        # wait for the prompt box to exist
    _SEND_BUTTON_TIMEOUT_MS = 3000   # wait for the fallback send button
    _FIRST_TEXT_TIMEOUT_S = 90       # Phase 1: first visible reply text
    _COMPLETION_TIMEOUT_S = 120      # Phase 2: hard cap on generation
    _STABLE_POLLS_REQUIRED = 5       # unchanged, non-busy polls => finished

    def __init__(self) -> None:
        """Create the thread (daemon) and its event loop; nothing is launched yet."""
        super().__init__(daemon=True)
        self.loop = asyncio.new_event_loop()
        self.ready = threading.Event()
        self.browser = None
        self.playwright = None

    # ------------------------------------------------------------------
    # Thread lifecycle
    # ------------------------------------------------------------------
    def run(self) -> None:
        """Thread entry point — start the browser and run the event loop forever.

        If ``_launch`` raises, the exception propagates out of this method
        and ``ready`` is never set; ``chat`` then fails after its own
        30-second wait.
        """
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self._launch())
        self.ready.set()
        print("[PhantomAPI] β‘ Browser engine ready.")
        self.loop.run_forever()

    async def _launch(self) -> None:
        """Start Playwright and launch Chromium with the configured options.

        Reads ``settings.HEADLESS`` and, when set, ``settings.PROXY_URL``.
        Stores the results on ``self.playwright`` and ``self.browser``.
        """
        # Imported lazily so merely importing this module does not require
        # Playwright's async API to be loaded.
        from playwright.async_api import async_playwright

        print("[PhantomAPI] π Launching browser...")
        launcher_args = {
            "headless": settings.HEADLESS,
            "args": [
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
                "--disable-gpu",
                "--disable-dev-shm-usage",
                "--disable-setuid-sandbox",
            ],
        }
        if settings.PROXY_URL:
            # NOTE(review): the proxy URL is logged verbatim — confirm it
            # never embeds credentials.
            print(f"[PhantomAPI] π Using proxy: {settings.PROXY_URL}")
            launcher_args["proxy"] = {"server": settings.PROXY_URL}

        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(**launcher_args)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def chat(self, prompt: str) -> str:
        """Send a prompt to ChatGPT and return the response text.

        This is a blocking call that schedules work on the browser
        thread's event loop and waits for the result.

        Args:
            prompt: text typed into the ChatGPT prompt box.

        Returns:
            The scraped assistant reply, stripped of surrounding whitespace.

        Raises:
            RuntimeError: the engine did not become ready within 30 seconds
                or no browser was launched.
            concurrent.futures.TimeoutError: the interaction exceeded
                ``settings.BROWSER_TIMEOUT // 1000 + 30`` seconds.  The
                pending coroutine is cancelled before re-raising.
            Exception: whatever the browser interaction itself raised.
        """
        # Local import: distinct from the builtin TimeoutError before 3.11.
        from concurrent.futures import TimeoutError as FutureTimeoutError

        if not self.ready.wait(timeout=30) or self.browser is None:
            raise RuntimeError("Browser engine is not ready. Is Chrome installed?")

        future = asyncio.run_coroutine_threadsafe(
            self._interact(prompt), self.loop
        )
        try:
            return future.result(timeout=settings.BROWSER_TIMEOUT // 1000 + 30)
        except FutureTimeoutError:
            # Without cancelling, the coroutine would keep driving the
            # browser (and holding its context) after the caller gave up.
            future.cancel()
            raise

    # ------------------------------------------------------------------
    # Private — browser interaction
    # ------------------------------------------------------------------
    async def _interact(self, prompt: str) -> str:
        """Open a new ChatGPT session, send the prompt, and scrape the reply.

        A fresh browser context and page are created per call and always
        closed afterwards, including when setup or the page close itself
        fails.

        Raises:
            Exception: any error from the page interaction, after logging it.
        """
        context = await self.browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1280, "height": 800},
            device_scale_factor=1,
            has_touch=False,
            is_mobile=False,
        )
        try:
            await self._prepare_context(context)
            page = await context.new_page()
            try:
                return await self._converse(page, prompt)
            except Exception as exc:
                print(f"[PhantomAPI] β Browser error: {exc}")
                raise
            finally:
                await page.close()
        finally:
            # Outer finally: runs even if setup or page.close() raised, so
            # the context can no longer leak.
            await context.close()

    async def _prepare_context(self, context) -> None:
        """Install the navigator overrides and, if configured, the session cookie."""
        # Advanced Stealth Overrides
        await context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
            Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
            Object.defineProperty(navigator, 'platform', {get: () => 'Win32'});
            Object.defineProperty(navigator, 'vendor', {get: () => 'Google Inc.'});
        """)

        # Inject session token if provided
        if settings.CHATGPT_SESSION_TOKEN:
            print("[PhantomAPI] π Injecting session token...")
            await context.add_cookies([{
                "name": "__Secure-next-auth.session-token",
                "value": settings.CHATGPT_SESSION_TOKEN,
                "domain": ".chatgpt.com",
                "path": "/",
                "httpOnly": True,
                "secure": True,
                "sameSite": "Lax",
            }])

    async def _converse(self, page, prompt: str) -> str:
        """Run one full exchange on *page*: navigate, send, wait, scrape."""
        page.set_default_timeout(settings.BROWSER_TIMEOUT)

        # Navigate to ChatGPT
        print("[PhantomAPI] π Navigating to ChatGPT...")
        await page.goto("https://chatgpt.com/", wait_until="load")

        # Diagnostic screenshot (see what the browser sees)
        await self._save_debug_screenshot(page)
        await self._report_page_state(page)

        await self._submit_prompt(page, prompt)

        print("[PhantomAPI] π€ Waiting for response...")
        await self._wait_for_first_text(page)
        reply = await self._wait_for_completion(page)

        await self._save_debug_screenshot(page)
        print(f"[PhantomAPI] β¨ Response complete ({len(reply)} chars).")
        return reply.strip()

    async def _report_page_state(self, page) -> None:
        """Log the page title/URL and warn about login or CAPTCHA walls.

        Purely diagnostic: nothing is raised here, the later selector
        waits decide whether the interaction can proceed.
        """
        title = await page.title()
        current_url = page.url
        print(f"[PhantomAPI] π Page Title: '{title}'")
        print(f"[PhantomAPI] π Current URL: {current_url}")

        if "auth0" in current_url or "login" in current_url:
            print("[PhantomAPI] β οΈ Detected Login/Auth wall.")
            if not settings.CHATGPT_SESSION_TOKEN:
                print("[PhantomAPI] β ERROR: Not logged in. Please set CHATGPT_SESSION_TOKEN.")

        if "cloudflare" in title.lower() or "hcaptcha" in await page.content():
            print("[PhantomAPI] β οΈ Detected Cloudflare/CAPTCHA wall.")

    async def _submit_prompt(self, page, prompt: str) -> None:
        """Type *prompt* into the prompt box and submit it.

        Submission is attempted twice: Enter in the textarea, then a
        best-effort real mouse click on the send button.
        """
        print("[PhantomAPI] β¨οΈ Waiting for input box...")
        await page.wait_for_selector(
            self._PROMPT_SELECTOR, timeout=self._INPUT_TIMEOUT_MS
        )
        await page.fill(self._PROMPT_SELECTOR, prompt)
        await asyncio.sleep(1.0)

        # Robust send: press Enter AND mouse-click the send button
        print("[PhantomAPI] π€ Sending prompt...")
        await asyncio.sleep(random.uniform(0.5, 1.5))
        await page.press(self._PROMPT_SELECTOR, "Enter")

        # Fallback: real mouse click on the send button
        try:
            btn = await page.wait_for_selector(
                self._SEND_BUTTON_SELECTOR, timeout=self._SEND_BUTTON_TIMEOUT_MS
            )
            if btn and await btn.is_enabled():
                # Click the button's centre using its on-screen coordinates.
                box = await btn.bounding_box()
                if box:
                    await page.mouse.click(
                        box['x'] + box['width'] / 2,
                        box['y'] + box['height'] / 2,
                    )
                    print("[PhantomAPI] π±οΈ Mouse-clicked Send button.")
        except Exception:
            # Deliberately best-effort: Enter has already been pressed, so
            # a missing or unclickable button must not abort the request.
            pass

    async def _read_reply_text(self, bubble) -> str:
        """Return the inner text of the reply body inside *bubble*.

        Falls back to the bubble element itself when no dedicated
        markdown/prose/pre child is present.
        """
        target = await bubble.query_selector(self._REPLY_TEXT_SELECTOR) or bubble
        return await target.inner_text()

    async def _wait_for_first_text(self, page) -> None:
        """Phase 1: wait for the assistant bubble and its first visible text.

        Gives up silently (after logging) once ``_FIRST_TEXT_TIMEOUT_S``
        elapses without text.  Any exception is logged together with page
        diagnostics and re-raised.
        """
        try:
            # Wait for the first assistant bubble or an error
            await page.wait_for_selector(
                self._ASSISTANT_SELECTOR,
                timeout=settings.BROWSER_TIMEOUT,
            )
            print("[PhantomAPI] β Assistant bubble appeared.")

            loop = asyncio.get_running_loop()
            started = loop.time()
            while True:
                if loop.time() - started > self._FIRST_TEXT_TIMEOUT_S:
                    print("[PhantomAPI] β Timeout waiting for first character.")
                    break

                # Scroll to bottom to trigger rendering
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                bubble = await page.query_selector(self._ASSISTANT_SELECTOR)
                # The bubble can vanish between polls (re-render); retry
                # instead of dereferencing None.
                if bubble is not None:
                    content = await self._read_reply_text(bubble)
                    if content.strip():
                        print(f"[PhantomAPI] π’ Detected typing start! ({len(content)} chars)")
                        break
                await asyncio.sleep(2.0)
        except Exception as e:
            await self._report_response_failure(page, e)
            raise

    async def _report_response_failure(self, page, error: Exception) -> None:
        """Log *error*, save a screenshot, and print which known error text is on the page."""
        # Diagnostics: what is actually on the page?
        print(f"[PhantomAPI] β Response timeout/error: {error}")
        await self._save_debug_screenshot(page)

        # Check for common error messages
        page_text = await page.evaluate("document.body.innerText")
        if "Something went wrong" in page_text:
            print("[PhantomAPI] β detected: 'Something went wrong'")
        elif "Rate limit" in page_text:
            print("[PhantomAPI] β detected: 'Rate limit'")
        elif "Verify you are human" in page_text:
            print("[PhantomAPI] β detected: 'Cloudflare / CAPTCHA'")
        else:
            print(f"[PhantomAPI] π Diagnostic Text (first 300 chars): {page_text[:300]}")

    async def _wait_for_completion(self, page) -> str:
        """Phase 2: poll the reply until it stops changing, then return it.

        The reply counts as finished after ``_STABLE_POLLS_REQUIRED``
        consecutive one-second polls in which the text is unchanged and
        the bubble's ``aria-busy`` attribute is not ``"true"``.  Polling
        also stops when the bubble disappears or ``_COMPLETION_TIMEOUT_S``
        elapses.

        Returns:
            The last stripped text observed (``""`` if none was seen).
        """
        print("[PhantomAPI] β³ Phase 2: Monitoring completion...")
        loop = asyncio.get_running_loop()
        started = loop.time()
        last_text = ""
        stable_polls = 0

        while True:
            if loop.time() - started > self._COMPLETION_TIMEOUT_S:
                print("[PhantomAPI] β οΈ Hard timeout reached.")
                break

            # Scroll to bottom regularly
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            bubble = await page.query_selector(self._ASSISTANT_SELECTOR)
            if not bubble:
                break

            # Check busy status
            is_busy = await bubble.get_attribute("aria-busy")
            current_text = (await self._read_reply_text(bubble)).strip()

            if current_text != last_text:
                if len(current_text) > len(last_text):
                    print(f"[PhantomAPI] β³ Generating... ({len(current_text)} chars)")
                last_text = current_text
                stable_polls = 0
            elif is_busy != "true":
                # Text is stable AND the bubble is not busy: count towards done.
                stable_polls += 1
                if stable_polls >= self._STABLE_POLLS_REQUIRED:
                    print("[PhantomAPI] β Generation finished.")
                    break

            await asyncio.sleep(1.0)

        return last_text

    # ------------------------------------------------------------------
    # Debugging
    # ------------------------------------------------------------------
    async def _save_debug_screenshot(self, page) -> None:
        """Save a screenshot to ``<cwd>/static/debug.png`` for visual debugging.

        Best-effort: any failure is logged and swallowed so diagnostics
        never break the actual request.
        """
        try:
            static_dir = os.path.join(os.getcwd(), "static")
            # exist_ok avoids the check-then-create race.
            os.makedirs(static_dir, exist_ok=True)
            path = os.path.join(static_dir, "debug.png")
            await page.screenshot(path=path, full_page=False)
            print("[PhantomAPI] πΈ Debug screenshot saved to static/debug.png")
        except Exception as e:
            print(f"[PhantomAPI] β οΈ Failed to save screenshot: {e}")
# ---------------------------------------------------------------------------
# Singleton — created once at import time, started in app lifespan
# ---------------------------------------------------------------------------
engine = BrowserEngine()