Spaces:
Paused
Paused
File size: 6,107 Bytes
d2585c1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | """HTTP/SSE ํด๋ผ์ด์ธํธ ๋ ์ด์ด.
httpx ์ฐ์ , urllib fallback. ์์ฒญ/์๋ต ์๋ ๋ก๊น
.
"""
from __future__ import annotations
import json
from typing import Any, Optional
from .config import API_KEY, BASE_URL, TIMEOUT
try:
import httpx
_HTTP_BACKEND = "httpx"
def _build_headers() -> dict:
h = {"Content-Type": "application/json", "Accept": "application/json"}
if API_KEY:
h["X-API-Key"] = API_KEY
return h
async def http_get(path: str, timeout: float = TIMEOUT) -> tuple[int, dict]:
url = BASE_URL + path
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.get(url, headers=_build_headers())
try:
return resp.status_code, resp.json()
except Exception:
return resp.status_code, {"_raw": resp.text[:200]}
async def http_post(path: str, body: dict, timeout: float = TIMEOUT) -> tuple[int, dict]:
url = BASE_URL + path
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(url, json=body, headers=_build_headers())
try:
return resp.status_code, resp.json()
except Exception:
return resp.status_code, {"_raw": resp.text[:200]}
async def http_post_sse(
path: str, body: dict, timeout: float = TIMEOUT
) -> tuple[int, list[dict]]:
"""SSE ์คํธ๋ฆฌ๋ฐ POST. ์ฒญํฌ๋ฅผ ์์งํ์ฌ ํ์ฑ๋ ์ด๋ฒคํธ ๋ชฉ๋ก์ ๋ฐํํ๋ค."""
url = BASE_URL + path
h = _build_headers()
h["Accept"] = "text/event-stream"
events: list[dict] = []
status_code = 0
async with httpx.AsyncClient(timeout=timeout) as client:
async with client.stream("POST", url, json=body, headers=h) as resp:
status_code = resp.status_code
async for line in resp.aiter_lines():
line = line.strip()
if not line.startswith("data:"):
continue
payload = line[len("data:") :].strip()
if not payload:
continue
try:
events.append(json.loads(payload))
except json.JSONDecodeError:
events.append({"_raw": payload})
return status_code, events
async def http_get_raw(url: str, timeout: float = 10) -> tuple[int, str]:
"""Raw GET for external connectivity checks."""
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.get(url)
return resp.status_code, resp.text[:200]
except ImportError:
import asyncio
import urllib.error
import urllib.request
_HTTP_BACKEND = "urllib"
def _build_headers() -> dict:
h = {"Content-Type": "application/json", "Accept": "application/json"}
if API_KEY:
h["X-API-Key"] = API_KEY
return h
def _sync_get(path: str, timeout: float) -> tuple[int, dict]:
url = BASE_URL + path
req = urllib.request.Request(url, headers=_build_headers(), method="GET")
try:
with urllib.request.urlopen(req, timeout=timeout) as r:
return r.status, json.loads(r.read().decode())
except urllib.error.HTTPError as e:
return e.code, {}
async def http_get(path: str, timeout: float = TIMEOUT) -> tuple[int, dict]:
return await asyncio.to_thread(_sync_get, path, timeout)
def _sync_post(path: str, body: dict, timeout: float) -> tuple[int, dict]:
url = BASE_URL + path
data = json.dumps(body).encode()
req = urllib.request.Request(url, data=data, headers=_build_headers(), method="POST")
try:
with urllib.request.urlopen(req, timeout=timeout) as r:
return r.status, json.loads(r.read().decode())
except urllib.error.HTTPError as e:
return e.code, {}
async def http_post(path: str, body: dict, timeout: float = TIMEOUT) -> tuple[int, dict]:
return await asyncio.to_thread(_sync_post, path, body, timeout)
def _sync_post_sse(path: str, body: dict, timeout: float) -> tuple[int, list[dict]]:
url = BASE_URL + path
data = json.dumps(body).encode()
h = _build_headers()
h["Accept"] = "text/event-stream"
req = urllib.request.Request(url, data=data, headers=h, method="POST")
events: list[dict] = []
status_code = 0
try:
with urllib.request.urlopen(req, timeout=timeout) as r:
status_code = r.status
for raw_line in r:
line = raw_line.decode("utf-8", errors="replace").strip()
if not line.startswith("data:"):
continue
payload = line[len("data:") :].strip()
if not payload:
continue
try:
events.append(json.loads(payload))
except json.JSONDecodeError:
events.append({"_raw": payload})
except urllib.error.HTTPError as e:
status_code = e.code
return status_code, events
async def http_post_sse(
path: str, body: dict, timeout: float = TIMEOUT
) -> tuple[int, list[dict]]:
return await asyncio.to_thread(_sync_post_sse, path, body, timeout)
def _sync_get_raw(url: str, timeout: float) -> tuple[int, str]:
req = urllib.request.Request(url, method="GET")
try:
with urllib.request.urlopen(req, timeout=timeout) as r:
return r.status, r.read().decode()[:200]
except urllib.error.HTTPError as e:
return e.code, ""
except Exception:
return 0, ""
async def http_get_raw(url: str, timeout: float = 10) -> tuple[int, str]:
return await asyncio.to_thread(_sync_get_raw, url, timeout)
def get_http_backend() -> str:
return _HTTP_BACKEND
|