hr-eval-api-v2 / services /supabase_logger.py
KarenYYH
Add: Supabase 日志系统
fcc1a30
"""
Supabase 日志记录器
提供结构化的日志记录功能,将日志存储到 Supabase
支持:API 访问日志、错误日志、调试日志、模型性能日志、用户行为日志
"""
import os
import json
import time
import traceback
from typing import Optional, Dict, Any, List
from datetime import datetime
from functools import wraps
import logging
class SupabaseLogger:
"""
Supabase 日志记录器
用法:
logger = SupabaseLogger()
# 记录 API 访问
logger.log_api_access(
endpoint="/api/v1/chat",
method="POST",
status_code=200,
response_time_ms=150
)
# 记录错误
logger.log_error(
error_type="model_load",
error_message="Failed to load model",
error_stack=traceback.format_exc()
)
# 记录调试信息
logger.log_debug(
level="INFO",
logger="intelligence_analyzer",
message="Scenario detected: policy_inquiry",
data={"scenario": "policy_inquiry", "confidence": 0.9}
)
"""
def __init__(self):
"""初始化日志记录器"""
self._client = None
self._enabled = None
self._fallback_logger = logging.getLogger(__name__)
@property
def client(self):
"""延迟加载 Supabase 客户端"""
if self._client is None:
try:
from services.supabase_client import get_supabase_client
self._client = get_supabase_client()
except Exception as e:
self._fallback_logger.warning(f"Supabase client not available: {e}")
self._client = False # 标记为不可用
return self._client if self._client is not False else None
@property
def enabled(self) -> bool:
"""检查是否启用日志记录"""
if self._enabled is None:
self._enabled = os.getenv("ENABLE_SUPABASE_LOGGING", "true").lower() == "true"
return self._enabled
def log_api_access(
self,
endpoint: str,
method: str,
path: Optional[str] = None,
query_params: Optional[Dict] = None,
client_ip: Optional[str] = None,
user_agent: Optional[str] = None,
session_id: Optional[str] = None,
response_time_ms: Optional[int] = None,
status_code: Optional[int] = None,
metadata: Optional[Dict] = None
) -> bool:
"""
记录 API 访问日志
Args:
endpoint: API 端点(如 /api/v1/chat)
method: HTTP 方法
path: 请求路径
query_params: 查询参数
client_ip: 客户端 IP
user_agent: 用户代理
session_id: 会话 ID
response_time_ms: 响应时间(毫秒)
status_code: HTTP 状态码
metadata: 额外元数据
Returns:
是否成功记录
"""
if not self.enabled or not self.client:
return False
try:
data = {
"endpoint": endpoint,
"method": method,
"path": path,
"query_params": query_params,
"client_ip": client_ip,
"user_agent": user_agent,
"session_id": session_id,
"response_time_ms": response_time_ms,
"status_code": status_code,
"metadata": metadata or {}
}
self.client.table("api_access_logs").insert(data).execute()
return True
except Exception as e:
self._fallback_logger.error(f"Failed to log API access: {e}")
return False
def log_error(
self,
error_type: str,
error_message: str,
error_stack: Optional[str] = None,
endpoint: Optional[str] = None,
session_id: Optional[str] = None,
user_input: Optional[str] = None,
request_data: Optional[Dict] = None,
metadata: Optional[Dict] = None
) -> bool:
"""
记录错误日志
Args:
error_type: 错误类型
error_message: 错误消息
error_stack: 错误堆栈
endpoint: 发生错误的端点
session_id: 会话 ID
user_input: 用户输入
request_data: 请求数据
metadata: 额外元数据
Returns:
是否成功记录
"""
if not self.enabled or not self.client:
return False
try:
data = {
"error_type": error_type,
"error_message": error_message,
"error_stack": error_stack,
"endpoint": endpoint,
"session_id": session_id,
"user_input": user_input,
"request_data": request_data,
"metadata": metadata or {}
}
self.client.table("error_logs").insert(data).execute()
return True
except Exception as e:
self._fallback_logger.error(f"Failed to log error: {e}")
return False
def log_debug(
self,
level: str,
logger: str,
message: str,
session_id: Optional[str] = None,
function_name: Optional[str] = None,
line_number: Optional[int] = None,
data: Optional[Dict] = None,
metadata: Optional[Dict] = None
) -> bool:
"""
记录调试日志
Args:
level: 日志级别(DEBUG, INFO, WARNING, ERROR)
logger: 模块名称
message: 日志消息
session_id: 会话 ID
function_name: 函数名
line_number: 行号
data: 结构化数据
metadata: 额外元数据
Returns:
是否成功记录
"""
if not self.enabled or not self.client:
return False
# 只记录 INFO 及以上级别的日志
if level == "DEBUG" and os.getenv("LOG_LEVEL", "INFO").upper() != "DEBUG":
return False
try:
log_data = {
"level": level,
"logger": logger,
"message": message,
"session_id": session_id,
"function_name": function_name,
"line_number": line_number,
"data": data,
"metadata": metadata or {}
}
self.client.table("debug_logs").insert(log_data).execute()
return True
except Exception as e:
self._fallback_logger.error(f"Failed to log debug: {e}")
return False
def log_model_performance(
self,
model_name: str,
model_type: str,
load_time_ms: Optional[int] = None,
inference_time_ms: Optional[int] = None,
total_time_ms: Optional[int] = None,
input_text: Optional[str] = None,
output_summary: Optional[str] = None,
metadata: Optional[Dict] = None
) -> bool:
"""
记录模型性能日志
Args:
model_name: 模型名称
model_type: 模型类型(sbert, sentiment, compliance, llm)
load_time_ms: 加载时间
inference_time_ms: 推理时间
total_time_ms: 总时间
input_text: 输入文本
output_summary: 输出摘要
metadata: 额外元数据
Returns:
是否成功记录
"""
if not self.enabled or not self.client:
return False
try:
data = {
"model_name": model_name,
"model_type": model_type,
"load_time_ms": load_time_ms,
"inference_time_ms": inference_time_ms,
"total_time_ms": total_time_ms,
"input_text": input_text[:500] if input_text else None, # 限制长度
"output_summary": output_summary[:500] if output_summary else None,
"metadata": metadata or {}
}
self.client.table("model_performance_logs").insert(data).execute()
return True
except Exception as e:
self._fallback_logger.error(f"Failed to log model performance: {e}")
return False
def log_user_behavior(
self,
session_id: str,
action_type: str,
action_detail: Optional[str] = None,
input_text: Optional[str] = None,
intent_detected: Optional[str] = None,
scenario_detected: Optional[str] = None,
success: Optional[bool] = None,
error_message: Optional[str] = None,
user_id: Optional[str] = None,
metadata: Optional[Dict] = None
) -> bool:
"""
记录用户行为日志
Args:
session_id: 会话 ID
action_type: 动作类型(chat, evaluate, config_check, etc.)
action_detail: 动作详情
input_text: 用户输入
intent_detected: 检测到的意图
scenario_detected: 检测到的场景
success: 是否成功
error_message: 错误消息
user_id: 用户 ID
metadata: 额外元数据
Returns:
是否成功记录
"""
if not self.enabled or not self.client:
return False
try:
data = {
"session_id": session_id,
"user_id": user_id,
"action_type": action_type,
"action_detail": action_detail,
"input_text": input_text,
"intent_detected": intent_detected,
"scenario_detected": scenario_detected,
"success": success,
"error_message": error_message,
"metadata": metadata or {}
}
self.client.table("user_behavior_logs").insert(data).execute()
return True
except Exception as e:
self._fallback_logger.error(f"Failed to log user behavior: {e}")
return False
# 全局单例
_supabase_logger_instance = None
def get_supabase_logger() -> SupabaseLogger:
"""获取 Supabase 日志记录器单例"""
global _supabase_logger_instance
if _supabase_logger_instance is None:
_supabase_logger_instance = SupabaseLogger()
return _supabase_logger_instance
# ============================================
# 装饰器
# ============================================
def log_api_access(endpoint: str = None):
"""
API 访问日志装饰器
用法:
@log_api_access("/api/v1/chat")
async def chat_endpoint(request):
...
"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
logger = get_supabase_logger()
start_time = time.time()
# 获取端点名称
ep = endpoint or f"/{func.__name__}"
try:
result = await func(*args, **kwargs)
status_code = getattr(result, 'status_code', 200) if hasattr(result, 'status_code') else 200
# 记录成功的 API 调用
response_time = int((time.time() - start_time) * 1000)
logger.log_api_access(
endpoint=ep,
method="POST",
status_code=status_code,
response_time_ms=response_time
)
return result
except Exception as e:
# 记录失败的 API 调用
response_time = int((time.time() - start_time) * 1000)
logger.log_api_access(
endpoint=ep,
method="POST",
status_code=500,
response_time_ms=response_time
)
# 记录错误
logger.log_error(
error_type="api_error",
error_message=str(e),
error_stack=traceback.format_exc(),
endpoint=ep
)
raise
return wrapper
return decorator
def log_model_performance(model_name: str, model_type: str):
"""
模型性能日志装饰器
用法:
@log_model_performance("sbert-hr-v2", "sbert")
def encode_text(text):
...
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
logger = get_supabase_logger()
start_time = time.time()
try:
result = func(*args, **kwargs)
total_time = int((time.time() - start_time) * 1000)
# 记录性能
logger.log_model_performance(
model_name=model_name,
model_type=model_type,
total_time_ms=total_time,
input_text=str(args[0]) if args else None
)
return result
except Exception as e:
total_time = int((time.time() - start_time) * 1000)
# 记录错误
logger.log_error(
error_type="model_inference",
error_message=str(e),
error_stack=traceback.format_exc()
)
raise
return wrapper
return decorator
# ============================================
# 上下文管理器 - 用于性能追踪
# ============================================
class LogPerformance:
"""
性能日志上下文管理器
用法:
with LogPerformance("chat_processing", "llm"):
# 执行耗时操作
result = process_chat(request)
"""
def __init__(self, operation_name: str, model_name: str, model_type: str):
self.operation_name = operation_name
self.model_name = model_name
self.model_type = model_type
self.logger = get_supabase_logger()
self.start_time = None
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
total_time = int((time.time() - self.start_time) * 1000)
if exc_type is None:
# 成功完成
self.logger.log_model_performance(
model_name=self.model_name,
model_type=self.model_type,
total_time_ms=total_time
)
else:
# 发生错误
self.logger.log_error(
error_type="performance_error",
error_message=str(exc_val),
error_stack=traceback.format_exception(exc_type, exc_val, exc_tb),
metadata={
"operation": self.operation_name,
"duration_ms": total_time
}
)
return False # 不抑制异常
# ============================================
# 简化的日志函数
# ============================================
def log_debug(logger_name: str, message: str, level: str = "INFO", **kwargs):
"""简化的调试日志记录"""
get_supabase_logger().log_debug(
level=level,
logger=logger_name,
message=message,
**kwargs
)
def log_error(error_type: str, error_message: str, **kwargs):
"""简化的错误日志记录"""
get_supabase_logger().log_error(
error_type=error_type,
error_message=error_message,
**kwargs
)