Spaces:
Running
Running
| """ | |
| Enterprise-Grade Data Query Engine for SIRUS Intelligence | |
| This is the refactored core engine that orchestrates file handling and LLM services | |
| for scalable, multi-table data analysis with enterprise-grade performance. | |
| Author: SIRUS Intelligence Team | |
| Architecture: Modular, scalable, enterprise-grade | |
| Performance: DuckDB-native, memory-efficient, multi-table aware | |
| """ | |
| import duckdb | |
| import polars as pl | |
| import os | |
| import pandas as pd | |
| from pathlib import Path | |
| import logging | |
| import hashlib | |
| import json | |
| from functools import lru_cache | |
| from typing import Dict, List, Any, Optional, Tuple, Union | |
| import sys | |
| # Add the parent directory to sys.path so we can import excel_module | |
| EXCEL_MODULE_ROOT = Path(__file__).resolve().parent.parent | |
| sys.path.insert(0, str(EXCEL_MODULE_ROOT.parent)) | |
| # Import our new enterprise-grade modules | |
| from .file_handlers import FileHandlers | |
| from .llm_service import LLMService | |
| # Import MinIO configuration for S3 setup | |
| from core.minio.config import ( | |
| MINIO_ENDPOINT, | |
| MINIO_ACCESS_KEY, | |
| MINIO_SECRET_KEY, | |
| MINIO_BUCKET_NAME | |
| ) | |
# Configure the root logger when this module is imported: INFO level, with
# timestamp, logger name, level and message on every record.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-wide logger used by DataQueryEngine and the script entry point.
logger = logging.getLogger('DataQueryEngine')
class DataQueryEngine:
    """
    Data query engine that coordinates file loading, LLM services and DuckDB.

    The engine owns an in-memory DuckDB connection configured for S3-style
    access to MinIO, delegates file loading to ``FileHandlers`` and all
    natural-language work to ``LLMService``, and keeps metadata about the
    currently loaded tables.

    Every public method that performs work returns a ``dict`` with a
    ``"status"`` key (``"success"`` or ``"error"``); errors are reported in
    the ``"message"`` key instead of being raised, except for ``__init__``.
    """

    # Maps a lower-cased file extension to the label stored in ``file_type``.
    _FILE_TYPES_BY_EXT = {
        '.xlsx': 'excel',
        '.xls': 'excel',
        '.xlsm': 'excel',
        '.csv': 'csv',
        '.json': 'json',
        '.parquet': 'parquet',
    }

    def __init__(self, minio_client, redis_client):
        """
        Create the engine, open DuckDB and configure it for MinIO access.

        Args:
            minio_client: MinIO client; stored and handed to ``FileHandlers``.
            redis_client: Redis client; stored on the instance only.

        Raises:
            RuntimeError: If installing, loading or configuring the DuckDB S3
                extension fails. The original exception is chained as the
                cause and the DuckDB connection is closed before raising.
        """
        logger.info("[DATA_ENGINE] Initializing enterprise-grade data query engine with stateless architecture")

        # Store distributed clients
        self.minio_client = minio_client
        self.redis_client = redis_client

        # Core database connection
        self.conn = duckdb.connect(':memory:')

        # Configure DuckDB for S3/MinIO access
        try:
            self.conn.execute("INSTALL s3;")
            logger.info("[DATA_ENGINE] DuckDB S3 extension installed successfully")
            self.conn.execute("LOAD s3;")
            logger.info("[DATA_ENGINE] DuckDB S3 extension loaded successfully")

            # Configure MinIO endpoint and credentials
            self.conn.execute(f"SET s3_endpoint='{MINIO_ENDPOINT}';")
            self.conn.execute(f"SET s3_access_key_id='{MINIO_ACCESS_KEY}';")
            self.conn.execute(f"SET s3_secret_access_key='{MINIO_SECRET_KEY}';")
            self.conn.execute("SET s3_url_style='path';")  # Force path-style access
            self.conn.execute("SET s3_use_ssl=false;")  # Set to true if using HTTPS
            self.conn.execute("SET s3_region='us-east-1';")  # MinIO needs a region
            logger.info("[DATA_ENGINE] DuckDB S3 extension configured for MinIO access - Ready for distributed file operations")
        except Exception as e:
            logger.error(f"[DATA_ENGINE] CRITICAL: Failed to configure S3 extension: {str(e)}")
            logger.error("[DATA_ENGINE] MinIO integration will not work without S3 extension")
            # The caller never receives this instance, so nobody else can
            # release the connection: close it here to avoid leaking it.
            self.close()
            raise RuntimeError(f"DuckDB S3 extension configuration failed: {str(e)}") from e

        # Legacy compatibility properties (maintain API surface)
        self.file_path: Optional[str] = None
        self.file_type: Optional[str] = None
        self.active_sheet: Optional[str] = None
        self.available_sheets: List[str] = []
        self.column_names: List[str] = []  # Primary table columns
        self.table_name: str = "data"  # Primary table name

        # Multi-table metadata and the query result cache
        self.tables_info: List[Dict[str, Any]] = []
        self.query_cache: Dict[str, Dict[str, Any]] = {}
        self.max_cache_size: int = 100

        # Initialize modular components with distributed clients
        self.file_handlers = FileHandlers(self.conn, self.minio_client)
        self.llm_service = LLMService()

        logger.info("[DATA_ENGINE] Enterprise data query engine initialized successfully with stateless architecture")

    def __enter__(self):
        """Return the engine itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the engine on leaving a ``with`` block; never suppresses exceptions."""
        self.close()
        return False

    def load_file(self, object_name: str, original_filename: str, sheet_name: Optional[str] = None) -> dict:
        """
        Load a file through ``FileHandlers`` and refresh the engine state.

        Args:
            object_name: MinIO object name (file identifier in storage).
            original_filename: Original filename; its suffix decides ``file_type``.
            sheet_name: Sheet name for Excel files, forwarded to the file handler.

        Returns:
            dict: The file handler's result on success or handler-reported
            error, or ``{"status": "error", "message": ...}`` if an exception
            was raised while loading.
        """
        logger.info(f"[DATA_ENGINE] Enterprise file loading from MinIO: {original_filename} (object: {object_name})")

        # Store file metadata (legacy compatibility): a virtual S3 path
        self.file_path = f"s3://{MINIO_BUCKET_NAME}/{object_name}"
        file_ext = Path(original_filename).suffix.lower()

        # Cached results belong to the previously loaded file
        self.query_cache = {}

        try:
            result = self.file_handlers.load_file(object_name, original_filename, sheet_name)
            if result["status"] == "error":
                logger.error(f"[DATA_ENGINE] File loading failed: {result['message']}")
                return result

            self._update_engine_state(result, file_ext)

            total_rows = sum(table.get("row_count", 0) for table in self.tables_info)
            total_columns = sum(len(table.get("column_names", [])) for table in self.tables_info)
            logger.info(f"[DATA_ENGINE] Enterprise loading complete: {len(self.tables_info)} tables, {total_rows:,} rows, {total_columns} columns")
            return result
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Critical error in enterprise file loading: {str(e)}")
            return {"status": "error", "message": f"Error loading file: {str(e)}"}

    def _update_engine_state(self, load_result: Dict[str, Any], file_ext: str):
        """
        Copy metadata from a file-handler result onto the engine.

        Args:
            load_result: Handler result; the keys ``tables_info``,
                ``available_sheets`` and ``active_sheet`` are read if present.
            file_ext: Lower-cased file extension including the leading dot.
        """
        # An unrecognised extension deliberately leaves ``file_type`` as it
        # was. NOTE(review): change_sheet() passes the suffix of the virtual
        # S3 path, which may not carry the original extension - confirm
        # before changing this to reset the type.
        detected_type = self._FILE_TYPES_BY_EXT.get(file_ext)
        if detected_type is not None:
            self.file_type = detected_type

        # Update multi-table information
        self.tables_info = load_result.get("tables_info", [])

        # The first table is the "primary" one exposed via the legacy properties
        if self.tables_info:
            primary_table = self.tables_info[0]
            self.column_names = primary_table.get("column_names", [])
            self.table_name = primary_table.get("table_name", "data")

        # Update Excel-specific properties
        self.available_sheets = load_result.get("available_sheets", [])
        self.active_sheet = load_result.get("active_sheet", None)

        logger.debug(f"[ENGINE_STATE] Updated state: {len(self.tables_info)} tables, active_sheet: {self.active_sheet}")

    def change_sheet(self, sheet_name: str) -> dict:
        """
        Switch the active sheet of the currently loaded Excel file.

        Args:
            sheet_name: Name of the sheet; must be in ``available_sheets``.

        Returns:
            dict: The file handler's result, or an error dict when the current
            file is not Excel, the sheet is unknown, or an exception occurs.
        """
        logger.info(f"[DATA_ENGINE] Enterprise sheet change: {sheet_name}")

        if self.file_type != 'excel':
            return {"status": "error", "message": "Current file is not an Excel file"}
        if sheet_name not in self.available_sheets:
            return {"status": "error", "message": f"Sheet '{sheet_name}' not found in Excel file"}

        try:
            # Clear cache for state consistency
            self.query_cache = {}

            result = self.file_handlers.change_sheet(self.file_path, sheet_name, self.available_sheets)
            if result["status"] == "success":
                self._update_engine_state(result, Path(self.file_path).suffix.lower())
                logger.info(f"[DATA_ENGINE] Successfully switched to sheet: {sheet_name}")
            return result
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Error in enterprise sheet change: {str(e)}")
            return {"status": "error", "message": f"Error changing sheet: {str(e)}"}

    def get_column_names(self) -> List[str]:
        """Return the column names of the primary loaded table (backward compatibility)."""
        return self.column_names

    def get_file_info(self) -> Dict[str, Any]:
        """Return a summary dict of the loaded file, its primary columns and all tables."""
        return {
            "file_path": self.file_path,
            "file_type": self.file_type,
            "active_sheet": self.active_sheet,
            "available_sheets": self.available_sheets,
            "column_count": len(self.column_names),
            "columns": self.column_names,
            "tables_info": self.tables_info,
            "total_tables": len(self.tables_info),
        }

    def generate_sql_query(self, user_query: str) -> dict:
        """
        Ask the LLM service to generate SQL for ``user_query`` over all loaded tables.

        Returns:
            dict: The LLM service result, or an error dict when no tables are
            loaded or an exception occurs.
        """
        logger.info(f"[DATA_ENGINE] Enterprise SQL generation for: '{user_query[:50]}...'")

        if not self.tables_info:
            logger.warning("[DATA_ENGINE] No tables loaded for query generation")
            return {"status": "error", "message": "No data loaded. Please load a file first."}

        try:
            result = self.llm_service.generate_sql_query_multi_table(
                user_query,
                self.tables_info,
                self.conn,
            )
            if result["status"] == "success":
                logger.info(f"[DATA_ENGINE] Enterprise SQL generated successfully, tables used: {result.get('tables_used', [])}")
            return result
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Error in enterprise SQL generation: {str(e)}")
            return {"status": "error", "message": f"Error generating SQL query: {str(e)}"}

    def execute_query(self, sql_query: str) -> Dict[str, Any]:
        """
        Execute ``sql_query`` on the DuckDB connection.

        Returns:
            dict: On success ``status``, ``data`` (list of row dicts keyed by
            column name), ``row_count`` and ``column_count``; an empty result
            reports zero for both counts and adds a ``message``. On failure an
            error dict.
        """
        logger.info(f"[DATA_ENGINE] Executing enterprise query")

        if not self.tables_info:
            return {"status": "error", "message": "No data loaded. Please load a file first."}

        try:
            duckdb_result = self.conn.execute(sql_query)
            rows = duckdb_result.fetchall()

            if not rows:
                logger.info("[DATA_ENGINE] Query executed successfully but returned no data")
                return {
                    "status": "success",
                    "data": [],
                    "row_count": 0,
                    "column_count": 0,
                    "message": "Query executed successfully but returned no data",
                }

            column_names_from_duckdb = [desc[0] for desc in duckdb_result.description]
            data_list = [dict(zip(column_names_from_duckdb, row)) for row in rows]
            logger.info(f"[DATA_ENGINE] Query executed successfully: {len(data_list)} rows, {len(column_names_from_duckdb)} columns")
            return {
                "status": "success",
                "data": data_list,
                "row_count": len(data_list),
                "column_count": len(column_names_from_duckdb),
            }
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Query execution error: {str(e)}")
            return {"status": "error", "message": f"Error executing query: {str(e)}"}

    def analyze_query_results(self, user_query: str, sql_query: str, query_results: List[Dict[str, Any]]) -> dict:
        """
        Ask the LLM service to analyse query results, passing the loaded tables as context.

        Returns:
            dict: The LLM service result, or an error dict if an exception occurs.
        """
        logger.info("[DATA_ENGINE] Performing enterprise result analysis")

        try:
            result = self.llm_service.analyze_query_results(
                user_query,
                sql_query,
                query_results,
                self.tables_info,
            )
            if result["status"] == "success":
                logger.info("[DATA_ENGINE] Enterprise analysis completed successfully")
            return result
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Error in enterprise analysis: {str(e)}")
            return {"status": "error", "message": f"Error generating analysis: {str(e)}"}

    @staticmethod
    def _clean_data(data):
        """Return ``data`` with NaN and +/-Infinity floats replaced by None, recursing into lists and dicts."""
        if isinstance(data, list):
            return [DataQueryEngine._clean_data(item) for item in data]
        if isinstance(data, dict):
            return {key: DataQueryEngine._clean_data(value) for key, value in data.items()}
        # ``data != data`` is true only for NaN
        if isinstance(data, float) and (data != data or data in (float('inf'), float('-inf'))):
            return None
        return data

    def process_query(self, user_query: str) -> Dict[str, Any]:
        """
        Process a natural-language query end to end.

        Single quotes are stripped from the query, the LLM service decides
        whether it is a strategic question, and the query is routed to the
        strategic or the standard pipeline accordingly.

        Returns:
            dict: A result dict with ``status``; any exception is converted
            into an error dict.
        """
        if not self.column_names:
            return {"status": "error", "message": "No data loaded. Please load a file first."}

        try:
            # Clean the user query
            user_query = user_query.replace("'", "")

            if self.llm_service.is_strategic_question(user_query):
                return self._process_strategic_query(user_query)
            return self._process_standard_query(user_query)
        except Exception as e:
            logger.error(f"Error in query processing: {str(e)}")
            return {"status": "error", "message": f"Error processing query: {str(e)}"}

    def _process_strategic_query(self, user_query: str) -> Dict[str, Any]:
        """Run every sub-query of the LLM's strategic plan and analyse the combined results."""
        query_plan = self.llm_service.understand_strategic_query(user_query, self.table_name, self.column_names)
        if not query_plan or "queries" not in query_plan or not query_plan["queries"]:
            logger.error(f"Strategic query understanding failed to produce a valid query plan for user query: '{user_query}'. Plan received: {query_plan}")
            return {"status": "error", "message": "Failed to analyze strategic question due to an issue in query planning."}

        all_results = []
        all_queries = []
        logger.info(f"Executing strategic query plan for user query: '{user_query}'. Plan: {json.dumps(query_plan, indent=2)}")

        for query_info in query_plan["queries"]:
            sql = query_info.get("sql")
            purpose = query_info.get("purpose", "No purpose provided")
            if not sql:
                logger.warning(f"Skipping query in plan due to missing SQL. Purpose: '{purpose}'")
                continue

            logger.info(f"Executing strategic sub-query. Purpose: '{purpose}', SQL: '{sql}'")
            try:
                result = self.execute_query(sql)
                status = result.get("status")
                data = result.get("data")
                message = result.get("message", "N/A")
                row_count = result.get("row_count", 0)
                logger.info(f"Sub-query execution result: Status: {status}, RowCount: {row_count}, HasData: {bool(data)}, Message: {message}")

                if status == "success" and data:
                    all_results.append(data)
                    all_queries.append({"sql": sql, "purpose": purpose})
                    logger.info(f"Successfully executed and added results for sub-query. Purpose: '{purpose}'")
                elif status == "success" and not data:
                    logger.warning(f"Strategic sub-query executed successfully but returned no data. Purpose: '{purpose}', SQL: '{sql}'")
                else:
                    logger.error(f"Strategic sub-query failed. Purpose: '{purpose}', SQL: '{sql}', Status: {status}, Error: {message}")
            except Exception as e:
                # One failing sub-query must not abort the rest of the plan
                logger.error(f"Exception during strategic sub-query execution: {str(e)}. SQL: '{sql}', Purpose: '{purpose}'", exc_info=True)
                continue

        if not all_results:
            logger.error(f"Failed to gather any data from strategic query plan. User query: '{user_query}'. Executed queries info: {all_queries}")
            return {"status": "error", "message": "Failed to gather necessary data for analysis. All planned sub-queries returned no data or failed."}

        # Generate comprehensive analysis
        analysis = self.llm_service.analyze_strategic_results(user_query, query_plan, all_results)

        return {
            "status": "success",
            "type": "strategic",
            # NaN/Infinity are not valid JSON, so they are replaced with None
            "data": self._clean_data(all_results),
            "queries": all_queries,
            "analysis": analysis,
            # A plan without these optional keys must not discard the
            # results that were already gathered.
            "metrics": query_plan.get("metrics", []),
            "dimensions": query_plan.get("dimensions", []),
        }

    def _process_standard_query(self, user_query: str) -> Dict[str, Any]:
        """Handle a non-strategic query: understand it, execute the instructions and, for SQL, analyse the rows."""
        understanding = self.llm_service.understand_query(user_query, self.table_name, self.column_names)
        if understanding["status"] == "error":
            return understanding

        result = self.llm_service.execute_instructions(
            understanding["instructions"],
            understanding["type"],
            user_query,
            self.table_name,
            self.column_names,
        )
        if result["status"] == "error":
            return result
        if result["type"] != "sql":
            return result

        query_result = self.execute_query(result["cleaned_sql"])
        if query_result["status"] == "error":
            return query_result

        analysis_result = None
        if query_result["data"]:
            # Go through the engine's own wrapper so the analysis receives
            # the same table context as every other caller, and so a failed
            # analysis does not discard the query results.
            analysis_result = self.analyze_query_results(
                user_query,
                result["cleaned_sql"],
                query_result["data"],
            )

        final_result = {
            "status": "success",
            "type": "sql",
            # NaN/Infinity are not valid JSON, so they are replaced with None
            "data": self._clean_data(query_result["data"]),
            "row_count": query_result["row_count"],
            "column_count": query_result["column_count"],
            "sql": {
                "raw": result["raw_sql"],
                "cleaned": result["cleaned_sql"],
            },
        }
        if analysis_result and analysis_result.get("status") == "success":
            final_result["analysis"] = analysis_result["analysis"]
        return final_result

    def get_data_profile(self) -> dict:
        """
        Build a profile of the loaded file and each of its tables.

        Returns:
            dict: ``file_info``, ``tables_summary`` (totals taken from the
            stored metadata) and ``tables_detail`` (one entry per table with a
            row count queried from DuckDB). Tables that fail to profile are
            logged and omitted. Returns an error dict when nothing is loaded
            or an exception occurs.
        """
        logger.info("[DATA_ENGINE] Generating enterprise data profile")

        if not self.tables_info:
            return {"status": "error", "message": "No data loaded. Please load a file first."}

        try:
            profile = {
                "status": "success",
                "file_info": {
                    "file_path": self.file_path,
                    "file_type": self.file_type,
                    "active_sheet": self.active_sheet,
                    "available_sheets": self.available_sheets,
                },
                "tables_summary": {
                    "total_tables": len(self.tables_info),
                    "total_rows": sum(table.get("row_count", 0) for table in self.tables_info),
                    "total_columns": sum(len(table.get("column_names", [])) for table in self.tables_info),
                },
                "tables_detail": [],
            }

            for table_info in self.tables_info:
                table_name = table_info["table_name"]
                try:
                    # Double any embedded quote so the name stays one identifier
                    quoted_name = str(table_name).replace('"', '""')
                    row_count = self.conn.execute(f'SELECT COUNT(*) FROM "{quoted_name}"').fetchone()[0]
                    profile["tables_detail"].append({
                        "table_name": table_name,
                        "sheet_name": table_info.get("sheet_name", table_name),
                        "row_count": row_count,
                        "column_count": len(table_info["column_names"]),
                        "columns": table_info["column_names"],
                    })
                except Exception as table_error:
                    logger.warning(f"[DATA_PROFILE] Error profiling table {table_name}: {str(table_error)}")
                    continue

            logger.info(f"[DATA_ENGINE] Enterprise data profile generated: {profile['tables_summary']['total_tables']} tables")
            return profile
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Error generating enterprise data profile: {str(e)}")
            return {"status": "error", "message": f"Error generating data profile: {str(e)}"}

    def query(self, sql: str) -> Dict[str, Any]:
        """
        Execute a direct SQL query on the loaded data.

        Args:
            sql: SQL query to execute.

        Returns:
            dict: The result of ``execute_query``, or an error dict if it raises.
        """
        logger.info(f"[DATA_ENGINE] Executing direct SQL query: {sql[:100]}...")
        try:
            return self.execute_query(sql)
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Direct SQL query failed: {str(e)}")
            return {"status": "error", "message": f"Query execution failed: {str(e)}"}

    def close(self):
        """Close the DuckDB connection if one exists; close errors are logged, not raised."""
        if getattr(self, 'conn', None):
            try:
                self.conn.close()
                logger.info("[DATA_ENGINE] Enterprise database connection closed")
            except Exception as e:
                logger.error(f"[DATA_ENGINE] Error closing database connection: {str(e)}")

    # Legacy compatibility methods (maintained for backward compatibility)

    def _get_cache_key(self, query: str) -> str:
        """Return the MD5 hex digest of ``file_path:active_sheet:query`` (a lookup key, not a security hash)."""
        cache_input = f"{self.file_path}:{self.active_sheet}:{query}"
        return hashlib.md5(cache_input.encode()).hexdigest()

    def _add_to_cache(self, query: str, result: Dict[str, Any]) -> None:
        """
        Store ``result`` for ``query``, evicting the oldest entry when full.

        Refreshing a key that is already cached never evicts anything: the
        cache does not grow in that case, so dropping another entry would
        only lose data.
        """
        cache_key = self._get_cache_key(query)
        if cache_key not in self.query_cache and len(self.query_cache) >= self.max_cache_size:
            # Dicts preserve insertion order, so the first key is the oldest.
            oldest_key = next(iter(self.query_cache), None)
            if oldest_key is not None:
                del self.query_cache[oldest_key]
        self.query_cache[cache_key] = result

    def _get_from_cache(self, query: str) -> Optional[Dict[str, Any]]:
        """Return the cached result for ``query``, or None when it is not cached."""
        cache_key = self._get_cache_key(query)
        if cache_key in self.query_cache:
            logger.info(f"[CACHE] Cache hit for query: {query[:30]}...")
            return self.query_cache[cache_key]
        return None
if __name__ == "__main__":
    # Interactive command-line mode.
    #
    # DATA_FILE_PATH is used both as the object name passed to load_file()
    # and, via its final path component, as the original filename.
    # NOTE(review): the default is a local-looking path; confirm that it is a
    # valid object name for the configured bucket.
    file_path = os.getenv('DATA_FILE_PATH', '/content/sales_data_sample.csv')

    # NOTE(review): DataQueryEngine requires a MinIO client and a Redis client
    # and this module does not show how they are built. None is passed so the
    # script starts instead of failing with a TypeError; supply real clients
    # if FileHandlers needs them - TODO confirm.
    # The context manager guarantees the DuckDB connection is closed on every
    # exit path, including the early exit below.
    with DataQueryEngine(minio_client=None, redis_client=None) as engine:
        print("\n🔍 Welcome to the Enterprise Data Query Assistant! Let's explore your data together.\n")

        load_result = engine.load_file(file_path, Path(file_path).name)
        if load_result["status"] == "error":
            print(json.dumps({"status": "error", "message": load_result.get('message')}))
            sys.exit(1)
        print(json.dumps({"status": "success", "message": load_result.get('message')}))

        # Show sheet context when the workbook has more than one sheet
        if engine.file_type == 'excel' and len(engine.available_sheets) > 1:
            print(json.dumps({"available_sheets": engine.available_sheets, "active_sheet": engine.active_sheet}))

        print(json.dumps({
            "enterprise_context": {
                "total_tables": len(engine.tables_info),
                "total_rows": sum(t.get("row_count", 0) for t in engine.tables_info),
                "capabilities": [
                    "Multi-table analysis across Excel sheets",
                    "Enterprise-grade SQL query generation",
                    "Advanced business intelligence insights",
                    "High-performance DuckDB-native processing"
                ]
            },
            "help": {
                "instructions": [
                    "Ask questions about data across multiple tables/sheets",
                    "Request business insights and strategic analysis",
                    "Type 'sheet:<name>' to switch sheets (Excel only)",
                    "Type 'exit' to quit"
                ],
                "example_queries": [
                    "What's the overall trend across all data?",
                    "Compare performance between different sheets",
                    "How can we improve our key metrics?"
                ]
            }
        }))

        # Interactive loop
        while True:
            try:
                user_input = input("\n👉 Enter your enterprise query: ")
            except (EOFError, KeyboardInterrupt):
                # End of piped input or Ctrl-C: leave the loop cleanly
                break

            if user_input.lower() == 'exit':
                print(json.dumps({"status": "exit", "message": "Thanks for using the Enterprise Data Query Assistant! Goodbye!"}))
                break

            # Handle sheet switching
            if user_input.lower().startswith('sheet:'):
                sheet_name = user_input[6:].strip()
                sheet_result = engine.change_sheet(sheet_name)
                print(json.dumps(sheet_result, default=str))
                continue

            # Query rows can hold values json cannot encode natively (dates,
            # decimals, ...); default=str keeps the session alive for display.
            result = engine.process_query(user_input)
            print(json.dumps(result, indent=2, default=str))