Trae Assistant
init
4797b80
import os
import json
import sqlite3
import datetime
import random
import requests
from flask import Flask, render_template, request, jsonify, g
from werkzeug.utils import secure_filename
from dotenv import load_dotenv
# 冷链闭环智能体:加载环境变量
load_dotenv()
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
INSTANCE_DIR = os.path.join(BASE_DIR, "instance")
os.makedirs(INSTANCE_DIR, exist_ok=True)
app = Flask(__name__)
app.config["DATABASE"] = os.path.join(INSTANCE_DIR, "coldchain.db")
app.config["SECRET_KEY"] = os.getenv("SECRET_KEY", "coldchain-secret-key")
app.config["JSON_AS_ASCII"] = False
app.config["MAX_CONTENT_LENGTH"] = 32 * 1024 * 1024
SILICONFLOW_API_KEY = os.getenv("SILICONFLOW_API_KEY", "").strip()
SILICONFLOW_BASE_URL = os.getenv("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1").strip()
SILICONFLOW_MODEL = os.getenv("SILICONFLOW_MODEL", "Qwen/Qwen2.5-7B-Instruct").strip()
def get_db():
# 统一获取数据库连接
db = getattr(g, "_database", None)
if db is None:
db = g._database = sqlite3.connect(app.config["DATABASE"])
db.row_factory = sqlite3.Row
return db
@app.teardown_appcontext
def close_connection(exception):
# 请求结束后关闭连接
db = getattr(g, "_database", None)
if db is not None:
db.close()
def simulate_coldchain_tools(scenario, target, constraints):
seed = len(scenario) + len(target or "") + len(constraints or "")
random.seed(seed)
route_count = max(5, min(18, seed // 40))
hubs = max(2, min(8, seed // 90))
temp_risk = round(random.uniform(0.8, 2.8), 2)
on_time = round(96 + random.uniform(1.2, 2.8), 2)
compliance = round(99.0 + random.uniform(0.2, 0.7), 2)
cost_saving = round(random.uniform(5.0, 12.0), 1)
risk_keywords = ["堵车", "夜间", "限行", "偏差", "高峰", "雨雪", "台风"]
risk_score = 0
for k in risk_keywords:
if k in scenario or k in (constraints or ""):
risk_score += 2
risk_score += min(6, seed // 160)
if risk_score <= 3:
risk_level = "低"
elif risk_score <= 6:
risk_level = "中"
else:
risk_level = "高"
return {
"route_count": route_count,
"hub_count": hubs,
"temp_risk": temp_risk,
"on_time": on_time,
"compliance": compliance,
"cost_saving": cost_saving,
"risk_level": risk_level,
}
def seed_demo(db):
row = db.execute("SELECT COUNT(*) AS c FROM sessions").fetchone()
if row and row["c"]:
return
demo = {
"name": "示例会话 · 医药冷链干线 + 末端",
"scenario": (
"为华东地区 6 个仓与 28 家三甲医院建立 2-8℃医药冷链网络,"
"需要在 24 小时内完成跨城配送,温控偏差不能超过 30 分钟。"
"要求支持节假日高峰、夜间配送以及异常追溯。"
),
"target": "准时交付率 ≥ 98%,温控合规率 ≥ 99.5%,综合成本降低 8%",
"constraints": "夜间限行、部分城市冷链车辆通行证审批较慢,预算上限 450 万/年",
}
cursor = db.execute(
"INSERT INTO sessions (name, scenario, target, constraints) VALUES (?, ?, ?, ?)",
(demo["name"], demo["scenario"], demo["target"], demo["constraints"]),
)
session_id = cursor.lastrowid
tool_result = simulate_coldchain_tools(demo["scenario"], demo["target"], demo["constraints"])
db.execute(
"""
INSERT INTO steps (session_id, step_order, role, step_type, content)
VALUES (?, 1, 'Planner', 'reasoning', ?)
""",
(
session_id,
"## 示例推理结果\n\n"
"- 将华东划分为干线 + 末端两级网络,优先保障医院到区域仓的时效\n"
"- 对夜间与节假日高峰设置冗余线路与备用车辆池\n"
"- 使用温度记录仪与异常告警系统保障 2-8℃ 全程可追溯\n",
),
)
db.execute(
"""
INSERT INTO steps (session_id, step_order, role, step_type, content)
VALUES (?, 2, 'RouteOps', 'tool_action', ?)
""",
(
session_id,
"## 示例工具行动结果\n\n"
f"- 规划干线/支线数:{tool_result['route_count']} 条\n"
f"- 冷链枢纽节点:{tool_result['hub_count']} 个\n"
f"- 温控偏差风险指数:{tool_result['temp_risk']}\n"
f"- 预计准时交付率:{tool_result['on_time']}%\n"
f"- 预计温控合规率:{tool_result['compliance']}%\n"
f"- 预计综合成本节约:{tool_result['cost_saving']}%\n",
),
)
db.execute(
"""
INSERT INTO steps (session_id, step_order, role, step_type, content)
VALUES (?, 3, 'Memory', 'memory', ?)
""",
(
session_id,
"## 示例状态记忆\n\n"
"- 首轮试点重点关注医院急配线路与疫苗仓温控告警阈值\n"
"- 记录各城市夜间限行时间窗口与通行证办理周期\n"
"- 建立合作冷链车队与外包商的服务等级基线\n",
),
)
db.execute(
"""
INSERT INTO steps (session_id, step_order, role, step_type, content)
VALUES (?, 4, 'QualityAuditor', 'validation', ?)
""",
(
session_id,
"## 示例校验与验收\n\n"
"- 首轮模拟结果满足准时率与温控合规目标\n"
"- 建议在真实运营前增加极端天气与高峰场景的压力测试\n",
),
)
db.execute(
"""
INSERT INTO steps (session_id, step_order, role, step_type, content)
VALUES (?, 5, 'Optimizer', 'iteration', ?)
""",
(
session_id,
"## 示例迭代计划\n\n"
"- 第二阶段纳入更多区域仓与三方物流合作方\n"
"- 收集真实运营数据后,进一步优化线路与车辆编组\n",
),
)
db.execute(
"""
INSERT INTO assets (session_id, asset_type, title, content)
VALUES (?, 'report', '冷链闭环运营资产包(示例)', ?)
""",
(
session_id,
"## 资产包概要(示例)\n\n"
"- 冷链网络拓扑草案:干线 + 末端两级结构\n"
"- 温控合规模板:2-8℃ 温度区间与偏差告警策略\n"
"- KPI 指标字典:准时率、合规率、成本节约率\n"
"- 合作方清单:仓、车队与医院联络窗口\n",
),
)
db.execute(
"""
INSERT INTO assets (session_id, asset_type, title, content)
VALUES (?, 'metrics', '本轮关键指标快照(示例)', ?)
""",
(session_id, json.dumps(tool_result, ensure_ascii=False)),
)
db.commit()
def init_db():
db = get_db()
db.execute(
"""
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
scenario TEXT NOT NULL,
target TEXT,
constraints TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
db.execute(
"""
CREATE TABLE IF NOT EXISTS steps (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL,
step_order INTEGER NOT NULL,
role TEXT NOT NULL,
step_type TEXT NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE
)
"""
)
db.execute(
"""
CREATE TABLE IF NOT EXISTS assets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL,
asset_type TEXT NOT NULL,
title TEXT NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE
)
"""
)
db.commit()
seed_demo(db)
with app.app_context():
init_db()
def build_api_url(base_url):
# 统一拼接 SiliconFlow API 地址
if base_url.endswith("/chat/completions"):
return base_url
return f"{base_url.rstrip('/')}/chat/completions"
def call_llm(messages, temperature=0.7, max_tokens=900):
# 调用硅基流大模型,失败时自动回落
if not SILICONFLOW_API_KEY or not SILICONFLOW_API_KEY.startswith("sk-"):
return mock_completion(messages)
payload = {
"model": SILICONFLOW_MODEL,
"messages": messages,
"stream": False,
"temperature": temperature,
"max_tokens": max_tokens,
}
headers = {
"Authorization": f"Bearer {SILICONFLOW_API_KEY}",
"Content-Type": "application/json",
}
try:
resp = requests.post(build_api_url(SILICONFLOW_BASE_URL), json=payload, headers=headers, timeout=20)
if resp.status_code == 200:
data = resp.json()
return data["choices"][0]["message"]["content"]
return mock_completion(messages)
except Exception:
return mock_completion(messages)
def mock_completion(messages):
# 本地模拟输出,保持闭环演示可运行
last_user = ""
for m in reversed(messages):
if m.get("role") == "user":
last_user = m.get("content", "")[:240]
break
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
return (
f"## 本地模拟输出\n\n"
f"- 时间:{now}\n"
f"- 说明:未配置硅基流 API Key,系统使用规则化策略生成示意结果。\n\n"
f"### 场景摘要\n\n{last_user}\n\n"
f"### 建议要点\n\n"
f"- 优先保证温控合规与准时交付\n"
f"- 对高风险线路设置双层冗余方案\n"
f"- 建议分阶段上线与持续迭代"
)
def simulate_coldchain_tools(scenario, target, constraints):
# 工具层模拟:输出可量化指标
seed = len(scenario) + len(target or "") + len(constraints or "")
random.seed(seed)
route_count = max(5, min(18, seed // 40))
hubs = max(2, min(8, seed // 90))
temp_risk = round(random.uniform(0.8, 2.8), 2)
on_time = round(96 + random.uniform(1.2, 2.8), 2)
compliance = round(99.0 + random.uniform(0.2, 0.7), 2)
cost_saving = round(random.uniform(5.0, 12.0), 1)
risk_keywords = ["堵车", "夜间", "限行", "偏差", "高峰", "雨雪", "台风"]
risk_score = 0
for k in risk_keywords:
if k in scenario or k in (constraints or ""):
risk_score += 2
risk_score += min(6, seed // 160)
if risk_score <= 3:
risk_level = "低"
elif risk_score <= 6:
risk_level = "中"
else:
risk_level = "高"
return {
"route_count": route_count,
"hub_count": hubs,
"temp_risk": temp_risk,
"on_time": on_time,
"compliance": compliance,
"cost_saving": cost_saving,
"risk_level": risk_level,
}
def run_coldchain_workflow(scenario, target, constraints):
# 冷链闭环主流程:推理 -> 工具行动 -> 状态记忆 -> 校验验收 -> 迭代复盘
tool_result = simulate_coldchain_tools(scenario, target, constraints)
system_role = (
"你是冷链物流与医药合规专家,负责构建可执行的闭环方案。"
"输出中文 Markdown,结构清晰,强调可落地与可复用资产。"
)
reasoning_prompt = (
f"冷链场景:{scenario}\n"
f"目标:{target or '未指定'}\n"
f"约束:{constraints or '未指定'}\n"
"请生成推理/决策阶段的方案,包含关键假设、优先级、核心路径。"
)
reasoning = call_llm(
[
{"role": "system", "content": system_role},
{"role": "user", "content": reasoning_prompt},
],
temperature=0.6,
)
tool_md = (
"## 工具行动结果(模拟)\n\n"
f"- 规划干线/支线数:{tool_result['route_count']} 条\n"
f"- 冷链枢纽节点:{tool_result['hub_count']} 个\n"
f"- 温控偏差风险指数:{tool_result['temp_risk']}\n"
f"- 预计准时交付率:{tool_result['on_time']}%\n"
f"- 预计温控合规率:{tool_result['compliance']}%\n"
f"- 预计综合成本节约:{tool_result['cost_saving']}%\n"
f"- 风险等级:{tool_result['risk_level']}\n"
)
memory_prompt = (
"请输出可沉淀的状态/记忆资产清单,至少包含:"
"标准化 SOP、关键指标字典、异常追溯模板、合作方清单。"
)
memory_md = call_llm(
[
{"role": "system", "content": system_role},
{"role": "user", "content": memory_prompt},
],
temperature=0.5,
)
validation_prompt = (
f"当前工具指标:准时交付率 {tool_result['on_time']}%,"
f"温控合规率 {tool_result['compliance']}%,成本节约 {tool_result['cost_saving']}%。"
"请生成校验/验收结论,列出是否达标与需补强项。"
)
validation_md = call_llm(
[
{"role": "system", "content": system_role},
{"role": "user", "content": validation_prompt},
],
temperature=0.4,
)
iteration_prompt = (
"请生成迭代/复盘计划,包含下一轮优化假设、需要补采的数据、"
"以及可以立即执行的 3 条改进动作。"
)
iteration_md = call_llm(
[
{"role": "system", "content": system_role},
{"role": "user", "content": iteration_prompt},
],
temperature=0.6,
)
steps = [
{
"step_order": 1,
"role": "Planner",
"step_type": "reasoning",
"content": reasoning,
},
{
"step_order": 2,
"role": "RouteOps",
"step_type": "tool_action",
"content": tool_md,
},
{
"step_order": 3,
"role": "Memory",
"step_type": "memory",
"content": memory_md,
},
{
"step_order": 4,
"role": "QualityAuditor",
"step_type": "validation",
"content": validation_md,
},
{
"step_order": 5,
"role": "Optimizer",
"step_type": "iteration",
"content": iteration_md,
},
]
assets = [
{
"asset_type": "report",
"title": "冷链闭环运营资产包",
"content": (
"## 资产包概要\n\n"
"- 冷链网络拓扑草案\n"
"- 温控合规模板与异常追溯SOP\n"
"- KPI 指标字典与验收阈值\n"
"- 供应商与节点协作清单\n"
),
},
{
"asset_type": "metrics",
"title": "本轮关键指标快照",
"content": json.dumps(tool_result, ensure_ascii=False),
},
]
return steps, assets
@app.route("/")
def index():
return render_template("index.html")
@app.route("/api/overview")
def overview():
db = get_db()
session_count = db.execute("SELECT COUNT(*) AS c FROM sessions").fetchone()["c"]
step_count = db.execute("SELECT COUNT(*) AS c FROM steps").fetchone()["c"]
asset_count = db.execute("SELECT COUNT(*) AS c FROM assets").fetchone()["c"]
return jsonify(
{
"session_count": session_count,
"step_count": step_count,
"asset_count": asset_count,
}
)
@app.route("/api/sessions")
def list_sessions():
db = get_db()
rows = db.execute(
"SELECT id, name, scenario, target, created_at FROM sessions ORDER BY id DESC LIMIT 50"
).fetchall()
sessions = [
{
"id": r["id"],
"name": r["name"],
"scenario": r["scenario"],
"target": r["target"],
"created_at": r["created_at"],
}
for r in rows
]
return jsonify({"sessions": sessions})
@app.route("/api/session/<int:session_id>")
def get_session(session_id):
db = get_db()
session = db.execute(
"SELECT * FROM sessions WHERE id = ?", (session_id,)
).fetchone()
if not session:
return jsonify({"error": "会话不存在"}), 404
steps = db.execute(
"SELECT step_order, role, step_type, content, created_at FROM steps WHERE session_id = ? ORDER BY step_order ASC",
(session_id,),
).fetchall()
assets = db.execute(
"SELECT asset_type, title, content, created_at FROM assets WHERE session_id = ? ORDER BY id DESC",
(session_id,),
).fetchall()
return jsonify(
{
"session": dict(session),
"steps": [dict(s) for s in steps],
"assets": [dict(a) for a in assets],
}
)
@app.route("/api/assets")
def list_assets():
db = get_db()
rows = db.execute(
"SELECT id, session_id, asset_type, title, content, created_at FROM assets ORDER BY id DESC LIMIT 50"
).fetchall()
assets = [dict(r) for r in rows]
return jsonify({"assets": assets})
@app.route("/api/upload", methods=["POST"])
def upload_file():
file = request.files.get("file")
if not file:
return jsonify({"error": "未收到上传文件"}), 400
filename = secure_filename(file.filename or "unnamed.bin")
if not filename:
filename = "unnamed.bin"
uploads_dir = os.path.join(INSTANCE_DIR, "uploads")
os.makedirs(uploads_dir, exist_ok=True)
path = os.path.join(uploads_dir, filename)
file.save(path)
size_bytes = os.path.getsize(path)
size_kb = round(size_bytes / 1024, 2)
db = get_db()
db.execute(
"""
INSERT INTO assets (session_id, asset_type, title, content)
VALUES (?, ?, ?, ?)
""",
(
1,
"upload",
f"上传文件:{filename}",
f"大小:{size_kb} KB;存储于内部 uploads 目录,用于演示 Hugging Face 大文件处理策略。",
),
)
db.commit()
return jsonify({"filename": filename, "size_kb": size_kb})
@app.route("/api/run", methods=["POST"])
def run_workflow():
try:
data = request.get_json(force=True)
except Exception:
return jsonify({"error": "请求体必须为 JSON"}), 400
scenario = (data or {}).get("scenario", "").strip()
target = (data or {}).get("target", "").strip()
constraints = (data or {}).get("constraints", "").strip()
name = (data or {}).get("name", "").strip() or "冷链运营会话"
if not scenario:
return jsonify({"error": "请提供冷链场景描述"}), 400
db = get_db()
cursor = db.execute(
"INSERT INTO sessions (name, scenario, target, constraints) VALUES (?, ?, ?, ?)",
(name, scenario, target, constraints),
)
session_id = cursor.lastrowid
steps, assets = run_coldchain_workflow(scenario, target, constraints)
for s in steps:
db.execute(
"""
INSERT INTO steps (session_id, step_order, role, step_type, content)
VALUES (?, ?, ?, ?, ?)
""",
(session_id, s["step_order"], s["role"], s["step_type"], s["content"]),
)
for a in assets:
db.execute(
"""
INSERT INTO assets (session_id, asset_type, title, content)
VALUES (?, ?, ?, ?)
""",
(session_id, a["asset_type"], a["title"], a["content"]),
)
db.commit()
return jsonify(
{
"session_id": session_id,
"steps": steps,
"assets": assets,
}
)
if __name__ == "__main__":
port = int(os.getenv("PORT", "7865"))
app.run(host="0.0.0.0", port=port, debug=False)