| """Codette Guardian — Input Safety, Ethical Checks, Trust Calibration
|
|
|
| Three-layer protection:
|
| 1. InputSanitizer: Catches injection, XSS, encoded attacks
|
| 2. EthicalAnchor: Tracks ethical regret and learning over time
|
| 3. TrustCalibrator: Dynamic trust scores for adapter/agent outputs
|
|
|
| Origin: input_sanitizer.py + validate_ethics.py + trust_logic.py +
|
| Codette_Deep_Simulation_v1.py (EthicalAnchor), rebuilt
|
| """
|
|
|
| import re
|
| import math
|
| import time
|
| import logging
|
| from dataclasses import dataclass, field
|
| from typing import Dict, List, Optional
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
class InputSanitizer:
    """Detect and neutralize injection patterns in user input.

    Two compiled pattern sets are kept on the class:

    * ``_INJECTION_PATTERNS`` - markup, SQL and CR/LF style payloads. These
      are reported by :meth:`detect_threats` and rewritten by
      :meth:`sanitize`.
    * ``_PROMPT_INJECTION`` - phrases that try to override instructions.
      These are only reported; :meth:`sanitize` leaves them in place.

    Both pattern sets are matched case-insensitively.
    """

    # Inputs longer than this many characters are flagged "excessive_length".
    MAX_INPUT_LENGTH = 10000

    _INJECTION_PATTERNS = re.compile(
        r"(?:"
        r"\\[nr]|"              # a literal backslash followed by n or r
        r"&#x0[ad];|"           # HTML hex entity for LF / CR
        r"%0[ad]|"              # URL-encoded LF / CR
        r"<script|"
        r"<iframe|"
        r";--|"                 # statement terminator + SQL comment
        r"UNION\s+SELECT|"
        r"\bDROP\s+TABLE|"
        r"javascript:|"
        r"data:text/html"
        r")",
        re.IGNORECASE,
    )

    _PROMPT_INJECTION = re.compile(
        r"(?:"
        r"ignore\s+(?:all\s+)?(?:previous|above)|"
        r"disregard\s+(?:your|all)|"
        r"you\s+are\s+now|"
        r"new\s+instructions?:|"
        r"system\s*prompt:|"
        r"forget\s+everything"
        r")",
        re.IGNORECASE,
    )

    def sanitize(self, text: str) -> str:
        """Replace every injection-pattern match with ``[BLOCKED]``.

        Only ``_INJECTION_PATTERNS`` is applied here; prompt-injection
        phrases are not rewritten. A warning is logged when at least one
        replacement changed the text.

        Args:
            text: Raw user input.

        Returns:
            The text with each match replaced, or the input unchanged when
            nothing matched.
        """
        cleaned = self._INJECTION_PATTERNS.sub("[BLOCKED]", text)
        if cleaned != text:
            logger.warning("Input sanitized: injection pattern detected")
        return cleaned

    def detect_threats(self, text: str) -> Dict[str, bool]:
        """Analyze text for each supported threat type.

        Args:
            text: Raw user input.

        Returns:
            A dict with the boolean flags ``"injection"``,
            ``"prompt_injection"`` and ``"excessive_length"``.
        """
        return {
            "injection": bool(self._INJECTION_PATTERNS.search(text)),
            "prompt_injection": bool(self._PROMPT_INJECTION.search(text)),
            "excessive_length": len(text) > self.MAX_INPUT_LENGTH,
        }

    def is_safe(self, text: str) -> bool:
        """Quick safety check - True if no threat flag is raised."""
        return not any(self.detect_threats(text).values())
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class EthicalAnchor:
    """Tracks ethical alignment through regret-based learning.

    Every call to :meth:`update` recomputes the ethical score M as::

        M = clamp(lam * Q + gamma * Learn + mu * (1 - regret), 0, 1)

    where::

        Q      = 0.5 * coherence + 0.5 * (1 - tension)
        Learn  = learning_rate * (coherence - M_prev)
        regret = |intended_helpfulness - actual_helpfulness|

    So regret measures the gap between what the system intended to do and
    what it actually did, and a larger gap lowers the score.

    Attributes:
        lam: Weight of the reasoning-quality term Q.
        gamma: Weight of the learning term.
        mu: Weight of the (1 - regret) term.
        learning_rate: Step size used inside the learning term.
        score: Current ethical score, kept within [0, 1] by ``update``.
        total_regret: Sum of the regret values seen by ``update``.
        history: Most recent update records (at most 50 are kept).
    """
    lam: float = 0.7
    gamma: float = 0.5
    mu: float = 0.3
    learning_rate: float = 0.2

    score: float = 0.5
    total_regret: float = 0.0
    history: List[Dict] = field(default_factory=list)

    def update(self, coherence: float, tension: float,
               intended_helpfulness: float = 0.8,
               actual_helpfulness: float = 0.7) -> float:
        """Update the ethical score after a response.

        Args:
            coherence: How coherent the response was [0, 1]
            tension: Epistemic tension level [0, 1]
            intended_helpfulness: What we aimed for [0, 1]
            actual_helpfulness: Estimated actual quality [0, 1]

        Returns:
            The new score, clamped to [0, 1].
        """
        regret = abs(intended_helpfulness - actual_helpfulness)
        self.total_regret += regret

        # Pull toward the observed coherence, relative to the previous score.
        learn = self.learning_rate * (coherence - self.score)

        # High coherence and low tension both count as good reasoning.
        reasoning_quality = 0.5 * coherence + 0.5 * (1.0 - tension)
        self.score = (
            self.lam * reasoning_quality
            + self.gamma * learn
            + self.mu * (1.0 - regret)
        )
        self.score = max(0.0, min(1.0, self.score))

        self.history.append({
            "timestamp": time.time(),
            "score": round(self.score, 4),
            "regret": round(regret, 4),
            "coherence": round(coherence, 4),
        })

        # Keep only the 50 most recent records.
        if len(self.history) > 50:
            self.history = self.history[-50:]

        return self.score

    def get_state(self) -> Dict:
        """Return a rounded summary: score, total regret and recent trend."""
        return {
            "ethical_score": round(self.score, 4),
            "total_regret": round(self.total_regret, 4),
            "recent_trend": self._trend(),
        }

    def _trend(self) -> str:
        """Classify the score movement across the last (up to) five records.

        Returns ``"insufficient_data"`` with fewer than three records,
        otherwise ``"improving"``, ``"declining"`` or ``"stable"`` depending
        on whether last-minus-first exceeds +/-0.05.
        """
        if len(self.history) < 3:
            return "insufficient_data"
        recent = [h["score"] for h in self.history[-5:]]
        slope = recent[-1] - recent[0]
        if slope > 0.05:
            return "improving"
        if slope < -0.05:
            return "declining"
        return "stable"

    def to_dict(self) -> Dict:
        """Serialize the score, total regret and the last ten records."""
        return {
            "score": self.score,
            "total_regret": self.total_regret,
            "history": self.history[-10:],
        }

    @classmethod
    def from_dict(cls, d: Dict) -> "EthicalAnchor":
        """Rebuild an anchor from a ``to_dict`` payload.

        Missing keys fall back to the dataclass defaults. The history list
        is copied so later ``update`` calls never append into the caller's
        payload.
        """
        anchor = cls()
        anchor.score = d.get("score", 0.5)
        anchor.total_regret = d.get("total_regret", 0.0)
        anchor.history = list(d.get("history") or [])
        return anchor
|
|
|
|
|
|
|
|
|
|
|
class TrustCalibrator:
    """Dynamic trust scores for adapter outputs.

    Trust increases when outputs are coherent, helpful, and ethically sound.
    Trust decreases for incoherent, harmful, or low-quality outputs.

    Attributes:
        trust_scores: Current trust per adapter name.
        interaction_counts: Number of ``update`` calls per adapter name.
    """

    def __init__(self):
        self.trust_scores: Dict[str, float] = {}
        self.interaction_counts: Dict[str, int] = {}

    def get_trust(self, adapter: str) -> float:
        """Get current trust score for an adapter [0.05, 1.5].

        Adapters that have never been updated report the default of 1.0.
        """
        return self.trust_scores.get(adapter, 1.0)

    def update(self, adapter: str, coherence: float = 0.5,
               was_helpful: bool = True, ethical_score: float = 0.5):
        """Update trust for an adapter based on output quality.

        Args:
            adapter: Adapter name whose trust is adjusted.
            coherence: Coherence of the output (weight 0.4).
            was_helpful: Whether the output was judged helpful (weight 0.3).
            ethical_score: Ethical score for the output (weight 0.3).
        """
        current = self.trust_scores.get(adapter, 1.0)
        count = self.interaction_counts.get(adapter, 0)

        quality = 0.4 * coherence + 0.3 * float(was_helpful) + 0.3 * ethical_score

        # The step shrinks as the adapter accumulates interactions.
        adjustment_rate = 0.1 / (1.0 + count * 0.01)

        if quality > 0.6:
            current *= (1.0 + adjustment_rate)
        elif quality < 0.3:
            # Poor output is penalized at twice the reward rate.
            current *= (1.0 - 2 * adjustment_rate)
        else:
            # Middling output decays trust at half the rate.
            current *= (1.0 - 0.5 * adjustment_rate)

        current = max(0.05, min(1.5, current))

        self.trust_scores[adapter] = current
        self.interaction_counts[adapter] = count + 1

    def weighted_consensus(self, adapter_responses: Dict[str, str]) -> List[str]:
        """Rank adapter names by trust, highest first.

        Only the keys of ``adapter_responses`` are used. Adapters with equal
        trust keep their original dict order, because the sort is stable.
        """
        return sorted(adapter_responses, key=self.get_trust, reverse=True)

    def get_state(self) -> Dict:
        """Return rounded trust scores and the total interaction count."""
        return {
            "trust_scores": {k: round(v, 3) for k, v in self.trust_scores.items()},
            "total_interactions": sum(self.interaction_counts.values()),
        }

    def to_dict(self) -> Dict:
        """Serialize to plain dicts.

        Copies are returned so the snapshot does not change when the
        calibrator is updated afterwards.
        """
        return {
            "trust_scores": dict(self.trust_scores),
            "interaction_counts": dict(self.interaction_counts),
        }

    @classmethod
    def from_dict(cls, d: Dict) -> "TrustCalibrator":
        """Rebuild a calibrator from a ``to_dict`` payload.

        The incoming dicts are copied so later updates never mutate the
        caller's payload. Missing keys yield empty state.
        """
        cal = cls()
        cal.trust_scores = dict(d.get("trust_scores") or {})
        cal.interaction_counts = dict(d.get("interaction_counts") or {})
        return cal
|
|
|
|
|
|
|
|
|
|
|
class CodetteGuardian:
    """Facade that bundles the sanitizer, ethical anchor and trust calibrator."""

    def __init__(self):
        self.sanitizer = InputSanitizer()
        self.ethics = EthicalAnchor()
        self.trust = TrustCalibrator()

    def check_input(self, text: str) -> Dict:
        """Screen user input and report what was found.

        Returns a dict with ``"safe"`` (no threat flag raised), ``"threats"``
        (the per-type flags) and ``"cleaned_text"`` (the sanitizer output
        when any flag is raised, otherwise the input unchanged).
        """
        threats = self.sanitizer.detect_threats(text)
        flagged = any(threats.values())

        # Only run the sanitizer when something was actually flagged.
        if flagged:
            cleaned = self.sanitizer.sanitize(text)
        else:
            cleaned = text

        return {
            "safe": not flagged,
            "threats": threats,
            "cleaned_text": cleaned,
        }

    def evaluate_output(self, adapter: str, response: str,
                        coherence: float = 0.5, tension: float = 0.3):
        """Score an adapter response and feed the result to ethics and trust.

        A response counts as helpful when it is longer than 50 characters
        and its coherence exceeds 0.3. The ethics layer is updated first so
        the trust update sees the fresh ethical score.
        """
        helpful = len(response) > 50 and coherence > 0.3
        estimated_quality = 0.7 if helpful else 0.3

        self.ethics.update(
            coherence=coherence,
            tension=tension,
            actual_helpfulness=estimated_quality,
        )
        self.trust.update(
            adapter=adapter,
            coherence=coherence,
            was_helpful=helpful,
            ethical_score=self.ethics.score,
        )

    def get_state(self) -> Dict:
        """Return the summarized state of the ethics and trust layers."""
        ethics_state = self.ethics.get_state()
        trust_state = self.trust.get_state()
        return {"ethics": ethics_state, "trust": trust_state}

    def to_dict(self) -> Dict:
        """Serialize the ethics and trust layers for persistence."""
        ethics_payload = self.ethics.to_dict()
        trust_payload = self.trust.to_dict()
        return {"ethics": ethics_payload, "trust": trust_payload}

    @classmethod
    def from_dict(cls, d: Dict) -> "CodetteGuardian":
        """Rebuild a guardian, restoring whichever layers are present in ``d``."""
        guardian = cls()
        if "ethics" in d:
            guardian.ethics = EthicalAnchor.from_dict(d["ethics"])
        if "trust" in d:
            guardian.trust = TrustCalibrator.from_dict(d["trust"])
        return guardian
|
|
|