AGI / cpp /model_manager.cpp
Dmitry Beresnev
Refactors the C++ LLM manager into modular components, moves the Python modules under python/, and keeps the current control-plane behavior intact. The C++ server now has clearer separation for config, model lifecycle, runtime services, request parsing, HTTP helpers, and server routing, while the Docker build/runtime paths were updated to compile multiple C++ files and load Python code from the new package folder.
332826f
#include "model_manager.h"

#include <sys/wait.h>
#include <unistd.h>

#include <chrono>
#include <csignal>
#include <thread>

#include <boost/asio.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>

#include "http_helpers.h"
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace http = beast::http;
// Report whether a process with the given pid currently exists.
// kill() with signal 0 performs the existence/permission check without
// delivering any signal.  Non-positive pids are rejected up front:
// kill(0, ...) / kill(-n, ...) would address process groups, never a
// single worker.
bool is_alive(pid_t pid) {
    if (pid <= 0) {
        return false;
    }
    const int probe = kill(pid, 0);
    return probe == 0;
}
// Stop a worker process gracefully: request termination with SIGTERM,
// poll for exit for up to `wait_seconds`, then escalate to SIGKILL and
// reap the child so no zombie is left behind.  No-op for non-positive
// pids.  (The default for wait_seconds lives in the header declaration.)
void shutdown_worker(pid_t pid, int wait_seconds) {
    if (pid <= 0) return;
    kill(pid, SIGTERM);
    const auto give_up_at =
        std::chrono::steady_clock::now() + std::chrono::seconds(wait_seconds);
    for (;;) {
        if (std::chrono::steady_clock::now() >= give_up_at) break;
        int wstatus = 0;
        // WNOHANG: poll without blocking so the deadline is honored.
        if (waitpid(pid, &wstatus, WNOHANG) == pid) return;  // reaped cleanly
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
    // Deadline elapsed: force-kill, then block to reap the corpse.
    kill(pid, SIGKILL);
    int wstatus = 0;
    waitpid(pid, &wstatus, 0);
}
// Construct the manager from static configuration, copying the worker
// and llama settings into members so the manager owns its configuration.
// NOTE(review): initializers should follow the member declaration order
// in model_manager.h — assumed to match this list; verify when editing.
ModelManager::ModelManager(const ManagerConfig &config)
    : default_model_(config.worker.default_model),        // model started by initialize_default()
      llama_server_bin_(config.worker.llama_server_bin),  // executable exec'd by spawn_worker()
      worker_host_(config.worker.host),                   // host used for health-check HTTP GETs
      worker_bind_host_(config.worker.bind_host),         // address passed to the worker via --host
      base_port_(config.worker.base_port),
      switch_timeout_sec_(config.worker.switch_timeout_sec),
      n_ctx_(config.llama.n_ctx),
      n_threads_(config.llama.threads),
      n_gpu_layers_(config.llama.ngl),
      n_batch_(config.llama.batch),
      n_ubatch_(config.llama.ubatch),
      next_port_(base_port_) {}  // port allocator seeded at the configured base
bool ModelManager::initialize_default(std::string &error) {
return switch_model(default_model_, error);
}
// Hot-swap the active worker to `model` (make-before-break).
//
// Exactly one switch may be in flight at a time, guarded by
// switch_in_progress_.  The replacement worker is spawned and
// health-checked *before* the old one is torn down, so the old model
// keeps serving until the new one is confirmed ready.
//
// Returns true on success (including the no-op case where `model` is
// already active and alive).  On failure `error` is populated and the
// previously active worker, if any, is left running.
//
// Fix: the original took and released mu_ twice back-to-back — once to
// set switch_in_progress_ and once to snapshot the old worker.  One
// critical section suffices and captures both atomically.
bool ModelManager::switch_model(const std::string &model, std::string &error) {
    std::optional<WorkerInfo> old_worker;
    {
        std::lock_guard<std::mutex> lock(mu_);
        if (switch_in_progress_) {
            error = "Switch already in progress";
            return false;
        }
        // Already serving this model with a live process: nothing to do.
        if (active_ && active_->model == model && is_alive(active_->pid)) {
            return true;
        }
        switch_in_progress_ = true;
        // Snapshot the current worker in the same critical section; it
        // is torn down only after the replacement passes health checks.
        if (active_) old_worker = active_;
    }
    int port = allocate_port();
    pid_t pid = spawn_worker(model, port);
    if (pid <= 0) {
        finish_switch(false);
        error = "Failed to start worker process";
        return false;
    }
    if (!wait_until_ready(pid, port, switch_timeout_sec_)) {
        shutdown_worker(pid);
        finish_switch(false);
        error = "New model did not become ready in time";
        return false;
    }
    WorkerInfo new_worker{model, port, pid, now_utc_iso()};
    {
        std::lock_guard<std::mutex> lock(mu_);
        active_ = new_worker;
        switch_in_progress_ = false;
    }
    // Old worker is stopped only after the successful cutover.
    if (old_worker && old_worker->pid != pid) {
        shutdown_worker(old_worker->pid);
    }
    return true;
}
bool ModelManager::restart_active(std::string &error) {
std::optional<WorkerInfo> old_worker;
std::string model;
{
std::lock_guard<std::mutex> lock(mu_);
if (switch_in_progress_) {
error = "Switch already in progress";
return false;
}
if (!active_ || !is_alive(active_->pid)) {
error = "No active model";
return false;
}
switch_in_progress_ = true;
old_worker = active_;
model = active_->model;
}
shutdown_worker(old_worker->pid);
int port = allocate_port();
pid_t pid = spawn_worker(model, port);
if (pid <= 0) {
std::lock_guard<std::mutex> lock(mu_);
active_ = std::nullopt;
switch_in_progress_ = false;
error = "Failed to start worker process";
return false;
}
if (!wait_until_ready(pid, port, switch_timeout_sec_)) {
shutdown_worker(pid);
std::lock_guard<std::mutex> lock(mu_);
active_ = std::nullopt;
switch_in_progress_ = false;
error = "New model did not become ready in time";
return false;
}
WorkerInfo new_worker{model, port, pid, now_utc_iso()};
{
std::lock_guard<std::mutex> lock(mu_);
active_ = new_worker;
switch_in_progress_ = false;
}
return true;
}
// Return a snapshot of the active worker, or nullopt if there is none
// or its process has died.  Liveness is re-probed on every call.
std::optional<WorkerInfo> ModelManager::active_worker() {
    std::lock_guard<std::mutex> lock(mu_);
    if (!active_ || !is_alive(active_->pid)) {
        return std::nullopt;
    }
    return active_;
}
// Build the JSON status payload for the models endpoint: readiness,
// switch flag, and the active worker's details (or nulls when no live
// worker exists).
//
// Fix: the original evaluated `active_ && is_alive(active_->pid)` twice.
// Each is_alive() is a kill() syscall, and the worker could die between
// the two probes, producing a self-inconsistent view ("ready" status
// with null fields or vice versa).  Probe once and reuse the result.
json ModelManager::models_view() {
    std::lock_guard<std::mutex> lock(mu_);
    const bool ready = active_ && is_alive(active_->pid);
    json out;
    out["status"] = ready ? "ready" : "no_active_model";
    out["switch_in_progress"] = switch_in_progress_;
    if (ready) {
        out["current_model"] = active_->model;
        out["last_loaded"] = active_->last_loaded;
        out["active_pid"] = active_->pid;
        out["active_port"] = active_->port;
    } else {
        out["current_model"] = nullptr;
        out["last_loaded"] = nullptr;
        out["active_pid"] = nullptr;
        out["active_port"] = nullptr;
    }
    return out;
}
// Hand out the next worker port.  Monotonically increasing under mu_;
// ports are never reused within the process lifetime.
int ModelManager::allocate_port() {
    std::lock_guard<std::mutex> lock(mu_);
    const int port = next_port_;
    ++next_port_;
    return port;
}
// Clear the switch-in-progress flag after a FAILED switch attempt.
// Successful switches clear the flag themselves at cutover time, so
// ok == true is a no-op here.
void ModelManager::finish_switch(bool ok) {
    if (ok) return;
    std::lock_guard<std::mutex> lock(mu_);
    switch_in_progress_ = false;
}
// Fork/exec a llama-server worker for `model` listening on `port`.
// Returns the child's pid, or -1 if fork() failed.  The parent does not
// wait here — readiness is verified separately by wait_until_ready().
pid_t ModelManager::spawn_worker(const std::string &model, int port) {
    pid_t pid = fork();
    if (pid < 0) return -1;
    if (pid == 0) {
        // Child process: start a new session so signals delivered to the
        // manager's process group (e.g. terminal SIGINT) do not reach
        // the worker; shutdown_worker() targets it explicitly instead.
        setsid();
        // Numeric flags must be materialized as strings that stay alive
        // until execvp replaces the process image.
        std::string port_s = std::to_string(port);
        std::string n_ctx_s = std::to_string(n_ctx_);
        std::string threads_s = std::to_string(n_threads_);
        std::string ngl_s = std::to_string(n_gpu_layers_);
        std::string batch_s = std::to_string(n_batch_);
        std::string ubatch_s = std::to_string(n_ubatch_);
        // Command line for the worker.  NOTE(review): "-hf" presumably
        // tells llama-server to pull the model by Hugging Face repo id —
        // confirm against the llama-server CLI docs.
        std::vector<std::string> args = {
            llama_server_bin_,
            "-hf", model,
            "--host", worker_bind_host_,
            "--port", port_s,
            "-c", n_ctx_s,
            "-t", threads_s,
            "-ngl", ngl_s,
            "--cont-batching",
            "-b", batch_s,
            "--ubatch-size", ubatch_s
        };
        // execvp wants a NULL-terminated array of non-const char*.  The
        // const_cast is conventional and safe: exec does not modify the
        // argument strings, and on success the process image is replaced.
        std::vector<char *> argv;
        argv.reserve(args.size() + 1);
        for (auto &s : args) argv.push_back(const_cast<char *>(s.c_str()));
        argv.push_back(nullptr);
        execvp(argv[0], argv.data());
        // Only reached if exec failed.  _exit (not exit) skips the
        // parent's atexit handlers and avoids double-flushing duplicated
        // stdio buffers; 127 is the conventional "command not found" code.
        _exit(127);
    }
    return pid;
}
// Poll the freshly spawned worker until it answers HTTP 200 on "/",
// giving up after `timeout_sec`.  Returns false if the deadline passes
// or the worker process dies while loading.
bool ModelManager::wait_until_ready(pid_t pid, int port, int timeout_sec) {
    using clock = std::chrono::steady_clock;
    const auto give_up_at = clock::now() + std::chrono::seconds(timeout_sec);
    while (clock::now() < give_up_at) {
        if (!is_alive(pid)) return false;  // worker died during model load
        try {
            const auto probe = http_get(port, "/");
            if (probe.first == 200) return true;
        } catch (...) {
            // Connection refused / reset while the server is still
            // starting up — expected, just retry after a pause.
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(800));
    }
    return false;
}
// Synchronous HTTP GET against the local worker on `port`.
// Returns {status code, response body}; throws beast/system errors on
// network failure (callers such as wait_until_ready catch them).
//
// Fix: the original had no deadline on connect/write/read, so a worker
// that accepted the connection but never responded would block the
// caller — and the readiness poll loop — indefinitely.  tcp_stream's
// expires_after() bounds each subsequent operation.
std::pair<int, std::string> ModelManager::http_get(int port, const std::string &target) {
    asio::io_context ioc;
    asio::ip::tcp::resolver resolver(ioc);
    beast::tcp_stream stream(ioc);
    // Deadline covers connect, write, and read below; an expired timer
    // fails the pending operation with beast::error::timeout.
    stream.expires_after(std::chrono::seconds(5));
    auto const results = resolver.resolve(worker_host_, std::to_string(port));
    stream.connect(results);
    http::request<http::string_body> req{http::verb::get, target, 11};
    req.set(http::field::host, worker_host_);
    req.set(http::field::user_agent, "llm-manager");
    http::write(stream, req);
    beast::flat_buffer buffer;
    http::response<http::string_body> res;
    http::read(stream, buffer, res);
    // Best-effort shutdown: the error code overload is used because the
    // peer may already have closed, which is not a failure here.
    beast::error_code ec;
    stream.socket().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
    return {res.result_int(), res.body()};
}