Spaces:
Sleeping
Sleeping
"""
Pinecone Vector Database Integration for Strike Capital.
Handles document storage, embedding, and retrieval.
"""
import json
import hashlib
from datetime import datetime

from openai import OpenAI
from pinecone import Pinecone

# Configuration constants (API keys / endpoints) come from the local config
# module.  NOTE(review): PINECONE_INDEX_NAME is imported but unused in this
# file — connections go through PINECONE_HOST; confirm before removing.
from config import (
    OPENAI_API_KEY,
    OPENAI_EMBEDDING_MODEL,
    PINECONE_API_KEY,
    PINECONE_HOST,
    PINECONE_INDEX_NAME
)
# Lazy initialization - clients created on first use.  Importing this module
# performs no network I/O; the get_* accessors below build and cache these.
_openai_client = None   # cached OpenAI client, set by get_openai_client()
_pinecone_index = None  # cached Pinecone index handle, set by get_pinecone_index()
def get_openai_client():
    """Return the shared OpenAI client, building it lazily on first use."""
    global _openai_client
    if _openai_client is not None:
        return _openai_client
    _openai_client = OpenAI(api_key=OPENAI_API_KEY)
    return _openai_client
def get_pinecone_index():
    """Return the shared Pinecone index handle, building it lazily.

    Raises:
        ValueError: If PINECONE_HOST is unset or empty.
    """
    global _pinecone_index
    if _pinecone_index is not None:
        return _pinecone_index
    if not PINECONE_HOST:
        raise ValueError("PINECONE_HOST environment variable is not set")
    client = Pinecone(api_key=PINECONE_API_KEY)
    _pinecone_index = client.Index(host=PINECONE_HOST)
    return _pinecone_index
def generate_embedding(text: str) -> list[float]:
    """
    Embed *text* using the configured OpenAI embedding model.

    Args:
        text: Text to embed.

    Returns:
        Embedding vector as a list of floats (1536 dimensions for
        text-embedding-3-small).
    """
    result = get_openai_client().embeddings.create(
        model=OPENAI_EMBEDDING_MODEL,
        input=text,
    )
    # The API returns one Embedding object per input; we send exactly one.
    return result.data[0].embedding
def generate_doc_id(company_name: str, section: str, version: str = "v0") -> str:
    """
    Derive a deterministic Pinecone document ID.

    The ID is the MD5 hex digest of "<company>_<section>_<version>".
    MD5 is used purely as a stable fingerprint here, not for security.

    Args:
        company_name: Name of the company.
        section: Document section (e.g., "company_overview", "market").
        version: Document version (v0, v1, etc.).

    Returns:
        32-character hex document ID.
    """
    key = "_".join((company_name, section, version))
    return hashlib.md5(key.encode()).hexdigest()
def store_sentence_chunks(company_name: str, sentence_chunks: list[str], version: str = "v0") -> dict:
    """
    Store sentence chunks in Pinecone with embeddings.

    Each sentence (with context window) becomes a separate vector.
    Embedding is best-effort: a chunk that fails to embed is skipped with a
    warning rather than aborting the whole call.

    Args:
        company_name: Name of the company.
        sentence_chunks: List of sentence chunks with context.
        version: Document version.

    Returns:
        Dictionary with storage status and the stored vector IDs.
    """
    pending = []
    stored_ids = []
    timestamp = datetime.now().isoformat()
    batch_size = 100  # upsert in batches of 100 to keep requests small

    print(f"Storing {len(sentence_chunks)} sentence chunks for {company_name}...")

    for i, chunk_text in enumerate(sentence_chunks):
        # Skip empty or trivially short chunks — not worth an embedding call.
        if not chunk_text or len(chunk_text.strip()) < 20:
            continue

        # Deterministic per-chunk ID: md5 of "<company>_<version>_chunk_<i>".
        doc_id = hashlib.md5(f"{company_name}_{version}_chunk_{i}".encode()).hexdigest()

        try:
            embedding = generate_embedding(chunk_text)
        except Exception as e:
            print(f"Error embedding chunk {i}: {e}")
            continue

        pending.append({
            "id": doc_id,
            "values": embedding,
            "metadata": {
                "company_name": company_name,
                "chunk_type": "sentence",
                "chunk_index": i,
                "version": version,
                "timestamp": timestamp,
                # Truncated preview only — keeps metadata small.
                "content": chunk_text[:1000]
            }
        })
        stored_ids.append(doc_id)

        if len(pending) >= batch_size:
            get_pinecone_index().upsert(vectors=pending)
            print(f"Upserted batch: {len(stored_ids)} vectors so far...")
            pending = []

    # Flush whatever is left over after the last full batch.
    if pending:
        get_pinecone_index().upsert(vectors=pending)

    print(f"Successfully stored {len(stored_ids)} sentence chunks")

    return {
        "status": "success",
        "company_name": company_name,
        "version": version,
        "chunks_stored": len(stored_ids),
        "ids": stored_ids
    }
def store_extracted_data(company_name: str, extracted_data: dict, version: str = "v0") -> dict:
    """
    Store extracted Harmonic data in Pinecone with embeddings.

    Creates one vector per non-empty section.  Embedding is best-effort:
    a section that fails to embed is skipped with a warning instead of
    aborting the whole store (consistent with store_sentence_chunks).

    Args:
        company_name: Name of the company.
        extracted_data: Dictionary of extracted data from PDF.
        version: Document version.

    Returns:
        Dictionary with storage status and IDs.
    """
    vectors_to_upsert = []
    stored_ids = []
    timestamp = datetime.now().isoformat()

    # Store each section as a separate vector
    sections = [
        ("company_info", extracted_data.get("company_info", {})),
        ("funding", extracted_data.get("funding", {})),
        ("founders", extracted_data.get("founders", [])),
        ("product", extracted_data.get("product", {})),
        ("market", extracted_data.get("market", {})),
        ("traction", extracted_data.get("traction", {}))
    ]

    for section_name, section_data in sections:
        if not section_data:
            continue

        # Serialize to JSON text so the section can be embedded now and
        # restored later with json.loads (see retrieve_company_data).
        section_text = json.dumps(section_data, indent=2)
        if len(section_text) < 10:  # skip effectively-empty sections
            continue

        # FIX: previously an embedding failure here propagated and aborted
        # the entire store call; now it skips just the failing section.
        try:
            embedding = generate_embedding(section_text)
        except Exception as e:
            print(f"Error embedding section {section_name}: {e}")
            continue

        doc_id = generate_doc_id(company_name, section_name, version)

        vectors_to_upsert.append({
            "id": doc_id,
            "values": embedding,
            "metadata": {
                "company_name": company_name,
                "section": section_name,
                "version": version,
                "timestamp": timestamp,
                "content": section_text[:1000],  # truncated preview
                # NOTE(review): Pinecone limits per-vector metadata size;
                # a very large section could be rejected here — confirm
                # expected section sizes against that limit.
                "full_content": section_text
            }
        })
        stored_ids.append(doc_id)

    # Single upsert: section count is small, no batching needed.
    if vectors_to_upsert:
        get_pinecone_index().upsert(vectors=vectors_to_upsert)

    return {
        "status": "success",
        "company_name": company_name,
        "version": version,
        "sections_stored": len(vectors_to_upsert),
        "ids": stored_ids
    }
def store_v0_document(company_name: str, v0_document: str, version: str = "v0") -> dict:
    """
    Store the generated V0 document as a single vector.

    Only the first 8000 characters are embedded; the complete markdown is
    kept in the vector's metadata under "full_content".

    Args:
        company_name: Name of the company.
        v0_document: Full V0 document text (markdown).
        version: Document version.

    Returns:
        Dictionary with storage status and the document's vector ID.
    """
    doc_id = generate_doc_id(company_name, "full_document", version)
    embedding = generate_embedding(v0_document[:8000])  # Truncate for embedding

    payload = {
        "id": doc_id,
        "values": embedding,
        "metadata": {
            "company_name": company_name,
            "section": "full_document",
            "version": version,
            "timestamp": datetime.now().isoformat(),
            "document_type": "v0_dd_doc",
            "content": v0_document[:1000],
            "full_content": v0_document
        }
    }
    get_pinecone_index().upsert(vectors=[payload])

    return {"status": "success", "id": doc_id}
def retrieve_company_data(company_name: str, version: str = "v0") -> dict:
    """
    Retrieve all stored data for a company.

    Fetches each known section's vector by its deterministic ID and
    deserializes the stored JSON (the full document is returned as raw
    markdown).  Sections that are missing or fail to fetch are skipped
    with a warning.

    Args:
        company_name: Name of the company.
        version: Document version to retrieve.

    Returns:
        Dictionary mapping section name to its stored data.
    """
    section_names = (
        "company_info", "funding", "founders",
        "product", "market", "traction", "full_document",
    )
    collected = {}

    for section in section_names:
        doc_id = generate_doc_id(company_name, section, version)
        try:
            fetched = get_pinecone_index().fetch(ids=[doc_id])
            vectors = fetched.vectors
            if not vectors or doc_id not in vectors:
                continue
            meta = vectors[doc_id].metadata
            if "full_content" not in meta:
                continue
            raw = meta["full_content"]
            # Sections were stored as JSON; the full document is plain markdown.
            collected[section] = raw if section == "full_document" else json.loads(raw)
        except Exception as e:
            print(f"Warning: Could not fetch {section}: {e}")

    return collected
def semantic_search(query: str, company_name: str = None, top_k: int = 5) -> list[dict]:
    """
    Perform semantic search across stored documents.

    Args:
        query: Search query.
        company_name: Optional filter by company name.
        top_k: Number of results to return.

    Returns:
        List of matching documents with scores.
    """
    query_vector = generate_embedding(query)

    # Restrict to one company when requested; None means no metadata filter.
    metadata_filter = {"company_name": {"$eq": company_name}} if company_name else None

    response = get_pinecone_index().query(
        vector=query_vector,
        top_k=top_k,
        include_metadata=True,
        filter=metadata_filter,
    )

    hits = []
    for match in response.matches:
        hits.append({
            "id": match.id,
            "score": match.score,
            "company": match.metadata.get("company_name"),
            "section": match.metadata.get("section"),
            "content": match.metadata.get("content"),
        })
    return hits
def list_companies(sample_size: int = 100) -> list[str]:
    """
    List companies stored in the database.

    Pinecone has no "list all vectors" API, so this queries with a zero
    vector and collects company names from the metadata of the matches.

    Args:
        sample_size: Maximum number of vectors to sample (Pinecone top_k).
            Companies whose vectors all fall outside this sample are
            missed — raise it for larger indexes.  Defaults to 100,
            matching the previous hard-coded cap.

    Returns:
        Sorted list of distinct company names found in the sample.
    """
    embedding_dim = 1536  # must match the stored vectors (text-embedding-3-small)
    dummy_embedding = [0.0] * embedding_dim

    results = get_pinecone_index().query(
        vector=dummy_embedding,
        top_k=sample_size,
        include_metadata=True
    )

    companies = set()
    for match in results.matches:
        if match.metadata and "company_name" in match.metadata:
            companies.add(match.metadata["company_name"])

    # Sorted for deterministic output (set iteration order is arbitrary).
    return sorted(companies)
def delete_company(company_name: str) -> dict:
    """
    Delete all vectors for a company.

    Deletes every deterministic section/version ID this module could have
    written for the company (sentence-chunk vectors use different IDs and
    are not covered here).

    Args:
        company_name: Name of the company to delete.

    Returns:
        Status dictionary with the number of IDs submitted for deletion.
    """
    sections = ["company_info", "funding", "founders", "product", "market", "traction", "full_document"]
    versions = ["v0", "v0.1", "v1"]

    ids_to_delete = [
        generate_doc_id(company_name, section, version)
        for section in sections
        for version in versions
    ]
    get_pinecone_index().delete(ids=ids_to_delete)

    return {
        "status": "success",
        "company_name": company_name,
        "ids_deleted": len(ids_to_delete)
    }
if __name__ == "__main__":
    # Smoke test: connect to Pinecone and dump index statistics.
    print("Testing Pinecone connection...")
    index_stats = get_pinecone_index().describe_index_stats()
    print(f"Index stats: {index_stats}")