Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import random | |
| from typing import List | |
| import psycopg2 | |
| class SchemaDrifter: | |
| """Applies random schema mutations to simulate production drift.""" | |
| def __init__(self, connection): | |
| self.conn = connection | |
| self.applied_drifts: List[str] = [] | |
| self._mutation_count = 0 | |
| random.seed(42) | |
| def apply_random_drift(self) -> str: | |
| self.conn.rollback() # Clear any failed transaction | |
| mutations = [ | |
| self._add_column, | |
| self._drop_column, | |
| self._rename_column, | |
| self._create_index, | |
| self._drop_index, | |
| ] | |
| mutation = random.choice(mutations) | |
| drift_description = mutation() | |
| self.applied_drifts.append(drift_description) | |
| print(f"Applied drift: {drift_description}") | |
| return drift_description | |
| def get_schema_diff(self) -> List[str]: | |
| return self.applied_drifts.copy() | |
| def reset(self) -> None: | |
| """Drop all applied drifts by recreating tables.""" | |
| cur = self.conn.cursor() | |
| cur.execute("DROP TABLE IF EXISTS reviews, order_items, orders, products, customers CASCADE;") | |
| self.conn.commit() | |
| cur.close() | |
| self.applied_drifts = [] | |
| def _add_column(self) -> str: | |
| tables = ["customers", "products", "orders"] | |
| table = random.choice(tables) | |
| col_name = f"drift_col_{random.randint(1000, 9999)}" | |
| cur = self.conn.cursor() | |
| cur.execute(f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {col_name} VARCHAR(255) DEFAULT NULL;") | |
| self.conn.commit() | |
| cur.close() | |
| return f"ADD COLUMN {col_name} TO {table}" | |
| def _drop_column(self) -> str: | |
| safe_drops = {"customers": ["city"], "products": ["stock"], "orders": ["status"]} | |
| table = random.choice(list(safe_drops.keys())) | |
| cols = safe_drops[table] | |
| col = random.choice(cols) | |
| cur = self.conn.cursor() | |
| try: | |
| cur.execute(f"ALTER TABLE {table} DROP COLUMN IF EXISTS {col};") | |
| self.conn.commit() | |
| except Exception: | |
| self.conn.rollback() | |
| cur.close() | |
| return f"DROP COLUMN {col} FROM {table}" | |
| def _rename_column(self) -> str: | |
| tables_cols = {"customers": "city", "products": "category", "orders": "status"} | |
| table = random.choice(list(tables_cols.keys())) | |
| old_col = tables_cols[table] | |
| new_col = f"{old_col}_v{random.randint(100, 999)}" | |
| cur = self.conn.cursor() | |
| try: | |
| cur.execute(f"ALTER TABLE {table} RENAME COLUMN {old_col} TO {new_col};") | |
| self.conn.commit() | |
| except Exception: | |
| self.conn.rollback() | |
| cur.close() | |
| return f"RENAME COLUMN {table}.{old_col} TO {new_col}" | |
| def _create_index(self) -> str: | |
| tables_cols = {"customers": "email", "products": "category", "orders": "customer_id", | |
| "order_items": "order_id", "reviews": "customer_id"} | |
| table = random.choice(list(tables_cols.keys())) | |
| col = tables_cols[table] | |
| idx_name = f"idx_{table}_{col}" | |
| cur = self.conn.cursor() | |
| try: | |
| cur.execute(f"CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({col});") | |
| self.conn.commit() | |
| except Exception: | |
| self.conn.rollback() | |
| cur.close() | |
| return f"CREATE INDEX {idx_name} ON {table}({col})" | |
| def _drop_index(self) -> str: | |
| cur = self.conn.cursor() | |
| cur.execute("SELECT indexname FROM pg_indexes WHERE schemaname='public' AND indexname LIKE 'idx_%';") | |
| indexes = [row[0] for row in cur.fetchall()] | |
| cur.close() | |
| if not indexes: | |
| return self._create_index() | |
| idx = random.choice(indexes) | |
| cur = self.conn.cursor() | |
| try: | |
| cur.execute(f"DROP INDEX IF EXISTS {idx};") | |
| self.conn.commit() | |
| except Exception: | |
| self.conn.rollback() | |
| cur.close() | |
| return f"DROP INDEX {idx}" | |