Tumo505's picture
Fix: UMAP computation and NaN handling
18a6dd5
"""
Utility functions and classes for HeartMAP
"""
import hashlib
from pathlib import Path
from typing import Dict, Union
import warnings
try:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import scanpy as sc
PLOTTING_AVAILABLE = True
except ImportError:
PLOTTING_AVAILABLE = False
warnings.warn("Plotting dependencies not available")
from ..config import Config
class Visualizer:
"""Visualization utilities for HeartMAP"""
def __init__(self, config: Config):
self.config = config
def plot_qc_metrics(self, adata, save_dir: Path) -> None:
"""Plot quality control metrics"""
if not PLOTTING_AVAILABLE:
return
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# Number of genes
axes[0, 0].hist(adata.obs['n_genes'], bins=50, alpha=0.7)
axes[0, 0].set_xlabel('Number of genes')
axes[0, 0].set_ylabel('Number of cells')
axes[0, 0].set_title('Genes per cell')
# Total counts
axes[0, 1].hist(adata.obs['total_counts'], bins=50, alpha=0.7)
axes[0, 1].set_xlabel('Total counts')
axes[0, 1].set_ylabel('Number of cells')
axes[0, 1].set_title('UMI per cell')
# Mitochondrial percentage
if 'pct_counts_mt' in adata.obs.columns:
axes[1, 0].hist(adata.obs['pct_counts_mt'], bins=50, alpha=0.7)
axes[1, 0].set_xlabel('Mitochondrial %')
axes[1, 0].set_ylabel('Number of cells')
axes[1, 0].set_title('Mitochondrial gene %')
# Scatter plot
axes[1, 1].scatter(adata.obs['total_counts'], adata.obs['n_genes'], alpha=0.6, s=1)
axes[1, 1].set_xlabel('Total counts')
axes[1, 1].set_ylabel('Number of genes')
axes[1, 1].set_title('Genes vs UMI')
plt.tight_layout()
plt.savefig(save_dir / "qc_metrics.png", dpi=300, bbox_inches='tight')
plt.close()
def plot_communication_heatmap(self, comm_scores: pd.DataFrame, save_dir: Path) -> None:
"""Plot cell-cell communication heatmap"""
if not PLOTTING_AVAILABLE:
return
# Create pivot table
pivot_df = comm_scores.pivot(
index='source',
columns='target',
values='communication_score'
)
plt.figure(figsize=(10, 8))
sns.heatmap(pivot_df, annot=True, cmap='viridis', fmt='.3f')
plt.title('Cell-Cell Communication Specificity')
plt.tight_layout()
plt.savefig(save_dir / "communication_heatmap.png", dpi=300, bbox_inches='tight')
plt.close()
def plot_hub_scores(self, adata, hub_scores: pd.Series, save_dir: Path) -> None:
"""Plot communication hub scores"""
if not PLOTTING_AVAILABLE:
return
# Add hub scores to adata for plotting
adata.obs['hub_score'] = hub_scores
# Ensure UMAP exists
if 'X_umap' not in adata.obsm:
import scanpy as sc
sc.tl.umap(adata)
sc.pl.umap(adata, color='hub_score', title='Communication Hub Score', show=False)
plt.savefig(save_dir / "hub_scores.png", dpi=300, bbox_inches='tight')
plt.close()
def plot_pathway_scores(self, pathway_scores: pd.DataFrame, save_dir: Path) -> None:
"""Plot pathway activity scores"""
if not PLOTTING_AVAILABLE or pathway_scores.empty:
return
plt.figure(figsize=(12, 8))
sns.heatmap(pathway_scores, annot=True, cmap='Blues', fmt='.3f')
plt.title('Pathway Activity by Cell Type')
plt.tight_layout()
plt.savefig(save_dir / "pathway_scores.png", dpi=300, bbox_inches='tight')
plt.close()
def plot_chamber_composition(self, adata, save_dir: Path) -> None:
"""Plot chamber composition"""
if not PLOTTING_AVAILABLE:
return
if 'chamber' not in adata.obs.columns:
return
chamber_counts = adata.obs['chamber'].value_counts()
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# Bar plot
chamber_counts.plot(kind='bar', ax=ax1)
ax1.set_title('Cell Counts by Chamber')
ax1.set_xlabel('Chamber')
ax1.set_ylabel('Number of Cells')
# Pie chart
ax2.pie(chamber_counts.values, labels=chamber_counts.index, autopct='%1.1f%%')
ax2.set_title('Chamber Proportions')
plt.tight_layout()
plt.savefig(save_dir / "chamber_composition.png", dpi=300, bbox_inches='tight')
plt.close()
def plot_chamber_markers(self, chamber_markers: Dict, save_dir: Path) -> None:
"""Plot chamber-specific markers"""
if not PLOTTING_AVAILABLE:
return
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
axes = axes.flatten()
for i, (chamber, markers) in enumerate(chamber_markers.items()):
if i < 4 and not markers.empty:
top_10 = markers.head(10)
axes[i].barh(range(len(top_10)), -np.log10(top_10['pvals_adj']))
axes[i].set_yticks(range(len(top_10)))
axes[i].set_yticklabels(top_10['names'])
axes[i].set_xlabel('-log10(adjusted p-value)')
axes[i].set_title(f'Top Markers - {chamber}')
plt.tight_layout()
plt.savefig(save_dir / "chamber_markers.png", dpi=300, bbox_inches='tight')
plt.close()
def plot_cross_chamber_correlations(self, correlations: pd.DataFrame, save_dir: Path) -> None:
"""Plot cross-chamber correlations"""
if not PLOTTING_AVAILABLE:
return
if correlations is None or getattr(correlations, 'empty', True):
return
plt.figure(figsize=(8, 6))
sns.heatmap(correlations, annot=True, cmap='coolwarm', center=0, fmt='.3f')
plt.title('Cross-Chamber Expression Correlations')
plt.tight_layout()
plt.savefig(save_dir / "cross_chamber_correlations.png", dpi=300, bbox_inches='tight')
plt.close()
def create_comprehensive_dashboard(self, adata, results: Dict, save_dir: Path) -> None:
"""Create comprehensive analysis dashboard"""
if not PLOTTING_AVAILABLE:
return
# Ensure UMAP is computed
if 'X_umap' not in adata.obsm:
if 'neighbors' not in adata.uns:
if 'X_pca' not in adata.obsm:
sc.tl.pca(adata, svd_solver='arpack')
sc.pp.neighbors(adata, n_neighbors=15, n_pcs=40)
sc.tl.umap(adata)
# Create a large multi-panel figure
plt.figure(figsize=(20, 16))
# Panel 1: UMAP with clusters
ax1 = plt.subplot(3, 3, 1)
sc.pl.umap(adata, color='leiden', ax=ax1, show=False, frameon=False)
ax1.set_title('Cell Type Clusters')
# Panel 2: UMAP with hub scores
ax2 = plt.subplot(3, 3, 2)
sc.pl.umap(adata, color='hub_score', ax=ax2, show=False, frameon=False)
ax2.set_title('Communication Hubs')
# Panel 3: Chamber composition (if available)
if 'chamber' in adata.obs.columns:
ax3 = plt.subplot(3, 3, 3)
chamber_counts = adata.obs['chamber'].value_counts()
ax3.pie(chamber_counts.values, labels=chamber_counts.index, autopct='%1.1f%%')
ax3.set_title('Chamber Distribution')
# Additional panels for other analyses...
plt.tight_layout()
plt.savefig(save_dir / "comprehensive_dashboard.png", dpi=300, bbox_inches='tight')
plt.close()
class ResultsExporter:
"""Export analysis results in various formats"""
def __init__(self, config: Config):
self.config = config
def export_results(self, results: Dict, output_dir: Path) -> None:
"""Export all results to files"""
output_dir.mkdir(parents=True, exist_ok=True)
# Export tables
tables_dir = output_dir / "tables"
tables_dir.mkdir(exist_ok=True)
# Export specific result types
if 'results' in results and 'marker_genes' in results['results']:
marker_genes = results['results']['marker_genes']
if marker_genes is not None:
marker_genes.to_csv(tables_dir / "marker_genes.csv")
if 'results' in results and 'communication_scores' in results['results']:
comm_scores = results['results']['communication_scores']
if comm_scores is not None:
comm_scores.to_csv(tables_dir / "communication_scores.csv", index=False)
def generate_comprehensive_report(self, results: Dict, output_dir: str) -> None:
"""Generate comprehensive analysis report"""
output_path = Path(output_dir)
# Get basic statistics
adata = results.get('adata')
if adata is None:
return
n_cells = adata.n_obs
n_genes = adata.n_vars
# Generate report
report = f"""# HeartMAP Analysis Report
## Dataset Overview
- **Total Cells**: {n_cells:,}
- **Total Genes**: {n_genes:,}
## Analysis Components Completed
- ✅ Cell type annotation
- ✅ Cell-cell communication analysis
- ✅ Multi-chamber analysis
## Key Findings
### Cell Type Annotation
"""
if 'leiden' in adata.obs.columns:
cluster_counts = adata.obs['leiden'].value_counts()
report += f"- **Number of cell types identified**: {len(cluster_counts)}\n"
report += "- **Cell type distribution**:\n"
for cluster, count in cluster_counts.head(5).items():
pct = 100 * count / n_cells
report += f" - Cluster {cluster}: {count:,} cells ({pct:.1f}%)\n"
if 'chamber' in adata.obs.columns:
chamber_counts = adata.obs['chamber'].value_counts()
report += "\n### Chamber Distribution\n"
for chamber, count in chamber_counts.items():
pct = 100 * count / n_cells
report += f"- **{chamber}**: {count:,} cells ({pct:.1f}%)\n"
report += """
### Communication Analysis
- Cell-cell communication patterns identified
- Communication hub cells detected
- Pathway activity scores calculated
## Files Generated
- `heartmap_complete.h5ad`: Complete processed dataset
- `heartmap_model.pkl`: Trained HeartMAP model
- `figures/`: All visualization outputs
- `tables/`: Exported data tables
## Next Steps
1. Validate findings with literature
2. Investigate specific cell type interactions
3. Apply model to new datasets
"""
# Save report
with open(output_path / "analysis_report.md", 'w') as f:
f.write(report)
class ChecksumValidator:
"""Validate data integrity using checksums"""
@staticmethod
def calculate_sha256(file_path: str) -> str:
"""Calculate SHA-256 checksum of file"""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
@staticmethod
def verify_file(file_path: str, expected_checksum: str) -> bool:
"""Verify file against expected checksum"""
actual_checksum = ChecksumValidator.calculate_sha256(file_path)
return actual_checksum == expected_checksum
@staticmethod
def generate_checksums(directory: Union[str, Path], output_file: str) -> None:
"""Generate checksums for all files in directory"""
directory = Path(directory)
checksums = {}
for file_path in directory.rglob('*'):
if file_path.is_file():
rel_path = file_path.relative_to(directory)
checksum = ChecksumValidator.calculate_sha256(str(file_path))
checksums[str(rel_path)] = checksum
# Save checksums
with open(output_file, 'w') as f:
for rel_path_str, checksum in checksums.items():
f.write(f"{checksum} {rel_path_str}\n")
def setup_logging(level: str = "INFO") -> None:
"""Setup logging configuration"""
import logging
logging.basicConfig(
level=getattr(logging, level.upper()),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('heartmap.log')
]
)
def create_logger(name: str):
"""Create a logger with the given name"""
import logging
return logging.getLogger(name)
# Export utility classes and functions
__all__ = [
'Visualizer',
'ResultsExporter',
'ChecksumValidator',
'setup_logging',
'create_logger'
]