Spaces:
Running
Running
| import asyncio | |
| import json | |
| from http import HTTPStatus | |
| from typing import Annotated, Any | |
| from fastapi import APIRouter, HTTPException, Query, Request | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from langflow.logging.logger import log_buffer | |
# Router object for the log endpoints; its routes are tagged "Log".
# NOTE(review): no route decorators are visible on the handlers in this view —
# confirm where they are registered on this router.
log_router = APIRouter(tags=["Log"])

# event_generator emits a "keepalive" message each time its counter of polls
# that produced no log output reaches this value.
NUMBER_OF_NOT_SENT_BEFORE_KEEPALIVE = 5
async def event_generator(request: Request):
    """Yield log entries added to ``log_buffer`` until the client disconnects.

    The buffer is polled roughly once per second. On the first poll nothing is
    sent: the newest buffered entry is only remembered as the starting point,
    so a client receives entries logged after it connected. On later polls
    every entry that follows the remembered one is sent; if the remembered
    entry is no longer in the buffer, the whole buffer is sent.

    Args:
        request: The incoming request; used only to detect disconnection.

    Yields:
        ``json.dumps({timestamp: message})`` followed by a blank line for each
        new entry, or the literal ``"keepalive"`` followed by a blank line each
        time the idle-poll counter reaches
        ``NUMBER_OF_NOT_SENT_BEFORE_KEEPALIVE``. The counter is not reset when
        entries are sent.
    """
    # Marker used when the client connects while the buffer is empty. It is
    # never stored in the buffer, so the resume scan below cannot find it and
    # falls back to sending everything that has been logged since.
    nothing_read_yet = object()

    last_read_item = None
    current_not_sent = 0
    while not await request.is_disconnected():
        to_write: list[Any] = []
        with log_buffer.get_write_lock():
            if last_read_item is None:
                buffered = len(log_buffer.buffer)
                # Indexing an empty buffer would raise IndexError and abort
                # the stream, so only take the newest entry when one exists.
                if buffered:
                    last_read_item = log_buffer.buffer[buffered - 1]
                else:
                    last_read_item = nothing_read_yet
            else:
                snapshot = list(log_buffer.buffer)
                for position, item in enumerate(snapshot):
                    # Identity, not equality: two distinct entries may carry
                    # the same timestamp and message.
                    if item is last_read_item:
                        to_write = snapshot[position + 1 :]
                        break
                else:
                    # The remembered entry is no longer in the buffer (or none
                    # was ever read): send everything currently buffered.
                    to_write = snapshot
                if to_write:
                    last_read_item = to_write[-1]

        if to_write:
            for ts, msg in to_write:
                yield f"{json.dumps({ts: msg})}\n\n"
        else:
            current_not_sent += 1
            if current_not_sent == NUMBER_OF_NOT_SENT_BEFORE_KEEPALIVE:
                current_not_sent = 0
                yield "keepalive\n\n"

        await asyncio.sleep(1)
async def stream_logs(
    request: Request,
):
    """Stream log messages to the client as they are produced.

    Server-Sent-Event (SSE) style endpoint: the connection stays open and the
    response body is fed by ``event_generator``. Clients should send the
    header "Accept: text/event-stream".

    Raises:
        HTTPException: 501 (Not Implemented) when ``log_buffer.enabled()``
            returns ``False``.
    """
    retrieval_disabled = log_buffer.enabled() is False
    if retrieval_disabled:
        raise HTTPException(
            detail="Log retrieval is disabled",
            status_code=HTTPStatus.NOT_IMPLEMENTED,
        )

    log_events = event_generator(request)
    return StreamingResponse(log_events, media_type="text/event-stream")
async def logs(
    lines_before: Annotated[int, Query(description="The number of logs before the timestamp or the last log")] = 0,
    lines_after: Annotated[int, Query(description="The number of logs after the timestamp")] = 0,
    timestamp: Annotated[int, Query(description="The timestamp to start getting logs from")] = 0,
):
    """Return a slice of the buffered logs as a JSON response.

    Selection rules:
      * no positive ``timestamp``: the last ``lines_before`` entries, or the
        last 10 when ``lines_before`` is not positive;
      * positive ``timestamp`` with ``lines_before``: that many entries before
        the timestamp;
      * positive ``timestamp`` with ``lines_after``: that many entries after
        the timestamp;
      * positive ``timestamp`` alone: 10 entries before the timestamp.

    Raises:
        HTTPException: 501 when ``log_buffer.enabled()`` returns ``False``;
            400 when both ``lines_before`` and ``lines_after`` are positive, or
            when ``lines_after`` is positive without a positive ``timestamp``.
    """
    if log_buffer.enabled() is False:
        raise HTTPException(
            status_code=HTTPStatus.NOT_IMPLEMENTED,
            detail="Log retrieval is disabled",
        )

    wants_before = lines_before > 0
    wants_after = lines_after > 0

    if wants_after and wants_before:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail="Cannot request logs before and after the timestamp",
        )

    if timestamp > 0:
        # A reference timestamp was supplied: slice relative to it.
        if wants_before:
            content = log_buffer.get_before_timestamp(timestamp=timestamp, lines=lines_before)
        elif wants_after:
            content = log_buffer.get_after_timestamp(timestamp=timestamp, lines=lines_after)
        else:
            content = log_buffer.get_before_timestamp(timestamp=timestamp, lines=10)
        return JSONResponse(content=content)

    # No reference timestamp: "after" has no anchor, so only the tail is valid.
    if wants_after:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail="Timestamp is required when requesting logs after the timestamp",
        )

    if wants_before:
        content = log_buffer.get_last_n(lines_before)
    else:
        content = log_buffer.get_last_n(10)
    return JSONResponse(content=content)