# Spaces: Sleeping  (appears to be hosting-page status text captured with the file, not module content)
"""
FocusTrack - Database Layer
SQLite-based local storage. Zero cloud, zero accounts.
"""
import json
import logging
import sqlite3
import threading
from datetime import date, datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger("focustrack.database")

# Default on-disk location: <directory of this module>/data/activity.db
DB_PATH = Path(__file__).parent / "data" / "activity.db"


class Database:
    """Thread-safe SQLite database manager for FocusTrack.

    Every thread that uses an instance lazily opens its own connection,
    kept in a ``threading.local``, so a connection object is never handed
    from one thread to another by this class.
    """

    def __init__(self, db_path: Path = DB_PATH):
        """Remember the database location and make sure its folder exists.

        Args:
            db_path: File holding the SQLite database. Defaults to
                ``DB_PATH``. A plain string path is accepted as well.
        """
        self.db_path = Path(db_path)
        # parents=True: more than the last path component may be missing
        # on a fresh install.
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._local = threading.local()

    def _get_conn(self) -> sqlite3.Connection:
        """Return the calling thread's connection, opening it on first use."""
        conn = getattr(self._local, "conn", None)
        if conn is None:
            conn = sqlite3.connect(
                str(self.db_path),
                detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
                check_same_thread=False,
            )
            # Rows behave like mappings so callers can do dict(row).
            conn.row_factory = sqlite3.Row
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA synchronous=NORMAL")
            self._local.conn = conn
        return conn

    @property
    def conn(self) -> sqlite3.Connection:
        """The calling thread's connection.

        This must be a property: every method below uses it as an
        attribute (``self.conn.execute(...)``).
        """
        return self._get_conn()

    def close(self) -> None:
        """Close the calling thread's connection, if one was opened.

        A later access to ``conn`` from the same thread opens a new one.
        """
        conn = getattr(self._local, "conn", None)
        if conn is not None:
            conn.close()
            self._local.conn = None

    def initialize(self):
        """Create tables and default data."""
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS activities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME NOT NULL,
                app_name TEXT NOT NULL,
                window_title TEXT NOT NULL,
                duration_seconds INTEGER DEFAULT 0,
                category TEXT DEFAULT 'uncategorized',
                is_idle BOOLEAN DEFAULT 0
            );
            CREATE INDEX IF NOT EXISTS idx_activities_timestamp
                ON activities(timestamp);
            CREATE INDEX IF NOT EXISTS idx_activities_app
                ON activities(app_name);
            CREATE TABLE IF NOT EXISTS settings (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS categories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE NOT NULL,
                color TEXT DEFAULT '#6366f1',
                keywords TEXT DEFAULT '[]',
                apps TEXT DEFAULT '[]'
            );
        """)
        self.conn.commit()
        self._seed_defaults()
        logger.info("Database ready at %s", self.db_path)

    def _seed_defaults(self):
        """Insert default settings and categories if not present.

        ``INSERT OR IGNORE`` leaves rows that already exist untouched, so
        values the user has changed survive repeated initialization.
        """
        defaults = {
            "idle_threshold_seconds": "300",
            "heartbeat_interval": "5",
            "theme": "dark",
            "auto_start": "false",
            "ignored_apps": '["Finder", "explorer.exe"]',
            "tracker_running": "true",
        }
        self.conn.executemany(
            "INSERT OR IGNORE INTO settings (key, value) VALUES (?, ?)",
            defaults.items(),
        )

        # (name, color, keywords as JSON text, apps as JSON text)
        default_categories = [
            ("coding", "#22d3ee", '["code", "terminal", "vim", "emacs", "debug"]', '["code", "cursor", "vscode", "pycharm", "intellij", "xcode", "terminal", "iterm", "warp", "alacritty", "sublime", "atom", "neovim"]'),
            ("browsing", "#f59e0b", '["http", "www", "chrome", "firefox", "safari"]', '["chrome", "firefox", "safari", "edge", "brave", "opera"]'),
            ("communication", "#10b981", '["mail", "slack", "teams", "discord", "zoom"]', '["slack", "discord", "teams", "zoom", "meet", "mail", "outlook", "telegram", "whatsapp"]'),
            ("design", "#a78bfa", '["figma", "sketch", "photoshop", "illustrator"]', '["figma", "sketch", "photoshop", "illustrator", "xd", "canva", "affinity"]'),
            ("documents", "#fb923c", '["word", "excel", "sheets", "docs", "notion"]', '["word", "excel", "notion", "obsidian", "pages", "numbers", "libreoffice"]'),
            ("media", "#f43f5e", '["youtube", "spotify", "vlc", "netflix"]', '["spotify", "vlc", "quicktime", "itunes", "music"]'),
            ("system", "#64748b", '["settings", "finder", "explorer"]', '["finder", "explorer", "systempreferences", "taskmanager"]'),
            ("idle", "#374151", '[]', '[]'),
        ]
        self.conn.executemany(
            "INSERT OR IGNORE INTO categories (name, color, keywords, apps) VALUES (?,?,?,?)",
            default_categories,
        )
        self.conn.commit()

    # ─── Activity CRUD ───────────────────────────────────────────────────────

    @staticmethod
    def _search_filter(search: str) -> tuple:
        """Build the free-text filter shared by the list and count queries.

        ``%``, ``_`` and the escape character itself are escaped so the
        user's text is matched literally instead of as LIKE wildcards.

        Returns:
            ``(sql_fragment, params)`` to append to a WHERE clause.
        """
        literal = (
            search.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        pattern = f"%{literal}%"
        clause = (
            " AND (app_name LIKE ? ESCAPE '\\'"
            " OR window_title LIKE ? ESCAPE '\\'"
            " OR category LIKE ? ESCAPE '\\')"
        )
        return clause, [pattern, pattern, pattern]

    def log_activity(self, timestamp: datetime, app_name: str, window_title: str,
                     duration_seconds: int, category: str, is_idle: bool):
        """Store one activity record and commit it immediately.

        The timestamp is saved as ISO-8601 text and ``is_idle`` as 0/1.
        """
        self.conn.execute(
            """INSERT INTO activities
               (timestamp, app_name, window_title, duration_seconds, category, is_idle)
               VALUES (?,?,?,?,?,?)""",
            (timestamp.isoformat(), app_name, window_title,
             duration_seconds, category, int(is_idle))
        )
        self.conn.commit()

    def get_activities(self, start: datetime, end: datetime,
                       search: str = "", limit: int = 500, offset: int = 0) -> List[Dict]:
        """Return activities between ``start`` and ``end``, newest first.

        Args:
            start: Lower bound (inclusive).
            end: Upper bound (inclusive).
            search: Optional text that must occur in the app name, window
                title or category. Matched literally.
            limit: Maximum number of rows returned.
            offset: Number of leading rows skipped (for paging).
        """
        query = """
            SELECT * FROM activities
            WHERE timestamp BETWEEN ? AND ?
        """
        params: list = [start.isoformat(), end.isoformat()]
        if search:
            clause, extra = self._search_filter(search)
            query += clause
            params += extra
        query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?"
        params += [limit, offset]
        rows = self.conn.execute(query, params).fetchall()
        return [dict(r) for r in rows]

    def get_activity_count(self, start: datetime, end: datetime, search: str = "") -> int:
        """Count the rows ``get_activities`` would return without paging."""
        query = "SELECT COUNT(*) FROM activities WHERE timestamp BETWEEN ? AND ?"
        params: list = [start.isoformat(), end.isoformat()]
        if search:
            clause, extra = self._search_filter(search)
            query += clause
            params += extra
        return self.conn.execute(query, params).fetchone()[0]

    def get_summary(self, start: datetime, end: datetime) -> Dict[str, Any]:
        """Aggregate stats for dashboard cards.

        Returns a dict with ``focus_seconds``, ``idle_seconds``,
        ``total_seconds`` and ``unique_apps``. The sums are 0, not
        ``None``, when the range contains no activity.
        """
        # SUM() over zero rows yields NULL; COALESCE keeps the result numeric.
        row = self.conn.execute("""
            SELECT
                COALESCE(SUM(CASE WHEN is_idle=0 THEN duration_seconds ELSE 0 END), 0) as focus_seconds,
                COALESCE(SUM(CASE WHEN is_idle=1 THEN duration_seconds ELSE 0 END), 0) as idle_seconds,
                COALESCE(SUM(duration_seconds), 0) as total_seconds,
                COUNT(DISTINCT app_name) as unique_apps
            FROM activities
            WHERE timestamp BETWEEN ? AND ?
        """, (start.isoformat(), end.isoformat())).fetchone()
        return dict(row) if row else {}

    def get_by_category(self, start: datetime, end: datetime) -> List[Dict]:
        """Return non-idle time and session count per category, largest first."""
        rows = self.conn.execute("""
            SELECT category,
                   SUM(duration_seconds) as total_seconds,
                   COUNT(*) as sessions
            FROM activities
            WHERE timestamp BETWEEN ? AND ? AND is_idle=0
            GROUP BY category
            ORDER BY total_seconds DESC
        """, (start.isoformat(), end.isoformat())).fetchall()
        return [dict(r) for r in rows]

    def get_by_app(self, start: datetime, end: datetime, limit: int = 10) -> List[Dict]:
        """Return the ``limit`` apps with the most non-idle time, largest first."""
        rows = self.conn.execute("""
            SELECT app_name, category,
                   SUM(duration_seconds) as total_seconds,
                   COUNT(*) as sessions
            FROM activities
            WHERE timestamp BETWEEN ? AND ? AND is_idle=0
            GROUP BY app_name
            ORDER BY total_seconds DESC
            LIMIT ?
        """, (start.isoformat(), end.isoformat(), limit)).fetchall()
        return [dict(r) for r in rows]

    def get_hourly_timeline(self, target_date: date) -> List[Dict]:
        """Return non-idle seconds per (hour, category) for one calendar day.

        ``hour`` is the two-digit text produced by ``strftime('%H', ...)``.
        """
        start = datetime.combine(target_date, datetime.min.time())
        end = datetime.combine(target_date, datetime.max.time())
        rows = self.conn.execute("""
            SELECT strftime('%H', timestamp) as hour,
                   category,
                   SUM(duration_seconds) as total_seconds
            FROM activities
            WHERE timestamp BETWEEN ? AND ? AND is_idle=0
            GROUP BY hour, category
            ORDER BY hour
        """, (start.isoformat(), end.isoformat())).fetchall()
        return [dict(r) for r in rows]

    def get_daily_totals(self, start: datetime, end: datetime) -> List[Dict]:
        """Return focus and idle seconds per day in the range, oldest first."""
        rows = self.conn.execute("""
            SELECT date(timestamp) as day,
                   SUM(CASE WHEN is_idle=0 THEN duration_seconds ELSE 0 END) as focus_seconds,
                   SUM(CASE WHEN is_idle=1 THEN duration_seconds ELSE 0 END) as idle_seconds
            FROM activities
            WHERE timestamp BETWEEN ? AND ?
            GROUP BY day
            ORDER BY day
        """, (start.isoformat(), end.isoformat())).fetchall()
        return [dict(r) for r in rows]

    def get_latest_activity(self) -> Optional[Dict]:
        """Return the activity with the greatest timestamp, or ``None``."""
        row = self.conn.execute(
            "SELECT * FROM activities ORDER BY timestamp DESC LIMIT 1"
        ).fetchone()
        return dict(row) if row else None

    # ─── Settings ────────────────────────────────────────────────────────────

    def get_setting(self, key: str, default: Any = None) -> Any:
        """Return the stored text for ``key``, or ``default`` if it is unset."""
        row = self.conn.execute(
            "SELECT value FROM settings WHERE key=?", (key,)
        ).fetchone()
        return row[0] if row else default

    def set_setting(self, key: str, value: Any):
        """Store ``str(value)`` under ``key``, replacing any previous value."""
        self.conn.execute(
            "INSERT OR REPLACE INTO settings (key, value) VALUES (?,?)",
            (key, str(value))
        )
        self.conn.commit()

    def get_all_settings(self) -> Dict[str, str]:
        """Return every setting as a ``{key: value}`` dict of strings."""
        rows = self.conn.execute("SELECT key, value FROM settings").fetchall()
        return {r[0]: r[1] for r in rows}

    # ─── Categories ──────────────────────────────────────────────────────────

    def get_categories(self) -> List[Dict]:
        """Return all categories ordered by name.

        ``keywords`` and ``apps`` are returned as stored, i.e. JSON text.
        """
        rows = self.conn.execute("SELECT * FROM categories ORDER BY name").fetchall()
        return [dict(r) for r in rows]

    def upsert_category(self, name: str, color: str, keywords: list, apps: list):
        """Create the category ``name`` or replace the existing one.

        ``keywords`` and ``apps`` are serialized to JSON text before storage.
        """
        self.conn.execute("""
            INSERT OR REPLACE INTO categories (name, color, keywords, apps)
            VALUES (?,?,?,?)
        """, (name, color, json.dumps(keywords), json.dumps(apps)))
        self.conn.commit()

    def delete_category(self, name: str):
        """Delete the category ``name``; a missing name is a no-op."""
        self.conn.execute("DELETE FROM categories WHERE name=?", (name,))
        self.conn.commit()

    # ─── Maintenance ─────────────────────────────────────────────────────────

    def clear_all_data(self):
        """Delete every activity row. Settings and categories are kept."""
        self.conn.execute("DELETE FROM activities")
        self.conn.commit()
        logger.warning("All activity data cleared")

    def get_db_size_mb(self) -> float:
        """Return the size of the database file in MiB, or 0.0 if it cannot be read."""
        try:
            return self.db_path.stat().st_size / (1024 * 1024)
        except OSError:
            # Best effort: a missing or unreadable file reports as empty.
            return 0.0