| """ |
| 日志路由模块 |
| """ |
|
|
| from datetime import datetime |
| from typing import Dict, List, Optional |
|
|
| from fastapi import ( |
| APIRouter, |
| Body, |
| HTTPException, |
| Path, |
| Query, |
| Request, |
| Response, |
| status, |
| ) |
| from pydantic import BaseModel |
|
|
| from app.core.security import verify_auth_token |
| from app.log.logger import get_log_routes_logger |
| from app.service.error_log import error_log_service |
|
|
| router = APIRouter(prefix="/api/logs", tags=["logs"]) |
|
|
| logger = get_log_routes_logger() |
|
|
|
|
| class ErrorLogListItem(BaseModel): |
| id: int |
| gemini_key: Optional[str] = None |
| error_type: Optional[str] = None |
| error_code: Optional[int] = None |
| model_name: Optional[str] = None |
| request_time: Optional[datetime] = None |
|
|
|
|
| class ErrorLogListResponse(BaseModel): |
| logs: List[ErrorLogListItem] |
| total: int |
|
|
|
|
| @router.get("/errors", response_model=ErrorLogListResponse) |
| async def get_error_logs_api( |
| request: Request, |
| limit: int = Query(10, ge=1, le=1000), |
| offset: int = Query(0, ge=0), |
| key_search: Optional[str] = Query( |
| None, description="Search term for Gemini key (partial match)" |
| ), |
| error_search: Optional[str] = Query( |
| None, description="Search term for error type or log message" |
| ), |
| error_code_search: Optional[str] = Query( |
| None, description="Search term for error code" |
| ), |
| start_date: Optional[datetime] = Query( |
| None, description="Start datetime for filtering" |
| ), |
| end_date: Optional[datetime] = Query( |
| None, description="End datetime for filtering" |
| ), |
| sort_by: str = Query( |
| "id", description="Field to sort by (e.g., 'id', 'request_time')" |
| ), |
| sort_order: str = Query("desc", description="Sort order ('asc' or 'desc')"), |
| ): |
| """ |
| 获取错误日志列表 (返回错误码),支持过滤和排序 |
| |
| Args: |
| request: 请求对象 |
| limit: 限制数量 |
| offset: 偏移量 |
| key_search: 密钥搜索 |
| error_search: 错误搜索 (可能搜索类型或日志内容,由DB层决定) |
| error_code_search: 错误码搜索 |
| start_date: 开始日期 |
| end_date: 结束日期 |
| sort_by: 排序字段 |
| sort_order: 排序顺序 |
| |
| Returns: |
| ErrorLogListResponse: An object containing the list of logs (with error_code) and the total count. |
| """ |
| auth_token = request.cookies.get("auth_token") |
| if not auth_token or not verify_auth_token(auth_token): |
| logger.warning("Unauthorized access attempt to error logs list") |
| raise HTTPException(status_code=401, detail="Not authenticated") |
|
|
| try: |
| result = await error_log_service.process_get_error_logs( |
| limit=limit, |
| offset=offset, |
| key_search=key_search, |
| error_search=error_search, |
| error_code_search=error_code_search, |
| start_date=start_date, |
| end_date=end_date, |
| sort_by=sort_by, |
| sort_order=sort_order, |
| ) |
| logs_data = result["logs"] |
| total_count = result["total"] |
|
|
| validated_logs = [ErrorLogListItem(**log) for log in logs_data] |
| return ErrorLogListResponse(logs=validated_logs, total=total_count) |
| except Exception as e: |
| logger.exception(f"Failed to get error logs list: {str(e)}") |
| raise HTTPException( |
| status_code=500, detail=f"Failed to get error logs list: {str(e)}" |
| ) |
|
|
|
|
| class ErrorLogDetailResponse(BaseModel): |
| id: int |
| gemini_key: Optional[str] = None |
| error_type: Optional[str] = None |
| error_log: Optional[str] = None |
| request_msg: Optional[str] = None |
| model_name: Optional[str] = None |
| request_time: Optional[datetime] = None |
| error_code: Optional[int] = None |
|
|
|
|
| @router.get("/errors/{log_id}/details", response_model=ErrorLogDetailResponse) |
| async def get_error_log_detail_api(request: Request, log_id: int = Path(..., ge=1)): |
| """ |
| 根据日志 ID 获取错误日志的详细信息 (包括 error_log 和 request_msg) |
| """ |
| auth_token = request.cookies.get("auth_token") |
| if not auth_token or not verify_auth_token(auth_token): |
| logger.warning( |
| f"Unauthorized access attempt to error log details for ID: {log_id}" |
| ) |
| raise HTTPException(status_code=401, detail="Not authenticated") |
|
|
| try: |
| log_details = await error_log_service.process_get_error_log_details( |
| log_id=log_id |
| ) |
| if not log_details: |
| raise HTTPException(status_code=404, detail="Error log not found") |
|
|
| return ErrorLogDetailResponse(**log_details) |
| except HTTPException as http_exc: |
| raise http_exc |
| except Exception as e: |
| logger.exception(f"Failed to get error log details for ID {log_id}: {str(e)}") |
| raise HTTPException( |
| status_code=500, detail=f"Failed to get error log details: {str(e)}" |
| ) |
|
|
|
|
| @router.get("/errors/lookup", response_model=ErrorLogDetailResponse) |
| async def lookup_error_log_by_info( |
| request: Request, |
| gemini_key: str = Query(..., description="完整的 Gemini key"), |
| timestamp: datetime = Query(..., description="请求时间 (ISO8601)"), |
| status_code: Optional[int] = Query(None, description="错误码 (可选)"), |
| window_seconds: int = Query( |
| 100, ge=1, le=300, description="时间窗口(秒), 默认100秒" |
| ), |
| ): |
| """ |
| 通过 key / 错误码 / 时间窗口 查找最匹配的一条错误日志详情。 |
| """ |
| auth_token = request.cookies.get("auth_token") |
| if not auth_token or not verify_auth_token(auth_token): |
| logger.warning("Unauthorized access attempt to lookup error log by info") |
| raise HTTPException(status_code=401, detail="Not authenticated") |
|
|
| try: |
| detail = await error_log_service.process_find_error_log_by_info( |
| gemini_key=gemini_key, |
| timestamp=timestamp, |
| status_code=status_code, |
| window_seconds=window_seconds, |
| ) |
| if not detail: |
| raise HTTPException(status_code=404, detail="No matching error log found") |
| return ErrorLogDetailResponse(**detail) |
| except HTTPException as http_exc: |
| raise http_exc |
| except Exception as e: |
| logger.exception( |
| f"Failed to lookup error log by info for key=***{gemini_key[-4:] if gemini_key else ''}: {str(e)}" |
| ) |
| raise HTTPException(status_code=500, detail="Internal server error") |
|
|
|
|
| @router.delete("/errors", status_code=status.HTTP_204_NO_CONTENT) |
| async def delete_error_logs_bulk_api( |
| request: Request, payload: Dict[str, List[int]] = Body(...) |
| ): |
| """ |
| 批量删除错误日志 (异步) |
| """ |
| auth_token = request.cookies.get("auth_token") |
| if not auth_token or not verify_auth_token(auth_token): |
| logger.warning("Unauthorized access attempt to bulk delete error logs") |
| raise HTTPException(status_code=401, detail="Not authenticated") |
|
|
| log_ids = payload.get("ids") |
| if not log_ids: |
| raise HTTPException(status_code=400, detail="No log IDs provided for deletion.") |
|
|
| try: |
| deleted_count = await error_log_service.process_delete_error_logs_by_ids( |
| log_ids |
| ) |
| |
| logger.info( |
| f"Attempted bulk deletion for {deleted_count} error logs with IDs: {log_ids}" |
| ) |
| return Response(status_code=status.HTTP_204_NO_CONTENT) |
| except Exception as e: |
| logger.exception(f"Error bulk deleting error logs with IDs {log_ids}: {str(e)}") |
| raise HTTPException( |
| status_code=500, detail="Internal server error during bulk deletion" |
| ) |
|
|
|
|
| @router.delete("/errors/all", status_code=status.HTTP_204_NO_CONTENT) |
| async def delete_all_error_logs_api(request: Request): |
| """ |
| 删除所有错误日志 (异步) |
| """ |
| auth_token = request.cookies.get("auth_token") |
| if not auth_token or not verify_auth_token(auth_token): |
| logger.warning("Unauthorized access attempt to delete all error logs") |
| raise HTTPException(status_code=401, detail="Not authenticated") |
|
|
| try: |
| await error_log_service.process_delete_all_error_logs() |
| logger.info("Successfully deleted all error logs.") |
| |
| return Response(status_code=status.HTTP_204_NO_CONTENT) |
| except Exception as e: |
| logger.exception(f"Error deleting all error logs: {str(e)}") |
| raise HTTPException( |
| status_code=500, detail="Internal server error during deletion of all logs" |
| ) |
|
|
|
|
| @router.delete("/errors/{log_id}", status_code=status.HTTP_204_NO_CONTENT) |
| async def delete_error_log_api(request: Request, log_id: int = Path(..., ge=1)): |
| """ |
| 删除单个错误日志 (异步) |
| """ |
| auth_token = request.cookies.get("auth_token") |
| if not auth_token or not verify_auth_token(auth_token): |
| logger.warning(f"Unauthorized access attempt to delete error log ID: {log_id}") |
| raise HTTPException(status_code=401, detail="Not authenticated") |
|
|
| try: |
| success = await error_log_service.process_delete_error_log_by_id(log_id) |
| if not success: |
| |
| raise HTTPException( |
| status_code=404, detail=f"Error log with ID {log_id} not found" |
| ) |
| logger.info(f"Successfully deleted error log with ID: {log_id}") |
| return Response(status_code=status.HTTP_204_NO_CONTENT) |
| except HTTPException as http_exc: |
| raise http_exc |
| except Exception as e: |
| logger.exception(f"Error deleting error log with ID {log_id}: {str(e)}") |
| raise HTTPException( |
| status_code=500, detail="Internal server error during deletion" |
| ) |
|
|