| """ |
| task_manager.py |
| =============== |
| Background Task Manager for Massive File Translation |
| |
| RESPONSIBILITIES: |
| ----------------- |
| 1. Manages the lifecycle of translation tasks (create, run, track, cleanup) |
| 2. Runs translation in a background thread (non-blocking to FastAPI) |
| 3. Maintains a registry of all tasks with their progress |
| 4. Handles file path management (uploads, outputs) |
| 5. Supports single-task mode (one translation at a time on HuggingFace free tier) |
| |
| WHY SINGLE TASK MODE: |
| --------------------- |
| On HuggingFace Spaces (free tier): |
| - Shared IP = easier to get rate-limited by Google |
| - Limited CPU/RAM compared to dedicated server |
| - Running 2+ massive translations simultaneously would guarantee IP ban |
| - So we queue: one active translation, others wait |
| |
| ARCHITECTURE: |
| ------------- |
| FastAPI Request → TaskManager.create_task() |
| ↓ |
| Background Thread starts |
| ↓ |
| MassiveFileTranslator.translate_file() |
| ↓ |
| Progress updated in real-time |
| ↓ |
| Task marked "completed" |
| ↓ |
| FastAPI serves download link |
| """ |
|
|
| import os |
| import uuid |
| import time |
| import shutil |
| import threading |
| import logging |
| from typing import Optional, Dict |
| from dataclasses import dataclass, field |
| from enum import Enum |
|
|
| from translator_engine import ( |
| MassiveFileTranslator, |
| TranslatorConfig, |
| TranslationProgress, |
| ) |
|
|
logger = logging.getLogger("task_manager")

# ---------------------------------------------------------------------------
# File locations
# ---------------------------------------------------------------------------
# Uploaded source files and translated outputs live in folders next to this
# module.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
UPLOAD_DIR = os.path.join(BASE_DIR, "uploads")
OUTPUT_DIR = os.path.join(BASE_DIR, "outputs")

# Create both folders at import time; exist_ok makes re-imports harmless.
for _directory in (UPLOAD_DIR, OUTPUT_DIR):
    os.makedirs(_directory, exist_ok=True)
del _directory
|
|
|
|
| |
| |
| |
class TaskStatus(str, Enum):
    """
    Lifecycle states of a translation task.

    Mixing in ``str`` makes each member compare equal to its plain string
    value (e.g. ``TaskStatus.FAILED == "failed"``), which keeps API payloads
    simple.
    """

    # States of a task that has not finished yet.
    QUEUED = "queued"
    PREPARING = "preparing"
    TRANSLATING = "translating"

    # States of a task that has finished (successfully or not).
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
|
|
|
|
| |
| |
| |
@dataclass
class TranslationTask:
    """
    A single translation job tracked by the TaskManager.

    Intended lifecycle::

        QUEUED → PREPARING → TRANSLATING → COMPLETED
                                         ↘ FAILED
                                         ↘ CANCELLED
    """

    task_id: str
    original_filename: str  # user's file name, for display only
    input_path: str  # uploaded source file on disk
    output_path: str  # file the translated text is written to
    status: TaskStatus = TaskStatus.QUEUED
    created_at: float = field(default_factory=time.time)  # time.time() seconds
    completed_at: float = 0.0  # 0.0 until a finishing timestamp is recorded
    error_message: str = ""
    progress: TranslationProgress = field(default_factory=TranslationProgress)
    # Runtime handles: kept out of repr() so logged tasks stay readable.
    translator: Optional[MassiveFileTranslator] = field(default=None, repr=False)
    thread: Optional[threading.Thread] = field(default=None, repr=False)

    def to_dict(self) -> dict:
        """Return a plain-dict snapshot of this task for API responses."""
        # Keyword order mirrors the response field order expected by clients.
        return dict(
            task_id=self.task_id,
            original_filename=self.original_filename,
            status=self.status.value,
            created_at=self.created_at,
            completed_at=self.completed_at,
            error_message=self.error_message,
            progress=self.progress.to_dict(),
            output_filename=os.path.basename(self.output_path),
        )
|
|
|
|
| |
| |
| |
class TaskManager:
    """
    Central manager for all translation tasks.

    THREAD SAFETY:
    - One non-reentrant ``threading.Lock`` (``self._lock``) guards the task
      registry, ``_active_task_id`` and task status transitions. Code that
      already holds it must use the ``*_locked`` helpers; calling a public
      method that re-acquires the lock from the same thread would deadlock.
    - Each task runs in its own daemon thread.
    - Progress objects are internally thread-safe (have their own locks).

    SINGLE TASK ENFORCEMENT:
    - Only one translation can be ACTIVE at a time.
    - If a new upload comes while one is running, create_task() raises
      RuntimeError.
    - This prevents Google IP bans from concurrent heavy usage.

    FILE CLEANUP:
    - cleanup_old_tasks() drops finished tasks older than
      ``cleanup_after_seconds`` and deletes their files.
    - Prevents disk space exhaustion on HuggingFace (limited storage).
    """

    # Statuses that mark a task as finished (it no longer holds the slot).
    _FINISHED_STATUSES = (
        TaskStatus.COMPLETED,
        TaskStatus.FAILED,
        TaskStatus.CANCELLED,
    )
    # Statuses from which cancel_task() may still cancel a task.
    _CANCELLABLE_STATUSES = (
        TaskStatus.QUEUED,
        TaskStatus.PREPARING,
        TaskStatus.TRANSLATING,
    )

    def __init__(
        self,
        config: Optional[TranslatorConfig] = None,
        cleanup_after_seconds: float = 3600,
    ):
        """
        Args:
            config: Translator settings shared by every task created here.
                ``TranslatorConfig()`` defaults are used when omitted or falsy.
            cleanup_after_seconds: How long a finished task (and its files) is
                kept before cleanup_old_tasks() removes it. Defaults to 3600
                seconds (one hour).
        """
        self.config = config or TranslatorConfig()
        self._tasks: Dict[str, TranslationTask] = {}
        # Non-reentrant: never call a lock-taking method while holding it.
        self._lock = threading.Lock()
        # task_id of the task occupying the single-task slot, if any.
        self._active_task_id: Optional[str] = None
        self.cleanup_after_seconds = cleanup_after_seconds

        logger.info("TaskManager initialized.")
        logger.info(f"Upload directory: {UPLOAD_DIR}")
        logger.info(f"Output directory: {OUTPUT_DIR}")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def is_busy(self) -> bool:
        """Return True if a translation currently holds the single-task slot."""
        with self._lock:
            return self._is_busy_locked()

    def create_task(self, original_filename: str, input_path: str) -> TranslationTask:
        """
        Create a new translation task and claim the single-task slot for it.

        Args:
            original_filename: The user's original file name (for display)
            input_path: Path to the uploaded file on disk

        Returns:
            TranslationTask object (status QUEUED; call start_task() to run it)

        Raises:
            RuntimeError if another translation is already running
        """
        with self._lock:
            # Must use the *_locked helper: is_busy() would try to re-acquire
            # self._lock (a plain Lock) and deadlock this thread.
            if self._is_busy_locked():
                raise RuntimeError(
                    "Another translation is currently in progress. "
                    "Please wait for it to complete before uploading a new file."
                )

            task_id = str(uuid.uuid4())[:12]

            # original_filename is user-supplied. Keep only its final path
            # component (Windows separators included) so a name such as
            # "../../x.txt" cannot place the output outside OUTPUT_DIR.
            safe_name = os.path.basename(original_filename.replace("\\", "/"))
            name_without_ext = os.path.splitext(safe_name)[0]
            output_filename = f"{name_without_ext}_hindi_{task_id}.txt"
            output_path = os.path.join(OUTPUT_DIR, output_filename)

            progress = TranslationProgress()
            progress.file_name = original_filename

            translator = MassiveFileTranslator(
                config=self.config,
                progress=progress,
            )

            task = TranslationTask(
                task_id=task_id,
                original_filename=original_filename,
                input_path=input_path,
                output_path=output_path,
                progress=progress,
                translator=translator,
            )

            self._tasks[task_id] = task
            # Claim the slot immediately so no second task can be created
            # before this one is started.
            self._active_task_id = task_id

        logger.info(
            f"Task created: {task_id} | File: {original_filename} | "
            f"Input: {input_path} | Output: {output_path}"
        )
        return task

    def start_task(self, task_id: str):
        """
        Start the translation task in a background thread.

        The thread runs the translator and updates progress in real-time.
        FastAPI can poll progress via get_task_progress().

        Raises:
            ValueError: if the task does not exist.
            RuntimeError: if the task is not QUEUED (e.g. already started or
                cancelled), or if the worker thread could not be started.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                raise ValueError(f"Task not found: {task_id}")
            if task.status != TaskStatus.QUEUED:
                raise RuntimeError(f"Task {task_id} is not in QUEUED state")

            # Leave QUEUED while still holding the lock, so a concurrent second
            # start_task() for the same task fails instead of spawning a
            # second worker thread.
            task.status = TaskStatus.PREPARING

            thread = threading.Thread(
                target=self._run_translation,
                args=(task,),
                name=f"translator-{task_id}",
                daemon=True,
            )
            try:
                thread.start()
            except RuntimeError as exc:
                # Could not spawn the worker: fail the task and release the
                # single-task slot instead of leaving the manager busy forever.
                task.status = TaskStatus.FAILED
                task.error_message = str(exc)
                task.completed_at = time.time()
                if self._active_task_id == task_id:
                    self._active_task_id = None
                raise
            task.thread = thread

        logger.info(f"Task {task_id} started in background thread.")

    def get_task(self, task_id: str) -> Optional[TranslationTask]:
        """Get a task by ID, or None if it is unknown (or was cleaned up)."""
        with self._lock:
            return self._tasks.get(task_id)

    def get_task_progress(self, task_id: str) -> Optional[dict]:
        """Get task progress as a dictionary (for API response), or None."""
        task = self.get_task(task_id)
        if task is None:
            return None
        return task.to_dict()

    def get_latest_task(self) -> Optional[TranslationTask]:
        """Get the most recently created task (for simple single-task UI)."""
        with self._lock:
            if not self._tasks:
                return None
            return max(self._tasks.values(), key=lambda t: t.created_at)

    def get_latest_task_progress(self) -> Optional[dict]:
        """Get progress of the most recent task, or None if there are none."""
        task = self.get_latest_task()
        if task is None:
            return None
        return task.to_dict()

    def cancel_task(self, task_id: str) -> bool:
        """
        Cancel a queued or running translation task.

        Sets a cancel flag that the translator checks periodically.
        The translator will stop after completing its current paragraph.
        Already-translated content is preserved on disk.

        Returns:
            True if the task was cancelled, False if it is unknown or has
            already finished.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None or task.status not in self._CANCELLABLE_STATUSES:
                return False
            # Status check and transition happen atomically, so a worker that
            # has just marked the task COMPLETED is never overwritten here.
            task.status = TaskStatus.CANCELLED
            task.completed_at = time.time()
            # NOTE(review): the slot is released before the worker thread has
            # actually exited (it finishes its current paragraph first), so a
            # new task can briefly overlap with it.
            if self._active_task_id == task_id:
                self._active_task_id = None

        # Signal the translator outside our lock to avoid holding it while
        # running code owned by another component.
        if task.translator is not None:
            task.translator.cancel()

        logger.info(f"Task {task_id} cancelled.")
        return True

    def cleanup_old_tasks(self):
        """
        Remove completed/failed/cancelled tasks older than cleanup_after_seconds.
        Also deletes their files from disk to free space.

        Age is measured from completed_at, or from created_at if no finishing
        time was recorded.

        Called periodically (e.g., before each new upload).
        """
        now = time.time()

        with self._lock:
            expired = []
            for task_id, task in self._tasks.items():
                if task.status not in self._FINISHED_STATUSES:
                    continue
                finished_at = task.completed_at if task.completed_at > 0 else task.created_at
                if now - finished_at > self.cleanup_after_seconds:
                    expired.append((task_id, task))
            # Unregister in the same critical section that selected them, so
            # concurrent cleanups cannot both try to delete the same entry.
            for task_id, _ in expired:
                del self._tasks[task_id]

        # File I/O happens outside the lock.
        for task_id, task in expired:
            self._delete_task_files(task)
            logger.info(f"Cleaned up old task: {task_id}")

    def get_output_path(self, task_id: str) -> Optional[str]:
        """Return the output path of a COMPLETED task whose file exists, else None."""
        task = self.get_task(task_id)
        if task is None:
            return None
        if task.status != TaskStatus.COMPLETED:
            return None
        if not os.path.exists(task.output_path):
            return None
        return task.output_path

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    def _is_busy_locked(self) -> bool:
        """is_busy() logic; the caller must already hold self._lock."""
        if self._active_task_id is None:
            return False
        task = self._tasks.get(self._active_task_id)
        if task is None or task.status in self._FINISHED_STATUSES:
            # Stale pointer (task removed or already finished): self-heal.
            self._active_task_id = None
            return False
        return True

    def _transition(self, task: TranslationTask, new_status: TaskStatus) -> bool:
        """
        Atomically move ``task`` to ``new_status`` unless it was cancelled.

        Returns False (leaving the status untouched) if the task is CANCELLED.
        """
        with self._lock:
            if task.status == TaskStatus.CANCELLED:
                return False
            task.status = new_status
            return True

    def _run_translation(self, task: TranslationTask):
        """
        The actual translation runner; executes in a background thread.

        ERROR HANDLING:
        - Any exception is caught and stored in the task
        - Task status is set to FAILED
        - The active_task_id is cleared so new tasks can be submitted
        - Already-written content is preserved on disk
        """
        try:
            if not self._transition(task, TaskStatus.PREPARING):
                logger.info(f"Task {task.task_id}: Cancelled before translation started.")
                return
            task.progress.set_status("preparing", "Starting translation...")

            logger.info(
                f"Task {task.task_id}: Starting translation of "
                f"'{task.original_filename}'"
            )

            if not self._transition(task, TaskStatus.TRANSLATING):
                logger.info(f"Task {task.task_id}: Cancelled before translation started.")
                return

            task.translator.translate_file(task.input_path, task.output_path)

            with self._lock:
                # Either cancel_task() already marked the task, or the
                # translator's own cancel flag was set (private attribute of
                # MassiveFileTranslator).
                cancelled = (
                    task.status == TaskStatus.CANCELLED
                    or task.translator._cancel_flag.is_set()
                )
                task.status = TaskStatus.CANCELLED if cancelled else TaskStatus.COMPLETED
                task.completed_at = time.time()

            if cancelled:
                logger.info(f"Task {task.task_id}: Cancelled during translation.")
            else:
                logger.info(
                    f"Task {task.task_id}: Translation COMPLETED. "
                    f"Output: {task.output_path}"
                )

        except Exception as e:
            with self._lock:
                task.status = TaskStatus.FAILED
                task.error_message = str(e)
                task.completed_at = time.time()
            task.progress.set_status("failed", f"Error: {str(e)}")
            logger.exception(f"Task {task.task_id}: FAILED with error: {e}")

        finally:
            # Always release the single-task slot held by this task.
            with self._lock:
                if self._active_task_id == task.task_id:
                    self._active_task_id = None

    def _remove_task_files(self, task_id: str):
        """Safely delete the files of a registered task (no-op if unknown)."""
        with self._lock:
            task = self._tasks.get(task_id)
        if task is not None:
            self._delete_task_files(task)

    @staticmethod
    def _delete_task_files(task: TranslationTask):
        """Best-effort delete of a task's input and output files."""
        for path in (task.input_path, task.output_path):
            try:
                if path and os.path.exists(path):
                    os.remove(path)
                    logger.debug(f"Deleted file: {path}")
            except OSError as e:
                # Deliberately best-effort: a leftover file must not abort cleanup.
                logger.warning(f"Could not delete {path}: {e}")
|
|
|
|
| |
| |
| |
| |
| |
|
|
_global_task_manager: Optional[TaskManager] = None
# Guards lazy creation of the singleton. Callers may run on several threads
# (e.g. FastAPI runs sync endpoints in a thread pool). Without it, two first
# calls could race and build two managers, each enforcing "single task" only
# over its own registry.
_global_task_manager_lock = threading.Lock()


def get_task_manager() -> TaskManager:
    """
    Return the process-wide TaskManager, creating it on first use.

    Uses double-checked locking, so concurrent first callers all receive the
    same instance while later calls skip the lock entirely.
    """
    global _global_task_manager
    if _global_task_manager is None:
        with _global_task_manager_lock:
            if _global_task_manager is None:
                _global_task_manager = TaskManager(config=TranslatorConfig())
    return _global_task_manager