ZHIWEI666 commited on
Commit
9e5055f
·
verified ·
1 Parent(s): 027a973

Upload 迁移_补全历史SALE记录.py

Browse files
Files changed (1) hide show
  1. 迁移_补全历史SALE记录.py +351 -0
迁移_补全历史SALE记录.py ADDED
@@ -0,0 +1,351 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ # -*- coding: utf-8 -*-
3
+ """
4
+ 迁移脚本:为历史购买记录补全卖家 SALE 交易记录
5
+ ====================================================
6
+ 背景:
7
+ 之前购买接口只为买家生成 PURCHASE 记录,未为卖家生成 SALE 记录。
8
+ 现已修复,新购买会同步生成双方记录。
9
+ 本脚本为历史缺失的 SALE 记录进行补全。
10
+
11
+ 迁移逻辑:
12
+ 1. 查询所有 tx_type="PURCHASE" 的 Transaction 记录
13
+ 2. 检查是否已存在对应 SALE 记录(幂等保障)
14
+ - 匹配条件:相同 item_id + 卖家account(related_account) + tx_type="SALE" + 金额相同
15
+ 3. 不存在则创建 SALE 记录,created_at 与 PURCHASE 保持一致
16
+
17
+ 用法:
18
+ python 迁移_补全历史SALE记录.py # 预览模式(dry-run,默认)
19
+ python 迁移_补全历史SALE记录.py --execute # 实际执行迁移
20
+
21
+ 作者:数据迁移脚本
22
+ 创建日期:2026-05-20
23
+ """
24
+
25
+ import os
26
+ import sys
27
+ import time
28
+ import uuid
29
+ import argparse
30
+ from dataclasses import dataclass, field
31
+ from typing import List, Dict, Optional, Tuple
32
+
33
+ # 添加云端Space代码目录到路径
34
+ script_dir = os.path.dirname(os.path.abspath(__file__))
35
+ parent_dir = os.path.dirname(script_dir)
36
+ if parent_dir not in sys.path:
37
+ sys.path.insert(0, parent_dir)
38
+
39
+ from sqlalchemy import create_engine
40
+ from sqlalchemy.orm import sessionmaker, Session
41
+ from sqlalchemy.exc import SQLAlchemyError
42
+
43
+ from database_sql import SQLALCHEMY_DATABASE_URL
44
+ from models_sql import Transaction
45
+
46
+
47
+ # ==========================================
48
+ # 📊 统计数据结构
49
+ # ==========================================
50
+
51
+ @dataclass
52
+ class MigrationStats:
53
+ """迁移统计信息"""
54
+ total_purchase: int = 0 # PURCHASE 记录总数
55
+ already_exists: int = 0 # 已有对应 SALE 记录,跳过
56
+ skipped_no_seller: int = 0 # 无卖家账号(related_account 为空),跳过
57
+ created: int = 0 # 新创建 SALE 记录数
58
+ failed: int = 0 # 单条处理失败数
59
+ failed_tx_ids: List[str] = field(default_factory=list) # 失败的 PURCHASE tx_id
60
+
61
+ def __str__(self) -> str:
62
+ return f"""
63
+ ========== 迁移统计报告 ==========
64
+ PURCHASE 记录总数: {self.total_purchase}
65
+ 已有对应 SALE(跳过): {self.already_exists}
66
+ 无卖家账号(跳过): {self.skipped_no_seller}
67
+ 新创建 SALE 记录: {self.created}
68
+ 处理失败: {self.failed}
69
+ {f" 失败的 tx_id: {self.failed_tx_ids}" if self.failed_tx_ids else ""}
70
+ ==================================
71
+ """
72
+
73
+
74
+ # ==========================================
75
+ # 🔗 数据库连接
76
+ # ==========================================
77
+
78
+ def get_database_session() -> Session:
79
+ """创建数据库会话"""
80
+ engine = create_engine(SQLALCHEMY_DATABASE_URL)
81
+ SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
82
+ return SessionLocal()
83
+
84
+
85
+ # ==========================================
86
+ # 🔍 核心检测逻辑
87
+ # ==========================================
88
+
89
+ def sale_record_exists(session: Session, purchase: Transaction) -> bool:
90
+ """
91
+ 检查是否已存在对应的 SALE 记录(幂等校验)
92
+
93
+ 匹配条件:
94
+ - tx_type = "SALE"
95
+ - account = purchase.related_account(卖家)
96
+ - item_id = purchase.item_id
97
+ - amount = abs(purchase.amount)(PURCHASE 为负数,SALE 为正数)
98
+ """
99
+ seller_account = purchase.related_account
100
+ if not seller_account:
101
+ return False
102
+
103
+ sale_amount = abs(purchase.amount)
104
+
105
+ existing = session.query(Transaction).filter(
106
+ Transaction.tx_type == "SALE",
107
+ Transaction.account == seller_account,
108
+ Transaction.item_id == purchase.item_id,
109
+ Transaction.amount == sale_amount,
110
+ Transaction.related_account == purchase.account,
111
+ ).first()
112
+
113
+ return existing is not None
114
+
115
+
116
+ def build_sale_record(purchase: Transaction) -> Optional[Dict]:
117
+ """
118
+ 根据 PURCHASE 记录构造 SALE 记录的字段
119
+
120
+ 返回 None 表示无法构造(如无卖家账号)
121
+ """
122
+ seller_account = purchase.related_account
123
+ if not seller_account:
124
+ return None
125
+
126
+ item_title = purchase.item_title or purchase.item_id or ""
127
+ sale_amount = abs(purchase.amount)
128
+
129
+ # tx_id 格式:SALE_MIGRATE_{原PURCHASE的tx_id尾段}_{随机6位}
130
+ # 取 PURCHASE tx_id 的最后一段(通常为 BUY_{ts}_{hex6},取 hex6 部分)
131
+ purchase_suffix = purchase.tx_id.rsplit("_", 1)[-1] if "_" in purchase.tx_id else purchase.tx_id[-6:]
132
+ sale_tx_id = f"SALE_MIGRATE_{purchase_suffix}_{uuid.uuid4().hex[:6]}"
133
+
134
+ return {
135
+ "tx_id": sale_tx_id,
136
+ "account": seller_account,
137
+ "tx_type": "SALE",
138
+ "amount": sale_amount,
139
+ "related_account": purchase.account,
140
+ "item_id": purchase.item_id,
141
+ "item_title": purchase.item_title,
142
+ "item_type": purchase.item_type,
143
+ "related_user_name": purchase.related_user_name,
144
+ "description": f"销售收入(历史补全): {item_title}",
145
+ "created_at": purchase.created_at,
146
+ # 迁移记录无需防篡改哈希链,留空
147
+ "prev_hash": "MIGRATE_NO_HASH",
148
+ "tx_hash": "MIGRATE_NO_HASH",
149
+ }
150
+
151
+
152
+ # ==========================================
153
+ # 👁️ 预览模式
154
+ # ==========================================
155
+
156
+ def preview_migration(session: Session) -> Tuple[List[Dict], MigrationStats]:
157
+ """
158
+ 预览模式:扫描所有 PURCHASE 记录,返回待创建列表和统计
159
+
160
+ 不写入数据库。
161
+ """
162
+ print("\n========== 迁移预览模式 ==========")
163
+ print("⚠️ 当前为预览模式,不会实际修改数据")
164
+ print(" 添加 --execute 参数以实际执行迁移\n")
165
+
166
+ stats = MigrationStats()
167
+ to_create: List[Dict] = []
168
+
169
+ purchases = session.query(Transaction).filter(
170
+ Transaction.tx_type == "PURCHASE"
171
+ ).order_by(Transaction.created_at.asc()).all()
172
+
173
+ stats.total_purchase = len(purchases)
174
+ print(f"共发现 {stats.total_purchase} 条 PURCHASE 记录,开始扫描...\n")
175
+
176
+ for purchase in purchases:
177
+ # 无卖家
178
+ if not purchase.related_account:
179
+ stats.skipped_no_seller += 1
180
+ continue
181
+
182
+ # 已有 SALE
183
+ if sale_record_exists(session, purchase):
184
+ stats.already_exists += 1
185
+ continue
186
+
187
+ # 构造 SALE 记录
188
+ sale_data = build_sale_record(purchase)
189
+ if sale_data:
190
+ to_create.append(sale_data)
191
+ stats.created += 1
192
+
193
+ # 打印预览详情(最多前 20 条)
194
+ print(f"发现 {len(to_create)} 条需要补全的 SALE 记录:\n")
195
+ display_limit = 20
196
+ for i, sale in enumerate(to_create[:display_limit], 1):
197
+ print(f" {i:3d}. 卖家: {sale['account']}")
198
+ print(f" 买家: {sale['related_account']}")
199
+ print(f" 资源: {sale['item_id']} | {sale['item_title'] or '(无标题)'}")
200
+ print(f" 金额: +{sale['amount']} 分")
201
+ print(f" 时间: {sale['created_at']}")
202
+ print(f" 新 tx_id: {sale['tx_id']}\n")
203
+
204
+ if len(to_create) > display_limit:
205
+ print(f" ... 还有 {len(to_create) - display_limit} 条未显示\n")
206
+
207
+ # 预览模式下 created 字段表示"待创建"数量
208
+ print(stats)
209
+ return to_create, stats
210
+
211
+
212
+ # ==========================================
213
+ # ⚡ 执行模式
214
+ # ==========================================
215
+
216
+ BATCH_SIZE = 100 # 每批提交条数
217
+
218
+
219
+ def execute_migration(session: Session):
220
+ """
221
+ 执行实际迁移:
222
+ - 逐条处理,幂等检测
223
+ - 每 BATCH_SIZE 条 commit 一次
224
+ - 单条失败不影响其他记录
225
+ """
226
+ print("\n========== 开始执行迁移 ==========\n")
227
+
228
+ stats = MigrationStats()
229
+
230
+ purchases = session.query(Transaction).filter(
231
+ Transaction.tx_type == "PURCHASE"
232
+ ).order_by(Transaction.created_at.asc()).all()
233
+
234
+ stats.total_purchase = len(purchases)
235
+ print(f"共发现 {stats.total_purchase} 条 PURCHASE 记录,开始处理...\n")
236
+
237
+ batch_pending = 0 # 当前批次待提交数
238
+
239
+ for idx, purchase in enumerate(purchases, 1):
240
+ try:
241
+ # ---- 无卖家,跳过 ----
242
+ if not purchase.related_account:
243
+ stats.skipped_no_seller += 1
244
+ continue
245
+
246
+ # ---- 已有 SALE,跳过 ----
247
+ if sale_record_exists(session, purchase):
248
+ stats.already_exists += 1
249
+ continue
250
+
251
+ # ---- 构造并插入 SALE 记录 ----
252
+ sale_data = build_sale_record(purchase)
253
+ if not sale_data:
254
+ stats.skipped_no_seller += 1
255
+ continue
256
+
257
+ sale_tx = Transaction(**sale_data)
258
+ session.add(sale_tx)
259
+ stats.created += 1
260
+ batch_pending += 1
261
+
262
+ # ---- 每 BATCH_SIZE 条提交一次 ----
263
+ if batch_pending >= BATCH_SIZE:
264
+ session.commit()
265
+ print(f" [进度] 已处理 {idx}/{stats.total_purchase},"
266
+ f"本批次提交 {batch_pending} 条,"
267
+ f"累计创建 {stats.created} 条,"
268
+ f"跳过 {stats.already_exists + stats.skipped_no_seller} 条")
269
+ batch_pending = 0
270
+
271
+ except SQLAlchemyError as e:
272
+ # 单条失败:回滚当前批次,记录错误,继续下一条
273
+ session.rollback()
274
+ batch_pending = 0
275
+ stats.failed += 1
276
+ stats.failed_tx_ids.append(purchase.tx_id)
277
+ print(f" ⚠️ 处理 PURCHASE {purchase.tx_id} 失败: {type(e).__name__}: {e}")
278
+
279
+ except Exception as e:
280
+ session.rollback()
281
+ batch_pending = 0
282
+ stats.failed += 1
283
+ stats.failed_tx_ids.append(purchase.tx_id)
284
+ print(f" ⚠️ 处理 PURCHASE {purchase.tx_id} 异常: {type(e).__name__}: {e}")
285
+
286
+ # ---- 提交剩余记录 ----
287
+ if batch_pending > 0:
288
+ try:
289
+ session.commit()
290
+ print(f" [进度] 最后一批提交 {batch_pending} 条")
291
+ except SQLAlchemyError as e:
292
+ session.rollback()
293
+ stats.failed += batch_pending
294
+ stats.created -= batch_pending
295
+ print(f" 🚨 最后一批提交失败: {e}")
296
+
297
+ print("\n✅ 迁移执行完毕!")
298
+ print(stats)
299
+
300
+
301
+ # ==========================================
302
+ # 🚀 入口
303
+ # ==========================================
304
+
305
+ def main():
306
+ parser = argparse.ArgumentParser(
307
+ description="为历史购买记录补全卖家 SALE 交易记录",
308
+ formatter_class=argparse.RawDescriptionHelpFormatter,
309
+ epilog="""
310
+ 示例:
311
+ python 迁移_补全历史SALE记录.py # 预览模式(默认)
312
+ python 迁移_补全历史SALE记录.py --execute # 实际执行
313
+ """
314
+ )
315
+ parser.add_argument(
316
+ "--execute",
317
+ action="store_true",
318
+ help="实际执行迁移(默认仅预览)"
319
+ )
320
+ args = parser.parse_args()
321
+
322
+ session = None
323
+ try:
324
+ print("正在连接数据库...")
325
+ session = get_database_session()
326
+ print("✅ 数据库连接成功\n")
327
+
328
+ if args.execute:
329
+ execute_migration(session)
330
+ else:
331
+ preview_migration(session)
332
+
333
+ except KeyboardInterrupt:
334
+ print("\n⚠️ 用户中断,已退出")
335
+ if session:
336
+ session.rollback()
337
+ except Exception as e:
338
+ print(f"\n🚨 程序执行出错:")
339
+ print(f" 错误类型: {type(e).__name__}")
340
+ print(f" 错误信息: {str(e)}")
341
+ if session:
342
+ session.rollback()
343
+ sys.exit(1)
344
+ finally:
345
+ if session:
346
+ session.close()
347
+ print("\n数据库连接已关闭")
348
+
349
+
350
+ if __name__ == "__main__":
351
+ main()