ZHIWEI666 commited on
Commit
d94e782
·
verified ·
1 Parent(s): 615febf

Delete 迁移_补全历史SALE记录.py

Browse files
Files changed (1) hide show
  1. 迁移_补全历史SALE记录.py +0 -351
迁移_补全历史SALE记录.py DELETED
@@ -1,351 +0,0 @@
1
- #!/usr/bin/env python3
2
- # -*- coding: utf-8 -*-
3
- """
4
- 迁移脚本:为历史购买记录补全卖家 SALE 交易记录
5
- ====================================================
6
- 背景:
7
- 之前购买接口只为买家生成 PURCHASE 记录,未为卖家生成 SALE 记录。
8
- 现已修复,新购买会同步生成双方记录。
9
- 本脚本为历史缺失的 SALE 记录进行补全。
10
-
11
- 迁移逻辑:
12
- 1. 查询所有 tx_type="PURCHASE" 的 Transaction 记录
13
- 2. 检查是否已存在对应 SALE 记录(幂等保障)
14
- - 匹配条件:相同 item_id + 卖家account(related_account) + tx_type="SALE" + 金额相同
15
- 3. 不存在则创建 SALE 记录,created_at 与 PURCHASE 保持一致
16
-
17
- 用法:
18
- python 迁移_补全历史SALE记录.py # 预览模式(dry-run,默认)
19
- python 迁移_补全历史SALE记录.py --execute # 实际执行迁移
20
-
21
- 作者:数据迁移脚本
22
- 创建日期:2026-05-20
23
- """
24
-
25
- import os
26
- import sys
27
- import time
28
- import uuid
29
- import argparse
30
- from dataclasses import dataclass, field
31
- from typing import List, Dict, Optional, Tuple
32
-
33
- # 添加云端Space代码目录到路径
34
- script_dir = os.path.dirname(os.path.abspath(__file__))
35
- parent_dir = os.path.dirname(script_dir)
36
- if parent_dir not in sys.path:
37
- sys.path.insert(0, parent_dir)
38
-
39
- from sqlalchemy import create_engine
40
- from sqlalchemy.orm import sessionmaker, Session
41
- from sqlalchemy.exc import SQLAlchemyError
42
-
43
- from database_sql import SQLALCHEMY_DATABASE_URL
44
- from models_sql import Transaction
45
-
46
-
47
- # ==========================================
48
- # 📊 统计数据结构
49
- # ==========================================
50
-
51
- @dataclass
52
- class MigrationStats:
53
- """迁移统计信息"""
54
- total_purchase: int = 0 # PURCHASE 记录总数
55
- already_exists: int = 0 # 已有对应 SALE 记录,跳过
56
- skipped_no_seller: int = 0 # 无卖家账号(related_account 为空),跳过
57
- created: int = 0 # 新创建 SALE 记录数
58
- failed: int = 0 # 单条处理失败数
59
- failed_tx_ids: List[str] = field(default_factory=list) # 失败的 PURCHASE tx_id
60
-
61
- def __str__(self) -> str:
62
- return f"""
63
- ========== 迁移统计报告 ==========
64
- PURCHASE 记录总数: {self.total_purchase}
65
- 已有对应 SALE(跳过): {self.already_exists}
66
- 无卖家账号(跳过): {self.skipped_no_seller}
67
- 新创建 SALE 记录: {self.created}
68
- 处理失败: {self.failed}
69
- {f" 失败的 tx_id: {self.failed_tx_ids}" if self.failed_tx_ids else ""}
70
- ==================================
71
- """
72
-
73
-
74
- # ==========================================
75
- # 🔗 数据库连接
76
- # ==========================================
77
-
78
- def get_database_session() -> Session:
79
- """创建数据库会话"""
80
- engine = create_engine(SQLALCHEMY_DATABASE_URL)
81
- SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
82
- return SessionLocal()
83
-
84
-
85
- # ==========================================
86
- # 🔍 核心检测逻辑
87
- # ==========================================
88
-
89
- def sale_record_exists(session: Session, purchase: Transaction) -> bool:
90
- """
91
- 检查是否已存在对应的 SALE 记录(幂等校验)
92
-
93
- 匹配条件:
94
- - tx_type = "SALE"
95
- - account = purchase.related_account(卖家)
96
- - item_id = purchase.item_id
97
- - amount = abs(purchase.amount)(PURCHASE 为负数,SALE 为正数)
98
- """
99
- seller_account = purchase.related_account
100
- if not seller_account:
101
- return False
102
-
103
- sale_amount = abs(purchase.amount)
104
-
105
- existing = session.query(Transaction).filter(
106
- Transaction.tx_type == "SALE",
107
- Transaction.account == seller_account,
108
- Transaction.item_id == purchase.item_id,
109
- Transaction.amount == sale_amount,
110
- Transaction.related_account == purchase.account,
111
- ).first()
112
-
113
- return existing is not None
114
-
115
-
116
- def build_sale_record(purchase: Transaction) -> Optional[Dict]:
117
- """
118
- 根据 PURCHASE 记录构造 SALE 记录的字段
119
-
120
- 返回 None 表示无法构造(如无卖家账号)
121
- """
122
- seller_account = purchase.related_account
123
- if not seller_account:
124
- return None
125
-
126
- item_title = purchase.item_title or purchase.item_id or ""
127
- sale_amount = abs(purchase.amount)
128
-
129
- # tx_id 格式:SALE_MIGRATE_{原PURCHASE的tx_id尾段}_{随机6位}
130
- # 取 PURCHASE tx_id 的最后一段(通常为 BUY_{ts}_{hex6},取 hex6 部分)
131
- purchase_suffix = purchase.tx_id.rsplit("_", 1)[-1] if "_" in purchase.tx_id else purchase.tx_id[-6:]
132
- sale_tx_id = f"SALE_MIGRATE_{purchase_suffix}_{uuid.uuid4().hex[:6]}"
133
-
134
- return {
135
- "tx_id": sale_tx_id,
136
- "account": seller_account,
137
- "tx_type": "SALE",
138
- "amount": sale_amount,
139
- "related_account": purchase.account,
140
- "item_id": purchase.item_id,
141
- "item_title": purchase.item_title,
142
- "item_type": purchase.item_type,
143
- "related_user_name": purchase.related_user_name,
144
- "description": f"销售收入(历史补全): {item_title}",
145
- "created_at": purchase.created_at,
146
- # 迁移记录无需防篡改哈希链,留空
147
- "prev_hash": "MIGRATE_NO_HASH",
148
- "tx_hash": "MIGRATE_NO_HASH",
149
- }
150
-
151
-
152
- # ==========================================
153
- # 👁️ 预览模式
154
- # ==========================================
155
-
156
- def preview_migration(session: Session) -> Tuple[List[Dict], MigrationStats]:
157
- """
158
- 预览模式:扫描所有 PURCHASE 记录,返回待创建列表和统计
159
-
160
- 不写入数据库。
161
- """
162
- print("\n========== 迁移预览模式 ==========")
163
- print("⚠️ 当前为预览模式,不会实际修改数据")
164
- print(" 添加 --execute 参数以实际执行迁移\n")
165
-
166
- stats = MigrationStats()
167
- to_create: List[Dict] = []
168
-
169
- purchases = session.query(Transaction).filter(
170
- Transaction.tx_type == "PURCHASE"
171
- ).order_by(Transaction.created_at.asc()).all()
172
-
173
- stats.total_purchase = len(purchases)
174
- print(f"共发现 {stats.total_purchase} 条 PURCHASE 记录,开始扫描...\n")
175
-
176
- for purchase in purchases:
177
- # 无卖家
178
- if not purchase.related_account:
179
- stats.skipped_no_seller += 1
180
- continue
181
-
182
- # 已有 SALE
183
- if sale_record_exists(session, purchase):
184
- stats.already_exists += 1
185
- continue
186
-
187
- # 构造 SALE 记录
188
- sale_data = build_sale_record(purchase)
189
- if sale_data:
190
- to_create.append(sale_data)
191
- stats.created += 1
192
-
193
- # 打印预览详情(最多前 20 条)
194
- print(f"发现 {len(to_create)} 条需要补全的 SALE 记录:\n")
195
- display_limit = 20
196
- for i, sale in enumerate(to_create[:display_limit], 1):
197
- print(f" {i:3d}. 卖家: {sale['account']}")
198
- print(f" 买家: {sale['related_account']}")
199
- print(f" 资源: {sale['item_id']} | {sale['item_title'] or '(无标题)'}")
200
- print(f" 金额: +{sale['amount']} 分")
201
- print(f" 时间: {sale['created_at']}")
202
- print(f" 新 tx_id: {sale['tx_id']}\n")
203
-
204
- if len(to_create) > display_limit:
205
- print(f" ... 还有 {len(to_create) - display_limit} 条未显示\n")
206
-
207
- # 预览模式下 created 字段表示"待创建"数量
208
- print(stats)
209
- return to_create, stats
210
-
211
-
212
- # ==========================================
213
- # ⚡ 执行模式
214
- # ==========================================
215
-
216
- BATCH_SIZE = 100 # 每批提交条数
217
-
218
-
219
- def execute_migration(session: Session):
220
- """
221
- 执行实际迁移:
222
- - 逐条处理,幂等检测
223
- - 每 BATCH_SIZE 条 commit 一次
224
- - 单条失败不影响其他记录
225
- """
226
- print("\n========== 开始执行迁移 ==========\n")
227
-
228
- stats = MigrationStats()
229
-
230
- purchases = session.query(Transaction).filter(
231
- Transaction.tx_type == "PURCHASE"
232
- ).order_by(Transaction.created_at.asc()).all()
233
-
234
- stats.total_purchase = len(purchases)
235
- print(f"共发现 {stats.total_purchase} 条 PURCHASE 记录,开始处理...\n")
236
-
237
- batch_pending = 0 # 当前批次待提交数
238
-
239
- for idx, purchase in enumerate(purchases, 1):
240
- try:
241
- # ---- 无卖家,跳过 ----
242
- if not purchase.related_account:
243
- stats.skipped_no_seller += 1
244
- continue
245
-
246
- # ---- 已有 SALE,跳过 ----
247
- if sale_record_exists(session, purchase):
248
- stats.already_exists += 1
249
- continue
250
-
251
- # ---- 构造并插入 SALE 记录 ----
252
- sale_data = build_sale_record(purchase)
253
- if not sale_data:
254
- stats.skipped_no_seller += 1
255
- continue
256
-
257
- sale_tx = Transaction(**sale_data)
258
- session.add(sale_tx)
259
- stats.created += 1
260
- batch_pending += 1
261
-
262
- # ---- 每 BATCH_SIZE 条提交一次 ----
263
- if batch_pending >= BATCH_SIZE:
264
- session.commit()
265
- print(f" [进度] 已处理 {idx}/{stats.total_purchase},"
266
- f"本批次提交 {batch_pending} 条,"
267
- f"累计创建 {stats.created} 条,"
268
- f"跳过 {stats.already_exists + stats.skipped_no_seller} 条")
269
- batch_pending = 0
270
-
271
- except SQLAlchemyError as e:
272
- # 单条失败:回滚当前批次,记录错误,继续下一条
273
- session.rollback()
274
- batch_pending = 0
275
- stats.failed += 1
276
- stats.failed_tx_ids.append(purchase.tx_id)
277
- print(f" ⚠️ 处理 PURCHASE {purchase.tx_id} 失败: {type(e).__name__}: {e}")
278
-
279
- except Exception as e:
280
- session.rollback()
281
- batch_pending = 0
282
- stats.failed += 1
283
- stats.failed_tx_ids.append(purchase.tx_id)
284
- print(f" ⚠️ 处理 PURCHASE {purchase.tx_id} 异常: {type(e).__name__}: {e}")
285
-
286
- # ---- 提交剩余记录 ----
287
- if batch_pending > 0:
288
- try:
289
- session.commit()
290
- print(f" [进度] 最后一批提交 {batch_pending} 条")
291
- except SQLAlchemyError as e:
292
- session.rollback()
293
- stats.failed += batch_pending
294
- stats.created -= batch_pending
295
- print(f" 🚨 最后一批提交失败: {e}")
296
-
297
- print("\n✅ 迁移执行完毕!")
298
- print(stats)
299
-
300
-
301
- # ==========================================
302
- # 🚀 入口
303
- # ==========================================
304
-
305
- def main():
306
- parser = argparse.ArgumentParser(
307
- description="为历史购买记录补全卖家 SALE 交易记录",
308
- formatter_class=argparse.RawDescriptionHelpFormatter,
309
- epilog="""
310
- 示例:
311
- python 迁移_补全历史SALE记录.py # 预览模式(默认)
312
- python 迁移_补全历史SALE记录.py --execute # 实际执行
313
- """
314
- )
315
- parser.add_argument(
316
- "--execute",
317
- action="store_true",
318
- help="实际执行迁移(默认仅预览)"
319
- )
320
- args = parser.parse_args()
321
-
322
- session = None
323
- try:
324
- print("正在连接数据库...")
325
- session = get_database_session()
326
- print("✅ 数据库连接成功\n")
327
-
328
- if args.execute:
329
- execute_migration(session)
330
- else:
331
- preview_migration(session)
332
-
333
- except KeyboardInterrupt:
334
- print("\n⚠️ 用户中断,已退出")
335
- if session:
336
- session.rollback()
337
- except Exception as e:
338
- print(f"\n🚨 程序执行出错:")
339
- print(f" 错误类型: {type(e).__name__}")
340
- print(f" 错误信息: {str(e)}")
341
- if session:
342
- session.rollback()
343
- sys.exit(1)
344
- finally:
345
- if session:
346
- session.close()
347
- print("\n数据库连接已关闭")
348
-
349
-
350
- if __name__ == "__main__":
351
- main()