| import logging |
| import asyncio |
| import uuid |
| import base64 |
| import datetime |
| from typing import Dict, Coroutine, List, Any, Tuple |
| from enum import Enum |
|
|
| from utils.helpers.singleton import Singleton |
| from utils.helpers.iterable import chunk_buffer |
| from utils.helpers.observer import ObserverServer |
|
|
| from utils.config import Config, UnknownField, UnknownFile |
| from utils.prompter import Prompter |
| from utils.prompter.message import ( |
| RawMessage, |
| RequestMessage, |
| ChatMessage, |
| MCPMessage, |
| CustomMessage |
| ) |
| from utils.processes import ProcessManager |
| from utils.operations import ( |
| OperationManager, |
| OpRoles, |
| Operation, |
| UnknownOpType, |
| UnknownOpRole, |
| UnknownOpID, |
| DuplicateFilter, |
| OperationUnloaded, |
| StartActiveError, |
| CloseInactiveError, |
| UsedInactiveError |
| ) |
| from utils.mcp import MCPManager |
|
|
| class NonexistantJobException(Exception): |
| pass |
|
|
| class UnknownJobType(Exception): |
| pass |
|
|
| class JobType(Enum): |
| RESPONSE = 'response' |
| CONTEXT_CLEAR = 'context_clear' |
| CONTEXT_CONFIGURE = "context_configure" |
| CONTEXT_REQUEST_ADD = 'context_request_add' |
| CONTEXT_CONVERSATION_ADD_TEXT = 'context_conversation_add_text' |
| CONTEXT_CONVERSATION_ADD_AUDIO = 'context_conversation_add_audio' |
| CONTEXT_CUSTOM_REGISTER = 'context_custom_register' |
| CONTEXT_CUSTOM_REMOVE = 'context_custom_remove' |
| CONTEXT_CUSTOM_ADD = 'context_custom_add' |
| OPERATION_LOAD = 'operation_load' |
| OPERATION_CONFIG_RELOAD = "operation_reload_from_config" |
| OPERATION_UNLOAD = 'operation_unload' |
| OPERATION_CONFIGURE = 'operation_configure' |
| OPERATION_USE = 'operation_use' |
| CONFIG_LOAD = 'config_load' |
| CONFIG_UPDATE = 'config_update' |
| CONFIG_SAVE = 'config_save' |
| |
| class JAIson(metaclass=Singleton): |
| def __init__(self): |
| self.job_loop: asyncio.Task = None |
| self.job_queue: asyncio.Queue = None |
| self.job_map: Dict[str, Tuple[JobType, Coroutine]] = None |
| self.job_current_id: str = None |
| self.job_current: asyncio.Task = None |
| self.job_skips: dict = None |
| |
| |
| self.tasks_to_clean: List = list() |
| |
| self.event_server: ObserverServer = None |
| |
| self.prompter: Prompter = None |
| self.process_manager: ProcessManager = None |
| self.op_manager: OperationManager = None |
| self.mcp_manager: MCPManager = None |
| |
| async def start(self): |
| logging.info("Starting JAIson application layer.") |
| self.job_queue = asyncio.Queue() |
| self.job_map = dict() |
| self.job_skips = dict() |
| self.job_loop = asyncio.create_task(self._process_job_loop()) |
| |
| self.event_server = ObserverServer() |
| |
| self.prompter = Prompter() |
| await self.prompter.configure(Config().prompter) |
| |
| self.process_manager = ProcessManager() |
| self.op_manager = OperationManager() |
| self.mcp_manager = MCPManager() |
| await self.mcp_manager.start() |
| self.prompter.add_mcp_usage_prompt(self.mcp_manager.get_tooling_prompt(), self.mcp_manager.get_response_prompt()) |
| await self.op_manager.load_operations_from_config() |
| await self.process_manager.reload() |
| logging.info("JAIson application layer has started.") |
| |
| async def stop(self): |
| logging.info("Shutting down JAIson application layer") |
| await self.op_manager.close_operation_all() |
| await self.mcp_manager.close() |
| await self.process_manager.unload() |
| logging.info("JAIson application layer has been shut down") |
| |
| |
| |
| |
| async def create_job(self, job_type: Enum, **kwargs): |
| new_job_id = str(uuid.uuid4()) |
| |
| job_type_enum = JobType(job_type) |
| |
| coro = None |
| if job_type_enum == JobType.RESPONSE: coro = self.response_pipeline(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.CONTEXT_REQUEST_ADD: coro = self.append_request_context(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.CONTEXT_CONVERSATION_ADD_TEXT: coro = self.append_conversation_context_text(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.CONTEXT_CONVERSATION_ADD_AUDIO: coro = self.append_conversation_context_audio(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.CONTEXT_CLEAR: coro = self.clear_context(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.CONTEXT_CONFIGURE: coro = self.configure_context(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.CONTEXT_CUSTOM_REGISTER: coro = self.register_custom_context(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.CONTEXT_CUSTOM_REMOVE: coro = self.remove_custom_context(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.CONTEXT_CUSTOM_ADD: coro = self.add_custom_context(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.OPERATION_LOAD: coro = self.load_operations(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.OPERATION_CONFIG_RELOAD: coro = self.load_operations_from_config(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.OPERATION_UNLOAD: coro = self.unload_operations(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.OPERATION_CONFIGURE: coro = self.configure_operations(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.OPERATION_USE: coro = self.use_operation(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.CONFIG_LOAD: coro = self.load_config(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.CONFIG_UPDATE: coro = self.update_config(new_job_id, job_type_enum, **kwargs) |
| elif job_type_enum == JobType.CONFIG_SAVE: coro = self.save_config(new_job_id, job_type_enum, **kwargs) |
| self.job_map[new_job_id] = (job_type_enum, coro) |
| |
| await self.job_queue.put(new_job_id) |
| |
| logging.info("Queued new {} job {}".format(job_type_enum.value, new_job_id)) |
| return new_job_id |
| |
| async def cancel_job(self, job_id: str, reason: str = None): |
| if job_id not in self.job_map: raise NonexistantJobException(f"Job {job_id} does not exist or already finished") |
| |
| cancel_message = f"Setting job {job_id} to cancel" |
| if reason: cancel_message += f" because {reason}" |
| logging.info(cancel_message) |
|
|
| if job_id == self.job_current_id: |
| |
| self._clear_current_job(reason=cancel_message) |
| else: |
| |
| |
| self.job_skips[job_id](cancel_message) |
| |
| def _clear_current_job(self, reason: str = None): |
| self.job_map.pop(self.job_current_id, None) |
| self.job_skips.pop(self.job_current_id, None) |
| self.job_current_id = None |
| |
| for task in self.tasks_to_clean: |
| task.cancel(reason) |
| self.tasks_to_clean.clear() |
| |
| if self.job_current is not None: |
| self.job_current.cancel(reason) |
| self.job_current = None |
| |
| |
| async def _process_job_loop(self): |
| while True: |
| try: |
| await self.process_manager.reload() |
| await self.process_manager.unload() |
| |
| self.job_current_id = await self.job_queue.get() |
| job_type, coro = self.job_map[self.job_current_id] |
| |
| if self.job_current_id in self.job_skips: |
| |
| reason = self.job_skips[self.job_current_id] |
| await self._handle_broadcast_error(self.job_current_id, job_type, asyncio.CancelledError(reason)) |
| self._clear_current_job(reason=reason) |
| del coro |
| else: |
| |
| self.job_current = asyncio.create_task(coro) |
| await asyncio.wait([self.job_current]) |
| |
| |
| err = self.job_current.exception() if self.job_current else None |
| if err is not None: |
| logging.warning(f"Job was cancelled due to an error: {err}", exc_info=err) |
| await self._handle_broadcast_error(self.job_current_id, job_type, err) |
| |
| |
| self._clear_current_job() |
| except Exception as err: |
| logging.error("Encountered error in main job processing loop", exc_info=True) |
| await asyncio.sleep(1) |
| |
| |
| |
| def get_loaded_operations(self): |
| op_d = self.op_manager.get_operation_all() |
| for key in op_d: |
| if isinstance(op_d[key], Operation): |
| op_d[key] = op_d[key].op_id |
| elif isinstance(op_d[key], list): |
| op_d[key] = list(map(lambda x: x.op_id, op_d[key])) |
| else: |
| op_d[key] = "unknown" |
| |
| return op_d |
| |
| def get_current_config(self): |
| return Config().get_config_dict() |
| |
| |
| |
| ''' |
| Generate responses from the current contexts. |
| This does not take an input. Context for what to repond to must be added prior to running this. |
| ''' |
| async def response_pipeline( |
| self, |
| job_id: str, |
| job_type: JobType, |
| include_audio: bool = True |
| ): |
| |
| |
| if not self.op_manager.get_operation(OpRoles.TTS): include_audio = False |
| |
| |
| await self._handle_broadcast_start(job_id, job_type, {"include_audio": include_audio}) |
| |
| |
| if self.op_manager.get_operation(OpRoles.MCP): |
| self.prompter.add_mcp_usage_prompt(self.mcp_manager.get_tooling_prompt(), self.mcp_manager.get_response_prompt()) |
| mcp_sys_prompt, mcp_user_prompt = self.prompter.generate_mcp_system_context(), self.prompter.generate_mcp_user_context() |
| tooling_response = "" |
| async for chunk in self.op_manager.use_operation(OpRoles.MCP, {"instruction_prompt": mcp_sys_prompt, "messages": [RawMessage(mcp_user_prompt)]}): |
| tooling_response += chunk['content'] |
|
|
| |
| tool_call_results = await self.mcp_manager.use(tooling_response) |
| |
| |
| self.prompter.add_mcp_results(tool_call_results) |
|
|
| |
| instruction_prompt, history = self.prompter.get_sys_prompt(), self.prompter.get_history() |
| |
| |
| t2t_result = "" |
| async for chunk_out in self.op_manager.use_operation(OpRoles.T2T, {"instruction_prompt": instruction_prompt, "messages": history}): |
| t2t_result += chunk_out["content"] |
| |
| |
| await self._handle_broadcast_event(job_id, job_type, {"instruction_prompt": instruction_prompt}) |
| await self._handle_broadcast_event(job_id, job_type, {"history": [msg.to_dict() for msg in history]}) |
| await self._handle_broadcast_event(job_id, job_type, {"raw_content": t2t_result}) |
|
|
| |
| async for text_chunk_out in self.op_manager.use_operation(OpRoles.FILTER_TEXT, {"content": t2t_result}): |
| self.prompter.add_chat(self.prompter.character_name, text_chunk_out['content']) |
| await self._handle_broadcast_event(job_id, job_type, text_chunk_out) |
| if include_audio: |
| |
| async for audio_chunk_out in self.op_manager.use_operation(OpRoles.TTS, text_chunk_out): |
| |
| async for final_audio_chunk_out in self.op_manager.use_operation(OpRoles.FILTER_AUDIO, audio_chunk_out): |
| |
| for ws_chunk in chunk_buffer(base64.b64encode(final_audio_chunk_out['audio_bytes']).decode('utf-8')): |
| await self._handle_broadcast_event(job_id, job_type, { |
| "audio_bytes": ws_chunk, |
| "sr": final_audio_chunk_out['sr'], |
| "sw": final_audio_chunk_out['sw'], |
| "ch": final_audio_chunk_out['ch'] |
| }) |
| |
| |
| await self._handle_broadcast_success(job_id, job_type) |
|
|
|
|
| |
| async def clear_context( |
| self, |
| job_id: str, |
| job_type: JobType |
| ): |
| await self._handle_broadcast_start(job_id, job_type, {}) |
| self.prompter.clear_history() |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| async def configure_context( |
| self, |
| job_id: str, |
| job_type: JobType, |
| name_translations: Dict[str, str] = None, |
| character_name: str = None, |
| history_length: int = None, |
| instruction_prompt_filename: str = None, |
| character_prompt_filename: str = None, |
| scene_prompt_filename: str = None |
| ): |
| await self._handle_broadcast_start(job_id, job_type, { |
| "name_translations": name_translations, |
| "character_name": character_name, |
| "history_length": history_length, |
| "instruction_prompt_filename": instruction_prompt_filename, |
| "character_prompt_filename": character_prompt_filename, |
| "scene_prompt_filename": scene_prompt_filename |
| }) |
| payload = dict() |
| if name_translations: payload |= {"name_translations": name_translations} |
| if character_name: payload |= {"character_name": character_name} |
| if history_length: payload |= {"history_length": history_length} |
| if history_length: payload |= {"history_length": history_length} |
| if instruction_prompt_filename: payload |= {"instruction_prompt_filename": instruction_prompt_filename} |
| if character_prompt_filename: payload |= {"character_prompt_filename": character_prompt_filename} |
| if scene_prompt_filename: payload |= {"scene_prompt_filename": scene_prompt_filename} |
| |
| self.prompter.configure(payload) |
| |
| await self._handle_broadcast_success(job_id, job_type) |
|
|
| async def append_request_context( |
| self, |
| job_id: str, |
| job_type: JobType, |
| content: str = None |
| ): |
| await self._handle_broadcast_start(job_id, job_type, {"content": content}) |
| self.prompter.add_request(content) |
| last_line_o = self.prompter.history[-1] |
| await self._handle_broadcast_event(job_id, job_type, { |
| "timestamp": last_line_o.time.timestamp(), |
| "content": last_line_o.message, |
| "line": last_line_o.to_line() |
| }) |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| async def append_conversation_context_text( |
| self, |
| job_id: str, |
| job_type: JobType, |
| user: str = None, |
| timestamp: int = None, |
| content: str = None |
| ): |
| await self._handle_broadcast_start(job_id, job_type, {"user": user, "timestamp": timestamp, "content": content}) |
| self.prompter.add_chat( |
| user, |
| content, |
| time=( |
| datetime.datetime.fromtimestamp(timestamp) \ |
| if not isinstance(timestamp, datetime.datetime) else timestamp |
| ) |
| ) |
| last_line_o = self.prompter.history[-1] |
| await self._handle_broadcast_event(job_id, job_type, { |
| "user": last_line_o.user, |
| "timestamp": last_line_o.time.timestamp(), |
| "content": last_line_o.message, |
| "line": last_line_o.to_line() |
| }) |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| async def append_conversation_context_audio( |
| self, |
| job_id: str, |
| job_type: JobType, |
| user: str = None, |
| timestamp: int = None, |
| audio_bytes: str = None, |
| sr: int = None, |
| sw: int = None, |
| ch: int = None |
| ): |
| await self._handle_broadcast_start(job_id, job_type, {"user": user, "timestamp": timestamp, "sr": sr, "sw": sw, "ch": ch, "audio_bytes": (audio_bytes is not None)}) |
| audio_bytes: bytes = base64.b64decode(audio_bytes) |
| prompt = self.prompter.get_history_text() or "You're name is {}".format(self.prompter.character_name) |
| content = "" |
| async for out_d in self.op_manager.use_operation(OpRoles.STT, {"prompt": prompt, "audio_bytes": audio_bytes, "sr": sr, "sw": sw, "ch": ch}): |
| content += out_d['transcription'] |
| |
| self.prompter.add_chat( |
| user, |
| content, |
| time=( |
| datetime.datetime.fromtimestamp(timestamp) \ |
| if isinstance(timestamp, int) else timestamp |
| ) |
| ) |
| last_line_o = self.prompter.history[-1] |
| await self._handle_broadcast_event(job_id, job_type, { |
| "user": last_line_o.user, |
| "timestamp": last_line_o.time.timestamp(), |
| "content": last_line_o.message, |
| "line": last_line_o.to_line() |
| }) |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| async def register_custom_context( |
| self, |
| job_id: str, |
| job_type: JobType, |
| context_id: str = None, |
| context_name: str = None, |
| context_description: str = None |
| ): |
| await self._handle_broadcast_start(job_id, job_type, {"context_id": context_id, "context_name": context_name, "context_description": context_description}) |
| self.prompter.register_custom_context(context_id, context_name, context_description=context_description) |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| async def remove_custom_context(self, |
| job_id: str, |
| job_type: JobType, |
| context_id: str = None |
| ): |
| await self._handle_broadcast_start(job_id, job_type, {"context_id": context_id}) |
| self.prompter.remove_custom_context(context_id) |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| async def add_custom_context( |
| self, |
| job_id: str, |
| job_type: JobType, |
| context_id: str = None, |
| context_contents: str = None, |
| timestamp: int = None |
| ): |
| await self._handle_broadcast_start(job_id, job_type, {"context_id": context_id, "context_contents": context_contents, "timestamp": timestamp}) |
| if timestamp is not None: timestamp = datetime.datetime.fromtimestamp(timestamp) |
| self.prompter.add_custom_context(context_id, context_contents) |
| last_line_o = self.prompter.history[-1] |
| await self._handle_broadcast_event(job_id, job_type, { |
| "timestamp": last_line_o.time.timestamp(), |
| "content": last_line_o.message, |
| "line": last_line_o.to_line() |
| }) |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| |
| async def load_operations( |
| self, |
| job_id: str, |
| job_type: JobType, |
| ops: List[Dict[str, str]] = [] |
| ): |
| await self._handle_broadcast_start(job_id, job_type, {"ops": ops}) |
| for op_d in ops: |
| await self.op_manager.load_operation(OpRoles(op_d.get('role', None)), op_d.get('id', None), op_d.get('config', dict())) |
| await self._handle_broadcast_event(job_id, job_type, { |
| "role": op_d.get('role', None), |
| "id": op_d.get('id', None), |
| "loose_key": op_d.get("loose_key", None) |
| }) |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| async def load_operations_from_config( |
| self, |
| job_id: str, |
| job_type: JobType, |
| ): |
| await self._handle_broadcast_start(job_id, job_type, {}) |
| await self.op_manager.load_operations_from_config() |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| async def unload_operations( |
| self, |
| job_id: str, |
| job_type: JobType, |
| ops: List[Dict[str, str]] = [] |
| ): |
| await self._handle_broadcast_start(job_id, job_type, {"ops": ops}) |
| for op_d in ops: |
| await self.op_manager.close_operation(OpRoles(op_d.get('role', None)), op_d.get('id', None)) |
| await self._handle_broadcast_event(job_id, job_type, { |
| "role": op_d.get('role', None), |
| "id": op_d.get('id', None) |
| }) |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| async def configure_operations( |
| self, |
| job_id: str, |
| job_type: JobType, |
| ops: List[Dict[str, str]] = [] |
| ): |
| await self._handle_broadcast_start(job_id, job_type, {"ops": ops}) |
| for op_d in ops: |
| await self.op_manager.configure(OpRoles(op_d.get('role', None)), op_d, op_id=op_d.get('id', None)) |
| await self._handle_broadcast_event(job_id, job_type, op_d) |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| async def use_operation( |
| self, |
| job_id: str, |
| job_type: JobType, |
| role: str = None, |
| id: str = None, |
| payload: Dict[str, Any] = None |
| ): |
| await self._handle_broadcast_start(job_id, job_type, {"role": role, "id": id}) |
| |
| if 'audio_bytes' in payload: |
| payload['audio_bytes'] = base64.b64decode(payload['audio_bytes']) |
|
|
| if 'messages' in payload: |
| msg_list = list() |
| for msg in payload['messages']: |
| assert 'type' in msg |
| if msg['type'] == "raw": |
| msg_list.append(RawMessage(msg['message'])) |
| elif msg['type'] == "request": |
| msg_list.append(RequestMessage(msg['message'], datetime.datetime.fromtimestamp(msg['time']))) |
| elif msg['type'] == "chat": |
| msg_list.append(ChatMessage(msg['user'], msg['message'], datetime.datetime.fromtimestamp(msg['time']))) |
| elif msg['type'] == "tool": |
| msg_list.append(MCPMessage(msg['tool'], msg['message'], datetime.datetime.fromtimestamp(msg['time']))) |
| elif msg['type'] == "custom": |
| msg_list.append(CustomMessage(msg['id'], msg['message'], datetime.datetime.fromtimestamp(msg['time']))) |
| else: |
| raise Exception("Invalid message type") |
| payload['messages'] = msg_list |
|
|
| try: |
| async for chunk_out in self.op_manager.use_operation(OpRoles(role), payload, op_id=id): |
| await self._handle_broadcast_event(job_id, job_type, chunk_out) |
| except OperationUnloaded: |
| op = self.op_manager.loose_load_operation(OpRoles(role), id) |
| await op.start() |
| async for chunk_out in op(payload): |
| if "audio_bytes" in chunk_out: chunk_out["audio_bytes"] = base64.b64encode(chunk_out['audio_bytes']).decode('utf-8') |
| await self._handle_broadcast_event(job_id, job_type, chunk_out) |
| await op.close() |
| |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| |
| async def load_config(self, job_id: str, job_type: JobType, config_name: str): |
| await self._handle_broadcast_start(job_id, job_type, {"config_name": config_name}) |
| Config().load_from_name(config_name) |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| async def update_config(self, job_id: str, job_type: JobType, config_d: str): |
| await self._handle_broadcast_start(job_id, job_type, {"config_d": config_d}) |
| Config().load_from_dict(config_d) |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| async def save_config(self, job_id: str, job_type: JobType, config_name: str): |
| await self._handle_broadcast_start(job_id, job_type, {"config_name": config_name}) |
| Config().save(config_name) |
| await self._handle_broadcast_success(job_id, job_type) |
| |
| |
| async def _handle_broadcast_start(self, job_id: str, job_type: JobType, payload: dict): |
| to_broadcast = { |
| "job_id": job_id, |
| "start": payload |
| } |
| logging.debug("Broadcasting start ({}) {} {:.500}".format(job_id, job_type.value, str(to_broadcast))) |
| await self.event_server.broadcast_event(job_type.value, to_broadcast) |
| |
| async def _handle_broadcast_event(self, job_id: str, job_type: JobType, payload: dict): |
| to_broadcast = { |
| "job_id": job_id, |
| "finished": False, |
| "result": payload |
| } |
| logging.debug("Broadcasting event ({}) {} {:.500}".format(job_id, job_type.value, str(to_broadcast))) |
| await self.event_server.broadcast_event(job_type.value, to_broadcast) |
| |
| async def _handle_broadcast_success(self, job_id: str, job_type: JobType): |
| to_broadcast = { |
| "job_id": job_id, |
| "finished": True, |
| "success": True |
| } |
| logging.debug("Broadcasting success ({}) {} {}".format(job_id, job_type.value, str(to_broadcast))) |
| await self.event_server.broadcast_event(job_type.value, to_broadcast) |
| |
| async def _handle_broadcast_error(self, job_id: str, job_type: JobType, err: Exception): |
| |
| error_type = "unknown" |
| if isinstance(err, UnknownOpType): error_type = "operation_unknown_type" |
| if isinstance(err, UnknownOpRole): error_type = "operation_unknown_role" |
| elif isinstance(err, UnknownOpID): error_type = "operation_unknown_id" |
| elif isinstance(err, DuplicateFilter): error_type = "operation_duplicate" |
| elif isinstance(err, OperationUnloaded): error_type = "operation_unloaded" |
| elif isinstance(err, StartActiveError): error_type = "operation_active" |
| elif isinstance(err, CloseInactiveError): error_type = "operation_inactive" |
| elif isinstance(err, UsedInactiveError): error_type = "operation_inactive" |
| elif isinstance(err, UnknownField): error_type = "config_unknown_field" |
| elif isinstance(err, UnknownFile): error_type = "config_unknown_file" |
| elif isinstance(err, UnknownJobType): error_type = "job_unknown" |
| elif isinstance(err, asyncio.CancelledError): error_type = "job_cancelled" |
| |
| to_broadcast = { |
| "job_id": job_id, |
| "finished": True, |
| "success": False, |
| "result": { |
| "type": error_type, |
| "reason": str(err) |
| } |
| } |
| |
| logging.debug("Broadcasting error ({}) {} {}".format(job_id, job_type.value, str(to_broadcast))) |
| await self.event_server.broadcast_event(job_type.value, to_broadcast) |