|
|
""" |
|
|
State Management - Single Source of Truth |
|
|
All state changes go through here |
|
|
""" |
|
|
import json |
|
|
import threading |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Optional, Any, Callable |
|
|
from dataclasses import dataclass, asdict, field |
|
|
|
|
|
from core.config import STATE_FILE, MODELS_DIR |
|
|
from core.logger import logger |
|
|
|
|
|
|
|
|
@dataclass
class InstalledModel:
    """Metadata record for a single installed model.

    The first eight fields are required and must be supplied in declaration
    order when constructing positionally; the last two are optional.
    """

    # --- required fields ---
    id: str
    name: str
    hf_repo: str  # presumably the Hugging Face repository id -- confirm with callers
    filename: str
    model_type: str
    size_bytes: int
    quant: str
    installed_at: str

    # --- optional fields ---
    system_prompt: str = field(default="")
    params_b: float = field(default=0.0)
|
|
|
|
|
|
|
|
@dataclass
class AppState:
    """Container for the complete application state (single source of truth).

    Every field has a default, so ``AppState()`` yields an empty state.
    Container fields use factories so instances never share a list or dict.
    """

    # Models: entries are plain dicts
    installed_models: List[Dict] = field(default_factory=list)
    loaded_model_id: Optional[str] = field(default=None)
    default_model_id: Optional[str] = field(default=None)

    # Sessions: entries are plain dicts
    sessions: List[Dict] = field(default_factory=list)
    active_session_id: Optional[str] = field(default=None)

    # Free-form settings mapping
    settings: Dict = field(default_factory=dict)

    # Version string stored alongside the state
    version: str = field(default="2.0.0")
|
|
|
|
|
|
|
|
class StateManager: |
|
|
""" |
|
|
Manages all application state with: |
|
|
- File persistence |
|
|
- Thread safety |
|
|
- Change notifications |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
self._state: AppState = AppState() |
|
|
self._lock = threading.RLock() |
|
|
self._subscribers: List[Callable] = [] |
|
|
self._load_state() |
|
|
|
|
|
def _load_state(self): |
|
|
"""Load state from file""" |
|
|
with self._lock: |
|
|
if STATE_FILE.exists(): |
|
|
try: |
|
|
data = json.loads(STATE_FILE.read_text()) |
|
|
self._state = AppState(**data) |
|
|
logger.info("State", "State loaded from file") |
|
|
except Exception as e: |
|
|
logger.error("State", f"Failed to load state: {e}") |
|
|
self._state = AppState() |
|
|
else: |
|
|
self._state = AppState() |
|
|
self._save_state() |
|
|
|
|
|
def _save_state(self): |
|
|
"""Save state to file""" |
|
|
try: |
|
|
STATE_FILE.write_text(json.dumps(asdict(self._state), indent=2)) |
|
|
except Exception as e: |
|
|
logger.error("State", f"Failed to save state: {e}") |
|
|
|
|
|
def _notify_subscribers(self): |
|
|
"""Notify all subscribers of state change""" |
|
|
for callback in self._subscribers: |
|
|
try: |
|
|
callback(self._state) |
|
|
except Exception as e: |
|
|
logger.error("State", f"Subscriber error: {e}") |
|
|
|
|
|
def subscribe(self, callback: Callable): |
|
|
"""Subscribe to state changes""" |
|
|
self._subscribers.append(callback) |
|
|
|
|
|
def unsubscribe(self, callback: Callable): |
|
|
"""Unsubscribe from state changes""" |
|
|
if callback in self._subscribers: |
|
|
self._subscribers.remove(callback) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_installed_models(self) -> List[Dict]: |
|
|
"""Get all installed models""" |
|
|
with self._lock: |
|
|
return self._state.installed_models.copy() |
|
|
|
|
|
def get_model_by_id(self, model_id: str) -> Optional[Dict]: |
|
|
"""Get specific model by ID""" |
|
|
with self._lock: |
|
|
for m in self._state.installed_models: |
|
|
if m["id"] == model_id: |
|
|
return m.copy() |
|
|
return None |
|
|
|
|
|
def is_model_installed(self, hf_repo: str, filename: str) -> bool: |
|
|
"""Check if model is already installed (duplicate check)""" |
|
|
with self._lock: |
|
|
for m in self._state.installed_models: |
|
|
if m["hf_repo"] == hf_repo and m["filename"] == filename: |
|
|
return True |
|
|
return False |
|
|
|
|
|
def add_model(self, model: InstalledModel) -> bool: |
|
|
"""Add a new installed model""" |
|
|
with self._lock: |
|
|
|
|
|
if self.is_model_installed(model.hf_repo, model.filename): |
|
|
logger.warn("State", f"Model already installed: {model.filename}") |
|
|
return False |
|
|
|
|
|
self._state.installed_models.append(asdict(model)) |
|
|
|
|
|
|
|
|
if len(self._state.installed_models) == 1: |
|
|
self._state.default_model_id = model.id |
|
|
|
|
|
self._save_state() |
|
|
self._notify_subscribers() |
|
|
logger.event("State", f"Model added: {model.name}") |
|
|
return True |
|
|
|
|
|
def remove_model(self, model_id: str) -> bool: |
|
|
"""Remove an installed model""" |
|
|
with self._lock: |
|
|
for i, m in enumerate(self._state.installed_models): |
|
|
if m["id"] == model_id: |
|
|
|
|
|
if self._state.loaded_model_id == model_id: |
|
|
self._state.loaded_model_id = None |
|
|
|
|
|
|
|
|
removed = self._state.installed_models.pop(i) |
|
|
|
|
|
|
|
|
if self._state.default_model_id == model_id: |
|
|
self._state.default_model_id = ( |
|
|
self._state.installed_models[0]["id"] |
|
|
if self._state.installed_models else None |
|
|
) |
|
|
|
|
|
self._save_state() |
|
|
self._notify_subscribers() |
|
|
logger.event("State", f"Model removed: {removed['name']}") |
|
|
return True |
|
|
return False |
|
|
|
|
|
def update_model(self, model_id: str, updates: Dict) -> bool: |
|
|
"""Update model properties""" |
|
|
with self._lock: |
|
|
for m in self._state.installed_models: |
|
|
if m["id"] == model_id: |
|
|
m.update(updates) |
|
|
self._save_state() |
|
|
self._notify_subscribers() |
|
|
return True |
|
|
return False |
|
|
|
|
|
def set_loaded_model(self, model_id: Optional[str]): |
|
|
"""Set the currently loaded model""" |
|
|
with self._lock: |
|
|
self._state.loaded_model_id = model_id |
|
|
self._save_state() |
|
|
self._notify_subscribers() |
|
|
logger.event("State", f"Model loaded: {model_id}") |
|
|
|
|
|
def get_loaded_model_id(self) -> Optional[str]: |
|
|
"""Get currently loaded model ID""" |
|
|
with self._lock: |
|
|
return self._state.loaded_model_id |
|
|
|
|
|
def set_default_model(self, model_id: str): |
|
|
"""Set default model""" |
|
|
with self._lock: |
|
|
self._state.default_model_id = model_id |
|
|
self._save_state() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_sessions(self) -> List[Dict]: |
|
|
"""Get all sessions""" |
|
|
with self._lock: |
|
|
return self._state.sessions.copy() |
|
|
|
|
|
def add_session(self, session: Dict) -> str: |
|
|
"""Add new session, return session ID""" |
|
|
with self._lock: |
|
|
self._state.sessions.insert(0, session) |
|
|
self._state.active_session_id = session["id"] |
|
|
self._save_state() |
|
|
self._notify_subscribers() |
|
|
return session["id"] |
|
|
|
|
|
def update_session(self, session_id: str, updates: Dict): |
|
|
"""Update session""" |
|
|
with self._lock: |
|
|
for s in self._state.sessions: |
|
|
if s["id"] == session_id: |
|
|
s.update(updates) |
|
|
s["updated_at"] = datetime.now().isoformat() |
|
|
self._save_state() |
|
|
return True |
|
|
return False |
|
|
|
|
|
def delete_session(self, session_id: str) -> bool: |
|
|
"""Delete session""" |
|
|
with self._lock: |
|
|
for i, s in enumerate(self._state.sessions): |
|
|
if s["id"] == session_id: |
|
|
self._state.sessions.pop(i) |
|
|
if self._state.active_session_id == session_id: |
|
|
self._state.active_session_id = ( |
|
|
self._state.sessions[0]["id"] |
|
|
if self._state.sessions else None |
|
|
) |
|
|
self._save_state() |
|
|
self._notify_subscribers() |
|
|
return True |
|
|
return False |
|
|
|
|
|
def set_active_session(self, session_id: str): |
|
|
"""Set active session""" |
|
|
with self._lock: |
|
|
self._state.active_session_id = session_id |
|
|
self._save_state() |
|
|
|
|
|
def get_active_session_id(self) -> Optional[str]: |
|
|
"""Get active session ID""" |
|
|
with self._lock: |
|
|
return self._state.active_session_id |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_settings(self) -> Dict: |
|
|
"""Get all settings""" |
|
|
with self._lock: |
|
|
return self._state.settings.copy() |
|
|
|
|
|
def update_settings(self, updates: Dict): |
|
|
"""Update settings""" |
|
|
with self._lock: |
|
|
self._state.settings.update(updates) |
|
|
self._save_state() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_full_state(self) -> Dict: |
|
|
"""Get complete state (for debugging)""" |
|
|
with self._lock: |
|
|
return asdict(self._state) |
|
|
|
|
|
def sync_with_filesystem(self): |
|
|
""" |
|
|
Sync state with actual files on disk. |
|
|
Call this on startup to handle manual file changes. |
|
|
""" |
|
|
with self._lock: |
|
|
|
|
|
valid_models = [] |
|
|
for m in self._state.installed_models: |
|
|
model_path = MODELS_DIR / m["filename"] |
|
|
if model_path.exists(): |
|
|
valid_models.append(m) |
|
|
else: |
|
|
logger.warn("State", f"Model file missing, removing: {m['filename']}") |
|
|
|
|
|
if len(valid_models) != len(self._state.installed_models): |
|
|
self._state.installed_models = valid_models |
|
|
self._save_state() |
|
|
logger.info("State", "State synced with filesystem") |
|
|
|
|
|
|
|
|
|
|
|
# Module-wide singleton, created on the first call to get_state().
_state_manager: Optional[StateManager] = None

# Serializes first-time construction so concurrent callers share one manager.
_state_manager_lock = threading.Lock()


def get_state() -> StateManager:
    """Return the singleton state manager, creating it on first use.

    Creation happens under a lock and the instance is re-checked inside it,
    so two threads calling this at the same time cannot each build their own
    ``StateManager``. Once the instance exists, calls return it without
    taking the lock.
    """
    global _state_manager
    if _state_manager is None:
        with _state_manager_lock:
            # Another thread may have created it while we waited.
            if _state_manager is None:
                _state_manager = StateManager()
    return _state_manager
|
|
|