# strike-capital-dd / pinecone_store.py
"""
Pinecone Vector Database Integration for Strike Capital.
Handles document storage, embedding, and retrieval.
"""
import json
import hashlib
from datetime import datetime
from openai import OpenAI
from pinecone import Pinecone
from config import (
OPENAI_API_KEY,
OPENAI_EMBEDDING_MODEL,
PINECONE_API_KEY,
PINECONE_HOST,
PINECONE_INDEX_NAME
)
# Lazy initialization - clients created on first use.
# Both are module-level singletons populated by the get_* accessors below,
# so importing this module never opens a network connection by itself.
_openai_client = None
_pinecone_index = None
def get_openai_client():
    """Return the shared OpenAI client, creating it on first call."""
    global _openai_client
    if _openai_client is not None:
        return _openai_client
    _openai_client = OpenAI(api_key=OPENAI_API_KEY)
    return _openai_client
def get_pinecone_index():
    """Return the shared Pinecone index handle, connecting on first call.

    Raises:
        ValueError: If PINECONE_HOST is not configured.
    """
    global _pinecone_index
    if _pinecone_index is not None:
        return _pinecone_index
    if not PINECONE_HOST:
        raise ValueError("PINECONE_HOST environment variable is not set")
    client = Pinecone(api_key=PINECONE_API_KEY)
    _pinecone_index = client.Index(host=PINECONE_HOST)
    return _pinecone_index
def generate_embedding(text: str) -> list[float]:
    """Embed *text* with the configured OpenAI embedding model.

    Args:
        text: Text to embed.

    Returns:
        The embedding vector as a list of floats (1536 dimensions for
        text-embedding-3-small).
    """
    resp = get_openai_client().embeddings.create(
        model=OPENAI_EMBEDDING_MODEL,
        input=text,
    )
    return resp.data[0].embedding
def generate_doc_id(company_name: str, section: str, version: str = "v0") -> str:
    """Build a deterministic Pinecone document ID.

    The ID is the MD5 hex digest of ``"{company}_{section}_{version}"``,
    so the same inputs always map to the same vector ID.

    Args:
        company_name: Name of the company.
        section: Document section (e.g. "company_overview", "market").
        version: Document version (v0, v1, ...).

    Returns:
        A 32-character hex string usable as a vector ID.
    """
    key = "_".join((company_name, section, version))
    return hashlib.md5(key.encode()).hexdigest()
def store_sentence_chunks(company_name: str, sentence_chunks: list[str], version: str = "v0") -> dict:
    """Embed and upsert sentence chunks into Pinecone.

    Each chunk (a sentence plus its context window) becomes one vector.
    Chunks shorter than 20 characters are skipped, as are chunks whose
    embedding call fails (the error is printed and processing continues).

    Args:
        company_name: Name of the company.
        sentence_chunks: List of sentence chunks with context.
        version: Document version.

    Returns:
        Dict with status, company, version, count, and the stored vector IDs.
    """
    batch_size = 100  # upsert at most 100 vectors per request
    pending: list[dict] = []
    stored_ids: list[str] = []
    created_at = datetime.now().isoformat()
    print(f"Storing {len(sentence_chunks)} sentence chunks for {company_name}...")
    for i, text in enumerate(sentence_chunks):
        # Skip empty or trivially short chunks.
        if not text or len(text.strip()) < 20:
            continue
        # Deterministic per-chunk ID derived from company/version/index.
        chunk_id = hashlib.md5(f"{company_name}_{version}_chunk_{i}".encode()).hexdigest()
        try:
            values = generate_embedding(text)
        except Exception as e:
            # Best-effort: report and move on to the next chunk.
            print(f"Error embedding chunk {i}: {e}")
            continue
        pending.append({
            "id": chunk_id,
            "values": values,
            "metadata": {
                "company_name": company_name,
                "chunk_type": "sentence",
                "chunk_index": i,
                "version": version,
                "timestamp": created_at,
                # Metadata keeps only a truncated preview of the text.
                "content": text[:1000],
            },
        })
        stored_ids.append(chunk_id)
        # Flush a full batch to Pinecone.
        if len(pending) >= batch_size:
            get_pinecone_index().upsert(vectors=pending)
            print(f"Upserted batch: {len(stored_ids)} vectors so far...")
            pending = []
    # Flush any remaining vectors.
    if pending:
        get_pinecone_index().upsert(vectors=pending)
    print(f"Successfully stored {len(stored_ids)} sentence chunks")
    return {
        "status": "success",
        "company_name": company_name,
        "version": version,
        "chunks_stored": len(stored_ids),
        "ids": stored_ids,
    }
def store_extracted_data(company_name: str, extracted_data: dict, version: str = "v0") -> dict:
    """Embed and upsert extracted Harmonic sections into Pinecone.

    Each known section of *extracted_data* becomes one vector whose
    metadata holds both a truncated preview and the full JSON text.
    Empty or missing sections are skipped.

    Args:
        company_name: Name of the company.
        extracted_data: Dictionary of extracted data from the PDF.
        version: Document version.

    Returns:
        Dict with status, company, version, section count, and vector IDs.
    """
    pending: list[dict] = []
    stored_ids: list[str] = []
    created_at = datetime.now().isoformat()
    # Fixed set of sections we persist; falsy ({} / [] / missing) ones are skipped.
    for section_name in ("company_info", "funding", "founders", "product", "market", "traction"):
        payload = extracted_data.get(section_name)
        if not payload:
            continue
        section_text = json.dumps(payload, indent=2)
        if len(section_text) < 10:
            continue  # effectively empty once serialized
        doc_id = generate_doc_id(company_name, section_name, version)
        pending.append({
            "id": doc_id,
            "values": generate_embedding(section_text),
            "metadata": {
                "company_name": company_name,
                "section": section_name,
                "version": version,
                "timestamp": created_at,
                "content": section_text[:1000],  # truncated preview
                "full_content": section_text,
            },
        })
        stored_ids.append(doc_id)
    if pending:
        get_pinecone_index().upsert(vectors=pending)
    return {
        "status": "success",
        "company_name": company_name,
        "version": version,
        "sections_stored": len(pending),
        "ids": stored_ids,
    }
def store_v0_document(company_name: str, v0_document: str, version: str = "v0") -> dict:
    """Store a generated V0 document as a single Pinecone vector.

    Only the first 8000 characters are embedded (model input cap),
    but the full markdown is kept in metadata.

    Args:
        company_name: Name of the company.
        v0_document: Full V0 document text (markdown).
        version: Document version.

    Returns:
        Dict with status and the stored vector ID.
    """
    doc_id = generate_doc_id(company_name, "full_document", version)
    payload = {
        "id": doc_id,
        "values": generate_embedding(v0_document[:8000]),
        "metadata": {
            "company_name": company_name,
            "section": "full_document",
            "version": version,
            "timestamp": datetime.now().isoformat(),
            "document_type": "v0_dd_doc",
            "content": v0_document[:1000],
            "full_content": v0_document,
        },
    }
    get_pinecone_index().upsert(vectors=[payload])
    return {"status": "success", "id": doc_id}
def retrieve_company_data(company_name: str, version: str = "v0") -> dict:
    """Fetch every stored section for a company by its deterministic IDs.

    Args:
        company_name: Name of the company.
        version: Document version to retrieve.

    Returns:
        Mapping of section name to parsed JSON data (the full_document
        section stays as raw markdown text). Sections that are missing
        or fail to fetch are omitted.
    """
    results: dict = {}
    for section in ("company_info", "funding", "founders", "product", "market", "traction", "full_document"):
        doc_id = generate_doc_id(company_name, section, version)
        try:
            fetched = get_pinecone_index().fetch(ids=[doc_id])
            if fetched.vectors and doc_id in fetched.vectors:
                meta = fetched.vectors[doc_id].metadata
                if "full_content" in meta:
                    # Sections were stored as JSON; the full document is plain markdown.
                    if section == "full_document":
                        results[section] = meta["full_content"]
                    else:
                        results[section] = json.loads(meta["full_content"])
        except Exception as e:
            # Best-effort retrieval: report and continue with other sections.
            print(f"Warning: Could not fetch {section}: {e}")
    return results
def semantic_search(query: str, company_name: str | None = None, top_k: int = 5) -> list[dict]:
    """
    Perform semantic search across stored documents.

    Args:
        query: Search query text; embedded and matched against stored vectors.
        company_name: Optional filter restricting results to one company.
        top_k: Number of results to return.

    Returns:
        List of matching documents with scores: each dict has
        id, score, company, section, and (truncated) content.
    """
    query_embedding = generate_embedding(query)
    # Restrict the search to a single company when requested; None disables filtering.
    metadata_filter = {"company_name": {"$eq": company_name}} if company_name else None
    # Query Pinecone
    results = get_pinecone_index().query(
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True,
        filter=metadata_filter
    )
    return [
        {
            "id": match.id,
            "score": match.score,
            "company": match.metadata.get("company_name"),
            "section": match.metadata.get("section"),
            "content": match.metadata.get("content")
        }
        for match in results.matches
    ]
def list_companies() -> list[str]:
    """Return the distinct company names currently stored in the index.

    Pinecone has no "list everything" API, so we query with a zero
    vector and collect company names from (up to 100) returned matches.

    Returns:
        List of company names.
    """
    zero_vector = [0.0] * 1536
    response = get_pinecone_index().query(
        vector=zero_vector,
        top_k=100,
        include_metadata=True
    )
    names = {
        match.metadata["company_name"]
        for match in response.matches
        if match.metadata and "company_name" in match.metadata
    }
    return list(names)
def delete_company(company_name: str) -> dict:
    """Delete all known vectors for a company.

    Reconstructs every deterministic section/version ID this module can
    have written and issues a single delete. Deleting nonexistent IDs is
    harmless, so the count reflects IDs attempted, not IDs that existed.

    Args:
        company_name: Name of the company to delete.

    Returns:
        Status dictionary with the number of IDs submitted for deletion.
    """
    sections = ("company_info", "funding", "founders", "product", "market", "traction", "full_document")
    versions = ("v0", "v0.1", "v1")
    doomed = [
        generate_doc_id(company_name, section, version)
        for section in sections
        for version in versions
    ]
    get_pinecone_index().delete(ids=doomed)
    return {
        "status": "success",
        "company_name": company_name,
        "ids_deleted": len(doomed)
    }
if __name__ == "__main__":
    # Test connection: a successful describe_index_stats() call proves both
    # the API key and the configured host are valid.
    print("Testing Pinecone connection...")
    stats = get_pinecone_index().describe_index_stats()
    print(f"Index stats: {stats}")