Spaces:
Running
Running
| import time | |
| import uuid | |
| import 数据库连接 as db | |
def add_notification(target_account: str, notif_data: dict):
    """Build a notification and prepend it to the target account's message list.

    The notification is assembled from ``notif_data`` plus the sender's
    display name and avatar looked up in ``users.json``, then inserted at
    index 0 of ``messages.json[target_account]`` through ``db.atomic_update``.

    Args:
        target_account: Account that should receive the notification. A falsy
            value, or a value equal to the sender, makes this a no-op.
        notif_data: Mapping read with ``.get`` for the keys ``from_user``,
            ``type``, ``target_item_id``, ``target_item_title`` and
            ``content``. Missing optional text fields default to ``""``.

    Returns:
        None. This is a best-effort operation: any exception is reported via
        ``print`` and swallowed so the caller's main flow continues.
    """
    try:
        from_user = notif_data.get("from_user")
        # Never notify users about their own actions; also skip empty targets.
        if not target_account or target_account == from_user:
            return

        users_db = db.load_data("users.json", default_data={})
        # `or {}` also covers a sender whose stored entry is null/empty, so
        # missing profile data degrades to defaults instead of dropping the
        # whole notification.
        user_info = users_db.get(from_user) or {}

        # Sample the clock once so the timestamp embedded in the id always
        # matches `created_at`, even across a second boundary.
        now = int(time.time())
        notif = {
            # Short random suffix keeps ids distinct within the same second.
            "id": f"msg_{now}_{uuid.uuid4().hex[:6]}",
            "type": notif_data.get("type"),
            "from_user": from_user,
            "from_name": user_info.get("name", from_user),
            "from_avatar": user_info.get("avatarDataUrl", ""),
            "target_item_id": notif_data.get("target_item_id", ""),
            "target_item_title": notif_data.get("target_item_title", ""),
            "content": notif_data.get("content", ""),
            "is_read": False,
            "created_at": now,
        }

        def updater(data):
            # Newest notification goes first in the recipient's list.
            # NOTE(review): this relies on db.atomic_update persisting the
            # in-place mutation of `data` — confirm against its contract.
            data.setdefault(target_account, []).insert(0, notif)

        db.atomic_update("messages.json", updater, default_data={})
    except Exception as e:
        # Deliberately do not re-raise: a failed notification must not break
        # the caller's main flow.
        print(f"⚠️ 通知发送失败 (target={target_account}): {e}")