import os import json import sqlite3 import datetime import random import requests from flask import Flask, render_template, request, jsonify, g from werkzeug.utils import secure_filename from dotenv import load_dotenv # 冷链闭环智能体:加载环境变量 load_dotenv() BASE_DIR = os.path.abspath(os.path.dirname(__file__)) INSTANCE_DIR = os.path.join(BASE_DIR, "instance") os.makedirs(INSTANCE_DIR, exist_ok=True) app = Flask(__name__) app.config["DATABASE"] = os.path.join(INSTANCE_DIR, "coldchain.db") app.config["SECRET_KEY"] = os.getenv("SECRET_KEY", "coldchain-secret-key") app.config["JSON_AS_ASCII"] = False app.config["MAX_CONTENT_LENGTH"] = 32 * 1024 * 1024 SILICONFLOW_API_KEY = os.getenv("SILICONFLOW_API_KEY", "").strip() SILICONFLOW_BASE_URL = os.getenv("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1").strip() SILICONFLOW_MODEL = os.getenv("SILICONFLOW_MODEL", "Qwen/Qwen2.5-7B-Instruct").strip() def get_db(): # 统一获取数据库连接 db = getattr(g, "_database", None) if db is None: db = g._database = sqlite3.connect(app.config["DATABASE"]) db.row_factory = sqlite3.Row return db @app.teardown_appcontext def close_connection(exception): # 请求结束后关闭连接 db = getattr(g, "_database", None) if db is not None: db.close() def simulate_coldchain_tools(scenario, target, constraints): seed = len(scenario) + len(target or "") + len(constraints or "") random.seed(seed) route_count = max(5, min(18, seed // 40)) hubs = max(2, min(8, seed // 90)) temp_risk = round(random.uniform(0.8, 2.8), 2) on_time = round(96 + random.uniform(1.2, 2.8), 2) compliance = round(99.0 + random.uniform(0.2, 0.7), 2) cost_saving = round(random.uniform(5.0, 12.0), 1) risk_keywords = ["堵车", "夜间", "限行", "偏差", "高峰", "雨雪", "台风"] risk_score = 0 for k in risk_keywords: if k in scenario or k in (constraints or ""): risk_score += 2 risk_score += min(6, seed // 160) if risk_score <= 3: risk_level = "低" elif risk_score <= 6: risk_level = "中" else: risk_level = "高" return { "route_count": route_count, "hub_count": hubs, "temp_risk": temp_risk, "on_time": on_time, "compliance": compliance, "cost_saving": cost_saving, "risk_level": risk_level, } def seed_demo(db): row = db.execute("SELECT COUNT(*) AS c FROM sessions").fetchone() if row and row["c"]: return demo = { "name": "示例会话 · 医药冷链干线 + 末端", "scenario": ( "为华东地区 6 个仓与 28 家三甲医院建立 2-8℃医药冷链网络," "需要在 24 小时内完成跨城配送,温控偏差不能超过 30 分钟。" "要求支持节假日高峰、夜间配送以及异常追溯。" ), "target": "准时交付率 ≥ 98%,温控合规率 ≥ 99.5%,综合成本降低 8%", "constraints": "夜间限行、部分城市冷链车辆通行证审批较慢,预算上限 450 万/年", } cursor = db.execute( "INSERT INTO sessions (name, scenario, target, constraints) VALUES (?, ?, ?, ?)", (demo["name"], demo["scenario"], demo["target"], demo["constraints"]), ) session_id = cursor.lastrowid tool_result = simulate_coldchain_tools(demo["scenario"], demo["target"], demo["constraints"]) db.execute( """ INSERT INTO steps (session_id, step_order, role, step_type, content) VALUES (?, 1, 'Planner', 'reasoning', ?) """, ( session_id, "## 示例推理结果\n\n" "- 将华东划分为干线 + 末端两级网络,优先保障医院到区域仓的时效\n" "- 对夜间与节假日高峰设置冗余线路与备用车辆池\n" "- 使用温度记录仪与异常告警系统保障 2-8℃ 全程可追溯\n", ), ) db.execute( """ INSERT INTO steps (session_id, step_order, role, step_type, content) VALUES (?, 2, 'RouteOps', 'tool_action', ?) """, ( session_id, "## 示例工具行动结果\n\n" f"- 规划干线/支线数:{tool_result['route_count']} 条\n" f"- 冷链枢纽节点:{tool_result['hub_count']} 个\n" f"- 温控偏差风险指数:{tool_result['temp_risk']}\n" f"- 预计准时交付率:{tool_result['on_time']}%\n" f"- 预计温控合规率:{tool_result['compliance']}%\n" f"- 预计综合成本节约:{tool_result['cost_saving']}%\n", ), ) db.execute( """ INSERT INTO steps (session_id, step_order, role, step_type, content) VALUES (?, 3, 'Memory', 'memory', ?) """, ( session_id, "## 示例状态记忆\n\n" "- 首轮试点重点关注医院急配线路与疫苗仓温控告警阈值\n" "- 记录各城市夜间限行时间窗口与通行证办理周期\n" "- 建立合作冷链车队与外包商的服务等级基线\n", ), ) db.execute( """ INSERT INTO steps (session_id, step_order, role, step_type, content) VALUES (?, 4, 'QualityAuditor', 'validation', ?) """, ( session_id, "## 示例校验与验收\n\n" "- 首轮模拟结果满足准时率与温控合规目标\n" "- 建议在真实运营前增加极端天气与高峰场景的压力测试\n", ), ) db.execute( """ INSERT INTO steps (session_id, step_order, role, step_type, content) VALUES (?, 5, 'Optimizer', 'iteration', ?) """, ( session_id, "## 示例迭代计划\n\n" "- 第二阶段纳入更多区域仓与三方物流合作方\n" "- 收集真实运营数据后,进一步优化线路与车辆编组\n", ), ) db.execute( """ INSERT INTO assets (session_id, asset_type, title, content) VALUES (?, 'report', '冷链闭环运营资产包(示例)', ?) """, ( session_id, "## 资产包概要(示例)\n\n" "- 冷链网络拓扑草案:干线 + 末端两级结构\n" "- 温控合规模板:2-8℃ 温度区间与偏差告警策略\n" "- KPI 指标字典:准时率、合规率、成本节约率\n" "- 合作方清单:仓、车队与医院联络窗口\n", ), ) db.execute( """ INSERT INTO assets (session_id, asset_type, title, content) VALUES (?, 'metrics', '本轮关键指标快照(示例)', ?) """, (session_id, json.dumps(tool_result, ensure_ascii=False)), ) db.commit() def init_db(): db = get_db() db.execute( """ CREATE TABLE IF NOT EXISTS sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, scenario TEXT NOT NULL, target TEXT, constraints TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ ) db.execute( """ CREATE TABLE IF NOT EXISTS steps ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id INTEGER NOT NULL, step_order INTEGER NOT NULL, role TEXT NOT NULL, step_type TEXT NOT NULL, content TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE ) """ ) db.execute( """ CREATE TABLE IF NOT EXISTS assets ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id INTEGER NOT NULL, asset_type TEXT NOT NULL, title TEXT NOT NULL, content TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE ) """ ) db.commit() seed_demo(db) with app.app_context(): init_db() def build_api_url(base_url): # 统一拼接 SiliconFlow API 地址 if base_url.endswith("/chat/completions"): return base_url return f"{base_url.rstrip('/')}/chat/completions" def call_llm(messages, temperature=0.7, max_tokens=900): # 调用硅基流大模型,失败时自动回落 if not SILICONFLOW_API_KEY or not SILICONFLOW_API_KEY.startswith("sk-"): return mock_completion(messages) payload = { "model": SILICONFLOW_MODEL, "messages": messages, "stream": False, "temperature": temperature, "max_tokens": max_tokens, } headers = { "Authorization": f"Bearer {SILICONFLOW_API_KEY}", "Content-Type": "application/json", } try: resp = requests.post(build_api_url(SILICONFLOW_BASE_URL), json=payload, headers=headers, timeout=20) if resp.status_code == 200: data = resp.json() return data["choices"][0]["message"]["content"] return mock_completion(messages) except Exception: return mock_completion(messages) def mock_completion(messages): # 本地模拟输出,保持闭环演示可运行 last_user = "" for m in reversed(messages): if m.get("role") == "user": last_user = m.get("content", "")[:240] break now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") return ( f"## 本地模拟输出\n\n" f"- 时间:{now}\n" f"- 说明:未配置硅基流 API Key,系统使用规则化策略生成示意结果。\n\n" f"### 场景摘要\n\n{last_user}\n\n" f"### 建议要点\n\n" f"- 优先保证温控合规与准时交付\n" f"- 对高风险线路设置双层冗余方案\n" f"- 建议分阶段上线与持续迭代" ) def simulate_coldchain_tools(scenario, target, constraints): # 工具层模拟:输出可量化指标 seed = len(scenario) + len(target or "") + len(constraints or "") random.seed(seed) route_count = max(5, min(18, seed // 40)) hubs = max(2, min(8, seed // 90)) temp_risk = round(random.uniform(0.8, 2.8), 2) on_time = round(96 + random.uniform(1.2, 2.8), 2) compliance = round(99.0 + random.uniform(0.2, 0.7), 2) cost_saving = round(random.uniform(5.0, 12.0), 1) risk_keywords = ["堵车", "夜间", "限行", "偏差", "高峰", "雨雪", "台风"] risk_score = 0 for k in risk_keywords: if k in scenario or k in (constraints or ""): risk_score += 2 risk_score += min(6, seed // 160) if risk_score <= 3: risk_level = "低" elif risk_score <= 6: risk_level = "中" else: risk_level = "高" return { "route_count": route_count, "hub_count": hubs, "temp_risk": temp_risk, "on_time": on_time, "compliance": compliance, "cost_saving": cost_saving, "risk_level": risk_level, } def run_coldchain_workflow(scenario, target, constraints): # 冷链闭环主流程:推理 -> 工具行动 -> 状态记忆 -> 校验验收 -> 迭代复盘 tool_result = simulate_coldchain_tools(scenario, target, constraints) system_role = ( "你是冷链物流与医药合规专家,负责构建可执行的闭环方案。" "输出中文 Markdown,结构清晰,强调可落地与可复用资产。" ) reasoning_prompt = ( f"冷链场景:{scenario}\n" f"目标:{target or '未指定'}\n" f"约束:{constraints or '未指定'}\n" "请生成推理/决策阶段的方案,包含关键假设、优先级、核心路径。" ) reasoning = call_llm( [ {"role": "system", "content": system_role}, {"role": "user", "content": reasoning_prompt}, ], temperature=0.6, ) tool_md = ( "## 工具行动结果(模拟)\n\n" f"- 规划干线/支线数:{tool_result['route_count']} 条\n" f"- 冷链枢纽节点:{tool_result['hub_count']} 个\n" f"- 温控偏差风险指数:{tool_result['temp_risk']}\n" f"- 预计准时交付率:{tool_result['on_time']}%\n" f"- 预计温控合规率:{tool_result['compliance']}%\n" f"- 预计综合成本节约:{tool_result['cost_saving']}%\n" f"- 风险等级:{tool_result['risk_level']}\n" ) memory_prompt = ( "请输出可沉淀的状态/记忆资产清单,至少包含:" "标准化 SOP、关键指标字典、异常追溯模板、合作方清单。" ) memory_md = call_llm( [ {"role": "system", "content": system_role}, {"role": "user", "content": memory_prompt}, ], temperature=0.5, ) validation_prompt = ( f"当前工具指标:准时交付率 {tool_result['on_time']}%," f"温控合规率 {tool_result['compliance']}%,成本节约 {tool_result['cost_saving']}%。" "请生成校验/验收结论,列出是否达标与需补强项。" ) validation_md = call_llm( [ {"role": "system", "content": system_role}, {"role": "user", "content": validation_prompt}, ], temperature=0.4, ) iteration_prompt = ( "请生成迭代/复盘计划,包含下一轮优化假设、需要补采的数据、" "以及可以立即执行的 3 条改进动作。" ) iteration_md = call_llm( [ {"role": "system", "content": system_role}, {"role": "user", "content": iteration_prompt}, ], temperature=0.6, ) steps = [ { "step_order": 1, "role": "Planner", "step_type": "reasoning", "content": reasoning, }, { "step_order": 2, "role": "RouteOps", "step_type": "tool_action", "content": tool_md, }, { "step_order": 3, "role": "Memory", "step_type": "memory", "content": memory_md, }, { "step_order": 4, "role": "QualityAuditor", "step_type": "validation", "content": validation_md, }, { "step_order": 5, "role": "Optimizer", "step_type": "iteration", "content": iteration_md, }, ] assets = [ { "asset_type": "report", "title": "冷链闭环运营资产包", "content": ( "## 资产包概要\n\n" "- 冷链网络拓扑草案\n" "- 温控合规模板与异常追溯SOP\n" "- KPI 指标字典与验收阈值\n" "- 供应商与节点协作清单\n" ), }, { "asset_type": "metrics", "title": "本轮关键指标快照", "content": json.dumps(tool_result, ensure_ascii=False), }, ] return steps, assets @app.route("/") def index(): return render_template("index.html") @app.route("/api/overview") def overview(): db = get_db() session_count = db.execute("SELECT COUNT(*) AS c FROM sessions").fetchone()["c"] step_count = db.execute("SELECT COUNT(*) AS c FROM steps").fetchone()["c"] asset_count = db.execute("SELECT COUNT(*) AS c FROM assets").fetchone()["c"] return jsonify( { "session_count": session_count, "step_count": step_count, "asset_count": asset_count, } ) @app.route("/api/sessions") def list_sessions(): db = get_db() rows = db.execute( "SELECT id, name, scenario, target, created_at FROM sessions ORDER BY id DESC LIMIT 50" ).fetchall() sessions = [ { "id": r["id"], "name": r["name"], "scenario": r["scenario"], "target": r["target"], "created_at": r["created_at"], } for r in rows ] return jsonify({"sessions": sessions}) @app.route("/api/session/") def get_session(session_id): db = get_db() session = db.execute( "SELECT * FROM sessions WHERE id = ?", (session_id,) ).fetchone() if not session: return jsonify({"error": "会话不存在"}), 404 steps = db.execute( "SELECT step_order, role, step_type, content, created_at FROM steps WHERE session_id = ? ORDER BY step_order ASC", (session_id,), ).fetchall() assets = db.execute( "SELECT asset_type, title, content, created_at FROM assets WHERE session_id = ? ORDER BY id DESC", (session_id,), ).fetchall() return jsonify( { "session": dict(session), "steps": [dict(s) for s in steps], "assets": [dict(a) for a in assets], } ) @app.route("/api/assets") def list_assets(): db = get_db() rows = db.execute( "SELECT id, session_id, asset_type, title, content, created_at FROM assets ORDER BY id DESC LIMIT 50" ).fetchall() assets = [dict(r) for r in rows] return jsonify({"assets": assets}) @app.route("/api/upload", methods=["POST"]) def upload_file(): file = request.files.get("file") if not file: return jsonify({"error": "未收到上传文件"}), 400 filename = secure_filename(file.filename or "unnamed.bin") if not filename: filename = "unnamed.bin" uploads_dir = os.path.join(INSTANCE_DIR, "uploads") os.makedirs(uploads_dir, exist_ok=True) path = os.path.join(uploads_dir, filename) file.save(path) size_bytes = os.path.getsize(path) size_kb = round(size_bytes / 1024, 2) db = get_db() db.execute( """ INSERT INTO assets (session_id, asset_type, title, content) VALUES (?, ?, ?, ?) """, ( 1, "upload", f"上传文件:{filename}", f"大小:{size_kb} KB;存储于内部 uploads 目录,用于演示 Hugging Face 大文件处理策略。", ), ) db.commit() return jsonify({"filename": filename, "size_kb": size_kb}) @app.route("/api/run", methods=["POST"]) def run_workflow(): try: data = request.get_json(force=True) except Exception: return jsonify({"error": "请求体必须为 JSON"}), 400 scenario = (data or {}).get("scenario", "").strip() target = (data or {}).get("target", "").strip() constraints = (data or {}).get("constraints", "").strip() name = (data or {}).get("name", "").strip() or "冷链运营会话" if not scenario: return jsonify({"error": "请提供冷链场景描述"}), 400 db = get_db() cursor = db.execute( "INSERT INTO sessions (name, scenario, target, constraints) VALUES (?, ?, ?, ?)", (name, scenario, target, constraints), ) session_id = cursor.lastrowid steps, assets = run_coldchain_workflow(scenario, target, constraints) for s in steps: db.execute( """ INSERT INTO steps (session_id, step_order, role, step_type, content) VALUES (?, ?, ?, ?, ?) """, (session_id, s["step_order"], s["role"], s["step_type"], s["content"]), ) for a in assets: db.execute( """ INSERT INTO assets (session_id, asset_type, title, content) VALUES (?, ?, ?, ?) """, (session_id, a["asset_type"], a["title"], a["content"]), ) db.commit() return jsonify( { "session_id": session_id, "steps": steps, "assets": assets, } ) if __name__ == "__main__": port = int(os.getenv("PORT", "7865")) app.run(host="0.0.0.0", port=port, debug=False)