File size: 5,554 Bytes
a16e0e5 851495c a16e0e5 61524a8 a16e0e5 61524a8 a16e0e5 f5226c0 a16e0e5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
import time
from dataclasses import dataclass, field
from typing import Dict, List, Callable, Any, Optional

from voice_dialogue.utils.logger import logger
@dataclass
class ServiceDefinition:
    """Declarative description of one service, used to configure service startup."""

    # Service name; used as the registry key and in log/error messages.
    name: str
    # Zero-argument callable that builds the service instance.
    factory: Callable[[], Any]
    # Names of services that must already be started and ready.
    # A fresh list is created per instance, so instances never share one.
    dependencies: List[str] = field(default_factory=list)
    # Whether a startup failure of this service must be treated as fatal.
    required: bool = True
    # Startup timeout in seconds.
    startup_timeout: int = 60
    # Optional probe called with the service instance; a falsy result means unhealthy.
    health_check: Optional[Callable[[Any], bool]] = None

    def __post_init__(self):
        # An explicit ``dependencies=None`` is still accepted and normalized
        # to an empty list, so consumers can always iterate the attribute.
        if self.dependencies is None:
            self.dependencies = []
class ServiceManager:
    """Start, stop and inspect a set of named services.

    Service instances come from ``ServiceDefinition.factory``. This class
    sets ``daemon`` on them, calls ``start()``, ``exit()`` and ``is_alive()``,
    and reads their ``is_ready`` attribute. Successfully started services are
    stored in ``self.services`` in start order so they can be stopped in
    reverse order.
    """

    # Seconds to sleep between two polls while waiting on a service.
    _POLL_INTERVAL = 0.1
    # Maximum number of seconds to wait for a service to stop.
    _STOP_TIMEOUT = 5

    def __init__(self):
        self.services: Dict[str, Any] = {}
        self.startup_errors: Dict[str, str] = {}
        self.startup_times: Dict[str, float] = {}
        self._shutdown_hooks: List[Callable] = []

    def add_shutdown_hook(self, hook: Callable):
        """Register a zero-argument callable to run when all services are stopped."""
        self._shutdown_hooks.append(hook)

    def start_service(self, service_def: "ServiceDefinition") -> bool:
        """Start a single service and register it once it is ready and healthy.

        Args:
            service_def: Definition of the service to start.

        Returns:
            True when the service was started and registered; False when an
            optional (``required=False``) service failed to start.

        Raises:
            RuntimeError: When a required service fails to start. The original
                exception is attached as ``__cause__``.
        """
        # Monotonic clock: the measured duration is immune to wall-clock changes.
        started_at = time.monotonic()
        service = None
        started = False
        try:
            logger.info(f"正在启动服务: {service_def.name}")

            # Check dependency services.
            if not self._check_dependencies(service_def.dependencies):
                raise RuntimeError(f"服务 {service_def.name} 的依赖服务未就绪")

            service = service_def.factory()
            service.daemon = True
            service.start()
            started = True

            # Wait for the service to become ready.
            if not self._wait_for_service_ready(service, service_def):
                raise TimeoutError(f"服务 {service_def.name} 启动超时")

            # Health check.
            if service_def.health_check and not service_def.health_check(service):
                raise RuntimeError(f"服务 {service_def.name} 健康检查失败")

            self.services[service_def.name] = service
            self.startup_times[service_def.name] = time.monotonic() - started_at
            logger.info(f"服务 {service_def.name} 启动成功,耗时: {self.startup_times[service_def.name]:.2f}秒")
            return True
        except Exception as e:
            error_msg = f"服务 {service_def.name} 启动失败: {e}"
            logger.error(error_msg, exc_info=True)
            self.startup_errors[service_def.name] = error_msg

            # The service is already running but will never be registered, so
            # stop_all_services() would not reach it; ask it to exit now.
            if started:
                self._abort_started_service(service_def.name, service)

            if service_def.required:
                raise RuntimeError(error_msg) from e
            return False

    def _abort_started_service(self, service_name: str, service: Any) -> None:
        """Best-effort shutdown of a service that was started but failed startup checks."""
        try:
            service.exit()
        except Exception as e:
            # Cleanup must never mask the original startup failure.
            logger.error(f"清理启动失败的服务 {service_name} 时发生错误: {e}", exc_info=True)

    def _check_dependencies(self, dependencies: List[str]) -> bool:
        """Return True when every named dependency is registered and ready.

        ``None`` is treated the same as an empty dependency list.
        """
        for dep in dependencies or ():
            if dep not in self.services:
                logger.error(f"依赖服务 {dep} 未启动")
                return False
            if not self.services[dep].is_ready:
                logger.error(f"依赖服务 {dep} 未就绪")
                return False
        return True

    def _wait_for_service_ready(self, service: Any, service_def: "ServiceDefinition") -> bool:
        """Poll ``service.is_ready`` until it is truthy or the startup timeout elapses.

        Returns:
            The final value of ``service.is_ready``.
        """
        # Monotonic deadline: a wall-clock jump cannot shorten or extend the wait.
        deadline = time.monotonic() + service_def.startup_timeout
        while not service.is_ready and time.monotonic() < deadline:
            time.sleep(self._POLL_INTERVAL)
        return service.is_ready

    def stop_all_services(self):
        """Run the shutdown hooks, then stop every registered service.

        Hook and service errors are logged and do not interrupt the shutdown.
        The service registry is empty afterwards.
        """
        logger.info("正在停止所有服务...")

        # Run the shutdown hooks.
        for hook in self._shutdown_hooks:
            try:
                hook()
            except Exception as e:
                logger.error(f"执行关闭钩子时发生错误: {e}", exc_info=True)

        # Stop services in the reverse of their start order.
        for service_name in reversed(list(self.services)):
            self._stop_single_service(service_name)

        self.services.clear()

    def _stop_single_service(self, service_name: str):
        """Ask one registered service to exit and wait briefly for it to stop.

        Any exception (including an unknown ``service_name``) is logged rather
        than raised.
        """
        try:
            service = self.services[service_name]
            logger.info(f"正在停止服务: {service_name}")

            service.exit()

            # Wait for the service to stop (at most _STOP_TIMEOUT seconds).
            deadline = time.monotonic() + self._STOP_TIMEOUT
            while service.is_alive() and time.monotonic() < deadline:
                time.sleep(self._POLL_INTERVAL)

            if service.is_alive():
                logger.warning(f"服务 {service_name} 未能在超时时间内停止")
            else:
                logger.info(f"服务 {service_name} 已停止")

        except Exception as e:
            logger.error(f"停止服务 {service_name} 时发生错误: {e}", exc_info=True)

    def get_service_status(self) -> dict:
        """Return a snapshot of registry size, startup errors/times and per-service state."""
        return {
            "total_services": len(self.services),
            "startup_errors": len(self.startup_errors),
            "startup_times": self.startup_times.copy(),
            "errors": self.startup_errors.copy(),
            "services": {
                name: {
                    "running": service.is_alive(),
                    "ready": service.is_ready,
                }
                for name, service in self.services.items()
            },
        }

    def get_service(self, name: str) -> Optional[Any]:
        """Return the registered service instance for ``name``, or None."""
        return self.services.get(name)

    def is_service_running(self, name: str) -> bool:
        """Return True when ``name`` is registered and its ``is_alive()`` is truthy."""
        service = self.services.get(name)
        return service is not None and service.is_alive()
|