import logging
import os
import sqlite3
from contextlib import contextmanager
from typing import Iterator

logger = logging.getLogger(__name__)


class Database:
    """Base class for handling SQLite database connections.

    For a file-backed database every call to :meth:`get_connection` opens a
    new connection that the caller must close.  For ``":memory:"`` a single
    connection is created in ``__init__`` and shared for the lifetime of the
    instance, because each new in-memory connection would otherwise see an
    empty database.
    """

    # Table name -> CREATE statement, in creation order.  Every statement
    # uses IF NOT EXISTS, so executing them repeatedly is harmless.
    _TABLE_DDL = {
        "runner_profiles": """
            CREATE TABLE IF NOT EXISTS runner_profiles (
                runner_id TEXT PRIMARY KEY,
                runner_display_name TEXT,
                age INTEGER,
                sex TEXT,
                experience_level TEXT,
                baseline_weekly_km REAL,
                gender TEXT,
                injury_history_notes TEXT,
                created_at TEXT,
                updated_at TEXT
            )
        """,
        "runs": """
            CREATE TABLE IF NOT EXISTS runs (
                id TEXT PRIMARY KEY,
                runner_id TEXT NOT NULL,
                start_time TEXT NOT NULL,
                total_distance_m REAL,
                total_time_sec REAL,
                avg_hr REAL,
                max_hr REAL,
                avg_pace_sec_per_km REAL,
                source_file TEXT,
                json_data TEXT
            )
        """,
        # NOTE(review): week_start_date is UNIQUE on its own, so two runners
        # cannot both have a snapshot for the same week.  Confirm whether
        # UNIQUE(runner_id, week_start_date) was intended.
        "weekly_snapshots": """
            CREATE TABLE IF NOT EXISTS weekly_snapshots (
                id TEXT PRIMARY KEY,
                runner_id TEXT NOT NULL,
                week_start_date TEXT NOT NULL UNIQUE,
                total_distance_km REAL,
                avg_pace_sec_per_km REAL,
                avg_hr REAL,
                max_hr REAL,
                total_time_sec REAL,
                run_count INTEGER,
                consistency_score REAL,
                performance_brief TEXT,
                performance_focus TEXT,
                brief_generated_at TEXT,
                brief_source_hash TEXT,
                structure_status TEXT,
                created_at TEXT
            )
        """,
        "trend_snapshots": """
            CREATE TABLE IF NOT EXISTS trend_snapshots (
                id TEXT PRIMARY KEY,
                runner_id TEXT NOT NULL,
                week_start_date TEXT NOT NULL,
                comparison_type TEXT NOT NULL,
                reference_week_start_date TEXT,
                distance_delta_km REAL,
                avg_pace_delta_s_per_km REAL,
                avg_hr_delta REAL,
                runs_count_delta INTEGER,
                consistency_delta REAL,
                engine_version TEXT NOT NULL,
                computed_at TEXT NOT NULL,
                UNIQUE(runner_id, week_start_date, comparison_type)
            )
        """,
        "goals": """
            CREATE TABLE IF NOT EXISTS goals (
                id TEXT PRIMARY KEY,
                runner_id TEXT NOT NULL,
                type TEXT NOT NULL,
                target_value REAL NOT NULL,
                unit TEXT NOT NULL,
                target_date TEXT,
                status TEXT NOT NULL,
                created_at TEXT NOT NULL,
                achieved_at TEXT
            )
        """,
        "analyses": """
            CREATE TABLE IF NOT EXISTS analyses (
                id TEXT PRIMARY KEY,
                created_at TEXT NOT NULL,
                source_files TEXT,
                formats TEXT,
                run_summary TEXT,
                run_timeseries TEXT,
                insights_json TEXT,
                plan_json TEXT,
                route_json TEXT
            )
        """,
        "planned_sessions": """
            CREATE TABLE IF NOT EXISTS planned_sessions (
                id TEXT PRIMARY KEY,
                runner_id TEXT NOT NULL,
                week_start_date TEXT NOT NULL,
                session_type TEXT NOT NULL,
                planned_date TEXT NOT NULL,
                target_distance_km REAL NOT NULL,
                completed_run_id TEXT,
                created_at TEXT NOT NULL
            )
        """,
    }

    # Table name -> {column name: column type} for columns that databases
    # created by older versions may be missing.
    _COLUMN_MIGRATIONS = {
        "runner_profiles": {
            "gender": "TEXT",
            "baseline_weekly_km": "REAL",
        },
        "weekly_snapshots": {
            # SQLite cannot ADD COLUMN ... NOT NULL without a default on a
            # populated table, so the migrated column is added as nullable.
            "runner_id": "TEXT",
            "performance_brief": "TEXT",
            "performance_focus": "TEXT",
            "brief_generated_at": "TEXT",
            "brief_source_hash": "TEXT",
            "structure_status": "TEXT",
        },
    }

    # Legacy ``analyses`` column name -> current column name.
    _ANALYSES_COLUMN_RENAMES = {
        "source_files_json": "source_files",
        "formats_json": "formats",
        "run_summary_json": "run_summary",
        "run_timeseries_json": "run_timeseries",
    }

    # Tables that _migrate_schema (re)creates so it also works on a legacy
    # database without relying on initialize_schema having created them.
    _MIGRATION_ENSURED_TABLES = ("goals", "planned_sessions")

    def __init__(self, db_path: str):
        """Remember *db_path* and prepare the storage location.

        Args:
            db_path: Filesystem path of the SQLite file, or ``":memory:"``
                for an in-memory database.
        """
        self.db_path = db_path
        self._ensure_dir()
        self._memory_conn = None
        if self.db_path == ":memory:":
            self._memory_conn = sqlite3.connect(self.db_path)
            self._memory_conn.row_factory = sqlite3.Row

    def _ensure_dir(self) -> None:
        """Create the parent directory of the database file if it is missing."""
        if self.db_path == ":memory:":
            return
        db_dir = os.path.dirname(self.db_path)
        if db_dir and not os.path.exists(db_dir):
            os.makedirs(db_dir, exist_ok=True)

    def get_connection(self) -> sqlite3.Connection:
        """Return a connection whose rows are ``sqlite3.Row`` objects.

        In-memory databases always return the single shared connection;
        file-backed databases return a brand-new connection on every call,
        which the caller is responsible for closing.
        """
        if self._memory_conn is not None:
            return self._memory_conn
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    @contextmanager
    def _connection_scope(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection, commit or roll back, then release it.

        ``with connection:`` on a ``sqlite3.Connection`` only ends the
        transaction; it never closes the connection.  This wrapper adds the
        close for file-backed connections while leaving the shared in-memory
        connection open, since closing it would discard the database.
        """
        conn = self.get_connection()
        try:
            with conn:
                yield conn
        finally:
            if conn is not self._memory_conn:
                conn.close()

    @staticmethod
    def _column_names(conn: sqlite3.Connection, table: str) -> list:
        """Return the column names SQLite currently reports for *table*."""
        # *table* is always one of this class's own constants, never user
        # input, so interpolating it into the PRAGMA is safe.
        cursor = conn.execute(f"PRAGMA table_info({table})")
        return [row["name"] for row in cursor.fetchall()]

    def initialize_schema(self) -> None:
        """Initializes all database tables, then migrates older layouts.

        Safe to call repeatedly: tables are created with ``IF NOT EXISTS``
        and migrations only touch columns that are actually missing.
        """
        with self._connection_scope() as conn:
            for ddl in self._TABLE_DDL.values():
                conn.execute(ddl)

        # Run migrations for existing tables (DB tables updated).
        self._migrate_schema()

    def _migrate_schema(self) -> None:
        """Adds missing columns to existing tables and renames legacy ones."""
        with self._connection_scope() as conn:
            # Add columns introduced after the table was first created.
            for table, wanted in self._COLUMN_MIGRATIONS.items():
                existing = self._column_names(conn, table)
                for column, sql_type in wanted.items():
                    if column not in existing:
                        conn.execute(
                            f"ALTER TABLE {table} ADD COLUMN {column} {sql_type}"
                        )

            # Analyses table migrations (legacy cleanup of *_json names).
            existing = self._column_names(conn, "analyses")
            for old_col, new_col in self._ANALYSES_COLUMN_RENAMES.items():
                if old_col in existing and new_col not in existing:
                    logger.info(
                        "Migrating column %s to %s in analyses table",
                        old_col,
                        new_col,
                    )
                    conn.execute(
                        f"ALTER TABLE analyses RENAME COLUMN {old_col} TO {new_col}"
                    )

            # Tables added in later versions; reuse the shared DDL so the
            # definition cannot drift from the one in initialize_schema.
            for table in self._MIGRATION_ENSURED_TABLES:
                conn.execute(self._TABLE_DDL[table])