# Non-code residue from the hosting page, preserved as comments so the module parses:
# duqing2026's picture
# init
# 214f078
import os
import pandas as pd
import io
import random
from datetime import datetime, timedelta
from flask import Flask, render_template, request, jsonify
# Flask application instance; MAX_CONTENT_LENGTH caps upload body size at 16 MB.
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB limit
@app.route('/')
def index():
    """Serve the single-page frontend."""
    page = 'index.html'
    return render_template(page)
def safe_rename_and_filter(df, col_map):
    """Extract and rename mapped columns into a fresh DataFrame.

    *col_map* maps {original_column_name: new_column_name}.  Originals
    missing from *df* are simply skipped.  Any required output column that
    ends up absent is created and filled with the empty string.  Building a
    brand-new frame (instead of renaming in place) sidesteps problems with
    duplicate column labels in the source file.
    """
    required_columns = ('time', 'category', 'counterparty', 'desc', 'type', 'amount')

    extracted = {}
    for src_name, dst_name in col_map.items():
        if src_name not in df.columns:
            continue
        column = df[src_name]
        # Duplicate headers make df[name] return a DataFrame; keep the first.
        if isinstance(column, pd.DataFrame):
            column = column.iloc[:, 0]
        extracted[dst_name] = column

    result = pd.DataFrame(extracted)
    for name in required_columns:
        if name not in result.columns:
            result[name] = ''
    return result
def parse_wechat_csv(file_stream):
    """Parse a WeChat Pay bill export (CSV).

    Scans for the header row by its characteristic column names (WeChat
    exports prepend several metadata lines), then maps the Chinese columns
    onto the normalized schema: time/category/counterparty/desc/type/amount,
    plus source='微信'.

    Raises ValueError when the header row cannot be found.
    """
    content = file_stream.read().decode('utf-8', errors='ignore')
    lines = content.split('\n')

    header_row = -1
    for i, line in enumerate(lines):
        if '交易时间' in line and '金额' in line:
            header_row = i
            break
    if header_row == -1:
        raise ValueError("无法识别微信账单格式")

    # Parse from the in-memory copy; the stream itself is already consumed,
    # so no seek() is needed here.
    df = pd.read_csv(io.StringIO(content), header=header_row)
    df.columns = [c.strip() for c in df.columns]

    # Drop footer/summary rows that carry no transaction timestamp.
    df = df.dropna(subset=['交易时间'])

    col_map = {
        '金额(元)': 'amount',
        '收/支': 'type',
        '交易时间': 'time',
        '商品': 'desc',
        '交易对方': 'counterparty',
        '交易类型': 'category'
    }
    result_df = safe_rename_and_filter(df, col_map)

    # Amounts look like '¥30.00'; strip the currency sign before coercion.
    result_df['amount'] = (result_df['amount'].astype(str)
                           .str.replace('¥', '')
                           .apply(pd.to_numeric, errors='coerce'))
    result_df['time'] = pd.to_datetime(result_df['time'], errors='coerce')
    result_df['source'] = '微信'
    return result_df
def parse_alipay_csv(file_stream):
    """Parse an Alipay bill export (CSV).

    Alipay files are typically GBK-encoded; falls back to UTF-8 when GBK
    decoding fails.  Handles header variance across export versions:
    half-width '金额(元)' vs full-width '金额（元）' (vs plain '金额') for the
    amount column, and '交易创建时间' vs '交易时间' for the time column.
    Returns the normalized schema with source='支付宝'.

    Raises ValueError when the header row cannot be found.
    """
    raw = file_stream.read()
    try:
        content = raw.decode('gbk')
    except UnicodeDecodeError:
        content = raw.decode('utf-8', errors='ignore')

    lines = content.split('\n')
    header_row = -1
    for i, line in enumerate(lines):
        if ('交易创建时间' in line or '交易时间' in line) and '金额' in line:
            header_row = i
            break
    if header_row == -1:
        raise ValueError("无法识别支付宝账单格式")

    df = pd.read_csv(io.StringIO(content), header=header_row)
    df.columns = [c.strip() for c in df.columns]

    # Resolve the amount column once; drop rows without an amount (footers).
    amount_col = next(
        (c for c in ('金额(元)', '金额（元）', '金额') if c in df.columns), None)
    if amount_col:
        df = df.dropna(subset=[amount_col])

    time_col = '交易创建时间' if '交易创建时间' in df.columns else '交易时间'
    category_col = '交易分类' if '交易分类' in df.columns else None

    col_map = {
        '收/支': 'type',
        time_col: 'time',
        '商品名称': 'desc',
        '交易对方': 'counterparty',
    }
    if amount_col:
        col_map[amount_col] = 'amount'
    if category_col:
        col_map[category_col] = 'category'

    result_df = safe_rename_and_filter(df, col_map)
    if not category_col:
        result_df['category'] = '其他'

    result_df['amount'] = result_df['amount'].astype(str).apply(pd.to_numeric, errors='coerce')
    result_df['time'] = pd.to_datetime(result_df['time'], errors='coerce')
    result_df['source'] = '支付宝'
    return result_df
def generate_analysis_result(df):
    """Build the analysis payload (summary, chart series, top expenses).

    Expects the normalized schema produced by the parsers:
    time/category/counterparty/desc/type/amount.  The input frame is NOT
    mutated.  Returns a JSON-serializable dict.
    """
    # Work on a copy so the caller's frame keeps its original 'type' values.
    df = df.copy()

    # Keep only recognizable income/expense rows and normalize the labels.
    df['type'] = df['type'].astype(str).str.strip()
    df = df[df['type'].isin(['收入', '支出', '收', '支', 'Income', 'Expense'])].copy()
    df['type'] = df['type'].apply(
        lambda t: 'Income' if t in ('收入', '收', 'Income') else 'Expense')

    # Ensure time is datetime, then drop rows with invalid time or amount.
    if not pd.api.types.is_datetime64_any_dtype(df['time']):
        df['time'] = pd.to_datetime(df['time'], errors='coerce')
    df = df.dropna(subset=['time', 'amount'])

    df['date_str'] = df['time'].dt.strftime('%Y-%m-%d')
    df['month_str'] = df['time'].dt.strftime('%Y-%m')

    # 1. Overall summary.
    total_income = df[df['type'] == 'Income']['amount'].sum()
    total_expense = df[df['type'] == 'Expense']['amount'].sum()
    balance = total_income - total_expense

    # 2. Monthly income/expense series.
    monthly_groups = df.groupby(['month_str', 'type'])['amount'].sum().unstack(fill_value=0)
    months = sorted(monthly_groups.index.tolist())
    monthly_income = [monthly_groups.loc[m].get('Income', 0) for m in months]
    monthly_expense = [monthly_groups.loc[m].get('Expense', 0) for m in months]

    # 3. Expense-only breakdowns.  Copy the slice so the derived weekday/hour
    # columns below do not hit SettingWithCopyWarning (or silently no-op).
    expense_df = df[df['type'] == 'Expense'].copy()
    if not expense_df.empty:
        category_stats = expense_df.groupby('category')['amount'].sum().sort_values(ascending=False)
        category_data = [{'name': k, 'value': v} for k, v in category_stats.items()]

        daily_expense = expense_df.groupby('date_str')['amount'].sum()
        daily_dates = sorted(daily_expense.index.tolist())
        daily_values = [daily_expense[d] for d in daily_dates]

        top_expenses = expense_df.nlargest(10, 'amount')[['date_str', 'desc', 'amount', 'category']].to_dict('records')

        # 4./5. Weekday (0=Monday) and hour-of-day totals, zero-filled.
        expense_df['weekday'] = expense_df['time'].dt.weekday
        weekly_values = expense_df.groupby('weekday')['amount'].sum().reindex(range(7), fill_value=0).tolist()
        expense_df['hour'] = expense_df['time'].dt.hour
        hourly_values = expense_df.groupby('hour')['amount'].sum().reindex(range(24), fill_value=0).tolist()
    else:
        category_data = []
        daily_dates = []
        daily_values = []
        top_expenses = []
        weekly_values = [0] * 7
        hourly_values = [0] * 24

    # float() guards against numpy scalar types leaking into the JSON layer.
    return {
        'summary': {
            'income': round(float(total_income), 2),
            'expense': round(float(total_expense), 2),
            'balance': round(float(balance), 2),
            'count': len(df)
        },
        'charts': {
            'months': months,
            'monthly_income': [round(float(x), 2) for x in monthly_income],
            'monthly_expense': [round(float(x), 2) for x in monthly_expense],
            'category_data': [{'name': x['name'], 'value': round(float(x['value']), 2)} for x in category_data],
            'daily_dates': daily_dates,
            'daily_values': [round(float(x), 2) for x in daily_values],
            'weekly_values': [round(float(x), 2) for x in weekly_values],
            'hourly_values': [round(float(x), 2) for x in hourly_values]
        },
        'top_expenses': top_expenses
    }
@app.route('/api/analyze', methods=['POST'])
def analyze():
    """Accept an uploaded CSV bill, auto-detect its origin, and return analysis JSON.

    Responds 400 for missing files / unsupported formats / empty parses,
    500 for unexpected processing errors.
    """
    file = request.files.get('file')
    # file.filename may be None or '' on a malformed upload; guard before .lower().
    if not file or not file.filename:
        return jsonify({'error': '未上传文件'}), 400
    filename = file.filename.lower()
    if 'csv' not in filename:
        return jsonify({'error': '目前仅支持 CSV 格式账单'}), 400
    try:
        # Sniff the first bytes to guess the bill's origin.
        first_chunk = file.read(2048).decode('utf-8', errors='ignore')
        file.seek(0)
        if '微信支付' in first_chunk:
            df = parse_wechat_csv(file)
        elif '支付宝' in first_chunk or 'alipay' in first_chunk.lower():
            df = parse_alipay_csv(file)
        else:
            # Unknown origin: try WeChat first, then fall back to Alipay.
            # (Exception, not bare except: never swallow KeyboardInterrupt.)
            try:
                df = parse_wechat_csv(file)
            except Exception:
                file.seek(0)
                df = parse_alipay_csv(file)
        if df is None or df.empty:
            return jsonify({'error': '解析失败或数据为空'}), 400
        return jsonify(generate_analysis_result(df))
    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({'error': f'处理出错: {str(e)}'}), 500
@app.route('/api/demo', methods=['GET'])
def demo():
    """Generate random demo data covering the last 90 days and return its analysis.

    Produces 100-200 records (~20% income) so the frontend can render
    every chart without a real bill upload.
    """
    try:
        count = random.randint(100, 200)
        data = []
        categories = ['餐饮美食', '服饰装扮', '日用百货', '交通出行', '数码电器', '美容美发', '其他']
        counterparties = ['肯德基', '麦当劳', '星巴克', '优衣库', '淘宝', '京东', '滴滴出行', '罗森', '全家', '理发店']
        start_date = datetime.now() - timedelta(days=90)
        for _ in range(count):
            is_income = random.random() < 0.2  # 20% income
            dt = start_date + timedelta(days=random.randint(0, 90), hours=random.randint(8, 22), minutes=random.randint(0, 59))
            if is_income:
                # Decide salary-vs-transfer by the draw itself, not by
                # `amount > 5000`: randint(5000, 20000) is inclusive, so an
                # amount of exactly 5000 used to be mislabeled '转账'.
                is_salary = random.random() < 0.1
                amount = random.randint(5000, 20000) if is_salary else random.randint(100, 1000)
                cat = '工资' if is_salary else '转账'
                desc = '工资发放' if cat == '工资' else '朋友转账'
                ctype = 'Income'
            else:
                amount = random.randint(10, 500) if random.random() < 0.9 else random.randint(500, 5000)
                cat = random.choice(categories)
                desc = f'{cat}消费'
                ctype = 'Expense'
            data.append({
                'time': dt,
                'category': cat,
                'counterparty': random.choice(counterparties),
                'desc': desc,
                'type': ctype,
                'amount': float(amount)
            })
        df = pd.DataFrame(data)
        return jsonify(generate_analysis_result(df))
    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({'error': f'演示数据生成失败: {str(e)}'}), 500
if __name__ == '__main__':
    # Bind to all interfaces so the app is reachable from outside a container;
    # port 7860 matches the hosting platform's expected port (presumably
    # Hugging Face Spaces — confirm against the deployment config).
    app.run(host='0.0.0.0', port=7860)