Spaces:
Sleeping
Sleeping
| """ | |
| Supabase 客户端 | |
| 用于保存对话历史到 Supabase PostgreSQL 数据库 | |
| """ | |
| import os | |
| from typing import Dict, Optional, List | |
| from datetime import datetime | |
| import json | |
| try: | |
| from supabase import create_client, Client | |
| SUPABASE_AVAILABLE = True | |
| except ImportError: | |
| SUPABASE_AVAILABLE = False | |
| print("警告: supabase 包未安装,Supabase 集成将不可用") | |
| class SupabaseClient: | |
| """Supabase 客户端封装""" | |
| def __init__(self): | |
| """初始化 Supabase 客户端""" | |
| if not SUPABASE_AVAILABLE: | |
| self.client = None | |
| return | |
| supabase_url = os.getenv("SUPABASE_URL") | |
| supabase_key = os.getenv("SUPABASE_SERVICE_ROLE_KEY") | |
| if not supabase_url or not supabase_key: | |
| print("警告: SUPABASE_URL 或 SUPABASE_SERVICE_ROLE_KEY 未设置") | |
| self.client = None | |
| return | |
| try: | |
| self.client: Client = create_client(supabase_url, supabase_key) | |
| print("✅ Supabase 客户端初始化成功") | |
| except Exception as e: | |
| print(f"❌ Supabase 客户端初始化失败: {e}") | |
| self.client = None | |
| def is_available(self) -> bool: | |
| """检查 Supabase 是否可用""" | |
| return self.client is not None | |
| def save_conversation( | |
| self, | |
| session_id: str, | |
| user_message: str, | |
| assistant_message: str, | |
| analysis_report: Optional[Dict] = None, | |
| context_summary: Optional[Dict] = None, | |
| metadata: Optional[Dict] = None | |
| ) -> bool: | |
| """ | |
| 保存对话到 Supabase | |
| Args: | |
| session_id: 会话ID | |
| user_message: 用户消息 | |
| assistant_message: 助手回复 | |
| analysis_report: 分析报告 | |
| context_summary: 上下文摘要 | |
| metadata: 额外元数据 | |
| Returns: | |
| 是否保存成功 | |
| """ | |
| if not self.is_available(): | |
| return False | |
| try: | |
| # 准备数据 | |
| data = { | |
| "session_id": session_id, | |
| "user_message": user_message, | |
| "assistant_message": assistant_message, | |
| "analysis_report": json.dumps(analysis_report) if analysis_report else None, | |
| "context_summary": json.dumps(context_summary) if context_summary else None, | |
| "metadata": json.dumps(metadata) if metadata else None, | |
| "created_at": datetime.utcnow().isoformat() | |
| } | |
| # 插入数据 | |
| response = self.client.table("conversation_history").insert(data).execute() | |
| if response.data: | |
| print(f"✅ 对话已保存到 Supabase: {session_id}") | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"❌ 保存对话到 Supabase 失败: {e}") | |
| return False | |
| def get_conversation_history( | |
| self, | |
| session_id: str, | |
| limit: int = 50 | |
| ) -> List[Dict]: | |
| """ | |
| 获取指定会话的历史记录 | |
| Args: | |
| session_id: 会话ID | |
| limit: 最大返回数量 | |
| Returns: | |
| 对话历史列表 | |
| """ | |
| if not self.is_available(): | |
| return [] | |
| try: | |
| response = self.client.table("conversation_history") \ | |
| .select("*") \ | |
| .eq("session_id", session_id) \ | |
| .order("created_at", desc=False) \ | |
| .limit(limit) \ | |
| .execute() | |
| return response.data if response.data else [] | |
| except Exception as e: | |
| print(f"❌ 获取对话历史失败: {e}") | |
| return [] | |
| def get_recent_conversations( | |
| self, | |
| hours: int = 24, | |
| limit: int = 100 | |
| ) -> List[Dict]: | |
| """ | |
| 获取最近的对话记录 | |
| Args: | |
| hours: 最近多少小时 | |
| limit: 最大返回数量 | |
| Returns: | |
| 对话列表 | |
| """ | |
| if not self.is_available(): | |
| return [] | |
| try: | |
| from datetime import timedelta | |
| cutoff_time = datetime.utcnow() - timedelta(hours=hours) | |
| response = self.client.table("conversation_history") \ | |
| .select("*") \ | |
| .gte("created_at", cutoff_time.isoformat()) \ | |
| .order("created_at", desc=True) \ | |
| .limit(limit) \ | |
| .execute() | |
| return response.data if response.data else [] | |
| except Exception as e: | |
| print(f"❌ 获取最近对话失败: {e}") | |
| return [] | |
| def delete_conversation(self, conversation_id: str) -> bool: | |
| """ | |
| 删除指定对话 | |
| Args: | |
| conversation_id: 对话ID (UUID) | |
| Returns: | |
| 是否删除成功 | |
| """ | |
| if not self.is_available(): | |
| return False | |
| try: | |
| response = self.client.table("conversation_history") \ | |
| .delete() \ | |
| .eq("id", conversation_id) \ | |
| .execute() | |
| return len(response.data) > 0 | |
| except Exception as e: | |
| print(f"❌ 删除对话失败: {e}") | |
| return False | |
| def delete_session(self, session_id: str) -> bool: | |
| """ | |
| 删除整个会话的所有对话 | |
| Args: | |
| session_id: 会话ID | |
| Returns: | |
| 是否删除成功 | |
| """ | |
| if not self.is_available(): | |
| return False | |
| try: | |
| response = self.client.table("conversation_history") \ | |
| .delete() \ | |
| .eq("session_id", session_id) \ | |
| .execute() | |
| return True | |
| except Exception as e: | |
| print(f"❌ 删除会话失败: {e}") | |
| return False | |
| def get_statistics(self, hours: int = 24) -> Dict: | |
| """ | |
| 获取对话统计信息 | |
| Args: | |
| hours: 统计最近多少小时 | |
| Returns: | |
| 统计信息字典 | |
| """ | |
| if not self.is_available(): | |
| return {} | |
| try: | |
| conversations = self.get_recent_conversations(hours=hours, limit=10000) | |
| total = len(conversations) | |
| if total == 0: | |
| return { | |
| "total_conversations": 0, | |
| "unique_sessions": 0, | |
| "avg_score": 0, | |
| "high_risk_count": 0 | |
| } | |
| # 统计唯一会话数 | |
| unique_sessions = len(set(c["session_id"] for c in conversations)) | |
| # 计算平均分数 | |
| scores = [] | |
| high_risk_count = 0 | |
| for c in conversations: | |
| try: | |
| if c.get("analysis_report"): | |
| report = json.loads(c["analysis_report"]) if isinstance(c["analysis_report"], str) else c["analysis_report"] | |
| if "overall_score" in report: | |
| scores.append(report["overall_score"]) | |
| # 检查风险等级 | |
| risk_assessment = report.get("risk_assessment", {}) | |
| if risk_assessment.get("risk_level") == "high": | |
| high_risk_count += 1 | |
| except: | |
| pass | |
| avg_score = sum(scores) / len(scores) if scores else 0 | |
| return { | |
| "total_conversations": total, | |
| "unique_sessions": unique_sessions, | |
| "avg_score": round(avg_score, 2), | |
| "high_risk_count": high_risk_count, | |
| "period_hours": hours | |
| } | |
| except Exception as e: | |
| print(f"❌ 获取统计信息失败: {e}") | |
| return {} | |
| # 全局单例 | |
| _supabase_client_instance = None | |
| def get_supabase_client() -> SupabaseClient: | |
| """获取 Supabase 客户端单例""" | |
| global _supabase_client_instance | |
| if _supabase_client_instance is None: | |
| _supabase_client_instance = SupabaseClient() | |
| return _supabase_client_instance | |