File size: 10,664 Bytes
ed1b365
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
"""Codette Guardian — Input Safety, Ethical Checks, Trust Calibration



Three-layer protection:

1. InputSanitizer: Catches injection, XSS, encoded attacks

2. EthicalAnchor: Tracks ethical regret and learning over time

3. TrustCalibrator: Dynamic trust scores for adapter/agent outputs



Origin: input_sanitizer.py + validate_ethics.py + trust_logic.py +

        Codette_Deep_Simulation_v1.py (EthicalAnchor), rebuilt

"""

import re
import math
import time
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


# ================================================================
# Layer 1: Input Sanitization
# ================================================================
class InputSanitizer:
    """Detect and neutralize injection patterns in user input.

    Two compiled pattern sets are checked: classic injection/XSS payloads
    and prompt-injection phrasing aimed at the model itself.
    """

    # Classic injection payloads; compiled once, matched case-insensitively.
    _INJECTION_PATTERNS = re.compile(
        r"(?:"
        r"\\[nr]|"           # Escaped newlines
        r"&#x0[ad];|"        # HTML entities for CR/LF
        r"%0[ad]|"           # URL-encoded CR/LF
        r"<script|"          # Script injection
        r"<iframe|"          # IFrame injection
        r";--|"              # SQL comment injection
        r"UNION\s+SELECT|"   # SQL union
        r"\bDROP\s+TABLE|"   # SQL drop
        r"javascript:|"      # JS protocol
        r"data:text/html"    # Data URI XSS
        r")",
        re.IGNORECASE,
    )

    # Phrasings that try to override the model's instructions.
    _PROMPT_INJECTION = re.compile(
        r"(?:"
        r"ignore\s+(?:all\s+)?(?:previous|above)|"
        r"disregard\s+(?:your|all)|"
        r"you\s+are\s+now|"
        r"new\s+instructions?:|"
        r"system\s*prompt:|"
        r"forget\s+everything"
        r")",
        re.IGNORECASE,
    )

    def sanitize(self, text: str) -> str:
        """Replace every injection match with "[BLOCKED]" and return the result."""
        cleaned = self._INJECTION_PATTERNS.sub("[BLOCKED]", text)
        # Log only when something was actually rewritten.
        if cleaned != text:
            logger.warning("Input sanitized: injection pattern detected")
        return cleaned

    def detect_threats(self, text: str) -> Dict[str, bool]:
        """Return one boolean flag per threat category for *text*."""
        has_injection = self._INJECTION_PATTERNS.search(text) is not None
        has_prompt_attack = self._PROMPT_INJECTION.search(text) is not None
        return {
            "injection": has_injection,
            "prompt_injection": has_prompt_attack,
            "excessive_length": len(text) > 10000,
        }

    def is_safe(self, text: str) -> bool:
        """True when detect_threats() raises no flags at all."""
        return not any(self.detect_threats(text).values())


# ================================================================
# Layer 2: Ethical Anchor (from Deep Simulation)
# ================================================================
@dataclass
class EthicalAnchor:
    """Regret-driven tracker of ethical alignment.

    Maintains a score in [0, 1], recomputed after every response as:

        M = λ·(reasoning quality) + γ·Learn(M_prev, coherence) + μ·(1 − regret)

    where regret = |intended − actual| helpfulness: the gap between what
    the system aimed for and what it actually delivered.
    """
    lam: float = 0.7      # weight on reasoning quality (coherence vs. tension)
    gamma: float = 0.5    # weight on the learning signal
    mu: float = 0.3       # weight on the inverted regret signal
    learning_rate: float = 0.2

    score: float = 0.5    # current alignment score, clamped to [0, 1]
    total_regret: float = 0.0
    history: List[Dict] = field(default_factory=list)

    def update(self, coherence: float, tension: float,
               intended_helpfulness: float = 0.8,
               actual_helpfulness: float = 0.7) -> float:
        """Recompute the ethical score after a response and return it.

        Args:
            coherence: response coherence in [0, 1]
            tension: epistemic tension in [0, 1]
            intended_helpfulness: quality the system aimed for, in [0, 1]
            actual_helpfulness: estimated delivered quality, in [0, 1]
        """
        gap = abs(intended_helpfulness - actual_helpfulness)
        self.total_regret += gap

        # Learning term pulls the score toward the observed coherence.
        learn_signal = self.learning_rate * (coherence - self.score)

        # Reasoning quality rewards coherence and penalizes tension equally.
        quality = 0.5 * coherence + 0.5 * (1.0 - tension)
        raw = (
            self.lam * quality
            + self.gamma * learn_signal
            + self.mu * (1.0 - gap)  # low regret contributes positively
        )
        self.score = min(1.0, max(0.0, raw))

        self.history.append({
            "timestamp": time.time(),
            "score": round(self.score, 4),
            "regret": round(gap, 4),
            "coherence": round(coherence, 4),
        })
        # Retain only the 50 most recent records.
        if len(self.history) > 50:
            self.history = self.history[-50:]

        return self.score

    def get_state(self) -> Dict:
        """Summarize current alignment for reporting."""
        return {
            "ethical_score": round(self.score, 4),
            "total_regret": round(self.total_regret, 4),
            "recent_trend": self._trend(),
        }

    def _trend(self) -> str:
        """Classify the direction of the last few score samples."""
        if len(self.history) < 3:
            return "insufficient_data"
        window = [entry["score"] for entry in self.history[-5:]]
        delta = window[-1] - window[0]
        if delta > 0.05:
            return "improving"
        if delta < -0.05:
            return "declining"
        return "stable"

    def to_dict(self) -> Dict:
        """Serialize the anchor (keeps only the last 10 history records)."""
        return {
            "score": self.score,
            "total_regret": self.total_regret,
            "history": self.history[-10:],
        }

    @classmethod
    def from_dict(cls, d: Dict) -> "EthicalAnchor":
        """Rebuild an anchor from a to_dict() payload; missing keys use defaults."""
        restored = cls()
        restored.score = d.get("score", 0.5)
        restored.total_regret = d.get("total_regret", 0.0)
        restored.history = d.get("history", [])
        return restored


# ================================================================
# Layer 3: Trust Calibration
# ================================================================
class TrustCalibrator:
    """Per-adapter trust scores that adapt to observed output quality.

    Coherent, helpful, ethically sound outputs raise an adapter's trust;
    poor outputs lower it. Scores are clamped to [0.05, 1.5] and unseen
    adapters start at a neutral 1.0.
    """

    def __init__(self):
        self.trust_scores: Dict[str, float] = {}
        self.interaction_counts: Dict[str, int] = {}

    def get_trust(self, adapter: str) -> float:
        """Return the adapter's current trust score (1.0 when unseen)."""
        return self.trust_scores.get(adapter, 1.0)

    def update(self, adapter: str, coherence: float = 0.5,
               was_helpful: bool = True, ethical_score: float = 0.5):
        """Adjust an adapter's trust from one output's quality signals."""
        trust = self.trust_scores.get(adapter, 1.0)
        seen = self.interaction_counts.get(adapter, 0)

        # Blend the three signals into a single quality score.
        quality = 0.4 * coherence + 0.3 * float(was_helpful) + 0.3 * ethical_score

        # Adjustments shrink as interactions accumulate, stabilizing trust.
        rate = 0.1 / (1.0 + seen * 0.01)

        if quality > 0.6:
            trust *= 1.0 + rate
        elif quality < 0.3:
            trust *= 1.0 - 2 * rate      # penalize bad outputs twice as hard
        else:
            trust *= 1.0 - 0.5 * rate    # mild decay in the middle band

        # Clamp to the valid range before storing.
        self.trust_scores[adapter] = max(0.05, min(1.5, trust))
        self.interaction_counts[adapter] = seen + 1

    def weighted_consensus(self, adapter_responses: Dict[str, str]) -> List[str]:
        """Return adapter names ordered from most to least trusted."""
        return sorted(adapter_responses, key=self.get_trust, reverse=True)

    def get_state(self) -> Dict:
        """Summarize trust scores and total interaction volume."""
        return {
            "trust_scores": {name: round(value, 3)
                             for name, value in self.trust_scores.items()},
            "total_interactions": sum(self.interaction_counts.values()),
        }

    def to_dict(self) -> Dict:
        """Serialize calibrator state for persistence."""
        return {
            "trust_scores": self.trust_scores,
            "interaction_counts": self.interaction_counts,
        }

    @classmethod
    def from_dict(cls, d: Dict) -> "TrustCalibrator":
        """Rebuild a calibrator from a to_dict() payload."""
        restored = cls()
        restored.trust_scores = d.get("trust_scores", {})
        restored.interaction_counts = d.get("interaction_counts", {})
        return restored


# ================================================================
# Combined Guardian
# ================================================================
class CodetteGuardian:
    """Facade combining sanitization, ethical tracking, and trust calibration."""

    def __init__(self):
        self.sanitizer = InputSanitizer()
        self.ethics = EthicalAnchor()
        self.trust = TrustCalibrator()

    def check_input(self, text: str) -> Dict:
        """Screen user input; returns safety verdict, threat flags, cleaned text."""
        threats = self.sanitizer.detect_threats(text)
        flagged = any(threats.values())
        return {
            "safe": not flagged,
            "threats": threats,
            # Only pay the sanitization cost when something was flagged.
            "cleaned_text": self.sanitizer.sanitize(text) if flagged else text,
        }

    def evaluate_output(self, adapter: str, response: str,
                        coherence: float = 0.5, tension: float = 0.3):
        """Score an adapter's output, updating both ethics and trust state."""
        # Crude helpfulness proxy: non-trivial length plus minimal coherence.
        helpful = len(response) > 50 and coherence > 0.3

        self.ethics.update(
            coherence=coherence,
            tension=tension,
            actual_helpfulness=0.7 if helpful else 0.3,
        )
        self.trust.update(
            adapter=adapter,
            coherence=coherence,
            was_helpful=helpful,
            ethical_score=self.ethics.score,
        )

    def get_state(self) -> Dict:
        """Snapshot of the ethics and trust summaries."""
        return {
            "ethics": self.ethics.get_state(),
            "trust": self.trust.get_state(),
        }

    def to_dict(self) -> Dict:
        """Serialize both stateful layers."""
        return {
            "ethics": self.ethics.to_dict(),
            "trust": self.trust.to_dict(),
        }

    @classmethod
    def from_dict(cls, d: Dict) -> "CodetteGuardian":
        """Rebuild a guardian, restoring whichever layers are present in *d*."""
        guardian = cls()
        if "ethics" in d:
            guardian.ethics = EthicalAnchor.from_dict(d["ethics"])
        if "trust" in d:
            guardian.trust = TrustCalibrator.from_dict(d["trust"])
        return guardian