from textwrap import dedent
from typing import TypedDict, List, Dict, Any, Optional, Annotated
from functools import partial
import os
import re
import time
import uuid
# from langchain_openai import ChatOpenAI
# from langchain_huggingface.llms import HuggingFaceEndpoint
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage
from langgraph.graph.message import add_messages
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
from langfuse.langchain import CallbackHandler
from langfuse import get_client
# Langfuse configuration. Values already present in the environment win; the
# literals below are placeholders used only when a variable is unset.
os.environ.setdefault("LANGFUSE_PUBLIC_KEY", "pk-lf-***")  # Public key is safe to expose in client-side code
os.environ.setdefault("LANGFUSE_SECRET_KEY", "sk-lf-***")  # NOTE(review): supply the real secret via the environment only
os.environ.setdefault("LANGFUSE_BASE_URL", "https://us.cloud.langfuse.com")  # 🇺🇸 US region
# Module-level client; BasicAgent.__call__ flushes traces through it.
langfuse = get_client()
# Verify connection. auth_check() is a blocking network call that may raise
# (e.g. on rejected credentials or an unreachable host) instead of returning
# False. The check is diagnostic only, so report the failure without aborting
# module import.
try:
    _langfuse_authenticated = langfuse.auth_check()
except Exception as exc:  # broad on purpose: any failure just means "not verified"
    print(f"Langfuse auth check raised {type(exc).__name__}: {exc}")
    _langfuse_authenticated = False
if _langfuse_authenticated:
    print("Langfuse client is authenticated and ready!")
else:
    print("Authentication failed. Please check your credentials and host.")
# langfuse_handler = CallbackHandler()
# # Initialize the Hugging Face model
# hf_model_name = "openai/gpt-oss-120b" # "Qwen/Qwen2.5-72B-Instruct"
# hf_model_provider = "nscale" # "hf-inference"
# llm = HuggingFaceEndpoint(
# repo_id=hf_model_name,
# provider=hf_model_provider,
# max_new_tokens=8192,
# do_sample=False,
# # temperature=0.,
# )
# chat_model = ChatHuggingFace(llm=llm)
# # Equip llm with tools
# tools_list = [
# fetch_website,
# get_wiki_full,
# youtube_transcript,
# python_repl_tool,
# duckduckgo_search_results
# ]
# llm_with_tools = chat_model.bind_tools(
# tools_list
# )
# Define Agent Workflow
# Graph state schema, declared with the functional TypedDict form.
# ``messages`` holds the conversation; the ``add_messages`` reducer tells
# LangGraph to merge each node's returned messages into the existing list
# rather than replace it.
AgentState = TypedDict(
    "AgentState",
    {"messages": Annotated[list[AnyMessage], add_messages]},
)
# Tag the model is told to wrap its final answer in. ``extract_answer`` parses
# the same tag, so the prompt and the parser cannot drift apart.
ANSWER_TAG = "answer"

# Plain-text description of the tools, included verbatim in the system prompt.
# NOTE(review): presumably mirrors the tools passed to BasicAgent; keep in sync.
TOOL_DESCRIPTIONS = dedent(
    """
    duckduckgo_search_results(query: str) -> list[dict]:
        Perform a web search using DuckDuckGo and return the results.
        Args:
            query: The search query string.
        Returns:
            A list of search results, where each result is a dictionary that includes the snippet, title, and link.
    fetch_website(url: str) -> str:
        Fetch the content of a website.
        Args:
            url: The URL of the website to fetch.
        Returns:
            The title and content of the website.
    get_wiki_full(query: str) -> str:
        Scrape the content of a Wikipedia page based on the user query.
        Args:
            query: The user query to search for on Wikipedia.
        Returns:
            A single string containing the content of the Wikipedia page.
    youtube_transcript(url: str) -> list[dict]:
        Fetch the transcript of a youtube video.
        Args:
            url: input youtube url.
        Returns:
            A list of dictionaries containing the transcript of the youtube videos.
            Each dictionary has 'text', 'start', and 'duration' keys.
    python_repl_tool(code: str) -> str:
        Execute Python code and return the output.
        Args:
            code: A string of Python code to execute.
        Returns:
            The output of the executed code or any error messages.
    """
).strip()

# The system prompt is constant, so it is built once at import time instead of
# on every assistant turn.
SYSTEM_PROMPT = (
    "You are a helpful assistant at answering user questions. "
    f"Put your final answer between <{ANSWER_TAG}> and </{ANSWER_TAG}> tags. "
    f"You can access provided tools:\n{TOOL_DESCRIPTIONS}\n"
)


def assistant(state: AgentState, llm) -> Dict[str, Any]:
    """Run one LLM turn of the agent graph.

    Prepends the fixed system prompt to the conversation held in ``state`` and
    invokes ``llm`` on the result.

    Args:
        state: Graph state; only ``state["messages"]`` is read.
        llm: Chat model (tools already bound) exposing ``invoke(messages)``.

    Returns:
        A partial state update ``{"messages": [reply]}``. The ``add_messages``
        reducer on ``AgentState.messages`` merges ``reply`` into the history.
    """
    sys_msg = SystemMessage(content=SYSTEM_PROMPT)
    return {
        "messages": [llm.invoke([sys_msg] + state["messages"])],
    }


# The agent graph is assembled in BasicAgent.build_agent_graph.


def extract_answer(text, tag: str = ANSWER_TAG) -> str:
    """Return the stripped text between the first ``<tag>``/``</tag>`` pair.

    Args:
        text: Model output to search (expected to be a string).
        tag: Tag name without angle brackets; defaults to ``ANSWER_TAG``, the
            tag the system prompt asks the model to use. Matched
            case-insensitively.

    Returns:
        The enclosed answer, or the string ``'None'`` (not the ``None`` object)
        when ``text`` is empty/None or contains no complete tag pair.
    """
    if not text:
        return 'None'
    escaped = re.escape(tag)
    # DOTALL lets a multi-line answer match; the lazy group stops at the first
    # closing tag.
    match = re.search(
        rf"<{escaped}>(.*?)</{escaped}>", text, re.DOTALL | re.IGNORECASE
    )
    if match:
        return match.group(1).strip()
    return 'None'
class BasicAgent:
    """LangGraph tool-calling agent backed by a Hugging Face chat model.

    The compiled graph alternates between the ``assistant`` node (one LLM turn)
    and a ``ToolNode``. ``tools_condition`` routes to the tools while the model
    requests them and ends the run otherwise. ``__call__`` then extracts the
    tagged final answer from the last message.
    """

    def __init__(self, hf_model_name, hf_model_provider, tools_list, recursion_limit: int = 8):
        """Store the model/tool configuration and compile the agent graph.

        Args:
            hf_model_name: Hugging Face repo id of the chat model.
            hf_model_provider: Inference provider passed to ``HuggingFaceEndpoint``.
            tools_list: Tools bound to the model and executed by the ``ToolNode``.
            recursion_limit: LangGraph step budget for a single question
                (default 8, the value previously hard-coded in ``__call__``).
        """
        self.hf_model_name = hf_model_name
        self.hf_model_provider = hf_model_provider
        self.tools_list = tools_list
        self.recursion_limit = recursion_limit
        # Create agent with all the tools
        self.agent_graph = self.build_agent_graph()
        print("BasicAgent initialized.")

    def build_llm_with_tools(self):
        """Create the Hugging Face chat model and bind ``self.tools_list`` to it."""
        print("Building Hugging Face model and tools...")
        # Initialize the Hugging Face model
        llm = HuggingFaceEndpoint(
            repo_id=self.hf_model_name,
            provider=self.hf_model_provider,
            max_new_tokens=8192,
            do_sample=False,
            # NOTE(review): with do_sample=False decoding is greedy, so this
            # temperature is likely ignored; confirm with the provider.
            temperature=0.2,
        )
        chat_model = ChatHuggingFace(llm=llm)
        # Equip llm with tools
        llm_with_tools = chat_model.bind_tools(self.tools_list)
        print("LLM with tools built successfully.")
        return llm_with_tools

    def build_agent_graph(self):
        """Compile the assistant <-> tools StateGraph over ``AgentState``."""
        llm_with_tools = self.build_llm_with_tools()
        builder = StateGraph(AgentState)
        # Nodes: the LLM turn (model injected via partial) and tool execution.
        builder.add_node("assistant", partial(assistant, llm=llm_with_tools))
        builder.add_node("tools", ToolNode(self.tools_list))
        # Edges: start at the assistant; route to tools when the latest message
        # requests one, otherwise finish; tool results go back to the assistant.
        builder.add_edge(START, "assistant")
        builder.add_conditional_edges("assistant", tools_condition)
        builder.add_edge("tools", "assistant")
        agent_graph = builder.compile()
        print("Agent graph built successfully.")
        return agent_graph

    async def __call__(self, question: str, task_id: Optional[str] = None) -> str:
        """Answer ``question`` with the agent graph and return the tagged answer.

        Args:
            question: User question sent as the first human message.
            task_id: Optional external id, used in the run name and metadata.

        Returns:
            The output of ``extract_answer`` on the final message content
            (the string ``'None'`` when no tagged answer is found).

        Raises:
            Any exception raised by the graph run (for example when the
            recursion limit is exceeded). Langfuse is flushed before it
            propagates.
        """
        print(f"Agent received question (first 100 chars): {question[:100]}...")
        # A fresh Langfuse handler per question keeps each run in its own trace.
        handler = CallbackHandler()
        # Identifier recorded in the run metadata; also names untagged runs.
        trace_id = str(uuid.uuid4())
        run_name = f"agent_question_{task_id or trace_id[:8]}"
        tags = ["agent", "question_answering"]
        config = {
            "recursion_limit": self.recursion_limit,
            "callbacks": [handler],
            "run_name": run_name,
            # LangChain expects tags as a list at the top level of the config.
            "tags": tags,
            "metadata": {
                "task_id": task_id,
                "question_preview": question[:200],
                "trace_id": trace_id,
                # Key read by the Langfuse LangChain integration for trace tags.
                "langfuse_tags": tags,
            },
        }
        try:
            response = await self.agent_graph.ainvoke(
                {"messages": [HumanMessage(content=question)]},
                config=config,
            )
        finally:
            # Flush even when the run fails so the (partial) trace is sent.
            langfuse.flush()
        response_text = response["messages"][-1].content
        answer = extract_answer(response_text)
        print(f"Trace logged for task_id: {task_id}")
        return answer