ZHIWEI666 commited on
Commit
f92968c
·
verified ·
1 Parent(s): 74406c7

Upload 迁移_补全历史SALE记录.py

Browse files
Files changed (1) hide show
  1. 迁移_补全历史SALE记录.py +332 -0
迁移_补全历史SALE记录.py ADDED
@@ -0,0 +1,332 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ # -*- coding: utf-8 -*-
3
+ """
4
+ 迁移脚本:为历史购买记录补全卖家 SALE 交易记录
5
+ ====================================================
6
+ 背景:
7
+ 之前购买接口只为买家生成 PURCHASE 记录,未为卖家生成 SALE 记录。
8
+ 现已修复,新购买会同步生成双方记录。
9
+ 本脚本为历史缺失的 SALE 记录进行补全。
10
+
11
+ 迁移逻辑:
12
+ 1. 查询所有 tx_type="PURCHASE" 的 Transaction 记录
13
+ 2. 检查是否已存在对应 SALE 记录(幂等保障)
14
+ - 匹配条件:相同 item_id + 卖家account(related_account) + tx_type="SALE" + 金额相同
15
+ 3. 不存在则创建 SALE 记录,created_at 与 PURCHASE 保持一致
16
+
17
+ 用法:
18
+ python 迁移_补全历史SALE记录.py # 直接执行迁移(幂等设计,重复执行不产生重复记录)
19
+
20
+ 作者:数据迁移脚本
21
+ 创建日期:2026-05-20
22
+ """
23
+
24
+ import os
25
+ import sys
26
+ import time
27
+ import uuid
28
+ import argparse
29
+ from dataclasses import dataclass, field
30
+ from typing import List, Dict, Optional, Tuple
31
+
32
+ # 添加云端Space代码目录到路径
33
+ script_dir = os.path.dirname(os.path.abspath(__file__))
34
+ parent_dir = os.path.dirname(script_dir)
35
+ if parent_dir not in sys.path:
36
+ sys.path.insert(0, parent_dir)
37
+
38
+ from sqlalchemy import create_engine
39
+ from sqlalchemy.orm import sessionmaker, Session
40
+ from sqlalchemy.exc import SQLAlchemyError
41
+
42
+ from database_sql import SQLALCHEMY_DATABASE_URL
43
+ from models_sql import Transaction
44
+
45
+
46
+ # ==========================================
47
+ # 📊 统计数据结构
48
+ # ==========================================
49
+
50
+ @dataclass
51
+ class MigrationStats:
52
+ """迁移统计信息"""
53
+ total_purchase: int = 0 # PURCHASE 记录总数
54
+ already_exists: int = 0 # 已有对应 SALE 记录,跳过
55
+ skipped_no_seller: int = 0 # 无卖家账号(related_account 为空),跳过
56
+ created: int = 0 # 新创建 SALE 记录数
57
+ failed: int = 0 # 单条处理失败数
58
+ failed_tx_ids: List[str] = field(default_factory=list) # 失败的 PURCHASE tx_id
59
+
60
+ def __str__(self) -> str:
61
+ return f"""
62
+ ========== 迁移统计报告 ==========
63
+ PURCHASE 记录总数: {self.total_purchase}
64
+ 已有对应 SALE(跳过): {self.already_exists}
65
+ 无卖家账号(跳过): {self.skipped_no_seller}
66
+ 新创建 SALE 记录: {self.created}
67
+ 处理失败: {self.failed}
68
+ {f" 失败的 tx_id: {self.failed_tx_ids}" if self.failed_tx_ids else ""}
69
+ ==================================
70
+ """
71
+
72
+
73
+ # ==========================================
74
+ # 🔗 数据库连接
75
+ # ==========================================
76
+
77
+ def get_database_session() -> Session:
78
+ """创建数据库会话"""
79
+ engine = create_engine(SQLALCHEMY_DATABASE_URL)
80
+ SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
81
+ return SessionLocal()
82
+
83
+
84
+ # ==========================================
85
+ # 🔍 核心检测逻辑
86
+ # ==========================================
87
+
88
+ def sale_record_exists(session: Session, purchase: Transaction) -> bool:
89
+ """
90
+ 检查是否已存在对应的 SALE 记录(幂等校验)
91
+
92
+ 匹配条件:
93
+ - tx_type = "SALE"
94
+ - account = purchase.related_account(卖家)
95
+ - item_id = purchase.item_id
96
+ - amount = abs(purchase.amount)(PURCHASE 为负数,SALE 为正数)
97
+ """
98
+ seller_account = purchase.related_account
99
+ if not seller_account:
100
+ return False
101
+
102
+ sale_amount = abs(purchase.amount)
103
+
104
+ existing = session.query(Transaction).filter(
105
+ Transaction.tx_type == "SALE",
106
+ Transaction.account == seller_account,
107
+ Transaction.item_id == purchase.item_id,
108
+ Transaction.amount == sale_amount,
109
+ Transaction.related_account == purchase.account,
110
+ ).first()
111
+
112
+ return existing is not None
113
+
114
+
115
+ def build_sale_record(purchase: Transaction) -> Optional[Dict]:
116
+ """
117
+ 根据 PURCHASE 记录构造 SALE 记录的字段
118
+
119
+ 返回 None 表示无法构造(如无卖家账号)
120
+ """
121
+ seller_account = purchase.related_account
122
+ if not seller_account:
123
+ return None
124
+
125
+ item_title = purchase.item_title or purchase.item_id or ""
126
+ sale_amount = abs(purchase.amount)
127
+
128
+ # tx_id 格式:SALE_MIGRATE_{原PURCHASE的tx_id尾段}_{随机6位}
129
+ # 取 PURCHASE tx_id 的最后一段(通常为 BUY_{ts}_{hex6},取 hex6 部分)
130
+ purchase_suffix = purchase.tx_id.rsplit("_", 1)[-1] if "_" in purchase.tx_id else purchase.tx_id[-6:]
131
+ sale_tx_id = f"SALE_MIGRATE_{purchase_suffix}_{uuid.uuid4().hex[:6]}"
132
+
133
+ return {
134
+ "tx_id": sale_tx_id,
135
+ "account": seller_account,
136
+ "tx_type": "SALE",
137
+ "amount": sale_amount,
138
+ "related_account": purchase.account,
139
+ "item_id": purchase.item_id,
140
+ "item_title": purchase.item_title,
141
+ "item_type": purchase.item_type,
142
+ "related_user_name": purchase.related_user_name,
143
+ "description": f"销售收入(历史补全): {item_title}",
144
+ "created_at": purchase.created_at,
145
+ # 迁移记录无需防篡改哈希链,留空
146
+ "prev_hash": "MIGRATE_NO_HASH",
147
+ "tx_hash": "MIGRATE_NO_HASH",
148
+ }
149
+
150
+
151
+ # ==========================================
152
+ # 👁️ 预览模式
153
+ # ==========================================
154
+
155
+ def preview_migration(session: Session) -> Tuple[List[Dict], MigrationStats]:
156
+ """
157
+ 预览模式:扫描所有 PURCHASE 记录,返回待创建列表和统计
158
+
159
+ 不写入数据库。
160
+ """
161
+ print("\n========== 迁移预览模式 ==========")
162
+ print("⚠️ 当前为预览模式,不会实际修改数据")
163
+ print(" 添加 --execute 参数以实际执行迁移\n")
164
+
165
+ stats = MigrationStats()
166
+ to_create: List[Dict] = []
167
+
168
+ purchases = session.query(Transaction).filter(
169
+ Transaction.tx_type == "PURCHASE"
170
+ ).order_by(Transaction.created_at.asc()).all()
171
+
172
+ stats.total_purchase = len(purchases)
173
+ print(f"共发现 {stats.total_purchase} 条 PURCHASE 记录,开始扫描...\n")
174
+
175
+ for purchase in purchases:
176
+ # 无卖家
177
+ if not purchase.related_account:
178
+ stats.skipped_no_seller += 1
179
+ continue
180
+
181
+ # 已有 SALE
182
+ if sale_record_exists(session, purchase):
183
+ stats.already_exists += 1
184
+ continue
185
+
186
+ # 构造 SALE 记录
187
+ sale_data = build_sale_record(purchase)
188
+ if sale_data:
189
+ to_create.append(sale_data)
190
+ stats.created += 1
191
+
192
+ # 打印预览详情(最多前 20 条)
193
+ print(f"发现 {len(to_create)} 条需要补全的 SALE 记录:\n")
194
+ display_limit = 20
195
+ for i, sale in enumerate(to_create[:display_limit], 1):
196
+ print(f" {i:3d}. 卖家: {sale['account']}")
197
+ print(f" 买家: {sale['related_account']}")
198
+ print(f" 资源: {sale['item_id']} | {sale['item_title'] or '(无标题)'}")
199
+ print(f" 金额: +{sale['amount']} 分")
200
+ print(f" 时间: {sale['created_at']}")
201
+ print(f" 新 tx_id: {sale['tx_id']}\n")
202
+
203
+ if len(to_create) > display_limit:
204
+ print(f" ... 还有 {len(to_create) - display_limit} 条未显示\n")
205
+
206
+ # 预览模式下 created 字段表示"待创建"数量
207
+ print(stats)
208
+ return to_create, stats
209
+
210
+
211
+ # ==========================================
212
+ # ⚡ 执行模式
213
+ # ==========================================
214
+
215
+ BATCH_SIZE = 100 # 每批提交条数
216
+
217
+
218
+ def execute_migration(session: Session):
219
+ """
220
+ 执行实际迁移:
221
+ - 逐条处理,幂等检测
222
+ - 每 BATCH_SIZE 条 commit 一次
223
+ - 单条失败不影响其他记录
224
+ """
225
+ print("\n========== 开始执行迁移 ==========\n")
226
+
227
+ stats = MigrationStats()
228
+
229
+ purchases = session.query(Transaction).filter(
230
+ Transaction.tx_type == "PURCHASE"
231
+ ).order_by(Transaction.created_at.asc()).all()
232
+
233
+ stats.total_purchase = len(purchases)
234
+ print(f"共发现 {stats.total_purchase} 条 PURCHASE 记录,开始处理...\n")
235
+
236
+ batch_pending = 0 # 当前批次待提交数
237
+
238
+ for idx, purchase in enumerate(purchases, 1):
239
+ try:
240
+ # ---- 无卖家,跳过 ----
241
+ if not purchase.related_account:
242
+ stats.skipped_no_seller += 1
243
+ continue
244
+
245
+ # ---- 已有 SALE,跳过 ----
246
+ if sale_record_exists(session, purchase):
247
+ stats.already_exists += 1
248
+ continue
249
+
250
+ # ---- 构造并插入 SALE 记录 ----
251
+ sale_data = build_sale_record(purchase)
252
+ if not sale_data:
253
+ stats.skipped_no_seller += 1
254
+ continue
255
+
256
+ sale_tx = Transaction(**sale_data)
257
+ session.add(sale_tx)
258
+ stats.created += 1
259
+ batch_pending += 1
260
+
261
+ # ---- 每 BATCH_SIZE 条提交一次 ----
262
+ if batch_pending >= BATCH_SIZE:
263
+ session.commit()
264
+ print(f" [进度] 已处理 {idx}/{stats.total_purchase},"
265
+ f"本批次提交 {batch_pending} 条,"
266
+ f"累计创建 {stats.created} 条,"
267
+ f"跳过 {stats.already_exists + stats.skipped_no_seller} 条")
268
+ batch_pending = 0
269
+
270
+ except SQLAlchemyError as e:
271
+ # 单条失败:回滚当前批次,记录错误,继续下一条
272
+ session.rollback()
273
+ batch_pending = 0
274
+ stats.failed += 1
275
+ stats.failed_tx_ids.append(purchase.tx_id)
276
+ print(f" ⚠️ 处理 PURCHASE {purchase.tx_id} 失败: {type(e).__name__}: {e}")
277
+
278
+ except Exception as e:
279
+ session.rollback()
280
+ batch_pending = 0
281
+ stats.failed += 1
282
+ stats.failed_tx_ids.append(purchase.tx_id)
283
+ print(f" ⚠️ 处理 PURCHASE {purchase.tx_id} 异常: {type(e).__name__}: {e}")
284
+
285
+ # ---- 提交剩余记录 ----
286
+ if batch_pending > 0:
287
+ try:
288
+ session.commit()
289
+ print(f" [进度] 最后一批提交 {batch_pending} 条")
290
+ except SQLAlchemyError as e:
291
+ session.rollback()
292
+ stats.failed += batch_pending
293
+ stats.created -= batch_pending
294
+ print(f" 🚨 最后一批提交失败: {e}")
295
+
296
+ print("\n✅ 迁移执行完毕!")
297
+ print(stats)
298
+
299
+
300
+ # ==========================================
301
+ # 🚀 入口
302
+ # ==========================================
303
+
304
+ def main():
305
+ session = None
306
+ try:
307
+ print("正在连接数据库...")
308
+ session = get_database_session()
309
+ print("✅ 数据库连接成功\n")
310
+
311
+ # 直接执行迁移(脚本本身有幂等防重复设计)
312
+ execute_migration(session)
313
+
314
+ except KeyboardInterrupt:
315
+ print("\n⚠️ 用户中断,已退出")
316
+ if session:
317
+ session.rollback()
318
+ except Exception as e:
319
+ print(f"\n🚨 程序执行出错:")
320
+ print(f" 错误类型: {type(e).__name__}")
321
+ print(f" 错误信息: {str(e)}")
322
+ if session:
323
+ session.rollback()
324
+ sys.exit(1)
325
+ finally:
326
+ if session:
327
+ session.close()
328
+ print("\n数据库连接已关闭")
329
+
330
+
331
+ if __name__ == "__main__":
332
+ main()