Spaces:
Running
Running
File size: 18,805 Bytes
0bf23ac 8f0ec20 a2e28fb 8f0ec20 a2e28fb 8f0ec20 0bf23ac | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 | # 云端Space代码/db_utils.py
# ==========================================
# 🔧 P2代码质量优化:数据库工具函数
# ==========================================
# 作用:封装 JSON 数据库常用操作,减少重复代码
# 关联文件:
# - 数据库连接.py (基础读写)
# - router_tasks.py (任务操作)
# - router_posts.py (帖子操作)
# - router_items.py (商品操作)
# ==========================================
from typing import Any, Dict, List, Optional, Callable, Union
import 数据库连接 as db
# ==========================================
# 📖 查询工具函数
# ==========================================
def get_by_id(file_name: str, item_id: str, id_field: str = "id") -> Optional[Dict]:
"""
根据 ID 获取单个记录
参数:
file_name: JSON 文件名(如 tasks.json)
item_id: 要查找的 ID
id_field: ID 字段名(默认 "id")
返回:
找到的记录,或 None
示例:
task = get_by_id("tasks.json", "task_123")
user = get_by_id("users.json", "user@example.com", id_field="account")
"""
data = db.load_data(file_name, default_data=[])
if isinstance(data, dict):
return data.get(item_id)
return next((item for item in data if item.get(id_field) == item_id), None)
def get_by_field(file_name: str, field: str, value: Any) -> Optional[Dict]:
"""
根据指定字段获取单个记录
参数:
file_name: JSON 文件名
field: 字段名
value: 字段值
返回:
找到的记录,或 None
"""
data = db.load_data(file_name, default_data=[])
if isinstance(data, dict):
for item in data.values():
if isinstance(item, dict) and item.get(field) == value:
return item
return None
return next((item for item in data if item.get(field) == value), None)
def filter_by(file_name: str, **conditions) -> List[Dict]:
"""
根据条件筛选记录
参数:
file_name: JSON 文件名
**conditions: 筛选条件(键值对)
返回:
符合条件的记录列表
示例:
open_tasks = filter_by("tasks.json", status="open")
user_posts = filter_by("posts.json", author="user123", deleted=False)
"""
data = db.load_data(file_name, default_data=[])
if isinstance(data, dict):
data = list(data.values())
result = []
for item in data:
if all(item.get(key) == value for key, value in conditions.items()):
result.append(item)
return result
def count_by(file_name: str, **conditions) -> int:
"""
统计符合条件的记录数量
参数:
file_name: JSON 文件名
**conditions: 筛选条件
返回:
符合条件的记录数量
"""
return len(filter_by(file_name, **conditions))
# ==========================================
# ✏️ 更新工具函数
# ==========================================
def update_by_id(file_name: str, item_id: str, updates: Dict, id_field: str = "id") -> bool:
"""
根据 ID 更新记录
参数:
file_name: JSON 文件名
item_id: 要更新的 ID
updates: 要更新的字段(键值对)
id_field: ID 字段名
返回:
True 更新成功 / False 记录不存在
示例:
update_by_id("tasks.json", "task_123", {"status": "completed"})
"""
data = db.load_data(file_name, default_data=[])
if isinstance(data, dict):
if item_id in data:
data[item_id].update(updates)
db.save_data(file_name, data)
return True
return False
for item in data:
if item.get(id_field) == item_id:
item.update(updates)
db.save_data(file_name, data)
return True
return False
def update_with_fn(file_name: str, item_id: str, update_fn: Callable[[Dict], None], id_field: str = "id") -> bool:
"""
使用函数更新记录(支持复杂更新逻辑)
参数:
file_name: JSON 文件名
item_id: 要更新的 ID
update_fn: 更新函数,接收记录 dict,直接修改
id_field: ID 字段名
返回:
True 更新成功 / False 记录不存在
示例:
def increment_views(item):
item["views"] = item.get("views", 0) + 1
update_with_fn("items.json", "item_123", increment_views)
"""
data = db.load_data(file_name, default_data=[])
if isinstance(data, dict):
if item_id in data:
update_fn(data[item_id])
db.save_data(file_name, data)
return True
return False
for item in data:
if item.get(id_field) == item_id:
update_fn(item)
db.save_data(file_name, data)
return True
return False
# ==========================================
# ➕ 添加工具函数
# ==========================================
def insert(file_name: str, item: Dict, prepend: bool = True) -> bool:
"""
插入新记录
参数:
file_name: JSON 文件名
item: 要插入的记录
prepend: True 插入到开头 / False 插入到末尾
返回:
True 插入成功
示例:
insert("tasks.json", {"id": "task_123", "title": "新任务"})
"""
data = db.load_data(file_name, default_data=[])
if isinstance(data, dict):
item_id = item.get("id") or item.get("account")
if item_id:
data[item_id] = item
db.save_data(file_name, data)
return True
return False
if prepend:
data.insert(0, item)
else:
data.append(item)
db.save_data(file_name, data)
return True
def insert_if_not_exists(file_name: str, item: Dict, id_field: str = "id") -> bool:
"""
如果不存在则插入
参数:
file_name: JSON 文件名
item: 要插入的记录
id_field: ID 字段名
返回:
True 插入成功 / False 已存在
"""
item_id = item.get(id_field)
if not item_id:
return False
existing = get_by_id(file_name, item_id, id_field)
if existing:
return False
return insert(file_name, item)
# ==========================================
# ❌ 删除工具函数
# ==========================================
def delete_by_id(file_name: str, item_id: str, id_field: str = "id") -> bool:
"""
根据 ID 删除记录
参数:
file_name: JSON 文件名
item_id: 要删除的 ID
id_field: ID 字段名
返回:
True 删除成功 / False 记录不存在
"""
data = db.load_data(file_name, default_data=[])
if isinstance(data, dict):
if item_id in data:
del data[item_id]
db.save_data(file_name, data)
return True
return False
original_len = len(data)
data = [item for item in data if item.get(id_field) != item_id]
if len(data) < original_len:
db.save_data(file_name, data)
return True
return False
def soft_delete_by_id(file_name: str, item_id: str, id_field: str = "id") -> bool:
"""
软删除(标记为已删除,不物理删除)
参数:
file_name: JSON 文件名
item_id: 要删除的 ID
id_field: ID 字段名
返回:
True 成功 / False 记录不存在
"""
import time
return update_by_id(file_name, item_id, {
"deleted": True,
"deleted_at": int(time.time())
}, id_field)
# ==========================================
# 🔍 分页工具函数
# ==========================================
def paginate(
file_name: str,
page: int = 1,
limit: int = 20,
sort_by: str = None,
sort_desc: bool = True,
**filters
) -> Dict:
"""
分页查询
参数:
file_name: JSON 文件名
page: 页码(从 1 开始)
limit: 每页数量
sort_by: 排序字段
sort_desc: 是否降序
**filters: 筛选条件
返回:
{
"data": [...], # 当前页数据
"total": 100, # 总数
"page": 1, # 当前页
"limit": 20, # 每页数量
"pages": 5 # 总页数
}
"""
# 获取并筛选数据
if filters:
data = filter_by(file_name, **filters)
else:
data = db.load_data(file_name, default_data=[])
if isinstance(data, dict):
data = list(data.values())
# 排序
if sort_by:
try:
data = sorted(data, key=lambda x: x.get(sort_by, 0), reverse=sort_desc)
except TypeError:
pass # 排序失败时忽略
# 计算分页
total = len(data)
pages = (total + limit - 1) // limit # 向上取整
start = (page - 1) * limit
end = start + limit
return {
"data": data[start:end],
"total": total,
"page": page,
"limit": limit,
"pages": pages
}
# ==========================================
# 👁️ 访问量记录工具函数
# ==========================================
def record_view(data_file: str, item_id: str, user_account: str) -> Optional[Dict]:
"""
记录访问量(原子操作,并发安全)
参数:
data_file: JSON 文件名(如 Items.json)
item_id: 要记录访问的 item/task ID
user_account: 访问用户账号
返回:
{"views": N, "daily_views": N} 成功
None 记录不存在
逻辑:
- 初始化字段: views=0, viewed_by=[], daily_views=0, daily_views_date=""
- daily_views 每次调用都增加
- views 只在用户首次访问时增加(user_account 不在 viewed_by 中)
- 如果 daily_views_date 不是今天,重置 daily_views=0 并更新日期
并发安全:
- 使用 atomic_update 确保读-改-写在同一把锁内完成
- 高并发下不会丢失访问量
"""
from datetime import datetime, timedelta
# 用于在闭包中存储结果
result_container = [None]
def updater(data):
# 查找目标记录
target_item = None
if isinstance(data, dict):
if item_id in data:
target_item = data[item_id]
else:
for item in data:
if item.get("id") == item_id:
target_item = item
break
if target_item is None:
result_container[0] = None
return
# 初始化字段(如果不存在)
if "views" not in target_item:
target_item["views"] = 0
if "viewed_by" not in target_item:
target_item["viewed_by"] = []
if "daily_views" not in target_item:
target_item["daily_views"] = 0
if "daily_views_date" not in target_item:
target_item["daily_views_date"] = ""
# 获取今天的日期字符串(使用 UTC+8 时区)
now_utc8 = datetime.utcnow() + timedelta(hours=8)
today_str = now_utc8.date().isoformat() # "2026-04-02"
# 检查是否需要重置日访问量
if target_item["daily_views_date"] != today_str:
target_item["daily_views"] = 0
target_item["daily_views_date"] = today_str
# 增加日访问量(每次调用都增加)
target_item["daily_views"] += 1
# 检查用户是否已访问过
if user_account not in target_item["viewed_by"]:
target_item["viewed_by"].append(user_account)
target_item["views"] += 1
# 保存结果到闭包容器
result_container[0] = {
"views": target_item["views"],
"daily_views": target_item["daily_views"]
}
# 使用原子更新,整个读-改-写过程在同一把锁内完成
db.atomic_update(data_file, updater, default_data=[])
return result_container[0]
# ==========================================
# 🗂️ 排序结果缓存工具类
# ==========================================
import time
from typing import Callable, List, Dict, Any
class SortCache:
"""
排序结果缓存类 - 用于缓存列表排序结果,减少重复排序开销
设计原则:
1. 缓存排序后的 ID 顺序,而非完整数据,避免内存浪费
2. 使用 TTL (默认5分钟) 自动过期,确保数据新鲜度
3. 写操作时清除相关缓存,确保数据一致性
4. 在 load_data 返回的最新数据之上应用缓存的顺序
使用示例:
# 在列表接口中
def get_items(sort="time"):
items = db.load_data("items.json", default_data=[])
cache_key = f"items:{sort}"
def sort_fn(data):
if sort == "likes":
data.sort(key=lambda x: x.get("likes", 0), reverse=True)
else:
data.sort(key=lambda x: x.get("created_at", 0), reverse=True)
return sort_cache.get_sorted(cache_key, items, sort_fn)
# 在写操作后
def create_item(...):
...
sort_cache.invalidate("items")
"""
def __init__(self, ttl: int = 300):
"""
初始化排序缓存
参数:
ttl: 缓存过期时间(秒),默认 300 秒(5分钟)
"""
self._cache: Dict[str, tuple] = {} # {cache_key: (sorted_ids, timestamp)}
self._ttl = ttl
def get_sorted(self, cache_key: str, items: List[Dict], sort_fn: Callable[[List[Dict]], None]) -> List[Dict]:
"""
获取排序后的数据(带缓存)
参数:
cache_key: 缓存键,应包含数据文件和排序参数
items: 原始数据列表(来自 load_data 的最新数据)
sort_fn: 排序函数,接收 items 列表并原地排序
返回:
排序后的 items 列表
"""
now = time.time()
# 检查缓存是否有效
if cache_key in self._cache:
sorted_ids, cached_time = self._cache[cache_key]
if now - cached_time < self._ttl:
# 缓存有效,用缓存的顺序重排当前数据
id_order = {id_: idx for idx, id_ in enumerate(sorted_ids)}
# 按缓存的顺序排序,新数据(不在缓存中的)放在最后
return sorted(items, key=lambda x: id_order.get(x.get("id"), float('inf')))
# 缓存无效或不存在,执行排序
sort_fn(items)
# 缓存排序后的 ID 列表
sorted_ids = [item.get("id") for item in items]
self._cache[cache_key] = (sorted_ids, now)
return items
def invalidate(self, prefix: str = ""):
"""
清除缓存
参数:
prefix: 缓存键前缀,如果指定则只清除匹配的缓存,否则清除所有缓存
"""
if prefix:
keys_to_remove = [k for k in self._cache if k.startswith(prefix)]
for k in keys_to_remove:
del self._cache[k]
else:
self._cache.clear()
def get_stats(self) -> Dict[str, Any]:
"""
获取缓存统计信息(用于调试)
"""
now = time.time()
valid_count = sum(1 for _, ts in self._cache.values() if now - ts < self._ttl)
return {
"total_cached": len(self._cache),
"valid": valid_count,
"expired": len(self._cache) - valid_count,
"ttl": self._ttl
}
# 全局排序缓存实例(TTL 5分钟)
sort_cache = SortCache(ttl=300)
# ==========================================
# 🔄 批量操作工具函数
# ==========================================
def batch_update(file_name: str, item_ids: List[str], updates: Dict, id_field: str = "id") -> int:
"""
批量更新记录
参数:
file_name: JSON 文件名
item_ids: 要更新的 ID 列表
updates: 要更新的字段
id_field: ID 字段名
返回:
更新的记录数量
"""
data = db.load_data(file_name, default_data=[])
updated_count = 0
if isinstance(data, dict):
for item_id in item_ids:
if item_id in data:
data[item_id].update(updates)
updated_count += 1
else:
id_set = set(item_ids)
for item in data:
if item.get(id_field) in id_set:
item.update(updates)
updated_count += 1
if updated_count > 0:
db.save_data(file_name, data)
return updated_count
def batch_delete(file_name: str, item_ids: List[str], id_field: str = "id") -> int:
"""
批量删除记录
参数:
file_name: JSON 文件名
item_ids: 要删除的 ID 列表
id_field: ID 字段名
返回:
删除的记录数量
"""
data = db.load_data(file_name, default_data=[])
original_count = len(data) if isinstance(data, list) else len(data)
if isinstance(data, dict):
for item_id in item_ids:
data.pop(item_id, None)
else:
id_set = set(item_ids)
data = [item for item in data if item.get(id_field) not in id_set]
new_count = len(data) if isinstance(data, list) else len(data)
deleted_count = original_count - new_count
if deleted_count > 0:
db.save_data(file_name, data)
return deleted_count
|