"""Inventory forecast demo service.

A small Flask application that generates mock monthly sales data, forecasts
future demand with a simplified Holt-Winters (triple exponential smoothing)
model and derives basic inventory metrics (safety stock, reorder point, EOQ).
"""

import os
import json
import math
import random
import io
import csv
import logging
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from flask import Flask, render_template_string, request, jsonify, send_file

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
app.secret_key = os.urandom(24)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max upload

# Number of future months returned by the forecast endpoint.
FORECAST_HORIZON = 6

# Fixed ordering cost per purchase order used by the EOQ formula (demo value).
ORDER_COST = 50

# Service level -> one-sided normal Z-score lookup table.
Z_SCORES = {
    0.90: 1.28,
    0.95: 1.645,
    0.98: 2.05,
    0.99: 2.33,
}


# -----------------------------------------------------------------------------
# Core algorithm logic
# -----------------------------------------------------------------------------

def _shift_month(year, month, offset):
    """Return the (year, month) pair that lies ``offset`` months away."""
    index = year * 12 + (month - 1) + offset
    return index // 12, index % 12 + 1


def generate_mock_data(months=24, trend=0.02, seasonality_strength=0.2, base=1000):
    """Generate mock monthly sales: trend + seasonality + random noise.

    Args:
        months: number of consecutive months to generate, ending with the
            month before the current one.
        trend: linear growth per month, as a fraction of ``base``.
        seasonality_strength: amplitude of the yearly sine-wave seasonality.
        base: baseline monthly volume.

    Returns:
        A list of ``{"date": "YYYY-MM", "volume": int}`` dicts in
        chronological order, one entry per calendar month.
    """
    now = datetime.now()
    data = []
    for i in range(months):
        # Exact calendar-month stepping: fixed 30-day steps drift against the
        # calendar and produce duplicated or skipped month labels.
        year, month = _shift_month(now.year, now.month, i - months)

        # Trend term (linear growth)
        trend_factor = 1 + (i * trend)

        # Seasonal term (yearly cycle): map month 0-11 onto 0-2*pi
        month_idx = month - 1
        season_factor = 1 + seasonality_strength * math.sin(2 * math.pi * month_idx / 12)

        # Random noise of +/-10%
        noise = random.uniform(0.9, 1.1)

        volume = int(base * trend_factor * season_factor * noise)
        data.append({
            "date": f"{year:04d}-{month:02d}",
            "volume": volume,
        })
    return data


def holt_winters_forecast(series, n_preds=6, alpha=0.3, beta=0.1, gamma=0.1, season_len=12):
    """Simplified multiplicative Holt-Winters (triple exponential smoothing).

    The smoothing parameters are used as given; no parameter optimisation is
    performed (this is a demonstration implementation).

    Args:
        series: historical values, oldest first.
        n_preds: number of future periods to predict.
        alpha: level smoothing factor.
        beta: trend smoothing factor.
        gamma: seasonal smoothing factor.
        season_len: season length in periods; shortened automatically when
            fewer than two full seasons of history are available.

    Returns:
        ``(fitted, forecast)`` where ``fitted`` holds the one-step-ahead
        fitted value (float) for every historical point and ``forecast``
        holds ``n_preds`` integer predictions. An empty ``series`` yields
        ``([], [])``.
    """
    series = np.asarray(series, dtype=float)
    n = len(series)
    if n == 0:
        return [], []

    # Not enough history for two full seasons: fall back to a shorter season.
    if n < season_len * 2:
        season_len = max(2, n // 2)
    # The seasonal initialisation reads series[0:season_len], so the season
    # can never be longer than the series itself (e.g. a single data point).
    season_len = max(1, min(season_len, n))

    # Initial components
    first = series[0] if series[0] != 0 else 1
    level = series[0]
    trend = series[1] - series[0] if n > 1 else 0.0
    seasonals = [series[i] / first for i in range(season_len)]

    fitted_values = []

    # Training pass over the history
    for i, val in enumerate(series):
        s_idx = i % season_len
        prev_level = level
        prev_trend = trend
        prev_seasonal = seasonals[s_idx]

        # The seasonal coefficient is a divisor below; avoid dividing by zero.
        if prev_seasonal == 0:
            prev_seasonal = 1.0

        # Update level
        level = alpha * (val / prev_seasonal) + (1 - alpha) * (prev_level + prev_trend)
        # Update trend
        trend = beta * (level - prev_level) + (1 - beta) * prev_trend
        # Update seasonal coefficient. The new level is the divisor here, so
        # that is the value that needs the zero guard; when it is zero the
        # previous coefficient is kept unchanged.
        if level != 0:
            seasonals[s_idx] = gamma * (val / level) + (1 - gamma) * prev_seasonal
        else:
            seasonals[s_idx] = prev_seasonal

        # Record the fitted value (one-step-ahead forecast)
        fitted_values.append(float((prev_level + prev_trend) * prev_seasonal))

    # Project the final level/trend forward with the matching seasonal factor
    forecast = []
    for i in range(n_preds):
        step = i + 1
        s_idx = (n + i) % season_len
        pred = (level + step * trend) * seasonals[s_idx]
        forecast.append(int(pred))

    return fitted_values, forecast


def calculate_inventory_metrics(history_series, forecast_series, lead_time_days,
                                service_level, unit_cost, holding_cost_percent):
    """Compute the core inventory metrics.

    Args:
        history_series: historical monthly demand, oldest first.
        forecast_series: forecast monthly demand.
        lead_time_days: replenishment lead time in days (negative values are
            treated as zero).
        service_level: target service level as a fraction (e.g. ``0.95``);
            the nearest entry of the Z-score table is used.
        unit_cost: cost per unit.
        holding_cost_percent: annual holding cost as a fraction of unit cost.

    Returns:
        A dict with ``safety_stock``, ``rop``, ``eoq``, ``avg_daily_demand``
        and ``turnover_rate``; an empty dict when there is no history.
    """
    if history_series is None or len(history_series) == 0:
        return {}

    lead_time_days = max(0, lead_time_days)
    recent = history_series[-6:]  # most recent six months

    # 1. Average daily demand (simplification: monthly demand / 30)
    avg_monthly_demand = float(np.mean(recent))
    avg_daily_demand = avg_monthly_demand / 30

    # 2. Demand standard deviation (used for the safety stock)
    std_dev_monthly = float(np.std(recent))
    std_dev_daily = std_dev_monthly / math.sqrt(30)

    # 3. Service level -> Z-score, using the nearest tabulated level
    closest_sl = min(Z_SCORES, key=lambda level: abs(level - service_level))
    z_score = Z_SCORES[closest_sl]

    # 4. Safety stock = Z * sigma_LT, with sigma_LT = sigma_daily * sqrt(lead time)
    safety_stock = z_score * std_dev_daily * math.sqrt(lead_time_days)

    # 5. Reorder point (ROP) = daily demand * lead time + safety stock
    rop = (avg_daily_demand * lead_time_days) + safety_stock

    # 6. Economic order quantity:
    #    EOQ = sqrt((2 * annual demand * order cost) / holding cost per unit)
    if len(forecast_series) > 0:
        annual_demand = float(np.sum(forecast_series)) * (12 / len(forecast_series))
    else:
        annual_demand = 0.0
    # A declining forecast can sum to a negative number; demand cannot be
    # negative and sqrt() would fail on it.
    annual_demand = max(0.0, annual_demand)

    holding_cost_per_unit = unit_cost * holding_cost_percent
    if holding_cost_per_unit > 0:
        eoq = math.sqrt((2 * annual_demand * ORDER_COST) / holding_cost_per_unit)
    else:
        eoq = annual_demand / 12  # fallback: one month of demand

    # Turnover = annual demand / average inventory, both in units. Dividing
    # by the inventory *value* (units * unit cost) would mix units with money
    # and understate the turnover by a factor of unit_cost.
    avg_inventory = safety_stock + eoq / 2
    turnover_rate = round(annual_demand / avg_inventory, 1) if avg_inventory > 0 else 0

    return {
        "safety_stock": int(safety_stock),
        "rop": int(rop),
        "eoq": int(eoq),
        "avg_daily_demand": round(avg_daily_demand, 2),
        "turnover_rate": turnover_rate,
    }


# -----------------------------------------------------------------------------
# Routes
# -----------------------------------------------------------------------------

@app.route('/')
def index():
    """Serve the single-page front end."""
    return render_template_string(TEMPLATE)


@app.route('/api/generate', methods=['POST'])
def api_generate():
    """Return 24 months of mock sales data built from the posted parameters.

    Accepts an optional JSON body with ``trend``, ``seasonality`` and
    ``base``; missing keys (or a missing body) fall back to the defaults.
    """
    try:
        params = request.get_json(silent=True) or {}
        try:
            trend = float(params.get('trend', 0.02))
            seasonality = float(params.get('seasonality', 0.2))
            base = int(params.get('base', 1000))
        except (TypeError, ValueError) as e:
            # Malformed client input is a client error, not a server error.
            return jsonify({"status": "error", "message": f"Invalid parameter: {e}"}), 400

        data = generate_mock_data(months=24, trend=trend, seasonality_strength=seasonality, base=base)
        return jsonify({"status": "success", "data": data})
    except Exception as e:
        logger.exception("Generate error: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500


@app.route('/api/forecast', methods=['POST'])
def api_forecast():
    """Forecast the next months of demand and compute inventory metrics.

    Expects a JSON body with ``history`` (list of ``{date, volume}``) and an
    optional ``params`` dict (``lead_time``, ``service_level``,
    ``unit_cost``).
    """
    try:
        req = request.get_json(silent=True) or {}
        history = req.get('history', [])  # list of {date, volume}
        params = req.get('params', {}) or {}

        if not history:
            return jsonify({"status": "error", "message": "No history data provided"}), 400

        # Extract the time series
        try:
            volumes = [float(d['volume']) for d in history]
            dates = [d['date'] for d in history]
        except (KeyError, TypeError, ValueError):
            return jsonify({
                "status": "error",
                "message": "Each history item needs a 'date' and a numeric 'volume'",
            }), 400

        # Run the forecast
        fitted, forecast = holt_winters_forecast(volumes, n_preds=FORECAST_HORIZON)

        # Generate future month labels
        try:
            last_date = datetime.strptime(dates[-1], "%Y-%m")
        except (ValueError, TypeError):
            # Unparseable last date: fall back to the current month
            last_date = datetime.now()

        # Step by calendar month. A fixed 30-day step from the first of a
        # 31-day month lands in the same month and would repeat its label.
        future_dates = []
        for step in range(1, FORECAST_HORIZON + 1):
            year, month = _shift_month(last_date.year, last_date.month, step)
            future_dates.append(f"{year:04d}-{month:02d}")

        # Calculate inventory metrics
        try:
            lead_time = int(params.get('lead_time', 14))
            service_level = float(params.get('service_level', 0.95))
            unit_cost = float(params.get('unit_cost', 50))
        except (TypeError, ValueError) as e:
            return jsonify({"status": "error", "message": f"Invalid parameter: {e}"}), 400

        metrics = calculate_inventory_metrics(
            volumes,
            forecast,
            lead_time_days=lead_time,
            service_level=service_level,
            unit_cost=unit_cost,
            holding_cost_percent=0.2,  # 20% annual holding cost
        )

        return jsonify({
            "status": "success",
            "forecast_dates": future_dates,
            "forecast_values": forecast,
            "metrics": metrics,
            "fitted": fitted,  # Optional: show how well it fit history
        })
    except Exception as e:
        logger.exception("Forecast error: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500


@app.route('/api/upload', methods=['POST'])
def api_upload():
    """Parse an uploaded CSV/Excel file into ``{date, volume}`` records.

    The date and volume columns are detected by (case-insensitive) name;
    rows are sorted by date, dates are formatted as ``YYYY-MM`` and rows with
    a missing date or a non-numeric volume are skipped.
    """
    try:
        if 'file' not in request.files:
            return jsonify({"status": "error", "message": "No file part"}), 400

        file = request.files['file']
        if not file.filename:
            return jsonify({"status": "error", "message": "No selected file"}), 400

        # For simplicity the file is read into memory with pandas; the upload
        # size is capped at 16MB via MAX_CONTENT_LENGTH.
        filename = file.filename.lower()  # accept upper-case extensions too
        try:
            if filename.endswith('.csv'):
                df = pd.read_csv(file)
            elif filename.endswith(('.xls', '.xlsx')):
                df = pd.read_excel(file)
            else:
                return jsonify({"status": "error", "message": "Unsupported file format. Please use CSV or Excel."}), 400
        except Exception as e:
            return jsonify({"status": "error", "message": f"File parse error: {str(e)}"}), 400

        # Normalize columns (headers may be non-string, e.g. numbers)
        df.columns = [str(c).lower() for c in df.columns]

        # Look for date and volume columns
        date_col = next((c for c in df.columns if 'date' in c or 'time' in c or '日期' in c or '时间' in c), None)
        vol_col = next((c for c in df.columns if 'vol' in c or 'qty' in c or 'amount' in c or '销量' in c or '数量' in c), None)

        if not date_col or not vol_col:
            return jsonify({"status": "error", "message": "Could not identify 'Date' or 'Volume' columns. Please name them clearly."}), 400

        # Sort by date
        try:
            df[date_col] = pd.to_datetime(df[date_col])
            # Rows without a date cannot be placed on the timeline.
            df = df.dropna(subset=[date_col])
            df = df.sort_values(date_col)
            df[date_col] = df[date_col].dt.strftime('%Y-%m')
        except Exception:
            return jsonify({"status": "error", "message": "Date column format invalid"}), 400

        data = []
        for date_label, raw_volume in zip(df[date_col], df[vol_col]):
            try:
                volume = int(raw_volume)
            except (ValueError, TypeError, OverflowError):
                continue  # skip invalid rows
            data.append({
                "date": str(date_label),
                "volume": volume,
            })

        return jsonify({"status": "success", "data": data})

    except Exception as e:
        logger.exception("Upload error: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500


# -----------------------------------------------------------------------------
# Vue Template
# -----------------------------------------------------------------------------

TEMPLATE = """ 智能库存预测引擎 (Inventory Forecast Engine)

处理中...

智能库存预测引擎

Inventory Forecast Engine Pro

数据模拟参数

${ (params.trend * 100).toFixed(0) }%
${ params.seasonality }

库存策略配置

${ invParams.lead_time }d

商业价值说明

本系统使用 Holt-Winters 三次指数平滑算法 预测未来销量,并基于正态分布理论计算安全库存再订货点 (ROP)。帮助商家在维持服务水平的同时,最小化资金占用。

${ error }
建议安全库存
${ metrics.safety_stock }
Buffer Stock
再订货点 (ROP)
${ metrics.rop }
Reorder Point
经济订货量 (EOQ)
${ metrics.eoq }
Optimal Order Qty
预估周转率
${ metrics.turnover_rate }x /年
Turnover Rate

销量预测与库存分析 Holt-Winters Model

"""

if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger (which
    # allows arbitrary code execution) must be opt-in rather than always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)