starry / backend /omr-service /src /services /task.service.ts
k-l-lambda's picture
Initial deployment: frontend + omr-service + cluster-server + nginx proxy
6f1c297
import { query } from '../db/client.js';
export type TaskStatus = 'pending' | 'running' | 'completed' | 'failed';
export type TaskType = 'predict_page' | 'predict_all' | 'predict_custom';
export interface Task {
id: string;
score_id: string;
type: TaskType;
status: TaskStatus;
progress: number;
current_step: string | null;
result: Record<string, any> | null;
error: string | null;
created_at: Date;
started_at: Date | null;
completed_at: Date | null;
}
export interface CreateTaskInput {
score_id: string;
type: TaskType;
}
export async function createTask(input: CreateTaskInput): Promise<Task> {
const { rows } = await query<Task>(
`INSERT INTO tasks (score_id, type)
VALUES ($1, $2)
RETURNING *`,
[input.score_id, input.type]
);
return rows[0];
}
export async function getTask(id: string): Promise<Task | null> {
const { rows } = await query<Task>('SELECT * FROM tasks WHERE id = $1', [id]);
return rows[0] || null;
}
export async function listTasks(scoreId?: string, status?: TaskStatus): Promise<Task[]> {
let sql = 'SELECT * FROM tasks';
const conditions: string[] = [];
const values: any[] = [];
let paramIndex = 1;
if (scoreId) {
conditions.push(`score_id = $${paramIndex++}`);
values.push(scoreId);
}
if (status) {
conditions.push(`status = $${paramIndex++}`);
values.push(status);
}
if (conditions.length > 0) {
sql += ' WHERE ' + conditions.join(' AND ');
}
sql += ' ORDER BY created_at DESC';
const { rows } = await query<Task>(sql, values);
return rows;
}
export async function updateTaskStatus(id: string, status: TaskStatus, progress?: number, currentStep?: string): Promise<Task | null> {
const updates: string[] = ['status = $2'];
const values: any[] = [id, status];
let paramIndex = 3;
if (progress !== undefined) {
updates.push(`progress = $${paramIndex++}`);
values.push(progress);
}
if (currentStep !== undefined) {
updates.push(`current_step = $${paramIndex++}`);
values.push(currentStep);
}
if (status === 'running') {
updates.push('started_at = NOW()');
} else if (status === 'completed' || status === 'failed') {
updates.push('completed_at = NOW()');
}
const { rows } = await query<Task>(`UPDATE tasks SET ${updates.join(', ')} WHERE id = $1 RETURNING *`, values);
return rows[0] || null;
}
export async function setTaskResult(id: string, result: Record<string, any>): Promise<Task | null> {
const { rows } = await query<Task>(
`UPDATE tasks SET result = $2, status = 'completed', progress = 100, completed_at = NOW()
WHERE id = $1 RETURNING *`,
[id, JSON.stringify(result)]
);
return rows[0] || null;
}
export async function setTaskError(id: string, error: string): Promise<Task | null> {
const { rows } = await query<Task>(
`UPDATE tasks SET error = $2, status = 'failed', completed_at = NOW()
WHERE id = $1 RETURNING *`,
[id, error]
);
return rows[0] || null;
}
export async function getPendingTasks(limit = 10): Promise<Task[]> {
const { rows } = await query<Task>(`SELECT * FROM tasks WHERE status = 'pending' ORDER BY created_at LIMIT $1`, [limit]);
return rows;
}
export async function claimTask(id: string): Promise<Task | null> {
// Atomic update to prevent race conditions
const { rows } = await query<Task>(
`UPDATE tasks SET status = 'running', started_at = NOW()
WHERE id = $1 AND status = 'pending'
RETURNING *`,
[id]
);
return rows[0] || null;
}