File size: 3,297 Bytes
1905805
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
'''
Global processes manager

Enables expensive processes used in one place to be reused elsewhere
For example: Kobold server shared between STT and T2T operation implementation
'''

import logging
from enum import Enum

from utils.helpers.singleton import Singleton

from .error import UnknownProcessError, UnloadedProcessError

class ProcessType(Enum):
    KOBOLD = "kobold"

class ProcessManager(metaclass=Singleton):
    loaded_processes = dict()
    
    '''Perform initial load'''
    async def load(self, process_type: ProcessType):
        logging.info("Loading process by type {}".format(process_type.value))
        match process_type:
            case ProcessType.KOBOLD:
                from .processes.koboldcpp import KoboldCPPProcess
                self.loaded_processes[ProcessType.KOBOLD] = KoboldCPPProcess()
                await self.loaded_processes[ProcessType.KOBOLD].reload()
            case _:
                raise UnknownProcessError(process_type)
        
    '''Reload any process where reload_signal is True'''
    async def reload(self):
        for process_type in self.loaded_processes:
            if self.loaded_processes[process_type] and self.loaded_processes[process_type].reload_signal:
                logging.info("Reloading process {}".format(self.loaded_processes[process_type].id))
                await self.loaded_processes[process_type].reload()
        
    '''Unload any process where unload_signal is True'''
    async def unload(self):
        for process_type in self.loaded_processes:
            if self.loaded_processes[process_type] and self.loaded_processes[process_type].unload_signal:
                logging.info("Unloading process {}".format(self.loaded_processes[process_type].id))
                await self.loaded_processes[process_type].unload()
                
    async def link(self, link_id: str, process_type: ProcessType):
        if not (process_type in self.loaded_processes and self.loaded_processes[process_type]):
            await self.load(process_type)
            
        await self.loaded_processes[process_type].link(link_id)
        
    async def unlink(self, link_id: str, process_type: ProcessType):
        if not (process_type in self.loaded_processes and self.loaded_processes[process_type]):
            raise UnloadedProcessError(process_type.value)
            
        await self.loaded_processes[process_type].unlink(link_id)
    
    def signal_reload(self, process_type: ProcessType):
        if not (process_type in self.loaded_processes and self.loaded_processes[process_type]):
            raise UnloadedProcessError(process_type.value)
            
        self.loaded_processes[process_type].reload_signal = True
        
    def signal_unload(self, process_type: ProcessType):
        if not (process_type in self.loaded_processes and self.loaded_processes[process_type]):
            raise UnloadedProcessError(process_type.value)
            
        self.loaded_processes[process_type].unload_signal = True
        
    def get_process(self, process_type: ProcessType):
        if not (process_type in self.loaded_processes and self.loaded_processes[process_type]):
            raise UnloadedProcessError(process_type.value)
            
        return self.loaded_processes[process_type]