# test/utils/http.py
# gaoqilan's picture
# Upload 103 files
# 1f1b4db verified
import httpx
from typing import Dict, Any, AsyncGenerator
from config.constants import DEFAULT_HEADERS
class HTTPClient:
    """Async POST helper built on a single shared ``httpx.AsyncClient``.

    The instance can be used as an async context manager; leaving the
    ``async with`` block closes the underlying client.
    """

    # Timeout, in seconds, passed to every request issued by this client.
    REQUEST_TIMEOUT = 60

    # Keyword arguments that the request methods supply themselves. They are
    # dropped from ``**kwargs`` so they are never forwarded to httpx twice.
    _RESERVED_KWARGS = frozenset(
        {"url", "data", "stream", "headers", "verify", "timeout"}
    )

    def __init__(self):
        # NOTE(review): verify=False turns off TLS certificate verification
        # for every request made through this client -- confirm this is
        # intentional before using it against untrusted networks.
        self.client = httpx.AsyncClient(verify=False, http2=True)

    def _prepare_request(self, data, headers, compress, default_headers, kwargs):
        """Build the pieces shared by ``post`` and ``stream_post``.

        Args:
            data: Request body as supplied by the caller.
            headers: Caller-supplied headers, or ``None``.
            compress: When true, run ``data`` through ``compress_data`` and
                add the gzip encoding and length headers.
            default_headers: When true, start from ``DEFAULT_HEADERS`` and
                let ``headers`` override individual entries.
            kwargs: Extra keyword arguments received by the public method.

        Returns:
            A ``(content, request_headers, extra_kwargs)`` tuple.
            ``request_headers`` is always a new dict, so the caller's
            ``headers`` mapping is never modified.
        """
        if default_headers:
            request_headers = {**DEFAULT_HEADERS, **(headers or {})}
        else:
            # Copy rather than alias: the compression branch below writes
            # into this dict, and those entries (notably Content-Length)
            # must not leak into a dict the caller may reuse later.
            request_headers = dict(headers or {})

        if compress:
            # Imported lazily so the compression module is only loaded when
            # a caller actually asks for a compressed body.
            from .compression import compress_data

            data = compress_data(data)
            request_headers["Content-Encoding"] = "gzip"
            request_headers["Connect-Content-Encoding"] = "gzip"
            request_headers["Content-Length"] = str(len(data))

        # Remove explicitly specified params from kwargs
        extra_kwargs = {
            key: value
            for key, value in kwargs.items()
            if key not in self._RESERVED_KWARGS
        }
        return data, request_headers, extra_kwargs

    async def post(
        self,
        url: str,
        data: Any = None,
        headers: Dict = None,
        compress: bool = False,
        default_headers: bool = True,
        **kwargs
    ) -> httpx.Response:
        """Send a POST request with optional compression.

        Args:
            url: Target URL.
            data: Request body; sent to httpx as ``content``.
            headers: Extra headers. Merged over ``DEFAULT_HEADERS`` when
                ``default_headers`` is true, otherwise used on their own.
                The passed mapping is not modified.
            compress: When true, the body is passed through
                ``compress_data`` and gzip encoding headers are added.
            default_headers: Whether to include ``DEFAULT_HEADERS``.
            **kwargs: Forwarded to ``httpx.AsyncClient.post``. The keys
                ``url``, ``data``, ``stream``, ``headers``, ``verify`` and
                ``timeout`` are discarded.

        Returns:
            The ``httpx.Response`` for the request.
        """
        content, request_headers, extra_kwargs = self._prepare_request(
            data, headers, compress, default_headers, kwargs
        )
        response = await self.client.post(
            url=url,
            content=content,  # raw body goes in ``content``, not ``data``
            headers=request_headers,
            timeout=self.REQUEST_TIMEOUT,
            **extra_kwargs
        )
        return response

    async def stream_post(
        self,
        url: str,
        data: Any = None,
        headers: Dict = None,
        compress: bool = False,
        default_headers: bool = True,
        **kwargs
    ) -> AsyncGenerator[bytes, None]:
        """Send a POST request and stream the response body.

        Accepts the same arguments as ``post``. The request is opened with
        ``httpx.AsyncClient.stream`` and the response is released once the
        generator is exhausted or closed.

        Yields:
            Chunks of response data, as bytes, as they arrive.
        """
        content, request_headers, extra_kwargs = self._prepare_request(
            data, headers, compress, default_headers, kwargs
        )
        async with self.client.stream(
            method="POST",
            url=url,
            content=content,
            headers=request_headers,
            timeout=self.REQUEST_TIMEOUT,
            **extra_kwargs
        ) as response:
            async for chunk in response.aiter_bytes():
                yield chunk

    async def __aenter__(self):
        """Enter the async context and return this client."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the underlying ``httpx.AsyncClient`` on context exit."""
        await self.client.aclose()