File size: 26,615 Bytes
6dfddfb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
"""
数据库服务模块
"""

import asyncio
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Union

from sqlalchemy import asc, delete, desc, func, insert, select, update

from app.database.connection import database
from app.database.models import ErrorLog, FileRecord, FileState, RequestLog, Settings
from app.log.logger import get_database_logger
from app.utils.helpers import redact_key_for_logging

logger = get_database_logger()


async def get_all_settings() -> List[Dict[str, Any]]:
    """
    获取所有设置

    Returns:
        List[Dict[str, Any]]: 设置列表
    """
    try:
        query = select(Settings)
        result = await database.fetch_all(query)
        return [dict(row) for row in result]
    except Exception as e:
        logger.error(f"Failed to get all settings: {str(e)}")
        raise


async def get_setting(key: str) -> Optional[Dict[str, Any]]:
    """
    获取指定键的设置

    Args:
        key: 设置键名

    Returns:
        Optional[Dict[str, Any]]: 设置信息,如果不存在则返回None
    """
    try:
        query = select(Settings).where(Settings.key == key)
        result = await database.fetch_one(query)
        return dict(result) if result else None
    except Exception as e:
        logger.error(f"Failed to get setting {key}: {str(e)}")
        raise


async def update_setting(
    key: str, value: str, description: Optional[str] = None
) -> bool:
    """
    更新设置

    Args:
        key: 设置键名
        value: 设置值
        description: 设置描述

    Returns:
        bool: 是否更新成功
    """
    try:
        # 检查设置是否存在
        setting = await get_setting(key)

        if setting:
            # 更新设置
            query = (
                update(Settings)
                .where(Settings.key == key)
                .values(
                    value=value,
                    description=description if description else setting["description"],
                    updated_at=datetime.now(),
                )
            )
            await database.execute(query)
            logger.info(f"Updated setting: {key}")
            return True
        else:
            # 插入设置
            query = insert(Settings).values(
                key=key,
                value=value,
                description=description,
                created_at=datetime.now(),
                updated_at=datetime.now(),
            )
            await database.execute(query)
            logger.info(f"Inserted setting: {key}")
            return True
    except Exception as e:
        logger.error(f"Failed to update setting {key}: {str(e)}")
        return False


async def add_error_log(
    gemini_key: Optional[str] = None,
    model_name: Optional[str] = None,
    error_type: Optional[str] = None,
    error_log: Optional[str] = None,
    error_code: Optional[int] = None,
    request_msg: Optional[Union[Dict[str, Any], str]] = None,
    request_datetime: Optional[datetime] = None,
) -> bool:
    """
    添加错误日志

    Args:
        gemini_key: Gemini API密钥
        error_log: 错误日志
        error_code: 错误代码 (例如 HTTP 状态码)
        request_msg: 请求消息

    Returns:
        bool: 是否添加成功
    """
    try:
        if request_msg is None:
            request_msg_json = None
        else:
            # 如果request_msg是字典,则转换为JSON字符串
            if isinstance(request_msg, dict):
                request_msg_json = request_msg
            elif isinstance(request_msg, str):
                try:
                    request_msg_json = json.loads(request_msg)
                except json.JSONDecodeError:
                    request_msg_json = {"message": request_msg}
            else:
                request_msg_json = None

        # 插入错误日志
        query = insert(ErrorLog).values(
            gemini_key=gemini_key,
            error_type=error_type,
            error_log=error_log,
            model_name=model_name,
            error_code=error_code,
            request_msg=request_msg_json,
            request_time=(request_datetime if request_datetime else datetime.now()),
        )
        await database.execute(query)
        logger.info(f"Added error log for key: {redact_key_for_logging(gemini_key)}")
        return True
    except Exception as e:
        logger.error(f"Failed to add error log: {str(e)}")
        return False


async def get_error_logs(
    limit: int = 20,
    offset: int = 0,
    key_search: Optional[str] = None,
    error_search: Optional[str] = None,
    error_code_search: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    sort_by: str = "id",
    sort_order: str = "desc",
) -> List[Dict[str, Any]]:
    """
    获取错误日志,支持搜索、日期过滤和排序

    Args:
        limit (int): 限制数量
        offset (int): 偏移量
        key_search (Optional[str]): Gemini密钥搜索词 (模糊匹配)
        error_search (Optional[str]): 错误类型或日志内容搜索词 (模糊匹配)
        error_code_search (Optional[str]): 错误码搜索词 (精确匹配)
        start_date (Optional[datetime]): 开始日期时间
        end_date (Optional[datetime]): 结束日期时间
        sort_by (str): 排序字段 (例如 'id', 'request_time')
        sort_order (str): 排序顺序 ('asc' or 'desc')

    Returns:
        List[Dict[str, Any]]: 错误日志列表
    """
    try:
        query = select(
            ErrorLog.id,
            ErrorLog.gemini_key,
            ErrorLog.model_name,
            ErrorLog.error_type,
            ErrorLog.error_log,
            ErrorLog.error_code,
            ErrorLog.request_time,
        )

        if key_search:
            query = query.where(ErrorLog.gemini_key.ilike(f"%{key_search}%"))
        if error_search:
            query = query.where(
                (ErrorLog.error_type.ilike(f"%{error_search}%"))
                | (ErrorLog.error_log.ilike(f"%{error_search}%"))
            )
        if start_date:
            query = query.where(ErrorLog.request_time >= start_date)
        if end_date:
            query = query.where(ErrorLog.request_time < end_date)
        if error_code_search:
            try:
                error_code_int = int(error_code_search)
                query = query.where(ErrorLog.error_code == error_code_int)
            except ValueError:
                logger.warning(
                    f"Invalid format for error_code_search: '{error_code_search}'. Expected an integer. Skipping error code filter."
                )

        sort_column = getattr(ErrorLog, sort_by, ErrorLog.id)
        if sort_order.lower() == "asc":
            query = query.order_by(asc(sort_column))
        else:
            query = query.order_by(desc(sort_column))

        query = query.limit(limit).offset(offset)

        result = await database.fetch_all(query)
        return [dict(row) for row in result]
    except Exception as e:
        logger.exception(f"Failed to get error logs with filters: {str(e)}")
        raise


async def get_error_logs_count(
    key_search: Optional[str] = None,
    error_search: Optional[str] = None,
    error_code_search: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
) -> int:
    """
    获取符合条件的错误日志总数

    Args:
        key_search (Optional[str]): Gemini密钥搜索词 (模糊匹配)
        error_search (Optional[str]): 错误类型或日志内容搜索词 (模糊匹配)
        error_code_search (Optional[str]): 错误码搜索词 (精确匹配)
        start_date (Optional[datetime]): 开始日期时间
        end_date (Optional[datetime]): 结束日期时间

    Returns:
        int: 日志总数
    """
    try:
        query = select(func.count()).select_from(ErrorLog)

        if key_search:
            query = query.where(ErrorLog.gemini_key.ilike(f"%{key_search}%"))
        if error_search:
            query = query.where(
                (ErrorLog.error_type.ilike(f"%{error_search}%"))
                | (ErrorLog.error_log.ilike(f"%{error_search}%"))
            )
        if start_date:
            query = query.where(ErrorLog.request_time >= start_date)
        if end_date:
            query = query.where(ErrorLog.request_time < end_date)
        if error_code_search:
            try:
                error_code_int = int(error_code_search)
                query = query.where(ErrorLog.error_code == error_code_int)
            except ValueError:
                logger.warning(
                    f"Invalid format for error_code_search in count: '{error_code_search}'. Expected an integer. Skipping error code filter."
                )

        count_result = await database.fetch_one(query)
        return count_result[0] if count_result else 0
    except Exception as e:
        logger.exception(f"Failed to count error logs with filters: {str(e)}")
        raise


# 新增函数:获取单条错误日志详情
async def get_error_log_details(log_id: int) -> Optional[Dict[str, Any]]:
    """
    根据 ID 获取单个错误日志的详细信息

    Args:
        log_id (int): 错误日志的 ID

    Returns:
        Optional[Dict[str, Any]]: 包含日志详细信息的字典,如果未找到则返回 None
    """
    try:
        query = select(ErrorLog).where(ErrorLog.id == log_id)
        result = await database.fetch_one(query)
        if result:
            # 将 request_msg (JSONB) 转换为字符串以便在 API 中返回
            log_dict = dict(result)
            if "request_msg" in log_dict and log_dict["request_msg"] is not None:
                # 确保即使是 None 或非 JSON 数据也能处理
                try:
                    log_dict["request_msg"] = json.dumps(
                        log_dict["request_msg"], ensure_ascii=False, indent=2
                    )
                except TypeError:
                    log_dict["request_msg"] = str(log_dict["request_msg"])
            return log_dict
        else:
            return None
    except Exception as e:
        logger.exception(f"Failed to get error log details for ID {log_id}: {str(e)}")
        raise


# 新增函数:通过 gemini_key / error_code / 时间窗口 查找最接近的错误日志
async def find_error_log_by_info(
    gemini_key: str,
    timestamp: datetime,
    status_code: Optional[int] = None,
    window_seconds: int = 1,
) -> Optional[Dict[str, Any]]:
    """
    在给定时间窗口内,根据 gemini_key(精确匹配)及可选的 status_code 查找最接近 timestamp 的错误日志。

    假设错误日志的 error_code 存储的是 HTTP 状态码或等价错误码。

    Args:
        gemini_key: 完整的 Gemini key 字符串。
        timestamp: 目标时间(UTC 或本地,与存储一致)。
        status_code: 可选的错误码,若提供则优先匹配该错误码。
        window_seconds: 允许的时间偏差窗口,单位秒,默认为 1 秒。

    Returns:
        Optional[Dict[str, Any]]: 最匹配的一条错误日志的完整详情(字段与 get_error_log_details 一致),若未找到则返回 None。
    """
    try:
        start_time = timestamp - timedelta(seconds=window_seconds)
        end_time = timestamp + timedelta(seconds=window_seconds)

        base_query = select(ErrorLog).where(
            ErrorLog.gemini_key == gemini_key,
            ErrorLog.request_time >= start_time,
            ErrorLog.request_time <= end_time,
        )

        # 若提供了状态码,先尝试按状态码过滤
        if status_code is not None:
            query = base_query.where(ErrorLog.error_code == status_code).order_by(
                ErrorLog.request_time.desc()
            )
            candidates = await database.fetch_all(query)
            if not candidates:
                # 回退:不按状态码,仅按时间窗口
                query2 = base_query.order_by(ErrorLog.request_time.desc())
                candidates = await database.fetch_all(query2)
        else:
            query = base_query.order_by(ErrorLog.request_time.desc())
            candidates = await database.fetch_all(query)

        if not candidates:
            return None

        # 在 Python 中选择与 timestamp 最接近的一条
        def _to_dict(row: Any) -> Dict[str, Any]:
            d = dict(row)
            if "request_msg" in d and d["request_msg"] is not None:
                try:
                    d["request_msg"] = json.dumps(
                        d["request_msg"], ensure_ascii=False, indent=2
                    )
                except TypeError:
                    d["request_msg"] = str(d["request_msg"])
            return d

        best = min(
            candidates,
            key=lambda r: abs((r["request_time"] - timestamp).total_seconds()),
        )
        return _to_dict(best)
    except Exception as e:
        logger.exception(
            f"Failed to find error log by info (key=***{gemini_key[-4:] if gemini_key else ''}, code={status_code}, ts={timestamp}, window={window_seconds}s): {str(e)}"
        )
        raise


async def delete_error_logs_by_ids(log_ids: List[int]) -> int:
    """
    根据提供的 ID 列表批量删除错误日志 (异步)。

    Args:
        log_ids: 要删除的错误日志 ID 列表。

    Returns:
        int: 实际删除的日志数量。
    """
    if not log_ids:
        return 0
    try:
        # 使用 databases 执行删除
        query = delete(ErrorLog).where(ErrorLog.id.in_(log_ids))
        # execute 返回受影响的行数,但 databases 库的 execute 不直接返回 rowcount
        # 我们需要先查询是否存在,或者依赖数据库约束/触发器(如果适用)
        # 或者,我们可以执行删除并假设成功,除非抛出异常
        # 为了简单起见,我们执行删除并记录日志,不精确返回删除数量
        # 如果需要精确数量,需要先执行 SELECT COUNT(*)
        await database.execute(query)
        # 注意:databases 的 execute 不返回 rowcount,所以我们不能直接返回删除的数量
        # 返回 log_ids 的长度作为尝试删除的数量,或者返回 0/1 表示操作尝试
        logger.info(f"Attempted bulk deletion for error logs with IDs: {log_ids}")
        return len(log_ids)  # 返回尝试删除的数量
    except Exception as e:
        # 数据库连接或执行错误
        logger.error(
            f"Error during bulk deletion of error logs {log_ids}: {e}", exc_info=True
        )
        raise


async def delete_error_log_by_id(log_id: int) -> bool:
    """
    根据 ID 删除单个错误日志 (异步)。

    Args:
        log_id: 要删除的错误日志 ID。

    Returns:
        bool: 如果成功删除返回 True,否则返回 False。
    """
    try:
        # 先检查是否存在 (可选,但更明确)
        check_query = select(ErrorLog.id).where(ErrorLog.id == log_id)
        exists = await database.fetch_one(check_query)

        if not exists:
            logger.warning(
                f"Attempted to delete non-existent error log with ID: {log_id}"
            )
            return False

        # 执行删除
        delete_query = delete(ErrorLog).where(ErrorLog.id == log_id)
        await database.execute(delete_query)
        logger.info(f"Successfully deleted error log with ID: {log_id}")
        return True
    except Exception as e:
        logger.error(f"Error deleting error log with ID {log_id}: {e}", exc_info=True)
        raise


async def delete_all_error_logs() -> int:
    """
    分批删除所有错误日志,以避免大数据量下的超时和性能问题。

    Returns:
        int: 被删除的错误日志总数。
    """
    total_deleted_count = 0
    # SQLite 对 SQL 参数数量有上限(常见为 999),IN 子句中过多参数会报错
    # 统一使用 500,兼容 SQLite/MySQL,必要时可在配置中暴露该值
    batch_size = 200

    try:
        while True:
            # 1) 读取一批待删除的ID,仅选择ID列以提升效率
            id_query = select(ErrorLog.id).order_by(ErrorLog.id).limit(batch_size)
            rows = await database.fetch_all(id_query)
            if not rows:
                break

            ids = [row["id"] for row in rows]

            # 2) 按ID批量删除
            delete_query = delete(ErrorLog).where(ErrorLog.id.in_(ids))
            await database.execute(delete_query)

            deleted_in_batch = len(ids)
            total_deleted_count += deleted_in_batch

            logger.debug(f"Deleted a batch of {deleted_in_batch} error logs.")

            # 若不足一个批次,说明已删除完成
            if deleted_in_batch < batch_size:
                break

            # 3) 将控制权交还事件循环,缓解长时间占用
            await asyncio.sleep(0)

        logger.info(
            f"Successfully deleted all error logs in batches. Total deleted: {total_deleted_count}"
        )
        return total_deleted_count
    except Exception as e:
        logger.error(
            f"Failed to delete all error logs in batches: {str(e)}", exc_info=True
        )
        raise


# 新增函数:添加请求日志
async def add_request_log(
    model_name: Optional[str],
    api_key: Optional[str],
    is_success: bool,
    status_code: Optional[int] = None,
    latency_ms: Optional[int] = None,
    request_time: Optional[datetime] = None,
) -> bool:
    """
    添加 API 请求日志

    Args:
        model_name: 模型名称
        api_key: 使用的 API 密钥
        is_success: 请求是否成功
        status_code: API 响应状态码
        latency_ms: 请求耗时(毫秒)
        request_time: 请求发生时间 (如果为 None, 则使用当前时间)

    Returns:
        bool: 是否添加成功
    """
    try:
        log_time = request_time if request_time else datetime.now()

        query = insert(RequestLog).values(
            request_time=log_time,
            model_name=model_name,
            api_key=api_key,
            is_success=is_success,
            status_code=status_code,
            latency_ms=latency_ms,
        )
        await database.execute(query)
        return True
    except Exception as e:
        logger.error(f"Failed to add request log: {str(e)}")
        return False


# ==================== 文件记录相关函数 ====================


async def create_file_record(
    name: str,
    mime_type: str,
    size_bytes: int,
    api_key: str,
    uri: str,
    create_time: datetime,
    update_time: datetime,
    expiration_time: datetime,
    state: FileState = FileState.PROCESSING,
    display_name: Optional[str] = None,
    sha256_hash: Optional[str] = None,
    upload_url: Optional[str] = None,
    user_token: Optional[str] = None,
) -> Dict[str, Any]:
    """
    创建文件记录

    Args:
        name: 文件名称(格式: files/{file_id})
        mime_type: MIME 类型
        size_bytes: 文件大小(字节)
        api_key: 上传时使用的 API Key
        uri: 文件访问 URI
        create_time: 创建时间
        update_time: 更新时间
        expiration_time: 过期时间
        display_name: 显示名称
        sha256_hash: SHA256 哈希值
        upload_url: 临时上传 URL
        user_token: 上传用户的 token

    Returns:
        Dict[str, Any]: 创建的文件记录
    """
    try:
        query = insert(FileRecord).values(
            name=name,
            display_name=display_name,
            mime_type=mime_type,
            size_bytes=size_bytes,
            sha256_hash=sha256_hash,
            state=state,
            create_time=create_time,
            update_time=update_time,
            expiration_time=expiration_time,
            uri=uri,
            api_key=api_key,
            upload_url=upload_url,
            user_token=user_token,
        )
        await database.execute(query)

        # 返回创建的记录
        return await get_file_record_by_name(name)
    except Exception as e:
        logger.error(f"Failed to create file record: {str(e)}")
        raise


async def get_file_record_by_name(name: str) -> Optional[Dict[str, Any]]:
    """
    根据文件名获取文件记录

    Args:
        name: 文件名称(格式: files/{file_id})

    Returns:
        Optional[Dict[str, Any]]: 文件记录,如果不存在则返回 None
    """
    try:
        query = select(FileRecord).where(FileRecord.name == name)
        result = await database.fetch_one(query)
        return dict(result) if result else None
    except Exception as e:
        logger.error(f"Failed to get file record by name {name}: {str(e)}")
        raise


async def update_file_record_state(
    file_name: str,
    state: FileState,
    update_time: Optional[datetime] = None,
    upload_completed: Optional[datetime] = None,
    sha256_hash: Optional[str] = None,
) -> bool:
    """
    更新文件记录状态

    Args:
        file_name: 文件名
        state: 新状态
        update_time: 更新时间
        upload_completed: 上传完成时间
        sha256_hash: SHA256 哈希值

    Returns:
        bool: 是否更新成功
    """
    try:
        values = {"state": state}
        if update_time:
            values["update_time"] = update_time
        if upload_completed:
            values["upload_completed"] = upload_completed
        if sha256_hash:
            values["sha256_hash"] = sha256_hash

        query = update(FileRecord).where(FileRecord.name == file_name).values(**values)
        result = await database.execute(query)

        if result:
            logger.info(f"Updated file record state for {file_name} to {state}")
            return True

        logger.warning(f"File record not found for update: {file_name}")
        return False
    except Exception as e:
        logger.error(f"Failed to update file record state: {str(e)}")
        return False


async def list_file_records(
    user_token: Optional[str] = None,
    api_key: Optional[str] = None,
    page_size: int = 10,
    page_token: Optional[str] = None,
) -> tuple[List[Dict[str, Any]], Optional[str]]:
    """
    列出文件记录

    Args:
        user_token: 用户 token(如果提供,只返回该用户的文件)
        api_key: API Key(如果提供,只返回使用该 key 的文件)
        page_size: 每页大小
        page_token: 分页标记(偏移量)

    Returns:
        tuple[List[Dict[str, Any]], Optional[str]]: (文件列表, 下一页标记)
    """
    try:
        logger.debug(
            f"list_file_records called with page_size={page_size}, page_token={page_token}"
        )
        query = select(FileRecord).where(
            FileRecord.expiration_time > datetime.now(timezone.utc)
        )

        if user_token:
            query = query.where(FileRecord.user_token == user_token)
        if api_key:
            query = query.where(FileRecord.api_key == api_key)

        # 使用偏移量进行分页
        offset = 0
        if page_token:
            try:
                offset = int(page_token)
            except ValueError:
                logger.warning(f"Invalid page token: {page_token}")
                offset = 0

        # 按ID升序排列,使用 OFFSET 和 LIMIT
        query = query.order_by(FileRecord.id).offset(offset).limit(page_size + 1)

        results = await database.fetch_all(query)

        logger.debug(f"Query returned {len(results)} records")
        if results:
            logger.debug(
                f"First record ID: {results[0]['id']}, Last record ID: {results[-1]['id']}"
            )

        # 处理分页
        has_next = len(results) > page_size
        if has_next:
            results = results[:page_size]
            # 下一页的偏移量是当前偏移量加上本页返回的记录数
            next_offset = offset + page_size
            next_page_token = str(next_offset)
            logger.debug(
                f"Has next page, offset={offset}, page_size={page_size}, next_page_token={next_page_token}"
            )
        else:
            next_page_token = None
            logger.debug(f"No next page, returning {len(results)} results")

        return [dict(row) for row in results], next_page_token
    except Exception as e:
        logger.error(f"Failed to list file records: {str(e)}")
        raise


async def delete_file_record(name: str) -> bool:
    """
    删除文件记录

    Args:
        name: 文件名称

    Returns:
        bool: 是否删除成功
    """
    try:
        query = delete(FileRecord).where(FileRecord.name == name)
        await database.execute(query)
        return True
    except Exception as e:
        logger.error(f"Failed to delete file record: {str(e)}")
        return False


async def delete_expired_file_records() -> List[Dict[str, Any]]:
    """
    删除已过期的文件记录

    Returns:
        List[Dict[str, Any]]: 删除的记录列表
    """
    try:
        # 先获取要删除的记录
        query = select(FileRecord).where(
            FileRecord.expiration_time <= datetime.now(timezone.utc)
        )
        expired_records = await database.fetch_all(query)

        if not expired_records:
            return []

        # 执行删除
        delete_query = delete(FileRecord).where(
            FileRecord.expiration_time <= datetime.now(timezone.utc)
        )
        await database.execute(delete_query)

        logger.info(f"Deleted {len(expired_records)} expired file records")
        return [dict(record) for record in expired_records]
    except Exception as e:
        logger.error(f"Failed to delete expired file records: {str(e)}")
        raise


async def get_file_api_key(name: str) -> Optional[str]:
    """
    获取文件对应的 API Key

    Args:
        name: 文件名称

    Returns:
        Optional[str]: API Key,如果文件不存在或已过期则返回 None
    """
    try:
        query = select(FileRecord.api_key).where(
            (FileRecord.name == name)
            & (FileRecord.expiration_time > datetime.now(timezone.utc))
        )
        result = await database.fetch_one(query)
        return result["api_key"] if result else None
    except Exception as e:
        logger.error(f"Failed to get file API key: {str(e)}")
        raise