Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI | |
| from llama_index.embeddings.huggingface import HuggingFaceEmbedding | |
| from langchain_community.document_loaders import WikipediaLoader | |
| from llama_index.core.tools.types import ToolMetadata | |
| from llama_index.core.schema import Document | |
| from llama_index.core.tools import FunctionTool | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from llama_index.core.agent.workflow import AgentWorkflow | |
# Hugging Face Inference API token; read from the environment so the secret
# never lives in source control. May be None if HF_TOKEN is unset.
hf_token = os.getenv("HF_TOKEN")
# List of models to try in order — the first entry is the preferred model,
# the rest are fallbacks consumed by try_next_model()/run_with_fallback().
model_list = [
    "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
    "microsoft/phi-3-mini-128k-instruct",
    "google/gemma-2b-it",
    "gpt2"
]
# Index into model_list of the model currently in use (mutated as a global
# by try_next_model and run_with_fallback).
current_model_index = 0
# Remote LLM client for the currently selected model; rebuilt whenever the
# model index changes.
llm = HuggingFaceInferenceAPI(
    model_name=model_list[current_model_index],
    token=hf_token,
)
# Numerical operation functions
def multiply(a: int, b: int) -> int:
    """Return the product of *a* and *b*."""
    product = a * b
    return product
def add(a: int, b: int) -> int:
    """Return the sum of *a* and *b*."""
    total = a + b
    return total
def subtract(a: int, b: int) -> int:
    """Return *a* minus *b*."""
    difference = a - b
    return difference
def divide(a: int, b: int) -> float:
    """Return *a* divided by *b*.

    Raises:
        ValueError: if *b* is zero.
    """
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    quotient = a / b
    return quotient
def modulus(a: int, b: int) -> int:
    """Return *a* modulo *b*."""
    remainder = a % b
    return remainder
# Web search tool function
def web_search(query: str) -> list:
    """Search Tavily for *query* and return up to 3 results as Documents.

    ``TavilySearchResults.invoke()`` takes the query as its positional
    input and returns a list of plain dicts (keys include "url" and
    "content"), which are adapted into llama_index Documents here.

    Returns:
        list[Document]: one Document per hit; the "page" metadata key is
        kept (empty) for schema consistency with wiki_search.
    """
    # BUG FIX: the original called .invoke(query=query) — TypeError, since
    # the tool's input is positional — and then treated the dict results
    # as objects with .metadata/.page_content attributes.
    results = TavilySearchResults(max_results=3).invoke(query)
    docs = []
    for r in results:
        meta = {"source": r.get("url", ""), "page": ""}
        docs.append(Document(text=r.get("content", ""), metadata=meta))
    return docs
# Wikipedia search tool function
def wiki_search(query: str) -> list:
    """Search Wikipedia for *query* and return up to 2 results as Documents."""
    pages = WikipediaLoader(query=query, load_max_docs=2).load()
    return [
        Document(
            text=page.page_content,
            metadata={
                "source": page.metadata.get("source", ""),
                "page": page.metadata.get("page", ""),
            },
        )
        for page in pages
    ]
# Wrap functions into FunctionTool instances.
# NOTE(review): FunctionTool is constructed directly with the callable as the
# first positional argument; confirm this matches the installed llama_index
# version (FunctionTool.from_defaults is the commonly documented path).
web_search_tool = FunctionTool(
    web_search,
    metadata=ToolMetadata(name="web_search", description="Tavily 3-hit search")
)
wiki_search_tool = FunctionTool(
    wiki_search,
    metadata=ToolMetadata(name="wiki_search", description="Wikipedia 2-hit search")
)
# Arithmetic tools exposed to the agent; descriptions are what the LLM sees
# when deciding which tool to call.
multiply_tool = FunctionTool(multiply, metadata=ToolMetadata(name="multiply", description="Multiply two numbers."))
add_tool = FunctionTool(add, metadata=ToolMetadata(name="add", description="Add two numbers."))
subtract_tool = FunctionTool(subtract, metadata=ToolMetadata(name="subtract", description="Subtract two numbers."))
divide_tool = FunctionTool(divide, metadata=ToolMetadata(name="divide", description="Divide two numbers."))
modulus_tool = FunctionTool(modulus, metadata=ToolMetadata(name="modulus", description="Modulus operation on two numbers."))
# Aggregate all tools handed to the agent workflow below.
tools = [
    web_search_tool,
    wiki_search_tool,
    multiply_tool,
    add_tool,
    subtract_tool,
    divide_tool,
    modulus_tool,
]
# Initialize agent (rebuilt whenever the active model changes; see
# try_next_model / run_with_fallback).
agent = AgentWorkflow.from_tools_or_functions(tools, llm=llm)
# Function to try the next model in the list
def try_next_model():
    """Advance to the next model in ``model_list`` and rebuild the agent.

    Returns:
        bool: True when a fresh LLM/agent pair was set up, False when every
        model in the list has already been tried.
    """
    global current_model_index, llm, agent
    next_index = current_model_index + 1
    current_model_index = next_index
    if next_index >= len(model_list):
        return False
    # Fresh client for the newly selected model.
    llm = HuggingFaceInferenceAPI(
        model_name=model_list[next_index],
        token=hf_token,
    )
    # The agent holds a reference to the LLM, so it must be rebuilt too.
    agent = AgentWorkflow.from_tools_or_functions(tools, llm=llm)
    return True
# Run with fallback logic
def run_with_fallback(query: str):
    """Run *query* through the agent, falling back through ``model_list``.

    Always starts from the first model (resetting the module globals if a
    previous call advanced them), then on any exception switches to the
    next model via try_next_model() until one succeeds or the list is
    exhausted.

    Returns:
        The agent's result on success, or an apology string when every
        model fails.
    """
    global current_model_index, llm, agent
    # Reset to first model so every call starts from the same baseline.
    if current_model_index != 0:
        current_model_index = 0
        llm = HuggingFaceInferenceAPI(
            model_name=model_list[current_model_index],
            token=hf_token,
        )
        agent = AgentWorkflow.from_tools_or_functions(tools, llm=llm)
    # Try each model in sequence.
    for attempt in range(len(model_list)):
        try:
            # BUG FIX: call run through the class so we reach the real
            # AgentWorkflow.run even when the instance attribute has been
            # monkeypatched to this very function (the module-level
            # `agent.run = run_with_fallback` would otherwise make this
            # call recurse into itself until RecursionError, silently
            # skipping the first model).
            result = AgentWorkflow.run(agent, query)
            print(f"Successfully ran query with model: {model_list[current_model_index]}")
            return result
        except Exception as e:
            print(f"Error with model {model_list[current_model_index]}: {e}")
            if attempt < len(model_list) - 1:  # If not the last model
                try_next_model()
            else:
                break
    return "Sorry, encountered issues with all models."
# Make agent.run() work with asyncio by adding async support
async def run(query: str):
    """Async wrapper for the agent.run method to be compatible with app.py"""
    # NOTE(review): nothing is awaited here — run_with_fallback executes
    # synchronously inside the event loop and will block it for the duration
    # of the query. Confirm app.py does not need true non-blocking behavior.
    return run_with_fallback(query)
# Add the async run method to the agent object
# NOTE(review): this instance attribute shadows AgentWorkflow.run. Because
# run_with_fallback itself calls agent.run(...), the first call on this
# original agent object re-enters run_with_fallback recursively until
# RecursionError — confirm this monkeypatch is intended.
agent.run = run_with_fallback  # Replace with synchronous version for direct calls