ZHIWEI666 commited on
Commit
6f73fb4
·
verified ·
1 Parent(s): 6525f57

Upload 3 files

Browse files
Files changed (3) hide show
  1. app.py +10 -25
  2. router_wallet.py +168 -215
  3. 数据库连接.py +30 -36
app.py CHANGED
@@ -8,9 +8,8 @@ from huggingface_hub import hf_hub_download, HfApi
8
  import hashlib
9
  import urllib.parse
10
  import urllib.request
11
- import urllib.error # 用于捕获 HTTPError 以判断私有库状态
12
  import os
13
- import shutil
14
  import 数据库连接 as db
15
 
16
  from router_users import router as users_router
@@ -18,7 +17,7 @@ from router_items import router as items_router
18
  from router_comments import router as comments_router
19
  from router_messages import router as messages_router
20
  from router_wallet import router as wallet_router
21
- from router_proxy import router as proxy_router # 导入代理路由
22
 
23
  from database_sql import init_sql_db, get_db
24
  from models_sql import Ownership
@@ -30,15 +29,15 @@ def on_startup():
30
  init_sql_db()
31
  print("关系型数据库加载完毕,金融表同步完成。")
32
 
 
33
  app.add_middleware(
34
  CORSMiddleware,
35
  allow_origins=["*"],
36
- allow_credentials=True,
37
  allow_methods=["*"],
38
  allow_headers=["*"],
39
  )
40
 
41
- # 挂载各个业务域路由
42
  app.include_router(users_router)
43
  app.include_router(items_router)
44
  app.include_router(comments_router)
@@ -46,17 +45,14 @@ app.include_router(messages_router)
46
  app.include_router(wallet_router)
47
  app.include_router(proxy_router)
48
 
49
- # ==========================================
50
- # 核心上传接口 (修复:使用 HF Datasets 永久存储多媒体)
51
- # ==========================================
52
  @app.post("/api/upload")
53
- async def upload_file(file: UploadFile = File(...), file_type: str = Form(...)):
54
- # 1. 拦截超大文件 (限制为 10MB)
55
- content = await file.read()
56
  if len(content) > 10 * 1024 * 1024:
57
  raise HTTPException(status_code=400, detail="文件过大,请限制在 10MB 以内")
58
 
59
- # 2. 生成防木马的安全文件名 (利用 MD5)
60
  ext = file.filename.split(".")[-1].lower()
61
  if ext not in ["jpg", "jpeg", "png", "gif", "webp", "json", "mp4"]:
62
  raise HTTPException(status_code=400, detail="不支持的文件格式")
@@ -64,18 +60,15 @@ async def upload_file(file: UploadFile = File(...), file_type: str = Form(...)):
64
  file_hash = hashlib.md5(content).hexdigest()[:10]
65
  safe_filename = f"{file_type}_{file_hash}.{ext}"
66
 
67
- # 3. 临时存放在 Spaces 容器本地
68
  local_tmp_path = f"/tmp/{safe_filename}"
69
  with open(local_tmp_path, "wb") as f:
70
  f.write(content)
71
 
72
- # 4. 🚀 核心修复:直接将文件上传到 Hugging Face Dataset (永久图床/存储)
73
  hf_token = os.environ.get("HF_TOKEN")
74
  dataset_repo_id = "ZHIWEI666/ComfyUI-Ranking"
75
 
76
  try:
77
  api = HfApi()
78
- # 将文件上传到 Dataset 仓库的 uploads/{file_type} 文件夹下
79
  api.upload_file(
80
  path_or_fileobj=local_tmp_path,
81
  path_in_repo=f"uploads/{file_type}/{safe_filename}",
@@ -87,26 +80,21 @@ async def upload_file(file: UploadFile = File(...), file_type: str = Form(...)):
87
  except Exception as e:
88
  raise HTTPException(status_code=500, detail=f"图床同步失败: {str(e)}")
89
  finally:
90
- # 清理容器的临时文件,防止爆内存
91
  if os.path.exists(local_tmp_path):
92
  os.remove(local_tmp_path)
93
 
94
- # 5. 返回 Dataset 的永久直链 (利用 HF 的底层 Raw 链接,永不失效且自带 CDN)
95
  permanent_url = f"https://huggingface.co/datasets/{dataset_repo_id}/resolve/main/uploads/{file_type}/{safe_filename}"
96
-
97
  return {"status": "success", "url": permanent_url}
98
 
99
 
100
- # ==========================================
101
- # 资源验证与所有权拦截接口 (原逻辑无损保留)
102
- # ==========================================
103
  class ValidateResourceRequest(BaseModel):
104
  url: str
105
  item_id: str
106
  account: str
107
 
 
108
  @app.post("/api/validate_resource")
109
- async def validate_resource(req_data: ValidateResourceRequest, sql_db: Session = Depends(get_db)):
110
  target_url = req_data.url
111
  if not target_url.startswith("https://huggingface.co/datasets/") and not target_url.startswith("https://github.com/"):
112
  return JSONResponse(content={"error": "无效的下载链接"}, status_code=400)
@@ -119,7 +107,6 @@ async def validate_resource(req_data: ValidateResourceRequest, sql_db: Session =
119
  price = int(item.get("price", 0))
120
  author = item.get("author")
121
 
122
- # 【拦截逻辑】:若不是作者本���,必须去 SQL 库校验所有权
123
  if price > 0 and req_data.account != author:
124
  owned = sql_db.query(Ownership).filter(Ownership.account == req_data.account, Ownership.item_id == req_data.item_id).first()
125
  if not owned:
@@ -129,7 +116,6 @@ async def validate_resource(req_data: ValidateResourceRequest, sql_db: Session =
129
  if not hf_token: return JSONResponse(content={"error": "云端环境变量未配置 HF_TOKEN"}, status_code=401)
130
 
131
  try:
132
- # 情况 1:如果是 GitHub 私有库,探测死链
133
  if target_url.startswith("https://github.com/"):
134
  creator_token = item.get("github_token")
135
  fallback_token = os.environ.get("GITHUB_PAT")
@@ -150,7 +136,6 @@ async def validate_resource(req_data: ValidateResourceRequest, sql_db: Session =
150
  return JSONResponse(content={"error": "资源仓库不可访问,可能已被作者删除或设为私有"}, status_code=404)
151
  return {"status": "success", "message": "资源有效"}
152
 
153
- # 情况 2:如果是 Hugging Face 文件 (如 JSON 工作流),校验云端文件是否存在
154
  elif target_url.startswith("https://huggingface.co/datasets/"):
155
  repo_path_encoded = target_url.split("resolve/main/")[-1]
156
  repo_path = urllib.parse.unquote(repo_path_encoded)
 
8
  import hashlib
9
  import urllib.parse
10
  import urllib.request
11
+ import urllib.error
12
  import os
 
13
  import 数据库连接 as db
14
 
15
  from router_users import router as users_router
 
17
  from router_comments import router as comments_router
18
  from router_messages import router as messages_router
19
  from router_wallet import router as wallet_router
20
+ from router_proxy import router as proxy_router
21
 
22
  from database_sql import init_sql_db, get_db
23
  from models_sql import Ownership
 
29
  init_sql_db()
30
  print("关系型数据库加载完毕,金融表同步完成。")
31
 
32
+ # 🟢 致命 BUG 修复:allow_credentials 必须为 False 才能配合 origins=["*"]
33
  app.add_middleware(
34
  CORSMiddleware,
35
  allow_origins=["*"],
36
+ allow_credentials=False,
37
  allow_methods=["*"],
38
  allow_headers=["*"],
39
  )
40
 
 
41
  app.include_router(users_router)
42
  app.include_router(items_router)
43
  app.include_router(comments_router)
 
45
  app.include_router(wallet_router)
46
  app.include_router(proxy_router)
47
 
48
+ # 🟢 性能隐患修复:去掉 async def,改为 def。让 FastAPI 自动分配底层线程,防止上传大文件时全站卡死!
 
 
49
  @app.post("/api/upload")
50
+ def upload_file(file: UploadFile = File(...), file_type: str = Form(...)):
51
+ # 同步读取文件内容
52
+ content = file.file.read()
53
  if len(content) > 10 * 1024 * 1024:
54
  raise HTTPException(status_code=400, detail="文件过大,请限制在 10MB 以内")
55
 
 
56
  ext = file.filename.split(".")[-1].lower()
57
  if ext not in ["jpg", "jpeg", "png", "gif", "webp", "json", "mp4"]:
58
  raise HTTPException(status_code=400, detail="不支持的文件格式")
 
60
  file_hash = hashlib.md5(content).hexdigest()[:10]
61
  safe_filename = f"{file_type}_{file_hash}.{ext}"
62
 
 
63
  local_tmp_path = f"/tmp/{safe_filename}"
64
  with open(local_tmp_path, "wb") as f:
65
  f.write(content)
66
 
 
67
  hf_token = os.environ.get("HF_TOKEN")
68
  dataset_repo_id = "ZHIWEI666/ComfyUI-Ranking"
69
 
70
  try:
71
  api = HfApi()
 
72
  api.upload_file(
73
  path_or_fileobj=local_tmp_path,
74
  path_in_repo=f"uploads/{file_type}/{safe_filename}",
 
80
  except Exception as e:
81
  raise HTTPException(status_code=500, detail=f"图床同步失败: {str(e)}")
82
  finally:
 
83
  if os.path.exists(local_tmp_path):
84
  os.remove(local_tmp_path)
85
 
 
86
  permanent_url = f"https://huggingface.co/datasets/{dataset_repo_id}/resolve/main/uploads/{file_type}/{safe_filename}"
 
87
  return {"status": "success", "url": permanent_url}
88
 
89
 
 
 
 
90
  class ValidateResourceRequest(BaseModel):
91
  url: str
92
  item_id: str
93
  account: str
94
 
95
+ # 🟢 性能隐患修复:去掉 async def,防止 urllib.request 阻塞事件循环
96
  @app.post("/api/validate_resource")
97
+ def validate_resource(req_data: ValidateResourceRequest, sql_db: Session = Depends(get_db)):
98
  target_url = req_data.url
99
  if not target_url.startswith("https://huggingface.co/datasets/") and not target_url.startswith("https://github.com/"):
100
  return JSONResponse(content={"error": "无效的下载链接"}, status_code=400)
 
107
  price = int(item.get("price", 0))
108
  author = item.get("author")
109
 
 
110
  if price > 0 and req_data.account != author:
111
  owned = sql_db.query(Ownership).filter(Ownership.account == req_data.account, Ownership.item_id == req_data.item_id).first()
112
  if not owned:
 
116
  if not hf_token: return JSONResponse(content={"error": "云端环境变量未配置 HF_TOKEN"}, status_code=401)
117
 
118
  try:
 
119
  if target_url.startswith("https://github.com/"):
120
  creator_token = item.get("github_token")
121
  fallback_token = os.environ.get("GITHUB_PAT")
 
136
  return JSONResponse(content={"error": "资源仓库不可访问,可能已被作者删除或设为私有"}, status_code=404)
137
  return {"status": "success", "message": "资源有效"}
138
 
 
139
  elif target_url.startswith("https://huggingface.co/datasets/"):
140
  repo_path_encoded = target_url.split("resolve/main/")[-1]
141
  repo_path = urllib.parse.unquote(repo_path_encoded)
router_wallet.py CHANGED
@@ -1,6 +1,6 @@
1
  # router_wallet.py
2
  from fastapi import APIRouter, Depends, HTTPException, Request
3
- from fastapi.responses import Response # 【必须导入的模块,否则支付宝回调报 500】
4
  from sqlalchemy.orm import Session
5
  import time
6
  import uuid
@@ -10,11 +10,9 @@ from database_sql import get_db
10
  from models_sql import Wallet, Transaction, Ownership
11
  from models import RechargeRequest, WithdrawRequest, PurchaseRequest, TipRequest
12
  import 数据库连接 as json_db
13
- from router_users import VERIFY_CODES
14
 
15
  router = APIRouter()
16
 
17
- # 支付宝初始化防崩溃包装
18
  try:
19
  from alipay import AliPay
20
  from alipay.utils import AliPayConfig
@@ -32,232 +30,44 @@ except Exception as e:
32
 
33
  def calculate_tx_hash(tx_id, account, tx_type, amount, prev_hash):
34
  data = f"{tx_id}{account}{tx_type}{amount}{prev_hash}"
35
- return hashlib.sha256(data.encode('utf-8')).hexdigest()
36
 
37
- def record_transaction(db: Session, account: str, tx_type: str, amount: int, related_account: str = None, item_id: str = None):
38
- last_tx = db.query(Transaction).filter(Transaction.account == account).order_by(Transaction.created_at.desc()).first()
39
- prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH"
 
40
 
41
- tx_id = f"tx_{int(time.time())}_{uuid.uuid4().hex[:8]}"
42
- tx_hash = calculate_tx_hash(tx_id, account, tx_type, amount, prev_hash)
43
 
44
- new_tx = Transaction(
45
- tx_id=tx_id, account=account, tx_type=tx_type, amount=amount,
46
- related_account=related_account, item_id=item_id,
47
- prev_hash=prev_hash, tx_hash=tx_hash
48
  )
49
- db.add(new_tx)
50
- return new_tx
51
-
52
- @router.get("/api/wallet/{account}")
53
- async def get_wallet(account: str, db: Session = Depends(get_db)):
54
- wallet = db.query(Wallet).filter(Wallet.account == account).first()
55
- if not wallet:
56
- return {"balance": 0, "earn_balance": 0, "tip_balance": 0, "frozen_balance": 0}
57
-
58
- # 兼容旧数据库结构,防止属性不存在时崩溃
59
- return {
60
- "balance": wallet.balance,
61
- "earn_balance": getattr(wallet, 'earn_balance', 0),
62
- "tip_balance": getattr(wallet, 'tip_balance', 0),
63
- "frozen_balance": getattr(wallet, 'frozen_balance', 0)
64
- }
65
-
66
- @router.post("/api/wallet/tip")
67
- async def tip_user(req: TipRequest, db: Session = Depends(get_db)):
68
- if req.sender_account == req.target_account:
69
- raise HTTPException(status_code=400, detail="不能给自己打赏")
70
-
71
- sender_wallet = db.query(Wallet).filter(Wallet.account == req.sender_account).with_for_update().first()
72
- if not sender_wallet or sender_wallet.balance < req.amount:
73
- raise HTTPException(status_code=400, detail="积分余额不足,请先充值")
74
-
75
- # 扣除发送者余额
76
- sender_wallet.balance -= req.amount
77
- record_transaction(db, req.sender_account, "TIP_SEND", -req.amount, req.target_account)
78
-
79
- target_wallet = db.query(Wallet).filter(Wallet.account == req.target_account).with_for_update().first()
80
- if not target_wallet:
81
- target_wallet = Wallet(account=req.target_account)
82
- db.add(target_wallet)
83
-
84
- # 【打赏双轨账目分离核心】:金额只进入 tip_balance 账目
85
- if hasattr(target_wallet, 'tip_balance'):
86
- target_wallet.tip_balance += req.amount
87
- else:
88
- target_wallet.earn_balance += req.amount
89
-
90
- record_transaction(db, req.target_account, "TIP_RECEIVE", req.amount, req.sender_account)
91
- db.commit()
92
-
93
- # 触发云端通知
94
- try:
95
- from notifications import add_notification
96
- add_notification(req.target_account, {
97
- "type": "tip",
98
- "from_user": req.sender_account if not req.is_anonymous else "anonymous",
99
- "content": f"赞助了您 {req.amount} 积分!"
100
- })
101
- except Exception:
102
- pass
103
-
104
- return {"status": "success", "balance": sender_wallet.balance}
105
-
106
- @router.post("/api/wallet/withdraw")
107
- async def submit_withdraw(req: WithdrawRequest, db: Session = Depends(get_db)):
108
- # 1. 验证码校验
109
- verify_data = VERIFY_CODES.get(req.account)
110
- if not verify_data or verify_data["code"] != req.code or verify_data["action"] != "withdraw":
111
- raise HTTPException(status_code=400, detail="验证码无效或已过期")
112
-
113
- # 2. 锁定钱包
114
- wallet = db.query(Wallet).filter(Wallet.account == req.account).with_for_update().first()
115
- if not wallet:
116
- raise HTTPException(status_code=404, detail="钱包不存在")
117
-
118
- # 3. 合并双轨账目计算可提现总额度
119
- earn = getattr(wallet, 'earn_balance', 0)
120
- tip = getattr(wallet, 'tip_balance', 0)
121
- max_withdraw = earn + tip
122
 
123
- if max_withdraw < req.amount:
124
- raise HTTPException(status_code=400, detail=f"提现金额超出可用收益,当前最多可提现 {max_withdraw}")
125
-
126
- # 4. 优先扣除销售收益,不够的再扣打赏收益
127
- amount_to_deduct = req.amount
128
- if wallet.earn_balance >= amount_to_deduct:
129
- wallet.earn_balance -= amount_to_deduct
130
- else:
131
- remaining = amount_to_deduct - wallet.earn_balance
132
- wallet.earn_balance = 0
133
- if hasattr(wallet, 'tip_balance'):
134
- wallet.tip_balance -= remaining
135
-
136
- wallet.frozen_balance += req.amount
137
- record_transaction(db, req.account, "WITHDRAW_APPLY", -req.amount)
138
-
139
- db.commit()
140
- return {"status": "success"}
141
-
142
- @router.post("/api/wallet/purchase")
143
- async def purchase_item(req: PurchaseRequest, db: Session = Depends(get_db)):
144
- items_db = json_db.load_data("items.json", default_data=[])
145
- item = next((i for i in items_db if i["id"] == req.item_id), None)
146
- if not item: raise HTTPException(status_code=404, detail="商品不存在")
147
-
148
- price = int(item.get("price", 0))
149
- author = item.get("author")
150
-
151
- # 作者或已购买的免单判断
152
- if req.account == author: return {"status": "success", "already_owned": True}
153
 
154
- owned = db.query(Ownership).filter(Ownership.account == req.account, Ownership.item_id == req.item_id).first()
155
- if owned: return {"status": "success", "already_owned": True}
156
-
157
- if price > 0:
158
- buyer_wallet = db.query(Wallet).filter(Wallet.account == req.account).with_for_update().first()
159
- if not buyer_wallet or buyer_wallet.balance < price:
160
- raise HTTPException(status_code=400, detail="余额不足")
161
-
162
- buyer_wallet.balance -= price
163
- record_transaction(db, req.account, "PURCHASE", -price, related_account=author, item_id=req.item_id)
164
-
165
- author_wallet = db.query(Wallet).filter(Wallet.account == author).with_for_update().first()
166
- if not author_wallet:
167
- author_wallet = Wallet(account=author)
168
- db.add(author_wallet)
169
-
170
- # 销售金额进入 earn_balance 账目
171
- author_wallet.earn_balance += price
172
- record_transaction(db, author, "EARN", price, related_account=req.account, item_id=req.item_id)
173
-
174
- new_ownership = Ownership(account=req.account, item_id=req.item_id)
175
- db.add(new_ownership)
176
- db.commit()
177
- return {"status": "success"}
178
-
179
- # =============== 以下为真实的支付宝充值与回调逻辑 ===============
180
-
181
- @router.post("/api/wallet/create_recharge_order")
182
- async def create_recharge_order(req: RechargeRequest, db: Session = Depends(get_db)):
183
- if not alipay:
184
- raise HTTPException(status_code=500, detail="支付宝支付组件未配置或秘钥加载失败")
185
-
186
- # 生成带时间戳和随机哈希的全局唯一订单号
187
- order_id = f"R_{int(time.time())}_{uuid.uuid4().hex[:6]}"
188
-
189
- try:
190
- # 调用支付宝当面付接口生成二维码链接 (1积分 = 1元)
191
- result = alipay.api_alipay_trade_precreate(
192
- subject=f"ComfyUI社区积分充值 - {req.account}",
193
- out_trade_no=order_id,
194
- total_amount=str(req.amount),
195
- )
196
-
197
- if result.get("code") == "10000":
198
- return {"status": "success", "order_id": order_id, "qr_code_url": result.get("qr_code")}
199
- else:
200
- raise HTTPException(status_code=400, detail=result.get("msg", "创建支付宝订单失败"))
201
- except Exception as e:
202
- raise HTTPException(status_code=500, detail=f"支付网关异常: {str(e)}")
203
-
204
- @router.get("/api/wallet/check_order/{order_id}")
205
- async def check_order(order_id: str, db: Session = Depends(get_db)):
206
- """前端轮询查单接口:主动向支付宝查询订单状态,防止异步回调延迟或丢失"""
207
- if not alipay:
208
- return {"status": "PENDING"}
209
-
210
- try:
211
- # 主动向支付宝发起查单请求
212
- result = alipay.api_alipay_trade_query(out_trade_no=order_id)
213
- if result.get("code") == "10000" and result.get("trade_status") == "TRADE_SUCCESS":
214
-
215
- # 查单发现已支付,检查数据库防重复加钱
216
- existing_tx = db.query(Transaction).filter(Transaction.tx_id == order_id).first()
217
- if not existing_tx:
218
- # 提取金额并加锁更新账户
219
- amount = int(float(result.get("total_amount", 0)))
220
- account = result.get("subject", "").split(" - ")[-1]
221
-
222
- wallet = db.query(Wallet).filter(Wallet.account == account).with_for_update().first()
223
- if not wallet:
224
- wallet = Wallet(account=account)
225
- db.add(wallet)
226
-
227
- wallet.balance += amount
228
-
229
- # 手动记录一条流水,tx_id 强制绑定为支付宝的 order_id 防止重复处理
230
- last_tx = db.query(Transaction).filter(Transaction.account == account).order_by(Transaction.created_at.desc()).first()
231
- prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH"
232
- tx_hash = calculate_tx_hash(order_id, account, "RECHARGE", amount, prev_hash)
233
-
234
- new_tx = Transaction(
235
- tx_id=order_id, account=account, tx_type="RECHARGE", amount=amount,
236
- prev_hash=prev_hash, tx_hash=tx_hash
237
- )
238
- db.add(new_tx)
239
- db.commit()
240
-
241
- return {"status": "SUCCESS"}
242
- except Exception:
243
- pass
244
-
245
- return {"status": "PENDING"}
246
 
 
247
  @router.post("/api/wallet/alipay_notify")
248
  async def alipay_notify(request: Request, db: Session = Depends(get_db)):
249
- """支付宝异步回调 Webhook (最高优先级的到账通知)"""
250
- data = dict(await request.form())
 
 
251
  signature = data.pop("sign", None)
 
252
 
253
- # 核心风控:验证签名是否真的是支付宝发来的
254
- if not alipay or not alipay.verify(data, signature):
255
  return Response(content="fail", media_type="text/plain")
256
 
257
  if data.get("trade_status") in ("TRADE_SUCCESS", "TRADE_FINISHED"):
258
  order_id = data.get("out_trade_no")
259
 
260
- # 检查是否已经被轮询查单处理过了
261
  existing_tx = db.query(Transaction).filter(Transaction.tx_id == order_id).first()
262
  if not existing_tx:
263
  amount = int(float(data.get("total_amount", 0)))
@@ -281,5 +91,148 @@ async def alipay_notify(request: Request, db: Session = Depends(get_db)):
281
  db.add(new_tx)
282
  db.commit()
283
 
284
- # 必须向支付宝返回 success,否则支付宝会按阶梯间隔持续重发请求
285
- return Response(content="success", media_type="text/plain")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  # router_wallet.py
2
  from fastapi import APIRouter, Depends, HTTPException, Request
3
+ from fastapi.responses import Response
4
  from sqlalchemy.orm import Session
5
  import time
6
  import uuid
 
10
  from models_sql import Wallet, Transaction, Ownership
11
  from models import RechargeRequest, WithdrawRequest, PurchaseRequest, TipRequest
12
  import 数据库连接 as json_db
 
13
 
14
  router = APIRouter()
15
 
 
16
  try:
17
  from alipay import AliPay
18
  from alipay.utils import AliPayConfig
 
30
 
31
  def calculate_tx_hash(tx_id, account, tx_type, amount, prev_hash):
32
  data = f"{tx_id}{account}{tx_type}{amount}{prev_hash}"
33
+ return hashlib.sha256(data.encode()).hexdigest()
34
 
35
+ @router.post("/api/wallet/create_recharge_order")
36
+ async def create_recharge_order(req: RechargeRequest):
37
+ if not alipay:
38
+ raise HTTPException(status_code=500, detail="支付网关未配置或初始化失败")
39
 
40
+ order_id = f"PAY_{int(time.time())}_{uuid.uuid4().hex[:6]}"
41
+ subject = f"ComfyUI Community Points - {req.account}"
42
 
43
+ order_string = alipay.api_alipay_trade_precreate(
44
+ out_trade_no=order_id,
45
+ total_amount=str(req.amount),
46
+ subject=subject
47
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
 
49
+ qr_code_url = order_string.get("qr_code")
50
+ if not qr_code_url:
51
+ raise HTTPException(status_code=500, detail="生成支付二维码失败")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52
 
53
+ return {"status": "success", "order_id": order_id, "qr_code": qr_code_url}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54
 
55
+ # 🟢 业务流转细节修复:正确解析 application/x-www-form-urlencoded
56
  @router.post("/api/wallet/alipay_notify")
57
  async def alipay_notify(request: Request, db: Session = Depends(get_db)):
58
+ # 强制将表单数据解析为纯字典,防止由于数据类型错误导致验签失败
59
+ form_data = await request.form()
60
+ data = dict(form_data.items())
61
+
62
  signature = data.pop("sign", None)
63
+ data.pop("sign_type", None)
64
 
65
+ if not alipay or not signature or not alipay.verify(data, signature):
 
66
  return Response(content="fail", media_type="text/plain")
67
 
68
  if data.get("trade_status") in ("TRADE_SUCCESS", "TRADE_FINISHED"):
69
  order_id = data.get("out_trade_no")
70
 
 
71
  existing_tx = db.query(Transaction).filter(Transaction.tx_id == order_id).first()
72
  if not existing_tx:
73
  amount = int(float(data.get("total_amount", 0)))
 
91
  db.add(new_tx)
92
  db.commit()
93
 
94
+ return Response(content="success", media_type="text/plain")
95
+
96
+ @router.get("/api/wallet/check_order/{order_id}")
97
+ async def check_order(order_id: str, db: Session = Depends(get_db)):
98
+ tx = db.query(Transaction).filter(Transaction.tx_id == order_id).first()
99
+ if tx:
100
+ return {"status": "SUCCESS"}
101
+ return {"status": "PENDING"}
102
+
103
+ @router.get("/api/wallet/{account}")
104
+ async def get_wallet(account: str, db: Session = Depends(get_db)):
105
+ wallet = db.query(Wallet).filter(Wallet.account == account).first()
106
+ if not wallet:
107
+ return {"balance": 0, "earn_balance": 0, "tip_balance": 0}
108
+ return {
109
+ "balance": wallet.balance,
110
+ "earn_balance": wallet.earn_balance,
111
+ "tip_balance": wallet.tip_balance
112
+ }
113
+
114
+ @router.post("/api/wallet/purchase")
115
+ async def purchase_item(req: PurchaseRequest, db: Session = Depends(get_db)):
116
+ items_db = json_db.load_data("items.json", default_data=[])
117
+ item = next((i for i in items_db if i["id"] == req.item_id), None)
118
+
119
+ if not item:
120
+ raise HTTPException(status_code=404, detail="商品不存在")
121
+
122
+ price = int(item.get("price", 0))
123
+ seller_account = item.get("author")
124
+
125
+ if price <= 0 or req.account == seller_account:
126
+ return {"status": "success", "already_owned": True}
127
+
128
+ owned = db.query(Ownership).filter(Ownership.account == req.account, Ownership.item_id == req.item_id).first()
129
+ if owned:
130
+ return {"status": "success", "already_owned": True}
131
+
132
+ buyer_wallet = db.query(Wallet).filter(Wallet.account == req.account).with_for_update().first()
133
+ if not buyer_wallet or buyer_wallet.balance < price:
134
+ raise HTTPException(status_code=402, detail="余额不足,请先充值")
135
+
136
+ seller_wallet = db.query(Wallet).filter(Wallet.account == seller_account).with_for_update().first()
137
+ if not seller_wallet:
138
+ seller_wallet = Wallet(account=seller_account)
139
+ db.add(seller_wallet)
140
+
141
+ buyer_wallet.balance -= price
142
+ seller_wallet.earn_balance += price
143
+
144
+ new_ownership = Ownership(account=req.account, item_id=req.item_id)
145
+ db.add(new_ownership)
146
+
147
+ tx_id = f"BUY_{int(time.time())}_{uuid.uuid4().hex[:6]}"
148
+ last_tx = db.query(Transaction).filter(Transaction.account == req.account).order_by(Transaction.created_at.desc()).first()
149
+ prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH"
150
+ tx_hash = calculate_tx_hash(tx_id, req.account, "PURCHASE", -price, prev_hash)
151
+
152
+ new_tx = Transaction(
153
+ tx_id=tx_id, account=req.account, tx_type="PURCHASE", amount=-price,
154
+ target_account=seller_account, prev_hash=prev_hash, tx_hash=tx_hash
155
+ )
156
+ db.add(new_tx)
157
+ db.commit()
158
+
159
+ return {"status": "success", "already_owned": False}
160
+
161
+ @router.post("/api/wallet/tip")
162
+ async def tip_user(req: TipRequest, db: Session = Depends(get_db)):
163
+ if req.amount <= 0:
164
+ raise HTTPException(status_code=400, detail="打赏金额必须大于0")
165
+
166
+ sender_wallet = db.query(Wallet).filter(Wallet.account == req.sender_account).with_for_update().first()
167
+ if not sender_wallet or sender_wallet.balance < req.amount:
168
+ raise HTTPException(status_code=402, detail="余额不足")
169
+
170
+ target_wallet = db.query(Wallet).filter(Wallet.account == req.target_account).with_for_update().first()
171
+ if not target_wallet:
172
+ target_wallet = Wallet(account=req.target_account)
173
+ db.add(target_wallet)
174
+
175
+ sender_wallet.balance -= req.amount
176
+ target_wallet.tip_balance += req.amount
177
+
178
+ tx_id = f"TIP_{int(time.time())}_{uuid.uuid4().hex[:6]}"
179
+ last_tx = db.query(Transaction).filter(Transaction.account == req.sender_account).order_by(Transaction.created_at.desc()).first()
180
+ prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH"
181
+ tx_hash = calculate_tx_hash(tx_id, req.sender_account, "TIP", -req.amount, prev_hash)
182
+
183
+ new_tx = Transaction(
184
+ tx_id=tx_id, account=req.sender_account, tx_type="TIP", amount=-req.amount,
185
+ target_account=req.target_account, prev_hash=prev_hash, tx_hash=tx_hash
186
+ )
187
+ db.add(new_tx)
188
+ db.commit()
189
+
190
+ from notifications import add_notification
191
+ display_sender = "匿名用户" if req.is_anonymous else req.sender_account
192
+ add_notification(req.target_account, {
193
+ "type": "tip",
194
+ "from_user": "system",
195
+ "target_item_title": "您的主页",
196
+ "content": f"🎉 {display_sender} 给您打赏了 {req.amount} 积分!"
197
+ })
198
+
199
+ return {"status": "success", "balance": sender_wallet.balance}
200
+
201
+ @router.post("/api/wallet/withdraw")
202
+ async def withdraw(req: WithdrawRequest, db: Session = Depends(get_db)):
203
+ key = f"{req.account}_withdraw"
204
+ code_data = VERIFY_CODES.get(key)
205
+ if not code_data or code_data["code"] != req.code or time.time() > code_data["expires"]:
206
+ raise HTTPException(status_code=400, detail="验证码无效或已过期")
207
+
208
+ wallet = db.query(Wallet).filter(Wallet.account == req.account).with_for_update().first()
209
+ if not wallet:
210
+ raise HTTPException(status_code=400, detail="钱包不存在")
211
+
212
+ total_withdrawable = wallet.earn_balance + wallet.tip_balance
213
+ if req.amount > total_withdrawable:
214
+ raise HTTPException(status_code=400, detail="可提现余额不足")
215
+
216
+ if req.amount <= wallet.earn_balance:
217
+ wallet.earn_balance -= req.amount
218
+ else:
219
+ remaining = req.amount - wallet.earn_balance
220
+ wallet.earn_balance = 0
221
+ wallet.tip_balance -= remaining
222
+
223
+ wallet.frozen_balance += req.amount
224
+
225
+ tx_id = f"WD_{int(time.time())}_{uuid.uuid4().hex[:6]}"
226
+ last_tx = db.query(Transaction).filter(Transaction.account == req.account).order_by(Transaction.created_at.desc()).first()
227
+ prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH"
228
+ tx_hash = calculate_tx_hash(tx_id, req.account, "WITHDRAW", -req.amount, prev_hash)
229
+
230
+ new_tx = Transaction(
231
+ tx_id=tx_id, account=req.account, tx_type="WITHDRAW", amount=-req.amount,
232
+ prev_hash=prev_hash, tx_hash=tx_hash
233
+ )
234
+ db.add(new_tx)
235
+ db.commit()
236
+
237
+ del VERIFY_CODES[key]
238
+ return {"status": "success"}
数据库连接.py CHANGED
@@ -2,6 +2,7 @@
2
  import os
3
  import json
4
  import threading
 
5
  from huggingface_hub import HfApi, hf_hub_download
6
 
7
  HF_TOKEN = os.environ.get("HF_TOKEN")
@@ -18,11 +19,9 @@ api = HfApi() if HF_TOKEN else None
18
  if not os.path.exists(LOCAL_DB_DIR):
19
  os.makedirs(LOCAL_DB_DIR)
20
 
21
- # 【核心优化 1】:引入全局读写锁,防止高并发下 JSON 数据覆写和丢失
22
  db_lock = threading.Lock()
23
 
24
  def load_data(file_name: str, default_data=None):
25
- """读取数据:引入线程锁,保证读取时不会读到写入一半的残缺数据"""
26
  if default_data is None:
27
  default_data = {} if file_name == "users.json" else []
28
 
@@ -32,17 +31,22 @@ def load_data(file_name: str, default_data=None):
32
  if not os.path.exists(local_path):
33
  if HF_TOKEN:
34
  try:
35
- file_path = hf_hub_download(repo_id=DATASET_REPO_ID, repo_type="dataset", filename=file_name, token=HF_TOKEN)
36
- with open(file_path, "r", encoding="utf-8") as f:
 
 
 
 
 
37
  data = json.load(f)
38
  with open(local_path, "w", encoding="utf-8") as f:
39
  json.dump(data, f, ensure_ascii=False, indent=2)
40
  return data
41
- except Exception:
 
42
  return default_data
43
- else:
44
- return default_data
45
-
46
  try:
47
  with open(local_path, "r", encoding="utf-8") as f:
48
  return json.load(f)
@@ -50,40 +54,30 @@ def load_data(file_name: str, default_data=None):
50
  print(f"解析 {file_name} 失败,启用默认数据。原因: {e}")
51
  return default_data
52
 
53
- def _background_upload_to_hf(local_path, file_name):
54
- """【核心优化 2】:后台独立线程执行 HF 推送,彻底解放 FastAPI 主线程性能"""
55
- try:
56
- api.upload_file(
57
- path_or_fileobj=local_path,
58
- path_in_repo=file_name,
59
- repo_id=DATASET_REPO_ID,
60
- repo_type="dataset",
61
- token=HF_TOKEN,
62
- commit_message=f"Auto-update {file_name}"
63
- )
64
- except Exception as e:
65
- print(f"后台同步到 HF Dataset 失败: {e}")
 
 
 
 
66
 
67
  def save_data(file_name: str, data):
68
- """保存数据:加锁写入本地,并触发异步后台同步"""
69
  local_path = os.path.join(LOCAL_DB_DIR, file_name)
70
 
71
  with db_lock:
72
  with open(local_path, "w", encoding="utf-8") as f:
73
  json.dump(data, f, ensure_ascii=False, indent=2)
74
 
75
- # 触发后台线程推送,接口毫秒级返回
76
- if HF_TOKEN:
77
- threading.Thread(target=_background_upload_to_hf, args=(local_path, file_name)).start()
78
-
79
- def save_file(file_path_in_repo: str, content: bytes):
80
- """保存二进制文件(图片/应用等)"""
81
- local_full_path = os.path.join(LOCAL_DB_DIR, file_path_in_repo)
82
- os.makedirs(os.path.dirname(local_full_path), exist_ok=True)
83
-
84
- with db_lock:
85
- with open(local_full_path, "wb") as f:
86
- f.write(content)
87
-
88
  if HF_TOKEN:
89
- threading.Thread(target=_background_upload_to_hf, args=(local_full_path, file_path_in_repo)).start()
 
2
  import os
3
  import json
4
  import threading
5
+ import time
6
  from huggingface_hub import HfApi, hf_hub_download
7
 
8
  HF_TOKEN = os.environ.get("HF_TOKEN")
 
19
  if not os.path.exists(LOCAL_DB_DIR):
20
  os.makedirs(LOCAL_DB_DIR)
21
 
 
22
  db_lock = threading.Lock()
23
 
24
  def load_data(file_name: str, default_data=None):
 
25
  if default_data is None:
26
  default_data = {} if file_name == "users.json" else []
27
 
 
31
  if not os.path.exists(local_path):
32
  if HF_TOKEN:
33
  try:
34
+ downloaded_path = hf_hub_download(
35
+ repo_id=DATASET_REPO_ID,
36
+ repo_type="dataset",
37
+ filename=file_name,
38
+ token=HF_TOKEN
39
+ )
40
+ with open(downloaded_path, "r", encoding="utf-8") as f:
41
  data = json.load(f)
42
  with open(local_path, "w", encoding="utf-8") as f:
43
  json.dump(data, f, ensure_ascii=False, indent=2)
44
  return data
45
+ except Exception as e:
46
+ print(f"从 HF 下载 {file_name} 失败: {e}")
47
  return default_data
48
+ return default_data
49
+
 
50
  try:
51
  with open(local_path, "r", encoding="utf-8") as f:
52
  return json.load(f)
 
54
  print(f"解析 {file_name} 失败,启用默认数据。原因: {e}")
55
  return default_data
56
 
57
+ # 🟢 隐形炸弹修复:增加错误重试机制,防止由于 Hugging Face 接口瞬间限流导致数据云端丢失
58
+ def _background_upload_to_hf(local_path, file_name, retries=3):
59
+ for attempt in range(retries):
60
+ try:
61
+ api.upload_file(
62
+ path_or_fileobj=local_path,
63
+ path_in_repo=file_name,
64
+ repo_id=DATASET_REPO_ID,
65
+ repo_type="dataset",
66
+ token=HF_TOKEN,
67
+ commit_message=f"Auto-update {file_name}"
68
+ )
69
+ return # 成功后立即退出线程
70
+ except Exception as e:
71
+ if attempt == retries - 1:
72
+ print(f"🚨 致命错误:重试 {retries} 次后,同步到 HF Dataset 依然失败: {e}")
73
+ time.sleep(2) # 失败后休眠 2 秒避开接口限流再试
74
 
75
  def save_data(file_name: str, data):
 
76
  local_path = os.path.join(LOCAL_DB_DIR, file_name)
77
 
78
  with db_lock:
79
  with open(local_path, "w", encoding="utf-8") as f:
80
  json.dump(data, f, ensure_ascii=False, indent=2)
81
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82
  if HF_TOKEN:
83
+ threading.Thread(target=_background_upload_to_hf, args=(local_path, file_name)).start()