import argparse import ast import json import os import re import sys from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Tuple @dataclass class ToolCall: name: str args: Dict[str, Any] @dataclass class ToolResult: ok: bool output: str artifacts: List[str] def _tokenize(s: str) -> List[str]: s = s.lower() s = re.sub(r"[^0-9a-z\u4e00-\u9fff]+", " ", s) parts = [p.strip() for p in s.split() if p.strip()] return parts def tool_search_kb(query: str, kb_dir: str, top_k: int = 4) -> ToolResult: base = Path(kb_dir) if not base.exists(): return ToolResult(ok=True, output="[]", artifacts=[]) q_tokens = _tokenize(query) if not q_tokens: return ToolResult(ok=True, output="[]", artifacts=[]) hits: List[Tuple[int, str, str]] = [] for p in sorted(base.glob("**/*.md")): try: text = p.read_text(encoding="utf-8") except Exception: continue chunks = [c.strip() for c in re.split(r"\n\s*\n+", text) if c.strip()] for c in chunks: c_tokens = _tokenize(c) score = sum(1 for t in q_tokens if t in c_tokens) if score <= 0: continue hits.append((score, str(p), c)) hits.sort(key=lambda x: (-x[0], x[1])) picked = hits[: max(0, int(top_k))] out = [{"score": s, "source": src, "text": txt} for (s, src, txt) in picked] return ToolResult(ok=True, output=json.dumps(out, ensure_ascii=False, indent=2), artifacts=[]) class _SafeEval(ast.NodeVisitor): def visit(self, node): return super().visit(node) def generic_visit(self, node): raise ValueError(f"unsupported: {node.__class__.__name__}") def visit_Expression(self, node: ast.Expression): return self.visit(node.body) def visit_BinOp(self, node: ast.BinOp): left = self.visit(node.left) right = self.visit(node.right) if isinstance(node.op, ast.Add): return left + right if isinstance(node.op, ast.Sub): return left - right if isinstance(node.op, ast.Mult): return left * right if isinstance(node.op, ast.Div): return left / right if isinstance(node.op, ast.FloorDiv): return left // right if isinstance(node.op, ast.Mod): return left % right if isinstance(node.op, ast.Pow): return left ** right raise ValueError("unsupported operator") def visit_UnaryOp(self, node: ast.UnaryOp): v = self.visit(node.operand) if isinstance(node.op, ast.UAdd): return +v if isinstance(node.op, ast.USub): return -v raise ValueError("unsupported unary operator") def visit_Constant(self, node: ast.Constant): if isinstance(node.value, (int, float)): return node.value raise ValueError("only numbers allowed") def tool_calc(expression: str) -> ToolResult: expr = expression.strip() if not re.fullmatch(r"[0-9\.\s\+\-\*\/\(\)\%\^]+", expr): return ToolResult(ok=False, output="表达式包含不允许的字符", artifacts=[]) expr = expr.replace("^", "**") try: tree = ast.parse(expr, mode="eval") value = _SafeEval().visit(tree) except Exception as e: return ToolResult(ok=False, output=f"计算失败: {e}", artifacts=[]) return ToolResult(ok=True, output=str(value), artifacts=[]) def tool_write_text(path: str, text: str) -> ToolResult: p = Path(path) p.parent.mkdir(parents=True, exist_ok=True) p.write_text(text, encoding="utf-8") return ToolResult(ok=True, output=f"wrote: {p}", artifacts=[str(p)]) def tool_read_text(path: str) -> ToolResult: p = Path(path) if not p.exists(): return ToolResult(ok=False, output="not found", artifacts=[]) return ToolResult(ok=True, output=p.read_text(encoding="utf-8"), artifacts=[]) def _extract_math_expr(goal: str) -> Optional[str]: m = re.search(r"(\d[\d\s\+\-\*\/\(\)\%\^\.]*\d)", goal) if not m: return None expr = m.group(1) expr = re.sub(r"\s+", "", expr) if len(expr) > 120: return None if not re.search(r"[\+\-\*\/\%\^]", expr): return None return expr def _extract_faq_count(goal: str) -> Optional[int]: m = re.search(r"(\d+)\s*条\s*FAQ", goal, flags=re.IGNORECASE) if m: return max(1, min(20, int(m.group(1)))) m = re.search(r"(\d+)\s*条", goal) if m and ("faq" in goal.lower() or "常见问题" in goal): return max(1, min(20, int(m.group(1)))) if "faq" in goal.lower() or "常见问题" in goal: return 5 return None def _norm_space(s: str) -> str: return re.sub(r"\s+", " ", (s or "").strip()) def _lead_to_goal(lead: Dict[str, Any]) -> str: company = _norm_space(str(lead.get("company") or "")) contact = _norm_space(str(lead.get("contact") or "")) role = _norm_space(str(lead.get("role") or "")) channel = _norm_space(str(lead.get("channel") or "")) product = _norm_space(str(lead.get("product") or "")) stage = _norm_space(str(lead.get("stage") or "")) pain = _norm_space(str(lead.get("pain_points") or "")) budget = _norm_space(str(lead.get("budget") or "")) timeline = _norm_space(str(lead.get("timeline") or "")) notes = _norm_space(str(lead.get("notes") or "")) parts: List[str] = [] parts.append("生成一份销售线索跟进方案(含补问清单、下一步任务清单、跟进话术:微信/电话/邮件、风险提示),并写进报告。") if company: parts.append(f"公司/组织:{company}") if contact or role: who = contact if contact else "(未提供姓名)" if role: who = f"{who}({role})" parts.append(f"联系人:{who}") if channel: parts.append(f"来源渠道:{channel}") if product: parts.append(f"关注产品:{product}") if stage: parts.append(f"当前阶段:{stage}") if pain: parts.append(f"主要诉求/痛点:{pain}") if budget: parts.append(f"预算:{budget}") if timeline: parts.append(f"期望时间:{timeline}") if notes: parts.append(f"备注:{notes}") return "\n".join(parts) def make_plan(goal: str, kb_dir: str, out_report: str) -> List[ToolCall]: plan: List[ToolCall] = [] plan.append(ToolCall(name="search_kb", args={"query": goal, "kb_dir": kb_dir, "top_k": 6})) expr = _extract_math_expr(goal) if expr: plan.append(ToolCall(name="calc", args={"expression": expr})) plan.append(ToolCall(name="write_report", args={"goal": goal, "out_report": out_report})) return plan def _format_faq_items(snippets: List[Dict[str, Any]], n: int) -> List[str]: pool: List[str] = [] for s in snippets: t = str(s.get("text", "")).strip() t = re.sub(r"\s+", " ", t) if len(t) >= 12: pool.append(t) if not pool: pool = [ "我们提供从获客到转化的自动化流程编排能力,减少重复操作。", "支持接入常见广告与社媒渠道,并统一沉淀线索与素材。", "关键数据可追踪,可用规则触发通知与任务分配。", "提供基础的权限与审计,方便多人协作与风控。", "支持将常见问题与业务规范整理进知识库,供智能体检索。", ] items: List[str] = [] i = 0 while len(items) < n: items.append(pool[i % len(pool)]) i += 1 return items def _render_report(goal: str, snippets: List[Dict[str, Any]], faq_count: Optional[int], math_expr: Optional[str], math_result: Optional[str]) -> str: lines: List[str] = [] lines.append("# 智能体生成报告") lines.append("") lines.append("## 目标") lines.append(goal.strip()) lines.append("") if snippets: lines.append("## 知识库检索摘要") for i, s in enumerate(snippets[:4], start=1): src = os.path.basename(str(s.get("source", ""))) txt = str(s.get("text", "")).strip().replace("\n", " ") txt = re.sub(r"\s+", " ", txt) lines.append(f"- 片段{i}({src}):{txt}") lines.append("") if faq_count: lines.append(f"## FAQ({faq_count} 条)") items = _format_faq_items(snippets, faq_count) for it in items: lines.append(f"- {it}") lines.append("") if math_expr: lines.append("## 计算结果") if math_result is None: lines.append(f"- 表达式:{math_expr}") lines.append("- 结果:计算失败") else: lines.append(f"- 表达式:{math_expr}") lines.append(f"- 结果:{math_result}") lines.append("") lines.append("## 结论") lines.append("已基于本地知识库完成检索,并生成可复核的报告工件。") lines.append("") return "\n".join(lines) def _lead_sections(goal: str, snippets: List[Dict[str, Any]]) -> Dict[str, Any]: lines = goal.splitlines() facts: Dict[str, str] = {} for ln in lines: if ":" in ln: k, v = ln.split(":", 1) k = _norm_space(k) v = _norm_space(v) if k and v: facts[k] = v company = facts.get("公司/组织", "") product = facts.get("关注产品", "") stage = facts.get("当前阶段", "") pain = facts.get("主要诉求/痛点", "") budget = facts.get("预算", "") timeline = facts.get("期望时间", "") summary = ";".join([p for p in [company, product, stage, pain] if p]) or "已记录线索信息。" questions: List[str] = [ "当前目标是什么(增长/获客/转化/复购/效率/风控)?优先级如何排序?", "现有流程里最耗时或最容易丢信息的环节是哪一步?", "线索来源结构与规模(渠道、日均量、转化率、客单价)大概是多少?", "团队协作方式(谁负责、如何交接、是否需要权限与审计)?", "期望在多长周期内看到可量化改进?评估指标是什么?", ] if budget: questions.append("预算口径(一次性/按月/按量)与采购流程(招采/合同/付款节点)?") if timeline: questions.append("上线时间点是否有外部约束(活动/发布/投放节奏)?") tasks: List[Dict[str, str]] = [ {"task": "补齐关键字段:目标、现状流程、线索规模、关键指标", "owner": "你", "due": "今天"}, {"task": "确认决策链路:需求方/使用方/审批方/预算方", "owner": "你", "due": "今天"}, {"task": "整理 3 个典型线索样本与跟进记录(脱敏)", "owner": "客户", "due": "本周"}, {"task": "对齐一版 SOP:触达→跟进→转化→复盘的关键动作与责任人", "owner": "你", "due": "本周"}, {"task": "输出落地方案:自动化触发点、权限与审计、指标口径", "owner": "你", "due": "本周"}, {"task": "安排演示/试用:用样本数据跑一遍闭环", "owner": "你", "due": "下周"}, ] wechat = "我先基于你们当前的线索流程整理一版“可落地的跟进方案+自动化触发点”,你看方便补充下:目标/线索规模/当前痛点三项吗?补齐后我今天给你一版可执行清单。" phone = "你好,我是这边负责增长流程自动化方案的。想快速了解下:你们目前线索从哪个渠道进来、谁负责跟进、最容易卡在哪一步?我这边把方案做成你们团队可直接执行的清单。" email = "\n".join( [ "主题:线索跟进闭环方案梳理(补问清单 + 可执行任务)", "", "你好,", "", "我已根据当前线索信息先拟了一版跟进方案(含补问清单、下一步任务、话术与风险点)。", "为保证方案能直接落地,麻烦补充 3 点:目标/线索规模/当前流程卡点(可简单一句话)。", "", "我收到后会在当天给出:", "1) 可执行的任务清单(负责人/截止时间)", "2) 自动化触发点建议", "3) 指标口径与复盘方式", "", "谢谢", ] ) risks = [ "对外触达需避免夸大承诺,话术以“可量化目标/可验证步骤”为准。", "涉及个人信息与线索数据时,需明确数据来源与留存周期,按权限最小化原则访问。", "对接渠道 API/表单时,先做字段映射与去重规则,避免重复触达与口径混乱。", ] cited: List[str] = [] for s in snippets[:3]: src = os.path.basename(str(s.get("source", ""))) txt = _norm_space(str(s.get("text", "")).replace("\n", " ")) if txt: cited.append(f"{src}:{txt}") return { "summary": summary, "questions": questions[:7], "tasks": tasks, "scripts": {"wechat": wechat, "phone": phone, "email": email}, "risks": risks, "cited": cited, } def _render_lead_report(goal: str, snippets: List[Dict[str, Any]]) -> Tuple[str, Dict[str, Any]]: data = _lead_sections(goal=goal, snippets=snippets) lines: List[str] = [] lines.append("# 线索跟进方案") lines.append("") lines.append("## 线索摘要") lines.append(data["summary"]) lines.append("") lines.append("## 需要补问(建议按顺序)") for q in data["questions"]: lines.append(f"- {q}") lines.append("") lines.append("## 下一步任务清单") for t in data["tasks"]: lines.append(f"- {t['task']}(负责人:{t['owner']};截止:{t['due']})") lines.append("") lines.append("## 跟进话术") lines.append("### 微信/IM") lines.append(data["scripts"]["wechat"]) lines.append("") lines.append("### 电话开场") lines.append(data["scripts"]["phone"]) lines.append("") lines.append("### 邮件") lines.append(data["scripts"]["email"]) lines.append("") lines.append("## 风险提示") for r in data["risks"]: lines.append(f"- {r}") lines.append("") if data["cited"]: lines.append("## 参考知识片段") for c in data["cited"]: lines.append(f"- {c}") lines.append("") lines.append("## 原始目标") lines.append(goal.strip()) lines.append("") return "\n".join(lines), data def _needs_report(goal: str) -> bool: g = goal.lower() if "报告" in goal or "写进报告" in goal or "写入报告" in goal: return True if "report" in g: return True return True def check_closure(goal: str, out_report: str, mode: str) -> Tuple[bool, str]: p = Path(out_report) if _needs_report(goal): if not p.exists(): return False, "报告文件不存在" txt = p.read_text(encoding="utf-8") if mode == "lead_followup": if "线索跟进方案" not in txt: return False, "报告缺少标题" if "需要补问" not in txt or "下一步任务清单" not in txt or "跟进话术" not in txt: return False, "报告结构不完整" if len(re.findall(r"^\-\s+", txt, flags=re.MULTILINE)) < 8: return False, "清单项不足" else: if "智能体生成报告" not in txt: return False, "报告缺少标题" faq_count = _extract_faq_count(goal) if faq_count: if len(re.findall(r"^\-\s+", txt, flags=re.MULTILINE)) < faq_count: return False, "FAQ 条数不足" expr = _extract_math_expr(goal) if expr: if expr not in txt: return False, "报告未包含表达式" calc_res = tool_calc(expr) if calc_res.ok and str(calc_res.output) not in txt: return False, "报告未包含计算结果" return True, "ok" def run_agent(goal: str, kb_dir: str, out_report: str, max_rounds: int = 3, mode: str = "general") -> Dict[str, Any]: round_logs: List[Dict[str, Any]] = [] state: Dict[str, Any] = {"snippets": [], "math_expr": None, "math_result": None, "artifacts": [], "structured": None} for r in range(1, max_rounds + 1): plan = make_plan(goal=goal, kb_dir=kb_dir, out_report=out_report) step_logs: List[Dict[str, Any]] = [] for step in plan: if step.name == "search_kb": res = tool_search_kb(**step.args) if res.ok: try: state["snippets"] = json.loads(res.output) except Exception: state["snippets"] = [] step_logs.append({"tool": step.name, "ok": res.ok, "output": res.output[:800]}) continue if step.name == "calc": state["math_expr"] = step.args.get("expression") res = tool_calc(**step.args) state["math_result"] = res.output if res.ok else None step_logs.append({"tool": step.name, "ok": res.ok, "output": res.output}) continue if step.name == "write_report": faq_count = _extract_faq_count(goal) if mode == "lead_followup": report, structured = _render_lead_report(goal=goal, snippets=state.get("snippets") or []) state["structured"] = structured else: report = _render_report( goal=goal, snippets=state.get("snippets") or [], faq_count=faq_count, math_expr=state.get("math_expr"), math_result=state.get("math_result"), ) res = tool_write_text(path=out_report, text=report) state["artifacts"].extend(res.artifacts) step_logs.append({"tool": step.name, "ok": res.ok, "output": res.output}) continue step_logs.append({"tool": step.name, "ok": False, "output": "unknown tool"}) ok, reason = check_closure(goal=goal, out_report=out_report, mode=mode) round_logs.append({"round": r, "plan": [c.__dict__ for c in plan], "steps": step_logs, "check": {"ok": ok, "reason": reason}}) if ok: break final = [] final.append("已完成闭环执行:规划→工具执行→自检→产出。") final.append(f"报告工件:{out_report}") expr = state.get("math_expr") if expr: final.append(f"计算:{expr} = {state.get('math_result')}") faq_count = _extract_faq_count(goal) if faq_count: final.append(f"FAQ:{faq_count} 条(见报告)") return { "goal": goal, "out_report": out_report, "round_logs": round_logs, "final_answer": "\n".join(final), "artifacts": state.get("artifacts"), "structured": state.get("structured"), "mode": mode, } def run_agent_mode(mode: str, payload: Dict[str, Any], kb_dir: str, out_report: str, max_rounds: int = 3) -> Dict[str, Any]: mode = (mode or "general").strip() if mode == "lead_followup": lead = payload.get("lead") or {} goal = _lead_to_goal(lead if isinstance(lead, dict) else {}) result = run_agent(goal=goal, kb_dir=kb_dir, out_report=out_report, max_rounds=max_rounds, mode=mode) return result goal = str(payload.get("goal") or "").strip() return run_agent(goal=goal, kb_dir=kb_dir, out_report=out_report, max_rounds=max_rounds, mode="general") def _print_round_logs(round_logs: List[Dict[str, Any]]) -> None: for r in round_logs: print(f"\n=== Round {r['round']} ===") print("Plan:") for s in r["plan"]: print(f"- {s['name']} {json.dumps(s['args'], ensure_ascii=False)}") print("Steps:") for s in r["steps"]: out = s["output"] out = out.replace("\n", " ") if len(out) > 180: out = out[:180] + "..." print(f"- {s['tool']}: ok={s['ok']} output={out}") print(f"Check: ok={r['check']['ok']} reason={r['check']['reason']}") def cmd_demo() -> int: here = Path(__file__).resolve().parent goal = "基于知识库,生成一份 5 条 FAQ,并计算 17*23 的结果写进报告" out_report = str(here / "out" / "demo_report.md") kb_dir = str(here / "kb") result = run_agent(goal=goal, kb_dir=kb_dir, out_report=out_report, max_rounds=3) _print_round_logs(result["round_logs"]) print("\n=== Final ===") print(result["final_answer"]) try: txt = (here / "out" / "demo_report.md").read_text(encoding="utf-8") head = "\n".join(txt.splitlines()[:22]) print("\n=== Report Preview (Top) ===") print(head) except Exception: pass return 0 def cmd_run(goal: str) -> int: here = Path(__file__).resolve().parent out_report = str(here / "out" / "demo_report.md") kb_dir = str(here / "kb") result = run_agent(goal=goal, kb_dir=kb_dir, out_report=out_report, max_rounds=3) _print_round_logs(result["round_logs"]) print("\n=== Final ===") print(result["final_answer"]) return 0 def main(argv: List[str]) -> int: p = argparse.ArgumentParser(prog="agent.py") sub = p.add_subparsers(dest="cmd", required=True) sub.add_parser("demo") p_run = sub.add_parser("run") p_run.add_argument("--goal", required=True) args = p.parse_args(argv) if args.cmd == "demo": return cmd_demo() if args.cmd == "run": return cmd_run(goal=args.goal) return 2 if __name__ == "__main__": raise SystemExit(main(sys.argv[1:]))