File size: 4,269 Bytes
52e5b45
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3cc5297
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52e5b45
 
 
 
 
 
 
 
 
 
 
 
 
3cc5297
 
 
 
52e5b45
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3cc5297
 
52e5b45
 
 
 
 
 
 
 
 
 
 
 
 
 
3cc5297
 
 
 
 
 
 
52e5b45
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3cc5297
 
52e5b45
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""
DNA Sequence Tokenization for CRISPR BERT model.

Token mapping:
  - 0: PAD/OOV (padding or unknown)
  - 1: A (Adenine)
  - 2: C (Cytosine)
  - 3: G (Guanine)
  - 4: T (Thymine)
  - 5: AMB (Ambiguous - N, other IUPAC codes, and any other non-ACGT character)
"""

import numpy as np

VOCAB_SIZE = 6
WINDOW_SIZE = 1000

# Byte-indexed translation table: ASCII code -> token ID.
# Every slot starts out as 5 (AMB), so any byte that is not one of the
# four canonical bases is tokenized as ambiguous.
_LUT = np.full(256, 5, dtype=np.uint8)
# Upper- and lowercase letters share a token ID: A=1, C=2, G=3, T=4.
_LUT[list(b"ACGT")] = [1, 2, 3, 4]
_LUT[list(b"acgt")] = [1, 2, 3, 4]


def _coerce_positive_int(name: str, value) -> int:
    """Return *value* as a strictly positive ``int``.

    Args:
        name: Parameter name used in the error message.
        value: Candidate value. Accepted are ``int`` / ``np.integer``
            instances and ``float`` instances holding a whole number.
            ``bool`` is rejected even though it subclasses ``int``.

    Returns:
        The value converted to a built-in ``int``.

    Raises:
        ValueError: If the value has an unsupported type, is a
            non-integral float, or is zero or negative.
    """
    message = f"{name} must be a positive integer"

    looks_integral = isinstance(value, (int, np.integer)) or (
        isinstance(value, float) and value.is_integer()
    )
    # bool passes the int check above, so it has to be excluded explicitly.
    if isinstance(value, bool) or not looks_integral:
        raise ValueError(message)

    number = int(value)
    if number < 1:
        raise ValueError(message)
    return number


def encode_sequence(sequence: str) -> np.ndarray:
    """
    Convert DNA sequence string to integer token array.

    Each character is looked up by its ASCII code in the module-level
    ``_LUT`` table, which maps both upper- and lowercase A/C/G/T to 1-4
    and every other byte to 5 (AMB).

    Args:
        sequence: DNA sequence string (A, C, G, T, N, etc.)

    Returns:
        numpy array of uint8 token IDs, one per input character

    Raises:
        ValueError: If the sequence contains any non-ASCII character.
    """
    # Encode the raw string without case-folding first. str.upper() can turn
    # non-ASCII characters into ASCII ones (e.g. "ß" -> "SS", "ı" -> "I"),
    # which would slip past the ASCII check and could change the sequence
    # length. The lookup table handles lowercase bases itself, so no
    # normalization is needed.
    try:
        raw = sequence.encode("ascii")
    except UnicodeEncodeError as exc:
        raise ValueError("Sequence contains non-ASCII characters") from exc
    # Fancy indexing returns a fresh array, so the read-only buffer view
    # is never exposed to the caller.
    return _LUT[np.frombuffer(raw, dtype=np.uint8)]


def validate_sequence(sequence: str) -> tuple[bool, str]:
    """
    Check whether a DNA sequence is acceptable as API input.

    The checks run in order: non-empty, at least ``WINDOW_SIZE`` characters
    long, and composed only of IUPAC nucleotide letters (either case).

    Args:
        sequence: Input DNA sequence

    Returns:
        Tuple of (is_valid, error_message); the message is empty when valid
    """
    if not sequence:
        return False, "Sequence is empty"

    length = len(sequence)
    if length < WINDOW_SIZE:
        return False, f"Sequence must be at least {WINDOW_SIZE} nucleotides (got {length})"

    # Standard IUPAC nucleotide codes, accepted in upper and lower case
    iupac = "ACGTNRYSWKMBDHV"
    allowed = frozenset(iupac + iupac.lower())
    offending = sorted(set(sequence).difference(allowed))

    if not offending:
        return True, ""

    listing = ", ".join(map(repr, offending))
    return False, f"Invalid characters in sequence: {listing}"


def strip_fasta_header(text: str) -> str:
    """
    Drop FASTA header lines and join the remaining lines into one string.

    Each line is trimmed of surrounding whitespace; blank lines and lines
    starting with ">" are discarded, and the rest are concatenated without
    a separator.

    Args:
        text: Input text that may contain FASTA headers

    Returns:
        Sequence string with headers removed
    """
    trimmed = (raw.strip() for raw in text.strip().splitlines())
    return "".join(
        piece for piece in trimmed if piece and not piece.startswith(">")
    )


def create_windows(
    tokens: np.ndarray,
    window_size: int = WINDOW_SIZE,
    stride: int = 100
) -> tuple[np.ndarray, np.ndarray]:
    """
    Create sliding windows from tokenized sequence.

    Windows start at 0, stride, 2*stride, ... for as long as a full window
    fits. If those windows do not reach the end of the sequence, one extra
    window aligned to the end is appended. Sequences shorter than
    ``window_size`` yield a single window, right-padded with zeros.

    Args:
        tokens: Tokenized sequence array (any array-like is accepted)
        window_size: Size of each window (default 1000)
        stride: Step size between windows (default 100)

    Returns:
        Tuple of (windows array of shape (n_windows, window_size),
        start positions array of dtype int32)

    Raises:
        ValueError: If ``window_size`` or ``stride`` is not a positive
            integer (as judged by ``_coerce_positive_int``).
    """
    window_size = _coerce_positive_int("window_size", window_size)
    stride = _coerce_positive_int("stride", stride)
    # Accept plain lists in every branch; the padding path needs ``.dtype``.
    tokens = np.asarray(tokens)
    seq_len = len(tokens)

    if seq_len < window_size:
        # Pad short sequences with zeros on the right
        padded = np.zeros(window_size, dtype=tokens.dtype)
        padded[:seq_len] = tokens
        # Same int32 dtype as the start positions of the main path
        return padded.reshape(1, -1), np.zeros(1, dtype=np.int32)

    # Number of stride-aligned windows that fit entirely in the sequence
    n_windows = (seq_len - window_size) // stride + 1
    starts = np.arange(0, n_windows * stride, stride, dtype=np.int32)

    # Add a final end-aligned window if the strided ones stop short
    last_start = seq_len - window_size
    if starts[-1] < last_start:
        # Append an int32 array: appending a bare Python int would promote
        # the whole result to the platform default integer type.
        starts = np.append(starts, np.array([last_start], dtype=np.int32))

    # Create windows
    windows = np.array([tokens[s:s + window_size] for s in starts])

    return windows, starts