| | import streamlit as st |
| | from src.dev_pilot.LLMS.groqllm import GroqLLM |
| | from src.dev_pilot.LLMS.geminillm import GeminiLLM |
| | from src.dev_pilot.LLMS.openai_llm import OpenAILLM |
| | from src.dev_pilot.graph.graph_builder import GraphBuilder |
| | from src.dev_pilot.ui.uiconfigfile import Config |
| | import src.dev_pilot.utils.constants as const |
| | from src.dev_pilot.graph.graph_executor import GraphExecutor |
| | from src.dev_pilot.state.sdlc_state import UserStoryList |
| | import os |
| |
|
def initialize_session():
    """Reset the per-user Streamlit session to the initial SDLC stage.

    Clears all workflow progress: stage marker, project name, collected
    requirements, the backend task id, and the cached graph state.
    """
    st.session_state.stage = const.PROJECT_INITILIZATION
    st.session_state.project_name = ""
    # Requirements are consumed as a list of strings elsewhere (the
    # requirement tab joins them with "\n" and stores a list back), so
    # initialize as an empty list rather than "" for type consistency.
    st.session_state.requirements = []
    st.session_state.task_id = ""
    st.session_state.state = {}
| |
|
| |
|
def load_sidebar_ui(config):
    """Render the sidebar and return the user's control selections.

    Shows the LLM picker, the per-provider model picker and API-key input,
    a session-reset button, and the workflow overview image.

    Args:
        config: Config object exposing the LLM/model option getters.

    Returns:
        dict with keys ``selected_llm``, ``selected_<provider>_model`` and
        ``<PROVIDER>_API_KEY`` for the chosen provider.
    """
    # Per-provider wiring replaces three near-identical copy-paste blocks:
    # (model-options getter, user_controls key for the chosen model,
    #  API-key environment variable, docs URL for obtaining a key).
    providers = {
        "Groq": (config.get_groq_model_options, "selected_groq_model",
                 "GROQ_API_KEY", "https://console.groq.com/keys"),
        "Gemini": (config.get_gemini_model_options, "selected_gemini_model",
                   "GEMINI_API_KEY", "https://ai.google.dev/gemini-api/docs/api-key"),
        "OpenAI": (config.get_openai_model_options, "selected_openai_model",
                   "OPENAI_API_KEY", "https://platform.openai.com/api-keys"),
    }

    user_controls = {}

    with st.sidebar:
        llm_options = config.get_llm_options()
        user_controls["selected_llm"] = st.selectbox("Select LLM", llm_options)

        selected = user_controls["selected_llm"]
        if selected in providers:
            get_models, model_key, env_key, docs_url = providers[selected]
            user_controls[model_key] = st.selectbox("Select Model", get_models())

            # Mirror the key into the environment, the controls dict and the
            # session state so downstream LLM wrappers can read it anywhere.
            api_key = st.text_input("API Key",
                                    type="password",
                                    value=os.getenv(env_key, ""))
            os.environ[env_key] = user_controls[env_key] = st.session_state[env_key] = api_key

            if not api_key:
                provider_name = env_key.split("_")[0]  # e.g. "GROQ_API_KEY" -> "GROQ"
                st.warning(f"⚠️ Please enter your {provider_name} API key to proceed. Don't have? refer : {docs_url} ")

        if st.button("Reset Session"):
            # Wipe every session key, then re-seed the initial stage.
            for key in list(st.session_state.keys()):
                del st.session_state[key]
            initialize_session()
            st.rerun()

        st.subheader("Workflow Overview")
        st.image("workflow_graph.png")

    return user_controls
| |
|
| |
|
def load_streamlit_ui(config):
    """Set up the page chrome and render the sidebar.

    Args:
        config: Config object used for the page title and sidebar options.

    Returns:
        The user-controls dict produced by ``load_sidebar_ui``.
    """
    st.set_page_config(page_title=config.get_page_title(), layout="wide")
    st.header(config.get_page_title())
    st.subheader("Let AI agents plan your SDLC journey", divider="rainbow", anchor=False)
    return load_sidebar_ui(config)
| |
|
| |
|
| | |
def load_app():
    """
    Main entry point for the Streamlit app using tab-based UI.

    Drives the SDLC workflow through sequential stages (requirements ->
    user stories -> design documents -> code -> test cases -> QA ->
    deployment -> artifacts). Progress is tracked in ``st.session_state``
    (``stage``, ``task_id``, ``state``) and the actual generation work is
    delegated to a ``GraphExecutor`` built over the selected LLM.

    Raises:
        ValueError: wrapping any unexpected error during the workflow.
    """
    config = Config()
    if 'stage' not in st.session_state:
        initialize_session()

    user_input = load_streamlit_ui(config)
    if not user_input:
        st.error("Error: Failed to load user input from the UI.")
        return

    try:
        # ----- LLM selection ------------------------------------------------
        selectedLLM = user_input.get("selected_llm")
        model = None
        if selectedLLM == "Gemini":
            obj_llm_config = GeminiLLM(user_controls_input=user_input)
            model = obj_llm_config.get_llm_model()
        elif selectedLLM == "Groq":
            obj_llm_config = GroqLLM(user_controls_input=user_input)
            model = obj_llm_config.get_llm_model()
        elif selectedLLM == "OpenAI":
            obj_llm_config = OpenAILLM(user_controls_input=user_input)
            model = obj_llm_config.get_llm_model()
        if not model:
            st.error("Error: LLM model could not be initialized.")
            return

        # ----- Graph setup --------------------------------------------------
        graph_builder = GraphBuilder(model)
        try:
            graph = graph_builder.setup_graph()
            graph_executor = GraphExecutor(graph)
        except Exception as e:
            st.error(f"Error: Graph setup failed - {e}")
            return

        tabs = st.tabs(["Project Requirement", "User Stories", "Design Documents", "Code Generation", "Test Cases", "QA Testing", "Deployment", "Download Artifacts"])

        # ----- Tab 0: Project Requirement -----------------------------------
        with tabs[0]:
            st.header("Project Requirement")
            project_name = st.text_input("Enter the project name:", value=st.session_state.get("project_name", ""))
            st.session_state.project_name = project_name

            if st.session_state.stage == const.PROJECT_INITILIZATION:
                if st.button("🚀 Let's Start"):
                    if not project_name:
                        st.error("Please enter a project name.")
                        st.stop()
                    # Kick off a new workflow; the backend returns a task id
                    # used for all subsequent calls.
                    graph_response = graph_executor.start_workflow(project_name)
                    st.session_state.task_id = graph_response["task_id"]
                    st.session_state.state = graph_response["state"]
                    st.session_state.project_name = project_name
                    st.session_state.stage = const.REQUIREMENT_COLLECTION
                    st.rerun()

            if st.session_state.stage in [const.REQUIREMENT_COLLECTION, const.GENERATE_USER_STORIES]:
                requirements_input = st.text_area(
                    "Enter the requirements. Write each requirement on a new line:",
                    value="\n".join(st.session_state.get("requirements", []))
                )
                if st.button("Submit Requirements"):
                    # One requirement per non-blank line.
                    requirements = [req.strip() for req in requirements_input.split("\n") if req.strip()]
                    st.session_state.requirements = requirements
                    if not requirements:
                        st.error("Please enter at least one requirement.")
                    else:
                        st.success("Project details saved successfully!")
                        st.subheader("Project Details:")
                        st.write(f"**Project Name:** {st.session_state.project_name}")
                        st.subheader("Requirements:")
                        for req in requirements:
                            st.write(req)
                        graph_response = graph_executor.generate_stories(st.session_state.task_id, requirements)
                        st.session_state.state = graph_response["state"]
                        st.session_state.stage = const.GENERATE_USER_STORIES
                        st.rerun()

        # ----- Tab 1: User Stories ------------------------------------------
        with tabs[1]:
            st.header("User Stories")
            if "user_stories" in st.session_state.state:
                user_story_list = st.session_state.state["user_stories"]
                st.divider()
                st.subheader("Generated User Stories")
                if isinstance(user_story_list, UserStoryList):
                    for story in user_story_list.user_stories:
                        unique_id = f"US-{story.id:03}"
                        with st.container():
                            st.markdown(f"#### {story.title} ({unique_id})")
                            st.write(f"**Priority:** {story.priority}")
                            st.write(f"**Description:** {story.description}")
                            st.write(f"**Acceptance Criteria:**")
                            st.markdown(story.acceptance_criteria.replace("\n", "<br>"), unsafe_allow_html=True)
                            st.divider()

                if st.session_state.stage == const.GENERATE_USER_STORIES:
                    st.subheader("Review User Stories")
                    feedback_text = st.text_area("Provide feedback for improving the user stories (optional):")
                    col1, col2 = st.columns(2)
                    with col1:
                        if st.button("✅ Approve User Stories"):
                            st.success("✅ User stories approved.")
                            graph_response = graph_executor.graph_review_flow(
                                st.session_state.task_id, status="approved", feedback=None, review_type=const.REVIEW_USER_STORIES
                            )
                            st.session_state.state = graph_response["state"]
                            st.session_state.stage = const.CREATE_DESIGN_DOC

                    with col2:
                        if st.button("✍️ Give User Stories Feedback"):
                            if not feedback_text.strip():
                                st.warning("⚠️ Please enter feedback before submitting.")
                            else:
                                st.info("🔄 Sending feedback to revise user stories.")
                                graph_response = graph_executor.graph_review_flow(
                                    st.session_state.task_id, status="feedback", feedback=feedback_text.strip(), review_type=const.REVIEW_USER_STORIES
                                )
                                st.session_state.state = graph_response["state"]
                                st.session_state.stage = const.GENERATE_USER_STORIES
                                st.rerun()
            else:
                st.info("User stories generation pending or not reached yet.")

        # ----- Tab 2: Design Documents --------------------------------------
        with tabs[2]:
            st.header("Design Documents")
            if st.session_state.stage == const.CREATE_DESIGN_DOC:
                # Refresh the cached state from the backend.
                graph_response = graph_executor.get_updated_state(st.session_state.task_id)
                st.session_state.state = graph_response["state"]

                if "design_documents" in st.session_state.state:
                    design_doc = st.session_state.state["design_documents"]
                    st.subheader("Functional Design Document")
                    st.markdown(design_doc.get("functional", "No functional design document available."))
                    st.subheader("Technical Design Document")
                    st.markdown(design_doc.get("technical", "No technical design document available."))

                    st.divider()
                    st.subheader("Review Design Documents")
                    feedback_text = st.text_area("Provide feedback for improving the design documents (optional):")
                    col1, col2 = st.columns(2)
                    with col1:
                        if st.button("✅ Approve Design Documents"):
                            st.success("✅ Design documents approved.")
                            graph_response = graph_executor.graph_review_flow(
                                st.session_state.task_id, status="approved", feedback=None, review_type=const.REVIEW_DESIGN_DOCUMENTS
                            )
                            st.session_state.state = graph_response["state"]
                            st.session_state.stage = const.CODE_GENERATION

                    with col2:
                        if st.button("✍️ Give Design Documents Feedback"):
                            if not feedback_text.strip():
                                st.warning("⚠️ Please enter feedback before submitting.")
                            else:
                                st.info("🔄 Sending feedback to revise design documents.")
                                graph_response = graph_executor.graph_review_flow(
                                    st.session_state.task_id, status="feedback", feedback=feedback_text.strip(), review_type=const.REVIEW_DESIGN_DOCUMENTS
                                )
                                st.session_state.state = graph_response["state"]
                                st.session_state.stage = const.CREATE_DESIGN_DOC
                                st.rerun()

            else:
                st.info("Design document generation pending or not reached yet.")

        # ----- Tab 3: Code Generation / Security Review ---------------------
        with tabs[3]:
            st.header("Code Generation")  # fixed typo ("Genearation")
            if st.session_state.stage in [const.CODE_GENERATION, const.SECURITY_REVIEW]:

                graph_response = graph_executor.get_updated_state(st.session_state.task_id)
                st.session_state.state = graph_response["state"]

                if "code_generated" in st.session_state.state:
                    code_generated = st.session_state.state["code_generated"]
                    st.subheader("Code Files")
                    st.markdown(code_generated)
                    st.divider()

                    # Fix: derive review_type from the stage alone. The
                    # original only bound it inside the security-recommendations
                    # branch, leaving it undefined (NameError on approve) when
                    # the security stage had no recommendations in state yet.
                    if st.session_state.stage == const.CODE_GENERATION:
                        review_type = const.REVIEW_CODE
                    else:  # const.SECURITY_REVIEW
                        review_type = const.REVIEW_SECURITY_RECOMMENDATIONS
                        if "security_recommendations" in st.session_state.state:
                            security_recommendations = st.session_state.state["security_recommendations"]
                            st.subheader("Security Recommendations")
                            st.markdown(security_recommendations)

                    st.divider()
                    st.subheader("Review Details")

                    # Free-text feedback is only collected in the plain
                    # code-review stage; the security stage has its own button.
                    if st.session_state.stage == const.CODE_GENERATION:
                        feedback_text = st.text_area("Provide feedback (optional):")

                    col1, col2 = st.columns(2)
                    with col1:
                        if st.button("✅ Approve Code"):
                            graph_response = graph_executor.graph_review_flow(
                                st.session_state.task_id, status="approved", feedback=None, review_type=review_type
                            )
                            st.session_state.state = graph_response["state"]
                            if st.session_state.stage == const.CODE_GENERATION:
                                st.session_state.stage = const.SECURITY_REVIEW
                                st.rerun()
                            elif st.session_state.stage == const.SECURITY_REVIEW:
                                st.session_state.stage = const.WRITE_TEST_CASES

                    with col2:
                        if st.session_state.stage == const.SECURITY_REVIEW:
                            if st.button("✍️ Implement Security Recommendations"):
                                st.info("🔄 Sending feedback to revise code generation.")
                                graph_response = graph_executor.graph_review_flow(
                                    st.session_state.task_id, status="feedback", feedback=None, review_type=review_type
                                )
                                st.session_state.state = graph_response["state"]
                                st.session_state.stage = const.CODE_GENERATION
                                st.rerun()
                        else:
                            if st.button("✍️ Give Feedback"):
                                if not feedback_text.strip():
                                    st.warning("⚠️ Please enter feedback before submitting.")
                                else:
                                    st.info("🔄 Sending feedback to revise code generation.")
                                    graph_response = graph_executor.graph_review_flow(
                                        st.session_state.task_id, status="feedback", feedback=feedback_text.strip(), review_type=review_type
                                    )
                                    st.session_state.state = graph_response["state"]
                                    st.session_state.stage = const.CODE_GENERATION
                                    st.rerun()

                else:
                    st.info("Code generation pending or not reached yet.")

        # ----- Tab 4: Test Cases --------------------------------------------
        with tabs[4]:
            st.header("Test Cases")
            if st.session_state.stage == const.WRITE_TEST_CASES:

                graph_response = graph_executor.get_updated_state(st.session_state.task_id)
                st.session_state.state = graph_response["state"]

                if "test_cases" in st.session_state.state:
                    test_cases = st.session_state.state["test_cases"]
                    st.markdown(test_cases)

                    st.divider()
                    st.subheader("Review Test Cases")
                    feedback_text = st.text_area("Provide feedback for improving the test cases (optional):")
                    col1, col2 = st.columns(2)
                    with col1:
                        if st.button("✅ Approve Test Cases"):
                            st.success("✅ Test cases approved.")
                            graph_response = graph_executor.graph_review_flow(
                                st.session_state.task_id, status="approved", feedback=None, review_type=const.REVIEW_TEST_CASES
                            )
                            st.session_state.state = graph_response["state"]
                            st.session_state.stage = const.QA_TESTING

                    with col2:
                        if st.button("✍️ Give Test Cases Feedback"):
                            if not feedback_text.strip():
                                st.warning("⚠️ Please enter feedback before submitting.")
                            else:
                                st.info("🔄 Sending feedback to revise test cases.")
                                graph_response = graph_executor.graph_review_flow(
                                    st.session_state.task_id, status="feedback", feedback=feedback_text.strip(), review_type=const.REVIEW_TEST_CASES
                                )
                                st.session_state.state = graph_response["state"]
                                st.session_state.stage = const.WRITE_TEST_CASES
                                st.rerun()

                else:
                    st.info("Test Cases generation pending or not reached yet.")

        # ----- Tab 5: QA Testing --------------------------------------------
        with tabs[5]:
            st.header("QA Testing")
            if st.session_state.stage == const.QA_TESTING:

                graph_response = graph_executor.get_updated_state(st.session_state.task_id)
                st.session_state.state = graph_response["state"]

                if "qa_testing_comments" in st.session_state.state:
                    qa_testing = st.session_state.state["qa_testing_comments"]
                    st.markdown(qa_testing)

                    st.divider()
                    st.subheader("Review QA Testing Comments")
                    # Fix: the original referenced an undefined feedback_text
                    # in this tab (NameError when clicking "Fix testing
                    # issues"); collect the feedback explicitly here.
                    feedback_text = st.text_area("Provide feedback for fixing the QA issues (optional):")
                    col1, col2 = st.columns(2)
                    with col1:
                        if st.button("✅ Approve Testing"):
                            st.success("✅ QA Testing approved.")
                            graph_response = graph_executor.graph_review_flow(
                                st.session_state.task_id, status="approved", feedback=None, review_type=const.REVIEW_QA_TESTING
                            )
                            st.session_state.state = graph_response["state"]
                            st.session_state.stage = const.DEPLOYMENT

                    with col2:
                        if st.button("✍️ Fix testing issues"):
                            st.info("🔄 Sending feedback to revise code.")
                            graph_response = graph_executor.graph_review_flow(
                                st.session_state.task_id, status="feedback", feedback=feedback_text.strip(), review_type=const.REVIEW_QA_TESTING
                            )
                            st.session_state.state = graph_response["state"]
                            st.session_state.stage = const.CODE_GENERATION
                            st.rerun()

                else:
                    st.info("QA Testing Report generation pending or not reached yet.")

        # ----- Tab 6: Deployment --------------------------------------------
        with tabs[6]:
            st.header("Deployment")
            if st.session_state.stage == const.DEPLOYMENT:

                graph_response = graph_executor.get_updated_state(st.session_state.task_id)
                st.session_state.state = graph_response["state"]

                if "deployment_feedback" in st.session_state.state:
                    deployment_feedback = st.session_state.state["deployment_feedback"]
                    st.markdown(deployment_feedback)
                    st.session_state.stage = const.ARTIFACTS

                else:
                    st.info("Deployment verification pending or not reached yet.")  # fixed typo ("Deplopment")

        # ----- Tab 7: Artifacts ---------------------------------------------
        with tabs[7]:
            st.header("Artifacts")
            if "artifacts" in st.session_state.state and st.session_state.state["artifacts"]:
                st.subheader("Download Artifacts")
                for artifact_name, artifact_path in st.session_state.state["artifacts"].items():
                    if artifact_path:
                        try:
                            with open(artifact_path, "rb") as f:
                                file_bytes = f.read()
                            st.download_button(
                                label=f"Download {artifact_name}",
                                data=file_bytes,
                                file_name=os.path.basename(artifact_path),
                                mime="application/octet-stream"
                            )
                        except Exception as e:
                            st.error(f"Error reading {artifact_name}: {e}")
                    else:
                        st.info(f"{artifact_name} not available.")
            else:
                st.info("No artifacts generated yet.")

    except Exception as e:
        # Chain the original exception so the full traceback is preserved.
        raise ValueError(f"Error occurred with Exception : {e}") from e
| | |