File size: 5,104 Bytes
b21c184
 
 
 
 
 
9dd80fe
b21c184
 
 
 
 
 
b65f002
b21c184
b65f002
b21c184
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9dd80fe
b21c184
 
 
 
 
 
 
 
9dd80fe
b21c184
 
 
 
9dd80fe
b21c184
 
 
 
 
 
 
 
9dd80fe
 
 
 
b21c184
 
9dd80fe
b21c184
 
 
 
9dd80fe
b21c184
 
9dd80fe
b21c184
 
 
9dd80fe
b65f002
9dd80fe
 
b21c184
 
b65f002
 
b21c184
9dd80fe
b21c184
b65f002
 
 
 
9dd80fe
b21c184
 
 
 
9dd80fe
b21c184
 
9dd80fe
b21c184
 
 
 
 
9dd80fe
b21c184
 
 
 
 
 
 
 
9dd80fe
b21c184
 
9dd80fe
b21c184
 
 
 
 
 
 
 
 
 
 
9dd80fe
b21c184
 
 
9dd80fe
b21c184
 
9dd80fe
b21c184
 
9dd80fe
b21c184
 
 
 
 
9dd80fe
b21c184
 
 
 
9dd80fe
b21c184
9dd80fe
b21c184
 
9dd80fe
b21c184
 
9dd80fe
b21c184
 
9dd80fe
b21c184
 
 
 
b65f002
b21c184
 
 
b65f002
b21c184
 
9dd80fe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""
BigWig export functionality for NTv3 tracks.
"""

import os
import tempfile
import uuid
import zipfile
from typing import TYPE_CHECKING

import numpy as np

try:
    import pyBigWig  # noqa: N816
except ImportError:
    pyBigWig = None  # noqa: N816

if TYPE_CHECKING:
    from ntv3_tracks_pipeline import NTv3TracksOutput


def _softmax_last(x: np.ndarray) -> np.ndarray:
    """
    Return the softmax of ``x`` taken along its final axis.

    The per-slice maximum is subtracted before exponentiating so that the
    largest exponent is exactly zero, which keeps ``np.exp`` from overflowing
    on large logits. The result has the same shape as ``x``.
    """
    peak = x.max(axis=-1, keepdims=True)
    exps = np.exp(x - peak)
    denom = exps.sum(axis=-1, keepdims=True)
    return exps / denom


def create_bigwig_zip(
    out: "NTv3TracksOutput",
    bigwig_selected: list[str],
    bed_elements: list[str],
) -> str:
    """
    Create BigWig files for selected tracks and save them in a zip file.

    Each selected BigWig track is written from its column of
    ``out.bigwig_tracks_logits``; each selected BED element is written as the
    softmax probability at class index 1 of ``out.bed_tracks_logits``. One
    value is written per base, starting at the prediction start coordinate.
    The individual ``.bw`` files are staged in a temporary directory that is
    always removed, whether or not the export succeeds.

    Parameters
    ----------
    out : NTv3TracksOutput
        The prediction output from the pipeline.
    bigwig_selected : list[str]
        List of BigWig track IDs to export. IDs not present in
        ``out.bigwig_track_names`` are ignored; repeated IDs are exported once.
    bed_elements : list[str]
        List of BED element names to export. Names not present in
        ``out.bed_element_names`` are ignored; repeated names are exported once.

    Returns
    -------
    str
        Path to the created zip file containing BigWig files. The caller owns
        this file; it is not deleted by this function.

    Raises
    ------
    ImportError
        If pyBigWig is not installed.
    ValueError
        If no predictions are available, genomic coordinates are missing, or
        no tracks are selected.
    """
    if pyBigWig is None:
        raise ImportError(
            "pyBigWig is required for BigWig export. Install with: pip install pyBigWig"
        )

    if out is None:
        raise ValueError("No predictions available. Please run a prediction first.")

    bw_names = out.bigwig_track_names or []
    bw_logits = out.bigwig_tracks_logits
    bed_names = out.bed_element_names or []
    bed_logits = out.bed_tracks_logits

    if bw_logits is None or not bw_names:
        raise ValueError("No BigWig tracks available in model output.")

    # Get genomic coordinates
    chrom = out.chrom
    if chrom is None:
        raise ValueError(
            "Chromosome information not available. Use genomic coordinates."
        )

    start = out.start
    end = out.end
    if start is None or end is None:
        raise ValueError("Start and end coordinates are required for BigWig export.")
    window_len = out.window_len or (end - start)

    # Calculate prediction region (center 37.5%): when the pipeline did not
    # report pred_start, skip the first 31.25% of the window.
    if out.pred_start is not None:
        pred_start = int(out.pred_start)
    else:
        pred_start = int(start) + int(window_len * 0.3125)

    # Name -> first index maps replace repeated list.index() scans; setdefault
    # keeps the first occurrence, matching list.index() semantics.
    bw_index: dict[str, int] = {}
    for i, name in enumerate(bw_names):
        bw_index.setdefault(name, i)
    bed_index: dict[str, int] = {}
    for i, name in enumerate(bed_names):
        bed_index.setdefault(name, i)

    # Collect (display_name, values) pairs, skipping unknown and repeated names.
    # NOTE(review): indexing assumes bigwig logits are (positions, tracks) and
    # BED logits are (positions, elements, classes) — confirm against pipeline.
    tracks = []
    seen = set()

    # Add BigWig tracks
    for track_id in bigwig_selected or []:
        idx = bw_index.get(track_id)
        key = ("bigwig", track_id)
        if idx is None or key in seen:
            continue
        seen.add(key)
        tracks.append((track_id, bw_logits[:, idx]))

    # Add BED elements (as probabilities)
    if bed_logits is not None and bed_elements:
        probs = _softmax_last(bed_logits)
        for elem_name in bed_elements:
            eidx = bed_index.get(elem_name)
            key = ("bed", elem_name)
            if eidx is None or key in seen:
                continue
            seen.add(key)
            tracks.append((elem_name, probs[:, eidx, 1]))

    if not tracks:
        raise ValueError("No tracks selected for export.")

    zip_path = os.path.join(
        tempfile.gettempdir(), f"ntv3_tracks_{uuid.uuid4().hex}.zip"
    )

    # The staging directory (and every .bw inside it) is removed on exit from
    # this block, including when writing or zipping raises.
    with tempfile.TemporaryDirectory(prefix="bigwig_outputs_") as output_dir:
        created_files = []
        used_names = set()

        for display_name, raw_values in tracks:
            track_data = raw_values.astype(np.float32)

            # Clean filename; different IDs can sanitize to the same name (or a
            # BigWig track and BED element can share one), so add a numeric
            # suffix rather than overwrite an earlier file.
            base_name = (
                display_name.replace(" ", "_").replace("/", "_").replace("-", "_")
            )
            clean_name = base_name
            suffix = 1
            while clean_name in used_names:
                suffix += 1
                clean_name = f"{base_name}_{suffix}"
            used_names.add(clean_name)
            bw_filename = os.path.join(output_dir, f"{clean_name}.bw")

            num_positions = len(track_data)
            starts = np.arange(
                pred_start, pred_start + num_positions, dtype=np.int64
            )
            ends = starts + 1

            # Header uses the end of the genomic window as the chromosome size,
            # widened if the written intervals would otherwise extend past it.
            chrom_size = max(int(end), pred_start + num_positions)

            bw = pyBigWig.open(bw_filename, "w")
            try:
                bw.addHeader([(chrom, chrom_size)])
                bw.addEntries(
                    chroms=[chrom] * num_positions,
                    starts=starts.tolist(),
                    ends=ends.tolist(),
                    values=track_data.tolist(),
                )
            finally:
                # Always release the handle so the staging directory can be
                # removed even when a write fails.
                bw.close()
            created_files.append(bw_filename)

        # Create zip file
        try:
            with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
                for bw_file in created_files:
                    zipf.write(bw_file, os.path.basename(bw_file))
        except Exception:
            # Do not leave a truncated archive behind; best-effort removal,
            # then surface the original error.
            try:
                os.remove(zip_path)
            except OSError:
                pass
            raise

    return zip_path