Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import io | |
| import re | |
| import numpy as np | |
| import openpyxl | |
# =========================
# Streamlit App Setup
# =========================
# Wide layout so the binary matrices and plate maps fit on screen.
st.set_page_config(page_title="ASCII ↔ Binary Converter", layout="wide")
st.title("ASCII ↔ Binary Converter")
| # ========================= | |
| # Voyager ASCII 6-bit Table | |
| # ========================= | |
# 6-bit code → character for the 56-symbol Voyager character set:
# space, A–Z, 0–9, then 19 punctuation marks (codes 0..55).
voyager_table = dict(enumerate(
    ' ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789.,()+-*/=$!:%"#@\'?&'
))
# Inverse lookup: character → 6-bit code.
reverse_voyager_table = {ch: code for code, ch in voyager_table.items()}
| # ========================= | |
| # Helper Functions | |
| # ========================= | |
def string_to_binary_labels(s: str, table=None) -> list[int]:
    """Encode *s* as a flat bit list, 6 bits per character, MSB first.

    Each character is upper-cased and looked up in *table* (character →
    6-bit code). Characters missing from the table fall back to code 0
    (space in the Voyager set), matching the app's lenient input handling.

    Args:
        s: Text to encode.
        table: Optional character→code mapping; defaults to the module's
            ``reverse_voyager_table``. (Generalized so other 6-bit
            alphabets can reuse the encoder.)

    Returns:
        A list of 0/1 ints of length ``6 * len(s)``.
    """
    if table is None:
        table = reverse_voyager_table
    bits: list[int] = []
    for char in s:
        val = table.get(char.upper(), 0)
        # Emit bit 5 down to bit 0 (most-significant first).
        char_bits = [(val >> bit) & 1 for bit in range(5, -1, -1)]
        bits.extend(char_bits)
    return bits
def binary_labels_to_string(bits: list[int], table=None) -> str:
    """Decode a flat bit list (6 bits per character, MSB first) to text.

    The trailing chunk is zero-padded to 6 bits, so inputs whose length is
    not a multiple of 6 still decode. Codes absent from *table* decode to
    ``'?'``.

    Args:
        bits: Sequence of 0/1 ints.
        table: Optional code→character mapping; defaults to the module's
            ``voyager_table``. (Generalized so other 6-bit alphabets can
            reuse the decoder.)

    Returns:
        The decoded string, one character per 6-bit chunk.
    """
    if table is None:
        table = voyager_table
    chars = []
    for i in range(0, len(bits), 6):
        chunk = bits[i:i+6]
        if len(chunk) < 6:
            # Pad the last partial chunk with low-order zero bits.
            chunk += [0] * (6 - len(chunk))
        val = sum(b << (5 - j) for j, b in enumerate(chunk))
        chars.append(table.get(val, '?'))
    return ''.join(chars)
# =========================
# Tabs
# =========================
# Three independent workflows: encode, decode, and pipetting-script generation.
tab1, tab2, tab3 = st.tabs(["Text → Binary", "Binary → Text", "Robot Script"])
# --------------------------------------------------
# TAB 1: Text → Binary
# --------------------------------------------------
with tab1:
    st.markdown("""
    Convert any text into binary labels using the **Voyager 6-bit ASCII table**.
    You can control how many positions (columns) are grouped per row.
    """)
    st.subheader("Step 1 – Input Text")
    user_input = st.text_input("Enter your text:", value="DNA", key="input_text")
    col1, col2 = st.columns([2, 1])
    with col1:
        group_size = st.slider("Select number of positions per row:", min_value=12, max_value=32, value=25)
    with col2:
        custom_cols = st.number_input("Or enter custom number:", min_value=1, max_value=128, value=group_size)
    # NOTE(review): the number_input defaults to the slider value, and any
    # differing custom value overrides the slider here — confirm this
    # precedence (custom wins) is the intended UX.
    if custom_cols != group_size:
        group_size = custom_cols
    if user_input:
        # 6 bits per character, MSB first (Voyager encoding).
        binary_labels = string_to_binary_labels(user_input)
        binary_concat = ''.join(map(str, binary_labels))
        # Step 2: Binary Labels per Character
        st.markdown("### Step 2 – Binary Labels per Character")
        st.caption("Scroll to view all characters")
        # Scrollable block: one line of 6 bits per input character.
        grouped_bits = [binary_labels[i:i+6] for i in range(0, len(binary_labels), 6)]
        scroll_html = "<div style='max-height: 300px; overflow-y: auto; font-family: monospace; padding: 6px; border: 1px solid #ccc;'>"
        for i, bits in enumerate(grouped_bits):
            ch = user_input[i] if i < len(user_input) else "?"
            scroll_html += f"<div>'{ch}' → {bits}</div>"
        scroll_html += "</div>"
        st.markdown(scroll_html, unsafe_allow_html=True)
        # Download full concatenated binary text
        st.download_button(
            "⬇️ Download Full Binary (.txt)",
            data=binary_concat,
            file_name="binary_full.txt",
            mime="text/plain",
            key="download_binary_txt"
        )
        # Step 3: Grouped Binary Matrix — reshape the flat bit stream into
        # rows of `group_size` bits, zero-padding the final row to full width.
        st.markdown("### Step 3 – Grouped Binary Matrix")
        groups = []
        for i in range(0, len(binary_labels), group_size):
            group = binary_labels[i:i+group_size]
            if len(group) < group_size:
                group += [0] * (group_size - len(group))
            groups.append(group)
        columns = [f"Position {i+1}" for i in range(group_size)]
        df = pd.DataFrame(groups, columns=columns)
        st.dataframe(df, use_container_width=True)
        st.download_button(
            "⬇️ Download as CSV",
            df.to_csv(index=False),
            file_name=f"binary_labels_{group_size}_positions.csv",
            mime="text/csv",
            key="download_binary_csv"
        )
    else:
        st.info("👆 Enter text above to see binary labels.")
# --------------------------------------------------
# TAB 2: Binary → Text
# --------------------------------------------------
with tab2:
    st.markdown("""
    Convert binary data back into readable text.
    Upload either:
    - `.csv` file with 0/1 values (any number of columns/rows)
    - `.xlsx` Excel file
    - `.txt` file containing a concatenated binary string (e.g. `010101...`)
    """)
    uploaded = st.file_uploader("Upload your file (.csv, .xlsx, or .txt):", type=["csv", "xlsx", "txt"])
    if uploaded is not None:
        try:
            # Tabular inputs: flatten the whole sheet row-major into one bit list.
            # NOTE(review): pandas consumes the first row as a header, so a
            # CSV/XLSX with no header row loses its first row of bits —
            # round-trips from Tab 1 (which writes "Position N" headers) are
            # fine; confirm other sources always include a header.
            if uploaded.name.endswith(".csv"):
                df = pd.read_csv(uploaded)
                bits = df.values.flatten().astype(int).tolist()
            elif uploaded.name.endswith(".xlsx"):
                df = pd.read_excel(uploaded)
                bits = df.values.flatten().astype(int).tolist()
            elif uploaded.name.endswith(".txt"):
                # Plain text: keep only literal '0'/'1' characters, ignore the rest.
                content = uploaded.read().decode().strip()
                bits = [int(b) for b in content if b in ['0', '1']]
            else:
                bits = []
            if not bits:
                st.warning("No binary data detected.")
            else:
                # 6 bits per character, MSB first (Voyager decoding).
                recovered_text = binary_labels_to_string(bits)
                st.success("✅ Conversion complete!")
                st.markdown("**Recovered text:**")
                st.text_area("Output", recovered_text, height=150)
                st.download_button(
                    "⬇️ Download Recovered Text (.txt)",
                    data=recovered_text,
                    file_name="recovered_text.txt",
                    mime="text/plain",
                    key="download_recovered"
                )
        except Exception as e:
            st.error(f"Error reading or converting file: {e}")
    else:
        st.info("👆 Upload a file to start the reverse conversion.")
# --------------------------------------------------
# TAB 3: Pipetting Command Generator
# --------------------------------------------------
with tab3:
    # Redundant re-imports (already imported at the top of the file, except
    # `ceil`); harmless, kept so this tab reads as a self-contained script.
    import numpy as np
    import pandas as pd
    import re
    from math import ceil
    st.header("🧪 Pipetting Command Generator")
    st.markdown("""
    Upload your sample file (Excel, CSV, or TXT) containing binary mutation data.
    The app will:
    - Auto-detect or create `Sample`, `Position#`, `Total edited`, and `Volume per "1"` columns
    - Let you set the **Desired total volume per sample (µL)** used to compute `Volume per "1"`
    - Calculate total demand per input and suggest a **uniform layout** (same # consecutive wells per input)
    - **Preview** the layout on a plate map (with tooltips)
    - After confirmation, generate pipetting commands and a source volume summary
    """)
    uploaded = st.file_uploader("📤 Upload data file", type=["xlsx", "csv", "txt"])
    # Hard cap on liquid any single source well may hold; drives how many
    # consecutive wells each input is allocated in the layout below.
    max_per_well_ul = st.number_input(
        "Maximum volume per source well (µL)",
        min_value=10.0, max_value=2000.0, value=160.0, step=10.0
    )
# ---------- Helpers (plate geometry, parsing, viz) ----------
# Standard 96-well plate: 8 lettered rows × 12 numbered columns.
ROWS_96 = ["A", "B", "C", "D", "E", "F", "G", "H"]
COLS_96 = list(range(1, 13))
def well_name(row_letter, col_number):
    """Join a row letter and a column number into a label like 'B7'."""
    return f"{row_letter}{col_number}"
def enumerate_plate_wells():
    """Yield wells A1..A12, B1..B12, ..., H12 for a single plate."""
    for row in ROWS_96:
        for col in COLS_96:
            yield well_name(row, col)
def parse_well_name(well: str):
    """Split 'A1'/'H12' → (row_letter, col_num). Robust to stray spaces.

    Unparseable input falls back to the sentinel ("A", 0).
    """
    match = re.match(r"([A-Ha-h])\s*([0-9]+)", str(well).strip())
    if match is None:
        return ("A", 0)
    row_letter, col_digits = match.groups()
    return (row_letter.upper(), int(col_digits))
def sample_index_to_plate_and_well(sample_idx: int):
    """Destination mapping: 96-well plates in reading order, extends to multiple plates.

    Sample 1 → plate 1, A1; sample 97 → plate 2, A1; and so on.
    """
    zero_based = sample_idx - 1
    plate_num = zero_based // 96 + 1
    offset = zero_based % 96
    return plate_num, well_name(ROWS_96[offset // 12], COLS_96[offset % 12])
def build_global_wells_list(n_plates: int):
    """List every (plate_number, well) pair for plates 1..n_plates, in reading order."""
    return [
        (plate, well)
        for plate in range(1, n_plates + 1)
        for well in enumerate_plate_wells()
    ]
def pick_tool(volume_ul: float) -> str:
    """Select the pipetting tool: TS_10 for transfers ≤ 10 µL, TS_50 above."""
    if volume_ul <= 10.0:
        return "TS_10"
    return "TS_50"
# Color palette (cycled if many inputs)
PALETTE = [
    "#4F46E5", "#22C55E", "#F59E0B", "#EF4444", "#06B6D4", "#A855F7", "#84CC16", "#F97316",
    "#0EA5E9", "#E11D48", "#10B981", "#7C3AED", "#15803D", "#EA580C", "#2563EB", "#DC2626"
]
def render_plate_map_html(plates_used, well_to_input, max_wells_per_source, inputs_count):
    """Fancy HTML plate grids with tooltips.

    Builds a color legend (one swatch per input), shared CSS, and one
    8×12 grid per plate. Allocated wells are filled with the owning
    input's color and show a hover tooltip; unallocated wells are blank.

    Args:
        plates_used: Number of plates to render (1..N).
        well_to_input: Mapping (plate, well) → (input_idx, within_block_idx)
            for every allocated well.
        max_wells_per_source: Uniform block size, shown in each tooltip.
        inputs_count: Number of inputs, used for the legend.

    Returns:
        A single HTML string intended for st.markdown(..., unsafe_allow_html=True).
    """
    # Legend: colors cycle through PALETTE when inputs exceed its length.
    legend_spans = []
    for i in range(1, inputs_count + 1):
        color = PALETTE[(i-1) % len(PALETTE)]
        legend_spans.append(
            f"<span style='display:inline-block;margin-right:12px'>"
            f"<span style='display:inline-block;width:12px;height:12px;background:{color};border:1px solid #333;margin-right:6px;vertical-align:middle'></span>"
            f"Input {i}</span>"
        )
    legend_html = "<div style='margin:8px 0 16px 0'>" + "".join(legend_spans) + "</div>"
    # NOTE(review): `.cell .tip` declares `transform` twice; the later
    # translate(-50%, -6px) wins per the CSS cascade — presumably intended
    # (centers the tooltip), but the first declaration is dead.
    css = """
    <style>
    .plate { margin: 10px 0 24px 0; }
    .plate-title { font-weight: 600; margin: 4px 0 8px 0; }
    .grid { display: grid; grid-template-columns: 32px repeat(12, 38px); grid-auto-rows: 32px; gap: 4px; }
    .cell { width: 38px; height: 32px; border: 1px solid #DDD; display:flex; align-items:center; justify-content:center; font-size:12px; background:#FAFAFA; position:relative; }
    .head { font-weight:600; background:#F3F4F6; }
    .cell[data-color] { color:#111; }
    .cell .tip { visibility:hidden; opacity:0; transition:opacity 0.15s ease; position:absolute; bottom:100%; transform:translateY(-6px); left:50%; transform:translate(-50%, -6px); background:#111; color:#fff; padding:4px 6px; font-size:11px; border-radius:4px; white-space:nowrap; pointer-events:none; }
    .cell:hover .tip { visibility:visible; opacity:0.95; }
    </style>
    """
    body = [css, legend_html]
    for p in range(1, plates_used + 1):
        body.append(f"<div class='plate'><div class='plate-title'>Plate {p}</div>")
        body.append("<div class='grid'>")
        # Header row: empty corner cell, then column numbers 1..12.
        body.append("<div class='cell head'></div>")
        for c in COLS_96:
            body.append(f"<div class='cell head'>{c}</div>")
        for r in ROWS_96:
            # Row header (A..H) followed by that row's 12 well cells.
            body.append(f"<div class='cell head'>{r}</div>")
            for c in COLS_96:
                well = f"{r}{c}"
                key = (p, well)
                if key in well_to_input:
                    input_idx, within_idx = well_to_input[key]
                    color = PALETTE[(input_idx-1) % len(PALETTE)]
                    tip = f"Input {input_idx} • P{p}:{well} • Block well {within_idx}/{max_wells_per_source}"
                    cell_html = (
                        f"<div class='cell' data-color style='background:{color};border-color:#555' title='{tip}'>"
                        f"<span class='tip'>{tip}</span>"
                        "</div>"
                    )
                else:
                    cell_html = "<div class='cell'></div>"
                body.append(cell_html)
        body.append("</div></div>")
    return "".join(body)
# ---------- Main flow ----------
if uploaded is not None:
    try:
        # --- Load file ---
        if uploaded.name.endswith(".xlsx"):
            df = pd.read_excel(uploaded)
        elif uploaded.name.endswith(".csv"):
            df = pd.read_csv(uploaded)
        else:  # TXT (tab-delimited try, else CSV)
            # NOTE(review): if the tab-delimited parse raises, the upload
            # stream has already been consumed and is not rewound with
            # seek(0) before the CSV retry — confirm the fallback actually
            # sees the file contents.
            try:
                df = pd.read_csv(uploaded, sep="\t")
            except Exception:
                df = pd.read_csv(uploaded)
        st.success(f"✅ Loaded file with {len(df)} rows and {len(df.columns)} columns")
        # --- Clean column names ---
        df.columns = [str(c).strip() for c in df.columns]
        # --- Ensure Sample column ---
        # Sample IDs double as destination-well indices further down.
        if not any(c.lower() == "sample" for c in df.columns):
            df.insert(0, "Sample", np.arange(1, len(df) + 1))
            st.info("`Sample` column missing — automatically generated 1..N.")
        # --- Detect & numerically sort Position columns ---
        position_cols = [c for c in df.columns if re.match(r"(?i)^position\s*\d+", c)]
        if not position_cols:
            # Fallback: every column that is not a known metadata column
            # is treated as a position column.
            non_pos_cols = {"sample", "total edited", 'volume per "1"', "volume per 1"}
            candidate_cols = [c for c in df.columns if c.lower() not in non_pos_cols]
            position_cols = candidate_cols
            st.info(f"Position columns inferred automatically: {len(position_cols)} detected.")
        def pos_key(col_name: str):
            # Numeric sort: "Position 10" after "Position 9"; columns with
            # no digits sort last via the 10**9 sentinel.
            m = re.search(r"(\d+)", col_name)
            return int(m.group(1)) if m else 10**9
        position_cols = sorted(position_cols, key=pos_key)
        # Normalize Position columns to numeric {0,1}
        df[position_cols] = df[position_cols].apply(pd.to_numeric, errors="coerce").fillna(0).astype(int)
        # --- Ensure Total edited ---
        if "Total edited" not in df.columns:
            df["Total edited"] = df[position_cols].sum(axis=1).astype(int)
            st.info("`Total edited` column missing — calculated automatically as sum of 1s per row.")
        # --- User setting for Volume per "1" calculation ---
        st.markdown("#### ⚙️ Volume Calculation Settings")
        default_total_vol = st.number_input(
            "Desired total volume per sample (µL)",
            min_value=1.0, max_value=10000.0, value=64.0, step=1.0,
            help="Used to compute Volume per '1' as (Desired total volume / Total edited) when not provided."
        )
        vol_candidates = [c for c in df.columns if "volume per" in c.lower()]
        if not vol_candidates:
            # Split the desired total evenly over the row's edits.
            # replace(0, nan) avoids dividing by zero for unedited rows.
            df['Volume per "1"'] = default_total_vol / df["Total edited"].replace(0, np.nan)
            df['Volume per "1"'] = df['Volume per "1"'].fillna(0)  # rows with 0 edits → 0 µL
            st.info(f'`Volume per "1"` column missing — calculated automatically as {default_total_vol:.0f} µL / Total edited.')
            volume_col = 'Volume per "1"'
        else:
            volume_col = vol_candidates[0]
        # Safety: per-transfer must not exceed per-well cap
        if df[volume_col].max() > max_per_well_ul:
            st.error(
                f"❌ At least one row has `Volume per \"1\"` greater than the per-well cap ({max_per_well_ul} µL). "
                "Increase the cap or reduce per-transfer volume."
            )
            st.stop()
        # --- Compute total demand per input ---
        # Demand for an input = sum of per-"1" volumes over rows edited there.
        vol_per_one_series = pd.to_numeric(df[volume_col], errors="coerce").fillna(0.0)
        total_volume_per_input = [float(vol_per_one_series[df[pos] == 1].sum()) for pos in position_cols]
        wells_needed_per_input = [int(ceil(tv / max_per_well_ul)) if tv > 0 else 0 for tv in total_volume_per_input]
        num_inputs = len(position_cols)
        # Uniform layout: every input gets the block size of the neediest input.
        max_wells_per_source = max(wells_needed_per_input) if wells_needed_per_input else 0
        st.markdown("### 👀 Preview: Suggested Uniform Layout")
        if max_wells_per_source == 0:
            st.info("No edits detected — nothing to allocate.")
            st.stop()
        st.write(
            f"💡 Suggested layout: **{max_wells_per_source} consecutive wells per input** "
            f"(cap {max_per_well_ul:.0f} µL/well)."
        )
        # Total wells and plates needed
        total_wells_needed_uniform = num_inputs * max_wells_per_source
        plates_needed = int(ceil(total_wells_needed_uniform / 96)) or 1
        # ✅ Correct, robust well ordering for layout
        global_wells = sorted(
            build_global_wells_list(plates_needed),
            key=lambda x: (
                x[0],  # plate
                ROWS_96.index(parse_well_name(x[1])[0]),  # row index
                parse_well_name(x[1])[1]  # column number
            )
        )
        global_wells = global_wells[:total_wells_needed_uniform]
        # Assign uniform blocks to each input
        assigned_wells_map, well_to_input, preview_rows = {}, {}, []
        for i in range(1, num_inputs + 1):
            # Input i owns the contiguous slice [start, end) of global_wells.
            start, end = (i - 1) * max_wells_per_source, i * max_wells_per_source
            block = global_wells[start:end]
            assigned_wells_map[i] = block
            for j, (p, w) in enumerate(block, start=1):
                well_to_input[(p, w)] = (i, j)
            block_str = ", ".join([f"P{p}:{w}" for (p, w) in block])
            preview_rows.append({
                "Input (Position #)": i,
                "Total demand (µL)": round(total_volume_per_input[i-1], 2),
                "Wells needed (actual)": wells_needed_per_input[i-1],
                "Allocated (uniform)": max_wells_per_source,
                "Assigned wells": block_str
            })
        preview_df = pd.DataFrame(preview_rows)
        st.dataframe(preview_df, use_container_width=True, height=300)
        st.markdown("#### Plate Map (hover cells for details)")
        plate_html = render_plate_map_html(plates_needed, well_to_input, max_wells_per_source, num_inputs)
        st.markdown(plate_html, unsafe_allow_html=True)
        # --- Generate Commands ---
        st.markdown("### ✅ Generate Pipetting Commands")
        if st.button("Generate using this layout"):
            # Track per-input per-well usage (µL)
            per_input_well_cum = {i: [0.0] * max_wells_per_source for i in range(1, num_inputs + 1)}
            commands, source_volume_totals = [], {}
            for _, row in df.iterrows():
                # NOTE(review): assumes Sample values are integer-like;
                # a non-numeric sample ID would raise here (caught by the
                # outer except as a generic error).
                sample_id = int(row["Sample"])
                vol_per_one = float(row[volume_col])
                if vol_per_one <= 0:
                    continue
                dest_plate, dest_well = sample_index_to_plate_and_well(sample_id)
                tool = pick_tool(vol_per_one)
                for pos_idx, col in enumerate(position_cols, start=1):
                    if int(row[col]) != 1:
                        continue
                    # First-fit within this input's block: earliest well with
                    # room for one more transfer under the per-well cap.
                    wells_for_input = assigned_wells_map[pos_idx]
                    cum_list = per_input_well_cum[pos_idx]
                    chosen = None
                    for j, ((src_plate, src_well), current_vol) in enumerate(zip(wells_for_input, cum_list)):
                        if current_vol + vol_per_one <= max_per_well_ul:
                            chosen = (j, src_plate, src_well)
                            break
                    if chosen is None:
                        st.error(
                            f"Allocation exhausted for Input {pos_idx} while creating commands. "
                            "Increase the max volume per well or review per-transfer volume."
                        )
                        st.stop()
                    j, src_plate, src_well = chosen
                    cum_list[j] += vol_per_one
                    per_input_well_cum[pos_idx] = cum_list
                    source_volume_totals[(src_plate, src_well)] = source_volume_totals.get((src_plate, src_well), 0.0) + vol_per_one
                    commands.append({
                        "Input #": pos_idx,
                        "Source plate": src_plate,
                        "Source well": src_well,
                        "Destination plate": dest_plate,
                        "Destination well": dest_well,
                        "Volume": round(vol_per_one, 2),
                        "Tool": tool
                    })
            commands_df = pd.DataFrame(commands)
            # ✅ Add helper sort columns to ensure Source/Destination wells sort A1→A12, B1→B12, ...
            def row_idx_from_well(w): return ROWS_96.index(parse_well_name(w)[0])
            def col_num_from_well(w): return parse_well_name(w)[1]
            commands_df["Src_row_idx"] = commands_df["Source well"].apply(row_idx_from_well)
            commands_df["Src_col_num"] = commands_df["Source well"].apply(col_num_from_well)
            commands_df["Dst_row_idx"] = commands_df["Destination well"].apply(row_idx_from_well)
            commands_df["Dst_col_num"] = commands_df["Destination well"].apply(col_num_from_well)
            commands_df = commands_df.sort_values(
                by=["Input #", "Source plate", "Src_row_idx", "Src_col_num",
                    "Destination plate", "Dst_row_idx", "Dst_col_num"],
                kind="stable"
            )
            # Drop helper columns & order final columns
            commands_df = commands_df[[
                "Input #", "Source plate", "Source well",
                "Destination plate", "Destination well", "Volume", "Tool"
            ]]
            st.success(f"✅ Generated {len(commands_df)} commands across {num_inputs} inputs.")
            # ✅ Source summary numeric sort by plate → row → col
            summary_rows = []
            for i in range(1, num_inputs + 1):
                # NOTE(review): `used` (cumulative µL tracked during command
                # generation) is not used here — totals are taken from
                # source_volume_totals; the two should agree by construction.
                for (p, w), used in zip(assigned_wells_map[i], per_input_well_cum[i]):
                    total = source_volume_totals.get((p, w), 0.0)
                    summary_rows.append({
                        "Source": i, "Source plate": p, "Source well": w,
                        "Total volume taken (µL)": round(total, 2),
                        "Allocated capacity (µL)": round(max_per_well_ul, 2)
                    })
            summary_df = pd.DataFrame(summary_rows)
            summary_df["Src_row_idx"] = summary_df["Source well"].apply(row_idx_from_well)
            summary_df["Src_col_num"] = summary_df["Source well"].apply(col_num_from_well)
            summary_df = summary_df.sort_values(
                by=["Source", "Source plate", "Src_row_idx", "Src_col_num"],
                kind="stable"
            )[
                ["Source", "Source plate", "Source well", "Total volume taken (µL)", "Allocated capacity (µL)"]
            ]
            # Display results
            st.markdown("### 💧 Pipetting Commands")
            st.dataframe(commands_df, use_container_width=True, height=400)
            st.download_button("⬇️ Download Commands CSV", commands_df.to_csv(index=False), "pipetting_commands.csv", mime="text/csv")
            st.markdown("### 📊 Source Volume Summary")
            st.dataframe(summary_df, use_container_width=True, height=400)
            st.download_button("⬇️ Download Source Summary CSV", summary_df.to_csv(index=False), "source_volume_summary.csv", mime="text/csv")
    except Exception as e:
        st.error(f"❌ Error processing file: {e}")
else:
    st.info("👆 Upload an Excel/CSV/TXT file to start.")