ehl0wr0ld Rafael Uzarowski commited on
Commit
32aad59
·
unverified ·
1 Parent(s): 075a337

Scheduler wait_for_task and find_task_by_name (#388)

Browse files

* feat: scheduler.wait_for_task tool and context param for scheduler.run_task tool

* fix: Tasks in same context are not waitable

* feat: scheduler.find_task_by_name

* fix: find_task_by_name case insensitive

---------

Co-authored-by: Rafael Uzarowski <uzarowski.rafael@proton.me>

prompts/default/agent.system.tool.scheduler.md CHANGED
@@ -11,10 +11,12 @@ dedicated_context flag then the task will run in the chat it was created in incl
11
  There are manual and automatically executed tasks.
12
  Automatic execution happens by a schedule defined when creating the task.
13
 
 
 
14
  ### Important instructions
15
  When a task is scheduled or planned, do not manually run it, if you have no more tasks, respond to user.
16
  Be careful not to create recursive prompt, do not send a message that would make the agent schedule more tasks, no need to mention the interval in message, just the objective.
17
-
18
 
19
  ### Types of scheduler tasks
20
  There are 3 types of scheduler tasks:
@@ -34,7 +36,7 @@ This type of task is run manually and does not follow any schedule. It can be ru
34
 
35
  #### scheduler:list_tasks
36
  List all tasks present in the system with their 'uuid', 'name', 'type', 'state', 'schedule' and 'next_run'.
37
- All runnable tasks can be listed and filtered here. The arbuments a filter fields.
38
 
39
  ##### Arguments:
40
  * state: list(str) (Optional) - The state filter, one of "idle", "running", "disabled", "error". To only show tasks in given state.
@@ -59,6 +61,26 @@ All runnable tasks can be listed and filtered here. The arbuments a filter field
59
  ~~~
60
 
61
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
62
  #### scheduler:show_task
63
  Show task details for scheduler task with the given uuid.
64
 
@@ -84,9 +106,11 @@ Execute a task manually which is not in "running" state
84
  This can be used to trigger tasks manually.
85
  Normally you should only "run" tasks manually if they are in the "idle" state.
86
  It is also advised to only run "adhoc" tasks manually but every task type can be triggered by this tool.
 
87
 
88
  ##### Arguments:
89
  * uuid: string - The uuid of the task to run. Can be retrieved for example from "scheduler:tasks_list"
 
90
 
91
  ##### Usage (execute task with uuid "xyz-123"):
92
  ~~~json
@@ -97,6 +121,7 @@ It is also advised to only run "adhoc" tasks manually but every task type can be
97
  "tool_name": "scheduler:run_task",
98
  "tool_args": {
99
  "uuid": "xyz-123",
 
100
  }
101
  }
102
  ~~~
@@ -218,3 +243,24 @@ The planned type of tasks is being run by a fixed plan, a list of datetimes that
218
  }
219
  }
220
  ~~~
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
  There are manual and automatically executed tasks.
12
  Automatic execution happens by a schedule defined when creating the task.
13
 
14
+ Tasks are run asynchronously. If you need to wait for a running task's completion or need the result of the last task run, use the scheduler:wait_for_task tool. It will wait for the task completion in case the task is currently running and will provide the result of the last execution.
15
+
16
  ### Important instructions
17
  When a task is scheduled or planned, do not manually run it, if you have no more tasks, respond to user.
18
  Be careful not to create recursive prompt, do not send a message that would make the agent schedule more tasks, no need to mention the interval in message, just the objective.
19
+ !!! When the user asks you to execute a task, first check if the task already exists and do not create a new task for execution. Execute the existing task instead. If the task in question does not exist ask the user what action to take. Never create tasks if asked to execute a task.
20
 
21
  ### Types of scheduler tasks
22
  There are 3 types of scheduler tasks:
 
36
 
37
  #### scheduler:list_tasks
38
  List all tasks present in the system with their 'uuid', 'name', 'type', 'state', 'schedule' and 'next_run'.
39
+ All runnable tasks can be listed and filtered here. The arguments are filter fields.
40
 
41
  ##### Arguments:
42
  * state: list(str) (Optional) - The state filter, one of "idle", "running", "disabled", "error". To only show tasks in given state.
 
61
  ~~~
62
 
63
 
64
+ #### scheduler:find_task_by_name
65
+ List all tasks whose name is matching partially or fully the provided name parameter.
66
+
67
+ ##### Arguments:
68
+ * name: str - The task name to look for
69
+
70
+ ##### Usage:
71
+ ~~~json
72
+ {
73
+ "thoughts": [
74
+ "I must look for tasks with name XYZ"
75
+ ],
76
+ "tool_name": "scheduler:find_task_by_name",
77
+ "tool_args": {
78
+ "name": "XYZ"
79
+ }
80
+ }
81
+ ~~~
82
+
83
+
84
  #### scheduler:show_task
85
  Show task details for scheduler task with the given uuid.
86
 
 
106
  This can be used to trigger tasks manually.
107
  Normally you should only "run" tasks manually if they are in the "idle" state.
108
  It is also advised to only run "adhoc" tasks manually but every task type can be triggered by this tool.
109
+ You can pass input data in text form as the "context" argument. The context will then be prepended to the task prompt when executed. This way you can pass for example result of one task as the input of another task or provide additional information specific to this one task run.
110
 
111
  ##### Arguments:
112
  * uuid: string - The uuid of the task to run. Can be retrieved for example from "scheduler:tasks_list"
113
+ * context: (Optional) string - The context that will be prepended to the actual task prompt as contextual information.
114
 
115
  ##### Usage (execute task with uuid "xyz-123"):
116
  ~~~json
 
121
  "tool_name": "scheduler:run_task",
122
  "tool_args": {
123
  "uuid": "xyz-123",
124
+ "context": "This text is useful to execute the task more precisely"
125
  }
126
  }
127
  ~~~
 
243
  }
244
  }
245
  ~~~
246
+
247
+
248
+ #### scheduler:wait_for_task
249
+ Wait for the completion of a scheduler task identified by the uuid argument and return the result of last execution of the task.
250
+ Attention: You can only wait for tasks running in a different chat context (dedicated). Tasks with dedicated_context=False can not be waited for.
251
+
252
+ ##### Arguments:
253
+ * uuid: string - The uuid of the task to wait for. Can be retrieved for example from "scheduler:tasks_list"
254
+
255
+ ##### Usage (wait for task with uuid "xyz-123"):
256
+ ~~~json
257
+ {
258
+ "thoughts": [
259
+ "I need the most current result of the task xyz-123",
260
+ ],
261
+ "tool_name": "scheduler:wait_for_task",
262
+ "tool_args": {
263
+ "uuid": "xyz-123",
264
+ }
265
+ }
266
+ ~~~
python/helpers/task_scheduler.py CHANGED
@@ -579,6 +579,10 @@ class SchedulerTaskList(BaseModel):
579
  with self._lock:
580
  return next((task for task in self.tasks if task.name == name), None)
581
 
 
 
 
 
582
  async def remove_task_by_uuid(self, task_uuid: str) -> "SchedulerTaskList":
583
  with self._lock:
584
  self.tasks = [task for task in self.tasks if task.uuid != task_uuid]
@@ -622,7 +626,7 @@ class TaskScheduler:
622
 
623
  async def add_task(self, task: Union[ScheduledTask, AdHocTask, PlannedTask]) -> "TaskScheduler":
624
  await self._tasks.add_task(task)
625
- ctx = await self._get_chat_context(task) # invoke context creation
626
  return self
627
 
628
  async def remove_task_by_uuid(self, task_uuid: str) -> "TaskScheduler":
@@ -639,11 +643,14 @@ class TaskScheduler:
639
  def get_task_by_name(self, name: str) -> Union[ScheduledTask, AdHocTask, PlannedTask] | None:
640
  return self._tasks.get_task_by_name(name)
641
 
 
 
 
642
  async def tick(self):
643
  for task in await self._tasks.get_due_tasks():
644
  await self._run_task(task)
645
 
646
- async def run_task_by_uuid(self, task_uuid: str):
647
  # First reload tasks to ensure we have the latest state
648
  await self._tasks.reload()
649
 
@@ -671,13 +678,13 @@ class TaskScheduler:
671
  raise ValueError(f"Task with UUID '{task_uuid}' not found after state reset")
672
 
673
  # Run the task
674
- await self._run_task(task)
675
 
676
- async def run_task_by_name(self, name: str):
677
  task = self._tasks.get_task_by_name(name)
678
  if task is None:
679
  raise ValueError(f"Task with name {name} not found")
680
- await self._run_task(task)
681
 
682
  async def save(self):
683
  await self._tasks.save()
@@ -737,9 +744,9 @@ class TaskScheduler:
737
  raise ValueError(f"Context ID mismatch for task {task.name}: context {context.id} != task {task.context_id}")
738
  save_tmp_chat(context)
739
 
740
- async def _run_task(self, task: Union[ScheduledTask, AdHocTask, PlannedTask]):
741
 
742
- async def _run_task_wrapper(task_uuid: str):
743
 
744
  # preflight checks with a snapshot of the task
745
  task_snapshot: Union[ScheduledTask, AdHocTask, PlannedTask] | None = self.get_task_by_uuid(task_uuid)
@@ -799,18 +806,24 @@ class TaskScheduler:
799
  for filename in attachment_filenames:
800
  self._printer.print(f"- {filename}")
801
 
 
 
 
 
 
 
802
  # Log the message with message_id and attachments
803
  context.log.log(
804
  type="user",
805
  heading="User message",
806
- content=current_task.prompt,
807
  kvps={"attachments": attachment_filenames},
808
  id=str(uuid.uuid4()),
809
  )
810
 
811
  agent.hist_add_user_message(
812
  UserMessage(
813
- message=current_task.prompt,
814
  system_message=[current_task.system_prompt],
815
  attachments=attachment_filenames))
816
 
@@ -854,7 +867,7 @@ class TaskScheduler:
854
  await self._tasks.save()
855
 
856
  deferred_task = DeferredTask(thread_name=self.__class__.__name__)
857
- deferred_task.start_task(_run_task_wrapper, task.uuid)
858
 
859
  # Ensure background execution doesn't exit immediately on async await, especially in script contexts
860
  # This helps prevent premature exits when running from non-event-loop contexts
 
579
  with self._lock:
580
  return next((task for task in self.tasks if task.name == name), None)
581
 
582
+ def find_task_by_name(self, name: str) -> list[Union[ScheduledTask, AdHocTask, PlannedTask]]:
583
+ with self._lock:
584
+ return [task for task in self.tasks if name.lower() in task.name.lower()]
585
+
586
  async def remove_task_by_uuid(self, task_uuid: str) -> "SchedulerTaskList":
587
  with self._lock:
588
  self.tasks = [task for task in self.tasks if task.uuid != task_uuid]
 
626
 
627
  async def add_task(self, task: Union[ScheduledTask, AdHocTask, PlannedTask]) -> "TaskScheduler":
628
  await self._tasks.add_task(task)
629
+ ctx = await self._get_chat_context(task) # invoke context creation
630
  return self
631
 
632
  async def remove_task_by_uuid(self, task_uuid: str) -> "TaskScheduler":
 
643
  def get_task_by_name(self, name: str) -> Union[ScheduledTask, AdHocTask, PlannedTask] | None:
644
  return self._tasks.get_task_by_name(name)
645
 
646
+ def find_task_by_name(self, name: str) -> list[Union[ScheduledTask, AdHocTask, PlannedTask]]:
647
+ return self._tasks.find_task_by_name(name)
648
+
649
  async def tick(self):
650
  for task in await self._tasks.get_due_tasks():
651
  await self._run_task(task)
652
 
653
+ async def run_task_by_uuid(self, task_uuid: str, task_context: str | None = None):
654
  # First reload tasks to ensure we have the latest state
655
  await self._tasks.reload()
656
 
 
678
  raise ValueError(f"Task with UUID '{task_uuid}' not found after state reset")
679
 
680
  # Run the task
681
+ await self._run_task(task, task_context)
682
 
683
+ async def run_task_by_name(self, name: str, task_context: str | None = None):
684
  task = self._tasks.get_task_by_name(name)
685
  if task is None:
686
  raise ValueError(f"Task with name {name} not found")
687
+ await self._run_task(task, task_context)
688
 
689
  async def save(self):
690
  await self._tasks.save()
 
744
  raise ValueError(f"Context ID mismatch for task {task.name}: context {context.id} != task {task.context_id}")
745
  save_tmp_chat(context)
746
 
747
+ async def _run_task(self, task: Union[ScheduledTask, AdHocTask, PlannedTask], task_context: str | None = None):
748
 
749
+ async def _run_task_wrapper(task_uuid: str, task_context: str | None = None):
750
 
751
  # preflight checks with a snapshot of the task
752
  task_snapshot: Union[ScheduledTask, AdHocTask, PlannedTask] | None = self.get_task_by_uuid(task_uuid)
 
806
  for filename in attachment_filenames:
807
  self._printer.print(f"- {filename}")
808
 
809
+ task_prompt = f"# Starting scheduler task '{current_task.name}' ({current_task.uuid})"
810
+ if task_context:
811
+ task_prompt = f"## Context:\n{task_context}\n\n## Task:\n{current_task.prompt}"
812
+ else:
813
+ task_prompt = f"## Task:\n{current_task.prompt}"
814
+
815
  # Log the message with message_id and attachments
816
  context.log.log(
817
  type="user",
818
  heading="User message",
819
+ content=task_prompt,
820
  kvps={"attachments": attachment_filenames},
821
  id=str(uuid.uuid4()),
822
  )
823
 
824
  agent.hist_add_user_message(
825
  UserMessage(
826
+ message=task_prompt,
827
  system_message=[current_task.system_prompt],
828
  attachments=attachment_filenames))
829
 
 
867
  await self._tasks.save()
868
 
869
  deferred_task = DeferredTask(thread_name=self.__class__.__name__)
870
+ deferred_task.start_task(_run_task_wrapper, task.uuid, task_context)
871
 
872
  # Ensure background execution doesn't exit immediately on async await, especially in script contexts
873
  # This helps prevent premature exits when running from non-event-loop contexts
python/tools/scheduler.py CHANGED
@@ -1,20 +1,26 @@
 
1
  from datetime import datetime
2
  import json
3
  import random
4
  import re
5
  from python.helpers.tool import Tool, Response
6
  from python.helpers.task_scheduler import (
7
- TaskScheduler, ScheduledTask, AdHocTask, PlannedTask, serialize_task, TaskState, TaskSchedule, TaskPlan, parse_datetime
 
8
  )
9
  from agent import AgentContext
10
  from python.helpers import persist_chat
11
 
 
 
12
 
13
  class SchedulerTool(Tool):
14
 
15
  async def execute(self, **kwargs):
16
  if self.method == "list_tasks":
17
  return await self.list_tasks(**kwargs)
 
 
18
  elif self.method == "show_task":
19
  return await self.show_task(**kwargs)
20
  elif self.method == "run_task":
@@ -27,6 +33,8 @@ class SchedulerTool(Tool):
27
  return await self.create_adhoc_task(**kwargs)
28
  elif self.method == "create_planned_task":
29
  return await self.create_planned_task(**kwargs)
 
 
30
  else:
31
  return Response(message=f"Unknown method '{self.name}:{self.method}'", break_loop=False)
32
 
@@ -51,6 +59,15 @@ class SchedulerTool(Tool):
51
 
52
  return Response(message=json.dumps(filtered_tasks, indent=4), break_loop=False)
53
 
 
 
 
 
 
 
 
 
 
54
  async def show_task(self, **kwargs) -> Response:
55
  task_uuid: str = kwargs.get("uuid", None)
56
  if not task_uuid:
@@ -64,12 +81,13 @@ class SchedulerTool(Tool):
64
  task_uuid: str = kwargs.get("uuid", None)
65
  if not task_uuid:
66
  return Response(message="Task UUID is required", break_loop=False)
 
67
  task: ScheduledTask | AdHocTask | PlannedTask | None = TaskScheduler.get().get_task_by_uuid(task_uuid)
68
  if not task:
69
  return Response(message=f"Task not found: {task_uuid}", break_loop=False)
70
- await TaskScheduler.get().run_task_by_uuid(task_uuid)
71
  if task.context_id == self.agent.context.id:
72
- break_loop = True # break loop if task is running in the same context, otherwise it would start two conversations in one window
73
  else:
74
  break_loop = False
75
  return Response(message=f"Task started: {task_uuid}", break_loop=break_loop)
@@ -199,3 +217,37 @@ class SchedulerTool(Tool):
199
  )
200
  await TaskScheduler.get().add_task(task)
201
  return Response(message=f"Planned task '{name}' created: {task.uuid}", break_loop=False)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
  from datetime import datetime
3
  import json
4
  import random
5
  import re
6
  from python.helpers.tool import Tool, Response
7
  from python.helpers.task_scheduler import (
8
+ TaskScheduler, ScheduledTask, AdHocTask, PlannedTask,
9
+ serialize_task, TaskState, TaskSchedule, TaskPlan, parse_datetime, serialize_datetime
10
  )
11
  from agent import AgentContext
12
  from python.helpers import persist_chat
13
 
14
+ DEFAULT_WAIT_TIMEOUT = 300
15
+
16
 
17
  class SchedulerTool(Tool):
18
 
19
  async def execute(self, **kwargs):
20
  if self.method == "list_tasks":
21
  return await self.list_tasks(**kwargs)
22
+ elif self.method == "find_task_by_name":
23
+ return await self.find_task_by_name(**kwargs)
24
  elif self.method == "show_task":
25
  return await self.show_task(**kwargs)
26
  elif self.method == "run_task":
 
33
  return await self.create_adhoc_task(**kwargs)
34
  elif self.method == "create_planned_task":
35
  return await self.create_planned_task(**kwargs)
36
+ elif self.method == "wait_for_task":
37
+ return await self.wait_for_task(**kwargs)
38
  else:
39
  return Response(message=f"Unknown method '{self.name}:{self.method}'", break_loop=False)
40
 
 
59
 
60
  return Response(message=json.dumps(filtered_tasks, indent=4), break_loop=False)
61
 
62
+ async def find_task_by_name(self, **kwargs) -> Response:
63
+ name: str = kwargs.get("name", None)
64
+ if not name:
65
+ return Response(message="Task name is required", break_loop=False)
66
+ tasks: list[ScheduledTask | AdHocTask | PlannedTask] = TaskScheduler.get().find_task_by_name(name)
67
+ if not tasks:
68
+ return Response(message=f"Task not found: {name}", break_loop=False)
69
+ return Response(message=json.dumps([serialize_task(task) for task in tasks], indent=4), break_loop=False)
70
+
71
  async def show_task(self, **kwargs) -> Response:
72
  task_uuid: str = kwargs.get("uuid", None)
73
  if not task_uuid:
 
81
  task_uuid: str = kwargs.get("uuid", None)
82
  if not task_uuid:
83
  return Response(message="Task UUID is required", break_loop=False)
84
+ task_context: str | None = kwargs.get("context", None)
85
  task: ScheduledTask | AdHocTask | PlannedTask | None = TaskScheduler.get().get_task_by_uuid(task_uuid)
86
  if not task:
87
  return Response(message=f"Task not found: {task_uuid}", break_loop=False)
88
+ await TaskScheduler.get().run_task_by_uuid(task_uuid, task_context)
89
  if task.context_id == self.agent.context.id:
90
+ break_loop = True # break loop if task is running in the same context, otherwise it would start two conversations in one window
91
  else:
92
  break_loop = False
93
  return Response(message=f"Task started: {task_uuid}", break_loop=break_loop)
 
217
  )
218
  await TaskScheduler.get().add_task(task)
219
  return Response(message=f"Planned task '{name}' created: {task.uuid}", break_loop=False)
220
+
221
+ async def wait_for_task(self, **kwargs) -> Response:
222
+ task_uuid: str = kwargs.get("uuid", None)
223
+ if not task_uuid:
224
+ return Response(message="Task UUID is required", break_loop=False)
225
+
226
+ scheduler = TaskScheduler.get()
227
+ task: ScheduledTask | AdHocTask | PlannedTask | None = scheduler.get_task_by_uuid(task_uuid)
228
+ if not task:
229
+ return Response(message=f"Task not found: {task_uuid}", break_loop=False)
230
+
231
+ if task.context_id == self.agent.context.id:
232
+ return Response(message="You can only wait for tasks running in a different chat context (dedicated_context=True).", break_loop=False)
233
+
234
+ done = False
235
+ elapsed = 0
236
+ while not done:
237
+ await scheduler.reload()
238
+ task = scheduler.get_task_by_uuid(task_uuid)
239
+ if not task:
240
+ return Response(message=f"Task not found: {task_uuid}", break_loop=False)
241
+
242
+ if task.state == TaskState.RUNNING:
243
+ await asyncio.sleep(1)
244
+ elapsed += 1
245
+ if elapsed > DEFAULT_WAIT_TIMEOUT:
246
+ return Response(message=f"Task wait timeout ({DEFAULT_WAIT_TIMEOUT} seconds): {task_uuid}", break_loop=False)
247
+ else:
248
+ done = True
249
+
250
+ return Response(
251
+ message=f"*Task*: {task_uuid}\n*State*: {task.state}\n*Last run*: {serialize_datetime(task.last_run)}\n*Result*:\n{task.last_result}",
252
+ break_loop=False
253
+ )