# NOTE: removed spreadsheet-viewer artifact lines ("Spaces:" / "Sleeping") — they were
# extraction residue, not part of the Python source.
| """ | |
| Excel/CSV → DuckDB ingestion (generic, robust, multi-table, unified lineage) | |
| - Supports Excel (.xlsx/.xlsm/.xls) and CSV (first row = headers) | |
| - Hierarchical headers with merged-cell parent context (titles removed) | |
| - Merged rows/cols resolved to master (top-left) value for consistent replication | |
| - Multiple tables detected ONLY when separated by at least one completely empty row | |
| - Footer detection (ignore trailing notes/summaries) | |
| - Pivot detection (skip pivot-looking rows; optional sheet-level pivot/charthood skip) | |
| - Optional LLM inference for unnamed columns and table titles (EXCEL_LLM_INFER=1) | |
| - One DuckDB table per detected table block (Excel) or per file (CSV) | |
| - Unified lineage tables for BOTH Excel and CSV: | |
| __file_schema (file_name, sheet_name, table_name, column_ordinal, original_name, sql_column) | |
| __file_tables (file_name, sheet_name, table_name, block_index, start_row, end_row, | |
| header_rows_json, inferred_title, original_title_text) | |
| Usage: | |
| python source_to_duckdb.py --file /path/file.xlsx --duckdb /path/out.duckdb | |
| python source_to_duckdb.py --file /path/file.csv --duckdb /path/out.duckdb | |
| """ | |
| import os | |
| import re | |
| import sys | |
| import json | |
| import hashlib | |
| from pathlib import Path | |
| from typing import List, Tuple, Dict | |
| from openpyxl import load_workbook | |
| from openpyxl.worksheet.worksheet import Worksheet | |
| # ------------------------- Small utilities ------------------------- | |
| def _nonempty(vals): | |
| return [v for v in vals if v not in (None, "")] | |
| def _is_numlike(x): | |
| if isinstance(x, (int, float)): | |
| return True | |
| s = str(x).strip().replace(",", "") | |
| if s.endswith("%"): | |
| s = s[:-1] | |
| if not s: | |
| return False | |
| if any(c.isalpha() for c in s): | |
| return False | |
| try: | |
| float(s); return True | |
| except: return False | |
| def _is_year_token(x): | |
| if isinstance(x, int) and 1800 <= x <= 2100: return True | |
| s = str(x).strip() | |
| return s.isdigit() and 1800 <= int(s) <= 2100 | |
def sanitize_table_name(name: str) -> str:
    """Turn *name* into a safe SQL identifier: word chars only, leading alpha."""
    cleaned = re.sub(r"[^\w]", "_", str(name))
    cleaned = re.sub(r"_+", "_", cleaned).strip("_")
    if cleaned and not cleaned[0].isalpha():
        cleaned = "table_" + cleaned
    return cleaned if cleaned else "sheet_data"
def clean_col_name(s: str) -> str:
    """Normalize a header cell into a snake_case column name.

    Symbols %, ‰ and # are spelled out; other punctuation is stripped.
    """
    text = re.sub(r"[^\w\s%#‰]", "", str(s).strip())
    for symbol, word in (("%", " pct"), ("‰", " permille"), ("#", " count ")):
        text = text.replace(symbol, word)
    text = re.sub(r"\s+", "_", text)
    text = re.sub(r"_+", "_", text).strip("_")
    if text and text[0].isdigit():
        text = "col_" + text
    return text if text else "unnamed_column"
def ensure_unique(names):
    """De-duplicate *names* case-insensitively by appending _1, _2, ... suffixes."""
    counters = {}
    result = []
    for name in names:
        key = (name or "unnamed_column").lower()
        if key not in counters:
            counters[key] = 0
            result.append(name)
            continue
        suffix = counters[key] + 1
        # Skip suffixes that collide with names already seen.
        while f"{name}_{suffix}".lower() in counters:
            suffix += 1
        counters[key] = suffix
        renamed = f"{name}_{suffix}"
        result.append(renamed)
        counters[renamed.lower()] = 0
    return result
def compose_col(parts):
    """Join hierarchical header parts into one column name, dropping empty and
    consecutively repeated (case-insensitive) levels."""
    pieces = []
    for part in parts:
        if not part:
            continue
        normalized = str(part).strip()
        if pieces and normalized.lower() == pieces[-1].lower():
            continue
        pieces.append(normalized)
    if not pieces:
        return ""
    return clean_col_name("_".join(pieces))
| # ------------------------- Heuristics & detection ------------------------- | |
def is_probably_footer(cells):
    """Heuristic: a sparse row that starts with a note-like keyword, or holds
    one long blob of text, is treated as a trailing footer."""
    filled = [(i, v) for i, v in enumerate(cells) if v not in (None, "")]
    if not filled:
        return False
    if len(filled) > 2:
        return False
    text = " ".join(str(v) for _, v in filled).strip().lower()
    for keyword in ("note", "notes", "source", "summary", "disclaimer"):
        if text.startswith(keyword):
            return True
    return len(text) > 50
def is_probably_data(cells, num_cols):
    """Heuristic: does this row look like a data row (mostly numeric or dense)?"""
    vals = [v for v in cells if v not in (None, "")]
    if not vals:
        return False
    numeric = [v for v in vals if _is_numlike(v)]
    n_num = len(numeric)
    n_text = len(vals) - n_num
    density = len(vals) / max(1, num_cols)
    # A run of year tokens next to several labels reads as a header, not data.
    if n_num >= 2 and n_text >= 2 and all(_is_year_token(v) for v in numeric):
        return False
    if n_num >= max(2, n_text):
        return True
    if n_num >= 2 and density >= 0.6:
        return True
    first_cell = str(vals[0]).strip().lower()
    return first_cell in ("total", "totals", "grand total")
# Lower-cased strings that strongly indicate pivot-table chrome rather than data.
PIVOT_MARKERS = {"row labels","column labels","values","grand total","report filter","filters","∑ values","σ values","Σ values"}
def is_pivot_marker_string(s: str) -> bool:
    """True if *s* matches a known pivot-table marker or aggregate label."""
    if not s:
        return False
    text = str(s).strip().lower()
    if text in PIVOT_MARKERS:
        return True
    if text.startswith(("sum of ", "count of ", "avg of ", "average of ")):
        return True
    return text.endswith(" total") or text.startswith("total ")
def is_pivot_row(cells) -> bool:
    """True when a row contains pivot-marker text or two-plus aggregate labels."""
    texts = [str(v).strip() for v in cells if v not in (None, "")]
    if not texts:
        return False
    for t in texts:
        if is_pivot_marker_string(t):
            return True
    prefixes = ("sum of", "count of", "avg of", "average of", "min of", "max of")
    aggregate_count = sum(1 for t in texts if t.lower().startswith(prefixes))
    return aggregate_count >= 2
def is_pivot_or_chart_sheet(ws: Worksheet) -> bool:
    """Detect sheets hosting charts or pivot tables; such sheets are skipped."""
    # openpyxl private attrs — wrapped defensively since they are not public API.
    try:
        if getattr(ws, "_charts", None):
            return True
    except Exception:
        pass
    if hasattr(ws, "_pivots") and getattr(ws, "_pivots"):
        return True
    # Scan a bounded top-left window for pivot-looking rows.
    scan_rows = min(ws.max_row, 40)
    scan_cols = min(ws.max_column, 20)
    hits = 0
    for r in range(1, scan_rows + 1):
        row_vals = [ws.cell(r, c).value for c in range(1, scan_cols + 1)]
        if is_pivot_row(row_vals):
            hits += 1
            if hits >= 2:
                return True
    lowered = (ws.title or "").lower()
    return any(k in lowered for k in ("pivot", "dashboard", "chart", "charts"))
| def _samples_for_column(rows, col_idx, max_items=20): | |
| vals = [] | |
| for row in rows: | |
| if col_idx < len(row): | |
| v = row[col_idx] | |
| if v not in (None, ""): vals.append(v) | |
| if len(vals) >= max_items: break | |
| return vals | |
def _heuristic_infer_col_name(samples):
    """Guess a generic name ('year', 'percentage', 'count', 'value', 'question',
    'range', 'category') for an unnamed column from its sample values.

    Returns None when no pattern is confident enough.
    """
    if not samples:
        return None
    n = len(samples)
    if sum(1 for v in samples if _is_year_token(v)) >= max(2, int(0.8 * n)):
        return "year"
    pct_hits = 0
    for v in samples:
        s = str(v).strip()
        if s.endswith("%"):
            pct_hits += 1
        else:
            try:
                f = float(s.replace(",", ""))
                # Bare numbers in a percentage-like range only count half.
                # (The original `0 <= f <= 1.0 or 0 <= f <= 100` was redundant —
                # the second range subsumes the first.)
                if 0 <= f <= 100:
                    pct_hits += 0.5
            except ValueError:
                # Narrowed from a bare `except:` — float(str) raises ValueError only.
                pass
    if pct_hits >= max(2, int(0.7 * n)):
        return "percentage"
    if sum(1 for v in samples if _is_numlike(v)) >= max(3, int(0.7 * n)):
        intish = 0
        for v in samples:
            try:
                f = float(str(v).replace(",", ""))
                if f == int(f):
                    intish += 1
            except (ValueError, OverflowError):
                # OverflowError covers int(inf); bare except was too broad.
                pass
        if intish >= max(2, int(0.6 * n)):
            return "count"
        return "value"
    uniq = {str(v).strip().lower() for v in samples}
    if len(uniq) <= 3 and max(len(str(v)) for v in samples) >= 30:
        return "question"
    if sum(1 for v in samples if re.search(r"\d", str(v)) and ("-" in str(v) or "–" in str(v))) >= max(2, int(0.6 * n)):
        return "range"
    if len(uniq) < max(5, int(0.5 * n)):
        return "category"
    return None
| def used_bounds(ws: Worksheet) -> Tuple[int,int,int,int]: | |
| min_row, max_row, min_col, max_col = None, 0, None, 0 | |
| for r in ws.iter_rows(): | |
| for c in r: | |
| v = c.value | |
| if v is not None and str(v).strip() != "": | |
| if min_row is None or c.row < min_row: min_row = c.row | |
| if c.row > max_row: max_row = c.row | |
| if min_col is None or c.column < min_col: min_col = c.column | |
| if c.column > max_col: max_col = c.column | |
| if min_row is None: return 1,0,1,0 | |
| return min_row, max_row, min_col, max_col | |
| def build_merged_master_map(ws: Worksheet): | |
| mapping = {} | |
| for mr in ws.merged_cells.ranges: | |
| min_col, min_row, max_col, max_row = mr.min_col, mr.min_row, mr.max_col, mr.max_row | |
| master = (min_row, min_col) | |
| for r in range(min_row, max_row+1): | |
| for c in range(min_col, max_col+1): | |
| mapping[(r,c)] = master | |
| return mapping | |
def build_value_grid(ws: Worksheet, min_row: int, max_row: int, min_col: int, max_col: int):
    """Materialize cell values as a 2D list; merged cells replicate their master value."""
    merged = build_merged_master_map(ws)
    n_rows = max_row - min_row + 1
    n_cols = max_col - min_col + 1
    grid = [[None] * n_cols for _ in range(n_rows)]
    for row in range(min_row, max_row + 1):
        for col in range(min_col, max_col + 1):
            # Fall back to the cell itself when it is not part of a merged range.
            src_row, src_col = merged.get((row, col), (row, col))
            grid[row - min_row][col - min_col] = ws.cell(src_row, src_col).value
    return grid
def row_vals_from_grid(grid, r, min_row):
    """Fetch the grid row for absolute sheet row *r* (grid rows start at *min_row*)."""
    offset = r - min_row
    return grid[offset]
def is_empty_row_vals(vals):
    """True when every cell in *vals* is None or the empty string."""
    for v in vals:
        if v not in (None, ""):
            return False
    return True
def is_title_like_row_vals(vals, total_cols=20):
    """Heuristic: row looks like a table title/caption, not headers or data."""
    filled = _nonempty(vals)
    if not filled:
        return False
    if len(filled) == 1:
        return True
    # A sparse row made only of long strings reads as a caption across merges.
    if len(filled) / max(1, total_cols) <= 0.2:
        if all(isinstance(v, str) and len(str(v)) > 20 for v in filled):
            return True
    if len({str(v).strip().lower() for v in filled}) == 1:
        return True
    # Caption phrases observed in source workbooks.
    block = {"local currency unit per us dollar","exchange rate","average annual exchange rate"}
    return any(str(v).strip().lower() in block for v in filled)
def is_header_candidate_row_vals(vals, total_cols=20):
    """Heuristic: row plausibly holds column headers (labels, year runs, ...)."""
    filled = _nonempty(vals)
    if not filled:
        return False
    if is_title_like_row_vals(vals, total_cols):
        return False
    n_num = sum(1 for v in filled if _is_numlike(v))
    n_year = sum(1 for v in filled if _is_year_token(v))
    has_text = any(not _is_numlike(v) for v in filled)
    if n_year >= 2 and has_text:
        return True
    # A mostly-numeric row only qualifies when its numbers are mostly years.
    if n_num >= max(2, len(filled) - n_num):
        return n_year >= max(2, int(0.6 * len(filled)))
    labels = {str(v).strip().lower() for v in filled if not _is_numlike(v)}
    return len(filled) >= 2 or len(labels) >= 2
def detect_tables_fast(ws: Worksheet, grid, min_row, max_row, min_col, max_col):
    """Scan the value grid and return table blocks as dicts with keys
    header_rows (absolute row indices), data_start, data_end, title_text.

    Blocks are split only at fully empty rows; pivot/title/footer rows are
    excluded. Returns [] immediately for pivot/chart-like sheets.
    """
    blocks = []
    if is_pivot_or_chart_sheet(ws): return blocks
    total_cols = max_col - min_col + 1
    r = min_row
    while r <= max_row:
        vals = row_vals_from_grid(grid, r, min_row)
        # Skip rows that cannot start a data block.
        if is_empty_row_vals(vals) or is_title_like_row_vals(vals, total_cols) or is_pivot_row(vals):
            r += 1; continue
        if not is_probably_data(vals, total_cols):
            r += 1; continue
        data_start = r
        header_rows = []
        # Walk upward from the first data row to find a contiguous run of
        # header-candidate rows (titles/pivot rows are stepped over).
        up = data_start - 1
        while up >= min_row:
            vup = row_vals_from_grid(grid, up, min_row)
            if is_empty_row_vals(vup): break
            if is_title_like_row_vals(vup, total_cols) or is_pivot_row(vup):
                up -= 1; continue
            if is_header_candidate_row_vals(vup, total_cols):
                header_rows = []
                hdr_row = up
                # Collect consecutive header rows, topmost first.
                while hdr_row >= min_row:
                    hdr_vals = row_vals_from_grid(grid, hdr_row, min_row)
                    if is_empty_row_vals(hdr_vals): break
                    if is_header_candidate_row_vals(hdr_vals, total_cols):
                        header_rows.insert(0, hdr_row); hdr_row -= 1
                    else: break
            # Stop the upward scan whether or not a header run was found.
            break
        # Extend the data region downward until an empty/footer/pivot row.
        data_end = data_start
        rr = data_start + 1
        while rr <= max_row:
            v = row_vals_from_grid(grid, rr, min_row)
            if is_probably_footer(v) or is_pivot_row(v): break
            if is_empty_row_vals(v): break
            if is_probably_data(v, total_cols) or is_header_candidate_row_vals(v, total_cols):
                data_end = rr
            rr += 1
        # Look up to three rows above the headers for a title-like caption.
        title_text = None
        if header_rows:
            top = header_rows[0]
            for tr in range(max(min_row, top-3), top):
                tv = row_vals_from_grid(grid, tr, min_row)
                if is_title_like_row_vals(tv, total_cols):
                    first = next((str(x).strip() for x in tv if x not in (None,"")), None)
                    if first: title_text = first
                    break
        # Keep the block when it has headers or spans at least two data rows.
        if (header_rows or data_end - data_start >= 1) and data_start <= data_end:
            blocks.append({"header_rows": header_rows, "data_start": data_start, "data_end": data_end, "title_text": title_text})
        # Resume scanning after the block, skipping the empty separator rows.
        r = data_end + 1
        while r <= max_row and is_empty_row_vals(row_vals_from_grid(grid, r, min_row)):
            r += 1
    return blocks
def expand_headers_from_grid(grid, header_rows, min_row, min_col, eff_max_col):
    """Build a header matrix (one list of strings per header row), forward-filling
    blank cells from the left so merged parent labels replicate across columns."""
    matrix = []
    for hdr in header_rows:
        source = row_vals_from_grid(grid, hdr, min_row)
        row = []
        for c in range(eff_max_col):
            cell = source[c]
            row.append("" if cell is None else str(cell).strip())
        carried = ""
        for i, cell in enumerate(row):
            if cell == "" and i > 0:
                row[i] = carried
            else:
                carried = cell
        matrix.append(row)
    return matrix
def sheet_block_to_df_fast(ws, grid, min_row, max_row, min_col, max_col, header_rows, data_start, data_end):
    """Convert one detected table block into (DataFrame, header_matrix, kept_cols).

    Recovers a header row when detection found none, trims unused trailing
    columns, drops all-empty columns, and heuristically renames columns that
    ended up as 'unnamed_column*'. Returns an empty DataFrame when no usable
    headers or data rows remain.
    """
    import pandas as pd
    total_cols = max_col - min_col + 1
    # Fallback 1: the row directly above the data may be an undetected header.
    if (not header_rows) and data_start and data_start > min_row:
        prev = row_vals_from_grid(grid, data_start - 1, min_row)
        if is_header_candidate_row_vals(prev, total_cols):
            header_rows = [data_start - 1]
    # Fallback 2: the first "data" row may itself be the header.
    if (not header_rows) and data_start:
        cur = row_vals_from_grid(grid, data_start, min_row)
        nxt = row_vals_from_grid(grid, data_start + 1, min_row) if data_start + 1 <= max_row else []
        if is_header_candidate_row_vals(cur, total_cols) and is_probably_data(nxt, total_cols):
            header_rows = [data_start]; data_start += 1
    if not header_rows or data_start is None or data_end is None or data_end < data_start:
        import pandas as _pd
        return _pd.DataFrame(), [], []
    def used_upto_col():
        # Rightmost column actually used by headers or data (1-based width).
        maxc = 0
        for r in list(header_rows) + list(range(data_start, data_end+1)):
            vals = row_vals_from_grid(grid, r, min_row)
            for c_off in range(total_cols):
                v = vals[c_off]
                if v not in (None, ""): maxc = max(maxc, c_off+1)
        return maxc or total_cols
    eff_max_col = used_upto_col()
    header_mat = expand_headers_from_grid(grid, header_rows, min_row, min_col, eff_max_col)
    def is_title_level(values):
        # A header level that is sparse, or dominated by a single repeated
        # label, is a title band and should not contribute to column names.
        total = len(values)
        filled = [str(v).strip() for v in values if v not in (None, "")]
        if total == 0: return False
        coverage = len(filled) / total
        if coverage <= 0.2 and len(filled) <= 2: return True
        if filled:
            uniq = {v.lower() for v in filled}
            if len(uniq) == 1:
                label = next(iter(uniq))
                dom = sum(1 for v in values if isinstance(v,str) and v.strip().lower() == label)
                if dom / total >= 0.6: return True
        return False
    usable_levels = [i for i in range(len(header_mat)) if not is_title_level(header_mat[i])]
    # If every level looks like a title, fall back to the bottom-most one.
    if not usable_levels and header_mat: usable_levels = [len(header_mat) - 1]
    cols = []
    for c_off in range(eff_max_col):
        parts = [header_mat[l][c_off] for l in range(usable_levels[0], usable_levels[-1]+1)] if usable_levels else []
        cols.append(compose_col(parts))
    cols = ensure_unique([clean_col_name(x) for x in cols])
    data_rows = []
    for r in range(data_start, data_end+1):
        vals = row_vals_from_grid(grid, r, min_row)
        row = [vals[c_off] for c_off in range(eff_max_col)]
        # Stop at the first footer-looking row inside the block.
        if is_probably_footer(row): break
        data_rows.append(row[:len(cols)])
    if not data_rows:
        import pandas as _pd
        return _pd.DataFrame(columns=cols), header_mat, cols
    # Drop columns that are empty in every data row.
    keep_mask = [any(row[i] not in (None, "") for row in data_rows) for i in range(len(cols))]
    kept_cols = [c for c,k in zip(cols, keep_mask) if k]
    trimmed_rows = [[v for v,k in zip(row, keep_mask) if k] for row in data_rows]
    import pandas as pd
    df = pd.DataFrame(trimmed_rows, columns=kept_cols)
    # Heuristically rename leftover 'unnamed_column*' columns from their values.
    if any(str(c).startswith("unnamed_column") for c in df.columns):
        new_names = list(df.columns)
        for idx, name in enumerate(list(df.columns)):
            if not str(name).startswith("unnamed_column"): continue
            samples = _samples_for_column(trimmed_rows, idx, max_items=20)
            guess = _heuristic_infer_col_name(samples)
            if guess: new_names[idx] = clean_col_name(guess)
        df.columns = ensure_unique([clean_col_name(x) for x in new_names])
    return df, header_mat, kept_cols
| # ------------------------- Optional LLM title inference ------------------------- | |
| def _llm_infer_table_title(header_mat, sample_rows, sheet_name): | |
| if os.environ.get("EXCEL_LLM_INFER","0") != "1": return None | |
| api_key = os.environ.get("OPENAI_API_KEY") | |
| if not api_key: return None | |
| headers = [] | |
| if header_mat: | |
| for c in range(len(header_mat[0])): | |
| parts = [header_mat[l][c] for l in range(len(header_mat))] | |
| parts = [p for p in parts if p] | |
| if parts: headers.append("_".join(parts)) | |
| headers = headers[:10] | |
| samples = [[str(x) for x in r[:6]] for r in sample_rows[:5]] | |
| prompt = ( | |
| "Propose a short, human-readable title for a data table.\n" | |
| "Keep it 3-6 words, Title Case, no punctuation at the end.\n" | |
| f"Sheet: {sheet_name}\nHeaders: {headers}\nRow samples: {samples}\n" | |
| "Answer with JSON: {\"title\": \"...\"}" | |
| ) | |
| try: | |
| from openai import OpenAI | |
| client = OpenAI(api_key=api_key) | |
| resp = client.chat.completions.create( | |
| model=os.environ.get("OPENAI_MODEL","gpt-4o-mini"), | |
| messages=[{"role":"user","content":prompt}], temperature=0.2, | |
| ) | |
| text = resp.choices[0].message.content.strip() | |
| except Exception: | |
| return None | |
| import re as _re, json as _json | |
| m = _re.search(r"\{.*\}", text, re.S) | |
| if not m: return None | |
| try: | |
| obj = _json.loads(m.group(0)); title = obj.get("title","").strip() | |
| return title or None | |
| except Exception: return None | |
| def _heuristic_table_title(header_mat, sheet_name, idx): | |
| if header_mat: | |
| parts = [] | |
| levels = len(header_mat) | |
| cols = len(header_mat[0]) if header_mat else 0 | |
| for c in range(min(6, cols)): | |
| colparts = [header_mat[l][c] for l in range(min(levels, 2)) if header_mat[l][c]] | |
| if colparts: parts.extend(colparts) | |
| if parts: | |
| base = " ".join(dict.fromkeys(parts)) | |
| return base[:60] | |
| return f"{sheet_name} Table {idx}" | |
def infer_table_title(header_mat, sample_rows, sheet_name, idx):
    """Prefer an LLM-inferred title when enabled; otherwise use the heuristic one."""
    fallback = _heuristic_table_title(header_mat, sheet_name, idx)
    inferred = _llm_infer_table_title(header_mat, sample_rows, sheet_name)
    return inferred if inferred else fallback
| # ------------------------- Unified lineage helpers ------------------------- | |
# Names of the unified lineage tables shared by Excel and CSV ingestion paths.
FILE_SCHEMA_TABLE = "__file_schema"
FILE_TABLES_TABLE = "__file_tables"
def ensure_lineage_tables(con):
    """Create both lineage tables if they do not exist yet (idempotent)."""
    schema_ddl = f"""
    CREATE TABLE IF NOT EXISTS {FILE_SCHEMA_TABLE} (
        file_name TEXT,
        sheet_name TEXT,
        table_name TEXT,
        column_ordinal INTEGER,
        original_name TEXT,
        sql_column TEXT
    )
    """
    tables_ddl = f"""
    CREATE TABLE IF NOT EXISTS {FILE_TABLES_TABLE} (
        file_name TEXT,
        sheet_name TEXT,
        table_name TEXT,
        block_index INTEGER,
        start_row INTEGER,
        end_row INTEGER,
        header_rows_json TEXT,
        inferred_title TEXT,
        original_title_text TEXT
    )
    """
    con.execute(schema_ddl)
    con.execute(tables_ddl)
def record_table_schema(con, file_name, sheet_name, table_name, columns):
    """Replace the lineage rows describing *table_name*'s column mapping.

    columns: list of tuples (column_ordinal, original_name, sql_column)
    """
    ensure_lineage_tables(con)
    # DuckDB doesn't support `IS ?` with NULL; branch the delete
    if sheet_name is None:
        delete_sql = f"DELETE FROM {FILE_SCHEMA_TABLE} WHERE file_name = ? AND sheet_name IS NULL AND table_name = ?"
        params = [file_name, table_name]
    else:
        delete_sql = f"DELETE FROM {FILE_SCHEMA_TABLE} WHERE file_name = ? AND sheet_name = ? AND table_name = ?"
        params = [file_name, sheet_name, table_name]
    con.execute(delete_sql, params)
    rows = [
        (file_name, sheet_name, table_name, ordinal, original, sql_col)
        for (ordinal, original, sql_col) in columns
    ]
    con.executemany(
        f"INSERT INTO {FILE_SCHEMA_TABLE} (file_name, sheet_name, table_name, column_ordinal, original_name, sql_column) VALUES (?, ?, ?, ?, ?, ?)",
        rows,
    )
def record_table_block(con, file_name, sheet_name, table_name, block_index, start_row, end_row, header_rows_json, inferred_title, original_title_text):
    """Replace the lineage row describing one detected table block."""
    ensure_lineage_tables(con)
    blk = int(block_index) if block_index is not None else 0
    # DuckDB doesn't support `IS ?` with NULL; branch the delete
    if sheet_name is None:
        con.execute(
            f"DELETE FROM {FILE_TABLES_TABLE} WHERE file_name = ? AND sheet_name IS NULL AND table_name = ? AND block_index = ?",
            [file_name, table_name, blk],
        )
    else:
        con.execute(
            f"DELETE FROM {FILE_TABLES_TABLE} WHERE file_name = ? AND sheet_name = ? AND table_name = ? AND block_index = ?",
            [file_name, sheet_name, table_name, blk],
        )
    con.execute(
        f"""INSERT INTO {FILE_TABLES_TABLE}
        (file_name, sheet_name, table_name, block_index, start_row, end_row, header_rows_json, inferred_title, original_title_text)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        [
            file_name, sheet_name, table_name,
            blk,
            int(start_row) if start_row is not None else None,
            int(end_row) if end_row is not None else None,
            header_rows_json, inferred_title, original_title_text,
        ],
    )
| # --- block coalescing to avoid nested/overlapping duplicates --- | |
def coalesce_blocks(blocks: List[Dict]) -> List[Dict]:
    """Keep only maximal blocks by data row range; drop fully nested ones.

    Fix: blocks are sorted by (data_start, -data_end) so the widest block with
    a given start comes first. The original (data_start, data_end) ordering
    visited the *smaller* block first when two blocks shared data_start, so
    the nested block was kept alongside its container.
    """
    if not blocks:
        return blocks
    ordered = sorted(blocks, key=lambda b: (b["data_start"], -b["data_end"]))
    result = []
    for b in ordered:
        if any(b["data_start"] >= kept["data_start"] and b["data_end"] <= kept["data_end"] for kept in result):
            continue  # fully contained in an already-kept block -> drop
        result.append(b)
    return result
| # ------------------------- Persistence: Excel ------------------------- | |
def persist(excel_path, duckdb_path):
    """Ingest every detected table block of an Excel workbook into DuckDB.

    Creates one DuckDB table per block, skips pivot/chart sheets and duplicate
    content, and records column + block lineage in the unified lineage tables.
    Exits the process on missing dependencies or unreadable input.
    """
    try:
        from duckdb import connect
    except ImportError:
        print("Error: DuckDB library not installed. Install with: pip install duckdb"); sys.exit(1)
    try:
        # data_only=True reads cached formula results instead of formulas.
        wb = load_workbook(excel_path, data_only=True)
    except FileNotFoundError:
        print(f"Error: Excel file not found: {excel_path}"); sys.exit(1)
    except Exception as e:
        print(f"Error loading Excel file: {e}"); sys.exit(1)
    file_name = Path(excel_path).name
    db_path = Path(duckdb_path)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    new_db = not db_path.exists()
    con = connect(str(db_path))
    if new_db: print(f"Created new DuckDB at: {db_path}")
    # Ensure unified lineage tables exist
    ensure_lineage_tables(con)
    used_names = set(); total_tables = 0; total_rows = 0
    for sheet in wb.sheetnames:
        ws = wb[sheet]
        try:
            # Chartsheets are not Worksheet instances and carry no cell data.
            if not isinstance(ws, Worksheet):
                print(f"Skipping chartsheet: {sheet}"); continue
        except Exception: pass
        if is_pivot_or_chart_sheet(ws):
            print(f"Skipping pivot/chart-like sheet: {sheet}"); continue
        min_row, max_row, min_col, max_col = used_bounds(ws)
        if max_row < min_row: continue  # empty sheet sentinel (1,0,1,0)
        grid = build_value_grid(ws, min_row, max_row, min_col, max_col)
        blocks = detect_tables_fast(ws, grid, min_row, max_row, min_col, max_col)
        blocks = coalesce_blocks(blocks)
        if not blocks: continue
        # per-sheet content hash set to avoid identical duplicate content
        seen_content = set()
        for idx, blk in enumerate(blocks, start=1):
            df, header_mat, kept_cols = sheet_block_to_df_fast(
                ws, grid, min_row, max_row, min_col, max_col,
                blk["header_rows"], blk["data_start"], blk["data_end"]
            )
            if df.empty: continue
            # Content hash (stable CSV representation)
            csv_bytes = df.to_csv(index=False).encode("utf-8")
            h = hashlib.sha256(csv_bytes).hexdigest()
            if h in seen_content:
                print(f"Skipping duplicate content on sheet {sheet} (block {idx})")
                continue
            seen_content.add(h)
            # Build original composite header names for lineage mapping
            original_cols = []
            if header_mat:
                levels = len(header_mat)
                cols = len(header_mat[0]) if header_mat else 0
                for c in range(cols):
                    parts = [header_mat[l][c] for l in range(levels)]
                    original_cols.append("_".join([p for p in parts if p]))
            else:
                original_cols = list(df.columns)
            # Pad so the lineage zip below covers every DataFrame column.
            while len(original_cols) < len(df.columns): original_cols.append("unnamed")
            title_orig = blk.get("title_text")
            title = title_orig or infer_table_title(header_mat, df.values.tolist(), sheet, idx)
            candidate = title if title else f"{sheet} Table {idx}"
            table = ensure_unique_table_name(used_names, candidate)
            # Create/replace table
            con.execute(f'DROP TABLE IF EXISTS "{table}"')
            con.register(f"{table}_temp", df)
            con.execute(f'CREATE TABLE "{table}" AS SELECT * FROM {table}_temp')
            con.unregister(f"{table}_temp")
            # Record lineage (schema + block)
            schema_rows = []
            for cidx, (orig, sqlc) in enumerate(zip(original_cols[:len(df.columns)], df.columns), start=1):
                schema_rows.append((cidx, str(orig), str(sqlc)))
            record_table_schema(
                con,
                file_name=file_name,
                sheet_name=sheet,
                table_name=table,
                columns=schema_rows,
            )
            record_table_block(
                con,
                file_name=file_name,
                sheet_name=sheet,
                table_name=table,
                block_index=idx,
                start_row=int(blk["data_start"]),
                end_row=int(blk["data_end"]),
                header_rows_json=json.dumps(blk["header_rows"]),
                inferred_title=title if title else None,
                original_title_text=title_orig if title_orig else None,
            )
            print(f"Created table {table} from sheet {sheet} with {len(df)} rows and {len(df.columns)} columns.")
            total_tables += 1; total_rows += len(df)
    con.close()
    print(f"""\n✅ Completed.
- Created {total_tables} tables with {total_rows} total rows
- Column lineage: {FILE_SCHEMA_TABLE}
- Block metadata: {FILE_TABLES_TABLE}""")
| # ------------------------- Persistence: CSV ------------------------- | |
def persist_csv(csv_path, duckdb_path):
    """
    Ingest a single CSV file into DuckDB AND write lineage, aligned with Excel.
    - First row is headers.
    - One table named from the CSV file name.
    - Cleans headers and ensures uniqueness.
    - Records lineage in __file_schema and __file_tables using the unified schema (with file_name).
    """
    import pandas as pd
    from duckdb import connect
    csv_path = Path(csv_path)
    if not csv_path.exists():
        print(f"Error: CSV file not found: {csv_path}")
        sys.exit(1)
    # Keep original header names for lineage before cleaning
    try:
        # utf-8-sig strips a BOM if present; retry with pandas' default on failure.
        df_raw = pd.read_csv(csv_path, header=0, encoding="utf-8-sig")
    except UnicodeDecodeError:
        df_raw = pd.read_csv(csv_path, header=0)
    original_headers = list(df_raw.columns)
    # Clean/normalize column names
    def _clean_hdr(s):
        # Collapse internal whitespace before snake_casing the header.
        s = str(s) if s is not None else ""
        s = s.strip()
        s = re.sub(r"\s+", " ", s)
        return clean_col_name(s)
    cleaned_cols = ensure_unique([_clean_hdr(c) for c in original_headers])
    df = df_raw.copy()
    df.columns = cleaned_cols
    # Compute table name from file name
    table = sanitize_table_name(csv_path.stem)
    # Open / create DuckDB
    db_path = Path(duckdb_path)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    new_db = not db_path.exists()
    con = connect(str(db_path))
    if new_db:
        print(f"Created new DuckDB at: {db_path}")
    # Ensure unified lineage tables (with file_name) exist
    ensure_lineage_tables(con)
    # Create/replace the data table
    con.execute(f'DROP TABLE IF EXISTS "{table}"')
    con.register(f"{table}_temp_df", df)
    con.execute(f'CREATE TABLE "{table}" AS SELECT * FROM {table}_temp_df')
    con.unregister(f"{table}_temp_df")
    # Write lineage
    file_name = csv_path.name
    sheet_name = None  # CSV has no sheet
    block_index = 1  # single block/table for CSV
    start_row = 2  # header is row 1, data starts at 2
    end_row = len(df) + 1  # header + data rows
    header_rows_json = "[1]"  # header row index list as JSON
    inferred_title = None
    original_title_text = None
    # Map original->sql columns
    schema_rows = []
    for i, (orig, sql) in enumerate(zip(original_headers, cleaned_cols), start=1):
        schema_rows.append((i, str(orig), str(sql)))
    record_table_schema(
        con,
        file_name=file_name,
        sheet_name=sheet_name,
        table_name=table,
        columns=schema_rows
    )
    record_table_block(
        con,
        file_name=file_name,
        sheet_name=sheet_name,
        table_name=table,
        block_index=block_index,
        start_row=start_row,
        end_row=end_row,
        header_rows_json=header_rows_json,
        inferred_title=inferred_title,
        original_title_text=original_title_text
    )
    print(f'Created table {table} from CSV "{csv_path.name}" with {len(df)} rows and {len(df.columns)} columns.')
    con.close()
| # ------------------------- CLI ------------------------- | |
def ensure_unique_table_name(existing: set, name: str) -> str:
    """Sanitize *name*, suffix it with _2, _3, ... until unused, and record it
    in *existing* (mutated in place)."""
    base = sanitize_table_name(name) or "table"
    candidate = base
    suffix = 2
    while candidate in existing:
        candidate = f"{base}_{suffix}"
        suffix += 1
    existing.add(candidate)
    return candidate
def main():
    """CLI entry point: route --file to the Excel or CSV ingestion path."""
    import argparse
    parser = argparse.ArgumentParser(description="Excel/CSV → DuckDB (unified --file + lineage).")
    parser.add_argument("--file", required=True, help="Path to .xlsx/.xlsm/.xls or .csv")
    parser.add_argument("--duckdb", required=True, help="Path to DuckDB file")
    args = parser.parse_args()
    if not os.path.exists(args.file):
        print(f"Error: file not found: {args.file}")
        sys.exit(1)
    suffix = Path(args.file).suffix.lower()
    if suffix == ".csv":
        persist_csv(args.file, args.duckdb)
    elif suffix in (".xlsx", ".xlsm", ".xls"):
        persist(args.file, args.duckdb)
    else:
        print("Error: unsupported file type. Use .xlsx/.xlsm/.xls or .csv")
        sys.exit(2)
if __name__ == "__main__":
    main()