from typing import List, Dict, Optional
from app.models import Proxy
from datetime import datetime
import asyncio
class InMemoryStorage:
    """Async, dictionary-backed store of ``Proxy`` objects.

    Proxies are keyed by the string ``"<ip>:<port>:<protocol>"``; the first
    proxy stored under a key wins and later ones with the same key are
    ignored. Every read and write of the underlying dict happens while
    holding a single ``asyncio.Lock``.
    """

    def __init__(self) -> None:
        # Maps "<ip>:<port>:<protocol>" to the stored proxy. Dicts preserve
        # insertion order, which is what gives pagination a stable order.
        self._proxies: Dict[str, Proxy] = {}
        self._lock = asyncio.Lock()

    @staticmethod
    def _key(proxy: Proxy) -> str:
        """Return the de-duplication key for *proxy*."""
        return f"{proxy.ip}:{proxy.port}:{proxy.protocol}"

    async def add_proxies(self, proxies: List[Proxy]) -> int:
        """Store every proxy whose key is not already present.

        Args:
            proxies: Proxies to add. Duplicates, whether of already-stored
                entries or of earlier items in the same list, are skipped.

        Returns:
            The number of proxies that were actually inserted.
        """
        async with self._lock:
            added = 0
            for proxy in proxies:
                key = self._key(proxy)
                if key not in self._proxies:
                    self._proxies[key] = proxy
                    added += 1
            return added

    async def get_all_proxies(
        self, protocol: Optional[str] = None, limit: int = 100, offset: int = 0
    ) -> List[Proxy]:
        """Return one page of stored proxies in insertion order.

        Args:
            protocol: When truthy, only proxies whose ``protocol`` equals
                this value are considered. ``None`` or ``""`` disables the
                filter.
            limit: Maximum number of proxies to return. Negative values are
                treated as 0.
            offset: Number of matching proxies to skip. Negative values are
                treated as 0.

        Returns:
            A new list holding at most ``limit`` proxies; empty when
            ``offset`` is past the end of the matching proxies.
        """
        # Clamp before slicing: a negative bound would be interpreted as
        # "count from the end" by the slice and yield a nonsensical page.
        start = max(offset, 0)
        count = max(limit, 0)
        async with self._lock:
            if protocol:
                matching = [
                    p for p in self._proxies.values() if p.protocol == protocol
                ]
            else:
                matching = list(self._proxies.values())
            return matching[start : start + count]

    async def get_count(self, protocol: Optional[str] = None) -> int:
        """Return how many proxies are stored.

        Args:
            protocol: When truthy, count only proxies whose ``protocol``
                equals this value; otherwise count everything.
        """
        async with self._lock:
            if not protocol:
                return len(self._proxies)
            # Count lazily rather than building a list only to measure it.
            return sum(
                1 for p in self._proxies.values() if p.protocol == protocol
            )

    async def clear(self) -> None:
        """Remove every stored proxy."""
        async with self._lock:
            self._proxies.clear()
# Module-level InMemoryStorage instance, created once when this module is
# imported. NOTE(review): presumably imported elsewhere as the shared store;
# confirm against callers.
storage = InMemoryStorage()
|