| import logging | |
| import asyncio | |
| from typing import Dict | |
# Module-level logger shared by everything in this file; the name is fixed
# (not ``__name__``), so handlers must be configured for "s2s_server".
logger = logging.getLogger("s2s_server")


class TaskSupervisor:
    """Registry of named asyncio tasks belonging to a single session.

    Each name maps to at most one task. Registering a different task under
    a name that is already in use cancels the previous task first, and the
    ``cancel_*`` methods request cancellation and drop the entries from the
    registry.

    Cancellation is only *requested* (``Task.cancel()``); none of the
    methods here wait for the cancelled task to actually finish.
    """

    def __init__(self, session_id: str) -> None:
        """Create an empty registry.

        Args:
            session_id: Identifier used as a prefix in log messages.
        """
        self.session_id = session_id
        self._tasks: Dict[str, asyncio.Task] = {}

    def register_task(self, name: str, task: asyncio.Task) -> None:
        """Track *task* under *name*, replacing any task already stored there.

        Args:
            name: Key under which the task is tracked.
            task: The task to track.
        """
        # Re-registering the very same task object must be a no-op; running
        # the replace path would cancel the task that is being registered.
        if self._tasks.get(name) is task:
            return
        # A different task currently owns this name: cancel it first.
        self.cancel_task(name)
        self._tasks[name] = task

    def cancel_task(self, name: str) -> None:
        """Request cancellation of the task stored under *name* and forget it.

        Unknown names are ignored. A task that has already finished is not
        cancelled (and nothing is logged), but its entry is still removed.

        Args:
            name: Key of the task to cancel.
        """
        task = self._tasks.pop(name, None)
        if task and not task.done():
            logger.info(
                "[%s] TaskSupervisor: Cancelling task '%s'",
                self.session_id,
                name,
            )
            task.cancel()

    async def cancel_tasks(self, *names: str) -> None:
        """Request cancellation of every task listed in *names*.

        This is a coroutine only so that existing ``await`` call sites keep
        working; it does not await anything and does not wait for the
        cancelled tasks to finish.

        Args:
            *names: Keys of the tasks to cancel; unknown names are ignored.
        """
        for name in names:
            self.cancel_task(name)

    def cancel_all(self) -> None:
        """Request cancellation of every tracked task and empty the registry."""
        # Snapshot the names: cancel_task() mutates the dict while we iterate.
        names = list(self._tasks)
        logger.info(
            "[%s] TaskSupervisor: Cancelling all tasks (%s)",
            self.session_id,
            names,
        )
        for name in names:
            self.cancel_task(name)