"use strict"; var __decorate = (this && this.__decorate) || function (decorators, target, key, desc) { var c = arguments.length, r = c < 3 ? target : desc === null ? desc = Object.getOwnPropertyDescriptor(target, key) : desc, d; if (typeof Reflect === "object" && typeof Reflect.decorate === "function") r = Reflect.decorate(decorators, target, key, desc); else for (var i = decorators.length - 1; i >= 0; i--) if (d = decorators[i]) r = (c < 3 ? d(r) : c > 3 ? d(target, key, r) : d(target, key)) || r; return c > 3 && r && Object.defineProperty(target, key, r), r; }; var __metadata = (this && this.__metadata) || function (k, v) { if (typeof Reflect === "object" && typeof Reflect.metadata === "function") return Reflect.metadata(k, v); }; var __param = (this && this.__param) || function (paramIndex, decorator) { return function (target, key) { decorator(target, key, paramIndex); } }; var __importDefault = (this && this.__importDefault) || function (mod) { return (mod && mod.__esModule) ? mod : { "default": mod }; }; var _a, _b, _c; Object.defineProperty(exports, "__esModule", { value: true }); exports.DataCrunchingHost = void 0; const civkit_1 = require("civkit"); const tsyringe_1 = require("tsyringe"); const shared_1 = require("../shared"); const lodash_1 = __importDefault(require("lodash")); const crawler_1 = require("../api/crawler"); const crawled_1 = require("../db/crawled"); const dayjs_1 = __importDefault(require("dayjs")); const fs_1 = require("fs"); const promises_1 = require("fs/promises"); const zlib_1 = require("zlib"); const functions_1 = require("firebase-admin/functions"); const snapshot_formatter_1 = require("../services/snapshot-formatter"); const get_function_url_1 = require("../utils/get-function-url"); dayjs_1.default.extend(require('dayjs/plugin/utc')); let DataCrunchingHost = class DataCrunchingHost extends civkit_1.RPCHost { constructor(globalLogger, crawler, snapshotFormatter, tempFileManager, firebaseObjectStorage) { super(...lodash_1.default.without(arguments, crawler)); this.globalLogger = globalLogger; this.crawler = crawler; this.snapshotFormatter = snapshotFormatter; this.tempFileManager = tempFileManager; this.firebaseObjectStorage = firebaseObjectStorage; this.logger = this.globalLogger.child({ service: this.constructor.name }); this.pageCacheCrunchingPrefix = 'crunched-pages'; this.pageCacheCrunchingBatchSize = 5000; this.pageCacheCrunchingTMinus = 6 * 24 * 60 * 60 * 1000; this.rev = 7; } async init() { await this.dependencyReady(); this.emit('ready'); } // @CloudTaskV2({ // runtime: { // cpu: 2, // memory: '4GiB', // timeoutSeconds: 3600, // concurrency: 2, // maxInstances: 200, // retryConfig: { // maxAttempts: 3, // minBackoffSeconds: 60, // }, // rateLimits: { // maxConcurrentDispatches: 150, // maxDispatchesPerSecond: 2, // }, // }, // tags: ['DataCrunching'], // }) async crunchPageCacheWorker(date, offset) { this.logger.info(`Crunching page cache @${date}+${offset}...`); for await (const { fileName, records } of this.iterPageCacheRecords(date, offset)) { this.logger.info(`Crunching ${fileName}...`); const fileOnDrive = await this.crunchCacheRecords(records); const fstream = (0, fs_1.createReadStream)(fileOnDrive.path); const gzipStream = (0, zlib_1.createGzip)(); fstream.pipe(gzipStream, { end: true }); await this.firebaseObjectStorage.bucket.file(fileName).save(gzipStream, { contentType: 'application/jsonl+gzip', }); } this.logger.info(`Crunching page cache @${date}+${offset} done.`); return true; } // @CloudScheduleV2('2 0 * * *', { // name: 'crunchPageCacheEveryday', // runtime: { // cpu: 2, // memory: '4GiB', // timeoutSeconds: 1800, // timeZone: 'UTC', // retryCount: 3, // minBackoffSeconds: 60, // }, // tags: ['DataCrunching'], // }) async dispatchPageCacheCrunching() { for await (const { fileName, date, offset } of this.iterPageCacheChunks()) { this.logger.info(`Dispatching ${fileName}...`); // sse.write({ data: `Dispatching ${fileName}...` }); await (0, functions_1.getFunctions)().taskQueue('crunchPageCacheWorker').enqueue({ date, offset }, { dispatchDeadlineSeconds: 1800, uri: await (0, get_function_url_1.getFunctionUrl)('crunchPageCacheWorker'), }); } return true; } // @CloudHTTPv2({ // runtime: { // cpu: 2, // memory: '4GiB', // timeoutSeconds: 3600, // concurrency: 2, // maxInstances: 200, // }, // tags: ['DataCrunching'], // }) // async dispatchPageCacheCrunching( // @RPCReflect() rpcReflect: RPCReflection // ) { // const sse = new OutputServerEventStream({ highWaterMark: 4096 }); // rpcReflect.return(sse); // rpcReflect.catch((err) => { // sse.end({ data: `Error: ${err.message}` }); // }); // for await (const { fileName, date, offset } of this.iterPageCacheChunks()) { // this.logger.info(`Dispatching ${fileName}...`); // sse.write({ data: `Dispatching ${fileName}...` }); // await getFunctions().taskQueue('crunchPageCacheWorker').enqueue({ date, offset }, { // dispatchDeadlineSeconds: 1800, // uri: await getFunctionUrl('crunchPageCacheWorker'), // }); // } // sse.end({ data: 'done' }); // return true; // } async *iterPageCacheRecords(date, inputOffset) { const startOfToday = (0, dayjs_1.default)().utc().startOf('day'); const startingPoint = (0, dayjs_1.default)().utc().subtract(this.pageCacheCrunchingTMinus, 'ms').startOf('day'); let theDay = startingPoint; if (date) { theDay = (0, dayjs_1.default)(date).utc().startOf('day'); } let counter = 0; if (inputOffset) { counter = parseInt(inputOffset, 10); } while (theDay.isBefore(startOfToday)) { const fileName = `${this.pageCacheCrunchingPrefix}/r${this.rev}/${theDay.format('YYYY-MM-DD')}/${counter}.jsonl.gz`; const offset = counter; counter += this.pageCacheCrunchingBatchSize; const fileExists = (await this.firebaseObjectStorage.bucket.file(fileName).exists())[0]; if (fileExists) { continue; } const records = await crawled_1.Crawled.fromFirestoreQuery(crawled_1.Crawled.COLLECTION .where('createdAt', '>=', theDay.toDate()) .where('createdAt', '<', theDay.add(1, 'day').toDate()) .orderBy('createdAt', 'asc') .offset(offset) .limit(this.pageCacheCrunchingBatchSize)); this.logger.info(`Found ${records.length} records for ${theDay.format('YYYY-MM-DD')} at offset ${offset}`, { fileName, counter }); if (!records.length) { if (date) { break; } theDay = theDay.add(1, 'day'); counter = 0; continue; } yield { fileName, records }; if (offset) { break; } } } async *iterPageCacheChunks() { const startOfToday = (0, dayjs_1.default)().utc().startOf('day'); const startingPoint = (0, dayjs_1.default)().utc().subtract(this.pageCacheCrunchingTMinus, 'ms').startOf('day'); let theDay = startingPoint; let counter = 0; while (theDay.isBefore(startOfToday)) { const fileName = `${this.pageCacheCrunchingPrefix}/r${this.rev}/${theDay.format('YYYY-MM-DD')}/${counter}.jsonl.gz`; const offset = counter; counter += this.pageCacheCrunchingBatchSize; const fileExists = (await this.firebaseObjectStorage.bucket.file(fileName).exists())[0]; if (fileExists) { continue; } const nRecords = (await crawled_1.Crawled.COLLECTION .where('createdAt', '>=', theDay.toDate()) .where('createdAt', '<', theDay.add(1, 'day').toDate()) .orderBy('createdAt', 'asc') .offset(offset) .limit(this.pageCacheCrunchingBatchSize) .count().get()).data().count; this.logger.info(`Found ${nRecords} records for ${theDay.format('YYYY-MM-DD')} at offset ${offset}`, { fileName, counter }); if (nRecords < this.pageCacheCrunchingBatchSize) { theDay = theDay.add(1, 'day'); counter = 0; } if (nRecords) { yield { fileName, date: theDay.toISOString(), offset }; } } } async crunchCacheRecords(records) { const throttle = new civkit_1.PromiseThrottle(30); const localFilePath = this.tempFileManager.alloc(); let nextDrainDeferred = (0, civkit_1.Defer)(); nextDrainDeferred.resolve(); for (const record of records) { await throttle.acquire(); this.firebaseObjectStorage.downloadFile(`snapshots/${record._id}`) .then(async (snapshotTxt) => { try { const snapshot = JSON.parse(snapshotTxt.toString('utf-8')); let formatted = await this.snapshotFormatter.formatSnapshot('default', snapshot); if (!formatted.content) { formatted = await this.snapshotFormatter.formatSnapshot('markdown', snapshot); } await nextDrainDeferred.promise; await (0, promises_1.appendFile)(localFilePath, JSON.stringify({ url: snapshot.href, title: snapshot.title || '', html: snapshot.html || '', text: snapshot.text || '', content: formatted.content || '', }) + '\n', { encoding: 'utf-8' }); } catch (err) { this.logger.warn(`Failed to parse snapshot for ${record._id}`, { err }); } }) .finally(() => { throttle.release(); }); } await throttle.nextDrain(); const ro = { path: localFilePath }; this.tempFileManager.bindPathTo(ro, localFilePath); return ro; } }; exports.DataCrunchingHost = DataCrunchingHost; __decorate([ __param(0, (0, shared_1.Param)('date')), __param(1, (0, shared_1.Param)('offset', { default: 0 })), __metadata("design:type", Function), __metadata("design:paramtypes", [String, Number]), __metadata("design:returntype", Promise) ], DataCrunchingHost.prototype, "crunchPageCacheWorker", null); exports.DataCrunchingHost = DataCrunchingHost = __decorate([ (0, tsyringe_1.singleton)(), __metadata("design:paramtypes", [typeof (_a = typeof shared_1.Logger !== "undefined" && shared_1.Logger) === "function" ? _a : Object, crawler_1.CrawlerHost, snapshot_formatter_1.SnapshotFormatter, typeof (_b = typeof shared_1.TempFileManager !== "undefined" && shared_1.TempFileManager) === "function" ? _b : Object, typeof (_c = typeof shared_1.FirebaseStorageBucketControl !== "undefined" && shared_1.FirebaseStorageBucketControl) === "function" ? _c : Object]) ], DataCrunchingHost); //# sourceMappingURL=data-crunching.js.map