Spaces:
Sleeping
Sleeping
| """ | |
| sequence_processor.py | |
| ===================== | |
| Biological translation engine for DeepCRISPR. | |
| Converts raw CRISPR guide RNA / DNA sequences into the 306-dimension | |
| embedding format required by the Mega Model. | |
| Strategy: | |
| 1. If HuggingFace `transformers` + `torch` are installed, uses a | |
| pre-trained DNA language model (DNABERT-2 or similar) to generate | |
| learned embeddings, then projects to 306 dimensions. | |
| 2. Otherwise, falls back to a deterministic k-mer frequency + positional | |
| encoding method that produces a reproducible 306-dim vector from | |
| any DNA string β no GPU or pip install required. | |
| Author: Mujahid | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import hashlib | |
| import math | |
| from typing import List, Union | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Public API | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def extract_306_embeddings(raw_sequence: str) -> pd.DataFrame:
    """
    Convert raw DNA/RNA text into a DataFrame with exactly 306 embedding
    columns, one row per input sequence.

    Parameters
    ----------
    raw_sequence : str
        One or more DNA/RNA sequences, one per line.
        Valid characters: A, T/U, G, C, N (case-insensitive).
        FASTA header lines (starting with ">") are ignored.

    Returns
    -------
    pd.DataFrame
        Shape (n_sequences, 306) with columns matching the Mega Model's
        expected feature names (loaded from mega_feature_importance.csv
        if available, otherwise emb_0 ... emb_305).

    Raises
    ------
    ValueError
        If the input contains no valid sequence.
    """
    parsed = _parse_sequences(raw_sequence)
    if not parsed:
        raise ValueError("No valid DNA/RNA sequences found in input.")

    # Prefer learned transformer embeddings; any failure (transformers /
    # torch not installed, model download error, ...) drops us down to the
    # deterministic k-mer fallback -- this broad catch is deliberate.
    try:
        matrix = _transformer_embeddings(parsed)
    except Exception:
        matrix = _kmer_embeddings(parsed)

    return pd.DataFrame(matrix, columns=_get_column_names())
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Sequence Parsing | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Accepted nucleotide characters: DNA + RNA alphabets, both cases.
# NOTE(review): currently unused -- _parse_sequences filters against the
# literal "ATGCN" after uppercasing/U->T instead; confirm before removing.
_VALID_BASES = set("ATUGCNatugcn")
| def _parse_sequences(raw: str) -> List[str]: | |
| """Parse, clean and validate DNA/RNA sequences from raw text.""" | |
| lines = raw.strip().split("\n") | |
| sequences = [] | |
| for line in lines: | |
| seq = line.strip().upper().replace("U", "T") # RNA β DNA | |
| # Skip FASTA headers and empty lines | |
| if not seq or seq.startswith(">"): | |
| continue | |
| # Remove whitespace and non-base characters | |
| seq = "".join(c for c in seq if c in "ATGCN") | |
| if len(seq) >= 10: # Minimum viable guide length | |
| sequences.append(seq) | |
| return sequences | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Column Names (match mega_feature_importance.csv) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _get_column_names() -> List[str]: | |
| """Get the 306 feature names from mega_feature_importance.csv or fallback.""" | |
| import os | |
| fi_path = os.path.join( | |
| os.path.dirname(os.path.abspath(__file__)), | |
| "mega_feature_importance.csv" | |
| ) | |
| try: | |
| fi_df = pd.read_csv(fi_path) | |
| names = fi_df.iloc[:, 0].tolist() | |
| return names[:306] | |
| except Exception: | |
| return [f"emb_{i}" for i in range(306)] | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Strategy 1: Transformer-Based Embeddings (if available) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _transformer_embeddings(sequences: List[str]) -> np.ndarray:
    """
    Use a HuggingFace DNA language model to produce 306-dim embeddings.
    Raises ImportError / Exception if transformers/torch not available.
    """
    from transformers import AutoTokenizer, AutoModel
    import torch

    model_name = "zhihan1996/DNABERT-2-117M"
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    model = AutoModel.from_pretrained(model_name, trust_remote_code=True)
    model.eval()

    vectors = []
    for seq in sequences:
        encoded = tokenizer(
            seq,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512,
        )
        with torch.no_grad():
            output = model(**encoded)
        # Mean-pool the token embeddings into one vector per sequence,
        # then resize deterministically to exactly 306 dimensions.
        pooled = output.last_hidden_state.mean(dim=1).squeeze().numpy()
        vectors.append(_project_to_306(pooled))
    return np.array(vectors)
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Strategy 2: K-mer Frequency + Positional Encoding (fallback) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _kmer_embeddings(sequences: List[str]) -> np.ndarray:
    """
    Deterministic embedding: combines k-mer frequencies (k=1..6) with
    sequence statistics and positional encodings, then resizes the
    feature vector to exactly 306 dims via _project_to_306.
    Fully reproducible, no dependencies beyond numpy.
    """
    bases = ["A", "T", "G", "C"]
    vectors = []
    for seq in sequences:
        feats: List[float] = []

        # -- 1) k-mer frequency profile for every k in 1..6 --
        for k in range(1, 7):
            counts = {}
            for start in range(len(seq) - k + 1):
                window = seq[start:start + k]
                if all(ch in "ATGC" for ch in window):
                    counts[window] = counts.get(window, 0) + 1
            denom = sum(counts.values()) or 1
            # Emit frequencies in canonical (sorted) k-mer order
            feats.extend(
                counts.get(km, 0) / denom for km in _generate_kmers(bases, k)
            )

        # -- 2) Whole-sequence statistics --
        length = len(seq)
        gc_frac = (seq.count("G") + seq.count("C")) / max(length, 1)
        feats.extend([
            gc_frac,                          # GC content
            1 - gc_frac,                      # AT content
            length / 100.0,                   # Normalized length
            seq.count("N") / max(length, 1),  # N fraction
        ])

        # -- 3) One-hot encoding of the first 20 positions --
        for pos in range(20):
            observed = seq[pos] if pos < length else "N"
            feats.extend(1.0 if observed == b else 0.0 for b in "ATGC")

        # -- 4) Deterministic resize to exactly 306 dims --
        vectors.append(_project_to_306(np.array(feats, dtype=np.float64)))
    return np.array(vectors)
| def _generate_kmers(bases: List[str], k: int) -> List[str]: | |
| """Generate all possible k-mers from the given bases, sorted.""" | |
| if k == 1: | |
| return sorted(bases) | |
| shorter = _generate_kmers(bases, k - 1) | |
| return sorted([s + b for s in shorter for b in bases]) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Projection helper | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _project_to_306(vec: np.ndarray) -> np.ndarray: | |
| """ | |
| Deterministically project / pad / truncate a vector to exactly 306 dims. | |
| Uses a seeded random projection matrix for expansion. | |
| """ | |
| target = 306 | |
| vec = np.array(vec, dtype=np.float64).flatten() | |
| if len(vec) == target: | |
| return vec | |
| elif len(vec) > target: | |
| return vec[:target] | |
| else: | |
| # Expand using a deterministic hash-based projection | |
| result = np.zeros(target, dtype=np.float64) | |
| result[:len(vec)] = vec | |
| # Fill remaining dims with sinusoidal combinations of existing features | |
| for i in range(len(vec), target): | |
| seed_val = vec[i % len(vec)] | |
| result[i] = math.sin(seed_val * (i + 1) * 0.1) * 0.5 | |
| return result | |