# Robot Script Generator — Streamlit app (Hugging Face Space)
| import streamlit as st | |
| import pandas as pd | |
| import math | |
# === App Title ===
# Page-level Streamlit configuration; wide layout suits the generated script table.
st.set_page_config(page_title="Robot Script Generator", layout="wide")
st.title("🧪 Robot Script Generator")
# === Voyager ASCII 6-bit conversion table ===
# Characters in Voyager record order: space, A–Z, 0–9, then punctuation.
# Each character's index in this string is its 6-bit code (0–55).
_VOYAGER_ALPHABET = " ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789.,()+-*/=$!:%\"#@'?&"
voyager_table = dict(enumerate(_VOYAGER_ALPHABET))
reverse_voyager_table = {char: code for code, char in voyager_table.items()}
# === Binary → String conversion ===
def binary_labels_to_string(bits: list[int]) -> str:
    """Decode a flat 0/1 list into text, one Voyager-table character per 6 bits.

    A trailing group shorter than 6 bits is zero-padded on the right; codes
    absent from the table decode as '?'.
    """
    decoded = []
    for start in range(0, len(bits), 6):
        group = list(bits[start:start + 6])
        group.extend([0] * (6 - len(group)))  # zero-pad the final partial group
        code = 0
        for bit in group:  # fold MSB-first into a 6-bit value
            code = (code << 1) | bit
        decoded.append(voyager_table.get(code, '?'))
    return ''.join(decoded)
# === Well mapping ===
def get_well_position(sample_index):
    """Map a 1-based sample index onto its A1–H12 well within its 96-well plate."""
    offset = (sample_index - 1) % 96  # position within the sample's own plate
    row = "ABCDEFGH"[offset // 12]    # 8 rows of 12 columns
    col = offset % 12 + 1             # columns are 1-based
    return f"{row}{col}"
def get_plate_id(sample_index):
    """Return the plate label for a 1-based sample index (96 samples per plate)."""
    # Integer ceiling of sample_index / 96.
    return f"Plate {(sample_index + 95) // 96}"
# === Track and replace source if volume exceeded ===
def track_and_replace_source(source_list, robot_script, volume_limit=150):
    """Track cumulative draw per source well, rerouting overflowing transfers.

    Whenever an instruction would push its source well past volume_limit, that
    single instruction is redirected to the well four rows down in the same
    column (e.g. A1 -> E1) and its volume is moved onto the replacement well.

    Returns (instruction list, dict of per-well totals). NOTE(review):
    source_list is currently unused — kept for interface compatibility; the
    replacement well itself is not re-checked against the limit.
    """
    totals = {}
    result = []
    for instruction in robot_script:
        well = instruction['Source']
        amount = instruction['Volume']
        totals[well] = totals.get(well, 0) + amount
        if totals[well] > volume_limit:
            # Overflow: same column, four rows down; shift this draw's volume over.
            replacement = f"{chr(ord(well[0]) + 4)}{well[1:]}"
            instruction['Source'] = replacement
            totals[replacement] = totals.get(replacement, 0) + amount
            totals[well] -= amount
        result.append(instruction)
    return result, totals
# === Fixed D-source transfers ===
def generate_fixed_d_source_instructions_to_all_samples(n_samples, fixed_volume=16, volume_limit=170):
    """Emit one fixed-volume transfer from the D-row wells to every sample.

    Wells D1, D2, ... are consumed in order; a fresh well is opened as soon as
    the next draw would push the current one past volume_limit.
    Returns (instruction list, dict of per-D-well totals).
    """
    volumes = {}
    instructions = []
    d_index = 1
    for sample in range(1, n_samples + 1):
        well = f"D{d_index}"
        volumes.setdefault(well, 0)
        if volumes[well] + fixed_volume > volume_limit:
            # Current D well exhausted — move on to the next one.
            d_index += 1
            well = f"D{d_index}"
            volumes[well] = 0
        volumes[well] += fixed_volume
        instructions.append({
            'Plate': get_plate_id(sample),
            'Source': well,
            'Destination': get_well_position(sample),
            'Volume': fixed_volume,
            # Pipette tool chosen by volume threshold.
            'Tool': 'TS_50' if fixed_volume > 10 else 'TS_10',
        })
    return instructions, volumes
def generate_source_wells(n):
    """Return the first n source wells, filling 12 columns per row (A1..A12, B1..)."""
    letters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    return [f"{letters[idx // 12]}{idx % 12 + 1}" for idx in range(n)]
# === Main UI ===
st.header("Upload Binary Data (0/1)")
# Uploaded CSV is parsed below; when absent, a manual data editor is offered instead.
binary_file = st.file_uploader("Upload Binary CSV", type=["csv"])
st.divider()
st.subheader("Optional Metadata")
# Free-text metadata copied verbatim into every row of the generated script.
barcode_id_input = st.text_input("Barcode ID (optional)", value="")
labware_source_input = st.text_input("Labware Source ID", value="1")
labware_dest_input = st.text_input("Labware Destination ID", value="1")
name_input = st.text_input("Name field (optional)", value="")
# Per-source-well cap consumed by the overflow/replacement logic further down.
volume_limit_input = st.number_input("Maximum Volume per Source Well (µL)", value=150, min_value=10, step=10)
# === Load Data ===
if binary_file:
    # CSV is headerless; name the columns "1".."N".
    df_binary = pd.read_csv(binary_file, header=None)
    df_binary.columns = [str(i+1) for i in range(df_binary.shape[1])]
else:
    st.info("No file uploaded — manually enter or paste your binary data below.")
    st.caption("💡 Tip: You can copy and paste entire datasets (Ctrl+V) directly here — rows and columns will adjust automatically.")
    # Ask the user how many columns (if they want to define manually)
    n_cols = st.number_input(
        "Number of columns (adjust if pasting a dataset)",
        min_value=1, value=8, step=1
    )
    # Create an empty DataFrame with that many columns
    initial_df = pd.DataFrame(columns=[str(i) for i in range(1, n_cols + 1)])
    # Dynamic editor — allows copy-paste of arbitrary rows
    df_binary = st.data_editor(
        initial_df,
        num_rows="dynamic",
        key="manual_input",
        use_container_width=True,
    )
    # If user pastes a larger dataset (more columns than n_cols)
    # detect and rename automatically
    if not df_binary.empty:
        n_detected = df_binary.shape[1]
        df_binary.columns = [str(i+1) for i in range(n_detected)]
if not df_binary.empty:
    st.subheader("Binary Matrix")
    # Green = donor present (1), red = absent (0).
    st.dataframe(df_binary.style.applymap(lambda v: "background-color: lightgreen" if v == 1 else "background-color: lightcoral"))
    st.download_button("⬇️ Download Binary CSV", df_binary.to_csv(index=False), "binary_matrix.csv")

    # Decode the binary matrix (row-major) into text via the Voyager table.
    decoded = binary_labels_to_string(df_binary.values.flatten().astype(int).tolist())
    st.subheader("Decoded String Output")
    st.code(decoded)
    st.download_button("⬇️ Download Decoded String", decoded, "decoded_string.txt")

    # === Generate Robot Script ===
    st.divider()
    st.subheader("Generated Robot Script")
    df_robot = df_binary.copy()
    df_robot.insert(0, 'Sample', range(1, len(df_robot) + 1))
    df_robot['# donors'] = df_robot.iloc[:, 1:].astype(int).sum(axis=1)
    # A fixed 64 µL total donor volume is split evenly across a sample's donors.
    df_robot['volume donors (µL)'] = df_robot['# donors'].apply(
        lambda x: 64 / x if x > 0 else 0
    )
    robot_script = []
    # BUG FIX: iterating df_robot.columns[1:] also swept the computed
    # '# donors' and 'volume donors (µL)' columns, so any sample with exactly
    # one donor (cell value 1) gained a spurious transfer from a phantom well.
    # Only the original binary columns are donor columns: exclude 'Sample'
    # (first) and the two computed columns (last two).
    donor_columns = df_robot.columns[1:-2]
    source_wells = generate_source_wells(len(donor_columns))
    for i, col in enumerate(donor_columns):
        for row_idx, sample in df_robot.iterrows():
            if sample['# donors'] == 0:
                continue  # skip samples with no donors
            if int(sample[col]) == 1:
                sample_index = row_idx + 1  # 1-based sample position
                vol = round(sample['volume donors (µL)'], 2)
                robot_script.append({
                    'Plate': get_plate_id(sample_index),
                    'Source': source_wells[i],
                    'Destination': get_well_position(sample_index),
                    'Volume': vol,
                    # Pipette tool chosen by volume threshold.
                    'Tool': 'TS_50' if vol > 10 else 'TS_10',
                })
    # Reroute any source well whose cumulative draw exceeds the per-well limit.
    robot_script, source_volumes = track_and_replace_source(source_wells, robot_script, volume_limit=volume_limit_input)
    # Fixed 16 µL addition from the D wells to every sample.
    d_script, d_volumes = generate_fixed_d_source_instructions_to_all_samples(
        len(df_robot), fixed_volume=16, volume_limit=volume_limit_input
    )
    full_script = robot_script + d_script
    robot_script_df = pd.DataFrame(full_script)
    # Metadata columns are constant across all rows.
    robot_script_df.insert(0, 'Barcode ID', barcode_id_input)
    robot_script_df.insert(1, 'Labware_Source', labware_source_input)
    robot_script_df.insert(3, 'Labware_Destination', labware_dest_input)
    robot_script_df['Name'] = name_input
    # Fix the final column order expected by the robot.
    robot_script_df = robot_script_df[['Barcode ID', 'Labware_Source', 'Plate',
                                       'Source', 'Labware_Destination', 'Destination',
                                       'Volume', 'Tool', 'Name']]
    st.dataframe(robot_script_df)
    st.download_button("⬇️ Download Robot Script", robot_script_df.to_csv(index=False), "robot_script.csv")

    # === Source Volume Summary ===
    st.divider()
    st.subheader("Total Volume Used Per Source")
    # Donor-well totals plus the fixed D-well totals in one table.
    combined_volumes = {**source_volumes, **d_volumes}
    volume_df = pd.DataFrame(list(combined_volumes.items()), columns=['Source', 'Total Volume (µL)'])
    st.dataframe(volume_df)
    st.download_button("⬇️ Download Volume Summary", volume_df.to_csv(index=False), "source_volumes.csv")