Spaces:
Running
on
Zero
Running
on
Zero
File size: 5,104 Bytes
b21c184 9dd80fe b21c184 b65f002 b21c184 b65f002 b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b65f002 9dd80fe b21c184 b65f002 b21c184 9dd80fe b21c184 b65f002 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 9dd80fe b21c184 b65f002 b21c184 b65f002 b21c184 9dd80fe |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
"""
BigWig export functionality for NTv3 tracks.
"""
import os
import tempfile
import uuid
import zipfile
from typing import TYPE_CHECKING
import numpy as np
try:
import pyBigWig # noqa: N816
except ImportError:
pyBigWig = None # noqa: N816
if TYPE_CHECKING:
from ntv3_tracks_pipeline import NTv3TracksOutput
def _softmax_last(x: np.ndarray) -> np.ndarray:
"""Compute softmax over the last dimension."""
x = x - x.max(axis=-1, keepdims=True)
ex = np.exp(x)
return ex / ex.sum(axis=-1, keepdims=True)
def create_bigwig_zip(
out: "NTv3TracksOutput",
bigwig_selected: list[str],
bed_elements: list[str],
) -> str:
"""
Create BigWig files for selected tracks and save them in a zip file.
Parameters
----------
out : NTv3TracksOutput
The prediction output from the pipeline.
bigwig_selected : list[str]
List of BigWig track IDs to export.
bed_elements : list[str]
List of BED element names to export.
Returns
-------
str
Path to the created zip file containing BigWig files.
Raises
------
ImportError
If pyBigWig is not installed.
ValueError
If no predictions are available or no tracks are selected.
"""
if pyBigWig is None:
raise ImportError(
"pyBigWig is required for BigWig export. Install with: pip install pyBigWig"
)
if out is None:
raise ValueError("No predictions available. Please run a prediction first.")
bw_names = out.bigwig_track_names or []
bw_logits = out.bigwig_tracks_logits
bed_names = out.bed_element_names or []
bed_logits = out.bed_tracks_logits
if bw_logits is None or not bw_names:
raise ValueError("No BigWig tracks available in model output.")
# Get genomic coordinates
chrom = out.chrom
if chrom is None:
raise ValueError(
"Chromosome information not available. Use genomic coordinates."
)
start = out.start
end = out.end
if start is None or end is None:
raise ValueError("Start and end coordinates are required for BigWig export.")
window_len = out.window_len or (end - start)
# Calculate prediction region (center 37.5%)
if out.pred_start is not None:
pred_start = out.pred_start
else:
pred_start = start + int(window_len * 0.3125)
# Create temporary directory for BigWig files
tmpdir = tempfile.gettempdir()
output_dir = os.path.join(tmpdir, f"bigwig_outputs_{uuid.uuid4().hex}")
os.makedirs(output_dir, exist_ok=True)
# Prepare track data list
track_data_list = []
# Add BigWig tracks
for track_id in bigwig_selected:
if track_id in bw_names:
idx = bw_names.index(track_id)
track_data_list.append(("bigwig", track_id, idx, None))
# Add BED elements (as probabilities)
if bed_logits is not None and bed_elements:
probs = _softmax_last(bed_logits)
for elem_name in bed_elements:
if elem_name in bed_names:
eidx = bed_names.index(elem_name)
# Store as bed element with probability data
track_data_list.append(("bed", elem_name, eidx, probs[:, eidx, 1]))
if not track_data_list:
raise ValueError("No tracks selected for export.")
# Create BigWig files
created_files = []
for track_type, track_id, track_idx, bed_probs in track_data_list:
if track_type == "bigwig":
track_data = bw_logits[:, track_idx].astype(np.float32)
display_name = track_id
else: # bed
if bed_probs is None:
continue
track_data = bed_probs.astype(np.float32)
display_name = track_id
# Clean filename
clean_name = display_name.replace(" ", "_").replace("/", "_").replace("-", "_")
bw_filename = os.path.join(output_dir, f"{clean_name}.bw")
# Create BigWig file
bw = pyBigWig.open(bw_filename, "w")
# Add header - use end of genomic window as chromosome size
bw.addHeader([(chrom, end)])
# Add entries
num_positions = len(track_data)
starts = np.arange(pred_start, pred_start + num_positions, dtype=np.int64)
ends = starts + 1
values = track_data.tolist()
bw.addEntries(
chroms=[chrom] * len(starts),
starts=starts.tolist(),
ends=ends.tolist(),
values=values,
)
bw.close()
created_files.append(bw_filename)
# Create zip file
zip_path = os.path.join(tmpdir, f"ntv3_tracks_{uuid.uuid4().hex}.zip")
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
for bw_file in created_files:
zipf.write(bw_file, os.path.basename(bw_file))
# Clean up individual BigWig files
for bw_file in created_files:
try:
os.remove(bw_file)
except Exception:
pass
try:
os.rmdir(output_dir)
except Exception:
pass
return zip_path
|