# Hugging Face Spaces page residue ("Spaces: Sleeping") — not part of the app code.
| import streamlit as st | |
| from pathlib import Path | |
| import pandas as pd | |
| import re | |
| import json | |
| import warnings | |
| from io import BytesIO | |
| warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl") | |
| # --- helper functions (kept from your script) --- | |
# Regex candidates for a "style id" column header, matched case-insensitively.
STYLE_PATTERNS = [re.compile(p, re.I) for p in [
    r"^style[_\s\-]?id$", r"styleid", r"style[_\s\-]?code", r"^sku$"
]]


def find_styleid_column(columns):
    """Return the first column whose header looks like a style identifier.

    Tries the compiled STYLE_PATTERNS first; if none match, falls back to any
    header containing both 'style' and 'id' (case-insensitive). Returns None
    when no candidate is found.
    """
    labelled = [(col, str(col)) for col in columns]
    for col, text in labelled:
        if any(pat.search(text) for pat in STYLE_PATTERNS):
            return col
    for col, text in labelled:
        lowered = text.lower()
        if 'style' in lowered and 'id' in lowered:
            return col
    return None
def find_brand_size_start(columns):
    """Locate the column where size data begins.

    Prefers a header mentioning both 'brand' and 'size'; otherwise the first
    header mentioning 'size'. Returns None when neither exists.
    """
    lowered = [(col, str(col).lower()) for col in columns]
    for col, name in lowered:
        if "brand" in name and "size" in name:
            return col
    for col, name in lowered:
        if "size" in name:
            return col
    return None
def unique_preserve_order(seq):
    """Return the unique items of *seq* in first-seen order.

    dict preserves insertion order (Python 3.7+), so ``dict.fromkeys``
    deduplicates while keeping the original ordering. Items must be hashable,
    exactly as in the set-based original.
    """
    return list(dict.fromkeys(seq))
# --- Streamlit UI ---
# Page chrome: wide layout gives the merged-sheet previews room to render.
st.set_page_config(page_title="Size Chart Merger", layout="wide")
st.title("Size Chart \u2194 Product Details Merger")
st.write("Upload a Size Chart workbook and a Product Details workbook (matching sheet names). This app merges size columns into product details.")
# Two side-by-side uploaders: size chart on the left, product details on the right.
col1, col2 = st.columns(2)
with col1:
    size_file = st.file_uploader("Upload Size Chart Excel", type=["xlsx", "xlsm"], key="size")
with col2:
    prod_file = st.file_uploader("Upload Product Details Excel", type=["xlsx", "xlsm"], key="prod")
# Name used for the downloaded workbook; '.xlsx' is appended at download time if missing.
output_name = st.text_input("Output filename (optional)", value="input.xlsx")
show_logs = st.checkbox("Show detailed log", value=True)
# Main action: merge each size-chart sheet into the matching product sheet,
# then offer the combined workbook for download.
if st.button("Run merge"):
    if size_file is None or prod_file is None:
        st.error("Please upload both files before running the merge.")
    else:
        try:
            # openpyxl reads both .xlsx and .xlsm uploads.
            size_xl = pd.ExcelFile(size_file, engine="openpyxl")
            prod_xl = pd.ExcelFile(prod_file, engine="openpyxl")
            size_sheets = size_xl.sheet_names
            prod_sheets = prod_xl.sheet_names
            # Read every product sheet as strings so style ids keep leading zeros.
            product_dfs = {name: prod_xl.parse(name, dtype=str) for name in prod_sheets}
            log = []
            progress_bar = st.progress(0)
            total = len(size_sheets)
            for i, sheet_name in enumerate(size_sheets):
                # update progress (max() guards against an empty workbook)
                progress_bar.progress(int((i / max(total, 1)) * 100))
                st.write(f"Processing sheet: **{sheet_name}**")
                if sheet_name not in prod_sheets:
                    log.append(f"Skipped '{sheet_name}': not present in Product Details.")
                    continue
                size_df = size_xl.parse(sheet_name, dtype=str)
                prod_df = product_dfs[sheet_name]
                # Normalize column labels to plain strings on both sides.
                size_df.columns = [str(c) for c in size_df.columns]
                prod_df.columns = [str(c) for c in prod_df.columns]
                style_col_size = find_styleid_column(size_df.columns)
                style_col_prod = find_styleid_column(prod_df.columns)
                if style_col_size is None:
                    log.append(f"Sheet '{sheet_name}': could not detect style id in Size Chart.")
                    continue
                if style_col_prod is None:
                    # Product sheet lacks a style-id column: create an empty one
                    # named after the size chart's so the merge can key on it.
                    style_col_prod = style_col_size
                    prod_df[style_col_prod] = pd.NA
                    log.append(f"Sheet '{sheet_name}': Product Details missing style id; created '{style_col_prod}'.")
                brand_size_col = find_brand_size_start(size_df.columns)
                if brand_size_col is None:
                    log.append(f"Sheet '{sheet_name}': no size column found. Skipping.")
                    continue
                # Everything from the first size column to the end is merged.
                size_cols = list(size_df.columns)
                start_idx = size_cols.index(brand_size_col)
                size_columns_to_merge = size_cols[start_idx:]
                if brand_size_col not in prod_df.columns:
                    prod_df[brand_size_col] = pd.NA
                    log.append(f"Sheet '{sheet_name}': created '{brand_size_col}' in Product Details.")
                for col in size_columns_to_merge:
                    if col not in prod_df.columns:
                        prod_df[col] = pd.NA
                        log.append(f"Sheet '{sheet_name}': inserted missing column '{col}'.")
                # Long form: one row per (style id, size column, value).
                long = size_df.melt(id_vars=[style_col_size], value_vars=size_columns_to_merge,
                                    var_name="col_name", value_name="value")
                long["value"] = long["value"].astype(str).str.strip()
                # FIX: compare case-insensitively. The sentinel list is lowercase,
                # but cells holding "None" (the literal str(None) produces), "NA"
                # or "NaN" previously slipped past the filter and were merged as
                # real size values.
                invalid = long["value"].str.lower().isin(["", "nan", "none", "na"])
                long = long[~invalid]
                if long.empty:
                    log.append(f"Sheet '{sheet_name}': no valid size entries to merge.")
                    continue
                # Collapse duplicate values per (style, column) into a JSON list,
                # preserving first-seen order.
                grouped = (
                    long.groupby([style_col_size, "col_name"])['value']
                    .apply(lambda s: json.dumps(unique_preserve_order(list(s)), ensure_ascii=False))
                    .reset_index()
                )
                # Back to wide form keyed on style id; align the key column name
                # with the product sheet's before merging.
                pivot = grouped.pivot(index=style_col_size, columns="col_name", values="value")
                pivot.reset_index(inplace=True)
                pivot.rename(columns={style_col_size: style_col_prod}, inplace=True)
                # Outer merge keeps product rows without sizes AND size-chart rows
                # whose style id is absent from the product sheet.
                merged = prod_df.merge(pivot, on=style_col_prod, how="outer", suffixes=("", "_new"))
                for col in size_columns_to_merge:
                    newcol = col + "_new"
                    if newcol in merged.columns:
                        # Size-chart values win; fall back to the existing cell.
                        merged[col] = merged[newcol].combine_first(merged[col])
                        merged.drop(columns=newcol, inplace=True)
                product_dfs[sheet_name] = merged
                log.append(f"Sheet '{sheet_name}': merged {pivot.shape[0]} style ids.")
            progress_bar.progress(100)
            # write result to an in-memory Excel file
            output = BytesIO()
            with pd.ExcelWriter(output, engine="xlsxwriter") as writer:
                for name, df in product_dfs.items():
                    # Excel caps sheet names at 31 characters.
                    df.to_excel(writer, sheet_name=name[:31], index=False)
            output.seek(0)
            st.success("Merge complete!")
            st.write(f"Output ready: **{output_name}**")
            if show_logs:
                st.subheader("Merge Log")
                for line in log:
                    st.write("- ", line)
            # show previews and download
            st.subheader("Preview of merged sheets")
            for name, df in product_dfs.items():
                with st.expander(f"Sheet: {name} ({len(df)} rows)"):
                    st.dataframe(df.head(200))
            st.download_button(
                label="Download merged workbook",
                data=output.getvalue(),
                file_name=output_name if output_name.endswith('.xlsx') else output_name + '.xlsx',
                mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
            )
        except Exception as e:
            # Surface the full traceback in the UI rather than crashing the app.
            st.exception(e)