Spaces:
Sleeping
Sleeping
File size: 10,068 Bytes
c1d5fee | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 | """
sequence_processor.py
=====================
Biological translation engine for DeepCRISPR.
Converts raw CRISPR guide RNA / DNA sequences into the 306-dimension
embedding format required by the Mega Model.
Strategy:
1. If HuggingFace `transformers` + `torch` are installed, uses a
pre-trained DNA language model (DNABERT-2 or similar) to generate
learned embeddings, then projects to 306 dimensions.
2. Otherwise, falls back to a deterministic k-mer frequency + positional
encoding method that produces a reproducible 306-dim vector from
any DNA string — no GPU or pip install required.
Author: Mujahid
"""
import pandas as pd
import numpy as np
import hashlib
import math
from typing import List, Union
# ----------------------------------------------------------------------------
# Public API
# ----------------------------------------------------------------------------
def extract_306_embeddings(raw_sequence: str) -> pd.DataFrame:
    """
    Convert raw DNA/RNA text into a DataFrame of 306-dimensional embeddings.

    The transformer-based embedder is tried first; if it raises for any
    reason, the deterministic k-mer embedder is used instead and the reason
    for the fallback is logged.

    Parameters
    ----------
    raw_sequence : str
        One or more DNA/RNA sequences, one per line.
        Valid characters: A, T/U, G, C, N (case-insensitive).

    Returns
    -------
    pd.DataFrame
        One row per accepted sequence and 306 columns, named by
        ``_get_column_names()`` (names read from
        mega_feature_importance.csv if available, otherwise
        emb_0 ... emb_305).

    Raises
    ------
    ValueError
        If no usable sequence is found in ``raw_sequence``.
    """
    import logging

    sequences = _parse_sequences(raw_sequence)
    if not sequences:
        raise ValueError("No valid DNA/RNA sequences found in input.")

    # Try transformer-based embeddings first.  The broad catch is deliberate:
    # missing packages, download failures and model errors must all lead to
    # the dependency-free fallback rather than a crash.
    try:
        embeddings = _transformer_embeddings(sequences)
    except Exception as exc:
        # Record why the fallback was taken instead of hiding it; the two
        # strategies produce different vectors for the same input.
        logging.getLogger(__name__).warning(
            "Transformer embeddings unavailable (%s: %s); "
            "falling back to k-mer embeddings.",
            type(exc).__name__,
            exc,
        )
        embeddings = _kmer_embeddings(sequences)

    # Build the DataFrame with the expected column names.
    col_names = _get_column_names()
    return pd.DataFrame(embeddings, columns=col_names)
# ----------------------------------------------------------------------------
# Sequence Parsing
# ----------------------------------------------------------------------------
_VALID_BASES = set("ATUGCNatugcn")


def _parse_sequences(raw: str) -> List[str]:
    """
    Extract cleaned DNA sequences from newline-separated raw text.

    Each line is trimmed, upper-cased and has every ``U`` turned into ``T``
    (RNA -> DNA).  Blank lines and FASTA header lines (starting with ``>``)
    are ignored.  Every character other than A, T, G, C or N is then dropped,
    and only results with at least 10 bases are kept.

    Parameters
    ----------
    raw : str
        Raw text, one candidate sequence per line.

    Returns
    -------
    List[str]
        The accepted sequences, in input order.
    """
    allowed = "ATGCN"
    min_length = 10  # Minimum viable guide length

    cleaned = []
    for raw_line in raw.strip().split("\n"):
        normalized = raw_line.strip().upper().replace("U", "T")

        # Nothing to do for empty lines and FASTA headers.
        if normalized == "" or normalized[0] == ">":
            continue

        # Keep only the recognised base letters (drops inner whitespace too).
        bases_only = "".join([ch for ch in normalized if ch in allowed])
        if len(bases_only) < min_length:
            continue
        cleaned.append(bases_only)
    return cleaned
# ----------------------------------------------------------------------------
# Column Names (match mega_feature_importance.csv)
# ----------------------------------------------------------------------------
def _get_column_names() -> List[str]:
    """
    Return the 306 feature names used as embedding column labels.

    The names are taken from the first column of
    ``mega_feature_importance.csv`` located next to this module.  If the file
    cannot be read, or supplies fewer than 306 names, the generic names
    ``emb_0`` ... ``emb_305`` are returned instead, so the result always has
    exactly 306 entries.

    Returns
    -------
    List[str]
        Exactly 306 column names.
    """
    import os

    n_features = 306
    fallback = [f"emb_{i}" for i in range(n_features)]

    fi_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "mega_feature_importance.csv",
    )
    try:
        names = pd.read_csv(fi_path).iloc[:, 0].tolist()
    except Exception:
        # Best-effort lookup: a missing, empty or malformed file just means
        # the generic names are used.
        return fallback

    # A short list would not match a 306-column embedding matrix when used
    # as DataFrame column labels, so treat it like a missing file.
    if len(names) < n_features:
        return fallback
    return names[:n_features]
# ----------------------------------------------------------------------------
# Strategy 1: Transformer-Based Embeddings (if available)
# ----------------------------------------------------------------------------
def _transformer_embeddings(sequences: List[str]) -> np.ndarray:
    """
    Use a HuggingFace DNA language model to produce 306-dim embeddings.

    Each sequence is tokenized and run through the model; the per-token
    hidden states are mean-pooled into one vector, which is then passed
    through ``_project_to_306``.

    Parameters
    ----------
    sequences : List[str]
        DNA sequences to embed.

    Returns
    -------
    np.ndarray
        Array built from one 306-length vector per input sequence.

    Raises
    ------
    ImportError
        If ``transformers`` or ``torch`` is not installed.
    Exception
        Anything raised while loading or running the model.
    """
    from transformers import AutoTokenizer, AutoModel
    import torch

    model_name = "zhihan1996/DNABERT-2-117M"
    # NOTE(review): trust_remote_code=True executes Python code shipped in
    # the model repository; consider pinning a revision.
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    model = AutoModel.from_pretrained(model_name, trust_remote_code=True)
    model.eval()

    all_embeddings = []
    for seq in sequences:
        inputs = tokenizer(
            seq,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512,
        )
        with torch.no_grad():
            outputs = model(**inputs)

        # Index positionally instead of reading `.last_hidden_state`: the
        # DNABERT-2 model card uses `model(inputs)[0]`, which indicates a
        # plain tuple is returned (attribute access would then fail).
        # `[0]` is also the last hidden state of a standard ModelOutput.
        token_states = outputs[0]

        # Mean-pool token embeddings -> single vector
        hidden = token_states.mean(dim=1).squeeze().numpy()

        # Pad / truncate to exactly 306 dimensions
        emb_306 = _project_to_306(hidden)
        all_embeddings.append(emb_306)
    return np.array(all_embeddings)
# ----------------------------------------------------------------------------
# Strategy 2: K-mer Frequency + Positional Encoding (fallback)
# ----------------------------------------------------------------------------
def _kmer_embeddings(sequences: List[str]) -> np.ndarray:
    """
    Deterministic fallback embedding built from sequence composition.

    For every sequence a feature list is assembled from, in order:
    k-mer frequencies for k = 1..6 (over the bases A, T, G, C), four
    sequence-level statistics, and a one-hot encoding of the first 20 bases.
    The list is then passed through ``_project_to_306``.

    NOTE(review): the k-mer part alone has 4 + 16 + 64 + 256 + 1024 + 4096
    entries, and ``_project_to_306`` truncates anything longer than 306, so
    only the first 306 k-mer frequencies reach the output; the statistics
    and positional features are currently discarded.  Left unchanged here
    because altering the layout would change the model's inputs.

    Parameters
    ----------
    sequences : List[str]
        DNA sequences to embed.

    Returns
    -------
    np.ndarray
        Array built from one 306-length vector per input sequence.
    """
    bases = ["A", "T", "G", "C"]
    # The k-mer vocabularies depend only on k, so build them once instead
    # of regenerating (and re-sorting) them for every sequence.
    vocab_by_k = {k: _generate_kmers(bases, k) for k in range(1, 7)}

    all_embeddings = []
    for seq in sequences:
        features = []
        n = len(seq)

        # -- 1) Mono/di/tri/tetra/penta/hexa-nucleotide frequencies --
        for k, vocab in vocab_by_k.items():
            counts = {}
            for i in range(n - k + 1):
                kmer = seq[i:i + k]
                # Windows containing anything but A/T/G/C (e.g. N) are skipped.
                if all(c in "ATGC" for c in kmer):
                    counts[kmer] = counts.get(kmer, 0) + 1
            total = sum(counts.values()) or 1  # avoid division by zero
            features.extend(counts.get(km, 0) / total for km in vocab)

        # -- 2) Sequence-level statistics --
        gc = (seq.count("G") + seq.count("C")) / max(n, 1)
        features.extend([
            gc,                          # GC content
            1 - gc,                      # AT content
            n / 100.0,                   # Normalized length
            seq.count("N") / max(n, 1),  # N fraction
        ])

        # -- 3) Positional encoding of first 20 bases --
        for pos in range(20):
            # Positions past the end are treated as "N" (all-zero one-hot).
            base = seq[pos] if pos < n else "N"
            features.extend(1.0 if base == b else 0.0 for b in "ATGC")

        # -- 4) Pad / truncate to exactly 306 --
        emb_306 = _project_to_306(np.array(features, dtype=np.float64))
        all_embeddings.append(emb_306)
    return np.array(all_embeddings)
def _generate_kmers(bases: List[str], k: int) -> List[str]:
    """
    Generate all possible k-mers from the given bases, sorted.

    Parameters
    ----------
    bases : List[str]
        Alphabet to build the k-mers from.
    k : int
        Length of each k-mer; must be at least 1.

    Returns
    -------
    List[str]
        All ``len(bases) ** k`` concatenations of ``k`` bases, in sorted
        order.

    Raises
    ------
    ValueError
        If ``k`` is smaller than 1 (previously this recursed without end).
    """
    if k < 1:
        raise ValueError(f"k must be >= 1, got {k}")

    kmers = sorted(bases)
    # Grow the k-mers one base at a time, re-sorting after each extension.
    for _ in range(k - 1):
        kmers = sorted(prefix + b for prefix in kmers for b in bases)
    return kmers
# ----------------------------------------------------------------------------
# Projection helper
# ----------------------------------------------------------------------------
def _project_to_306(vec: np.ndarray) -> np.ndarray:
    """
    Deterministically pad or truncate a vector to exactly 306 dims.

    The input is converted to a flat float64 array.  Longer inputs are cut
    to their first 306 entries.  Shorter inputs are copied into the front of
    the result and the remaining positions are filled with
    ``0.5 * sin(v * (i + 1) * 0.1)``, where ``v`` cycles through the input
    values and ``i`` is the output index.  An empty input yields all zeros.

    Parameters
    ----------
    vec : np.ndarray
        Input values (any array-like accepted by ``np.array``).

    Returns
    -------
    np.ndarray
        float64 array of length 306.
    """
    target = 306
    vec = np.array(vec, dtype=np.float64).flatten()
    size = len(vec)

    if size == target:
        return vec
    if size > target:
        return vec[:target]

    result = np.zeros(target, dtype=np.float64)
    if size == 0:
        # Nothing to cycle through; the modulo below would divide by zero.
        return result

    result[:size] = vec
    # Fill remaining dims with sinusoidal functions of the existing values.
    for i in range(size, target):
        seed_val = vec[i % size]
        result[i] = math.sin(seed_val * (i + 1) * 0.1) * 0.5
    return result
|