hr-eval-api-v2 / services /supabase_client.py
KarenYYH
Initial commit - HR Evaluation API v2
c8b1f17
"""
Supabase 客户端
用于保存对话历史到 Supabase PostgreSQL 数据库
"""
import os
from typing import Dict, Optional, List
from datetime import datetime
import json
try:
from supabase import create_client, Client
SUPABASE_AVAILABLE = True
except ImportError:
SUPABASE_AVAILABLE = False
print("警告: supabase 包未安装,Supabase 集成将不可用")
class SupabaseClient:
"""Supabase 客户端封装"""
def __init__(self):
"""初始化 Supabase 客户端"""
if not SUPABASE_AVAILABLE:
self.client = None
return
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
if not supabase_url or not supabase_key:
print("警告: SUPABASE_URL 或 SUPABASE_SERVICE_ROLE_KEY 未设置")
self.client = None
return
try:
self.client: Client = create_client(supabase_url, supabase_key)
print("✅ Supabase 客户端初始化成功")
except Exception as e:
print(f"❌ Supabase 客户端初始化失败: {e}")
self.client = None
def is_available(self) -> bool:
"""检查 Supabase 是否可用"""
return self.client is not None
def save_conversation(
self,
session_id: str,
user_message: str,
assistant_message: str,
analysis_report: Optional[Dict] = None,
context_summary: Optional[Dict] = None,
metadata: Optional[Dict] = None
) -> bool:
"""
保存对话到 Supabase
Args:
session_id: 会话ID
user_message: 用户消息
assistant_message: 助手回复
analysis_report: 分析报告
context_summary: 上下文摘要
metadata: 额外元数据
Returns:
是否保存成功
"""
if not self.is_available():
return False
try:
# 准备数据
data = {
"session_id": session_id,
"user_message": user_message,
"assistant_message": assistant_message,
"analysis_report": json.dumps(analysis_report) if analysis_report else None,
"context_summary": json.dumps(context_summary) if context_summary else None,
"metadata": json.dumps(metadata) if metadata else None,
"created_at": datetime.utcnow().isoformat()
}
# 插入数据
response = self.client.table("conversation_history").insert(data).execute()
if response.data:
print(f"✅ 对话已保存到 Supabase: {session_id}")
return True
return False
except Exception as e:
print(f"❌ 保存对话到 Supabase 失败: {e}")
return False
def get_conversation_history(
self,
session_id: str,
limit: int = 50
) -> List[Dict]:
"""
获取指定会话的历史记录
Args:
session_id: 会话ID
limit: 最大返回数量
Returns:
对话历史列表
"""
if not self.is_available():
return []
try:
response = self.client.table("conversation_history") \
.select("*") \
.eq("session_id", session_id) \
.order("created_at", desc=False) \
.limit(limit) \
.execute()
return response.data if response.data else []
except Exception as e:
print(f"❌ 获取对话历史失败: {e}")
return []
def get_recent_conversations(
self,
hours: int = 24,
limit: int = 100
) -> List[Dict]:
"""
获取最近的对话记录
Args:
hours: 最近多少小时
limit: 最大返回数量
Returns:
对话列表
"""
if not self.is_available():
return []
try:
from datetime import timedelta
cutoff_time = datetime.utcnow() - timedelta(hours=hours)
response = self.client.table("conversation_history") \
.select("*") \
.gte("created_at", cutoff_time.isoformat()) \
.order("created_at", desc=True) \
.limit(limit) \
.execute()
return response.data if response.data else []
except Exception as e:
print(f"❌ 获取最近对话失败: {e}")
return []
def delete_conversation(self, conversation_id: str) -> bool:
"""
删除指定对话
Args:
conversation_id: 对话ID (UUID)
Returns:
是否删除成功
"""
if not self.is_available():
return False
try:
response = self.client.table("conversation_history") \
.delete() \
.eq("id", conversation_id) \
.execute()
return len(response.data) > 0
except Exception as e:
print(f"❌ 删除对话失败: {e}")
return False
def delete_session(self, session_id: str) -> bool:
"""
删除整个会话的所有对话
Args:
session_id: 会话ID
Returns:
是否删除成功
"""
if not self.is_available():
return False
try:
response = self.client.table("conversation_history") \
.delete() \
.eq("session_id", session_id) \
.execute()
return True
except Exception as e:
print(f"❌ 删除会话失败: {e}")
return False
def get_statistics(self, hours: int = 24) -> Dict:
"""
获取对话统计信息
Args:
hours: 统计最近多少小时
Returns:
统计信息字典
"""
if not self.is_available():
return {}
try:
conversations = self.get_recent_conversations(hours=hours, limit=10000)
total = len(conversations)
if total == 0:
return {
"total_conversations": 0,
"unique_sessions": 0,
"avg_score": 0,
"high_risk_count": 0
}
# 统计唯一会话数
unique_sessions = len(set(c["session_id"] for c in conversations))
# 计算平均分数
scores = []
high_risk_count = 0
for c in conversations:
try:
if c.get("analysis_report"):
report = json.loads(c["analysis_report"]) if isinstance(c["analysis_report"], str) else c["analysis_report"]
if "overall_score" in report:
scores.append(report["overall_score"])
# 检查风险等级
risk_assessment = report.get("risk_assessment", {})
if risk_assessment.get("risk_level") == "high":
high_risk_count += 1
except:
pass
avg_score = sum(scores) / len(scores) if scores else 0
return {
"total_conversations": total,
"unique_sessions": unique_sessions,
"avg_score": round(avg_score, 2),
"high_risk_count": high_risk_count,
"period_hours": hours
}
except Exception as e:
print(f"❌ 获取统计信息失败: {e}")
return {}
# 全局单例
_supabase_client_instance = None
def get_supabase_client() -> SupabaseClient:
"""获取 Supabase 客户端单例"""
global _supabase_client_instance
if _supabase_client_instance is None:
_supabase_client_instance = SupabaseClient()
return _supabase_client_instance