File size: 26,615 Bytes
6dfddfb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 |
"""
数据库服务模块
"""
import asyncio
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Union
from sqlalchemy import asc, delete, desc, func, insert, select, update
from app.database.connection import database
from app.database.models import ErrorLog, FileRecord, FileState, RequestLog, Settings
from app.log.logger import get_database_logger
from app.utils.helpers import redact_key_for_logging
logger = get_database_logger()
async def get_all_settings() -> List[Dict[str, Any]]:
"""
获取所有设置
Returns:
List[Dict[str, Any]]: 设置列表
"""
try:
query = select(Settings)
result = await database.fetch_all(query)
return [dict(row) for row in result]
except Exception as e:
logger.error(f"Failed to get all settings: {str(e)}")
raise
async def get_setting(key: str) -> Optional[Dict[str, Any]]:
"""
获取指定键的设置
Args:
key: 设置键名
Returns:
Optional[Dict[str, Any]]: 设置信息,如果不存在则返回None
"""
try:
query = select(Settings).where(Settings.key == key)
result = await database.fetch_one(query)
return dict(result) if result else None
except Exception as e:
logger.error(f"Failed to get setting {key}: {str(e)}")
raise
async def update_setting(
key: str, value: str, description: Optional[str] = None
) -> bool:
"""
更新设置
Args:
key: 设置键名
value: 设置值
description: 设置描述
Returns:
bool: 是否更新成功
"""
try:
# 检查设置是否存在
setting = await get_setting(key)
if setting:
# 更新设置
query = (
update(Settings)
.where(Settings.key == key)
.values(
value=value,
description=description if description else setting["description"],
updated_at=datetime.now(),
)
)
await database.execute(query)
logger.info(f"Updated setting: {key}")
return True
else:
# 插入设置
query = insert(Settings).values(
key=key,
value=value,
description=description,
created_at=datetime.now(),
updated_at=datetime.now(),
)
await database.execute(query)
logger.info(f"Inserted setting: {key}")
return True
except Exception as e:
logger.error(f"Failed to update setting {key}: {str(e)}")
return False
async def add_error_log(
gemini_key: Optional[str] = None,
model_name: Optional[str] = None,
error_type: Optional[str] = None,
error_log: Optional[str] = None,
error_code: Optional[int] = None,
request_msg: Optional[Union[Dict[str, Any], str]] = None,
request_datetime: Optional[datetime] = None,
) -> bool:
"""
添加错误日志
Args:
gemini_key: Gemini API密钥
error_log: 错误日志
error_code: 错误代码 (例如 HTTP 状态码)
request_msg: 请求消息
Returns:
bool: 是否添加成功
"""
try:
if request_msg is None:
request_msg_json = None
else:
# 如果request_msg是字典,则转换为JSON字符串
if isinstance(request_msg, dict):
request_msg_json = request_msg
elif isinstance(request_msg, str):
try:
request_msg_json = json.loads(request_msg)
except json.JSONDecodeError:
request_msg_json = {"message": request_msg}
else:
request_msg_json = None
# 插入错误日志
query = insert(ErrorLog).values(
gemini_key=gemini_key,
error_type=error_type,
error_log=error_log,
model_name=model_name,
error_code=error_code,
request_msg=request_msg_json,
request_time=(request_datetime if request_datetime else datetime.now()),
)
await database.execute(query)
logger.info(f"Added error log for key: {redact_key_for_logging(gemini_key)}")
return True
except Exception as e:
logger.error(f"Failed to add error log: {str(e)}")
return False
async def get_error_logs(
limit: int = 20,
offset: int = 0,
key_search: Optional[str] = None,
error_search: Optional[str] = None,
error_code_search: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
sort_by: str = "id",
sort_order: str = "desc",
) -> List[Dict[str, Any]]:
"""
获取错误日志,支持搜索、日期过滤和排序
Args:
limit (int): 限制数量
offset (int): 偏移量
key_search (Optional[str]): Gemini密钥搜索词 (模糊匹配)
error_search (Optional[str]): 错误类型或日志内容搜索词 (模糊匹配)
error_code_search (Optional[str]): 错误码搜索词 (精确匹配)
start_date (Optional[datetime]): 开始日期时间
end_date (Optional[datetime]): 结束日期时间
sort_by (str): 排序字段 (例如 'id', 'request_time')
sort_order (str): 排序顺序 ('asc' or 'desc')
Returns:
List[Dict[str, Any]]: 错误日志列表
"""
try:
query = select(
ErrorLog.id,
ErrorLog.gemini_key,
ErrorLog.model_name,
ErrorLog.error_type,
ErrorLog.error_log,
ErrorLog.error_code,
ErrorLog.request_time,
)
if key_search:
query = query.where(ErrorLog.gemini_key.ilike(f"%{key_search}%"))
if error_search:
query = query.where(
(ErrorLog.error_type.ilike(f"%{error_search}%"))
| (ErrorLog.error_log.ilike(f"%{error_search}%"))
)
if start_date:
query = query.where(ErrorLog.request_time >= start_date)
if end_date:
query = query.where(ErrorLog.request_time < end_date)
if error_code_search:
try:
error_code_int = int(error_code_search)
query = query.where(ErrorLog.error_code == error_code_int)
except ValueError:
logger.warning(
f"Invalid format for error_code_search: '{error_code_search}'. Expected an integer. Skipping error code filter."
)
sort_column = getattr(ErrorLog, sort_by, ErrorLog.id)
if sort_order.lower() == "asc":
query = query.order_by(asc(sort_column))
else:
query = query.order_by(desc(sort_column))
query = query.limit(limit).offset(offset)
result = await database.fetch_all(query)
return [dict(row) for row in result]
except Exception as e:
logger.exception(f"Failed to get error logs with filters: {str(e)}")
raise
async def get_error_logs_count(
key_search: Optional[str] = None,
error_search: Optional[str] = None,
error_code_search: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> int:
"""
获取符合条件的错误日志总数
Args:
key_search (Optional[str]): Gemini密钥搜索词 (模糊匹配)
error_search (Optional[str]): 错误类型或日志内容搜索词 (模糊匹配)
error_code_search (Optional[str]): 错误码搜索词 (精确匹配)
start_date (Optional[datetime]): 开始日期时间
end_date (Optional[datetime]): 结束日期时间
Returns:
int: 日志总数
"""
try:
query = select(func.count()).select_from(ErrorLog)
if key_search:
query = query.where(ErrorLog.gemini_key.ilike(f"%{key_search}%"))
if error_search:
query = query.where(
(ErrorLog.error_type.ilike(f"%{error_search}%"))
| (ErrorLog.error_log.ilike(f"%{error_search}%"))
)
if start_date:
query = query.where(ErrorLog.request_time >= start_date)
if end_date:
query = query.where(ErrorLog.request_time < end_date)
if error_code_search:
try:
error_code_int = int(error_code_search)
query = query.where(ErrorLog.error_code == error_code_int)
except ValueError:
logger.warning(
f"Invalid format for error_code_search in count: '{error_code_search}'. Expected an integer. Skipping error code filter."
)
count_result = await database.fetch_one(query)
return count_result[0] if count_result else 0
except Exception as e:
logger.exception(f"Failed to count error logs with filters: {str(e)}")
raise
# 新增函数:获取单条错误日志详情
async def get_error_log_details(log_id: int) -> Optional[Dict[str, Any]]:
"""
根据 ID 获取单个错误日志的详细信息
Args:
log_id (int): 错误日志的 ID
Returns:
Optional[Dict[str, Any]]: 包含日志详细信息的字典,如果未找到则返回 None
"""
try:
query = select(ErrorLog).where(ErrorLog.id == log_id)
result = await database.fetch_one(query)
if result:
# 将 request_msg (JSONB) 转换为字符串以便在 API 中返回
log_dict = dict(result)
if "request_msg" in log_dict and log_dict["request_msg"] is not None:
# 确保即使是 None 或非 JSON 数据也能处理
try:
log_dict["request_msg"] = json.dumps(
log_dict["request_msg"], ensure_ascii=False, indent=2
)
except TypeError:
log_dict["request_msg"] = str(log_dict["request_msg"])
return log_dict
else:
return None
except Exception as e:
logger.exception(f"Failed to get error log details for ID {log_id}: {str(e)}")
raise
# 新增函数:通过 gemini_key / error_code / 时间窗口 查找最接近的错误日志
async def find_error_log_by_info(
gemini_key: str,
timestamp: datetime,
status_code: Optional[int] = None,
window_seconds: int = 1,
) -> Optional[Dict[str, Any]]:
"""
在给定时间窗口内,根据 gemini_key(精确匹配)及可选的 status_code 查找最接近 timestamp 的错误日志。
假设错误日志的 error_code 存储的是 HTTP 状态码或等价错误码。
Args:
gemini_key: 完整的 Gemini key 字符串。
timestamp: 目标时间(UTC 或本地,与存储一致)。
status_code: 可选的错误码,若提供则优先匹配该错误码。
window_seconds: 允许的时间偏差窗口,单位秒,默认为 1 秒。
Returns:
Optional[Dict[str, Any]]: 最匹配的一条错误日志的完整详情(字段与 get_error_log_details 一致),若未找到则返回 None。
"""
try:
start_time = timestamp - timedelta(seconds=window_seconds)
end_time = timestamp + timedelta(seconds=window_seconds)
base_query = select(ErrorLog).where(
ErrorLog.gemini_key == gemini_key,
ErrorLog.request_time >= start_time,
ErrorLog.request_time <= end_time,
)
# 若提供了状态码,先尝试按状态码过滤
if status_code is not None:
query = base_query.where(ErrorLog.error_code == status_code).order_by(
ErrorLog.request_time.desc()
)
candidates = await database.fetch_all(query)
if not candidates:
# 回退:不按状态码,仅按时间窗口
query2 = base_query.order_by(ErrorLog.request_time.desc())
candidates = await database.fetch_all(query2)
else:
query = base_query.order_by(ErrorLog.request_time.desc())
candidates = await database.fetch_all(query)
if not candidates:
return None
# 在 Python 中选择与 timestamp 最接近的一条
def _to_dict(row: Any) -> Dict[str, Any]:
d = dict(row)
if "request_msg" in d and d["request_msg"] is not None:
try:
d["request_msg"] = json.dumps(
d["request_msg"], ensure_ascii=False, indent=2
)
except TypeError:
d["request_msg"] = str(d["request_msg"])
return d
best = min(
candidates,
key=lambda r: abs((r["request_time"] - timestamp).total_seconds()),
)
return _to_dict(best)
except Exception as e:
logger.exception(
f"Failed to find error log by info (key=***{gemini_key[-4:] if gemini_key else ''}, code={status_code}, ts={timestamp}, window={window_seconds}s): {str(e)}"
)
raise
async def delete_error_logs_by_ids(log_ids: List[int]) -> int:
"""
根据提供的 ID 列表批量删除错误日志 (异步)。
Args:
log_ids: 要删除的错误日志 ID 列表。
Returns:
int: 实际删除的日志数量。
"""
if not log_ids:
return 0
try:
# 使用 databases 执行删除
query = delete(ErrorLog).where(ErrorLog.id.in_(log_ids))
# execute 返回受影响的行数,但 databases 库的 execute 不直接返回 rowcount
# 我们需要先查询是否存在,或者依赖数据库约束/触发器(如果适用)
# 或者,我们可以执行删除并假设成功,除非抛出异常
# 为了简单起见,我们执行删除并记录日志,不精确返回删除数量
# 如果需要精确数量,需要先执行 SELECT COUNT(*)
await database.execute(query)
# 注意:databases 的 execute 不返回 rowcount,所以我们不能直接返回删除的数量
# 返回 log_ids 的长度作为尝试删除的数量,或者返回 0/1 表示操作尝试
logger.info(f"Attempted bulk deletion for error logs with IDs: {log_ids}")
return len(log_ids) # 返回尝试删除的数量
except Exception as e:
# 数据库连接或执行错误
logger.error(
f"Error during bulk deletion of error logs {log_ids}: {e}", exc_info=True
)
raise
async def delete_error_log_by_id(log_id: int) -> bool:
"""
根据 ID 删除单个错误日志 (异步)。
Args:
log_id: 要删除的错误日志 ID。
Returns:
bool: 如果成功删除返回 True,否则返回 False。
"""
try:
# 先检查是否存在 (可选,但更明确)
check_query = select(ErrorLog.id).where(ErrorLog.id == log_id)
exists = await database.fetch_one(check_query)
if not exists:
logger.warning(
f"Attempted to delete non-existent error log with ID: {log_id}"
)
return False
# 执行删除
delete_query = delete(ErrorLog).where(ErrorLog.id == log_id)
await database.execute(delete_query)
logger.info(f"Successfully deleted error log with ID: {log_id}")
return True
except Exception as e:
logger.error(f"Error deleting error log with ID {log_id}: {e}", exc_info=True)
raise
async def delete_all_error_logs() -> int:
"""
分批删除所有错误日志,以避免大数据量下的超时和性能问题。
Returns:
int: 被删除的错误日志总数。
"""
total_deleted_count = 0
# SQLite 对 SQL 参数数量有上限(常见为 999),IN 子句中过多参数会报错
# 统一使用 500,兼容 SQLite/MySQL,必要时可在配置中暴露该值
batch_size = 200
try:
while True:
# 1) 读取一批待删除的ID,仅选择ID列以提升效率
id_query = select(ErrorLog.id).order_by(ErrorLog.id).limit(batch_size)
rows = await database.fetch_all(id_query)
if not rows:
break
ids = [row["id"] for row in rows]
# 2) 按ID批量删除
delete_query = delete(ErrorLog).where(ErrorLog.id.in_(ids))
await database.execute(delete_query)
deleted_in_batch = len(ids)
total_deleted_count += deleted_in_batch
logger.debug(f"Deleted a batch of {deleted_in_batch} error logs.")
# 若不足一个批次,说明已删除完成
if deleted_in_batch < batch_size:
break
# 3) 将控制权交还事件循环,缓解长时间占用
await asyncio.sleep(0)
logger.info(
f"Successfully deleted all error logs in batches. Total deleted: {total_deleted_count}"
)
return total_deleted_count
except Exception as e:
logger.error(
f"Failed to delete all error logs in batches: {str(e)}", exc_info=True
)
raise
# 新增函数:添加请求日志
async def add_request_log(
model_name: Optional[str],
api_key: Optional[str],
is_success: bool,
status_code: Optional[int] = None,
latency_ms: Optional[int] = None,
request_time: Optional[datetime] = None,
) -> bool:
"""
添加 API 请求日志
Args:
model_name: 模型名称
api_key: 使用的 API 密钥
is_success: 请求是否成功
status_code: API 响应状态码
latency_ms: 请求耗时(毫秒)
request_time: 请求发生时间 (如果为 None, 则使用当前时间)
Returns:
bool: 是否添加成功
"""
try:
log_time = request_time if request_time else datetime.now()
query = insert(RequestLog).values(
request_time=log_time,
model_name=model_name,
api_key=api_key,
is_success=is_success,
status_code=status_code,
latency_ms=latency_ms,
)
await database.execute(query)
return True
except Exception as e:
logger.error(f"Failed to add request log: {str(e)}")
return False
# ==================== 文件记录相关函数 ====================
async def create_file_record(
name: str,
mime_type: str,
size_bytes: int,
api_key: str,
uri: str,
create_time: datetime,
update_time: datetime,
expiration_time: datetime,
state: FileState = FileState.PROCESSING,
display_name: Optional[str] = None,
sha256_hash: Optional[str] = None,
upload_url: Optional[str] = None,
user_token: Optional[str] = None,
) -> Dict[str, Any]:
"""
创建文件记录
Args:
name: 文件名称(格式: files/{file_id})
mime_type: MIME 类型
size_bytes: 文件大小(字节)
api_key: 上传时使用的 API Key
uri: 文件访问 URI
create_time: 创建时间
update_time: 更新时间
expiration_time: 过期时间
display_name: 显示名称
sha256_hash: SHA256 哈希值
upload_url: 临时上传 URL
user_token: 上传用户的 token
Returns:
Dict[str, Any]: 创建的文件记录
"""
try:
query = insert(FileRecord).values(
name=name,
display_name=display_name,
mime_type=mime_type,
size_bytes=size_bytes,
sha256_hash=sha256_hash,
state=state,
create_time=create_time,
update_time=update_time,
expiration_time=expiration_time,
uri=uri,
api_key=api_key,
upload_url=upload_url,
user_token=user_token,
)
await database.execute(query)
# 返回创建的记录
return await get_file_record_by_name(name)
except Exception as e:
logger.error(f"Failed to create file record: {str(e)}")
raise
async def get_file_record_by_name(name: str) -> Optional[Dict[str, Any]]:
"""
根据文件名获取文件记录
Args:
name: 文件名称(格式: files/{file_id})
Returns:
Optional[Dict[str, Any]]: 文件记录,如果不存在则返回 None
"""
try:
query = select(FileRecord).where(FileRecord.name == name)
result = await database.fetch_one(query)
return dict(result) if result else None
except Exception as e:
logger.error(f"Failed to get file record by name {name}: {str(e)}")
raise
async def update_file_record_state(
file_name: str,
state: FileState,
update_time: Optional[datetime] = None,
upload_completed: Optional[datetime] = None,
sha256_hash: Optional[str] = None,
) -> bool:
"""
更新文件记录状态
Args:
file_name: 文件名
state: 新状态
update_time: 更新时间
upload_completed: 上传完成时间
sha256_hash: SHA256 哈希值
Returns:
bool: 是否更新成功
"""
try:
values = {"state": state}
if update_time:
values["update_time"] = update_time
if upload_completed:
values["upload_completed"] = upload_completed
if sha256_hash:
values["sha256_hash"] = sha256_hash
query = update(FileRecord).where(FileRecord.name == file_name).values(**values)
result = await database.execute(query)
if result:
logger.info(f"Updated file record state for {file_name} to {state}")
return True
logger.warning(f"File record not found for update: {file_name}")
return False
except Exception as e:
logger.error(f"Failed to update file record state: {str(e)}")
return False
async def list_file_records(
user_token: Optional[str] = None,
api_key: Optional[str] = None,
page_size: int = 10,
page_token: Optional[str] = None,
) -> tuple[List[Dict[str, Any]], Optional[str]]:
"""
列出文件记录
Args:
user_token: 用户 token(如果提供,只返回该用户的文件)
api_key: API Key(如果提供,只返回使用该 key 的文件)
page_size: 每页大小
page_token: 分页标记(偏移量)
Returns:
tuple[List[Dict[str, Any]], Optional[str]]: (文件列表, 下一页标记)
"""
try:
logger.debug(
f"list_file_records called with page_size={page_size}, page_token={page_token}"
)
query = select(FileRecord).where(
FileRecord.expiration_time > datetime.now(timezone.utc)
)
if user_token:
query = query.where(FileRecord.user_token == user_token)
if api_key:
query = query.where(FileRecord.api_key == api_key)
# 使用偏移量进行分页
offset = 0
if page_token:
try:
offset = int(page_token)
except ValueError:
logger.warning(f"Invalid page token: {page_token}")
offset = 0
# 按ID升序排列,使用 OFFSET 和 LIMIT
query = query.order_by(FileRecord.id).offset(offset).limit(page_size + 1)
results = await database.fetch_all(query)
logger.debug(f"Query returned {len(results)} records")
if results:
logger.debug(
f"First record ID: {results[0]['id']}, Last record ID: {results[-1]['id']}"
)
# 处理分页
has_next = len(results) > page_size
if has_next:
results = results[:page_size]
# 下一页的偏移量是当前偏移量加上本页返回的记录数
next_offset = offset + page_size
next_page_token = str(next_offset)
logger.debug(
f"Has next page, offset={offset}, page_size={page_size}, next_page_token={next_page_token}"
)
else:
next_page_token = None
logger.debug(f"No next page, returning {len(results)} results")
return [dict(row) for row in results], next_page_token
except Exception as e:
logger.error(f"Failed to list file records: {str(e)}")
raise
async def delete_file_record(name: str) -> bool:
"""
删除文件记录
Args:
name: 文件名称
Returns:
bool: 是否删除成功
"""
try:
query = delete(FileRecord).where(FileRecord.name == name)
await database.execute(query)
return True
except Exception as e:
logger.error(f"Failed to delete file record: {str(e)}")
return False
async def delete_expired_file_records() -> List[Dict[str, Any]]:
"""
删除已过期的文件记录
Returns:
List[Dict[str, Any]]: 删除的记录列表
"""
try:
# 先获取要删除的记录
query = select(FileRecord).where(
FileRecord.expiration_time <= datetime.now(timezone.utc)
)
expired_records = await database.fetch_all(query)
if not expired_records:
return []
# 执行删除
delete_query = delete(FileRecord).where(
FileRecord.expiration_time <= datetime.now(timezone.utc)
)
await database.execute(delete_query)
logger.info(f"Deleted {len(expired_records)} expired file records")
return [dict(record) for record in expired_records]
except Exception as e:
logger.error(f"Failed to delete expired file records: {str(e)}")
raise
async def get_file_api_key(name: str) -> Optional[str]:
"""
获取文件对应的 API Key
Args:
name: 文件名称
Returns:
Optional[str]: API Key,如果文件不存在或已过期则返回 None
"""
try:
query = select(FileRecord.api_key).where(
(FileRecord.name == name)
& (FileRecord.expiration_time > datetime.now(timezone.utc))
)
result = await database.fetch_one(query)
return result["api_key"] if result else None
except Exception as e:
logger.error(f"Failed to get file API key: {str(e)}")
raise
|