|
|
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

from voice_dialogue.utils.logger import logger
|
|
|
|
|
|
|
|
@dataclass
class ServiceDefinition:
    """Declarative, configuration-style description of a managed service.

    Instances are consumed by ``ServiceManager.start_service``.
    """

    # Unique service name; used as the key under which the manager registers it.
    name: str
    # Zero-argument callable that builds the (not yet started) service object.
    factory: Callable[[], Any]
    # Names of services that must already be registered and ready before this
    # one is started.
    dependencies: List[str] = field(default_factory=list)
    # If True, a startup failure raises; if False, start_service returns False.
    required: bool = True
    # Maximum number of seconds to wait for the service's ``is_ready`` flag.
    startup_timeout: int = 60
    # Optional predicate run on the started service; a falsy result fails startup.
    health_check: Optional[Callable[[Any], bool]] = None

    def __post_init__(self):
        # Still accept an explicit ``None`` (the historical default) and any
        # iterable of names, but always store a private list so a caller's
        # sequence is never aliased or mutated through this object.
        self.dependencies = [] if self.dependencies is None else list(self.dependencies)
|
|
|
|
|
|
|
|
class ServiceManager:
    """Service manager: starts, stops and monitors services in one place.

    A "service" is whatever object ``ServiceDefinition.factory`` returns. The
    code below relies on it exposing a settable ``daemon`` attribute,
    ``start()``, ``exit()``, ``is_alive()`` and an ``is_ready`` attribute.
    Presumably these are ``threading.Thread`` subclasses with extra
    readiness/exit hooks; confirm against the concrete service classes.
    """

    # Seconds between polls while waiting for a service to become ready or stop.
    _POLL_INTERVAL = 0.1

    def __init__(self):
        # Successfully started services by name, in start order. Dicts keep
        # insertion order, which stop_all_services uses to stop in reverse.
        self.services: Dict[str, Any] = {}
        # Most recent startup error message per service name.
        self.startup_errors: Dict[str, str] = {}
        # Seconds taken by the last successful start of each service.
        self.startup_times: Dict[str, float] = {}
        # Callables run (in registration order) at the start of stop_all_services.
        self._shutdown_hooks: List[Callable] = []

    def add_shutdown_hook(self, hook: Callable):
        """Register a zero-argument callable to run when stop_all_services() is called."""
        self._shutdown_hooks.append(hook)

    def start_service(self, service_def: ServiceDefinition) -> bool:
        """Create, start and wait for a single service.

        The service is registered in ``self.services`` only after every check
        passes. The steps are:

        1. Verify that every dependency is registered and ready.
        2. Build the service via ``service_def.factory``, mark it daemon and start it.
        3. Wait (up to ``startup_timeout`` seconds) for ``is_ready``.
        4. Run the optional ``health_check``.

        If a step fails after the service was started, the service is asked
        to exit. This prevents a half-started service from running untracked.

        Args:
            service_def: Definition of the service to start.

        Returns:
            True on success. False if a non-required service failed to start.

        Raises:
            RuntimeError: If a required service failed to start; the underlying
                error is chained as ``__cause__``.
        """
        name = service_def.name
        start_time = time.monotonic()
        service = None
        started = False

        try:
            logger.info(f"正在启动服务: {name}")

            if not self._check_dependencies(service_def.dependencies):
                raise RuntimeError(f"服务 {name} 的依赖服务未就绪")

            service = service_def.factory()
            service.daemon = True
            service.start()
            started = True

            if not self._wait_for_service_ready(service, service_def):
                # Distinguish "died during startup" from "still starting".
                if not service.is_alive():
                    raise RuntimeError(f"服务 {name} 在就绪前已退出")
                raise TimeoutError(f"服务 {name} 启动超时")

            if service_def.health_check and not service_def.health_check(service):
                raise RuntimeError(f"服务 {name} 健康检查失败")

        except Exception as e:
            error_msg = f"服务 {name} 启动失败: {e}"
            logger.error(error_msg, exc_info=True)
            self.startup_errors[name] = error_msg

            if started:
                # The service is running but will not be registered, so
                # stop_all_services could never reach it; stop it here.
                self._cleanup_failed_service(name, service)

            if service_def.required:
                raise RuntimeError(error_msg) from e
            return False

        self.services[name] = service
        self.startup_times[name] = time.monotonic() - start_time
        # A successful (re)start supersedes any error left by an earlier attempt.
        self.startup_errors.pop(name, None)

        logger.info(f"服务 {name} 启动成功,耗时: {self.startup_times[name]:.2f}秒")
        return True

    def _check_dependencies(self, dependencies: List[str]) -> bool:
        """Return True if every named dependency is registered and its ``is_ready`` is truthy.

        Logs and returns False on the first missing or not-ready dependency.
        """
        for dep in dependencies:
            if dep not in self.services:
                logger.error(f"依赖服务 {dep} 未启动")
                return False
            if not self.services[dep].is_ready:
                logger.error(f"依赖服务 {dep} 未就绪")
                return False
        return True

    def _wait_for_service_ready(self, service: Any, service_def: ServiceDefinition) -> bool:
        """Poll ``service.is_ready`` for at most ``service_def.startup_timeout`` seconds.

        The wait ends early if the service stops being alive, because it can
        no longer become ready.

        Returns:
            The final value of ``service.is_ready``.
        """
        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + service_def.startup_timeout

        while not service.is_ready:
            if time.monotonic() >= deadline or not service.is_alive():
                break
            time.sleep(self._POLL_INTERVAL)

        # Re-read: the service may have become ready just before it exited or
        # just as the deadline passed.
        return service.is_ready

    def stop_all_services(self):
        """Run all shutdown hooks, then stop services in reverse start order.

        Errors raised by hooks are logged and do not prevent later hooks or
        service shutdown. Afterwards ``self.services`` is empty.
        """
        logger.info("正在停止所有服务...")

        for hook in self._shutdown_hooks:
            try:
                hook()
            except Exception as e:
                logger.error(f"执行关闭钩子时发生错误: {e}", exc_info=True)

        # Reverse start order so dependents stop before the services they use.
        for service_name in reversed(list(self.services)):
            self._stop_single_service(service_name)

        self.services.clear()

    def _stop_single_service(self, service_name: str, timeout: float = 5):
        """Stop the registered service ``service_name``, waiting up to ``timeout`` seconds.

        All errors, including an unknown name, are logged rather than raised.
        """
        try:
            service = self.services[service_name]
            logger.info(f"正在停止服务: {service_name}")
            self._exit_and_wait(service_name, service, timeout)
        except Exception as e:
            logger.error(f"停止服务 {service_name} 时发生错误: {e}", exc_info=True)

    def _exit_and_wait(self, service_name: str, service: Any, timeout: float = 5) -> None:
        """Call ``service.exit()`` and poll ``is_alive()`` until it stops or ``timeout`` elapses.

        Logs whether the service stopped in time. Exceptions from ``exit()``
        propagate to the caller.
        """
        service.exit()

        deadline = time.monotonic() + timeout
        while service.is_alive() and time.monotonic() < deadline:
            time.sleep(self._POLL_INTERVAL)

        if service.is_alive():
            logger.warning(f"服务 {service_name} 未能在超时时间内停止")
        else:
            logger.info(f"服务 {service_name} 已停止")

    def _cleanup_failed_service(self, service_name: str, service: Any) -> None:
        """Best-effort stop of a service that was started but failed to come up.

        Errors are logged and swallowed, so the original startup failure
        remains the one reported to the caller.
        """
        try:
            self._exit_and_wait(service_name, service)
        except Exception as e:
            logger.error(f"清理启动失败的服务 {service_name} 时发生错误: {e}", exc_info=True)

    def get_service_status(self) -> dict:
        """Return a snapshot of the manager's state.

        Keys:
            total_services: Number of registered services.
            startup_errors: Number of recorded startup errors.
            startup_times: Copy of per-service startup durations in seconds.
            errors: Copy of per-service startup error messages.
            services: Mapping from name to ``{"running": ..., "ready": ...}``.
        """
        return {
            "total_services": len(self.services),
            "startup_errors": len(self.startup_errors),
            "startup_times": self.startup_times.copy(),
            "errors": self.startup_errors.copy(),
            "services": {
                name: {
                    "running": service.is_alive(),
                    "ready": service.is_ready,
                }
                for name, service in self.services.items()
            },
        }

    def get_service(self, name: str) -> Optional[Any]:
        """Return the registered service called ``name``, or None if absent."""
        return self.services.get(name)

    def is_service_running(self, name: str) -> bool:
        """Return True if ``name`` is registered and its ``is_alive()`` is true."""
        service = self.services.get(name)
        return service is not None and service.is_alive()
|
|
|