import { singleton } from 'tsyringe';
import {
    RPCHost, RPCReflection, assignMeta, RawString,
    ParamValidationError,
    assignTransferProtocolMeta,
} from 'civkit/civ-rpc';
import { marshalErrorLike } from 'civkit/lang';
import _ from 'lodash';

import { RateLimitControl, RateLimitDesc, RateLimitTriggeredError } from '../shared/services/rate-limit';

import { GlobalLogger } from '../services/logger';
import { AsyncLocalContext } from '../services/async-context';
import { Context, Ctx, Method, Param, RPCReflect } from '../services/registry';
import { OutputServerEventStream } from '../lib/transform-server-event-stream';
import { JinaEmbeddingsAuthDTO } from '../dto/jina-embeddings-auth';
import { InsufficientBalanceError } from '../services/errors';
import { WORLD_COUNTRIES, WORLD_LANGUAGES } from '../shared/3rd-party/serper-search';
import { GoogleSERP } from '../services/serp/google';
import { WebSearchEntry } from '../services/serp/compat';
import { CrawlerOptions } from '../dto/crawler-options';
import { ScrappingOptions } from '../services/serp/puppeteer';
import { objHashMd5B64Of } from 'civkit/hash';
import { SERPResult } from '../db/searched';
import { SerperBingSearchService, SerperGoogleSearchService } from '../services/serp/serper';
import type { JinaEmbeddingsTokenAccount } from '../shared/db/jina-embeddings-token-account';
import { LRUCache } from 'lru-cache';
import { API_CALL_STATUS } from '../shared/db/api-roll';
import { InternalJinaSerpService } from '../services/serp/internal';

/** Lower-cased keys of WORLD_COUNTRIES, used to validate the `gl` parameter. */
const WORLD_COUNTRY_CODES = Object.keys(WORLD_COUNTRIES).map((x) => x.toLowerCase());

/** Per-token entry kept in `SerpHost.highFreqKeyCache`. */
type RateLimitCache = {
    /** Set when an asynchronous rate-limit check failed; cleared once it passes or the timer fires. */
    blockedUntil?: Date;
    /** Last known account for the token; refreshed in the background on the high-frequency path. */
    user?: JinaEmbeddingsTokenAccount;
};

/**
 * Prototype for the index object returned by `getIndex`.
 * `toString` renders every own key/value pair as a `[Key name] value` line;
 * an entry whose key is empty renders as an empty line.
 */
const indexProto = {
    toString: function (): string {
        return _(this)
            .toPairs()
            .map(([k, v]) => k ? `[${_.upperFirst(_.lowerCase(k))}] ${v}` : '')
            .value()
            .join('\n') + '\n';
    }
};

/**
 * RPC host for the search endpoint: authenticates and rate-limits the caller,
 * runs the query against the configured providers (with a Firestore-backed cache),
 * and reports the usage charge once the request finishes.
 */
@singleton()
export class SerpHost extends RPCHost {
    logger = this.globalLogger.child({ service: this.constructor.name });

    /** Offset added to `createdAt` to compute `expireAt` of a cache record (7 days). */
    cacheRetentionMs = 1000 * 3600 * 24 * 7;
    /** A cache record older than this is considered stale (1 hour). */
    cacheValidMs = 1000 * 3600;
    pageCacheToleranceMs = 1000 * 3600 * 24;

    reasonableDelayMs = 15_000;

    targetResultCount = 5;

    /** Bearer token -> cached account / block state for keys on the high-frequency path. */
    highFreqKeyCache = new LRUCache<string, RateLimitCache>({
        max: 256,
        ttl: 60 * 60 * 1000,
        updateAgeOnGet: false,
        updateAgeOnHas: false,
    });

    /** Cache records waiting to be written by the periodic batch commit set up in the constructor. */
    batchedCaches: SERPResult[] = [];

    /**
     * Builds the usage/help object served when `/` is requested without a query.
     *
     * @param ctx Request context; `ctx.origin` is used to build the third usage example.
     * @param auth Optional auth DTO; when it carries a user, identity and balance are included.
     * @returns An object whose prototype is `indexProto`, so it can be rendered as text.
     */
    async getIndex(ctx: Context, auth?: JinaEmbeddingsAuthDTO) {
        const indexObject: Record<string, string | number | undefined> = Object.create(indexProto);
        Object.assign(indexObject, {
            usage1: 'https://r.jina.ai/YOUR_URL',
            usage2: 'https://s.jina.ai/YOUR_SEARCH_QUERY',
            usage3: `${ctx.origin}/?q=YOUR_SEARCH_QUERY`,
            homepage: 'https://jina.ai/reader',
        });

        if (auth && auth.user) {
            // The empty key renders as a blank separator line in the text form.
            indexObject[''] = undefined;
            indexObject.authenticatedAs = `${auth.user.user_id} (${auth.user.full_name})`;
            indexObject.balanceLeft = auth.user.wallet.total_balance;
        } else {
            indexObject.note = 'Authentication is required to use this endpoint. Please provide a valid API key via Authorization header.';
        }

        return indexObject;
    }

    constructor(
        protected globalLogger: GlobalLogger,
        protected rateLimitControl: RateLimitControl,
        protected threadLocal: AsyncLocalContext,
        protected googleSerp: GoogleSERP,
        protected serperGoogle: SerperGoogleSearchService,
        protected serperBing: SerperBingSearchService,
        protected jinaSerp: InternalJinaSerpService,
    ) {
        super(...arguments);

        // Flush queued cache records in one batch roughly every 10-11 seconds.
        // The timer is unref'ed so it does not keep the process alive.
        setInterval(() => {
            const thisBatch = this.batchedCaches;
            this.batchedCaches = [];
            if (!thisBatch.length) {
                return;
            }
            const batch = SERPResult.DB.batch();

            for (const x of thisBatch) {
                batch.set(SERPResult.COLLECTION.doc(), x.degradeForFireStore());
            }
            batch.commit()
                .then(() => {
                    this.logger.debug(`Saved ${thisBatch.length} caches by batch`);
                })
                .catch((err) => {
                    this.logger.warn(`Failed to cache search result in batch`, { err });
                });
        }, 1000 * 10 + Math.round(1000 * Math.random())).unref();
    }

    override async init() {
        await this.dependencyReady();

        this.emit('ready');
    }

    /**
     * Search endpoint.
     *
     * Without `q`, a request to `/` returns the index (JSON or plain text depending on `Accept`);
     * any other path raises a ParamValidationError. With `q`, the caller must be an authenticated
     * user with a positive balance; the request is rate limited and charged once it finishes.
     *
     * When `fallback` is set and the first page comes back empty, progressively shorter
     * versions of the query are tried, and the charge is multiplied by the number of searches run.
     *
     * @throws ParamValidationError when `q` is missing on a non-index path.
     * @throws InsufficientBalanceError when the user's wallet balance is not positive.
     */
    @Method({
        name: 'searchIndex',
        ext: {
            http: {
                action: ['get', 'post'],
                path: '/'
            }
        },
        tags: ['search'],
        returnType: [String, OutputServerEventStream, RawString],
    })
    @Method({
        ext: {
            http: {
                action: ['get', 'post'],
            }
        },
        tags: ['search'],
        returnType: [String, OutputServerEventStream, RawString],
    })
    async search(
        @RPCReflect() rpcReflect: RPCReflection,
        @Ctx() ctx: Context,
        crawlerOptions: CrawlerOptions,
        auth: JinaEmbeddingsAuthDTO,
        @Param('type', { type: new Set(['web', 'images', 'news']), default: 'web' })
        variant: 'web' | 'images' | 'news',
        @Param('q') q?: string,
        @Param('provider', { type: new Set(['google', 'bing']) })
        searchEngine?: 'google' | 'bing',
        @Param('num', { validate: (v: number) => v >= 0 && v <= 20 })
        num?: number,
        @Param('gl', { validate: (v: string) => WORLD_COUNTRY_CODES.includes(v?.toLowerCase()) }) gl?: string,
        @Param('hl', { validate: (v: string) => WORLD_LANGUAGES.some(l => l.code === v) }) _hl?: string,
        @Param('location') location?: string,
        @Param('page') page?: number,
        @Param('fallback') fallback?: boolean,
    ) {
        const authToken = auth.bearerToken;
        let highFreqKey: RateLimitCache | undefined;
        if (authToken && this.highFreqKeyCache.has(authToken)) {
            // Known high-frequency key: reuse the cached account instead of resolving it again.
            highFreqKey = this.highFreqKeyCache.get(authToken)!;
            auth.user = highFreqKey.user;
            auth.uid = highFreqKey.user?.user_id;
        }

        const uid = await auth.solveUID();
        if (!q) {
            if (ctx.path === '/') {
                const indexObject = await this.getIndex(ctx, auth);
                if (!ctx.accepts('text/plain') && (ctx.accepts('text/json') || ctx.accepts('application/json'))) {
                    return indexObject;
                }

                return assignTransferProtocolMeta(`${indexObject}`,
                    { contentType: 'text/plain; charset=utf-8', envelope: null }
                );
            }
            throw new ParamValidationError({
                path: 'q',
                message: `Required but not provided`
            });
        }
        // Return content by default
        const user = await auth.assertUser();
        if (!(user.wallet.total_balance > 0)) {
            throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`);
        }

        if (highFreqKey?.blockedUntil) {
            const now = new Date();
            const blockedTimeRemaining = (highFreqKey.blockedUntil.valueOf() - now.valueOf());
            if (blockedTimeRemaining > 0) {
                // Currently only logged; the request is still served.
                this.logger.warn(`Rate limit triggered for ${uid}, this request should have been blocked`);
                // throw RateLimitTriggeredError.from({
                //     message: `Per UID rate limit exceeded (async)`,
                //     retryAfter: Math.ceil(blockedTimeRemaining / 1000),
                // });
            }
        }

        const PREMIUM_KEY_LIMIT = 400;
        const rateLimitPolicy = auth.getRateLimits('SEARCH') || [
            parseInt(user.metadata?.speed_level) >= 2 ?
                RateLimitDesc.from({
                    occurrence: PREMIUM_KEY_LIMIT,
                    periodSeconds: 60
                }) :
                RateLimitDesc.from({
                    occurrence: 40,
                    periodSeconds: 60
                })
        ];

        const apiRollPromise = this.rateLimitControl.simpleRPCUidBasedLimit(
            rpcReflect, uid!, ['SEARCH'],
            ...rateLimitPolicy
        );

        if (!highFreqKey) {
            // Normal path
            await apiRollPromise;

            // Keys whose policy allows at least PREMIUM_KEY_LIMIT requests per minute
            // are remembered so later requests take the high freq key path.
            const isHighFreq = rateLimitPolicy.some(
                (x) => x.occurrence / (x.periodSeconds / 60) >= PREMIUM_KEY_LIMIT
            );
            const tokenToCache = auth.bearerToken;
            if (isHighFreq && tokenToCache) {
                // Without a token the entry could never be looked up again, so skip caching.
                this.highFreqKeyCache.set(tokenToCache, {
                    user,
                });
            }

        } else {
            // High freq key path
            apiRollPromise.then(
                // Rate limit not triggered, make sure not blocking.
                () => {
                    delete highFreqKey.blockedUntil;
                },
                // Rate limit triggered
                (err) => {
                    if (!(err instanceof RateLimitTriggeredError)) {
                        return;
                    }
                    const now = Date.now();
                    let tgtDate;
                    if (err.retryAfterDate) {
                        tgtDate = err.retryAfterDate;
                    } else if (err.retryAfter) {
                        tgtDate = new Date(now + err.retryAfter * 1000);
                    }

                    if (tgtDate) {
                        const dt = tgtDate.valueOf() - now;
                        highFreqKey.blockedUntil = tgtDate;
                        setTimeout(() => {
                            // Only clear the block if it has not been replaced by a newer one.
                            if (highFreqKey.blockedUntil === tgtDate) {
                                delete highFreqKey.blockedUntil;
                            }
                        }, dt).unref();
                    }
                }
            ).finally(async () => {
                // Always asynchronously update user(wallet);
                const user = await auth.getBrief().catch(() => undefined);
                if (user) {
                    highFreqKey.user = user;
                }
            });
        }

        let chargeAmount = 0;
        rpcReflect.finally(async () => {
            if (chargeAmount) {
                auth.reportUsage(chargeAmount, `reader-search`).catch((err) => {
                    this.logger.warn(`Unable to report usage for ${uid}`, { err: marshalErrorLike(err) });
                });
                try {
                    const apiRoll = await apiRollPromise;
                    apiRoll.chargeAmount = chargeAmount;

                } catch (err) {
                    // The rate-limit call rejected: save a standalone record carrying the charge instead.
                    await this.rateLimitControl.record({
                        uid,
                        tags: [rpcReflect.name.toUpperCase()],
                        status: API_CALL_STATUS.SUCCESS,
                        chargeAmount,
                    }).save().catch((err) => {
                        this.logger.warn(`Failed to save rate limit record`, { err: marshalErrorLike(err) });
                    });
                }
            }
        });

        let chargeAmountScaler = 1;
        if (searchEngine === 'bing') {
            chargeAmountScaler = 3;
        }
        if (variant !== 'web') {
            chargeAmountScaler = 5;
        }

        let realQuery = q;
        const queryTerms = q.split(/\s+/g).filter((x) => !!x);

        let results = await this.cachedSearch(variant, {
            provider: searchEngine,
            q,
            num,
            gl,
            // hl,
            location,
            page,
        }, crawlerOptions);

        if (fallback && !results?.length && (!page || page === 1)) {
            // Number of searches performed so far for this request (the initial one counts).
            let tryTimes = 1;
            const containsRTL = /[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF\u0590-\u05FF\uFB1D-\uFB4F\u0700-\u074F\u0780-\u07BF\u07C0-\u07FF]/.test(q);
            const lastResort = (containsRTL ? queryTerms.slice(queryTerms.length - 2) : queryTerms.slice(0, 2)).join(' ');
            const n = 4;
            const chunk = Math.ceil(queryTerms.length / n);

            // `step` drives the narrowing independently of `tryTimes`, so skipping a
            // duplicate query can neither spin forever nor be charged.
            for (let step = 1; step < n; step++) {
                const delta = chunk * step;
                // Clamp at 0: a negative end index would make slice() count from the
                // end and yield a longer query instead of an empty one.
                const keep = Math.max(0, queryTerms.length - delta);
                const terms = containsRTL ? queryTerms.slice(delta) : queryTerms.slice(0, keep);
                const query = terms.join(' ');
                if (!query) {
                    break;
                }
                if (realQuery === query) {
                    continue;
                }
                tryTimes += 1;
                realQuery = query;
                this.logger.info(`Retrying search with fallback query: "${realQuery}"`);
                results = await this.cachedSearch(variant, {
                    provider: searchEngine,
                    q: realQuery,
                    num,
                    gl,
                    // hl,
                    location,
                }, crawlerOptions);
                if (results?.length) {
                    break;
                }
            }

            if (!results?.length && realQuery.length > lastResort.length) {
                realQuery = lastResort;
                this.logger.info(`Retrying search with fallback query: "${realQuery}"`);
                tryTimes += 1;
                results = await this.cachedSearch(variant, {
                    provider: searchEngine,
                    q: realQuery,
                    num,
                    gl,
                    // hl,
                    location,
                }, crawlerOptions);
            }

            chargeAmountScaler *= tryTimes;
        }

        if (!results?.length) {
            results = [];
        }

        const finalResults = results.map((x: any) => this.mapToFinalResults(x));

        await Promise.all(finalResults.map((x: any) => this.assignGeneralMixin(x)));

        chargeAmount = this.assignChargeAmount(finalResults, chargeAmountScaler);
        assignMeta(finalResults, {
            query: realQuery,
            fallback: realQuery === q ? undefined : realQuery,
        });

        return finalResults;
    }

    /**
     * Computes the charge for a result list and attaches it as `usage.tokens` metadata.
     *
     * @param items Result list; charged per started group of 10 items.
     * @param scaler Multiplier applied to the base charge of 10000 per group.
     * @returns The computed charge.
     */
    assignChargeAmount(items: unknown[], scaler: number) {
        const numCharge = Math.ceil(items.length / 10) * 10000 * scaler;
        assignMeta(items, { usage: { tokens: numCharge } });

        return numCharge;
    }

    /**
     * Fetches a 32px favicon through Google's favicon endpoint and returns it as a data URL.
     *
     * @param domain Value passed as the `domain_url` query parameter.
     * @returns A `data:image/png;base64,...` string, or `''` on a non-OK response or any error.
     */
    async getFavicon(domain: string) {
        const url = `https://www.google.com/s2/favicons?sz=32&domain_url=${domain}`;

        try {
            const response = await fetch(url);
            if (!response.ok) {
                return '';
            }
            const ab = await response.arrayBuffer();
            const buffer = Buffer.from(ab);
            const base64 = buffer.toString('base64');
            return `data:image/png;base64,${base64}`;
        } catch (error: any) {
            this.logger.warn(`Failed to get favicon base64 string`, { err: marshalErrorLike(error) });
            return '';
        }
    }

    /**
     * Translates request-level crawler options into the options object handed to the search clients.
     * A `proxy` value ending in `+` enables `proxyResources`; the `+` is stripped for `allocProxy`.
     */
    async configure(opts: CrawlerOptions) {
        const crawlOpts: ScrappingOptions = {
            proxyUrl: opts.proxyUrl,
            cookies: opts.setCookies,
            overrideUserAgent: opts.userAgent,
            timeoutMs: opts.timeout ? opts.timeout * 1000 : undefined,
            locale: opts.locale,
            referer: opts.referer,
            viewport: opts.viewport,
            proxyResources: (opts.proxyUrl || opts.proxy?.endsWith('+')) ? true : false,
            allocProxy: opts.proxy?.endsWith('+') ? opts.proxy.slice(0, -1) : opts.proxy,
        };

        if (opts.locale) {
            crawlOpts.extraHeaders ??= {};
            crawlOpts.extraHeaders['Accept-Language'] = opts.locale;
        }

        return crawlOpts;
    }

    /**
     * Converts a provider entry into the response shape: `link` becomes `url`,
     * `snippet` becomes `description`, and only the whitelisted extra props are carried over.
     */
    mapToFinalResults(input: WebSearchEntry) {
        const whitelistedProps = [
            'imageUrl', 'imageWidth', 'imageHeight', 'source', 'date', 'siteLinks'
        ];
        const result = {
            title: input.title,
            url: input.link,
            description: Reflect.get(input, 'snippet'),
            ..._.pick(input, whitelistedProps),
        };

        return result;
    }

    /**
     * Yields search clients in the order they should be tried. A client yielded
     * twice in a row is tried again before moving on to the next one.
     *
     * @param preference `'bing'` or `'google'` to pick that ordering; anything else uses the default.
     * @param variant Currently unused by the active code path.
     */
    *iterProviders(preference?: string, variant?: string) {
        if (preference === 'bing') {
            yield this.serperBing;
            yield this.serperGoogle;
            yield this.googleSerp;

            return;
        }

        if (preference === 'google') {
            yield this.googleSerp;
            yield this.googleSerp;
            yield this.serperGoogle;

            return;
        }

        // yield variant === 'web' ? this.jinaSerp : this.serperGoogle;
        yield this.serperGoogle;
        yield this.serperGoogle;
        yield this.googleSerp;
    }

    /**
     * Runs a search, serving from the Firestore cache when a fresh record exists.
     *
     * The digest is computed over the query including `provider`; `provider` is then
     * removed from `query` (the passed object is mutated) before it reaches the clients.
     * Non-empty results are queued for the batched cache write. If every provider fails
     * and a stale cache record exists, the stale response is returned instead of throwing.
     *
     * @param variant Which kind of search to run.
     * @param query Search parameters; `provider` selects the provider ordering.
     * @param opts Crawler options; `noCache` skips the cache lookup.
     * @returns The provider (or cached) results; may be empty or undefined when nothing was found.
     * @throws The last provider error when all providers failed and no cache record is available.
     */
    async cachedSearch(variant: 'web' | 'news' | 'images', query: Record<string, any>, opts: CrawlerOptions) {
        const queryDigest = objHashMd5B64Of({ ...query, variant });
        const provider = query.provider;
        Reflect.deleteProperty(query, 'provider');
        const noCache = opts.noCache;
        let cache;
        if (!noCache) {
            cache = (await SERPResult.fromFirestoreQuery(
                SERPResult.COLLECTION.where('queryDigest', '==', queryDigest)
                    .orderBy('createdAt', 'desc')
                    .limit(1)
            ))[0];
            if (cache) {
                const age = Date.now() - cache.createdAt.valueOf();
                const stale = cache.createdAt.valueOf() < (Date.now() - this.cacheValidMs);
                this.logger.info(`${stale ? 'Stale cache exists' : 'Cache hit'} for search query "${query.q}", normalized digest: ${queryDigest}, ${age}ms old`, {
                    query, digest: queryDigest, age, stale
                });

                if (!stale) {
                    return cache.response as any;
                }
            }
        }
        const scrappingOptions = await this.configure(opts);

        try {
            let r: any[] | undefined;
            let lastError;
            outerLoop:
            for (const client of this.iterProviders(provider, variant)) {
                const t0 = Date.now();
                try {
                    switch (variant) {
                        case 'images': {
                            r = await Reflect.apply(client.imageSearch, client, [query, scrappingOptions]);
                            break;
                        }
                        case 'news': {
                            r = await Reflect.apply(client.newsSearch, client, [query, scrappingOptions]);
                            break;
                        }
                        case 'web':
                        default: {
                            r = await Reflect.apply(client.webSearch, client, [query, scrappingOptions]);
                            break;
                        }
                    }
                    const dt = Date.now() - t0;
                    this.logger.info(`Search took ${dt}ms, ${client.constructor.name}(${variant})`, { searchDt: dt, variant, client: client.constructor.name });
                    // The first provider that answers without throwing wins, even with zero results.
                    break outerLoop;
                } catch (err) {
                    lastError = err;
                    const dt = Date.now() - t0;
                    this.logger.warn(`Failed to do ${variant} search using ${client.constructor.name}`, { err, variant, searchDt: dt, });
                }
            }

            if (r?.length) {
                const nowDate = new Date();
                const record = SERPResult.from({
                    query,
                    queryDigest,
                    response: r,
                    createdAt: nowDate,
                    expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs)
                });
                this.batchedCaches.push(record);
            } else if (lastError) {
                throw lastError;
            }

            return r;
        } catch (err: any) {
            if (cache) {
                this.logger.warn(`Failed to fetch search result, but a stale cache is available. falling back to stale cache`, { err: marshalErrorLike(err) });

                return cache.response as any;
            }

            throw err;
        }
    }

    /**
     * Attaches a `favicon` data URL to a result when the `collect-favicon` flag is set
     * in the async-local context.
     *
     * Accepts both raw provider entries (`link`) and the objects produced by
     * `mapToFinalResults` (`url`). Results with a missing or unparsable address are left untouched.
     */
    async assignGeneralMixin(result: Partial<WebSearchEntry>) {
        const collectFavicon = this.threadLocal.get('collect-favicon');
        if (!collectFavicon) {
            return;
        }

        // `search` passes mapped results, which expose the address as `url` rather than `link`.
        const href: unknown = result.link || Reflect.get(result, 'url');
        if (typeof href !== 'string' || !href) {
            return;
        }

        let origin: string;
        try {
            origin = new URL(href).origin;
        } catch {
            // A malformed link must not fail the whole response; just skip the favicon.
            return;
        }

        Reflect.set(result, 'favicon', await this.getFavicon(origin));
    }
}