File size: 13,354 Bytes
0b38188
ad9e957
0b38188
 
 
ad9e957
 
0b38188
 
 
8f0ec20
0b38188
 
 
f4e76c2
 
 
0b38188
 
 
 
f4e76c2
ad9e957
f4e76c2
0b38188
ad9e957
a0ab3de
 
ad9e957
0b38188
017b721
 
 
 
0b38188
 
 
 
 
a0ab3de
017b721
 
0b38188
 
 
 
 
 
f4e76c2
2a551ce
ad9e957
 
 
 
 
 
 
 
a0ab3de
 
 
 
 
a9092cc
a0ab3de
 
ad9e957
 
 
 
a0ab3de
ad9e957
a0ab3de
 
ad9e957
 
 
 
 
 
a0ab3de
 
 
 
 
 
 
 
 
 
ad9e957
 
 
 
 
 
 
 
 
a0ab3de
ad9e957
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f4e76c2
 
 
0b38188
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f4e76c2
0b38188
f4e76c2
0b38188
f4e76c2
0b38188
f4e76c2
 
77cbe47
0b38188
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f4e76c2
0b38188
 
 
 
2a551ce
0b38188
 
 
 
 
 
 
 
 
 
f4e76c2
 
5bf0454
619e4fd
f4e76c2
0b38188
c2ac4a9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
619e4fd
0b38188
 
619e4fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c2ac4a9
 
 
 
 
619e4fd
 
 
 
 
c2ac4a9
619e4fd
 
 
5bf0454
0b38188
5bf0454
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0b38188
5bf0454
 
 
 
 
 
 
 
 
 
 
c2ac4a9
5bf0454
 
 
 
 
0b38188
 
 
5bf0454
 
 
 
 
 
 
 
 
 
0b38188
5bf0454
 
 
 
 
 
088dffd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# router_messages.py
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel
import time
import uuid
import subprocess
import os
import 数据库连接 as db
from notifications import add_notification
from models import PrivateMessage
from 安全认证 import require_auth, is_admin

router = APIRouter()

# ==========================================
# 新增:系统公告请求体模型
# ==========================================
class SystemAnnouncement(BaseModel):
    admin_account: str
    content: str

# ==========================================
# 新增:发布系统公告接口 (仅限管理员,使用JWT验证)
# ==========================================
@router.post("/api/system/announcement")
async def publish_announcement(ann: SystemAnnouncement, current_user: str = Depends(require_auth)):
    # 🔒 P0安全修复:使用环境变量配置的管理员列表
    if not is_admin(current_user):
        raise HTTPException(status_code=403, detail="无权发布系统公告,仅管理员可操作")
    
    # 查询管理员信息
    users_db = db.load_data("users.json", default_data={})
    admin_info = users_db.get(current_user, {})
    
    announcements_db = db.load_data("announcements.json", default_data=[])
    
    new_ann = {
        "id": f"sys_{int(time.time())}_{uuid.uuid4().hex[:6]}",
        "type": "system",
        "from_user": current_user,  # 使用真实的管理员账号
        "from_name": admin_info.get("name", current_user),  # 使用真实昵称,fallback 为账号
        "from_avatar": admin_info.get("avatarDataUrl", ""),  # 使用真实头像
        "content": ann.content,
        "created_at": int(time.time())
    }
    
    announcements_db.append(new_ann)
    db.save_data("announcements.json", announcements_db)
    return {"status": "success"}


# ==========================================
# 管理员调试:执行 Python 脚本
# ==========================================
class AdminScriptRequest(BaseModel):
    admin_account: str
    script_name: str

# 🔒 P0安全修复:脚本白名单(仅允许执行指定的脚本)
# 警告:添加新脚本前请确保其安全性
ALLOWED_SCRIPTS = {
    "密码迁移.py",      # 用户密码哈希化迁移
    "测试脚本.py",      # 接口测试工具
    "迁移_余额合并.py",  # 一次性余额合并迁移(执行后可移除)
}

@router.post("/api/admin/run-script")
async def run_admin_script(req: AdminScriptRequest, current_user: str = Depends(require_auth)):
    """

    管理员专属:执行指定的 Python 脚本

    🔒 P0安全修复:白名单 + 路径穿越防护

    """
    # 🔒 P0安全修复:使用环境变量配置的管理员列表
    if not is_admin(current_user):
        raise HTTPException(status_code=403, detail="无权执行此操作,仅管理员可操作")
    
    script_name = req.script_name.strip()
    if not script_name:
        raise HTTPException(status_code=400, detail="脚本名称不能为空")
    
    # 🔒 P0安全修复:路径穿越攻击防护
    if ".." in script_name or "/" in script_name or "\\" in script_name:
        raise HTTPException(status_code=400, detail="🚨 安全拦截:脚本名称包含非法字符")
    
    # 🔒 P0安全修复:白名单检查
    if script_name not in ALLOWED_SCRIPTS:
        raise HTTPException(
            status_code=403, 
            detail=f"🚨 安全拦截:脚本 [{script_name}] 不在白名单中。允许的脚本: {list(ALLOWED_SCRIPTS)}"
        )
    
    # 获取当前工作目录
    current_dir = os.path.dirname(os.path.abspath(__file__))
    script_path = os.path.join(current_dir, script_name)
    
    # 检查文件是否存在
    if not os.path.exists(script_path):
        return {
            "status": "error",
            "output": f"❌ 脚本文件不存在: {script_name}\n\n白名单脚本: {list(ALLOWED_SCRIPTS)}"
        }
    
    try:
        # 执行脚本,设置超时 60 秒
        result = subprocess.run(
            ["python", script_path],
            capture_output=True,
            text=True,
            timeout=60,
            cwd=current_dir,
            encoding="utf-8"
        )
        
        output = ""
        if result.stdout:
            output += f"📝 标准输出:\n{result.stdout}\n"
        if result.stderr:
            output += f"\n⚠️ 错误输出:\n{result.stderr}"
        if not output:
            output = "✅ 脚本执行完成,无输出"
            
        return {
            "status": "success" if result.returncode == 0 else "error",
            "return_code": result.returncode,
            "output": output
        }
        
    except subprocess.TimeoutExpired:
        return {
            "status": "error",
            "output": "❌ 脚本执行超时 (60秒)"
        }
    except Exception as e:
        return {
            "status": "error",
            "output": f"❌ 执行异常: {str(e)}"
        }

# ==========================================
# 原有功能:私信与聊天
# ==========================================
@router.post("/api/messages/private")
async def send_private_message(msg: PrivateMessage):
    chats_db = db.load_data("chats.json", default_data={})
    conv_id = f"{min(msg.sender, msg.receiver)}_{max(msg.sender, msg.receiver)}"
    if conv_id not in chats_db: chats_db[conv_id] = []
    
    chat_msg = {"id": f"chat_{int(time.time())}_{uuid.uuid4().hex[:6]}", "sender": msg.sender, "receiver": msg.receiver, "content": msg.content, "created_at": int(time.time()), "is_read": False}
    chats_db[conv_id].append(chat_msg)
    db.save_data("chats.json", chats_db)
    
    add_notification(msg.receiver, {"type": "private", "from_user": msg.sender, "content": msg.content})
    return {"status": "success"}

@router.get("/api/chats/{account}")
async def get_chat_list(account: str):
    chats_db = db.load_data("chats.json", default_data={})
    users_db = db.load_data("users.json", default_data={})
    
    chat_list = []
    for conv_id, msgs in chats_db.items():
        if account in conv_id:
            other_account = conv_id.replace(account, "").replace("_", "")
            if not msgs: continue
            last_msg = msgs[-1]
            unread_count = sum(1 for m in msgs if m["receiver"] == account and not m.get("is_read"))
            other_user = users_db.get(other_account, {})
            chat_list.append({
                "account": other_account,
                "name": other_user.get("name", other_account),
                "avatar": other_user.get("avatarDataUrl", ""),
                "last_message": last_msg["content"],
                "last_time": last_msg["created_at"],
                "unread_count": unread_count
            })
            
    chat_list.sort(key=lambda x: x["last_time"], reverse=True)
    return {"status": "success", "data": chat_list}

@router.get("/api/chats/{account}/{target_account}")
async def get_chat_history(account: str, target_account: str):
    chats_db = db.load_data("chats.json", default_data={})
    conv_id = f"{min(account, target_account)}_{max(account, target_account)}"
    msgs = chats_db.get(conv_id, [])
    
    now = int(time.time())
    seven_days = 7 * 24 * 3600
    valid_msgs = []
    modified = False
    
    for m in msgs:
        if not m.get("is_read") or (now - m.get("created_at", 0) < seven_days):
            valid_msgs.append(m)
        else:
            modified = True
            
        # 本次访问即为已读
        if m["receiver"] == account and not m.get("is_read"): 
            m["is_read"] = True
            modified = True
            
    if modified or len(valid_msgs) != len(msgs): 
        chats_db[conv_id] = valid_msgs
        db.save_data("chats.json", chats_db)
        
    return {"status": "success", "data": valid_msgs}

# ==========================================
# 改造:获取通知列表 (加入系统公告懒加载注入)
# 使用 atomic_update 避免并发覆盖问题
# 🔥 性能优化:先用只读方式检查是否有实际变更,避免无意义的写入和HF上传
# ==========================================
@router.get("/api/messages/{account}")
async def get_messages(account: str, count_only: bool = False, current_user: str = Depends(require_auth)):
    # 🔥 count_only 模式:轻量级轮询,只返回未读数,不标记已读
    if count_only:
        messages_db = db.load_data("messages.json", default_data={})
        user_msgs = messages_db.get(account, [])
        
        now = int(time.time())
        seven_days = 7 * 24 * 3600
        
        # 只统计未过期的未读消息
        unread = sum(1 for m in user_msgs 
                     if not m.get("is_read") 
                     and (now - m.get("created_at", 0) < seven_days))
        return {"status": "success", "unread_count": unread}
    
    # 公告是只读的,先加载
    announcements_db = db.load_data("announcements.json", default_data=[])
    
    # 🔥 性能优化:先用只读方式检查是否有实际变更需要写入
    messages_db = db.load_data("messages.json", default_data={})
    user_msgs = messages_db.get(account, [])
    
    now = int(time.time())
    seven_days = 7 * 24 * 3600
    
    # 检查三个条件判断是否需要写入
    # 1. 是否有新公告需要注入
    user_msg_ids = {m.get("id") for m in user_msgs}
    has_new_announcements = any(
        ann.get("id") not in user_msg_ids 
        for ann in announcements_db
    )
    
    # 2. 是否有未读消息需要标记已读
    has_unread = any(not m.get("is_read") for m in user_msgs)
    
    # 3. 是否有已读超过7天的消息需要清理
    has_expired = any(
        m.get("is_read") and (now - m.get("created_at", 0) >= seven_days)
        for m in user_msgs
    )
    
    needs_update = has_new_announcements or has_unread or has_expired
    
    # 🔥 修复:在标记已读之前先计算真实的未读数
    unread_before_mark = sum(1 for m in user_msgs 
                              if not m.get("is_read") 
                              and (now - m.get("created_at", 0) < seven_days))
    
    if not needs_update:
        # 无变更,直接返回只读数据,不触发写入和HF上传
        return {
            "status": "success",
            "data": user_msgs,
            "unread_count": unread_before_mark  # 🔥 修复:返回真实的未读数
        }
    
    # 有变更需要写入,使用 atomic_update 保证并发安全
    result_container = [None]
    
    def updater(data):
        user_msgs = data.get(account, [])
        
        # --- 核心改造区:瞬间比对并注入全局公告 ---
        user_msg_ids = {m.get("id") for m in user_msgs}
        
        injected = False
        for ann in announcements_db:
            if ann.get("id") not in user_msg_ids:
                new_sys_msg = dict(ann)
                new_sys_msg["is_read"] = False
                new_sys_msg["receiver"] = account
                user_msgs.append(new_sys_msg)
                injected = True
        
        if injected:
            # 重新按照时间倒序排列,让新公告置顶
            user_msgs.sort(key=lambda x: x.get("created_at", 0), reverse=True)
        # ----------------------------------------
        
        now = int(time.time())
        seven_days = 7 * 24 * 3600
        valid = []
        
        # 如果注入了新公告,则判定为需要回写数据库保存
        modified = injected
        
        for m in user_msgs:
            if not m.get("is_read") or (now - m.get("created_at", 0) < seven_days):
                valid.append(m)
            else:
                modified = True
            
            # 本次访问即为已读 - 将所有未读消息标记为已读
            if not m.get("is_read"):
                m["is_read"] = True
                modified = True
        # 原地修改 data,atomic_update 会自动保存
        data[account] = valid
        
        # 通过闭包返回结果
        result_container[0] = {
            "status": "success",
            "data": valid,
            "unread_count": unread_before_mark  # 🔥 修复:返回标记已读前的真实未读数
        }
    
    db.atomic_update("messages.json", updater, default_data={})
    
    return result_container[0]

@router.post("/api/messages/{account}/read")
async def mark_messages_read(account: str):
    """

    标记消息为已读(原子操作,并发安全)

    """
    def updater(data):
        user_msgs = data.get(account, [])
        modified = False
        for m in user_msgs:
            if not m.get("is_read"):
                m["is_read"] = True
                modified = True
        
        # 原地修改 data,atomic_update 会自动保存
        if modified:
            data[account] = user_msgs
    
    db.atomic_update("messages.json", updater, default_data={})
    
    return {"status": "success"}