"""
DNA Sequence Tokenization for CRISPR BERT model.
Token mapping:
- 0: PAD/OOV (reserved for padding; the lookup table itself never emits it)
- 1: A (Adenine)
- 2: C (Cytosine)
- 3: G (Guanine)
- 4: T (Thymine)
- 5: AMB (Ambiguous - N, other IUPAC codes, and any other unrecognized character)
"""
import numpy as np
VOCAB_SIZE = 6
WINDOW_SIZE = 1000

# Byte-indexed lookup table (ASCII code -> token ID).
# Every slot starts as 5 (AMB), so any byte that is not one of the four
# canonical bases is tokenized as ambiguous.
_LUT = np.full(256, 5, dtype=np.uint8)
# Canonical bases A, C, G, T get IDs 1..4; both letter cases share the same ID.
_LUT[[ord(base) for base in "ACGT"]] = (1, 2, 3, 4)
_LUT[[ord(base) for base in "acgt"]] = (1, 2, 3, 4)
def _coerce_positive_int(name: str, value) -> int:
    """
    Normalize an int-like value to a built-in positive ``int``.

    Args:
        name: Parameter name, used only in the error message
        value: Candidate value; accepted when it is an ``int``, a NumPy
            integer, or a ``float`` holding a whole number
    Returns:
        The value as a built-in ``int`` (always >= 1)
    Raises:
        ValueError: if the value is a ``bool``, is of any other type, is a
            non-whole float, or is zero or negative
    """
    message = f"{name} must be a positive integer"

    # bool is a subclass of int, so it has to be ruled out explicitly.
    is_flag = isinstance(value, bool)
    is_integral = isinstance(value, (int, np.integer))
    is_whole_float = isinstance(value, float) and value.is_integer()
    if is_flag or not (is_integral or is_whole_float):
        raise ValueError(message)

    number = int(value)
    if number < 1:
        raise ValueError(message)
    return number
def encode_sequence(sequence: str) -> np.ndarray:
    """
    Convert DNA sequence string to integer token array.

    The string is encoded to ASCII as-is, without upper-casing it first.
    The lookup table already maps both letter cases of A/C/G/T, so case
    folding is unnecessary. It is also unsafe: Unicode upper-casing can turn
    non-ASCII characters into ASCII ones and change the string length
    (e.g. "ß" -> "SS"), which would bypass the non-ASCII check and
    misalign tokens with input positions.

    Args:
        sequence: DNA sequence string (A, C, G, T, N, etc.)
    Returns:
        numpy array of uint8 token IDs, one per input character
    Raises:
        ValueError: if the sequence contains non-ASCII characters
    """
    try:
        raw = sequence.encode("ascii")
    except UnicodeEncodeError as exc:
        raise ValueError("Sequence contains non-ASCII characters") from exc
    # View the bytes as uint8 indices; indexing the table with them yields
    # a new array of token IDs.
    byte_codes = np.frombuffer(raw, dtype=np.uint8)
    return _LUT[byte_codes]
def validate_sequence(sequence: str) -> tuple[bool, str]:
    """
    Check whether a DNA sequence is acceptable as API input.

    The checks run in order and the first failure is reported: empty input,
    length below WINDOW_SIZE, then characters outside the accepted alphabet
    (A/C/G/T/N plus the IUPAC codes R, Y, S, W, K, M, B, D, H, V, in either
    letter case).

    Args:
        sequence: Input DNA sequence
    Returns:
        Tuple of (is_valid, error_message); the message is "" when valid
    """
    if not sequence:
        return False, "Sequence is empty"

    length = len(sequence)
    if length < WINDOW_SIZE:
        return False, f"Sequence must be at least {WINDOW_SIZE} nucleotides (got {length})"

    allowed = frozenset("ACGTNRYSWKMBDHV" "acgtnryswkmbdhv")
    # Report each offending character once, in sorted order.
    unexpected = sorted(ch for ch in set(sequence) if ch not in allowed)
    if unexpected:
        listing = ", ".join(map(repr, unexpected))
        return False, f"Invalid characters in sequence: {listing}"

    return True, ""
def strip_fasta_header(text: str) -> str:
    """
    Drop FASTA header lines and join the remaining lines into one string.

    Each line is stripped of surrounding whitespace; blank lines and lines
    starting with ">" are discarded, and the rest are concatenated with no
    separator.

    Args:
        text: Input text that may contain FASTA headers
    Returns:
        Sequence string with headers removed
    """
    trimmed = (raw.strip() for raw in text.strip().splitlines())
    return "".join(
        piece for piece in trimmed if piece and not piece.startswith(">")
    )
def create_windows(
    tokens: np.ndarray,
    window_size: int = WINDOW_SIZE,
    stride: int = 100
) -> tuple[np.ndarray, np.ndarray]:
    """
    Create sliding windows from tokenized sequence.

    Windows start at 0, stride, 2*stride, ... for as long as a full window
    fits. If the last of those does not reach the end of the sequence, one
    extra window anchored at the end is added so the tail is covered. A
    sequence shorter than one window is zero-padded on the right into a
    single window.

    Args:
        tokens: Tokenized sequence array (any array-like is accepted)
        window_size: Size of each window (default 1000)
        stride: Step size between windows (default 100)
    Returns:
        Tuple of (windows array, start positions array). The start positions
        are int64 on every code path.
    Raises:
        ValueError: if window_size or stride is not a positive integer
    """
    window_size = _coerce_positive_int("window_size", window_size)
    stride = _coerce_positive_int("stride", stride)
    tokens = np.asarray(tokens)
    seq_len = len(tokens)

    if seq_len < window_size:
        # Pad short sequences with zeros on the right.
        padded = np.zeros(window_size, dtype=tokens.dtype)
        padded[:seq_len] = tokens
        return padded.reshape(1, -1), np.zeros(1, dtype=np.int64)

    # Every multiple of stride that still leaves room for a full window.
    last_start = seq_len - window_size
    starts = np.arange(0, last_start + 1, stride, dtype=np.int64)

    # Add a final end-anchored window when the regular grid stops short.
    if starts[-1] != last_start:
        starts = np.append(starts, np.int64(last_start))

    windows = np.array([tokens[s:s + window_size] for s in starts])
    return windows, starts
|