Spaces:
No application file
No application file
| import threading | |
| import queue | |
| import time | |
| import logging | |
| from typing import Optional | |
| from datetime import datetime | |
class TrainingQueue:
    """Simple in-memory training queue for LoRA tasks.

    Project ids are pushed onto a FIFO ``queue.Queue`` and consumed one at a
    time by a single daemon worker thread, which passes each id to
    ``LoRATrainer.train_project``. The queue lives only in memory.
    """

    def __init__(self):
        """Create an empty queue and immediately start the worker thread."""
        self.task_queue = queue.Queue()
        # Project id currently being trained, or None while the worker is idle.
        self.current_task: Optional[int] = None
        self.worker_thread: Optional[threading.Thread] = None
        # Cooperative stop flag; the worker re-checks it between tasks.
        self.is_running = False
        self.logger = logging.getLogger(__name__)

        # Start worker thread
        self.start_worker()

    def start_worker(self):
        """Start the worker thread unless one is already alive."""
        if self.worker_thread is not None and self.worker_thread.is_alive():
            return

        # Raise the flag before starting the thread so the loop does not see
        # a stale False and exit straight away.
        self.is_running = True
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()
        self.logger.info("Training queue worker started")

    def stop_worker(self):
        """Ask the worker to stop and wait up to 5 seconds for it to exit.

        The worker only notices the request between tasks, so a training run
        that is already in progress is not interrupted. If the thread is
        still alive after the wait, a warning is logged instead of the
        "stopped" message.
        """
        self.is_running = False

        worker = self.worker_thread
        if worker is not None and worker.is_alive():
            worker.join(timeout=5)
            if worker.is_alive():
                # join() returning does not mean the thread ended; it may
                # simply have timed out while a task is still training.
                self.logger.warning(
                    "Training queue worker is still busy; "
                    "it will stop after the current task"
                )
                return

        self.logger.info("Training queue worker stopped")

    def add_task(self, project_id: int):
        """Add a training task to the queue.

        Args:
            project_id: Id handed to ``train_project`` when the task is run.
        """
        self.task_queue.put(project_id)
        self.logger.info("Added project %s to training queue", project_id)

    def get_queue_status(self) -> dict:
        """Get current queue status.

        Returns:
            A dict with ``queue_size`` (pending tasks as reported by
            ``Queue.qsize()``), ``current_task`` (id being trained or None)
            and ``is_running`` (the worker's run flag).
        """
        return {
            'queue_size': self.task_queue.qsize(),
            'current_task': self.current_task,
            'is_running': self.is_running
        }

    def _worker_loop(self):
        """Main worker loop that processes training tasks.

        Runs on the worker thread until ``is_running`` becomes False.
        """
        try:
            # Imported on the worker thread rather than at module level.
            # NOTE(review): presumably to avoid an import cycle or a slow
            # module import -- confirm before moving it.
            from src.services.lora_trainer import LoRATrainer
            trainer = LoRATrainer()
        except Exception:
            # Without this guard the thread would die silently while
            # is_running stayed True, so the status would claim a live worker
            # and queued tasks would never be processed.
            self.logger.exception("Training queue worker could not create the trainer")
            self.is_running = False
            return

        while self.is_running:
            try:
                # Get next task from queue (with timeout to allow checking is_running)
                try:
                    project_id = self.task_queue.get(timeout=1)
                except queue.Empty:
                    continue

                self._run_task(trainer, project_id)
            except Exception as exc:
                self.logger.exception("Error in worker loop: %s", exc)
                time.sleep(1)  # Prevent tight loop on persistent errors

    def _run_task(self, trainer, project_id):
        """Train one project and always mark its queue entry as done."""
        self.current_task = project_id
        try:
            self.logger.info("Starting training for project %s", project_id)
            trainer.train_project(project_id)
            self.logger.info("Completed training for project %s", project_id)
        except Exception as exc:
            # A failed training run must not take the worker down; log it
            # with the traceback and move on to the next task.
            self.logger.exception("Training failed for project %s: %s", project_id, exc)
        finally:
            # Runs for every dequeued task so Queue.join() callers are never
            # left waiting on a task that errored out.
            self.current_task = None
            self.task_queue.task_done()