ds2api-browser / deepseek_browser.py
nacho
fix: 全面代码审查修复
466f6e2
raw
history blame
16.3 kB
import asyncio
import logging
import random
import time
from pathlib import Path
from typing import AsyncGenerator, Optional
from cloakbrowser import launch_persistent_context_async
logger = logging.getLogger(__name__)
class DeepSeekBrowser:
DEEPSEEK_URL = "https://chat.deepseek.com"
def __init__(
self,
email: str,
password: str,
profile_dir: str = "./profiles",
headless: bool = True,
humanize: bool = True,
proxy: Optional[str] = None,
):
self.email = email
self.password = password
self.profile_dir = Path(profile_dir) / email.replace("@", "_at_").replace("+", "_plus_")
self.headless = headless
self.humanize = humanize
self.proxy = proxy
self.context = None
self.page = None
self._logged_in = False
self._ready = False
def _mask_email(self) -> str:
"""Generate a masked version of the email for skip_phrases filtering."""
parts = self.email.split("@")
if len(parts) == 2:
local = parts[0]
domain = parts[1]
if len(local) > 4:
masked = local[:4] + "*" * (len(local) - 4)
else:
masked = local[0] + "*" * (len(local) - 1)
return f"{masked}@{domain}"
return self.email
async def start(self):
self.profile_dir.mkdir(parents=True, exist_ok=True)
self.context = await launch_persistent_context_async(
user_data_dir=str(self.profile_dir),
headless=self.headless,
humanize=self.humanize,
proxy=self.proxy,
viewport={"width": 1920, "height": 1080},
locale="zh-CN",
)
self.page = await self.context.new_page()
await self.page.goto(self.DEEPSEEK_URL, timeout=60000)
await asyncio.sleep(5)
await self._check_login_state()
async def _check_login_state(self):
current_url = self.page.url
if '/sign_in' in current_url:
await self._auto_login()
else:
try:
await self.page.wait_for_selector('textarea', timeout=10000)
self._logged_in = True
self._ready = True
except Exception:
await self._auto_login()
# Check if account is muted after login
if self._logged_in:
await self._check_mute()
async def _check_mute(self):
"""Check if account is muted and extract mute expiry."""
try:
muted, until = await self.page.evaluate("""() => {
const text = document.body.innerText || '';
// Match: 禁言至 YYYY年M月D日 HH:MM or 禁言至 YYYY-MM-DD HH:MM
const match = text.match(/禁言至\\s*(\\d{4}[-年]\\d{1,2}[-月]\\d{1,2}[日]?\\s*\\d{1,2}:\\d{2})/);
if (match) return [true, match[1]];
if (text.includes('禁言')) return [true, ''];
return [false, ''];
}""")
self._is_muted = muted
self._muted_until = until
if muted:
logger.warning("[mute] %s is muted until %s", self.email, until)
except Exception:
self._is_muted = False
self._muted_until = ""
def is_muted(self) -> bool:
return getattr(self, '_is_muted', False)
def muted_until(self) -> str:
return getattr(self, '_muted_until', "")
async def _auto_login(self):
logger.info("Logging in as %s...", self.email)
try:
email_input = self.page.locator('input[placeholder*="邮箱"], input[placeholder*="手机"], input[placeholder*="Email"], input[placeholder*="email"], input[type="text"]').first
await email_input.wait_for(state="visible", timeout=10000)
await email_input.fill(self.email)
await asyncio.sleep(0.5)
except Exception as e:
# Take screenshot to debug
try:
await self.page.screenshot(path=f"/tmp/login_fail_{self.email.replace('@','_at_')}.png")
logger.error("Screenshot saved to /tmp/login_fail_%s.png", self.email.replace('@', '_at_'))
except Exception:
pass
logger.error("Email input error: %s", e)
raise
try:
password_input = self.page.locator('input[type="password"]').first
await password_input.wait_for(state="visible", timeout=5000)
await password_input.fill(self.password)
await asyncio.sleep(0.5)
except Exception as e:
logger.error("Password input error: %s", e)
raise
try:
login_button = self.page.locator('button:has-text("登录")').first
await login_button.click()
await asyncio.sleep(3)
except Exception as e:
logger.error("Login button error: %s", e)
raise
try:
await self.page.wait_for_selector('textarea', timeout=30000)
self._logged_in = True
self._ready = True
logger.info("Login successful!")
except Exception:
raise Exception("Login failed")
async def _human_delay(self, min_ms: int = 300, max_ms: int = 1500):
delay = random.uniform(min_ms, max_ms) / 1000
await asyncio.sleep(delay)
def _get_skip_phrases(self) -> list:
"""Build skip phrases list, dynamically including masked email."""
phrases = [
'深度思考', '智能搜索', '快速模式', '专家模式',
'内容由 AI 生成', '开启新对话', '暂无历史对话', '今天',
]
masked = self._mask_email()
phrases.append(masked)
return phrases
async def new_chat(self):
try:
await self.page.goto(self.DEEPSEEK_URL, timeout=30000)
await asyncio.sleep(2)
await self.page.wait_for_selector('textarea', timeout=15000)
except Exception as e:
logger.error("New chat error: %s", e)
raise
async def delete_chat(self):
try:
# Find the sidebar and active conversation
chat_list = self.page.locator(
'nav, aside, [class*="sidebar"], [class*="Sidebar"], div:has-text("开启新对话")'
)
chat_list_count = await chat_list.count()
if chat_list_count == 0:
logger.debug("[delete_chat] no sidebar")
return
active_item = chat_list.first.locator(
'[class*="active"], [class*="selected"], [class*="current"]'
).first
active_count = await active_item.count()
if active_count == 0:
# No active item yet (first chat), skip deletion
logger.debug("[delete_chat] no active item, skipping")
return
# Get bounding box and click near right edge where "..." should be
box = await active_item.bounding_box()
if not box:
logger.debug("[delete_chat] no bbox")
return
# Instead of position-based click, find the "..." element in DOM
click_result = await self.page.evaluate("""() => {
// Find the active/highlighted conversation item
const active = document.querySelector('[class*="active"], [class*="selected"]');
if (!active) return 'no-active';
// Walk down to find a clickable child that looks like "..."
// The "..." is often a button or div with no text (SVG only)
const walk = (node, depth) => {
if (depth > 10) return null;
for (const child of node.children || []) {
const tag = child.tagName;
const cls = (child.className || '').toString();
// Look for small icon-like elements
if ((tag === 'BUTTON' || tag === 'svg' || cls.includes('icon') || cls.includes('more') || cls.includes('menu') || cls.includes('action')) &&
child.offsetWidth < 40 && child.offsetWidth > 0) {
return child;
}
const found = walk(child, depth + 1);
if (found) return found;
}
return null;
};
const icon = walk(active, 0);
if (icon) {
icon.click();
return 'clicked:' + icon.tagName + ':' + (icon.className || '').substring(0, 40);
}
// Fallback: find any button/svg in active item
const btn = active.querySelector('button, [role="button"]');
if (btn) {
btn.click();
return 'fallback:' + btn.tagName;
}
return 'no-icon';
}""")
logger.debug("[delete_chat] icon click: %s", click_result)
await asyncio.sleep(0.5)
# Search for "删除" or "Delete" anywhere on page
delete_btn = self.page.locator(
':has-text("删除"), :has-text("Delete")'
).first
delete_count = await delete_btn.count()
if delete_count == 0:
logger.debug("[delete_chat] no delete option found")
return
await delete_btn.click()
await asyncio.sleep(0.5)
# Confirm
confirm_btn = self.page.locator(
'button:has-text("确认"), button:has-text("删除"), '
'button:has-text("Confirm"), button:has-text("Delete")'
).last
if await confirm_btn.count() > 0:
await confirm_btn.click()
await asyncio.sleep(1)
logger.debug("[delete_chat] done!")
else:
logger.debug("[delete_chat] no confirm btn")
except Exception as e:
logger.warning("[delete_chat] error: %s", e)
async def switch_model(self, model: str):
try:
if 'reasoner' in model or 'thinking' in model or 'pro' in model:
thinking_btn = self.page.locator('button:has-text("深度思考"), div:has-text("深度思考")').first
if await thinking_btn.count() > 0:
await thinking_btn.click()
await asyncio.sleep(0.5)
if 'search' in model:
search_btn = self.page.locator('button:has-text("智能搜索"), div:has-text("智能搜索")').first
if await search_btn.count() > 0:
await search_btn.click()
await asyncio.sleep(0.5)
except Exception:
pass
async def send_message(self, prompt: str, timeout: int = 120, model: str = "deepseek-chat") -> str:
try:
await self.new_chat()
await self.switch_model(model)
input_field = self.page.locator('textarea').first
await input_field.wait_for(state="visible", timeout=15000)
await self._human_delay(500, 2000)
await input_field.clear()
await input_field.type(prompt, delay=random.randint(30, 80))
await self._human_delay(200, 800)
await input_field.press('Enter')
response = await self._wait_for_response(timeout, prompt)
await self.delete_chat()
return response
except Exception as e:
logger.error("Send message error: %s", e)
raise
async def _wait_for_response(self, timeout: int, prompt: str = "") -> str:
deadline = time.time() + timeout
await asyncio.sleep(3)
last_text = ""
stable_count = 0
skip_phrases = self._get_skip_phrases()
while time.time() < deadline:
try:
text = await self.page.inner_text('body')
lines = text.split('\n')
response_started = False
response_text = []
for line in lines:
line = line.strip()
if not line:
continue
if line == '内容由 AI 生成,请仔细甄别':
break
if any(phrase in line for phrase in skip_phrases):
continue
if response_started:
response_text.append(line)
if prompt and prompt in line:
response_started = True
if response_text:
current_text = '\n'.join(response_text)
if current_text != last_text:
last_text = current_text
stable_count = 0
else:
stable_count += 1
if stable_count >= 3:
return current_text.strip()
except Exception:
pass
await asyncio.sleep(0.5)
if last_text:
return last_text.strip()
raise TimeoutError("No response received")
async def stream_message(self, prompt: str, timeout: int = 120, model: str = "deepseek-chat") -> AsyncGenerator[str, None]:
try:
await self.new_chat()
await self.switch_model(model)
input_field = self.page.locator('textarea').first
await input_field.wait_for(state="visible", timeout=15000)
await self._human_delay(500, 2000)
await input_field.clear()
await input_field.type(prompt, delay=random.randint(30, 80))
await self._human_delay(200, 800)
await input_field.press('Enter')
deadline = time.time() + timeout
last_text = ""
stable_count = 0
skip_phrases = self._get_skip_phrases()
await asyncio.sleep(3)
while time.time() < deadline:
try:
text = await self.page.inner_text('body')
lines = text.split('\n')
response_started = False
response_text = []
for line in lines:
line = line.strip()
if not line:
continue
if line == '内容由 AI 生成,请仔细甄别':
break
if any(phrase in line for phrase in skip_phrases):
continue
if response_started:
response_text.append(line)
if prompt and prompt in line:
response_started = True
if response_text:
current_text = '\n'.join(response_text)
if current_text != last_text:
new_chunk = current_text[len(last_text):]
if new_chunk:
yield new_chunk
last_text = current_text
stable_count = 0
else:
stable_count += 1
if stable_count >= 3:
break
except Exception:
pass
await asyncio.sleep(0.3)
# Clean up: delete the chat after streaming is done (#17)
try:
await self.delete_chat()
except Exception as e:
logger.warning("[stream_message] delete_chat cleanup error: %s", e)
except Exception as e:
logger.error("Stream message error: %s", e)
raise
async def close(self):
if self.context:
await self.context.close()