| ''' |
| Global processes manager |
| |
| Enables expensive processes used in one place to be reused elsewhere |
| For example: Kobold server shared between STT and T2T operation implementation |
| ''' |
|
|
| import logging |
| from enum import Enum |
|
|
| from utils.helpers.singleton import Singleton |
|
|
| from .error import UnknownProcessError, UnloadedProcessError |
|
|
| class ProcessType(Enum): |
| KOBOLD = "kobold" |
|
|
| class ProcessManager(metaclass=Singleton): |
| loaded_processes = dict() |
| |
| '''Perform initial load''' |
| async def load(self, process_type: ProcessType): |
| logging.info("Loading process by type {}".format(process_type.value)) |
| match process_type: |
| case ProcessType.KOBOLD: |
| from .processes.koboldcpp import KoboldCPPProcess |
| self.loaded_processes[ProcessType.KOBOLD] = KoboldCPPProcess() |
| await self.loaded_processes[ProcessType.KOBOLD].reload() |
| case _: |
| raise UnknownProcessError(process_type) |
| |
| '''Reload any process where reload_signal is True''' |
| async def reload(self): |
| for process_type in self.loaded_processes: |
| if self.loaded_processes[process_type] and self.loaded_processes[process_type].reload_signal: |
| logging.info("Reloading process {}".format(self.loaded_processes[process_type].id)) |
| await self.loaded_processes[process_type].reload() |
| |
| '''Unload any process where unload_signal is True''' |
| async def unload(self): |
| for process_type in self.loaded_processes: |
| if self.loaded_processes[process_type] and self.loaded_processes[process_type].unload_signal: |
| logging.info("Unloading process {}".format(self.loaded_processes[process_type].id)) |
| await self.loaded_processes[process_type].unload() |
| |
| async def link(self, link_id: str, process_type: ProcessType): |
| if not (process_type in self.loaded_processes and self.loaded_processes[process_type]): |
| await self.load(process_type) |
| |
| await self.loaded_processes[process_type].link(link_id) |
| |
| async def unlink(self, link_id: str, process_type: ProcessType): |
| if not (process_type in self.loaded_processes and self.loaded_processes[process_type]): |
| raise UnloadedProcessError(process_type.value) |
| |
| await self.loaded_processes[process_type].unlink(link_id) |
| |
| def signal_reload(self, process_type: ProcessType): |
| if not (process_type in self.loaded_processes and self.loaded_processes[process_type]): |
| raise UnloadedProcessError(process_type.value) |
| |
| self.loaded_processes[process_type].reload_signal = True |
| |
| def signal_unload(self, process_type: ProcessType): |
| if not (process_type in self.loaded_processes and self.loaded_processes[process_type]): |
| raise UnloadedProcessError(process_type.value) |
| |
| self.loaded_processes[process_type].unload_signal = True |
| |
| def get_process(self, process_type: ProcessType): |
| if not (process_type in self.loaded_processes and self.loaded_processes[process_type]): |
| raise UnloadedProcessError(process_type.value) |
| |
| return self.loaded_processes[process_type] |