""" Excel/CSV → DuckDB ingestion (generic, robust, multi-table, unified lineage) - Supports Excel (.xlsx/.xlsm/.xls) and CSV (first row = headers) - Hierarchical headers with merged-cell parent context (titles removed) - Merged rows/cols resolved to master (top-left) value for consistent replication - Multiple tables detected ONLY when separated by at least one completely empty row - Footer detection (ignore trailing notes/summaries) - Pivot detection (skip pivot-looking rows; optional sheet-level pivot/charthood skip) - Optional LLM inference for unnamed columns and table titles (EXCEL_LLM_INFER=1) - One DuckDB table per detected table block (Excel) or per file (CSV) - Unified lineage tables for BOTH Excel and CSV: __file_schema (file_name, sheet_name, table_name, column_ordinal, original_name, sql_column) __file_tables (file_name, sheet_name, table_name, block_index, start_row, end_row, header_rows_json, inferred_title, original_title_text) Usage: python source_to_duckdb.py --file /path/file.xlsx --duckdb /path/out.duckdb python source_to_duckdb.py --file /path/file.csv --duckdb /path/out.duckdb """ import os import re import sys import json import hashlib from pathlib import Path from typing import List, Tuple, Dict from openpyxl import load_workbook from openpyxl.worksheet.worksheet import Worksheet # ------------------------- Small utilities ------------------------- def _nonempty(vals): return [v for v in vals if v not in (None, "")] def _is_numlike(x): if isinstance(x, (int, float)): return True s = str(x).strip().replace(",", "") if s.endswith("%"): s = s[:-1] if not s: return False if any(c.isalpha() for c in s): return False try: float(s); return True except: return False def _is_year_token(x): if isinstance(x, int) and 1800 <= x <= 2100: return True s = str(x).strip() return s.isdigit() and 1800 <= int(s) <= 2100 def sanitize_table_name(name: str) -> str: t = re.sub(r"[^\w]", "_", str(name)) t = re.sub(r"_+", "_", t).strip("_") if t and not 
t[0].isalpha(): t = "table_" + t return t or "sheet_data" def clean_col_name(s: str) -> str: s = re.sub(r"[^\w\s%#‰]", "", str(s).strip()) s = s.replace("%"," pct").replace("‰"," permille").replace("#"," count ") s = re.sub(r"\s+"," ", s) s = re.sub(r"\s+","_", s) s = re.sub(r"_+","_", s).strip("_") if s and s[0].isdigit(): s = "col_" + s return s or "unnamed_column" def ensure_unique(names): seen = {}; out = [] for n in names: base = (n or "unnamed_column").lower() if base not in seen: seen[base] = 0; out.append(n) else: i = seen[base] + 1 while f"{n}_{i}".lower() in seen: i += 1 seen[base] = i; out.append(f"{n}_{i}") seen[(f"{n}_{i}").lower()] = 0 return out def compose_col(parts): cleaned = []; prev = None for p in parts: if not p: continue p_norm = str(p).strip() if prev is not None and p_norm.lower() == prev.lower(): continue cleaned.append(p_norm); prev = p_norm if not cleaned: return "" return clean_col_name("_".join(cleaned)) # ------------------------- Heuristics & detection ------------------------- def is_probably_footer(cells): nonempty = [(i, v) for i, v in enumerate(cells) if v not in (None, "")] if not nonempty: return False if len(nonempty) <= 2: text = " ".join(str(v) for _, v in nonempty).strip().lower() if any(text.startswith(k) for k in ["note","notes","source","summary","disclaimer"]): return True if len(text) > 50: return True return False def is_probably_data(cells, num_cols): vals = [v for v in cells if v not in (None, "")] if not vals: return False nums_list = [v for v in vals if _is_numlike(v)] num_num = len(nums_list); num_text = len(vals) - num_num density = len(vals) / max(1, num_cols) if num_num >= 2 and all(_is_year_token(v) for v in nums_list) and num_text >= 2: return False if num_num >= max(2, num_text): return True if density >= 0.6 and num_num >= 2: return True first = str(vals[0]).strip().lower() if vals else "" if first in ("total","totals","grand total"): return True return False PIVOT_MARKERS = {"row labels","column 
labels","values","grand total","report filter","filters","∑ values","σ values","Σ values"} def is_pivot_marker_string(s: str) -> bool: if not s: return False t = str(s).strip().lower() if t in PIVOT_MARKERS: return True if t.startswith(("sum of ","count of ","avg of ","average of ")): return True if t.endswith(" total") or t.startswith("total "): return True return False def is_pivot_row(cells) -> bool: text_cells = [str(v).strip() for v in cells if v not in (None, "")] if not text_cells: return False if any(is_pivot_marker_string(x) for x in text_cells): return True agg_hits = sum(1 for x in text_cells if x.lower().startswith(("sum of","count of","avg of","average of","min of","max of"))) return agg_hits >= 2 def is_pivot_or_chart_sheet(ws: Worksheet) -> bool: try: if getattr(ws, "_charts", None): return True except Exception: pass if hasattr(ws, "_pivots") and getattr(ws, "_pivots"): return True scan_rows = min(ws.max_row, 40); scan_cols = min(ws.max_column, 20) pivotish = 0 for r in range(1, scan_rows+1): row = [ws.cell(r,c).value for c in range(1, scan_cols+1)] if is_pivot_row(row): pivotish += 1 if pivotish >= 2: return True name = (ws.title or "").lower() if any(k in name for k in ("pivot","dashboard","chart","charts")): return True return False def _samples_for_column(rows, col_idx, max_items=20): vals = [] for row in rows: if col_idx < len(row): v = row[col_idx] if v not in (None, ""): vals.append(v) if len(vals) >= max_items: break return vals def _heuristic_infer_col_name(samples): if not samples: return None if sum(1 for v in samples if _is_year_token(v)) >= max(2, int(0.8*len(samples))): return "year" pct_hits = 0 for v in samples: s = str(v).strip() if s.endswith("%"): pct_hits += 1 else: try: f = float(s.replace(",","")) if 0 <= f <= 1.0 or 0 <= f <= 100: pct_hits += 0.5 except: pass if pct_hits >= max(2, int(0.7*len(samples))): return "percentage" if sum(1 for v in samples if _is_numlike(v)) >= max(3, int(0.7*len(samples))): intish = 0 for v in 
samples: try: if float(str(v).replace(",","")) == int(float(str(v).replace(",",""))): intish += 1 except: pass if intish >= max(2, int(0.6*len(samples))): return "count" return "value" uniq = {str(v).strip().lower() for v in samples} if len(uniq) <= 3 and max(len(str(v)) for v in samples) >= 30: return "question" if sum(1 for v in samples if re.search(r"\d", str(v)) and ("-" in str(v) or "–" in str(v))) >= max(2, int(0.6*len(samples))): return "range" if len(uniq) < max(5, int(0.5*len(samples))): return "category" return None def used_bounds(ws: Worksheet) -> Tuple[int,int,int,int]: min_row, max_row, min_col, max_col = None, 0, None, 0 for r in ws.iter_rows(): for c in r: v = c.value if v is not None and str(v).strip() != "": if min_row is None or c.row < min_row: min_row = c.row if c.row > max_row: max_row = c.row if min_col is None or c.column < min_col: min_col = c.column if c.column > max_col: max_col = c.column if min_row is None: return 1,0,1,0 return min_row, max_row, min_col, max_col def build_merged_master_map(ws: Worksheet): mapping = {} for mr in ws.merged_cells.ranges: min_col, min_row, max_col, max_row = mr.min_col, mr.min_row, mr.max_col, mr.max_row master = (min_row, min_col) for r in range(min_row, max_row+1): for c in range(min_col, max_col+1): mapping[(r,c)] = master return mapping def build_value_grid(ws: Worksheet, min_row: int, max_row: int, min_col: int, max_col: int): merged_map = build_merged_master_map(ws) nrows = max_row - min_row + 1; ncols = max_col - min_col + 1 grid = [[None]*ncols for _ in range(nrows)] for r in range(min_row, max_row+1): rr = r - min_row for c in range(min_col, max_col+1): cc = c - min_col master = merged_map.get((r,c)) if master: mr, mc = master; grid[rr][cc] = ws.cell(mr, mc).value else: grid[rr][cc] = ws.cell(r, c).value return grid def row_vals_from_grid(grid, r, min_row): return grid[r - min_row] def is_empty_row_vals(vals): return not any(v not in (None, "") for v in vals) def is_title_like_row_vals(vals, 
total_cols=20): vals_ne = _nonempty(vals) if not vals_ne: return False if len(vals_ne) == 1: return True coverage = len(vals_ne) / max(1, total_cols) if coverage <= 0.2 and all(isinstance(v,str) and len(str(v))>20 for v in vals_ne): return True uniq = {str(v).strip().lower() for v in vals_ne} if len(uniq) == 1: return True block = {"local currency unit per us dollar","exchange rate","average annual exchange rate"} if any(str(v).strip().lower() in block for v in vals_ne): return True return False def is_header_candidate_row_vals(vals, total_cols=20): vals_ne = _nonempty(vals) if not vals_ne: return False if is_title_like_row_vals(vals, total_cols): return False nums = sum(1 for v in vals_ne if _is_numlike(v)) years = sum(1 for v in vals_ne if _is_year_token(v)) has_text = any(not _is_numlike(v) for v in vals_ne) if years >= 2 and has_text: return True if nums >= max(2, len(vals_ne)-nums): return years >= max(2, int(0.6*len(vals_ne))) uniq_labels = {str(v).strip().lower() for v in vals_ne if not _is_numlike(v)} return (len(vals_ne) >= 2) or (len(uniq_labels) >= 2) def detect_tables_fast(ws: Worksheet, grid, min_row, max_row, min_col, max_col): blocks = [] if is_pivot_or_chart_sheet(ws): return blocks total_cols = max_col - min_col + 1 r = min_row while r <= max_row: vals = row_vals_from_grid(grid, r, min_row) if is_empty_row_vals(vals) or is_title_like_row_vals(vals, total_cols) or is_pivot_row(vals): r += 1; continue if not is_probably_data(vals, total_cols): r += 1; continue data_start = r header_rows = [] up = data_start - 1 while up >= min_row: vup = row_vals_from_grid(grid, up, min_row) if is_empty_row_vals(vup): break if is_title_like_row_vals(vup, total_cols) or is_pivot_row(vup): up -= 1; continue if is_header_candidate_row_vals(vup, total_cols): header_rows = [] hdr_row = up while hdr_row >= min_row: hdr_vals = row_vals_from_grid(grid, hdr_row, min_row) if is_empty_row_vals(hdr_vals): break if is_header_candidate_row_vals(hdr_vals, total_cols): 
header_rows.insert(0, hdr_row); hdr_row -= 1 else: break break data_end = data_start rr = data_start + 1 while rr <= max_row: v = row_vals_from_grid(grid, rr, min_row) if is_probably_footer(v) or is_pivot_row(v): break if is_empty_row_vals(v): break if is_probably_data(v, total_cols) or is_header_candidate_row_vals(v, total_cols): data_end = rr rr += 1 title_text = None if header_rows: top = header_rows[0] for tr in range(max(min_row, top-3), top): tv = row_vals_from_grid(grid, tr, min_row) if is_title_like_row_vals(tv, total_cols): first = next((str(x).strip() for x in tv if x not in (None,"")), None) if first: title_text = first break if (header_rows or data_end - data_start >= 1) and data_start <= data_end: blocks.append({"header_rows": header_rows, "data_start": data_start, "data_end": data_end, "title_text": title_text}) r = data_end + 1 while r <= max_row and is_empty_row_vals(row_vals_from_grid(grid, r, min_row)): r += 1 return blocks def expand_headers_from_grid(grid, header_rows, min_row, min_col, eff_max_col): if not header_rows: return [] mat = [] for r in header_rows: row_vals = row_vals_from_grid(grid, r, min_row) row = [("" if (row_vals[c] is None) else str(row_vals[c]).strip()) for c in range(0, eff_max_col)] last = "" for i in range(len(row)): if row[i] == "" and i > 0: row[i] = last else: last = row[i] mat.append(row) return mat def sheet_block_to_df_fast(ws, grid, min_row, max_row, min_col, max_col, header_rows, data_start, data_end): import pandas as pd total_cols = max_col - min_col + 1 if (not header_rows) and data_start and data_start > min_row: prev = row_vals_from_grid(grid, data_start - 1, min_row) if is_header_candidate_row_vals(prev, total_cols): header_rows = [data_start - 1] if (not header_rows) and data_start: cur = row_vals_from_grid(grid, data_start, min_row) nxt = row_vals_from_grid(grid, data_start + 1, min_row) if data_start + 1 <= max_row else [] if is_header_candidate_row_vals(cur, total_cols) and is_probably_data(nxt, 
total_cols): header_rows = [data_start]; data_start += 1 if not header_rows or data_start is None or data_end is None or data_end < data_start: import pandas as _pd return _pd.DataFrame(), [], [] def used_upto_col(): maxc = 0 for r in list(header_rows) + list(range(data_start, data_end+1)): vals = row_vals_from_grid(grid, r, min_row) for c_off in range(total_cols): v = vals[c_off] if v not in (None, ""): maxc = max(maxc, c_off+1) return maxc or total_cols eff_max_col = used_upto_col() header_mat = expand_headers_from_grid(grid, header_rows, min_row, min_col, eff_max_col) def is_title_level(values): total = len(values) filled = [str(v).strip() for v in values if v not in (None, "")] if total == 0: return False coverage = len(filled) / total if coverage <= 0.2 and len(filled) <= 2: return True if filled: uniq = {v.lower() for v in filled} if len(uniq) == 1: label = next(iter(uniq)) dom = sum(1 for v in values if isinstance(v,str) and v.strip().lower() == label) if dom / total >= 0.6: return True return False usable_levels = [i for i in range(len(header_mat)) if not is_title_level(header_mat[i])] if not usable_levels and header_mat: usable_levels = [len(header_mat) - 1] cols = [] for c_off in range(eff_max_col): parts = [header_mat[l][c_off] for l in range(usable_levels[0], usable_levels[-1]+1)] if usable_levels else [] cols.append(compose_col(parts)) cols = ensure_unique([clean_col_name(x) for x in cols]) data_rows = [] for r in range(data_start, data_end+1): vals = row_vals_from_grid(grid, r, min_row) row = [vals[c_off] for c_off in range(eff_max_col)] if is_probably_footer(row): break data_rows.append(row[:len(cols)]) if not data_rows: import pandas as _pd return _pd.DataFrame(columns=cols), header_mat, cols keep_mask = [any(row[i] not in (None, "") for row in data_rows) for i in range(len(cols))] kept_cols = [c for c,k in zip(cols, keep_mask) if k] trimmed_rows = [[v for v,k in zip(row, keep_mask) if k] for row in data_rows] import pandas as pd df = 
pd.DataFrame(trimmed_rows, columns=kept_cols) if any(str(c).startswith("unnamed_column") for c in df.columns): new_names = list(df.columns) for idx, name in enumerate(list(df.columns)): if not str(name).startswith("unnamed_column"): continue samples = _samples_for_column(trimmed_rows, idx, max_items=20) guess = _heuristic_infer_col_name(samples) if guess: new_names[idx] = clean_col_name(guess) df.columns = ensure_unique([clean_col_name(x) for x in new_names]) return df, header_mat, kept_cols # ------------------------- Optional LLM title inference ------------------------- def _llm_infer_table_title(header_mat, sample_rows, sheet_name): if os.environ.get("EXCEL_LLM_INFER","0") != "1": return None api_key = os.environ.get("OPENAI_API_KEY") if not api_key: return None headers = [] if header_mat: for c in range(len(header_mat[0])): parts = [header_mat[l][c] for l in range(len(header_mat))] parts = [p for p in parts if p] if parts: headers.append("_".join(parts)) headers = headers[:10] samples = [[str(x) for x in r[:6]] for r in sample_rows[:5]] prompt = ( "Propose a short, human-readable title for a data table.\n" "Keep it 3-6 words, Title Case, no punctuation at the end.\n" f"Sheet: {sheet_name}\nHeaders: {headers}\nRow samples: {samples}\n" "Answer with JSON: {\"title\": \"...\"}" ) try: from openai import OpenAI client = OpenAI(api_key=api_key) resp = client.chat.completions.create( model=os.environ.get("OPENAI_MODEL","gpt-4o-mini"), messages=[{"role":"user","content":prompt}], temperature=0.2, ) text = resp.choices[0].message.content.strip() except Exception: return None import re as _re, json as _json m = _re.search(r"\{.*\}", text, re.S) if not m: return None try: obj = _json.loads(m.group(0)); title = obj.get("title","").strip() return title or None except Exception: return None def _heuristic_table_title(header_mat, sheet_name, idx): if header_mat: parts = [] levels = len(header_mat) cols = len(header_mat[0]) if header_mat else 0 for c in range(min(6, cols)): 
colparts = [header_mat[l][c] for l in range(min(levels, 2)) if header_mat[l][c]] if colparts: parts.extend(colparts) if parts: base = " ".join(dict.fromkeys(parts)) return base[:60] return f"{sheet_name} Table {idx}" def infer_table_title(header_mat, sample_rows, sheet_name, idx): title = _heuristic_table_title(header_mat, sheet_name, idx) llm = _llm_infer_table_title(header_mat, sample_rows, sheet_name) return llm or title # ------------------------- Unified lineage helpers ------------------------- FILE_SCHEMA_TABLE = "__file_schema" FILE_TABLES_TABLE = "__file_tables" def ensure_lineage_tables(con): con.execute(f""" CREATE TABLE IF NOT EXISTS {FILE_SCHEMA_TABLE} ( file_name TEXT, sheet_name TEXT, table_name TEXT, column_ordinal INTEGER, original_name TEXT, sql_column TEXT ) """) con.execute(f""" CREATE TABLE IF NOT EXISTS {FILE_TABLES_TABLE} ( file_name TEXT, sheet_name TEXT, table_name TEXT, block_index INTEGER, start_row INTEGER, end_row INTEGER, header_rows_json TEXT, inferred_title TEXT, original_title_text TEXT ) """) def record_table_schema(con, file_name, sheet_name, table_name, columns): """ columns: list of tuples (column_ordinal, original_name, sql_column) """ ensure_lineage_tables(con) # DuckDB doesn't support `IS ?` with NULL; branch the delete if sheet_name is None: con.execute( f"DELETE FROM {FILE_SCHEMA_TABLE} WHERE file_name = ? AND sheet_name IS NULL AND table_name = ?", [file_name, table_name], ) else: con.execute( f"DELETE FROM {FILE_SCHEMA_TABLE} WHERE file_name = ? AND sheet_name = ? 
AND table_name = ?", [file_name, sheet_name, table_name], ) con.executemany( f"INSERT INTO {FILE_SCHEMA_TABLE} (file_name, sheet_name, table_name, column_ordinal, original_name, sql_column) VALUES (?, ?, ?, ?, ?, ?)", [(file_name, sheet_name, table_name, i, orig, sql) for (i, orig, sql) in columns], ) def record_table_block(con, file_name, sheet_name, table_name, block_index, start_row, end_row, header_rows_json, inferred_title, original_title_text): ensure_lineage_tables(con) # DuckDB doesn't support `IS ?` with NULL; branch the delete if sheet_name is None: con.execute( f"DELETE FROM {FILE_TABLES_TABLE} WHERE file_name = ? AND sheet_name IS NULL AND table_name = ? AND block_index = ?", [file_name, table_name, int(block_index) if block_index is not None else 0], ) else: con.execute( f"DELETE FROM {FILE_TABLES_TABLE} WHERE file_name = ? AND sheet_name = ? AND table_name = ? AND block_index = ?", [file_name, sheet_name, table_name, int(block_index) if block_index is not None else 0], ) con.execute( f"""INSERT INTO {FILE_TABLES_TABLE} (file_name, sheet_name, table_name, block_index, start_row, end_row, header_rows_json, inferred_title, original_title_text) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", [ file_name, sheet_name, table_name, int(block_index) if block_index is not None else 0, int(start_row) if start_row is not None else None, int(end_row) if end_row is not None else None, header_rows_json, inferred_title, original_title_text ] ) # --- block coalescing to avoid nested/overlapping duplicates --- def coalesce_blocks(blocks: List[Dict]) -> List[Dict]: """Keep only maximal non-overlapping blocks by data row range.""" if not blocks: return blocks blocks_sorted = sorted(blocks, key=lambda b: (b["data_start"], b["data_end"])) result = [] for b in blocks_sorted: if any(b["data_start"] >= x["data_start"] and b["data_end"] <= x["data_end"] for x in result): continue # fully contained -> drop result.append(b) return result # ------------------------- Persistence: Excel 
def _composite_header_names(header_mat, n_cols):
    """Return one original header label per column for lineage.

    Header levels are joined with ``_`` (empty levels skipped). The result is
    padded with ``"unnamed"`` up to *n_cols* entries.
    """
    names = []
    if header_mat:
        levels = len(header_mat)
        for c in range(len(header_mat[0])):
            parts = [header_mat[l][c] for l in range(levels)]
            names.append("_".join([p for p in parts if p]))
    while len(names) < n_cols:
        names.append("unnamed")
    return names


def persist(excel_path, duckdb_path):
    """Ingest every detected table of an Excel workbook into DuckDB.

    Creates (or replaces) one DuckDB table per detected block and records
    column and block lineage. Exits the process with status 1 when DuckDB is
    not installed or the workbook cannot be loaded.

    NOTE(review): the CLI also routes ``.xls`` files here; openpyxl reads the
    XML-based formats, so legacy binary workbooks are presumably reported via
    the "Error loading Excel file" path - confirm.
    """
    try:
        from duckdb import connect
    except ImportError:
        print("Error: DuckDB library not installed. Install with: pip install duckdb")
        sys.exit(1)

    try:
        wb = load_workbook(excel_path, data_only=True)
    except FileNotFoundError:
        print(f"Error: Excel file not found: {excel_path}")
        sys.exit(1)
    except Exception as e:
        print(f"Error loading Excel file: {e}")
        sys.exit(1)

    file_name = Path(excel_path).name
    db_path = Path(duckdb_path)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    new_db = not db_path.exists()
    con = connect(str(db_path))
    used_names = set()
    total_tables = 0
    total_rows = 0
    # try/finally so the database handle is released even when a sheet fails.
    try:
        if new_db:
            print(f"Created new DuckDB at: {db_path}")

        # Ensure unified lineage tables exist
        ensure_lineage_tables(con)

        for sheet in wb.sheetnames:
            ws = wb[sheet]
            try:
                if not isinstance(ws, Worksheet):
                    print(f"Skipping chartsheet: {sheet}")
                    continue
            except Exception:
                pass
            if is_pivot_or_chart_sheet(ws):
                print(f"Skipping pivot/chart-like sheet: {sheet}")
                continue

            min_row, max_row, min_col, max_col = used_bounds(ws)
            if max_row < min_row:
                continue

            grid = build_value_grid(ws, min_row, max_row, min_col, max_col)
            blocks = detect_tables_fast(ws, grid, min_row, max_row, min_col, max_col)
            blocks = coalesce_blocks(blocks)
            if not blocks:
                continue

            # per-sheet content hash set to avoid identical duplicate content
            seen_content = set()

            for idx, blk in enumerate(blocks, start=1):
                df, header_mat, kept_cols = sheet_block_to_df_fast(
                    ws, grid, min_row, max_row, min_col, max_col,
                    blk["header_rows"], blk["data_start"], blk["data_end"]
                )
                if df.empty:
                    continue

                # Content hash (stable CSV representation)
                csv_bytes = df.to_csv(index=False).encode("utf-8")
                h = hashlib.sha256(csv_bytes).hexdigest()
                if h in seen_content:
                    print(f"Skipping duplicate content on sheet {sheet} (block {idx})")
                    continue
                seen_content.add(h)

                # Build original composite header names for lineage mapping
                if header_mat:
                    original_cols = _composite_header_names(header_mat, len(df.columns))
                else:
                    original_cols = list(df.columns)

                title_orig = blk.get("title_text")
                # Title inference only looks at the first few rows, so avoid
                # converting the whole frame to Python lists.
                title = title_orig or infer_table_title(
                    header_mat, df.head(5).values.tolist(), sheet, idx
                )
                candidate = title if title else f"{sheet} Table {idx}"
                table = ensure_unique_table_name(used_names, candidate)

                # Create/replace table
                temp_view = f"{table}_temp"
                con.execute(f'DROP TABLE IF EXISTS "{table}"')
                con.register(temp_view, df)
                try:
                    con.execute(f'CREATE TABLE "{table}" AS SELECT * FROM "{temp_view}"')
                finally:
                    con.unregister(temp_view)

                # Record lineage (schema + block)
                schema_rows = [
                    (cidx, str(orig), str(sqlc))
                    for cidx, (orig, sqlc) in enumerate(
                        zip(original_cols[:len(df.columns)], df.columns), start=1
                    )
                ]
                record_table_schema(
                    con,
                    file_name=file_name,
                    sheet_name=sheet,
                    table_name=table,
                    columns=schema_rows,
                )
                record_table_block(
                    con,
                    file_name=file_name,
                    sheet_name=sheet,
                    table_name=table,
                    block_index=idx,
                    start_row=int(blk["data_start"]),
                    end_row=int(blk["data_end"]),
                    header_rows_json=json.dumps(blk["header_rows"]),
                    inferred_title=title if title else None,
                    original_title_text=title_orig if title_orig else None,
                )

                print(f"Created table {table} from sheet {sheet} with {len(df)} rows and {len(df.columns)} columns.")
                total_tables += 1
                total_rows += len(df)
    finally:
        con.close()

    print(f"""\n✅ Completed.
- Created {total_tables} tables with {total_rows} total rows
- Column lineage: {FILE_SCHEMA_TABLE}
- Block metadata: {FILE_TABLES_TABLE}""")


# ------------------------- Persistence: CSV -------------------------

def persist_csv(csv_path, duckdb_path):
    """
    Ingest a single CSV file into DuckDB AND write lineage, aligned with Excel.

    - First row is headers.
    - One table named from the CSV file name.
    - Cleans headers and ensures uniqueness.
    - Records lineage in __file_schema and __file_tables using the unified
      schema (with file_name).

    Exits the process with status 1 when DuckDB is not installed, the file is
    missing, or the file contains no data to parse.
    """
    import pandas as pd
    try:
        from duckdb import connect
    except ImportError:
        print("Error: DuckDB library not installed. Install with: pip install duckdb")
        sys.exit(1)

    csv_path = Path(csv_path)
    if not csv_path.exists():
        print(f"Error: CSV file not found: {csv_path}")
        sys.exit(1)

    # Keep original header names for lineage before cleaning.
    # Retrying with pandas' default encoding would hit the same undecodable
    # bytes, so fall back to single-byte encodings instead; latin-1 accepts
    # every byte value and therefore always succeeds.
    df_raw = None
    for encoding in ("utf-8-sig", "cp1252", "latin-1"):
        try:
            df_raw = pd.read_csv(csv_path, header=0, encoding=encoding)
            break
        except UnicodeDecodeError:
            continue
        except pd.errors.EmptyDataError:
            print(f"Error: CSV file is empty: {csv_path}")
            sys.exit(1)
    original_headers = list(df_raw.columns)

    # Clean/normalize column names
    def _clean_hdr(s):
        s = str(s) if s is not None else ""
        s = s.strip()
        s = re.sub(r"\s+", " ", s)
        return clean_col_name(s)

    cleaned_cols = ensure_unique([_clean_hdr(c) for c in original_headers])
    df = df_raw.copy()
    df.columns = cleaned_cols

    # Compute table name from file name
    table = sanitize_table_name(csv_path.stem)

    # Open / create DuckDB
    db_path = Path(duckdb_path)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    new_db = not db_path.exists()
    con = connect(str(db_path))
    # try/finally so the database handle is released even when a step fails.
    try:
        if new_db:
            print(f"Created new DuckDB at: {db_path}")

        # Ensure unified lineage tables (with file_name) exist
        ensure_lineage_tables(con)

        # Create/replace the data table
        temp_view = f"{table}_temp_df"
        con.execute(f'DROP TABLE IF EXISTS "{table}"')
        con.register(temp_view, df)
        try:
            con.execute(f'CREATE TABLE "{table}" AS SELECT * FROM "{temp_view}"')
        finally:
            con.unregister(temp_view)

        # Write lineage
        file_name = csv_path.name
        sheet_name = None            # CSV has no sheet
        block_index = 1              # single block/table for CSV
        start_row = 2                # header is row 1, data starts at 2
        end_row = len(df) + 1        # header + data rows
        header_rows_json = "[1]"     # header row index list as JSON
        inferred_title = None
        original_title_text = None

        # Map original->sql columns
        schema_rows = [
            (i, str(orig), str(sql))
            for i, (orig, sql) in enumerate(zip(original_headers, cleaned_cols), start=1)
        ]

        record_table_schema(
            con,
            file_name=file_name,
            sheet_name=sheet_name,
            table_name=table,
            columns=schema_rows
        )
        record_table_block(
            con,
            file_name=file_name,
            sheet_name=sheet_name,
            table_name=table,
            block_index=block_index,
            start_row=start_row,
            end_row=end_row,
            header_rows_json=header_rows_json,
            inferred_title=inferred_title,
            original_title_text=original_title_text
        )

        print(f'Created table {table} from CSV "{csv_path.name}" with {len(df)} rows and {len(df.columns)} columns.')
    finally:
        con.close()


# ------------------------- CLI -------------------------

def ensure_unique_table_name(existing: set, name: str) -> str:
    """Return a sanitized table name not yet in *existing* and record it.

    Collisions are resolved by appending ``_2``, ``_3``, ... The comparison
    ignores case because DuckDB treats identifiers case-insensitively; two
    names differing only in case would otherwise overwrite each other.
    """
    base = sanitize_table_name(name) or "table"
    taken = {str(e).lower() for e in existing}
    candidate = base
    i = 2
    while candidate.lower() in taken:
        candidate = f"{base}_{i}"
        i += 1
    existing.add(candidate)
    return candidate


def main():
    """Parse the command line and dispatch to the Excel or CSV ingester.

    Exits with status 1 when the input file is missing and 2 for an
    unsupported extension.
    """
    import argparse
    ap = argparse.ArgumentParser(description="Excel/CSV → DuckDB (unified --file + lineage).")
    ap.add_argument("--file", required=True, help="Path to .xlsx/.xlsm/.xls or .csv")
    ap.add_argument("--duckdb", required=True, help="Path to DuckDB file")
    args = ap.parse_args()

    if not os.path.exists(args.file):
        print(f"Error: file not found: {args.file}")
        sys.exit(1)

    ext = Path(args.file).suffix.lower()
    if ext in [".xlsx", ".xlsm", ".xls"]:
        persist(args.file, args.duckdb)
    elif ext == ".csv":
        persist_csv(args.file, args.duckdb)
    else:
        print("Error: unsupported file type. Use .xlsx/.xlsm/.xls or .csv")
        sys.exit(2)


if __name__ == "__main__":
    main()