# CSRC-Car-Manual-RAG / src / vector_store.py
# Bryceeee's picture
# Upload 17 files
# 0cfa3a6 verified
"""
Vector Store Management Module
Handles creation, file upload, and management of OpenAI vector stores
"""
from typing import Dict, List, Optional
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import concurrent.futures
import os
from openai import OpenAI
class VectorStoreManager:
    """Manage OpenAI vector store operations.

    Wraps an OpenAI client to create vector stores, upload PDF files into
    them (individually or in parallel) and search them directly.
    """

    def __init__(self, client: OpenAI):
        """
        Store the client used for every API call.

        Args:
            client: OpenAI client instance
        """
        self.client = client

    def create_vector_store(self, store_name: str) -> Optional[Dict]:
        """
        Create a Vector Store on OpenAI's servers

        Any exception raised while creating the store is caught, reported
        with a printed message, and turned into a ``None`` return value.

        Args:
            store_name: Name for the vector store

        Returns:
            Dictionary with the keys "id", "name", "created_at" and
            "file_count" (the store's completed-file count), or None if
            creation failed
        """
        try:
            vector_store = self.client.vector_stores.create(name=store_name)
            details = {
                "id": vector_store.id,
                "name": vector_store.name,
                "created_at": vector_store.created_at,
                "file_count": vector_store.file_counts.completed
            }
            print(f"✅ Vector store created: {details}")
            return details
        except Exception as e:
            print(f"❌ Error creating vector store: {e}")
            return None

    def upload_single_pdf(self, file_path: str, vector_store_id: str) -> Dict:
        """
        Upload a single PDF file to the vector store

        The file is first uploaded with purpose "assistants" and then
        attached to the vector store. Any exception from either step (or
        from opening the file) is caught and reported in the result.

        Args:
            file_path: Path to the PDF file
            vector_store_id: ID of the vector store

        Returns:
            {"file": <base name>, "status": "success"} on success, or
            {"file": <base name>, "status": "failed", "error": <message>}
            on failure
        """
        file_name = os.path.basename(file_path)
        try:
            # Create file
            with open(file_path, 'rb') as f:
                file_response = self.client.files.create(
                    file=f,
                    purpose="assistants"
                )
            # Attach to vector store; the response object is not needed,
            # only the absence of an exception matters here.
            self.client.vector_stores.files.create(
                vector_store_id=vector_store_id,
                file_id=file_response.id
            )
            return {"file": file_name, "status": "success"}
        except Exception as e:
            print(f"❌ Error uploading {file_name}: {str(e)}")
            return {"file": file_name, "status": "failed", "error": str(e)}

    def upload_pdf_files(self, pdf_files: List[str], vector_store_id: str,
                         max_workers: int = 10) -> Dict:
        """
        Upload multiple PDF files to vector store in parallel

        Each file is handed to ``upload_single_pdf`` on a thread pool and
        the per-file results are tallied as they complete, with a tqdm
        progress bar.

        Args:
            pdf_files: List of PDF file paths
            vector_store_id: ID of the vector store
            max_workers: Maximum number of parallel workers

        Returns:
            Dictionary with upload statistics: "total_files",
            "successful_uploads", "failed_uploads" and "errors" (the
            result dictionaries of the failed uploads)
        """
        stats = {
            "total_files": len(pdf_files),
            "successful_uploads": 0,
            "failed_uploads": 0,
            "errors": []
        }
        if not pdf_files:
            print("⚠️ No PDF files to upload")
            return stats

        print(f"📤 Uploading {len(pdf_files)} PDF files in parallel...")
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self.upload_single_pdf, file_path, vector_store_id)
                for file_path in pdf_files
            ]
            # as_completed yields in finish order, so the progress bar
            # advances as soon as any upload is done.
            for future in tqdm(concurrent.futures.as_completed(futures),
                               total=len(pdf_files), desc="Uploading"):
                result = future.result()
                if result["status"] == "success":
                    stats["successful_uploads"] += 1
                else:
                    stats["failed_uploads"] += 1
                    stats["errors"].append(result)

        print(f"✅ Upload complete: {stats['successful_uploads']}/{stats['total_files']} successful")
        return stats

    def search_vector_store(self, query: str, vector_store_id: str,
                            max_results: int = 10):
        """
        Search the vector store directly

        Any exception raised by the search call is caught, reported with a
        printed message, and turned into a ``None`` return value.

        Args:
            query: Search query
            vector_store_id: ID of the vector store
            max_results: Maximum number of results; forwarded to the API as
                ``max_num_results`` (NOTE(review): the API reference
                documents an allowed range of 1-50 - confirm)

        Returns:
            Search results as returned by the client, or None on error
        """
        try:
            search_results = self.client.vector_stores.search(
                vector_store_id=vector_store_id,
                query=query,
                # Previously max_results was accepted but never sent, so
                # callers could not actually limit the result count.
                max_num_results=max_results
            )
            return search_results
        except Exception as e:
            print(f"❌ Error searching vector store: {e}")
            return None