File size: 4,527 Bytes
332826f
 
 
 
 
 
58d70b1
332826f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#pragma once

#include <boost/beast/http.hpp>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "llm_manager_types.h"

class ModelManager;

namespace http = boost::beast::http;

/// Per-API-key rate limiter keeping a dual token bucket for each key:
/// one budget for request count and one for estimated token usage, each
/// with its own per-minute refill clock. Declarations only -- the actual
/// admit/deny logic lives in the .cpp.
class RateLimiterStore {
public:
    /// Seeds the per-minute limits from config.
    explicit RateLimiterStore(const RateLimitConfig &config);
    /// Decides whether a request from api_key_id, expected to consume
    /// estimated_tokens, may proceed right now.
    /// NOTE(review): implementation not visible here -- presumably refills
    /// then debits both buckets atomically under mu_; confirm in the .cpp.
    RateLimitDecision allow(const std::string &api_key_id, int estimated_tokens);

private:
    /// Dual token bucket tracked per api_key_id.
    struct Bucket {
        double request_tokens = 0.0;    // remaining request-count budget
        double estimated_tokens = 0.0;  // remaining estimated-token budget
        // Separate refill timestamps: the two budgets refill independently.
        std::chrono::steady_clock::time_point last_request_refill{};
        std::chrono::steady_clock::time_point last_estimated_refill{};
    };

    std::mutex mu_;  // presumably guards buckets_; allow() may be called concurrently
    std::unordered_map<std::string, Bucket> buckets_;  // keyed by api_key_id
    int requests_per_minute_;          // refill rate for Bucket::request_tokens
    int estimated_tokens_per_minute_;  // refill rate for Bucket::estimated_tokens

    /// Shared refill helper for either budget: tops up `tokens` based on the
    /// elapsed time since last_refill at limit_per_minute, using the caller's
    /// `now` so both buckets of one request see a consistent clock.
    static void refill(
        double &tokens,
        std::chrono::steady_clock::time_point &last_refill,
        int limit_per_minute,
        std::chrono::steady_clock::time_point now);
};

/// Thread-safe registry of in-flight requests, keyed by request_id.
/// Hands out shared_ptr<RequestContext> so contexts stay alive even after
/// removal from the map (e.g. while a worker finishes a cancelled request).
class RequestRegistry {
public:
    /// Creates and registers a new context for request_id.
    /// NOTE(review): behavior on a duplicate request_id is not visible in
    /// this header -- confirm in the .cpp.
    std::shared_ptr<RequestContext> create(
        const std::string &request_id,
        const ApiKeyRecord &principal,
        const TokenEstimate &estimate,
        const std::string &request_body);
    /// Looks up a context by id; presumably returns nullptr when absent.
    std::shared_ptr<RequestContext> find(const std::string &request_id) const;
    /// Marks ctx finished with the given terminal state/result.
    void complete(const std::shared_ptr<RequestContext> &ctx, RequestState state, RequestResult result);
    /// Cancels one request by id, returning the affected context (if any).
    std::shared_ptr<RequestContext> cancel_request(const std::string &request_id);
    /// Cancels every tracked request, returning the affected contexts
    /// (used for shutdown / model switch, presumably).
    std::vector<std::shared_ptr<RequestContext>> cancel_all();

private:
    mutable std::mutex mu_;  // mutable so const find() can lock it
    std::unordered_map<std::string, std::shared_ptr<RequestContext>> requests_;
};

/// Lock-free metrics collector: plain atomic counters plus sum/count pairs
/// for latency "histograms" (averages only -- no buckets are stored here).
/// render_prometheus() serializes everything into Prometheus text format.
class MetricsRegistry {
public:
    void inc_requests_total();
    void inc_requests_inflight();
    void dec_requests_inflight();
    void inc_queue_rejected_total();
    void inc_rate_limited_total();
    /// Batch-friendly: cancel_all() can add many cancellations at once.
    void add_cancellations_total(uint64_t delta = 1);
    void inc_switch_total();
    void inc_worker_restarts_total();
    /// Records one request latency sample (sum + count).
    void observe_request_latency_ms(int64_t value);
    /// Records one queue-wait sample (sum + count).
    void observe_queue_wait_ms(int64_t value);
    /// Renders all counters plus live queue/model state pulled from the
    /// passed-in snapshot and manager. NOTE(review): takes manager by
    /// non-const ref -- the exposition format is defined in the .cpp.
    std::string render_prometheus(const QueueSnapshot &queue, ModelManager &manager) const;

private:
    std::atomic<uint64_t> requests_total_{0};
    // Signed: inflight is a gauge that is decremented and could transiently
    // go negative on mismatched inc/dec.
    std::atomic<int64_t> requests_inflight_{0};
    std::atomic<uint64_t> request_latency_ms_total_{0};   // sum of samples
    std::atomic<uint64_t> request_latency_samples_{0};    // count of samples
    std::atomic<uint64_t> queue_rejected_total_{0};
    std::atomic<uint64_t> rate_limited_total_{0};
    std::atomic<uint64_t> queue_wait_ms_total_{0};        // sum of samples
    std::atomic<uint64_t> queue_wait_samples_{0};         // count of samples
    std::atomic<uint64_t> cancellations_total_{0};
    std::atomic<uint64_t> switch_total_{0};
    std::atomic<uint64_t> worker_restarts_total_{0};
};

/// Bounded two-priority FIFO (admin vs. user) with both a request-count cap
/// and a token-budget cap. admin_quota_/admin_streak_ suggest an
/// anti-starvation scheme limiting consecutive admin pops -- confirm the
/// exact policy in pop_next()'s implementation.
class PrioritySchedulerQueue {
public:
    /// Reads size/token caps, admin quota and retry-after from config.
    explicit PrioritySchedulerQueue(const QueueConfig &config);
    /// Non-blocking admit; presumably returns false when a cap is exceeded
    /// or the queue is stopped.
    bool try_push(const std::shared_ptr<RequestContext> &ctx);
    /// Returns the next context to run; presumably blocks on cv_ until work
    /// arrives or stop() is called (and then returns nullptr) -- confirm.
    std::shared_ptr<RequestContext> pop_next();
    /// Wakes any blocked pop_next() callers and prevents further pops.
    void stop();
    /// Retry-After hint (seconds) to surface to rejected clients.
    int retry_after_sec() const;
    /// Point-in-time view of queue occupancy for metrics/status endpoints.
    QueueSnapshot snapshot() const;

private:
    mutable std::mutex mu_;  // guards all fields below; mutable for const snapshot()
    std::condition_variable cv_;  // signals pop_next() on push/stop
    std::deque<std::shared_ptr<RequestContext>> admin_queue_;  // served preferentially
    std::deque<std::shared_ptr<RequestContext>> user_queue_;
    size_t max_size_;          // cap on total queued requests
    size_t current_size_ = 0;  // admin + user entries combined
    int max_tokens_;           // cap on total estimated tokens queued
    int current_tokens_ = 0;
    int admin_quota_;          // max consecutive admin pops before yielding?
    int admin_streak_ = 0;     // consecutive admin pops so far
    int retry_after_sec_;
    bool stopped_ = false;     // set by stop(); checked under mu_
};

/// Owns the priority queue and a single worker thread that drains it,
/// dispatching requests to the ModelManager. References to registry and
/// metrics are non-owning: callers must keep those alive for the
/// Scheduler's lifetime.
class Scheduler {
public:
    Scheduler(
        ModelManager &manager,
        RequestRegistry &registry,
        MetricsRegistry &metrics,
        const QueueConfig &queue_config);
    /// NOTE(review): presumably stops the queue and joins worker_ --
    /// verify the .cpp does both, or the thread dtor will terminate().
    ~Scheduler();

    /// Attempts to queue ctx for execution; false means rejected (use
    /// retry_after_sec() to populate a Retry-After header, presumably).
    bool try_enqueue(const std::shared_ptr<RequestContext> &ctx);
    int retry_after_sec() const;
    QueueSnapshot snapshot() const;

private:
    ModelManager &manager_;      // non-owning
    RequestRegistry &registry_;  // non-owning
    MetricsRegistry &metrics_;   // non-owning
    PrioritySchedulerQueue queue_;
    std::thread worker_;  // runs worker_loop(); declared last so queue_ outlives it during construction

    /// Worker-thread body: pops from queue_ and executes requests.
    void worker_loop();
};

/// API-key authenticator for incoming HTTP requests. Keys are looked up by
/// their secret in records_by_secret_; the header name and auth scheme
/// (e.g. "Authorization: Bearer <token>") are configurable.
class ApiKeyAuth {
public:
    /// Loads header name, scheme and key records from config.
    explicit ApiKeyAuth(const ManagerConfig &config);
    /// Whether authentication is enforced at all. NOTE(review): the
    /// enabled-condition (empty key set? config flag?) is defined in the .cpp.
    bool enabled() const;
    /// Returns the matching key record for req, or nullopt with `error`
    /// describing the failure.
    std::optional<ApiKeyRecord> authenticate(
        const http::request<http::string_body> &req,
        std::string &error) const;

private:
    std::string header_name_;  // header carrying the credential
    std::string scheme_;       // expected auth scheme prefix
    // Secret -> record; lookup by the raw presented token.
    std::unordered_map<std::string, ApiKeyRecord> records_by_secret_;

    /// Pulls the bearer token out of req's auth header, or nullopt with
    /// `error` set (malformed header, wrong scheme, missing header -- the
    /// exact cases live in the .cpp).
    std::optional<std::string> extract_bearer_token(
        const http::request<http::string_body> &req,
        std::string &error) const;
};