realtime / runtime /task_supervisor.py
teganmosi
Deploy Stage 2 decoupled Event-Driven Conversational Runtime architecture
deea75c
Raw
History Blame Contribute Delete
1.03 kB
import logging
import asyncio
from typing import Dict
# Module-level logger shared by everything in this file; it is bound to the
# fixed logger name "s2s_server" rather than to this module's own name.
logger = logging.getLogger("s2s_server")
class TaskSupervisor:
    """Registry of named asyncio tasks belonging to one session.

    Each name maps to at most one task. Registering a different task under a
    name that is already in use cancels the task previously stored there.
    The ``cancel_*`` methods request cancellation via ``Task.cancel()`` and
    drop the entry from the registry; nothing in this class waits for a
    cancelled task to actually finish.
    """

    def __init__(self, session_id: str):
        """Create an empty supervisor.

        Args:
            session_id: Identifier prefixed to every log message emitted by
                this supervisor.
        """
        self.session_id = session_id
        # name -> the task most recently registered under that name
        self._tasks: Dict[str, asyncio.Task] = {}

    def register_task(self, name: str, task: asyncio.Task):
        """Track ``task`` under ``name``, cancelling the task it replaces.

        Args:
            name: Key under which the task is stored.
            task: The task to track.
        """
        # Only cancel when a *different* task currently holds this name.
        # Re-registering the same task object must not cancel it, otherwise
        # the registry would end up holding a task it has just cancelled.
        if self._tasks.get(name) is not task:
            self.cancel_task(name)
        self._tasks[name] = task

    def cancel_task(self, name: str):
        """Forget the task stored under ``name`` and cancel it if still running.

        Unknown names are ignored. A task that is already done is removed
        from the registry without logging or calling ``cancel()``.

        Args:
            name: Key of the task to cancel.
        """
        task = self._tasks.pop(name, None)
        if task is not None and not task.done():
            logger.info(f"[{self.session_id}] TaskSupervisor: Cancelling task '{name}'")
            task.cancel()

    async def cancel_tasks(self, *names: str):
        """Cancel every task named in ``names``, in the order given.

        This is declared as a coroutine, so callers must await it, but it
        does not itself await anything: it only requests cancellation.

        Args:
            *names: Keys of the tasks to cancel; unknown names are ignored.
        """
        for name in names:
            self.cancel_task(name)

    def cancel_all(self):
        """Cancel and forget every task currently registered."""
        logger.info(f"[{self.session_id}] TaskSupervisor: Cancelling all tasks ({list(self._tasks)})")
        # Iterate over a snapshot of the keys: cancel_task mutates the dict.
        for name in list(self._tasks):
            self.cancel_task(name)