#pragma once #include #include #include #include #include #include #include "llm_manager_types.h" class ModelManager; namespace http = boost::beast::http; class RateLimiterStore { public: explicit RateLimiterStore(const RateLimitConfig &config); RateLimitDecision allow(const std::string &api_key_id, int estimated_tokens); private: struct Bucket { double request_tokens = 0.0; double estimated_tokens = 0.0; std::chrono::steady_clock::time_point last_request_refill{}; std::chrono::steady_clock::time_point last_estimated_refill{}; }; std::mutex mu_; std::unordered_map buckets_; int requests_per_minute_; int estimated_tokens_per_minute_; static void refill( double &tokens, std::chrono::steady_clock::time_point &last_refill, int limit_per_minute, std::chrono::steady_clock::time_point now); }; class RequestRegistry { public: std::shared_ptr create( const std::string &request_id, const ApiKeyRecord &principal, const TokenEstimate &estimate, const std::string &request_body); std::shared_ptr find(const std::string &request_id) const; void complete(const std::shared_ptr &ctx, RequestState state, RequestResult result); std::shared_ptr cancel_request(const std::string &request_id); std::vector> cancel_all(); private: mutable std::mutex mu_; std::unordered_map> requests_; }; class MetricsRegistry { public: void inc_requests_total(); void inc_requests_inflight(); void dec_requests_inflight(); void inc_queue_rejected_total(); void inc_rate_limited_total(); void add_cancellations_total(uint64_t delta = 1); void inc_switch_total(); void inc_worker_restarts_total(); void observe_request_latency_ms(int64_t value); void observe_queue_wait_ms(int64_t value); std::string render_prometheus(const QueueSnapshot &queue, ModelManager &manager) const; private: std::atomic requests_total_{0}; std::atomic requests_inflight_{0}; std::atomic request_latency_ms_total_{0}; std::atomic request_latency_samples_{0}; std::atomic queue_rejected_total_{0}; std::atomic rate_limited_total_{0}; std::atomic queue_wait_ms_total_{0}; std::atomic queue_wait_samples_{0}; std::atomic cancellations_total_{0}; std::atomic switch_total_{0}; std::atomic worker_restarts_total_{0}; }; class PrioritySchedulerQueue { public: explicit PrioritySchedulerQueue(const QueueConfig &config); bool try_push(const std::shared_ptr &ctx); std::shared_ptr pop_next(); void stop(); int retry_after_sec() const; QueueSnapshot snapshot() const; private: mutable std::mutex mu_; std::condition_variable cv_; std::deque> admin_queue_; std::deque> user_queue_; size_t max_size_; size_t current_size_ = 0; int max_tokens_; int current_tokens_ = 0; int admin_quota_; int admin_streak_ = 0; int retry_after_sec_; bool stopped_ = false; }; class Scheduler { public: Scheduler( ModelManager &manager, RequestRegistry ®istry, MetricsRegistry &metrics, const QueueConfig &queue_config); ~Scheduler(); bool try_enqueue(const std::shared_ptr &ctx); int retry_after_sec() const; QueueSnapshot snapshot() const; private: ModelManager &manager_; RequestRegistry ®istry_; MetricsRegistry &metrics_; PrioritySchedulerQueue queue_; std::thread worker_; void worker_loop(); }; class ApiKeyAuth { public: explicit ApiKeyAuth(const ManagerConfig &config); bool enabled() const; std::optional authenticate( const http::request &req, std::string &error) const; private: std::string header_name_; std::string scheme_; std::unordered_map records_by_secret_; std::optional extract_bearer_token( const http::request &req, std::string &error) const; };