Spaces:
Running
Running
| """ | |
| Unit tests for the LazyEventLoader class. | |
| This test suite covers: | |
| - LazyEventLoader initialization and file handling | |
| - Metadata extraction without loading full data | |
| - Memory usage estimation | |
| - Safety checks and risk assessment | |
| - File size formatting | |
| - Error handling for invalid files | |
| """ | |
| import pytest | |
| import os | |
| import tempfile | |
| import numpy as np | |
| from unittest.mock import MagicMock, patch, PropertyMock | |
| from utils.lazy_loader import LazyEventLoader, assess_loading_risk | |
| # ============================================================================= | |
| # Fixtures | |
| # ============================================================================= | |
def mock_fits_file():
    """Yield the path of a temporary, non-empty file with a ``.fits`` suffix.

    The file holds repeated dummy header-like bytes, not a valid FITS
    structure; it only needs to exist and have a non-zero size. The file is
    deleted when the generator is resumed, closed, or has an exception
    thrown into it.

    NOTE(review): tests in this module take ``mock_fits_file`` as a
    parameter, so this is presumably registered with ``@pytest.fixture`` by
    a decorator that is not visible in this view — confirm.

    Yields:
        str: Filesystem path of the temporary file.
    """
    with tempfile.NamedTemporaryFile(suffix='.fits', delete=False) as handle:
        # Write some dummy data so the file has a non-zero size.
        handle.write(b'SIMPLE = T' * 100)  # Fake FITS header
        temp_path = handle.name
    try:
        yield temp_path
    finally:
        # try/finally guarantees cleanup even if the consumer raises or the
        # generator is closed early. EAFP avoids the exists()/remove() race.
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            # The consumer already deleted the file; nothing left to clean up.
            pass
def mock_fits_reader():
    """Build a ``MagicMock`` standing in for a FITS timeseries reader.

    The mock exposes two good-time intervals, ``[0, 1000]`` and
    ``[1100, 2000]``, as a float array, plus ``mjdref = 58000.0``. Any other
    attribute access falls through to ``MagicMock`` defaults.

    NOTE(review): tests in this module take ``mock_fits_reader`` as a
    parameter, so this is presumably registered with ``@pytest.fixture`` by
    a decorator that is not visible in this view — confirm.

    Returns:
        MagicMock: Reader double with ``gti`` and ``mjdref`` populated.
    """
    reader = MagicMock()
    # GTI boundaries are times; keep them floating point so code under test
    # does not take integer-only arithmetic paths that real float GTIs would
    # never exercise.
    reader.gti = np.array([[0, 1000], [1100, 2000]], dtype=float)
    reader.mjdref = 58000.0
    return reader
| # ============================================================================= | |
| # Test: LazyEventLoader Initialization | |
| # ============================================================================= | |
def test_lazy_loader_init_with_nonexistent_file():
    """Constructing a loader for a path that does not exist raises FileNotFoundError."""
    missing_path = "/path/to/nonexistent/file.fits"
    with pytest.raises(FileNotFoundError):
        LazyEventLoader(missing_path)
def test_lazy_loader_init_with_invalid_fits(mock_fits_file):
    """A ``.fits``-suffixed file without valid FITS content raises ValueError."""
    # The temporary file only contains dummy bytes, so opening it as FITS
    # is expected to fail with the loader's own error message.
    expected_message = "Failed to open FITS file"
    with pytest.raises(ValueError, match=expected_message):
        LazyEventLoader(mock_fits_file)
def test_lazy_loader_init_success(mock_reader_class, mock_fits_file):
    """A loader built on an existing file records its path, size and reader."""
    # NOTE(review): `mock_reader_class` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    reader_stub = MagicMock()
    mock_reader_class.return_value = reader_stub

    loader = LazyEventLoader(mock_fits_file)

    # The reader class must be instantiated exactly once, in "times" mode.
    mock_reader_class.assert_called_once_with(mock_fits_file, data_kind="times")
    assert loader.reader is not None
    assert loader.file_path == mock_fits_file
    assert loader.file_size > 0
| # ============================================================================= | |
| # Test: Metadata Extraction | |
| # ============================================================================= | |
def test_get_metadata(mock_reader_class, mock_fits_file, mock_fits_reader):
    """``get_metadata`` returns the expected keys and GTI-derived values."""
    # NOTE(review): `mock_reader_class` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    mock_reader_class.return_value = mock_fits_reader
    loader = LazyEventLoader(mock_fits_file)
    metadata = loader.get_metadata()

    # Every documented key must be present.
    expected_keys = (
        'gti',
        'mjdref',
        'n_events_estimate',
        'time_range',
        'file_size_mb',
        'file_size_gb',
        'duration_s',
        'estimated_count_rate',
    )
    for key in expected_keys:
        assert key in metadata

    # Values derived from the mocked reader.
    assert np.array_equal(metadata['gti'], mock_fits_reader.gti)
    assert metadata['mjdref'] == 58000.0
    # Sum of the two GTI lengths: (1000 - 0) + (2000 - 1100).
    assert metadata['duration_s'] == 1900.0
    assert metadata['n_events_estimate'] > 0
def test_get_metadata_time_range(mock_reader_class, mock_fits_file, mock_fits_reader):
    """``time_range`` spans the earliest GTI start to the latest GTI stop."""
    # NOTE(review): `mock_reader_class` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    mock_reader_class.return_value = mock_fits_reader
    loader = LazyEventLoader(mock_fits_file)

    observed_range = loader.get_metadata()['time_range']

    # Minimum and maximum over the mocked GTIs.
    expected_range = (0.0, 2000.0)
    assert observed_range == expected_range
| # ============================================================================= | |
| # Test: Memory Estimation | |
| # ============================================================================= | |
def test_estimate_memory_usage_fits(mock_reader_class, mock_fits_file):
    """The FITS memory estimate is three times the on-disk file size."""
    # NOTE(review): `mock_reader_class` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    mock_reader_class.return_value = MagicMock()
    loader = LazyEventLoader(mock_fits_file)

    # FITS multiplier is 3x (based on Stingray benchmarks: 2GB → 5.2GB = 2.6x, rounded to 3x)
    fits_multiplier = 3
    assert loader.estimate_memory_usage('fits') == loader.file_size * fits_multiplier
def test_estimate_memory_usage_hdf5(mock_reader_class, mock_fits_file):
    """The HDF5 memory estimate is twice the on-disk file size."""
    # NOTE(review): `mock_reader_class` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    mock_reader_class.return_value = MagicMock()
    loader = LazyEventLoader(mock_fits_file)

    # HDF5 multiplier is 2x (more efficient format).
    hdf5_multiplier = 2
    assert loader.estimate_memory_usage('hdf5') == loader.file_size * hdf5_multiplier
def test_estimate_memory_usage_pickle(mock_reader_class, mock_fits_file):
    """The pickle memory estimate is 1.5 times the on-disk file size."""
    # NOTE(review): `mock_reader_class` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    mock_reader_class.return_value = MagicMock()
    loader = LazyEventLoader(mock_fits_file)

    # Pickle multiplier is 1.5x (most efficient format).
    pickle_multiplier = 1.5
    assert loader.estimate_memory_usage('pickle') == loader.file_size * pickle_multiplier
def test_estimate_memory_usage_unknown_format(mock_reader_class, mock_fits_file):
    """An unrecognised format falls back to the conservative 3x multiplier."""
    # NOTE(review): `mock_reader_class` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    mock_reader_class.return_value = MagicMock()
    loader = LazyEventLoader(mock_fits_file)

    # Default multiplier is 3x (conservative default, same as FITS).
    default_multiplier = 3
    assert loader.estimate_memory_usage('unknown_format') == loader.file_size * default_multiplier
| # ============================================================================= | |
| # Test: Safety Checks | |
| # ============================================================================= | |
def test_can_load_safely_safe(mock_vmem, mock_reader_class, mock_fits_file):
    """A tiny file with plenty of free memory is reported as safe to load."""
    # NOTE(review): `mock_vmem` and `mock_reader_class` are not defined in the
    # visible source; presumably injected by `@patch` decorators — confirm.
    mock_reader_class.return_value = MagicMock()
    sixteen_gib = 16 * 1024**3
    mock_vmem.return_value.available = sixteen_gib  # 16 GB

    loader = LazyEventLoader(mock_fits_file)

    # Small file, lots of memory -> should be safe.
    is_safe = loader.can_load_safely(safety_margin=0.5)
    assert is_safe is True
def test_can_load_safely_unsafe(mock_vmem, mock_reader_class, mock_fits_file):
    """Loading is reported unsafe when the estimate exceeds the allowed share of memory."""
    # NOTE(review): `mock_vmem` and `mock_reader_class` are not defined in the
    # visible source; presumably injected by `@patch` decorators — confirm.
    mock_reader_class.return_value = MagicMock()

    # The temporary file is about 1 KB, so the 3x FITS estimate is about 3 KB.
    # With 5 KB available and a 0.5 margin the safe limit is 2.5 KB, which
    # the estimate exceeds.
    five_kib = 5 * 1024
    mock_vmem.return_value.available = five_kib  # 5 KB

    loader = LazyEventLoader(mock_fits_file)

    is_safe = loader.can_load_safely(safety_margin=0.5)
    assert is_safe is False
def test_can_load_safely_custom_margin(mock_vmem, mock_reader_class, mock_fits_file):
    """``can_load_safely`` honours a caller-supplied ``safety_margin``.

    With 1 GB reported as available and a temporary file of about 1 KB, the
    load must be safe at both a strict (0.1) and a lenient (0.9) margin.

    NOTE(review): assumes ``safety_margin`` is the fraction of available
    memory the load may use, so a smaller value is stricter — confirm
    against ``LazyEventLoader.can_load_safely``.
    NOTE(review): `mock_vmem` and `mock_reader_class` are not defined in the
    visible source; presumably injected by `@patch` decorators — confirm.
    """
    mock_reader_class.return_value = MagicMock()
    mock_vmem.return_value.available = 1 * 1024**3  # 1 GB

    loader = LazyEventLoader(mock_fits_file)

    strict_result = loader.can_load_safely(safety_margin=0.1)
    lenient_result = loader.can_load_safely(safety_margin=0.9)

    # Even 10% of 1 GB (~100 MB) dwarfs the few KB this file needs, so both
    # margins must report safe. Without these assertions the test could
    # never fail.
    assert strict_result is True
    assert lenient_result is True
| # ============================================================================= | |
| # Test: System Memory Info | |
| # ============================================================================= | |
def test_get_system_memory_info(mock_process, mock_vmem, mock_reader_class, mock_fits_file):
    """``get_system_memory_info`` reports system and process memory in MB.

    System memory is mocked as 16 GB total, 8 GB available, 8 GB used (50%),
    and the process resident set size as 256 MB.

    NOTE(review): `mock_process`, `mock_vmem` and `mock_reader_class` are not
    defined in the visible source; presumably injected by `@patch`
    decorators — confirm.
    """
    mock_reader_class.return_value = MagicMock()

    gib = 1024**3
    vmem = mock_vmem.return_value
    vmem.total = 16 * gib  # 16 GB
    vmem.available = 8 * gib  # 8 GB
    vmem.used = 8 * gib  # 8 GB
    vmem.percent = 50.0
    mock_process.return_value.memory_info.return_value.rss = 256 * 1024**2  # 256 MB

    loader = LazyEventLoader(mock_fits_file)
    mem_info = loader.get_system_memory_info()

    # Every expected key must be present.
    for key in ('total_mb', 'available_mb', 'used_mb', 'percent', 'process_mb'):
        assert key in mem_info

    # Byte counts are expected back as megabytes.
    assert mem_info['total_mb'] == 16 * 1024  # 16 GB in MB
    assert mem_info['available_mb'] == 8 * 1024  # 8 GB in MB
    # `used` is mocked above, so its converted value is checked too rather
    # than only the presence of the key.
    assert mem_info['used_mb'] == 8 * 1024  # 8 GB in MB
    assert mem_info['percent'] == 50.0
    assert mem_info['process_mb'] == 256.0
| # ============================================================================= | |
| # Test: File Size Formatting | |
| # ============================================================================= | |
def test_format_file_size_bytes():
    """A size of 500 bytes is rendered as ``"500.0 B"``."""
    formatted = LazyEventLoader.format_file_size(500)
    assert formatted == "500.0 B"
def test_format_file_size_kilobytes():
    """A size of 1500 bytes is rendered as ``"1.5 KB"``."""
    formatted = LazyEventLoader.format_file_size(1500)
    assert formatted == "1.5 KB"
def test_format_file_size_megabytes():
    """Exactly 2 MiB is rendered as ``"2.0 MB"``."""
    two_mib = 2 * 1024**2
    formatted = LazyEventLoader.format_file_size(two_mib)
    assert formatted == "2.0 MB"
def test_format_file_size_gigabytes():
    """A size of 3.5 GiB is rendered as ``"3.5 GB"``."""
    size_in_bytes = 3.5 * 1024**3
    formatted = LazyEventLoader.format_file_size(size_in_bytes)
    assert formatted == "3.5 GB"
def test_format_file_size_terabytes():
    """A size of 1.2 TiB is rendered as ``"1.2 TB"``."""
    size_in_bytes = 1.2 * 1024**4
    formatted = LazyEventLoader.format_file_size(size_in_bytes)
    assert formatted == "1.2 TB"
| # ============================================================================= | |
| # Test: Risk Assessment Function | |
| # ============================================================================= | |
def test_assess_loading_risk_safe(mock_vmem):
    """A 100 MB FITS file against 16 GB of free memory is rated ``'safe'``."""
    # NOTE(review): `mock_vmem` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    mock_vmem.return_value.available = 16 * 1024**3  # 16 GB
    hundred_mib = 100 * 1024**2  # 100 MB

    # 100 MB * 3 = 300 MB needed; 300 MB / 16 GB is about 2% -> safe.
    verdict = assess_loading_risk(hundred_mib, file_format='fits')
    assert verdict == 'safe'
def test_assess_loading_risk_caution(mock_vmem):
    """A 350 MB FITS file against 2 GB of free memory is rated ``'caution'``."""
    # NOTE(review): `mock_vmem` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    mock_vmem.return_value.available = 2 * 1024**3  # 2 GB
    size_in_bytes = 350 * 1024**2  # 350 MB

    # 350 MB * 3 = 1050 MB needed; 1050 MB / 2048 MB is about 51% -> caution.
    verdict = assess_loading_risk(size_in_bytes, file_format='fits')
    assert verdict == 'caution'
def test_assess_loading_risk_risky(mock_vmem):
    """A 480 MB FITS file against 2 GB of free memory is rated ``'risky'``."""
    # NOTE(review): `mock_vmem` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    mock_vmem.return_value.available = 2 * 1024**3  # 2 GB
    size_in_bytes = 480 * 1024**2  # 480 MB

    # 480 MB * 3 = 1440 MB needed; 1440 MB / 2048 MB is about 70% -> risky.
    verdict = assess_loading_risk(size_in_bytes, file_format='fits')
    assert verdict == 'risky'
def test_assess_loading_risk_critical(mock_vmem):
    """A 350 MB FITS file against 1 GB of free memory is rated ``'critical'``."""
    # NOTE(review): `mock_vmem` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    mock_vmem.return_value.available = 1 * 1024**3  # 1 GB
    size_in_bytes = 350 * 1024**2  # 350 MB

    # 350 MB * 3 = 1050 MB needed; 1050 MB / 1024 MB is about 103% -> critical.
    verdict = assess_loading_risk(size_in_bytes, file_format='fits')
    assert verdict == 'critical'
def test_assess_loading_risk_different_formats(mock_vmem):
    """Risk ratings reflect the per-format memory multipliers."""
    # NOTE(review): `mock_vmem` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    four_gib = 4 * 1024**3
    mebibyte = 1024**2
    mock_vmem.return_value.available = four_gib  # 4 GB

    # File sizes are chosen so each format's multiplier lands in a different band.
    # FITS: 1000 MB * 3 = 3000 MB (73% -> risky)
    risk_fits = assess_loading_risk(1000 * mebibyte, file_format='fits', available_memory=four_gib)
    # HDF5: 850 MB * 2 = 1700 MB (41% -> caution)
    risk_hdf5 = assess_loading_risk(850 * mebibyte, file_format='hdf5', available_memory=four_gib)
    # Pickle: 600 MB * 1.5 = 900 MB (22% -> safe)
    risk_pickle = assess_loading_risk(600 * mebibyte, file_format='pickle', available_memory=four_gib)

    assert risk_fits in ['risky', 'critical']
    assert risk_hdf5 in ['safe', 'caution']
    assert risk_pickle == 'safe'
| # ============================================================================= | |
| # Test: Context Manager | |
| # ============================================================================= | |
def test_context_manager(mock_reader_class, mock_fits_file):
    """``LazyEventLoader`` works in a ``with`` statement and yields a loader instance."""
    # NOTE(review): `mock_reader_class` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    mock_reader_class.return_value = MagicMock()

    with LazyEventLoader(mock_fits_file) as managed_loader:
        assert managed_loader is not None
        assert isinstance(managed_loader, LazyEventLoader)
| # ============================================================================= | |
| # Test: String Representation | |
| # ============================================================================= | |
def test_repr(mock_reader_class, mock_fits_file):
    """``repr`` names the class, the file path and a human-readable size unit."""
    # NOTE(review): `mock_reader_class` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    mock_reader_class.return_value = MagicMock()
    text = repr(LazyEventLoader(mock_fits_file))

    assert 'LazyEventLoader' in text
    assert mock_fits_file in text
    # At least one of the size units must appear in the representation.
    assert any(unit in text for unit in ('KB', 'MB', 'GB'))
| # ============================================================================= | |
| # Test: Load Full (with mocking) | |
| # ============================================================================= | |
def test_load_full(mock_eventlist_class, mock_reader_class, mock_fits_file):
    """``load_full`` returns events and reads them through the event-list class once."""
    # NOTE(review): `mock_eventlist_class` and `mock_reader_class` are not
    # defined in the visible source; presumably injected by `@patch`
    # decorators — confirm.
    mock_reader_class.return_value = MagicMock()
    fake_events = MagicMock()
    fake_events.time = np.arange(1000)
    mock_eventlist_class.read.return_value = fake_events

    loader = LazyEventLoader(mock_fits_file)
    loaded = loader.load_full()

    assert loaded is not None
    mock_eventlist_class.read.assert_called_once()
def test_load_full_with_additional_columns(mock_eventlist_class, mock_reader_class, mock_fits_file):
    """``load_full`` forwards ``additional_columns`` to the event-list reader as a keyword."""
    # NOTE(review): `mock_eventlist_class` and `mock_reader_class` are not
    # defined in the visible source; presumably injected by `@patch`
    # decorators — confirm.
    mock_reader_class.return_value = MagicMock()
    mock_eventlist_class.read.return_value = MagicMock()

    loader = LazyEventLoader(mock_fits_file)
    loader.load_full(additional_columns=['DETID', 'RAWX'])

    # `call_args` unpacks as (positional args, keyword args).
    _, forwarded_kwargs = mock_eventlist_class.read.call_args
    assert 'additional_columns' in forwarded_kwargs
    assert forwarded_kwargs['additional_columns'] == ['DETID', 'RAWX']
| # ============================================================================= | |
| # Test: Stream Segments (with mocking) | |
| # ============================================================================= | |
def test_stream_segments(mock_time_intervals, mock_reader_class, mock_fits_file, mock_fits_reader):
    """``stream_segments`` yields one event array per time interval."""
    # NOTE(review): `mock_time_intervals` and `mock_reader_class` are not
    # defined in the visible source; presumably injected by `@patch`
    # decorators — confirm.
    mock_reader_class.return_value = mock_fits_reader

    # Three consecutive 100-unit intervals: start times, then stop times.
    interval_starts = np.array([0, 100, 200])
    interval_stops = np.array([100, 200, 300])
    mock_time_intervals.return_value = (interval_starts, interval_stops)

    # Event times the reader hands back for each interval.
    events_per_interval = [
        np.array([10, 20, 30]),
        np.array([110, 120]),
        np.array([210, 220, 230, 240]),
    ]
    mock_fits_reader.filter_at_time_intervals.return_value = events_per_interval

    loader = LazyEventLoader(mock_fits_file)
    segments = list(loader.stream_segments(segment_size=100))

    # Three segments holding 3, 2 and 4 events respectively.
    assert [len(segment) for segment in segments] == [3, 2, 4]
| # ============================================================================= | |
| # Test: Edge Cases | |
| # ============================================================================= | |
def test_metadata_with_zero_duration(mock_reader_class, mock_fits_file):
    """A zero-length GTI yields zero duration and a zero count rate."""
    # NOTE(review): `mock_reader_class` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    degenerate_reader = MagicMock()
    degenerate_reader.mjdref = 58000.0
    degenerate_reader.gti = np.array([[0, 0]])  # Zero duration
    mock_reader_class.return_value = degenerate_reader

    metadata = LazyEventLoader(mock_fits_file).get_metadata()

    # The rate must be reported as 0 rather than dividing by a zero duration.
    assert metadata['duration_s'] == 0.0
    assert metadata['estimated_count_rate'] == 0
def test_metadata_with_no_mjdref(mock_reader_class, mock_fits_file):
    """When the reader has no ``mjdref`` attribute, metadata reports 0.0."""
    # NOTE(review): `mock_reader_class` is not defined in the visible source;
    # presumably injected by a `@patch` decorator — confirm.
    reader_without_mjdref = MagicMock()
    reader_without_mjdref.gti = np.array([[0, 1000]])
    # Deleting an attribute on a mock makes later access raise AttributeError.
    del reader_without_mjdref.mjdref
    mock_reader_class.return_value = reader_without_mjdref

    metadata = LazyEventLoader(mock_fits_file).get_metadata()

    # Falls back to the default reference epoch.
    assert metadata['mjdref'] == 0.0