Spaces:
Running
Running
| """ | |
| AdaptiveAuth Database Models | |
| All SQLAlchemy models for the authentication framework. | |
| """ | |
| import enum | |
| from datetime import datetime | |
| from sqlalchemy import ( | |
| Boolean, Column, Integer, String, DateTime, Float, | |
| ForeignKey, Text, JSON, Enum, Index | |
| ) | |
| from sqlalchemy.orm import relationship, declarative_base | |
| Base = declarative_base() | |
| class UserRole(str, enum.Enum): | |
| """User role enumeration.""" | |
| USER = "user" | |
| ADMIN = "admin" | |
| SUPERADMIN = "superadmin" | |
| class RiskLevel(str, enum.Enum): | |
| """Risk level enumeration for security assessment.""" | |
| LOW = "low" | |
| MEDIUM = "medium" | |
| HIGH = "high" | |
| CRITICAL = "critical" | |
| class SecurityLevel(int, enum.Enum): | |
| """Security level (0-4) based on risk assessment.""" | |
| LEVEL_0 = 0 # Known device + IP + browser - minimal auth | |
| LEVEL_1 = 1 # Unknown browser - password only | |
| LEVEL_2 = 2 # Unknown IP - password + email verification | |
| LEVEL_3 = 3 # Unknown device - password + 2FA | |
| LEVEL_4 = 4 # Suspicious pattern - blocked/full verification | |
| class SessionStatus(str, enum.Enum): | |
| """Session status enumeration.""" | |
| ACTIVE = "active" | |
| EXPIRED = "expired" | |
| REVOKED = "revoked" | |
| SUSPICIOUS = "suspicious" | |
| # ======================== USER MODELS ======================== | |
| class User(Base): | |
| """Main user model with authentication data.""" | |
| __tablename__ = "adaptiveauth_users" | |
| id = Column(Integer, primary_key=True, index=True) | |
| email = Column(String(255), unique=True, index=True, nullable=False) | |
| full_name = Column(String(255), nullable=True) | |
| password_hash = Column(String(255), nullable=False) | |
| role = Column(String(50), default=UserRole.USER.value) | |
| # Account Status | |
| is_active = Column(Boolean, default=True) | |
| is_verified = Column(Boolean, default=False) | |
| is_locked = Column(Boolean, default=False) | |
| locked_until = Column(DateTime, nullable=True) | |
| # 2FA Settings | |
| tfa_enabled = Column(Boolean, default=False) | |
| tfa_secret = Column(String(255), nullable=True) | |
| # Security Tracking | |
| failed_login_attempts = Column(Integer, default=0) | |
| last_failed_login = Column(DateTime, nullable=True) | |
| last_successful_login = Column(DateTime, nullable=True) | |
| password_changed_at = Column(DateTime, default=datetime.utcnow) | |
| # Timestamps | |
| created_at = Column(DateTime, default=datetime.utcnow) | |
| updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) | |
| # Relationships | |
| profile = relationship("UserProfile", back_populates="user", uselist=False, cascade="all, delete-orphan") | |
| login_attempts = relationship("LoginAttempt", back_populates="user", cascade="all, delete-orphan") | |
| sessions = relationship("UserSession", back_populates="user", cascade="all, delete-orphan") | |
| risk_events = relationship("RiskEvent", back_populates="user", cascade="all, delete-orphan") | |
| __table_args__ = ( | |
| Index("ix_user_email_active", "email", "is_active"), | |
| ) | |
| class UserProfile(Base): | |
| """User behavioral profile for risk assessment.""" | |
| __tablename__ = "adaptiveauth_user_profiles" | |
| id = Column(Integer, primary_key=True, index=True) | |
| user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE"), unique=True) | |
| # Known Devices & Browsers (JSON arrays) | |
| known_devices = Column(JSON, default=list) # [{fingerprint, name, first_seen, last_seen}] | |
| known_browsers = Column(JSON, default=list) # [{user_agent, first_seen, last_seen}] | |
| known_ips = Column(JSON, default=list) # [{ip, location, first_seen, last_seen}] | |
| # Login Patterns | |
| typical_login_hours = Column(JSON, default=list) # [8, 9, 10, ...] typical hours | |
| typical_login_days = Column(JSON, default=list) # [0, 1, 2, ...] typical weekdays | |
| average_session_duration = Column(Float, default=0.0) | |
| # Risk History | |
| risk_score_history = Column(JSON, default=list) # [{timestamp, score, factors}] | |
| total_logins = Column(Integer, default=0) | |
| successful_logins = Column(Integer, default=0) | |
| failed_logins = Column(Integer, default=0) | |
| # Timestamps | |
| created_at = Column(DateTime, default=datetime.utcnow) | |
| updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) | |
| # Relationships | |
| user = relationship("User", back_populates="profile") | |
| # ======================== AUTHENTICATION MODELS ======================== | |
| class LoginAttempt(Base): | |
| """Login attempt history for analysis.""" | |
| __tablename__ = "adaptiveauth_login_attempts" | |
| id = Column(Integer, primary_key=True, index=True) | |
| user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE"), nullable=True) | |
| email = Column(String(255), index=True) # Store email even if user doesn't exist | |
| # Request Context | |
| ip_address = Column(String(45)) | |
| user_agent = Column(Text) | |
| device_fingerprint = Column(String(255), nullable=True) | |
| # Geolocation (if available) | |
| country = Column(String(100), nullable=True) | |
| city = Column(String(100), nullable=True) | |
| latitude = Column(Float, nullable=True) | |
| longitude = Column(Float, nullable=True) | |
| # Risk Assessment | |
| risk_score = Column(Float, default=0.0) | |
| risk_level = Column(String(20), default=RiskLevel.LOW.value) | |
| security_level = Column(Integer, default=0) | |
| risk_factors = Column(JSON, default=dict) | |
| # Result | |
| success = Column(Boolean, default=False) | |
| failure_reason = Column(String(255), nullable=True) | |
| required_action = Column(String(100), nullable=True) # e.g., "2fa", "email_verify", "blocked" | |
| # Timestamps | |
| attempted_at = Column(DateTime, default=datetime.utcnow, index=True) | |
| # Relationships | |
| user = relationship("User", back_populates="login_attempts") | |
| __table_args__ = ( | |
| Index("ix_login_attempt_user_time", "user_id", "attempted_at"), | |
| Index("ix_login_attempt_ip", "ip_address"), | |
| ) | |
| class UserSession(Base): | |
| """Active user sessions with risk monitoring.""" | |
| __tablename__ = "adaptiveauth_sessions" | |
| id = Column(Integer, primary_key=True, index=True) | |
| user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE")) | |
| session_token = Column(String(255), unique=True, index=True) | |
| # Session Context | |
| ip_address = Column(String(45)) | |
| user_agent = Column(Text) | |
| device_fingerprint = Column(String(255), nullable=True) | |
| # Location | |
| country = Column(String(100), nullable=True) | |
| city = Column(String(100), nullable=True) | |
| # Risk Status | |
| current_risk_score = Column(Float, default=0.0) | |
| current_risk_level = Column(String(20), default=RiskLevel.LOW.value) | |
| status = Column(String(20), default=SessionStatus.ACTIVE.value) | |
| step_up_completed = Column(Boolean, default=False) | |
| # Activity Tracking | |
| last_activity = Column(DateTime, default=datetime.utcnow) | |
| activity_count = Column(Integer, default=0) | |
| # Timestamps | |
| created_at = Column(DateTime, default=datetime.utcnow) | |
| expires_at = Column(DateTime, nullable=False) | |
| # Relationships | |
| user = relationship("User", back_populates="sessions") | |
| __table_args__ = ( | |
| Index("ix_session_user_status", "user_id", "status"), | |
| Index("ix_session_token", "session_token"), | |
| ) | |
| class TokenBlacklist(Base): | |
| """Blacklisted/revoked JWT tokens.""" | |
| __tablename__ = "adaptiveauth_token_blacklist" | |
| id = Column(Integer, primary_key=True, index=True) | |
| token = Column(String(500), unique=True, index=True) | |
| user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="SET NULL"), nullable=True) | |
| reason = Column(String(255), nullable=True) | |
| blacklisted_at = Column(DateTime, default=datetime.utcnow) | |
| expires_at = Column(DateTime, nullable=True) | |
| class PasswordResetCode(Base): | |
| """Password reset tokens.""" | |
| __tablename__ = "adaptiveauth_password_resets" | |
| id = Column(Integer, primary_key=True, index=True) | |
| user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE")) | |
| email = Column(String(255), index=True) | |
| reset_code = Column(String(255), unique=True, index=True) | |
| is_used = Column(Boolean, default=False) | |
| created_at = Column(DateTime, default=datetime.utcnow) | |
| expires_at = Column(DateTime, nullable=False) | |
| class EmailVerificationCode(Base): | |
| """Email verification codes.""" | |
| __tablename__ = "adaptiveauth_email_verifications" | |
| id = Column(Integer, primary_key=True, index=True) | |
| user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE")) | |
| email = Column(String(255), index=True) | |
| verification_code = Column(String(255), unique=True, index=True) | |
| is_used = Column(Boolean, default=False) | |
| created_at = Column(DateTime, default=datetime.utcnow) | |
| expires_at = Column(DateTime, nullable=False) | |
| # ======================== RISK ASSESSMENT MODELS ======================== | |
| class RiskEvent(Base): | |
| """Risk events for logging and analysis.""" | |
| __tablename__ = "adaptiveauth_risk_events" | |
| id = Column(Integer, primary_key=True, index=True) | |
| user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE"), nullable=True) | |
| # Event Details | |
| event_type = Column(String(100), nullable=False) # login, session_activity, suspicious_pattern | |
| risk_score = Column(Float, default=0.0) | |
| risk_level = Column(String(20), default=RiskLevel.LOW.value) | |
| security_level = Column(Integer, default=0) | |
| # Context | |
| ip_address = Column(String(45)) | |
| user_agent = Column(Text, nullable=True) | |
| device_fingerprint = Column(String(255), nullable=True) | |
| # Risk Details | |
| risk_factors = Column(JSON, default=dict) # {factor: score} | |
| triggered_rules = Column(JSON, default=list) # [rule_name, ...] | |
| # Action Taken | |
| action_required = Column(String(100), nullable=True) | |
| action_taken = Column(String(100), nullable=True) | |
| resolved = Column(Boolean, default=False) | |
| # Timestamps | |
| created_at = Column(DateTime, default=datetime.utcnow, index=True) | |
| resolved_at = Column(DateTime, nullable=True) | |
| # Relationships | |
| user = relationship("User", back_populates="risk_events") | |
| __table_args__ = ( | |
| Index("ix_risk_event_user_time", "user_id", "created_at"), | |
| Index("ix_risk_event_type", "event_type"), | |
| ) | |
| class AnomalyPattern(Base): | |
| """Detected anomaly patterns for suspicious activity.""" | |
| __tablename__ = "adaptiveauth_anomaly_patterns" | |
| id = Column(Integer, primary_key=True, index=True) | |
| # Pattern Details | |
| pattern_type = Column(String(100), nullable=False) # brute_force, impossible_travel, credential_stuffing | |
| pattern_data = Column(JSON, default=dict) | |
| # Scope | |
| user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE"), nullable=True) | |
| ip_address = Column(String(45), nullable=True) | |
| # Severity | |
| severity = Column(String(20), default=RiskLevel.MEDIUM.value) | |
| confidence = Column(Float, default=0.0) # 0.0 to 1.0 | |
| # Status | |
| is_active = Column(Boolean, default=True) | |
| false_positive = Column(Boolean, default=False) | |
| # Timestamps | |
| first_detected = Column(DateTime, default=datetime.utcnow) | |
| last_detected = Column(DateTime, default=datetime.utcnow) | |
| resolved_at = Column(DateTime, nullable=True) | |
| class SessionTrustEvent(Base): | |
| """Trust score change events throughout a session's lifecycle. (Feature 1 & 3)""" | |
| __tablename__ = "adaptiveauth_trust_events" | |
| id = Column(Integer, primary_key=True, index=True) | |
| session_id = Column(Integer, ForeignKey("adaptiveauth_sessions.id", ondelete="CASCADE"), index=True) | |
| user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE"), index=True) | |
| trust_score = Column(Float, nullable=False) # score AFTER this event | |
| delta = Column(Float, default=0.0) # how much it changed | |
| event_type = Column(String(100), nullable=False) # decay | behavior | context_change | micro_challenge | impossible_travel | |
| reason = Column(String(255), nullable=True) | |
| signals = Column(JSON, default=dict) # contributing signal values | |
| created_at = Column(DateTime, default=datetime.utcnow, index=True) | |
| class BehaviorSignalRecord(Base): | |
| """Privacy-first behavior signal records. Only aggregated scores are stored. (Feature 2 & 8)""" | |
| __tablename__ = "adaptiveauth_behavior_signals" | |
| id = Column(Integer, primary_key=True, index=True) | |
| session_id = Column(Integer, ForeignKey("adaptiveauth_sessions.id", ondelete="CASCADE"), index=True) | |
| user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE"), index=True) | |
| # Client-computed signals (0.0–1.0 each). Raw keystrokes/mouse coords are NEVER sent. | |
| typing_entropy = Column(Float, nullable=True) # 1.0 = very human-like keystroke rhythm | |
| mouse_linearity = Column(Float, nullable=True) # higher = more curved/natural paths | |
| scroll_variance = Column(Float, nullable=True) # moderate variance = normal human | |
| local_risk_score = Column(Float, nullable=True) # client-side composite (privacy-first) | |
| anomaly_score = Column(Float, default=0.0) # server-computed 0–100 | |
| recorded_at = Column(DateTime, default=datetime.utcnow, index=True) | |
| class StepUpChallenge(Base): | |
| """Step-up authentication challenges.""" | |
| __tablename__ = "adaptiveauth_stepup_challenges" | |
| id = Column(Integer, primary_key=True, index=True) | |
| user_id = Column(Integer, ForeignKey("adaptiveauth_users.id", ondelete="CASCADE")) | |
| session_id = Column(Integer, ForeignKey("adaptiveauth_sessions.id", ondelete="CASCADE"), nullable=True) | |
| # Challenge Details | |
| challenge_type = Column(String(50), nullable=False) # otp, email, sms, security_question | |
| challenge_code = Column(String(255), nullable=True) | |
| # Status | |
| is_completed = Column(Boolean, default=False) | |
| attempts = Column(Integer, default=0) | |
| max_attempts = Column(Integer, default=3) | |
| # Timestamps | |
| created_at = Column(DateTime, default=datetime.utcnow) | |
| expires_at = Column(DateTime, nullable=False) | |
| completed_at = Column(DateTime, nullable=True) | |