from pinecone import Pinecone, ServerlessSpec
from typing import List, Dict, Optional
import logging
import os
from app.core.config import settings
logger = logging.getLogger(__name__)
class PineconeService:
    """Vector-store wrapper around a Pinecone serverless index.

    Embeddings are stored with identifier-only metadata (repository id,
    file path, chunk index, line range, chunk type); the full chunk
    content lives in PostgreSQL, so callers re-fetch it using the
    identifiers returned from :meth:`search_similar_code`. Each
    repository's vectors are isolated in their own namespace
    (``repo_<id>``).
    """

    # all-MiniLM-L6-v2 produces 384-dimensional embeddings.
    EMBEDDING_DIMENSION = 384
    # Pinecone caps upsert request size; 100 vectors per request is safe.
    UPSERT_BATCH_SIZE = 100

    def __init__(self):
        """Connect to the configured index, creating it if absent.

        Reads ``settings.pinecone_api_key`` and
        ``settings.pinecone_index_name``.

        Raises:
            RuntimeError: if the API key is missing or any step of
                client/index initialization fails.
        """
        try:
            print("[PINECONE] Initializing Pinecone client...", flush=True)
            if not settings.pinecone_api_key:
                raise RuntimeError("PINECONE_API_KEY environment variable is required")
            self.pc = Pinecone(api_key=settings.pinecone_api_key)
            self.index_name = settings.pinecone_index_name
            # Create the index on first run so deployments self-provision.
            self._ensure_index_exists()
            self.index = self.pc.Index(self.index_name)
            print(f"[PINECONE] Connected to index: {self.index_name}", flush=True)
            logger.info("Pinecone service initialized with index: %s", self.index_name)
        except Exception as e:
            print(f"[PINECONE] Failed to initialize: {e}", flush=True)
            logger.error("Failed to initialize Pinecone: %s", e)
            # Chain the cause so the original traceback is preserved.
            raise RuntimeError(f"Failed to initialize Pinecone: {e}") from e

    def _ensure_index_exists(self):
        """Create the serverless index if it doesn't exist yet.

        Raises:
            Exception: re-raises any Pinecone API error after logging it.
        """
        try:
            existing_indexes = [index.name for index in self.pc.list_indexes()]
            if self.index_name not in existing_indexes:
                print(f"[PINECONE] Creating new index: {self.index_name}", flush=True)
                self.pc.create_index(
                    name=self.index_name,
                    dimension=self.EMBEDDING_DIMENSION,
                    metric='cosine',
                    spec=ServerlessSpec(cloud='aws', region='us-east-1'),
                )
                print(f"[PINECONE] Index created successfully: {self.index_name}", flush=True)
            else:
                print(f"[PINECONE] Using existing index: {self.index_name}", flush=True)
        except Exception as e:
            print(f"[PINECONE] Error with index: {e}", flush=True)
            raise

    async def store_embeddings(self, repository_id: int, embedded_chunks: List[Dict]):
        """Upsert embeddings with minimal metadata (content stays in PostgreSQL).

        Args:
            repository_id: owning repository; also determines the namespace.
            embedded_chunks: dicts with ``embedding``, ``file_path``,
                ``chunk_index``, ``start_line``, ``end_line``, ``chunk_type``.

        Raises:
            Exception: re-raises any Pinecone upsert error after logging it.
        """
        print(
            f"[PINECONE] Storing {len(embedded_chunks)} embeddings for repository {repository_id}",
            flush=True,
        )
        logger.info(
            "Storing %d embeddings for repository %d", len(embedded_chunks), repository_id
        )
        try:
            vectors = []
            for i, chunk in enumerate(embedded_chunks):
                # Id encodes repo + chunk index + position so it is unique
                # even if the same chunk_index appears twice in the input.
                vector_id = f"repo_{repository_id}_chunk_{chunk['chunk_index']}_{i}"
                vectors.append({
                    "id": vector_id,
                    "values": chunk['embedding'],
                    # Identifiers only — omitting content saves Pinecone storage;
                    # the full text is fetched from PostgreSQL by these keys.
                    "metadata": {
                        "repository_id": repository_id,
                        "file_path": chunk['file_path'],
                        "chunk_index": chunk['chunk_index'],
                        "start_line": chunk['start_line'],
                        "end_line": chunk['end_line'],
                        "chunk_type": chunk['chunk_type'],
                    },
                })

            batch_size = self.UPSERT_BATCH_SIZE
            total_batches = (len(vectors) + batch_size - 1) // batch_size
            for batch_num, start in enumerate(range(0, len(vectors), batch_size), 1):
                batch_vectors = vectors[start:start + batch_size]
                self.index.upsert(
                    vectors=batch_vectors,
                    namespace=f"repo_{repository_id}",
                )
                print(
                    f"[PINECONE] Stored batch {batch_num}/{total_batches} "
                    f"({len(batch_vectors)} vectors)",
                    flush=True,
                )
            print(
                f"[PINECONE] Successfully stored all {len(embedded_chunks)} embeddings "
                f"for repository {repository_id}!",
                flush=True,
            )
            logger.info(
                "Successfully stored all embeddings for repository %d", repository_id
            )
        except Exception as e:
            print(f"[PINECONE] Error storing embeddings: {e}", flush=True)
            logger.error("Error storing embeddings in Pinecone: %s", e)
            raise

    async def search_similar_code(
        self, repository_id: int, query_embedding: List[float], top_k: int = 5
    ) -> List[Dict]:
        """Return identifiers of the ``top_k`` most similar code chunks.

        Args:
            repository_id: repository whose namespace is searched.
            query_embedding: query vector (same dimension as stored vectors).
            top_k: maximum number of matches to return.

        Returns:
            A list of dicts with ``repository_id``, ``file_path``,
            ``chunk_index``, ``start_line``, ``end_line``, ``chunk_type``
            and ``similarity`` (cosine score, higher is better). Returns
            an empty list on any search error (deliberate best-effort).
        """
        try:
            print(
                f"[PINECONE] Searching for {top_k} similar chunks in repository {repository_id}",
                flush=True,
            )
            results = self.index.query(
                vector=query_embedding,
                top_k=top_k,
                namespace=f"repo_{repository_id}",
                include_metadata=True,
                include_values=False,
            )
            search_results = []
            for match in results.matches:
                metadata = match.metadata
                search_results.append({
                    'repository_id': metadata.get('repository_id'),
                    'file_path': metadata.get('file_path', ''),
                    'chunk_index': metadata.get('chunk_index', 0),
                    'start_line': metadata.get('start_line', 0),
                    'end_line': metadata.get('end_line', 0),
                    'chunk_type': metadata.get('chunk_type', ''),
                    'similarity': match.score,
                })
            print(
                f"[PINECONE] Found {len(search_results)} similar code chunks "
                f"(identifiers only)",
                flush=True,
            )
            logger.info("Found %d similar code chunks", len(search_results))
            return search_results
        except Exception as e:
            # Best-effort: a failed search degrades to "no results" rather
            # than propagating, so callers can still respond.
            print(f"[PINECONE] Error searching: {e}", flush=True)
            logger.error("Error searching in Pinecone: %s", e)
            return []

    async def delete_repository_data(self, repository_id: int):
        """Delete every vector in the repository's namespace.

        Errors are logged as warnings and swallowed (best-effort cleanup).
        """
        try:
            namespace = f"repo_{repository_id}"
            self.index.delete(delete_all=True, namespace=namespace)
            print(f"[PINECONE] Deleted all data for repository {repository_id}", flush=True)
            logger.info("Deleted all data for repository %d", repository_id)
        except Exception as e:
            print(f"[PINECONE] Error deleting repository data: {e}", flush=True)
            logger.warning("Error deleting repository data: %s", e)