Spaces:
Running
Running
| import os | |
| import sys | |
| import logging | |
| from datetime import datetime | |
| from typing import Any, Dict, List, Optional | |
| from dotenv import load_dotenv | |
| import firebase_admin | |
| from firebase_admin import credentials, firestore | |
| from google.cloud.firestore import DocumentReference, CollectionReference, FieldFilter | |
| from google.cloud.firestore_v1 import ArrayUnion, Increment | |
| import asyncio | |
| import hashlib | |
# Load environment variables first so that values defined in a .env file
# (including BLOOMWARE_LOG_LEVEL) are visible to the logging setup below.
# load_dotenv() does not override variables already set in the environment.
load_dotenv()

# Logging setup: the level name comes from BLOOMWARE_LOG_LEVEL (default WARNING).
LOG_LEVEL_NAME = os.getenv("BLOOMWARE_LOG_LEVEL", "WARNING").upper()
LOG_LEVEL = getattr(logging, LOG_LEVEL_NAME, logging.WARNING)
if not isinstance(LOG_LEVEL, int):
    # The name matched a non-level attribute of the logging module
    # (e.g. BASIC_FORMAT); fall back to the default level instead of crashing.
    LOG_LEVEL = logging.WARNING
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("Firestore")
logger.setLevel(LOG_LEVEL)
| # 統一配置管理 | |
| from core.config import settings | |
# Module-level state.
# Firestore client; assigned by connect_to_firestore().
firestore_db = None
# Top-level collection references; each stays None until
# connect_to_firestore() assigns it.
users_collection = None
chats_collection = None
messages_collection = None
# NOTE(review): declared global in connect_to_firestore() but never assigned in
# the code visible here (memories are reached via users/{uid}/memories) —
# confirm whether this name is still needed.
memories_collection = None
health_data_collection = None
device_bindings_collection = None
geo_cache_collection = None
route_cache_collection = None
# Memory storage settings.
# Presumably the per-user cap applied by _enforce_memory_quota (not visible
# in this part of the file) — TODO confirm.
MAX_MEMORIES_PER_USER = 500
def _serialize_firestore_data(data: Any) -> Any:
    """Recursively convert datetime values in Firestore data to ISO 8601 strings.

    Dicts and lists are walked recursively and rebuilt; every other value is
    returned unchanged.

    Args:
        data: Data returned by Firestore (may contain datetime or
            DatetimeWithNanoseconds values at any nesting depth).

    Returns:
        The same structure with every datetime replaced by ``isoformat()``.
    """
    # DatetimeWithNanoseconds is a datetime subclass, so a single isinstance
    # check covers both and avoids importing from the SDK's private module.
    if isinstance(data, datetime):
        return data.isoformat()
    if isinstance(data, dict):
        return {key: _serialize_firestore_data(value) for key, value in data.items()}
    if isinstance(data, list):
        return [_serialize_firestore_data(item) for item in data]
    return data
def _get_user_doc_ref(user_id: str) -> DocumentReference:
    """Return the document reference ``users/{user_id}``.

    Raises:
        RuntimeError: If the users collection has not been initialised yet.
    """
    collection = users_collection
    if collection is not None:
        return collection.document(user_id)
    raise RuntimeError("Firestore尚未連接,無法操作使用者資料")
def _get_user_memories_collection(user_id: str) -> CollectionReference:
    """Return the ``memories`` subcollection under the given user's document."""
    user_ref = _get_user_doc_ref(user_id)
    return user_ref.collection("memories")
def _get_chat_messages_collection(chat_id: str) -> CollectionReference:
    """Return the ``messages`` subcollection under ``chats/{chat_id}``.

    Raises:
        RuntimeError: If the chats collection has not been initialised yet.
    """
    collection = chats_collection
    if collection is not None:
        chat_ref = collection.document(chat_id)
        return chat_ref.collection("messages")
    raise RuntimeError("Firestore尚未連接,無法取得對話消息集合")
def connect_to_firestore():
    """Initialise the Firebase app (if needed) and the Firestore client.

    The module-level client and collection references are published only
    after the connection test has succeeded, so a failed connection never
    leaves the module half-initialised.

    Returns:
        bool: True when the connection was established; False when the
        project ID is missing, the credentials cannot be loaded, or any
        other error occurs (all failures are logged).
    """
    global firestore_db, messages_collection, users_collection, chats_collection
    global health_data_collection, device_bindings_collection
    global geo_cache_collection, route_cache_collection

    firebase_project_id = settings.FIREBASE_PROJECT_ID
    if not firebase_project_id:
        logger.error("Firebase專案ID未正確設置,請在.env文件中設置FIREBASE_PROJECT_ID環境變數")
        logger.error("\n❌ 錯誤: Firebase專案ID未設置!請在.env文件中設置FIREBASE_PROJECT_ID\n")
        return False

    try:
        logger.info("正在嘗試連接Firebase Firestore...")
        logger.info("\n🔄 正在連接Firebase Firestore數據庫...\n")

        # get_app() raises ValueError when no Firebase app exists yet.
        try:
            firebase_admin.get_app()
            logger.info("Firebase 已初始化,跳過重複初始化")
        except ValueError:
            # Load the credentials from the unified settings object.
            try:
                firebase_creds_dict = settings.get_firebase_credentials()
                cred = credentials.Certificate(firebase_creds_dict)
                firebase_admin.initialize_app(cred, {
                    'projectId': firebase_project_id,
                })
                logger.info(f"Firebase 初始化成功(專案ID:{firebase_project_id})")
            except ValueError as e:
                logger.error(f"Firebase 憑證載入失敗: {e}")
                logger.error(f"\n❌ 錯誤: Firebase 憑證載入失敗!{e}\n")
                return False

        client = firestore.client()

        # Connection test: write and then remove a throwaway document.
        test_doc = client.collection('_test_connection').document('test')
        test_doc.set({'timestamp': datetime.now(), 'test': True})
        test_doc.delete()

        # Publish the client and the collection references.
        firestore_db = client
        messages_collection = client.collection('messages')
        users_collection = client.collection('users')
        chats_collection = client.collection('chats')
        health_data_collection = client.collection('health_data')
        device_bindings_collection = client.collection('device_bindings')
        geo_cache_collection = client.collection('geo_cache')
        route_cache_collection = client.collection('route_cache')

        logger.info(f"✅ Firestore連接成功,專案ID:{firebase_project_id}")
        logger.info(f"\n✅ Firebase Firestore連接成功!專案ID:{firebase_project_id}\n")
        return True
    except Exception as e:
        logger.error(f"Firebase Firestore連接失敗:{e}")
        logger.error(f"\n❌ Firebase Firestore連接失敗:{e}\n")
        logger.error("🔧 故障排除建議:")
        logger.error("1. 檢查網絡連接")
        logger.error("2. 確認Firebase服務帳戶金鑰文件路徑正確")
        logger.error("3. 驗證Firebase專案ID是否正確")
        logger.error("4. 確保Firestore Database已在Firebase Console中啟用")
        logger.error("5. 檢查服務帳戶權限是否包含Firestore權限")
        return False
def ensure_indexes():
    """Placeholder that creates no indexes; it only logs an informational message.

    NOTE(review): the logged message states that Firestore optimises indexes
    automatically. Queries in this module that combine a filter with an
    ordering on another field may still need composite indexes — confirm
    against the Firestore console.
    """
    logger.info("Firestore 自動處理索引優化,無需手動創建索引")
async def get_user_by_id(user_id: str):
    """Look up a user by ``user_id`` and return their public information.

    Args:
        user_id: Value of the ``user_id`` field to match.

    Returns:
        A dict with ``id``, ``name``, ``email`` and ``created_at`` (datetimes
        serialized to ISO strings), or None when Firestore is not connected,
        no user matches, or an error occurs (errors are logged).
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法查找使用者")
        return None
    try:
        def _fetch_user():
            query = users_collection.where(filter=FieldFilter("user_id", "==", user_id)).limit(1)
            return query.get()

        # The Firestore client is blocking; run it in a worker thread so the
        # event loop is not stalled (same pattern as the other async helpers).
        docs = await asyncio.to_thread(_fetch_user)
        if not docs:
            return None
        user_data = docs[0].to_dict()
        result = {
            "id": user_data["user_id"],
            "name": user_data.get("name", ""),
            "email": user_data.get("email", ""),
            "created_at": user_data.get("created_at"),
        }
        # Serialize Firestore datetime objects so the result is JSON-safe.
        return _serialize_firestore_data(result)
    except Exception as e:
        logger.error(f"查找使用者時發生錯誤: {e}")
        return None
| # 已移除舊的內嵌測試函式 test_connection,避免在生產代碼夾雜測試邏輯 | |
async def save_message(user_id, content, is_bot=False):
    """Store one message in the top-level messages collection.

    Returns:
        bool: True when the write succeeded, False when Firestore is not
        connected or the write raised (the error is logged).
    """
    if messages_collection is None:
        logger.error("Firestore尚未連接,無法保存消息")
        return False

    try:
        record = dict(
            user_id=user_id,  # the user_id field stores the owner's ID
            content=content,
            is_bot=is_bot,
            timestamp=datetime.now(),
        )

        def _insert():
            return messages_collection.add(record)

        await asyncio.to_thread(_insert)
        logger.debug("消息已保存到 Firestore")
        return True
    except Exception as exc:
        logger.error(f"保存消息時發生錯誤: {exc}")
        return False
async def get_user_history(user_id, limit=20):
    """Fetch a user's messages from the top-level messages collection.

    Messages are ordered by ``timestamp`` ascending and capped at ``limit``.
    NOTE(review): ascending order plus a limit yields the oldest ``limit``
    messages — confirm this is what callers expect.

    Returns:
        list: Serialized message dicts; an empty list when Firestore is not
        connected or the query fails.
    """
    if messages_collection is None:
        logger.error("Firestore尚未連接,無法獲取歷史記錄")
        return []

    def _load():
        by_user = FieldFilter("user_id", "==", user_id)
        query = messages_collection.where(filter=by_user).order_by("timestamp").limit(limit)
        return [snapshot.to_dict() for snapshot in query.stream()]

    try:
        history = await asyncio.to_thread(_load)
        logger.info(f"已獲取用戶 {user_id} 的 {len(history)} 條歷史記錄")
        # Convert Firestore datetimes so the result is JSON-safe.
        return _serialize_firestore_data(history)
    except Exception as exc:
        logger.error(f"獲取歷史記錄時發生錯誤: {exc}")
        return []
| # Google OAuth 2.0 用戶認證 | |
async def create_or_login_google_user(google_token_info):
    """Single entry point for Google OAuth sign-in; registers the user on first login.

    Steps: ping Firestore (5 s timeout), look up the user by ``google_id``
    (10 s timeout), then either refresh the existing profile or create a new
    user document keyed by the Google ID.

    Args:
        google_token_info: Mapping with the Google user info. ``id`` or
            ``sub`` and ``email`` are required; ``name``, ``picture`` and
            ``locale`` are optional.

    Returns:
        dict: ``{"success": True, "user": {...}, "is_new_user": bool}`` on
        success, otherwise ``{"success": False, "error": ...}`` (with an
        additional ``message`` for quota errors).
        NOTE(review): ``user["created_at"]`` is returned as a datetime object,
        not an ISO string — confirm callers serialize it.
    """
    if users_collection is None or firestore_db is None:
        logger.error("Firestore尚未連接,無法處理用戶認證")
        return {"success": False, "error": "數據庫未連接"}

    # Check Firestore connectivity with a throwaway write/delete.
    try:
        logger.info("🔍 檢查 Firestore 連接狀態...")

        def _test_connection():
            test_ref = firestore_db.collection('_connection_test').document('ping')
            test_ref.set({'ping': 'test'}, merge=True)
            test_ref.delete()
            return True

        await asyncio.wait_for(
            asyncio.to_thread(_test_connection),
            timeout=5.0,  # 5-second connection test timeout
        )
        logger.info("✅ Firestore 連接正常")
    except asyncio.TimeoutError:
        logger.error("❌ Firestore 連接測試超時")
        return {"success": False, "error": "數據庫連接超時"}
    except Exception as e:
        logger.error(f"❌ Firestore 連接測試失敗: {e}")
        return {"success": False, "error": f"數據庫連接異常: {str(e)}"}

    google_id = google_token_info.get("id") or google_token_info.get("sub")
    if not google_id:
        logger.error(f"Google用戶信息中缺少ID字段,收到的信息: {google_token_info}")
        return {"success": False, "error": "INVALID_GOOGLE_USER_INFO"}
    email = google_token_info.get("email")
    if not email:
        logger.error(f"Google用戶信息中缺少email字段,收到的信息: {google_token_info}")
        return {"success": False, "error": "INVALID_GOOGLE_USER_INFO"}
    logger.info(f"🔍 處理Google用戶: google_id={google_id}, email={email}")

    try:
        def _fetch_existing_user():
            try:
                logger.info(f"🔍 查詢現有用戶: google_id={google_id}")
                query = users_collection.where(filter=FieldFilter("google_id", "==", google_id)).limit(1)
                docs = list(query.stream())
                logger.info(f"🔍 查詢結果: 找到 {len(docs)} 個用戶")
                return docs[0] if docs else None
            except Exception as e:
                error_msg = str(e).lower()
                if "quota" in error_msg or "exceeded" in error_msg:
                    logger.error(f"❌ Firestore 配額已超出限制: {e}")
                    # Sentinel message recognised by the caller below; keep
                    # the original error attached as the cause.
                    raise Exception("FIRESTORE_QUOTA_EXCEEDED") from e
                logger.error(f"❌ Firestore 查詢失敗: {e}")
                raise

        logger.info("📤 開始查詢用戶...")
        try:
            user_doc = await asyncio.wait_for(
                asyncio.to_thread(_fetch_existing_user),
                timeout=10.0,  # 10-second query timeout
            )
            logger.info(f"🔍 用戶查詢完成: {'找到現有用戶' if user_doc else '未找到用戶'}")
        except asyncio.TimeoutError:
            logger.error("❌ Firestore 查詢超時(10秒)")
            return {"success": False, "error": "數據庫查詢超時"}
        except Exception as e:
            error_str = str(e)
            if "FIRESTORE_QUOTA_EXCEEDED" in error_str:
                logger.error("❌ Firestore 每日配額已用完")
                return {
                    "success": False,
                    "error": "QUOTA_EXCEEDED",
                    "message": "Firestore 每日配額已用完,請稍後再試或聯繫管理員升級服務"
                }
            logger.error(f"❌ 用戶查詢異常: {e}")
            return {"success": False, "error": f"用戶查詢失敗: {str(e)}"}

        if user_doc:
            user_data = user_doc.to_dict()
            login_time = datetime.now()
            # Prefer fresh values from Google, but never overwrite a stored
            # value with an empty/None one.
            refreshed_name = google_token_info.get("name") or user_data.get("name")
            refreshed_picture = google_token_info.get("picture") or user_data.get("picture")

            def _update_user():
                users_collection.document(user_doc.id).update({
                    "name": refreshed_name,
                    "picture": refreshed_picture,
                    "last_login": login_time,
                    "updated_at": login_time,
                })

            await asyncio.to_thread(_update_user)
            logger.info(f"用戶 {email} 登入成功,user_id: {user_data.get('user_id')}")
            # Return the values that were just persisted rather than the
            # snapshot read before the update.
            return {
                "success": True,
                "user": {
                    "id": user_data.get("user_id", google_id),
                    "name": refreshed_name or "",
                    "email": user_data.get("email", email),
                    "picture": refreshed_picture,
                    "created_at": user_data.get("created_at")
                },
                "is_new_user": False
            }

        logger.info("📤 創建新用戶...")
        now = datetime.now()
        new_user = {
            "user_id": google_id,
            "google_id": google_id,
            "email": email,
            "name": google_token_info.get("name", ""),
            "picture": google_token_info.get("picture"),
            "locale": google_token_info.get("locale", "zh-TW"),
            "first_login": now,
            "last_login": now,
            "created_at": now,
            "updated_at": now
        }
        logger.info(f"🔍 新用戶數據: {new_user}")
        logger.info("📤 寫入Firestore...")
        # The document ID is the Google ID, so user_id == document ID.
        await asyncio.to_thread(lambda: users_collection.document(google_id).set(new_user))
        logger.info(f"✅ 新用戶 {email} 註冊成功,user_id: {google_id}")
        return {
            "success": True,
            "user": {
                "id": google_id,
                "name": new_user["name"],
                "email": new_user["email"],
                "picture": new_user["picture"],
                "created_at": new_user["created_at"]
            },
            "is_new_user": True
        }
    except Exception as e:
        logger.error(f"❌ Google OAuth 認證時發生錯誤: {e}")
        logger.error(f"❌ 錯誤類型: {type(e).__name__}")
        logger.error("❌ 錯誤堆疊:", exc_info=True)
        return {"success": False, "error": str(e)}
| # 對話管理 | |
async def create_chat(user_id, title="新對話"):
    """Create a new chat document for a user.

    Args:
        user_id: Owner of the chat.
        title: Chat title.

    Returns:
        dict: ``{"success": True, "chat": {...}}`` with ``chat_id``,
        ``user_id``, ``title`` and ISO-formatted timestamps, or
        ``{"success": False, "error": ...}`` on failure.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法創建對話")
        return {"success": False, "error": "數據庫未連接"}
    try:
        # One timestamp for both fields so a new chat has
        # created_at == updated_at.
        now = datetime.now()
        chat = {
            "user_id": user_id,
            "title": title,
            "messages": [],
            "created_at": now,
            "updated_at": now,
        }
        # The result is indexed at [1] to obtain the new document reference.
        add_result = await asyncio.to_thread(lambda: chats_collection.add(chat))
        chat_id = add_result[1].id
        logger.info(f"為用戶 {user_id} 創建了新對話,ID: {chat_id}")
        chat_info = {
            "chat_id": chat_id,
            "user_id": user_id,
            "title": title,
            "created_at": now,
            "updated_at": now,
        }
        # Serialize datetimes so the result is JSON-safe.
        return {"success": True, "chat": _serialize_firestore_data(chat_info)}
    except Exception as e:
        logger.error(f"創建對話時發生錯誤: {e}")
        return {"success": False, "error": str(e)}
async def get_user_chats(user_id):
    """List a user's chats, most recently updated first.

    Each entry carries ``chat_id`` plus the stored fields except ``user_id``,
    ``messages`` and ``created_at``, which are stripped from the listing.

    Returns:
        dict: ``{"success": True, "chats": [...]}`` or
        ``{"success": False, "error": ...}``.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法獲取對話")
        return {"success": False, "error": "數據庫未連接"}

    hidden_fields = ("user_id", "messages", "created_at")

    def _load_summaries():
        owned = chats_collection.where(filter=FieldFilter("user_id", "==", user_id))
        newest_first = owned.order_by("updated_at", direction=firestore.Query.DESCENDING)
        summaries = []
        for snapshot in newest_first.stream():
            entry = snapshot.to_dict()
            entry["chat_id"] = snapshot.id
            for field in hidden_fields:
                entry.pop(field, None)
            summaries.append(entry)
        return summaries

    try:
        summaries = await asyncio.to_thread(_load_summaries)
        logger.info(f"獲取到用戶 {user_id} 的 {len(summaries)} 個對話")
        # Convert Firestore datetimes so the result is JSON-safe.
        return {"success": True, "chats": _serialize_firestore_data(summaries)}
    except Exception as exc:
        logger.error(f"獲取用戶對話時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def get_chat(chat_id):
    """Load a chat document together with its messages.

    Messages are read from the ``chats/{chat_id}/messages`` subcollection in
    ascending ``timestamp`` order; if that read fails, the ``messages`` list
    embedded in the chat document (if any) is used instead.
    NOTE(review): the returned chat is not passed through
    ``_serialize_firestore_data`` — confirm callers handle datetime values.

    Returns:
        dict: ``{"success": True, "chat": {...}}`` or
        ``{"success": False, "error": ...}``.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法獲取對話")
        return {"success": False, "error": "數據庫未連接"}

    def _load_chat_doc():
        snapshot = chats_collection.document(chat_id).get()
        if snapshot.exists:
            return snapshot
        return None

    def _load_messages():
        messages_ref = _get_chat_messages_collection(chat_id)
        ordered = messages_ref.order_by("timestamp").stream()
        return [{**entry.to_dict(), "id": entry.id} for entry in ordered]

    try:
        snapshot = await asyncio.to_thread(_load_chat_doc)
        if not snapshot:
            logger.warning(f"對話 {chat_id} 不存在")
            return {"success": False, "error": "對話不存在"}

        chat = snapshot.to_dict() or {}
        chat["chat_id"] = snapshot.id

        try:
            msgs = await asyncio.to_thread(_load_messages)
            chat["messages"] = msgs
            logger.info(f"獲取到對話 {chat_id},包含 {len(msgs)} 條消息(chat 子集合)")
        except Exception as sub_err:
            # Backward compatibility: fall back to messages embedded in the document.
            chat["messages"] = chat.get('messages', []) or []
            logger.warning(f"讀取 chat 子集合失敗,使用內嵌 messages。原因: {sub_err}")
        return {"success": True, "chat": chat}
    except Exception as exc:
        logger.error(f"獲取對話時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def save_chat_message(chat_id, sender, content):
    """Save a chat message, using ``chats/{chat_id}/messages`` as primary storage.

    The message is also copied to the top-level messages collection (errors
    there are logged at debug level and ignored), and the chat's
    ``updated_at`` is refreshed when the chat document exists.

    Returns:
        dict: ``{"success": True, "message": {...}}`` or
        ``{"success": False, "error": ...}``.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法保存消息")
        return {"success": False, "error": "數據庫未連接"}

    try:
        stamp = datetime.now()
        message = dict(chat_id=chat_id, sender=sender, content=content, timestamp=stamp)

        def _store_primary():
            _get_chat_messages_collection(chat_id).add(message)

        def _store_legacy():
            if messages_collection is None:
                return
            try:
                messages_collection.add(message)
            except Exception as legacy_err:  # pragma: no cover
                logger.debug(f"寫入頂層 messages 集合失敗(兼容用途,可忽略): {legacy_err}")

        def _bump_chat():
            chat_ref = chats_collection.document(chat_id)
            if chat_ref.get().exists:
                chat_ref.update({"updated_at": stamp})
                return True
            return False

        await asyncio.to_thread(_store_primary)
        # Legacy data model: keep a copy in the top-level messages collection
        # for older queries. The write is awaited, but its failures are swallowed.
        await asyncio.to_thread(_store_legacy)
        if not await asyncio.to_thread(_bump_chat):
            logger.warning(f"對話 {chat_id} 不存在,但消息已寫入 chat 子集合")
        logger.info(f"消息已保存到 chat 子集合(chat_id={chat_id})")
        return {"success": True, "message": message}
    except Exception as exc:
        logger.error(f"保存消息時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def get_chat_messages(chat_id: str, limit: int | None = None, ascending: bool = True):
    """Read the messages of a chat, preferring the chat subcollection.

    The result is always in chronological order. With a positive ``limit``,
    ``ascending=True`` selects the oldest ``limit`` messages and
    ``ascending=False`` selects the newest ``limit`` messages.

    If the subcollection is empty, the legacy top-level messages collection
    is consulted; any messages found there are copied into the subcollection
    (all of them, not just the returned slice) so later reads see the full
    history.

    Args:
        chat_id: Chat whose messages are read.
        limit: Maximum number of messages; None or a non-positive value
            means no limit.
        ascending: Which end of the history to take when ``limit`` applies.

    Returns:
        list: Serialized message dicts; an empty list when Firestore is not
        connected, nothing is found, or an error occurs (errors are logged).
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法讀取消息")
        return []
    has_limit = bool(limit and limit > 0)
    try:
        from google.cloud import firestore as _fs

        def _query():
            ref = _get_chat_messages_collection(chat_id)
            direction = _fs.Query.ASCENDING if ascending else _fs.Query.DESCENDING
            q = ref.order_by("timestamp", direction=direction)
            if has_limit:
                q = q.limit(limit)
            records = []
            for doc in q.stream():
                data = doc.to_dict()
                data["id"] = doc.id
                records.append(data)
            if not ascending:
                # The query ran newest-first; restore chronological order.
                records.reverse()
            return records

        messages = await asyncio.to_thread(_query)
        if messages:
            # Serialize datetimes so the result is JSON-safe.
            return _serialize_firestore_data(messages)

        # Backward compatibility: fall back to the legacy top-level collection.
        if messages_collection is None:
            return []

        def _legacy_query():
            docs = messages_collection.where(filter=FieldFilter("chat_id", "==", chat_id)).stream()
            legacy = [d.to_dict() for d in docs]
            # Entries without a timestamp sort last instead of raising
            # TypeError when compared against a datetime.
            legacy.sort(key=lambda item: (item.get("timestamp") is None, item.get("timestamp")))
            return legacy

        legacy_all = await asyncio.to_thread(_legacy_query)
        if not legacy_all:
            return []

        # Same selection semantics as the primary path above.
        if has_limit:
            view_messages = legacy_all[:limit] if ascending else legacy_all[-limit:]
        else:
            view_messages = list(legacy_all)

        def _backfill():
            try:
                ref = _get_chat_messages_collection(chat_id)
                # Migrate only while the subcollection is still empty.
                has_existing = any(True for _ in ref.limit(1).stream())
                if has_existing:
                    return
                # Copy the complete legacy history: once the subcollection is
                # non-empty the legacy collection is never consulted again.
                for legacy_msg in legacy_all:
                    ref.add(legacy_msg)
                logger.info(f"已將 legacy messages 回填至 chat 子集合(chat_id={chat_id})")
            except Exception as backfill_err:
                logger.warning(f"回填 legacy messages 失敗(可忽略): {backfill_err}")

        await asyncio.to_thread(_backfill)
        return _serialize_firestore_data(view_messages)
    except Exception as e:
        logger.error(f"讀取對話消息失敗: {e}")
        return []
async def update_chat_title(chat_id, title):
    """Set a chat's title and refresh its ``updated_at`` timestamp.

    Returns:
        dict: ``{"success": True}``, or ``{"success": False, "error": ...}``
        when Firestore is not connected, the chat does not exist, or the
        update fails.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法更新對話標題")
        return {"success": False, "error": "數據庫未連接"}

    def _apply_title():
        chat_ref = chats_collection.document(chat_id)
        if not chat_ref.get().exists:
            return False
        changes = {"title": title, "updated_at": datetime.now()}
        chat_ref.update(changes)
        return True

    try:
        if await asyncio.to_thread(_apply_title):
            logger.info(f"對話 {chat_id} 標題已更新為 '{title}'")
            return {"success": True}
        logger.warning(f"對話 {chat_id} 不存在,無法更新標題")
        return {"success": False, "error": "對話不存在"}
    except Exception as exc:
        logger.error(f"更新對話標題時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def delete_chat(chat_id):
    """Delete a chat document and the messages in its subcollection.

    Subcollection messages are removed first to avoid orphaned data; a
    failure while removing them is logged and the chat document is still
    deleted.

    Returns:
        dict: ``{"success": True}``, or ``{"success": False, "error": ...}``
        when Firestore is not connected, the chat does not exist, or the
        deletion fails.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法刪除對話")
        return {"success": False, "error": "數據庫未連接"}

    def _remove_chat():
        chat_ref = chats_collection.document(chat_id)
        if not chat_ref.get().exists:
            return False
        try:
            for message_snapshot in _get_chat_messages_collection(chat_id).stream():
                message_snapshot.reference.delete()
        except Exception as msg_err:
            logger.warning(f"刪除對話 {chat_id} 的子消息時發生錯誤:{msg_err}")
        chat_ref.delete()
        return True

    try:
        removed = await asyncio.to_thread(_remove_chat)
        if removed:
            logger.info(f"對話 {chat_id} 已刪除")
            return {"success": True}
        logger.warning(f"對話 {chat_id} 不存在,無法刪除")
        return {"success": False, "error": "對話不存在"}
    except Exception as exc:
        logger.error(f"刪除對話時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
| # ===== 對話情緒記憶 ===== | |
async def set_chat_emotion(chat_id: str, emotion: dict):
    """Record the latest emotion state (label, confidence, timestamp) on a chat.

    The payload is written to the ``context.emotion`` field and the chat's
    ``updated_at`` is refreshed.

    Returns:
        dict: ``{"success": True}``, or ``{"success": False, "error": ...}``
        when Firestore is not connected, the chat does not exist, or the
        update fails.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法設定對話情緒")
        return {"success": False, "error": "數據庫未連接"}

    def _store_emotion():
        chat_ref = chats_collection.document(chat_id)
        if not chat_ref.get().exists:
            return False
        emotion_payload = dict(
            label=emotion.get("label"),
            confidence=emotion.get("confidence"),
            timestamp=datetime.now(),
        )
        chat_ref.update({"context.emotion": emotion_payload, "updated_at": datetime.now()})
        return True

    try:
        if await asyncio.to_thread(_store_emotion):
            return {"success": True}
        return {"success": False, "error": "對話不存在"}
    except Exception as exc:
        logger.error(f"設定對話情緒時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def get_chat_emotion(chat_id: str):
    """Return the most recent emotion state stored on a chat.

    Returns:
        dict: ``{"success": True, "emotion": ...}`` where ``emotion`` is the
        value under ``context.emotion`` (None when absent), or
        ``{"success": False, "error": ...}``.
    """
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法讀取對話情緒")
        return {"success": False, "error": "數據庫未連接"}

    def _load_chat_doc():
        snapshot = chats_collection.document(chat_id).get()
        if snapshot.exists:
            return snapshot
        return None

    try:
        snapshot = await asyncio.to_thread(_load_chat_doc)
        if not snapshot:
            return {"success": False, "error": "對話不存在"}
        fields = snapshot.to_dict() or {}
        context = fields.get("context") or {}
        return {"success": True, "emotion": context.get("emotion")}
    except Exception as exc:
        logger.error(f"讀取對話情緒時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
| # ===== 語音登入:使用者與說話者標籤關聯 ===== | |
async def set_user_speaker_label(user_id: str, speaker_label: str):
    """Attach a speaker label to a user, rejecting labels owned by another user.

    Returns:
        dict: ``{"success": True}``, or ``{"success": False, "error": ...}``
        with ``SPEAKER_LABEL_TAKEN``, ``USER_NOT_FOUND``, a connection error,
        or the exception text.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法設定語音標籤")
        return {"success": False, "error": "數據庫未連接"}

    def _find_label_owner():
        by_label = FieldFilter("speaker_label", "==", speaker_label)
        matches = list(users_collection.where(filter=by_label).limit(1).stream())
        if matches:
            return matches[0]
        return None

    def _assign_label():
        user_ref = users_collection.document(user_id)
        if not user_ref.get().exists:
            return False
        user_ref.update({"speaker_label": speaker_label})
        return True

    try:
        owner = await asyncio.to_thread(_find_label_owner)
        if owner and owner.to_dict().get("user_id") != user_id:
            return {"success": False, "error": "SPEAKER_LABEL_TAKEN"}
        if await asyncio.to_thread(_assign_label):
            return {"success": True}
        return {"success": False, "error": "USER_NOT_FOUND"}
    except Exception as exc:
        logger.error(f"設定語音標籤時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def get_user_by_speaker_label(speaker_label: str):
    """Find the user that owns a speaker label.

    Returns:
        A dict with ``id``, ``name``, ``email`` and ``created_at`` (datetimes
        serialized to ISO strings), or None when Firestore is not connected,
        no user matches, or the lookup fails.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法查詢語音標籤")
        return None

    def _first_match():
        by_label = FieldFilter("speaker_label", "==", speaker_label)
        matches = list(users_collection.where(filter=by_label).limit(1).stream())
        if matches:
            return matches[0]
        return None

    try:
        snapshot = await asyncio.to_thread(_first_match)
        if not snapshot:
            return None
        fields = snapshot.to_dict()
        profile = {
            "id": fields.get("user_id"),
            "name": fields.get("name", ""),
            "email": fields.get("email", ""),
            "created_at": fields.get("created_at"),
        }
        # Convert Firestore datetimes so the result is JSON-safe.
        return _serialize_firestore_data(profile)
    except Exception as exc:
        logger.error(f"查詢語音標籤對應用戶時發生錯誤: {exc}")
        return None
| # ===== 專門記憶系統 ===== | |
def _coerce_to_list(value: Any) -> list:
    """Normalise a metadata field to a list without splitting strings into characters."""
    if isinstance(value, list):
        return value
    if not value:
        return []
    if isinstance(value, (str, bytes)):
        # list("food") would yield ['f', 'o', 'o', 'd']; keep it as one entry.
        return [value]
    try:
        return list(value)
    except TypeError:
        # Non-iterable scalar: wrap it instead of failing the whole save.
        return [value]


async def save_memory(
    user_id: str,
    memory_type: str,
    content: str,
    importance: float = 1.0,
    metadata: dict | None = None,
) -> Dict[str, Any]:
    """Save an important memory under ``users/{user_id}/memories``.

    Memories are de-duplicated by a SHA-1 hash of the stripped, lower-cased
    content: when a memory with the same hash exists it is updated (and its
    ``access_count`` incremented), otherwise a new document is created and
    the memory quota is enforced afterwards.

    Args:
        user_id: Owner of the memory; a stub user document is created when
            none exists.
        memory_type: Stored as ``type`` on newly created memories.
        content: Memory text.
        importance: Clamped to the range [0.0, 1.0].
        metadata: Optional extra fields; ``source``, ``last_updated_by``,
            ``updated_at``, ``context_tags`` and ``triggers`` are filled in
            or normalised on a copy.

    Returns:
        dict: ``{"success": True, "action": "updated"|"created",
        "memory_id": ...}`` or ``{"success": False, "error": ...}``.
    """
    if users_collection is None or firestore_db is None:
        logger.error("Firestore尚未連接,無法保存記憶")
        return {"success": False, "error": "數據庫未連接"}
    try:
        now = datetime.now()
        sanitized_importance = max(0.0, min(1.0, importance))

        metadata_payload = metadata.copy() if metadata else {}
        metadata_payload.setdefault("source", "unknown")
        metadata_payload.setdefault("last_updated_by", "memory_system")
        metadata_payload["updated_at"] = now.isoformat()
        metadata_payload["context_tags"] = _coerce_to_list(metadata_payload.get("context_tags", []))
        metadata_payload["triggers"] = _coerce_to_list(metadata_payload.get("triggers", []))

        col_ref = _get_user_memories_collection(user_id)
        content_hash = hashlib.sha1(content.strip().lower().encode("utf-8")).hexdigest()

        def _ensure_user_stub():
            user_doc = _get_user_doc_ref(user_id)
            snap = user_doc.get()
            if not snap.exists:
                user_doc.set(
                    {
                        "user_id": user_id,
                        "created_at": now,
                        "updated_at": now,
                    },
                    merge=True,
                )

        def _find_existing():
            docs = (
                col_ref.where(filter=FieldFilter("content_hash", "==", content_hash))
                .limit(1)
                .stream()
            )
            return next(iter(docs), None)

        await asyncio.to_thread(_ensure_user_stub)
        existing_doc = await asyncio.to_thread(_find_existing)

        if existing_doc:
            doc_ref = existing_doc.reference

            def _update_memory():
                doc_ref.update({
                    "content": content,
                    "importance": sanitized_importance,
                    "metadata": metadata_payload,
                    "updated_at": now,
                    "access_count": Increment(1),
                    "last_accessed": now,
                    "content_hash": content_hash,
                })

            await asyncio.to_thread(_update_memory)
            logger.info(f"更新用戶 {user_id} 的記憶: {memory_type}")
            return {"success": True, "action": "updated", "memory_id": existing_doc.id}

        def _create_memory():
            doc_ref = col_ref.document()
            doc_ref.set({
                "user_id": user_id,
                "type": memory_type,
                "content": content,
                "importance": sanitized_importance,
                "metadata": metadata_payload,
                "access_count": 0,
                "last_accessed": now,
                "updated_at": now,
                "created_at": now,
                "content_hash": content_hash,
            })
            return doc_ref.id

        memory_id = await asyncio.to_thread(_create_memory)
        await asyncio.to_thread(_enforce_memory_quota, col_ref)
        logger.info(f"保存用戶 {user_id} 的新記憶: {memory_type}")
        return {"success": True, "action": "created", "memory_id": memory_id}
    except Exception as e:
        logger.error(f"保存記憶時發生錯誤: {e}")
        return {"success": False, "error": str(e)}
| # ===== 環境 Context(位置/方位/時序) ===== | |
async def set_user_env_current(user_id: str, ctx: Dict[str, Any]) -> Dict[str, Any]:
    """Merge ``ctx`` into ``users/{uid}/context/current`` and stamp ``updated_at``.

    Freshness/TTL is left to the reader of the document.

    Returns:
        dict: ``{"success": True}`` or ``{"success": False, "error": ...}``.
    """
    if users_collection is None:
        return {"success": False, "error": "數據庫未連接"}

    try:
        stamp = datetime.now()

        def _store_current():
            current_ref = _get_user_doc_ref(user_id).collection('context').document('current')
            fields = ctx.copy()
            fields['updated_at'] = stamp
            current_ref.set(fields, merge=True)
            return True

        await asyncio.to_thread(_store_current)
        return {"success": True}
    except Exception as exc:
        logger.error(f"更新環境現況失敗: {exc}")
        return {"success": False, "error": str(exc)}
async def add_user_env_snapshot(user_id: str, snapshot: Dict[str, Any]) -> Dict[str, Any]:
    """Append an environment snapshot under ``users/{uid}/context/meta/snapshots``.

    A copy of ``snapshot`` is stored with a ``created_at`` timestamp added.

    Returns:
        dict: ``{"success": True}`` or ``{"success": False, "error": ...}``.
    """
    if users_collection is None:
        return {"success": False, "error": "數據庫未連接"}

    try:
        stamp = datetime.now()

        def _append_snapshot():
            meta_ref = _get_user_doc_ref(user_id).collection('context').document('meta')
            snapshots_ref = meta_ref.collection('snapshots')
            fields = snapshot.copy()
            fields['created_at'] = stamp
            snapshots_ref.add(fields)
            return True

        await asyncio.to_thread(_append_snapshot)
        return {"success": True}
    except Exception as exc:
        logger.error(f"寫入環境快照失敗: {exc}")
        return {"success": False, "error": str(exc)}
async def get_user_env_current(user_id: str) -> Dict[str, Any]:
    """Read ``users/{uid}/context/current``.

    Returns:
        dict: ``{"success": True, "context": {...}}`` with datetimes
        serialized, ``{"success": False, "error": "NOT_FOUND"}`` when the
        document is missing or empty, or ``{"success": False, "error": ...}``
        on other failures.
    """
    if users_collection is None:
        return {"success": False, "error": "數據庫未連接"}

    def _load_current():
        current_ref = _get_user_doc_ref(user_id).collection('context').document('current')
        snapshot = current_ref.get()
        if not snapshot.exists:
            return None
        return snapshot.to_dict()

    try:
        fields = await asyncio.to_thread(_load_current)
        if fields:
            # Convert Firestore datetimes so the result is JSON-safe.
            return {"success": True, "context": _serialize_firestore_data(fields)}
        return {"success": False, "error": "NOT_FOUND"}
    except Exception as exc:
        logger.error(f"讀取環境現況失敗: {exc}")
        return {"success": False, "error": str(exc)}
| # ===== 反地理/路線 全域快取集合 ===== | |
async def get_geo_cache(geohash7: str) -> Optional[Dict[str, Any]]:
    """Return the cached document stored under ``geohash7`` in the geo cache.

    Returns:
        The serialized document, or None when the cache is unavailable, the
        entry is missing or empty, or the read fails.
    """
    if geo_cache_collection is None:
        return None

    def _load():
        snapshot = geo_cache_collection.document(geohash7).get()
        if not snapshot.exists:
            return None
        return snapshot.to_dict()

    try:
        cached = await asyncio.to_thread(_load)
        if not cached:
            return None
        # Convert Firestore datetimes so the result is JSON-safe.
        return _serialize_firestore_data(cached)
    except Exception as exc:
        logger.warning(f"讀取 geo_cache 失敗: {exc}")
        return None
async def set_geo_cache(geohash7: str, payload: Dict[str, Any]) -> bool:
    """Merge ``payload`` into the geo cache entry ``geohash7`` with a ``cached_at`` stamp.

    Returns:
        bool: True on success, False when the cache is unavailable or the
        write fails.
    """
    if geo_cache_collection is None:
        return False

    try:
        stamp = datetime.now()

        def _store():
            entry = payload.copy()
            entry['cached_at'] = stamp
            geo_cache_collection.document(geohash7).set(entry, merge=True)
            return True

        stored = await asyncio.to_thread(_store)
        return stored
    except Exception as exc:
        logger.warning(f"寫入 geo_cache 失敗: {exc}")
        return False
async def get_route_cache(key: str) -> Optional[Dict[str, Any]]:
    """Look up an entry in the route cache collection.

    Args:
        key: Document id used as the cache key.

    Returns:
        The cached document passed through ``_serialize_firestore_data``,
        or ``None`` when the collection is not initialised, the document is
        missing or empty, or the read fails.
    """
    if route_cache_collection is None:
        return None

    def _load_entry():
        snapshot = route_cache_collection.document(key).get()
        if snapshot.exists:
            return snapshot.to_dict()
        return None

    try:
        cached = await asyncio.to_thread(_load_entry)
        if not cached:
            return None
        # Convert Firestore timestamp objects so the payload is JSON-safe.
        return _serialize_firestore_data(cached)
    except Exception as exc:
        logger.warning(f"讀取 route_cache 失敗: {exc}")
        return None
async def set_route_cache(key: str, payload: Dict[str, Any]) -> bool:
    """Write (merge) an entry into the route cache collection.

    A copy of ``payload`` is stored with an added ``cached_at`` timestamp
    taken before the write is dispatched to a worker thread.

    Args:
        key: Document id used as the cache key.
        payload: Fields to store; the caller's dict is not mutated.

    Returns:
        ``True`` when the write completed, ``False`` when the collection is
        not initialised or the write raised.
    """
    if route_cache_collection is None:
        return False
    try:
        stamped_at = datetime.now()

        def _store_entry():
            entry = payload.copy()
            entry['cached_at'] = stamped_at
            route_cache_collection.document(key).set(entry, merge=True)

        await asyncio.to_thread(_store_entry)
    except Exception as exc:
        logger.warning(f"寫入 route_cache 失敗: {exc}")
        return False
    return True
async def get_user_memories(
    user_id: str,
    memory_type: str | None = None,
    limit: int = 10,
    min_importance: float = 0.0,
) -> Dict[str, Any]:
    """Fetch a user's memories, most important and most recent first.

    The query keeps documents whose ``importance`` is at least
    ``min_importance`` (optionally restricted to one ``type``), orders them
    by ``importance`` then ``updated_at`` (both descending) and caps the
    result at ``limit``. Each returned memory then has its ``access_count``
    incremented and ``last_accessed`` refreshed.

    Args:
        user_id: Identifier handed to ``_get_user_memories_collection``.
        memory_type: When truthy, only memories of this ``type`` are returned.
        limit: Maximum number of memories to return.
        min_importance: Lower bound (inclusive) on ``importance``.

    Returns:
        ``{"success": True, "memories": [...]}`` where every memory carries
        its document id under ``"memory_id"`` and has been passed through
        ``_serialize_firestore_data``; ``{"success": False, "error": ...}``
        when the database is not connected or the fetch raised.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法獲取記憶")
        return {"success": False, "error": "數據庫未連接"}
    try:
        import asyncio as _asyncio

        def _fetch_memories():
            col_ref = _get_user_memories_collection(user_id)
            query = col_ref.where(filter=FieldFilter("importance", ">=", min_importance))
            if memory_type:
                query = query.where(filter=FieldFilter("type", "==", memory_type))
            docs = (
                query.order_by("importance", direction=firestore.Query.DESCENDING)
                .order_by("updated_at", direction=firestore.Query.DESCENDING)
                .limit(limit)
                .stream()
            )
            return [doc.to_dict() | {"memory_id": doc.id} for doc in docs]

        def _mark_accessed(mem_ids):
            col_ref = _get_user_memories_collection(user_id)
            now_inner = datetime.now()
            for mid in mem_ids:
                # Access statistics are bookkeeping only. A failed update
                # (e.g. the document was deleted after it was fetched) must
                # not discard the memories that were already read, and must
                # not prevent the remaining documents from being updated.
                try:
                    col_ref.document(mid).update({
                        "access_count": Increment(1),
                        "last_accessed": now_inner,
                    })
                except Exception as update_err:
                    logger.warning(f"更新記憶 {mid} 的存取統計失敗: {update_err}")

        memories = await _asyncio.to_thread(_fetch_memories)
        if memories:
            await _asyncio.to_thread(_mark_accessed, [m["memory_id"] for m in memories])
        logger.info(f"獲取到用戶 {user_id} 的 {len(memories)} 條記憶")
        # Convert Firestore timestamp objects so the payload is JSON-safe.
        serialized_memories = _serialize_firestore_data(memories)
        return {"success": True, "memories": serialized_memories}
    except Exception as e:
        logger.error(f"獲取記憶時發生錯誤: {e}")
        return {"success": False, "error": str(e)}
async def search_memories(user_id: str, query_text: str, limit: int = 5) -> Dict[str, Any]:
    """Search a user's recent memories by case-insensitive substring match.

    The 80 most recently updated memories are scanned; a memory matches when
    the lower-cased ``query_text`` occurs in its ``content`` or in its
    ``metadata.context_tags`` joined by spaces. Scanning stops once ``limit``
    matches are collected. Each returned memory then has its
    ``access_count`` incremented and ``last_accessed`` refreshed.

    Args:
        user_id: Identifier handed to ``_get_user_memories_collection``.
        query_text: Text to look for.
        limit: Maximum number of matches to return; values <= 0 yield no
            matches.

    Returns:
        ``{"success": True, "memories": [...]}`` where every memory carries
        its document id under ``"memory_id"`` and has been passed through
        ``_serialize_firestore_data`` (the same shape ``get_user_memories``
        returns); ``{"success": False, "error": ...}`` when the database is
        not connected or the search raised.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法搜索記憶")
        return {"success": False, "error": "數據庫未連接"}
    try:
        import asyncio as _asyncio

        normalized_query = query_text.lower()

        def _candidate_memories():
            # Checking the limit up front keeps limit <= 0 from returning
            # one match (the cap was only tested after appending).
            if limit <= 0:
                return []
            col_ref = _get_user_memories_collection(user_id)
            docs = (
                col_ref.order_by("updated_at", direction=firestore.Query.DESCENDING)
                .limit(80)
                .stream()
            )
            results = []
            for doc in docs:
                data = doc.to_dict() or {}
                content = data.get("content")
                metadata = data.get("metadata")
                # A stored null / non-dict metadata or null tag list must
                # not abort the whole search.
                tags = metadata.get("context_tags") if isinstance(metadata, dict) else None
                haystack = "{} {}".format(
                    "" if content is None else content,
                    " ".join(str(tag) for tag in (tags or [])),
                ).lower()
                if normalized_query in haystack:
                    data["memory_id"] = doc.id
                    results.append(data)
                    if len(results) >= limit:
                        break
            return results

        def _mark_accessed(mem_ids):
            col_ref = _get_user_memories_collection(user_id)
            now_inner = datetime.now()
            for mid in mem_ids:
                # Access statistics are bookkeeping only. A failed update
                # (e.g. the document was deleted after it was read) must not
                # discard the matches that were already found.
                try:
                    col_ref.document(mid).update({
                        "access_count": Increment(1),
                        "last_accessed": now_inner,
                    })
                except Exception as update_err:
                    logger.warning(f"更新記憶 {mid} 的存取統計失敗: {update_err}")

        memories = await _asyncio.to_thread(_candidate_memories)
        if memories:
            await _asyncio.to_thread(_mark_accessed, [m["memory_id"] for m in memories])
        logger.info(f"搜索到用戶 {user_id} 的 {len(memories)} 條相關記憶")
        # Convert Firestore timestamp objects so the payload is JSON-safe.
        serialized_memories = _serialize_firestore_data(memories)
        return {"success": True, "memories": serialized_memories}
    except Exception as e:
        logger.error(f"搜索記憶時發生錯誤: {e}")
        return {"success": False, "error": str(e)}
async def update_memory_importance(memory_id: str, new_importance: float):
    """Set the importance score of a memory identified only by its id.

    Every document streamed from ``users_collection`` is checked for a
    ``memories/<memory_id>`` sub-document; the first one found is updated
    with the clamped score and a fresh ``updated_at`` timestamp.

    Args:
        memory_id: Document id of the memory.
        new_importance: Requested score; clamped into ``[0.0, 1.0]``.

    Returns:
        ``{"success": True}`` when a memory was updated, otherwise
        ``{"success": False, "error": <reason>}``.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法更新記憶")
        return {"success": False, "error": "數據庫未連接"}
    try:
        sanitized_importance = max(0.0, min(1.0, new_importance))
        stamped_at = datetime.now()

        def _apply_to_first_match():
            for user_snapshot in users_collection.stream():
                memory_ref = (
                    user_snapshot.reference.collection("memories").document(memory_id)
                )
                if not memory_ref.get().exists:
                    continue
                memory_ref.update({
                    "importance": sanitized_importance,
                    "updated_at": stamped_at,
                })
                return True
            return False

        if await asyncio.to_thread(_apply_to_first_match):
            logger.info(f"更新記憶 {memory_id} 的重要性為 {sanitized_importance}")
            return {"success": True}
        return {"success": False, "error": "記憶不存在"}
    except Exception as exc:
        logger.error(f"更新記憶重要性時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def delete_memory(memory_id: str):
    """Delete a memory identified only by its id.

    Every document streamed from ``users_collection`` is checked for a
    ``memories/<memory_id>`` sub-document; the first one found is deleted.

    Args:
        memory_id: Document id of the memory.

    Returns:
        ``{"success": True}`` when a memory was deleted, otherwise
        ``{"success": False, "error": <reason>}``.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法刪除記憶")
        return {"success": False, "error": "數據庫未連接"}
    try:
        def _remove_first_match():
            for user_snapshot in users_collection.stream():
                memory_ref = (
                    user_snapshot.reference.collection("memories").document(memory_id)
                )
                if memory_ref.get().exists:
                    memory_ref.delete()
                    return True
            return False

        if await asyncio.to_thread(_remove_first_match):
            logger.info(f"刪除記憶 {memory_id}")
            return {"success": True}
        return {"success": False, "error": "記憶不存在"}
    except Exception as exc:
        logger.error(f"刪除記憶時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
async def cleanup_old_memories(user_id: str, days_old: int = 90, min_importance: float = 0.3):
    """Delete a user's old, low-importance memories.

    A memory is deleted when its ``importance`` is below ``min_importance``
    and its ``updated_at`` is earlier than ``days_old`` days before now.

    Args:
        user_id: User whose memories are cleaned up.
        days_old: Age threshold in days; older memories are candidates.
        min_importance: Memories at or above this score are kept.

    Returns:
        ``{"success": True, "deleted": <count>}`` on completion, otherwise
        ``{"success": False, "error": <reason>}``.
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法清理記憶")
        return {"success": False, "error": "數據庫未連接"}
    try:
        from datetime import timedelta

        threshold = datetime.now() - timedelta(days=days_old)

        def _purge_stale():
            stale_query = (
                _get_user_memories_collection(user_id)
                .where(filter=FieldFilter("importance", "<", min_importance))
                .where(filter=FieldFilter("updated_at", "<", threshold))
            )
            removed = 0
            for snapshot in stale_query.stream():
                snapshot.reference.delete()
                removed += 1
            return removed

        deleted = await asyncio.to_thread(_purge_stale)
        logger.info(f"為用戶 {user_id} 清理 {deleted} 條舊記憶")
        return {"success": True, "deleted": deleted}
    except Exception as exc:
        logger.error(f"清理記憶時發生錯誤: {exc}")
        return {"success": False, "error": str(exc)}
def _enforce_memory_quota(col_ref: CollectionReference) -> None:
    """Trim a memories collection down to ``MAX_MEMORIES_PER_USER`` documents.

    Documents are read ordered by ascending ``importance`` and then
    ascending ``updated_at``; when more than the quota are returned, the
    surplus is deleted from the front of that ordering.

    Args:
        col_ref: The memories collection to trim.
    """
    ranked_query = col_ref.order_by(
        "importance", direction=firestore.Query.ASCENDING
    ).order_by("updated_at", direction=firestore.Query.ASCENDING)
    snapshots = list(ranked_query.stream())

    overflow = len(snapshots) - MAX_MEMORIES_PER_USER
    if overflow <= 0:
        return
    for snapshot in snapshots[:overflow]:
        snapshot.reference.delete()