#include "model_manager.h"

// NOTE(review): the original #include list was garbled — the header names
// inside angle brackets were stripped (the file read `#include #include`).
// The headers below are reconstructed from the symbols this file actually
// uses; confirm the exact list against the original source.
#include <csignal>    // kill, SIGTERM, SIGKILL
#include <sys/wait.h> // waitpid, WNOHANG
#include <unistd.h>   // fork, setsid, execvp, _exit

#include "http_helpers.h"

#include <chrono>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <boost/asio.hpp>
#include <boost/beast.hpp>

namespace asio = boost::asio;
namespace beast = boost::beast;
namespace http = beast::http;

/// True when `pid` names a live process we are permitted to signal.
/// kill(pid, 0) delivers no signal — it only probes existence/permission.
bool is_alive(pid_t pid) {
  if (pid <= 0) return false;
  return kill(pid, 0) == 0;
}

/// Gracefully stop a worker process: send SIGTERM, poll for up to
/// `wait_seconds` for the child to exit (reaping it with WNOHANG), then
/// escalate to SIGKILL and reap synchronously so no zombie is left behind.
/// Call sites in this file pass a single argument, so the declaration in
/// model_manager.h presumably supplies a default for `wait_seconds` — verify.
void shutdown_worker(pid_t pid, int wait_seconds) {
  if (pid <= 0) return;
  kill(pid, SIGTERM);
  const auto deadline =
      std::chrono::steady_clock::now() + std::chrono::seconds(wait_seconds);
  while (std::chrono::steady_clock::now() < deadline) {
    int status = 0;
    pid_t r = waitpid(pid, &status, WNOHANG);
    if (r == pid) return; // child exited and has been reaped
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
  }
  // Grace period expired: force-kill and block until the child is reaped.
  kill(pid, SIGKILL);
  int status = 0;
  waitpid(pid, &status, 0);
}

ModelManager::ModelManager(const ManagerConfig &config)
    : default_model_(config.worker.default_model),
      llama_server_bin_(config.worker.llama_server_bin),
      worker_host_(config.worker.host),
      worker_bind_host_(config.worker.bind_host),
      base_port_(config.worker.base_port),
      switch_timeout_sec_(config.worker.switch_timeout_sec),
      n_ctx_(config.llama.n_ctx),
      n_threads_(config.llama.threads),
      n_gpu_layers_(config.llama.ngl),
      n_batch_(config.llama.batch),
      n_ubatch_(config.llama.ubatch),
      next_port_(base_port_) {}

/// Load the configured default model on startup.
/// On failure, `error` describes why and false is returned.
bool ModelManager::initialize_default(std::string &error) {
  return switch_model(default_model_, error);
}

/// Switch the active worker to `model`: spawn a fresh worker on a new port,
/// wait until it answers HTTP, publish it as active, and only then shut
/// down the previous worker (so requests can be served throughout).
/// At most one switch may be in flight at a time; a concurrent call fails
/// fast with an error instead of blocking.
bool ModelManager::switch_model(const std::string &model, std::string &error) {
  {
    std::lock_guard lock(mu_);
    if (switch_in_progress_) {
      error = "Switch already in progress";
      return false;
    }
    // Already serving this model with a live process: nothing to do.
    if (active_ && active_->model == model && is_alive(active_->pid)) {
      return true;
    }
    switch_in_progress_ = true;
  }

  // Snapshot the current worker so it can be torn down after the swap.
  std::optional<WorkerInfo> old_worker;
  {
    std::lock_guard lock(mu_);
    if (active_) old_worker = active_;
  }

  int port = allocate_port();
  pid_t pid = spawn_worker(model, port);
  if (pid <= 0) {
    finish_switch(false);
    error = "Failed to start worker process";
    return false;
  }
  if (!wait_until_ready(pid, port, switch_timeout_sec_)) {
    shutdown_worker(pid);
    finish_switch(false);
    error = "New model did not become ready in time";
    return false;
  }

  WorkerInfo new_worker{model, port, pid, now_utc_iso()};
  {
    std::lock_guard lock(mu_);
    active_ = new_worker;
    switch_in_progress_ = false;
  }
  // The old worker is stopped only after the replacement is serving.
  if (old_worker && old_worker->pid != pid) {
    shutdown_worker(old_worker->pid);
  }
  return true;
}

/// Restart the currently active worker with the same model. Unlike
/// switch_model, this stops the old worker FIRST (useful when the worker is
/// wedged and holding resources such as VRAM), so there is a service gap.
/// On failure the active worker is cleared rather than left dangling.
bool ModelManager::restart_active(std::string &error) {
  std::optional<WorkerInfo> old_worker;
  std::string model;
  {
    std::lock_guard lock(mu_);
    if (switch_in_progress_) {
      error = "Switch already in progress";
      return false;
    }
    if (!active_ || !is_alive(active_->pid)) {
      error = "No active model";
      return false;
    }
    switch_in_progress_ = true;
    old_worker = active_;
    model = active_->model;
  }

  shutdown_worker(old_worker->pid);

  int port = allocate_port();
  pid_t pid = spawn_worker(model, port);
  if (pid <= 0) {
    std::lock_guard lock(mu_);
    active_ = std::nullopt;
    switch_in_progress_ = false;
    error = "Failed to start worker process";
    return false;
  }
  if (!wait_until_ready(pid, port, switch_timeout_sec_)) {
    shutdown_worker(pid);
    std::lock_guard lock(mu_);
    active_ = std::nullopt;
    switch_in_progress_ = false;
    error = "New model did not become ready in time";
    return false;
  }

  WorkerInfo new_worker{model, port, pid, now_utc_iso()};
  {
    std::lock_guard lock(mu_);
    active_ = new_worker;
    switch_in_progress_ = false;
  }
  return true;
}

/// Current worker, or nullopt when there is none or its process has died.
std::optional<WorkerInfo> ModelManager::active_worker() {
  std::lock_guard lock(mu_);
  if (active_ && is_alive(active_->pid)) return active_;
  return std::nullopt;
}

/// JSON snapshot of manager state for the /models-style status endpoint.
/// Fields are null when no live worker exists.
json ModelManager::models_view() {
  std::lock_guard lock(mu_);
  json out;
  out["status"] =
      (active_ && is_alive(active_->pid)) ? "ready" : "no_active_model";
  out["switch_in_progress"] = switch_in_progress_;
  if (active_ && is_alive(active_->pid)) {
    out["current_model"] = active_->model;
    out["last_loaded"] = active_->last_loaded;
    out["active_pid"] = active_->pid;
    out["active_port"] = active_->port;
  } else {
    out["current_model"] = nullptr;
    out["last_loaded"] = nullptr;
    out["active_pid"] = nullptr;
    out["active_port"] = nullptr;
  }
  return out;
}

/// Hand out a fresh port for each spawned worker (monotonically increasing
/// from base_port_; ports are never reused within a manager's lifetime).
int ModelManager::allocate_port() {
  std::lock_guard lock(mu_);
  return next_port_++;
}

/// Clear the switch flag on a failed switch. The success path clears the
/// flag inline while publishing the new worker, so only ok == false acts.
void ModelManager::finish_switch(bool ok) {
  std::lock_guard lock(mu_);
  if (!ok) switch_in_progress_ = false;
}

/// Fork and exec a llama-server child for `model` listening on `port`.
/// Returns the child's pid, or -1 if fork failed. The child calls setsid()
/// so it survives independently of the manager's controlling terminal.
pid_t ModelManager::spawn_worker(const std::string &model, int port) {
  pid_t pid = fork();
  if (pid < 0) return -1;
  if (pid == 0) {
    // Child process: detach into its own session, then exec the server.
    setsid();
    std::string port_s = std::to_string(port);
    std::string n_ctx_s = std::to_string(n_ctx_);
    std::string threads_s = std::to_string(n_threads_);
    std::string ngl_s = std::to_string(n_gpu_layers_);
    std::string batch_s = std::to_string(n_batch_);
    std::string ubatch_s = std::to_string(n_ubatch_);
    std::vector<std::string> args = {
        llama_server_bin_, "-hf",   model,    "--host",         worker_bind_host_,
        "--port",          port_s,  "-c",     n_ctx_s,          "-t",
        threads_s,         "-ngl",  ngl_s,    "--cont-batching", "-b",
        batch_s,           "--ubatch-size",   ubatch_s};
    // execvp wants a null-terminated array of mutable char*.
    std::vector<char *> argv;
    argv.reserve(args.size() + 1);
    for (auto &s : args) argv.push_back(const_cast<char *>(s.c_str()));
    argv.push_back(nullptr);
    execvp(argv[0], argv.data());
    _exit(127); // exec failed; _exit avoids flushing parent-inherited buffers
  }
  return pid;
}

/// Poll the new worker's HTTP root until it responds 200, the process dies,
/// or `timeout_sec` elapses. Connection errors during startup are expected
/// and swallowed; liveness is re-checked each iteration so a crashed child
/// fails fast instead of burning the whole timeout.
bool ModelManager::wait_until_ready(pid_t pid, int port, int timeout_sec) {
  const auto deadline =
      std::chrono::steady_clock::now() + std::chrono::seconds(timeout_sec);
  while (std::chrono::steady_clock::now() < deadline) {
    if (!is_alive(pid)) return false;
    try {
      auto [status, _] = http_get(port, "/");
      if (status == 200) return true;
    } catch (...) {
      // Worker not accepting connections yet — retry until the deadline.
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(800));
  }
  return false;
}

/// Blocking HTTP/1.1 GET against the worker at worker_host_:port.
/// Returns {status code, response body}; throws on connect/IO failure
/// (callers such as wait_until_ready catch and retry).
std::pair<int, std::string> ModelManager::http_get(int port,
                                                   const std::string &target) {
  asio::io_context ioc;
  asio::ip::tcp::resolver resolver(ioc);
  beast::tcp_stream stream(ioc);
  auto const results = resolver.resolve(worker_host_, std::to_string(port));
  stream.connect(results);
  http::request<http::string_body> req{http::verb::get, target, 11};
  req.set(http::field::host, worker_host_);
  req.set(http::field::user_agent, "llm-manager");
  http::write(stream, req);
  beast::flat_buffer buffer;
  http::response<http::string_body> res;
  http::read(stream, buffer, res);
  // Best-effort shutdown; the error code overload never throws.
  beast::error_code ec;
  stream.socket().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
  return {res.result_int(), res.body()};
}