File size: 6,670 Bytes
1a252b6
 
 
 
 
fc2f017
1a252b6
 
fc2f017
 
 
1a252b6
fc2f017
1a252b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import aiosqlite
from datetime import datetime, timedelta
from app.core.config import settings
from custom_logger import logger_config as logger

async def insert_task(task_id: str, input_text: str, system_prompt: str, status: str, hide_from_ui: int, model: str = 'qwen'):
    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        await db.execute('''INSERT INTO text_tasks 
                     (id, input_text, system_prompt, model, status, created_at, hide_from_ui)
                     VALUES (?, ?, ?, ?, ?, ?, ?)''',
                  (task_id, input_text, system_prompt, model, status, datetime.now().isoformat(), hide_from_ui))
        await db.commit()
    logger.debug(f"Inserted task (ID: {task_id}, model: {model}) into database.")

async def update_status(task_id: str, status: str, result: str = None, error: str = None):
    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        if status == 'completed':
            await db.execute('''UPDATE text_tasks 
                         SET status = ?, result = ?, processed_at = ?, progress = 100, progress_text = 'Completed'
                         WHERE id = ?''',
                      (status, result, datetime.now().isoformat(), task_id))
            logger.info(f"Task ID {task_id} marked as completed.")
        elif status == 'failed':
            await db.execute('''UPDATE text_tasks 
                         SET status = ?, result = ?, processed_at = ?, progress_text = 'Failed'
                         WHERE id = ?''',
                      (status, f"Error: {error}", datetime.now().isoformat(), task_id))
            logger.error(f"Task ID {task_id} marked as failed. Error: {error}")
        else:
            await db.execute('UPDATE text_tasks SET status = ? WHERE id = ?', (status, task_id))
            logger.debug(f"Task ID {task_id} status updated to {status}.")
        await db.commit()

async def update_progress(task_id: str, progress: int, progress_text: str = None):
    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        await db.execute('UPDATE text_tasks SET progress = ?, progress_text = ? WHERE id = ?',
                  (progress, progress_text, task_id))
        await db.commit()
    logger.debug(f"Task ID {task_id} progress updated to {progress}% ({progress_text}).")

async def get_next_not_started():
    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute('''SELECT * FROM text_tasks 
                     WHERE status = 'not_started' 
                     ORDER BY created_at ASC 
                     LIMIT 1''') as cursor:
            return await cursor.fetchone()

async def cleanup_old_entries():
    try:
        async with aiosqlite.connect(settings.DATABASE_FILE) as db:
            db.row_factory = aiosqlite.Row
            cutoff_date = (datetime.now() - timedelta(days=settings.CLEANUP_DAYS)).isoformat()
            
            async with db.execute('''DELETE FROM text_tasks WHERE created_at < ?''', (cutoff_date,)) as cursor:
                deleted_rows = cursor.rowcount
            await db.commit()
            
            if deleted_rows > 0:
                logger.info(f"Cleanup: Deleted {deleted_rows} old entries (older than {settings.CLEANUP_DAYS} days)")
    except Exception as e:
        logger.error(f"Cleanup error: {e}")

async def get_average_processing_time():
    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute('''SELECT created_at, processed_at FROM text_tasks 
                          WHERE status = 'completed' AND processed_at IS NOT NULL
                          ORDER BY processed_at DESC LIMIT 20''') as cursor:
            completed_rows = await cursor.fetchall()
        
        if not completed_rows:
            return 30.0
        
        total_seconds = 0
        count = 0
        for r in completed_rows:
            try:
                created = datetime.fromisoformat(r['created_at'])
                processed = datetime.fromisoformat(r['processed_at'])
                duration = (processed - created).total_seconds()
                if duration > 0:
                    total_seconds += duration
                    count += 1
            except:
                continue
        
        return total_seconds / count if count > 0 else 30.0

async def get_all_tasks():
    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        db.row_factory = aiosqlite.Row
        
        avg_time = await get_average_processing_time()
        
        async with db.execute('''SELECT id FROM text_tasks 
                     WHERE status = 'not_started' 
                     ORDER BY created_at ASC''') as cursor:
            queue_ids = [row['id'] for row in await cursor.fetchall()]
        
        async with db.execute('''SELECT COUNT(*) as count FROM text_tasks WHERE status = 'processing' ''') as cursor:
            row = await cursor.fetchone()
            processing_count = row['count']
        
        async with db.execute('SELECT * FROM text_tasks WHERE hide_from_ui = 0 OR hide_from_ui IS NULL ORDER BY created_at DESC') as cursor:
            rows = await cursor.fetchall()
            
        return rows, queue_ids, processing_count, avg_time

async def get_task_by_id(task_id: str):
    async with aiosqlite.connect(settings.DATABASE_FILE) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute('SELECT * FROM text_tasks WHERE id = ?', (task_id,)) as cursor:
            row = await cursor.fetchone()
            
        if not row:
            return None
            
        queue_position = None
        estimated_start_seconds = None
        
        if row['status'] == 'not_started':
            avg_time = await get_average_processing_time()
            
            async with db.execute('''SELECT COUNT(*) as position FROM text_tasks 
                         WHERE status = 'not_started' AND created_at < ?''',
                      (row['created_at'],)) as cursor:
                position_row = await cursor.fetchone()
                queue_position = position_row['position'] + 1
            
            async with db.execute('''SELECT COUNT(*) as count FROM text_tasks WHERE status = 'processing' ''') as cursor:
                count_row = await cursor.fetchone()
                processing_count = count_row['count']
            
            tasks_ahead = queue_position - 1 + processing_count
            estimated_start_seconds = round(tasks_ahead * avg_time)
            
        return row, queue_position, estimated_start_seconds