import os
import re
import json
import time
import sqlite3
import threading
import subprocess
from contextlib import closing
from datetime import datetime
from queue import Queue

import yt_dlp


class TaskManager:
    """Run VOD archive tasks (download, then upload) on one background thread.

    Tasks are queued with :meth:`start_task` and handled strictly one at a
    time, in FIFO order, by a daemon worker thread started in ``__init__``.
    Every status change is written to the ``tasks`` table of the SQLite
    database at ``DB_PATH`` and emitted as a ``task_update`` event through
    the ``socketio`` object supplied by the caller.
    """

    # Directory holding downloaded files; the task database lives in it too.
    WORK_DIR = '/tmp/vod-archiver'
    DB_PATH = '/tmp/vod-archiver/tasks.db'

    def __init__(self, socketio):
        """Store the emitter and start the worker thread.

        Args:
            socketio: Object exposing ``emit(event, data, room=...)``; used to
                push ``task_update`` events.
        """
        self.socketio = socketio
        self.task_queue = Queue()
        # task_id -> {'cancelled': bool}; only holds the task being processed.
        self.active_tasks = {}
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def start_task(self, task_id, task_data):
        """Add task to queue.

        Args:
            task_id: Identifier of the row in the ``tasks`` table.
            task_data: Mapping read by ``_process_task``: ``vod_url``,
                ``provider``, ``credentials`` and optionally ``format_id``.
        """
        self.task_queue.put((task_id, task_data))

    def cancel_task(self, task_id):
        """Cancel a running task.

        Only the task currently being processed can be cancelled; the flag is
        polled by the download/upload progress callbacks. A task that is still
        waiting in the queue is not affected.
        """
        if task_id in self.active_tasks:
            self.active_tasks[task_id]['cancelled'] = True

    def _worker(self):
        """Worker thread that processes tasks, forever, one at a time."""
        while True:
            # Dequeue outside the try block so task_done() is only ever called
            # for an item that was actually taken from the queue.
            item = self.task_queue.get()
            try:
                task_id, task_data = item
                self._process_task(task_id, task_data)
            except Exception as e:
                # Top-level boundary: keep the worker alive whatever happens.
                print(f"Worker error: {e}")
            finally:
                self.task_queue.task_done()

    def _update_task_status(self, task_id, status, progress_data=None, error=None, file_info=None):
        """Update task status in database and emit to clients.

        Args:
            task_id: Row id in the ``tasks`` table; also used as the emit room.
            status: New status string.
            progress_data: Optional dict, stored JSON-encoded when truthy.
            error: Optional error message, stored when truthy.
            file_info: Optional upload result, stored JSON-encoded when truthy.

        Raises:
            sqlite3.Error: If the database update fails.
        """
        updates = ['status = ?', 'updated_at = ?']
        # Bind the timestamp as text explicitly: this is the same string the
        # sqlite3 default datetime adapter produced, without relying on that
        # adapter (deprecated since Python 3.12).
        values = [status, datetime.now().isoformat(' ')]

        if progress_data:
            updates.append('progress_data = ?')
            values.append(json.dumps(progress_data))

        if error:
            updates.append('error = ?')
            values.append(error)

        if file_info:
            updates.append('file_info = ?')
            values.append(json.dumps(file_info))

        values.append(task_id)

        # Only fixed column fragments are interpolated; every value is bound.
        set_clause = ', '.join(updates)
        # closing() guarantees the connection is released even when the
        # UPDATE or the commit raises.
        with closing(sqlite3.connect(self.DB_PATH)) as conn:
            conn.execute(f'UPDATE tasks SET {set_clause} WHERE id = ?', values)
            conn.commit()

        # Emit update to connected clients
        self.socketio.emit('task_update', {
            'task_id': task_id,
            'status': status,
            'progress': progress_data,
            'error': error,
            'file_info': file_info
        }, room=task_id)

    def _process_task(self, task_id, task_data):
        """Process a single task: fetch VOD info, download, upload, report.

        Any failure is recorded as status ``failed`` with the exception text;
        success is recorded as ``completed`` with the upload result.
        """
        self.active_tasks[task_id] = {'cancelled': False}

        try:
            # Update status to processing
            self._update_task_status(task_id, 'processing', {'phase': 'initializing', 'percent': 0})

            # Get VOD info
            vod_info = self._get_vod_info(task_data['vod_url'])
            if not vod_info:
                raise Exception("Failed to get VOD information")

            # Download VOD. `or 'vod'` also covers a title that is present
            # but None/empty, which a plain .get() default would let through.
            downloaded_file = self._download_vod(
                task_id,
                task_data['vod_url'],
                task_data.get('format_id', 'best'),
                vod_info.get('title') or 'vod'
            )
            if not downloaded_file:
                raise Exception("Download failed or was cancelled")

            # Upload to provider
            provider = self._get_provider(task_data['provider'], task_data['credentials'])
            upload_result = self._upload_to_provider(task_id, downloaded_file, provider)
            if not upload_result:
                raise Exception("Upload failed")

            self._update_task_status(
                task_id,
                'completed',
                {'phase': 'completed', 'percent': 100},
                file_info=upload_result
            )
            # NOTE(review): the downloaded file is left in WORK_DIR after the
            # upload; confirm whether it should be removed here.
        except Exception as e:
            # An exception raised without a message has an empty str(); fall
            # back to its class name so the failure reason is still stored.
            self._update_task_status(
                task_id,
                'failed',
                {'phase': 'error', 'percent': 0},
                error=str(e) or type(e).__name__
            )
        finally:
            # Cleanup
            self.active_tasks.pop(task_id, None)

    def _get_vod_info(self, vod_url):
        """Get VOD information.

        Returns:
            Whatever ``YoutubeDL.extract_info(vod_url, download=False)``
            returns, or ``None`` if extraction raised.
        """
        ydl_opts = {
            'quiet': True,
            'no_warnings': True
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                return ydl.extract_info(vod_url, download=False)
        except Exception as e:
            print(f"Error getting VOD info: {e}")
            return None

    def _download_vod(self, task_id, vod_url, format_id, title):
        """Download VOD with progress tracking.

        Args:
            task_id: Task identifier; prefixes the output file name.
            vod_url: URL handed to yt-dlp.
            format_id: Value for yt-dlp's ``format`` option.
            title: Human-readable title used to build the file name; a falsy
                value falls back to ``'vod'``.

        Returns:
            Path of a file in ``WORK_DIR`` whose name starts with
            ``"<task_id>_"``, or ``None`` if the download raised (including
            cancellation) or no such file exists.
        """
        # Clean filename: drop everything but word chars, whitespace and '-',
        # then collapse runs of whitespace/dashes and cap the length.
        title = str(title) if title else 'vod'
        safe_title = re.sub(r'[^\w\s-]', '', title)
        safe_title = re.sub(r'[-\s]+', '-', safe_title)[:50]

        output_path = os.path.join(self.WORK_DIR, f'{task_id}_{safe_title}.%(ext)s')

        def progress_hook(d):
            # Raising is how a cancellation interrupts the download; the
            # exception is caught by the except clause around ydl.download().
            if self.active_tasks.get(task_id, {}).get('cancelled'):
                raise Exception("Task cancelled")

            if d['status'] == 'downloading':
                # Prefer the exact total, fall back to the estimate, else 0.
                total = d.get('total_bytes') or d.get('total_bytes_estimate')
                percent = (d['downloaded_bytes'] / total) * 100 if total else 0

                progress_data = {
                    'phase': 'downloading',
                    'percent': percent,
                    'speed': d.get('speed', 0),
                    'eta': d.get('eta', 0),
                    'size': d.get('total_bytes', 0)
                }
                self._update_task_status(task_id, 'processing', progress_data)

        ydl_opts = {
            'format': format_id,
            'outtmpl': output_path,
            'quiet': True,
            'no_warnings': True,
            'progress_hooks': [progress_hook]
        }

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([vod_url])

            # Find downloaded file: the extension is chosen by yt-dlp, so
            # locate the result by its "<task_id>_" prefix.
            prefix = f"{task_id}_"
            for f in os.listdir(self.WORK_DIR):
                if f.startswith(prefix):
                    return os.path.join(self.WORK_DIR, f)

            return None
        except Exception as e:
            print(f"Download error: {e}")
            return None

    def _get_provider(self, provider_id, credentials):
        """Get provider instance.

        Provider modules are imported lazily, so only the selected one has to
        be importable.

        Args:
            provider_id: One of ``'mega'``, ``'filen'``, ``'drime'``.
            credentials: Mapping with ``email`` and ``password`` keys.

        Raises:
            Exception: If ``provider_id`` is not a known provider.
            KeyError: If ``credentials`` lacks ``email`` or ``password``.
        """
        if provider_id == 'mega':
            from providers.mega_provider import MegaProvider
            return MegaProvider(credentials['email'], credentials['password'])
        elif provider_id == 'filen':
            from providers.filen_provider import FilenProvider
            return FilenProvider(credentials['email'], credentials['password'])
        elif provider_id == 'drime':
            from providers.drime_provider import DrimeProvider
            return DrimeProvider(credentials['email'], credentials['password'])
        else:
            raise Exception(f"Unknown provider: {provider_id}")

    def _upload_to_provider(self, task_id, file_path, provider):
        """Upload file to provider with progress tracking.

        Args:
            task_id: Task whose status is updated as the upload progresses.
            file_path: Local path of the file to upload.
            provider: Object exposing ``upload(file_path, progress_callback)``.

        Returns:
            The value returned by ``provider.upload``, or ``None`` if it
            raised (including a cancellation raised from the callback).
        """
        def progress_callback(percent):
            if self.active_tasks.get(task_id, {}).get('cancelled'):
                raise Exception("Task cancelled")

            self._update_task_status(task_id, 'processing', {
                'phase': 'uploading',
                'percent': percent
            })

        try:
            return provider.upload(file_path, progress_callback)
        except Exception as e:
            print(f"Upload error: {e}")
            return None