Spaces:
Sleeping
Sleeping
File size: 4,640 Bytes
b27eb78 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
from pathlib import Path
from typing import Any
import opik
from loguru import logger
from opik import opik_context
from smolagents import LiteLLMModel, MessageRole, MultiStepAgent, ToolCallingAgent
from second_brain_online.config import settings
from .tools import (
HuggingFaceEndpointSummarizerTool,
MongoDBRetrieverTool,
OpenAISummarizerTool,
what_can_i_do,
)
def get_agent(retriever_config_path: Path) -> "AgentWrapper":
    """Build and return an AgentWrapper for the given retriever configuration.

    Args:
        retriever_config_path: Path to the MongoDB retriever tool's config file.

    Returns:
        A fully constructed AgentWrapper instance.
    """
    return AgentWrapper.build_from_smolagents(
        retriever_config_path=retriever_config_path
    )
class AgentWrapper:
    """Thin wrapper around a smolagents ``MultiStepAgent`` that adds Opik tracing.

    The wrapped agent is held in a name-mangled private attribute so callers
    interact with it only through the explicit properties and ``run`` below.
    """

    def __init__(self, agent: MultiStepAgent) -> None:
        self.__agent = agent

    @property
    def input_messages(self) -> list[dict]:
        # Conversation history accumulated by the wrapped agent.
        return self.__agent.input_messages

    @property
    def agent_name(self) -> str:
        return self.__agent.agent_name

    @property
    def max_steps(self) -> int:
        # Fixed return annotation: was "-> str", but this forwards the integer
        # step limit configured on the wrapped agent (see max_steps=2 below).
        return self.__agent.max_steps

    @classmethod
    def build_from_smolagents(cls, retriever_config_path: Path) -> "AgentWrapper":
        """Construct the wrapper around a ToolCallingAgent wired to project tools.

        Args:
            retriever_config_path: Path to the MongoDB retriever tool config.

        Returns:
            AgentWrapper around a configured ToolCallingAgent.
        """
        retriever_tool = MongoDBRetrieverTool(config_path=retriever_config_path)

        # Select the summarizer backend from deployment settings. NOTE(review):
        # the tool is constructed but intentionally NOT handed to the agent
        # below (see "redundant" comment); kept for its warning-log side effect
        # and potential future use — confirm whether it can be dropped entirely.
        if settings.USE_HUGGINGFACE_DEDICATED_ENDPOINT:
            logger.warning(
                f"Using Hugging Face dedicated endpoint as the summarizer with URL: {settings.HUGGINGFACE_DEDICATED_ENDPOINT}"
            )
            summarizer_tool = HuggingFaceEndpointSummarizerTool()
        else:
            logger.warning(
                f"Using OpenAI as the summarizer with model: {settings.OPENAI_MODEL_ID}"
            )
            summarizer_tool = OpenAISummarizerTool(stream=False)

        model = LiteLLMModel(
            model_id=settings.OPENAI_MODEL_ID,
            api_base="https://api.openai.com/v1",
            api_key=settings.OPENAI_API_KEY,
        )
        agent = ToolCallingAgent(
            tools=[what_can_i_do, retriever_tool],  # Remove summarizer - it's redundant
            model=model,
            max_steps=2,  # Reduce steps since we removed summarizer
            verbosity_level=2,
        )
        return cls(agent)

    @opik.track(name="Agent.run")
    def run(self, task: str, **kwargs) -> Any:
        """Run the wrapped agent on ``task`` and attach run metadata to the Opik trace.

        Args:
            task: The user task/query passed through to the agent.
            **kwargs: Forwarded verbatim to ``MultiStepAgent.run``.

        Returns:
            Whatever the wrapped agent's ``run`` returns.
        """
        result = self.__agent.run(task, **kwargs)

        # Reuse the already-bound ``model`` local instead of re-traversing
        # ``self.__agent.model`` for each attribute (behavior identical).
        model = self.__agent.model
        metadata = {
            "system_prompt": self.__agent.system_prompt,
            "system_prompt_template": self.__agent.system_prompt_template,
            "tool_description_template": self.__agent.tool_description_template,
            "tools": self.__agent.tools,
            "model_id": model.model_id,
            "api_base": model.api_base,
            "input_token_count": model.last_input_token_count,
            "output_token_count": model.last_output_token_count,
        }
        # step_number only exists mid-run on some smolagents versions; record
        # it opportunistically.
        if hasattr(self.__agent, "step_number"):
            metadata["step_number"] = self.__agent.step_number
        opik_context.update_current_trace(
            tags=["agent"],
            metadata=metadata,
        )
        return result
def extract_tool_responses(agent: ToolCallingAgent) -> str:
    """
    Extracts and concatenates all tool response contents with numbered observation delimiters.

    Args:
        agent (ToolCallingAgent): Agent whose ``input_messages`` — a list of
            message dictionaries containing 'role' and 'content' keys — are
            scanned for tool responses. (Docstring fixed: it previously
            documented a nonexistent ``input_messages`` parameter and showed
            the function being called with a raw list.)

    Returns:
        str: Tool response contents separated by numbered observation delimiters

    Example:
        >>> agent.input_messages = [
        ...     {"role": MessageRole.TOOL_RESPONSE, "content": "First response"},
        ...     {"role": MessageRole.USER, "content": "Question"},
        ...     {"role": MessageRole.TOOL_RESPONSE, "content": "Second response"},
        ... ]
        >>> extract_tool_responses(agent)
        "-------- OBSERVATION 1 --------\\nFirst response\\n-------- OBSERVATION 2 --------\\nSecond response"
    """
    tool_responses = [
        msg["content"]
        for msg in agent.input_messages
        if msg["role"] == MessageRole.TOOL_RESPONSE
    ]
    return "\n".join(
        f"-------- OBSERVATION {i + 1} --------\n{response}"
        for i, response in enumerate(tool_responses)
    )
class OpikAgentMonitorCallback:
    """Per-step callback that forwards smolagents step logs to Opik tracing."""

    def __init__(self) -> None:
        # Latest step's observations; returned by ``trace`` so Opik records it
        # as the traced call's output.
        self.output_state: dict = {}

    def __call__(self, step_log) -> None:
        # Snapshot what the agent saw going into this step...
        snapshot = {
            "agent_memory": step_log.agent_memory,
            "tool_calls": step_log.tool_calls,
        }
        # ...and what came out of it, then report both through the traced method.
        self.output_state = {"observations": step_log.observations}
        self.trace(snapshot)

    @opik.track(name="Callback.agent_step")
    def trace(self, step_log) -> dict:
        # The decorator captures ``step_log`` (the input snapshot from
        # __call__) as trace input; returning output_state logs the
        # observations as the trace output.
        return self.output_state
|