Spaces:
Sleeping
Sleeping
| """ | |
| Vector Store Management Module | |
| Handles creation, file upload, and management of OpenAI vector stores | |
| """ | |
| from typing import Dict, List, Optional | |
| from concurrent.futures import ThreadPoolExecutor | |
| from tqdm import tqdm | |
| import concurrent.futures | |
| import os | |
| from openai import OpenAI | |
class VectorStoreManager:
    """Manage OpenAI vector stores: creation, PDF upload, and search.

    Methods report API failures by printing a message and returning a
    failure value (``None`` or a ``"failed"`` status dict) rather than
    raising, so callers must inspect the return value.
    """

    def __init__(self, client: "OpenAI"):
        """Keep a reference to the client used for every API call.

        Args:
            client: OpenAI client instance.
        """
        self.client = client

    def create_vector_store(self, store_name: str) -> Optional[Dict]:
        """Create a vector store on OpenAI's servers.

        Args:
            store_name: Name for the vector store.

        Returns:
            Dictionary with the keys ``id``, ``name``, ``created_at`` and
            ``file_count`` (the store's completed-file count), or ``None``
            if the request failed.
        """
        try:
            vector_store = self.client.vector_stores.create(name=store_name)
            details = {
                "id": vector_store.id,
                "name": vector_store.name,
                "created_at": vector_store.created_at,
                "file_count": vector_store.file_counts.completed,
            }
        except Exception as e:
            print(f"❌ Error creating vector store: {e}")
            return None

        print(f"✅ Vector store created: {details}")
        return details

    def upload_single_pdf(self, file_path: str, vector_store_id: str) -> Dict:
        """Upload a single PDF file and attach it to the vector store.

        Args:
            file_path: Path to the PDF file.
            vector_store_id: ID of the vector store.

        Returns:
            ``{"file": <basename>, "status": "success"}`` on success, or
            ``{"file": <basename>, "status": "failed", "error": <message>}``
            if opening the file or either API call raised.
        """
        file_name = os.path.basename(file_path)
        try:
            # Step 1: create the file object. The local handle is only needed
            # for this call, so it is closed before the attach request.
            with open(file_path, "rb") as f:
                file_response = self.client.files.create(
                    file=f,
                    purpose="assistants",
                )

            # Step 2: attach the uploaded file to the vector store.
            self.client.vector_stores.files.create(
                vector_store_id=vector_store_id,
                file_id=file_response.id,
            )
        except Exception as e:
            print(f"❌ Error uploading {file_name}: {str(e)}")
            return {"file": file_name, "status": "failed", "error": str(e)}

        return {"file": file_name, "status": "success"}

    def upload_pdf_files(self, pdf_files: List[str], vector_store_id: str,
                         max_workers: int = 10) -> Dict:
        """Upload multiple PDF files to the vector store in parallel.

        Args:
            pdf_files: List of PDF file paths.
            vector_store_id: ID of the vector store.
            max_workers: Maximum number of parallel workers. Values below 1
                are treated as 1, because ``ThreadPoolExecutor`` rejects
                non-positive worker counts.

        Returns:
            Dictionary with the keys ``total_files``, ``successful_uploads``,
            ``failed_uploads`` and ``errors`` (the result dicts of the
            uploads that failed).
        """
        stats = {
            "total_files": len(pdf_files),
            "successful_uploads": 0,
            "failed_uploads": 0,
            "errors": [],
        }

        if not pdf_files:
            print("⚠️ No PDF files to upload")
            return stats

        print(f"📤 Uploading {len(pdf_files)} PDF files in parallel...")

        with ThreadPoolExecutor(max_workers=max(1, max_workers)) as executor:
            futures = [
                executor.submit(self.upload_single_pdf, file_path, vector_store_id)
                for file_path in pdf_files
            ]

            # Tally results in completion order so the progress bar advances
            # as soon as any upload finishes.
            for future in tqdm(concurrent.futures.as_completed(futures),
                               total=len(futures), desc="Uploading"):
                result = future.result()
                if result["status"] == "success":
                    stats["successful_uploads"] += 1
                else:
                    stats["failed_uploads"] += 1
                    stats["errors"].append(result)

        print(f"✅ Upload complete: {stats['successful_uploads']}/{stats['total_files']} successful")
        return stats

    def search_vector_store(self, query: str, vector_store_id: str,
                            max_results: int = 10):
        """Search the vector store directly.

        Args:
            query: Search query.
            vector_store_id: ID of the vector store.
            max_results: Maximum number of results, forwarded to the API as
                ``max_num_results``.

        Returns:
            The search response returned by the client, or ``None`` if the
            request failed.
        """
        try:
            return self.client.vector_stores.search(
                vector_store_id=vector_store_id,
                query=query,
                max_num_results=max_results,
            )
        except Exception as e:
            print(f"❌ Error searching vector store: {e}")
            return None