File size: 3,619 Bytes
1a9e2c2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | """全局日志模块 - 单例模式的日志管理器"""
import sys
import logging
from pathlib import Path
from logging.handlers import RotatingFileHandler
from app.core.config import setting
# 过滤模式
FILTER_PATTERNS = [
"chunk: b'", # SSE原始字节
"Got event:", # SSE事件
"Closing", # SSE关闭
]
class MCPLogFilter(logging.Filter):
"""MCP日志过滤器 - 过滤大量数据的DEBUG日志"""
def filter(self, record: logging.LogRecord) -> bool:
"""过滤日志"""
# 过滤SSE的DEBUG日志
if record.name == "sse_starlette.sse" and record.levelno == logging.DEBUG:
msg = record.getMessage()
return not any(p in msg for p in FILTER_PATTERNS)
# 过滤MCP streamable_http的DEBUG日志
if "mcp.server.streamable_http" in record.name and record.levelno == logging.DEBUG:
return False
return True
class LoggerManager:
"""日志管理器(单例)"""
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""初始化日志系统"""
if LoggerManager._initialized:
return
# 配置
log_dir = Path(__file__).parents[2] / "logs"
log_dir.mkdir(exist_ok=True)
log_level = setting.global_config.get("log_level", "INFO").upper()
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
log_file = log_dir / "app.log"
# 根日志器
self.logger = logging.getLogger()
self.logger.setLevel(log_level)
# 避免重复添加
if self.logger.handlers:
return
# 格式器和过滤器
formatter = logging.Formatter(log_format)
mcp_filter = MCPLogFilter()
# 控制台处理器
console = logging.StreamHandler(sys.stdout)
console.setLevel(log_level)
console.setFormatter(formatter)
console.addFilter(mcp_filter)
# 文件处理器(10MB,5个备份)
file_handler = RotatingFileHandler(
log_file, maxBytes=10*1024*1024, backupCount=5, encoding="utf-8"
)
file_handler.setLevel(log_level)
file_handler.setFormatter(formatter)
file_handler.addFilter(mcp_filter)
# 添加处理器
self.logger.addHandler(console)
self.logger.addHandler(file_handler)
# 配置第三方库
self._configure_third_party()
LoggerManager._initialized = True
def _configure_third_party(self):
"""配置第三方库日志级别"""
config = {
"asyncio": logging.WARNING,
"uvicorn": logging.INFO,
"fastapi": logging.INFO,
"aiomysql": logging.WARNING,
"mcp": logging.CRITICAL,
"fastmcp": logging.CRITICAL,
}
for name, level in config.items():
logging.getLogger(name).setLevel(level)
def debug(self, msg: str) -> None:
"""调试日志"""
self.logger.debug(msg)
def info(self, msg: str) -> None:
"""信息日志"""
self.logger.info(msg)
def warning(self, msg: str) -> None:
"""警告日志"""
self.logger.warning(msg)
def error(self, msg: str) -> None:
"""错误日志"""
self.logger.error(msg)
def critical(self, msg: str) -> None:
"""严重错误日志"""
self.logger.critical(msg)
# 全局实例
logger = LoggerManager()
|