| import argparse |
| import ast |
| import json |
| import os |
| import re |
| import sys |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
|
|
| @dataclass |
| class ToolCall: |
| name: str |
| args: Dict[str, Any] |
|
|
|
|
| @dataclass |
| class ToolResult: |
| ok: bool |
| output: str |
| artifacts: List[str] |
|
|
|
|
| def _tokenize(s: str) -> List[str]: |
| s = s.lower() |
| s = re.sub(r"[^0-9a-z\u4e00-\u9fff]+", " ", s) |
| parts = [p.strip() for p in s.split() if p.strip()] |
| return parts |
|
|
|
|
| def tool_search_kb(query: str, kb_dir: str, top_k: int = 4) -> ToolResult: |
| base = Path(kb_dir) |
| if not base.exists(): |
| return ToolResult(ok=True, output="[]", artifacts=[]) |
|
|
| q_tokens = _tokenize(query) |
| if not q_tokens: |
| return ToolResult(ok=True, output="[]", artifacts=[]) |
|
|
| hits: List[Tuple[int, str, str]] = [] |
| for p in sorted(base.glob("**/*.md")): |
| try: |
| text = p.read_text(encoding="utf-8") |
| except Exception: |
| continue |
| chunks = [c.strip() for c in re.split(r"\n\s*\n+", text) if c.strip()] |
| for c in chunks: |
| c_tokens = _tokenize(c) |
| score = sum(1 for t in q_tokens if t in c_tokens) |
| if score <= 0: |
| continue |
| hits.append((score, str(p), c)) |
|
|
| hits.sort(key=lambda x: (-x[0], x[1])) |
| picked = hits[: max(0, int(top_k))] |
| out = [{"score": s, "source": src, "text": txt} for (s, src, txt) in picked] |
| return ToolResult(ok=True, output=json.dumps(out, ensure_ascii=False, indent=2), artifacts=[]) |
|
|
|
|
| class _SafeEval(ast.NodeVisitor): |
| def visit(self, node): |
| return super().visit(node) |
|
|
| def generic_visit(self, node): |
| raise ValueError(f"unsupported: {node.__class__.__name__}") |
|
|
| def visit_Expression(self, node: ast.Expression): |
| return self.visit(node.body) |
|
|
| def visit_BinOp(self, node: ast.BinOp): |
| left = self.visit(node.left) |
| right = self.visit(node.right) |
| if isinstance(node.op, ast.Add): |
| return left + right |
| if isinstance(node.op, ast.Sub): |
| return left - right |
| if isinstance(node.op, ast.Mult): |
| return left * right |
| if isinstance(node.op, ast.Div): |
| return left / right |
| if isinstance(node.op, ast.FloorDiv): |
| return left // right |
| if isinstance(node.op, ast.Mod): |
| return left % right |
| if isinstance(node.op, ast.Pow): |
| return left ** right |
| raise ValueError("unsupported operator") |
|
|
| def visit_UnaryOp(self, node: ast.UnaryOp): |
| v = self.visit(node.operand) |
| if isinstance(node.op, ast.UAdd): |
| return +v |
| if isinstance(node.op, ast.USub): |
| return -v |
| raise ValueError("unsupported unary operator") |
|
|
| def visit_Constant(self, node: ast.Constant): |
| if isinstance(node.value, (int, float)): |
| return node.value |
| raise ValueError("only numbers allowed") |
|
|
|
|
| def tool_calc(expression: str) -> ToolResult: |
| expr = expression.strip() |
| if not re.fullmatch(r"[0-9\.\s\+\-\*\/\(\)\%\^]+", expr): |
| return ToolResult(ok=False, output="表达式包含不允许的字符", artifacts=[]) |
| expr = expr.replace("^", "**") |
| try: |
| tree = ast.parse(expr, mode="eval") |
| value = _SafeEval().visit(tree) |
| except Exception as e: |
| return ToolResult(ok=False, output=f"计算失败: {e}", artifacts=[]) |
| return ToolResult(ok=True, output=str(value), artifacts=[]) |
|
|
|
|
| def tool_write_text(path: str, text: str) -> ToolResult: |
| p = Path(path) |
| p.parent.mkdir(parents=True, exist_ok=True) |
| p.write_text(text, encoding="utf-8") |
| return ToolResult(ok=True, output=f"wrote: {p}", artifacts=[str(p)]) |
|
|
|
|
| def tool_read_text(path: str) -> ToolResult: |
| p = Path(path) |
| if not p.exists(): |
| return ToolResult(ok=False, output="not found", artifacts=[]) |
| return ToolResult(ok=True, output=p.read_text(encoding="utf-8"), artifacts=[]) |
|
|
|
|
| def _extract_math_expr(goal: str) -> Optional[str]: |
| m = re.search(r"(\d[\d\s\+\-\*\/\(\)\%\^\.]*\d)", goal) |
| if not m: |
| return None |
| expr = m.group(1) |
| expr = re.sub(r"\s+", "", expr) |
| if len(expr) > 120: |
| return None |
| if not re.search(r"[\+\-\*\/\%\^]", expr): |
| return None |
| return expr |
|
|
|
|
| def _extract_faq_count(goal: str) -> Optional[int]: |
| m = re.search(r"(\d+)\s*条\s*FAQ", goal, flags=re.IGNORECASE) |
| if m: |
| return max(1, min(20, int(m.group(1)))) |
| m = re.search(r"(\d+)\s*条", goal) |
| if m and ("faq" in goal.lower() or "常见问题" in goal): |
| return max(1, min(20, int(m.group(1)))) |
| if "faq" in goal.lower() or "常见问题" in goal: |
| return 5 |
| return None |
|
|
|
|
| def _norm_space(s: str) -> str: |
| return re.sub(r"\s+", " ", (s or "").strip()) |
|
|
|
|
| def _lead_to_goal(lead: Dict[str, Any]) -> str: |
| company = _norm_space(str(lead.get("company") or "")) |
| contact = _norm_space(str(lead.get("contact") or "")) |
| role = _norm_space(str(lead.get("role") or "")) |
| channel = _norm_space(str(lead.get("channel") or "")) |
| product = _norm_space(str(lead.get("product") or "")) |
| stage = _norm_space(str(lead.get("stage") or "")) |
| pain = _norm_space(str(lead.get("pain_points") or "")) |
| budget = _norm_space(str(lead.get("budget") or "")) |
| timeline = _norm_space(str(lead.get("timeline") or "")) |
| notes = _norm_space(str(lead.get("notes") or "")) |
|
|
| parts: List[str] = [] |
| parts.append("生成一份销售线索跟进方案(含补问清单、下一步任务清单、跟进话术:微信/电话/邮件、风险提示),并写进报告。") |
| if company: |
| parts.append(f"公司/组织:{company}") |
| if contact or role: |
| who = contact if contact else "(未提供姓名)" |
| if role: |
| who = f"{who}({role})" |
| parts.append(f"联系人:{who}") |
| if channel: |
| parts.append(f"来源渠道:{channel}") |
| if product: |
| parts.append(f"关注产品:{product}") |
| if stage: |
| parts.append(f"当前阶段:{stage}") |
| if pain: |
| parts.append(f"主要诉求/痛点:{pain}") |
| if budget: |
| parts.append(f"预算:{budget}") |
| if timeline: |
| parts.append(f"期望时间:{timeline}") |
| if notes: |
| parts.append(f"备注:{notes}") |
| return "\n".join(parts) |
|
|
|
|
| def make_plan(goal: str, kb_dir: str, out_report: str) -> List[ToolCall]: |
| plan: List[ToolCall] = [] |
| plan.append(ToolCall(name="search_kb", args={"query": goal, "kb_dir": kb_dir, "top_k": 6})) |
| expr = _extract_math_expr(goal) |
| if expr: |
| plan.append(ToolCall(name="calc", args={"expression": expr})) |
| plan.append(ToolCall(name="write_report", args={"goal": goal, "out_report": out_report})) |
| return plan |
|
|
|
|
| def _format_faq_items(snippets: List[Dict[str, Any]], n: int) -> List[str]: |
| pool: List[str] = [] |
| for s in snippets: |
| t = str(s.get("text", "")).strip() |
| t = re.sub(r"\s+", " ", t) |
| if len(t) >= 12: |
| pool.append(t) |
| if not pool: |
| pool = [ |
| "我们提供从获客到转化的自动化流程编排能力,减少重复操作。", |
| "支持接入常见广告与社媒渠道,并统一沉淀线索与素材。", |
| "关键数据可追踪,可用规则触发通知与任务分配。", |
| "提供基础的权限与审计,方便多人协作与风控。", |
| "支持将常见问题与业务规范整理进知识库,供智能体检索。", |
| ] |
| items: List[str] = [] |
| i = 0 |
| while len(items) < n: |
| items.append(pool[i % len(pool)]) |
| i += 1 |
| return items |
|
|
|
|
| def _render_report(goal: str, snippets: List[Dict[str, Any]], faq_count: Optional[int], math_expr: Optional[str], math_result: Optional[str]) -> str: |
| lines: List[str] = [] |
| lines.append("# 智能体生成报告") |
| lines.append("") |
| lines.append("## 目标") |
| lines.append(goal.strip()) |
| lines.append("") |
| if snippets: |
| lines.append("## 知识库检索摘要") |
| for i, s in enumerate(snippets[:4], start=1): |
| src = os.path.basename(str(s.get("source", ""))) |
| txt = str(s.get("text", "")).strip().replace("\n", " ") |
| txt = re.sub(r"\s+", " ", txt) |
| lines.append(f"- 片段{i}({src}):{txt}") |
| lines.append("") |
| if faq_count: |
| lines.append(f"## FAQ({faq_count} 条)") |
| items = _format_faq_items(snippets, faq_count) |
| for it in items: |
| lines.append(f"- {it}") |
| lines.append("") |
| if math_expr: |
| lines.append("## 计算结果") |
| if math_result is None: |
| lines.append(f"- 表达式:{math_expr}") |
| lines.append("- 结果:计算失败") |
| else: |
| lines.append(f"- 表达式:{math_expr}") |
| lines.append(f"- 结果:{math_result}") |
| lines.append("") |
| lines.append("## 结论") |
| lines.append("已基于本地知识库完成检索,并生成可复核的报告工件。") |
| lines.append("") |
| return "\n".join(lines) |
|
|
|
|
| def _lead_sections(goal: str, snippets: List[Dict[str, Any]]) -> Dict[str, Any]: |
| lines = goal.splitlines() |
| facts: Dict[str, str] = {} |
| for ln in lines: |
| if ":" in ln: |
| k, v = ln.split(":", 1) |
| k = _norm_space(k) |
| v = _norm_space(v) |
| if k and v: |
| facts[k] = v |
|
|
| company = facts.get("公司/组织", "") |
| product = facts.get("关注产品", "") |
| stage = facts.get("当前阶段", "") |
| pain = facts.get("主要诉求/痛点", "") |
| budget = facts.get("预算", "") |
| timeline = facts.get("期望时间", "") |
|
|
| summary = ";".join([p for p in [company, product, stage, pain] if p]) or "已记录线索信息。" |
|
|
| questions: List[str] = [ |
| "当前目标是什么(增长/获客/转化/复购/效率/风控)?优先级如何排序?", |
| "现有流程里最耗时或最容易丢信息的环节是哪一步?", |
| "线索来源结构与规模(渠道、日均量、转化率、客单价)大概是多少?", |
| "团队协作方式(谁负责、如何交接、是否需要权限与审计)?", |
| "期望在多长周期内看到可量化改进?评估指标是什么?", |
| ] |
| if budget: |
| questions.append("预算口径(一次性/按月/按量)与采购流程(招采/合同/付款节点)?") |
| if timeline: |
| questions.append("上线时间点是否有外部约束(活动/发布/投放节奏)?") |
|
|
| tasks: List[Dict[str, str]] = [ |
| {"task": "补齐关键字段:目标、现状流程、线索规模、关键指标", "owner": "你", "due": "今天"}, |
| {"task": "确认决策链路:需求方/使用方/审批方/预算方", "owner": "你", "due": "今天"}, |
| {"task": "整理 3 个典型线索样本与跟进记录(脱敏)", "owner": "客户", "due": "本周"}, |
| {"task": "对齐一版 SOP:触达→跟进→转化→复盘的关键动作与责任人", "owner": "你", "due": "本周"}, |
| {"task": "输出落地方案:自动化触发点、权限与审计、指标口径", "owner": "你", "due": "本周"}, |
| {"task": "安排演示/试用:用样本数据跑一遍闭环", "owner": "你", "due": "下周"}, |
| ] |
|
|
| wechat = "我先基于你们当前的线索流程整理一版“可落地的跟进方案+自动化触发点”,你看方便补充下:目标/线索规模/当前痛点三项吗?补齐后我今天给你一版可执行清单。" |
| phone = "你好,我是这边负责增长流程自动化方案的。想快速了解下:你们目前线索从哪个渠道进来、谁负责跟进、最容易卡在哪一步?我这边把方案做成你们团队可直接执行的清单。" |
| email = "\n".join( |
| [ |
| "主题:线索跟进闭环方案梳理(补问清单 + 可执行任务)", |
| "", |
| "你好,", |
| "", |
| "我已根据当前线索信息先拟了一版跟进方案(含补问清单、下一步任务、话术与风险点)。", |
| "为保证方案能直接落地,麻烦补充 3 点:目标/线索规模/当前流程卡点(可简单一句话)。", |
| "", |
| "我收到后会在当天给出:", |
| "1) 可执行的任务清单(负责人/截止时间)", |
| "2) 自动化触发点建议", |
| "3) 指标口径与复盘方式", |
| "", |
| "谢谢", |
| ] |
| ) |
|
|
| risks = [ |
| "对外触达需避免夸大承诺,话术以“可量化目标/可验证步骤”为准。", |
| "涉及个人信息与线索数据时,需明确数据来源与留存周期,按权限最小化原则访问。", |
| "对接渠道 API/表单时,先做字段映射与去重规则,避免重复触达与口径混乱。", |
| ] |
|
|
| cited: List[str] = [] |
| for s in snippets[:3]: |
| src = os.path.basename(str(s.get("source", ""))) |
| txt = _norm_space(str(s.get("text", "")).replace("\n", " ")) |
| if txt: |
| cited.append(f"{src}:{txt}") |
|
|
| return { |
| "summary": summary, |
| "questions": questions[:7], |
| "tasks": tasks, |
| "scripts": {"wechat": wechat, "phone": phone, "email": email}, |
| "risks": risks, |
| "cited": cited, |
| } |
|
|
|
|
| def _render_lead_report(goal: str, snippets: List[Dict[str, Any]]) -> Tuple[str, Dict[str, Any]]: |
| data = _lead_sections(goal=goal, snippets=snippets) |
| lines: List[str] = [] |
| lines.append("# 线索跟进方案") |
| lines.append("") |
| lines.append("## 线索摘要") |
| lines.append(data["summary"]) |
| lines.append("") |
| lines.append("## 需要补问(建议按顺序)") |
| for q in data["questions"]: |
| lines.append(f"- {q}") |
| lines.append("") |
| lines.append("## 下一步任务清单") |
| for t in data["tasks"]: |
| lines.append(f"- {t['task']}(负责人:{t['owner']};截止:{t['due']})") |
| lines.append("") |
| lines.append("## 跟进话术") |
| lines.append("### 微信/IM") |
| lines.append(data["scripts"]["wechat"]) |
| lines.append("") |
| lines.append("### 电话开场") |
| lines.append(data["scripts"]["phone"]) |
| lines.append("") |
| lines.append("### 邮件") |
| lines.append(data["scripts"]["email"]) |
| lines.append("") |
| lines.append("## 风险提示") |
| for r in data["risks"]: |
| lines.append(f"- {r}") |
| lines.append("") |
| if data["cited"]: |
| lines.append("## 参考知识片段") |
| for c in data["cited"]: |
| lines.append(f"- {c}") |
| lines.append("") |
| lines.append("## 原始目标") |
| lines.append(goal.strip()) |
| lines.append("") |
| return "\n".join(lines), data |
|
|
|
|
|
|
| def _needs_report(goal: str) -> bool: |
| g = goal.lower() |
| if "报告" in goal or "写进报告" in goal or "写入报告" in goal: |
| return True |
| if "report" in g: |
| return True |
| return True |
|
|
|
|
| def check_closure(goal: str, out_report: str, mode: str) -> Tuple[bool, str]: |
| p = Path(out_report) |
| if _needs_report(goal): |
| if not p.exists(): |
| return False, "报告文件不存在" |
| txt = p.read_text(encoding="utf-8") |
| if mode == "lead_followup": |
| if "线索跟进方案" not in txt: |
| return False, "报告缺少标题" |
| if "需要补问" not in txt or "下一步任务清单" not in txt or "跟进话术" not in txt: |
| return False, "报告结构不完整" |
| if len(re.findall(r"^\-\s+", txt, flags=re.MULTILINE)) < 8: |
| return False, "清单项不足" |
| else: |
| if "智能体生成报告" not in txt: |
| return False, "报告缺少标题" |
| faq_count = _extract_faq_count(goal) |
| if faq_count: |
| if len(re.findall(r"^\-\s+", txt, flags=re.MULTILINE)) < faq_count: |
| return False, "FAQ 条数不足" |
| expr = _extract_math_expr(goal) |
| if expr: |
| if expr not in txt: |
| return False, "报告未包含表达式" |
| calc_res = tool_calc(expr) |
| if calc_res.ok and str(calc_res.output) not in txt: |
| return False, "报告未包含计算结果" |
| return True, "ok" |
|
|
|
|
| def run_agent(goal: str, kb_dir: str, out_report: str, max_rounds: int = 3, mode: str = "general") -> Dict[str, Any]: |
| round_logs: List[Dict[str, Any]] = [] |
| state: Dict[str, Any] = {"snippets": [], "math_expr": None, "math_result": None, "artifacts": [], "structured": None} |
|
|
| for r in range(1, max_rounds + 1): |
| plan = make_plan(goal=goal, kb_dir=kb_dir, out_report=out_report) |
| step_logs: List[Dict[str, Any]] = [] |
|
|
| for step in plan: |
| if step.name == "search_kb": |
| res = tool_search_kb(**step.args) |
| if res.ok: |
| try: |
| state["snippets"] = json.loads(res.output) |
| except Exception: |
| state["snippets"] = [] |
| step_logs.append({"tool": step.name, "ok": res.ok, "output": res.output[:800]}) |
| continue |
|
|
| if step.name == "calc": |
| state["math_expr"] = step.args.get("expression") |
| res = tool_calc(**step.args) |
| state["math_result"] = res.output if res.ok else None |
| step_logs.append({"tool": step.name, "ok": res.ok, "output": res.output}) |
| continue |
|
|
| if step.name == "write_report": |
| faq_count = _extract_faq_count(goal) |
| if mode == "lead_followup": |
| report, structured = _render_lead_report(goal=goal, snippets=state.get("snippets") or []) |
| state["structured"] = structured |
| else: |
| report = _render_report( |
| goal=goal, |
| snippets=state.get("snippets") or [], |
| faq_count=faq_count, |
| math_expr=state.get("math_expr"), |
| math_result=state.get("math_result"), |
| ) |
| res = tool_write_text(path=out_report, text=report) |
| state["artifacts"].extend(res.artifacts) |
| step_logs.append({"tool": step.name, "ok": res.ok, "output": res.output}) |
| continue |
|
|
| step_logs.append({"tool": step.name, "ok": False, "output": "unknown tool"}) |
|
|
| ok, reason = check_closure(goal=goal, out_report=out_report, mode=mode) |
| round_logs.append({"round": r, "plan": [c.__dict__ for c in plan], "steps": step_logs, "check": {"ok": ok, "reason": reason}}) |
| if ok: |
| break |
|
|
| final = [] |
| final.append("已完成闭环执行:规划→工具执行→自检→产出。") |
| final.append(f"报告工件:{out_report}") |
| expr = state.get("math_expr") |
| if expr: |
| final.append(f"计算:{expr} = {state.get('math_result')}") |
| faq_count = _extract_faq_count(goal) |
| if faq_count: |
| final.append(f"FAQ:{faq_count} 条(见报告)") |
|
|
| return { |
| "goal": goal, |
| "out_report": out_report, |
| "round_logs": round_logs, |
| "final_answer": "\n".join(final), |
| "artifacts": state.get("artifacts"), |
| "structured": state.get("structured"), |
| "mode": mode, |
| } |
|
|
|
|
| def run_agent_mode(mode: str, payload: Dict[str, Any], kb_dir: str, out_report: str, max_rounds: int = 3) -> Dict[str, Any]: |
| mode = (mode or "general").strip() |
| if mode == "lead_followup": |
| lead = payload.get("lead") or {} |
| goal = _lead_to_goal(lead if isinstance(lead, dict) else {}) |
| result = run_agent(goal=goal, kb_dir=kb_dir, out_report=out_report, max_rounds=max_rounds, mode=mode) |
| return result |
| goal = str(payload.get("goal") or "").strip() |
| return run_agent(goal=goal, kb_dir=kb_dir, out_report=out_report, max_rounds=max_rounds, mode="general") |
|
|
|
|
| def _print_round_logs(round_logs: List[Dict[str, Any]]) -> None: |
| for r in round_logs: |
| print(f"\n=== Round {r['round']} ===") |
| print("Plan:") |
| for s in r["plan"]: |
| print(f"- {s['name']} {json.dumps(s['args'], ensure_ascii=False)}") |
| print("Steps:") |
| for s in r["steps"]: |
| out = s["output"] |
| out = out.replace("\n", " ") |
| if len(out) > 180: |
| out = out[:180] + "..." |
| print(f"- {s['tool']}: ok={s['ok']} output={out}") |
| print(f"Check: ok={r['check']['ok']} reason={r['check']['reason']}") |
|
|
|
|
| def cmd_demo() -> int: |
| here = Path(__file__).resolve().parent |
| goal = "基于知识库,生成一份 5 条 FAQ,并计算 17*23 的结果写进报告" |
| out_report = str(here / "out" / "demo_report.md") |
| kb_dir = str(here / "kb") |
| result = run_agent(goal=goal, kb_dir=kb_dir, out_report=out_report, max_rounds=3) |
| _print_round_logs(result["round_logs"]) |
| print("\n=== Final ===") |
| print(result["final_answer"]) |
| try: |
| txt = (here / "out" / "demo_report.md").read_text(encoding="utf-8") |
| head = "\n".join(txt.splitlines()[:22]) |
| print("\n=== Report Preview (Top) ===") |
| print(head) |
| except Exception: |
| pass |
| return 0 |
|
|
|
|
| def cmd_run(goal: str) -> int: |
| here = Path(__file__).resolve().parent |
| out_report = str(here / "out" / "demo_report.md") |
| kb_dir = str(here / "kb") |
| result = run_agent(goal=goal, kb_dir=kb_dir, out_report=out_report, max_rounds=3) |
| _print_round_logs(result["round_logs"]) |
| print("\n=== Final ===") |
| print(result["final_answer"]) |
| return 0 |
|
|
|
|
| def main(argv: List[str]) -> int: |
| p = argparse.ArgumentParser(prog="agent.py") |
| sub = p.add_subparsers(dest="cmd", required=True) |
|
|
| sub.add_parser("demo") |
|
|
| p_run = sub.add_parser("run") |
| p_run.add_argument("--goal", required=True) |
|
|
| args = p.parse_args(argv) |
| if args.cmd == "demo": |
| return cmd_demo() |
| if args.cmd == "run": |
| return cmd_run(goal=args.goal) |
| return 2 |
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main(sys.argv[1:])) |
|
|