""" AdaptiveAuth Database Models All SQLAlchemy models for the authentication framework. """ import enum from datetime import datetime from sqlalchemy import ( Boolean, Column, Integer, String, DateTime, Float, ForeignKey, Text, JSON, Enum, Index ) from sqlalchemy.orm import relationship, declarative_base Base = declarative_base() class UserRole(str, enum.Enum): """User role enumeration.""" USER = "user" ADMIN = "admin" SUPERADMIN = "superadmin" class RiskLevel(str, enum.Enum): """Risk level enumeration for security assessment.""" LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" class SecurityLevel(int, enum.Enum): """Security level (0-4) based on risk assessment.""" LEVEL_0 = 0 # Known device + IP + browser - minimal auth LEVEL_1 = 1 # Unknown browser - password only LEVEL_2 = 2 # Unknown IP - password + email verification LEVEL_3 = 3 # Unknown device - password + 2FA LEVEL_4 = 4 # Suspicious pattern - blocked/full verification class SessionStatus(str, enum.Enum): """Session status enumeration.""" ACTIVE = "active" EXPIRED = "expired" REVOKED = "revoked" SUSPICIOUS = "suspicious" # ======================== USER MODELS ======================== class User(Base): """Main user model with authentication data.""" __tablename__ = "adaptiveauth_users" id = Column(Integer, primary_key=True, index=True) email = Column(String(255), unique=True, index=True, nullable=False) full_name = Column(String(255), nullable=True) password_hash = Column(String(255), nullable=False) role = Column(String(50), default=UserRole.USER.value) # Account Status is_active = Column(Boolean, default=True) is_verified = Column(Boolean, default=False) is_locked = Column(Boolean, default=False) locked_until = Column(DateTime, nullable=True) # 2FA Settings tfa_enabled = Column(Boolean, default=False) tfa_secret = Column(String(255), nullable=True) # Security Tracking failed_login_attempts = Column(Integer, default=0) last_failed_login = Column(DateTime, nullable=True) last_successful_login = Column(DateTime, nullable=True) password_changed_at = Column(DateTime, default=datetime.utcnow) # Timestamps created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) # Relationships profile = relationship("UserProfile", back_populates="user", uselist=False, cascade="all, delete-orphan") login_attempts = relationship("LoginAttempt", back_populates="user", cascade="all, delete-orphan") sessions = relationship("UserSession", back_populates="user", cascade="all, delete-orphan") risk_events = relationship("RiskEvent", back_populates="user", cascade="all, delete-orphan") __table_args__ = ( Index("ix_user_email_active", "email", "is_active"), ) class UserProfile(Base): """User behavioral profile for risk assessment.""" __tablename__ = "adaptiveauth_user_profiles" id = Column(Integer, primary_key=True, index=True) user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE"), unique=True) # Known Devices & Browsers (JSON arrays) known_devices = Column(JSON, default=list) # [{fingerprint, name, first_seen, last_seen}] known_browsers = Column(JSON, default=list) # [{user_agent, first_seen, last_seen}] known_ips = Column(JSON, default=list) # [{ip, location, first_seen, last_seen}] # Login Patterns typical_login_hours = Column(JSON, default=list) # [8, 9, 10, ...] typical hours typical_login_days = Column(JSON, default=list) # [0, 1, 2, ...] typical weekdays average_session_duration = Column(Float, default=0.0) # Risk History risk_score_history = Column(JSON, default=list) # [{timestamp, score, factors}] total_logins = Column(Integer, default=0) successful_logins = Column(Integer, default=0) failed_logins = Column(Integer, default=0) # Timestamps created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) # Relationships user = relationship("User", back_populates="profile") # ======================== AUTHENTICATION MODELS ======================== class LoginAttempt(Base): """Login attempt history for analysis.""" __tablename__ = "adaptiveauth_login_attempts" id = Column(Integer, primary_key=True, index=True) user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE"), nullable=True) email = Column(String(255), index=True) # Store email even if user doesn't exist # Request Context ip_address = Column(String(45)) user_agent = Column(Text) device_fingerprint = Column(String(255), nullable=True) # Geolocation (if available) country = Column(String(100), nullable=True) city = Column(String(100), nullable=True) latitude = Column(Float, nullable=True) longitude = Column(Float, nullable=True) # Risk Assessment risk_score = Column(Float, default=0.0) risk_level = Column(String(20), default=RiskLevel.LOW.value) security_level = Column(Integer, default=0) risk_factors = Column(JSON, default=dict) # Result success = Column(Boolean, default=False) failure_reason = Column(String(255), nullable=True) required_action = Column(String(100), nullable=True) # e.g., "2fa", "email_verify", "blocked" # Timestamps attempted_at = Column(DateTime, default=datetime.utcnow, index=True) # Relationships user = relationship("User", back_populates="login_attempts") __table_args__ = ( Index("ix_login_attempt_user_time", "user_id", "attempted_at"), Index("ix_login_attempt_ip", "ip_address"), ) class UserSession(Base): """Active user sessions with risk monitoring.""" __tablename__ = "adaptiveauth_sessions" id = Column(Integer, primary_key=True, index=True) user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE")) session_token = Column(String(255), unique=True, index=True) # Session Context ip_address = Column(String(45)) user_agent = Column(Text) device_fingerprint = Column(String(255), nullable=True) # Location country = Column(String(100), nullable=True) city = Column(String(100), nullable=True) # Risk Status current_risk_score = Column(Float, default=0.0) current_risk_level = Column(String(20), default=RiskLevel.LOW.value) status = Column(String(20), default=SessionStatus.ACTIVE.value) step_up_completed = Column(Boolean, default=False) # Activity Tracking last_activity = Column(DateTime, default=datetime.utcnow) activity_count = Column(Integer, default=0) # Timestamps created_at = Column(DateTime, default=datetime.utcnow) expires_at = Column(DateTime, nullable=False) # Relationships user = relationship("User", back_populates="sessions") __table_args__ = ( Index("ix_session_user_status", "user_id", "status"), Index("ix_session_token", "session_token"), ) class TokenBlacklist(Base): """Blacklisted/revoked JWT tokens.""" __tablename__ = "adaptiveauth_token_blacklist" id = Column(Integer, primary_key=True, index=True) token = Column(String(500), unique=True, index=True) user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="SET NULL"), nullable=True) reason = Column(String(255), nullable=True) blacklisted_at = Column(DateTime, default=datetime.utcnow) expires_at = Column(DateTime, nullable=True) class PasswordResetCode(Base): """Password reset tokens.""" __tablename__ = "adaptiveauth_password_resets" id = Column(Integer, primary_key=True, index=True) user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE")) email = Column(String(255), index=True) reset_code = Column(String(255), unique=True, index=True) is_used = Column(Boolean, default=False) created_at = Column(DateTime, default=datetime.utcnow) expires_at = Column(DateTime, nullable=False) class EmailVerificationCode(Base): """Email verification codes.""" __tablename__ = "adaptiveauth_email_verifications" id = Column(Integer, primary_key=True, index=True) user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE")) email = Column(String(255), index=True) verification_code = Column(String(255), unique=True, index=True) is_used = Column(Boolean, default=False) created_at = Column(DateTime, default=datetime.utcnow) expires_at = Column(DateTime, nullable=False) # ======================== RISK ASSESSMENT MODELS ======================== class RiskEvent(Base): """Risk events for logging and analysis.""" __tablename__ = "adaptiveauth_risk_events" id = Column(Integer, primary_key=True, index=True) user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE"), nullable=True) # Event Details event_type = Column(String(100), nullable=False) # login, session_activity, suspicious_pattern risk_score = Column(Float, default=0.0) risk_level = Column(String(20), default=RiskLevel.LOW.value) security_level = Column(Integer, default=0) # Context ip_address = Column(String(45)) user_agent = Column(Text, nullable=True) device_fingerprint = Column(String(255), nullable=True) # Risk Details risk_factors = Column(JSON, default=dict) # {factor: score} triggered_rules = Column(JSON, default=list) # [rule_name, ...] # Action Taken action_required = Column(String(100), nullable=True) action_taken = Column(String(100), nullable=True) resolved = Column(Boolean, default=False) # Timestamps created_at = Column(DateTime, default=datetime.utcnow, index=True) resolved_at = Column(DateTime, nullable=True) # Relationships user = relationship("User", back_populates="risk_events") __table_args__ = ( Index("ix_risk_event_user_time", "user_id", "created_at"), Index("ix_risk_event_type", "event_type"), ) class AnomalyPattern(Base): """Detected anomaly patterns for suspicious activity.""" __tablename__ = "adaptiveauth_anomaly_patterns" id = Column(Integer, primary_key=True, index=True) # Pattern Details pattern_type = Column(String(100), nullable=False) # brute_force, impossible_travel, credential_stuffing pattern_data = Column(JSON, default=dict) # Scope user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE"), nullable=True) ip_address = Column(String(45), nullable=True) # Severity severity = Column(String(20), default=RiskLevel.MEDIUM.value) confidence = Column(Float, default=0.0) # 0.0 to 1.0 # Status is_active = Column(Boolean, default=True) false_positive = Column(Boolean, default=False) # Timestamps first_detected = Column(DateTime, default=datetime.utcnow) last_detected = Column(DateTime, default=datetime.utcnow) resolved_at = Column(DateTime, nullable=True) class SessionTrustEvent(Base): """Trust score change events throughout a session's lifecycle. (Feature 1 & 3)""" __tablename__ = "adaptiveauth_trust_events" id = Column(Integer, primary_key=True, index=True) session_id = Column(Integer, ForeignKey("adaptiveauth_sessions.id", ondelete="CASCADE"), index=True) user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE"), index=True) trust_score = Column(Float, nullable=False) # score AFTER this event delta = Column(Float, default=0.0) # how much it changed event_type = Column(String(100), nullable=False) # decay | behavior | context_change | micro_challenge | impossible_travel reason = Column(String(255), nullable=True) signals = Column(JSON, default=dict) # contributing signal values created_at = Column(DateTime, default=datetime.utcnow, index=True) class BehaviorSignalRecord(Base): """Privacy-first behavior signal records. Only aggregated scores are stored. (Feature 2 & 8)""" __tablename__ = "adaptiveauth_behavior_signals" id = Column(Integer, primary_key=True, index=True) session_id = Column(Integer, ForeignKey("adaptiveauth_sessions.id", ondelete="CASCADE"), index=True) user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE"), index=True) # Client-computed signals (0.0–1.0 each). Raw keystrokes/mouse coords are NEVER sent. typing_entropy = Column(Float, nullable=True) # 1.0 = very human-like keystroke rhythm mouse_linearity = Column(Float, nullable=True) # higher = more curved/natural paths scroll_variance = Column(Float, nullable=True) # moderate variance = normal human local_risk_score = Column(Float, nullable=True) # client-side composite (privacy-first) anomaly_score = Column(Float, default=0.0) # server-computed 0–100 recorded_at = Column(DateTime, default=datetime.utcnow, index=True) class StepUpChallenge(Base): """Step-up authentication challenges.""" __tablename__ = "adaptiveauth_stepup_challenges" id = Column(Integer, primary_key=True, index=True) user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE")) session_id = Column(Integer, ForeignKey("adaptiveauth_sessions.id", ondelete="CASCADE"), nullable=True) # Challenge Details challenge_type = Column(String(50), nullable=False) # otp, email, sms, security_question challenge_code = Column(String(255), nullable=True) # Status is_completed = Column(Boolean, default=False) attempts = Column(Integer, default=0) max_attempts = Column(Integer, default=3) # Timestamps created_at = Column(DateTime, default=datetime.utcnow) expires_at = Column(DateTime, nullable=False) completed_at = Column(DateTime, nullable=True)