File size: 5,554 Bytes
a16e0e5
 
 
 
851495c
a16e0e5
 
 
 
 
 
 
 
 
61524a8
a16e0e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61524a8
a16e0e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f5226c0
a16e0e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import time
from dataclasses import dataclass
from typing import Dict, List, Callable, Any, Optional

from voice_dialogue.utils.logger import logger


@dataclass
class ServiceDefinition:
    """Declarative description of a single service, used to configure startup.

    Attributes:
        name: Identifier of the service.
        factory: Zero-argument callable that builds the service object.
        dependencies: Names of the services this one depends on. ``None``
            (the default) becomes an empty list, a single string is treated
            as one dependency name, and any other iterable is copied into a
            new list so instances never share or alias a caller's list.
        required: Whether the service is mandatory.
        startup_timeout: Startup timeout in seconds.
        health_check: Optional callable that receives the service object and
            returns whether it is healthy.
    """
    name: str
    factory: Callable[[], Any]
    dependencies: Optional[List[str]] = None
    required: bool = True
    startup_timeout: int = 60  # startup timeout, in seconds
    health_check: Optional[Callable[[Any], bool]] = None

    def __post_init__(self):
        """Normalise ``dependencies`` to a list owned by this instance."""
        deps = self.dependencies
        if deps is None:
            self.dependencies = []
        elif isinstance(deps, str):
            # A bare string would otherwise be iterated character by character.
            self.dependencies = [deps]
        else:
            self.dependencies = list(deps)


class ServiceManager:
    """Service manager responsible for starting, stopping and monitoring services.

    Service objects are used through ``daemon``, ``start()``, ``exit()``,
    ``is_alive()`` and ``is_ready``; their concrete type is not checked.
    """

    def __init__(self):
        """Create an empty manager with no services, errors or hooks."""
        # Successfully started services, keyed by name (insertion = start order).
        self.services: Dict[str, Any] = {}
        # Message of the most recent failed start, keyed by service name.
        self.startup_errors: Dict[str, str] = {}
        # Startup duration in seconds, keyed by service name.
        self.startup_times: Dict[str, float] = {}
        self._shutdown_hooks: List[Callable] = []

    def add_shutdown_hook(self, hook: Callable):
        """Register a zero-argument callable to run when all services are stopped."""
        self._shutdown_hooks.append(hook)

    def start_service(self, service_def: "ServiceDefinition") -> bool:
        """Start a single service and register it once it is ready and healthy.

        Args:
            service_def: Definition providing the name, factory, dependencies,
                startup timeout and optional health check.

        Returns:
            True when the service started; False when an optional
            (``required=False``) service failed to start.

        Raises:
            RuntimeError: When a required service fails to start. The
                underlying failure is attached as ``__cause__``.
        """
        name = service_def.name
        # Monotonic clock: the measured duration is immune to wall-clock jumps.
        started_at = time.monotonic()
        service = None
        started = False
        try:
            logger.info(f"正在启动服务: {name}")

            # Dependencies must already be registered and ready.
            if not self._check_dependencies(service_def.dependencies):
                raise RuntimeError(f"服务 {name} 的依赖服务未就绪")

            service = service_def.factory()
            service.daemon = True
            service.start()
            started = True

            # Wait for the service to report readiness.
            if not self._wait_for_service_ready(service, service_def):
                raise TimeoutError(f"服务 {name} 启动超时")

            # Optional health check.
            if service_def.health_check and not service_def.health_check(service):
                raise RuntimeError(f"服务 {name} 健康检查失败")

            self.services[name] = service
            self.startup_times[name] = time.monotonic() - started_at
            # A successful (re)start supersedes any error from an earlier attempt.
            self.startup_errors.pop(name, None)

            logger.info(f"服务 {name} 启动成功,耗时: {self.startup_times[name]:.2f}秒")
            return True

        except Exception as e:
            error_msg = f"服务 {name} 启动失败: {e}"
            logger.error(error_msg, exc_info=True)
            self.startup_errors[name] = error_msg

            if started:
                # The service is running but will never be registered, so
                # stop_all_services() could not reach it; ask it to exit now.
                self._abort_failed_service(name, service)

            if service_def.required:
                raise RuntimeError(error_msg) from e
            return False

    def _abort_failed_service(self, service_name: str, service: Any) -> None:
        """Best-effort request for a started-but-failed service to exit."""
        try:
            service.exit()
        except Exception as e:
            logger.error(f"清理启动失败的服务 {service_name} 时发生错误: {e}", exc_info=True)

    def _check_dependencies(self, dependencies: List[str]) -> bool:
        """Return True when every named dependency is registered and ready."""
        for dep in dependencies or ():
            if dep not in self.services:
                logger.error(f"依赖服务 {dep} 未启动")
                return False
            if not self.services[dep].is_ready:
                logger.error(f"依赖服务 {dep} 未就绪")
                return False
        return True

    def _wait_for_service_ready(self, service: Any, service_def: "ServiceDefinition") -> bool:
        """Poll ``service.is_ready`` until it is truthy or the timeout elapses.

        Returns the final value of ``service.is_ready``.
        """
        deadline = time.monotonic() + service_def.startup_timeout

        while not service.is_ready and time.monotonic() < deadline:
            time.sleep(0.1)

        return service.is_ready

    def stop_all_services(self):
        """Run the shutdown hooks, then stop every registered service."""
        logger.info("正在停止所有服务...")

        # Run shutdown hooks; a failing hook is logged and does not block the rest.
        for hook in self._shutdown_hooks:
            try:
                hook()
            except Exception as e:
                logger.error(f"执行关闭钩子时发生错误: {e}", exc_info=True)

        # Stop services in the reverse of their start order.
        for service_name in reversed(list(self.services)):
            self._stop_single_service(service_name)

        self.services.clear()

    def _stop_single_service(self, service_name: str):
        """Ask one service to exit and wait up to 5 seconds for it to stop.

        Errors are logged rather than raised so the remaining services can
        still be stopped.
        """
        try:
            service = self.services[service_name]
            logger.info(f"正在停止服务: {service_name}")

            service.exit()

            # Wait for the service to stop (at most 5 seconds).
            deadline = time.monotonic() + 5
            while service.is_alive() and time.monotonic() < deadline:
                time.sleep(0.1)

            if service.is_alive():
                logger.warning(f"服务 {service_name} 未能在超时时间内停止")
            else:
                logger.info(f"服务 {service_name} 已停止")

        except Exception as e:
            logger.error(f"停止服务 {service_name} 时发生错误: {e}", exc_info=True)

    def get_service_status(self) -> Dict[str, Any]:
        """Return a snapshot of counts, startup times, errors and per-service state."""
        return {
            "total_services": len(self.services),
            "startup_errors": len(self.startup_errors),
            "startup_times": self.startup_times.copy(),
            "errors": self.startup_errors.copy(),
            "services": {
                name: {
                    "running": service.is_alive(),
                    "ready": service.is_ready
                }
                for name, service in self.services.items()
            }
        }

    def get_service(self, name: str) -> Optional[Any]:
        """Return the registered service with this name, or None if absent."""
        return self.services.get(name)

    def is_service_running(self, name: str) -> bool:
        """Return True when the named service is registered and alive."""
        service = self.services.get(name)
        return service is not None and service.is_alive()