Spaces:
Running
Running
File size: 5,238 Bytes
c2ea5ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
import logging
from datetime import datetime, timezone
from backend.services.task_store_service import task_store
logger = logging.getLogger(__name__)
def update_task_status(task_id: str, status: str, message: str = None, progress: int = None):
    """
    Update the status of a task in the task store.

    If the store has no entry for ``task_id``, one is created first through
    ``create_task`` with the task type ``"auto_created"``.

    This is a best-effort operation: any exception raised while updating is
    logged together with its traceback and then swallowed, so the caller's
    work continues even when task bookkeeping fails.

    Args:
        task_id: ID of the task to update
        status: New status (PENDING, PROCESSING, COMPLETED, FAILED)
        message: Optional status message. When ``status`` is "FAILED" it is
            handed to the store as the task's error instead of being written
            to the task's "message" field.
        progress: Optional progress percentage (0-100); not validated here
    """
    try:
        # Make sure there is an entry to update before touching the store
        if not task_store.get_task(task_id):
            logger.info(f"Creating missing task {task_id} with status {status}")
            create_task(task_id, "auto_created", message or f"Auto-created task with status {status}")

        logger.info(f"UPDATING_TASK: id={task_id}, status={status}, progress={progress}, message={message}")

        failed = status == "FAILED"
        task_store.update_task(
            task_id=task_id,
            status=status,
            error=message if failed else None,
            progress=progress,
        )

        # For non-failure updates the message is written straight onto the
        # task record returned by the store (failures carry it as "error").
        if message and not failed:
            task = task_store.get_task(task_id)
            if task:
                task["message"] = message

        logger.debug(f"Updated task {task_id} status to {status}" +
                     (f" with progress {progress}%" if progress is not None else "") +
                     (f": {message}" if message else ""))
    except Exception as e:
        # Deliberately not re-raised: a failed status update must not abort
        # the surrounding process. logger.exception keeps the traceback so
        # the swallowed failure can still be diagnosed.
        logger.exception(f"Error updating task status: {str(e)}")
def create_task(task_id: str, task_type: str, message: str = ""):
    """
    Create a new task in the task store with a predefined ID.

    If a task with the given ID already exists, its type, message and update
    timestamp are overwritten instead of creating a new one.

    The store's own ``create_task`` generates its ID, so the new record is
    tagged with ``external_id`` and additionally registered under ``task_id``
    in ``task_store.tasks``. If any step fails (including the store returning
    no record for the ID it just generated), a minimal PENDING entry is
    written directly under ``task_id`` as a last resort. Errors are logged
    with tracebacks and never propagated.

    Args:
        task_id: ID to use for the task
        task_type: Type of task
        message: Initial status message

    Returns:
        The task ID (always ``task_id``, even when creation failed)
    """
    try:
        existing_task = task_store.get_task(task_id)
        if existing_task:
            # Already known: refresh its properties rather than duplicating it
            existing_task["task_type"] = task_type
            existing_task["message"] = message
            existing_task["update_timestamp"] = datetime.now(timezone.utc).isoformat()
            logger.info(f"Updated existing task {task_id} of type {task_type}")
            return task_id

        # The task_store.create_task generates its own ID, so we need to handle our predefined ID
        generated_id = task_store.create_task(task_type=task_type, params={"external_id": task_id})

        task = task_store.get_task(generated_id)
        if not task:
            # Without the record we cannot register task_id; route to the
            # fallback below instead of reporting a success that left
            # nothing stored under task_id.
            raise RuntimeError(f"Task store returned no task for generated id {generated_id}")

        # Map our task_id to the generated one
        task["external_id"] = task_id
        task["message"] = message

        # Also add a duplicate entry with our ID
        task_store.tasks[task_id] = task_store.tasks[generated_id]

        logger.info(f"Created new task {task_id} of type {task_type}")
    except Exception as e:
        logger.exception(f"Error creating task: {str(e)}")

        # Create a minimal task entry directly in the task store as a last resort
        try:
            task_store.tasks[task_id] = {
                "id": task_id,
                "task_type": task_type,
                "status": "PENDING",
                "message": message,
                "creation_timestamp": datetime.now(timezone.utc).isoformat(),
                "external_id": task_id,
            }
            logger.info(f"Created fallback task entry for {task_id}")
        except Exception as inner_e:
            logger.exception(f"Failed to create fallback task: {str(inner_e)}")

    return task_id
def get_task_data(task_id: str):
    """
    Fetch a summary of a task's state from the task store without raising.

    The task is looked up by ``task_id`` first; when that yields nothing, the
    first stored task whose ``external_id`` equals ``task_id`` is used.

    Args:
        task_id: ID of the task to retrieve

    Returns:
        A dict with the keys "status", "progress", "message" and "error"
        (missing fields default to "unknown", 0, "" and None respectively),
        or None when no task matches or the lookup raises.
    """
    try:
        record = task_store.get_task(task_id)

        if not record:
            # No direct hit: scan the stored tasks for one that carries this
            # ID as its external_id and take the first match.
            record = next(
                (entry for entry in task_store.tasks.values()
                 if entry.get("external_id") == task_id),
                None,
            )

        if record:
            return {
                "status": record.get("status", "unknown"),
                "progress": record.get("progress", 0),
                "message": record.get("message", ""),
                "error": record.get("error", None),
            }

        return None
    except Exception as e:
        logger.error(f"Error retrieving task data: {e}")
        return None