#!/usr/bin/env python
"""
HeartMAP Web Interface - Gradio app for Hugging Face Spaces deployment
Comprehensive chamber-specific cardiac analysis platform
"""
import gradio as gr
import tempfile
import sys
import shutil
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
from pathlib import Path
from typing import Tuple, List, Dict
# Add src to path
sys.path.insert(0, 'src')
# Optional heavy dependencies: the rest of this module checks HEARTMAP_AVAILABLE
# and LR_DATABASE_AVAILABLE before touching anything imported here.
try:
    import scanpy as sc
    import anndata as ad
    from heartmap import Config
    from heartmap.pipelines import (
        BasicPipeline,
        ComprehensivePipeline,
        MultiChamberPipeline,
        AdvancedCommunicationPipeline
    )
    from heartmap.data.lr_database import get_ligand_receptor_pairs, LigandReceptorDatabase
    LR_DATABASE_AVAILABLE = True
    HEARTMAP_AVAILABLE = True
except ImportError as e:
    print(f"ImportError: {e}")
    import traceback
    traceback.print_exc()
    # Both flags must exist on the failure path too: create_communication_network
    # reads LR_DATABASE_AVAILABLE and would otherwise hit a NameError.
    LR_DATABASE_AVAILABLE = False
    HEARTMAP_AVAILABLE = False
def _resolve_upload_path(uploaded_file):
    """Return the filesystem path behind *uploaded_file* as a string.

    Accepts a path string, an ``os.PathLike`` object, or any object exposing
    its path through a ``name`` attribute; anything else goes through ``str()``.
    """
    import os

    if isinstance(uploaded_file, (str, os.PathLike)):
        return os.fspath(uploaded_file)
    return str(getattr(uploaded_file, 'name', str(uploaded_file)))


def _gunzip_file(src, dst):
    """Stream-decompress gzip file *src* into *dst* in chunks."""
    import gzip
    import shutil

    with gzip.open(src, 'rb') as f_in, open(dst, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)


def _ensure_gunzipped(path, skip_existing=False):
    """Return an uncompressed sibling of *path*, decompressing ``.gz`` files on demand."""
    if not path.name.endswith('.gz'):
        return path
    target = path.parent / path.name[:-3]  # Remove .gz
    if skip_existing and target.exists():
        return target
    print(f" Decompressing {path.name}...")
    _gunzip_file(path, target)
    print(f" ✓ Decompressed to {target.name}")
    return target


def _copy_matching_compression(src, dst):
    """Copy *src* to *dst*, (de)compressing so the result matches *dst*'s ``.gz`` suffix."""
    import gzip
    import shutil

    if src == dst:
        return
    src_gz = src.name.endswith('.gz')
    dst_gz = dst.name.endswith('.gz')
    if src_gz == dst_gz:
        shutil.copy(src, dst)
    elif src_gz:
        print(f" Extracting {src.name} → {dst.name}")
        _gunzip_file(src, dst)
    else:
        print(f" Compressing {src.name} → {dst.name}")
        with open(src, 'rb') as f_in, gzip.open(dst, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)


def _extract_tar_members(tar, dest_dir):
    """Extract *tar* into *dest_dir*, refusing members that would land outside it."""
    import os

    try:
        # tarfile's 'data' extraction filter rejects absolute paths, '..'
        # traversal and special files.
        tar.extractall(dest_dir, filter='data')
        return
    except TypeError:
        # This interpreter predates the ``filter`` argument; vet paths manually.
        pass
    root = os.path.realpath(dest_dir)
    safe_members = []
    for member in tar.getmembers():
        target = os.path.realpath(os.path.join(root, member.name))
        if target == root or target.startswith(root + os.sep):
            safe_members.append(member)
        else:
            print(f" ⚠ Skipping unsafe archive member: {member.name}")
    tar.extractall(dest_dir, members=safe_members)


def _list_10x_parts(directory):
    """Split the files of *directory* into (all, matrix, gene/feature, barcode) lists.

    Matching is by case-insensitive substring, so prefixed and gzipped names
    count. Lists are sorted so "first sample" is deterministic.
    """
    files = sorted(f for f in directory.iterdir() if f.is_file())
    matrices = [f for f in files if 'matrix.mtx' in f.name.lower()]
    features = [f for f in files
                if 'genes.tsv' in f.name.lower() or 'features.tsv' in f.name.lower()]
    barcodes = [f for f in files if 'barcodes.tsv' in f.name.lower()]
    return files, matrices, features, barcodes


def _find_10x_directories(data_files, temp_root):
    """Return the unique directories holding a matrix.mtx plus gene and barcode files.

    *data_files* comes from a recursive scan, so nested Cell Ranger layouts
    (e.g. ``outs/filtered_feature_bc_matrix``) are already covered here.
    """
    mtx_files_found = [f for f in data_files if 'matrix.mtx' in f.name.lower()]
    print(f" Found {len(mtx_files_found)} matrix.mtx files")
    valid_dirs = []
    seen_dirs = set()
    for mtx_file in mtx_files_found:
        mtx_dir = mtx_file.parent
        if mtx_dir in seen_dirs:
            continue
        seen_dirs.add(mtx_dir)
        _, _, gene_files, barcode_files = _list_10x_parts(mtx_dir)
        if gene_files and barcode_files:
            valid_dirs.append(mtx_dir)
            print(f" ✓ Valid 10x directory found: {mtx_dir.relative_to(temp_root)}")
    print(f" Total unique valid directories: {len(valid_dirs)}")
    return valid_dirs


def _load_first_readable_h5(h5_files):
    """Try each ``.h5`` file as 10X, then as AnnData; return the first that loads or None."""
    print(f"✓ Found {len(h5_files)} .h5 file(s). Loading first one...")
    for h5_file in h5_files:
        file_to_load = _ensure_gunzipped(h5_file)
        try:
            print(f" Trying to load {file_to_load.name} as 10X format...")
            adata = sc.read_10x_h5(str(file_to_load))
            print(" ✓ Successfully loaded as 10X format")
            return adata
        except Exception as e1:
            print(f" ⚠ Not 10X format: {str(e1)[:100]}")
        try:
            print(f" Trying to load {file_to_load.name} as AnnData format...")
            adata = sc.read_h5ad(str(file_to_load))
            print(" ✓ Successfully loaded as AnnData format")
            return adata
        except Exception as e2:
            print(f" ⚠ Not AnnData format: {str(e2)[:100]}")
    print(" ⚠ No .h5 files could be loaded, trying other formats...")
    return None


def _stage_prefixed_sample(mtx_dir, mtx_files, gene_files, barcode_files):
    """Copy the first prefixed sample to standard 10x file names; return False if incomplete.

    NOTE(review): the target layout follows my understanding of scanpy's
    ``read_10x_mtx`` -- uncompressed matrix.mtx/genes.tsv/barcodes.tsv for the
    legacy format, gzipped matrix.mtx.gz/features.tsv.gz/barcodes.tsv.gz
    otherwise. Confirm against the installed scanpy version.
    """
    first_mtx = mtx_files[0]
    print(f" ⚠ Non-standard naming detected ({len(mtx_files)} samples with prefixes)")
    print(f" This archive contains multiple samples. Loading first sample: {first_mtx.stem}")
    # e.g. 'GSM4307515_N-1-LVP' from 'GSM4307515_N-1-LVP_matrix.mtx.gz'
    sample_prefix = first_mtx.name.replace('_matrix.mtx.gz', '').replace('_matrix.mtx', '')
    matching_features = [f for f in gene_files if sample_prefix in f.name]
    matching_barcodes = [f for f in barcode_files if sample_prefix in f.name]
    if not matching_features or not matching_barcodes:
        print(f" ⚠ Could not find matching features/barcodes for {sample_prefix}")
        return False
    features_src = matching_features[0]
    if 'genes.tsv' in features_src.name.lower():
        standard_names = ('matrix.mtx', 'genes.tsv', 'barcodes.tsv')
    else:
        standard_names = ('matrix.mtx.gz', 'features.tsv.gz', 'barcodes.tsv.gz')
    sources = (first_mtx, features_src, matching_barcodes[0])
    for src, dst_name in zip(sources, standard_names):
        _copy_matching_compression(src, mtx_dir / dst_name)
    print(f" ✓ Created standard 10x structure for sample: {sample_prefix}")
    return True


def _load_first_10x_directory(mtx_dirs, temp_root):
    """Load the first directory in *mtx_dirs* that ``sc.read_10x_mtx`` accepts, else None."""
    print(f"✓ Found {len(mtx_dirs)} matrix.mtx directories. Loading first valid one...")
    for idx, mtx_dir in enumerate(mtx_dirs):
        try:
            print(f" Attempting to load from: {mtx_dir.relative_to(temp_root) if mtx_dir != temp_root else 'root'}")
            all_files, mtx_files, gene_files, barcode_files = _list_10x_parts(mtx_dir)
            print(f" Files in directory: {[f.name for f in all_files[:10]]}")  # Show first 10
            print(f" Matrix files: {[f.name for f in mtx_files]}")
            print(f" Gene/feature files: {[f.name for f in gene_files]}")
            print(f" Barcode files: {[f.name for f in barcode_files]}")
            if not mtx_files or not gene_files or not barcode_files:
                print(" ⚠ Missing required files, skipping...")
                continue
            has_standard_names = any(
                f.name.lower() in ('matrix.mtx', 'matrix.mtx.gz') for f in mtx_files
            )
            if has_standard_names:
                # Standard naming - decompress .gz files if present
                for part in mtx_files + gene_files + barcode_files:
                    _ensure_gunzipped(part, skip_existing=True)
            elif not _stage_prefixed_sample(mtx_dir, mtx_files, gene_files, barcode_files):
                continue
            adata = sc.read_10x_mtx(str(mtx_dir))
            print(f" ✓ Successfully loaded MTX format from directory {idx+1}/{len(mtx_dirs)}")
            return adata
        except Exception as mtx_err:
            print(f" ⚠ Failed to load from directory {idx+1}: {str(mtx_err)[:200]}")
            if idx < len(mtx_dirs) - 1:
                print(" Trying next directory...")
    print(f" ⚠ Could not load any of the {len(mtx_dirs)} matrix directories")
    return None


def _load_first_text_matrix(data_files):
    """Return the first text file in *data_files* that ``sc.read_csv`` can parse, else None."""
    print("Searching for readable text files...")
    text_suffixes = ('.txt', '.csv', '.tsv', '.txt.gz', '.csv.gz', '.tsv.gz')
    companions = ('barcodes.tsv', 'genes.tsv', 'features.tsv')
    for ext_file in data_files:
        name_lower = ext_file.name.lower()
        # Match on the full name: Path.suffix is only the last component, so
        # '.txt.gz' would never be equal to it.
        if not name_lower.endswith(text_suffixes):
            continue
        # 10x companion tables are not expression matrices.
        if any(part in name_lower for part in companions):
            continue
        try:
            print(f" Attempting to read as expression matrix: {ext_file.name}")
            return sc.read_csv(str(ext_file), delimiter='\t' if 'tsv' in name_lower else ',')
        except Exception as read_err:
            print(f"⚠ Failed to read {ext_file.name}: {str(read_err)}")
    return None


def _load_from_tar_archive(archive_path):
    """Extract a TAR archive to a temp directory and load the first usable dataset.

    Tries, in order: ``.h5ad`` files, ``.h5`` files, 10x matrix directories,
    then plain text matrices.

    Returns:
        ``(adata, None)`` on success, or ``(None, error_message)`` when nothing
        in the archive could be loaded. The temp directory is always removed.
    """
    import shutil
    import tarfile
    import tempfile
    from pathlib import Path

    print("Detected TAR archive (GEO format). Extracting files...")
    temp_dir = tempfile.mkdtemp()
    try:
        with tarfile.open(archive_path, 'r:*') as tar:
            _extract_tar_members(tar, temp_dir)
            print(f"✓ Extracted {len(tar.getmembers())} files")

        temp_root = Path(temp_dir)
        extracted_items = list(temp_root.rglob('*'))
        data_files = [p for p in extracted_items if p.is_file()]
        print(f" Analyzing {len(extracted_items)} extracted items...")

        # Tally extensions for the diagnostics shown to the user.
        file_extensions = {}
        for f in data_files:
            ext = f.suffix.lower()
            file_extensions[ext] = file_extensions.get(ext, 0) + 1
            if ext == '.gz':
                name_lower = f.name.lower()
                for compound in ('.h5ad.gz', '.h5.gz', '.mtx.gz'):
                    if compound in name_lower:
                        file_extensions[compound] = file_extensions.get(compound, 0) + 1
                        break
        if file_extensions:
            print(f" File types found: {file_extensions}")

        h5ad_files = [f for f in data_files if f.name.lower().endswith(('.h5ad', '.h5ad.gz'))]
        h5_files = [f for f in data_files if f.name.lower().endswith(('.h5', '.h5.gz'))]
        mtx_dirs = _find_10x_directories(data_files, temp_root)

        adata = None
        if h5ad_files:
            print(f"✓ Found {len(h5ad_files)} .h5ad file(s). Loading first one...")
            adata = sc.read_h5ad(str(_ensure_gunzipped(h5ad_files[0])))
        elif h5_files:
            adata = _load_first_readable_h5(h5_files)
        elif mtx_dirs:
            adata = _load_first_10x_directory(mtx_dirs, temp_root)

        # If nothing loaded from the structured formats, try text files.
        if adata is None:
            adata = _load_first_text_matrix(data_files)

        if adata is not None:
            print(f"✓ Successfully loaded from TAR archive: {adata.n_obs:,} cells × {adata.n_vars:,} genes")
            return adata, None

        if h5ad_files or h5_files or mtx_dirs:
            # Candidates existed but none could be read.
            error_details = (
                " **TAR Archive Loading Failed**\n\n"
                "Found potential data files but couldn't load them.\n\n"
                f"**File types detected:** {file_extensions}\n\n"
                "**Files found:**\n"
                f"- .h5ad: {len(h5ad_files)}\n"
                f"- .h5: {len(h5_files)}\n"
                f"- Valid matrix.mtx directories: {len(mtx_dirs)}\n\n"
                "Try extracting and uploading the data file directly instead of as TAR."
            )
        else:
            error_details = (
                " **TAR Archive Extraction Failed**\n\n"
                f"Extracted {len(data_files)} files but couldn't find compatible single-cell data.\n\n"
                f"**File types detected:** {file_extensions}\n\n"
                "**What we looked for:**\n"
                f"- .h5ad files: Found {len(h5ad_files)}\n"
                f"- .h5 files: Found {len(h5_files)}\n"
                f"- matrix.mtx directories (with genes + barcodes): Found {len(mtx_dirs)}\n\n"
                "**Common issues:**\n"
                "- Files are in non-standard nested directories\n"
                "- Missing required companion files (genes.tsv, barcodes.tsv)\n"
                "- Files use different compression or naming\n\n"
                "**Recommendation:** Extract the archive locally, locate the filtered_feature_bc_matrix folder or .h5ad file, and upload that specific file/folder."
            )
        return None, error_details
    finally:
        # Runs on success, failure and exceptions alike.
        shutil.rmtree(temp_dir, ignore_errors=True)


def _read_h5ad_with_retry(path, max_retries=3, retry_delay=2):
    """Read an ``.h5ad`` file, retrying on ``PermissionError`` with exponential backoff.

    If the file is still locked after *max_retries* attempts, it is copied to a
    temporary file and read from there.

    Returns:
        ``(adata, None)`` on success or ``(None, error_message)`` when the
        copy workaround also fails. Other exceptions propagate.
    """
    import os
    import shutil
    import tempfile
    import time

    last_error = None
    for attempt in range(max_retries):
        try:
            adata = sc.read_h5ad(path, backed=None)
            print(f"✓ {adata.n_obs:,} cells × {adata.n_vars:,} genes")
            return adata, None
        except PermissionError as perm_err:
            last_error = perm_err
            if attempt < max_retries - 1:
                print(f" ⚠ File locked, retrying in {retry_delay}s... (attempt {attempt + 1}/{max_retries})")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff

    print(f" ⚠ File still locked after {max_retries} attempts, trying workaround...")
    temp_copy_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix='.h5ad', delete=False) as temp_copy:
            temp_copy_path = temp_copy.name
        shutil.copy2(path, temp_copy_path)
        print(" ✓ Copied to temporary location")
        adata = sc.read_h5ad(temp_copy_path, backed=None)
        print(f"✓ {adata.n_obs:,} cells × {adata.n_vars:,} genes")
        return adata, None
    except Exception:
        return None, (f" **File Access Error**\n\n"
                      f"Cannot access the uploaded file. This usually happens when:\n"
                      f"- The file is open in another program\n"
                      f"- Antivirus is scanning the file\n"
                      f"- Insufficient permissions\n\n"
                      f"**Solution:** Close the file in other programs and try again.\n\n"
                      f"Technical details: {str(last_error)}")
    finally:
        # Remove the temporary copy whether or not the read succeeded.
        if temp_copy_path is not None:
            try:
                os.unlink(temp_copy_path)
            except OSError:
                pass


def _read_h5_frame(h5_file, group_name, n_rows):
    """Build a DataFrame from the datasets of ``h5_file[group_name]``.

    Keys that cannot be sliced as plain datasets are skipped with a message.
    Falls back to an empty frame with *n_rows* rows when nothing was readable.
    """
    columns = {}
    if group_name in h5_file:
        group = h5_file[group_name]
        for key in group.keys():
            try:
                data = group[key][:]
                # Decode bytes if necessary
                if data.dtype.kind in ('S', 'O'):
                    data = [x.decode('utf-8') if isinstance(x, bytes) else str(x) for x in data]
                columns[key] = data
            except Exception as e_key:
                print(f"Skipping {group_name} key '{key}': {e_key}")
    return pd.DataFrame(columns) if columns else pd.DataFrame(index=range(n_rows))


def _read_h5ad_manually(path):
    """Rebuild a minimal AnnData (X, obs, var only) directly from the HDF5 file."""
    import h5py

    with h5py.File(path, 'r') as f:
        if 'X' not in f:
            raise ValueError("No X matrix found in file")
        x_node = f['X']
        if isinstance(x_node, h5py.Dataset):
            X = x_node[:]
        else:
            # Sparse matrix stored as a group of data/indices/indptr arrays.
            try:
                from scipy import sparse
                # NOTE(review): shape/format attribute names follow my reading
                # of the AnnData on-disk format -- confirm against its spec.
                shape = x_node.attrs.get('shape', x_node.attrs.get('h5sparse_shape'))
                if shape is None:
                    shape = x_node['shape'][:]
                layout = x_node.attrs.get('encoding-type',
                                          x_node.attrs.get('h5sparse_format', 'csr'))
                if isinstance(layout, bytes):
                    layout = layout.decode('utf-8')
                matrix_cls = sparse.csc_matrix if 'csc' in str(layout) else sparse.csr_matrix
                X = matrix_cls(
                    (x_node['data'][:], x_node['indices'][:], x_node['indptr'][:]),
                    shape=tuple(int(dim) for dim in shape),
                )
            except Exception:
                X = x_node['data'][:]  # Fallback to data only
        obs = _read_h5_frame(f, 'obs', X.shape[0])
        var = _read_h5_frame(f, 'var', X.shape[1])
    # Create basic AnnData object (skip problematic uns)
    return ad.AnnData(X=X, obs=obs, var=var)


def _read_h5ad_compat_fallback(path, primary_error):
    """Retry a failed read via backed mode, then via a manual h5py read.

    Raises:
        ValueError: when both fallbacks fail; the message includes
            *primary_error* and the last fallback error.
    """
    try:
        # Fallback 1: Read with backed mode then load into memory
        adata = sc.read_h5ad(path, backed='r')
        adata = adata.to_memory()
        print("✓ Successfully loaded using backed mode")
        return adata
    except Exception:
        pass
    try:
        # Fallback 2: Read manually from h5py
        print(" Attempting manual h5py read...")
        adata = _read_h5ad_manually(path)
        print("✓ Successfully loaded using manual h5py read")
        return adata
    except Exception as e3:
        raise ValueError(
            f"Unable to load file with any method.\n\n"
            f"**Primary error:** {str(primary_error)}\n\n"
            f"**Suggestions:**\n"
            f"1. The file may have been created with a newer AnnData version\n"
            f"2. Try re-saving the file with: `adata.write('file.h5ad', compression='gzip')`\n"
            f"3. Or use an older AnnData format: `adata.write_h5ad('file.h5ad', as_dense='X')`\n\n"
            f"**Technical details:** {str(e3)}"
        ) from e3


def _map_chamber(val):
    """Map a metadata value to 'RA', 'RV', 'LA' or 'LV', or None when nothing matches.

    NOTE(review): matching is by plain substring, so short codes can fire
    inside unrelated words (e.g. 'RA' in 'BRAIN'); confirm this is acceptable.
    """
    val = val.upper()
    if any(x in val for x in ['RA', 'RIGHT ATRI', 'R_ATRI']):
        return 'RA'
    if any(x in val for x in ['RV', 'RIGHT VENT', 'R_VENT']):
        return 'RV'
    if any(x in val for x in ['LA', 'LEFT ATRI', 'L_ATRI']):
        return 'LA'
    if any(x in val for x in ['LV', 'LEFT VENT', 'L_VENT']):
        return 'LV'
    if 'ATRI' in val:
        return 'RA'  # Default atrium
    if 'VENT' in val:
        return 'LV'  # Default ventricle
    return None


def _annotate_chambers(adata):
    """Ensure ``adata.obs['chamber']`` exists and return a summary line for the user.

    An existing 'chamber' column is reported as-is. Otherwise the first
    metadata column whose name contains a known keyword and whose values map
    to a chamber is used; failing that, every cell is labelled 'Unknown'.
    """
    obs = adata.obs
    if 'chamber' in obs.columns:
        chambers = obs['chamber'].unique()
        return f"\nChamber information detected: {', '.join(map(str, chambers))}"

    print(f"Available metadata columns: {list(obs.columns)}")
    chamber_keywords = ['tissue', 'location', 'sample', 'orig.ident', 'biosample',
                        'cell', 'batch', 'donor', 'patient']
    for col in list(obs.columns):
        if not any(keyword in str(col).lower() for keyword in chamber_keywords):
            continue
        print(f"Attempting to infer chamber from column: {col}")
        print(f" Sample values: {obs[col].unique()[:5]}")
        # Cast to str first: keyword-matched columns may be numeric or
        # categorical (e.g. 'n_cells', 'batch').
        mapped = obs[col].astype(str).apply(_map_chamber)
        detected = mapped.dropna().unique()
        if len(detected) == 0:
            continue
        if len(detected) > 1:
            adata.obs['chamber'] = mapped
            return f"\nChamber information inferred from '{col}': {', '.join(detected)}"
        # Fill unmatched cells with the single detected chamber
        adata.obs['chamber'] = mapped.fillna(detected[0])
        return f"\nSingle chamber detected from '{col}': {detected[0]}"

    # No usable column: label every cell so downstream code finds the column.
    adata.obs['chamber'] = 'Unknown'
    return ("\n⚠ No chamber information found"
            "\n Single-chamber analysis will be performed"
            "\n For multi-chamber analysis, data should have 'chamber' column with values: RA, RV, LA, LV")


def load_and_validate_data(uploaded_file) -> "Tuple[Optional[ad.AnnData], str]":
    """Load an uploaded single-cell dataset and attach chamber labels.

    Handled formats, selected by file name:
    - AnnData: .h5ad, .h5mu (first modality), .zarr (or any directory)
    - 10X: .h5, .mtx / .mtx.gz
    - Loom: .loom
    - Text: .csv, .tsv, .txt (optionally .gz)
    - Archives: .tar, .tar.gz, .tgz (first loadable dataset inside)
    - Columnar: .parquet, .arrow
    - Generic HDF5: .hdf5 (dataset under 'matrix', 'X' or 'data')
    - Seurat/R (.rds, .rdata, *seurat*.h5): not loaded; conversion
      instructions are returned instead.

    Args:
        uploaded_file: A path string, ``os.PathLike``, or an object whose
            ``name`` attribute is the path.

    Returns:
        ``(adata, message)`` on success, where ``adata.obs['chamber']`` is
        always set; ``(None, error_message)`` when the file cannot be loaded.

    Raises:
        ValueError: when a version-compatibility error occurs and both
            fallbacks fail, or when chamber validation fails.
    """
    from pathlib import Path

    source_path = _resolve_upload_path(uploaded_file)
    filepath = Path(source_path)
    # Classify on the base name only, so directory names cannot trigger the
    # 'seurat' / 'tsv' substring checks.
    name_lower = filepath.name.lower()

    is_h5ad = name_lower.endswith('.h5ad')
    is_h5 = name_lower.endswith('.h5')
    is_tar = name_lower.endswith(('.tar', '.tar.gz', '.tgz'))
    is_loom = name_lower.endswith('.loom')
    is_mtx = name_lower.endswith(('.mtx', '.mtx.gz'))
    is_text = name_lower.endswith(('.csv', '.tsv', '.txt', '.csv.gz', '.tsv.gz', '.txt.gz'))
    is_parquet = name_lower.endswith('.parquet')
    is_hdf5 = name_lower.endswith('.hdf5')
    is_h5mu = name_lower.endswith('.h5mu')
    is_zarr = name_lower.endswith('.zarr') or filepath.is_dir()
    is_rds = name_lower.endswith(('.rds', '.rdata'))
    is_h5seurat = 'seurat' in name_lower and name_lower.endswith('.h5')
    is_arrow = name_lower.endswith('.arrow')

    try:
        if is_tar:
            adata, load_error = _load_from_tar_archive(source_path)
            if adata is None:
                return None, load_error
        elif is_h5ad:
            print(" Loading AnnData (.h5ad)...")
            adata, load_error = _read_h5ad_with_retry(source_path)
            if adata is None:
                return None, load_error
        elif is_h5mu:
            print(" Loading MuData (.h5mu)...")
            try:
                import mudata
                mdata = mudata.read_h5mu(source_path)
                adata = mdata.mod[list(mdata.mod.keys())[0]]
                print(f"✓ {adata.n_obs:,} cells × {adata.n_vars:,} genes")
            except ImportError:
                return None, " MuData support requires: pip install mudata"
        elif is_zarr:
            print(" Loading Zarr array...")
            # anndata provides the Zarr reader.
            adata = ad.read_zarr(source_path)
            print(f"✓ {adata.n_obs:,} cells × {adata.n_vars:,} genes")
        elif is_h5 and not is_h5seurat:
            print(" Loading 10X Genomics (.h5)...")
            adata = sc.read_10x_h5(source_path)
            print(f"✓ {adata.n_obs:,} cells × {adata.n_vars:,} genes")
        elif is_mtx:
            print(" Loading Matrix Market (.mtx)...")
            mtx_dir = filepath.parent
            if (mtx_dir / 'genes.tsv').exists():
                adata = sc.read_10x_mtx(str(mtx_dir))
            else:
                adata = sc.read_mtx(source_path).T
            print(f"✓ {adata.n_obs:,} cells × {adata.n_vars:,} genes")
        elif is_loom:
            print(" Loading Loom (.loom)...")
            adata = sc.read_loom(source_path)
            print(f"✓ {adata.n_obs:,} cells × {adata.n_vars:,} genes")
        elif is_text:
            print(f" Loading text matrix ({filepath.suffix})...")
            delim = '\t' if 'tsv' in name_lower else ','
            adata = sc.read_csv(source_path, delimiter=delim, first_column_names=True)
            print(f"✓ {adata.n_obs:,} cells × {adata.n_vars:,} genes")
        elif is_parquet or is_arrow:
            print(f" Loading {'Parquet' if is_parquet else 'Arrow'}...")
            try:
                import pandas as pd
                df = pd.read_parquet(source_path) if is_parquet else pd.read_feather(source_path)
                adata = ad.AnnData(df)
                print(f"✓ {adata.n_obs:,} cells × {adata.n_vars:,} genes")
            except ImportError:
                return None, " Parquet/Arrow requires: pip install pyarrow"
        elif is_hdf5:
            print(" Loading generic HDF5...")
            import h5py
            with h5py.File(source_path, 'r') as f:
                X = f.get('matrix', f.get('X', f.get('data', None)))
                if X is None:
                    return None, " HDF5 file missing required keys (matrix/X/data)"
                adata = ad.AnnData(X[()])
            print(f"✓ {adata.n_obs:,} cells × {adata.n_vars:,} genes")
        elif is_h5seurat or is_rds:
            filename = filepath.name
            return None, (f" **Seurat/R Format Detected: `{filename}`**\n\n"
                          f"This file format requires conversion to AnnData (.h5ad) before analysis.\n\n"
                          f"**Option 1: Using SeuratDisk (Recommended)**\n"
                          f"```r\n"
                          f"# In R/RStudio:\n"
                          f"library(Seurat)\n"
                          f"library(SeuratDisk)\n\n"
                          f"# Load your Seurat object\n"
                          f"seurat_obj <- readRDS('{filename}')\n\n"
                          f"# Convert to h5ad format\n"
                          f"SaveH5Seurat(seurat_obj, filename = 'output.h5Seurat')\n"
                          f"Convert('output.h5Seurat', dest = 'h5ad')\n"
                          f"```\n\n"
                          f"**Option 2: Using sceasy**\n"
                          f"```r\n"
                          f"library(sceasy)\n"
                          f"sceasy::convertFormat(seurat_obj, from='seurat', to='anndata',\n"
                          f" outFile='output.h5ad')\n"
                          f"```\n\n"
                          f"Then upload the generated `output.h5ad` file to HeartMAP.\n\n"
                          f"**Need help?** See [FORMAT_SUPPORT_GUIDE.md](https://github.com/Tumo505/HeartMap/blob/master/FORMAT_SUPPORT_GUIDE.md) for detailed instructions.")
        else:
            return None, (f" Unsupported format: {filepath.suffix}\n\n"
                          f"**Supported formats:**\n"
                          f"• AnnData: .h5ad, .h5mu (MuData), .zarr\n"
                          f"• 10X: .h5, .mtx (Matrix Market)\n"
                          f"• Loom: .loom\n"
                          f"• Text: .csv, .tsv, .txt\n"
                          f"• Archives: .tar, .tar.gz, .tgz\n"
                          f"• Columnar: .parquet, .arrow\n"
                          f"• HDF5: .hdf5\n\n"
                          f"For FASTQ/BAM/CRAM, use Cell Ranger or similar tools first.")
    except Exception as e:
        # Handle version compatibility issues
        error_str = str(e)
        compat_markers = (
            "IOSpec",
            "encoding_type",
            "No read method registered",
            "unexpected keyword argument",
            "'matrix'",
            "AnnData.__init__",
        )
        if any(marker in error_str for marker in compat_markers):
            print(f"AnnData version compatibility issue detected: {error_str[:100]}")
            print(" Attempting fallback read method...")
            adata = _read_h5ad_compat_fallback(source_path, e)
        else:
            # Not a compatibility error - return the original error
            import traceback
            return None, (f" **Error loading data:**\n\n{str(e)}\n\n"
                          f"**Traceback:**\n```\n{traceback.format_exc()}\n```")

    try:
        chamber_info = _annotate_chambers(adata)
        validation_msg = (
            "\nData loaded successfully!\n"
            f"- Cells: {adata.n_obs:,}\n"
            f"- Genes: {adata.n_vars:,}\n"
            f"{chamber_info}\n"
        )
        return adata, validation_msg
    except Exception as e:
        raise ValueError(f"Error validating loaded data: {str(e)}") from e
def create_communication_network(adata, hub_stats, chamber_stats=None):
"""
Create interactive Plotly network graph showing inferred cell-cell communication
based on co-expression of ligand-receptor genes
Args:
adata: AnnData object with clustering and gene expression
hub_stats: DataFrame with hub scores per cell type
chamber_stats: Optional DataFrame with chamber information
Returns:
Path to HTML file with interactive network
"""
import networkx as nx
try:
# Detect cluster column (try multiple common names)
cluster_col = None
for col in ['leiden', 'louvain', 'Cluster', 'cluster', 'cell_type', 'celltype']:
if col in adata.obs.columns:
cluster_col = col
break
if cluster_col is None:
print(" No clustering column found, skipping network graph")
return None
print(f" Using clustering column: '{cluster_col}'")
cell_types = adata.obs[cluster_col].unique()
n_types = len(cell_types)
# Load ligand-receptor pairs from database
print(" Loading ligand-receptor database...")
if LR_DATABASE_AVAILABLE:
try:
ligand_receptor_pairs = get_ligand_receptor_pairs(
adata,
resource='consensus',
confidence_threshold=0.7
)
print(f" Loaded {len(ligand_receptor_pairs)} ligand-receptor pairs from database")
except Exception as e:
print(f" Warning: Could not load L-R database: {e}")
print(" Using minimal fallback pairs")
ligand_receptor_pairs = [
('VEGFA', 'FLT1'), ('VEGFA', 'KDR'),
('TGFB1', 'TGFBR1'), ('TGFB1', 'TGFBR2'),
('FGF2', 'FGFR1'), ('IL6', 'IL6R'),
('TNF', 'TNFRSF1A'), ('CXCL12', 'CXCR4')
]
else:
print(" L-R database not available, using minimal fallback pairs")
ligand_receptor_pairs = [
('VEGFA', 'FLT1'), ('VEGFA', 'KDR'),
('TGFB1', 'TGFBR1'), ('TGFB1', 'TGFBR2'),
('FGF2', 'FGFR1'), ('IL6', 'IL6R'),
('TNF', 'TNFRSF1A'), ('CXCL12', 'CXCR4')
]
# Calculate mean expression per cell type
print(" Calculating cell type expression profiles...")
cell_type_expression = {}
for cell_type in cell_types:
cell_mask = adata.obs[cluster_col] == cell_type
# Convert pandas Series to numpy array for scipy sparse matrix indexing
cell_mask_array = cell_mask.values if hasattr(cell_mask, 'values') else np.asarray(cell_mask)
if hasattr(adata.X, 'toarray'):
subset_expr = adata.X[cell_mask_array].toarray()
else:
subset_expr = adata.X[cell_mask_array]
# Calculate mean and ensure it's a numpy array
mean_expr = np.mean(subset_expr, axis=0)
if hasattr(mean_expr, 'A1'):
mean_expr = mean_expr.A1
elif hasattr(mean_expr, 'values'):
mean_expr = mean_expr.values
# Ensure it's flattened
mean_expr = np.asarray(mean_expr).flatten()
cell_type_expression[str(cell_type)] = mean_expr
# Create network graph
G = nx.DiGraph() # Directed graph for ligand->receptor
# Add nodes for each cell type with metadata
node_info = []
for i, cell_type in enumerate(cell_types):
cell_mask = adata.obs[cluster_col] == cell_type
n_cells = int(cell_mask.sum())
# Get hub score if available
hub_score = 0
if hub_stats is not None and len(hub_stats) > 0:
type_label = f"Cluster {cell_type}"
matching = hub_stats[hub_stats['Cell Type'] == type_label]
if len(matching) > 0:
hub_score = matching.iloc[0]['Hub Score']
# Get chamber distribution
chamber_dist = ""
if chamber_stats is not None and 'chamber' in adata.obs.columns:
type_chambers = adata.obs[cell_mask]['chamber'].value_counts()
chamber_dist = ", ".join([f"{ch}: {cnt}" for ch, cnt in type_chambers.items()])
node_info.append({
'id': str(cell_type),
'label': f"Cluster {cell_type}",
'size': int(n_cells),
'hub_score': float(hub_score),
'chamber_dist': chamber_dist,
'color_idx': i
})
G.add_node(str(cell_type),
size=int(n_cells),
hub_score=float(hub_score),
label=f"Cluster {cell_type}",
color_idx=i)
# Infer communication edges from ligand-receptor co-expression
print(" Inferring cell-cell communication from gene expression...")
edge_info = []
for ligand, receptor in ligand_receptor_pairs:
# Check if genes exist in dataset
if ligand not in adata.var_names or receptor not in adata.var_names:
continue
ligand_idx = list(adata.var_names).index(ligand)
receptor_idx = list(adata.var_names).index(receptor)
# Find cell types that express ligand and receptor
for source_type in cell_types:
ligand_expr = float(cell_type_expression[str(source_type)][ligand_idx])
if ligand_expr > 0.1: # Threshold for meaningful expression
for target_type in cell_types:
if source_type == target_type:
continue
receptor_expr = float(cell_type_expression[str(target_type)][receptor_idx])
if receptor_expr > 0.1: # Both genes expressed
# Calculate interaction strength
strength = float(np.sqrt(ligand_expr * receptor_expr))
edge_key = (str(source_type), str(target_type))
edge_info.append({
'source': str(source_type),
'target': str(target_type),
'ligand': ligand,
'receptor': receptor,
'strength': strength
})
# Add or update edge
if G.has_edge(str(source_type), str(target_type)):
G[str(source_type)][str(target_type)]['weight'] += strength
G[str(source_type)][str(target_type)]['interactions'].append(f"{ligand}-{receptor}")
else:
G.add_edge(str(source_type), str(target_type),
weight=strength,
interactions=[f"{ligand}-{receptor}"])
print(f" Found {G.number_of_edges()} communication interactions between {G.number_of_nodes()} cell types")
# Create Plotly figure with layout
pos = nx.spring_layout(G, k=2.5, iterations=50, seed=42, weight='weight')
# Create edge traces (one per edge for hover info)
edge_traces = []
for edge in G.edges(data=True):
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
weight = edge[2].get('weight', 0)
interactions = edge[2].get('interactions', [])
# Create arrow shape for directed edge
edge_trace = go.Scatter(
x=[x0, x1, None],
y=[y0, y1, None],
mode='lines',
line=dict(
width=max(0.5, min(5, weight * 2)), # Scale by strength
color='rgba(150, 150, 150, 0.5)'
),
hoverinfo='text',
hovertext=f"{edge[0]} → {edge[1]}
" +
f"Interaction strength: {weight:.3f}
" +
f"Ligand-Receptor pairs: {len(interactions)}
" +
"
".join(interactions[:5]) + # Show first 5
(f"
... and {len(interactions)-5} more" if len(interactions) > 5 else ""),
showlegend=False
)
edge_traces.append(edge_trace)
# Generate distinct colors for cell types
import plotly.colors as pcolors
if n_types <= 10:
colors = pcolors.qualitative.Set3[:n_types]
else:
colors = pcolors.sample_colorscale("turbo", [i/n_types for i in range(n_types)])
# Create separate trace for each cell type (to show in legend)
node_traces = []
for node_data in node_info:
node_id = node_data['id']
if node_id not in pos:
continue
x, y = pos[node_id]
label = node_data['label']
size = node_data['size']
hub_score = node_data['hub_score']
chamber_dist = node_data['chamber_dist']
color_idx = node_data['color_idx']
# Count outgoing and incoming communications
out_edges = G.out_degree(node_id)
in_edges = G.in_degree(node_id)
hover_text = (
f"{label}
"
f"Cells: {size:,}
"
f"Hub Score: {hub_score:.4f}
"
f"Sends signals to: {out_edges} cell types
"
f"Receives from: {in_edges} cell types
"
f"{chamber_dist}"
)
node_trace = go.Scatter(
x=[x],
y=[y],
mode='markers+text',
marker=dict(
size=max(15, min(60, size / 20)), # Scale by cell count
color=colors[color_idx % len(colors)],
line=dict(width=2, color='white'),
symbol='circle'
),
text=label.replace('Cluster ', 'C'),
textposition="top center",
textfont=dict(size=10, color='black'),
hoverinfo='text',
hovertext=hover_text,
name=label,
legendgroup=label,
showlegend=True
)
node_traces.append(node_trace)
# Store edge information for click interactions (JSON format for JavaScript)
import json
edge_data_for_js = []
for edge in G.edges(data=True):
interactions = edge[2].get('interactions', [])
edge_data_for_js.append({
'source': edge[0],
'target': edge[1],
'weight': float(edge[2].get('weight', 0)),
'interactions': interactions,
'source_pos': list(pos[edge[0]]),
'target_pos': list(pos[edge[1]])
})
# Store node information for highlighting
node_data_for_js = []
for node_data in node_info:
if node_data['id'] in pos:
node_data_for_js.append({
'id': node_data['id'],
'label': node_data['label'],
'pos': list(pos[node_data['id']]),
'connected_to': [str(t) for t in G.successors(node_data['id'])],
'connected_from': [str(s) for s in G.predecessors(node_data['id'])]
})
# Create figure with all traces
fig = go.Figure(data=edge_traces + node_traces)
fig.update_layout(
title={
'text': "Interactive Cell-Cell Communication Network
Click nodes to highlight connections | Click edges to see L-R pairs",
'x': 0.5,
'xanchor': 'center',
'font': {'size': 20}
},
showlegend=True,
legend=dict(
title="Cell Types (click to toggle)",
orientation="v",
yanchor="top",
y=1,
xanchor="left",
x=1.05,
bgcolor="rgba(255,255,255,0.8)",
bordercolor="gray",
borderwidth=1
),
hovermode='closest',
width=1400,
height=900,
plot_bgcolor='#f8f9fa',
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
annotations=[
dict(
text="🖱️ Click cluster to see detailed view with all incoming/outgoing communications | 🖱️ Click edge to see L-R pairs",
showarrow=False,
xref="paper", yref="paper",
x=0.5, y=-0.02,
xanchor='center',
font=dict(size=11, color='gray')
)
],
margin=dict(l=20, r=250, t=80, b=40),
clickmode='event+select'
)
# Save as HTML with custom JavaScript for interactivity
html_path = tempfile.NamedTemporaryFile(mode='w', suffix='.html', delete=False).name
# Generate base HTML
fig.write_html(
html_path,
include_plotlyjs='cdn',
config={
'displayModeBar': True,
'displaylogo': False,
'toImageButtonOptions': {
'format': 'png',
'filename': 'communication_network',
'height': 800,
'width': 1200,
'scale': 2
}
}
)
# Add custom JavaScript for node/edge interactivity
with open(html_path, 'r', encoding='utf-8') as f:
html_content = f.read()
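# Inject the custom JavaScript just before a closing tag of the generated HTML (presumably </body>; the tag name is missing here - confirm)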
Comprehensive view of cell population and communication patterns
# Use triple quotes and escape only what's needed edge_data_json = json.dumps(edge_data_for_js) node_data_json = json.dumps(node_data_for_js) custom_js = ''' ''' # Insert JavaScript before html_content = html_content.replace('', custom_js + '') # Write back with open(html_path, 'w', encoding='utf-8') as f: f.write(html_content) print(f"✓ Created interactive communication network: {html_path}") return html_path except Exception as e: import traceback print(f"Warning: Could not create network graph: {e}") print(f"Traceback: {traceback.format_exc()}") return None def create_cluster_detail_view(adata, cluster_id, cluster_col='leiden'): """ Create detailed view of a specific cluster showing: 1. Cell type composition and statistics 2. Sub-network of cells within that cluster 3. Communication patterns (incoming/outgoing) Args: adata: AnnData object cluster_id: The cluster ID to analyze cluster_col: Column name containing cluster assignments Returns: Dictionary with cluster details and HTML visualizations """ import networkx as nx try: # Get cells in this cluster cluster_mask = adata.obs[cluster_col] == cluster_id cluster_adata = adata[cluster_mask].copy() n_cells = cluster_adata.n_obs # Calculate cluster statistics stats = { 'cluster_id': str(cluster_id), 'n_cells': int(n_cells), 'n_genes': int(cluster_adata.n_vars), 'total_counts_mean': float(np.mean(cluster_adata.obs.get('total_counts', [0]))), 'n_genes_by_counts_mean': float(np.mean(cluster_adata.obs.get('n_genes_by_counts', [0]))), } # Get chamber distribution if available if 'chamber' in cluster_adata.obs.columns: chamber_counts = cluster_adata.obs['chamber'].value_counts() stats['chamber_distribution'] = chamber_counts.to_dict() # Get top expressed genes in this cluster if hasattr(cluster_adata.X, 'toarray'): mean_expr = np.mean(cluster_adata.X.toarray(), axis=0) else: mean_expr = np.mean(cluster_adata.X, axis=0) if hasattr(mean_expr, 'A1'): mean_expr = mean_expr.A1 mean_expr = np.asarray(mean_expr).flatten() 
top_gene_indices = np.argsort(mean_expr)[-20:][::-1] top_genes = [(str(cluster_adata.var_names[i]), float(mean_expr[i])) for i in top_gene_indices] stats['top_genes'] = top_genes # Create HTML visualization html_content = f"""