THISverse / src /main.py
DD009's picture
Update src/main.py
ff9e1b0 verified
"""
Developer Productivity Agent
RAG-based system using Pinecone for vector storage and GPT-4o-mini.
Features:
- Pinecone vector database (2GB free tier)
- Divided LLM Architecture for cost optimization
- Real-time cost tracking and analytics
- OpenAI embeddings (text-embedding-3-small)
"""
import os
import json
import time
from pathlib import Path
from typing import List, Dict, Any, Optional
import hashlib
from datetime import datetime
# Core dependencies
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
# Vector database - Pinecone
from pinecone import Pinecone, ServerlessSpec
# LLM client
from openai import OpenAI
# Code parsing
import ast
import re
from dataclasses import dataclass, field
# ============================================================================
# Configuration
# ============================================================================
class Config:
    """Static settings for the agent: credentials, models, chunking and pricing."""

    # --- Credentials (read from the environment once, when the class is defined) ---
    OPENAI_API_KEY: str = os.environ.get("OPENAI_API_KEY", "")
    PINECONE_API_KEY: str = os.environ.get("PINECONE_API_KEY", "")

    # --- Pinecone index location ---
    PINECONE_INDEX_NAME: str = os.environ.get("PINECONE_INDEX_NAME", "codebase-index")
    PINECONE_CLOUD: str = "aws"
    PINECONE_REGION: str = "us-east-1"

    # --- Model selection ---
    ARCHITECT_MODEL: str = "gpt-4o-mini"
    DEVELOPER_MODEL: str = "gpt-4o-mini"
    EMBEDDING_MODEL: str = "text-embedding-3-small"
    EMBEDDING_DIM: int = 1536  # vector dimension used when the index is created

    # --- Chunking / retrieval ---
    CHUNK_SIZE: int = 1500
    CHUNK_OVERLAP: int = 200
    TOP_K_RESULTS: int = 10

    # --- Pricing, in USD per 1M tokens ---
    COST_GPT4O_MINI_INPUT: float = 0.15
    COST_GPT4O_MINI_OUTPUT: float = 0.60
    COST_EMBEDDING: float = 0.02
    # GPT-4 prices are only used as the "traditional approach" baseline.
    COST_GPT4_INPUT: float = 30.0
    COST_GPT4_OUTPUT: float = 60.0
# ============================================================================
# Cost Tracker
# ============================================================================
class CostTracker:
    """Tracks API token usage and derives cost / savings figures.

    Token counters are accumulated per role (embedding, architect, developer).
    Costs are computed on demand from the per-1M-token prices on ``Config``;
    the "traditional" cost prices the same chat tokens at GPT-4 rates.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Reset all counters, the event history and the session start time."""
        self.embedding_tokens = 0
        self.architect_input_tokens = 0
        self.architect_output_tokens = 0
        self.developer_input_tokens = 0
        self.developer_output_tokens = 0
        self.api_calls = 0
        self.tickets_processed = 0
        self.questions_answered = 0
        self.start_time = datetime.now()
        self.history = []

    def add_embedding(self, tokens: int):
        """Track embedding tokens (counts as one API call)."""
        self.embedding_tokens += tokens
        self.api_calls += 1

    def add_architect_call(self, input_tokens: int, output_tokens: int):
        """Track one architect LLM call."""
        self.architect_input_tokens += input_tokens
        self.architect_output_tokens += output_tokens
        self.api_calls += 1

    def add_developer_call(self, input_tokens: int, output_tokens: int):
        """Track one developer LLM call."""
        self.developer_input_tokens += input_tokens
        self.developer_output_tokens += output_tokens
        self.api_calls += 1

    def record_ticket(self):
        """Record a processed ticket and snapshot the running totals."""
        self.tickets_processed += 1
        self._add_to_history("ticket")

    def record_question(self):
        """Record an answered question and snapshot the running totals."""
        self.questions_answered += 1
        self._add_to_history("question")

    def _add_to_history(self, event_type: str):
        """Append an event with the cumulative cost and savings at this moment."""
        self.history.append({
            "timestamp": datetime.now().isoformat(),
            "type": event_type,
            "cumulative_cost": self.get_actual_cost(),
            "cumulative_savings": self.get_savings()
        })

    def get_actual_cost(self) -> float:
        """Return the cost in USD of all tracked tokens at the configured prices."""
        embedding_cost = (self.embedding_tokens / 1_000_000) * Config.COST_EMBEDDING
        architect_cost = (
            (self.architect_input_tokens / 1_000_000) * Config.COST_GPT4O_MINI_INPUT +
            (self.architect_output_tokens / 1_000_000) * Config.COST_GPT4O_MINI_OUTPUT
        )
        developer_cost = (
            (self.developer_input_tokens / 1_000_000) * Config.COST_GPT4O_MINI_INPUT +
            (self.developer_output_tokens / 1_000_000) * Config.COST_GPT4O_MINI_OUTPUT
        )
        return embedding_cost + architect_cost + developer_cost

    def get_traditional_cost(self) -> float:
        """Return what the same chat tokens would cost at GPT-4 prices.

        Embedding tokens are not part of this baseline.
        """
        total_input = self.architect_input_tokens + self.developer_input_tokens
        total_output = self.architect_output_tokens + self.developer_output_tokens
        return (
            (total_input / 1_000_000) * Config.COST_GPT4_INPUT +
            (total_output / 1_000_000) * Config.COST_GPT4_OUTPUT
        )

    def get_savings(self) -> float:
        """Return traditional cost minus actual cost (may be negative)."""
        return self.get_traditional_cost() - self.get_actual_cost()

    def get_savings_percentage(self) -> float:
        """Return savings as a percentage of the traditional cost (0.0 if none)."""
        traditional = self.get_traditional_cost()
        if traditional == 0:
            return 0.0
        return ((traditional - self.get_actual_cost()) / traditional) * 100

    def _session_duration_minutes(self) -> float:
        """Return minutes elapsed since the last reset.

        Uses ``total_seconds()``: ``timedelta.seconds`` only holds the
        sub-day remainder and would wrap to zero every 24 hours.
        """
        return (datetime.now() - self.start_time).total_seconds() / 60

    def get_stats(self) -> Dict[str, Any]:
        """Return a JSON-serialisable snapshot of costs, tokens and activity."""
        actual_cost = self.get_actual_cost()
        return {
            "actual_cost": round(actual_cost, 6),
            "traditional_cost": round(self.get_traditional_cost(), 6),
            "savings": round(self.get_savings(), 6),
            "savings_percentage": round(self.get_savings_percentage(), 2),
            "total_tokens": {
                "embedding": self.embedding_tokens,
                "architect_input": self.architect_input_tokens,
                "architect_output": self.architect_output_tokens,
                "developer_input": self.developer_input_tokens,
                "developer_output": self.developer_output_tokens,
                "total": (self.embedding_tokens + self.architect_input_tokens +
                          self.architect_output_tokens + self.developer_input_tokens +
                          self.developer_output_tokens)
            },
            "api_calls": self.api_calls,
            "tickets_processed": self.tickets_processed,
            "questions_answered": self.questions_answered,
            "session_duration_minutes": round(self._session_duration_minutes(), 2),
            # max(..., 1) avoids a division by zero before the first ticket.
            "cost_per_ticket": round(actual_cost / max(self.tickets_processed, 1), 6),
            "history": self.history[-50:]  # Last 50 events
        }


# Global cost tracker shared by the indexer and both LLM wrappers
cost_tracker = CostTracker()
# ============================================================================
# Data Models
# ============================================================================
class JiraTicket(BaseModel):
    """Request payload describing one Jira ticket to plan an implementation for."""

    # Required fields
    ticket_id: str
    title: str
    description: str

    # Optional fields; both default to None when omitted
    acceptance_criteria: Optional[str] = None
    labels: Optional[List[str]] = None
class ImplementationPlan(BaseModel):
    """Response payload produced for a processed ticket."""

    # Ticket analysis
    ticket_summary: str
    key_entities: List[str]

    # Retrieved code: one dict per file (path / relevance / preview strings)
    relevant_files: List[Dict[str, str]]

    # Strategy
    implementation_steps: List[str]
    prerequisites: List[str]

    # Generated code, keyed by file path
    boilerplate_code: Dict[str, str]

    architecture_notes: str
    estimated_complexity: str
# ============================================================================
# Pinecone-based Codebase Indexer
# ============================================================================
class CodebaseIndexer:
    """Indexes a codebase into a Pinecone vector database and searches it.

    Files are split into overlapping line windows, embedded through the
    OpenAI embeddings endpoint and upserted into the configured Pinecone
    index. Both API clients are created lazily on first use, so constructing
    an indexer performs no network I/O.
    """

    # Directory names never indexed; compared against whole path components
    # so that e.g. ".github" is not mistaken for ".git".
    _SKIP_DIRS = frozenset({'node_modules', '__pycache__', '.git', 'venv', '.venv'})
    # Vectors sent per upsert request, to keep each request small.
    # NOTE(review): 100 follows Pinecone's batching guidance - confirm against
    # the request-size limit for this embedding dimension.
    _UPSERT_BATCH_SIZE = 100
    # Upper bound on how long to wait for a freshly created index.
    _INDEX_READY_TIMEOUT_SECONDS = 120

    def __init__(self, config: "Config"):
        self.config = config
        self._openai_client = None
        self._pinecone_client = None
        self._index = None

    @property
    def openai_client(self):
        """Lazily created OpenAI client.

        Raises:
            ValueError: if no OpenAI API key is configured.
        """
        if self._openai_client is None:
            if not self.config.OPENAI_API_KEY:
                raise ValueError("OpenAI API key required")
            self._openai_client = OpenAI(api_key=self.config.OPENAI_API_KEY)
        return self._openai_client

    @property
    def index(self):
        """Lazily opened Pinecone index, created first if it does not exist.

        Raises:
            ValueError: if no Pinecone API key is configured or initialisation
                fails (the underlying error is chained as the cause).
        """
        if self._index is not None:
            return self._index
        if not self.config.PINECONE_API_KEY:
            raise ValueError("Pinecone API key required")
        index_name = self.config.PINECONE_INDEX_NAME
        try:
            # Initialize Pinecone (v5+ syntax)
            pc = Pinecone(api_key=self.config.PINECONE_API_KEY)
            # Create index if not exists
            existing_indexes = pc.list_indexes()
            index_names = (
                [idx.name for idx in existing_indexes]
                if hasattr(existing_indexes, '__iter__') else []
            )
            if index_name not in index_names:
                pc.create_index(
                    name=index_name,
                    dimension=self.config.EMBEDDING_DIM,
                    metric="cosine",
                    spec=ServerlessSpec(
                        cloud=self.config.PINECONE_CLOUD,
                        region=self.config.PINECONE_REGION
                    )
                )
                print("⏳ Waiting for index to be ready...")
                self._wait_until_ready(pc, index_name)
            self._index = pc.Index(index_name)
            self._pinecone_client = pc
            print(f"📂 Pinecone index ready: {index_name}")
        except Exception as e:
            print(f"❌ Pinecone initialization error: {str(e)}")
            raise ValueError(f"Failed to initialize Pinecone: {str(e)}") from e
        return self._index

    def _wait_until_ready(self, pc, index_name: str) -> None:
        """Poll the index status until it reports ready, or time out.

        Polling replaces a fixed sleep, which is either too short (first
        upsert fails) or needlessly long.
        """
        deadline = time.monotonic() + self._INDEX_READY_TIMEOUT_SECONDS
        while not pc.describe_index(index_name).status['ready']:
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Index '{index_name}' not ready after "
                    f"{self._INDEX_READY_TIMEOUT_SECONDS}s"
                )
            time.sleep(1)

    @staticmethod
    def _track_embedding_usage(response, texts: List[str]) -> None:
        """Record embedding tokens, preferring the API-reported usage.

        Falls back to the rough "1 token ~ 4 chars" estimate when the
        response carries no usage information.
        """
        usage = getattr(response, 'usage', None)
        tokens = getattr(usage, 'total_tokens', None)
        if tokens is None:
            tokens = sum(len(t) // 4 for t in texts)
        cost_tracker.add_embedding(tokens)

    def _get_embedding(self, text: str) -> List[float]:
        """Embed a single text and track its cost."""
        response = self.openai_client.embeddings.create(
            model=self.config.EMBEDDING_MODEL,
            input=text
        )
        # Tracked after the call so failed requests are not billed locally.
        self._track_embedding_usage(response, [text])
        return response.data[0].embedding

    def _get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        """Embed several texts in one request and track the cost."""
        if not texts:
            return []
        response = self.openai_client.embeddings.create(
            model=self.config.EMBEDDING_MODEL,
            input=texts
        )
        self._track_embedding_usage(response, texts)
        return [item.embedding for item in response.data]

    def _detect_language(self, file_path: str) -> str:
        """Map a file extension to a language name ('unknown' if unmapped)."""
        ext_map = {
            '.py': 'python', '.js': 'javascript', '.jsx': 'javascript',
            '.ts': 'typescript', '.tsx': 'typescript', '.java': 'java',
            '.go': 'go', '.rs': 'rust', '.cpp': 'cpp', '.c': 'c',
        }
        return ext_map.get(Path(file_path).suffix.lower(), 'unknown')

    def _chunk_content(self, content: str, file_path: str) -> List[Dict[str, Any]]:
        """Split content into overlapping windows of lines.

        Window and overlap sizes are derived from the character-based config
        values assuming roughly 50 characters per line. Whitespace-only
        windows are skipped. Line numbers in the result are 1-based and
        inclusive.
        """
        # Clamp so the window always advances: a zero-line window or an
        # overlap >= window would otherwise loop forever.
        chunk_lines = max(1, self.config.CHUNK_SIZE // 50)
        overlap_lines = min(max(0, self.config.CHUNK_OVERLAP // 50), chunk_lines - 1)
        language = self._detect_language(file_path)  # invariant for the file

        lines = content.split('\n')
        total = len(lines)
        chunks = []
        i = 0
        chunk_idx = 0
        while i < total:
            end = min(i + chunk_lines, total)
            chunk_content = '\n'.join(lines[i:end])
            if chunk_content.strip():  # Skip empty chunks
                chunks.append({
                    'content': chunk_content,
                    'file_path': file_path,
                    'chunk_index': chunk_idx,
                    'line_start': i + 1,
                    'line_end': end,
                    'language': language
                })
            # Step back by the overlap, except after the final window.
            i = end - overlap_lines if end < total else end
            chunk_idx += 1
        return chunks

    def index_file(self, file_path: str, content: str) -> int:
        """Chunk, embed and upsert one file; return the number of chunks.

        Vector ids are the MD5 of "<file_path>_<chunk_index>", so re-indexing
        a file overwrites its earlier chunks with the same index.
        """
        chunks = self._chunk_content(content, file_path)
        if not chunks:
            return 0
        # Get embeddings
        embeddings = self._get_embeddings_batch([c['content'] for c in chunks])
        # Prepare vectors for Pinecone
        vectors = []
        for chunk, embedding in zip(chunks, embeddings):
            vector_id = hashlib.md5(
                f"{file_path}_{chunk['chunk_index']}".encode()
            ).hexdigest()
            vectors.append({
                "id": vector_id,
                "values": embedding,
                "metadata": {
                    "file_path": file_path,
                    "chunk_index": chunk['chunk_index'],
                    "language": chunk['language'],
                    "line_start": chunk['line_start'],
                    "line_end": chunk['line_end'],
                    "content": chunk['content'][:1000]  # Pinecone metadata limit
                }
            })
        # Upsert in batches so large files do not exceed request limits.
        for start in range(0, len(vectors), self._UPSERT_BATCH_SIZE):
            self.index.upsert(vectors=vectors[start:start + self._UPSERT_BATCH_SIZE])
        return len(chunks)

    @classmethod
    def _should_skip(cls, relative_path: Path) -> bool:
        """Return True if any path component is an excluded directory name."""
        return any(part in cls._SKIP_DIRS for part in relative_path.parts)

    def index_directory(self, directory_path: str,
                        extensions: Optional[List[str]] = None) -> Dict[str, Any]:
        """Index every matching file under a directory (recursively).

        Args:
            directory_path: Root directory to walk.
            extensions: File suffixes to include; defaults to common
                Python/JS/TS/Java/Go suffixes.

        Returns:
            Mapping of file path to its chunk count, or to an "Error: ..."
            string when that file could not be read or indexed.
        """
        if extensions is None:
            extensions = ['.py', '.js', '.jsx', '.ts', '.tsx', '.java', '.go']
        results = {}
        directory = Path(directory_path)
        for ext in extensions:
            for file_path in directory.rglob(f"*{ext}"):
                # Only components below the root are checked, so the location
                # of the project itself never excludes everything.
                if self._should_skip(file_path.relative_to(directory)):
                    continue
                try:
                    content = file_path.read_text(encoding='utf-8')
                    chunks = self.index_file(str(file_path), content)
                    results[str(file_path)] = chunks
                    print(f" ✅ {file_path.name}: {chunks} chunks")
                except Exception as e:
                    # Best effort: one bad file must not abort the whole run.
                    results[str(file_path)] = f"Error: {e}"
        return results

    @staticmethod
    def _as_int(value: Any) -> int:
        """Coerce a metadata number to int, returning 0 if not numeric.

        NOTE(review): Pinecone is documented to store numeric metadata as
        floats, so line numbers presumably come back as e.g. 30.0 - confirm.
        """
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    def search(self, query: str, top_k: Optional[int] = None) -> List[Dict[str, Any]]:
        """Return the chunks most similar to the query.

        Each result has 'content', 'metadata' (file_path, line_start,
        line_end, language) and 'score'.
        """
        if top_k is None:
            top_k = self.config.TOP_K_RESULTS
        query_embedding = self._get_embedding(query)
        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True
        )
        formatted = []
        for match in results.matches:
            metadata = match.metadata or {}  # tolerate matches without metadata
            formatted.append({
                'content': metadata.get('content', ''),
                'metadata': {
                    'file_path': metadata.get('file_path', ''),
                    'line_start': self._as_int(metadata.get('line_start', 0)),
                    'line_end': self._as_int(metadata.get('line_end', 0)),
                    'language': metadata.get('language', '')
                },
                'score': match.score
            })
        return formatted

    def get_stats(self) -> Dict[str, Any]:
        """Return index statistics; falls back to zero chunks on any error."""
        try:
            stats = self.index.describe_index_stats()
            return {
                'total_chunks': stats.total_vector_count,
                'index_name': self.config.PINECONE_INDEX_NAME,
                'dimension': stats.dimension
            }
        except Exception:
            # Deliberate best effort: used by the health endpoint.
            return {'total_chunks': 0, 'index_name': self.config.PINECONE_INDEX_NAME}

    def clear_index(self):
        """Delete all vectors; failures are reported but not raised."""
        try:
            self.index.delete(delete_all=True)
            print("⚠️ Index cleared!")
        except Exception as e:
            # Still best effort, but no longer silent.
            print(f"⚠️ Failed to clear index: {e}")
# ============================================================================
# LLM Specialists with Cost Tracking
# ============================================================================
class ArchitectLLM:
    """LLM #1: Architect - ticket analysis and implementation planning.

    Each public method sends one chat completion, records its token usage
    on the global cost tracker and returns the model's JSON reply as a dict.
    When the reply is not a JSON object, a dict with the expected keys is
    returned with the raw text in the main field.
    """

    # Optional markdown code fence around the JSON. "(?:json)?" makes the
    # language tag optional, so a bare ``` fence is stripped as well.
    _FENCE_OPEN = re.compile(r'^```(?:json)?\s*', re.IGNORECASE)
    _FENCE_CLOSE = re.compile(r'\s*```$')

    def __init__(self, config: "Config"):
        self.config = config
        self._client = None
        self.model = config.ARCHITECT_MODEL

    @property
    def client(self):
        """Lazily created OpenAI client.

        Raises:
            ValueError: if no OpenAI API key is configured.
        """
        if self._client is None:
            if not self.config.OPENAI_API_KEY:
                raise ValueError("OpenAI API key not set!")
            self._client = OpenAI(api_key=self.config.OPENAI_API_KEY)
        return self._client

    def reset_client(self):
        """Drop the cached client so the next call picks up a new API key."""
        self._client = None

    @classmethod
    def _strip_code_fence(cls, content: Optional[str]) -> str:
        """Return content trimmed and without a surrounding ``` fence."""
        text = (content or "").strip()
        text = cls._FENCE_OPEN.sub('', text)
        return cls._FENCE_CLOSE.sub('', text)

    @classmethod
    def _parse_json_object(cls, content: Optional[str]) -> Optional[Dict[str, Any]]:
        """Parse a model reply as a JSON object; None if it is not one."""
        try:
            parsed = json.loads(cls._strip_code_fence(content))
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            return None
        # Callers use .get(), so lists/strings/numbers count as failures.
        return parsed if isinstance(parsed, dict) else None

    def _complete(self, prompt: str) -> str:
        """Send one user prompt, track token usage, return the reply text."""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )
        # Track costs
        usage = response.usage
        if usage is not None:
            cost_tracker.add_architect_call(usage.prompt_tokens, usage.completion_tokens)
        return response.choices[0].message.content or ""

    def analyze_ticket(self, ticket: "JiraTicket") -> Dict[str, Any]:
        """Summarise a ticket into entities, keywords, prerequisites and risk.

        Returns:
            The parsed JSON analysis, or a dict with the same keys whose
            'summary' holds the raw reply when parsing fails.
        """
        prompt = f"""Analyze this Jira ticket for implementation:
ID: {ticket.ticket_id}
Title: {ticket.title}
Description: {ticket.description}
Acceptance Criteria: {ticket.acceptance_criteria or 'Not specified'}
Provide JSON:
{{
"summary": "2-3 sentence summary",
"key_entities": ["entity1", "entity2"],
"technical_keywords": ["keyword1", "keyword2"],
"prerequisites": ["prereq1"],
"complexity": "Low/Medium/High",
"complexity_reason": "why",
"risks": ["risk1"]
}}"""
        content = self._complete(prompt)
        parsed = self._parse_json_object(content)
        if parsed is not None:
            return parsed
        return {"summary": self._strip_code_fence(content), "key_entities": [],
                "technical_keywords": [], "prerequisites": [],
                "complexity": "Unknown", "complexity_reason": "", "risks": []}

    def create_implementation_strategy(self, ticket_analysis: Dict,
                                       code_context: List[Dict]) -> Dict:
        """Turn a ticket analysis plus retrieved code into an implementation strategy.

        Only the first five context entries, truncated to 500 characters
        each, are included in the prompt.

        Returns:
            The parsed JSON strategy, or a dict with the same keys whose
            'architecture_notes' holds the raw reply when parsing fails.
        """
        context_str = "\n".join([
            f"File: {c['metadata'].get('file_path', '?')}\n{c['content'][:500]}"
            for c in code_context[:5]
        ])
        prompt = f"""Create implementation strategy:
Analysis: {json.dumps(ticket_analysis)}
Code Context:
{context_str}
Provide JSON:
{{
"architecture_notes": "how it fits",
"implementation_steps": ["step1", "step2"],
"files_to_modify": [{{"path": "file", "action": "modify/create", "reason": "why"}}],
"patterns_to_follow": ["pattern1"],
"integration_points": ["point1"]
}}"""
        content = self._complete(prompt)
        parsed = self._parse_json_object(content)
        if parsed is not None:
            return parsed
        return {"architecture_notes": self._strip_code_fence(content),
                "implementation_steps": [], "files_to_modify": [],
                "patterns_to_follow": [], "integration_points": []}
class DeveloperLLM:
    """LLM #2: Developer - boilerplate generation and code explanation.

    Each public method sends one chat completion and records its token
    usage on the global cost tracker.
    """

    # Optional markdown code fence around the JSON. "(?:json)?" makes the
    # language tag optional, so a bare ``` fence is stripped as well.
    _FENCE_OPEN = re.compile(r'^```(?:json)?\s*', re.IGNORECASE)
    _FENCE_CLOSE = re.compile(r'\s*```$')

    def __init__(self, config: "Config"):
        self.config = config
        self._client = None
        self.model = config.DEVELOPER_MODEL

    @property
    def client(self):
        """Lazily created OpenAI client.

        Raises:
            ValueError: if no OpenAI API key is configured.
        """
        if self._client is None:
            if not self.config.OPENAI_API_KEY:
                raise ValueError("OpenAI API key not set!")
            self._client = OpenAI(api_key=self.config.OPENAI_API_KEY)
        return self._client

    def reset_client(self):
        """Drop the cached client so the next call picks up a new API key."""
        self._client = None

    def _complete(self, prompt: str, temperature: float) -> str:
        """Send one user prompt, track token usage, return the reply text."""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature
        )
        usage = response.usage
        if usage is not None:
            cost_tracker.add_developer_call(usage.prompt_tokens, usage.completion_tokens)
        return response.choices[0].message.content or ""

    @classmethod
    def _parse_boilerplate(cls, content: Optional[str]) -> Dict[str, str]:
        """Convert a model reply into a {file path: code} mapping.

        A surrounding ``` fence is removed first. If the remainder is not a
        JSON object, the text is returned under "generated_code.txt".
        Non-string values are JSON-encoded so every value is a string, as
        the response model requires.
        """
        text = (content or "").strip()
        text = cls._FENCE_OPEN.sub('', text)
        text = cls._FENCE_CLOSE.sub('', text)
        try:
            parsed = json.loads(text)
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            parsed = None
        if not isinstance(parsed, dict):
            return {"generated_code.txt": text}
        return {
            str(path): code if isinstance(code, str) else json.dumps(code)
            for path, code in parsed.items()
        }

    def generate_boilerplate(self, ticket_analysis: Dict, strategy: Dict,
                             code_context: List[Dict]) -> Dict[str, str]:
        """Generate starter code for a ticket, keyed by file path.

        Only the first three context entries, truncated to 400 characters
        each, are included in the prompt.
        """
        context_str = "\n".join([
            f"// {c['metadata'].get('file_path', '?')}\n{c['content'][:400]}"
            for c in code_context[:3]
        ])
        prompt = f"""Generate boilerplate code:
Summary: {ticket_analysis.get('summary', '')}
Entities: {ticket_analysis.get('key_entities', [])}
Steps: {strategy.get('implementation_steps', [])}
Existing patterns:
{context_str}
Respond with JSON where keys are file paths:
{{"path/file.py": "# code with TODOs"}}"""
        return self._parse_boilerplate(self._complete(prompt, temperature=0.2))

    def explain_code_context(self, code_context: List[Dict], question: str) -> str:
        """Answer a question about the first five retrieved code chunks."""
        context_str = "\n".join([
            f"File: {c['metadata'].get('file_path', '?')}\n{c['content']}"
            for c in code_context[:5]
        ])
        prompt = f"""Explain this code:
{context_str}
Question: {question}
Be concise and helpful."""
        return self._complete(prompt, temperature=0.3)
# ============================================================================
# Main Agent
# ============================================================================
class DevProductivityAgent:
    """Main orchestrator: wires the indexer and both LLMs together.

    Cost figures are read from the module-level cost tracker.
    """

    def __init__(self, config: Optional["Config"] = None):
        self.config = config or Config()
        self.indexer = CodebaseIndexer(self.config)
        self.architect = ArchitectLLM(self.config)
        self.developer = DeveloperLLM(self.config)

    def set_api_keys(self, openai_key: Optional[str] = None,
                     pinecone_key: Optional[str] = None):
        """Set API keys and drop cached clients so the new keys take effect."""
        if openai_key:
            self.config.OPENAI_API_KEY = openai_key
            self.architect.reset_client()
            self.developer.reset_client()
            self.indexer._openai_client = None
        if pinecone_key:
            self.config.PINECONE_API_KEY = pinecone_key
            self.indexer._index = None

    def index_codebase(self, directory: str,
                       extensions: Optional[List[str]] = None) -> Dict:
        """Index a directory and return a summary.

        Returns:
            Dict with 'files_indexed' (files that produced a chunk count),
            'total_chunks' (as reported by the index) and 'details'
            (per-file result).
        """
        print(f"📂 Indexing: {directory}")
        results = self.indexer.index_directory(directory, extensions)
        stats = self.indexer.get_stats()
        return {
            # Failed files are stored as error strings, not ints.
            "files_indexed": sum(1 for r in results.values() if isinstance(r, int)),
            "total_chunks": stats['total_chunks'],
            "details": results
        }

    @staticmethod
    def _as_str_list(value: Any) -> List[str]:
        """Coerce LLM output into a list of strings.

        None becomes [], a non-blank string becomes a one-item list, and
        list/tuple items are stringified (None items dropped).
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [value] if value.strip() else []
        if isinstance(value, (list, tuple)):
            return [item if isinstance(item, str) else str(item)
                    for item in value if item is not None]
        return [str(value)]

    @staticmethod
    def _as_str_dict(value: Any) -> Dict[str, str]:
        """Coerce generated code into a {str: str} mapping.

        Non-string values are JSON-encoded; a non-dict value is wrapped
        under "generated_code.txt".
        """
        if not isinstance(value, dict):
            text = value if isinstance(value, str) else json.dumps(value)
            return {"generated_code.txt": text}
        return {str(k): v if isinstance(v, str) else json.dumps(v)
                for k, v in value.items()}

    def process_ticket(self, ticket: "JiraTicket") -> "ImplementationPlan":
        """Analyse a ticket, retrieve related code and build a plan.

        Steps: analyse the ticket, search the index with up to five derived
        queries (keeping one hit per file), create a strategy, then generate
        boilerplate. LLM-provided fields are coerced to the types the
        response model requires, since model output is not guaranteed to
        follow the requested schema.
        """
        print("📋 Analyzing...")
        analysis = self.architect.analyze_ticket(ticket)
        if not isinstance(analysis, dict):
            analysis = {}

        print("🔍 Searching...")
        key_entities = self._as_str_list(analysis.get('key_entities'))
        candidates = self._as_str_list(analysis.get('technical_keywords')) + key_entities
        # Drop blank and duplicate queries (order kept): blanks are rejected
        # by the embedding API and duplicates only cost tokens.
        queries = list(dict.fromkeys(q.strip() for q in candidates if q.strip()))
        all_results = []
        seen = set()
        for q in queries[:5]:
            for r in self.indexer.search(q, top_k=5):
                fp = r['metadata'].get('file_path', '')
                if fp not in seen:
                    all_results.append(r)
                    seen.add(fp)

        print("📐 Planning...")
        strategy = self.architect.create_implementation_strategy(analysis, all_results)
        if not isinstance(strategy, dict):
            strategy = {}

        print("💻 Generating...")
        code = self.developer.generate_boilerplate(analysis, strategy, all_results)
        cost_tracker.record_ticket()

        return ImplementationPlan(
            ticket_summary=str(analysis.get('summary') or ''),
            key_entities=key_entities,
            relevant_files=[{
                'path': r['metadata'].get('file_path', ''),
                'relevance': f"Lines {r['metadata'].get('line_start', '?')}-{r['metadata'].get('line_end', '?')}",
                'preview': r['content'][:200]
            } for r in all_results[:10]],
            implementation_steps=self._as_str_list(strategy.get('implementation_steps')),
            prerequisites=self._as_str_list(analysis.get('prerequisites')),
            boilerplate_code=self._as_str_dict(code),
            architecture_notes=str(strategy.get('architecture_notes') or ''),
            estimated_complexity=str(analysis.get('complexity') or 'Unknown')
        )

    def ask_about_code(self, question: str) -> str:
        """Answer a question using the most relevant indexed chunks."""
        results = self.indexer.search(question)
        if not results:
            return "No relevant code found. Index your codebase first."
        answer = self.developer.explain_code_context(results, question)
        cost_tracker.record_question()
        return answer

    def get_cost_stats(self) -> Dict:
        """Return the cost tracker's statistics snapshot."""
        return cost_tracker.get_stats()

    def reset_cost_tracking(self):
        """Reset the cost tracker."""
        cost_tracker.reset()
# ============================================================================
# FastAPI
# ============================================================================
# Application object, permissive CORS policy and the shared agent instance.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# combination the FastAPI CORS docs advise against - confirm whether
# credentialed cross-origin requests are actually needed.
app = FastAPI(title="Developer Productivity Agent", version="2.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
agent = DevProductivityAgent()
@app.get("/")
async def root():
    """Health probe: service status plus the number of indexed chunks."""
    index_stats = agent.indexer.get_stats()
    payload = {"status": "healthy", "vector_db": "Pinecone"}
    payload["chunks"] = index_stats['total_chunks']
    return payload
@app.get("/stats")
async def get_stats():
    """Return the indexer's statistics for the vector index."""
    index_stats = agent.indexer.get_stats()
    return index_stats
@app.get("/cost-analytics")
async def get_cost_analytics():
    """Return the agent's cost statistics (costs, savings, token counts)."""
    analytics = agent.get_cost_stats()
    return analytics
@app.post("/reset-costs")
async def reset_costs():
    """Reset the agent's cost tracking and acknowledge."""
    agent.reset_cost_tracking()
    acknowledgement = {"status": "reset"}
    return acknowledgement
@app.post("/index")
async def index_codebase(directory: str, extensions: List[str] = None):
    """Index a directory on the server's filesystem.

    Any failure is reported as HTTP 500 with the error text as detail; the
    original exception is chained as the cause so it stays visible in logs.

    NOTE(review): `directory` is a caller-supplied server-side path -
    confirm this endpoint is not exposed to untrusted clients.
    """
    try:
        return {"status": "success", "results": agent.index_codebase(directory, extensions)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/process-ticket", response_model=ImplementationPlan)
async def process_ticket(ticket: JiraTicket):
    """Build an implementation plan for the posted ticket.

    Any failure is reported as HTTP 500 with the error text as detail; the
    original exception is chained as the cause so it stays visible in logs.
    """
    try:
        return agent.process_ticket(ticket)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/ask")
async def ask(question: str):
    """Answer a question about the indexed codebase.

    Any failure is reported as HTTP 500 with the error text as detail; the
    original exception is chained as the cause so it stays visible in logs.
    """
    try:
        return {"answer": agent.ask_about_code(question)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/search")
async def search(query: str, top_k: int = 10):
    """Return the top_k indexed chunks most similar to the query.

    Any failure is reported as HTTP 500 with the error text as detail; the
    original exception is chained as the cause so it stays visible in logs.
    """
    try:
        return {"results": agent.indexer.search(query, top_k)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.delete("/clear")
async def clear():
    """Ask the indexer to clear the vector index and acknowledge."""
    agent.indexer.clear_index()
    acknowledgement = {"status": "cleared"}
    return acknowledgement
if __name__ == "__main__":
    # Command-line entry point: optionally index a directory, then
    # optionally start the HTTP server. Both flags may be combined.
    import argparse

    parser = argparse.ArgumentParser(description="Developer Productivity Agent")
    parser.add_argument("--index", type=str, help="directory to index before serving")
    parser.add_argument("--serve", action="store_true", help="start the HTTP API server")
    parser.add_argument("--port", type=int, default=8000, help="port for --serve (default: 8000)")
    args = parser.parse_args()

    if not args.index and not args.serve:
        # Nothing requested: show usage instead of exiting silently.
        parser.print_help()

    if args.index:
        summary = agent.index_codebase(args.index)
        print(f"Indexed {summary['files_indexed']} files, "
              f"{summary['total_chunks']} chunks in index")
        # Per-file failures are stored as strings in the details mapping;
        # surface them, since nothing else reports them in CLI mode.
        for path, outcome in summary["details"].items():
            if not isinstance(outcome, int):
                print(f"  {path}: {outcome}")

    if args.serve:
        uvicorn.run(app, host="0.0.0.0", port=args.port)