import base64
import json
from os.path import join

import pandas as pd
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_openai.chat_models import ChatOpenAI
from langfuse import Langfuse, get_client
from langfuse.langchain import CallbackHandler
from langgraph.graph import START, StateGraph, MessagesState
from langgraph.prebuilt import ToolNode, tools_condition


class Agent:
    """
    A basic tool-using agent that can answer questions.

    The agent runs a small LangGraph loop: an "assistant" node invokes
    the chat model (with tools bound) and a "tools" node executes any
    requested tool calls, until the model emits a final JSON answer.
    """

    def __init__(
        self,
        model: str,
        tools: list,
        system_prompt_path: str,
        data_path: str | None = None,
        openai_api_key: str | None = None,
        langfuse_callback_handler: CallbackHandler | None = None,
    ):
        """
        Initialize the agent object.

        :param model: The OpenAI model to use.
        :param tools: List of tools the agent can use.
        :param system_prompt_path: Path to the system prompt file.
        :param data_path: Directory holding question attachment files.
            May be None when questions never reference a file.
        :param openai_api_key: OpenAI API key for authentication.
        :param langfuse_callback_handler: Langfuse callback handler for
            tracking and logging interactions.
        """
        # Deliberately very slow: at most one request every 10 seconds,
        # waking every 100 ms to check whether a request is allowed.
        rate_limiter = InMemoryRateLimiter(
            requests_per_second=0.1,
            check_every_n_seconds=0.1,
            # Controls the maximum burst size.
            max_bucket_size=10,
        )
        self.chat_model = ChatOpenAI(
            model=model,
            api_key=openai_api_key,
            rate_limiter=rate_limiter,
        )
        with open(system_prompt_path, "r") as file:
            self.system_prompt = file.read()
        self.data_path = data_path
        self.tools = tools
        if langfuse_callback_handler is not None:
            self.chat_model.callbacks = [langfuse_callback_handler]
        # Sequential tool execution keeps the trace easy to follow.
        self.chat_model_with_tools = self.chat_model.bind_tools(
            tools=tools,
            parallel_tool_calls=False,
        )
        self.graph = self.__build_graph()

    def __call__(
        self,
        question: str,
        question_file: str | None = None,
    ) -> tuple[str, str]:
        """
        Reply to a question using the agent and return the agent's full
        reply with reasoning included.

        :param question: The question to ask the agent.
        :param question_file: The file that comes with the question, or
            None when there is no attachment (default).
        :return: A ``(reasoning, answer)`` tuple parsed from the model's
            JSON reply.
        :raises json.JSONDecodeError: If the final reply is not valid JSON.
        :raises KeyError: If the reply lacks "reasoning"/"answer" keys.
        """
        human_message = self.__format_human_message(
            question=question,
            question_file=question_file,
        )
        final_state = self.graph.invoke(
            input={
                "messages": [
                    SystemMessage(content=self.system_prompt),
                    human_message,
                ]
            },
            config={"callbacks": self.chat_model.callbacks},
        )
        content: str = final_state["messages"][-1].content.strip()
        if content.startswith("```json"):
            # Strip a markdown code fence the model may wrap its JSON in.
            # (The previous [7:-3] slice corrupted replies that had no
            # closing fence.)
            content = content.removeprefix("```json").removesuffix("```").strip()
        reply = json.loads(content)
        return reply["reasoning"], reply["answer"]

    def __build_graph(self):
        """
        Build and compile the assistant/tools LangGraph state graph.
        """
        builder = StateGraph(MessagesState)
        # Define nodes: these do the work.
        builder.add_node("assistant", self.__assistant)
        builder.add_node("tools", ToolNode(self.tools))
        # Define edges: these determine how the control flow moves.
        builder.add_edge(START, "assistant")
        # Route to "tools" when the last message holds tool calls,
        # otherwise finish.
        builder.add_conditional_edges("assistant", tools_condition)
        builder.add_edge("tools", "assistant")
        return builder.compile()

    def __assistant(self, state: MessagesState) -> MessagesState:
        """
        Invoke the tool-bound chat model on the conversation so far.

        :param state: The current state of the agent (message history).
        :return: State update appending the assistant's response.
        """
        response = self.chat_model_with_tools.invoke(state["messages"])
        return {"messages": [response]}

    def __format_human_message(
        self,
        question: str,
        question_file: str | None,
    ) -> HumanMessage:
        """
        Format the human message for the agent, inlining any attachment.

        :param question: The question to ask the agent.
        :param question_file: The file that comes with the question,
            resolved relative to ``self.data_path``; may be None/empty.
        :return: Formatted HumanMessage.
        """
        if not question_file:
            return HumanMessage(content=question)
        # endswith() rather than substring matching, so e.g.
        # "notes.png.txt" is not treated as an image.
        if question_file.endswith(".png"):
            with open(join(self.data_path, question_file), "rb") as file:
                file_content = base64.b64encode(file.read()).decode("utf-8")
            return HumanMessage(
                content=[
                    {
                        'type': 'text',
                        'text': question
                    },
                    {
                        'type': 'image',
                        'source_type': 'base64',
                        'data': file_content,
                        "mime_type": "image/png"
                    }
                ]
            )
        if question_file.endswith(".mp3"):
            # There is no support for audio files when using gpt-4o, so
            # only the file name is passed and a tool transcribes the
            # .mp3 file to text.
            return HumanMessage(
                content=[
                    {
                        'type': 'text',
                        'text': f'''{question}\n\nHere is the audio file: ```audio\n{question_file}\n```'''
                    },
                ]
            )
        if question_file.endswith(".py"):
            with open(join(self.data_path, question_file), "r") as file:
                file_content = file.read()
            return HumanMessage(
                content=[
                    {
                        'type': 'text',
                        'text': f'''{question}\n\nHere is the code: ```python\n{file_content}\n```'''
                    },
                ]
            )
        if question_file.endswith(".xlsx"):
            data = pd.read_excel(
                join(self.data_path, question_file),
            )
            data = data.to_string()
            return HumanMessage(
                content=[
                    {
                        'type': 'text',
                        'text': f'''{question}\n\nHere is the data: ```\n{data}\n```'''
                    },
                ]
            )
        # Unknown attachment type: fall back to mentioning the file name
        # so the model can decide to use a tool on it.  (Previously this
        # path left the variable unbound and raised UnboundLocalError.)
        return HumanMessage(
            content=f"{question}\n\nHere is the file: {question_file}"
        )


if __name__ == "__main__":
    import os

    from langchain_community.tools import DuckDuckGoSearchResults

    from tools import multiply, add, subtract, divide, modulus

    # Initialize Langfuse client with constructor arguments.
    Langfuse(
        public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
        secret_key=os.environ.get("LANGFUSE_SECRET_KEY"),
        host='https://cloud.langfuse.com'
    )
    # Get the configured client instance.
    langfuse = get_client()

    # Initialize the Langfuse callback handler.
    langfuse_handler = CallbackHandler()

    tools = [multiply, add, subtract, divide, modulus]
    tools.append(
        DuckDuckGoSearchResults()
    )

    agent = Agent(
        model="gpt-4o",
        tools=tools,
        system_prompt_path="prompts/system_prompt.txt",
        openai_api_key=os.environ.get("OPENAI_API_KEY"),
        langfuse_callback_handler=langfuse_handler
    )
    response = agent(
        question="""
        Search for Tom Cruise and summarize the results for me.
        """
    )
    print(response)