# DevPilot/src/dev_pilot/graph/graph_executor.py
# msaifee's picture
# DevPilot
# 974e5e3
from src.dev_pilot.state.sdlc_state import SDLCState
from src.dev_pilot.cache.redis_cache import flush_redis_cache, save_state_to_redis, get_state_from_redis
import uuid
import src.dev_pilot.utils.constants as const
from loguru import logger
class GraphExecutor:
    """Run a workflow graph step by step and cache its state between steps.

    Each workflow run is addressed by a ``task_id`` that doubles as the graph
    thread id. After every run or resume the graph state is read back with
    ``graph.get_state`` and stored through ``save_state_to_redis`` so the next
    call can pick it up with ``get_state_from_redis``.
    """

    def __init__(self, graph):
        """Store the graph object that all workflow calls operate on."""
        self.graph = graph

    def get_thread(self, task_id):
        """Return the config dict that addresses the graph thread of *task_id*."""
        return {"configurable": {"thread_id": task_id}}

    ## ------- Start the Workflow ------- ##
    def start_workflow(self, project_name: str):
        """Start a new workflow run for *project_name*.

        Args:
            project_name: Value placed under ``"project_name"`` in the graph's
                initial input.

        Returns:
            A dict with the generated ``"task_id"`` and, under ``"state"``,
            the last value streamed by the graph (``None`` if nothing was
            streamed).
        """
        graph = self.graph

        # NOTE(review): the cache is flushed on every new workflow; presumably
        # this drops the state of earlier sessions as well - confirm intended.
        flush_redis_cache()

        # Generate a unique task id
        task_id = f"sdlc-session-{uuid.uuid4().hex[:8]}"
        thread = self.get_thread(task_id)

        state = None
        for event in graph.stream({"project_name": project_name}, thread, stream_mode="values"):
            state = event

        current_state = graph.get_state(thread)
        save_state_to_redis(task_id, current_state)

        return {"task_id": task_id, "state": state}

    ## ------- User Story Generation ------- ##
    def generate_stories(self, task_id: str, requirements: list[str]):
        """Attach *requirements* to the cached state and resume the graph.

        Args:
            task_id: Identifier returned by ``start_workflow``.
            requirements: Requirements stored under ``'requirements'``.

        Returns:
            The result of ``update_and_resume_graph``, resumed as the
            ``"get_user_requirements"`` node.
        """
        saved_state = get_state_from_redis(task_id)
        if saved_state:
            saved_state['requirements'] = requirements
            saved_state['next_node'] = const.REVIEW_USER_STORIES

        # NOTE(review): when nothing is cached for task_id the graph is still
        # resumed with the falsy value returned by the cache - confirm intended.
        return self.update_and_resume_graph(saved_state, task_id, "get_user_requirements")

    ## ------- Generic Review Flow for all the feedback stages ------- ##
    def graph_review_flow(self, task_id, status, feedback, review_type):
        """Record a review decision for one stage and resume the graph.

        Args:
            task_id: Identifier returned by ``start_workflow``.
            status: Review status; the literal ``"feedback"`` keeps the
                workflow on the same review stage, any other value moves it
                to the following stage.
            feedback: Reviewer feedback stored alongside the status.
            review_type: One of the ``const.REVIEW_*`` stage constants.

        Returns:
            The result of ``update_and_resume_graph`` for the stage's node.

        Raises:
            ValueError: If no state is cached for *task_id*, or if
                *review_type* is not a supported stage.
        """
        saved_state = get_state_from_redis(task_id)
        if not saved_state:
            # Without this guard the resume call below would be reached with
            # no node selected and fail on an unbound local variable.
            raise ValueError(f"No saved state found for task id: {task_id}")

        # (stage, status key, feedback key, node to resume as, stage after approval)
        # Kept as an ordered sequence so matching follows the stage order.
        review_stages = (
            (const.REVIEW_USER_STORIES,
             'user_stories_review_status', 'user_stories_feedback',
             "review_user_stories", const.REVIEW_DESIGN_DOCUMENTS),
            (const.REVIEW_DESIGN_DOCUMENTS,
             'design_documents_review_status', 'design_documents_feedback',
             "review_design_documents", const.REVIEW_CODE),
            (const.REVIEW_CODE,
             'code_review_status', 'code_review_feedback',
             "code_review", const.REVIEW_SECURITY_RECOMMENDATIONS),
            (const.REVIEW_SECURITY_RECOMMENDATIONS,
             'security_review_status', 'security_review_comments',
             "security_review", const.REVIEW_TEST_CASES),
            (const.REVIEW_TEST_CASES,
             'test_case_review_status', 'test_case_review_feedback',
             "review_test_cases", const.REVIEW_QA_TESTING),
            (const.REVIEW_QA_TESTING,
             'qa_testing_status', 'qa_testing_feedback',
             "qa_review", const.END_NODE),
        )

        for stage, status_key, feedback_key, node_name, next_stage in review_stages:
            if review_type == stage:
                break
        else:
            raise ValueError(f"Unsupported review type: {review_type}")

        saved_state[status_key] = status
        saved_state[feedback_key] = feedback
        saved_state['next_node'] = stage if status == "feedback" else next_stage

        return self.update_and_resume_graph(saved_state, task_id, node_name)

    ## -------- Helper Method to handle the graph resume state ------- ##
    def update_and_resume_graph(self, saved_state, task_id, as_node):
        """Write *saved_state* into the graph thread and resume execution.

        Args:
            saved_state: Values passed to ``graph.update_state``.
            task_id: Identifier of the workflow run (used as thread id).
            as_node: Node name the update is applied as.

        Returns:
            A dict with ``"task_id"`` and, under ``"state"``, the last value
            streamed after resuming (``None`` if nothing was streamed).
        """
        graph = self.graph
        thread = self.get_thread(task_id)

        graph.update_state(thread, saved_state, as_node=as_node)

        # Resume the graph
        state = None
        for event in graph.stream(None, thread, stream_mode="values"):
            logger.debug(f"Event Received: {event}")
            state = event

        # saving the state before asking the product owner for review
        current_state = graph.get_state(thread)
        save_state_to_redis(task_id, current_state)

        return {"task_id": task_id, "state": state}

    def get_updated_state(self, task_id):
        """Return the cached state for *task_id* without touching the graph."""
        saved_state = get_state_from_redis(task_id)
        return {"task_id": task_id, "state": saved_state}