""" Database Analytics Module Provides insights and statistics about database usage """ from typing import Dict, List from datetime import datetime, timedelta from app.query_engine import create_query_engine class DatabaseAnalytics: """Analyze database usage and provide insights""" def __init__(self): pass def get_table_stats(self, user_store, database: str, table: str) -> Dict: """Get detailed statistics for a table""" engine = create_query_engine(user_store) try: # Get row count count_result = engine.execute_sql(f'SELECT COUNT(*) as count FROM "{database}"."{table}"') row_count = count_result['data'][0]['count'] if count_result['ok'] else 0 # Get column info schema_result = engine.execute_sql(f'DESCRIBE "{database}"."{table}"') columns = schema_result['data'] if schema_result['ok'] else [] # Get sample data sample_result = engine.execute_sql(f'SELECT * FROM "{database}"."{table}" LIMIT 5') sample_data = sample_result['data'] if sample_result['ok'] else [] return { 'ok': True, 'database': database, 'table': table, 'row_count': row_count, 'column_count': len(columns), 'columns': columns, 'sample_data': sample_data, 'size_estimate': self._estimate_size(row_count, len(columns)) } except Exception as e: return {'ok': False, 'error': str(e)} finally: engine.close() def get_database_summary(self, user_store, database: str) -> Dict: """Get summary statistics for entire database""" from app.table_manager import table_manager tables = table_manager.list(user_store) db_tables = [t for t in tables if t['database'] == database] total_rows = 0 total_columns = 0 engine = create_query_engine(user_store) try: for table_info in db_tables: try: result = engine.execute_sql(f'SELECT COUNT(*) as count FROM "{database}"."{table_info["table"]}"') if result['ok']: total_rows += result['data'][0]['count'] schema = engine.execute_sql(f'DESCRIBE "{database}"."{table_info["table"]}"') if schema['ok']: total_columns += len(schema['data']) except: pass return { 'ok': True, 'database': database, 'table_count': len(db_tables), 'total_rows': total_rows, 'total_columns': total_columns, 'tables': [t['table'] for t in db_tables], 'size_estimate': self._estimate_size(total_rows, total_columns) } finally: engine.close() def get_query_suggestions(self, user_store, database: str, table: str) -> List[str]: """Generate helpful query suggestions""" suggestions = [ f'SELECT * FROM "{database}"."{table}" LIMIT 10', f'SELECT COUNT(*) FROM "{database}"."{table}"', f'SELECT * FROM "{database}"."{table}" ORDER BY id DESC LIMIT 10', ] # Get column names for more suggestions engine = create_query_engine(user_store) try: schema = engine.execute_sql(f'DESCRIBE "{database}"."{table}"') if schema['ok'] and schema['data']: first_col = schema['data'][0].get('column_name', 'id') suggestions.append(f'SELECT {first_col}, COUNT(*) FROM "{database}"."{table}" GROUP BY {first_col}') except: pass finally: engine.close() return suggestions def _estimate_size(self, rows: int, columns: int) -> str: """Estimate data size""" # Rough estimate: 100 bytes per cell bytes_estimate = rows * columns * 100 if bytes_estimate < 1024: return f"{bytes_estimate} B" elif bytes_estimate < 1024 * 1024: return f"{bytes_estimate / 1024:.2f} KB" elif bytes_estimate < 1024 * 1024 * 1024: return f"{bytes_estimate / (1024 * 1024):.2f} MB" else: return f"{bytes_estimate / (1024 * 1024 * 1024):.2f} GB" database_analytics = DatabaseAnalytics()