Makerpage / app /main.py
Edoruin's picture
solving telegram bot issues
b50c1f8
import eventlet
eventlet.monkey_patch() # Parchea librerías estándar para compatibilidad con eventlet (necesario para Socket.IO)
import logging
# Silenciar logs de error de socket.io que son ruido (Bad file descriptor)
logging.getLogger('engineio.server').setLevel(logging.CRITICAL)
logging.getLogger('socketio.server').setLevel(logging.CRITICAL)
import os
import fcntl
import json
import uuid
import threading
import time
import base64
import requests
import datetime
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash
from flask_socketio import SocketIO, emit
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
import gitlab
import telebot
from telebot import types
import markdown
from dotenv import load_dotenv
import resend
# Importar Hugging Face Datasets para persistencia
try:
from datasets import Dataset, load_dataset
from huggingface_hub import HfApi
HF_AVAILABLE = True
except ImportError:
HF_AVAILABLE = False
print("[WARNING] huggingface_hub/datasets not available. Using local JSON storage only.")
# Cargar variables de entorno desde .env
load_dotenv()
# --- CONFIGURACIÓN DE LA APLICACIÓN ---
app = Flask(__name__)
app.config['SECRET_KEY'] = os.getenv("SECRET_KEY", "maker-secret-key")
socketio = SocketIO(app, cors_allowed_origins="*")
# DEBUG: Verificar si los modelos existen
try:
model_path = os.path.join(app.root_path, 'static', 'models')
if os.path.exists(model_path):
print(f"[DEBUG] Models directory found at: {model_path}")
print(f"[DEBUG] Files: {os.listdir(model_path)}")
else:
print(f"[DEBUG] ERROR: Models directory NOT found at {model_path}")
except Exception as e:
print(f"[DEBUG] Error checking models: {e}")
# --- GESTIÓN DE USUARIOS (Flask-Login) ---
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
login_manager.login_message = "Por favor, inicia sesión para acceder."
login_manager.login_message_category = "red"
# --- GESTOR DE DATASETS DE HUGGING FACE ---
class HFDatasetManager:
"""Clase base para manejar persistencia con Hugging Face Datasets."""
def __init__(self, dataset_name, data_key, local_filename):
self.dataset_name = os.getenv(dataset_name)
self.data_key = data_key
self.hf_token = os.getenv("HF_TOKEN")
# Log para diagnóstico de variables de entorno (solo nombres para seguridad)
hf_vars = [k for k in os.environ.keys() if k.startswith("HF_")]
print(f"[{self.__class__.__name__}] DEBUG - HF Vars detectadas: {hf_vars}")
self.use_hf = HF_AVAILABLE and self.dataset_name and self.hf_token
# Configurar almacenamiento local como fallback
data_dir = os.getenv("DATA_DIR", "data")
if not os.path.isabs(data_dir):
data_dir = os.path.join(os.getcwd(), data_dir)
os.makedirs(data_dir, exist_ok=True)
self.local_file = os.path.join(data_dir, local_filename)
print(f"[{self.__class__.__name__}] HF Datasets: {'ENABLED' if self.use_hf else 'DISABLED (using local JSON)'}")
if self.use_hf:
print(f"[{self.__class__.__name__}] Dataset: {self.dataset_name}")
def _load_from_hf(self):
"""Carga datos desde Hugging Face Dataset."""
try:
dataset = load_dataset(self.dataset_name, split="train", token=self.hf_token)
if len(dataset) > 0:
return dataset[0][self.data_key]
return []
except Exception as e:
print(f"[HF ERROR] Error loading from dataset: {e}")
return None
def _load_from_local(self):
"""Carga datos desde archivo JSON local."""
if os.path.exists(self.local_file):
try:
with open(self.local_file, "r") as f:
return json.load(f)
except:
return []
return []
def _save_to_hf(self, data):
"""Guarda datos en Hugging Face Dataset."""
try:
dataset_dict = {self.data_key: [data]}
dataset = Dataset.from_dict(dataset_dict)
dataset.push_to_hub(
self.dataset_name,
token=self.hf_token,
private=True
)
print(f"[HF SUCCESS] Data saved to {self.dataset_name}")
return True
except Exception as e:
print(f"[HF ERROR] Error saving to dataset: {e}")
return False
def _save_to_local(self, data):
"""Guarda datos en archivo JSON local."""
try:
with open(self.local_file, "w") as f:
json.dump(data, f, indent=4)
return True
except Exception as e:
print(f"[LOCAL ERROR] Error saving to local file: {e}")
return False
class User(UserMixin):
"""Clase de usuario compatible con Flask-Login."""
def __init__(self, id, username):
self.id = id
self.username = username
# --- GESTOR DE USUARIOS (Persistencia con HF Datasets) ---
class UserManager(HFDatasetManager):
"""Clase para manejar los usuarios con persistencia en HF Datasets."""
def __init__(self):
super().__init__(
dataset_name="HF_DATASET_USERS",
data_key="users",
local_filename="users.json"
)
self.users = self._load()
def _load(self):
"""Carga los usuarios desde HF Dataset o archivo JSON local."""
if self.use_hf:
data = self._load_from_hf()
if data: # Si hay datos (no vacío)
print(f"[UserManager] Loaded {len(data)} users from HF Dataset")
return data
if data is not None:
print("[UserManager] HF Dataset is empty, checking local...")
else:
print("[UserManager] Failed to load from HF, checking local...")
data = self._load_from_local()
print(f"[UserManager] Loaded {len(data)} users from local JSON")
return data
def save(self):
"""Guarda la lista de usuarios en HF Dataset y/o archivo JSON."""
# Siempre guardar localmente como backup
self._save_to_local(self.users)
# Intentar guardar en HF si está habilitado
if self.use_hf:
success = self._save_to_hf(self.users)
if success:
print(f"[UserManager] Saved {len(self.users)} users to HF Dataset")
else:
print("[UserManager] Failed to save to HF, data saved locally only")
def add_user(self, username, password, email, status="PENDING"):
"""Registra un nuevo usuario con email, contraseña hasheada y estado pendiente."""
if self.get_by_username(username) or self.get_by_email(email):
return False
user_id = str(uuid.uuid4())
hashed_pw = generate_password_hash(password)
self.users.append({
"id": user_id,
"username": username,
"email": email,
"password": hashed_pw,
"status": status,
"reset_token": None,
"reset_expiry": None
})
self.save()
return True
def activate_user(self, username):
"""Activa un usuario pendiente."""
for user in self.users:
if user["username"] == username:
user["status"] = "ACTIVE"
self.save()
return True
return False
def delete_user(self, username):
"""Elimina un usuario (para rechazar registros)."""
self.users = [u for u in self.users if u["username"] != username]
self.save()
return True
def get_by_username(self, username):
"""Busca un usuario por su nombre."""
for user in self.users:
if user["username"] == username:
return user
return None
def get_by_email(self, email):
"""Busca un usuario por su email."""
for user in self.users:
if user.get("email") == email:
return user
return None
def get_by_id(self, user_id):
"""Busca un usuario por su ID."""
for user in self.users:
if user["id"] == user_id:
return user
return None
def generate_reset_token(self, email):
"""Genera un token de recuperación de contraseña."""
user = self.get_by_email(email)
if user:
token = str(uuid.uuid4())
expiry = (datetime.datetime.now() + datetime.timedelta(hours=1)).isoformat()
user["reset_token"] = token
user["reset_expiry"] = expiry
self.save()
return token
return None
def verify_reset_token(self, token):
"""Verifica un token de recuperación y retorna el usuario si es válido."""
for user in self.users:
if user.get("reset_token") == token:
expiry_str = user.get("reset_expiry")
if expiry_str:
expiry = datetime.datetime.fromisoformat(expiry_str)
if datetime.datetime.now() < expiry:
return user
return None
def verify_user(self, username, password):
"""Verifica las credenciales de un usuario."""
user_data = self.get_by_username(username) # Changed from get_user to get_by_username
if user_data and check_password_hash(user_data['password'], password):
return User(user_data['id'], username) # Changed from User(username, username) to User(user_data['id'], username)
return None
def update_password(self, user_id, new_password):
"""Actualiza la contraseña de un usuario."""
for user in self.users:
if user["id"] == user_id:
user["password"] = generate_password_hash(new_password)
user["reset_token"] = None
user["reset_expiry"] = None
self.save()
return True
return False
# --- GESTOR DE AULAS (Persistencia con HF Datasets) ---
class ClassroomManager(HFDatasetManager):
"""Clase para manejar aulas y estudiantes con persistencia."""
def __init__(self):
super().__init__(
dataset_name="HF_DATASET_CLASSROOMS", # Usar variable de entorno distinta para evitar conflictos
data_key="classrooms",
local_filename="classrooms.json"
)
self.classrooms = self._load()
def _load(self):
"""Carga los cursos desde HF Dataset o archivo JSON local."""
if self.use_hf:
data = self._load_from_hf()
if data: # Si hay datos (no vacío)
print(f"[ClassroomManager] Loaded {len(data)} courses from HF Dataset")
return data
if data is not None:
print("[ClassroomManager] HF Dataset is empty, checking local...")
else:
print("[ClassroomManager] Failed to load from HF, checking local...")
data = self._load_from_local()
print(f"[ClassroomManager] Loaded {len(data)} courses from local JSON")
return data
def save(self):
"""Guarda la lista de cursos en HF Dataset y/o archivo JSON."""
# Siempre guardar localmente como backup
self._save_to_local(self.classrooms)
# Intentar guardar en HF si está habilitado
if self.use_hf:
success = self._save_to_hf(self.classrooms)
if success:
print(f"[ClassroomManager] Saved {len(self.classrooms)} courses to HF Dataset")
else:
print("[ClassroomManager] Failed to save to HF, data saved locally only")
def get_courses(self):
return self.classrooms
def get_course(self, course_id):
return next((c for c in self.classrooms if c['id'] == course_id), None)
def create_course(self, name):
course = {
"id": str(uuid.uuid4()),
"name": name,
"students": []
}
self.classrooms.append(course)
self.save()
return course
def add_student(self, course_id, student_name):
course = self.get_course(course_id)
if course:
# Evitar duplicados por nombre
if not any(s['name'] == student_name for s in course['students']):
student_id = str(uuid.uuid4())
course['students'].append({
"id": student_id,
"name": student_name,
"attendance": []
})
self.save()
return True
return False
def record_attendance(self, student_identifier):
"""Registra asistencia buscando por ID o Nombre."""
today = datetime.datetime.now().strftime("%Y-%m-%d")
recorded = False
student_name_found = None
for course in self.classrooms:
for student in course['students']:
# Verificar ID (prioridad) o Nombre
s_id = student.get('id')
if s_id == student_identifier or student['name'] == student_identifier:
if today not in student['attendance']:
student['attendance'].append(today)
recorded = True
student_name_found = student['name']
if recorded:
self.save()
return student_name_found # Retornamos el nombre para la notificación
def delete_course(self, course_id):
"""Elimina un curso por ID."""
self.classrooms = [c for c in self.classrooms if c['id'] != course_id]
self.save()
return True
def delete_student(self, course_id, student_id):
"""Elimina un estudiante de un curso."""
course = self.get_course(course_id)
if course:
course['students'] = [s for s in course['students'] if s.get('id') != student_id]
self.save()
return True
return False
user_mgr = UserManager()
classroom_manager = ClassroomManager()
@login_manager.user_loader
def load_user(user_id):
"""Cargador de usuario para Flask-Login desde el JSON."""
user_data = user_mgr.get_by_id(user_id)
if user_data and user_data.get('status') == "ACTIVE":
return User(user_data['id'], user_data['username'])
return None
# --- GESTOR DE PRÉSTAMOS (Persistencia con HF Datasets) ---
class LoanManager(HFDatasetManager):
"""Clase para manejar la persistencia de préstamos en HF Datasets."""
def __init__(self):
super().__init__(
dataset_name="HF_DATASET_LOANS",
data_key="loans",
local_filename="prestamos.json"
)
self.loans = self._load()
def _load(self):
"""Carga los préstamos desde HF Dataset o archivo JSON local."""
if self.use_hf:
data = self._load_from_hf()
if data: # Si hay datos (no vacío)
print(f"[LoanManager] Loaded {len(data)} loans from HF Dataset")
return data
if data is not None:
print("[LoanManager] HF Dataset is empty, checking local...")
else:
print("[LoanManager] Failed to load from HF, checking local...")
data = self._load_from_local()
print(f"[LoanManager] Loaded {len(data)} loans from local JSON")
return data
def save(self):
"""Guarda la lista actual de préstamos en HF Dataset y/o archivo JSON."""
# Siempre guardar localmente como backup
self._save_to_local(self.loans)
# Intentar guardar en HF si está habilitado
if self.use_hf:
success = self._save_to_hf(self.loans)
if success:
print(f"[LoanManager] Saved {len(self.loans)} loans to HF Dataset")
else:
print("[LoanManager] Failed to save to HF, data saved locally only")
def add_loan(self, loan):
"""Añade un nuevo préstamo a la lista y guarda en disco."""
self.loans.append(loan)
self.save()
def update_status(self, loan_id, status):
"""Actualiza el estado de un préstamo existente."""
for loan in self.loans:
if loan["id"] == loan_id:
loan["status_loan"] = status
self.save()
return True
return False
def get_all(self):
"""Retorna todos los préstamos registrados."""
return self.loans
loan_mgr = LoanManager()
# --- GESTOR DE ROSTROS (Asistencia) ---
class FaceManager(HFDatasetManager):
"""Clase para manejar descriptores de rostros para asistencia."""
def __init__(self):
super().__init__(
dataset_name="HF_DATASET_FACES",
data_key="faces",
local_filename="faces.json"
)
self.faces = self._load()
def _load(self):
"""Carga los rostros desde HF Dataset o archivo JSON local."""
if self.use_hf:
data = self._load_from_hf()
if data: # Si hay datos (no vacío)
print(f"[FaceManager] Loaded {len(data)} faces from HF Dataset")
return data
if data is not None:
print("[FaceManager] HF Dataset is empty, checking local...")
else:
print("[FaceManager] Failed to load from HF, checking local...")
data = self._load_from_local()
print(f"[FaceManager] Loaded {len(data)} faces from local JSON")
return data
def save(self):
"""Guarda la lista de rostros en HF Dataset y/o archivo JSON."""
# Siempre guardar localmente como backup
self._save_to_local(self.faces)
# Intentar guardar en HF si está habilitado
if self.use_hf:
success = self._save_to_hf(self.faces)
if success:
print(f"[FaceManager] Saved {len(self.faces)} faces to HF Dataset")
else:
print("[FaceManager] Failed to save to HF, data saved locally only")
def add_face(self, label, descriptor):
# El descriptor es una lista de 128 floats (face-api.js)
self.faces.append({
"label": label,
"descriptor": descriptor
})
self.save()
def get_all(self):
return self.faces
face_mgr = FaceManager()
# --- INTEGRACIÓN CON TELEGRAM ---
TG_TOKEN = os.getenv("TELEGRAM_TOKEN")
TG_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
try:
if TG_CHAT_ID: TG_CHAT_ID = int(TG_CHAT_ID)
except:
pass
# URL del proxy de Google Script
GOOGLE_PROXY_URL = os.getenv("GOOGLE_PROXY_URL") or "https://script.google.com/macros/s/AKfycbz7z1Jb0vsur42GmmqrL3PVXeRkN2WxSojFDIleEDoLOg6MnrmJjb_uuPcQ15CTwyzD/exec"
if TG_TOKEN:
if GOOGLE_PROXY_URL:
print("[BOT] Usando Google Proxy URL")
base_url = GOOGLE_PROXY_URL.split('?')[0]
telebot.apihelper.API_URL = base_url + "?path={1}&token={0}"
else:
print("[BOT] Usando conexión directa a Telegram")
# Los timeouts de la librería DEBEN ser mayores que el tiempo de espera del long polling de Telegram
# para evitar "Ghost connections" que causan el error 409 Conflict.
telebot.apihelper.CONNECT_TIMEOUT = 90
telebot.apihelper.READ_TIMEOUT = 90
bot = telebot.TeleBot(TG_TOKEN) if TG_TOKEN else None
def escape_md(text):
"""Escapa caracteres para Markdown de Telegram."""
if not text: return ""
for char in ['_', '*', '[', '`']:
text = text.replace(char, f"\\{char}")
return text
def mark_as_delivered(loan_id):
"""Lógica central para marcar un préstamo como entregado físicamente."""
loan = None
for l in loan_mgr.get_all():
if l['id'] == loan_id:
loan = l
break
if loan and loan['status_loan'] == "ACCEPTED":
# Hora AST (Rep. Dom.)
utc_now = datetime.datetime.now(datetime.timezone.utc)
rd_now = utc_now - datetime.timedelta(hours=4)
ahora = rd_now.strftime("%H:%M")
loan_mgr.update_status(loan_id, "DELIVERED")
socketio.emit('notification', {"text": f"{loan['Solicitante']} ha entregado", "color": "blue"})
return True, loan, ahora
return False, None, None
if bot:
@bot.callback_query_handler(func=lambda call: True)
def handle_query(call):
try:
if call.data.startswith("accept_"):
loan_id = call.data.replace("accept_", "")
if loan_mgr.update_status(loan_id, "ACCEPTED"):
bot.answer_callback_query(call.id, "Préstamo Aceptado")
# Al aceptar, añadimos el botón de "ENTREGAR" en el mismo mensaje
markup = types.InlineKeyboardMarkup()
markup.add(types.InlineKeyboardButton("📦 MARCAR ENTREGADO", callback_data=f"deliver_{loan_id}"))
nuevo_texto = f"✅ *ACEPTADO*\n{escape_md(call.message.text)}"
bot.edit_message_text(nuevo_texto, call.message.chat.id, call.message.message_id,
reply_markup=markup, parse_mode="Markdown")
socketio.emit('notification', {"text": f"Préstamo {loan_id[:8]} ACEPTADO", "color": "green"})
elif call.data.startswith("decline_"):
loan_id = call.data.replace("decline_", "")
if loan_mgr.update_status(loan_id, "DECLINED"):
bot.answer_callback_query(call.id, "Préstamo Declinado")
nuevo_texto = f"❌ *DECLINADO*\n{escape_md(call.message.text)}"
bot.edit_message_text(nuevo_texto, call.message.chat.id, call.message.message_id, parse_mode="Markdown")
socketio.emit('notification', {"text": f"Préstamo {loan_id[:8]} DECLINADO", "color": "red"})
elif call.data.startswith("deliver_"):
loan_id = call.data.replace("deliver_", "")
success, loan, ahora = mark_as_delivered(loan_id)
if success:
bot.answer_callback_query(call.id, "Entrega Confirmada")
nuevo_texto = f"📦 *ENTREGADO A LAS {ahora}*\n{escape_md(call.message.text)}"
bot.edit_message_text(nuevo_texto, call.message.chat.id, call.message.message_id, parse_mode="Markdown")
else:
bot.answer_callback_query(call.id, "Error: El préstamo no está aceptado o no existe", show_alert=True)
elif call.data.startswith("approve_user_"):
username = call.data.replace("approve_user_", "")
if user_mgr.activate_user(username):
bot.answer_callback_query(call.id, "Usuario aprobado")
bot.edit_message_text(f"✅ *APROBADO:* El usuario `{username}` ya puede iniciar sesión.",
call.message.chat.id, call.message.message_id, parse_mode="Markdown")
socketio.emit('notification', {"text": f"Usuario {username} APROBADO", "color": "green"})
elif call.data.startswith("decline_user_"):
username = call.data.replace("decline_user_", "")
if user_mgr.delete_user(username):
bot.answer_callback_query(call.id, "Usuario rechazado")
bot.edit_message_text(f"❌ *RECHAZADO:* El registro de `{username}` ha sido eliminado.",
call.message.chat.id, call.message.message_id, parse_mode="Markdown")
except Exception as e:
print(f"Callback Error: {e}")
@bot.message_handler(commands=['aceptar', 'declinar', 'entregado', 'status'])
def handle_text_commands(message):
try:
text = message.text.split()
if len(text) < 2:
bot.reply_to(message, "Uso: /aceptar <id>, /declinar <id> o /entregado <id>")
return
cmd = text[0][1:]; loan_id = text[1]
if cmd == "entregado":
success, loan, ahora = mark_as_delivered(loan_id)
if success:
bot.reply_to(message, f"📦 Entrega confirmada para {loan['Solicitante']} a las {ahora}")
else:
bot.reply_to(message, "No se pudo marcar como entregado. Verifica el ID y el estado.")
else:
status = "ACCEPTED" if cmd == "aceptar" else "DECLINED"
if loan_mgr.update_status(loan_id, status):
emoji = "✅" if status == "ACCEPTED" else "❌"
bot.reply_to(message, f"{emoji} Préstamo {loan_id} actualizado a {status}")
socketio.emit('notification', {"text": f"Préstamo {loan_id[:8]} {status}", "color": "green" if status == "ACCEPTED" else "red"})
except Exception as e:
print(f"Command Error: {e}")
def start_bot_thread():
if not bot:
return
# Usar un lock de archivo para prevenir múltiples instancias en Gunicorn workers o Flask reloader
try:
# Abrimos el archivo en modo escritura y pedimos un lock exclusivo no bloqueante
lock_file_path = "/tmp/tg_bot.lock"
# Mantenemos la referencia al archivo abierta para que el lock sea efectivo mientras el hilo viva
# Nota: En Python, si el objeto file se cierra, el lock se libera.
# Por eso lo hacemos global o lo mantenemos en el scope del bucle.
global _bot_lock_file
_bot_lock_file = open(lock_file_path, "w")
fcntl.flock(_bot_lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
except (IOError, OSError):
# Si no podemos obtener el lock, es que otra instancia ya lo tiene
print("[BOT] Otra instancia detectada. No se iniciará el bot en este hilo.")
return
# Espera inicial para que los recursos se estabilicen
time.sleep(5)
try:
bot.delete_webhook()
print("[BOT] Webhook eliminado.")
except Exception as e:
print(f"[BOT] Error eliminando webhook: {e}")
print("[BOT] Iniciando infinity_polling...")
# infinity_polling ya maneja reconexiones automáticas y errores internos.
# timeout: Cada cuánto tiempo la librería refresca la conexión.
# long_polling_timeout: Cuánto tiempo espera el servidor de Telegram antes de responder vacío.
bot.infinity_polling(timeout=90, long_polling_timeout=30)
if bot:
threading.Thread(target=start_bot_thread, daemon=True).start()
# --- RUTAS WEB ---
@app.route('/')
def index():
return render_template('index.html', title="MAKER SPACE")
@app.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('miembros'))
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user_data = user_mgr.get_by_username(username)
if user_data:
if user_data.get('status') != "ACTIVE":
flash("Tu cuenta está pendiente de aprobación por un administrador.", "orange")
return render_template('login.html')
if check_password_hash(user_data['password'], password):
user = User(user_data['id'], user_data['username'])
login_user(user)
flash(f"¡Bienvenido de nuevo, {username}!", "green")
return redirect(url_for('miembros'))
flash("Usuario o contraseña incorrectos", "red")
return render_template('login.html', title="Login")
@app.route('/register', methods=['GET', 'POST'])
def register():
if current_user.is_authenticated:
return redirect(url_for('miembros'))
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
confirm_password = request.form.get('confirm_password')
if password != confirm_password:
flash("Las contraseñas no coinciden", "red")
return render_template('register.html')
if user_mgr.add_user(username, password, email, status="PENDING"):
# Notificar a Telegram sobre el nuevo registro
if bot and TG_CHAT_ID:
try:
markup = types.InlineKeyboardMarkup()
markup.add(
types.InlineKeyboardButton("✅ Aprobar", callback_data=f"approve_user_{username}"),
types.InlineKeyboardButton("❌ Rechazar", callback_data=f"decline_user_{username}")
)
bot.send_message(TG_CHAT_ID, f"👤 *NUEVO REGISTRO:* `{username}` solicita acceso.",
reply_markup=markup, parse_mode="Markdown")
except Exception as e:
print(f"Error TG Register: {e}")
flash("Registro enviado. Un administrador debe aprobar tu cuenta.", "blue")
return redirect(url_for('login'))
else:
flash("El nombre de usuario ya existe.", "red")
return render_template('register.html', title="Registro")
@app.route('/logout')
@login_required
def logout():
logout_user()
flash("Has cerrado sesión", "blue")
return redirect(url_for('index'))
def send_email(to_email, subject, body):
"""Envía un correo electrónico usando Resend (100% gratis hasta 100 emails/día)."""
resend_api_key = os.getenv("RESEND_API_KEY")
from_email = os.getenv("FROM_EMAIL", "onboarding@resend.dev") # Email verificado en Resend
if not resend_api_key:
print("[EMAIL ERROR] Falta RESEND_API_KEY en variables de entorno.")
return False
try:
resend.api_key = resend_api_key
params = {
"from": from_email,
"to": [to_email],
"subject": subject,
"text": body,
}
email_response = resend.Emails.send(params)
print(f"[EMAIL SUCCESS] Correo enviado a {to_email}: {email_response}")
return True
except Exception as e:
print(f"[EMAIL ERROR] Error enviando correo con Resend: {e}")
return False
@app.route('/forgot-password', methods=['GET', 'POST'])
def forgot_password():
if current_user.is_authenticated:
return redirect(url_for('miembros'))
if request.method == 'POST':
email = request.form.get('email')
token = user_mgr.generate_reset_token(email)
if token:
reset_url = url_for('reset_password', token=token, _external=True)
# Intentar enviar correo real
subject = "Recuperación de Contraseña - Maker Space"
body = f"Hola,\n\nHemos recibido una solicitud para restablecer tu contraseña. Haz clic en el siguiente enlace para continuar:\n\n{reset_url}\n\nSi no solicitaste este cambio, puedes ignorar este correo.\n\nEste enlace caducará en 1 hora."
if send_email(email, subject, body):
flash("Se ha enviado un enlace de recuperación a tu correo.", "blue")
else:
# Fallback a logs si falla el envío
print(f"\n[EMAIL FALLBACK] Para: {email}")
print(f"[EMAIL FALLBACK] Enlace: {reset_url}\n")
flash("Hubo un problema enviando el correo. Contacta al administrador o revisa los logs.", "orange")
else:
# Por seguridad, no decimos si el email existe
flash("Si el correo está registrado, recibirás un enlace de recuperación.", "blue")
return redirect(url_for('login'))
return render_template('forgot_password.html', title="Recuperar Contraseña")
@app.route('/reset-password/<token>', methods=['GET', 'POST'])
def reset_password(token):
if current_user.is_authenticated:
return redirect(url_for('miembros'))
user = user_mgr.verify_reset_token(token)
if not user:
flash("El enlace de recuperación es inválido o ha expirado.", "red")
return redirect(url_for('forgot_password'))
if request.method == 'POST':
password = request.form.get('password')
confirm_password = request.form.get('confirm_password')
if password != confirm_password:
flash("Las contraseñas no coinciden", "red")
return render_template('reset_password.html', title="Nueva Contraseña")
if user_mgr.update_password(user['id'], password):
flash("Contraseña actualizada correctamente. Ya puedes iniciar sesión.", "green")
return redirect(url_for('login'))
else:
flash("Error al actualizar la contraseña.", "red")
return render_template('reset_password.html', title="Nueva Contraseña")
@app.route('/prestamos')
def prestamos():
loans = loan_mgr.get_all()
return render_template('prestamos.html', title="Préstamos", loans=loans)
@app.route('/api/prestamo', methods=['POST'])
def api_prestamo():
try:
data = request.json
if not data:
return jsonify({"status": "error", "message": "No data received"}), 400
solicitante = data.get('solicitante')
hora_salida = data.get('hora_salida')
hora_retorno = data.get('hora_retorno')
items_list = data.get('items', [])
if not solicitante:
return jsonify({"status": "error", "message": "Solicitante es requerido"}), 400
# Formatear items para el campo 'item'
items_desc = []
for it in items_list:
items_desc.append(f"• {it['descripcion']} ({it['cantidad']}) [{it['categoria']}]")
full_items_string = "\n".join(items_desc)
loan_id = str(uuid.uuid4())
new_loan = {
"id": loan_id,
"Solicitante": solicitante,
"hora": hora_salida,
"devolucion": hora_retorno,
"item": full_items_string,
"status_loan": "PENDING",
"timestamp": datetime.datetime.now().isoformat()
}
loan_mgr.add_loan(new_loan)
# Notificar a Telegram
if bot and TG_CHAT_ID:
try:
msg = f"📦 *NUEVA SOLICITUD DE PRÉSTAMO*\n\n"
msg += f"👤 *Solicitante:* {escape_md(solicitante)}\n"
msg += f"🕒 *Horario:* {hora_salida} - {hora_retorno}\n"
msg += f"🛠 *Herramientas:*\n{escape_md(full_items_string)}"
markup = types.InlineKeyboardMarkup()
markup.add(
types.InlineKeyboardButton("✅ ACEPTAR", callback_data=f"accept_{loan_id}"),
types.InlineKeyboardButton("❌ DECLINAR", callback_data=f"decline_{loan_id}")
)
bot.send_message(TG_CHAT_ID, msg, reply_markup=markup, parse_mode="Markdown")
except Exception as tg_e:
print(f"Error TG Loan: {tg_e}")
# Emitir notificación por socket
socketio.emit('notification', {"text": f"Nueva solicitud de {solicitante}", "color": "orange"})
return jsonify({"status": "success", "loan_id": loan_id})
except Exception as e:
print(f"API Prestamo Error: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
# --- CLASSROOM MAKER ---
@app.route('/miembros')
@login_required
def miembros():
return render_template('miembros.html', title="Acceso Miembros")
@app.route('/classroom')
def classroom():
return render_template('classroom.html', title="Classroom Maker")
@app.route('/asistencia')
def asistencia():
return render_template('asistencia.html', title="Toma de Asistencia")
@app.route('/tutoria')
def tutoria():
return render_template('tutoria.html', title="Tutoría y Guías")
@app.route('/guia/registro')
def guia_registro():
return render_template('guia_registro.html', title="Guía de Registro")
@app.route('/guia/herramientas')
def guia_herramientas():
return render_template('guia_herramientas.html', title="Guía de Herramientas")
@app.route('/guia/asistencia')
def guia_asistencia():
return render_template('guia_asistencia.html', title="Guía de Asistencia")
@app.route('/guia/gestion')
def guia_gestion():
return render_template('guia_gestion.html', title="Guía de Gestión")
# --- RUTAS DE GESTIÓN DE AULAS ---
# --- RUTAS DE GESTIÓN DE AULAS ---
@app.route('/classroom/dashboard')
@login_required
def classroom_dashboard():
courses = classroom_manager.get_courses()
return render_template('classroom_dashboard.html', title="Gestión de Aulas", courses=courses)
@app.route('/classroom/create', methods=['POST'])
@login_required
def create_course():
name = request.form.get('name')
if name:
classroom_manager.create_course(name)
flash('Curso creado exitosamente', 'green')
else:
flash('El nombre del curso es requerido', 'red')
return redirect(url_for('classroom_dashboard'))
@app.route('/classroom/<course_id>')
@login_required
def course_details(course_id):
course = classroom_manager.get_course(course_id)
if not course:
flash('Curso no encontrado', 'red')
return redirect(url_for('classroom_dashboard'))
return render_template('course_details.html', title=course['name'], course=course)
@app.route('/classroom/<course_id>/add_student', methods=['POST'])
@login_required
def add_student(course_id):
name = request.form.get('student_name')
if name:
if classroom_manager.add_student(course_id, name):
flash('Estudiante agregado', 'green')
else:
flash('Estudiante ya existe o error al agregar', 'red')
return redirect(url_for('course_details', course_id=course_id))
@app.route('/classroom/<course_id>/delete', methods=['POST'])
@login_required
def delete_course(course_id):
classroom_manager.delete_course(course_id)
flash('Curso eliminado', 'green')
return redirect(url_for('classroom_dashboard'))
@app.route('/classroom/<course_id>/delete_student/<student_id>', methods=['POST'])
@login_required
def delete_student(course_id, student_id):
if classroom_manager.delete_student(course_id, student_id):
flash('Estudiante eliminado', 'green')
else:
flash('Error al eliminar estudiante', 'red')
return redirect(url_for('course_details', course_id=course_id))
@app.route('/api/courses')
def api_courses():
"""Devuelve la lista de cursos y sus estudiantes para el frontend."""
courses = classroom_manager.get_courses()
return jsonify(courses)
@app.route('/api/faces', methods=['GET', 'POST'])
def api_faces():
if request.method == 'POST':
data = request.json
label = data.get('label') # Ahora esto puede ser el ID o Nombre
descriptor = data.get('descriptor')
student_id = data.get('student_id')
course_id = data.get('course_id')
if label and descriptor:
# Aquí idealmente guardaríamos el descriptor asociado al ID del estudiante
# Por simplicidad en este prototipo sin DB vectorial real,
# guardamos un JSON local "faces.json" o similar.
face_data = {
"label": label, # Nombre para mostrar
"descriptor": descriptor,
"student_id": student_id,
"course_id": course_id
}
# Cargar existentes
faces = []
if os.path.exists('faces.json'):
try:
with open('faces.json', 'r') as f:
faces = json.load(f)
except:
pass
faces.append(face_data)
with open('faces.json', 'w') as f:
json.dump(faces, f)
return jsonify({"status": "success"})
# GET: Devolver rostros guardados
if os.path.exists('faces.json'):
try:
with open('faces.json', 'r') as f:
return jsonify(json.load(f))
except:
return jsonify([])
return jsonify([])
@app.route('/api/attendance', methods=['POST'])
def api_attendance():
data = request.json
label = data.get('label') # El label que viene de face-api (puede ser el nombre)
if label:
print(f"[ASISTENCIA] Registrando: {label}")
# Registrar en el sistema de aulas
# ClassroomManager buscará por nombre o ID
if classroom_manager.record_attendance(label):
socketio.emit('notification', {'text': f'Bienvenido/a {label}', 'color': 'green'})
return jsonify({"status": "success", "message": f"Asistencia registrada para {label}"})
else:
# Si no se encuentra en una clase, igual notificar pero indicar advertencia?
# O asumimos que es un invitado.
socketio.emit('notification', {'text': f'Hola {label} (No inscrito)', 'color': 'blue'})
return jsonify({"status": "warning", "message": "Registrado pero no vinculado a curso"})
return jsonify({"status": "error", "message": "No label provided"}), 400
@app.route('/repos')
def repos():
GIT_TOKEN = os.getenv("GITLAB_TOKEN")
GIT_GROUP = os.getenv("GITLAB_GROUP_ID")
projects = []
if GIT_TOKEN and GIT_GROUP:
try:
gl = gitlab.Gitlab("https://gitlab.com", private_token=GIT_TOKEN)
projects = gl.groups.get(GIT_GROUP).projects.list(all=True)
except: pass
return render_template('repos.html', title="Proyectos", projects=projects)
@app.route('/ver/<pid>/<pname>')
def ver_repo(pid, pname):
GIT_TOKEN = os.getenv("GITLAB_TOKEN")
readme_html = "<p>README no disponible</p>"
web_url = "#"; download_url = "#"
if GIT_TOKEN:
try:
gl = gitlab.Gitlab("https://gitlab.com", private_token=GIT_TOKEN)
project = gl.projects.get(pid)
web_url = project.web_url
download_url = f"https://gitlab.com/api/v4/projects/{pid}/repository/archive.zip?private_token={GIT_TOKEN}"
for branch in ["main", "master"]:
try:
f = project.files.get(file_path='README.md', ref=branch)
readme_text = base64.b64decode(f.content).decode("utf-8")
readme_html = markdown.markdown(readme_text, extensions=['fenced_code', 'tables'])
break
except: continue
except: pass
return render_template('ver_repo.html', title=pname, project_name=pname, readme_html=readme_html, web_url=web_url, download_url=download_url)
if __name__ == '__main__':
port = int(os.getenv("PORT", 7860))
socketio.run(app, host="0.0.0.0", port=port, debug=True)