/**
 * src/services/JobQueue.ts
 *
 * Job routing layer for titan_builder_queue.
 *
 * Changes from v2:
 *  - handleWebView: reads buildType + manifestUrl from the Firestore doc
 *    and passes them on to triggerWebViewBuild
 *  - when buildType is pwa_twa or twa_native, that fact is written to the log
 */

import admin from 'firebase-admin';
import { logger } from '../utils/logger';
import {
  FirebaseManager,
  QUEUE_COLLECTION,
  type QueueDoc,
  type TranspilerJob,
} from '../managers/FirebaseManager';
import { LLMOrchestrator } from './llmOrchestrator';
import { GitHubActionsService } from './githubActions';
import { hfStorage } from './hfStorage';

// ─── Internal helpers ────────────────────────────────────────────────────────

/** Queue document payload as read from Firestore; the id comes from the doc ref. */
type QueueDocData = Omit<QueueDoc, 'id'>;

/** Turns any thrown value into a printable message. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Derives a file extension from a MIME type.
 *
 * Parameters (`; charset=…`) and structured-syntax suffixes (`+xml`) are
 * dropped so they cannot leak into the uploaded file name. Falls back to
 * `png` when no usable subtype is present.
 */
function iconExtension(mime: string): string {
  const subtype = mime.split('/')[1]?.split(/[;+]/)[0]?.trim() ?? '';
  return /^[a-z0-9][a-z0-9.-]*$/i.test(subtype) ? subtype : 'png';
}

// ─── Job Queue Service ────────────────────────────────────────────────────────

/**
 * Listens for pending documents in the queue collection, claims each one with
 * a Firestore transaction and routes it to the handler matching `job.type`.
 */
export class JobQueue {
  private readonly firebase: FirebaseManager;
  private readonly orchestrator: LLMOrchestrator;
  private readonly github: GitHubActionsService;
  /** Ids of jobs this process is currently claiming or running. */
  private readonly inFlight = new Set<string>();
  private stopFn: (() => void) | null = null;

  constructor() {
    this.firebase = FirebaseManager.getInstance();
    this.orchestrator = new LLMOrchestrator();
    this.github = new GitHubActionsService();
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // Lifecycle
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Starts listening for pending jobs.
   *
   * If a listener is already active it is detached first, so calling
   * `start()` twice never leaves two listeners running.
   *
   * @returns A function that detaches the listener; safe to call repeatedly.
   */
  start(): () => void {
    if (this.stopFn) {
      logger.warn('JobQueue: start() called while already listening — replacing listener');
      this.stopFn();
      this.stopFn = null;
    }

    // FirebaseManager does not expose its Firestore handle publicly, so it is
    // read structurally; fall back to the default app's instance when absent.
    const db =
      (this.firebase as unknown as { db?: admin.firestore.Firestore }).db ??
      admin.firestore();

    const query = db
      .collection(QUEUE_COLLECTION)
      .where('status', '==', 'pending')
      .orderBy('createdAt', 'asc');

    const unsubscribe = query.onSnapshot(
      (snapshot) => {
        // The listener ignores the callback's return value, so rejections
        // must be handled here rather than left as a floating promise.
        this.handleSnapshot(snapshot).catch((err: unknown) =>
          logger.error('JobQueue: snapshot handling failed', err)
        );
      },
      (err) => {
        logger.error('JobQueue: Firestore listener error', err);
      }
    );

    let active = true;
    const stop = (): void => {
      if (!active) return;
      active = false;
      if (this.stopFn === stop) this.stopFn = null;
      unsubscribe();
    };

    this.stopFn = stop;
    logger.info(`JobQueue: ✅ listening on "${QUEUE_COLLECTION}"`);
    return stop;
  }

  /** Detaches the active listener, if any. */
  stop(): void {
    this.stopFn?.();
    this.stopFn = null;
    logger.info('JobQueue: listener stopped');
  }

  /**
   * Claims and dispatches every pending job that appears in a snapshot.
   * Dispatch is fire-and-forget: jobs run concurrently while the loop moves on.
   */
  private async handleSnapshot(
    snapshot: admin.firestore.QuerySnapshot
  ): Promise<void> {
    for (const change of snapshot.docChanges()) {
      if (change.type !== 'added' && change.type !== 'modified') continue;

      const doc = change.doc;
      const data = doc.data() as QueueDocData;
      if (data.status !== 'pending') continue;

      const job: QueueDoc = { id: doc.id, ...data };
      if (this.inFlight.has(job.id)) continue;

      // Reserve the id before awaiting so an overlapping snapshot callback
      // does not start a second claim attempt for the same job.
      this.inFlight.add(job.id);
      const claimed = await this.claimJob(doc.ref);
      if (!claimed) {
        this.inFlight.delete(job.id);
        continue;
      }

      logger.info(`JobQueue: claimed job ${job.id} (type="${job.type}")`);

      void this.dispatch(job)
        .catch((err: unknown) =>
          logger.error(`JobQueue: unhandled error for job ${job.id}`, err)
        )
        .finally(() => this.inFlight.delete(job.id));
    }
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // Distributed Claim
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Moves a job from `pending` to `processing` inside a transaction.
   *
   * The transaction runs on the Firestore instance that owns `docRef`, so it
   * always matches the instance the listener was created on.
   *
   * @returns `true` when this call claimed the job; `false` when the document
   *          is gone, is no longer pending, or the transaction failed.
   */
  private async claimJob(
    docRef: admin.firestore.DocumentReference
  ): Promise<boolean> {
    try {
      return await docRef.firestore.runTransaction(async (tx) => {
        const snap = await tx.get(docRef);
        if (!snap.exists) return false;

        const { status } = snap.data() as QueueDocData;
        if (status !== 'pending') return false;

        tx.update(docRef, {
          status: 'processing',
          updatedAt: admin.firestore.Timestamp.now(),
        });
        return true;
      });
    } catch (err) {
      // Expected "not claimable" outcomes return false above; reaching this
      // point means the transaction itself failed.
      logger.warn(
        `JobQueue: claim transaction failed for job ${docRef.id} — ${describeError(err)}`
      );
      return false;
    }
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // Dispatch
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Routes a claimed job to its handler. Any error escaping a handler marks
   * the job as `failed`; this method itself does not reject.
   */
  private async dispatch(job: QueueDoc): Promise<void> {
    try {
      switch (job.type) {
        case 'transpile':
          await this.handleTranspile(job);
          break;
        case 'build':
          await this.handleBuild(job);
          break;
        case 'webview':
          await this.handleWebView(job);
          break;
        default: {
          const unknownType = (job as QueueDoc).type;
          logger.warn(`JobQueue: unknown type "${unknownType}" for job ${job.id}`);
          await this.firebase.updateQueueDoc(job.id, {
            status: 'failed',
            errorMessage: `Unknown job type: "${unknownType}"`,
          });
        }
      }
    } catch (err) {
      const msg = describeError(err);
      logger.error(`JobQueue: top-level dispatch error for job ${job.id} — ${msg}`);
      try {
        await this.firebase.updateQueueDoc(job.id, {
          status: 'failed',
          errorMessage: msg,
        });
      } catch (updateErr) {
        // Last resort: nothing more can be written, but leave a trace.
        logger.error(
          `JobQueue: could not mark job ${job.id} as failed — ${describeError(updateErr)}`
        );
      }
    }
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // Handler: 'transpile'
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Runs the LLM orchestrator for a job, then copies the resulting status
   * (and output URL, when present) back onto the queue document.
   */
  private async handleTranspile(job: QueueDoc): Promise<void> {
    logger.info(`JobQueue[transpile]: starting job ${job.id}`);

    if (!job.zipUrl) {
      await this.firebase.updateQueueDoc(job.id, {
        status: 'failed',
        errorMessage: 'transpile job is missing required field: zipUrl',
      });
      return;
    }

    const transpilerJob: TranspilerJob = {
      id: job.id,
      zipUrl: job.zipUrl,
      appName: job.appName,
      status: 'processing',
      createdAt: job.createdAt ?? admin.firestore.Timestamp.now(),
    };

    await this.firebase.createTranspilerJobDoc(transpilerJob);
    await this.orchestrator.processJob(transpilerJob);

    // Re-read the job to learn how the pipeline ended.
    const result = await this.firebase.getJob(job.id);

    if (result?.status === 'completed' && result.outputUrl) {
      await this.firebase.updateQueueDoc(job.id, {
        status: 'completed',
        resultZipUrl: result.outputUrl,
      });
      logger.info(`JobQueue[transpile]: ✅ completed — job ${job.id}`);
      return;
    }

    const reason = result?.errorMessage ?? 'Pipeline ended without an output URL';
    await this.firebase.updateQueueDoc(job.id, {
      status: 'failed',
      errorMessage: reason,
    });
    logger.warn(`JobQueue[transpile]: ❌ failed — job ${job.id}: ${reason}`);
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // Handler: 'build'
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Triggers a GitHub build for a job.
   *
   * The ZIP arrives as a base64 string in job.zipBase64.
   * No Firebase Storage needed.
   */
  private async handleBuild(job: QueueDoc): Promise<void> {
    logger.info(`JobQueue[build]: starting job ${job.id}`);

    if (!job.zipBase64) {
      await this.firebase.updateQueueDoc(job.id, {
        status: 'failed',
        errorMessage: 'build job is missing required field: zipBase64',
      });
      return;
    }

    try {
      const buffer = Buffer.from(job.zipBase64, 'base64');
      await this.github.triggerBuild(buffer, job.id);
      await this.firebase.updateQueueDoc(job.id, { status: 'building' });
      logger.info(`JobQueue[build]: ✅ GitHub build triggered — job ${job.id}`);
    } catch (err) {
      const msg = describeError(err);
      logger.error(`JobQueue[build]: ❌ failed — job ${job.id}: ${msg}`);
      await this.firebase.updateQueueDoc(job.id, {
        status: 'failed',
        errorMessage: msg,
      });
    }
  }

  // ═══════════════════════════════════════════════════════════════════════════
  // Handler: 'webview'
  // ═══════════════════════════════════════════════════════════════════════════

  /**
   * Handles webview / pwa_twa / twa_native jobs.
   *
   * Changes v3:
   *  - reads buildType + manifestUrl from the Firestore doc
   *  - logs when the job is a TWA build
   *  - passes buildType + manifestUrl to triggerWebViewBuild
   */
  private async handleWebView(job: QueueDoc): Promise<void> {
    logger.info(`JobQueue[webview]: starting job ${job.id}`);

    if (!job.url || !job.appName) {
      await this.firebase.updateQueueDoc(job.id, {
        status: 'failed',
        errorMessage: 'webview job is missing required fields: url, appName',
      });
      return;
    }

    // ── Read buildType + manifestUrl ─────────────────────────────────────────
    const buildType = job.buildType ?? 'webview';
    const manifestUrl = job.manifestUrl ?? '';

    // ── Dedicated log line for TWA builds ────────────────────────────────────
    if (buildType === 'pwa_twa' || buildType === 'twa_native') {
      logger.info(
        `JobQueue[webview]: 🔷 TWA build detected — ` +
          `buildType="${buildType}", manifestUrl="${manifestUrl}", job=${job.id}`
      );
    }

    try {
      // ── Upload the icon to the HF dataset when one is provided ─────────────
      let finalIconUrl = job.iconUrl ?? '';

      if (job.iconBase64 && job.iconMime) {
        logger.info(`JobQueue[webview]: uploading icon for job ${job.id}`);
        const filename = `icons/${job.id}/icon.${iconExtension(job.iconMime)}`;
        const buf = Buffer.from(job.iconBase64, 'base64');
        finalIconUrl = await hfStorage.uploadBuffer(buf, filename, job.iconMime);
        logger.info(`JobQueue[webview]: icon uploaded → ${finalIconUrl}`);
      } else if (job.iconUrl?.startsWith('http')) {
        try {
          finalIconUrl = await hfStorage.uploadIconFromUrl(job.iconUrl, job.id);
        } catch (err) {
          // Best effort: the build can still proceed with the caller's URL.
          logger.warn(
            `JobQueue[webview]: icon re-upload failed, using original URL: ${describeError(err)}`
          );
          finalIconUrl = job.iconUrl;
        }
      }

      // ── Trigger GitHub Actions ─────────────────────────────────────────────
      await this.github.triggerWebViewBuild({
        url: job.url,
        appName: job.appName,
        iconUrl: finalIconUrl,
        trackingId: job.id,
        enableCamera: job.enableCamera ?? true,
        enableStorage: job.enableStorage ?? true,
        enableLocation: job.enableLocation ?? false,
        enableNotifications: job.enableNotifications ?? false,
        buildType, // new: forwarded from Firestore
        manifestUrl, // new: forwarded from Firestore
      });

      await this.firebase.updateQueueDoc(job.id, { status: 'building' });
      logger.info(
        `JobQueue[webview]: ✅ GitHub ${buildType} build triggered — job ${job.id}`
      );
    } catch (err) {
      const msg = describeError(err);
      logger.error(`JobQueue[webview]: ❌ failed — job ${job.id}: ${msg}`);
      await this.firebase.updateQueueDoc(job.id, {
        status: 'failed',
        errorMessage: msg,
      });
    }
  }
}