# vod/task_manager.py
# (Web-listing residue kept for provenance: "hannabaker's picture";
#  commit "Create task_manager.py", b675a16, verified.)
import os
import re
import json
import time
import sqlite3
import threading
import subprocess
from datetime import datetime
from queue import Queue
import yt_dlp
class TaskManager:
    """Run VOD archive tasks (download, then upload) on one background thread.

    Tasks are queued with :meth:`start_task` and processed strictly one at a
    time by a daemon worker thread. Every status change is written to the
    ``tasks`` table of the SQLite database in the working directory and
    emitted to clients as a ``task_update`` event on the room named after
    the task id.
    """

    # Directory that holds the task database and the downloaded files.
    DEFAULT_WORK_DIR = '/tmp/vod-archiver'

    # Suffixes of in-progress/bookkeeping files that must never be picked up
    # as the finished download (NOTE(review): these are the names yt-dlp is
    # documented to use for partial downloads; confirm for the pinned version).
    _TEMP_SUFFIXES = ('.part', '.ytdl')

    # provider id -> (module path, class name); modules are imported lazily
    # so a missing optional provider only fails when it is actually used.
    _PROVIDERS = {
        'mega': ('providers.mega_provider', 'MegaProvider'),
        'filen': ('providers.filen_provider', 'FilenProvider'),
        'drime': ('providers.drime_provider', 'DrimeProvider'),
    }

    def __init__(self, socketio, work_dir=DEFAULT_WORK_DIR):
        """Create the manager and start its worker thread.

        Args:
            socketio: Object exposing ``emit(event, payload, room=...)``.
            work_dir: Directory containing ``tasks.db`` and used for
                downloads. Defaults to the original hard-coded location.
        """
        self.socketio = socketio
        self.work_dir = work_dir
        self.db_path = os.path.join(work_dir, 'tasks.db')
        self.task_queue = Queue()
        self.active_tasks = {}
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def start_task(self, task_id, task_data):
        """Add task to queue."""
        self.task_queue.put((task_id, task_data))

    def cancel_task(self, task_id):
        """Flag a running task as cancelled; unknown ids are ignored.

        Only tasks currently being processed can be cancelled; the flag is
        polled by the download/upload progress callbacks.
        """
        # Single lookup: the worker thread may remove the entry at any time,
        # so a check-then-index sequence could raise KeyError.
        task_state = self.active_tasks.get(task_id)
        if task_state is not None:
            task_state['cancelled'] = True

    def _worker(self):
        """Worker thread loop: take queued tasks and process them forever."""
        while True:
            # get() stays outside the try so task_done() is only ever called
            # for an item that was actually dequeued.
            item = self.task_queue.get()
            try:
                task_id, task_data = item
                self._process_task(task_id, task_data)
            except Exception as e:
                print(f"Worker error: {e}")
            finally:
                self.task_queue.task_done()

    def _update_task_status(self, task_id, status, progress_data=None, error=None, file_info=None):
        """Persist a task's status and broadcast it to connected clients.

        Args:
            task_id: Primary key of the row in ``tasks``; also the room name.
            status: New status string.
            progress_data: Optional JSON-serialisable progress payload.
            error: Optional error message.
            file_info: Optional JSON-serialisable upload result.

        Raises:
            sqlite3.Error: If the database update fails.
        """
        updates = ['status = ?', 'updated_at = ?']
        # Store the timestamp as text explicitly; this is the same format the
        # (deprecated since Python 3.12) default datetime adapter produced.
        values = [status, datetime.now().isoformat(sep=' ')]

        # "is not None" so that falsy-but-present values (e.g. an empty error
        # message) are still recorded.
        if progress_data is not None:
            updates.append('progress_data = ?')
            values.append(json.dumps(progress_data))
        if error is not None:
            updates.append('error = ?')
            values.append(error)
        if file_info is not None:
            updates.append('file_info = ?')
            values.append(json.dumps(file_info))
        values.append(task_id)

        conn = sqlite3.connect(self.db_path)
        try:
            # Only fixed column names from this method are interpolated;
            # all values go through bound parameters.
            conn.execute(f'UPDATE tasks SET {", ".join(updates)} WHERE id = ?', values)
            conn.commit()
        finally:
            conn.close()

        # Emit update to connected clients
        self.socketio.emit('task_update', {
            'task_id': task_id,
            'status': status,
            'progress': progress_data,
            'error': error,
            'file_info': file_info
        }, room=task_id)

    def _process_task(self, task_id, task_data):
        """Process a single task: fetch metadata, download, then upload.

        ``task_data`` must contain ``vod_url``, ``provider`` and
        ``credentials``; ``format_id`` is optional (default ``'best'``).
        Any failure is recorded as status ``failed`` with the error text.
        """
        self.active_tasks[task_id] = {'cancelled': False}
        try:
            # Update status to processing
            self._update_task_status(task_id, 'processing', {'phase': 'initializing', 'percent': 0})

            # Get VOD info
            vod_info = self._get_vod_info(task_data['vod_url'])
            if not vod_info:
                raise Exception("Failed to get VOD information")

            # Download VOD
            downloaded_file = self._download_vod(
                task_id,
                task_data['vod_url'],
                task_data.get('format_id', 'best'),
                vod_info.get('title', 'vod')
            )
            if not downloaded_file:
                raise Exception("Download failed or was cancelled")

            # Upload to provider
            provider = self._get_provider(task_data['provider'], task_data['credentials'])
            upload_result = self._upload_to_provider(task_id, downloaded_file, provider)
            if not upload_result:
                raise Exception("Upload failed")

            self._update_task_status(
                task_id,
                'completed',
                {'phase': 'completed', 'percent': 100},
                file_info=upload_result
            )
        except Exception as e:
            self._update_task_status(
                task_id,
                'failed',
                {'phase': 'error', 'percent': 0},
                error=str(e)
            )
        finally:
            # NOTE(review): the downloaded file is left in the working
            # directory after the task ends - confirm whether it should be
            # deleted here to avoid filling the disk.
            self.active_tasks.pop(task_id, None)

    def _get_vod_info(self, vod_url):
        """Return the metadata yt-dlp extracts for ``vod_url``, or None on error."""
        try:
            ydl_opts = {
                'quiet': True,
                'no_warnings': True
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                return ydl.extract_info(vod_url, download=False)
        except Exception as e:
            print(f"Error getting VOD info: {e}")
            return None

    @staticmethod
    def _safe_title(title):
        """Reduce ``title`` to a short filename-safe slug (max 50 chars).

        Falls back to ``'vod'`` when the title is missing or nothing usable
        remains after sanitising.
        """
        cleaned = re.sub(r'[^\w\s-]', '', title or 'vod')
        cleaned = re.sub(r'[-\s]+', '-', cleaned)[:50]
        return cleaned or 'vod'

    def _find_downloaded_file(self, task_id):
        """Return the path of the finished download for ``task_id``, or None.

        Partial/bookkeeping files are skipped and candidates are sorted so
        the result does not depend on directory listing order.
        """
        prefix = f"{task_id}_"
        for name in sorted(os.listdir(self.work_dir)):
            if name.startswith(prefix) and not name.endswith(self._TEMP_SUFFIXES):
                return os.path.join(self.work_dir, name)
        return None

    def _download_vod(self, task_id, vod_url, format_id, title):
        """Download VOD with progress tracking.

        Returns:
            Path of the downloaded file, or None if the download failed or
            the task was cancelled.
        """
        safe_title = self._safe_title(title)
        output_path = os.path.join(self.work_dir, f'{task_id}_{safe_title}.%(ext)s')

        def progress_hook(d):
            # Raising from the hook aborts the download; the handler below
            # then reports it as a failed download.
            if self.active_tasks.get(task_id, {}).get('cancelled'):
                raise Exception("Task cancelled")
            if d['status'] != 'downloading':
                return
            total = d.get('total_bytes') or d.get('total_bytes_estimate')
            percent = (d.get('downloaded_bytes', 0) / total) * 100 if total else 0
            self._update_task_status(task_id, 'processing', {
                'phase': 'downloading',
                'percent': percent,
                'speed': d.get('speed', 0),
                'eta': d.get('eta', 0),
                'size': d.get('total_bytes', 0)
            })

        ydl_opts = {
            'format': format_id,
            'outtmpl': output_path,
            'quiet': True,
            'no_warnings': True,
            'progress_hooks': [progress_hook]
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([vod_url])
            return self._find_downloaded_file(task_id)
        except Exception as e:
            print(f"Download error: {e}")
            return None

    def _get_provider(self, provider_id, credentials):
        """Instantiate the storage provider registered under ``provider_id``.

        Args:
            provider_id: One of the keys of ``_PROVIDERS``.
            credentials: Mapping with ``email`` and ``password``.

        Raises:
            Exception: If ``provider_id`` is not a known provider.
        """
        import importlib

        spec = self._PROVIDERS.get(provider_id)
        if spec is None:
            raise Exception(f"Unknown provider: {provider_id}")
        module_name, class_name = spec
        provider_cls = getattr(importlib.import_module(module_name), class_name)
        return provider_cls(credentials['email'], credentials['password'])

    def _upload_to_provider(self, task_id, file_path, provider):
        """Upload file to provider with progress tracking.

        Returns:
            Whatever ``provider.upload`` returns, or None if it raised
            (including when the task was cancelled mid-upload).
        """
        def progress_callback(percent):
            if self.active_tasks.get(task_id, {}).get('cancelled'):
                raise Exception("Task cancelled")
            self._update_task_status(task_id, 'processing', {
                'phase': 'uploading',
                'percent': percent
            })

        try:
            return provider.upload(file_path, progress_callback)
        except Exception as e:
            print(f"Upload error: {e}")
            return None