File size: 3,297 Bytes
1905805 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 | '''
Global processes manager
Enables expensive processes used in one place to be reused elsewhere
For example: Kobold server shared between STT and T2T operation implementation
'''
import logging
from enum import Enum
from utils.helpers.singleton import Singleton
from .error import UnknownProcessError, UnloadedProcessError
class ProcessType(Enum):
KOBOLD = "kobold"
class ProcessManager(metaclass=Singleton):
loaded_processes = dict()
'''Perform initial load'''
async def load(self, process_type: ProcessType):
logging.info("Loading process by type {}".format(process_type.value))
match process_type:
case ProcessType.KOBOLD:
from .processes.koboldcpp import KoboldCPPProcess
self.loaded_processes[ProcessType.KOBOLD] = KoboldCPPProcess()
await self.loaded_processes[ProcessType.KOBOLD].reload()
case _:
raise UnknownProcessError(process_type)
'''Reload any process where reload_signal is True'''
async def reload(self):
for process_type in self.loaded_processes:
if self.loaded_processes[process_type] and self.loaded_processes[process_type].reload_signal:
logging.info("Reloading process {}".format(self.loaded_processes[process_type].id))
await self.loaded_processes[process_type].reload()
'''Unload any process where unload_signal is True'''
async def unload(self):
for process_type in self.loaded_processes:
if self.loaded_processes[process_type] and self.loaded_processes[process_type].unload_signal:
logging.info("Unloading process {}".format(self.loaded_processes[process_type].id))
await self.loaded_processes[process_type].unload()
async def link(self, link_id: str, process_type: ProcessType):
if not (process_type in self.loaded_processes and self.loaded_processes[process_type]):
await self.load(process_type)
await self.loaded_processes[process_type].link(link_id)
async def unlink(self, link_id: str, process_type: ProcessType):
if not (process_type in self.loaded_processes and self.loaded_processes[process_type]):
raise UnloadedProcessError(process_type.value)
await self.loaded_processes[process_type].unlink(link_id)
def signal_reload(self, process_type: ProcessType):
if not (process_type in self.loaded_processes and self.loaded_processes[process_type]):
raise UnloadedProcessError(process_type.value)
self.loaded_processes[process_type].reload_signal = True
def signal_unload(self, process_type: ProcessType):
if not (process_type in self.loaded_processes and self.loaded_processes[process_type]):
raise UnloadedProcessError(process_type.value)
self.loaded_processes[process_type].unload_signal = True
def get_process(self, process_type: ProcessType):
if not (process_type in self.loaded_processes and self.loaded_processes[process_type]):
raise UnloadedProcessError(process_type.value)
return self.loaded_processes[process_type] |