Spaces:
Paused
Paused
| """ | |
| User and Team Models | |
| Contains User, Team, Project, and related models for user management | |
| and project organization. | |
| """ | |
| import json | |
| import uuid | |
| from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, String | |
| from sqlalchemy.orm import relationship | |
| from .base import Base, EncryptedString, UserRole, utc_now | |
class User(Base):
    """Application account, mapped to the ``users`` table.

    Carries the login identity (username, email, password hash), the role
    and activation flag, MFA settings, and a JSON-encoded preferences
    string. ``UserSession`` and ``LoginAttempt`` add ``sessions`` and
    ``login_attempts`` attributes to this class through their backrefs.
    """

    __tablename__ = "users"

    # --- Identity -----------------------------------------------------
    # Primary key: a fresh UUID4, stored in its string form.
    id = Column(
        String,
        default=lambda: str(uuid.uuid4()),
        primary_key=True,
    )
    username = Column(String, nullable=False, unique=True, index=True)
    # EncryptedString is imported from .base; presumably the value is
    # encrypted before it is stored -- TODO confirm.
    email = Column(EncryptedString, nullable=False, unique=True)
    full_name = Column(EncryptedString, nullable=False)
    password_hash = Column(String, nullable=False)

    # --- Account state ------------------------------------------------
    role = Column(String, default=UserRole.INVESTIGATOR)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=utc_now)
    last_login = Column(DateTime)

    # --- Multi-factor authentication ----------------------------------
    mfa_enabled = Column(Boolean, default=False)
    mfa_secret = Column(EncryptedString, nullable=True)

    # --- Preferences --------------------------------------------------
    # Kept as a JSON string (not a JSON column) for compatibility; new rows
    # start with the serialized empty object.
    preferences = Column(String, default=lambda: json.dumps({}))

    # --- Relationships ------------------------------------------------
    # Case may reference users through more than one column, so the join
    # column is named explicitly.
    cases = relationship(
        "Case",
        foreign_keys="Case.assignee_id",
        back_populates="assignee",
    )
    activities = relationship("CaseActivity", back_populates="user")
    # The backref gives UserDevice instances a ``user`` attribute.
    devices = relationship("UserDevice", backref="user")
    # NOTE(review): UserOnboardingState uses user_id as its primary key, so a
    # user can have at most one row, yet this relationship is list-valued
    # (no uselist=False). Check callers before changing it.
    onboarding_state = relationship("UserOnboardingState", backref="user")
class Team(Base):
    """Named group of users, mapped to the ``teams`` table."""

    __tablename__ = "teams"

    # Primary key: a fresh UUID4, stored in its string form.
    id = Column(
        String,
        default=lambda: str(uuid.uuid4()),
        primary_key=True,
    )
    name = Column(String, nullable=False)
    description = Column(String)
    created_at = Column(DateTime, default=utc_now)

    # No ``members`` relationship is declared here: it was removed because
    # the association table it would need is missing.
class Project(Base):
    """Container that groups cases, mapped to the ``projects`` table."""

    __tablename__ = "projects"

    # Primary key: a fresh UUID4, stored in its string form.
    id = Column(
        String,
        default=lambda: str(uuid.uuid4()),
        primary_key=True,
    )
    name = Column(String, nullable=False, index=True)
    description = Column(String)
    created_at = Column(DateTime, default=utc_now)
    # Id of the user that created the project (references users.id).
    created_by = Column(String, ForeignKey("users.id"))

    # Cases attached to this project; Case.project is the reverse side.
    cases = relationship("Case", back_populates="project")
class UserDevice(Base):
    """Device associated with a user, mapped to ``user_devices``.

    ``User.devices`` declares a ``user`` backref, so instances also expose
    a ``user`` attribute pointing at the owning account.
    """

    __tablename__ = "user_devices"

    # Primary key: a fresh UUID4, stored in its string form.
    # NOTE(review): index=True on a primary-key column requests an index in
    # addition to the primary key itself; probably redundant -- confirm
    # before touching the schema.
    id = Column(
        String,
        default=lambda: str(uuid.uuid4()),
        primary_key=True,
        index=True,
    )
    user_id = Column(String, ForeignKey("users.id"), index=True)

    # Descriptive attributes of the device.
    device_name = Column(String)
    device_type = Column(String)
    ip_address = Column(String)

    # Trust flag and timestamps; both timestamps default to utc_now.
    last_login = Column(DateTime, default=utc_now)
    is_trusted = Column(Boolean, default=False)
    created_at = Column(DateTime, default=utc_now)
class UserSession(Base):
    """Login session of a user, mapped to the ``user_sessions`` table.

    Each row stores a unique session token together with client details
    (device info, IP address, user agent, location) and lifecycle fields
    (active/current flags, expiry, last activity).
    """

    __tablename__ = "user_sessions"

    # Primary key: a fresh UUID4, stored in its string form.
    id = Column(
        String,
        default=lambda: str(uuid.uuid4()),
        primary_key=True,
        index=True,
    )
    user_id = Column(String, ForeignKey("users.id"), nullable=False, index=True)
    session_token = Column(String, nullable=False, unique=True, index=True)

    # Client details.
    device_info = Column(String)  # JSON string with device/browser info
    ip_address = Column(String, nullable=False)
    user_agent = Column(String)
    location = Column(String)  # geolocation data

    # Lifecycle.
    is_active = Column(Boolean, default=True, index=True)
    is_current = Column(Boolean, default=False)  # marks the current session
    expires_at = Column(DateTime, index=True)
    last_activity = Column(DateTime, default=utc_now, index=True)
    created_at = Column(DateTime, default=utc_now)

    # Owning account; the backref adds ``User.sessions``.
    user = relationship("User", backref="sessions")

    # NOTE(review): expires_at already has index=True above, so
    # idx_user_sessions_expires indexes the same single column a second
    # time. Left as-is because removing either one changes the schema.
    __table_args__ = (
        Index("idx_user_sessions_active", "user_id", "is_active"),
        Index("idx_user_sessions_expires", "expires_at"),
    )
class LoginAttempt(Base):
    """Record of one login attempt, mapped to ``login_attempts``.

    ``user_id`` is nullable and the submitted email is stored separately so
    that attempts against non-existent accounts can still be recorded.
    """

    __tablename__ = "login_attempts"

    # Primary key: a fresh UUID4, stored in its string form.
    id = Column(
        String,
        default=lambda: str(uuid.uuid4()),
        primary_key=True,
        index=True,
    )

    # Who tried to log in. The email is kept even when no user matches.
    # NOTE(review): this is a plain String, whereas User.email uses
    # EncryptedString -- confirm that storing it unencrypted is intended.
    email = Column(String, index=True)
    user_id = Column(String, ForeignKey("users.id"), nullable=True, index=True)

    # Where the attempt came from.
    ip_address = Column(String, nullable=False, index=True)
    user_agent = Column(String)
    location = Column(String)
    device_type = Column(String, default="desktop")

    # Outcome and timing.
    success = Column(Boolean, default=False, index=True)
    failure_reason = Column(String)
    attempted_at = Column(DateTime, default=utc_now, index=True)

    # Matching account, if any; the backref adds ``User.login_attempts``.
    user = relationship("User", backref="login_attempts")

    # Composite indexes pairing the user and the IP address with the
    # attempt time.
    __table_args__ = (
        Index("idx_login_attempts_user_time", "user_id", "attempted_at"),
        Index("idx_login_attempts_ip_time", "ip_address", "attempted_at"),
    )
class UserOnboardingState(Base):
    """Onboarding progress of a user, mapped to ``user_onboarding_states``.

    The table is keyed by ``user_id``, so it holds at most one row per
    user. ``User.onboarding_state`` declares a ``user`` backref, which gives
    instances a ``user`` attribute.
    """

    __tablename__ = "user_onboarding_states"

    # Primary key and foreign key to users.id at the same time.
    user_id = Column(String, ForeignKey("users.id"), primary_key=True)
    # Checklist kept as a JSON string; new rows start with an empty object.
    checklist_state = Column(String, default=lambda: json.dumps({}))
    completed_at = Column(DateTime, nullable=True)
    # Set from utc_now on insert and again on every update.
    updated_at = Column(DateTime, onupdate=utc_now, default=utc_now)
class RookieChecklist(Base):
    """Checklist attached to a user, mapped to ``rookie_checklists``.

    ``items`` and ``extra_metadata`` hold JSON text in ``EncryptedString``
    columns and default to an empty list and an empty object respectively.
    ``user_id`` is a plain indexed string here, not a declared foreign key.
    """

    __tablename__ = "rookie_checklists"

    # Generate a UUID4 string when the caller supplies no id, matching the
    # other models in this module; an explicitly passed id is still used.
    id = Column(
        String,
        primary_key=True,
        index=True,
        default=lambda: str(uuid.uuid4()),
    )
    # NOTE(review): indexing an EncryptedString column only helps lookups if
    # the stored form is deterministic -- confirm against .base.
    user_email = Column(EncryptedString, index=True)
    user_id = Column(String, index=True)
    items = Column(EncryptedString, default=lambda: json.dumps([]))
    extra_metadata = Column(EncryptedString, default=lambda: json.dumps({}))
    created_at = Column(DateTime, default=utc_now, index=True)

    # NOTE(review): user_email and created_at already carry index=True, so
    # both entries below index the same single columns a second time. Left
    # untouched because dropping either copy changes the schema.
    __table_args__ = (
        Index("idx_rookie_user_email", "user_email"),
        Index("idx_rookie_created", "created_at"),
    )
| # Training and compliance models | |
class TrainingRecord(Base):
    """Training course record for a user, mapped to ``training_records``.

    Tracks which module was taken, its completion status and score, the
    completion and expiry dates, and certificate details. ``user_id`` is a
    plain indexed string here, not a declared foreign key.
    """

    __tablename__ = "training_records"

    # Generate a UUID4 string when the caller supplies no id, matching the
    # other models in this module; an explicitly passed id is still used.
    id = Column(
        String,
        primary_key=True,
        index=True,
        default=lambda: str(uuid.uuid4()),
    )
    user_id = Column(String, index=True)

    # What was taken.
    training_type = Column(
        String, index=True
    )  # security_awareness, compliance, technical
    training_module = Column(String, index=True)  # specific course or topic

    # Progress and result.
    completion_status = Column(
        String, default="not_started", index=True
    )  # not_started, in_progress, completed, failed
    score = Column(Float)  # test score, if applicable
    completion_date = Column(DateTime, index=True)
    expiry_date = Column(DateTime, index=True)

    # Certificate and provider details.
    certificate_issued = Column(Boolean, default=False)
    certificate_id = Column(String)
    training_provider = Column(String, index=True)
    training_duration_hours = Column(Float)

    # created_at is set on insert; updated_at is also refreshed on update.
    created_at = Column(DateTime, default=utc_now)
    updated_at = Column(DateTime, default=utc_now, onupdate=utc_now)

    # NOTE(review): expiry_date already carries index=True, so
    # idx_training_expiry indexes the same single column a second time. Left
    # untouched because dropping either copy changes the schema.
    __table_args__ = (
        Index("idx_training_user_type", "user_id", "training_type"),
        Index("idx_training_completion", "completion_status", "completion_date"),
        Index("idx_training_expiry", "expiry_date"),
    )
class AccessReview(Base):
    """Periodic access review of a user, mapped to ``access_reviews``.

    Stores the review period, its status and risk assessment, JSON-encoded
    findings and recommendations, and the approval outcome. ``user_id``,
    ``reviewer_id`` and ``approved_by`` are plain strings here, not declared
    foreign keys.
    """

    __tablename__ = "access_reviews"

    # Generate a UUID4 string when the caller supplies no id, matching the
    # other models in this module; an explicitly passed id is still used.
    id = Column(
        String,
        primary_key=True,
        index=True,
        default=lambda: str(uuid.uuid4()),
    )
    user_id = Column(String, index=True)
    reviewer_id = Column(String, index=True)

    # Period covered by the review.
    review_period_start = Column(DateTime, index=True)
    review_period_end = Column(DateTime, index=True)

    # Status and assessment.
    review_status = Column(
        String, default="pending", index=True
    )  # pending, in_progress, completed, overdue
    overall_risk_assessment = Column(String, index=True)  # low, medium, high, critical

    # Both columns hold JSON text and start as a serialized empty list.
    findings = Column(
        String, default=lambda: json.dumps([])
    )  # specific access issues found
    recommendations = Column(
        String, default=lambda: json.dumps([])
    )  # remediation actions

    # Approval outcome.
    approval_status = Column(String, default="pending")  # pending, approved, rejected
    approved_by = Column(String)
    approved_at = Column(DateTime)

    next_review_date = Column(DateTime, index=True)

    # created_at is set on insert; updated_at is also refreshed on update.
    created_at = Column(DateTime, default=utc_now)
    updated_at = Column(DateTime, default=utc_now, onupdate=utc_now)
# Names exported by ``from <module> import *``; order is kept as declared.
__all__ = [
    # Accounts and organisation
    "User",
    "Team",
    "Project",
    # Devices, sessions and login auditing
    "UserDevice",
    "UserSession",
    "LoginAttempt",
    # Onboarding
    "UserOnboardingState",
    "RookieChecklist",
    # Training and compliance
    "TrainingRecord",
    "AccessReview",
]