"""Browser-automation client for the DeepSeek chat web UI."""

import asyncio
import logging
import random
import time
from pathlib import Path
from typing import AsyncGenerator, List, Optional

from cloakbrowser import launch_persistent_context_async

logger = logging.getLogger(__name__)


class DeepSeekBrowser:
    """Drive a persistent browser profile logged in to chat.deepseek.com.

    One instance owns one browser context (stored under a per-account
    profile directory) and one page.  Responses are obtained by scraping
    the page's visible text, so the parsing helpers below depend on the
    UI strings listed in ``_get_skip_phrases`` and ``AI_DISCLAIMER``.
    """

    DEEPSEEK_URL = "https://chat.deepseek.com"

    # Footer line of the chat page; page text from this line on is ignored.
    AI_DISCLAIMER = '内容由 AI 生成,请仔细甄别'

    def __init__(
        self,
        email: str,
        password: str,
        profile_dir: str = "./profiles",
        headless: bool = True,
        humanize: bool = True,
        proxy: Optional[str] = None,
    ) -> None:
        """Store credentials and launch options; no browser is started here.

        Args:
            email: Account e-mail, also used to derive the profile sub-directory.
            password: Account password used by the automatic login.
            profile_dir: Parent directory for per-account browser profiles.
            headless: Forwarded to the browser launcher.
            humanize: Forwarded to the browser launcher.
            proxy: Optional proxy, forwarded to the browser launcher.
        """
        self.email = email
        self.password = password
        self.profile_dir = Path(profile_dir) / email.replace("@", "_at_").replace("+", "_plus_")
        self.headless = headless
        self.humanize = humanize
        self.proxy = proxy
        self.context = None
        self.page = None
        self._logged_in = False
        self._ready = False
        # Mute state is filled in by _check_mute(); initialised here so the
        # accessors never depend on that method having run.
        self._is_muted = False
        self._muted_until = ""

    def _mask_email(self) -> str:
        """Return the e-mail with its local part masked by asterisks.

        The first four characters of the local part are kept when it is
        longer than four characters, otherwise only the first one.  An
        address that does not have exactly one ``@`` or has an empty local
        part is returned unchanged.
        """
        local, sep, domain = self.email.partition("@")
        if not sep or "@" in domain or not local:
            return self.email
        visible = local[:4] if len(local) > 4 else local[:1]
        return f"{visible}{'*' * (len(local) - len(visible))}@{domain}"

    async def start(self) -> None:
        """Launch the persistent context, open the chat page and log in if needed."""
        self.profile_dir.mkdir(parents=True, exist_ok=True)
        self.context = await launch_persistent_context_async(
            user_data_dir=str(self.profile_dir),
            headless=self.headless,
            humanize=self.humanize,
            proxy=self.proxy,
            viewport={"width": 1920, "height": 1080},
            locale="zh-CN",
        )
        self.page = await self.context.new_page()
        await self.page.goto(self.DEEPSEEK_URL, timeout=60000)
        await asyncio.sleep(5)
        await self._check_login_state()

    async def _check_login_state(self) -> None:
        """Detect whether the session is logged in, logging in when it is not.

        A ``/sign_in`` URL or a missing ``textarea`` triggers the automatic
        login.  After a successful login the mute state is refreshed.
        """
        if '/sign_in' in self.page.url:
            await self._auto_login()
        else:
            try:
                await self.page.wait_for_selector('textarea', timeout=10000)
            except Exception:
                # No chat input within the timeout: treat as logged out.
                await self._auto_login()
            else:
                self._logged_in = True
                self._ready = True

        # Check if account is muted after login
        if self._logged_in:
            await self._check_mute()

    async def _check_mute(self) -> None:
        """Scan the page text for a mute notice and record its expiry.

        Sets ``_is_muted`` and ``_muted_until``; any failure while
        evaluating the page is treated as "not muted".
        """
        try:
            muted, until = await self.page.evaluate("""() => {
                const text = document.body.innerText || '';
                // Match: 禁言至 YYYY年M月D日 HH:MM or 禁言至 YYYY-MM-DD HH:MM
                const match = text.match(/禁言至\\s*(\\d{4}[-年]\\d{1,2}[-月]\\d{1,2}[日]?\\s*\\d{1,2}:\\d{2})/);
                if (match) return [true, match[1]];
                if (text.includes('禁言')) return [true, ''];
                return [false, ''];
            }""")
            self._is_muted = muted
            self._muted_until = until
            if muted:
                logger.warning("[mute] %s is muted until %s", self.email, until)
        except Exception as e:
            logger.debug("[mute] check failed, assuming not muted: %s", e)
            self._is_muted = False
            self._muted_until = ""

    def is_muted(self) -> bool:
        """Return whether the last mute check found a mute notice."""
        return self._is_muted

    def muted_until(self) -> str:
        """Return the mute expiry text found by the last check ("" if none)."""
        return self._muted_until

    async def _auto_login(self) -> None:
        """Fill in the login form and wait for the chat input to appear.

        Raises:
            Exception: If any form element cannot be used (the original
                error is re-raised) or the chat input never appears
                (``Exception("Login failed")``).
        """
        logger.info("Logging in as %s...", self.email)
        try:
            email_input = self.page.locator(
                'input[placeholder*="邮箱"], input[placeholder*="手机"], '
                'input[placeholder*="Email"], input[placeholder*="email"], input[type="text"]'
            ).first
            await email_input.wait_for(state="visible", timeout=10000)
            await email_input.fill(self.email)
            await asyncio.sleep(0.5)
        except Exception as e:
            # Take screenshot to debug
            screenshot_path = f"/tmp/login_fail_{self.email.replace('@', '_at_')}.png"
            try:
                await self.page.screenshot(path=screenshot_path)
                logger.error("Screenshot saved to %s", screenshot_path)
            except Exception as shot_err:
                # Best effort only: the login error below is what matters.
                logger.debug("Could not save login screenshot: %s", shot_err)
            logger.error("Email input error: %s", e)
            raise

        try:
            password_input = self.page.locator('input[type="password"]').first
            await password_input.wait_for(state="visible", timeout=5000)
            await password_input.fill(self.password)
            await asyncio.sleep(0.5)
        except Exception as e:
            logger.error("Password input error: %s", e)
            raise

        try:
            login_button = self.page.locator('button:has-text("登录")').first
            await login_button.click()
            await asyncio.sleep(3)
        except Exception as e:
            logger.error("Login button error: %s", e)
            raise

        try:
            await self.page.wait_for_selector('textarea', timeout=30000)
        except Exception as e:
            raise Exception("Login failed") from e
        self._logged_in = True
        self._ready = True
        logger.info("Login successful!")

    async def _human_delay(self, min_ms: int = 300, max_ms: int = 1500) -> None:
        """Sleep for a random duration between ``min_ms`` and ``max_ms`` milliseconds."""
        delay = random.uniform(min_ms, max_ms) / 1000
        await asyncio.sleep(delay)

    def _get_skip_phrases(self) -> List[str]:
        """Build skip phrases list, dynamically including masked email.

        Page lines containing any of these phrases are dropped from the
        extracted response.
        """
        phrases = [
            '深度思考',
            '智能搜索',
            '快速模式',
            '专家模式',
            '内容由 AI 生成',
            '开启新对话',
            '暂无历史对话',
            '今天',
        ]
        masked = self._mask_email()
        # An empty phrase is a substring of every line and would filter
        # out the whole page, so only add a non-empty mask.
        if masked:
            phrases.append(masked)
        return phrases

    async def new_chat(self) -> None:
        """Navigate to the start page and wait for the chat input.

        Raises:
            Exception: Re-raised from the page when navigation or the wait fails.
        """
        try:
            await self.page.goto(self.DEEPSEEK_URL, timeout=30000)
            await asyncio.sleep(2)
            await self.page.wait_for_selector('textarea', timeout=15000)
        except Exception as e:
            logger.error("New chat error: %s", e)
            raise

    async def delete_chat(self) -> None:
        """Try to delete the currently active conversation via the sidebar menu.

        Best effort: every failure is logged and swallowed, and the method
        returns quietly when an expected element is not found.
        """
        try:
            # Find the sidebar and active conversation
            chat_list = self.page.locator(
                'nav, aside, [class*="sidebar"], [class*="Sidebar"], div:has-text("开启新对话")'
            )
            if await chat_list.count() == 0:
                logger.debug("[delete_chat] no sidebar")
                return

            active_item = chat_list.first.locator(
                '[class*="active"], [class*="selected"], [class*="current"]'
            ).first
            if await active_item.count() == 0:
                # No active item yet (first chat), skip deletion
                logger.debug("[delete_chat] no active item, skipping")
                return

            # The box itself is not used; a missing box aborts the deletion.
            if not await active_item.bounding_box():
                logger.debug("[delete_chat] no bbox")
                return

            # Instead of position-based click, find the "..." element in DOM
            click_result = await self.page.evaluate("""() => {
                // Find the active/highlighted conversation item
                const active = document.querySelector('[class*="active"], [class*="selected"]');
                if (!active) return 'no-active';

                // Walk down to find a clickable child that looks like "..."
                // The "..." is often a button or div with no text (SVG only)
                const walk = (node, depth) => {
                    if (depth > 10) return null;
                    for (const child of node.children || []) {
                        const tag = child.tagName;
                        const cls = (child.className || '').toString();
                        // Look for small icon-like elements
                        if ((tag === 'BUTTON' || tag === 'svg' || cls.includes('icon') || cls.includes('more') || cls.includes('menu') || cls.includes('action')) && child.offsetWidth < 40 && child.offsetWidth > 0) {
                            return child;
                        }
                        const found = walk(child, depth + 1);
                        if (found) return found;
                    }
                    return null;
                };

                const icon = walk(active, 0);
                if (icon) {
                    icon.click();
                    return 'clicked:' + icon.tagName + ':' + (icon.className || '').substring(0, 40);
                }

                // Fallback: find any button/svg in active item
                const btn = active.querySelector('button, [role="button"]');
                if (btn) {
                    btn.click();
                    return 'fallback:' + btn.tagName;
                }
                return 'no-icon';
            }""")
            logger.debug("[delete_chat] icon click: %s", click_result)
            await asyncio.sleep(0.5)

            # Search for "删除" or "Delete" anywhere on page.
            # ":text()" matches the smallest element containing the text;
            # a bare ":has-text()" also matches every ancestor, so ".first"
            # would resolve to the document root instead of the menu entry.
            delete_btn = self.page.locator(':text("删除"), :text("Delete")').first
            if await delete_btn.count() == 0:
                logger.debug("[delete_chat] no delete option found")
                return

            await delete_btn.click()
            await asyncio.sleep(0.5)

            # Confirm
            confirm_btn = self.page.locator(
                'button:has-text("确认"), button:has-text("删除"), '
                'button:has-text("Confirm"), button:has-text("Delete")'
            ).last
            if await confirm_btn.count() > 0:
                await confirm_btn.click()
                await asyncio.sleep(1)
                logger.debug("[delete_chat] done!")
            else:
                logger.debug("[delete_chat] no confirm btn")
        except Exception as e:
            logger.warning("[delete_chat] error: %s", e)

    async def _click_toggle(self, label: str) -> None:
        """Click the first element whose text contains ``label``, if any."""
        # ":text()" targets the innermost element carrying the label; a
        # "div:has-text()" union would pick the outermost wrapping div.
        toggle = self.page.locator(f':text("{label}")').first
        if await toggle.count() > 0:
            await toggle.click()
            await asyncio.sleep(0.5)

    async def switch_model(self, model: str) -> None:
        """Click the UI toggles implied by the model name.

        ``reasoner``/``thinking``/``pro`` in the name clicks "深度思考";
        ``search`` clicks "智能搜索".  Failures are logged and ignored.

        NOTE(review): the toggles are clicked unconditionally; whether a
        toggle was already active beforehand is not inspected.
        """
        try:
            if 'reasoner' in model or 'thinking' in model or 'pro' in model:
                await self._click_toggle("深度思考")
            if 'search' in model:
                await self._click_toggle("智能搜索")
        except Exception as e:
            logger.warning("[switch_model] could not switch to %s: %s", model, e)

    async def _type_prompt(self, input_field, prompt: str) -> None:
        """Type ``prompt`` key by key, keeping line breaks inside the message."""
        delay = random.randint(30, 80)
        lines = prompt.replace("\r\n", "\n").replace("\r", "\n").split("\n")
        for index, line in enumerate(lines):
            if index:
                # A typed newline is sent as Enter, which would submit the
                # message early; Shift+Enter inserts the line break instead.
                await input_field.press('Shift+Enter')
            if line:
                await input_field.type(line, delay=delay)

    async def _submit_prompt(self, prompt: str, model: str) -> None:
        """Open a fresh chat, select the model toggles, type and send ``prompt``."""
        await self.new_chat()
        await self.switch_model(model)

        input_field = self.page.locator('textarea').first
        await input_field.wait_for(state="visible", timeout=15000)

        await self._human_delay(500, 2000)
        await input_field.clear()
        await self._type_prompt(input_field, prompt)
        await self._human_delay(200, 800)
        await input_field.press('Enter')

    @staticmethod
    def _normalize_ws(text: str) -> str:
        """Collapse every run of whitespace in ``text`` to a single space."""
        return " ".join(text.split())

    @classmethod
    def _find_response_start(cls, page_lines: List[str], prompt: str) -> Optional[int]:
        """Return the index of the first page line after the echoed prompt.

        The prompt's non-empty lines are searched as a consecutive run of
        page lines (substring match, whitespace-normalised).  If the run is
        not found for a multi-line prompt, the first page line containing
        the prompt's last line is used instead.  Returns ``None`` when the
        prompt is blank or cannot be located.
        """
        wanted = [w for w in (cls._normalize_ws(p) for p in prompt.splitlines()) if w]
        if not wanted:
            return None

        normalized = [cls._normalize_ws(line) for line in page_lines]
        span = len(wanted)
        for start in range(len(normalized) - span + 1):
            if all(w in normalized[start + k] for k, w in enumerate(wanted)):
                return start + span

        if span > 1:
            tail = wanted[-1]
            for index, line in enumerate(normalized):
                if tail in line:
                    return index + 1
        return None

    @classmethod
    def _extract_response(cls, text: str, prompt: str, skip_phrases: List[str]) -> str:
        """Extract the assistant's answer from the page's visible text.

        Blank lines are dropped, everything from the ``AI_DISCLAIMER`` line
        on is ignored, the answer starts after the echoed prompt, and lines
        containing a skip phrase are removed from the answer.  Skip phrases
        are applied only after the prompt is located, so a prompt that
        itself contains one of them is still found.

        Returns:
            The answer lines joined with newlines, or "" if none was found.
        """
        page_lines = []
        for raw_line in text.split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            if line == cls.AI_DISCLAIMER:
                break
            page_lines.append(line)

        start = cls._find_response_start(page_lines, prompt)
        if start is None:
            return ""
        return '\n'.join(
            line
            for line in page_lines[start:]
            if not any(phrase in line for phrase in skip_phrases)
        )

    async def _poll_response_snapshots(
        self, timeout: int, prompt: str, interval: float
    ) -> AsyncGenerator[str, None]:
        """Yield the extracted answer each time it changes.

        Polling starts three seconds after the call and stops when the
        answer stayed unchanged for three consecutive polls or when
        ``timeout`` seconds (measured from the call) have elapsed.
        """
        deadline = time.monotonic() + timeout
        skip_phrases = self._get_skip_phrases()
        await asyncio.sleep(3)

        last_text = ""
        stable_count = 0
        while time.monotonic() < deadline:
            try:
                body_text = await self.page.inner_text('body')
            except Exception as e:
                # Transient read failures are retried until the deadline.
                logger.debug("[poll] could not read page text: %s", e)
                body_text = None

            if body_text is not None:
                current_text = self._extract_response(body_text, prompt, skip_phrases)
                if current_text:
                    if current_text != last_text:
                        last_text = current_text
                        stable_count = 0
                        yield current_text
                    else:
                        stable_count += 1
                        if stable_count >= 3:
                            return

            await asyncio.sleep(interval)

    async def send_message(self, prompt: str, timeout: int = 120, model: str = "deepseek-chat") -> str:
        """Send ``prompt`` in a new chat and return the complete answer.

        The chat is deleted again after the answer was received.

        Raises:
            TimeoutError: If no answer text appeared within ``timeout`` seconds.
            Exception: Any page error raised while sending, re-raised after logging.
        """
        try:
            await self._submit_prompt(prompt, model)
            response = await self._wait_for_response(timeout, prompt)
            await self.delete_chat()
            return response
        except Exception as e:
            logger.error("Send message error: %s", e)
            raise

    async def _wait_for_response(self, timeout: int, prompt: str = "") -> str:
        """Wait until the answer stops changing and return it.

        Returns the stable answer, or the latest partial answer if the
        deadline passes first.

        Raises:
            TimeoutError: If no answer text was seen before the deadline.
        """
        last_text = ""
        async for snapshot in self._poll_response_snapshots(timeout, prompt, 0.5):
            last_text = snapshot

        if last_text:
            return last_text.strip()
        raise TimeoutError("No response received")

    async def stream_message(
        self, prompt: str, timeout: int = 120, model: str = "deepseek-chat"
    ) -> AsyncGenerator[str, None]:
        """Send ``prompt`` in a new chat and yield the answer incrementally.

        Each yielded chunk is the part of the current answer beyond the
        length of the previously seen answer.  The stream ends when the
        answer is stable or ``timeout`` seconds have passed; no exception
        is raised for an empty answer.  Once the prompt was submitted the
        chat is deleted afterwards, also when the consumer stops early.
        """
        submitted = False
        try:
            await self._submit_prompt(prompt, model)
            submitted = True

            emitted = ""
            async for snapshot in self._poll_response_snapshots(timeout, prompt, 0.3):
                new_chunk = snapshot[len(emitted):]
                emitted = snapshot
                if new_chunk:
                    yield new_chunk
        except Exception as e:
            logger.error("Stream message error: %s", e)
            raise
        finally:
            # Clean up: delete the chat after streaming is done (#17)
            if submitted:
                try:
                    await self.delete_chat()
                except Exception as e:
                    logger.warning("[stream_message] delete_chat cleanup error: %s", e)

    async def close(self) -> None:
        """Close the browser context and reset session state; safe to call twice."""
        if self.context:
            try:
                await self.context.close()
            finally:
                self.context = None
                self.page = None
                self._logged_in = False
                self._ready = False