# GAIA Agent — Hugging Face Space entrypoint
# (status banner from the Spaces web UI removed from the pasted source)
| import os | |
| import re | |
| import json | |
| import requests | |
| import pandas as pd | |
| from pathlib import Path | |
| from typing import Optional | |
| from dotenv import load_dotenv | |
| from langgraph.prebuilt import create_react_agent | |
| from langchain_core.messages import HumanMessage | |
| from langchain_core.tools import tool | |
| from langchain_openai import ChatOpenAI | |
| import inspect | |
| load_dotenv() | |
| class OpenRouterLLM(ChatOpenAI): | |
| """Custom OpenRouter LLM wrapper for LangGraph""" | |
| def __init__(self, model: str = "deepseek/deepseek-v3.1-terminus", **kwargs): | |
| api_key = os.getenv("OPENROUTER_API_KEY") or os.getenv("my_key") | |
| super().__init__( | |
| model=model, | |
| openai_api_key=api_key, | |
| openai_api_base="https://openrouter.ai/api/v1", | |
| **kwargs | |
| ) | |
# ------------------ TOOLS ------------------
| def search_web(query: str) -> str: | |
| """Search the web using DuckDuckGo for current information.""" | |
| try: | |
| search_url = f"https://api.duckduckgo.com/?q={query}&format=json&no_html=1&skip_disambig=1" | |
| response = requests.get(search_url, timeout=10) | |
| if response.status_code == 200: | |
| data = response.json() | |
| results = [] | |
| if data.get("AbstractText"): | |
| results.append(f"Abstract: {data['AbstractText']}") | |
| if data.get("RelatedTopics"): | |
| for topic in data["RelatedTopics"][:3]: | |
| if isinstance(topic, dict) and topic.get("Text"): | |
| results.append(f"Related: {topic['Text']}") | |
| return "\n".join(results) if results else f"No results for '{query}'." | |
| return f"Search failed with status code {response.status_code}" | |
| except Exception as e: | |
| return f"Search error: {str(e)}" | |
| def search_wikipedia(query: str) -> str: | |
| """Search Wikipedia for factual information.""" | |
| try: | |
| search_url = "https://en.wikipedia.org/api/rest_v1/page/summary/" + query.replace(" ", "_") | |
| response = requests.get(search_url, timeout=10) | |
| if response.status_code == 200: | |
| data = response.json() | |
| extract = data.get("extract", "") | |
| return f"Wikipedia: {extract[:500]}..." if extract else f"No extract for '{query}'." | |
| return f"Wikipedia search failed for '{query}'" | |
| except Exception as e: | |
| return f"Wikipedia search error: {str(e)}" | |
| def execute_python(code: str) -> str: | |
| """Execute Python code and return the result.""" | |
| try: | |
| safe_globals = { | |
| '__builtins__': { | |
| 'print': print, 'len': len, 'str': str, 'int': int, 'float': float, | |
| 'bool': bool, 'list': list, 'dict': dict, 'tuple': tuple, 'set': set, | |
| 'range': range, 'sum': sum, 'max': max, 'min': min, 'abs': abs, | |
| 'round': round, 'sorted': sorted, 'enumerate': enumerate, 'zip': zip, | |
| }, | |
| 'math': __import__('math'), | |
| 'json': __import__('json'), | |
| 'datetime': __import__('datetime'), | |
| 'random': __import__('random'), | |
| } | |
| import io, sys | |
| old_stdout = sys.stdout | |
| sys.stdout = mystdout = io.StringIO() | |
| try: | |
| exec(code, safe_globals) | |
| output = mystdout.getvalue() | |
| finally: | |
| sys.stdout = old_stdout | |
| return output if output else "Code executed successfully (no output)" | |
| except Exception as e: | |
| return f"Python execution error: {str(e)}" | |
| def read_excel_file(file_path: str, sheet_name: Optional[str] = None) -> str: | |
| """Read an Excel file and return its contents as a formatted string.""" | |
| try: | |
| file_path_obj = Path(file_path) | |
| if not file_path_obj.exists(): | |
| return f"Error: File not found at {file_path}" | |
| if sheet_name and sheet_name.isdigit(): | |
| sheet_name = int(sheet_name) | |
| elif sheet_name is None: | |
| sheet_name = 0 | |
| df = pd.read_excel(file_path, sheet_name=sheet_name) | |
| if len(df) > 20: | |
| result = f"Excel file with {len(df)} rows and {len(df.columns)} columns:\n\n" | |
| result += "First 10 rows:\n" + df.head(10).to_string(index=False) | |
| result += f"\n\n... ({len(df) - 20} rows omitted) ...\n\n" | |
| result += "Last 10 rows:\n" + df.tail(10).to_string(index=False) | |
| else: | |
| result = f"Excel file with {len(df)} rows and {len(df.columns)} columns:\n\n" | |
| result += df.to_string(index=False) | |
| return result | |
| except Exception as e: | |
| return f"Error reading Excel file: {str(e)}" | |
| def read_text_file(file_path: str) -> str: | |
| """Read a text file and return its contents.""" | |
| try: | |
| file_path_obj = Path(file_path) | |
| if not file_path_obj.exists(): | |
| return f"Error: File not found at {file_path}" | |
| encodings = ['utf-8', 'utf-16', 'iso-8859-1', 'cp1252'] | |
| for encoding in encodings: | |
| try: | |
| with open(file_path_obj, 'r', encoding=encoding) as f: | |
| return f"File content ({encoding} encoding):\n\n{f.read()}" | |
| except UnicodeDecodeError: | |
| continue | |
| return "Error: Could not decode file with any standard encoding" | |
| except Exception as e: | |
| return f"Error reading file: {str(e)}" | |
# ------------------ GAIA AGENT ------------------
| class GaiaAgent: | |
| """LangGraph-based agent for GAIA tasks using OpenRouter DeepSeek""" | |
| def __init__(self): | |
| print("Initializing GaiaAgent with LangGraph and OpenRouter DeepSeek...") | |
| self.llm = OpenRouterLLM( | |
| model="deepseek/deepseek-v3.1-terminus", | |
| temperature=0.1, | |
| max_tokens=2000 | |
| ) | |
| self.tools = [search_web, search_wikipedia, execute_python, read_excel_file, read_text_file] | |
| prompt_modifier = self._get_system_prompt() | |
| # Detect correct kwarg for your LangGraph version | |
| sig = inspect.signature(create_react_agent) | |
| accepted = sig.parameters.keys() | |
| kwargs = {} | |
| if "messages_modifier" in accepted: | |
| kwargs["messages_modifier"] = prompt_modifier | |
| elif "state_modifier" in accepted: | |
| kwargs["state_modifier"] = prompt_modifier | |
| elif "prompt" in accepted: | |
| kwargs["prompt"] = prompt_modifier | |
| self.agent = create_react_agent(self.llm, self.tools, **kwargs) | |
| print("GaiaAgent initialized successfully!") | |
| def _get_system_prompt(self) -> str: | |
| return """You are an advanced AI agent designed to answer complex questions... | |
| (keep your original system prompt here)""" | |
| def __call__(self, task_id: str, question: str) -> str: | |
| try: | |
| print(f"Processing task {task_id}: {question[:100]}...") | |
| messages = [HumanMessage(content=question)] | |
| result = self.agent.invoke({"messages": messages}) | |
| final_message = result["messages"][-1] | |
| answer = final_message.content | |
| return self._clean_answer(answer) | |
| except Exception as e: | |
| return f"Agent error: {e}" | |
| def _clean_answer(self, answer: str) -> str: | |
| # same cleaning code as before | |
| answer = answer.strip() | |
| if "final answer:" in answer.lower(): | |
| parts = re.split(r'final answer:', answer, flags=re.IGNORECASE) | |
| if len(parts) > 1: | |
| answer = parts[-1].strip() | |
| prefixes = ["The answer is", "Answer:", "Result:", "Solution:", | |
| "Based on", "Therefore", "In conclusion", "So the answer is"] | |
| for prefix in prefixes: | |
| if answer.lower().startswith(prefix.lower()): | |
| answer = answer[len(prefix):].strip() | |
| if answer.startswith(':'): | |
| answer = answer[1:].strip() | |
| break | |
| if len(answer.split()) <= 3: | |
| answer = answer.strip('"\'.') | |
| return answer | |
# ------------------ ENTRYPOINT ------------------
| import gradio as gr | |
| agent = GaiaAgent() | |
| def run_agent(prompt: str) -> str: | |
| return agent("gaia_task", prompt) | |
| demo = gr.Interface(fn=run_agent, inputs="text", outputs="text", title="GAIA Agent") | |
| if __name__ == "__main__": | |
| demo.launch(server_name="0.0.0.0", server_port=7860) | |