Spaces:
Sleeping
Sleeping
| import os | |
| from typing import TypedDict, List, Dict, Any, Optional | |
| from langgraph.graph import StateGraph, START, END | |
| from langchain_openai import ChatOpenAI | |
| from langchain_core.messages import HumanMessage, AIMessage | |
| from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint | |
| from langgraph.prebuilt import ToolNode, tools_condition | |
| from langchain_core.messages import HumanMessage, SystemMessage | |
| from langchain_core.utils.function_calling import convert_to_openai_tool | |
| from langchain.tools import Tool | |
| from serpapi import GoogleSearch | |
| import requests | |
| from bs4 import BeautifulSoup | |
| SERPAPI_API_KEY = os.environ["SERPAPI_TOKEN"] | |
| def serpapi_search(query: str) -> str: | |
| print(f"Running SerpAPI search for: {query}") | |
| params = { | |
| "engine": "google", | |
| "q": query, | |
| "api_key": SERPAPI_API_KEY, | |
| "num": 3, | |
| } | |
| search = GoogleSearch(params) | |
| results = search.get_dict() | |
| if "organic_results" in results: | |
| snippets = [] | |
| for item in results["organic_results"]: | |
| snippet = item.get("snippet", "") | |
| link = item.get("link", "") | |
| snippets.append(f"{snippet}\nURL: {link}") | |
| return "\n\n".join(snippets) | |
| return "No results found." | |
| serpapi_tool = Tool( | |
| name="serpapi_search", | |
| func=serpapi_search, | |
| description="A tool that allows you to search the web using Google via SerpAPI. Input should be a search query." | |
| ) | |
| def fetch_website_content(url: str) -> str: | |
| print(f"Fetching website content from: {url}") | |
| try: | |
| response = requests.get(url, timeout=5) | |
| response.raise_for_status() | |
| soup = BeautifulSoup(response.text, "html.parser") | |
| # Get main text content (very basic) | |
| text = soup.get_text(separator="\n", strip=True) | |
| return text[:1000] # Return first 1000 chars for brevity | |
| except Exception as e: | |
| print(f"Error fetching website: {e}") | |
| return f"Error fetching website: {e}" | |
| fetch_website_tool = Tool( | |
| name="fetch_website_content", | |
| func=fetch_website_content, | |
| description="Fetches and returns the main text content of a given website URL." | |
| ) | |
| # Initialize LLM | |
| model = ChatOpenAI( model="gpt-4o",temperature=0) | |
| #model = ChatOpenAI(model="gpt-4o-mini", temperature=0) | |
| #vision_llm = ChatOpenAI(model="gpt-4o") | |
| #search_tool = DuckDuckGoSearchRun() | |
| tools = [serpapi_tool]#, fetch_website_tool] | |
| llm_with_tools = model.bind_tools(tools, parallel_tool_calls=False) | |
| class AgentState(TypedDict): | |
| question: Dict[str, Any] | |
| messages: List[Any] | |
| answer: Optional[str] | |
| tool_calls: Optional[list] | |
| tool_outputs: Optional[list] | |
| def assistant(state: AgentState): | |
| print("\n--- ASSISTANT NODE ---") | |
| print(f"State received: {state}") | |
| question = state["question"] | |
| print(f"Question dict: {question}") | |
| #textual_description_of_tool = """ | |
| #search_tool: A tool that allows you to search the web using DuckDuckGo. It returns a list of search results based on the query provided. | |
| #""" | |
| textual_description_of_tool = """ | |
| serpapi_search: A tool that allows you to search the web using Google via SerpAPI. It returns a list of search results based on the query provided. | |
| fetch_website_content(url: str) -> str: A tool that fetches and returns the main text content of a given website URL. | |
| """ | |
| system_prompt = SystemMessage( | |
| content=f""" | |
| Your answers are tested. Try to answer the question as accurately as possible. Give only the minimum necessary information to answer the question. | |
| If you use a tool, answer the question using the tool results provided below. | |
| Tool results will be provided as context after your question. If you receive a tool output, then use this information and come to the final answer if possible. | |
| Only call another tool if you cannot answer the question with the information provided. | |
| If you formulate your final answer, analyze it if it really ONLY answers the question. Don't provide additional information. One word, number or name is enough if it answers the question. | |
| """ | |
| #You can use the following tools to help you: | |
| #{textual_description_of_tool} | |
| ) | |
| messages = [system_prompt] | |
| # Always add the user question | |
| messages.append(HumanMessage(content=f"Question: {question.get('question', question)}")) | |
| # If tool_outputs exist, add them as context | |
| if state.get("tool_outputs"): | |
| # Format tool results as plain text | |
| tool_results = state["tool_outputs"] | |
| if isinstance(tool_results, dict): | |
| tool_text = "" | |
| if "search_results" in tool_results and tool_results["search_results"]: | |
| tool_text += "Search Results:\n" | |
| tool_text += "\n".join(str(r) for r in tool_results["search_results"]) | |
| if "website_contents" in tool_results and tool_results["website_contents"]: | |
| tool_text += "\nWebsite Contents:\n" | |
| for wc in tool_results["website_contents"]: | |
| tool_text += f"\nURL: {wc['url']}\nContent: {wc['content']}\n" | |
| else: | |
| tool_text = str(tool_results) | |
| messages.append(HumanMessage(content=f"Tool results:\n{tool_text}")) | |
| print(f"Messages sent to LLM: {messages}") | |
| response = llm_with_tools.invoke(messages) | |
| print(f"Raw LLM response: {response}") | |
| # If the LLM wants to call a tool, store tool_calls in state | |
| tool_calls = getattr(response, "tool_calls", None) | |
| if tool_calls: | |
| print(f"Tool calls requested: {tool_calls}") | |
| state["tool_calls"] = tool_calls | |
| state["answer"] = "" # Not final yet | |
| state.setdefault("messages", []).append(AIMessage(content="Calling tool: " + str(tool_calls))) | |
| else: | |
| state["answer"] = response.content.strip() | |
| print(f"Model response: {state['answer']}") | |
| state.setdefault("messages", []).append(AIMessage(content=state["answer"])) | |
| state["tool_calls"] = None | |
| return state | |
| def tool_node(state: AgentState): | |
| print("\n--- TOOL NODE ---") | |
| print(f"State received: {state}") | |
| search_results = [] | |
| website_contents = [] | |
| tool_calls = state.get("tool_calls") or [] | |
| for call in tool_calls: | |
| print(f"Tool call: {call}") | |
| args = call.get("args", {}) | |
| # Accept both {"query": ...} and {"__arg1": ...} | |
| query = args.get("query") or args.get("__arg1") or (list(args.values())[0] if args else None) | |
| print(f"Query to use: {query}") | |
| if call["name"] == "serpapi_search": | |
| print("--- SERPAPI SEARCH ---") | |
| try: | |
| result = serpapi_search(query) | |
| search_results.append(result) | |
| except Exception as e: | |
| print(f"Error running SerpAPI search: {e}") | |
| search_results.append(f"Error: {e}") | |
| elif call["name"] == "fetch_website_content": | |
| print("--- FETCH WEBSITE CONTENT ---") | |
| try: | |
| content = fetch_website_content(query) | |
| website_contents.append({"url": query, "content": content}) | |
| except Exception as e: | |
| print(f"Error fetching website: {e}") | |
| website_contents.append({"url": query, "content": f"Error: {e}"}) | |
| # Store tool outputs in state for the assistant | |
| state["tool_outputs"] = { | |
| "search_results": search_results, | |
| "website_contents": website_contents | |
| } | |
| state["tool_calls"] = None # Clear tool calls | |
| # Add tool results to conversation history for traceability | |
| state.setdefault("messages", []).append( | |
| HumanMessage(content=f"Tool results: {state['tool_outputs']}") | |
| ) | |
| return state | |
| class BasicAgent: | |
| compiled_graph: StateGraph | |
| def __init__(self): | |
| print("BasicAgent initialized.") | |
| #building the graph | |
| answering_graph = StateGraph(AgentState) | |
| # Add nodes | |
| answering_graph.add_node("assistant", assistant) | |
| #answering_graph.add_node("tools", ToolNode(tools)) | |
| answering_graph.add_node("tools", tool_node) | |
| # Add edges | |
| answering_graph.add_edge(START, "assistant") | |
| answering_graph.add_conditional_edges( | |
| "assistant", | |
| lambda state: "tools" if state.get("tool_calls") else END | |
| ) | |
| answering_graph.add_edge("tools", "assistant") | |
| # Compile the graph | |
| self.compiled_graph = answering_graph.compile() | |
| def __call__(self, question: str) -> str: | |
| question_text = question.get("question") | |
| print(f"Agent received question (first 50 chars): {question_text[:50]}...") | |
| initial_state = { | |
| "question": question, | |
| "messages": [], | |
| "answer": None, | |
| "tool_calls": None, | |
| "tool_outputs": None | |
| } | |
| print(f"Initial state: {initial_state}") | |
| answer = self.compiled_graph.invoke(initial_state) | |
| print(f"Agent returning answer: {answer.get('answer')}") | |
| return answer.get("answer") |