# DIY_Agent / agent_multi.py
# Author: bwilkie — renamed from agent.py to agent_multi.py (commit 0f20409, verified)
from langchain_core.output_parsers import PydanticOutputParser
from typing import Callable, Dict, List, Any
import time
import json
from groq_api import grok_get_llm_response, API_llama_get_llm_response, open_oss_get_llm_response, openai_get_llm_response, deepseekapi_get_llm_response
from local_templates import llama3_get_llm_response, mistral_get_llm_response, qwen_get_llm_response, deepseek_get_llm_response, grape_get_llm_response
import os
import re
max_steps = 15
base_dir = os.path.dirname(os.path.abspath(__file__))
def select_model(model_type: str):
"""Return the correct LLM response function for a given model_type."""
mapping = {
"groq_api": grok_get_llm_response,
"llama_api": API_llama_get_llm_response,
"oss_api": open_oss_get_llm_response,
"openai_api": openai_get_llm_response,
"deepseek_api": deepseekapi_get_llm_response,
"llama3": llama3_get_llm_response,
"mistral": mistral_get_llm_response,
"qwen3": qwen_get_llm_response,
"deepseek": deepseek_get_llm_response,
"grape": grape_get_llm_response,
}
if model_type not in mapping:
raise ValueError(f"Unknown model_type: {model_type}")
return mapping[model_type]
def format_gaia_response(model_type, last_observation, question_out):
get_llm_response = select_model(model_type)
# Process Gaia
with open(base_dir+"/system_prompt_final.txt", "r") as f:
final_sys_prompt = f.read()
gaia_prompt = (
f"{final_sys_prompt}\n\n"
f"User Question:\n{question_out}\n\n"
f"Last Observation:\n{last_observation}\n\n"
"Please review user questions and the last obervation and respond with the correct answer, in the correct format. No extra text, just the answer."
)
final_answer_out = get_llm_response(final_sys_prompt, gaia_prompt, reasoning_format = 'hidden')
return final_answer_out
class ImprovedAgent:
    """Multi-stage ReAct-style agent: plan, then loop thought -> action -> tool -> observe.

    Each stage calls the configured LLM with a dedicated system prompt and the
    accumulated JSON history. The loop ends when the observation stage emits a
    'final_answer' key, or after `max_steps` iterations.
    """

    def __init__(self, tools: Dict[str, Callable], model_type: str):
        # tools: name -> callable, invoked with the action agent's keyword args.
        self.tools = tools
        self.history: List[Any] = []
        self.get_llm_response = select_model(model_type)
        # Load the per-stage system prompts shipped next to this module.
        self.system_prompt_plan = self.load_prompt(os.path.join(base_dir, "system_prompt_planning.txt"))
        self.system_prompt_thought = self.load_prompt(os.path.join(base_dir, "system_prompt_thought.txt"))
        self.system_prompt_action = self.load_prompt(os.path.join(base_dir, "system_prompt_action.txt"))
        self.system_prompt_observe = self.load_prompt(os.path.join(base_dir, "system_prompt_observe.txt"))

    def load_prompt(self, filepath: str) -> str:
        """Read a prompt file and return its full contents."""
        with open(filepath, "r", encoding="utf-8") as f:
            return f.read()

    def reset(self):
        """Clear the accumulated stage history before a new run."""
        self.history = []

    def strip_markdown_code_block(self, text: str) -> str:
        """Remove a leading ```json / ``` fence and a trailing ``` fence, then strip."""
        # Leading fence: ``` optionally followed by "json" (case-insensitive).
        text = re.sub(r"^```(?:json)?\s*", "", text, flags=re.IGNORECASE)
        # Trailing fence.
        text = re.sub(r"\s*```$", "", text)
        return text.strip()

    def parse_json_response(self, response_text: str) -> Dict:
        """Parse an LLM reply into a dict; on failure return {'error': ...}."""
        try:
            cleaned = self.strip_markdown_code_block(response_text.strip())
            json_text = self.extract_json_string(cleaned)
            # Models sometimes escape single quotes, which is invalid JSON.
            json_text = json_text.replace("\\'", "'")
            return json.loads(json_text)
        except json.JSONDecodeError as e:
            print(f"[ERROR] JSON Parse Error: {e}")
            print(f"[DEBUG] Raw response: {response_text}")
            return {"error": f"Invalid JSON response: {str(e)}"}

    def extract_json_string(self, text: str) -> str:
        """Return the first '{...}' span in *text*, or *text* unchanged if none."""
        match = re.search(r'\{.*\}', text, re.DOTALL)
        return match.group(0) if match else text

    def build_prompt_from_history(self, query: str) -> str:
        """Format the user query plus the full JSON history for the next stage."""
        return f"""User Query: {query}
History: {json.dumps(self.history, indent=2)}
"""

    def _parse_action_response(self, action_response_text: str) -> Dict:
        """Parse the action agent's reply, tolerating a <think>...</think> preamble.

        Returns the parsed dict, or {'error': ...} when no JSON can be extracted.
        (Extracted from run() so the parsing logic lives in one place; behavior
        matches the previous inline code, including the non-DOTALL regex.)
        """
        try:
            # Some models emit reasoning inside <think> tags before the JSON.
            if '<think>' in action_response_text and '</think>' in action_response_text:
                json_part = action_response_text.split('</think>')[1].strip()
            else:
                json_part = action_response_text.strip()
            # No DOTALL here: the action JSON is expected on a single line.
            json_match = re.search(r'\{.*\}', json_part)
            if json_match:
                return json.loads(json_match.group())
            return {'error': 'No JSON found in response'}
        except Exception as e:
            return {'error': f'JSON parsing failed: {str(e)}'}

    def run(self, query: str):
        """Execute the full agent loop for *query*.

        Returns a 3-tuple (history, last_observation_text, final_answer_or_observation).
        """
        self.reset()

        # Step 1: Planning agent produces the initial plan.
        planning_input = f"User Query: {query}"
        print("-----Stage Plan-----")
        plan_response = self.get_llm_response(self.system_prompt_plan, planning_input)
        print("-----Plan Text-----")
        print(plan_response)
        print("-------------------")
        print("-----Plan Parsed-----")
        parsed_plan = self.parse_json_response(plan_response)
        print(parsed_plan)
        print("---------------------")
        self.history.append(parsed_plan)

        current_input = self.build_prompt_from_history(query)

        for step in range(max_steps):  # at most `max_steps` iterations (module constant)
            print(f"-----Iteration {step}-----")

            # Step 2: Thought agent decides what to do next.
            print("-----Stage Thought-----")
            thought_response = self.get_llm_response(self.system_prompt_thought, current_input)
            print(thought_response)
            parsed_thought = self.parse_json_response(thought_response)
            print("-----Thought Parsed-----")
            print(parsed_thought)
            print("-----------------")
            self.history.append(parsed_thought)

            # Step 3: Action agent converts the thought into a tool call.
            if "thought" not in parsed_thought:
                # Return a 3-tuple like every other exit path (was a 2-tuple bug).
                return self.history, "", "[ERROR] Thought agent did not return 'thought'. Ending."
            action_input = json.dumps({"thought": parsed_thought["thought"]})
            print("-----Stage Action-----")
            action_response_text = self.get_llm_response(self.system_prompt_action, action_input)
            parsed_action = self._parse_action_response(action_response_text)
            print(parsed_action)
            print("-----------------")
            self.history.append(parsed_action)

            # Step 4: Tool execution.
            tool_name = parsed_action.get("action")
            tool_args = parsed_action.get("action_input", {})
            if not tool_name or tool_name not in self.tools:
                observation = f"[ERROR] Invalid or missing tool: {tool_name}"
            else:
                try:
                    result = self.tools[tool_name](**tool_args)
                    observation = f"Tool `{tool_name}` executed successfully. Output: {result}"
                    print("-----Tool Observation OK-----")
                    print(observation)
                    print("-----------------")
                except Exception as e:
                    observation = f"[ERROR] Tool `{tool_name}` execution failed: {str(e)}"
                    print("-----Tool Observation Fail-----")
                    print(observation)
                    print("-----------------")
            # Record the attempted call (output is passed separately to the observer).
            self.history.append({
                "tool_name": tool_name,
                "tool_args": tool_args,
            })

            # Step 5: Observation agent reviews the tool output against the plan.
            observation_input = f"""User Query: {query}
Plan: {json.dumps(self.history[0], indent=2)}
History: {json.dumps(self.history, indent=2)}
Tool Output: {observation}
"""
            print("-----Stage Observe-----")
            observation_response_text = self.get_llm_response(self.system_prompt_observe, observation_input)
            print("-----Observation Parsed-----")
            parsed_observation = self.parse_json_response(observation_response_text)
            print(parsed_observation)
            print("-----------------")
            self.history.append(parsed_observation)

            # Step 6: Stop as soon as the observer declares a final answer.
            if "final_answer" in parsed_observation:
                print(parsed_observation["final_answer"])
                return self.history, observation_response_text, parsed_observation["final_answer"]

            # Step 7: Rebuild the prompt from history for the next iteration.
            current_input = self.build_prompt_from_history(query)

        print('ERROR LOOP LIMIT REACHED')
        return self.history, observation_response_text + "This is our last observation. Make your best estimation given the question.", parsed_observation