titan-server / src /managers /FirebaseManager.ts
m-h2's picture
Upload FirebaseManager.ts
03194b1 verified
Raw
History Blame Contribute Delete
17.1 kB
/**
* src/managers/FirebaseManager.ts
*
* Singleton wrapper around the Firebase Admin SDK.
* Responsibilities:
* - Initialize the Admin SDK once from env (service account JSON string)
* - Real-time Firestore listener for `titan_transpiler_jobs`
* - Atomic job claiming via Firestore transactions (prevents duplicate processing
* if multiple workers are ever running)
* - Firebase Storage upload / download helpers
*/
import admin from 'firebase-admin';
import axios from 'axios';
import fs from 'fs';
import { pipeline } from 'stream/promises';
import type { Firestore } from 'firebase-admin/firestore';
import { logger } from '../utils/logger';
import { ENV } from '../config/env';
// ─── Job Document Types ───────────────────────────────────────────────────────
export type JobStatus =
| 'pending'
| 'processing'
| 'transpiling'
| 'auditing'
| 'completed'
| 'failed';
export interface TranspilerJob {
/** Firestore document ID */
id: string;
/** gs:// or https:// URL of the uploaded Website.zip in Firebase Storage */
zipUrl: string;
status: JobStatus;
appName?: string;
createdAt: admin.firestore.Timestamp;
updatedAt?: admin.firestore.Timestamp;
/** Signed download URL for the generated Flutter_Code.zip */
outputUrl?: string;
/** Human-readable error message when status === 'failed' */
errorMessage?: string;
/** 0–100 overall progress indicator */
progress?: number;
/** Per-file Dart confidence scores */
confidenceScores?: Record<string, number>;
/** Per-file audit pass/fail */
auditResults?: Record<string, boolean>;
}
export interface JobUpdate {
status?: JobStatus;
outputUrl?: string;
errorMessage?: string;
progress?: number;
confidenceScores?: Record<string, number>;
auditResults?: Record<string, boolean>;
}
// ─── FirebaseManager (Singleton) ─────────────────────────────────────────────
export class FirebaseManager {
private static instance: FirebaseManager | null = null;
private readonly db: Firestore;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private readonly bucket: any; // admin.storage().bucket() β€” type varies by firebase-admin version
private unsubscribeListener: (() => void) | null = null;
// ─── Initialization ──────────────────────────────────────────────────────
private constructor() {
// Guard: only call initializeApp once (important for hot-reload in dev)
if (!admin.apps.length) {
const serviceAccount = JSON.parse(ENV.FIREBASE_SERVICE_ACCOUNT) as admin.ServiceAccount;
admin.initializeApp({
credential: admin.credential.cert(serviceAccount),
storageBucket: ENV.FIREBASE_STORAGE_BUCKET,
});
}
this.db = admin.firestore();
this.bucket = admin.storage().bucket();
logger.info('FirebaseManager: Admin SDK initialized');
logger.info(`FirebaseManager: bucket="${ENV.FIREBASE_STORAGE_BUCKET}"`);
}
static getInstance(): FirebaseManager {
if (!FirebaseManager.instance) {
FirebaseManager.instance = new FirebaseManager();
}
return FirebaseManager.instance;
}
// ─── Firestore Job Operations ────────────────────────────────────────────
/**
* Fetches a single job document by its Firestore document ID.
* Returns null if the document does not exist.
*/
async getJob(jobId: string): Promise<TranspilerJob | null> {
const snap = await this.db
.collection(ENV.FIRESTORE_JOBS_COLLECTION)
.doc(jobId)
.get();
if (!snap.exists) return null;
return { id: snap.id, ...(snap.data() as Omit<TranspilerJob, 'id'>) };
}
/**
* Partially updates a job document. Always stamps `updatedAt`.
*/
async updateJob(jobId: string, update: JobUpdate): Promise<void> {
const payload: Record<string, unknown> = {
...update,
updatedAt: admin.firestore.Timestamp.now(),
};
await this.db
.collection(ENV.FIRESTORE_JOBS_COLLECTION)
.doc(jobId)
.update(payload);
logger.info(
`FirebaseManager: job ${jobId} updated` +
(update.status ? ` β†’ status="${update.status}"` : '') +
(update.progress !== undefined ? ` progress=${update.progress}%` : '')
);
}
/**
* Starts a Firestore real-time listener on the jobs collection.
* When a document is added with status === "pending", it:
* 1. Runs a Firestore transaction to atomically claim the job
* (sets status β†’ "processing"), preventing races.
* 2. Calls `onNewJob(job)` asynchronously.
*
* @returns An unsubscribe function that stops the listener.
*/
listenForPendingJobs(
onNewJob: (job: TranspilerJob) => Promise<void>
): () => void {
const col = this.db.collection(ENV.FIRESTORE_JOBS_COLLECTION);
const query = col
.where('status', '==', 'pending')
.orderBy('createdAt', 'asc');
const unsubscribe = query.onSnapshot(
async (snapshot) => {
for (const change of snapshot.docChanges()) {
// We only care about documents that just became 'pending'
if (change.type !== 'added') continue;
const docRef = change.doc.ref;
const rawData = change.doc.data() as Omit<TranspilerJob, 'id'>;
const job: TranspilerJob = { id: change.doc.id, ...rawData };
logger.info(`FirebaseManager: new pending job β†’ id=${job.id}`);
// ── Atomic claim via transaction ─────────────────────────────────
let claimed = false;
try {
await this.db.runTransaction(async (tx) => {
const freshSnap = await tx.get(docRef);
if (!freshSnap.exists) throw new Error('Job document disappeared');
const freshStatus = (freshSnap.data() as Omit<TranspilerJob, 'id'>).status;
if (freshStatus !== 'pending') {
// Another worker claimed it first β€” abort silently
throw new Error(`Job ${job.id} already claimed (status="${freshStatus}")`);
}
tx.update(docRef, {
status: 'processing',
updatedAt: admin.firestore.Timestamp.now(),
});
});
claimed = true;
} catch (txErr) {
logger.warn(`FirebaseManager: job ${job.id} claim failed β€” ${String(txErr)}`);
}
if (!claimed) continue;
// ── Dispatch to orchestrator (non-blocking) ──────────────────────
onNewJob(job).catch((err: unknown) => {
logger.error(`FirebaseManager: uncaught error in onNewJob for ${job.id}`, err);
});
}
},
(err) => {
logger.error('FirebaseManager: Firestore listener error', err);
}
);
this.unsubscribeListener = unsubscribe;
logger.info(
`FirebaseManager: listening on collection "${ENV.FIRESTORE_JOBS_COLLECTION}"`
);
return unsubscribe;
}
/** Stops the active Firestore listener, if any. */
stopListening(): void {
this.unsubscribeListener?.();
this.unsubscribeListener = null;
logger.info('FirebaseManager: listener stopped');
}
// ─── Queue Operations (titan_builder_queue) ──────────────────────────────
//
// These methods target titan_builder_queue exclusively.
// No existing method above is touched or modified.
/**
* Writes a partial update to a document in `titan_builder_queue`.
* Uses set({ merge: true }) so it is safe even if the doc is brand-new,
* and always stamps updatedAt.
*/
async updateQueueDoc(docId: string, update: QueueDocUpdate): Promise<void> {
const payload: Record<string, unknown> = {
...update,
updatedAt: admin.firestore.Timestamp.now(),
};
await this.db
.collection(QUEUE_COLLECTION)
.doc(docId)
.set(payload, { merge: true });
logger.info(
`FirebaseManager[queue]: ${docId} updated` +
(update.status ? ` β†’ "${update.status}"` : '')
);
}
/**
* Creates a seed document in `titan_transpiler_jobs` via .set().
*
* WHY THIS EXISTS:
* updateJob() uses .update() which throws NOT_FOUND if the doc is absent.
* JobQueue calls this BEFORE LLMOrchestrator.processJob() so that the
* orchestrators internal updateJob() calls have a real document to land on.
*/
async createTranspilerJobDoc(job: TranspilerJob): Promise<void> {
const { id, ...data } = job;
await this.db
.collection(ENV.FIRESTORE_JOBS_COLLECTION)
.doc(id)
.set({
...data,
createdAt: data.createdAt ?? admin.firestore.Timestamp.now(),
updatedAt: admin.firestore.Timestamp.now(),
});
logger.info(`FirebaseManager: transpiler job seeded β†’ id=${id}`);
}
/**
* Downloads a Firebase Storage object entirely into a Node.js Buffer.
*
* Use-case: the build queue handler needs the reviewed Dart ZIP as bytes
* to pass directly to GitHubActionsService.triggerBuild().
*
* Memory note: keep payloads under ~50 MB β€” fully buffered in RAM.
*/
async downloadToBuffer(storageUrl: string): Promise<Buffer> {
const storagePath = this.parseStoragePath(storageUrl);
logger.info(`FirebaseManager: downloading to buffer β€” "${storagePath}"`);
const [contents] = await this.bucket.file(storagePath).download();
const buf = Buffer.isBuffer(contents) ? contents : Buffer.from(contents as Uint8Array);
logger.info(`FirebaseManager: buffer ready β€” ${buf.length} bytes`);
return buf;
}
// ─── Firebase Storage Operations ─────────────────────────────────────────
/**
* Downloads a file from Firebase Storage to a local path.
* Accepts gs://, public storage URLs, or firebasestorage.googleapis.com URLs.
*/
async downloadFile(storageUrl: string, localPath: string): Promise<void> {
const storagePath = this.parseStoragePath(storageUrl);
logger.info(`FirebaseManager: downloading "${storagePath}" β†’ "${localPath}"`);
await this.bucket.file(storagePath).download({ destination: localPath });
logger.info(`FirebaseManager: download complete`);
}
/**
* Downloads a file from any HTTPS URL (HF Dataset, etc.) to a local path.
* Used when zipUrl is an HF Dataset URL instead of a Firebase Storage URL.
*/
async downloadFileFromUrl(url: string, localPath: string): Promise<void> {
// If it's a Firebase Storage URL, use the old method
if (url.startsWith('gs://') || url.includes('firebasestorage') || url.includes('storage.googleapis')) {
return this.downloadFile(url, localPath);
}
// Otherwise download via HTTP (HF Dataset, file.io, etc.)
logger.info(`FirebaseManager: downloading "${url}" β†’ "${localPath}"`);
const response = await axios.get<NodeJS.ReadableStream>(url, {
responseType: 'stream',
timeout: 120_000,
});
const writer = fs.createWriteStream(localPath);
await pipeline(response.data, writer);
logger.info(`FirebaseManager: download complete`);
}
/**
* Uploads a local file to Firebase Storage.
* @returns A signed URL valid for 7 days.
*/
async uploadFile(
localPath: string,
destinationPath: string,
contentType = 'application/zip'
): Promise<string> {
logger.info(`FirebaseManager: uploading "${localPath}" β†’ "${destinationPath}"`);
await this.bucket.upload(localPath, {
destination: destinationPath,
metadata: { contentType },
});
return this.getSignedUrl(destinationPath);
}
/**
* Uploads a Buffer directly to Firebase Storage without touching disk.
* @returns A signed URL valid for 7 days.
*/
async uploadBuffer(
buffer: Buffer,
destinationPath: string,
contentType = 'application/zip'
): Promise<string> {
logger.info(`FirebaseManager: uploading buffer (${buffer.length} bytes) β†’ "${destinationPath}"`);
const file = this.bucket.file(destinationPath);
await new Promise<void>((resolve, reject) => {
const stream = file.createWriteStream({ metadata: { contentType } });
stream.on('error', reject);
stream.on('finish', resolve);
stream.end(buffer);
});
return this.getSignedUrl(destinationPath);
}
// ─── Private Helpers ─────────────────────────────────────────────────────
/** Generates a 7-day signed download URL for a storage path. */
private async getSignedUrl(storagePath: string): Promise<string> {
const [url] = await this.bucket.file(storagePath).getSignedUrl({
action: 'read',
expires: Date.now() + 7 * 24 * 60 * 60 * 1000, // 7 days
});
return url;
}
/**
* Normalises various Firebase Storage URL formats to a raw storage path.
*
* Handles:
* gs://bucket-name/path/to/file.zip
* https://storage.googleapis.com/bucket/path/to/file.zip
* https://firebasestorage.googleapis.com/v0/b/bucket/o/path%2Fto%2Ffile.zip?token=…
*/
private parseStoragePath(url: string): string {
// gs:// protocol
if (url.startsWith('gs://')) {
const withoutScheme = url.slice(5); // strip "gs://"
const firstSlash = withoutScheme.indexOf('/');
return firstSlash === -1 ? withoutScheme : withoutScheme.slice(firstSlash + 1);
}
// Public Google Storage CDN URL
if (url.includes('storage.googleapis.com/')) {
const u = new URL(url);
// pathname = /bucket-name/path/to/file β€” drop first two segments
const parts = u.pathname.split('/').filter(Boolean);
return parts.slice(1).join('/');
}
// Firebase Storage REST URL
if (url.includes('firebasestorage.googleapis.com')) {
const u = new URL(url);
const oSegment = u.pathname.split('/o/')[1];
if (oSegment) return decodeURIComponent(oSegment.split('?')[0]);
}
// Fallback: treat as a raw storage path
return url;
}
}
// ═══════════════════════════════════════════════════════════════════════════
// QUEUE TYPES (added for titan_builder_queue β€” existing types unchanged)
// ═══════════════════════════════════════════════════════════════════════════
/** The Firestore collection that acts as the job message broker. */
export const QUEUE_COLLECTION = 'titan_builder_queue';
export type QueueStatus =
| 'pending'
| 'processing'
| 'building'
| 'completed'
| 'failed';
export type QueueJobType = 'transpile' | 'build' | 'webview';
/**
* Shape of a document in `titan_builder_queue`.
* Clients (mobile app, web dashboard) write this; the worker reads it.
*/
export interface QueueDoc {
id: string;
type: QueueJobType;
status: QueueStatus;
// ── 'transpile' fields ─────────────────────────────────────────────────
/** HF Dataset URL or HTTPS URL pointing to the ZIP */
zipUrl?: string;
appName?: string;
// ── 'build' fields ─────────────────────────────────────────────────────
/** Flutter ZIP as base64 string */
zipBase64?: string;
// ── 'webview' fields ───────────────────────────────────────────────────
url?: string;
iconUrl?: string;
iconBase64?: string;
iconMime?: string;
enableCamera?: boolean;
enableStorage?: boolean;
enableLocation?: boolean;
enableNotifications?: boolean;
buildType?: string;
manifestUrl?: string;
templateType?: string;
// ── Result fields (set by the worker) ──────────────────────────────────
/** HF Dataset public download URL for the output ZIP */
resultZipUrl?: string;
errorMessage?: string;
createdAt: admin.firestore.Timestamp;
updatedAt?: admin.firestore.Timestamp;
}
export interface QueueDocUpdate {
status?: QueueStatus;
resultZipUrl?: string;
errorMessage?: string;
}