# sirus/backend/excel_module/agents/data_engine.py
# Last commit 0535980 by ranilmukesh: "Deploy SiRUS SQL Agent backend"
"""
Enterprise-Grade Data Query Engine for SIRUS Intelligence
This is the refactored core engine that orchestrates file handling and LLM services
for scalable, multi-table data analysis with enterprise-grade performance.
Author: SIRUS Intelligence Team
Architecture: Modular, scalable, enterprise-grade
Performance: DuckDB-native, memory-efficient, multi-table aware
"""
import duckdb
import polars as pl
import os
import pandas as pd
from pathlib import Path
import logging
import hashlib
import json
from functools import lru_cache
from typing import Dict, List, Any, Optional, Tuple, Union
import sys
# Add the parent directory to sys.path so we can import excel_module
EXCEL_MODULE_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(EXCEL_MODULE_ROOT.parent))
# Import our new enterprise-grade modules
from .file_handlers import FileHandlers
from .llm_service import LLMService
# Import MinIO configuration for S3 setup
from core.minio.config import (
MINIO_ENDPOINT,
MINIO_ACCESS_KEY,
MINIO_SECRET_KEY,
MINIO_BUCKET_NAME
)
# Process-wide logging defaults. basicConfig is a no-op when the root logger
# already has handlers, so an application that configured logging first wins.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Fixed logger name (not __name__) used by every log call in this module.
logger = logging.getLogger('DataQueryEngine')
class DataQueryEngine:
    """
    Orchestrates file loading, LLM-driven SQL generation and query execution
    over a private in-memory DuckDB database.

    File loading is delegated to ``FileHandlers`` and SQL generation/analysis
    to ``LLMService``. The DuckDB connection is configured with S3 settings
    pointing at MinIO. After a successful load, ``tables_info`` holds one
    metadata dict per loaded table (possibly several, e.g. one per Excel
    sheet). The legacy single-table attributes ``column_names`` and
    ``table_name`` mirror the first entry.

    Supports the context-manager protocol; the DuckDB connection is closed
    on exit.
    """

    # Extension -> file_type. Unknown extensions leave file_type unchanged.
    _FILE_TYPE_BY_EXT = {
        '.xlsx': 'excel',
        '.xls': 'excel',
        '.xlsm': 'excel',
        '.csv': 'csv',
        '.json': 'json',
        '.parquet': 'parquet',
    }

    def __init__(self, minio_client, redis_client):
        """
        Create the engine and configure DuckDB for MinIO access.

        Args:
            minio_client: MinIO client, passed through to ``FileHandlers``.
            redis_client: Redis client. It is stored on the instance but not
                used by this class directly.

        Raises:
            RuntimeError: If the DuckDB S3 extension cannot be installed,
                loaded or configured. The DuckDB connection is closed before
                raising so it does not leak.
        """
        logger.info("[DATA_ENGINE] Initializing enterprise-grade data query engine with stateless architecture")
        self.minio_client = minio_client
        self.redis_client = redis_client

        # Each engine instance gets its own in-memory database.
        self.conn = duckdb.connect(':memory:')
        try:
            self._configure_s3()
        except Exception as e:
            logger.error(f"[DATA_ENGINE] CRITICAL: Failed to configure S3 extension: {str(e)}")
            logger.error("[DATA_ENGINE] MinIO integration will not work without S3 extension")
            # The half-built engine is never handed to the caller, so release its connection now.
            try:
                self.conn.close()
            except Exception as close_error:
                logger.warning(f"[DATA_ENGINE] Error closing connection after failed S3 setup: {close_error}")
            raise RuntimeError(f"DuckDB S3 extension configuration failed: {str(e)}") from e

        # Legacy single-table attributes (kept for API compatibility).
        self.file_path: Optional[str] = None
        self.file_type: Optional[str] = None
        self.active_sheet: Optional[str] = None
        self.available_sheets: List[str] = []
        self.column_names: List[str] = []  # columns of the primary (first) table
        self.table_name: str = "data"  # name of the primary (first) table

        # Multi-table metadata, one dict per loaded table.
        self.tables_info: List[Dict[str, Any]] = []
        # Bounded, insertion-ordered cache used by the legacy cache helpers.
        self.query_cache: Dict[str, Dict[str, Any]] = {}
        self.max_cache_size: int = 100

        self.file_handlers = FileHandlers(self.conn, self.minio_client)
        self.llm_service = LLMService()
        logger.info("[DATA_ENGINE] Enterprise data query engine initialized successfully with stateless architecture")

    def _configure_s3(self) -> None:
        """Install and load DuckDB's S3 extension and point it at the MinIO endpoint."""
        self.conn.execute("INSTALL s3;")
        logger.info("[DATA_ENGINE] DuckDB S3 extension installed successfully")
        self.conn.execute("LOAD s3;")
        logger.info("[DATA_ENGINE] DuckDB S3 extension loaded successfully")

        # Values are inlined as SQL string literals. Embedded single quotes are
        # doubled so a credential containing "'" cannot break the statement.
        for setting, value in (
            ("s3_endpoint", MINIO_ENDPOINT),
            ("s3_access_key_id", MINIO_ACCESS_KEY),
            ("s3_secret_access_key", MINIO_SECRET_KEY),
        ):
            self.conn.execute(f"SET {setting}='{self._escape_sql_string(value)}';")
        self.conn.execute("SET s3_url_style='path';")  # Force path-style access
        self.conn.execute("SET s3_use_ssl=false;")  # Set to true if using HTTPS
        self.conn.execute("SET s3_region='us-east-1';")  # MinIO needs a region
        logger.info("[DATA_ENGINE] DuckDB S3 extension configured for MinIO access - Ready for distributed file operations")

    @staticmethod
    def _escape_sql_string(value: Any) -> str:
        """Return ``str(value)`` with single quotes doubled, for use inside a '...' SQL literal."""
        return str(value).replace("'", "''")

    @staticmethod
    def _quote_identifier(name: Any) -> str:
        """Return *name* as a double-quoted SQL identifier, doubling any embedded double quotes."""
        return '"' + str(name).replace('"', '""') + '"'

    def __enter__(self):
        """Return the engine itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the engine's resources; exceptions are not suppressed."""
        self.close()
        return False

    def load_file(self, object_name: str, original_filename: str, sheet_name: Optional[str] = None) -> dict:
        """
        Load a file stored in MinIO into DuckDB via ``FileHandlers``.

        The query cache is cleared. Engine state (``file_path``,
        ``file_type``, table metadata, sheets) is only updated when the
        handler reports success, so a failed load leaves the previously
        loaded file's state intact and self-consistent.

        Args:
            object_name: Object name within ``MINIO_BUCKET_NAME``.
            original_filename: Original filename. Its extension sets ``file_type``.
            sheet_name: Excel sheet to activate. ``None`` leaves the choice
                to the file handler.

        Returns:
            dict: The handler's result dict (on success or a reported error),
            or ``{"status": "error", "message": ...}`` if an exception is raised.
        """
        logger.info(f"[DATA_ENGINE] Enterprise file loading from MinIO: {original_filename} (object: {object_name})")
        # Virtual S3 path of the object. It is committed to self.file_path only after a successful load.
        file_path = f"s3://{MINIO_BUCKET_NAME}/{object_name}"
        file_ext = Path(original_filename).suffix.lower()

        # Cached results belong to whatever was loaded before.
        self.query_cache = {}
        try:
            result = self.file_handlers.load_file(object_name, original_filename, sheet_name)
            if result["status"] == "error":
                logger.error(f"[DATA_ENGINE] File loading failed: {result['message']}")
                return result

            self.file_path = file_path
            self._update_engine_state(result, file_ext)

            total_rows = sum(table.get("row_count", 0) for table in self.tables_info)
            total_columns = sum(len(table.get("column_names", [])) for table in self.tables_info)
            logger.info(f"[DATA_ENGINE] Enterprise loading complete: {len(self.tables_info)} tables, {total_rows:,} rows, {total_columns} columns")
            return result
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Critical error in enterprise file loading: {str(e)}")
            return {"status": "error", "message": f"Error loading file: {str(e)}"}

    def _update_engine_state(self, load_result: Dict[str, Any], file_ext: str):
        """
        Copy a successful handler result into the engine's attributes.

        Args:
            load_result: Handler result. Reads the optional keys
                ``tables_info``, ``available_sheets`` and ``active_sheet``.
            file_ext: Lower-cased extension including the dot (e.g. ``".csv"``).
                An unrecognised or empty extension leaves ``file_type`` unchanged.
                ``change_sheet`` relies on this when the object name has no extension.
        """
        self.file_type = self._FILE_TYPE_BY_EXT.get(file_ext, self.file_type)

        # `or []` also covers a key that is present but set to None.
        self.tables_info = load_result.get("tables_info") or []
        if self.tables_info:
            primary_table = self.tables_info[0]
            self.column_names = primary_table.get("column_names", [])
            self.table_name = primary_table.get("table_name", "data")
        else:
            # Nothing was loaded: don't keep describing a previous file's primary table.
            self.column_names = []
            self.table_name = "data"

        self.available_sheets = load_result.get("available_sheets") or []
        self.active_sheet = load_result.get("active_sheet", None)
        logger.debug(f"[ENGINE_STATE] Updated state: {len(self.tables_info)} tables, active_sheet: {self.active_sheet}")

    def change_sheet(self, sheet_name: str) -> dict:
        """
        Switch the active sheet of the currently loaded Excel file.

        Args:
            sheet_name: Must be one of ``available_sheets``.

        Returns:
            dict: An error dict if no Excel file is loaded or the sheet is
            unknown; otherwise the file handler's result. Engine state is
            updated only when that result reports success.
        """
        logger.info(f"[DATA_ENGINE] Enterprise sheet change: {sheet_name}")
        if self.file_type != 'excel':
            return {"status": "error", "message": "Current file is not an Excel file"}
        if sheet_name not in self.available_sheets:
            return {"status": "error", "message": f"Sheet '{sheet_name}' not found in Excel file"}
        try:
            # Cached results were computed against the previous sheet.
            self.query_cache = {}
            # NOTE(review): load_file hands FileHandlers the MinIO object name, but this
            # passes the virtual s3:// path. Confirm FileHandlers.change_sheet expects that.
            result = self.file_handlers.change_sheet(self.file_path, sheet_name, self.available_sheets)
            if result["status"] == "success":
                self._update_engine_state(result, Path(self.file_path).suffix.lower())
                logger.info(f"[DATA_ENGINE] Successfully switched to sheet: {sheet_name}")
            return result
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Error in enterprise sheet change: {str(e)}")
            return {"status": "error", "message": f"Error changing sheet: {str(e)}"}

    def get_column_names(self) -> List[str]:
        """Return the column names of the primary (first) loaded table."""
        return self.column_names

    def get_file_info(self) -> Dict[str, Any]:
        """Return the loaded file's metadata, including all per-table metadata."""
        return {
            "file_path": self.file_path,
            "file_type": self.file_type,
            "active_sheet": self.active_sheet,
            "available_sheets": self.available_sheets,
            "column_count": len(self.column_names),
            "columns": self.column_names,
            "tables_info": self.tables_info,
            "total_tables": len(self.tables_info),
        }

    def generate_sql_query(self, user_query: str) -> dict:
        """
        Ask the LLM service for SQL answering *user_query* over all loaded tables.

        Returns:
            dict: The LLM service's result. On an exception, or when nothing
            is loaded, returns ``{"status": "error", "message": ...}``.
        """
        logger.info(f"[DATA_ENGINE] Enterprise SQL generation for: '{user_query[:50]}...'")
        if not self.tables_info:
            logger.warning("[DATA_ENGINE] No tables loaded for query generation")
            return {"status": "error", "message": "No data loaded. Please load a file first."}
        try:
            result = self.llm_service.generate_sql_query_multi_table(user_query, self.tables_info, self.conn)
            if result["status"] == "success":
                logger.info(f"[DATA_ENGINE] Enterprise SQL generated successfully, tables used: {result.get('tables_used', [])}")
            return result
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Error in enterprise SQL generation: {str(e)}")
            return {"status": "error", "message": f"Error generating SQL query: {str(e)}"}

    def execute_query(self, sql_query: str) -> Dict[str, Any]:
        """
        Execute *sql_query* on the DuckDB connection.

        Returns:
            dict: On success, ``{"status": "success", "data": [row dicts],
            "row_count", "column_count"}``. An empty result has
            ``column_count`` 0 and an extra ``"message"``. Errors are returned
            as ``{"status": "error", "message": ...}`` and never raised.
        """
        logger.info("[DATA_ENGINE] Executing enterprise query")
        if not self.tables_info:
            return {"status": "error", "message": "No data loaded. Please load a file first."}
        try:
            cursor = self.conn.execute(sql_query)
            # Read column names before consuming the rows. description is None for
            # statements that produce no result set.
            column_names = [desc[0] for desc in (cursor.description or [])]
            rows = cursor.fetchall()
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Query execution error: {str(e)}")
            return {"status": "error", "message": f"Error executing query: {str(e)}"}

        if not rows:
            logger.info("[DATA_ENGINE] Query executed successfully but returned no data")
            return {
                "status": "success",
                "data": [],
                "row_count": 0,
                "column_count": 0,
                "message": "Query executed successfully but returned no data",
            }

        data_list = [dict(zip(column_names, row)) for row in rows]
        logger.info(f"[DATA_ENGINE] Query executed successfully: {len(data_list)} rows, {len(column_names)} columns")
        return {
            "status": "success",
            "data": data_list,
            "row_count": len(data_list),
            "column_count": len(column_names),
        }

    def analyze_query_results(self, user_query: str, sql_query: str, query_results: List[Dict[str, Any]]) -> dict:
        """
        Ask the LLM service to interpret *query_results*.

        Loaded-table metadata is passed along as context. Exceptions are
        converted to ``{"status": "error", "message": ...}``.
        """
        logger.info("[DATA_ENGINE] Performing enterprise result analysis")
        try:
            result = self.llm_service.analyze_query_results(user_query, sql_query, query_results, self.tables_info)
            if result["status"] == "success":
                logger.info("[DATA_ENGINE] Enterprise analysis completed successfully")
            return result
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Error in enterprise analysis: {str(e)}")
            return {"status": "error", "message": f"Error generating analysis: {str(e)}"}

    @staticmethod
    def _clean_non_finite(data: Any) -> Any:
        """
        Recursively replace float NaN and +/-Infinity with None in lists and dicts.

        json.dumps would otherwise emit NaN/Infinity, which are not valid JSON.
        Other values (including tuples) are returned unchanged.
        """
        if isinstance(data, list):
            return [DataQueryEngine._clean_non_finite(item) for item in data]
        if isinstance(data, dict):
            return {key: DataQueryEngine._clean_non_finite(value) for key, value in data.items()}
        # NaN is the only float that is not equal to itself.
        if isinstance(data, float) and (data != data or data in (float('inf'), float('-inf'))):
            return None
        return data

    def process_query(self, user_query: str) -> Dict[str, Any]:
        """
        Answer a natural-language question end to end.

        The LLM service first classifies the question. A "strategic" question
        is expanded into a plan of SQL sub-queries whose results are analysed
        together. Any other question goes through the single-query path:
        understand, generate instructions, execute, analyse.

        Args:
            user_query: The user's question. Single quotes are removed before use.

        Returns:
            dict: ``{"status": "success", "type": "strategic" | "sql", ...}``
            with NaN/Infinity floats in ``data`` replaced by ``None``. An error
            or non-SQL result from an intermediate step is returned as-is.
            Exceptions become ``{"status": "error", "message": ...}``.
        """
        if not self.column_names:
            return {"status": "error", "message": "No data loaded. Please load a file first."}
        try:
            # NOTE(review): presumably stripped so user text can't break quoted SQL
            # or prompt strings downstream. Confirm the intent.
            user_query = user_query.replace("'", "")
            if self.llm_service.is_strategic_question(user_query):
                return self._process_strategic_query(user_query)
            return self._process_standard_query(user_query)
        except Exception as e:
            logger.error(f"Error in query processing: {str(e)}", exc_info=True)
            return {"status": "error", "message": f"Error processing query: {str(e)}"}

    def _process_strategic_query(self, user_query: str) -> Dict[str, Any]:
        """Execute every sub-query in the LLM's plan and analyse the non-empty results together."""
        query_plan = self.llm_service.understand_strategic_query(user_query, self.table_name, self.column_names)
        if not query_plan or "queries" not in query_plan or not query_plan["queries"]:
            logger.error(f"Strategic query understanding failed to produce a valid query plan for user query: '{user_query}'. Plan received: {query_plan}")
            return {"status": "error", "message": "Failed to analyze strategic question due to an issue in query planning."}

        all_results = []
        all_queries = []
        # default=str: a non-serializable plan must not abort the request from inside a log call.
        logger.info(f"Executing strategic query plan for user query: '{user_query}'. Plan: {json.dumps(query_plan, indent=2, default=str)}")
        for query_info in query_plan["queries"]:
            sql = query_info.get("sql")
            purpose = query_info.get("purpose", "No purpose provided")
            if not sql:
                logger.warning(f"Skipping query in plan due to missing SQL. Purpose: '{purpose}'")
                continue
            logger.info(f"Executing strategic sub-query. Purpose: '{purpose}', SQL: '{sql}'")
            try:
                result = self.execute_query(sql)
                status = result.get("status")
                data = result.get("data")
                message = result.get("message", "N/A")
                row_count = result.get("row_count", 0)
                logger.info(f"Sub-query execution result: Status: {status}, RowCount: {row_count}, HasData: {bool(data)}, Message: {message}")
                if status == "success" and data:
                    all_results.append(data)
                    all_queries.append({"sql": sql, "purpose": purpose})
                    logger.info(f"Successfully executed and added results for sub-query. Purpose: '{purpose}'")
                elif status == "success":
                    logger.warning(f"Strategic sub-query executed successfully but returned no data. Purpose: '{purpose}', SQL: '{sql}'")
                else:
                    logger.error(f"Strategic sub-query failed. Purpose: '{purpose}', SQL: '{sql}', Status: {status}, Error: {message}")
            except Exception as e:
                logger.error(f"Exception during strategic sub-query execution: {str(e)}. SQL: '{sql}', Purpose: '{purpose}'", exc_info=True)
                continue

        if not all_results:
            logger.error(f"Failed to gather any data from strategic query plan. User query: '{user_query}'. Executed queries info: {all_queries}")
            return {"status": "error", "message": "Failed to gather necessary data for analysis. All planned sub-queries returned no data or failed."}

        analysis = self.llm_service.analyze_strategic_results(user_query, query_plan, all_results)
        return {
            "status": "success",
            "type": "strategic",
            "data": self._clean_non_finite(all_results),
            "queries": all_queries,
            "analysis": analysis,
            # Only "queries" is validated above. Tolerate a plan without these keys
            # instead of failing after every sub-query has already run.
            "metrics": query_plan.get("metrics", []),
            "dimensions": query_plan.get("dimensions", []),
        }

    def _process_standard_query(self, user_query: str) -> Dict[str, Any]:
        """Run the single-query path: understand, generate instructions, execute SQL, analyse."""
        understanding = self.llm_service.understand_query(user_query, self.table_name, self.column_names)
        if understanding["status"] == "error":
            return understanding

        result = self.llm_service.execute_instructions(
            understanding["instructions"],
            understanding["type"],
            user_query,
            self.table_name,
            self.column_names,
        )
        if result["status"] == "error" or result["type"] != "sql":
            return result

        cleaned_sql = result["cleaned_sql"]
        query_result = self.execute_query(cleaned_sql)
        if query_result["status"] == "error":
            return query_result

        analysis_result = None
        if query_result["data"]:
            # NOTE(review): unlike the public analyze_query_results method, tables_info
            # is not passed here. Confirm LLMService's default for that argument.
            analysis_result = self.llm_service.analyze_query_results(user_query, cleaned_sql, query_result["data"])

        final_result = {
            "status": "success",
            "type": "sql",
            "data": self._clean_non_finite(query_result["data"]),
            "row_count": query_result["row_count"],
            "column_count": query_result["column_count"],
            "sql": {
                "raw": result["raw_sql"],
                "cleaned": cleaned_sql,
            },
        }
        if analysis_result and analysis_result["status"] == "success":
            final_result["analysis"] = analysis_result["analysis"]
        return final_result

    def get_data_profile(self) -> dict:
        """
        Summarise the loaded file and each loaded table.

        ``tables_summary`` is computed from load metadata. Each entry of
        ``tables_detail`` uses a live ``COUNT(*)`` from DuckDB. Tables whose
        metadata lacks a name, or whose count query fails, are logged and
        left out of ``tables_detail``.
        """
        logger.info("[DATA_ENGINE] Generating enterprise data profile")
        if not self.tables_info:
            return {"status": "error", "message": "No data loaded. Please load a file first."}
        try:
            profile = {
                "status": "success",
                "file_info": {
                    "file_path": self.file_path,
                    "file_type": self.file_type,
                    "active_sheet": self.active_sheet,
                    "available_sheets": self.available_sheets,
                },
                "tables_summary": {
                    "total_tables": len(self.tables_info),
                    "total_rows": sum(table.get("row_count", 0) for table in self.tables_info),
                    "total_columns": sum(len(table.get("column_names", [])) for table in self.tables_info),
                },
                "tables_detail": [],
            }
            for table_info in self.tables_info:
                table_name = table_info.get("table_name")
                if not table_name:
                    logger.warning("[DATA_PROFILE] Skipping table metadata without a table_name")
                    continue
                try:
                    # Quote the identifier properly: sheet-derived names may contain '"'.
                    row_count = self.conn.execute(
                        f"SELECT COUNT(*) FROM {self._quote_identifier(table_name)}"
                    ).fetchone()[0]
                except Exception as table_error:
                    logger.warning(f"[DATA_PROFILE] Error profiling table {table_name}: {str(table_error)}")
                    continue
                column_names = table_info.get("column_names", [])
                profile["tables_detail"].append({
                    "table_name": table_name,
                    "sheet_name": table_info.get("sheet_name", table_name),
                    "row_count": row_count,
                    "column_count": len(column_names),
                    "columns": column_names,
                })
            logger.info(f"[DATA_ENGINE] Enterprise data profile generated: {profile['tables_summary']['total_tables']} tables")
            return profile
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Error generating enterprise data profile: {str(e)}")
            return {"status": "error", "message": f"Error generating data profile: {str(e)}"}

    def query(self, sql: str) -> Dict[str, Any]:
        """
        Execute a caller-supplied SQL statement on the loaded data.

        Args:
            sql: SQL to execute verbatim.

        Returns:
            dict: Same shape as ``execute_query``.
        """
        logger.info(f"[DATA_ENGINE] Executing direct SQL query: {sql[:100]}...")
        try:
            return self.execute_query(sql)
        except Exception as e:
            logger.error(f"[DATA_ENGINE] Direct SQL query failed: {str(e)}")
            return {"status": "error", "message": f"Query execution failed: {str(e)}"}

    def close(self):
        """Close the DuckDB connection, if one exists. Errors are logged, not raised."""
        if hasattr(self, 'conn') and self.conn:
            try:
                self.conn.close()
                logger.info("[DATA_ENGINE] Enterprise database connection closed")
            except Exception as e:
                logger.error(f"[DATA_ENGINE] Error closing database connection: {str(e)}")

    # Legacy cache helpers (kept for backward compatibility).
    def _get_cache_key(self, query: str) -> str:
        """Return an MD5 hex key for *query* scoped to the current file path and sheet."""
        cache_input = f"{self.file_path}:{self.active_sheet}:{query}"
        return hashlib.md5(cache_input.encode()).hexdigest()

    def _add_to_cache(self, query: str, result: Dict[str, Any]) -> None:
        """
        Store *result* under *query*, evicting the oldest-inserted entries when full.

        Overwriting an existing key does not grow the cache, so nothing is
        evicted in that case.
        """
        cache_key = self._get_cache_key(query)
        if cache_key not in self.query_cache:
            # Dicts preserve insertion order, so the first key is the oldest entry.
            while self.query_cache and len(self.query_cache) >= self.max_cache_size:
                del self.query_cache[next(iter(self.query_cache))]
        self.query_cache[cache_key] = result

    def _get_from_cache(self, query: str) -> Optional[Dict[str, Any]]:
        """Return the cached result for *query*, or None if it is not cached."""
        cache_key = self._get_cache_key(query)
        if cache_key in self.query_cache:
            logger.info(f"[CACHE] Cache hit for query: {query[:30]}...")
            return self.query_cache[cache_key]
        return None
if __name__ == "__main__":
    # Interactive demo loop over a single file.
    # NOTE(review): DataQueryEngine requires MinIO and Redis clients, but none are
    # constructed here, so None is passed for both. Loading only works if
    # FileHandlers can read the object without the MinIO client (e.g. through
    # DuckDB's S3 settings). Confirm before relying on this script.
    engine = DataQueryEngine(minio_client=None, redis_client=None)
    try:
        print("\n🔍 Welcome to the Enterprise Data Query Assistant! Let's explore your data together.\n")
        # DATA_FILE_PATH is used as the MinIO object name. Its basename serves as
        # the original filename, from which load_file detects the file type.
        object_name = os.getenv('DATA_FILE_PATH', '/content/sales_data_sample.csv')
        load_result = engine.load_file(object_name, Path(object_name).name)
        if load_result["status"] == "error":
            print(json.dumps({"status": "error", "message": load_result.get('message')}, default=str))
            sys.exit(1)
        print(json.dumps({"status": "success", "message": load_result.get('message')}, default=str))

        if engine.file_type == 'excel' and len(engine.available_sheets) > 1:
            print(json.dumps({"available_sheets": engine.available_sheets, "active_sheet": engine.active_sheet}))
        print(json.dumps({
            "enterprise_context": {
                "total_tables": len(engine.tables_info),
                "total_rows": sum(t.get("row_count", 0) for t in engine.tables_info),
                "capabilities": [
                    "Multi-table analysis across Excel sheets",
                    "Enterprise-grade SQL query generation",
                    "Advanced business intelligence insights",
                    "High-performance DuckDB-native processing"
                ]
            },
            "help": {
                "instructions": [
                    "Ask questions about data across multiple tables/sheets",
                    "Request business insights and strategic analysis",
                    "Type 'sheet:<name>' to switch sheets (Excel only)",
                    "Type 'exit' to quit"
                ],
                "example_queries": [
                    "What's the overall trend across all data?",
                    "Compare performance between different sheets",
                    "How can we improve our key metrics?"
                ]
            }
        }))

        while True:
            try:
                user_input = input("\n👉 Enter your enterprise query: ")
            except (EOFError, KeyboardInterrupt):
                # End of input or Ctrl-C: leave the loop the same way as typing 'exit'.
                user_input = 'exit'
            if user_input.strip().lower() == 'exit':
                print(json.dumps({"status": "exit", "message": "Thanks for using the Enterprise Data Query Assistant! Goodbye!"}))
                break
            if user_input.lower().startswith('sheet:'):
                sheet_name = user_input[6:].strip()
                sheet_result = engine.change_sheet(sheet_name)
                print(json.dumps(sheet_result, default=str))
                continue
            result = engine.process_query(user_input)
            # DuckDB rows can contain dates, Decimals and similar values that json can't encode natively.
            print(json.dumps(result, indent=2, default=str))
    finally:
        # Also runs on sys.exit above, so the DuckDB connection is always released.
        engine.close()