/**
 * src/managers/FirebaseManager.ts
 *
 * Singleton wrapper around the Firebase Admin SDK.
 * Responsibilities:
 *  - Initialize the Admin SDK once from env (service account JSON string)
 *  - Real-time Firestore listener for `titan_transpiler_jobs`
 *  - Atomic job claiming via Firestore transactions (prevents duplicate processing
 *    if multiple workers are ever running)
 *  - Firebase Storage upload / download helpers
 */

import admin from 'firebase-admin';
import axios from 'axios';
import fs from 'fs';
import { pipeline } from 'stream/promises';
import type { Firestore } from 'firebase-admin/firestore';
import { logger } from '../utils/logger';
import { ENV } from '../config/env';

// ─── Job Document Types ───────────────────────────────────────────────────────

export type JobStatus =
  | 'pending'
  | 'processing'
  | 'transpiling'
  | 'auditing'
  | 'completed'
  | 'failed';

export interface TranspilerJob {
  /** Firestore document ID */
  id: string;
  /** gs:// or https:// URL of the uploaded Website.zip in Firebase Storage */
  zipUrl: string;
  status: JobStatus;
  appName?: string;
  createdAt: admin.firestore.Timestamp;
  updatedAt?: admin.firestore.Timestamp;
  /** Signed download URL for the generated Flutter_Code.zip */
  outputUrl?: string;
  /** Human-readable error message when status === 'failed' */
  errorMessage?: string;
  /** 0–100 overall progress indicator */
  progress?: number;
  /**
   * Per-file Dart confidence scores.
   * NOTE(review): value type reconstructed as `number` — confirm against producers.
   */
  confidenceScores?: Record<string, number>;
  /**
   * Per-file audit pass/fail.
   * NOTE(review): value type reconstructed as `boolean` — confirm against producers.
   */
  auditResults?: Record<string, boolean>;
}

/** Fields that `updateJob()` may write; every field is optional (partial update). */
export interface JobUpdate {
  status?: JobStatus;
  outputUrl?: string;
  errorMessage?: string;
  progress?: number;
  /** NOTE(review): value type reconstructed as `number` — confirm. */
  confidenceScores?: Record<string, number>;
  /** NOTE(review): value type reconstructed as `boolean` — confirm. */
  auditResults?: Record<string, boolean>;
}

// ─── FirebaseManager (Singleton) ─────────────────────────────────────────────

export class FirebaseManager {
  private static instance: FirebaseManager | null = null;

  private readonly db: Firestore;
  // admin.storage().bucket() — type varies by firebase-admin version
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private readonly bucket: any;
  /** Unsubscribe handle of the most recently started jobs listener, if any. */
  private unsubscribeListener: (() => void) | null = null;

  // ─── Initialization ──────────────────────────────────────────────────────

  /**
   * Initializes the Admin SDK (if no app exists yet) from
   * `ENV.FIREBASE_SERVICE_ACCOUNT` and binds the Firestore and default
   * Storage bucket handles. Private: use `getInstance()`.
   *
   * @throws SyntaxError if `ENV.FIREBASE_SERVICE_ACCOUNT` is not valid JSON.
   */
  private constructor() {
    // Guard: only call initializeApp once (important for hot-reload in dev)
    if (!admin.apps.length) {
      const serviceAccount = JSON.parse(ENV.FIREBASE_SERVICE_ACCOUNT) as admin.ServiceAccount;
      admin.initializeApp({
        credential: admin.credential.cert(serviceAccount),
        storageBucket: ENV.FIREBASE_STORAGE_BUCKET,
      });
    }

    this.db = admin.firestore();
    this.bucket = admin.storage().bucket();

    logger.info('FirebaseManager: Admin SDK initialized');
    logger.info(`FirebaseManager: bucket="${ENV.FIREBASE_STORAGE_BUCKET}"`);
  }

  /** Returns the process-wide instance, creating it on first call. */
  static getInstance(): FirebaseManager {
    if (!FirebaseManager.instance) {
      FirebaseManager.instance = new FirebaseManager();
    }
    return FirebaseManager.instance;
  }

  // ─── Firestore Job Operations ────────────────────────────────────────────

  /**
   * Fetches a single job document by its Firestore document ID.
   * Returns null if the document does not exist.
   */
  async getJob(jobId: string): Promise<TranspilerJob | null> {
    const snap = await this.db
      .collection(ENV.FIRESTORE_JOBS_COLLECTION)
      .doc(jobId)
      .get();

    if (!snap.exists) return null;
    // The stored document is trusted to match the TranspilerJob shape; no
    // runtime validation is performed here.
    return { id: snap.id, ...(snap.data() as Omit<TranspilerJob, 'id'>) };
  }

  /**
   * Partially updates a job document. Always stamps `updatedAt`.
   * Uses `.update()`, so the document must already exist.
   */
  async updateJob(jobId: string, update: JobUpdate): Promise<void> {
    const payload: Record<string, unknown> = {
      ...update,
      updatedAt: admin.firestore.Timestamp.now(),
    };

    await this.db
      .collection(ENV.FIRESTORE_JOBS_COLLECTION)
      .doc(jobId)
      .update(payload);

    logger.info(
      `FirebaseManager: job ${jobId} updated` +
        (update.status ? ` → status="${update.status}"` : '') +
        (update.progress !== undefined ? ` progress=${update.progress}%` : '')
    );
  }

  /**
   * Starts a Firestore real-time listener on the jobs collection.
   * When a document is added with status === "pending", it:
   *  1. Runs a Firestore transaction to atomically claim the job
   *     (sets status → "processing"), preventing races.
   *  2. Calls `onNewJob(job)` asynchronously.
   *
   * The handle is also stored on the instance so `stopListening()` can
   * cancel it; a second call replaces the stored handle without stopping
   * the earlier listener.
   *
   * @param onNewJob Handler invoked (not awaited) for each claimed job.
   * @returns An unsubscribe function that stops the listener.
   */
  listenForPendingJobs(
    onNewJob: (job: TranspilerJob) => Promise<void>
  ): () => void {
    const col = this.db.collection(ENV.FIRESTORE_JOBS_COLLECTION);
    const query = col
      .where('status', '==', 'pending')
      .orderBy('createdAt', 'asc');

    const unsubscribe = query.onSnapshot(
      async (snapshot) => {
        for (const change of snapshot.docChanges()) {
          // We only care about documents that just became 'pending'
          if (change.type !== 'added') continue;

          const docRef = change.doc.ref;
          const rawData = change.doc.data() as Omit<TranspilerJob, 'id'>;
          // Built from the snapshot taken before the claim, so `job.status`
          // handed to `onNewJob` is still the pre-claim value.
          const job: TranspilerJob = { id: change.doc.id, ...rawData };

          logger.info(`FirebaseManager: new pending job → id=${job.id}`);

          // ── Atomic claim via transaction ─────────────────────────────────
          let claimed = false;
          try {
            await this.db.runTransaction(async (tx) => {
              const freshSnap = await tx.get(docRef);
              if (!freshSnap.exists) throw new Error('Job document disappeared');

              const freshStatus = (freshSnap.data() as Omit<TranspilerJob, 'id'>).status;
              if (freshStatus !== 'pending') {
                // Another worker claimed it first — abort; the catch below logs it
                throw new Error(`Job ${job.id} already claimed (status="${freshStatus}")`);
              }

              tx.update(docRef, {
                status: 'processing',
                updatedAt: admin.firestore.Timestamp.now(),
              });
            });
            claimed = true;
          } catch (txErr) {
            logger.warn(`FirebaseManager: job ${job.id} claim failed — ${String(txErr)}`);
          }

          if (!claimed) continue;

          // ── Dispatch to orchestrator (non-blocking) ──────────────────────
          onNewJob(job).catch((err: unknown) => {
            logger.error(`FirebaseManager: uncaught error in onNewJob for ${job.id}`, err);
          });
        }
      },
      (err) => {
        logger.error('FirebaseManager: Firestore listener error', err);
      }
    );

    this.unsubscribeListener = unsubscribe;
    logger.info(
      `FirebaseManager: listening on collection "${ENV.FIRESTORE_JOBS_COLLECTION}"`
    );

    return unsubscribe;
  }

  /** Stops the active Firestore listener, if any. */
  stopListening(): void {
    this.unsubscribeListener?.();
    this.unsubscribeListener = null;
    logger.info('FirebaseManager: listener stopped');
  }

  // ─── Queue Operations (titan_builder_queue) ──────────────────────────────
  //
  // These methods target titan_builder_queue exclusively.

  /**
   * Writes a partial update to a document in `titan_builder_queue`.
   * Uses set({ merge: true }) so it is safe even if the doc is brand-new,
   * and always stamps updatedAt.
   */
  async updateQueueDoc(docId: string, update: QueueDocUpdate): Promise<void> {
    const payload: Record<string, unknown> = {
      ...update,
      updatedAt: admin.firestore.Timestamp.now(),
    };

    await this.db
      .collection(QUEUE_COLLECTION)
      .doc(docId)
      .set(payload, { merge: true });

    logger.info(
      `FirebaseManager[queue]: ${docId} updated` +
        (update.status ? ` → "${update.status}"` : '')
    );
  }

  /**
   * Creates a seed document in `titan_transpiler_jobs` via .set().
   *
   * WHY THIS EXISTS:
   * updateJob() uses .update() which throws NOT_FOUND if the doc is absent.
   * JobQueue calls this BEFORE LLMOrchestrator.processJob() so that the
   * orchestrator's internal updateJob() calls have a real document to land on.
   *
   * The document ID is taken from `job.id`; `id` itself is not stored as a field.
   */
  async createTranspilerJobDoc(job: TranspilerJob): Promise<void> {
    const { id, ...data } = job;

    await this.db
      .collection(ENV.FIRESTORE_JOBS_COLLECTION)
      .doc(id)
      .set({
        ...data,
        // Defensive default in case a caller bypassed the type and omitted it
        createdAt: data.createdAt ?? admin.firestore.Timestamp.now(),
        updatedAt: admin.firestore.Timestamp.now(),
      });

    logger.info(`FirebaseManager: transpiler job seeded → id=${id}`);
  }

  /**
   * Downloads a Firebase Storage object entirely into a Node.js Buffer.
   *
   * Use-case: the build queue handler needs the reviewed Dart ZIP as bytes
   * to pass directly to GitHubActionsService.triggerBuild().
   *
   * Memory note: keep payloads under ~50 MB — fully buffered in RAM.
   */
  async downloadToBuffer(storageUrl: string): Promise<Buffer> {
    const storagePath = this.parseStoragePath(storageUrl);
    logger.info(`FirebaseManager: downloading to buffer — "${storagePath}"`);

    const [contents] = await this.bucket.file(storagePath).download();
    // `bucket` is untyped, so normalise whatever comes back into a Buffer
    const buf = Buffer.isBuffer(contents)
      ? contents
      : Buffer.from(contents as Uint8Array);

    logger.info(`FirebaseManager: buffer ready — ${buf.length} bytes`);
    return buf;
  }

  // ─── Firebase Storage Operations ─────────────────────────────────────────

  /**
   * Downloads a file from Firebase Storage to a local path.
   * Accepts gs://, public storage URLs, or firebasestorage.googleapis.com URLs.
   */
  async downloadFile(storageUrl: string, localPath: string): Promise<void> {
    const storagePath = this.parseStoragePath(storageUrl);
    logger.info(`FirebaseManager: downloading "${storagePath}" → "${localPath}"`);

    await this.bucket.file(storagePath).download({ destination: localPath });
    logger.info(`FirebaseManager: download complete`);
  }

  /**
   * Downloads a file from any HTTPS URL (HF Dataset, etc.) to a local path.
   * Used when zipUrl is an HF Dataset URL instead of a Firebase Storage URL.
   * Storage-looking URLs are delegated to `downloadFile()`.
   */
  async downloadFileFromUrl(url: string, localPath: string): Promise<void> {
    // Firebase / Google Cloud Storage URLs go through the Storage SDK
    if (
      url.startsWith('gs://') ||
      url.includes('firebasestorage') ||
      url.includes('storage.googleapis')
    ) {
      return this.downloadFile(url, localPath);
    }

    // Otherwise download via HTTP (HF Dataset, file.io, etc.)
    logger.info(`FirebaseManager: downloading "${url}" → "${localPath}"`);
    const response = await axios.get(url, {
      responseType: 'stream',
      timeout: 120_000,
    });

    const writer = fs.createWriteStream(localPath);
    await pipeline(response.data, writer);
    logger.info(`FirebaseManager: download complete`);
  }

  /**
   * Uploads a local file to Firebase Storage.
   * @returns A signed URL valid for 7 days.
   */
  async uploadFile(
    localPath: string,
    destinationPath: string,
    contentType = 'application/zip'
  ): Promise<string> {
    logger.info(`FirebaseManager: uploading "${localPath}" → "${destinationPath}"`);

    await this.bucket.upload(localPath, {
      destination: destinationPath,
      metadata: { contentType },
    });

    return this.getSignedUrl(destinationPath);
  }

  /**
   * Uploads a Buffer directly to Firebase Storage without touching disk.
   * @returns A signed URL valid for 7 days.
   */
  async uploadBuffer(
    buffer: Buffer,
    destinationPath: string,
    contentType = 'application/zip'
  ): Promise<string> {
    logger.info(`FirebaseManager: uploading buffer (${buffer.length} bytes) → "${destinationPath}"`);

    const file = this.bucket.file(destinationPath);
    // Wrap the callback-style write stream so the upload can be awaited
    await new Promise<void>((resolve, reject) => {
      const stream = file.createWriteStream({ metadata: { contentType } });
      stream.on('error', reject);
      stream.on('finish', resolve);
      stream.end(buffer);
    });

    return this.getSignedUrl(destinationPath);
  }

  // ─── Private Helpers ─────────────────────────────────────────────────────

  /** Generates a 7-day signed download URL for a storage path. */
  private async getSignedUrl(storagePath: string): Promise<string> {
    const [url] = await this.bucket.file(storagePath).getSignedUrl({
      action: 'read',
      expires: Date.now() + 7 * 24 * 60 * 60 * 1000, // 7 days
    });
    return url;
  }

  /**
   * Normalises various Firebase Storage URL formats to a raw storage path.
   *
   * Handles:
   *   gs://bucket-name/path/to/file.zip
   *   https://firebasestorage.googleapis.com/v0/b/bucket/o/path%2Fto%2Ffile.zip?token=…
   *   https://storage.googleapis.com/bucket/path/to/file.zip
   *
   * Anything else is returned unchanged and treated as a raw storage path.
   *
   * @throws URIError if a percent-escape in the URL path is malformed.
   */
  private parseStoragePath(url: string): string {
    // gs:// protocol — drop the scheme and the bucket segment
    if (url.startsWith('gs://')) {
      const withoutScheme = url.slice('gs://'.length);
      const firstSlash = withoutScheme.indexOf('/');
      return firstSlash === -1
        ? withoutScheme
        : withoutScheme.slice(firstSlash + 1);
    }

    // Firebase Storage REST URL.
    // Must be tested BEFORE the CDN host: "firebasestorage.googleapis.com/"
    // contains "storage.googleapis.com/" as a substring, so the CDN check
    // would otherwise capture these URLs and return "b/<bucket>/o/<encoded>".
    if (url.includes('firebasestorage.googleapis.com')) {
      const { pathname } = new URL(url);
      const marker = '/o/';
      const markerAt = pathname.indexOf(marker);
      if (markerAt !== -1) {
        // Everything after the first "/o/" is the percent-encoded object name.
        // `pathname` never includes the query string, so `?token=…` is already gone.
        const encodedObject = pathname.slice(markerAt + marker.length);
        if (encodedObject) return decodeURIComponent(encodedObject);
      }
      // Not an object URL — hand it back unchanged (raw-path fallback)
      return url;
    }

    // Public Google Storage CDN URL
    if (url.includes('storage.googleapis.com/')) {
      const { pathname } = new URL(url);
      // pathname = /bucket-name/path/to/file — drop the leading bucket segment.
      // `pathname` is still percent-encoded, so decode each segment to get
      // the real object name (e.g. "my%20file.zip" → "my file.zip").
      return pathname
        .split('/')
        .filter(Boolean)
        .slice(1)
        .map((segment) => decodeURIComponent(segment))
        .join('/');
    }

    // Fallback: treat as a raw storage path
    return url;
  }
}

// ═══════════════════════════════════════════════════════════════════════════
// QUEUE TYPES (added for titan_builder_queue — existing types unchanged)
// ═══════════════════════════════════════════════════════════════════════════

/** The Firestore collection that acts as the job message broker. */
export const QUEUE_COLLECTION = 'titan_builder_queue';

export type QueueStatus =
  | 'pending'
  | 'processing'
  | 'building'
  | 'completed'
  | 'failed';

export type QueueJobType = 'transpile' | 'build' | 'webview';

/**
 * Shape of a document in `titan_builder_queue`.
 * Clients (mobile app, web dashboard) write this; the worker reads it.
 */
export interface QueueDoc {
  id: string;
  type: QueueJobType;
  status: QueueStatus;

  // ── 'transpile' fields ─────────────────────────────────────────────────
  /** HF Dataset URL or HTTPS URL pointing to the ZIP */
  zipUrl?: string;
  appName?: string;

  // ── 'build' fields ─────────────────────────────────────────────────────
  /** Flutter ZIP as base64 string */
  zipBase64?: string;

  // ── 'webview' fields ───────────────────────────────────────────────────
  url?: string;
  iconUrl?: string;
  iconBase64?: string;
  iconMime?: string;
  enableCamera?: boolean;
  enableStorage?: boolean;
  enableLocation?: boolean;
  enableNotifications?: boolean;
  buildType?: string;
  manifestUrl?: string;
  templateType?: string;

  // ── Result fields (set by the worker) ──────────────────────────────────
  /** HF Dataset public download URL for the output ZIP */
  resultZipUrl?: string;
  errorMessage?: string;

  createdAt: admin.firestore.Timestamp;
  updatedAt?: admin.firestore.Timestamp;
}

/** Fields that `updateQueueDoc()` may write; every field is optional. */
export interface QueueDocUpdate {
  status?: QueueStatus;
  resultZipUrl?: string;
  errorMessage?: string;
}