import os
import re
import json
import time
from pathlib import Path

import bm25s
import requests
import yaml
from langchain_core.messages import SystemMessage

# YouTube video IDs are 11 characters of the URL-safe base64 alphabet.
# The watch form tolerates other query parameters before ``v=``
# (e.g. ``watch?feature=share&v=<id>``).
_YOUTUBE_ID_RE = re.compile(
    r'(?:youtube\.com/watch\?(?:[^#\s]*?&)?v=|youtu\.be/|youtube\.com/embed/|'
    r'youtube\.com/v/|youtube\.com/shorts/)([A-Za-z0-9_-]{11})'
)
_BARE_YOUTUBE_ID_RE = re.compile(r'[A-Za-z0-9_-]{11}')


def extract_youtube_id(url: str) -> str | None:
    """Pull the 11-char video ID from any common YouTube URL form, or accept a bare ID.

    Args:
        url: A YouTube URL (watch, youtu.be, embed, /v/, shorts) or a bare
            11-character video ID, optionally surrounded by whitespace.

    Returns:
        The video ID, or None when no ID can be recognised (including when
        *url* is empty or not a string).
    """
    if not url or not isinstance(url, str):
        return None
    m = _YOUTUBE_ID_RE.search(url)
    if m:
        return m.group(1)
    candidate = url.strip()
    if _BARE_YOUTUBE_ID_RE.fullmatch(candidate):
        return candidate
    return None


_FINAL_ANSWER_RE = re.compile(r'FINAL ANSWER:\s*(.*)', re.DOTALL | re.IGNORECASE)


def extract_final_answer(content: str) -> str:
    """Pull the value after 'FINAL ANSWER:' (case-insensitive), or return the content stripped.

    Everything following the first 'FINAL ANSWER:' marker is returned
    (the pattern uses DOTALL, so this may span several lines). A falsy
    *content* is treated as the empty string.
    """
    content = content or ""
    m = _FINAL_ANSWER_RE.search(content)
    return (m.group(1) if m else content).strip()


def load_config(path="config.yaml"):
    """Parse the YAML file at *path* and return the resulting object.

    Returns whatever ``yaml.safe_load`` produces (``None`` for an empty file).
    Raises ``OSError`` if the file cannot be opened and ``yaml.YAMLError`` if
    it cannot be parsed.
    """
    # YAML is UTF-8 by convention; don't depend on the platform default encoding.
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)


def download_task_file(
    task_id: str,
    file_name: str,
    base_url: str,
    files_dir: str,
    max_retries: int = 3,
    timeout: int = 30,
) -> tuple[str | None, str]:
    """Download a task file from the GAIA scoring API.

    The file is cached at ``<files_dir>/<task_id>/<basename of file_name>``;
    a non-empty cached file is returned without touching the network.

    Returns (local_path, error_message). On success the error string is
    empty; on failure local_path is None and the error string says what
    went wrong. Retries on 5xx, empty bodies and network errors with
    exponential backoff (2s, 4s, 8s, ...); 4xx is treated as a definitive
    "no file" answer and not retried.

    Args:
        task_id: Task identifier; used both in the request URL and as the
            cache sub-directory name, so it must be a single path component.
        file_name: Name to store the file under; only its basename is used.
        base_url: API base URL; the file is fetched from ``{base_url}/files/{task_id}``.
        files_dir: Root directory of the local file cache.
        max_retries: Maximum number of request attempts.
        timeout: Per-request timeout in seconds, passed to ``requests.get``.
    """
    if not task_id or not file_name:
        return None, "missing task_id or file_name"

    # task_id becomes a directory name: refuse anything that could point
    # outside files_dir (separators, "..").
    if Path(task_id).name != task_id or task_id in {".", ".."}:
        return None, f"invalid task_id '{task_id}'"

    # Path("a/..").name is "..", which would make local_path a directory.
    safe_name = Path(file_name).name
    if not safe_name or safe_name in {".", ".."}:
        return None, f"invalid file_name '{file_name}'"

    try:
        save_dir = Path(files_dir) / task_id
        save_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        return None, f"could not create cache dir: {e}"

    local_path = save_dir / safe_name
    if local_path.is_file() and local_path.stat().st_size > 0:
        return str(local_path), ""

    url = f"{base_url}/files/{task_id}"
    last_err = ""

    for attempt in range(1, max_retries + 1):
        try:
            response = requests.get(url, timeout=timeout)
            status = response.status_code
            if 400 <= status < 500:
                return None, f"HTTP {status} from {url}"
            if status >= 500:
                last_err = f"HTTP {status} (attempt {attempt}/{max_retries})"
            elif not response.content:
                last_err = f"empty body (attempt {attempt}/{max_retries})"
            else:
                # Write to a sibling temp file and rename, so an interrupted
                # write can never leave a truncated file that later passes
                # the cache check above.
                tmp_path = local_path.with_name(safe_name + ".part")
                tmp_path.write_bytes(response.content)
                tmp_path.replace(local_path)
                return str(local_path), ""
        except requests.RequestException as e:
            last_err = f"{type(e).__name__}: {e} (attempt {attempt}/{max_retries})"
        except Exception as e:
            return None, f"unexpected error: {type(e).__name__}: {e}"

        # Only retryable failures reach this point.
        if attempt < max_retries:
            time.sleep(2 ** attempt)

    return None, f"all {max_retries} attempts failed; last: {last_err}"


def load_prompt(prompt_location: str) -> SystemMessage:
    """Load system prompt from YAML file.

    Reads the top-level ``prompt`` key of the YAML document at
    *prompt_location*. If the document cannot be parsed, or has no
    ``prompt`` key (including an empty or non-mapping document), the
    problem is printed and a generic default prompt is returned instead.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(prompt_location, encoding="utf-8") as f:
        try:
            prompt = yaml.safe_load(f)["prompt"]
        except yaml.YAMLError as exc:
            print(exc)
        except (KeyError, TypeError) as exc:
            # Empty document (None), non-mapping document, or missing key.
            print(f"Invalid prompt file {prompt_location}: {type(exc).__name__}: {exc}")
        else:
            return SystemMessage(content=prompt)
    return SystemMessage(content="You are a helpful assistant.")


def init_bm25_index(corpus_file="data/metadata.jsonl"):
    """BM25 Index Initialization (Local Corpus)

    Reads a JSONL corpus (one JSON object per line; blank lines are
    skipped) and builds a BM25 index over the ``Question`` field only.

    Returns:
        ``(retriever, corpus_texts, corpus_ids)`` where ``corpus_texts[i]``
        is the question plus, when present, the final answer and solution
        steps, and ``corpus_ids[i]`` is the matching ``task_id``. If the
        corpus file is missing or anything fails, a message is printed and
        ``(None, [], [])`` is returned.
    """
    try:
        if not os.path.exists(corpus_file):
            print(f"Warning: {corpus_file} not found. BM25 will use empty index.")
            return None, [], []

        search_texts = []  # question-only — used for BM25 indexing
        corpus_texts = []  # Q+A+Steps — returned for context injection
        corpus_ids = []

        with open(corpus_file, "r", encoding="utf-8") as f:
            for line in f:
                # A stray blank line must not abort the whole index build.
                if not line.strip():
                    continue
                item = json.loads(line)
                question = item.get('Question') or ''
                answer = item.get('Final answer', '')
                # 'Annotator Metadata' may be present but null.
                steps = (item.get('Annotator Metadata') or {}).get('Steps', '')

                search_texts.append(question)

                parts = [f"Question: {question}"]
                if answer:
                    parts.append(f"Final Answer: {answer}")
                if steps:
                    parts.append(f"Solution Steps: {steps}")
                corpus_texts.append("\n".join(parts))
                corpus_ids.append(item.get('task_id', ''))

        corpus_tokens = bm25s.tokenize(search_texts, stopwords="en", stemmer=None)
        retriever_bm25 = bm25s.BM25()
        retriever_bm25.index(corpus_tokens)
        print(f"BM25 Index initialized with {len(corpus_texts)} documents.")
        return retriever_bm25, corpus_texts, corpus_ids
    except Exception as e:
        print(f"Error initializing BM25: {e}")
        return None, [], []


def reciprocal_rank_fusion(results: list[list[dict]], k=60) -> list[tuple[str, str, float]]:
    """
    Fuse multiple ranked lists using Reciprocal Rank Fusion (RRF).

    Each document contributes ``1 / (k + rank + 1)`` (rank is 0-based) for
    every list it appears in; contributions are summed per ``task_id``.

    Args:
        results: Ranked lists, best first. Every document is a dict with a
            ``"content"`` entry and a ``"metadata"`` dict holding ``"task_id"``.
        k: RRF smoothing constant.

    Returns:
        ``(task_id, content, fused_score)`` tuples sorted by descending
        score. The content kept for a task_id is the one from its first
        occurrence; ties keep first-seen order.
    """
    fused_scores = {}
    for rank_list in results:
        for rank, doc in enumerate(rank_list):
            doc_id = doc["metadata"]["task_id"]
            doc_content = doc["content"]
            if doc_id not in fused_scores:
                fused_scores[doc_id] = {"id": doc_id, "content": doc_content, "score": 0.0}
            fused_scores[doc_id]["score"] += 1.0 / (k + rank + 1)

    sorted_results = sorted(fused_scores.values(), key=lambda x: x["score"], reverse=True)
    return [(item["id"], item["content"], item["score"]) for item in sorted_results]