Spaces:
Sleeping
Sleeping
| """ | |
| Utility functions and classes for HeartMAP | |
| """ | |
| import hashlib | |
| from pathlib import Path | |
| from typing import Dict, Union | |
| import warnings | |
| try: | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| import pandas as pd | |
| import numpy as np | |
| import scanpy as sc | |
| PLOTTING_AVAILABLE = True | |
| except ImportError: | |
| PLOTTING_AVAILABLE = False | |
| warnings.warn("Plotting dependencies not available") | |
| from ..config import Config | |
class Visualizer:
    """Visualization utilities for HeartMAP.

    Each ``plot_*`` method renders one figure and writes it as a PNG into
    ``save_dir``. When the optional plotting stack could not be imported
    (module-level ``PLOTTING_AVAILABLE`` is false) every method returns
    immediately without touching the filesystem.

    Annotations that name ``Config`` or pandas types are quoted on purpose:
    pandas is imported inside the module's optional-dependency guard, so an
    unquoted ``pd.DataFrame`` annotation would raise ``NameError`` while the
    class is being defined whenever that guarded import failed.
    """

    def __init__(self, config: "Config"):
        """Keep a reference to the configuration object on ``self.config``."""
        self.config = config

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _save_figure(fig, save_dir: Union[str, Path], filename: str) -> None:
        """Write *fig* to ``save_dir / filename`` (300 dpi) and close it.

        The directory is created when missing, and the figure is closed even
        if saving fails so a failed write does not leave a figure open.
        """
        target_dir = Path(save_dir)
        target_dir.mkdir(parents=True, exist_ok=True)
        try:
            fig.savefig(target_dir / filename, dpi=300, bbox_inches='tight')
        finally:
            plt.close(fig)

    @staticmethod
    def _ensure_umap(adata) -> None:
        """Compute ``X_umap`` on *adata* if absent, with its prerequisites.

        PCA and the neighbors graph are only computed when they are not
        already stored on *adata*.
        """
        if 'X_umap' in adata.obsm:
            return
        if 'neighbors' not in adata.uns:
            if 'X_pca' not in adata.obsm:
                sc.tl.pca(adata, svd_solver='arpack')
            # Never request more components than X_pca actually holds.
            n_pcs = min(40, adata.obsm['X_pca'].shape[1])
            sc.pp.neighbors(adata, n_neighbors=15, n_pcs=n_pcs)
        sc.tl.umap(adata)

    @staticmethod
    def _neg_log10_pvals(pvals):
        """Return ``-log10(pvals)`` as a float array with finite values.

        Zero p-values are raised to the smallest positive normal float first,
        because ``-log10(0)`` is ``inf`` and cannot be drawn as a bar width.
        """
        values = np.asarray(pvals, dtype=float)
        return -np.log10(np.clip(values, np.finfo(float).tiny, None))

    @staticmethod
    def _hist_panel(ax, values, xlabel: str, title: str) -> None:
        """Draw a 50-bin histogram of *values* on *ax* with standard labels."""
        ax.hist(values, bins=50, alpha=0.7)
        ax.set_xlabel(xlabel)
        ax.set_ylabel('Number of cells')
        ax.set_title(title)

    @staticmethod
    def _is_missing(frame) -> bool:
        """Return True when *frame* is ``None`` or reports itself as empty."""
        return frame is None or getattr(frame, 'empty', True)

    # ------------------------------------------------------------------
    # Public plotting API
    # ------------------------------------------------------------------
    def plot_qc_metrics(self, adata, save_dir: Path) -> None:
        """Plot quality control metrics to ``qc_metrics.png``.

        Reads ``n_genes`` and ``total_counts`` from ``adata.obs``;
        ``pct_counts_mt`` is plotted only when that column exists.
        """
        if not PLOTTING_AVAILABLE:
            return

        obs = adata.obs
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))

        self._hist_panel(axes[0, 0], obs['n_genes'],
                         'Number of genes', 'Genes per cell')
        self._hist_panel(axes[0, 1], obs['total_counts'],
                         'Total counts', 'UMI per cell')

        if 'pct_counts_mt' in obs.columns:
            self._hist_panel(axes[1, 0], obs['pct_counts_mt'],
                             'Mitochondrial %', 'Mitochondrial gene %')
        else:
            # Hide the unused panel instead of saving an empty frame.
            axes[1, 0].set_visible(False)

        scatter_ax = axes[1, 1]
        scatter_ax.scatter(obs['total_counts'], obs['n_genes'], alpha=0.6, s=1)
        scatter_ax.set_xlabel('Total counts')
        scatter_ax.set_ylabel('Number of genes')
        scatter_ax.set_title('Genes vs UMI')

        fig.tight_layout()
        self._save_figure(fig, save_dir, "qc_metrics.png")

    def plot_communication_heatmap(self, comm_scores: "pd.DataFrame", save_dir: Path) -> None:
        """Plot the cell-cell communication heatmap to ``communication_heatmap.png``.

        *comm_scores* is pivoted from long form using its ``source``,
        ``target`` and ``communication_score`` columns. Nothing is written
        when *comm_scores* is ``None`` or empty.
        """
        if not PLOTTING_AVAILABLE or self._is_missing(comm_scores):
            return

        pivot_df = comm_scores.pivot(
            index='source',
            columns='target',
            values='communication_score',
        )

        fig = plt.figure(figsize=(10, 8))
        ax = fig.add_subplot(1, 1, 1)
        sns.heatmap(pivot_df, annot=True, cmap='viridis', fmt='.3f', ax=ax)
        ax.set_title('Cell-Cell Communication Specificity')
        fig.tight_layout()
        self._save_figure(fig, save_dir, "communication_heatmap.png")

    def plot_hub_scores(self, adata, hub_scores: "pd.Series", save_dir: Path) -> None:
        """Plot communication hub scores on the UMAP to ``hub_scores.png``.

        Side effect: *hub_scores* is stored in ``adata.obs['hub_score']`` and
        a UMAP embedding is computed on *adata* if it has none.
        """
        if not PLOTTING_AVAILABLE:
            return

        adata.obs['hub_score'] = hub_scores
        self._ensure_umap(adata)

        sc.pl.umap(adata, color='hub_score', title='Communication Hub Score', show=False)
        # The scanpy call above is not given an axis, so save the current figure.
        self._save_figure(plt.gcf(), save_dir, "hub_scores.png")

    def plot_pathway_scores(self, pathway_scores: "pd.DataFrame", save_dir: Path) -> None:
        """Plot pathway activity scores to ``pathway_scores.png``.

        Nothing is written when *pathway_scores* is ``None`` or empty.
        """
        if not PLOTTING_AVAILABLE or self._is_missing(pathway_scores):
            return

        fig = plt.figure(figsize=(12, 8))
        ax = fig.add_subplot(1, 1, 1)
        sns.heatmap(pathway_scores, annot=True, cmap='Blues', fmt='.3f', ax=ax)
        ax.set_title('Pathway Activity by Cell Type')
        fig.tight_layout()
        self._save_figure(fig, save_dir, "pathway_scores.png")

    def plot_chamber_composition(self, adata, save_dir: Path) -> None:
        """Plot per-chamber cell counts (bar + pie) to ``chamber_composition.png``.

        Returns without plotting when ``adata.obs`` has no ``chamber`` column.
        """
        if not PLOTTING_AVAILABLE:
            return
        if 'chamber' not in adata.obs.columns:
            return

        chamber_counts = adata.obs['chamber'].value_counts()
        fig, (bar_ax, pie_ax) = plt.subplots(1, 2, figsize=(15, 6))

        chamber_counts.plot(kind='bar', ax=bar_ax)
        bar_ax.set_title('Cell Counts by Chamber')
        bar_ax.set_xlabel('Chamber')
        bar_ax.set_ylabel('Number of Cells')

        pie_ax.pie(chamber_counts.values, labels=chamber_counts.index, autopct='%1.1f%%')
        pie_ax.set_title('Chamber Proportions')

        fig.tight_layout()
        self._save_figure(fig, save_dir, "chamber_composition.png")

    def plot_chamber_markers(self, chamber_markers: Dict, save_dir: Path) -> None:
        """Plot the top 10 markers per chamber to ``chamber_markers.png``.

        *chamber_markers* maps a chamber name to a table with ``names`` and
        ``pvals_adj`` columns. The grid has four panels, so only the first
        four entries are considered; panels without data are hidden.
        """
        if not PLOTTING_AVAILABLE:
            return

        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        axes = axes.flatten()

        # zip() stops after four entries, matching the 2x2 grid.
        panels = list(zip(axes, chamber_markers.items()))
        for spare_ax in axes[len(panels):]:
            spare_ax.set_visible(False)

        for ax, (chamber, markers) in panels:
            if markers is None or markers.empty:
                ax.set_visible(False)
                continue
            top_10 = markers.head(10)
            positions = range(len(top_10))
            ax.barh(positions, self._neg_log10_pvals(top_10['pvals_adj']))
            ax.set_yticks(positions)
            ax.set_yticklabels(top_10['names'])
            ax.set_xlabel('-log10(adjusted p-value)')
            ax.set_title(f'Top Markers - {chamber}')

        fig.tight_layout()
        self._save_figure(fig, save_dir, "chamber_markers.png")

    def plot_cross_chamber_correlations(self, correlations: "pd.DataFrame", save_dir: Path) -> None:
        """Plot cross-chamber correlations to ``cross_chamber_correlations.png``.

        Nothing is written when *correlations* is ``None`` or empty.
        """
        if not PLOTTING_AVAILABLE or self._is_missing(correlations):
            return

        fig = plt.figure(figsize=(8, 6))
        ax = fig.add_subplot(1, 1, 1)
        sns.heatmap(correlations, annot=True, cmap='coolwarm', center=0, fmt='.3f', ax=ax)
        ax.set_title('Cross-Chamber Expression Correlations')
        fig.tight_layout()
        self._save_figure(fig, save_dir, "cross_chamber_correlations.png")

    def create_comprehensive_dashboard(self, adata, results: Dict, save_dir: Path) -> None:
        """Create the multi-panel dashboard ``comprehensive_dashboard.png``.

        Panels are drawn only for the ``adata.obs`` columns that exist
        (``leiden``, ``hub_score``, ``chamber``). A UMAP embedding is computed
        on *adata* if it has none. *results* is accepted for interface
        compatibility and is not read here.
        """
        if not PLOTTING_AVAILABLE:
            return

        self._ensure_umap(adata)
        obs_columns = adata.obs.columns

        fig = plt.figure(figsize=(20, 16))

        # Panel 1: UMAP coloured by cluster
        if 'leiden' in obs_columns:
            cluster_ax = fig.add_subplot(3, 3, 1)
            sc.pl.umap(adata, color='leiden', ax=cluster_ax, show=False, frameon=False)
            cluster_ax.set_title('Cell Type Clusters')

        # Panel 2: UMAP coloured by hub score
        if 'hub_score' in obs_columns:
            hub_ax = fig.add_subplot(3, 3, 2)
            sc.pl.umap(adata, color='hub_score', ax=hub_ax, show=False, frameon=False)
            hub_ax.set_title('Communication Hubs')

        # Panel 3: chamber composition
        if 'chamber' in obs_columns:
            chamber_ax = fig.add_subplot(3, 3, 3)
            chamber_counts = adata.obs['chamber'].value_counts()
            chamber_ax.pie(chamber_counts.values, labels=chamber_counts.index, autopct='%1.1f%%')
            chamber_ax.set_title('Chamber Distribution')

        # Additional panels for other analyses...
        fig.tight_layout()
        self._save_figure(fig, save_dir, "comprehensive_dashboard.png")
class ResultsExporter:
    """Export analysis results as CSV tables and a Markdown report."""

    def __init__(self, config: "Config"):
        """Keep a reference to the configuration object on ``self.config``.

        The annotation is quoted so defining the class does not require
        ``Config`` to be resolvable at that moment.
        """
        self.config = config

    def export_results(self, results: Dict, output_dir: Path) -> None:
        """Write the result tables found in *results* under ``output_dir/tables``.

        Looks inside ``results['results']`` for ``marker_genes`` (written to
        ``marker_genes.csv``) and ``communication_scores`` (written to
        ``communication_scores.csv`` without the index). Entries that are
        missing or ``None`` are skipped. The directories are created as needed.
        """
        tables_dir = Path(output_dir) / "tables"
        tables_dir.mkdir(parents=True, exist_ok=True)

        # Treat a missing or None 'results' entry as "nothing to export".
        payload = results.get('results') or {}

        marker_genes = payload.get('marker_genes')
        if marker_genes is not None:
            marker_genes.to_csv(tables_dir / "marker_genes.csv")

        comm_scores = payload.get('communication_scores')
        if comm_scores is not None:
            comm_scores.to_csv(tables_dir / "communication_scores.csv", index=False)

    def generate_comprehensive_report(self, results: Dict, output_dir: str) -> None:
        """Write ``analysis_report.md`` into *output_dir*.

        The report is built from ``results['adata']`` (its ``n_obs``,
        ``n_vars`` and the ``leiden`` / ``chamber`` columns of ``obs`` when
        present). Nothing is written when ``results`` has no ``adata``.
        The file is always encoded as UTF-8 because the report contains
        non-ASCII characters.
        """
        adata = results.get('adata')
        if adata is None:
            return

        n_cells = adata.n_obs
        n_genes = adata.n_vars

        def percent(count) -> float:
            # Guard the empty-dataset case instead of dividing by zero.
            return 100 * count / n_cells if n_cells else 0.0

        lines = [
            "# HeartMAP Analysis Report",
            "## Dataset Overview",
            f"- **Total Cells**: {n_cells:,}",
            f"- **Total Genes**: {n_genes:,}",
            "## Analysis Components Completed",
            "- ✅ Cell type annotation",
            "- ✅ Cell-cell communication analysis",
            "- ✅ Multi-chamber analysis",
            "## Key Findings",
            "### Cell Type Annotation",
        ]

        if 'leiden' in adata.obs.columns:
            cluster_counts = adata.obs['leiden'].value_counts()
            lines.append(f"- **Number of cell types identified**: {len(cluster_counts)}")
            lines.append("- **Cell type distribution**:")
            # Only the five largest clusters are listed.
            lines.extend(
                f" - Cluster {cluster}: {count:,} cells ({percent(count):.1f}%)"
                for cluster, count in cluster_counts.head(5).items()
            )

        if 'chamber' in adata.obs.columns:
            chamber_counts = adata.obs['chamber'].value_counts()
            lines.append("")
            lines.append("### Chamber Distribution")
            lines.extend(
                f"- **{chamber}**: {count:,} cells ({percent(count):.1f}%)"
                for chamber, count in chamber_counts.items()
            )

        lines.extend([
            "",
            "### Communication Analysis",
            "- Cell-cell communication patterns identified",
            "- Communication hub cells detected",
            "- Pathway activity scores calculated",
            "## Files Generated",
            "- `heartmap_complete.h5ad`: Complete processed dataset",
            "- `heartmap_model.pkl`: Trained HeartMAP model",
            "- `figures/`: All visualization outputs",
            "- `tables/`: Exported data tables",
            "## Next Steps",
            "1. Validate findings with literature",
            "2. Investigate specific cell type interactions",
            "3. Apply model to new datasets",
        ])
        report = "\n".join(lines) + "\n"

        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        with open(output_path / "analysis_report.md", 'w', encoding='utf-8') as f:
            f.write(report)
class ChecksumValidator:
    """Validate data integrity using SHA-256 checksums.

    All methods are static, so they can be called on the class or on an
    instance.
    """

    # Bytes read per iteration while hashing; keeps memory use bounded.
    _CHUNK_SIZE = 65536

    @staticmethod
    def calculate_sha256(file_path: str) -> str:
        """Return the lowercase hex SHA-256 digest of the file at *file_path*.

        The file is read in fixed-size chunks rather than loaded whole.
        """
        digest = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(ChecksumValidator._CHUNK_SIZE), b""):
                digest.update(chunk)
        return digest.hexdigest()

    @staticmethod
    def verify_file(file_path: str, expected_checksum: str) -> bool:
        """Return True when the file's SHA-256 matches *expected_checksum*.

        The expected value is compared case-insensitively and with
        surrounding whitespace ignored, so upper-case digests or digests
        read from a file with a trailing newline still match.
        """
        actual_checksum = ChecksumValidator.calculate_sha256(file_path)
        return actual_checksum == expected_checksum.strip().lower()

    @staticmethod
    def generate_checksums(directory: Union[str, Path], output_file: str) -> None:
        """Write a checksum manifest for every file under *directory*.

        Each line of *output_file* is ``"<sha256> <path relative to
        directory>"``. Files are listed in sorted path order so repeated runs
        on the same tree give the same manifest. If *output_file* itself
        lives under *directory* it is left out, because its recorded hash
        would be stale as soon as the manifest is rewritten.
        """
        directory = Path(directory)
        manifest_path = Path(output_file).resolve()

        checksums = {}
        for file_path in sorted(directory.rglob('*')):
            if not file_path.is_file():
                continue
            if file_path.resolve() == manifest_path:
                continue
            rel_path = file_path.relative_to(directory)
            checksums[str(rel_path)] = ChecksumValidator.calculate_sha256(str(file_path))

        # UTF-8 so file names outside the locale's code page can be written.
        with open(output_file, 'w', encoding='utf-8') as f:
            f.writelines(
                f"{checksum} {rel_path_str}\n"
                for rel_path_str, checksum in checksums.items()
            )
def setup_logging(level: str = "INFO") -> None:
    """Configure root logging to the console and to ``heartmap.log``.

    Args:
        level: Name of a ``logging`` level (case-insensitive), e.g. ``"INFO"``.

    Raises:
        AttributeError: If *level* does not name an attribute of ``logging``.

    If the root logger already has handlers this function leaves the
    configuration untouched. ``logging.basicConfig`` would ignore the call in
    that situation anyway, so returning early avoids constructing a
    ``FileHandler`` that opens ``heartmap.log`` and is then never attached.
    """
    import logging

    # Resolve the level first so an unknown name fails the same way whether
    # or not logging has already been configured.
    resolved_level = getattr(logging, level.upper())

    if logging.getLogger().handlers:
        return

    logging.basicConfig(
        level=resolved_level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler('heartmap.log'),
        ],
    )
def create_logger(name: str):
    """Return the ``logging`` logger registered under *name*."""
    from logging import getLogger

    return getLogger(name)
# Public API of this module: the names exported by ``from ... import *``.
__all__ = ["Visualizer", "ResultsExporter", "ChecksumValidator", "setup_logging", "create_logger"]