from textwrap import dedent
from typing import TypedDict, List, Dict, Any, Optional, Annotated
from functools import partial
import os
import re
import time
import uuid
# from langchain_openai import ChatOpenAI
# from langchain_huggingface.llms import HuggingFaceEndpoint
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage
from langgraph.graph.message import add_messages
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint
from langfuse.langchain import CallbackHandler
from langfuse import get_client
# Langfuse configuration: values already present in the environment win;
# otherwise the placeholder defaults below are installed.
os.environ.setdefault("LANGFUSE_PUBLIC_KEY", "pk-lf-***")  # Public key is safe to expose in client-side code
os.environ.setdefault("LANGFUSE_SECRET_KEY", "sk-lf-***")
os.environ.setdefault("LANGFUSE_BASE_URL", "https://us.cloud.langfuse.com")  # 🇺🇸 US region

# Module-level client, created once at import time.
langfuse = get_client()

# Verify connection and report the outcome on stdout.
print(
    "Langfuse client is authenticated and ready!"
    if langfuse.auth_check()
    else "Authentication failed. Please check your credentials and host."
)
# langfuse_handler = CallbackHandler()
# # Initialize the Hugging Face model
# hf_model_name = "openai/gpt-oss-120b" # "Qwen/Qwen2.5-72B-Instruct"
# hf_model_provider = "nscale" # "hf-inference"
# llm = HuggingFaceEndpoint(
# repo_id=hf_model_name,
# provider=hf_model_provider,
# max_new_tokens=8192,
# do_sample=False,
# # temperature=0.,
# )
# chat_model = ChatHuggingFace(llm=llm)
# # Equip llm with tools
# tools_list = [
# fetch_website,
# get_wiki_full,
# youtube_transcript,
# python_repl_tool,
# duckduckgo_search_results
# ]
# llm_with_tools = chat_model.bind_tools(
# tools_list
# )
# Define Agent Workflow
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the agent graph."""

    # Conversation history. The `add_messages` annotation is the reducer
    # attached to this key for combining message lists.
    messages: Annotated[list[AnyMessage], add_messages]
def assistant(state: AgentState, llm) -> Dict[str, Any]:
    """Run one assistant turn: prepend the system prompt and invoke the model.

    Args:
        state: Graph state; ``state["messages"]`` is the conversation so far.
        llm: Object exposing ``invoke(messages)``; it is called with the
            system message followed by the conversation.

    Returns:
        A dict with a single ``"messages"`` key whose value is a one-element
        list containing whatever ``llm.invoke`` returned.
    """
    # Plain-text catalogue of the tools, embedded verbatim in the system prompt.
    tool_catalog = dedent(
        """
        duckduckgo_search_results(query: str) -> list[dict]:
        Perform a web search using DuckDuckGo and return the results.
        Args:
        query: The search query string.
        Returns:
        A list of search results, where each result is a dictionary that includes the snippet, title, and link.
        fetch_website(url: str) -> str:
        Fetch the content of a website.
        Args:
        url: The URL of the website to fetch.
        Returns:
        The title and content of the website.
        get_wiki_full(query: str) -> str:
        Scrape the content of a Wikipedia page based on the user query.
        Args:
        query: The user query to search for on Wikipedia.
        Returns:
        A single string containing the content of the Wikipedia page.
        youtube_transcript(url: str) -> list[dict]:
        Fetch the transcript of a youtube video.
        Args:
        url: input youtube url.
        Returns:
        A list of dictionaries containing the transcript of the youtube videos.
        Each dictionary has 'text', 'start', and 'duration' keys.
        python_repl_tool(code: str) -> str:
        Execute Python code and return the output.
        Args:
        code: A string of Python code to execute.
        Returns:
        The output of the executed code or any error messages.
        """
    )

    # The prompt is assembled from adjacent literals so that the indentation
    # of this source file can never end up inside the prompt text.
    prompt_text = dedent(
        "\n"
        "You are a helpful assistant at answering user questions. "
        "Your final answer will be between <answer> and </answer> tags. "
        f"You can access provided tools:\n{tool_catalog}\n"
    )
    system_prompt = SystemMessage(content=prompt_text)

    conversation = [system_prompt] + state["messages"]
    reply = llm.invoke(conversation)
    return {"messages": [reply]}
# # Build the StateGraph for the agent
# # The graph
# builder = StateGraph(AgentState)
# # Define nodes: these do the work
# builder.add_node("assistant", assistant)
# builder.add_node("tools", ToolNode(tools_list))
# # Define edges: these determine how the control flow moves
# builder.add_edge(START, "assistant")
# builder.add_conditional_edges(
# "assistant",
# # If the latest message requires a tool, route to tools
# # Otherwise, provide a direct response
# tools_condition,
# )
# builder.add_edge("tools", "assistant")
# agent_graph = builder.compile()
def extract_answer(text):
    """Return the text enclosed by the first ``<answer>...</answer>`` pair.

    Args:
        text: Model output. Normally a string; ``None`` and list-style
            content (a sequence of strings and/or dicts carrying a string
            under the ``"text"`` key) are also accepted so that a non-string
            message payload does not crash the regex search.

    Returns:
        The stripped contents of the first ``<answer>`` element, or the
        literal string ``'None'`` when no complete tag pair is present.
    """
    if text is None:
        return 'None'

    if isinstance(text, (list, tuple)):
        # Flatten content blocks into one string; blocks without text
        # (e.g. non-text payloads) are ignored.
        pieces = []
        for block in text:
            if isinstance(block, str):
                pieces.append(block)
            elif isinstance(block, dict) and isinstance(block.get("text"), str):
                pieces.append(block["text"])
        text = "".join(pieces)
    elif not isinstance(text, str):
        text = str(text)

    # DOTALL lets the answer span several lines; the lazy quantifier stops at
    # the first closing tag.
    match = re.search(r'<answer>(.*?)</answer>', text, re.DOTALL)
    if match:
        return match.group(1).strip()
    return 'None'
class BasicAgent:
    """Question-answering agent: a Hugging Face chat model looped with tools.

    The instance builds its graph once in ``__init__`` and is then awaited
    like a function: ``answer = await agent(question, task_id)``.
    """

    def __init__(self, hf_model_name, hf_model_provider, tools_list):
        """Store the model/tool configuration and build the agent graph.

        Args:
            hf_model_name: Passed as ``repo_id`` to ``HuggingFaceEndpoint``.
            hf_model_provider: Passed as ``provider`` to ``HuggingFaceEndpoint``.
            tools_list: Tools bound to the chat model and served by the
                graph's ``ToolNode``.
        """
        self.hf_model_name = hf_model_name
        self.hf_model_provider = hf_model_provider
        self.tools_list = tools_list
        print("BasicAgent initialized.")
        # Create agent with all the tools
        self.agent_graph = self.build_agent_graph()

    def build_llm_with_tools(self):
        """Create the Hugging Face chat model and bind ``self.tools_list`` to it.

        Returns:
            The object returned by ``ChatHuggingFace.bind_tools``.
        """
        print("Building Hugging Face model and tools...")
        # Initialize the Hugging Face model
        # NOTE(review): do_sample=False together with temperature=0.2 looks
        # contradictory — confirm which setting the provider honours.
        llm = HuggingFaceEndpoint(
            repo_id=self.hf_model_name,
            provider=self.hf_model_provider,
            max_new_tokens=8192,
            do_sample=False,
            temperature=0.2,
        )
        chat_model = ChatHuggingFace(llm=llm)
        # Equip llm with tools
        llm_with_tools = chat_model.bind_tools(self.tools_list)
        print("LLM with tools built successfully.")
        return llm_with_tools

    def build_agent_graph(self):
        """Assemble and compile the assistant/tools state graph.

        Returns:
            The compiled graph produced by ``StateGraph.compile()``.
        """
        llm_with_tools = self.build_llm_with_tools()
        # Build the StateGraph for the agent
        builder = StateGraph(AgentState)
        # Define nodes: these do the work
        builder.add_node("assistant", partial(assistant, llm=llm_with_tools))
        builder.add_node("tools", ToolNode(self.tools_list))
        # Define edges: these determine how the control flow moves
        builder.add_edge(START, "assistant")
        builder.add_conditional_edges(
            "assistant",
            # If the latest message requires a tool, route to tools
            # Otherwise, provide a direct response
            tools_condition,
        )
        builder.add_edge("tools", "assistant")
        agent_graph = builder.compile()
        print("Agent graph built successfully.")
        return agent_graph

    async def __call__(self, question: str, task_id: Optional[str] = None) -> str:
        """Answer one question and return the extracted ``<answer>`` text.

        Args:
            question: The user question, sent as a single ``HumanMessage``.
            task_id: Optional identifier used in the run name and metadata;
                when falsy, the first 8 characters of a fresh UUID are used
                in the run name instead.

        Returns:
            The result of ``extract_answer`` on the last message's content.

        Raises:
            Whatever ``agent_graph.ainvoke`` raises; the Langfuse client is
            flushed before the exception propagates.
        """
        print(f"Agent received question (first 100 chars): {question[:100]}...")
        # Create a new Langfuse handler for this specific question to ensure separate traces
        handler = CallbackHandler()
        # Generate unique identifiers for this trace
        # NOTE(review): this UUID is only stored as metadata below; whether it
        # matches the id Langfuse assigns to the trace is not shown here.
        trace_id = str(uuid.uuid4())
        run_name = f"agent_question_{task_id or trace_id[:8]}"
        messages = [HumanMessage(content=question)]
        run_config = {
            "recursion_limit": 8,
            "callbacks": [handler],  # Use the new handler instance
            "run_name": run_name,
            "metadata": {
                "task_id": task_id,
                "question_preview": question[:200],
                "trace_id": trace_id,
                "tags": "agent,question_answering",
            },
        }
        try:
            response = await self.agent_graph.ainvoke(
                {"messages": messages},
                config=run_config,
            )
        finally:
            # Flush even when the run fails, so the trace of a failing
            # question is still sent instead of being left buffered.
            langfuse.flush()
        print(f"Trace logged for task_id: {task_id}")
        response_text = response['messages'][-1].content
        return extract_answer(response_text)