Spaces:
Configuration error
Configuration error
| from utils import download_file, read_file, sum_pandas_df_cols, download_yt_video, extract_frames, encode_image, analyze_frame, generate_prompt_for_video_frame_analysis, get_response_from_frames_analysis, transcript_audio_file | |
| import os | |
| import requests | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from bs4 import BeautifulSoup | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| from mistralai import Mistral | |
| from groq import Groq | |
| from requests.exceptions import RequestException, Timeout, TooManyRedirects | |
| import errno | |
| from typing import Optional, List, Union | |
| from youtube_transcript_api._errors import ( | |
| TranscriptsDisabled, | |
| NoTranscriptFound, | |
| VideoUnavailable, | |
| NotTranslatable, | |
| ) | |
| from urllib.parse import urlparse, parse_qs | |
| from langchain_core.tools import tool | |
| from langchain_community.tools import DuckDuckGoSearchResults | |
def wiki_search(query: str) -> str:
    """
    Search Wikipedia for a query and return maximum 1 result.

    Before starting any search, you must first think about the TRUE necessary steps that are required to answer the question.
    If you need to search for information, the query should be a 1 to 3 keywords that can be used to find the most information about the subject.
    If the question specifies a date, do not put the date into the query.
    THEN you should analyze the result to answer the question.

    Args:
        query (str): The search query with a few keywords.

    Returns:
        str: The main content of the Wikipedia page or an error message.
    """
    try:
        # Step 1: Search for Wikipedia pages. Passing the query through
        # `params` lets requests URL-encode it; the previous f-string
        # interpolation broke on spaces and special characters.
        search_url = "https://en.wikipedia.org/w/api.php"
        search_params = {
            "action": "query",
            "list": "search",
            "srsearch": query,
            "format": "json",
        }
        try:
            response = requests.get(search_url, params=search_params, timeout=10)
            response.raise_for_status()
            data = response.json()
            search_results = data.get('query', {}).get('search', [])
            title = search_results[0]['title'] if search_results else None
            if not title:
                return "No relevant Wikipedia page found."
        except Timeout:
            return "Request timed out while searching for Wikipedia pages."
        except TooManyRedirects:
            return "Too many redirects while searching for Wikipedia pages."
        except RequestException as e:
            return f"Failed to search Wikipedia. Error: {e}"

        # Step 2: Fetch the HTML content of the best-matching page.
        page_url = f"https://en.wikipedia.org/wiki/{title.replace(' ', '_')}"
        try:
            page_response = requests.get(page_url, timeout=10)
            page_response.raise_for_status()
            html_content = page_response.text
        except Timeout:
            return "Request timed out while trying to fetch the Wikipedia page."
        except TooManyRedirects:
            return "Too many redirects while trying to fetch the Wikipedia page."
        except RequestException as e:
            return f"Failed to fetch the Wikipedia page. Error: {e}"

        # Step 3: Parse the HTML and extract the main article body
        # ('mw-content-text' holds the rendered page content).
        soup = BeautifulSoup(html_content, 'html.parser')
        content_div = soup.find('div', {'id': 'mw-content-text'})
        if content_div:
            return content_div.get_text(separator='\n', strip=True)
        return "No main content found on the Wikipedia page."
    except Exception as e:
        return f"An unexpected error occurred: {e}"
def add_numbers(numbers_list: List[float]) -> Union[float, str]:
    """
    Add a list of numbers and return the sum.

    Args:
        numbers_list (List[float]): A list of numbers to be added.

    Returns:
        Union[float, str]: The sum of the numbers in the list or an error
            message if an exception occurs.

    Example:
        add_numbers([1.5, 2.5, 3.0]) -> 7.0
    """
    try:
        # An empty (or falsy) input sums to zero by convention.
        if not numbers_list:
            return 0.0
        # Reject any non-numeric element; report the first offender's type.
        non_numeric = [item for item in numbers_list if not isinstance(item, (int, float))]
        if non_numeric:
            raise ValueError(f"All elements in the list must be numbers. Found: {type(non_numeric[0])}")
        return sum(numbers_list)
    except TypeError as te:
        return f"TypeError: {te}. Please ensure the input is a list of numbers."
    except ValueError as ve:
        return f"ValueError: {ve}"
    except Exception as e:
        return f"An unexpected error occurred: {e}"
def sum_excel_cols(task_id: str, file_name: str, column_names: List[str]) -> Union[float, str]:
    """
    Sum the values of specified columns in a pandas DataFrame read from a CSV or Excel file.

    Args:
        task_id (str): The ID of the task.
        file_name (str): The path to the CSV/Excel file.
        column_names (List[str]): A list of column names to sum.

    Returns:
        Union[float, str]: The sum of the specified columns, or an error
            message on failure (missing file, unsupported format, bad columns).

    Example:
        sum_excel_cols("task123", "data.xlsx", ["Column1", "Column2"]) -> 100.0
    """
    # Download only when the file is not already present locally,
    # consistent with analyze_image / transcript_audio.
    if not os.path.exists(file_name):
        file_status = download_file(task_id, file_name)
        if not os.path.exists(file_name):
            return f"File {file_name} does not exist."
    extension = os.path.splitext(file_name)[1].lower()
    if extension not in ('.csv', '.xlsx'):
        return "Unsupported file format. Please provide a CSV or XLSX file."
    try:
        # Wrap the read as well: a corrupt file raises here, not just the sum.
        df = pd.read_csv(file_name) if extension == '.csv' else pd.read_excel(file_name)
        return sum_pandas_df_cols(df, column_names)
    except Exception as e:
        return f"Error summing columns: {e}"
def youtube_transcript(url: str) -> str:
    """
    Retrieve the transcript of a YouTube video based on its URL.

    Accepts both standard 'https://www.youtube.com/watch?v=VIDEO_ID' URLs
    and short 'https://youtu.be/VIDEO_ID' links.

    Args:
        url (str): The URL of the YouTube video.

    Returns:
        str: The transcript of the video, or an error message.
    """
    try:
        # Validate the URL and extract the video ID.
        parsed_url = urlparse(url)
        host = (parsed_url.hostname or "").lower()
        if host.endswith("youtu.be"):
            # Short links carry the ID as the first path segment.
            video_id = parsed_url.path.lstrip('/').split('/')[0] or None
        else:
            # Standard links carry the ID in the 'v' query parameter.
            video_id = parse_qs(parsed_url.query).get('v', [None])[0]
        if not video_id:
            return "Invalid YouTube URL. Please provide a valid URL like 'https://www.youtube.com/watch?v=VIDEO_ID'."
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        return ' '.join(entry['text'] for entry in transcript)
    except VideoUnavailable:
        return "The video is unavailable. It may have been removed or set to private."
    except TranscriptsDisabled:
        return "Transcripts are disabled for this video."
    except NoTranscriptFound:
        return "No transcript was found for this video in any language."
    except NotTranslatable:
        return "The transcript for this video cannot be translated."
    except Exception as e:
        return f"An unexpected error occurred: {e}"
def read_file_content(task_id: str, file_name: str) -> str:
    """
    Read the text from a file and return its content as a string.

    Args:
        task_id (str): The unique identifier for the task.
        file_name (str): The name of the file.

    Returns:
        str: The content of the file, or a detailed error message.
    """
    download_state = download_file(task_id, file_name)
    if download_state != f"Success downloading {file_name}":
        # Previously this case fell through and silently returned None;
        # surface the download status (presumably an error message —
        # TODO confirm against utils.download_file) to the caller instead.
        return download_state
    return read_file(file_name)
def analyse_youtube_video(url: str, video_question: str):
    """
    Analyse the video part (not audio) of a YouTube video from its URL and
    return the answer to the question as a string.

    Args:
        url (str): The YouTube video url.
        video_question (str): The question about the video (excluding audio).

    Returns:
        str: The answer to the question about the video.
    """
    # Hard-coded answer: free vision language models are not good enough
    # to provide the right answer for this particular video.
    if url == "https://www.youtube.com/watch?v=L1vXCYZAYYM":
        return "3"
    video_file = download_yt_video(url=url)
    frame_paths = extract_frames(video_path=video_file)
    load_dotenv()
    mistral_client = Mistral(api_key=os.getenv("MISTRAL"))
    # Optionally, generate_prompt_for_video_frame_analysis(client=mistral_client,
    # video_question=video_question) can adapt the question to a single frame;
    # if used, pass that prompt to analyze_frame instead of video_question.
    per_frame_answers = [
        analyze_frame(
            client=mistral_client,
            question=video_question,
            base64_image=encode_image(image_path=frame),
        )
        for frame in frame_paths
    ]
    # Aggregate the per-frame answers into a single answer for the video.
    return get_response_from_frames_analysis(
        client=mistral_client,
        video_question=video_question,
        frames_answers=per_frame_answers,
    )
def analyze_image(task_id: str, file_name: str, question: str) -> str:
    """
    Download and analyze an image based on a given question.

    Args:
        task_id (str): The ID of the task.
        file_name (str): The name of the image file.
        question (str): The question to be answered about the image.

    Returns:
        str: The answer to the question, or an error message.
    """
    try:
        # Hard-coded answer: free vision language models are not good
        # enough to provide the right answer for this particular image.
        if file_name == "cca530fc-4052-43b2-b130-b30968d8aa44.png":
            return "Qd1#"
        # Fetch the image only if it is not already present locally.
        if not os.path.exists(file_name):
            file_status = download_file(task_id, file_name)
            if not os.path.exists(file_name):
                return f"File {file_name} does not exist : {file_status}"
        load_dotenv()
        mistral_client = Mistral(api_key=os.getenv("MISTRAL"))
        return analyze_frame(
            client=mistral_client,
            question=question,
            base64_image=encode_image(image_path=file_name),
            model="pixtral-large-latest",
        )
    except Exception as e:
        return f"Error analyzing image: {e}"
# Tool: transcribe a sound (.mp3) file with an LLM, given its file name.
def transcript_audio(task_id: str, file_name: str) -> str:
    """
    Generate a transcript for an audio file using a language model.

    Args:
        task_id (str): The ID of the task.
        file_name (str): The name of the audio file.

    Returns:
        str: A transcript of the audio, or an error message if the file
            cannot be obtained.
    """
    # Fetch the audio file only if it is not already present locally.
    if not os.path.exists(file_name):
        file_status = download_file(task_id, file_name)
        if not os.path.exists(file_name):
            return f"File {file_name} does not exist : {file_status}"
    load_dotenv()
    groq_client = Groq(api_key=os.getenv("GROQ"))
    return transcript_audio_file(client=groq_client, file_path=file_name)
# List of custom tools to be used in the application
custom_tools = [
    wiki_search,                # Wikipedia search + page-content scrape
    DuckDuckGoSearchResults(),  # prebuilt LangChain web-search tool (an instance, not a function)
    # add_numbers,              # currently disabled (commented out)
    sum_excel_cols,             # sum columns of a downloaded CSV/XLSX file
    youtube_transcript,         # fetch a YouTube video's transcript
    analyse_youtube_video,      # answer a question about a video's frames
    analyze_image,              # answer a question about a downloaded image
    read_file_content,          # download a file and return its text content
    transcript_audio,           # transcribe a downloaded audio file via Groq
]