Spaces:
Sleeping
Sleeping
| """ | |
| HTTP Client Pool for Medium-MCP | |
| Singleton pattern for httpx.AsyncClient connection pooling. | |
| Addresses Critical Gap #5: Connection Leaks. | |
| Based on httpx official documentation best practices: | |
| - Single AsyncClient instance for connection reuse | |
| - Configurable limits via httpx.Limits | |
| - Proper async cleanup with aclose() | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from contextlib import asynccontextmanager | |
| from typing import TYPE_CHECKING, AsyncGenerator | |
| import httpx | |
| from src.constants import ( | |
| DEFAULT_CONNECT_TIMEOUT, | |
| DEFAULT_KEEPALIVE_CONNECTIONS, | |
| DEFAULT_KEEPALIVE_EXPIRY, | |
| DEFAULT_MAX_CONNECTIONS, | |
| DEFAULT_READ_TIMEOUT, | |
| DEFAULT_USER_AGENT, | |
| ) | |
| if TYPE_CHECKING: | |
| pass | |
| logger = logging.getLogger(__name__) | |
| class HTTPClientPool: | |
| """ | |
| Singleton HTTP client pool for connection reuse. | |
| This class ensures a single httpx.AsyncClient instance is shared | |
| across the entire application, providing: | |
| - Connection pooling (reuse TCP connections) | |
| - HTTP/2 support | |
| - Automatic redirect following | |
| - Proper resource cleanup | |
| Usage: | |
| pool = HTTPClientPool() | |
| client = await pool.get_client() | |
| response = await client.get("https://example.com") | |
| # Or with context manager: | |
| async with http_session() as client: | |
| response = await client.get("https://example.com") | |
| """ | |
| _instance: "HTTPClientPool | None" = None | |
| _client: httpx.AsyncClient | None = None | |
| def __new__(cls) -> "HTTPClientPool": | |
| """Ensure only one instance exists (Singleton pattern).""" | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| return cls._instance | |
| async def get_client( | |
| self, | |
| *, | |
| max_connections: int = DEFAULT_MAX_CONNECTIONS, | |
| max_keepalive_connections: int = DEFAULT_KEEPALIVE_CONNECTIONS, | |
| keepalive_expiry: float = DEFAULT_KEEPALIVE_EXPIRY, | |
| connect_timeout: float = DEFAULT_CONNECT_TIMEOUT, | |
| read_timeout: float = DEFAULT_READ_TIMEOUT, | |
| ) -> httpx.AsyncClient: | |
| """ | |
| Get or create the shared HTTP client. | |
| Args: | |
| max_connections: Maximum concurrent connections | |
| max_keepalive_connections: Maximum idle connections to keep | |
| keepalive_expiry: How long to keep idle connections (seconds) | |
| connect_timeout: Timeout for establishing connections | |
| read_timeout: Timeout for reading responses | |
| Returns: | |
| Configured httpx.AsyncClient instance | |
| """ | |
| if self._client is None or self._client.is_closed: | |
| logger.info("Creating new HTTP client pool") | |
| limits = httpx.Limits( | |
| max_connections=max_connections, | |
| max_keepalive_connections=max_keepalive_connections, | |
| keepalive_expiry=keepalive_expiry, | |
| ) | |
| timeout = httpx.Timeout( | |
| connect=connect_timeout, | |
| read=read_timeout, | |
| write=30.0, | |
| pool=5.0, | |
| ) | |
| self._client = httpx.AsyncClient( | |
| limits=limits, | |
| timeout=timeout, | |
| http2=True, | |
| follow_redirects=True, | |
| headers={ | |
| "User-Agent": DEFAULT_USER_AGENT, | |
| "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", | |
| "Accept-Language": "en-US,en;q=0.9", | |
| "Accept-Encoding": "gzip, deflate, br", | |
| "Connection": "keep-alive", | |
| }, | |
| ) | |
| logger.debug( | |
| "HTTP client created with limits: max=%d, keepalive=%d", | |
| max_connections, | |
| max_keepalive_connections, | |
| ) | |
| return self._client | |
| async def close(self) -> None: | |
| """Close the HTTP client and release resources.""" | |
| if self._client is not None and not self._client.is_closed: | |
| logger.info("Closing HTTP client pool") | |
| await self._client.aclose() | |
| self._client = None | |
| def is_connected(self) -> bool: | |
| """Check if client is active and connected.""" | |
| return self._client is not None and not self._client.is_closed | |
| # Global pool instance | |
| _pool = HTTPClientPool() | |
| async def get_http_client() -> httpx.AsyncClient: | |
| """ | |
| Get the shared HTTP client instance. | |
| This is the primary way to access the HTTP pool. | |
| Returns: | |
| Configured httpx.AsyncClient for making requests | |
| Example: | |
| client = await get_http_client() | |
| response = await client.get("https://medium.com/...") | |
| """ | |
| return await _pool.get_client() | |
| async def close_http_pool() -> None: | |
| """Close the global HTTP pool (call on shutdown).""" | |
| await _pool.close() | |
| async def http_session() -> AsyncGenerator[httpx.AsyncClient, None]: | |
| """ | |
| Context manager for HTTP operations. | |
| Provides a client with automatic error logging. | |
| Does NOT close the client (it's a shared pool). | |
| Example: | |
| async with http_session() as client: | |
| response = await client.get("https://example.com") | |
| data = response.text | |
| """ | |
| client = await get_http_client() | |
| try: | |
| yield client | |
| except httpx.HTTPStatusError as e: | |
| logger.error( | |
| "HTTP error: status=%d url=%s", | |
| e.response.status_code, | |
| str(e.request.url), | |
| ) | |
| raise | |
| except httpx.RequestError as e: | |
| logger.error( | |
| "HTTP request error: %s url=%s", | |
| type(e).__name__, | |
| str(e.request.url) if e.request else "unknown", | |
| ) | |
| raise | |
| async def new_http_client(**kwargs: object) -> AsyncGenerator[httpx.AsyncClient, None]: | |
| """ | |
| Create a new temporary HTTP client (not from pool). | |
| Use this when you need isolated client settings. | |
| The client is closed when the context exits. | |
| Example: | |
| async with new_http_client(timeout=60.0) as client: | |
| response = await client.get("https://slow-api.example.com") | |
| """ | |
| client = httpx.AsyncClient(**kwargs) # type: ignore | |
| try: | |
| yield client | |
| finally: | |
| await client.aclose() | |