autonomic-dbre / dbre /schema_drift.py
ZeroiJ's picture
Initial commit: Autonomic DBRE - Self-Improving Database Reliability Engineer
b59a07e
from __future__ import annotations
import random
from typing import List
import psycopg2
class SchemaDrifter:
"""Applies random schema mutations to simulate production drift."""
def __init__(self, connection):
self.conn = connection
self.applied_drifts: List[str] = []
self._mutation_count = 0
random.seed(42)
def apply_random_drift(self) -> str:
self.conn.rollback() # Clear any failed transaction
mutations = [
self._add_column,
self._drop_column,
self._rename_column,
self._create_index,
self._drop_index,
]
mutation = random.choice(mutations)
drift_description = mutation()
self.applied_drifts.append(drift_description)
print(f"Applied drift: {drift_description}")
return drift_description
def get_schema_diff(self) -> List[str]:
return self.applied_drifts.copy()
def reset(self) -> None:
"""Drop all applied drifts by recreating tables."""
cur = self.conn.cursor()
cur.execute("DROP TABLE IF EXISTS reviews, order_items, orders, products, customers CASCADE;")
self.conn.commit()
cur.close()
self.applied_drifts = []
def _add_column(self) -> str:
tables = ["customers", "products", "orders"]
table = random.choice(tables)
col_name = f"drift_col_{random.randint(1000, 9999)}"
cur = self.conn.cursor()
cur.execute(f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {col_name} VARCHAR(255) DEFAULT NULL;")
self.conn.commit()
cur.close()
return f"ADD COLUMN {col_name} TO {table}"
def _drop_column(self) -> str:
safe_drops = {"customers": ["city"], "products": ["stock"], "orders": ["status"]}
table = random.choice(list(safe_drops.keys()))
cols = safe_drops[table]
col = random.choice(cols)
cur = self.conn.cursor()
try:
cur.execute(f"ALTER TABLE {table} DROP COLUMN IF EXISTS {col};")
self.conn.commit()
except Exception:
self.conn.rollback()
cur.close()
return f"DROP COLUMN {col} FROM {table}"
def _rename_column(self) -> str:
tables_cols = {"customers": "city", "products": "category", "orders": "status"}
table = random.choice(list(tables_cols.keys()))
old_col = tables_cols[table]
new_col = f"{old_col}_v{random.randint(100, 999)}"
cur = self.conn.cursor()
try:
cur.execute(f"ALTER TABLE {table} RENAME COLUMN {old_col} TO {new_col};")
self.conn.commit()
except Exception:
self.conn.rollback()
cur.close()
return f"RENAME COLUMN {table}.{old_col} TO {new_col}"
def _create_index(self) -> str:
tables_cols = {"customers": "email", "products": "category", "orders": "customer_id",
"order_items": "order_id", "reviews": "customer_id"}
table = random.choice(list(tables_cols.keys()))
col = tables_cols[table]
idx_name = f"idx_{table}_{col}"
cur = self.conn.cursor()
try:
cur.execute(f"CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({col});")
self.conn.commit()
except Exception:
self.conn.rollback()
cur.close()
return f"CREATE INDEX {idx_name} ON {table}({col})"
def _drop_index(self) -> str:
cur = self.conn.cursor()
cur.execute("SELECT indexname FROM pg_indexes WHERE schemaname='public' AND indexname LIKE 'idx_%';")
indexes = [row[0] for row in cur.fetchall()]
cur.close()
if not indexes:
return self._create_index()
idx = random.choice(indexes)
cur = self.conn.cursor()
try:
cur.execute(f"DROP INDEX IF EXISTS {idx};")
self.conn.commit()
except Exception:
self.conn.rollback()
cur.close()
return f"DROP INDEX {idx}"