| """ |
| 日志路由模块 - 处理 /logs/* 相关的HTTP请求和WebSocket连接 |
| """ |
|
|
| import asyncio |
| import datetime |
| import os |
|
|
| from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect |
| from fastapi.responses import FileResponse, JSONResponse |
| from starlette.websockets import WebSocketState |
|
|
| import config |
| from log import log |
| from src.utils import verify_panel_token |
| from .utils import ConnectionManager |
|
|
|
|
| |
| router = APIRouter(prefix="/logs", tags=["logs"]) |
|
|
| |
| manager = ConnectionManager() |
|
|
|
|
| @router.post("/clear") |
| async def clear_logs(token: str = Depends(verify_panel_token)): |
| """清空日志文件""" |
| try: |
| |
| log_file_path = os.getenv("LOG_FILE", "log.txt") |
|
|
| |
| if os.path.exists(log_file_path): |
| try: |
| |
| |
| with open(log_file_path, "w", encoding="utf-8") as f: |
| f.write("") |
| f.flush() |
| |
| log.info(f"日志文件已清空: {log_file_path}") |
|
|
| |
| await manager.broadcast("--- 日志文件已清空 ---") |
|
|
| return JSONResponse( |
| content={"message": f"日志文件已清空: {os.path.basename(log_file_path)}"} |
| ) |
| except Exception as e: |
| log.error(f"清空日志文件失败: {e}") |
| raise HTTPException(status_code=500, detail=f"清空日志文件失败: {str(e)}") |
| else: |
| return JSONResponse(content={"message": "日志文件不存在"}) |
|
|
| except Exception as e: |
| log.error(f"清空日志文件失败: {e}") |
| raise HTTPException(status_code=500, detail=f"清空日志文件失败: {str(e)}") |
|
|
|
|
| @router.get("/download") |
| async def download_logs(token: str = Depends(verify_panel_token)): |
| """下载日志文件""" |
| try: |
| |
| log_file_path = os.getenv("LOG_FILE", "log.txt") |
|
|
| |
| if not os.path.exists(log_file_path): |
| raise HTTPException(status_code=404, detail="日志文件不存在") |
|
|
| |
| file_size = os.path.getsize(log_file_path) |
| if file_size == 0: |
| raise HTTPException(status_code=404, detail="日志文件为空") |
|
|
| |
| timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
| filename = f"gcli2api_logs_{timestamp}.txt" |
|
|
| log.info(f"下载日志文件: {log_file_path}") |
|
|
| return FileResponse( |
| path=log_file_path, |
| filename=filename, |
| media_type="text/plain", |
| headers={"Content-Disposition": f"attachment; filename={filename}"}, |
| ) |
|
|
| except HTTPException: |
| raise |
| except Exception as e: |
| log.error(f"下载日志文件失败: {e}") |
| raise HTTPException(status_code=500, detail=f"下载日志文件失败: {str(e)}") |
|
|
|
|
| @router.websocket("/stream") |
| async def websocket_logs(websocket: WebSocket): |
| """WebSocket端点,用于实时日志流""" |
| |
| token = websocket.query_params.get("token") |
|
|
| if not token: |
| await websocket.close(code=403, reason="Missing authentication token") |
| log.warning("WebSocket连接被拒绝: 缺少认证token") |
| return |
|
|
| |
| try: |
| panel_password = await config.get_panel_password() |
| if token != panel_password: |
| await websocket.close(code=403, reason="Invalid authentication token") |
| log.warning("WebSocket连接被拒绝: token验证失败") |
| return |
| except Exception as e: |
| await websocket.close(code=1011, reason="Authentication error") |
| log.error(f"WebSocket认证过程出错: {e}") |
| return |
|
|
| |
| if not await manager.connect(websocket): |
| return |
|
|
| try: |
| |
| log_file_path = os.getenv("LOG_FILE", "log.txt") |
|
|
| |
| if os.path.exists(log_file_path): |
| try: |
| |
| with open(log_file_path, "r", encoding="utf-8") as f: |
| lines = f.readlines() |
| |
| for line in lines[-50:]: |
| if line.strip(): |
| await websocket.send_text(line.strip()) |
| except Exception as e: |
| await websocket.send_text(f"Error reading log file: {e}") |
| log.error(f"WebSocket初始日志读取错误: {e}") |
|
|
| |
| last_size = os.path.getsize(log_file_path) if os.path.exists(log_file_path) else 0 |
| max_read_size = 8192 |
| check_interval = 2 |
|
|
| |
| |
| async def listen_for_disconnect(): |
| try: |
| while True: |
| await websocket.receive_text() |
| except Exception: |
| pass |
|
|
| listener_task = asyncio.create_task(listen_for_disconnect()) |
|
|
| try: |
| while websocket.client_state == WebSocketState.CONNECTED: |
| |
| |
| done, pending = await asyncio.wait( |
| [listener_task], |
| timeout=check_interval, |
| return_when=asyncio.FIRST_COMPLETED |
| ) |
|
|
| |
| if listener_task in done: |
| break |
|
|
| if os.path.exists(log_file_path): |
| current_size = os.path.getsize(log_file_path) |
| if current_size > last_size: |
| |
| read_size = min(current_size - last_size, max_read_size) |
|
|
| try: |
| |
| with open(log_file_path, "r", encoding="utf-8", errors="replace") as f: |
| f.seek(last_size) |
| new_content = f.read(read_size) |
| |
|
|
| |
| if not new_content: |
| last_size = current_size |
| continue |
|
|
| |
| lines = new_content.splitlines(keepends=True) |
| if lines: |
| |
| if not lines[-1].endswith("\n") and len(lines) > 1: |
| |
| for line in lines[:-1]: |
| if line.strip(): |
| await websocket.send_text(line.rstrip()) |
| |
| last_size += len(new_content.encode("utf-8")) - len( |
| lines[-1].encode("utf-8") |
| ) |
| else: |
| |
| for line in lines: |
| if line.strip(): |
| await websocket.send_text(line.rstrip()) |
| last_size += len(new_content.encode("utf-8")) |
| except UnicodeDecodeError as e: |
| |
| log.warning(f"WebSocket日志读取编码错误: {e}, 跳过部分内容") |
| last_size = current_size |
| except Exception as e: |
| await websocket.send_text(f"Error reading new content: {e}") |
| |
| last_size = current_size |
|
|
| |
| elif current_size < last_size: |
| last_size = 0 |
| await websocket.send_text("--- 日志已清空 ---") |
|
|
| finally: |
| |
| if not listener_task.done(): |
| listener_task.cancel() |
| try: |
| await listener_task |
| except asyncio.CancelledError: |
| pass |
|
|
| except WebSocketDisconnect: |
| pass |
| except Exception as e: |
| log.error(f"WebSocket logs error: {e}") |
| finally: |
| manager.disconnect(websocket) |
|
|