""" FocusTrack - Activity Tracker Engine Detects active window, tracks idle time, logs heartbeats. Cross-platform: Windows / macOS / Linux """ import time import json import logging import platform import threading from datetime import datetime from typing import Optional, Tuple from pathlib import Path logger = logging.getLogger("focustrack.tracker") SYSTEM = platform.system() # 'Windows', 'Darwin', 'Linux' # ─── Window Detection ──────────────────────────────────────────────────────── def get_active_window() -> Tuple[str, str]: """ Returns (app_name, window_title) for the currently focused window. Cross-platform with graceful fallbacks. """ try: if SYSTEM == "Windows": return _get_window_windows() elif SYSTEM == "Darwin": return _get_window_macos() else: return _get_window_linux() except Exception as e: logger.debug(f"Window detection error: {e}") return ("unknown", "unknown") def _get_window_windows() -> Tuple[str, str]: import ctypes import ctypes.wintypes user32 = ctypes.windll.user32 hwnd = user32.GetForegroundWindow() length = user32.GetWindowTextLengthW(hwnd) buf = ctypes.create_unicode_buffer(length + 1) user32.GetWindowTextW(hwnd, buf, length + 1) title = buf.value or "unknown" # Get process name pid = ctypes.wintypes.DWORD() user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) try: import psutil proc = psutil.Process(pid.value) app = proc.name().replace(".exe", "") except Exception: app = "unknown" return (app, title) def _get_window_macos() -> Tuple[str, str]: try: from AppKit import NSWorkspace ws = NSWorkspace.sharedWorkspace() app = ws.activeApplication() app_name = app.get("NSApplicationName", "unknown") if app else "unknown" title = "unknown" try: import subprocess script = 'tell application "System Events" to get name of first window of (first process whose frontmost is true)' result = subprocess.run( ["osascript", "-e", script], capture_output=True, text=True, timeout=2 ) if result.returncode == 0: title = result.stdout.strip() except Exception: pass return (app_name, title) except ImportError: # Fallback via subprocess try: import subprocess script = """ tell application "System Events" set frontApp to name of first application process whose frontmost is true set frontTitle to "" try set frontTitle to name of front window of (first process whose frontmost is true) end try return frontApp & "|" & frontTitle end tell """ result = subprocess.run( ["osascript", "-e", script], capture_output=True, text=True, timeout=3 ) if result.returncode == 0: parts = result.stdout.strip().split("|", 1) return (parts[0], parts[1] if len(parts) > 1 else "") except Exception: pass return ("unknown", "unknown") def _get_window_linux() -> Tuple[str, str]: try: import subprocess # Try xdotool wid = subprocess.run( ["xdotool", "getactivewindow"], capture_output=True, text=True, timeout=2 ) if wid.returncode == 0: wid_val = wid.stdout.strip() name = subprocess.run( ["xdotool", "getwindowname", wid_val], capture_output=True, text=True, timeout=2 ) pid_result = subprocess.run( ["xdotool", "getwindowpid", wid_val], capture_output=True, text=True, timeout=2 ) title = name.stdout.strip() if name.returncode == 0 else "unknown" app = "unknown" if pid_result.returncode == 0: try: import psutil proc = psutil.Process(int(pid_result.stdout.strip())) app = proc.name() except Exception: pass return (app, title) except FileNotFoundError: pass try: # Fallback: wmctrl import subprocess result = subprocess.run( ["wmctrl", "-a", ":ACTIVE:"], capture_output=True, text=True ) except Exception: pass return ("unknown", "unknown") # ─── Categorizer ───────────────────────────────────────────────────────────── class Categorizer: """Rule-based app categorization using DB-defined rules.""" def __init__(self, db): self.db = db self._cache: dict = {} self._reload_interval = 60 # seconds self._last_reload = 0.0 self._load_rules() def _load_rules(self): cats = self.db.get_categories() self._rules = [] for cat in cats: self._rules.append({ "name": cat["name"], "keywords": json.loads(cat["keywords"] or "[]"), "apps": json.loads(cat["apps"] or "[]"), }) self._last_reload = time.time() def categorize(self, app_name: str, window_title: str, is_idle: bool) -> str: if is_idle: return "idle" # Reload rules periodically if time.time() - self._last_reload > self._reload_interval: self._load_rules() app_lower = app_name.lower() title_lower = window_title.lower() for rule in self._rules: if rule["name"] == "idle": continue for app_kw in rule["apps"]: if app_kw.lower() in app_lower: return rule["name"] for kw in rule["keywords"]: if kw.lower() in title_lower or kw.lower() in app_lower: return rule["name"] return "uncategorized" # ─── Idle Detector ─────────────────────────────────────────────────────────── class IdleDetector: """Tracks mouse + keyboard activity to detect idle state.""" def __init__(self, threshold_seconds: int = 300): self.threshold = threshold_seconds self._last_event = time.time() self._listener = None self._running = False def start(self): self._running = True try: from pynput import mouse, keyboard def on_activity(*args, **kwargs): self._last_event = time.time() self._mouse_listener = mouse.Listener( on_move=on_activity, on_click=on_activity, on_scroll=on_activity ) self._keyboard_listener = keyboard.Listener(on_press=on_activity) self._mouse_listener.start() self._keyboard_listener.start() logger.info("Idle detector started (pynput)") except Exception as e: logger.warning(f"pynput unavailable ({e}), idle detection disabled") def stop(self): self._running = False try: if self._mouse_listener: self._mouse_listener.stop() if self._keyboard_listener: self._keyboard_listener.stop() except Exception: pass @property def is_idle(self) -> bool: return (time.time() - self._last_event) > self.threshold @property def idle_seconds(self) -> float: elapsed = time.time() - self._last_event return max(0.0, elapsed - self.threshold) if self.is_idle else 0.0 def update_threshold(self, seconds: int): self.threshold = seconds # ─── Main Tracker ──────────────────────────────────────────────────────────── class ActivityTracker: """ Background activity tracking service. Logs heartbeats every N seconds to SQLite. """ def __init__(self, db): self.db = db self._running = False self._paused = False self._lock = threading.Lock() idle_threshold = int(db.get_setting("idle_threshold_seconds", 300)) self.heartbeat_interval = int(db.get_setting("heartbeat_interval", 5)) self.idle_detector = IdleDetector(idle_threshold) self.categorizer = Categorizer(db) # Current session state self.current_app = "" self.current_title = "" self.current_category = "" self.session_start = datetime.now() self.is_idle = False def run(self): """Main tracking loop. Runs in a background thread.""" self._running = True self.idle_detector.start() logger.info(f"Tracker started (heartbeat: {self.heartbeat_interval}s)") ignored_raw = self.db.get_setting("ignored_apps", "[]") try: ignored_apps = [a.lower() for a in json.loads(ignored_raw)] except Exception: ignored_apps = [] while self._running: try: if not self._paused: self._tick(ignored_apps) time.sleep(self.heartbeat_interval) except Exception as e: logger.error(f"Tracker error: {e}", exc_info=True) time.sleep(self.heartbeat_interval) def _tick(self, ignored_apps: list): """One heartbeat: detect window, log activity.""" app_name, window_title = get_active_window() is_idle = self.idle_detector.is_idle # Skip ignored apps if any(ig in app_name.lower() for ig in ignored_apps): return category = self.categorizer.categorize(app_name, window_title, is_idle) with self._lock: self.current_app = app_name self.current_title = window_title self.current_category = category self.is_idle = is_idle # Check if session changed if (app_name != self.current_app or abs((datetime.now() - self.session_start).total_seconds()) > 30): self.session_start = datetime.now() self.db.log_activity( timestamp=datetime.now(), app_name=app_name, window_title=window_title, duration_seconds=self.heartbeat_interval, category=category, is_idle=is_idle, ) # ─── Controls ──────────────────────────────────────────────────────────── def pause(self): self._paused = True self.db.set_setting("tracker_running", "paused") logger.info("Tracker paused") def resume(self): self._paused = False self.db.set_setting("tracker_running", "true") logger.info("Tracker resumed") def stop(self): self._running = False self.idle_detector.stop() self.db.set_setting("tracker_running", "false") logger.info("Tracker stopped") @property def status(self) -> str: if not self._running: return "stopped" if self._paused: return "paused" return "running" def get_current_state(self) -> dict: with self._lock: return { "app": self.current_app, "title": self.current_title, "category": self.current_category, "is_idle": self.is_idle, "status": self.status, "session_start": self.session_start.isoformat(), } def reload_settings(self): """Reload settings from DB (called after settings change).""" idle_threshold = int(self.db.get_setting("idle_threshold_seconds", 300)) self.heartbeat_interval = int(self.db.get_setting("heartbeat_interval", 5)) self.idle_detector.update_threshold(idle_threshold) logger.info("Tracker settings reloaded")