Trae Assistant
init factory flow agent
be7a7ac
import os
import sqlite3
import json
import datetime
import random
import requests
from flask import Flask, render_template, request, jsonify, g
app = Flask(__name__, instance_relative_config=True)
app.config["SECRET_KEY"] = "factory-flow-secret-key"
app.config["JSON_AS_ASCII"] = False
try:
os.makedirs(app.instance_path, exist_ok=True)
except OSError:
pass
DB_PATH = os.path.join(app.instance_path, "factory_flow.db")
SILICON_FLOW_API_URL = "https://api.siliconflow.cn/v1/chat/completions"
DEFAULT_MODEL = "Qwen/Qwen2.5-7B-Instruct"
DEFAULT_TEMPLATES = [
{
"title": "电池产线良率提升闭环",
"scenario": "动力电池产线希望在 2 个月内提升良品率 1.5%,同时降低涂布段停机率。请给出闭环方案与验收指标。",
"hint": "优先解决涂布段设备老化与质量波动问题。",
},
{
"title": "仓配协同排产闭环",
"scenario": "电子制造工厂需要将排产与仓配补料联动,降低缺料率并提高交付准时率。",
"hint": "关注安全库存阈值与跨班次协同。",
},
{
"title": "冷链质量追溯闭环",
"scenario": "冷链食品工厂的温控波动导致返工率上升,需要建立温控预警与批次追溯体系。",
"hint": "明确温控指标和追溯链路。",
},
{
"title": "设备预防性维护闭环",
"scenario": "设备故障频发导致产能波动,要求建立预防性维护节奏与停机风险预测。",
"hint": "补齐设备状态监测与维护日志闭环。",
},
]
def get_db():
db = getattr(g, "_database", None)
if db is None:
db = g._database = sqlite3.connect(DB_PATH)
db.row_factory = sqlite3.Row
return db
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, "_database", None)
if db is not None:
db.close()
def init_db():
db = get_db()
db.execute(
"""
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
scenario TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
db.execute(
"""
CREATE TABLE IF NOT EXISTS steps (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL,
step_order INTEGER NOT NULL,
role TEXT NOT NULL,
step_type TEXT NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE
)
"""
)
db.execute(
"""
CREATE TABLE IF NOT EXISTS assets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
category TEXT NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
db.execute(
"""
CREATE TABLE IF NOT EXISTS chats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
db.commit()
seed_default_data(db)
def seed_default_data(db):
row = db.execute("SELECT COUNT(*) AS c FROM sessions").fetchone()
if row and row["c"]:
return
demos = [
{
"name": "示例会话 · 新能源电池产线降本增效",
"scenario": (
"一家动力电池工厂计划在 3 个月内将良品率提升 2%,并降低单位能耗 8%。"
"当前产线瓶颈集中在涂布与化成段,设备老化导致停机频繁。"
"要求输出:可执行的改造路径、数据采集方案、质量验证与验收指标。"
),
"hint": "优先解决涂布段产能与能耗问题,同时建立快速验收闭环。",
},
{
"name": "示例会话 · 智能仓配协同工厂排产",
"scenario": (
"一家电子制造企业需要将产线排产与仓配补料联动。"
"目标是让关键物料缺料率下降 50%,并保证交付准时率 > 95%。"
"请给出排产策略、补料触发规则、异常处理与复盘机制。"
),
"hint": "关注关键物料的安全库存阈值与跨班次协同。",
},
{
"name": "示例会话 · 食品工厂冷链质量闭环",
"scenario": (
"一家冷链食品工厂需要降低返工率并提升出货稳定性。"
"当前问题集中在冷链温控波动、包装漏检与批次追溯不完整。"
"请给出数据采集与批次追溯闭环方案,并输出验收指标。"
),
"hint": "优先建立温控波动预警与批次追溯链路。",
},
]
for item in demos:
cursor = db.execute(
"INSERT INTO sessions (name, scenario) VALUES (?, ?)",
(item["name"], item["scenario"]),
)
session_id = cursor.lastrowid
result = run_factory_workflow(item["scenario"], extra_hint=item["hint"])
for s in result["steps"]:
db.execute(
"""
INSERT INTO steps (session_id, step_order, role, step_type, content)
VALUES (?, ?, ?, ?, ?)
""",
(session_id, s["step_order"], s["role"], s["step_type"], s["content"]),
)
db.execute(
"""
INSERT INTO assets (title, category, content)
VALUES (?, ?, ?)
""",
(
"基准产线画像模板",
"流程资产",
"用于沉淀关键工序、设备台账与质量指标的基础模板,可直接复用为后续方案的输入。",
),
)
db.execute(
"""
INSERT INTO assets (title, category, content)
VALUES (?, ?, ?)
""",
(
"关键工序异常响应卡",
"运营资产",
"当检测到异常停机、良率波动或温控偏移时,按照分级响应流程快速锁定问题并记录复盘。",
),
)
db.execute(
"INSERT INTO chats (role, content) VALUES (?, ?)",
("assistant", "你好,我可以帮你评估产线瓶颈、成本与交付风险。"),
)
db.commit()
def call_llm(messages, model=DEFAULT_MODEL, max_tokens=1200):
api_key = os.getenv("SILICONFLOW_API_KEY", "").strip()
def mock_completion():
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
last_user = ""
for m in reversed(messages):
if m.get("role") == "user":
last_user = m.get("content", "")[:300]
break
return (
f"## 本地模拟智能体回复\n\n"
f"- 时间:{now}\n"
f"- 说明:当前未连接硅基流 API,系统使用本地规则进行推演。\n\n"
f"### 输入摘要\n\n"
f"{last_user}\n\n"
f"### 建议\n\n"
f"- 先建立数据采集与异常预警,再扩展到全链路优化。"
)
if not api_key or not api_key.startswith("sk-"):
return mock_completion()
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"stream": False,
"max_tokens": max_tokens,
}
try:
resp = requests.post(SILICON_FLOW_API_URL, headers=headers, json=payload, timeout=20)
if resp.status_code == 200:
data = resp.json()
return data["choices"][0]["message"]["content"]
return mock_completion()
except Exception:
return mock_completion()
def simulate_factory_tools(scenario_text):
length = len(scenario_text)
demand_index = min(96, max(55, 55 + length % 42))
energy_cost_index = round(0.62 + (length % 36) * 0.01, 2)
utilization = min(93, 66 + (length % 24))
defect_risk = min(88, 35 + (length % 52))
bottlenecks = ["涂布段", "化成段", "贴片段", "包装段", "检测段", "仓配补料"]
weighted = []
for item in bottlenecks:
weight = 1
if item in scenario_text:
weight += 2
weighted.extend([item] * weight)
bottleneck = random.choice(weighted)
keywords = ["冷链", "良品率", "缺料", "停机", "能耗", "交付", "追溯", "排产"]
risk_score = 0
for k in keywords:
if k in scenario_text:
risk_score += 7
risk_score = min(92, risk_score + length % 12)
sla_score = min(98, 72 + (100 - risk_score) // 4)
oee = min(90, max(58, 60 + (length % 28)))
throughput_gain = min(22, 6 + (length % 16))
cost_reduction = min(20, 5 + (length % 14))
energy_saving = min(18, 4 + (length % 12))
inventory_turnover = min(14, 6 + (length % 9))
return {
"demand_index": demand_index,
"energy_cost_index": energy_cost_index,
"capacity_utilization": utilization,
"defect_risk": defect_risk,
"bottleneck": bottleneck,
"risk_score": risk_score,
"sla_score": sla_score,
"oee": oee,
"throughput_gain": throughput_gain,
"cost_reduction": cost_reduction,
"energy_saving": energy_saving,
"inventory_turnover": inventory_turnover,
}
def build_state_memory(scenario_text, tool_data):
return (
"### 产线状态记忆快照\n\n"
f"- 当前瓶颈:{tool_data['bottleneck']}\n"
f"- 产能利用率:{tool_data['capacity_utilization']}%\n"
f"- 缺陷风险指数:{tool_data['defect_risk']}\n"
f"- OEE 综合效率:{tool_data['oee']}%\n"
f"- 能耗成本指数:{tool_data['energy_cost_index']}\n\n"
"### 可复用资产\n\n"
"- 产线瓶颈诊断模板\n"
"- 工序 KPI 看板结构\n"
"- 质量验收与复盘清单"
)
def build_acceptance(tool_data):
score = round(
(tool_data["sla_score"] * 0.5)
+ (100 - tool_data["defect_risk"]) * 0.3
+ (100 - tool_data["risk_score"]) * 0.2,
1,
)
verdict = "通过" if score >= 75 else "需整改"
return {
"acceptance_score": score,
"verdict": verdict,
"key_checks": [
"瓶颈段产能提升是否达到目标",
"能耗与成本下降是否有数据闭环",
"质量缺陷率是否稳定下降",
"异常处理与复盘是否形成闭环机制",
],
}
def derive_risk_level(score):
if score <= 35:
return "低"
if score <= 70:
return "中"
return "高"
def build_insight(tool_data):
risk_level = derive_risk_level(tool_data["risk_score"])
gaps = []
if tool_data["defect_risk"] >= 60:
gaps.append("质量波动偏高")
if tool_data["capacity_utilization"] < 75:
gaps.append("产能利用率偏低")
if tool_data["energy_cost_index"] > 0.75:
gaps.append("能耗成本偏高")
if not gaps:
gaps.append("暂无明显短板")
return {
"risk_level": risk_level,
"gaps": gaps,
"roi_hint": f"预计成本下降 {tool_data['cost_reduction']}%,产能提升 {tool_data['throughput_gain']}%。",
}
def build_action_plan(tool_data):
return (
"### 执行路线图\n\n"
"#### 第 1 周:数据基线建立\n"
"- 梳理瓶颈工序与关键设备状态\n"
"- 建立良率、能耗、停机与交付基线\n\n"
"#### 第 2-4 周:瓶颈突破与协同排产\n"
f"- 重点优化工序:{tool_data['bottleneck']}\n"
"- 建立补料触发规则与异常响应流程\n"
"- 引入批次追溯与质量预警\n\n"
"#### 第 5-8 周:质量闭环与节能优化\n"
"- 固化标准作业与质量验收清单\n"
"- 建立节能计划与设备维护节奏\n\n"
"#### 第 9-12 周:规模化复制与复盘\n"
"- 复制闭环方案到相似产线\n"
"- 形成复盘报告与资产沉淀"
)
def run_factory_workflow(scenario_text, extra_hint=None):
system_prompt = (
"你是产线闭环运营智能体,负责从推理决策到工具行动、验收与复盘。"
"输出内容必须是可执行的中文 Markdown。"
)
hint = extra_hint or "请给出可落地的改造步骤与量化指标。"
reasoning = call_llm(
[
{"role": "system", "content": system_prompt},
{
"role": "user",
"content": f"场景:{scenario_text}\n要求:{hint}\n请输出决策推理与行动路径。",
},
]
)
tool_data = simulate_factory_tools(scenario_text)
tool_payload = json.dumps(tool_data, ensure_ascii=False, indent=2)
state_memory = build_state_memory(scenario_text, tool_data)
acceptance = build_acceptance(tool_data)
action_plan = build_action_plan(tool_data)
acceptance_md = (
"### 验收与校验\n\n"
f"- 综合得分:{acceptance['acceptance_score']}\n"
f"- 验收结论:{acceptance['verdict']}\n\n"
"#### 关键校验点\n\n"
+ "\n".join([f"- {c}" for c in acceptance["key_checks"]])
)
iteration = call_llm(
[
{"role": "system", "content": system_prompt},
{
"role": "user",
"content": (
f"基于以下产线工具结果与验收结论,输出迭代与复盘计划:\n\n"
f"工具结果:{tool_payload}\n\n"
f"验收结论:{acceptance['verdict']}\n"
),
},
]
)
insight = build_insight(tool_data)
return {
"steps": [
{
"step_order": 1,
"role": "决策官",
"step_type": "reasoning",
"content": reasoning,
},
{
"step_order": 2,
"role": "计划官",
"step_type": "plan",
"content": action_plan,
},
{
"step_order": 3,
"role": "工具引擎",
"step_type": "tool_action",
"content": tool_payload,
},
{
"step_order": 4,
"role": "状态记忆",
"step_type": "state_memory",
"content": state_memory,
},
{
"step_order": 5,
"role": "验收官",
"step_type": "verification",
"content": acceptance_md,
},
{
"step_order": 6,
"role": "复盘官",
"step_type": "iteration",
"content": iteration,
},
],
"tool_data": tool_data,
"acceptance": acceptance,
"insight": insight,
"state_memory": state_memory,
}
@app.route("/")
def index():
return render_template("index.html")
@app.route("/api/sessions", methods=["GET"])
def list_sessions():
db = get_db()
rows = db.execute(
"SELECT id, name, scenario, created_at FROM sessions ORDER BY id DESC LIMIT 50"
).fetchall()
sessions = [
{
"id": r["id"],
"name": r["name"],
"scenario": r["scenario"],
"created_at": r["created_at"],
}
for r in rows
]
return jsonify({"sessions": sessions})
@app.route("/api/templates", methods=["GET"])
def list_templates():
return jsonify({"templates": DEFAULT_TEMPLATES})
@app.route("/api/overview", methods=["GET"])
def overview():
db = get_db()
session_count = db.execute("SELECT COUNT(*) AS c FROM sessions").fetchone()["c"]
asset_count = db.execute("SELECT COUNT(*) AS c FROM assets").fetchone()["c"]
chat_count = db.execute("SELECT COUNT(*) AS c FROM chats").fetchone()["c"]
latest_tool = db.execute(
"SELECT content FROM steps WHERE step_type = 'tool_action' ORDER BY id DESC LIMIT 1"
).fetchone()
tool_data = {}
if latest_tool:
try:
tool_data = json.loads(latest_tool["content"])
except Exception:
tool_data = {}
return jsonify(
{
"session_count": session_count,
"asset_count": asset_count,
"chat_count": chat_count,
"latest_tool": tool_data,
}
)
@app.route("/api/sessions/<int:session_id>", methods=["GET"])
def get_session_detail(session_id):
db = get_db()
steps = db.execute(
"""
SELECT step_order, role, step_type, content, created_at
FROM steps WHERE session_id = ?
ORDER BY step_order ASC
""",
(session_id,),
).fetchall()
step_list = [
{
"step_order": s["step_order"],
"role": s["role"],
"step_type": s["step_type"],
"content": s["content"],
"created_at": s["created_at"],
}
for s in steps
]
return jsonify({"steps": step_list})
@app.route("/api/run", methods=["POST"])
def run_workflow():
try:
data = request.get_json(force=True)
except Exception:
return jsonify({"error": "请求体必须为 JSON"}), 400
scenario = (data or {}).get("scenario", "").strip()
name = (data or {}).get("name", "").strip() or "产线闭环会话"
hint = (data or {}).get("hint", "").strip()
if not scenario:
return jsonify({"error": "请提供场景描述"}), 400
db = get_db()
cursor = db.execute(
"INSERT INTO sessions (name, scenario) VALUES (?, ?)", (name, scenario)
)
session_id = cursor.lastrowid
result = run_factory_workflow(scenario, extra_hint=hint)
for s in result["steps"]:
db.execute(
"""
INSERT INTO steps (session_id, step_order, role, step_type, content)
VALUES (?, ?, ?, ?, ?)
""",
(session_id, s["step_order"], s["role"], s["step_type"], s["content"]),
)
if result["acceptance"]["verdict"] == "通过":
db.execute(
"INSERT INTO assets (title, category, content) VALUES (?, ?, ?)",
(
f"{name} · 闭环方案摘要",
"闭环资产",
result["state_memory"],
),
)
db.commit()
return jsonify(
{
"session_id": session_id,
"tool_data": result["tool_data"],
"acceptance": result["acceptance"],
"insight": result["insight"],
}
), 201
@app.route("/api/assets", methods=["GET"])
def list_assets():
db = get_db()
rows = db.execute(
"SELECT id, title, category, content, created_at FROM assets ORDER BY id DESC"
).fetchall()
assets = [
{
"id": r["id"],
"title": r["title"],
"category": r["category"],
"content": r["content"],
"created_at": r["created_at"],
}
for r in rows
]
return jsonify({"assets": assets})
@app.route("/api/assets", methods=["POST"])
def create_asset():
data = request.get_json(force=True)
title = (data.get("title") or "").strip()
category = (data.get("category") or "").strip()
content = (data.get("content") or "").strip()
if not title or not category or not content:
return jsonify({"error": "标题、分类与内容不能为空"}), 400
db = get_db()
db.execute(
"INSERT INTO assets (title, category, content) VALUES (?, ?, ?)",
(title, category, content),
)
db.commit()
return jsonify({"status": "ok"})
@app.route("/api/chats", methods=["GET"])
def list_chats():
db = get_db()
rows = db.execute(
"SELECT role, content, created_at FROM chats ORDER BY id DESC LIMIT 50"
).fetchall()
chats = [
{"role": r["role"], "content": r["content"], "created_at": r["created_at"]}
for r in rows[::-1]
]
return jsonify({"messages": chats})
@app.route("/api/chat", methods=["POST"])
def chat():
data = request.get_json(force=True)
message = (data.get("message") or "").strip()
if not message:
return jsonify({"error": "消息不能为空"}), 400
system_prompt = (
"你是制造业产线运营顾问,擅长在成本、质量、交付与风险之间做平衡决策。"
"你的回复必须是中文 Markdown。"
)
reply = call_llm(
[
{"role": "system", "content": system_prompt},
{"role": "user", "content": message},
]
)
db = get_db()
db.execute(
"INSERT INTO chats (role, content) VALUES (?, ?)", ("user", message)
)
db.execute(
"INSERT INTO chats (role, content) VALUES (?, ?)", ("assistant", reply)
)
db.commit()
return jsonify({"reply": reply})
with app.app_context():
init_db()
if __name__ == "__main__":
port = int(os.getenv("PORT", "7868"))
app.run(host="0.0.0.0", port=port, debug=False)