Spaces:
Sleeping
Sleeping
| import datetime | |
| import json | |
| import uuid | |
| from typing import Iterable | |
| import asyncio | |
| from fastapi import FastAPI, Request | |
| from sse_starlette.sse import EventSourceResponse | |
| from starlette.responses import FileResponse | |
# FastAPI application instance for this module.
app = FastAPI()
async def read_index():
    """Return the landing page as a file response.

    Returns:
        A ``FileResponse`` built from the relative path ``index.html``.

    NOTE(review): no route decorator is visible on this handler in the
    reviewed text -- confirm how it is registered on ``app``.
    """
    landing_page = FileResponse("index.html")
    return landing_page
# Pause between polling rounds of the event stream, in seconds.
STREAM_DELAY = 1
# Value placed in each event's "retry" field, in milliseconds.
RETRY_TIMEOUT = 15_000
def get_new_messages() -> Iterable:
    """Build the batch of events to send on the next polling round.

    Returns:
        A list holding a single event dict with the keys ``event``,
        ``retry``, ``data`` and ``id``. ``data`` is a JSON string carrying
        a fixed test message plus the current UTC time in ISO format, and
        ``id`` is a freshly generated UUID4 rendered as a string.
    """
    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
    payload = json.dumps({"message": "test message", "datetime": timestamp})
    event = {
        "event": "new_message",
        "retry": RETRY_TIMEOUT,
        "data": payload,
        "id": str(uuid.uuid4()),
    }
    return [event]
async def event_generator(request: Request):
    """Yield events for one client until it disconnects.

    Each round checks ``request.is_disconnected()`` first and stops as soon
    as it reports true. Otherwise every event from ``get_new_messages()``
    is yielded, followed by a pause of ``STREAM_DELAY`` seconds.

    Args:
        request: The incoming request, polled for disconnection.

    Yields:
        The event dicts produced by ``get_new_messages()``.
    """
    while not await request.is_disconnected():
        for event in get_new_messages():
            yield event
        # NOTE(review): the reviewed text lost its indentation; the pause is
        # taken once per polling round, after the batch -- confirm.
        await asyncio.sleep(STREAM_DELAY)
async def message_stream(request: Request):
    """Open an event stream for the requesting client.

    Args:
        request: The incoming request, handed to ``event_generator``.

    Returns:
        An ``EventSourceResponse`` wrapping ``event_generator(request)``.

    NOTE(review): no route decorator is visible on this handler in the
    reviewed text -- confirm how it is registered on ``app``.
    """
    events = event_generator(request)
    return EventSourceResponse(events)