Spaces:
Build error
Build error
| import os | |
| import time | |
| import json | |
| import arxiv | |
| import logging | |
| from dotenv import load_dotenv | |
| from duckduckgo_search import DDGS | |
| from brave import Brave | |
| from langchain_groq import ChatGroq | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain_mistralai.chat_models import ChatMistralAI | |
| from langchain_core.tools import tool | |
| from langchain_core.messages import SystemMessage, AIMessage | |
| from langchain_community.document_loaders import WikipediaLoader, ArxivLoader | |
| from langgraph.graph import StateGraph, MessagesState, START | |
| from langgraph.prebuilt import tools_condition, ToolNode | |
# Pull settings from a local .env file into the process environment
# (web_search reads BRAVE_API_KEY from there via os.getenv).
load_dotenv()

# Log INFO and above, with a compact wall-clock timestamp on every record.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)

# Module-level logger shared by the tools and the graph node.
logger = logging.getLogger(__name__)
| # ------ ARITHMETIC TOOLS ------ | |
def add(a: int, b: int) -> int:
    """Return the sum of two integers.

    Args:
        a: First addend.
        b: Second addend.
    """
    total = a + b
    return total
def subtract(a: int, b: int) -> int:
    """Return the difference of two integers (``a`` minus ``b``).

    Args:
        a: Value to subtract from.
        b: Value to subtract.
    """
    difference = a - b
    return difference
def divide(a: int, b: int) -> float:
    """Divide two integers using true division.

    Args:
        a: Dividend.
        b: Divisor; must be non-zero.

    Returns:
        The quotient as a float. True division never truncates, so
        ``divide(7, 2)`` is ``3.5`` and ``divide(4, 2)`` is ``2.0``.

    Raises:
        ValueError: If ``b`` is zero.
    """
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b
def multiply(a: int, b: int) -> int:
    """Return the product of two integers.

    Args:
        a: First factor.
        b: Second factor.
    """
    product = a * b
    return product
def modulus(a: int, b: int) -> int:
    """Return the remainder of ``a`` divided by ``b``.

    Args:
        a: Dividend.
        b: Divisor; must be non-zero.

    Returns:
        ``a % b``. Following Python semantics, the result takes the sign of
        ``b`` (``modulus(-7, 3)`` is ``2``).

    Raises:
        ValueError: If ``b`` is zero.
    """
    # Mirror divide(): report a zero divisor as a ValueError with a readable
    # message instead of leaking a bare ZeroDivisionError.
    if b == 0:
        raise ValueError("Cannot take modulus by zero.")
    return a % b
| # --- SEARCH TOOLS --- | |
def wikipedia_search(query: str) -> str:
    """Search Wikipedia and return up to 3 matching articles as <Document> blocks.

    Args:
        query: Search text handed to ``WikipediaLoader``.

    Returns:
        One ``<Document source='...'>`` block per loaded page, separated by
        ``---`` lines. When nothing is found, or the lookup raises, a single
        ``<Document>Error: ...</Document>`` block is returned instead.
    """
    try:
        pages = WikipediaLoader(query=query, load_max_docs=3).load()
        if pages:
            blocks = []
            for page in pages:
                origin = page.metadata.get('source', 'unknown')
                text = page.page_content.strip()
                blocks.append(f"<Document source='{origin}'>\n{text}\n</Document>")
            return "\n\n---\n\n".join(blocks)
        return "<Document>Error: No Wikipedia results found.</Document>"
    except Exception as e:
        # Best-effort tool: report the failure to the caller as a document
        # rather than raising.
        logger.error(f"Wikipedia tool failed: {e}")
        return f"<Document>Error: Wikipedia search failed — {e}</Document>"
def arxiv_search(query: str) -> str:
    """Search arXiv and return the 3 most relevant papers as <Document> blocks.

    Args:
        query: Search text for the arXiv API.

    Returns:
        One ``<Document source='...'>`` block per paper (entry id, title and
        summary), separated by ``---`` lines. When nothing is found, or the
        request raises, a single ``<Document>Error: ...</Document>`` block is
        returned instead.
    """
    try:
        request = arxiv.Search(
            query=query,
            max_results=3,
            sort_by=arxiv.SortCriterion.Relevance,
        )
        papers = list(arxiv.Client().results(request))
        if papers:
            blocks = []
            for paper in papers:
                heading = paper.title.strip()
                abstract = paper.summary.strip()
                blocks.append(
                    f"<Document source='{paper.entry_id}'>\nTitle: {heading}\nSummary: {abstract}\n</Document>"
                )
            return "\n\n---\n\n".join(blocks)
        return "<Document>Error: No arXiv results found.</Document>"
    except Exception as e:
        # Best-effort tool: report the failure to the caller as a document
        # rather than raising.
        logger.error(f"Arxiv tool failed: {e}")
        return f"<Document>Error: arXiv search failed — {e}</Document>"
| # --- WEB SEARCH TOOLS --- | |
def _perform_brave_search_attempt(query: str, api_key: str) -> str | None:
    """Run one raw Brave Search request and format its hits.

    Args:
        query: Search text.
        api_key: Brave Search API key.

    Returns:
        - the hits as ``<Document source='...'>`` blocks joined by ``---``
          separators on success;
        - a critical-error ``<Document>`` string if an ``ImportError`` is
          raised while searching;
        - ``None`` when the attempt fails for any other reason or yields no
          hit with a URL, so the caller may retry.
    """
    local_logger = logging.getLogger(f"{__name__}._perform_brave_search_attempt")
    try:
        payload = Brave(api_key=api_key).search(q=query, count=3, raw=True) or {}

        # The raw JSON may nest the hits under "web" or expose them at top level.
        hits = payload.get("web", {}).get("results") or payload.get("results") or []

        entries = []
        for hit in hits:
            # Ignore malformed hits and hits that carry no URL to cite.
            link = hit.get("url") if isinstance(hit, dict) else None
            if not link:
                continue
            heading = hit.get("title", "No Title")
            summary = hit.get("description", hit.get("snippet", ""))
            entries.append(
                f"<Document source='{link}'>\nTitle: {heading}\n{summary}\n</Document>"
            )

        if not entries:
            local_logger.warning("Brave Search returned no usable results.")
            return None
        return "\n\n---\n\n".join(entries)
    except ImportError:
        local_logger.error("Brave Search library not installed.")
        return "<Document>Error: Brave Search library not installed. Cannot perform search.</Document>"
    except Exception:
        # Any other failure is treated as retryable by the caller.
        local_logger.exception("Brave Search attempt failed")
        return None
def _perform_duckduckgo_search(query: str) -> str:
    """Search DuckDuckGo as the fallback engine, making up to 2 attempts.

    Args:
        query: Search text passed to ``DDGS.text``.

    Returns:
        Up to 3 hits formatted as ``<Document source='...'>`` blocks joined by
        ``---`` separators, or a single ``<Document>Error: ...</Document>``
        block when every attempt comes back empty or the client itself fails.
    """
    local_logger = logging.getLogger(f"{__name__}._perform_duckduckgo_search")
    headers = {"Accept-Encoding": "gzip, deflate, br"}
    max_attempts = 2
    try:
        with DDGS(headers=headers, timeout=30) as ddgs:
            for attempt in range(1, max_attempts + 1):
                local_logger.info("DDG attempt %d for '%s'", attempt, query)
                try:
                    results = ddgs.text(query, max_results=3) or []
                except Exception as e:
                    # A failed request counts as an empty attempt so the loop
                    # can retry instead of aborting the whole fallback.
                    local_logger.warning("DDG error on attempt %d: %s", attempt, e)
                    results = []

                if results:
                    docs = []
                    for r in results:
                        href = r.get("href")
                        if not href:
                            continue
                        title = r.get("title", "No Title")
                        body = r.get("body", "")
                        docs.append(
                            f"<Document source='{href}'>\n"
                            f"Title: {title}\n"
                            f"{body}\n"
                            f"</Document>"
                        )
                    if docs:
                        return "\n\n---\n\n".join(docs)
                    local_logger.warning("DDG returned no docs with URLs.")

                # Back off only when another attempt follows; sleeping after
                # the last one would just delay the error response.
                if attempt < max_attempts:
                    time.sleep(7 * attempt)
        return "<Document>Error: All web search attempts (including fallback) failed to find results.</Document>"
    except Exception as e:
        local_logger.exception("DuckDuckGo fallback search failed catastrophically")
        return f"<Document>Error: DuckDuckGo fallback search failed — {type(e).__name__}: {e}</Document>"
def web_search(query: str) -> str:
    """Search the web and return the top-3 results formatted as <Document> blocks.

    Brave Search is tried first (up to 2 attempts) when the ``BRAVE_API_KEY``
    environment variable is set; DuckDuckGo is used as the fallback.

    Args:
        query: Search text.

    Returns:
        ``<Document source='...'>`` blocks joined by ``---`` separators, or an
        error ``<Document>`` block produced by the DuckDuckGo fallback.
    """
    main_logger = logging.getLogger(__name__ + ".web_search")
    api_key = os.getenv("BRAVE_API_KEY")
    max_attempts = 2

    if api_key:
        main_logger.info("Trying Brave Search for '%s'", query)
        for attempt in range(1, max_attempts + 1):
            res = _perform_brave_search_attempt(query, api_key)
            if res:
                # A non-empty result is either real hits or the sentinel
                # "library missing" document, which retrying cannot fix.
                if res.startswith("<Document>Error: Brave Search library not installed"):
                    main_logger.error("Brave library missing; skipping to fallback.")
                    break
                main_logger.info("Brave Search succeeded.")
                return res
            if attempt < max_attempts:
                main_logger.warning("Brave attempt %d failed; retrying...", attempt)
                time.sleep(3)
            else:
                # No retry follows, so neither wait nor claim to be retrying.
                main_logger.warning("Brave attempt %d failed; no retries left.", attempt)
        main_logger.warning("Brave Search failed; falling back.")
    else:
        main_logger.warning("No BRAVE_API_KEY; skipping Brave Search.")

    main_logger.info("Using DuckDuckGo fallback for '%s'", query)
    return _perform_duckduckgo_search(query)
# Every callable exposed to the LLM: bound to the model with bind_tools() and
# executed by the ToolNode inside build_graph().
tools = [
    # arithmetic
    add, subtract, divide, multiply, modulus,
    # information retrieval
    wikipedia_search, arxiv_search, web_search,
]
| # ------ Build the graph ------ | |
| def build_graph(test_mode=False, system_prompt=None, model_backend="gemini"): | |
| """Builds the state graph for the agent with fallback model support.""" | |
| # ------ Model Selector ------ | |
| if model_backend == "groq": | |
| model_name = "llama3-70b-8192" if test_mode else "qwen-qwq-32b" | |
| llm = ChatGroq(model=model_name, temperature=0.1, max_tokens=8192) | |
| elif model_backend == "mistral": | |
| model_name = "mistral-small-latest" | |
| llm = ChatMistralAI(model=model_name, temperature=0.1, max_tokens=8192) | |
| elif model_backend == "gemini": | |
| llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0.1) # gemini-2.5-flash-preview-04-17 # gemini-2.0-flash | |
| else: | |
| raise ValueError(f"Unsupported model backend: {model_backend}") | |
| llm_with_tools = llm.bind_tools(tools) | |
| sys_msg = SystemMessage(content=system_prompt or """ | |
| You are a precise and helpful agent that uses tools. | |
| Key Instructions: | |
| 1. Use tools for calculations and information retrieval (web, Wikipedia, ArXiv). Do NOT answer from internal knowledge for these tasks. | |
| 2. Use ONLY the specified tool parameters (e.g., `a`, `b` for arithmetic; `query` for search). | |
| 3. If a search tool (`wikipedia_search`, `arxiv_search`, `web_search`) returns `<Document>...</Document>` content (including errors in this format), your FINAL response to the user MUST BE the verbatim content from the tool, including all `<Document>` tags and their full content. | |
| 4. For multi-step calculations, call tools sequentially. Use the output of one tool call as input for the next. Your FINAL answer for calculations MUST BE ONLY THE NUMERICAL RESULT after all steps are complete. | |
| Examples: | |
| --- | |
| Example 1: Single Step Calculation | |
| User: What is 15 divided by 3? | |
| Tool Call: | |
| ```json | |
| { | |
| "tool_name": "divide", | |
| "parameters": { | |
| "a": 15, | |
| "b": 3 | |
| } | |
| } | |
| ``` | |
| Tool Output: | |
| ``` | |
| 5 | |
| ``` | |
| Final Answer: 15 divided by 3 is 5 | |
| --- | |
| Example 2: Multi-Step Calculation | |
| User: What is (5 plus 3) then multiplied by 2? | |
| Tool Call: # First operation | |
| ```json | |
| { | |
| "tool_name": "add", | |
| "parameters": { | |
| "a": 5, | |
| "b": 3 | |
| } | |
| } | |
| ``` | |
| Tool Output: # Result of the first operation | |
| ```# 8 | |
| ``` | |
| # Given the previous output '8', the next operation is 8 * 2. | |
| Tool Call: # Second operation, using previous output | |
| ```json | |
| { | |
| "tool_name": "multiply", | |
| "parameters": { | |
| "a": 8, # Output from the 'add' tool | |
| "b": 2 | |
| } | |
| } | |
| ``` | |
| Tool Output: # Result of the second operation | |
| ``` | |
| 16 | |
| ``` | |
| Final Answer: 16 # Only the final numerical result | |
| --- | |
| Example 3: Information Retrieval | |
| User: Tell me about the Eiffel tower | |
| # Assistant's intended action: Call wikipedia_search | |
| # Assistant's response if it were to put JSON in content (which your code handles): | |
| ```json | |
| { | |
| "tool_name": "wikipedia_search", | |
| "parameters": { | |
| "query": "Eiffel Tower" | |
| } | |
| } | |
| ``` | |
| # Correct Final Answer (after tool execution and if assistant is responding correctly): | |
| <Document source='https://en.wikipedia.org/wiki/Eiffel_Tower'> | |
| Title: Eiffel Tower | |
| The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars in Paris, France. | |
| </Document> | |
| --- | |
| <Document source='https://en.wikipedia.org/wiki/Eiffel_Tower_in_popular_culture'> | |
| Title: Eiffel Tower in popular culture | |
| The Eiffel Tower has been the subject of numerous references in popular culture. | |
| </Document> | |
| --- | |
| """) | |
| # ------ Define decision node ------ | |
| def decision_node(state: MessagesState) -> dict: | |
| """Decides which tool to use based on the task or parses content if needed.""" | |
| logger.info("Deciding which tool to use...") | |
| response: AIMessage = llm_with_tools.invoke([sys_msg] + state["messages"]) | |
| logger.info(f"LLM raw response object: {response!r}") | |
| logger.info(f"LLM response content: '{response.content}'") | |
| parsed_tool_calls: List[Dict[str, Any]] = [] | |
| if response.tool_calls: | |
| logger.info(f"LLM response tool_calls (native): {response.tool_calls}") | |
| for tc in response.tool_calls: | |
| parsed_tool_calls.append({"name": tc["name"], "args": tc["args"], "id": tc.get("id")}) | |
| # Only attempt if native tool_calls is empty AND content seems like a JSON block (Gemini workaround) | |
| elif not response.tool_calls and isinstance(response.content, str) and \ | |
| response.content.strip().startswith("```json") and response.content.strip().endswith("```"): | |
| logger.warning("Native tool_calls empty, attempting to parse JSON from content for Gemini-like behavior.") | |
| content_str = response.content.strip() | |
| # Strip the ```json and ``` markers | |
| json_str = content_str[len("```json"): -len("```")].strip() | |
| try: | |
| # The JSON might be a single tool call object or a list of them | |
| parsed_json_content = json.loads(json_str) | |
| # Standardize to a list of tool call like dicts | |
| tool_requests_from_content = [] | |
| if isinstance(parsed_json_content, dict): # Single tool call | |
| tool_requests_from_content.append(parsed_json_content) | |
| elif isinstance(parsed_json_content, list): # List of tool calls (less common for this workaround) | |
| tool_requests_from_content.extend(parsed_json_content) | |
| for tool_data in tool_requests_from_content: | |
| if "tool_name" in tool_data and "parameters" in tool_data: | |
| # Convert to the format Langchain expects for AIMessage.tool_calls | |
| tool_call_id = tool_data.get("id", f"call_json_content_{os.urandom(4).hex()}") | |
| parsed_tool_calls.append({ | |
| "name": tool_data["tool_name"], | |
| "args": tool_data["parameters"], # Langchain expects 'args' | |
| "id": tool_call_id | |
| }) | |
| logger.info(f"Successfully parsed tool call from content: {tool_data['tool_name']}") | |
| else: | |
| logger.warning(f"Parsed JSON from content does not match expected tool call structure: {tool_data}") | |
| if parsed_tool_calls: | |
| # We create a new AIMessage with the parsed tool_calls. | |
| response = AIMessage( | |
| content="", # Clear content as it's now processed as a tool call | |
| tool_calls=parsed_tool_calls, | |
| id=response.id, | |
| response_metadata=response.response_metadata, | |
| usage_metadata=response.usage_metadata, | |
| ) | |
| logger.info(f"Modified AIMessage with tool_calls parsed from content: {response.tool_calls}") | |
| else: | |
| logger.warning("Attempted to parse JSON from content, but no valid tool calls found.") | |
| except json.JSONDecodeError as e: | |
| logger.error(f"Failed to decode JSON from content: {e}. Content was: '{json_str}'") | |
| except Exception as e: | |
| logger.error(f"Unexpected error parsing tool call from content: {e}") | |
| else: # No native tool_calls and content doesn't look like our target JSON block | |
| logger.info(f"LLM response additional_kwargs: {response.additional_kwargs if hasattr(response, 'additional_kwargs') else 'N/A'}") | |
| # Logging tool names based on successfully parsed tool_calls (either native or from content) | |
| tool_names_to_log = [tc["name"] for tc in response.tool_calls] if response.tool_calls else [] | |
| if tool_names_to_log: | |
| logger.info(f"Used tool(s) after parsing/checking: {', '.join(tool_names_to_log)}") | |
| else: | |
| logger.info("No tool used in this step (after parsing/checking).") | |
| return {"messages": state["messages"] + [response]} | |
| graph = StateGraph(MessagesState) | |
| graph.add_node("decision_node", decision_node) | |
| graph.add_node("tools", ToolNode(tools)) | |
| graph.add_edge(START, "decision_node") | |
| graph.add_conditional_edges("decision_node", tools_condition) | |
| graph.add_edge("tools", "decision_node") | |
| return graph.compile() |