|
|
|
|
|
|
|
|
import os |
|
|
import streamlit as st |
|
|
from pathlib import Path |
|
|
|
|
|
def initialize_storage_structure(): |
|
|
"""Initialize the storage structure and verify its existence.""" |
|
|
|
|
|
if os.path.exists('/data'): |
|
|
base_path = Path('/data') |
|
|
else: |
|
|
|
|
|
base_path = Path(os.getcwd()) / 'data' |
|
|
|
|
|
try: |
|
|
|
|
|
directories = [ |
|
|
base_path / 'database', |
|
|
base_path / 'files', |
|
|
base_path / 'vectorstore', |
|
|
base_path / 'metadata' |
|
|
] |
|
|
|
|
|
|
|
|
for directory in directories: |
|
|
directory.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
|
|
|
st.write("Storage directories initialized at:", str(base_path)) |
|
|
st.write("Available directories:") |
|
|
for directory in directories: |
|
|
st.write(f"- {directory}: {'Exists' if directory.exists() else 'Failed to create'}") |
|
|
|
|
|
|
|
|
test_file = base_path / 'database' / '.test_write' |
|
|
try: |
|
|
test_file.touch() |
|
|
test_file.unlink() |
|
|
st.success("Storage system initialized successfully!") |
|
|
except Exception as e: |
|
|
st.error(f"Write permission test failed: {e}") |
|
|
|
|
|
return base_path |
|
|
|
|
|
except Exception as e: |
|
|
st.error(f"Error initializing storage structure: {e}") |
|
|
return None |
|
|
|
|
|
def verify_storage_access(base_path: Path): |
|
|
"""Verify storage access and permissions.""" |
|
|
try: |
|
|
|
|
|
checks = { |
|
|
'exists': base_path.exists(), |
|
|
'is_dir': base_path.is_dir(), |
|
|
'readable': os.access(base_path, os.R_OK), |
|
|
'writable': os.access(base_path, os.W_OK), |
|
|
'executable': os.access(base_path, os.X_OK) |
|
|
} |
|
|
|
|
|
|
|
|
st.write("Storage Access Verification:") |
|
|
for check, result in checks.items(): |
|
|
st.write(f"- {check}: {'✅' if result else '❌'}") |
|
|
|
|
|
|
|
|
return all(checks.values()) |
|
|
|
|
|
except Exception as e: |
|
|
st.error(f"Error verifying storage access: {e}") |
|
|
return False |