bitconverter3 / app.py
wenjun99's picture
Update app.py
4be5ea2 verified
import streamlit as st
import pandas as pd
import io
import re
import numpy as np
import openpyxl
# =========================
# Streamlit App Setup
# =========================
st.set_page_config(page_title="ASCII ↔ Binary Converter", layout="wide")
st.title("ASCII ↔ Binary Converter")
# =========================
# Voyager ASCII 6-bit Table
# =========================
voyager_table = {
i: ch for i, ch in enumerate([
' ', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I',
'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S',
'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2',
'3', '4', '5', '6', '7', '8', '9', '.', ',', '(',
')','+', '-', '*', '/', '=', '$', '!', ':', '%',
'"', '#', '@', "'", '?', '&'
])
}
reverse_voyager_table = {v: k for k, v in voyager_table.items()}
# =========================
# Helper Functions
# =========================
def string_to_binary_labels(s: str) -> list[int]:
bits = []
for char in s:
val = reverse_voyager_table.get(char.upper(), 0)
char_bits = [(val >> bit) & 1 for bit in range(5, -1, -1)]
bits.extend(char_bits)
return bits
def binary_labels_to_string(bits: list[int]) -> str:
chars = []
for i in range(0, len(bits), 6):
chunk = bits[i:i+6]
if len(chunk) < 6:
chunk += [0] * (6 - len(chunk))
val = sum(b << (5 - j) for j, b in enumerate(chunk))
chars.append(voyager_table.get(val, '?'))
return ''.join(chars)
# =========================
# Tabs
# =========================
tab1, tab2, tab3 = st.tabs(["Text → Binary", "Binary → Text", "Robot Script"])
# --------------------------------------------------
# TAB 1: Text → Binary
# --------------------------------------------------
with tab1:
st.markdown("""
Convert any text into binary labels using the **Voyager 6-bit ASCII table**.
You can control how many positions (columns) are grouped per row.
""")
st.subheader("Step 1 – Input Text")
user_input = st.text_input("Enter your text:", value="DNA", key="input_text")
col1, col2 = st.columns([2, 1])
with col1:
group_size = st.slider("Select number of positions per row:", min_value=12, max_value=32, value=25)
with col2:
custom_cols = st.number_input("Or enter custom number:", min_value=1, max_value=128, value=group_size)
if custom_cols != group_size:
group_size = custom_cols
if user_input:
binary_labels = string_to_binary_labels(user_input)
binary_concat = ''.join(map(str, binary_labels))
# Step 2: Binary Labels per Character
st.markdown("### Step 2 – Binary Labels per Character")
st.caption("Scroll to view all characters")
# Scrollable block
grouped_bits = [binary_labels[i:i+6] for i in range(0, len(binary_labels), 6)]
scroll_html = "<div style='max-height: 300px; overflow-y: auto; font-family: monospace; padding: 6px; border: 1px solid #ccc;'>"
for i, bits in enumerate(grouped_bits):
ch = user_input[i] if i < len(user_input) else "?"
scroll_html += f"<div>'{ch}' → {bits}</div>"
scroll_html += "</div>"
st.markdown(scroll_html, unsafe_allow_html=True)
# Download full concatenated binary text
st.download_button(
"⬇️ Download Full Binary (.txt)",
data=binary_concat,
file_name="binary_full.txt",
mime="text/plain",
key="download_binary_txt"
)
# Step 3: Grouped Binary Matrix
st.markdown("### Step 3 – Grouped Binary Matrix")
groups = []
for i in range(0, len(binary_labels), group_size):
group = binary_labels[i:i+group_size]
if len(group) < group_size:
group += [0] * (group_size - len(group))
groups.append(group)
columns = [f"Position {i+1}" for i in range(group_size)]
df = pd.DataFrame(groups, columns=columns)
st.dataframe(df, use_container_width=True)
st.download_button(
"⬇️ Download as CSV",
df.to_csv(index=False),
file_name=f"binary_labels_{group_size}_positions.csv",
mime="text/csv",
key="download_binary_csv"
)
else:
st.info("👆 Enter text above to see binary labels.")
# --------------------------------------------------
# TAB 2: Binary → Text
# --------------------------------------------------
with tab2:
st.markdown("""
Convert binary data back into readable text.
Upload either:
- `.csv` file with 0/1 values (any number of columns/rows)
- `.xlsx` Excel file
- `.txt` file containing a concatenated binary string (e.g. `010101...`)
""")
uploaded = st.file_uploader("Upload your file (.csv, .xlsx, or .txt):", type=["csv", "xlsx", "txt"])
if uploaded is not None:
try:
if uploaded.name.endswith(".csv"):
df = pd.read_csv(uploaded)
bits = df.values.flatten().astype(int).tolist()
elif uploaded.name.endswith(".xlsx"):
df = pd.read_excel(uploaded)
bits = df.values.flatten().astype(int).tolist()
elif uploaded.name.endswith(".txt"):
content = uploaded.read().decode().strip()
bits = [int(b) for b in content if b in ['0', '1']]
else:
bits = []
if not bits:
st.warning("No binary data detected.")
else:
recovered_text = binary_labels_to_string(bits)
st.success("✅ Conversion complete!")
st.markdown("**Recovered text:**")
st.text_area("Output", recovered_text, height=150)
st.download_button(
"⬇️ Download Recovered Text (.txt)",
data=recovered_text,
file_name="recovered_text.txt",
mime="text/plain",
key="download_recovered"
)
except Exception as e:
st.error(f"Error reading or converting file: {e}")
else:
st.info("👆 Upload a file to start the reverse conversion.")
# --------------------------------------------------
# TAB 3: Pipetting Command Generator
# --------------------------------------------------
with tab3:
import numpy as np
import pandas as pd
import re
from math import ceil
st.header("🧪 Pipetting Command Generator")
st.markdown("""
Upload your sample file (Excel, CSV, or TXT) containing binary mutation data.
The app will:
- Auto-detect or create `Sample`, `Position#`, `Total edited`, and `Volume per "1"` columns
- Let you set the **Desired total volume per sample (µL)** used to compute `Volume per "1"`
- Calculate total demand per input and suggest a **uniform layout** (same # consecutive wells per input)
- **Preview** the layout on a plate map (with tooltips)
- After confirmation, generate pipetting commands and a source volume summary
""")
uploaded = st.file_uploader("📤 Upload data file", type=["xlsx", "csv", "txt"])
max_per_well_ul = st.number_input(
"Maximum volume per source well (µL)",
min_value=10.0, max_value=2000.0, value=160.0, step=10.0
)
# ---------- Helpers (plate geometry, parsing, viz) ----------
ROWS_96 = ["A", "B", "C", "D", "E", "F", "G", "H"]
COLS_96 = list(range(1, 13))
def well_name(row_letter, col_number):
return f"{row_letter}{col_number}"
def enumerate_plate_wells():
"""Yield wells A1..A12, B1..B12, ..., H12 for a single plate."""
for r in ROWS_96:
for c in COLS_96:
yield f"{r}{c}"
def parse_well_name(well: str):
"""Split 'A1'/'H12' → (row_letter, col_num). Robust to stray spaces."""
m = re.match(r"([A-Ha-h])\s*([0-9]+)", str(well).strip())
if not m:
return ("A", 0)
return (m.group(1).upper(), int(m.group(2)))
def sample_index_to_plate_and_well(sample_idx: int):
"""Destination mapping: 96-well plates in reading order, extends to multiple plates."""
plate_num = ((sample_idx - 1) // 96) + 1
within_plate = (sample_idx - 1) % 96
row_idx = within_plate // 12
col_idx = within_plate % 12
return plate_num, well_name(ROWS_96[row_idx], COLS_96[col_idx])
def build_global_wells_list(n_plates: int):
out = []
for p in range(1, n_plates + 1):
for w in enumerate_plate_wells():
out.append((p, w))
return out
def pick_tool(volume_ul: float) -> str:
return "TS_10" if volume_ul <= 10.0 else "TS_50"
# Color palette (cycled if many inputs)
PALETTE = [
"#4F46E5", "#22C55E", "#F59E0B", "#EF4444", "#06B6D4", "#A855F7", "#84CC16", "#F97316",
"#0EA5E9", "#E11D48", "#10B981", "#7C3AED", "#15803D", "#EA580C", "#2563EB", "#DC2626"
]
def render_plate_map_html(plates_used, well_to_input, max_wells_per_source, inputs_count):
"""Fancy HTML plate grids with tooltips."""
legend_spans = []
for i in range(1, inputs_count + 1):
color = PALETTE[(i-1) % len(PALETTE)]
legend_spans.append(
f"<span style='display:inline-block;margin-right:12px'>"
f"<span style='display:inline-block;width:12px;height:12px;background:{color};border:1px solid #333;margin-right:6px;vertical-align:middle'></span>"
f"Input {i}</span>"
)
legend_html = "<div style='margin:8px 0 16px 0'>" + "".join(legend_spans) + "</div>"
css = """
<style>
.plate { margin: 10px 0 24px 0; }
.plate-title { font-weight: 600; margin: 4px 0 8px 0; }
.grid { display: grid; grid-template-columns: 32px repeat(12, 38px); grid-auto-rows: 32px; gap: 4px; }
.cell { width: 38px; height: 32px; border: 1px solid #DDD; display:flex; align-items:center; justify-content:center; font-size:12px; background:#FAFAFA; position:relative; }
.head { font-weight:600; background:#F3F4F6; }
.cell[data-color] { color:#111; }
.cell .tip { visibility:hidden; opacity:0; transition:opacity 0.15s ease; position:absolute; bottom:100%; transform:translateY(-6px); left:50%; transform:translate(-50%, -6px); background:#111; color:#fff; padding:4px 6px; font-size:11px; border-radius:4px; white-space:nowrap; pointer-events:none; }
.cell:hover .tip { visibility:visible; opacity:0.95; }
</style>
"""
body = [css, legend_html]
for p in range(1, plates_used + 1):
body.append(f"<div class='plate'><div class='plate-title'>Plate {p}</div>")
body.append("<div class='grid'>")
body.append("<div class='cell head'></div>")
for c in COLS_96:
body.append(f"<div class='cell head'>{c}</div>")
for r in ROWS_96:
body.append(f"<div class='cell head'>{r}</div>")
for c in COLS_96:
well = f"{r}{c}"
key = (p, well)
if key in well_to_input:
input_idx, within_idx = well_to_input[key]
color = PALETTE[(input_idx-1) % len(PALETTE)]
tip = f"Input {input_idx} • P{p}:{well} • Block well {within_idx}/{max_wells_per_source}"
cell_html = (
f"<div class='cell' data-color style='background:{color};border-color:#555' title='{tip}'>"
f"<span class='tip'>{tip}</span>"
"</div>"
)
else:
cell_html = "<div class='cell'></div>"
body.append(cell_html)
body.append("</div></div>")
return "".join(body)
# ---------- Main flow ----------
if uploaded is not None:
try:
# --- Load file ---
if uploaded.name.endswith(".xlsx"):
df = pd.read_excel(uploaded)
elif uploaded.name.endswith(".csv"):
df = pd.read_csv(uploaded)
else: # TXT (tab-delimited try, else CSV)
try:
df = pd.read_csv(uploaded, sep="\t")
except Exception:
df = pd.read_csv(uploaded)
st.success(f"✅ Loaded file with {len(df)} rows and {len(df.columns)} columns")
# --- Clean column names ---
df.columns = [str(c).strip() for c in df.columns]
# --- Ensure Sample column ---
if not any(c.lower() == "sample" for c in df.columns):
df.insert(0, "Sample", np.arange(1, len(df) + 1))
st.info("`Sample` column missing — automatically generated 1..N.")
# --- Detect & numerically sort Position columns ---
position_cols = [c for c in df.columns if re.match(r"(?i)^position\s*\d+", c)]
if not position_cols:
non_pos_cols = {"sample", "total edited", 'volume per "1"', "volume per 1"}
candidate_cols = [c for c in df.columns if c.lower() not in non_pos_cols]
position_cols = candidate_cols
st.info(f"Position columns inferred automatically: {len(position_cols)} detected.")
def pos_key(col_name: str):
m = re.search(r"(\d+)", col_name)
return int(m.group(1)) if m else 10**9
position_cols = sorted(position_cols, key=pos_key)
# Normalize Position columns to numeric {0,1}
df[position_cols] = df[position_cols].apply(pd.to_numeric, errors="coerce").fillna(0).astype(int)
# --- Ensure Total edited ---
if "Total edited" not in df.columns:
df["Total edited"] = df[position_cols].sum(axis=1).astype(int)
st.info("`Total edited` column missing — calculated automatically as sum of 1s per row.")
# --- User setting for Volume per "1" calculation ---
st.markdown("#### ⚙️ Volume Calculation Settings")
default_total_vol = st.number_input(
"Desired total volume per sample (µL)",
min_value=1.0, max_value=10000.0, value=64.0, step=1.0,
help="Used to compute Volume per '1' as (Desired total volume / Total edited) when not provided."
)
vol_candidates = [c for c in df.columns if "volume per" in c.lower()]
if not vol_candidates:
df['Volume per "1"'] = default_total_vol / df["Total edited"].replace(0, np.nan)
df['Volume per "1"'] = df['Volume per "1"'].fillna(0) # rows with 0 edits → 0 µL
st.info(f'`Volume per "1"` column missing — calculated automatically as {default_total_vol:.0f} µL / Total edited.')
volume_col = 'Volume per "1"'
else:
volume_col = vol_candidates[0]
# Safety: per-transfer must not exceed per-well cap
if df[volume_col].max() > max_per_well_ul:
st.error(
f"❌ At least one row has `Volume per \"1\"` greater than the per-well cap ({max_per_well_ul} µL). "
"Increase the cap or reduce per-transfer volume."
)
st.stop()
# --- Compute total demand per input ---
vol_per_one_series = pd.to_numeric(df[volume_col], errors="coerce").fillna(0.0)
total_volume_per_input = [float(vol_per_one_series[df[pos] == 1].sum()) for pos in position_cols]
wells_needed_per_input = [int(ceil(tv / max_per_well_ul)) if tv > 0 else 0 for tv in total_volume_per_input]
num_inputs = len(position_cols)
max_wells_per_source = max(wells_needed_per_input) if wells_needed_per_input else 0
st.markdown("### 👀 Preview: Suggested Uniform Layout")
if max_wells_per_source == 0:
st.info("No edits detected — nothing to allocate.")
st.stop()
st.write(
f"💡 Suggested layout: **{max_wells_per_source} consecutive wells per input** "
f"(cap {max_per_well_ul:.0f} µL/well)."
)
# Total wells and plates needed
total_wells_needed_uniform = num_inputs * max_wells_per_source
plates_needed = int(ceil(total_wells_needed_uniform / 96)) or 1
# ✅ Correct, robust well ordering for layout
global_wells = sorted(
build_global_wells_list(plates_needed),
key=lambda x: (
x[0], # plate
ROWS_96.index(parse_well_name(x[1])[0]), # row index
parse_well_name(x[1])[1] # column number
)
)
global_wells = global_wells[:total_wells_needed_uniform]
# Assign uniform blocks to each input
assigned_wells_map, well_to_input, preview_rows = {}, {}, []
for i in range(1, num_inputs + 1):
start, end = (i - 1) * max_wells_per_source, i * max_wells_per_source
block = global_wells[start:end]
assigned_wells_map[i] = block
for j, (p, w) in enumerate(block, start=1):
well_to_input[(p, w)] = (i, j)
block_str = ", ".join([f"P{p}:{w}" for (p, w) in block])
preview_rows.append({
"Input (Position #)": i,
"Total demand (µL)": round(total_volume_per_input[i-1], 2),
"Wells needed (actual)": wells_needed_per_input[i-1],
"Allocated (uniform)": max_wells_per_source,
"Assigned wells": block_str
})
preview_df = pd.DataFrame(preview_rows)
st.dataframe(preview_df, use_container_width=True, height=300)
st.markdown("#### Plate Map (hover cells for details)")
plate_html = render_plate_map_html(plates_needed, well_to_input, max_wells_per_source, num_inputs)
st.markdown(plate_html, unsafe_allow_html=True)
# --- Generate Commands ---
st.markdown("### ✅ Generate Pipetting Commands")
if st.button("Generate using this layout"):
# Track per-input per-well usage (µL)
per_input_well_cum = {i: [0.0] * max_wells_per_source for i in range(1, num_inputs + 1)}
commands, source_volume_totals = [], {}
for _, row in df.iterrows():
sample_id = int(row["Sample"])
vol_per_one = float(row[volume_col])
if vol_per_one <= 0:
continue
dest_plate, dest_well = sample_index_to_plate_and_well(sample_id)
tool = pick_tool(vol_per_one)
for pos_idx, col in enumerate(position_cols, start=1):
if int(row[col]) != 1:
continue
wells_for_input = assigned_wells_map[pos_idx]
cum_list = per_input_well_cum[pos_idx]
chosen = None
for j, ((src_plate, src_well), current_vol) in enumerate(zip(wells_for_input, cum_list)):
if current_vol + vol_per_one <= max_per_well_ul:
chosen = (j, src_plate, src_well)
break
if chosen is None:
st.error(
f"Allocation exhausted for Input {pos_idx} while creating commands. "
"Increase the max volume per well or review per-transfer volume."
)
st.stop()
j, src_plate, src_well = chosen
cum_list[j] += vol_per_one
per_input_well_cum[pos_idx] = cum_list
source_volume_totals[(src_plate, src_well)] = source_volume_totals.get((src_plate, src_well), 0.0) + vol_per_one
commands.append({
"Input #": pos_idx,
"Source plate": src_plate,
"Source well": src_well,
"Destination plate": dest_plate,
"Destination well": dest_well,
"Volume": round(vol_per_one, 2),
"Tool": tool
})
commands_df = pd.DataFrame(commands)
# ✅ Add helper sort columns to ensure Source/Destination wells sort A1→A12, B1→B12, ...
def row_idx_from_well(w): return ROWS_96.index(parse_well_name(w)[0])
def col_num_from_well(w): return parse_well_name(w)[1]
commands_df["Src_row_idx"] = commands_df["Source well"].apply(row_idx_from_well)
commands_df["Src_col_num"] = commands_df["Source well"].apply(col_num_from_well)
commands_df["Dst_row_idx"] = commands_df["Destination well"].apply(row_idx_from_well)
commands_df["Dst_col_num"] = commands_df["Destination well"].apply(col_num_from_well)
commands_df = commands_df.sort_values(
by=["Input #", "Source plate", "Src_row_idx", "Src_col_num",
"Destination plate", "Dst_row_idx", "Dst_col_num"],
kind="stable"
)
# Drop helper columns & order final columns
commands_df = commands_df[[
"Input #", "Source plate", "Source well",
"Destination plate", "Destination well", "Volume", "Tool"
]]
st.success(f"✅ Generated {len(commands_df)} commands across {num_inputs} inputs.")
# ✅ Source summary numeric sort by plate → row → col
summary_rows = []
for i in range(1, num_inputs + 1):
for (p, w), used in zip(assigned_wells_map[i], per_input_well_cum[i]):
total = source_volume_totals.get((p, w), 0.0)
summary_rows.append({
"Source": i, "Source plate": p, "Source well": w,
"Total volume taken (µL)": round(total, 2),
"Allocated capacity (µL)": round(max_per_well_ul, 2)
})
summary_df = pd.DataFrame(summary_rows)
summary_df["Src_row_idx"] = summary_df["Source well"].apply(row_idx_from_well)
summary_df["Src_col_num"] = summary_df["Source well"].apply(col_num_from_well)
summary_df = summary_df.sort_values(
by=["Source", "Source plate", "Src_row_idx", "Src_col_num"],
kind="stable"
)[
["Source", "Source plate", "Source well", "Total volume taken (µL)", "Allocated capacity (µL)"]
]
# Display results
st.markdown("### 💧 Pipetting Commands")
st.dataframe(commands_df, use_container_width=True, height=400)
st.download_button("⬇️ Download Commands CSV", commands_df.to_csv(index=False), "pipetting_commands.csv", mime="text/csv")
st.markdown("### 📊 Source Volume Summary")
st.dataframe(summary_df, use_container_width=True, height=400)
st.download_button("⬇️ Download Source Summary CSV", summary_df.to_csv(index=False), "source_volume_summary.csv", mime="text/csv")
except Exception as e:
st.error(f"❌ Error processing file: {e}")
else:
st.info("👆 Upload an Excel/CSV/TXT file to start.")