|
|
import asyncio |
|
|
import json |
|
|
from http import HTTPStatus |
|
|
from typing import Annotated, Any |
|
|
|
|
|
from fastapi import APIRouter, HTTPException, Query, Request |
|
|
from fastapi.responses import JSONResponse, StreamingResponse |
|
|
|
|
|
from langflow.logging.logger import log_buffer |
|
|
|
|
|
log_router = APIRouter(tags=["Log"]) |
|
|
|
|
|
|
|
|
NUMBER_OF_NOT_SENT_BEFORE_KEEPALIVE = 5 |
|
|
|
|
|
|
|
|
async def event_generator(request: Request): |
|
|
global log_buffer |
|
|
last_read_item = None |
|
|
current_not_sent = 0 |
|
|
while not await request.is_disconnected(): |
|
|
to_write: list[Any] = [] |
|
|
with log_buffer.get_write_lock(): |
|
|
if last_read_item is None: |
|
|
last_read_item = log_buffer.buffer[len(log_buffer.buffer) - 1] |
|
|
else: |
|
|
found_last = False |
|
|
for item in log_buffer.buffer: |
|
|
if found_last: |
|
|
to_write.append(item) |
|
|
last_read_item = item |
|
|
continue |
|
|
if item is last_read_item: |
|
|
found_last = True |
|
|
continue |
|
|
|
|
|
|
|
|
if not found_last: |
|
|
for item in log_buffer.buffer: |
|
|
to_write.append(item) |
|
|
last_read_item = item |
|
|
if to_write: |
|
|
for ts, msg in to_write: |
|
|
yield f"{json.dumps({ts: msg})}\n\n" |
|
|
else: |
|
|
current_not_sent += 1 |
|
|
if current_not_sent == NUMBER_OF_NOT_SENT_BEFORE_KEEPALIVE: |
|
|
current_not_sent = 0 |
|
|
yield "keepalive\n\n" |
|
|
|
|
|
await asyncio.sleep(1) |
|
|
|
|
|
|
|
|
@log_router.get("/logs-stream") |
|
|
async def stream_logs( |
|
|
request: Request, |
|
|
): |
|
|
"""HTTP/2 Server-Sent-Event (SSE) endpoint for streaming logs. |
|
|
|
|
|
It establishes a long-lived connection to the server and receives log messages in real-time. |
|
|
The client should use the header "Accept: text/event-stream". |
|
|
""" |
|
|
global log_buffer |
|
|
if log_buffer.enabled() is False: |
|
|
raise HTTPException( |
|
|
status_code=HTTPStatus.NOT_IMPLEMENTED, |
|
|
detail="Log retrieval is disabled", |
|
|
) |
|
|
|
|
|
return StreamingResponse(event_generator(request), media_type="text/event-stream") |
|
|
|
|
|
|
|
|
@log_router.get("/logs") |
|
|
async def logs( |
|
|
lines_before: Annotated[int, Query(description="The number of logs before the timestamp or the last log")] = 0, |
|
|
lines_after: Annotated[int, Query(description="The number of logs after the timestamp")] = 0, |
|
|
timestamp: Annotated[int, Query(description="The timestamp to start getting logs from")] = 0, |
|
|
): |
|
|
global log_buffer |
|
|
if log_buffer.enabled() is False: |
|
|
raise HTTPException( |
|
|
status_code=HTTPStatus.NOT_IMPLEMENTED, |
|
|
detail="Log retrieval is disabled", |
|
|
) |
|
|
if lines_after > 0 and lines_before > 0: |
|
|
raise HTTPException( |
|
|
status_code=HTTPStatus.BAD_REQUEST, |
|
|
detail="Cannot request logs before and after the timestamp", |
|
|
) |
|
|
if timestamp <= 0: |
|
|
if lines_after > 0: |
|
|
raise HTTPException( |
|
|
status_code=HTTPStatus.BAD_REQUEST, |
|
|
detail="Timestamp is required when requesting logs after the timestamp", |
|
|
) |
|
|
content = log_buffer.get_last_n(10) if lines_before <= 0 else log_buffer.get_last_n(lines_before) |
|
|
elif lines_before > 0: |
|
|
content = log_buffer.get_before_timestamp(timestamp=timestamp, lines=lines_before) |
|
|
elif lines_after > 0: |
|
|
content = log_buffer.get_after_timestamp(timestamp=timestamp, lines=lines_after) |
|
|
else: |
|
|
content = log_buffer.get_before_timestamp(timestamp=timestamp, lines=10) |
|
|
return JSONResponse(content=content) |
|
|
|