|
|
import os |
|
|
import logging |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from scipy.optimize import minimize_scalar |
|
|
from flask import Flask, render_template, send_from_directory, request, jsonify |
|
|
from werkzeug.utils import secure_filename |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
app = Flask(__name__) |
|
|
app.config['JSON_AS_ASCII'] = False |
|
|
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 |
|
|
app.config['UPLOAD_FOLDER'] = 'uploads' |
|
|
|
|
|
|
|
|
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) |
|
|
|
|
|
@app.route('/') |
|
|
def index(): |
|
|
|
|
|
return render_template('index.html') |
|
|
|
|
|
@app.route('/static/<path:path>') |
|
|
def send_static(path): |
|
|
return send_from_directory('static', path) |
|
|
|
|
|
def calculate_demand_linear(price, base_price, base_demand, elasticity): |
|
|
""" |
|
|
线性需求模型: D = a - bP |
|
|
在基准点 (base_price, base_demand) 处的弹性为 elasticity |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
e = -abs(elasticity) |
|
|
|
|
|
b = -e * (base_demand / base_price) |
|
|
a = base_demand + b * base_price |
|
|
|
|
|
demand = a - b * price |
|
|
return np.maximum(demand, 0) |
|
|
|
|
|
def calculate_demand_constant_elasticity(price, base_price, base_demand, elasticity): |
|
|
""" |
|
|
恒定弹性模型 (指数模型): D = A * P^e |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
e = -abs(elasticity) |
|
|
A = base_demand / (base_price ** e) |
|
|
|
|
|
demand = A * (price ** e) |
|
|
return demand |
|
|
|
|
|
def run_optimization(cost, base_price, base_demand, elasticity, model_type='linear'): |
|
|
""" |
|
|
运行价格优化算法 |
|
|
""" |
|
|
|
|
|
min_p = max(cost * 0.8, 0.01) |
|
|
max_p = max(base_price * 2.0, cost * 3.0) |
|
|
|
|
|
prices = np.linspace(min_p, max_p, 100) |
|
|
|
|
|
results = [] |
|
|
max_profit = -float('inf') |
|
|
optimal_price = base_price |
|
|
|
|
|
|
|
|
for p in prices: |
|
|
if model_type == 'constant': |
|
|
d = calculate_demand_constant_elasticity(p, base_price, base_demand, elasticity) |
|
|
else: |
|
|
d = calculate_demand_linear(p, base_price, base_demand, elasticity) |
|
|
|
|
|
revenue = p * d |
|
|
profit = (p - cost) * d |
|
|
|
|
|
if profit > max_profit: |
|
|
max_profit = profit |
|
|
optimal_price = p |
|
|
|
|
|
results.append({ |
|
|
'price': round(p, 2), |
|
|
'demand': round(d, 2), |
|
|
'revenue': round(revenue, 2), |
|
|
'profit': round(profit, 2) |
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
zero_demand_price = max_p |
|
|
if model_type == 'linear': |
|
|
|
|
|
e_val = -abs(elasticity) |
|
|
b_val = -e_val * (base_demand / base_price) |
|
|
if b_val != 0: |
|
|
a_val = base_demand + b_val * base_price |
|
|
zero_demand_price = a_val / b_val |
|
|
|
|
|
upper_bound = min(max_p * 1.5, zero_demand_price * 1.1 if model_type == 'linear' else max_p * 1.5) |
|
|
|
|
|
def profit_func(p): |
|
|
if p <= 0: return 1e9 |
|
|
if model_type == 'constant': |
|
|
d = calculate_demand_constant_elasticity(p, base_price, base_demand, elasticity) |
|
|
else: |
|
|
d = calculate_demand_linear(p, base_price, base_demand, elasticity) |
|
|
|
|
|
|
|
|
prof = (p - cost) * d |
|
|
return -prof |
|
|
|
|
|
res = minimize_scalar(profit_func, bounds=(cost, upper_bound), method='bounded') |
|
|
|
|
|
refined_optimal_price = optimal_price |
|
|
refined_max_profit = max_profit |
|
|
|
|
|
if res.success: |
|
|
opt_p = res.x |
|
|
opt_prof = -res.fun |
|
|
|
|
|
if opt_prof >= max_profit: |
|
|
refined_optimal_price = opt_p |
|
|
refined_max_profit = opt_prof |
|
|
|
|
|
|
|
|
if model_type == 'constant': |
|
|
opt_d = calculate_demand_constant_elasticity(refined_optimal_price, base_price, base_demand, elasticity) |
|
|
else: |
|
|
opt_d = calculate_demand_linear(refined_optimal_price, base_price, base_demand, elasticity) |
|
|
|
|
|
return { |
|
|
'results': results, |
|
|
'optimal': { |
|
|
'price': round(refined_optimal_price, 2), |
|
|
'profit': round(refined_max_profit, 2), |
|
|
'demand': round(opt_d, 2), |
|
|
'margin': round(((refined_optimal_price - cost) / refined_optimal_price) * 100, 2) if refined_optimal_price > 0 else 0 |
|
|
} |
|
|
} |
|
|
|
|
|
@app.route('/api/analyze', methods=['POST']) |
|
|
def analyze(): |
|
|
try: |
|
|
data = request.json |
|
|
if not data: |
|
|
return jsonify({'success': False, 'error': '未提供数据'}), 400 |
|
|
|
|
|
|
|
|
cost = float(data.get('cost', 10.0)) |
|
|
base_price = float(data.get('base_price', 20.0)) |
|
|
base_demand = float(data.get('base_demand', 100.0)) |
|
|
elasticity = float(data.get('elasticity', -1.5)) |
|
|
model_type = data.get('model_type', 'linear') |
|
|
|
|
|
result = run_optimization(cost, base_price, base_demand, elasticity, model_type) |
|
|
|
|
|
return jsonify({ |
|
|
'success': True, |
|
|
'data': result['results'], |
|
|
'optimal': result['optimal'] |
|
|
}) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Analysis Error: {str(e)}") |
|
|
return jsonify({'success': False, 'error': str(e)}), 500 |
|
|
|
|
|
@app.route('/api/upload', methods=['POST']) |
|
|
def upload_file(): |
|
|
""" |
|
|
处理 CSV/Excel 文件上传并批量分析 |
|
|
""" |
|
|
try: |
|
|
if 'file' not in request.files: |
|
|
return jsonify({'success': False, 'error': '未找到上传文件'}), 400 |
|
|
|
|
|
file = request.files['file'] |
|
|
if file.filename == '': |
|
|
return jsonify({'success': False, 'error': '未选择文件'}), 400 |
|
|
|
|
|
if file: |
|
|
filename = secure_filename(file.filename) |
|
|
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) |
|
|
file.save(filepath) |
|
|
|
|
|
|
|
|
try: |
|
|
if filename.endswith('.csv'): |
|
|
df = pd.read_csv(filepath) |
|
|
elif filename.endswith(('.xls', '.xlsx')): |
|
|
df = pd.read_excel(filepath) |
|
|
else: |
|
|
return jsonify({'success': False, 'error': '不支持的文件格式 (仅支持 .csv, .xls, .xlsx)'}), 400 |
|
|
except Exception as e: |
|
|
return jsonify({'success': False, 'error': f'文件读取错误: {str(e)}'}), 400 |
|
|
|
|
|
|
|
|
required_cols = ['cost', 'base_price', 'base_demand'] |
|
|
missing_cols = [col for col in required_cols if col not in df.columns] |
|
|
if missing_cols: |
|
|
return jsonify({'success': False, 'error': f'Missing columns: {", ".join(missing_cols)}'}), 400 |
|
|
|
|
|
|
|
|
results = [] |
|
|
for _, row in df.iterrows(): |
|
|
try: |
|
|
cost = float(row['cost']) |
|
|
base_price = float(row['base_price']) |
|
|
base_demand = float(row['base_demand']) |
|
|
elasticity = float(row.get('elasticity', -1.5)) |
|
|
model_type = str(row.get('model_type', 'linear')) |
|
|
|
|
|
opt_res = run_optimization(cost, base_price, base_demand, elasticity, model_type) |
|
|
|
|
|
item_res = { |
|
|
'input': { |
|
|
'cost': cost, |
|
|
'base_price': base_price, |
|
|
'base_demand': base_demand, |
|
|
'elasticity': elasticity |
|
|
}, |
|
|
'optimal': opt_res['optimal'] |
|
|
} |
|
|
results.append(item_res) |
|
|
except Exception as row_err: |
|
|
logger.warning(f"Row processing error: {row_err}") |
|
|
continue |
|
|
|
|
|
|
|
|
try: |
|
|
os.remove(filepath) |
|
|
except: |
|
|
pass |
|
|
|
|
|
return jsonify({ |
|
|
'success': True, |
|
|
'batch_results': results[:50] |
|
|
}) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Upload Error: {str(e)}") |
|
|
return jsonify({'success': False, 'error': f"服务器内部错误: {str(e)}"}), 500 |
|
|
|
|
|
if __name__ == '__main__': |
|
|
port = int(os.environ.get('PORT', 7860)) |
|
|
app.run(host='0.0.0.0', port=port) |
|
|
|