ZHIWEI666 commited on
Commit
b98ffc6
·
verified ·
1 Parent(s): a9092cc

Delete 迁移_余额合并.py

Browse files
Files changed (1) hide show
  1. 迁移_余额合并.py +0 -262
迁移_余额合并.py DELETED
@@ -1,262 +0,0 @@
1
- #!/usr/bin/env python3
2
- # -*- coding: utf-8 -*-
3
- """
4
- 迁移脚本:合并用户钱包余额
5
- ====================================
6
- 功能:将现有用户的 earn_balance + tip_balance 合并到 balance 中
7
-
8
- 执行逻辑:
9
- wallet.balance += wallet.earn_balance + wallet.tip_balance
10
- # earn_balance 和 tip_balance 保留原值不变(作为历史累计统计)
11
-
12
- 用法:
13
- python 迁移_余额合并.py # 预览模式(dry-run,默认)
14
- python 迁移_余额合并.py --execute # 实际执行迁移
15
-
16
- 作者:数据迁移脚本
17
- 创建日期:2026-04-10
18
- """
19
-
20
- import os
21
- import sys
22
- import argparse
23
- from typing import Dict, List, Tuple
24
- from dataclasses import dataclass
25
-
26
- # 添加当前目录到路径,确保可以导入项目模块
27
- script_dir = os.path.dirname(os.path.abspath(__file__))
28
- if script_dir not in sys.path:
29
- sys.path.insert(0, script_dir)
30
-
31
- from sqlalchemy import create_engine, text
32
- from sqlalchemy.orm import sessionmaker, Session
33
- from sqlalchemy.exc import SQLAlchemyError
34
-
35
- # 导入项目配置
36
- from database_sql import Base, SQLALCHEMY_DATABASE_URL
37
- from models_sql import Wallet
38
-
39
-
40
- @dataclass
41
- class MigrationStats:
42
- """迁移统计信息"""
43
- total_users: int = 0 # 总用户数
44
- affected_users: int = 0 # 受影响用户数(需要迁移的)
45
- skipped_users: int = 0 # 跳过的用户数(earn 和 tip 都为0)
46
-
47
- # 金额统计(单位:分)
48
- total_balance_before: int = 0 # 迁移前总 balance
49
- total_balance_after: int = 0 # 迁移后总 balance
50
- total_earn_merged: int = 0 # 合并的 earn_balance 总额
51
- total_tip_merged: int = 0 # 合并的 tip_balance 总额
52
-
53
- def __str__(self) -> str:
54
- return f"""
55
- ========== 迁移统计报告 ==========
56
- 用户统计:
57
- - 总用户数: {self.total_users}
58
- - 受影响用户数: {self.affected_users}
59
- - 跳过用户数 (earn=0 且 tip=0): {self.skipped_users}
60
-
61
- 金额统计(单位:分):
62
- - 迁移前总 balance: {self.total_balance_before}
63
- - 迁移后总 balance: {self.total_balance_after}
64
- - 合并的 earn_balance: {self.total_earn_merged}
65
- - 合并的 tip_balance: {self.total_tip_merged}
66
- - 总增加金额: {self.total_earn_merged + self.total_tip_merged}
67
-
68
- ==================================
69
- """
70
-
71
-
72
- def get_database_session() -> Session:
73
- """创建数据库会话"""
74
- engine = create_engine(SQLALCHEMY_DATABASE_URL)
75
- SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
76
- return SessionLocal()
77
-
78
-
79
- def fetch_wallet_data(session: Session) -> List[Dict]:
80
- """获取所有钱包数据"""
81
- wallets = session.query(Wallet).all()
82
- return [
83
- {
84
- "account": w.account,
85
- "balance": w.balance or 0,
86
- "earn_balance": w.earn_balance or 0,
87
- "tip_balance": w.tip_balance or 0,
88
- }
89
- for w in wallets
90
- ]
91
-
92
-
93
- def calculate_migration(wallets: List[Dict]) -> Tuple[List[Dict], MigrationStats]:
94
- """
95
- 计算迁移结果
96
-
97
- 返回:
98
- - 需要更新的钱包列表
99
- - 统计信息
100
- """
101
- stats = MigrationStats()
102
- updates = []
103
-
104
- stats.total_users = len(wallets)
105
-
106
- for wallet in wallets:
107
- account = wallet["account"]
108
- balance = wallet["balance"]
109
- earn = wallet["earn_balance"]
110
- tip = wallet["tip_balance"]
111
-
112
- stats.total_balance_before += balance
113
-
114
- # 跳过 earn 和 tip 都为 0 的用户
115
- if earn == 0 and tip == 0:
116
- stats.skipped_users += 1
117
- stats.total_balance_after += balance
118
- continue
119
-
120
- # 计算新 balance
121
- new_balance = balance + earn + tip
122
-
123
- stats.affected_users += 1
124
- stats.total_earn_merged += earn
125
- stats.total_tip_merged += tip
126
- stats.total_balance_after += new_balance
127
-
128
- updates.append({
129
- "account": account,
130
- "old_balance": balance,
131
- "new_balance": new_balance,
132
- "earn": earn,
133
- "tip": tip,
134
- })
135
-
136
- return updates, stats
137
-
138
-
139
- def preview_migration(updates: List[Dict], stats: MigrationStats):
140
- """打印迁移预览信息"""
141
- print("\n========== 迁移预览模式 ==========")
142
- print("⚠️ 当前为预览模式,不会实际修改数据")
143
- print(" 添加 --execute 参数以实际执行迁移\n")
144
-
145
- print(f"发现 {stats.affected_users} 个用户需要迁移:\n")
146
-
147
- # 打印前10条详细信息
148
- display_limit = 10
149
- for i, update in enumerate(updates[:display_limit], 1):
150
- print(f" {i}. 用户: {update['account']}")
151
- print(f" 当前 balance: {update['old_balance']} 分")
152
- print(f" earn_balance: {update['earn']} 分")
153
- print(f" tip_balance: {update['tip']} 分")
154
- print(f" 新 balance: {update['new_balance']} 分 (+{update['earn'] + update['tip']})\n")
155
-
156
- if len(updates) > display_limit:
157
- print(f" ... 还有 {len(updates) - display_limit} 个用户未显示\n")
158
-
159
- print(stats)
160
-
161
-
162
- def execute_migration(session: Session, updates: List[Dict], stats: MigrationStats):
163
- """
164
- 执行实际迁移
165
-
166
- 在事务中批量执行,确保原子性
167
- """
168
- print("\n========== 开始执行迁移 ==========")
169
- print(f"将更新 {len(updates)} 个用户的钱包数据\n")
170
-
171
- try:
172
- # 开始事务
173
- for update in updates:
174
- account = update["account"]
175
- new_balance = update["new_balance"]
176
-
177
- # 使用乐观锁更新,防止并发问题
178
- result = session.query(Wallet).filter(
179
- Wallet.account == account
180
- ).update({
181
- Wallet.balance: new_balance
182
- }, synchronize_session=False)
183
-
184
- if result == 0:
185
- print(f"⚠️ 用户 {account} 更新失败:未找到记录")
186
-
187
- # 提交事务
188
- session.commit()
189
-
190
- print("✅ 迁移执行成功!")
191
- print(stats)
192
-
193
- except SQLAlchemyError as e:
194
- session.rollback()
195
- print(f"\n🚨 迁移失败,已回滚所有更改:")
196
- print(f" 错误类型: {type(e).__name__}")
197
- print(f" 错误信息: {str(e)}")
198
- raise
199
-
200
-
201
- def main():
202
- """主函数"""
203
- parser = argparse.ArgumentParser(
204
- description="合并用户钱包余额迁移脚本",
205
- formatter_class=argparse.RawDescriptionHelpFormatter,
206
- epilog="""
207
- 示例:
208
- python 迁移_余额合并.py # 预览模式
209
- python 迁移_余额合并.py --execute # 实际执行
210
- """
211
- )
212
- parser.add_argument(
213
- "--execute",
214
- action="store_true",
215
- help="实际执行迁移(默认仅预览)"
216
- )
217
-
218
- args = parser.parse_args()
219
-
220
- session = None
221
- try:
222
- # 获取数据库会话
223
- print("正在连接数据库...")
224
- session = get_database_session()
225
- print("✅ 数据库连接成功\n")
226
-
227
- # 获取钱包数据
228
- print("正在获取钱包数据...")
229
- wallets = fetch_wallet_data(session)
230
- print(f"✅ 获取到 {len(wallets)} 个用户钱包数据\n")
231
-
232
- if len(wallets) == 0:
233
- print("⚠️ 没有用户钱包数据,无需迁移")
234
- return
235
-
236
- # 计算迁移
237
- updates, stats = calculate_migration(wallets)
238
-
239
- if stats.affected_users == 0:
240
- print("✅ 没有需要迁移的用户(所有用户的 earn_balance 和 tip_balance 都为 0)")
241
- print(stats)
242
- return
243
-
244
- # 预览或执行
245
- if args.execute:
246
- execute_migration(session, updates, stats)
247
- else:
248
- preview_migration(updates, stats)
249
-
250
- except Exception as e:
251
- print(f"\n🚨 程序执行出错:")
252
- print(f" 错误类型: {type(e).__name__}")
253
- print(f" 错误信息: {str(e)}")
254
- sys.exit(1)
255
- finally:
256
- if session:
257
- session.close()
258
- print("\n数据库连接已关闭")
259
-
260
-
261
- if __name__ == "__main__":
262
- main()