|
|
|
|
|
|
|
|
import json
import os
import pickle
import shutil
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional

import faiss
import numpy as np
import streamlit as st
|
|
|
|
|
class PersistentStorage:
    """Manage the on-disk layout used by the application.

    Everything lives under a single base directory (``/data`` by default),
    split into four sub-directories: ``database``, ``files``, ``vectorstore``
    and ``metadata``. The directories are created on construction.
    """

    def __init__(self, base_path: Optional[Path] = None):
        """Set up the storage paths and create the directory tree.

        Args:
            base_path: Root directory for all persisted data (a plain string
                is accepted and converted). Defaults to ``/data``.
        """
        self.base_path = Path("/data") if base_path is None else Path(base_path)

        self.db_path = self.base_path / "database"
        self.files_path = self.base_path / "files"
        self.vectorstore_path = self.base_path / "vectorstore"
        self.metadata_path = self.base_path / "metadata"

        self._create_directories()

    def _create_directories(self):
        """Create necessary directory structure."""
        for path in [self.db_path, self.files_path, self.vectorstore_path, self.metadata_path]:
            path.mkdir(parents=True, exist_ok=True)

    def _metadata_file(self, file_id: str) -> Path:
        """Return the path of the JSON metadata record for ``file_id``."""
        return self.metadata_path / f"{file_id}.json"

    def _vectorstore_dir(self, collection_id: Optional[int] = None) -> Path:
        """Return the directory holding the vector store of a collection.

        ``None`` selects the shared ``main`` store; any other id (including
        ``0``) selects ``collection_<id>``.
        """
        if collection_id is not None:
            return self.vectorstore_path / f"collection_{collection_id}"
        return self.vectorstore_path / "main"

    def get_db_path(self) -> str:
        """Get the path to the SQLite database file."""
        return str(self.db_path / "rfp_analysis.db")

    def save_uploaded_file(self, uploaded_file, collection_id: Optional[int] = None) -> Path:
        """Write an uploaded file to disk and record its metadata.

        The file is stored as ``<timestamp>_<name>`` under ``files/`` (or
        ``files/<collection_id>/`` when a collection is given). A JSON
        metadata record named after the stored file's stem is written to
        ``metadata/``.

        Args:
            uploaded_file: Object exposing ``name``, ``size``, ``type`` and
                ``getbuffer()`` (presumably a Streamlit upload - confirm).
            collection_id: Collection the file belongs to, or ``None``.

        Returns:
            Path of the stored file.
        """
        if collection_id is not None:
            save_dir = self.files_path / str(collection_id)
            save_dir.mkdir(parents=True, exist_ok=True)
        else:
            save_dir = self.files_path

        # Use only the final path component of the client-supplied name: a
        # name with directory parts would otherwise point into a
        # non-existent sub-directory and make the write fail.
        safe_name = Path(uploaded_file.name).name or "upload"

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        file_path = save_dir / f"{timestamp}_{safe_name}"

        # The timestamp only has one-second resolution, so several uploads in
        # the same second could overwrite each other's file or metadata
        # record. Add a numeric suffix until both names are free.
        suffix = 0
        while file_path.exists() or self._metadata_file(file_path.stem).exists():
            suffix += 1
            file_path = save_dir / f"{timestamp}_{suffix}_{safe_name}"

        with file_path.open("wb") as f:
            f.write(uploaded_file.getbuffer())

        metadata = {
            "original_name": uploaded_file.name,
            "upload_time": timestamp,
            "collection_id": collection_id,
            "size": uploaded_file.size,
            "type": uploaded_file.type,
        }
        self._save_metadata(file_path.stem, metadata)

        return file_path

    def _save_metadata(self, file_id: str, metadata: dict):
        """Save metadata for a file as ``metadata/<file_id>.json``."""
        with self._metadata_file(file_id).open("w", encoding="utf-8") as f:
            json.dump(metadata, f)

    def save_vectorstore(self, vectorstore, collection_id: Optional[int] = None):
        """Save FAISS vector store to persistent storage.

        Writes ``index.faiss`` (the raw FAISS index) and ``store.pkl`` (the
        document mapping) into the collection's vector store directory,
        replacing whatever was stored there before.

        Args:
            vectorstore: Object exposing ``index``, ``docstore._dict`` and
                ``index_to_docstore_id``.
            collection_id: Collection the store belongs to, or ``None`` for
                the shared ``main`` store.
        """
        save_path = self._vectorstore_dir(collection_id)
        save_path.mkdir(parents=True, exist_ok=True)

        faiss.write_index(vectorstore.index, str(save_path / "index.faiss"))

        store_data = {
            "documents": vectorstore.docstore._dict,
            "index_to_docstore_id": vectorstore.index_to_docstore_id,
        }
        with (save_path / "store.pkl").open("wb") as f:
            pickle.dump(store_data, f)

    def load_vectorstore(self, collection_id: Optional[int] = None):
        """Load FAISS vector store from persistent storage.

        Args:
            collection_id: Collection to load, or ``None`` for the shared
                ``main`` store.

        Returns:
            The rebuilt vector store, or ``None`` when nothing has been saved
            for this collection or loading failed (the failure is reported
            through ``st.error``).
        """
        load_path = self._vectorstore_dir(collection_id)
        if not load_path.exists():
            return None

        try:
            index = faiss.read_index(str(load_path / "index.faiss"))

            # SECURITY: pickle executes arbitrary code on load. This is only
            # acceptable because the file is written by save_vectorstore();
            # never point this at a store from an untrusted source.
            with (load_path / "store.pkl").open("rb") as f:
                store_data = pickle.load(f)

            # NOTE(review): FAISS and get_embeddings_model are not imported
            # in the visible part of this file; unless they are provided
            # elsewhere this raises NameError, which is reported below.
            # NOTE(review): if FAISS is LangChain's vector store, `docstore`
            # is expected to be a docstore object (e.g. an InMemoryDocstore
            # wrapping this dict), not the raw dict - confirm.
            return FAISS(
                embedding_function=get_embeddings_model(),
                index=index,
                docstore=store_data["documents"],
                index_to_docstore_id=store_data["index_to_docstore_id"],
            )
        except Exception as e:
            # UI boundary: surface the problem to the user instead of crashing.
            st.error(f"Error loading vector store: {e}")
            return None

    def get_file_path(self, file_id: str, collection_id: Optional[int] = None) -> Optional[Path]:
        """Get the path to a stored file.

        Args:
            file_id: Name of the stored file (as returned by
                ``save_uploaded_file(...).name``).
            collection_id: Collection the file was stored under, or ``None``.

        Returns:
            The file's path, or ``None`` if no such file exists.
        """
        if collection_id is not None:
            file_path = self.files_path / str(collection_id) / file_id
        else:
            file_path = self.files_path / file_id

        return file_path if file_path.exists() else None

    def cleanup_old_files(self, max_age_days: int = 30):
        """Clean up files older than specified days.

        A file is removed when its modification time is more than
        ``max_age_days`` whole days in the past; its metadata record, if
        present, is removed with it.
        """
        current_time = datetime.now()

        # Materialise the listing first so files are not deleted from a
        # directory tree that is still being walked lazily.
        for file_path in list(self.files_path.rglob("*")):
            if not file_path.is_file():
                continue

            file_age = current_time - datetime.fromtimestamp(file_path.stat().st_mtime)
            if file_age.days <= max_age_days:
                continue

            file_path.unlink()

            metadata_file = self._metadata_file(file_path.stem)
            if metadata_file.exists():
                metadata_file.unlink()
|
|
|
|
|
|
|
|
def create_connection(storage):
    """Create database connection using persistent storage.

    Args:
        storage: Object whose ``get_db_path()`` returns the SQLite database
            file path.

    Returns:
        An open ``sqlite3.Connection`` created with
        ``check_same_thread=False``, or ``None`` if connecting failed (the
        failure is reported through ``st.error``).
    """
    try:
        # check_same_thread=False lets the connection be used from threads
        # other than the one that created it.
        return sqlite3.connect(storage.get_db_path(), check_same_thread=False)
    except sqlite3.Error as e:
        st.error(f"Failed to connect to database: {e}")
        return None
|
|
|
|
|
|
|
|
def handle_document_upload(uploaded_files, **kwargs):
    """Store uploaded documents, register them in the database and index them.

    For every uploaded file this saves the file through ``PersistentStorage``,
    runs ``process_document`` on the stored path, inserts the document via
    ``insert_document`` and, when a collection is given, links it with
    ``add_document_to_collection``. One vector store is then built from the
    chunks of all files and saved for the collection.

    Args:
        uploaded_files: Iterable of uploaded file objects.
        **kwargs: ``collection_id`` (optional) selects the target collection.

    Returns:
        ``True`` on success, ``False`` if any step raised (the error is
        reported through ``st.error``).
    """
    try:
        storage = PersistentStorage()
        collection_id = kwargs.get('collection_id')

        # Collect the chunks of every file. Saving a separate vector store per
        # file writes to the same location each time, so only the last file's
        # chunks would remain on disk.
        # Assumes process_document returns an iterable of chunks - TODO confirm.
        all_chunks = []

        for uploaded_file in uploaded_files:
            file_path = storage.save_uploaded_file(uploaded_file, collection_id)

            chunks, content = process_document(str(file_path))

            doc_id = insert_document(st.session_state.db_conn, uploaded_file.name, content)

            # `is not None` so that a collection id of 0 is still honoured.
            if collection_id is not None:
                add_document_to_collection(st.session_state.db_conn, doc_id, collection_id)

            all_chunks.extend(chunks)

        # Nothing to index when no file produced any chunks.
        if all_chunks:
            vector_store = process_chunks_to_vectorstore(all_chunks)
            storage.save_vectorstore(vector_store, collection_id)

        return True
    except Exception as e:
        # UI boundary: report the failure instead of propagating it.
        st.error(f"Error processing documents: {e}")
        return False