# corpusdb/app/database_analytics.py
# Uploaded by mrsavage1 ("Upload 52 files", commit 723f9ab, verified)
"""
Database Analytics Module
Provides insights and statistics about database usage
"""
import logging
from datetime import datetime, timedelta
from typing import Dict, List

from app.query_engine import create_query_engine
class DatabaseAnalytics:
    """Analyze database usage and provide insights.

    Each public method opens its own query engine with
    ``create_query_engine(user_store)`` and closes it before returning.
    Query results are read as dicts with an ``'ok'`` flag and a ``'data'``
    list of row dicts, which is how this class consumes them.
    """

    # Rough per-cell size used by ``_estimate_size``.
    _BYTES_PER_CELL = 100

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier.

        Embedded double quotes are doubled (standard SQL escaping) so that a
        crafted name cannot close the identifier early and inject SQL.
        Names without a double quote produce the same text as plain
        ``"name"`` interpolation.
        """
        return '"' + str(name).replace('"', '""') + '"'

    @classmethod
    def _qualified_name(cls, database: str, table: str) -> str:
        """Return the escaped ``"database"."table"`` reference."""
        return f'{cls._quote_identifier(database)}.{cls._quote_identifier(table)}'

    @staticmethod
    def _count_rows(engine, target: str) -> int:
        """Return ``COUNT(*)`` for *target*, or 0 when the query is not ok."""
        result = engine.execute_sql(f'SELECT COUNT(*) as count FROM {target}')
        return result['data'][0]['count'] if result['ok'] else 0

    @staticmethod
    def _describe_columns(engine, target: str) -> List:
        """Return the ``DESCRIBE`` rows for *target*, or [] when not ok."""
        result = engine.execute_sql(f'DESCRIBE {target}')
        return result['data'] if result['ok'] else []

    def get_table_stats(self, user_store, database: str, table: str) -> Dict:
        """Get detailed statistics for a table.

        Args:
            user_store: Passed unchanged to ``create_query_engine``.
            database: Database name; escaped before use in SQL.
            table: Table name; escaped before use in SQL.

        Returns:
            On success, a dict with ``ok=True`` plus ``database``, ``table``,
            ``row_count``, ``column_count``, ``columns``, ``sample_data``
            (at most 5 rows) and ``size_estimate``. If anything raises while
            querying, ``{'ok': False, 'error': <message>}``.

        Errors raised while creating the engine are not caught here.
        """
        engine = create_query_engine(user_store)
        try:
            target = self._qualified_name(database, table)

            row_count = self._count_rows(engine, target)
            columns = self._describe_columns(engine, target)

            sample_result = engine.execute_sql(f'SELECT * FROM {target} LIMIT 5')
            sample_data = sample_result['data'] if sample_result['ok'] else []

            return {
                'ok': True,
                'database': database,
                'table': table,
                'row_count': row_count,
                'column_count': len(columns),
                'columns': columns,
                'sample_data': sample_data,
                'size_estimate': self._estimate_size(row_count, len(columns)),
            }
        except Exception as e:
            return {'ok': False, 'error': str(e)}
        finally:
            engine.close()

    def get_database_summary(self, user_store, database: str) -> Dict:
        """Get summary statistics for an entire database.

        Args:
            user_store: Passed to ``table_manager.list`` and
                ``create_query_engine``.
            database: Database whose tables are summarized.

        Returns:
            A dict with ``ok=True``, ``database``, ``table_count``,
            ``total_rows``, ``total_columns``, ``tables`` (table names) and
            ``size_estimate``. Tables whose statistics cannot be read are
            still listed and counted in ``table_count``; they just add
            nothing further to the totals.
        """
        # Imported locally as in the original code (presumably to avoid a
        # circular import -- confirm before hoisting to module level).
        from app.table_manager import table_manager

        logger = logging.getLogger(__name__)

        tables = table_manager.list(user_store)
        db_tables = [t for t in tables if t['database'] == database]

        total_rows = 0
        total_columns = 0

        engine = create_query_engine(user_store)
        try:
            for table_info in db_tables:
                try:
                    target = self._qualified_name(database, table_info['table'])
                    total_rows += self._count_rows(engine, target)
                    total_columns += len(self._describe_columns(engine, target))
                except Exception:
                    # Best effort: one unreadable table must not fail the
                    # whole summary, but the failure should be visible.
                    logger.warning(
                        "Could not collect stats for table %r in database %r",
                        table_info, database, exc_info=True,
                    )

            return {
                'ok': True,
                'database': database,
                'table_count': len(db_tables),
                'total_rows': total_rows,
                'total_columns': total_columns,
                'tables': [t['table'] for t in db_tables],
                'size_estimate': self._estimate_size(total_rows, total_columns),
            }
        finally:
            engine.close()

    def get_query_suggestions(self, user_store, database: str, table: str) -> List[str]:
        """Generate helpful query suggestions for a table.

        Always returns three generic queries. When the table schema can be
        read, a ``GROUP BY`` query on the first described column is appended.

        Args:
            user_store: Passed unchanged to ``create_query_engine``.
            database: Database name; escaped before use in SQL.
            table: Table name; escaped before use in SQL.

        Returns:
            A list of SQL strings.
        """
        target = self._qualified_name(database, table)
        suggestions = [
            f'SELECT * FROM {target} LIMIT 10',
            f'SELECT COUNT(*) FROM {target}',
            # NOTE(review): assumes the table has an `id` column.
            f'SELECT * FROM {target} ORDER BY id DESC LIMIT 10',
        ]

        # Use the schema to add a column-specific suggestion.
        engine = create_query_engine(user_store)
        try:
            schema = engine.execute_sql(f'DESCRIBE {target}')
            if schema['ok'] and schema['data']:
                first_col = schema['data'][0].get('column_name', 'id')
                # Quote the column so names with spaces or reserved words
                # still yield valid SQL.
                column = self._quote_identifier(first_col)
                suggestions.append(
                    f'SELECT {column}, COUNT(*) FROM {target} GROUP BY {column}'
                )
        except Exception:
            # The schema-based suggestion is optional; keep the generic ones.
            logging.getLogger(__name__).warning(
                "Could not read schema for %s; returning generic suggestions only",
                target, exc_info=True,
            )
        finally:
            engine.close()

        return suggestions

    def _estimate_size(self, rows: int, columns: int) -> str:
        """Return a rough, human-readable size for ``rows * columns`` cells.

        Assumes ``_BYTES_PER_CELL`` (100) bytes per cell. Values under 1024
        bytes are shown as whole bytes; larger values are shown in KB, MB or
        GB with two decimals.
        """
        bytes_estimate = rows * columns * self._BYTES_PER_CELL

        if bytes_estimate < 1024:
            return f"{bytes_estimate} B"
        elif bytes_estimate < 1024 * 1024:
            return f"{bytes_estimate / 1024:.2f} KB"
        elif bytes_estimate < 1024 * 1024 * 1024:
            return f"{bytes_estimate / (1024 * 1024):.2f} MB"
        else:
            return f"{bytes_estimate / (1024 * 1024 * 1024):.2f} GB"
# Module-level instance, created at import time.
# NOTE(review): presumably the shared object other modules import -- confirm against callers.
database_analytics = DatabaseAnalytics()