"""
sequence_processor.py
=====================
Biological translation engine for DeepCRISPR.

Converts raw CRISPR guide RNA / DNA sequences into the 306-dimension
embedding format required by the Mega Model.

Strategy:
  1. If HuggingFace `transformers` + `torch` are installed, uses a
     pre-trained DNA language model (DNABERT-2 or similar) to generate
     learned embeddings, then projects to 306 dimensions.
  2. Otherwise, falls back to a deterministic k-mer frequency + positional
     encoding method that produces a reproducible 306-dim vector from any
     DNA string — no GPU or pip install required.

Author: Mujahid
"""

import functools
import hashlib
import logging
import math
import os
from collections import Counter
from itertools import product
from typing import List, Union

import numpy as np
import pandas as pd

_logger = logging.getLogger(__name__)

# Width of every embedding row produced by this module.
_EMBEDDING_DIM = 306

# Sequences shorter than this are discarded (minimum viable guide length).
_MIN_SEQUENCE_LENGTH = 10

# Characters kept after normalisation (U has already been mapped to T).
_DNA_ALPHABET = frozenset("ATGCN")

# Unambiguous bases, in the order used for one-hot positional encoding.
_CANONICAL_BASES = "ATGC"

# Largest k for which k-mer frequencies are computed by the fallback method.
_MAX_KMER = 6

# Number of leading positions that are one-hot encoded by the fallback method.
_POSITIONAL_BASES = 20

_TRANSFORMER_MODEL_NAME = "zhihan1996/DNABERT-2-117M"


# ────────────────────────────────────────────────────────────────────────────
#  Public API
# ────────────────────────────────────────────────────────────────────────────

def extract_306_embeddings(raw_sequence: str) -> pd.DataFrame:
    """
    Convert a raw DNA/RNA sequence (or multiple sequences separated by
    newlines) into a DataFrame with exactly 306 embedding columns.

    Parameters
    ----------
    raw_sequence : str
        One or more DNA/RNA sequences, one per line.
        Valid characters: A, T/U, G, C, N (case-insensitive).
        Lines starting with ">" (FASTA headers) are ignored.

    Returns
    -------
    pd.DataFrame
        Shape (n_sequences, 306) with columns matching the Mega Model's
        expected feature names (loaded from mega_feature_importance.csv
        if available, otherwise emb_0 … emb_305).

    Raises
    ------
    ValueError
        If no line of the input yields a sequence of at least 10 bases.
    """
    sequences = _parse_sequences(raw_sequence)
    if not sequences:
        raise ValueError("No valid DNA/RNA sequences found in input.")

    # Try transformer-based embeddings first. Any failure (missing packages,
    # no network, model error) deliberately degrades to the k-mer method; the
    # reason is logged so the switch is not completely silent.
    try:
        embeddings = _transformer_embeddings(sequences)
    except Exception:
        _logger.debug(
            "Transformer embeddings unavailable; using k-mer fallback.",
            exc_info=True,
        )
        embeddings = _kmer_embeddings(sequences)

    # Build DataFrame with correct column names
    return pd.DataFrame(embeddings, columns=_get_column_names())


# ────────────────────────────────────────────────────────────────────────────
#  Sequence Parsing
# ────────────────────────────────────────────────────────────────────────────

_VALID_BASES = set("ATUGCNatugcn")


def _parse_sequences(raw: str) -> List[str]:
    """
    Parse, clean and validate DNA/RNA sequences from raw text.

    Each line is upper-cased, U is mapped to T (RNA → DNA), FASTA header
    lines and blank lines are skipped, every character outside A/T/G/C/N is
    removed, and the result is kept only if at least 10 bases remain.

    Parameters
    ----------
    raw : str
        Raw text, one sequence per line. ``None`` or an empty string yields
        an empty list.

    Returns
    -------
    List[str]
        Cleaned sequences in input order.
    """
    if not raw:
        return []

    sequences = []
    # splitlines() copes with \n, \r\n and bare \r line endings alike.
    for line in raw.splitlines():
        seq = line.strip().upper().replace("U", "T")  # RNA → DNA
        # Skip FASTA headers and empty lines
        if not seq or seq.startswith(">"):
            continue
        # Remove whitespace and non-base characters
        seq = "".join(c for c in seq if c in _DNA_ALPHABET)
        if len(seq) >= _MIN_SEQUENCE_LENGTH:
            sequences.append(seq)
    return sequences


# ────────────────────────────────────────────────────────────────────────────
#  Column Names (match mega_feature_importance.csv)
# ────────────────────────────────────────────────────────────────────────────

def _get_column_names() -> List[str]:
    """
    Get the 306 feature names from mega_feature_importance.csv or fallback.

    The CSV is looked up next to this module and its first column is used.
    If the file cannot be read, or it lists fewer than 306 names, the
    generic names emb_0 … emb_305 are returned instead so that the result
    always has exactly 306 entries.

    Returns
    -------
    List[str]
        Exactly 306 column names.
    """
    fallback = [f"emb_{i}" for i in range(_EMBEDDING_DIM)]
    try:
        fi_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "mega_feature_importance.csv",
        )
        names = pd.read_csv(fi_path).iloc[:, 0].tolist()
    except Exception:
        # Best-effort lookup: a missing or malformed file is not an error.
        return fallback

    if len(names) < _EMBEDDING_DIM:
        # A short list would make DataFrame construction fail downstream.
        return fallback
    return names[:_EMBEDDING_DIM]


# ────────────────────────────────────────────────────────────────────────────
#  Strategy 1: Transformer-Based Embeddings (if available)
# ────────────────────────────────────────────────────────────────────────────

@functools.lru_cache(maxsize=1)
def _load_transformer(model_name: str):
    """
    Load and cache the tokenizer and model for ``model_name``.

    Cached so repeated calls do not reload the weights. A failed load raises
    and is not cached, so it is retried on the next call.

    NOTE(security): trust_remote_code=True executes Python code shipped with
    the model repository. Consider pinning a reviewed revision.
    """
    from transformers import AutoModel, AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    model = AutoModel.from_pretrained(model_name, trust_remote_code=True)
    model.eval()
    return tokenizer, model


def _transformer_embeddings(sequences: List[str]) -> np.ndarray:
    """
    Use a HuggingFace DNA language model to produce 306-dim embeddings.

    Each sequence is tokenised on its own, the token embeddings are
    mean-pooled into one vector, and that vector is passed through
    ``_project_to_306``.

    Raises ImportError / Exception if transformers/torch not available.

    Returns
    -------
    np.ndarray
        Array of shape (len(sequences), 306).
    """
    import torch

    tokenizer, model = _load_transformer(_TRANSFORMER_MODEL_NAME)

    all_embeddings = []
    for seq in sequences:
        inputs = tokenizer(
            seq,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512,
        )
        with torch.no_grad():
            outputs = model(**inputs)

        # Some remote-code models return a plain tuple instead of a
        # ModelOutput; in both cases the first element holds the per-token
        # hidden states.
        token_states = getattr(outputs, "last_hidden_state", None)
        if token_states is None:
            token_states = outputs[0]

        # Mean-pool token embeddings → single vector
        pooled = token_states.mean(dim=1).squeeze().numpy()
        # Project / pad / truncate to exactly 306 dimensions
        all_embeddings.append(_project_to_306(pooled))

    return np.array(all_embeddings, dtype=np.float64).reshape(-1, _EMBEDDING_DIM)


# ────────────────────────────────────────────────────────────────────────────
#  Strategy 2: K-mer Frequency + Positional Encoding (fallback)
# ────────────────────────────────────────────────────────────────────────────

@functools.lru_cache(maxsize=8)
def _kmer_vocabulary(k: int) -> tuple:
    """Return the sorted tuple of all k-mers over A/T/G/C (cached per k)."""
    return tuple(_generate_kmers(list(_CANONICAL_BASES), k))


def _kmer_embeddings(sequences: List[str]) -> np.ndarray:
    """
    Deterministic embedding built from k-mer frequencies (k=1..6),
    sequence-level statistics and a one-hot encoding of the first 20 bases,
    reduced to 306 dims by ``_project_to_306``.

    Fully reproducible, no dependencies beyond numpy.

    NOTE(review): the raw feature vector has 5460 k-mer frequencies + 4
    statistics + 80 positional values = 5544 entries, so
    ``_project_to_306`` truncates it. Only the mono-, di- and tri-nucleotide
    frequencies and the first 222 (sorted) tetranucleotide frequencies reach
    the output; the remaining features are computed but dropped. This is
    kept as-is so existing outputs do not change.

    Returns
    -------
    np.ndarray
        Array of shape (len(sequences), 306).
    """
    canonical = set(_CANONICAL_BASES)
    all_embeddings = []

    for seq in sequences:
        n = len(seq)
        features: List[float] = []

        # ── 1) Mono/di/tri/tetra/penta/hexa-nucleotide frequencies ──
        for k in range(1, _MAX_KMER + 1):
            windows = (seq[i:i + k] for i in range(n - k + 1))
            # Windows containing N are not counted.
            counts = Counter(km for km in windows if canonical.issuperset(km))
            total = sum(counts.values()) or 1  # avoid division by zero
            features.extend(counts.get(km, 0) / total for km in _kmer_vocabulary(k))

        # ── 2) Sequence-level statistics ──
        gc = (seq.count("G") + seq.count("C")) / max(n, 1)
        features.extend([
            gc,                           # GC content
            1 - gc,                       # AT content
            n / 100.0,                    # Normalized length
            seq.count("N") / max(n, 1),   # N fraction
        ])

        # ── 3) Positional encoding of first 20 bases ──
        for pos in range(_POSITIONAL_BASES):
            # Positions past the end are treated like N (all-zero one-hot).
            base = seq[pos] if pos < n else "N"
            features.extend(1.0 if base == b else 0.0 for b in _CANONICAL_BASES)

        # ── 4) Reduce to exactly 306 ──
        all_embeddings.append(
            _project_to_306(np.array(features, dtype=np.float64))
        )

    return np.array(all_embeddings, dtype=np.float64).reshape(-1, _EMBEDDING_DIM)


def _generate_kmers(bases: List[str], k: int) -> List[str]:
    """
    Generate all possible k-mers from the given bases, sorted.

    Parameters
    ----------
    bases : List[str]
        Alphabet to draw from.
    k : int
        k-mer length; must be at least 1.

    Returns
    -------
    List[str]
        All ``len(bases) ** k`` concatenations in sorted order.

    Raises
    ------
    ValueError
        If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError("k must be a positive integer")
    return sorted("".join(combo) for combo in product(bases, repeat=k))


# ────────────────────────────────────────────────────────────────────────────
#  Projection helper
# ────────────────────────────────────────────────────────────────────────────

def _project_to_306(vec: np.ndarray) -> np.ndarray:
    """
    Deterministically pad / truncate a vector to exactly 306 dims.

    * 306 or more values: the first 306 are returned.
    * Fewer than 306 values: the input is copied to the front and every
      remaining slot ``i`` is filled with
      ``sin(vec[i % len(vec)] * (i + 1) * 0.1) * 0.5``.
    * No values at all: a zero vector is returned.

    Parameters
    ----------
    vec : np.ndarray
        Array-like of numbers; it is flattened and converted to float64.

    Returns
    -------
    np.ndarray
        float64 array of shape (306,).
    """
    target = _EMBEDDING_DIM
    vec = np.array(vec, dtype=np.float64).flatten()
    size = len(vec)

    if size >= target:
        return vec[:target]

    result = np.zeros(target, dtype=np.float64)
    if size == 0:
        # Nothing to derive the filler from; also avoids `i % 0`.
        return result

    result[:size] = vec
    # Fill remaining dims with sinusoidal combinations of existing features
    for i in range(size, target):
        result[i] = math.sin(vec[i % size] * (i + 1) * 0.1) * 0.5
    return result