Spaces:
Runtime error
Runtime error
| # PATH: bot/integrations/http.py | |
| import httpx | |
| from bot.core.settings import HTTP_TIMEOUT_SEC, MAX_RETRIES | |
# Process-wide client instance; stays None until get_client() is first called.
_async_client: httpx.AsyncClient | None = None


def get_client() -> httpx.AsyncClient:
    """Return the shared ``httpx.AsyncClient``, creating it on first use.

    The client is built once with ``HTTP_TIMEOUT_SEC`` as its timeout and
    with redirect following enabled, then cached in the module-level
    ``_async_client`` so later calls return the same instance.
    """
    global _async_client
    if _async_client is not None:
        return _async_client
    _async_client = httpx.AsyncClient(
        follow_redirects=True,
        timeout=HTTP_TIMEOUT_SEC,
    )
    return _async_client
async def post_json(url: str, headers: dict, payload: dict) -> dict:
    """POST ``payload`` as JSON to ``url`` and return the decoded JSON reply.

    The request goes through the shared client returned by ``get_client()``.
    Only transport-level failures (``httpx.TransportError``: timeouts,
    connection errors, protocol errors) are retried, up to ``MAX_RETRIES``
    additional times. Once a response has been received the request is never
    re-sent, so a reply with an unparsable body cannot cause the POST to be
    delivered twice.

    Failures are reported through the returned dict rather than raised.

    Args:
        url: Target URL.
        headers: Request headers passed to the client.
        payload: Request body, handed to httpx as ``json=``.

    Returns:
        * Status below 400: the decoded JSON body, or ``{}`` when the
          response is not declared as ``application/json``.
        * Status 400 or above: ``{"ok": False, "status": <code>,
          "data": <decoded body or {}>, "text": <first 300 chars of body>}``.
        * Status below 400 but the JSON body cannot be decoded:
          ``{"ok": False, "status": <code>, "err": <message>,
          "text": <first 300 chars of body>}``.
        * No response obtained: ``{"ok": False, "status": 0,
          "err": <message>}``.
    """
    client = get_client()
    # Always make at least one attempt, even if MAX_RETRIES is misconfigured
    # as a negative number.
    attempts = max(MAX_RETRIES, 0) + 1
    last_err = None
    resp = None
    for _ in range(attempts):
        try:
            resp = await client.post(url, headers=headers, json=payload)
            break
        except httpx.TransportError as e:
            # Transient network problem: worth another attempt.
            last_err = e
        except Exception as e:
            # Deterministic failure (bad URL, unserialisable payload, ...):
            # retrying cannot help, but keep the "never raise" contract.
            last_err = e
            break

    if resp is None:
        return {"ok": False, "status": 0, "err": str(last_err)}

    # Decode outside the retry loop: a malformed body must not trigger a
    # re-send of a request the server has already handled.
    data = {}
    parse_err = None
    content_type = resp.headers.get("content-type", "").lower()
    if content_type.startswith("application/json"):
        try:
            data = resp.json()
        except ValueError as e:  # json.JSONDecodeError / UnicodeDecodeError
            parse_err = e

    if resp.status_code >= 400:
        return {
            "ok": False,
            "status": resp.status_code,
            "data": data,
            "text": resp.text[:300],
        }
    if parse_err is not None:
        return {
            "ok": False,
            "status": resp.status_code,
            "err": str(parse_err),
            "text": resp.text[:300],
        }
    return data