Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| BigWig export functionality for NTv3 tracks. | |
| """ | |
| import os | |
| import tempfile | |
| import uuid | |
| import zipfile | |
| from typing import TYPE_CHECKING | |
| import numpy as np | |
| try: | |
| import pyBigWig # noqa: N816 | |
| except ImportError: | |
| pyBigWig = None # noqa: N816 | |
| if TYPE_CHECKING: | |
| from ntv3_tracks_pipeline import NTv3TracksOutput | |
| def _softmax_last(x: np.ndarray) -> np.ndarray: | |
| """Compute softmax over the last dimension.""" | |
| x = x - x.max(axis=-1, keepdims=True) | |
| ex = np.exp(x) | |
| return ex / ex.sum(axis=-1, keepdims=True) | |
| def create_bigwig_zip( | |
| out: "NTv3TracksOutput", | |
| bigwig_selected: list[str], | |
| bed_elements: list[str], | |
| ) -> str: | |
| """ | |
| Create BigWig files for selected tracks and save them in a zip file. | |
| Parameters | |
| ---------- | |
| out : NTv3TracksOutput | |
| The prediction output from the pipeline. | |
| bigwig_selected : list[str] | |
| List of BigWig track IDs to export. | |
| bed_elements : list[str] | |
| List of BED element names to export. | |
| Returns | |
| ------- | |
| str | |
| Path to the created zip file containing BigWig files. | |
| Raises | |
| ------ | |
| ImportError | |
| If pyBigWig is not installed. | |
| ValueError | |
| If no predictions are available or no tracks are selected. | |
| """ | |
| if pyBigWig is None: | |
| raise ImportError( | |
| "pyBigWig is required for BigWig export. Install with: pip install pyBigWig" | |
| ) | |
| if out is None: | |
| raise ValueError("No predictions available. Please run a prediction first.") | |
| bw_names = out.bigwig_track_names or [] | |
| bw_logits = out.bigwig_tracks_logits | |
| bed_names = out.bed_element_names or [] | |
| bed_logits = out.bed_tracks_logits | |
| if bw_logits is None or not bw_names: | |
| raise ValueError("No BigWig tracks available in model output.") | |
| # Get genomic coordinates | |
| chrom = out.chrom | |
| if chrom is None: | |
| raise ValueError( | |
| "Chromosome information not available. Use genomic coordinates." | |
| ) | |
| start = out.start | |
| end = out.end | |
| if start is None or end is None: | |
| raise ValueError("Start and end coordinates are required for BigWig export.") | |
| window_len = out.window_len or (end - start) | |
| # Calculate prediction region (center 37.5%) | |
| if out.pred_start is not None: | |
| pred_start = out.pred_start | |
| else: | |
| pred_start = start + int(window_len * 0.3125) | |
| # Create temporary directory for BigWig files | |
| tmpdir = tempfile.gettempdir() | |
| output_dir = os.path.join(tmpdir, f"bigwig_outputs_{uuid.uuid4().hex}") | |
| os.makedirs(output_dir, exist_ok=True) | |
| # Prepare track data list | |
| track_data_list = [] | |
| # Add BigWig tracks | |
| for track_id in bigwig_selected: | |
| if track_id in bw_names: | |
| idx = bw_names.index(track_id) | |
| track_data_list.append(("bigwig", track_id, idx, None)) | |
| # Add BED elements (as probabilities) | |
| if bed_logits is not None and bed_elements: | |
| probs = _softmax_last(bed_logits) | |
| for elem_name in bed_elements: | |
| if elem_name in bed_names: | |
| eidx = bed_names.index(elem_name) | |
| # Store as bed element with probability data | |
| track_data_list.append(("bed", elem_name, eidx, probs[:, eidx, 1])) | |
| if not track_data_list: | |
| raise ValueError("No tracks selected for export.") | |
| # Create BigWig files | |
| created_files = [] | |
| for track_type, track_id, track_idx, bed_probs in track_data_list: | |
| if track_type == "bigwig": | |
| track_data = bw_logits[:, track_idx].astype(np.float32) | |
| display_name = track_id | |
| else: # bed | |
| if bed_probs is None: | |
| continue | |
| track_data = bed_probs.astype(np.float32) | |
| display_name = track_id | |
| # Clean filename | |
| clean_name = display_name.replace(" ", "_").replace("/", "_").replace("-", "_") | |
| bw_filename = os.path.join(output_dir, f"{clean_name}.bw") | |
| # Create BigWig file | |
| bw = pyBigWig.open(bw_filename, "w") | |
| # Add header - use end of genomic window as chromosome size | |
| bw.addHeader([(chrom, end)]) | |
| # Add entries | |
| num_positions = len(track_data) | |
| starts = np.arange(pred_start, pred_start + num_positions, dtype=np.int64) | |
| ends = starts + 1 | |
| values = track_data.tolist() | |
| bw.addEntries( | |
| chroms=[chrom] * len(starts), | |
| starts=starts.tolist(), | |
| ends=ends.tolist(), | |
| values=values, | |
| ) | |
| bw.close() | |
| created_files.append(bw_filename) | |
| # Create zip file | |
| zip_path = os.path.join(tmpdir, f"ntv3_tracks_{uuid.uuid4().hex}.zip") | |
| with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf: | |
| for bw_file in created_files: | |
| zipf.write(bw_file, os.path.basename(bw_file)) | |
| # Clean up individual BigWig files | |
| for bw_file in created_files: | |
| try: | |
| os.remove(bw_file) | |
| except Exception: | |
| pass | |
| try: | |
| os.rmdir(output_dir) | |
| except Exception: | |
| pass | |
| return zip_path | |