import os
import json
import random
import datetime

import pandas as pd
import numpy as np
from flask import Flask, render_template, jsonify, request
from faker import Faker

app = Flask(__name__)
fake = Faker()

# Configuration
app.config['JSON_AS_ASCII'] = False  # allow non-ASCII (Chinese) text in JSON responses
app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024  # 512 MB upload cap
# app.jinja_env.variable_start_string = "[["
# app.jinja_env.variable_end_string = "]]"

# Global cache for simulated data (replaced wholesale by /api/upload and /api/refresh)
DATA_CACHE = None


class CostSimulator:
    """Generates fake AWS-style daily billing records for the dashboard."""

    def __init__(self):
        self.services = ['Amazon EC2', 'Amazon RDS', 'Amazon S3', 'AWS Lambda', 'Amazon CloudFront']
        self.regions = ['us-east-1', 'us-west-2', 'eu-central-1', 'ap-northeast-1']
        self.instance_types = ['t3.micro', 'm5.large', 'c5.xlarge', 'r5.2xlarge']

    def generate_data(self, days=90):
        """Generate simulated billing data for the last N days.

        Returns a DataFrame with one row per long-running resource per day,
        plus a handful of short-lived "transient" resources each day.  A
        deliberate cost spike is injected for AWS Lambda in the final 3 days
        so the anomaly-detection endpoint has something to find.

        :param days: number of days of history to simulate (inclusive of today).
        :return: pandas DataFrame with columns date, resource_id, resource_name,
                 service, region, environment, cost, usage_amount.
        """
        data = []
        end_date = datetime.date.today()
        start_date = end_date - datetime.timedelta(days=days)

        # Base resources (long-running)
        resources = []
        for _ in range(20):
            resources.append({
                'id': fake.uuid4(),
                'service': random.choice(self.services),
                'region': random.choice(self.regions),
                'name': fake.hostname(),
                'tag_env': random.choice(['Production', 'Staging', 'Dev']),
                'base_cost': random.uniform(0.5, 50.0)  # daily cost
            })

        current_date = start_date
        while current_date <= end_date:
            # Add base resource costs with ±10% daily jitter
            for res in resources:
                daily_cost = res['base_cost'] * random.uniform(0.9, 1.1)
                # Simulate a cost spike in the last 3 days for one service
                if current_date > end_date - datetime.timedelta(days=3) and res['service'] == 'AWS Lambda':
                    daily_cost *= random.uniform(5.0, 10.0)  # Anomaly!
                data.append({
                    'date': current_date.strftime('%Y-%m-%d'),
                    'resource_id': res['id'],
                    'resource_name': res['name'],
                    'service': res['service'],
                    'region': res['region'],
                    'environment': res['tag_env'],
                    'cost': round(daily_cost, 4),
                    'usage_amount': round(random.uniform(10, 1000), 2)
                })

            # Add some random transient costs (spot instances, data transfer)
            for _ in range(random.randint(5, 15)):
                svc = random.choice(self.services)
                data.append({
                    'date': current_date.strftime('%Y-%m-%d'),
                    'resource_id': fake.uuid4(),
                    'resource_name': 'Transient-' + fake.word(),
                    'service': svc,
                    'region': random.choice(self.regions),
                    'environment': 'Dev',
                    'cost': round(random.uniform(0.1, 5.0), 4),
                    'usage_amount': round(random.uniform(1, 100), 2)
                })

            current_date += datetime.timedelta(days=1)

        return pd.DataFrame(data)


def get_data(refresh=False):
    """Return the cached billing DataFrame, generating it on first use.

    :param refresh: when True, regenerate the simulated data even if cached.
    """
    global DATA_CACHE
    if DATA_CACHE is None or refresh:
        sim = CostSimulator()
        DATA_CACHE = sim.generate_data()
    return DATA_CACHE


@app.route('/')
def index():
    return render_template('index.html')


@app.route('/api/summary')
def api_summary():
    """Top-line KPIs: total cost, trailing-30d cost, MoM change, naive forecast."""
    df = get_data()

    # Total cost
    total_cost = df['cost'].sum()

    # This month vs last month (simulated logic using 30-day windows).
    # Dates are ISO 'YYYY-MM-DD' strings, so lexicographic comparison is safe.
    today = datetime.date.today()
    last_30_start = (today - datetime.timedelta(days=30)).strftime('%Y-%m-%d')
    prev_30_start = (today - datetime.timedelta(days=60)).strftime('%Y-%m-%d')

    current_cost = df[df['date'] >= last_30_start]['cost'].sum()
    prev_cost = df[(df['date'] >= prev_30_start) & (df['date'] < last_30_start)]['cost'].sum()

    mom_change = 0
    if prev_cost > 0:  # guard against division by zero on an empty prior window
        mom_change = ((current_cost - prev_cost) / prev_cost) * 100

    return jsonify({
        'total_cost': round(total_cost, 2),
        'current_month_cost': round(current_cost, 2),
        'mom_change': round(mom_change, 2),
        'forecast': round(current_cost * 1.05, 2)  # simple +5% forecast
    })


@app.route('/api/trend')
def api_trend():
    """Daily cost per service, shaped for an ECharts stacked bar chart."""
    df = get_data()

    # Group by date and service, then pivot so each service is one series
    trend = df.groupby(['date', 'service'])['cost'].sum().reset_index()
    pivot = trend.pivot(index='date', columns='service', values='cost').fillna(0)

    dates = pivot.index.tolist()
    series = []
    for column in pivot.columns:
        series.append({
            'name': column,
            'type': 'bar',
            'stack': 'total',
            'data': pivot[column].round(2).tolist()
        })

    return jsonify({'dates': dates, 'series': series})


@app.route('/api/anomalies')
def api_anomalies():
    """Simple anomaly detection: per-service daily cost > mean + 3*std.

    NOTE(review): severity compares against mean*2 rather than the 3-sigma
    threshold — an intentional-looking heuristic, kept as-is.
    """
    df = get_data()

    daily_cost = df.groupby(['date', 'service'])['cost'].sum().reset_index()

    anomalies = []
    for service in df['service'].unique():
        svc_data = daily_cost[daily_cost['service'] == service]
        mean = svc_data['cost'].mean()
        std = svc_data['cost'].std()  # NaN with <2 samples; comparison then yields no outliers
        threshold = mean + 3 * std

        outliers = svc_data[svc_data['cost'] > threshold]
        for _, row in outliers.iterrows():
            anomalies.append({
                'date': row['date'],
                'service': service,
                'cost': round(row['cost'], 2),
                'threshold': round(threshold, 2),
                'severity': 'Critical' if row['cost'] > mean * 2 else 'High'
            })

    # Newest anomalies first
    return jsonify(sorted(anomalies, key=lambda x: x['date'], reverse=True))


@app.route('/api/recommendations')
def api_recommendations():
    """Static, hand-written savings recommendations (simulated).

    Simulated logic: find resources that are consistent but "expensive"
    (suggest RIs), or Dev resources running on weekends.
    """
    recommendations = [
        {
            'id': 1,
            'title': '购买 EC2 预留实例 (RI)',
            'description': '检测到 5 台 m5.large 实例长期运行,购买 1 年期全预付 RI 可节省 35%。',
            'potential_savings': 450.00,
            'effort': 'Medium',
            'category': 'Rate Optimization'
        },
        {
            'id': 2,
            'title': '清理未关联的弹性 IP',
            'description': '发现 3 个 EIP 未绑定到运行中的实例。',
            'potential_savings': 15.00,
            'effort': 'Low',
            'category': 'Waste'
        },
        {
            'id': 3,
            'title': 'S3 生命周期策略优化',
            'description': '2TB 的日志数据超过 90 天未访问,建议归档至 Glacier。',
            'potential_savings': 120.50,
            'effort': 'Medium',
            'category': 'Storage'
        },
        {
            'id': 4,
            'title': 'RDS 实例空闲检测',
            'description': 'db-staging-01 在过去 7 天 CPU 使用率低于 2%。',
            'potential_savings': 85.20,
            'effort': 'High',
            'category': 'Rightsizing'
        }
    ]
    return jsonify(recommendations)


@app.route('/api/breakdown')
def api_breakdown():
    """Cost totals grouped by service and by environment, largest first."""
    df = get_data()

    by_service = df.groupby('service')['cost'].sum().sort_values(ascending=False)
    by_env = df.groupby('environment')['cost'].sum().sort_values(ascending=False)

    return jsonify({
        'service_breakdown': [{'name': k, 'value': round(v, 2)} for k, v in by_service.items()],
        'env_breakdown': [{'name': k, 'value': round(v, 2)} for k, v in by_env.items()]
    })


def _ensure_schema(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce an uploaded DataFrame into the schema the dashboard expects.

    Missing columns are created; NaNs are filled with sensible defaults;
    cost/usage are coerced to numeric.  Returns a DataFrame containing
    exactly the required columns, in canonical order.

    NOTE(review): the positional fallback IDs rely on a default RangeIndex
    for alignment — true for data built via read_csv/concat(ignore_index)
    or DataFrame(list), which is how callers here construct df.
    """
    required_cols = ['date', 'resource_id', 'resource_name', 'service', 'region',
                     'environment', 'cost', 'usage_amount']
    for col in required_cols:
        if col not in df.columns:
            df[col] = None

    df['date'] = df['date'].fillna(datetime.date.today().strftime('%Y-%m-%d')).astype(str)
    df['resource_id'] = df['resource_id'].fillna(
        df['resource_name'].fillna('').astype(str) + '-' + pd.Series(range(len(df))).astype(str)
    ).astype(str)
    df['resource_name'] = df['resource_name'].fillna(
        'Imported-' + pd.Series(range(len(df))).astype(str)
    ).astype(str)
    df['service'] = df['service'].fillna('Unknown Service').astype(str)
    df['region'] = df['region'].fillna('us-east-1').astype(str)
    df['environment'] = df['environment'].fillna('Dev').astype(str)
    df['cost'] = pd.to_numeric(df['cost'], errors='coerce').fillna(0.0)
    df['usage_amount'] = pd.to_numeric(df['usage_amount'], errors='coerce').fillna(0.0)

    return df[required_cols]


@app.route('/api/upload', methods=['POST'])
def api_upload():
    """Replace the in-memory dataset with an uploaded CSV/JSON file.

    CSV is read in chunks (bad lines skipped); JSON accepts either a list
    of records or a single object.  Any other extension is saved to /tmp
    as an opaque binary file.
    """
    global DATA_CACHE
    file = request.files.get('file')
    if not file:
        return jsonify({'status': 'error', 'message': '未收到文件'}), 400

    filename = file.filename or 'uploaded'
    try:
        if filename.lower().endswith('.csv'):
            chunks = pd.read_csv(file.stream, chunksize=100000, encoding='utf-8',
                                 on_bad_lines='skip')
            df = pd.concat(list(chunks), ignore_index=True)
        elif filename.lower().endswith('.json'):
            payload = json.load(file.stream)
            df = pd.DataFrame(payload if isinstance(payload, list) else [payload])
        else:
            # Strip any directory components from the client-supplied name
            # to prevent path traversal (e.g. "../../etc/passwd").
            tmp_path = os.path.join('/tmp', os.path.basename(filename))
            file.save(tmp_path)
            return jsonify({'status': 'success', 'message': '二进制文件已保存', 'path': tmp_path})

        df = _ensure_schema(df)
        DATA_CACHE = df
        return jsonify({'status': 'success', 'rows': int(len(df))})
    except Exception as e:
        # Best-effort import endpoint: surface the parse error to the client
        return jsonify({'status': 'error', 'message': f'导入失败: {str(e)}'}), 500


def _is_api_request():
    """True when the current request targets a JSON API endpoint."""
    return request.path.startswith('/api/')


@app.errorhandler(404)
def handle_404(e):
    # API callers get JSON; browser navigation gets plain text
    if _is_api_request():
        return jsonify({'status': 'error', 'message': '接口不存在'}), 404
    return ('页面不存在 (404)', 404)


@app.errorhandler(500)
def handle_500(e):
    if _is_api_request():
        return jsonify({'status': 'error', 'message': '服务器内部错误'}), 500
    return ('内部服务器错误,请稍后再试', 500)


@app.route('/api/refresh', methods=['POST'])
def api_refresh():
    """Force regeneration of the simulated dataset."""
    get_data(refresh=True)
    return jsonify({'status': 'success'})


if __name__ == '__main__':
    port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=port)