File size: 7,754 Bytes
f4bee9e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
"""
3️⃣ SECURITY MEMORY - Compressed threat experience
Purpose: Stores signals only, never raw data. Enables learning without liability.
"""
from sqlalchemy import Column, String, DateTime, JSON, Integer, Float, CheckConstraint, Index, ForeignKey, ARRAY
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
import uuid
from database.models.base import Base
class SecurityMemory(Base):
__tablename__ = "security_memory"
# Core Identification
memory_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
# Threat Pattern Signature (hashed, not raw)
pattern_signature = Column(String(64), unique=True, nullable=False)
pattern_type = Column(String(30), nullable=False)
# Domain Context
source_domain = Column(String(20), nullable=False)
affected_domains = Column(ARRAY(String(20)), nullable=False, default=[])
# Signal Compression (NO RAW DATA)
confidence_delta_vector = Column(JSON, nullable=False) # Array of deltas, not raw confidences
perturbation_statistics = Column(JSON, nullable=False) # Stats only, not perturbations
anomaly_signature_hash = Column(String(64), nullable=False)
# Recurrence Tracking
first_observed = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
last_observed = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
recurrence_count = Column(Integer, nullable=False, default=1, server_default="1")
# Severity & Impact
severity_score = Column(
Float,
nullable=False,
default=0.5,
server_default="0.5"
)
confidence_impact = Column(
Float,
nullable=False,
default=0.0,
server_default="0.0"
)
# Cross-Model Correlations
correlated_patterns = Column(ARRAY(UUID(as_uuid=True)), nullable=False, default=[])
correlation_strength = Column(Float, nullable=False, default=0.0, server_default="0.0")
# Mitigation Intelligence
effective_mitigations = Column(ARRAY(String(100)), nullable=False, default=[])
mitigation_effectiveness = Column(Float, nullable=False, default=0.0, server_default="0.0")
# Learning Source
learned_from_models = Column(ARRAY(String(100)), nullable=False, default=[])
compressed_experience = Column(JSON, nullable=False, default=dict, server_default="{}")
# Relationships
model_id = Column(String(100), ForeignKey("model_registry.model_id"))
model = relationship("ModelRegistry", back_populates="security_memories")
# Table constraints
__table_args__ = (
CheckConstraint(
"pattern_type IN ('confidence_erosion', 'adversarial_pattern', 'anomaly_signature', 'distribution_shift', 'temporal_attack', 'cross_model_correlation')",
name="ck_security_memory_pattern_type"
),
CheckConstraint(
"severity_score >= 0.0 AND severity_score <= 1.0",
name="ck_security_memory_severity"
),
CheckConstraint(
"confidence_impact >= -1.0 AND confidence_impact <= 1.0",
name="ck_security_memory_confidence_impact"
),
CheckConstraint(
"correlation_strength >= 0.0 AND correlation_strength <= 1.0",
name="ck_security_memory_correlation"
),
CheckConstraint(
"mitigation_effectiveness >= 0.0 AND mitigation_effectiveness <= 1.0",
name="ck_security_memory_mitigation"
),
Index("idx_security_memory_pattern_type", "pattern_type"),
Index("idx_security_memory_severity", "severity_score"),
Index("idx_security_memory_recurrence", "recurrence_count"),
Index("idx_security_memory_domain", "source_domain"),
Index("idx_security_memory_recency", "last_observed"),
)
def __repr__(self):
return f"<SecurityMemory {self.pattern_signature[:16]}...: {self.pattern_type}>"
def to_dict(self):
"""Convert to dictionary for serialization"""
return {
"memory_id": str(self.memory_id),
"pattern_signature": self.pattern_signature,
"pattern_type": self.pattern_type,
"source_domain": self.source_domain,
"affected_domains": self.affected_domains,
"severity_score": self.severity_score,
"confidence_impact": self.confidence_impact,
"recurrence_count": self.recurrence_count,
"first_observed": self.first_observed.isoformat() if self.first_observed else None,
"last_observed": self.last_observed.isoformat() if self.last_observed else None,
"correlation_strength": self.correlation_strength,
"effective_mitigations": self.effective_mitigations,
"mitigation_effectiveness": self.mitigation_effectiveness,
"learned_from_models": self.learned_from_models
}
@classmethod
def get_by_pattern_type(cls, session, pattern_type, limit: int = 100):
"""Get security memories by pattern type"""
return (
session.query(cls)
.filter(cls.pattern_type == pattern_type)
.order_by(cls.last_observed.desc())
.limit(limit)
.all()
)
@classmethod
def get_recent_threats(cls, session, hours: int = 24, limit: int = 50):
"""Get recent threats within specified hours"""
from datetime import datetime, timedelta
time_threshold = datetime.utcnow() - timedelta(hours=hours)
return (
session.query(cls)
.filter(cls.last_observed >= time_threshold)
.order_by(cls.severity_score.desc(), cls.last_observed.desc())
.limit(limit)
.all()
)
def record_recurrence(self, new_severity: float = None, new_confidence_impact: float = None):
"""Record another occurrence of this pattern"""
from datetime import datetime
self.last_observed = datetime.utcnow()
self.recurrence_count += 1
# Update severity with decayed average
if new_severity is not None:
decay = 0.8 # 80% weight to history
self.severity_score = (
decay * self.severity_score +
(1 - decay) * new_severity
)
# Update confidence impact
if new_confidence_impact is not None:
self.confidence_impact = (
decay * self.confidence_impact +
(1 - decay) * new_confidence_impact
)
def add_mitigation(self, mitigation: str, effectiveness: float):
"""Add a mitigation strategy for this pattern"""
if mitigation not in self.effective_mitigations:
self.effective_mitigations.append(mitigation)
# Update effectiveness score
if self.mitigation_effectiveness == 0.0:
self.mitigation_effectiveness = effectiveness
else:
# Weighted average
self.mitigation_effectiveness = 0.7 * self.mitigation_effectiveness + 0.3 * effectiveness
def add_correlation(self, other_memory_id: uuid.UUID, strength: float):
"""Add correlation with another security memory pattern"""
if other_memory_id not in self.correlated_patterns:
self.correlated_patterns.append(other_memory_id)
# Update correlation strength
if strength > self.correlation_strength:
self.correlation_strength = strength
|