Spaces:
Running
Running
| import logging | |
| from datetime import datetime, timezone | |
| from backend.services.task_store_service import task_store | |
| logger = logging.getLogger(__name__) | |
| def update_task_status(task_id: str, status: str, message: str = None, progress: int = None): | |
| """ | |
| Update the status of a task in the task store. | |
| If the task doesn't exist, create it first. | |
| Args: | |
| task_id: ID of the task to update | |
| status: New status (PENDING, PROCESSING, COMPLETED, FAILED) | |
| message: Optional status message | |
| progress: Optional progress percentage (0-100) | |
| """ | |
| try: | |
| # Check if task exists first | |
| task = task_store.get_task(task_id) | |
| if not task: | |
| # If task doesn't exist, create it first | |
| logger.info(f"Creating missing task {task_id} with status {status}") | |
| create_task(task_id, "auto_created", message or f"Auto-created task with status {status}") | |
| logger.info(f"UPDATING_TASK: id={task_id}, status={status}, progress={progress}, message={message}") | |
| # Update task with new status | |
| task_store.update_task( | |
| task_id=task_id, | |
| status=status, | |
| error=message if status == "FAILED" else None, | |
| progress=progress | |
| ) | |
| # Add message to task if provided | |
| if message and status != "FAILED": | |
| task = task_store.get_task(task_id) | |
| if task: | |
| task["message"] = message | |
| logger.debug(f"Updated task {task_id} status to {status}" + | |
| (f" with progress {progress}%" if progress is not None else "") + | |
| (f": {message}" if message else "")) | |
| except Exception as e: | |
| logger.error(f"Error updating task status: {str(e)}") | |
| # Don't propagate the exception - we want the process to continue even if task update fails | |
| def create_task(task_id: str, task_type: str, message: str = ""): | |
| """ | |
| Create a new task in the task store with a predefined ID. | |
| If a task with the given ID already exists, update its properties instead. | |
| Args: | |
| task_id: ID to use for the task | |
| task_type: Type of task | |
| message: Initial status message | |
| Returns: | |
| The task ID | |
| """ | |
| try: | |
| # Check if task already exists | |
| existing_task = task_store.get_task(task_id) | |
| if existing_task: | |
| # If it exists, just update its properties | |
| existing_task["task_type"] = task_type | |
| existing_task["message"] = message | |
| existing_task["update_timestamp"] = datetime.now(timezone.utc).isoformat() | |
| logger.info(f"Updated existing task {task_id} of type {task_type}") | |
| return task_id | |
| # The task_store.create_task generates its own ID, so we need to handle our predefined ID | |
| generated_id = task_store.create_task(task_type=task_type, params={"external_id": task_id}) | |
| # Store our task_id in the task store | |
| task = task_store.get_task(generated_id) | |
| if task: | |
| # Map our task_id to the generated one | |
| task["external_id"] = task_id | |
| task["message"] = message | |
| # Also add a duplicate entry with our ID | |
| task_store.tasks[task_id] = task_store.tasks[generated_id] | |
| logger.info(f"Created new task {task_id} of type {task_type}") | |
| return task_id | |
| except Exception as e: | |
| logger.error(f"Error creating task: {str(e)}") | |
| # Create a minimal task entry directly in the task store as a last resort | |
| try: | |
| task_store.tasks[task_id] = { | |
| "id": task_id, | |
| "task_type": task_type, | |
| "status": "PENDING", | |
| "message": message, | |
| "creation_timestamp": datetime.now(timezone.utc).isoformat(), | |
| "external_id": task_id | |
| } | |
| logger.info(f"Created fallback task entry for {task_id}") | |
| except Exception as inner_e: | |
| logger.error(f"Failed to create fallback task: {str(inner_e)}") | |
| return task_id | |
| def get_task_data(task_id: str): | |
| """ | |
| Get task data from task store with error handling. | |
| Args: | |
| task_id: ID of the task to retrieve | |
| Returns: | |
| Dictionary with task data or None if not found | |
| """ | |
| try: | |
| task = task_store.get_task(task_id) | |
| # If task not found directly, check if it's mapped in another task | |
| if not task: | |
| # Check all tasks to see if this ID is stored as an external_id | |
| for t in task_store.tasks.values(): | |
| if t.get("external_id") == task_id: | |
| task = t | |
| break | |
| if not task: | |
| return None | |
| return { | |
| "status": task.get("status", "unknown"), | |
| "progress": task.get("progress", 0), | |
| "message": task.get("message", ""), | |
| "error": task.get("error", None) | |
| } | |
| except Exception as e: | |
| logger.error(f"Error retrieving task data: {e}") | |
| return None |