| """ |
| Storage management for extracted files |
| """ |
| import os |
| import shutil |
| from pathlib import Path |
| from typing import Optional |
| from datetime import datetime |
|
|
| from backend.models.schemas import FileDiscoveryOutput |
|
|
|
|
class StorageManager:
    """
    Manages storage directories and file organization.

    All managed locations live under a single base directory:
    uploads, per-job extraction directories, profiles, page indexes
    and temporary files. Every method that derives a path from a
    ``job_id`` refuses identifiers that are empty or that would resolve
    to the managed directory itself or to a location outside of it, so
    that a bad identifier can never cause unrelated data to be created
    or deleted.
    """

    # Sub-directories pre-created inside every job extraction directory.
    _CATEGORY_SUBDIRS = ("documents", "spreadsheets", "images", "videos", "unknown")

    def __init__(
        self,
        storage_base: str = "./storage",
        uploads_dir: str = "uploads",
        extracted_dir: str = "extracted",
        profiles_dir: str = "profiles",
        index_dir: str = "index",
        temp_dir: str = "temp"
    ):
        """
        Resolve the storage layout and create any missing directories.

        Args:
            storage_base: Root directory for all managed storage
            uploads_dir: Name of the uploads directory under the root
            extracted_dir: Name of the extraction directory under the root
            profiles_dir: Name of the profiles directory under the root
            index_dir: Name of the index directory under the root
            temp_dir: Name of the temporary directory under the root
        """
        self.storage_base = Path(storage_base)
        self.uploads_dir = self.storage_base / uploads_dir
        self.extracted_dir = self.storage_base / extracted_dir
        self.profiles_dir = self.storage_base / profiles_dir
        self.index_dir = self.storage_base / index_dir
        self.temp_dir = self.storage_base / temp_dir

        self._ensure_directories()

    @staticmethod
    def _job_path(base: Path, job_id: str, suffix: str = "") -> Path:
        """
        Build ``base / (job_id + suffix)`` after checking it stays inside base.

        The check is purely lexical (no symlink resolution): the
        normalized absolute target must be a strict descendant of the
        normalized absolute base. This rejects empty identifiers, ".",
        "..", "../x" and absolute paths, all of which would otherwise
        point at the base directory itself or escape it.

        Args:
            base: Managed directory the result must live in
            job_id: Unique job identifier
            suffix: Optional text appended to the identifier (e.g. ".json")

        Returns:
            Path inside base for the given job

        Raises:
            ValueError: If job_id is empty or the path is not inside base
        """
        if not job_id:
            raise ValueError("job_id must be a non-empty string")

        candidate = base / f"{job_id}{suffix}"
        root = os.path.normcase(os.path.abspath(base))
        target = os.path.normcase(os.path.abspath(candidate))

        try:
            inside = os.path.commonpath([root, target]) == root
        except ValueError:
            # commonpath refuses paths it cannot compare (e.g. different
            # drives on Windows); such a target is certainly not inside base.
            inside = False

        if not inside or target == root:
            raise ValueError(f"Invalid job_id: {job_id!r}")
        return candidate

    @staticmethod
    def _directory_size(path: Path) -> int:
        """
        Return the total size in bytes of all regular files below path.

        A missing directory counts as 0. Entries that cannot be stat'ed
        (for example because they were removed while scanning) are skipped.
        """
        if not path.exists():
            return 0

        total = 0
        for entry in path.rglob('*'):
            try:
                if entry.is_file():
                    total += entry.stat().st_size
            except OSError:
                continue
        return total

    def _ensure_directories(self):
        """
        Create storage directories if they don't exist
        """
        for directory in (
            self.storage_base,
            self.uploads_dir,
            self.extracted_dir,
            self.profiles_dir,
            self.index_dir,
            self.temp_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)

    def create_job_directory(self, job_id: str) -> Path:
        """
        Create a fresh extraction directory for a job.

        Any existing directory for the same job is deleted first, then the
        directory is recreated together with its category sub-directories.

        Args:
            job_id: Unique job identifier

        Returns:
            Path to created directory

        Raises:
            ValueError: If job_id is empty or escapes the extraction directory
        """
        job_dir = self._job_path(self.extracted_dir, job_id)

        # Start from a clean slate; the guard above guarantees this can
        # only ever remove a directory strictly inside extracted_dir.
        if job_dir.exists():
            shutil.rmtree(job_dir)

        job_dir.mkdir(parents=True, exist_ok=True)
        for subdir in self._CATEGORY_SUBDIRS:
            (job_dir / subdir).mkdir(exist_ok=True)

        return job_dir

    def get_job_directory(self, job_id: str) -> Path:
        """
        Get job directory path (the directory is not created).

        Args:
            job_id: Unique job identifier

        Returns:
            Path to job directory

        Raises:
            ValueError: If job_id is empty or escapes the extraction directory
        """
        return self._job_path(self.extracted_dir, job_id)

    def get_upload_directory(self) -> Path:
        """
        Get uploads directory

        Returns:
            Path to uploads directory
        """
        return self.uploads_dir

    def get_profile_path(self, job_id: str) -> Path:
        """
        Get path for storing business profile.

        The profiles directory is created if missing; the profile file
        itself is not.

        Args:
            job_id: Unique job identifier

        Returns:
            Path to profile file (``<job_id>.json`` in the profiles directory)

        Raises:
            ValueError: If job_id is empty or escapes the profiles directory
        """
        profile_path = self._job_path(self.profiles_dir, job_id, ".json")
        self.profiles_dir.mkdir(parents=True, exist_ok=True)
        return profile_path

    def get_index_path(self, job_id: str) -> Path:
        """
        Get path for storing page index, creating the directory if needed.

        Args:
            job_id: Unique job identifier

        Returns:
            Path to index directory

        Raises:
            ValueError: If job_id is empty or escapes the index directory
        """
        index_path = self._job_path(self.index_dir, job_id)
        index_path.mkdir(parents=True, exist_ok=True)
        return index_path

    def get_temp_directory(self, job_id: Optional[str] = None) -> Path:
        """
        Get temporary directory.

        Args:
            job_id: Optional job ID for job-specific temp dir. When omitted
                (or empty) the shared temp directory is returned.

        Returns:
            Path to temp directory; a job-specific one is created if missing

        Raises:
            ValueError: If job_id escapes the temp directory
        """
        if not job_id:
            return self.temp_dir

        temp_path = self._job_path(self.temp_dir, job_id)
        temp_path.mkdir(parents=True, exist_ok=True)
        return temp_path

    def cleanup_job_directory(self, job_id: str, keep_profiles: bool = True) -> bool:
        """
        Clean up job extraction directory.

        Args:
            job_id: Unique job identifier
            keep_profiles: If True, a ``profiles`` sub-directory of the job
                directory (when present) is copied to
                ``<profiles_dir>/<job_id>`` before the job directory is
                deleted

        Returns:
            True if cleanup successful (or there was nothing to clean up),
            False if the job_id is invalid or a filesystem error occurred
        """
        try:
            job_dir = self.get_job_directory(job_id)

            if not job_dir.exists():
                return True

            if keep_profiles:
                profiles_subdir = job_dir / "profiles"
                if profiles_subdir.exists():
                    shutil.copytree(
                        profiles_subdir,
                        self.profiles_dir / job_id,
                        dirs_exist_ok=True
                    )

            shutil.rmtree(job_dir)
            return True

        except (OSError, ValueError) as e:
            print(f"Error cleaning up job directory: {e}")
            return False

    def cleanup_temp_directory(self, job_id: str) -> bool:
        """
        Clean up job temporary directory.

        Only the job-specific temp directory is removed; the shared temp
        directory is never deleted, even for an empty job_id.

        Args:
            job_id: Unique job identifier

        Returns:
            True if cleanup successful (or there was nothing to clean up),
            False if the job_id is invalid or a filesystem error occurred
        """
        try:
            # Build the path directly instead of via get_temp_directory():
            # that accessor creates the directory and falls back to the
            # shared temp directory for an empty job_id.
            temp_path = self._job_path(self.temp_dir, job_id)

            if not temp_path.exists():
                return True

            shutil.rmtree(temp_path)
            return True
        except (OSError, ValueError) as e:
            print(f"Error cleaning up temp directory: {e}")
            return False

    def get_storage_stats(self) -> dict:
        """
        Get storage usage statistics

        Returns:
            Dictionary with the base path (``storage_base``) and the total
            file size in bytes of each managed directory
            (``*_size_bytes`` keys)
        """
        return {
            "storage_base": str(self.storage_base),
            "uploads_size_bytes": self._directory_size(self.uploads_dir),
            "extracted_size_bytes": self._directory_size(self.extracted_dir),
            "profiles_size_bytes": self._directory_size(self.profiles_dir),
            "index_size_bytes": self._directory_size(self.index_dir),
            "temp_size_bytes": self._directory_size(self.temp_dir),
        }

    def organize_extracted_file(
        self,
        file_path: Path,
        file_category: str,
        job_id: str
    ) -> Path:
        """
        Move extracted file to appropriate category subdirectory.

        The file keeps its name unless a file with that name already
        exists in the category directory; in that case a timestamp (and,
        if still not unique, a counter) is appended to the stem so that
        no existing file is overwritten.

        NOTE(review): the category is used verbatim as the directory name,
        while create_job_directory() pre-creates plural names
        ("documents", "images", ...). Passing a singular category creates
        a separate directory - confirm which spelling callers use.

        Args:
            file_path: Path to extracted file
            file_category: Category (document, spreadsheet, image, video, unknown)
            job_id: Unique job identifier

        Returns:
            New path of organized file

        Raises:
            ValueError: If job_id is empty or escapes the extraction directory
        """
        category_dir = self.get_job_directory(job_id) / file_category
        category_dir.mkdir(parents=True, exist_ok=True)

        destination = category_dir / file_path.name

        if destination.exists():
            stem = file_path.stem
            suffix = file_path.suffix
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            destination = category_dir / f"{stem}_{timestamp}{suffix}"

            # The timestamp only has one-second resolution, so several
            # collisions within the same second need an extra counter.
            counter = 1
            while destination.exists():
                destination = category_dir / f"{stem}_{timestamp}_{counter}{suffix}"
                counter += 1

        shutil.move(str(file_path), str(destination))
        return destination

    def save_discovery_output(
        self,
        output: "FileDiscoveryOutput",
        job_id: str
    ) -> Path:
        """
        Save file discovery output to job directory.

        The output is serialized via ``output.model_dump(mode='json')`` and
        written as indented JSON to ``discovery_metadata.json``. The job
        directory is created if it does not exist yet.

        Args:
            output: File discovery output
            job_id: Unique job identifier

        Returns:
            Path to saved metadata file

        Raises:
            ValueError: If job_id is empty or escapes the extraction directory
        """
        import json

        job_dir = self.get_job_directory(job_id)
        job_dir.mkdir(parents=True, exist_ok=True)
        metadata_path = job_dir / "discovery_metadata.json"

        # Serialize before opening so a failing model_dump() does not
        # leave an empty metadata file behind.
        output_dict = output.model_dump(mode='json')
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(output_dict, f, indent=2)

        return metadata_path
|
|