File size: 15,759 Bytes
6cc0372
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
"""
task_manager.py
===============
Background Task Manager for Massive File Translation

RESPONSIBILITIES:
-----------------
1. Manages the lifecycle of translation tasks (create, run, track, cleanup)
2. Runs translation in a background thread (non-blocking to FastAPI)
3. Maintains a registry of all tasks with their progress
4. Handles file path management (uploads, outputs)
5. Supports single-task mode (one translation at a time on HuggingFace free tier)

WHY SINGLE TASK MODE:
---------------------
On HuggingFace Spaces (free tier):
- Shared IP = easier to get rate-limited by Google
- Limited CPU/RAM compared to dedicated server
- Running 2+ massive translations simultaneously would guarantee IP ban
- So only one translation may be active at a time; new uploads are rejected (not queued) until it finishes

ARCHITECTURE:
-------------
    FastAPI Request → TaskManager.create_task()
                          ↓
                    Background Thread starts
                          ↓
                    MassiveFileTranslator.translate_file()
                          ↓
                    Progress updated in real-time
                          ↓
                    Task marked "completed"
                          ↓
                    FastAPI serves download link
"""

import os
import uuid
import time
import shutil
import threading
import logging
from typing import Optional, Dict
from dataclasses import dataclass, field
from enum import Enum

from translator_engine import (
    MassiveFileTranslator,
    TranslatorConfig,
    TranslationProgress,
)

logger = logging.getLogger("task_manager")


# ============================================================================
# DIRECTORY SETUP
# ============================================================================
# Working directories live next to this file and are created on import, so
# the rest of the module can assume they exist.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
UPLOAD_DIR, OUTPUT_DIR = (
    os.path.join(BASE_DIR, subdir) for subdir in ("uploads", "outputs")
)

# Ensure directories exist (no error if they already do)
for _required_dir in (UPLOAD_DIR, OUTPUT_DIR):
    os.makedirs(_required_dir, exist_ok=True)
del _required_dir


# ============================================================================
# TASK STATUS ENUM
# ============================================================================
class TaskStatus(str, Enum):
    """
    Lifecycle state of a TranslationTask.

    The ``str`` mixin makes each member compare equal to its plain string
    value (e.g. ``TaskStatus.QUEUED == "queued"``). ``TranslationTask.to_dict()``
    exposes ``.value`` to API clients.
    """
    QUEUED = "queued"            # default for a new TranslationTask; start_task() requires it
    PREPARING = "preparing"      # set when the background runner picks the task up
    TRANSLATING = "translating"  # active phase; cancel_task() treats it as cancellable
    COMPLETED = "completed"      # terminal; get_output_path() only serves this status
    FAILED = "failed"            # terminal; error_message holds the exception text
    CANCELLED = "cancelled"      # terminal; set by cancel_task() or after a cancelled run


# ============================================================================
# SINGLE TASK REPRESENTATION
# ============================================================================
@dataclass
class TranslationTask:
    """
    One translation job together with the runtime objects that drive it.

    Intended lifecycle:
        QUEUED → PREPARING → TRANSLATING → COMPLETED
                                         → FAILED
                                         → CANCELLED
    """
    task_id: str
    original_filename: str                 # user-supplied name, for display only
    input_path: str                        # uploaded source file on disk
    output_path: str                       # where the translated text is written
    status: TaskStatus = TaskStatus.QUEUED
    created_at: float = field(default_factory=time.time)
    completed_at: float = 0.0              # stays 0.0 until a terminal status is reached
    error_message: str = ""
    progress: TranslationProgress = field(default_factory=TranslationProgress)
    # Runtime handles; excluded from repr() so the dataclass repr stays readable.
    translator: Optional[MassiveFileTranslator] = field(default=None, repr=False)
    thread: Optional[threading.Thread] = field(default=None, repr=False)

    def to_dict(self) -> dict:
        """Return a JSON-friendly snapshot of this task for API responses."""
        snapshot = dict(
            task_id=self.task_id,
            original_filename=self.original_filename,
            status=self.status.value,
            created_at=self.created_at,
            completed_at=self.completed_at,
            error_message=self.error_message,
            progress=self.progress.to_dict(),
            output_filename=os.path.basename(self.output_path),
        )
        return snapshot


# ============================================================================
# TASK MANAGER — Singleton that manages all translation tasks
# ============================================================================
class TaskManager:
    """
    Central manager for all translation tasks.
    
    THREAD SAFETY:
    - One non-reentrant lock (``self._lock``) guards the task registry, the
      active-task pointer and task status transitions. Code that already
      holds the lock must call the ``*_locked`` helpers, never a public
      method that acquires the lock again (that would deadlock).
    - Each task runs in its own background daemon thread
    - Progress objects are expected to be internally thread-safe: the worker
      thread updates them while API handlers read them
    
    SINGLE TASK ENFORCEMENT:
    - Only one translation can be ACTIVE (queued/preparing/translating) at a time
    - If a new upload comes while one is active, create_task() raises RuntimeError
    - This prevents Google IP bans from concurrent heavy usage
    
    FILE CLEANUP:
    - cleanup_old_tasks() drops finished tasks older than
      ``cleanup_after_seconds`` and deletes their input/output files
    - Prevents disk space exhaustion on HuggingFace (limited storage)
    """

    def __init__(self, config: Optional[TranslatorConfig] = None):
        self.config = config or TranslatorConfig()
        self._tasks: Dict[str, TranslationTask] = {}
        # Non-reentrant: never call a lock-taking public method while holding it.
        self._lock = threading.Lock()
        self._active_task_id: Optional[str] = None

        # Auto-cleanup age (seconds) — finished tasks are removed after 1 hour
        self.cleanup_after_seconds = 3600

        logger.info("TaskManager initialized.")
        logger.info(f"Upload directory: {UPLOAD_DIR}")
        logger.info(f"Output directory: {OUTPUT_DIR}")

    # -----------------------------------------------------------------------
    # PUBLIC API
    # -----------------------------------------------------------------------

    def is_busy(self) -> bool:
        """Return True if a task is currently queued or running."""
        with self._lock:
            return self._is_busy_locked()

    def create_task(self, original_filename: str, input_path: str) -> TranslationTask:
        """
        Create (but do not start) a new translation task.
        
        Args:
            original_filename: The user's original file name (for display).
                Any directory components are stripped before it is used to
                build the output path.
            input_path: Path to the uploaded file on disk
            
        Returns:
            TranslationTask object, registered as the active task
            
        Raises:
            RuntimeError if another translation is already active
        """
        with self._lock:
            # --- Single task enforcement ---
            # Must use the *_locked variant: is_busy() would try to re-acquire
            # the non-reentrant lock we already hold and deadlock.
            if self._is_busy_locked():
                raise RuntimeError(
                    "Another translation is currently in progress. "
                    "Please wait for it to complete before uploading a new file."
                )

            # Unique task ID: 12 hex chars (no hyphen, safe in file names/URLs)
            task_id = uuid.uuid4().hex[:12]

            # Output: "my_novel.txt" → "my_novel_hindi_a1b2c3d4e5f6.txt".
            # original_filename is client-supplied, so drop any directory part
            # (either separator) to keep the output inside OUTPUT_DIR.
            safe_name = os.path.basename(original_filename.replace("\\", "/"))
            name_without_ext = os.path.splitext(safe_name)[0] or "translation"
            output_filename = f"{name_without_ext}_hindi_{task_id}.txt"
            output_path = os.path.join(OUTPUT_DIR, output_filename)

            # Progress tracker shows the user's original name
            progress = TranslationProgress()
            progress.file_name = original_filename

            translator = MassiveFileTranslator(
                config=self.config,
                progress=progress,
            )

            task = TranslationTask(
                task_id=task_id,
                original_filename=original_filename,
                input_path=input_path,
                output_path=output_path,
                progress=progress,
                translator=translator,
            )

            # Register task and mark it as the single active one
            self._tasks[task_id] = task
            self._active_task_id = task_id

            logger.info(
                f"Task created: {task_id} | File: {original_filename} | "
                f"Input: {input_path} | Output: {output_path}"
            )

            return task

    def start_task(self, task_id: str):
        """
        Start the translation task in a background thread.
        
        The thread runs the translator and updates progress in real-time.
        FastAPI can poll progress via get_task_progress().

        Raises:
            ValueError if the task does not exist
            RuntimeError if the task is not QUEUED or was already started
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                raise ValueError(f"Task not found: {task_id}")
            if task.status != TaskStatus.QUEUED:
                raise RuntimeError(f"Task {task_id} is not in QUEUED state")
            if task.thread is not None:
                raise RuntimeError(f"Task {task_id} has already been started")

            thread = threading.Thread(
                target=self._run_translation,
                args=(task,),
                name=f"translator-{task_id}",
                daemon=True,  # Thread dies if main process exits
            )
            # Claim the task while still holding the lock so two concurrent
            # start_task() calls cannot both launch a thread for it.
            task.thread = thread

        thread.start()

        logger.info(f"Task {task_id} started in background thread.")

    def get_task(self, task_id: str) -> Optional[TranslationTask]:
        """Get a task by ID (None if unknown or already cleaned up)."""
        with self._lock:
            return self._tasks.get(task_id)

    def get_task_progress(self, task_id: str) -> Optional[dict]:
        """Get task info as a dictionary (for API response), or None."""
        task = self.get_task(task_id)
        if task is None:
            return None
        return task.to_dict()

    def get_latest_task(self) -> Optional[TranslationTask]:
        """Get the most recently created task (for simple single-task UI)."""
        with self._lock:
            if not self._tasks:
                return None
            return max(self._tasks.values(), key=lambda t: t.created_at)

    def get_latest_task_progress(self) -> Optional[dict]:
        """Get info for the most recently created task, or None."""
        task = self.get_latest_task()
        if task is None:
            return None
        return task.to_dict()

    def cancel_task(self, task_id: str) -> bool:
        """
        Cancel a queued or running translation task.
        
        Signals the translator via ``translator.cancel()``; the translator is
        expected to stop cooperatively after its current paragraph, leaving
        already-translated content on disk.

        Returns:
            True if the task was cancelled, False if it is unknown or has
            already finished.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                return False

            if task.status not in (
                TaskStatus.QUEUED,
                TaskStatus.PREPARING,
                TaskStatus.TRANSLATING,
            ):
                return False  # Can't cancel a completed/failed/cancelled task

            if task.translator is not None:
                task.translator.cancel()
            task.status = TaskStatus.CANCELLED
            task.completed_at = time.time()

            if self._active_task_id == task_id:
                self._active_task_id = None

        logger.info(f"Task {task_id} cancelled.")
        return True

    def cleanup_old_tasks(self):
        """
        Remove finished (completed/failed/cancelled) tasks older than
        ``cleanup_after_seconds`` and delete their files from disk.
        
        Tasks whose worker thread is still alive are skipped, since it may
        still be writing the output file. Intended to be called periodically
        (e.g., before each new upload).
        """
        now = time.time()
        removed = []

        with self._lock:
            # Iterate over a snapshot so entries can be popped in the loop.
            for task_id, task in list(self._tasks.items()):
                if task.status not in (
                    TaskStatus.COMPLETED,
                    TaskStatus.FAILED,
                    TaskStatus.CANCELLED,
                ):
                    continue
                if task.thread is not None and task.thread.is_alive():
                    continue
                finished_at = task.completed_at if task.completed_at > 0 else task.created_at
                if now - finished_at > self.cleanup_after_seconds:
                    removed.append(self._tasks.pop(task_id))

        # File I/O happens outside the lock so API handlers are not blocked.
        for task in removed:
            self._delete_task_files(task)
            logger.info(f"Cleaned up old task: {task.task_id}")

    def get_output_path(self, task_id: str) -> Optional[str]:
        """Return the output file path of a COMPLETED task whose file exists, else None."""
        task = self.get_task(task_id)
        if task is None:
            return None
        if task.status != TaskStatus.COMPLETED:
            return None
        if not os.path.exists(task.output_path):
            return None
        return task.output_path

    # -----------------------------------------------------------------------
    # PRIVATE METHODS
    # -----------------------------------------------------------------------

    def _is_busy_locked(self) -> bool:
        """
        Core of is_busy(); the caller MUST already hold ``self._lock``.

        Also clears a stale ``_active_task_id`` that points at a missing or
        finished task.
        """
        if self._active_task_id is None:
            return False
        task = self._tasks.get(self._active_task_id)
        if task is None or task.status in (
            TaskStatus.COMPLETED,
            TaskStatus.FAILED,
            TaskStatus.CANCELLED,
        ):
            self._active_task_id = None
            return False
        return True

    def _run_translation(self, task: TranslationTask):
        """
        The actual translation runner — executes in a background thread.

        STATUS FLOW:
        - PREPARING when the thread picks the task up, TRANSLATING right
          before the engine is invoked, then COMPLETED / CANCELLED / FAILED
        - Transitions happen under the lock, and a CANCELLED status set
          concurrently by cancel_task() is never replaced by PREPARING or
          TRANSLATING
        
        ERROR HANDLING:
        - Any exception is caught and stored in the task (status FAILED)
        - The active_task_id is always cleared so new tasks can be submitted
        - Already-written content is preserved on disk
        """
        try:
            with self._lock:
                if task.status == TaskStatus.CANCELLED:
                    # cancel_task() ran between start_task() and this thread
                    # being scheduled: don't start translating at all.
                    logger.info(f"Task {task.task_id}: Cancelled before start.")
                    return
                task.status = TaskStatus.PREPARING
            task.progress.set_status("preparing", "Starting translation...")

            logger.info(
                f"Task {task.task_id}: Starting translation of "
                f"'{task.original_filename}'"
            )

            # Enter the TRANSLATING phase from the documented lifecycle (the
            # task previously stayed PREPARING for the whole run).
            with self._lock:
                if task.status != TaskStatus.CANCELLED:
                    task.status = TaskStatus.TRANSLATING

            # Run the translation (this blocks until complete or cancelled)
            task.translator.translate_file(task.input_path, task.output_path)

            with self._lock:
                # _cancel_flag is the engine's cancel signal (presumably set by
                # translator.cancel()); a CANCELLED status is honoured as well.
                cancelled = (
                    task.status == TaskStatus.CANCELLED
                    or task.translator._cancel_flag.is_set()
                )
                task.status = TaskStatus.CANCELLED if cancelled else TaskStatus.COMPLETED
                task.completed_at = time.time()

            if cancelled:
                logger.info(f"Task {task.task_id}: Cancelled during translation.")
            else:
                logger.info(
                    f"Task {task.task_id}: Translation COMPLETED. "
                    f"Output: {task.output_path}"
                )

        except Exception as e:
            with self._lock:
                task.status = TaskStatus.FAILED
                task.error_message = str(e)
                task.completed_at = time.time()
            task.progress.set_status("failed", f"Error: {str(e)}")
            logger.exception(f"Task {task.task_id}: FAILED with error: {e}")

        finally:
            # Clear active task so new uploads are accepted
            with self._lock:
                if self._active_task_id == task.task_id:
                    self._active_task_id = None

    def _remove_task_files(self, task_id: str):
        """Safely delete the files of a registered task (no-op if unknown)."""
        with self._lock:
            task = self._tasks.get(task_id)
        if task is None:
            return
        self._delete_task_files(task)

    @staticmethod
    def _delete_task_files(task: TranslationTask):
        """Best-effort deletion of a task's input and output files."""
        for path in (task.input_path, task.output_path):
            try:
                if path and os.path.exists(path):
                    os.remove(path)
                    logger.debug(f"Deleted file: {path}")
            except OSError as e:
                # Best effort: a file we cannot delete must not abort cleanup.
                logger.warning(f"Could not delete {path}: {e}")


# ============================================================================
# GLOBAL SINGLETON — Used by FastAPI app
# ============================================================================
# A single TaskManager instance shared across the entire application, so the
# task registry and the single-task rule are process-wide. Creation is guarded
# by a lock because the first calls may arrive concurrently from worker
# threads (e.g. a server threadpool running sync endpoints).

_global_task_manager: Optional[TaskManager] = None
_global_task_manager_lock = threading.Lock()


def get_task_manager() -> TaskManager:
    """
    Return the process-wide TaskManager, creating it on first use.

    Uses double-checked locking. Once the instance exists, the fast path skips
    the lock. The slow path re-checks under the lock, so concurrent first
    callers cannot each build a separate manager, which would split the task
    registry and defeat single-task enforcement.
    """
    global _global_task_manager
    if _global_task_manager is None:
        with _global_task_manager_lock:
            if _global_task_manager is None:
                _global_task_manager = TaskManager(config=TranslatorConfig())
    return _global_task_manager