# Digi-Biz — backend/utils/storage_manager.py
# (Header captured from the hosting UI: "Deployment Bot — Automated deployment
#  to Hugging Face", commit 255cbd1. Converted to a comment so the module parses.)
"""
Storage management for extracted files
"""
import json
import logging
import os
import shutil
from datetime import datetime
from pathlib import Path
from typing import Optional

from backend.models.schemas import FileDiscoveryOutput
class StorageManager:
    """
    Manages storage directories and file organization.

    Lays out a storage tree rooted at ``storage_base``::

        storage/
            uploads/    raw uploaded files
            extracted/  per-job extraction output (one subdirectory per job_id)
            profiles/   generated business profiles (<job_id>.json)
            index/      per-job page indexes
            temp/       scratch space (per-job subdirectories on demand)

    All top-level directories are created eagerly on construction.
    """

    # Category subdirectories pre-created inside every job directory.
    CATEGORY_SUBDIRS = ("documents", "spreadsheets", "images", "videos", "unknown")

    # Map singular category names (as documented for organize_extracted_file)
    # to the plural subdirectory names that create_job_directory pre-creates,
    # so both spellings land in the same directory.
    _CATEGORY_ALIASES = {
        "document": "documents",
        "spreadsheet": "spreadsheets",
        "image": "images",
        "video": "videos",
    }

    _logger = logging.getLogger(__name__)

    def __init__(
        self,
        storage_base: str = "./storage",
        uploads_dir: str = "uploads",
        extracted_dir: str = "extracted",
        profiles_dir: str = "profiles",
        index_dir: str = "index",
        temp_dir: str = "temp"
    ):
        """
        Args:
            storage_base: Root directory for all managed storage.
            uploads_dir: Name of the uploads subdirectory under storage_base.
            extracted_dir: Name of the extraction-output subdirectory.
            profiles_dir: Name of the business-profile subdirectory.
            index_dir: Name of the page-index subdirectory.
            temp_dir: Name of the scratch subdirectory.
        """
        self.storage_base = Path(storage_base)
        self.uploads_dir = self.storage_base / uploads_dir
        self.extracted_dir = self.storage_base / extracted_dir
        self.profiles_dir = self.storage_base / profiles_dir
        self.index_dir = self.storage_base / index_dir
        self.temp_dir = self.storage_base / temp_dir
        # Ensure the full tree exists up front so later calls can assume it.
        self._ensure_directories()

    def _ensure_directories(self) -> None:
        """Create all top-level storage directories if they don't exist."""
        for directory in (
            self.storage_base,
            self.uploads_dir,
            self.extracted_dir,
            self.profiles_dir,
            self.index_dir,
            self.temp_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)

    def create_job_directory(self, job_id: str) -> Path:
        """
        Create a fresh extraction directory for a job.

        Any pre-existing directory for the same job is removed first, so the
        result is always an empty, fully initialized tree with one
        subdirectory per file category.

        Args:
            job_id: Unique job identifier

        Returns:
            Path to created directory
        """
        job_dir = self.extracted_dir / job_id
        if job_dir.exists():
            # Re-runs of the same job start from a clean slate.
            shutil.rmtree(job_dir)
        job_dir.mkdir(parents=True, exist_ok=True)
        # Pre-create category subdirectories for organization.
        for subdir in self.CATEGORY_SUBDIRS:
            (job_dir / subdir).mkdir(exist_ok=True)
        return job_dir

    def get_job_directory(self, job_id: str) -> Path:
        """
        Get job directory path (does not create it).

        Args:
            job_id: Unique job identifier

        Returns:
            Path to job directory
        """
        return self.extracted_dir / job_id

    def get_upload_directory(self) -> Path:
        """
        Get uploads directory.

        Returns:
            Path to uploads directory
        """
        return self.uploads_dir

    def get_profile_path(self, job_id: str) -> Path:
        """
        Get path for storing a business profile.

        Args:
            job_id: Unique job identifier

        Returns:
            Path to the ``<job_id>.json`` profile file
        """
        # Defensive: recreate profiles dir in case it was removed externally.
        self.profiles_dir.mkdir(parents=True, exist_ok=True)
        return self.profiles_dir / f"{job_id}.json"

    def get_index_path(self, job_id: str) -> Path:
        """
        Get (and create) the page-index directory for a job.

        Args:
            job_id: Unique job identifier

        Returns:
            Path to index directory
        """
        index_path = self.index_dir / job_id
        index_path.mkdir(parents=True, exist_ok=True)
        return index_path

    def get_temp_directory(self, job_id: Optional[str] = None) -> Path:
        """
        Get a temporary directory.

        Args:
            job_id: Optional job ID for a job-specific temp dir (created on
                demand). When omitted, the shared temp root is returned.

        Returns:
            Path to temp directory
        """
        if job_id:
            temp_path = self.temp_dir / job_id
            temp_path.mkdir(parents=True, exist_ok=True)
            return temp_path
        return self.temp_dir

    def cleanup_job_directory(self, job_id: str, keep_profiles: bool = True) -> bool:
        """
        Clean up a job's extraction directory.

        Args:
            job_id: Unique job identifier
            keep_profiles: If True, copy any ``profiles/`` subdirectory into
                the persistent profiles store before deletion.

        Returns:
            True if cleanup successful (or nothing to clean), False on error.
        """
        job_dir = self.get_job_directory(job_id)
        if not job_dir.exists():
            # Nothing to do — treat as success.
            return True
        try:
            if keep_profiles:
                # Move profiles to a safe location before cleanup.
                profiles_subdir = job_dir / "profiles"
                if profiles_subdir.exists():
                    shutil.copytree(
                        profiles_subdir,
                        self.profiles_dir / job_id,
                        dirs_exist_ok=True,
                    )
            shutil.rmtree(job_dir)
            return True
        except OSError:
            # shutil.Error subclasses OSError; log with traceback.
            self._logger.exception("Error cleaning up job directory %s", job_id)
            return False

    def cleanup_temp_directory(self, job_id: str) -> bool:
        """
        Clean up a job's temporary directory.

        Args:
            job_id: Unique job identifier

        Returns:
            True if cleanup successful (or nothing to clean), False on error.
        """
        temp_dir = self.get_temp_directory(job_id)
        if not temp_dir.exists():
            return True
        try:
            shutil.rmtree(temp_dir)
            return True
        except OSError:
            self._logger.exception("Error cleaning up temp directory %s", job_id)
            return False

    def get_storage_stats(self) -> dict:
        """
        Get storage usage statistics.

        Returns:
            Dictionary with the storage root and the recursive size in bytes
            of each managed subdirectory.
        """
        def get_dir_size(path: Path) -> int:
            # Recursive sum of regular-file sizes; 0 for a missing directory.
            if not path.exists():
                return 0
            return sum(
                entry.stat().st_size
                for entry in path.rglob('*')
                if entry.is_file()
            )

        return {
            "storage_base": str(self.storage_base),
            "uploads_size_bytes": get_dir_size(self.uploads_dir),
            "extracted_size_bytes": get_dir_size(self.extracted_dir),
            "profiles_size_bytes": get_dir_size(self.profiles_dir),
            "index_size_bytes": get_dir_size(self.index_dir),
            "temp_size_bytes": get_dir_size(self.temp_dir),
        }

    def organize_extracted_file(
        self,
        file_path: Path,
        file_category: str,
        job_id: str
    ) -> Path:
        """
        Move an extracted file into its category subdirectory for the job.

        Args:
            file_path: Path to extracted file
            file_category: Category name. Both singular ("document") and
                plural ("documents") spellings are accepted; singular names
                are mapped to the plural subdirectories that
                create_job_directory pre-creates. Unknown categories get a
                directory of their own.
            job_id: Unique job identifier

        Returns:
            New path of organized file
        """
        job_dir = self.get_job_directory(job_id)
        # Normalize singular category names onto the pre-created plural dirs.
        subdir_name = self._CATEGORY_ALIASES.get(file_category, file_category)
        category_dir = job_dir / subdir_name
        category_dir.mkdir(parents=True, exist_ok=True)
        # Move file into the category directory, avoiding name clobbering.
        new_path = category_dir / file_path.name
        if new_path.exists():
            # Microsecond resolution (%f) to avoid collisions within a second.
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
            new_path = category_dir / f"{file_path.stem}_{timestamp}{file_path.suffix}"
        shutil.move(str(file_path), str(new_path))
        return new_path

    def save_discovery_output(
        self,
        output: "FileDiscoveryOutput",
        job_id: str
    ) -> Path:
        """
        Save file discovery output to the job directory as JSON.

        Args:
            output: File discovery output (Pydantic model)
            job_id: Unique job identifier

        Returns:
            Path to saved metadata file
        """
        job_dir = self.get_job_directory(job_id)
        metadata_path = job_dir / "discovery_metadata.json"
        # Pydantic v2: serialize the model to a JSON-compatible dict.
        output_dict = output.model_dump(mode='json')
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(output_dict, f, indent=2, ensure_ascii=False)
        return metadata_path