"""
Database Performance Optimization Script
Adds indexes and optimizations for improved query performance
"""
import logging
from sqlalchemy import text
from sqlalchemy.engine import Engine
logger = logging.getLogger(__name__)
class DatabaseOptimizer:
    """Database performance optimization utilities.

    Wraps an SQLAlchemy :class:`Engine` and applies PostgreSQL-oriented
    tuning: index creation, table statistics refresh (ANALYZE / VACUUM),
    a partitioning-size check, materialized views, and a lightweight
    row-count audit. All operations are best-effort: individual failures
    are logged and skipped rather than aborting the batch.
    """

    def __init__(self, engine: Engine):
        self.engine = engine

    def create_performance_indexes(self) -> None:
        """Create performance indexes for common query patterns.

        NOTE: PostgreSQL rejects ``CREATE INDEX CONCURRENTLY`` inside a
        transaction block, and SQLAlchemy opens one implicitly on
        ``connect()`` — so this connection is switched to AUTOCOMMIT.
        """
        logger.info("Creating performance indexes...")
        with self.engine.connect().execution_options(
            isolation_level="AUTOCOMMIT"
        ) as conn:
            self._create_case_indexes(conn)
            self._create_user_indexes(conn)
            self._create_transaction_indexes(conn)
            self._create_audit_indexes(conn)
        logger.info("Performance indexes created successfully")

    def _apply_indexes(self, conn, indexes, label: str) -> None:
        """Execute each CREATE INDEX statement in *indexes*, best-effort.

        A failed statement (e.g. missing extension for gin_trgm_ops, or a
        non-PostgreSQL backend) is logged at WARNING and skipped so the
        remaining indexes still get created.

        Args:
            conn: An open SQLAlchemy connection (AUTOCOMMIT recommended).
            indexes: Iterable of raw ``CREATE INDEX`` SQL strings.
            label: Short tag ("case", "user", ...) used in log messages.
        """
        for index_sql in indexes:
            try:
                conn.execute(text(index_sql))
                # Harmless under AUTOCOMMIT; required if a caller ever
                # passes a transactional connection.
                conn.commit()
                # Log only the "CREATE INDEX ... name" prefix, not the
                # full column list.
                logger.info(
                    "Created %s index: %s", label, index_sql.split("ON")[0].strip()
                )
            except Exception as e:
                logger.warning("Failed to create %s index: %s", label, e)
                conn.rollback()

    def _create_case_indexes(self, conn) -> None:
        """Create indexes for case-related queries."""
        indexes = [
            # Composite indexes for common filters
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_status_priority_created ON cases(status, priority, created_at DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_assignee_status ON cases(assignee_id, status) WHERE assignee_id IS NOT NULL",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_priority_risk_score ON cases(priority, risk_score DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_created_at_status ON cases(created_at DESC, status)",
            # Partial indexes for active cases
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_active_only ON cases(created_at DESC) WHERE status != 'closed'",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_high_priority_open ON cases(created_at DESC) WHERE priority IN ('high', 'critical') AND status = 'open'",
            # Text search indexes (require the pg_trgm extension)
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_title_trgm ON cases USING gin (title gin_trgm_ops)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_description_trgm ON cases USING gin ((case_metadata->>'description') gin_trgm_ops)",
            # JSON indexes for metadata queries
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_metadata_type ON cases((case_metadata->>'type'))",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_metadata_amount ON cases(CAST(case_metadata->>'amount' AS numeric)) WHERE case_metadata->>'amount' IS NOT NULL",
            # Date-based indexes for reporting
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_month_year ON cases(EXTRACT(year FROM created_at), EXTRACT(month FROM created_at))",
        ]
        self._apply_indexes(conn, indexes, "case")

    def _create_user_indexes(self, conn) -> None:
        """Create indexes for user-related queries."""
        indexes = [
            # User authentication and lookup
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email_active ON users(email, is_active) WHERE is_active = true",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_role_active ON users(role, is_active) WHERE is_active = true",
            # User activity tracking
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_last_login ON users(last_login DESC) WHERE last_login IS NOT NULL",
            # MFA status for security reporting
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_mfa_enabled ON users(mfa_enabled) WHERE mfa_enabled = true",
        ]
        self._apply_indexes(conn, indexes, "user")

    def _create_transaction_indexes(self, conn) -> None:
        """Create indexes for transaction-related queries."""
        indexes = [
            # Transaction lookup by case
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_case_amount ON transactions(case_id, amount DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_date_amount ON transactions(transaction_date DESC, amount)",
            # Fraud pattern analysis
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_amount_range ON transactions(amount) WHERE amount > 1000",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_merchant_pattern ON transactions(merchant_name, amount)",
            # Time-based analysis
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_hourly ON transactions(EXTRACT(hour FROM transaction_date), EXTRACT(day FROM transaction_date))",
        ]
        self._apply_indexes(conn, indexes, "transaction")

    def _create_audit_indexes(self, conn) -> None:
        """Create indexes for audit and activity queries."""
        indexes = [
            # Case activity tracking
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_case_timestamp ON case_activities(case_id, timestamp DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_user_timestamp ON case_activities(user_id, timestamp DESC)",
            # Audit trail queries
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_action_timestamp ON case_activities(action, timestamp DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_notes_case_created ON case_notes(case_id, created_at DESC)",
        ]
        self._apply_indexes(conn, indexes, "audit")

    def optimize_table_settings(self) -> None:
        """Refresh planner statistics (ANALYZE) and reclaim space (VACUUM).

        NOTE: PostgreSQL forbids ``VACUUM`` inside a transaction block, so
        this uses an AUTOCOMMIT connection. Table names come from a fixed
        internal list, never user input, so the f-string SQL is safe.
        """
        logger.info("Applying table optimizations...")
        tables_to_analyze = [
            'cases', 'users', 'transactions', 'case_activities',
            'case_notes', 'evidence', 'fraud_alerts'
        ]
        with self.engine.connect().execution_options(
            isolation_level="AUTOCOMMIT"
        ) as conn:
            # Analyze tables for query planner optimization
            for table in tables_to_analyze:
                try:
                    conn.execute(text(f"ANALYZE {table}"))
                    logger.info("Analyzed table: %s", table)
                except Exception as e:
                    logger.warning("Failed to analyze table %s: %s", table, e)
            # Vacuum large tables (PostgreSQL specific). try/except sits
            # inside the loop so one failing table doesn't skip the rest.
            for table in tables_to_analyze:
                try:
                    conn.execute(text(f"VACUUM ANALYZE {table}"))
                    logger.info("Vacuum analyzed table: %s", table)
                except Exception as e:
                    logger.warning("Vacuum analyze not supported or failed: %s", e)

    def create_partitioning_strategy(self) -> None:
        """Check whether the cases table is large enough to warrant partitioning.

        Only logs a recommendation — actual partitioning would require
        schema changes and is left for future implementation.
        """
        logger.info("Setting up partitioning strategy...")
        with self.engine.connect() as conn:
            try:
                # Threshold: partitioning is recommended above 1M rows.
                case_count = conn.execute(
                    text("SELECT COUNT(*) FROM cases")
                ).scalar()
                if case_count > 1000000:
                    logger.info(
                        "Large cases table detected (%s records), partitioning recommended",
                        case_count,
                    )
                else:
                    logger.info(
                        "Cases table size OK (%s records), no partitioning needed",
                        case_count,
                    )
            except Exception as e:
                logger.warning("Could not check table sizes: %s", e)

    def create_materialized_views(self) -> None:
        """Create materialized views that pre-aggregate expensive queries."""
        logger.info("Creating materialized views for performance...")
        with self.engine.connect() as conn:
            # Monthly case statistics, grouped by status and priority.
            try:
                conn.execute(text("""
                    CREATE MATERIALIZED VIEW IF NOT EXISTS mv_case_stats AS
                    SELECT
                        DATE_TRUNC('month', created_at) as month,
                        status,
                        priority,
                        COUNT(*) as case_count,
                        AVG(risk_score) as avg_risk_score,
                        SUM(fraud_amount) as total_fraud_amount
                    FROM cases
                    GROUP BY DATE_TRUNC('month', created_at), status, priority
                    ORDER BY month DESC, case_count DESC
                """))
                # Index the view so month-range queries stay fast.
                conn.execute(text(
                    "CREATE INDEX IF NOT EXISTS idx_mv_case_stats_month ON mv_case_stats(month DESC)"
                ))
                conn.commit()
                logger.info("Created materialized view: mv_case_stats")
            except Exception as e:
                logger.warning("Failed to create materialized view: %s", e)
                conn.rollback()

    def run_performance_audit(self) -> dict:
        """Collect table row counts and return sizing recommendations.

        Returns:
            dict with keys ``table_sizes`` (table -> row count),
            ``index_usage``, ``slow_queries`` (currently unpopulated),
            and ``recommendations`` (list of human-readable strings).
        """
        logger.info("Running performance audit...")
        audit_results = {
            'table_sizes': {},
            'index_usage': {},
            'slow_queries': [],
            'recommendations': []
        }
        with self.engine.connect() as conn:
            # Row counts for the hot tables; identifiers are hardcoded.
            tables = ['cases', 'users', 'transactions', 'case_activities']
            for table in tables:
                try:
                    count = conn.execute(
                        text(f"SELECT COUNT(*) FROM {table}")
                    ).scalar()
                    audit_results['table_sizes'][table] = count
                except Exception as e:
                    logger.warning("Could not get size for %s: %s", table, e)
        # Generate recommendations based on data volume thresholds.
        total_cases = audit_results['table_sizes'].get('cases', 0)
        if total_cases > 500000:
            audit_results['recommendations'].append(
                "Consider partitioning the cases table by month"
            )
        if total_cases > 100000:
            audit_results['recommendations'].append(
                "Implement query result caching for case listings"
            )
        total_activities = audit_results['table_sizes'].get('case_activities', 0)
        if total_activities > 1000000:
            audit_results['recommendations'].append(
                "Archive old case activities to separate table"
            )
        logger.info(
            "Performance audit complete. Found %d recommendations",
            len(audit_results['recommendations']),
        )
        return audit_results
def optimize_database(engine: Engine) -> None:
    """Main function to optimize database performance.

    Runs an audit first (so the pre-optimization state is captured in the
    logs), then applies indexes, table settings, materialized views, and
    the partitioning check. Any failure is logged and re-raised.
    """
    optimizer = DatabaseOptimizer(engine)
    try:
        # Audit before touching anything.
        report = optimizer.run_performance_audit()
        logger.info(f"Audit results: {report}")
        # Apply each optimization stage in order.
        for stage in (
            optimizer.create_performance_indexes,
            optimizer.optimize_table_settings,
            optimizer.create_materialized_views,
            optimizer.create_partitioning_strategy,
        ):
            stage()
        logger.info("Database optimization completed successfully")
    except Exception as e:
        logger.error(f"Database optimization failed: {e}")
        raise
if __name__ == "__main__":
    # Standalone entry point: optimize the application's configured engine.
    from core.database import engine

    optimize_database(engine)