# Source: Novelt / task_manager.py
# Uploaded by Ruhivig65 ("Upload 6 files"), commit 6cc0372 (verified)
"""
task_manager.py
===============
Background Task Manager for Massive File Translation

RESPONSIBILITIES:
-----------------
1. Manages the lifecycle of translation tasks (create, run, track, cleanup)
2. Runs translation in a background thread (non-blocking to FastAPI)
3. Maintains a registry of all tasks with their progress
4. Handles file path management (uploads, outputs)
5. Supports single-task mode (one translation at a time on HuggingFace free tier)

WHY SINGLE TASK MODE:
---------------------
On HuggingFace Spaces (free tier):
- Shared IP = easier to get rate-limited by Google
- Limited CPU/RAM compared to a dedicated server
- Running 2+ massive translations simultaneously would guarantee an IP ban
- So only one translation may be active at a time; new requests are rejected
  (not queued) until the active one finishes

ARCHITECTURE:
-------------
    FastAPI request → TaskManager.create_task() → TaskManager.start_task()
        ↓
    Background thread starts
        ↓
    MassiveFileTranslator.translate_file()
        ↓
    Progress updated in real time
        ↓
    Task marked "completed"
        ↓
    FastAPI serves download link
"""
import os
import uuid
import time
import shutil
import threading
import logging
from typing import Optional, Dict
from dataclasses import dataclass, field
from enum import Enum
from translator_engine import (
MassiveFileTranslator,
TranslatorConfig,
TranslationProgress,
)
logger = logging.getLogger("task_manager")

# ============================================================================
# DIRECTORY SETUP
# ============================================================================
# Working directories sit next to this module and are created eagerly at
# import time, so the rest of the module can assume they exist.
# Translated output files are written into OUTPUT_DIR.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
UPLOAD_DIR, OUTPUT_DIR = (
    os.path.join(BASE_DIR, "uploads"),
    os.path.join(BASE_DIR, "outputs"),
)

for _working_dir in (UPLOAD_DIR, OUTPUT_DIR):
    os.makedirs(_working_dir, exist_ok=True)
del _working_dir  # keep the loop variable out of the module namespace
# ============================================================================
# TASK STATUS ENUM
# ============================================================================
class TaskStatus(str, Enum):
    """
    Lifecycle states of a TranslationTask.

    Subclasses ``str`` so each member compares equal to its plain string
    value; TranslationTask.to_dict() emits ``status.value``.
    """
    QUEUED = "queued"            # default state of a newly created task
    PREPARING = "preparing"      # set when the background worker begins
    TRANSLATING = "translating"  # in-flight; cancel_task() treats it as cancellable
    COMPLETED = "completed"      # translator returned without a cancel request
    FAILED = "failed"            # the worker caught an exception
    CANCELLED = "cancelled"      # a cancel was requested via cancel_task()
# ============================================================================
# SINGLE TASK REPRESENTATION
# ============================================================================
@dataclass
class TranslationTask:
    """
    State for a single translation job.

    Typical lifecycle:
        QUEUED → PREPARING → TRANSLATING → COMPLETED

    A task may instead end in FAILED (the worker raised) or CANCELLED
    (cancellation was requested while it was still in flight).
    """

    # --- identity / files ---
    task_id: str
    original_filename: str  # name as supplied by the user; used for display
    input_path: str         # source file on disk
    output_path: str        # destination for the translated text

    # --- state ---
    status: TaskStatus = TaskStatus.QUEUED
    created_at: float = field(default_factory=time.time)  # epoch seconds
    completed_at: float = 0.0  # epoch seconds; stays 0.0 until the task ends
    error_message: str = ""    # filled in when the task fails
    progress: TranslationProgress = field(default_factory=TranslationProgress)

    # --- runtime handles (kept out of repr to keep logs readable) ---
    translator: Optional[MassiveFileTranslator] = field(default=None, repr=False)
    thread: Optional[threading.Thread] = field(default=None, repr=False)

    def to_dict(self) -> dict:
        """Return a plain-dict snapshot of this task for API responses."""
        # Keyword arguments are evaluated left to right, so the fields are
        # read in the same order as the key layout below.
        return dict(
            task_id=self.task_id,
            original_filename=self.original_filename,
            status=self.status.value,
            created_at=self.created_at,
            completed_at=self.completed_at,
            error_message=self.error_message,
            progress=self.progress.to_dict(),
            output_filename=os.path.basename(self.output_path),
        )
# ============================================================================
# TASK MANAGER — manages all translation tasks (shared instance via get_task_manager())
# ============================================================================
class TaskManager:
    """
    Central manager for all translation tasks.

    THREAD SAFETY:
    - ``self._lock`` guards the task registry (``self._tasks``) and the
      active-task pointer (``self._active_task_id``). It is a plain,
      non-reentrant ``threading.Lock``: code that already holds it must call
      ``_is_busy_locked()``, never ``is_busy()``, or it will deadlock.
    - Each task runs in its own daemon background thread.
    - The worker thread updates ``task.status`` without taking the lock, so
      readers may briefly observe a slightly stale status.
    - NOTE(review): progress objects are assumed to carry their own locking
      (TranslationProgress lives in translator_engine) — confirm there.

    SINGLE TASK ENFORCEMENT:
    - Only one translation can be ACTIVE at a time.
    - If a new upload comes while one is running, create_task() raises
      RuntimeError instead of queueing it.
    - This prevents Google IP bans from concurrent heavy usage.

    FILE CLEANUP:
    - cleanup_old_tasks() drops finished tasks older than
      ``cleanup_after_seconds`` and deletes their input/output files.
    - Prevents disk space exhaustion on HuggingFace (limited storage).
    """

    # Terminal statuses: the task will make no further progress.
    _FINISHED_STATUSES = (
        TaskStatus.COMPLETED,
        TaskStatus.FAILED,
        TaskStatus.CANCELLED,
    )
    # Non-terminal statuses from which cancel_task() may cancel a task.
    _CANCELLABLE_STATUSES = (
        TaskStatus.QUEUED,
        TaskStatus.PREPARING,
        TaskStatus.TRANSLATING,
    )

    def __init__(self, config: Optional[TranslatorConfig] = None):
        """
        Args:
            config: Translator settings shared by every task created here;
                a default ``TranslatorConfig()`` is used when omitted.
        """
        self.config = config or TranslatorConfig()
        self._tasks: Dict[str, TranslationTask] = {}
        self._lock = threading.Lock()
        self._active_task_id: Optional[str] = None
        # Finished tasks older than this many seconds (1 hour) are removed
        # by cleanup_old_tasks().
        self.cleanup_after_seconds = 3600
        logger.info("TaskManager initialized.")
        logger.info(f"Upload directory: {UPLOAD_DIR}")
        logger.info(f"Output directory: {OUTPUT_DIR}")

    # -----------------------------------------------------------------------
    # PUBLIC API
    # -----------------------------------------------------------------------
    def is_busy(self) -> bool:
        """Return True if a translation is currently active."""
        with self._lock:
            return self._is_busy_locked()

    def create_task(self, original_filename: str, input_path: str) -> TranslationTask:
        """
        Create (but do not start) a new translation task.

        Args:
            original_filename: The user's original file name (for display).
                Only its final path component is used to build the output
                file name, so a client-supplied path cannot escape OUTPUT_DIR.
            input_path: Path to the uploaded file on disk.

        Returns:
            The registered TranslationTask; it also becomes the active task.

        Raises:
            RuntimeError: If another translation is already active.
        """
        with self._lock:
            # --- Single task enforcement ---
            # Use the *_locked variant: self._lock is non-reentrant, and
            # calling is_busy() here would block forever.
            if self._is_busy_locked():
                raise RuntimeError(
                    "Another translation is currently in progress. "
                    "Please wait for it to complete before uploading a new file."
                )

            # Short unique id: first 12 characters of a UUID4 string.
            task_id = str(uuid.uuid4())[:12]

            # "my_novel.txt" → "my_novel_hindi_<task_id>.txt"
            output_filename = f"{self._safe_stem(original_filename)}_hindi_{task_id}.txt"
            output_path = os.path.join(OUTPUT_DIR, output_filename)

            progress = TranslationProgress()
            progress.file_name = original_filename

            translator = MassiveFileTranslator(
                config=self.config,
                progress=progress,
            )

            task = TranslationTask(
                task_id=task_id,
                original_filename=original_filename,
                input_path=input_path,
                output_path=output_path,
                progress=progress,
                translator=translator,
            )

            self._tasks[task_id] = task
            self._active_task_id = task_id

        logger.info(
            f"Task created: {task_id} | File: {original_filename} | "
            f"Input: {input_path} | Output: {output_path}"
        )
        return task

    def start_task(self, task_id: str):
        """
        Start the translation task in a background thread.

        The thread runs the translator; callers can poll progress via
        get_task_progress().

        Raises:
            ValueError: If no task with ``task_id`` is registered.
            RuntimeError: If the task is not in the QUEUED state.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                raise ValueError(f"Task not found: {task_id}")
            if task.status != TaskStatus.QUEUED:
                raise RuntimeError(f"Task {task_id} is not in QUEUED state")
            thread = threading.Thread(
                target=self._run_translation,
                args=(task,),
                name=f"translator-{task_id}",
                daemon=True,  # do not keep the process alive on shutdown
            )
            task.thread = thread
            thread.start()
        logger.info(f"Task {task_id} started in background thread.")

    def get_task(self, task_id: str) -> Optional[TranslationTask]:
        """Return the task with ``task_id``, or None if unknown."""
        with self._lock:
            return self._tasks.get(task_id)

    def get_task_progress(self, task_id: str) -> Optional[dict]:
        """Return the task's to_dict() snapshot, or None if unknown."""
        task = self.get_task(task_id)
        if task is None:
            return None
        return task.to_dict()

    def get_latest_task(self) -> Optional[TranslationTask]:
        """Return the most recently created task (for a simple single-task UI)."""
        with self._lock:
            if not self._tasks:
                return None
            return max(self._tasks.values(), key=lambda t: t.created_at)

    def get_latest_task_progress(self) -> Optional[dict]:
        """Return the to_dict() snapshot of the most recent task, or None."""
        task = self.get_latest_task()
        if task is None:
            return None
        return task.to_dict()

    def cancel_task(self, task_id: str) -> bool:
        """
        Request cancellation of a task that has not finished yet.

        Calls the translator's ``cancel()`` and marks the task CANCELLED.
        NOTE(review): per the original design the translator stops after its
        current paragraph and keeps already-written output — confirm in
        translator_engine.

        Returns:
            True if the task was cancelled; False if it is unknown or has
            already finished.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                return False
            if task.status not in self._CANCELLABLE_STATUSES:
                return False  # completed/failed/cancelled tasks stay as they are

            # The field is Optional, so guard before signalling the worker.
            if task.translator is not None:
                task.translator.cancel()
            task.status = TaskStatus.CANCELLED
            task.completed_at = time.time()

            if self._active_task_id == task_id:
                self._active_task_id = None

        logger.info(f"Task {task_id} cancelled.")
        return True

    def cleanup_old_tasks(self):
        """
        Remove finished tasks older than ``cleanup_after_seconds``.

        Also deletes their input and output files to free disk space. Age is
        measured from ``completed_at``, or ``created_at`` if the task never
        recorded a completion time. Intended to be called periodically
        (e.g., before each new upload).
        """
        now = time.time()

        with self._lock:
            expired = []
            for task in self._tasks.values():
                if task.status not in self._FINISHED_STATUSES:
                    continue
                finished_at = task.completed_at or task.created_at
                if now - finished_at > self.cleanup_after_seconds:
                    expired.append(task)

        # File I/O happens outside the lock so other API calls are not blocked.
        for task in expired:
            self._delete_task_files(task)
            with self._lock:
                # pop() tolerates a concurrent removal of the same entry.
                self._tasks.pop(task.task_id, None)
            logger.info(f"Cleaned up old task: {task.task_id}")

    def get_output_path(self, task_id: str) -> Optional[str]:
        """Return the output file path of a COMPLETED task whose file exists, else None."""
        task = self.get_task(task_id)
        if task is None:
            return None
        if task.status != TaskStatus.COMPLETED:
            return None
        if not os.path.exists(task.output_path):
            return None
        return task.output_path

    # -----------------------------------------------------------------------
    # PRIVATE METHODS
    # -----------------------------------------------------------------------
    def _is_busy_locked(self) -> bool:
        """
        Core of is_busy(); the caller MUST already hold ``self._lock``.

        Also clears a stale active-task pointer (unknown or finished task).
        """
        if self._active_task_id is None:
            return False
        task = self._tasks.get(self._active_task_id)
        if task is None or task.status in self._FINISHED_STATUSES:
            self._active_task_id = None
            return False
        return True

    @staticmethod
    def _safe_stem(original_filename: Optional[str]) -> str:
        """Return the extension-less final component of a client-supplied file name."""
        # Normalise Windows separators, then keep only the last component so
        # names like "../../x.txt" or "/abs/x.txt" cannot make os.path.join
        # point outside OUTPUT_DIR.
        base = os.path.basename((original_filename or "").replace("\\", "/"))
        stem = os.path.splitext(base)[0]
        return stem or "translation"

    def _run_translation(self, task: TranslationTask):
        """
        Run one translation job; executes in the task's background thread.

        ERROR HANDLING:
        - Any exception is caught, logged, and stored in ``task.error_message``.
        - Task status is set to FAILED.
        - The active-task pointer is always cleared (finally) so new tasks
          can be submitted.
        - Output already written by the translator is not deleted here.
        """
        try:
            task.status = TaskStatus.PREPARING
            task.progress.set_status("preparing", "Starting translation...")
            logger.info(
                f"Task {task.task_id}: Starting translation of "
                f"'{task.original_filename}'"
            )

            # translate_file() blocks for the whole run, so the task-level
            # status must move to TRANSLATING before the call; otherwise it
            # would read "preparing" until the job finishes.
            task.status = TaskStatus.TRANSLATING
            task.translator.translate_file(task.input_path, task.output_path)

            # NOTE(review): reads the translator's private cancel flag —
            # consider exposing a public accessor in translator_engine.
            if task.translator._cancel_flag.is_set():
                task.status = TaskStatus.CANCELLED
                task.completed_at = time.time()
                logger.info(f"Task {task.task_id}: Cancelled during translation.")
            else:
                task.status = TaskStatus.COMPLETED
                task.completed_at = time.time()
                logger.info(
                    f"Task {task.task_id}: Translation COMPLETED. "
                    f"Output: {task.output_path}"
                )

        except Exception as e:
            # Top-level worker boundary: record the failure instead of letting
            # the thread die silently.
            task.status = TaskStatus.FAILED
            task.error_message = str(e)
            task.completed_at = time.time()
            task.progress.set_status("failed", f"Error: {str(e)}")
            logger.exception(f"Task {task.task_id}: FAILED with error: {e}")

        finally:
            # Clear the active task so new uploads are accepted.
            with self._lock:
                if self._active_task_id == task.task_id:
                    self._active_task_id = None

    @staticmethod
    def _delete_task_files(task: TranslationTask) -> None:
        """Best-effort deletion of a task's input and output files."""
        for path in (task.input_path, task.output_path):
            try:
                if path and os.path.exists(path):
                    os.remove(path)
                    logger.debug(f"Deleted file: {path}")
            except OSError as e:
                logger.warning(f"Could not delete {path}: {e}")

    def _remove_task_files(self, task_id: str):
        """Safely delete a registered task's files from disk (no-op if unknown)."""
        with self._lock:
            task = self._tasks.get(task_id)
        if task is not None:
            self._delete_task_files(task)
# ============================================================================
# GLOBAL SINGLETON — Used by FastAPI app
# ============================================================================
# Create a single TaskManager instance shared across the entire application
# This is safe because TaskManager is thread-safe internally
_global_task_manager: Optional[TaskManager] = None
# Serialises lazy creation in get_task_manager() so concurrent first calls
# from different threads cannot build two separate managers (which would
# defeat single-task enforcement).
_global_task_manager_lock = threading.Lock()


def get_task_manager() -> TaskManager:
    """
    Return the process-wide TaskManager, creating it on first use.

    Uses double-checked locking: once the instance exists, callers take the
    unlocked fast path; creation itself happens under a lock.
    """
    global _global_task_manager
    if _global_task_manager is None:
        with _global_task_manager_lock:
            if _global_task_manager is None:
                _global_task_manager = TaskManager(config=TranslatorConfig())
    return _global_task_manager