Spaces:
Sleeping
Sleeping
| """ | |
| FocusTrack - Activity Tracker Engine | |
| Detects active window, tracks idle time, logs heartbeats. | |
| Cross-platform: Windows / macOS / Linux | |
| """ | |
| import time | |
| import json | |
| import logging | |
| import platform | |
| import threading | |
| from datetime import datetime | |
| from typing import Optional, Tuple | |
| from pathlib import Path | |
# Module-level logger, registered under the "focustrack.tracker" name.
logger = logging.getLogger("focustrack.tracker")
# Host OS name, read once at import time; get_active_window() branches on it.
SYSTEM = platform.system() # 'Windows', 'Darwin', 'Linux'
# ─── Window Detection ────────────────────────────────────────────────────────
def get_active_window() -> Tuple[str, str]:
    """
    Report the focused window as an (app_name, window_title) pair.

    The backend is chosen from SYSTEM: "Windows" and "Darwin" have dedicated
    helpers and every other value is routed to the Linux helper. Any
    exception raised while detecting is logged at debug level and replaced
    by ("unknown", "unknown").
    """
    fallback = ("unknown", "unknown")
    try:
        # Dispatch table instead of an if/elif chain; Linux is the default.
        detectors = {
            "Windows": _get_window_windows,
            "Darwin": _get_window_macos,
        }
        detect = detectors.get(SYSTEM, _get_window_linux)
        return detect()
    except Exception as exc:
        logger.debug(f"Window detection error: {exc}")
    return fallback
def _get_window_windows() -> Tuple[str, str]:
    """
    Return (app_name, window_title) for the foreground window on Windows.

    The title is read through user32 (GetForegroundWindow / GetWindowTextW).
    The app name is the owning process name as reported by psutil, with a
    trailing ".exe" removed. Either element is "unknown" when it cannot be
    determined.
    """
    import ctypes
    import ctypes.wintypes

    user32 = ctypes.windll.user32
    hwnd = user32.GetForegroundWindow()
    if not hwnd:
        # GetForegroundWindow may return NULL (for example while a window is
        # losing activation). Without this guard the PID stays 0 and the
        # idle process would be reported as the active application.
        return ("unknown", "unknown")

    length = user32.GetWindowTextLengthW(hwnd)
    buf = ctypes.create_unicode_buffer(length + 1)  # +1 for the terminator
    user32.GetWindowTextW(hwnd, buf, length + 1)
    title = buf.value or "unknown"

    # Resolve the owning process so the executable name can be reported.
    pid = ctypes.wintypes.DWORD()
    user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
    try:
        import psutil

        app = psutil.Process(pid.value).name()
        # Strip only a trailing extension, case-insensitively, so names that
        # merely contain ".exe" (or end in ".EXE") are handled correctly.
        if app.lower().endswith(".exe"):
            app = app[:-4]
        app = app or "unknown"
    except Exception:
        # psutil missing, process already gone, or access denied.
        app = "unknown"
    return (app, title)
def _get_window_macos() -> Tuple[str, str]:
    """
    Return (app_name, window_title) for the frontmost window on macOS.

    Preferred path: AppKit's NSWorkspace supplies the application name and
    an osascript call to System Events supplies the window title. If AppKit
    cannot be imported, a single osascript call supplies both, joined by "|".

    Every element that cannot be determined (including an empty string
    reported by osascript) is returned as "unknown", so all paths agree.
    """
    import subprocess

    try:
        from AppKit import NSWorkspace

        ws = NSWorkspace.sharedWorkspace()
        app = ws.activeApplication()
        app_name = (app.get("NSApplicationName", "unknown") if app else "unknown") or "unknown"
        title = "unknown"
        try:
            script = 'tell application "System Events" to get name of first window of (first process whose frontmost is true)'
            result = subprocess.run(
                ["osascript", "-e", script], capture_output=True, text=True, timeout=2
            )
            if result.returncode == 0:
                title = result.stdout.strip() or "unknown"
        except Exception:
            # Best effort: the title lookup may time out or osascript may be
            # missing; the app name is still worth returning.
            pass
        return (app_name, title)
    except ImportError:
        # Fallback via subprocess when AppKit (pyobjc) is unavailable.
        try:
            script = """
            tell application "System Events"
                set frontApp to name of first application process whose frontmost is true
                set frontTitle to ""
                try
                    set frontTitle to name of front window of (first process whose frontmost is true)
                end try
                return frontApp & "|" & frontTitle
            end tell
            """
            result = subprocess.run(
                ["osascript", "-e", script], capture_output=True, text=True, timeout=3
            )
            if result.returncode == 0:
                # Split on the first "|" only; the title may contain more.
                front_app, _, front_title = result.stdout.strip().partition("|")
                return (front_app or "unknown", front_title or "unknown")
        except Exception:
            # osascript missing or timed out: fall through to the default.
            pass
    return ("unknown", "unknown")
def _get_window_linux() -> Tuple[str, str]:
    """
    Return (app_name, window_title) for the active window on Linux via xdotool.

    The window id comes from `xdotool getactivewindow`, the title from
    `getwindowname`, and the app name from psutil using the pid reported by
    `getwindowpid`. Returns ("unknown", "unknown") when xdotool is missing,
    times out, or reports no active window. Individual elements fall back
    to "unknown" when only part of the lookup succeeds.
    """
    import subprocess

    try:
        wid = subprocess.run(
            ["xdotool", "getactivewindow"], capture_output=True, text=True, timeout=2
        )
        if wid.returncode == 0:
            wid_val = wid.stdout.strip()
            name = subprocess.run(
                ["xdotool", "getwindowname", wid_val],
                capture_output=True, text=True, timeout=2
            )
            pid_result = subprocess.run(
                ["xdotool", "getwindowpid", wid_val],
                capture_output=True, text=True, timeout=2
            )
            title = (name.stdout.strip() if name.returncode == 0 else "") or "unknown"
            app = "unknown"
            if pid_result.returncode == 0:
                try:
                    import psutil

                    proc = psutil.Process(int(pid_result.stdout.strip()))
                    app = proc.name() or "unknown"
                except Exception:
                    # psutil missing, pid not numeric, or process gone.
                    pass
            return (app, title)
    except (OSError, subprocess.TimeoutExpired):
        # xdotool is not installed / not executable, or it hung.
        pass

    # The earlier `wmctrl -a :ACTIVE:` "fallback" was removed: `-a` raises and
    # focuses a window instead of querying it, it ran without a timeout, and
    # its result was never used.
    return ("unknown", "unknown")
# ─── Categorizer ─────────────────────────────────────────────────────────────
class Categorizer:
    """
    Rule-based app categorization using DB-defined rules.

    Rules are read from ``db.get_categories()``. Each row provides a "name"
    plus "keywords" and "apps" columns holding JSON lists of match terms.
    Rules are re-read at most once per ``_reload_interval`` seconds.
    """

    def __init__(self, db):
        self.db = db
        self._cache: dict = {}  # kept for compatibility; not read in this class
        self._reload_interval = 60  # seconds
        self._last_reload = 0.0
        self._rules: list = []
        self._load_rules()

    @staticmethod
    def _parse_terms(raw) -> list:
        """
        Decode one rule column into a list of lowercase, non-empty terms.

        Accepts a JSON-encoded list (or an already-decoded list/tuple).
        Malformed JSON, non-list payloads and non-string entries are ignored
        rather than raised, so one bad row cannot disable categorization.
        Empty strings are dropped because ``"" in text`` is always true and
        would make the rule match every window.
        """
        if isinstance(raw, (list, tuple)):
            terms = raw
        else:
            try:
                terms = json.loads(raw or "[]")
            except (TypeError, ValueError):
                return []
        if not isinstance(terms, (list, tuple)):
            return []
        return [term.lower() for term in terms if isinstance(term, str) and term]

    def _load_rules(self):
        """Fetch all categories from the DB and cache their normalized terms."""
        self._rules = [
            {
                "name": cat["name"],
                "keywords": self._parse_terms(cat["keywords"]),
                "apps": self._parse_terms(cat["apps"]),
            }
            for cat in self.db.get_categories()
        ]
        self._last_reload = time.time()

    def categorize(self, app_name: str, window_title: str, is_idle: bool) -> str:
        """
        Return the category name for the given window.

        Returns "idle" when ``is_idle`` is true. Otherwise returns the name
        of the first rule (in DB order) whose app term is a substring of the
        app name, or whose keyword is a substring of the title or app name.
        Matching is case-insensitive. Returns "uncategorized" when no rule
        matches. A rule named "idle" is never matched by text.
        """
        if is_idle:
            return "idle"

        # Reload rules periodically so DB edits are picked up.
        if time.time() - self._last_reload > self._reload_interval:
            self._load_rules()

        app_lower = (app_name or "").lower()
        title_lower = (window_title or "").lower()

        for rule in self._rules:
            if rule["name"] == "idle":
                continue
            # App terms take precedence over keywords within a rule.
            if any(term in app_lower for term in rule["apps"]):
                return rule["name"]
            if any(kw in title_lower or kw in app_lower for kw in rule["keywords"]):
                return rule["name"]
        return "uncategorized"
# ─── Idle Detector ───────────────────────────────────────────────────────────
class IdleDetector:
    """
    Tracks mouse + keyboard activity to detect idle state.

    Input events are observed through pynput listeners created in start().
    The user counts as idle once no event has been seen for more than
    ``threshold`` seconds.
    """

    def __init__(self, threshold_seconds: int = 300):
        self.threshold = threshold_seconds
        self._last_event = time.time()
        self._listener = None
        # Initialized here so stop() is safe even if start() never ran or failed.
        self._mouse_listener = None
        self._keyboard_listener = None
        # Set to False when the listeners cannot be started; see is_idle().
        self._available = True
        self._running = False

    def start(self):
        """Start the pynput mouse and keyboard listeners (best effort)."""
        self._running = True
        try:
            from pynput import mouse, keyboard

            def on_activity(*args, **kwargs):
                # Any input event resets the idle clock.
                self._last_event = time.time()

            self._mouse_listener = mouse.Listener(
                on_move=on_activity, on_click=on_activity, on_scroll=on_activity
            )
            self._keyboard_listener = keyboard.Listener(on_press=on_activity)
            self._mouse_listener.start()
            self._keyboard_listener.start()
            self._available = True
            logger.info("Idle detector started (pynput)")
        except Exception as e:
            # Without listeners _last_event would never advance and the user
            # would look permanently idle; mark detection as disabled instead.
            self._available = False
            logger.warning(f"pynput unavailable ({e}), idle detection disabled")

    def stop(self):
        """Stop whichever listeners were started; errors are logged, not raised."""
        self._running = False
        for listener in (self._mouse_listener, self._keyboard_listener):
            if listener is None:
                continue
            try:
                listener.stop()
            except Exception as exc:
                logger.debug("Failed to stop input listener: %s", exc)
        self._mouse_listener = None
        self._keyboard_listener = None

    def is_idle(self) -> bool:
        """Return True when no input was seen for more than ``threshold`` seconds.

        Always False when the listeners could not be started, since there is
        then no signal to base the decision on.
        """
        if not self._available:
            return False
        return (time.time() - self._last_event) > self.threshold

    def idle_seconds(self) -> float:
        """Return the seconds spent idle beyond the threshold, or 0.0 if not idle."""
        if not self.is_idle():
            return 0.0
        return max(0.0, time.time() - self._last_event - self.threshold)

    def update_threshold(self, seconds: int):
        """Replace the idle threshold (in seconds)."""
        self.threshold = seconds
# ─── Main Tracker ────────────────────────────────────────────────────────────
class ActivityTracker:
    """
    Background activity tracking service.

    Every ``heartbeat_interval`` seconds the tracker samples the active
    window, asks the idle detector and categorizer for their verdicts, and
    records one heartbeat through ``db.log_activity``.
    """

    def __init__(self, db):
        self.db = db
        self._running = False
        self._paused = False
        self._lock = threading.Lock()  # guards the "current session state" fields
        self._ignored_apps: list = []

        idle_threshold = self._int_setting("idle_threshold_seconds", 300, minimum=1)
        self.heartbeat_interval = self._int_setting("heartbeat_interval", 5, minimum=1)

        self.idle_detector = IdleDetector(idle_threshold)
        self.categorizer = Categorizer(db)

        # Current session state
        self.current_app = ""
        self.current_title = ""
        self.current_category = ""
        self.session_start = datetime.now()
        self.is_idle = False

    # ─── Settings helpers ─────────────────────────────────────────────────────

    def _int_setting(self, key: str, default: int, minimum: int = 0) -> int:
        """Read an integer setting, falling back to *default* on bad values.

        The result is clamped to *minimum* so that, for example, a zero or
        negative heartbeat interval can never reach time.sleep().
        """
        raw = self.db.get_setting(key, default)
        try:
            value = int(raw)
        except (TypeError, ValueError):
            logger.warning("Invalid value %r for setting %s; using %s", raw, key, default)
            value = default
        return max(minimum, value)

    def _load_ignored_apps(self) -> list:
        """Read the "ignored_apps" setting as a list of lowercase app names.

        Unparseable settings yield an empty list. Non-string and empty
        entries are dropped (an empty string would match every app).
        """
        raw = self.db.get_setting("ignored_apps", "[]")
        try:
            parsed = json.loads(raw)
        except Exception:
            return []
        if not isinstance(parsed, list):
            return []
        return [a.lower() for a in parsed if isinstance(a, str) and a]

    # ─── Loop ─────────────────────────────────────────────────────────────────

    def run(self):
        """Main tracking loop. Runs in a background thread."""
        self._running = True
        self.idle_detector.start()
        logger.info(f"Tracker started (heartbeat: {self.heartbeat_interval}s)")

        self._ignored_apps = self._load_ignored_apps()

        while self._running:
            try:
                if not self._paused:
                    # Read the attribute each pass so reload_settings() takes effect.
                    self._tick(self._ignored_apps)
            except Exception as e:
                # One failing heartbeat must not kill the tracking thread.
                logger.error(f"Tracker error: {e}", exc_info=True)
            time.sleep(self.heartbeat_interval)

    def _tick(self, ignored_apps: list):
        """One heartbeat: detect window, log activity."""
        app_name, window_title = get_active_window()
        is_idle = self.idle_detector.is_idle()  # call it: a bound method is always truthy

        # Skip ignored apps
        app_lower = app_name.lower()
        if any(ig in app_lower for ig in ignored_apps):
            return

        category = self.categorizer.categorize(app_name, window_title, is_idle)
        now = datetime.now()

        with self._lock:
            # Compare against the previous app BEFORE overwriting it,
            # otherwise an app switch can never be detected.
            app_changed = app_name != self.current_app
            # NOTE(review): the 30-second rule is kept from the original; it
            # restarts the session whenever session_start is over 30s old.
            # Confirm that this is the intended session definition.
            if app_changed or abs((now - self.session_start).total_seconds()) > 30:
                self.session_start = now

            self.current_app = app_name
            self.current_title = window_title
            self.current_category = category
            self.is_idle = is_idle

        self.db.log_activity(
            timestamp=now,
            app_name=app_name,
            window_title=window_title,
            duration_seconds=self.heartbeat_interval,
            category=category,
            is_idle=is_idle,
        )

    # ─── Controls ─────────────────────────────────────────────────────────────

    def pause(self):
        """Suspend heartbeats; the loop keeps running but skips _tick()."""
        self._paused = True
        self.db.set_setting("tracker_running", "paused")
        logger.info("Tracker paused")

    def resume(self):
        """Resume heartbeats after pause()."""
        self._paused = False
        self.db.set_setting("tracker_running", "true")
        logger.info("Tracker resumed")

    def stop(self):
        """Ask the loop to exit and stop the idle detector."""
        self._running = False
        self.idle_detector.stop()
        self.db.set_setting("tracker_running", "false")
        logger.info("Tracker stopped")

    def status(self) -> str:
        """Return "stopped", "paused" or "running"."""
        if not self._running:
            return "stopped"
        if self._paused:
            return "paused"
        return "running"

    def get_current_state(self) -> dict:
        """Return a snapshot of the current session state as plain values."""
        with self._lock:
            return {
                "app": self.current_app,
                "title": self.current_title,
                "category": self.current_category,
                "is_idle": self.is_idle,
                "status": self.status(),  # the string, not the bound method
                "session_start": self.session_start.isoformat(),
            }

    def reload_settings(self):
        """Reload settings from DB (called after settings change)."""
        idle_threshold = self._int_setting("idle_threshold_seconds", 300, minimum=1)
        self.heartbeat_interval = self._int_setting("heartbeat_interval", 5, minimum=1)
        self.idle_detector.update_threshold(idle_threshold)
        self._ignored_apps = self._load_ignored_apps()
        logger.info("Tracker settings reloaded")