Spaces:
Sleeping
Sleeping
| import httpx | |
| from typing import AsyncGenerator | |
| class SSEClient: | |
| def __init__(self, url: str): | |
| self.url = url | |
| async def connect(self) -> AsyncGenerator[str, None]: | |
| print("a") | |
| async with httpx.AsyncClient(timeout=None) as client: | |
| async with client.stream("GET", self.url) as response: | |
| async for line in response.aiter_lines(): | |
| if line.startswith("data:"): | |
| yield line[5:].strip() |