gcli2api / log.py
a3216's picture
sync: github -> hf space
96d34f2
"""
日志模块 - 使用环境变量配置
"""
import os
import sys
import threading
from datetime import datetime
from collections import deque
import atexit
# 日志级别定义
LOG_LEVELS = {"debug": 0, "info": 1, "warning": 2, "error": 3, "critical": 4}
# 文件写入状态标志(仅由 writer 线程修改,无需锁保护)
_file_writing_disabled = False
_disable_reason = None
# 全局文件句柄(仅由 writer 线程访问,无需文件锁)
_log_file_handle = None
# -----------------------------------------------------------------
# 高性能无锁队列:用 deque + Condition 替代 Queue
# deque.append / deque.popleft 在 CPython 中受 GIL 保护,是原子操作,
# 不需要额外的 Lock 做入队保护,只用 Condition 做"有数据"通知。
# -----------------------------------------------------------------
_log_deque: deque = deque()
_deque_condition = threading.Condition(threading.Lock())
_writer_thread = None
_writer_running = False
# -----------------------------------------------------------------
# 缓存日志级别,避免每次都读 os.getenv(高并发热路径)
# -----------------------------------------------------------------
_cached_log_level: int = LOG_LEVELS["info"]
_cached_log_file: str = "log.txt"
# ENABLE_LOG=0/false/no/off 时彻底关闭日志
_log_enabled: bool = True
def _refresh_config():
"""从环境变量刷新缓存配置(模块加载时及需要时调用)"""
global _cached_log_level, _cached_log_file, _log_enabled
level = os.getenv("LOG_LEVEL", "info").lower()
_cached_log_level = LOG_LEVELS.get(level, LOG_LEVELS["info"])
_cached_log_file = os.getenv("LOG_FILE", "log.txt")
_log_enabled = os.getenv("ENABLE_LOG", "1").strip().lower() not in ("0", "false", "no", "off")
def _get_current_log_level() -> int:
return _cached_log_level
def _get_log_file_path() -> str:
return _cached_log_file
# -----------------------------------------------------------------
# 文件句柄管理(仅在 writer 线程内调用,不需要 _file_lock)
# -----------------------------------------------------------------
def _close_log_file():
global _log_file_handle
if _log_file_handle is not None:
try:
_log_file_handle.flush()
_log_file_handle.close()
except Exception:
pass
finally:
_log_file_handle = None
def _open_log_file(mode: str = "a") -> bool:
global _log_file_handle, _file_writing_disabled, _disable_reason
_close_log_file()
try:
# 使用较大缓冲区(64 KB),由 writer 线程定期 flush,减少系统调用
_log_file_handle = open(_cached_log_file, mode, encoding="utf-8", buffering=65536)
return True
except (PermissionError, OSError, IOError) as e:
_file_writing_disabled = True
_disable_reason = str(e)
print(f"Warning: Cannot open log file, disabling file writing: {e}", file=sys.stderr)
print("Log messages will continue to display in console only.", file=sys.stderr)
return False
except Exception as e:
print(f"Warning: Failed to open log file: {e}", file=sys.stderr)
return False
def _clear_log_file():
"""清空日志文件(启动时调用,此时 writer 线程尚未启动,直接操作安全)"""
global _file_writing_disabled, _disable_reason
try:
with open(_cached_log_file, "w", encoding="utf-8") as f:
pass # 覆盖清空
_open_log_file("a")
except (PermissionError, OSError, IOError) as e:
_file_writing_disabled = True
_disable_reason = str(e)
print(
f"Warning: File system appears to be read-only or permission denied. "
f"Disabling log file writing: {e}",
file=sys.stderr,
)
print("Log messages will continue to display in console only.", file=sys.stderr)
except Exception as e:
print(f"Warning: Failed to clear log file: {e}", file=sys.stderr)
# -----------------------------------------------------------------
# Writer 线程:批量从 deque 取出并写入,减少系统调用次数
# -----------------------------------------------------------------
_BATCH_SIZE = 1000 # 单次最多批量写入条数
_FLUSH_INTERVAL = 2 # 秒:无新消息时强制 flush 周期
def _log_writer_worker():
global _writer_running
last_flush_time = 0.0
while True:
# 等待数据或超时
with _deque_condition:
if not _log_deque and _writer_running:
_deque_condition.wait(timeout=_FLUSH_INTERVAL)
# 批量取出
batch = []
for _ in range(_BATCH_SIZE):
if _log_deque:
batch.append(_log_deque.popleft())
else:
break
if batch and not _file_writing_disabled:
# 一次 write 调用搞定整批,最大化减少系统调用
chunk = "\n".join(batch) + "\n"
try:
if _log_file_handle is None:
_open_log_file("a")
if _log_file_handle is not None:
_log_file_handle.write(chunk)
except Exception as e:
print(f"Warning: Failed to write log batch: {e}", file=sys.stderr)
_close_log_file()
try:
_open_log_file("a")
except Exception:
pass
# 定时 flush
now = _now_ts()
if now - last_flush_time >= _FLUSH_INTERVAL:
if _log_file_handle is not None:
try:
_log_file_handle.flush()
except Exception:
pass
last_flush_time = now
# 退出条件:已停止 + deque 已清空
if not _writer_running and not _log_deque:
break
# 最终 flush & close
if _log_file_handle is not None:
try:
_log_file_handle.flush()
except Exception:
pass
_close_log_file()
def _now_ts() -> float:
import time
return time.monotonic()
def _start_writer_thread():
global _writer_thread, _writer_running
if _writer_thread is None or not _writer_thread.is_alive():
_writer_running = True
_writer_thread = threading.Thread(target=_log_writer_worker, daemon=True, name="LogWriter")
_writer_thread.start()
def _stop_writer_thread():
global _writer_running
_writer_running = False
# 唤醒 writer 线程让它能感知退出信号
with _deque_condition:
_deque_condition.notify_all()
if _writer_thread and _writer_thread.is_alive():
_writer_thread.join(timeout=3.0)
# -----------------------------------------------------------------
# 入队(热路径,极轻量)
# -----------------------------------------------------------------
_MAX_QUEUE_SIZE = 5000 # 防止极端情况内存无限增长
def _write_to_file(message: str):
if _file_writing_disabled:
return
# deque.append 在 CPython 受 GIL 保护,无需额外锁
if len(_log_deque) >= _MAX_QUEUE_SIZE:
return # 过载保护:丢弃而非阻塞
_log_deque.append(message)
# 非阻塞通知 writer(acquire 失败直接跳过,不影响主线程)
if _deque_condition.acquire(blocking=False):
try:
_deque_condition.notify()
finally:
_deque_condition.release()
# -----------------------------------------------------------------
# 核心日志函数(热路径)
# -----------------------------------------------------------------
def _log(level: str, message: str):
# 最快短路:日志整体已禁用时直接返回,零开销
if not _log_enabled:
return
level = level.lower()
level_val = LOG_LEVELS.get(level)
if level_val is None:
print(f"Warning: Unknown log level '{level}'", file=sys.stderr)
return
# 热路径:直接与缓存值比较,无函数调用开销
if level_val < _cached_log_level:
return
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
entry = f"[{timestamp}] [{level.upper()}] {message}"
if level in ("error", "critical"):
print(entry, file=sys.stderr)
else:
print(entry)
_write_to_file(entry)
def set_log_level(level: str):
"""动态设置日志级别(同时更新缓存)"""
global _cached_log_level
level = level.lower()
if level not in LOG_LEVELS:
print(f"Warning: Unknown log level '{level}'. Valid levels: {', '.join(LOG_LEVELS.keys())}")
return False
_cached_log_level = LOG_LEVELS[level]
return True
class Logger:
"""支持 log('info', 'msg') 和 log.info('msg') 两种调用方式"""
def __call__(self, level: str, message: str):
_log(level, message)
def debug(self, message: str):
_log("debug", message)
def info(self, message: str):
_log("info", message)
def warning(self, message: str):
_log("warning", message)
def error(self, message: str):
_log("error", message)
def critical(self, message: str):
_log("critical", message)
def get_current_level(self) -> str:
current_level = _get_current_log_level()
for name, value in LOG_LEVELS.items():
if value == current_level:
return name
return "info"
def get_log_file(self) -> str:
return _get_log_file_path()
def close(self):
"""手动关闭(优雅退出用)"""
_stop_writer_thread()
def get_queue_size(self) -> int:
return len(_log_deque)
# 导出全局日志实例
log = Logger()
# 导出的公共接口
__all__ = ["log", "set_log_level", "LOG_LEVELS"]
# 模块加载时:读取配置缓存 → 清空日志文件 → 启动 writer 线程
_refresh_config()
if _log_enabled:
_clear_log_file()
_start_writer_thread()
# 注册退出清理
atexit.register(_stop_writer_thread)
# 使用说明:
# 1. 设置日志级别: export LOG_LEVEL=debug (或在 .env 中设置)
# 2. 设置日志文件: export LOG_FILE=log.txt (或在 .env 中设置)
# 3. 日志级别已缓存,热路径零 os.getenv 调用
# 4. 写入线程批量处理(最多 200 条/次),64 KB 缓冲区,每 0.5 s flush 一次
# 5. 队列上限 5000 条,超出时丢弃新日志(过载保护,不阻塞主线程)
# 6. 动态调整级别:set_log_level('debug') 立即生效
# 7. 彻底关闭日志(最高性能):export ENABLE_LOG=0 (或 false/no/off)
# 关闭后不会启动 writer 线程、不写文件、不打印控制台,_log 直接 return