import eventlet eventlet.monkey_patch() # Parchea librerías estándar para compatibilidad con eventlet (necesario para Socket.IO) import logging # Silenciar logs de error de socket.io que son ruido (Bad file descriptor) logging.getLogger('engineio.server').setLevel(logging.CRITICAL) logging.getLogger('socketio.server').setLevel(logging.CRITICAL) import os import fcntl import json import uuid import threading import time import base64 import requests import datetime from flask import Flask, render_template, request, jsonify, redirect, url_for, flash from flask_socketio import SocketIO, emit from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user from werkzeug.security import generate_password_hash, check_password_hash import gitlab import telebot from telebot import types import markdown from dotenv import load_dotenv import resend # Importar Hugging Face Datasets para persistencia try: from datasets import Dataset, load_dataset from huggingface_hub import HfApi HF_AVAILABLE = True except ImportError: HF_AVAILABLE = False print("[WARNING] huggingface_hub/datasets not available. Using local JSON storage only.") # Cargar variables de entorno desde .env load_dotenv() # --- CONFIGURACIÓN DE LA APLICACIÓN --- app = Flask(__name__) app.config['SECRET_KEY'] = os.getenv("SECRET_KEY", "maker-secret-key") socketio = SocketIO(app, cors_allowed_origins="*") # DEBUG: Verificar si los modelos existen try: model_path = os.path.join(app.root_path, 'static', 'models') if os.path.exists(model_path): print(f"[DEBUG] Models directory found at: {model_path}") print(f"[DEBUG] Files: {os.listdir(model_path)}") else: print(f"[DEBUG] ERROR: Models directory NOT found at {model_path}") except Exception as e: print(f"[DEBUG] Error checking models: {e}") # --- GESTIÓN DE USUARIOS (Flask-Login) --- login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login' login_manager.login_message = "Por favor, inicia sesión para acceder." login_manager.login_message_category = "red" # --- GESTOR DE DATASETS DE HUGGING FACE --- class HFDatasetManager: """Clase base para manejar persistencia con Hugging Face Datasets.""" def __init__(self, dataset_name, data_key, local_filename): self.dataset_name = os.getenv(dataset_name) self.data_key = data_key self.hf_token = os.getenv("HF_TOKEN") # Log para diagnóstico de variables de entorno (solo nombres para seguridad) hf_vars = [k for k in os.environ.keys() if k.startswith("HF_")] print(f"[{self.__class__.__name__}] DEBUG - HF Vars detectadas: {hf_vars}") self.use_hf = HF_AVAILABLE and self.dataset_name and self.hf_token # Configurar almacenamiento local como fallback data_dir = os.getenv("DATA_DIR", "data") if not os.path.isabs(data_dir): data_dir = os.path.join(os.getcwd(), data_dir) os.makedirs(data_dir, exist_ok=True) self.local_file = os.path.join(data_dir, local_filename) print(f"[{self.__class__.__name__}] HF Datasets: {'ENABLED' if self.use_hf else 'DISABLED (using local JSON)'}") if self.use_hf: print(f"[{self.__class__.__name__}] Dataset: {self.dataset_name}") def _load_from_hf(self): """Carga datos desde Hugging Face Dataset.""" try: dataset = load_dataset(self.dataset_name, split="train", token=self.hf_token) if len(dataset) > 0: return dataset[0][self.data_key] return [] except Exception as e: print(f"[HF ERROR] Error loading from dataset: {e}") return None def _load_from_local(self): """Carga datos desde archivo JSON local.""" if os.path.exists(self.local_file): try: with open(self.local_file, "r") as f: return json.load(f) except: return [] return [] def _save_to_hf(self, data): """Guarda datos en Hugging Face Dataset.""" try: dataset_dict = {self.data_key: [data]} dataset = Dataset.from_dict(dataset_dict) dataset.push_to_hub( self.dataset_name, token=self.hf_token, private=True ) print(f"[HF SUCCESS] Data saved to {self.dataset_name}") return True except Exception as e: print(f"[HF ERROR] Error saving to dataset: {e}") return False def _save_to_local(self, data): """Guarda datos en archivo JSON local.""" try: with open(self.local_file, "w") as f: json.dump(data, f, indent=4) return True except Exception as e: print(f"[LOCAL ERROR] Error saving to local file: {e}") return False class User(UserMixin): """Clase de usuario compatible con Flask-Login.""" def __init__(self, id, username): self.id = id self.username = username # --- GESTOR DE USUARIOS (Persistencia con HF Datasets) --- class UserManager(HFDatasetManager): """Clase para manejar los usuarios con persistencia en HF Datasets.""" def __init__(self): super().__init__( dataset_name="HF_DATASET_USERS", data_key="users", local_filename="users.json" ) self.users = self._load() def _load(self): """Carga los usuarios desde HF Dataset o archivo JSON local.""" if self.use_hf: data = self._load_from_hf() if data: # Si hay datos (no vacío) print(f"[UserManager] Loaded {len(data)} users from HF Dataset") return data if data is not None: print("[UserManager] HF Dataset is empty, checking local...") else: print("[UserManager] Failed to load from HF, checking local...") data = self._load_from_local() print(f"[UserManager] Loaded {len(data)} users from local JSON") return data def save(self): """Guarda la lista de usuarios en HF Dataset y/o archivo JSON.""" # Siempre guardar localmente como backup self._save_to_local(self.users) # Intentar guardar en HF si está habilitado if self.use_hf: success = self._save_to_hf(self.users) if success: print(f"[UserManager] Saved {len(self.users)} users to HF Dataset") else: print("[UserManager] Failed to save to HF, data saved locally only") def add_user(self, username, password, email, status="PENDING"): """Registra un nuevo usuario con email, contraseña hasheada y estado pendiente.""" if self.get_by_username(username) or self.get_by_email(email): return False user_id = str(uuid.uuid4()) hashed_pw = generate_password_hash(password) self.users.append({ "id": user_id, "username": username, "email": email, "password": hashed_pw, "status": status, "reset_token": None, "reset_expiry": None }) self.save() return True def activate_user(self, username): """Activa un usuario pendiente.""" for user in self.users: if user["username"] == username: user["status"] = "ACTIVE" self.save() return True return False def delete_user(self, username): """Elimina un usuario (para rechazar registros).""" self.users = [u for u in self.users if u["username"] != username] self.save() return True def get_by_username(self, username): """Busca un usuario por su nombre.""" for user in self.users: if user["username"] == username: return user return None def get_by_email(self, email): """Busca un usuario por su email.""" for user in self.users: if user.get("email") == email: return user return None def get_by_id(self, user_id): """Busca un usuario por su ID.""" for user in self.users: if user["id"] == user_id: return user return None def generate_reset_token(self, email): """Genera un token de recuperación de contraseña.""" user = self.get_by_email(email) if user: token = str(uuid.uuid4()) expiry = (datetime.datetime.now() + datetime.timedelta(hours=1)).isoformat() user["reset_token"] = token user["reset_expiry"] = expiry self.save() return token return None def verify_reset_token(self, token): """Verifica un token de recuperación y retorna el usuario si es válido.""" for user in self.users: if user.get("reset_token") == token: expiry_str = user.get("reset_expiry") if expiry_str: expiry = datetime.datetime.fromisoformat(expiry_str) if datetime.datetime.now() < expiry: return user return None def verify_user(self, username, password): """Verifica las credenciales de un usuario.""" user_data = self.get_by_username(username) # Changed from get_user to get_by_username if user_data and check_password_hash(user_data['password'], password): return User(user_data['id'], username) # Changed from User(username, username) to User(user_data['id'], username) return None def update_password(self, user_id, new_password): """Actualiza la contraseña de un usuario.""" for user in self.users: if user["id"] == user_id: user["password"] = generate_password_hash(new_password) user["reset_token"] = None user["reset_expiry"] = None self.save() return True return False # --- GESTOR DE AULAS (Persistencia con HF Datasets) --- class ClassroomManager(HFDatasetManager): """Clase para manejar aulas y estudiantes con persistencia.""" def __init__(self): super().__init__( dataset_name="HF_DATASET_CLASSROOMS", # Usar variable de entorno distinta para evitar conflictos data_key="classrooms", local_filename="classrooms.json" ) self.classrooms = self._load() def _load(self): """Carga los cursos desde HF Dataset o archivo JSON local.""" if self.use_hf: data = self._load_from_hf() if data: # Si hay datos (no vacío) print(f"[ClassroomManager] Loaded {len(data)} courses from HF Dataset") return data if data is not None: print("[ClassroomManager] HF Dataset is empty, checking local...") else: print("[ClassroomManager] Failed to load from HF, checking local...") data = self._load_from_local() print(f"[ClassroomManager] Loaded {len(data)} courses from local JSON") return data def save(self): """Guarda la lista de cursos en HF Dataset y/o archivo JSON.""" # Siempre guardar localmente como backup self._save_to_local(self.classrooms) # Intentar guardar en HF si está habilitado if self.use_hf: success = self._save_to_hf(self.classrooms) if success: print(f"[ClassroomManager] Saved {len(self.classrooms)} courses to HF Dataset") else: print("[ClassroomManager] Failed to save to HF, data saved locally only") def get_courses(self): return self.classrooms def get_course(self, course_id): return next((c for c in self.classrooms if c['id'] == course_id), None) def create_course(self, name): course = { "id": str(uuid.uuid4()), "name": name, "students": [] } self.classrooms.append(course) self.save() return course def add_student(self, course_id, student_name): course = self.get_course(course_id) if course: # Evitar duplicados por nombre if not any(s['name'] == student_name for s in course['students']): student_id = str(uuid.uuid4()) course['students'].append({ "id": student_id, "name": student_name, "attendance": [] }) self.save() return True return False def record_attendance(self, student_identifier): """Registra asistencia buscando por ID o Nombre.""" today = datetime.datetime.now().strftime("%Y-%m-%d") recorded = False student_name_found = None for course in self.classrooms: for student in course['students']: # Verificar ID (prioridad) o Nombre s_id = student.get('id') if s_id == student_identifier or student['name'] == student_identifier: if today not in student['attendance']: student['attendance'].append(today) recorded = True student_name_found = student['name'] if recorded: self.save() return student_name_found # Retornamos el nombre para la notificación def delete_course(self, course_id): """Elimina un curso por ID.""" self.classrooms = [c for c in self.classrooms if c['id'] != course_id] self.save() return True def delete_student(self, course_id, student_id): """Elimina un estudiante de un curso.""" course = self.get_course(course_id) if course: course['students'] = [s for s in course['students'] if s.get('id') != student_id] self.save() return True return False user_mgr = UserManager() classroom_manager = ClassroomManager() @login_manager.user_loader def load_user(user_id): """Cargador de usuario para Flask-Login desde el JSON.""" user_data = user_mgr.get_by_id(user_id) if user_data and user_data.get('status') == "ACTIVE": return User(user_data['id'], user_data['username']) return None # --- GESTOR DE PRÉSTAMOS (Persistencia con HF Datasets) --- class LoanManager(HFDatasetManager): """Clase para manejar la persistencia de préstamos en HF Datasets.""" def __init__(self): super().__init__( dataset_name="HF_DATASET_LOANS", data_key="loans", local_filename="prestamos.json" ) self.loans = self._load() def _load(self): """Carga los préstamos desde HF Dataset o archivo JSON local.""" if self.use_hf: data = self._load_from_hf() if data: # Si hay datos (no vacío) print(f"[LoanManager] Loaded {len(data)} loans from HF Dataset") return data if data is not None: print("[LoanManager] HF Dataset is empty, checking local...") else: print("[LoanManager] Failed to load from HF, checking local...") data = self._load_from_local() print(f"[LoanManager] Loaded {len(data)} loans from local JSON") return data def save(self): """Guarda la lista actual de préstamos en HF Dataset y/o archivo JSON.""" # Siempre guardar localmente como backup self._save_to_local(self.loans) # Intentar guardar en HF si está habilitado if self.use_hf: success = self._save_to_hf(self.loans) if success: print(f"[LoanManager] Saved {len(self.loans)} loans to HF Dataset") else: print("[LoanManager] Failed to save to HF, data saved locally only") def add_loan(self, loan): """Añade un nuevo préstamo a la lista y guarda en disco.""" self.loans.append(loan) self.save() def update_status(self, loan_id, status): """Actualiza el estado de un préstamo existente.""" for loan in self.loans: if loan["id"] == loan_id: loan["status_loan"] = status self.save() return True return False def get_all(self): """Retorna todos los préstamos registrados.""" return self.loans loan_mgr = LoanManager() # --- GESTOR DE ROSTROS (Asistencia) --- class FaceManager(HFDatasetManager): """Clase para manejar descriptores de rostros para asistencia.""" def __init__(self): super().__init__( dataset_name="HF_DATASET_FACES", data_key="faces", local_filename="faces.json" ) self.faces = self._load() def _load(self): """Carga los rostros desde HF Dataset o archivo JSON local.""" if self.use_hf: data = self._load_from_hf() if data: # Si hay datos (no vacío) print(f"[FaceManager] Loaded {len(data)} faces from HF Dataset") return data if data is not None: print("[FaceManager] HF Dataset is empty, checking local...") else: print("[FaceManager] Failed to load from HF, checking local...") data = self._load_from_local() print(f"[FaceManager] Loaded {len(data)} faces from local JSON") return data def save(self): """Guarda la lista de rostros en HF Dataset y/o archivo JSON.""" # Siempre guardar localmente como backup self._save_to_local(self.faces) # Intentar guardar en HF si está habilitado if self.use_hf: success = self._save_to_hf(self.faces) if success: print(f"[FaceManager] Saved {len(self.faces)} faces to HF Dataset") else: print("[FaceManager] Failed to save to HF, data saved locally only") def add_face(self, label, descriptor): # El descriptor es una lista de 128 floats (face-api.js) self.faces.append({ "label": label, "descriptor": descriptor }) self.save() def get_all(self): return self.faces face_mgr = FaceManager() # --- INTEGRACIÓN CON TELEGRAM --- TG_TOKEN = os.getenv("TELEGRAM_TOKEN") TG_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID") try: if TG_CHAT_ID: TG_CHAT_ID = int(TG_CHAT_ID) except: pass # URL del proxy de Google Script GOOGLE_PROXY_URL = os.getenv("GOOGLE_PROXY_URL") or "https://script.google.com/macros/s/AKfycbz7z1Jb0vsur42GmmqrL3PVXeRkN2WxSojFDIleEDoLOg6MnrmJjb_uuPcQ15CTwyzD/exec" if TG_TOKEN: if GOOGLE_PROXY_URL: print("[BOT] Usando Google Proxy URL") base_url = GOOGLE_PROXY_URL.split('?')[0] telebot.apihelper.API_URL = base_url + "?path={1}&token={0}" else: print("[BOT] Usando conexión directa a Telegram") # Los timeouts de la librería DEBEN ser mayores que el tiempo de espera del long polling de Telegram # para evitar "Ghost connections" que causan el error 409 Conflict. telebot.apihelper.CONNECT_TIMEOUT = 90 telebot.apihelper.READ_TIMEOUT = 90 bot = telebot.TeleBot(TG_TOKEN) if TG_TOKEN else None def escape_md(text): """Escapa caracteres para Markdown de Telegram.""" if not text: return "" for char in ['_', '*', '[', '`']: text = text.replace(char, f"\\{char}") return text def mark_as_delivered(loan_id): """Lógica central para marcar un préstamo como entregado físicamente.""" loan = None for l in loan_mgr.get_all(): if l['id'] == loan_id: loan = l break if loan and loan['status_loan'] == "ACCEPTED": # Hora AST (Rep. Dom.) utc_now = datetime.datetime.now(datetime.timezone.utc) rd_now = utc_now - datetime.timedelta(hours=4) ahora = rd_now.strftime("%H:%M") loan_mgr.update_status(loan_id, "DELIVERED") socketio.emit('notification', {"text": f"{loan['Solicitante']} ha entregado", "color": "blue"}) return True, loan, ahora return False, None, None if bot: @bot.callback_query_handler(func=lambda call: True) def handle_query(call): try: if call.data.startswith("accept_"): loan_id = call.data.replace("accept_", "") if loan_mgr.update_status(loan_id, "ACCEPTED"): bot.answer_callback_query(call.id, "Préstamo Aceptado") # Al aceptar, añadimos el botón de "ENTREGAR" en el mismo mensaje markup = types.InlineKeyboardMarkup() markup.add(types.InlineKeyboardButton("📦 MARCAR ENTREGADO", callback_data=f"deliver_{loan_id}")) nuevo_texto = f"✅ *ACEPTADO*\n{escape_md(call.message.text)}" bot.edit_message_text(nuevo_texto, call.message.chat.id, call.message.message_id, reply_markup=markup, parse_mode="Markdown") socketio.emit('notification', {"text": f"Préstamo {loan_id[:8]} ACEPTADO", "color": "green"}) elif call.data.startswith("decline_"): loan_id = call.data.replace("decline_", "") if loan_mgr.update_status(loan_id, "DECLINED"): bot.answer_callback_query(call.id, "Préstamo Declinado") nuevo_texto = f"❌ *DECLINADO*\n{escape_md(call.message.text)}" bot.edit_message_text(nuevo_texto, call.message.chat.id, call.message.message_id, parse_mode="Markdown") socketio.emit('notification', {"text": f"Préstamo {loan_id[:8]} DECLINADO", "color": "red"}) elif call.data.startswith("deliver_"): loan_id = call.data.replace("deliver_", "") success, loan, ahora = mark_as_delivered(loan_id) if success: bot.answer_callback_query(call.id, "Entrega Confirmada") nuevo_texto = f"📦 *ENTREGADO A LAS {ahora}*\n{escape_md(call.message.text)}" bot.edit_message_text(nuevo_texto, call.message.chat.id, call.message.message_id, parse_mode="Markdown") else: bot.answer_callback_query(call.id, "Error: El préstamo no está aceptado o no existe", show_alert=True) elif call.data.startswith("approve_user_"): username = call.data.replace("approve_user_", "") if user_mgr.activate_user(username): bot.answer_callback_query(call.id, "Usuario aprobado") bot.edit_message_text(f"✅ *APROBADO:* El usuario `{username}` ya puede iniciar sesión.", call.message.chat.id, call.message.message_id, parse_mode="Markdown") socketio.emit('notification', {"text": f"Usuario {username} APROBADO", "color": "green"}) elif call.data.startswith("decline_user_"): username = call.data.replace("decline_user_", "") if user_mgr.delete_user(username): bot.answer_callback_query(call.id, "Usuario rechazado") bot.edit_message_text(f"❌ *RECHAZADO:* El registro de `{username}` ha sido eliminado.", call.message.chat.id, call.message.message_id, parse_mode="Markdown") except Exception as e: print(f"Callback Error: {e}") @bot.message_handler(commands=['aceptar', 'declinar', 'entregado', 'status']) def handle_text_commands(message): try: text = message.text.split() if len(text) < 2: bot.reply_to(message, "Uso: /aceptar , /declinar o /entregado ") return cmd = text[0][1:]; loan_id = text[1] if cmd == "entregado": success, loan, ahora = mark_as_delivered(loan_id) if success: bot.reply_to(message, f"📦 Entrega confirmada para {loan['Solicitante']} a las {ahora}") else: bot.reply_to(message, "No se pudo marcar como entregado. Verifica el ID y el estado.") else: status = "ACCEPTED" if cmd == "aceptar" else "DECLINED" if loan_mgr.update_status(loan_id, status): emoji = "✅" if status == "ACCEPTED" else "❌" bot.reply_to(message, f"{emoji} Préstamo {loan_id} actualizado a {status}") socketio.emit('notification', {"text": f"Préstamo {loan_id[:8]} {status}", "color": "green" if status == "ACCEPTED" else "red"}) except Exception as e: print(f"Command Error: {e}") def start_bot_thread(): if not bot: return # Usar un lock de archivo para prevenir múltiples instancias en Gunicorn workers o Flask reloader try: # Abrimos el archivo en modo escritura y pedimos un lock exclusivo no bloqueante lock_file_path = "/tmp/tg_bot.lock" # Mantenemos la referencia al archivo abierta para que el lock sea efectivo mientras el hilo viva # Nota: En Python, si el objeto file se cierra, el lock se libera. # Por eso lo hacemos global o lo mantenemos en el scope del bucle. global _bot_lock_file _bot_lock_file = open(lock_file_path, "w") fcntl.flock(_bot_lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB) except (IOError, OSError): # Si no podemos obtener el lock, es que otra instancia ya lo tiene print("[BOT] Otra instancia detectada. No se iniciará el bot en este hilo.") return # Espera inicial para que los recursos se estabilicen time.sleep(5) try: bot.delete_webhook() print("[BOT] Webhook eliminado.") except Exception as e: print(f"[BOT] Error eliminando webhook: {e}") print("[BOT] Iniciando infinity_polling...") # infinity_polling ya maneja reconexiones automáticas y errores internos. # timeout: Cada cuánto tiempo la librería refresca la conexión. # long_polling_timeout: Cuánto tiempo espera el servidor de Telegram antes de responder vacío. bot.infinity_polling(timeout=90, long_polling_timeout=30) if bot: threading.Thread(target=start_bot_thread, daemon=True).start() # --- RUTAS WEB --- @app.route('/') def index(): return render_template('index.html', title="MAKER SPACE") @app.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: return redirect(url_for('miembros')) if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') user_data = user_mgr.get_by_username(username) if user_data: if user_data.get('status') != "ACTIVE": flash("Tu cuenta está pendiente de aprobación por un administrador.", "orange") return render_template('login.html') if check_password_hash(user_data['password'], password): user = User(user_data['id'], user_data['username']) login_user(user) flash(f"¡Bienvenido de nuevo, {username}!", "green") return redirect(url_for('miembros')) flash("Usuario o contraseña incorrectos", "red") return render_template('login.html', title="Login") @app.route('/register', methods=['GET', 'POST']) def register(): if current_user.is_authenticated: return redirect(url_for('miembros')) if request.method == 'POST': username = request.form.get('username') email = request.form.get('email') password = request.form.get('password') confirm_password = request.form.get('confirm_password') if password != confirm_password: flash("Las contraseñas no coinciden", "red") return render_template('register.html') if user_mgr.add_user(username, password, email, status="PENDING"): # Notificar a Telegram sobre el nuevo registro if bot and TG_CHAT_ID: try: markup = types.InlineKeyboardMarkup() markup.add( types.InlineKeyboardButton("✅ Aprobar", callback_data=f"approve_user_{username}"), types.InlineKeyboardButton("❌ Rechazar", callback_data=f"decline_user_{username}") ) bot.send_message(TG_CHAT_ID, f"👤 *NUEVO REGISTRO:* `{username}` solicita acceso.", reply_markup=markup, parse_mode="Markdown") except Exception as e: print(f"Error TG Register: {e}") flash("Registro enviado. Un administrador debe aprobar tu cuenta.", "blue") return redirect(url_for('login')) else: flash("El nombre de usuario ya existe.", "red") return render_template('register.html', title="Registro") @app.route('/logout') @login_required def logout(): logout_user() flash("Has cerrado sesión", "blue") return redirect(url_for('index')) def send_email(to_email, subject, body): """Envía un correo electrónico usando Resend (100% gratis hasta 100 emails/día).""" resend_api_key = os.getenv("RESEND_API_KEY") from_email = os.getenv("FROM_EMAIL", "onboarding@resend.dev") # Email verificado en Resend if not resend_api_key: print("[EMAIL ERROR] Falta RESEND_API_KEY en variables de entorno.") return False try: resend.api_key = resend_api_key params = { "from": from_email, "to": [to_email], "subject": subject, "text": body, } email_response = resend.Emails.send(params) print(f"[EMAIL SUCCESS] Correo enviado a {to_email}: {email_response}") return True except Exception as e: print(f"[EMAIL ERROR] Error enviando correo con Resend: {e}") return False @app.route('/forgot-password', methods=['GET', 'POST']) def forgot_password(): if current_user.is_authenticated: return redirect(url_for('miembros')) if request.method == 'POST': email = request.form.get('email') token = user_mgr.generate_reset_token(email) if token: reset_url = url_for('reset_password', token=token, _external=True) # Intentar enviar correo real subject = "Recuperación de Contraseña - Maker Space" body = f"Hola,\n\nHemos recibido una solicitud para restablecer tu contraseña. Haz clic en el siguiente enlace para continuar:\n\n{reset_url}\n\nSi no solicitaste este cambio, puedes ignorar este correo.\n\nEste enlace caducará en 1 hora." if send_email(email, subject, body): flash("Se ha enviado un enlace de recuperación a tu correo.", "blue") else: # Fallback a logs si falla el envío print(f"\n[EMAIL FALLBACK] Para: {email}") print(f"[EMAIL FALLBACK] Enlace: {reset_url}\n") flash("Hubo un problema enviando el correo. Contacta al administrador o revisa los logs.", "orange") else: # Por seguridad, no decimos si el email existe flash("Si el correo está registrado, recibirás un enlace de recuperación.", "blue") return redirect(url_for('login')) return render_template('forgot_password.html', title="Recuperar Contraseña") @app.route('/reset-password/', methods=['GET', 'POST']) def reset_password(token): if current_user.is_authenticated: return redirect(url_for('miembros')) user = user_mgr.verify_reset_token(token) if not user: flash("El enlace de recuperación es inválido o ha expirado.", "red") return redirect(url_for('forgot_password')) if request.method == 'POST': password = request.form.get('password') confirm_password = request.form.get('confirm_password') if password != confirm_password: flash("Las contraseñas no coinciden", "red") return render_template('reset_password.html', title="Nueva Contraseña") if user_mgr.update_password(user['id'], password): flash("Contraseña actualizada correctamente. Ya puedes iniciar sesión.", "green") return redirect(url_for('login')) else: flash("Error al actualizar la contraseña.", "red") return render_template('reset_password.html', title="Nueva Contraseña") @app.route('/prestamos') def prestamos(): loans = loan_mgr.get_all() return render_template('prestamos.html', title="Préstamos", loans=loans) @app.route('/api/prestamo', methods=['POST']) def api_prestamo(): try: data = request.json if not data: return jsonify({"status": "error", "message": "No data received"}), 400 solicitante = data.get('solicitante') hora_salida = data.get('hora_salida') hora_retorno = data.get('hora_retorno') items_list = data.get('items', []) if not solicitante: return jsonify({"status": "error", "message": "Solicitante es requerido"}), 400 # Formatear items para el campo 'item' items_desc = [] for it in items_list: items_desc.append(f"• {it['descripcion']} ({it['cantidad']}) [{it['categoria']}]") full_items_string = "\n".join(items_desc) loan_id = str(uuid.uuid4()) new_loan = { "id": loan_id, "Solicitante": solicitante, "hora": hora_salida, "devolucion": hora_retorno, "item": full_items_string, "status_loan": "PENDING", "timestamp": datetime.datetime.now().isoformat() } loan_mgr.add_loan(new_loan) # Notificar a Telegram if bot and TG_CHAT_ID: try: msg = f"📦 *NUEVA SOLICITUD DE PRÉSTAMO*\n\n" msg += f"👤 *Solicitante:* {escape_md(solicitante)}\n" msg += f"🕒 *Horario:* {hora_salida} - {hora_retorno}\n" msg += f"🛠 *Herramientas:*\n{escape_md(full_items_string)}" markup = types.InlineKeyboardMarkup() markup.add( types.InlineKeyboardButton("✅ ACEPTAR", callback_data=f"accept_{loan_id}"), types.InlineKeyboardButton("❌ DECLINAR", callback_data=f"decline_{loan_id}") ) bot.send_message(TG_CHAT_ID, msg, reply_markup=markup, parse_mode="Markdown") except Exception as tg_e: print(f"Error TG Loan: {tg_e}") # Emitir notificación por socket socketio.emit('notification', {"text": f"Nueva solicitud de {solicitante}", "color": "orange"}) return jsonify({"status": "success", "loan_id": loan_id}) except Exception as e: print(f"API Prestamo Error: {e}") return jsonify({"status": "error", "message": str(e)}), 500 # --- CLASSROOM MAKER --- @app.route('/miembros') @login_required def miembros(): return render_template('miembros.html', title="Acceso Miembros") @app.route('/classroom') def classroom(): return render_template('classroom.html', title="Classroom Maker") @app.route('/asistencia') def asistencia(): return render_template('asistencia.html', title="Toma de Asistencia") @app.route('/tutoria') def tutoria(): return render_template('tutoria.html', title="Tutoría y Guías") @app.route('/guia/registro') def guia_registro(): return render_template('guia_registro.html', title="Guía de Registro") @app.route('/guia/herramientas') def guia_herramientas(): return render_template('guia_herramientas.html', title="Guía de Herramientas") @app.route('/guia/asistencia') def guia_asistencia(): return render_template('guia_asistencia.html', title="Guía de Asistencia") @app.route('/guia/gestion') def guia_gestion(): return render_template('guia_gestion.html', title="Guía de Gestión") # --- RUTAS DE GESTIÓN DE AULAS --- # --- RUTAS DE GESTIÓN DE AULAS --- @app.route('/classroom/dashboard') @login_required def classroom_dashboard(): courses = classroom_manager.get_courses() return render_template('classroom_dashboard.html', title="Gestión de Aulas", courses=courses) @app.route('/classroom/create', methods=['POST']) @login_required def create_course(): name = request.form.get('name') if name: classroom_manager.create_course(name) flash('Curso creado exitosamente', 'green') else: flash('El nombre del curso es requerido', 'red') return redirect(url_for('classroom_dashboard')) @app.route('/classroom/') @login_required def course_details(course_id): course = classroom_manager.get_course(course_id) if not course: flash('Curso no encontrado', 'red') return redirect(url_for('classroom_dashboard')) return render_template('course_details.html', title=course['name'], course=course) @app.route('/classroom//add_student', methods=['POST']) @login_required def add_student(course_id): name = request.form.get('student_name') if name: if classroom_manager.add_student(course_id, name): flash('Estudiante agregado', 'green') else: flash('Estudiante ya existe o error al agregar', 'red') return redirect(url_for('course_details', course_id=course_id)) @app.route('/classroom//delete', methods=['POST']) @login_required def delete_course(course_id): classroom_manager.delete_course(course_id) flash('Curso eliminado', 'green') return redirect(url_for('classroom_dashboard')) @app.route('/classroom//delete_student/', methods=['POST']) @login_required def delete_student(course_id, student_id): if classroom_manager.delete_student(course_id, student_id): flash('Estudiante eliminado', 'green') else: flash('Error al eliminar estudiante', 'red') return redirect(url_for('course_details', course_id=course_id)) @app.route('/api/courses') def api_courses(): """Devuelve la lista de cursos y sus estudiantes para el frontend.""" courses = classroom_manager.get_courses() return jsonify(courses) @app.route('/api/faces', methods=['GET', 'POST']) def api_faces(): if request.method == 'POST': data = request.json label = data.get('label') # Ahora esto puede ser el ID o Nombre descriptor = data.get('descriptor') student_id = data.get('student_id') course_id = data.get('course_id') if label and descriptor: # Aquí idealmente guardaríamos el descriptor asociado al ID del estudiante # Por simplicidad en este prototipo sin DB vectorial real, # guardamos un JSON local "faces.json" o similar. face_data = { "label": label, # Nombre para mostrar "descriptor": descriptor, "student_id": student_id, "course_id": course_id } # Cargar existentes faces = [] if os.path.exists('faces.json'): try: with open('faces.json', 'r') as f: faces = json.load(f) except: pass faces.append(face_data) with open('faces.json', 'w') as f: json.dump(faces, f) return jsonify({"status": "success"}) # GET: Devolver rostros guardados if os.path.exists('faces.json'): try: with open('faces.json', 'r') as f: return jsonify(json.load(f)) except: return jsonify([]) return jsonify([]) @app.route('/api/attendance', methods=['POST']) def api_attendance(): data = request.json label = data.get('label') # El label que viene de face-api (puede ser el nombre) if label: print(f"[ASISTENCIA] Registrando: {label}") # Registrar en el sistema de aulas # ClassroomManager buscará por nombre o ID if classroom_manager.record_attendance(label): socketio.emit('notification', {'text': f'Bienvenido/a {label}', 'color': 'green'}) return jsonify({"status": "success", "message": f"Asistencia registrada para {label}"}) else: # Si no se encuentra en una clase, igual notificar pero indicar advertencia? # O asumimos que es un invitado. socketio.emit('notification', {'text': f'Hola {label} (No inscrito)', 'color': 'blue'}) return jsonify({"status": "warning", "message": "Registrado pero no vinculado a curso"}) return jsonify({"status": "error", "message": "No label provided"}), 400 @app.route('/repos') def repos(): GIT_TOKEN = os.getenv("GITLAB_TOKEN") GIT_GROUP = os.getenv("GITLAB_GROUP_ID") projects = [] if GIT_TOKEN and GIT_GROUP: try: gl = gitlab.Gitlab("https://gitlab.com", private_token=GIT_TOKEN) projects = gl.groups.get(GIT_GROUP).projects.list(all=True) except: pass return render_template('repos.html', title="Proyectos", projects=projects) @app.route('/ver//') def ver_repo(pid, pname): GIT_TOKEN = os.getenv("GITLAB_TOKEN") readme_html = "

README no disponible

" web_url = "#"; download_url = "#" if GIT_TOKEN: try: gl = gitlab.Gitlab("https://gitlab.com", private_token=GIT_TOKEN) project = gl.projects.get(pid) web_url = project.web_url download_url = f"https://gitlab.com/api/v4/projects/{pid}/repository/archive.zip?private_token={GIT_TOKEN}" for branch in ["main", "master"]: try: f = project.files.get(file_path='README.md', ref=branch) readme_text = base64.b64decode(f.content).decode("utf-8") readme_html = markdown.markdown(readme_text, extensions=['fenced_code', 'tables']) break except: continue except: pass return render_template('ver_repo.html', title=pname, project_name=pname, readme_html=readme_html, web_url=web_url, download_url=download_url) if __name__ == '__main__': port = int(os.getenv("PORT", 7860)) socketio.run(app, host="0.0.0.0", port=port, debug=True)