# new_assignment / app.py
# Mehedi2's picture
# Update app.py
# a26e4d2 verified
import os
import re
import json
import requests
import pandas as pd
from pathlib import Path
from typing import Optional
from dotenv import load_dotenv
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
import inspect
load_dotenv()
class OpenRouterLLM(ChatOpenAI):
    """ChatOpenAI subclass preconfigured for the OpenRouter endpoint.

    The API key is taken from the ``OPENROUTER_API_KEY`` environment
    variable, falling back to ``my_key`` when the former is unset or empty.
    """

    def __init__(self, model: str = "deepseek/deepseek-v3.1-terminus", **kwargs):
        # The secondary variable is only consulted when the primary one
        # yields nothing (missing or empty string).
        credential = os.getenv("OPENROUTER_API_KEY")
        if not credential:
            credential = os.getenv("my_key")

        openrouter_settings = {
            "model": model,
            "openai_api_key": credential,
            "openai_api_base": "https://openrouter.ai/api/v1",
        }
        # Caller-supplied options are forwarded untouched next to the
        # OpenRouter-specific settings.
        super().__init__(**openrouter_settings, **kwargs)
# ------------------ TOOLS ------------------
@tool
def search_web(query: str) -> str:
    """Search the web using DuckDuckGo for current information.

    Args:
        query: Free-text search query.

    Returns:
        The abstract (if any) followed by up to three related-topic
        snippets, one per line; otherwise a human-readable message saying
        that nothing was found or that the request failed.
    """
    # Let requests build the query string so characters such as "&", "#",
    # "+" or "=" inside the query are percent-encoded instead of silently
    # corrupting the URL.
    params = {
        "q": query,
        "format": "json",
        "no_html": 1,
        "skip_disambig": 1,
    }
    try:
        response = requests.get(
            "https://api.duckduckgo.com/", params=params, timeout=10
        )
        if response.status_code != 200:
            return f"Search failed with status code {response.status_code}"

        data = response.json()
        results = []
        if data.get("AbstractText"):
            results.append(f"Abstract: {data['AbstractText']}")
        # Only the first three related topics are reported; entries that
        # are not dicts or carry no "Text" are skipped.
        for topic in (data.get("RelatedTopics") or [])[:3]:
            if isinstance(topic, dict) and topic.get("Text"):
                results.append(f"Related: {topic['Text']}")
        return "\n".join(results) if results else f"No results for '{query}'."
    except Exception as e:
        # Tool boundary: report the failure as text rather than raising.
        return f"Search error: {str(e)}"
@tool
def search_wikipedia(query: str) -> str:
    """Search Wikipedia for factual information.

    Args:
        query: Page title to look up; spaces are converted to underscores.

    Returns:
        ``"Wikipedia: <summary>"`` with the summary limited to 500
        characters (an ellipsis is appended only when text was actually cut
        off), or a human-readable message when the page has no extract or
        the request failed.
    """
    from urllib.parse import quote

    try:
        # Percent-encode the title so characters such as "/", "?", "#" or
        # "%" are treated as part of the title instead of URL syntax.
        title = quote(query.replace(" ", "_"), safe="")
        search_url = "https://en.wikipedia.org/api/rest_v1/page/summary/" + title
        response = requests.get(search_url, timeout=10)
        if response.status_code != 200:
            return f"Wikipedia search failed for '{query}'"

        extract = response.json().get("extract", "")
        if not extract:
            return f"No extract for '{query}'."

        summary = extract[:500]
        if len(extract) > 500:
            # Signal truncation only when something was really dropped.
            summary += "..."
        return f"Wikipedia: {summary}"
    except Exception as e:
        # Tool boundary: report the failure as text rather than raising.
        return f"Wikipedia search error: {str(e)}"
@tool
def execute_python(code: str) -> str:
    """Execute Python code and return the result.

    The snippet runs with a small whitelist of builtins and with the
    ``math``, ``json``, ``datetime`` and ``random`` modules preloaded as
    globals. ``import`` statements for those four modules are accepted as
    well; any other import raises ``ImportError`` inside the snippet.

    Args:
        code: Python source to execute.

    Returns:
        Everything the snippet printed to stdout, a fixed success message
        when it printed nothing, or ``"Python execution error: ..."`` when
        it raised.
    """
    import contextlib
    import importlib
    import io

    # NOTE(security): exec() with a trimmed __builtins__ is NOT a sandbox;
    # it can be escaped through object introspection. This tool executes
    # model-generated code and should only run in an isolated environment.
    try:
        allowed_modules = {
            name: importlib.import_module(name)
            for name in ("math", "json", "datetime", "random")
        }

        def _restricted_import(name, _globals=None, _locals=None, fromlist=(), level=0):
            # Hand back only modules the snippet can already reach through
            # its globals, so "import math" works without widening access.
            if level == 0 and name in allowed_modules:
                return allowed_modules[name]
            raise ImportError(f"import of '{name}' is not allowed")

        safe_globals = {
            '__builtins__': {
                'print': print, 'len': len, 'str': str, 'int': int, 'float': float,
                'bool': bool, 'list': list, 'dict': dict, 'tuple': tuple, 'set': set,
                'range': range, 'sum': sum, 'max': max, 'min': min, 'abs': abs,
                'round': round, 'sorted': sorted, 'enumerate': enumerate, 'zip': zip,
                '__import__': _restricted_import,
            },
            **allowed_modules,
        }

        captured = io.StringIO()
        # redirect_stdout restores sys.stdout even when the snippet raises.
        with contextlib.redirect_stdout(captured):
            exec(code, safe_globals)
        output = captured.getvalue()
        return output if output else "Code executed successfully (no output)"
    except Exception as e:
        # Tool boundary: report the failure as text rather than raising.
        return f"Python execution error: {str(e)}"
@tool
def read_excel_file(file_path: str, sheet_name: Optional[str] = None) -> str:
    """Read an Excel file and return its contents as a formatted string.

    Args:
        file_path: Path of the workbook to read.
        sheet_name: Sheet to load. A string of digits is used as a
            zero-based sheet index, any other non-blank string as a sheet
            name; ``None`` or a blank string selects the first sheet.

    Returns:
        A header with the row/column counts followed by the table. Sheets
        with more than 20 rows are abbreviated to their first and last 10
        rows. On failure an ``"Error ..."`` message is returned instead.
    """
    try:
        file_path_obj = Path(file_path)
        if not file_path_obj.exists():
            return f"Error: File not found at {file_path}"

        # Blank input is treated like "not provided" instead of being
        # looked up as a literal (and non-existent) sheet name.
        if sheet_name is None or not sheet_name.strip():
            sheet = 0
        elif sheet_name.strip().isdigit():
            sheet = int(sheet_name.strip())
        else:
            sheet = sheet_name

        df = pd.read_excel(file_path_obj, sheet_name=sheet)
        total_rows = len(df)
        result = f"Excel file with {total_rows} rows and {len(df.columns)} columns:\n\n"
        if total_rows > 20:
            # Keep the output bounded: show only both ends of the table.
            result += "First 10 rows:\n" + df.head(10).to_string(index=False)
            result += f"\n\n... ({total_rows - 20} rows omitted) ...\n\n"
            result += "Last 10 rows:\n" + df.tail(10).to_string(index=False)
        else:
            result += df.to_string(index=False)
        return result
    except Exception as e:
        # Tool boundary: report the failure as text rather than raising.
        return f"Error reading Excel file: {str(e)}"
@tool
def read_text_file(file_path: str) -> str:
    """Read a text file and return its contents.

    The file is decoded with the first encoding that succeeds: UTF-16 is
    attempted only when the file starts with a UTF-16 byte-order mark, then
    UTF-8, cp1252 and finally ISO-8859-1.

    Args:
        file_path: Path of the file to read.

    Returns:
        ``"File content (<encoding> encoding):"`` followed by the text, or
        an ``"Error ..."`` message when the file is missing or unreadable.
    """
    try:
        file_path_obj = Path(file_path)
        if not file_path_obj.exists():
            return f"Error: File not found at {file_path}"

        with open(file_path_obj, 'rb') as raw:
            leading_bytes = raw.read(2)

        # Without a BOM the utf-16 codec accepts most byte sequences and
        # yields garbage, so it is only attempted when a BOM is present.
        # cp1252 precedes iso-8859-1 because the latter never fails and
        # would otherwise make cp1252 unreachable.
        encodings = ['utf-8', 'cp1252', 'iso-8859-1']
        if leading_bytes in (b'\xff\xfe', b'\xfe\xff'):
            encodings.insert(0, 'utf-16')

        for encoding in encodings:
            try:
                with open(file_path_obj, 'r', encoding=encoding) as f:
                    return f"File content ({encoding} encoding):\n\n{f.read()}"
            except UnicodeError:
                continue
        return "Error: Could not decode file with any standard encoding"
    except Exception as e:
        # Tool boundary: report the failure as text rather than raising.
        return f"Error reading file: {str(e)}"
# ------------------ GAIA AGENT ------------------
class GaiaAgent:
    """LangGraph-based agent for GAIA tasks using OpenRouter DeepSeek.

    Instances are callable: ``agent(task_id, question)`` runs the ReAct
    agent on the question and returns a cleaned-up answer string.
    """

    # Lead-ins removed from the start of an answer by _clean_answer.
    # Matching is case-insensitive and stops at the first hit.
    _ANSWER_PREFIXES = (
        "The answer is", "Answer:", "Result:", "Solution:",
        "Based on", "Therefore", "In conclusion", "So the answer is",
    )

    # Names create_react_agent has used for the system prompt, newest
    # first, so a non-deprecated name wins when several are accepted.
    _PROMPT_KWARGS = ("prompt", "state_modifier", "messages_modifier")

    def __init__(self):
        """Build the LLM, register the tools and create the ReAct agent."""
        print("Initializing GaiaAgent with LangGraph and OpenRouter DeepSeek...")
        self.llm = OpenRouterLLM(
            model="deepseek/deepseek-v3.1-terminus",
            temperature=0.1,
            max_tokens=2000,
        )
        self.tools = [search_web, search_wikipedia, execute_python, read_excel_file, read_text_file]
        system_prompt = self._get_system_prompt()

        # Pick whichever prompt keyword the installed LangGraph accepts.
        accepted = inspect.signature(create_react_agent).parameters
        prompt_kwarg = next(
            (name for name in self._PROMPT_KWARGS if name in accepted), None
        )
        kwargs = {}
        if prompt_kwarg is not None:
            kwargs[prompt_kwarg] = system_prompt
        else:
            # Make the otherwise silent loss of the system prompt visible.
            print("Warning: create_react_agent accepts no known prompt argument; "
                  "running without a system prompt.")

        self.agent = create_react_agent(self.llm, self.tools, **kwargs)
        print("GaiaAgent initialized successfully!")

    def _get_system_prompt(self) -> str:
        """Return the system prompt handed to the ReAct agent."""
        # TODO(review): the text below is still a placeholder and is sent
        # to the model verbatim; replace it with the real system prompt.
        return """You are an advanced AI agent designed to answer complex questions...
(keep your original system prompt here)"""

    def __call__(self, task_id: str, question: str) -> str:
        """Answer ``question`` and return the cleaned final answer.

        Args:
            task_id: Identifier used only in the progress log line.
            question: The question text given to the agent.

        Returns:
            The cleaned answer, or ``"Agent error: ..."`` if anything
            raised while running the agent.
        """
        try:
            print(f"Processing task {task_id}: {question[:100]}...")
            messages = [HumanMessage(content=question)]
            result = self.agent.invoke({"messages": messages})
            final_message = result["messages"][-1]
            # Message content may be a list of content blocks instead of a
            # plain string; flatten it before string processing.
            answer = self._content_to_text(final_message.content)
            return self._clean_answer(answer)
        except Exception as e:
            return f"Agent error: {e}"

    @staticmethod
    def _content_to_text(content) -> str:
        """Flatten message content (str, list of blocks, or None) to text."""
        if content is None:
            return ""
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            pieces = []
            for block in content:
                if isinstance(block, str):
                    pieces.append(block)
                elif isinstance(block, dict) and block.get("type") == "text":
                    pieces.append(str(block.get("text", "")))
                # Non-text blocks carry no answer text and are skipped.
            return "".join(pieces)
        return str(content)

    def _clean_answer(self, answer: str) -> str:
        """Reduce a raw model reply to the bare answer.

        Keeps only the text after the last "final answer:" marker, removes
        one known lead-in phrase (plus a colon directly after it), and for
        answers of at most three words trims surrounding quotes and periods.
        """
        answer = answer.strip()
        if "final answer:" in answer.lower():
            parts = re.split(r'final answer:', answer, flags=re.IGNORECASE)
            if len(parts) > 1:
                answer = parts[-1].strip()

        lowered = answer.lower()
        for prefix in self._ANSWER_PREFIXES:
            if lowered.startswith(prefix.lower()):
                answer = answer[len(prefix):].strip()
                if answer.startswith(':'):
                    answer = answer[1:].strip()
                break

        if len(answer.split()) <= 3:
            answer = answer.strip('"\'.')
        return answer
# ------------------ ENTRYPOINT ------------------
import gradio as gr

# Created once at import time; every UI request goes through this instance.
agent = GaiaAgent()


def run_agent(prompt: str) -> str:
    """Answer one prompt from the Gradio UI.

    Each request is submitted to the shared agent under the fixed task id
    ``"gaia_task"``.
    """
    answer = agent("gaia_task", prompt)
    return answer


demo = gr.Interface(
    fn=run_agent,
    inputs="text",
    outputs="text",
    title="GAIA Agent",
)

if __name__ == "__main__":
    # Listen on all interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)