ds2api-browser / account_manager.py
nacho
fix: guard against None browser when reading mute status
99cb7cd
import asyncio
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, Optional
from deepseek_browser import DeepSeekBrowser
logger = logging.getLogger(__name__)
@dataclass
class Account:
email: str
password: str
name: str = ""
proxy: Optional[str] = None
browser: Optional[DeepSeekBrowser] = field(default=None, repr=False)
in_use: bool = False
error_count: int = 0
last_error: str = ""
logged_in: bool = False
is_muted: bool = False
muted_until: str = ""
last_used: float = 0.0
class AccountManager:
def __init__(self, max_active_browsers: int = 3):
self.accounts: Dict[str, Account] = {}
self.queue: deque = deque()
self.max_active_browsers = max_active_browsers
self._lock = asyncio.Lock()
def add_account(self, email: str, password: str, name: str = "", proxy: Optional[str] = None):
self.accounts[email] = Account(
email=email, password=password, name=name, proxy=proxy,
)
async def acquire(self) -> Account:
async with self._lock:
for account in self.accounts.values():
if not account.in_use and account.error_count < 3 and not account.is_muted:
account.in_use = True
account.last_used = time.time()
return account
return await self._wait_for_account()
async def _wait_for_account(self) -> Account:
event = asyncio.Event()
async with self._lock:
self.queue.append(event)
await event.wait()
async with self._lock:
for account in self.accounts.values():
if not account.in_use and account.error_count < 3 and not account.is_muted:
account.in_use = True
account.last_used = time.time()
return account
raise RuntimeError("No account available")
async def release(self, account: Account):
async with self._lock:
account.in_use = False
account.last_used = time.time()
if self.queue:
event = self.queue.popleft()
event.set()
async def mark_error(self, account: Account, error_msg: str = ""):
async with self._lock:
account.error_count += 1
account.last_error = error_msg
account.in_use = False
if self.queue:
event = self.queue.popleft()
event.set()
async def _enforce_browser_limit(self):
active = [a for a in self.accounts.values() if a.browser is not None]
if len(active) >= self.max_active_browsers:
idle = [a for a in active if not a.in_use]
if idle:
idle.sort(key=lambda x: x.last_used)
to_close = len(active) - self.max_active_browsers + 1
for a in idle[:to_close]:
logger.info("Closing idle browser for %s to free memory", a.email)
await self.close_browser(a)
async def get_or_create_browser(self, account: Account, headless: bool = True) -> DeepSeekBrowser:
try:
if account.browser is None:
await self._enforce_browser_limit()
account.browser = DeepSeekBrowser(
email=account.email, password=account.password,
profile_dir="./profiles", headless=headless,
humanize=True, proxy=account.proxy,
)
await account.browser.start()
account.logged_in = True
account.error_count = 0
account.last_error = ""
account.is_muted = account.browser.is_muted() if account.browser else False
account.muted_until = account.browser.muted_until() if account.browser else ""
else:
try:
await account.browser.page.evaluate("1+1", timeout=3000)
except Exception:
logger.warning("Browser for %s seems dead, recreating...", account.email)
await self.close_browser(account)
raise RuntimeError("Browser dead, need recreate")
return account.browser
except Exception as e:
logger.error("Error creating browser for %s: %s", account.email, e)
await self.close_browser(account)
raise
async def get_or_create_browser_with_retry(self, account: Account, headless: bool = True) -> DeepSeekBrowser:
last_error = ""
for attempt in range(2):
try:
return await self.get_or_create_browser(account, headless)
except Exception as e:
last_error = str(e)
if attempt == 0:
logger.info("Retrying browser for %s...", account.email)
await asyncio.sleep(2)
await self.mark_error(account, last_error)
raise RuntimeError(f"Failed to create browser for {account.email}: {last_error}")
async def close_browser(self, account: Account):
if account.browser:
try:
await account.browser.close()
except Exception as e:
logger.debug("Error closing browser for %s: %s", account.email, e)
account.browser = None
account.logged_in = False
def get_stats(self) -> Dict:
total = len(self.accounts)
in_use = sum(1 for a in self.accounts.values() if a.in_use)
available = sum(1 for a in self.accounts.values() if not a.in_use and a.error_count < 3 and not a.is_muted)
logged_in = sum(1 for a in self.accounts.values() if a.logged_in)
muted = sum(1 for a in self.accounts.values() if a.is_muted)
accounts_list = [
{
"email": a.email, "name": a.name,
"in_use": a.in_use, "logged_in": a.logged_in,
"is_muted": a.is_muted, "muted_until": a.muted_until,
"error_count": a.error_count, "last_error": a.last_error,
}
for a in self.accounts.values()
]
return {
"total": total, "in_use": in_use, "available": available,
"logged_in": logged_in, "muted": muted,
"queue_size": len(self.queue), "accounts": accounts_list,
}