nikhmr1235 commited on
Commit
91567a3
·
verified ·
1 Parent(s): e25c06d

make new graph execution unique thread_id as local variable => allows PR bot to handle multiple PRs concurrently

Browse files
Files changed (1) hide show
  1. app.py +12 -21
app.py CHANGED
@@ -74,11 +74,6 @@ SQLITE_DB_PATH = tempfile.gettempdir() + '/langgraph_checkpoints.sqlite'
74
 
75
  # --- Checkpointer and Graph Compilation ---
76
  global_memory_saver = None # Initialize to None
77
- new_thread_id = str(uuid.uuid4())
78
-
79
- #----- thread_id is made global until below gradio issue is resolved
80
- # gradio_issue: https://github.com/gradio-app/gradio/issues/10893
81
-
82
 
83
  try:
84
  # Attempt to connect to SQLite
@@ -235,8 +230,8 @@ async def github_webhook(
235
  print(f"pr_repo_name: {pr_repo_name} ")
236
 
237
  # each graph invocation will have unique thread_id, it can be used to RESUME graph post human feedback.
238
- #new_thread_id = str(uuid.uuid4())
239
- print(f"\n--- Starting New Graph Execution with thread_id: {new_thread_id} ---")
240
 
241
  if not global_graph:
242
  print("Error: Graph not initialized. Check server logs.")
@@ -269,7 +264,7 @@ async def github_webhook(
269
  print("Graph is being streamed....")
270
 
271
  try:
272
- for s in global_graph.stream(initial_state, {"configurable": {"thread_id": new_thread_id}}):
273
  if "__end__" in s:
274
  break
275
  elif "__interrupt__" in s:
@@ -279,7 +274,7 @@ async def github_webhook(
279
  pass
280
 
281
  print("printing state from web_hook since graph is paused at this point")
282
- current_state_snapshot = global_graph.get_state({"configurable": {"thread_id": new_thread_id}})
283
  current_state = current_state_snapshot.values
284
 
285
  require_human_approval = current_state['require_human_approval']
@@ -296,10 +291,10 @@ async def github_webhook(
296
  return
297
 
298
 
299
- encoded_review_link = quote_plus(review_url)
300
- gradio_review_url = f"{GRADIO_SPACE_BASE_URL}?review_id={review_id}&review_link={encoded_review_link}"
301
 
302
- print(f"gradio_review_url:{gradio_review_url}\nencoded_review_link:{encoded_review_link}")
303
  comment_body = f"""
304
  🤖 **Human Review Required!** 🤖
305
 
@@ -356,6 +351,7 @@ class HumanInputPayload(BaseModel):
356
  review_id: int
357
  approval_status: str # "approved" or "rejected"
358
  feedback_message: Optional[str] = None # Optional feedback
 
359
 
360
 
361
  @app.post("/resume-review")
@@ -366,15 +362,15 @@ async def resume_review(payload: HumanInputPayload):
366
  print(f"Received human input for Review ID: {payload.review_id}")
367
  print(f"Status: {payload.approval_status}")
368
  print(f"Feedback: {payload.feedback_message}")
369
-
370
- #TODO: get the thread_id_state here from payload
371
 
372
  if not global_graph or not global_memory_saver:
373
  print("Error: Graph or checkpointer not initialized. Check server logs.")
374
  return
375
 
376
  try:
377
- current_state_snapshot = global_graph.get_state({"configurable": {"thread_id": new_thread_id}})
 
378
  current_state_values = current_state_snapshot.values
379
 
380
  if payload.approval_status.lower() == "rejected":
@@ -384,12 +380,7 @@ async def resume_review(payload: HumanInputPayload):
384
 
385
  current_state_values["human_feedback_message"] = payload.feedback_message
386
 
387
-
388
- # Use put_state() from the SqliteSaver instance
389
- #global_memory_saver.put_state(current_state_values, {"configurable": {"thread_id": thread_id_state}})
390
-
391
  # Update the state directly
392
- config = {"configurable": {"thread_id": new_thread_id}}
393
  global_graph.update_state(config, current_state_values)
394
  print("graph state updated successfuly with human feedback")
395
 
@@ -445,4 +436,4 @@ async def resume_review(payload: HumanInputPayload):
445
  # detail=f"Failed to resume workflow: {e}"
446
  # )
447
 
448
- return {"message": "Human input received and processed successfully!"}
 
74
 
75
  # --- Checkpointer and Graph Compilation ---
76
  global_memory_saver = None # Initialize to None
 
 
 
 
 
77
 
78
  try:
79
  # Attempt to connect to SQLite
 
230
  print(f"pr_repo_name: {pr_repo_name} ")
231
 
232
  # each graph invocation will have unique thread_id, it can be used to RESUME graph post human feedback.
233
+ graph_thread_id = str(uuid.uuid4())
234
+ print(f"\n--- Starting New Graph Execution with thread_id: {graph_thread_id} ---")
235
 
236
  if not global_graph:
237
  print("Error: Graph not initialized. Check server logs.")
 
264
  print("Graph is being streamed....")
265
 
266
  try:
267
+ for s in global_graph.stream(initial_state, {"configurable": {"thread_id": graph_thread_id}}):
268
  if "__end__" in s:
269
  break
270
  elif "__interrupt__" in s:
 
274
  pass
275
 
276
  print("printing state from web_hook since graph is paused at this point")
277
+ current_state_snapshot = global_graph.get_state({"configurable": {"thread_id": graph_thread_id}})
278
  current_state = current_state_snapshot.values
279
 
280
  require_human_approval = current_state['require_human_approval']
 
291
  return
292
 
293
 
294
+ thread_id = graph_thread_id
295
+ gradio_review_url = f"{GRADIO_SPACE_BASE_URL}?review_id={review_id}&thread_id={thread_id}"
296
 
297
+ print(f"gradio_review_url:{gradio_review_url}\nthread_id:{thread_id}")
298
  comment_body = f"""
299
  🤖 **Human Review Required!** 🤖
300
 
 
351
  review_id: int
352
  approval_status: str # "approved" or "rejected"
353
  feedback_message: Optional[str] = None # Optional feedback
354
+ thread_id: str
355
 
356
 
357
  @app.post("/resume-review")
 
362
  print(f"Received human input for Review ID: {payload.review_id}")
363
  print(f"Status: {payload.approval_status}")
364
  print(f"Feedback: {payload.feedback_message}")
365
+ print(f"Thread ID: {payload.thread_id}")
 
366
 
367
  if not global_graph or not global_memory_saver:
368
  print("Error: Graph or checkpointer not initialized. Check server logs.")
369
  return
370
 
371
  try:
372
+ config = {"configurable": {"thread_id": payload.thread_id}}
373
+ current_state_snapshot = global_graph.get_state(config)
374
  current_state_values = current_state_snapshot.values
375
 
376
  if payload.approval_status.lower() == "rejected":
 
380
 
381
  current_state_values["human_feedback_message"] = payload.feedback_message
382
 
 
 
 
 
383
  # Update the state directly
 
384
  global_graph.update_state(config, current_state_values)
385
  print("graph state updated successfuly with human feedback")
386
 
 
436
  # detail=f"Failed to resume workflow: {e}"
437
  # )
438
 
439
+ return {"message": "Human input received and processed successfully!"}