"""Browser automation client for the DeepSeek chat web UI."""

import asyncio
import logging
import os
import random
import time
from pathlib import Path
from typing import AsyncGenerator, Optional

from cloakbrowser import launch_persistent_context_async

logger = logging.getLogger(__name__)

SCREENSHOT_DIR = Path(__file__).parent / "static" / "screenshots"


class DeepSeekBrowser:
    """Drive one DeepSeek web-chat account through a persistent browser profile.

    The instance owns a browser context and a single page. ``start()`` opens
    the chat site and logs in if required; ``send_message()`` and
    ``stream_message()`` submit a prompt in a new chat and scrape the reply
    out of the DOM by polling ``_EXTRACT_JS``.
    """

    DEEPSEEK_URL = "https://chat.deepseek.com"

    def __init__(self, email, password, profile_dir="./profiles",
                 headless=True, humanize=True, proxy=None):
        """Store account credentials and launch options; no browser is started.

        Args:
            email: Account e-mail; also used to derive the profile directory
                and screenshot file names.
            password: Account password typed into the login form.
            profile_dir: Parent directory holding one sub-directory per account.
            headless: Passed through to the browser launcher.
            humanize: Passed through to the browser launcher.
            proxy: Passed through to the browser launcher.
        """
        self.email = email
        self.password = password
        # One profile directory per account, named after the sanitized e-mail.
        self.profile_dir = Path(profile_dir) / self._safe_email()
        self.headless = headless
        self.humanize = humanize
        self.proxy = proxy
        self.context = None
        self.page = None
        self._logged_in = False
        self._ready = False
        # Mute state is refreshed by _check_mute(); define it up front so the
        # attributes always exist.
        self._is_muted = False
        self._muted_until = ""
        # Strong references to fire-and-forget tasks (see _spawn_background).
        self._background_tasks = set()

    def _safe_email(self):
        """Return the e-mail with ``@`` and ``+`` replaced for use in file names."""
        return self.email.replace("@", "_at_").replace("+", "_plus_")

    def _spawn_background(self, coro):
        """Schedule ``coro`` as a task and keep it referenced until it finishes.

        The event loop only keeps weak references to tasks, so a task whose
        result is discarded may be garbage-collected before it completes.
        """
        tasks = getattr(self, "_background_tasks", None)
        if tasks is None:
            tasks = self._background_tasks = set()
        task = asyncio.create_task(coro)
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        return task

    async def _save_screenshot(self, tag, error_msg=""):
        """Save a page screenshot (and optional error context file) for debugging.

        Files are written to ``SCREENSHOT_DIR`` as ``<tag>_<safe email>.png``
        and, when ``error_msg`` is non-empty, a matching ``.txt`` file.
        Any failure is logged at debug level and swallowed (best effort).
        """
        try:
            SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True)
            stem = f"{tag}_{self._safe_email()}"
            path = SCREENSHOT_DIR / f"{stem}.png"
            await self.page.screenshot(path=str(path))
            # Write companion error context file
            if error_msg:
                txt_path = SCREENSHOT_DIR / f"{stem}.txt"
                txt_path.write_text(
                    f"Account: {self.email}\n"
                    f"Tag: {tag}\n"
                    f"Time: {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
                    f"Error: {error_msg}\n",
                    encoding="utf-8",
                )
            logger.error("Screenshot saved to %s", path)
        except Exception as e:
            logger.debug("Screenshot failed: %s", e)

    async def _human_delay(self, min_ms=5, max_ms=30):
        """Sleep for a random duration between ``min_ms`` and ``max_ms`` milliseconds."""
        await asyncio.sleep(random.uniform(min_ms, max_ms) / 1000)

    async def start(self):
        """Launch the browser, open the chat site and make sure we are logged in.

        Raises:
            Exception: Whatever the launcher, navigation or login step raises.
        """
        self.profile_dir.mkdir(parents=True, exist_ok=True)

        # Clean up stale Chromium lock files from previous crashes
        for lock in ("SingletonLock", "SingletonCookie", "SingletonSocket"):
            lock_path = self.profile_dir / lock
            if lock_path.exists():
                try:
                    lock_path.unlink()
                except OSError as e:
                    # Best effort: the launch below will surface a real problem.
                    logger.debug("Could not remove %s: %s", lock_path, e)

        args = [
            "--disable-gpu",
            "--disable-dev-shm-usage",
            "--disable-extensions",
            "--disable-background-networking",
            "--disable-default-apps",
            "--disable-sync",
            "--mute-audio",
            "--no-sandbox",
            "--js-flags=--max-old-space-size=128",
            "--renderer-process-limit=1",
        ]
        self.context = await launch_persistent_context_async(
            user_data_dir=str(self.profile_dir),
            headless=self.headless,
            humanize=self.humanize,
            proxy=self.proxy,
            viewport={"width": 1280, "height": 720},
            locale="zh-CN",
            args=args,
        )
        self.page = await self.context.new_page()
        await self.page.goto(self.DEEPSEEK_URL, timeout=20000)
        await self._wait_for_cloudflare()
        await self._check_login_state()

    async def _wait_for_cloudflare(self):
        """Wait (up to ~15 s) until the page settles on a non-challenge URL.

        Returns as soon as the URL is unchanged between two polls, does not
        contain ``/cdn-cgi`` and a text/password input or textarea is present.
        If that never happens, sleeps two more seconds and returns anyway.
        """
        deadline = time.monotonic() + 15
        last_url = ""
        while time.monotonic() < deadline:
            try:
                url = self.page.url
                if url == last_url and "/cdn-cgi" not in url:
                    try:
                        await self.page.wait_for_selector(
                            'textarea, input[type="text"], input[type="password"]',
                            timeout=3000)
                        return
                    except Exception:
                        # Inputs not there yet; keep polling until the deadline.
                        pass
                last_url = url
            except Exception:
                # Reading the URL can fail mid-navigation; retry on next poll.
                pass
            await asyncio.sleep(1)
        await asyncio.sleep(2)

    async def _check_login_state(self):
        """Log in if needed, then refresh the mute state when logged in."""
        current_url = self.page.url
        if "/sign_in" in current_url:
            await self._auto_login()
        else:
            try:
                # A chat textarea means the stored session is still valid.
                await self.page.wait_for_selector("textarea", timeout=8000)
                self._logged_in = True
                self._ready = True
            except Exception:
                await self._auto_login()
        if self._logged_in:
            await self._check_mute()

    async def _check_mute(self):
        """Scan the page text for a mute notice and record it on the instance.

        Sets ``_is_muted`` and ``_muted_until`` (the matched date/time text,
        or an empty string). On any error both are reset to "not muted".
        """
        try:
            muted, until = await self.page.evaluate("""() => {
                const t = document.body.innerText || '';
                const m = t.match(/禁言至\\s*(\\d{4}[-年]\\d{1,2}[-月]\\d{1,2}[日]?\\s*\\d{1,2}:\\d{2})/);
                if (m) return [true, m[1]];
                if (t.includes('禁言')) return [true, ''];
                return [false, ''];
            }""")
            self._is_muted = muted
            self._muted_until = until
            if muted:
                logger.warning("[mute] %s is muted until %s", self.email, until)
        except Exception:
            self._is_muted = False
            self._muted_until = ""

    def is_muted(self):
        """Return the mute flag recorded by the last ``_check_mute`` (default False)."""
        return getattr(self, "_is_muted", False)

    def muted_until(self):
        """Return the mute expiry text recorded by the last ``_check_mute`` (default "")."""
        return getattr(self, "_muted_until", "")

    async def _find_email_input(self, timeout=15):
        """Poll for a visible e-mail/account input; return its locator or None."""
        selectors = (
            'input[placeholder*="邮箱"]',
            'input[placeholder*="手机号"]',
            'input[placeholder*="Email"]',
            'input.ds-input__input[type="text"]',
        )
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            for sel in selectors:
                el = self.page.locator(sel).first
                try:
                    if await el.count() > 0 and await el.is_visible():
                        return el
                except Exception:
                    continue
            await asyncio.sleep(0.8)
        return None

    async def _auto_login(self):
        """Fill in and submit the password login form.

        Each step saves a diagnostic screenshot before re-raising on failure.

        Raises:
            TimeoutError: No e-mail input became visible.
            Exception: ``"Login failed"`` when no chat textarea appears after
                submitting; other step errors are re-raised unchanged.
        """
        logger.info("Logging in as %s...", self.email)

        # Switch to the password-login tab when the page offers one.
        try:
            pwd_tab = self.page.locator('text="密码登录"').first
            if await pwd_tab.is_visible():
                await pwd_tab.click()
                await asyncio.sleep(0.3)
        except Exception as e:
            logger.debug("No password login tab: %s", e)

        try:
            email_input = await self._find_email_input()
            if not email_input:
                raise TimeoutError("Email input not found")
            await email_input.fill(self.email)
            await asyncio.sleep(0.1)
        except Exception as e:
            await self._save_screenshot("login_fail_email", str(e))
            logger.error("Email input error: %s", e)
            raise

        try:
            pwd = self.page.locator('input[type="password"]').first
            await pwd.wait_for(state="visible", timeout=5000)
            await pwd.fill(self.password)
            await asyncio.sleep(0.1)
        except Exception as e:
            await self._save_screenshot("login_fail_password", str(e))
            logger.error("Password input error: %s", e)
            raise

        try:
            btn = self.page.locator('button:has-text("登录")').first
            await btn.click()
            await asyncio.sleep(1.5)
        except Exception as e:
            await self._save_screenshot("login_fail_button", str(e))
            logger.error("Login button error: %s", e)
            raise

        try:
            await self.page.wait_for_selector("textarea", timeout=20000)
            self._logged_in = True
            self._ready = True
            logger.info("Login successful for %s", self.email)
        except Exception as e:
            await self._save_screenshot("login_fail_final", str(e))
            # Chain the cause so the underlying timeout stays in the traceback.
            raise Exception("Login failed") from e

    async def new_chat(self):
        """Open a fresh conversation and wait for the prompt textarea.

        Clicks a "new chat" control when one is found, otherwise reloads the
        site root. Errors are logged and re-raised.
        """
        try:
            btn = self.page.locator(
                'a:has-text("开启新对话"), button:has-text("开启新对话"), '
                'a:has-text("新对话"), button:has-text("新对话"), '
                '[class*="new-chat"], [class*="newChat"]'
            ).first
            if await btn.count() > 0:
                await btn.click()
                await self.page.wait_for_selector("textarea", timeout=10000)
                return
            await self.page.goto(self.DEEPSEEK_URL, timeout=20000)
            await self.page.wait_for_selector("textarea", timeout=15000)
        except Exception as e:
            logger.error("New chat error: %s", e)
            raise

    async def delete_chat(self):
        """Try to delete the currently highlighted conversation (best effort).

        Returns silently when no sidebar, active item or delete entry is
        found. Any exception is logged as a warning and swallowed.
        """
        try:
            chat_list = self.page.locator(
                'nav, aside, [class*="sidebar"], [class*="Sidebar"], div:has-text("开启新对话")')
            if await chat_list.count() == 0:
                return
            active_item = chat_list.first.locator(
                '[class*="active"], [class*="selected"], [class*="current"]').first
            if await active_item.count() == 0:
                return

            # Click the small icon/menu control inside the active item; fall
            # back to the first button when no such control is found.
            result = await self.page.evaluate("""() => {
                const a = document.querySelector('[class*="active"], [class*="selected"]');
                if (!a) return 'no-active';
                const walk = (n, d) => {
                    if (d > 10) return null;
                    for (const c of n.children || []) {
                        const t = c.tagName, cls = (c.className || '').toString();
                        if ((t === 'BUTTON' || t === 'svg' || cls.includes('icon') || cls.includes('more') || cls.includes('menu') || cls.includes('action')) && c.offsetWidth < 40 && c.offsetWidth > 0) return c;
                        const f = walk(c, d + 1);
                        if (f) return f;
                    }
                    return null;
                };
                const icon = walk(a, 0);
                if (icon) { icon.click(); return 'clicked'; }
                const btn = a.querySelector('button, [role="button"]');
                if (btn) { btn.click(); return 'fallback'; }
                return 'no-icon';
            }""")
            logger.debug("[delete_chat] icon click: %s", result)
            await asyncio.sleep(0.5)

            # NOTE(review): this selector has no tag qualifier, so `.first`
            # may resolve to an ancestor container rather than the menu entry
            # itself - confirm against the live DOM.
            del_btn = self.page.locator(':has-text("删除"), :has-text("Delete")').first
            if await del_btn.count() == 0:
                return
            await del_btn.click()
            await asyncio.sleep(0.5)

            confirm = self.page.locator(
                'button:has-text("确认"), button:has-text("删除"), '
                'button:has-text("Confirm"), button:has-text("Delete")').last
            if await confirm.count() > 0:
                await confirm.click()
                await asyncio.sleep(1)
        except Exception as e:
            logger.warning("[delete_chat] error: %s", e)

    async def switch_model(self, model):
        """Click the last visible leaf element labelled as the deep-think toggle.

        NOTE(review): ``model`` is currently not consulted - the toggle is
        clicked for every model name. Confirm whether that is intended.
        Errors are logged as warnings and swallowed.
        """
        try:
            js = """(texts) => {
                const els = Array.from(document.querySelectorAll('*'));
                const t = els.reverse().find(el => {
                    if (!el.innerText || el.children.length > 0) return false;
                    return texts.some(x => el.innerText.includes(x)) && el.offsetParent !== null;
                });
                if (t) { t.click(); return true; }
                return false;
            }"""
            await self.page.evaluate(js, ['深度思考', 'DeepThink', 'R1'])
            await asyncio.sleep(0.5)
        except Exception as e:
            logger.warning("[switch_model] error: %s", e)

    async def send_message(self, prompt, timeout=120, model="deepseek-chat"):
        """Send ``prompt`` in a new chat and return the complete reply.

        Args:
            prompt: Text typed into the chat textarea.
            timeout: Seconds to wait for the reply.
            model: Forwarded to ``switch_model``.

        Returns:
            ``{"content": <answer text>, "reasoning_content": <thinking text>}``.

        Raises:
            TimeoutError: No reply text appeared before the timeout.
            Exception: Any browser error, after being logged.
        """
        try:
            await self.new_chat()
            await self.switch_model(model)
            inp = self.page.locator("textarea").first
            await inp.wait_for(state="visible", timeout=15000)
            await inp.fill(prompt)
            await self._human_delay()
            await inp.press("Enter")
            result = await self._wait_for_response(timeout, prompt)
            # Clean up in the background so the caller gets the reply at once.
            self._spawn_background(self._safe_delete_chat())
            return result
        except Exception as e:
            logger.error("Send message error: %s", e)
            raise

    async def _safe_delete_chat(self):
        """Run ``delete_chat`` and log (at debug level) anything it raises."""
        try:
            await self.delete_chat()
        except Exception as e:
            logger.debug("[safe_delete] %s", e)

    # Page-side scraper: returns {thinking, answer, done} for the last
    # non-user message block found in the DOM.
    # NOTE(review): `done` is computed here but not read by the Python callers.
    _EXTRACT_JS = """() => {
        const r = {thinking: '', answer: '', done: false};
        const msgs = document.querySelectorAll(
            '[class*="assistant"], [class*="bot-"], [class*="message--"], [class*="message-wrapper"], [class*="chat-message"]');
        let last = null;
        for (let i = msgs.length - 1; i >= 0; i--) {
            if (!(msgs[i].className || '').toLowerCase().includes('user')) { last = msgs[i]; break; }
        }
        if (!last) return r;
        const scope = last;
        const mdEls = Array.from(scope.querySelectorAll(
            '[class*="markdown"], [class*="Markdown"], [class*="answer"], [class*="content"]'));
        const top = mdEls.filter(el => !mdEls.some(p => p !== el && p.contains(el)));
        let think = '', ans = '';
        if (top.length >= 2) {
            think = top[0].innerText.trim();
            ans = top[top.length - 1].innerText.trim();
        } else if (top.length === 1) {
            const t = top[0].innerText.trim();
            if (scope.innerText.includes('深度思考') && !scope.innerText.includes('已深度思考')) think = t;
            else ans = t;
        }
        const bt = scope.innerText || '';
        const hasMarker = bt.includes('深度思考') || bt.includes('极速思考') || bt.includes('思考过程');
        if (!ans || (!think && hasMarker)) {
            const lines = bt.split('\\n').map(l => l.trim()).filter(Boolean);
            const skip = ['智能搜索','快速模式','专家模式','极速思考','内容由 AI 生成','开启新对话','暂无历史对话'];
            let isTh = false, tl = [], al = [];
            for (const l of lines) {
                if (skip.some(s => l === s)) continue;
                if (l.length < 30 && (l.includes('深度思考') || l.includes('极速思考') || l.includes('思考过程'))) {
                    isTh = !(l.includes('已') || l.includes('用时') || l.includes('完成'));
                    continue;
                }
                if (isTh) tl.push(l); else al.push(l);
            }
            if (tl.length) think = tl.join('\\n');
            if (al.length) ans = al.join('\\n');
        }
        r.thinking = think;
        r.answer = ans;
        const sb = document.querySelector('[class*="stop"], button[aria-label*="stop"]');
        r.done = (!sb || sb.offsetParent === null);
        if (!r.answer && !r.thinking) r.done = false;
        return r;
    }"""

    async def _wait_for_response(self, timeout, prompt=""):
        """Poll the page until the reply text stops changing.

        The reply counts as finished once the scraped text is identical on
        two consecutive polls after it was first seen.

        Args:
            timeout: Seconds to keep polling.
            prompt: Unused; kept for call compatibility.

        Returns:
            ``{"content": ..., "reasoning_content": ...}``. On timeout the
            last partial text is returned if there is any.

        Raises:
            TimeoutError: Nothing was scraped before the timeout.
            Exception: Re-raised when the error text reports a crashed or
                closed target.
        """
        deadline = time.monotonic() + timeout
        await asyncio.sleep(0.3)
        last_answer, last_thinking, stable = "", "", 0
        while time.monotonic() < deadline:
            try:
                result = await self.page.evaluate(self._EXTRACT_JS)
            except Exception as e:
                err = str(e)
                if "Target crashed" in err or "Target closed" in err:
                    logger.error("[_wait_for_response] Browser crashed: %s", e)
                    raise
                # Transient evaluation error: retry on the next poll.
                await asyncio.sleep(0.5)
                continue

            answer = (result.get("answer") or "").strip()
            thinking = (result.get("thinking") or "").strip()
            if answer or thinking:
                if answer != last_answer or thinking != last_thinking:
                    last_answer, last_thinking, stable = answer, thinking, 0
                else:
                    stable += 1
                    if stable >= 2:
                        return {"content": last_answer,
                                "reasoning_content": last_thinking}
            await asyncio.sleep(0.5)

        if last_answer or last_thinking:
            return {"content": last_answer, "reasoning_content": last_thinking}
        raise TimeoutError("No response received")

    async def stream_message(self, prompt, timeout=120, model="deepseek-chat"):
        """Send ``prompt`` in a new chat and yield the reply incrementally.

        Yields dicts of the form ``{"type": "thinking" | "content",
        "chunk": <new text>}``, where the chunk is the part of the scraped
        text beyond the previously seen length. The generator ends when the
        answer is unchanged on two consecutive polls or the timeout elapses.

        Raises:
            Exception: Any browser error, after being logged.
        """
        try:
            await self.new_chat()
            await self.switch_model(model)
            inp = self.page.locator("textarea").first
            await inp.wait_for(state="visible", timeout=15000)
            await inp.fill(prompt)
            await self._human_delay()
            await inp.press("Enter")

            deadline = time.monotonic() + timeout
            last_thinking, last_answer, stable = "", "", 0
            await asyncio.sleep(0.3)
            while time.monotonic() < deadline:
                try:
                    result = await self.page.evaluate(self._EXTRACT_JS)
                except Exception as e:
                    err = str(e)
                    if "Target crashed" in err or "Target closed" in err:
                        logger.error("[stream_message] Browser crashed: %s", e)
                        raise
                    # Transient evaluation error: retry on the next poll.
                    await asyncio.sleep(0.3)
                    continue

                thinking = (result.get("thinking") or "").strip()
                answer = (result.get("answer") or "").strip()

                if thinking and thinking != last_thinking:
                    new = thinking[len(last_thinking):]
                    if new:
                        yield {"type": "thinking", "chunk": new}
                    last_thinking = thinking

                if answer and answer != last_answer:
                    new = answer[len(last_answer):]
                    if new:
                        yield {"type": "content", "chunk": new}
                    last_answer, stable = answer, 0
                elif answer:
                    stable += 1
                    if stable >= 2:
                        break
                await asyncio.sleep(0.3)

            # Clean up in the background; the reference is held until done.
            self._spawn_background(self._safe_delete_chat())
        except Exception as e:
            logger.error("Stream message error: %s", e)
            raise

    async def close(self):
        """Close the browser context (errors are logged) and reset state."""
        if self.context:
            try:
                await self.context.close()
            except Exception as e:
                logger.debug("Error closing browser: %s", e)
            self.context = None
            self.page = None
            self._logged_in = False
            self._ready = False