import aiosqlite import os from datetime import datetime, timedelta from app.core.config import settings from custom_logger import logger_config as logger async def insert_task(task_id: str, filename: str, filepath: str, status: str, hide_from_ui: int): async with aiosqlite.connect(settings.DATABASE_FILE) as db: await db.execute('''INSERT INTO tasks (id, filename, filepath, status, created_at, hide_from_ui) VALUES (?, ?, ?, ?, ?, ?)''', (task_id, filename, filepath, status, datetime.now().isoformat(), hide_from_ui)) await db.commit() logger.debug(f"Inserted task {filename} (ID: {task_id}) into database.") async def update_status(task_id: str, status: str, result: str = None, error: str = None): async with aiosqlite.connect(settings.DATABASE_FILE) as db: if status == 'completed': await db.execute('''UPDATE tasks SET status = ?, result = ?, processed_at = ?, progress = 100, progress_text = 'Completed' WHERE id = ?''', (status, result, datetime.now().isoformat(), task_id)) logger.info(f"Task ID {task_id} marked as completed.") elif status == 'failed': await db.execute('''UPDATE tasks SET status = ?, result = ?, processed_at = ?, progress_text = 'Failed' WHERE id = ?''', (status, f"Error: {error}", datetime.now().isoformat(), task_id)) logger.error(f"Task ID {task_id} marked as failed. Error: {error}") else: await db.execute('UPDATE tasks SET status = ? WHERE id = ?', (status, task_id)) logger.debug(f"Task ID {task_id} status updated to {status}.") await db.commit() async def update_progress(task_id: str, progress: int, progress_text: str = None): async with aiosqlite.connect(settings.DATABASE_FILE) as db: await db.execute('UPDATE tasks SET progress = ?, progress_text = ? WHERE id = ?', (progress, progress_text, task_id)) await db.commit() logger.debug(f"Task ID {task_id} progress updated to {progress}% ({progress_text}).") async def get_next_not_started(): async with aiosqlite.connect(settings.DATABASE_FILE) as db: db.row_factory = aiosqlite.Row async with db.execute('''SELECT * FROM tasks WHERE status = 'not_started' ORDER BY created_at ASC LIMIT 1''') as cursor: return await cursor.fetchone() async def cleanup_old_entries(): try: async with aiosqlite.connect(settings.DATABASE_FILE) as db: db.row_factory = aiosqlite.Row cutoff_date = (datetime.now() - timedelta(days=10)).isoformat() async with db.execute('''SELECT id, filepath FROM tasks WHERE created_at < ?''', (cutoff_date,)) as cursor: old_entries = await cursor.fetchall() if old_entries: deleted_files = 0 deleted_rows = 0 for entry in old_entries: filepath = entry['filepath'] if filepath and os.path.exists(filepath): try: os.remove(filepath) deleted_files += 1 except Exception as e: logger.warning(f"Failed to delete old file {filepath}: {e}") # Use a separate execution for deletion to get rowcount correctly if needed, # or just run it. aiosqlite doesn't have cursor.rowcount directly on execute sometimes? # Actually it does. async with db.execute('''DELETE FROM tasks WHERE created_at < ?''', (cutoff_date,)) as cursor: deleted_rows = cursor.rowcount await db.commit() if deleted_rows > 0 or deleted_files > 0: logger.info(f"Cleanup: Deleted {deleted_rows} old entries and {deleted_files} files (older than 10 days)") except Exception as e: logger.error(f"Cleanup error: {e}") async def get_average_processing_time(): async with aiosqlite.connect(settings.DATABASE_FILE) as db: db.row_factory = aiosqlite.Row async with db.execute('''SELECT created_at, processed_at FROM tasks WHERE status = 'completed' AND processed_at IS NOT NULL ORDER BY processed_at DESC LIMIT 20''') as cursor: completed_rows = await cursor.fetchall() if not completed_rows: return 30.0 total_seconds = 0 count = 0 for r in completed_rows: try: created = datetime.fromisoformat(r['created_at']) processed = datetime.fromisoformat(r['processed_at']) duration = (processed - created).total_seconds() if duration > 0: total_seconds += duration count += 1 except: continue return total_seconds / count if count > 0 else 30.0 async def get_all_tasks(): async with aiosqlite.connect(settings.DATABASE_FILE) as db: db.row_factory = aiosqlite.Row avg_time = await get_average_processing_time() async with db.execute('''SELECT id FROM tasks WHERE status = 'not_started' ORDER BY created_at ASC''') as cursor: queue_ids = [row['id'] for row in await cursor.fetchall()] async with db.execute('''SELECT COUNT(*) as count FROM tasks WHERE status = 'processing' ''') as cursor: row = await cursor.fetchone() processing_count = row['count'] async with db.execute('SELECT * FROM tasks WHERE hide_from_ui = 0 OR hide_from_ui IS NULL ORDER BY created_at DESC') as cursor: rows = await cursor.fetchall() return rows, queue_ids, processing_count, avg_time async def get_task_by_id(task_id: str): async with aiosqlite.connect(settings.DATABASE_FILE) as db: db.row_factory = aiosqlite.Row async with db.execute('SELECT * FROM tasks WHERE id = ?', (task_id,)) as cursor: row = await cursor.fetchone() if not row: return None queue_position = None estimated_start_seconds = None if row['status'] == 'not_started': avg_time = await get_average_processing_time() async with db.execute('''SELECT COUNT(*) as position FROM tasks WHERE status = 'not_started' AND created_at < ?''', (row['created_at'],)) as cursor: position_row = await cursor.fetchone() queue_position = position_row['position'] + 1 async with db.execute('''SELECT COUNT(*) as count FROM tasks WHERE status = 'processing' ''') as cursor: count_row = await cursor.fetchone() processing_count = count_row['count'] tasks_ahead = queue_position - 1 + processing_count estimated_start_seconds = round(tasks_ahead * avg_time) return row, queue_position, estimated_start_seconds