AgentGraph / backend /services /task_service.py
wu981526092's picture
🚀 Deploy AgentGraph: Complete agent monitoring and knowledge graph system
c2ea5ed
import logging
from datetime import datetime, timezone
from backend.services.task_store_service import task_store
logger = logging.getLogger(__name__)
def update_task_status(task_id: str, status: str, message: str = None, progress: int = None):
"""
Update the status of a task in the task store.
If the task doesn't exist, create it first.
Args:
task_id: ID of the task to update
status: New status (PENDING, PROCESSING, COMPLETED, FAILED)
message: Optional status message
progress: Optional progress percentage (0-100)
"""
try:
# Check if task exists first
task = task_store.get_task(task_id)
if not task:
# If task doesn't exist, create it first
logger.info(f"Creating missing task {task_id} with status {status}")
create_task(task_id, "auto_created", message or f"Auto-created task with status {status}")
logger.info(f"UPDATING_TASK: id={task_id}, status={status}, progress={progress}, message={message}")
# Update task with new status
task_store.update_task(
task_id=task_id,
status=status,
error=message if status == "FAILED" else None,
progress=progress
)
# Add message to task if provided
if message and status != "FAILED":
task = task_store.get_task(task_id)
if task:
task["message"] = message
logger.debug(f"Updated task {task_id} status to {status}" +
(f" with progress {progress}%" if progress is not None else "") +
(f": {message}" if message else ""))
except Exception as e:
logger.error(f"Error updating task status: {str(e)}")
# Don't propagate the exception - we want the process to continue even if task update fails
def create_task(task_id: str, task_type: str, message: str = ""):
"""
Create a new task in the task store with a predefined ID.
If a task with the given ID already exists, update its properties instead.
Args:
task_id: ID to use for the task
task_type: Type of task
message: Initial status message
Returns:
The task ID
"""
try:
# Check if task already exists
existing_task = task_store.get_task(task_id)
if existing_task:
# If it exists, just update its properties
existing_task["task_type"] = task_type
existing_task["message"] = message
existing_task["update_timestamp"] = datetime.now(timezone.utc).isoformat()
logger.info(f"Updated existing task {task_id} of type {task_type}")
return task_id
# The task_store.create_task generates its own ID, so we need to handle our predefined ID
generated_id = task_store.create_task(task_type=task_type, params={"external_id": task_id})
# Store our task_id in the task store
task = task_store.get_task(generated_id)
if task:
# Map our task_id to the generated one
task["external_id"] = task_id
task["message"] = message
# Also add a duplicate entry with our ID
task_store.tasks[task_id] = task_store.tasks[generated_id]
logger.info(f"Created new task {task_id} of type {task_type}")
return task_id
except Exception as e:
logger.error(f"Error creating task: {str(e)}")
# Create a minimal task entry directly in the task store as a last resort
try:
task_store.tasks[task_id] = {
"id": task_id,
"task_type": task_type,
"status": "PENDING",
"message": message,
"creation_timestamp": datetime.now(timezone.utc).isoformat(),
"external_id": task_id
}
logger.info(f"Created fallback task entry for {task_id}")
except Exception as inner_e:
logger.error(f"Failed to create fallback task: {str(inner_e)}")
return task_id
def get_task_data(task_id: str):
"""
Get task data from task store with error handling.
Args:
task_id: ID of the task to retrieve
Returns:
Dictionary with task data or None if not found
"""
try:
task = task_store.get_task(task_id)
# If task not found directly, check if it's mapped in another task
if not task:
# Check all tasks to see if this ID is stored as an external_id
for t in task_store.tasks.values():
if t.get("external_id") == task_id:
task = t
break
if not task:
return None
return {
"status": task.get("status", "unknown"),
"progress": task.get("progress", 0),
"message": task.get("message", ""),
"error": task.get("error", None)
}
except Exception as e:
logger.error(f"Error retrieving task data: {e}")
return None