Spaces:
Sleeping
Sleeping
ernani
Fixing youtube - in case it has network issues it still can access the offline transcripts
2d9eaee
| import os | |
| import io | |
| from typing import Dict, List, Optional, Any | |
| import requests | |
| from langchain.tools import BaseTool | |
| from langchain.schema import Document | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_community.tools import WikipediaQueryRun, DuckDuckGoSearchResults, TavilySearchResults | |
| from langchain_community.document_loaders import PythonLoader, ArxivLoader | |
| from langchain_community.utilities import WikipediaAPIWrapper, DuckDuckGoSearchAPIWrapper | |
| import pytube | |
| from PIL import Image | |
| import pandas as pd | |
| import librosa | |
| import json | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from langchain_community.document_loaders import YoutubeLoader | |
| import re | |
| import base64 | |
| from io import BytesIO | |
| from openai import OpenAI | |
| import aiohttp | |
| import logging | |
| from PyPDF2 import PdfReader | |
| from pydantic import Field | |
| logger = logging.getLogger(__name__) | |
class ContentProcessingError(Exception):
    """Root of the exception hierarchy used by the content-processing tools."""
class ImageProcessingError(ContentProcessingError):
    """Raised when an image cannot be fetched or analysed."""
class AudioProcessingError(ContentProcessingError):
    """Raised when an audio file cannot be saved or transcribed."""
class VideoProcessingError(ContentProcessingError):
    """Raised when a video (e.g. a YouTube URL) cannot be processed."""
class WebProcessingError(ContentProcessingError):
    """Raised when web content cannot be processed."""
def encode_image_to_base64(image_content: bytes) -> str:
    """Return the standard base64 encoding of ``image_content`` as text."""
    encoded_bytes = base64.b64encode(image_content)
    return str(encoded_bytes, 'utf-8')
class BaseContentTool(BaseTool):
    """Base class for all content processing tools.

    Supplies a shared text splitter and helpers for downloading the file
    attached to a task from the scoring service.
    """

    # Splitter made available to subclasses: 1000-character chunks with a
    # 200-character overlap, splitting on progressively smaller separators.
    text_splitter: RecursiveCharacterTextSplitter = Field(
        default_factory=lambda: RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", " ", ""],
        )
    )

    def _get_file_metadata(self, task_id: str) -> dict:
        """Return minimal placeholder metadata for ``task_id``.

        The service exposes no metadata endpoint, so no request is made and
        the content type is always the generic ``application/octet-stream``.
        """
        return {
            "task_id": task_id,
            "content_type": "application/octet-stream"
        }

    def _get_file_from_task_id(self, task_id: str, expected_type: str, timeout: float = 30.0) -> bytes:
        """Download the raw bytes of the file attached to ``task_id``.

        Args:
            task_id: Identifier appended to the file endpoint URL.
            expected_type: Kind of file the caller expects; currently unused
                and accepted only for interface compatibility.
            timeout: Seconds to wait for the server before giving up, so a
                stalled connection cannot block the tool forever.

        Returns:
            The response body as bytes.

        Raises:
            ContentProcessingError: If the request fails, returns an HTTP
                error status, or the body cannot be read.
        """
        base_url = "https://agents-course-unit4-scoring.hf.space/files"
        url = f"{base_url}/{task_id}"
        try:
            # No metadata check: that endpoint does not exist.
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            return response.content
        except requests.exceptions.RequestException as e:
            raise ContentProcessingError(f"Error fetching file: {str(e)}") from e
        except Exception as e:
            raise ContentProcessingError(f"Error processing file: {str(e)}") from e
class WikipediaTool(BaseTool):
    """Tool for searching Wikipedia articles"""

    name: str = "wikipedia"
    description: str = "Search for information on Wikipedia. Useful for finding facts about people, places, events, concepts, etc."
    wikipedia_tool: WikipediaQueryRun = Field(default_factory=lambda: WikipediaQueryRun(
        api_wrapper=WikipediaAPIWrapper(top_k_results=5)
    ))

    @staticmethod
    def _title_and_summary(res: Any) -> tuple:
        """Return ``(title, summary)`` for one search result.

        Accepts either a plain dict (``title``/``summary``/``content`` keys)
        or a Document-like object exposing ``metadata`` and ``page_content``.
        """
        if isinstance(res, dict):
            title = res.get('title', '')
            summary = res.get('summary') or res.get('content') or ''
        else:
            meta = getattr(res, 'metadata', None) or {}
            title = meta.get('title', '')
            summary = meta.get('summary') or getattr(res, 'page_content', '') or ''
        return str(title or ''), str(summary)

    def _run(self, question: str) -> str:
        """Search Wikipedia and return the matching articles as one string.

        Each article is wrapped in a ``<Document>`` tag; disambiguation pages
        and empty results are skipped. If nothing usable is found, a
        DuckDuckGo search result is returned instead. The output is capped at
        8000 characters. Errors are reported in the returned string rather
        than raised.
        """
        try:
            # ``api_wrapper.run`` returns one pre-joined string, which cannot
            # be iterated per article; ``load`` yields one Document per page.
            results = self.wikipedia_tool.api_wrapper.load(question)
            formatted_results = []
            for res in results:
                title, summary = self._title_and_summary(res)
                # Skip disambiguation pages
                if 'disambiguation' in title.lower():
                    continue
                if not summary:
                    continue
                formatted_results.append(
                    f'<Document source="wikipedia" title="{title}">\n{summary}\n</Document>'
                )
            if not formatted_results:
                # Fallback to web search if nothing found
                web = DuckDuckGoSearchResults()
                web_result = web.run(question, max_results=2)
                return f"<Document source=\"web_fallback\">\n{web_result}\n</Document>"
            return "\n\n---\n\n".join(formatted_results)[:8000]
        except Exception as e:
            logger.exception("Wikipedia search failed")
            return f"Error searching Wikipedia: {str(e)}"
class YouTubeVideoTool(BaseContentTool):
    """Tool for processing YouTube videos.

    Returns the video transcript (optionally narrowed to the lines relevant
    to a question). Transcripts are cached as text files under
    ``temp_youtube`` so a previously fetched video can be served without
    network access.
    """

    name: str = "youtube_video_processor"
    description: str = "Process YouTube videos to extract information"

    def _clean_url(self, url: str) -> str:
        """Strip surrounding whitespace/trailing punctuation and expand ``youtu.be`` links."""
        url = url.strip().rstrip('.!?,;:')
        if 'youtu.be' in url:
            video_id = url.split('/')[-1].split('?')[0]
            return f"https://www.youtube.com/watch?v={video_id}"
        return url

    def _extract_video_id(self, url: str) -> str:
        """Extract the video ID from a YouTube URL.

        Supports ``youtu.be/<id>``, ``youtube.com/watch?v=<id>`` and the
        path-based ``/shorts/<id>``, ``/embed/<id>``, ``/live/<id>`` and
        ``/v/<id>`` forms.

        Raises:
            VideoProcessingError: If no ID can be found or the ID contains
                characters outside ``[A-Za-z0-9_-]``.
        """
        from urllib.parse import parse_qs, urlparse

        video_id = None
        if 'youtu.be' in url:
            video_id = url.split('/')[-1].split('?')[0]
        elif 'youtube.com' in url:
            parsed = urlparse(url)
            values = parse_qs(parsed.query).get('v')
            if values:
                video_id = values[0]
            else:
                segments = [segment for segment in parsed.path.split('/') if segment]
                if len(segments) >= 2 and segments[0] in ('shorts', 'embed', 'live', 'v'):
                    video_id = segments[1]
        # The ID is later embedded in a cache file name, so reject anything
        # that is not a plain identifier (e.g. path separators).
        if not video_id or not re.fullmatch(r'[A-Za-z0-9_-]+', video_id):
            raise VideoProcessingError("Invalid YouTube URL format")
        return video_id

    def _fetch_translated_entries(self, video_id: str):
        """Return transcript entries translated to English, or None if none is translatable."""
        for transcript in YouTubeTranscriptApi.list_transcripts(video_id):
            if transcript.is_translatable:
                return transcript.translate('en').fetch()
        return None

    def _get_transcript_with_api(self, video_id: str) -> Optional[str]:
        """Get a timestamped transcript using the YouTube Transcript API directly.

        Tries English captions first and falls back to translating any
        translatable transcript to English.

        Returns:
            One ``[<start>s]: <text>`` line per entry, or None if no
            transcript could be obtained.
        """
        try:
            try:
                entries = YouTubeTranscriptApi.get_transcript(
                    video_id,
                    languages=['en', 'en-US', 'en-GB']
                )
            except Exception as e:
                # get_transcript raises (rather than returning empty) when no
                # English captions exist, so the fallback belongs here.
                logger.info("No English transcript for %s (%s); trying translation", video_id, e)
                entries = self._fetch_translated_entries(video_id)
            if not entries:
                return None
            return "\n".join([
                f"[{entry['start']:.2f}s]: {entry['text']}"
                for entry in entries
            ])
        except Exception as e:
            logger.warning("Transcript API failed for %s: %s", video_id, e)
            return None

    def _load_transcript_with_loader(self, clean_url: str, metadata: dict) -> Optional[str]:
        """Fetch the transcript through ``YoutubeLoader``.

        On success the loader's document metadata is merged into
        ``metadata`` in place. Returns None if loading fails or yields no
        documents.
        """
        try:
            loader = YoutubeLoader.from_youtube_url(
                clean_url,
                add_video_info=False,
                language=["en"]
            )
            documents = loader.load()
        except Exception as e:
            logger.warning("YoutubeLoader failed for %s: %s", clean_url, e)
            return None
        if not documents:
            return None
        if documents[0].metadata:
            metadata.update(documents[0].metadata)
        return documents[0].page_content

    def _select_relevant_text(self, transcript_text: str, question: str) -> str:
        """Narrow the transcript to lines matching the question's keywords.

        Keywords are the question's words longer than three characters. Each
        matching line is kept with three lines of context on either side, in
        transcript order and without duplicates. The full transcript is
        returned when the question is short (10 characters or fewer) or
        nothing matches.
        """
        if not question or len(question) <= 10:
            return transcript_text
        keywords = [word for word in question.lower().split() if len(word) > 3]
        lines = transcript_text.split('\n')
        keep = set()
        for i, line in enumerate(lines):
            lowered = line.lower()
            if any(keyword in lowered for keyword in keywords):
                # Index set: overlapping context windows must not repeat lines.
                keep.update(range(max(0, i - 3), min(len(lines), i + 4)))
        if not keep:
            return transcript_text
        return "\n".join(lines[i] for i in sorted(keep))

    def _run(self, video_url: str, question: str = "") -> List[Document]:
        """Return the transcript of ``video_url`` as a single-document list.

        A cached transcript file is used when present; otherwise the
        transcript is fetched (Transcript API first, then ``YoutubeLoader``)
        and cached. Failures are reported as a document whose content is the
        error message rather than raised.
        """
        try:
            clean_url = self._clean_url(video_url)
            video_id = self._extract_video_id(clean_url)
            metadata = {
                "source": video_id,
                "type": "youtube_video",
                "video_id": video_id,
                "question_context": question
            }
            temp_dir = "temp_youtube"
            os.makedirs(temp_dir, exist_ok=True)
            transcript_path = os.path.join(temp_dir, f"{video_id}_transcript.txt")

            if os.path.exists(transcript_path):
                # Offline path: reuse the cached transcript and skip fetching.
                with open(transcript_path, "r", encoding="utf-8") as f:
                    transcript_text = f.read()
            else:
                transcript_text = self._get_transcript_with_api(video_id)
                if not transcript_text:
                    transcript_text = self._load_transcript_with_loader(clean_url, metadata)
                if transcript_text:
                    try:
                        with open(transcript_path, "w", encoding="utf-8") as f:
                            f.write(transcript_text)
                    except OSError as e:
                        # Caching is best-effort; still return the transcript.
                        logger.warning("Could not cache transcript at %s: %s", transcript_path, e)

            if not transcript_text:
                error_msg = "Could not retrieve transcript from YouTube. The video may not have captions available."
                return [Document(
                    page_content=error_msg,
                    metadata=metadata
                )]
            return [Document(
                page_content=self._select_relevant_text(transcript_text, question),
                metadata=metadata
            )]
        except Exception as e:
            error_msg = f"Error processing YouTube video: {str(e)}"
            logger.error(error_msg)
            return [Document(
                page_content=error_msg,
                metadata={"source": video_url, "type": "youtube_video", "error": str(e)}
            )]

    async def _arun(self, video_url: str, question: str = "") -> List[Document]:
        """Async version of _run (not implemented)."""
        raise NotImplementedError("Async version not implemented yet")
class PythonTool(BaseContentTool):
    """Tool for processing Python files.

    Downloads the file attached to a task, writes it to a temporary ``.py``
    file, loads it with ``PythonLoader`` and removes the temporary file.
    """

    name: str = "python_processor"
    description: str = "Process Python files to extract information"
    temp_dir: str = Field(default="temp_python")

    def __init__(self, **kwargs):
        """Initialise the tool and make sure the temp directory exists."""
        super().__init__(**kwargs)
        os.makedirs(self.temp_dir, exist_ok=True)

    def _save_temp_python(self, content: bytes, task_id: str) -> str:
        """Write ``content`` to ``<temp_dir>/<task_id>.py`` and return that path.

        Raises:
            ContentProcessingError: If the file cannot be written.
        """
        temp_path = os.path.join(self.temp_dir, f"{task_id}.py")
        try:
            with open(temp_path, "wb") as f:
                f.write(content)
        except Exception as e:
            raise ContentProcessingError(f"Error saving temporary Python file: {str(e)}") from e
        return temp_path

    def _clean_temp_file(self, file_path: str):
        """Remove ``file_path`` if it exists; failures are logged, never raised."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except OSError as e:
            # Cleanup is best-effort and must not mask the real result.
            logger.debug("Could not remove temporary file %s: %s", file_path, e)

    def _run(self, task_id: str, question: str = "") -> List[Document]:
        """Process the task's Python file and return the loaded documents.

        Each document's metadata is tagged with the task id, type and the
        question. On failure a single document carrying the error message is
        returned instead of raising.
        """
        temp_path = None
        try:
            content = self._get_file_from_task_id(task_id, "python")
            # PythonLoader reads from disk, so stage the bytes in a temp file.
            temp_path = self._save_temp_python(content, task_id)
            documents = PythonLoader(temp_path).load()
            for doc in documents:
                doc.metadata.update({
                    "source": task_id,
                    "type": "python",
                    "content_type": "python_code",
                    "question_context": question
                })
            return documents
        except Exception as e:
            error_msg = f"Error processing Python file: {str(e)}"
            logger.error(error_msg)
            return [Document(
                page_content=error_msg,
                metadata={"source": task_id, "type": "python", "error": str(e), "question_context": question}
            )]
        finally:
            if temp_path:
                self._clean_temp_file(temp_path)

    async def _arun(self, task_id: str, question: str = "") -> List[Document]:
        """Async version of _run; delegates to the synchronous implementation."""
        return self._run(task_id, question)
class ImageTool(BaseContentTool):
    """Tool for processing images using GPT-4o.

    Downloads the image attached to a task and asks the model for a
    description tailored to the question being answered.
    """

    name: str = "image_processor"
    description: str = "Process images from task IDs using GPT-4V"
    client: OpenAI = Field(default_factory=OpenAI)
    base_system_prompt: str = (
        "You are an expert at analyzing images with strong attention to detail.\n"
        "Your task is to provide a detailed, objective description of the image content.\n"
        "Focus on:\n"
        "1. Key visual elements and their relationships\n"
        "2. Any text or numbers present in the image\n"
        "3. Specific details that might be relevant to answering questions about the image\n"
        "4. Technical or specialized content (diagrams, charts, game positions, etc.)\n"
        "Provide your analysis in a clear, structured format that can be used by a language model to answer specific questions about the image."
    )

    @staticmethod
    def _guess_mime_type(image_content: bytes) -> str:
        """Guess the image MIME type from its leading magic bytes.

        Recognises PNG, GIF and WebP; anything else is reported as JPEG,
        which is what the tool assumed for every image before.
        """
        if image_content.startswith(b"\x89PNG\r\n\x1a\n"):
            return "image/png"
        if image_content[:6] in (b"GIF87a", b"GIF89a"):
            return "image/gif"
        if image_content[:4] == b"RIFF" and image_content[8:12] == b"WEBP":
            return "image/webp"
        return "image/jpeg"

    def _generate_context_aware_prompt(self, question: str) -> str:
        """Generate a system prompt extended with question-specific instructions.

        Extra focus sections are appended when the question mentions chess,
        counting, text/reading/writing, or colours.
        """
        question_lower = question.lower()
        specialized_instructions = []
        if "chess" in question_lower:
            specialized_instructions.append(
                "\nFor chess positions:\n"
                "- Describe the position of all pieces using algebraic notation\n"
                "- Note any significant tactical or strategic elements\n"
                "- If asked about moves, specify them in algebraic notation"
            )
        if any(word in question_lower for word in ["count", "number", "how many"]):
            specialized_instructions.append(
                "\nPay special attention to counting and quantifying elements in the image.\n"
                "Provide specific numbers and ensure accuracy in counting."
            )
        if "text" in question_lower or "write" in question_lower or "read" in question_lower:
            specialized_instructions.append(
                "\nFocus on any text content:\n"
                "- Read and transcribe all visible text\n"
                "- Note the location and context of text elements\n"
                "- Pay attention to any numbers, symbols, or special characters"
            )
        if "color" in question_lower or "colour" in question_lower:
            specialized_instructions.append(
                "\nPay special attention to colors:\n"
                "- Describe colors precisely\n"
                "- Note color patterns or relationships\n"
                "- Mention any color-based groupings or distinctions"
            )
        full_prompt = self.base_system_prompt
        if specialized_instructions:
            full_prompt += "\n\nSpecific focus areas for this image:\n" + "\n".join(specialized_instructions)
        return full_prompt

    def _process_image_with_gpt4o(self, image_content: bytes, question: str) -> str:
        """Send the image and question to GPT-4o and return the model's analysis.

        Raises:
            ImageProcessingError: If encoding or the API call fails.
        """
        try:
            base64_image = encode_image_to_base64(image_content)
            system_prompt = self._generate_context_aware_prompt(question)
            # Label the data URL with the detected format instead of always
            # claiming JPEG.
            mime_type = self._guess_mime_type(image_content)
            content = [
                {"type": "text", "text": system_prompt + "\n\nAnalyze this image in detail, keeping in mind the following question: " + question},
                {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}}
            ]
            response = self.client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "user", "content": content}
                ],
                max_tokens=500,
                temperature=0.2  # Lower temperature for more focused analysis
            )
            return response.choices[0].message.content
        except Exception as e:
            raise ImageProcessingError(f"Error processing image with GPT-4o: {str(e)}") from e

    def _run(self, task_id: str, question: str = "") -> Document:
        """Fetch the task's image, analyse it and return one Document.

        Raises:
            ImageProcessingError: If fetching or analysing the image fails.
        """
        try:
            image_content = self._get_file_from_task_id(task_id, "image")
            analysis = self._process_image_with_gpt4o(image_content, question)
            return Document(
                page_content=analysis,
                metadata={
                    "source": task_id,
                    "type": "image",
                    "content_type": "gpt4o_analysis",
                    "question_context": question
                }
            )
        except Exception as e:
            raise ImageProcessingError(f"Error processing image: {str(e)}") from e

    async def _arun(self, task_id: str, question: str = "") -> Document:
        """Async version of _run (not implemented)."""
        raise NotImplementedError("Async version not implemented yet")
class AudioTool(BaseContentTool):
    """Tool for processing audio files using Whisper.

    Downloads the audio attached to a task, stages it in a temporary file,
    transcribes it through the OpenAI Whisper API and returns the text
    (with segment timestamps when the question asks about timing).
    """

    name: str = "audio_processor"
    description: str = "Process audio files from task IDs using Whisper"
    client: OpenAI = Field(default_factory=OpenAI)
    temp_dir: str = Field(default="temp_audio")

    def __init__(self, **kwargs):
        """Initialise the tool and make sure the temp directory exists."""
        super().__init__(**kwargs)
        os.makedirs(self.temp_dir, exist_ok=True)

    @staticmethod
    def _wants_timestamps(question: str) -> bool:
        """Return True if the question contains a timing-related keyword."""
        lowered = question.lower()
        return any(word in lowered for word in ["when", "time", "moment", "timestamp"])

    @staticmethod
    def _transcription_field(transcription, field_name: str, default):
        """Read ``field_name`` from an object- or dict-style transcription."""
        if hasattr(transcription, field_name):
            return getattr(transcription, field_name)
        if isinstance(transcription, dict):
            return transcription.get(field_name, default)
        return default

    def _save_temp_audio(self, audio_content: bytes, task_id: str) -> str:
        """Write the audio bytes to ``<temp_dir>/<task_id>.mp3`` and return the path.

        Raises:
            AudioProcessingError: If the file cannot be written.
        """
        temp_path = os.path.join(self.temp_dir, f"{task_id}.mp3")
        try:
            with open(temp_path, "wb") as f:
                f.write(audio_content)
        except Exception as e:
            raise AudioProcessingError(f"Error saving temporary audio file: {str(e)}") from e
        return temp_path

    def _clean_temp_file(self, file_path: str):
        """Remove ``file_path`` if it exists; failures are logged, never raised."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except OSError as e:
            # Cleanup is best-effort and must not mask the real result.
            logger.debug("Could not remove temporary file %s: %s", file_path, e)

    def _transcribe_with_whisper(self, audio_path: str, question: str = "") -> dict:
        """Transcribe the audio file using the Whisper API.

        Segment-level timestamps are requested only when the question asks
        about timing.

        Raises:
            AudioProcessingError: If the file cannot be read or the API call fails.
        """
        try:
            with open(audio_path, "rb") as audio_file:
                request = {
                    "model": "whisper-1",
                    "file": audio_file,
                    "response_format": "verbose_json",
                    "language": "en",  # You might want to make this dynamic based on the content
                }
                # Omit the option entirely rather than sending an explicit None.
                if self._wants_timestamps(question):
                    request["timestamp_granularities"] = ["segment"]
                return self.client.audio.transcriptions.create(**request)
        except Exception as e:
            raise AudioProcessingError(f"Error transcribing audio with Whisper: {str(e)}") from e

    def _extract_relevant_info(self, transcription, question: str = "") -> str:
        """Turn a transcription response into text for the document.

        Handles both attribute-style responses and plain dicts. When the
        question asks about timing and segments are present, each segment is
        rendered as ``[<start>s - <end>s]: <text>``; otherwise the full text
        is returned. Never raises: failures yield an explanatory string.
        """
        try:
            wants_timestamps = self._wants_timestamps(question)
            if hasattr(transcription, 'text'):
                # Object attribute access (new API format)
                segments = getattr(transcription, 'segments', None)
                if segments and wants_timestamps:
                    return "\n".join([
                        f"[{segment.start:.2f}s - {segment.end:.2f}s]: {segment.text}"
                        for segment in segments
                    ])
                return transcription.text
            if isinstance(transcription, dict):
                # Dictionary access (old API format)
                segments = transcription.get("segments")
                if segments and wants_timestamps:
                    return "\n".join([
                        f"[{segment.get('start', 0):.2f}s - {segment.get('end', 0):.2f}s]: {segment.get('text', '')}"
                        for segment in segments
                    ])
                return transcription.get("text", "")
            # Fallback
            if str(transcription):
                return str(transcription)
            return "Could not extract text from transcription."
        except Exception as e:
            logger.error(
                "Error extracting info from transcription (%s): %s",
                type(transcription), e,
            )
            return "Error extracting information from audio transcription."

    def _run(self, task_id: str, question: str = "") -> List[Document]:
        """Transcribe the task's audio file and return it as a single-document list.

        On failure a document carrying the error message is returned instead
        of raising. The temporary audio file is always removed.
        """
        temp_path = None
        try:
            audio_content = self._get_file_from_task_id(task_id, "audio")
            temp_path = self._save_temp_audio(audio_content, task_id)
            transcription = self._transcribe_with_whisper(temp_path, question)
            processed_content = self._extract_relevant_info(transcription, question)
            return [Document(
                page_content=processed_content,
                metadata={
                    "source": task_id,
                    "type": "audio",
                    "content_type": "whisper_transcription",
                    "question_context": question,
                    "language": self._transcription_field(transcription, 'language', 'en'),
                    "duration": self._transcription_field(transcription, 'duration', None)
                }
            )]
        except Exception as e:
            error_msg = f"Error processing audio: {str(e)}"
            logger.error(error_msg)
            return [Document(
                page_content=error_msg,
                metadata={"source": task_id, "type": "audio", "error": str(e)}
            )]
        finally:
            if temp_path:
                self._clean_temp_file(temp_path)

    async def _arun(self, task_id: str, question: str = "") -> List[Document]:
        """Async version of _run; delegates to the synchronous implementation."""
        return self._run(task_id, question)
class ExcelTool(BaseContentTool):
    """Tool that downloads a task's Excel file and renders it as text."""

    name: str = "excel_tool"
    description: str = "Tool for processing Excel files and extracting their content"

    def _process_excel_content(self, content: bytes) -> pd.DataFrame:
        """Parse Excel bytes into a pandas DataFrame.

        Raises:
            ValueError: If pandas cannot read the content.
        """
        try:
            return pd.read_excel(io.BytesIO(content))
        except Exception as e:
            logger.error(f"Error reading Excel content: {str(e)}")
            raise ValueError(f"Failed to read Excel content: {str(e)}") from e

    def _dataframe_to_text(self, df: pd.DataFrame) -> str:
        """Render the DataFrame as a plain-text table (no index column)."""
        table_str = df.to_string(index=False)
        return f"Table:\n{table_str}"

    def _run(self, task_id: str, question: str = "") -> List[Document]:
        """Process the task's Excel file and return it as a single-document list.

        The document metadata records the row/column counts and column
        names. On failure a document carrying the error message is returned
        instead of raising.
        """
        try:
            content = self._get_file_from_task_id(task_id, "excel")
            df = self._process_excel_content(content)
            text_content = self._dataframe_to_text(df)
            metadata = {
                "source": task_id,
                "content_type": "excel",
                "row_count": len(df),
                "column_count": len(df.columns),
                "columns": df.columns.tolist(),
                "question_context": question
            }
            return [Document(
                page_content=text_content,
                metadata=metadata
            )]
        except Exception as e:
            error_msg = f"Error processing Excel file: {str(e)}"
            logger.error(error_msg)
            return [Document(
                page_content=error_msg,
                metadata={"source": task_id, "content_type": "error", "question_context": question}
            )]

    async def _arun(self, task_id: str, question: str = "") -> List[Document]:
        """Async version of _run; delegates to the synchronous implementation."""
        return self._run(task_id, question)
class ArvixSearchTool(BaseTool):
    """Tool for searching Arxiv for a query and returning maximum 3 results as formatted string."""

    name: str = "arvix_search"
    description: str = "Search Arxiv for a query and return maximum 3 results as formatted string."

    def _run(self, query: str) -> str:
        """Search Arxiv and return up to 3 results as ``<Document>`` blocks.

        Results are ordered by how many query words occur in their text;
        each block carries at most 1200 characters of content and the whole
        output is capped at 8000 characters. Errors are reported in the
        returned string rather than raised.
        """
        try:
            search_docs = ArxivLoader(query=query, load_max_docs=3).load()
            # Build the query word set once; it is the same for every document.
            qwords = set(query.lower().split())

            def score(doc):
                content = (doc.page_content or "").lower()
                return sum(1 for w in qwords if w in content)

            search_docs = sorted(search_docs, key=score, reverse=True)
            formatted = []
            for doc in search_docs:
                meta = doc.metadata or {}
                title = meta.get('Title') or meta.get('title') or ''
                authors = meta.get('Authors') or meta.get('authors') or ''
                # Fall back to the leading year of the publication date when no
                # explicit year key exists (assumes an ISO "YYYY-MM-DD" string
                # under 'Published' -- TODO confirm against the loader version).
                published = meta.get('Published') or meta.get('published') or ''
                year = meta.get('Year') or meta.get('year') or str(published)[:4]
                link = meta.get('Entry ID') or meta.get('entry_id') or ''
                abstract = (doc.page_content or "")[:1200]
                formatted.append(
                    f'<Document source="arxiv" title="{title}" authors="{authors}" year="{year}" link="{link}">\n{abstract}\n</Document>'
                )
            if not formatted:
                return "No relevant arXiv results found."
            return "\n\n---\n\n".join(formatted)[:8000]
        except Exception as e:
            return f"Error searching arXiv: {str(e)}"
class WebSearchTool(BaseTool):
    """Tool for web search.

    Runs a search (Tavily by default), then scrapes up to three of the
    result pages and appends the most query-relevant passage of each to the
    raw search output.
    """

    name: str = "web_search"
    description: str = "Search the web for information. Useful for questions about current events, specific facts, or topics not covered in Wikipedia."
    #search_tool: DuckDuckGoSearchResults = Field(default_factory=DuckDuckGoSearchResults)
    search_tool: TavilySearchResults = Field(default_factory=TavilySearchResults)

    def _extract_links_from_results(self, search_result) -> list:
        """Extract unique http(s) links from search results, robust to type.

        Handles a string containing ``link:`` markers (old DuckDuckGo style)
        and a list of dicts with a ``url`` key (Tavily style). Any other
        shape yields an empty list.
        """
        links = []
        try:
            if isinstance(search_result, str):
                candidates = [part.split(',')[0].strip() for part in search_result.split('link:')[1:]]
            elif isinstance(search_result, list):
                candidates = [
                    item['url'] for item in search_result
                    if isinstance(item, dict) and 'url' in item
                ]
            else:
                candidates = []
            for url in candidates:
                if url.startswith('http') and url not in links:
                    links.append(url)
        except Exception as e:
            logger.warning(f"Error extracting links: {str(e)}")
        return links

    @staticmethod
    def _search_result_to_text(search_result) -> str:
        """Return the raw search result as a string, whatever its type."""
        if isinstance(search_result, str):
            return search_result
        try:
            return json.dumps(search_result, ensure_ascii=False)
        except (TypeError, ValueError):
            return str(search_result)

    def _is_promising_link(self, link: str, query: str) -> bool:
        """Return False for social/video sites, True for everything else.

        ``query`` is accepted for interface compatibility and is not used.
        """
        excluded_domains = [
            'youtube.com', 'facebook.com', 'twitter.com', 'instagram.com',
            'pinterest.com', 'reddit.com', 'tiktok.com', 'linkedin.com'
        ]
        return not any(domain in link for domain in excluded_domains)

    def _scrape_page_content(self, url: str) -> str:
        """Download an HTML page and return up to 5000 characters of visible text.

        Script, style and page-chrome elements are dropped and each text
        node becomes one whitespace-normalised line. Returns an empty string
        for non-HTML responses and on any error, so a failed scrape is never
        mistaken for page content.
        """
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            content_type = response.headers.get('Content-Type', '')
            if 'text/html' not in content_type:
                return ""
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')
            for element in soup(["script", "style", "nav", "footer", "header"]):
                element.decompose()
            # Newline-separate text nodes so later paragraph scoring sees
            # real passages rather than single words.
            text = soup.get_text(separator='\n', strip=True)
            lines = (' '.join(line.split()) for line in text.splitlines())
            text = '\n'.join(line for line in lines if line)
            return text[:5000]
        except Exception as e:
            logger.warning(f"Error scraping page content {str(e)}")
            return ""

    def _extract_most_relevant_chunk(self, content: str, query: str, context_lines: int = 2) -> str:
        """Return the passage of ``content`` that best matches ``query``.

        The line containing the most query words is selected (the first line
        when nothing matches) and returned together with ``context_lines``
        neighbouring lines on each side.
        """
        paragraphs = content.split('\n')
        query_words = set(query.lower().split())
        best_score = 0
        best_index = 0
        for index, para in enumerate(paragraphs):
            lowered = para.lower()
            score = sum(1 for word in query_words if word in lowered)
            if score > best_score:
                best_score = score
                best_index = index
        start = max(0, best_index - context_lines)
        end = best_index + context_lines + 1
        return '\n'.join(paragraphs[start:end])

    def _get_page_title(self, url: str) -> str:
        """Return the page's ``<title>`` text, or the URL itself if unavailable."""
        try:
            headers = {'User-Agent': 'Mozilla/5.0'}
            response = requests.get(url, headers=headers, timeout=5)
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')
            return soup.title.string.strip() if soup.title and soup.title.string else url
        except Exception:
            return url

    def _run(self, query: str) -> str:
        """Search the web and return the results plus scraped page excerpts.

        At most three promising result pages are scraped. The output is the
        raw search result followed by one ``<Document>`` block per scraped
        page, capped at 10000 characters. Errors are reported in the
        returned string rather than raised.
        """
        try:
            search_result = self.search_tool.run(query, max_results=5)
            links = self._extract_links_from_results(search_result)
            # The search tool may return a list of dicts; normalise it so it
            # can be concatenated and sliced as text.
            search_text = self._search_result_to_text(search_result)
            seen = set()
            results = []
            processed_count = 0
            for link in links:
                if processed_count >= 3:
                    break
                if link in seen or not self._is_promising_link(link, query):
                    continue
                seen.add(link)
                content = self._scrape_page_content(link)
                if content:
                    best_chunk = self._extract_most_relevant_chunk(content, query)
                    title = self._get_page_title(link)
                    results.append(f'<Document source="{link}" title="{title}">\n{best_chunk}\n</Document>')
                    processed_count += 1
            if results:
                combined_result = search_text + "\n\n" + "\n\n".join(results)
            else:
                combined_result = search_text
            return combined_result[:10000]
        except Exception as e:
            return f"Error searching the web: {str(e)}"
class MathTool(BaseTool):
    """Tool for analyzing operation tables for algebraic properties (e.g., commutativity)."""

    name: str = "math_tool"
    description: str = "Analyze operation tables for algebraic properties (e.g., commutativity)."

    def _run(self, question: str) -> str:
        """Find the elements involved in counter-examples to commutativity.

        The question must contain the set as ``{a, b, ...}`` and the
        operation table as a markdown table whose first column and header
        row hold the element labels.

        Returns:
            A comma-separated, sorted list of the set elements that appear in
            any pair ``x, y`` with ``x*y != y*x`` (as operands or results);
            an empty string if the table is commutative; or an explanatory
            message if the set or table cannot be parsed.
        """
        import re

        # Extract the set S
        set_match = re.search(r'\{([a-zA-Z0-9_,\s]+)\}', question)
        if not set_match:
            return "Could not find set."
        S = [x.strip() for x in set_match.group(1).split(',') if x.strip()]

        # Table rows: lines that start with | and contain more than 2 pipes.
        # Strip surrounding whitespace first so the outer pipes can be removed.
        table_lines = [
            line.strip() for line in question.splitlines()
            if line.strip().startswith('|') and line.count('|') > 2
        ]
        if not table_lines:
            return "Could not find operation table."

        # Drop the markdown separator row; it may contain spaces and
        # alignment colons as well as dashes (e.g. "| --- | :-: |").
        table_lines = [
            line for line in table_lines
            if not set(line.replace('|', '')) <= set('-: ')
        ]
        if not table_lines:
            return "Could not find operation table after removing separator."

        # Header: skip the first cell (the operator symbol / corner cell).
        header = [cell.strip() for cell in table_lines[0].strip('|').split('|')][1:]
        table = {}
        for line in table_lines[1:]:
            row = [cell.strip() for cell in line.strip('|').split('|')]
            table[row[0]] = dict(zip(header, row[1:]))

        # Check commutativity
        involved = set()
        for x in S:
            for y in S:
                if x == y:
                    continue
                try:
                    xy = table[x][y]
                    yx = table[y][x]
                except KeyError:
                    return f"Operation table is missing an entry for {x} and {y}."
                if xy != yx:
                    involved.update([x, y, xy, yx])
        involved = sorted(z for z in involved if z in S)
        return ', '.join(involved)