avfranco's picture
HF Space deploy snapshot (minimal allow-list)
557ee65
import sqlite3
import os
import logging
logger = logging.getLogger(__name__)
class Database:
"""Base class for handling SQLite database connections."""
def __init__(self, db_path: str):
self.db_path = db_path
self._ensure_dir()
self._memory_conn = None
if self.db_path == ":memory:":
self._memory_conn = sqlite3.connect(self.db_path)
self._memory_conn.row_factory = sqlite3.Row
def _ensure_dir(self):
if self.db_path == ":memory:":
return
db_dir = os.path.dirname(self.db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
def get_connection(self) -> sqlite3.Connection:
if self._memory_conn:
return self._memory_conn
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def initialize_schema(self):
"""Initializes all database tables."""
with self.get_connection() as conn:
# Runner Profile
conn.execute(
"""
CREATE TABLE IF NOT EXISTS runner_profiles (
runner_id TEXT PRIMARY KEY,
runner_display_name TEXT,
age INTEGER,
sex TEXT,
experience_level TEXT,
baseline_weekly_km REAL,
gender TEXT,
injury_history_notes TEXT,
created_at TEXT,
updated_at TEXT
)
"""
)
# Individual Runs
conn.execute(
"""
CREATE TABLE IF NOT EXISTS runs (
id TEXT PRIMARY KEY,
runner_id TEXT NOT NULL,
start_time TEXT NOT NULL,
total_distance_m REAL,
total_time_sec REAL,
avg_hr REAL,
max_hr REAL,
avg_pace_sec_per_km REAL,
source_file TEXT,
json_data TEXT
)
"""
)
# Weekly Snapshots
conn.execute(
"""
CREATE TABLE IF NOT EXISTS weekly_snapshots (
id TEXT PRIMARY KEY,
runner_id TEXT NOT NULL,
week_start_date TEXT NOT NULL UNIQUE,
total_distance_km REAL,
avg_pace_sec_per_km REAL,
avg_hr REAL,
max_hr REAL,
total_time_sec REAL,
run_count INTEGER,
consistency_score REAL,
performance_brief TEXT,
performance_focus TEXT,
brief_generated_at TEXT,
brief_source_hash TEXT,
structure_status TEXT,
created_at TEXT
)
"""
)
# Trend Snapshots
conn.execute(
"""
CREATE TABLE IF NOT EXISTS trend_snapshots (
id TEXT PRIMARY KEY,
runner_id TEXT NOT NULL,
week_start_date TEXT NOT NULL,
comparison_type TEXT NOT NULL,
reference_week_start_date TEXT,
distance_delta_km REAL,
avg_pace_delta_s_per_km REAL,
avg_hr_delta REAL,
runs_count_delta INTEGER,
consistency_delta REAL,
engine_version TEXT NOT NULL,
computed_at TEXT NOT NULL,
UNIQUE(runner_id, week_start_date, comparison_type)
)
"""
)
# Goals
conn.execute(
"""
CREATE TABLE IF NOT EXISTS goals (
id TEXT PRIMARY KEY,
runner_id TEXT NOT NULL,
type TEXT NOT NULL,
target_value REAL NOT NULL,
unit TEXT NOT NULL,
target_date TEXT,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
achieved_at TEXT
)
"""
)
conn.commit()
# Analyses
conn.execute(
"""
CREATE TABLE IF NOT EXISTS analyses (
id TEXT PRIMARY KEY,
created_at TEXT NOT NULL,
source_files TEXT,
formats TEXT,
run_summary TEXT,
run_timeseries TEXT,
insights_json TEXT,
plan_json TEXT,
route_json TEXT
)
"""
)
conn.commit()
# Planned Sessions
conn.execute(
"""
CREATE TABLE IF NOT EXISTS planned_sessions (
id TEXT PRIMARY KEY,
runner_id TEXT NOT NULL,
week_start_date TEXT NOT NULL,
session_type TEXT NOT NULL,
planned_date TEXT NOT NULL,
target_distance_km REAL NOT NULL,
completed_run_id TEXT,
created_at TEXT NOT NULL
)
"""
)
conn.commit()
# Runs
conn.execute(
"""
CREATE TABLE IF NOT EXISTS runs (
id TEXT PRIMARY KEY,
runner_id TEXT NOT NULL,
start_time TEXT NOT NULL,
total_distance_m REAL,
total_time_sec REAL,
avg_hr REAL,
max_hr REAL,
avg_pace_sec_per_km REAL,
source_file TEXT,
json_data TEXT
)
"""
)
conn.commit()
# Run migrations for existing tables (DB Tables updated)
self._migrate_schema()
def _migrate_schema(self):
"""Adds missing columns to existing tables."""
with self.get_connection() as conn:
# Runner Profiles migrations
cursor = conn.execute("PRAGMA table_info(runner_profiles)")
columns = [row["name"] for row in cursor.fetchall()]
if "gender" not in columns:
conn.execute("ALTER TABLE runner_profiles ADD COLUMN gender TEXT")
if "baseline_weekly_km" not in columns:
conn.execute("ALTER TABLE runner_profiles ADD COLUMN baseline_weekly_km REAL")
# Weekly Snapshots migrations
cursor = conn.execute("PRAGMA table_info(weekly_snapshots)")
columns = [row["name"] for row in cursor.fetchall()]
if "runner_id" not in columns:
# SQLite doesn't support ADD COLUMN with NOT NULL without default or being empty.
# Since this is a migration, we'll allow it as nullable first or provide a default.
conn.execute("ALTER TABLE weekly_snapshots ADD COLUMN runner_id TEXT")
if "performance_brief" not in columns:
conn.execute("ALTER TABLE weekly_snapshots ADD COLUMN performance_brief TEXT")
if "performance_focus" not in columns:
conn.execute("ALTER TABLE weekly_snapshots ADD COLUMN performance_focus TEXT")
if "brief_generated_at" not in columns:
conn.execute("ALTER TABLE weekly_snapshots ADD COLUMN brief_generated_at TEXT")
if "brief_source_hash" not in columns:
conn.execute("ALTER TABLE weekly_snapshots ADD COLUMN brief_source_hash TEXT")
if "structure_status" not in columns:
conn.execute("ALTER TABLE weekly_snapshots ADD COLUMN structure_status TEXT")
# Analyses Table migrations (Legacy cleanup)
cursor = conn.execute("PRAGMA table_info(analyses)")
columns = [row["name"] for row in cursor.fetchall()]
renames = {
"source_files_json": "source_files",
"formats_json": "formats",
"run_summary_json": "run_summary",
"run_timeseries_json": "run_timeseries",
}
for old_col, new_col in renames.items():
if old_col in columns and new_col not in columns:
logger.info(f"Migrating column {old_col} to {new_col} in analyses table")
conn.execute(f"ALTER TABLE analyses RENAME COLUMN {old_col} TO {new_col}")
# Goals Table
conn.execute(
"""
CREATE TABLE IF NOT EXISTS goals (
id TEXT PRIMARY KEY,
runner_id TEXT NOT NULL,
type TEXT NOT NULL,
target_value REAL NOT NULL,
unit TEXT NOT NULL,
target_date TEXT,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
achieved_at TEXT
)
"""
)
# Planned Sessions Table
conn.execute(
"""
CREATE TABLE IF NOT EXISTS planned_sessions (
id TEXT PRIMARY KEY,
runner_id TEXT NOT NULL,
week_start_date TEXT NOT NULL,
session_type TEXT NOT NULL,
planned_date TEXT NOT NULL,
target_distance_km REAL NOT NULL,
completed_run_id TEXT,
created_at TEXT NOT NULL
)
"""
)
conn.commit()