ZHIWEI666 commited on
Commit
eaf4673
·
verified ·
1 Parent(s): f92968c

Delete 迁移_补全历史SALE记录.py

Browse files
Files changed (1) hide show
  1. 迁移_补全历史SALE记录.py +0 -332
迁移_补全历史SALE记录.py DELETED
@@ -1,332 +0,0 @@
1
- #!/usr/bin/env python3
2
- # -*- coding: utf-8 -*-
3
- """
4
- 迁移脚本:为历史购买记录补全卖家 SALE 交易记录
5
- ====================================================
6
- 背景:
7
- 之前购买接口只为买家生成 PURCHASE 记录,未为卖家生成 SALE 记录。
8
- 现已修复,新购买会同步生成双方记录。
9
- 本脚本为历史缺失的 SALE 记录进行补全。
10
-
11
- 迁移逻辑:
12
- 1. 查询所有 tx_type="PURCHASE" 的 Transaction 记录
13
- 2. 检查是否已存在对应 SALE 记录(幂等保障)
14
- - 匹配条件:相同 item_id + 卖家account(related_account) + tx_type="SALE" + 金额相同
15
- 3. 不存在则创建 SALE 记录,created_at 与 PURCHASE 保持一致
16
-
17
- 用法:
18
- python 迁移_补全历史SALE记录.py # 直接执行迁移(幂等设计,重复执行不产生重复记录)
19
-
20
- 作者:数据迁移脚本
21
- 创建日期:2026-05-20
22
- """
23
-
24
- import os
25
- import sys
26
- import time
27
- import uuid
28
- import argparse
29
- from dataclasses import dataclass, field
30
- from typing import List, Dict, Optional, Tuple
31
-
32
- # 添加云端Space代码目录到路径
33
- script_dir = os.path.dirname(os.path.abspath(__file__))
34
- parent_dir = os.path.dirname(script_dir)
35
- if parent_dir not in sys.path:
36
- sys.path.insert(0, parent_dir)
37
-
38
- from sqlalchemy import create_engine
39
- from sqlalchemy.orm import sessionmaker, Session
40
- from sqlalchemy.exc import SQLAlchemyError
41
-
42
- from database_sql import SQLALCHEMY_DATABASE_URL
43
- from models_sql import Transaction
44
-
45
-
46
- # ==========================================
47
- # 📊 统计数据结构
48
- # ==========================================
49
-
50
- @dataclass
51
- class MigrationStats:
52
- """迁移统计信息"""
53
- total_purchase: int = 0 # PURCHASE 记录总数
54
- already_exists: int = 0 # 已有对应 SALE 记录,跳过
55
- skipped_no_seller: int = 0 # 无卖家账号(related_account 为空),跳过
56
- created: int = 0 # 新创建 SALE 记录数
57
- failed: int = 0 # 单条处理失败数
58
- failed_tx_ids: List[str] = field(default_factory=list) # 失败的 PURCHASE tx_id
59
-
60
- def __str__(self) -> str:
61
- return f"""
62
- ========== 迁移统计报告 ==========
63
- PURCHASE 记录总数: {self.total_purchase}
64
- 已有对应 SALE(跳过): {self.already_exists}
65
- 无卖家账号(跳过): {self.skipped_no_seller}
66
- 新创建 SALE 记录: {self.created}
67
- 处理失败: {self.failed}
68
- {f" 失败的 tx_id: {self.failed_tx_ids}" if self.failed_tx_ids else ""}
69
- ==================================
70
- """
71
-
72
-
73
- # ==========================================
74
- # 🔗 数据库连接
75
- # ==========================================
76
-
77
- def get_database_session() -> Session:
78
- """创建数据库会话"""
79
- engine = create_engine(SQLALCHEMY_DATABASE_URL)
80
- SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
81
- return SessionLocal()
82
-
83
-
84
- # ==========================================
85
- # 🔍 核心检测逻辑
86
- # ==========================================
87
-
88
- def sale_record_exists(session: Session, purchase: Transaction) -> bool:
89
- """
90
- 检查是否已存在对应的 SALE 记录(幂等校验)
91
-
92
- 匹配条件:
93
- - tx_type = "SALE"
94
- - account = purchase.related_account(卖家)
95
- - item_id = purchase.item_id
96
- - amount = abs(purchase.amount)(PURCHASE 为负数,SALE 为正数)
97
- """
98
- seller_account = purchase.related_account
99
- if not seller_account:
100
- return False
101
-
102
- sale_amount = abs(purchase.amount)
103
-
104
- existing = session.query(Transaction).filter(
105
- Transaction.tx_type == "SALE",
106
- Transaction.account == seller_account,
107
- Transaction.item_id == purchase.item_id,
108
- Transaction.amount == sale_amount,
109
- Transaction.related_account == purchase.account,
110
- ).first()
111
-
112
- return existing is not None
113
-
114
-
115
- def build_sale_record(purchase: Transaction) -> Optional[Dict]:
116
- """
117
- 根据 PURCHASE 记录构造 SALE 记录的字段
118
-
119
- 返回 None 表示无法构造(如无卖家账号)
120
- """
121
- seller_account = purchase.related_account
122
- if not seller_account:
123
- return None
124
-
125
- item_title = purchase.item_title or purchase.item_id or ""
126
- sale_amount = abs(purchase.amount)
127
-
128
- # tx_id 格式:SALE_MIGRATE_{原PURCHASE的tx_id尾段}_{随机6位}
129
- # 取 PURCHASE tx_id 的最后一段(通常为 BUY_{ts}_{hex6},取 hex6 部分)
130
- purchase_suffix = purchase.tx_id.rsplit("_", 1)[-1] if "_" in purchase.tx_id else purchase.tx_id[-6:]
131
- sale_tx_id = f"SALE_MIGRATE_{purchase_suffix}_{uuid.uuid4().hex[:6]}"
132
-
133
- return {
134
- "tx_id": sale_tx_id,
135
- "account": seller_account,
136
- "tx_type": "SALE",
137
- "amount": sale_amount,
138
- "related_account": purchase.account,
139
- "item_id": purchase.item_id,
140
- "item_title": purchase.item_title,
141
- "item_type": purchase.item_type,
142
- "related_user_name": purchase.related_user_name,
143
- "description": f"销售收入(历史补全): {item_title}",
144
- "created_at": purchase.created_at,
145
- # 迁移记录无需防篡改哈希链,留空
146
- "prev_hash": "MIGRATE_NO_HASH",
147
- "tx_hash": "MIGRATE_NO_HASH",
148
- }
149
-
150
-
151
- # ==========================================
152
- # 👁️ 预览模式
153
- # ==========================================
154
-
155
- def preview_migration(session: Session) -> Tuple[List[Dict], MigrationStats]:
156
- """
157
- 预览模式:扫描所有 PURCHASE 记录,返回待创建列表和统计
158
-
159
- 不写入数据库。
160
- """
161
- print("\n========== 迁移预览模式 ==========")
162
- print("⚠️ 当前为预览模式,不会实际修改数据")
163
- print(" 添加 --execute 参数以实际执行迁移\n")
164
-
165
- stats = MigrationStats()
166
- to_create: List[Dict] = []
167
-
168
- purchases = session.query(Transaction).filter(
169
- Transaction.tx_type == "PURCHASE"
170
- ).order_by(Transaction.created_at.asc()).all()
171
-
172
- stats.total_purchase = len(purchases)
173
- print(f"共发现 {stats.total_purchase} 条 PURCHASE 记录,开始扫描...\n")
174
-
175
- for purchase in purchases:
176
- # 无卖家
177
- if not purchase.related_account:
178
- stats.skipped_no_seller += 1
179
- continue
180
-
181
- # 已有 SALE
182
- if sale_record_exists(session, purchase):
183
- stats.already_exists += 1
184
- continue
185
-
186
- # 构造 SALE 记录
187
- sale_data = build_sale_record(purchase)
188
- if sale_data:
189
- to_create.append(sale_data)
190
- stats.created += 1
191
-
192
- # 打印预览详情(最多前 20 条)
193
- print(f"发现 {len(to_create)} 条需要补全的 SALE 记录:\n")
194
- display_limit = 20
195
- for i, sale in enumerate(to_create[:display_limit], 1):
196
- print(f" {i:3d}. 卖家: {sale['account']}")
197
- print(f" 买家: {sale['related_account']}")
198
- print(f" 资源: {sale['item_id']} | {sale['item_title'] or '(无标题)'}")
199
- print(f" 金额: +{sale['amount']} 分")
200
- print(f" 时间: {sale['created_at']}")
201
- print(f" 新 tx_id: {sale['tx_id']}\n")
202
-
203
- if len(to_create) > display_limit:
204
- print(f" ... 还有 {len(to_create) - display_limit} 条未显示\n")
205
-
206
- # 预览模式下 created 字段表示"待创建"数量
207
- print(stats)
208
- return to_create, stats
209
-
210
-
211
- # ==========================================
212
- # ⚡ 执行模式
213
- # ==========================================
214
-
215
- BATCH_SIZE = 100 # 每批提交条数
216
-
217
-
218
- def execute_migration(session: Session):
219
- """
220
- 执行实际迁移:
221
- - 逐条处理,幂等检测
222
- - 每 BATCH_SIZE 条 commit 一次
223
- - 单条失败不影响其他记录
224
- """
225
- print("\n========== 开始执行迁移 ==========\n")
226
-
227
- stats = MigrationStats()
228
-
229
- purchases = session.query(Transaction).filter(
230
- Transaction.tx_type == "PURCHASE"
231
- ).order_by(Transaction.created_at.asc()).all()
232
-
233
- stats.total_purchase = len(purchases)
234
- print(f"共发现 {stats.total_purchase} 条 PURCHASE 记录,开始处理...\n")
235
-
236
- batch_pending = 0 # 当前批次待提交数
237
-
238
- for idx, purchase in enumerate(purchases, 1):
239
- try:
240
- # ---- 无卖家,跳过 ----
241
- if not purchase.related_account:
242
- stats.skipped_no_seller += 1
243
- continue
244
-
245
- # ---- 已有 SALE,跳过 ----
246
- if sale_record_exists(session, purchase):
247
- stats.already_exists += 1
248
- continue
249
-
250
- # ---- 构造并插入 SALE 记录 ----
251
- sale_data = build_sale_record(purchase)
252
- if not sale_data:
253
- stats.skipped_no_seller += 1
254
- continue
255
-
256
- sale_tx = Transaction(**sale_data)
257
- session.add(sale_tx)
258
- stats.created += 1
259
- batch_pending += 1
260
-
261
- # ---- 每 BATCH_SIZE 条提交一次 ----
262
- if batch_pending >= BATCH_SIZE:
263
- session.commit()
264
- print(f" [进度] 已处理 {idx}/{stats.total_purchase},"
265
- f"本批次提交 {batch_pending} 条,"
266
- f"累计创建 {stats.created} 条,"
267
- f"跳过 {stats.already_exists + stats.skipped_no_seller} 条")
268
- batch_pending = 0
269
-
270
- except SQLAlchemyError as e:
271
- # 单条失败:回滚当前批次,记录错误,继续下一条
272
- session.rollback()
273
- batch_pending = 0
274
- stats.failed += 1
275
- stats.failed_tx_ids.append(purchase.tx_id)
276
- print(f" ⚠️ 处理 PURCHASE {purchase.tx_id} 失败: {type(e).__name__}: {e}")
277
-
278
- except Exception as e:
279
- session.rollback()
280
- batch_pending = 0
281
- stats.failed += 1
282
- stats.failed_tx_ids.append(purchase.tx_id)
283
- print(f" ⚠️ 处理 PURCHASE {purchase.tx_id} 异常: {type(e).__name__}: {e}")
284
-
285
- # ---- 提交剩余记录 ----
286
- if batch_pending > 0:
287
- try:
288
- session.commit()
289
- print(f" [进度] 最后一批提交 {batch_pending} 条")
290
- except SQLAlchemyError as e:
291
- session.rollback()
292
- stats.failed += batch_pending
293
- stats.created -= batch_pending
294
- print(f" 🚨 最后一批提交失败: {e}")
295
-
296
- print("\n✅ 迁移执行完毕!")
297
- print(stats)
298
-
299
-
300
- # ==========================================
301
- # 🚀 入口
302
- # ==========================================
303
-
304
- def main():
305
- session = None
306
- try:
307
- print("正在连接数据库...")
308
- session = get_database_session()
309
- print("✅ 数据库连接成功\n")
310
-
311
- # 直接执行迁移(脚本本身有幂等防重复设计)
312
- execute_migration(session)
313
-
314
- except KeyboardInterrupt:
315
- print("\n⚠️ 用户中断,已退出")
316
- if session:
317
- session.rollback()
318
- except Exception as e:
319
- print(f"\n🚨 程序执行出错:")
320
- print(f" 错误类型: {type(e).__name__}")
321
- print(f" 错误信息: {str(e)}")
322
- if session:
323
- session.rollback()
324
- sys.exit(1)
325
- finally:
326
- if session:
327
- session.close()
328
- print("\n数据库连接已关闭")
329
-
330
-
331
- if __name__ == "__main__":
332
- main()