# gaia_agent_restructured_langgraph_ui.py
# GAIA Multi-Agent System with LangGraph + Gradio Interface
"""
Updates:
✅ Added Gradio UI for user interaction and visualization of results
✅ Integrated with LangGraph flow (reasoning_agent → final_agent)
✅ Prints all questions and corresponding results at the end
✅ Maintains modular design and existing architecture
"""
import os

# Must be set before llama_cpp is imported so the BLAS backend is picked up.
os.environ["LLAMA_BLAS"] = "1"
os.environ["LLAMA_BLAS_VENDOR"] = "OpenBLAS"

import json
import logging
import re
import threading
import time
from functools import lru_cache, wraps
from typing import Any, Dict, List, Optional

import gradio as gr
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from datasets import load_dataset
from gradio_client import Client
from huggingface_hub import hf_hub_download, login
from langchain.agents import AgentExecutor, create_react_agent
from langchain_community.llms import LlamaCpp
from langchain_core.prompts import PromptTemplate
from langchain_core.tools import Tool
from llama_cpp import Llama

# Silence noisy third-party loggers.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("gradio").setLevel(logging.WARNING)

# Global lock protecting LLM/agent access from worker threads.
llama_lock = threading.Lock()

# Download the quantized model once and build the local llama.cpp LLM.
model_path = hf_hub_download(
    repo_id="bartowski/Qwen2.5-14B-Instruct-GGUF",
    filename="Qwen2.5-14B-Instruct-Q6_K_L.gguf",
)
llm = LlamaCpp(
    model_path=model_path,
    n_ctx=10000,
    n_threads=4,
    n_gpu_layers=0,
    temperature=0.4,
    top_p=0.9,
    max_tokens=150,
    n_batch=64,
    verbose=False,
    use_mmap=True,
)

# Authenticate against the Hugging Face Hub when a token is available.
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN:
    login(token=HF_TOKEN)
else:
    print("⚠️ Warning: No HF_TOKEN found. Please set your Hugging Face token as an environment variable.")

# Optional integrations: LangSmith tracing and LangGraph orchestration.
try:
    from langsmith import Client as LangSmithClient
    from langchain.callbacks.tracers import LangChainTracer
    LANGSMITH_AVAILABLE = True
except Exception:
    LANGSMITH_AVAILABLE = False

try:
    from langgraph.graph import StateGraph, END
    LANGGRAPH_AVAILABLE = True
except Exception:
    LANGGRAPH_AVAILABLE = False

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("gaia_langgraph_ui")

# -------------------- Configuration --------------------
HF_TOKEN = os.getenv("HF_TOKEN", "")
SPACE_ID = os.getenv("SPACE_ID", "")
CODE_AGENT_SPACE = os.getenv("CODE_AGENT_SPACE", "https://mustafa-albakkar-codeagent.hf.space")
VISION_AGENT_SPACE = os.getenv("VISION_AGENT_SPACE", "https://mustafa-albakkar-mediaagent.hf.space")
FINAL_ANSWER_SPACE = os.getenv("FINAL_ANSWER_SPACE", "https://mustafa-albakkar-finalagent.hf.space")
GAIA_API_BASE = os.getenv("GAIA_API_BASE", "https://agents-course-unit4-scoring.hf.space")


def retry(exception_types=(Exception,), tries=3, delay=1, backoff=2):
    """Decorator retrying a call on transient errors (e.g. network drops).

    Sleeps ``delay`` seconds after each failure, multiplying the wait by
    ``backoff`` each time.  The final attempt runs outside the try/except so
    the last exception propagates to the caller unchanged.
    """
    def deco(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            _tries, _delay = tries, delay
            while _tries > 1:
                try:
                    return f(*args, **kwargs)
                except exception_types:
                    time.sleep(_delay)
                    _tries -= 1
                    _delay *= backoff
            return f(*args, **kwargs)
        return wrapper
    return deco


# -------------------- Sub-agent clients --------------------
class SubAgentClient:
    """Thin wrapper around a ``gradio_client.Client`` for a remote agent Space."""

    def __init__(self, space_url: str):
        self.space_url = space_url
        self._client = None

    @property
    def client(self) -> Client:
        # Lazy connection so module import does not hit the network.
        if self._client is None:
            self._client = Client(self.space_url)
        return self._client

    @retry((Exception,), tries=3, delay=1, backoff=2)
    def predict_text(self, prompt: str, file: Optional[str] = None, timeout: int = 90) -> str:
        """Call the remote Space, tolerating several predict() signatures.

        Tries a list of call patterns in order and returns the first result
        that succeeds; raises the last exception if every pattern fails.
        ``timeout`` is kept for interface compatibility but is not currently
        forwarded to the underlying client.
        """
        def _predict_with_file_handle():
            # Open and close the attachment explicitly so the handle is not
            # leaked (the original passed an open() result with no close).
            with open(file, "rb") as fh:
                return self.client.predict({"input": prompt, "file": fh})

        if file and os.path.exists(file):
            patterns = [
                lambda: self.client.predict(prompt, file, api_name="predict"),
                lambda: self.client.predict(prompt, file),
                lambda: self.client.predict([prompt, file]),
                _predict_with_file_handle,
            ]
        else:
            patterns = [
                lambda: self.client.predict(prompt, api_name="predict"),
                lambda: self.client.predict(prompt),
                lambda: self.client.predict([prompt]),
            ]

        last_exc = None
        for call in patterns:
            try:
                res = call()
                if isinstance(res, (list, tuple)):
                    return " ".join(str(x) for x in res if x)
                return str(res)
            except Exception as e:
                last_exc = e
        raise last_exc


# Instantiate sub-agent clients
code_client = SubAgentClient(CODE_AGENT_SPACE)
vision_client = SubAgentClient(VISION_AGENT_SPACE)
final_client = SubAgentClient(FINAL_ANSWER_SPACE)

# -------------------- Tools --------------------
WIKI_HEADERS = {"User-Agent": "GAIA-Agent/1.0 (https://huggingface.co/)"}


@lru_cache(maxsize=512)
def wiki_search(query: str) -> str:
    """Search English Wikipedia; return up to five 'title: snippet' lines.

    Returns sentinel strings ("WIKI_ERROR: ...", "WIKI_NO_RESULTS") instead of
    raising so the agent loop can keep going on failure.
    """
    if not query:
        return "WIKI_ERROR: empty query"
    try:
        url = "https://en.wikipedia.org/w/api.php"
        params = {"action": "query", "list": "search", "srsearch": query, "format": "json", "utf8": 1}
        r = requests.get(url, params=params, headers=WIKI_HEADERS, timeout=12)
        r.raise_for_status()
        data = r.json()
        hits = data.get("query", {}).get("search", [])
        if not hits:
            return "WIKI_NO_RESULTS"
        snippets = []
        for h in hits[:5]:
            title = h.get("title")
            # Strip the <span> highlight markup the API embeds in snippets.
            snippet = re.sub(r"<.*?>", "", h.get("snippet", ""))
            snippets.append(f"{title}: {snippet}")
        return "\n".join(snippets)
    except Exception as e:
        return f"WIKI_ERROR: {e}"


try:
    from ddgs import DDGS
    DDGS_AVAILABLE = True
except Exception:
    DDGS_AVAILABLE = False


def internet_search(query: str, max_results: int = 8) -> str:
    """DuckDuckGo text search; results are joined with '---' separators."""
    if not query:
        return "SEARCH_ERROR: empty query"
    if not DDGS_AVAILABLE:
        return "SEARCH_ERROR: ddgs not installed"
    try:
        results = []
        with DDGS() as ddgs:
            for r in ddgs.text(query, max_results=max_results):
                title = r.get("title", "")
                href = r.get("href", "")
                body = r.get("body", "")
                results.append(f"{title}\n{href}\n{body}")
        return "\n---\n".join(results) if results else "SEARCH_NO_RESULTS"
    except Exception as e:
        return f"SEARCH_ERROR: {e}"


def analyze_media(media_input: str, file: Optional[str] = None) -> str:
    """Proxy a media-analysis request to the vision sub-agent Space."""
    return vision_client.predict_text(media_input, file)


def coder_agent_proxy(prompt: str) -> str:
    """Proxy a code-generation request to the coder sub-agent Space."""
    return code_client.predict_text(prompt)


tools: List[Tool] = [
    Tool(
        name="WikipediaSearch",
        func=wiki_search,
        description=(
            "Search English Wikipedia for factual information. Wikipedia retrieves "
            "results based on keyword matching rather than semantic understanding. "
            "Use concise, short and relevant keywords when querying it. When searching, "
            "try to widen the search scope by using a short keyword at the beginning and "
            "then narrow it gradually by adding additional information depending on the "
            "final goal and the previous results in the subsequent cycles.\n"
            "When searching, DON'T add quotation marks \"\" to the action input."
        ),
    ),
    Tool(
        name="InternetSearch",
        func=internet_search,
        description=(
            "Search the internet for real-time information using DuckDuckGo. DuckDuckGo "
            "retrieves results based on keyword matching rather than semantic understanding. "
            "Use the most relevant keywords when querying it.\n"
            "When searching: first, start with one short keyword and then try to add more "
            "words gradually depending on the results of the previous search to narrow the "
            "search scope to find the satisfactory answer.\n"
        ),
    ),
    Tool(
        name="MediaAnalyzer",
        func=analyze_media,
        description=(
            "Use this tool when you need to analyze an image, audio or a video.\n"
            "file url: the URL that you received for the image/audio/video.\n"
            "- Input must be a user question and a direct media URL (image or video).\n"
            "- For images/audio: provide the link to the image/audio exactly like: "
            "https://huggingface.co/spaces/Mustafa-albakkar/MainAgent/resolve/main/{Attached file path}\n"
            "- For videos: provide the link to the YouTube or MP4 file. e.g: video_url\n"
            "The tool will return a detailed description and a summary."
        ),
    ),
    Tool(
        name="CoderAgent",
        func=coder_agent_proxy,
        description=(
            "Use this for logical problems to generate or fix code and execute it, by "
            "providing it a description of the needed code; don't code by yourself."
        ),
    ),
]

# -------------------- Agent --------------------
SYSTEM_INSTRUCTIONS = (
    """You are a logic-reasoning agent solving GAIA benchmark questions.
Your goal is to produce a *gaia formatted final answer* for each question.

**Core Instructions:**
1. Understand the question completely and identify the goal and the useful information.
2. Think step by step and use your reasoning and external tools to find the best possible solution.
3. Never stop before giving a concise "Final Answer"

**Formatting Rules:**
- Follow the ReAct format precisely.
- End your output with `<>` and stop generating immediately.
Example:
Final Answer: answer <>"""
)
# -------------------- ReAct prompt --------------------
react_template = """
You are a ReAct-style reasoning agent. Follow always *exactly* this structure:

Question: {input}

-Thought: Reflect on what is being asked. Based on previous Observations, decide your next useful step.
-Action:
-Action Input:
-Observation:

(Repeat the (Thought / Action / Action Input / Observation) pattern as needed.)

When you are ready to conclude, write your final section:
Final Answer:
<>

Notes:
- Allowed tools: {tool_names}
- Tool descriptions: {tools}
- Always use Observations from previous steps to inform the next Thought.
- Do not repeat identical actions.
- Always stop generation immediately after `<>`.
try to use the appropriate tool for the appropriate goal.

Begin.

{agent_scratchpad}
"""


def create_agent_executor(llm, tools: List[Tool], tracer: Optional[Any] = None) -> AgentExecutor:
    """Build a ReAct AgentExecutor around ``llm`` with optional LangSmith tracing.

    ``return_intermediate_steps=True`` is essential: run_on_question forwards
    the Thought/Action/Observation trace to the final-answer agent.
    """
    prompt = PromptTemplate.from_template(react_template)
    agent = create_react_agent(llm, tools, prompt)
    callbacks = [tracer] if tracer is not None else []
    return AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,  # verbose printing; helpful while debugging
        callbacks=callbacks,
        max_iterations=10,
        handle_parsing_errors=True,
        early_stopping_method="force",
        return_intermediate_steps=True,  # expose intermediate_steps in the result
    )


# -------------------- GAIA Runner --------------------
# Directory where GAIA question attachments are cached locally.
ATTACHMENTS_DIR = "attachments"
os.makedirs(ATTACHMENTS_DIR, exist_ok=True)

# Shared HTTP session with retry/backoff for attachment downloads.
_session = requests.Session()
_retry_strategy = Retry(total=3, backoff_factor=0.5, status_forcelist=[429, 500, 502, 503, 504])
_adapter = HTTPAdapter(max_retries=_retry_strategy)
_session.mount("https://", _adapter)
_session.mount("http://", _adapter)


class GaiaRunner:
    """Runs the ReAct agent over all GAIA questions and submits the answers."""

    def __init__(self, agent_executor: AgentExecutor, username: str = "unknown"):
        self.agent = agent_executor
        self.username = username

    def run_on_question(self, question_text: str, file_path: Optional[str] = None) -> str:
        """Run the main ReAct agent on one question (with optional attachment).

        Collects the reasoning steps (Thought → Action → Observation) into a
        single log and forwards it to the final-answer agent; falls back to
        the local agent output if that remote call fails.
        """
        try:
            # 1) Build the model input.
            prompt = SYSTEM_INSTRUCTIONS + "\n\nQuestion:\n" + question_text
            if file_path:
                prompt += f"\n[Attached file path: {file_path}]"

            print(f"\n🚀 Running ReAct agent on question:\n{question_text}")
            if file_path:
                print(f"📎 With attachment: {file_path}")

            # 2) Execute the main agent.
            result = self.agent.invoke({"input": prompt})

            # 3) Build the full reasoning log.
            if isinstance(result, dict):
                output = result.get("output") or result.get("text") or str(result)
                intermediate = result.get("intermediate_steps", [])
            else:
                output = getattr(result, "output", str(result))
                intermediate = []

            full_log = [f"Question: {question_text}\n"]
            if file_path:
                # BUGFIX: the original referenced an undefined name `file`.
                full_log.append(f"Attachment: {file_path}\n")

            # Keep only the last few reasoning cycles to bound the log size.
            MAX_LOG_STEPS = 4
            for step in intermediate[-MAX_LOG_STEPS:]:
                try:
                    action, observation = step
                    full_log.append(
                        f"Thought/Action: {getattr(action, 'log', getattr(action, 'tool', str(action)))}\n"
                        f"Action Input: {getattr(action, 'tool_input', getattr(action, 'input', ''))}\n"
                        f"Observation: {observation}\n"
                    )
                except Exception:
                    full_log.append(f"[UNPARSEABLE STEP] {step}\n")

            full_log.append(f"Final Answer: {output}\n")
            conversation_log = "\n".join(full_log)

            # 4) Send the full log to the final-answer agent.
            final_out = None
            try:
                final_out = final_client.predict_text(conversation_log)
                print(f"✅ Final Answer Agent Output: {final_out}")
            except Exception as e:
                print(f"[⚠️] Failed to contact Final Answer Agent: {e}")
                final_out = output  # fall back to the local output on failure
            return final_out or output

        except Exception as e:
            print(f"[❌] Error while running on question: {e}")
            return "Error: unable to process this question."

    def run_all_and_submit(self) -> Dict[str, Any]:
        """Fetch the GAIA questions, answer each one, submit all answers.

        Returns a dict with the server's submission result and a per-question
        results log.  Raises RuntimeError if the question list cannot be
        fetched (the original code hit a NameError in that case).
        """
        questions_url = f"{GAIA_API_BASE}/questions"
        submit_url = f"{GAIA_API_BASE}/submit"
        print(f"Fetching questions from: {questions_url}")

        # Retry for up to ~100 s while the scoring server warms up.
        max_retries = 10
        questions_data = None
        for attempt in range(max_retries):
            try:
                response = requests.get(questions_url, timeout=15)
            except requests.exceptions.RequestException as e:
                print(f"⚠️ Attempt {attempt + 1}/{max_retries} failed: {e}")
                time.sleep(10)
                continue
            if response.status_code == 200:
                questions_data = response.json()
                print(questions_data)
                if questions_data:
                    print(f"✅ Successfully fetched {len(questions_data)} questions.")
                    break
            print(f"⚠️ Attempt {attempt + 1}/{max_retries}: got status {response.status_code}; retrying in 10s...")
            time.sleep(10)
        if not questions_data:
            raise RuntimeError("Failed to fetch questions from server after multiple attempts.")

        answers = []
        results_log = []
        for q in questions_data:
            task_id = q.get("task_id")
            qtext = q.get("question")
            attach = q.get("file_name")
            # Download the attachment only when one is actually present.
            file_path = self.download_gaia_attachment(q) if attach and attach.strip() else None
            result = self.run_on_question(qtext, file_path)
            answers.append({"task_id": task_id, "submitted_answer": result})
            results_log.append({"question": qtext, "answer": result})
        print("✅ All questions processed.")

        payload = {
            "username": self.username,
            "agent_code": f"https://huggingface.co/spaces/{SPACE_ID}/tree/main",
            "answers": answers,
        }
        r2 = requests.post(submit_url, json=payload, timeout=120)
        r2.raise_for_status()
        print("\n🎯 All questions processed and submitted successfully!")
        print(json.dumps(results_log, indent=2))
        bundle = {"submission_result": r2.json(), "results_log": results_log}
        print(bundle)
        return bundle

    @staticmethod
    def download_gaia_attachment(task: dict) -> Optional[str]:
        """Download the attachment for a GAIA task into ATTACHMENTS_DIR.

        Returns the local path on success (or when already cached — the
        original returned None for the cached case), the remote URL if the
        download failed, or None when the task has no attachment.
        """
        task_id = task.get("task_id")
        file_name = task.get("file_name")
        if not task_id or not file_name:
            return None  # no attachment for this task

        # NOTE(review): the original hard-coded the scoring-space host here;
        # using GAIA_API_BASE keeps it consistent with questions/submit URLs.
        url = f"{GAIA_API_BASE}/files/{file_name}"
        local_path = os.path.join(ATTACHMENTS_DIR, file_name)

        if os.path.exists(local_path):
            print(f"[GAIA] Attachment already exists: {local_path}")
            return local_path

        try:
            r = _session.get(url, timeout=30)  # retrying session
            r.raise_for_status()
            with open(local_path, "wb") as f:
                f.write(r.content)
            print(f"[GAIA] Attachment downloaded: {local_path}")
            return local_path
        except Exception as e:
            print(f"[GAIA] Failed to download attachment: {e}")
            return url  # let the agent try the remote URL directly


# -------------------- LangGraph Integration --------------------
if LANGGRAPH_AVAILABLE:
    from langgraph.graph import StateGraph, END

    class State:
        """Mutable state object passed between the two graph nodes."""

        def __init__(self, question: str, file: Optional[str] = None):
            self.question = question
            self.file = file
            self.partial_answer = None
            self.final_answer = None

    def reasoning_node(state: State, agent_exec: AgentExecutor) -> State:
        """Run the ReAct agent and store its raw output on the state."""
        prompt = SYSTEM_INSTRUCTIONS + "\n\n" + state.question
        result = agent_exec.invoke({"input": prompt})
        state.partial_answer = result.get("output") if isinstance(result, dict) else str(result)
        return state

    def final_node(state: State) -> State:
        """Forward the partial answer to the final-answer agent."""
        state.final_answer = final_client.predict_text(state.partial_answer)
        return state

    def build_gaia_graph(agent_exec: AgentExecutor):
        """Compile the two-node reasoning_agent → final_agent graph."""
        builder = StateGraph(State)
        builder.add_node("reasoning_agent", lambda s: reasoning_node(s, agent_exec))
        builder.add_node("final_agent", final_node)
        builder.add_edge("reasoning_agent", "final_agent")
        builder.add_edge("final_agent", END)
        return builder.compile()


# -------------------- Gradio UI --------------------
def gradio_interface():
    """Build the Gradio app wiring the GAIA runner to a single Run button."""
    tracer = None
    if LANGSMITH_AVAILABLE and os.getenv("LANGSMITH_API_KEY"):
        try:
            client = LangSmithClient(api_key=os.getenv("LANGSMITH_API_KEY"))
            tracer = LangChainTracer(client=client, project_name=os.getenv("LANGSMITH_PROJECT", "gaia-project"))
        except Exception:
            pass  # tracing is best-effort; run without it on any failure

    agent_exec = create_agent_executor(llm, tools, tracer=tracer)
    runner = GaiaRunner(agent_exec, username=os.getenv("HF_USER", "unknown"))

    def process():
        result = runner.run_all_and_submit()
        return json.dumps(result["results_log"], indent=2)

    with gr.Blocks() as demo:
        gr.Markdown("# 🧠 GAIA Multi-Agent System (LangGraph + Gradio)")
        output_box = gr.Textbox(label="Results Log", lines=25)
        run_button = gr.Button("Run GAIA Evaluation")
        run_button.click(process, outputs=output_box)
    return demo


if __name__ == "__main__":
    demo = gradio_interface()
    print("model is starting")
    llm.invoke("Hello")  # warm-up
    print("✅ Model ready")
    demo.launch(debug=True, show_error=True, share=False)