Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import scanpy as sc | |
| import pandas as pd | |
| import logging | |
| import os | |
| from pathlib import Path | |
| from typing import Optional | |
| from huggingface_hub import hf_hub_download, snapshot_download | |
| logger = logging.getLogger(__name__) | |
| REPO_ID = 'Angione-Lab/spMetaTME-Atlas' | |
def get_metadata():
    """Fetch the atlas metadata table from the Hugging Face dataset repo.

    Returns:
        pd.DataFrame: the parsed ``sp_metabolic_metadata.csv`` table, or an
        empty DataFrame if the download or parse fails (errors are logged,
        never raised to the caller).
    """
    try:
        csv_path = hf_hub_download(
            repo_id=REPO_ID,
            filename="sp_metabolic_metadata.csv",
            repo_type="dataset",
        )
        frame = pd.read_csv(csv_path)
    except Exception as e:
        # Best-effort: the UI degrades gracefully on an empty frame.
        logger.error(f"Error loading metadata: {e}")
        frame = pd.DataFrame()
    return frame
def get_organ_stats(meta_df: pd.DataFrame) -> pd.DataFrame:
    """Calculate per-organ summary statistics from the atlas metadata.

    Args:
        meta_df: metadata table; must contain an ``organ`` column for any
            statistics to be computed. ``n_vars`` (or ``n_genes``) is used
            for the average-reaction count when present.

    Returns:
        pd.DataFrame with columns ``organ``, ``sample_count`` and
        ``avg_reactions`` (0 when no count column exists), sorted by
        ``sample_count`` descending. Empty DataFrame if ``meta_df`` is
        empty or has no ``organ`` column.
    """
    if meta_df.empty or 'organ' not in meta_df.columns:
        return pd.DataFrame()
    # Count rows per organ with size(): unlike the previous
    # agg(('id'|'dataset_title', 'count')), this does not raise KeyError
    # when neither identifier column exists in the metadata.
    stats = meta_df.groupby('organ').size().reset_index(name='sample_count')
    # Prefer n_vars, fall back to n_genes, for the mean reaction count.
    count_col = next(
        (c for c in ('n_vars', 'n_genes') if c in meta_df.columns), None
    )
    if count_col:
        avg_stats = meta_df.groupby('organ')[count_col].mean().reset_index()
        avg_stats.columns = ['organ', 'avg_reactions']
        stats = stats.merge(avg_stats, on='organ')
    else:
        stats['avg_reactions'] = 0
    # Largest cohorts first for display.
    return stats.sort_values('sample_count', ascending=False)
def load_metabolic_flux_from_hf(filename: str):
    """
    Load spatial metabolic flux data from Hugging Face Hub.

    Checks a local ``example_data/`` folder first (faster dev cycle), then
    falls back to downloading ``SM/<filename>`` from the dataset repo.

    Args:
        filename: the .h5ad file name, resolved under ``SM/`` on the Hub.

    Returns:
        AnnData object, or None if both the local and remote loads fail.
    """
    # Priority to local example data for faster dev cycle
    example_path = os.path.join(os.getcwd(), "example_data", filename)
    if os.path.exists(example_path):
        try:
            adata = sc.read_h5ad(example_path)
            logger.info(f"Loaded {filename} from local example_data folder.")
            return adata
        except Exception as e:
            # Fall through to the Hub download instead of failing outright.
            logger.warning(f"Could not load local {filename}: {e}. Retrying HF.")
    try:
        # FIX: interpolate the requested filename into the repo path —
        # previously the f-string contained a literal "(unknown)".
        local_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=f"SM/{filename}",
            repo_type="dataset",
        )
        adata = sc.read_h5ad(local_path)
        return adata
    except Exception as e:
        logger.error(f"Error loading {filename}: {str(e)}")
        return None
def download_metabolic_flux_from_hf(filename: str, local_dir: Optional[str] = None):
    """
    Download a spatial metabolic flux file from Hugging Face Hub to disk.

    Args:
        filename: the .h5ad file name under ``SM/`` in the dataset repo.
        local_dir: destination directory; defaults to
            ``~/Downloads/spMetaTME-Atlas`` (created if missing).

    Returns:
        The local directory path on success, or None on failure.
    """
    try:
        if local_dir is None:
            local_dir = os.path.expanduser("~/Downloads/spMetaTME-Atlas")
        os.makedirs(local_dir, exist_ok=True)
        # FIX: interpolate the requested filename into the allow pattern —
        # previously the f-string contained a literal "(unknown)", so the
        # snapshot matched nothing and downloaded no files.
        snapshot_download(
            repo_id=REPO_ID,
            allow_patterns=[f"SM/{filename}"],
            repo_type="dataset",
            local_dir=local_dir,
        )
        return local_dir
    except Exception as e:
        logger.error(f"Error downloading {filename}: {str(e)}")
        return None
def process_upload(uploaded_file, data_type: str):
    """
    Process an uploaded .h5ad file and return an AnnData object.

    Args:
        uploaded_file: Streamlit UploadedFile-like object exposing
            ``getvalue() -> bytes``.
        data_type: label used only in the error log message.

    Returns:
        AnnData object, or None if the file cannot be read.
    """
    import tempfile
    temp_path = None
    try:
        # Persist the upload to disk: sc.read_h5ad needs a real file path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".h5ad") as tmp:
            tmp.write(uploaded_file.getvalue())
            temp_path = tmp.name
        return sc.read_h5ad(temp_path)
    except Exception as e:
        logger.error(f"Error loading {data_type} file: {str(e)}")
        return None
    finally:
        # FIX: always remove the temp file — the original skipped cleanup
        # (leaking the file) whenever read_h5ad raised.
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)