File size: 3,781 Bytes
4bb871a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""
intervention/temporal_model.py
==============================
Secure wrapper around :class:`TemporalStressProfile` that enforces
encryption at rest for user stress history.

Workflow
--------
1. **Decrypt** the user's encrypted history into RAM.
2. Reconstruct a ``TemporalStressProfile`` from the decrypted entries.
3. Add the new score and compute velocity / threshold / volatility.
4. **Re-encrypt** the updated history before persisting.

The raw ``(timestamp, score)`` tuples are NEVER stored in plaintext.
"""

from __future__ import annotations

import logging
from typing import Any, Optional

from cryptography.fernet import InvalidToken
from models.temporal_stress_profile import TemporalAnalysis, TemporalStressProfile
from security.auth import decrypt_data, encrypt_data

logger = logging.getLogger(__name__)


class SecureTemporalModel:
    """Encrypt-at-rest wrapper for temporal stress tracking.

    Each call to :meth:`process` decrypts the caller-supplied history blob,
    replays it into a fresh ``TemporalStressProfile``, adds the new score,
    and returns the analysis together with a re-encrypted history blob.
    The instance itself only holds configuration; no history is kept on it.

    Parameters
    ----------
    max_history : int
        Maximum entries to retain per user.
    velocity_window : int
        Window size for stress velocity computation.
    volatility_window : int
        Window size for volatility computation.
    volatility_threshold : float
        σ threshold above which the user is flagged volatile.
    """

    # Single copy of the warning emitted by every decrypt-failure path.
    _DECRYPT_FAILED_MSG = (
        "Failed to decrypt stress history (key may have rotated). "
        "Starting with fresh history."
    )

    def __init__(
        self,
        max_history: int = 50,
        velocity_window: int = 5,
        volatility_window: int = 5,
        volatility_threshold: float = 0.25,
    ) -> None:
        self._max_history = max_history
        self._velocity_window = velocity_window
        self._volatility_window = volatility_window
        self._volatility_threshold = volatility_threshold

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for real ``int``/``float`` values, excluding ``bool``."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @classmethod
    def _sanitize_history(cls, raw: Any) -> list[list[Any]]:
        """Keep only well-formed ``[timestamp, score]`` entries from *raw*.

        An entry is kept when it is a list/tuple with at least two items,
        its score (second item) is a real number, and its timestamp (first
        item) is a real number or ``None``. Items past the second are
        ignored. A payload that is not a list/tuple yields ``[]``.
        """
        if not isinstance(raw, (list, tuple)):
            return []

        clean: list[list[Any]] = []
        for entry in raw:
            if not isinstance(entry, (list, tuple)) or len(entry) < 2:
                continue
            ts, sc = entry[0], entry[1]
            if not cls._is_number(sc):
                continue
            # ``None`` is tolerated because ``process`` itself accepts a
            # ``None`` timestamp and forwards it to ``add_score``.
            if ts is not None and not cls._is_number(ts):
                continue
            clean.append([ts, sc])
        return clean

    def _load_history(self, encrypted_history: Optional[str]) -> list[list[Any]]:
        """Decrypt *encrypted_history* and return its usable entries.

        Never raises for bad input: a missing blob, a decryption failure,
        or a malformed payload all degrade to a (possibly empty) history,
        with a warning logged for the latter two.
        """
        if not encrypted_history:
            return []

        try:
            raw = decrypt_data(encrypted_history)
        except Exception as exc:
            # Deliberately broad: which exceptions ``decrypt_data`` can raise
            # is not visible from this module, and losing history is
            # preferable to failing the whole request. Only the exception
            # type is logged so no payload content reaches the logs.
            logger.debug("decrypt_data raised %s", type(exc).__name__)
            raw = None

        if raw is None:
            logger.warning(self._DECRYPT_FAILED_MSG)
            return []

        history = self._sanitize_history(raw)
        if not isinstance(raw, (list, tuple)):
            logger.warning(
                "Decrypted stress history has unexpected type %s. "
                "Starting with fresh history.",
                type(raw).__name__,
            )
        elif len(history) != len(raw):
            logger.warning(
                "Discarded %d malformed entries from decrypted stress history.",
                len(raw) - len(history),
            )
        return history

    def process(
        self,
        score: float,
        encrypted_history: Optional[str] = None,
        timestamp: Optional[float] = None,
    ) -> tuple[TemporalAnalysis, str]:
        """Process a new score, updating the encrypted history.

        Parameters
        ----------
        score : float
            Stress score in [0, 1].
        encrypted_history : str, optional
            Previously encrypted history blob. ``None`` for first use.
        timestamp : float, optional
            Unix timestamp. Defaults to now.

        Returns
        -------
        tuple[TemporalAnalysis, str]
            ``(analysis, new_encrypted_history)``

        Notes
        -----
        If the blob cannot be decrypted, or decrypts to something that is
        not a list of ``[timestamp, score]`` pairs, the unusable part is
        dropped with a warning and processing continues.
        """
        # 1. Decrypt existing history into RAM (best-effort, validated)
        history = self._load_history(encrypted_history)

        # 2. Rebuild the profile from history
        profile = TemporalStressProfile(
            max_history=self._max_history,
            velocity_window=self._velocity_window,
            volatility_window=self._volatility_window,
            volatility_threshold=self._volatility_threshold,
        )
        for ts, sc in history:
            profile.add_score(sc, timestamp=ts)

        # 3. Add new score
        analysis = profile.add_score(score, timestamp=timestamp)

        # 4. Re-encrypt the updated history
        updated_history = [[ts, sc] for ts, sc in profile.history]
        new_encrypted = encrypt_data(updated_history)

        return analysis, new_encrypted