Spaces:
Sleeping
Sleeping
| import tempfile | |
| import requests | |
| import os | |
| from urllib.parse import urlparse | |
| from typing import Optional, List | |
| import yt_dlp | |
| import imageio | |
| from PIL import Image | |
| import whisper | |
| from dotenv import load_dotenv | |
# Prefer the real ``tool`` decorator from gaia_benchmark; when that package
# cannot be imported, substitute a pass-through so decorated functions still
# load and behave as plain functions.
try:
    from gaia_benchmark.tools import tool
except ImportError:
    def tool(func):
        """Return *func* unchanged (stand-in used when gaia_benchmark is missing)."""
        return func
# Run python-dotenv's loader once at import time, before any tool is called.
load_dotenv()
def use_vision_model(question: str, images: List[Image.Image]) -> str:
    """Report that image analysis is unavailable.

    Both arguments are accepted so the signature stays usable by callers, but
    neither is inspected: the function always returns the same fixed notice.

    Args:
        question: The question about the images (unused).
        images: The images to analyse (unused).

    Returns:
        A fixed message stating that no vision model is available.
    """
    unavailable_notice = (
        "Vision model is not available for Mistral. "
        "Please integrate a separate endpoint for image analysis."
    )
    return unavailable_notice
def review_youtube_video(url: str, question: str) -> str:
    """Report that video review is unsupported.

    The arguments are accepted but never used; the same fixed notice is
    returned on every call.

    Args:
        url: Address of the video (unused).
        question: The question about the video (unused).

    Returns:
        A fixed message stating that the tool is unsupported.
    """
    unsupported_notice = "This tool is currently unsupported with Mistral. Please remove or replace."
    return unsupported_notice
def youtube_frames_to_images(url: str, sample_interval_seconds: int = 5) -> List[Image.Image]:
    """Download a video with yt-dlp and sample its frames at a fixed interval.

    The video (at most 1080p, merged to MP4) is downloaded into a temporary
    directory that is removed before the function returns. Frame 0 is always
    included, followed by every ``fps * sample_interval_seconds``-th frame.

    Args:
        url: Address of the video to download.
        sample_interval_seconds: Seconds between sampled frames. Values that
            would give a step below one frame are clamped so that every frame
            is returned instead of dividing by zero.

    Returns:
        The sampled frames as PIL images, in playback order.

    Raises:
        FileNotFoundError: If the download did not produce an ``.mp4`` file.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        ydl_opts = {
            'format': 'bestvideo[height<=1080]+bestaudio/best[height<=1080]/best',
            'outtmpl': os.path.join(tmpdir, 'video.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'merge_output_format': 'mp4',
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        video_path = next(
            (os.path.join(tmpdir, name) for name in os.listdir(tmpdir) if name.endswith('.mp4')),
            None,
        )
        if video_path is None:
            # Fail with a clear message instead of handing None to imageio.
            raise FileNotFoundError(f"No .mp4 file was produced when downloading {url!r}")

        reader = imageio.get_reader(video_path)
        try:
            # Fall back to 25 fps when the metadata has no usable value.
            fps = reader.get_meta_data().get('fps') or 25
            # A step of 0 would make ``idx % frame_interval`` raise
            # ZeroDivisionError, so never go below one frame.
            frame_interval = max(1, int(fps * sample_interval_seconds))
            return [
                Image.fromarray(frame)
                for idx, frame in enumerate(reader)
                if idx % frame_interval == 0
            ]
        finally:
            # Close the reader even if decoding fails, so the temporary
            # directory can always be cleaned up.
            reader.close()
def read_file(filepath: str) -> str:
    """Return the full contents of a UTF-8 text file.

    Failures are reported in the return value rather than raised.

    Args:
        filepath: Path of the file to read.

    Returns:
        The file's text, or ``"Error reading file: <reason>"`` if the file
        could not be opened or decoded.
    """
    try:
        with open(filepath, mode='r', encoding='utf-8') as handle:
            contents = handle.read()
    except Exception as exc:
        return f"Error reading file: {exc!s}"
    else:
        return contents
def download_file_from_url(url: str, filename: Optional[str] = None, timeout: float = 30.0) -> str:
    """Download *url* into the system temporary directory.

    Failures are reported in the return value rather than raised.

    Args:
        url: Address of the resource to fetch.
        filename: Name to store the file under. Only its final path component
            is used. When omitted, the last component of the URL path is used,
            or a random ``download_<hex>`` name if the URL has none.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The path of the saved file, or ``"Error downloading file: <reason>"``
        if the request or the write failed (including HTTP error statuses).
    """
    try:
        # basename() strips directories, "..", and absolute prefixes so a
        # caller- or URL-supplied name cannot escape the temp directory.
        safe_name = os.path.basename(filename or urlparse(url).path)
        if safe_name in ("", ".", ".."):
            safe_name = f"download_{os.urandom(4).hex()}"
        filepath = os.path.join(tempfile.gettempdir(), safe_name)

        # Without a timeout an unresponsive server blocks the caller forever.
        response = requests.get(url, timeout=timeout)
        # Treat 4xx/5xx as failures instead of saving the error page.
        response.raise_for_status()

        with open(filepath, 'wb') as f:
            f.write(response.content)
        return filepath
    except Exception as e:
        return f"Error downloading file: {str(e)}"
def extract_text_from_image(image_path: str) -> str:
    """Run OCR on an image file and return the recognised text.

    ``pytesseract`` is imported lazily so the module still loads when it is
    not installed. Failures are reported in the return value rather than
    raised.

    Args:
        image_path: Path of the image to read.

    Returns:
        The text produced by ``pytesseract.image_to_string``, or
        ``"Error extracting text: <reason>"`` on any failure.
    """
    try:
        import pytesseract
        # Open via a context manager so the file handle is released as soon
        # as OCR finishes, instead of lingering until garbage collection.
        with Image.open(image_path) as image:
            return pytesseract.image_to_string(image)
    except Exception as e:
        return f"Error extracting text: {str(e)}"
def analyze_csv_file(file_path: str, query: str) -> str:
    """Load a CSV file with pandas and summarise its shape and columns.

    ``query`` is accepted but not used. Failures are reported in the return
    value rather than raised.

    Args:
        file_path: Path of the CSV file.
        query: Question about the data (unused).

    Returns:
        A one-line summary of the table's shape and column names, or
        ``"CSV error: <reason>"`` if loading failed.
    """
    try:
        import pandas as pd
        frame = pd.read_csv(file_path)
        dimensions = frame.shape
        column_names = frame.columns.tolist()
        summary = f"Loaded CSV with shape {dimensions} and columns: {column_names}"
    except Exception as exc:
        return f"CSV error: {exc!s}"
    else:
        return summary
def analyze_excel_file(file_path: str, query: str) -> str:
    """Load an Excel file with pandas and summarise its shape and columns.

    ``query`` is accepted but not used. Failures are reported in the return
    value rather than raised.

    Args:
        file_path: Path of the Excel workbook.
        query: Question about the data (unused).

    Returns:
        A one-line summary of the table's shape and column names, or
        ``"Excel error: <reason>"`` if loading failed.
    """
    try:
        import pandas as pd
        frame = pd.read_excel(file_path)
        dimensions = frame.shape
        column_names = frame.columns.tolist()
        summary = f"Loaded Excel with shape {dimensions} and columns: {column_names}"
    except Exception as exc:
        return f"Excel error: {exc!s}"
    else:
        return summary
def youtube_transcribe(url: str) -> str:
    """Download a video's audio track and transcribe it with Whisper.

    The audio is fetched with yt-dlp, converted to WAV by its
    ``FFmpegExtractAudio`` post-processor, and stored in a temporary
    directory that is removed before the function returns.

    Args:
        url: Address of the video whose audio should be transcribed.

    Returns:
        The ``'text'`` field of the Whisper transcription result.

    Raises:
        FileNotFoundError: If the download did not produce a ``.wav`` file.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        ydl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(tmpdir, 'audio.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        audio_path = next(
            (os.path.join(tmpdir, name) for name in os.listdir(tmpdir) if name.endswith('.wav')),
            None,
        )
        if audio_path is None:
            # Fail with a clear message instead of handing None to Whisper.
            raise FileNotFoundError(f"No .wav file was produced when downloading {url!r}")

        # Load the model only after the download succeeds, so a failed
        # download does not pay the model-loading cost.
        model = whisper.load_model("small")
        return model.transcribe(audio_path)['text']
def transcribe_audio(audio_file_path: str) -> str:
    """Transcribe an audio file with the Whisper ``"small"`` model.

    The model is loaded on every call.

    Args:
        audio_file_path: Path of the audio file to transcribe.

    Returns:
        The ``'text'`` field of the Whisper transcription result.
    """
    speech_model = whisper.load_model("small")
    transcription = speech_model.transcribe(audio_file_path)
    return transcription['text']