"""Tool-calling agent graph: arithmetic, Wikipedia, arXiv and web-search tools.

The module defines a set of LangChain tools and ``build_graph``, which wires
them into a LangGraph ``StateGraph`` driven by a selectable chat model.
"""

import json
import logging
import os
import time
from typing import Any

import arxiv
from brave import Brave
from dotenv import load_dotenv
from duckduckgo_search import DDGS
from langchain_community.document_loaders import ArxivLoader, WikipediaLoader
from langchain_core.messages import AIMessage, SystemMessage
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from langchain_mistralai.chat_models import ChatMistralAI
from langgraph.graph import START, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode, tools_condition

load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)

# Separator placed between individual search results in every tool output.
_RESULT_SEPARATOR = "\n\n---\n\n"

# Retry policy for the web-search back ends.
_BRAVE_MAX_ATTEMPTS = 2
_BRAVE_RETRY_DELAY_SECONDS = 3
_DDG_MAX_ATTEMPTS = 2
_DDG_BACKOFF_SECONDS = 7  # multiplied by the attempt number

# Markers of a fenced JSON block that some models emit instead of a native
# tool call.
_JSON_FENCE_OPEN = "```json"
_JSON_FENCE_CLOSE = "```"


# ------ ARITHMETIC TOOLS ------
# NOTE: the docstring of a ``@tool`` function is used as the tool description
# shown to the model, so those docstrings are kept short and unchanged.


@tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b


@tool
def subtract(a: int, b: int) -> int:
    """Subtract two integers."""
    return a - b


@tool
def divide(a: int, b: int) -> float:
    """Divide two integers."""
    # True division: the result is a float even for exact quotients.
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b


@tool
def multiply(a: int, b: int) -> int:
    """Multiply two integers."""
    return a * b


@tool
def modulus(a: int, b: int) -> int:
    """Modulus two integers."""
    return a % b


# --- SEARCH TOOLS ---


@tool
def wikipedia_search(query: str) -> str:
    """Search Wikipedia and return summaries of the top 3 articles."""
    try:
        docs = WikipediaLoader(query=query, load_max_docs=3).load()
        if not docs:
            return "Error: No Wikipedia results found."
        return _RESULT_SEPARATOR.join(
            f"\n{doc.page_content.strip()}\n" for doc in docs
        )
    except Exception as e:
        # Best effort: the failure is reported to the model as text.
        logger.error(f"Wikipedia tool failed: {e}")
        return f"Error: Wikipedia search failed — {e}"


@tool
def arxiv_search(query: str) -> str:
    """Search Arxiv and return summaries of the top 3 results."""
    try:
        client = arxiv.Client()
        search = arxiv.Search(
            query=query,
            max_results=3,
            sort_by=arxiv.SortCriterion.Relevance,
        )
        results = list(client.results(search))
        if not results:
            return "Error: No arXiv results found."
        return _RESULT_SEPARATOR.join(
            f"\nTitle: {result.title.strip()}\nSummary: {result.summary.strip()}\n"
            for result in results
        )
    except Exception as e:
        # Best effort: the failure is reported to the model as text.
        logger.error(f"Arxiv tool failed: {e}")
        return f"Error: arXiv search failed — {e}"


# --- WEB SEARCH TOOLS ---


def _format_web_result(title: str, text: str) -> str:
    """Render one web hit as the text entry shared by Brave and DuckDuckGo."""
    # NOTE(review): the entry has no wrapping tag and does not include the
    # URL, although the system prompt talks about tags -- confirm whether
    # markup was lost from this template.
    return f"\nTitle: {title}\n{text}\n"


def _perform_brave_search_attempt(query: str, api_key: str) -> str | None:
    """Run a single raw Brave Search request.

    Args:
        query: Search terms.
        api_key: Brave API key.

    Returns:
        The formatted results on success; an ``"Error: ..."`` string when an
        ``ImportError`` is raised (treated by the caller as non-retryable);
        ``None`` when there were no usable hits or any other exception
        occurred (treated by the caller as retryable).
    """
    local_logger = logging.getLogger(f"{__name__}._perform_brave_search_attempt")
    try:
        client = Brave(api_key=api_key)
        raw = client.search(q=query, count=3, raw=True) or {}

        # Hits are read from "web.results" first, then top-level "results".
        # ``or {}`` also covers a "web" key that is present but null.
        hits = (raw.get("web") or {}).get("results") or raw.get("results") or []

        docs = [
            _format_web_result(
                hit.get("title", "No Title"),
                hit.get("description", hit.get("snippet", "")),
            )
            for hit in hits
            # Entries without a URL are skipped.
            if isinstance(hit, dict) and hit.get("url")
        ]
        if docs:
            return _RESULT_SEPARATOR.join(docs)

        local_logger.warning("Brave Search returned no usable results.")
        return None
    except ImportError:
        local_logger.error("Brave Search library not installed.")
        return "Error: Brave Search library not installed. Cannot perform search."
    except Exception:
        local_logger.exception("Brave Search attempt failed")
        return None  # Retryable


def _perform_duckduckgo_search(query: str) -> str:
    """Run the DuckDuckGo fallback search, retrying once with a back-off.

    Args:
        query: Search terms.

    Returns:
        The formatted results, or an ``"Error: ..."`` string when every
        attempt came back empty or the client itself failed.
    """
    local_logger = logging.getLogger(f"{__name__}._perform_duckduckgo_search")
    headers = {"Accept-Encoding": "gzip, deflate, br"}
    try:
        with DDGS(headers=headers, timeout=30) as ddgs:
            for attempt in range(1, _DDG_MAX_ATTEMPTS + 1):
                local_logger.info("DDG attempt %s for '%s'", attempt, query)
                try:
                    results = ddgs.text(query, max_results=3) or []
                except Exception as e:
                    local_logger.warning("DDG error on attempt %s: %s", attempt, e)
                    results = []

                if results:
                    docs = [
                        _format_web_result(
                            r.get("title", "No Title"), r.get("body", "")
                        )
                        for r in results
                        # Entries without a URL are skipped.
                        if r.get("href")
                    ]
                    if docs:
                        return _RESULT_SEPARATOR.join(docs)
                    local_logger.warning("DDG returned no docs with URLs.")

                # Back off only when another attempt will follow; sleeping
                # after the last one would just delay the error reply.
                if attempt < _DDG_MAX_ATTEMPTS:
                    time.sleep(_DDG_BACKOFF_SECONDS * attempt)
        return "Error: All web search attempts (including fallback) failed to find results."
    except Exception as e:
        local_logger.exception("DuckDuckGo fallback search failed catastrophically")
        return f"Error: DuckDuckGo fallback search failed — {type(e).__name__}: {e}"


@tool
def web_search(query: str) -> str:
    """
    Searches the web and returns top-3 search results formatted as blocks.
    """
    # Strategy: Brave Search first (only when BRAVE_API_KEY is set), then the
    # DuckDuckGo fallback.
    main_logger = logging.getLogger(__name__ + ".web_search")
    api_key = os.getenv("BRAVE_API_KEY")

    if api_key:
        main_logger.info("Trying Brave Search for '%s'", query)
        for attempt in range(1, _BRAVE_MAX_ATTEMPTS + 1):
            res = _perform_brave_search_attempt(query, api_key)
            if res:
                if res.startswith("Error: Brave Search library not installed"):
                    # Non-retryable: go straight to the fallback.
                    main_logger.error("Brave library missing; skipping to fallback.")
                    break
                main_logger.info("Brave Search succeeded.")
                return res
            if attempt < _BRAVE_MAX_ATTEMPTS:
                main_logger.warning("Brave attempt %s failed; retrying...", attempt)
                time.sleep(_BRAVE_RETRY_DELAY_SECONDS)
            else:
                # No retry follows, so neither announce one nor wait for it.
                main_logger.warning("Brave attempt %s failed.", attempt)
        main_logger.warning("Brave Search failed; falling back.")
    else:
        main_logger.warning("No BRAVE_API_KEY; skipping Brave Search.")

    main_logger.info("Using DuckDuckGo fallback for '%s'", query)
    return _perform_duckduckgo_search(query)


tools = [
    add,
    subtract,
    divide,
    multiply,
    modulus,
    wikipedia_search,
    arxiv_search,
    web_search,
]


# NOTE(review): instruction 3 refers to a tag whose name appears blank
# (`...` / ``) -- confirm the intended tag name against the search tools'
# output format. Example 1's final answer ("15 divided by 3 is 5") also does
# not follow instruction 4 (numerical result only); confirm which is intended.
_DEFAULT_SYSTEM_PROMPT = """
You are a precise and helpful agent that uses tools.

Key Instructions:
1. Use tools for calculations and information retrieval (web, Wikipedia, ArXiv). Do NOT answer from internal knowledge for these tasks.
2. Use ONLY the specified tool parameters (e.g., `a`, `b` for arithmetic; `query` for search).
3. If a search tool (`wikipedia_search`, `arxiv_search`, `web_search`) returns `...` content (including errors in this format), your FINAL response to the user MUST BE the verbatim content from the tool, including all `` tags and their full content.
4. For multi-step calculations, call tools sequentially. Use the output of one tool call as input for the next. Your FINAL answer for calculations MUST BE ONLY THE NUMERICAL RESULT after all steps are complete.

Examples:
---
Example 1: Single Step Calculation
User: What is 15 divided by 3?
Tool Call:
```json
{
  "tool_name": "divide",
  "parameters": {
    "a": 15,
    "b": 3
  }
}
```
Tool Output:
```
5
```
Final Answer: 15 divided by 3 is 5
---
Example 2: Multi-Step Calculation
User: What is (5 plus 3) then multiplied by 2?
Tool Call: # First operation
```json
{
  "tool_name": "add",
  "parameters": {
    "a": 5,
    "b": 3
  }
}
```
Tool Output: # Result of the first operation
```# 8
```
# Given the previous output '8', the next operation is 8 * 2.
Tool Call: # Second operation, using previous output
```json
{
  "tool_name": "multiply",
  "parameters": {
    "a": 8, # Output from the 'add' tool
    "b": 2
  }
}
```
Tool Output: # Result of the second operation
```
16
```
Final Answer: 16 # Only the final numerical result
---
Example 3: Information Retrieval
User: Tell me about the Eiffel tower
# Assistant's intended action: Call wikipedia_search
# Assistant's response if it were to put JSON in content (which your code handles):
```json
{
  "tool_name": "wikipedia_search",
  "parameters": {
    "query": "Eiffel Tower"
  }
}
```
# Correct Final Answer (after tool execution and if assistant is responding correctly):
Title: Eiffel Tower
The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars in Paris, France.

---

Title: Eiffel Tower in popular culture
The Eiffel Tower has been the subject of numerous references in popular culture.
---
"""


def _is_fenced_json(content: Any) -> bool:
    """Return True when *content* is a string wrapped in a ```json fence."""
    if not isinstance(content, str):
        return False
    stripped = content.strip()
    return stripped.startswith(_JSON_FENCE_OPEN) and stripped.endswith(
        _JSON_FENCE_CLOSE
    )


def _parse_tool_calls_from_content(content: str) -> list[dict[str, Any]]:
    """Turn a fenced JSON block into LangChain-style tool-call dicts.

    Args:
        content: Message text that starts with a ```json marker and ends with
            a ``` marker (see ``_is_fenced_json``).

    Returns:
        A list of ``{"name", "args", "id"}`` dicts, one per JSON object that
        carries a string ``"tool_name"`` and a dict ``"parameters"``. The list
        is empty when the JSON is invalid or no object qualifies; problems are
        logged, never raised.
    """
    stripped = content.strip()
    json_str = stripped[len(_JSON_FENCE_OPEN):-len(_JSON_FENCE_CLOSE)].strip()
    try:
        payload = json.loads(json_str)
    except json.JSONDecodeError as e:
        logger.error(f"Failed to decode JSON from content: {e}. Content was: '{json_str}'")
        return []

    # The payload may be one tool-call object or a list of them; any other
    # JSON value yields no candidates.
    if isinstance(payload, dict):
        candidates = [payload]
    elif isinstance(payload, list):
        candidates = payload
    else:
        candidates = []

    tool_calls: list[dict[str, Any]] = []
    for tool_data in candidates:
        is_tool_call = (
            isinstance(tool_data, dict)
            and isinstance(tool_data.get("tool_name"), str)
            and isinstance(tool_data.get("parameters"), dict)
        )
        if not is_tool_call:
            logger.warning(
                f"Parsed JSON from content does not match expected tool call structure: {tool_data}"
            )
            continue
        # Tool calls need an id; invent one when the model did not send any.
        call_id = tool_data.get("id")
        if call_id is None:
            call_id = f"call_json_content_{os.urandom(4).hex()}"
        tool_calls.append(
            {
                "name": tool_data["tool_name"],
                "args": tool_data["parameters"],  # LangChain expects 'args'
                "id": str(call_id),
            }
        )
        logger.info(f"Successfully parsed tool call from content: {tool_data['tool_name']}")

    if not tool_calls:
        logger.warning("Attempted to parse JSON from content, but no valid tool calls found.")
    return tool_calls


# ------ Build the graph ------


def build_graph(test_mode=False, system_prompt=None, model_backend="gemini"):
    """Build and compile the agent's state graph.

    Args:
        test_mode: Only affects the ``"groq"`` backend, where it selects
            ``llama3-70b-8192`` instead of ``qwen-qwq-32b``.
        system_prompt: Replacement system prompt; the built-in default is
            used when this is ``None`` or empty.
        model_backend: One of ``"groq"``, ``"mistral"`` or ``"gemini"``.

    Returns:
        The compiled graph: ``decision_node`` calls the model, ``tools``
        executes requested tool calls, and control returns to
        ``decision_node`` after each tool run.

    Raises:
        ValueError: If ``model_backend`` is not a supported value.
    """
    # ------ Model Selector ------
    if model_backend == "groq":
        model_name = "llama3-70b-8192" if test_mode else "qwen-qwq-32b"
        llm = ChatGroq(model=model_name, temperature=0.1, max_tokens=8192)
    elif model_backend == "mistral":
        model_name = "mistral-small-latest"
        llm = ChatMistralAI(model=model_name, temperature=0.1, max_tokens=8192)
    elif model_backend == "gemini":
        # Alternative model names noted in the original:
        # gemini-2.5-flash-preview-04-17, gemini-2.0-flash
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0.1)
    else:
        raise ValueError(f"Unsupported model backend: {model_backend}")

    llm_with_tools = llm.bind_tools(tools)
    sys_msg = SystemMessage(content=system_prompt or _DEFAULT_SYSTEM_PROMPT)

    # ------ Define decision node ------
    def decision_node(state: MessagesState) -> dict:
        """Call the model and normalise its reply into a tool-calling message.

        When the model returns no native tool calls but its text content is a
        fenced JSON block, that block is parsed and the reply is replaced by
        an ``AIMessage`` carrying the equivalent tool calls.
        """
        logger.info("Deciding which tool to use...")
        response: AIMessage = llm_with_tools.invoke([sys_msg] + state["messages"])
        logger.info("LLM raw response object: %r", response)
        logger.info("LLM response content: '%s'", response.content)

        if response.tool_calls:
            logger.info("LLM response tool_calls (native): %s", response.tool_calls)
        elif _is_fenced_json(response.content):
            # Workaround: the tool request arrived as JSON text.
            logger.warning(
                "Native tool_calls empty, attempting to parse JSON from content for Gemini-like behavior."
            )
            parsed_tool_calls = _parse_tool_calls_from_content(response.content)
            if parsed_tool_calls:
                try:
                    response = AIMessage(
                        content="",  # consumed: it is now expressed as tool calls
                        tool_calls=parsed_tool_calls,
                        id=response.id,
                        response_metadata=response.response_metadata,
                        usage_metadata=response.usage_metadata,
                    )
                except Exception as e:
                    # Best effort: keep the model's original reply.
                    logger.error(f"Unexpected error parsing tool call from content: {e}")
                else:
                    logger.info(
                        "Modified AIMessage with tool_calls parsed from content: %s",
                        response.tool_calls,
                    )
        else:
            # Neither native tool calls nor a fenced JSON block.
            logger.info(
                "LLM response additional_kwargs: %s",
                getattr(response, "additional_kwargs", "N/A"),
            )

        tool_names = [tc["name"] for tc in response.tool_calls or []]
        if tool_names:
            logger.info("Used tool(s) after parsing/checking: %s", ", ".join(tool_names))
        else:
            logger.info("No tool used in this step (after parsing/checking).")

        # MessagesState merges updates into the existing history through its
        # reducer, so only the new message is returned.
        return {"messages": [response]}

    graph = StateGraph(MessagesState)
    graph.add_node("decision_node", decision_node)
    graph.add_node("tools", ToolNode(tools))
    graph.add_edge(START, "decision_node")
    graph.add_conditional_edges("decision_node", tools_condition)
    graph.add_edge("tools", "decision_node")
    return graph.compile()