from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import os
from dotenv import load_dotenv
from functools import lru_cache
from src.dev_pilot.LLMS.groqllm import GroqLLM
from src.dev_pilot.LLMS.geminillm import GeminiLLM
from src.dev_pilot.graph.graph_builder import GraphBuilder
from src.dev_pilot.graph.graph_executor import GraphExecutor
from src.dev_pilot.dto.sdlc_request import SDLCRequest
from src.dev_pilot.dto.sdlc_response import SDLCResponse
import uvicorn
from contextlib import asynccontextmanager
from src.dev_pilot.utils.logging_config import setup_logging
from loguru import logger
## Setup logging level
# DEBUG is verbose; consider "INFO" or higher for production deployments.
setup_logging(log_level="DEBUG")
# Gemini model identifiers offered by this service; gemini_models[0] is the
# default backend constructed at startup in `lifespan`.
gemini_models = [
    "gemini-2.0-flash",
    "gemini-2.0-flash-lite",
    "gemini-2.5-pro-exp-03-25"
]
# Groq-hosted model identifiers. NOTE(review): not referenced elsewhere in
# this file — presumably consumed by GroqLLM callers; confirm before removing.
groq_models = [
    "gemma2-9b-it",
    "llama3-8b-8192",
    "llama3-70b-8192"
]
def load_app(host: str = "0.0.0.0", port: int = 8000) -> None:
    """Run the API under uvicorn (blocking).

    Generalized from the previous hard-coded bind address: callers may now
    override the interface/port, while the no-argument call keeps the exact
    original behavior.

    Args:
        host: Interface to bind; "0.0.0.0" listens on all interfaces.
        port: TCP port to listen on.
    """
    uvicorn.run(app, host=host, port=port)
class Settings:
    """Snapshot of the API keys read from the environment at construction.

    Attributes are None when the corresponding variable is unset; presence
    is enforced separately by the `validate_api_keys` dependency.
    """

    # Environment variables this service reads.
    _KEY_NAMES = ("GEMINI_API_KEY", "GROQ_API_KEY")

    def __init__(self) -> None:
        for name in self._KEY_NAMES:
            setattr(self, name, os.getenv(name))
@lru_cache()
def get_settings() -> "Settings":
    """Return a process-wide cached Settings instance.

    lru_cache on a zero-argument function makes this a lazy singleton:
    the environment is read once, on first call.
    """
    return Settings()
def validate_api_keys(settings: Settings = Depends(get_settings)):
    """FastAPI dependency: fail fast when any required API key is absent.

    Raises:
        HTTPException: 500 with a message naming every missing key.

    Returns:
        The settings object unchanged when all keys are present.
    """
    missing_keys = [
        name
        for name, value in (
            ('GEMINI_API_KEY', settings.GEMINI_API_KEY),
            ('GROQ_API_KEY', settings.GROQ_API_KEY),
        )
        if not value
    ]
    if missing_keys:
        raise HTTPException(
            status_code=500,
            detail=f"Missing required API keys: {', '.join(missing_keys)}"
        )
    return settings
# Initialize the LLM and GraphBuilder instances once and store them in the app state
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: build the shared LLM/graph objects at startup
    and drop the references at shutdown."""
    settings = get_settings()
    # The first Gemini model in the list is the default backend.
    model = GeminiLLM(model=gemini_models[0], api_key=settings.GEMINI_API_KEY).get_llm_model()
    builder = GraphBuilder(llm=model)
    compiled_graph = builder.setup_graph()
    app.state.llm = model
    app.state.graph = compiled_graph
    app.state.graph_executor = GraphExecutor(compiled_graph)
    yield
    # Shutdown: release the references so the objects can be garbage-collected.
    for attr in ("llm", "graph", "graph_executor"):
        setattr(app.state, attr, None)
# Application instance; `lifespan` wires the LLM/graph objects into app.state.
app = FastAPI(
    title="DevPilot API",
    description="AI-powered SDLC API using Langgraph",
    version="1.0.0",
    lifespan=lifespan
)
logger.info("Application starting up...")
# Configure CORS
# NOTE(review): browsers reject wildcard origins combined with credentials;
# allow_credentials=True is effectively inert until origins are pinned.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, replace with specific origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
async def root():
    """Landing endpoint pointing clients at the interactive documentation."""
    payload = {
        "message": "Welcome to DevPilot API",
        "docs_url": "/docs",
        "redoc_url": "/redoc",
    }
    return payload
@app.post("/api/v1/sdlc/start", response_model=SDLCResponse)
async def start_sdlc(
    sdlc_request: SDLCRequest,
    settings: Settings = Depends(validate_api_keys)
):
    """Start a new SDLC workflow for the requested project.

    Returns:
        SDLCResponse with the new task_id and initial workflow state, or a
        500 JSON error payload when the workflow could not be started.
    """
    try:
        graph_executor = app.state.graph_executor
        # Guard against requests arriving before the lifespan hook has run.
        if not isinstance(graph_executor, GraphExecutor):
            raise RuntimeError("Graph Executor not initialized")
        graph_response = graph_executor.start_workflow(sdlc_request.project_name)
        logger.debug(f"Start Workflow Response: {graph_response}")
        return SDLCResponse(
            status="success",
            message="SDLC process started successfully",
            task_id=graph_response["task_id"],
            state=graph_response["state"]
        )
    except Exception as e:
        # Log the full traceback server-side; the client only sees the message.
        logger.exception("Failed to start the SDLC process")
        error_response = SDLCResponse(
            status="error",
            message="Failed to start the process",
            error=str(e)
        )
        return JSONResponse(status_code=500, content=error_response.model_dump())
@app.post("/api/v1/sdlc/user_stories", response_model=SDLCResponse)
async def generate_user_stories(
    sdlc_request: SDLCRequest,
    settings: Settings = Depends(validate_api_keys)
):
    """Generate user stories for an existing task from its requirements.

    Renamed from the duplicate `start_sdlc` (the second definition shadowed
    the first handler's function object); the HTTP route is set by the
    decorator and is unchanged.

    Returns:
        SDLCResponse with the task_id and updated state, or a 500 JSON error
        payload on failure.
    """
    try:
        graph_executor = app.state.graph_executor
        # Guard against requests arriving before the lifespan hook has run.
        if not isinstance(graph_executor, GraphExecutor):
            raise RuntimeError("Graph Executor not initialized")
        graph_response = graph_executor.generate_stories(sdlc_request.task_id, sdlc_request.requirements)
        logger.debug(f"Generate Stories Response: {graph_response}")
        return SDLCResponse(
            status="success",
            message="User Stories generated successfully",
            task_id=graph_response["task_id"],
            state=graph_response["state"]
        )
    except Exception as e:
        # Log the full traceback server-side; the client only sees the message.
        logger.exception("Failed to generate user stories")
        error_response = SDLCResponse(
            status="error",
            message="Failed to generate user stories",
            error=str(e)
        )
        return JSONResponse(status_code=500, content=error_response.model_dump())
@app.post("/api/v1/sdlc/progress_flow", response_model=SDLCResponse)
async def progress_sdlc(
    sdlc_request: SDLCRequest,
    settings: Settings = Depends(validate_api_keys)
):
    """Advance an existing workflow to its next node, carrying review
    status and feedback.

    Returns:
        SDLCResponse with the task_id and the post-transition state, or a
        500 JSON error payload on failure.
    """
    try:
        graph_executor = app.state.graph_executor
        # Guard against requests arriving before the lifespan hook has run.
        if not isinstance(graph_executor, GraphExecutor):
            raise RuntimeError("Graph Executor not initialized")
        graph_response = graph_executor.graph_review_flow(
            sdlc_request.task_id,
            sdlc_request.status,
            sdlc_request.feedback,
            sdlc_request.next_node)
        logger.debug(f"Flow Node: {sdlc_request.next_node}")
        logger.debug(f"Progress Flow Response: {graph_response}")
        return SDLCResponse(
            status="success",
            message="Flow progressed successfully to next step",
            task_id=graph_response["task_id"],
            state=graph_response["state"]
        )
    except Exception as e:
        # Log the full traceback server-side; the client only sees the message.
        logger.exception("Failed to progress the SDLC flow")
        error_response = SDLCResponse(
            status="error",
            message="Failed to progress the flow",
            error=str(e)
        )
        return JSONResponse(status_code=500, content=error_response.model_dump())