DeepCRISPR / sequence_processor.py
mk6783336's picture
Upload 12 files
c1d5fee verified
"""
sequence_processor.py
=====================
Biological translation engine for DeepCRISPR.
Converts raw CRISPR guide RNA / DNA sequences into the 306-dimension
embedding format required by the Mega Model.
Strategy:
1. If HuggingFace `transformers` + `torch` are installed, uses a
pre-trained DNA language model (DNABERT-2 or similar) to generate
learned embeddings, then projects to 306 dimensions.
2. Otherwise, falls back to a deterministic k-mer frequency + positional
encoding method that produces a reproducible 306-dim vector from
any DNA string — no GPU or pip install required.
Author: Mujahid
"""
import pandas as pd
import numpy as np
import hashlib
import math
from typing import List, Union
# ────────────────────────────────────────────────────────────────────────────
# Public API
# ────────────────────────────────────────────────────────────────────────────
def extract_306_embeddings(raw_sequence: str) -> pd.DataFrame:
    """
    Convert raw DNA/RNA text into a DataFrame of 306 embedding columns.

    The input is split into sequences by ``_parse_sequences``. Embeddings are
    first requested from ``_transformer_embeddings``; if that raises for any
    reason, ``_kmer_embeddings`` is used instead. The switch is logged so
    that a change of embedding strategy is never silent.

    Parameters
    ----------
    raw_sequence : str
        One or more DNA/RNA sequences, one per line.
        Valid characters: A, T/U, G, C, N (case-insensitive).

    Returns
    -------
    pd.DataFrame
        One row per parsed sequence, with the column names returned by
        ``_get_column_names`` (read from mega_feature_importance.csv if
        available, otherwise emb_0 … emb_305).

    Raises
    ------
    ValueError
        If no usable sequence is found in ``raw_sequence``.
    """
    # Local import keeps this block self-contained.
    import logging

    sequences = _parse_sequences(raw_sequence)
    if not sequences:
        raise ValueError("No valid DNA/RNA sequences found in input.")

    log = logging.getLogger(__name__)
    try:
        embeddings = _transformer_embeddings(sequences)
    except ImportError as exc:
        # Expected when the optional transformer dependencies are missing.
        log.info(
            "Transformer backend unavailable (%s); using k-mer fallback.", exc
        )
        embeddings = _kmer_embeddings(sequences)
    except Exception:
        # Deliberate best-effort: any other failure (download error, model
        # incompatibility, ...) still falls back, but leaves a trace.
        log.warning(
            "Transformer embedding failed; using k-mer fallback.",
            exc_info=True,
        )
        embeddings = _kmer_embeddings(sequences)

    # Build the DataFrame with the expected column names.
    return pd.DataFrame(embeddings, columns=_get_column_names())
# ────────────────────────────────────────────────────────────────────────────
# Sequence Parsing
# ────────────────────────────────────────────────────────────────────────────
# Nucleotide codes accepted in either case: A, T, U, G, C and the
# ambiguity code N.
# NOTE(review): no code visible in this file references this constant —
# confirm whether it is still needed.
_VALID_BASES = {
    symbol
    for code in "ATUGCN"
    for symbol in (code, code.lower())
}
def _parse_sequences(raw: str) -> List[str]:
    """
    Extract cleaned DNA sequences from free-form text.

    Each line is trimmed, upper-cased and converted from RNA to DNA
    (U -> T). Blank lines and FASTA header lines (starting with ``>``) are
    ignored. Every character other than A, T, G, C or N is dropped, and
    only sequences of at least 10 bases are kept.

    Parameters
    ----------
    raw : str
        Text containing one sequence per line.

    Returns
    -------
    List[str]
        The cleaned sequences, in input order.
    """
    allowed = frozenset("ATGCN")
    min_length = 10  # minimum viable guide length

    cleaned = []
    for raw_line in raw.strip().split("\n"):
        candidate = raw_line.strip().upper().replace("U", "T")  # RNA -> DNA
        # An empty slice means a blank line; ">" marks a FASTA header.
        if candidate[:1] in ("", ">"):
            continue
        bases_only = "".join(filter(allowed.__contains__, candidate))
        if len(bases_only) < min_length:
            continue
        cleaned.append(bases_only)
    return cleaned
# ────────────────────────────────────────────────────────────────────────────
# Column Names (match mega_feature_importance.csv)
# ────────────────────────────────────────────────────────────────────────────
def _get_column_names() -> List[str]:
    """
    Return the 306 column names for the embedding DataFrame.

    Reads ``mega_feature_importance.csv`` from the directory containing this
    module and uses the first 306 entries of its first column. If the file
    cannot be located or read, or if it yields fewer than 306 entries, the
    generic names ``emb_0`` … ``emb_305`` are returned instead, so the
    result always has exactly 306 items.

    Returns
    -------
    List[str]
        Exactly 306 column names.
    """
    import os

    n_features = 306
    fallback = [f"emb_{i}" for i in range(n_features)]

    try:
        fi_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "mega_feature_importance.csv",
        )
        names = pd.read_csv(fi_path).iloc[:, 0].tolist()
    except Exception:
        # Best-effort lookup: a missing, unreadable or malformed file simply
        # selects the generic names.
        return fallback

    # A short file would give a name list that does not match the width of
    # the embedding matrix, so treat it like a missing file.
    if len(names) < n_features:
        return fallback
    return names[:n_features]
# ────────────────────────────────────────────────────────────────────────────
# Strategy 1: Transformer-Based Embeddings (if available)
# ────────────────────────────────────────────────────────────────────────────
def _transformer_embeddings(sequences: List[str]) -> np.ndarray:
    """
    Embed sequences with a HuggingFace DNA language model.

    Each sequence is tokenized (truncated to 512 tokens), passed through
    ``zhihan1996/DNABERT-2-117M``, mean-pooled over the token axis and then
    resized to 306 values by ``_project_to_306``.

    Parameters
    ----------
    sequences : List[str]
        Cleaned DNA sequences.

    Returns
    -------
    np.ndarray
        One 306-value row per input sequence.

    Raises
    ------
    ImportError
        If ``transformers`` or ``torch`` is not installed.
    Exception
        Anything raised while loading or running the model is propagated
        to the caller.
    """
    # Imported lazily so the module still loads without these optional
    # dependencies.
    from transformers import AutoTokenizer, AutoModel
    import torch

    model_name = "zhihan1996/DNABERT-2-117M"
    # NOTE(security): trust_remote_code=True executes Python code shipped in
    # the model repository; consider pinning a revision.
    # NOTE(review): the tokenizer and model are re-loaded on every call.
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    model = AutoModel.from_pretrained(model_name, trust_remote_code=True)
    model.eval()

    all_embeddings = []
    with torch.no_grad():
        for seq in sequences:
            inputs = tokenizer(
                seq,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=512,
            )
            outputs = model(**inputs)
            # Index 0 is the per-token hidden states both for a plain tuple
            # and for a transformers ModelOutput; attribute access
            # (.last_hidden_state) only works for the latter.
            # NOTE(review): DNABERT-2's remote code appears to return a
            # tuple — confirm against the model card.
            token_states = outputs[0]
            # Mean-pool token embeddings -> single vector.
            hidden = token_states.mean(dim=1).squeeze().numpy()
            # Pad / truncate to exactly 306 dimensions.
            all_embeddings.append(_project_to_306(hidden))
    return np.array(all_embeddings)
# ────────────────────────────────────────────────────────────────────────────
# Strategy 2: K-mer Frequency + Positional Encoding (fallback)
# ────────────────────────────────────────────────────────────────────────────
def _kmer_embeddings(sequences: List[str]) -> np.ndarray:
    """
    Deterministic fallback embedding built from sequence composition.

    For every sequence a feature list is assembled in this order:

    1. k-mer frequencies for k = 1..6 over the alphabet A/C/G/T, in sorted
       k-mer order (4 + 16 + 64 + 256 + 1024 + 4096 = 5460 values);
       k-mers containing any other character are not counted,
    2. four statistics: GC fraction, 1 - GC fraction, length / 100 and
       N fraction,
    3. a one-hot A/T/G/C encoding of the first 20 positions (80 values;
       positions past the end or holding N encode as all zeros).

    The list is then handed to ``_project_to_306``.

    NOTE(review): the assembled list holds 5544 values and
    ``_project_to_306`` truncates anything longer than 306, so only the
    k = 1..3 frequencies and the first 222 tetranucleotide frequencies reach
    the output; the statistics and positional features are discarded.
    This is left as-is because changing it would alter the embedding values
    — confirm the intended behaviour before changing it.

    Parameters
    ----------
    sequences : List[str]
        Cleaned DNA sequences.

    Returns
    -------
    np.ndarray
        One 306-value row per input sequence.
    """
    bases = ["A", "T", "G", "C"]
    # The k-mer vocabularies depend only on k, so build them once instead of
    # once per sequence.
    vocab_by_k = {k: _generate_kmers(bases, k) for k in range(1, 7)}

    all_embeddings = []
    for seq in sequences:
        n = len(seq)
        features = []

        # 1) Mono- to hexa-nucleotide frequencies.
        for k, vocab in vocab_by_k.items():
            counts = {}
            for i in range(n - k + 1):
                kmer = seq[i:i + k]
                if all(c in "ATGC" for c in kmer):
                    counts[kmer] = counts.get(kmer, 0) + 1
            # Avoid dividing by zero when no valid k-mer exists.
            total = sum(counts.values()) or 1
            features.extend(counts.get(km, 0) / total for km in vocab)

        # 2) Sequence-level statistics.
        gc = (seq.count("G") + seq.count("C")) / max(n, 1)
        features.extend([
            gc,                          # GC content
            1 - gc,                      # AT content
            n / 100.0,                   # normalized length
            seq.count("N") / max(n, 1),  # N fraction
        ])

        # 3) One-hot encoding of the first 20 bases.
        for pos in range(20):
            base = seq[pos] if pos < n else "N"
            features.extend(1.0 if base == b else 0.0 for b in "ATGC")

        # 4) Resize to exactly 306 values.
        all_embeddings.append(
            _project_to_306(np.array(features, dtype=np.float64))
        )
    return np.array(all_embeddings)
def _generate_kmers(bases: List[str], k: int) -> List[str]:
    """
    Return every length-``k`` concatenation of ``bases`` in sorted order.

    Parameters
    ----------
    bases : List[str]
        Alphabet to draw from.
    k : int
        Number of symbols per k-mer; must be non-negative. ``k == 0``
        yields the single empty k-mer ``[""]``.

    Returns
    -------
    List[str]
        ``len(bases) ** k`` strings, lexicographically sorted.

    Raises
    ------
    ValueError
        If ``k`` is negative.
    """
    if k < 0:
        raise ValueError(f"k must be non-negative, got {k}")
    kmers = [""]
    # Grow by one symbol per pass; sorting once at the end gives the same
    # order as sorting after every step.
    for _ in range(k):
        kmers = [prefix + base for prefix in kmers for base in bases]
    return sorted(kmers)
# ────────────────────────────────────────────────────────────────────────────
# Projection helper
# ────────────────────────────────────────────────────────────────────────────
def _project_to_306(vec: np.ndarray) -> np.ndarray:
    """
    Deterministically resize a vector to exactly 306 values.

    The input is flattened to float64. Vectors with 306 or more values are
    truncated to the first 306. Shorter vectors are copied into the front
    of the result, and each remaining slot ``i`` is filled with
    ``0.5 * sin(vec[i % len(vec)] * (i + 1) * 0.1)``. No random projection
    is involved. An empty input yields all zeros.

    Parameters
    ----------
    vec : np.ndarray
        Values to resize; any array-like convertible to float64.

    Returns
    -------
    np.ndarray
        A float64 array of length 306.
    """
    target = 306
    flat = np.array(vec, dtype=np.float64).flatten()
    size = len(flat)

    if size >= target:
        return flat[:target]

    result = np.zeros(target, dtype=np.float64)
    if size == 0:
        # Nothing to derive the fill values from (and ``i % 0`` would raise).
        return result

    result[:size] = flat
    # Fill the tail with sinusoids derived from the existing values, cycling
    # through them. math.sin is kept so results stay bit-identical.
    for i in range(size, target):
        result[i] = math.sin(flat[i % size] * (i + 1) * 0.1) * 0.5
    return result