Timothy Eastridge commited on
Commit
12db5fc
·
1 Parent(s): 7faf776

commit step 7

Browse files
Files changed (1) hide show
  1. agent/main.py +120 -11
agent/main.py CHANGED
@@ -2,6 +2,8 @@ import os
2
  import time
3
  import json
4
  import requests
 
 
5
  from datetime import datetime
6
  import openai
7
  from anthropic import Anthropic
@@ -20,6 +22,17 @@ if "gpt" in LLM_MODEL:
20
  else:
21
  llm_client = Anthropic(api_key=LLM_API_KEY)
22
 
 
 
 
 
 
 
 
 
 
 
 
23
  def call_mcp(tool, params=None):
24
  response = requests.post(
25
  MCP_URL,
@@ -238,10 +251,76 @@ Return ONLY the SQL query, no explanations or markdown."""
238
  "error": str(e)
239
  }
240
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
241
  def main():
 
242
  print(f"[{datetime.now()}] Agent starting, polling every {POLL_INTERVAL}s")
243
 
244
- while True:
245
  try:
246
  result = call_mcp("get_next_instruction")
247
  instruction = result.get("instruction")
@@ -249,22 +328,48 @@ def main():
249
  if instruction:
250
  print(f"[{datetime.now()}] Found instruction: {instruction['id']}, type: {instruction['type']}")
251
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
252
  # Update status to executing
253
  call_mcp("query_graph", {
254
  "query": "MATCH (i:Instruction {id: $id}) SET i.status = 'executing'",
255
  "parameters": {"id": instruction['id']}
256
  })
257
 
258
- # Handle different instruction types
259
  if instruction['type'] == 'discover_schema':
260
  exec_result = handle_discover_schema(instruction)
261
  elif instruction['type'] == 'generate_sql':
262
  exec_result = handle_generate_sql(instruction)
263
  else:
264
- # Default dummy execution
265
- exec_result = {"status": "success", "result": "Dummy execution"}
266
 
267
- # Create Execution node
268
  exec_node = call_mcp("write_graph", {
269
  "action": "create_node",
270
  "label": "Execution",
@@ -276,7 +381,7 @@ def main():
276
  }
277
  })
278
 
279
- # Link execution to instruction
280
  call_mcp("query_graph", {
281
  "query": """
282
  MATCH (i:Instruction {id: $iid}), (e:Execution {id: $eid})
@@ -288,21 +393,25 @@ def main():
288
  }
289
  })
290
 
291
- # Update instruction status
292
  final_status = 'complete' if exec_result.get('status') == 'success' else 'failed'
293
  call_mcp("query_graph", {
294
  "query": "MATCH (i:Instruction {id: $id}) SET i.status = $status",
295
  "parameters": {"id": instruction['id'], "status": final_status}
296
  })
297
 
298
- print(f"[{datetime.now()}] Completed instruction: {instruction['id']} with status: {final_status}")
 
299
  else:
300
  print(f"[{datetime.now()}] No pending instructions")
301
-
 
 
302
  except Exception as e:
303
  print(f"[{datetime.now()}] Error: {e}")
304
-
305
- time.sleep(POLL_INTERVAL)
 
306
 
307
  if __name__ == "__main__":
308
  main()
 
2
  import time
3
  import json
4
  import requests
5
+ import signal
6
+ import sys
7
  from datetime import datetime
8
  import openai
9
  from anthropic import Anthropic
 
22
  else:
23
  llm_client = Anthropic(api_key=LLM_API_KEY)
24
 
25
+ # Global flag for interrupt handling
26
+ interrupted = False
27
+
28
+ def signal_handler(sig, frame):
29
+ global interrupted
30
+ interrupted = True
31
+ print(f"\n[{datetime.now()}] Interrupt received, will stop after current instruction")
32
+
33
+ signal.signal(signal.SIGINT, signal_handler)
34
+ signal.signal(signal.SIGTERM, signal_handler)
35
+
36
  def call_mcp(tool, params=None):
37
  response = requests.post(
38
  MCP_URL,
 
251
  "error": str(e)
252
  }
253
 
254
+ def check_workflow_stop(workflow_id):
255
+ """Check if workflow has been marked to stop"""
256
+ result = call_mcp("query_graph", {
257
+ "query": "MATCH (w:Workflow {id: $id}) RETURN w.status as status",
258
+ "parameters": {"id": workflow_id}
259
+ })
260
+
261
+ if result['data'] and result['data'][0]['status'] == 'stopped':
262
+ return True
263
+ return False
264
+
265
+ def pause_with_interrupt(duration, instruction_id, workflow_id=None):
266
+ """Pause for duration seconds with interrupt checking every 10 seconds"""
267
+ print(f"[{datetime.now()}] Pausing for {duration} seconds for human review")
268
+ print(f"[{datetime.now()}] You can edit instruction in Neo4j Browser:")
269
+ print(f" MATCH (i:Instruction {{id: '{instruction_id}'}}) SET i.parameters = '{{\"key\": \"value\"}}'")
270
+
271
+ # Log pause start
272
+ call_mcp("write_graph", {
273
+ "action": "create_node",
274
+ "label": "Log",
275
+ "properties": {
276
+ "type": "pause_started",
277
+ "instruction_id": instruction_id,
278
+ "duration": duration,
279
+ "timestamp": datetime.now().isoformat()
280
+ }
281
+ })
282
+
283
+ elapsed = 0
284
+ while elapsed < duration:
285
+ # Check every 10 seconds
286
+ sleep_time = min(10, duration - elapsed)
287
+ time.sleep(sleep_time)
288
+ elapsed += sleep_time
289
+
290
+ # Check for workflow stop
291
+ if workflow_id and check_workflow_stop(workflow_id):
292
+ print(f"[{datetime.now()}] Workflow stopped during pause")
293
+ return False
294
+
295
+ # Check for global interrupt
296
+ if interrupted:
297
+ print(f"[{datetime.now()}] Interrupted during pause")
298
+ return False
299
+
300
+ # Show progress
301
+ remaining = duration - elapsed
302
+ if remaining > 0 and elapsed % 30 == 0: # Update every 30 seconds
303
+ print(f"[{datetime.now()}] Pause remaining: {remaining} seconds")
304
+
305
+ # Log pause end
306
+ call_mcp("write_graph", {
307
+ "action": "create_node",
308
+ "label": "Log",
309
+ "properties": {
310
+ "type": "pause_completed",
311
+ "instruction_id": instruction_id,
312
+ "timestamp": datetime.now().isoformat()
313
+ }
314
+ })
315
+
316
+ print(f"[{datetime.now()}] Pause complete, continuing execution")
317
+ return True
318
+
319
  def main():
320
+ global interrupted
321
  print(f"[{datetime.now()}] Agent starting, polling every {POLL_INTERVAL}s")
322
 
323
+ while not interrupted:
324
  try:
325
  result = call_mcp("get_next_instruction")
326
  instruction = result.get("instruction")
 
328
  if instruction:
329
  print(f"[{datetime.now()}] Found instruction: {instruction['id']}, type: {instruction['type']}")
330
 
331
+ # Get workflow ID
332
+ workflow_result = call_mcp("query_graph", {
333
+ "query": """
334
+ MATCH (w:Workflow)-[:HAS_INSTRUCTION]->(i:Instruction {id: $id})
335
+ RETURN w.id as workflow_id
336
+ """,
337
+ "parameters": {"id": instruction['id']}
338
+ })
339
+ workflow_id = workflow_result['data'][0]['workflow_id'] if workflow_result['data'] else None
340
+
341
+ # PAUSE BEFORE EXECUTION
342
+ pause_duration = instruction.get('pause_duration', 300)
343
+ if pause_duration > 0:
344
+ if not pause_with_interrupt(pause_duration, instruction['id'], workflow_id):
345
+ print(f"[{datetime.now()}] Execution cancelled during pause")
346
+ continue
347
+
348
+ # Re-fetch instruction to get any edits made during pause
349
+ refetch_result = call_mcp("query_graph", {
350
+ "query": "MATCH (i:Instruction {id: $id}) RETURN i",
351
+ "parameters": {"id": instruction['id']}
352
+ })
353
+
354
+ if refetch_result['data']:
355
+ instruction = refetch_result['data'][0]['i']
356
+ print(f"[{datetime.now()}] Re-fetched instruction after pause, parameters: {instruction.get('parameters')}")
357
+
358
  # Update status to executing
359
  call_mcp("query_graph", {
360
  "query": "MATCH (i:Instruction {id: $id}) SET i.status = 'executing'",
361
  "parameters": {"id": instruction['id']}
362
  })
363
 
364
+ # Execute based on type
365
  if instruction['type'] == 'discover_schema':
366
  exec_result = handle_discover_schema(instruction)
367
  elif instruction['type'] == 'generate_sql':
368
  exec_result = handle_generate_sql(instruction)
369
  else:
370
+ exec_result = {"status": "success", "result": "Reviewed"}
 
371
 
372
+ # Store execution result
373
  exec_node = call_mcp("write_graph", {
374
  "action": "create_node",
375
  "label": "Execution",
 
381
  }
382
  })
383
 
384
+ # Link execution
385
  call_mcp("query_graph", {
386
  "query": """
387
  MATCH (i:Instruction {id: $iid}), (e:Execution {id: $eid})
 
393
  }
394
  })
395
 
396
+ # Update status
397
  final_status = 'complete' if exec_result.get('status') == 'success' else 'failed'
398
  call_mcp("query_graph", {
399
  "query": "MATCH (i:Instruction {id: $id}) SET i.status = $status",
400
  "parameters": {"id": instruction['id'], "status": final_status}
401
  })
402
 
403
+ print(f"[{datetime.now()}] Completed instruction: {instruction['id']}")
404
+
405
  else:
406
  print(f"[{datetime.now()}] No pending instructions")
407
+
408
+ time.sleep(POLL_INTERVAL)
409
+
410
  except Exception as e:
411
  print(f"[{datetime.now()}] Error: {e}")
412
+ time.sleep(POLL_INTERVAL)
413
+
414
+ print(f"[{datetime.now()}] Agent shutting down")
415
 
416
  if __name__ == "__main__":
417
  main()