File size: 12,696 Bytes
4ae946d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
"""
Database Performance Optimization Script
Adds indexes and optimizations for improved query performance
"""

import logging

from sqlalchemy import text
from sqlalchemy.engine import Engine

logger = logging.getLogger(__name__)


class DatabaseOptimizer:
    """Database performance optimization utilities.

    All statements are executed best-effort: failures are logged as
    warnings rather than raised, so a statement that the current backend
    does not support (e.g. PostgreSQL-only syntax on SQLite) does not
    abort the rest of the optimization pass.
    """

    def __init__(self, engine: Engine):
        self.engine = engine

    def _autocommit(self, conn):
        """Return *conn* switched to AUTOCOMMIT isolation.

        PostgreSQL refuses to run CREATE INDEX CONCURRENTLY and VACUUM
        inside a transaction block, and SQLAlchemy connections begin an
        implicit transaction on first execute — so these maintenance
        statements must be issued on an autocommit connection.
        """
        return conn.execution_options(isolation_level="AUTOCOMMIT")

    def _run_index_statements(self, conn, statements, label: str) -> None:
        """Execute each CREATE INDEX statement, logging failures as warnings.

        Assumes *conn* is in autocommit mode (see ``_autocommit``), so no
        per-statement commit/rollback is needed; each statement stands alone.
        """
        for index_sql in statements:
            try:
                conn.execute(text(index_sql))
                # Log only the "CREATE INDEX ... idx_name" prefix, not the full DDL.
                logger.info("Created %s: %s", label, index_sql.split('ON')[0].strip())
            except Exception as e:
                logger.warning("Failed to create %s: %s", label, e)

    def create_performance_indexes(self) -> None:
        """Create performance indexes for common query patterns."""

        logger.info("Creating performance indexes...")

        # CONCURRENTLY requires autocommit — see _autocommit for why.
        with self.engine.connect() as conn:
            conn = self._autocommit(conn)

            # Case Management Indexes
            self._create_case_indexes(conn)

            # User Management Indexes
            self._create_user_indexes(conn)

            # Transaction Indexes
            self._create_transaction_indexes(conn)

            # Audit and Activity Indexes
            self._create_audit_indexes(conn)

            logger.info("Performance indexes created successfully")

    def _create_case_indexes(self, conn) -> None:
        """Create indexes for case-related queries (expects autocommit conn)."""

        indexes = [
            # Composite indexes for common filters
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_status_priority_created ON cases(status, priority, created_at DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_assignee_status ON cases(assignee_id, status) WHERE assignee_id IS NOT NULL",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_priority_risk_score ON cases(priority, risk_score DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_created_at_status ON cases(created_at DESC, status)",

            # Partial indexes for active cases
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_active_only ON cases(created_at DESC) WHERE status != 'closed'",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_high_priority_open ON cases(created_at DESC) WHERE priority IN ('high', 'critical') AND status = 'open'",

            # Text search indexes (requires the pg_trgm extension on PostgreSQL)
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_title_trgm ON cases USING gin (title gin_trgm_ops)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_description_trgm ON cases USING gin ((case_metadata->>'description') gin_trgm_ops)",

            # JSON indexes for metadata queries
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_metadata_type ON cases((case_metadata->>'type'))",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_metadata_amount ON cases(CAST(case_metadata->>'amount' AS numeric)) WHERE case_metadata->>'amount' IS NOT NULL",

            # Date-based indexes for reporting
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_month_year ON cases(EXTRACT(year FROM created_at), EXTRACT(month FROM created_at))",
        ]

        self._run_index_statements(conn, indexes, "index")

    def _create_user_indexes(self, conn) -> None:
        """Create indexes for user-related queries (expects autocommit conn)."""

        indexes = [
            # User authentication and lookup
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email_active ON users(email, is_active) WHERE is_active = true",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_role_active ON users(role, is_active) WHERE is_active = true",

            # User activity tracking
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_last_login ON users(last_login DESC) WHERE last_login IS NOT NULL",

            # MFA status for security reporting
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_mfa_enabled ON users(mfa_enabled) WHERE mfa_enabled = true",
        ]

        self._run_index_statements(conn, indexes, "user index")

    def _create_transaction_indexes(self, conn) -> None:
        """Create indexes for transaction-related queries (expects autocommit conn)."""

        indexes = [
            # Transaction lookup by case
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_case_amount ON transactions(case_id, amount DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_date_amount ON transactions(transaction_date DESC, amount)",

            # Fraud pattern analysis
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_amount_range ON transactions(amount) WHERE amount > 1000",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_merchant_pattern ON transactions(merchant_name, amount)",

            # Time-based analysis
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_hourly ON transactions(EXTRACT(hour FROM transaction_date), EXTRACT(day FROM transaction_date))",
        ]

        self._run_index_statements(conn, indexes, "transaction index")

    def _create_audit_indexes(self, conn) -> None:
        """Create indexes for audit and activity queries (expects autocommit conn)."""

        indexes = [
            # Case activity tracking
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_case_timestamp ON case_activities(case_id, timestamp DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_user_timestamp ON case_activities(user_id, timestamp DESC)",

            # Audit trail queries
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_action_timestamp ON case_activities(action, timestamp DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_notes_case_created ON case_notes(case_id, created_at DESC)",
        ]

        self._run_index_statements(conn, indexes, "audit index")

    def optimize_table_settings(self) -> None:
        """Apply table-level optimizations (ANALYZE + VACUUM ANALYZE)."""

        logger.info("Applying table optimizations...")

        # VACUUM cannot run inside a transaction block on PostgreSQL,
        # so the whole maintenance pass runs on an autocommit connection.
        with self.engine.connect() as conn:
            conn = self._autocommit(conn)

            # Analyze tables for query planner optimization
            tables_to_analyze = [
                'cases', 'users', 'transactions', 'case_activities',
                'case_notes', 'evidence', 'fraud_alerts'
            ]

            for table in tables_to_analyze:
                try:
                    conn.execute(text(f"ANALYZE {table}"))
                    logger.info("Analyzed table: %s", table)
                except Exception as e:
                    logger.warning("Failed to analyze table %s: %s", table, e)

            # Vacuum large tables (PostgreSQL specific). The try is per
            # table so one failure does not skip the remaining tables.
            for table in tables_to_analyze:
                try:
                    conn.execute(text(f"VACUUM ANALYZE {table}"))
                    logger.info("Vacuum analyzed table: %s", table)
                except Exception as e:
                    logger.warning("Vacuum analyze not supported or failed: %s", e)

    def create_partitioning_strategy(self) -> None:
        """Check whether table partitioning is warranted and log a recommendation.

        Actual partitioning requires schema changes and is intentionally
        left for a future migration; this method only inspects table size.
        """

        logger.info("Setting up partitioning strategy...")

        with self.engine.connect() as conn:
            try:
                # Check if partitioning is needed (table size > 1M rows)
                result = conn.execute(text("SELECT COUNT(*) FROM cases"))
                case_count = result.scalar()

                if case_count > 1000000:  # 1M+ cases
                    logger.info(
                        "Large cases table detected (%s records), partitioning recommended",
                        case_count,
                    )
                else:
                    logger.info(
                        "Cases table size OK (%s records), no partitioning needed",
                        case_count,
                    )

            except Exception as e:
                logger.warning("Could not check table sizes: %s", e)

    def create_materialized_views(self) -> None:
        """Create materialized views for expensive aggregate queries."""

        logger.info("Creating materialized views for performance...")

        with self.engine.connect() as conn:
            # Materialized view for monthly case statistics
            try:
                conn.execute(text("""
                    CREATE MATERIALIZED VIEW IF NOT EXISTS mv_case_stats AS
                    SELECT
                        DATE_TRUNC('month', created_at) as month,
                        status,
                        priority,
                        COUNT(*) as case_count,
                        AVG(risk_score) as avg_risk_score,
                        SUM(fraud_amount) as total_fraud_amount
                    FROM cases
                    GROUP BY DATE_TRUNC('month', created_at), status, priority
                    ORDER BY month DESC, case_count DESC
                """))

                # Create index on materialized view
                conn.execute(text("CREATE INDEX IF NOT EXISTS idx_mv_case_stats_month ON mv_case_stats(month DESC)"))

                # Without an explicit commit the implicit transaction is
                # rolled back on connection close and the view is lost.
                conn.commit()

                logger.info("Created materialized view: mv_case_stats")

            except Exception as e:
                logger.warning(f"Failed to create materialized view: {e}")
                conn.rollback()

    def run_performance_audit(self) -> dict:
        """Run a performance audit and return findings and recommendations.

        Returns:
            dict with keys 'table_sizes' (row counts per table),
            'index_usage', 'slow_queries' (currently unpopulated), and
            'recommendations' (human-readable suggestions).
        """

        logger.info("Running performance audit...")

        audit_results = {
            'table_sizes': {},
            'index_usage': {},
            'slow_queries': [],
            'recommendations': []
        }

        with self.engine.connect() as conn:
            # Check table sizes (table names come from a fixed internal
            # list, so f-string SQL is safe here).
            tables = ['cases', 'users', 'transactions', 'case_activities']
            for table in tables:
                try:
                    result = conn.execute(text(f"SELECT COUNT(*) FROM {table}"))
                    audit_results['table_sizes'][table] = result.scalar()
                except Exception as e:
                    logger.warning("Could not get size for %s: %s", table, e)

            # Generate recommendations based on data volumes
            total_cases = audit_results['table_sizes'].get('cases', 0)
            if total_cases > 500000:
                audit_results['recommendations'].append("Consider partitioning the cases table by month")
            if total_cases > 100000:
                audit_results['recommendations'].append("Implement query result caching for case listings")

            total_activities = audit_results['table_sizes'].get('case_activities', 0)
            if total_activities > 1000000:
                audit_results['recommendations'].append("Archive old case activities to separate table")

        logger.info(
            "Performance audit complete. Found %d recommendations",
            len(audit_results['recommendations']),
        )
        return audit_results


def optimize_database(engine: Engine) -> None:
    """Run the full database optimization pass: audit, then apply fixes.

    Raises:
        Exception: re-raised after logging if any optimization step fails.
    """

    optimizer = DatabaseOptimizer(engine)

    try:
        # Audit first so the starting state is recorded before changes.
        audit_results = optimizer.run_performance_audit()
        logger.info(f"Audit results: {audit_results}")

        # Apply each optimization in order.
        steps = (
            optimizer.create_performance_indexes,
            optimizer.optimize_table_settings,
            optimizer.create_materialized_views,
            optimizer.create_partitioning_strategy,
        )
        for step in steps:
            step()

        logger.info("Database optimization completed successfully")

    except Exception as e:
        logger.error(f"Database optimization failed: {e}")
        raise


if __name__ == "__main__":
    # For standalone execution
    # Imported here so importing this module as a library does not pull
    # in the application's engine (and its connection setup) eagerly.
    from core.database import engine
    optimize_database(engine)