import os import sqlite3 import json import datetime import random import requests from flask import Flask, render_template, request, jsonify, g app = Flask(__name__, instance_relative_config=True) app.config["SECRET_KEY"] = "factory-flow-secret-key" app.config["JSON_AS_ASCII"] = False try: os.makedirs(app.instance_path, exist_ok=True) except OSError: pass DB_PATH = os.path.join(app.instance_path, "factory_flow.db") SILICON_FLOW_API_URL = "https://api.siliconflow.cn/v1/chat/completions" DEFAULT_MODEL = "Qwen/Qwen2.5-7B-Instruct" DEFAULT_TEMPLATES = [ { "title": "电池产线良率提升闭环", "scenario": "动力电池产线希望在 2 个月内提升良品率 1.5%,同时降低涂布段停机率。请给出闭环方案与验收指标。", "hint": "优先解决涂布段设备老化与质量波动问题。", }, { "title": "仓配协同排产闭环", "scenario": "电子制造工厂需要将排产与仓配补料联动,降低缺料率并提高交付准时率。", "hint": "关注安全库存阈值与跨班次协同。", }, { "title": "冷链质量追溯闭环", "scenario": "冷链食品工厂的温控波动导致返工率上升,需要建立温控预警与批次追溯体系。", "hint": "明确温控指标和追溯链路。", }, { "title": "设备预防性维护闭环", "scenario": "设备故障频发导致产能波动,要求建立预防性维护节奏与停机风险预测。", "hint": "补齐设备状态监测与维护日志闭环。", }, ] def get_db(): db = getattr(g, "_database", None) if db is None: db = g._database = sqlite3.connect(DB_PATH) db.row_factory = sqlite3.Row return db @app.teardown_appcontext def close_connection(exception): db = getattr(g, "_database", None) if db is not None: db.close() def init_db(): db = get_db() db.execute( """ CREATE TABLE IF NOT EXISTS sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, scenario TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ ) db.execute( """ CREATE TABLE IF NOT EXISTS steps ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id INTEGER NOT NULL, step_order INTEGER NOT NULL, role TEXT NOT NULL, step_type TEXT NOT NULL, content TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE ) """ ) db.execute( """ CREATE TABLE IF NOT EXISTS assets ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, category TEXT NOT NULL, content TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ ) db.execute( """ CREATE TABLE IF NOT EXISTS chats ( id INTEGER PRIMARY KEY AUTOINCREMENT, role TEXT NOT NULL, content TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ ) db.commit() seed_default_data(db) def seed_default_data(db): row = db.execute("SELECT COUNT(*) AS c FROM sessions").fetchone() if row and row["c"]: return demos = [ { "name": "示例会话 · 新能源电池产线降本增效", "scenario": ( "一家动力电池工厂计划在 3 个月内将良品率提升 2%,并降低单位能耗 8%。" "当前产线瓶颈集中在涂布与化成段,设备老化导致停机频繁。" "要求输出:可执行的改造路径、数据采集方案、质量验证与验收指标。" ), "hint": "优先解决涂布段产能与能耗问题,同时建立快速验收闭环。", }, { "name": "示例会话 · 智能仓配协同工厂排产", "scenario": ( "一家电子制造企业需要将产线排产与仓配补料联动。" "目标是让关键物料缺料率下降 50%,并保证交付准时率 > 95%。" "请给出排产策略、补料触发规则、异常处理与复盘机制。" ), "hint": "关注关键物料的安全库存阈值与跨班次协同。", }, { "name": "示例会话 · 食品工厂冷链质量闭环", "scenario": ( "一家冷链食品工厂需要降低返工率并提升出货稳定性。" "当前问题集中在冷链温控波动、包装漏检与批次追溯不完整。" "请给出数据采集与批次追溯闭环方案,并输出验收指标。" ), "hint": "优先建立温控波动预警与批次追溯链路。", }, ] for item in demos: cursor = db.execute( "INSERT INTO sessions (name, scenario) VALUES (?, ?)", (item["name"], item["scenario"]), ) session_id = cursor.lastrowid result = run_factory_workflow(item["scenario"], extra_hint=item["hint"]) for s in result["steps"]: db.execute( """ INSERT INTO steps (session_id, step_order, role, step_type, content) VALUES (?, ?, ?, ?, ?) """, (session_id, s["step_order"], s["role"], s["step_type"], s["content"]), ) db.execute( """ INSERT INTO assets (title, category, content) VALUES (?, ?, ?) """, ( "基准产线画像模板", "流程资产", "用于沉淀关键工序、设备台账与质量指标的基础模板,可直接复用为后续方案的输入。", ), ) db.execute( """ INSERT INTO assets (title, category, content) VALUES (?, ?, ?) """, ( "关键工序异常响应卡", "运营资产", "当检测到异常停机、良率波动或温控偏移时,按照分级响应流程快速锁定问题并记录复盘。", ), ) db.execute( "INSERT INTO chats (role, content) VALUES (?, ?)", ("assistant", "你好,我可以帮你评估产线瓶颈、成本与交付风险。"), ) db.commit() def call_llm(messages, model=DEFAULT_MODEL, max_tokens=1200): api_key = os.getenv("SILICONFLOW_API_KEY", "").strip() def mock_completion(): now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") last_user = "" for m in reversed(messages): if m.get("role") == "user": last_user = m.get("content", "")[:300] break return ( f"## 本地模拟智能体回复\n\n" f"- 时间:{now}\n" f"- 说明:当前未连接硅基流 API,系统使用本地规则进行推演。\n\n" f"### 输入摘要\n\n" f"{last_user}\n\n" f"### 建议\n\n" f"- 先建立数据采集与异常预警,再扩展到全链路优化。" ) if not api_key or not api_key.startswith("sk-"): return mock_completion() headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } payload = { "model": model, "messages": messages, "stream": False, "max_tokens": max_tokens, } try: resp = requests.post(SILICON_FLOW_API_URL, headers=headers, json=payload, timeout=20) if resp.status_code == 200: data = resp.json() return data["choices"][0]["message"]["content"] return mock_completion() except Exception: return mock_completion() def simulate_factory_tools(scenario_text): length = len(scenario_text) demand_index = min(96, max(55, 55 + length % 42)) energy_cost_index = round(0.62 + (length % 36) * 0.01, 2) utilization = min(93, 66 + (length % 24)) defect_risk = min(88, 35 + (length % 52)) bottlenecks = ["涂布段", "化成段", "贴片段", "包装段", "检测段", "仓配补料"] weighted = [] for item in bottlenecks: weight = 1 if item in scenario_text: weight += 2 weighted.extend([item] * weight) bottleneck = random.choice(weighted) keywords = ["冷链", "良品率", "缺料", "停机", "能耗", "交付", "追溯", "排产"] risk_score = 0 for k in keywords: if k in scenario_text: risk_score += 7 risk_score = min(92, risk_score + length % 12) sla_score = min(98, 72 + (100 - risk_score) // 4) oee = min(90, max(58, 60 + (length % 28))) throughput_gain = min(22, 6 + (length % 16)) cost_reduction = min(20, 5 + (length % 14)) energy_saving = min(18, 4 + (length % 12)) inventory_turnover = min(14, 6 + (length % 9)) return { "demand_index": demand_index, "energy_cost_index": energy_cost_index, "capacity_utilization": utilization, "defect_risk": defect_risk, "bottleneck": bottleneck, "risk_score": risk_score, "sla_score": sla_score, "oee": oee, "throughput_gain": throughput_gain, "cost_reduction": cost_reduction, "energy_saving": energy_saving, "inventory_turnover": inventory_turnover, } def build_state_memory(scenario_text, tool_data): return ( "### 产线状态记忆快照\n\n" f"- 当前瓶颈:{tool_data['bottleneck']}\n" f"- 产能利用率:{tool_data['capacity_utilization']}%\n" f"- 缺陷风险指数:{tool_data['defect_risk']}\n" f"- OEE 综合效率:{tool_data['oee']}%\n" f"- 能耗成本指数:{tool_data['energy_cost_index']}\n\n" "### 可复用资产\n\n" "- 产线瓶颈诊断模板\n" "- 工序 KPI 看板结构\n" "- 质量验收与复盘清单" ) def build_acceptance(tool_data): score = round( (tool_data["sla_score"] * 0.5) + (100 - tool_data["defect_risk"]) * 0.3 + (100 - tool_data["risk_score"]) * 0.2, 1, ) verdict = "通过" if score >= 75 else "需整改" return { "acceptance_score": score, "verdict": verdict, "key_checks": [ "瓶颈段产能提升是否达到目标", "能耗与成本下降是否有数据闭环", "质量缺陷率是否稳定下降", "异常处理与复盘是否形成闭环机制", ], } def derive_risk_level(score): if score <= 35: return "低" if score <= 70: return "中" return "高" def build_insight(tool_data): risk_level = derive_risk_level(tool_data["risk_score"]) gaps = [] if tool_data["defect_risk"] >= 60: gaps.append("质量波动偏高") if tool_data["capacity_utilization"] < 75: gaps.append("产能利用率偏低") if tool_data["energy_cost_index"] > 0.75: gaps.append("能耗成本偏高") if not gaps: gaps.append("暂无明显短板") return { "risk_level": risk_level, "gaps": gaps, "roi_hint": f"预计成本下降 {tool_data['cost_reduction']}%,产能提升 {tool_data['throughput_gain']}%。", } def build_action_plan(tool_data): return ( "### 执行路线图\n\n" "#### 第 1 周:数据基线建立\n" "- 梳理瓶颈工序与关键设备状态\n" "- 建立良率、能耗、停机与交付基线\n\n" "#### 第 2-4 周:瓶颈突破与协同排产\n" f"- 重点优化工序:{tool_data['bottleneck']}\n" "- 建立补料触发规则与异常响应流程\n" "- 引入批次追溯与质量预警\n\n" "#### 第 5-8 周:质量闭环与节能优化\n" "- 固化标准作业与质量验收清单\n" "- 建立节能计划与设备维护节奏\n\n" "#### 第 9-12 周:规模化复制与复盘\n" "- 复制闭环方案到相似产线\n" "- 形成复盘报告与资产沉淀" ) def run_factory_workflow(scenario_text, extra_hint=None): system_prompt = ( "你是产线闭环运营智能体,负责从推理决策到工具行动、验收与复盘。" "输出内容必须是可执行的中文 Markdown。" ) hint = extra_hint or "请给出可落地的改造步骤与量化指标。" reasoning = call_llm( [ {"role": "system", "content": system_prompt}, { "role": "user", "content": f"场景:{scenario_text}\n要求:{hint}\n请输出决策推理与行动路径。", }, ] ) tool_data = simulate_factory_tools(scenario_text) tool_payload = json.dumps(tool_data, ensure_ascii=False, indent=2) state_memory = build_state_memory(scenario_text, tool_data) acceptance = build_acceptance(tool_data) action_plan = build_action_plan(tool_data) acceptance_md = ( "### 验收与校验\n\n" f"- 综合得分:{acceptance['acceptance_score']}\n" f"- 验收结论:{acceptance['verdict']}\n\n" "#### 关键校验点\n\n" + "\n".join([f"- {c}" for c in acceptance["key_checks"]]) ) iteration = call_llm( [ {"role": "system", "content": system_prompt}, { "role": "user", "content": ( f"基于以下产线工具结果与验收结论,输出迭代与复盘计划:\n\n" f"工具结果:{tool_payload}\n\n" f"验收结论:{acceptance['verdict']}\n" ), }, ] ) insight = build_insight(tool_data) return { "steps": [ { "step_order": 1, "role": "决策官", "step_type": "reasoning", "content": reasoning, }, { "step_order": 2, "role": "计划官", "step_type": "plan", "content": action_plan, }, { "step_order": 3, "role": "工具引擎", "step_type": "tool_action", "content": tool_payload, }, { "step_order": 4, "role": "状态记忆", "step_type": "state_memory", "content": state_memory, }, { "step_order": 5, "role": "验收官", "step_type": "verification", "content": acceptance_md, }, { "step_order": 6, "role": "复盘官", "step_type": "iteration", "content": iteration, }, ], "tool_data": tool_data, "acceptance": acceptance, "insight": insight, "state_memory": state_memory, } @app.route("/") def index(): return render_template("index.html") @app.route("/api/sessions", methods=["GET"]) def list_sessions(): db = get_db() rows = db.execute( "SELECT id, name, scenario, created_at FROM sessions ORDER BY id DESC LIMIT 50" ).fetchall() sessions = [ { "id": r["id"], "name": r["name"], "scenario": r["scenario"], "created_at": r["created_at"], } for r in rows ] return jsonify({"sessions": sessions}) @app.route("/api/templates", methods=["GET"]) def list_templates(): return jsonify({"templates": DEFAULT_TEMPLATES}) @app.route("/api/overview", methods=["GET"]) def overview(): db = get_db() session_count = db.execute("SELECT COUNT(*) AS c FROM sessions").fetchone()["c"] asset_count = db.execute("SELECT COUNT(*) AS c FROM assets").fetchone()["c"] chat_count = db.execute("SELECT COUNT(*) AS c FROM chats").fetchone()["c"] latest_tool = db.execute( "SELECT content FROM steps WHERE step_type = 'tool_action' ORDER BY id DESC LIMIT 1" ).fetchone() tool_data = {} if latest_tool: try: tool_data = json.loads(latest_tool["content"]) except Exception: tool_data = {} return jsonify( { "session_count": session_count, "asset_count": asset_count, "chat_count": chat_count, "latest_tool": tool_data, } ) @app.route("/api/sessions/", methods=["GET"]) def get_session_detail(session_id): db = get_db() steps = db.execute( """ SELECT step_order, role, step_type, content, created_at FROM steps WHERE session_id = ? ORDER BY step_order ASC """, (session_id,), ).fetchall() step_list = [ { "step_order": s["step_order"], "role": s["role"], "step_type": s["step_type"], "content": s["content"], "created_at": s["created_at"], } for s in steps ] return jsonify({"steps": step_list}) @app.route("/api/run", methods=["POST"]) def run_workflow(): try: data = request.get_json(force=True) except Exception: return jsonify({"error": "请求体必须为 JSON"}), 400 scenario = (data or {}).get("scenario", "").strip() name = (data or {}).get("name", "").strip() or "产线闭环会话" hint = (data or {}).get("hint", "").strip() if not scenario: return jsonify({"error": "请提供场景描述"}), 400 db = get_db() cursor = db.execute( "INSERT INTO sessions (name, scenario) VALUES (?, ?)", (name, scenario) ) session_id = cursor.lastrowid result = run_factory_workflow(scenario, extra_hint=hint) for s in result["steps"]: db.execute( """ INSERT INTO steps (session_id, step_order, role, step_type, content) VALUES (?, ?, ?, ?, ?) """, (session_id, s["step_order"], s["role"], s["step_type"], s["content"]), ) if result["acceptance"]["verdict"] == "通过": db.execute( "INSERT INTO assets (title, category, content) VALUES (?, ?, ?)", ( f"{name} · 闭环方案摘要", "闭环资产", result["state_memory"], ), ) db.commit() return jsonify( { "session_id": session_id, "tool_data": result["tool_data"], "acceptance": result["acceptance"], "insight": result["insight"], } ), 201 @app.route("/api/assets", methods=["GET"]) def list_assets(): db = get_db() rows = db.execute( "SELECT id, title, category, content, created_at FROM assets ORDER BY id DESC" ).fetchall() assets = [ { "id": r["id"], "title": r["title"], "category": r["category"], "content": r["content"], "created_at": r["created_at"], } for r in rows ] return jsonify({"assets": assets}) @app.route("/api/assets", methods=["POST"]) def create_asset(): data = request.get_json(force=True) title = (data.get("title") or "").strip() category = (data.get("category") or "").strip() content = (data.get("content") or "").strip() if not title or not category or not content: return jsonify({"error": "标题、分类与内容不能为空"}), 400 db = get_db() db.execute( "INSERT INTO assets (title, category, content) VALUES (?, ?, ?)", (title, category, content), ) db.commit() return jsonify({"status": "ok"}) @app.route("/api/chats", methods=["GET"]) def list_chats(): db = get_db() rows = db.execute( "SELECT role, content, created_at FROM chats ORDER BY id DESC LIMIT 50" ).fetchall() chats = [ {"role": r["role"], "content": r["content"], "created_at": r["created_at"]} for r in rows[::-1] ] return jsonify({"messages": chats}) @app.route("/api/chat", methods=["POST"]) def chat(): data = request.get_json(force=True) message = (data.get("message") or "").strip() if not message: return jsonify({"error": "消息不能为空"}), 400 system_prompt = ( "你是制造业产线运营顾问,擅长在成本、质量、交付与风险之间做平衡决策。" "你的回复必须是中文 Markdown。" ) reply = call_llm( [ {"role": "system", "content": system_prompt}, {"role": "user", "content": message}, ] ) db = get_db() db.execute( "INSERT INTO chats (role, content) VALUES (?, ?)", ("user", message) ) db.execute( "INSERT INTO chats (role, content) VALUES (?, ?)", ("assistant", reply) ) db.commit() return jsonify({"reply": reply}) with app.app_context(): init_db() if __name__ == "__main__": port = int(os.getenv("PORT", "7868")) app.run(host="0.0.0.0", port=port, debug=False)