classhub-apis / app.py
K2MAR
feat: Système notifications complet
4645e9c
"""
ClassHub API Backend
Backend Python avec Flask et Firebase pour gérer l'authentification et les données
"""
from flask import Flask, request, jsonify
from flask_cors import CORS
from functools import wraps
from werkzeug.security import generate_password_hash, check_password_hash
import jwt
import os
import json
import requests
from datetime import datetime, timedelta
# Configuration
app = Flask(__name__)
CORS(app)
# Secret key pour JWT
JWT_SECRET = os.getenv('JWT_SECRET', 'votre_secret_jwt_changez_moi')
# Firebase Realtime Database URL
FIREBASE_DB_URL = 'https://classehub-40d27-default-rtdb.firebaseio.com'
# Firebase Cloud Messaging (FCM) Server Key
# À récupérer depuis Firebase Console → Project Settings → Cloud Messaging → Server key
FCM_SERVER_KEY = os.getenv('FCM_SERVER_KEY', 'votre_fcm_server_key')
FCM_API_URL = 'https://fcm.googleapis.com/fcm/send'
print("✅ Backend initialisé avec Firebase Realtime Database")
print(f"📍 Database URL: {FIREBASE_DB_URL}")
# ==================== HELPERS PUSH NOTIFICATIONS ====================
def send_fcm_notification(token, title, body, data=None):
"""
Envoyer une notification push via Firebase Cloud Messaging
Args:
token: Token FCM de l'appareil
title: Titre de la notification
body: Corps de la notification
data: Données additionnelles (optionnel)
Returns:
bool: True si envoyé avec succès, False sinon
"""
try:
headers = {
'Authorization': f'key={FCM_SERVER_KEY}',
'Content-Type': 'application/json'
}
payload = {
'to': token,
'notification': {
'title': title,
'body': body,
'icon': '/icon-512.png',
'badge': '/icon-192.png',
'click_action': 'https://classehub.vercel.app'
},
'data': data or {}
}
response = requests.post(FCM_API_URL, headers=headers, json=payload)
if response.status_code == 200:
return True
else:
print(f"❌ Erreur FCM: {response.status_code} - {response.text}")
return False
except Exception as e:
print(f"❌ Erreur envoi notification: {e}")
return False
# ==================== HELPERS DATABASE ====================
def get_students():
"""Récupérer tous les étudiants depuis Firebase"""
try:
response = requests.get(f'{FIREBASE_DB_URL}/etudiants.json')
if response.status_code == 200:
return response.json() or {}
return {}
except Exception as e:
print(f"Erreur get_students: {e}")
return {}
def save_student(student_id, student_data):
"""Sauvegarder un étudiant dans Firebase"""
try:
response = requests.put(
f'{FIREBASE_DB_URL}/etudiants/{student_id}.json',
json=student_data
)
if response.status_code == 200:
print(f"✅ Étudiant {student_id} sauvegardé dans Firebase")
return response.status_code == 200
except Exception as e:
print(f"Erreur save_student: {e}")
return False
def get_student_by_id(student_id):
"""Récupérer un étudiant par ID depuis Firebase"""
try:
response = requests.get(f'{FIREBASE_DB_URL}/etudiants/{student_id}.json')
if response.status_code == 200:
return response.json()
return None
except Exception as e:
print(f"Erreur get_student_by_id: {e}")
return None
def find_student_by_email(email):
"""Trouver un étudiant par email"""
students = get_students()
for student_id, student_data in students.items():
if student_data and student_data.get('email') == email:
return student_id, student_data
return None, None
# ==================== MIDDLEWARES ====================
def token_required(f):
"""Middleware pour vérifier le token JWT"""
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({
'success': False,
'message': 'Token manquant'
}), 401
try:
# Extraire le token (format: "Bearer TOKEN")
if token.startswith('Bearer '):
token = token.split(' ')[1]
# Décoder le token
data = jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
request.user_id = data['id']
request.user_email = data['email']
except jwt.ExpiredSignatureError:
return jsonify({
'success': False,
'message': 'Token expiré'
}), 401
except jwt.InvalidTokenError:
return jsonify({
'success': False,
'message': 'Token invalide'
}), 401
return f(*args, **kwargs)
return decorated
# ==================== ROUTES D'AUTHENTIFICATION ====================
@app.route('/', methods=['GET'])
def home():
"""Page d'accueil de l'API"""
return jsonify({
'message': 'ClassHub API - Backend Python',
'version': '1.0.0',
'endpoints': {
'auth': {
'register': 'POST /api/auth/register',
'login': 'POST /api/auth/login',
'profile': 'GET /api/auth/profile (token requis)',
'change-password': 'POST /api/auth/change-password (token requis)',
'students': 'GET /api/auth/students (token requis)'
},
'cours': {
'list': 'GET /api/cours (token requis)',
'refresh': 'POST /api/cours/refresh (token requis)'
},
'groupes': {
'list': 'GET /api/groupes (token requis)',
'create': 'POST /api/groupes/create (token requis)'
},
'projets': {
'list': 'GET /api/projets (token requis)',
'create': 'POST /api/projets/create (token requis)'
},
'notifications': {
'subscribe': 'POST /api/notifications/subscribe (token requis)',
'new-project': 'POST /api/notifications/new-project (token requis)',
'new-groups': 'POST /api/notifications/new-groups (token requis)',
'group-assignment': 'POST /api/notifications/group-assignment (token requis)',
'check-deadlines': 'POST /api/notifications/check-deadlines (token requis)',
'check-examens': 'POST /api/notifications/check-examens (token requis)',
'check-exam-reminders': 'POST /api/notifications/check-exam-reminders (token requis)',
'check-new-items': 'POST /api/notifications/check-new-items (token requis)'
}
}
})
@app.route('/api/auth/register', methods=['POST'])
def register():
"""Inscription d'un nouvel étudiant"""
try:
data = request.get_json()
# Validation des données
required_fields = ['firstName', 'lastName', 'email', 'password']
for field in required_fields:
if field not in data or not data[field]:
return jsonify({
'success': False,
'message': f'Le champ {field} est requis'
}), 400
firstName = data['firstName']
lastName = data['lastName']
email = data['email']
password = data['password']
promotion = data.get('promotion', 'S8 IA CYCLE ING IIA (2ème année)')
# Vérifier si l'email existe déjà
existing_id, existing_student = find_student_by_email(email)
if existing_student:
return jsonify({
'success': False,
'message': 'Cet email est déjà utilisé'
}), 400
# Hasher le mot de passe
hashed_password = generate_password_hash(password)
# Créer un nouvel étudiant
import uuid
etudiant_id = str(uuid.uuid4())
etudiant_data = {
'id': etudiant_id,
'firstName': firstName,
'lastName': lastName,
'email': email,
'password': hashed_password,
'promotion': promotion,
'createdAt': datetime.now().isoformat()
}
save_student(etudiant_id, etudiant_data)
# Générer un token JWT
token = jwt.encode({
'id': etudiant_id,
'email': email,
'exp': datetime.utcnow() + timedelta(days=7)
}, JWT_SECRET, algorithm='HS256')
# Retourner les données sans le mot de passe
etudiant_response = {k: v for k, v in etudiant_data.items() if k != 'password'}
return jsonify({
'success': True,
'message': 'Inscription réussie',
'token': token,
'user': etudiant_response
}), 201
except Exception as e:
print(f"Erreur lors de l'inscription: {e}")
return jsonify({
'success': False,
'message': 'Erreur serveur',
'error': str(e)
}), 500
@app.route('/api/auth/login', methods=['POST'])
def login():
"""Connexion d'un étudiant"""
try:
data = request.get_json()
# Validation
if not data.get('email') or not data.get('password'):
return jsonify({
'success': False,
'message': 'Email et mot de passe requis'
}), 400
email = data['email']
password = data['password']
# Rechercher l'étudiant par email
etudiant_id, etudiant_data = find_student_by_email(email)
if not etudiant_data:
return jsonify({
'success': False,
'message': 'Email ou mot de passe incorrect'
}), 401
# Vérifier le mot de passe
if not check_password_hash(etudiant_data['password'], password):
return jsonify({
'success': False,
'message': 'Email ou mot de passe incorrect'
}), 401
# Générer un token JWT
token = jwt.encode({
'id': etudiant_data['id'],
'email': etudiant_data['email'],
'exp': datetime.utcnow() + timedelta(days=7)
}, JWT_SECRET, algorithm='HS256')
# Retourner les données sans le mot de passe
etudiant_response = {k: v for k, v in etudiant_data.items() if k != 'password'}
return jsonify({
'success': True,
'message': 'Connexion réussie',
'token': token,
'user': etudiant_response
}), 200
except Exception as e:
print(f"Erreur lors de la connexion: {e}")
return jsonify({
'success': False,
'message': 'Erreur serveur',
'error': str(e)
}), 500
@app.route('/api/auth/profile', methods=['GET'])
@token_required
def get_profile():
"""Récupérer le profil de l'étudiant connecté"""
try:
etudiant_id = request.user_id
# Rechercher l'étudiant
etudiant_data = get_student_by_id(etudiant_id)
if not etudiant_data:
return jsonify({
'success': False,
'message': 'Étudiant non trouvé'
}), 404
# Retourner sans le mot de passe
etudiant_response = {k: v for k, v in etudiant_data.items() if k != 'password'}
return jsonify({
'success': True,
'user': etudiant_response
}), 200
except Exception as e:
print(f"Erreur lors de la récupération du profil: {e}")
return jsonify({
'success': False,
'message': 'Erreur serveur',
'error': str(e)
}), 500
@app.route('/api/auth/change-password', methods=['POST'])
@token_required
def change_password():
"""Changer le mot de passe de l'utilisateur connecté"""
try:
data = request.get_json()
current_password = data.get('currentPassword')
new_password = data.get('newPassword')
if not current_password or not new_password:
return jsonify({
'success': False,
'message': 'Mot de passe actuel et nouveau mot de passe requis'
}), 400
if len(new_password) < 6:
return jsonify({
'success': False,
'message': 'Le nouveau mot de passe doit contenir au moins 6 caractères'
}), 400
etudiant_id = request.user_id
# Récupérer l'étudiant
etudiant_data = get_student_by_id(etudiant_id)
if not etudiant_data:
return jsonify({
'success': False,
'message': 'Utilisateur non trouvé'
}), 404
# Vérifier le mot de passe actuel
stored_password = etudiant_data.get('password')
if not stored_password or stored_password != current_password:
return jsonify({
'success': False,
'message': 'Mot de passe actuel incorrect'
}), 401
# Mettre à jour le mot de passe dans Firebase
response = requests.patch(
f'{FIREBASE_DB_URL}/etudiants/{etudiant_id}.json',
json={'password': new_password}
)
if response.status_code == 200:
return jsonify({
'success': True,
'message': 'Mot de passe modifié avec succès'
}), 200
else:
return jsonify({
'success': False,
'message': 'Erreur lors de la mise à jour'
}), 500
except Exception as e:
print(f"❌ Erreur changement mot de passe: {e}")
return jsonify({
'success': False,
'message': str(e)
}), 500
@app.route('/api/auth/students', methods=['GET'])
@token_required
def get_all_students():
"""Récupérer la liste de tous les étudiants"""
try:
etudiants_data = get_students()
if not etudiants_data:
return jsonify({
'success': True,
'count': 0,
'etudiants': []
}), 200
# Formater la réponse sans les mots de passe
etudiants_list = []
for etudiant_id, etudiant in etudiants_data.items():
etudiant_clean = {k: v for k, v in etudiant.items() if k != 'password'}
etudiants_list.append(etudiant_clean)
return jsonify({
'success': True,
'count': len(etudiants_list),
'etudiants': etudiants_list
}), 200
except Exception as e:
print(f"Erreur lors de la récupération des étudiants: {e}")
return jsonify({
'success': False,
'message': 'Erreur serveur',
'error': str(e)
}), 500
# ==================== ROUTES COURS ====================
@app.route('/api/cours', methods=['GET'])
@token_required
def get_cours():
"""Récupérer l'emploi du temps depuis Firebase"""
try:
# Récupérer depuis Firebase
response = requests.get(f'{FIREBASE_DB_URL}/emploi_du_temps.json')
if response.status_code != 200:
return jsonify({
'success': False,
'message': 'Emploi du temps non disponible'
}), 404
emploi_data = response.json()
if not emploi_data:
return jsonify({
'success': False,
'message': 'Emploi du temps vide'
}), 404
return jsonify({
'success': True,
'data': emploi_data
}), 200
except Exception as e:
print(f"Erreur lors de la récupération des cours: {e}")
return jsonify({
'success': False,
'message': 'Erreur serveur',
'error': str(e)
}), 500
@app.route('/api/cours/refresh', methods=['POST'])
@token_required
def refresh_cours():
"""Rafraîchir l'emploi du temps en lançant le scraper"""
try:
import subprocess
# Lancer le scraper depuis le nouveau dossier
scraper_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'hyperplannigscrap')
venv_python = os.path.join(os.path.dirname(__file__), 'venv', 'bin', 'python3')
scraper_path = os.path.join(scraper_dir, 'scraper_hyperplanning.py')
result = subprocess.run(
[venv_python, scraper_path],
capture_output=True,
text=True,
timeout=180,
cwd=scraper_dir
)
if result.returncode == 0:
return jsonify({
'success': True,
'message': 'Emploi du temps rafraîchi avec succès'
}), 200
else:
return jsonify({
'success': False,
'message': 'Erreur lors du rafraîchissement',
'error': result.stderr
}), 500
except Exception as e:
print(f"Erreur lors du rafraîchissement: {e}")
return jsonify({
'success': False,
'message': 'Erreur serveur',
'error': str(e)
}), 500
# ==================== GESTION DES ERREURS ====================
@app.errorhandler(404)
def not_found(error):
return jsonify({
'success': False,
'message': 'Route non trouvée'
}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({
'success': False,
'message': 'Erreur serveur interne'
}), 500
# ==================== GESTION DES GROUPES ====================
@app.route('/api/groupes', methods=['GET'])
@token_required
def get_my_groups():
"""
Récupérer les groupes auxquels l'étudiant connecté appartient
"""
try:
current_user_id = request.user_id
# Récupérer tous les batches de groupes depuis Firebase
response = requests.get(f'{FIREBASE_DB_URL}/groupes.json')
if response.status_code != 200:
return jsonify({
'success': False,
'message': 'Erreur lors de la récupération des groupes'
}), 500
all_batches = response.json()
if not all_batches:
return jsonify({
'success': True,
'count': 0,
'groupes': []
}), 200
# Parcourir tous les batches et trouver les groupes de l'étudiant
my_groups = []
for batch_id, batch_data in all_batches.items():
if not batch_data or 'groups' not in batch_data:
continue
subject_name = batch_data.get('subjectName', 'Sans nom')
deadline = batch_data.get('deadline', '')
created_at = batch_data.get('createdAt', '')
for group_id, group_data in batch_data['groups'].items():
if not group_data or 'members' not in group_data:
continue
# Vérifier si l'étudiant est dans ce groupe
members = group_data.get('members', [])
is_member = any(member.get('id') == current_user_id for member in members)
if is_member:
my_groups.append({
'batchId': batch_id,
'groupId': group_id,
'groupName': group_data.get('name', 'Groupe'),
'subjectName': subject_name,
'deadline': deadline,
'createdAt': created_at,
'members': members,
'membersCount': len(members)
})
return jsonify({
'success': True,
'count': len(my_groups),
'groupes': my_groups
}), 200
except Exception as e:
print(f"❌ Erreur récupération groupes: {e}")
return jsonify({
'success': False,
'message': f'Erreur serveur: {str(e)}'
}), 500
@app.route('/api/groupes/create', methods=['POST'])
@token_required
def create_groups():
"""
Créer des groupes automatiquement
"""
try:
data = request.get_json()
subject_name = data.get('subjectName')
deadline = data.get('deadline')
students_per_group = data.get('studentsPerGroup')
groups = data.get('groups', [])
projet_id = data.get('projetId', '')
if not subject_name or not deadline or not students_per_group:
return jsonify({
'success': False,
'message': 'Données manquantes (subjectName, deadline, studentsPerGroup)'
}), 400
if len(groups) == 0:
return jsonify({
'success': False,
'message': 'Aucun groupe à créer'
}), 400
# Créer un ID unique pour cette série de groupes
import uuid
from datetime import datetime
batch_id = str(uuid.uuid4())
created_at = datetime.now().isoformat()
# Récupérer l'ID de l'utilisateur connecté depuis le token
current_user_id = request.user_id
# Préparer les données pour Firebase
groups_data = {
'batchId': batch_id,
'subjectName': subject_name,
'deadline': deadline,
'studentsPerGroup': students_per_group,
'createdAt': created_at,
'createdBy': current_user_id,
'groupsCount': len(groups),
'projetId': projet_id,
'groups': {}
}
# Ajouter chaque groupe
for idx, group in enumerate(groups):
group_id = f"group_{idx + 1}"
groups_data['groups'][group_id] = {
'name': group.get('name', f"Groupe {idx + 1}"),
'members': group.get('members', [])
}
# Sauvegarder dans Firebase
response = requests.put(
f'{FIREBASE_DB_URL}/groupes/{batch_id}.json',
json=groups_data
)
if response.status_code == 200:
return jsonify({
'success': True,
'message': f'{len(groups)} groupes créés avec succès',
'batchId': batch_id,
'groupsCount': len(groups)
}), 201
else:
return jsonify({
'success': False,
'message': 'Erreur lors de la sauvegarde dans Firebase'
}), 500
except Exception as e:
print(f"❌ Erreur création groupes: {e}")
return jsonify({
'success': False,
'message': f'Erreur serveur: {str(e)}'
}), 500
# ==================== GESTION DES PROJETS ====================
@app.route('/api/projets', methods=['GET'])
@token_required
def get_projets():
"""
Récupérer tous les projets depuis Firebase
"""
try:
# Récupérer tous les projets depuis Firebase
response = requests.get(f'{FIREBASE_DB_URL}/projets.json')
if response.status_code != 200:
return jsonify({
'success': False,
'message': 'Erreur lors de la récupération des projets'
}), 500
all_projets = response.json()
if not all_projets:
return jsonify({
'success': True,
'count': 0,
'projets': []
}), 200
# Formater les projets
projets_list = []
for projet_id, projet_data in all_projets.items():
if not projet_data:
continue
projets_list.append({
'projetId': projet_id,
'subjectName': projet_data.get('subjectName', ''),
'groupsWanted': projet_data.get('groupsWanted', 0),
'deadline': projet_data.get('deadline', ''),
'profName': projet_data.get('profName', ''),
'profEmail': projet_data.get('profEmail', ''),
'createdAt': projet_data.get('createdAt', ''),
'createdBy': projet_data.get('createdBy', ''),
'hasGroups': projet_data.get('hasGroups', False)
})
return jsonify({
'success': True,
'count': len(projets_list),
'projets': projets_list
}), 200
except Exception as e:
print(f"❌ Erreur récupération projets: {e}")
return jsonify({
'success': False,
'message': f'Erreur serveur: {str(e)}'
}), 500
@app.route('/api/projets/create', methods=['POST'])
@token_required
def create_projet():
"""
Créer un nouveau projet
"""
try:
data = request.get_json()
subject_name = data.get('subjectName')
groups_wanted = data.get('groupsWanted')
deadline = data.get('deadline')
prof_name = data.get('profName', '')
prof_email = data.get('profEmail', '')
if not subject_name or not groups_wanted or not deadline:
return jsonify({
'success': False,
'message': 'Données manquantes (subjectName, groupsWanted, deadline)'
}), 400
# Créer un ID unique pour le projet
import uuid
from datetime import datetime
projet_id = str(uuid.uuid4())
created_at = datetime.now().isoformat()
current_user_id = request.user_id
# Préparer les données pour Firebase
projet_data = {
'projetId': projet_id,
'subjectName': subject_name,
'groupsWanted': groups_wanted,
'deadline': deadline,
'profName': prof_name,
'profEmail': prof_email,
'createdAt': created_at,
'createdBy': current_user_id,
'hasGroups': False
}
# Sauvegarder dans Firebase
response = requests.put(
f'{FIREBASE_DB_URL}/projets/{projet_id}.json',
json=projet_data
)
if response.status_code == 200:
return jsonify({
'success': True,
'message': 'Projet créé avec succès',
'projetId': projet_id,
'projet': projet_data
}), 201
else:
return jsonify({
'success': False,
'message': 'Erreur lors de la sauvegarde dans Firebase'
}), 500
except Exception as e:
print(f"❌ Erreur création projet: {e}")
return jsonify({
'success': False,
'message': f'Erreur serveur: {str(e)}'
}), 500
@app.route('/api/projets/<projet_id>/mark-complete', methods=['PATCH'])
@token_required
def mark_projet_complete(projet_id):
"""
Marquer un projet comme ayant des groupes créés
"""
try:
# Récupérer le projet
response = requests.get(f'{FIREBASE_DB_URL}/projets/{projet_id}.json')
if response.status_code != 200 or not response.json():
return jsonify({
'success': False,
'message': 'Projet non trouvé'
}), 404
projet_data = response.json()
projet_data['hasGroups'] = True
# Mettre à jour dans Firebase
update_response = requests.put(
f'{FIREBASE_DB_URL}/projets/{projet_id}.json',
json=projet_data
)
if update_response.status_code == 200:
return jsonify({
'success': True,
'message': 'Projet marqué comme complété'
}), 200
else:
return jsonify({
'success': False,
'message': 'Erreur lors de la mise à jour'
}), 500
except Exception as e:
print(f"❌ Erreur marquage projet: {e}")
return jsonify({
'success': False,
'message': f'Erreur serveur: {str(e)}'
}), 500
# ==================== ROUTES NOTIFICATIONS ====================
@app.route('/api/notifications/subscribe', methods=['POST'])
@token_required
def subscribe_to_notifications():
"""Enregistrer le token FCM d'un utilisateur"""
try:
data = request.get_json()
fcm_token = data.get('fcmToken')
if not fcm_token:
return jsonify({
'success': False,
'message': 'Token FCM manquant'
}), 400
# Sauvegarder le token FCM dans Firebase
user_id = request.user_id
subscription_data = {
'fcmToken': fcm_token,
'email': request.user_email,
'updatedAt': datetime.now().isoformat()
}
response = requests.put(
f'{FIREBASE_DB_URL}/subscriptions/{user_id}.json',
json=subscription_data
)
if response.status_code == 200:
print(f"✅ Token FCM enregistré pour {request.user_email}")
return jsonify({
'success': True,
'message': 'Token FCM enregistré'
}), 200
else:
return jsonify({
'success': False,
'message': 'Erreur lors de la sauvegarde'
}), 500
except Exception as e:
print(f"❌ Erreur enregistrement token FCM: {e}")
return jsonify({
'success': False,
'message': str(e)
}), 500
@app.route('/api/notifications/new-project', methods=['POST'])
@token_required
def notify_new_project():
"""Envoyer une notification à tous les étudiants pour un nouveau projet"""
try:
data = request.get_json()
subject_name = data.get('subjectName')
deadline = data.get('deadline')
prof_name = data.get('profName')
groups_wanted = data.get('groupsWanted')
if not subject_name or not deadline:
return jsonify({
'success': False,
'message': 'Données manquantes'
}), 400
# Récupérer tous les étudiants depuis Firebase
students_response = requests.get(f'{FIREBASE_DB_URL}/etudiants.json')
if students_response.status_code != 200:
return jsonify({
'success': False,
'message': 'Erreur récupération étudiants'
}), 500
students = students_response.json()
if not students:
return jsonify({
'success': False,
'message': 'Aucun étudiant trouvé'
}), 404
# Préparer le message
deadline_date = datetime.fromisoformat(deadline.replace('Z', '+00:00')).strftime('%d/%m/%Y')
message_title = f"📁 Nouveau projet: {subject_name}"
message_body = f"Deadline: {deadline_date}"
if prof_name:
message_body += f" | Prof: {prof_name}"
if groups_wanted:
message_body += f" | {groups_wanted} groupe(s) demandé(s)"
notifications_sent = 0
# Récupérer toutes les subscriptions
subscriptions_response = requests.get(f'{FIREBASE_DB_URL}/subscriptions.json')
if subscriptions_response.status_code == 200:
subscriptions = subscriptions_response.json()
if subscriptions:
for user_id, sub_data in subscriptions.items():
fcm_token = sub_data.get('fcmToken')
if fcm_token:
# Envoyer la notification push via FCM
success = send_fcm_notification(
fcm_token,
message_title,
message_body,
{
'type': 'new-project',
'subjectName': subject_name,
'deadline': deadline
}
)
if success:
notifications_sent += 1
print(f"✅ Notification nouveau projet envoyée à {notifications_sent} utilisateur(s)")
return jsonify({
'success': True,
'message': f'Notifications envoyées à {notifications_sent} utilisateur(s)',
'notificationsSent': notifications_sent
}), 200
except Exception as e:
print(f"❌ Erreur envoi notifications projet: {e}")
return jsonify({
'success': False,
'message': str(e)
}), 500
@app.route('/api/notifications/new-groups', methods=['POST'])
@token_required
def notify_new_groups():
"""Envoyer une notification aux membres des groupes créés"""
try:
data = request.get_json()
subject_name = data.get('subjectName')
deadline = data.get('deadline')
groups = data.get('groups', [])
if not subject_name or not groups:
return jsonify({
'success': False,
'message': 'Données manquantes'
}), 400
deadline_date = datetime.fromisoformat(deadline.replace('Z', '+00:00')).strftime('%d/%m/%Y')
notifications_sent = 0
# Récupérer toutes les subscriptions
subscriptions_response = requests.get(f'{FIREBASE_DB_URL}/subscriptions.json')
if subscriptions_response.status_code != 200:
return jsonify({
'success': False,
'message': 'Erreur récupération subscriptions'
}), 500
subscriptions = subscriptions_response.json()
# Pour chaque groupe, notifier ses membres
if subscriptions:
for group in groups:
group_name = group.get('name', 'Groupe')
members = group.get('members', [])
message_title = f"👥 Nouveau groupe: {group_name}"
message_body = f"Projet: {subject_name} | Deadline: {deadline_date}"
for member in members:
member_email = member.get('email')
if not member_email:
continue
# Trouver la subscription de ce membre
for user_id, sub_data in subscriptions.items():
if sub_data.get('email') == member_email:
fcm_token = sub_data.get('fcmToken')
if fcm_token:
# Envoyer la notification push via FCM
success = send_fcm_notification(
fcm_token,
message_title,
message_body,
{
'type': 'new-group',
'groupName': group_name,
'subjectName': subject_name,
'members': [m.get('firstName') for m in members]
}
)
if success:
notifications_sent += 1
break
print(f"✅ Notifications groupes envoyées à {notifications_sent} membre(s)")
return jsonify({
'success': True,
'message': f'Notifications envoyées à {notifications_sent} membre(s)',
'notificationsSent': notifications_sent
}), 200
except Exception as e:
print(f"❌ Erreur envoi notifications groupes: {e}")
return jsonify({
'success': False,
'message': str(e)
}), 500
@app.route('/api/notifications/group-assignment', methods=['POST'])
@token_required
def notify_group_assignment():
"""Envoyer une notification personnalisée à un membre de groupe"""
try:
data = request.get_json()
member_email = data.get('memberEmail')
group_name = data.get('groupName')
subject_name = data.get('subjectName')
deadline = data.get('deadline')
members = data.get('members', [])
if not member_email or not group_name or not subject_name:
return jsonify({
'success': False,
'message': 'Données manquantes'
}), 400
# Récupérer la subscription de ce membre
subscriptions_response = requests.get(f'{FIREBASE_DB_URL}/subscriptions.json')
if subscriptions_response.status_code != 200:
return jsonify({
'success': False,
'message': 'Erreur récupération subscriptions'
}), 500
subscriptions = subscriptions_response.json()
notification_sent = False
message_title = f"👥 Affectation: {group_name}"
message_body = f"{subject_name}"
if deadline:
deadline_date = datetime.fromisoformat(deadline.replace('Z', '+00:00')).strftime('%d/%m/%Y')
message_body += f" | Deadline: {deadline_date}"
if subscriptions:
for user_id, sub_data in subscriptions.items():
if sub_data.get('email') == member_email:
fcm_token = sub_data.get('fcmToken')
if fcm_token:
# Envoyer la notification push via FCM
notification_sent = send_fcm_notification(
fcm_token,
message_title,
message_body,
{
'type': 'group-assignment',
'groupName': group_name,
'subjectName': subject_name,
'members': [m.get('firstName') for m in members]
}
)
break
if notification_sent:
print(f"✅ Notification envoyée à {member_email}")
return jsonify({
'success': True,
'message': 'Notification envoyée'
}), 200
else:
return jsonify({
'success': False,
'message': 'Utilisateur non trouvé ou non abonné'
}), 404
except Exception as e:
print(f"❌ Erreur envoi notification: {e}")
return jsonify({
'success': False,
'message': str(e)
}), 500
@app.route('/api/notifications/check-deadlines', methods=['POST'])
@token_required
def check_upcoming_deadlines():
"""Vérifier les deadlines imminentes et envoyer des rappels (1h avant)"""
try:
# Récupérer tous les projets depuis Firebase
projets_response = requests.get(f'{FIREBASE_DB_URL}/projets.json')
if projets_response.status_code != 200:
return jsonify({
'success': False,
'message': 'Erreur récupération projets'
}), 500
all_projets = projets_response.json()
if not all_projets:
return jsonify({
'success': True,
'message': 'Aucun projet trouvé',
'notificationsSent': 0
}), 200
# Calculer la fenêtre de temps (maintenant + 1h)
now = datetime.now()
one_hour_later = now + timedelta(hours=1)
notifications_sent = 0
upcoming_projets = []
# Parcourir tous les projets
for projet_id, projet_data in all_projets.items():
if not projet_data or not projet_data.get('deadline'):
continue
# Parser la deadline
try:
deadline_str = projet_data['deadline']
# Support des formats ISO avec ou sans Z
if deadline_str.endswith('Z'):
deadline = datetime.fromisoformat(deadline_str.replace('Z', '+00:00'))
else:
deadline = datetime.fromisoformat(deadline_str)
# Vérifier si la deadline est dans environ 1h (±5 minutes)
time_until_deadline = (deadline - now).total_seconds()
# Si entre 55 et 65 minutes
if 3300 <= time_until_deadline <= 3900:
upcoming_projets.append({
'projetId': projet_id,
'subjectName': projet_data.get('subjectName', 'Projet'),
'deadline': deadline_str,
'profName': projet_data.get('profName', ''),
'minutesRemaining': int(time_until_deadline / 60)
})
except Exception as e:
print(f"⚠️ Erreur parsing deadline pour {projet_id}: {e}")
continue
# Si des projets sont imminents, envoyer des notifications
if upcoming_projets:
# Récupérer toutes les subscriptions
subscriptions_response = requests.get(f'{FIREBASE_DB_URL}/subscriptions.json')
if subscriptions_response.status_code == 200:
subscriptions = subscriptions_response.json()
if subscriptions:
for projet in upcoming_projets:
subject_name = projet['subjectName']
minutes = projet['minutesRemaining']
prof_name = projet.get('profName', '')
message_title = f"⏰ Rappel: {subject_name}"
message_body = f"Deadline dans {minutes} minute(s)!"
if prof_name:
message_body += f" | Prof: {prof_name}"
# Envoyer à tous les utilisateurs abonnés
for user_id, sub_data in subscriptions.items():
fcm_token = sub_data.get('fcmToken')
if fcm_token:
success = send_fcm_notification(
fcm_token,
message_title,
message_body,
{
'type': 'deadline-reminder',
'projetId': projet['projetId'],
'subjectName': subject_name,
'minutesRemaining': minutes
}
)
if success:
notifications_sent += 1
print(f"⏰ Rappel envoyé pour: {subject_name} (deadline dans {minutes}min)")
return jsonify({
'success': True,
'message': f'Vérification effectuée, {len(upcoming_projets)} projet(s) imminent(s)',
'upcomingProjets': upcoming_projets,
'notificationsSent': notifications_sent
}), 200
except Exception as e:
print(f"❌ Erreur vérification deadlines: {e}")
return jsonify({
'success': False,
'message': str(e)
}), 500
@app.route('/api/notifications/check-examens', methods=['POST'])
@token_required
def check_upcoming_examens():
"""Vérifier les examens/évaluations imminents dans l'emploi du temps (1h avant)"""
try:
# Récupérer l'emploi du temps depuis Firebase
emploi_response = requests.get(f'{FIREBASE_DB_URL}/emploi_du_temps.json')
if emploi_response.status_code != 200:
return jsonify({
'success': False,
'message': 'Erreur récupération emploi du temps'
}), 500
emploi_data = emploi_response.json()
if not emploi_data or not emploi_data.get('semaines'):
return jsonify({
'success': True,
'message': 'Aucun emploi du temps trouvé',
'notificationsSent': 0
}), 200
# Calculer la fenêtre de temps (maintenant + 1h)
now = datetime.now()
today_str = now.strftime('%a %d %B').lower() # ex: "mar 11 mars"
current_hour = now.hour
current_minute = now.minute
notifications_sent = 0
upcoming_examens = []
# Trouver la semaine actuelle
semaine_actuelle = emploi_data.get('semaine_actuelle')
current_week = None
for semaine in emploi_data.get('semaines', []):
if semaine.get('semaine') == semaine_actuelle:
current_week = semaine
break
if not current_week or not current_week.get('cours_par_jour'):
return jsonify({
'success': True,
'message': 'Aucun cours cette semaine',
'notificationsSent': 0
}), 200
# Parcourir tous les cours de la semaine
for jour_id, cours_list in current_week.get('cours_par_jour', {}).items():
for cours in cours_list:
# Vérifier si c'est un examen ou évaluation
cours_type = cours.get('details', ['', '', '', '', ''])[4] if cours.get('details') else ''
if cours_type.lower() not in ['examen', 'evaluation', 'évaluation', 'test', 'contrôle']:
continue
# Parser l'horaire (ex: "de 10h00 à 12h00 (02h00)")
horaire = cours.get('horaire', '')
if not horaire.startswith('de '):
continue
try:
# Extraire l'heure de début (ex: "10h00")
heure_debut_str = horaire.split('de ')[1].split(' à ')[0]
heure_parts = heure_debut_str.replace('h', ':').split(':')
heure_debut = int(heure_parts[0])
minute_debut = int(heure_parts[1]) if len(heure_parts) > 1 else 0
# Créer un datetime pour l'examen aujourd'hui
examen_time = now.replace(hour=heure_debut, minute=minute_debut, second=0, microsecond=0)
# Vérifier si l'examen est dans environ 1h (±5 minutes)
time_until_examen = (examen_time - now).total_seconds()
# Si entre 55 et 65 minutes
if 3300 <= time_until_examen <= 3900:
cours_nom = cours.get('details', [''])[0] if cours.get('details') else 'Examen'
prof_nom = cours.get('details', ['', ''])[1] if cours.get('details') and len(cours.get('details', [])) > 1 else ''
salle = cours.get('details', ['', '', ''])[2] if cours.get('details') and len(cours.get('details', [])) > 2 else ''
upcoming_examens.append({
'type': cours_type,
'nom': cours_nom,
'horaire': horaire,
'prof': prof_nom,
'salle': salle,
'minutesRemaining': int(time_until_examen / 60)
})
except Exception as e:
print(f"⚠️ Erreur parsing horaire {horaire}: {e}")
continue
# Si des examens sont imminents, envoyer des notifications
if upcoming_examens:
# Récupérer toutes les subscriptions
subscriptions_response = requests.get(f'{FIREBASE_DB_URL}/subscriptions.json')
if subscriptions_response.status_code == 200:
subscriptions = subscriptions_response.json()
if subscriptions:
for examen in upcoming_examens:
cours_nom = examen['nom']
minutes = examen['minutesRemaining']
salle = examen.get('salle', '')
type_cours = examen['type']
message_title = f"⏰ Rappel {type_cours}: {cours_nom}"
message_body = f"Dans {minutes} minute(s)"
if salle:
message_body += f" | {salle}"
# Envoyer à tous les utilisateurs abonnés
for user_id, sub_data in subscriptions.items():
fcm_token = sub_data.get('fcmToken')
if fcm_token:
success = send_fcm_notification(
fcm_token,
message_title,
message_body,
{
'type': 'examen-reminder',
'nom': cours_nom,
'typeExamen': type_cours,
'salle': salle,
'horaire': examen.get('horaire', ''),
'minutesRemaining': minutes
}
)
if success:
notifications_sent += 1
print(f"⏰ Rappel {type_cours} envoyé: {cours_nom} dans {minutes}min ({salle})")
return jsonify({
'success': True,
'message': f'Vérification effectuée, {len(upcoming_examens)} examen(s) imminent(s)',
'upcomingExamens': upcoming_examens,
'notificationsSent': notifications_sent
}), 200
except Exception as e:
print(f"❌ Erreur vérification examens: {e}")
return jsonify({
'success': False,
'message': str(e)
}), 500
@app.route('/api/notifications/check-exam-reminders', methods=['POST'])
@token_required
def check_exam_reminders():
"""Vérifier les examens à venir et envoyer rappels (7j, 5j, 3j, 2j, 1j)"""
try:
# Récupérer l'emploi du temps depuis Firebase
emploi_response = requests.get(f'{FIREBASE_DB_URL}/emploi_du_temps.json')
if emploi_response.status_code != 200:
return jsonify({
'success': False,
'message': 'Erreur récupération emploi du temps'
}), 500
emploi_data = emploi_response.json()
if not emploi_data or not emploi_data.get('semaines'):
return jsonify({
'success': True,
'message': 'Aucun emploi du temps',
'reminders': []
}), 200
now = datetime.now()
reminders = []
# Jours à checker : 7, 5, 3, 2, 1
days_to_check = [7, 5, 3, 2, 1]
# Parcourir toutes les semaines
for semaine in emploi_data.get('semaines', []):
for jour_id, cours_list in semaine.get('cours_par_jour', {}).items():
for cours in cours_list:
# Vérifier si c'est un examen
cours_type = cours.get('details', ['', '', '', '', ''])[4] if cours.get('details') else ''
if cours_type.lower() not in ['examen', 'evaluation', 'évaluation', 'test', 'contrôle']:
continue
# Extraire la date (format: "lun 03 mars")
date_str = cours.get('date', '')
try:
# Parser la date complète avec l'année actuelle
# Format: "lun 03 mars"
parts = date_str.split()
if len(parts) >= 3:
day_num = int(parts[1])
month_name = parts[2].lower()
# Map mois français
mois = {
'janvier': 1, 'février': 2, 'mars': 3, 'avril': 4,
'mai': 5, 'juin': 6, 'juillet': 7, 'août': 8,
'septembre': 9, 'octobre': 10, 'novembre': 11, 'décembre': 12
}
month_num = mois.get(month_name)
if not month_num:
continue
# Créer la date de l'examen
year = now.year
examen_date = datetime(year, month_num, day_num)
# Si la date est passée, essayer l'année prochaine
if examen_date < now:
examen_date = datetime(year + 1, month_num, day_num)
# Calculer jours restants
days_until = (examen_date - now).days
# Vérifier si on doit envoyer un rappel
if days_until in days_to_check:
cours_nom = cours.get('details', [''])[0] if cours.get('details') else 'Examen'
salle = cours.get('details', ['', '', ''])[2] if cours.get('details') and len(cours.get('details', [])) > 2 else ''
horaire = cours.get('horaire', '')
reminders.append({
'type': cours_type,
'nom': cours_nom,
'date': date_str,
'horaire': horaire,
'salle': salle,
'daysRemaining': days_until
})
except Exception as e:
print(f"⚠️ Erreur parsing date {date_str}: {e}")
continue
return jsonify({
'success': True,
'reminders': reminders
}), 200
except Exception as e:
print(f"❌ Erreur vérification rappels examens: {e}")
return jsonify({
'success': False,
'message': str(e)
}), 500
@app.route('/api/notifications/check-new-items', methods=['POST'])
@token_required
def check_new_items():
"""Détecter les nouveaux projets et groupes créés depuis dernière vérification"""
try:
data = request.get_json()
last_check = data.get('lastCheck') # Timestamp ISO
if not last_check:
return jsonify({
'success': False,
'message': 'lastCheck requis'
}), 400
last_check_time = datetime.fromisoformat(last_check.replace('Z', '+00:00'))
new_projets = []
new_groupes = []
# 1. Vérifier nouveaux projets
projets_response = requests.get(f'{FIREBASE_DB_URL}/projets.json')
if projets_response.status_code == 200:
all_projets = projets_response.json()
if all_projets:
for projet_id, projet_data in all_projets.items():
if not projet_data or not projet_data.get('createdAt'):
continue
try:
created_at = datetime.fromisoformat(projet_data['createdAt'].replace('Z', '+00:00'))
if created_at > last_check_time:
new_projets.append({
'projetId': projet_id,
'subjectName': projet_data.get('subjectName', 'Projet'),
'deadline': projet_data.get('deadline', ''),
'profName': projet_data.get('profName', ''),
'createdAt': projet_data.get('createdAt')
})
except Exception as e:
print(f"⚠️ Erreur parsing projet {projet_id}: {e}")
continue
# 2. Vérifier nouveaux groupes
groupes_response = requests.get(f'{FIREBASE_DB_URL}/groupes.json')
if groupes_response.status_code == 200:
all_groupes = groupes_response.json()
if all_groupes:
for batch_id, batch_data in all_groupes.items():
if not batch_data or not batch_data.get('createdAt'):
continue
try:
created_at = datetime.fromisoformat(batch_data['createdAt'].replace('Z', '+00:00'))
if created_at > last_check_time:
# Vérifier si l'utilisateur actuel est dans un des groupes
user_email = request.current_user.get('email')
user_groups = []
for groupe in batch_data.get('groups', []):
for member in groupe.get('members', []):
if member.get('email') == user_email:
user_groups.append({
'groupName': groupe.get('name', 'Groupe'),
'members': [f"{m.get('firstName', '')} {m.get('lastName', '')}" for m in groupe.get('members', [])]
})
break
if user_groups:
new_groupes.append({
'batchId': batch_id,
'subjectName': batch_data.get('subjectName', 'Projet'),
'deadline': batch_data.get('deadline', ''),
'userGroups': user_groups,
'createdAt': batch_data.get('createdAt')
})
except Exception as e:
print(f"⚠️ Erreur parsing groupe {batch_id}: {e}")
continue
return jsonify({
'success': True,
'newProjets': new_projets,
'newGroupes': new_groupes
}), 200
except Exception as e:
print(f"❌ Erreur vérification nouveautés: {e}")
return jsonify({
'success': False,
'message': str(e)
}), 500
# ==================== DÉMARRAGE DU SERVEUR ====================
if __name__ == '__main__':
port = int(os.getenv('PORT', 5000))
print(f"\n🚀 Serveur ClassHub démarré sur le port {port}")
print(f"📍 API disponible sur http://localhost:{port}")
print(f"📚 Documentation: http://localhost:{port}\n")
app.run(
host='0.0.0.0',
port=port,
debug=True
)