|
|
""" |
|
|
3️⃣ SECURITY MEMORY - Compressed threat experience |
|
|
Purpose: Stores signals only, never raw data. Enables learning without liability. |
|
|
""" |
|
|
|
|
|
from sqlalchemy import Column, String, DateTime, JSON, Integer, Float, CheckConstraint, Index, ForeignKey, ARRAY |
|
|
from sqlalchemy.dialects.postgresql import UUID |
|
|
from sqlalchemy.orm import relationship |
|
|
from sqlalchemy.sql import func |
|
|
import uuid |
|
|
|
|
|
from database.models.base import Base |
|
|
|
|
|
class SecurityMemory(Base): |
|
|
__tablename__ = "security_memory" |
|
|
|
|
|
|
|
|
memory_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) |
|
|
created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False) |
|
|
|
|
|
|
|
|
pattern_signature = Column(String(64), unique=True, nullable=False) |
|
|
pattern_type = Column(String(30), nullable=False) |
|
|
|
|
|
|
|
|
source_domain = Column(String(20), nullable=False) |
|
|
affected_domains = Column(ARRAY(String(20)), nullable=False, default=[]) |
|
|
|
|
|
|
|
|
confidence_delta_vector = Column(JSON, nullable=False) |
|
|
perturbation_statistics = Column(JSON, nullable=False) |
|
|
anomaly_signature_hash = Column(String(64), nullable=False) |
|
|
|
|
|
|
|
|
first_observed = Column(DateTime(timezone=True), server_default=func.now(), nullable=False) |
|
|
last_observed = Column(DateTime(timezone=True), server_default=func.now(), nullable=False) |
|
|
recurrence_count = Column(Integer, nullable=False, default=1, server_default="1") |
|
|
|
|
|
|
|
|
severity_score = Column( |
|
|
Float, |
|
|
nullable=False, |
|
|
default=0.5, |
|
|
server_default="0.5" |
|
|
) |
|
|
|
|
|
confidence_impact = Column( |
|
|
Float, |
|
|
nullable=False, |
|
|
default=0.0, |
|
|
server_default="0.0" |
|
|
) |
|
|
|
|
|
|
|
|
correlated_patterns = Column(ARRAY(UUID(as_uuid=True)), nullable=False, default=[]) |
|
|
correlation_strength = Column(Float, nullable=False, default=0.0, server_default="0.0") |
|
|
|
|
|
|
|
|
effective_mitigations = Column(ARRAY(String(100)), nullable=False, default=[]) |
|
|
mitigation_effectiveness = Column(Float, nullable=False, default=0.0, server_default="0.0") |
|
|
|
|
|
|
|
|
learned_from_models = Column(ARRAY(String(100)), nullable=False, default=[]) |
|
|
compressed_experience = Column(JSON, nullable=False, default=dict, server_default="{}") |
|
|
|
|
|
|
|
|
model_id = Column(String(100), ForeignKey("model_registry.model_id")) |
|
|
model = relationship("ModelRegistry", back_populates="security_memories") |
|
|
|
|
|
|
|
|
__table_args__ = ( |
|
|
CheckConstraint( |
|
|
"pattern_type IN ('confidence_erosion', 'adversarial_pattern', 'anomaly_signature', 'distribution_shift', 'temporal_attack', 'cross_model_correlation')", |
|
|
name="ck_security_memory_pattern_type" |
|
|
), |
|
|
CheckConstraint( |
|
|
"severity_score >= 0.0 AND severity_score <= 1.0", |
|
|
name="ck_security_memory_severity" |
|
|
), |
|
|
CheckConstraint( |
|
|
"confidence_impact >= -1.0 AND confidence_impact <= 1.0", |
|
|
name="ck_security_memory_confidence_impact" |
|
|
), |
|
|
CheckConstraint( |
|
|
"correlation_strength >= 0.0 AND correlation_strength <= 1.0", |
|
|
name="ck_security_memory_correlation" |
|
|
), |
|
|
CheckConstraint( |
|
|
"mitigation_effectiveness >= 0.0 AND mitigation_effectiveness <= 1.0", |
|
|
name="ck_security_memory_mitigation" |
|
|
), |
|
|
Index("idx_security_memory_pattern_type", "pattern_type"), |
|
|
Index("idx_security_memory_severity", "severity_score"), |
|
|
Index("idx_security_memory_recurrence", "recurrence_count"), |
|
|
Index("idx_security_memory_domain", "source_domain"), |
|
|
Index("idx_security_memory_recency", "last_observed"), |
|
|
) |
|
|
|
|
|
def __repr__(self): |
|
|
return f"<SecurityMemory {self.pattern_signature[:16]}...: {self.pattern_type}>" |
|
|
|
|
|
def to_dict(self): |
|
|
"""Convert to dictionary for serialization""" |
|
|
return { |
|
|
"memory_id": str(self.memory_id), |
|
|
"pattern_signature": self.pattern_signature, |
|
|
"pattern_type": self.pattern_type, |
|
|
"source_domain": self.source_domain, |
|
|
"affected_domains": self.affected_domains, |
|
|
"severity_score": self.severity_score, |
|
|
"confidence_impact": self.confidence_impact, |
|
|
"recurrence_count": self.recurrence_count, |
|
|
"first_observed": self.first_observed.isoformat() if self.first_observed else None, |
|
|
"last_observed": self.last_observed.isoformat() if self.last_observed else None, |
|
|
"correlation_strength": self.correlation_strength, |
|
|
"effective_mitigations": self.effective_mitigations, |
|
|
"mitigation_effectiveness": self.mitigation_effectiveness, |
|
|
"learned_from_models": self.learned_from_models |
|
|
} |
|
|
|
|
|
@classmethod |
|
|
def get_by_pattern_type(cls, session, pattern_type, limit: int = 100): |
|
|
"""Get security memories by pattern type""" |
|
|
return ( |
|
|
session.query(cls) |
|
|
.filter(cls.pattern_type == pattern_type) |
|
|
.order_by(cls.last_observed.desc()) |
|
|
.limit(limit) |
|
|
.all() |
|
|
) |
|
|
|
|
|
@classmethod |
|
|
def get_recent_threats(cls, session, hours: int = 24, limit: int = 50): |
|
|
"""Get recent threats within specified hours""" |
|
|
from datetime import datetime, timedelta |
|
|
time_threshold = datetime.utcnow() - timedelta(hours=hours) |
|
|
|
|
|
return ( |
|
|
session.query(cls) |
|
|
.filter(cls.last_observed >= time_threshold) |
|
|
.order_by(cls.severity_score.desc(), cls.last_observed.desc()) |
|
|
.limit(limit) |
|
|
.all() |
|
|
) |
|
|
|
|
|
def record_recurrence(self, new_severity: float = None, new_confidence_impact: float = None): |
|
|
"""Record another occurrence of this pattern""" |
|
|
from datetime import datetime |
|
|
|
|
|
self.last_observed = datetime.utcnow() |
|
|
self.recurrence_count += 1 |
|
|
|
|
|
|
|
|
if new_severity is not None: |
|
|
decay = 0.8 |
|
|
self.severity_score = ( |
|
|
decay * self.severity_score + |
|
|
(1 - decay) * new_severity |
|
|
) |
|
|
|
|
|
|
|
|
if new_confidence_impact is not None: |
|
|
self.confidence_impact = ( |
|
|
decay * self.confidence_impact + |
|
|
(1 - decay) * new_confidence_impact |
|
|
) |
|
|
|
|
|
def add_mitigation(self, mitigation: str, effectiveness: float): |
|
|
"""Add a mitigation strategy for this pattern""" |
|
|
if mitigation not in self.effective_mitigations: |
|
|
self.effective_mitigations.append(mitigation) |
|
|
|
|
|
|
|
|
if self.mitigation_effectiveness == 0.0: |
|
|
self.mitigation_effectiveness = effectiveness |
|
|
else: |
|
|
|
|
|
self.mitigation_effectiveness = 0.7 * self.mitigation_effectiveness + 0.3 * effectiveness |
|
|
|
|
|
def add_correlation(self, other_memory_id: uuid.UUID, strength: float): |
|
|
"""Add correlation with another security memory pattern""" |
|
|
if other_memory_id not in self.correlated_patterns: |
|
|
self.correlated_patterns.append(other_memory_id) |
|
|
|
|
|
|
|
|
if strength > self.correlation_strength: |
|
|
self.correlation_strength = strength |
|
|
|