// modified_ai_server / stateManager.js
// Uploaded by everydaytok (everydaytok's picture)
// Commit message: "Update stateManager.js"
// Commit a7a7ca5 (verified)
import { createClient } from '@supabase/supabase-js';
// --- REALTIME BUFFERS ---
// Each buffer maps a project id to the text/status accumulated for it.

/** Non-Destructive (Frontend): read through getSnapshot without being emptied. */
const snapshotBuffers = new Map();

/** Status (Frontend): latest status value recorded by setStatus. */
const statusBuffers = new Map();

/** Destructive (Plugin): emptied every time it is read through popStream. */
const streamBuffers = new Map();

// --- PROJECT CACHE & PERSISTENCE ---

/** Project ids that are currently locked (see StateManager.lock / unlock). */
const initializationLocks = new Set();

/** Loaded project state objects keyed by project id. */
const activeProjects = new Map();

/** Supabase client handle; starts out null and is assigned by initDB(). */
let supabase = null;
/**
 * Creates the module-wide Supabase client from the process environment.
 *
 * Reads SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY. When either one is missing
 * (or empty) an error is logged and the client is left untouched.
 *
 * @returns {void}
 */
export const initDB = () => {
  const { SUPABASE_URL: url, SUPABASE_SERVICE_ROLE_KEY: serviceRoleKey } = process.env;
  const hasCredentials = Boolean(url) && Boolean(serviceRoleKey);

  if (hasCredentials) {
    supabase = createClient(url, serviceRoleKey);
    return;
  }

  console.error("Missing Supabase Env Variables");
};
/**
 * Project state manager: an in-memory cache of project objects plus per-project
 * realtime buffers, with history and project info persisted through the
 * module-level Supabase client.
 *
 * The methods that touch the database (getProject on a cache miss, addHistory,
 * updateProject) use the module-level `supabase` client, which initDB() assigns.
 */
export const StateManager = {
  /**
   * @param {string} projectId
   * @returns {boolean} true while the project id is in the lock set.
   */
  isLocked: (projectId) => initializationLocks.has(projectId),

  /**
   * Adds the project id to the lock set.
   * @param {string} projectId
   * @returns {Set} the lock set itself (result of Set.prototype.add).
   */
  lock: (projectId) => initializationLocks.add(projectId),

  /**
   * Removes the project id from the lock set.
   * @param {string} projectId
   * @returns {boolean} true if the id was locked.
   */
  unlock: (projectId) => initializationLocks.delete(projectId),

  /**
   * Returns the in-memory state of a project, loading it from the database on
   * a cache miss (or when the cached entry has lost one of its histories).
   *
   * @param {string} projectId
   * @returns {Promise<object|null>} the project state, or null when the
   *   `projects` row cannot be read.
   */
  getProject: async (projectId) => {
    const cached = activeProjects.get(projectId);
    if (cached && cached.workerHistory && cached.pmHistory) {
      // Reading counts as activity: without this stamp cleanupMemory() would
      // evict a project (and its in-memory commandQueue) that is only being read.
      cached.lastActive = Date.now();
      return cached;
    }

    const { data: proj, error } = await supabase
      .from('projects')
      .select('*')
      .eq('id', projectId)
      .single();
    if (error || !proj) return null;

    const { data: chunks, error: chunkError } = await supabase
      .from('message_chunks')
      .select('*')
      .eq('project_id', projectId)
      .order('chunk_index', { ascending: false })
      .limit(10);
    if (chunkError) {
      // The project is still returned (with empty histories), but the failure
      // must not go unnoticed.
      console.error(`[DB Error] Failed to load history for ${projectId}:`, chunkError.message);
    }

    // Chunks arrive newest-first; reverse per type so messages end up oldest-first.
    const recentChunks = chunks || [];
    const historyOf = (chunkType) => recentChunks
      .filter((chunk) => chunk.type === chunkType)
      .reverse()
      .flatMap((chunk) => chunk.payload || []);

    const memoryObject = {
      ...proj.info,
      id: proj.id,
      userId: proj.user_id,
      thumbnail: proj.info?.thumbnail || null,
      gdd: proj.info?.gdd || null,
      workerHistory: historyOf('worker'),
      pmHistory: historyOf('pm'),
      commandQueue: [],
      failureCount: proj.info?.failureCount || 0,
      lastActive: Date.now()
    };
    activeProjects.set(projectId, memoryObject);
    return memoryObject;
  },

  /**
   * Appends one message to a project's history, both in the cache (if the
   * project is loaded) and in the `message_chunks` table.
   *
   * Messages are stored in chunks of at most 20: the newest chunk is extended
   * until it is full, then a chunk with the next index is inserted.
   * Database failures are logged, never thrown.
   *
   * @param {string} projectId
   * @param {string} type  'pm' selects pmHistory; any other value selects workerHistory.
   * @param {string} role  stored as the message's `role`.
   * @param {string} text  stored as the single text part of the message.
   * @returns {Promise<void>}
   */
  addHistory: async (projectId, type, role, text) => {
    const MAX_MESSAGES_PER_CHUNK = 20;
    const newMessage = { role, parts: [{ text }] };

    const project = activeProjects.get(projectId);
    if (project) {
      const historyKey = type === 'pm' ? 'pmHistory' : 'workerHistory';
      if (!Array.isArray(project[historyKey])) project[historyKey] = [];
      project[historyKey].push(newMessage);
    }

    try {
      // Only the newest chunk is ever inspected, so fetch exactly one row.
      const { data: chunks, error: fetchError } = await supabase.from('message_chunks')
        .select('id, chunk_index, payload')
        .eq('project_id', projectId)
        .eq('type', type)
        .order('chunk_index', { ascending: false })
        .limit(1);
      if (fetchError) {
        console.error(`[DB Error] Failed to fetch history for ${projectId}:`, fetchError.message);
        return;
      }

      const latest = chunks?.[0];
      const currentPayload = (latest && Array.isArray(latest.payload)) ? latest.payload : [];
      const latestIndex = (latest && typeof latest.chunk_index === 'number') ? latest.chunk_index : -1;

      if (latest && currentPayload.length < MAX_MESSAGES_PER_CHUNK) {
        const updatedPayload = [...currentPayload, newMessage];
        const { error: updateError } = await supabase.from('message_chunks')
          .update({ payload: updatedPayload })
          .eq('id', latest.id);
        if (updateError) console.error(`[DB Error] Update chunk failed:`, updateError.message);
      } else {
        const { error: insertError } = await supabase.from('message_chunks').insert({
          project_id: projectId,
          type,
          chunk_index: latestIndex + 1,
          payload: [newMessage]
        });
        if (insertError) console.error(`[DB Error] Insert new chunk failed:`, insertError.message);
      }
    } catch (e) {
      console.error("[StateManager] Unexpected error in addHistory:", e);
    }
  },

  /**
   * Records the current status for a project.
   * @param {string} projectId
   * @param {*} status
   */
  setStatus: (projectId, status) => {
    statusBuffers.set(projectId, status);
  },

  /**
   * @param {string} projectId
   * @returns {*} the recorded status, or "Idle" when none is recorded (or it is falsy).
   */
  getStatus: (projectId) => {
    // FIX: Default to "Idle" instead of "Working..." to prevent plugin lockup
    return statusBuffers.get(projectId) || "Idle";
  },

  /**
   * Appends a chunk to both the destructive stream buffer and the snapshot buffer.
   * @param {string} projectId
   * @param {string} chunk
   */
  appendStream: (projectId, chunk) => {
    const currentDestructive = streamBuffers.get(projectId) || "";
    streamBuffers.set(projectId, currentDestructive + chunk);
    const currentSnapshot = snapshotBuffers.get(projectId) || "";
    snapshotBuffers.set(projectId, currentSnapshot + chunk);
  },

  /**
   * Appends a chunk to the snapshot buffer only; the stream buffer is untouched.
   * @param {string} projectId
   * @param {string} chunk
   */
  appendSnapshotOnly: (projectId, chunk) => {
    const currentSnapshot = snapshotBuffers.get(projectId) || "";
    snapshotBuffers.set(projectId, currentSnapshot + chunk);
  },

  /**
   * Returns the buffered stream content and empties the buffer.
   * @param {string} projectId
   * @returns {string} buffered content, or "" when nothing is buffered.
   */
  popStream: (projectId) => {
    const content = streamBuffers.get(projectId) || "";
    // Delete instead of storing "": otherwise every id that was ever polled
    // keeps an entry forever (cleanupMemory only clears ids of loaded projects).
    streamBuffers.delete(projectId);
    return content;
  },

  /**
   * @param {string} projectId
   * @returns {string} the snapshot buffer content, or "" when empty.
   */
  getSnapshot: (projectId) => {
    return snapshotBuffers.get(projectId) || "";
  },

  /**
   * Drops both the snapshot buffer and the recorded status of a project.
   * @param {string} projectId
   */
  clearSnapshot: (projectId) => {
    snapshotBuffers.delete(projectId);
    statusBuffers.delete(projectId);
  },

  /**
   * Queues a command on the project's in-memory command queue.
   *
   * `input` is either a ready-made command object (`{ type, payload }`, both
   * truthy) or a raw response string that is scanned, in priority order, for:
   * a fenced code block (EXECUTE), `[READ_SCRIPT: ...]`, `[READ_HIERARCHY]` /
   * `[READ_HIERARCHY: ...]` (payload defaults to "All"), and `[READ_LOGS]`.
   * Strings containing `[ASK_PM:` or `[ROUTE_TO_PM:` are ignored. Any other
   * input queues nothing.
   *
   * @param {string} projectId
   * @param {object|string} input
   * @returns {Promise<void>}
   */
  queueCommand: async (projectId, input) => {
    const project = activeProjects.get(projectId) || await StateManager.getProject(projectId);
    if (!project) return;

    let command = null;
    // `typeof null` is 'object', so null must be excluded before reading properties.
    if (input !== null && typeof input === 'object' && input.type && input.payload) {
      command = input;
    } else if (typeof input === 'string') {
      const rawResponse = input;
      if (rawResponse.includes("[ASK_PM:")) return;
      if (rawResponse.includes("[ROUTE_TO_PM:")) return;
      // Only skip image generation if there's no code block, otherwise logic flow handles it
      if (rawResponse.includes("[GENERATE_IMAGE:") && !rawResponse.includes("```")) return;

      // The language tag must be matched as a whole word with `luau` tried
      // first; `(?:lua|luau)?` stopped at "lua" and leaked the trailing "u"
      // of a ```luau fence into the payload.
      const codeMatch = rawResponse.match(/```(?:(?:luau|lua)\b)?([\s\S]*?)```/i);
      const readScriptMatch = rawResponse.match(/\[READ_SCRIPT:\s*(.*?)\]/i);
      const readHierarchyMatch = rawResponse.match(/\[READ_HIERARCHY(?::\s*(.*?))?\]/i);
      const readLogsMatch = rawResponse.includes("[READ_LOGS]");

      if (codeMatch) {
        command = { type: "EXECUTE", payload: codeMatch[1].trim() };
      } else if (readScriptMatch) {
        command = { type: "READ_SCRIPT", payload: readScriptMatch[1].trim() };
      } else if (readHierarchyMatch) {
        command = { type: "READ_HIERARCHY", payload: readHierarchyMatch[1] ? readHierarchyMatch[1].trim() : "All" };
      } else if (readLogsMatch) {
        command = { type: "READ_LOGS", payload: null };
      }
    }

    if (command) {
      if (!project.commandQueue) project.commandQueue = [];
      project.commandQueue.push(command);
      console.log(`[Memory] Queued command for ${projectId}: ${command.type}`);
    }
  },

  /**
   * Removes and returns the oldest queued command of a cached project.
   * @param {string} projectId
   * @returns {Promise<object|null>} the command, or null when the project is
   *   not cached or its queue is empty.
   */
  popCommand: async (projectId) => {
    const project = activeProjects.get(projectId);
    if (!project || !project.commandQueue || project.commandQueue.length === 0) return null;
    return project.commandQueue.shift();
  },

  /**
   * Merges `data` into the cached project (if loaded) and persists a subset of
   * it into the `info` column of the `projects` row.
   *
   * Only title, status, stats, description and failureCount are persisted,
   * together with a `last_edited` ISO timestamp. Fields that are undefined in
   * `data` are left out so the stored values are kept.
   *
   * @param {string} projectId
   * @param {object} data
   * @returns {Promise<void>}
   */
  updateProject: async (projectId, data) => {
    const now = new Date().toISOString();

    if (activeProjects.has(projectId)) {
      const current = activeProjects.get(projectId);
      activeProjects.set(projectId, {
        ...current,
        ...data,
        lastActive: Date.now()
      });
    }

    const payload = {
      info: {
        title: data.title,
        status: data.status,
        stats: data.stats,
        description: data.description,
        failureCount: data.failureCount,
        last_edited: now
      }
    };
    Object.keys(payload.info).forEach(key => payload.info[key] === undefined && delete payload.info[key]);

    const { data: currentDb, error: fetchError } = await supabase
      .from('projects')
      .select('info')
      .eq('id', projectId)
      .single();
    if (fetchError) {
      console.error("[DB ERROR] Fetch Project Info failed:", fetchError.message);
    }
    if (currentDb) {
      const mergedInfo = { ...currentDb.info, ...payload.info };
      // The command queue lives in memory only and is never written to the row.
      delete mergedInfo.commandQueue;
      const { error: infoError } = await supabase.from('projects')
        .update({ info: mergedInfo })
        .eq('id', projectId);
      if (infoError) console.error("[DB ERROR] Update Project Info failed:", infoError.message);
    }
  },

  /**
   * Evicts cached projects whose `lastActive` stamp is older than four hours,
   * together with their stream, snapshot and status buffers. Entries without
   * a stamp are stamped with the current time instead of being evicted.
   *
   * @returns {number} how many projects were removed.
   */
  cleanupMemory: () => {
    const now = Date.now();
    const FOUR_HOURS = 4 * 60 * 60 * 1000;
    let count = 0;
    for (const [id, data] of activeProjects.entries()) {
      if (!data.lastActive) data.lastActive = now;
      if (now - data.lastActive > FOUR_HOURS) {
        console.log(`[StateManager] 🧹 Removing expired project: ${id}`);
        activeProjects.delete(id);
        streamBuffers.delete(id);
        snapshotBuffers.delete(id);
        statusBuffers.delete(id);
        count++;
      }
    }
    return count;
  },

  /**
   * @returns {object|null} the module-level Supabase client (null until initDB() assigns it).
   */
  getSupabaseClient: () => supabase
};