import { query } from '../db/client.js';
/** Values a task's `status` field may hold. */
export type TaskStatus =
  | 'pending'
  | 'running'
  | 'completed'
  | 'failed';
/** Values a task's `type` field may hold. */
export type TaskType =
  | 'predict_page'
  | 'predict_all'
  | 'predict_custom';
/**
 * Shape of a row read from the `tasks` table, as returned by the query
 * helpers in this module.
 */
export interface Task {
  /** Row identifier; used as the lookup key by the helpers in this module. */
  id: string;
  /** Identifier supplied at creation — presumably the score this task works on (confirm against schema). */
  score_id: string;
  /** Which kind of task this is. */
  type: TaskType;

  /** Current status of the task. */
  status: TaskStatus;
  /** Progress value; `setTaskResult` writes 100 when it stores a result. */
  progress: number;
  /** Optional label of the step in progress, or `null`. */
  current_step: string | null;

  /** Result payload stored on completion, or `null`. */
  result: Record<string, any> | null;
  /** Error message stored on failure, or `null`. */
  error: string | null;

  /** Creation timestamp; the list queries order by this column. */
  created_at: Date;
  /** Set when the task is moved to `'running'`, otherwise `null`. */
  started_at: Date | null;
  /** Set when the task is moved to `'completed'` or `'failed'`, otherwise `null`. */
  completed_at: Date | null;
}
/** Caller-supplied values needed by `createTask` to insert a new task row. */
export interface CreateTaskInput {
  /** Value written to the new row's `score_id` column. */
  score_id: string;
  /** Value written to the new row's `type` column. */
  type: TaskType;
}
/**
 * Inserts a new row into `tasks` and returns it.
 *
 * Only `score_id` and `type` are written explicitly; all remaining columns
 * are left to whatever the table definition provides.
 *
 * @param input - The score identifier and task type for the new row.
 * @returns The inserted row as produced by `RETURNING *`.
 * @throws Error when the statement succeeds but yields no row, so callers
 *   never receive `undefined` behind a `Task` type.
 */
export async function createTask(input: CreateTaskInput): Promise<Task> {
  // Joined with '\n' so the statement text is the same three-line SQL as before.
  const sql = [
    'INSERT INTO tasks (score_id, type)',
    'VALUES ($1, $2)',
    'RETURNING *',
  ].join('\n');

  const { rows } = await query<Task>(sql, [input.score_id, input.type]);

  const created = rows[0];
  if (!created) {
    // An INSERT can complete without returning a row (for example if the
    // database suppresses it); fail loudly here instead of handing the
    // caller `undefined` typed as `Task`.
    throw new Error('createTask: INSERT INTO tasks returned no row');
  }
  return created;
}
/**
 * Fetches a single task by its id.
 *
 * @param id - Value matched against the `id` column.
 * @returns The first matching row, or `null` when the query returns none.
 */
export async function getTask(id: string): Promise<Task | null> {
  const result = await query<Task>('SELECT * FROM tasks WHERE id = $1', [id]);
  const [task] = result.rows;
  if (task) {
    return task;
  }
  return null;
}
/**
 * Lists tasks, newest first (`ORDER BY created_at DESC`), optionally
 * narrowed by score and/or status.
 *
 * A filter is applied only when its argument is truthy, so `undefined` and
 * the empty string both mean "do not filter on this column".
 *
 * @param scoreId - When given, only rows with this `score_id` are returned.
 * @param status - When given, only rows with this `status` are returned.
 * @returns All matching rows.
 */
export async function listTasks(scoreId?: string, status?: TaskStatus): Promise<Task[]> {
  const params: string[] = [];
  const filters: string[] = [];

  // Registers one equality filter; the placeholder number is the value's
  // 1-based position in `params`.
  const addFilter = (column: string, value: string): void => {
    params.push(value);
    filters.push(`${column} = $${params.length}`);
  };

  if (scoreId) {
    addFilter('score_id', scoreId);
  }
  if (status) {
    addFilter('status', status);
  }

  const whereClause = filters.length > 0 ? ' WHERE ' + filters.join(' AND ') : '';
  const sql = 'SELECT * FROM tasks' + whereClause + ' ORDER BY created_at DESC';

  const result = await query<Task>(sql, params);
  return result.rows;
}
/**
 * Updates a task's status and, optionally, its progress and current step.
 *
 * Timestamp handling depends on the new status:
 * - `'running'`: `started_at` is set only if it is still NULL, so repeated
 *   progress updates while running do not overwrite the original start time.
 * - `'completed'` / `'failed'`: `completed_at` is set to the current time.
 * - `'pending'`: no timestamp column is touched.
 *
 * @param id - Id of the task row to update.
 * @param status - New status value.
 * @param progress - New progress value; the column is left unchanged when omitted.
 * @param currentStep - New step label; the column is left unchanged when omitted.
 * @returns The updated row, or `null` when no row has the given id.
 */
export async function updateTaskStatus(id: string, status: TaskStatus, progress?: number, currentStep?: string): Promise<Task | null> {
  // $1 is the id (used in WHERE) and $2 the status; optional columns take
  // the next free placeholder in the order they are appended.
  const values: Array<string | number> = [id, status];
  const assignments: string[] = ['status = $2'];

  if (progress !== undefined) {
    values.push(progress);
    assignments.push(`progress = $${values.length}`);
  }
  if (currentStep !== undefined) {
    values.push(currentStep);
    assignments.push(`current_step = $${values.length}`);
  }

  if (status === 'running') {
    // COALESCE keeps an existing start time: this function is called with
    // 'running' for every progress update, and a plain NOW() would reset
    // started_at each time.
    assignments.push('started_at = COALESCE(started_at, NOW())');
  } else if (status === 'completed' || status === 'failed') {
    assignments.push('completed_at = NOW()');
  }

  const { rows } = await query<Task>(
    `UPDATE tasks SET ${assignments.join(', ')} WHERE id = $1 RETURNING *`,
    values
  );
  return rows[0] || null;
}
/**
 * Stores a task's result and marks the task as finished successfully.
 *
 * In one statement this writes the JSON-serialized result, sets `status`
 * to `'completed'`, `progress` to 100 and `completed_at` to the current time.
 *
 * @param id - Id of the task row to update.
 * @param result - Result payload; sent to the database as a JSON string.
 * @returns The updated row, or `null` when no row has the given id.
 */
export async function setTaskResult(id: string, result: Record<string, any>): Promise<Task | null> {
  // Joined with '\n' to yield the same two-line statement text.
  const sql = [
    "UPDATE tasks SET result = $2, status = 'completed', progress = 100, completed_at = NOW()",
    'WHERE id = $1 RETURNING *',
  ].join('\n');
  const payload = JSON.stringify(result);

  const outcome = await query<Task>(sql, [id, payload]);
  const [updated] = outcome.rows;
  return updated ? updated : null;
}
/**
 * Records an error message on a task and marks the task as failed.
 *
 * Sets `error`, switches `status` to `'failed'` and stamps `completed_at`
 * with the current time; `progress` is not modified.
 *
 * @param id - Id of the task row to update.
 * @param error - Message stored in the `error` column.
 * @returns The updated row, or `null` when no row has the given id.
 */
export async function setTaskError(id: string, error: string): Promise<Task | null> {
  // Joined with '\n' to yield the same two-line statement text.
  const sql = [
    "UPDATE tasks SET error = $2, status = 'failed', completed_at = NOW()",
    'WHERE id = $1 RETURNING *',
  ].join('\n');

  const outcome = await query<Task>(sql, [id, error]);
  const [updated] = outcome.rows;
  return updated ? updated : null;
}
/**
 * Returns tasks whose status is `'pending'`, oldest first
 * (`ORDER BY created_at`).
 *
 * @param limit - Maximum number of rows to return; defaults to 10.
 * @returns The pending rows, at most `limit` of them.
 */
export async function getPendingTasks(limit = 10): Promise<Task[]> {
  const sql = "SELECT * FROM tasks WHERE status = 'pending' ORDER BY created_at LIMIT $1";
  const result = await query<Task>(sql, [limit]);
  return result.rows;
}
/**
 * Tries to claim a pending task by moving it to `'running'`.
 *
 * The status check and the status change happen in a single UPDATE whose
 * WHERE clause requires the row to still be `'pending'`; this is intended
 * to keep two callers from both claiming the same task.
 *
 * @param id - Id of the task to claim.
 * @returns The claimed row (now `'running'`, with `started_at` set), or
 *   `null` when no row matched — the id is unknown or the task is no
 *   longer pending.
 */
export async function claimTask(id: string): Promise<Task | null> {
  // Joined with '\n' to yield the same three-line statement text.
  const sql = [
    "UPDATE tasks SET status = 'running', started_at = NOW()",
    "WHERE id = $1 AND status = 'pending'",
    'RETURNING *',
  ].join('\n');

  const outcome = await query<Task>(sql, [id]);
  const [claimed] = outcome.rows;
  return claimed ? claimed : null;
}
|