File size: 10,765 Bytes
c50496f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 | """
日志模块 - 使用环境变量配置
"""
import os
import sys
import threading
from datetime import datetime
from collections import deque
import atexit
# 日志级别定义
LOG_LEVELS = {"debug": 0, "info": 1, "warning": 2, "error": 3, "critical": 4}
# 文件写入状态标志(仅由 writer 线程修改,无需锁保护)
_file_writing_disabled = False
_disable_reason = None
# 全局文件句柄(仅由 writer 线程访问,无需文件锁)
_log_file_handle = None
# -----------------------------------------------------------------
# 高性能无锁队列:用 deque + Condition 替代 Queue
# deque.append / deque.popleft 在 CPython 中受 GIL 保护,是原子操作,
# 不需要额外的 Lock 做入队保护,只用 Condition 做"有数据"通知。
# -----------------------------------------------------------------
_log_deque: deque = deque()
_deque_condition = threading.Condition(threading.Lock())
_writer_thread = None
_writer_running = False
# -----------------------------------------------------------------
# 缓存日志级别,避免每次都读 os.getenv(高并发热路径)
# -----------------------------------------------------------------
_cached_log_level: int = LOG_LEVELS["info"]
_cached_log_file: str = "log.txt"
# ENABLE_LOG=0/false/no/off 时彻底关闭日志
_log_enabled: bool = True
def _refresh_config():
"""从环境变量刷新缓存配置(模块加载时及需要时调用)"""
global _cached_log_level, _cached_log_file, _log_enabled
level = os.getenv("LOG_LEVEL", "info").lower()
_cached_log_level = LOG_LEVELS.get(level, LOG_LEVELS["info"])
_cached_log_file = os.getenv("LOG_FILE", "log.txt")
_log_enabled = os.getenv("ENABLE_LOG", "1").strip().lower() not in ("0", "false", "no", "off")
def _get_current_log_level() -> int:
return _cached_log_level
def _get_log_file_path() -> str:
return _cached_log_file
# -----------------------------------------------------------------
# 文件句柄管理(仅在 writer 线程内调用,不需要 _file_lock)
# -----------------------------------------------------------------
def _close_log_file():
global _log_file_handle
if _log_file_handle is not None:
try:
_log_file_handle.flush()
_log_file_handle.close()
except Exception:
pass
finally:
_log_file_handle = None
def _open_log_file(mode: str = "a") -> bool:
global _log_file_handle, _file_writing_disabled, _disable_reason
_close_log_file()
try:
# 使用较大缓冲区(64 KB),由 writer 线程定期 flush,减少系统调用
_log_file_handle = open(_cached_log_file, mode, encoding="utf-8", buffering=65536)
return True
except (PermissionError, OSError, IOError) as e:
_file_writing_disabled = True
_disable_reason = str(e)
print(f"Warning: Cannot open log file, disabling file writing: {e}", file=sys.stderr)
print("Log messages will continue to display in console only.", file=sys.stderr)
return False
except Exception as e:
print(f"Warning: Failed to open log file: {e}", file=sys.stderr)
return False
def _clear_log_file():
"""清空日志文件(启动时调用,此时 writer 线程尚未启动,直接操作安全)"""
global _file_writing_disabled, _disable_reason
try:
with open(_cached_log_file, "w", encoding="utf-8") as f:
pass # 覆盖清空
_open_log_file("a")
except (PermissionError, OSError, IOError) as e:
_file_writing_disabled = True
_disable_reason = str(e)
print(
f"Warning: File system appears to be read-only or permission denied. "
f"Disabling log file writing: {e}",
file=sys.stderr,
)
print("Log messages will continue to display in console only.", file=sys.stderr)
except Exception as e:
print(f"Warning: Failed to clear log file: {e}", file=sys.stderr)
# -----------------------------------------------------------------
# Writer 线程:批量从 deque 取出并写入,减少系统调用次数
# -----------------------------------------------------------------
_BATCH_SIZE = 1000 # 单次最多批量写入条数
_FLUSH_INTERVAL = 2 # 秒:无新消息时强制 flush 周期
def _log_writer_worker():
global _writer_running
last_flush_time = 0.0
while True:
# 等待数据或超时
with _deque_condition:
if not _log_deque and _writer_running:
_deque_condition.wait(timeout=_FLUSH_INTERVAL)
# 批量取出
batch = []
for _ in range(_BATCH_SIZE):
if _log_deque:
batch.append(_log_deque.popleft())
else:
break
if batch and not _file_writing_disabled:
# 一次 write 调用搞定整批,最大化减少系统调用
chunk = "\n".join(batch) + "\n"
try:
if _log_file_handle is None:
_open_log_file("a")
if _log_file_handle is not None:
_log_file_handle.write(chunk)
except Exception as e:
print(f"Warning: Failed to write log batch: {e}", file=sys.stderr)
_close_log_file()
try:
_open_log_file("a")
except Exception:
pass
# 定时 flush
now = _now_ts()
if now - last_flush_time >= _FLUSH_INTERVAL:
if _log_file_handle is not None:
try:
_log_file_handle.flush()
except Exception:
pass
last_flush_time = now
# 退出条件:已停止 + deque 已清空
if not _writer_running and not _log_deque:
break
# 最终 flush & close
if _log_file_handle is not None:
try:
_log_file_handle.flush()
except Exception:
pass
_close_log_file()
def _now_ts() -> float:
import time
return time.monotonic()
def _start_writer_thread():
global _writer_thread, _writer_running
if _writer_thread is None or not _writer_thread.is_alive():
_writer_running = True
_writer_thread = threading.Thread(target=_log_writer_worker, daemon=True, name="LogWriter")
_writer_thread.start()
def _stop_writer_thread():
global _writer_running
_writer_running = False
# 唤醒 writer 线程让它能感知退出信号
with _deque_condition:
_deque_condition.notify_all()
if _writer_thread and _writer_thread.is_alive():
_writer_thread.join(timeout=3.0)
# -----------------------------------------------------------------
# 入队(热路径,极轻量)
# -----------------------------------------------------------------
_MAX_QUEUE_SIZE = 5000 # 防止极端情况内存无限增长
def _write_to_file(message: str):
if _file_writing_disabled:
return
# deque.append 在 CPython 受 GIL 保护,无需额外锁
if len(_log_deque) >= _MAX_QUEUE_SIZE:
return # 过载保护:丢弃而非阻塞
_log_deque.append(message)
# 非阻塞通知 writer(acquire 失败直接跳过,不影响主线程)
if _deque_condition.acquire(blocking=False):
try:
_deque_condition.notify()
finally:
_deque_condition.release()
# -----------------------------------------------------------------
# 核心日志函数(热路径)
# -----------------------------------------------------------------
def _log(level: str, message: str):
# 最快短路:日志整体已禁用时直接返回,零开销
if not _log_enabled:
return
level = level.lower()
level_val = LOG_LEVELS.get(level)
if level_val is None:
print(f"Warning: Unknown log level '{level}'", file=sys.stderr)
return
# 热路径:直接与缓存值比较,无函数调用开销
if level_val < _cached_log_level:
return
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
entry = f"[{timestamp}] [{level.upper()}] {message}"
if level in ("error", "critical"):
print(entry, file=sys.stderr)
else:
print(entry)
_write_to_file(entry)
def set_log_level(level: str):
"""动态设置日志级别(同时更新缓存)"""
global _cached_log_level
level = level.lower()
if level not in LOG_LEVELS:
print(f"Warning: Unknown log level '{level}'. Valid levels: {', '.join(LOG_LEVELS.keys())}")
return False
_cached_log_level = LOG_LEVELS[level]
return True
class Logger:
"""支持 log('info', 'msg') 和 log.info('msg') 两种调用方式"""
def __call__(self, level: str, message: str):
_log(level, message)
def debug(self, message: str):
_log("debug", message)
def info(self, message: str):
_log("info", message)
def warning(self, message: str):
_log("warning", message)
def error(self, message: str):
_log("error", message)
def critical(self, message: str):
_log("critical", message)
def get_current_level(self) -> str:
current_level = _get_current_log_level()
for name, value in LOG_LEVELS.items():
if value == current_level:
return name
return "info"
def get_log_file(self) -> str:
return _get_log_file_path()
def close(self):
"""手动关闭(优雅退出用)"""
_stop_writer_thread()
def get_queue_size(self) -> int:
return len(_log_deque)
# 导出全局日志实例
log = Logger()
# 导出的公共接口
__all__ = ["log", "set_log_level", "LOG_LEVELS"]
# 模块加载时:读取配置缓存 → 清空日志文件 → 启动 writer 线程
_refresh_config()
if _log_enabled:
_clear_log_file()
_start_writer_thread()
# 注册退出清理
atexit.register(_stop_writer_thread)
# 使用说明:
# 1. 设置日志级别: export LOG_LEVEL=debug (或在 .env 中设置)
# 2. 设置日志文件: export LOG_FILE=log.txt (或在 .env 中设置)
# 3. 日志级别已缓存,热路径零 os.getenv 调用
# 4. 写入线程批量处理(最多 200 条/次),64 KB 缓冲区,每 0.5 s flush 一次
# 5. 队列上限 5000 条,超出时丢弃新日志(过载保护,不阻塞主线程)
# 6. 动态调整级别:set_log_level('debug') 立即生效
# 7. 彻底关闭日志(最高性能):export ENABLE_LOG=0 (或 false/no/off)
# 关闭后不会启动 writer 线程、不写文件、不打印控制台,_log 直接 return
|