ntv3_tracks / bigwig_export.py
bernardo-de-almeida's picture
refactor: clean code
b65f002
"""
BigWig export functionality for NTv3 tracks.
"""
import os
import tempfile
import uuid
import zipfile
from typing import TYPE_CHECKING
import numpy as np
try:
import pyBigWig # noqa: N816
except ImportError:
pyBigWig = None # noqa: N816
if TYPE_CHECKING:
from ntv3_tracks_pipeline import NTv3TracksOutput
def _softmax_last(x: np.ndarray) -> np.ndarray:
"""Compute softmax over the last dimension."""
x = x - x.max(axis=-1, keepdims=True)
ex = np.exp(x)
return ex / ex.sum(axis=-1, keepdims=True)
def create_bigwig_zip(
out: "NTv3TracksOutput",
bigwig_selected: list[str],
bed_elements: list[str],
) -> str:
"""
Create BigWig files for selected tracks and save them in a zip file.
Parameters
----------
out : NTv3TracksOutput
The prediction output from the pipeline.
bigwig_selected : list[str]
List of BigWig track IDs to export.
bed_elements : list[str]
List of BED element names to export.
Returns
-------
str
Path to the created zip file containing BigWig files.
Raises
------
ImportError
If pyBigWig is not installed.
ValueError
If no predictions are available or no tracks are selected.
"""
if pyBigWig is None:
raise ImportError(
"pyBigWig is required for BigWig export. Install with: pip install pyBigWig"
)
if out is None:
raise ValueError("No predictions available. Please run a prediction first.")
bw_names = out.bigwig_track_names or []
bw_logits = out.bigwig_tracks_logits
bed_names = out.bed_element_names or []
bed_logits = out.bed_tracks_logits
if bw_logits is None or not bw_names:
raise ValueError("No BigWig tracks available in model output.")
# Get genomic coordinates
chrom = out.chrom
if chrom is None:
raise ValueError(
"Chromosome information not available. Use genomic coordinates."
)
start = out.start
end = out.end
if start is None or end is None:
raise ValueError("Start and end coordinates are required for BigWig export.")
window_len = out.window_len or (end - start)
# Calculate prediction region (center 37.5%)
if out.pred_start is not None:
pred_start = out.pred_start
else:
pred_start = start + int(window_len * 0.3125)
# Create temporary directory for BigWig files
tmpdir = tempfile.gettempdir()
output_dir = os.path.join(tmpdir, f"bigwig_outputs_{uuid.uuid4().hex}")
os.makedirs(output_dir, exist_ok=True)
# Prepare track data list
track_data_list = []
# Add BigWig tracks
for track_id in bigwig_selected:
if track_id in bw_names:
idx = bw_names.index(track_id)
track_data_list.append(("bigwig", track_id, idx, None))
# Add BED elements (as probabilities)
if bed_logits is not None and bed_elements:
probs = _softmax_last(bed_logits)
for elem_name in bed_elements:
if elem_name in bed_names:
eidx = bed_names.index(elem_name)
# Store as bed element with probability data
track_data_list.append(("bed", elem_name, eidx, probs[:, eidx, 1]))
if not track_data_list:
raise ValueError("No tracks selected for export.")
# Create BigWig files
created_files = []
for track_type, track_id, track_idx, bed_probs in track_data_list:
if track_type == "bigwig":
track_data = bw_logits[:, track_idx].astype(np.float32)
display_name = track_id
else: # bed
if bed_probs is None:
continue
track_data = bed_probs.astype(np.float32)
display_name = track_id
# Clean filename
clean_name = display_name.replace(" ", "_").replace("/", "_").replace("-", "_")
bw_filename = os.path.join(output_dir, f"{clean_name}.bw")
# Create BigWig file
bw = pyBigWig.open(bw_filename, "w")
# Add header - use end of genomic window as chromosome size
bw.addHeader([(chrom, end)])
# Add entries
num_positions = len(track_data)
starts = np.arange(pred_start, pred_start + num_positions, dtype=np.int64)
ends = starts + 1
values = track_data.tolist()
bw.addEntries(
chroms=[chrom] * len(starts),
starts=starts.tolist(),
ends=ends.tolist(),
values=values,
)
bw.close()
created_files.append(bw_filename)
# Create zip file
zip_path = os.path.join(tmpdir, f"ntv3_tracks_{uuid.uuid4().hex}.zip")
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
for bw_file in created_files:
zipf.write(bw_file, os.path.basename(bw_file))
# Clean up individual BigWig files
for bw_file in created_files:
try:
os.remove(bw_file)
except Exception:
pass
try:
os.rmdir(output_dir)
except Exception:
pass
return zip_path