""" intervention/temporal_model.py ============================== Secure wrapper around :class:`TemporalStressProfile` that enforces encryption at rest for user stress history. Workflow -------- 1. **Decrypt** the user's encrypted history into RAM. 2. Reconstruct a ``TemporalStressProfile`` from the decrypted entries. 3. Add the new score and compute velocity / threshold / volatility. 4. **Re-encrypt** the updated history before persisting. The raw ``(timestamp, score)`` tuples are NEVER stored in plaintext. """ from __future__ import annotations import logging from typing import Any, Optional from cryptography.fernet import InvalidToken from models.temporal_stress_profile import TemporalAnalysis, TemporalStressProfile from security.auth import decrypt_data, encrypt_data logger = logging.getLogger(__name__) class SecureTemporalModel: """Encrypt-at-rest wrapper for temporal stress tracking. Parameters ---------- max_history : int Maximum entries to retain per user. velocity_window : int Window size for stress velocity computation. volatility_window : int Window size for volatility computation. volatility_threshold : float σ threshold above which the user is flagged volatile. """ def __init__( self, max_history: int = 50, velocity_window: int = 5, volatility_window: int = 5, volatility_threshold: float = 0.25, ) -> None: self._max_history = max_history self._velocity_window = velocity_window self._volatility_window = volatility_window self._volatility_threshold = volatility_threshold def process( self, score: float, encrypted_history: Optional[str] = None, timestamp: Optional[float] = None, ) -> tuple[TemporalAnalysis, str]: """Process a new score, updating the encrypted history. Parameters ---------- score : float Stress score in [0, 1]. encrypted_history : str, optional Previously encrypted history blob. ``None`` for first use. timestamp : float, optional Unix timestamp. Defaults to now. Returns ------- tuple[TemporalAnalysis, str] ``(analysis, new_encrypted_history)`` """ # 1. Decrypt existing history into RAM history: list[list[float]] = [] if encrypted_history: try: result = decrypt_data(encrypted_history) if result is None: logger.warning( "Failed to decrypt stress history (key may have rotated). " "Starting with fresh history." ) else: history = result except Exception: logger.warning( "Failed to decrypt stress history (key may have rotated). " "Starting with fresh history." ) history = [] # 2. Rebuild the profile from history profile = TemporalStressProfile( max_history=self._max_history, velocity_window=self._velocity_window, volatility_window=self._volatility_window, volatility_threshold=self._volatility_threshold, ) for entry in history: ts, sc = entry[0], entry[1] profile.add_score(sc, timestamp=ts) # 3. Add new score analysis = profile.add_score(score, timestamp=timestamp) # 4. Re-encrypt the updated history updated_history = [[ts, sc] for ts, sc in profile.history] new_encrypted = encrypt_data(updated_history) return analysis, new_encrypted