ZHIWEI666 commited on
Commit
7e1e7c0
·
verified ·
1 Parent(s): a756543

Upload 迁移_余额合并.py

Browse files
Files changed (1) hide show
  1. 迁移_余额合并.py +262 -0
迁移_余额合并.py ADDED
@@ -0,0 +1,262 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ # -*- coding: utf-8 -*-
3
+ """
4
+ 迁移脚本:合并用户钱包余额
5
+ ====================================
6
+ 功能:将现有用户的 earn_balance + tip_balance 合并到 balance 中
7
+
8
+ 执行逻辑:
9
+ wallet.balance += wallet.earn_balance + wallet.tip_balance
10
+ # earn_balance 和 tip_balance 保留原值不变(作为历史累计统计)
11
+
12
+ 用法:
13
+ python 迁移_余额合并.py # 预览模式(dry-run,默认)
14
+ python 迁移_余额合并.py --execute # 实际执行迁移
15
+
16
+ 作者:数据迁移脚本
17
+ 创建日期:2026-04-10
18
+ """
19
+
20
+ import os
21
+ import sys
22
+ import argparse
23
+ from typing import Dict, List, Tuple
24
+ from dataclasses import dataclass
25
+
26
+ # 添加当前目录到路径,确保可以导入项目模块
27
+ script_dir = os.path.dirname(os.path.abspath(__file__))
28
+ if script_dir not in sys.path:
29
+ sys.path.insert(0, script_dir)
30
+
31
+ from sqlalchemy import create_engine, text
32
+ from sqlalchemy.orm import sessionmaker, Session
33
+ from sqlalchemy.exc import SQLAlchemyError
34
+
35
+ # 导入项目配置
36
+ from database_sql import Base, SQLALCHEMY_DATABASE_URL
37
+ from models_sql import Wallet
38
+
39
+
40
+ @dataclass
41
+ class MigrationStats:
42
+ """迁移统计信息"""
43
+ total_users: int = 0 # 总用户数
44
+ affected_users: int = 0 # 受影响用户数(需要迁移的)
45
+ skipped_users: int = 0 # 跳过的用户数(earn 和 tip 都为0)
46
+
47
+ # 金额统计(单位:分)
48
+ total_balance_before: int = 0 # 迁移前总 balance
49
+ total_balance_after: int = 0 # 迁移后总 balance
50
+ total_earn_merged: int = 0 # 合并的 earn_balance 总额
51
+ total_tip_merged: int = 0 # 合并的 tip_balance 总额
52
+
53
+ def __str__(self) -> str:
54
+ return f"""
55
+ ========== 迁移统计报告 ==========
56
+ 用户统计:
57
+ - 总用户数: {self.total_users}
58
+ - 受影响用户数: {self.affected_users}
59
+ - 跳过用户数 (earn=0 且 tip=0): {self.skipped_users}
60
+
61
+ 金额统计(单位:分):
62
+ - 迁移前总 balance: {self.total_balance_before}
63
+ - 迁移后总 balance: {self.total_balance_after}
64
+ - 合并的 earn_balance: {self.total_earn_merged}
65
+ - 合并的 tip_balance: {self.total_tip_merged}
66
+ - 总增加金额: {self.total_earn_merged + self.total_tip_merged}
67
+
68
+ ==================================
69
+ """
70
+
71
+
72
+ def get_database_session() -> Session:
73
+ """创建数据库会话"""
74
+ engine = create_engine(SQLALCHEMY_DATABASE_URL)
75
+ SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
76
+ return SessionLocal()
77
+
78
+
79
+ def fetch_wallet_data(session: Session) -> List[Dict]:
80
+ """获取所有钱包数据"""
81
+ wallets = session.query(Wallet).all()
82
+ return [
83
+ {
84
+ "account": w.account,
85
+ "balance": w.balance or 0,
86
+ "earn_balance": w.earn_balance or 0,
87
+ "tip_balance": w.tip_balance or 0,
88
+ }
89
+ for w in wallets
90
+ ]
91
+
92
+
93
+ def calculate_migration(wallets: List[Dict]) -> Tuple[List[Dict], MigrationStats]:
94
+ """
95
+ 计算迁移结果
96
+
97
+ 返回:
98
+ - 需要更新的钱包列表
99
+ - 统计信息
100
+ """
101
+ stats = MigrationStats()
102
+ updates = []
103
+
104
+ stats.total_users = len(wallets)
105
+
106
+ for wallet in wallets:
107
+ account = wallet["account"]
108
+ balance = wallet["balance"]
109
+ earn = wallet["earn_balance"]
110
+ tip = wallet["tip_balance"]
111
+
112
+ stats.total_balance_before += balance
113
+
114
+ # 跳过 earn 和 tip 都为 0 的用户
115
+ if earn == 0 and tip == 0:
116
+ stats.skipped_users += 1
117
+ stats.total_balance_after += balance
118
+ continue
119
+
120
+ # 计算新 balance
121
+ new_balance = balance + earn + tip
122
+
123
+ stats.affected_users += 1
124
+ stats.total_earn_merged += earn
125
+ stats.total_tip_merged += tip
126
+ stats.total_balance_after += new_balance
127
+
128
+ updates.append({
129
+ "account": account,
130
+ "old_balance": balance,
131
+ "new_balance": new_balance,
132
+ "earn": earn,
133
+ "tip": tip,
134
+ })
135
+
136
+ return updates, stats
137
+
138
+
139
+ def preview_migration(updates: List[Dict], stats: MigrationStats):
140
+ """打印迁移预览信息"""
141
+ print("\n========== 迁移预览模式 ==========")
142
+ print("⚠️ 当前为预览模式,不会实际修改数据")
143
+ print(" 添加 --execute 参数以实际执行迁移\n")
144
+
145
+ print(f"发现 {stats.affected_users} 个用户需要迁移:\n")
146
+
147
+ # 打印前10条详细信息
148
+ display_limit = 10
149
+ for i, update in enumerate(updates[:display_limit], 1):
150
+ print(f" {i}. 用户: {update['account']}")
151
+ print(f" 当前 balance: {update['old_balance']} 分")
152
+ print(f" earn_balance: {update['earn']} 分")
153
+ print(f" tip_balance: {update['tip']} 分")
154
+ print(f" 新 balance: {update['new_balance']} 分 (+{update['earn'] + update['tip']})\n")
155
+
156
+ if len(updates) > display_limit:
157
+ print(f" ... 还有 {len(updates) - display_limit} 个用户未显示\n")
158
+
159
+ print(stats)
160
+
161
+
162
+ def execute_migration(session: Session, updates: List[Dict], stats: MigrationStats):
163
+ """
164
+ 执行实际迁移
165
+
166
+ 在事务中批量执行,确保原子性
167
+ """
168
+ print("\n========== 开始执行迁移 ==========")
169
+ print(f"将更新 {len(updates)} 个用户的钱包数据\n")
170
+
171
+ try:
172
+ # 开始事务
173
+ for update in updates:
174
+ account = update["account"]
175
+ new_balance = update["new_balance"]
176
+
177
+ # 使用乐观锁更新,防止并发问题
178
+ result = session.query(Wallet).filter(
179
+ Wallet.account == account
180
+ ).update({
181
+ Wallet.balance: new_balance
182
+ }, synchronize_session=False)
183
+
184
+ if result == 0:
185
+ print(f"⚠️ 用户 {account} 更新失败:未找到记录")
186
+
187
+ # 提交事务
188
+ session.commit()
189
+
190
+ print("✅ 迁移执行成功!")
191
+ print(stats)
192
+
193
+ except SQLAlchemyError as e:
194
+ session.rollback()
195
+ print(f"\n🚨 迁移失败,已回滚所有更改:")
196
+ print(f" 错误类型: {type(e).__name__}")
197
+ print(f" 错误信息: {str(e)}")
198
+ raise
199
+
200
+
201
+ def main():
202
+ """主函数"""
203
+ parser = argparse.ArgumentParser(
204
+ description="合并用户钱包余额迁移脚本",
205
+ formatter_class=argparse.RawDescriptionHelpFormatter,
206
+ epilog="""
207
+ 示例:
208
+ python 迁移_余额合并.py # 预览模式
209
+ python 迁移_余额合并.py --execute # 实际执行
210
+ """
211
+ )
212
+ parser.add_argument(
213
+ "--execute",
214
+ action="store_true",
215
+ help="实际执行迁移(默认仅预览)"
216
+ )
217
+
218
+ args = parser.parse_args()
219
+
220
+ session = None
221
+ try:
222
+ # 获取数据库会话
223
+ print("正在连接数据库...")
224
+ session = get_database_session()
225
+ print("✅ 数据库连接成功\n")
226
+
227
+ # 获取钱包数据
228
+ print("正在获取钱包数据...")
229
+ wallets = fetch_wallet_data(session)
230
+ print(f"✅ 获取到 {len(wallets)} 个用户钱包数据\n")
231
+
232
+ if len(wallets) == 0:
233
+ print("⚠️ 没有用户钱包数据,无需迁移")
234
+ return
235
+
236
+ # 计算迁移
237
+ updates, stats = calculate_migration(wallets)
238
+
239
+ if stats.affected_users == 0:
240
+ print("✅ 没有需要迁移的用户(所有用户的 earn_balance 和 tip_balance 都为 0)")
241
+ print(stats)
242
+ return
243
+
244
+ # 预览或执行
245
+ if args.execute:
246
+ execute_migration(session, updates, stats)
247
+ else:
248
+ preview_migration(updates, stats)
249
+
250
+ except Exception as e:
251
+ print(f"\n🚨 程序执行出错:")
252
+ print(f" 错误类型: {type(e).__name__}")
253
+ print(f" 错误信息: {str(e)}")
254
+ sys.exit(1)
255
+ finally:
256
+ if session:
257
+ session.close()
258
+ print("\n数据库连接已关闭")
259
+
260
+
261
+ if __name__ == "__main__":
262
+ main()