AGI / cpp /runtime_components.h
Dmitry Beresnev
Fix Docker build for modular llm-manager
58d70b1
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include <boost/beast/http.hpp>

#include "llm_manager_types.h"
// Forward declaration: this header only refers to ModelManager by reference
// (Scheduler::manager_ and MetricsRegistry::render_prometheus), so the full
// definition is not required here.
class ModelManager;
// Short alias for Boost.Beast's HTTP namespace, used by ApiKeyAuth's
// request parameters below.
// NOTE(review): a namespace alias at header scope becomes visible in every
// translation unit that includes this header; consider scoping it if it ever
// collides with another `http` name.
namespace http = boost::beast::http;
/// Per-API-key rate limiter.
///
/// Tracks two independent allowances per key — one counted in requests and
/// one counted in estimated tokens — each with its own per-minute limit.
/// The shape (a double allowance plus a last-refill time point) looks like a
/// token bucket; the exact refill policy lives in the .cpp — TODO confirm.
class RateLimiterStore {
public:
    /// @param config  Source of the per-minute limits.
    explicit RateLimiterStore(const RateLimitConfig &config);

    /// Decides whether a request from `api_key_id` carrying
    /// `estimated_tokens` may proceed, returning the decision.
    RateLimitDecision allow(const std::string &api_key_id, int estimated_tokens);

private:
    /// State kept for a single API key.
    struct Bucket {
        // Allowance in requests, and when it was last refilled.
        double request_tokens{0.0};
        // Allowance in estimated tokens, and when it was last refilled.
        double estimated_tokens{0.0};
        std::chrono::steady_clock::time_point last_request_refill{};
        std::chrono::steady_clock::time_point last_estimated_refill{};
    };

    /// Updates `tokens` / `last_refill` for the time elapsed up to `now`,
    /// given a limit of `limit_per_minute` (presumably capped at that limit —
    /// confirm in the .cpp). Static: touches only its arguments.
    static void refill(
        double &tokens,
        std::chrono::steady_clock::time_point &last_refill,
        int limit_per_minute,
        std::chrono::steady_clock::time_point now);

    std::mutex mu_;  // presumably guards buckets_
    std::unordered_map<std::string, Bucket> buckets_;  // keyed by api_key_id
    int requests_per_minute_;
    int estimated_tokens_per_minute_;
};
/// Registry of in-flight requests, indexed by request id.
///
/// Contexts are handed out as shared_ptr so the registry and its callers can
/// both hold them. Access is presumably serialized through mu_ (see the .cpp).
class RequestRegistry {
public:
    // --- registration / lookup -------------------------------------------

    /// Builds a RequestContext for `request_id` from the authenticated
    /// principal, the token estimate and the raw body, and returns it.
    std::shared_ptr<RequestContext> create(
        const std::string &request_id,
        const ApiKeyRecord &principal,
        const TokenEstimate &estimate,
        const std::string &request_body);

    /// Looks up a context by id (presumably null when unknown — confirm).
    std::shared_ptr<RequestContext> find(const std::string &request_id) const;

    // --- lifecycle transitions -------------------------------------------

    /// Records the final `state` and `result` for `ctx`.
    void complete(const std::shared_ptr<RequestContext> &ctx, RequestState state, RequestResult result);

    /// Cancels one request by id and returns its context.
    std::shared_ptr<RequestContext> cancel_request(const std::string &request_id);

    /// Cancels every tracked request and returns the affected contexts.
    std::vector<std::shared_ptr<RequestContext>> cancel_all();

private:
    // mutable so the const find() can still lock it.
    mutable std::mutex mu_;
    std::unordered_map<std::string, std::shared_ptr<RequestContext>> requests_;
};
/// Service metrics held in std::atomic members and rendered in Prometheus
/// text format by render_prometheus().
class MetricsRegistry {
public:
    // --- request lifecycle ------------------------------------------------
    void inc_requests_total();
    void inc_requests_inflight();
    void dec_requests_inflight();

    // --- admission outcomes ------------------------------------------------
    void inc_queue_rejected_total();
    void inc_rate_limited_total();
    void add_cancellations_total(uint64_t delta = 1);

    // --- worker / model events ---------------------------------------------
    void inc_switch_total();
    void inc_worker_restarts_total();

    // --- timings -----------------------------------------------------------
    // These appear to feed a running total plus a sample count (see the
    // *_total_ / *_samples_ pairs below).
    // NOTE(review): values arrive as int64_t but the totals are uint64_t —
    // confirm the .cpp rejects or clamps negative inputs.
    void observe_request_latency_ms(int64_t value);
    void observe_queue_wait_ms(int64_t value);

    /// Produces the exposition text, combining the stored counters with the
    /// given queue snapshot and data pulled from `manager`.
    std::string render_prometheus(const QueueSnapshot &queue, ModelManager &manager) const;

private:
    std::atomic<uint64_t> requests_total_{0};
    std::atomic<int64_t> requests_inflight_{0};  // signed: gauge goes up and down
    std::atomic<uint64_t> request_latency_ms_total_{0};
    std::atomic<uint64_t> request_latency_samples_{0};
    std::atomic<uint64_t> queue_rejected_total_{0};
    std::atomic<uint64_t> rate_limited_total_{0};
    std::atomic<uint64_t> queue_wait_ms_total_{0};
    std::atomic<uint64_t> queue_wait_samples_{0};
    std::atomic<uint64_t> cancellations_total_{0};
    std::atomic<uint64_t> switch_total_{0};
    std::atomic<uint64_t> worker_restarts_total_{0};
};
/// Bounded queue of pending requests with separate admin and user lanes.
///
/// Admission is bounded by entry count (max_size_) and by a second int
/// budget (max_tokens_, presumably estimated tokens — TODO confirm).
/// admin_quota_ / admin_streak_ suggest admin requests are favoured for a
/// bounded streak so the user lane is not starved — confirm in the .cpp.
class PrioritySchedulerQueue {
public:
    explicit PrioritySchedulerQueue(const QueueConfig &config);

    /// Attempts to enqueue `ctx`; false means it was not accepted.
    bool try_push(const std::shared_ptr<RequestContext> &ctx);

    /// Hands out the next request to run (presumably waits on cv_ until one
    /// is available or stop() is called — confirm).
    std::shared_ptr<RequestContext> pop_next();

    /// Marks the queue as stopped.
    void stop();

    int retry_after_sec() const;
    QueueSnapshot snapshot() const;

private:
    // mutable so const members (e.g. snapshot()) can lock it.
    mutable std::mutex mu_;
    std::condition_variable cv_;

    std::deque<std::shared_ptr<RequestContext>> admin_queue_;
    std::deque<std::shared_ptr<RequestContext>> user_queue_;

    size_t max_size_;
    size_t current_size_{0};
    int max_tokens_;
    int current_tokens_{0};
    int admin_quota_;
    int admin_streak_{0};
    int retry_after_sec_;
    bool stopped_{false};
};
/// Owns the pending-request queue and a background worker thread.
///
/// Requests are submitted with try_enqueue(); the worker thread (worker_)
/// presumably runs worker_loop() to drain queue_. The ModelManager,
/// RequestRegistry and MetricsRegistry are held by reference: they are not
/// owned and must outlive the Scheduler.
class Scheduler {
public:
    /// @param manager       Model manager used by the worker (not owned).
    /// @param registry      Request registry (not owned).
    /// @param metrics       Metrics sink (not owned).
    /// @param queue_config  Presumably forwarded to the internal queue.
    Scheduler(
        ModelManager &manager,
        RequestRegistry &registry,
        MetricsRegistry &metrics,
        const QueueConfig &queue_config);
    ~Scheduler();

    // Rule of five: a destructor is declared, so state the copy semantics
    // explicitly. Copying was already impossible (std::thread member, a
    // mutex inside queue_, reference members); with no move operations
    // declared, moves are rejected as well. A running worker thread tied to
    // one object must never be duplicated or relocated.
    Scheduler(const Scheduler &) = delete;
    Scheduler &operator=(const Scheduler &) = delete;

    /// Attempts to queue `ctx`; returns false if it was not accepted.
    bool try_enqueue(const std::shared_ptr<RequestContext> &ctx);
    int retry_after_sec() const;
    QueueSnapshot snapshot() const;

private:
    void worker_loop();

    ModelManager &manager_;
    RequestRegistry &registry_;
    MetricsRegistry &metrics_;
    PrioritySchedulerQueue queue_;
    // Keep this member LAST. Members are constructed in declaration order and
    // destroyed in reverse, so the thread object is created after queue_ and
    // the other members exist, and is destroyed before them.
    std::thread worker_;
};
/// API-key authentication for incoming Beast HTTP requests.
///
/// Credentials are read from a configurable header (header_name_) with a
/// configurable scheme (scheme_) and matched against known keys.
class ApiKeyAuth {
public:
    explicit ApiKeyAuth(const ManagerConfig &config);

    /// True when authentication is active (presumably driven by config).
    bool enabled() const;

    /// Resolves `req` to a known key record; on failure returns std::nullopt
    /// and presumably fills `error` with a reason.
    std::optional<ApiKeyRecord> authenticate(
        const http::request<http::string_body> &req,
        std::string &error) const;

private:
    /// Extracts the raw credential from the request's auth header, or
    /// std::nullopt (with `error` presumably set) when it is missing or
    /// malformed.
    std::optional<std::string> extract_bearer_token(
        const http::request<http::string_body> &req,
        std::string &error) const;

    std::string header_name_;
    std::string scheme_;
    // NOTE(review): keyed by the plaintext secret; hash-map lookup is not
    // constant-time — consider hashing secrets before storage/lookup.
    std::unordered_map<std::string, ApiKeyRecord> records_by_secret_;
};