Spaces:
Runtime error
Runtime error
Yago Bolivar
Refactor speech_to_text.py to implement a singleton ASR pipeline, enhance error handling, and introduce SpeechToTextTool for better integration. Update spreadsheet_tool.py to support querying and improve parsing functionality, including CSV support. Enhance video_processing_tool.py with new tasks for metadata extraction and frame extraction, while improving object detection capabilities and initialization checks.
87aa741
| import os | |
| import pandas as pd | |
| from typing import Dict, List, Union, Tuple, Any, Optional | |
| import numpy as np | |
| from smolagents.tools import Tool | |
class SpreadsheetTool(Tool):
    """
    Parses spreadsheet files (e.g., .xlsx) and extracts tabular data for analysis or allows querying.
    Useful for reading, processing, and converting spreadsheet content to Python data structures.
    """
    name = "spreadsheet_processor"
    description = "Parses a spreadsheet file (e.g., .xlsx, .xls, .csv) and can perform queries. Returns extracted data or query results."
    inputs = {
        'file_path': {'type': 'string', 'description': 'Path to the spreadsheet file.'},
        'query_instructions': {'type': 'string', 'description': 'Optional. Instructions for querying the data (e.g., "Sum column A"). If None, parses the whole sheet.', 'nullable': True}
    }
    outputs = {'result': {'type': 'object', 'description': 'A dictionary containing parsed sheet data, query results, or an error message.'}}
    output_type = "object"

    def __init__(self, *args, **kwargs):
        """Initialize the SpreadsheetTool."""
        super().__init__(*args, **kwargs)
        self.is_initialized = True

    # Main entry point for the agent
    def forward(self, file_path: str, query_instructions: Optional[str] = None) -> Dict[str, Any]:
        """Parse a spreadsheet and optionally run a query against its contents.

        Args:
            file_path: Path to a .xlsx, .xls, or .csv file.
            query_instructions: Optional query text (e.g. "Sum column A").
                When None, the whole file is parsed and summarized.

        Returns:
            On success without a query: {"parsed_sheets", "summary", "message"}.
            On success with a query: {"query_results", "explanation", "original_query"}.
            On failure: {"error": <message>}.
        """
        if not os.path.exists(file_path):
            return {"error": f"File not found: {file_path}"}
        # Determine file type for appropriate parsing
        _, file_extension = os.path.splitext(file_path)
        file_extension = file_extension.lower()
        if file_extension in ['.xlsx', '.xls']:
            parsed_data = self._parse_excel(file_path)
        elif file_extension == '.csv':
            parsed_data = self._parse_csv(file_path)
        else:
            return {"error": f"Unsupported file type: {file_extension}. Supported types: .xlsx, .xls, .csv"}
        if parsed_data.get("error"):
            return parsed_data  # Return error from parsing step
        if query_instructions:
            return self._query_data(parsed_data, query_instructions)
        # If no query, return the parsed data and summary
        return {
            "parsed_sheets": parsed_data.get("sheets"),
            "summary": parsed_data.get("summary"),
            "message": "Spreadsheet parsed successfully."
        }

    def _parse_excel(self, file_path: str) -> Dict[str, Any]:
        """Parse every sheet of an Excel workbook into DataFrames.

        Returns a dict with "sheets" (name -> DataFrame), "sheet_names",
        "summary", and "error" set to None on success; on failure, a dict
        containing only an "error" message.
        """
        try:
            # Use the context manager so the underlying file handle is
            # closed even if reading a sheet raises — a bare
            # pd.ExcelFile(...) left the handle open (and keeps the file
            # locked on Windows, which can break later cleanup/deletion).
            with pd.ExcelFile(file_path) as excel_file:
                sheet_names = excel_file.sheet_names
                sheets = {
                    sheet_name: pd.read_excel(excel_file, sheet_name=sheet_name)
                    for sheet_name in sheet_names
                }
            summary = self._create_summary(sheets)
            return {"sheets": sheets, "sheet_names": sheet_names, "summary": summary, "error": None}
        except Exception as e:
            return {"error": f"Error parsing Excel spreadsheet: {str(e)}"}

    def _parse_csv(self, file_path: str) -> Dict[str, Any]:
        """Parse a CSV file.

        The result mirrors the Excel structure: the single DataFrame is
        stored under a "sheet" named after the file's base name.
        """
        try:
            df = pd.read_csv(file_path)
            # CSVs don't have multiple sheets, so we adapt the structure
            sheet_name = os.path.splitext(os.path.basename(file_path))[0]
            sheets = {sheet_name: df}
            summary = self._create_summary(sheets)
            return {"sheets": sheets, "sheet_names": [sheet_name], "summary": summary, "error": None}
        except Exception as e:
            return {"error": f"Error parsing CSV file: {str(e)}"}

    def _create_summary(self, sheets_dict: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Create a per-sheet structural summary of the spreadsheet contents.

        For each sheet records shape, column names, numeric/text column
        split, a null-presence flag, and the first three rows as records.
        """
        summary = {}
        for sheet_name, df in sheets_dict.items():
            summary[sheet_name] = {
                "shape": df.shape,
                "columns": df.columns.tolist(),
                "numeric_columns": df.select_dtypes(include=[np.number]).columns.tolist(),
                "text_columns": df.select_dtypes(include=['object']).columns.tolist(),
                "has_nulls": df.isnull().any().any(),
                "first_few_rows": df.head(3).to_dict('records')
            }
        return summary

    # Renamed from query_data to _query_data and adjusted arguments
    def _query_data(self, parsed_data_dict: Dict[str, Any], query_instructions: str) -> Dict[str, Any]:
        """
        Execute a query on the spreadsheet data based on instructions.
        This is a simplified placeholder. Real implementation would need robust query parsing.
        """
        if parsed_data_dict.get("error"):
            return {"error": parsed_data_dict["error"]}
        sheets = parsed_data_dict.get("sheets")
        if not sheets:
            return {"error": "No sheets data available for querying."}
        # Placeholder for actual query logic.
        # This would involve parsing `query_instructions` (e.g., using regex, NLP, or a DSL)
        # and applying pandas operations.
        # For now, let's return a message indicating the query was received and basic info.
        results = {}
        explanation = f"Query instruction received: '{query_instructions}'. Advanced query execution is not fully implemented. " \
                      f"Returning summary of available sheets: {list(sheets.keys())}."
        # Example: if query asks for sum, try to sum first numeric column of first sheet
        if "sum" in query_instructions.lower():
            first_sheet_name = next(iter(sheets))
            df = sheets[first_sheet_name]
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            if not numeric_cols.empty:
                col_to_sum = numeric_cols[0]
                try:
                    total_sum = df[col_to_sum].sum()
                    results[f'{first_sheet_name}_{col_to_sum}_sum'] = total_sum
                    explanation += f" Example sum of column '{col_to_sum}' in sheet '{first_sheet_name}': {total_sum}."
                except Exception as e:
                    explanation += f" Could not perform example sum: {e}."
            else:
                explanation += " No numeric columns found for example sum."
        return {"query_results": results, "explanation": explanation, "original_query": query_instructions}
# Example usage (for direct testing)
if __name__ == '__main__':
    tool = SpreadsheetTool()

    # Temporary fixture files used throughout the smoke tests below.
    xlsx_path = "dummy_test.xlsx"
    csv_path = "dummy_test.csv"

    # Build a two-sheet Excel workbook fixture.
    frame = pd.DataFrame({
        'colA': [1, 2, 3, 4, 5],
        'colB': ['apple', 'banana', 'cherry', 'date', 'elderberry'],
        'colC': [10.1, 20.2, 30.3, 40.4, 50.5],
    })
    with pd.ExcelWriter(xlsx_path) as writer:
        frame.to_excel(writer, sheet_name='Sheet1', index=False)
        frame.head(2).to_excel(writer, sheet_name='Sheet2', index=False)

    # Build a small CSV fixture.
    pd.DataFrame({
        'id': [101, 102, 103],
        'product': ['widget', 'gadget', 'gizmo'],
        'price': [19.99, 29.50, 15.00],
    }).to_csv(csv_path, index=False)

    print("--- Test 1: Parse Excel file (no query) ---")
    outcome = tool.forward(file_path=xlsx_path)
    print(outcome)
    assert "error" not in outcome or outcome["error"] is None
    assert "Sheet1" in outcome["parsed_sheets"]

    print("\n--- Test 2: Parse CSV file (no query) ---")
    outcome = tool.forward(file_path=csv_path)
    print(outcome)
    assert "error" not in outcome or outcome["error"] is None
    assert csv_path.split('.')[0] in outcome["parsed_sheets"]

    print("\n--- Test 3: Query Excel file (simple sum example) ---")
    outcome = tool.forward(file_path=xlsx_path, query_instructions="sum colA from Sheet1")
    print(outcome)
    assert "error" not in outcome or outcome["error"] is None
    assert "query_results" in outcome
    if outcome.get("query_results"):
        assert "Sheet1_colA_sum" in outcome["query_results"]
        assert outcome["query_results"]["Sheet1_colA_sum"] == 15

    print("\n--- Test 4: File not found ---")
    outcome = tool.forward(file_path="non_existent_file.xlsx")
    print(outcome)
    assert outcome["error"] is not None
    assert "File not found" in outcome["error"]

    print("\n--- Test 5: Unsupported file type ---")
    txt_path = "dummy_test.txt"
    with open(txt_path, "w") as handle:
        handle.write("hello")
    outcome = tool.forward(file_path=txt_path)
    print(outcome)
    assert outcome["error"] is not None
    assert "Unsupported file type" in outcome["error"]
    os.remove(txt_path)

    # Clean up dummy files
    for fixture in (xlsx_path, csv_path):
        if os.path.exists(fixture):
            os.remove(fixture)

    print("\nSpreadsheetTool tests completed.")