Spaces:
Paused
Paused
| # MultiMolecule | |
| # Copyright (C) 2024-Present MultiMolecule | |
| from __future__ import annotations | |
| import csv | |
| import json | |
| import re | |
| import tempfile | |
| import time | |
| from functools import lru_cache | |
| from typing import Any, Mapping | |
| from urllib.parse import parse_qs, urlparse | |
| import gradio as gr | |
| import matplotlib | |
| import numpy as np | |
| import torch | |
| from transformers import pipeline | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt # noqa: E402 | |
| import multimolecule # noqa: E402, F401 - registers MultiMolecule models and pipelines with Transformers | |
| MODEL_OPTIONS = { | |
| "DeepCpG-DNA Smallwood 2014 serum mESC": "multimolecule/deepcpgdna-smallwood2014-serum", | |
| "DeepCpG-DNA Smallwood 2014 2i mESC": "multimolecule/deepcpgdna-smallwood2014-2i", | |
| "DeepCpG-DNA Hou 2016 HCC": "multimolecule/deepcpgdna-hou2016-hcc", | |
| "DeepCpG-DNA Hou 2016 HepG2": "multimolecule/deepcpgdna-hou2016-hepg2", | |
| "DeepCpG-DNA Hou 2016 mESC": "multimolecule/deepcpgdna-hou2016-mesc", | |
| } | |
| MODEL_LABELS = {model_id: label for label, model_id in MODEL_OPTIONS.items()} | |
| DEFAULT_MODEL_LABEL = "DeepCpG-DNA Smallwood 2014 serum mESC" | |
| DEFAULT_SEQUENCE = ("ACGT" * 125)[:499] + "CG" + ("TGCA" * 125)[:500] | |
| DNA_ALPHABET = set("ACGTN") | |
| def _device() -> int: | |
| return 0 if torch.cuda.is_available() else -1 | |
| def _device_label() -> str: | |
| return "cuda" if torch.cuda.is_available() else "cpu" | |
| def load_predictor(model_id: str): | |
| return pipeline("methylation", model=model_id, device=_device()) | |
| def clean_sequence(sequence: str) -> str: | |
| lines = [] | |
| for line in str(sequence or "").splitlines(): | |
| line = line.strip() | |
| if line and not line.startswith(">"): | |
| lines.append(line) | |
| sequence = re.sub(r"\s+", "", "".join(lines)).upper().replace("U", "T") | |
| if not sequence: | |
| raise gr.Error("Sequence is empty.") | |
| invalid = sorted(set(sequence) - DNA_ALPHABET) | |
| if invalid: | |
| raise gr.Error(f"DNA sequence contains unsupported characters: {', '.join(invalid)}.") | |
| return sequence | |
| def unpack_prediction_result(result: Any) -> dict[str, Any]: | |
| if isinstance(result, list): | |
| if len(result) != 1: | |
| raise gr.Error(f"Expected one prediction result, got {len(result)}.") | |
| result = result[0] | |
| if not isinstance(result, dict): | |
| raise gr.Error(f"Expected a prediction dictionary, got {type(result).__name__}.") | |
| return result | |
| def score_rows_from_result(result: Mapping[str, Any]) -> list[list[Any]]: | |
| channels = [str(channel) for channel in result.get("channels", [])] | |
| if "score" in result: | |
| return rows_from_values(result["score"], channels or ["methylation"]) | |
| if "scores" in result: | |
| scores = result["scores"] | |
| if isinstance(scores, Mapping): | |
| return [[str(channel), number_value(score)] for channel, score in scores.items()] | |
| if isinstance(scores, list): | |
| return rows_from_values(scores, channels) | |
| raise gr.Error("The selected model did not return methylation scores.") | |
| def rows_from_values(values: Any, channels: list[str]) -> list[list[Any]]: | |
| if isinstance(values, (list, tuple)): | |
| if len(channels) != len(values): | |
| channels = [f"methylation_{index}" for index in range(len(values))] | |
| return [[channel, number_value(value)] for channel, value in zip(channels, values)] | |
| return [[channels[0] if channels else "methylation", number_value(values)]] | |
| def number_value(value: Any) -> float: | |
| try: | |
| number = float(value) | |
| except (TypeError, ValueError) as error: | |
| raise gr.Error(f"Score value {value!r} is not numeric.") from error | |
| if not np.isfinite(number): | |
| raise gr.Error(f"Score value {value!r} is not finite.") | |
| return number | |
| def plot_scores(rows: list[list[Any]], top_n: int | float): | |
| top_n = max(1, int(top_n or 25)) | |
| values = [(str(channel), float(score)) for channel, score in rows] | |
| values = sorted(values, key=lambda item: item[1], reverse=True)[:top_n] | |
| height = max(3.0, min(12.0, 1.2 + 0.34 * len(values))) | |
| fig, ax = plt.subplots(figsize=(8.0, height)) | |
| if not values: | |
| ax.set_axis_off() | |
| return fig | |
| labels = [label if len(label) <= 58 else f"{label[:55]}..." for label, _ in values] | |
| scores = [score for _, score in values] | |
| y_positions = np.arange(len(values)) | |
| ax.barh(y_positions, scores, color="#2f6f9f") | |
| ax.set_yticks(y_positions, labels) | |
| ax.invert_yaxis() | |
| if all(0.0 <= score <= 1.0 for score in scores): | |
| ax.set_xlim(0.0, 1.0) | |
| ax.set_xlabel("Methylation score") | |
| ax.grid(axis="x", alpha=0.2) | |
| fig.tight_layout() | |
| return fig | |
| def write_result_files( | |
| metadata: Mapping[str, Any], result: Mapping[str, Any], rows: list[list[Any]] | |
| ) -> tuple[str, str]: | |
| csv_file = tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") | |
| writer = csv.writer(csv_file) | |
| writer.writerow(["channel", "score"]) | |
| writer.writerows(rows) | |
| csv_file.close() | |
| json_file = tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) | |
| json.dump( | |
| { | |
| "metadata": dict(metadata), | |
| "scores": [{"channel": channel, "score": score} for channel, score in rows], | |
| "raw_result": result, | |
| }, | |
| json_file, | |
| indent=2, | |
| ) | |
| json_file.close() | |
| return csv_file.name, json_file.name | |
| def predict(model_label: str, sequence: str, top_n: int | float): | |
| model_id = MODEL_OPTIONS[model_label] | |
| sequence = clean_sequence(sequence) | |
| started = time.perf_counter() | |
| try: | |
| result = load_predictor(model_id)(sequence) | |
| except gr.Error: | |
| raise | |
| except Exception as error: | |
| raise gr.Error(f"Prediction failed for {model_id}: {error}") from error | |
| result = unpack_prediction_result(result) | |
| rows = score_rows_from_result(result) | |
| metadata = { | |
| "task": "methylation", | |
| "model": model_id, | |
| "model_label": model_label, | |
| "device": _device_label(), | |
| "sequence_length": len(sequence), | |
| "score_count": len(rows), | |
| "channels": result.get("channels", []), | |
| "elapsed_seconds": round(time.perf_counter() - started, 3), | |
| } | |
| csv_path, json_path = write_result_files(metadata, result, rows) | |
| return rows, metadata, plot_scores(rows, top_n), csv_path, json_path | |
| def initial_model(request: gr.Request): | |
| if request is None: | |
| return DEFAULT_MODEL_LABEL | |
| query_params = getattr(request, "query_params", None) | |
| model_id = query_params.get("model") if query_params is not None else None | |
| if not model_id and getattr(request, "url", None): | |
| parsed = parse_qs(urlparse(str(request.url)).query) | |
| model_values = parsed.get("model") | |
| model_id = model_values[0] if model_values else None | |
| return MODEL_LABELS.get(model_id, DEFAULT_MODEL_LABEL) | |
| with gr.Blocks(title="Methylation") as demo: | |
| gr.Markdown( | |
| "# Methylation\n" "Run MultiMolecule DNA methylation checkpoints and inspect per-cell methylation scores." | |
| ) | |
| with gr.Row(): | |
| model = gr.Dropdown(choices=list(MODEL_OPTIONS.keys()), value=DEFAULT_MODEL_LABEL, label="Checkpoint") | |
| top_n = gr.Slider(1, 50, value=25, step=1, label="Bar count") | |
| sequence = gr.Textbox(label="DNA sequence", value=DEFAULT_SEQUENCE, lines=7) | |
| run = gr.Button("Run prediction", variant="primary") | |
| with gr.Row(): | |
| scores = gr.Dataframe(headers=["channel", "score"], datatype=["str", "number"], label="Score table") | |
| metadata = gr.JSON(label="Run metadata") | |
| score_plot = gr.Plot(label="Score bar plot") | |
| with gr.Row(): | |
| csv_download = gr.File(label="Download CSV") | |
| json_download = gr.File(label="Download JSON") | |
| run.click( | |
| predict, inputs=[model, sequence, top_n], outputs=[scores, metadata, score_plot, csv_download, json_download] | |
| ) | |
| demo.load(initial_model, outputs=model) | |
| if __name__ == "__main__": | |
| demo.launch() | |