Spaces:
Sleeping
Sleeping
| /** | |
| * src/managers/FirebaseManager.ts | |
| * | |
| * Singleton wrapper around the Firebase Admin SDK. | |
| * Responsibilities: | |
| * - Initialize the Admin SDK once from env (service account JSON string) | |
| * - Real-time Firestore listener for `titan_transpiler_jobs` | |
| * - Atomic job claiming via Firestore transactions (prevents duplicate processing | |
| * if multiple workers are ever running) | |
| * - Firebase Storage upload / download helpers | |
| */ | |
| import admin from 'firebase-admin'; | |
| import axios from 'axios'; | |
| import fs from 'fs'; | |
| import { pipeline } from 'stream/promises'; | |
| import type { Firestore } from 'firebase-admin/firestore'; | |
| import { logger } from '../utils/logger'; | |
| import { ENV } from '../config/env'; | |
| // βββ Job Document Types βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| export type JobStatus = | |
| | 'pending' | |
| | 'processing' | |
| | 'transpiling' | |
| | 'auditing' | |
| | 'completed' | |
| | 'failed'; | |
| export interface TranspilerJob { | |
| /** Firestore document ID */ | |
| id: string; | |
| /** gs:// or https:// URL of the uploaded Website.zip in Firebase Storage */ | |
| zipUrl: string; | |
| status: JobStatus; | |
| appName?: string; | |
| createdAt: admin.firestore.Timestamp; | |
| updatedAt?: admin.firestore.Timestamp; | |
| /** Signed download URL for the generated Flutter_Code.zip */ | |
| outputUrl?: string; | |
| /** Human-readable error message when status === 'failed' */ | |
| errorMessage?: string; | |
| /** 0β100 overall progress indicator */ | |
| progress?: number; | |
| /** Per-file Dart confidence scores */ | |
| confidenceScores?: Record<string, number>; | |
| /** Per-file audit pass/fail */ | |
| auditResults?: Record<string, boolean>; | |
| } | |
| export interface JobUpdate { | |
| status?: JobStatus; | |
| outputUrl?: string; | |
| errorMessage?: string; | |
| progress?: number; | |
| confidenceScores?: Record<string, number>; | |
| auditResults?: Record<string, boolean>; | |
| } | |
| // βββ FirebaseManager (Singleton) βββββββββββββββββββββββββββββββββββββββββββββ | |
| export class FirebaseManager { | |
| private static instance: FirebaseManager | null = null; | |
| private readonly db: Firestore; | |
| // eslint-disable-next-line @typescript-eslint/no-explicit-any | |
| private readonly bucket: any; // admin.storage().bucket() β type varies by firebase-admin version | |
| private unsubscribeListener: (() => void) | null = null; | |
| // βββ Initialization ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| private constructor() { | |
| // Guard: only call initializeApp once (important for hot-reload in dev) | |
| if (!admin.apps.length) { | |
| const serviceAccount = JSON.parse(ENV.FIREBASE_SERVICE_ACCOUNT) as admin.ServiceAccount; | |
| admin.initializeApp({ | |
| credential: admin.credential.cert(serviceAccount), | |
| storageBucket: ENV.FIREBASE_STORAGE_BUCKET, | |
| }); | |
| } | |
| this.db = admin.firestore(); | |
| this.bucket = admin.storage().bucket(); | |
| logger.info('FirebaseManager: Admin SDK initialized'); | |
| logger.info(`FirebaseManager: bucket="${ENV.FIREBASE_STORAGE_BUCKET}"`); | |
| } | |
| static getInstance(): FirebaseManager { | |
| if (!FirebaseManager.instance) { | |
| FirebaseManager.instance = new FirebaseManager(); | |
| } | |
| return FirebaseManager.instance; | |
| } | |
| // βββ Firestore Job Operations ββββββββββββββββββββββββββββββββββββββββββββ | |
| /** | |
| * Fetches a single job document by its Firestore document ID. | |
| * Returns null if the document does not exist. | |
| */ | |
| async getJob(jobId: string): Promise<TranspilerJob | null> { | |
| const snap = await this.db | |
| .collection(ENV.FIRESTORE_JOBS_COLLECTION) | |
| .doc(jobId) | |
| .get(); | |
| if (!snap.exists) return null; | |
| return { id: snap.id, ...(snap.data() as Omit<TranspilerJob, 'id'>) }; | |
| } | |
| /** | |
| * Partially updates a job document. Always stamps `updatedAt`. | |
| */ | |
| async updateJob(jobId: string, update: JobUpdate): Promise<void> { | |
| const payload: Record<string, unknown> = { | |
| ...update, | |
| updatedAt: admin.firestore.Timestamp.now(), | |
| }; | |
| await this.db | |
| .collection(ENV.FIRESTORE_JOBS_COLLECTION) | |
| .doc(jobId) | |
| .update(payload); | |
| logger.info( | |
| `FirebaseManager: job ${jobId} updated` + | |
| (update.status ? ` β status="${update.status}"` : '') + | |
| (update.progress !== undefined ? ` progress=${update.progress}%` : '') | |
| ); | |
| } | |
| /** | |
| * Starts a Firestore real-time listener on the jobs collection. | |
| * When a document is added with status === "pending", it: | |
| * 1. Runs a Firestore transaction to atomically claim the job | |
| * (sets status β "processing"), preventing races. | |
| * 2. Calls `onNewJob(job)` asynchronously. | |
| * | |
| * @returns An unsubscribe function that stops the listener. | |
| */ | |
| listenForPendingJobs( | |
| onNewJob: (job: TranspilerJob) => Promise<void> | |
| ): () => void { | |
| const col = this.db.collection(ENV.FIRESTORE_JOBS_COLLECTION); | |
| const query = col | |
| .where('status', '==', 'pending') | |
| .orderBy('createdAt', 'asc'); | |
| const unsubscribe = query.onSnapshot( | |
| async (snapshot) => { | |
| for (const change of snapshot.docChanges()) { | |
| // We only care about documents that just became 'pending' | |
| if (change.type !== 'added') continue; | |
| const docRef = change.doc.ref; | |
| const rawData = change.doc.data() as Omit<TranspilerJob, 'id'>; | |
| const job: TranspilerJob = { id: change.doc.id, ...rawData }; | |
| logger.info(`FirebaseManager: new pending job β id=${job.id}`); | |
| // ββ Atomic claim via transaction βββββββββββββββββββββββββββββββββ | |
| let claimed = false; | |
| try { | |
| await this.db.runTransaction(async (tx) => { | |
| const freshSnap = await tx.get(docRef); | |
| if (!freshSnap.exists) throw new Error('Job document disappeared'); | |
| const freshStatus = (freshSnap.data() as Omit<TranspilerJob, 'id'>).status; | |
| if (freshStatus !== 'pending') { | |
| // Another worker claimed it first β abort silently | |
| throw new Error(`Job ${job.id} already claimed (status="${freshStatus}")`); | |
| } | |
| tx.update(docRef, { | |
| status: 'processing', | |
| updatedAt: admin.firestore.Timestamp.now(), | |
| }); | |
| }); | |
| claimed = true; | |
| } catch (txErr) { | |
| logger.warn(`FirebaseManager: job ${job.id} claim failed β ${String(txErr)}`); | |
| } | |
| if (!claimed) continue; | |
| // ββ Dispatch to orchestrator (non-blocking) ββββββββββββββββββββββ | |
| onNewJob(job).catch((err: unknown) => { | |
| logger.error(`FirebaseManager: uncaught error in onNewJob for ${job.id}`, err); | |
| }); | |
| } | |
| }, | |
| (err) => { | |
| logger.error('FirebaseManager: Firestore listener error', err); | |
| } | |
| ); | |
| this.unsubscribeListener = unsubscribe; | |
| logger.info( | |
| `FirebaseManager: listening on collection "${ENV.FIRESTORE_JOBS_COLLECTION}"` | |
| ); | |
| return unsubscribe; | |
| } | |
| /** Stops the active Firestore listener, if any. */ | |
| stopListening(): void { | |
| this.unsubscribeListener?.(); | |
| this.unsubscribeListener = null; | |
| logger.info('FirebaseManager: listener stopped'); | |
| } | |
| // βββ Queue Operations (titan_builder_queue) ββββββββββββββββββββββββββββββ | |
| // | |
| // These methods target titan_builder_queue exclusively. | |
| // No existing method above is touched or modified. | |
| /** | |
| * Writes a partial update to a document in `titan_builder_queue`. | |
| * Uses set({ merge: true }) so it is safe even if the doc is brand-new, | |
| * and always stamps updatedAt. | |
| */ | |
| async updateQueueDoc(docId: string, update: QueueDocUpdate): Promise<void> { | |
| const payload: Record<string, unknown> = { | |
| ...update, | |
| updatedAt: admin.firestore.Timestamp.now(), | |
| }; | |
| await this.db | |
| .collection(QUEUE_COLLECTION) | |
| .doc(docId) | |
| .set(payload, { merge: true }); | |
| logger.info( | |
| `FirebaseManager[queue]: ${docId} updated` + | |
| (update.status ? ` β "${update.status}"` : '') | |
| ); | |
| } | |
| /** | |
| * Creates a seed document in `titan_transpiler_jobs` via .set(). | |
| * | |
| * WHY THIS EXISTS: | |
| * updateJob() uses .update() which throws NOT_FOUND if the doc is absent. | |
| * JobQueue calls this BEFORE LLMOrchestrator.processJob() so that the | |
| * orchestrators internal updateJob() calls have a real document to land on. | |
| */ | |
| async createTranspilerJobDoc(job: TranspilerJob): Promise<void> { | |
| const { id, ...data } = job; | |
| await this.db | |
| .collection(ENV.FIRESTORE_JOBS_COLLECTION) | |
| .doc(id) | |
| .set({ | |
| ...data, | |
| createdAt: data.createdAt ?? admin.firestore.Timestamp.now(), | |
| updatedAt: admin.firestore.Timestamp.now(), | |
| }); | |
| logger.info(`FirebaseManager: transpiler job seeded β id=${id}`); | |
| } | |
| /** | |
| * Downloads a Firebase Storage object entirely into a Node.js Buffer. | |
| * | |
| * Use-case: the build queue handler needs the reviewed Dart ZIP as bytes | |
| * to pass directly to GitHubActionsService.triggerBuild(). | |
| * | |
| * Memory note: keep payloads under ~50 MB β fully buffered in RAM. | |
| */ | |
| async downloadToBuffer(storageUrl: string): Promise<Buffer> { | |
| const storagePath = this.parseStoragePath(storageUrl); | |
| logger.info(`FirebaseManager: downloading to buffer β "${storagePath}"`); | |
| const [contents] = await this.bucket.file(storagePath).download(); | |
| const buf = Buffer.isBuffer(contents) ? contents : Buffer.from(contents as Uint8Array); | |
| logger.info(`FirebaseManager: buffer ready β ${buf.length} bytes`); | |
| return buf; | |
| } | |
| // βββ Firebase Storage Operations βββββββββββββββββββββββββββββββββββββββββ | |
| /** | |
| * Downloads a file from Firebase Storage to a local path. | |
| * Accepts gs://, public storage URLs, or firebasestorage.googleapis.com URLs. | |
| */ | |
| async downloadFile(storageUrl: string, localPath: string): Promise<void> { | |
| const storagePath = this.parseStoragePath(storageUrl); | |
| logger.info(`FirebaseManager: downloading "${storagePath}" β "${localPath}"`); | |
| await this.bucket.file(storagePath).download({ destination: localPath }); | |
| logger.info(`FirebaseManager: download complete`); | |
| } | |
| /** | |
| * Downloads a file from any HTTPS URL (HF Dataset, etc.) to a local path. | |
| * Used when zipUrl is an HF Dataset URL instead of a Firebase Storage URL. | |
| */ | |
| async downloadFileFromUrl(url: string, localPath: string): Promise<void> { | |
| // If it's a Firebase Storage URL, use the old method | |
| if (url.startsWith('gs://') || url.includes('firebasestorage') || url.includes('storage.googleapis')) { | |
| return this.downloadFile(url, localPath); | |
| } | |
| // Otherwise download via HTTP (HF Dataset, file.io, etc.) | |
| logger.info(`FirebaseManager: downloading "${url}" β "${localPath}"`); | |
| const response = await axios.get<NodeJS.ReadableStream>(url, { | |
| responseType: 'stream', | |
| timeout: 120_000, | |
| }); | |
| const writer = fs.createWriteStream(localPath); | |
| await pipeline(response.data, writer); | |
| logger.info(`FirebaseManager: download complete`); | |
| } | |
| /** | |
| * Uploads a local file to Firebase Storage. | |
| * @returns A signed URL valid for 7 days. | |
| */ | |
| async uploadFile( | |
| localPath: string, | |
| destinationPath: string, | |
| contentType = 'application/zip' | |
| ): Promise<string> { | |
| logger.info(`FirebaseManager: uploading "${localPath}" β "${destinationPath}"`); | |
| await this.bucket.upload(localPath, { | |
| destination: destinationPath, | |
| metadata: { contentType }, | |
| }); | |
| return this.getSignedUrl(destinationPath); | |
| } | |
| /** | |
| * Uploads a Buffer directly to Firebase Storage without touching disk. | |
| * @returns A signed URL valid for 7 days. | |
| */ | |
| async uploadBuffer( | |
| buffer: Buffer, | |
| destinationPath: string, | |
| contentType = 'application/zip' | |
| ): Promise<string> { | |
| logger.info(`FirebaseManager: uploading buffer (${buffer.length} bytes) β "${destinationPath}"`); | |
| const file = this.bucket.file(destinationPath); | |
| await new Promise<void>((resolve, reject) => { | |
| const stream = file.createWriteStream({ metadata: { contentType } }); | |
| stream.on('error', reject); | |
| stream.on('finish', resolve); | |
| stream.end(buffer); | |
| }); | |
| return this.getSignedUrl(destinationPath); | |
| } | |
| // βββ Private Helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| /** Generates a 7-day signed download URL for a storage path. */ | |
| private async getSignedUrl(storagePath: string): Promise<string> { | |
| const [url] = await this.bucket.file(storagePath).getSignedUrl({ | |
| action: 'read', | |
| expires: Date.now() + 7 * 24 * 60 * 60 * 1000, // 7 days | |
| }); | |
| return url; | |
| } | |
| /** | |
| * Normalises various Firebase Storage URL formats to a raw storage path. | |
| * | |
| * Handles: | |
| * gs://bucket-name/path/to/file.zip | |
| * https://storage.googleapis.com/bucket/path/to/file.zip | |
| * https://firebasestorage.googleapis.com/v0/b/bucket/o/path%2Fto%2Ffile.zip?token=β¦ | |
| */ | |
| private parseStoragePath(url: string): string { | |
| // gs:// protocol | |
| if (url.startsWith('gs://')) { | |
| const withoutScheme = url.slice(5); // strip "gs://" | |
| const firstSlash = withoutScheme.indexOf('/'); | |
| return firstSlash === -1 ? withoutScheme : withoutScheme.slice(firstSlash + 1); | |
| } | |
| // Public Google Storage CDN URL | |
| if (url.includes('storage.googleapis.com/')) { | |
| const u = new URL(url); | |
| // pathname = /bucket-name/path/to/file β drop first two segments | |
| const parts = u.pathname.split('/').filter(Boolean); | |
| return parts.slice(1).join('/'); | |
| } | |
| // Firebase Storage REST URL | |
| if (url.includes('firebasestorage.googleapis.com')) { | |
| const u = new URL(url); | |
| const oSegment = u.pathname.split('/o/')[1]; | |
| if (oSegment) return decodeURIComponent(oSegment.split('?')[0]); | |
| } | |
| // Fallback: treat as a raw storage path | |
| return url; | |
| } | |
| } | |
| // βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| // QUEUE TYPES (added for titan_builder_queue β existing types unchanged) | |
| // βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| /** The Firestore collection that acts as the job message broker. */ | |
| export const QUEUE_COLLECTION = 'titan_builder_queue'; | |
| export type QueueStatus = | |
| | 'pending' | |
| | 'processing' | |
| | 'building' | |
| | 'completed' | |
| | 'failed'; | |
| export type QueueJobType = 'transpile' | 'build' | 'webview'; | |
| /** | |
| * Shape of a document in `titan_builder_queue`. | |
| * Clients (mobile app, web dashboard) write this; the worker reads it. | |
| */ | |
| export interface QueueDoc { | |
| id: string; | |
| type: QueueJobType; | |
| status: QueueStatus; | |
| // ββ 'transpile' fields βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| /** HF Dataset URL or HTTPS URL pointing to the ZIP */ | |
| zipUrl?: string; | |
| appName?: string; | |
| // ββ 'build' fields βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| /** Flutter ZIP as base64 string */ | |
| zipBase64?: string; | |
| // ββ 'webview' fields βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| url?: string; | |
| iconUrl?: string; | |
| iconBase64?: string; | |
| iconMime?: string; | |
| enableCamera?: boolean; | |
| enableStorage?: boolean; | |
| enableLocation?: boolean; | |
| enableNotifications?: boolean; | |
| buildType?: string; | |
| manifestUrl?: string; | |
| templateType?: string; | |
| // ββ Result fields (set by the worker) ββββββββββββββββββββββββββββββββββ | |
| /** HF Dataset public download URL for the output ZIP */ | |
| resultZipUrl?: string; | |
| errorMessage?: string; | |
| createdAt: admin.firestore.Timestamp; | |
| updatedAt?: admin.firestore.Timestamp; | |
| } | |
| export interface QueueDocUpdate { | |
| status?: QueueStatus; | |
| resultZipUrl?: string; | |
| errorMessage?: string; | |
| } | |