// Spaces: Running Running
// File size: 4,527 Bytes — revisions 332826f 58d70b1 332826f
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include <boost/beast/http.hpp>

#include "llm_manager_types.h"
// Forward declaration is enough here: this header only ever names ModelManager
// through a reference (Scheduler's constructor/member and
// MetricsRegistry::render_prometheus), never by value.
class ModelManager;
// Shorthand for Boost.Beast's HTTP namespace, used by ApiKeyAuth's request types.
// NOTE(review): a namespace alias at global scope in a header is visible to every
// file that includes it; consider moving it into a project namespace.
namespace http = boost::beast::http;
/// Per-API-key rate limiter with two independent budgets: requests per minute
/// and estimated tokens per minute.
///
/// Each key owns a `Bucket` holding the remaining allowance for both budgets and
/// the last time each budget was refilled. Timestamps use
/// `std::chrono::steady_clock`, so wall-clock adjustments do not affect refills.
///
/// NOTE(review): `mu_` presumably serialises access to `buckets_`; confirm in
/// the implementation. No eviction of idle keys is visible here, so `buckets_`
/// grows with the number of distinct `api_key_id`s ever seen.
class RateLimiterStore {
public:
    /// @param config per-minute limits to enforce (request and token budgets,
    ///               presumably copied into the `*_per_minute_` members).
    explicit RateLimiterStore(const RateLimitConfig &config);

    /// Evaluate one request from `api_key_id` that is expected to consume
    /// `estimated_tokens` tokens.
    /// @return the admission decision. It is [[nodiscard]] because ignoring the
    ///         result silently bypasses the limit.
    [[nodiscard]] RateLimitDecision allow(const std::string &api_key_id, int estimated_tokens);

private:
    /// Remaining allowance and last refill instant for one API key.
    struct Bucket {
        double request_tokens = 0.0;
        double estimated_tokens = 0.0;
        std::chrono::steady_clock::time_point last_request_refill{};
        std::chrono::steady_clock::time_point last_estimated_refill{};
    };

    std::mutex mu_;
    std::unordered_map<std::string, Bucket> buckets_;
    // Default-initialised so the object never holds indeterminate values, even
    // if a constructor path forgets to set them.
    int requests_per_minute_ = 0;
    int estimated_tokens_per_minute_ = 0;

    /// Refill helper for a single budget.
    /// `tokens` and `last_refill` are taken by reference so they can be updated
    /// in place, based on the time elapsed up to `now` and on `limit_per_minute`.
    /// Capping behaviour lives in the implementation; confirm there.
    static void refill(
        double &tokens,
        std::chrono::steady_clock::time_point &last_refill,
        int limit_per_minute,
        std::chrono::steady_clock::time_point now);
};
/// Registry of request contexts keyed by request id.
///
/// Contexts are handed out as `std::shared_ptr<RequestContext>`, so callers
/// share ownership with the registry. `mu_` is `mutable` so that the `const`
/// lookup (`find`) can lock it. Presumably it guards `requests_`; confirm in
/// the implementation.
class RequestRegistry {
public:
    // ---- lifecycle -------------------------------------------------------

    /// Register a new request and return its context.
    std::shared_ptr<RequestContext> create(
        const std::string &request_id,
        const ApiKeyRecord &principal,
        const TokenEstimate &estimate,
        const std::string &request_body);

    /// Record the final `state` and `result` for `ctx`.
    void complete(
        const std::shared_ptr<RequestContext> &ctx,
        RequestState state,
        RequestResult result);

    // ---- lookup ----------------------------------------------------------

    /// Look up a registered request by id.
    std::shared_ptr<RequestContext> find(const std::string &request_id) const;

    // ---- cancellation ----------------------------------------------------

    /// Cancel a single request by id and return its context.
    std::shared_ptr<RequestContext> cancel_request(const std::string &request_id);

    /// Cancel every tracked request and return the affected contexts.
    std::vector<std::shared_ptr<RequestContext>> cancel_all();

private:
    mutable std::mutex mu_;
    std::unordered_map<std::string, std::shared_ptr<RequestContext>> requests_;
};
/// Service metrics held in lock-free atomics and rendered as Prometheus text.
///
/// Every stored value is a `std::atomic`, so individual updates are race-free.
/// Whether each method is a single atomic operation is up to the
/// implementation. Latency and queue wait are stored as a running total plus a
/// sample count (presumably so the renderer can derive an average).
class MetricsRegistry {
    using Counter = std::atomic<uint64_t>;  // monotonically increasing totals
    using Gauge = std::atomic<int64_t>;     // signed, so it can go up and down

public:
    // ---- counters ----------------------------------------------------------
    void inc_requests_total();
    void inc_queue_rejected_total();
    void inc_rate_limited_total();
    void add_cancellations_total(uint64_t delta = 1);
    void inc_switch_total();
    void inc_worker_restarts_total();

    // ---- in-flight gauge ---------------------------------------------------
    void inc_requests_inflight();
    void dec_requests_inflight();

    // ---- observations (milliseconds) ---------------------------------------
    void observe_request_latency_ms(int64_t value);
    void observe_queue_wait_ms(int64_t value);

    /// Produce the Prometheus exposition text, combining these metrics with
    /// the supplied queue snapshot and model-manager state.
    /// NOTE(review): `manager` is taken by non-const reference; confirm whether
    /// rendering really needs to mutate it.
    std::string render_prometheus(const QueueSnapshot &queue, ModelManager &manager) const;

private:
    Counter requests_total_{0};
    Gauge requests_inflight_{0};
    Counter request_latency_ms_total_{0};
    Counter request_latency_samples_{0};
    Counter queue_rejected_total_{0};
    Counter rate_limited_total_{0};
    Counter queue_wait_ms_total_{0};
    Counter queue_wait_samples_{0};
    Counter cancellations_total_{0};
    Counter switch_total_{0};
    Counter worker_restarts_total_{0};
};
/// Two-lane queue of request contexts: one lane for admin requests and one
/// for everyone else.
///
/// Admission presumably checks both an entry count (`current_size_` vs
/// `max_size_`) and a token total (`current_tokens_` vs `max_tokens_`).
/// `admin_quota_` / `admin_streak_` presumably bound how many admin entries
/// are served back-to-back so the user lane is not starved. Confirm both in
/// the implementation. `mu_` is `mutable` so the `const` accessors can lock
/// it, and `cv_` is what a blocking `pop_next()` would wait on.
class PrioritySchedulerQueue {
public:
    /// @param config capacity, quota and retry-hint settings for the queue.
    explicit PrioritySchedulerQueue(const QueueConfig &config);

    /// Try to enqueue `ctx`.
    /// @return false if the request was not accepted. [[nodiscard]] because
    ///         ignoring a rejection silently drops the request.
    [[nodiscard]] bool try_push(const std::shared_ptr<RequestContext> &ctx);

    /// Take the next context to run. Presumably blocks on `cv_` until work
    /// arrives or `stop()` is called; confirm what is returned after stop
    /// (likely nullptr). [[nodiscard]] because discarding the result loses a
    /// dequeued request.
    [[nodiscard]] std::shared_ptr<RequestContext> pop_next();

    /// Mark the queue as stopped (sets `stopped_`; presumably wakes waiters).
    void stop();

    /// Retry-After hint, in seconds, for callers whose push was rejected.
    int retry_after_sec() const;

    /// Point-in-time view of the queue state.
    QueueSnapshot snapshot() const;

private:
    mutable std::mutex mu_;
    std::condition_variable cv_;
    std::deque<std::shared_ptr<RequestContext>> admin_queue_;
    std::deque<std::shared_ptr<RequestContext>> user_queue_;
    // Limits below are presumably set from QueueConfig in the constructor.
    // The defaults only guarantee they are never indeterminate.
    size_t max_size_ = 0;
    size_t current_size_ = 0;
    int max_tokens_ = 0;
    int current_tokens_ = 0;
    int admin_quota_ = 0;
    int admin_streak_ = 0;
    int retry_after_sec_ = 0;
    bool stopped_ = false;
};
/// Owns the priority queue and one background `std::thread` that runs
/// `worker_loop()` (presumably popping from `queue_` and executing requests
/// through `manager_`; confirm in the implementation).
///
/// The scheduler holds references to `manager`, `registry` and `metrics`; the
/// caller must keep those objects alive for the scheduler's whole lifetime.
class Scheduler {
public:
    Scheduler(
        ModelManager &manager,
        RequestRegistry &registry,
        MetricsRegistry &metrics,
        const QueueConfig &queue_config);

    /// NOTE(review): must stop `queue_` and join `worker_`. Destroying a
    /// joinable std::thread calls std::terminate.
    ~Scheduler();

    // The class owns a running thread and a mutex-holding queue, so it is
    // neither copyable nor movable. These members state that explicitly
    // (rule of five) instead of relying on implicit deletion.
    Scheduler(const Scheduler &) = delete;
    Scheduler &operator=(const Scheduler &) = delete;
    Scheduler(Scheduler &&) = delete;
    Scheduler &operator=(Scheduler &&) = delete;

    /// Forward `ctx` to the queue.
    /// @return false when the queue rejected it. [[nodiscard]] so a rejection
    ///         is never silently ignored (the caller should use
    ///         `retry_after_sec()`).
    [[nodiscard]] bool try_enqueue(const std::shared_ptr<RequestContext> &ctx);

    /// Retry-After hint, in seconds, for rejected requests.
    int retry_after_sec() const;

    /// Point-in-time view of the queue state.
    QueueSnapshot snapshot() const;

private:
    ModelManager &manager_;
    RequestRegistry &registry_;
    MetricsRegistry &metrics_;
    PrioritySchedulerQueue queue_;
    // Declared last on purpose: members are constructed in declaration order,
    // so the worker thread can only start once every member above exists.
    std::thread worker_;

    void worker_loop();
};
/// Authenticates incoming Beast HTTP requests against a table of configured
/// API keys.
///
/// The credential is read from a configurable header (`header_name_`) using
/// a configurable scheme (`scheme_`). It is then looked up in
/// `records_by_secret_`, which maps the raw secret to its key record.
///
/// NOTE(review): hash-map lookup on the raw secret uses ordinary string
/// equality, which is not constant-time. Consider keying by a hash of the
/// secret or using a constant-time comparison.
class ApiKeyAuth {
public:
    /// Build the key table and header settings from `config`.
    explicit ApiKeyAuth(const ManagerConfig &config);

    /// Whether authentication is switched on.
    bool enabled() const;

    /// Resolve the caller of `req` to an API key record.
    /// @param error receives a human-readable reason when authentication fails.
    /// @return the matching record, or std::nullopt on failure.
    std::optional<ApiKeyRecord> authenticate(
        const http::request<http::string_body> &req,
        std::string &error) const;

private:
    /// Pull the bearer token out of `req`.
    /// @param error receives the reason when no usable token is present.
    std::optional<std::string> extract_bearer_token(
        const http::request<http::string_body> &req,
        std::string &error) const;

    std::string header_name_;
    std::string scheme_;
    std::unordered_map<std::string, ApiKeyRecord> records_by_secret_;
};