File size: 7,517 Bytes
6203f3a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import aiosqlite
import os
from datetime import datetime, timedelta
from app.core.config import settings
from custom_logger import logger_config as logger

async def insert_task(task_id: str, filename: str, text: str, voice: str, speed: float, status: str, hide_from_ui: int):
    """Persist a brand-new task row, stamping created_at with the current local time."""
    row = (task_id, filename, text, voice, speed, status, datetime.now().isoformat(), hide_from_ui)
    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        await db.execute('''INSERT INTO tasks 
                     (id, filename, text, voice, speed, status, created_at, hide_from_ui)
                     VALUES (?, ?, ?, ?, ?, ?, ?, ?)''', row)
        await db.commit()
    logger.debug(f"Inserted task {task_id} into database.")

async def update_status(task_id: str, status: str, output_file: str = None, error: str = None):
    """Transition a task to a new status.

    'completed' additionally records output_file, processed_at and forces
    progress to 100; 'failed' records the stringified error and
    processed_at; every other status only rewrites the status column.
    """
    now = datetime.now().isoformat()
    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        if status == 'completed':
            await db.execute('''UPDATE tasks 
                         SET status = ?, output_file = ?, processed_at = ?, progress = 100, progress_text = 'Completed'
                         WHERE id = ?''',
                             (status, output_file, now, task_id))
            logger.info(f"Task ID {task_id} marked as completed.")
        elif status == 'failed':
            await db.execute('''UPDATE tasks 
                         SET status = ?, error = ?, processed_at = ?, progress_text = 'Failed'
                         WHERE id = ?''',
                             (status, str(error), now, task_id))
            logger.error(f"Task ID {task_id} marked as failed. Error: {error}")
        else:
            # Intermediate states (e.g. 'processing') touch nothing else.
            await db.execute('UPDATE tasks SET status = ? WHERE id = ?', (status, task_id))
            logger.debug(f"Task ID {task_id} status updated to {status}.")
        await db.commit()

async def update_progress(task_id: str, progress: int, progress_text: str = None):
    """Store a task's progress percentage together with an optional label."""
    values = (progress, progress_text, task_id)
    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        await db.execute('UPDATE tasks SET progress = ?, progress_text = ? WHERE id = ?', values)
        await db.commit()
    logger.debug(f"Task ID {task_id} progress updated to {progress}% ({progress_text}).")

async def get_next_not_started():
    """Return the oldest queued ('not_started') task row, or None when the queue is empty."""
    query = '''SELECT * FROM tasks 
                     WHERE status = 'not_started' 
                     ORDER BY created_at ASC 
                     LIMIT 1'''
    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(query) as cursor:
            return await cursor.fetchone()

async def cleanup_old_entries():
    """Purge tasks older than settings.RETENTION_DAYS and their audio files.

    Best-effort maintenance job: a failed file deletion is logged and
    skipped, and any unexpected error aborts the run with an error log
    instead of propagating to the caller.
    """
    try:
        cutoff = (datetime.now() - timedelta(days=settings.RETENTION_DAYS)).isoformat()
        async with aiosqlite.connect(settings.DATABASE_FILE) as db:
            db.row_factory = aiosqlite.Row

            async with db.execute('''SELECT id, output_file FROM tasks 
                         WHERE created_at < ?''', (cutoff,)) as cursor:
                stale = await cursor.fetchall()

            if stale:
                deleted_files = 0

                # Remove the generated audio files first; missing files are skipped.
                for row in stale:
                    name = row['output_file']
                    if not name:
                        continue
                    filepath = os.path.join(settings.UPLOAD_FOLDER, name)
                    if os.path.exists(filepath):
                        try:
                            os.remove(filepath)
                            deleted_files += 1
                        except Exception as e:
                            logger.warning(f"Failed to delete old file {filepath}: {e}")

                # Then drop the stale rows themselves.
                async with db.execute('''DELETE FROM tasks WHERE created_at < ?''', (cutoff,)) as cursor:
                    deleted_rows = cursor.rowcount
                await db.commit()

                if deleted_rows > 0 or deleted_files > 0:
                    logger.info(f"Cleanup: Deleted {deleted_rows} old entries and {deleted_files} audio files")
    except Exception as e:
        logger.error(f"Cleanup error: {e}")

async def get_average_processing_time():
    """Return the mean wall-clock processing time, in seconds, over the
    20 most recently completed tasks.

    Duration is measured from created_at to processed_at, so queue wait
    time is included. Rows with malformed/missing timestamps or
    non-positive durations are skipped. Falls back to 30.0 seconds when
    no usable history exists.
    """
    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute('''SELECT created_at, processed_at FROM tasks 
                          WHERE status = 'completed' AND processed_at IS NOT NULL
                          ORDER BY processed_at DESC LIMIT 20''') as cursor:
            completed_rows = await cursor.fetchall()

        total_seconds = 0.0
        count = 0
        for r in completed_rows:
            try:
                created = datetime.fromisoformat(r['created_at'])
                processed = datetime.fromisoformat(r['processed_at'])
            except (TypeError, ValueError):
                # Only timestamp-parse failures are expected here; the
                # previous bare `except:` also hid programming errors.
                continue
            duration = (processed - created).total_seconds()
            if duration > 0:
                total_seconds += duration
                count += 1

        # No usable history yet (empty table or all rows skipped): default estimate.
        return total_seconds / count if count > 0 else 30.0

async def get_all_tasks():
    """Gather everything the task-list view needs in one call.

    Returns a tuple (rows, queue_ids, processing_count, avg_time):
      rows             -- all non-hidden task rows, newest first
      queue_ids        -- ids of 'not_started' tasks in FIFO order
      processing_count -- how many tasks are currently 'processing'
      avg_time         -- rolling average processing seconds
    """
    # Independent of the connection below; it opens its own connection.
    avg_time = await get_average_processing_time()

    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        db.row_factory = aiosqlite.Row

        queue_sql = '''SELECT id FROM tasks 
                     WHERE status = 'not_started' 
                     ORDER BY created_at ASC'''
        async with db.execute(queue_sql) as cursor:
            queue_ids = [r['id'] for r in await cursor.fetchall()]

        async with db.execute('''SELECT COUNT(*) as count FROM tasks WHERE status = 'processing' ''') as cursor:
            processing_count = (await cursor.fetchone())['count']

        async with db.execute('SELECT * FROM tasks WHERE hide_from_ui = 0 OR hide_from_ui IS NULL ORDER BY created_at DESC') as cursor:
            rows = await cursor.fetchall()

    return rows, queue_ids, processing_count, avg_time

async def get_task_by_id(task_id: str):
    """Fetch one task plus queue metadata.

    Returns None when the id is unknown; otherwise a tuple
    (row, queue_position, estimated_start_seconds). The last two elements
    are only populated while the task is still 'not_started'.
    """
    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute('SELECT * FROM tasks WHERE id = ?', (task_id,)) as cursor:
            row = await cursor.fetchone()

        if row is None:
            return None

        queue_position = None
        estimated_start_seconds = None

        if row['status'] == 'not_started':
            avg_time = await get_average_processing_time()

            # 1-based queue position: tasks queued earlier than this one, plus one.
            async with db.execute('''SELECT COUNT(*) as position FROM tasks 
                         WHERE status = 'not_started' AND created_at < ?''',
                      (row['created_at'],)) as cursor:
                queue_position = (await cursor.fetchone())['position'] + 1

            async with db.execute('''SELECT COUNT(*) as count FROM tasks WHERE status = 'processing' ''') as cursor:
                processing_count = (await cursor.fetchone())['count']

            # Work ahead of us = queued-earlier tasks + tasks already running.
            tasks_ahead = (queue_position - 1) + processing_count
            estimated_start_seconds = round(tasks_ahead * avg_time)

    return row, queue_position, estimated_start_seconds