File size: 7,817 Bytes
b675a16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import os
import re
import json
import time
import sqlite3
import threading
import subprocess
from datetime import datetime
from queue import Queue
import yt_dlp

class TaskManager:
    def __init__(self, socketio):
        self.socketio = socketio
        self.task_queue = Queue()
        self.active_tasks = {}
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()
    
    def start_task(self, task_id, task_data):
        """Add task to queue"""
        self.task_queue.put((task_id, task_data))
    
    def cancel_task(self, task_id):
        """Cancel a running task"""
        if task_id in self.active_tasks:
            self.active_tasks[task_id]['cancelled'] = True
    
    def _worker(self):
        """Worker thread that processes tasks"""
        while True:
            try:
                task_id, task_data = self.task_queue.get()
                self._process_task(task_id, task_data)
            except Exception as e:
                print(f"Worker error: {e}")
            finally:
                self.task_queue.task_done()
    
    def _update_task_status(self, task_id, status, progress_data=None, error=None, file_info=None):
        """Update task status in database and emit to clients"""
        conn = sqlite3.connect('/tmp/vod-archiver/tasks.db')
        c = conn.cursor()
        
        updates = ['status = ?', 'updated_at = ?']
        values = [status, datetime.now()]
        
        if progress_data:
            updates.append('progress_data = ?')
            values.append(json.dumps(progress_data))
        
        if error:
            updates.append('error = ?')
            values.append(error)
        
        if file_info:
            updates.append('file_info = ?')
            values.append(json.dumps(file_info))
        
        values.append(task_id)
        
        c.execute(f'UPDATE tasks SET {", ".join(updates)} WHERE id = ?', values)
        conn.commit()
        conn.close()
        
        # Emit update to connected clients
        self.socketio.emit('task_update', {
            'task_id': task_id,
            'status': status,
            'progress': progress_data,
            'error': error,
            'file_info': file_info
        }, room=task_id)
    
    def _process_task(self, task_id, task_data):
        """Process a single task"""
        self.active_tasks[task_id] = {'cancelled': False}
        
        try:
            # Update status to processing
            self._update_task_status(task_id, 'processing', {'phase': 'initializing', 'percent': 0})
            
            # Get VOD info
            vod_info = self._get_vod_info(task_data['vod_url'])
            if not vod_info:
                raise Exception("Failed to get VOD information")
            
            # Download VOD
            downloaded_file = self._download_vod(
                task_id,
                task_data['vod_url'],
                task_data.get('format_id', 'best'),
                vod_info.get('title', 'vod')
            )
            
            if not downloaded_file:
                raise Exception("Download failed or was cancelled")
            
            # Upload to provider
            provider = self._get_provider(task_data['provider'], task_data['credentials'])
            upload_result = self._upload_to_provider(task_id, downloaded_file, provider)
            
            if upload_result:
                self._update_task_status(
                    task_id,
                    'completed',
                    {'phase': 'completed', 'percent': 100},
                    file_info=upload_result
                )
            else:
                raise Exception("Upload failed")
            
        except Exception as e:
            self._update_task_status(
                task_id,
                'failed',
                {'phase': 'error', 'percent': 0},
                error=str(e)
            )
        finally:
            # Cleanup
            if task_id in self.active_tasks:
                del self.active_tasks[task_id]
    
    def _get_vod_info(self, vod_url):
        """Get VOD information"""
        try:
            ydl_opts = {
                'quiet': True,
                'no_warnings': True
            }
            
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                return ydl.extract_info(vod_url, download=False)
        except Exception as e:
            print(f"Error getting VOD info: {e}")
            return None
    
    def _download_vod(self, task_id, vod_url, format_id, title):
        """Download VOD with progress tracking"""
        # Clean filename
        safe_title = re.sub(r'[^\w\s-]', '', title)
        safe_title = re.sub(r'[-\s]+', '-', safe_title)[:50]
        output_path = f'/tmp/vod-archiver/{task_id}_{safe_title}.%(ext)s'
        
        def progress_hook(d):
            if self.active_tasks.get(task_id, {}).get('cancelled'):
                raise Exception("Task cancelled")
            
            if d['status'] == 'downloading':
                percent = 0
                if d.get('total_bytes'):
                    percent = (d['downloaded_bytes'] / d['total_bytes']) * 100
                elif d.get('total_bytes_estimate'):
                    percent = (d['downloaded_bytes'] / d['total_bytes_estimate']) * 100
                
                progress_data = {
                    'phase': 'downloading',
                    'percent': percent,
                    'speed': d.get('speed', 0),
                    'eta': d.get('eta', 0),
                    'size': d.get('total_bytes', 0)
                }
                
                self._update_task_status(task_id, 'processing', progress_data)
        
        ydl_opts = {
            'format': format_id,
            'outtmpl': output_path,
            'quiet': True,
            'no_warnings': True,
            'progress_hooks': [progress_hook]
        }
        
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([vod_url])
            
            # Find downloaded file
            for f in os.listdir('/tmp/vod-archiver'):
                if f.startswith(f"{task_id}_"):
                    return os.path.join('/tmp/vod-archiver', f)
            
            return None
            
        except Exception as e:
            print(f"Download error: {e}")
            return None
    
    def _get_provider(self, provider_id, credentials):
        """Get provider instance"""
        if provider_id == 'mega':
            from providers.mega_provider import MegaProvider
            return MegaProvider(credentials['email'], credentials['password'])
        elif provider_id == 'filen':
            from providers.filen_provider import FilenProvider
            return FilenProvider(credentials['email'], credentials['password'])
        elif provider_id == 'drime':
            from providers.drime_provider import DrimeProvider
            return DrimeProvider(credentials['email'], credentials['password'])
        else:
            raise Exception(f"Unknown provider: {provider_id}")
    
    def _upload_to_provider(self, task_id, file_path, provider):
        """Upload file to provider with progress tracking"""
        def progress_callback(percent):
            if self.active_tasks.get(task_id, {}).get('cancelled'):
                raise Exception("Task cancelled")
            
            self._update_task_status(task_id, 'processing', {
                'phase': 'uploading',
                'percent': percent
            })
        
        try:
            result = provider.upload(file_path, progress_callback)
            return result
        except Exception as e:
            print(f"Upload error: {e}")
            return None