Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import base64 | |
| import shutil | |
| import subprocess | |
| import mimetypes | |
| from pathlib import Path | |
| from typing import Optional | |
| from urllib.parse import quote_plus | |
| from html import unescape | |
| import requests | |
| import pandas as pd | |
| from openai import OpenAI | |
| from langchain_core.tools import tool | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
# Base URL of the scoring service; AGENT_COURSE_API_URL overrides the default.
DEFAULT_API_URL = os.environ.get(
    "AGENT_COURSE_API_URL", "https://agents-course-unit4-scoring.hf.space"
)

# Directory that holds downloaded attachments and files derived from them.
# It is created at import time so later writes never have to check for it.
CACHE_DIR = Path("/tmp") / "agent_course_files"
CACHE_DIR.mkdir(parents=True, exist_ok=True)
| def _safe_filename(name: str) -> str: | |
| """Make a filename safe for local storage.""" | |
| return re.sub(r"[^a-zA-Z0-9._-]+", "_", name).strip("_") or "file" | |
| def _infer_ext_from_bytes(content: bytes, content_type: str = "") -> str: | |
| ct = (content_type or "").lower() | |
| head = content[:512] | |
| # image | |
| if head.startswith(b"\x89PNG\r\n\x1a\n"): | |
| return ".png" | |
| if head.startswith(b"\xff\xd8\xff"): | |
| return ".jpg" | |
| if head.startswith(b"GIF87a") or head.startswith(b"GIF89a"): | |
| return ".gif" | |
| # audio | |
| if head.startswith(b"ID3"): | |
| return ".mp3" | |
| if len(head) >= 2 and head[0] == 0xFF and (head[1] & 0xE0) == 0xE0: | |
| return ".mp3" | |
| if head.startswith(b"RIFF") and b"WAVE" in head[:32]: | |
| return ".wav" | |
| if b"ftypM4A" in head[:64] or b"M4A" in head[:64]: | |
| return ".m4a" | |
| # spreadsheet / zip-based xlsx | |
| if head.startswith(b"PK\x03\x04"): | |
| return ".xlsx" | |
| # content-type fallback | |
| if "mpeg" in ct or "mp3" in ct or "audio/mpeg" in ct: | |
| return ".mp3" | |
| if "wav" in ct: | |
| return ".wav" | |
| if "m4a" in ct or "mp4" in ct: | |
| return ".m4a" | |
| if "png" in ct: | |
| return ".png" | |
| if "jpeg" in ct or "jpg" in ct: | |
| return ".jpg" | |
| if "excel" in ct or "spreadsheet" in ct: | |
| return ".xlsx" | |
| if "csv" in ct: | |
| return ".csv" | |
| if "python" in ct: | |
| return ".py" | |
| return "" | |
def _filename_from_headers(task_id: str, response: requests.Response) -> str:
    """Infer filename from Content-Disposition, Content-Type, or file bytes.

    A filename announced in Content-Disposition is sanitized and returned
    as-is. Otherwise the name is the sanitized *task_id* plus an extension
    chosen from the Content-Type, falling back to sniffing the body bytes.
    """
    content_disposition = response.headers.get("content-disposition", "")
    # Header parameter names are case-insensitive, so match them that way.
    match = re.search(
        r'filename\*?=(?:UTF-8\'\')?"?([^";]+)"?',
        content_disposition,
        flags=re.IGNORECASE,
    )
    if match:
        return _safe_filename(match.group(1))

    content_type = response.headers.get("content-type", "").lower()
    content = response.content

    ext = ""
    if "spreadsheet" in content_type or "excel" in content_type:
        ext = ".xlsx"
    elif "csv" in content_type:
        ext = ".csv"
    elif "python" in content_type:
        ext = ".py"
    elif "wav" in content_type:
        # Must be tested before the generic audio branch, which would
        # otherwise label audio/wav downloads as .mp3.
        ext = ".wav"
    elif "audio" in content_type or "mpeg" in content_type or "mp3" in content_type:
        # A generic audio type says little about the container, so let the
        # byte sniffer decide and keep .mp3 as the default.
        ext = _infer_ext_from_bytes(content, content_type) or ".mp3"
    elif "image/png" in content_type:
        ext = ".png"
    elif "image/jpeg" in content_type:
        ext = ".jpg"

    if not ext:
        ext = _infer_ext_from_bytes(content, content_type)
    return _safe_filename(task_id) + ext
def _download_file_by_task_id(task_id: str) -> Optional[Path]:
    """Fetch the attachment for *task_id* from the scoring API into the cache.

    Returns the saved path, or ``None`` when *task_id* is empty, the server
    answers 404, or any error occurs (errors are printed, not raised).
    """
    if not task_id:
        return None

    url = f"{DEFAULT_API_URL}/files/{task_id}"
    try:
        resp = requests.get(url, timeout=45)
        headers = resp.headers
        print(
            f"[download] task_id={task_id} status={resp.status_code} "
            f"content_type={headers.get('content-type')} "
            f"content_disposition={headers.get('content-disposition')} "
            f"bytes={len(resp.content)}",
            flush=True,
        )
        # A 404 is reported as "no file" without going through the error log.
        if resp.status_code == 404:
            return None
        resp.raise_for_status()

        target = CACHE_DIR / _filename_from_headers(task_id, resp)
        target.write_bytes(resp.content)
        print(f"[download] saved to {target} suffix={target.suffix}", flush=True)
    except Exception as exc:
        print(f"[download ERROR] task_id={task_id} url={url} error={repr(exc)}", flush=True)
        return None
    return target
| def _resolve_file(task_id: str = "", file_path: str = "") -> Optional[Path]: | |
| """Resolve either a local file path or a task_id into a real local Path.""" | |
| if file_path: | |
| path = Path(file_path) | |
| if path.exists(): | |
| return path | |
| if task_id: | |
| return _download_file_by_task_id(task_id) | |
| return None | |
def download_task_file(task_id: str) -> str:
    """
    Download the attached file for a Hugging Face Agent Course task_id.
    Use this when a question mentions an attached file, image, audio, spreadsheet, or code file.
    Returns the local file path and basic file information.
    """
    saved = _download_file_by_task_id(task_id)
    if saved is None:
        return f"No attached file found for task_id={task_id}."

    # The description is returned as a JSON object string.
    return json.dumps(
        {
            "task_id": task_id,
            "file_path": str(saved),
            "filename": saved.name,
            "suffix": saved.suffix,
            "size_bytes": saved.stat().st_size,
        },
        ensure_ascii=False,
    )
def read_attached_text_file(task_id: str = "", file_path: str = "", max_chars: int = 12000) -> str:
    """
    Read the text content of an attached file.
    Use this for .txt, .py, .csv, .md, .json, or other plain-text attachments.
    Provide either task_id or file_path.
    """
    resolved = _resolve_file(task_id=task_id, file_path=file_path)
    if resolved is None:
        return "No file could be resolved from the given task_id or file_path."

    try:
        # Undecodable bytes are replaced rather than raising; the result is
        # cut to the first max_chars characters.
        return resolved.read_text(encoding="utf-8", errors="replace")[:max_chars]
    except Exception as exc:
        return f"Failed to read file {resolved}: {exc}"
| IMAGE_SUFFIXES = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".gif"} | |
| def _image_to_data_url(path: Path) -> str: | |
| """ | |
| Convert a local image file to a base64 data URL for Qwen-VL / OpenAI-compatible API. | |
| """ | |
| mime_type, _ = mimetypes.guess_type(str(path)) | |
| if not mime_type or not mime_type.startswith("image/"): | |
| suffix = path.suffix.lower() | |
| if suffix in [".jpg", ".jpeg"]: | |
| mime_type = "image/jpeg" | |
| elif suffix == ".png": | |
| mime_type = "image/png" | |
| elif suffix == ".webp": | |
| mime_type = "image/webp" | |
| elif suffix == ".bmp": | |
| mime_type = "image/bmp" | |
| elif suffix == ".gif": | |
| mime_type = "image/gif" | |
| else: | |
| mime_type = "image/jpeg" | |
| encoded = base64.b64encode(path.read_bytes()).decode("utf-8") | |
| return f"data:{mime_type};base64,{encoded}" | |
def answer_image_question(task_id: str = "", file_path: str = "", question: str = "") -> str:
    """
    Analyze an attached image and answer the user's question.
    Use this tool when the question mentions an attached image, picture, screenshot,
    chess position, visual content, chart image, diagram, object counting, OCR from image,
    or asks what is shown in an image.
    Provide task_id when available. Also include the original question.
    """
    path = _resolve_file(task_id=task_id, file_path=file_path)
    if path is None:
        return "No image file could be resolved from the given task_id or file_path."

    suffix = path.suffix.lower()
    if suffix not in IMAGE_SUFFIXES:
        return (
            f"Resolved file is not a supported image. "
            f"file_path={path}, suffix={suffix}. "
            f"Supported suffixes: {sorted(IMAGE_SUFFIXES)}"
        )

    api_key = os.getenv("DASHSCOPE_API_KEY")
    if not api_key:
        return "DASHSCOPE_API_KEY is not set."

    try:
        image_url = _image_to_data_url(path)
        client = OpenAI(
            api_key=api_key,
            # Honor the same DASHSCOPE_BASE_URL override that
            # _dashscope_client uses, so every tool hits one endpoint.
            base_url=os.getenv(
                "DASHSCOPE_BASE_URL",
                "https://dashscope.aliyuncs.com/compatible-mode/v1",
            ),
            timeout=25.0,
            max_retries=1,
        )
        prompt = f"""
You are a precise visual question-answering tool for an evaluation benchmark.
Task:
Answer the user's question using the image.
Rules:
- Use the image content as the primary evidence.
- If the question asks for a number, return only the number unless explanation is required.
- If the question asks for a word, name, color, object, move, or label, return only that final answer.
- For chess/checker/board-game images, carefully identify the board and pieces before answering.
- For OCR-like questions, read visible text carefully.
- Do not add markdown.
- Do not mention that you are an AI model.
Question:
{question}
""".strip()
        response = client.chat.completions.create(
            model=os.getenv("DASHSCOPE_VL_MODEL", "qwen3.6-plus"),
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {"url": image_url},
                        },
                    ],
                }
            ],
            temperature=0,
            max_tokens=256,
        )
        answer = response.choices[0].message.content
        return answer.strip() if answer else ""
    except Exception as e:
        # Tool contract: failures are reported as text, never raised.
        return f"Failed to analyze image {path}: {e}"
def answer_python_question(task_id: str = "", file_path: str = "") -> str:
    """
    Execute an attached Python file and return its final output.
    Use this when the question asks for the final numeric output from attached Python code.
    Provide either task_id or file_path.
    """
    import sys  # local import: only this tool needs the interpreter path

    path = _resolve_file(task_id=task_id, file_path=file_path)
    if path is None:
        return "No Python file could be resolved from the given task_id or file_path."

    # Run the script with the interpreter executing this module: a bare
    # "python" may be absent from PATH or be a different installation.
    interpreter = sys.executable or "python"

    # NOTE(security): the attachment runs unsandboxed with this process's
    # privileges; the 10 s timeout is the only guard. Isolate the process if
    # attachments may be untrusted.
    try:
        result = subprocess.run(
            [interpreter, str(path)],
            cwd=str(path.parent),
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        return "Python execution timed out."
    except Exception as e:
        return f"Failed to execute Python file: {e}"

    stdout = result.stdout.strip()
    stderr = result.stderr.strip()
    if stdout:
        # The answer is taken to be the last non-blank line printed.
        lines = [line.strip() for line in stdout.splitlines() if line.strip()]
        return lines[-1] if lines else stdout
    if stderr:
        return f"Python execution produced no stdout. stderr:\n{stderr[-2000:]}"
    return "Python execution finished with no output."
def answer_excel_question(task_id: str = "", file_path: str = "", question: str = "") -> str:
    """
    Read an attached Excel or CSV file and return compact spreadsheet data for answering the question.
    Use this for questions about attached .xlsx, .xls, or .csv files.
    Provide task_id when available. Also include the original question.
    """
    path = _resolve_file(task_id=task_id, file_path=file_path)
    if path is None:
        return "No spreadsheet file could be resolved from the given task_id or file_path."

    # Drink keywords must stand alone as words (optionally plural). A plain
    # substring search would treat "steak" (contains "tea") or "watermelon"
    # (contains "water") as drinks. Letter lookarounds are used instead of \b
    # so that names such as "soft_drinks" still match.
    drink_pattern = (
        r"(?<![a-z])"
        r"(?:drink|beverage|soda|coffee|tea|juice|water|milkshake|shake|smoothie)s?"
        r"(?![a-z])"
    )
    # Loop-invariant: decide once whether the question is the common
    # "food excluding drinks" sales question. Tolerates question=None.
    q = (question or "").lower()
    wants_food_total = "food" in q and "drink" in q

    try:
        suffix = path.suffix.lower()
        if suffix == ".csv":
            sheets = {"sheet1": pd.read_csv(path)}
        else:
            sheets = pd.read_excel(path, sheet_name=None)

        outputs = []
        for sheet_name, df in sheets.items():
            df = df.copy()
            outputs.append(f"Sheet: {sheet_name}")
            outputs.append(f"Shape: {df.shape[0]} rows x {df.shape[1]} columns")
            outputs.append(f"Columns: {list(df.columns)}")
            preview_csv = df.head(80).to_csv(index=False)
            outputs.append("Preview CSV:")
            outputs.append(preview_csv)

            numeric_summary = df.select_dtypes(include="number").sum(numeric_only=True)
            if not numeric_summary.empty:
                outputs.append("Numeric column sums:")
                outputs.append(numeric_summary.to_string())

            if not wants_food_total:
                continue

            # Heuristic helper for the "food excluding drinks" question: use
            # the first column that looks like a category and the first that
            # looks like a money value.
            possible_category_cols = [
                col for col in df.columns
                if any(key in str(col).lower() for key in ["category", "type", "item", "name", "menu"])
            ]
            possible_value_cols = [
                col for col in df.columns
                if any(key in str(col).lower() for key in ["sales", "revenue", "amount", "total", "usd", "price"])
            ]
            if not (possible_category_cols and possible_value_cols):
                continue

            category_col = possible_category_cols[0]
            value_col = possible_value_cols[0]
            # Strip currency formatting before converting; unparsable cells
            # count as 0.
            values = (
                df[value_col]
                .astype(str)
                .str.replace(r"[$,]", "", regex=True)
            )
            values = pd.to_numeric(values, errors="coerce").fillna(0)
            food_mask = ~df[category_col].astype(str).str.lower().str.contains(
                drink_pattern,
                regex=True,
                na=False,
            )
            total_food_sales = values[food_mask].sum()
            outputs.append(
                f"Heuristic food-not-drinks total using category column "
                f"'{category_col}' and value column '{value_col}': "
                f"${total_food_sales:.2f}"
            )
        return "\n\n".join(outputs)[:20000]
    except Exception as e:
        return f"Failed to read spreadsheet {path}: {e}"
| def _extract_youtube_id(text: str) -> Optional[str]: | |
| """Extract YouTube video ID from a URL or text containing a URL.""" | |
| urls = re.findall(r"https?://[^\s)]+", text) | |
| for url in urls: | |
| if "youtube.com/watch" in url: | |
| match = re.search(r"[?&]v=([^&\s]+)", url) | |
| if match: | |
| return match.group(1) | |
| if "youtu.be/" in url: | |
| match = re.search(r"youtu\.be/([^?&\s]+)", url) | |
| if match: | |
| return match.group(1) | |
| return None | |
| def _transcript_to_text(transcript) -> str: | |
| """Convert transcript result to plain text, compatible with old/new youtube-transcript-api.""" | |
| pieces = [] | |
| for item in transcript: | |
| if isinstance(item, dict): | |
| text = item.get("text", "") | |
| else: | |
| text = getattr(item, "text", "") | |
| if text: | |
| pieces.append(text.replace("\n", " ").strip()) | |
| return " ".join(pieces) | |
def get_youtube_transcript(url_or_question: str) -> str:
    """
    Get the transcript text of a YouTube video from a URL or a question containing a YouTube URL.
    Use this when the question asks what someone says in a YouTube video.
    This may not help for purely visual questions about video frames.
    """
    video_id = _extract_youtube_id(url_or_question)
    if not video_id:
        return "No YouTube video ID found."

    preferred_languages = ["en", "en-US", "en-GB"]
    try:
        try:
            # Class-level call offered by older youtube-transcript-api versions.
            transcript = YouTubeTranscriptApi.get_transcript(
                video_id,
                languages=preferred_languages,
            )
        except AttributeError:
            # The class-level call is missing: use the instance API instead.
            transcript = YouTubeTranscriptApi().fetch(
                video_id,
                languages=preferred_languages,
            )

        text = _transcript_to_text(transcript)
        return text[:20000] if text else "Transcript was found but is empty."
    except Exception as exc:
        return f"Failed to fetch YouTube transcript for video_id={video_id}: {exc}"
| def _youtube_url_from_id(video_id: str) -> str: | |
| return f"https://www.youtube.com/watch?v={video_id}" | |
def _download_youtube_video(url_or_question: str) -> Optional[Path]:
    """Download a low-resolution copy of the referenced YouTube video.

    Returns the path of the downloaded file inside the cache directory, or
    ``None`` when no video ID is found, ``yt_dlp`` cannot be imported, or the
    download fails (the failure is printed).
    """
    video_id = _extract_youtube_id(url_or_question)
    if not video_id:
        return None

    # yt_dlp is optional; without it the visual tool simply reports failure.
    try:
        import yt_dlp
    except Exception:
        return None

    ydl_options = {
        "format": "worst[ext=mp4][height<=480]/worst[height<=480]/worst",
        "outtmpl": str(CACHE_DIR / f"youtube_{video_id}.%(ext)s"),
        "noplaylist": True,
        "quiet": True,
        "no_warnings": True,
    }
    video_suffixes = {".mp4", ".webm", ".mkv", ".mov"}

    try:
        with yt_dlp.YoutubeDL(ydl_options) as ydl:
            info = ydl.extract_info(_youtube_url_from_id(video_id), download=True)

        # First choice: a non-empty video file matching the output template.
        for candidate in sorted(CACHE_DIR.glob(f"youtube_{video_id}.*")):
            is_video = candidate.suffix.lower() in video_suffixes
            if is_video and candidate.stat().st_size > 0:
                return candidate

        # Otherwise trust the file paths yt_dlp reports for this download.
        for item in (info or {}).get("requested_downloads") or []:
            reported = item.get("filepath")
            if reported and Path(reported).exists():
                return Path(reported)
    except Exception as exc:
        print(f"[youtube download ERROR] video_id={video_id} error={repr(exc)}", flush=True)
    return None
def _extract_video_frames(video_path: Path, max_frames: int = 12) -> list[Path]:
    """Sample up to *max_frames* JPEG frames from *video_path* with ffmpeg.

    Frames are written to ``<cache>/<video stem>_frames/frame_NNN.jpg`` using
    the filter ``fps=1/5,scale=640:-1``. Returns the sorted frame paths, or an
    empty list when ffmpeg is missing or fails.
    """
    ffmpeg = shutil.which("ffmpeg")
    if not ffmpeg:
        return []

    frame_dir = CACHE_DIR / f"{video_path.stem}_frames"
    frame_dir.mkdir(parents=True, exist_ok=True)

    # Remove frames left by an earlier run so they cannot be returned as
    # results of this one; failures to delete are ignored (best effort).
    for stale in frame_dir.glob("frame_*.jpg"):
        try:
            stale.unlink()
        except Exception:
            pass

    command = [
        ffmpeg,
        "-y",
        "-i",
        str(video_path),
        "-vf",
        "fps=1/5,scale=640:-1",
        "-frames:v",
        str(max_frames),
        str(frame_dir / "frame_%03d.jpg"),
    ]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=45,
        )
    except Exception as exc:
        print(f"[ffmpeg ERROR] video={video_path} error={repr(exc)}", flush=True)
        return []

    if completed.returncode != 0:
        print(f"[ffmpeg ERROR] {completed.stderr[-1000:]}", flush=True)
        return []
    return sorted(frame_dir.glob("frame_*.jpg"))[:max_frames]
def answer_youtube_video_question(url_or_question: str, question: str = "") -> str:
    """
    Answer visual questions about a YouTube video by downloading the video,
    sampling frames, and analyzing those frames with a vision model.
    Use this for questions about what appears on camera, visible objects,
    counts in video frames, or visual scenes. Use get_youtube_transcript
    instead for spoken words.
    """
    # With no separate question, the first argument doubles as the question.
    if not question:
        question = url_or_question

    video_path = _download_youtube_video(url_or_question)
    if video_path is None:
        return "Failed to download YouTube video for visual analysis."

    frames = _extract_video_frames(video_path)
    if not frames:
        return "Failed to extract frames from YouTube video for visual analysis."

    api_key = os.getenv("DASHSCOPE_API_KEY")
    if not api_key:
        return "DASHSCOPE_API_KEY is not set."

    try:
        content = [
            {
                "type": "text",
                "text": (
                    "You are a precise video-frame visual QA tool for an exact-match benchmark. "
                    "The images are sampled frames from the same YouTube video in chronological order. "
                    "Answer the user's question using the visible evidence. Return only the final answer.\n\n"
                    f"Question:\n{question}"
                ),
            }
        ]
        # One image part per sampled frame, in chronological order.
        content.extend(
            {"type": "image_url", "image_url": {"url": _image_to_data_url(frame)}}
            for frame in frames
        )
        client = OpenAI(
            api_key=api_key,
            # Honor the same DASHSCOPE_BASE_URL override that
            # _dashscope_client uses, so every tool hits one endpoint.
            base_url=os.getenv(
                "DASHSCOPE_BASE_URL",
                "https://dashscope.aliyuncs.com/compatible-mode/v1",
            ),
            timeout=45.0,
            max_retries=1,
        )
        response = client.chat.completions.create(
            model=os.getenv("DASHSCOPE_VL_MODEL", "qwen3.6-plus"),
            messages=[{"role": "user", "content": content}],
            temperature=0,
            max_tokens=256,
        )
        answer = response.choices[0].message.content
        return answer.strip() if answer else ""
    except Exception as e:
        # Tool contract: failures are reported as text, never raised.
        return f"Failed to analyze YouTube video frames: {e}"
def fetch_webpage_text(url: str, max_chars: int = 12000) -> str:
    """
    Fetch a public webpage and return readable text.
    Use this when search results identify a specific source page or article.
    """
    try:
        response = requests.get(
            url,
            timeout=30,
            headers={"User-Agent": "Mozilla/5.0 compatible GAIA coursework agent"},
        )
        response.raise_for_status()
        text = response.text
        # Drop script/style bodies first so their contents never reach the
        # output, then strip every remaining tag.
        text = re.sub(r"(?is)<script.*?</script>|<style.*?</style>", " ", text)
        text = re.sub(r"(?s)<[^>]+>", " ", text)
        # Decode entities (&amp;, &nbsp;, &#39; ...) after tag removal, so an
        # escaped "&lt;b&gt;" is kept as literal text, and before whitespace
        # collapsing, so decoded non-breaking spaces are normalised too.
        text = unescape(text)
        text = re.sub(r"\s+", " ", text).strip()
        return text[:max_chars]
    except Exception as e:
        # Tool contract: failures are reported as text, never raised.
        return f"Failed to fetch webpage {url}: {e}"
def _unwrap_duckduckgo_link(link: str) -> str:
    """Return the real target of a DuckDuckGo result link.

    Result links can be protocol-relative redirect wrappers of the form
    ``//duckduckgo.com/l/?uddg=<percent-encoded target>``. Such links are
    replaced by their decoded target; any other link is returned unchanged,
    apart from gaining an ``https:`` scheme when it was protocol-relative.
    """
    from urllib.parse import parse_qs, urlparse  # local: not imported at file level

    if link.startswith("//"):
        link = "https:" + link
    try:
        parsed = urlparse(link)
    except ValueError:
        # Malformed URL: hand it back rather than fail the whole search.
        return link
    if parsed.netloc.lower().endswith("duckduckgo.com") and parsed.path.startswith("/l/"):
        targets = parse_qs(parsed.query).get("uddg")
        if targets:
            return targets[0]
    return link


def web_search_text(query: str, max_results: int = 5) -> str:
    """
    Search the public web without a paid API key and return compact results.
    Use this as a fallback when Tavily is unavailable or when broad web lookup is needed.
    """
    try:
        url = f"https://duckduckgo.com/html/?q={quote_plus(query)}"
        response = requests.get(
            url,
            timeout=30,
            headers={"User-Agent": "Mozilla/5.0 compatible GAIA coursework agent"},
        )
        response.raise_for_status()
        html = response.text

        blocks = re.findall(r'(?is)<div class="result[^"]*".*?</div>\s*</div>', html)
        results = []
        for block in blocks:
            title_match = re.search(r'(?is)<a[^>]+class="result__a"[^>]+href="([^"]+)"[^>]*>(.*?)</a>', block)
            snippet_match = re.search(r'(?is)<a[^>]+class="result__snippet"[^>]*>(.*?)</a>', block)
            if not title_match:
                continue
            # Replace redirect wrappers with the real target so the URL can
            # be fetched directly afterwards.
            link = _unwrap_duckduckgo_link(unescape(title_match.group(1)))
            title = re.sub(r"(?s)<[^>]+>", " ", title_match.group(2))
            snippet = re.sub(r"(?s)<[^>]+>", " ", snippet_match.group(1)) if snippet_match else ""
            title = re.sub(r"\s+", " ", unescape(title)).strip()
            snippet = re.sub(r"\s+", " ", unescape(snippet)).strip()
            results.append(f"Title: {title}\nURL: {link}\nSnippet: {snippet}")
            if len(results) >= max_results:
                break
        if results:
            return "\n\n---\n\n".join(results)

        # No result blocks recognised: fall back to the page's plain text.
        text = re.sub(r"(?s)<[^>]+>", " ", html)
        text = re.sub(r"\s+", " ", unescape(text)).strip()
        return text[:8000]
    except Exception as e:
        # Tool contract: failures are reported as text, never raised.
        return f"Failed to search web for {query}: {e}"
def wikipedia_api_search(query: str, max_chars: int = 12000) -> str:
    """
    Search Wikipedia through the public API and return extracts from top pages.
    Use this for factual questions likely answerable from Wikipedia.
    """
    api_url = "https://en.wikipedia.org/w/api.php"
    # Same identifying User-Agent as the other HTTP helpers in this module,
    # instead of the HTTP library's default.
    headers = {"User-Agent": "Mozilla/5.0 compatible GAIA coursework agent"}
    try:
        # Query parameters go through `params` so encoding is done by the
        # HTTP library rather than by hand-built query strings.
        search_response = requests.get(
            api_url,
            params={
                "action": "query",
                "list": "search",
                "format": "json",
                "srlimit": 3,
                "srsearch": query,
            },
            headers=headers,
            timeout=30,
        )
        search_response.raise_for_status()
        hits = search_response.json().get("query", {}).get("search", [])

        outputs = []
        for hit in hits:
            title = hit.get("title", "")
            if not title:
                continue
            extract_response = requests.get(
                api_url,
                params={
                    "action": "query",
                    "prop": "extracts",
                    "explaintext": 1,
                    "format": "json",
                    "titles": title,
                },
                headers=headers,
                timeout=30,
            )
            extract_response.raise_for_status()
            pages = extract_response.json().get("query", {}).get("pages", {})
            outputs.extend(
                f"Title: {page.get('title', title)}\n{page.get('extract', '')}"
                for page in pages.values()
            )
        # The limit applies to the joined text, so a long first page can
        # crowd out later ones.
        return "\n\n---\n\n".join(outputs)[:max_chars]
    except Exception as e:
        # Tool contract: failures are reported as text, never raised.
        return f"Failed to search Wikipedia for {query}: {e}"
# Maps each supported audio suffix to the format name passed as
# input_audio.format; the name is the suffix without its leading dot.
AUDIO_FORMAT_BY_SUFFIX = {
    f".{fmt}": fmt
    for fmt in ("mp3", "wav", "m4a", "aac", "flac", "ogg", "opus", "webm")
}

# Suffixes the audio tool accepts without sniffing the file contents.
AUDIO_SUFFIXES = set(AUDIO_FORMAT_BY_SUFFIX)

# DashScope Qwen-Omni Base64 input limit is documented as encoded Base64 < 10MB.
# The default stays below that to keep a margin against request rejection;
# the MAX_AUDIO_BASE64_CHARS environment variable overrides it.
MAX_AUDIO_BASE64_CHARS = int(os.environ.get("MAX_AUDIO_BASE64_CHARS", "9500000"))
def _audio_format_from_path(path: Path) -> str:
    """Return the format name to send as ``input_audio.format`` for *path*.

    Known suffixes are looked up in ``AUDIO_FORMAT_BY_SUFFIX``; an unknown
    suffix is used without its dot, and a missing suffix yields ``"mp3"``.
    """
    suffix = path.suffix.lower()
    if suffix in AUDIO_FORMAT_BY_SUFFIX:
        return AUDIO_FORMAT_BY_SUFFIX[suffix]
    return suffix.lstrip(".") or "mp3"
| def _encode_audio_base64(path: Path) -> str: | |
| """Read a local audio file and encode it as Base64 text.""" | |
| return base64.b64encode(path.read_bytes()).decode("utf-8") | |
def _audio_to_data_url(path: Path) -> tuple[str, str]:
    """Return ``(data_url, audio_format)`` for the audio file at *path*.

    The URL has the form ``data:;base64,<BASE64_AUDIO>`` with an empty media
    type, following the DashScope examples; the format travels separately so
    the caller can pass it as ``input_audio.format``.
    """
    audio_format = _audio_format_from_path(path)
    data_url = "data:;base64," + _encode_audio_base64(path)
    return data_url, audio_format
def _compress_audio_if_needed(path: Path) -> Path:
    """Return a path whose Base64 form fits ``MAX_AUDIO_BASE64_CHARS``, if possible.

    When the file at *path* is already small enough it is returned unchanged.
    Otherwise ffmpeg re-encodes it to ``<stem>.compressed.mp3`` next to the
    original, and that file is returned if it fits the limit. This is
    best-effort: if ffmpeg is unavailable, fails, or the result is still too
    large, the original *path* is returned.
    """

    def _base64_len(file_path: Path) -> int:
        # Base64 emits 4 characters per (padded) 3-byte group, so the encoded
        # length follows from the file size without reading or encoding the
        # file.
        return 4 * ((file_path.stat().st_size + 2) // 3)

    try:
        if _base64_len(path) <= MAX_AUDIO_BASE64_CHARS:
            return path
    except (OSError, ValueError):
        # Cannot stat the file: let the caller deal with the original path.
        return path

    ffmpeg = shutil.which("ffmpeg")
    if not ffmpeg:
        return path

    compressed_path = path.with_suffix(".compressed.mp3")
    try:
        # Mono, 16kHz, low bitrate is usually enough for speech QA.
        result = subprocess.run(
            [
                ffmpeg,
                "-y",
                "-i", str(path),
                "-vn",
                "-ac", "1",
                "-ar", "16000",
                "-b:a", "32k",
                str(compressed_path),
            ],
            capture_output=True,
            text=True,
            timeout=30,
        )
        produced = (
            result.returncode == 0
            and compressed_path.exists()
            and compressed_path.stat().st_size > 0
        )
        if produced and _base64_len(compressed_path) <= MAX_AUDIO_BASE64_CHARS:
            return compressed_path
        return path
    except Exception:
        # Deliberate best-effort: any failure falls back to the original file.
        return path
| def _collect_stream_text(completion) -> str: | |
| """ | |
| Collect text from OpenAI-compatible streaming chunks. | |
| Handles common delta.content layouts robustly. | |
| """ | |
| pieces = [] | |
| for chunk in completion: | |
| try: | |
| if not getattr(chunk, "choices", None): | |
| continue | |
| delta = chunk.choices[0].delta | |
| content = getattr(delta, "content", None) | |
| if isinstance(content, str) and content: | |
| pieces.append(content) | |
| elif isinstance(content, list): | |
| for item in content: | |
| if isinstance(item, dict): | |
| text = item.get("text") or item.get("content") | |
| if text: | |
| pieces.append(str(text)) | |
| else: | |
| text = getattr(item, "text", None) | |
| if text: | |
| pieces.append(str(text)) | |
| except Exception: | |
| continue | |
| return "".join(pieces).strip() | |
def _dashscope_client() -> OpenAI:
    """Build an OpenAI-compatible client for DashScope.

    Reads the key from ``DASHSCOPE_API_KEY`` and the endpoint from
    ``DASHSCOPE_BASE_URL`` (falling back to the built-in default endpoint).

    Raises:
        RuntimeError: if ``DASHSCOPE_API_KEY`` is unset or empty.
    """
    api_key = os.getenv("DASHSCOPE_API_KEY")
    if not api_key:
        raise RuntimeError("DASHSCOPE_API_KEY is not set.")

    client_options = {
        "api_key": api_key,
        "base_url": os.getenv(
            "DASHSCOPE_BASE_URL",
            "https://dashscope.aliyuncs.com/compatible-mode/v1",
        ),
    }
    return OpenAI(**client_options)
def _call_qwen_omni_audio(
    audio_path: Path,
    prompt: str,
    max_tokens: int = 1024,
) -> str:
    """Send local audio plus *prompt* to the Qwen-Omni model and return its text.

    The audio is compressed first when needed and sent as a Base64 data URL.
    If it is still over the size limit, a message starting with
    "Audio file is too large" is returned instead of calling the model.
    The model name comes from ``DASHSCOPE_AUDIO_MODEL``.
    """
    usable_path = _compress_audio_if_needed(audio_path)
    data_url, audio_format = _audio_to_data_url(usable_path)

    encoded_size = len(data_url)
    # The +100 allowance leaves room for the data-URL prefix on top of the
    # Base64 payload.
    if encoded_size > MAX_AUDIO_BASE64_CHARS + 100:
        return (
            f"Audio file is too large for Base64 input after compression attempt. "
            f"file={usable_path}, encoded_chars={encoded_size}, "
            f"limit={MAX_AUDIO_BASE64_CHARS}."
        )

    client = _dashscope_client()

    audio_part = {
        "type": "input_audio",
        "input_audio": {
            "data": data_url,
            "format": audio_format,
        },
    }
    text_part = {
        "type": "text",
        "text": prompt,
    }
    request = {
        "model": os.getenv("DASHSCOPE_AUDIO_MODEL", "qwen3.5-omni-plus"),
        "messages": [{"role": "user", "content": [audio_part, text_part]}],
        "modalities": ["text"],
        "stream": True,
        "stream_options": {"include_usage": True},
        "max_tokens": max_tokens,
        "temperature": 0,
    }

    # Some Qwen-Omni Flash variants require non-thinking mode for multimodal usage.
    # If the first attempt fails for any reason, retry once without extra_body.
    try:
        stream = client.chat.completions.create(
            **request,
            extra_body={"enable_thinking": False},
        )
    except Exception:
        stream = client.chat.completions.create(**request)
    return _collect_stream_text(stream)
def _extract_final_answer_from_transcript(
    question: str,
    transcript: str,
    max_tokens: int = 256,
) -> str:
    """Ask a text model for the final answer to *question* given *transcript*.

    Kept separate from transcription on purpose, to reduce hallucination.
    The model name comes from ``DASHSCOPE_TEXT_MODEL``. Returns the stripped
    reply, or ``""`` when the model returns no content.
    """
    client = _dashscope_client()

    user_prompt = f"""
You are an exact-match answer extractor for an evaluation benchmark.
Original question:
{question}
Audio transcript:
{transcript}
Task:
Answer the original question using only the transcript.
Rules:
- Return only the final answer.
- No explanation.
- No markdown.
- No citations.
- Do not mention the transcript.
- If the question asks for a list of ingredients/items/pages/names, return exactly that list.
- If comma-separated output is appropriate, use comma + space.
- If the question asks for numbers, return only the requested numbers.
- If the answer is not present in the transcript, return the best concise answer from the transcript, not an apology.
""".strip()

    conversation = [
        {
            "role": "system",
            "content": "You extract exact final answers from transcripts.",
        },
        {
            "role": "user",
            "content": user_prompt,
        },
    ]
    reply = client.chat.completions.create(
        model=os.getenv("DASHSCOPE_TEXT_MODEL", "qwen3.5-flash"),
        messages=conversation,
        temperature=0,
        max_tokens=max_tokens,
    )
    text = reply.choices[0].message.content
    if not text:
        return ""
    return text.strip()
def answer_audio_question(
    task_id: str = "",
    file_path: str = "",
    question: str = "",
    return_transcript: bool = False,
) -> str:
    """
    Transcribe an attached audio file and answer the user's question.
    Use this tool when the question mentions an attached audio file, recording,
    mp3/wav/m4a file, professor recording, voicemail, spoken recipe, lecture audio,
    or asks what someone says in an audio attachment.
    Provide task_id when available, or file_path if the file has already been downloaded.
    Also include the original question.
    """
    path = _resolve_file(task_id=task_id, file_path=file_path)
    if path is None:
        return "No audio file could be resolved from the given task_id or file_path."

    try:
        suffix = path.suffix.lower()
        if suffix not in AUDIO_SUFFIXES:
            # Unknown suffix: sniff the header. Only the first KiB is read,
            # so large files are not loaded into memory for this check.
            with path.open("rb") as handle:
                header = handle.read(1024)
            inferred = _infer_ext_from_bytes(header, "")
            if inferred not in AUDIO_SUFFIXES:
                return (
                    f"Resolved file is not a supported audio file. "
                    f"file_path={path}, suffix={suffix}, inferred={inferred}, supported={sorted(AUDIO_SUFFIXES)}"
                )
            # Work on a copy carrying the detected suffix, because the audio
            # format sent to the model is derived from the file suffix.
            new_path = path.with_suffix(inferred)
            if not new_path.exists():
                shutil.copyfile(path, new_path)
            path = new_path

        if not question:
            question = "What is said in this audio? Return the important content concisely."

        transcription_prompt = f"""
You are a careful speech transcription system.
Listen to the audio and produce a clean transcript.
Rules:
- Transcribe all spoken words relevant to the user's question.
- Preserve proper nouns, numbers, page numbers, ingredient names, and units carefully.
- If the audio contains a recipe, preserve ingredient names exactly.
- If the audio contains a lecture or assignment instructions, preserve page numbers and topics exactly.
- Do not summarize too aggressively.
- Return transcript text only.
User's question for context:
{question}
""".strip()
        transcript = _call_qwen_omni_audio(
            audio_path=path,
            prompt=transcription_prompt,
            max_tokens=1200,
        ).strip()
        if not transcript:
            return "Audio transcription returned empty text."
        # The audio helper reports an oversize file as a message rather than
        # an exception; pass that message through unchanged.
        if transcript.lower().startswith("audio file is too large"):
            return transcript

        final_answer = _extract_final_answer_from_transcript(
            question=question,
            transcript=transcript,
            max_tokens=256,
        ).strip()
        if return_transcript:
            return (
                f"Final answer:\n{final_answer}\n\n"
                f"Transcript:\n{transcript}"
            ).strip()
        # Fall back to the raw transcript when extraction yields nothing.
        return final_answer if final_answer else transcript
    except Exception as e:
        # Tool contract: failures (including I/O errors while sniffing or
        # copying the file) are reported as text, never raised.
        return f"Failed to process audio {path}: {e}"