Spaces:
Running
Running
| /** | |
| * src/services/JobQueue.ts | |
| * | |
| * Job routing layer for titan_builder_queue. | |
| * | |
| * Changes from v2: | |
| * - handleWebView: ููุฑุฃ buildType + manifestUrl ู ู Firestore | |
| * ููู ุฑุฑูู ูู triggerWebViewBuild | |
| * - ูู buildType = pwa_twa ุฃู twa_native ูุณุฌู ุฐูู ูู ุงูููุฌ | |
| */ | |
| import admin from 'firebase-admin'; | |
| import { logger } from '../utils/logger'; | |
| import { | |
| FirebaseManager, | |
| QUEUE_COLLECTION, | |
| type QueueDoc, | |
| type TranspilerJob, | |
| } from '../managers/FirebaseManager'; | |
| import { LLMOrchestrator } from './llmOrchestrator'; | |
| import { GitHubActionsService } from './githubActions'; | |
| import { hfStorage } from './hfStorage'; | |
| // โโโ Job Queue Service โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| export class JobQueue { | |
| private readonly firebase: FirebaseManager; | |
| private readonly orchestrator: LLMOrchestrator; | |
| private readonly github: GitHubActionsService; | |
| private readonly inFlight = new Set<string>(); | |
| private stopFn: (() => void) | null = null; | |
| constructor() { | |
| this.firebase = FirebaseManager.getInstance(); | |
| this.orchestrator = new LLMOrchestrator(); | |
| this.github = new GitHubActionsService(); | |
| } | |
| // โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| // Lifecycle | |
| // โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| start(): () => void { | |
| const db = (this.firebase as unknown as { db: admin.firestore.Firestore }).db | |
| ?? admin.firestore(); | |
| const query = db | |
| .collection(QUEUE_COLLECTION) | |
| .where('status', '==', 'pending') | |
| .orderBy('createdAt', 'asc'); | |
| const unsubscribe = query.onSnapshot( | |
| async (snapshot) => { | |
| for (const change of snapshot.docChanges()) { | |
| if (change.type !== 'added' && change.type !== 'modified') continue; | |
| const doc = change.doc; | |
| const data = doc.data() as Omit<QueueDoc, 'id'>; | |
| if (data.status !== 'pending') continue; | |
| const job: QueueDoc = { id: doc.id, ...data }; | |
| if (this.inFlight.has(job.id)) continue; | |
| const claimed = await this.claimJob(doc.ref); | |
| if (!claimed) continue; | |
| this.inFlight.add(job.id); | |
| logger.info(`JobQueue: claimed job ${job.id} (type="${job.type}")`); | |
| this.dispatch(job) | |
| .catch(err => | |
| logger.error(`JobQueue: unhandled error for job ${job.id}`, err) | |
| ) | |
| .finally(() => this.inFlight.delete(job.id)); | |
| } | |
| }, | |
| (err) => { | |
| logger.error('JobQueue: Firestore listener error', err); | |
| } | |
| ); | |
| this.stopFn = unsubscribe; | |
| logger.info(`JobQueue: โ listening on "${QUEUE_COLLECTION}"`); | |
| return unsubscribe; | |
| } | |
| stop(): void { | |
| this.stopFn?.(); | |
| this.stopFn = null; | |
| logger.info('JobQueue: listener stopped'); | |
| } | |
| // โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| // Distributed Claim | |
| // โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| private async claimJob( | |
| docRef: admin.firestore.DocumentReference | |
| ): Promise<boolean> { | |
| try { | |
| await admin.firestore().runTransaction(async (tx) => { | |
| const snap = await tx.get(docRef); | |
| if (!snap.exists) throw new Error('Job disappeared'); | |
| const status = (snap.data() as Omit<QueueDoc, 'id'>).status; | |
| if (status !== 'pending') throw new Error(`Already claimed: ${status}`); | |
| tx.update(docRef, { | |
| status: 'processing', | |
| updatedAt: admin.firestore.Timestamp.now(), | |
| }); | |
| }); | |
| return true; | |
| } catch { | |
| return false; | |
| } | |
| } | |
| // โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| // Dispatch | |
| // โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| private async dispatch(job: QueueDoc): Promise<void> { | |
| try { | |
| switch (job.type) { | |
| case 'transpile': await this.handleTranspile(job); break; | |
| case 'build': await this.handleBuild(job); break; | |
| case 'webview': await this.handleWebView(job); break; | |
| default: { | |
| const unknown = (job as QueueDoc).type; | |
| logger.warn(`JobQueue: unknown type "${unknown}" for job ${job.id}`); | |
| await this.firebase.updateQueueDoc(job.id, { | |
| status: 'failed', | |
| errorMessage: `Unknown job type: "${unknown}"`, | |
| }); | |
| } | |
| } | |
| } catch (err) { | |
| const msg = err instanceof Error ? err.message : String(err); | |
| logger.error(`JobQueue: top-level dispatch error for job ${job.id} โ ${msg}`); | |
| await this.firebase.updateQueueDoc(job.id, { | |
| status: 'failed', | |
| errorMessage: msg, | |
| }).catch(() => undefined); | |
| } | |
| } | |
| // โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| // Handler: 'transpile' | |
| // โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| private async handleTranspile(job: QueueDoc): Promise<void> { | |
| logger.info(`JobQueue[transpile]: starting job ${job.id}`); | |
| if (!job.zipUrl) { | |
| await this.firebase.updateQueueDoc(job.id, { | |
| status: 'failed', | |
| errorMessage: 'transpile job is missing required field: zipUrl', | |
| }); | |
| return; | |
| } | |
| const transpilerJob: TranspilerJob = { | |
| id: job.id, | |
| zipUrl: job.zipUrl, | |
| appName: job.appName, | |
| status: 'processing', | |
| createdAt: job.createdAt ?? admin.firestore.Timestamp.now(), | |
| }; | |
| await this.firebase.createTranspilerJobDoc(transpilerJob); | |
| await this.orchestrator.processJob(transpilerJob); | |
| const result = await this.firebase.getJob(job.id); | |
| if (result?.status === 'completed' && result.outputUrl) { | |
| await this.firebase.updateQueueDoc(job.id, { | |
| status: 'completed', | |
| resultZipUrl: result.outputUrl, | |
| }); | |
| logger.info(`JobQueue[transpile]: โ completed โ job ${job.id}`); | |
| } else { | |
| const reason = result?.errorMessage ?? 'Pipeline ended without an output URL'; | |
| await this.firebase.updateQueueDoc(job.id, { | |
| status: 'failed', | |
| errorMessage: reason, | |
| }); | |
| logger.warn(`JobQueue[transpile]: โ failed โ job ${job.id}: ${reason}`); | |
| } | |
| } | |
| // โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| // Handler: 'build' | |
| // โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| /** | |
| * The ZIP arrives as a base64 string in job.zipBase64. | |
| * No Firebase Storage needed. | |
| */ | |
| private async handleBuild(job: QueueDoc): Promise<void> { | |
| logger.info(`JobQueue[build]: starting job ${job.id}`); | |
| if (!job.zipBase64) { | |
| await this.firebase.updateQueueDoc(job.id, { | |
| status: 'failed', | |
| errorMessage: 'build job is missing required field: zipBase64', | |
| }); | |
| return; | |
| } | |
| try { | |
| const buffer = Buffer.from(job.zipBase64, 'base64'); | |
| await this.github.triggerBuild(buffer, job.id); | |
| await this.firebase.updateQueueDoc(job.id, { status: 'building' }); | |
| logger.info(`JobQueue[build]: โ GitHub build triggered โ job ${job.id}`); | |
| } catch (err) { | |
| const msg = err instanceof Error ? err.message : String(err); | |
| logger.error(`JobQueue[build]: โ failed โ job ${job.id}: ${msg}`); | |
| await this.firebase.updateQueueDoc(job.id, { | |
| status: 'failed', | |
| errorMessage: msg, | |
| }); | |
| } | |
| } | |
| // โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| // Handler: 'webview' | |
| // โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| /** | |
| * Handles webview / pwa_twa / twa_native jobs. | |
| * | |
| * Changes v3: | |
| * - ููุฑุฃ buildType + manifestUrl ู ู ุงูู Firestore doc | |
| * - ูุณุฌู ูู ุงูููุฌ ูู ูุงู TWA build | |
| * - ูู ุฑุฑ buildType + manifestUrl ูู triggerWebViewBuild | |
| */ | |
| private async handleWebView(job: QueueDoc): Promise<void> { | |
| logger.info(`JobQueue[webview]: starting job ${job.id}`); | |
| if (!job.url || !job.appName) { | |
| await this.firebase.updateQueueDoc(job.id, { | |
| status: 'failed', | |
| errorMessage: 'webview job is missing required fields: url, appName', | |
| }); | |
| return; | |
| } | |
| // โโ ูุฑุงุกุฉ buildType + manifestUrl โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| const buildType = job.buildType ?? 'webview'; | |
| const manifestUrl = job.manifestUrl ?? ''; | |
| // โโ ุชุณุฌูู ุฎุงุต ููู TWA builds โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| if (buildType === 'pwa_twa' || buildType === 'twa_native') { | |
| logger.info( | |
| `JobQueue[webview]: ๐ท TWA build detected โ ` + | |
| `buildType="${buildType}", manifestUrl="${manifestUrl}", job=${job.id}` | |
| ); | |
| } | |
| try { | |
| // โโ ุฑูุน ุงูุฃููููุฉ ุนูู HF Dataset ูู ู ูุฌูุฏุฉ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| let finalIconUrl = job.iconUrl || ''; | |
| if (job.iconBase64 && job.iconMime) { | |
| logger.info(`JobQueue[webview]: uploading icon for job ${job.id}`); | |
| const ext = job.iconMime.split('/')[1] || 'png'; | |
| const filename = `icons/${job.id}/icon.${ext}`; | |
| const buf = Buffer.from(job.iconBase64, 'base64'); | |
| finalIconUrl = await hfStorage.uploadBuffer(buf, filename, job.iconMime); | |
| logger.info(`JobQueue[webview]: icon uploaded โ ${finalIconUrl}`); | |
| } else if (job.iconUrl && job.iconUrl.startsWith('http')) { | |
| try { | |
| finalIconUrl = await hfStorage.uploadIconFromUrl(job.iconUrl, job.id); | |
| } catch { | |
| logger.warn(`JobQueue[webview]: icon re-upload failed, using original URL`); | |
| finalIconUrl = job.iconUrl; | |
| } | |
| } | |
| // โโ Trigger GitHub Actions โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ | |
| await this.github.triggerWebViewBuild({ | |
| url: job.url, | |
| appName: job.appName, | |
| iconUrl: finalIconUrl, | |
| trackingId: job.id, | |
| enableCamera: job.enableCamera ?? true, | |
| enableStorage: job.enableStorage ?? true, | |
| enableLocation: job.enableLocation ?? false, | |
| enableNotifications: job.enableNotifications ?? false, | |
| buildType, // โ ุฌุฏูุฏ: ููู ุฑููุฑ ู ู Firestore | |
| manifestUrl, // โ ุฌุฏูุฏ: ููู ุฑููุฑ ู ู Firestore | |
| }); | |
| await this.firebase.updateQueueDoc(job.id, { status: 'building' }); | |
| logger.info( | |
| `JobQueue[webview]: โ GitHub ${buildType} build triggered โ job ${job.id}` | |
| ); | |
| } catch (err) { | |
| const msg = err instanceof Error ? err.message : String(err); | |
| logger.error(`JobQueue[webview]: โ failed โ job ${job.id}: ${msg}`); | |
| await this.firebase.updateQueueDoc(job.id, { | |
| status: 'failed', | |
| errorMessage: msg, | |
| }); | |
| } | |
| } | |
| } | |