File size: 4,648 Bytes
723f9ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""
Database Analytics Module
Provides insights and statistics about database usage
"""

from typing import Dict, List
from datetime import datetime, timedelta
from app.query_engine import create_query_engine

class DatabaseAnalytics:
    """Analyze database usage and provide insights.

    Each public method obtains its own engine from
    ``create_query_engine(user_store)`` and closes it before returning.
    Database, table and column names are interpolated into SQL as
    double-quoted identifiers (they cannot be bound as parameters), with
    embedded double quotes escaped by ``_quote_identifier``.
    """

    def __init__(self):
        pass

    @staticmethod
    def _quote_identifier(name) -> str:
        """Return *name* as a double-quoted SQL identifier.

        Embedded double quotes are doubled so that the name cannot close
        the quoted identifier early and inject arbitrary SQL.
        """
        return '"' + str(name).replace('"', '""') + '"'

    @classmethod
    def _qualified_name(cls, database, table) -> str:
        """Return the safely quoted ``"database"."table"`` reference."""
        return f'{cls._quote_identifier(database)}.{cls._quote_identifier(table)}'

    def get_table_stats(self, user_store, database: str, table: str) -> Dict:
        """Get detailed statistics for a table.

        Args:
            user_store: Passed through to ``create_query_engine``.
            database: Name of the database containing the table.
            table: Name of the table to inspect.

        Returns:
            On success, a dict with ``ok=True`` plus ``database``, ``table``,
            ``row_count``, ``column_count``, ``columns``, ``sample_data``
            (at most 5 rows) and ``size_estimate``. A query whose result is
            not ``ok`` contributes ``0`` / ``[]`` rather than failing the
            call. If any step raises, ``{'ok': False, 'error': <message>}``
            is returned instead.
        """
        target = self._qualified_name(database, table)
        engine = create_query_engine(user_store)

        try:
            # Get row count
            count_result = engine.execute_sql(f'SELECT COUNT(*) as count FROM {target}')
            row_count = count_result['data'][0]['count'] if count_result['ok'] else 0

            # Get column info
            schema_result = engine.execute_sql(f'DESCRIBE {target}')
            columns = schema_result['data'] if schema_result['ok'] else []

            # Get sample data
            sample_result = engine.execute_sql(f'SELECT * FROM {target} LIMIT 5')
            sample_data = sample_result['data'] if sample_result['ok'] else []

            return {
                'ok': True,
                'database': database,
                'table': table,
                'row_count': row_count,
                'column_count': len(columns),
                'columns': columns,
                'sample_data': sample_data,
                'size_estimate': self._estimate_size(row_count, len(columns))
            }
        except Exception as e:
            return {'ok': False, 'error': str(e)}
        finally:
            engine.close()

    def get_database_summary(self, user_store, database: str) -> Dict:
        """Get summary statistics for an entire database.

        Tables are taken from ``table_manager.list(user_store)`` and filtered
        to those whose ``'database'`` entry equals *database*. Row and column
        totals are gathered per table on a best-effort basis: a table whose
        queries raise is skipped and does not contribute to the totals.

        Args:
            user_store: Passed through to ``table_manager.list`` and
                ``create_query_engine``.
            database: Name of the database to summarise.

        Returns:
            A dict with ``ok=True``, ``database``, ``table_count``,
            ``total_rows``, ``total_columns``, ``tables`` (table names) and
            ``size_estimate``.
        """
        # Imported here rather than at module level, as in the original code.
        from app.table_manager import table_manager

        tables = table_manager.list(user_store)
        db_tables = [t for t in tables if t['database'] == database]

        total_rows = 0
        total_columns = 0

        engine = create_query_engine(user_store)

        try:
            for table_info in db_tables:
                try:
                    target = self._qualified_name(database, table_info["table"])

                    result = engine.execute_sql(f'SELECT COUNT(*) as count FROM {target}')
                    if result['ok']:
                        total_rows += result['data'][0]['count']

                    schema = engine.execute_sql(f'DESCRIBE {target}')
                    if schema['ok']:
                        total_columns += len(schema['data'])
                except Exception:
                    # Best effort: one unreadable table must not abort the
                    # summary. Only ``Exception`` is caught so that
                    # KeyboardInterrupt / SystemExit still propagate.
                    continue

            return {
                'ok': True,
                'database': database,
                'table_count': len(db_tables),
                'total_rows': total_rows,
                'total_columns': total_columns,
                'tables': [t['table'] for t in db_tables],
                'size_estimate': self._estimate_size(total_rows, total_columns)
            }
        finally:
            engine.close()

    def get_query_suggestions(self, user_store, database: str, table: str) -> List[str]:
        """Generate helpful query suggestions for a table.

        Three generic suggestions are always returned. If the table's schema
        can be read, a fourth GROUP BY suggestion on its first column is
        appended; failures while reading the schema are ignored.

        Args:
            user_store: Passed through to ``create_query_engine``.
            database: Name of the database containing the table.
            table: Name of the table to build suggestions for.

        Returns:
            A list of SQL strings.
        """
        target = self._qualified_name(database, table)
        suggestions = [
            f'SELECT * FROM {target} LIMIT 10',
            f'SELECT COUNT(*) FROM {target}',
            # NOTE: emitted unconditionally; only valid if the table has an
            # ``id`` column.
            f'SELECT * FROM {target} ORDER BY id DESC LIMIT 10',
        ]

        # Get column names for more suggestions
        engine = create_query_engine(user_store)
        try:
            schema = engine.execute_sql(f'DESCRIBE {target}')
            if schema['ok'] and schema['data']:
                first_col = schema['data'][0].get('column_name', 'id')
                # Quote the column so names with spaces, reserved words or
                # quotes still yield a valid statement.
                quoted_col = self._quote_identifier(first_col)
                suggestions.append(
                    f'SELECT {quoted_col}, COUNT(*) FROM {target} GROUP BY {quoted_col}'
                )
        except Exception:
            # Best effort: fall back to the generic suggestions only.
            pass
        finally:
            engine.close()

        return suggestions

    def _estimate_size(self, rows: int, columns: int) -> str:
        """Estimate data size as a human-readable string.

        Uses a rough figure of 100 bytes per cell. Values below 1024 bytes
        are reported as whole bytes (``"<n> B"``); larger values are
        reported with two decimals in KB, MB or GB (1024-based).
        """
        # Rough estimate: 100 bytes per cell
        bytes_estimate = rows * columns * 100

        if bytes_estimate < 1024:
            return f"{bytes_estimate} B"
        elif bytes_estimate < 1024 * 1024:
            return f"{bytes_estimate / 1024:.2f} KB"
        elif bytes_estimate < 1024 * 1024 * 1024:
            return f"{bytes_estimate / (1024 * 1024):.2f} MB"
        else:
            return f"{bytes_estimate / (1024 * 1024 * 1024):.2f} GB"

# Module-level DatabaseAnalytics instance, created once when this module is imported.
database_analytics = DatabaseAnalytics()