Spaces:
Sleeping
Sleeping
| """ | |
| Memory management utilities for UAP Data Analysis Tool | |
| Handles large file processing and memory optimization | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import streamlit as st | |
| from typing import Iterator, Callable, Any, Optional, List | |
| import logging | |
| import gc | |
| import psutil | |
| import os | |
logger = logging.getLogger(__name__)


class MemoryManager:
    """Memory-management helpers for processing large UAP datasets.

    Every method is a ``@staticmethod``: the class is a namespace and is
    used as ``MemoryManager.<method>(...)``. File-based helpers accept
    HDF5 (``.h5`` / ``.hdf5``) and CSV (``.csv``) paths; the suffix check
    is case-insensitive.
    """

    @staticmethod
    def _file_kind(file_path: str) -> str:
        """Classify *file_path* by suffix as ``'hdf5'`` or ``'csv'``.

        Raises:
            ValueError: if the suffix is neither HDF5 nor CSV.
        """
        name = str(file_path).lower()
        if name.endswith(('.h5', '.hdf5')):
            return 'hdf5'
        if name.endswith('.csv'):
            return 'csv'
        raise ValueError(f"Unsupported file type: {file_path}")

    @staticmethod
    def get_memory_usage() -> dict:
        """Return memory statistics for the current process.

        Returns:
            dict with ``'rss_mb'`` (resident set size, MB), ``'vms_mb'``
            (virtual memory size, MB), ``'percent'`` (value of
            ``psutil.Process.memory_percent()``) and ``'available_mb'``
            (system-wide available memory, MB).
        """
        process = psutil.Process(os.getpid())
        memory_info = process.memory_info()
        return {
            'rss_mb': memory_info.rss / 1024 / 1024,  # Resident Set Size in MB
            'vms_mb': memory_info.vms / 1024 / 1024,  # Virtual Memory Size in MB
            'percent': process.memory_percent(),
            'available_mb': psutil.virtual_memory().available / 1024 / 1024,
        }

    @staticmethod
    def get_data_iterator(file_path: str, chunksize: int = 10000) -> Iterator[pd.DataFrame]:
        """Create an iterator that reads a large HDF5 or CSV file in chunks.

        Args:
            file_path: Path to an HDF5 or CSV file.
            chunksize: Number of rows per chunk.

        Returns:
            Iterator yielding DataFrame chunks.

        Raises:
            ValueError: for unsupported file types (also logged).
        """
        try:
            if MemoryManager._file_kind(file_path) == 'hdf5':
                # pandas only supports iterator/chunksize reads on table-format HDF5 stores.
                return pd.read_hdf(file_path, iterator=True, chunksize=chunksize)
            return pd.read_csv(file_path, chunksize=chunksize)
        except Exception as e:
            logger.error(f"Error creating data iterator: {e}")
            raise

    @staticmethod
    def process_data_in_chunks(
        iterator: Iterator[pd.DataFrame],
        process_func: Callable[[pd.DataFrame], Any],
        combine_func: Optional[Callable[[List[Any]], Any]] = None,
        progress_bar: bool = True,
        total_chunks: Optional[int] = None,
    ) -> Any:
        """Apply *process_func* to each chunk and combine the per-chunk results.

        Args:
            iterator: Iterable of DataFrame chunks.
            process_func: Function applied to each chunk.
            combine_func: Function combining the list of results. When omitted,
                DataFrame results are joined with ``pd.concat(ignore_index=True)``
                and any other results are returned as a plain list.
            progress_bar: Show Streamlit progress/status widgets while running.
            total_chunks: Expected number of chunks. When given (and non-zero),
                the progress bar advances; otherwise it stays at 0 and only the
                status text updates.

        Returns:
            Combined results from all chunks.
        """
        results = []
        chunk_count = 0

        if progress_bar:
            progress = st.progress(0)
            status_text = st.empty()

        try:
            for chunk in iterator:
                results.append(process_func(chunk))
                chunk_count += 1

                if progress_bar:
                    if total_chunks:
                        progress.progress(min(chunk_count / total_chunks, 1.0))
                    memory_stats = MemoryManager.get_memory_usage()
                    status_text.text(
                        f"Processed {chunk_count} chunks | "
                        f"Memory: {memory_stats['rss_mb']:.1f}MB ({memory_stats['percent']:.1f}%)"
                    )

                # Periodic explicit collection to release memory from processed chunks.
                if chunk_count % 10 == 0:
                    gc.collect()
        except Exception as e:
            logger.error(f"Error processing chunk {chunk_count}: {e}")
            raise
        finally:
            if progress_bar:
                progress.empty()
                status_text.empty()

        if combine_func:
            return combine_func(results)
        if results and isinstance(results[0], pd.DataFrame):
            return pd.concat(results, ignore_index=True)
        return results

    @staticmethod
    def load_data_subset(
        file_path: str,
        start_row: int = 0,
        num_rows: int = 10000,
        columns: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """Load a contiguous slice of rows from an HDF5 or CSV file.

        No caching is performed; the file is read on every call.

        Args:
            file_path: Path to an HDF5 or CSV file.
            start_row: 0-based index of the first data row to load.
            num_rows: Maximum number of rows to load.
            columns: Columns to load; ``None`` or an empty list loads all.

        Returns:
            DataFrame with the requested rows. For HDF5 the stored index is kept.
        """
        try:
            if MemoryManager._file_kind(file_path) == 'hdf5':
                with pd.HDFStore(file_path, mode='r') as store:
                    # Assumes the file holds a single dataset: use the first key.
                    key = list(store.keys())[0]
                    # start/stop restrict the read to the requested slice.
                    return store.select(
                        key,
                        start=start_row,
                        stop=start_row + num_rows,
                        columns=columns or None,
                    )
            # Keep the header (line 0) and skip the first `start_row` data lines.
            return pd.read_csv(
                file_path,
                skiprows=range(1, start_row + 1),
                nrows=num_rows,
                usecols=columns or None,
            )
        except Exception as e:
            logger.error(f"Error loading data subset: {e}")
            raise

    @staticmethod
    def _downcast_target(dtype: np.dtype, c_min: Any, c_max: Any) -> Optional[np.dtype]:
        """Return the narrowest dtype of the same kind holding [c_min, c_max], or None."""
        if dtype.kind == 'i':
            candidates, limits_of = (np.int8, np.int16, np.int32), np.iinfo
        elif dtype.kind == 'u':
            candidates, limits_of = (np.uint8, np.uint16, np.uint32), np.iinfo
        else:
            # float16 is deliberately excluded: ~3 significant digits corrupt real data.
            candidates, limits_of = (np.float32,), np.finfo
        for candidate in candidates:
            if np.dtype(candidate).itemsize >= dtype.itemsize:
                break  # never "downcast" to an equal or wider type
            limits = limits_of(candidate)
            if limits.min <= c_min and c_max <= limits.max:
                return np.dtype(candidate)
        return None

    @staticmethod
    def optimize_dataframe_memory(df: pd.DataFrame, deep: bool = True) -> pd.DataFrame:
        """Reduce a DataFrame's memory footprint by narrowing column dtypes.

        * Signed integer columns -> smallest of int8/int16/int32 that holds the column's range.
        * Unsigned integer columns -> smallest of uint8/uint16/uint32.
        * Float columns -> float32 when the range fits (float32 keeps ~7 significant digits).
        * Object columns with fewer than 50% unique values -> ``category``.

        Boolean, datetime/timedelta, categorical and pandas extension dtypes are
        left unchanged.

        Args:
            df: DataFrame to optimize.
            deep: If True (default) work on a copy. If False, *df* itself is
                modified in place and returned.

        Returns:
            Memory-optimized DataFrame.
        """
        if deep:
            df = df.copy()

        for col in df.columns:
            series = df[col]
            dtype = series.dtype
            # Only plain NumPy int/uint/float dtypes are candidates for downcasting.
            if not isinstance(dtype, np.dtype) or dtype.kind not in 'iuf' or series.empty:
                continue
            c_min, c_max = series.min(), series.max()
            if pd.isna(c_min) or pd.isna(c_max):
                continue  # all-NaN column: no range to compare
            target = MemoryManager._downcast_target(dtype, c_min, c_max)
            if target is not None:
                df[col] = series.astype(target)

        # Low-cardinality strings -> category (guarded against empty frames).
        num_total = len(df)
        if num_total:
            for col in df.select_dtypes(include=['object']).columns:
                if df[col].nunique() / num_total < 0.5:  # Less than 50% unique values
                    df[col] = df[col].astype('category')

        return df

    @staticmethod
    def estimate_dataframe_memory(df: pd.DataFrame) -> dict:
        """Estimate a DataFrame's memory usage using ``memory_usage(deep=True)``.

        Returns:
            dict with ``'total_mb'`` (all columns plus the index) and
            ``'columns'`` mapping each column label to its size in MB
            (the index entry, labelled ``'Index'`` by pandas, is excluded).
        """
        memory_usage = df.memory_usage(deep=True)
        return {
            'total_mb': memory_usage.sum() / 1024 / 1024,
            'columns': {
                col: mem / 1024 / 1024
                for col, mem in memory_usage.items()
                if col != 'Index'
            },
        }

    @staticmethod
    def clear_memory_cache() -> None:
        """Clear Streamlit's data and resource caches, then run garbage collection."""
        st.cache_data.clear()
        st.cache_resource.clear()
        gc.collect()
        logger.info("Memory cache cleared")

    @staticmethod
    def _count_rows(file_path: str, kind: str) -> int:
        """Return the number of data rows in an HDF5 table or CSV file."""
        if kind == 'hdf5':
            with pd.HDFStore(file_path, mode='r') as store:
                key = list(store.keys())[0]
                return store.get_storer(key).nrows
        # Count lines without loading the file; the handle is closed deterministically.
        with open(file_path, encoding='utf-8', errors='replace') as fh:
            return sum(1 for _ in fh) - 1  # Subtract header

    @staticmethod
    def _read_rows(file_path: str, kind: str, sorted_indices: np.ndarray) -> pd.DataFrame:
        """Read the given 0-based data rows in a single pass over the file."""
        if kind == 'hdf5':
            with pd.HDFStore(file_path, mode='r') as store:
                key = list(store.keys())[0]
                # pandas treats an integer array passed as `where` as row
                # coordinates; this requires a table-format store.
                return store.select(key, where=sorted_indices)
        keep = {int(i) + 1 for i in sorted_indices}  # +1: file line 0 is the header
        return pd.read_csv(file_path, skiprows=lambda line: line != 0 and line not in keep)

    @staticmethod
    def sample_large_dataset(
        file_path: str,
        sample_size: int = 10000,
        random_state: int = 42,
    ) -> pd.DataFrame:
        """Return a reproducible random sample of rows from a large HDF5 or CSV file.

        Rows are drawn without replacement and returned in file order with a
        fresh RangeIndex. NumPy's global random state is not modified.

        Args:
            file_path: Path to an HDF5 or CSV file.
            sample_size: Number of rows to sample. If it is >= the total row
                count, the whole file is returned via ``load_data_subset``.
            random_state: Seed for the row selection.

        Returns:
            Sampled DataFrame.
        """
        try:
            kind = MemoryManager._file_kind(file_path)
            total_rows = MemoryManager._count_rows(file_path, kind)

            if sample_size >= total_rows:
                return MemoryManager.load_data_subset(file_path, 0, total_rows)

            # A local RandomState draws exactly what np.random.seed(random_state)
            # followed by np.random.choice would, without resetting the global RNG.
            rng = np.random.RandomState(random_state)
            sample_indices = np.sort(rng.choice(total_rows, sample_size, replace=False))

            sampled = MemoryManager._read_rows(file_path, kind, sample_indices)
            return sampled.reset_index(drop=True)
        except Exception as e:
            logger.error(f"Error sampling dataset: {e}")
            raise