File size: 5,238 Bytes
c2ea5ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import logging
from datetime import datetime, timezone
from backend.services.task_store_service import task_store

logger = logging.getLogger(__name__)

def update_task_status(task_id: str, status: str, message: str = None, progress: int = None):
    """
    Update the status of a task in the task store.
    If the task doesn't exist, create it first.
    
    Args:
        task_id: ID of the task to update
        status: New status (PENDING, PROCESSING, COMPLETED, FAILED)
        message: Optional status message
        progress: Optional progress percentage (0-100)
    """
    try:
        # Check if task exists first
        task = task_store.get_task(task_id)
        
        if not task:
            # If task doesn't exist, create it first
            logger.info(f"Creating missing task {task_id} with status {status}")
            create_task(task_id, "auto_created", message or f"Auto-created task with status {status}")
        
        logger.info(f"UPDATING_TASK: id={task_id}, status={status}, progress={progress}, message={message}")
        # Update task with new status
        task_store.update_task(
            task_id=task_id,
            status=status,
            error=message if status == "FAILED" else None,
            progress=progress
        )
        
        # Add message to task if provided
        if message and status != "FAILED":
            task = task_store.get_task(task_id)
            if task:
                task["message"] = message
                
        logger.debug(f"Updated task {task_id} status to {status}" + 
                    (f" with progress {progress}%" if progress is not None else "") +
                    (f": {message}" if message else ""))
    except Exception as e:
        logger.error(f"Error updating task status: {str(e)}")
        # Don't propagate the exception - we want the process to continue even if task update fails

def create_task(task_id: str, task_type: str, message: str = ""):
    """
    Create a new task in the task store with a predefined ID.
    If a task with the given ID already exists, update its properties instead.
    
    Args:
        task_id: ID to use for the task
        task_type: Type of task
        message: Initial status message
        
    Returns:
        The task ID
    """
    try:
        # Check if task already exists
        existing_task = task_store.get_task(task_id)
        
        if existing_task:
            # If it exists, just update its properties
            existing_task["task_type"] = task_type
            existing_task["message"] = message
            existing_task["update_timestamp"] = datetime.now(timezone.utc).isoformat()
            logger.info(f"Updated existing task {task_id} of type {task_type}")
            return task_id
            
        # The task_store.create_task generates its own ID, so we need to handle our predefined ID
        generated_id = task_store.create_task(task_type=task_type, params={"external_id": task_id})
        
        # Store our task_id in the task store
        task = task_store.get_task(generated_id)
        if task:
            # Map our task_id to the generated one
            task["external_id"] = task_id
            task["message"] = message
            
            # Also add a duplicate entry with our ID
            task_store.tasks[task_id] = task_store.tasks[generated_id]
            
            logger.info(f"Created new task {task_id} of type {task_type}")
            
        return task_id
    except Exception as e:
        logger.error(f"Error creating task: {str(e)}")
        # Create a minimal task entry directly in the task store as a last resort
        try:
            task_store.tasks[task_id] = {
                "id": task_id,
                "task_type": task_type,
                "status": "PENDING",
                "message": message,
                "creation_timestamp": datetime.now(timezone.utc).isoformat(),
                "external_id": task_id
            }
            logger.info(f"Created fallback task entry for {task_id}")
        except Exception as inner_e:
            logger.error(f"Failed to create fallback task: {str(inner_e)}")
        
        return task_id

def get_task_data(task_id: str):
    """
    Get task data from task store with error handling.
    
    Args:
        task_id: ID of the task to retrieve
        
    Returns:
        Dictionary with task data or None if not found
    """
    try:
        task = task_store.get_task(task_id)
        
        # If task not found directly, check if it's mapped in another task
        if not task:
            # Check all tasks to see if this ID is stored as an external_id
            for t in task_store.tasks.values():
                if t.get("external_id") == task_id:
                    task = t
                    break
        
        if not task:
            return None
            
        return {
            "status": task.get("status", "unknown"),
            "progress": task.get("progress", 0),
            "message": task.get("message", ""),
            "error": task.get("error", None)
        }
    except Exception as e:
        logger.error(f"Error retrieving task data: {e}")
        return None