from __future__ import annotations import base64 import io import json import os from datetime import datetime from typing import Any import difflib import matplotlib.pyplot as plt class ELOSystem: """ELO rating system for comparing playbook versions.""" def __init__(self, k_factor: int = 32, initial_elo: float = 1000.0) -> None: """Initialize ELO system with k-factor and starting rating.""" self.k_factor = k_factor self.initial_elo = initial_elo def calculate_expected_score(self, elo_a: float, elo_b: float) -> float: """Calculate expected score for player A against player B.""" return 1 / (1 + 10 ** ((elo_b - elo_a) / 400)) def update_elo(self, winner_elo: float, loser_elo: float) -> tuple[float, float]: """Update ELO ratings after a match and return new ratings.""" expected_winner = self.calculate_expected_score(winner_elo, loser_elo) expected_loser = self.calculate_expected_score(loser_elo, winner_elo) new_winner_elo = winner_elo + self.k_factor * (1 - expected_winner) new_loser_elo = loser_elo + self.k_factor * (0 - expected_loser) return round(new_winner_elo, 2), round(new_loser_elo, 2) def plot_elo_curve(history: list[dict]) -> str: """Generate ELO rating curve plot and return as base64 PNG string.""" plt.style.use('dark_background') fig, ax = plt.subplots(figsize=(10, 6), facecolor='#1a1a2e') ax.set_facecolor('#1a1a2e') versions = [h["version"] for h in history] elos = [h["elo"] for h in history] ax.plot(versions, elos, color='#00ff88', linewidth=2, marker='o', markersize=8) ax.set_xlabel('Version', color='white', fontsize=12) ax.set_ylabel('ELO Rating', color='white', fontsize=12) ax.set_title('Playbook ELO Rating Over Time', color='white', fontsize=14, fontweight='bold') ax.tick_params(axis='x', colors='white', rotation=45) ax.tick_params(axis='y', colors='white') ax.grid(True, color='#333333', linestyle='--', alpha=0.5) for i, (version, elo) in enumerate(zip(versions, elos)): ax.annotate(f'{elo:.0f}', (i, elo), textcoords="offset points", xytext=(0, 10), ha='center', color='white', fontsize=9) plt.tight_layout() buf = io.BytesIO() plt.savefig(buf, format='png', facecolor='#1a1a2e', dpi=100) buf.seek(0) img_base64 = base64.b64encode(buf.read()).decode('utf-8') plt.close() return img_base64 DEFAULT_PLAYBOOK = """# DBRE Diagnostic Playbook v1 ## Priority Order: 1. Check EXPLAIN ANALYZE for sequential scans → always add index if found 2. Check for N+1 patterns → rewrite as JOIN 3. Check join order → put smallest table first 4. Check for SELECT * → replace with specific columns 5. Check for functions on indexed columns → rewrite to use index 6. Verify correctness by comparing row count with original """ class PlaybookManager: def __init__(self, storage_path: str = "./playbook_versions/") -> None: self.storage_path = storage_path self.current_playbook: str = DEFAULT_PLAYBOOK self._ensure_storage_exists() self._load_current_if_exists() def _ensure_storage_exists(self) -> None: if not os.path.exists(self.storage_path): os.makedirs(self.storage_path, exist_ok=True) def _load_current_if_exists(self) -> None: current_file = os.path.join(self.storage_path, "current.md") if os.path.exists(current_file): with open(current_file, "r", encoding="utf-8") as f: self.current_playbook = f.read() def _save_current(self) -> None: current_file = os.path.join(self.storage_path, "current.md") with open(current_file, "w", encoding="utf-8") as f: f.write(self.current_playbook) def _get_next_version_number(self) -> int: history = self.get_version_history() if not history: return 1 return max(v["version_number"] for v in history) + 1 def get_current(self) -> str: return self.current_playbook def apply_diff(self, diff_text: str) -> str: old_playbook = self.current_playbook old_lines = old_playbook.splitlines(keepends=True) if not old_lines or not old_lines[-1].endswith('\n'): if old_lines: old_lines[-1] += '\n' else: old_lines = ['\n'] patched_lines = list(old_lines) diff_lines = diff_text.splitlines() i = 0 while i < len(diff_lines): line = diff_lines[i] if line.startswith('@@'): parts = line.split() old_range = parts[1][1:] if ',' in old_range: start_line, count = map(int, old_range.split(',')) else: start_line = int(old_range) count = 1 start_idx = start_line - 1 old_content = [] new_content = [] i += 1 while i < len(diff_lines) and not diff_lines[i].startswith('@@'): dl = diff_lines[i] if dl.startswith('-'): old_content.append(dl[1:] + '\n') elif dl.startswith('+'): new_content.append(dl[1:] + '\n') elif dl.startswith(' '): old_content.append(dl[1:] + '\n') new_content.append(dl[1:] + '\n') elif dl == '\\ No newline at end of file': pass i += 1 if start_idx < len(patched_lines) and count > 0: end_idx = min(start_idx + count, len(patched_lines)) patched_lines = patched_lines[:start_idx] + new_content + patched_lines[end_idx:] continue i += 1 new_playbook = ''.join(patched_lines) self.current_playbook = new_playbook self._save_current() return new_playbook def archive_version(self, version_number: int, content: str, elo_score: float) -> None: version_file = os.path.join(self.storage_path, f"v{version_number}.md") with open(version_file, "w", encoding="utf-8") as f: f.write(content) metadata_file = os.path.join(self.storage_path, "versions.json") metadata = [] if os.path.exists(metadata_file): with open(metadata_file, "r", encoding="utf-8") as f: metadata = json.load(f) metadata.append({ "version_number": version_number, "timestamp": datetime.now().isoformat(), "elo_score": elo_score, "filename": f"v{version_number}.md" }) with open(metadata_file, "w", encoding="utf-8") as f: json.dump(metadata, f, indent=2) def get_version_history(self) -> list[dict[str, Any]]: metadata_file = os.path.join(self.storage_path, "versions.json") if os.path.exists(metadata_file): with open(metadata_file, "r", encoding="utf-8") as f: return json.load(f) return [] def revert_to_version(self, version_number: int) -> None: version_file = os.path.join(self.storage_path, f"v{version_number}.md") if os.path.exists(version_file): with open(version_file, "r", encoding="utf-8") as f: content = f.read() self.current_playbook = content self._save_current()