import logging
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from backend.services.task_store_service import task_store

logger = logging.getLogger(__name__)


def update_task_status(
    task_id: str,
    status: str,
    message: Optional[str] = None,
    progress: Optional[int] = None,
) -> None:
    """
    Update the status of a task in the task store.

    If the task doesn't exist, it is created first (type "auto_created").
    This function is best-effort: any exception is logged with its traceback
    and swallowed so the calling process keeps running.

    Args:
        task_id: ID of the task to update
        status: New status (PENDING, PROCESSING, COMPLETED, FAILED)
        message: Optional status message. For status "FAILED" it is passed to
            the store as the task's error; otherwise it is written to the
            task's "message" field.
        progress: Optional progress percentage (0-100); forwarded as-is,
            the range is not validated here.
    """
    try:
        # Make sure there is a record to update before touching the store.
        if not task_store.get_task(task_id):
            logger.info(f"Creating missing task {task_id} with status {status}")
            create_task(task_id, "auto_created", message or f"Auto-created task with status {status}")

        logger.info(f"UPDATING_TASK: id={task_id}, status={status}, progress={progress}, message={message}")

        # Only a FAILED status carries the message as an error.
        task_store.update_task(
            task_id=task_id,
            status=status,
            error=message if status == "FAILED" else None,
            progress=progress,
        )

        # Non-failure messages are written onto the record returned by the store.
        # NOTE(review): this relies on get_task returning the stored dict rather
        # than a copy - confirm against the task store implementation.
        if message and status != "FAILED":
            task = task_store.get_task(task_id)
            if task:
                task["message"] = message

        logger.debug(f"Updated task {task_id} status to {status}" +
                     (f" with progress {progress}%" if progress is not None else "") +
                     (f": {message}" if message else ""))
    except Exception as e:
        # Deliberately not re-raised: a failed status update must not abort the
        # work being tracked. exc_info keeps the traceback for diagnosis.
        logger.error(f"Error updating task status: {str(e)}", exc_info=True)


def _store_fallback_task(task_id: str, task_type: str, message: str) -> None:
    """Write a minimal PENDING record for task_id directly into the store (best effort)."""
    try:
        task_store.tasks[task_id] = {
            "id": task_id,
            "task_type": task_type,
            "status": "PENDING",
            "message": message,
            "creation_timestamp": datetime.now(timezone.utc).isoformat(),
            "external_id": task_id,
        }
        logger.info(f"Created fallback task entry for {task_id}")
    except Exception as inner_e:
        logger.error(f"Failed to create fallback task: {str(inner_e)}", exc_info=True)


def create_task(task_id: str, task_type: str, message: str = "") -> str:
    """
    Create a new task in the task store with a predefined ID.

    If a task with the given ID already exists, its type, message and update
    timestamp are overwritten instead. If the store cannot create or return
    the new record, a minimal PENDING entry is written under task_id as a
    last resort. This function never raises.

    Args:
        task_id: ID to use for the task
        task_type: Type of task
        message: Initial status message

    Returns:
        The task ID (always the task_id that was passed in)
    """
    try:
        existing_task = task_store.get_task(task_id)
        if existing_task:
            # Already known: refresh its properties rather than creating a duplicate.
            existing_task["task_type"] = task_type
            existing_task["message"] = message
            existing_task["update_timestamp"] = datetime.now(timezone.utc).isoformat()
            logger.info(f"Updated existing task {task_id} of type {task_type}")
            return task_id

        # task_store.create_task generates its own ID, so the caller's ID is
        # recorded as "external_id" and aliased to the generated record below.
        generated_id = task_store.create_task(task_type=task_type, params={"external_id": task_id})

        task = task_store.get_task(generated_id)
        if task:
            task["external_id"] = task_id
            task["message"] = message

            # Register the same record under the caller's ID as well.
            task_store.tasks[task_id] = task_store.tasks[generated_id]

            logger.info(f"Created new task {task_id} of type {task_type}")
            return task_id

        # The store handed back an ID it cannot resolve; do not report success
        # with nothing registered under task_id - fall through to the fallback.
        logger.warning(
            f"Task store returned no record for generated id {generated_id}; "
            f"using fallback entry for {task_id}"
        )
    except Exception as e:
        logger.error(f"Error creating task: {str(e)}", exc_info=True)

    # Last resort: create a minimal task entry directly in the task store.
    _store_fallback_task(task_id, task_type, message)
    return task_id


def get_task_data(task_id: str) -> Optional[Dict[str, Any]]:
    """
    Get task data from task store with error handling.

    The task is looked up by ID first; if that finds nothing, all stored tasks
    are scanned for one whose "external_id" equals task_id.

    Args:
        task_id: ID of the task to retrieve

    Returns:
        Dictionary with the keys "status", "progress", "message" and "error"
        (defaults: "unknown", 0, "", None), or None if the task is not found
        or the lookup raised.
    """
    try:
        task = task_store.get_task(task_id)

        if not task:
            # Not found directly: the ID may be stored as another record's external_id.
            task = next(
                (t for t in task_store.tasks.values() if t.get("external_id") == task_id),
                None,
            )

        if not task:
            return None

        return {
            "status": task.get("status", "unknown"),
            "progress": task.get("progress", 0),
            "message": task.get("message", ""),
            "error": task.get("error", None),
        }
    except Exception as e:
        logger.error(f"Error retrieving task data: {e}", exc_info=True)
        return None