File size: 7,754 Bytes
f4bee9e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""
3️⃣ SECURITY MEMORY - Compressed threat experience
Purpose: Stores signals only, never raw data. Enables learning without liability.
"""

from sqlalchemy import Column, String, DateTime, JSON, Integer, Float, CheckConstraint, Index, ForeignKey, ARRAY
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
import uuid

from database.models.base import Base

class SecurityMemory(Base):
    """ORM model for the ``security_memory`` table.

    Each row describes one threat pattern, identified by a unique
    ``pattern_signature``, together with derived signals (delta vectors,
    statistics, hashes), recurrence tracking, severity/impact scores,
    correlations with other patterns and known mitigations.

    Allowed ``pattern_type`` values and the valid ranges of the score
    columns are enforced by the check constraints in ``__table_args__``.
    """

    __tablename__ = "security_memory"

    # Core Identification
    memory_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)

    # Threat Pattern Signature (hashed, not raw)
    pattern_signature = Column(String(64), unique=True, nullable=False)
    pattern_type = Column(String(30), nullable=False)

    # Domain Context
    # NOTE: list-valued columns use the ``list`` callable as their Python-side
    # default so that every INSERT gets its own fresh list instead of all rows
    # sharing one mutable ``[]`` object.
    source_domain = Column(String(20), nullable=False)
    affected_domains = Column(ARRAY(String(20)), nullable=False, default=list)

    # Signal Compression (NO RAW DATA)
    confidence_delta_vector = Column(JSON, nullable=False)  # Array of deltas, not raw confidences
    perturbation_statistics = Column(JSON, nullable=False)   # Stats only, not perturbations
    anomaly_signature_hash = Column(String(64), nullable=False)

    # Recurrence Tracking
    first_observed = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
    last_observed = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
    recurrence_count = Column(Integer, nullable=False, default=1, server_default="1")

    # Severity & Impact
    severity_score = Column(
        Float,
        nullable=False,
        default=0.5,
        server_default="0.5"
    )

    confidence_impact = Column(
        Float,
        nullable=False,
        default=0.0,
        server_default="0.0"
    )

    # Cross-Model Correlations
    correlated_patterns = Column(ARRAY(UUID(as_uuid=True)), nullable=False, default=list)
    correlation_strength = Column(Float, nullable=False, default=0.0, server_default="0.0")

    # Mitigation Intelligence
    effective_mitigations = Column(ARRAY(String(100)), nullable=False, default=list)
    mitigation_effectiveness = Column(Float, nullable=False, default=0.0, server_default="0.0")

    # Learning Source
    learned_from_models = Column(ARRAY(String(100)), nullable=False, default=list)
    compressed_experience = Column(JSON, nullable=False, default=dict, server_default="{}")

    # Relationships
    model_id = Column(String(100), ForeignKey("model_registry.model_id"))
    model = relationship("ModelRegistry", back_populates="security_memories")

    # Table constraints
    __table_args__ = (
        CheckConstraint(
            "pattern_type IN ('confidence_erosion', 'adversarial_pattern', 'anomaly_signature', 'distribution_shift', 'temporal_attack', 'cross_model_correlation')",
            name="ck_security_memory_pattern_type"
        ),
        CheckConstraint(
            "severity_score >= 0.0 AND severity_score <= 1.0",
            name="ck_security_memory_severity"
        ),
        CheckConstraint(
            "confidence_impact >= -1.0 AND confidence_impact <= 1.0",
            name="ck_security_memory_confidence_impact"
        ),
        CheckConstraint(
            "correlation_strength >= 0.0 AND correlation_strength <= 1.0",
            name="ck_security_memory_correlation"
        ),
        CheckConstraint(
            "mitigation_effectiveness >= 0.0 AND mitigation_effectiveness <= 1.0",
            name="ck_security_memory_mitigation"
        ),
        Index("idx_security_memory_pattern_type", "pattern_type"),
        Index("idx_security_memory_severity", "severity_score"),
        Index("idx_security_memory_recurrence", "recurrence_count"),
        Index("idx_security_memory_domain", "source_domain"),
        Index("idx_security_memory_recency", "last_observed"),
    )

    def __repr__(self):
        """Return a short debug representation (first 16 signature chars + type)."""
        # pattern_signature can still be None on a partially built instance;
        # repr() must never raise, so fall back to an empty prefix.
        signature_prefix = (self.pattern_signature or "")[:16]
        return f"<SecurityMemory {signature_prefix}...: {self.pattern_type}>"

    def to_dict(self):
        """Convert to dictionary for serialization.

        Returns:
            A dict with the identifying, scoring, recurrence and mitigation
            fields. ``memory_id`` is rendered as a string and the two
            timestamps as ISO-8601 strings; each of these is ``None`` when
            the underlying attribute is unset.
        """
        return {
            # Avoid emitting the literal string "None" for an unset id.
            "memory_id": str(self.memory_id) if self.memory_id is not None else None,
            "pattern_signature": self.pattern_signature,
            "pattern_type": self.pattern_type,
            "source_domain": self.source_domain,
            "affected_domains": self.affected_domains,
            "severity_score": self.severity_score,
            "confidence_impact": self.confidence_impact,
            "recurrence_count": self.recurrence_count,
            "first_observed": self.first_observed.isoformat() if self.first_observed else None,
            "last_observed": self.last_observed.isoformat() if self.last_observed else None,
            "correlation_strength": self.correlation_strength,
            "effective_mitigations": self.effective_mitigations,
            "mitigation_effectiveness": self.mitigation_effectiveness,
            "learned_from_models": self.learned_from_models
        }

    @classmethod
    def get_by_pattern_type(cls, session, pattern_type, limit: int = 100):
        """Get security memories by pattern type.

        Args:
            session: Session object used to issue the query.
            pattern_type: Value matched exactly against ``pattern_type``.
            limit: Maximum number of rows returned.

        Returns:
            List of matching instances, most recently observed first.
        """
        return (
            session.query(cls)
            .filter(cls.pattern_type == pattern_type)
            .order_by(cls.last_observed.desc())
            .limit(limit)
            .all()
        )

    @classmethod
    def get_recent_threats(cls, session, hours: int = 24, limit: int = 50):
        """Get recent threats within specified hours.

        Args:
            session: Session object used to issue the query.
            hours: Size of the look-back window, counted back from now.
            limit: Maximum number of rows returned.

        Returns:
            List of instances whose ``last_observed`` falls inside the
            window, ordered by severity (highest first), then recency.
        """
        from datetime import datetime, timedelta, timezone

        # last_observed is a timezone-aware column, so the threshold must be
        # aware as well; a naive utcnow() value would be interpreted in the
        # database session's time zone.
        time_threshold = datetime.now(timezone.utc) - timedelta(hours=hours)

        return (
            session.query(cls)
            .filter(cls.last_observed >= time_threshold)
            .order_by(cls.severity_score.desc(), cls.last_observed.desc())
            .limit(limit)
            .all()
        )

    @staticmethod
    def _blend(previous, observed, history_weight):
        """Return the weighted average of *previous* and *observed*.

        When there is no previous value, *observed* is returned unchanged.
        """
        if previous is None:
            return observed
        return history_weight * previous + (1 - history_weight) * observed

    def record_recurrence(self, new_severity: float = None, new_confidence_impact: float = None):
        """Record another occurrence of this pattern.

        Sets ``last_observed`` to the current UTC time, increments
        ``recurrence_count`` and, for each score that is supplied, blends it
        into the stored value.

        Args:
            new_severity: Severity of the new occurrence, or ``None`` to
                leave ``severity_score`` untouched.
            new_confidence_impact: Confidence impact of the new occurrence,
                or ``None`` to leave ``confidence_impact`` untouched.
        """
        from datetime import datetime, timezone

        # 80% weight to history. Defined up front so that it is available to
        # BOTH updates below, including when only new_confidence_impact is
        # passed.
        decay = 0.8

        self.last_observed = datetime.now(timezone.utc)

        # On an instance whose column defaults have not been applied yet the
        # counter is None; treat that as the initial observation (1).
        previous_count = 1 if self.recurrence_count is None else self.recurrence_count
        self.recurrence_count = previous_count + 1

        # Update severity with decayed average
        if new_severity is not None:
            self.severity_score = self._blend(self.severity_score, new_severity, decay)

        # Update confidence impact with the same decayed average
        if new_confidence_impact is not None:
            self.confidence_impact = self._blend(
                self.confidence_impact, new_confidence_impact, decay
            )

    def add_mitigation(self, mitigation: str, effectiveness: float):
        """Add a mitigation strategy for this pattern.

        Args:
            mitigation: Mitigation identifier; added to
                ``effective_mitigations`` only if not already present.
            effectiveness: Effectiveness of this mitigation. It becomes the
                stored score when the current score is unset or 0.0;
                otherwise it is blended in with weight 0.3.
        """
        known = list(self.effective_mitigations or [])
        if mitigation not in known:
            # Assign a new list rather than appending in place: a plain ARRAY
            # column does not detect in-place mutation, so an append() could
            # be silently skipped at flush time.
            self.effective_mitigations = known + [mitigation]

        # Update effectiveness score (None or 0.0 both mean "no score yet")
        if not self.mitigation_effectiveness:
            self.mitigation_effectiveness = effectiveness
        else:
            # Weighted average
            self.mitigation_effectiveness = 0.7 * self.mitigation_effectiveness + 0.3 * effectiveness

    def add_correlation(self, other_memory_id: uuid.UUID, strength: float):
        """Add correlation with another security memory pattern.

        Args:
            other_memory_id: ``memory_id`` of the correlated pattern; added
                to ``correlated_patterns`` only if not already present.
            strength: Correlation strength. The stored
                ``correlation_strength`` only ever increases: it is replaced
                when unset or smaller than *strength*.
        """
        known = list(self.correlated_patterns or [])
        if other_memory_id not in known:
            # Re-assign instead of append() so the change is detected (see
            # add_mitigation for the rationale).
            self.correlated_patterns = known + [other_memory_id]

        # Update correlation strength (keep the maximum seen so far)
        if self.correlation_strength is None or strength > self.correlation_strength:
            self.correlation_strength = strength