File size: 3,838 Bytes
92add6c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
import hashlib
import secrets
import time
from typing import Optional
from sqlalchemy.orm import Session
from .models import UserSession, User
def generate_session_hash(user_id: int, session_name: str) -> str:
"""
Genera un hash único para una sesión basado en:
- ID del usuario
- Nombre de la sesión
- Timestamp actual
- Salt aleatorio
"""
# Crear un salt aleatorio
salt = secrets.token_hex(16)
# Crear string único combinando datos
unique_string = f"{user_id}:{session_name}:{time.time()}:{salt}"
# Generar hash SHA-256
session_hash = hashlib.sha256(unique_string.encode()).hexdigest()
return session_hash
def create_private_session(
db: Session,
user_id: int,
session_name: str
) -> UserSession:
"""
Crea una nueva sesión privada con hash único
"""
# Generar hash único
session_hash = generate_session_hash(user_id, session_name)
# Verificar que el hash sea único (muy improbable que se repita, pero por seguridad)
while db.query(UserSession).filter(UserSession.session_hash == session_hash).first():
session_hash = generate_session_hash(user_id, session_name)
# Crear la sesión
new_session = UserSession(
session_name=session_name,
session_hash=session_hash,
user_id=user_id,
is_active=True
)
db.add(new_session)
db.commit()
db.refresh(new_session)
return new_session
def verify_session_access(
db: Session,
session_hash: str,
user_id: Optional[int] = None
) -> Optional[UserSession]:
"""
Verifica el acceso a una sesión mediante hash.
Args:
db: Sesión de base de datos
session_hash: Hash de la sesión
user_id: ID del usuario (opcional, para verificación adicional)
Returns:
UserSession si el acceso es válido, None si no
"""
query = db.query(UserSession).filter(
UserSession.session_hash == session_hash,
UserSession.is_active == True
)
# Si se proporciona user_id, verificar que coincida
if user_id is not None:
query = query.filter(UserSession.user_id == user_id)
return query.first()
def get_session_by_hash(db: Session, session_hash: str) -> Optional[UserSession]:
"""
Obtiene una sesión por su hash
"""
return db.query(UserSession).filter(
UserSession.session_hash == session_hash,
UserSession.is_active == True
).first()
def get_user_sessions(db: Session, user_id: int) -> list[UserSession]:
"""
Obtiene todas las sesiones activas de un usuario
"""
return db.query(UserSession).filter(
UserSession.user_id == user_id,
UserSession.is_active == True
).all()
def deactivate_session(db: Session, session_hash: str, user_id: int) -> bool:
"""
Desactiva una sesión (solo el propietario puede hacerlo)
"""
session = db.query(UserSession).filter(
UserSession.session_hash == session_hash,
UserSession.user_id == user_id,
UserSession.is_active == True
).first()
if session:
session.is_active = False
db.commit()
return True
return False
def is_session_owner(db: Session, session_hash: str, user_id: int) -> bool:
"""
Verifica si un usuario es propietario de una sesión
"""
session = db.query(UserSession).filter(
UserSession.session_hash == session_hash,
UserSession.user_id == user_id,
UserSession.is_active == True
).first()
return session is not None
def generate_session_url(session_hash: str, base_url: str = "http://localhost:8002") -> str:
"""
Genera una URL para acceder a una sesión específica
"""
return f"{base_url}/session/{session_hash}"
|