import streamlit as st import pandas as pd import io import re import numpy as np import openpyxl # ========================= # Streamlit App Setup # ========================= st.set_page_config(page_title="ASCII ↔ Binary Converter", layout="wide") st.title("ASCII ↔ Binary Converter") # ========================= # Voyager ASCII 6-bit Table # ========================= voyager_table = { i: ch for i, ch in enumerate([ ' ', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '.', ',', '(', ')','+', '-', '*', '/', '=', '$', '!', ':', '%', '"', '#', '@', "'", '?', '&' ]) } reverse_voyager_table = {v: k for k, v in voyager_table.items()} # ========================= # Helper Functions # ========================= def string_to_binary_labels(s: str) -> list[int]: bits = [] for char in s: val = reverse_voyager_table.get(char.upper(), 0) char_bits = [(val >> bit) & 1 for bit in range(5, -1, -1)] bits.extend(char_bits) return bits def binary_labels_to_string(bits: list[int]) -> str: chars = [] for i in range(0, len(bits), 6): chunk = bits[i:i+6] if len(chunk) < 6: chunk += [0] * (6 - len(chunk)) val = sum(b << (5 - j) for j, b in enumerate(chunk)) chars.append(voyager_table.get(val, '?')) return ''.join(chars) # ========================= # Tabs # ========================= tab1, tab2, tab3 = st.tabs(["Text → Binary", "Binary → Text", "Robot Script"]) # -------------------------------------------------- # TAB 1: Text → Binary # -------------------------------------------------- with tab1: st.markdown(""" Convert any text into binary labels using the **Voyager 6-bit ASCII table**. You can control how many positions (columns) are grouped per row. """) st.subheader("Step 1 – Input Text") user_input = st.text_input("Enter your text:", value="DNA", key="input_text") col1, col2 = st.columns([2, 1]) with col1: group_size = st.slider("Select number of positions per row:", min_value=12, max_value=32, value=25) with col2: custom_cols = st.number_input("Or enter custom number:", min_value=1, max_value=128, value=group_size) if custom_cols != group_size: group_size = custom_cols if user_input: binary_labels = string_to_binary_labels(user_input) binary_concat = ''.join(map(str, binary_labels)) # Step 2: Binary Labels per Character st.markdown("### Step 2 – Binary Labels per Character") st.caption("Scroll to view all characters") # Scrollable block grouped_bits = [binary_labels[i:i+6] for i in range(0, len(binary_labels), 6)] scroll_html = "
" for i, bits in enumerate(grouped_bits): ch = user_input[i] if i < len(user_input) else "?" scroll_html += f"
'{ch}' → {bits}
" scroll_html += "
" st.markdown(scroll_html, unsafe_allow_html=True) # Download full concatenated binary text st.download_button( "⬇️ Download Full Binary (.txt)", data=binary_concat, file_name="binary_full.txt", mime="text/plain", key="download_binary_txt" ) # Step 3: Grouped Binary Matrix st.markdown("### Step 3 – Grouped Binary Matrix") groups = [] for i in range(0, len(binary_labels), group_size): group = binary_labels[i:i+group_size] if len(group) < group_size: group += [0] * (group_size - len(group)) groups.append(group) columns = [f"Position {i+1}" for i in range(group_size)] df = pd.DataFrame(groups, columns=columns) st.dataframe(df, use_container_width=True) st.download_button( "⬇️ Download as CSV", df.to_csv(index=False), file_name=f"binary_labels_{group_size}_positions.csv", mime="text/csv", key="download_binary_csv" ) else: st.info("👆 Enter text above to see binary labels.") # -------------------------------------------------- # TAB 2: Binary → Text # -------------------------------------------------- with tab2: st.markdown(""" Convert binary data back into readable text. Upload either: - `.csv` file with 0/1 values (any number of columns/rows) - `.xlsx` Excel file - `.txt` file containing a concatenated binary string (e.g. `010101...`) """) uploaded = st.file_uploader("Upload your file (.csv, .xlsx, or .txt):", type=["csv", "xlsx", "txt"]) if uploaded is not None: try: if uploaded.name.endswith(".csv"): df = pd.read_csv(uploaded) bits = df.values.flatten().astype(int).tolist() elif uploaded.name.endswith(".xlsx"): df = pd.read_excel(uploaded) bits = df.values.flatten().astype(int).tolist() elif uploaded.name.endswith(".txt"): content = uploaded.read().decode().strip() bits = [int(b) for b in content if b in ['0', '1']] else: bits = [] if not bits: st.warning("No binary data detected.") else: recovered_text = binary_labels_to_string(bits) st.success("✅ Conversion complete!") st.markdown("**Recovered text:**") st.text_area("Output", recovered_text, height=150) st.download_button( "⬇️ Download Recovered Text (.txt)", data=recovered_text, file_name="recovered_text.txt", mime="text/plain", key="download_recovered" ) except Exception as e: st.error(f"Error reading or converting file: {e}") else: st.info("👆 Upload a file to start the reverse conversion.") # -------------------------------------------------- # TAB 3: Pipetting Command Generator # -------------------------------------------------- with tab3: import numpy as np import pandas as pd import re from math import ceil st.header("🧪 Pipetting Command Generator") st.markdown(""" Upload your sample file (Excel, CSV, or TXT) containing binary mutation data. The app will: - Auto-detect or create `Sample`, `Position#`, `Total edited`, and `Volume per "1"` columns - Let you set the **Desired total volume per sample (µL)** used to compute `Volume per "1"` - Calculate total demand per input and suggest a **uniform layout** (same # consecutive wells per input) - **Preview** the layout on a plate map (with tooltips) - After confirmation, generate pipetting commands and a source volume summary """) uploaded = st.file_uploader("📤 Upload data file", type=["xlsx", "csv", "txt"]) max_per_well_ul = st.number_input( "Maximum volume per source well (µL)", min_value=10.0, max_value=2000.0, value=160.0, step=10.0 ) # ---------- Helpers (plate geometry, parsing, viz) ---------- ROWS_96 = ["A", "B", "C", "D", "E", "F", "G", "H"] COLS_96 = list(range(1, 13)) def well_name(row_letter, col_number): return f"{row_letter}{col_number}" def enumerate_plate_wells(): """Yield wells A1..A12, B1..B12, ..., H12 for a single plate.""" for r in ROWS_96: for c in COLS_96: yield f"{r}{c}" def parse_well_name(well: str): """Split 'A1'/'H12' → (row_letter, col_num). Robust to stray spaces.""" m = re.match(r"([A-Ha-h])\s*([0-9]+)", str(well).strip()) if not m: return ("A", 0) return (m.group(1).upper(), int(m.group(2))) def sample_index_to_plate_and_well(sample_idx: int): """Destination mapping: 96-well plates in reading order, extends to multiple plates.""" plate_num = ((sample_idx - 1) // 96) + 1 within_plate = (sample_idx - 1) % 96 row_idx = within_plate // 12 col_idx = within_plate % 12 return plate_num, well_name(ROWS_96[row_idx], COLS_96[col_idx]) def build_global_wells_list(n_plates: int): out = [] for p in range(1, n_plates + 1): for w in enumerate_plate_wells(): out.append((p, w)) return out def pick_tool(volume_ul: float) -> str: return "TS_10" if volume_ul <= 10.0 else "TS_50" # Color palette (cycled if many inputs) PALETTE = [ "#4F46E5", "#22C55E", "#F59E0B", "#EF4444", "#06B6D4", "#A855F7", "#84CC16", "#F97316", "#0EA5E9", "#E11D48", "#10B981", "#7C3AED", "#15803D", "#EA580C", "#2563EB", "#DC2626" ] def render_plate_map_html(plates_used, well_to_input, max_wells_per_source, inputs_count): """Fancy HTML plate grids with tooltips.""" legend_spans = [] for i in range(1, inputs_count + 1): color = PALETTE[(i-1) % len(PALETTE)] legend_spans.append( f"" f"" f"Input {i}" ) legend_html = "
" + "".join(legend_spans) + "
" css = """ """ body = [css, legend_html] for p in range(1, plates_used + 1): body.append(f"
Plate {p}
") body.append("
") body.append("
") for c in COLS_96: body.append(f"
{c}
") for r in ROWS_96: body.append(f"
{r}
") for c in COLS_96: well = f"{r}{c}" key = (p, well) if key in well_to_input: input_idx, within_idx = well_to_input[key] color = PALETTE[(input_idx-1) % len(PALETTE)] tip = f"Input {input_idx} • P{p}:{well} • Block well {within_idx}/{max_wells_per_source}" cell_html = ( f"
" f"{tip}" "
" ) else: cell_html = "
" body.append(cell_html) body.append("
") return "".join(body) # ---------- Main flow ---------- if uploaded is not None: try: # --- Load file --- if uploaded.name.endswith(".xlsx"): df = pd.read_excel(uploaded) elif uploaded.name.endswith(".csv"): df = pd.read_csv(uploaded) else: # TXT (tab-delimited try, else CSV) try: df = pd.read_csv(uploaded, sep="\t") except Exception: df = pd.read_csv(uploaded) st.success(f"✅ Loaded file with {len(df)} rows and {len(df.columns)} columns") # --- Clean column names --- df.columns = [str(c).strip() for c in df.columns] # --- Ensure Sample column --- if not any(c.lower() == "sample" for c in df.columns): df.insert(0, "Sample", np.arange(1, len(df) + 1)) st.info("`Sample` column missing — automatically generated 1..N.") # --- Detect & numerically sort Position columns --- position_cols = [c for c in df.columns if re.match(r"(?i)^position\s*\d+", c)] if not position_cols: non_pos_cols = {"sample", "total edited", 'volume per "1"', "volume per 1"} candidate_cols = [c for c in df.columns if c.lower() not in non_pos_cols] position_cols = candidate_cols st.info(f"Position columns inferred automatically: {len(position_cols)} detected.") def pos_key(col_name: str): m = re.search(r"(\d+)", col_name) return int(m.group(1)) if m else 10**9 position_cols = sorted(position_cols, key=pos_key) # Normalize Position columns to numeric {0,1} df[position_cols] = df[position_cols].apply(pd.to_numeric, errors="coerce").fillna(0).astype(int) # --- Ensure Total edited --- if "Total edited" not in df.columns: df["Total edited"] = df[position_cols].sum(axis=1).astype(int) st.info("`Total edited` column missing — calculated automatically as sum of 1s per row.") # --- User setting for Volume per "1" calculation --- st.markdown("#### ⚙️ Volume Calculation Settings") default_total_vol = st.number_input( "Desired total volume per sample (µL)", min_value=1.0, max_value=10000.0, value=64.0, step=1.0, help="Used to compute Volume per '1' as (Desired total volume / Total edited) when not provided." ) vol_candidates = [c for c in df.columns if "volume per" in c.lower()] if not vol_candidates: df['Volume per "1"'] = default_total_vol / df["Total edited"].replace(0, np.nan) df['Volume per "1"'] = df['Volume per "1"'].fillna(0) # rows with 0 edits → 0 µL st.info(f'`Volume per "1"` column missing — calculated automatically as {default_total_vol:.0f} µL / Total edited.') volume_col = 'Volume per "1"' else: volume_col = vol_candidates[0] # Safety: per-transfer must not exceed per-well cap if df[volume_col].max() > max_per_well_ul: st.error( f"❌ At least one row has `Volume per \"1\"` greater than the per-well cap ({max_per_well_ul} µL). " "Increase the cap or reduce per-transfer volume." ) st.stop() # --- Compute total demand per input --- vol_per_one_series = pd.to_numeric(df[volume_col], errors="coerce").fillna(0.0) total_volume_per_input = [float(vol_per_one_series[df[pos] == 1].sum()) for pos in position_cols] wells_needed_per_input = [int(ceil(tv / max_per_well_ul)) if tv > 0 else 0 for tv in total_volume_per_input] num_inputs = len(position_cols) max_wells_per_source = max(wells_needed_per_input) if wells_needed_per_input else 0 st.markdown("### 👀 Preview: Suggested Uniform Layout") if max_wells_per_source == 0: st.info("No edits detected — nothing to allocate.") st.stop() st.write( f"💡 Suggested layout: **{max_wells_per_source} consecutive wells per input** " f"(cap {max_per_well_ul:.0f} µL/well)." ) # Total wells and plates needed total_wells_needed_uniform = num_inputs * max_wells_per_source plates_needed = int(ceil(total_wells_needed_uniform / 96)) or 1 # ✅ Correct, robust well ordering for layout global_wells = sorted( build_global_wells_list(plates_needed), key=lambda x: ( x[0], # plate ROWS_96.index(parse_well_name(x[1])[0]), # row index parse_well_name(x[1])[1] # column number ) ) global_wells = global_wells[:total_wells_needed_uniform] # Assign uniform blocks to each input assigned_wells_map, well_to_input, preview_rows = {}, {}, [] for i in range(1, num_inputs + 1): start, end = (i - 1) * max_wells_per_source, i * max_wells_per_source block = global_wells[start:end] assigned_wells_map[i] = block for j, (p, w) in enumerate(block, start=1): well_to_input[(p, w)] = (i, j) block_str = ", ".join([f"P{p}:{w}" for (p, w) in block]) preview_rows.append({ "Input (Position #)": i, "Total demand (µL)": round(total_volume_per_input[i-1], 2), "Wells needed (actual)": wells_needed_per_input[i-1], "Allocated (uniform)": max_wells_per_source, "Assigned wells": block_str }) preview_df = pd.DataFrame(preview_rows) st.dataframe(preview_df, use_container_width=True, height=300) st.markdown("#### Plate Map (hover cells for details)") plate_html = render_plate_map_html(plates_needed, well_to_input, max_wells_per_source, num_inputs) st.markdown(plate_html, unsafe_allow_html=True) # --- Generate Commands --- st.markdown("### ✅ Generate Pipetting Commands") if st.button("Generate using this layout"): # Track per-input per-well usage (µL) per_input_well_cum = {i: [0.0] * max_wells_per_source for i in range(1, num_inputs + 1)} commands, source_volume_totals = [], {} for _, row in df.iterrows(): sample_id = int(row["Sample"]) vol_per_one = float(row[volume_col]) if vol_per_one <= 0: continue dest_plate, dest_well = sample_index_to_plate_and_well(sample_id) tool = pick_tool(vol_per_one) for pos_idx, col in enumerate(position_cols, start=1): if int(row[col]) != 1: continue wells_for_input = assigned_wells_map[pos_idx] cum_list = per_input_well_cum[pos_idx] chosen = None for j, ((src_plate, src_well), current_vol) in enumerate(zip(wells_for_input, cum_list)): if current_vol + vol_per_one <= max_per_well_ul: chosen = (j, src_plate, src_well) break if chosen is None: st.error( f"Allocation exhausted for Input {pos_idx} while creating commands. " "Increase the max volume per well or review per-transfer volume." ) st.stop() j, src_plate, src_well = chosen cum_list[j] += vol_per_one per_input_well_cum[pos_idx] = cum_list source_volume_totals[(src_plate, src_well)] = source_volume_totals.get((src_plate, src_well), 0.0) + vol_per_one commands.append({ "Input #": pos_idx, "Source plate": src_plate, "Source well": src_well, "Destination plate": dest_plate, "Destination well": dest_well, "Volume": round(vol_per_one, 2), "Tool": tool }) commands_df = pd.DataFrame(commands) # ✅ Add helper sort columns to ensure Source/Destination wells sort A1→A12, B1→B12, ... def row_idx_from_well(w): return ROWS_96.index(parse_well_name(w)[0]) def col_num_from_well(w): return parse_well_name(w)[1] commands_df["Src_row_idx"] = commands_df["Source well"].apply(row_idx_from_well) commands_df["Src_col_num"] = commands_df["Source well"].apply(col_num_from_well) commands_df["Dst_row_idx"] = commands_df["Destination well"].apply(row_idx_from_well) commands_df["Dst_col_num"] = commands_df["Destination well"].apply(col_num_from_well) commands_df = commands_df.sort_values( by=["Input #", "Source plate", "Src_row_idx", "Src_col_num", "Destination plate", "Dst_row_idx", "Dst_col_num"], kind="stable" ) # Drop helper columns & order final columns commands_df = commands_df[[ "Input #", "Source plate", "Source well", "Destination plate", "Destination well", "Volume", "Tool" ]] st.success(f"✅ Generated {len(commands_df)} commands across {num_inputs} inputs.") # ✅ Source summary numeric sort by plate → row → col summary_rows = [] for i in range(1, num_inputs + 1): for (p, w), used in zip(assigned_wells_map[i], per_input_well_cum[i]): total = source_volume_totals.get((p, w), 0.0) summary_rows.append({ "Source": i, "Source plate": p, "Source well": w, "Total volume taken (µL)": round(total, 2), "Allocated capacity (µL)": round(max_per_well_ul, 2) }) summary_df = pd.DataFrame(summary_rows) summary_df["Src_row_idx"] = summary_df["Source well"].apply(row_idx_from_well) summary_df["Src_col_num"] = summary_df["Source well"].apply(col_num_from_well) summary_df = summary_df.sort_values( by=["Source", "Source plate", "Src_row_idx", "Src_col_num"], kind="stable" )[ ["Source", "Source plate", "Source well", "Total volume taken (µL)", "Allocated capacity (µL)"] ] # Display results st.markdown("### 💧 Pipetting Commands") st.dataframe(commands_df, use_container_width=True, height=400) st.download_button("⬇️ Download Commands CSV", commands_df.to_csv(index=False), "pipetting_commands.csv", mime="text/csv") st.markdown("### 📊 Source Volume Summary") st.dataframe(summary_df, use_container_width=True, height=400) st.download_button("⬇️ Download Source Summary CSV", summary_df.to_csv(index=False), "source_volume_summary.csv", mime="text/csv") except Exception as e: st.error(f"❌ Error processing file: {e}") else: st.info("👆 Upload an Excel/CSV/TXT file to start.")