# Spaces: Sleeping  (Hugging Face Space status banner captured with the
# source during export; not part of the application code)
| import os | |
| import pandas as pd | |
| import io | |
| import random | |
| from datetime import datetime, timedelta | |
| from flask import Flask, render_template, request, jsonify | |
# Flask application instance; uploaded bill files are capped at 16 MB.
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB limit
def index():
    """Serve the single-page front end.

    NOTE(review): an ``@app.route('/')`` decorator is expected here but
    appears to have been lost in formatting — confirm before deploying.
    """
    page = render_template('index.html')
    return page
def safe_rename_and_filter(df, col_map):
    """Build a new DataFrame containing only the mapped columns, renamed.

    ``col_map`` maps ``{original_column_name: new_column_name}``.  Source
    columns absent from ``df`` are skipped.  Any of the required output
    columns still missing afterwards are filled with empty strings so
    downstream code can rely on their presence.  If a source header is
    duplicated (``df[name]`` yields a DataFrame), the first occurrence wins.
    """
    extracted = {}
    for src_name, dst_name in col_map.items():
        if src_name not in df.columns:
            continue
        column = df[src_name]
        # Duplicate headers make df[name] return a DataFrame; keep the first.
        if isinstance(column, pd.DataFrame):
            column = column.iloc[:, 0]
        extracted[dst_name] = column
    out = pd.DataFrame(extracted)
    # Guarantee the canonical schema expected by the analysis step.
    for needed in ['time', 'category', 'counterparty', 'desc', 'type', 'amount']:
        if needed not in out.columns:
            out[needed] = ''
    return out
def parse_wechat_csv(file_stream):
    """Parse a WeChat Pay bill export (CSV) into the canonical schema.

    WeChat exports carry several preamble lines before the real header, so
    the header row is located by scanning for its known column names.

    Args:
        file_stream: binary file-like object containing the CSV bytes.

    Returns:
        DataFrame with columns time/category/counterparty/desc/type/amount
        plus ``source`` set to '微信'.

    Raises:
        ValueError: if no recognizable header row is found.
    """
    content = file_stream.read().decode('utf-8', errors='ignore')
    lines = content.split('\n')
    # Locate the real header row below the preamble.
    header_row = -1
    for i, line in enumerate(lines):
        if '交易时间' in line and '金额' in line:
            header_row = i
            break
    if header_row == -1:
        raise ValueError("无法识别微信账单格式")
    # Parse from the in-memory copy; the original stream is not re-read
    # (the previous seek(0) here was a no-op).
    df = pd.read_csv(io.StringIO(content), header=header_row)
    df.columns = [c.strip() for c in df.columns]
    # Drop summary/trailing rows that carry no transaction time.
    df = df.dropna(subset=['交易时间'])
    col_map = {
        '金额(元)': 'amount',
        '收/支': 'type',
        '交易时间': 'time',
        '商品': 'desc',
        '交易对方': 'counterparty',
        '交易类型': 'category'
    }
    result_df = safe_rename_and_filter(df, col_map)
    # Amounts look like "¥25.00" — strip the currency sign, then coerce the
    # whole column at once (vectorized, instead of element-wise apply).
    result_df['amount'] = pd.to_numeric(
        result_df['amount'].astype(str).str.replace('¥', '', regex=False),
        errors='coerce')
    result_df['time'] = pd.to_datetime(result_df['time'], errors='coerce')
    result_df['source'] = '微信'
    return result_df
def parse_alipay_csv(file_stream):
    """Parse an Alipay bill export (CSV) into the canonical schema.

    Alipay exports are usually GBK-encoded; UTF-8 is tried as a fallback.
    The amount header appears with either ASCII parentheses ("金额(元)") or
    full-width ones ("金额（元）") depending on the export version, so both
    spellings are resolved to a single column name up front.

    Args:
        file_stream: binary file-like object containing the CSV bytes.

    Returns:
        DataFrame with columns time/category/counterparty/desc/type/amount
        plus ``source`` set to '支付宝'.

    Raises:
        ValueError: if no recognizable header row is found.
    """
    try:
        content = file_stream.read().decode('gbk')
    except (UnicodeDecodeError, ValueError):  # was a bare except
        file_stream.seek(0)
        content = file_stream.read().decode('utf-8', errors='ignore')
    lines = content.split('\n')
    header_row = -1
    for i, line in enumerate(lines):
        if ('交易创建时间' in line or '交易时间' in line) and '金额' in line:
            header_row = i
            break
    if header_row == -1:
        raise ValueError("无法识别支付宝账单格式")
    df = pd.read_csv(io.StringIO(content), header=header_row)
    df.columns = [c.strip() for c in df.columns]
    # Resolve the amount header once.  BUG FIX: the old code detected the
    # full-width spelling but then called dropna() with the ASCII spelling,
    # raising KeyError for full-width exports.
    amount_col = next(
        (c for c in ('金额(元)', '金额（元）') if c in df.columns), None)
    if amount_col:
        df = df.dropna(subset=[amount_col])
    time_col = '交易创建时间' if '交易创建时间' in df.columns else '交易时间'
    category_col = '交易分类' if '交易分类' in df.columns else None
    col_map = {
        '收/支': 'type',
        time_col: 'time',
        '商品名称': 'desc',
        '交易对方': 'counterparty',
    }
    if amount_col:
        col_map[amount_col] = 'amount'
    if category_col:
        col_map[category_col] = 'category'
    result_df = safe_rename_and_filter(df, col_map)
    if not category_col:
        result_df['category'] = '其他'
    # Coerce the amount column in one vectorized pass.
    result_df['amount'] = pd.to_numeric(
        result_df['amount'].astype(str), errors='coerce')
    result_df['time'] = pd.to_datetime(result_df['time'], errors='coerce')
    result_df['source'] = '支付宝'
    return result_df
def generate_analysis_result(df):
    """Build the analysis payload (summary + chart series) from a bill DataFrame.

    Args:
        df: DataFrame with columns time, category, counterparty, desc,
            type ('收入'/'支出'/'收'/'支'/'Income'/'Expense') and amount.

    Returns:
        JSON-serializable dict with 'summary', 'charts' and 'top_expenses'.
        The input DataFrame is NOT modified (the previous implementation
        rewrote the caller's 'type' column in place).
    """
    # Work on a copy so the caller's frame is never mutated.
    df = df.copy()
    df['type'] = df['type'].astype(str).str.strip()
    df = df[df['type'].isin(['收入', '支出', '收', '支', 'Income', 'Expense'])]

    def normalize_type(t):
        # Collapse the CN/EN variants into the two canonical labels.
        if t in ['收入', '收', 'Income']:
            return 'Income'
        return 'Expense'

    df['type'] = df['type'].apply(normalize_type)
    # Ensure 'time' is datetime, then drop rows with invalid time or amount.
    if not pd.api.types.is_datetime64_any_dtype(df['time']):
        df['time'] = pd.to_datetime(df['time'], errors='coerce')
    df = df.dropna(subset=['time', 'amount'])
    df['date_str'] = df['time'].dt.strftime('%Y-%m-%d')
    df['month_str'] = df['time'].dt.strftime('%Y-%m')

    # 1. Overall summary.
    total_income = df[df['type'] == 'Income']['amount'].sum()
    total_expense = df[df['type'] == 'Expense']['amount'].sum()
    balance = total_income - total_expense

    # 2. Monthly income/expense series (unstack guarantees both columns
    # exist via fill_value=0; .get covers a type that never occurs at all).
    monthly_groups = df.groupby(['month_str', 'type'])['amount'].sum().unstack(fill_value=0)
    months = sorted(monthly_groups.index.tolist())
    monthly_income = [monthly_groups.loc[m].get('Income', 0) for m in months]
    monthly_expense = [monthly_groups.loc[m].get('Expense', 0) for m in months]

    # 3. Expense-only breakdowns.  Copy the slice so adding the weekday/hour
    # helper columns below does not raise SettingWithCopyWarning (or silently
    # fail to stick on a view).
    expense_df = df[df['type'] == 'Expense'].copy()
    if not expense_df.empty:
        category_stats = expense_df.groupby('category')['amount'].sum().sort_values(ascending=False)
        category_data = [{'name': k, 'value': v} for k, v in category_stats.items()]
        # Daily expense series.
        daily_expense = expense_df.groupby('date_str')['amount'].sum()
        daily_dates = sorted(daily_expense.index.tolist())
        daily_values = [daily_expense[d] for d in daily_dates]
        # Ten largest single expenses.
        top_expenses = expense_df.nlargest(10, 'amount')[['date_str', 'desc', 'amount', 'category']].to_dict('records')
        # 4. Spend by weekday (0=Monday … 6=Sunday).
        expense_df['weekday'] = expense_df['time'].dt.weekday
        weekly_values = expense_df.groupby('weekday')['amount'].sum().reindex(range(7), fill_value=0).tolist()
        # 5. Spend by hour of day.
        expense_df['hour'] = expense_df['time'].dt.hour
        hourly_values = expense_df.groupby('hour')['amount'].sum().reindex(range(24), fill_value=0).tolist()
    else:
        category_data = []
        daily_dates = []
        daily_values = []
        top_expenses = []
        weekly_values = [0] * 7
        hourly_values = [0] * 24

    return {
        'summary': {
            'income': round(total_income, 2),
            'expense': round(total_expense, 2),
            'balance': round(balance, 2),
            'count': len(df)
        },
        'charts': {
            'months': months,
            'monthly_income': [round(x, 2) for x in monthly_income],
            'monthly_expense': [round(x, 2) for x in monthly_expense],
            'category_data': [{'name': x['name'], 'value': round(x['value'], 2)} for x in category_data],
            'daily_dates': daily_dates,
            'daily_values': [round(x, 2) for x in daily_values],
            'weekly_values': [round(x, 2) for x in weekly_values],
            'hourly_values': [round(x, 2) for x in hourly_values]
        },
        'top_expenses': top_expenses
    }
def analyze():
    """Flask handler: parse an uploaded CSV bill and return analysis JSON.

    Expects a multipart upload under the 'file' field.  Sniffs the first
    2 KB to pick the WeChat or Alipay parser, falling back to trying both.

    NOTE(review): an ``@app.route('/analyze', methods=['POST'])`` decorator
    is expected here but appears to have been lost in formatting — confirm.

    Returns:
        (JSON, status): analysis payload on success; {'error': ...} with
        400 for bad input or 500 for unexpected failures.
    """
    file = request.files.get('file')
    if not file:
        return jsonify({'error': '未上传文件'}), 400
    filename = file.filename.lower()
    df = None
    try:
        if 'csv' in filename:
            # Sniff the first bytes to decide which parser to try first.
            first_chunk = file.read(2048).decode('utf-8', errors='ignore')
            file.seek(0)
            if '微信支付' in first_chunk:
                df = parse_wechat_csv(file)
            elif '支付宝' in first_chunk or 'alipay' in first_chunk.lower():
                df = parse_alipay_csv(file)
            else:
                # Unknown origin: try WeChat first, then fall back to Alipay.
                # (Was a bare except:, which also swallowed KeyboardInterrupt
                # and SystemExit.)
                try:
                    df = parse_wechat_csv(file)
                except Exception:
                    file.seek(0)
                    df = parse_alipay_csv(file)
        else:
            return jsonify({'error': '目前仅支持 CSV 格式账单'}), 400
        if df is None or df.empty:
            return jsonify({'error': '解析失败或数据为空'}), 400
        result = generate_analysis_result(df)
        return jsonify(result)
    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({'error': f'处理出错: {str(e)}'}), 500
def demo():
    """Return analysis JSON built from randomly generated demo transactions.

    Produces 100-200 fake records spread over the last 90 days, roughly 20%
    of them income, then runs them through the normal analysis pipeline.

    NOTE(review): an ``@app.route`` decorator is expected here but appears
    to have been lost in formatting — confirm.
    """
    try:
        record_count = random.randint(100, 200)
        rows = []
        expense_categories = ['餐饮美食', '服饰装扮', '日用百货', '交通出行', '数码电器', '美容美发', '其他']
        merchants = ['肯德基', '麦当劳', '星巴克', '优衣库', '淘宝', '京东', '滴滴出行', '罗森', '全家', '理发店']
        window_start = datetime.now() - timedelta(days=90)
        for _ in range(record_count):
            # Roughly one record in five is income.
            is_income = random.random() < 0.2
            when = window_start + timedelta(
                days=random.randint(0, 90),
                hours=random.randint(8, 22),
                minutes=random.randint(0, 59),
            )
            if is_income:
                # 10% chance of a salary-sized credit.
                if random.random() < 0.1:
                    value = random.randint(5000, 20000)
                else:
                    value = random.randint(100, 1000)
                category = '工资' if value > 5000 else '转账'
                description = '工资发放' if category == '工资' else '朋友转账'
                kind = 'Income'
            else:
                # 90% everyday purchases, 10% big-ticket spends.
                if random.random() < 0.9:
                    value = random.randint(10, 500)
                else:
                    value = random.randint(500, 5000)
                category = random.choice(expense_categories)
                description = f'{category}消费'
                kind = 'Expense'
            rows.append({
                'time': when,
                'category': category,
                'counterparty': random.choice(merchants),
                'desc': description,
                'type': kind,
                'amount': float(value),
            })
        frame = pd.DataFrame(rows)
        return jsonify(generate_analysis_result(frame))
    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({'error': f'演示数据生成失败: {str(e)}'}), 500
# Run the development server directly when executed as a script,
# listening on all interfaces on port 7860.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860)