File size: 48,254 Bytes
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
36b622a
 
 
3f0377e
36b622a
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
801ae57
 
3f0377e
 
 
 
 
69fb140
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3f0377e
 
 
 
 
 
 
 
 
f998449
 
 
 
 
 
3f0377e
 
 
 
 
 
 
 
6c78660
3f0377e
 
 
 
6c78660
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6c78660
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
801ae57
 
 
 
 
3f0377e
6c78660
3f0377e
 
 
 
6c78660
 
 
 
 
 
 
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69fb140
 
3f0377e
 
 
 
 
69fb140
 
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3c14891
3f0377e
 
 
 
 
 
 
69fb140
 
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69fb140
 
 
3f0377e
 
 
 
 
 
 
 
 
 
 
3c14891
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69fb140
 
 
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
801ae57
3f0377e
801ae57
f998449
801ae57
 
f998449
 
 
 
 
801ae57
 
 
f998449
801ae57
 
 
 
f998449
801ae57
3f0377e
 
 
 
 
 
f998449
 
3f0377e
 
 
 
 
801ae57
 
 
 
 
 
 
 
 
f998449
 
 
 
 
 
 
 
 
 
801ae57
 
3f0377e
801ae57
 
 
 
 
3f0377e
801ae57
f998449
 
801ae57
 
f998449
3f0377e
f998449
3f0377e
 
 
 
 
801ae57
 
f998449
 
801ae57
 
 
 
 
 
 
f998449
801ae57
f998449
801ae57
 
 
f998449
 
 
 
 
801ae57
f998449
 
801ae57
 
f998449
69fb140
 
f998449
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69fb140
 
801ae57
 
 
 
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f998449
 
 
 
 
 
 
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69fb140
3f0377e
 
 
 
 
69fb140
 
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3c14891
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
801ae57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69fb140
 
 
801ae57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69fb140
 
 
801ae57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69fb140
 
 
801ae57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3c14891
3f0377e
3c14891
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69fb140
 
 
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3c14891
 
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
import os
import sys
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from dotenv import load_dotenv
import firebase_admin
from firebase_admin import credentials, firestore
from google.cloud.firestore import DocumentReference, CollectionReference, FieldFilter
from google.cloud.firestore_v1 import ArrayUnion, Increment
import asyncio
import hashlib

# 設置日誌
LOG_LEVEL_NAME = os.getenv("BLOOMWARE_LOG_LEVEL", "WARNING").upper()
LOG_LEVEL = getattr(logging, LOG_LEVEL_NAME, logging.WARNING)
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("Firestore")
logger.setLevel(LOG_LEVEL)

# 載入環境變數
load_dotenv()

# 統一配置管理
from core.config import settings

# 全局變數
firestore_db = None
users_collection = None
chats_collection = None
messages_collection = None
memories_collection = None
health_data_collection = None
device_bindings_collection = None
geo_cache_collection = None
route_cache_collection = None

# 記憶儲存相關設定
MAX_MEMORIES_PER_USER = 500


def _serialize_firestore_data(data: Any) -> Any:
    """
    遞迴轉換 Firestore 資料中的 DatetimeWithNanoseconds 物件為 ISO 字串

    Args:
        data: Firestore 回傳的資料(可能包含 DatetimeWithNanoseconds)

    Returns:
        JSON 可序列化的資料
    """
    from google.cloud.firestore_v1._helpers import DatetimeWithNanoseconds

    if isinstance(data, DatetimeWithNanoseconds):
        # 轉成 ISO 8601 字串
        return data.isoformat()
    elif isinstance(data, datetime):
        # 一般 Python datetime 也轉成字串
        return data.isoformat()
    elif isinstance(data, dict):
        # 遞迴處理字典
        return {k: _serialize_firestore_data(v) for k, v in data.items()}
    elif isinstance(data, list):
        # 遞迴處理列表
        return [_serialize_firestore_data(item) for item in data]
    else:
        # 其他型別直接回傳
        return data


def _get_user_doc_ref(user_id: str) -> DocumentReference:
    if users_collection is None:
        raise RuntimeError("Firestore尚未連接,無法操作使用者資料")
    return users_collection.document(user_id)


def _get_user_memories_collection(user_id: str) -> CollectionReference:
    return _get_user_doc_ref(user_id).collection("memories")


def _get_chat_messages_collection(chat_id: str) -> CollectionReference:
    if chats_collection is None:
        raise RuntimeError("Firestore尚未連接,無法取得對話消息集合")
    return chats_collection.document(chat_id).collection("messages")

def connect_to_firestore():
    """初始化 Firebase Firestore 連接"""
    global firestore_db, messages_collection, users_collection, chats_collection, memories_collection, health_data_collection, device_bindings_collection

    firebase_project_id = settings.FIREBASE_PROJECT_ID

    if not firebase_project_id:
        logger.error("Firebase專案ID未正確設置,請在.env文件中設置FIREBASE_PROJECT_ID環境變數")
        logger.error("\n❌ 錯誤: Firebase專案ID未設置!請在.env文件中設置FIREBASE_PROJECT_ID\n")
        return False

    try:
        logger.info("正在嘗試連接Firebase Firestore...")
        logger.info("\n🔄 正在連接Firebase Firestore數據庫...\n")

        # 檢查是否已經初始化 Firebase
        try:
            firebase_admin.get_app()
            logger.info("Firebase 已初始化,跳過重複初始化")
        except ValueError:
            # 從統一配置取得 Firebase 憑證(支援環境變數或檔案)
            try:
                firebase_creds_dict = settings.get_firebase_credentials()
                cred = credentials.Certificate(firebase_creds_dict)
                firebase_admin.initialize_app(cred, {
                    'projectId': firebase_project_id,
                })
                logger.info(f"Firebase 初始化成功(專案ID:{firebase_project_id})")
            except ValueError as e:
                logger.error(f"Firebase 憑證載入失敗: {e}")
                logger.error(f"\n❌ 錯誤: Firebase 憑證載入失敗!{e}\n")
                return False
        
        # 初始化 Firestore 客戶端
        firestore_db = firestore.client()
        
        # 測試連接
        test_doc = firestore_db.collection('_test_connection').document('test')
        test_doc.set({'timestamp': datetime.now(), 'test': True})
        test_doc.delete()  # 清理測試文檔
        
        # 初始化集合引用
        messages_collection = firestore_db.collection('messages')
        users_collection = firestore_db.collection('users')
        chats_collection = firestore_db.collection('chats')
        health_data_collection = firestore_db.collection('health_data')
        device_bindings_collection = firestore_db.collection('device_bindings')
        
        # 其他集合
        global geo_cache_collection, route_cache_collection
        geo_cache_collection = firestore_db.collection('geo_cache')
        route_cache_collection = firestore_db.collection('route_cache')
        
        logger.info(f"✅ Firestore連接成功,專案ID:{firebase_project_id}")
        logger.info(f"\n✅ Firebase Firestore連接成功!專案ID:{firebase_project_id}\n")
        return True
        
    except Exception as e:
        logger.error(f"Firebase Firestore連接失敗:{e}")
        logger.error(f"\n❌ Firebase Firestore連接失敗:{e}\n")
        logger.error("🔧 故障排除建議:")
        logger.error("1. 檢查網絡連接")
        logger.error("2. 確認Firebase服務帳戶金鑰文件路徑正確")
        logger.error("3. 驗證Firebase專案ID是否正確")
        logger.error("4. 確保Firestore Database已在Firebase Console中啟用")
        logger.error("5. 檢查服務帳戶權限是否包含Firestore權限")
        return False
def ensure_indexes():
    """Firestore 不需要手動創建索引,由 Google 自動優化"""
    logger.info("Firestore 自動處理索引優化,無需手動創建索引")



async def get_user_by_id(user_id: str):
    """根據使用者ID查找使用者,返回公共資訊"""
    if users_collection is None:
        logger.error("Firestore尚未連接,無法查找使用者")
        return None
    try:
        # Firestore 查詢 - 使用新語法
        query = users_collection.where(filter=FieldFilter("user_id", "==", user_id)).limit(1)
        docs = query.get()
        
        if not docs:
            return None
            
        user_doc = docs[0]
        user_data = user_doc.to_dict()

        result = {
            "id": user_data["user_id"],
            "name": user_data.get("name", ""),
            "email": user_data.get("email", ""),
            "created_at": user_data.get("created_at"),
        }
        # 序列化 Firestore 時間物件,避免 JSON 序列化炸裂
        return _serialize_firestore_data(result)
    except Exception as e:
        logger.error(f"查找使用者時發生錯誤: {e}")
        return None

# 已移除舊的內嵌測試函式 test_connection,避免在生產代碼夾雜測試邏輯

async def save_message(user_id, content, is_bot=False):
    """保存消息到數據庫"""
    if messages_collection is None:
        logger.error("Firestore尚未連接,無法保存消息")
        return False
    try:
        message = {
            "user_id": user_id,  # 使用user_id字段存儲用戶ID
            "content": content,
            "is_bot": is_bot,
            "timestamp": datetime.now(),
        }
        import asyncio as _asyncio
        await _asyncio.to_thread(lambda: messages_collection.add(message))
        logger.debug(f"消息已保存到 Firestore")
        return True
    except Exception as e:
        logger.error(f"保存消息時發生錯誤: {e}")
        return False

async def get_user_history(user_id, limit=20):
    """獲取用戶的歷史對話記錄"""
    if messages_collection is None:
        logger.error("Firestore尚未連接,無法獲取歷史記錄")
        return []
    try:
        import asyncio as _asyncio
        def _fetch_messages():
            docs = messages_collection.where(filter=FieldFilter("user_id", "==", user_id))\
                                    .order_by("timestamp")\
                                    .limit(limit)\
                                    .stream()
            return [doc.to_dict() for doc in docs]
        
        messages = await _asyncio.to_thread(_fetch_messages)
        logger.info(f"已獲取用戶 {user_id}{len(messages)} 條歷史記錄")
        # 序列化 Firestore 時間物件,避免 JSON 序列化炸裂
        return _serialize_firestore_data(messages)
    except Exception as e:
        logger.error(f"獲取歷史記錄時發生錯誤: {e}")
        return []

# Google OAuth 2.0 用戶認證
async def create_or_login_google_user(google_token_info):
    """Google OAuth 唯一登入入口,自動處理首次註冊和後續登入"""
    if users_collection is None or firestore_db is None:
        logger.error("Firestore尚未連接,無法處理用戶認證")
        return {"success": False, "error": "數據庫未連接"}

    # 檢查 Firestore 連接狀態
    try:
        logger.info("🔍 檢查 Firestore 連接狀態...")
        # 快速連接測試
        import asyncio as _asyncio
        def _test_connection():
            test_ref = firestore_db.collection('_connection_test').document('ping')
            test_ref.set({'ping': 'test'}, merge=True)
            test_ref.delete()
            return True

        await _asyncio.wait_for(
            _asyncio.to_thread(_test_connection),
            timeout=5.0  # 5秒連接測試超時
        )
        logger.info("✅ Firestore 連接正常")
    except _asyncio.TimeoutError:
        logger.error("❌ Firestore 連接測試超時")
        return {"success": False, "error": "數據庫連接超時"}
    except Exception as e:
        logger.error(f"❌ Firestore 連接測試失敗: {e}")
        return {"success": False, "error": f"數據庫連接異常: {str(e)}"}

    google_id = google_token_info.get("id") or google_token_info.get("sub")
    if not google_id:
        logger.error(f"Google用戶信息中缺少ID字段,收到的信息: {google_token_info}")
        return {"success": False, "error": "INVALID_GOOGLE_USER_INFO"}

    email = google_token_info.get("email")
    if not email:
        logger.error(f"Google用戶信息中缺少email字段,收到的信息: {google_token_info}")
        return {"success": False, "error": "INVALID_GOOGLE_USER_INFO"}

    logger.info(f"🔍 處理Google用戶: google_id={google_id}, email={email}")

    try:
        import asyncio as _asyncio

        def _fetch_existing_user():
            try:
                logger.info(f"🔍 查詢現有用戶: google_id={google_id}")
                # 使用新的 filter 語法
                query = users_collection.where(filter=FieldFilter("google_id", "==", google_id)).limit(1)
                docs = list(query.stream())
                logger.info(f"🔍 查詢結果: 找到 {len(docs)} 個用戶")
                return docs[0] if docs else None
            except Exception as e:
                error_msg = str(e).lower()
                if "quota" in error_msg or "exceeded" in error_msg:
                    logger.error(f"❌ Firestore 配額已超出限制: {e}")
                    raise Exception("FIRESTORE_QUOTA_EXCEEDED")
                else:
                    logger.error(f"❌ Firestore 查詢失敗: {e}")
                    raise e

        logger.info(f"📤 開始查詢用戶...")
        # 添加超時機制
        try:
            user_doc = await _asyncio.wait_for(
                _asyncio.to_thread(_fetch_existing_user),
                timeout=10.0  # 10秒超時
            )
            logger.info(f"🔍 用戶查詢完成: {'找到現有用戶' if user_doc else '未找到用戶'}")
        except _asyncio.TimeoutError:
            logger.error("❌ Firestore 查詢超時(10秒)")
            return {"success": False, "error": "數據庫查詢超時"}
        except Exception as e:
            error_str = str(e)
            if "FIRESTORE_QUOTA_EXCEEDED" in error_str:
                logger.error("❌ Firestore 每日配額已用完")
                return {
                    "success": False,
                    "error": "QUOTA_EXCEEDED",
                    "message": "Firestore 每日配額已用完,請稍後再試或聯繫管理員升級服務"
                }
            else:
                logger.error(f"❌ 用戶查詢異常: {e}")
                return {"success": False, "error": f"用戶查詢失敗: {str(e)}"}

        if user_doc:
            user_data = user_doc.to_dict()

            def _update_user():
                users_collection.document(user_doc.id).update({
                    "name": google_token_info.get("name", user_data.get("name")),
                    "picture": google_token_info.get("picture", user_data.get("picture")),
                    "last_login": datetime.now(),
                    "updated_at": datetime.now()
                })

            await _asyncio.to_thread(_update_user)

            logger.info(f"用戶 {email} 登入成功,user_id: {user_data.get('user_id')}")
            return {
                "success": True,
                "user": {
                    "id": user_data.get("user_id", google_id),
                    "name": user_data.get("name", ""),
                    "email": user_data.get("email", email),
                    "picture": user_data.get("picture"),
                    "created_at": user_data.get("created_at")
                },
                "is_new_user": False
            }

        logger.info(f"📤 創建新用戶...")
        now = datetime.now()
        new_user = {
            "user_id": google_id,
            "google_id": google_id,
            "email": email,
            "name": google_token_info.get("name", ""),
            "picture": google_token_info.get("picture"),
            "locale": google_token_info.get("locale", "zh-TW"),
            "first_login": now,
            "last_login": now,
            "created_at": now,
            "updated_at": now
        }

        logger.info(f"🔍 新用戶數據: {new_user}")
        logger.info(f"📤 寫入Firestore...")
        await _asyncio.to_thread(lambda: users_collection.document(google_id).set(new_user))
        logger.info(f"✅ 新用戶 {email} 註冊成功,user_id: {google_id}")

        return {
            "success": True,
            "user": {
                "id": google_id,
                "name": new_user["name"],
                "email": new_user["email"],
                "picture": new_user["picture"],
                "created_at": new_user["created_at"]
            },
            "is_new_user": True
        }

    except Exception as e:
        logger.error(f"❌ Google OAuth 認證時發生錯誤: {e}")
        logger.error(f"❌ 錯誤類型: {type(e).__name__}")
        logger.error(f"❌ 錯誤堆疊:", exc_info=True)
        return {"success": False, "error": str(e)}

# 對話管理
async def create_chat(user_id, title="新對話"):
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法創建對話")
        return {"success": False, "error": "數據庫未連接"}
    try:
        chat = {
            "user_id": user_id,
            "title": title,
            "messages": [],
            "created_at": datetime.now(),
            "updated_at": datetime.now(),
        }
        import asyncio as _asyncio
        doc_ref = await _asyncio.to_thread(lambda: chats_collection.add(chat))
        chat_id = doc_ref[1].id
        logger.info(f"為用戶 {user_id} 創建了新對話,ID: {chat_id}")
        chat_info = {
            "chat_id": chat_id,
            "user_id": user_id,
            "title": title,
            "created_at": chat["created_at"],
            "updated_at": chat["updated_at"],
        }
        # 序列化時間物件,避免 JSON 序列化炸裂
        serialized_chat_info = _serialize_firestore_data(chat_info)
        return {"success": True, "chat": serialized_chat_info}
    except Exception as e:
        logger.error(f"創建對話時發生錯誤: {e}")
        return {"success": False, "error": str(e)}

async def get_user_chats(user_id):
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法獲取對話")
        return {"success": False, "error": "數據庫未連接"}
    try:
        import asyncio as _asyncio
        def _fetch_chats():
            docs = chats_collection.where(filter=FieldFilter("user_id", "==", user_id))\
                                 .order_by("updated_at", direction=firestore.Query.DESCENDING)\
                                 .stream()
            chats = []
            for doc in docs:
                chat = doc.to_dict()
                chat["chat_id"] = doc.id
                if "user_id" in chat:
                    del chat["user_id"]
                if "messages" in chat:
                    del chat["messages"]
                if "created_at" in chat:
                    del chat["created_at"]
                chats.append(chat)
            return chats
        
        chats = await _asyncio.to_thread(_fetch_chats)
        logger.info(f"獲取到用戶 {user_id}{len(chats)} 個對話")
        # 序列化 Firestore 時間物件,避免 JSON 序列化炸裂
        serialized_chats = _serialize_firestore_data(chats)
        return {"success": True, "chats": serialized_chats}
    except Exception as e:
        logger.error(f"獲取用戶對話時發生錯誤: {e}")
        return {"success": False, "error": str(e)}

async def get_chat(chat_id):
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法獲取對話")
        return {"success": False, "error": "數據庫未連接"}
    try:
        import asyncio as _asyncio
        def _get_doc():
            doc = chats_collection.document(chat_id).get()
            return doc if doc.exists else None
        
        doc = await _asyncio.to_thread(_get_doc)
        if not doc:
            logger.warning(f"對話 {chat_id} 不存在")
            return {"success": False, "error": "對話不存在"}
        
        chat = doc.to_dict() or {}
        chat["chat_id"] = doc.id

        # 從 chat 子集合讀取完整對話(按時間升序)
        try:
            def _fetch_msgs():
                ref = _get_chat_messages_collection(chat_id)
                return [
                    {**doc.to_dict(), "id": doc.id}
                    for doc in ref.order_by("timestamp").stream()
                ]

            msgs = await _asyncio.to_thread(_fetch_msgs)
            chat["messages"] = msgs
            logger.info(f"獲取到對話 {chat_id},包含 {len(msgs)} 條消息(chat 子集合)")
        except Exception as _e:
            # 向後相容:若讀取失敗,退回文件內嵌 messages(若存在)
            msgs_fallback = chat.get('messages', []) or []
            chat["messages"] = msgs_fallback
            logger.warning(f"讀取 chat 子集合失敗,使用內嵌 messages。原因: {_e}")

        return {"success": True, "chat": chat}
    except Exception as e:
        logger.error(f"獲取對話時發生錯誤: {e}")
        return {"success": False, "error": str(e)}

async def save_chat_message(chat_id, sender, content):
    """保存對話消息(chat/{chat_id}/messages 子集合作為主要儲存)"""
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法保存消息")
        return {"success": False, "error": "數據庫未連接"}
    try:
        import asyncio as _asyncio

        now = datetime.now()
        message = {
            "chat_id": chat_id,
            "sender": sender,
            "content": content,
            "timestamp": now,
        }

        def _write_message():
            ref = _get_chat_messages_collection(chat_id)
            ref.add(message)

        def _write_legacy_copy():
            if messages_collection is None:
                return
            try:
                messages_collection.add(message)
            except Exception as legacy_err:  # pragma: no cover
                logger.debug(f"寫入頂層 messages 集合失敗(兼容用途,可忽略): {legacy_err}")

        def _touch_chat():
            doc_ref = chats_collection.document(chat_id)
            snap = doc_ref.get()
            if not snap.exists:
                return False
            doc_ref.update({"updated_at": now})
            return True

        await _asyncio.to_thread(_write_message)
        # 兼容舊資料模型:非阻塞地寫入頂層 messages 集合,供舊功能查詢使用
        await _asyncio.to_thread(_write_legacy_copy)
        touched = await _asyncio.to_thread(_touch_chat)
        if not touched:
            logger.warning(f"對話 {chat_id} 不存在,但消息已寫入 chat 子集合")

        logger.info(f"消息已保存到 chat 子集合(chat_id={chat_id})")
        return {"success": True, "message": message}
    except Exception as e:
        logger.error(f"保存消息時發生錯誤: {e}")
        return {"success": False, "error": str(e)}


async def get_chat_messages(chat_id: str, limit: int | None = None, ascending: bool = True):
    """讀取指定對話的消息(優先使用 chat 子集合)"""
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法讀取消息")
        return []
    try:
        import asyncio as _asyncio
        from google.cloud import firestore as _fs

        def _query():
            ref = _get_chat_messages_collection(chat_id)
            direction = _fs.Query.ASCENDING if ascending else _fs.Query.DESCENDING
            q = ref.order_by("timestamp", direction=direction)
            if limit and limit > 0:
                q = q.limit(limit)
            docs = q.stream()
            records = []
            for doc in docs:
                data = doc.to_dict()
                data["id"] = doc.id
                records.append(data)
            if not ascending:
                records = list(reversed(records))
            return records

        messages = await _asyncio.to_thread(_query)
        if messages:
            # 序列化 Firestore 時間物件,避免 JSON 序列化炸裂
            return _serialize_firestore_data(messages)

        # 向後相容:若子集合無資料,嘗試讀取舊頂層 messages 集合
        if messages_collection is None:
            return []

        def _legacy_query():
            docs = messages_collection.where(filter=FieldFilter("chat_id", "==", chat_id)).stream()
            legacy = [d.to_dict() for d in docs]
            legacy.sort(key=lambda item: item.get("timestamp"))
            if limit and limit > 0:
                legacy = legacy[:limit]
            return legacy

        legacy_sorted = await _asyncio.to_thread(_legacy_query)
        view_messages = list(legacy_sorted)
        if not ascending:
            view_messages.reverse()
        if legacy_sorted:
            def _backfill():
                try:
                    ref = _get_chat_messages_collection(chat_id)
                    # 若子集合仍為空,將舊資料搬遷過去
                    has_existing = any(True for _ in ref.limit(1).stream())
                    if has_existing:
                        return
                    for legacy_msg in legacy_sorted:
                        ref.add(legacy_msg)
                    logger.info(f"已將 legacy messages 回填至 chat 子集合(chat_id={chat_id})")
                except Exception as backfill_err:
                    logger.warning(f"回填 legacy messages 失敗(可忽略): {backfill_err}")

            await _asyncio.to_thread(_backfill)
        # 序列化 Firestore 時間物件,避免 JSON 序列化炸裂
        return _serialize_firestore_data(view_messages)
    except Exception as e:
        logger.error(f"讀取對話消息失敗: {e}")
        return []

async def update_chat_title(chat_id, title):
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法更新對話標題")
        return {"success": False, "error": "數據庫未連接"}
    try:
        import asyncio as _asyncio

        def _update_doc():
            doc_ref = chats_collection.document(chat_id)
            doc = doc_ref.get()
            if not doc.exists:
                return False
            doc_ref.update({
                "title": title,
                "updated_at": datetime.now(),
            })
            return True

        updated = await _asyncio.to_thread(_update_doc)
        if not updated:
            logger.warning(f"對話 {chat_id} 不存在,無法更新標題")
            return {"success": False, "error": "對話不存在"}

        logger.info(f"對話 {chat_id} 標題已更新為 '{title}'")
        return {"success": True}
    except Exception as e:
        logger.error(f"更新對話標題時發生錯誤: {e}")
        return {"success": False, "error": str(e)}

async def delete_chat(chat_id):
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法刪除對話")
        return {"success": False, "error": "數據庫未連接"}
    try:
        import asyncio as _asyncio

        def _delete_doc():
            doc_ref = chats_collection.document(chat_id)
            doc = doc_ref.get()
            if not doc.exists:
                return False
            # 先刪除子集合中的消息,避免孤兒資料
            try:
                messages_ref = _get_chat_messages_collection(chat_id)
                for msg_snapshot in messages_ref.stream():
                    msg_snapshot.reference.delete()
            except Exception as msg_err:
                logger.warning(f"刪除對話 {chat_id} 的子消息時發生錯誤:{msg_err}")
            doc_ref.delete()
            return True

        deleted = await _asyncio.to_thread(_delete_doc)
        if not deleted:
            logger.warning(f"對話 {chat_id} 不存在,無法刪除")
            return {"success": False, "error": "對話不存在"}

        logger.info(f"對話 {chat_id} 已刪除")
        return {"success": True}
    except Exception as e:
        logger.error(f"刪除對話時發生錯誤: {e}")
        return {"success": False, "error": str(e)}


# ===== 對話情緒記憶 =====
async def set_chat_emotion(chat_id: str, emotion: dict):
    """為指定對話記錄最近的情緒狀態(label, confidence, timestamp)。"""
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法設定對話情緒")
        return {"success": False, "error": "數據庫未連接"}
    try:
        import asyncio as _asyncio

        def _update_doc():
            doc_ref = chats_collection.document(chat_id)
            doc = doc_ref.get()
            if not doc.exists:
                return False
            payload = {
                "label": emotion.get("label"),
                "confidence": emotion.get("confidence"),
                "timestamp": datetime.now(),
            }
            doc_ref.update({
                "context.emotion": payload,
                "updated_at": datetime.now(),
            })
            return True

        updated = await _asyncio.to_thread(_update_doc)
        if not updated:
            return {"success": False, "error": "對話不存在"}
        return {"success": True}
    except Exception as e:
        logger.error(f"設定對話情緒時發生錯誤: {e}")
        return {"success": False, "error": str(e)}

async def get_chat_emotion(chat_id: str):
    """取得對話記錄的最近情緒狀態。"""
    if chats_collection is None:
        logger.error("Firestore尚未連接,無法讀取對話情緒")
        return {"success": False, "error": "數據庫未連接"}
    try:
        import asyncio as _asyncio

        def _get_doc():
            doc = chats_collection.document(chat_id).get()
            return doc if doc.exists else None

        doc = await _asyncio.to_thread(_get_doc)
        if not doc:
            return {"success": False, "error": "對話不存在"}
        data = doc.to_dict() or {}
        emotion = (data.get("context") or {}).get("emotion")
        return {"success": True, "emotion": emotion}
    except Exception as e:
        logger.error(f"讀取對話情緒時發生錯誤: {e}")
        return {"success": False, "error": str(e)}

# ===== 語音登入:使用者與說話者標籤關聯 =====
async def set_user_speaker_label(user_id: str, speaker_label: str):
    if users_collection is None:
        logger.error("Firestore尚未連接,無法設定語音標籤")
        return {"success": False, "error": "數據庫未連接"}
    try:
        import asyncio as _asyncio

        def _is_label_taken():
            docs = list(users_collection.where(filter=FieldFilter("speaker_label", "==", speaker_label)).limit(1).stream())
            return docs[0] if docs else None

        existing_label = await _asyncio.to_thread(_is_label_taken)
        if existing_label and existing_label.to_dict().get("user_id") != user_id:
            return {"success": False, "error": "SPEAKER_LABEL_TAKEN"}

        def _update_user():
            doc_ref = users_collection.document(user_id)
            doc = doc_ref.get()
            if not doc.exists:
                return False
            doc_ref.update({"speaker_label": speaker_label})
            return True

        updated = await _asyncio.to_thread(_update_user)
        if not updated:
            return {"success": False, "error": "USER_NOT_FOUND"}

        return {"success": True}
    except Exception as e:
        logger.error(f"設定語音標籤時發生錯誤: {e}")
        return {"success": False, "error": str(e)}


async def get_user_by_speaker_label(speaker_label: str):
    if users_collection is None:
        logger.error("Firestore尚未連接,無法查詢語音標籤")
        return None
    try:
        import asyncio as _asyncio

        def _fetch_user():
            docs = list(users_collection.where(filter=FieldFilter("speaker_label", "==", speaker_label)).limit(1).stream())
            return docs[0] if docs else None

        doc = await _asyncio.to_thread(_fetch_user)
        if not doc:
            return None

        data = doc.to_dict()
        result = {
            "id": data.get("user_id"),
            "name": data.get("name", ""),
            "email": data.get("email", ""),
            "created_at": data.get("created_at"),
        }
        # 序列化 Firestore 時間物件,避免 JSON 序列化炸裂
        return _serialize_firestore_data(result)
    except Exception as e:
        logger.error(f"查詢語音標籤對應用戶時發生錯誤: {e}")
        return None


# ===== 專門記憶系統 =====

async def save_memory(
    user_id: str,
    memory_type: str,
    content: str,
    importance: float = 1.0,
    metadata: dict | None = None,
) -> Dict[str, Any]:
    """保存重要記憶到 Firestore"""
    if users_collection is None or firestore_db is None:
        logger.error("Firestore尚未連接,無法保存記憶")
        return {"success": False, "error": "數據庫未連接"}

    try:
        import asyncio as _asyncio

        now = datetime.now()
        sanitized_importance = max(0.0, min(1.0, importance))
        metadata_payload = metadata.copy() if metadata else {}
        metadata_payload.setdefault("source", "unknown")
        metadata_payload.setdefault("last_updated_by", "memory_system")
        metadata_payload["updated_at"] = now.isoformat()

        context_tags = metadata_payload.get("context_tags", [])
        if not isinstance(context_tags, list):
            context_tags = list(context_tags) if context_tags else []
        metadata_payload["context_tags"] = context_tags

        triggers = metadata_payload.get("triggers", [])
        if not isinstance(triggers, list):
            triggers = list(triggers) if triggers else []
        metadata_payload["triggers"] = triggers

        col_ref = _get_user_memories_collection(user_id)
        content_hash = hashlib.sha1(content.strip().lower().encode("utf-8")).hexdigest()

        def _ensure_user_stub():
            user_doc = _get_user_doc_ref(user_id)
            snap = user_doc.get()
            if not snap.exists:
                user_doc.set(
                    {
                        "user_id": user_id,
                        "created_at": now,
                        "updated_at": now,
                    },
                    merge=True,
                )

        def _find_existing():
            docs = (
                col_ref.where(filter=FieldFilter("content_hash", "==", content_hash))
                .limit(1)
                .stream()
            )
            for doc in docs:
                return doc
            return None

        await _asyncio.to_thread(_ensure_user_stub)
        existing_doc = await _asyncio.to_thread(_find_existing)

        if existing_doc:
            doc_ref = existing_doc.reference

            def _update_memory():
                doc_ref.update({
                    "content": content,
                    "importance": sanitized_importance,
                    "metadata": metadata_payload,
                    "updated_at": now,
                    "access_count": Increment(1),
                    "last_accessed": now,
                    "content_hash": content_hash,
                })

            await _asyncio.to_thread(_update_memory)
            logger.info(f"更新用戶 {user_id} 的記憶: {memory_type}")
            return {"success": True, "action": "updated", "memory_id": existing_doc.id}

        def _create_memory():
            doc_ref = col_ref.document()
            doc_ref.set({
                "user_id": user_id,
                "type": memory_type,
                "content": content,
                "importance": sanitized_importance,
                "metadata": metadata_payload,
                "access_count": 0,
                "last_accessed": now,
                "updated_at": now,
                "created_at": now,
                "content_hash": content_hash,
            })
            return doc_ref.id

        memory_id = await _asyncio.to_thread(_create_memory)
        await _asyncio.to_thread(_enforce_memory_quota, col_ref)
        logger.info(f"保存用戶 {user_id} 的新記憶: {memory_type}")
        return {"success": True, "action": "created", "memory_id": memory_id}

    except Exception as e:
        logger.error(f"保存記憶時發生錯誤: {e}")
        return {"success": False, "error": str(e)}


# ===== 環境 Context(位置/方位/時序) =====

async def set_user_env_current(user_id: str, ctx: Dict[str, Any]) -> Dict[str, Any]:
    """更新使用者環境現況 users/{uid}/context/current(含 TTL/新鮮度由讀取端判斷)。"""
    if users_collection is None:
        return {"success": False, "error": "數據庫未連接"}
    try:
        import asyncio as _asyncio
        now = datetime.now()

        def _update():
            user_doc = _get_user_doc_ref(user_id)
            ctx_ref = user_doc.collection('context').document('current')
            payload = ctx.copy()
            payload['updated_at'] = now
            ctx_ref.set(payload, merge=True)
            return True

        await _asyncio.to_thread(_update)
        return {"success": True}
    except Exception as e:
        logger.error(f"更新環境現況失敗: {e}")
        return {"success": False, "error": str(e)}


async def add_user_env_snapshot(user_id: str, snapshot: Dict[str, Any]) -> Dict[str, Any]:
    """新增使用者環境快照 users/{uid}/context/snapshots。僅保留短期歷史。"""
    if users_collection is None:
        return {"success": False, "error": "數據庫未連接"}
    try:
        import asyncio as _asyncio
        now = datetime.now()

        def _write():
            user_doc = _get_user_doc_ref(user_id)
            col = user_doc.collection('context').document('meta').collection('snapshots')
            payload = snapshot.copy()
            payload['created_at'] = now
            col.add(payload)
            return True

        await _asyncio.to_thread(_write)
        return {"success": True}
    except Exception as e:
        logger.error(f"寫入環境快照失敗: {e}")
        return {"success": False, "error": str(e)}


async def get_user_env_current(user_id: str) -> Dict[str, Any]:
    """讀取使用者環境現況。"""
    if users_collection is None:
        return {"success": False, "error": "數據庫未連接"}
    try:
        import asyncio as _asyncio

        def _read():
            user_doc = _get_user_doc_ref(user_id)
            ctx_ref = user_doc.collection('context').document('current')
            snap = ctx_ref.get()
            return snap.to_dict() if snap.exists else None

        data = await _asyncio.to_thread(_read)
        if not data:
            return {"success": False, "error": "NOT_FOUND"}
        # 序列化 Firestore 時間物件,避免 JSON 序列化炸裂
        serialized_data = _serialize_firestore_data(data)
        return {"success": True, "context": serialized_data}
    except Exception as e:
        logger.error(f"讀取環境現況失敗: {e}")
        return {"success": False, "error": str(e)}


# ===== 反地理/路線 全域快取集合 =====

async def get_geo_cache(geohash7: str) -> Optional[Dict[str, Any]]:
    if geo_cache_collection is None:
        return None
    try:
        import asyncio as _asyncio
        def _read():
            doc = geo_cache_collection.document(geohash7).get()
            return doc.to_dict() if doc.exists else None
        result = await _asyncio.to_thread(_read)
        # 序列化 Firestore 時間物件,避免 JSON 序列化炸裂
        return _serialize_firestore_data(result) if result else None
    except Exception as e:
        logger.warning(f"讀取 geo_cache 失敗: {e}")
        return None


async def set_geo_cache(geohash7: str, payload: Dict[str, Any]) -> bool:
    if geo_cache_collection is None:
        return False
    try:
        import asyncio as _asyncio
        now = datetime.now()
        def _write():
            data = payload.copy()
            data['cached_at'] = now
            geo_cache_collection.document(geohash7).set(data, merge=True)
            return True
        return await _asyncio.to_thread(_write)
    except Exception as e:
        logger.warning(f"寫入 geo_cache 失敗: {e}")
        return False


async def get_route_cache(key: str) -> Optional[Dict[str, Any]]:
    if route_cache_collection is None:
        return None
    try:
        import asyncio as _asyncio
        def _read():
            doc = route_cache_collection.document(key).get()
            return doc.to_dict() if doc.exists else None
        result = await _asyncio.to_thread(_read)
        # 序列化 Firestore 時間物件,避免 JSON 序列化炸裂
        return _serialize_firestore_data(result) if result else None
    except Exception as e:
        logger.warning(f"讀取 route_cache 失敗: {e}")
        return None


async def set_route_cache(key: str, payload: Dict[str, Any]) -> bool:
    if route_cache_collection is None:
        return False
    try:
        import asyncio as _asyncio
        now = datetime.now()
        def _write():
            data = payload.copy()
            data['cached_at'] = now
            route_cache_collection.document(key).set(data, merge=True)
            return True
        return await _asyncio.to_thread(_write)
    except Exception as e:
        logger.warning(f"寫入 route_cache 失敗: {e}")
        return False


async def get_user_memories(
    user_id: str,
    memory_type: str | None = None,
    limit: int = 10,
    min_importance: float = 0.0,
) -> Dict[str, Any]:
    """獲取用戶的記憶"""
    if users_collection is None:
        logger.error("Firestore尚未連接,無法獲取記憶")
        return {"success": False, "error": "數據庫未連接"}

    try:
        import asyncio as _asyncio

        def _fetch_memories():
            col_ref = _get_user_memories_collection(user_id)
            query = col_ref.where(filter=FieldFilter("importance", ">=", min_importance))
            if memory_type:
                query = query.where(filter=FieldFilter("type", "==", memory_type))
            docs = (
                query.order_by("importance", direction=firestore.Query.DESCENDING)
                .order_by("updated_at", direction=firestore.Query.DESCENDING)
                .limit(limit)
                .stream()
            )
            return [doc.to_dict() | {"memory_id": doc.id} for doc in docs]

        memories = await _asyncio.to_thread(_fetch_memories)

        def _mark_accessed(mem_ids):
            col_ref = _get_user_memories_collection(user_id)
            now_inner = datetime.now()
            for mid in mem_ids:
                col_ref.document(mid).update({
                    "access_count": Increment(1),
                    "last_accessed": now_inner,
                })

        if memories:
            await _asyncio.to_thread(_mark_accessed, [m["memory_id"] for m in memories])

        logger.info(f"獲取到用戶 {user_id}{len(memories)} 條記憶")
        # 序列化 Firestore 時間物件,避免 JSON 序列化炸裂
        serialized_memories = _serialize_firestore_data(memories)
        return {"success": True, "memories": serialized_memories}

    except Exception as e:
        logger.error(f"獲取記憶時發生錯誤: {e}")
        return {"success": False, "error": str(e)}


async def search_memories(user_id: str, query_text: str, limit: int = 5) -> Dict[str, Any]:
    """基於簡易文本匹配的記憶搜索"""
    if users_collection is None:
        logger.error("Firestore尚未連接,無法搜索記憶")
        return {"success": False, "error": "數據庫未連接"}

    try:
        import asyncio as _asyncio

        normalized_query = query_text.lower()

        def _candidate_memories():
            col_ref = _get_user_memories_collection(user_id)
            docs = (
                col_ref.order_by("updated_at", direction=firestore.Query.DESCENDING)
                .limit(80)
                .stream()
            )
            results = []
            for doc in docs:
                data = doc.to_dict() or {}
                haystack = "{} {}".format(
                    data.get("content", ""),
                    " ".join(data.get("metadata", {}).get("context_tags", [])),
                ).lower()
                if normalized_query in haystack:
                    data["memory_id"] = doc.id
                    results.append(data)
                    if len(results) >= limit:
                        break
            return results

        memories = await _asyncio.to_thread(_candidate_memories)

        def _mark_accessed(mem_ids):
            col_ref = _get_user_memories_collection(user_id)
            now_inner = datetime.now()
            for mid in mem_ids:
                col_ref.document(mid).update({
                    "access_count": Increment(1),
                    "last_accessed": now_inner,
                })

        if memories:
            await _asyncio.to_thread(_mark_accessed, [m["memory_id"] for m in memories])

        logger.info(f"搜索到用戶 {user_id}{len(memories)} 條相關記憶")
        return {"success": True, "memories": memories}

    except Exception as e:
        logger.error(f"搜索記憶時發生錯誤: {e}")
        return {"success": False, "error": str(e)}


async def update_memory_importance(memory_id: str, new_importance: float):
    """更新記憶的重要性分數"""
    if users_collection is None:
        logger.error("Firestore尚未連接,無法更新記憶")
        return {"success": False, "error": "數據庫未連接"}

    try:
        import asyncio as _asyncio

        sanitized_importance = max(0.0, min(1.0, new_importance))
        now = datetime.now()

        def _update_doc():
            users = users_collection.stream()
            for user_doc in users:
                mem_ref = user_doc.reference.collection("memories").document(memory_id)
                snapshot = mem_ref.get()
                if snapshot.exists:
                    mem_ref.update({
                        "importance": sanitized_importance,
                        "updated_at": now,
                    })
                    return True
            return False

        updated = await _asyncio.to_thread(_update_doc)
        if not updated:
            return {"success": False, "error": "記憶不存在"}

        logger.info(f"更新記憶 {memory_id} 的重要性為 {sanitized_importance}")
        return {"success": True}

    except Exception as e:
        logger.error(f"更新記憶重要性時發生錯誤: {e}")
        return {"success": False, "error": str(e)}


async def delete_memory(memory_id: str):
    """刪除記憶"""
    if users_collection is None:
        logger.error("Firestore尚未連接,無法刪除記憶")
        return {"success": False, "error": "數據庫未連接"}

    try:
        import asyncio as _asyncio

        def _delete_doc():
            users = users_collection.stream()
            for user_doc in users:
                mem_ref = user_doc.reference.collection("memories").document(memory_id)
                snapshot = mem_ref.get()
                if snapshot.exists:
                    mem_ref.delete()
                    return True
            return False

        deleted = await _asyncio.to_thread(_delete_doc)
        if not deleted:
            return {"success": False, "error": "記憶不存在"}

        logger.info(f"刪除記憶 {memory_id}")
        return {"success": True}

    except Exception as e:
        logger.error(f"刪除記憶時發生錯誤: {e}")
        return {"success": False, "error": str(e)}


async def cleanup_old_memories(user_id: str, days_old: int = 90, min_importance: float = 0.3):
    """清理舊的、低重要性的記憶

    Args:
        user_id: 用戶ID
        days_old: 刪除多少天前的記憶
        min_importance: 保留的最小重要性分數
    """
    if users_collection is None:
        logger.error("Firestore尚未連接,無法清理記憶")
        return {"success": False, "error": "數據庫未連接"}

    try:
        import asyncio as _asyncio
        from datetime import timedelta

        cutoff_date = datetime.now() - timedelta(days=days_old)

        def _delete_old():
            col_ref = _get_user_memories_collection(user_id)
            docs = (
                col_ref.where(filter=FieldFilter("importance", "<", min_importance))
                .where(filter=FieldFilter("updated_at", "<", cutoff_date))
                .stream()
            )
            deleted_count = 0
            for doc in docs:
                doc.reference.delete()
                deleted_count += 1
            return deleted_count

        deleted = await _asyncio.to_thread(_delete_old)

        logger.info(f"為用戶 {user_id} 清理 {deleted} 條舊記憶")
        return {"success": True, "deleted": deleted}

    except Exception as e:
        logger.error(f"清理記憶時發生錯誤: {e}")
        return {"success": False, "error": str(e)}


def _enforce_memory_quota(col_ref: CollectionReference) -> None:
    docs = list(
        col_ref.order_by("importance", direction=firestore.Query.ASCENDING)
        .order_by("updated_at", direction=firestore.Query.ASCENDING)
        .stream()
    )
    if len(docs) <= MAX_MEMORIES_PER_USER:
        return
    excess = len(docs) - MAX_MEMORIES_PER_USER
    for doc in docs[:excess]:
        doc.reference.delete()