import os
import signal
import subprocess
import time
from pathlib import Path
from typing import TextIO
import httpx
class ProcessManager:
def __init__(
self, project_root: Path | None = None, mesh_root: Path | None = None
) -> None:
self.project_root = (
project_root or Path(__file__).resolve().parents[3]
).resolve()
self.mesh_root = (
mesh_root or Path(os.getenv("MESH_ROOT", self.project_root / "mesh"))
).resolve()
self._service_scripts = {
"gateway": self.project_root / "mesh" / "gateway" / "index.ts",
"auth": self.project_root / "mesh" / "auth" / "index.ts",
"worker": self.project_root / "mesh" / "worker" / "index.ts",
}
self._job_generator_script = (
self.project_root / "mesh" / "worker" / "job_generator.ts"
)
self._health_urls = {
"gateway": "http://localhost:3000/health",
"auth": "http://localhost:3001/health",
}
self._processes: dict[str, subprocess.Popen[str]] = {}
self._log_handles: dict[str, TextIO] = {}
@staticmethod
def _pid_path(service: str) -> Path:
return Path(f"/tmp/{service}.pid")
@staticmethod
def _is_pid_alive(pid: int) -> bool:
try:
os.kill(pid, 0)
except OSError:
return False
status = subprocess.run(
["ps", "-o", "stat=", "-p", str(pid)],
capture_output=True,
text=True,
timeout=1,
check=False,
)
if status.returncode != 0:
return False
return "Z" not in status.stdout.strip()
def _read_pid(self, service: str) -> int | None:
path = self._pid_path(service)
if not path.exists():
return None
try:
pid = int(path.read_text().strip())
except (TypeError, ValueError):
return None
return pid if self._is_pid_alive(pid) else None
def _write_pid(self, service: str, pid: int) -> None:
self._pid_path(service).write_text(str(pid))
def _spawn_service(self, service: str, script: Path, log_path: Path) -> None:
log_handle = open(log_path, "a", encoding="utf-8")
env = {
**os.environ,
"MESH_ROOT": str(self.mesh_root),
}
process = subprocess.Popen(
["bun", "run", str(script)],
cwd=str(self.project_root),
stdout=log_handle,
stderr=subprocess.STDOUT,
text=True,
env=env,
)
self._processes[service] = process
self._log_handles[service] = log_handle
self._write_pid(service, process.pid)
def start_all(self) -> None:
for service, script in self._service_scripts.items():
existing_pid = self._read_pid(service)
if existing_pid:
continue
self._spawn_service(service, script, Path(f"/tmp/{service}.log"))
if not self._read_pid("job_generator"):
self._spawn_service(
"job_generator", self._job_generator_script, Path("/tmp/job_gen.log")
)
def _terminate_pid(self, pid: int, timeout_s: float = 0.5) -> None:
try:
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
return
deadline = time.time() + timeout_s
while time.time() < deadline:
if not self._is_pid_alive(pid):
return
time.sleep(0.05)
try:
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
return
def stop_all(self) -> None:
for service in ["gateway", "auth", "worker", "job_generator"]:
pid = self._read_pid(service)
if pid:
self._terminate_pid(pid)
pid_path = self._pid_path(service)
if pid_path.exists():
pid_path.unlink(missing_ok=True)
for handle in self._log_handles.values():
try:
handle.close()
except Exception:
pass
self._processes.clear()
self._log_handles.clear()
def restart_all(self) -> None:
self.stop_all()
self.start_all()
def sighup(self, service: str) -> None:
pid = self._read_pid(service)
if not pid:
raise RuntimeError(f"Service not running: {service}")
os.kill(pid, signal.SIGHUP)
def wait_healthy(self, timeout_s: int = 30) -> bool:
deadline = time.time() + timeout_s
with httpx.Client(timeout=1.0) as client:
while time.time() < deadline:
try:
gateway_ok = (
client.get(self._health_urls["gateway"]).status_code == 200
)
auth_ok = client.get(self._health_urls["auth"]).status_code == 200
if gateway_ok and auth_ok:
return True
except Exception:
pass
time.sleep(1)
return False
def get_status(self) -> dict[str, str]:
status: dict[str, str] = {}
for service in ["gateway", "auth", "worker", "job_generator"]:
pid = self._read_pid(service)
status[service] = f"running pid={pid}" if pid else "stopped"
return status
def get_pid(self, service: str) -> int | None:
return self._read_pid(service)
def close(self) -> None:
self.stop_all()
|