Spaces:
Build error
Build error
| # utils/storage.py | |
| import os | |
| import shutil | |
| import json | |
| from pathlib import Path | |
| from typing import Optional | |
| import faiss | |
| import pickle | |
| import streamlit as st | |
| from datetime import datetime | |
| import numpy as np | |
class PersistentStorage:
    """Manage the on-disk layout used by the application.

    Everything lives under one base directory, split into four
    subdirectories: ``database``, ``files``, ``vectorstore`` and ``metadata``.
    """

    def __init__(self, base_path: Optional[Path] = None):
        """Set up the directory layout and create any missing directories.

        Args:
            base_path: Root directory for all stored data (a ``Path`` or a
                string). Defaults to ``/data`` when omitted.
        """
        self.base_path = Path("/data") if base_path is None else Path(base_path)

        self.db_path = self.base_path / "database"
        self.files_path = self.base_path / "files"
        self.vectorstore_path = self.base_path / "vectorstore"
        self.metadata_path = self.base_path / "metadata"

        self._create_directories()

    def _create_directories(self):
        """Create the four storage subdirectories if they do not exist."""
        for path in [self.db_path, self.files_path, self.vectorstore_path, self.metadata_path]:
            path.mkdir(parents=True, exist_ok=True)

    def get_db_path(self) -> str:
        """Return the path of the SQLite database file as a string."""
        return str(self.db_path / "rfp_analysis.db")

    def save_uploaded_file(self, uploaded_file, collection_id: Optional[int] = None) -> Path:
        """Write an uploaded file to storage and record its metadata.

        Args:
            uploaded_file: Object exposing ``name``, ``size``, ``type`` and
                ``getbuffer()`` (presumably a Streamlit upload - confirm
                against the caller).
            collection_id: When given, the file is stored in a subdirectory
                named after the collection; otherwise directly under the
                files directory.

        Returns:
            The path the file was written to.
        """
        # Compare against None so that a collection id of 0 still gets its
        # own subdirectory.
        if collection_id is not None:
            save_dir = self.files_path / str(collection_id)
        else:
            save_dir = self.files_path
        save_dir.mkdir(parents=True, exist_ok=True)

        # The name comes from the uploader. Keep only its final component so
        # a value such as "../../x" cannot place the file outside save_dir.
        safe_name = Path(str(uploaded_file.name).replace("\\", "/")).name

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        file_path = save_dir / f"{timestamp}_{safe_name}"

        with file_path.open("wb") as f:
            f.write(uploaded_file.getbuffer())

        metadata = {
            "original_name": uploaded_file.name,
            "upload_time": timestamp,
            "collection_id": collection_id,
            "size": uploaded_file.size,
            "type": uploaded_file.type,
        }
        # Metadata is keyed by the stored file's stem; cleanup_old_files
        # relies on the same key to find it again.
        self._save_metadata(file_path.stem, metadata)
        return file_path

    def _save_metadata(self, file_id: str, metadata: dict):
        """Write ``metadata`` as JSON to ``<metadata dir>/<file_id>.json``."""
        metadata_file = self.metadata_path / f"{file_id}.json"
        with metadata_file.open("w") as f:
            json.dump(metadata, f)

    def _vectorstore_dir(self, collection_id: Optional[int]) -> Path:
        """Return the directory holding the vector store for a collection."""
        if collection_id is not None:
            return self.vectorstore_path / f"collection_{collection_id}"
        return self.vectorstore_path / "main"

    def save_vectorstore(self, vectorstore, collection_id: Optional[int] = None):
        """Persist a FAISS vector store (index plus document mapping).

        Args:
            vectorstore: Object exposing ``index``, ``docstore`` (with a
                ``_dict`` attribute) and ``index_to_docstore_id``.
            collection_id: Selects the ``collection_<id>`` directory; the
                ``main`` directory is used when omitted.
        """
        save_path = self._vectorstore_dir(collection_id)
        save_path.mkdir(parents=True, exist_ok=True)

        faiss.write_index(vectorstore.index, str(save_path / "index.faiss"))

        # Build the payload before opening the file so a failure here does
        # not leave an empty store.pkl behind.
        store_data = {
            "documents": vectorstore.docstore._dict,
            # The docstore object itself is stored as well so that loading
            # can hand the wrapper a real docstore instead of a bare dict.
            "docstore": vectorstore.docstore,
            "index_to_docstore_id": vectorstore.index_to_docstore_id,
        }
        with (save_path / "store.pkl").open("wb") as f:
            pickle.dump(store_data, f)

    def load_vectorstore(self, collection_id: Optional[int] = None):
        """Load a previously saved FAISS vector store.

        Args:
            collection_id: Selects the ``collection_<id>`` directory; the
                ``main`` directory is used when omitted.

        Returns:
            The reconstructed vector store, or ``None`` when nothing has been
            saved for this collection or loading fails (the failure is
            reported through ``st.error``).
        """
        load_path = self._vectorstore_dir(collection_id)
        if not load_path.exists():
            return None

        try:
            index = faiss.read_index(str(load_path / "index.faiss"))

            # SECURITY: unpickling can execute arbitrary code. Only load
            # store.pkl files that this application wrote itself.
            with (load_path / "store.pkl").open("rb") as f:
                store_data = pickle.load(f)

            # Prefer the pickled docstore object; files written before it
            # was stored only carry the raw "documents" dict.
            if "docstore" in store_data:
                docstore = store_data["docstore"]
            else:
                docstore = store_data["documents"]

            # NOTE(review): FAISS and get_embeddings_model are not among the
            # imports visible in this module. Until they are imported, this
            # call raises NameError and the except branch returns None.
            vectorstore = FAISS(
                embedding_function=get_embeddings_model(),
                index=index,
                docstore=docstore,
                index_to_docstore_id=store_data["index_to_docstore_id"],
            )
            return vectorstore
        except Exception as e:
            st.error(f"Error loading vector store: {e}")
            return None

    def get_file_path(self, file_id: str, collection_id: Optional[int] = None) -> Optional[Path]:
        """Return the path of a stored file, or ``None`` if it does not exist.

        Args:
            file_id: File name as stored on disk.
            collection_id: Collection subdirectory to look in; the files
                directory itself is used when omitted.
        """
        if collection_id is not None:
            file_path = self.files_path / str(collection_id) / file_id
        else:
            file_path = self.files_path / file_id
        return file_path if file_path.exists() else None

    def cleanup_old_files(self, max_age_days: int = 30):
        """Delete stored files older than ``max_age_days`` and their metadata.

        Age is measured from each file's modification time. Only whole days
        are compared, so a file is removed once it is more than
        ``max_age_days`` full days old.
        """
        current_time = datetime.now()
        # Take a snapshot first so files are not removed while the directory
        # tree is still being walked.
        for file_path in list(self.files_path.rglob("*")):
            if not file_path.is_file():
                continue
            file_age = current_time - datetime.fromtimestamp(file_path.stat().st_mtime)
            if file_age.days > max_age_days:
                file_path.unlink()
                # Remove the metadata written for this file, if any.
                metadata_file = self.metadata_path / f"{file_path.stem}.json"
                if metadata_file.exists():
                    metadata_file.unlink()
| # Update database.py to use persistent storage | |
def create_connection(storage):
    """Open a SQLite connection to the database kept in persistent storage.

    Args:
        storage: Object exposing ``get_db_path()``, which returns the
            database file path.

    Returns:
        The open connection, or ``None`` when connecting fails (the failure
        is reported through ``st.error``).
    """
    # Local import: sqlite3 is not among this module's top-level imports.
    import sqlite3

    try:
        # check_same_thread=False lets the connection be used from a thread
        # other than the one that created it.
        return sqlite3.connect(storage.get_db_path(), check_same_thread=False)
    except sqlite3.Error as e:
        st.error(f"Failed to connect to database: {e}")
        return None
| # Update document handling to use persistent storage | |
def handle_document_upload(uploaded_files, **kwargs):
    """Store, process and index a batch of uploaded documents.

    For each upload the file is written to persistent storage, processed
    into chunks and content, inserted into the database, optionally linked
    to a collection, and its chunks are turned into a vector store that is
    then saved.

    Args:
        uploaded_files: Iterable of uploaded file objects.
        **kwargs: ``collection_id`` (optional) selects the collection the
            documents belong to.

    Returns:
        ``True`` when every file was handled, ``False`` as soon as one fails
        (the failure is reported through ``st.error``). Files handled before
        the failure are not rolled back.
    """
    try:
        storage = PersistentStorage()
        collection_id = kwargs.get('collection_id')
        db_conn = st.session_state.db_conn

        for uploaded_file in uploaded_files:
            # Save file to persistent storage
            file_path = storage.save_uploaded_file(uploaded_file, collection_id)

            # Process document
            chunks, content = process_document(str(file_path))

            # Store in database
            doc_id = insert_document(db_conn, uploaded_file.name, content)

            # Compare against None so a collection id of 0 is still linked.
            if collection_id is not None:
                add_document_to_collection(db_conn, doc_id, collection_id)

            # NOTE(review): the store is rebuilt from this file's chunks only
            # and saved to the same location on every iteration - confirm
            # that process_chunks_to_vectorstore merges with earlier files.
            vector_store = process_chunks_to_vectorstore(chunks)
            storage.save_vectorstore(vector_store, collection_id)

        return True
    except Exception as e:
        st.error(f"Error processing documents: {e}")
        return False