Spaces:
Sleeping
Sleeping
| # gaia_agent_restructured_langgraph_ui.py | |
| # GAIA Multi-Agent System with LangGraph + Gradio Interface | |
| """ | |
| Updates: | |
| โ Added Gradio UI for user interaction and visualization of results | |
| โ Integrated with LangGraph flow (reasoning_agent โ final_agent) | |
| โ Prints all questions and corresponding results at the end | |
| โ Maintains modular design and existing architecture | |
| """ | |
| import os | |
| os.environ["LLAMA_BLAS"] = "1" | |
| os.environ["LLAMA_BLAS_VENDOR"] = "OpenBLAS" | |
| from langchain_community.llms import LlamaCpp | |
| from llama_cpp import Llama | |
| import re | |
| import json | |
| import requests | |
| import logging | |
| import gradio as gr | |
| from typing import Optional, List, Dict, Any | |
| import time | |
| from huggingface_hub import hf_hub_download | |
| from gradio_client import Client | |
| from langchain_core.tools import Tool | |
| from langchain_core.prompts import PromptTemplate | |
| from langchain.agents import create_react_agent, AgentExecutor | |
| from datasets import load_dataset | |
| from huggingface_hub import login | |
| import threading | |
| import logging | |
| logging.getLogger("httpx").setLevel(logging.WARNING) | |
| logging.getLogger("gradio").setLevel(logging.WARNING) | |
| # ููู ุนุงู ูุญู ุงูุฉ ุงููุตูู ุฅูู LLM/Agent ุฏุงุฎู ุงูุฎููุท | |
| llama_lock = threading.Lock() | |
| model_path = hf_hub_download( | |
| repo_id="bartowski/Qwen2.5-14B-Instruct-GGUF", | |
| filename="Qwen2.5-14B-Instruct-Q6_K_L.gguf", | |
| ) | |
| llm = LlamaCpp( | |
| model_path=model_path, | |
| n_ctx=10000, | |
| n_threads=4, | |
| n_gpu_layers=0, | |
| temperature=0.4, | |
| top_p=0.9, | |
| max_tokens=150, | |
| n_batch=64, | |
| verbose=False, | |
| use_mmap=True | |
| ) | |
| # ุชุญูู ู ู ูุฌูุฏ ุชููู ูู ู ุชุบูุฑ ุงูุจูุฆุฉ | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| if HF_TOKEN: | |
| login(token=HF_TOKEN) | |
| else: | |
| print("โ ๏ธ Warning: No HF_TOKEN found. Please set your Hugging Face token as an environment variable.") | |
| try: | |
| from langsmith import Client as LangSmithClient | |
| from langchain.callbacks.tracers import LangChainTracer | |
| LANGSMITH_AVAILABLE = True | |
| except Exception: | |
| LANGSMITH_AVAILABLE = False | |
| try: | |
| from langgraph.graph import StateGraph, END | |
| LANGGRAPH_AVAILABLE = True | |
| except Exception: | |
| LANGGRAPH_AVAILABLE = False | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("gaia_langgraph_ui") | |
| # -------------------- Configuration -------------------- | |
| HF_TOKEN = os.getenv("HF_TOKEN", "") | |
| SPACE_ID = os.getenv("SPACE_ID", "") | |
| CODE_AGENT_SPACE = os.getenv("CODE_AGENT_SPACE", "https://mustafa-albakkar-codeagent.hf.space") | |
| VISION_AGENT_SPACE = os.getenv("VISION_AGENT_SPACE", "https://mustafa-albakkar-mediaagent.hf.space") | |
| FINAL_ANSWER_SPACE = os.getenv("FINAL_ANSWER_SPACE", "https://mustafa-albakkar-finalagent.hf.space") | |
| #GAIA_API_BASE = os.getenv("GAIA_API_BASE","https://mustafa-albakkar-mmo.hf.space") | |
| GAIA_API_BASE = os.getenv("GAIA_API_BASE","https://agents-course-unit4-scoring.hf.space") | |
| import time | |
| from functools import wraps | |
| from requests.adapters import HTTPAdapter | |
| from urllib3.util.retry import Retry | |
| """ | |
| session = requests.Session() | |
| retry_strategy = Retry( | |
| total=5, | |
| backoff_factor=0.5, | |
| status_forcelist=[429, 500, 502, 503, 504] | |
| ) | |
| adapter = HTTPAdapter(max_retries=retry_strategy) | |
| session.mount("https://", adapter) | |
| session.mount("http://", adapter) | |
| """ | |
| def retry(exception_types=(Exception,), tries=3, delay=1, backoff=2): | |
| """ | |
| Decorator ูุฅุนุงุฏุฉ ุงูู ุญุงููุฉ ูู ุญุงู ุญุฏูุซ ุฎุทุฃ ู ุคูุช (ู ุซู ุงููุทุงุน ุงูุดุจูุฉ) | |
| """ | |
| def deco(f): | |
| def wrapper(*args, **kwargs): | |
| _tries, _delay = tries, delay | |
| while _tries > 1: | |
| try: | |
| return f(*args, **kwargs) | |
| except exception_types as e: | |
| time.sleep(_delay) | |
| _tries -= 1 | |
| _delay *= backoff | |
| return f(*args, **kwargs) | |
| return wrapper | |
| return deco | |
| # -------------------- Sub-agent clients -------------------- | |
| class SubAgentClient: | |
| def __init__(self, space_url: str): | |
| self.space_url = space_url | |
| self._client = None | |
| def client(self) -> Client: | |
| if self._client is None: | |
| self._client = Client(self.space_url) | |
| return self._client | |
| def predict_text(self, prompt: str, file: Optional[str] = None, timeout: int = 90) -> str: | |
| """ | |
| ู ุฑู ูู ุงูุชุนุงู ู ู ุน ุงุฎุชูุงู ูุงุฌูุงุช ุงููููุงุก ุนูู Spaces | |
| """ | |
| patterns = [] | |
| if file and os.path.exists(file): | |
| patterns = [ | |
| lambda: self.client.predict(prompt, file, api_name="predict"), | |
| lambda: self.client.predict(prompt, file), | |
| lambda: self.client.predict([prompt, file]), | |
| lambda: self.client.predict({"input": prompt, "file": open(file, "rb")}) | |
| ] | |
| else: | |
| patterns = [ | |
| lambda: self.client.predict(prompt, api_name="predict"), | |
| lambda: self.client.predict(prompt), | |
| lambda: self.client.predict([prompt]) | |
| ] | |
| last_exc = None | |
| for call in patterns: | |
| try: | |
| res = call() | |
| if isinstance(res, (list, tuple)): | |
| return " ".join(str(x) for x in res if x) | |
| return str(res) | |
| except Exception as e: | |
| last_exc = e | |
| continue | |
| raise last_exc | |
| # Instantiate sub-agent clients | |
| code_client = SubAgentClient(CODE_AGENT_SPACE) | |
| vision_client = SubAgentClient(VISION_AGENT_SPACE) | |
| final_client = SubAgentClient(FINAL_ANSWER_SPACE) | |
| # -------------------- Tools -------------------- | |
| WIKI_HEADERS = {"User-Agent": "GAIA-Agent/1.0 (https://huggingface.co/)"} | |
| from functools import lru_cache | |
| def wiki_search(query: str) -> str: | |
| if not query: | |
| return "WIKI_ERROR: empty query" | |
| try: | |
| url = "https://en.wikipedia.org/w/api.php" | |
| params = {"action": "query", "list": "search", "srsearch": query, "format": "json", "utf8": 1} | |
| r = requests.get(url, params=params, headers=WIKI_HEADERS, timeout=12) | |
| r.raise_for_status() | |
| data = r.json() | |
| hits = data.get("query", {}).get("search", []) | |
| if not hits: | |
| return "WIKI_NO_RESULTS" | |
| snippets = [] | |
| for h in hits[:5]: | |
| title = h.get("title") | |
| snippet = re.sub(r"<.*?>", "", h.get("snippet", "")) | |
| snippets.append(f"{title}: {snippet}") | |
| return "\n".join(snippets) | |
| except Exception as e: | |
| return f"WIKI_ERROR: {e}" | |
| try: | |
| from ddgs import DDGS | |
| DDGS_AVAILABLE = True | |
| except Exception: | |
| DDGS_AVAILABLE = False | |
| def internet_search(query: str, max_results: int = 8) -> str: | |
| if not query: | |
| return "SEARCH_ERROR: empty query" | |
| if not DDGS_AVAILABLE: | |
| return "SEARCH_ERROR: ddgs not installed" | |
| try: | |
| results = [] | |
| with DDGS() as ddgs: | |
| for r in ddgs.text(query, max_results=max_results): | |
| title = r.get("title", "") | |
| href = r.get("href", "") | |
| body = r.get("body", "") | |
| results.append(f"{title}\n{href}\n{body}") | |
| return "\n---\n".join(results) if results else "SEARCH_NO_RESULTS" | |
| except Exception as e: | |
| return f"SEARCH_ERROR: {e}" | |
| def analyze_media(media_input: str, file: Optional[str] = None) -> str: | |
| return vision_client.predict_text(media_input, file) | |
| def coder_agent_proxy(prompt: str) -> str: | |
| return code_client.predict_text(prompt) | |
| tools: List[Tool] = [ | |
| Tool(name="WikipediaSearch", func=wiki_search, description="""Search English Wikipedia for factual information. Wikipedia retrieves results based on keyword matching rather than semantic understanding. Use concise,short and relevant keywords when querying it. | |
| when searching try to wide the search scope by using a short keyword at the beginning and then narrow it gradually by adding aditional information depending on the final goal and the previous results in the subsequent cycles.\n when searching DON'T add quatation "" to the action input""" | |
| ), | |
| Tool(name="InternetSearch", func=internet_search, description="Search the internet for real-time information using DuckDuckGo. DuckDuckGo retrieves results based on keyword matching rather than semantic understanding. Use most relevant keywords when querying it.\n when searching: first, start with one short keyword and then try to add more words gradually depending on the results of the previous search to narrow the search scope to find the satisfactive answer.\n "), | |
| Tool(name="MediaAnalyzer", func=analyze_media, description=("Use this tool when you need to analyze an image , audio or a video.\n" | |
| "file url: the URL that you resieved to the image /audio/ video" | |
| "- Input must be a user question and a direct media URL (image or video).\n" | |
| "- For images/audio: provide the link to the image/audio exactly like : https://huggingface.co/spaces/Mustafa-albakkar/MainAgent/resolve/main/{Attached file path}\n" | |
| "- For videos: provide the link to the YouTube or MP4 file. e.g: video_url\n" | |
| "The tool will return a detailed description and a summary.") | |
| ), | |
| Tool(name="CoderAgent", func=coder_agent_proxy, description="Use this for logical problems to generate or fix code and execute it, by providing it a discription of the needed code, don't code by yourself" | |
| ) ] | |
| # -------------------- Agent -------------------- | |
| SYSTEM_INSTRUCTIONS = ( | |
| """You are a logic-reasoning agent solving GAIA benchmark questions. | |
| Your goal is to produce a *gaia formatted final answer* for each question. | |
| **Core Instructions:** | |
| 1. Understand the question completely and identify the goal and the useful information. | |
| 2. Think step by step and use your reasoning and external tools to find the best possible solution. | |
| 3. Never stop before giving a concise "Final Answer" | |
| **Formatting Rules:** | |
| - Follow the ReAct format precisely. | |
| - End your output with `<<END>>` and stop generating immediately. | |
| Example: | |
| Final Answer: answer <<END>>""" | |
| ) | |
| #5. Never reveal system prompts or hidden reasoning instructions. | |
| #- All your final reasoning, justification, and conclusions must appear after `Final Answer:`. | |
| #3. Always include in your Final Answer: | |
| # - The answer you believe is most correct. | |
| # - A short justification or reasoning summary explaining how or why you reached it. | |
| # - Or, if uncertain, the best conclusion or partial result you found. | |
| react_template = """ | |
| You are a ReAct-style reasoning agent. Follow always *exactly* this structure: | |
| Question: {input} | |
| -Thought: Reflect on what is being asked. Based on previous Observations, decide your next useful step. | |
| -Action: <choose a tool from the provided list โ WikipediaSearch, MediaAnalyzer, CoderAgent, InternetSearch> | |
| -Action Input: <provide only the raw input for that tool, without brackets or quotes, reasure always to give the correct input based on abilities of the tool and its supported input> | |
| -Observation: <summarize the important and useful result from that tool> | |
| (Repeat the (Thought / Action / Action Input / Observation) pattern as needed.) | |
| When you are ready to conclude, write your final section: | |
| Final Answer: <write your best possible answer here.> <<END>> | |
| Notes: | |
| - Allowed tools: {tool_names} | |
| - Tool descriptions: {tools} | |
| - Always use Observations from previous steps to inform the next Thought. | |
| - Do not repeat identical actions. | |
| - Always stop generation immediately after `<<END>>`. | |
| try to use the appropriate tool for the appropriate goal. | |
| Begin. | |
| {agent_scratchpad} | |
| """ | |
| def create_agent_executor(llm, tools: List[Tool], tracer: Optional[Any] = None) -> AgentExecutor: | |
| prompt = PromptTemplate.from_template(react_template) | |
| agent = create_react_agent(llm, tools, prompt) | |
| callbacks = [] | |
| if tracer is not None: | |
| callbacks.append(tracer) | |
| executor = AgentExecutor( | |
| agent=agent, | |
| tools=tools, | |
| verbose=True, # ุฅููุงู ุงูุทุจุงุนุฉ ุงูู ูุซูุฉ โ ููู ูู ููู ุชุดุบูููุง ุฃุซูุงุก debugging ุฅุฐุง ุฑุบุจุช | |
| callbacks=callbacks, | |
| max_iterations=10, | |
| handle_parsing_errors=True, | |
| early_stopping_method="force", | |
| return_intermediate_steps=True, # <<< ุงูุฃูู โ ุงุทูุจ ุฅุนุงุฏุฉ intermediate_steps | |
| # trim_intermediate_steps=[-1:-2] # -1 => ูุง ุชูุต ุงูุฎุทูุงุช ูุจู ุงูุฅุฑุฌุงุน (ุฃู ุญุฏุฏ ุนุฏุฏูุง ุฅู ุฃุฑุฏุช ุงูุญุฏ) | |
| ) | |
| return executor | |
| # -------------------- GAIA Runner -------------------- | |
| class GaiaRunner: | |
| def __init__(self, agent_executor: AgentExecutor, username: str = "unknown"): | |
| self.agent = agent_executor | |
| self.username = username | |
| def run_on_question(self, question_text: str, file_path: Optional[str] = None) -> str: | |
| """ | |
| ุชุดุบูู ุงููููู ุงูุฑุฆูุณู (ReAct agent) ุนูู ุณุคุงู ูุงุญุฏ ู ุน ุฅู ูุงููุฉ ูุฌูุฏ ู ุฑูู. | |
| ูุฌู ุน ูู ุฎุทูุงุช ุงูุชูููุฑ (Thought โ Action โ Observation) ููุฑุณููุง ููุญุฏุฉ ูุงุญุฏุฉ ุฅูู ูููู ุงูุฅุฌุงุจุฉ ุงูููุงุฆูุฉ. | |
| """ | |
| try: | |
| # ========================== | |
| # 1๏ธโฃ ุฅุนุฏุงุฏ ุงูุฏุฎู ูููู ูุฐุฌ | |
| # ========================== | |
| # SYSTEM_INSTRUCTIONS = ( | |
| # "You are a reasoning agent that follows the ReAct pattern (Thought, Action, Observation). " | |
| # "Use available tools if necessary, and finish with 'Final Answer: ... <<END>>'" | |
| # ) | |
| prompt = SYSTEM_INSTRUCTIONS + "\n\nQuestion:\n" + question_text | |
| if file_path: | |
| prompt += f"\n[Attached file path: {file_path}]" | |
| print(f"\n๐ Running ReAct agent on question:\n{question_text}") | |
| if file_path: | |
| print(f"๐ With attachment: {file_path}") | |
| # ========================== | |
| # 2๏ธโฃ ุชูููุฐ ุงููููู ุงูุฑุฆูุณู | |
| # ========================== | |
| result = self.agent.invoke({"input": prompt}) | |
| # ========================== | |
| # 3๏ธโฃ ุจูุงุก ุณุฌู ุงูุชูููุฑ ุงููุงู ู | |
| # ========================== | |
| # ุจุนุฏ ุงูุญุตูู ุนูู result | |
| if isinstance(result, dict): | |
| output = result.get("output") or result.get("text") or str(result) | |
| intermediate = result.get("intermediate_steps", []) | |
| else: | |
| output = getattr(result, "output", str(result)) | |
| intermediate = [] | |
| # ุจูุงุก ุงูุณุฌู | |
| full_log = [f"Question: {question_text}\n"] | |
| if file_path: | |
| full_log.append(f"Attachment: {file}\n") | |
| # ุชุญุฏูุฏ ุงูุญุฏ ุงูุฃูุตู ูุนุฏุฏ ุงูุฏูุฑุงุช ุงูู ุฑุงุฏ ุชุณุฌูููุง | |
| MAX_LOG_STEPS = 4 | |
| # ุงุญุชูุธ ููุท ุจุขุฎุฑ 4 ุฏูุฑุงุช ู ู intermediate_steps | |
| if intermediate: | |
| recent_steps = intermediate[-MAX_LOG_STEPS:] if len(intermediate) > MAX_LOG_STEPS else intermediate | |
| for step in recent_steps: | |
| try: | |
| action, observation = step | |
| full_log.append( | |
| f"Thought/Action: {getattr(action, 'log', getattr(action, 'tool', str(action)))}\n" | |
| f"Action Input: {getattr(action, 'tool_input', getattr(action, 'input', ''))}\n" | |
| f"Observation: {observation}\n" | |
| ) | |
| except Exception as e: | |
| full_log.append(f"[UNPARSEABLE STEP] {step}\n") | |
| full_log.append(f"Final Answer: {output}\n") | |
| conversation_log = "\n".join(full_log) | |
| # ========================== | |
| # 4๏ธโฃ ุฅุฑุณุงู ุงูุณุฌู ุงููุงู ู ุฅูู ูููู ุงูุฅุฌุงุจุฉ ุงูููุงุฆูุฉ | |
| # ========================== | |
| final_out = None | |
| try: | |
| final_out = final_client.predict_text(conversation_log) | |
| # final_out = output | |
| print(f"โ Final Answer Agent Output: {final_out}") | |
| except Exception as e: | |
| print(f"[โ ๏ธ] Failed to contact Final Answer Agent: {e}") | |
| final_out = output # fallback ุฅูู ุงููุงุชุฌ ุงูู ุญูู ุฅู ูุดู | |
| return final_out or output | |
| except Exception as e: | |
| print(f"[โ] Error while running on question: {e}") | |
| return "Error: unable to process this question." | |
| def run_all_and_submit(self) -> Dict[str, Any]: | |
| questions_url =f"{GAIA_API_BASE}/questions" | |
| submit_url =f"{GAIA_API_BASE}/submit" | |
| print(f"Fetching questions from: {questions_url}") | |
| max_retries = 10 # ุนุฏุฏ ุงูู ุญุงููุงุช (ูู 10 ุซูุงูู ุชูุฑูุจูุง = ุฏูููุฉ ููุตู ูุญุฏ ุฃูุตู) | |
| # for attempt in range(max_retries): | |
| """ | |
| # 2. Fetch Questions (ุจุฏูุงู ู ู ุงูุทูุจ ู ู ุงูุณุจูุณ ุงูุฎุงุฑุฌูุฉ) | |
| print("Fetching questions directly from GAIA dataset on Hugging Face...") | |
| try: | |
| dataset = load_dataset("gaia-benchmark/GAIA", "2023_level1", split="validation") | |
| questions_data = [] | |
| for item in dataset: | |
| metadata = item.get("Annotator Metadata", {}) | |
| num_tools = int(metadata.get("Number of tools", 99)) if metadata else 99 | |
| num_steps = int(metadata.get("Number of steps", 99)) if metadata else 99 | |
| # ููุณ ููุชุฑุฉ ุงูุฃุณุฆูุฉ ูู ุง ูู ุงูููุฏ ุงูุฃุตูู ููุณุจูุณ | |
| if num_tools < 3 and num_steps < 6: | |
| questions_data.append({ | |
| "task_id": str(item.get("task_id")), | |
| "question": str(item.get("Question")), | |
| "Level": item.get("Level"), | |
| "file_name": item.get("file_name") | |
| }) | |
| if not questions_data: | |
| return "No valid questions found after filtering.", None | |
| print(f"Fetched {len(questions_data)} questions from GAIA dataset.") | |
| except Exception as e: | |
| print(f"Error loading GAIA dataset directly: {e}") | |
| return f"Error loading GAIA dataset directly: {e}", None | |
| """ | |
| response = requests.get(questions_url, timeout=15) | |
| if response.status_code == 200: | |
| questions_data = response.json() | |
| print(questions_data) | |
| if questions_data: | |
| print(f"โ Successfully fetched {len(questions_data)} questions.") | |
| # break | |
| elif response.status_code == 404: | |
| # print(f"โ ๏ธ Attempt {attempt+1}/{max_retries}: Questions not ready yet (404). Waiting 10s...") | |
| import time; time.sleep(10) | |
| # continue | |
| else: | |
| # print(f"โ ๏ธ Attempt {attempt+1}/{max_retries}: Got status {response.status_code}, retrying in 10s...") | |
| import time; time.sleep(10) | |
| # continue | |
| # except requests.exceptions.RequestException as e: | |
| # print(f"โ ๏ธ Attempt {attempt+1}/{max_retries} failed: {e}") | |
| # import time; time.sleep(10) | |
| # continue | |
| # else: | |
| # print("โ Exhausted all retries, could not fetch questions.") | |
| # return "Failed to fetch questions from server after multiple attempts.", None | |
| answers = [] | |
| results_log = [] | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| answers = [] | |
| results_log = [] | |
| futures = [] # ูุชุฎุฒูู ุงููุชุงุฆุฌ ุงูููุงุฆูุฉ | |
| for q in questions_data: | |
| task_id = q.get("task_id") | |
| qtext = q.get("question") | |
| attach = q.get("file_name") | |
| # ุชุญู ูู ุงูู ุฑูู ููุท ุฅุฐุง ูุงู ู ูุฌูุฏูุง ูุนูุงู | |
| if attach and attach.strip(): | |
| file_path = self.download_gaia_attachment(q) | |
| else: | |
| file_path = None | |
| # ุชูููุฐ ุงูุฏุงูุฉ ู ุจุงุดุฑุฉ (ุชุณูุณูููุง) | |
| # print(f"๐น Running question {task_id}: {qtext}...") | |
| result = self.run_on_question(qtext, file_path) | |
| # ุญูุธ ุงููุชูุฌุฉ ุจุดูู ู ุดุงุจู ูุงุณุชุฎุฏุงู futures ุณุงุจููุง | |
| print("โ All questions processed.") | |
| answers.append({ | |
| "task_id": task_id, | |
| "submitted_answer": result, # ๐น ุฃุตูุญูุง ุงุณู ุงูุญูู ุฃูุถูุง (ุฃุฒููุง ุงูุดุฑุทุฉ ุงูู ุงุฆูุฉ ุงูุฒุงุฆุฏุฉ) | |
| }) | |
| results_log.append({ | |
| "question": qtext, | |
| "answer": result, | |
| }) | |
| #print("โ All questions processed successfully.") | |
| payload = {"username": self.username, "agent_code": f"https://huggingface.co/spaces/{SPACE_ID}/tree/main", "answers": answers} | |
| r2 = requests.post(submit_url, json=payload, timeout=120) | |
| r2.raise_for_status() | |
| print("\n๐ฏ All questions processed and submitted successfully!") | |
| print(json.dumps(results_log, indent=2)) | |
| print({"submission_result": r2.json(), "results_log": results_log}) | |
| return {"submission_result": r2.json(), "results_log": results_log} | |
| import os | |
| import requests | |
| # ุชุนุฑูู ู ุฌูุฏ ุงูู ุฑููุงุช ู ุฑุฉ ูุงุญุฏุฉ | |
| ATTACHMENTS_DIR = "attachments" | |
| os.makedirs(ATTACHMENTS_DIR, exist_ok=True) | |
| # ุฅูุดุงุก Session ูุงุญุฏ ู ุน ุฅุนุงุฏุฉ ุงูู ุญุงููุฉ (ุงุฎุชูุงุฑู ููู ู ุณุชุญุณู) | |
| from requests.adapters import HTTPAdapter | |
| from urllib3.util.retry import Retry | |
| _session = requests.Session() | |
| retry_strategy = Retry(total=3, backoff_factor=0.5, status_forcelist=[429, 500, 502, 503, 504]) | |
| _adapter = HTTPAdapter(max_retries=retry_strategy) | |
| _session.mount("https://", _adapter) | |
| _session.mount("http://", _adapter) | |
| # @staticmethod | |
| ATTACHMENTS_DIR = "attachments" | |
| os.makedirs(ATTACHMENTS_DIR, exist_ok=True) | |
| def download_gaia_attachment(task: dict) -> str: | |
| """ | |
| ุชูุฒูู ุงูู ุฑูู ุงูู ุฑุชุจุท ุจุงูุณุคุงู ู ู GAIA API ูุชุฎุฒููู ู ุญูููุง. | |
| """ | |
| task_id = task.get("task_id") | |
| file_name = task.get("file_name") | |
| if not task_id or not file_name: | |
| return None # ูุง ููุฌุฏ ู ุฑูู | |
| # ุฑุงุจุท API ูุชุญู ูู ุงูู ูู | |
| url = f"https://agents-course-unit4-scoring.hf.space/files/{file_name}" | |
| local_path = os.path.join("attachments", file_name) | |
| u=None | |
| # ุชูุฒูู ุงูู ูู ุฅุฐุง ูู ููู ู ูุฌูุฏูุง ู ุญูููุง | |
| if not os.path.exists(local_path): | |
| try: | |
| r = requests.get(url, timeout=30) | |
| r.raise_for_status() | |
| with open(local_path, "wb") as f: | |
| f.write(r.content) | |
| print(f"[GAIA] Attachment downloaded: {local_path}") | |
| u=local_path | |
| except Exception as e: | |
| print(f"[GAIA] Failed to download attachment: {e}") | |
| u=url | |
| return u | |
| else: | |
| print(f"[GAIA] Attachment already exists: {local_path}") | |
| # return local_path | |
| return u | |
| # -------------------- LangGraph Integration -------------------- | |
| if LANGGRAPH_AVAILABLE: | |
| from langgraph.graph import StateGraph, END | |
| class State: | |
| def __init__(self, question: str, file: Optional[str] = None): | |
| self.question = question | |
| self.file = file | |
| self.partial_answer = None | |
| self.final_answer = None | |
| def reasoning_node(state: State, agent_exec: AgentExecutor) -> State: | |
| prompt = SYSTEM_INSTRUCTIONS + "\n\n" + state.question | |
| result = agent_exec.invoke({"input": prompt}) | |
| state.partial_answer = result.get("output") if isinstance(result, dict) else str(result) | |
| return state | |
| def final_node(state: State) -> State: | |
| state.final_answer = final_client.predict_text(state.partial_answer) | |
| return state | |
| def build_gaia_graph(agent_exec: AgentExecutor): | |
| builder = StateGraph(State) | |
| builder.add_node("reasoning_agent", lambda s: reasoning_node(s, agent_exec)) | |
| builder.add_node("final_agent", final_node) | |
| builder.add_edge("reasoning_agent", "final_agent") | |
| builder.add_edge("final_agent", END) | |
| return builder.compile() | |
| # -------------------- Gradio UI -------------------- | |
| def gradio_interface(): | |
| # class DummyLLM: | |
| # def __call__(self, *args, **kwargs): | |
| # return "" | |
| tracer = None | |
| if LANGSMITH_AVAILABLE and os.getenv("LANGSMITH_API_KEY"): | |
| try: | |
| client = LangSmithClient(api_key=os.getenv("LANGSMITH_API_KEY")) | |
| tracer = LangChainTracer(client=client, project_name=os.getenv("LANGSMITH_PROJECT", "gaia-project")) | |
| except Exception: | |
| pass | |
| agent_exec = create_agent_executor(llm, tools, tracer=tracer) | |
| runner = GaiaRunner(agent_exec, username=os.getenv("HF_USER", "unknown")) | |
| def process(): | |
| result = runner.run_all_and_submit() | |
| formatted = json.dumps(result["results_log"], indent=2) | |
| return formatted | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# ๐ง GAIA Multi-Agent System (LangGraph + Gradio)") | |
| output_box = gr.Textbox(label="Results Log", lines=25) | |
| run_button = gr.Button("Run GAIA Evaluation") | |
| run_button.click(process, outputs=output_box) | |
| return demo | |
| if __name__ == "__main__": | |
| demo = gradio_interface() | |
| print("model is starting") | |
| llm.invoke("Hello") # warm-up | |
| print("โ Model ready") | |
| demo.launch(debug= True, show_error=True, share=False) | |