| """ |
| GAIA Agent v5 - With Vision and Audio Transcription!
| Target: 40%+ (8+/20) |
| """ |
| import os |
| import re |
| import io |
| import time |
| import base64 |
| import traceback |
| import gradio as gr |
| import requests |
| import pandas as pd |
| from bs4 import BeautifulSoup |
| from typing import Optional, Tuple, List, Dict
|
|
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" |
| GROQ_API = "https://api.groq.com/openai/v1/chat/completions" |
| GROQ_AUDIO_API = "https://api.groq.com/openai/v1/audio/transcriptions" |
|
|
| |
| GROQ_MODELS = [ |
| "llama-3.3-70b-versatile", |
| "llama-3.1-70b-versatile", |
| "mixtral-8x7b-32768", |
| ] |
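| # Text models are tried in this order; ask_groq falls back to the next one on 404s and repeated rate limits.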
|
|
| GROQ_VISION_MODEL = "llama-3.2-90b-vision-preview" |
| GROQ_AUDIO_MODEL = "whisper-large-v3" |
|
|
| |
| |
| |
|
|
| def transcribe_audio(audio_bytes: bytes, groq_key: str, filename: str = "audio.mp3") -> str: |
| """Transcribe audio using Groq Whisper API.""" |
| if not groq_key or not audio_bytes: |
| return "" |
| |
| try: |
| print(f" π€ Transcribing audio ({len(audio_bytes)/1024:.1f} KB)...") |
| |
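| # The transcription endpoint takes multipart/form-data: the audio file plus a 'model' form field.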
| files = { |
| 'file': (filename, audio_bytes, 'audio/mpeg'), |
| 'model': (None, GROQ_AUDIO_MODEL), |
| } |
| |
| resp = requests.post( |
| GROQ_AUDIO_API, |
| headers={"Authorization": f"Bearer {groq_key}"}, |
| files=files, |
| timeout=60, |
| ) |
| |
| if resp.status_code == 200: |
| result = resp.json() |
| text = result.get("text", "") |
| print(f" β
Transcribed: {text[:100]}...") |
| return text |
| else: |
| print(f" β οΈ Audio transcription failed: {resp.status_code} - {resp.text[:200]}") |
| return "" |
| except Exception as e: |
| print(f" β οΈ Audio transcription error: {e}") |
| return "" |
|
|
|
|
| def analyze_image(image_bytes: bytes, question: str, groq_key: str) -> str: |
| """Analyze image using Groq Vision API.""" |
| if not groq_key or not image_bytes: |
| return "" |
| |
| try: |
| print(f" πΌοΈ Analyzing image ({len(image_bytes)/1024:.1f} KB)...") |
| |
| |
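| # Base64-encode the raw bytes so the image can be inlined as a data URL.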
| image_b64 = base64.b64encode(image_bytes).decode('utf-8') |
| |
| |
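| # Sniff the MIME type from the file's magic bytes, defaulting to PNG.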
| if image_bytes[:8] == b'\x89PNG\r\n\x1a\n': |
| mime_type = "image/png" |
| elif image_bytes[:2] == b'\xff\xd8': |
| mime_type = "image/jpeg" |
| elif image_bytes[:6] in (b'GIF87a', b'GIF89a'): |
| mime_type = "image/gif" |
| else: |
| mime_type = "image/png" |
| |
| messages = [ |
| { |
| "role": "user", |
| "content": [ |
| { |
| "type": "text", |
| "text": f"Look at this image and answer the question precisely. Give ONLY the answer, no explanation.\n\nQuestion: {question}" |
| }, |
| { |
| "type": "image_url", |
| "image_url": { |
| "url": f"data:{mime_type};base64,{image_b64}" |
| } |
| } |
| ] |
| } |
| ] |
| |
| resp = requests.post( |
| GROQ_API, |
| headers={ |
| "Authorization": f"Bearer {groq_key}", |
| "Content-Type": "application/json" |
| }, |
| json={ |
| "model": GROQ_VISION_MODEL, |
| "messages": messages, |
| "temperature": 0.1, |
| "max_tokens": 300, |
| }, |
| timeout=60, |
| ) |
| |
| if resp.status_code == 200: |
| result = resp.json() |
| answer = result.get("choices", [{}])[0].get("message", {}).get("content", "") |
| print(f" β
Vision response: {answer[:100]}...") |
| return answer |
| else: |
| print(f" β οΈ Vision failed: {resp.status_code} - {resp.text[:200]}") |
| return "" |
| except Exception as e: |
| print(f" β οΈ Vision error: {e}") |
| return "" |
|
|
|
|
| |
| |
| |
|
|
| def fetch_webpage(url: str, timeout: int = 15) -> str: |
| """Fetch and extract text from a webpage.""" |
| try: |
| headers = { |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", |
| "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", |
| "Accept-Language": "en-US,en;q=0.5", |
| } |
| resp = requests.get(url, headers=headers, timeout=timeout, allow_redirects=True) |
| resp.raise_for_status() |
| |
| soup = BeautifulSoup(resp.text, "html.parser") |
| |
| |
| for el in soup(["script", "style", "nav", "footer", "header", "aside", "noscript", "iframe", "form"]): |
| el.extract() |
| |
| |
| main_content = soup.find("main") or soup.find("article") or soup.find("div", {"class": re.compile(r"content|main|article", re.I)}) |
| if main_content: |
| text = main_content.get_text("\n", strip=True) |
| else: |
| text = soup.get_text("\n", strip=True) |
| |
| lines = [l.strip() for l in text.splitlines() if l.strip() and len(l.strip()) > 2] |
| return "\n".join(lines)[:10000] |
| except Exception as e: |
| print(f" β οΈ Webpage fetch error: {e}") |
| return "" |
|
|
|
|
| def fetch_youtube_transcript(url: str) -> str: |
| """Fetch YouTube video transcript with multiple fallback methods.""" |
| try: |
| from youtube_transcript_api import YouTubeTranscriptApi |
| |
| |
| patterns = [ |
| r"(?:v=|/v/|youtu\.be/|embed/|shorts/)([a-zA-Z0-9_-]{11})", |
| r"^([a-zA-Z0-9_-]{11})$" |
| ] |
| vid = None |
| for pattern in patterns: |
| match = re.search(pattern, url) |
| if match: |
| vid = match.group(1) |
| break |
| |
| if not vid: |
| print(f" β οΈ Could not extract video ID from: {url}") |
| return "" |
| |
| print(f" πΊ Video ID: {vid}") |
| |
| |
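| # Uses the youtube-transcript-api >= 1.0 instance API (.fetch() / .list()); older releases exposed class methods instead.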
| ytt_api = YouTubeTranscriptApi() |
| |
| |
| lang_options = [ |
| ("en",), |
| ("en", "en-US", "en-GB"), |
| ("it", "it-IT"), |
| ("en", "it", "fr", "de", "es", "pt"), |
| ] |
| |
| for langs in lang_options: |
| try: |
| transcript = ytt_api.fetch(vid, languages=langs) |
| |
| text = " ".join([snippet.text for snippet in transcript]) |
| if text: |
| print(f" β Got transcript ({len(text)} chars, langs: {langs})") |
| return text[:8000] |
| except Exception:
| continue |
| |
| |
| try: |
| transcript_list = ytt_api.list(vid) |
| |
| |
| for t in transcript_list: |
| if not t.is_generated: |
| try: |
| fetched = t.fetch() |
| text = " ".join([snippet.text for snippet in fetched]) |
| if text: |
| print(f" β Got manual transcript ({len(text)} chars)") |
| return text[:8000] |
| except Exception:
| pass |
| |
| |
| for t in transcript_list: |
| if t.is_generated: |
| try: |
| fetched = t.fetch() |
| text = " ".join([snippet.text for snippet in fetched]) |
| if text: |
| print(f" β Got auto transcript ({len(text)} chars)") |
| return text[:8000] |
| except Exception:
| pass |
| |
| |
| for t in transcript_list: |
| try: |
| translated = t.translate('en') |
| fetched = translated.fetch() |
| text = " ".join([snippet.text for snippet in fetched]) |
| if text: |
| print(f" β Got translated transcript ({len(text)} chars)") |
| return text[:8000] |
| except Exception:
| pass |
| |
| except Exception as e: |
| print(f" β οΈ Transcript list error: {e}") |
| |
| return "" |
| except ImportError: |
| print(" β οΈ youtube_transcript_api not installed") |
| return "" |
| except Exception as e: |
| print(f" β οΈ YouTube error: {e}") |
| return "" |
|
|
|
|
| def fetch_task_file(task_id: str) -> Tuple[str, str, Optional[bytes]]: |
| """Fetch and parse attached file for a task. Returns (content_str, file_type, raw_bytes_for_media).""" |
| try: |
| url = f"{DEFAULT_API_URL}/files/{task_id}" |
| resp = requests.get(url, timeout=30) |
| |
| if resp.status_code == 404: |
| return "", "none", None |
| if resp.status_code != 200: |
| print(f" β οΈ File fetch failed: {resp.status_code}") |
| return "", "none", None |
| |
| ct = resp.headers.get("Content-Type", "").lower() |
| cd = resp.headers.get("Content-Disposition", "") |
| |
| |
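| # Pull the filename out of Content-Disposition so we can key off its extension.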
| filename = "" |
| if "filename=" in cd: |
| filename = cd.split("filename=")[-1].strip('" ') |
| ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else "" |
| |
| print(f" π File: {filename or 'unknown'}, type: {ct[:50]}") |
|
|
| |
| if any(t in ct for t in ["text/", "json", "javascript", "python"]) or ext in ["txt", "csv", "json", "py", "md", "js", "html"]: |
| text = resp.text |
| |
| |
| if ext == "csv" or "csv" in ct: |
| try: |
| df = pd.read_csv(io.StringIO(text)) |
| summary = f"CSV file with {len(df)} rows and columns: {list(df.columns)}\n" |
| summary += f"Data:\n{df.to_string()}" |
| return summary[:8000], "csv", None |
| except Exception as e: |
| print(f" β οΈ CSV parse error: {e}") |
| |
| |
| if ext == "py": |
| return f"Python code:\n```python\n{text[:6000]}\n```", "python", None |
| |
| return text[:8000], "text", None |
|
|
| |
| if "spreadsheet" in ct or "excel" in ct or ext in ["xlsx", "xls"]: |
| try: |
| df = pd.read_excel(io.BytesIO(resp.content), engine="openpyxl") |
| summary = f"Excel file with {len(df)} rows and columns: {list(df.columns)}\n" |
| summary += f"Data:\n{df.to_string()}" |
| return summary[:8000], "excel", None |
| except Exception as e: |
| print(f" β οΈ Excel parse error: {e}") |
| try: |
| df = pd.read_excel(io.BytesIO(resp.content)) |
| summary = f"Excel file with {len(df)} rows and columns: {list(df.columns)}\n" |
| summary += f"Data:\n{df.to_string()}" |
| return summary[:8000], "excel", None |
| except Exception:
| return "Excel file (could not parse)", "excel", None |
|
|
| |
| if "pdf" in ct or ext == "pdf": |
| try: |
| import PyPDF2 |
| reader = PyPDF2.PdfReader(io.BytesIO(resp.content)) |
| text_parts = [] |
| for i, page in enumerate(reader.pages): |
| page_text = page.extract_text() or "" |
| if page_text: |
| text_parts.append(f"--- Page {i+1} ---\n{page_text}") |
| text = "\n".join(text_parts) |
| return text[:8000] if text else "PDF (no extractable text)", "pdf", None |
| except ImportError: |
| print(" β οΈ PyPDF2 not installed") |
| return "PDF file (PyPDF2 not available)", "pdf", None |
| except Exception as e: |
| print(f" β οΈ PDF parse error: {e}") |
| return "PDF file (parse error)", "pdf", None |
|
|
| |
| if "audio" in ct or ext in ["mp3", "wav", "m4a", "ogg", "flac"]: |
| size_kb = len(resp.content) / 1024 |
| print(f" π΅ Audio file detected ({size_kb:.1f} KB) - will transcribe") |
| return f"Audio file ({ext or 'unknown'}, {size_kb:.1f} KB)", "audio", resp.content |
|
|
| |
| if "image" in ct or ext in ["png", "jpg", "jpeg", "gif", "webp", "bmp"]: |
| size_kb = len(resp.content) / 1024 |
| print(f" πΌοΈ Image file detected ({size_kb:.1f} KB) - will analyze") |
| return f"Image file ({ext or 'unknown'}, {size_kb:.1f} KB)", "image", resp.content |
|
|
| |
| try: |
| text = resp.content.decode("utf-8") |
| return text[:8000], "text", None |
| except UnicodeDecodeError:
| try: |
| text = resp.content.decode("latin-1") |
| return text[:8000], "text", None |
| except Exception:
| return f"Binary file ({ct or 'unknown type'}, {len(resp.content)} bytes)", "binary", None |
| |
| except requests.exceptions.Timeout: |
| print(" β οΈ File fetch timeout") |
| return "", "none", None |
| except Exception as e: |
| print(f" β οΈ File fetch error: {e}") |
| return "", "none", None |
|
|
|
|
| def web_search(query: str, max_results: int = 5) -> List[Dict[str, str]]: |
| """Search the web and return results.""" |
| results = [] |
| |
| |
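| # Try the 'ddgs' package first (the renamed successor of 'duckduckgo-search'), then fall back to the old package name.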
| try: |
| from ddgs import DDGS |
| ddgs = DDGS() |
| for r in ddgs.text(query, max_results=max_results): |
| results.append({ |
| "title": r.get("title", ""), |
| "body": r.get("body", ""), |
| "href": r.get("href", "") |
| }) |
| if results: |
| print(f" π ddgs found {len(results)} results") |
| return results |
| except ImportError: |
| pass |
| except Exception as e: |
| print(f" β οΈ ddgs error: {e}") |
| |
| |
| try: |
| from duckduckgo_search import DDGS |
| with DDGS() as ddgs: |
| for r in ddgs.text(query, max_results=max_results): |
| results.append({ |
| "title": r.get("title", ""), |
| "body": r.get("body", ""), |
| "href": r.get("href", "") |
| }) |
| if results: |
| print(f" π DDG found {len(results)} results") |
| return results |
| except ImportError: |
| print(" β οΈ duckduckgo-search not installed") |
| except Exception as e: |
| print(f" β οΈ DDG error: {e}") |
| |
| return results |
|
|
|
|
| def search_wikipedia(query: str) -> str: |
| """Search Wikipedia and return article content.""" |
| try: |
| headers = { |
| "User-Agent": "GAIAAgent/1.0 (https://huggingface.co/spaces; contact@example.com)" |
| } |
| |
| |
| search_url = "https://en.wikipedia.org/w/api.php" |
| params = { |
| "action": "query", |
| "list": "search", |
| "srsearch": query, |
| "format": "json", |
| "srlimit": 3 |
| } |
| resp = requests.get(search_url, params=params, headers=headers, timeout=10) |
| |
| if resp.status_code != 200: |
| print(f" β οΈ Wikipedia search HTTP {resp.status_code}") |
| return "" |
| |
| data = resp.json() |
| |
| results = data.get("query", {}).get("search", []) |
| if not results: |
| return "" |
| |
| |
| title = results[0]["title"] |
| |
| |
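| # Wikipedia's REST summary endpoint expects an underscore-separated, percent-encoded title.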
| encoded_title = requests.utils.quote(title.replace(' ', '_')) |
| content_url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{encoded_title}" |
| resp = requests.get(content_url, headers=headers, timeout=10) |
| |
| if resp.status_code == 200: |
| article = resp.json() |
| extract = article.get("extract", "") |
| if extract: |
| print(f" π Wikipedia: {title}") |
| return f"Wikipedia - {title}:\n{extract}" |
| |
| return "" |
| except requests.exceptions.Timeout: |
| print(f" β οΈ Wikipedia timeout") |
| return "" |
| except Exception as e: |
| print(f" β οΈ Wikipedia error: {e}") |
| return "" |
|
|
|
|
| |
| |
| |
|
|
| def ask_groq(messages: List[Dict], groq_key: str, max_tokens: int = 400, temperature: float = 0.1, model: Optional[str] = None) -> str:
| """Send request to Groq API with retries and model fallback.""" |
| if not groq_key: |
| print(" β GROQ_API_KEY is empty!") |
| return "" |
| |
| |
| models_to_try = [model] if model else GROQ_MODELS |
| |
| for model_name in models_to_try: |
| for attempt in range(2): |
| try: |
| resp = requests.post( |
| GROQ_API, |
| headers={ |
| "Authorization": f"Bearer {groq_key}", |
| "Content-Type": "application/json" |
| }, |
| json={ |
| "model": model_name, |
| "messages": messages, |
| "temperature": temperature, |
| "max_tokens": max_tokens, |
| }, |
| timeout=60, |
| ) |
| |
| if resp.status_code == 200: |
| result = resp.json() |
| content = result.get("choices", [{}])[0].get("message", {}).get("content", "") |
| if content: |
| print(f" π [{model_name}] Response: {content[:80]}...") |
| return content.strip() |
| else: |
| print(f" β οΈ [{model_name}] Empty content") |
| elif resp.status_code == 429: |
| wait_time = 10 * (attempt + 1) |
| print(f" β³ [{model_name}] Rate limited, waiting {wait_time}s...") |
| time.sleep(wait_time) |
| elif resp.status_code == 401: |
| print(f" β Groq API key invalid!") |
| return "" |
| elif resp.status_code == 404: |
| print(f" β οΈ Model {model_name} not found, trying next...") |
| break |
| else: |
| print(f" β οΈ [{model_name}] HTTP {resp.status_code}: {resp.text[:200]}") |
| time.sleep(3) |
| except requests.exceptions.Timeout: |
| print(f" β οΈ [{model_name}] Timeout (attempt {attempt + 1}/2)") |
| time.sleep(5) |
| except Exception as e: |
| print(f" β οΈ [{model_name}] Error: {type(e).__name__}: {e}") |
| time.sleep(3) |
| |
| print(" β All Groq attempts failed") |
| return "" |
|
|
|
|
| |
| |
| |
|
|
| def preprocess_question(question: str) -> str: |
| """Handle reversed or scrambled text.""" |
| stripped = question.strip() |
| |
| |
| reversed_text = stripped[::-1] |
| |
| |
| keywords = ["answer", "what", "who", "how", "find", "list", "which", "where", |
| "when", "the", "is", "are", "was", "were", "has", "have", "this", |
| "that", "from", "with", "about", "question", "video", "image", |
| "write", "opposite", "sentence", "if", "you", "understand"] |
| |
| orig_score = sum(1 for w in keywords if w in stripped.lower()) |
| rev_score = sum(1 for w in keywords if w in reversed_text.lower()) |
| |
| print(f" π Text analysis: orig_keywords={orig_score}, rev_keywords={rev_score}") |
| |
| |
| if rev_score > orig_score + 1 and len(stripped) > 20: |
| print(f" π Detected reversed text!") |
| print(f" π Reversed: {reversed_text[:100]}...") |
| return reversed_text |
| |
| |
| if stripped and stripped[0] in '.!?,;:' and rev_score >= orig_score: |
| print(f" π Text starts with punctuation, trying reversed") |
| print(f" π Reversed: {reversed_text[:100]}...") |
| return reversed_text |
| |
| return stripped |
|
|
|
|
| def clean_answer(raw: str) -> str: |
| """Extract and clean the final answer from LLM response.""" |
| if not raw: |
| return "" |
| |
| answer = raw.strip() |
| |
| |
| for line in answer.split("\n"): |
| line = line.strip() |
| if line and not line.startswith("#"): |
| answer = line |
| break |
| |
| |
| prefixes = [ |
| "the answer is:", "the answer is", "answer:", "answer is:", |
| "final answer:", "final answer is:", "the final answer is:", |
| "the correct answer is:", "the correct answer is", |
| "result:", "the result is:", |
| "based on my analysis,", "based on my analysis", |
| "based on the", "according to", |
| "sure,", "here is", "here's", "i found that" |
| ] |
| |
| |
| changed = True |
| max_iterations = 10 |
| iterations = 0 |
| while changed and iterations < max_iterations: |
| changed = False |
| iterations += 1 |
| answer_lower = answer.lower() |
| for prefix in prefixes: |
| if answer_lower.startswith(prefix): |
| answer = answer[len(prefix):].strip() |
| changed = True |
| break |
| |
| |
| |
| # Strip any trailing punctuation marks.
| while answer and answer[-1] in '.,:;!':
| answer = answer[:-1].strip()
| |
| |
| answer = answer.replace("**", "").strip('"\'`') |
| |
| return answer.strip() |
|
|
|
|
| def is_valid_answer(answer: str) -> bool: |
| """Check if an answer is valid (not a refusal or error).""" |
| if not answer or len(answer.strip()) < 1: |
| return False |
| |
| |
| if len(answer) > 150: |
| print(f" β οΈ Answer too long ({len(answer)} chars), likely not a direct answer") |
| return False |
| |
| |
| refusal_starts = [ |
| "no image", "no information", "no transcript", "no data", |
| "i do not", "i don't", "i cannot", "i can't", "i am not able", |
| "unable to", "cannot determine", "not able to", |
| "without access", "i'm not sure", "i am unable", |
| "there is no", "there's no", "no file", "no video" |
| ] |
| |
| answer_lower = answer.lower().strip() |
| for phrase in refusal_starts: |
| if answer_lower.startswith(phrase): |
| print(f" β οΈ Answer starts with refusal: '{phrase}'") |
| return False |
| |
| invalid_phrases = [ |
| "i don't know", "i dont know", "i do not know", |
| "n/a", "error", |
| "i cannot", "i can't", "i cant", |
| "not available", "no answer", "unable to", |
| "i'm not sure", "im not sure", "i am not sure", |
| "no image", "cannot determine", "insufficient information", |
| "not provided", "cannot access", "i'm unable", "i am unable", |
| "not able to", "i am not able", "however,", "based on typical", |
| "without access", "no transcript", "no information" |
| ] |
| |
| return not any(phrase in answer_lower for phrase in invalid_phrases) |
|
|
|
|
| |
| |
| |
|
|
| SYSTEM_PROMPT = """Answer the question with ONLY the final answer. No explanation. |
| |
| Format: |
| - Numbers: just the number (e.g., 5) |
| - Names: just the name (e.g., John Smith) |
| - Words: just the word (e.g., right) |
| - Lists: comma-separated (e.g., a, b, c) |
| |
| IMPORTANT: |
| - If counting items from a list or table, count carefully and give the exact number |
| - If asked for opposite of a word, give that opposite word |
| - Always give your best answer, never refuse""" |
|
|
|
|
| def is_simple_question(question: str) -> bool: |
| """Check if question is simple enough to answer without web search.""" |
| q_lower = question.lower() |
| |
| simple_patterns = [ |
| "opposite of", "antonym of", "what is the opposite", |
| "write the opposite", "2+2", "2 + 2", |
| ] |
| return any(p in q_lower for p in simple_patterns) and len(question) < 200 |
|
|
|
|
| def solve_question(question: str, task_id: str, groq_key: str) -> str: |
| """Main function to solve a GAIA question.""" |
| print(f"\n[Q]: {question[:150]}{'...' if len(question) > 150 else ''}") |
| |
| |
| processed_q = preprocess_question(question) |
| context_parts = [] |
| |
| |
| if is_simple_question(processed_q): |
| print(" β‘ Simple question detected, answering directly") |
| answer_raw = ask_groq([ |
| {"role": "system", "content": SYSTEM_PROMPT}, |
| {"role": "user", "content": f"Answer this directly: {processed_q}"} |
| ], groq_key, max_tokens=50, temperature=0.0) |
| answer = clean_answer(answer_raw) if answer_raw else "" |
| if answer and is_valid_answer(answer): |
| print(f" β
Direct answer: {answer}") |
| return answer |
| |
| |
| file_content, file_type, file_bytes = fetch_task_file(task_id) |
| if file_content and file_type != "none": |
| |
| if file_type == "image" and file_bytes: |
| print(f" πΌοΈ Analyzing image with Vision API...") |
| vision_answer = analyze_image(file_bytes, processed_q, groq_key) |
| if vision_answer and is_valid_answer(clean_answer(vision_answer)): |
| |
| answer = clean_answer(vision_answer) |
| print(f" β
Vision answer: {answer}") |
| return answer |
| elif vision_answer: |
| |
| context_parts.append(f"[IMAGE ANALYSIS]:\n{vision_answer}") |
| |
| |
| elif file_type == "audio" and file_bytes: |
| print(f" π΅ Transcribing audio with Whisper...") |
| transcript = transcribe_audio(file_bytes, groq_key) |
| if transcript: |
| context_parts.append(f"[AUDIO TRANSCRIPTION]:\n{transcript}") |
| print(f" β
Got audio transcript ({len(transcript)} chars)") |
| else: |
| context_parts.append(f"[NOTE: Audio file attached but transcription failed.]") |
| |
| |
| else: |
| context_parts.append(f"[ATTACHED FILE - {file_type.upper()}]:\n{file_content}") |
| print(f" π Got {file_type} file ({len(file_content)} chars)") |
| |
| |
| yt_urls = re.findall(r'https?://(?:www\.)?(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/shorts/)[^\s\)\]]+', processed_q) |
| for yt_url in yt_urls[:2]: |
| clean_url = yt_url.rstrip('.,;:') |
| print(f" π¬ Fetching transcript: {clean_url}") |
| transcript = fetch_youtube_transcript(clean_url) |
| if transcript: |
| context_parts.append(f"[YOUTUBE VIDEO TRANSCRIPT]:\n{transcript}") |
| else: |
| |
| vid_match = re.search(r'(?:v=|youtu\.be/)([a-zA-Z0-9_-]{11})', clean_url) |
| if vid_match: |
| vid_id = vid_match.group(1) |
| print(f" π No transcript, searching for video info: {vid_id}") |
| video_results = web_search(f"youtube {vid_id} video content summary", max_results=3) |
| if video_results: |
| snippets = "\n".join([f"β’ {r.get('title', '')}: {r.get('body', '')}" for r in video_results]) |
| context_parts.append(f"[YOUTUBE VIDEO INFO (no transcript available)]:\nVideo URL: {clean_url}\nSearch results about this video:\n{snippets}") |
| else: |
| context_parts.append(f"[YOUTUBE VIDEO]: {clean_url} - No transcript or info available.") |
| else: |
| context_parts.append(f"[YOUTUBE VIDEO]: {clean_url} - Could not process.") |
| |
| |
| other_urls = re.findall(r'https?://[^\s\)\]]+', processed_q) |
| other_urls = [u.rstrip('.,;:') for u in other_urls |
| if "youtube.com" not in u and "youtu.be" not in u] |
| |
| for url in other_urls[:2]: |
| print(f" π Fetching page: {url[:60]}...") |
| page_content = fetch_webpage(url) |
| if page_content: |
| context_parts.append(f"[WEBPAGE: {url}]:\n{page_content}") |
| |
| |
| |
| should_search = True |
| if file_type in ["excel", "csv"] and len(file_content) > 500: |
| should_search = False |
| print(" βοΈ Skipping search - using file data") |
| |
| if should_search and not yt_urls: |
| |
| search_query = processed_q[:200]
| |
| |
| query_prompt = ask_groq([ |
| {"role": "system", "content": "Extract the key search terms from this question. Output ONLY the search query (3-8 words), nothing else."}, |
| {"role": "user", "content": processed_q[:400]} |
| ], groq_key, max_tokens=30, temperature=0.0) |
| |
| if query_prompt and 3 < len(query_prompt) < 100:
| search_query = query_prompt |
| |
| print(f" π Searching: '{search_query[:50]}'") |
| |
| |
| results = web_search(search_query, max_results=5) |
| |
| if results: |
| |
| snippets = "\n".join([f"β’ {r.get('title', '')}: {r.get('body', '')}" for r in results]) |
| context_parts.append(f"[SEARCH RESULTS]:\n{snippets}") |
| |
| |
| wiki_fetched = False |
| for r in results: |
| href = r.get("href", "") |
| if "wikipedia.org" in href and not wiki_fetched: |
| page = fetch_webpage(href) |
| if page and len(page) > 500: |
| context_parts.append(f"[WIKIPEDIA PAGE]:\n{page[:6000]}") |
| wiki_fetched = True |
| print(f" π Fetched Wikipedia: {href[:50]}") |
| break |
| |
| |
| if not wiki_fetched: |
| for r in results[:2]: |
| href = r.get("href", "") |
| if href and "youtube" not in href: |
| page = fetch_webpage(href) |
| if page and len(page) > 300: |
| context_parts.append(f"[WEB PAGE]:\n{page[:4000]}") |
| print(f" π Fetched: {href[:50]}") |
| break |
| |
| |
| wiki_content = search_wikipedia(search_query) |
| if wiki_content and not any("[WIKIPEDIA PAGE]" in p for p in context_parts):
| context_parts.append(f"[WIKIPEDIA]:\n{wiki_content}") |
| |
| |
| context = "\n\n".join(context_parts) if context_parts else "" |
| |
| |
| if len(context) > 12000: |
| context = context[:12000] + "\n[...truncated]" |
| |
| |
| is_counting_q = any(w in processed_q.lower() for w in ['how many', 'count', 'number of', 'total']) |
| is_list_q = any(w in processed_q.lower() for w in ['list', 'name all', 'what are']) |
| |
| |
| if context and (is_counting_q or is_list_q): |
| |
| extract_prompt = f"""From this context, extract ONLY the specific information needed to answer the question. |
| |
| Context: {context[:8000]} |
| |
| Question: {processed_q} |
| |
| List the relevant facts (be brief):""" |
| |
| extracted = ask_groq([ |
| {"role": "user", "content": extract_prompt} |
| ], groq_key, max_tokens=500, temperature=0.0) |
| |
| if extracted: |
| print(f" π Extracted: {extracted[:150]}...") |
| |
| answer_raw = ask_groq([ |
| {"role": "system", "content": SYSTEM_PROMPT}, |
| {"role": "user", "content": f"Based on these facts:\n{extracted}\n\nQuestion: {processed_q}\n\nFinal answer (just the answer, nothing else):"} |
| ], groq_key, max_tokens=100, temperature=0.0) |
| else: |
| answer_raw = "" |
| elif context: |
| messages = [ |
| {"role": "system", "content": SYSTEM_PROMPT}, |
| {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {processed_q}\n\nAnswer:"} |
| ] |
| answer_raw = ask_groq(messages, groq_key, max_tokens=100, temperature=0.1) |
| else: |
| messages = [ |
| {"role": "system", "content": SYSTEM_PROMPT}, |
| {"role": "user", "content": f"Question: {processed_q}\n\nAnswer:"} |
| ] |
| answer_raw = ask_groq(messages, groq_key, max_tokens=100, temperature=0.1) |
| |
| answer = clean_answer(answer_raw) if answer_raw else "" |
| |
| print(f" π€ Raw: '{answer_raw[:100] if answer_raw else '[empty]'}' -> Clean: '{answer}'") |
| |
| |
| if not is_valid_answer(answer): |
| print(f" β οΈ First attempt invalid: '{answer}', retrying...") |
| |
| |
| retry_messages = [ |
| {"role": "system", "content": "Give ONLY the answer. One word or number if possible."}, |
| {"role": "user", "content": f"{processed_q}"} |
| ] |
| answer_raw = ask_groq(retry_messages, groq_key, max_tokens=50, temperature=0.2) |
| answer = clean_answer(answer_raw) if answer_raw else "" |
| print(f" π€ Retry: '{answer}'") |
| |
| |
| if not is_valid_answer(answer): |
| print(f" β οΈ Second attempt invalid: '{answer}', trying knowledge-based...") |
| |
| retry_messages = [ |
| {"role": "system", "content": "Give ONLY the answer, nothing else. Best guess if unsure."}, |
| {"role": "user", "content": processed_q} |
| ] |
| answer_raw = ask_groq(retry_messages, groq_key, max_tokens=50, temperature=0.5) |
| answer = clean_answer(answer_raw) if answer_raw else "" |
| print(f" π€ Third try raw: '{answer_raw[:100] if answer_raw else '[empty]'}' -> Clean: '{answer}'") |
| |
| |
| if not answer or len(answer.strip()) == 0 or not is_valid_answer(answer): |
| if answer_raw and len(answer_raw.strip()) > 0: |
| |
| lines = answer_raw.strip().split('\n') |
| for line in lines: |
| line = line.strip() |
| if line and len(line) < 100 and not any(x in line.lower() for x in ['cannot', "don't know", 'unable', 'no image']): |
| answer = clean_answer(line) |
| print(f" π Extracted from response: '{answer}'") |
| break |
| |
| |
| if not answer or len(answer.strip()) == 0: |
| answer = "unknown" |
| print(f" β No answer found, defaulting to 'unknown'") |
| |
| print(f" β
Final Answer: {answer}") |
| return answer |
|
|
|
|
| |
| |
| |
|
|
| def run_and_submit_all(profile: gr.OAuthProfile | None): |
| """Run the agent on all questions and submit answers.""" |
| space_id = os.getenv("SPACE_ID", "") |
| |
| if not profile: |
| return "Effettua il login con Hugging Face per continuare.", None |
| |
| username = profile.username |
| groq_key = os.getenv("GROQ_API_KEY", "") |
| |
| if not groq_key: |
| return "β GROQ_API_KEY non configurata! Aggiungi la chiave nelle impostazioni dello Space.", None |
| |
| print(f"\n{'='*60}") |
| print(f"π€ User: {username}") |
| print(f"π€ Agent: GAIA Agent v5") |
| print(f"π API Key: {groq_key[:8]}...{groq_key[-4:]}") |
| print(f"{'='*60}") |
| |
| |
| print("\nπ Testing Groq API connectivity...") |
| test_response = ask_groq( |
| [{"role": "user", "content": "Say 'OK' and nothing else."}], |
| groq_key, max_tokens=10, temperature=0.0 |
| ) |
| if not test_response: |
| return "β Groq API test failed! Check your API key and try again.", None |
| print(f"β
Groq API test passed: '{test_response}'") |
| |
| |
| try: |
| resp = requests.get(f"{DEFAULT_API_URL}/questions", timeout=20) |
| resp.raise_for_status() |
| questions = resp.json() |
| except Exception as e: |
| return f"β Errore nel recupero delle domande: {e}", None |
| |
| print(f"\nπ {len(questions)} domande da processare\n") |
| |
| results = [] |
| answers = [] |
| agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "" |
| |
| for i, item in enumerate(questions): |
| task_id = item.get("task_id", "") |
| q = item.get("question") |
| |
| if not task_id or q is None: |
| print(f"[{i+1}] Skipping invalid item") |
| continue |
| |
| print(f"\n{'β'*60}") |
| print(f"[{i+1}/{len(questions)}] Task: {task_id[:20]}...") |
| |
| try: |
| answer = solve_question(q, task_id, groq_key) |
| except Exception as e: |
| print(f" π₯ Exception: {e}") |
| traceback.print_exc() |
| answer = "I don't know" |
| |
| answers.append({ |
| "task_id": task_id, |
| "submitted_answer": answer |
| }) |
| results.append({ |
| "Task ID": task_id[:20] + "...", |
| "Question": q[:80] + ("..." if len(q) > 80 else ""), |
| "Answer": answer |
| }) |
| |
| |
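| # Pause briefly between questions to stay under the Groq rate limits.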
| time.sleep(2.5) |
| |
| if not answers: |
| return "β Nessuna risposta generata.", pd.DataFrame(results) |
| |
| |
| print(f"\n{'='*60}") |
| print(f"π€ Submitting {len(answers)} answers...") |
| |
| try: |
| submit_resp = requests.post( |
| f"{DEFAULT_API_URL}/submit", |
| json={ |
| "username": username, |
| "agent_code": agent_code, |
| "answers": answers |
| }, |
| timeout=60, |
| ) |
| submit_resp.raise_for_status() |
| result = submit_resp.json() |
| |
| score = result.get('score', 'N/A') |
| correct = result.get('correct_count', '?') |
| total = result.get('total_attempted', '?') |
| message = result.get('message', '') |
| |
| status = f"""β
Completato! |
| π€ {result.get('username')} |
| π {score}% ({correct}/{total}) |
| π {message}""" |
| |
| print(f"\n{status}") |
| return status, pd.DataFrame(results) |
| |
| except Exception as e: |
| error_msg = f"β Errore nell'invio: {e}" |
| print(error_msg) |
| return error_msg, pd.DataFrame(results) |
|
|
|
|
| def create_demo(): |
| """Build and return the Gradio interface.""" |
| with gr.Blocks(title="GAIA Agent v5") as demo: |
| gr.Markdown("""# π GAIA Agent v5 |
| |
| **Full-featured agent with Vision & Audio!** |
| - π§ Groq Llama 3.3 70B for reasoning |
| - ποΈ Llama 3.2 Vision for image analysis |
| - π€ Whisper for audio transcription |
| - π Smart web search + Wikipedia |
| - πΊ YouTube transcript extraction |
| - π File parsing (CSV, Excel, PDF, Python) |
| """) |
| |
| gr.LoginButton() |
| |
| run_button = gr.Button("π₯ Avvia Valutazione", variant="primary", size="lg") |
| |
| status_output = gr.Textbox( |
| label="Risultato", |
| lines=6, |
| interactive=False |
| ) |
| |
| results_table = gr.DataFrame( |
| label="Risposte", |
| wrap=True |
| ) |
| |
| run_button.click( |
| fn=run_and_submit_all, |
| outputs=[status_output, results_table] |
| ) |
| |
| return demo |
|
|
|
|
| if __name__ == "__main__": |
| demo = create_demo() |
| demo.queue(default_concurrency_limit=1).launch(debug=True, share=False) |
|
|