# data-browser / filter_engine.py
# anapaulagomes's picture
# Sync from GitHub via hub-sync
# 5f1fdca verified
import marimo
# Version stamp stored with the notebook (presumably the marimo release that
# wrote this file -- confirm against marimo's file format notes).
__generated_with = "0.14.16"
# Application object that every cell below registers on through @app.cell.
app = marimo.App(
    width="medium",
    app_title="Open Syndrome Definition - Data Browser",
)
@app.cell
def _():
    # Shared imports: each name returned here is what the other cells receive
    # through their parameter lists.
    import json
    from pathlib import Path

    import marimo as mo
    import plotly.graph_objects as go
    import polars as pl
    import yaml
    from opensyndrome.artifacts import get_definition_dir
    from opensyndrome.filter import OSDEngine, load_profile

    return OSDEngine, Path, get_definition_dir, go, json, load_profile, mo, pl, yaml
@app.cell
def _(go, pl):
    def plot_cases(
        _df_filtered, definitions, date_column="date", date_format="%Y-%m-%d %H:%M:%S"
    ):
        """Return a line chart of monthly sums, one trace per definition.

        ``date_column`` is parsed as text using ``date_format`` (non-strict),
        cast to a date and truncated to its month; every column named in
        ``definitions`` is then summed per month and plotted in month order.
        """
        _month_expr = (
            pl.col(date_column)
            .str.to_datetime(format=date_format, strict=False)
            .cast(pl.Date)
            .dt.truncate("1mo")
            .alias("_month")
        )
        _sum_exprs = [pl.col(_name).sum().alias(_name) for _name in definitions]
        _monthly = (
            _df_filtered.with_columns(_month_expr)
            .group_by("_month")
            .agg(_sum_exprs)
            .sort("_month")
        )

        _traces = [
            go.Scatter(
                x=_monthly["_month"],
                y=_monthly[_name],
                mode="lines+markers",
                name=_name,
            )
            for _name in definitions
        ]
        _fig = go.Figure(data=_traces)
        # Integer tick labels and a y axis that always includes zero.
        _fig.update_layout(yaxis=dict(tickformat="d", rangemode="tozero"))
        return _fig

    return (plot_cases,)
@app.cell
def _(go, pl):
    def groupped_bar(_df_filtered, definitions, group_by_column="code", top_n=10):
        """Return a grouped bar chart of the ``top_n`` values of ``group_by_column``.

        Each column named in ``definitions`` is summed per group; groups are
        ranked by the sum across all definitions (stored as ``total``) and the
        highest ``top_n`` are drawn with one bar series per definition.
        """
        _per_group = _df_filtered.group_by(group_by_column).agg(
            [pl.col(_name).sum() for _name in definitions]
        )
        _per_group = _per_group.sort(group_by_column)

        # Row-wise total over the definition columns, used only for ranking.
        _total_expr = sum(pl.col(_name) for _name in definitions)
        _ranked = _per_group.with_columns(_total_expr.alias("total")).sort(
            "total", descending=True
        )
        _top = _ranked.head(top_n)

        _fig = go.Figure()
        for _name in definitions:
            _fig.add_trace(go.Bar(name=_name, x=_top[group_by_column], y=_top[_name]))
        _fig.update_layout(barmode="group")
        return _fig

    return (groupped_bar,)
@app.cell
def _(mo):
    # Page title. The trailing emoji (woman scientist) had been stored as
    # mis-decoded bytes; it is written here as the intended Unicode sequence.
    mo.md(r"""# Open Syndrome Definition 👩🏽‍🔬""")
    return
@app.cell
def _(mo):
    # Introductory notice rendered as a neutral callout.
    _notice = mo.md(
        "This is a prototype of how to filter your data using definitions from the Open Syndrome Initiative.\n\n"
        "We do not store any data."
    )
    mo.callout(_notice, kind="neutral")
    return
@app.cell
def _(Path):
    # Registry of bundled example datasets, keyed by the label shown in the
    # picker. Each entry carries the CSV path, the mapping YAML path and the
    # name of the column preselected as the date column.
    _toy_dataset = {
        "csv": Path("toy_dataset.csv"),
        "mapping": Path("mapping.yaml"),
        "date_column": "recording_ts",
    }
    EXAMPLE_DATASETS = {"Toy dataset": _toy_dataset}
    return (EXAMPLE_DATASETS,)
@app.cell
def _(EXAMPLE_DATASETS, mo):
    # Input widgets: a source switch, a picker over the example registry and
    # a CSV upload area. They are only created here; layout happens elsewhere.
    _source_choices = ["Load example", "Upload your own"]
    data_source = mo.ui.radio(options=_source_choices, value="Load example")
    example_picker = mo.ui.dropdown(
        options=list(EXAMPLE_DATASETS),
        label="Example dataset",
    )
    sample_file = mo.ui.file(kind="area", filetypes=[".csv"])
    return data_source, example_picker, sample_file
@app.cell
def _(data_source, example_picker, mo, sample_file):
    # Show the source switch plus whichever input matches the current choice.
    if data_source.value == "Load example":
        _active_input = example_picker
    else:
        _active_input = sample_file
    mo.vstack([mo.md("## Load your data"), data_source, _active_input])
    return
@app.cell
def _(EXAMPLE_DATASETS, data_source, example_picker, pl, sample_file):
    # Read the chosen CSV into a polars frame; stays None until an example is
    # picked or a file is uploaded.
    df_selected = None
    if data_source.value == "Load example":
        if example_picker.value:
            df_selected = pl.read_csv(EXAMPLE_DATASETS[example_picker.value]["csv"])
    elif sample_file.value:
        df_selected = pl.read_csv(sample_file.contents())
    return (df_selected,)
@app.cell
def _(EXAMPLE_DATASETS, data_source, example_picker):
    # Starter mapping offered when no example dataset is selected.
    # It is assembled from one literal per line so the YAML nesting is explicit
    # and cannot be altered by the indentation of this cell: the validation
    # step reads profiles[0]["name"] and profiles[0]["columns"], so `columns`
    # has to sit inside the first profile entry.
    _default_yaml = (
        "profiles:\n"
        "  - name: my_dataset\n"
        "    # value_encodings:   # optional — map OSD canonical values to dataset-specific ones\n"
        "    #   sex:\n"
        '    #     male: "M"\n'
        '    #     female: "F"\n'
        "    columns:\n"
        "      # Rename the keys below to match your dataset column names.\n"
        "      # Available concepts: diagnosis, demographic_criteria, symptom,\n"
        "      #   diagnostic_test, epidemiological_history\n"
        "      my_diagnosis_column:\n"
        "        concept: diagnosis\n"
        "        dtype: string\n"
        "      # my_age_column:\n"
        "      #   concept: demographic_criteria\n"
        "      #   attribute: age\n"
        "      #   dtype: integer\n"
    )
    if data_source.value == "Load example" and example_picker.value:
        _example = EXAMPLE_DATASETS[example_picker.value]
        # Explicit encoding so the mapping reads the same on every platform.
        initial_yaml = _example["mapping"].read_text(encoding="utf-8")
        initial_date_column = _example["date_column"]
    else:
        initial_yaml = _default_yaml
        initial_date_column = None
    return initial_date_column, initial_yaml
@app.cell
def _(df_selected, initial_date_column, initial_yaml, mo):
    # Mapping form: a YAML editor next to the date column / date format inputs.
    # Nothing is shown until a dataset has been loaded.
    mo.stop(df_selected is None)
    _columns = df_selected.columns

    yaml_editor = mo.ui.code_editor(
        value=initial_yaml,
        language="yaml",
    ).form(label="Data to OSD mapping", show_clear_button=True, bordered=True)

    date_column_picker = mo.ui.dropdown(
        options=_columns,
        label="Date column",
        # Only preselect the suggested column when the loaded data really has
        # it; otherwise start with no selection instead of passing a value
        # that is not among the options.
        value=initial_date_column if initial_date_column in _columns else None,
    )
    date_format_input = mo.ui.text(
        value="%Y-%m-%d %H:%M:%S",
        label="Date format",
    )

    _cols_hint = "`, `".join(_columns)
    mo.vstack(
        [
            mo.md("### Mapping your data to the format"),
            mo.md(
                "Edit the YAML below to map your dataset columns to OSD concepts, "
                "then click **Submit**. "
                "Select the date column separately for the time-series view.\n\n"
                f"Your dataset columns: `{_cols_hint}`"
            ),
            mo.hstack(
                [yaml_editor, mo.vstack([date_column_picker, date_format_input])],
                widths=[3, 1],
                align="start",
            ),
        ]
    )
    return date_column_picker, date_format_input, yaml_editor
@app.cell
def _(df_selected, load_profile, mo, yaml, yaml_editor):
    # Validate the submitted mapping and turn it into a profile.
    # Every failure path halts the cell with a red callout rather than an
    # unhandled exception. Helper names are underscore-prefixed so they stay
    # local to this cell.
    mo.stop(yaml_editor.value is None)

    try:
        _parsed = yaml.safe_load(yaml_editor.value)
    except yaml.YAMLError as _e:
        mo.stop(True, mo.callout(mo.md(f"**Invalid YAML:** {_e}"), kind="danger"))

    # Structural check: an empty document, a missing `profiles` key, an empty
    # list or a non-mapping first entry all end up here.
    try:
        _profile_spec = _parsed["profiles"][0]
        _declared_columns = _profile_spec.get("columns")
    except (KeyError, IndexError, TypeError, AttributeError):
        mo.stop(
            True,
            mo.callout(
                mo.md(
                    "**Profile error:** the mapping needs a `profiles` list whose "
                    "first entry defines `name` and `columns`"
                ),
                kind="danger",
            ),
        )

    if not _declared_columns:
        mo.stop(
            True,
            mo.callout(mo.md("You need to map **at least one column**"), kind="danger"),
        )

    # Declared columns that the loaded dataset does not contain.
    _not_found = [
        str(column) for column in _declared_columns if column not in df_selected.columns
    ]
    if _not_found:
        mo.stop(
            True,
            mo.callout(
                mo.md(f"**Columns not found:** {', '.join(_not_found)}"), kind="danger"
            ),
        )

    try:
        _profile_name = _profile_spec["name"]
        profile = load_profile(_parsed, _profile_name)
    except (KeyError, IndexError, ValueError) as _e:
        mo.stop(True, mo.callout(mo.md(f"**Profile error:** {_e}"), kind="danger"))
    return (profile,)
@app.cell
def _(date_column_picker, mo, profile):
    # Resolve the two columns the charts need: the selected date column and
    # the first profile column whose concept is "diagnosis" (None if absent).
    mo.stop(date_column_picker.value is None)
    date_column = date_column_picker.value

    _diagnosis_cols = [
        _spec.col_name for _spec in profile.columns if _spec.concept == "diagnosis"
    ]
    if _diagnosis_cols:
        code_column = _diagnosis_cols[0]
    else:
        code_column = None
    return code_column, date_column
@app.cell
def _(get_definition_dir):
    # Map each definition name (file name without its final suffix) to the
    # path of its JSON file, searching the definition directory recursively.
    # `.stem` removes only the trailing ".json", unlike a string replace that
    # would also remove the text elsewhere in the name. Paths are visited in
    # sorted order so that, if two files share a name, the entry that is kept
    # is the same on every run.
    definition_options = {
        filepath.stem: filepath
        for filepath in sorted(get_definition_dir().glob("**/*.json"))
    }
    return (definition_options,)
@app.cell
def _(definition_options, mo):
    # Multi-select over the available definition names, in alphabetical order.
    _names = sorted(definition_options)
    definitions_dropdown = mo.ui.multiselect(
        options=_names,
        label="Select Syndromic Indicators",
    )
    return (definitions_dropdown,)
@app.cell
def _(mo):
    # Heading above the data preview.
    _heading = mo.md(r"""### Data sample""")
    _heading
    return
@app.cell
def _(df_selected):
    # Preview up to 10 random rows of the loaded data.
    # The sample size is capped at the row count because asking for more rows
    # than exist fails when sampling without replacement. Nothing is rendered
    # while no dataset is loaded.
    (
        df_selected.sample(min(10, df_selected.shape[0]))
        if df_selected is not None
        else None
    )
    return
@app.cell
def _(mo):
    # Horizontal rule separating the loading section from the analysis.
    _divider = mo.md(r"""---""")
    _divider
    return
@app.cell
def _(mo):
    # Section heading for the definition filters and results.
    _heading = mo.md(r"""## Data & Definitions""")
    _heading
    return
@app.cell
def _(definitions_dropdown, mo):
    # Filter row: a label followed by the definitions multi-select.
    _filter_label = mo.md("**::lucide:filter:: Filters:**")
    mo.hstack([_filter_label, definitions_dropdown])
    return
@app.cell
def _(
    OSDEngine,
    definition_options,
    definitions_dropdown,
    df_selected,
    json,
    mo,
    profile,
):
    # Label the loaded data with every selected definition.
    # Halts until there is a non-empty dataset and at least one selection.
    mo.stop(
        df_selected is None or df_selected.is_empty() or not definitions_dropdown.value
    )
    definitions = definitions_dropdown.value
    # skip criteria that can't be evaluated (e.g. professional_judgment)
    engine = OSDEngine(profile, skip_unresolvable=True)
    # Definition files are JSON, so read them as UTF-8 explicitly instead of
    # relying on the platform's default text encoding.
    defs_dict = {
        name: json.loads(definition_options[name].read_text(encoding="utf-8"))
        for name in definitions
    }
    df_filtered = engine.label(df_selected, defs_dict)
    return definitions, df_filtered
@app.cell
def _(definitions, df_filtered, df_selected, mo):
    # Summary cards: the selected definitions plus the dataset's dimensions.
    mo.stop(definitions is None or df_filtered is None)
    _n_rows, _n_cols = df_selected.shape

    _indicator_card = mo.stat(
        label="Syndromic Indicators",
        value=len(definitions),
        caption=", ".join(definitions),
        bordered=True,
    )
    _rows_card = mo.stat(label="Rows", value=_n_rows)
    _columns_card = mo.stat(label="Columns", value=_n_cols)

    mo.hstack(
        [_indicator_card, _rows_card, _columns_card],
        widths="equal",
        align="center",
    )
    return
@app.cell
def _(definition_options, json):
    def load_definition(name: str) -> dict:
        """Return the parsed JSON definition registered under ``name``.

        Raises ``KeyError`` when ``name`` is not in ``definition_options``.
        The file is read as UTF-8 explicitly so the result does not depend on
        the platform's default text encoding.
        """
        return json.loads(definition_options[name].read_text(encoding="utf-8"))

    return (load_definition,)
@app.cell
def _(mo):
    # Numeric input (1 to 10, default 3) for how many codes the bar chart shows.
    top_n = mo.ui.number(
        label="Number of top codes",
        start=1,
        stop=10,
        step=1,
        value=3,
    )
    return (top_n,)
@app.cell
def _(
    code_column,
    date_column,
    date_format_input,
    definitions,
    df_filtered,
    df_selected,
    groupped_bar,
    mo,
    plot_cases,
    top_n,
):
    # Findings section: the time series always, the per-code bar chart only
    # when the profile provides a diagnosis column.
    mo.stop(definitions is None or df_selected is None)

    if code_column:
        diagnosis_chart = [
            mo.md("### Codes comparison per syndromic indicator"),
            top_n.left(),
            groupped_bar(
                df_filtered,
                definitions,
                # Fall back to 3 when the number input has no value.
                top_n=top_n.value or 3,
                group_by_column=code_column,
            ),
        ]
    else:
        diagnosis_chart = []

    timeseries = [
        mo.md("### Time series"),
        plot_cases(
            df_filtered,
            definitions,
            date_column=date_column,
            date_format=date_format_input.value,
        ),
    ]

    mo.vstack(
        [
            # The heading emoji (bar chart) had been stored as mis-decoded
            # bytes; it is written here as the intended character.
            mo.md("## Findings from the data 📊"),
            *timeseries,
            *diagnosis_chart,
        ]
    )
    return
@app.cell
def _(definitions, load_definition, mo):
    # Details section: the raw JSON of each selected definition, one
    # collapsible entry per definition inside a "JSONs" accordion.
    mo.stop(definitions is None)

    _json_views = {
        definition: mo.json(load_definition(definition)) for definition in definitions
    }
    mo.vstack(
        [
            mo.md("### Definitions details"),
            # The trailing emoji (magnifying glass) had been stored as
            # mis-decoded bytes; it is written here as the intended character.
            mo.md(
                "Here the definitions used to filter the data. See here what criteria were applied. 🔎"
            ),
            mo.accordion({"JSONs": mo.accordion(_json_views)}),
        ]
    )
    return
# Run the notebook application when this file is executed as a script.
if __name__ == "__main__":
    app.run()