jhjv / app /task_runner.py
Docfile's picture
Upload 76 files
d324dde verified
"""
Database-based Task Scheduler for Apex Ores
Remplace les cron jobs traditionnels par un système robuste basé sur la DB:
- Rattrapage automatique des tâches manquées après redémarrage
- Historique complet des exécutions
- Interface admin pour monitoring et contrôle
- Tolérance aux pannes serveur
Usage:
# Mode daemon (boucle infinie)
python -m app.task_runner --daemon
# Mode one-shot (pour cron ou test)
python -m app.task_runner
# Via Flask CLI
flask run-scheduler
"""
import sys
import os
import socket
import traceback
from datetime import datetime, timedelta, timezone, date, time
from io import StringIO
from contextlib import redirect_stdout
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app import create_app, db
from app.models import ScheduledTask, TaskExecutionLog
class TaskRunner:
def __init__(self):
self.app = create_app()
self.hostname = socket.gethostname()
self.pid = os.getpid()
def run_pending_tasks(self):
"""Exécute toutes les tâches qui doivent tourner"""
with self.app.app_context():
now = datetime.now(timezone.utc)
pending_tasks = ScheduledTask.query.filter(
ScheduledTask.is_active == True,
db.or_(
ScheduledTask.next_run_at == None, ScheduledTask.next_run_at <= now
),
).all()
print(f"[{now}] {len(pending_tasks)} tâche(s) à exécuter")
for task in pending_tasks:
self.execute_task(task)
def execute_task(self, task):
"""Exécute une tâche spécifique avec logging complet"""
print(f"\n[{datetime.now(timezone.utc)}] Exécution de: {task.name}")
with self.app.app_context():
execution_log = TaskExecutionLog(
task_id=task.id,
task_name=task.name,
server_hostname=self.hostname,
process_id=self.pid,
)
db.session.add(execution_log)
task.mark_running()
execution_log.status = "running"
db.session.commit()
try:
output = self.run_task_function(task.name)
execution_log.status = "success"
execution_log.completed_at = datetime.now(timezone.utc)
execution_log.output_log = output
task.mark_success()
print(f" ✓ {task.name} terminé avec succès")
except Exception as e:
error_msg = f"{str(e)}\n{traceback.format_exc()}"
execution_log.status = "failed"
execution_log.completed_at = datetime.now(timezone.utc)
execution_log.error_message = error_msg[:2000]
task.mark_failed(str(e))
print(f" ✗ {task.name} a échoué: {str(e)}")
db.session.commit()
def run_task_function(self, task_name):
"""Exécute la fonction associée au nom de la tâche"""
f = StringIO()
with redirect_stdout(f):
if task_name == "daily_gains":
self._run_daily_gains()
elif task_name == "hourly_cleanup":
self._run_hourly_cleanup()
elif task_name == "weekly_report":
self._run_weekly_report()
else:
raise ValueError(f"Tâche inconnue: {task_name}")
return f.getvalue()
def _run_daily_gains(self):
"""Calcule les gains quotidiens et traite les commissions"""
from scripts.daily_gains import (
calculate_daily_gains,
auto_process_withdrawals,
cleanup_old_notifications,
print_platform_summary,
)
calculate_daily_gains()
auto_process_withdrawals()
cleanup_old_notifications()
print_platform_summary()
def _run_hourly_cleanup(self):
"""Nettoyage périodique"""
print("Nettoyage horaire...")
from scripts.daily_gains import cleanup_old_notifications
cleanup_old_notifications()
print("Nettoyage terminé")
def _run_weekly_report(self):
"""Rapport hebdomadaire"""
print("Génération du rapport hebdomadaire...")
print("TODO: Implémenter le rapport hebdomadaire")
def check_missed_tasks(self):
"""Vérifie et exécute les tâches manquées depuis le dernier démarrage"""
with self.app.app_context():
now = datetime.now(timezone.utc)
missed_tasks = ScheduledTask.query.filter(
ScheduledTask.is_active == True,
db.or_(
ScheduledTask.last_run_at.is_(None),
ScheduledTask.last_run_at < now - timedelta(hours=25),
),
).all()
if missed_tasks:
print(f"\n⚠️ {len(missed_tasks)} tâche(s) manquée(s) détectée(s)!")
for task in missed_tasks:
last_run = (
task.last_run_at.strftime("%Y-%m-%d %H:%M")
if task.last_run_at
else "JAMAIS"
)
print(f" - {task.name} (dernier run: {last_run})")
self.execute_task(task)
else:
print(f"\n✓ Aucune tâche manquée")
def initialize_default_tasks(self):
"""Crée les tâches par défaut si elles n'existent pas"""
with self.app.app_context():
default_tasks = [
{
"name": "daily_gains",
"description": "Calcule les gains quotidiens, traite les retraits et commissions",
"schedule_type": "daily",
"schedule_time": time(0, 5, 0),
"is_active": True,
},
{
"name": "hourly_cleanup",
"description": "Nettoyage des notifications et logs anciens",
"schedule_type": "hourly",
"is_active": True,
},
]
for task_data in default_tasks:
existing = ScheduledTask.query.filter_by(name=task_data["name"]).first()
if not existing:
task = ScheduledTask(**task_data)
task.calculate_next_run()
db.session.add(task)
print(f"✓ Tâche créée: {task_data['name']}")
db.session.commit()
def print_status(self):
"""Affiche le statut de toutes les tâches"""
with self.app.app_context():
tasks = ScheduledTask.query.all()
print("\n" + "=" * 70)
print("STATUT DES TÂCHES PLANIFIÉES")
print("=" * 70)
for task in tasks:
status_icon = (
"✓"
if task.last_status == "success"
else "✗"
if task.last_status == "failed"
else "○"
)
next_run = (
task.next_run_at.strftime("%d/%m %H:%M")
if task.next_run_at
else "Non calculé"
)
last_run = (
task.last_run_at.strftime("%d/%m %H:%M")
if task.last_run_at
else "Jamais"
)
print(
f"{status_icon} {task.name:20} | Prochain: {next_run:12} | Dernier: {last_run:12} | Runs: {task.run_count}"
)
print("=" * 70)
def run_scheduler_daemon():
"""Mode daemon - boucle infinie qui vérifie chaque minute"""
runner = TaskRunner()
print("=" * 70)
print("SCHEDULER DB-BASED - MODE DAEMON")
print(f"Démarré à: {datetime.now(timezone.utc)}")
print(f"Hostname: {runner.hostname} | PID: {runner.pid}")
print("=" * 70)
runner.initialize_default_tasks()
runner.check_missed_tasks()
runner.print_status()
print("\nEn attente des tâches... (Ctrl+C pour arrêter)\n")
try:
while True:
runner.run_pending_tasks()
import time
time.sleep(60)
except KeyboardInterrupt:
print("\n\nArrêt du scheduler.")
print(f"Terminé à: {datetime.now(timezone.utc)}")
def run_scheduler_once():
"""Mode one-shot - une seule exécution puis sortie"""
runner = TaskRunner()
print("=" * 70)
print("SCHEDULER DB-BASED - MODE ONE-SHOT")
print(f"Exécution à: {datetime.now(timezone.utc)}")
print("=" * 70)
runner.initialize_default_tasks()
runner.check_missed_tasks()
runner.run_pending_tasks()
runner.print_status()
print("\nExécution terminée.")
def check_and_run_missed_on_startup():
"""Vérifie les tâches manquées au démarrage de l'app (non bloquant)"""
try:
runner = TaskRunner()
runner.initialize_default_tasks()
runner.check_missed_tasks()
except Exception as e:
print(f"[Scheduler] Erreur lors de la vérification des tâches: {e}")
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--daemon":
run_scheduler_daemon()
else:
run_scheduler_once()