""" Database-based Task Scheduler for Apex Ores Remplace les cron jobs traditionnels par un système robuste basé sur la DB: - Rattrapage automatique des tâches manquées après redémarrage - Historique complet des exécutions - Interface admin pour monitoring et contrôle - Tolérance aux pannes serveur Usage: # Mode daemon (boucle infinie) python -m app.task_runner --daemon # Mode one-shot (pour cron ou test) python -m app.task_runner # Via Flask CLI flask run-scheduler """ import sys import os import socket import traceback from datetime import datetime, timedelta, timezone, date, time from io import StringIO from contextlib import redirect_stdout sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from app import create_app, db from app.models import ScheduledTask, TaskExecutionLog class TaskRunner: def __init__(self): self.app = create_app() self.hostname = socket.gethostname() self.pid = os.getpid() def run_pending_tasks(self): """Exécute toutes les tâches qui doivent tourner""" with self.app.app_context(): now = datetime.now(timezone.utc) pending_tasks = ScheduledTask.query.filter( ScheduledTask.is_active == True, db.or_( ScheduledTask.next_run_at == None, ScheduledTask.next_run_at <= now ), ).all() print(f"[{now}] {len(pending_tasks)} tâche(s) à exécuter") for task in pending_tasks: self.execute_task(task) def execute_task(self, task): """Exécute une tâche spécifique avec logging complet""" print(f"\n[{datetime.now(timezone.utc)}] Exécution de: {task.name}") with self.app.app_context(): execution_log = TaskExecutionLog( task_id=task.id, task_name=task.name, server_hostname=self.hostname, process_id=self.pid, ) db.session.add(execution_log) task.mark_running() execution_log.status = "running" db.session.commit() try: output = self.run_task_function(task.name) execution_log.status = "success" execution_log.completed_at = datetime.now(timezone.utc) execution_log.output_log = output task.mark_success() print(f" ✓ {task.name} terminé avec succès") except Exception as e: error_msg = f"{str(e)}\n{traceback.format_exc()}" execution_log.status = "failed" execution_log.completed_at = datetime.now(timezone.utc) execution_log.error_message = error_msg[:2000] task.mark_failed(str(e)) print(f" ✗ {task.name} a échoué: {str(e)}") db.session.commit() def run_task_function(self, task_name): """Exécute la fonction associée au nom de la tâche""" f = StringIO() with redirect_stdout(f): if task_name == "daily_gains": self._run_daily_gains() elif task_name == "hourly_cleanup": self._run_hourly_cleanup() elif task_name == "weekly_report": self._run_weekly_report() else: raise ValueError(f"Tâche inconnue: {task_name}") return f.getvalue() def _run_daily_gains(self): """Calcule les gains quotidiens et traite les commissions""" from scripts.daily_gains import ( calculate_daily_gains, auto_process_withdrawals, cleanup_old_notifications, print_platform_summary, ) calculate_daily_gains() auto_process_withdrawals() cleanup_old_notifications() print_platform_summary() def _run_hourly_cleanup(self): """Nettoyage périodique""" print("Nettoyage horaire...") from scripts.daily_gains import cleanup_old_notifications cleanup_old_notifications() print("Nettoyage terminé") def _run_weekly_report(self): """Rapport hebdomadaire""" print("Génération du rapport hebdomadaire...") print("TODO: Implémenter le rapport hebdomadaire") def check_missed_tasks(self): """Vérifie et exécute les tâches manquées depuis le dernier démarrage""" with self.app.app_context(): now = datetime.now(timezone.utc) missed_tasks = ScheduledTask.query.filter( ScheduledTask.is_active == True, db.or_( ScheduledTask.last_run_at.is_(None), ScheduledTask.last_run_at < now - timedelta(hours=25), ), ).all() if missed_tasks: print(f"\n⚠️ {len(missed_tasks)} tâche(s) manquée(s) détectée(s)!") for task in missed_tasks: last_run = ( task.last_run_at.strftime("%Y-%m-%d %H:%M") if task.last_run_at else "JAMAIS" ) print(f" - {task.name} (dernier run: {last_run})") self.execute_task(task) else: print(f"\n✓ Aucune tâche manquée") def initialize_default_tasks(self): """Crée les tâches par défaut si elles n'existent pas""" with self.app.app_context(): default_tasks = [ { "name": "daily_gains", "description": "Calcule les gains quotidiens, traite les retraits et commissions", "schedule_type": "daily", "schedule_time": time(0, 5, 0), "is_active": True, }, { "name": "hourly_cleanup", "description": "Nettoyage des notifications et logs anciens", "schedule_type": "hourly", "is_active": True, }, ] for task_data in default_tasks: existing = ScheduledTask.query.filter_by(name=task_data["name"]).first() if not existing: task = ScheduledTask(**task_data) task.calculate_next_run() db.session.add(task) print(f"✓ Tâche créée: {task_data['name']}") db.session.commit() def print_status(self): """Affiche le statut de toutes les tâches""" with self.app.app_context(): tasks = ScheduledTask.query.all() print("\n" + "=" * 70) print("STATUT DES TÂCHES PLANIFIÉES") print("=" * 70) for task in tasks: status_icon = ( "✓" if task.last_status == "success" else "✗" if task.last_status == "failed" else "○" ) next_run = ( task.next_run_at.strftime("%d/%m %H:%M") if task.next_run_at else "Non calculé" ) last_run = ( task.last_run_at.strftime("%d/%m %H:%M") if task.last_run_at else "Jamais" ) print( f"{status_icon} {task.name:20} | Prochain: {next_run:12} | Dernier: {last_run:12} | Runs: {task.run_count}" ) print("=" * 70) def run_scheduler_daemon(): """Mode daemon - boucle infinie qui vérifie chaque minute""" runner = TaskRunner() print("=" * 70) print("SCHEDULER DB-BASED - MODE DAEMON") print(f"Démarré à: {datetime.now(timezone.utc)}") print(f"Hostname: {runner.hostname} | PID: {runner.pid}") print("=" * 70) runner.initialize_default_tasks() runner.check_missed_tasks() runner.print_status() print("\nEn attente des tâches... (Ctrl+C pour arrêter)\n") try: while True: runner.run_pending_tasks() import time time.sleep(60) except KeyboardInterrupt: print("\n\nArrêt du scheduler.") print(f"Terminé à: {datetime.now(timezone.utc)}") def run_scheduler_once(): """Mode one-shot - une seule exécution puis sortie""" runner = TaskRunner() print("=" * 70) print("SCHEDULER DB-BASED - MODE ONE-SHOT") print(f"Exécution à: {datetime.now(timezone.utc)}") print("=" * 70) runner.initialize_default_tasks() runner.check_missed_tasks() runner.run_pending_tasks() runner.print_status() print("\nExécution terminée.") def check_and_run_missed_on_startup(): """Vérifie les tâches manquées au démarrage de l'app (non bloquant)""" try: runner = TaskRunner() runner.initialize_default_tasks() runner.check_missed_tasks() except Exception as e: print(f"[Scheduler] Erreur lors de la vérification des tâches: {e}") if __name__ == "__main__": if len(sys.argv) > 1 and sys.argv[1] == "--daemon": run_scheduler_daemon() else: run_scheduler_once()