nomagick commited on
Commit
988413d
·
unverified ·
1 Parent(s): 9fbd751

saas: async auth/rate-limit path for high-freq keys

Browse files
Files changed (2) hide show
  1. src/api/searcher-serper.ts +95 -3
  2. thinapps-shared +1 -1
src/api/searcher-serper.ts CHANGED
@@ -19,9 +19,11 @@ import { AsyncLocalContext } from '../services/async-context';
19
  import { Context, Ctx, Method, Param, RPCReflect } from '../services/registry';
20
  import { OutputServerEventStream } from '../lib/transform-server-event-stream';
21
  import { JinaEmbeddingsAuthDTO } from '../dto/jina-embeddings-auth';
22
- import { InsufficientBalanceError } from '../services/errors';
23
  import { SerperImageSearchResponse, SerperNewsSearchResponse, SerperSearchQueryParams, SerperSearchResponse, SerperWebSearchResponse, WORLD_COUNTRIES, WORLD_LANGUAGES } from '../shared/3rd-party/serper-search';
24
  import { toAsyncGenerator } from '../utils/misc';
 
 
25
 
26
  const WORLD_COUNTRY_CODES = Object.keys(WORLD_COUNTRIES).map((x) => x.toLowerCase());
27
 
@@ -30,6 +32,11 @@ interface FormattedPage extends RealFormattedPage {
30
  date?: string;
31
  }
32
 
 
 
 
 
 
33
  @singleton()
34
  export class SearcherHost extends RPCHost {
35
  logger = this.globalLogger.child({ service: this.constructor.name });
@@ -42,6 +49,13 @@ export class SearcherHost extends RPCHost {
42
 
43
  targetResultCount = 5;
44
 
 
 
 
 
 
 
 
45
  constructor(
46
  protected globalLogger: GlobalLogger,
47
  protected rateLimitControl: RateLimitControl,
@@ -105,6 +119,14 @@ export class SearcherHost extends RPCHost {
105
  // Here we combine 'count' and 'num' to 'count' for the rest of the function.
106
  count = (num !== undefined ? num : count) ?? 10;
107
 
 
 
 
 
 
 
 
 
108
  const uid = await auth.solveUID();
109
  // Return content by default
110
  const crawlWithoutContent = crawlerOptions.respondWith.includes('no-content');
@@ -134,6 +156,17 @@ export class SearcherHost extends RPCHost {
134
  throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`);
135
  }
136
 
 
 
 
 
 
 
 
 
 
 
 
137
  const rateLimitPolicy = auth.getRateLimits(rpcReflect.name.toUpperCase()) || [
138
  parseInt(user.metadata?.speed_level) >= 2 ?
139
  RateLimitDesc.from({
@@ -146,16 +179,75 @@ export class SearcherHost extends RPCHost {
146
  })
147
  ];
148
 
149
- const apiRoll = await this.rateLimitControl.simpleRPCUidBasedLimit(
150
  rpcReflect, uid!, [rpcReflect.name.toUpperCase()],
151
  ...rateLimitPolicy
152
  );
153
 
154
- rpcReflect.finally(() => {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
155
  if (chargeAmount) {
156
  auth.reportUsage(chargeAmount, `reader-${rpcReflect.name}`).catch((err) => {
157
  this.logger.warn(`Unable to report usage for ${uid}`, { err: marshalErrorLike(err) });
158
  });
 
159
  apiRoll.chargeAmount = chargeAmount;
160
  }
161
  });
 
19
  import { Context, Ctx, Method, Param, RPCReflect } from '../services/registry';
20
  import { OutputServerEventStream } from '../lib/transform-server-event-stream';
21
  import { JinaEmbeddingsAuthDTO } from '../dto/jina-embeddings-auth';
22
+ import { InsufficientBalanceError, RateLimitTriggeredError } from '../services/errors';
23
  import { SerperImageSearchResponse, SerperNewsSearchResponse, SerperSearchQueryParams, SerperSearchResponse, SerperWebSearchResponse, WORLD_COUNTRIES, WORLD_LANGUAGES } from '../shared/3rd-party/serper-search';
24
  import { toAsyncGenerator } from '../utils/misc';
25
+ import type { JinaEmbeddingsTokenAccount } from '../shared/db/jina-embeddings-token-account';
26
+ import { LRUCache } from 'lru-cache';
27
 
28
  const WORLD_COUNTRY_CODES = Object.keys(WORLD_COUNTRIES).map((x) => x.toLowerCase());
29
 
 
32
  date?: string;
33
  }
34
 
35
+ type RateLimitCache = {
36
+ blockedUntil?: Date;
37
+ user?: JinaEmbeddingsTokenAccount;
38
+ };
39
+
40
  @singleton()
41
  export class SearcherHost extends RPCHost {
42
  logger = this.globalLogger.child({ service: this.constructor.name });
 
49
 
50
  targetResultCount = 5;
51
 
52
+ highFreqKeyCache = new LRUCache<string, RateLimitCache>({
53
+ max: 256,
54
+ ttl: 60 * 60 * 1000,
55
+ updateAgeOnGet: false,
56
+ updateAgeOnHas: false,
57
+ });
58
+
59
  constructor(
60
  protected globalLogger: GlobalLogger,
61
  protected rateLimitControl: RateLimitControl,
 
119
  // Here we combine 'count' and 'num' to 'count' for the rest of the function.
120
  count = (num !== undefined ? num : count) ?? 10;
121
 
122
+ const authToken = auth.bearerToken;
123
+ let highFreqKey: RateLimitCache | undefined;
124
+ if (authToken && this.highFreqKeyCache.has(authToken)) {
125
+ highFreqKey = this.highFreqKeyCache.get(authToken)!;
126
+ auth.user = highFreqKey.user;
127
+ auth.uid = highFreqKey.user?.user_id;
128
+ }
129
+
130
  const uid = await auth.solveUID();
131
  // Return content by default
132
  const crawlWithoutContent = crawlerOptions.respondWith.includes('no-content');
 
156
  throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`);
157
  }
158
 
159
+ if (highFreqKey?.blockedUntil) {
160
+ const now = new Date();
161
+ const blockedTimeRemaining = (highFreqKey.blockedUntil.valueOf() - now.valueOf());
162
+ if (blockedTimeRemaining > 0) {
163
+ throw RateLimitTriggeredError.from({
164
+ message: `Per UID rate limit exceeded (async)`,
165
+ retryAfter: Math.ceil(blockedTimeRemaining / 1000),
166
+ });
167
+ }
168
+ }
169
+
170
  const rateLimitPolicy = auth.getRateLimits(rpcReflect.name.toUpperCase()) || [
171
  parseInt(user.metadata?.speed_level) >= 2 ?
172
  RateLimitDesc.from({
 
179
  })
180
  ];
181
 
182
+ const apiRollPromise = this.rateLimitControl.simpleRPCUidBasedLimit(
183
  rpcReflect, uid!, [rpcReflect.name.toUpperCase()],
184
  ...rateLimitPolicy
185
  );
186
 
187
+ if (!highFreqKey) {
188
+ // Normal path
189
+ await apiRollPromise;
190
+
191
+ if (rateLimitPolicy.some(
192
+ (x) => {
193
+ const rpm = x.occurrence / (x.periodSeconds / 60);
194
+ if (rpm >= 400) {
195
+ return true;
196
+ }
197
+
198
+ return false;
199
+ })
200
+ ) {
201
+ this.highFreqKeyCache.set(auth.bearerToken!, {
202
+ user,
203
+ });
204
+ }
205
+
206
+ } else {
207
+ // High freq key path
208
+ apiRollPromise.then(
209
+ // Rate limit not triggered, make sure not blocking.
210
+ () => {
211
+ delete highFreqKey.blockedUntil;
212
+ },
213
+ // Rate limit triggered
214
+ (err) => {
215
+ if (!(err instanceof RateLimitTriggeredError)) {
216
+ return;
217
+ }
218
+ const now = Date.now();
219
+ let tgtDate;
220
+ if (err.retryAfter) {
221
+ tgtDate = new Date(now + err.retryAfter * 1000);
222
+ } else if (err.retryAfterDate) {
223
+ tgtDate = err.retryAfterDate;
224
+ }
225
+
226
+ if (tgtDate) {
227
+ const dt = tgtDate.valueOf() - now;
228
+ highFreqKey.blockedUntil = tgtDate;
229
+ setTimeout(() => {
230
+ if (highFreqKey.blockedUntil === tgtDate) {
231
+ delete highFreqKey.blockedUntil;
232
+ }
233
+ }, dt).unref();
234
+ }
235
+ }
236
+ ).finally(async () => {
237
+ // Always asynchronously update user(wallet);
238
+ const user = await auth.getBrief().catch(() => undefined);
239
+ if (user) {
240
+ highFreqKey.user = user;
241
+ }
242
+ });
243
+ }
244
+
245
+ rpcReflect.finally(async () => {
246
  if (chargeAmount) {
247
  auth.reportUsage(chargeAmount, `reader-${rpcReflect.name}`).catch((err) => {
248
  this.logger.warn(`Unable to report usage for ${uid}`, { err: marshalErrorLike(err) });
249
  });
250
+ const apiRoll = await apiRollPromise;
251
  apiRoll.chargeAmount = chargeAmount;
252
  }
253
  });
thinapps-shared CHANGED
@@ -1 +1 @@
1
- Subproject commit 07d23193d85b1d3c8bbd5d0b024a6884ecfe17fd
 
1
+ Subproject commit 492ed4cac38958c3715013da49a9e73b1d1ef8cb