Spaces:
Sleeping
Sleeping
File size: 1,679 Bytes
74711df | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | """Background task execution utilities."""
import logging
import traceback
from collections.abc import Callable
from typing import Any
from src.storage.db import get_session
from src.storage.repositories import TaskRepository
logger = logging.getLogger(__name__)
def execute_task(task_id: int, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
"""Executes a background task and updates its status in the database.
Args:
task_id (int): The ID of the task to update.
func (Callable): The function to execute.
args: Positional arguments for the function.
kwargs: Keyword arguments for the function.
"""
with get_session() as session:
repo = TaskRepository(session)
task = repo.update_task(task_id=task_id, status="RUNNING")
session.commit()
if not task:
logger.error(f"Task {task_id} not found to start execution.")
return
logger.info(f"Starting task {task_id} of type {task.task_type}")
try:
result = func(*args, **kwargs)
repo.update_task(
task_id=task_id,
status="COMPLETED",
output_payload={"result": result} if result is not None else {},
)
logger.info(f"Task {task_id} completed successfully")
except Exception as e:
error_msg = f"{e}\n{traceback.format_exc()}"
repo.update_task(
task_id=task_id,
status="FAILED",
error_message=error_msg,
)
logger.exception(f"Task {task_id} failed: {e}")
finally:
session.commit()
|