File size: 3,416 Bytes
6f1c297
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import { query } from '../db/client.js';

/**
 * Lifecycle states stored in the `status` column of a task.
 * In this module, `claimTask` moves a task from 'pending' to 'running',
 * `setTaskResult` marks it 'completed' and `setTaskError` marks it 'failed'.
 */
export type TaskStatus = 'pending' | 'running' | 'completed' | 'failed';
/** Kinds of work a task can represent; all current variants are `predict_*`. */
export type TaskType = 'predict_page' | 'predict_all' | 'predict_custom';

/**
 * Shape of a row of the `tasks` table as returned by the queries in this module
 * (they all use `SELECT *` / `RETURNING *`).
 */
export interface Task {
	/** Row identifier used by every lookup/update in this module. */
	id: string;
	/** Identifier of the score the task refers to; `listTasks` can filter on it. */
	score_id: string;
	type: TaskType;
	status: TaskStatus;
	// setTaskResult writes 100 on completion, so this is presumably a 0-100 percentage — TODO confirm.
	progress: number;
	/** Label of the step currently being processed, written by `updateTaskStatus`. */
	current_step: string | null;
	/** Payload written by `setTaskResult` (sent to the database as a JSON string). */
	result: Record<string, any> | null;
	/** Error message written by `setTaskError`. */
	error: string | null;
	// Not written by any query in this module; presumably filled by a column default — verify against the schema.
	created_at: Date;
	/** Set to the database's NOW() when the task is switched to 'running'. */
	started_at: Date | null;
	/** Set to the database's NOW() when the task is switched to 'completed' or 'failed'. */
	completed_at: Date | null;
}

/** Values a caller supplies to `createTask`; these are the only columns its INSERT sets explicitly. */
export interface CreateTaskInput {
	/** Identifier of the score the new task refers to. */
	score_id: string;
	/** Kind of work the new task represents. */
	type: TaskType;
}

/**
 * Inserts a new row into `tasks` and returns it.
 *
 * Only `score_id` and `type` are written explicitly; all other columns keep
 * whatever value the database assigns (presumably schema defaults — verify
 * against the table definition).
 *
 * @param input - Score reference and task type for the new task.
 * @returns The inserted row, as produced by `RETURNING *`.
 */
export async function createTask(input: CreateTaskInput): Promise<Task> {
	const { score_id, type } = input;
	const insertSql = `INSERT INTO tasks (score_id, type)
		 VALUES ($1, $2)
		 RETURNING *`;
	const inserted = await query<Task>(insertSql, [score_id, type]);
	const [task] = inserted.rows;
	return task;
}

/**
 * Looks up a single task by its id.
 *
 * @param id - Identifier of the task to fetch.
 * @returns The matching row, or `null` when no row has that id.
 */
export async function getTask(id: string): Promise<Task | null> {
	const result = await query<Task>('SELECT * FROM tasks WHERE id = $1', [id]);
	const [task] = result.rows;
	return task ?? null;
}

/**
 * Lists tasks, newest first (`ORDER BY created_at DESC`), optionally filtered.
 *
 * A filter is applied only when its argument is truthy, so `undefined` and
 * the empty string both mean "no filter". When both filters are given they
 * are combined with AND.
 *
 * @param scoreId - When truthy, only tasks with this `score_id` are returned.
 * @param status - When truthy, only tasks with this status are returned.
 * @returns All matching rows.
 */
export async function listTasks(scoreId?: string, status?: TaskStatus): Promise<Task[]> {
	const clauses: string[] = [];
	const params: any[] = [];

	// Push the value first so the placeholder number is simply the new list length.
	if (scoreId) {
		params.push(scoreId);
		clauses.push(`score_id = $${params.length}`);
	}
	if (status) {
		params.push(status);
		clauses.push(`status = $${params.length}`);
	}

	const where = clauses.length > 0 ? ' WHERE ' + clauses.join(' AND ') : '';
	const sql = 'SELECT * FROM tasks' + where + ' ORDER BY created_at DESC';

	const result = await query<Task>(sql, params);
	return result.rows;
}

/**
 * Updates a task's status and, optionally, its progress and current step.
 *
 * Timestamp side effects:
 * - status 'running': `started_at` is set to NOW() when the task enters the
 *   running state. If the row is already 'running' and has a `started_at`,
 *   the existing value is kept, so repeated progress updates do not move the
 *   start time forward.
 * - status 'completed' or 'failed': `completed_at` is set to NOW().
 * - status 'pending': no timestamp is touched.
 *
 * @param id - Identifier of the task to update.
 * @param status - New status to store.
 * @param progress - New progress value; the column is left untouched when `undefined`.
 * @param currentStep - New step label; the column is left untouched when `undefined`.
 * @returns The updated row, or `null` when no task has that id.
 */
export async function updateTaskStatus(id: string, status: TaskStatus, progress?: number, currentStep?: string): Promise<Task | null> {
	const updates: string[] = ['status = $2'];
	const values: any[] = [id, status];

	// Push the value first so the placeholder number is simply the new list length.
	if (progress !== undefined) {
		values.push(progress);
		updates.push(`progress = $${values.length}`);
	}
	if (currentStep !== undefined) {
		values.push(currentStep);
		updates.push(`current_step = $${values.length}`);
	}

	if (status === 'running') {
		// Column references on the right-hand side of SET see the row's OLD values,
		// so `status` here is the status before this update. Only a transition
		// into 'running' (or a missing timestamp) stamps a new start time.
		updates.push("started_at = CASE WHEN status = 'running' AND started_at IS NOT NULL THEN started_at ELSE NOW() END");
	} else if (status === 'completed' || status === 'failed') {
		updates.push('completed_at = NOW()');
	}

	const { rows } = await query<Task>(`UPDATE tasks SET ${updates.join(', ')} WHERE id = $1 RETURNING *`, values);
	return rows[0] || null;
}

/**
 * Stores a task's result and marks it as finished successfully.
 *
 * In one UPDATE this writes the result, sets status to 'completed', progress
 * to 100 and `completed_at` to NOW().
 *
 * @param id - Identifier of the task to complete.
 * @param result - Result payload; serialized with `JSON.stringify` before being sent.
 * @returns The updated row, or `null` when no task has that id.
 */
export async function setTaskResult(id: string, result: Record<string, any>): Promise<Task | null> {
	const completeSql = `UPDATE tasks SET result = $2, status = 'completed', progress = 100, completed_at = NOW()
		 WHERE id = $1 RETURNING *`;
	const serialized = JSON.stringify(result);
	const updated = await query<Task>(completeSql, [id, serialized]);
	const [task] = updated.rows;
	return task ?? null;
}

/**
 * Records an error message on a task and marks it as failed.
 *
 * In one UPDATE this writes the error, sets status to 'failed' and
 * `completed_at` to NOW(). Progress is left as it was.
 *
 * @param id - Identifier of the task that failed.
 * @param error - Error message to store.
 * @returns The updated row, or `null` when no task has that id.
 */
export async function setTaskError(id: string, error: string): Promise<Task | null> {
	const failSql = `UPDATE tasks SET error = $2, status = 'failed', completed_at = NOW()
		 WHERE id = $1 RETURNING *`;
	const updated = await query<Task>(failSql, [id, error]);
	const [task] = updated.rows;
	return task ?? null;
}

/**
 * Fetches tasks that are still 'pending', oldest first (`ORDER BY created_at`).
 *
 * This only reads; it does not claim the tasks. Use `claimTask` to take
 * ownership of one.
 *
 * @param limit - Maximum number of rows to return (defaults to 10).
 * @returns Up to `limit` pending tasks.
 */
export async function getPendingTasks(limit = 10): Promise<Task[]> {
	const pendingSql = `SELECT * FROM tasks WHERE status = 'pending' ORDER BY created_at LIMIT $1`;
	const result = await query<Task>(pendingSql, [limit]);
	return result.rows;
}

/**
 * Tries to take ownership of a pending task by switching it to 'running'.
 *
 * The status check and the update happen in a single UPDATE statement
 * (`WHERE id = $1 AND status = 'pending'`), so a task that is no longer
 * pending is not modified and yields no row.
 *
 * @param id - Identifier of the task to claim.
 * @returns The claimed row (now 'running', with `started_at` set to NOW()),
 *          or `null` when the task does not exist or is not pending.
 */
export async function claimTask(id: string): Promise<Task | null> {
	// Check-and-set in one statement so two claimers cannot both succeed.
	const claimSql = `UPDATE tasks SET status = 'running', started_at = NOW()
		 WHERE id = $1 AND status = 'pending'
		 RETURNING *`;
	const claimed = await query<Task>(claimSql, [id]);
	const [task] = claimed.rows;
	return task ?? null;
}