| """ |
| Resettable session wrapper for reverse requests. |
| """ |
|
|
| import asyncio |
| from typing import Any, Iterable, Optional |
|
|
| from curl_cffi.requests import AsyncSession |
|
|
| from app.core.config import get_config |
| from app.core.logger import logger |
|
|
|
|
| class ResettableSession: |
| """AsyncSession wrapper that resets connection on specific HTTP status codes.""" |
|
|
| def __init__( |
| self, |
| *, |
| reset_on_status: Optional[Iterable[int]] = None, |
| **session_kwargs: Any, |
| ): |
| self._session_kwargs = dict(session_kwargs) |
| if not self._session_kwargs.get("impersonate"): |
| browser = get_config("proxy.browser") |
| if browser: |
| self._session_kwargs["impersonate"] = browser |
| config_codes = get_config("retry.reset_session_status_codes") |
| if reset_on_status is None: |
| reset_on_status = config_codes if config_codes is not None else [403] |
| if isinstance(reset_on_status, int): |
| reset_on_status = [reset_on_status] |
| self._reset_on_status = ( |
| {int(code) for code in reset_on_status} if reset_on_status else set() |
| ) |
| self._reset_requested = False |
| self._reset_lock = asyncio.Lock() |
| self._session = AsyncSession(**self._session_kwargs) |
|
|
| async def _maybe_reset(self) -> None: |
| if not self._reset_requested: |
| return |
| async with self._reset_lock: |
| if not self._reset_requested: |
| return |
| self._reset_requested = False |
| old_session = self._session |
| self._session = AsyncSession(**self._session_kwargs) |
| try: |
| await old_session.close() |
| except Exception: |
| pass |
| logger.debug("ResettableSession: session reset") |
|
|
| async def _request(self, method: str, *args: Any, **kwargs: Any): |
| await self._maybe_reset() |
| response = await getattr(self._session, method)(*args, **kwargs) |
| if self._reset_on_status and response.status_code in self._reset_on_status: |
| self._reset_requested = True |
| return response |
|
|
| async def get(self, *args: Any, **kwargs: Any): |
| return await self._request("get", *args, **kwargs) |
|
|
| async def post(self, *args: Any, **kwargs: Any): |
| return await self._request("post", *args, **kwargs) |
|
|
| async def reset(self) -> None: |
| self._reset_requested = True |
| await self._maybe_reset() |
|
|
| async def close(self) -> None: |
| if self._session is None: |
| return |
| try: |
| await self._session.close() |
| finally: |
| self._session = None |
| self._reset_requested = False |
|
|
| async def __aenter__(self) -> "ResettableSession": |
| return self |
|
|
| async def __aexit__(self, exc_type, exc, tb) -> None: |
| await self.close() |
|
|
| def __getattr__(self, name: str) -> Any: |
| return getattr(self._session, name) |
|
|
|
|
| __all__ = ["ResettableSession"] |
|
|