Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import numpy as np | |
| import pandas as pd | |
| # Mutation site headers removed 3614, | |
| mutation_site_headers_actual = [ | |
| 3244, 3297, 3350, 3399, 3455, 3509, 3562, | |
| 3665, 3720, 3773, 3824, 3879, 3933, 3985, 4039, | |
| 4089, 4145, 4190, 4245, 4298, 4349, 4402, 4455, | |
| 4510, 4561, 4615, 4668, 4720, 4773, 4828, 4882 | |
| ] | |
| # Thresholds for each mutation site removed 3614: 0.091557752, | |
| thresholds_actual = pd.Series({ | |
| 3244: 1.096910677, 3297: 0.923658795, 3350: 0.668939037, 3399: 0.914305214, | |
| 3455: 1.297392984, 3509: 1.812636208, 3562: 1.185047484, | |
| 3665: 0.298007308, 3720: 0.58857544, 3773: 0.882561082, 3824: 1.149082617, | |
| 3879: 0.816050702, 3933: 2.936517653, 3985: 1.597166791, 4039: 0.962108082, | |
| 4089: 1.479783497, 4145: 0.305853225, 4190: 1.311869541, 4245: 1.707556905, | |
| 4298: 0.875013076, 4349: 1.227704526, 4402: 0.593206446, 4455: 1.179633137, | |
| 4510: 1.272477799, 4561: 1.293841573, 4615: 1.16821885, 4668: 1.40306, | |
| 4720: 0.706530878, 4773: 1.483114072, 4828: 0.954939873, 4882: 1.47524328 | |
| }) | |
| # Mutation site headers reordered: 4402 to 3244, 4882 to 4455 | |
| mutation_site_headers = [ | |
| 4402, 4349, 4298, 4245, 4190, 4145, 4089, 4039, | |
| 3985, 3933, 3879, 3824, 3773, 3720, 3665, | |
| 3562, 3509, 3455, 3399, 3350, 3297, 3244, # 1–23 | |
| 4882, 4828, 4773, 4720, 4668, 4615, 4561, 4510, 4455 # 24–32 | |
| ] | |
| # Thresholds reordered accordingly | |
| thresholds = pd.Series({h: thresholds_actual[h] for h in mutation_site_headers}) | |
| # === Utility functions === | |
| # Voyager ASCII 6-bit conversion table | |
| voyager_table = { | |
| i: ch for i, ch in enumerate([ | |
| ' ', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', | |
| 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', | |
| 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2', | |
| '3', '4', '5', '6', '7', '8', '9', '.', ',', '(', | |
| ')','+', '-', '*', '/', '=', '$', '!', ':', '%', | |
| '"', '#', '@', "'", '?', '&' | |
| ]) | |
| } | |
| reverse_voyager_table = {v: k for k, v in voyager_table.items()} | |
| def string_to_binary_labels(s: str) -> list[int]: | |
| bits = [] | |
| for char in s: | |
| val = reverse_voyager_table.get(char.upper(), 0) | |
| char_bits = [(val >> bit) & 1 for bit in range(5, -1, -1)] | |
| bits.extend(char_bits) | |
| return bits | |
| def binary_labels_to_string(bits: list[int]) -> str: | |
| chars = [] | |
| for i in range(0, len(bits), 6): | |
| chunk = bits[i:i+6] | |
| if len(chunk) < 6: | |
| chunk += [0] * (6 - len(chunk)) | |
| val = sum(b << (5 - j) for j, b in enumerate(chunk)) | |
| chars.append(voyager_table.get(val, '?')) | |
| return ''.join(chars) | |
| # === Streamlit App === | |
| st.title("ASCII & Binary Label Converter") | |
| tab1, tab2, tab3, tab4, tab5, tab6 = st.tabs(["Text to Binary Labels (31)", "EF → Binary → String (31)", "Text to Binary Labels (32)", "EF → Binary (32)", "Binary → String", "Robot Script Generator"]) | |
| # Tab 1: Text to Binary | |
| with tab1: | |
| user_input = st.text_input("Enter text", value="DNA", key="input_text_31") | |
| if user_input: | |
| ascii_codes = [reverse_voyager_table.get(c.upper(), 0) for c in user_input] | |
| binary_labels = string_to_binary_labels(user_input) | |
| # st.subheader("Voyager ASCII Codes") | |
| # st.write(ascii_codes) | |
| st.subheader("Binary Labels per Character") | |
| grouped = [binary_labels[i:i+6] for i in range(0, len(binary_labels), 6)] | |
| for i, bits in enumerate(grouped): | |
| st.write(f"'{user_input[i]}' → {bits}") | |
| st.subheader("Binary Labels (31-bit groups)") | |
| groups = [] | |
| for i in range(0, len(binary_labels), 31): | |
| group = binary_labels[i:i+31] | |
| group += [0] * (31 - len(group)) | |
| groups.append(group + [sum(group)]) | |
| df = pd.DataFrame(groups, columns=[str(h) for h in mutation_site_headers] + ["Edited Sites"]) | |
| st.dataframe(df) | |
| st.download_button("Download as CSV", df.to_csv(index=False), "text_31_binary_labels.csv", key="download_csv_tab1_31csv") | |
| ascending_headers = sorted(mutation_site_headers_actual) | |
| df_sorted = df[[str(h) for h in ascending_headers if str(h) in df.columns]].copy() | |
| if "3614" not in df_sorted.columns: | |
| idx = df_sorted.columns.get_loc("3562") + 1 # Insert after 3562 | |
| df_sorted.insert(idx, "3614", 0) | |
| st.subheader("Binary Labels (Ascending Order 3244 → 4882)") | |
| st.dataframe(df_sorted) | |
| st.download_button("Download Ascending Order CSV", df_sorted.to_csv(index=False), "text_binary_labels_ascending.csv", key="download_csv_tab1_ascend") | |
| # === Robot Preparation Script Generation === | |
| st.subheader("Robot Preparation Script") | |
| robot_template = pd.read_csv("/home/user/app/Robot.csv", skiprows=3) | |
| robot_template.columns = ['Labware', 'Source', 'Labware_2', 'Destination', 'Volume', 'Tool', 'Name'] | |
| # Add Sample numbers for well referencing | |
| df_sorted.insert(0, 'Sample', range(1, len(df_sorted)+1)) | |
| # Step 1: Count the number of edited sites per row | |
| df_sorted['# donors'] = df_sorted.iloc[:, 1:].sum(axis=1) | |
| # Step 2: Calculate volume per donor (32 / # donors) | |
| df_sorted['volume donors (µl)'] = 32 / df_sorted['# donors'] | |
| # Step 3: Generate the robot script | |
| robot_script = [] | |
| source_wells = robot_template['Source'].unique().tolist() | |
| if len(source_wells) < 32: | |
| source_wells += [f"Fake{i}" for i in range(32 - len(source_wells))] | |
| source_wells = source_wells[:32] | |
| st.write(f"Number of source wells: {len(source_wells)}") | |
| st.write(f"Number of binary columns: {len(df_sorted.columns[1:33])}") | |
| for i, col in enumerate(df_sorted.columns[1:33]): | |
| for row_idx, sample in df_sorted.iterrows(): | |
| if sample[col] == 1: | |
| source = source_wells[i] | |
| dest = f"A{sample['Sample']}" | |
| vol = round(sample['volume donors (µl)'], 2) | |
| robot_script.append({'Source': source, 'Destination': dest, 'Volume': vol}) | |
| robot_script_df = pd.DataFrame(robot_script) | |
| st.dataframe(robot_script_df) | |
| st.download_button("Download Robot Script CSV", robot_script_df.to_csv(index=False), "robot_script.csv", key="download_csv_tab1_robot") | |
| # === Robot Preparation Script (Custom Order: 4402 → 3244, 4882 → 4455) === | |
| st.subheader("Robot Preparation Script (Custom Order: 4402 → 3244, 4882 → 4455)") | |
| # Include 3614 in custom header list | |
| custom_headers = [ | |
| 4402, 4349, 4298, 4245, 4190, 4145, 4089, 4039, | |
| 3985, 3933, 3879, 3824, 3773, 3720, 3665, 3614, | |
| 3562, 3509, 3455, 3399, 3350, 3297, 3244, | |
| 4882, 4828, 4773, 4720, 4668, 4615, 4561, 4510, 4455 | |
| ] | |
| # Create a copy of df and reorder columns based on custom headers | |
| df_sorted_custom = df[[str(h) for h in custom_headers if str(h) in df.columns]].copy() | |
| # Insert fake column "3614" if missing | |
| if "3614" not in df_sorted_custom.columns: | |
| idx = custom_headers.index(3614) | |
| insert_at = idx # 0-based index | |
| df_sorted_custom.insert(insert_at, "3614", 0) | |
| # Insert 'Sample' if missing | |
| if "Sample" not in df_sorted_custom.columns: | |
| df_sorted_custom.insert(0, 'Sample', range(1, len(df_sorted_custom) + 1)) | |
| # Calculate donor info | |
| df_sorted_custom['# donors'] = df_sorted_custom.iloc[:, 1:].sum(axis=1) | |
| df_sorted_custom['volume donors (µl)'] = 32 / df_sorted_custom['# donors'] | |
| # Generate robot script | |
| robot_script_custom = [] | |
| for i, col in enumerate(df_sorted_custom.columns[1:33]): # 32 columns after Sample | |
| for row_idx, sample in df_sorted_custom.iterrows(): | |
| if sample[col] == 1: | |
| source = source_wells[i] | |
| dest = f"A{sample['Sample']}" | |
| vol = round(sample['volume donors (µl)'], 2) | |
| robot_script_custom.append({'Source': source, 'Destination': dest, 'Volume': vol}) | |
| robot_script_custom_df = pd.DataFrame(robot_script_custom) | |
| st.dataframe(robot_script_custom_df) | |
| st.download_button("Download Custom Order Robot Script CSV", robot_script_custom_df.to_csv(index=False), "robot_script_custom_order.csv", key="download_csv_tab1_robot_custom") | |
| # Tab 2: EF → Binary | |
| with tab2: | |
| st.write("Upload an Editing Frequency CSV or enter manually:") | |
| st.write("**Note:** Please upload CSV files **without column headers**, in ascending order from 3244 to 4882.") | |
| ef_file = st.file_uploader("Upload EF CSV", type=["csv"], key="ef") | |
| if ef_file: | |
| ef_df = pd.read_csv(ef_file, header=None) | |
| ef_df.columns = [str(site) for site in sorted(mutation_site_headers_actual)] | |
| else: | |
| ef_df = pd.DataFrame(columns=[str(site) for site in sorted(mutation_site_headers_actual)]) | |
| edited_df = st.data_editor(ef_df, num_rows="dynamic") | |
| if st.button("Convert to Binary Labels", key="convert_button_tab2"): | |
| binary_part = pd.DataFrame() | |
| for col in sorted(mutation_site_headers_actual): | |
| col_str = str(col) | |
| threshold = thresholds_actual[col] | |
| binary_part[col_str] = (edited_df[col_str].astype(float) >= threshold).astype(int) | |
| binary_reordered = binary_part[[str(h) for h in mutation_site_headers if str(h) in binary_part.columns]] | |
| def color_binary(val): | |
| if val == 1: return "background-color: lightgreen" | |
| if val == 0: return "background-color: lightcoral" | |
| return "" | |
| st.subheader("Binary Labels (Reordered 4402→3244, 4882→4455)") | |
| styled = binary_reordered.style.applymap(color_binary) | |
| st.dataframe(styled) | |
| st.download_button("Download CSV", binary_reordered.to_csv(index=False), "ef_binary_labels.csv", key="download_csv_tab2_csv") | |
| all_bits = binary_reordered.values.flatten().tolist() | |
| decoded_string = binary_labels_to_string(all_bits) | |
| st.subheader("Decoded String (continuous across rows)") | |
| st.write(decoded_string) | |
| st.subheader("Binary Labels (Ascending 3244→4882)") | |
| st.dataframe(binary_part.style.applymap(color_binary)) | |
| st.download_button("Download Ascending Order CSV", binary_part.to_csv(index=False), "ef_binary_labels_ascending.csv", key="download_csv_tab2_ascend") | |
| all_bits = binary_part.values.flatten().tolist() | |
| decoded_string = binary_labels_to_string(all_bits) | |
| st.subheader("Decoded String (continuous across rows)") | |
| st.write(decoded_string) | |
| # Mutation site headers did not remove 3614, | |
| mutation_site_headers_actual_3614 = [ | |
| 3244, 3297, 3350, 3399, 3455, 3509, 3562, 3614, | |
| 3665, 3720, 3773, 3824, 3879, 3933, 3985, 4039, | |
| 4089, 4145, 4190, 4245, 4298, 4349, 4402, 4455, | |
| 4510, 4561, 4615, 4668, 4720, 4773, 4828, 4882 | |
| ] | |
| # Thresholds for each mutation site removed 3614: 0.091557752, | |
| thresholds_actual_3614 = pd.Series({ | |
| 3244: 1.096910677, 3297: 0.923658795, 3350: 0.668939037, 3399: 0.914305214, | |
| 3455: 1.297392984, 3509: 1.812636208, 3562: 1.185047484, 3614: 0.157969131375, | |
| 3665: 0.298007308, 3720: 0.58857544, 3773: 0.882561082, 3824: 1.149082617, | |
| 3879: 0.816050702, 3933: 2.936517653, 3985: 1.597166791, 4039: 0.962108082, | |
| 4089: 1.479783497, 4145: 0.305853225, 4190: 1.311869541, 4245: 1.707556905, | |
| 4298: 0.875013076, 4349: 1.227704526, 4402: 0.593206446, 4455: 1.179633137, | |
| 4510: 1.272477799, 4561: 1.293841573, 4615: 1.16821885, 4668: 1.40306, | |
| 4720: 0.706530878, 4773: 1.483114072, 4828: 0.954939873, 4882: 1.47524328 | |
| }) | |
| # Mutation site headers reordered: 4402 to 3244, 4882 to 4455 | |
| mutation_site_headers_3614 = [ | |
| 4402, 4349, 4298, 4245, 4190, 4145, 4089, 4039, | |
| 3985, 3933, 3879, 3824, 3773, 3720, 3665, 3614, | |
| 3562, 3509, 3455, 3399, 3350, 3297, 3244, # 1–23 | |
| 4882, 4828, 4773, 4720, 4668, 4615, 4561, 4510, 4455 # 24–32 | |
| ] | |
| # Thresholds reordered accordingly | |
| thresholds_3614 = pd.Series({h: thresholds_actual_3614[h] for h in mutation_site_headers_3614}) | |
| # === Utility functions === | |
| reverse_voyager_table = {v: k for k, v in voyager_table.items()} | |
| # Tab 3: Text to Binary (32) | |
| with tab3: | |
| user_input_32 = st.text_input("Enter text", value="DNA", key="input_text_32") | |
| if user_input_32: | |
| ascii_codes = [ord(c) for c in user_input_32] | |
| binary_labels = string_to_binary_labels(user_input_32) | |
| st.subheader("ASCII Codes") | |
| st.write(ascii_codes) | |
| st.subheader("Binary Labels per Character") | |
| grouped = [binary_labels[i:i+6] for i in range(0, len(binary_labels), 6)] | |
| for i, bits in enumerate(grouped): | |
| st.write(f"'{user_input_32[i]}' → {bits}") | |
| st.subheader("Binary Labels (32-bit groups)") | |
| groups = [] | |
| for i in range(0, len(binary_labels), 32): | |
| group = binary_labels[i:i+32] | |
| group += [0] * (32 - len(group)) | |
| groups.append(group + [sum(group)]) | |
| df = pd.DataFrame(groups, columns=[str(h) for h in mutation_site_headers_3614] + ["Edited Sites"]) | |
| st.dataframe(df) | |
| st.download_button("Download as CSV", df.to_csv(index=False), "text_32_binary_labels.csv", key="download_csv_tab3_csv") | |
| ascending_headers = sorted(mutation_site_headers_actual_3614) | |
| df_sorted = df[[str(h) for h in ascending_headers if str(h) in df.columns]] | |
| st.subheader("Binary Labels (Ascending Order 3244 → 4882)") | |
| st.dataframe(df_sorted) | |
| st.download_button("Download Ascending Order CSV", df_sorted.to_csv(index=False), "text_binary_labels_ascending.csv", key="download_csv_tab3_ascend") | |
| # === Robot Preparation Script Generation === | |
| st.subheader("Robot Preparation Script") | |
| robot_template = pd.read_csv("/home/user/app/Robot.csv", skiprows=3) | |
| robot_template.columns = ['Labware', 'Source', 'Labware_2', 'Destination', 'Volume', 'Tool', 'Name'] | |
| # Add Sample numbers for well referencing | |
| df_sorted.insert(0, 'Sample', range(1, len(df_sorted)+1)) | |
| # Step 1: Count the number of edited sites per row | |
| df_sorted['# donors'] = df_sorted.iloc[:, 1:].sum(axis=1) | |
| # Step 2: Calculate volume per donor (32 / # donors) | |
| df_sorted['volume donors (µl)'] = 32 / df_sorted['# donors'] | |
| # Step 3: Generate the robot script | |
| robot_script = [] | |
| source_wells = robot_template['Source'].unique().tolist()[:32] | |
| for i, col in enumerate(df_sorted.columns[1:33]): | |
| for row_idx, sample in df_sorted.iterrows(): | |
| if sample[col] == 1: | |
| source = source_wells[i] | |
| dest = f"A{sample['Sample']}" | |
| vol = round(sample['volume donors (µl)'], 2) | |
| robot_script.append({'Source': source, 'Destination': dest, 'Volume': vol}) | |
| robot_script_df = pd.DataFrame(robot_script) | |
| st.dataframe(robot_script_df) | |
| st.download_button("Download Robot Script CSV", robot_script_df.to_csv(index=False), "robot_script.csv", key="download_csv_tab3_robot") | |
| # Tab 4: EF → Binary (32) | |
| with tab4: | |
| st.write("Upload an Editing Frequency CSV or enter manually:") | |
| st.write("**Note:** Please upload CSV files **without column headers**, in ascending order from 3244 to 4882.") | |
| ef_file_2 = st.file_uploader("Upload EF CSV", type=["csv"], key="ef2") | |
| if ef_file_2: | |
| ef_df = pd.read_csv(ef_file_2, header=None) | |
| ef_df.columns = [str(site) for site in sorted(mutation_site_headers_actual_3614)] | |
| else: | |
| ef_df = pd.DataFrame(columns=[str(site) for site in sorted(mutation_site_headers_actual_3614)]) | |
| edited_df = st.data_editor(ef_df, num_rows="dynamic") | |
| if st.button("Convert to Binary Labels", key="convert_button_tab4"): | |
| binary_part = pd.DataFrame() | |
| for col in sorted(mutation_site_headers_actual_3614): | |
| col_str = str(col) | |
| threshold = thresholds_actual_3614[col] | |
| binary_part[col_str] = (edited_df[col_str].astype(float) >= threshold).astype(int) | |
| binary_reordered = binary_part[[str(h) for h in mutation_site_headers_3614 if str(h) in binary_part.columns]] | |
| def color_binary(val): | |
| if val == 1: return "background-color: lightgreen" | |
| if val == 0: return "background-color: lightcoral" | |
| return "" | |
| st.subheader("Binary Labels (Reordered 4402→3244, 4882→4455)") | |
| styled = binary_reordered.style.applymap(color_binary) | |
| st.dataframe(styled) | |
| st.download_button("Download CSV", binary_reordered.to_csv(index=False), "ef_binary_labels.csv", key="download_csv_tab4_csv") | |
| all_bits = binary_reordered.values.flatten().tolist() | |
| decoded_string = binary_labels_to_string(all_bits) | |
| st.subheader("Decoded String (continuous across rows)") | |
| st.write(decoded_string) | |
| st.subheader("Binary Labels (Ascending 3244→4882)") | |
| st.dataframe(binary_part.style.applymap(color_binary)) | |
| st.download_button("Download Ascending Order CSV", binary_part.to_csv(index=False), "ef_binary_labels_ascending.csv", key="download_csv_tab4_ascend") | |
| all_bits = binary_part.values.flatten().tolist() | |
| decoded_string = binary_labels_to_string(all_bits) | |
| st.subheader("Decoded String (continuous across rows)") | |
| st.write(decoded_string) | |
| def get_well_position(sample_index): | |
| """ | |
| Convert sample index (1-based) into well position (e.g., A1, A2, ..., B1, B2, ..., etc.) | |
| """ | |
| row_letter = chr(65 + (sample_index - 1) // 12) # 65 = 'A' | |
| col_number = ((sample_index - 1) % 12) + 1 | |
| return f"{row_letter}{col_number}" | |
| # # Tab 5: Binary → String | |
| # with tab5: | |
| # st.header("Decode Binary Labels to String") | |
| # # Utility: Track source volumes and update if exceeds limit | |
| # def track_and_replace_source(source_list, robot_script, volume_limit=180): | |
| # source_volumes = {} | |
| # adjusted_sources = [] | |
| # for entry in robot_script: | |
| # src = entry['Source'] | |
| # vol = entry['Volume'] | |
| # if src not in source_volumes: | |
| # source_volumes[src] = 0 | |
| # source_volumes[src] += vol | |
| # if source_volumes[src] > volume_limit: | |
| # row_letter = src[0] | |
| # col_number = src[1:] | |
| # new_row_letter = chr(ord(row_letter) + 4) | |
| # new_src = f"{new_row_letter}{col_number}" | |
| # entry['Source'] = new_src | |
| # if new_src not in source_volumes: | |
| # source_volumes[new_src] = 0 | |
| # source_volumes[new_src] += vol | |
| # source_volumes[src] -= vol | |
| # adjusted_sources.append(entry) | |
| # return adjusted_sources, source_volumes | |
| # # Utility: Generate fixed-volume D source to all sample wells | |
| # def generate_fixed_d_source_instructions_to_all_samples(n_samples, fixed_volume=16, volume_limit=170): | |
| # d_source_volumes = {} | |
| # d_source_script = [] | |
| # current_d_index = 1 | |
| # for i in range(n_samples): | |
| # dest = get_well_position(i + 1) | |
| # current_d_well = f"D{current_d_index}" | |
| # if current_d_well not in d_source_volumes: | |
| # d_source_volumes[current_d_well] = 0 | |
| # if d_source_volumes[current_d_well] + fixed_volume > volume_limit: | |
| # current_d_index += 1 | |
| # current_d_well = f"D{current_d_index}" | |
| # d_source_volumes[current_d_well] = 0 | |
| # d_source_volumes[current_d_well] += fixed_volume | |
| # tool = 'TS_10' if fixed_volume < 10 else 'TS_50' | |
| # d_source_script.append({ | |
| # 'Source': current_d_well, | |
| # 'Destination': dest, | |
| # 'Volume': fixed_volume, | |
| # 'Tool': tool | |
| # }) | |
| # return d_source_script, d_source_volumes | |
| # def generate_source_wells(n): | |
| # wells = [] | |
| # rows = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' | |
| # for i in range(n): | |
| # row = rows[i // 12] # cycle through A, B, C... | |
| # col = (i % 12) + 1 # 1 to 12 | |
| # wells.append(f"{row}{col}") | |
| # return wells | |
| # st.subheader("Binary per Row") | |
| # st.write("Upload CSV with any number of columns (0 or 1), no headers, from EF Binary format or enter manually below.") | |
| # binary32_file = st.file_uploader("Upload Binary CSV", type=["csv"], key="binary_any") | |
| # st.subheader("Optional Metadata (Optional)") | |
| # barcode_id_input = st.text_input("Barcode ID (applied to all rows, optional)", value="") | |
| # labware_source_input = st.text_input("Labware for Source (optional, default = 1)", value="1") | |
| # labware_dest_input = st.text_input("Labware for Destination (optional, default = 1)", value="1") | |
| # name_input = st.text_input("Name field (optional, default = blank)", value="") | |
| # if binary32_file: | |
| # df_32 = pd.read_csv(binary32_file, header=None) | |
| # df_32.columns = [str(h) for h in range(1, len(df_32.columns)+1)] | |
| # else: | |
| # df_32 = st.data_editor( | |
| # pd.DataFrame(columns=[str(h) for h in range(1, 33)]), | |
| # num_rows="dynamic", | |
| # key="manual_any_input" | |
| # ) | |
| # if not df_32.empty: | |
| # st.subheader("Binary Labels (Uploaded)") | |
| # st.dataframe(df_32.style.applymap(lambda v: "background-color: lightgreen" if v == 1 else "background-color: lightcoral")) | |
| # st.download_button("Download CSV", df_32.to_csv(index=False), "decoded_binary_uploaded.csv", key="download_csv_uploaded") | |
| # decoded = binary_labels_to_string(df_32.values.flatten().astype(int).tolist()) | |
| # st.subheader("Decoded String") | |
| # st.write(decoded) | |
| # st.download_button("Download Concatenated Output", decoded, "decoded_binary_string.txt", key="download_txt_any") | |
| # st.subheader("Robot Preparation Script from Binary") | |
| # df_32_robot = df_32.copy() | |
| # df_32_robot.insert(0, 'Sample', range(1, len(df_32_robot)+1)) | |
| # df_32_robot['# donors'] = df_32_robot.iloc[:, 1:].astype(int).sum(axis=1) | |
| # df_32_robot['volume donors (µl)'] = 64 / df_32_robot['# donors'] | |
| # robot_script_32 = [] | |
| # source_wells_32 = generate_source_wells(df_32.shape[1]) | |
| # for i, col in enumerate(df_32.columns): | |
| # for row_idx, sample in df_32_robot.iterrows(): | |
| # if int(sample[col]) == 1: | |
| # source = source_wells_32[i] | |
| # dest = get_well_position(int(sample['Sample'])) | |
| # vol = round(sample['volume donors (µl)'], 2) | |
| # tool = 'TS_10' if vol < 10 else 'TS_50' | |
| # robot_script_32.append({ | |
| # 'Source': source, | |
| # 'Destination': dest, | |
| # 'Volume': vol, | |
| # 'Tool': tool | |
| # }) | |
| # robot_script_32, source_volumes_32 = track_and_replace_source(source_wells_32, robot_script_32) | |
| # d_script, d_volumes = generate_fixed_d_source_instructions_to_all_samples(len(df_32_robot)) | |
| # full_robot_script = robot_script_32 + d_script | |
| # robot_script_32_df = pd.DataFrame(full_robot_script) | |
| # robot_script_32_df.insert(0, 'Barcode ID', barcode_id_input) | |
| # robot_script_32_df.insert(1, 'Labware_Source', labware_source_input) | |
| # robot_script_32_df.insert(3, 'Labware_Destination', labware_dest_input) | |
| # robot_script_32_df['Name'] = name_input | |
| # robot_script_32_df = robot_script_32_df[['Barcode ID', 'Labware_Source', 'Source', 'Labware_Destination', 'Destination', 'Volume', 'Tool', 'Name']] | |
| # st.dataframe(robot_script_32_df) | |
| # st.download_button("Download Robot Script", robot_script_32_df.to_csv(index=False), "robot_script.csv", key="download_robot_any") | |
| # st.subheader("Total Volume Used Per Source") | |
| # combined_volumes = {**source_volumes_32, **d_volumes} | |
| # source_volume_df = pd.DataFrame(list(combined_volumes.items()), columns=['Source', 'Total Volume (µl)']) | |
| # st.dataframe(source_volume_df) | |
| # st.download_button("Download Source Volumes", source_volume_df.to_csv(index=False), "source_total_volumes.csv", key="download_volume_any") | |
| # Tab 5: Binary → String | |
| with tab5: | |
| st.header("Decode Binary Labels to String") | |
| # Utility: Track source volumes and update if exceeds limit | |
| def track_and_replace_source(source_list, robot_script, volume_limit=150): | |
| source_volumes = {} | |
| adjusted_sources = [] | |
| for entry in robot_script: | |
| src = entry['Source'] | |
| vol = entry['Volume'] | |
| if src not in source_volumes: | |
| source_volumes[src] = 0 | |
| source_volumes[src] += vol | |
| if source_volumes[src] > volume_limit: | |
| row_letter = src[0] | |
| col_number = src[1:] | |
| new_row_letter = chr(ord(row_letter) + 4) | |
| new_src = f"{new_row_letter}{col_number}" | |
| entry['Source'] = new_src | |
| if new_src not in source_volumes: | |
| source_volumes[new_src] = 0 | |
| source_volumes[new_src] += vol | |
| source_volumes[src] -= vol | |
| adjusted_sources.append(entry) | |
| return adjusted_sources, source_volumes | |
| # Utility: Generate fixed-volume D source to all sample wells | |
| def generate_fixed_d_source_instructions_to_all_samples(n_samples, fixed_volume=16, volume_limit=170): | |
| d_source_volumes = {} | |
| d_source_script = [] | |
| current_d_index = 1 | |
| for i in range(n_samples): | |
| dest = get_well_position(i + 1) | |
| current_d_well = f"D{current_d_index}" | |
| if current_d_well not in d_source_volumes: | |
| d_source_volumes[current_d_well] = 0 | |
| if d_source_volumes[current_d_well] + fixed_volume > volume_limit: | |
| current_d_index += 1 | |
| current_d_well = f"D{current_d_index}" | |
| d_source_volumes[current_d_well] = 0 | |
| d_source_volumes[current_d_well] += fixed_volume | |
| # Split if >10 and assign TS_10 | |
| if fixed_volume > 10: | |
| half_vol = round(fixed_volume / 2, 2) | |
| d_source_script.append({ | |
| 'Source': current_d_well, | |
| 'Destination': dest, | |
| 'Volume': half_vol, | |
| 'Tool': 'TS_10' | |
| }) | |
| d_source_script.append({ | |
| 'Source': current_d_well, | |
| 'Destination': dest, | |
| 'Volume': fixed_volume - half_vol, | |
| 'Tool': 'TS_10' | |
| }) | |
| else: | |
| d_source_script.append({ | |
| 'Source': current_d_well, | |
| 'Destination': dest, | |
| 'Volume': fixed_volume, | |
| 'Tool': 'TS_10' | |
| }) | |
| return d_source_script, d_source_volumes | |
| def generate_source_wells(n): | |
| wells = [] | |
| rows = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' | |
| for i in range(n): | |
| row = rows[i // 12] # cycle through A, B, C... | |
| col = (i % 12) + 1 # 1 to 12 | |
| wells.append(f"{row}{col}") | |
| return wells | |
| st.subheader("Binary per Row") | |
| st.write("Upload CSV with any number of columns (0 or 1), no headers, from EF Binary format or enter manually below.") | |
| binary32_file = st.file_uploader("Upload Binary CSV", type=["csv"], key="binary_any") | |
| st.subheader("Optional Metadata (Optional)") | |
| barcode_id_input = st.text_input("Barcode ID (applied to all rows, optional)", value="") | |
| labware_source_input = st.text_input("Labware for Source (optional, default = 1)", value="1") | |
| labware_dest_input = st.text_input("Labware for Destination (optional, default = 1)", value="1") | |
| name_input = st.text_input("Name field (optional, default = blank)", value="") | |
| volume_limit_input = st.number_input("Maximum Volume Per Source Well (µl)", value=150) | |
| if binary32_file: | |
| df_32 = pd.read_csv(binary32_file, header=None) | |
| df_32.columns = [str(h) for h in range(1, len(df_32.columns)+1)] | |
| else: | |
| df_32 = st.data_editor( | |
| pd.DataFrame(columns=[str(h) for h in range(1, 33)]), | |
| num_rows="dynamic", | |
| key="manual_any_input" | |
| ) | |
| if not df_32.empty: | |
| st.subheader("Binary Labels (Uploaded)") | |
| st.dataframe(df_32.style.applymap(lambda v: "background-color: lightgreen" if v == 1 else "background-color: lightcoral")) | |
| st.download_button("Download CSV", df_32.to_csv(index=False), "decoded_binary_uploaded.csv", key="download_csv_uploaded") | |
| decoded = binary_labels_to_string(df_32.values.flatten().astype(int).tolist()) | |
| st.subheader("Decoded String") | |
| st.write(decoded) | |
| st.download_button("Download Concatenated Output", decoded, "decoded_binary_string.txt", key="download_txt_any") | |
| st.subheader("Robot Preparation Script from Binary") | |
| df_32_robot = df_32.copy() | |
| df_32_robot.insert(0, 'Sample', range(1, len(df_32_robot)+1)) | |
| df_32_robot['# donors'] = df_32_robot.iloc[:, 1:].astype(int).sum(axis=1) | |
| df_32_robot['volume donors (µl)'] = 64 / df_32_robot['# donors'] | |
| robot_script_32 = [] | |
| source_wells_32 = generate_source_wells(df_32.shape[1]) | |
| for i, col in enumerate(df_32.columns): | |
| for row_idx, sample in df_32_robot.iterrows(): | |
| if int(sample[col]) == 1: | |
| source = source_wells_32[i] | |
| dest = get_well_position(int(sample['Sample'])) | |
| vol = round(sample['volume donors (µl)'], 2) | |
| if vol > 10: | |
| half_vol = round(vol / 2, 2) | |
| robot_script_32.append({ | |
| 'Source': source, | |
| 'Destination': dest, | |
| 'Volume': half_vol, | |
| 'Tool': 'TS_10' | |
| }) | |
| robot_script_32.append({ | |
| 'Source': source, | |
| 'Destination': dest, | |
| 'Volume': vol - half_vol, | |
| 'Tool': 'TS_10' | |
| }) | |
| else: | |
| robot_script_32.append({ | |
| 'Source': source, | |
| 'Destination': dest, | |
| 'Volume': vol, | |
| 'Tool': 'TS_10' | |
| }) | |
| robot_script_32, source_volumes_32 = track_and_replace_source(source_wells_32, robot_script_32, volume_limit=volume_limit_input) | |
| d_script, d_volumes = generate_fixed_d_source_instructions_to_all_samples(len(df_32_robot), fixed_volume=16, volume_limit=volume_limit_input) | |
| full_robot_script = robot_script_32 + d_script | |
| robot_script_32_df = pd.DataFrame(full_robot_script) | |
| robot_script_32_df.insert(0, 'Barcode ID', barcode_id_input) | |
| robot_script_32_df.insert(1, 'Labware_Source', labware_source_input) | |
| robot_script_32_df.insert(3, 'Labware_Destination', labware_dest_input) | |
| robot_script_32_df['Name'] = name_input | |
| robot_script_32_df = robot_script_32_df[['Barcode ID', 'Labware_Source', 'Source', 'Labware_Destination', 'Destination', 'Volume', 'Tool', 'Name']] | |
| st.dataframe(robot_script_32_df) | |
| st.download_button("Download Robot Script", robot_script_32_df.to_csv(index=False), "robot_script.csv", key="download_robot_any") | |
| st.subheader("Total Volume Used Per Source") | |
| combined_volumes = {**source_volumes_32, **d_volumes} | |
| source_volume_df = pd.DataFrame(list(combined_volumes.items()), columns=['Source', 'Total Volume (µl)']) | |
| st.dataframe(source_volume_df) | |
| st.download_button("Download Source Volumes", source_volume_df.to_csv(index=False), "source_total_volumes.csv", key="download_volume_any") | |
| import streamlit as st | |
| import pandas as pd | |
| # === App Title === | |
| with tab6: | |
| st.header("Robot Script Generator") | |
| # === Voyager ASCII 6-bit conversion table === | |
| voyager_table = { | |
| i: ch for i, ch in enumerate([ | |
| ' ', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', | |
| 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', | |
| 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2', | |
| '3', '4', '5', '6', '7', '8', '9', '.', ',', '(', | |
| ')', '+', '-', '*', '/', '=', '$', '!', ':', '%', | |
| '"', '#', '@', "'", '?', '&' | |
| ]) | |
| } | |
| reverse_voyager_table = {v: k for k, v in voyager_table.items()} | |
| # === Binary → String conversion === | |
| def binary_labels_to_string(bits: list[int]) -> str: | |
| chars = [] | |
| for i in range(0, len(bits), 6): | |
| chunk = bits[i:i+6] | |
| if len(chunk) < 6: | |
| chunk += [0] * (6 - len(chunk)) | |
| val = sum(b << (5 - j) for j, b in enumerate(chunk)) | |
| chars.append(voyager_table.get(val, '?')) | |
| return ''.join(chars) | |
| # === Well mapping === | |
| def get_well_position(sample_index): | |
| """Convert sample index (1-based) into A1–H12 pattern within its plate""" | |
| row_letter = chr(65 + ((sample_index - 1) % 96) // 12) # 8 rows (A–H) | |
| col_number = ((sample_index - 1) % 12) + 1 # 12 columns | |
| return f"{row_letter}{col_number}" | |
| def get_plate_id(sample_index): | |
| """Return Plate number based on 96 samples per plate""" | |
| plate_number = math.ceil(sample_index / 96) | |
| return f"Plate {plate_number}" | |
| # === Track and replace source if volume exceeded === | |
| def track_and_replace_source(source_list, robot_script, volume_limit=150): | |
| source_volumes = {} | |
| adjusted_sources = [] | |
| for entry in robot_script: | |
| src = entry['Source'] | |
| vol = entry['Volume'] | |
| source_volumes[src] = source_volumes.get(src, 0) + vol | |
| if source_volumes[src] > volume_limit: | |
| row_letter = src[0] | |
| col_number = src[1:] | |
| new_row_letter = chr(ord(row_letter) + 4) | |
| new_src = f"{new_row_letter}{col_number}" | |
| entry['Source'] = new_src | |
| source_volumes[new_src] = source_volumes.get(new_src, 0) + vol | |
| source_volumes[src] -= vol | |
| adjusted_sources.append(entry) | |
| return adjusted_sources, source_volumes | |
| # === Fixed D-source transfers === | |
| def generate_fixed_d_source_instructions_to_all_samples(n_samples, fixed_volume=16, volume_limit=170): | |
| d_source_volumes = {} | |
| d_source_script = [] | |
| current_d_index = 1 | |
| for i in range(n_samples): | |
| dest = get_well_position(i + 1) | |
| plate = get_plate_id(i + 1) | |
| current_d_well = f"D{current_d_index}" | |
| d_source_volumes.setdefault(current_d_well, 0) | |
| if d_source_volumes[current_d_well] + fixed_volume > volume_limit: | |
| current_d_index += 1 | |
| current_d_well = f"D{current_d_index}" | |
| d_source_volumes[current_d_well] = 0 | |
| d_source_volumes[current_d_well] += fixed_volume | |
| tool = 'TS_50' if fixed_volume > 10 else 'TS_10' | |
| d_source_script.append({ | |
| 'Plate': plate, | |
| 'Source': current_d_well, | |
| 'Destination': dest, | |
| 'Volume': fixed_volume, | |
| 'Tool': tool | |
| }) | |
| return d_source_script, d_source_volumes | |
| def generate_source_wells(n): | |
| wells, rows = [], 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' | |
| for i in range(n): | |
| row, col = rows[i // 12], (i % 12) + 1 | |
| wells.append(f"{row}{col}") | |
| return wells | |
| # === Main UI === | |
| st.header("Upload Binary Data (0/1)") | |
| binary_file = st.file_uploader("Upload Binary CSV", type=["csv"]) | |
| st.divider() | |
| st.subheader("Optional Metadata") | |
| barcode_id_input = st.text_input("Barcode ID (optional)", value="") | |
| labware_source_input = st.text_input("Labware Source ID", value="1") | |
| labware_dest_input = st.text_input("Labware Destination ID", value="1") | |
| name_input = st.text_input("Name field (optional)", value="") | |
| volume_limit_input = st.number_input("Maximum Volume per Source Well (µL)", value=150, min_value=10, step=10) | |
| # === Load Data === | |
| if binary_file: | |
| df_binary = pd.read_csv(binary_file, header=None) | |
| df_binary.columns = [str(i+1) for i in range(df_binary.shape[1])] | |
| else: | |
| st.info("No file uploaded — manually enter binary data below.") | |
| df_binary = st.data_editor( | |
| pd.DataFrame(columns=[str(i) for i in range(1, 33)]), | |
| num_rows="dynamic", key="manual_input" | |
| ) | |
| if not df_binary.empty: | |
| st.subheader("Binary Matrix") | |
| st.dataframe(df_binary.style.applymap(lambda v: "background-color: lightgreen" if v == 1 else "background-color: lightcoral")) | |
| st.download_button("⬇️ Download Binary CSV", df_binary.to_csv(index=False), "binary_matrix.csv") | |
| # Decode to string | |
| decoded = binary_labels_to_string(df_binary.values.flatten().astype(int).tolist()) | |
| st.subheader("Decoded String Output") | |
| st.code(decoded) | |
| st.download_button("⬇️ Download Decoded String", decoded, "decoded_string.txt") | |
| # === Generate Robot Script === | |
| st.divider() | |
| st.subheader("Generated Robot Script") | |
| df_robot = df_binary.copy() | |
| df_robot.insert(0, 'Sample', range(1, len(df_robot) + 1)) | |
| df_robot['# donors'] = df_robot.iloc[:, 1:].astype(int).sum(axis=1) | |
| df_robot['volume donors (µL)'] = 64 / df_robot['# donors'] | |
| robot_script = [] | |
| source_wells = generate_source_wells(df_robot.shape[1] - 1) | |
| for i, col in enumerate(df_robot.columns[1:]): | |
| for _, sample in df_robot.iterrows(): | |
| if int(sample[col]) == 1: | |
| sample_id = int(sample['Sample']) | |
| plate = get_plate_id(sample_id) | |
| source = source_wells[i] | |
| dest = get_well_position(sample_id) | |
| vol = round(sample['volume donors (µL)'], 2) | |
| tool = 'TS_50' if vol > 10 else 'TS_10' | |
| robot_script.append({ | |
| 'Plate': plate, # ✅ New Column | |
| 'Source': source, | |
| 'Destination': dest, | |
| 'Volume': vol, | |
| 'Tool': tool | |
| }) | |
| robot_script, source_volumes = track_and_replace_source(source_wells, robot_script, volume_limit=volume_limit_input) | |
| d_script, d_volumes = generate_fixed_d_source_instructions_to_all_samples( | |
| len(df_robot), fixed_volume=16, volume_limit=volume_limit_input | |
| ) | |
| full_script = robot_script + d_script | |
| robot_script_df = pd.DataFrame(full_script) | |
| robot_script_df.insert(0, 'Barcode ID', barcode_id_input) | |
| robot_script_df.insert(1, 'Labware_Source', labware_source_input) | |
| robot_script_df.insert(3, 'Labware_Destination', labware_dest_input) | |
| robot_script_df['Name'] = name_input | |
| robot_script_df = robot_script_df[['Barcode ID', 'Labware_Source', 'Plate', | |
| 'Source', 'Labware_Destination', 'Destination', | |
| 'Volume', 'Tool', 'Name']] | |
| st.dataframe(robot_script_df) | |
| st.download_button("⬇️ Download Robot Script", robot_script_df.to_csv(index=False), "robot_script.csv") | |
| # === Source Volume Summary === | |
| st.divider() | |
| st.subheader("Total Volume Used Per Source") | |
| combined_volumes = {**source_volumes, **d_volumes} | |
| volume_df = pd.DataFrame(list(combined_volumes.items()), columns=['Source', 'Total Volume (µL)']) | |
| st.dataframe(volume_df) | |
| st.download_button("⬇️ Download Volume Summary", volume_df.to_csv(index=False), "source_volumes.csv") | |