# XiaoBai1221's picture
# Done
# 6c78660
import os
import sys
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from dotenv import load_dotenv
import firebase_admin
from firebase_admin import credentials, firestore
from google.cloud.firestore import DocumentReference, CollectionReference, FieldFilter
from google.cloud.firestore_v1 import ArrayUnion, Increment
import asyncio
import hashlib
# Load environment variables from .env BEFORE reading any of them, so that a
# BLOOMWARE_LOG_LEVEL defined only in .env is honoured by the logging setup below.
load_dotenv()

# Logging setup: the level name comes from BLOOMWARE_LOG_LEVEL (default WARNING);
# names that are not attributes of the logging module fall back to WARNING.
LOG_LEVEL_NAME = os.getenv("BLOOMWARE_LOG_LEVEL", "WARNING").upper()
LOG_LEVEL = getattr(logging, LOG_LEVEL_NAME, logging.WARNING)
# getattr can also resolve non-level attributes (e.g. "BASIC_FORMAT" is a str),
# which basicConfig/setLevel would reject; treat those as unknown too.
if not isinstance(LOG_LEVEL, int):
    LOG_LEVEL = logging.WARNING
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("Firestore")
logger.setLevel(LOG_LEVEL)
# 統一配置管理
from core.config import settings
# Module-level Firestore handles. connect_to_firestore() binds them; until it
# succeeds they stay None, which the accessors in this module treat as
# "not connected".
firestore_db: Optional[Any] = None
users_collection: Optional[Any] = None
chats_collection: Optional[Any] = None
messages_collection: Optional[Any] = None
# NOTE(review): declared global in connect_to_firestore but never assigned
# there; per-user memories are read via the users/{uid}/memories subcollection.
memories_collection: Optional[Any] = None
health_data_collection: Optional[Any] = None
device_bindings_collection: Optional[Any] = None
geo_cache_collection: Optional[Any] = None
route_cache_collection: Optional[Any] = None

# Upper bound on stored memories per user; save_memory prunes the surplus via
# _enforce_memory_quota after creating a new memory.
MAX_MEMORIES_PER_USER = 500
def _serialize_firestore_data(data: Any) -> Any:
"""
遞迴轉換 Firestore 資料中的 DatetimeWithNanoseconds 物件為 ISO 字串
Args:
data: Firestore 回傳的資料(可能包含 DatetimeWithNanoseconds)
Returns:
JSON 可序列化的資料
"""
from google.cloud.firestore_v1._helpers import DatetimeWithNanoseconds
if isinstance(data, DatetimeWithNanoseconds):
# 轉成 ISO 8601 字串
return data.isoformat()
elif isinstance(data, datetime):
# 一般 Python datetime 也轉成字串
return data.isoformat()
elif isinstance(data, dict):
# 遞迴處理字典
return {k: _serialize_firestore_data(v) for k, v in data.items()}
elif isinstance(data, list):
# 遞迴處理列表
return [_serialize_firestore_data(item) for item in data]
else:
# 其他型別直接回傳
return data
def _get_user_doc_ref(user_id: str) -> DocumentReference:
    """Return the ``users/{user_id}`` document reference.

    Raises:
        RuntimeError: if connect_to_firestore() has not bound users_collection.
    """
    if users_collection is not None:
        return users_collection.document(user_id)
    raise RuntimeError("Firestore尚未連接,無法操作使用者資料")
def _get_user_memories_collection(user_id: str) -> CollectionReference:
    """Return the ``users/{user_id}/memories`` subcollection.

    Raises RuntimeError (via _get_user_doc_ref) when Firestore is not connected.
    """
    user_ref = _get_user_doc_ref(user_id)
    return user_ref.collection("memories")
def _get_chat_messages_collection(chat_id: str) -> CollectionReference:
    """Return the ``chats/{chat_id}/messages`` subcollection.

    Raises:
        RuntimeError: if connect_to_firestore() has not bound chats_collection.
    """
    if chats_collection is not None:
        chat_ref = chats_collection.document(chat_id)
        return chat_ref.collection("messages")
    raise RuntimeError("Firestore尚未連接,無法取得對話消息集合")
def connect_to_firestore():
    """Initialise the Firebase app (once) and bind the module-level Firestore handles.

    Reuses an already-initialised default Firebase app. Otherwise it builds one
    from ``settings.get_firebase_credentials()``. It then creates the client,
    performs a write+delete probe on ``_test_connection/test``, and binds the
    collection references.

    Returns:
        True on success. False when FIREBASE_PROJECT_ID is missing, credentials
        fail to load (ValueError), or any other exception occurs; details are
        logged.
    """
    global firestore_db, messages_collection, users_collection, chats_collection
    global memories_collection, health_data_collection, device_bindings_collection
    global geo_cache_collection, route_cache_collection

    project_id = settings.FIREBASE_PROJECT_ID
    if not project_id:
        logger.error("Firebase專案ID未正確設置,請在.env文件中設置FIREBASE_PROJECT_ID環境變數")
        logger.error("\n❌ 錯誤: Firebase專案ID未設置!請在.env文件中設置FIREBASE_PROJECT_ID\n")
        return False

    try:
        logger.info("正在嘗試連接Firebase Firestore...")
        logger.info("\n🔄 正在連接Firebase Firestore數據庫...\n")

        # get_app() raises ValueError when no default app exists yet.
        try:
            firebase_admin.get_app()
        except ValueError:
            try:
                cred = credentials.Certificate(settings.get_firebase_credentials())
                firebase_admin.initialize_app(cred, {'projectId': project_id})
            except ValueError as cred_err:
                logger.error(f"Firebase 憑證載入失敗: {cred_err}")
                logger.error(f"\n❌ 錯誤: Firebase 憑證載入失敗!{cred_err}\n")
                return False
            logger.info(f"Firebase 初始化成功(專案ID:{project_id})")
        else:
            logger.info("Firebase 已初始化,跳過重複初始化")

        firestore_db = firestore.client()

        # Probe: write then delete a throwaway document to prove connectivity.
        probe_ref = firestore_db.collection('_test_connection').document('test')
        probe_ref.set({'timestamp': datetime.now(), 'test': True})
        probe_ref.delete()

        messages_collection = firestore_db.collection('messages')
        users_collection = firestore_db.collection('users')
        chats_collection = firestore_db.collection('chats')
        health_data_collection = firestore_db.collection('health_data')
        device_bindings_collection = firestore_db.collection('device_bindings')
        geo_cache_collection = firestore_db.collection('geo_cache')
        route_cache_collection = firestore_db.collection('route_cache')

        logger.info(f"✅ Firestore連接成功,專案ID:{project_id}")
        logger.info(f"\n✅ Firebase Firestore連接成功!專案ID:{project_id}\n")
        return True
    except Exception as exc:
        logger.error(f"Firebase Firestore連接失敗:{exc}")
        logger.error(f"\n❌ Firebase Firestore連接失敗:{exc}\n")
        troubleshooting = (
            "🔧 故障排除建議:",
            "1. 檢查網絡連接",
            "2. 確認Firebase服務帳戶金鑰文件路徑正確",
            "3. 驗證Firebase專案ID是否正確",
            "4. 確保Firestore Database已在Firebase Console中啟用",
            "5. 檢查服務帳戶權限是否包含Firestore權限",
        )
        for hint in troubleshooting:
            logger.error(hint)
        return False
def ensure_indexes():
    """Log that no indexes are created programmatically; otherwise a no-op.

    NOTE(review): several queries in this module combine an equality filter
    with order_by on another field (or multiple order_by clauses). Firestore
    serves those only from composite indexes, which must be defined in the
    project — confirm they are deployed.
    """
    message = "Firestore 自動處理索引優化,無需手動創建索引"
    logger.info(message)
async def get_user_by_id(user_id: str):
    """Look up a user by its ``user_id`` field and return public profile fields.

    The Firestore query is blocking I/O, so it runs in a worker thread via
    ``asyncio.to_thread`` (as every other accessor in this module does)
    instead of stalling the event loop.

    Returns:
        dict with ``id``, ``name``, ``email``, ``created_at`` (datetimes as ISO
        strings), or None when not connected, not found, or on any error.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法查找使用者")
        return None
    try:
        def _query_user():
            query = users_collection.where(filter=FieldFilter("user_id", "==", user_id)).limit(1)
            return query.get()

        docs = await asyncio.to_thread(_query_user)
        if not docs:
            return None
        user_data = docs[0].to_dict()
        result = {
            "id": user_data["user_id"],
            "name": user_data.get("name", ""),
            "email": user_data.get("email", ""),
            "created_at": user_data.get("created_at"),
        }
        # Serialize Firestore timestamps so the result is JSON-safe.
        return _serialize_firestore_data(result)
    except Exception as e:
        logger.error(f"查找使用者時發生錯誤: {e}")
        return None
# 已移除舊的內嵌測試函式 test_connection,避免在生產代碼夾雜測試邏輯
async def save_message(user_id, content, is_bot=False):
    """Append one message document to the top-level ``messages`` collection.

    Returns:
        True on success; False when not connected or the write raises (logged).
    """
    if messages_collection is None:
        logger.error("Firestore尚未連接,無法保存消息")
        return False
    try:
        record = dict(
            user_id=user_id,  # owner of the message
            content=content,
            is_bot=is_bot,
            timestamp=datetime.now(),
        )
        await asyncio.to_thread(lambda: messages_collection.add(record))
        logger.debug("消息已保存到 Firestore")
        return True
    except Exception as exc:
        logger.error(f"保存消息時發生錯誤: {exc}")
        return False
async def get_user_history(user_id, limit=20):
    """Return up to ``limit`` messages for ``user_id`` from the top-level ``messages`` collection.

    Messages are ordered by ``timestamp`` ascending and the limit is applied
    to that order. NOTE(review): this therefore yields the *oldest* ``limit``
    messages, not the most recent ones — confirm that is what callers expect
    (switching to descending order would need its own composite index).

    Returns:
        A list of message dicts with datetimes as ISO strings; [] when not
        connected or on error.
    """
    if messages_collection is None:
        logger.error("Firestore尚未連接,無法獲取歷史記錄")
        return []
    try:
        def _load_history():
            query = messages_collection.where(filter=FieldFilter("user_id", "==", user_id))
            query = query.order_by("timestamp").limit(limit)
            return [snap.to_dict() for snap in query.stream()]

        history = await asyncio.to_thread(_load_history)
        logger.info(f"已獲取用戶 {user_id}{len(history)} 條歷史記錄")
        return _serialize_firestore_data(history)
    except Exception as exc:
        logger.error(f"獲取歷史記錄時發生錯誤: {exc}")
        return []
# Google OAuth 2.0 用戶認證
async def create_or_login_google_user(google_token_info):
    """Single Google OAuth entry point: register on first login, otherwise log in.

    Steps:
      1. Probe Firestore with a write+delete of ``_connection_test/ping`` (5 s timeout).
      2. Require an account id (``id`` or ``sub``) and an ``email`` in ``google_token_info``.
      3. Look up ``users`` by ``google_id`` (10 s timeout); quota errors are reported specially.
      4. Existing user: refresh name/picture/login timestamps and return the
         refreshed profile. New user: create ``users/{google_id}``.

    Returns:
        {"success": True, "user": {...}, "is_new_user": bool} on success, or
        {"success": False, "error": ...} (plus "message" for quota errors).
        ``created_at`` in the returned user is passed through as stored (not
        ISO-serialized).
    """
    if users_collection is None or firestore_db is None:
        logger.error("Firestore尚未連接,無法處理用戶認證")
        return {"success": False, "error": "數據庫未連接"}

    # Quick connectivity probe so an unreachable database fails fast.
    try:
        logger.info("🔍 檢查 Firestore 連接狀態...")

        def _test_connection():
            test_ref = firestore_db.collection('_connection_test').document('ping')
            test_ref.set({'ping': 'test'}, merge=True)
            test_ref.delete()
            return True

        await asyncio.wait_for(
            asyncio.to_thread(_test_connection),
            timeout=5.0,  # probe timeout in seconds
        )
        logger.info("✅ Firestore 連接正常")
    except asyncio.TimeoutError:
        logger.error("❌ Firestore 連接測試超時")
        return {"success": False, "error": "數據庫連接超時"}
    except Exception as e:
        logger.error(f"❌ Firestore 連接測試失敗: {e}")
        return {"success": False, "error": f"數據庫連接異常: {str(e)}"}

    # Accept either "id" or "sub" as the Google account identifier.
    google_id = google_token_info.get("id") or google_token_info.get("sub")
    if not google_id:
        logger.error(f"Google用戶信息中缺少ID字段,收到的信息: {google_token_info}")
        return {"success": False, "error": "INVALID_GOOGLE_USER_INFO"}

    email = google_token_info.get("email")
    if not email:
        logger.error(f"Google用戶信息中缺少email字段,收到的信息: {google_token_info}")
        return {"success": False, "error": "INVALID_GOOGLE_USER_INFO"}

    logger.info(f"🔍 處理Google用戶: google_id={google_id}, email={email}")

    try:
        def _fetch_existing_user():
            try:
                logger.info(f"🔍 查詢現有用戶: google_id={google_id}")
                query = users_collection.where(filter=FieldFilter("google_id", "==", google_id)).limit(1)
                docs = list(query.stream())
                logger.info(f"🔍 查詢結果: 找到 {len(docs)} 個用戶")
                return docs[0] if docs else None
            except Exception as e:
                error_msg = str(e).lower()
                # Re-raise quota errors as a sentinel the caller maps to QUOTA_EXCEEDED.
                if "quota" in error_msg or "exceeded" in error_msg:
                    logger.error(f"❌ Firestore 配額已超出限制: {e}")
                    raise Exception("FIRESTORE_QUOTA_EXCEEDED") from e
                logger.error(f"❌ Firestore 查詢失敗: {e}")
                raise

        logger.info(f"📤 開始查詢用戶...")
        try:
            user_doc = await asyncio.wait_for(
                asyncio.to_thread(_fetch_existing_user),
                timeout=10.0,  # lookup timeout in seconds
            )
            logger.info(f"🔍 用戶查詢完成: {'找到現有用戶' if user_doc else '未找到用戶'}")
        except asyncio.TimeoutError:
            logger.error("❌ Firestore 查詢超時(10秒)")
            return {"success": False, "error": "數據庫查詢超時"}
        except Exception as e:
            if "FIRESTORE_QUOTA_EXCEEDED" in str(e):
                logger.error("❌ Firestore 每日配額已用完")
                return {
                    "success": False,
                    "error": "QUOTA_EXCEEDED",
                    "message": "Firestore 每日配額已用完,請稍後再試或聯繫管理員升級服務"
                }
            logger.error(f"❌ 用戶查詢異常: {e}")
            return {"success": False, "error": f"用戶查詢失敗: {str(e)}"}

        if user_doc:
            user_data = user_doc.to_dict()
            # Compute the refreshed profile once so the stored values and the
            # values returned to the caller agree (previously the stale stored
            # name/picture were returned on the login that changed them).
            refreshed_name = google_token_info.get("name", user_data.get("name"))
            refreshed_picture = google_token_info.get("picture", user_data.get("picture"))

            def _update_user():
                users_collection.document(user_doc.id).update({
                    "name": refreshed_name,
                    "picture": refreshed_picture,
                    "last_login": datetime.now(),
                    "updated_at": datetime.now()
                })

            await asyncio.to_thread(_update_user)
            logger.info(f"用戶 {email} 登入成功,user_id: {user_data.get('user_id')}")
            return {
                "success": True,
                "user": {
                    "id": user_data.get("user_id", google_id),
                    "name": refreshed_name if refreshed_name is not None else "",
                    "email": user_data.get("email", email),
                    "picture": refreshed_picture,
                    "created_at": user_data.get("created_at")
                },
                "is_new_user": False
            }

        logger.info(f"📤 創建新用戶...")
        now = datetime.now()
        new_user = {
            "user_id": google_id,
            "google_id": google_id,
            "email": email,
            "name": google_token_info.get("name", ""),
            "picture": google_token_info.get("picture"),
            "locale": google_token_info.get("locale", "zh-TW"),
            "first_login": now,
            "last_login": now,
            "created_at": now,
            "updated_at": now
        }
        logger.info(f"🔍 新用戶數據: {new_user}")
        logger.info(f"📤 寫入Firestore...")
        # The document id is the Google account id, so lookups by id are direct.
        await asyncio.to_thread(lambda: users_collection.document(google_id).set(new_user))
        logger.info(f"✅ 新用戶 {email} 註冊成功,user_id: {google_id}")
        return {
            "success": True,
            "user": {
                "id": google_id,
                "name": new_user["name"],
                "email": new_user["email"],
                "picture": new_user["picture"],
                "created_at": new_user["created_at"]
            },
            "is_new_user": True
        }
    except Exception as e:
        logger.error(f"❌ Google OAuth 認證時發生錯誤: {e}")
        logger.error(f"❌ 錯誤類型: {type(e).__name__}")
        logger.error(f"❌ 錯誤堆疊:", exc_info=True)
        return {"success": False, "error": str(e)}
# 對話管理
async def create_chat(user_id, title="新對話"):
    """Create a ``chats`` document owned by ``user_id``.

    Returns:
        {"success": True, "chat": {...}} with ``chat_id``, ``user_id``,
        ``title`` and ISO-string timestamps, or {"success": False, "error": ...}.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法創建對話")
        return {"success": False, "error": "數據庫未連接"}
    try:
        chat = {
            "user_id": user_id,
            "title": title,
            "messages": [],
            "created_at": datetime.now(),
            "updated_at": datetime.now(),
        }
        # CollectionReference.add returns an (update_time, document_reference) pair.
        add_result = await asyncio.to_thread(lambda: chats_collection.add(chat))
        new_ref = add_result[1]
        chat_id = new_ref.id
        logger.info(f"為用戶 {user_id} 創建了新對話,ID: {chat_id}")

        summary = dict(
            chat_id=chat_id,
            user_id=user_id,
            title=title,
            created_at=chat["created_at"],
            updated_at=chat["updated_at"],
        )
        return {"success": True, "chat": _serialize_firestore_data(summary)}
    except Exception as exc:
        logger.error(f"創建對話時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def get_user_chats(user_id):
    """List a user's chats, most recently updated first.

    Each summary has the chat document's fields plus ``chat_id``, minus
    ``user_id``, ``messages`` and ``created_at``; datetimes become ISO strings.

    Returns:
        {"success": True, "chats": [...]} or {"success": False, "error": ...}.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法獲取對話")
        return {"success": False, "error": "數據庫未連接"}
    try:
        hidden_fields = ("user_id", "messages", "created_at")

        def _list_chats():
            query = (
                chats_collection
                .where(filter=FieldFilter("user_id", "==", user_id))
                .order_by("updated_at", direction=firestore.Query.DESCENDING)
            )
            summaries = []
            for snap in query.stream():
                entry = snap.to_dict()
                entry["chat_id"] = snap.id
                for field in hidden_fields:
                    entry.pop(field, None)
                summaries.append(entry)
            return summaries

        summaries = await asyncio.to_thread(_list_chats)
        logger.info(f"獲取到用戶 {user_id}{len(summaries)} 個對話")
        return {"success": True, "chats": _serialize_firestore_data(summaries)}
    except Exception as exc:
        logger.error(f"獲取用戶對話時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def get_chat(chat_id):
    """Load one chat document together with its messages.

    Messages come from the ``chats/{chat_id}/messages`` subcollection ordered
    by ``timestamp`` ascending; each message dict gets its document ``id``.
    For backward compatibility the ``messages`` array embedded in the chat
    document is used instead when the subcollection read fails OR returns no
    messages. This matches the "subcollection first, legacy when empty" rule
    in get_chat_messages.

    Returns:
        {"success": True, "chat": {...}} (document fields + ``chat_id`` +
        ``messages``; values returned as stored, not ISO-serialized), or
        {"success": False, "error": ...}.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法獲取對話")
        return {"success": False, "error": "數據庫未連接"}
    try:
        def _get_doc():
            doc = chats_collection.document(chat_id).get()
            return doc if doc.exists else None

        doc = await asyncio.to_thread(_get_doc)
        if not doc:
            logger.warning(f"對話 {chat_id} 不存在")
            return {"success": False, "error": "對話不存在"}

        chat = doc.to_dict() or {}
        chat["chat_id"] = doc.id
        # Older chats may carry their messages inline in the document.
        embedded_msgs = chat.get('messages', []) or []

        try:
            def _fetch_msgs():
                ref = _get_chat_messages_collection(chat_id)
                return [
                    {**snap.to_dict(), "id": snap.id}
                    for snap in ref.order_by("timestamp").stream()
                ]

            msgs = await asyncio.to_thread(_fetch_msgs)
            logger.info(f"獲取到對話 {chat_id},包含 {len(msgs)} 條消息(chat 子集合)")
            # An empty subcollection must not hide legacy inline messages.
            chat["messages"] = msgs if msgs else embedded_msgs
        except Exception as _e:
            chat["messages"] = embedded_msgs
            logger.warning(f"讀取 chat 子集合失敗,使用內嵌 messages。原因: {_e}")

        return {"success": True, "chat": chat}
    except Exception as e:
        logger.error(f"獲取對話時發生錯誤: {e}")
        return {"success": False, "error": str(e)}
async def save_chat_message(chat_id, sender, content):
    """Store one message under ``chats/{chat_id}/messages`` (the primary store).

    The same message is also copied into the legacy top-level ``messages``
    collection for older readers; failures there are only logged at debug
    level. The chat's ``updated_at`` is then bumped. The three writes are
    awaited one after another. If the chat document does not exist, the
    message is still written and a warning is logged.

    Returns:
        {"success": True, "message": message} (``timestamp`` is a raw datetime)
        or {"success": False, "error": ...}.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法保存消息")
        return {"success": False, "error": "數據庫未連接"}
    try:
        now = datetime.now()
        message = {"chat_id": chat_id, "sender": sender, "content": content, "timestamp": now}

        def _write_primary():
            _get_chat_messages_collection(chat_id).add(message)

        def _write_legacy():
            if messages_collection is None:
                return
            try:
                messages_collection.add(message)
            except Exception as legacy_err:  # pragma: no cover
                logger.debug(f"寫入頂層 messages 集合失敗(兼容用途,可忽略): {legacy_err}")

        def _bump_updated_at():
            chat_ref = chats_collection.document(chat_id)
            if not chat_ref.get().exists:
                return False
            chat_ref.update({"updated_at": now})
            return True

        await asyncio.to_thread(_write_primary)
        await asyncio.to_thread(_write_legacy)
        chat_exists = await asyncio.to_thread(_bump_updated_at)
        if not chat_exists:
            logger.warning(f"對話 {chat_id} 不存在,但消息已寫入 chat 子集合")
        logger.info(f"消息已保存到 chat 子集合(chat_id={chat_id})")
        return {"success": True, "message": message}
    except Exception as exc:
        logger.error(f"保存消息時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def get_chat_messages(chat_id: str, limit: int | None = None, ascending: bool = True):
    """Read one chat's messages, preferring the ``chats/{chat_id}/messages`` subcollection.

    Args:
        chat_id: chat document id.
        limit: maximum number of messages; None or <= 0 means no limit.
        ascending: True selects the *earliest* ``limit`` messages, False the
            *latest* ``limit``. In both cases the returned list is in
            chronological (oldest-first) order.

    If the subcollection is empty, the legacy top-level ``messages`` collection
    (filtered by ``chat_id``) is used with the same window semantics. ALL legacy
    messages are then backfilled into the subcollection (best effort).

    Returns:
        A list of message dicts with datetimes as ISO strings; [] when not
        connected or on error.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法讀取消息")
        return []
    try:
        from google.cloud import firestore as _fs

        def _query():
            ref = _get_chat_messages_collection(chat_id)
            direction = _fs.Query.ASCENDING if ascending else _fs.Query.DESCENDING
            q = ref.order_by("timestamp", direction=direction)
            if limit and limit > 0:
                q = q.limit(limit)
            records = []
            for doc in q.stream():
                data = doc.to_dict()
                data["id"] = doc.id
                records.append(data)
            # A descending query picks the newest messages; restore chronological order.
            if not ascending:
                records.reverse()
            return records

        messages = await asyncio.to_thread(_query)
        if messages:
            return _serialize_firestore_data(messages)

        # Backward compatibility: subcollection is empty, try the legacy collection.
        if messages_collection is None:
            return []

        def _legacy_query():
            docs = messages_collection.where(filter=FieldFilter("chat_id", "==", chat_id)).stream()
            legacy = [d.to_dict() for d in docs]
            # Messages without a timestamp sort last instead of raising TypeError.
            legacy.sort(key=lambda item: (item.get("timestamp") is None, item.get("timestamp")))
            return legacy

        legacy_sorted = await asyncio.to_thread(_legacy_query)

        # Same window as the subcollection query: head when ascending, tail
        # otherwise; always chronological.
        if limit and limit > 0:
            view_messages = legacy_sorted[:limit] if ascending else legacy_sorted[-limit:]
        else:
            view_messages = list(legacy_sorted)

        if legacy_sorted:
            def _backfill():
                try:
                    ref = _get_chat_messages_collection(chat_id)
                    # Only migrate while the subcollection is still empty.
                    has_existing = any(True for _ in ref.limit(1).stream())
                    if has_existing:
                        return
                    # Migrate the complete legacy history rather than the limited
                    # view: once the subcollection is non-empty the legacy
                    # collection is never consulted again.
                    for legacy_msg in legacy_sorted:
                        ref.add(legacy_msg)
                    logger.info(f"已將 legacy messages 回填至 chat 子集合(chat_id={chat_id})")
                except Exception as backfill_err:
                    logger.warning(f"回填 legacy messages 失敗(可忽略): {backfill_err}")

            await asyncio.to_thread(_backfill)

        return _serialize_firestore_data(view_messages)
    except Exception as e:
        logger.error(f"讀取對話消息失敗: {e}")
        return []
async def update_chat_title(chat_id, title):
    """Rename a chat and refresh its ``updated_at``.

    Returns:
        {"success": True}, or {"success": False, "error": ...} when not
        connected, the chat document is missing, or the update raises.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法更新對話標題")
        return {"success": False, "error": "數據庫未連接"}
    try:
        def _apply_title():
            chat_ref = chats_collection.document(chat_id)
            if not chat_ref.get().exists:
                return False
            chat_ref.update({"title": title, "updated_at": datetime.now()})
            return True

        if await asyncio.to_thread(_apply_title):
            logger.info(f"對話 {chat_id} 標題已更新為 '{title}'")
            return {"success": True}
        logger.warning(f"對話 {chat_id} 不存在,無法更新標題")
        return {"success": False, "error": "對話不存在"}
    except Exception as exc:
        logger.error(f"更新對話標題時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def delete_chat(chat_id):
    """Delete a chat document and, first, every document in its messages subcollection.

    A failure while deleting subcollection messages is logged as a warning
    and the chat document is deleted anyway.

    Returns:
        {"success": True}, or {"success": False, "error": ...} when not
        connected, the chat is missing, or deletion raises.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法刪除對話")
        return {"success": False, "error": "數據庫未連接"}
    try:
        def _remove_chat():
            chat_ref = chats_collection.document(chat_id)
            if not chat_ref.get().exists:
                return False
            # Delete child messages first so no orphaned documents remain.
            try:
                for msg_snapshot in _get_chat_messages_collection(chat_id).stream():
                    msg_snapshot.reference.delete()
            except Exception as msg_err:
                logger.warning(f"刪除對話 {chat_id} 的子消息時發生錯誤:{msg_err}")
            chat_ref.delete()
            return True

        if await asyncio.to_thread(_remove_chat):
            logger.info(f"對話 {chat_id} 已刪除")
            return {"success": True}
        logger.warning(f"對話 {chat_id} 不存在,無法刪除")
        return {"success": False, "error": "對話不存在"}
    except Exception as exc:
        logger.error(f"刪除對話時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
# ===== 對話情緒記憶 =====
async def set_chat_emotion(chat_id: str, emotion: dict):
    """Record the latest emotion for a chat under ``context.emotion``.

    Stores ``label`` and ``confidence`` taken from ``emotion`` plus the current
    time as ``timestamp``, and refreshes the chat's ``updated_at``.

    Returns:
        {"success": True}, or {"success": False, "error": ...} when not
        connected, the chat is missing, or the update raises.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法設定對話情緒")
        return {"success": False, "error": "數據庫未連接"}
    try:
        def _store_emotion():
            chat_ref = chats_collection.document(chat_id)
            if not chat_ref.get().exists:
                return False
            emotion_entry = {
                "label": emotion.get("label"),
                "confidence": emotion.get("confidence"),
                "timestamp": datetime.now(),
            }
            # Dotted field path updates only the nested emotion map.
            chat_ref.update({"context.emotion": emotion_entry, "updated_at": datetime.now()})
            return True

        if await asyncio.to_thread(_store_emotion):
            return {"success": True}
        return {"success": False, "error": "對話不存在"}
    except Exception as exc:
        logger.error(f"設定對話情緒時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def get_chat_emotion(chat_id: str):
    """Return the chat's stored ``context.emotion`` (None when absent).

    Returns:
        {"success": True, "emotion": ...} or {"success": False, "error": ...}
        when not connected, the chat is missing, or the read raises.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法讀取對話情緒")
        return {"success": False, "error": "數據庫未連接"}
    try:
        def _read_chat():
            snap = chats_collection.document(chat_id).get()
            return snap if snap.exists else None

        snap = await asyncio.to_thread(_read_chat)
        if not snap:
            return {"success": False, "error": "對話不存在"}
        context = (snap.to_dict() or {}).get("context") or {}
        return {"success": True, "emotion": context.get("emotion")}
    except Exception as exc:
        logger.error(f"讀取對話情緒時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
# ===== 語音登入:使用者與說話者標籤關聯 =====
async def set_user_speaker_label(user_id: str, speaker_label: str):
    """Bind a voice-login speaker label to ``users/{user_id}``.

    Fails with ``SPEAKER_LABEL_TAKEN`` when another user already owns the
    label. Fails with ``USER_NOT_FOUND`` when the user document does not exist.
    NOTE(review): the ownership check and the update are separate operations,
    not a transaction.

    Returns:
        {"success": True} or {"success": False, "error": ...}.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法設定語音標籤")
        return {"success": False, "error": "數據庫未連接"}
    try:
        def _find_label_owner():
            query = users_collection.where(filter=FieldFilter("speaker_label", "==", speaker_label)).limit(1)
            matches = list(query.stream())
            return matches[0] if matches else None

        owner = await asyncio.to_thread(_find_label_owner)
        if owner and owner.to_dict().get("user_id") != user_id:
            return {"success": False, "error": "SPEAKER_LABEL_TAKEN"}

        def _assign_label():
            user_ref = users_collection.document(user_id)
            if not user_ref.get().exists:
                return False
            user_ref.update({"speaker_label": speaker_label})
            return True

        if await asyncio.to_thread(_assign_label):
            return {"success": True}
        return {"success": False, "error": "USER_NOT_FOUND"}
    except Exception as exc:
        logger.error(f"設定語音標籤時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def get_user_by_speaker_label(speaker_label: str):
    """Find the user bound to a speaker label and return public profile fields.

    Returns:
        dict with ``id``, ``name``, ``email``, ``created_at`` (datetimes as ISO
        strings), or None when not connected, not found, or on error.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法查詢語音標籤")
        return None
    try:
        def _lookup():
            query = users_collection.where(filter=FieldFilter("speaker_label", "==", speaker_label)).limit(1)
            matches = list(query.stream())
            return matches[0] if matches else None

        snap = await asyncio.to_thread(_lookup)
        if not snap:
            return None
        profile = snap.to_dict()
        public_view = dict(
            id=profile.get("user_id"),
            name=profile.get("name", ""),
            email=profile.get("email", ""),
            created_at=profile.get("created_at"),
        )
        return _serialize_firestore_data(public_view)
    except Exception as exc:
        logger.error(f"查詢語音標籤對應用戶時發生錯誤: {exc}")
        return None
# ===== 專門記憶系統 =====
async def save_memory(
user_id: str,
memory_type: str,
content: str,
importance: float = 1.0,
metadata: dict | None = None,
) -> Dict[str, Any]:
"""保存重要記憶到 Firestore"""
if users_collection is None or firestore_db is None:
logger.error("Firestore尚未連接,無法保存記憶")
return {"success": False, "error": "數據庫未連接"}
try:
import asyncio as _asyncio
now = datetime.now()
sanitized_importance = max(0.0, min(1.0, importance))
metadata_payload = metadata.copy() if metadata else {}
metadata_payload.setdefault("source", "unknown")
metadata_payload.setdefault("last_updated_by", "memory_system")
metadata_payload["updated_at"] = now.isoformat()
context_tags = metadata_payload.get("context_tags", [])
if not isinstance(context_tags, list):
context_tags = list(context_tags) if context_tags else []
metadata_payload["context_tags"] = context_tags
triggers = metadata_payload.get("triggers", [])
if not isinstance(triggers, list):
triggers = list(triggers) if triggers else []
metadata_payload["triggers"] = triggers
col_ref = _get_user_memories_collection(user_id)
content_hash = hashlib.sha1(content.strip().lower().encode("utf-8")).hexdigest()
def _ensure_user_stub():
user_doc = _get_user_doc_ref(user_id)
snap = user_doc.get()
if not snap.exists:
user_doc.set(
{
"user_id": user_id,
"created_at": now,
"updated_at": now,
},
merge=True,
)
def _find_existing():
docs = (
col_ref.where(filter=FieldFilter("content_hash", "==", content_hash))
.limit(1)
.stream()
)
for doc in docs:
return doc
return None
await _asyncio.to_thread(_ensure_user_stub)
existing_doc = await _asyncio.to_thread(_find_existing)
if existing_doc:
doc_ref = existing_doc.reference
def _update_memory():
doc_ref.update({
"content": content,
"importance": sanitized_importance,
"metadata": metadata_payload,
"updated_at": now,
"access_count": Increment(1),
"last_accessed": now,
"content_hash": content_hash,
})
await _asyncio.to_thread(_update_memory)
logger.info(f"更新用戶 {user_id} 的記憶: {memory_type}")
return {"success": True, "action": "updated", "memory_id": existing_doc.id}
def _create_memory():
doc_ref = col_ref.document()
doc_ref.set({
"user_id": user_id,
"type": memory_type,
"content": content,
"importance": sanitized_importance,
"metadata": metadata_payload,
"access_count": 0,
"last_accessed": now,
"updated_at": now,
"created_at": now,
"content_hash": content_hash,
})
return doc_ref.id
memory_id = await _asyncio.to_thread(_create_memory)
await _asyncio.to_thread(_enforce_memory_quota, col_ref)
logger.info(f"保存用戶 {user_id} 的新記憶: {memory_type}")
return {"success": True, "action": "created", "memory_id": memory_id}
except Exception as e:
logger.error(f"保存記憶時發生錯誤: {e}")
return {"success": False, "error": str(e)}
# ===== 環境 Context(位置/方位/時序) =====
async def set_user_env_current(user_id: str, ctx: Dict[str, Any]) -> Dict[str, Any]:
    """Merge ``ctx`` into ``users/{user_id}/context/current`` with a fresh ``updated_at``.

    Freshness/TTL is not enforced here; readers decide how old is too old.

    Returns:
        {"success": True} or {"success": False, "error": ...}.
    """
    if users_collection is None:
        return {"success": False, "error": "數據庫未連接"}
    try:
        now = datetime.now()

        def _upsert_current():
            current_ref = _get_user_doc_ref(user_id).collection('context').document('current')
            current_ref.set({**ctx, 'updated_at': now}, merge=True)
            return True

        await asyncio.to_thread(_upsert_current)
        return {"success": True}
    except Exception as exc:
        logger.error(f"更新環境現況失敗: {exc}")
        return {"success": False, "error": str(exc)}
async def add_user_env_snapshot(user_id: str, snapshot: Dict[str, Any]) -> Dict[str, Any]:
    """Append an environment snapshot to ``users/{user_id}/context/meta/snapshots``.

    The stored document is a copy of ``snapshot`` plus ``created_at``.
    NOTE(review): nothing here prunes old snapshots; any "short-term history"
    retention must be enforced elsewhere (e.g. a TTL policy) — confirm.

    Returns:
        {"success": True} or {"success": False, "error": ...}.
    """
    if users_collection is None:
        return {"success": False, "error": "數據庫未連接"}
    try:
        now = datetime.now()

        def _append_snapshot():
            snapshots = (
                _get_user_doc_ref(user_id)
                .collection('context')
                .document('meta')
                .collection('snapshots')
            )
            snapshots.add({**snapshot, 'created_at': now})
            return True

        await asyncio.to_thread(_append_snapshot)
        return {"success": True}
    except Exception as exc:
        logger.error(f"寫入環境快照失敗: {exc}")
        return {"success": False, "error": str(exc)}
async def get_user_env_current(user_id: str) -> Dict[str, Any]:
    """Read ``users/{user_id}/context/current``.

    Returns:
        {"success": True, "context": {...}} with datetimes as ISO strings;
        {"success": False, "error": "NOT_FOUND"} when missing or empty; or an
        error dict when not connected / on failure.
    """
    if users_collection is None:
        return {"success": False, "error": "數據庫未連接"}
    try:
        def _read_current():
            snap = _get_user_doc_ref(user_id).collection('context').document('current').get()
            if not snap.exists:
                return None
            return snap.to_dict()

        current = await asyncio.to_thread(_read_current)
        if not current:
            return {"success": False, "error": "NOT_FOUND"}
        return {"success": True, "context": _serialize_firestore_data(current)}
    except Exception as exc:
        logger.error(f"讀取環境現況失敗: {exc}")
        return {"success": False, "error": str(exc)}
# ===== 反地理/路線 全域快取集合 =====
async def get_geo_cache(geohash7: str) -> Optional[Dict[str, Any]]:
    """Return the cached reverse-geocode entry stored under ``geo_cache/{geohash7}``.

    Returns None when not connected, missing/empty, or on error (logged as a
    warning); datetimes are returned as ISO strings.
    """
    if geo_cache_collection is None:
        return None
    try:
        def _read_entry():
            snap = geo_cache_collection.document(geohash7).get()
            if not snap.exists:
                return None
            return snap.to_dict()

        entry = await asyncio.to_thread(_read_entry)
        if not entry:
            return None
        return _serialize_firestore_data(entry)
    except Exception as exc:
        logger.warning(f"讀取 geo_cache 失敗: {exc}")
        return None
async def set_geo_cache(geohash7: str, payload: Dict[str, Any]) -> bool:
    """Merge ``payload`` (plus ``cached_at``) into ``geo_cache/{geohash7}``.

    Returns True on success; False when not connected or on error (logged).
    """
    if geo_cache_collection is None:
        return False
    try:
        now = datetime.now()

        def _store_entry():
            geo_cache_collection.document(geohash7).set({**payload, 'cached_at': now}, merge=True)
            return True

        return await asyncio.to_thread(_store_entry)
    except Exception as exc:
        logger.warning(f"寫入 geo_cache 失敗: {exc}")
        return False
async def get_route_cache(key: str) -> Optional[Dict[str, Any]]:
    """Return the cached route entry stored under ``route_cache/{key}``.

    Returns None when not connected, missing/empty, or on error (logged as a
    warning); datetimes are returned as ISO strings.
    """
    if route_cache_collection is None:
        return None
    try:
        def _read_entry():
            snap = route_cache_collection.document(key).get()
            if not snap.exists:
                return None
            return snap.to_dict()

        entry = await asyncio.to_thread(_read_entry)
        if not entry:
            return None
        return _serialize_firestore_data(entry)
    except Exception as exc:
        logger.warning(f"讀取 route_cache 失敗: {exc}")
        return None
async def set_route_cache(key: str, payload: Dict[str, Any]) -> bool:
    """Merge ``payload`` (plus ``cached_at``) into ``route_cache/{key}``.

    Returns True on success; False when not connected or on error (logged).
    """
    if route_cache_collection is None:
        return False
    try:
        now = datetime.now()

        def _store_entry():
            route_cache_collection.document(key).set({**payload, 'cached_at': now}, merge=True)
            return True

        return await asyncio.to_thread(_store_entry)
    except Exception as exc:
        logger.warning(f"寫入 route_cache 失敗: {exc}")
        return False
async def get_user_memories(
    user_id: str,
    memory_type: str | None = None,
    limit: int = 10,
    min_importance: float = 0.0,
) -> Dict[str, Any]:
    """Fetch a user's most important memories.

    Filters ``importance >= min_importance`` (and ``type == memory_type`` when
    given), orders by importance then ``updated_at`` (both descending), and
    takes ``limit`` results. Every returned memory gets ``access_count``
    incremented and ``last_accessed`` refreshed.

    Returns:
        {"success": True, "memories": [...]} (each with ``memory_id``,
        datetimes as ISO strings) or {"success": False, "error": ...}.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法獲取記憶")
        return {"success": False, "error": "數據庫未連接"}
    try:
        def _fetch_memories():
            query = _get_user_memories_collection(user_id).where(
                filter=FieldFilter("importance", ">=", min_importance)
            )
            if memory_type:
                query = query.where(filter=FieldFilter("type", "==", memory_type))
            ranked = (
                query.order_by("importance", direction=firestore.Query.DESCENDING)
                .order_by("updated_at", direction=firestore.Query.DESCENDING)
                .limit(limit)
            )
            found = []
            for snap in ranked.stream():
                found.append({**snap.to_dict(), "memory_id": snap.id})
            return found

        def _touch(memory_ids):
            memories_ref = _get_user_memories_collection(user_id)
            touched_at = datetime.now()
            for memory_id in memory_ids:
                memories_ref.document(memory_id).update({
                    "access_count": Increment(1),
                    "last_accessed": touched_at,
                })

        memories = await asyncio.to_thread(_fetch_memories)
        if memories:
            await asyncio.to_thread(_touch, [item["memory_id"] for item in memories])
        logger.info(f"獲取到用戶 {user_id}{len(memories)} 條記憶")
        return {"success": True, "memories": _serialize_firestore_data(memories)}
    except Exception as exc:
        logger.error(f"獲取記憶時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def search_memories(user_id: str, query_text: str, limit: int = 5) -> Dict[str, Any]:
    """Case-insensitive substring search over a user's 80 most recently updated memories.

    A memory matches when ``query_text`` (lower-cased) occurs in its content
    or in its ``metadata.context_tags`` joined by spaces. At most ``limit``
    matches are returned (most recently updated first). Each match gets
    ``access_count`` incremented and ``last_accessed`` refreshed. Malformed
    metadata (non-dict metadata, non-list or non-str tags) is tolerated
    instead of aborting the whole search.

    Returns:
        {"success": True, "memories": [...]} with datetimes as ISO strings
        (consistent with get_user_memories, so the payload is JSON-safe), or
        {"success": False, "error": ...}.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法搜索記憶")
        return {"success": False, "error": "數據庫未連接"}
    try:
        normalized_query = query_text.lower()

        def _candidate_memories():
            col_ref = _get_user_memories_collection(user_id)
            docs = (
                col_ref.order_by("updated_at", direction=firestore.Query.DESCENDING)
                .limit(80)
                .stream()
            )
            results = []
            for doc in docs:
                data = doc.to_dict() or {}
                metadata = data.get("metadata")
                if not isinstance(metadata, dict):
                    metadata = {}
                tags = metadata.get("context_tags") or []
                if not isinstance(tags, (list, tuple, set)):
                    tags = [tags]
                haystack = "{} {}".format(
                    data.get("content", ""),
                    " ".join(str(tag) for tag in tags),
                ).lower()
                if normalized_query in haystack:
                    data["memory_id"] = doc.id
                    results.append(data)
                    if len(results) >= limit:
                        break
            return results

        def _mark_accessed(mem_ids):
            col_ref = _get_user_memories_collection(user_id)
            now_inner = datetime.now()
            for mid in mem_ids:
                col_ref.document(mid).update({
                    "access_count": Increment(1),
                    "last_accessed": now_inner,
                })

        memories = await asyncio.to_thread(_candidate_memories)
        if memories:
            await asyncio.to_thread(_mark_accessed, [m["memory_id"] for m in memories])
        logger.info(f"搜索到用戶 {user_id}{len(memories)} 條相關記憶")
        # Serialize Firestore timestamps so the result is JSON-safe.
        return {"success": True, "memories": _serialize_firestore_data(memories)}
    except Exception as e:
        logger.error(f"搜索記憶時發生錯誤: {e}")
        return {"success": False, "error": str(e)}
async def update_memory_importance(memory_id: str, new_importance: float, user_id: str | None = None):
    """Set one memory's importance (clamped to [0.0, 1.0]) and refresh its ``updated_at``.

    Args:
        memory_id: document id inside some ``users/{uid}/memories`` subcollection.
        new_importance: new score; values outside [0.0, 1.0] are clamped.
        user_id: optional owner id. When given, only
            ``users/{user_id}/memories/{memory_id}`` is checked (a single read).
            When omitted, every user document is streamed until the memory is
            found, costing reads proportional to the number of users. Pass it
            whenever the caller knows the owner.

    Returns:
        {"success": True} or {"success": False, "error": ...}.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法更新記憶")
        return {"success": False, "error": "數據庫未連接"}
    try:
        sanitized_importance = max(0.0, min(1.0, new_importance))
        now = datetime.now()

        def _candidate_refs():
            # Direct lookup when the owner is known; otherwise scan all users.
            if user_id:
                yield _get_user_memories_collection(user_id).document(memory_id)
                return
            for user_doc in users_collection.stream():
                yield user_doc.reference.collection("memories").document(memory_id)

        def _update_doc():
            for mem_ref in _candidate_refs():
                if mem_ref.get().exists:
                    mem_ref.update({
                        "importance": sanitized_importance,
                        "updated_at": now,
                    })
                    return True
            return False

        updated = await asyncio.to_thread(_update_doc)
        if not updated:
            return {"success": False, "error": "記憶不存在"}
        logger.info(f"更新記憶 {memory_id} 的重要性為 {sanitized_importance}")
        return {"success": True}
    except Exception as e:
        logger.error(f"更新記憶重要性時發生錯誤: {e}")
        return {"success": False, "error": str(e)}
async def delete_memory(memory_id: str, user_id: str | None = None):
    """Delete one memory document.

    Args:
        memory_id: document id inside some ``users/{uid}/memories`` subcollection.
        user_id: optional owner id. When given, only
            ``users/{user_id}/memories/{memory_id}`` is checked (a single read).
            When omitted, every user document is streamed until the memory is
            found, costing reads proportional to the number of users.

    Returns:
        {"success": True} or {"success": False, "error": ...}.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法刪除記憶")
        return {"success": False, "error": "數據庫未連接"}
    try:
        def _candidate_refs():
            # Direct lookup when the owner is known; otherwise scan all users.
            if user_id:
                yield _get_user_memories_collection(user_id).document(memory_id)
                return
            for user_doc in users_collection.stream():
                yield user_doc.reference.collection("memories").document(memory_id)

        def _delete_doc():
            for mem_ref in _candidate_refs():
                if mem_ref.get().exists:
                    mem_ref.delete()
                    return True
            return False

        deleted = await asyncio.to_thread(_delete_doc)
        if not deleted:
            return {"success": False, "error": "記憶不存在"}
        logger.info(f"刪除記憶 {memory_id}")
        return {"success": True}
    except Exception as e:
        logger.error(f"刪除記憶時發生錯誤: {e}")
        return {"success": False, "error": str(e)}
async def cleanup_old_memories(user_id: str, days_old: int = 90, min_importance: float = 0.3):
    """Delete a user's memories that are both low-importance and stale.

    Args:
        user_id: owner whose ``users/{user_id}/memories`` subcollection is pruned.
        days_old: memories whose ``updated_at`` is older than now minus this
            many days qualify.
        min_importance: memories with ``importance`` strictly below this qualify.

    Only documents matching BOTH conditions are deleted, one delete per document.
    NOTE(review): inequality filters on two different fields need Firestore's
    multi-field inequality support and a matching composite index — confirm.

    Returns:
        {"success": True, "deleted": count} or {"success": False, "error": ...}.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法清理記憶")
        return {"success": False, "error": "數據庫未連接"}
    try:
        from datetime import timedelta

        cutoff_date = datetime.now() - timedelta(days=days_old)

        def _purge():
            stale = (
                _get_user_memories_collection(user_id)
                .where(filter=FieldFilter("importance", "<", min_importance))
                .where(filter=FieldFilter("updated_at", "<", cutoff_date))
                .stream()
            )
            removed = 0
            for snap in stale:
                snap.reference.delete()
                removed += 1
            return removed

        removed_total = await asyncio.to_thread(_purge)
        logger.info(f"為用戶 {user_id} 清理 {removed_total} 條舊記憶")
        return {"success": True, "deleted": removed_total}
    except Exception as exc:
        logger.error(f"清理記憶時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
def _enforce_memory_quota(col_ref: CollectionReference) -> None:
    """Trim a memories subcollection down to MAX_MEMORIES_PER_USER documents.

    Documents are ranked by ``importance`` then ``updated_at`` (both ascending).
    The surplus at the low end is deleted, one delete per document. The whole
    subcollection is read on every call.
    """
    ranked = list(
        col_ref.order_by("importance", direction=firestore.Query.ASCENDING)
        .order_by("updated_at", direction=firestore.Query.ASCENDING)
        .stream()
    )
    surplus = len(ranked) - MAX_MEMORIES_PER_USER
    # max(..., 0) keeps a negative surplus from turning into a "drop the tail" slice.
    for snap in ranked[:max(surplus, 0)]:
        snap.reference.delete()