Spaces:
Runtime error
Runtime error
# Standard library
import os
import re
import io
import contextlib
import requests  # third-party HTTP client (used by fetch_webpage)
import base64
import zipfile
import json
from typing import TypedDict, Annotated

# LangChain / LangGraph stack
from langgraph.graph import StateGraph, START
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
from langchain_core.messages import AnyMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from pydantic import BaseModel, Field
from langchain_huggingface import HuggingFaceEndpoint, ChatHuggingFace
from dotenv import load_dotenv

# Pull API keys (HF_TOKEN, OPENAI_API_KEY, TAVILY_API_KEY, ...) from a .env file.
load_dotenv()
# System prompt injected as the first message of every run; defines the tool
# workflow and the strict "FINAL ANSWER: ..." output contract the GAIA
# benchmark scorer expects.
SYSTEM_PROMPT = """You are a research agent solving questions from the GAIA benchmark.
WORKFLOW:
1. Analyze the question carefully before acting.
2. If the question contains reversed text, reverse it back first using python_executor.
3. If the question references a file (Excel, CSV, Python, etc.), use read_file to read it.
4. If the question references an image file, use analyze_image to look at it.
5. If the question references an audio/mp3 file, use transcribe_audio to get the text.
6. If the question requires math or logic, use python_executor.
7. If the question asks about a YouTube video, first try youtube_transcript. If that fails, use web_search.
8. Use web_search or wikipedia_search for factual questions.
9. If you find a URL that might have the answer, use fetch_webpage to read it.
RULES:
- NEVER call the same tool with the same query twice.
- If a tool fails, try a DIFFERENT approach.
- For math/logic problems with tables, use python_executor to check ALL pairs systematically.
- For math — ALWAYS use python_executor, never calculate in your head.
- Keep search queries short: 2-5 words.
- NEVER say "I cannot access" or "I'm unable to" — always try tools first, then give your best guess.
- For botany questions: bell peppers, corn, green beans, zucchini, tomatoes, pumpkins are botanical FRUITS, not vegetables.
CRITICAL — ANSWER FORMAT:
Your response must end with exactly:
FINAL ANSWER: [your answer]
The answer must be:
- CONCISE: a number, name, date, or short phrase
- EXACT: no extra words like "The answer is..."
- If a number: just the number
- If a name: just the name
- If a list: comma-separated values
"""

# Hard cap on tool invocations per question; enforced in assistant().
MAX_TOOL_CALLS = 10
# NOTE(review): RECURSION_LIMIT is not referenced anywhere in this file —
# presumably meant to be passed as config={"recursion_limit": ...} at invoke
# time; confirm against the caller.
RECURSION_LIMIT = 40
def web_search(query: str) -> str:
    """Search the web for current events, facts, people, etc.

    Args:
        query: search query string (keep it short and specific)
    """
    try:
        from langchain_tavily import TavilySearch
        raw = TavilySearch(max_results=3).invoke(query)
        # TavilySearch may return either a list of result dicts or a plain string.
        if not isinstance(raw, list):
            return str(raw)[:5000]
        chunks = [
            f"Source: {item.get('url', '')}\n{item.get('content', '')}"
            for item in raw
        ]
        return "\n\n---\n\n".join(chunks)[:5000]
    except Exception as e:
        return f"Search failed: {e}"
def wikipedia_search(query: str) -> str:
    """Search Wikipedia for factual information about people, places, history, science.

    Args:
        query: topic to search on Wikipedia
    """
    try:
        from langchain_community.utilities import WikipediaAPIWrapper
        # Two articles, capped at 4000 chars each, keeps the context small.
        wrapper = WikipediaAPIWrapper(top_k_results=2, doc_content_chars_max=4000)
        return wrapper.run(query)
    except Exception as e:
        return f"Wikipedia search failed: {e}"
def arxiv_search(query: str) -> str:
    """Search academic papers on ArXiv for scientific/research questions.

    Args:
        query: search query for academic papers
    """
    try:
        from langchain_community.document_loaders import ArxivLoader
        entries = []
        for doc in ArxivLoader(query=query, load_max_docs=2).load():
            heading = doc.metadata.get("Title", "No title")
            entries.append(f"**{heading}**\n{doc.page_content[:1500]}")
        if not entries:
            return "No results found."
        return "\n\n---\n\n".join(entries)
    except Exception as e:
        return f"ArXiv search failed: {e}"
def fetch_webpage(url: str) -> str:
    """Fetch and read content from a URL/webpage.

    Args:
        url: full URL to fetch
    """
    try:
        response = requests.get(
            url, headers={"User-Agent": "Mozilla/5.0"}, timeout=15
        )
        response.raise_for_status()

        from bs4 import BeautifulSoup
        page = BeautifulSoup(response.text, "html.parser")
        # Strip boilerplate elements that only add noise to the extracted text.
        for element in page(["script", "style", "nav", "footer", "header"]):
            element.decompose()
        return page.get_text(separator="\n", strip=True)[:8000]
    except Exception as e:
        return f"Failed to fetch URL: {e}"
# Shared globals for python_executor: grants executed code full builtins access
# and persists variables between calls (deliberate for a multi-step agent).
python_state = {
    "__builtins__": __builtins__,
    "import_module": __import__
}

def python_executor(code: str) -> str:
    """
    Execute Python code with persistent state across calls.
    Use print() to see results. All variables are saved for the next call.
    """
    # Strip Markdown code fences the model may have wrapped the code in.
    # Handles ```python, ```py, and bare ``` fences (the old pattern only
    # matched ```python exactly).
    code = re.sub(r'^```[A-Za-z]*\n?|```$', '', code, flags=re.MULTILINE)
    output = io.StringIO()
    try:
        with contextlib.redirect_stdout(output):
            # Reuse the module-level python_state so variables persist.
            # NOTE: exec of model-generated code is intentionally unsandboxed.
            exec(code, python_state)
        result = output.getvalue().strip()
        if not result:
            return "Code executed successfully, but produced no output. Remember to use print()."
        return result
    except Exception as e:
        # Include the exception class name so the agent can diagnose failures
        # (e.g. "ZeroDivisionError: division by zero" vs just the message).
        return f"Python Error: {type(e).__name__}: {e}"
def read_file(file_path: str) -> str:
    """
    Read content of files: TXT, CSV, JSON, PY, XLSX, PDF, or ZIP.
    For ZIP: lists files inside. For PDF: extracts text.
    For Tables: returns a summary and first 15 rows.

    Returns the extracted text, or an error string on any failure.
    """
    if not os.path.exists(file_path):
        return f"Error: File '{file_path}' not found."
    ext = file_path.lower().split('.')[-1]
    try:
        # 1. Tabular data (Excel, CSV)
        if ext in ['xlsx', 'xls', 'csv']:
            import pandas as pd
            df = pd.read_excel(file_path) if ext.startswith('xls') else pd.read_csv(file_path)
            summary = f"Rows: {len(df)}, Columns: {df.columns.tolist()}\n"
            return summary + df.head(15).to_string()
        # 2. PDF (via PyMuPDF / fitz)
        elif ext == 'pdf':
            import fitz
            doc = fitz.open(file_path)
            try:
                text = []
                # Cap at the first 10 pages to keep the output bounded;
                # explicit indexing avoids relying on Document slicing support.
                for i in range(min(10, doc.page_count)):
                    text.append(f"--- Page {i+1} ---\n{doc[i].get_text()}")
                return "\n".join(text)[:15000]
            finally:
                doc.close()  # don't leak the file handle
        # 3. ZIP archives: list contents only; extraction is left to the agent.
        elif ext == 'zip':
            with zipfile.ZipFile(file_path, 'r') as z:
                files = z.namelist()
            return f"ZIP Archive contains: {files}. Use python_executor to extract if needed."
        # 4. JSON: pretty-print, truncated to 10k characters.
        elif ext == 'json':
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return json.dumps(data, indent=2)[:10000]
        # 5. Plain-text fallback for everything else (txt, py, md, ...).
        else:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                return f.read(15000)  # first 15k characters only
    except Exception as e:
        return f"Error processing file {file_path}: {str(e)}"
def analyze_image(image_path: str, question: str) -> str:
    """Analyze an image using GPT-4o vision. Use for photos, charts, chess positions, diagrams.

    Args:
        image_path: path to the image file (png, jpg, etc.)
        question: what you want to know about the image
    """
    try:
        with open(image_path, "rb") as f:
            encoded = base64.b64encode(f.read()).decode("utf-8")

        # Pick a mime type from the file extension; default to PNG.
        suffix = image_path.lower().split(".")[-1]
        mime_type = {
            "png": "image/png",
            "jpg": "image/jpeg",
            "jpeg": "image/jpeg",
            "gif": "image/gif",
            "webp": "image/webp",
        }.get(suffix, "image/png")

        from openai import OpenAI
        # Single user turn: the question plus the image as an inline data URL.
        content = [
            {"type": "text", "text": question},
            {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{encoded}"}},
        ]
        reply = OpenAI().chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": content}],
            max_tokens=1000,
        )
        return reply.choices[0].message.content
    except Exception as e:
        return f"Image analysis failed: {e}"
def transcribe_audio(file_path: str) -> str:
    """Transcribe an audio file (mp3, wav, m4a) to text using OpenAI Whisper.

    Args:
        file_path: path to the audio file
    """
    try:
        from openai import OpenAI
        client = OpenAI()
        with open(file_path, "rb") as audio:
            result = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio,
            )
        # Cap transcript length so the tool output stays manageable.
        return result.text[:8000]
    except Exception as e:
        return f"Transcription failed: {e}"
# Earlier OpenAI-based model setup, kept for reference:
# llm_fast = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # main agent
# llm_strong = ChatOpenAI(model="gpt-4o", temperature=0)

# Base HuggingFace inference endpoint. Raises KeyError at import time if
# HF_TOKEN is not set in the environment / .env file.
llm = HuggingFaceEndpoint(
    repo_id="Qwen/Qwen2.5-72B-Instruct",
    huggingfacehub_api_token=os.environ["HF_TOKEN"]
)
# Chat-style wrapper over the endpoint; this is the agent's main model.
llm_fast = ChatHuggingFace(llm=llm, verbose=True)
# Tools exposed to the agent. These are plain functions (no @tool decorator
# visible here); bind_tools derives each tool's schema from its signature
# and docstring.
tools = [
    web_search,
    wikipedia_search,
    python_executor,
    arxiv_search,
    read_file,
    fetch_webpage,
    analyze_image,
    transcribe_audio,
]

# Model handle that can emit tool calls for the tools above.
llm_with_tools = llm_fast.bind_tools(tools)
class AgentState(TypedDict):
    """Graph state: the running message history.

    The add_messages reducer appends new messages instead of replacing the list.
    """
    messages: Annotated[list[AnyMessage], add_messages]
def assistant(state: AgentState):
    """LLM node: answer or request tools; force a final answer once the
    tool-call budget (MAX_TOOL_CALLS) is spent."""
    executed_tools = sum(msg.type == "tool" for msg in state["messages"])
    if executed_tools >= MAX_TOOL_CALLS:
        # Budget exhausted: invoke the plain model (no tools bound) with an
        # extra system nudge so it must produce the final answer now.
        force = SystemMessage(
            content="Provide your FINAL ANSWER now. Format: FINAL ANSWER: [answer]."
        )
        reply = llm_fast.invoke(state["messages"] + [force])
    else:
        reply = llm_with_tools.invoke(state["messages"])
    return {"messages": [reply]}
class FinalAnswer(BaseModel):
    """Structured-output schema for extracting the bare final answer."""
    answer: str = Field(description="The exact final answer — concise, no extra words")

# Structured-output extractor built on the main model.
# NOTE(review): answer_extractor is not referenced anywhere in this file —
# confirm it is used by a caller elsewhere.
answer_extractor = llm_fast.with_structured_output(FinalAnswer)
def agent_func():
    """Build and compile the LangGraph agent: an assistant node that loops
    through a tool-execution node until no more tool calls are requested."""
    graph = StateGraph(AgentState)

    # Nodes: the LLM step and the tool executor (tool errors become messages).
    graph.add_node("assistant", assistant)
    graph.add_node("tools", ToolNode(tools, handle_tool_errors=True))

    # Edges: start at the assistant; tools_condition routes to "tools" when
    # the latest message carries tool calls, otherwise ends the run.
    graph.add_edge(START, "assistant")
    graph.add_conditional_edges("assistant", tools_condition)
    graph.add_edge("tools", "assistant")

    return graph.compile()