File size: 5,060 Bytes
974e5e3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 | from src.dev_pilot.state.sdlc_state import SDLCState
from src.dev_pilot.cache.redis_cache import flush_redis_cache, save_state_to_redis, get_state_from_redis
import uuid
import src.dev_pilot.utils.constants as const
from loguru import logger
class GraphExecutor:
def __init__(self, graph):
self.graph = graph
def get_thread(self, task_id):
return {"configurable": {"thread_id": task_id}}
## ------- Start the Workflow ------- ##
def start_workflow(self, project_name: str):
graph = self.graph
flush_redis_cache()
# Generate a unique task id
task_id = f"sdlc-session-{uuid.uuid4().hex[:8]}"
thread = self.get_thread(task_id)
state = None
for event in graph.stream({"project_name": project_name},thread, stream_mode="values"):
state = event
current_state = graph.get_state(thread)
save_state_to_redis(task_id, current_state)
return {"task_id" : task_id, "state": state}
## ------- User Story Generation ------- ##
def generate_stories(self, task_id:str, requirements: list[str]):
saved_state = get_state_from_redis(task_id)
if saved_state:
saved_state['requirements'] = requirements
saved_state['next_node'] = const.REVIEW_USER_STORIES
return self.update_and_resume_graph(saved_state,task_id,"get_user_requirements")
## ------- Generic Review Flow for all the feedback stages ------- ##
def graph_review_flow(self, task_id, status, feedback, review_type):
saved_state = get_state_from_redis(task_id)
if saved_state:
if review_type == const.REVIEW_USER_STORIES:
saved_state['user_stories_review_status'] = status
saved_state['user_stories_feedback'] = feedback
node_name = "review_user_stories"
saved_state['next_node'] = const.REVIEW_USER_STORIES if status == "feedback" else const.REVIEW_DESIGN_DOCUMENTS
elif review_type == const.REVIEW_DESIGN_DOCUMENTS:
saved_state['design_documents_review_status'] = status
saved_state['design_documents_feedback'] = feedback
node_name = "review_design_documents"
saved_state['next_node'] = const.REVIEW_DESIGN_DOCUMENTS if status == "feedback" else const.REVIEW_CODE
elif review_type == const.REVIEW_CODE:
saved_state['code_review_status'] = status
saved_state['code_review_feedback'] = feedback
node_name = "code_review"
saved_state['next_node'] = const.REVIEW_CODE if status == "feedback" else const.REVIEW_SECURITY_RECOMMENDATIONS
elif review_type == const.REVIEW_SECURITY_RECOMMENDATIONS:
saved_state['security_review_status'] = status
saved_state['security_review_comments'] = feedback
node_name = "security_review"
saved_state['next_node'] = const.REVIEW_SECURITY_RECOMMENDATIONS if status == "feedback" else const.REVIEW_TEST_CASES
elif review_type == const.REVIEW_TEST_CASES:
saved_state['test_case_review_status'] = status
saved_state['test_case_review_feedback'] = feedback
node_name = "review_test_cases"
saved_state['next_node'] = const.REVIEW_TEST_CASES if status == "feedback" else const.REVIEW_QA_TESTING
elif review_type == const.REVIEW_QA_TESTING:
saved_state['qa_testing_status'] = status
saved_state['qa_testing_feedback'] = feedback
node_name = "qa_review"
saved_state['next_node'] = const.REVIEW_QA_TESTING if status == "feedback" else const.END_NODE
else:
raise ValueError(f"Unsupported review type: {review_type}")
return self.update_and_resume_graph(saved_state,task_id,node_name)
## -------- Helper Method to handle the graph resume state ------- ##
def update_and_resume_graph(self, saved_state,task_id, as_node):
graph = self.graph
thread = self.get_thread(task_id)
graph.update_state(thread, saved_state, as_node=as_node)
# Resume the graph
state = None
for event in graph.stream(None, thread, stream_mode="values"):
logger.debug(f"Event Received: {event}")
state = event
# saving the state before asking the product owner for review
current_state = graph.get_state(thread)
save_state_to_redis(task_id, current_state)
return {"task_id" : task_id, "state": state}
def get_updated_state(self, task_id):
saved_state = get_state_from_redis(task_id)
return {"task_id" : task_id, "state": saved_state}
|