Spaces:
Running
Running
# Streamlit front end for the serotype k-mer classifier.
#
# The sidebar collects two uploads (known serotype sequences and unknown
# sequences), the sequence type, the k values and the FDR threshold.  Clicking
# the run button computes strictly unique k-mers per serotype, classifies the
# unknown sequences and shows a table, a count plot and a CSV download.
#
# Streamlit re-executes this script on every widget interaction, and a button
# only reports True during the rerun caused by its own click.  The results are
# therefore kept in ``st.session_state`` so that they survive later reruns
# (for example the one triggered by the download button).
import math

import pandas as pd
import streamlit as st

from user_script import (
    classify_unknown_sequences,
    compute_unique_kmers_per_serotype,
    derive_serotype_names_from_sources,
    parse_k_input,
    plot_counts_by_serotype,
    read_uploaded_fasta_or_zip,
)

# Key under which the most recent analysis results are kept between reruns.
_RESULTS_KEY = "serotype_kmer_results"
# File extensions accepted by both uploaders.
_UPLOAD_TYPES = ["fasta", "fa", "fas", "fna", "zip"]
# Smallest q-value fed to log10; caps the score at 300 and avoids log10(0).
_FDR_FLOOR = 1e-300
# Name of the derived ranking column added to the predictions table.
_SCORE_COLUMN = "Score_-log10FDR"
# Columns shown in the on-screen table (the CSV download keeps every column).
_DISPLAY_COLUMNS = [
    "Source",
    "Sequence",
    "Predicted_serotype",
    "Matches_total",
    "Confidence_by_present",
    "Confidence_by_serotype_vocab",
    _SCORE_COLUMN,
]


def best_score(row):
    """Return ``-log10(q)`` for the FDR q-value of the row's predicted serotype.

    Args:
        row: One row of the predictions table.  Must contain
            ``"Predicted_serotype"``; the q-value is read from the column
            ``"FDR_<predicted serotype>"``.

    Returns:
        A non-negative float.  ``0.0`` for ``"NoMatch"`` rows and for missing
        (``None``/NaN) q-values; otherwise ``-log10(q)`` with ``q`` clamped to
        ``_FDR_FLOOR`` from below, so the score never exceeds 300.
    """
    predicted = row["Predicted_serotype"]
    if predicted == "NoMatch":
        return 0.0
    q_value = row.get(f"FDR_{predicted}", 1.0)
    if q_value is None or pd.isna(q_value):
        return 0.0
    # A q-value that underflowed to 0 is the strongest evidence, so it is
    # clamped to the floor (score 300) rather than being scored as 0.
    # The outer max() also normalises -0.0 (q == 1) and q > 1 to 0.0.
    return max(0.0, -math.log10(max(q_value, _FDR_FLOOR)))


def _first_sequence_per_serotype(known_records, name_map):
    """Map each serotype name to the first sequence seen for it.

    Args:
        known_records: Iterable of ``(src, header, seq)`` tuples.
        name_map: Mapping from FASTA header to serotype name.

    Returns:
        ``(serotype_to_seq, ignored)`` where ``ignored`` counts the records
        skipped because their serotype already had a sequence.
    """
    serotype_to_seq = {}
    ignored = 0
    for src, header, seq in known_records:
        serotype = name_map.get(header)
        if serotype is None:
            # Unmapped header: use its first token, or the record's source
            # label (first tuple element) when the header is blank.
            tokens = header.split()
            serotype = tokens[0] if tokens else str(src)
        if serotype in serotype_to_seq:
            ignored += 1
            continue
        serotype_to_seq[serotype] = seq
    return serotype_to_seq, ignored


def _run_analysis(known_file, unknown_file, k_input, default_k, is_protein, fdr_alpha):
    """Run the whole pipeline for one click of the run button.

    Shows an error and halts the script run (``st.stop()``) when an upload is
    missing or empty, or when no k value of at least 3 remains.

    Returns:
        A dict with the predictions table (``"predictions"``), the count plot
        (``"figure"``), the serotype names (``"serotypes"``), the k values
        (``"k_values"``) and the number of ignored known records
        (``"ignored_records"``).
    """
    if not known_file or not unknown_file:
        st.error("Please upload both known and unknown sequences.")
        st.stop()

    with st.spinner("Reading uploads..."):
        known_records = read_uploaded_fasta_or_zip(known_file)  # list of (src, header, seq)
        unknown_records = read_uploaded_fasta_or_zip(unknown_file)
    if not known_records:
        st.error("No records found in the known upload.")
        st.stop()
    if not unknown_records:
        st.error("No records found in the unknown upload.")
        st.stop()

    # Map headers to serotype names.
    name_map = derive_serotype_names_from_sources(known_records)
    serotype_to_seq, ignored = _first_sequence_per_serotype(known_records, name_map)

    parsed_k = parse_k_input(k_input, default_single=default_k)
    k_values = sorted({k for k in parsed_k if k >= 3})
    if not k_values:
        st.error("No valid k values (>=3).")
        st.stop()

    with st.spinner("Computing strict-unique k-mers per serotype..."):
        uniques = compute_unique_kmers_per_serotype(
            serotype_to_seq, is_protein=is_protein, k_values=k_values
        )
    with st.spinner("Classifying unknown sequences..."):
        df_full = classify_unknown_sequences(
            unknown_records, uniques, is_protein=is_protein, fdr_alpha=fdr_alpha
        )
    df_full[_SCORE_COLUMN] = df_full.apply(best_score, axis=1)

    return {
        "predictions": df_full,
        "figure": plot_counts_by_serotype(df_full),
        "serotypes": list(serotype_to_seq),
        "k_values": k_values,
        "ignored_records": ignored,
    }


def _render_results(results):
    """Draw the summary, predictions table, count plot and CSV download."""
    df_full = results["predictions"]

    st.info(f"Detected serotypes: {results['serotypes']}")
    st.info(f"k values: {results['k_values']}")
    if results["ignored_records"]:
        st.warning(
            f"{results['ignored_records']} known record(s) were ignored because their "
            "serotype already had a sequence; only the first sequence per serotype is used."
        )

    st.subheader("Predictions")
    # Show only the columns the classifier actually produced, so a missing
    # optional column does not abort rendering with a KeyError.
    visible = [col for col in _DISPLAY_COLUMNS if col in df_full.columns]
    ranked = df_full[visible].sort_values(_SCORE_COLUMN, ascending=False)
    st.dataframe(ranked, use_container_width=True)

    st.subheader("Predicted serotype counts")
    st.pyplot(results["figure"])

    st.subheader("Downloads")
    csv = df_full.to_csv(index=False).encode("utf-8")
    st.download_button(
        "Download predictions_by_serotype.csv",
        data=csv,
        file_name="predictions_by_serotype.csv",
        mime="text/csv",
    )


st.set_page_config(page_title="Serotype k-mer classifier", layout="wide")
st.title("🧬 Serotype k-mer classifier — STRICT uniques (Streamlit)")
st.markdown("""Upload **known serotype** sequences (FASTA or a ZIP with one FASTA per serotype) and **unknown** sequences,
choose parameters, then click **Run**. The app computes strictly unique k-mers per serotype across k and classifies unknowns.
""")

st.sidebar.header("Inputs")
known_file = st.sidebar.file_uploader(
    "Known serotypes (FASTA or ZIP of FASTA files)", type=_UPLOAD_TYPES
)
unknown_file = st.sidebar.file_uploader(
    "Unknown sequences (FASTA or ZIP of FASTA files)", type=_UPLOAD_TYPES
)
seqtype = st.sidebar.selectbox("Sequence type", ["DNA", "Protein"])
is_protein = seqtype == "Protein"
default_k = 9 if is_protein else 21
k_input = st.sidebar.text_input(
    "k values (e.g. 21 or 15-21 or 7,9,11)", value=str(default_k)
)
# The FDR threshold is a probability, so it is bounded to [0, 1].
fdr_alpha = st.sidebar.number_input(
    "FDR α", min_value=0.0, max_value=1.0, value=0.05, step=0.01
)
run = st.sidebar.button("▶ Run analysis")

if run:
    # Drop stale results first so a failed run never shows an older analysis.
    st.session_state.pop(_RESULTS_KEY, None)
    st.session_state[_RESULTS_KEY] = _run_analysis(
        known_file, unknown_file, k_input, default_k, is_protein, fdr_alpha
    )

stored_results = st.session_state.get(_RESULTS_KEY)
if stored_results is None:
    st.info("Upload the two files on the left, set parameters, then click **Run analysis**.")
else:
    _render_results(stored_results)