import os import re import json import base64 import shutil import subprocess import mimetypes from pathlib import Path from typing import Optional from urllib.parse import quote_plus from html import unescape import requests import pandas as pd from openai import OpenAI from langchain_core.tools import tool from youtube_transcript_api import YouTubeTranscriptApi DEFAULT_API_URL = os.getenv( "AGENT_COURSE_API_URL", "https://agents-course-unit4-scoring.hf.space", ) CACHE_DIR = Path("/tmp/agent_course_files") CACHE_DIR.mkdir(parents=True, exist_ok=True) def _safe_filename(name: str) -> str: """Make a filename safe for local storage.""" return re.sub(r"[^a-zA-Z0-9._-]+", "_", name).strip("_") or "file" def _infer_ext_from_bytes(content: bytes, content_type: str = "") -> str: ct = (content_type or "").lower() head = content[:512] # image if head.startswith(b"\x89PNG\r\n\x1a\n"): return ".png" if head.startswith(b"\xff\xd8\xff"): return ".jpg" if head.startswith(b"GIF87a") or head.startswith(b"GIF89a"): return ".gif" # audio if head.startswith(b"ID3"): return ".mp3" if len(head) >= 2 and head[0] == 0xFF and (head[1] & 0xE0) == 0xE0: return ".mp3" if head.startswith(b"RIFF") and b"WAVE" in head[:32]: return ".wav" if b"ftypM4A" in head[:64] or b"M4A" in head[:64]: return ".m4a" # spreadsheet / zip-based xlsx if head.startswith(b"PK\x03\x04"): return ".xlsx" # content-type fallback if "mpeg" in ct or "mp3" in ct or "audio/mpeg" in ct: return ".mp3" if "wav" in ct: return ".wav" if "m4a" in ct or "mp4" in ct: return ".m4a" if "png" in ct: return ".png" if "jpeg" in ct or "jpg" in ct: return ".jpg" if "excel" in ct or "spreadsheet" in ct: return ".xlsx" if "csv" in ct: return ".csv" if "python" in ct: return ".py" return "" def _filename_from_headers(task_id: str, response: requests.Response) -> str: """Infer filename from Content-Disposition, Content-Type, or file bytes.""" content_disposition = response.headers.get("content-disposition", "") match = re.search(r'filename\*?=(?:UTF-8\'\')?"?([^";]+)"?', content_disposition) if match: return _safe_filename(match.group(1)) content_type = response.headers.get("content-type", "").lower() content = response.content ext = "" if "spreadsheet" in content_type or "excel" in content_type: ext = ".xlsx" elif "csv" in content_type: ext = ".csv" elif "python" in content_type: ext = ".py" elif "audio" in content_type or "mpeg" in content_type or "mp3" in content_type: ext = ".mp3" elif "wav" in content_type: ext = ".wav" elif "image/png" in content_type: ext = ".png" elif "image/jpeg" in content_type: ext = ".jpg" if not ext: ext = _infer_ext_from_bytes(content, content_type) return _safe_filename(task_id) + ext def _download_file_by_task_id(task_id: str) -> Optional[Path]: """Download attached task file from the official scoring API.""" if not task_id: return None url = f"{DEFAULT_API_URL}/files/{task_id}" try: response = requests.get(url, timeout=45) print( f"[download] task_id={task_id} status={response.status_code} " f"content_type={response.headers.get('content-type')} " f"content_disposition={response.headers.get('content-disposition')} " f"bytes={len(response.content)}", flush=True, ) if response.status_code == 404: return None response.raise_for_status() filename = _filename_from_headers(task_id, response) file_path = CACHE_DIR / filename file_path.write_bytes(response.content) print(f"[download] saved to {file_path} suffix={file_path.suffix}", flush=True) return file_path except Exception as e: print(f"[download ERROR] task_id={task_id} url={url} error={repr(e)}", flush=True) return None def _resolve_file(task_id: str = "", file_path: str = "") -> Optional[Path]: """Resolve either a local file path or a task_id into a real local Path.""" if file_path: path = Path(file_path) if path.exists(): return path if task_id: return _download_file_by_task_id(task_id) return None @tool def download_task_file(task_id: str) -> str: """ Download the attached file for a Hugging Face Agent Course task_id. Use this when a question mentions an attached file, image, audio, spreadsheet, or code file. Returns the local file path and basic file information. """ path = _download_file_by_task_id(task_id) if path is None: return f"No attached file found for task_id={task_id}." info = { "task_id": task_id, "file_path": str(path), "filename": path.name, "suffix": path.suffix, "size_bytes": path.stat().st_size, } return json.dumps(info, ensure_ascii=False) @tool def read_attached_text_file(task_id: str = "", file_path: str = "", max_chars: int = 12000) -> str: """ Read the text content of an attached file. Use this for .txt, .py, .csv, .md, .json, or other plain-text attachments. Provide either task_id or file_path. """ path = _resolve_file(task_id=task_id, file_path=file_path) if path is None: return "No file could be resolved from the given task_id or file_path." try: text = path.read_text(encoding="utf-8", errors="replace") return text[:max_chars] except Exception as e: return f"Failed to read file {path}: {e}" IMAGE_SUFFIXES = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".gif"} def _image_to_data_url(path: Path) -> str: """ Convert a local image file to a base64 data URL for Qwen-VL / OpenAI-compatible API. """ mime_type, _ = mimetypes.guess_type(str(path)) if not mime_type or not mime_type.startswith("image/"): suffix = path.suffix.lower() if suffix in [".jpg", ".jpeg"]: mime_type = "image/jpeg" elif suffix == ".png": mime_type = "image/png" elif suffix == ".webp": mime_type = "image/webp" elif suffix == ".bmp": mime_type = "image/bmp" elif suffix == ".gif": mime_type = "image/gif" else: mime_type = "image/jpeg" encoded = base64.b64encode(path.read_bytes()).decode("utf-8") return f"data:{mime_type};base64,{encoded}" @tool def answer_image_question(task_id: str = "", file_path: str = "", question: str = "") -> str: """ Analyze an attached image and answer the user's question. Use this tool when the question mentions an attached image, picture, screenshot, chess position, visual content, chart image, diagram, object counting, OCR from image, or asks what is shown in an image. Provide task_id when available. Also include the original question. """ path = _resolve_file(task_id=task_id, file_path=file_path) if path is None: return "No image file could be resolved from the given task_id or file_path." suffix = path.suffix.lower() if suffix not in IMAGE_SUFFIXES: return ( f"Resolved file is not a supported image. " f"file_path={path}, suffix={suffix}. " f"Supported suffixes: {sorted(IMAGE_SUFFIXES)}" ) api_key = os.getenv("DASHSCOPE_API_KEY") if not api_key: return "DASHSCOPE_API_KEY is not set." try: image_url = _image_to_data_url(path) client = OpenAI( api_key=api_key, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", timeout=25.0, max_retries=1, ) prompt = f""" You are a precise visual question-answering tool for an evaluation benchmark. Task: Answer the user's question using the image. Rules: - Use the image content as the primary evidence. - If the question asks for a number, return only the number unless explanation is required. - If the question asks for a word, name, color, object, move, or label, return only that final answer. - For chess/checker/board-game images, carefully identify the board and pieces before answering. - For OCR-like questions, read visible text carefully. - Do not add markdown. - Do not mention that you are an AI model. Question: {question} """.strip() response = client.chat.completions.create( model=os.getenv("DASHSCOPE_VL_MODEL", "qwen3.6-plus"), messages=[ { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": {"url": image_url}, }, ], } ], temperature=0, max_tokens=256, ) answer = response.choices[0].message.content return answer.strip() if answer else "" except Exception as e: return f"Failed to analyze image {path}: {e}" @tool def answer_python_question(task_id: str = "", file_path: str = "") -> str: """ Execute an attached Python file and return its final output. Use this when the question asks for the final numeric output from attached Python code. Provide either task_id or file_path. """ path = _resolve_file(task_id=task_id, file_path=file_path) if path is None: return "No Python file could be resolved from the given task_id or file_path." try: result = subprocess.run( ["python", str(path)], cwd=str(path.parent), capture_output=True, text=True, timeout=10, ) stdout = result.stdout.strip() stderr = result.stderr.strip() if stdout: lines = [line.strip() for line in stdout.splitlines() if line.strip()] return lines[-1] if lines else stdout if stderr: return f"Python execution produced no stdout. stderr:\n{stderr[-2000:]}" return "Python execution finished with no output." except subprocess.TimeoutExpired: return "Python execution timed out." except Exception as e: return f"Failed to execute Python file: {e}" @tool def answer_excel_question(task_id: str = "", file_path: str = "", question: str = "") -> str: """ Read an attached Excel or CSV file and return compact spreadsheet data for answering the question. Use this for questions about attached .xlsx, .xls, or .csv files. Provide task_id when available. Also include the original question. """ path = _resolve_file(task_id=task_id, file_path=file_path) if path is None: return "No spreadsheet file could be resolved from the given task_id or file_path." try: suffix = path.suffix.lower() if suffix == ".csv": sheets = {"sheet1": pd.read_csv(path)} else: sheets = pd.read_excel(path, sheet_name=None) outputs = [] for sheet_name, df in sheets.items(): df = df.copy() outputs.append(f"Sheet: {sheet_name}") outputs.append(f"Shape: {df.shape[0]} rows x {df.shape[1]} columns") outputs.append(f"Columns: {list(df.columns)}") preview_csv = df.head(80).to_csv(index=False) outputs.append("Preview CSV:") outputs.append(preview_csv) numeric_summary = df.select_dtypes(include="number").sum(numeric_only=True) if not numeric_summary.empty: outputs.append("Numeric column sums:") outputs.append(numeric_summary.to_string()) # Heuristic helper for common “food excluding drinks” sales question. q = question.lower() if "food" in q and ("drink" in q or "drinks" in q): possible_category_cols = [ col for col in df.columns if any(key in str(col).lower() for key in ["category", "type", "item", "name", "menu"]) ] possible_value_cols = [ col for col in df.columns if any(key in str(col).lower() for key in ["sales", "revenue", "amount", "total", "usd", "price"]) ] if possible_category_cols and possible_value_cols: category_col = possible_category_cols[0] value_col = possible_value_cols[0] values = ( df[value_col] .astype(str) .str.replace(r"[$,]", "", regex=True) ) values = pd.to_numeric(values, errors="coerce").fillna(0) drink_pattern = r"drink|drinks|beverage|soda|coffee|tea|juice|water|milkshake|shake|smoothie" food_mask = ~df[category_col].astype(str).str.lower().str.contains( drink_pattern, regex=True, na=False, ) total_food_sales = values[food_mask].sum() outputs.append( f"Heuristic food-not-drinks total using category column " f"'{category_col}' and value column '{value_col}': " f"${total_food_sales:.2f}" ) return "\n\n".join(outputs)[:20000] except Exception as e: return f"Failed to read spreadsheet {path}: {e}" def _extract_youtube_id(text: str) -> Optional[str]: """Extract YouTube video ID from a URL or text containing a URL.""" urls = re.findall(r"https?://[^\s)]+", text) for url in urls: if "youtube.com/watch" in url: match = re.search(r"[?&]v=([^&\s]+)", url) if match: return match.group(1) if "youtu.be/" in url: match = re.search(r"youtu\.be/([^?&\s]+)", url) if match: return match.group(1) return None def _transcript_to_text(transcript) -> str: """Convert transcript result to plain text, compatible with old/new youtube-transcript-api.""" pieces = [] for item in transcript: if isinstance(item, dict): text = item.get("text", "") else: text = getattr(item, "text", "") if text: pieces.append(text.replace("\n", " ").strip()) return " ".join(pieces) @tool def get_youtube_transcript(url_or_question: str) -> str: """ Get the transcript text of a YouTube video from a URL or a question containing a YouTube URL. Use this when the question asks what someone says in a YouTube video. This may not help for purely visual questions about video frames. """ video_id = _extract_youtube_id(url_or_question) if not video_id: return "No YouTube video ID found." try: # Compatibility with older youtube-transcript-api versions. try: transcript = YouTubeTranscriptApi.get_transcript( video_id, languages=["en", "en-US", "en-GB"], ) except AttributeError: api = YouTubeTranscriptApi() transcript = api.fetch( video_id, languages=["en", "en-US", "en-GB"], ) text = _transcript_to_text(transcript) if not text: return "Transcript was found but is empty." return text[:20000] except Exception as e: return f"Failed to fetch YouTube transcript for video_id={video_id}: {e}" def _youtube_url_from_id(video_id: str) -> str: return f"https://www.youtube.com/watch?v={video_id}" def _download_youtube_video(url_or_question: str) -> Optional[Path]: video_id = _extract_youtube_id(url_or_question) if not video_id: return None try: import yt_dlp except Exception: return None output_template = str(CACHE_DIR / f"youtube_{video_id}.%(ext)s") url = _youtube_url_from_id(video_id) options = { "format": "worst[ext=mp4][height<=480]/worst[height<=480]/worst", "outtmpl": output_template, "noplaylist": True, "quiet": True, "no_warnings": True, } try: with yt_dlp.YoutubeDL(options) as ydl: info = ydl.extract_info(url, download=True) candidates = sorted(CACHE_DIR.glob(f"youtube_{video_id}.*")) for candidate in candidates: if candidate.suffix.lower() in {".mp4", ".webm", ".mkv", ".mov"} and candidate.stat().st_size > 0: return candidate requested = (info or {}).get("requested_downloads") or [] for item in requested: filepath = item.get("filepath") if filepath and Path(filepath).exists(): return Path(filepath) except Exception as e: print(f"[youtube download ERROR] video_id={video_id} error={repr(e)}", flush=True) return None def _extract_video_frames(video_path: Path, max_frames: int = 12) -> list[Path]: ffmpeg = shutil.which("ffmpeg") if not ffmpeg: return [] frame_dir = CACHE_DIR / f"{video_path.stem}_frames" frame_dir.mkdir(parents=True, exist_ok=True) for old_frame in frame_dir.glob("frame_*.jpg"): try: old_frame.unlink() except Exception: pass output_pattern = str(frame_dir / "frame_%03d.jpg") try: result = subprocess.run( [ ffmpeg, "-y", "-i", str(video_path), "-vf", "fps=1/5,scale=640:-1", "-frames:v", str(max_frames), output_pattern, ], capture_output=True, text=True, timeout=45, ) if result.returncode != 0: print(f"[ffmpeg ERROR] {result.stderr[-1000:]}", flush=True) return [] except Exception as e: print(f"[ffmpeg ERROR] video={video_path} error={repr(e)}", flush=True) return [] return sorted(frame_dir.glob("frame_*.jpg"))[:max_frames] @tool def answer_youtube_video_question(url_or_question: str, question: str = "") -> str: """ Answer visual questions about a YouTube video by downloading the video, sampling frames, and analyzing those frames with a vision model. Use this for questions about what appears on camera, visible objects, counts in video frames, or visual scenes. Use get_youtube_transcript instead for spoken words. """ if not question: question = url_or_question video_path = _download_youtube_video(url_or_question) if video_path is None: return "Failed to download YouTube video for visual analysis." frames = _extract_video_frames(video_path) if not frames: return "Failed to extract frames from YouTube video for visual analysis." api_key = os.getenv("DASHSCOPE_API_KEY") if not api_key: return "DASHSCOPE_API_KEY is not set." try: content = [ { "type": "text", "text": ( "You are a precise video-frame visual QA tool for an exact-match benchmark. " "The images are sampled frames from the same YouTube video in chronological order. " "Answer the user's question using the visible evidence. Return only the final answer.\n\n" f"Question:\n{question}" ), } ] for frame in frames: content.append({"type": "image_url", "image_url": {"url": _image_to_data_url(frame)}}) client = OpenAI( api_key=api_key, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", timeout=45.0, max_retries=1, ) response = client.chat.completions.create( model=os.getenv("DASHSCOPE_VL_MODEL", "qwen3.6-plus"), messages=[{"role": "user", "content": content}], temperature=0, max_tokens=256, ) answer = response.choices[0].message.content return answer.strip() if answer else "" except Exception as e: return f"Failed to analyze YouTube video frames: {e}" @tool def fetch_webpage_text(url: str, max_chars: int = 12000) -> str: """ Fetch a public webpage and return readable text. Use this when search results identify a specific source page or article. """ try: response = requests.get( url, timeout=30, headers={"User-Agent": "Mozilla/5.0 compatible GAIA coursework agent"}, ) response.raise_for_status() text = response.text text = re.sub(r"(?is)|", " ", text) text = re.sub(r"(?s)<[^>]+>", " ", text) text = re.sub(r"\s+", " ", text).strip() return text[:max_chars] except Exception as e: return f"Failed to fetch webpage {url}: {e}" @tool def web_search_text(query: str, max_results: int = 5) -> str: """ Search the public web without a paid API key and return compact results. Use this as a fallback when Tavily is unavailable or when broad web lookup is needed. """ try: url = f"https://duckduckgo.com/html/?q={quote_plus(query)}" response = requests.get( url, timeout=30, headers={"User-Agent": "Mozilla/5.0 compatible GAIA coursework agent"}, ) response.raise_for_status() html = response.text blocks = re.findall(r'(?is)
\s*
', html) results = [] for block in blocks: title_match = re.search(r'(?is)]+class="result__a"[^>]+href="([^"]+)"[^>]*>(.*?)', block) snippet_match = re.search(r'(?is)]+class="result__snippet"[^>]*>(.*?)', block) if not title_match: continue link = unescape(title_match.group(1)) title = re.sub(r"(?s)<[^>]+>", " ", title_match.group(2)) snippet = re.sub(r"(?s)<[^>]+>", " ", snippet_match.group(1)) if snippet_match else "" title = re.sub(r"\s+", " ", unescape(title)).strip() snippet = re.sub(r"\s+", " ", unescape(snippet)).strip() results.append(f"Title: {title}\nURL: {link}\nSnippet: {snippet}") if len(results) >= max_results: break if results: return "\n\n---\n\n".join(results) text = re.sub(r"(?s)<[^>]+>", " ", html) text = re.sub(r"\s+", " ", unescape(text)).strip() return text[:8000] except Exception as e: return f"Failed to search web for {query}: {e}" @tool def wikipedia_api_search(query: str, max_chars: int = 12000) -> str: """ Search Wikipedia through the public API and return extracts from top pages. Use this for factual questions likely answerable from Wikipedia. """ try: search_url = ( "https://en.wikipedia.org/w/api.php?action=query&list=search&format=json" f"&srlimit=3&srsearch={quote_plus(query)}" ) search_response = requests.get(search_url, timeout=30) search_response.raise_for_status() hits = search_response.json().get("query", {}).get("search", []) outputs = [] for hit in hits: title = hit.get("title", "") if not title: continue extract_url = ( "https://en.wikipedia.org/w/api.php?action=query&prop=extracts&explaintext=1" f"&format=json&titles={quote_plus(title)}" ) extract_response = requests.get(extract_url, timeout=30) extract_response.raise_for_status() pages = extract_response.json().get("query", {}).get("pages", {}) for page in pages.values(): outputs.append(f"Title: {page.get('title', title)}\n{page.get('extract', '')}") return "\n\n---\n\n".join(outputs)[:max_chars] except Exception as e: return f"Failed to search Wikipedia for {query}: {e}" AUDIO_SUFFIXES = {".mp3", ".wav", ".m4a", ".aac", ".flac", ".ogg", ".opus", ".webm"} AUDIO_FORMAT_BY_SUFFIX = { ".mp3": "mp3", ".wav": "wav", ".m4a": "m4a", ".aac": "aac", ".flac": "flac", ".ogg": "ogg", ".opus": "opus", ".webm": "webm", } # DashScope Qwen-Omni Base64 input limit is documented as encoded Base64 < 10MB. # Keep a margin to avoid request rejection. MAX_AUDIO_BASE64_CHARS = int(os.getenv("MAX_AUDIO_BASE64_CHARS", str(9_500_000))) def _audio_format_from_path(path: Path) -> str: """Infer audio format required by input_audio.format.""" return AUDIO_FORMAT_BY_SUFFIX.get(path.suffix.lower(), path.suffix.lower().lstrip(".") or "mp3") def _encode_audio_base64(path: Path) -> str: """Read a local audio file and encode it as Base64 text.""" return base64.b64encode(path.read_bytes()).decode("utf-8") def _audio_to_data_url(path: Path) -> tuple[str, str]: """ Convert a local audio file to DashScope-compatible Base64 Data URL. DashScope examples use: data:;base64, together with input_audio.format. """ audio_format = _audio_format_from_path(path) encoded = _encode_audio_base64(path) return f"data:;base64,{encoded}", audio_format def _compress_audio_if_needed(path: Path) -> Path: """ If Base64 audio is too large, try to compress it with ffmpeg. This is a best-effort fallback. If ffmpeg is unavailable or compression fails, return the original path. """ try: encoded_len = len(_encode_audio_base64(path)) if encoded_len <= MAX_AUDIO_BASE64_CHARS: return path except Exception: return path ffmpeg = shutil.which("ffmpeg") if not ffmpeg: return path compressed_path = path.with_suffix(".compressed.mp3") try: # Mono, 16kHz, low bitrate is usually enough for speech QA. result = subprocess.run( [ ffmpeg, "-y", "-i", str(path), "-vn", "-ac", "1", "-ar", "16000", "-b:a", "32k", str(compressed_path), ], capture_output=True, text=True, timeout=30, ) if result.returncode == 0 and compressed_path.exists() and compressed_path.stat().st_size > 0: if len(_encode_audio_base64(compressed_path)) <= MAX_AUDIO_BASE64_CHARS: return compressed_path return path except Exception: return path def _collect_stream_text(completion) -> str: """ Collect text from OpenAI-compatible streaming chunks. Handles common delta.content layouts robustly. """ pieces = [] for chunk in completion: try: if not getattr(chunk, "choices", None): continue delta = chunk.choices[0].delta content = getattr(delta, "content", None) if isinstance(content, str) and content: pieces.append(content) elif isinstance(content, list): for item in content: if isinstance(item, dict): text = item.get("text") or item.get("content") if text: pieces.append(str(text)) else: text = getattr(item, "text", None) if text: pieces.append(str(text)) except Exception: continue return "".join(pieces).strip() def _dashscope_client() -> OpenAI: """Create OpenAI-compatible DashScope client.""" api_key = os.getenv("DASHSCOPE_API_KEY") if not api_key: raise RuntimeError("DASHSCOPE_API_KEY is not set.") base_url = os.getenv( "DASHSCOPE_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1", ) return OpenAI( api_key=api_key, base_url=base_url, ) def _call_qwen_omni_audio( audio_path: Path, prompt: str, max_tokens: int = 1024, ) -> str: """ Call Qwen-Omni with local audio as Base64 Data URL. Returns text only. """ usable_path = _compress_audio_if_needed(audio_path) data_url, audio_format = _audio_to_data_url(usable_path) encoded_size = len(data_url) if encoded_size > MAX_AUDIO_BASE64_CHARS + 100: return ( f"Audio file is too large for Base64 input after compression attempt. " f"file={usable_path}, encoded_chars={encoded_size}, " f"limit={MAX_AUDIO_BASE64_CHARS}." ) client = _dashscope_client() audio_model = os.getenv("DASHSCOPE_AUDIO_MODEL", "qwen3.5-omni-plus") messages = [ { "role": "user", "content": [ { "type": "input_audio", "input_audio": { "data": data_url, "format": audio_format, }, }, { "type": "text", "text": prompt, }, ], } ] kwargs = dict( model=audio_model, messages=messages, modalities=["text"], stream=True, stream_options={"include_usage": True}, max_tokens=max_tokens, temperature=0, ) # Some Qwen-Omni Flash variants require non-thinking mode for multimodal usage. # If the backend rejects extra_body, retry without it. try: completion = client.chat.completions.create( **kwargs, extra_body={"enable_thinking": False}, ) except Exception: completion = client.chat.completions.create(**kwargs) return _collect_stream_text(completion) def _extract_final_answer_from_transcript( question: str, transcript: str, max_tokens: int = 256, ) -> str: """ Use a text model to extract the exact final answer from the transcript. This is intentionally separate from transcription to reduce hallucination. """ client = _dashscope_client() text_model = os.getenv("DASHSCOPE_TEXT_MODEL", "qwen3.5-flash") prompt = f""" You are an exact-match answer extractor for an evaluation benchmark. Original question: {question} Audio transcript: {transcript} Task: Answer the original question using only the transcript. Rules: - Return only the final answer. - No explanation. - No markdown. - No citations. - Do not mention the transcript. - If the question asks for a list of ingredients/items/pages/names, return exactly that list. - If comma-separated output is appropriate, use comma + space. - If the question asks for numbers, return only the requested numbers. - If the answer is not present in the transcript, return the best concise answer from the transcript, not an apology. """.strip() response = client.chat.completions.create( model=text_model, messages=[ { "role": "system", "content": "You extract exact final answers from transcripts.", }, { "role": "user", "content": prompt, }, ], temperature=0, max_tokens=max_tokens, ) answer = response.choices[0].message.content return answer.strip() if answer else "" @tool def answer_audio_question( task_id: str = "", file_path: str = "", question: str = "", return_transcript: bool = False, ) -> str: """ Transcribe an attached audio file and answer the user's question. Use this tool when the question mentions an attached audio file, recording, mp3/wav/m4a file, professor recording, voicemail, spoken recipe, lecture audio, or asks what someone says in an audio attachment. Provide task_id when available, or file_path if the file has already been downloaded. Also include the original question. """ path = _resolve_file(task_id=task_id, file_path=file_path) if path is None: return "No audio file could be resolved from the given task_id or file_path." suffix = path.suffix.lower() if suffix not in AUDIO_SUFFIXES: inferred = _infer_ext_from_bytes(path.read_bytes()[:1024], "") if inferred in AUDIO_SUFFIXES: new_path = path.with_suffix(inferred) if not new_path.exists(): new_path.write_bytes(path.read_bytes()) path = new_path suffix = inferred else: return ( f"Resolved file is not a supported audio file. " f"file_path={path}, suffix={suffix}, inferred={inferred}, supported={sorted(AUDIO_SUFFIXES)}" ) if not question: question = "What is said in this audio? Return the important content concisely." try: transcription_prompt = f""" You are a careful speech transcription system. Listen to the audio and produce a clean transcript. Rules: - Transcribe all spoken words relevant to the user's question. - Preserve proper nouns, numbers, page numbers, ingredient names, and units carefully. - If the audio contains a recipe, preserve ingredient names exactly. - If the audio contains a lecture or assignment instructions, preserve page numbers and topics exactly. - Do not summarize too aggressively. - Return transcript text only. User's question for context: {question} """.strip() transcript = _call_qwen_omni_audio( audio_path=path, prompt=transcription_prompt, max_tokens=1200, ).strip() if not transcript: return "Audio transcription returned empty text." if transcript.lower().startswith("audio file is too large"): return transcript final_answer = _extract_final_answer_from_transcript( question=question, transcript=transcript, max_tokens=256, ).strip() if return_transcript: return ( f"Final answer:\n{final_answer}\n\n" f"Transcript:\n{transcript}" ).strip() return final_answer if final_answer else transcript except Exception as e: return f"Failed to process audio {path}: {e}"