govon-runtime / scripts /e2e_gpu_test /http_client.py
umyunsang's picture
Upload folder using huggingface_hub
d2585c1 verified
"""HTTP/SSE 클라이언트 레이어.
httpx 우선, urllib fallback. 요청/응답 자동 로깅.
"""
from __future__ import annotations
import json
from typing import Any, Optional
from .config import API_KEY, BASE_URL, TIMEOUT
try:
import httpx
_HTTP_BACKEND = "httpx"
def _build_headers() -> dict:
h = {"Content-Type": "application/json", "Accept": "application/json"}
if API_KEY:
h["X-API-Key"] = API_KEY
return h
async def http_get(path: str, timeout: float = TIMEOUT) -> tuple[int, dict]:
url = BASE_URL + path
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.get(url, headers=_build_headers())
try:
return resp.status_code, resp.json()
except Exception:
return resp.status_code, {"_raw": resp.text[:200]}
async def http_post(path: str, body: dict, timeout: float = TIMEOUT) -> tuple[int, dict]:
url = BASE_URL + path
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(url, json=body, headers=_build_headers())
try:
return resp.status_code, resp.json()
except Exception:
return resp.status_code, {"_raw": resp.text[:200]}
async def http_post_sse(
path: str, body: dict, timeout: float = TIMEOUT
) -> tuple[int, list[dict]]:
"""SSE 스트리밍 POST. 청크를 수집하여 파싱된 이벤트 목록을 반환한다."""
url = BASE_URL + path
h = _build_headers()
h["Accept"] = "text/event-stream"
events: list[dict] = []
status_code = 0
async with httpx.AsyncClient(timeout=timeout) as client:
async with client.stream("POST", url, json=body, headers=h) as resp:
status_code = resp.status_code
async for line in resp.aiter_lines():
line = line.strip()
if not line.startswith("data:"):
continue
payload = line[len("data:") :].strip()
if not payload:
continue
try:
events.append(json.loads(payload))
except json.JSONDecodeError:
events.append({"_raw": payload})
return status_code, events
async def http_get_raw(url: str, timeout: float = 10) -> tuple[int, str]:
"""Raw GET for external connectivity checks."""
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.get(url)
return resp.status_code, resp.text[:200]
except ImportError:
import asyncio
import urllib.error
import urllib.request
_HTTP_BACKEND = "urllib"
def _build_headers() -> dict:
h = {"Content-Type": "application/json", "Accept": "application/json"}
if API_KEY:
h["X-API-Key"] = API_KEY
return h
def _sync_get(path: str, timeout: float) -> tuple[int, dict]:
url = BASE_URL + path
req = urllib.request.Request(url, headers=_build_headers(), method="GET")
try:
with urllib.request.urlopen(req, timeout=timeout) as r:
return r.status, json.loads(r.read().decode())
except urllib.error.HTTPError as e:
return e.code, {}
async def http_get(path: str, timeout: float = TIMEOUT) -> tuple[int, dict]:
return await asyncio.to_thread(_sync_get, path, timeout)
def _sync_post(path: str, body: dict, timeout: float) -> tuple[int, dict]:
url = BASE_URL + path
data = json.dumps(body).encode()
req = urllib.request.Request(url, data=data, headers=_build_headers(), method="POST")
try:
with urllib.request.urlopen(req, timeout=timeout) as r:
return r.status, json.loads(r.read().decode())
except urllib.error.HTTPError as e:
return e.code, {}
async def http_post(path: str, body: dict, timeout: float = TIMEOUT) -> tuple[int, dict]:
return await asyncio.to_thread(_sync_post, path, body, timeout)
def _sync_post_sse(path: str, body: dict, timeout: float) -> tuple[int, list[dict]]:
url = BASE_URL + path
data = json.dumps(body).encode()
h = _build_headers()
h["Accept"] = "text/event-stream"
req = urllib.request.Request(url, data=data, headers=h, method="POST")
events: list[dict] = []
status_code = 0
try:
with urllib.request.urlopen(req, timeout=timeout) as r:
status_code = r.status
for raw_line in r:
line = raw_line.decode("utf-8", errors="replace").strip()
if not line.startswith("data:"):
continue
payload = line[len("data:") :].strip()
if not payload:
continue
try:
events.append(json.loads(payload))
except json.JSONDecodeError:
events.append({"_raw": payload})
except urllib.error.HTTPError as e:
status_code = e.code
return status_code, events
async def http_post_sse(
path: str, body: dict, timeout: float = TIMEOUT
) -> tuple[int, list[dict]]:
return await asyncio.to_thread(_sync_post_sse, path, body, timeout)
def _sync_get_raw(url: str, timeout: float) -> tuple[int, str]:
req = urllib.request.Request(url, method="GET")
try:
with urllib.request.urlopen(req, timeout=timeout) as r:
return r.status, r.read().decode()[:200]
except urllib.error.HTTPError as e:
return e.code, ""
except Exception:
return 0, ""
async def http_get_raw(url: str, timeout: float = 10) -> tuple[int, str]:
return await asyncio.to_thread(_sync_get_raw, url, timeout)
def get_http_backend() -> str:
return _HTTP_BACKEND