everydaytok commited on
Commit
d27bfbf
·
verified ·
1 Parent(s): 1b06827

Update stateManager.js

Browse files
Files changed (1) hide show
  1. stateManager.js +24 -28
stateManager.js CHANGED
@@ -5,9 +5,8 @@ let supabase = null;
5
  const activeProjects = new Map();
6
  const initializationLocks = new Set();
7
 
8
- // --- NEW STREAMING BUFFERS ---
9
- const streamBuffers = new Map(); // projectId -> String (accumulated partial text)
10
- const statusBuffers = new Map(); // projectId -> String (current status message)
11
 
12
  export const initDB = () => {
13
  if (!process.env.SUPABASE_URL || !process.env.SUPABASE_SERVICE_ROLE_KEY) {
@@ -98,20 +97,24 @@ export const StateManager = {
98
  // LOGIC: If we have a chunk and it has room (< 20 items), UPDATE it.
99
  if (latest && currentSize < 20) {
100
  const updatedPayload = [...currentPayload, newMessage];
 
101
  const { error: updateError } = await supabase.from('message_chunks')
102
  .update({ payload: updatedPayload })
103
  .eq('id', latest.id);
 
104
  if (updateError) console.error(`[DB Error] Update chunk failed:`, updateError.message);
105
  }
106
  // ELSE: Create a NEW chunk
107
  else {
108
  const nextIndex = latestIndex + 1;
 
109
  const { error: insertError } = await supabase.from('message_chunks').insert({
110
  project_id: projectId,
111
  type,
112
  chunk_index: nextIndex,
113
  payload: [newMessage] // Start new array
114
  });
 
115
  if (insertError) console.error(`[DB Error] Insert new chunk failed:`, insertError.message);
116
  }
117
  } catch (e) {
@@ -119,6 +122,18 @@ export const StateManager = {
119
  }
120
  },
121
 
 
 
 
 
 
 
 
 
 
 
 
 
122
  // --- COMMANDS (MEMORY ONLY + PARSING) ---
123
  queueCommand: async (projectId, input) => {
124
  let project = activeProjects.get(projectId);
@@ -167,26 +182,6 @@ export const StateManager = {
167
  return project.commandQueue.shift();
168
  },
169
 
170
- // --- STREAMING HELPERS (FOR POLLING) ---
171
- setStatus: (projectId, status) => {
172
- statusBuffers.set(projectId, status);
173
- },
174
-
175
- getStatus: (projectId) => {
176
- return statusBuffers.get(projectId) || "Idle";
177
- },
178
-
179
- appendStream: (projectId, chunk) => {
180
- const current = streamBuffers.get(projectId) || "";
181
- streamBuffers.set(projectId, current + chunk);
182
- },
183
-
184
- popStream: (projectId) => {
185
- const content = streamBuffers.get(projectId) || "";
186
- streamBuffers.set(projectId, ""); // Clear buffer after read
187
- return content;
188
- },
189
-
190
  // --- METADATA UPDATE ---
191
  updateProject: async (projectId, data) => {
192
  // 1. Update Memory
@@ -196,7 +191,7 @@ export const StateManager = {
196
  activeProjects.set(projectId, newData);
197
  }
198
 
199
- // 2. Prepare DB Payload
200
  const payload = {
201
  info: {
202
  title: data.title,
@@ -207,13 +202,14 @@ export const StateManager = {
207
  }
208
  };
209
 
 
210
  Object.keys(payload.info).forEach(key => payload.info[key] === undefined && delete payload.info[key]);
211
 
212
  const { data: currentDb } = await supabase.from('projects').select('info').eq('id', projectId).single();
213
 
214
  if (currentDb) {
215
  const mergedInfo = { ...currentDb.info, ...payload.info };
216
- delete mergedInfo.commandQueue;
217
 
218
  const { error } = await supabase.from('projects').update({ info: mergedInfo }).eq('id', projectId);
219
  if (error) console.error("[DB ERROR] Update Project failed:", error.message);
@@ -226,12 +222,12 @@ export const StateManager = {
226
  let count = 0;
227
 
228
  for (const [id, data] of activeProjects.entries()) {
229
- if (!data.lastActive) data.lastActive = now;
 
230
  if (now - data.lastActive > FOUR_HOURS) {
 
231
  activeProjects.delete(id);
232
- // Also clean streams
233
  streamBuffers.delete(id);
234
- statusBuffers.delete(id);
235
  count++;
236
  }
237
  }
 
5
  const activeProjects = new Map();
6
  const initializationLocks = new Set();
7
 
8
+ // --- NEW STREAM BUFFERS ---
9
+ const streamBuffers = new Map();
 
10
 
11
  export const initDB = () => {
12
  if (!process.env.SUPABASE_URL || !process.env.SUPABASE_SERVICE_ROLE_KEY) {
 
97
  // LOGIC: If we have a chunk and it has room (< 20 items), UPDATE it.
98
  if (latest && currentSize < 20) {
99
  const updatedPayload = [...currentPayload, newMessage];
100
+
101
  const { error: updateError } = await supabase.from('message_chunks')
102
  .update({ payload: updatedPayload })
103
  .eq('id', latest.id);
104
+
105
  if (updateError) console.error(`[DB Error] Update chunk failed:`, updateError.message);
106
  }
107
  // ELSE: Create a NEW chunk
108
  else {
109
  const nextIndex = latestIndex + 1;
110
+
111
  const { error: insertError } = await supabase.from('message_chunks').insert({
112
  project_id: projectId,
113
  type,
114
  chunk_index: nextIndex,
115
  payload: [newMessage] // Start new array
116
  });
117
+
118
  if (insertError) console.error(`[DB Error] Insert new chunk failed:`, insertError.message);
119
  }
120
  } catch (e) {
 
122
  }
123
  },
124
 
125
+ // --- STREAMING HELPERS (FOR POLLING) ---
126
+ appendStream: (projectId, chunk) => {
127
+ const current = streamBuffers.get(projectId) || "";
128
+ streamBuffers.set(projectId, current + chunk);
129
+ },
130
+
131
+ popStream: (projectId) => {
132
+ const content = streamBuffers.get(projectId) || "";
133
+ streamBuffers.set(projectId, ""); // Clear after pop
134
+ return content;
135
+ },
136
+
137
  // --- COMMANDS (MEMORY ONLY + PARSING) ---
138
  queueCommand: async (projectId, input) => {
139
  let project = activeProjects.get(projectId);
 
182
  return project.commandQueue.shift();
183
  },
184
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
  // --- METADATA UPDATE ---
186
  updateProject: async (projectId, data) => {
187
  // 1. Update Memory
 
191
  activeProjects.set(projectId, newData);
192
  }
193
 
194
+ // 2. Prepare DB Payload (Exclude commandQueue)
195
  const payload = {
196
  info: {
197
  title: data.title,
 
202
  }
203
  };
204
 
205
+ // Clean undefined keys
206
  Object.keys(payload.info).forEach(key => payload.info[key] === undefined && delete payload.info[key]);
207
 
208
  const { data: currentDb } = await supabase.from('projects').select('info').eq('id', projectId).single();
209
 
210
  if (currentDb) {
211
  const mergedInfo = { ...currentDb.info, ...payload.info };
212
+ delete mergedInfo.commandQueue; // Sanity check
213
 
214
  const { error } = await supabase.from('projects').update({ info: mergedInfo }).eq('id', projectId);
215
  if (error) console.error("[DB ERROR] Update Project failed:", error.message);
 
222
  let count = 0;
223
 
224
  for (const [id, data] of activeProjects.entries()) {
225
+ if (!data.lastActive) data.lastActive = now; // Heal missing timestamp
226
+
227
  if (now - data.lastActive > FOUR_HOURS) {
228
+ console.log(`[StateManager] 🧹 Removing expired project: ${id}`);
229
  activeProjects.delete(id);
 
230
  streamBuffers.delete(id);
 
231
  count++;
232
  }
233
  }