fenghantong
Deploy RouteOpt Agent
ffda755
Raw
History Blame Contribute Delete
18.2 kB
from __future__ import annotations
import json
from typing import Any, Callable
from .geo_tools import build_route_matrix, geocode_places
from .llm_client import LLMError, chat_completion, extract_json_from_text, extract_tool_arguments
from .models import AgentResult, GeoPoint, Objective, RouteMatrix, RouteSolution, RouteTask, ToolEvent
from .parsing import dedupe_preserve_order, heuristic_extract_task, normalize_place_lines
from .report import generate_pdf_report
from .solver import format_km, format_minutes, solve_route
from .viz import build_route_svg
TASK_TOOL_SCHEMA: dict[str, Any] = {
"type": "function",
"function": {
"name": "submit_route_task",
"description": "Extract a route optimization task from user text and structured hints.",
"parameters": {
"type": "object",
"properties": {
"start_place": {
"type": "string",
"description": "The route start place. Keep it specific enough for geocoding.",
},
"destination_places": {
"type": "array",
"items": {"type": "string"},
"description": "Places that must be visited after the start.",
},
"objective": {
"type": "string",
"enum": ["time", "distance"],
"description": "Optimization objective: time or distance.",
},
"return_to_start": {
"type": "boolean",
"description": "Whether the route should return to the start place.",
},
"fixed_end_place": {
"type": "string",
"description": "Optional fixed final destination when not returning to start.",
},
"constraints": {
"type": "array",
"items": {"type": "string"},
"description": "Soft constraints or notes mentioned by the user.",
},
},
"required": ["start_place", "destination_places", "objective", "return_to_start"],
},
},
}
class RouteOptAgent:
def __init__(self) -> None:
self.trace: list[ToolEvent] = []
def run(
self,
raw_request: str,
start_hint: str = "",
destinations_hint: str = "",
objective_hint: str = "time",
return_to_start_hint: bool = True,
fixed_end_hint: str = "",
city_hint: str = "上海,中国",
use_llm: bool = True,
) -> AgentResult:
self.trace = []
warnings: list[str] = []
task = self._extract_task(
raw_request=raw_request,
start_hint=start_hint,
destinations_hint=destinations_hint,
objective_hint=objective_hint,
return_to_start_hint=return_to_start_hint,
fixed_end_hint=fixed_end_hint,
use_llm=use_llm,
warnings=warnings,
)
self._validate_task(task, warnings)
all_places = [task.start_place] + task.destination_places
points = self._record_tool(
"geocode_places",
{"places": all_places, "city_hint": city_hint},
lambda: geocode_places(all_places, city_hint),
lambda result: f"解析 {len(result)} 个地点:{', '.join(point.name for point in result)}",
)
matrix = self._record_tool(
"build_route_matrix",
{"points": [point.name for point in points]},
lambda: build_route_matrix(points),
lambda result: f"获得 {len(result.points)}x{len(result.points)} 距离/时间矩阵,来源:{result.source}",
)
solution = self._record_tool(
"solve_route",
{
"objective": task.objective,
"return_to_start": task.return_to_start,
"fixed_end_place": task.fixed_end_place,
},
lambda: solve_route(matrix, task.objective, task.return_to_start, task.fixed_end_place),
lambda result: f"路线:{' -> '.join(result.route_names)};距离 {format_km(result.total_distance_meters)};时间 {format_minutes(result.total_duration_seconds)}",
)
summary = self._compose_summary(task, points, matrix, solution, use_llm, warnings)
route_svg = build_route_svg(points, solution.route_indices)
pdf_path = self._record_tool(
"generate_pdf_report",
{"route": solution.route_names},
lambda: generate_pdf_report(task, points, solution, self.trace, summary),
lambda result: f"PDF 已生成:{result}",
)
return AgentResult(
task=task,
points=points,
matrix=matrix,
solution=solution,
summary_markdown=summary,
trace=self.trace,
pdf_path=pdf_path,
route_svg=route_svg,
warnings=warnings,
)
def _extract_task(
self,
raw_request: str,
start_hint: str,
destinations_hint: str,
objective_hint: str,
return_to_start_hint: bool,
fixed_end_hint: str,
use_llm: bool,
warnings: list[str],
) -> RouteTask:
if use_llm:
try:
task = self._extract_task_with_llm(
raw_request,
start_hint,
destinations_hint,
objective_hint,
return_to_start_hint,
fixed_end_hint,
)
self._append_event(
"LLM tool_call: submit_route_task",
{
"raw_request": raw_request,
"start_hint": start_hint,
"destinations_hint": destinations_hint,
},
"ok",
f"抽取任务:起点={task.start_place};目的地={len(task.destination_places)} 个;目标={task.objective}",
)
return task
except Exception as exc:
warnings.append(f"LLM 工具调用抽取失败,已切换本地解析:{exc}")
self._append_event(
"LLM tool_call: submit_route_task",
{"raw_request": raw_request},
"fallback",
str(exc),
)
task = heuristic_extract_task(
raw_request,
start_hint=start_hint,
destinations_hint=destinations_hint,
objective_hint=objective_hint,
return_to_start_hint=return_to_start_hint,
fixed_end_hint=fixed_end_hint,
)
self._append_event(
"local_parse_route_task",
{
"raw_request": raw_request,
"start_hint": start_hint,
"destinations_hint": destinations_hint,
},
"ok",
f"本地解析任务:起点={task.start_place};目的地={len(task.destination_places)} 个;目标={task.objective}",
)
return task
def _extract_task_with_llm(
self,
raw_request: str,
start_hint: str,
destinations_hint: str,
objective_hint: str,
return_to_start_hint: bool,
fixed_end_hint: str,
) -> RouteTask:
messages = [
{
"role": "system",
"content": (
"You are a route optimization agent controller. "
"Use the submit_route_task function to return one structured task. "
"Prefer structured hints over ambiguous natural language. "
"Keep Chinese place names specific for geocoding. "
"If the user says 最快/时间最短 use objective=time; if 距离/少走路 use objective=distance."
),
},
{
"role": "user",
"content": json.dumps(
{
"raw_request": raw_request,
"start_hint": start_hint,
"destinations_hint": normalize_place_lines(destinations_hint),
"objective_hint": objective_hint,
"return_to_start_hint": return_to_start_hint,
"fixed_end_hint": fixed_end_hint,
},
ensure_ascii=False,
),
},
]
message = chat_completion(messages, tools=[TASK_TOOL_SCHEMA], temperature=0.1)
args = extract_tool_arguments(message, "submit_route_task")
if args is None:
args = extract_json_from_text(message.get("content") or "")
if args is None:
raise LLMError("模型没有返回 submit_route_task 工具参数。")
return self._task_from_arguments(
args,
raw_request,
start_hint,
destinations_hint,
objective_hint,
return_to_start_hint,
fixed_end_hint,
)
def _task_from_arguments(
self,
args: dict[str, Any],
raw_request: str,
start_hint: str,
destinations_hint: str,
objective_hint: str,
return_to_start_hint: bool,
fixed_end_hint: str,
) -> RouteTask:
local_task = heuristic_extract_task(
raw_request,
start_hint=start_hint,
destinations_hint=destinations_hint,
objective_hint=objective_hint,
return_to_start_hint=return_to_start_hint,
fixed_end_hint=fixed_end_hint,
)
destinations = args.get("destination_places") or local_task.destination_places
if isinstance(destinations, str):
destinations = normalize_place_lines(destinations)
destinations = dedupe_preserve_order([str(item).strip() for item in destinations if str(item).strip()])
start = start_hint.strip() or str(args.get("start_place") or local_task.start_place).strip()
objective = normalize_objective(str(args.get("objective") or local_task.objective))
fixed_end = fixed_end_hint.strip() or str(args.get("fixed_end_place") or local_task.fixed_end_place or "").strip()
return RouteTask(
raw_request=raw_request,
start_place=start,
destination_places=destinations,
objective=objective,
return_to_start=return_to_start_hint if return_to_start_hint is not None else bool(args.get("return_to_start", local_task.return_to_start)),
fixed_end_place=fixed_end or None,
constraints=[str(item) for item in args.get("constraints") or local_task.constraints],
)
def _validate_task(self, task: RouteTask, warnings: list[str]) -> None:
task.start_place = task.start_place.strip()
task.destination_places = [place.strip() for place in task.destination_places if place.strip()]
original_count = len(task.destination_places)
task.destination_places = dedupe_preserve_order(task.destination_places)
if len(task.destination_places) < original_count:
warnings.append("目的地中存在重复项,系统已自动去重。")
if not task.start_place:
raise ValueError("缺少起点。请在起点输入框填写一个地点,或在自然语言需求中写明“从哪里出发”。")
if not task.destination_places:
raise ValueError("缺少目的地。请至少填写 1 个目的地。建议每行写一个地点,例如:人民广场、外滩、陆家嘴。")
start_key = task.start_place.lower()
without_start = [
place
for place in task.destination_places
if place.lower() != start_key and start_key not in place.lower() and place.lower() not in start_key
]
if len(without_start) < len(task.destination_places):
warnings.append("目的地中包含起点,系统已自动移除该重复访问点。")
task.destination_places = without_start
if not task.destination_places:
raise ValueError("目的地里只有起点本身。请至少增加一个不同于起点的目的地。")
if len(task.destination_places) > 10:
raise ValueError(
"目的地数量过多。当前演示版最多支持 10 个目的地;作业视频推荐 3 到 8 个,"
"这样 Held-Karp 精确算法和公开路线 API 都更稳定。"
)
if task.fixed_end_place and not task.return_to_start:
fixed = task.fixed_end_place.strip().lower()
if all(fixed not in item.lower() and item.lower() not in fixed for item in task.destination_places):
task.destination_places.append(task.fixed_end_place)
def _compose_summary(
self,
task: RouteTask,
points: list[GeoPoint],
matrix: RouteMatrix,
solution: RouteSolution,
use_llm: bool,
warnings: list[str],
) -> str:
if use_llm:
try:
messages = [
{
"role": "system",
"content": (
"你是算法课程作业里的路线优化智能体。"
"请用中文写一段简洁但不敷衍的求解说明,强调:"
"大模型负责理解和解释,工具负责地理编码/路径矩阵,算法负责最优化。"
"不要编造没有出现的数据。"
),
},
{
"role": "user",
"content": json.dumps(
{
"task": task.__dict__,
"points": [point.__dict__ for point in points],
"matrix_source": matrix.source,
"solution": {
"route": solution.route_names,
"total_distance_km": round(solution.total_distance_meters / 1000, 2),
"total_minutes": round(solution.total_duration_seconds / 60, 1),
"algorithm": solution.algorithm,
},
"tool_trace": [event.__dict__ for event in self.trace],
"warnings": warnings,
},
ensure_ascii=False,
),
},
]
message = chat_completion(messages, temperature=0.35)
content = (message.get("content") or "").strip()
if content:
self._append_event(
"LLM compose_summary",
{"route": solution.route_names},
"ok",
"已生成中文解释总结。",
)
return content
except Exception as exc:
warnings.append(f"LLM 总结失败,已切换本地报告模板:{exc}")
self._append_event("LLM compose_summary", {}, "fallback", str(exc))
return deterministic_summary(task, matrix, solution, warnings)
def _record_tool(
self,
name: str,
arguments: dict[str, Any],
func: Callable[[], Any],
summarize: Callable[[Any], str],
) -> Any:
try:
result = func()
self._append_event(name, arguments, "ok", summarize(result))
return result
except Exception as exc:
self._append_event(name, arguments, "error", str(exc))
raise
def _append_event(self, tool: str, arguments: dict[str, Any], status: str, result: str) -> None:
self.trace.append(
ToolEvent(
step=len(self.trace) + 1,
tool=tool,
arguments=compact_arguments(arguments),
status=status,
result=result,
)
)
def normalize_objective(value: str) -> Objective:
value = value.lower().strip()
if value in {"distance", "最短距离", "距离"}:
return "distance"
return "time"
def compact_arguments(arguments: dict[str, Any]) -> dict[str, Any]:
compacted: dict[str, Any] = {}
for key, value in arguments.items():
if isinstance(value, list) and len(value) > 8:
compacted[key] = value[:8] + ["..."]
else:
compacted[key] = value
return compacted
def deterministic_summary(
task: RouteTask,
matrix: RouteMatrix,
solution: RouteSolution,
warnings: list[str],
) -> str:
objective_text = "预计驾驶时间" if task.objective == "time" else "路线距离"
warning_text = ""
if warnings:
warning_text = "\n\n**运行提示**:\n" + "\n".join(f"- {item}" for item in warnings)
return (
"本次求解把用户需求先转成结构化路线优化任务,然后调用地理编码工具获得经纬度,"
f"再通过 `{matrix.source}` 得到点对点距离/时间矩阵。最后,本地优化器以“{objective_text}”为目标,"
f"使用 `{solution.algorithm}` 搜索访问顺序。\n\n"
f"最终路线为:**{' → '.join(solution.route_names)}**。"
f"总距离约 **{format_km(solution.total_distance_meters)}**,预计驾驶时间约 **{format_minutes(solution.total_duration_seconds)}**。"
"\n\n这个设计中,大模型不直接猜最优路线,而是负责理解自然语言、组织工具调用并解释结果;"
"确定性工具负责拿真实数据和完成可验证的算法计算。"
f"{warning_text}"
)