Tumo505's picture
update LR database
b101933
"""
Data processing utilities for HeartMAP
"""
import os
import hashlib
from pathlib import Path
from typing import Tuple, List, Union
import warnings
import scanpy as sc
import numpy as np
import anndata as ad
from scipy.sparse import issparse
from ..config import Config
class DataValidator:
"""Validate data integrity and format"""
@staticmethod
def verify_checksum(file_path: str, expected_checksum: str) -> bool:
"""Verify file checksum"""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest() == expected_checksum
@staticmethod
def validate_anndata(adata: ad.AnnData, check_qc_metrics: bool = True) -> Tuple[bool, List[str]]:
"""Validate AnnData object structure"""
issues = []
if adata.n_obs == 0:
issues.append("No cells in dataset")
if adata.n_vars == 0:
issues.append("No genes in dataset")
# Check for QC metrics only if requested (after they should be calculated)
if check_qc_metrics:
# scanpy creates these standard QC metric columns
required_obs = ['n_genes_by_counts', 'total_counts']
for col in required_obs:
if col not in adata.obs.columns:
issues.append(f"Missing required obs column: {col}")
# Check for NaN/inf values
if issparse(adata.X):
if not np.isfinite(adata.X.data).all():
issues.append("Non-finite values in X matrix")
else:
if not np.isfinite(adata.X).all():
issues.append("Non-finite values in X matrix")
return len(issues) == 0, issues
class DataLoader:
"""Load and preprocess data"""
def __init__(self, config: Config):
self.config = config
def load_raw_data(
self, file_path: Union[str, Path], verify_integrity: bool = True
) -> ad.AnnData:
"""Load raw single-cell data"""
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"Data file not found: {file_path}")
# Load data based on file format
if file_path.suffix == '.h5ad':
adata = sc.read_h5ad(file_path)
elif file_path.suffix == '.h5':
adata = sc.read_10x_h5(file_path, genome=None, gex_only=True)
elif file_path.suffix == '.csv':
adata = sc.read_csv(file_path).T # Transpose to have genes as variables
else:
raise ValueError(f"Unsupported file format: {file_path.suffix}")
# Validate data (skip QC metrics check for raw data)
is_valid, issues = DataValidator.validate_anndata(adata, check_qc_metrics=False)
if not is_valid:
warnings.warn(f"Data validation issues: {'; '.join(issues)}")
return adata
def preprocess_basic(self, adata: ad.AnnData) -> ad.AnnData:
"""Basic preprocessing pipeline"""
adata = adata.copy()
# Make gene names unique
adata.var_names_make_unique()
# Store raw data
adata.raw = adata
# Basic filtering
sc.pp.filter_cells(adata, min_genes=self.config.data.min_genes)
sc.pp.filter_genes(adata, min_cells=self.config.data.min_cells)
return adata
def calculate_qc_metrics(self, adata: ad.AnnData) -> ad.AnnData:
"""Calculate quality control metrics"""
adata = adata.copy()
# Mitochondrial genes
adata.var['mt'] = adata.var_names.str.startswith('MT-')
# Ribosomal genes
adata.var['ribo'] = adata.var_names.str.startswith(('RPS', 'RPL'))
# Hemoglobin genes
adata.var['hb'] = adata.var_names.str.contains('^HB[^(P)]')
# Calculate QC metrics
sc.pp.calculate_qc_metrics(
adata,
percent_top=None,
log1p=False,
inplace=True
)
sc.pp.calculate_qc_metrics(
adata,
qc_vars=['mt', 'ribo', 'hb'],
percent_top=None,
log1p=False,
inplace=True
)
return adata
def scale_for_memory(self, adata: ad.AnnData) -> ad.AnnData:
"""Scale dataset for memory constraints"""
if self.config.data.max_cells_subset and adata.n_obs > self.config.data.max_cells_subset:
np.random.seed(self.config.data.random_seed)
cell_indices = np.random.choice(
adata.n_obs,
size=self.config.data.max_cells_subset,
replace=False
)
adata = adata[cell_indices].copy()
if self.config.data.max_genes_subset and adata.n_vars > self.config.data.max_genes_subset:
# Select most variable genes
if issparse(adata.X):
# For sparse matrices, convert to dense temporarily for variance calculation
dense_subset = adata.X[:min(1000, adata.n_obs), :].toarray()
gene_vars = np.var(dense_subset, axis=0)
else:
gene_vars = np.var(adata.X, axis=0)
top_gene_indices = np.argsort(gene_vars)[-self.config.data.max_genes_subset:]
adata = adata[:, top_gene_indices].copy()
return adata
def normalize_and_scale(self, adata: ad.AnnData) -> ad.AnnData:
"""Normalize and scale data"""
adata = adata.copy()
# Clean data - remove infinite values
if issparse(adata.X):
adata.X.data = np.nan_to_num(adata.X.data, nan=0, posinf=0, neginf=0)
else:
adata.X = np.nan_to_num(adata.X, nan=0, posinf=0, neginf=0)
# Normalize to target sum
sc.pp.normalize_total(adata, target_sum=self.config.data.target_sum)
# Log transform
sc.pp.log1p(adata)
# Sanitize after log1p (can create NaNs/Inf from edge cases)
if issparse(adata.X):
adata.X.data = np.nan_to_num(adata.X.data, nan=0, posinf=0, neginf=0)
else:
adata.X = np.nan_to_num(adata.X, nan=0, posinf=0, neginf=0)
return adata
def preprocess(self, adata: ad.AnnData) -> ad.AnnData:
"""Complete preprocessing pipeline (convenience method)"""
adata = self.preprocess_basic(adata)
adata = self.scale_for_memory(adata)
adata = self.normalize_and_scale(adata)
return adata
class DataProcessor:
"""Main data processing class"""
def __init__(self, config: Config):
self.config = config
self.loader = DataLoader(config)
@staticmethod
def _sanitize_before_pca(adata: ad.AnnData) -> ad.AnnData:
"""Ensure finite values and remove empty genes/cells before PCA."""
adata = adata.copy()
# Replace NaN/Inf with zeros
if issparse(adata.X):
import numpy as _np
data = adata.X.data
if data.size:
adata.X.data = _np.nan_to_num(data, nan=0, posinf=0, neginf=0)
else:
adata.X = np.nan_to_num(adata.X, nan=0, posinf=0, neginf=0)
# Drop all-zero genes/cells to avoid zero-variance issues
try:
sc.pp.filter_genes(adata, min_counts=1)
sc.pp.filter_cells(adata, min_counts=1)
except Exception:
pass
return adata
def process_from_raw(self, file_path: str, save_intermediate: bool = True) -> ad.AnnData:
"""Complete processing pipeline from raw data"""
# Ensure processed data directory exists
if save_intermediate:
os.makedirs(self.config.paths.processed_data_dir, exist_ok=True)
# Load raw data
adata = self.loader.load_raw_data(file_path)
# Basic preprocessing
adata = self.loader.preprocess_basic(adata)
if save_intermediate:
adata.write(os.path.join(
self.config.paths.processed_data_dir,
"preprocessed.h5ad"
))
# Calculate QC metrics
adata = self.loader.calculate_qc_metrics(adata)
# Validate data with QC metrics
is_valid, issues = DataValidator.validate_anndata(adata, check_qc_metrics=True)
if not is_valid:
warnings.warn(f"Data validation issues after QC calculation: {'; '.join(issues)}")
if save_intermediate:
adata.write(os.path.join(
self.config.paths.processed_data_dir,
"qc_calculated.h5ad"
))
# Scale for memory if needed
if (self.config.data.max_cells_subset or
self.config.data.max_genes_subset):
adata = self.loader.scale_for_memory(adata)
if save_intermediate:
adata.write(os.path.join(
self.config.paths.processed_data_dir,
"scaled.h5ad"
))
# Normalize and scale
adata = self.loader.normalize_and_scale(adata)
if save_intermediate:
adata.write(os.path.join(
self.config.paths.processed_data_dir,
"normalized.h5ad"
))
# Final sanitization before PCA (handles web deployment NaNs)
adata = self._sanitize_before_pca(adata)
# Compute PCA for dimensionality reduction
sc.tl.pca(adata, svd_solver='arpack')
# Compute neighborhood graph (required for clustering)
sc.pp.neighbors(adata, n_neighbors=15, n_pcs=40)
if save_intermediate:
adata.write(os.path.join(
self.config.paths.processed_data_dir,
"processed_with_neighbors.h5ad"
))
return adata
def create_test_dataset(self, adata: ad.AnnData, n_cells: int = 1000) -> ad.AnnData:
"""Create small test dataset"""
np.random.seed(self.config.data.random_seed)
n_cells = min(n_cells, adata.n_obs)
cell_indices = np.random.choice(adata.n_obs, size=n_cells, replace=False)
return adata[cell_indices].copy()
# Import ligand-receptor database module
try:
from .lr_database import get_ligand_receptor_pairs, LigandReceptorDatabase
LR_DATABASE_AVAILABLE = True
except ImportError:
LR_DATABASE_AVAILABLE = False
warnings.warn("Ligand-receptor database module not available. Install liana for full functionality.")
# Export data processing classes
__all__ = [
'DataValidator',
'DataLoader',
'DataProcessor',
'get_ligand_receptor_pairs',
'LigandReceptorDatabase',
'LR_DATABASE_AVAILABLE'
]