a2p / api /provider.py
Spooker's picture
Upload 19 files
28a52dc verified
"""Call the assistant-ui upstream LLM endpoint."""
from __future__ import annotations
from collections.abc import AsyncIterator
import httpx
from config import UPSTREAM_HEADERS, UPSTREAM_URL
class UpstreamError(Exception):
    """Error reported by the upstream endpoint.

    Attributes:
        status_code: HTTP status code associated with the failure.
        message: Human-readable description; also what ``str(exc)`` returns.
    """

    def __init__(self, status_code: int, message: str) -> None:
        """Store ``status_code`` and use ``message`` as the exception text."""
        # Only the message goes to Exception so str(exc) and exc.args keep
        # their original shape for existing callers.
        super().__init__(message)
        self.status_code = status_code
        self.message = message

    def __reduce__(self):
        """Rebuild with both constructor arguments when copied or pickled.

        The inherited implementation re-creates the exception from
        ``self.args``, which holds only the message, so it would call
        ``UpstreamError(message)`` and fail with ``TypeError``.
        """
        return (type(self), (self.status_code, self.message))
async def call_upstream(payload: dict) -> AsyncIterator[str]:
    """Send ``payload`` upstream and stream the response back line by line.

    Issues a POST to ``UPSTREAM_URL`` with ``UPSTREAM_HEADERS`` and the
    payload as the JSON body, then yields each response line as soon as
    httpx delivers it.

    Args:
        payload: JSON-serialisable request body.

    Yields:
        Lines of the response body, in arrival order.

    Raises:
        UpstreamError: With status 429 when the upstream answers 429, or
            with the upstream status and the first 200 characters of the
            response body for any other status >= 400.
    """
    # 120 is the httpx default for every phase; connect is capped at 10.
    limits = httpx.Timeout(120, connect=10)
    async with httpx.AsyncClient(timeout=limits) as http:
        request = http.stream(
            "POST",
            UPSTREAM_URL,
            json=payload,
            headers=UPSTREAM_HEADERS,
        )
        async with request as response:
            status = response.status_code
            # 429 is checked first so it gets its dedicated message instead
            # of the generic error-body path below.
            if status == 429:
                raise UpstreamError(429, "Rate limit exceeded (upstream: 5 req / 30s per IP)")
            if status >= 400:
                raw = await response.aread()
                detail = raw.decode(errors="replace")[:200]
                raise UpstreamError(status, detail)
            async for sse_line in response.aiter_lines():
                yield sse_line
async def call_upstream_full(payload: dict) -> list[str]:
    """Return every line produced by ``call_upstream(payload)`` as a list.

    Intended for non-stream mode: the stream is drained completely before
    anything is returned. Exceptions raised by ``call_upstream`` propagate
    unchanged.
    """
    return [sse_line async for sse_line in call_upstream(payload)]