# MainAgent / app.py
# (Hugging Face Space page residue kept for provenance:
#  Mustafa-albakkar's picture — "Update app.py" — commit 3311c57 verified)
# gaia_agent_restructured_langgraph_ui.py
# GAIA Multi-Agent System with LangGraph + Gradio Interface
"""
Updates:
✅ Added Gradio UI for user interaction and visualization of results
✅ Integrated with LangGraph flow (reasoning_agent → final_agent)
✅ Prints all questions and corresponding results at the end
✅ Maintains modular design and existing architecture
"""
import os
os.environ["LLAMA_BLAS"] = "1"
os.environ["LLAMA_BLAS_VENDOR"] = "OpenBLAS"
from langchain_community.llms import LlamaCpp
from llama_cpp import Llama
import re
import json
import requests
import logging
import gradio as gr
from typing import Optional, List, Dict, Any
import time
from huggingface_hub import hf_hub_download
from gradio_client import Client
from langchain_core.tools import Tool
from langchain_core.prompts import PromptTemplate
from langchain.agents import create_react_agent, AgentExecutor
from datasets import load_dataset
from huggingface_hub import login
import threading
import logging
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("gradio").setLevel(logging.WARNING)
# Global lock guarding LLM/agent access from worker threads
# (a single LlamaCpp instance is not safe for concurrent calls).
llama_lock = threading.Lock()

# Fetch the GGUF weights from the Hub (hf_hub_download caches locally).
model_path = hf_hub_download(
    repo_id="bartowski/Qwen2.5-14B-Instruct-GGUF",
    filename="Qwen2.5-14B-Instruct-Q6_K_L.gguf",
)

# CPU-only llama.cpp model.  Low temperature and a small max_tokens keep
# each ReAct step short and focused.
llm = LlamaCpp(
    model_path=model_path,
    n_ctx=10000,        # context window (tokens)
    n_threads=4,
    n_gpu_layers=0,     # 0 = pure CPU inference
    temperature=0.4,
    top_p=0.9,
    max_tokens=150,     # cap on tokens generated per step
    n_batch=64,
    verbose=False,
    use_mmap=True
)

# Log in to Hugging Face only if a token is present in the environment.
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN:
    login(token=HF_TOKEN)
else:
    print("โš ๏ธ Warning: No HF_TOKEN found. Please set your Hugging Face token as an environment variable.")
try:
from langsmith import Client as LangSmithClient
from langchain.callbacks.tracers import LangChainTracer
LANGSMITH_AVAILABLE = True
except Exception:
LANGSMITH_AVAILABLE = False
try:
from langgraph.graph import StateGraph, END
LANGGRAPH_AVAILABLE = True
except Exception:
LANGGRAPH_AVAILABLE = False
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("gaia_langgraph_ui")
# -------------------- Configuration --------------------
# Every endpoint below can be overridden via environment variables.
HF_TOKEN = os.getenv("HF_TOKEN", "")   # NOTE: re-read here; shadows the value read above
SPACE_ID = os.getenv("SPACE_ID", "")   # this Space's id, reported in the submission payload
CODE_AGENT_SPACE = os.getenv("CODE_AGENT_SPACE", "https://mustafa-albakkar-codeagent.hf.space")
VISION_AGENT_SPACE = os.getenv("VISION_AGENT_SPACE", "https://mustafa-albakkar-mediaagent.hf.space")
FINAL_ANSWER_SPACE = os.getenv("FINAL_ANSWER_SPACE", "https://mustafa-albakkar-finalagent.hf.space")
#GAIA_API_BASE = os.getenv("GAIA_API_BASE","https://mustafa-albakkar-mmo.hf.space")
# Scoring API used to fetch questions and submit answers.
GAIA_API_BASE = os.getenv("GAIA_API_BASE","https://agents-course-unit4-scoring.hf.space")
import time
from functools import wraps
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
"""
session = requests.Session()
retry_strategy = Retry(
total=5,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
"""
def retry(exception_types=(Exception,), tries=3, delay=1, backoff=2):
    """Decorator retrying a callable on transient errors (e.g. network blips).

    The wrapped function is attempted up to ``tries`` times in total; after
    each caught failure we sleep ``delay`` seconds and multiply the delay by
    ``backoff``.  The last attempt runs outside the catch, so its exception
    propagates to the caller.
    """
    def decorate(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            remaining, wait = tries, delay
            while remaining > 1:
                try:
                    return func(*args, **kwargs)
                except exception_types:
                    time.sleep(wait)
                    remaining -= 1
                    wait *= backoff
            # Final attempt: let any exception bubble up.
            return func(*args, **kwargs)
        return wrapped
    return decorate
# -------------------- Sub-agent clients --------------------
class SubAgentClient:
    """Thin wrapper around a ``gradio_client.Client`` for a remote HF Space.

    The Client is created lazily on first use, so importing this module does
    not require the remote Space to be reachable.
    """

    def __init__(self, space_url: str):
        self.space_url = space_url
        self._client = None

    @property
    def client(self) -> Client:
        # Lazily connect to the Space and cache the handle for reuse.
        if self._client is None:
            self._client = Client(self.space_url)
        return self._client

    @retry((Exception,), tries=3, delay=1, backoff=2)
    def predict_text(self, prompt: str, file: Optional[str] = None, timeout: int = 90) -> str:
        """Call the remote Space, tolerating differences in its predict API.

        Several call signatures are tried in order; the first successful
        result is returned (list/tuple responses are flattened into one
        string).  If every pattern fails, the last exception is re-raised.

        NOTE(review): ``timeout`` is currently unused; kept for interface
        stability.
        """
        if file and os.path.exists(file):
            def _dict_call():
                # BUG FIX: the original opened the file here without ever
                # closing it; a context manager releases the handle even
                # when predict() raises.
                with open(file, "rb") as fh:
                    return self.client.predict({"input": prompt, "file": fh})

            patterns = [
                lambda: self.client.predict(prompt, file, api_name="predict"),
                lambda: self.client.predict(prompt, file),
                lambda: self.client.predict([prompt, file]),
                _dict_call,
            ]
        else:
            patterns = [
                lambda: self.client.predict(prompt, api_name="predict"),
                lambda: self.client.predict(prompt),
                lambda: self.client.predict([prompt]),
            ]

        last_exc = None
        for call in patterns:
            try:
                res = call()
            except Exception as e:
                last_exc = e
                continue
            if isinstance(res, (list, tuple)):
                return " ".join(str(x) for x in res if x)
            return str(res)
        raise last_exc
# Instantiate sub-agent clients (one per remote Space).
code_client = SubAgentClient(CODE_AGENT_SPACE)
vision_client = SubAgentClient(VISION_AGENT_SPACE)
final_client = SubAgentClient(FINAL_ANSWER_SPACE)
# -------------------- Tools --------------------
# Wikipedia's API policy asks consumers to send a descriptive User-Agent.
WIKI_HEADERS = {"User-Agent": "GAIA-Agent/1.0 (https://huggingface.co/)"}
from functools import lru_cache
@lru_cache(maxsize=512)
def wiki_search(query: str) -> str:
    """Search English Wikipedia and return up to five "title: snippet" lines.

    Identical queries are memoised for the lifetime of the process, so a
    repeated Action does not hit the network twice.  All failures are
    reported as strings (never raised) so the agent can observe them.
    """
    if not query:
        return "WIKI_ERROR: empty query"
    api_url = "https://en.wikipedia.org/w/api.php"
    params = {
        "action": "query",
        "list": "search",
        "srsearch": query,
        "format": "json",
        "utf8": 1,
    }
    try:
        resp = requests.get(api_url, params=params, headers=WIKI_HEADERS, timeout=12)
        resp.raise_for_status()
        hits = resp.json().get("query", {}).get("search", [])
        if not hits:
            return "WIKI_NO_RESULTS"
        # Strip the HTML highlight tags Wikipedia embeds in each snippet.
        lines = [
            f"{hit.get('title')}: {re.sub(r'<.*?>', '', hit.get('snippet', ''))}"
            for hit in hits[:5]
        ]
        return "\n".join(lines)
    except Exception as e:
        return f"WIKI_ERROR: {e}"
try:
from ddgs import DDGS
DDGS_AVAILABLE = True
except Exception:
DDGS_AVAILABLE = False
def internet_search(query: str, max_results: int = 8) -> str:
    """DuckDuckGo text search; entries are joined with '---' separator lines.

    Each entry is formatted as "title\\nurl\\nbody".  Errors and the
    missing-dependency case are reported as strings so the agent can react
    to them instead of crashing.
    """
    if not query:
        return "SEARCH_ERROR: empty query"
    if not DDGS_AVAILABLE:
        return "SEARCH_ERROR: ddgs not installed"
    try:
        with DDGS() as ddgs:
            entries = [
                "{}\n{}\n{}".format(
                    hit.get("title", ""), hit.get("href", ""), hit.get("body", "")
                )
                for hit in ddgs.text(query, max_results=max_results)
            ]
    except Exception as e:
        return f"SEARCH_ERROR: {e}"
    return "\n---\n".join(entries) if entries else "SEARCH_NO_RESULTS"
def analyze_media(media_input: str, file: Optional[str] = None) -> str:
    # Proxy to the vision/media sub-agent Space (images, audio, video).
    return vision_client.predict_text(media_input, file)
def coder_agent_proxy(prompt: str) -> str:
    # Proxy to the code-generation/execution sub-agent Space.
    return code_client.predict_text(prompt)
# Toolbox exposed to the ReAct agent.  Each description is itself a prompt
# the LLM reads, so it spells out how to phrase that tool's Action Input.
# (Fixed several typos from the original descriptions — "resieved",
# "discription", "quatation", "aditional", "satisfactive" — since the LLM
# follows these instructions literally.)
tools: List[Tool] = [
    Tool(
        name="WikipediaSearch",
        func=wiki_search,
        description=(
            "Search English Wikipedia for factual information. Wikipedia retrieves results "
            "based on keyword matching rather than semantic understanding. Use concise, short "
            "and relevant keywords when querying it.\n"
            "When searching, try to widen the search scope by using a short keyword at the "
            "beginning, then narrow it gradually by adding additional information depending "
            "on the final goal and the previous results in the subsequent cycles.\n"
            "When searching, DON'T add quotation marks \"\" to the Action Input."
        ),
    ),
    Tool(
        name="InternetSearch",
        func=internet_search,
        description=(
            "Search the internet for real-time information using DuckDuckGo. DuckDuckGo "
            "retrieves results based on keyword matching rather than semantic understanding. "
            "Use the most relevant keywords when querying it.\n"
            "When searching: first start with one short keyword, then add more words gradually "
            "depending on the results of the previous search to narrow the scope and find a "
            "satisfactory answer.\n"
        ),
    ),
    Tool(
        name="MediaAnalyzer",
        func=analyze_media,
        description=(
            "Use this tool when you need to analyze an image, audio or a video.\n"
            "file url: the URL that you received for the image/audio/video.\n"
            "- Input must be a user question and a direct media URL (image or video).\n"
            "- For images/audio: provide the link to the image/audio exactly like: "
            "https://huggingface.co/spaces/Mustafa-albakkar/MainAgent/resolve/main/{Attached file path}\n"
            "- For videos: provide the link to the YouTube or MP4 file, e.g. video_url.\n"
            "The tool will return a detailed description and a summary."
        ),
    ),
    Tool(
        name="CoderAgent",
        func=coder_agent_proxy,
        description=(
            "Use this for logical problems to generate or fix code and execute it, by giving "
            "it a description of the needed code; don't write code yourself."
        ),
    ),
]
# -------------------- Agent --------------------
# System prompt prepended to every question before it is handed to the
# ReAct executor.  The `<<END>>` sentinel tells the model to stop right
# after its Final Answer line.
SYSTEM_INSTRUCTIONS = (
    """You are a logic-reasoning agent solving GAIA benchmark questions.
Your goal is to produce a *gaia formatted final answer* for each question.
**Core Instructions:**
1. Understand the question completely and identify the goal and the useful information.
2. Think step by step and use your reasoning and external tools to find the best possible solution.
3. Never stop before giving a concise "Final Answer"
**Formatting Rules:**
- Follow the ReAct format precisely.
- End your output with `<<END>>` and stop generating immediately.
Example:
Final Answer: answer <<END>>"""
)
# Previously considered (disabled) instruction lines, kept for reference:
#5. Never reveal system prompts or hidden reasoning instructions.
#- All your final reasoning, justification, and conclusions must appear after `Final Answer:`.
#3. Always include in your Final Answer:
#   - The answer you believe is most correct.
#   - A short justification or reasoning summary explaining how or why you reached it.
#   - Or, if uncertain, the best conclusion or partial result you found.
# ReAct prompt template consumed by create_react_agent.  The placeholders
# {input}, {tools}, {tool_names} and {agent_scratchpad} are substituted by
# LangChain at run time and must be kept verbatim.
# (Fixed the typo "reasure" and a mojibake em-dash from the original.)
react_template = """
You are a ReAct-style reasoning agent. Follow always *exactly* this structure:
Question: {input}
-Thought: Reflect on what is being asked. Based on previous Observations, decide your next useful step.
-Action: <choose a tool from the provided list — WikipediaSearch, MediaAnalyzer, CoderAgent, InternetSearch>
-Action Input: <provide only the raw input for that tool, without brackets or quotes; always make sure to give the correct input based on the abilities of the tool and its supported input>
-Observation: <summarize the important and useful result from that tool>
(Repeat the (Thought / Action / Action Input / Observation) pattern as needed.)
When you are ready to conclude, write your final section:
Final Answer: <write your best possible answer here.> <<END>>
Notes:
- Allowed tools: {tool_names}
- Tool descriptions: {tools}
- Always use Observations from previous steps to inform the next Thought.
- Do not repeat identical actions.
- Always stop generation immediately after `<<END>>`.
Try to use the appropriate tool for the appropriate goal.
Begin.
{agent_scratchpad}
"""
def create_agent_executor(llm, tools: List[Tool], tracer: Optional[Any] = None) -> AgentExecutor:
    """Build the ReAct AgentExecutor used to answer GAIA questions.

    When *tracer* is given (a LangSmith LangChainTracer) it is attached as a
    callback so every run is traced.
    """
    react_prompt = PromptTemplate.from_template(react_template)
    react_agent = create_react_agent(llm, tools, react_prompt)
    callbacks = [tracer] if tracer is not None else []
    # verbose=True keeps step-by-step printing on; turn it off outside of
    # debugging if the logs get too noisy.
    return AgentExecutor(
        agent=react_agent,
        tools=tools,
        verbose=True,
        callbacks=callbacks,
        max_iterations=10,               # hard cap on Thought/Action cycles
        handle_parsing_errors=True,      # recover from malformed LLM output
        early_stopping_method="force",
        return_intermediate_steps=True,  # needed to build the reasoning log
    )
# -------------------- GAIA Runner --------------------
class GaiaRunner:
    """Drives the ReAct agent over the GAIA question set and submits answers.

    Responsibilities:
      * fetch the question list from the scoring API,
      * download a question's attachment when one is declared,
      * run the reasoning agent per question and forward its full
        Thought/Action/Observation trace to the final-answer agent,
      * POST the collected answers back to the scoring endpoint.
    """

    # Attachment cache directory and a retrying HTTP session, created once
    # at class-definition time.
    ATTACHMENTS_DIR = "attachments"
    os.makedirs(ATTACHMENTS_DIR, exist_ok=True)

    _session = requests.Session()
    retry_strategy = Retry(total=3, backoff_factor=0.5,
                           status_forcelist=[429, 500, 502, 503, 504])
    _adapter = HTTPAdapter(max_retries=retry_strategy)
    _session.mount("https://", _adapter)
    _session.mount("http://", _adapter)

    def __init__(self, agent_executor: AgentExecutor, username: str = "unknown"):
        # agent_executor: the executor built by create_agent_executor.
        # username: HF username reported in the submission payload.
        self.agent = agent_executor
        self.username = username

    def run_on_question(self, question_text: str, file_path: Optional[str] = None) -> str:
        """Answer a single question, optionally with an attachment.

        Runs the main ReAct agent, assembles the reasoning trace into one
        log, and sends that log to the final-answer agent.  Falls back to
        the local agent output if the final-answer agent is unreachable.
        """
        try:
            prompt = SYSTEM_INSTRUCTIONS + "\n\nQuestion:\n" + question_text
            if file_path:
                prompt += f"\n[Attached file path: {file_path}]"
            print(f"\n🚀 Running ReAct agent on question:\n{question_text}")
            if file_path:
                print(f"📎 With attachment: {file_path}")

            result = self.agent.invoke({"input": prompt})

            # Normalise the executor result into (output text, steps).
            if isinstance(result, dict):
                output = result.get("output") or result.get("text") or str(result)
                intermediate = result.get("intermediate_steps", [])
            else:
                output = getattr(result, "output", str(result))
                intermediate = []

            full_log = [f"Question: {question_text}\n"]
            if file_path:
                # BUG FIX: the original referenced the undefined name `file`
                # here, raising NameError whenever an attachment was present.
                full_log.append(f"Attachment: {file_path}\n")

            # Keep only the last few reasoning cycles to bound prompt size.
            MAX_LOG_STEPS = 4
            for step in intermediate[-MAX_LOG_STEPS:]:
                try:
                    action, observation = step
                    full_log.append(
                        f"Thought/Action: {getattr(action, 'log', getattr(action, 'tool', str(action)))}\n"
                        f"Action Input: {getattr(action, 'tool_input', getattr(action, 'input', ''))}\n"
                        f"Observation: {observation}\n"
                    )
                except Exception:
                    full_log.append(f"[UNPARSEABLE STEP] {step}\n")

            full_log.append(f"Final Answer: {output}\n")
            conversation_log = "\n".join(full_log)

            # Hand the full trace to the final-answer agent for formatting.
            try:
                final_out = final_client.predict_text(conversation_log)
                print(f"✅ Final Answer Agent Output: {final_out}")
            except Exception as e:
                print(f"[⚠️] Failed to contact Final Answer Agent: {e}")
                final_out = output  # fall back to the local output
            return final_out or output
        except Exception as e:
            print(f"[❌] Error while running on question: {e}")
            return "Error: unable to process this question."

    def run_all_and_submit(self) -> Dict[str, Any]:
        """Fetch all GAIA questions, answer each one, and submit the results.

        Returns {"submission_result": <server JSON>, "results_log": [...]};
        on fetch failure, submission_result is None and results_log carries
        a single error entry (so UI callers indexing "results_log" survive).
        """
        questions_url = f"{GAIA_API_BASE}/questions"
        submit_url = f"{GAIA_API_BASE}/submit"
        print(f"Fetching questions from: {questions_url}")

        # BUG FIX: the original performed a single GET and left
        # `questions_data` undefined on any non-200 response, crashing with
        # NameError below.  Retry with backoff and bail out cleanly instead.
        questions_data = None
        max_retries = 10
        for attempt in range(max_retries):
            try:
                response = requests.get(questions_url, timeout=15)
            except requests.exceptions.RequestException as e:
                print(f"⚠️ Attempt {attempt + 1}/{max_retries} failed: {e}")
                time.sleep(10)
                continue
            if response.status_code == 200:
                questions_data = response.json()
                print(f"✅ Successfully fetched {len(questions_data)} questions.")
                break
            print(f"⚠️ Attempt {attempt + 1}/{max_retries}: got status "
                  f"{response.status_code}, retrying in 10s...")
            time.sleep(10)

        if not questions_data:
            msg = "Failed to fetch questions from server after multiple attempts."
            print(f"❌ {msg}")
            return {"submission_result": None, "results_log": [{"error": msg}]}

        answers = []
        results_log = []
        for q in questions_data:
            task_id = q.get("task_id")
            qtext = q.get("question")
            attach = q.get("file_name")
            # Download the attachment only when one is actually declared.
            file_path = (self.download_gaia_attachment(q)
                         if attach and attach.strip() else None)
            result = self.run_on_question(qtext, file_path)
            answers.append({"task_id": task_id, "submitted_answer": result})
            results_log.append({"question": qtext, "answer": result})
        print("✅ All questions processed.")

        payload = {
            "username": self.username,
            "agent_code": f"https://huggingface.co/spaces/{SPACE_ID}/tree/main",
            "answers": answers,
        }
        r2 = requests.post(submit_url, json=payload, timeout=120)
        r2.raise_for_status()
        print("\n🎯 All questions processed and submitted successfully!")
        print(json.dumps(results_log, indent=2))
        return {"submission_result": r2.json(), "results_log": results_log}

    @staticmethod
    def download_gaia_attachment(task: dict) -> Optional[str]:
        """Download a task's attachment from the GAIA API and cache it.

        Returns the local path when present or downloaded, the remote URL
        when the download fails (so a tool can still fetch it remotely),
        or None when the task has no attachment.
        """
        task_id = task.get("task_id")
        file_name = task.get("file_name")
        if not task_id or not file_name:
            return None  # no attachment on this task

        url = f"https://agents-course-unit4-scoring.hf.space/files/{file_name}"
        local_path = os.path.join(GaiaRunner.ATTACHMENTS_DIR, file_name)

        if os.path.exists(local_path):
            # BUG FIX: the original returned None for an already-cached
            # file, silently dropping the attachment on re-runs.
            print(f"[GAIA] Attachment already exists: {local_path}")
            return local_path
        try:
            r = requests.get(url, timeout=30)
            r.raise_for_status()
            with open(local_path, "wb") as f:
                f.write(r.content)
            print(f"[GAIA] Attachment downloaded: {local_path}")
            return local_path
        except Exception as e:
            print(f"[GAIA] Failed to download attachment: {e}")
            return url
# -------------------- LangGraph Integration --------------------
# Optional alternative wiring of the same two-stage pipeline
# (reasoning agent -> final-answer agent) as a LangGraph graph.
# NOTE(review): build_gaia_graph is never invoked elsewhere in this file;
# the Gradio path uses GaiaRunner directly.
if LANGGRAPH_AVAILABLE:
    from langgraph.graph import StateGraph, END

    class State:
        # Mutable state carrier passed between graph nodes.
        # NOTE(review): StateGraph typically expects a TypedDict/schema
        # type; confirm a plain class works with the installed langgraph.
        def __init__(self, question: str, file: Optional[str] = None):
            self.question = question
            self.file = file
            self.partial_answer = None  # filled by reasoning_node
            self.final_answer = None    # filled by final_node

    def reasoning_node(state: State, agent_exec: AgentExecutor) -> State:
        # Run the ReAct executor on the question and stash its raw output.
        prompt = SYSTEM_INSTRUCTIONS + "\n\n" + state.question
        result = agent_exec.invoke({"input": prompt})
        state.partial_answer = result.get("output") if isinstance(result, dict) else str(result)
        return state

    def final_node(state: State) -> State:
        # Hand the partial answer to the final-answer Space for formatting.
        state.final_answer = final_client.predict_text(state.partial_answer)
        return state

    def build_gaia_graph(agent_exec: AgentExecutor):
        # Linear graph: reasoning_agent -> final_agent -> END.
        builder = StateGraph(State)
        builder.add_node("reasoning_agent", lambda s: reasoning_node(s, agent_exec))
        builder.add_node("final_agent", final_node)
        builder.add_edge("reasoning_agent", "final_agent")
        builder.add_edge("final_agent", END)
        return builder.compile()
# -------------------- Gradio UI --------------------
def gradio_interface():
    """Assemble the Gradio app: a single button that runs the full GAIA
    evaluation plus a textbox showing the per-question results log."""
    tracer = None
    # Attach a LangSmith tracer only when both the package and an API key
    # are available; any setup failure is non-fatal.
    if LANGSMITH_AVAILABLE and os.getenv("LANGSMITH_API_KEY"):
        try:
            ls_client = LangSmithClient(api_key=os.getenv("LANGSMITH_API_KEY"))
            tracer = LangChainTracer(
                client=ls_client,
                project_name=os.getenv("LANGSMITH_PROJECT", "gaia-project"),
            )
        except Exception:
            pass

    executor = create_agent_executor(llm, tools, tracer=tracer)
    runner = GaiaRunner(executor, username=os.getenv("HF_USER", "unknown"))

    def process():
        # Run everything and pretty-print the per-question log.
        outcome = runner.run_all_and_submit()
        return json.dumps(outcome["results_log"], indent=2)

    with gr.Blocks() as demo:
        gr.Markdown("# ๐Ÿง  GAIA Multi-Agent System (LangGraph + Gradio)")
        output_box = gr.Textbox(label="Results Log", lines=25)
        run_button = gr.Button("Run GAIA Evaluation")
        run_button.click(process, outputs=output_box)
    return demo
if __name__ == "__main__":
demo = gradio_interface()
print("model is starting")
llm.invoke("Hello") # warm-up
print("โœ… Model ready")
demo.launch(debug= True, show_error=True, share=False)