# FeiMatrix-Synapse — core/agent.py
# (Header reconstructed from Hugging Face upload-page scrape residue:
#  uploader "aifeifei798", "Upload 7 files", commit 719390c verified, 6.97 kB.)
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
from typing import List, Any
import json
import os
import re
from .tool_recommender import DirectToolRecommender
from tools.tool_registry import get_tool_by_name
# --- Agent Prompt, now fully in English ---
# Per-turn decision prompt. Placeholders are filled via str.format() in
# SmartAIAgent.stream_run():
#   {tools}        — formatted tool descriptions (or a "no tools" notice)
#   {chat_history} — prior turns rendered by _format_chat_history()
#   {input}        — the current user question
# Doubled braces ({{ }}) escape literal JSON braces for str.format().
AGENT_PROMPT_TEMPLATE = """
You are a powerful AI assistant. Your task is to understand the user's question and decide if a tool is needed to answer it.
You have the following tools available:
{tools}
If you need to use a tool, you must respond in the following JSON format strictly, without any other text or explanation:
{{
"tool": "the_name_of_the_tool_to_call",
"tool_input": {{ "parameter1": "value1", "parameter2": "value2" }}
}}
If you do not need to use any tool, answer the user's question directly.
This is the conversation history:
{chat_history}
User's question: {input}
Now, think and provide your response (either JSON or a direct answer):
"""
class SmartAIAgent:
    """LLM-driven conversational agent with dynamic tool selection.

    Per user turn (see ``stream_run``): recommend candidate tools for the
    query, prompt the Gemini model to either emit a tool-call JSON or answer
    directly, execute the chosen tool via ``get_tool_by_name``, and finally
    stream a natural-language answer grounded in the tool result. All turns
    accumulate in ``self.chat_history``.
    """

    def __init__(
        self,
        tool_recommender: DirectToolRecommender,
        registered_tools: List[Any],
        api_key: str,
    ):
        """
        Args:
            tool_recommender: Recommends tool metadata relevant to a query.
            registered_tools: All tool objects known to the system (stored
                for reference; execution resolves tools by name via
                ``get_tool_by_name`` instead).
            api_key: Google Generative AI API key for the LangChain wrapper.
        """
        self.tool_recommender = tool_recommender
        self.registered_tools = registered_tools
        self.model_name = "gemini-2.5-flash"
        # convert_system_message_to_human folds any system message into the
        # human turn (Gemini historically rejected system roles).
        self.llm = ChatGoogleGenerativeAI(
            model=self.model_name,
            google_api_key=api_key,
            convert_system_message_to_human=True,
        )
        # Multi-turn history of Human/AI/Tool messages; grows without bound.
        self.chat_history: List[Any] = []
        print(f"LangChain Agent initialized, using model: {self.model_name}.")

    def _extract_json_from_string(self, text: str) -> dict | None:
        """Extracts a JSON block from a string that might contain other text.

        Prefers a fenced ```json ... ``` block; otherwise falls back to the
        widest '{...}' span in the text. Returns the parsed dict, or None
        when nothing parseable is found.
        """
        match = re.search(r"```json\s*(\{.*?\})\s*```", text, re.DOTALL)
        if match:
            json_str = match.group(1)
        else:
            # Greedy fallback: first '{' through last '}', across newlines.
            match = re.search(r"\{.*\}", text, re.DOTALL)
            if match:
                json_str = match.group(0)
            else:
                return None
        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            # Braces were present but the span was not valid JSON.
            return None

    def _format_tools_for_prompt(self, tools: List[dict]) -> str:
        """Formats the list of tools into a clear string for the prompt.

        Each entry is expected to provide 'name', 'description', and a
        'parameters' value holding a JSON-encoded {param_name: type} mapping
        — assumption based on usage here; confirm against the recommender's
        metadata schema.
        """
        if not tools:
            return "No tools available."
        tool_strings = []
        for tool in tools:
            try:
                params = json.loads(tool["parameters"])
                param_str = ", ".join(
                    [f"{p_name}: {p_type}" for p_name, p_type in params.items()]
                )
                tool_strings.append(
                    f"- Tool Name: {tool['name']}\n - Description: {tool['description']}\n - Parameters: {param_str}"
                )
            except (json.JSONDecodeError, TypeError):
                # Malformed/absent parameter metadata: still list the tool.
                tool_strings.append(
                    f"- Tool Name: {tool['name']}\n - Description: {tool['description']}\n - Parameters: Could not be parsed"
                )
        return "\n".join(tool_strings)

    def _format_chat_history(self) -> str:
        """Formats the chat history for the prompt.

        Renders each message as a 'Role: content' line; message types other
        than Human/AI/Tool are silently skipped.
        """
        formatted_history = []
        for msg in self.chat_history:
            if isinstance(msg, HumanMessage):
                formatted_history.append(f"User: {msg.content}")
            elif isinstance(msg, AIMessage):
                formatted_history.append(f"Assistant: {msg.content}")
            elif isinstance(msg, ToolMessage):
                formatted_history.append(f"Tool Result: {msg.content}")
        return "\n".join(formatted_history)

    def stream_run(self, user_input: str):
        """Processes user input in a streaming fashion.

        Generator yielding progress/status strings followed by the answer:
        either the raw LLM reply (direct-answer path) or streamed chunks of
        a follow-up LLM call summarizing the tool result (tool path).
        """
        self.chat_history.append(HumanMessage(content=user_input))
        yield "🤔 Analyzing your question...\n"
        yield "🔍 Recommending relevant tools from the library...\n"
        recommended_tools_meta = self.tool_recommender.recommend_tools(user_input)
        if not recommended_tools_meta:
            yield "ℹ️ No relevant tools found. Answering directly.\n"
            recommended_tools_prompt = "No recommended tools."
        else:
            tool_names = [t["name"] for t in recommended_tools_meta]
            yield f"✅ Recommended tools: `{', '.join(tool_names)}`\n"
            recommended_tools_prompt = self._format_tools_for_prompt(
                recommended_tools_meta
            )
        yield f"🧠 Letting the AI Brain ({self.model_name}) decide on the action...\n"
        prompt = AGENT_PROMPT_TEMPLATE.format(
            tools=recommended_tools_prompt,
            chat_history=self._format_chat_history(),
            input=user_input,
        )
        # Single non-streaming call: the decision must be parsed as a whole.
        llm_response = self.llm.invoke(prompt)
        llm_decision_content = llm_response.content.strip()
        decision = self._extract_json_from_string(llm_decision_content)
        if decision and "tool" in decision and "tool_input" in decision:
            tool_name = decision.get("tool")
            tool_input = decision.get("tool_input")
            yield f"💡 AI Action: Call tool `{tool_name}` with parameters `{tool_input}`\n"
            tool_to_execute = get_tool_by_name(tool_name)
            if tool_to_execute:
                yield f"⚙️ Executing tool `{tool_name}`...\n"
                tool_output = tool_to_execute.invoke(tool_input)
                # NOTE(review): the trailing "..." is emitted even when the
                # output is shorter than 500 chars.
                yield f"📊 Tool Result:\n---\n{str(tool_output)[:500]}...\n---\n"
                # Record the tool-call decision and its result so the
                # follow-up prompt (built from chat history) can see them.
                self.chat_history.append(
                    AIMessage(content=json.dumps(decision, ensure_ascii=False))
                )
                self.chat_history.append(
                    ToolMessage(content=str(tool_output), tool_call_id="N/A")
                )
                yield "✍️ Generating final answer based on tool results...\n\n"
                final_answer_prompt = f"Based on the conversation history and the latest tool result, generate a final, complete, and natural response for the user.\n\nConversation History:\n{self._format_chat_history()}\n\nPlease answer directly without mentioning your thought process."
                final_answer_stream = self.llm.stream(final_answer_prompt)
                full_final_answer = ""
                for chunk in final_answer_stream:
                    # assumes chunk.content is always a str — TODO confirm
                    # for this LangChain version.
                    yield chunk.content
                    full_final_answer += chunk.content
                self.chat_history.append(AIMessage(content=full_final_answer))
            else:
                # NOTE(review): this failure turn is never appended to
                # chat_history, so the next turn will not see it.
                yield f"❌ Error: The tool `{tool_name}` decided by the AI does not exist.\n"
        else:
            # No valid tool-call JSON: treat the raw LLM reply as the answer.
            yield "✅ AI Action: Answer directly.\n\n"
            yield llm_decision_content
            self.chat_history.append(AIMessage(content=llm_decision_content))