""" Database Performance Optimization Script Adds indexes and optimizations for improved query performance """ import logging from sqlalchemy import text from sqlalchemy.engine import Engine logger = logging.getLogger(__name__) class DatabaseOptimizer: """Database performance optimization utilities""" def __init__(self, engine: Engine): self.engine = engine def create_performance_indexes(self) -> None: """Create performance indexes for common query patterns""" logger.info("Creating performance indexes...") with self.engine.connect() as conn: # Case Management Indexes self._create_case_indexes(conn) # User Management Indexes self._create_user_indexes(conn) # Transaction Indexes self._create_transaction_indexes(conn) # Audit and Activity Indexes self._create_audit_indexes(conn) logger.info("Performance indexes created successfully") def _create_case_indexes(self, conn) -> None: """Create indexes for case-related queries""" indexes = [ # Composite indexes for common filters "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_status_priority_created ON cases(status, priority, created_at DESC)", "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_assignee_status ON cases(assignee_id, status) WHERE assignee_id IS NOT NULL", "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_priority_risk_score ON cases(priority, risk_score DESC)", "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_created_at_status ON cases(created_at DESC, status)", # Partial indexes for active cases "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_active_only ON cases(created_at DESC) WHERE status != 'closed'", "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_high_priority_open ON cases(created_at DESC) WHERE priority IN ('high', 'critical') AND status = 'open'", # Text search indexes (if PostgreSQL) "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_title_trgm ON cases USING gin (title gin_trgm_ops)", "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_description_trgm ON cases USING gin ((case_metadata->>'description') gin_trgm_ops)", # JSON indexes for metadata queries "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_metadata_type ON cases((case_metadata->>'type'))", "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_metadata_amount ON cases(CAST(case_metadata->>'amount' AS numeric)) WHERE case_metadata->>'amount' IS NOT NULL", # Date-based indexes for reporting "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_month_year ON cases(EXTRACT(year FROM created_at), EXTRACT(month FROM created_at))", ] for index_sql in indexes: try: conn.execute(text(index_sql)) conn.commit() logger.info(f"Created index: {index_sql.split('ON')[0].strip()}") except Exception as e: logger.warning(f"Failed to create index: {e}") conn.rollback() def _create_user_indexes(self, conn) -> None: """Create indexes for user-related queries""" indexes = [ # User authentication and lookup "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email_active ON users(email, is_active) WHERE is_active = true", "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_role_active ON users(role, is_active) WHERE is_active = true", # User activity tracking "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_last_login ON users(last_login DESC) WHERE last_login IS NOT NULL", # MFA status for security reporting "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_mfa_enabled ON users(mfa_enabled) WHERE mfa_enabled = true", ] for index_sql in indexes: try: conn.execute(text(index_sql)) conn.commit() logger.info(f"Created user index: {index_sql.split('ON')[0].strip()}") except Exception as e: logger.warning(f"Failed to create user index: {e}") conn.rollback() def _create_transaction_indexes(self, conn) -> None: """Create indexes for transaction-related queries""" indexes = [ # Transaction lookup by case "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_case_amount ON transactions(case_id, amount DESC)", "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_date_amount ON transactions(transaction_date DESC, amount)", # Fraud pattern analysis "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_amount_range ON transactions(amount) WHERE amount > 1000", "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_merchant_pattern ON transactions(merchant_name, amount)", # Time-based analysis "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_hourly ON transactions(EXTRACT(hour FROM transaction_date), EXTRACT(day FROM transaction_date))", ] for index_sql in indexes: try: conn.execute(text(index_sql)) conn.commit() logger.info(f"Created transaction index: {index_sql.split('ON')[0].strip()}") except Exception as e: logger.warning(f"Failed to create transaction index: {e}") conn.rollback() def _create_audit_indexes(self, conn) -> None: """Create indexes for audit and activity queries""" indexes = [ # Case activity tracking "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_case_timestamp ON case_activities(case_id, timestamp DESC)", "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_user_timestamp ON case_activities(user_id, timestamp DESC)", # Audit trail queries "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_action_timestamp ON case_activities(action, timestamp DESC)", "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_notes_case_created ON case_notes(case_id, created_at DESC)", ] for index_sql in indexes: try: conn.execute(text(index_sql)) conn.commit() logger.info(f"Created audit index: {index_sql.split('ON')[0].strip()}") except Exception as e: logger.warning(f"Failed to create audit index: {e}") conn.rollback() def optimize_table_settings(self) -> None: """Apply table-level optimizations""" logger.info("Applying table optimizations...") with self.engine.connect() as conn: # Analyze tables for query planner optimization tables_to_analyze = [ 'cases', 'users', 'transactions', 'case_activities', 'case_notes', 'evidence', 'fraud_alerts' ] for table in tables_to_analyze: try: conn.execute(text(f"ANALYZE {table}")) logger.info(f"Analyzed table: {table}") except Exception as e: logger.warning(f"Failed to analyze table {table}: {e}") # Vacuum large tables (PostgreSQL specific) try: for table in tables_to_analyze: conn.execute(text(f"VACUUM ANALYZE {table}")) logger.info(f"Vacuum analyzed table: {table}") except Exception as e: logger.warning(f"Vacuum analyze not supported or failed: {e}") conn.commit() def create_partitioning_strategy(self) -> None: """Set up table partitioning for large datasets""" logger.info("Setting up partitioning strategy...") with self.engine.connect() as conn: # Partition cases table by month (if large dataset) try: # Check if partitioning is needed (table size > 1M rows) result = conn.execute(text("SELECT COUNT(*) FROM cases")) case_count = result.fetchone()[0] if case_count > 1000000: # 1M+ cases logger.info(f"Large cases table detected ({case_count} records), partitioning recommended") # Note: Actual partitioning would require schema changes # This is a recommendation for future implementation else: logger.info(f"Cases table size OK ({case_count} records), no partitioning needed") except Exception as e: logger.warning(f"Could not check table sizes: {e}") def create_materialized_views(self) -> None: """Create materialized views for expensive queries""" logger.info("Creating materialized views for performance...") with self.engine.connect() as conn: # Materialized view for case statistics try: conn.execute(text(""" CREATE MATERIALIZED VIEW IF NOT EXISTS mv_case_stats AS SELECT DATE_TRUNC('month', created_at) as month, status, priority, COUNT(*) as case_count, AVG(risk_score) as avg_risk_score, SUM(fraud_amount) as total_fraud_amount FROM cases GROUP BY DATE_TRUNC('month', created_at), status, priority ORDER BY month DESC, case_count DESC """)) # Create index on materialized view conn.execute(text("CREATE INDEX IF NOT EXISTS idx_mv_case_stats_month ON mv_case_stats(month DESC)")) logger.info("Created materialized view: mv_case_stats") except Exception as e: logger.warning(f"Failed to create materialized view: {e}") conn.rollback() def run_performance_audit(self) -> dict: """Run performance audit and return recommendations""" logger.info("Running performance audit...") audit_results = { 'table_sizes': {}, 'index_usage': {}, 'slow_queries': [], 'recommendations': [] } with self.engine.connect() as conn: # Check table sizes tables = ['cases', 'users', 'transactions', 'case_activities'] for table in tables: try: result = conn.execute(text(f"SELECT COUNT(*) FROM {table}")) count = result.fetchone()[0] audit_results['table_sizes'][table] = count except Exception as e: logger.warning(f"Could not get size for {table}: {e}") # Generate recommendations based on data total_cases = audit_results['table_sizes'].get('cases', 0) if total_cases > 500000: audit_results['recommendations'].append("Consider partitioning the cases table by month") if total_cases > 100000: audit_results['recommendations'].append("Implement query result caching for case listings") total_activities = audit_results['table_sizes'].get('case_activities', 0) if total_activities > 1000000: audit_results['recommendations'].append("Archive old case activities to separate table") logger.info(f"Performance audit complete. Found {len(audit_results['recommendations'])} recommendations") return audit_results def optimize_database(engine: Engine) -> None: """Main function to optimize database performance""" optimizer = DatabaseOptimizer(engine) try: # Run audit first audit_results = optimizer.run_performance_audit() logger.info(f"Audit results: {audit_results}") # Apply optimizations optimizer.create_performance_indexes() optimizer.optimize_table_settings() optimizer.create_materialized_views() optimizer.create_partitioning_strategy() logger.info("Database optimization completed successfully") except Exception as e: logger.error(f"Database optimization failed: {e}") raise if __name__ == "__main__": # For standalone execution from core.database import engine optimize_database(engine)