| |
|
| |
|
| | import os
|
| | import streamlit as st
|
| | from azure.cosmos import CosmosClient, exceptions
|
| | from azure.cosmos.exceptions import CosmosHttpResponseError
|
| | import bcrypt
|
| | import base64
|
| | from ..database.sql_db import (
|
| | get_user,
|
| | get_student_user,
|
| | get_admin_user,
|
| | create_student_user,
|
| | update_student_user,
|
| | delete_student_user,
|
| | record_login,
|
| | record_logout
|
| | )
|
| |
|
| | import logging
|
| |
|
| | from datetime import datetime, timezone
|
| |
|
| | logger = logging.getLogger(__name__)
|
| |
|
| | def clean_and_validate_key(key):
|
| | """Limpia y valida la clave de CosmosDB"""
|
| | key = key.strip()
|
| | while len(key) % 4 != 0:
|
| | key += '='
|
| | try:
|
| | base64.b64decode(key)
|
| | return key
|
| | except:
|
| | raise ValueError("La clave proporcionada no es v谩lida")
|
| |
|
| |
|
| | endpoint = os.getenv("COSMOS_ENDPOINT")
|
| | key = os.getenv("COSMOS_KEY")
|
| |
|
| | if not endpoint or not key:
|
| | raise ValueError("Las variables de entorno COSMOS_ENDPOINT y COSMOS_KEY deben estar configuradas")
|
| |
|
| | key = clean_and_validate_key(key)
|
| |
|
| |
|
| | def authenticate_user(username, password):
|
| | """Autentica un usuario y registra el inicio de sesi贸n"""
|
| | try:
|
| | user_item = get_user(username)
|
| |
|
| | if not user_item:
|
| | logger.warning(f"Usuario no encontrado: {username}")
|
| | return False, None
|
| |
|
| | if verify_password(user_item['password'], password):
|
| | logger.info(f"Usuario autenticado: {username}, Rol: {user_item['role']}")
|
| |
|
| | try:
|
| | session_id = record_login(username)
|
| | if session_id:
|
| | st.session_state.session_id = session_id
|
| | st.session_state.username = username
|
| | st.session_state.login_time = datetime.now(timezone.utc).isoformat()
|
| | logger.info(f"Sesi贸n iniciada: {session_id}")
|
| | else:
|
| | logger.warning("No se pudo registrar la sesi贸n")
|
| | except Exception as e:
|
| | logger.error(f"Error al registrar inicio de sesi贸n: {str(e)}")
|
| |
|
| | return True, user_item['role']
|
| |
|
| | logger.warning(f"Contrase帽a incorrecta para usuario: {username}")
|
| | return False, None
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Error durante la autenticaci贸n del usuario: {str(e)}")
|
| | return False, None
|
| |
|
| | def authenticate_student(username, password):
|
| | """Autentica un estudiante"""
|
| | success, role = authenticate_user(username, password)
|
| | if success and role == 'Estudiante':
|
| | return True, role
|
| | return False, None
|
| |
|
| | def authenticate_admin(username, password):
|
| | """Autentica un administrador"""
|
| | success, role = authenticate_user(username, password)
|
| | if success and role == 'Administrador':
|
| | return True, role
|
| | return False, None
|
| |
|
| | def register_student(username, password, additional_info=None):
|
| | """Registra un nuevo estudiante"""
|
| | try:
|
| | if get_student_user(username):
|
| | logger.warning(f"Estudiante ya existe: {username}")
|
| | return False
|
| |
|
| | hashed_password = hash_password(password)
|
| |
|
| |
|
| | if not additional_info:
|
| | additional_info = {}
|
| | additional_info['role'] = 'Estudiante'
|
| |
|
| | success = create_student_user(username, hashed_password, additional_info)
|
| | if success:
|
| | logger.info(f"Nuevo estudiante registrado: {username}")
|
| | return True
|
| |
|
| | logger.error(f"Error al crear estudiante: {username}")
|
| | return False
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Error al registrar estudiante: {str(e)}")
|
| | return False
|
| |
|
| | def update_student_info(username, new_info):
|
| | """Actualiza la informaci贸n de un estudiante"""
|
| | try:
|
| | if 'password' in new_info:
|
| | new_info['password'] = hash_password(new_info['password'])
|
| |
|
| | success = update_student_user(username, new_info)
|
| | if success:
|
| | logger.info(f"Informaci贸n actualizada: {username}")
|
| | return True
|
| |
|
| | logger.error(f"Error al actualizar: {username}")
|
| | return False
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Error en actualizaci贸n: {str(e)}")
|
| | return False
|
| |
|
| | def delete_student(username):
|
| | """Elimina un estudiante"""
|
| | try:
|
| | success = delete_student_user(username)
|
| | if success:
|
| | logger.info(f"Estudiante eliminado: {username}")
|
| | return True
|
| |
|
| | logger.error(f"Error al eliminar: {username}")
|
| | return False
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Error en eliminaci贸n: {str(e)}")
|
| | return False
|
| |
|
| | def logout():
|
| | """Cierra la sesi贸n del usuario"""
|
| | try:
|
| | if 'session_id' in st.session_state and 'username' in st.session_state:
|
| | success = record_logout(
|
| | st.session_state.username,
|
| | st.session_state.session_id
|
| | )
|
| | if success:
|
| | logger.info(f"Sesi贸n cerrada: {st.session_state.username}")
|
| | else:
|
| | logger.warning(f"Error al registrar cierre de sesi贸n: {st.session_state.username}")
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Error en logout: {str(e)}")
|
| | finally:
|
| | st.session_state.clear()
|
| |
|
| | def hash_password(password):
|
| | """Hashea una contrase帽a"""
|
| | return bcrypt.hashpw(
|
| | password.encode('utf-8'),
|
| | bcrypt.gensalt()
|
| | ).decode('utf-8')
|
| |
|
| | def verify_password(stored_password, provided_password):
|
| | """Verifica una contrase帽a"""
|
| | return bcrypt.checkpw(
|
| | provided_password.encode('utf-8'),
|
| | stored_password.encode('utf-8')
|
| | )
|
| |
|
| | __all__ = [
|
| | 'authenticate_user',
|
| | 'authenticate_admin',
|
| | 'authenticate_student',
|
| | 'register_student',
|
| | 'update_student_info',
|
| | 'delete_student',
|
| | 'logout',
|
| | 'hash_password',
|
| | 'verify_password'
|
| | ] |