| from textwrap import dedent |
| from typing import TypedDict, List, Dict, Any, Optional, Annotated |
| from functools import partial |
| import os |
| import re |
| import time |
| import uuid |
|
|
| |
| |
| from langgraph.graph import StateGraph, START, END |
| from langgraph.prebuilt import ToolNode, tools_condition |
| from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage |
| from langgraph.graph.message import add_messages |
| from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint |
|
|
| from langfuse.langchain import CallbackHandler |
| from langfuse import get_client |
|
|
# Langfuse credentials: keep any values already present in the environment,
# otherwise fall back to placeholder defaults (replace the *** placeholders
# with real keys before deploying).
# NOTE(review): recent Langfuse SDKs read the host from LANGFUSE_HOST rather
# than LANGFUSE_BASE_URL — confirm which variable this SDK version honors.
_LANGFUSE_DEFAULTS = {
    "LANGFUSE_PUBLIC_KEY": "pk-lf-***",
    "LANGFUSE_SECRET_KEY": "sk-lf-***",
    "LANGFUSE_BASE_URL": "https://us.cloud.langfuse.com",
}
for _name, _fallback in _LANGFUSE_DEFAULTS.items():
    os.environ[_name] = os.getenv(_name, _fallback)

# Singleton Langfuse client used for tracing; flushed after each agent run.
langfuse = get_client()

if langfuse.auth_check():
    print("Langfuse client is authenticated and ready!")
else:
    print("Authentication failed. Please check your credentials and host.")
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
|
|
class AgentState(TypedDict):
    """State carried through the graph: the running chat transcript."""

    # The `add_messages` reducer merges newly returned messages into the
    # existing list instead of overwriting it.
    messages: Annotated[List[AnyMessage], add_messages]
|
|
|
|
# Plain-text tool catalogue injected into the system prompt. Built once at
# import time instead of being re-dedented on every assistant() call.
_TOOLS_DESCRIPTION = dedent(
    """
    duckduckgo_search_results(query: str) -> list[dict]:
        Perform a web search using DuckDuckGo and return the results.
        Args:
            query: The search query string.
        Returns:
            A list of search results, where each result is a dictionary that includes the snippet, title, and link.

    fetch_website(url: str) -> str:
        Fetch the content of a website.
        Args:
            url: The URL of the website to fetch.
        Returns:
            The title and content of the website.

    get_wiki_full(query: str) -> str:
        Scrape the content of a Wikipedia page based on the user query.
        Args:
            query: The user query to search for on Wikipedia.
        Returns:
            A single string containing the content of the Wikipedia page.

    youtube_transcript(url: str) -> list[dict]:
        Fetch the transcript of a youtube video.
        Args:
            url: input youtube url.
        Returns:
            A list of dictionaries containing the transcript of the youtube videos.
            Each dictionary has 'text', 'start', and 'duration' keys.

    python_repl_tool(code: str) -> str:
        Execute Python code and return the output.
        Args:
            code: A string of Python code to execute.
        Returns:
            The output of the executed code or any error messages.
    """
)

# The system prompt is constant, so assemble it once at module load.
_SYSTEM_PROMPT = (
    "You are a helpful assistant at answering user questions. "
    "Your final answer will be between <answer> and </answer> tags. "
    f"You can access provided tools:\n{_TOOLS_DESCRIPTION}\n"
)


def assistant(state: AgentState, llm) -> Dict[str, Any]:
    """Run one LLM turn: prepend the system prompt and invoke the model.

    Args:
        state: Current graph state; ``state["messages"]`` is the transcript.
        llm: A chat model (typically already bound to tools) exposing ``invoke``.

    Returns:
        A partial state update containing only the model's reply; the
        ``add_messages`` reducer on ``AgentState.messages`` appends it to the
        existing transcript rather than replacing it.
    """
    sys_msg = SystemMessage(content=_SYSTEM_PROMPT)
    return {"messages": [llm.invoke([sys_msg] + state["messages"])]}
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
def extract_answer(text):
    """Pull the answer out of the first <answer>...</answer> pair in *text*.

    Returns the inner text with surrounding whitespace stripped. When no tag
    pair is present, returns the literal string 'None' (the string, not the
    ``None`` object) as a sentinel for downstream consumers.
    """
    # DOTALL lets the answer span multiple lines; the lazy quantifier keeps
    # the match bounded to the first closing tag.
    found = re.findall(r'<answer>(.*?)</answer>', text, re.DOTALL)
    return found[0].strip() if found else 'None'
|
|
class BasicAgent:
    """ReAct-style agent: a Hugging Face chat model bound to tools, wired into
    a LangGraph assistant/tools loop, with per-question Langfuse tracing."""

    def __init__(self, hf_model_name, hf_model_provider, tools_list):
        """Store model/tool configuration and compile the agent graph.

        Args:
            hf_model_name: Hugging Face repo id of the chat model.
            hf_model_provider: Inference provider passed to HuggingFaceEndpoint.
            tools_list: Tool objects bound to the model and given to ToolNode.
        """
        self.hf_model_name = hf_model_name
        self.hf_model_provider = hf_model_provider
        self.tools_list = tools_list
        print("BasicAgent initialized.")

        # Builds the endpoint and compiles the graph eagerly, so construction
        # fails fast on bad model/provider configuration.
        self.agent_graph = self.build_agent_graph()

    def build_llm_with_tools(self):
        """Create the HF endpoint, wrap it as a chat model, and bind the tools."""
        print("Building Hugging Face model and tools...")

        # NOTE(review): do_sample=False requests greedy decoding, under which
        # temperature is typically ignored by the backend — confirm whether
        # sampling at temperature 0.2 was actually intended.
        llm = HuggingFaceEndpoint(
            repo_id=self.hf_model_name,
            provider=self.hf_model_provider,
            max_new_tokens=8192,
            do_sample=False,
            temperature=0.2,
        )

        chat_model = ChatHuggingFace(llm=llm)

        llm_with_tools = chat_model.bind_tools(self.tools_list)
        print("LLM with tools built successfully.")
        return llm_with_tools

    def build_agent_graph(self):
        """Compile the two-node LangGraph: assistant <-> tools loop."""
        llm_with_tools = self.build_llm_with_tools()

        builder = StateGraph(AgentState)

        # The assistant node closes over the tool-bound model via partial().
        builder.add_node("assistant", partial(assistant, llm=llm_with_tools))
        builder.add_node("tools", ToolNode(self.tools_list))

        builder.add_edge(START, "assistant")
        # tools_condition routes to the "tools" node when the last message
        # contains tool calls, otherwise to END.
        builder.add_conditional_edges(
            "assistant",
            tools_condition,
        )
        builder.add_edge("tools", "assistant")
        agent_graph = builder.compile()
        print("Agent graph built successfully.")
        return agent_graph

    async def __call__(self, question: str, task_id: Optional[str] = None) -> str:
        """Answer one question and return the text between <answer> tags.

        Args:
            question: The user question to answer.
            task_id: Optional external identifier used in the trace run name
                and metadata.

        Returns:
            The extracted answer string, or the literal string 'None' when the
            model produced no <answer>...</answer> block.
        """
        print(f"Agent received question (first 100 chars): {question[:100]}...")

        handler = CallbackHandler()

        # Locally generated id used for the run name and metadata correlation.
        trace_id = str(uuid.uuid4())
        run_name = f"agent_question_{task_id or trace_id[:8]}"

        messages = [HumanMessage(content=question)]

        response = await self.agent_graph.ainvoke(
            {"messages": messages},
            config={
                # Caps assistant/tool round-trips so a confused model
                # cannot loop forever.
                "recursion_limit": 8,
                "callbacks": [handler],
                "run_name": run_name,
                "metadata": {
                    "task_id": task_id,
                    "question_preview": question[:200],
                    "trace_id": trace_id,
                    # NOTE(review): Langfuse generally expects tags as a list
                    # of strings — confirm a comma-joined string is handled.
                    "tags": "agent,question_answering"
                }
            }
        )

        response_text = response['messages'][-1].content
        answer = extract_answer(response_text)

        # Ensure buffered trace events are sent before returning.
        langfuse.flush()

        print(f"Trace logged for task_id: {task_id}")

        return answer