# admaker/services/queue_manager.py
# karthikeya1212's picture
# Update services/queue_manager.py
# 22ef263 verified
# import asyncio
# import uuid
# from typing import Dict, Any, Optional
# from enum import Enum
# from pipeline.pipeline import run_pipeline
# # ---------------------------
# # Task status enum
# # ---------------------------
# class TaskStatus(str, Enum):
# PENDING = "pending"
# RUNNING = "running"
# WAITING_CONFIRMATION = "waiting_for_confirmation"
# CONFIRMED = "confirmed"
# COMPLETED = "completed"
# FAILED = "failed"
# # ---------------------------
# # Globals
# # ---------------------------
# tasks: Dict[str, Dict[str, Any]] = {}
# task_queue = asyncio.Queue()
# pending_confirmations: Dict[str, asyncio.Event] = {}
# # ---------------------------
# # Add task
# # ---------------------------
# async def add_task(idea: str) -> str:
# task_id = str(uuid.uuid4())
# confirmation_event = asyncio.Event()
# tasks[task_id] = {
# "id": task_id,
# "idea": idea,
# "status": TaskStatus.PENDING,
# "result": {},
# "confirmation_required": False
# }
# pending_confirmations[task_id] = confirmation_event
# await task_queue.put(task_id)
# # Start the pipeline immediately in background
# asyncio.create_task(run_pipeline(tasks[task_id], confirmation_event))
# print(f"🧩 Task added and pipeline started: {task_id}")
# return task_id
# # ---------------------------
# # Confirm task
# # ---------------------------
# async def confirm_task(task_id: str):
# task = tasks.get(task_id)
# if not task:
# return {"error": "Invalid task ID"}
# if task["status"] != TaskStatus.WAITING_CONFIRMATION:
# return {"error": "Task is not waiting for confirmation"}
# event = pending_confirmations.get(task_id)
# if event:
# event.set()
# return {"message": f"Task {task_id} confirmed"}
# # ---------------------------
# # Get task status
# # ---------------------------
# def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
# return tasks.get(task_id)
# # ---------------------------
# # Worker (optional)
# # ---------------------------
# def start_worker():
# print("⚙️ Worker loop not required, pipeline runs per task")
import asyncio
import uuid
from typing import Dict, Any, Optional
from enum import Enum
from pipeline.pipeline import run_pipeline
# ---------------------------
# Task status enum
# ---------------------------
class TaskStatus(str, Enum):
    """Lifecycle states a task record can be in.

    Members are also ``str`` instances, so a member compares equal to its
    plain string value (e.g. ``TaskStatus.PENDING == "pending"``).
    """

    # Initial state written by add_task().
    PENDING = "pending"
    RUNNING = "running"
    # State confirm_task() requires before it releases the confirmation event.
    WAITING_CONFIRMATION = "waiting_for_confirmation"
    CONFIRMED = "confirmed"
    COMPLETED = "completed"
    FAILED = "failed"
# ---------------------------
# Globals
# ---------------------------
# Task records keyed by task id; add_task() inserts one entry per task and
# get_task_status() returns entries from here.
tasks: Dict[str, Dict[str, Any]] = {}
# Receives every new task id from add_task(). No consumer is visible in this
# file (start_worker() only prints a notice) — NOTE(review): confirm whether
# another module drains it.
task_queue = asyncio.Queue()
# One asyncio.Event per task id; confirm_task() sets the event for a task
# that is waiting for confirmation.
pending_confirmations: Dict[str, asyncio.Event] = {}
# ---------------------------
# Add task
# ---------------------------
# Strong references to in-flight pipeline tasks. The event loop itself only
# keeps weak references to tasks, so a task whose handle is dropped may be
# garbage-collected before it finishes.
_background_tasks = set()


async def add_task(idea: str) -> str:
    """Register a new task for *idea* and start its pipeline in the background.

    A fresh task record (status ``pending``) is stored in ``tasks``, a
    confirmation event is stored in ``pending_confirmations``, the task id is
    put on ``task_queue``, and ``run_pipeline`` is scheduled with the task
    record and the confirmation event.

    Args:
        idea: The idea text stored under the record's ``"idea"`` key.

    Returns:
        The generated task id (a UUID4 string).
    """
    task_id = str(uuid.uuid4())
    confirmation_event = asyncio.Event()
    tasks[task_id] = {
        "id": task_id,
        "idea": idea,
        "status": TaskStatus.PENDING.value,
        "result": {},
        "confirmation_required": False,
    }
    pending_confirmations[task_id] = confirmation_event
    await task_queue.put(task_id)

    # Start the pipeline immediately in background. Keep a reference until the
    # task finishes so it cannot be garbage-collected mid-execution, then
    # drop the reference automatically.
    pipeline_task = asyncio.create_task(
        run_pipeline(tasks[task_id], confirmation_event)
    )
    _background_tasks.add(pipeline_task)
    pipeline_task.add_done_callback(_background_tasks.discard)

    print(f"🧩 Task added and pipeline started: {task_id}")
    return task_id
# ---------------------------
# Confirm task
# ---------------------------
async def confirm_task(task_id: str):
    """Release the confirmation event of a task that is awaiting confirmation.

    Args:
        task_id: Id of the task to confirm.

    Returns:
        ``{"error": ...}`` when the id is unknown or the task's status is not
        ``waiting_for_confirmation``; otherwise ``{"message": ...}``.
    """
    record = tasks.get(task_id)
    if not record:
        return {"error": "Invalid task ID"}

    # Statuses are stored as plain strings, so compare against the enum value.
    awaiting = TaskStatus.WAITING_CONFIRMATION.value
    if record["status"] == awaiting:
        gate = pending_confirmations.get(task_id)
        if gate:
            gate.set()
        return {"message": f"Task {task_id} confirmed"}

    return {"error": "Task is not waiting for confirmation"}
# ---------------------------
# Get task status
# ---------------------------
def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
    """Look up the stored record for *task_id*.

    Returns:
        The task dict held in ``tasks`` (the same object, not a copy), or
        ``None`` when the id is unknown.
    """
    try:
        return tasks[task_id]
    except KeyError:
        return None
# ---------------------------
# Worker (optional)
# ---------------------------
def start_worker():
    """Print a notice that no worker loop is started; returns ``None``."""
    notice = "⚙️ Worker loop not required, pipeline runs per task"
    print(notice)