File size: 6,460 Bytes
f6a18fc
466f6e2
f7df793
f6a18fc
 
 
 
 
 
466f6e2
 
f6a18fc
 
 
 
 
 
 
 
 
 
f7df793
f6a18fc
2940fed
 
b8df3fa
f6a18fc
 
 
594e4c0
f6a18fc
 
b8df3fa
f6a18fc
 
 
 
f7df793
f6a18fc
 
 
 
 
466f6e2
f6a18fc
b8df3fa
f6a18fc
 
 
 
 
 
 
 
 
 
466f6e2
f6a18fc
b8df3fa
f6a18fc
 
 
 
 
 
b8df3fa
f6a18fc
 
 
 
f7df793
f6a18fc
 
f7df793
f6a18fc
 
 
 
 
b8df3fa
 
 
 
 
 
 
 
 
 
 
f6a18fc
 
 
b8df3fa
f6a18fc
f7df793
 
 
f6a18fc
 
 
f7df793
 
99cb7cd
 
2d9d4ce
 
 
 
 
 
 
f6a18fc
 
466f6e2
f6a18fc
 
 
 
2d9d4ce
 
 
 
 
 
 
 
 
 
 
f6a18fc
 
 
 
 
466f6e2
 
f6a18fc
 
 
 
 
 
466f6e2
f6a18fc
2940fed
 
 
f7df793
 
 
 
2940fed
 
 
f6a18fc
f7df793
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import asyncio
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, Optional

from deepseek_browser import DeepSeekBrowser

logger = logging.getLogger(__name__)


@dataclass
class Account:
    email: str
    password: str
    name: str = ""
    proxy: Optional[str] = None
    browser: Optional[DeepSeekBrowser] = field(default=None, repr=False)
    in_use: bool = False
    error_count: int = 0
    last_error: str = ""
    logged_in: bool = False
    is_muted: bool = False
    muted_until: str = ""
    last_used: float = 0.0


class AccountManager:
    def __init__(self, max_active_browsers: int = 3):
        self.accounts: Dict[str, Account] = {}
        self.queue: deque = deque()
        self.max_active_browsers = max_active_browsers
        self._lock = asyncio.Lock()

    def add_account(self, email: str, password: str, name: str = "", proxy: Optional[str] = None):
        self.accounts[email] = Account(
            email=email, password=password, name=name, proxy=proxy,
        )

    async def acquire(self) -> Account:
        async with self._lock:
            for account in self.accounts.values():
                if not account.in_use and account.error_count < 3 and not account.is_muted:
                    account.in_use = True
                    account.last_used = time.time()
                    return account
        return await self._wait_for_account()

    async def _wait_for_account(self) -> Account:
        event = asyncio.Event()
        async with self._lock:
            self.queue.append(event)
        await event.wait()
        async with self._lock:
            for account in self.accounts.values():
                if not account.in_use and account.error_count < 3 and not account.is_muted:
                    account.in_use = True
                    account.last_used = time.time()
                    return account
        raise RuntimeError("No account available")

    async def release(self, account: Account):
        async with self._lock:
            account.in_use = False
            account.last_used = time.time()
            if self.queue:
                event = self.queue.popleft()
                event.set()

    async def mark_error(self, account: Account, error_msg: str = ""):
        async with self._lock:
            account.error_count += 1
            account.last_error = error_msg
            account.in_use = False
            if self.queue:
                event = self.queue.popleft()
                event.set()

    async def _enforce_browser_limit(self):
        active = [a for a in self.accounts.values() if a.browser is not None]
        if len(active) >= self.max_active_browsers:
            idle = [a for a in active if not a.in_use]
            if idle:
                idle.sort(key=lambda x: x.last_used)
                to_close = len(active) - self.max_active_browsers + 1
                for a in idle[:to_close]:
                    logger.info("Closing idle browser for %s to free memory", a.email)
                    await self.close_browser(a)

    async def get_or_create_browser(self, account: Account, headless: bool = True) -> DeepSeekBrowser:
        try:
            if account.browser is None:
                await self._enforce_browser_limit()
                account.browser = DeepSeekBrowser(
                    email=account.email, password=account.password,
                    profile_dir="./profiles", headless=headless,
                    humanize=True, proxy=account.proxy,
                )
                await account.browser.start()
                account.logged_in = True
                account.error_count = 0
                account.last_error = ""
                account.is_muted = account.browser.is_muted() if account.browser else False
                account.muted_until = account.browser.muted_until() if account.browser else ""
            else:
                try:
                    await account.browser.page.evaluate("1+1", timeout=3000)
                except Exception:
                    logger.warning("Browser for %s seems dead, recreating...", account.email)
                    await self.close_browser(account)
                    raise RuntimeError("Browser dead, need recreate")
            return account.browser
        except Exception as e:
            logger.error("Error creating browser for %s: %s", account.email, e)
            await self.close_browser(account)
            raise

    async def get_or_create_browser_with_retry(self, account: Account, headless: bool = True) -> DeepSeekBrowser:
        last_error = ""
        for attempt in range(2):
            try:
                return await self.get_or_create_browser(account, headless)
            except Exception as e:
                last_error = str(e)
                if attempt == 0:
                    logger.info("Retrying browser for %s...", account.email)
                    await asyncio.sleep(2)
        await self.mark_error(account, last_error)
        raise RuntimeError(f"Failed to create browser for {account.email}: {last_error}")

    async def close_browser(self, account: Account):
        if account.browser:
            try:
                await account.browser.close()
            except Exception as e:
                logger.debug("Error closing browser for %s: %s", account.email, e)
            account.browser = None
            account.logged_in = False

    def get_stats(self) -> Dict:
        total = len(self.accounts)
        in_use = sum(1 for a in self.accounts.values() if a.in_use)
        available = sum(1 for a in self.accounts.values() if not a.in_use and a.error_count < 3 and not a.is_muted)
        logged_in = sum(1 for a in self.accounts.values() if a.logged_in)
        muted = sum(1 for a in self.accounts.values() if a.is_muted)
        accounts_list = [
            {
                "email": a.email, "name": a.name,
                "in_use": a.in_use, "logged_in": a.logged_in,
                "is_muted": a.is_muted, "muted_until": a.muted_until,
                "error_count": a.error_count, "last_error": a.last_error,
            }
            for a in self.accounts.values()
        ]
        return {
            "total": total, "in_use": in_use, "available": available,
            "logged_in": logged_in, "muted": muted,
            "queue_size": len(self.queue), "accounts": accounts_list,
        }