""" Developer Productivity Agent RAG-based system using Pinecone for vector storage and GPT-4o-mini. Features: - Pinecone vector database (2GB free tier) - Divided LLM Architecture for cost optimization - Real-time cost tracking and analytics - OpenAI embeddings (text-embedding-3-small) """ import os import json import time from pathlib import Path from typing import List, Dict, Any, Optional import hashlib from datetime import datetime # Core dependencies from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import uvicorn # Vector database - Pinecone from pinecone import Pinecone, ServerlessSpec # LLM client from openai import OpenAI # Code parsing import ast import re from dataclasses import dataclass, field # ============================================================================ # Configuration # ============================================================================ class Config: """Application configuration""" # OpenAI OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") # Pinecone PINECONE_API_KEY = os.getenv("PINECONE_API_KEY", "") PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME", "codebase-index") PINECONE_CLOUD = "aws" PINECONE_REGION = "us-east-1" # Models ARCHITECT_MODEL = "gpt-4o-mini" DEVELOPER_MODEL = "gpt-4o-mini" EMBEDDING_MODEL = "text-embedding-3-small" EMBEDDING_DIM = 1536 # Chunking CHUNK_SIZE = 1500 CHUNK_OVERLAP = 200 TOP_K_RESULTS = 10 # Cost tracking (per 1M tokens) COST_GPT4O_MINI_INPUT = 0.15 # $0.15 per 1M input tokens COST_GPT4O_MINI_OUTPUT = 0.60 # $0.60 per 1M output tokens COST_EMBEDDING = 0.02 # $0.02 per 1M tokens COST_GPT4_INPUT = 30.0 # For comparison - traditional approach COST_GPT4_OUTPUT = 60.0 # ============================================================================ # Cost Tracker # ============================================================================ class CostTracker: """Tracks API costs and calculates savings""" def 
__init__(self): self.reset() def reset(self): """Reset all counters""" self.embedding_tokens = 0 self.architect_input_tokens = 0 self.architect_output_tokens = 0 self.developer_input_tokens = 0 self.developer_output_tokens = 0 self.api_calls = 0 self.tickets_processed = 0 self.questions_answered = 0 self.start_time = datetime.now() self.history = [] def add_embedding(self, tokens: int): """Track embedding tokens""" self.embedding_tokens += tokens self.api_calls += 1 def add_architect_call(self, input_tokens: int, output_tokens: int): """Track architect LLM call""" self.architect_input_tokens += input_tokens self.architect_output_tokens += output_tokens self.api_calls += 1 def add_developer_call(self, input_tokens: int, output_tokens: int): """Track developer LLM call""" self.developer_input_tokens += input_tokens self.developer_output_tokens += output_tokens self.api_calls += 1 def record_ticket(self): """Record a processed ticket""" self.tickets_processed += 1 self._add_to_history("ticket") def record_question(self): """Record an answered question""" self.questions_answered += 1 self._add_to_history("question") def _add_to_history(self, event_type: str): """Add event to history""" self.history.append({ "timestamp": datetime.now().isoformat(), "type": event_type, "cumulative_cost": self.get_actual_cost(), "cumulative_savings": self.get_savings() }) def get_actual_cost(self) -> float: """Calculate actual cost with our approach""" config = Config() embedding_cost = (self.embedding_tokens / 1_000_000) * config.COST_EMBEDDING architect_cost = ( (self.architect_input_tokens / 1_000_000) * config.COST_GPT4O_MINI_INPUT + (self.architect_output_tokens / 1_000_000) * config.COST_GPT4O_MINI_OUTPUT ) developer_cost = ( (self.developer_input_tokens / 1_000_000) * config.COST_GPT4O_MINI_INPUT + (self.developer_output_tokens / 1_000_000) * config.COST_GPT4O_MINI_OUTPUT ) return embedding_cost + architect_cost + developer_cost def get_traditional_cost(self) -> float: """Calculate 
what it would cost with traditional GPT-4 approach""" config = Config() # Traditional approach uses GPT-4 for everything total_input = self.architect_input_tokens + self.developer_input_tokens total_output = self.architect_output_tokens + self.developer_output_tokens return ( (total_input / 1_000_000) * config.COST_GPT4_INPUT + (total_output / 1_000_000) * config.COST_GPT4_OUTPUT ) def get_savings(self) -> float: """Calculate cost savings""" return self.get_traditional_cost() - self.get_actual_cost() def get_savings_percentage(self) -> float: """Calculate savings as percentage""" traditional = self.get_traditional_cost() if traditional == 0: return 0 return ((traditional - self.get_actual_cost()) / traditional) * 100 def get_stats(self) -> Dict[str, Any]: """Get comprehensive statistics""" return { "actual_cost": round(self.get_actual_cost(), 6), "traditional_cost": round(self.get_traditional_cost(), 6), "savings": round(self.get_savings(), 6), "savings_percentage": round(self.get_savings_percentage(), 2), "total_tokens": { "embedding": self.embedding_tokens, "architect_input": self.architect_input_tokens, "architect_output": self.architect_output_tokens, "developer_input": self.developer_input_tokens, "developer_output": self.developer_output_tokens, "total": (self.embedding_tokens + self.architect_input_tokens + self.architect_output_tokens + self.developer_input_tokens + self.developer_output_tokens) }, "api_calls": self.api_calls, "tickets_processed": self.tickets_processed, "questions_answered": self.questions_answered, "session_duration_minutes": round((datetime.now() - self.start_time).seconds / 60, 2), "cost_per_ticket": round(self.get_actual_cost() / max(self.tickets_processed, 1), 6), "history": self.history[-50:] # Last 50 events } # Global cost tracker cost_tracker = CostTracker() # ============================================================================ # Data Models # ============================================================================ 
class JiraTicket(BaseModel):
    """Incoming Jira ticket to analyze and plan."""
    ticket_id: str
    title: str
    description: str
    acceptance_criteria: Optional[str] = None
    labels: Optional[List[str]] = None


class ImplementationPlan(BaseModel):
    """Complete implementation plan produced for a ticket."""
    ticket_summary: str
    key_entities: List[str]
    relevant_files: List[Dict[str, str]]
    implementation_steps: List[str]
    prerequisites: List[str]
    boilerplate_code: Dict[str, str]
    architecture_notes: str
    estimated_complexity: str


def _strip_code_fences(content: str) -> str:
    """Strip a leading ``` or ```json fence and a trailing ``` from an LLM reply.

    BUG FIX: the previous pattern ``'^```json?\\s*'`` only made the final 'n'
    optional, so a plain ``` fence was never stripped and json.loads always
    failed into the fallback branch. ``(?:json)?`` makes the whole tag optional.
    """
    content = re.sub(r'^```(?:json)?\s*', '', content.strip())
    return re.sub(r'\s*```$', '', content)


# ============================================================================
# Pinecone-based Codebase Indexer
# ============================================================================

class CodebaseIndexer:
    """Indexes a codebase into a Pinecone vector database and searches it."""

    def __init__(self, config: Config):
        self.config = config
        self._openai_client = None
        self._pinecone_client = None
        self._index = None

    @property
    def openai_client(self):
        """Lazily-built OpenAI client; raises ValueError if no API key is set."""
        if self._openai_client is None:
            if not self.config.OPENAI_API_KEY:
                raise ValueError("OpenAI API key required")
            self._openai_client = OpenAI(api_key=self.config.OPENAI_API_KEY)
        return self._openai_client

    @property
    def index(self):
        """Lazily-built Pinecone index; creates the index on first access."""
        if self._index is None:
            if not self.config.PINECONE_API_KEY:
                raise ValueError("Pinecone API key required")
            try:
                # Initialize Pinecone (v5+ syntax)
                pc = Pinecone(api_key=self.config.PINECONE_API_KEY)

                # Create index if not exists
                existing_indexes = pc.list_indexes()
                index_names = ([idx.name for idx in existing_indexes]
                               if hasattr(existing_indexes, '__iter__') else [])

                if self.config.PINECONE_INDEX_NAME not in index_names:
                    pc.create_index(
                        name=self.config.PINECONE_INDEX_NAME,
                        dimension=self.config.EMBEDDING_DIM,
                        metric="cosine",
                        spec=ServerlessSpec(
                            cloud=self.config.PINECONE_CLOUD,
                            region=self.config.PINECONE_REGION
                        )
                    )
                    # Wait for index to be ready
                    print(f"⏳ Waiting for index to be ready...")
                    time.sleep(10)

                self._index = pc.Index(self.config.PINECONE_INDEX_NAME)
                print(f"📂 Pinecone index ready: {self.config.PINECONE_INDEX_NAME}")
            except Exception as e:
                print(f"❌ Pinecone initialization error: {str(e)}")
                # Chain the original exception so the root cause is preserved.
                raise ValueError(f"Failed to initialize Pinecone: {str(e)}") from e
        return self._index

    def _get_embedding(self, text: str) -> List[float]:
        """Get a single embedding and track its estimated token cost."""
        # Estimate tokens (rough: 1 token ≈ 4 chars)
        tokens = len(text) // 4
        cost_tracker.add_embedding(tokens)
        response = self.openai_client.embeddings.create(
            model=self.config.EMBEDDING_MODEL,
            input=text
        )
        return response.data[0].embedding

    def _get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        """Batch embeddings with cost tracking; returns [] for empty input."""
        if not texts:
            return []
        tokens = sum(len(t) // 4 for t in texts)
        cost_tracker.add_embedding(tokens)
        response = self.openai_client.embeddings.create(
            model=self.config.EMBEDDING_MODEL,
            input=texts
        )
        return [item.embedding for item in response.data]

    def _detect_language(self, file_path: str) -> str:
        """Map a file extension to a language label ('unknown' if unmapped)."""
        ext_map = {
            '.py': 'python', '.js': 'javascript', '.jsx': 'javascript',
            '.ts': 'typescript', '.tsx': 'typescript', '.java': 'java',
            '.go': 'go', '.rs': 'rust', '.cpp': 'cpp', '.c': 'c',
        }
        return ext_map.get(Path(file_path).suffix.lower(), 'unknown')

    def _chunk_content(self, content: str, file_path: str) -> List[Dict[str, Any]]:
        """Chunk content into overlapping line windows (1-based line numbers)."""
        chunks = []
        lines = content.split('\n')
        # Convert char budgets to line counts, assuming ~50 chars per line.
        chunk_lines = self.config.CHUNK_SIZE // 50
        overlap_lines = self.config.CHUNK_OVERLAP // 50

        i = 0
        chunk_idx = 0
        while i < len(lines):
            end = min(i + chunk_lines, len(lines))
            chunk_content = '\n'.join(lines[i:end])
            if chunk_content.strip():  # Skip empty chunks
                chunks.append({
                    'content': chunk_content,
                    'file_path': file_path,
                    'chunk_index': chunk_idx,
                    'line_start': i + 1,
                    'line_end': end,
                    'language': self._detect_language(file_path)
                })
            # Step forward with overlap. max(..., i + 1) guarantees progress
            # even if CHUNK_OVERLAP >= CHUNK_SIZE is misconfigured (the old
            # code could loop forever in that case). Identical stepping under
            # the default config.
            next_i = end - overlap_lines if end < len(lines) else end
            i = max(next_i, i + 1)
            chunk_idx += 1
        return chunks

    def index_file(self, file_path: str, content: str) -> int:
        """Index a single file into Pinecone; returns the chunk count."""
        chunks = self._chunk_content(content, file_path)
        if not chunks:
            return 0

        # Get embeddings
        texts = [c['content'] for c in chunks]
        embeddings = self._get_embeddings_batch(texts)

        # Prepare vectors for Pinecone
        vectors = []
        for i, chunk in enumerate(chunks):
            # Deterministic id so re-indexing a file upserts in place.
            vector_id = hashlib.md5(
                f"{file_path}_{chunk['chunk_index']}".encode()
            ).hexdigest()
            vectors.append({
                "id": vector_id,
                "values": embeddings[i],
                "metadata": {
                    "file_path": file_path,
                    "chunk_index": chunk['chunk_index'],
                    "language": chunk['language'],
                    "line_start": chunk['line_start'],
                    "line_end": chunk['line_end'],
                    "content": chunk['content'][:1000]  # Pinecone metadata limit
                }
            })

        # Upsert to Pinecone
        self.index.upsert(vectors=vectors)
        return len(chunks)

    def index_directory(self, directory_path: str, extensions: List[str] = None) -> Dict[str, int]:
        """Index all matching files under a directory.

        Returns a mapping of file path -> chunk count (int) on success or an
        "Error: ..." string on failure.
        """
        if extensions is None:
            extensions = ['.py', '.js', '.jsx', '.ts', '.tsx', '.java', '.go']

        results = {}
        directory = Path(directory_path)

        for ext in extensions:
            for file_path in directory.rglob(f"*{ext}"):
                # Skip vendored / generated directories.
                if any(skip in str(file_path) for skip in
                       ['node_modules', '__pycache__', '.git', 'venv']):
                    continue
                try:
                    content = file_path.read_text(encoding='utf-8')
                    chunks = self.index_file(str(file_path), content)
                    results[str(file_path)] = chunks
                    print(f" ✅ {file_path.name}: {chunks} chunks")
                except Exception as e:
                    results[str(file_path)] = f"Error: {e}"
        return results

    def search(self, query: str, top_k: int = None) -> List[Dict[str, Any]]:
        """Semantic search over the indexed codebase; returns scored chunks."""
        if top_k is None:
            top_k = self.config.TOP_K_RESULTS

        query_embedding = self._get_embedding(query)
        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True
        )

        formatted = []
        for match in results.matches:
            formatted.append({
                'content': match.metadata.get('content', ''),
                'metadata': {
                    'file_path': match.metadata.get('file_path', ''),
                    'line_start': match.metadata.get('line_start', 0),
                    'line_end': match.metadata.get('line_end', 0),
                    'language': match.metadata.get('language', '')
                },
                'score': match.score
            })
        return formatted

    def get_stats(self) -> Dict[str, Any]:
        """Get index statistics; degrades gracefully if Pinecone is unreachable."""
        try:
            stats = self.index.describe_index_stats()
            return {
                'total_chunks': stats.total_vector_count,
                'index_name': self.config.PINECONE_INDEX_NAME,
                'dimension': stats.dimension
            }
        except Exception:  # narrowed from bare except: don't swallow SystemExit etc.
            return {'total_chunks': 0, 'index_name': self.config.PINECONE_INDEX_NAME}

    def clear_index(self):
        """Clear all vectors (best effort; errors are deliberately ignored)."""
        try:
            self.index.delete(delete_all=True)
            print("⚠️ Index cleared!")
        except Exception:  # narrowed from bare except
            pass


# ============================================================================
# LLM Specialists with Cost Tracking
# ============================================================================

class ArchitectLLM:
    """LLM #1: Architect - planning and analysis"""

    def __init__(self, config: Config):
        self.config = config
        self._client = None
        self.model = config.ARCHITECT_MODEL

    @property
    def client(self):
        """Lazily-built OpenAI client; raises ValueError if no API key is set."""
        if self._client is None:
            if not self.config.OPENAI_API_KEY:
                raise ValueError("OpenAI API key not set!")
            self._client = OpenAI(api_key=self.config.OPENAI_API_KEY)
        return self._client

    def reset_client(self):
        """Drop the cached client so the next access rebuilds it (key change)."""
        self._client = None

    def analyze_ticket(self, ticket: JiraTicket) -> Dict[str, Any]:
        """Analyze a ticket and return a structured dict (JSON from the LLM,
        with a plain-text fallback in 'summary' if parsing fails)."""
        prompt = f"""Analyze this Jira ticket for implementation:

ID: {ticket.ticket_id}
Title: {ticket.title}
Description: {ticket.description}
Acceptance Criteria: {ticket.acceptance_criteria or 'Not specified'}

Provide JSON:
{{
 "summary": "2-3 sentence summary",
 "key_entities": ["entity1", "entity2"],
 "technical_keywords": ["keyword1", "keyword2"],
 "prerequisites": ["prereq1"],
 "complexity": "Low/Medium/High",
 "complexity_reason": "why",
 "risks": ["risk1"]
}}"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        # Track costs
        usage = response.usage
        cost_tracker.add_architect_call(usage.prompt_tokens, usage.completion_tokens)

        content = response.choices[0].message.content
        try:
            return json.loads(_strip_code_fences(content))
        except Exception:
            # Fall back to the raw text as the summary.
            return {"summary": content, "key_entities": [],
                    "technical_keywords": [], "prerequisites": [],
                    "complexity": "Unknown", "complexity_reason": "", "risks": []}

    def create_implementation_strategy(self, ticket_analysis: Dict,
                                       code_context: List[Dict]) -> Dict:
        """Build an implementation strategy from the analysis + retrieved code."""
        context_str = "\n".join([
            f"File: {c['metadata'].get('file_path', '?')}\n{c['content'][:500]}"
            for c in code_context[:5]
        ])

        prompt = f"""Create implementation strategy:

Analysis: {json.dumps(ticket_analysis)}

Code Context:
{context_str}

Provide JSON:
{{
 "architecture_notes": "how it fits",
 "implementation_steps": ["step1", "step2"],
 "files_to_modify": [{{"path": "file", "action": "modify/create", "reason": "why"}}],
 "patterns_to_follow": ["pattern1"],
 "integration_points": ["point1"]
}}"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        usage = response.usage
        cost_tracker.add_architect_call(usage.prompt_tokens, usage.completion_tokens)

        content = response.choices[0].message.content
        try:
            return json.loads(_strip_code_fences(content))
        except Exception:
            return {"architecture_notes": content, "implementation_steps": [],
                    "files_to_modify": [], "patterns_to_follow": [],
                    "integration_points": []}


class DeveloperLLM:
    """LLM #2: Developer - code generation"""

    def __init__(self, config: Config):
        self.config = config
        self._client = None
        self.model = config.DEVELOPER_MODEL

    @property
    def client(self):
        """Lazily-built OpenAI client; raises ValueError if no API key is set."""
        if self._client is None:
            if not self.config.OPENAI_API_KEY:
                raise ValueError("OpenAI API key not set!")
            self._client = OpenAI(api_key=self.config.OPENAI_API_KEY)
        return self._client

    def reset_client(self):
        """Drop the cached client so the next access rebuilds it (key change)."""
        self._client = None

    def generate_boilerplate(self, ticket_analysis: Dict, strategy: Dict,
                             code_context: List[Dict]) -> Dict[str, str]:
        """Generate boilerplate code keyed by file path (falls back to a
        single 'generated_code.txt' entry if the reply is not valid JSON)."""
        context_str = "\n".join([
            f"// {c['metadata'].get('file_path', '?')}\n{c['content'][:400]}"
            for c in code_context[:3]
        ])

        prompt = f"""Generate boilerplate code:

Summary: {ticket_analysis.get('summary', '')}
Entities: {ticket_analysis.get('key_entities', [])}
Steps: {strategy.get('implementation_steps', [])}

Existing patterns:
{context_str}

Respond with JSON where keys are file paths:
{{"path/file.py": "# code with TODOs"}}"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2
        )

        usage = response.usage
        cost_tracker.add_developer_call(usage.prompt_tokens, usage.completion_tokens)

        content = response.choices[0].message.content
        try:
            return json.loads(_strip_code_fences(content))
        except Exception:
            return {"generated_code.txt": content}

    def explain_code_context(self, code_context: List[Dict], question: str) -> str:
        """Answer a free-form question about the retrieved code chunks."""
        context_str = "\n".join([
            f"File: {c['metadata'].get('file_path', '?')}\n{c['content']}"
            for c in code_context[:5]
        ])

        prompt = f"""Explain this code:

{context_str}

Question: {question}

Be concise and helpful."""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        usage = response.usage
        cost_tracker.add_developer_call(usage.prompt_tokens, usage.completion_tokens)
        return response.choices[0].message.content


# ============================================================================
# Main Agent
# ============================================================================

class DevProductivityAgent:
    """Main orchestrator wiring the indexer and both LLMs, with cost tracking."""

    def __init__(self, config: Config = None):
        self.config = config or Config()
        self.indexer = CodebaseIndexer(self.config)
        self.architect = ArchitectLLM(self.config)
        self.developer = DeveloperLLM(self.config)

    def set_api_keys(self, openai_key: str = None, pinecone_key: str = None):
        """Set API keys and invalidate cached clients so they get rebuilt."""
        if openai_key:
            self.config.OPENAI_API_KEY = openai_key
            self.architect.reset_client()
            self.developer.reset_client()
            self.indexer._openai_client = None
        if pinecone_key:
            self.config.PINECONE_API_KEY = pinecone_key
            self.indexer._index = None

    def index_codebase(self, directory: str, extensions: List[str] = None) -> Dict:
        """Index a directory and return a summary of what was indexed."""
        print(f"📂 Indexing: {directory}")
        results = self.indexer.index_directory(directory, extensions)
        stats = self.indexer.get_stats()
        return {
            # Successful files report an int chunk count; failures a string.
            "files_indexed": len([r for r in results.values() if isinstance(r, int)]),
            "total_chunks": stats['total_chunks'],
            "details": results
        }

    def process_ticket(self, ticket: JiraTicket) -> ImplementationPlan:
        """Full pipeline: analyze -> retrieve -> plan -> generate boilerplate."""
        print("📋 Analyzing...")
        analysis = self.architect.analyze_ticket(ticket)

        print("🔍 Searching...")
        queries = (analysis.get('technical_keywords', []) +
                   analysis.get('key_entities', []))
        all_results = []
        seen = set()
        # Deduplicate retrieved chunks by file path across queries.
        for q in queries[:5]:
            for r in self.indexer.search(q, top_k=5):
                fp = r['metadata'].get('file_path', '')
                if fp not in seen:
                    all_results.append(r)
                    seen.add(fp)

        print("📐 Planning...")
        strategy = self.architect.create_implementation_strategy(analysis, all_results)

        print("💻 Generating...")
        code = self.developer.generate_boilerplate(analysis, strategy, all_results)

        cost_tracker.record_ticket()

        return ImplementationPlan(
            ticket_summary=analysis.get('summary', ''),
            key_entities=analysis.get('key_entities', []),
            relevant_files=[{
                'path': r['metadata'].get('file_path', ''),
                'relevance': f"Lines {r['metadata'].get('line_start', '?')}-{r['metadata'].get('line_end', '?')}",
                'preview': r['content'][:200]
            } for r in all_results[:10]],
            implementation_steps=strategy.get('implementation_steps', []),
            prerequisites=analysis.get('prerequisites', []),
            boilerplate_code=code,
            architecture_notes=strategy.get('architecture_notes', ''),
            estimated_complexity=analysis.get('complexity', 'Unknown')
        )

    def ask_about_code(self, question: str) -> str:
        """Answer a question about the indexed codebase."""
        results = self.indexer.search(question)
        if not results:
            return "No relevant code found. Index your codebase first."
        answer = self.developer.explain_code_context(results, question)
        cost_tracker.record_question()
        return answer

    def get_cost_stats(self) -> Dict:
        """Expose the global cost tracker's statistics."""
        return cost_tracker.get_stats()

    def reset_cost_tracking(self):
        """Reset the global cost tracker."""
        cost_tracker.reset()


# ============================================================================
# FastAPI
# ============================================================================

app = FastAPI(title="Developer Productivity Agent", version="2.0.0")
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; tighten allow_origins before deploying publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

agent = DevProductivityAgent()


@app.get("/")
async def root():
    """Health check including current chunk count."""
    stats = agent.indexer.get_stats()
    return {"status": "healthy", "vector_db": "Pinecone", "chunks": stats['total_chunks']}


@app.get("/stats")
async def get_stats():
    """Index statistics."""
    return agent.indexer.get_stats()


@app.get("/cost-analytics")
async def get_cost_analytics():
    """Get cost analytics and savings"""
    return agent.get_cost_stats()


@app.post("/reset-costs")
async def reset_costs():
    """Reset cost tracking counters."""
    agent.reset_cost_tracking()
    return {"status": "reset"}


@app.post("/index")
async def index_codebase(directory: str, extensions: List[str] = None):
    """Index a server-local directory.

    NOTE(review): `directory` comes straight from the client and is read from
    the server's filesystem - restrict or validate it for untrusted callers.
    """
    try:
        return {"status": "success", "results": agent.index_codebase(directory, extensions)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/process-ticket", response_model=ImplementationPlan)
async def process_ticket(ticket: JiraTicket):
    """Run the full ticket -> implementation-plan pipeline."""
    try:
        return agent.process_ticket(ticket)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/ask")
async def ask(question: str):
    """Answer a question about the indexed codebase."""
    try:
        return {"answer": agent.ask_about_code(question)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/search")
async def search(query: str, top_k: int = 10):
    """Raw semantic search over the index."""
    try:
        return {"results": agent.indexer.search(query, top_k)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.delete("/clear")
async def clear():
    """Delete all vectors from the index."""
    agent.indexer.clear_index()
    return {"status": "cleared"}


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--index", type=str)
    parser.add_argument("--serve", action="store_true")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()

    if args.index:
        agent.index_codebase(args.index)
    if args.serve:
        uvicorn.run(app, host="0.0.0.0", port=args.port)