Spaces:
Configuration error
Configuration error
feat: Introduce new backend architecture with notebooks, sources, chat, and CLaRa models, alongside database schema and updated deployment scripts, while removing old frontend, deployment files, and previous backend components.
88f8604
| """ | |
| Antigravity Notebook - Storage Service | |
| Handles saving and loading latent tensors to/from filesystem. | |
| """ | |
| import torch | |
| import os | |
| from pathlib import Path | |
| from typing import List, Dict, Optional | |
| from uuid import UUID | |
| from sqlalchemy.orm import Session | |
| from backend.config import settings | |
| from backend.database import LatentTensor, Source | |
class StorageService:
    """Filesystem-backed store for latent tensors.

    Tensor files live below ``base_dir`` using the layout
    ``{notebook_id}/{source_id}/segment_{idx}.pt``. Paths returned by
    :meth:`save_tensor` and accepted by :meth:`load_tensor` are relative to
    ``base_dir``.
    """

    def __init__(self, base_dir: Optional[str] = None):
        """Create the service and make sure the storage root exists.

        Args:
            base_dir: Root directory for tensor files. When omitted (or
                empty), ``settings.LATENT_TENSOR_DIR`` is used.
        """
        self.base_dir = Path(base_dir or settings.LATENT_TENSOR_DIR)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def _resolve_path(self, relative_path: str) -> Path:
        """Join ``relative_path`` onto ``base_dir`` and reject escaping paths.

        Args:
            relative_path: Path expected to be relative to ``base_dir``.

        Returns:
            ``base_dir / relative_path`` (not symlink-resolved).

        Raises:
            ValueError: If the resolved location is outside ``base_dir``
                (e.g. via ``..`` components or an absolute path).
        """
        full_path = self.base_dir / relative_path
        storage_root = self.base_dir.resolve()
        try:
            # relative_to() raises ValueError when full_path is not below root.
            full_path.resolve().relative_to(storage_root)
        except ValueError:
            raise ValueError(
                f"Tensor path escapes storage directory: {relative_path}"
            ) from None
        return full_path

    def get_tensor_path(
        self,
        notebook_id: UUID,
        source_id: UUID,
        segment_index: int
    ) -> Path:
        """Build the file path for one latent tensor segment.

        Structure: ``{base_dir}/{notebook_id}/{source_id}/segment_{idx}.pt``.
        Note that the parent directory is created as a side effect.

        Args:
            notebook_id: Notebook UUID
            source_id: Source UUID
            segment_index: Segment index

        Returns:
            Path to the tensor file (the file itself may not exist yet).
        """
        path = self.base_dir / str(notebook_id) / str(source_id)
        path.mkdir(parents=True, exist_ok=True)
        return path / f"segment_{segment_index}.pt"

    def save_tensor(
        self,
        tensor: torch.Tensor,
        notebook_id: UUID,
        source_id: UUID,
        segment_index: int
    ) -> str:
        """Save a latent tensor to disk.

        Args:
            tensor: PyTorch tensor to save
            notebook_id: Notebook UUID
            source_id: Source UUID
            segment_index: Segment index

        Returns:
            Path of the saved file relative to ``base_dir`` (suitable for
            database storage and for :meth:`load_tensor`).
        """
        path = self.get_tensor_path(notebook_id, source_id, segment_index)

        # torch.save already writes the zipfile-based format by default, so
        # the private `_use_new_zipfile_serialization` flag is not needed.
        torch.save(tensor, path)

        return str(path.relative_to(self.base_dir))

    def load_tensor(self, relative_path: str) -> torch.Tensor:
        """Load a latent tensor from disk onto the CPU.

        Args:
            relative_path: Path relative to ``base_dir``.

        Returns:
            Loaded tensor

        Raises:
            ValueError: If ``relative_path`` points outside ``base_dir``.
            FileNotFoundError: If the tensor file does not exist.
        """
        full_path = self._resolve_path(relative_path)

        if not full_path.exists():
            raise FileNotFoundError(f"Tensor file not found: {full_path}")

        # weights_only=True limits unpickling to tensors/primitive containers,
        # so a tampered .pt file cannot execute arbitrary code on load.
        return torch.load(full_path, map_location="cpu", weights_only=True)

    def get_notebook_sources(
        self,
        db: Session,
        notebook_id: UUID
    ) -> List[Dict]:
        """Get all sources for a notebook with their metadata.

        Args:
            db: Database session
            notebook_id: Notebook UUID

        Returns:
            List of source dictionaries with metadata
        """
        sources = db.query(Source).filter(
            Source.notebook_id == notebook_id
        ).all()

        # NOTE(review): if Source is a SQLAlchemy declarative model, `.metadata`
        # is the reserved MetaData registry rather than a column value --
        # confirm the intended attribute name against the model definition.
        return [
            {
                "id": source.id,
                "filename": source.filename,
                "source_type": source.source_type,
                "url": source.url,
                "metadata": source.metadata,
            }
            for source in sources
        ]

    def get_source_tensors(
        self,
        db: Session,
        source_id: UUID
    ) -> List[LatentTensor]:
        """Get all latent tensor records for a source, ordered by segment index.

        Args:
            db: Database session
            source_id: Source UUID

        Returns:
            List of LatentTensor objects
        """
        return db.query(LatentTensor).filter(
            LatentTensor.source_id == source_id
        ).order_by(LatentTensor.segment_index).all()

    def get_notebook_tensors(
        self,
        db: Session,
        notebook_id: UUID
    ) -> List[Dict]:
        """Get ALL latent tensors for a notebook across all sources.

        This is used by ContextManager to prepare the full notebook context.
        Every tensor file is loaded eagerly via :meth:`load_tensor`.

        Args:
            db: Database session
            notebook_id: Notebook UUID

        Returns:
            List of dicts with tensor metadata and loaded tensors, grouped by
            source and ordered by segment index within each source.

        Raises:
            ValueError: If a stored tensor path points outside ``base_dir``.
            FileNotFoundError: If a referenced tensor file is missing.
        """
        sources = db.query(Source).filter(
            Source.notebook_id == notebook_id
        ).all()

        all_tensors = []
        for source in sources:
            # Reuse the per-source query so ordering stays defined in one place.
            for tensor_record in self.get_source_tensors(db, source.id):
                all_tensors.append({
                    "tensor": self.load_tensor(tensor_record.tensor_path),
                    "source_id": source.id,
                    "source_filename": source.filename,
                    "source_type": source.source_type,
                    "segment_index": tensor_record.segment_index,
                    "token_count": tensor_record.token_count,
                    # NOTE(review): `.metadata` may be SQLAlchemy's reserved
                    # MetaData attribute -- confirm against the model.
                    "metadata": tensor_record.metadata,
                })

        return all_tensors

    def delete_source_tensors(
        self,
        db: Session,
        source_id: UUID,
        notebook_id: UUID
    ) -> int:
        """Delete the tensor files associated with a source.

        Only files are removed here; the LatentTensor rows are read but not
        deleted by this method. Source and notebook directories that end up
        empty are removed as well.

        Args:
            db: Database session
            source_id: Source UUID
            notebook_id: Notebook UUID

        Returns:
            Number of tensor files actually deleted

        Raises:
            ValueError: If a stored tensor path points outside ``base_dir``.
        """
        deleted_count = 0
        for tensor in self.get_source_tensors(db, source_id):
            full_path = self._resolve_path(tensor.tensor_path)
            try:
                full_path.unlink()
            except FileNotFoundError:
                # Already gone (never written or removed concurrently).
                continue
            deleted_count += 1

        # Clean up empty directories, innermost first.
        notebook_dir = self.base_dir / str(notebook_id)
        source_dir = notebook_dir / str(source_id)
        for directory in (source_dir, notebook_dir):
            if directory.exists() and not any(directory.iterdir()):
                directory.rmdir()

        return deleted_count

    def get_storage_stats(self) -> Dict:
        """Get storage statistics for all ``.pt`` files below ``base_dir``.

        Returns:
            Dictionary with the file count, total size in bytes and MiB
            (rounded to two decimals), and the base directory as a string.
        """
        total_files = 0
        total_size = 0

        for root, _dirs, files in os.walk(self.base_dir):
            for file in files:
                if file.endswith('.pt'):
                    total_files += 1
                    total_size += (Path(root) / file).stat().st_size

        return {
            "total_tensors": total_files,
            "total_size_bytes": total_size,
            "total_size_mb": round(total_size / (1024 * 1024), 2),
            "base_directory": str(self.base_dir)
        }
# Process-wide StorageService singleton; populated lazily on first request.
_storage_instance: Optional[StorageService] = None


def get_storage_service() -> StorageService:
    """Return the shared StorageService, constructing it on first use."""
    global _storage_instance
    service = _storage_instance
    if service is None:
        # First call: build with default settings and cache for later callers.
        service = StorageService()
        _storage_instance = service
    return service