# fixed langfuse flush function (commit 2d72111)
from textwrap import dedent
from typing import TypedDict, List, Dict, Any, Optional, Annotated
from functools import partial
import os
import re
import time
import uuid
# from langchain_openai import ChatOpenAI
# from langchain_huggingface.llms import HuggingFaceEndpoint
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage
from langgraph.graph.message import add_messages
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
from langfuse.langchain import CallbackHandler
from langfuse import get_client
# Langfuse credentials/host: prefer values already present in the environment,
# otherwise fall back to non-functional "***" placeholder stubs.
os.environ["LANGFUSE_PUBLIC_KEY"] = os.getenv("LANGFUSE_PUBLIC_KEY", "pk-lf-***") # Public key is safe to expose in client-side code
os.environ["LANGFUSE_SECRET_KEY"] = os.getenv("LANGFUSE_SECRET_KEY", "sk-lf-***")
# NOTE(review): the Langfuse Python SDK conventionally reads LANGFUSE_HOST;
# confirm that this SDK version actually honors LANGFUSE_BASE_URL.
os.environ["LANGFUSE_BASE_URL"] = os.getenv("LANGFUSE_BASE_URL", "https://us.cloud.langfuse.com") # 🇺🇸 US region
# Module-level Langfuse client; also used by BasicAgent.__call__ to flush traces.
langfuse = get_client()
# Verify connection
if langfuse.auth_check():
    print("Langfuse client is authenticated and ready!")
else:
    print("Authentication failed. Please check your credentials and host.")
# langfuse_handler = CallbackHandler()
# # Initialize the Hugging Face model
# hf_model_name = "openai/gpt-oss-120b" # "Qwen/Qwen2.5-72B-Instruct"
# hf_model_provider = "nscale" # "hf-inference"
# llm = HuggingFaceEndpoint(
# repo_id=hf_model_name,
# provider=hf_model_provider,
# max_new_tokens=8192,
# do_sample=False,
# # temperature=0.,
# )
# chat_model = ChatHuggingFace(llm=llm)
# # Equip llm with tools
# tools_list = [
# fetch_website,
# get_wiki_full,
# youtube_transcript,
# python_repl_tool,
# duckduckgo_search_results
# ]
# llm_with_tools = chat_model.bind_tools(
# tools_list
# )
# Define Agent Workflow
class AgentState(TypedDict):
    """LangGraph state schema: the running conversation history."""

    # Message history; the add_messages reducer appends/merges new messages
    # returned by nodes instead of overwriting the list.
    messages: Annotated[list[AnyMessage], add_messages]
def assistant(state: AgentState, llm) -> Dict[str, Any]:
    """LangGraph node: invoke the (tool-bound) LLM on a system prompt + history.

    Args:
        state: Current graph state; only state["messages"] is read.
        llm: Chat model exposing .invoke(messages). Expected to already be
            bound to the tools described in the prompt below (binding happens
            in BasicAgent.build_llm_with_tools) — TODO confirm the prompt's
            tool catalogue stays in sync with the actual tools_list.

    Returns:
        A partial state update {"messages": [<model response>]}; merged into
        the history by the add_messages reducer declared on AgentState.
    """
    # Plain-text tool catalogue injected into the system prompt so the model
    # knows each tool's signature and return shape.
    textual_description_of_tool = dedent(
        """
        duckduckgo_search_results(query: str) -> list[dict]:
        Perform a web search using DuckDuckGo and return the results.
        Args:
        query: The search query string.
        Returns:
        A list of search results, where each result is a dictionary that includes the snippet, title, and link.
        fetch_website(url: str) -> str:
        Fetch the content of a website.
        Args:
        url: The URL of the website to fetch.
        Returns:
        The title and content of the website.
        get_wiki_full(query: str) -> str:
        Scrape the content of a Wikipedia page based on the user query.
        Args:
        query: The user query to search for on Wikipedia.
        Returns:
        A single string containing the content of the Wikipedia page.
        youtube_transcript(url: str) -> list[dict]:
        Fetch the transcript of a youtube video.
        Args:
        url: input youtube url.
        Returns:
        A list of dictionaries containing the transcript of the youtube videos.
        Each dictionary has 'text', 'start', and 'duration' keys.
        python_repl_tool(code: str) -> str:
        Execute Python code and return the output.
        Args:
        code: A string of Python code to execute.
        Returns:
        The output of the executed code or any error messages.
        """
    )
    # System prompt: instructs the model to wrap its final reply in <answer>
    # tags (later parsed by extract_answer) and lists the available tools.
    # The trailing backslashes escape the newlines inside the f-string.
    sys_msg = SystemMessage(
        content=dedent(
            f"""
You are a helpful assistant at answering user questions. \
Your final answer will be between <answer> and </answer> tags. \
You can access provided tools:\n{textual_description_of_tool}\n"""
        )
    )
    # Single model call; add_messages will append the response to the history.
    return {
        "messages": [llm.invoke([sys_msg] + state["messages"])],
    }
# # Build the StateGraph for the agent
# # The graph
# builder = StateGraph(AgentState)
# # Define nodes: these do the work
# builder.add_node("assistant", assistant)
# builder.add_node("tools", ToolNode(tools_list))
# # Define edges: these determine how the control flow moves
# builder.add_edge(START, "assistant")
# builder.add_conditional_edges(
# "assistant",
# # If the latest message requires a tool, route to tools
# # Otherwise, provide a direct response
# tools_condition,
# )
# builder.add_edge("tools", "assistant")
# agent_graph = builder.compile()
def extract_answer(text):
    """Return the stripped text inside the first <answer>...</answer> pair.

    Falls back to the literal string 'None' (not the None object) when the
    tags are absent, so callers always receive a str. DOTALL lets the answer
    span multiple lines.
    """
    found = re.search(r'<answer>(.*?)</answer>', text, re.DOTALL)
    return found.group(1).strip() if found else 'None'
class BasicAgent:
    """Tool-using LangGraph agent over a Hugging Face chat model.

    Builds an assistant <-> tools loop once at construction time and answers
    questions through the async __call__, tracing each run to Langfuse.
    """

    def __init__(self, hf_model_name: str, hf_model_provider: str, tools_list: List):
        # Model/provider identifiers passed through to HuggingFaceEndpoint.
        self.hf_model_name = hf_model_name
        self.hf_model_provider = hf_model_provider
        # Tools both bound to the LLM and executed by the graph's ToolNode.
        self.tools_list = tools_list
        print("BasicAgent initialized.")
        # Create agent with all the tools
        self.agent_graph = self.build_agent_graph()

    def build_llm_with_tools(self):
        """Construct the HF endpoint chat model and bind the tool list to it."""
        print("Building Hugging Face model and tools...")
        # Initialize the Hugging Face model
        llm = HuggingFaceEndpoint(
            repo_id=self.hf_model_name,
            provider=self.hf_model_provider,
            max_new_tokens=8192,
            # NOTE(review): do_sample=False alongside temperature=0.2 looks
            # contradictory — confirm which one the endpoint actually honors.
            do_sample=False,
            temperature=0.2,
        )
        chat_model = ChatHuggingFace(llm=llm)
        # Equip llm with tools
        llm_with_tools = chat_model.bind_tools(
            self.tools_list
        )
        print("LLM with tools built successfully.")
        return llm_with_tools

    def build_agent_graph(self):
        """Compile the assistant/tools StateGraph and return the runnable graph."""
        llm_with_tools = self.build_llm_with_tools()
        # Build the StateGraph for the agent
        builder = StateGraph(AgentState)
        # Define nodes: these do the work.  partial() pre-binds the module-level
        # assistant() to this instance's tool-equipped LLM.
        builder.add_node("assistant", partial(assistant, llm=llm_with_tools))
        builder.add_node("tools", ToolNode(self.tools_list))
        # Define edges: these determine how the control flow moves
        builder.add_edge(START, "assistant")
        builder.add_conditional_edges(
            "assistant",
            # If the latest message requires a tool, route to tools
            # Otherwise, provide a direct response
            tools_condition,
        )
        builder.add_edge("tools", "assistant")
        agent_graph = builder.compile()
        print("Agent graph built successfully.")
        return agent_graph

    async def __call__(self, question: str, task_id: Optional[str] = None) -> str:
        """Run the agent on one question and return the extracted answer.

        Args:
            question: The user question to answer.
            task_id: Optional external identifier used to label the trace.

        Returns:
            The text between <answer> tags in the final model message, or the
            literal string 'None' if no tags were produced (see extract_answer).
        """
        print(f"Agent received question (first 100 chars): {question[:100]}...")
        # Create a new Langfuse handler for this specific question to ensure separate traces
        handler = CallbackHandler()
        # Generate unique identifiers for this trace
        trace_id = str(uuid.uuid4())
        run_name = f"agent_question_{task_id or trace_id[:8]}"
        messages = [
            HumanMessage(
                content=question
            )
        ]
        response = await self.agent_graph.ainvoke(
            {"messages": messages},
            config={
                # Caps assistant<->tools round trips so the agent cannot loop forever.
                "recursion_limit": 8,
                "callbacks": [handler],  # Use the new handler instance
                "run_name": run_name,
                # NOTE(review): "trace_id" here is plain metadata — it does not
                # set the Langfuse trace id; confirm if correlation is needed.
                "metadata": {
                    "task_id": task_id,
                    "question_preview": question[:200],
                    "trace_id": trace_id,
                    "tags": "agent,question_answering"
                }
            }
        )
        # Last message in the merged history is the model's final reply.
        response_text = response['messages'][-1].content
        answer = extract_answer(response_text)
        # Flush the langfuse client to ensure the trace is sent immediately
        # (uses the module-level `langfuse` client created at import time).
        langfuse.flush()
        print(f"Trace logged for task_id: {task_id}")
        return answer