Medium-MCP / src /http_pool.py
Nikhil Pravin Pise
feat: implement comprehensive improvement plan (Phases 1-5)
e98cc10
"""
HTTP Client Pool for Medium-MCP
Singleton pattern for httpx.AsyncClient connection pooling.
Addresses Critical Gap #5: Connection Leaks.
Based on httpx official documentation best practices:
- Single AsyncClient instance for connection reuse
- Configurable limits via httpx.Limits
- Proper async cleanup with aclose()
"""
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, AsyncGenerator
import httpx
from src.constants import (
DEFAULT_CONNECT_TIMEOUT,
DEFAULT_KEEPALIVE_CONNECTIONS,
DEFAULT_KEEPALIVE_EXPIRY,
DEFAULT_MAX_CONNECTIONS,
DEFAULT_READ_TIMEOUT,
DEFAULT_USER_AGENT,
)
if TYPE_CHECKING:
pass
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class HTTPClientPool:
    """
    Singleton HTTP client pool for connection reuse.

    Every ``HTTPClientPool()`` call returns the same object, and that object
    lazily builds one ``httpx.AsyncClient`` which :meth:`get_client` hands
    out until it is closed. The client is created with:

    - Connection limits (``httpx.Limits``) and explicit timeouts
    - HTTP/2 enabled
    - Automatic redirect following
    - Browser-like default request headers

    Usage:
        pool = HTTPClientPool()
        client = await pool.get_client()
        response = await client.get("https://example.com")

        # Or with context manager:
        async with http_session() as client:
            response = await client.get("https://example.com")
    """

    # The single shared instance; populated by the first construction.
    _instance: "HTTPClientPool | None" = None
    # The shared client; None until first requested and again after close().
    _client: httpx.AsyncClient | None = None

    def __new__(cls) -> "HTTPClientPool":
        """Return the one shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    async def get_client(
        self,
        *,
        max_connections: int = DEFAULT_MAX_CONNECTIONS,
        max_keepalive_connections: int = DEFAULT_KEEPALIVE_CONNECTIONS,
        keepalive_expiry: float = DEFAULT_KEEPALIVE_EXPIRY,
        connect_timeout: float = DEFAULT_CONNECT_TIMEOUT,
        read_timeout: float = DEFAULT_READ_TIMEOUT,
        write_timeout: float = 30.0,
        pool_timeout: float = 5.0,
    ) -> httpx.AsyncClient:
        """
        Get or create the shared HTTP client.

        The arguments are only applied when a new client has to be built
        (no client yet, or the previous one was closed). If an open client
        already exists it is returned as-is and the arguments are ignored.

        Args:
            max_connections: Maximum concurrent connections
            max_keepalive_connections: Maximum idle connections to keep
            keepalive_expiry: How long to keep idle connections (seconds)
            connect_timeout: Timeout for establishing connections
            read_timeout: Timeout for reading responses
            write_timeout: Timeout for sending request data
            pool_timeout: Timeout for acquiring a connection from the pool

        Returns:
            Configured httpx.AsyncClient instance
        """
        if self._client is None or self._client.is_closed:
            logger.info("Creating new HTTP client pool")
            limits = httpx.Limits(
                max_connections=max_connections,
                max_keepalive_connections=max_keepalive_connections,
                keepalive_expiry=keepalive_expiry,
            )
            timeout = httpx.Timeout(
                connect=connect_timeout,
                read=read_timeout,
                write=write_timeout,
                pool=pool_timeout,
            )
            self._client = httpx.AsyncClient(
                limits=limits,
                timeout=timeout,
                http2=True,
                follow_redirects=True,
                headers={
                    "User-Agent": DEFAULT_USER_AGENT,
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                    "Accept-Language": "en-US,en;q=0.9",
                    # NOTE(review): advertising ``br`` assumes a Brotli
                    # decoder is installed alongside httpx - confirm.
                    "Accept-Encoding": "gzip, deflate, br",
                    "Connection": "keep-alive",
                },
            )
            logger.debug(
                "HTTP client created with limits: max=%d, keepalive=%d",
                max_connections,
                max_keepalive_connections,
            )
        return self._client

    async def close(self) -> None:
        """Close the shared client if it is open; otherwise do nothing."""
        if self._client is not None and not self._client.is_closed:
            logger.info("Closing HTTP client pool")
            await self._client.aclose()
            # Drop the reference so the next get_client() builds a fresh one.
            self._client = None

    @property
    def is_connected(self) -> bool:
        """
        True when a client exists and has not been closed.

        This only inspects the client object's state; it does not perform
        any network check.
        """
        return self._client is not None and not self._client.is_closed
# Global pool instance used by the module-level helper functions.
# HTTPClientPool.__new__ always returns the same object, so any other
# HTTPClientPool() call yields this very instance.
_pool = HTTPClientPool()
async def get_http_client() -> httpx.AsyncClient:
    """
    Return the client held by the module-level pool.

    The pool is asked for its client with default settings; it creates one
    if none is open yet and otherwise returns the existing one.

    Returns:
        The shared httpx.AsyncClient for making requests

    Example:
        client = await get_http_client()
        response = await client.get("https://medium.com/...")
    """
    shared_client = await _pool.get_client()
    return shared_client
async def close_http_pool() -> None:
    """
    Close the global HTTP pool (call on shutdown).

    Delegates to the module-level pool's ``close()``, which closes the
    shared client if one is currently open.
    """
    await _pool.close()
@asynccontextmanager
async def http_session() -> AsyncGenerator[httpx.AsyncClient, None]:
    """
    Context manager for HTTP operations on the shared client.

    Yields the pooled client and logs any ``httpx.HTTPStatusError`` or
    ``httpx.RequestError`` raised inside the ``async with`` body before
    re-raising it unchanged. Does NOT close the client (it's a shared pool).

    Yields:
        The shared httpx.AsyncClient

    Raises:
        httpx.HTTPStatusError: Re-raised after logging status and URL.
        httpx.RequestError: Re-raised after logging error type and URL.

    Example:
        async with http_session() as client:
            response = await client.get("https://example.com")
            data = response.text
    """
    client = await get_http_client()
    try:
        yield client
    except httpx.HTTPStatusError as e:
        logger.error(
            "HTTP error: status=%d url=%s",
            e.response.status_code,
            str(e.request.url),
        )
        raise
    except httpx.RequestError as e:
        # ``RequestError.request`` raises RuntimeError (it never returns a
        # falsy value) when no request was attached to the exception, so a
        # truthiness check cannot provide the fallback; catch it instead so
        # logging never masks the original error.
        try:
            url = str(e.request.url)
        except RuntimeError:
            url = "unknown"
        logger.error(
            "HTTP request error: %s url=%s",
            type(e).__name__,
            url,
        )
        raise
@asynccontextmanager
async def new_http_client(**kwargs: object) -> AsyncGenerator[httpx.AsyncClient, None]:
    """
    Yield a temporary HTTP client that is independent of the shared pool.

    All keyword arguments are forwarded unchanged to ``httpx.AsyncClient``,
    so use this when a request needs settings isolated from the pooled
    client. The temporary client is closed when the block exits, whether
    it exits normally or through an exception.

    Example:
        async with new_http_client(timeout=60.0) as client:
            response = await client.get("https://slow-api.example.com")
    """
    isolated_client = httpx.AsyncClient(**kwargs)  # type: ignore
    try:
        yield isolated_client
    finally:
        # Always release the temporary client's connections.
        await isolated_client.aclose()