# Playwright / main.py
# hsmm's picture
# Upload 4 files
# f4f3994 verified
import asyncio
import hmac
import html
import json
import os
import re
import time
from dataclasses import dataclass, field
from typing import Optional
from urllib.parse import parse_qs, quote

from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import FileResponse, HTMLResponse, PlainTextResponse
from pydantic import BaseModel
from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError
# -------------------------
# Runtime configuration
# -------------------------
# Every setting is read from the process environment once, at import time.

# Shared secret compared against the caller-supplied key/token by the endpoints.
WORKER_API_KEY = os.environ.get("WORKER_API_KEY", "")

# Public origin used when building links, without a trailing slash
# (e.g. https://YOUR_SPACE.hf.space).
PUBLIC_BASE_URL = os.environ.get("PUBLIC_BASE_URL", "").rstrip("/")

# Directory handed to Chromium as its user-data dir; the worker's own
# state file and screenshots are stored in the same place.
PROFILE_DIR = os.environ.get("PROFILE_DIR", "/data/doubao_profile")
STATE_FILE = os.path.join(PROFILE_DIR, "worker_state.json")

# Chat entry URL. It can be pinned to a specific chat id for stability.
DOUBAO_CHAT_URL = os.environ.get("DOUBAO_CHAT_URL", "https://www.doubao.com/chat/")

# Manual auth rate limit: at most one human-auth prompt per this many hours.
AUTH_LIMIT_HOURS = int(os.environ.get("AUTH_LIMIT_HOURS", "24"))

# Location of the most recent auth screenshot (QR code / login screen).
AUTH_SCREENSHOT = os.path.join(PROFILE_DIR, "auth.png")

# Page selectors; these may need adjusting whenever the Doubao UI changes.
INPUT_SELECTOR = os.environ.get("INPUT_SELECTOR", "textarea")
SEND_BTN_SELECTOR = os.environ.get("SEND_BTN_SELECTOR", "#flow-end-msg-send")
ANSWER_SELECTOR = os.environ.get("ANSWER_SELECTOR", ".container-ncFTrL")
# -------------------------
# FastAPI app
# -------------------------
# Application object on which the route decorators in this file register
# their handlers.
app = FastAPI(title="Doubao Web Worker (Playwright)", version="1.0.0")
class AskRequest(BaseModel):
    """JSON body accepted by ``POST /api/ask``."""

    # Text typed into the chat input; the handler strips it and rejects
    # an empty result with HTTP 400.
    question: str
@dataclass
class Runtime:
    """Mutable holder for the worker's Playwright objects.

    All fields start empty and are filled in lazily by ``_ensure_browser``.
    """

    # Playwright driver handle returned by ``async_playwright().start()``.
    pw: Optional[object] = None
    # Persistent Chromium browser context.
    ctx: Optional[object] = None
    # The single page that every request drives.
    page: Optional[object] = None
    # Serialises access to the page. ``default_factory`` gives each
    # instance its own lock; a plain ``asyncio.Lock()`` default would be
    # evaluated once at class-definition time and shared by all instances.
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)


# Module-wide singleton used by the helpers and endpoint handlers.
rt = Runtime()
def _now_ts() -> int:
return int(time.time())
def _read_state() -> dict:
    """Load the worker state from ``STATE_FILE``.

    Returns:
        The decoded JSON object, or an empty dict when the file is
        missing, unreadable, not valid JSON, or does not contain a JSON
        object at the top level.
    """
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return {}
    # Callers use dict methods (``.get`` / item assignment); a file holding
    # e.g. ``[]`` or ``null`` must not leak a non-dict to them.
    return state if isinstance(state, dict) else {}
def _write_state(state: dict) -> None:
    """Persist *state* as pretty-printed UTF-8 JSON in ``STATE_FILE``.

    The data is written to a sibling ``.tmp`` file first and then moved
    over the real file with ``os.replace``, so ``STATE_FILE`` itself is
    never opened for writing.
    """
    os.makedirs(PROFILE_DIR, exist_ok=True)
    scratch_path = f"{STATE_FILE}.tmp"
    with open(scratch_path, "w", encoding="utf-8") as out:
        json.dump(state, out, ensure_ascii=False, indent=2)
    os.replace(scratch_path, STATE_FILE)
def _require_key(x_api_key: Optional[str]) -> None:
    """Reject the request unless *x_api_key* matches ``WORKER_API_KEY``.

    Args:
        x_api_key: Key supplied by the caller, or ``None`` when absent.

    Raises:
        HTTPException: 500 when the worker has no key configured;
            401 when the supplied key is missing or does not match.
    """
    if not WORKER_API_KEY:
        raise HTTPException(status_code=500, detail="WORKER_API_KEY is not set on the worker.")
    # Compare as bytes in constant time so response timing does not reveal
    # how many leading characters of a guess were correct.
    # (compare_digest on str only accepts ASCII, hence the encode.)
    if not x_api_key or not hmac.compare_digest(
        x_api_key.encode("utf-8"), WORKER_API_KEY.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Invalid X-Api-Key.")
async def _ensure_browser() -> None:
    """Make sure ``rt`` holds a usable Playwright context and page.

    Does nothing when a context and an open page already exist. Otherwise
    it starts the Playwright driver (once), launches the persistent
    Chromium context on ``PROFILE_DIR`` (once), and picks or creates a
    page with a 30 s default timeout.

    Raises:
        Exception: whatever Playwright raises while starting or launching.
            On a failed launch the driver is stopped again so retries do
            not accumulate driver processes.
    """
    if rt.ctx and rt.page and not rt.page.is_closed():
        return

    os.makedirs(PROFILE_DIR, exist_ok=True)

    if rt.ctx is None:
        # Reuse a driver left over from an earlier partial start-up instead
        # of spawning a new one on every retry.
        if rt.pw is None:
            rt.pw = await async_playwright().start()
        try:
            # Persistent context keeps cookies/localStorage/etc, which is
            # crucial for reducing how often a human login is needed.
            rt.ctx = await rt.pw.chromium.launch_persistent_context(
                user_data_dir=PROFILE_DIR,
                headless=True,
                args=[
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                ],
                viewport={"width": 1280, "height": 900},
                locale="zh-CN",
            )
        except Exception:
            driver, rt.pw = rt.pw, None
            try:
                await driver.stop()
            except Exception:
                # Cleanup is best-effort; the launch error below is the
                # one the caller needs to see.
                pass
            raise

    # Reuse the first open page if the context has one; this also replaces
    # a page that was closed since the last request.
    open_pages = rt.ctx.pages
    rt.page = open_pages[0] if open_pages else await rt.ctx.new_page()
    rt.page.set_default_timeout(30_000)
async def _goto_chat() -> None:
    """Navigate the shared page to ``DOUBAO_CHAT_URL`` and pause briefly."""
    page = rt.page
    assert page is not None
    await page.goto(DOUBAO_CHAT_URL, wait_until="domcontentloaded")
    # The chat is a single-page app: allow a short moment for it to
    # hydrate after DOMContentLoaded before anything queries the DOM.
    await page.wait_for_timeout(1200)
async def _is_chat_ready() -> bool:
    """Report whether the chat UI is usable on the current page.

    Returns:
        ``True`` only when the send button exists and is visible and at
        least one visible input matches ``INPUT_SELECTOR``. Any error
        raised while probing the page is treated as "not ready".
    """
    page = rt.page
    assert page is not None
    try:
        button = page.locator(SEND_BTN_SELECTOR)
        if await button.count() == 0:
            return False
        if not await button.first.is_visible():
            return False
        # A send button alone is not enough; an input box is required too.
        visible_inputs = page.locator(f"{INPUT_SELECTOR}:visible")
        return await visible_inputs.count() > 0
    except Exception:
        return False
async def _prepare_human_auth() -> str:
    """Bring up the login screen and save a screenshot to ``AUTH_SCREENSHOT``.

    The prompt is rate limited: if a prompt was already recorded within
    the last ``AUTH_LIMIT_HOURS`` hours, nothing is navigated and a
    refusal message is returned instead.

    Returns:
        A user-facing instruction string beginning with ``NEED_HUMAN_AUTH``.

    Raises:
        Exception: navigation errors from ``_goto_chat`` propagate. The
            quota timestamp is only written after navigation succeeded,
            so a failed attempt does not consume the window.
    """
    assert rt.page is not None
    os.makedirs(PROFILE_DIR, exist_ok=True)

    state = _read_state()
    try:
        last_ts = int(state.get("last_human_auth_ts") or 0)
    except (TypeError, ValueError):
        # A hand-edited or corrupted value must not make auth impossible.
        last_ts = 0

    if last_ts and (_now_ts() - last_ts) < AUTH_LIMIT_HOURS * 3600:
        # Daily (or N-hour) limit reached.
        return (
            "NEED_HUMAN_AUTH\n"
            f"已触发登录/验证码,但在过去 {AUTH_LIMIT_HOURS} 小时内已进行过一次人工验证;为满足“每天最多一次人工验证码”约束,当前拒绝继续自动触发。\n"
            "建议:先检查 Space 是否重启导致 Cookie 丢失;或升级持久化磁盘并将 PROFILE_DIR 指向 /data。\n"
        )

    await _goto_chat()

    # Best-effort: click a login button if one exists. The UI may vary, and
    # the screenshot below is still useful when this fails.
    try:
        login_btn = rt.page.get_by_role("button", name=re.compile(r"登录|Log\s*in", re.I))
        if await login_btn.count() > 0:
            await login_btn.first.click()
            await rt.page.wait_for_timeout(800)
    except Exception:
        pass

    # Save a screenshot of the QR code / login status (best-effort).
    try:
        await rt.page.screenshot(path=AUTH_SCREENSHOT, full_page=True)
    except Exception:
        pass

    # Record the quota only now that the login screen was actually prepared;
    # writing it before navigating would burn the window on a failed load.
    state["last_human_auth_ts"] = _now_ts()
    _write_state(state)

    # NOTE(review): the /auth and /auth/qr.png handlers require
    # ``?token=<WORKER_API_KEY>`` when a key is configured; the links below
    # do not carry it, so the reader has to append it manually.
    base = PUBLIC_BASE_URL or "<WORKER_BASE_URL>"
    return (
        "NEED_HUMAN_AUTH\n"
        "需要你手工完成一次登录(扫码或短信)。\n"
        f"1) 打开登录截图:{base}/auth/qr.png\n"
        f"2) 打开辅助页面(可填短信验证码/刷新截图):{base}/auth\n"
        "登录成功后,回到 Dify 重新运行同一个问题。\n"
    )
async def _send_and_read_answer(question: str) -> str:
    """Type *question* into the chat, send it, and return the reply text.

    Args:
        question: Text to submit; the caller has already stripped it.

    Returns:
        The text of the last element matching ``ANSWER_SELECTOR`` once it
        stopped changing, or a placeholder message when no text was found.

    Raises:
        PWTimeoutError: no new answer block appeared within 180 s and the
            page has no answer block at all to fall back to.
    """
    assert rt.page is not None

    # Locate the input (best-effort). Doubao is an SPA; selectors may change.
    inp = rt.page.locator(f"{INPUT_SELECTOR}:visible").last
    await inp.click()
    await inp.fill(question)

    # Count answer blocks before sending so a new one can be detected.
    ans = rt.page.locator(ANSWER_SELECTOR)
    before = await ans.count()

    send_btn = rt.page.locator(SEND_BTN_SELECTOR).first
    await send_btn.click()

    # Wait for at least one new answer block. Playwright's Python API takes
    # a single keyword-only ``arg``; several values travel as one list that
    # the page function destructures.
    try:
        await rt.page.wait_for_function(
            "([sel, n]) => document.querySelectorAll(sel).length > n",
            arg=[ANSWER_SELECTOR, before],
            timeout=180_000,
        )
    except PWTimeoutError:
        # Fallback: return the last answer block's text if there is one.
        if await ans.count() > 0:
            return (await ans.last.inner_text()).strip()
        raise

    # Stabilise: poll the last answer until its text stops changing, since
    # the reply is rendered incrementally. At most 60 polls of 500 ms.
    last_text = ""
    stable_rounds = 0
    for _ in range(60):
        txt = (await ans.last.inner_text()).strip()
        if txt == last_text and txt:
            stable_rounds += 1
        else:
            stable_rounds = 0
        last_text = txt
        if stable_rounds >= 3:
            break
        await rt.page.wait_for_timeout(500)

    return last_text or "(空响应:已发送但未抓取到文本,请检查选择器或页面结构)"
@app.get("/api/health")
async def health():
    """Liveness probe: always answers ``{"ok": true}`` without touching the browser."""
    status = {"ok": True}
    return status
@app.post("/api/ask", response_class=PlainTextResponse)
async def ask(req: AskRequest, x_api_key: Optional[str] = Header(default=None, convert_underscores=False)):
    """Submit one question to the chat page and return the answer as text.

    Args:
        req: Request body carrying the question.
        x_api_key: API key header value.

    Returns:
        200 with the answer text, or 409 with login instructions when the
        chat UI is not ready (a human login is needed).

    Raises:
        HTTPException: 401/500 from the key check, 400 for an empty
            question, 500 ("Ask failed: ...") for any browser-side error.
    """
    # NOTE(review): with ``convert_underscores=False`` FastAPI reads the
    # header literally named ``x_api_key``, while the 401 message speaks of
    # "X-Api-Key" — confirm which name clients actually send.
    _require_key(x_api_key)

    q = (req.question or "").strip()
    if not q:
        raise HTTPException(status_code=400, detail="question is required")

    # One request at a time: all requests share a single browser page.
    async with rt.lock:
        try:
            # Start-up and navigation sit inside the try so their failures
            # also get the diagnostic screenshot and a descriptive 500.
            await _ensure_browser()
            await _goto_chat()
            if not await _is_chat_ready():
                msg = await _prepare_human_auth()
                return PlainTextResponse(msg, status_code=409)
            ans = await _send_and_read_answer(q)
            return PlainTextResponse(ans, status_code=200)
        except Exception as e:
            # A screenshot helps debug selector/UI changes (best-effort).
            try:
                if rt.page is not None:
                    os.makedirs(PROFILE_DIR, exist_ok=True)
                    await rt.page.screenshot(path=os.path.join(PROFILE_DIR, "last_error.png"), full_page=True)
            except Exception:
                pass
            raise HTTPException(status_code=500, detail=f"Ask failed: {type(e).__name__}: {e}") from e
@app.get("/auth/qr.png")
async def auth_qr_png(token: Optional[str] = None):
    """Serve the most recent auth screenshot as a PNG.

    Args:
        token: Must equal ``WORKER_API_KEY`` when a key is configured.

    Raises:
        HTTPException: 401 on a missing/wrong token, 404 when no
            screenshot has been written yet.
    """
    # Optional lightweight protection (re-uses WORKER_API_KEY); compared in
    # constant time so timing does not leak the key.
    if WORKER_API_KEY and not hmac.compare_digest(
        (token or "").encode("utf-8"), WORKER_API_KEY.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="token required")
    if not os.path.exists(AUTH_SCREENSHOT):
        raise HTTPException(status_code=404, detail="auth screenshot not ready. Call /api/ask once to generate it.")
    # The file is overwritten in place and shows a login screen, so neither
    # browsers nor intermediaries should keep a copy.
    return FileResponse(
        AUTH_SCREENSHOT,
        media_type="image/png",
        headers={"Cache-Control": "no-store"},
    )
@app.get("/auth", response_class=HTMLResponse)
async def auth_page(token: Optional[str] = None):
    """Render the login helper page (auto-refreshing screenshot + SMS form).

    Args:
        token: Must equal ``WORKER_API_KEY`` when a key is configured.

    Raises:
        HTTPException: 401 on a missing/wrong token.
    """
    # Optional lightweight protection (re-uses WORKER_API_KEY); compared in
    # constant time so timing does not leak the key.
    if WORKER_API_KEY and not hmac.compare_digest(
        (token or "").encode("utf-8"), WORKER_API_KEY.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="token required")

    base = PUBLIC_BASE_URL or ""

    # Percent-encode the key: characters such as "+", "&", "=" or "#"
    # would otherwise be reinterpreted by the query-string parser and the
    # token check on the linked endpoints would fail.
    token_q = quote(WORKER_API_KEY, safe="")
    shot_url = f"{base}/auth/qr.png?token={token_q}"
    sms_url = f"{base}/auth/sms?token={token_q}"

    # Escape per output context: HTML attribute values vs. a JS string literal.
    shot_src_attr = html.escape(f"{shot_url}&t={_now_ts()}", quote=True)
    sms_action_attr = html.escape(sms_url, quote=True)
    shot_prefix_js = json.dumps(f"{shot_url}&t=").replace("<", "\\u003c")

    # A minimal helper page: auto-refresh the screenshot and allow SMS code
    # input. SMS login selectors may need adjustment; scanning the QR code
    # is usually the most robust route.
    page_html = f"""
<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width,initial-scale=1" />
<title>Doubao 登录辅助</title>
<style>
body {{ font-family: system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial; margin: 24px; }}
img {{ max-width: 100%; border: 1px solid #ddd; }}
code {{ background:#f6f8fa; padding:2px 4px; border-radius:4px; }}
.row {{ display:flex; gap:24px; flex-wrap:wrap; }}
.card {{ border:1px solid #eee; border-radius:8px; padding:16px; max-width:900px; }}
input {{ padding:8px; width:260px; }}
button {{ padding:8px 12px; }}
</style>
</head>
<body>
<h2>豆包登录辅助(扫码/短信)</h2>
<p>1) 优先使用扫码:用手机直接扫下方截图里的二维码。</p>
<p>2) 如果走短信:先在豆包页面点“短信登录/获取验证码”,再把验证码填到下面提交(需要页面结构匹配)。</p>
<p>3) 登录完成后,回到 Dify 重新运行。</p>
<div class="row">
<div class="card">
<h3>实时截图</h3>
<p><code>/auth/qr.png?token=***</code>(页面每 2 秒刷新)</p>
<img id="shot" src="{shot_src_attr}" alt="auth screenshot"/>
</div>
<div class="card">
<h3>短信验证码提交(可选)</h3>
<form method="post" action="{sms_action_attr}">
<p><label>手机号(可选):<br/><input name="phone" placeholder="手机号"/></label></p>
<p><label>验证码:<br/><input name="code" placeholder="短信验证码" required/></label></p>
<p><button type="submit">提交验证码</button></p>
</form>
<p>如提交失败,请改用扫码,或调整 Worker 的短信登录选择器。</p>
</div>
</div>
<script>
setInterval(() => {{
const img = document.getElementById('shot');
img.src = {shot_prefix_js} + Date.now();
}}, 2000);
</script>
</body>
</html>
"""
    return HTMLResponse(page_html)
@app.post("/auth/sms", response_class=PlainTextResponse)
async def auth_sms(
    token: Optional[str] = None,
    phone: Optional[str] = None,
    code: str = "",
    # Annotated as plain ``Request`` (not Optional) so FastAPI injects the
    # incoming request; the ``None`` default only keeps the Python-level
    # signature backward-compatible.
    request: Request = None,
):
    """Fill an SMS verification code into the login form (best-effort).

    ``phone`` and ``code`` are accepted as query parameters and, as a
    fallback, from an ``application/x-www-form-urlencoded`` body, which is
    what the HTML form on the ``/auth`` helper page submits.

    Args:
        token: Must equal ``WORKER_API_KEY`` when a key is configured.
        phone: Optional phone number to type into the phone field.
        code: The SMS verification code.
        request: Incoming request, used only to read a form body.

    Raises:
        HTTPException: 401 on a missing/wrong token, 400 when no code was
            supplied, 500 when driving the page failed.
    """
    # Optional lightweight protection (re-uses WORKER_API_KEY); compared in
    # constant time so timing does not leak the key.
    if WORKER_API_KEY and not hmac.compare_digest(
        (token or "").encode("utf-8"), WORKER_API_KEY.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="token required")

    # Plain ``str`` parameters are bound from the query string only, but a
    # browser form POST puts its fields in the body; read them from there
    # when the query did not provide them.
    if request is not None and (not code or not phone):
        content_type = request.headers.get("content-type", "").lower()
        if content_type.startswith("application/x-www-form-urlencoded"):
            raw_body = await request.body()
            form = parse_qs(raw_body.decode("utf-8", errors="replace"))
            code = code or form.get("code", [""])[0]
            phone = phone or form.get("phone", [""])[0]

    code = (code or "").strip()
    phone = (phone or "").strip()
    if not code:
        raise HTTPException(status_code=400, detail="code is required")

    async with rt.lock:
        await _ensure_browser()
        await _goto_chat()
        # Best-effort: find the phone/code inputs and fill them.
        # If the Doubao UI differs, prefer QR scan or adjust these selectors.
        try:
            if phone:
                phone_box = rt.page.get_by_placeholder(re.compile(r"手机号|手机", re.I))
                if await phone_box.count() > 0:
                    await phone_box.first.click()
                    await phone_box.first.fill(phone)
            code_box = rt.page.get_by_placeholder(re.compile(r"验证码", re.I))
            if await code_box.count() > 0:
                await code_box.first.click()
                await code_box.first.fill(code)
            # Click a confirm/login button if one is present.
            ok_btn = rt.page.get_by_role("button", name=re.compile(r"登录|确定|确认|提交", re.I))
            if await ok_btn.count() > 0:
                await ok_btn.first.click()
            await rt.page.wait_for_timeout(1500)
            # Refresh the screenshot so the helper page shows the outcome.
            await rt.page.screenshot(path=AUTH_SCREENSHOT, full_page=True)
            return PlainTextResponse("SMS_SUBMITTED. 如果仍未登录,请优先扫码,或检查/调整短信选择器。", status_code=200)
        except Exception as e:
            # Still try to capture the page for diagnosis (best-effort).
            try:
                await rt.page.screenshot(path=AUTH_SCREENSHOT, full_page=True)
            except Exception:
                pass
            raise HTTPException(status_code=500, detail=f"SMS submit failed: {type(e).__name__}: {e}") from e