# UAP-Data-Analysis-Tool / utils/memory_manager.py
# Committed by Ashoka74 in a1aef88: "Deploy current work to HF Space (slim)" (file size 10.9 kB)
"""
Memory management utilities for UAP Data Analysis Tool
Handles large file processing and memory optimization
"""
import pandas as pd
import numpy as np
import streamlit as st
from typing import Iterator, Callable, Any, Optional, List
import logging
import gc
import psutil
import os
# Module-level logger; MemoryManager reports load/processing errors and cache clears through it.
logger = logging.getLogger(__name__)
class MemoryManager:
    """Memory management helpers for processing large UAP datasets.

    All methods are static; the class acts as a namespace. Supported inputs are
    HDF5 (``.h5`` / ``.hdf5``) and CSV (``.csv``) files, with extensions matched
    case-insensitively.
    """

    @staticmethod
    def _file_kind(file_path: str) -> Optional[str]:
        """Return ``'hdf5'``, ``'csv'`` or ``None`` based on the file extension."""
        lowered = str(file_path).lower()
        if lowered.endswith(('.h5', '.hdf5')):
            return 'hdf5'
        if lowered.endswith('.csv'):
            return 'csv'
        return None

    @staticmethod
    def _first_hdf_key(store: pd.HDFStore, file_path: str) -> str:
        """Return the first dataset key of an open HDFStore (files are assumed to hold one dataset).

        Raises:
            ValueError: If the store contains no datasets.
        """
        keys = store.keys()
        if not keys:
            raise ValueError(f"No datasets found in HDF5 file: {file_path}")
        return keys[0]

    @staticmethod
    def get_memory_usage() -> dict:
        """Return memory statistics for the current process.

        Returns:
            Dict with ``rss_mb`` (resident set size, MB), ``vms_mb`` (virtual
            memory size, MB), ``percent`` (process share of system memory) and
            ``available_mb`` (system memory still available, MB).
        """
        process = psutil.Process(os.getpid())
        memory_info = process.memory_info()
        return {
            'rss_mb': memory_info.rss / 1024 / 1024,
            'vms_mb': memory_info.vms / 1024 / 1024,
            'percent': process.memory_percent(),
            'available_mb': psutil.virtual_memory().available / 1024 / 1024
        }

    @staticmethod
    def get_data_iterator(file_path: str, chunksize: int = 10000) -> Iterator[pd.DataFrame]:
        """Create a fresh iterator that reads a large HDF5 or CSV file in chunks.

        The iterator is intentionally *not* cached. It is consumed as it is
        read, so a cached instance would come back exhausted on the next call
        (e.g. a Streamlit rerun) and silently yield no data.

        Args:
            file_path: Path to an HDF5 (``.h5``/``.hdf5``) or CSV (``.csv``) file.
            chunksize: Number of rows per chunk.

        Returns:
            Iterator yielding DataFrame chunks.

        Raises:
            ValueError: If the file extension is not supported.
        """
        try:
            kind = MemoryManager._file_kind(file_path)
            if kind == 'hdf5':
                # pandas only supports chunked HDF5 reads for table-format datasets
                return pd.read_hdf(file_path, iterator=True, chunksize=chunksize)
            if kind == 'csv':
                return pd.read_csv(file_path, chunksize=chunksize)
            raise ValueError(f"Unsupported file type: {file_path}")
        except Exception as e:
            logger.error(f"Error creating data iterator: {e}")
            raise

    @staticmethod
    def process_data_in_chunks(
        iterator: Iterator[pd.DataFrame],
        process_func: Callable[[pd.DataFrame], Any],
        combine_func: Optional[Callable[[List[Any]], Any]] = None,
        progress_bar: bool = True,
        total_chunks: Optional[int] = None,
    ) -> Any:
        """Apply ``process_func`` to each chunk and combine the results.

        Args:
            iterator: Iterator of DataFrame chunks.
            process_func: Function applied to each chunk.
            combine_func: Function combining the list of per-chunk results.
                Defaults to ``pd.concat`` when the results are DataFrames,
                otherwise the raw list is returned.
            progress_bar: Show a Streamlit status line (chunk count and memory).
            total_chunks: Expected number of chunks, if known. When given (and
                positive) together with ``progress_bar``, a progress bar is shown
                and advanced. Without a total there is no meaningful fraction,
                so no bar is drawn.

        Returns:
            Combined results from all chunks.
        """
        results: List[Any] = []
        chunk_count = 0
        progress = None
        status_text = None
        if progress_bar:
            if total_chunks is not None and total_chunks > 0:
                progress = st.progress(0)
            status_text = st.empty()

        try:
            for chunk in iterator:
                results.append(process_func(chunk))
                chunk_count += 1

                if status_text is not None:
                    memory_stats = MemoryManager.get_memory_usage()
                    status_text.text(
                        f"Processed {chunk_count} chunks | "
                        f"Memory: {memory_stats['rss_mb']:.1f}MB ({memory_stats['percent']:.1f}%)"
                    )
                if progress is not None:
                    # Clamp: the caller's total may be an estimate
                    progress.progress(min(chunk_count / total_chunks, 1.0))

                # Periodic garbage collection keeps peak memory down between chunks
                if chunk_count % 10 == 0:
                    gc.collect()
        except Exception as e:
            logger.error(f"Error processing chunk {chunk_count}: {e}")
            raise
        finally:
            if progress is not None:
                progress.empty()
            if status_text is not None:
                status_text.empty()

        if combine_func is not None:
            return combine_func(results)
        if results and isinstance(results[0], pd.DataFrame):
            return pd.concat(results, ignore_index=True)
        return results

    @staticmethod
    @st.cache_data(persist="disk", max_entries=10)
    def load_data_subset(
        file_path: str,
        start_row: int = 0,
        num_rows: int = 10000,
        columns: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """Load a contiguous row range from an HDF5 or CSV file (disk-cached).

        Args:
            file_path: Path to an HDF5 or CSV file.
            start_row: Zero-based index of the first data row to load.
            num_rows: Number of rows to load.
            columns: Columns to load. ``None`` or an empty list loads all columns.

        Returns:
            DataFrame holding the requested rows.

        Raises:
            ValueError: If the file type is unsupported or the HDF5 file is empty.
        """
        # Empty selections keep their historical meaning of "all columns"
        columns = list(columns) if columns else None
        try:
            kind = MemoryManager._file_kind(file_path)
            if kind == 'hdf5':
                with pd.HDFStore(file_path, mode='r') as store:
                    key = MemoryManager._first_hdf_key(store, file_path)
                    # start/stop slice rows without materializing the whole dataset
                    return store.select(
                        key,
                        start=start_row,
                        stop=start_row + num_rows,
                        columns=columns
                    )
            if kind == 'csv':
                return pd.read_csv(
                    file_path,
                    # Line 0 is the header and is kept; skip the first `start_row` data rows
                    skiprows=range(1, start_row + 1),
                    nrows=num_rows,
                    usecols=columns
                )
            raise ValueError(f"Unsupported file type: {file_path}")
        except Exception as e:
            logger.error(f"Error loading data subset: {e}")
            raise

    @staticmethod
    def optimize_dataframe_memory(df: pd.DataFrame, deep: bool = True) -> pd.DataFrame:
        """Shrink a DataFrame's memory footprint by downcasting column dtypes.

        * NumPy integer columns are cast to the smallest integer type of the
          same signedness (down to 8 bits) that holds their min and max.
        * NumPy float columns wider than 32 bits are cast to ``float32`` when
          their range fits. ``float16`` is never used: it keeps only about 3
          significant decimal digits and would silently corrupt values such as
          coordinates.
        * ``object`` columns with fewer than 50% unique (non-null) values
          become ``category``.

        Other dtypes are left untouched: bool, datetime, category and pandas
        extension types such as nullable ``Int64``.

        Args:
            df: DataFrame to optimize.
            deep: If True, work on a copy. If False, ``df`` itself is modified
                and returned.

        Returns:
            The memory-optimized DataFrame.
        """
        if deep:
            df = df.copy()

        for col in df.columns:
            col_type = df[col].dtype
            # Extension dtypes (category, nullable Int64, string, ...) are not np.dtype
            if not isinstance(col_type, np.dtype):
                continue

            if col_type.kind in 'iu':
                c_min, c_max = df[col].min(), df[col].max()
                if col_type.kind == 'u':
                    targets = (np.uint8, np.uint16, np.uint32)
                else:
                    targets = (np.int8, np.int16, np.int32)
                for target in targets:
                    if np.dtype(target).itemsize >= col_type.itemsize:
                        break  # already this small; nothing to gain
                    limits = np.iinfo(target)
                    # NaN extremes (empty column) fail the comparison and skip the cast
                    if limits.min <= c_min and c_max <= limits.max:
                        df[col] = df[col].astype(target)
                        break
            elif col_type.kind == 'f' and col_type.itemsize > 4:
                c_min, c_max = df[col].min(), df[col].max()
                limits = np.finfo(np.float32)
                # All-NaN columns and +/-inf values fail these comparisons and stay as-is
                if limits.min <= c_min and c_max <= limits.max:
                    df[col] = df[col].astype(np.float32)

        num_total = len(df)
        if num_total:  # avoids division by zero on empty frames
            for col in df.select_dtypes(include=['object']).columns:
                try:
                    num_unique = df[col].nunique()
                except TypeError:
                    # Unhashable cell values (lists, dicts) cannot become categories
                    continue
                if num_unique / num_total < 0.5:
                    df[col] = df[col].astype('category')

        return df

    @staticmethod
    def estimate_dataframe_memory(df: pd.DataFrame) -> dict:
        """Report the deep memory usage of a DataFrame in MB.

        Returns:
            Dict with three keys:

            * ``total_mb``: columns plus index.
            * ``index_mb``: the index alone.
            * ``columns``: a mapping of column label to MB.

            Measuring columns with ``index=False`` means a real column named
            ``'Index'`` is no longer confused with the index entry.
        """
        column_bytes = df.memory_usage(deep=True, index=False)
        index_bytes = df.index.memory_usage(deep=True)
        total_bytes = column_bytes.sum() + index_bytes
        return {
            'total_mb': total_bytes / 1024 / 1024,
            'index_mb': index_bytes / 1024 / 1024,
            'columns': {
                col: mem / 1024 / 1024
                for col, mem in column_bytes.items()
            }
        }

    @staticmethod
    def clear_memory_cache() -> None:
        """Clear Streamlit's data and resource caches, then force garbage collection."""
        st.cache_data.clear()
        st.cache_resource.clear()
        gc.collect()
        logger.info("Memory cache cleared")

    @staticmethod
    def _count_data_rows(file_path: str, kind: str) -> int:
        """Return the number of data rows (excluding a CSV header) in a supported file."""
        if kind == 'hdf5':
            with pd.HDFStore(file_path, mode='r') as store:
                key = MemoryManager._first_hdf_key(store, file_path)
                return store.get_storer(key).nrows
        # Count lines without loading the file; `with` guarantees the handle is closed
        with open(file_path, encoding='utf-8', errors='replace') as handle:
            return max(sum(1 for _ in handle) - 1, 0)  # subtract header

    @staticmethod
    @st.cache_data
    def sample_large_dataset(
        file_path: str,
        sample_size: int = 10000,
        random_state: int = 42
    ) -> pd.DataFrame:
        """Return a reproducible random sample of rows from a large dataset.

        The sampled rows are read in a single pass. CSV uses a ``skiprows``
        filter; HDF5 uses a row-coordinate selection, which needs a
        table-format store. Previously the file was re-read once per sampled
        row.

        Args:
            file_path: Path to an HDF5 or CSV file.
            sample_size: Number of rows to sample. If it is at least the row
                count, the whole dataset is returned.
            random_state: Seed for the row selection.

        Returns:
            Sampled rows in file order, with a fresh RangeIndex.

        Raises:
            ValueError: If the file type is unsupported, the HDF5 file is empty,
                or ``sample_size`` is not positive.
        """
        try:
            kind = MemoryManager._file_kind(file_path)
            if kind is None:
                raise ValueError(f"Unsupported file type: {file_path}")

            total_rows = MemoryManager._count_data_rows(file_path, kind)
            if sample_size >= total_rows:
                return MemoryManager.load_data_subset(file_path, 0, total_rows)
            if sample_size <= 0:
                raise ValueError(f"sample_size must be positive, got {sample_size}")

            # A private RandomState draws the same indices as seeding the global
            # NumPy RNG would, without mutating global state
            rng = np.random.RandomState(random_state)
            sample_indices = np.sort(rng.choice(total_rows, sample_size, replace=False))

            if kind == 'hdf5':
                with pd.HDFStore(file_path, mode='r') as store:
                    key = MemoryManager._first_hdf_key(store, file_path)
                    sampled = store.select(key, where=sample_indices)
                return sampled.reset_index(drop=True)

            # File line numbers to keep: line 0 is the header, data row i is line i + 1
            wanted_lines = set((sample_indices + 1).tolist())
            return pd.read_csv(
                file_path,
                skiprows=lambda line: line != 0 and line not in wanted_lines
            )
        except Exception as e:
            logger.error(f"Error sampling dataset: {e}")
            raise