titan-server / src /services /JobQueue.ts
m-h2's picture
Upload 2 files
a69dd77 verified
Raw
History Blame Contribute Delete
13.3 kB
/**
* src/services/JobQueue.ts
*
* Job routing layer for titan_builder_queue.
*
* Changes from v2:
* - handleWebView: ูŠู‚ุฑุฃ buildType + manifestUrl ู…ู† Firestore
* ูˆูŠู…ุฑุฑู‡ู… ู„ู€ triggerWebViewBuild
* - ู„ูˆ buildType = pwa_twa ุฃูˆ twa_native ูŠุณุฌู„ ุฐู„ูƒ ููŠ ุงู„ู„ูˆุฌ
*/
import admin from 'firebase-admin';
import { logger } from '../utils/logger';
import {
FirebaseManager,
QUEUE_COLLECTION,
type QueueDoc,
type TranspilerJob,
} from '../managers/FirebaseManager';
import { LLMOrchestrator } from './llmOrchestrator';
import { GitHubActionsService } from './githubActions';
import { hfStorage } from './hfStorage';
// โ”€โ”€โ”€ Job Queue Service โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
export class JobQueue {
private readonly firebase: FirebaseManager;
private readonly orchestrator: LLMOrchestrator;
private readonly github: GitHubActionsService;
private readonly inFlight = new Set<string>();
private stopFn: (() => void) | null = null;
constructor() {
this.firebase = FirebaseManager.getInstance();
this.orchestrator = new LLMOrchestrator();
this.github = new GitHubActionsService();
}
// โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
// Lifecycle
// โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
start(): () => void {
const db = (this.firebase as unknown as { db: admin.firestore.Firestore }).db
?? admin.firestore();
const query = db
.collection(QUEUE_COLLECTION)
.where('status', '==', 'pending')
.orderBy('createdAt', 'asc');
const unsubscribe = query.onSnapshot(
async (snapshot) => {
for (const change of snapshot.docChanges()) {
if (change.type !== 'added' && change.type !== 'modified') continue;
const doc = change.doc;
const data = doc.data() as Omit<QueueDoc, 'id'>;
if (data.status !== 'pending') continue;
const job: QueueDoc = { id: doc.id, ...data };
if (this.inFlight.has(job.id)) continue;
const claimed = await this.claimJob(doc.ref);
if (!claimed) continue;
this.inFlight.add(job.id);
logger.info(`JobQueue: claimed job ${job.id} (type="${job.type}")`);
this.dispatch(job)
.catch(err =>
logger.error(`JobQueue: unhandled error for job ${job.id}`, err)
)
.finally(() => this.inFlight.delete(job.id));
}
},
(err) => {
logger.error('JobQueue: Firestore listener error', err);
}
);
this.stopFn = unsubscribe;
logger.info(`JobQueue: โœ… listening on "${QUEUE_COLLECTION}"`);
return unsubscribe;
}
stop(): void {
this.stopFn?.();
this.stopFn = null;
logger.info('JobQueue: listener stopped');
}
// โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
// Distributed Claim
// โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
private async claimJob(
docRef: admin.firestore.DocumentReference
): Promise<boolean> {
try {
await admin.firestore().runTransaction(async (tx) => {
const snap = await tx.get(docRef);
if (!snap.exists) throw new Error('Job disappeared');
const status = (snap.data() as Omit<QueueDoc, 'id'>).status;
if (status !== 'pending') throw new Error(`Already claimed: ${status}`);
tx.update(docRef, {
status: 'processing',
updatedAt: admin.firestore.Timestamp.now(),
});
});
return true;
} catch {
return false;
}
}
// โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
// Dispatch
// โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
private async dispatch(job: QueueDoc): Promise<void> {
try {
switch (job.type) {
case 'transpile': await this.handleTranspile(job); break;
case 'build': await this.handleBuild(job); break;
case 'webview': await this.handleWebView(job); break;
default: {
const unknown = (job as QueueDoc).type;
logger.warn(`JobQueue: unknown type "${unknown}" for job ${job.id}`);
await this.firebase.updateQueueDoc(job.id, {
status: 'failed',
errorMessage: `Unknown job type: "${unknown}"`,
});
}
}
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
logger.error(`JobQueue: top-level dispatch error for job ${job.id} โ€” ${msg}`);
await this.firebase.updateQueueDoc(job.id, {
status: 'failed',
errorMessage: msg,
}).catch(() => undefined);
}
}
// โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
// Handler: 'transpile'
// โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
private async handleTranspile(job: QueueDoc): Promise<void> {
logger.info(`JobQueue[transpile]: starting job ${job.id}`);
if (!job.zipUrl) {
await this.firebase.updateQueueDoc(job.id, {
status: 'failed',
errorMessage: 'transpile job is missing required field: zipUrl',
});
return;
}
const transpilerJob: TranspilerJob = {
id: job.id,
zipUrl: job.zipUrl,
appName: job.appName,
status: 'processing',
createdAt: job.createdAt ?? admin.firestore.Timestamp.now(),
};
await this.firebase.createTranspilerJobDoc(transpilerJob);
await this.orchestrator.processJob(transpilerJob);
const result = await this.firebase.getJob(job.id);
if (result?.status === 'completed' && result.outputUrl) {
await this.firebase.updateQueueDoc(job.id, {
status: 'completed',
resultZipUrl: result.outputUrl,
});
logger.info(`JobQueue[transpile]: โœ… completed โ€” job ${job.id}`);
} else {
const reason = result?.errorMessage ?? 'Pipeline ended without an output URL';
await this.firebase.updateQueueDoc(job.id, {
status: 'failed',
errorMessage: reason,
});
logger.warn(`JobQueue[transpile]: โŒ failed โ€” job ${job.id}: ${reason}`);
}
}
// โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
// Handler: 'build'
// โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
/**
* The ZIP arrives as a base64 string in job.zipBase64.
* No Firebase Storage needed.
*/
private async handleBuild(job: QueueDoc): Promise<void> {
logger.info(`JobQueue[build]: starting job ${job.id}`);
if (!job.zipBase64) {
await this.firebase.updateQueueDoc(job.id, {
status: 'failed',
errorMessage: 'build job is missing required field: zipBase64',
});
return;
}
try {
const buffer = Buffer.from(job.zipBase64, 'base64');
await this.github.triggerBuild(buffer, job.id);
await this.firebase.updateQueueDoc(job.id, { status: 'building' });
logger.info(`JobQueue[build]: โœ… GitHub build triggered โ€” job ${job.id}`);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
logger.error(`JobQueue[build]: โŒ failed โ€” job ${job.id}: ${msg}`);
await this.firebase.updateQueueDoc(job.id, {
status: 'failed',
errorMessage: msg,
});
}
}
// โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
// Handler: 'webview'
// โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
/**
* Handles webview / pwa_twa / twa_native jobs.
*
* Changes v3:
* - ูŠู‚ุฑุฃ buildType + manifestUrl ู…ู† ุงู„ู€ Firestore doc
* - ูŠุณุฌู„ ููŠ ุงู„ู„ูˆุฌ ู„ูˆ ูƒุงู† TWA build
* - ูŠู…ุฑุฑ buildType + manifestUrl ู„ู€ triggerWebViewBuild
*/
private async handleWebView(job: QueueDoc): Promise<void> {
logger.info(`JobQueue[webview]: starting job ${job.id}`);
if (!job.url || !job.appName) {
await this.firebase.updateQueueDoc(job.id, {
status: 'failed',
errorMessage: 'webview job is missing required fields: url, appName',
});
return;
}
// โ”€โ”€ ู‚ุฑุงุกุฉ buildType + manifestUrl โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
const buildType = job.buildType ?? 'webview';
const manifestUrl = job.manifestUrl ?? '';
// โ”€โ”€ ุชุณุฌูŠู„ ุฎุงุต ู„ู„ู€ TWA builds โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
if (buildType === 'pwa_twa' || buildType === 'twa_native') {
logger.info(
`JobQueue[webview]: ๐Ÿ”ท TWA build detected โ€” ` +
`buildType="${buildType}", manifestUrl="${manifestUrl}", job=${job.id}`
);
}
try {
// โ”€โ”€ ุฑูุน ุงู„ุฃูŠู‚ูˆู†ุฉ ุนู„ู‰ HF Dataset ู„ูˆ ู…ูˆุฌูˆุฏุฉ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
let finalIconUrl = job.iconUrl || '';
if (job.iconBase64 && job.iconMime) {
logger.info(`JobQueue[webview]: uploading icon for job ${job.id}`);
const ext = job.iconMime.split('/')[1] || 'png';
const filename = `icons/${job.id}/icon.${ext}`;
const buf = Buffer.from(job.iconBase64, 'base64');
finalIconUrl = await hfStorage.uploadBuffer(buf, filename, job.iconMime);
logger.info(`JobQueue[webview]: icon uploaded โ†’ ${finalIconUrl}`);
} else if (job.iconUrl && job.iconUrl.startsWith('http')) {
try {
finalIconUrl = await hfStorage.uploadIconFromUrl(job.iconUrl, job.id);
} catch {
logger.warn(`JobQueue[webview]: icon re-upload failed, using original URL`);
finalIconUrl = job.iconUrl;
}
}
// โ”€โ”€ Trigger GitHub Actions โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
await this.github.triggerWebViewBuild({
url: job.url,
appName: job.appName,
iconUrl: finalIconUrl,
trackingId: job.id,
enableCamera: job.enableCamera ?? true,
enableStorage: job.enableStorage ?? true,
enableLocation: job.enableLocation ?? false,
enableNotifications: job.enableNotifications ?? false,
buildType, // โ† ุฌุฏูŠุฏ: ูŠูู…ุฑูŽู‘ุฑ ู…ู† Firestore
manifestUrl, // โ† ุฌุฏูŠุฏ: ูŠูู…ุฑูŽู‘ุฑ ู…ู† Firestore
});
await this.firebase.updateQueueDoc(job.id, { status: 'building' });
logger.info(
`JobQueue[webview]: โœ… GitHub ${buildType} build triggered โ€” job ${job.id}`
);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
logger.error(`JobQueue[webview]: โŒ failed โ€” job ${job.id}: ${msg}`);
await this.firebase.updateQueueDoc(job.id, {
status: 'failed',
errorMessage: msg,
});
}
}
}