File size: 64,144 Bytes
5868187
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
"""
认证API模块 - 使用统一存储中间层,完全摆脱文件操作
"""
import asyncio
import json
import secrets
import socket
import threading
import time
import uuid
from datetime import timezone
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Optional, Dict, Any, List
from urllib.parse import urlparse, parse_qs

from .google_oauth_api import Credentials, Flow, enable_required_apis, get_user_projects, select_default_project
from .storage_adapter import get_storage_adapter
from config import get_config_value
from log import log

# OAuth Configuration
CLIENT_ID = "681255809395-oo8ft2oprdrnp9e3aqf6av3hmdib135j.apps.googleusercontent.com"
CLIENT_SECRET = "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"
SCOPES = [
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/userinfo.profile",
]

# 回调服务器配置
CALLBACK_HOST = 'localhost'

async def get_callback_port():
    """获取OAuth回调端口"""
    return int(await get_config_value('oauth_callback_port', '8080', 'OAUTH_CALLBACK_PORT'))

# 全局状态管理 - 严格限制大小
auth_flows = {}  # 存储进行中的认证流程
MAX_AUTH_FLOWS = 20  # 严格限制最大认证流程数

def cleanup_auth_flows_for_memory():
    """清理认证流程以释放内存"""
    global auth_flows
    cleaned = cleanup_expired_flows()
    # 如果还是太多,强制清理一些旧的流程
    if len(auth_flows) > 10:
        # 按创建时间排序,保留最新的10个
        sorted_flows = sorted(auth_flows.items(), key=lambda x: x[1].get('created_at', 0), reverse=True)
        new_auth_flows = dict(sorted_flows[:10])
        
        # 清理被移除的流程
        for state, flow_data in auth_flows.items():
            if state not in new_auth_flows:
                try:
                    if flow_data.get('server'):
                        server = flow_data['server']
                        port = flow_data.get('callback_port')
                        async_shutdown_server(server, port)
                except Exception:
                    pass
                flow_data.clear()
        
        auth_flows = new_auth_flows
        log.info(f"强制清理认证流程,保留 {len(auth_flows)} 个最新流程")
    
    return len(auth_flows)


async def find_available_port(start_port: int = None) -> int:
    """动态查找可用端口"""
    if start_port is None:
        start_port = await get_callback_port()
    
    # 首先尝试默认端口
    for port in range(start_port, start_port + 100):  # 尝试100个端口
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind(('0.0.0.0', port))
                log.info(f"找到可用端口: {port}")
                return port
        except OSError:
            continue
    
    # 如果都不可用,让系统自动分配端口
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('0.0.0.0', 0))
            port = s.getsockname()[1]
            log.info(f"系统分配可用端口: {port}")
            return port
    except OSError as e:
        log.error(f"无法找到可用端口: {e}")
        raise RuntimeError("无法找到可用端口")

def create_callback_server(port: int) -> HTTPServer:
    """创建指定端口的回调服务器,优化快速关闭"""
    try:
        # 服务器监听0.0.0.0
        server = HTTPServer(("0.0.0.0", port), AuthCallbackHandler)
        
        # 设置socket选项以支持快速关闭
        server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 设置较短的超时时间
        server.timeout = 1.0
        
        log.info(f"创建OAuth回调服务器,监听端口: {port}")
        return server
    except OSError as e:
        log.error(f"创建端口{port}的服务器失败: {e}")
        raise

class AuthCallbackHandler(BaseHTTPRequestHandler):
    """OAuth回调处理器"""
    def do_GET(self):
        query_components = parse_qs(urlparse(self.path).query)
        code = query_components.get("code", [None])[0]
        state = query_components.get("state", [None])[0]
        
        log.info(f"收到OAuth回调: code={'已获取' if code else '未获取'}, state={state}")
        
        if code and state and state in auth_flows:
            # 更新流程状态
            auth_flows[state]['code'] = code
            auth_flows[state]['completed'] = True
            
            log.info(f"OAuth回调成功处理: state={state}")
            
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.end_headers()
            # 成功页面
            self.wfile.write(b"<h1>OAuth authentication successful!</h1><p>You can close this window. Please return to the original page and click 'Get Credentials' button.</p>")
        else:
            self.send_response(400)
            self.send_header("Content-type", "text/html")
            self.end_headers()
            self.wfile.write(b"<h1>Authentication failed.</h1><p>Please try again.</p>")
    
    def log_message(self, format, *args):
        # 减少日志噪音
        pass


async def create_auth_url(project_id: Optional[str] = None, user_session: str = None, get_all_projects: bool = False) -> Dict[str, Any]:
    """创建认证URL,支持动态端口分配"""
    try:
        # 动态分配端口
        callback_port = await find_available_port()
        callback_url = f"http://{CALLBACK_HOST}:{callback_port}"
        
        # 立即启动回调服务器
        try:
            callback_server = create_callback_server(callback_port)
            # 在后台线程中运行服务器
            server_thread = threading.Thread(
                target=callback_server.serve_forever, 
                daemon=True,
                name=f"OAuth-Server-{callback_port}"
            )
            server_thread.start()
            log.info(f"OAuth回调服务器已启动,端口: {callback_port}")
        except Exception as e:
            log.error(f"启动回调服务器失败: {e}")
            return {
                'success': False,
                'error': f'无法启动OAuth回调服务器,端口{callback_port}: {str(e)}'
            }
        
        # 创建OAuth流程
        client_config = {
            "installed": {
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
            }
        }
        
        flow = Flow(
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
            scopes=SCOPES,
            redirect_uri=callback_url
        )
        
        # 生成状态标识符,包含用户会话信息
        if user_session:
            state = f"{user_session}_{str(uuid.uuid4())}"
        else:
            state = str(uuid.uuid4())
        
        # 生成认证URL
        auth_url = flow.get_auth_url(state=state)
        
        # 严格控制认证流程数量 - 超过限制时立即清理最旧的
        if len(auth_flows) >= MAX_AUTH_FLOWS:
            # 清理最旧的认证流程
            oldest_state = min(auth_flows.keys(), 
                             key=lambda k: auth_flows[k].get('created_at', 0))
            try:
                # 清理服务器资源
                old_flow = auth_flows[oldest_state]
                if old_flow.get('server'):
                    server = old_flow['server']
                    port = old_flow.get('callback_port')
                    async_shutdown_server(server, port)
            except Exception as e:
                log.warning(f"Failed to cleanup old auth flow {oldest_state}: {e}")
            
            del auth_flows[oldest_state]
            log.debug(f"Removed oldest auth flow: {oldest_state}")
        
        # 保存流程状态
        auth_flows[state] = {
            'flow': flow,
            'project_id': project_id,  # 可能为None,稍后在回调时确定
            'user_session': user_session,
            'callback_port': callback_port,  # 存储分配的端口
            'callback_url': callback_url,   # 存储完整回调URL
            'server': callback_server,  # 存储服务器实例
            'server_thread': server_thread,  # 存储服务器线程
            'code': None,
            'completed': False,
            'created_at': time.time(),
            'auto_project_detection': project_id is None,  # 标记是否需要自动检测项目ID
            'get_all_projects': get_all_projects  # 是否为所有项目获取凭证
        }
        
        # 清理过期的流程(30分钟)
        cleanup_expired_flows()
        
        log.info(f"OAuth流程已创建: state={state}, project_id={project_id}")
        log.info(f"用户需要访问认证URL,然后OAuth会回调到 {callback_url}")
        log.info(f"为此认证流程分配的端口: {callback_port}")
        
        return {
            'auth_url': auth_url,
            'state': state,
            'callback_port': callback_port,
            'success': True,
            'auto_project_detection': project_id is None,
            'detected_project_id': project_id
        }
        
    except Exception as e:
        log.error(f"创建认证URL失败: {e}")
        return {
            'success': False,
            'error': str(e)
        }


def wait_for_callback_sync(state: str, timeout: int = 300) -> Optional[str]:
    """同步等待OAuth回调完成,使用对应流程的专用服务器"""
    if state not in auth_flows:
        log.error(f"未找到状态为 {state} 的认证流程")
        return None
    
    flow_data = auth_flows[state]
    callback_port = flow_data['callback_port']
    
    # 服务器已经在create_auth_url时启动了,这里只需要等待
    log.info(f"等待OAuth回调完成,端口: {callback_port}")
    
    # 等待回调完成
    start_time = time.time()
    while time.time() - start_time < timeout:
        if flow_data.get('code'):
            log.info(f"OAuth回调成功完成")
            return flow_data['code']
        time.sleep(0.5)  # 每0.5秒检查一次
        
        # 刷新flow_data引用
        if state in auth_flows:
            flow_data = auth_flows[state]
    
    log.warning(f"等待OAuth回调超时 ({timeout}秒)")
    return None


async def complete_auth_flow(project_id: Optional[str] = None, user_session: str = None) -> Dict[str, Any]:
    """完成认证流程并保存凭证,支持自动检测项目ID"""
    try:
        # 查找对应的认证流程
        state = None
        flow_data = None
        
        # 如果指定了project_id,先尝试匹配指定的项目
        if project_id:
            for s, data in auth_flows.items():
                if data['project_id'] == project_id:
                    # 如果指定了用户会话,优先匹配相同会话的流程
                    if user_session and data.get('user_session') == user_session:
                        state = s
                        flow_data = data
                        break
                    # 如果没有指定会话,或没找到匹配会话的流程,使用第一个匹配项目ID的
                    elif not state:
                        state = s
                        flow_data = data
        
        # 如果没有指定项目ID或没找到匹配的,查找需要自动检测项目ID的流程
        if not state:
            for s, data in auth_flows.items():
                if data.get('auto_project_detection', False):
                    # 如果指定了用户会话,优先匹配相同会话的流程
                    if user_session and data.get('user_session') == user_session:
                        state = s
                        flow_data = data
                        break
                    # 使用第一个找到的需要自动检测的流程
                    elif not state:
                        state = s
                        flow_data = data
        
        if not state or not flow_data:
            return {
                'success': False,
                'error': '未找到对应的认证流程,请先点击获取认证链接'
            }
        
        if not project_id:
            project_id = flow_data.get('project_id')
            if not project_id:
                return {
                    'success': False,
                    'error': '缺少项目ID,请指定项目ID',
                    'requires_manual_project_id': True
                }
        
        flow = flow_data['flow']
        
        # 如果还没有授权码,需要等待回调
        if not flow_data.get('code'):
            log.info(f"等待用户完成OAuth授权 (state: {state})")
            auth_code = wait_for_callback_sync(state)
            
            if not auth_code:
                return {
                    'success': False,
                    'error': '未接收到授权回调,请确保完成了浏览器中的OAuth认证'
                }
            
            # 更新流程数据
            auth_flows[state]['code'] = auth_code
            auth_flows[state]['completed'] = True
        else:
            auth_code = flow_data['code']
        
        # 使用认证代码获取凭证
        import oauthlib.oauth2.rfc6749.parameters
        original_validate = oauthlib.oauth2.rfc6749.parameters.validate_token_parameters
        
        def patched_validate(params):
            try:
                return original_validate(params)
            except Warning:
                pass
        
        oauthlib.oauth2.rfc6749.parameters.validate_token_parameters = patched_validate
        
        try:
            credentials = await flow.exchange_code(auth_code)
            # credentials 已经在 exchange_code 中获得
            
            # 如果需要自动检测项目ID且没有提供项目ID
            if flow_data.get('auto_project_detection', False) and not project_id:
                log.info("尝试通过API获取用户项目列表...")
                log.info(f"使用的token: {credentials.access_token[:20]}...")
                log.info(f"Token过期时间: {credentials.expires_at}")
                user_projects = await get_user_projects(credentials)
                
                if user_projects:
                    # 如果只有一个项目,自动使用
                    if len(user_projects) == 1:
                        project_id = user_projects[0].get('projectId')
                        if project_id:
                            flow_data['project_id'] = project_id
                            log.info(f"自动选择唯一项目: {project_id}")
                    # 如果有多个项目,尝试选择默认项目
                    else:
                        project_id = await select_default_project(user_projects)
                        if project_id:
                            flow_data['project_id'] = project_id
                            log.info(f"自动选择默认项目: {project_id}")
                        else:
                            # 返回项目列表让用户选择
                            return {
                                'success': False,
                                'error': '请从以下项目中选择一个',
                                'requires_project_selection': True,
                                'available_projects': [
                                    {
                                        'projectId': p.get('projectId'),
                                        'name': p.get('displayName') or p.get('projectId'),
                                        'projectNumber': p.get('projectNumber')
                                    }
                                    for p in user_projects
                                ]
                            }
                else:
                    # 如果无法获取项目列表,提示手动输入
                    return {
                        'success': False,
                        'error': '无法获取您的项目列表,请手动指定项目ID',
                        'requires_manual_project_id': True
                    }
            
            # 如果仍然没有项目ID,返回错误
            if not project_id:
                return {
                    'success': False,
                    'error': '缺少项目ID,请指定项目ID',
                    'requires_manual_project_id': True
                }
            
            # 保存凭证
            saved_filename = await save_credentials(credentials, project_id)
            
            # 准备返回的凭证数据
            creds_data = {
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "token": credentials.access_token,
                "refresh_token": credentials.refresh_token,
                "scopes": SCOPES,
                "token_uri": "https://oauth2.googleapis.com/token",
                "project_id": project_id
            }
            
            if credentials.expires_at:
                if credentials.expires_at.tzinfo is None:
                    expiry_utc = credentials.expires_at.replace(tzinfo=timezone.utc)
                else:
                    expiry_utc = credentials.expires_at
                creds_data["expiry"] = expiry_utc.isoformat()
            
            # 清理使用过的流程
            if state in auth_flows:
                flow_data_to_clean = auth_flows[state]
                # 快速关闭服务器
                try:
                    if flow_data_to_clean.get('server'):
                        server = flow_data_to_clean['server']
                        port = flow_data_to_clean.get('callback_port')
                        async_shutdown_server(server, port)
                except Exception as e:
                    log.debug(f"启动异步关闭服务器时出错: {e}")
                
                del auth_flows[state]
            
            log.info("OAuth认证成功,凭证已保存")
            return {
                'success': True,
                'credentials': creds_data,
                'file_path': saved_filename,
                'auto_detected_project': flow_data.get('auto_project_detection', False)
            }
            
        except Exception as e:
            log.error(f"获取凭证失败: {e}")
            return {
                'success': False,
                'error': f'获取凭证失败: {str(e)}'
            }
        finally:
            oauthlib.oauth2.rfc6749.parameters.validate_token_parameters = original_validate
            
    except Exception as e:
        log.error(f"完成认证流程失败: {e}")
        return {
            'success': False,
            'error': str(e)
        }


async def asyncio_complete_auth_flow(project_id: Optional[str] = None, user_session: str = None, get_all_projects: bool = False) -> Dict[str, Any]:
    """异步完成认证流程,支持自动检测项目ID"""
    try:
        log.info(f"asyncio_complete_auth_flow开始执行: project_id={project_id}, user_session={user_session}")
        
        # 查找对应的认证流程
        state = None
        flow_data = None
        
        log.debug(f"当前所有auth_flows: {list(auth_flows.keys())}")
        
        # 如果指定了project_id,先尝试匹配指定的项目
        if project_id:
            log.info(f"尝试匹配指定的项目ID: {project_id}")
            for s, data in auth_flows.items():
                if data['project_id'] == project_id:
                    # 如果指定了用户会话,优先匹配相同会话的流程
                    if user_session and data.get('user_session') == user_session:
                        state = s
                        flow_data = data
                        log.info(f"找到匹配的用户会话: {s}")
                        break
                    # 如果没有指定会话,或没找到匹配会话的流程,使用第一个匹配项目ID的
                    elif not state:
                        state = s
                        flow_data = data
                        log.info(f"找到匹配的项目ID: {s}")
        
        # 如果没有指定项目ID或没找到匹配的,查找需要自动检测项目ID的流程
        if not state:
            log.info(f"没有找到指定项目的流程,查找自动检测流程")
            for s, data in auth_flows.items():
                log.debug(f"检查流程 {s}: auto_project_detection={data.get('auto_project_detection', False)}")
                if data.get('auto_project_detection', False):
                    # 如果指定了用户会话,优先匹配相同会话的流程
                    if user_session and data.get('user_session') == user_session:
                        state = s
                        flow_data = data
                        log.info(f"找到匹配用户会话的自动检测流程: {s}")
                        break
                    # 使用第一个找到的需要自动检测的流程
                    elif not state:
                        state = s
                        flow_data = data
                        log.info(f"找到自动检测流程: {s}")
        
        if not state or not flow_data:
            log.error(f"未找到认证流程: state={state}, flow_data存在={bool(flow_data)}")
            log.debug(f"当前所有flow_data: {list(auth_flows.keys())}")
            return {
                'success': False,
                'error': '未找到对应的认证流程,请先点击获取认证链接'
            }
        
        log.info(f"找到认证流程: state={state}")
        log.info(f"flow_data内容: project_id={flow_data.get('project_id')}, auto_project_detection={flow_data.get('auto_project_detection')}")
        log.info(f"传入的project_id参数: {project_id}")
        
        # 如果需要自动检测项目ID且没有提供项目ID
        log.info(f"检查auto_project_detection条件: auto_project_detection={flow_data.get('auto_project_detection', False)}, not project_id={not project_id}")
        if flow_data.get('auto_project_detection', False) and not project_id:
            log.info("跳过自动检测项目ID,进入等待阶段")
        elif not project_id:
            log.info("进入project_id检查分支")
            project_id = flow_data.get('project_id')
            if not project_id:
                log.error("缺少项目ID,返回错误")
                return {
                    'success': False,
                    'error': '缺少项目ID,请指定项目ID',
                    'requires_manual_project_id': True
                }
        else:
            log.info(f"使用提供的项目ID: {project_id}")
        
        # 检查是否已经有授权码
        log.info(f"开始检查OAuth授权码...")
        max_wait_time = 60  # 最多等待60秒
        wait_interval = 1   # 每秒检查一次
        waited = 0
        
        while waited < max_wait_time:
            log.debug(f"等待OAuth授权码... ({waited}/{max_wait_time}秒)")
            if flow_data.get('code'):
                log.info(f"检测到OAuth授权码,开始处理凭证 (等待时间: {waited}秒)")
                break
            
            # 异步等待
            await asyncio.sleep(wait_interval)
            waited += wait_interval
            
            # 刷新flow_data引用,因为可能被回调更新了
            if state in auth_flows:
                flow_data = auth_flows[state]
                log.debug(f"刷新flow_data: completed={flow_data.get('completed')}, code存在={bool(flow_data.get('code'))}")
        
        if not flow_data.get('code'):
            log.error(f"等待OAuth回调超时,等待了{waited}秒")
            return {
                'success': False,
                'error': '等待OAuth回调超时,请确保完成了浏览器中的认证并看到成功页面'
            }
        
        flow = flow_data['flow']
        auth_code = flow_data['code']
        
        log.info(f"开始使用授权码获取凭证: code={'***' + auth_code[-4:] if auth_code else 'None'}")
        
        # 使用认证代码获取凭证
        import oauthlib.oauth2.rfc6749.parameters
        original_validate = oauthlib.oauth2.rfc6749.parameters.validate_token_parameters
        
        def patched_validate(params):
            try:
                return original_validate(params)
            except Warning:
                pass
        
        oauthlib.oauth2.rfc6749.parameters.validate_token_parameters = patched_validate
        
        try:
            log.info(f"调用flow.exchange_code...")
            credentials = await flow.exchange_code(auth_code)
            log.info(f"成功获取凭证,token前缀: {credentials.access_token[:20] if credentials.access_token else 'None'}...")
            
            log.info(f"检查是否需要项目检测: auto_project_detection={flow_data.get('auto_project_detection')}, project_id={project_id}")
            
            # 检查是否为批量获取所有项目模式
            if flow_data.get('get_all_projects', False) or get_all_projects:
                log.info("批量模式:为所有项目并发获取凭证...")
                user_projects = await get_user_projects(credentials)
                
                if user_projects:
                    async def process_single_project(project_info):
                        """并发处理单个项目的凭证获取"""
                        project_id_current = project_info.get('projectId')
                        project_name = project_info.get('displayName') or project_id_current
                        
                        try:
                            log.info(f"为项目 {project_name} ({project_id_current}) 启用API服务...")
                            await enable_required_apis(credentials, project_id_current)
                            
                            # 保存凭证
                            saved_filename = await save_credentials(credentials, project_id_current)
                            
                            log.info(f"成功为项目 {project_name} 保存凭证")
                            return {
                                'status': 'success',
                                'project_id': project_id_current,
                                'project_name': project_name,
                                'file_path': saved_filename
                            }
                            
                        except Exception as e:
                            log.error(f"为项目 {project_name} ({project_id_current}) 处理凭证失败: {e}")
                            return {
                                'status': 'failed',
                                'project_id': project_id_current,
                                'project_name': project_name,
                                'error': str(e)
                            }
                    
                    # 并发处理所有项目
                    log.info(f"开始并发处理 {len(user_projects)} 个项目...")
                    tasks = [process_single_project(project_info) for project_info in user_projects]
                    results = await asyncio.gather(*tasks, return_exceptions=True)
                    
                    # 整理结果
                    multiple_results = {'success': [], 'failed': []}
                    for result in results:
                        if isinstance(result, Exception):
                            log.error(f"并发处理项目时发生异常: {result}")
                            multiple_results['failed'].append({
                                'project_id': 'unknown',
                                'project_name': 'unknown',
                                'error': f'处理异常: {str(result)}'
                            })
                        elif result['status'] == 'success':
                            multiple_results['success'].append({
                                'project_id': result['project_id'],
                                'project_name': result['project_name'],
                                'file_path': result['file_path']
                            })
                        else:  # failed
                            multiple_results['failed'].append({
                                'project_id': result['project_id'],
                                'project_name': result['project_name'],
                                'error': result['error']
                            })
                    
                    # 清理使用过的流程
                    if state in auth_flows:
                        flow_data_to_clean = auth_flows[state]
                        try:
                            if flow_data_to_clean.get('server'):
                                server = flow_data_to_clean['server']
                                port = flow_data_to_clean.get('callback_port')
                                async_shutdown_server(server, port)
                        except Exception as e:
                            log.debug(f"启动异步关闭服务器时出错: {e}")
                        del auth_flows[state]
                    
                    log.info(f"批量并发认证完成:成功 {len(multiple_results['success'])} 个,失败 {len(multiple_results['failed'])} 个")
                    return {
                        'success': True,
                        'multiple_credentials': multiple_results
                    }
                else:
                    return {
                        'success': False,
                        'error': '无法获取您的项目列表,批量认证失败'
                    }
                        
            # 如果需要自动检测项目ID且没有提供项目ID(单项目模式)
            elif flow_data.get('auto_project_detection', False) and not project_id:
                log.info("尝试通过API获取用户项目列表...")
                log.info(f"使用的token: {credentials.access_token[:20]}...")
                log.info(f"Token过期时间: {credentials.expires_at}")
                user_projects = await get_user_projects(credentials)
                
                if user_projects:
                    # 如果只有一个项目,自动使用
                    if len(user_projects) == 1:
                        project_id = user_projects[0].get('projectId')
                        if project_id:
                            flow_data['project_id'] = project_id
                            log.info(f"自动选择唯一项目: {project_id}")
                            # 自动启用必需的API服务
                            log.info("正在自动启用必需的API服务...")
                            await enable_required_apis(credentials, project_id)
                    # 如果有多个项目,尝试选择默认项目
                    else:
                        project_id = await select_default_project(user_projects)
                        if project_id:
                            flow_data['project_id'] = project_id
                            log.info(f"自动选择默认项目: {project_id}")
                            # 自动启用必需的API服务
                            log.info("正在自动启用必需的API服务...")
                            await enable_required_apis(credentials, project_id)
                        else:
                            # 返回项目列表让用户选择
                            return {
                                'success': False,
                                'error': '请从以下项目中选择一个',
                                'requires_project_selection': True,
                                'available_projects': [
                                    {
                                        'projectId': p.get('projectId'),
                                        'name': p.get('displayName') or p.get('projectId'),
                                        'projectNumber': p.get('projectNumber')
                                    }
                                    for p in user_projects
                                ]
                            }
                else:
                    # 如果无法获取项目列表,提示手动输入
                    return {
                        'success': False,
                        'error': '无法获取您的项目列表,请手动指定项目ID',
                        'requires_manual_project_id': True
                    }
            elif project_id:
                # 如果已经有项目ID(手动提供或环境检测),也尝试启用API服务
                log.info("正在为已提供的项目ID自动启用必需的API服务...")
                await enable_required_apis(credentials, project_id)
            
            # 如果仍然没有项目ID,返回错误
            if not project_id:
                return {
                    'success': False,
                    'error': '缺少项目ID,请指定项目ID',
                    'requires_manual_project_id': True
                }
            
            # 保存凭证
            saved_filename = await save_credentials(credentials, project_id)
            
            # 准备返回的凭证数据
            creds_data = {
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "token": credentials.access_token,
                "refresh_token": credentials.refresh_token,
                "scopes": SCOPES,
                "token_uri": "https://oauth2.googleapis.com/token",
                "project_id": project_id
            }
            
            if credentials.expires_at:
                if credentials.expires_at.tzinfo is None:
                    expiry_utc = credentials.expires_at.replace(tzinfo=timezone.utc)
                else:
                    expiry_utc = credentials.expires_at
                creds_data["expiry"] = expiry_utc.isoformat()
            
            # 清理使用过的流程
            if state in auth_flows:
                flow_data_to_clean = auth_flows[state]
                # 快速关闭服务器
                try:
                    if flow_data_to_clean.get('server'):
                        server = flow_data_to_clean['server']
                        port = flow_data_to_clean.get('callback_port')
                        async_shutdown_server(server, port)
                except Exception as e:
                    log.debug(f"启动异步关闭服务器时出错: {e}")
                
                del auth_flows[state]
            
            log.info("OAuth认证成功,凭证已保存")
            return {
                'success': True,
                'credentials': creds_data,
                'file_path': saved_filename,
                'auto_detected_project': flow_data.get('auto_project_detection', False)
            }
            
        except Exception as e:
            log.error(f"获取凭证失败: {e}")
            return {
                'success': False,
                'error': f'获取凭证失败: {str(e)}'
            }
        finally:
            oauthlib.oauth2.rfc6749.parameters.validate_token_parameters = original_validate
            
    except Exception as e:
        log.error(f"异步完成认证流程失败: {e}")
        return {
            'success': False,
            'error': str(e)
        }


async def complete_auth_flow_from_callback_url(callback_url: str, project_id: Optional[str] = None, get_all_projects: bool = False) -> Dict[str, Any]:
    """从回调URL直接完成认证流程,无需启动本地服务器"""
    try:
        log.info(f"开始从回调URL完成认证: {callback_url}")
        
        # 解析回调URL
        parsed_url = urlparse(callback_url)
        query_params = parse_qs(parsed_url.query)
        
        # 验证必要参数
        if 'state' not in query_params or 'code' not in query_params:
            return {
                'success': False,
                'error': '回调URL缺少必要参数 (state 或 code)'
            }
        
        state = query_params['state'][0]
        code = query_params['code'][0]
        
        log.info(f"从URL解析到: state={state}, code=xxx...")
        
        # 检查是否有对应的认证流程
        if state not in auth_flows:
            return {
                'success': False,
                'error': f'未找到对应的认证流程,请先启动认证 (state: {state})'
            }
        
        flow_data = auth_flows[state]
        flow = flow_data['flow']
        
        # 构造回调URL(使用flow中存储的redirect_uri)
        redirect_uri = flow.redirect_uri
        log.info(f"使用redirect_uri: {redirect_uri}")
        
        try:
            # 使用authorization code获取token
            credentials = await flow.exchange_code(code)
            log.info("成功获取访问令牌")
            
            # 检查是否为批量获取所有项目模式
            if get_all_projects:
                log.info("批量模式:从回调URL为所有项目并发获取凭证...")
                try:
                    projects = await get_user_projects(credentials)
                    if projects:
                        async def process_single_project(project_info):
                            """并发处理单个项目的凭证获取"""
                            project_id_current = project_info.get('projectId')
                            project_name = project_info.get('displayName') or project_id_current
                            
                            try:
                                log.info(f"为项目 {project_name} ({project_id_current}) 启用API服务...")
                                await enable_required_apis(credentials, project_id_current)
                                
                                # 保存凭证
                                saved_filename = await save_credentials(credentials, project_id_current)
                                
                                log.info(f"成功为项目 {project_name} 保存凭证")
                                return {
                                    'status': 'success',
                                    'project_id': project_id_current,
                                    'project_name': project_name,
                                    'file_path': saved_filename
                                }
                                
                            except Exception as e:
                                log.error(f"为项目 {project_name} ({project_id_current}) 处理凭证失败: {e}")
                                return {
                                    'status': 'failed',
                                    'project_id': project_id_current,
                                    'project_name': project_name,
                                    'error': str(e)
                                }
                        
                        # 并发处理所有项目
                        log.info(f"开始并发处理 {len(projects)} 个项目...")
                        tasks = [process_single_project(project_info) for project_info in projects]
                        results = await asyncio.gather(*tasks, return_exceptions=True)
                        
                        # 整理结果
                        multiple_results = {'success': [], 'failed': []}
                        for result in results:
                            if isinstance(result, Exception):
                                log.error(f"并发处理项目时发生异常: {result}")
                                multiple_results['failed'].append({
                                    'project_id': 'unknown',
                                    'project_name': 'unknown',
                                    'error': f'处理异常: {str(result)}'
                                })
                            elif result['status'] == 'success':
                                multiple_results['success'].append({
                                    'project_id': result['project_id'],
                                    'project_name': result['project_name'],
                                    'file_path': result['file_path']
                                })
                            else:  # failed
                                multiple_results['failed'].append({
                                    'project_id': result['project_id'],
                                    'project_name': result['project_name'],
                                    'error': result['error']
                                })
                        
                        # 清理使用过的流程
                        if state in auth_flows:
                            flow_data_to_clean = auth_flows[state]
                            try:
                                if flow_data_to_clean.get('server'):
                                    server = flow_data_to_clean['server']
                                    port = flow_data_to_clean.get('callback_port')
                                    async_shutdown_server(server, port)
                            except Exception as e:
                                log.debug(f"关闭服务器时出错: {e}")
                            del auth_flows[state]
                        
                        log.info(f"从回调URL批量并发认证完成:成功 {len(multiple_results['success'])} 个,失败 {len(multiple_results['failed'])} 个")
                        return {
                            'success': True,
                            'multiple_credentials': multiple_results
                        }
                    else:
                        return {
                            'success': False,
                            'error': '无法获取您的项目列表,批量认证失败'
                        }
                except Exception as e:
                    log.error(f"批量获取项目列表失败: {e}")
                    return {
                        'success': False,
                        'error': f'批量获取项目列表失败: {str(e)}'
                    }
            
            # 单项目模式的项目ID处理逻辑
            detected_project_id = None
            auto_detected = False
            
            if not project_id:
                # 尝试自动检测项目ID
                try:
                    projects = await get_user_projects(credentials)
                    if projects:
                        if len(projects) == 1:
                            # 只有一个项目,自动使用
                            detected_project_id = projects[0]['projectId']
                            auto_detected = True
                            log.info(f"自动检测到唯一项目ID: {detected_project_id}")
                        else:
                            # 多个项目,自动选择第一个
                            detected_project_id = projects[0]['projectId']
                            auto_detected = True
                            log.info(f"检测到{len(projects)}个项目,自动选择第一个: {detected_project_id}")
                            log.debug(f"其他可用项目: {[p['projectId'] for p in projects[1:]]}")
                    else:
                        # 没有项目访问权限
                        return {
                            'success': False,
                            'error': '未检测到可访问的项目,请检查权限或手动指定项目ID',
                            'requires_manual_project_id': True
                        }
                except Exception as e:
                    log.warning(f"自动检测项目ID失败: {e}")
                    return {
                        'success': False,
                        'error': f'自动检测项目ID失败: {str(e)},请手动指定项目ID',
                        'requires_manual_project_id': True
                    }
            else:
                detected_project_id = project_id
            
            # 启用必需的API服务
            if detected_project_id:
                try:
                    log.info(f"正在为项目 {detected_project_id} 启用必需的API服务...")
                    await enable_required_apis(credentials, detected_project_id)
                except Exception as e:
                    log.warning(f"启用API服务失败: {e}")
            
            # 保存凭证
            saved_filename = await save_credentials(credentials, detected_project_id)
            
            # 准备返回的凭证数据
            creds_data = {
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "token": credentials.access_token,
                "refresh_token": credentials.refresh_token,
                "scopes": SCOPES,
                "token_uri": "https://oauth2.googleapis.com/token",
                "project_id": detected_project_id
            }
            
            if credentials.expires_at:
                if credentials.expires_at.tzinfo is None:
                    expiry_utc = credentials.expires_at.replace(tzinfo=timezone.utc)
                else:
                    expiry_utc = credentials.expires_at
                creds_data["expiry"] = expiry_utc.isoformat()
            
            # 清理使用过的流程
            if state in auth_flows:
                flow_data_to_clean = auth_flows[state]
                # 快速关闭服务器(如果有)
                try:
                    if flow_data_to_clean.get('server'):
                        server = flow_data_to_clean['server']
                        port = flow_data_to_clean.get('callback_port')
                        async_shutdown_server(server, port)
                except Exception as e:
                    log.debug(f"关闭服务器时出错: {e}")
                
                del auth_flows[state]
            
            log.info("从回调URL完成OAuth认证成功,凭证已保存")
            return {
                'success': True,
                'credentials': creds_data,
                'file_path': saved_filename,
                'auto_detected_project': auto_detected
            }
            
        except Exception as e:
            log.error(f"从回调URL获取凭证失败: {e}")
            return {
                'success': False,
                'error': f'获取凭证失败: {str(e)}'
            }
        
    except Exception as e:
        log.error(f"从回调URL完成认证流程失败: {e}")
        return {
            'success': False,
            'error': str(e)
        }


async def save_credentials(creds: Credentials, project_id: str) -> str:
    """通过统一存储系统保存凭证"""
    # 生成文件名(使用project_id和时间戳)
    timestamp = int(time.time())
    filename = f"{project_id}-{timestamp}.json"
    
    # 准备凭证数据
    creds_data = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "token": creds.access_token,
        "refresh_token": creds.refresh_token,
        "scopes": SCOPES,
        "token_uri": "https://oauth2.googleapis.com/token",
        "project_id": project_id
    }
    
    if creds.expires_at:
        if creds.expires_at.tzinfo is None:
            expiry_utc = creds.expires_at.replace(tzinfo=timezone.utc)
        else:
            expiry_utc = creds.expires_at
        creds_data["expiry"] = expiry_utc.isoformat()
    
    # 通过存储适配器保存
    storage_adapter = await get_storage_adapter()
    success = await storage_adapter.store_credential(filename, creds_data)
    
    if success:
        # 创建默认状态记录
        try:
            default_state = {
                "error_codes": [],
                "disabled": False,
                "last_success": time.time(),
                "user_email": None,
                "gemini_2_5_pro_calls": 0,
                "total_calls": 0,
                "next_reset_time": None,
                "daily_limit_gemini_2_5_pro": 100,
                "daily_limit_total": 1000
            }
            await storage_adapter.update_credential_state(filename, default_state)
            log.info(f"凭证和状态已保存到: {filename}")
        except Exception as e:
            log.warning(f"创建默认状态记录失败 {filename}: {e}")
        
        return filename
    else:
        raise Exception(f"保存凭证失败: {filename}")


def async_shutdown_server(server, port):
    """异步关闭OAuth回调服务器,避免阻塞主流程"""
    def shutdown_server_async():
        try:
            # 设置一个标志来跟踪关闭状态
            shutdown_completed = threading.Event()
            
            def do_shutdown():
                try:
                    server.shutdown()
                    server.server_close()
                    shutdown_completed.set()
                    log.info(f"已关闭端口 {port} 的OAuth回调服务器")
                except Exception as e:
                    shutdown_completed.set()
                    log.debug(f"关闭服务器时出错: {e}")
            
            # 在单独线程中执行关闭操作
            shutdown_worker = threading.Thread(target=do_shutdown, daemon=True)
            shutdown_worker.start()
            
            # 等待最多5秒,如果超时就放弃等待
            if shutdown_completed.wait(timeout=5):
                log.debug(f"端口 {port} 服务器关闭完成")
            else:
                log.warning(f"端口 {port} 服务器关闭超时,但不阻塞主流程")
                
        except Exception as e:
            log.debug(f"异步关闭服务器时出错: {e}")
    
    # 在后台线程中关闭服务器,不阻塞主流程
    shutdown_thread = threading.Thread(target=shutdown_server_async, daemon=True)
    shutdown_thread.start()
    log.debug(f"开始异步关闭端口 {port} 的OAuth回调服务器")

def cleanup_expired_flows():
    """清理过期的认证流程"""
    current_time = time.time()
    EXPIRY_TIME = 600  # 10分钟过期
    
    # 直接遍历删除,避免创建额外列表
    states_to_remove = [
        state for state, flow_data in auth_flows.items()
        if current_time - flow_data['created_at'] > EXPIRY_TIME
    ]
    
    # 批量清理,提高效率
    cleaned_count = 0
    for state in states_to_remove:
        flow_data = auth_flows.get(state)
        if flow_data:
            # 快速关闭可能存在的服务器
            try:
                if flow_data.get('server'):
                    server = flow_data['server']
                    port = flow_data.get('callback_port')
                    async_shutdown_server(server, port)
            except Exception as e:
                log.debug(f"清理过期流程时启动异步关闭服务器失败: {e}")
            
            # 显式清理流程数据,释放内存
            flow_data.clear()
            del auth_flows[state]
            cleaned_count += 1
    
    if cleaned_count > 0:
        log.info(f"清理了 {cleaned_count} 个过期的认证流程")
    
    # 更积极的垃圾回收触发条件
    if len(auth_flows) > 20:  # 降低阈值
        import gc
        gc.collect()
        log.debug(f"触发垃圾回收,当前活跃认证流程数: {len(auth_flows)}")


def get_auth_status(project_id: str) -> Dict[str, Any]:
    """获取认证状态"""
    for state, flow_data in auth_flows.items():
        if flow_data['project_id'] == project_id:
            return {
                'status': 'completed' if flow_data['completed'] else 'pending',
                'state': state,
                'created_at': flow_data['created_at']
            }
    
    return {
        'status': 'not_found'
    }


# 鉴权功能 - 使用更小的数据结构
auth_tokens = {}  # 存储有效的认证令牌
TOKEN_EXPIRY = 3600  # 1小时令牌过期时间


async def verify_password(password: str) -> bool:
    """验证密码(面板登录使用)"""
    from config import get_panel_password
    correct_password = await get_panel_password()
    return password == correct_password


def generate_auth_token() -> str:
    """生成认证令牌"""
    # 清理过期令牌
    cleanup_expired_tokens()
    
    token = secrets.token_urlsafe(32)
    # 只存储创建时间
    auth_tokens[token] = time.time()
    return token


def verify_auth_token(token: str) -> bool:
    """验证认证令牌"""
    if not token or token not in auth_tokens:
        return False
    
    created_at = auth_tokens[token]
    
    # 检查令牌是否过期 (使用更短的过期时间)
    if time.time() - created_at > TOKEN_EXPIRY:
        del auth_tokens[token]
        return False
    
    return True


def cleanup_expired_tokens():
    """清理过期的认证令牌"""
    current_time = time.time()
    expired_tokens = [
        token for token, created_at in auth_tokens.items()
        if current_time - created_at > TOKEN_EXPIRY
    ]
    
    for token in expired_tokens:
        del auth_tokens[token]
    
    if expired_tokens:
        log.debug(f"清理了 {len(expired_tokens)} 个过期的认证令牌")

def invalidate_auth_token(token: str):
    """使认证令牌失效"""
    if token in auth_tokens:
        del auth_tokens[token]


# 文件验证和处理功能 - 使用统一存储系统
def validate_credential_content(content: str) -> Dict[str, Any]:
    """验证凭证内容格式"""
    try:
        creds_data = json.loads(content)
        
        # 检查必要字段
        required_fields = ['client_id', 'client_secret', 'refresh_token', 'token_uri']
        missing_fields = [field for field in required_fields if field not in creds_data]
        
        if missing_fields:
            return {
                'valid': False,
                'error': f'缺少必要字段: {", ".join(missing_fields)}'
            }
        
        # 检查project_id
        if 'project_id' not in creds_data:
            log.warning("认证文件缺少project_id字段")
        
        return {
            'valid': True,
            'data': creds_data
        }
        
    except json.JSONDecodeError as e:
        return {
            'valid': False,
            'error': f'JSON格式错误: {str(e)}'
        }
    except Exception as e:
        return {
            'valid': False,
            'error': f'文件验证失败: {str(e)}'
        }


async def save_uploaded_credential(content: str, original_filename: str) -> Dict[str, Any]:
    """通过统一存储系统保存上传的凭证"""
    try:
        # 验证内容格式
        validation = validate_credential_content(content)
        if not validation['valid']:
            return {
                'success': False,
                'error': validation['error']
            }
        
        creds_data = validation['data']
        
        # 生成文件名
        project_id = creds_data.get('project_id', 'unknown')
        timestamp = int(time.time())
        
        # 从原文件名中提取有用信息
        import os
        base_name = os.path.splitext(original_filename)[0]
        filename = f"{base_name}-{timestamp}.json"
        
        # 通过存储适配器保存
        storage_adapter = await get_storage_adapter()
        success = await storage_adapter.store_credential(filename, creds_data)
        
        if success:
            log.info(f"凭证文件已上传保存: {filename}")
            return {
                'success': True,
                'file_path': filename,
                'project_id': project_id
            }
        else:
            return {
                'success': False,
                'error': '保存到存储系统失败'
            }
        
    except Exception as e:
        log.error(f"保存上传文件失败: {e}")
        return {
            'success': False,
            'error': str(e)
        }


async def batch_upload_credentials(files_data: List[Dict[str, str]]) -> Dict[str, Any]:
    """批量上传凭证文件到统一存储系统"""
    results = []
    success_count = 0
    
    for file_data in files_data:
        filename = file_data.get('filename', 'unknown.json')
        content = file_data.get('content', '')
        
        result = await save_uploaded_credential(content, filename)
        result['filename'] = filename
        results.append(result)
        
        if result['success']:
            success_count += 1
    
    return {
        'uploaded_count': success_count,
        'total_count': len(files_data),
        'results': results
    }


# 环境变量批量导入功能 - 使用统一存储系统
async def load_credentials_from_env() -> Dict[str, Any]:
    """
    从环境变量加载多个凭证文件到统一存储系统
    支持两种环境变量格式:
    1. GCLI_CREDS_1, GCLI_CREDS_2, ... (编号格式)
    2. GCLI_CREDS_projectname1, GCLI_CREDS_projectname2, ... (项目名格式)
    """
    import os
    
    results = []
    success_count = 0
    
    log.info("开始从环境变量加载认证凭证...")
    
    # 获取所有以GCLI_CREDS_开头的环境变量
    creds_env_vars = {key: value for key, value in os.environ.items() 
                      if key.startswith('GCLI_CREDS_') and value.strip()}
    
    if not creds_env_vars:
        log.info("未找到GCLI_CREDS_*环境变量")
        return {
            'loaded_count': 0,
            'total_count': 0,
            'results': [],
            'message': '未找到GCLI_CREDS_*环境变量'
        }
    
    log.info(f"找到 {len(creds_env_vars)} 个凭证环境变量")
    
    # 获取存储适配器
    storage_adapter = await get_storage_adapter()
    
    for env_name, creds_content in creds_env_vars.items():
        # 从环境变量名提取标识符
        identifier = env_name.replace('GCLI_CREDS_', '')
        
        try:
            # 验证JSON格式
            validation = validate_credential_content(creds_content)
            if not validation['valid']:
                result = {
                    'env_name': env_name,
                    'identifier': identifier,
                    'success': False,
                    'error': validation['error']
                }
                results.append(result)
                log.error(f"环境变量 {env_name} 验证失败: {validation['error']}")
                continue
            
            creds_data = validation['data']
            project_id = creds_data.get('project_id', 'unknown')
            
            # 生成文件名 (使用标识符和项目ID)
            timestamp = int(time.time())
            if identifier.isdigit():
                # 如果标识符是数字,使用项目ID作为主要标识
                filename = f"env-{project_id}-{identifier}-{timestamp}.json"
            else:
                # 如果标识符是项目名,直接使用
                filename = f"env-{identifier}-{timestamp}.json"
            
            # 通过存储适配器保存
            success = await storage_adapter.store_credential(filename, creds_data)
            
            if success:
                result = {
                    'env_name': env_name,
                    'identifier': identifier,
                    'success': True,
                    'file_path': filename,
                    'project_id': project_id,
                    'filename': filename
                }
                results.append(result)
                success_count += 1
                
                log.info(f"成功从环境变量 {env_name} 保存凭证到: {filename}")
            else:
                result = {
                    'env_name': env_name,
                    'identifier': identifier,
                    'success': False,
                    'error': '保存到存储系统失败'
                }
                results.append(result)
                log.error(f"环境变量 {env_name} 保存失败")
            
        except Exception as e:
            result = {
                'env_name': env_name,
                'identifier': identifier,
                'success': False,
                'error': str(e)
            }
            results.append(result)
            log.error(f"处理环境变量 {env_name} 时发生错误: {e}")
    
    message = f"成功导入 {success_count}/{len(creds_env_vars)} 个凭证文件"
    log.info(message)
    
    return {
        'loaded_count': success_count,
        'total_count': len(creds_env_vars),
        'results': results,
        'message': message
    }


async def auto_load_env_credentials_on_startup() -> None:
    """
    程序启动时自动从环境变量加载凭证到统一存储系统
    如果设置了 AUTO_LOAD_ENV_CREDS=true,则会自动执行
    """
    from config import get_auto_load_env_creds
    auto_load = await get_auto_load_env_creds()
    
    if not auto_load:
        log.debug("AUTO_LOAD_ENV_CREDS未启用,跳过自动加载")
        return
    
    log.info("AUTO_LOAD_ENV_CREDS已启用,开始自动加载环境变量中的凭证...")
    
    try:
        result = await load_credentials_from_env()
        if result['loaded_count'] > 0:
            log.info(f"启动时成功自动导入 {result['loaded_count']} 个凭证文件")
        else:
            log.info("启动时未找到可导入的环境变量凭证")
    except Exception as e:
        log.error(f"启动时自动加载环境变量凭证失败: {e}")


async def clear_env_credentials() -> Dict[str, Any]:
    """
    清除所有从环境变量导入的凭证文件
    仅删除文件名包含'env-'前缀的文件
    """
    try:
        storage_adapter = await get_storage_adapter()
        
        # 获取所有凭证
        all_credentials = await storage_adapter.list_credentials()
        
        deleted_files = []
        deleted_count = 0
        
        for credential_name in all_credentials:
            if credential_name.startswith('env-') and credential_name.endswith('.json'):
                try:
                    success = await storage_adapter.delete_credential(credential_name)
                    if success:
                        deleted_files.append(credential_name)
                        deleted_count += 1
                        log.info(f"删除环境变量凭证文件: {credential_name}")
                    else:
                        log.error(f"删除文件 {credential_name} 失败")
                except Exception as e:
                    log.error(f"删除文件 {credential_name} 失败: {e}")
        
        message = f"成功删除 {deleted_count} 个环境变量凭证文件"
        log.info(message)
        
        return {
            'deleted_count': deleted_count,
            'deleted_files': deleted_files,
            'message': message
        }
        
    except Exception as e:
        error_message = f"清除环境变量凭证文件时发生错误: {e}"
        log.error(error_message)
        return {
            'deleted_count': 0,
            'error': error_message
        }