Spaces:
Sleeping
Sleeping
| """ | |
| Supabase 日志记录器 | |
| 提供结构化的日志记录功能,将日志存储到 Supabase | |
| 支持:API 访问日志、错误日志、调试日志、模型性能日志、用户行为日志 | |
| """ | |
| import os | |
| import json | |
| import time | |
| import traceback | |
| from typing import Optional, Dict, Any, List | |
| from datetime import datetime | |
| from functools import wraps | |
| import logging | |
| class SupabaseLogger: | |
| """ | |
| Supabase 日志记录器 | |
| 用法: | |
| logger = SupabaseLogger() | |
| # 记录 API 访问 | |
| logger.log_api_access( | |
| endpoint="/api/v1/chat", | |
| method="POST", | |
| status_code=200, | |
| response_time_ms=150 | |
| ) | |
| # 记录错误 | |
| logger.log_error( | |
| error_type="model_load", | |
| error_message="Failed to load model", | |
| error_stack=traceback.format_exc() | |
| ) | |
| # 记录调试信息 | |
| logger.log_debug( | |
| level="INFO", | |
| logger="intelligence_analyzer", | |
| message="Scenario detected: policy_inquiry", | |
| data={"scenario": "policy_inquiry", "confidence": 0.9} | |
| ) | |
| """ | |
| def __init__(self): | |
| """初始化日志记录器""" | |
| self._client = None | |
| self._enabled = None | |
| self._fallback_logger = logging.getLogger(__name__) | |
| def client(self): | |
| """延迟加载 Supabase 客户端""" | |
| if self._client is None: | |
| try: | |
| from services.supabase_client import get_supabase_client | |
| self._client = get_supabase_client() | |
| except Exception as e: | |
| self._fallback_logger.warning(f"Supabase client not available: {e}") | |
| self._client = False # 标记为不可用 | |
| return self._client if self._client is not False else None | |
| def enabled(self) -> bool: | |
| """检查是否启用日志记录""" | |
| if self._enabled is None: | |
| self._enabled = os.getenv("ENABLE_SUPABASE_LOGGING", "true").lower() == "true" | |
| return self._enabled | |
| def log_api_access( | |
| self, | |
| endpoint: str, | |
| method: str, | |
| path: Optional[str] = None, | |
| query_params: Optional[Dict] = None, | |
| client_ip: Optional[str] = None, | |
| user_agent: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| response_time_ms: Optional[int] = None, | |
| status_code: Optional[int] = None, | |
| metadata: Optional[Dict] = None | |
| ) -> bool: | |
| """ | |
| 记录 API 访问日志 | |
| Args: | |
| endpoint: API 端点(如 /api/v1/chat) | |
| method: HTTP 方法 | |
| path: 请求路径 | |
| query_params: 查询参数 | |
| client_ip: 客户端 IP | |
| user_agent: 用户代理 | |
| session_id: 会话 ID | |
| response_time_ms: 响应时间(毫秒) | |
| status_code: HTTP 状态码 | |
| metadata: 额外元数据 | |
| Returns: | |
| 是否成功记录 | |
| """ | |
| if not self.enabled or not self.client: | |
| return False | |
| try: | |
| data = { | |
| "endpoint": endpoint, | |
| "method": method, | |
| "path": path, | |
| "query_params": query_params, | |
| "client_ip": client_ip, | |
| "user_agent": user_agent, | |
| "session_id": session_id, | |
| "response_time_ms": response_time_ms, | |
| "status_code": status_code, | |
| "metadata": metadata or {} | |
| } | |
| self.client.table("api_access_logs").insert(data).execute() | |
| return True | |
| except Exception as e: | |
| self._fallback_logger.error(f"Failed to log API access: {e}") | |
| return False | |
| def log_error( | |
| self, | |
| error_type: str, | |
| error_message: str, | |
| error_stack: Optional[str] = None, | |
| endpoint: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| user_input: Optional[str] = None, | |
| request_data: Optional[Dict] = None, | |
| metadata: Optional[Dict] = None | |
| ) -> bool: | |
| """ | |
| 记录错误日志 | |
| Args: | |
| error_type: 错误类型 | |
| error_message: 错误消息 | |
| error_stack: 错误堆栈 | |
| endpoint: 发生错误的端点 | |
| session_id: 会话 ID | |
| user_input: 用户输入 | |
| request_data: 请求数据 | |
| metadata: 额外元数据 | |
| Returns: | |
| 是否成功记录 | |
| """ | |
| if not self.enabled or not self.client: | |
| return False | |
| try: | |
| data = { | |
| "error_type": error_type, | |
| "error_message": error_message, | |
| "error_stack": error_stack, | |
| "endpoint": endpoint, | |
| "session_id": session_id, | |
| "user_input": user_input, | |
| "request_data": request_data, | |
| "metadata": metadata or {} | |
| } | |
| self.client.table("error_logs").insert(data).execute() | |
| return True | |
| except Exception as e: | |
| self._fallback_logger.error(f"Failed to log error: {e}") | |
| return False | |
| def log_debug( | |
| self, | |
| level: str, | |
| logger: str, | |
| message: str, | |
| session_id: Optional[str] = None, | |
| function_name: Optional[str] = None, | |
| line_number: Optional[int] = None, | |
| data: Optional[Dict] = None, | |
| metadata: Optional[Dict] = None | |
| ) -> bool: | |
| """ | |
| 记录调试日志 | |
| Args: | |
| level: 日志级别(DEBUG, INFO, WARNING, ERROR) | |
| logger: 模块名称 | |
| message: 日志消息 | |
| session_id: 会话 ID | |
| function_name: 函数名 | |
| line_number: 行号 | |
| data: 结构化数据 | |
| metadata: 额外元数据 | |
| Returns: | |
| 是否成功记录 | |
| """ | |
| if not self.enabled or not self.client: | |
| return False | |
| # 只记录 INFO 及以上级别的日志 | |
| if level == "DEBUG" and os.getenv("LOG_LEVEL", "INFO").upper() != "DEBUG": | |
| return False | |
| try: | |
| log_data = { | |
| "level": level, | |
| "logger": logger, | |
| "message": message, | |
| "session_id": session_id, | |
| "function_name": function_name, | |
| "line_number": line_number, | |
| "data": data, | |
| "metadata": metadata or {} | |
| } | |
| self.client.table("debug_logs").insert(log_data).execute() | |
| return True | |
| except Exception as e: | |
| self._fallback_logger.error(f"Failed to log debug: {e}") | |
| return False | |
| def log_model_performance( | |
| self, | |
| model_name: str, | |
| model_type: str, | |
| load_time_ms: Optional[int] = None, | |
| inference_time_ms: Optional[int] = None, | |
| total_time_ms: Optional[int] = None, | |
| input_text: Optional[str] = None, | |
| output_summary: Optional[str] = None, | |
| metadata: Optional[Dict] = None | |
| ) -> bool: | |
| """ | |
| 记录模型性能日志 | |
| Args: | |
| model_name: 模型名称 | |
| model_type: 模型类型(sbert, sentiment, compliance, llm) | |
| load_time_ms: 加载时间 | |
| inference_time_ms: 推理时间 | |
| total_time_ms: 总时间 | |
| input_text: 输入文本 | |
| output_summary: 输出摘要 | |
| metadata: 额外元数据 | |
| Returns: | |
| 是否成功记录 | |
| """ | |
| if not self.enabled or not self.client: | |
| return False | |
| try: | |
| data = { | |
| "model_name": model_name, | |
| "model_type": model_type, | |
| "load_time_ms": load_time_ms, | |
| "inference_time_ms": inference_time_ms, | |
| "total_time_ms": total_time_ms, | |
| "input_text": input_text[:500] if input_text else None, # 限制长度 | |
| "output_summary": output_summary[:500] if output_summary else None, | |
| "metadata": metadata or {} | |
| } | |
| self.client.table("model_performance_logs").insert(data).execute() | |
| return True | |
| except Exception as e: | |
| self._fallback_logger.error(f"Failed to log model performance: {e}") | |
| return False | |
| def log_user_behavior( | |
| self, | |
| session_id: str, | |
| action_type: str, | |
| action_detail: Optional[str] = None, | |
| input_text: Optional[str] = None, | |
| intent_detected: Optional[str] = None, | |
| scenario_detected: Optional[str] = None, | |
| success: Optional[bool] = None, | |
| error_message: Optional[str] = None, | |
| user_id: Optional[str] = None, | |
| metadata: Optional[Dict] = None | |
| ) -> bool: | |
| """ | |
| 记录用户行为日志 | |
| Args: | |
| session_id: 会话 ID | |
| action_type: 动作类型(chat, evaluate, config_check, etc.) | |
| action_detail: 动作详情 | |
| input_text: 用户输入 | |
| intent_detected: 检测到的意图 | |
| scenario_detected: 检测到的场景 | |
| success: 是否成功 | |
| error_message: 错误消息 | |
| user_id: 用户 ID | |
| metadata: 额外元数据 | |
| Returns: | |
| 是否成功记录 | |
| """ | |
| if not self.enabled or not self.client: | |
| return False | |
| try: | |
| data = { | |
| "session_id": session_id, | |
| "user_id": user_id, | |
| "action_type": action_type, | |
| "action_detail": action_detail, | |
| "input_text": input_text, | |
| "intent_detected": intent_detected, | |
| "scenario_detected": scenario_detected, | |
| "success": success, | |
| "error_message": error_message, | |
| "metadata": metadata or {} | |
| } | |
| self.client.table("user_behavior_logs").insert(data).execute() | |
| return True | |
| except Exception as e: | |
| self._fallback_logger.error(f"Failed to log user behavior: {e}") | |
| return False | |
| # 全局单例 | |
| _supabase_logger_instance = None | |
| def get_supabase_logger() -> SupabaseLogger: | |
| """获取 Supabase 日志记录器单例""" | |
| global _supabase_logger_instance | |
| if _supabase_logger_instance is None: | |
| _supabase_logger_instance = SupabaseLogger() | |
| return _supabase_logger_instance | |
| # ============================================ | |
| # 装饰器 | |
| # ============================================ | |
| def log_api_access(endpoint: str = None): | |
| """ | |
| API 访问日志装饰器 | |
| 用法: | |
| @log_api_access("/api/v1/chat") | |
| async def chat_endpoint(request): | |
| ... | |
| """ | |
| def decorator(func): | |
| async def wrapper(*args, **kwargs): | |
| logger = get_supabase_logger() | |
| start_time = time.time() | |
| # 获取端点名称 | |
| ep = endpoint or f"/{func.__name__}" | |
| try: | |
| result = await func(*args, **kwargs) | |
| status_code = getattr(result, 'status_code', 200) if hasattr(result, 'status_code') else 200 | |
| # 记录成功的 API 调用 | |
| response_time = int((time.time() - start_time) * 1000) | |
| logger.log_api_access( | |
| endpoint=ep, | |
| method="POST", | |
| status_code=status_code, | |
| response_time_ms=response_time | |
| ) | |
| return result | |
| except Exception as e: | |
| # 记录失败的 API 调用 | |
| response_time = int((time.time() - start_time) * 1000) | |
| logger.log_api_access( | |
| endpoint=ep, | |
| method="POST", | |
| status_code=500, | |
| response_time_ms=response_time | |
| ) | |
| # 记录错误 | |
| logger.log_error( | |
| error_type="api_error", | |
| error_message=str(e), | |
| error_stack=traceback.format_exc(), | |
| endpoint=ep | |
| ) | |
| raise | |
| return wrapper | |
| return decorator | |
| def log_model_performance(model_name: str, model_type: str): | |
| """ | |
| 模型性能日志装饰器 | |
| 用法: | |
| @log_model_performance("sbert-hr-v2", "sbert") | |
| def encode_text(text): | |
| ... | |
| """ | |
| def decorator(func): | |
| def wrapper(*args, **kwargs): | |
| logger = get_supabase_logger() | |
| start_time = time.time() | |
| try: | |
| result = func(*args, **kwargs) | |
| total_time = int((time.time() - start_time) * 1000) | |
| # 记录性能 | |
| logger.log_model_performance( | |
| model_name=model_name, | |
| model_type=model_type, | |
| total_time_ms=total_time, | |
| input_text=str(args[0]) if args else None | |
| ) | |
| return result | |
| except Exception as e: | |
| total_time = int((time.time() - start_time) * 1000) | |
| # 记录错误 | |
| logger.log_error( | |
| error_type="model_inference", | |
| error_message=str(e), | |
| error_stack=traceback.format_exc() | |
| ) | |
| raise | |
| return wrapper | |
| return decorator | |
| # ============================================ | |
| # 上下文管理器 - 用于性能追踪 | |
| # ============================================ | |
| class LogPerformance: | |
| """ | |
| 性能日志上下文管理器 | |
| 用法: | |
| with LogPerformance("chat_processing", "llm"): | |
| # 执行耗时操作 | |
| result = process_chat(request) | |
| """ | |
| def __init__(self, operation_name: str, model_name: str, model_type: str): | |
| self.operation_name = operation_name | |
| self.model_name = model_name | |
| self.model_type = model_type | |
| self.logger = get_supabase_logger() | |
| self.start_time = None | |
| def __enter__(self): | |
| self.start_time = time.time() | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| total_time = int((time.time() - self.start_time) * 1000) | |
| if exc_type is None: | |
| # 成功完成 | |
| self.logger.log_model_performance( | |
| model_name=self.model_name, | |
| model_type=self.model_type, | |
| total_time_ms=total_time | |
| ) | |
| else: | |
| # 发生错误 | |
| self.logger.log_error( | |
| error_type="performance_error", | |
| error_message=str(exc_val), | |
| error_stack=traceback.format_exception(exc_type, exc_val, exc_tb), | |
| metadata={ | |
| "operation": self.operation_name, | |
| "duration_ms": total_time | |
| } | |
| ) | |
| return False # 不抑制异常 | |
| # ============================================ | |
| # 简化的日志函数 | |
| # ============================================ | |
| def log_debug(logger_name: str, message: str, level: str = "INFO", **kwargs): | |
| """简化的调试日志记录""" | |
| get_supabase_logger().log_debug( | |
| level=level, | |
| logger=logger_name, | |
| message=message, | |
| **kwargs | |
| ) | |
| def log_error(error_type: str, error_message: str, **kwargs): | |
| """简化的错误日志记录""" | |
| get_supabase_logger().log_error( | |
| error_type=error_type, | |
| error_message=error_message, | |
| **kwargs | |
| ) | |