import os
import io
import tempfile
import re
import pandas as pd
import streamlit as st
import camelot
from st_aggrid import AgGrid, GridOptionsBuilder, GridUpdateMode
from rapidfuzz import fuzz

APP_VERSION = "v0.0.1 (2025-09-03)"  # <- update this when you ship

st.set_page_config(page_title="PDF → Tables Cleaner", layout="wide")
st.title("PDF Table Merge & Cleanup (Camelot → Ag-Grid)")

# ---------------- Helpers ----------------
def parse_excel_to_all_dfs(
    file_bytes: bytes,
    sheet: str | int | None,
    first_row_is_header: bool,
    skiprows: int = 0,
    skipcols: int = 0,
    last_row: int = 0,  # 1-based, 0 = till end
    last_col: int = 0,  # 1-based, 0 = till end
):
    """
    Return (list[pd.DataFrame], sheet_names) from an Excel file, each frame
    cropped to a rectangle.

    Cropping logic:
      - Drop the first `skiprows` rows and the first `skipcols` columns
      - If last_row > 0, keep rows up to `last_row` (1-based, counted from the
        top of the original sheet, so `last_row - skiprows` rows survive)
      - If last_col > 0, keep columns up to `last_col` (1-based, counted from
        the left of the original sheet, so `last_col - skipcols` columns survive)
    """
    header = 0 if first_row_is_header else None
    all_dfs = []
    bio = io.BytesIO(file_bytes)
    xls = pd.ExcelFile(bio)
    sheet_names = xls.sheet_names
    targets = sheet_names if sheet is None else [sheet]
    for s in targets:
        # Read after skipping top rows
        df = pd.read_excel(io.BytesIO(file_bytes), sheet_name=s, header=header, dtype=str, skiprows=skiprows)
        # Apply last_row (1-based in the original sheet; convert to a slice length after skiprows)
        if last_row and last_row > 0:
            nrows_after_skip = max(last_row - skiprows, 0)
            df = df.iloc[:nrows_after_skip, :]
        # Cut left columns, then apply last_col relative to the original sheet
        df = df.iloc[:, skipcols:]
        if last_col and last_col > 0:
            ncols_after_skip = max(last_col - skipcols, 0)
            df = df.iloc[:, :ncols_after_skip]
        df.columns = [str(c) for c in df.columns]
        all_dfs.append(df)
    return all_dfs, sheet_names
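
# Worked example of the cropping arithmetic above (the file name is hypothetical):
#
#   dfs, names = parse_excel_to_all_dfs(
#       open("report.xlsx", "rb").read(), sheet=None, first_row_is_header=True,
#       skiprows=2, skipcols=1, last_row=50, last_col=8,
#   )
#
# skiprows=2 drops sheet rows 1-2, then last_row=50 truncates the parsed frame to
# max(50 - 2, 0) = 48 rows; skipcols=1 drops the leftmost column, and last_col=8
# keeps max(8 - 1, 0) = 7 of the remaining columns.
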
def parse_csv_to_all_dfs(
    file_bytes: bytes,
    first_row_is_header: bool,
    sep: str = ",",
    skiprows: int = 0,
    skipcols: int = 0,
    last_row: int = 0,  # 1-based, 0 = till end
    last_col: int = 0,  # 1-based, 0 = till end
):
    """
    Return list[pd.DataFrame] from a CSV file (always a single table).
    """
    header = 0 if first_row_is_header else None
    # Read after skipping top rows
    df = pd.read_csv(io.BytesIO(file_bytes), header=header, sep=sep, dtype=str, encoding="utf-8-sig", skiprows=skiprows)
    # Apply last_row (after skips)
    if last_row and last_row > 0:
        nrows_after_skip = max(last_row - skiprows, 0)
        df = df.iloc[:nrows_after_skip, :]
    # Columns window
    df = df.iloc[:, skipcols:]
    if last_col and last_col > 0:
        ncols_after_skip = max(last_col - skipcols, 0)
        df = df.iloc[:, :ncols_after_skip]
    df.columns = [str(c) for c in df.columns]
    return [df]
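
# The sidebar below hardcodes sep="," when calling this, but the parser accepts
# any pandas separator; a semicolon-delimited export, for instance, could be
# read with:
#
#   parse_csv_to_all_dfs(file_bytes, first_row_is_header=True, sep=";")
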
def prepend_header_as_row(df: pd.DataFrame) -> pd.DataFrame:
    cols = [str(c) for c in df.columns]
    header_row = pd.DataFrame([cols], columns=cols)
    return pd.concat([header_row, df.reset_index(drop=True)], ignore_index=True)
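
# Example: a frame with columns ["Name", "Qty"] and one row ["apple", "3"] comes
# back with two rows, ["Name", "Qty"] then ["apple", "3"], so the original header
# text survives concatenation with tables whose headers differ.
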
def normalize_and_concat(dfs, fill_value=""):
    """Prepend header rows, pad to the widest table, rename columns to col_1.., then concat."""
    if not dfs:
        return pd.DataFrame()
    dfs = [prepend_header_as_row(df) for df in dfs]
    max_cols = max(df.shape[1] for df in dfs)
    norm = []
    for df in dfs:
        df2 = df.copy()
        df2.columns = [str(c) for c in df2.columns]
        # pad or trim
        if df2.shape[1] < max_cols:
            for k in range(df2.shape[1], max_cols):
                df2[f"__pad_{k+1}"] = fill_value
        elif df2.shape[1] > max_cols:
            df2 = df2.iloc[:, :max_cols]
        df2.columns = [f"col_{i+1}" for i in range(max_cols)]
        norm.append(df2.reset_index(drop=True))
    out = pd.concat(norm, ignore_index=True)
    # add stable row ids for deletion
    out["_rid"] = range(len(out))
    return out
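
# Sketch of the normalization on two mismatched tables (values invented): a
# 2-column table [Name, Qty] and a 3-column table [Name, Qty, Price] both get
# their headers prepended as data rows, the narrower one is padded with "" to
# 3 columns, all columns are renamed col_1..col_3, the frames are concatenated,
# and the hidden `_rid` column lets later deletions target rows by a stable id
# rather than by position.
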
def apply_header_row(df: pd.DataFrame, header_idx: int, ensure_unique: bool = False):
    """
    Promote the row at header_idx to be the header *as-is* (no lowercasing, no regex).
    Returns (df_with_header, header_vals).
    If ensure_unique=True, suffix duplicate names with _2, _3, ...;
    otherwise duplicate column names are allowed (pandas tolerates them, but be careful).
    """
    # Preserve internal id if present
    has_rid = "_rid" in df.columns
    body_cols = [c for c in df.columns if c != "_rid"]
    # Get raw header values exactly as the user sees them
    header_vals = df.loc[header_idx, body_cols].tolist()
    # Optionally enforce unique column names without altering originals unless needed
    if ensure_unique:
        seen = {}
        uniq = []
        for h in header_vals:
            h = "" if h is None else str(h)
            if h in seen:
                seen[h] += 1
                uniq.append(f"{h}_{seen[h]}")
            else:
                seen[h] = 1
                uniq.append(h)
        header_out = uniq
    else:
        header_out = header_vals
    # Drop the header row from the data (don't transform any cell values)
    df2 = df.drop(index=header_idx).reset_index(drop=True)
    # Reorder columns so body columns come first, then _rid (if present)
    ordered_cols = body_cols + (["_rid"] if has_rid else [])
    df2 = df2[ordered_cols]
    # Set columns exactly as the chosen header row (plus _rid if present)
    df2.columns = header_out + (["_rid"] if has_rid else [])
    return df2, header_vals  # header_vals are the raw originals
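
# Example of ensure_unique: a chosen header row ["id", "name", "name"] becomes
# ["id", "name", "name_2"]. With ensure_unique=False the duplicates are kept
# verbatim; pandas allows that, but df["name"] then returns a two-column
# DataFrame rather than a Series, which can surprise downstream code.
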
def is_header_like(row_vals, header_vals, min_ratio=90):
    sims = []
    for a, b in zip(row_vals, header_vals):
        a, b = str(a or "").strip(), str(b or "").strip()
        sims.append(fuzz.token_set_ratio(a, b) if (a or b) else 100)
    return (sum(sims) / max(len(sims), 1)) >= min_ratio
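
# Intuition: a stray copy of the header (e.g. repeated at the top of every PDF
# page) scores at or near 100 against header_vals and is flagged, while an
# ordinary data row scores low and is kept. Cell pairs where both sides are
# empty count as a perfect 100, so sparse columns don't drag the average down.
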
def drop_header_like_rows(df: pd.DataFrame, header_vals, min_ratio=90):
    body_cols = [c for c in df.columns if c != "_rid"]
    # Keep a row only when it is NOT header-like
    keep = [
        not is_header_like([row[c] for c in body_cols], header_vals, min_ratio)
        for _, row in df.iterrows()
    ]
    out = df.loc[keep].reset_index(drop=True)
    out["_rid"] = range(len(out))
    return out

def parse_pdf_to_all_dfs(pdf_bytes: bytes):
    """Parse a PDF bytes object with Camelot, return list[pd.DataFrame]."""
    # Camelot needs a file path, so write the bytes to a temp file
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
        tmp.write(pdf_bytes)
        tmp_path = tmp.name
    all_dfs = []
    try:
        # 1) Try lattice first (works best with ruled tables)
        tables = camelot.read_pdf(tmp_path, pages="all", flavor="lattice")
        if len(tables) == 0:
            # 2) Fall back to stream (works for borderless tables)
            tables = camelot.read_pdf(tmp_path, pages="all", flavor="stream")
        for t in tables:
            df = t.df
            # Promote the first row to header, then drop it from the body
            if df.shape[0] > 0:
                df.columns = df.iloc[0]
                df = df.drop(0).reset_index(drop=True)
            # Standardize column names to strings
            df.columns = [str(c) for c in df.columns]
            all_dfs.append(df)
    finally:
        try:
            os.remove(tmp_path)
        except Exception:
            pass
    return all_dfs
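
# If both flavors miss a table, Camelot's stream flavor also accepts explicit
# regions; a sketch (coordinates are hypothetical, given as "x1,y1,x2,y2" in
# PDF points with the origin at the bottom-left of the page):
#
#   tables = camelot.read_pdf(tmp_path, pages="1", flavor="stream",
#                             table_areas=["72,720,540,72"])
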
# ---------------- Sidebar: Upload & Parse ----------------
with st.sidebar:
    st.divider()
    st.caption(f"App {APP_VERSION}")
    st.header("1) Upload file")
    upl = st.file_uploader("Choose a PDF / Excel / CSV", type=["pdf", "xlsx", "xls", "csv"])

    filetype = None
    if upl is not None:
        name = upl.name.lower()
        if name.endswith(".pdf"):
            filetype = "pdf"
        elif name.endswith(".xlsx") or name.endswith(".xls"):
            filetype = "excel"
        elif name.endswith(".csv"):
            filetype = "csv"

    # Common options for tabular files (Excel/CSV)
    first_row_is_header = st.checkbox("First row contains headers", value=True, help="For Excel/CSV only")

    # Excel/CSV region cropping
    skiprows = st.number_input("Skip N rows (top)", min_value=0, value=0, step=1)
    skipcols = st.number_input("Skip N columns (left)", min_value=0, value=0, step=1)
    last_row = st.number_input("Last data row (1-based, 0 = until end)", min_value=0, value=0, step=1)
    last_col = st.number_input("Last data column (1-based, 0 = until end)", min_value=0, value=0, step=1)

    # Excel sheet selection UI (shown only when an Excel file is uploaded)
    selected_sheet = None
    parse_all_sheets = False
    excel_sheet_names = []
    if filetype == "excel" and upl is not None:
        # Peek at the workbook just to list its sheets
        _, excel_sheet_names = parse_excel_to_all_dfs(upl.read(), sheet=None, first_row_is_header=first_row_is_header)
        # Rewind: the peek above consumed the upload stream
        upl.seek(0)
        if len(excel_sheet_names) > 1:
            mode = st.radio("Sheet mode", ["Select one sheet", "Parse all sheets"], index=0, horizontal=True)
            if mode == "Parse all sheets":
                parse_all_sheets = True
            else:
                selected_sheet = st.selectbox("Select sheet", excel_sheet_names, index=0)
        else:
            st.caption(f"Sheet: {excel_sheet_names[0]}")
            selected_sheet = excel_sheet_names[0]

    run_parse = st.button("Parse file")

if "concat_df" not in st.session_state:
    st.session_state.concat_df = pd.DataFrame()

if run_parse:
    if not upl:
        st.warning("Please upload a file first.")
    else:
        file_bytes = upl.read()
        all_dfs = []
        if filetype == "pdf":
            all_dfs = parse_pdf_to_all_dfs(file_bytes)
        elif filetype == "excel":
            if parse_all_sheets:
                all_dfs, _ = parse_excel_to_all_dfs(
                    file_bytes,
                    sheet=None,
                    first_row_is_header=first_row_is_header,
                    skiprows=skiprows,
                    skipcols=skipcols,
                    last_row=last_row,
                    last_col=last_col,
                )
            else:
                # If the workbook has only one sheet, selected_sheet was set above
                all_dfs, _ = parse_excel_to_all_dfs(
                    file_bytes,
                    sheet=selected_sheet,
                    first_row_is_header=first_row_is_header,
                    skiprows=skiprows,
                    skipcols=skipcols,
                    last_row=last_row,
                    last_col=last_col,
                )
        elif filetype == "csv":
            all_dfs = parse_csv_to_all_dfs(
                file_bytes,
                first_row_is_header=first_row_is_header,
                sep=",",
                skiprows=skiprows,
                skipcols=skipcols,
                last_row=last_row,
                last_col=last_col,
            )
        else:
            st.error("Unsupported file type.")
            all_dfs = []

        if not all_dfs:
            st.error("No tables detected or file is empty.")
        else:
            st.success(f"Parsed {len(all_dfs)} table(s).")
            concat_df = normalize_and_concat(all_dfs)
            st.session_state.concat_df = concat_df

# ---------------- 2) Editable Grid ----------------
st.subheader("2) Edit merged rows (Ag-Grid)")

if st.session_state.concat_df.empty:
    st.info("Upload and parse a file to begin. The merged grid will appear here.")
else:
    # Work on a copy so we can add a delete flag without mutating the original yet
    df = st.session_state.concat_df.copy()

    # Ensure a boolean "delete" column exists (users tick it to mark rows for removal)
    if "delete" not in df.columns:
        df["delete"] = False

    # Build the grid (no row selection needed)
    gb = GridOptionsBuilder.from_dataframe(df)
    gb.configure_default_column(editable=True, resizable=True)
    gb.configure_column("_rid", hide=True)
    gb.configure_column("delete", header_name="Delete?", editable=True)
    grid_options = gb.build()

    # Render the editable grid and capture edits
    grid_resp = AgGrid(
        df,
        gridOptions=grid_options,
        update_mode=GridUpdateMode.MODEL_CHANGED,  # edits flow back on change
        fit_columns_on_grid_load=True,
        height=420,
        enable_enterprise_modules=False,
    )
    edited_df = pd.DataFrame(grid_resp["data"])

    # Persist all edits (including delete ticks) to session state
    st.session_state.concat_df = edited_df

    # Delete rows that are checked
    colA, colB = st.columns([1, 1])
    with colA:
        to_delete = edited_df.loc[edited_df["delete"] == True, "_rid"].tolist()
        st.caption(f"Checked for deletion: {len(to_delete)} row(s)")
        if st.button("Delete checked rows", type="primary", disabled=(len(to_delete) == 0)):
            kept = edited_df[~edited_df["_rid"].isin(to_delete)].drop(columns=["delete"], errors="ignore").reset_index(drop=True)
            kept["_rid"] = range(len(kept))
            st.session_state.concat_df = kept
            st.success(f"Deleted {len(to_delete)} row(s).")
    with colB:
        # Refresh button: forces a rerun so the grid reflects the latest state
        if st.button("Refresh table"):
            try:
                st.rerun()  # Streamlit ≥ 1.30
            except Exception:
                st.experimental_rerun()  # fallback for older versions
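
# Note on update_mode: MODEL_CHANGED pushes grid state back to Python (and
# reruns the script) on every edit. If that proves too chatty, st_aggrid's
# GridUpdateMode enum also offers coarser modes such as VALUE_CHANGED or
# MANUAL (the latter paired with an explicit update button).
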
# ---------------- 3) Pick Header + Clean ----------------
st.subheader("3) Pick header row & remove header-like duplicates")

if st.session_state.concat_df.empty:
    st.info("Header tools will show after parsing a file.")
else:
    df = st.session_state.concat_df
    header_idx = st.number_input("Header row index (0-based)", min_value=0, max_value=len(df) - 1, value=0, step=1)
    # Persist the applied header across reruns: widgets rendered inside a bare
    # `if st.button(...)` branch vanish on the next rerun (e.g. when the slider
    # below is moved), so the button only records the choice in session state.
    if st.button("Apply header"):
        st.session_state.header_idx_applied = int(header_idx)
    applied_idx = st.session_state.get("header_idx_applied")
    if applied_idx is not None and applied_idx < len(df):
        df_with_header, header_vals = apply_header_row(df, applied_idx)
        st.success("Header applied.")
        st.dataframe(df_with_header.head(15), use_container_width=True)
        st.write("Remove rows similar to header:")
        min_ratio = st.slider("Similarity threshold", 70, 100, 90, 1)
        cleaned = drop_header_like_rows(df_with_header, header_vals, min_ratio=min_ratio)
        st.caption(f"Rows after cleaning: {len(cleaned)}")
        st.dataframe(cleaned.head(60), use_container_width=True)
        st.download_button(
            "Download cleaned CSV",
            # to_csv() returns a str when no path is given (its encoding arg is
            # ignored there), so encode explicitly to keep the UTF-8 BOM for Excel
            data=cleaned.drop(columns=["_rid", "delete"], errors="ignore").to_csv(index=False).encode("utf-8-sig"),
            file_name="cleaned_tables.csv",
            mime="text/csv",
        )

st.caption("Tip: Camelot works best on digital PDFs. For scanned PDFs, consider OCR then table detection.")