"""Tool-using question-answering agent built on LangGraph, with Langfuse tracing.

The module wires a Hugging Face chat model to a set of tools inside a
LangGraph ``StateGraph`` and exposes it through the ``BasicAgent`` callable.
The model is asked to wrap its final answer in ``<answer>...</answer>`` tags,
which ``extract_answer`` then parses out of the last message.
"""

from textwrap import dedent
from typing import TypedDict, List, Dict, Any, Optional, Annotated
from functools import partial
import os
import re
import time
import uuid

# from langchain_openai import ChatOpenAI
# from langchain_huggingface.llms import HuggingFaceEndpoint
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage
from langgraph.graph.message import add_messages
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
from langfuse.langchain import CallbackHandler
from langfuse import get_client

# Fall back to placeholder credentials only when the variables are not
# already set in the environment; real values must come from the environment.
# Public key is safe to expose in client-side code
os.environ.setdefault("LANGFUSE_PUBLIC_KEY", "pk-lf-***")
os.environ.setdefault("LANGFUSE_SECRET_KEY", "sk-lf-***")
os.environ.setdefault("LANGFUSE_BASE_URL", "https://us.cloud.langfuse.com")  # 🇺🇸 US region

langfuse = get_client()

# Verify connection
if langfuse.auth_check():
    print("Langfuse client is authenticated and ready!")
else:
    print("Authentication failed. Please check your credentials and host.")

# langfuse_handler = CallbackHandler()

# # Initialize the Hugging Face model
# hf_model_name = "openai/gpt-oss-120b"  # "Qwen/Qwen2.5-72B-Instruct"
# hf_model_provider = "nscale"  # "hf-inference"
# llm = HuggingFaceEndpoint(
#     repo_id=hf_model_name,
#     provider=hf_model_provider,
#     max_new_tokens=8192,
#     do_sample=False,
#     # temperature=0.,
# )
# chat_model = ChatHuggingFace(llm=llm)

# # Equip llm with tools
# tools_list = [
#     fetch_website,
#     get_wiki_full,
#     youtube_transcript,
#     python_repl_tool,
#     duckduckgo_search_results
# ]
# llm_with_tools = chat_model.bind_tools(
#     tools_list
# )

# Delimiters the model is told to wrap its final answer in. The system prompt
# and the parsing regex are both derived from these two constants so that the
# instruction given to the model and the parser can never drift apart.
ANSWER_OPEN_TAG = "<answer>"
ANSWER_CLOSE_TAG = "</answer>"

# Non-greedy capture of everything between the tags; DOTALL lets the answer
# span several lines.
_ANSWER_PATTERN = re.compile(
    re.escape(ANSWER_OPEN_TAG) + r"(.*?)" + re.escape(ANSWER_CLOSE_TAG),
    re.DOTALL,
)

# Plain-text description of the tools, embedded in the system prompt.
# It is constant, so it is built once at import time rather than per call.
_TOOL_DESCRIPTIONS = dedent(
    """
    duckduckgo_search_results(query: str) -> list[dict]:
        Perform a web search using DuckDuckGo and return the results.

        Args:
            query: The search query string.

        Returns:
            A list of search results, where each result is a dictionary that includes the snippet, title, and link.

    fetch_website(url: str) -> str:
        Fetch the content of a website.

        Args:
            url: The URL of the website to fetch.

        Returns:
            The title and content of the website.

    get_wiki_full(query: str) -> str:
        Scrape the content of a Wikipedia page based on the user query.

        Args:
            query: The user query to search for on Wikipedia.

        Returns:
            A single string containing the content of the Wikipedia page.

    youtube_transcript(url: str) -> list[dict]:
        Fetch the transcript of a youtube video.

        Args:
            url: input youtube url.

        Returns:
            A list of dictionaries containing the transcript of the youtube videos.
            Each dictionary has 'text', 'start', and 'duration' keys.

    python_repl_tool(code: str) -> str:
        Execute Python code and return the output.

        Args:
            code: A string of Python code to execute.

        Returns:
            The output of the executed code or any error messages.
    """
)

# Assembled from explicit pieces instead of dedent() over an f-string:
# dedent() cannot strip a common margin once already-dedented text has been
# interpolated into the string.
_SYSTEM_PROMPT = (
    "You are a helpful assistant at answering user questions. "
    f"Your final answer will be between {ANSWER_OPEN_TAG} and {ANSWER_CLOSE_TAG} tags. "
    f"You can access provided tools:\n{_TOOL_DESCRIPTIONS}\n"
)


# Define Agent Workflow
class AgentState(TypedDict):
    """State carried through the agent graph."""

    # Conversation history; `add_messages` is attached as the reducer for
    # updates written to this key.
    messages: Annotated[list[AnyMessage], add_messages]


def assistant(state: AgentState, llm) -> Dict[str, Any]:
    """Run one model turn: prepend the system prompt and invoke ``llm``.

    Args:
        state: Current graph state; only ``state["messages"]`` is read.
        llm: Object exposing ``invoke(messages)``; ``BasicAgent`` passes the
            tool-bound chat model here.

    Returns:
        A state update of the form ``{"messages": [<model reply>]}``.
    """
    sys_msg = SystemMessage(content=_SYSTEM_PROMPT)
    reply = llm.invoke([sys_msg] + state["messages"])
    return {"messages": [reply]}


# # Build the StateGraph for the agent
# # The graph
# builder = StateGraph(AgentState)

# # Define nodes: these do the work
# builder.add_node("assistant", assistant)
# builder.add_node("tools", ToolNode(tools_list))

# # Define edges: these determine how the control flow moves
# builder.add_edge(START, "assistant")
# builder.add_conditional_edges(
#     "assistant",
#     # If the latest message requires a tool, route to tools
#     # Otherwise, provide a direct response
#     tools_condition,
# )
# builder.add_edge("tools", "assistant")
# agent_graph = builder.compile()


def extract_answer(text) -> str:
    """Return the text between the first ``<answer>``/``</answer>`` pair.

    Args:
        text: Model output to scan. Anything that is not a ``str`` (for
            example ``None`` or structured message content) is treated as
            containing no answer.

    Returns:
        The captured answer with surrounding whitespace stripped, or the
        literal string ``'None'`` when no tagged answer is present.
    """
    if not isinstance(text, str):
        return 'None'
    match = _ANSWER_PATTERN.search(text)
    if match:
        return match.group(1).strip()
    return 'None'


class BasicAgent:
    """Callable agent that answers a question using an LLM plus tools.

    Attributes:
        hf_model_name: Hugging Face repo id passed to ``HuggingFaceEndpoint``.
        hf_model_provider: Provider name passed to ``HuggingFaceEndpoint``.
        tools_list: Tools bound to the chat model and served by the tool node.
        agent_graph: Compiled graph built once in ``__init__``.
    """

    def __init__(self, hf_model_name, hf_model_provider, tools_list):
        """Store the configuration and build the agent graph."""
        self.hf_model_name = hf_model_name
        self.hf_model_provider = hf_model_provider
        self.tools_list = tools_list
        print("BasicAgent initialized.")

        # Create agent with all the tools
        self.agent_graph = self.build_agent_graph()

    def build_llm_with_tools(self):
        """Create the Hugging Face chat model and bind ``self.tools_list`` to it.

        Returns:
            The chat model returned by ``ChatHuggingFace.bind_tools``.
        """
        print("Building Hugging Face model and tools...")
        # Initialize the Hugging Face model
        # NOTE(review): do_sample=False is combined with temperature=0.2;
        # confirm which of the two the provider actually honours.
        llm = HuggingFaceEndpoint(
            repo_id=self.hf_model_name,
            provider=self.hf_model_provider,
            max_new_tokens=8192,
            do_sample=False,
            temperature=0.2,
        )
        chat_model = ChatHuggingFace(llm=llm)

        # Equip llm with tools
        llm_with_tools = chat_model.bind_tools(self.tools_list)
        print("LLM with tools built successfully.")
        return llm_with_tools

    def build_agent_graph(self):
        """Assemble and compile the assistant/tools graph.

        Returns:
            The compiled graph: START -> assistant, a conditional edge from
            assistant decided by ``tools_condition``, and tools -> assistant.
        """
        llm_with_tools = self.build_llm_with_tools()

        # Build the StateGraph for the agent
        builder = StateGraph(AgentState)

        # Define nodes: these do the work
        builder.add_node("assistant", partial(assistant, llm=llm_with_tools))
        builder.add_node("tools", ToolNode(self.tools_list))

        # Define edges: these determine how the control flow moves
        builder.add_edge(START, "assistant")
        builder.add_conditional_edges(
            "assistant",
            # If the latest message requires a tool, route to tools
            # Otherwise, provide a direct response
            tools_condition,
        )
        builder.add_edge("tools", "assistant")

        agent_graph = builder.compile()
        print("Agent graph built successfully.")
        return agent_graph

    async def __call__(self, question: str, task_id: Optional[str] = None) -> str:
        """Answer ``question`` and return the extracted final answer.

        Args:
            question: The user question sent to the agent as a human message.
            task_id: Optional identifier used in the run name and trace
                metadata; a slice of a fresh UUID is used in the run name
                when it is missing.

        Returns:
            The text found between the answer tags of the last message, or
            the string ``'None'`` if the model produced no tagged answer.

        Raises:
            Whatever ``agent_graph.ainvoke`` raises; the Langfuse client is
            flushed before the exception propagates.
        """
        print(f"Agent received question (first 100 chars): {question[:100]}...")

        # Create a new Langfuse handler for this specific question to ensure separate traces
        handler = CallbackHandler()

        # Generate unique identifiers for this trace
        trace_id = str(uuid.uuid4())
        run_name = f"agent_question_{task_id or trace_id[:8]}"

        messages = [HumanMessage(content=question)]
        try:
            response = await self.agent_graph.ainvoke(
                {"messages": messages},
                config={
                    "recursion_limit": 8,
                    "callbacks": [handler],  # Use the new handler instance
                    "run_name": run_name,
                    "metadata": {
                        "task_id": task_id,
                        "question_preview": question[:200],
                        "trace_id": trace_id,
                        "tags": "agent,question_answering",
                    },
                },
            )
        finally:
            # Flush the langfuse client to ensure the trace is sent immediately,
            # including when the run fails part-way through.
            langfuse.flush()

        response_text = response["messages"][-1].content
        answer = extract_answer(response_text)
        print(f"Trace logged for task_id: {task_id}")
        return answer