# siRNA/mRNA sequence feature-engineering utilities.
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.preprocessing import OneHotEncoder | |
# Canonical RNA alphabet; clean_seq maps DNA "T" to "U" and any other
# character to "N" before encoding.
BASES = ["A", "C", "G", "U"]
# Base -> column index shared by the one-hot and k-mer encoders.
BASE_TO_IDX = {b: i for i, b in enumerate(BASES)}
def clean_seq(seq: str, length: int = 19) -> str:
    """Normalize a nucleotide string to an RNA sequence of exactly `length`.

    Uppercases, converts DNA "T" to RNA "U", masks any character outside
    A/C/G/U as "N", then right-pads with "N" (or truncates) to `length`.
    A None/empty input yields "N" * length.
    """
    normalized = (seq or "").upper().replace("T", "U")
    masked = "".join(ch if ch in BASES else "N" for ch in normalized)
    # ljust pads short sequences with "N"; the slice truncates long ones.
    return masked.ljust(length, "N")[:length]
def one_hot_seq(seq: str) -> np.ndarray:
    """Flat one-hot encoding of `seq`: float32 vector of length len(seq)*4.

    Position i occupies columns [4*i, 4*i+4) in A/C/G/U order; characters
    not in the alphabet (e.g. "N") leave their 4 columns all zero.
    """
    flat = np.zeros(len(seq) * 4, dtype=np.float32)
    for pos, base in enumerate(seq):
        col = BASE_TO_IDX.get(base)
        if col is not None:
            flat[pos * 4 + col] = 1.0
    return flat
def interaction_features(si: str, mr: str):
    """Per-position base-pairing indicators between `si` and `mr`.

    Each aligned position is classified as Watson-Crick, wobble (G-U/U-G),
    or mismatch (anything else, including non-ACGU characters). Positions
    beyond len(mr) are left as zeros in all three tracks.

    Returns:
        per_pos: float32 vector [wc | wobble | mismatch], 3 * len(si) long.
        summary: float32 [total_wc, total_wobble, total_mismatch,
                 seed_wc, seed_wobble], where "seed" covers positions 2-8
                 (0-based slice 1:8).
    """
    watson_crick = {("A", "U"), ("U", "A"), ("G", "C"), ("C", "G")}
    wobble_pairs = {("G", "U"), ("U", "G")}
    n = len(si)
    wc = np.zeros(n, dtype=np.float32)
    wob = np.zeros(n, dtype=np.float32)
    mm = np.zeros(n, dtype=np.float32)
    for pos, pair in enumerate(zip(si, mr)):
        a, b = pair
        if a not in BASES or b not in BASES:
            mm[pos] = 1.0
        elif pair in watson_crick:
            wc[pos] = 1.0
        elif pair in wobble_pairs:
            wob[pos] = 1.0
        else:
            mm[pos] = 1.0
    seed = slice(1, 8)
    per_pos = np.concatenate([wc, wob, mm]).astype(np.float32)
    summary = np.array(
        [wc.sum(), wob.sum(), mm.sum(), wc[seed].sum(), wob[seed].sum()],
        dtype=np.float32,
    )
    return per_pos, summary
def kmer_counts(seq: str):
    """Mono- and di-nucleotide frequency vectors for `seq`.

    Returns:
        mono: float32[4], per-base frequency (counts / len(seq)).
        di: float32[16], adjacent-pair frequency (counts / (len(seq) - 1)),
            indexed as 4 * first_base + second_base. Pairs containing a
            character outside A/C/G/U are skipped.
    """
    n = len(seq)
    mono = np.zeros(4, dtype=np.float32)
    di = np.zeros(16, dtype=np.float32)
    # Resolve each character to its alphabet code once; None marks "N"/junk.
    codes = [BASE_TO_IDX.get(ch) for ch in seq]
    for code in codes:
        if code is not None:
            mono[code] += 1.0
    if n:
        mono /= n
    for left, right in zip(codes, codes[1:]):
        if left is not None and right is not None:
            di[left * 4 + right] += 1.0
    if n > 1:
        di /= n - 1
    return mono, di
def build_feature_matrix(
    df: pd.DataFrame,
    encoder: OneHotEncoder | None = None,
    fit_encoder: bool = False,
    artifacts_path: str | None = None,
    seq_length: int = 19,
):
    """Assemble the model-ready feature matrix from a siRNA/mRNA dataframe.

    Column order: one-hot of both sequences, per-position pairing
    indicators (WC / wobble / mismatch), pairing summary counts,
    mono-/di-nucleotide frequencies, pass-through numeric columns of `df`,
    then one-hot encoded "source"/"cell_line".

    Args:
        df: Must contain "siRNA" and "mRNA" string columns. "source" and
            "cell_line" are filled with "unknown" when absent. Every other
            column not in the internal drop list is cast to float32 and
            passed through as a numeric feature.
        encoder: Fitted OneHotEncoder for the categorical columns. Required
            when `fit_encoder` is False; when None and `fit_encoder` is
            True, a fresh encoder is created.
        fit_encoder: Fit (or refit) the encoder on this dataframe.
        artifacts_path: When fitting, optional path to dump encoder
            categories and feature names as JSON.
        seq_length: Length sequences are cleaned/padded to (default 19).
            Must match the value used when the encoder/model was fitted.

    Returns:
        Tuple of (float32 feature matrix, feature name list, encoder used).

    Raises:
        ValueError: If `encoder` is None while `fit_encoder` is False
            (previously surfaced as an opaque sklearn NotFittedError).
    """
    if encoder is None and not fit_encoder:
        raise ValueError(
            "encoder is None but fit_encoder is False; pass a fitted "
            "encoder or set fit_encoder=True"
        )
    work_df = df.copy()
    # Categorical columns are optional in the raw data.
    if "source" not in work_df.columns:
        work_df["source"] = "unknown"
    if "cell_line" not in work_df.columns:
        work_df["cell_line"] = "unknown"
    si_clean = work_df["siRNA"].apply(lambda s: clean_seq(s, seq_length))
    mr_clean = work_df["mRNA"].apply(lambda s: clean_seq(s, seq_length))
    seq_features = []
    inter_per_pos = []
    inter_summary = []
    kmer_feats = []
    for s, m in zip(si_clean, mr_clean):
        seq_features.append(np.concatenate([one_hot_seq(s), one_hot_seq(m)]))
        per_pos, summary = interaction_features(s, m)
        inter_per_pos.append(per_pos)
        inter_summary.append(summary)
        mono_si, di_si = kmer_counts(s)
        mono_mr, di_mr = kmer_counts(m)
        kmer_feats.append(np.concatenate([mono_si, di_si, mono_mr, di_mr]))
    seq_arr = np.vstack(seq_features)
    inter_arr = np.vstack(inter_per_pos)
    inter_sum_arr = np.vstack(inter_summary)
    kmer_arr = np.vstack(kmer_feats)
    # Everything not in the drop list is treated as a numeric feature.
    drop_cols = ["siRNA", "mRNA", "extended_mRNA", "efficiency", "numeric_label", "id", "source", "cell_line"]
    numeric_cols = [c for c in work_df.columns if c not in drop_cols]
    numeric_arr = (
        work_df[numeric_cols].astype(np.float32).to_numpy()
        if numeric_cols
        else np.zeros((len(work_df), 0), dtype=np.float32)
    )
    cat_df = work_df[["source", "cell_line"]]
    if encoder is None:
        encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False)
    if fit_encoder:
        cat_arr = encoder.fit_transform(cat_df)
    else:
        cat_arr = encoder.transform(cat_df)
    feats = np.concatenate([seq_arr, inter_arr, inter_sum_arr, kmer_arr, numeric_arr, cat_arr], axis=1)
    # Computed once; reused for both the name list and the JSON artifact.
    cat_feature_names = encoder.get_feature_names_out(["source", "cell_line"]).tolist()
    feature_names: list[str] = []
    feature_names += [f"siRNA_pos{i + 1}_{b}" for i in range(seq_length) for b in BASES]
    feature_names += [f"mRNA_pos{i + 1}_{b}" for i in range(seq_length) for b in BASES]
    feature_names += [f"inter_wc_pos{i + 1}" for i in range(seq_length)]
    feature_names += [f"inter_wobble_pos{i + 1}" for i in range(seq_length)]
    feature_names += [f"inter_mismatch_pos{i + 1}" for i in range(seq_length)]
    feature_names += ["total_wc", "total_wobble", "total_mismatch", "seed_wc", "seed_wobble"]
    feature_names += [f"si_mono_{b}" for b in BASES]
    feature_names += [f"si_di_{i}" for i in range(16)]
    feature_names += [f"mr_mono_{b}" for b in BASES]
    feature_names += [f"mr_di_{i}" for i in range(16)]
    feature_names += numeric_cols
    feature_names += cat_feature_names
    # Guard against the name list drifting out of sync with the matrix.
    assert len(feature_names) == feats.shape[1], (len(feature_names), feats.shape[1])
    if fit_encoder and artifacts_path:
        artifact = {
            "categories": [cats.tolist() for cats in encoder.categories_],
            "category_feature_names": cat_feature_names,
            "numeric_cols": numeric_cols,
            "feature_names": feature_names,
        }
        Path(artifacts_path).write_text(json.dumps(artifact, indent=2))
    return feats.astype(np.float32), feature_names, encoder