""" Vector Store Management Module Handles creation, file upload, and management of OpenAI vector stores """ from typing import Dict, List, Optional from concurrent.futures import ThreadPoolExecutor from tqdm import tqdm import concurrent.futures import os from openai import OpenAI class VectorStoreManager: """Manages OpenAI vector store operations""" def __init__(self, client: OpenAI): self.client = client def create_vector_store(self, store_name: str) -> Optional[Dict]: """ Create a Vector Store on OpenAI's servers Args: store_name: Name for the vector store Returns: Dictionary with vector store details or None if failed """ try: vector_store = self.client.vector_stores.create(name=store_name) details = { "id": vector_store.id, "name": vector_store.name, "created_at": vector_store.created_at, "file_count": vector_store.file_counts.completed } print(f"✅ Vector store created: {details}") return details except Exception as e: print(f"❌ Error creating vector store: {e}") return None def upload_single_pdf(self, file_path: str, vector_store_id: str) -> Dict: """ Upload a single PDF file to the vector store Args: file_path: Path to the PDF file vector_store_id: ID of the vector store Returns: Dictionary with upload status """ file_name = os.path.basename(file_path) try: # Create file with open(file_path, 'rb') as f: file_response = self.client.files.create( file=f, purpose="assistants" ) # Attach to vector store attach_response = self.client.vector_stores.files.create( vector_store_id=vector_store_id, file_id=file_response.id ) return {"file": file_name, "status": "success"} except Exception as e: print(f"❌ Error uploading {file_name}: {str(e)}") return {"file": file_name, "status": "failed", "error": str(e)} def upload_pdf_files(self, pdf_files: List[str], vector_store_id: str, max_workers: int = 10) -> Dict: """ Upload multiple PDF files to vector store in parallel Args: pdf_files: List of PDF file paths vector_store_id: ID of the vector store max_workers: Maximum number of parallel workers Returns: Dictionary with upload statistics """ stats = { "total_files": len(pdf_files), "successful_uploads": 0, "failed_uploads": 0, "errors": [] } if not pdf_files: print("⚠️ No PDF files to upload") return stats print(f"📤 Uploading {len(pdf_files)} PDF files in parallel...") with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = { executor.submit(self.upload_single_pdf, file_path, vector_store_id): file_path for file_path in pdf_files } for future in tqdm(concurrent.futures.as_completed(futures), total=len(pdf_files), desc="Uploading"): result = future.result() if result["status"] == "success": stats["successful_uploads"] += 1 else: stats["failed_uploads"] += 1 stats["errors"].append(result) print(f"✅ Upload complete: {stats['successful_uploads']}/{stats['total_files']} successful") return stats def search_vector_store(self, query: str, vector_store_id: str, max_results: int = 10): """ Search the vector store directly Args: query: Search query vector_store_id: ID of the vector store max_results: Maximum number of results Returns: Search results """ try: search_results = self.client.vector_stores.search( vector_store_id=vector_store_id, query=query ) return search_results except Exception as e: print(f"❌ Error searching vector store: {e}") return None