File size: 4,343 Bytes
22ef263
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4804a2c
 
 
 
756c289
ad35e87
756c289
 
 
4804a2c
 
 
 
 
 
 
 
756c289
 
 
4804a2c
 
ad35e87
4804a2c
756c289
 
 
4804a2c
 
756c289
 
4804a2c
 
 
22ef263
756c289
4804a2c
 
756c289
4804a2c
 
756c289
 
 
 
4804a2c
756c289
 
 
4804a2c
 
 
756c289
22ef263
 
756c289
4804a2c
ad35e87
 
 
756c289
ad35e87
756c289
 
 
4804a2c
 
 
756c289
 
 
4804a2c
22ef263
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# import asyncio
# import uuid
# from typing import Dict, Any, Optional
# from enum import Enum
# from pipeline.pipeline import run_pipeline

# # ---------------------------
# # Task status enum
# # ---------------------------
# class TaskStatus(str, Enum):
#     PENDING = "pending"
#     RUNNING = "running"
#     WAITING_CONFIRMATION = "waiting_for_confirmation"
#     CONFIRMED = "confirmed"
#     COMPLETED = "completed"
#     FAILED = "failed"

# # ---------------------------
# # Globals
# # ---------------------------
# tasks: Dict[str, Dict[str, Any]] = {}
# task_queue = asyncio.Queue()
# pending_confirmations: Dict[str, asyncio.Event] = {}

# # ---------------------------
# # Add task
# # ---------------------------
# async def add_task(idea: str) -> str:
#     task_id = str(uuid.uuid4())
#     confirmation_event = asyncio.Event()

#     tasks[task_id] = {
#         "id": task_id,
#         "idea": idea,
#         "status": TaskStatus.PENDING,
#         "result": {},
#         "confirmation_required": False
#     }
#     pending_confirmations[task_id] = confirmation_event
#     await task_queue.put(task_id)

#     # Start the pipeline immediately in background
#     asyncio.create_task(run_pipeline(tasks[task_id], confirmation_event))
#     print(f"🧩 Task added and pipeline started: {task_id}")
#     return task_id

# # ---------------------------
# # Confirm task
# # ---------------------------
# async def confirm_task(task_id: str):
#     task = tasks.get(task_id)
#     if not task:
#         return {"error": "Invalid task ID"}
#     if task["status"] != TaskStatus.WAITING_CONFIRMATION:
#         return {"error": "Task is not waiting for confirmation"}

#     event = pending_confirmations.get(task_id)
#     if event:
#         event.set()
#     return {"message": f"Task {task_id} confirmed"}

# # ---------------------------
# # Get task status
# # ---------------------------
# def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
#     return tasks.get(task_id)

# # ---------------------------
# # Worker (optional)
# # ---------------------------
# def start_worker():
#     print("⚙️ Worker loop not required, pipeline runs per task")




import asyncio
import uuid
from typing import Dict, Any, Optional
from enum import Enum
from pipeline.pipeline import run_pipeline

# ---------------------------
# Task status enum
# ---------------------------
class TaskStatus(str, Enum):
    """Lifecycle states recorded in a task record's ``"status"`` field.

    The ``str`` mix-in makes every member an instance of ``str`` whose value
    is the plain string shown below. This module stores and compares the
    ``.value`` strings rather than the enum members themselves.
    """

    # Initial status written by add_task() when the record is created.
    PENDING = "pending"
    # NOTE(review): not assigned anywhere in this module -- presumably set by
    # run_pipeline(); confirm against the pipeline code.
    RUNNING = "running"
    # The only status in which confirm_task() will signal the confirmation event.
    WAITING_CONFIRMATION = "waiting_for_confirmation"
    # NOTE(review): the three statuses below are not assigned in this module
    # either -- presumably managed by run_pipeline(); confirm.
    CONFIRMED = "confirmed"
    COMPLETED = "completed"
    FAILED = "failed"

# ---------------------------
# Globals
# ---------------------------
# Task records keyed by task id. add_task() inserts each record and
# get_task_status() hands the stored dict back to the caller.
tasks: Dict[str, Dict[str, Any]] = {}
# Receives the id of every task created by add_task(). Nothing visible in this
# file reads from it (start_worker() only prints a notice).
# NOTE(review): confirm whether another module drains this queue; if not, it
# accumulates one entry per task for the life of the process.
task_queue = asyncio.Queue()
# One asyncio.Event per task id. add_task() creates the event and passes the
# same object to run_pipeline(); confirm_task() sets it.
pending_confirmations: Dict[str, asyncio.Event] = {}

# ---------------------------
# Add task
# ---------------------------
# Strong references to in-flight pipeline tasks. The event loop keeps only
# weak references to tasks, so a task whose handle is dropped can be
# garbage-collected before it finishes; holding it here prevents that.
_background_tasks = set()


async def add_task(idea: str) -> str:
    """Register a new task for *idea* and start its pipeline in the background.

    A fresh UUID4 string is used as the task id. The task record is stored in
    ``tasks``, a dedicated ``asyncio.Event`` is stored in
    ``pending_confirmations``, and the id is put on ``task_queue``. The
    pipeline coroutine is then scheduled on the running event loop; this
    function does not wait for it to finish.

    Args:
        idea: Text stored under the record's ``"idea"`` key and made
            available to the pipeline through the task record.

    Returns:
        The id of the newly created task.

    Raises:
        RuntimeError: If there is no running event loop when the pipeline is
            scheduled (raised by ``asyncio.create_task``).
    """
    task_id = str(uuid.uuid4())
    confirmation_event = asyncio.Event()

    task = {
        "id": task_id,
        "idea": idea,
        # Stored as the plain string value; confirm_task() compares against
        # the ``.value`` form as well.
        "status": TaskStatus.PENDING.value,
        "result": {},
        "confirmation_required": False,
    }
    tasks[task_id] = task
    pending_confirmations[task_id] = confirmation_event
    await task_queue.put(task_id)

    # Start the pipeline immediately in the background. The pipeline receives
    # the very same record dict stored in ``tasks`` and the same event object
    # that confirm_task() will set.
    pipeline_task = asyncio.create_task(run_pipeline(task, confirmation_event))
    # Keep the task alive until it completes, then release the reference so
    # the set does not grow with finished work.
    _background_tasks.add(pipeline_task)
    pipeline_task.add_done_callback(_background_tasks.discard)

    print(f"🧩 Task added and pipeline started: {task_id}")
    return task_id

# ---------------------------
# Confirm task
# ---------------------------
async def confirm_task(task_id: str):
    """Signal the confirmation event of a task that is awaiting confirmation.

    Args:
        task_id: Id of the task to confirm.

    Returns:
        ``{"message": ...}`` when the task is in the waiting-for-confirmation
        state, otherwise ``{"error": ...}`` explaining why nothing was done
        (unknown id, or task not currently waiting).
    """
    record = tasks.get(task_id)
    if record:
        # Status is stored as a plain string, so compare with the enum's value.
        if record["status"] == TaskStatus.WAITING_CONFIRMATION.value:
            signal = pending_confirmations.get(task_id)
            if signal:
                signal.set()
            return {"message": f"Task {task_id} confirmed"}
        return {"error": "Task is not waiting for confirmation"}
    return {"error": "Invalid task ID"}

# ---------------------------
# Get task status
# ---------------------------
def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
    """Look up the stored record for *task_id*.

    Returns:
        The record dict held in ``tasks`` (the stored object itself, not a
        copy), or ``None`` when no task with that id exists.
    """
    try:
        return tasks[task_id]
    except KeyError:
        return None

# ---------------------------
# Worker (optional)
# ---------------------------
def start_worker():
    """Print a notice that no worker loop is needed.

    The function starts nothing; it only writes a single line to standard
    output and returns ``None``.
    """
    notice = "⚙️ Worker loop not required, pipeline runs per task"
    print(notice)