# ds2api-browser / deepseek_browser.py
# nacho
# perf: shave ~5s off chat — shorter waits, non-blocking delete, lower timeouts
# fc44ec7
import asyncio
import logging
import os
import random
import time
from pathlib import Path
from typing import AsyncGenerator, Optional
from cloakbrowser import launch_persistent_context_async
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directory (next to this file) where failure screenshots and their companion
# ".txt" error-context files are written by DeepSeekBrowser._save_screenshot.
SCREENSHOT_DIR = Path(__file__).parent / "static" / "screenshots"
class DeepSeekBrowser:
    """Drive the DeepSeek web chat UI through a persistent browser profile.

    One instance owns one browser context (created by
    ``launch_persistent_context_async``) and one page.  ``start()`` opens the
    site and logs in if needed; ``send_message()`` / ``stream_message()`` each
    open a new chat, submit a prompt, scrape the reply out of the DOM and then
    delete the chat in the background; ``close()`` shuts the context down.

    The page is a single shared resource: the methods of one instance are not
    designed to be run concurrently with each other.
    """

    DEEPSEEK_URL = "https://chat.deepseek.com"

    # Chromium profile lock artefacts removed before launch.
    _LOCK_FILES = ("SingletonLock", "SingletonCookie", "SingletonSocket")

    # Selectors tried, in order, to find the account field on the login form.
    _EMAIL_SELECTORS = (
        'input[placeholder*="邮箱"]',
        'input[placeholder*="手机号"]',
        'input[placeholder*="Email"]',
        'input.ds-input__input[type="text"]',
    )

    # Substrings of driver error messages after which polling is pointless.
    _FATAL_ERROR_MARKERS = ("Target crashed", "Target closed", "has been closed")

    # A reply is complete after _STABLE_POLLS unchanged polls once the page
    # reports it is done; after _STALL_POLLS unchanged polls it is accepted
    # regardless, so an unreliable "done" signal cannot stall until timeout.
    _STABLE_POLLS = 2
    _STALL_POLLS = 10

    def __init__(self, email, password, profile_dir="./profiles",
                 headless=True, humanize=True, proxy=None):
        """Store credentials and launch options; no browser is started here.

        The profile directory is ``profile_dir/<sanitised email>`` so each
        account gets its own persistent browser profile.
        """
        self.email = email
        self.password = password
        self.profile_dir = Path(profile_dir) / self._safe_email()
        self.headless = headless
        self.humanize = humanize
        self.proxy = proxy
        self.context = None
        self.page = None
        self._logged_in = False
        self._ready = False
        self._is_muted = False
        self._muted_until = ""
        # Strong reference to the background chat-deletion task (if any).
        self._cleanup_task = None

    def _safe_email(self) -> str:
        """Return the email with ``@`` and ``+`` replaced, for use in file names."""
        return self.email.replace("@", "_at_").replace("+", "_plus_")

    async def _save_screenshot(self, tag, error_msg=""):
        """Best-effort: save a page screenshot (and error context) for debugging.

        Writes ``<tag>_<safe email>.png`` into ``SCREENSHOT_DIR`` and, when
        ``error_msg`` is given, a ``.txt`` file with account, tag, time and the
        error.  Never raises; failures are logged at debug level.
        """
        try:
            SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True)
            path = SCREENSHOT_DIR / f"{tag}_{self._safe_email()}.png"
            await self.page.screenshot(path=str(path))
            # Write companion error context file
            if error_msg:
                txt_path = SCREENSHOT_DIR / f"{tag}_{self._safe_email()}.txt"
                txt_path.write_text(
                    f"Account: {self.email}\n"
                    f"Tag: {tag}\n"
                    f"Time: {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
                    f"Error: {error_msg}\n",
                    encoding="utf-8",
                )
            logger.error("Screenshot saved to %s", path)
        except Exception as e:
            logger.debug("Screenshot failed: %s", e)

    async def _human_delay(self, min_ms=5, max_ms=30):
        """Sleep for a random duration between ``min_ms`` and ``max_ms`` milliseconds."""
        await asyncio.sleep(random.uniform(min_ms, max_ms) / 1000)

    def _clear_stale_locks(self):
        """Remove Chromium lock files left in the profile by a previous crash.

        The entries are unlinked without an ``exists()`` pre-check:
        ``Path.exists()`` follows symlinks, so a dangling lock symlink would
        report "missing" and never be removed.
        """
        for lock_name in self._LOCK_FILES:
            lock_path = self.profile_dir / lock_name
            try:
                lock_path.unlink()
            except FileNotFoundError:
                pass
            except OSError as e:
                logger.debug("Could not remove stale lock %s: %s", lock_path, e)

    async def start(self):
        """Launch the browser, open DeepSeek and make sure we are logged in.

        Raises:
            Exception: whatever launch, navigation or login raised.  If the
                failure happens after the context was created, the context is
                closed first so no browser process is left holding the profile.
        """
        self.profile_dir.mkdir(parents=True, exist_ok=True)
        # Clean up stale Chromium lock files from previous crashes
        self._clear_stale_locks()
        args = [
            "--disable-gpu", "--disable-dev-shm-usage", "--disable-extensions",
            "--disable-background-networking", "--disable-default-apps",
            "--disable-sync", "--mute-audio", "--no-sandbox",
            "--js-flags=--max-old-space-size=128", "--renderer-process-limit=1",
        ]
        self.context = await launch_persistent_context_async(
            user_data_dir=str(self.profile_dir), headless=self.headless,
            humanize=self.humanize, proxy=self.proxy,
            viewport={"width": 1280, "height": 720}, locale="zh-CN", args=args,
        )
        try:
            self.page = await self.context.new_page()
            await self.page.goto(self.DEEPSEEK_URL, timeout=20000)
            await self._wait_for_cloudflare()
            await self._check_login_state()
        except Exception:
            await self.close()
            raise

    async def _wait_for_cloudflare(self):
        """Wait (up to ~15 s) for the page to settle on a non-challenge URL.

        Returns as soon as the URL is unchanged between two polls, does not
        contain ``/cdn-cgi`` and a text/password input or textarea is present.
        If that never happens, sleeps 2 more seconds and returns anyway.
        """
        deadline = time.monotonic() + 15
        last_url = ""
        while time.monotonic() < deadline:
            try:
                url = self.page.url
                if url == last_url and "/cdn-cgi" not in url:
                    try:
                        await self.page.wait_for_selector(
                            'textarea, input[type="text"], input[type="password"]',
                            timeout=3000)
                        return
                    except Exception as e:
                        logger.debug("[cloudflare] page not ready yet: %s", e)
                last_url = url
            except Exception as e:
                logger.debug("[cloudflare] poll failed: %s", e)
            await asyncio.sleep(1)
        await asyncio.sleep(2)

    async def _check_login_state(self):
        """Log in if needed, then record whether the account is muted.

        A ``/sign_in`` URL, or the absence of a textarea within 8 s, triggers
        ``_auto_login()``.
        """
        current_url = self.page.url
        if "/sign_in" in current_url:
            await self._auto_login()
        else:
            try:
                await self.page.wait_for_selector("textarea", timeout=8000)
                self._logged_in = True
                self._ready = True
            except Exception:
                await self._auto_login()
        if self._logged_in:
            await self._check_mute()

    async def _check_mute(self):
        """Scan the page text for a mute notice and store the result.

        Sets ``_is_muted`` and ``_muted_until`` (the captured date/time text,
        or ``""`` when the notice has no parsable date).  On any error the
        account is treated as not muted.
        """
        try:
            muted, until = await self.page.evaluate("""() => {
                const t = document.body.innerText || '';
                const m = t.match(/禁言至\\s*(\\d{4}[-年]\\d{1,2}[-月]\\d{1,2}[日]?\\s*\\d{1,2}:\\d{2})/);
                if (m) return [true, m[1]];
                if (t.includes('禁言')) return [true, ''];
                return [false, ''];
            }""")
            self._is_muted = muted
            self._muted_until = until
            if muted:
                logger.warning("[mute] %s is muted until %s", self.email, until)
        except Exception as e:
            logger.debug("[mute] check failed: %s", e)
            self._is_muted = False
            self._muted_until = ""

    def is_muted(self):
        """Return the mute flag recorded by the last ``_check_mute()`` (default False)."""
        return getattr(self, "_is_muted", False)

    def muted_until(self):
        """Return the mute expiry text recorded by ``_check_mute()`` (default ``""``)."""
        return getattr(self, "_muted_until", "")

    async def _find_email_input(self, timeout=15):
        """Poll the known account-field selectors until one is visible.

        Raises:
            TimeoutError: no candidate became visible within ``timeout`` seconds.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            for sel in self._EMAIL_SELECTORS:
                el = self.page.locator(sel).first
                try:
                    if await el.count() > 0 and await el.is_visible():
                        return el
                except Exception:
                    continue
            await asyncio.sleep(0.8)
        raise TimeoutError("Email input not found")

    async def _auto_login(self):
        """Fill in the password login form and wait for the chat textarea.

        Each failing step saves a screenshot before re-raising.

        Raises:
            Exception: the step's own error for the email, password and button
                steps; ``Exception("Login failed")`` (chained to the cause)
                when the chat textarea does not appear within 20 s.
        """
        logger.info("Logging in as %s...", self.email)
        # The password form may be hidden behind a tab; its absence is fine.
        try:
            pwd_tab = self.page.locator('text="密码登录"').first
            if await pwd_tab.is_visible():
                await pwd_tab.click()
                await asyncio.sleep(0.3)
        except Exception as e:
            logger.debug("No password login tab: %s", e)

        try:
            email_input = await self._find_email_input(timeout=15)
            await email_input.fill(self.email)
            await asyncio.sleep(0.1)
        except Exception as e:
            await self._save_screenshot("login_fail_email", str(e))
            logger.error("Email input error: %s", e)
            raise

        try:
            pwd = self.page.locator('input[type="password"]').first
            await pwd.wait_for(state="visible", timeout=5000)
            await pwd.fill(self.password)
            await asyncio.sleep(0.1)
        except Exception as e:
            await self._save_screenshot("login_fail_password", str(e))
            logger.error("Password input error: %s", e)
            raise

        try:
            btn = self.page.locator('button:has-text("登录")').first
            await btn.click()
            await asyncio.sleep(1.5)
        except Exception as e:
            await self._save_screenshot("login_fail_button", str(e))
            logger.error("Login button error: %s", e)
            raise

        try:
            await self.page.wait_for_selector("textarea", timeout=20000)
        except Exception as e:
            await self._save_screenshot("login_fail_final", str(e))
            # Keep the generic type callers may already catch, but chain the cause.
            raise Exception("Login failed") from e
        self._logged_in = True
        self._ready = True
        logger.info("Login successful for %s", self.email)

    async def new_chat(self):
        """Open an empty conversation.

        Clicks the "new chat" control when one is found; otherwise reloads the
        site.  Either way waits for the prompt textarea.  Errors are logged
        and re-raised.
        """
        try:
            btn = self.page.locator(
                'a:has-text("开启新对话"), button:has-text("开启新对话"), '
                'a:has-text("新对话"), button:has-text("新对话"), '
                '[class*="new-chat"], [class*="newChat"]'
            ).first
            if await btn.count() > 0:
                await btn.click()
                await self.page.wait_for_selector("textarea", timeout=10000)
                return
            await self.page.goto(self.DEEPSEEK_URL, timeout=20000)
            await self.page.wait_for_selector("textarea", timeout=15000)
        except Exception as e:
            logger.error("New chat error: %s", e)
            raise

    # Runs on the active sidebar item: clicks the first small (< 40px wide)
    # button / svg / icon-like descendant, else the first button inside it.
    # Width and class are read via getBoundingClientRect / getAttribute because
    # SVG elements have no offsetWidth and no string className; SVG elements
    # also lack .click(), so a click event is dispatched for them instead.
    _OPEN_ITEM_MENU_JS = """(a) => {
        const widthOf = (el) => el.getBoundingClientRect().width;
        const classOf = (el) => (el.getAttribute && el.getAttribute('class')) || '';
        const press = (el) => {
            if (typeof el.click === 'function') el.click();
            else el.dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true}));
        };
        const walk = (n, d) => {
            if (d > 10) return null;
            for (const c of n.children || []) {
                const t = c.tagName, cls = classOf(c), w = widthOf(c);
                if ((t === 'BUTTON' || t === 'svg' || cls.includes('icon')
                        || cls.includes('more') || cls.includes('menu') || cls.includes('action'))
                        && w < 40 && w > 0) return c;
                const f = walk(c, d + 1);
                if (f) return f;
            }
            return null;
        };
        const icon = walk(a, 0);
        if (icon) { press(icon); return 'clicked'; }
        const btn = a.querySelector('button, [role="button"]');
        if (btn) { press(btn); return 'fallback'; }
        return 'no-icon';
    }"""

    async def delete_chat(self):
        """Best-effort: delete the currently active conversation via the sidebar.

        Opens the active item's menu, clicks the delete entry and confirms.
        Returns silently when any of the expected elements is missing; errors
        are logged as warnings and never raised.
        """
        try:
            chat_list = self.page.locator(
                'nav, aside, [class*="sidebar"], [class*="Sidebar"], div:has-text("开启新对话")')
            if await chat_list.count() == 0:
                return
            active_item = chat_list.first.locator(
                '[class*="active"], [class*="selected"], [class*="current"]').first
            if await active_item.count() == 0:
                return
            # Run the script on the located element itself so the item that is
            # checked above is the same item whose menu gets opened.
            result = await active_item.evaluate(self._OPEN_ITEM_MENU_JS)
            logger.debug("[delete_chat] icon click: %s", result)
            await asyncio.sleep(0.5)

            # A bare `:has-text(...)` also matches <html>/<body>, so its first
            # match is the whole page.  The `text=` engine yields the smallest
            # element containing the text; the last match is taken on the
            # assumption that a freshly opened menu is rendered after the rest
            # of the page -- NOTE(review): confirm against the live DOM.
            del_btn = None
            for label in ("删除", "Delete"):
                candidate = self.page.locator(f"text={label}").last
                if await candidate.count() > 0:
                    del_btn = candidate
                    break
            if del_btn is None:
                return
            await del_btn.click()
            await asyncio.sleep(0.5)

            confirm = self.page.locator(
                'button:has-text("确认"), button:has-text("删除"), '
                'button:has-text("Confirm"), button:has-text("Delete")').last
            if await confirm.count() > 0:
                await confirm.click()
                await asyncio.sleep(1)
        except Exception as e:
            logger.warning("[delete_chat] error: %s", e)

    async def switch_model(self, model):
        """Click the DeepThink toggle in the composer (best effort, never raises).

        NOTE(review): ``model`` is currently not consulted -- the last visible
        leaf element whose text contains 深度思考 / DeepThink / R1 is clicked
        unconditionally, which flips the toggle rather than setting it.
        Making this depend on ``model`` needs a reliable way to read the
        toggle's on/off state from the DOM, which this file does not establish.
        """
        try:
            js = """(texts) => {
                const els = Array.from(document.querySelectorAll('*'));
                const t = els.reverse().find(el => {
                    if (!el.innerText || el.children.length > 0) return false;
                    return texts.some(x => el.innerText.includes(x)) && el.offsetParent !== null;
                });
                if (t) { t.click(); return true; }
                return false;
            }"""
            clicked = await self.page.evaluate(js, ['深度思考', 'DeepThink', 'R1'])
            logger.debug("[switch_model] model=%s toggle clicked: %s", model, clicked)
            await asyncio.sleep(0.5)
        except Exception as e:
            logger.warning("[switch_model] error: %s", e)

    async def _submit_prompt(self, prompt, model):
        """Open a new chat, apply the model toggle, type ``prompt`` and press Enter.

        Any still-running background deletion is awaited first: it operates on
        "the active chat" of the same page and would otherwise race with (and
        possibly delete) the conversation that is about to be created.
        """
        await self._drain_cleanup()
        await self.new_chat()
        await self.switch_model(model)
        inp = self.page.locator("textarea").first
        await inp.wait_for(state="visible", timeout=15000)
        await inp.fill(prompt)
        await self._human_delay()
        await inp.press("Enter")

    def _schedule_cleanup(self):
        """Start deleting the finished chat in the background.

        The task is stored on the instance because the event loop only keeps a
        weak reference to tasks; an unreferenced task may be garbage-collected
        before it finishes.
        """
        self._cleanup_task = asyncio.create_task(self._safe_delete_chat())

    async def _drain_cleanup(self):
        """Wait for a pending background deletion, if there is one."""
        task = getattr(self, "_cleanup_task", None)
        self._cleanup_task = None
        if task is not None and not task.done():
            # asyncio.wait() does not re-raise the task's outcome.
            await asyncio.wait([task])

    @classmethod
    def _is_fatal_page_error(cls, exc) -> bool:
        """Return True if ``exc``'s message says the page/browser is gone."""
        message = str(exc)
        return any(marker in message for marker in cls._FATAL_ERROR_MARKERS)

    def _page_is_dead(self, exc) -> bool:
        """Return True if polling should stop: fatal error text or a closed page."""
        if self._is_fatal_page_error(exc):
            return True
        page = self.page
        if page is None:
            return True
        try:
            return bool(page.is_closed())
        except Exception:
            return False

    @classmethod
    def _is_settled(cls, stable_polls, done) -> bool:
        """Decide whether an unchanged reply can be treated as complete.

        Requires at least ``_STABLE_POLLS`` unchanged polls and either the
        page's own ``done`` signal or ``_STALL_POLLS`` unchanged polls.
        """
        if stable_polls < cls._STABLE_POLLS:
            return False
        return bool(done) or stable_polls >= cls._STALL_POLLS

    async def send_message(self, prompt, timeout=120, model="deepseek-chat"):
        """Send ``prompt`` in a new chat and return the complete reply.

        Returns:
            dict with ``"content"`` (answer text) and ``"reasoning_content"``
            (thinking text, possibly empty).

        Raises:
            TimeoutError: no reply text appeared within ``timeout`` seconds.
            Exception: any browser/automation error (logged, then re-raised).
        """
        try:
            await self._submit_prompt(prompt, model)
            result = await self._wait_for_response(timeout, prompt)
            self._schedule_cleanup()
            return result
        except Exception as e:
            logger.error("Send message error: %s", e)
            raise

    async def _safe_delete_chat(self):
        """Run ``delete_chat()`` and swallow (debug-log) anything it raises."""
        try:
            await self.delete_chat()
        except Exception as e:
            logger.debug("[safe_delete] %s", e)

    # Page script returning {thinking, answer, done} for the last non-user
    # message.  It first looks for markdown/answer/content containers; when
    # that yields no answer (or no thinking despite a thinking marker) it
    # falls back to splitting the message's text lines around the thinking
    # marker lines.  `done` is true when no visible "stop" control is found
    # and some text was extracted.
    _EXTRACT_JS = """() => {
        const r = {thinking: '', answer: '', done: false};
        const msgs = document.querySelectorAll(
            '[class*="assistant"], [class*="bot-"], [class*="message--"], [class*="message-wrapper"], [class*="chat-message"]');
        let last = null;
        for (let i = msgs.length - 1; i >= 0; i--) {
            if (!(msgs[i].className || '').toLowerCase().includes('user')) { last = msgs[i]; break; }
        }
        if (!last) return r;
        const scope = last;
        const mdEls = Array.from(scope.querySelectorAll(
            '[class*="markdown"], [class*="Markdown"], [class*="answer"], [class*="content"]'));
        const top = mdEls.filter(el => !mdEls.some(p => p !== el && p.contains(el)));
        let think = '', ans = '';
        if (top.length >= 2) { think = top[0].innerText.trim(); ans = top[top.length - 1].innerText.trim(); }
        else if (top.length === 1) {
            const t = top[0].innerText.trim();
            if (scope.innerText.includes('深度思考') && !scope.innerText.includes('已深度思考')) think = t;
            else ans = t;
        }
        const bt = scope.innerText || '';
        const hasMarker = bt.includes('深度思考') || bt.includes('极速思考') || bt.includes('思考过程');
        if (!ans || (!think && hasMarker)) {
            const lines = bt.split('\\n').map(l => l.trim()).filter(Boolean);
            const skip = ['智能搜索','快速模式','专家模式','极速思考','内容由 AI 生成','开启新对话','暂无历史对话'];
            let isTh = false, tl = [], al = [];
            for (const l of lines) {
                if (skip.some(s => l === s)) continue;
                if (l.length < 30 && (l.includes('深度思考') || l.includes('极速思考') || l.includes('思考过程'))) {
                    isTh = !(l.includes('已') || l.includes('用时') || l.includes('完成'));
                    continue;
                }
                if (isTh) tl.push(l); else al.push(l);
            }
            if (tl.length) think = tl.join('\\n');
            if (al.length) ans = al.join('\\n');
        }
        r.thinking = think; r.answer = ans;
        const sb = document.querySelector('[class*="stop"], button[aria-label*="stop"]');
        r.done = (!sb || sb.offsetParent === null);
        if (!r.answer && !r.thinking) r.done = false;
        return r;
    }"""

    async def _wait_for_response(self, timeout, prompt=""):
        """Poll the page every 0.5 s until the reply stops changing.

        ``prompt`` is accepted for interface compatibility and is not used.

        Returns:
            dict with ``"content"`` and ``"reasoning_content"``.  If the
            deadline passes while text is still changing, the latest text is
            returned.

        Raises:
            TimeoutError: nothing was extracted before the deadline.
            Exception: the evaluate error, when the page/browser is gone.
        """
        deadline = time.monotonic() + timeout
        await asyncio.sleep(0.3)
        last_answer, last_thinking, stable = "", "", 0
        while time.monotonic() < deadline:
            try:
                result = await self.page.evaluate(self._EXTRACT_JS) or {}
            except Exception as e:
                if self._page_is_dead(e):
                    logger.error("[_wait_for_response] Browser crashed: %s", e)
                    raise
                # Transient evaluate failures are retried.
                await asyncio.sleep(0.5)
                continue
            answer = (result.get("answer") or "").strip()
            thinking = (result.get("thinking") or "").strip()
            if answer or thinking:
                if answer != last_answer or thinking != last_thinking:
                    last_answer, last_thinking, stable = answer, thinking, 0
                else:
                    stable += 1
                    if self._is_settled(stable, result.get("done")):
                        return {"content": last_answer, "reasoning_content": last_thinking}
            await asyncio.sleep(0.5)
        if last_answer or last_thinking:
            return {"content": last_answer, "reasoning_content": last_thinking}
        raise TimeoutError("No response received")

    async def stream_message(self, prompt, timeout=120,
                             model="deepseek-chat") -> AsyncGenerator[dict, None]:
        """Send ``prompt`` in a new chat and yield the reply incrementally.

        Yields:
            ``{"type": "thinking", "chunk": str}`` and
            ``{"type": "content", "chunk": str}`` dicts, where each chunk is
            the text that was appended since the previous poll (every 0.3 s).

        The stream ends once a non-empty answer has stopped changing, or when
        ``timeout`` seconds have elapsed.  Errors are logged and re-raised.
        """
        try:
            await self._submit_prompt(prompt, model)
            deadline = time.monotonic() + timeout
            last_thinking, last_answer, stable = "", "", 0
            await asyncio.sleep(0.3)
            while time.monotonic() < deadline:
                try:
                    result = await self.page.evaluate(self._EXTRACT_JS) or {}
                except Exception as e:
                    if self._page_is_dead(e):
                        logger.error("[stream_message] Browser crashed: %s", e)
                        raise
                    await asyncio.sleep(0.3)
                    continue
                thinking = (result.get("thinking") or "").strip()
                answer = (result.get("answer") or "").strip()
                changed = False
                if thinking and thinking != last_thinking:
                    new = thinking[len(last_thinking):]
                    if new:
                        yield {"type": "thinking", "chunk": new}
                    last_thinking = thinking
                    changed = True
                if answer and answer != last_answer:
                    new = answer[len(last_answer):]
                    if new:
                        yield {"type": "content", "chunk": new}
                    last_answer = answer
                    changed = True
                if changed:
                    # Any growth (thinking included) means generation is still
                    # in progress, so the stability counter starts over.
                    stable = 0
                elif answer:
                    stable += 1
                    if self._is_settled(stable, result.get("done")):
                        break
                await asyncio.sleep(0.3)
            if not last_answer and not last_thinking:
                logger.warning("[stream_message] no response text before timeout")
            self._schedule_cleanup()
        except Exception as e:
            logger.error("Stream message error: %s", e)
            raise

    async def close(self):
        """Cancel pending cleanup, close the browser context and reset state.

        Safe to call more than once; errors while closing are only logged.
        """
        task = getattr(self, "_cleanup_task", None)
        self._cleanup_task = None
        if task is not None and not task.done():
            task.cancel()
        if self.context:
            try:
                await self.context.close()
            except Exception as e:
                logger.debug("Error closing browser: %s", e)
        self.context = None
        self.page = None
        self._logged_in = False
        self._ready = False