import os, sys
from dotenv import load_dotenv
import requests
import pandas as pd
import base64
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_community.tools import WikipediaQueryRun
from langchain_community.document_loaders import WikipediaLoader
import wikipedia
from langchain_tavily import TavilySearch
from langchain_community.document_loaders import ArxivLoader
from langchain_community.document_loaders import YoutubeLoader
from langchain_core.tools import tool
from langchain.tools import Tool
from langchain_core.messages import HumanMessage
# used to run Python code in a subprocess
import subprocess

DATASET_API_URL = 'https://agents-course-unit4-scoring.hf.space'

load_dotenv()
WIKIPEDIA_TOP_K_RESULTS = int(os.environ.get("WIKIPEDIA_TOP_K_RESULTS"))
WIKIPEDIA_DOC_CONTENT_CHARS_MAX = int(os.environ.get("WIKIPEDIA_DOC_CONTENT_CHARS_MAX"))

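# Illustrative sketch of the expected .env contents (values are placeholders; only the two
# WIKIPEDIA_* variables are read directly by this module, and the Tavily key is assumed to be
# picked up by the TavilySearch tool itself):
#
#   WIKIPEDIA_TOP_K_RESULTS=2
#   WIKIPEDIA_DOC_CONTENT_CHARS_MAX=4000
#   TAVILY_API_KEY=tvly-...
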
def get_search_tool():
    search_tool = DuckDuckGoSearchRun()
    return search_tool

def get_tavily_search_tool():
    tavily_search_tool = TavilySearch(
        max_results=3,
        topic="general",
        # include_answer=False,
        # include_raw_content=False,
        # include_images=False,
        # include_image_descriptions=False,
        # search_depth="basic",
        # time_range="day",
        # include_domains=None,
        # exclude_domains=None
    )
    return tavily_search_tool

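# Illustrative smoke test (the query string is just an example, not part of the agent flow):
# both helpers return LangChain tools, so they can be invoked directly before being handed to
# the agent. Running this assumes DuckDuckGo access and a valid TAVILY_API_KEY.
# print(get_search_tool().invoke("capital of France"))
# print(get_tavily_search_tool().invoke({"query": "capital of France"}))
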
# Wikipedia tool 1: uses WikipediaQueryRun from the 'langchain_community.tools' package.
# Known limitation: it appears to return only article summaries.
def get_wikipedia_tool():
    #print("WIKIPEDIA_TOP_K_RESULTS:{}, WIKIPEDIA_DOC_CONTENT_CHARS_MAX:{}".format(WIKIPEDIA_TOP_K_RESULTS, WIKIPEDIA_DOC_CONTENT_CHARS_MAX))
    # Create an instance of the Wikipedia API wrapper; top_k_results controls how many results are fetched.
    wikipedia_api_wrapper = WikipediaAPIWrapper(top_k_results=WIKIPEDIA_TOP_K_RESULTS, doc_content_chars_max=WIKIPEDIA_DOC_CONTENT_CHARS_MAX)
    # Wrap the WikipediaAPIWrapper in a LangChain tool.
    wikipedia_tool = WikipediaQueryRun(api_wrapper=wikipedia_api_wrapper)
    return wikipedia_tool

# Wikipedia tool 2: uses the 'wikipedia' package directly.
def wikipedia_search(query: str) -> str:
    """
    Search Wikipedia and return the full content of the most relevant article.
    """
    try:
        results = wikipedia.search(query)
        if not results:
            return f"No results found for '{query}'."
        page = wikipedia.page(results[0])
        content = page.content
        # Truncate the content if it's too long
        if len(content) > WIKIPEDIA_DOC_CONTENT_CHARS_MAX:
            content = content[:WIKIPEDIA_DOC_CONTENT_CHARS_MAX] + "..."
        return content
    except wikipedia.exceptions.DisambiguationError as e:
        return f"Ambiguous query. Possible options: {', '.join(e.options[:5])}..."
    except wikipedia.exceptions.PageError:
        return f"Page not found for '{query}'."
    except Exception as e:
        return f"Error occurred: {str(e)}"

# Wikipedia tool 3: uses WikipediaLoader from the 'langchain_community.document_loaders' package.
def wikipedia_search_3(query: str) -> dict:
    """
    Search Wikipedia and return the full content of the most relevant articles.
    Args:
        query: The search query.
    """
    search_docs = WikipediaLoader(query=query,
                                  load_max_docs=WIKIPEDIA_TOP_K_RESULTS,
                                  doc_content_chars_max=WIKIPEDIA_DOC_CONTENT_CHARS_MAX,
                                  load_all_available_meta=True).load()
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
            for doc in search_docs
        ])
    return {"wiki_results": formatted_search_docs}

def execute_python_code_from_file(file_path: str) -> str:
    """
    Reads a Python file from the given path, executes its code, and returns the combined stdout and stderr.
    WARNING: Executing arbitrary code from files is a significant security risk.
    Only use this tool with trusted code in a controlled environment.
    """
    if not os.path.exists(file_path):
        return f"Error: File not found at '{file_path}'."
    if not file_path.endswith(".py"):
        return f"Error: Provided file '{file_path}' is not a Python (.py) file."
    try:
        # Use subprocess to run the Python file in a new process.
        # This provides some isolation compared to 'exec()' but is still dangerous for untrusted code.
        result = subprocess.run(
            [sys.executable, file_path],  # sys.executable ensures it uses the current Python interpreter
            capture_output=True,          # capture stdout and stderr
            text=True,                    # capture output as text (strings)
            check=False                   # do not raise for non-zero exit codes (handle errors manually)
        )
        stdout_output = result.stdout.strip()
        stderr_output = result.stderr.strip()
        output_lines = []
        if stdout_output:
            output_lines.append(f"STDOUT:\n{stdout_output}")
        if stderr_output:
            output_lines.append(f"STDERR:\n{stderr_output}")
        if result.returncode != 0:
            output_lines.append(f"Process exited with code {result.returncode}. This usually indicates an error.")
        if not output_lines:
            return "Execution completed with no output."
        return "\n".join(output_lines)
    except Exception as e:
        return f"An unexpected error occurred during code execution: {e}"

def download_taskid_file(task_id: str, file_name: str) -> str:
    """
    Downloads the file associated with the given task_id (if any). Returns the absolute path of the file.
    """
    try:
        response = requests.get(f"{DATASET_API_URL}/files/{task_id}", timeout=20)
        response.raise_for_status()
        with open(file_name, 'wb') as file:
            file.write(response.content)
        return os.path.abspath(file_name)
    except Exception as e:
        return "Error occurred: {}".format(e)

def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyzes an Excel (.xlsx) file using pandas.
    Loads the specified Excel file into a pandas DataFrame and executes a Python query against it.
    The query should be a valid pandas DataFrame operation (e.g., df.head(), df.describe(),
    df[df['column_name'] > 10], df.groupby('category')['value'].mean()).
    Returns the result of the query as a string (JSON or string representation).
    """
    if not os.path.exists(file_path):
        return f"Error: File not found at {file_path}"
    try:
        df = pd.read_excel(file_path)
        # Make the DataFrame accessible to the query
        local_vars = {"df": df}
        # Execute the query.
        # IMPORTANT: Be extremely cautious with eval/exec for user-provided input in a production system.
        # For a ReAct agent, the LLM generates this query, so it's generally safer
        # if the LLM is well-constrained and reliable.
        # For sensitive applications, consider a safer parsing mechanism or a restricted set of operations.
        result = eval(query, {}, local_vars)
        return str(result)  # convert the result to a string for the LLM
    except Exception as e:
        return f"Error analyzing Excel file: {e}"

def get_analyze_mp3_tool(llm):
    def analyze_mp3_file(audio_path: str) -> str:
        """
        Extract text from an mp3 audio file.
        """
        all_text = ""
        try:
            # Read the audio file and encode it as base64
            with open(audio_path, "rb") as audio_file:
                audio_bytes = audio_file.read()
            audio_base64 = base64.b64encode(audio_bytes).decode("utf-8")
            # Determine the MIME type for MP3
            audio_mime_type = "audio/mpeg"  # or "audio/mp3", "audio/wav" etc. for other formats
            # Prepare the prompt including the base64 audio data
            message = [
                HumanMessage(
                    content=[
                        {
                            "type": "text",
                            "text": (
                                "Extract all the text from this audio. "
                                "Return only the extracted text, no explanations."
                            ),
                        },
                        {
                            "type": "media",               # 'media' content block for audio input
                            "data": audio_base64,          # base64-encoded audio payload
                            "mime_type": audio_mime_type,  # MIME type of the payload
                        }
                    ]
                )
            ]
            # Call the audio-capable (multimodal) model
            response = llm.invoke(message)
            # Append the extracted text
            all_text += response.content + "\n\n"
            return all_text.strip()
        except Exception as e:
            print("Error extracting text from audio file:{} - {}".format(audio_path, e))
            return ""
    return analyze_mp3_file

def get_analyze_image_tool(llm):
    def analyze_png_image(image_path: str) -> str:
        """
        Analyzes a PNG image and returns a detailed description of its content.
        This tool requires an LLM capable of processing images, such as Gemini 1.5 Pro or Gemini 2.0 Flash.
        """
        try:
            # Read the image and encode it as base64
            with open(image_path, "rb") as image_file:
                image_bytes = image_file.read()
            image_base64 = base64.b64encode(image_bytes).decode("utf-8")
            # Prepare the prompt including the base64 image data
            message = [
                HumanMessage(
                    content=[
                        {
                            "type": "text",
                            "text": (
                                "Provide a very detailed description of the content of this image. "
                                "Focus on objects, people, actions, text, and overall scene context. "
                                "Be as comprehensive as possible."
                            ),
                        },
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{image_base64}"},
                        },
                    ]
                )
            ]
            # Call the vision-capable model
            response = llm.invoke(message)
            return response.content.strip()
        except Exception as e:
            print("Error analyzing image file:{} - {}".format(image_path, e))
            return ""
    return analyze_png_image

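# Illustrative sketch (assumption: 'llm' is a multimodal chat model instance created elsewhere,
# e.g. a Gemini model via its LangChain integration; the file name is a placeholder): the two
# factories above return plain callables bound to that model, which can then be registered as
# agent tools.
# analyze_mp3 = get_analyze_mp3_tool(llm)
# analyze_image = get_analyze_image_tool(llm)
# print(analyze_image("downloaded_chart.png"))
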
def arxiv_search(query: str) -> dict:
    """Search Arxiv for a query and return a maximum of 3 results.
    Args:
        query: The search query."""
    search_docs = ArxivLoader(query=query, load_max_docs=3).load()
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
            for doc in search_docs
        ]
    )
    return {"arxiv_results": formatted_search_docs}

def get_youtube_transcript(url: str) -> dict:
    """Fetches the transcript from a YouTube video URL.
    Args:
        url: The URL of the YouTube video.
    Returns:
        A dictionary with keys "transcript" (string: the video transcript, or an error message) and
        "metadata" (dictionary: the video title and other information if available, otherwise empty).
    """
    try:
        loader = YoutubeLoader.from_youtube_url(url, add_video_info=True)
        docs = loader.load()
        # Combine all transcript chunks into a single string
        transcript = "\n".join(doc.page_content for doc in docs)
        metadata = docs[0].metadata if docs else {}
        return {"transcript": transcript, "metadata": metadata}
    except Exception as e:
        if "Could not retrieve transcript" in str(e):
            return {"transcript": "No transcript available for this video.", "metadata": {}}
        else:
            return {"transcript": f"Error fetching transcript: {e}", "metadata": {}}
