Spaces:
Sleeping
Sleeping
File size: 4,892 Bytes
78e8dd4 0cfa3a6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
"""
Vector Store Management Module
Handles creation, file upload, and management of OpenAI vector stores
"""
from typing import Dict, List, Optional
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import concurrent.futures
import os
from openai import OpenAI
class VectorStoreManager:
    """Manages OpenAI vector store operations.

    Thin wrapper around an OpenAI client: every method reports failures by
    printing a message and returning a sentinel (``None`` or a "failed"
    status dict) rather than raising.
    """

    def __init__(self, client: "OpenAI"):
        """Store the OpenAI client used for all subsequent API calls."""
        self.client = client

    def create_vector_store(self, store_name: str) -> Optional[Dict]:
        """
        Create a Vector Store on OpenAI's servers

        Args:
            store_name: Name for the vector store

        Returns:
            Dictionary with vector store details (``id``, ``name``,
            ``created_at``, ``file_count``) or None if failed
        """
        try:
            vector_store = self.client.vector_stores.create(name=store_name)
            details = {
                "id": vector_store.id,
                "name": vector_store.name,
                "created_at": vector_store.created_at,
                "file_count": vector_store.file_counts.completed,
            }
            print(f"✅ Vector store created: {details}")
            return details
        except Exception as e:
            # Best-effort API: report the failure and signal it with None.
            print(f"❌ Error creating vector store: {e}")
            return None

    def upload_single_pdf(self, file_path: str, vector_store_id: str) -> Dict:
        """
        Upload a single PDF file to the vector store

        Args:
            file_path: Path to the PDF file
            vector_store_id: ID of the vector store

        Returns:
            Dictionary with upload status: ``{"file", "status": "success"}``
            on success, or ``{"file", "status": "failed", "error"}`` when any
            step raised. This method never raises for ordinary errors, which
            lets it run safely inside a thread pool.
        """
        file_name = os.path.basename(file_path)
        try:
            # Create file
            with open(file_path, 'rb') as f:
                file_response = self.client.files.create(
                    file=f,
                    purpose="assistants"
                )
            # Attach to vector store (the response object is not needed)
            self.client.vector_stores.files.create(
                vector_store_id=vector_store_id,
                file_id=file_response.id
            )
            return {"file": file_name, "status": "success"}
        except Exception as e:
            print(f"❌ Error uploading {file_name}: {str(e)}")
            return {"file": file_name, "status": "failed", "error": str(e)}

    def upload_pdf_files(self, pdf_files: List[str], vector_store_id: str,
                         max_workers: int = 10) -> Dict:
        """
        Upload multiple PDF files to vector store in parallel

        Args:
            pdf_files: List of PDF file paths
            vector_store_id: ID of the vector store
            max_workers: Maximum number of parallel workers

        Returns:
            Dictionary with upload statistics: ``total_files``,
            ``successful_uploads``, ``failed_uploads`` and ``errors`` (the
            status dicts of every failed upload)
        """
        stats = {
            "total_files": len(pdf_files),
            "successful_uploads": 0,
            "failed_uploads": 0,
            "errors": []
        }
        if not pdf_files:
            print("⚠️ No PDF files to upload")
            return stats

        print(f"🚀 Uploading {len(pdf_files)} PDF files in parallel...")
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self.upload_single_pdf, file_path, vector_store_id): file_path
                for file_path in pdf_files
            }
            # Tally results as each upload finishes so the progress bar
            # advances in completion order rather than submission order.
            for future in tqdm(concurrent.futures.as_completed(futures),
                               total=len(pdf_files), desc="Uploading"):
                result = future.result()
                if result["status"] == "success":
                    stats["successful_uploads"] += 1
                else:
                    stats["failed_uploads"] += 1
                    stats["errors"].append(result)

        print(f"✅ Upload complete: {stats['successful_uploads']}/{stats['total_files']} successful")
        return stats

    def search_vector_store(self, query: str, vector_store_id: str,
                            max_results: int = 10):
        """
        Search the vector store directly

        Args:
            query: Search query
            vector_store_id: ID of the vector store
            max_results: Maximum number of results

        Returns:
            Search results as returned by the client, or None if the
            search failed
        """
        try:
            search_results = self.client.vector_stores.search(
                vector_store_id=vector_store_id,
                query=query,
                # Forward the caller's limit; it was previously accepted
                # but never sent to the API.
                max_num_results=max_results
            )
            return search_results
        except Exception as e:
            print(f"❌ Error searching vector store: {e}")
            return None
|