| """ |
| 结构化 JSON 日志 - 极简格式 |
| """ |
|
|
| import sys |
| import os |
| import json |
| import traceback |
| from pathlib import Path |
| from loguru import logger |
|
|
| |
| if not hasattr(logger, "isEnabledFor"): |
| logger.isEnabledFor = lambda _level: True |
|
|
| |
| DEFAULT_LOG_DIR = Path(__file__).parent.parent.parent / "logs" |
| LOG_DIR = Path(os.getenv("LOG_DIR", str(DEFAULT_LOG_DIR))) |
| _LOG_DIR_READY = False |
|
|
|
|
| def _prepare_log_dir() -> bool: |
| """确保日志目录可用""" |
| global LOG_DIR, _LOG_DIR_READY |
| if _LOG_DIR_READY: |
| return True |
| try: |
| LOG_DIR.mkdir(parents=True, exist_ok=True) |
| _LOG_DIR_READY = True |
| return True |
| except Exception: |
| _LOG_DIR_READY = False |
| return False |
|
|
|
|
| def _format_json(record) -> str: |
| """格式化日志""" |
| |
| time_str = record["time"].strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] |
| tz = record["time"].strftime("%z") |
| if tz: |
| time_str += tz[:3] + ":" + tz[3:] |
|
|
| log_entry = { |
| "time": time_str, |
| "level": record["level"].name.lower(), |
| "msg": record["message"], |
| "caller": f"{record['file'].name}:{record['line']}", |
| } |
|
|
| |
| extra = record["extra"] |
| if extra.get("traceID"): |
| log_entry["traceID"] = extra["traceID"] |
| if extra.get("spanID"): |
| log_entry["spanID"] = extra["spanID"] |
|
|
| |
| for key, value in extra.items(): |
| if key not in ("traceID", "spanID") and not key.startswith("_"): |
| log_entry[key] = value |
|
|
| |
| if record["level"].no >= 40 and record["exception"]: |
| log_entry["stacktrace"] = "".join( |
| traceback.format_exception( |
| record["exception"].type, |
| record["exception"].value, |
| record["exception"].traceback, |
| ) |
| ) |
|
|
| return json.dumps(log_entry, ensure_ascii=False) |
|
|
| def _env_flag(name: str, default: bool) -> bool: |
| raw = os.getenv(name) |
| if raw is None: |
| return default |
| return raw.strip().lower() in ("1", "true", "yes", "on", "y") |
|
|
|
|
| def _make_json_sink(output): |
| """创建 JSON sink""" |
|
|
| def sink(message): |
| json_str = _format_json(message.record) |
| print(json_str, file=output, flush=True) |
|
|
| return sink |
|
|
|
|
| def _file_json_sink(message): |
| """写入日志文件""" |
| record = message.record |
| json_str = _format_json(record) |
| log_file = LOG_DIR / f"app_{record['time'].strftime('%Y-%m-%d')}.log" |
| with open(log_file, "a", encoding="utf-8") as f: |
| f.write(json_str + "\n") |
|
|
|
|
| def setup_logging( |
| level: str = "DEBUG", |
| json_console: bool = True, |
| file_logging: bool = True, |
| ): |
| """设置日志配置""" |
| logger.remove() |
| file_logging = _env_flag("LOG_FILE_ENABLED", file_logging) |
|
|
| |
| if json_console: |
| logger.add( |
| _make_json_sink(sys.stdout), |
| level=level, |
| format="{message}", |
| colorize=False, |
| ) |
| else: |
| logger.add( |
| sys.stdout, |
| level=level, |
| format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{file.name}:{line}</cyan> - <level>{message}</level>", |
| colorize=True, |
| ) |
|
|
| |
| if file_logging: |
| if _prepare_log_dir(): |
| logger.add( |
| _file_json_sink, |
| level=level, |
| format="{message}", |
| enqueue=True, |
| ) |
| else: |
| logger.warning("File logging disabled: no writable log directory.") |
|
|
| return logger |
|
|
|
|
| def get_logger(trace_id: str = "", span_id: str = ""): |
| """获取绑定了 trace 上下文的 logger""" |
| bound = {} |
| if trace_id: |
| bound["traceID"] = trace_id |
| if span_id: |
| bound["spanID"] = span_id |
| return logger.bind(**bound) if bound else logger |
|
|
|
|
| __all__ = ["logger", "setup_logging", "get_logger", "LOG_DIR"] |
|
|