# Spaces: Paused  (hosting-page status text captured with this file; not program code)
import importlib
import json
import os
import re
import sqlite3
import subprocess
import threading
import time
from datetime import datetime
from queue import Queue

import yt_dlp
class TaskManager:
    """Process VOD archive tasks sequentially on a background worker thread.

    A task is a ``(task_id, task_data)`` pair. For each task the worker looks
    up the VOD with yt-dlp, downloads it into ``WORK_DIR``, uploads the file
    through the requested provider, and records every status change in the
    ``tasks`` table of the SQLite database at ``DB_PATH`` while emitting a
    ``task_update`` event through ``socketio``.
    """

    # Directory that receives downloaded files.
    WORK_DIR = '/tmp/vod-archiver'
    # SQLite database that holds the ``tasks`` table.
    DB_PATH = '/tmp/vod-archiver/tasks.db'
    # provider id -> (module path, class name); modules are imported on demand.
    _PROVIDERS = {
        'mega': ('providers.mega_provider', 'MegaProvider'),
        'filen': ('providers.filen_provider', 'FilenProvider'),
        'drime': ('providers.drime_provider', 'DrimeProvider'),
    }
    # Suffixes of yt-dlp's in-progress/bookkeeping files; never a finished download.
    _TEMP_SUFFIXES = ('.part', '.ytdl')

    def __init__(self, socketio):
        """Store the Socket.IO handle and start the daemon worker thread.

        Args:
            socketio: object exposing ``emit(event, payload, room=...)``.
        """
        self.socketio = socketio
        self.task_queue = Queue()
        # task_id -> {'cancelled': bool} for the task currently being processed.
        self.active_tasks = {}
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def start_task(self, task_id, task_data):
        """Add task to queue."""
        self.task_queue.put((task_id, task_data))

    def cancel_task(self, task_id):
        """Flag a running task as cancelled.

        The flag is polled by the download and upload progress callbacks.
        NOTE(review): a task that is still waiting in the queue is not in
        ``active_tasks`` yet, so cancelling it here has no effect.
        """
        # Single lookup: the worker thread may remove the entry at any moment,
        # so "check membership, then index" could raise KeyError.
        task_state = self.active_tasks.get(task_id)
        if task_state is not None:
            task_state['cancelled'] = True

    def _worker(self):
        """Worker loop: take tasks off the queue and process them forever."""
        while True:
            # get() stays outside the try so task_done() is only ever called
            # for an item that was actually dequeued.
            item = self.task_queue.get()
            try:
                task_id, task_data = item
                self._process_task(task_id, task_data)
            except Exception as e:
                print(f"Worker error: {e}")
            finally:
                self.task_queue.task_done()

    def _update_task_status(self, task_id, status, progress_data=None, error=None, file_info=None):
        """Persist a task's status and broadcast it to connected clients.

        Args:
            task_id: value matched against ``tasks.id``; also the emit room.
            status: new value for the ``status`` column.
            progress_data: optional dict, stored as JSON when truthy.
            error: optional error text, stored when truthy.
            file_info: optional upload result, stored as JSON when truthy.

        Raises:
            sqlite3.Error: if the update fails; nothing is emitted in that case.
        """
        updates = ['status = ?', 'updated_at = ?']
        # Bind text explicitly: passing a datetime relies on sqlite3's default
        # adapter (deprecated since Python 3.12), which stored this same format.
        values = [status, datetime.now().isoformat(' ')]
        if progress_data:
            updates.append('progress_data = ?')
            values.append(json.dumps(progress_data))
        if error:
            updates.append('error = ?')
            values.append(error)
        if file_info:
            updates.append('file_info = ?')
            values.append(json.dumps(file_info))
        values.append(task_id)

        conn = sqlite3.connect(self.DB_PATH)
        try:
            # Only fixed column fragments are interpolated; values are bound.
            conn.execute(f'UPDATE tasks SET {", ".join(updates)} WHERE id = ?', values)
            conn.commit()
        finally:
            # Close even when the UPDATE fails so the handle is not leaked.
            conn.close()

        # Emit update to connected clients
        self.socketio.emit('task_update', {
            'task_id': task_id,
            'status': status,
            'progress': progress_data,
            'error': error,
            'file_info': file_info
        }, room=task_id)

    def _process_task(self, task_id, task_data):
        """Run one task end to end: VOD lookup, download, upload.

        Any failure is recorded as status ``failed`` with the error text.

        Args:
            task_id: identifier of the task.
            task_data: mapping with ``vod_url``, ``provider``, ``credentials``
                and an optional ``format_id`` (defaults to ``'best'``).
        """
        self.active_tasks[task_id] = {'cancelled': False}
        try:
            # Update status to processing
            self._update_task_status(task_id, 'processing', {'phase': 'initializing', 'percent': 0})

            # Get VOD info
            vod_info = self._get_vod_info(task_data['vod_url'])
            if not vod_info:
                raise Exception("Failed to get VOD information")

            # Download VOD
            downloaded_file = self._download_vod(
                task_id,
                task_data['vod_url'],
                task_data.get('format_id', 'best'),
                # 'title' may be present but None; fall back to a usable name.
                vod_info.get('title') or 'vod'
            )
            if not downloaded_file:
                raise Exception("Download failed or was cancelled")

            # Upload to provider
            provider = self._get_provider(task_data['provider'], task_data['credentials'])
            upload_result = self._upload_to_provider(task_id, downloaded_file, provider)
            if not upload_result:
                raise Exception("Upload failed")

            self._update_task_status(
                task_id,
                'completed',
                {'phase': 'completed', 'percent': 100},
                file_info=upload_result
            )
        except Exception as e:
            self._update_task_status(
                task_id,
                'failed',
                {'phase': 'error', 'percent': 0},
                error=str(e)
            )
        finally:
            # Cleanup of the in-memory entry only.
            # NOTE(review): the downloaded file is left in WORK_DIR - confirm
            # whether it should be removed once the task finishes.
            self.active_tasks.pop(task_id, None)

    def _get_vod_info(self, vod_url):
        """Return yt-dlp's metadata for ``vod_url`` without downloading, or None on error."""
        ydl_opts = {
            'quiet': True,
            'no_warnings': True
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                return ydl.extract_info(vod_url, download=False)
        except Exception as e:
            print(f"Error getting VOD info: {e}")
            return None

    @staticmethod
    def _sanitize_title(title):
        """Turn a VOD title into a short filename fragment.

        Drops everything except word characters, whitespace and hyphens,
        collapses runs of whitespace/hyphens into one hyphen, and truncates to
        50 characters. Returns ``'vod'`` when nothing usable remains.
        """
        cleaned = re.sub(r'[^\w\s-]', '', title or '')
        cleaned = re.sub(r'[-\s]+', '-', cleaned)[:50]
        return cleaned or 'vod'

    def _find_downloaded_file(self, task_id):
        """Return the path of the finished download for ``task_id``, or None.

        Files are matched by the ``"<task_id>_"`` name prefix; yt-dlp's
        temporary ``.part``/``.ytdl`` files are skipped so a stale partial
        download is never uploaded.
        """
        prefix = f"{task_id}_"
        # Sorted so the choice is deterministic if several files match.
        for name in sorted(os.listdir(self.WORK_DIR)):
            if name.startswith(prefix) and not name.endswith(self._TEMP_SUFFIXES):
                return os.path.join(self.WORK_DIR, name)
        return None

    def _download_vod(self, task_id, vod_url, format_id, title):
        """Download a VOD with progress tracking.

        Args:
            task_id: task identifier; used as the output filename prefix.
            vod_url: URL handed to yt-dlp.
            format_id: yt-dlp format selector.
            title: VOD title, sanitised into the output filename.

        Returns:
            Path of the downloaded file, or None if the download failed, was
            cancelled, or no output file was found.
        """
        safe_title = self._sanitize_title(title)
        output_path = os.path.join(self.WORK_DIR, f'{task_id}_{safe_title}.%(ext)s')

        def progress_hook(d):
            # Raising inside the hook aborts the download; it is caught below.
            if self.active_tasks.get(task_id, {}).get('cancelled'):
                raise Exception("Task cancelled")
            if d['status'] != 'downloading':
                return
            # Prefer the exact size; fall back to the estimate when unknown.
            total = d.get('total_bytes') or d.get('total_bytes_estimate')
            percent = (d.get('downloaded_bytes', 0) / total) * 100 if total else 0
            self._update_task_status(task_id, 'processing', {
                'phase': 'downloading',
                'percent': percent,
                'speed': d.get('speed', 0),
                'eta': d.get('eta', 0),
                'size': d.get('total_bytes', 0)
            })

        ydl_opts = {
            'format': format_id,
            'outtmpl': output_path,
            'quiet': True,
            'no_warnings': True,
            'progress_hooks': [progress_hook]
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([vod_url])
            # The final extension is chosen by yt-dlp, so locate the file by prefix.
            return self._find_downloaded_file(task_id)
        except Exception as e:
            print(f"Download error: {e}")
            return None

    def _get_provider(self, provider_id, credentials):
        """Instantiate the storage provider registered under ``provider_id``.

        Args:
            provider_id: one of the keys of ``_PROVIDERS``.
            credentials: mapping with ``email`` and ``password``.

        Raises:
            Exception: if ``provider_id`` is not a known provider.
        """
        try:
            module_path, class_name = self._PROVIDERS[provider_id]
        except (KeyError, TypeError):
            # TypeError covers unhashable ids, which are equally "unknown".
            raise Exception(f"Unknown provider: {provider_id}") from None
        # Imported lazily so only the selected provider's module is loaded.
        provider_cls = getattr(importlib.import_module(module_path), class_name)
        return provider_cls(credentials['email'], credentials['password'])

    def _upload_to_provider(self, task_id, file_path, provider):
        """Upload file to provider with progress tracking.

        Returns:
            Whatever ``provider.upload`` returns, or None if it raised
            (including when the task was cancelled mid-upload).
        """
        def progress_callback(percent):
            if self.active_tasks.get(task_id, {}).get('cancelled'):
                raise Exception("Task cancelled")
            self._update_task_status(task_id, 'processing', {
                'phase': 'uploading',
                'percent': percent
            })

        try:
            return provider.upload(file_path, progress_callback)
        except Exception as e:
            print(f"Upload error: {e}")
            return None