// Source: Shadow-Generator/app/queues/taskQueue.js
// Uploaded by karthikeya1212 ("Upload 7 files", commit e890ceb, verified).
// const { Queue, Worker, QueueScheduler, Job } = require('bullmq');
// const IORedis = require('ioredis');
// const fs = require('fs-extra');
// const path = require('path');
// const { v4: uuidv4 } = require('uuid');
// const { generateShadow } = require('./shadowGenerator'); // your existing shadow logic
// // Redis connection
// const connection = new IORedis();
// // Queue setup
// const shadowQueue = new Queue('shadowQueue', { connection });
// new QueueScheduler('shadowQueue', { connection });
// // RAM and disk buffers
// const RAM_LIMIT = 20; // max tasks in RAM
// const RAM_BUFFER = [];
// const DISK_BUFFER = path.join(__dirname, 'diskQueue');
// fs.ensureDirSync(DISK_BUFFER);
// // In-memory results
// const RESULTS = {};
// // ---------------- Add Task ----------------
// async function addTask(fileBuffer, options = {}) {
// const taskId = uuidv4(); // unique ID for each task
// const task = { id: taskId, file: fileBuffer, options };
// if (RAM_BUFFER.length < RAM_LIMIT) {
// RAM_BUFFER.push(task);
// processRAMBuffer();
// } else {
// // Move to disk
// const filePath = path.join(DISK_BUFFER, `${taskId}.json`);
// await fs.writeJson(filePath, task, { spaces: 0 });
// }
// return taskId;
// }
// // ---------------- Process RAM tasks ----------------
// let processing = false;
// async function processRAMBuffer() {
// if (processing || RAM_BUFFER.length === 0) return;
// processing = true;
// while (RAM_BUFFER.length > 0) {
// const task = RAM_BUFFER.shift();
// try {
// const imgBuffer = await generateShadow(task.file, task.options);
// RESULTS[task.id] = imgBuffer;
// } catch (err) {
// console.error(`Task ${task.id} failed:`, err);
// RESULTS[task.id] = null;
// }
// // Move tasks from disk to RAM if space available
// const diskFiles = await fs.readdir(DISK_BUFFER);
// while (RAM_BUFFER.length < RAM_LIMIT && diskFiles.length > 0) {
// const diskFile = diskFiles.shift();
// const taskFromDisk = await fs.readJson(path.join(DISK_BUFFER, diskFile));
// RAM_BUFFER.push(taskFromDisk);
// await fs.remove(path.join(DISK_BUFFER, diskFile));
// }
// }
// processing = false;
// }
// // ---------------- Check Task Status ----------------
// function getTaskStatus(taskId) {
// if (RESULTS[taskId] === undefined) return 'pending';
// if (RESULTS[taskId] === null) return 'failed';
// return 'done';
// }
// // ---------------- Fetch Result ----------------
// function getTaskResult(taskId) {
// return RESULTS[taskId] || null;
// }
// module.exports = {
// addTask,
// getTaskStatus,
// getTaskResult
// };
const { v4: uuidv4 } = require('uuid');
const { generateShadow } = require('../core/shadowGenerator');
// Maximum number of tasks allowed to wait in RAM_BUFFER; addTask rejects once
// the buffer already holds this many.
const RAM_LIMIT = 20;
// FIFO of queued tasks ({ id, file, options }): addTask pushes onto the end,
// processRAMBuffer shifts from the front.
const RAM_BUFFER = [];
// Task id -> outcome. processRAMBuffer stores the value resolved by
// generateShadow, or null when the task threw.
const RESULTS = {};
// Delay (ms) before a stored result is removed from RESULTS.
const CLEANUP_TIME = 10 * 60 * 1000; // 10 minutes
// True while a processRAMBuffer drain loop is running, so that only one loop
// consumes the buffer at a time.
let processing = false;
// ---------------- Add Task ----------------
/**
 * Queues a shadow-generation task and kicks off the drain loop.
 *
 * The drain loop is started without being awaited, so this resolves as soon
 * as the task is queued, not when it has been processed.
 *
 * @param {*} fileBuffer - Input handed to generateShadow as-is
 *   (presumably the uploaded image data; confirm against the caller).
 * @param {object} [options={}] - Options forwarded to generateShadow.
 * @returns {Promise<string>} Resolves with the id assigned to the new task.
 * @throws {Error} Rejects with 'Server busy, try again' when RAM_BUFFER
 *   already holds RAM_LIMIT tasks.
 */
async function addTask(fileBuffer, options = {}) {
  const id = uuidv4();

  // Back-pressure: refuse new work instead of growing the buffer unbounded.
  if (RAM_BUFFER.length >= RAM_LIMIT) {
    throw new Error('Server busy, try again');
  }

  RAM_BUFFER.push({ id, file: fileBuffer, options });
  // Fire-and-forget: a no-op when a drain loop is already running.
  processRAMBuffer();
  return id;
}
// ---------------- Process RAM tasks ----------------
/**
 * Drains RAM_BUFFER sequentially, running generateShadow on each task and
 * recording the outcome in RESULTS (the resolved value on success, null on
 * failure).
 *
 * Every recorded outcome, successful or failed, is scheduled for removal
 * after CLEANUP_TIME so RESULTS cannot grow without bound.
 *
 * @returns {Promise<void>} Resolves once the buffer is empty, or immediately
 *   when another drain loop is already running or there is nothing to do.
 */
async function processRAMBuffer() {
  // Only one drain loop may run at a time; tasks pushed meanwhile are picked
  // up by the loop that is already running.
  if (processing || RAM_BUFFER.length === 0) return;
  processing = true;
  try {
    while (RAM_BUFFER.length > 0) {
      const task = RAM_BUFFER.shift();
      try {
        RESULTS[task.id] = await generateShadow(task.file, task.options);
      } catch (err) {
        console.error(`Task ${task.id} failed:`, err);
        // null marks the task as failed for getTaskStatus.
        RESULTS[task.id] = null;
      }
      // Expire the entry regardless of outcome; previously failed entries
      // were never removed and accumulated forever.
      setTimeout(() => {
        delete RESULTS[task.id];
      }, CLEANUP_TIME);
    }
  } finally {
    // Always release the guard so an unexpected error cannot wedge the queue.
    processing = false;
  }
}
// ---------------- Check Task Status ----------------
/**
 * Reports the state of a task based on what is stored in RESULTS.
 *
 * Note that 'pending' is also returned for ids that were never issued and
 * for results that have already been removed from RESULTS, because neither
 * case leaves an entry behind.
 *
 * @param {string} taskId - Id previously returned by addTask.
 * @returns {'pending'|'failed'|'done'} 'failed' when the stored outcome is
 *   null, 'done' when any other value is stored, otherwise 'pending'.
 */
function getTaskStatus(taskId) {
  // Own-property check: RESULTS is a plain object, so ids like 'toString' or
  // 'constructor' would otherwise resolve to inherited members and read as
  // 'done'.
  if (!Object.prototype.hasOwnProperty.call(RESULTS, taskId)) return 'pending';

  const outcome = RESULTS[taskId];
  if (outcome === undefined) return 'pending';
  return outcome === null ? 'failed' : 'done';
}
// ---------------- Fetch Result ----------------
/**
 * Returns the stored result for a task.
 *
 * @param {string} taskId - Id previously returned by addTask.
 * @returns {*} The value stored in RESULTS for this id, or null when there
 *   is no entry, the task failed, or the stored value is falsy.
 */
function getTaskResult(taskId) {
  // Own-property check: without it, ids such as 'toString' or '__proto__'
  // would return members inherited from Object.prototype as a "result".
  if (!Object.prototype.hasOwnProperty.call(RESULTS, taskId)) return null;
  return RESULTS[taskId] || null;
}
// ---------------- Queue Length ----------------
/**
 * Reports how many tasks are currently waiting in RAM_BUFFER.
 *
 * A task that has already been shifted out for processing is no longer
 * counted.
 *
 * @returns {number} Number of queued tasks.
 */
function getQueueLength() {
  const { length: waiting } = RAM_BUFFER;
  return waiting;
}
// Public API of the task queue. processRAMBuffer and the module-level state
// (RAM_BUFFER, RESULTS, ...) are not exported.
module.exports = {
addTask,
getTaskStatus,
getTaskResult,
getQueueLength
};