Spaces:
Build error
Build error
| import io | |
| import os | |
| import tempfile | |
| from typing import Optional | |
| from urllib.parse import urlparse | |
| import uuid | |
| import pandas as pd | |
| import contextlib | |
| from langchain_core.tools import tool | |
| import requests | |
| from PIL import Image | |
| import pytesseract | |
| from transformers import pipeline | |
| def analyze_excel_file(file_path: str, query: str) -> str: | |
| """ | |
| Analyze an Excel file using pandas and answer a question about it. | |
| Args: | |
| file_path (str): the path to the Excel file. | |
| query (str): Question about the data | |
| """ | |
| try: | |
| # Read the Excel file | |
| df = pd.read_excel(file_path) | |
| # Run various analyses based on the query | |
| result = ( | |
| f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n" | |
| ) | |
| result += f"Columns: {', '.join(df.columns)}\n\n" | |
| # Add summary statistics | |
| result += "Summary statistics:\n" | |
| result += str(df.describe()) | |
| return result | |
| except Exception as e: | |
| return f"Error analyzing Excel file: {str(e)}" | |
| # Load ASR pipeline once at module level (for efficiency) | |
| asr_pipeline = pipeline("automatic-speech-recognition", model="openai/whisper-base", device=-1) | |
| def transcribe_audio(file_path: str, query: str = "") -> str: | |
| """ | |
| Transcribes speech from an audio file (e.g., .mp3 or .wav). | |
| Args: | |
| file_path (str): Path to the audio file. | |
| query (str): (Optional) Ignored; present to support LangChain tool schema. | |
| Returns: | |
| str: Transcribed text from the audio. | |
| """ | |
| try: | |
| print(f"Transcribing: {file_path}") | |
| result = asr_pipeline(file_path) | |
| transcript = result["text"] | |
| return transcript.strip() if transcript.strip() else "No speech detected." | |
| except Exception as e: | |
| return f"Error transcribing audio: {str(e)}" | |
| def execute_python_code(code: str) -> str: | |
| """ | |
| Executes a Python code string and returns the output or error. | |
| Args: | |
| code (str): The Python code to execute. | |
| Returns: | |
| str: The output or error message. | |
| """ | |
| local_vars = {} | |
| stdout = io.StringIO() | |
| try: | |
| with contextlib.redirect_stdout(stdout): | |
| exec(code, {}, local_vars) | |
| output = stdout.getvalue() | |
| if output.strip(): | |
| return output.strip() | |
| # If code defines a variable named 'result', return its value | |
| if "result" in local_vars: | |
| return str(local_vars["result"]) | |
| return "Code executed successfully, but produced no output." | |
| except Exception as e: | |
| return f"Error executing code: {e}" | |
| def save_and_read_file(content: str, filename: Optional[str] = None) -> str: | |
| """ | |
| Save content to a file and return the path. | |
| Args: | |
| content (str): the content to save to the file | |
| filename (str, optional): the name of the file. If not provided, a random name file will be created. | |
| """ | |
| temp_dir = tempfile.gettempdir() | |
| if filename is None: | |
| temp_file = tempfile.NamedTemporaryFile(delete=False, dir=temp_dir) | |
| filepath = temp_file.name | |
| else: | |
| filepath = os.path.join(temp_dir, filename) | |
| with open(filepath, "w") as f: | |
| f.write(content) | |
| return f"File saved to {filepath}. You can read this file to process its contents." | |
| def download_file_from_url(url: str, filename: Optional[str] = None) -> str: | |
| """ | |
| Download a file from a URL and save it to a temporary location. | |
| Args: | |
| url (str): the URL of the file to download. | |
| filename (str, optional): the name of the file. If not provided, a random name file will be created. | |
| """ | |
| try: | |
| # Parse URL to get filename if not provided | |
| if not filename: | |
| path = urlparse(url).path | |
| filename = os.path.basename(path) | |
| if not filename: | |
| filename = f"downloaded_{uuid.uuid4().hex[:8]}" | |
| # Create temporary file | |
| temp_dir = tempfile.gettempdir() | |
| filepath = os.path.join(temp_dir, filename) | |
| # Download the file | |
| response = requests.get(url, stream=True) | |
| response.raise_for_status() | |
| # Save the file | |
| with open(filepath, "wb") as f: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| return f"File downloaded to {filepath}. You can read this file to process its contents." | |
| except Exception as e: | |
| return f"Error downloading file: {str(e)}" | |
| def extract_text_from_image(image_path: str) -> str: | |
| """ | |
| Extract text from an image using OCR library pytesseract (if available). | |
| Args: | |
| image_path (str): the path to the image file. | |
| """ | |
| try: | |
| # Open the image | |
| image = Image.open(image_path) | |
| # Extract text from the image | |
| text = pytesseract.image_to_string(image) | |
| return f"Extracted text from image:\n\n{text}" | |
| except Exception as e: | |
| return f"Error extracting text from image: {str(e)}" | |
| def analyze_csv_file(file_path: str, query: str) -> str: | |
| """ | |
| Analyze a CSV file using pandas and answer a question about it. | |
| Args: | |
| file_path (str): the path to the CSV file. | |
| query (str): Question about the data | |
| """ | |
| try: | |
| # Read the CSV file | |
| df = pd.read_csv(file_path) | |
| # Run various analyses based on the query | |
| result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n" | |
| result += f"Columns: {', '.join(df.columns)}\n\n" | |
| # Add summary statistics | |
| result += "Summary statistics:\n" | |
| result += str(df.describe()) | |
| return result | |
| except Exception as e: | |
| return f"Error analyzing CSV file: {str(e)}" |