|
|
""" |
|
|
Developer Productivity Agent |
|
|
RAG-based system using Pinecone for vector storage and GPT-4o-mini. |
|
|
|
|
|
Features: |
|
|
- Pinecone vector database (2GB free tier) |
|
|
- Divided LLM Architecture for cost optimization |
|
|
- Real-time cost tracking and analytics |
|
|
- OpenAI embeddings (text-embedding-3-small) |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
import time |
|
|
from pathlib import Path |
|
|
from typing import List, Dict, Any, Optional |
|
|
import hashlib |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
from fastapi import FastAPI, HTTPException |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from pydantic import BaseModel |
|
|
import uvicorn |
|
|
|
|
|
|
|
|
from pinecone import Pinecone, ServerlessSpec |
|
|
|
|
|
|
|
|
from openai import OpenAI |
|
|
|
|
|
|
|
|
import ast |
|
|
import re |
|
|
from dataclasses import dataclass, field |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Config:
    """Application configuration"""

    # OpenAI API key; empty string means "not configured yet" — clients are
    # created lazily and raise ValueError when the key is missing.
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")

    # Pinecone connection settings (serverless index, free tier).
    PINECONE_API_KEY = os.getenv("PINECONE_API_KEY", "")
    PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME", "codebase-index")
    PINECONE_CLOUD = "aws"
    PINECONE_REGION = "us-east-1"

    # Models: both LLM roles use gpt-4o-mini for cost; embedding model
    # produces 1536-dimension vectors (must match the Pinecone index).
    ARCHITECT_MODEL = "gpt-4o-mini"
    DEVELOPER_MODEL = "gpt-4o-mini"
    EMBEDDING_MODEL = "text-embedding-3-small"
    EMBEDDING_DIM = 1536

    # Chunking sizes are in characters (converted to lines by the indexer);
    # TOP_K_RESULTS is the default number of search hits returned.
    CHUNK_SIZE = 1500
    CHUNK_OVERLAP = 200
    TOP_K_RESULTS = 10

    # Prices in USD per 1M tokens, consumed by CostTracker.
    COST_GPT4O_MINI_INPUT = 0.15
    COST_GPT4O_MINI_OUTPUT = 0.60
    COST_EMBEDDING = 0.02
    COST_GPT4_INPUT = 30.0
    COST_GPT4_OUTPUT = 60.0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CostTracker:
    """Tracks API token usage and computes cost/savings vs. a GPT-4 baseline.

    A single module-level instance (``cost_tracker``) is shared by the
    indexer and both LLM wrappers; all token counts accumulate here and are
    converted to dollars on demand using the rates in ``Config``.
    Not thread-safe: counters are plain attributes.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Reset all counters and start a new tracking session."""
        self.embedding_tokens = 0
        self.architect_input_tokens = 0
        self.architect_output_tokens = 0
        self.developer_input_tokens = 0
        self.developer_output_tokens = 0
        self.api_calls = 0
        self.tickets_processed = 0
        self.questions_answered = 0
        self.start_time = datetime.now()
        self.history = []

    def add_embedding(self, tokens: int):
        """Track tokens consumed by one embedding API call."""
        self.embedding_tokens += tokens
        self.api_calls += 1

    def add_architect_call(self, input_tokens: int, output_tokens: int):
        """Track a chat-completion call made by the architect LLM."""
        self.architect_input_tokens += input_tokens
        self.architect_output_tokens += output_tokens
        self.api_calls += 1

    def add_developer_call(self, input_tokens: int, output_tokens: int):
        """Track a chat-completion call made by the developer LLM."""
        self.developer_input_tokens += input_tokens
        self.developer_output_tokens += output_tokens
        self.api_calls += 1

    def record_ticket(self):
        """Record a fully processed ticket in the event history."""
        self.tickets_processed += 1
        self._add_to_history("ticket")

    def record_question(self):
        """Record an answered code question in the event history."""
        self.questions_answered += 1
        self._add_to_history("question")

    def _add_to_history(self, event_type: str):
        """Append a timestamped cost snapshot for the analytics endpoint."""
        self.history.append({
            "timestamp": datetime.now().isoformat(),
            "type": event_type,
            "cumulative_cost": self.get_actual_cost(),
            "cumulative_savings": self.get_savings()
        })

    def get_actual_cost(self) -> float:
        """Dollar cost of the session with the gpt-4o-mini + embeddings setup."""
        config = Config()

        # Rates are USD per 1M tokens (see Config.COST_*).
        embedding_cost = (self.embedding_tokens / 1_000_000) * config.COST_EMBEDDING
        architect_cost = (
            (self.architect_input_tokens / 1_000_000) * config.COST_GPT4O_MINI_INPUT +
            (self.architect_output_tokens / 1_000_000) * config.COST_GPT4O_MINI_OUTPUT
        )
        developer_cost = (
            (self.developer_input_tokens / 1_000_000) * config.COST_GPT4O_MINI_INPUT +
            (self.developer_output_tokens / 1_000_000) * config.COST_GPT4O_MINI_OUTPUT
        )

        return embedding_cost + architect_cost + developer_cost

    def get_traditional_cost(self) -> float:
        """Hypothetical cost had the same LLM tokens gone through GPT-4.

        Embedding tokens are deliberately excluded: the baseline assumes the
        same retrieval pipeline, only swapping the chat model.
        """
        config = Config()

        total_input = self.architect_input_tokens + self.developer_input_tokens
        total_output = self.architect_output_tokens + self.developer_output_tokens

        return (
            (total_input / 1_000_000) * config.COST_GPT4_INPUT +
            (total_output / 1_000_000) * config.COST_GPT4_OUTPUT
        )

    def get_savings(self) -> float:
        """Absolute dollar savings versus the GPT-4 baseline."""
        return self.get_traditional_cost() - self.get_actual_cost()

    def get_savings_percentage(self) -> float:
        """Savings as a percentage of the baseline (0 when no usage yet)."""
        traditional = self.get_traditional_cost()
        if traditional == 0:
            return 0
        return ((traditional - self.get_actual_cost()) / traditional) * 100

    def get_stats(self) -> Dict[str, Any]:
        """Comprehensive session statistics for the /cost-analytics endpoint."""
        return {
            "actual_cost": round(self.get_actual_cost(), 6),
            "traditional_cost": round(self.get_traditional_cost(), 6),
            "savings": round(self.get_savings(), 6),
            "savings_percentage": round(self.get_savings_percentage(), 2),
            "total_tokens": {
                "embedding": self.embedding_tokens,
                "architect_input": self.architect_input_tokens,
                "architect_output": self.architect_output_tokens,
                "developer_input": self.developer_input_tokens,
                "developer_output": self.developer_output_tokens,
                "total": (self.embedding_tokens + self.architect_input_tokens +
                          self.architect_output_tokens + self.developer_input_tokens +
                          self.developer_output_tokens)
            },
            "api_calls": self.api_calls,
            "tickets_processed": self.tickets_processed,
            "questions_answered": self.questions_answered,
            # BUG FIX: timedelta.seconds wraps every 24 hours and ignores the
            # .days component; total_seconds() gives the true elapsed time.
            "session_duration_minutes": round((datetime.now() - self.start_time).total_seconds() / 60, 2),
            # max(..., 1) avoids ZeroDivisionError before the first ticket.
            "cost_per_ticket": round(self.get_actual_cost() / max(self.tickets_processed, 1), 6),
            "history": self.history[-50:]  # cap payload size for the API
        }
|
|
|
|
|
|
|
|
|
|
|
# Module-level singleton shared by the indexer, both LLM wrappers and the API.
cost_tracker = CostTracker()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class JiraTicket(BaseModel):
    """Incoming Jira ticket payload for the /process-ticket endpoint."""
    ticket_id: str
    title: str
    description: str
    # Free-text acceptance criteria; downstream prompt shows 'Not specified'
    # when omitted.
    acceptance_criteria: Optional[str] = None
    # Jira labels; currently unused by the pipeline but accepted for parity
    # with the Jira payload.
    labels: Optional[List[str]] = None
|
|
|
|
|
class ImplementationPlan(BaseModel):
    """Structured implementation plan returned by /process-ticket."""
    ticket_summary: str
    key_entities: List[str]
    # Each entry: {'path': ..., 'relevance': 'Lines a-b', 'preview': first 200 chars}.
    relevant_files: List[Dict[str, str]]
    implementation_steps: List[str]
    prerequisites: List[str]
    # Mapping of target file path -> generated boilerplate source text.
    boilerplate_code: Dict[str, str]
    architecture_notes: str
    # 'Low'/'Medium'/'High', or 'Unknown' when ticket analysis failed to parse.
    estimated_complexity: str
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CodebaseIndexer:
    """Indexes source files into a Pinecone vector database for RAG search.

    OpenAI and Pinecone clients are created lazily so the object can be
    constructed before API keys exist (see DevProductivityAgent.set_api_keys,
    which resets the cached clients when keys change).
    """

    def __init__(self, config: "Config"):
        self.config = config
        self._openai_client = None    # lazily created OpenAI client
        self._pinecone_client = None  # reserved cache slot, reset on key change
        self._index = None            # lazily created Pinecone index handle

    @property
    def openai_client(self):
        """Lazily build the OpenAI client, failing fast without a key."""
        if self._openai_client is None:
            if not self.config.OPENAI_API_KEY:
                raise ValueError("OpenAI API key required")
            self._openai_client = OpenAI(api_key=self.config.OPENAI_API_KEY)
        return self._openai_client

    @property
    def index(self):
        """Lazily connect to (creating if necessary) the Pinecone index."""
        if self._index is None:
            if not self.config.PINECONE_API_KEY:
                raise ValueError("Pinecone API key required")

            try:
                pc = Pinecone(api_key=self.config.PINECONE_API_KEY)

                existing_indexes = pc.list_indexes()
                index_names = [idx.name for idx in existing_indexes] if hasattr(existing_indexes, '__iter__') else []

                if self.config.PINECONE_INDEX_NAME not in index_names:
                    pc.create_index(
                        name=self.config.PINECONE_INDEX_NAME,
                        dimension=self.config.EMBEDDING_DIM,
                        metric="cosine",
                        spec=ServerlessSpec(
                            cloud=self.config.PINECONE_CLOUD,
                            region=self.config.PINECONE_REGION
                        )
                    )
                    # Serverless indexes take a moment to become queryable.
                    print(f"⏳ Waiting for index to be ready...")
                    time.sleep(10)

                self._index = pc.Index(self.config.PINECONE_INDEX_NAME)
                print(f"📂 Pinecone index ready: {self.config.PINECONE_INDEX_NAME}")
            except Exception as e:
                print(f"❌ Pinecone initialization error: {str(e)}")
                # Chain the original exception so the root cause survives.
                raise ValueError(f"Failed to initialize Pinecone: {str(e)}") from e

        return self._index

    def _get_embedding(self, text: str) -> List[float]:
        """Embed a single query string, recording its estimated token cost."""
        # Rough heuristic: ~4 characters per token — TODO confirm with tiktoken.
        tokens = len(text) // 4
        cost_tracker.add_embedding(tokens)

        response = self.openai_client.embeddings.create(
            model=self.config.EMBEDDING_MODEL,
            input=text
        )
        return response.data[0].embedding

    def _get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        """Embed many strings in one API call; output order matches input."""
        if not texts:
            return []

        # Same ~4 chars/token estimate as _get_embedding.
        tokens = sum(len(t) // 4 for t in texts)
        cost_tracker.add_embedding(tokens)

        response = self.openai_client.embeddings.create(
            model=self.config.EMBEDDING_MODEL,
            input=texts
        )
        return [item.embedding for item in response.data]

    def _detect_language(self, file_path: str) -> str:
        """Map a file extension to a language tag ('unknown' if unmapped)."""
        ext_map = {
            '.py': 'python', '.js': 'javascript', '.jsx': 'javascript',
            '.ts': 'typescript', '.tsx': 'typescript', '.java': 'java',
            '.go': 'go', '.rs': 'rust', '.cpp': 'cpp', '.c': 'c',
        }
        return ext_map.get(Path(file_path).suffix.lower(), 'unknown')

    def _chunk_content(self, content: str, file_path: str) -> List[Dict[str, Any]]:
        """Split content into overlapping line-based chunks.

        CHUNK_SIZE / CHUNK_OVERLAP are in characters and are converted to
        line counts assuming ~50 characters per line.
        """
        chunks = []
        lines = content.split('\n')
        chunk_lines = self.config.CHUNK_SIZE // 50
        overlap_lines = self.config.CHUNK_OVERLAP // 50
        # BUG FIX (defensive): an overlap >= chunk size would make the loop
        # below never advance; clamp so each iteration makes progress.
        overlap_lines = min(overlap_lines, max(chunk_lines - 1, 0))

        i = 0
        chunk_idx = 0
        while i < len(lines):
            end = min(i + chunk_lines, len(lines))
            chunk_content = '\n'.join(lines[i:end])

            if chunk_content.strip():  # skip whitespace-only chunks
                chunks.append({
                    'content': chunk_content,
                    'file_path': file_path,
                    'chunk_index': chunk_idx,
                    'line_start': i + 1,  # 1-based, inclusive
                    'line_end': end,
                    'language': self._detect_language(file_path)
                })

            # Step back by the overlap unless we've reached the end of file.
            i = end - overlap_lines if end < len(lines) else end
            chunk_idx += 1

        return chunks

    def index_file(self, file_path: str, content: str) -> int:
        """Chunk, embed and upsert one file; returns the number of chunks."""
        chunks = self._chunk_content(content, file_path)

        if not chunks:
            return 0

        texts = [c['content'] for c in chunks]
        embeddings = self._get_embeddings_batch(texts)

        vectors = []
        for i, chunk in enumerate(chunks):
            # Deterministic id: re-indexing a file overwrites its old vectors.
            vector_id = hashlib.md5(
                f"{file_path}_{chunk['chunk_index']}".encode()
            ).hexdigest()

            vectors.append({
                "id": vector_id,
                "values": embeddings[i],
                "metadata": {
                    "file_path": file_path,
                    "chunk_index": chunk['chunk_index'],
                    "language": chunk['language'],
                    "line_start": chunk['line_start'],
                    "line_end": chunk['line_end'],
                    # Pinecone metadata has a size limit; store a preview only.
                    "content": chunk['content'][:1000]
                }
            })

        self.index.upsert(vectors=vectors)

        return len(chunks)

    def index_directory(self, directory_path: str, extensions: List[str] = None) -> Dict[str, int]:
        """Index every matching file under a directory tree.

        Returns a mapping of file path -> chunk count (or an error string
        for files that failed to read/index).
        """
        if extensions is None:
            extensions = ['.py', '.js', '.jsx', '.ts', '.tsx', '.java', '.go']

        results = {}
        directory = Path(directory_path)

        for ext in extensions:
            for file_path in directory.rglob(f"*{ext}"):
                # Skip dependency, cache, VCS and virtualenv directories.
                if any(skip in str(file_path) for skip in ['node_modules', '__pycache__', '.git', 'venv']):
                    continue

                try:
                    content = file_path.read_text(encoding='utf-8')
                    chunks = self.index_file(str(file_path), content)
                    results[str(file_path)] = chunks
                    print(f" ✅ {file_path.name}: {chunks} chunks")
                except Exception as e:
                    # Best-effort: record the failure and keep indexing.
                    results[str(file_path)] = f"Error: {e}"

        return results

    def search(self, query: str, top_k: int = None) -> List[Dict[str, Any]]:
        """Semantic search over the indexed codebase."""
        if top_k is None:
            top_k = self.config.TOP_K_RESULTS

        query_embedding = self._get_embedding(query)

        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True
        )

        formatted = []
        for match in results.matches:
            formatted.append({
                'content': match.metadata.get('content', ''),
                'metadata': {
                    'file_path': match.metadata.get('file_path', ''),
                    'line_start': match.metadata.get('line_start', 0),
                    'line_end': match.metadata.get('line_end', 0),
                    'language': match.metadata.get('language', '')
                },
                'score': match.score
            })

        return formatted

    def get_stats(self) -> Dict[str, Any]:
        """Index statistics; degrades gracefully when the index is unreachable."""
        try:
            stats = self.index.describe_index_stats()
            return {
                'total_chunks': stats.total_vector_count,
                'index_name': self.config.PINECONE_INDEX_NAME,
                'dimension': stats.dimension
            }
        # BUG FIX: was a bare 'except:', which also swallows SystemExit and
        # KeyboardInterrupt; narrow to Exception.
        except Exception:
            return {'total_chunks': 0, 'index_name': self.config.PINECONE_INDEX_NAME}

    def clear_index(self):
        """Delete all vectors from the index (best-effort)."""
        try:
            self.index.delete(delete_all=True)
            print("⚠️ Index cleared!")
        except Exception:
            # Deliberate best-effort: e.g. index missing or not yet created.
            pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ArchitectLLM:
    """LLM #1: Architect — ticket analysis and implementation planning.

    Uses the cheap architect model (gpt-4o-mini); every call's token usage
    is recorded in the shared cost_tracker.
    """

    def __init__(self, config: "Config"):
        self.config = config
        self._client = None  # lazily created OpenAI client
        self.model = config.ARCHITECT_MODEL

    @property
    def client(self):
        """Lazily build the OpenAI client, failing fast without a key."""
        if self._client is None:
            if not self.config.OPENAI_API_KEY:
                raise ValueError("OpenAI API key not set!")
            self._client = OpenAI(api_key=self.config.OPENAI_API_KEY)
        return self._client

    def reset_client(self):
        """Drop the cached client so a newly set API key takes effect."""
        self._client = None

    @staticmethod
    def _parse_json_response(raw: str):
        """Strip optional markdown code fences and parse the JSON payload.

        Returns the parsed object, or the cleaned string when it is not
        valid JSON (callers wrap that string in a fallback dict).
        """
        # BUG FIX: the previous pattern '```json?' only made the final 'n'
        # optional, so a plain '```' fence was never stripped; '(?:json)?'
        # makes the whole language tag optional.
        cleaned = re.sub(r'^```(?:json)?\s*', '', raw.strip())
        cleaned = re.sub(r'\s*```$', '', cleaned)
        try:
            return json.loads(cleaned)
        # Narrowed from a bare 'except:' — only parse failures fall through.
        except json.JSONDecodeError:
            return cleaned

    def analyze_ticket(self, ticket: "JiraTicket") -> Dict[str, Any]:
        """Analyze a Jira ticket; returns a structured dict (see prompt)."""
        prompt = f"""Analyze this Jira ticket for implementation:

ID: {ticket.ticket_id}
Title: {ticket.title}
Description: {ticket.description}
Acceptance Criteria: {ticket.acceptance_criteria or 'Not specified'}

Provide JSON:
{{
"summary": "2-3 sentence summary",
"key_entities": ["entity1", "entity2"],
"technical_keywords": ["keyword1", "keyword2"],
"prerequisites": ["prereq1"],
"complexity": "Low/Medium/High",
"complexity_reason": "why",
"risks": ["risk1"]
}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3  # low temperature for consistent structured output
        )

        usage = response.usage
        cost_tracker.add_architect_call(usage.prompt_tokens, usage.completion_tokens)

        result = self._parse_json_response(response.choices[0].message.content)
        if isinstance(result, str):
            # Model did not return valid JSON; keep the raw text as summary.
            return {"summary": result, "key_entities": [], "technical_keywords": [],
                    "prerequisites": [], "complexity": "Unknown", "complexity_reason": "", "risks": []}
        return result

    def create_implementation_strategy(self, ticket_analysis: Dict, code_context: List[Dict]) -> Dict:
        """Turn ticket analysis + retrieved code into a strategy dict."""
        context_str = "\n".join([
            f"File: {c['metadata'].get('file_path', '?')}\n{c['content'][:500]}"
            for c in code_context[:5]
        ])

        prompt = f"""Create implementation strategy:

Analysis: {json.dumps(ticket_analysis)}

Code Context:
{context_str}

Provide JSON:
{{
"architecture_notes": "how it fits",
"implementation_steps": ["step1", "step2"],
"files_to_modify": [{{"path": "file", "action": "modify/create", "reason": "why"}}],
"patterns_to_follow": ["pattern1"],
"integration_points": ["point1"]
}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3  # low temperature for consistent structured output
        )

        usage = response.usage
        cost_tracker.add_architect_call(usage.prompt_tokens, usage.completion_tokens)

        result = self._parse_json_response(response.choices[0].message.content)
        if isinstance(result, str):
            # Model did not return valid JSON; keep the raw text as notes.
            return {"architecture_notes": result, "implementation_steps": [],
                    "files_to_modify": [], "patterns_to_follow": [], "integration_points": []}
        return result
|
|
|
|
|
|
|
|
class DeveloperLLM:
    """LLM #2: Developer — boilerplate generation and code explanation.

    Uses the developer model (gpt-4o-mini); every call's token usage is
    recorded in the shared cost_tracker.
    """

    def __init__(self, config: "Config"):
        self.config = config
        self._client = None  # lazily created OpenAI client
        self.model = config.DEVELOPER_MODEL

    @property
    def client(self):
        """Lazily build the OpenAI client, failing fast without a key."""
        if self._client is None:
            if not self.config.OPENAI_API_KEY:
                raise ValueError("OpenAI API key not set!")
            self._client = OpenAI(api_key=self.config.OPENAI_API_KEY)
        return self._client

    def reset_client(self):
        """Drop the cached client so a newly set API key takes effect."""
        self._client = None

    @staticmethod
    def _parse_json_response(raw: str):
        """Strip optional markdown code fences and parse the JSON payload.

        Returns the parsed object, or the cleaned string when it is not
        valid JSON (callers wrap that string in a fallback dict).
        """
        # BUG FIX: the previous pattern '```json?' only made the final 'n'
        # optional, so a plain '```' fence was never stripped; '(?:json)?'
        # makes the whole language tag optional.
        cleaned = re.sub(r'^```(?:json)?\s*', '', raw.strip())
        cleaned = re.sub(r'\s*```$', '', cleaned)
        try:
            return json.loads(cleaned)
        # Narrowed from a bare 'except:' — only parse failures fall through.
        except json.JSONDecodeError:
            return cleaned

    def generate_boilerplate(self, ticket_analysis: Dict, strategy: Dict, code_context: List[Dict]) -> Dict[str, str]:
        """Generate starter code following patterns from the retrieved context.

        Returns a mapping of file path -> code; falls back to a single
        'generated_code.txt' entry when the model's reply is not JSON.
        """
        context_str = "\n".join([f"// {c['metadata'].get('file_path', '?')}\n{c['content'][:400]}"
                                 for c in code_context[:3]])

        prompt = f"""Generate boilerplate code:

Summary: {ticket_analysis.get('summary', '')}
Entities: {ticket_analysis.get('key_entities', [])}
Steps: {strategy.get('implementation_steps', [])}

Existing patterns:
{context_str}

Respond with JSON where keys are file paths:
{{"path/file.py": "# code with TODOs"}}"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2  # lowest temperature here: code should be stable
        )

        usage = response.usage
        cost_tracker.add_developer_call(usage.prompt_tokens, usage.completion_tokens)

        result = self._parse_json_response(response.choices[0].message.content)
        if isinstance(result, str):
            return {"generated_code.txt": result}
        return result

    def explain_code_context(self, code_context: List[Dict], question: str) -> str:
        """Answer a question about retrieved code snippets in plain prose."""
        context_str = "\n".join([f"File: {c['metadata'].get('file_path', '?')}\n{c['content']}"
                                 for c in code_context[:5]])

        prompt = f"""Explain this code:

{context_str}

Question: {question}

Be concise and helpful."""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        usage = response.usage
        cost_tracker.add_developer_call(usage.prompt_tokens, usage.completion_tokens)

        return response.choices[0].message.content
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DevProductivityAgent:
    """Main orchestrator: wires the indexer and both LLMs together.

    Owns a CodebaseIndexer for retrieval plus the architect/developer LLM
    pair, all sharing a single Config instance and the global cost_tracker.
    """

    def __init__(self, config: Config = None):
        self.config = config if config is not None else Config()
        self.indexer = CodebaseIndexer(self.config)
        self.architect = ArchitectLLM(self.config)
        self.developer = DeveloperLLM(self.config)

    def set_api_keys(self, openai_key: str = None, pinecone_key: str = None):
        """Install new API keys and invalidate every cached client."""
        if openai_key:
            self.config.OPENAI_API_KEY = openai_key
            for llm in (self.architect, self.developer):
                llm.reset_client()
            self.indexer._openai_client = None
        if pinecone_key:
            self.config.PINECONE_API_KEY = pinecone_key
            self.indexer._index = None

    def index_codebase(self, directory: str, extensions: List[str] = None) -> Dict:
        """Index a directory tree and summarize the outcome."""
        print(f"📂 Indexing: {directory}")
        details = self.indexer.index_directory(directory, extensions)
        index_stats = self.indexer.get_stats()
        # Successful entries hold an int chunk count; failures hold strings.
        indexed_count = sum(1 for outcome in details.values() if isinstance(outcome, int))
        return {
            "files_indexed": indexed_count,
            "total_chunks": index_stats['total_chunks'],
            "details": details,
        }

    def process_ticket(self, ticket: JiraTicket) -> ImplementationPlan:
        """Run the full pipeline: analyze -> retrieve -> plan -> generate."""
        print("📋 Analyzing...")
        analysis = self.architect.analyze_ticket(ticket)

        print("🔍 Searching...")
        queries = analysis.get('technical_keywords', []) + analysis.get('key_entities', [])

        # Gather search hits across queries, keeping one hit per file path.
        context_hits = []
        seen_paths = set()
        for query in queries[:5]:
            for hit in self.indexer.search(query, top_k=5):
                path = hit['metadata'].get('file_path', '')
                if path in seen_paths:
                    continue
                seen_paths.add(path)
                context_hits.append(hit)

        print("📐 Planning...")
        strategy = self.architect.create_implementation_strategy(analysis, context_hits)

        print("💻 Generating...")
        code = self.developer.generate_boilerplate(analysis, strategy, context_hits)

        cost_tracker.record_ticket()

        file_refs = []
        for hit in context_hits[:10]:
            meta = hit['metadata']
            file_refs.append({
                'path': meta.get('file_path', ''),
                'relevance': f"Lines {meta.get('line_start', '?')}-{meta.get('line_end', '?')}",
                'preview': hit['content'][:200],
            })

        return ImplementationPlan(
            ticket_summary=analysis.get('summary', ''),
            key_entities=analysis.get('key_entities', []),
            relevant_files=file_refs,
            implementation_steps=strategy.get('implementation_steps', []),
            prerequisites=analysis.get('prerequisites', []),
            boilerplate_code=code,
            architecture_notes=strategy.get('architecture_notes', ''),
            estimated_complexity=analysis.get('complexity', 'Unknown'),
        )

    def ask_about_code(self, question: str) -> str:
        """Answer a natural-language question about the indexed code."""
        hits = self.indexer.search(question)
        if not hits:
            return "No relevant code found. Index your codebase first."
        reply = self.developer.explain_code_context(hits, question)
        cost_tracker.record_question()
        return reply

    def get_cost_stats(self) -> Dict:
        """Expose the shared cost tracker's statistics."""
        return cost_tracker.get_stats()

    def reset_cost_tracking(self):
        """Zero out the shared cost tracker."""
        cost_tracker.reset()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- FastAPI application ---------------------------------------------------
app = FastAPI(title="Developer Productivity Agent", version="2.0.0")
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers per the CORS spec — tighten origins before production.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True,
                   allow_methods=["*"], allow_headers=["*"])

# Single module-level agent instance shared by all endpoints.
agent = DevProductivityAgent()
|
|
|
|
|
@app.get("/")
async def root():
    """Health check plus a quick view of the vector store size."""
    index_stats = agent.indexer.get_stats()
    return {
        "status": "healthy",
        "vector_db": "Pinecone",
        "chunks": index_stats['total_chunks'],
    }
|
|
|
|
|
@app.get("/stats")
async def get_stats():
    """Return raw Pinecone index statistics."""
    index_stats = agent.indexer.get_stats()
    return index_stats
|
|
|
|
|
@app.get("/cost-analytics")
async def get_cost_analytics():
    """Return cumulative cost analytics and savings for this session."""
    return agent.get_cost_stats()
|
|
|
|
|
@app.post("/reset-costs")
async def reset_costs():
    """Zero out all cost counters and the event history."""
    agent.reset_cost_tracking()
    return {"status": "reset"}
|
|
|
|
|
@app.post("/index")
async def index_codebase(directory: str, extensions: List[str] = None):
    """Index a directory of source files into Pinecone."""
    try:
        summary = agent.index_codebase(directory, extensions)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "success", "results": summary}
|
|
|
|
|
@app.post("/process-ticket", response_model=ImplementationPlan)
async def process_ticket(ticket: JiraTicket):
    """Turn a Jira ticket into a full implementation plan."""
    try:
        plan = agent.process_ticket(ticket)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return plan
|
|
|
|
|
@app.post("/ask")
async def ask(question: str):
    """Answer a natural-language question about the indexed codebase."""
    try:
        answer = agent.ask_about_code(question)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"answer": answer}
|
|
|
|
|
@app.post("/search")
async def search(query: str, top_k: int = 10):
    """Raw semantic search over the indexed code chunks."""
    try:
        matches = agent.indexer.search(query, top_k)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"results": matches}
|
|
|
|
|
@app.delete("/clear")
async def clear():
    """Delete every vector from the Pinecone index (best-effort)."""
    agent.indexer.clear_index()
    return {"status": "cleared"}
|
|
|
|
|
if __name__ == "__main__":
    import argparse

    # CLI: --index DIR indexes a codebase; --serve starts the API server.
    # The two flags may be combined: indexing runs first, then the server.
    cli = argparse.ArgumentParser()
    cli.add_argument("--index", type=str)
    cli.add_argument("--serve", action="store_true")
    cli.add_argument("--port", type=int, default=8000)
    opts = cli.parse_args()

    if opts.index:
        agent.index_codebase(opts.index)
    if opts.serve:
        uvicorn.run(app, host="0.0.0.0", port=opts.port)