import os import re import io import contextlib import requests import base64 import zipfile import json from typing import TypedDict, Annotated from langgraph.graph import StateGraph, START from langgraph.graph.message import add_messages from langgraph.prebuilt import ToolNode, tools_condition from langchain_openai import ChatOpenAI from langchain_core.messages import AnyMessage, HumanMessage, SystemMessage from langchain_core.tools import tool from pydantic import BaseModel, Field from langchain_huggingface import HuggingFaceEndpoint, ChatHuggingFace from dotenv import load_dotenv load_dotenv() SYSTEM_PROMPT = """You are a research agent solving questions from the GAIA benchmark. WORKFLOW: 1. Analyze the question carefully before acting. 2. If the question contains reversed text, reverse it back first using python_executor. 3. If the question references a file (Excel, CSV, Python, etc.), use read_file to read it. 4. If the question references an image file, use analyze_image to look at it. 5. If the question references an audio/mp3 file, use transcribe_audio to get the text. 6. If the question requires math or logic, use python_executor. 7. If the question asks about a YouTube video, first try youtube_transcript. If that fails, use web_search. 8. Use web_search or wikipedia_search for factual questions. 9. If you find a URL that might have the answer, use fetch_webpage to read it. RULES: - NEVER call the same tool with the same query twice. - If a tool fails, try a DIFFERENT approach. - For math/logic problems with tables, use python_executor to check ALL pairs systematically. - For math — ALWAYS use python_executor, never calculate in your head. - Keep search queries short: 2-5 words. - NEVER say "I cannot access" or "I'm unable to" — always try tools first, then give your best guess. - For botany questions: bell peppers, corn, green beans, zucchini, tomatoes, pumpkins are botanical FRUITS, not vegetables. CRITICAL — ANSWER FORMAT: Your response must end with exactly: FINAL ANSWER: [your answer] The answer must be: - CONCISE: a number, name, date, or short phrase - EXACT: no extra words like "The answer is..." - If a number: just the number - If a name: just the name - If a list: comma-separated values """ MAX_TOOL_CALLS = 10 RECURSION_LIMIT = 40 @tool def web_search(query: str) -> str: """Search the web for current events, facts, people, etc. Args: query: search query string (keep it short and specific) """ try: from langchain_tavily import TavilySearch search = TavilySearch(max_results=3) results = search.invoke(query) # TavilySearch возвращает list of dicts или string if isinstance(results, list): formatted = [] for r in results: url = r.get("url", "") content = r.get("content", "") formatted.append(f"Source: {url}\n{content}") return "\n\n---\n\n".join(formatted)[:5000] return str(results)[:5000] except Exception as e: return f"Search failed: {e}" @tool def wikipedia_search(query: str) -> str: """Search Wikipedia for factual information about people, places, history, science. Args: query: topic to search on Wikipedia """ try: from langchain_community.utilities import WikipediaAPIWrapper wiki = WikipediaAPIWrapper(top_k_results=2, doc_content_chars_max=4000) return wiki.run(query) except Exception as e: return f"Wikipedia search failed: {e}" @tool def arxiv_search(query: str) -> str: """Search academic papers on ArXiv for scientific/research questions. Args: query: search query for academic papers """ try: from langchain_community.document_loaders import ArxivLoader docs = ArxivLoader(query=query, load_max_docs=2).load() results = [] for doc in docs: title = doc.metadata.get("Title", "No title") results.append(f"**{title}**\n{doc.page_content[:1500]}") return "\n\n---\n\n".join(results) if results else "No results found." except Exception as e: return f"ArXiv search failed: {e}" @tool def fetch_webpage(url: str) -> str: """Fetch and read content from a URL/webpage. Args: url: full URL to fetch """ try: headers = {"User-Agent": "Mozilla/5.0"} resp = requests.get(url, headers=headers, timeout=15) resp.raise_for_status() from bs4 import BeautifulSoup soup = BeautifulSoup(resp.text, "html.parser") for tag in soup(["script", "style", "nav", "footer", "header"]): tag.decompose() text = soup.get_text(separator="\n", strip=True) return text[:8000] except Exception as e: return f"Failed to fetch URL: {e}" python_state = { "__builtins__": __builtins__, "import_module": __import__ } @tool def python_executor(code: str) -> str: """ Execute Python code with persistent state across calls. Use print() to see results. All variables are saved for the next call. """ # Очистка кода от Markdown-оберток, если модель их добавила code = re.sub(r'^```python\n|```$', '', code, flags=re.MULTILINE) output = io.StringIO() try: with contextlib.redirect_stdout(output): # Используем один и тот же словарь python_state exec(code, python_state) result = output.getvalue().strip() if not result: return "Code executed successfully, but produced no output. Remember to use print()." return result except Exception as e: return f"Python Error: {str(e)}" @tool def read_file(file_path: str) -> str: """ Read content of files: TXT, CSV, JSON, PY, XLSX, PDF, or ZIP. For ZIP: lists files inside. For PDF: extracts text. For Tables: returns a summary and first 10 rows. """ if not os.path.exists(file_path): return f"Error: File '{file_path}' not found." ext = file_path.lower().split('.')[-1] try: # 1. Таблицы (Excel, CSV) if ext in ['xlsx', 'xls', 'csv']: import pandas as pd df = pd.read_excel(file_path) if ext.startswith('xls') else pd.read_csv(file_path) summary = f"Rows: {len(df)}, Columns: {df.columns.tolist()}\n" return summary + df.head(15).to_string() # 2. PDF (через PyMuPDF / fitz) elif ext == 'pdf': import fitz doc = fitz.open(file_path) text = [] for i, page in enumerate(doc[:10]): # Ограничимся 10 страницами text.append(f"--- Page {i+1} ---\n{page.get_text()}") return "\n".join(text)[:15000] # 3. ZIP-архивы elif ext == 'zip': with zipfile.ZipFile(file_path, 'r') as z: files = z.namelist() return f"ZIP Archive contains: {files}. Use python_executor to extract if needed." # 4. JSON elif ext == 'json': with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) return json.dumps(data, indent=2)[:10000] # 5. Обычный текст else: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: return f.read(15000) # Читаем первые 15к символов except Exception as e: return f"Error processing file {file_path}: {str(e)}" @tool def analyze_image(image_path: str, question: str) -> str: """Analyze an image using GPT-4o vision. Use for photos, charts, chess positions, diagrams. Args: image_path: path to the image file (png, jpg, etc.) question: what you want to know about the image """ try: with open(image_path, "rb") as f: image_data = base64.b64encode(f.read()).decode("utf-8") # Determine mime type ext = image_path.lower().split(".")[-1] mime_map = {"png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg", "gif": "image/gif", "webp": "image/webp"} mime_type = mime_map.get(ext, "image/png") from openai import OpenAI client = OpenAI() response = client.chat.completions.create( model="gpt-4o", messages=[ { "role": "user", "content": [ {"type": "text", "text": question}, {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{image_data}"}}, ], } ], max_tokens=1000, ) return response.choices[0].message.content except Exception as e: return f"Image analysis failed: {e}" @tool def transcribe_audio(file_path: str) -> str: """Transcribe an audio file (mp3, wav, m4a) to text using OpenAI Whisper. Args: file_path: path to the audio file """ try: from openai import OpenAI client = OpenAI() with open(file_path, "rb") as f: transcription = client.audio.transcriptions.create( model="whisper-1", file=f, ) return transcription.text[:8000] except Exception as e: return f"Transcription failed: {e}" # llm_fast = #ChatOpenAI(model="gpt-4o-mini", temperature=0) # основной агент # llm_strong = ChatOpenAI(model="gpt-4o", temperature=0) llm = HuggingFaceEndpoint( repo_id="Qwen/Qwen2.5-72B-Instruct", huggingfacehub_api_token= os.environ["HF_TOKEN"] ) llm_fast = ChatHuggingFace(llm=llm, verbose=True) tools = [ web_search, wikipedia_search, python_executor, arxiv_search, read_file, fetch_webpage, analyze_image, transcribe_audio, ] llm_with_tools = llm_fast.bind_tools(tools) class AgentState(TypedDict): messages: Annotated[list[AnyMessage], add_messages] def assistant(state: AgentState): tool_count = sum(1 for msg in state["messages"] if msg.type == "tool") if tool_count >= MAX_TOOL_CALLS: force = SystemMessage( content="Provide your FINAL ANSWER now. Format: FINAL ANSWER: [answer]." ) return {"messages": [llm_fast.invoke(state["messages"] + [force])]} return {"messages": [llm_with_tools.invoke(state["messages"])]} class FinalAnswer(BaseModel): answer: str = Field(description="The exact final answer — concise, no extra words") answer_extractor = llm_fast.with_structured_output(FinalAnswer) def agent_func(): builder = StateGraph(AgentState) # Define nodes: these do the work builder.add_node("assistant", assistant) builder.add_node("tools", ToolNode(tools, handle_tool_errors=True)) # Define edges: these determine how the control flow moves builder.add_edge(START, "assistant") builder.add_conditional_edges( "assistant", # If the latest message requires a tool, route to tools # Otherwise, provide a direct response tools_condition, ) builder.add_edge("tools", "assistant") alfred = builder.compile() return alfred