"""
Excel/CSV → DuckDB ingestion (generic, robust, multi-table, unified lineage)
- Supports Excel (.xlsx/.xlsm/.xls) and CSV (first row = headers)
- Hierarchical headers with merged-cell parent context (titles removed)
- Merged rows/cols resolved to master (top-left) value for consistent replication
- Multiple tables detected ONLY when separated by at least one completely empty row
- Footer detection (ignore trailing notes/summaries)
- Pivot detection (skip pivot-looking rows; optional sheet-level pivot/charthood skip)
- Optional LLM inference for unnamed columns and table titles (EXCEL_LLM_INFER=1)
- One DuckDB table per detected table block (Excel) or per file (CSV)
- Unified lineage tables for BOTH Excel and CSV:
__file_schema (file_name, sheet_name, table_name, column_ordinal, original_name, sql_column)
__file_tables (file_name, sheet_name, table_name, block_index, start_row, end_row,
header_rows_json, inferred_title, original_title_text)
Usage:
python source_to_duckdb.py --file /path/file.xlsx --duckdb /path/out.duckdb
python source_to_duckdb.py --file /path/file.csv --duckdb /path/out.duckdb
"""
import os
import re
import sys
import json
import hashlib
from pathlib import Path
from typing import List, Tuple, Dict
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
# ------------------------- Small utilities -------------------------
def _nonempty(vals):
    """Return the items of *vals* that are neither ``None`` nor ``""``, in order."""
    kept = []
    for item in vals:
        if item in (None, ""):
            continue
        kept.append(item)
    return kept
def _is_numlike(x):
    """Return True when *x* is a number or text that parses as one.

    ``int``/``float`` instances are accepted directly. Anything else is
    converted with ``str()``; commas are removed and one trailing ``%`` is
    dropped before the text is handed to ``float()``. Text containing any
    alphabetic character (so also exponents such as ``1e5``) is rejected.
    """
    if isinstance(x, (int, float)):
        return True
    s = str(x).strip().replace(",", "")
    if s.endswith("%"):
        s = s[:-1]
    if not s:
        return False
    if any(c.isalpha() for c in s):
        return False
    try:
        float(s)
    except ValueError:
        # float() on a str only fails with ValueError; do not swallow
        # unrelated errors such as KeyboardInterrupt.
        return False
    return True
def _is_year_token(x):
    """Return True when *x* is an int, or all-digit text, between 1800 and 2100.

    Non-int values are converted with ``str()`` and stripped; text such as
    ``"2020.0"`` is therefore not treated as a year.
    """
    if isinstance(x, int) and 1800 <= x <= 2100:
        return True
    s = str(x).strip()
    # isdecimal() rather than isdigit(): isdigit() is also true for characters
    # such as superscript digits, which int() rejects with ValueError.
    return s.isdecimal() and 1800 <= int(s) <= 2100
def sanitize_table_name(name: str) -> str:
    """Turn *name* into an identifier made of word characters only.

    Every non-word character becomes ``_``, runs of ``_`` are collapsed and
    leading/trailing ``_`` removed. A result that does not start with a letter
    is prefixed with ``table_``; an empty result becomes ``sheet_data``.
    """
    collapsed = re.sub(r"_+", "_", re.sub(r"[^\w]", "_", str(name)))
    ident = collapsed.strip("_")
    if not ident:
        return "sheet_data"
    if ident[0].isalpha():
        return ident
    return "table_" + ident
def clean_col_name(s: str) -> str:
    """Normalize a header label into a column name.

    Characters other than word characters, whitespace, ``%``, ``#`` and ``‰``
    are removed; the three symbols are spelled out; whitespace runs become a
    single ``_``. A name starting with a digit gets a ``col_`` prefix and an
    empty result becomes ``unnamed_column``.
    """
    text = re.sub(r"[^\w\s%#‰]", "", str(s).strip())
    for symbol, word in (("%", " pct"), ("‰", " permille"), ("#", " count ")):
        text = text.replace(symbol, word)
    text = re.sub(r"\s+", " ", text)
    text = re.sub(r"\s+", "_", text)
    text = re.sub(r"_+", "_", text).strip("_")
    if not text:
        return "unnamed_column"
    return "col_" + text if text[0].isdigit() else text
def ensure_unique(names):
    """Return *names* with case-insensitive duplicates made unique.

    The first occurrence of a name is kept unchanged; later occurrences get a
    numeric suffix (``_1``, ``_2``, ...) that does not collide with any name
    already emitted. Falsy entries (``None``, ``""``) are treated as
    ``"unnamed_column"`` both for comparison and in the output.
    """
    seen = {}  # lower-cased name -> highest suffix handed out for it
    out = []
    for n in names:
        # Use the fallback in the output too, not only as the lookup key;
        # otherwise empty names come out as "" / "_1" / "None_1".
        name = n or "unnamed_column"
        key = name.lower()
        if key not in seen:
            seen[key] = 0
            out.append(name)
            continue
        i = seen[key] + 1
        while f"{name}_{i}".lower() in seen:
            i += 1
        seen[key] = i
        candidate = f"{name}_{i}"
        out.append(candidate)
        seen[candidate.lower()] = 0
    return out
def compose_col(parts):
    """Join header parts into one cleaned column name.

    Falsy parts are skipped, each remaining part is stripped, and a part equal
    (case-insensitively) to the one kept just before it is dropped. Returns
    ``""`` when nothing is left, otherwise ``clean_col_name`` of the parts
    joined with ``_``.
    """
    kept = []
    for part in parts:
        if not part:
            continue
        label = str(part).strip()
        if kept and label.lower() == kept[-1].lower():
            continue
        kept.append(label)
    return clean_col_name("_".join(kept)) if kept else ""
# ------------------------- Heuristics & detection -------------------------
def is_probably_footer(cells):
    """Guess whether a row is a trailing note rather than table content.

    Only rows with one or two non-empty cells qualify. Their text, joined with
    spaces and lower-cased, must start with a note-like keyword or be longer
    than 50 characters.
    """
    filled = [v for v in cells if v not in (None, "")]
    if not filled or len(filled) > 2:
        return False
    text = " ".join(str(v) for v in filled).strip().lower()
    if text.startswith(("note", "notes", "source", "summary", "disclaimer")):
        return True
    return len(text) > 50
def is_probably_data(cells, num_cols):
    """Guess whether a row holds data values (as opposed to headers or titles).

    *num_cols* is the table width used to compute how densely the row is
    filled. A row whose numeric cells are all year tokens and which also has
    at least two text cells is treated as a header, not data.
    """
    filled = [v for v in cells if v not in (None, "")]
    if not filled:
        return False
    numeric = [v for v in filled if _is_numlike(v)]
    n_numeric = len(numeric)
    n_text = len(filled) - n_numeric
    fill_ratio = len(filled) / max(1, num_cols)

    year_header = (
        n_numeric >= 2
        and n_text >= 2
        and all(_is_year_token(v) for v in numeric)
    )
    if year_header:
        return False
    if n_numeric >= max(2, n_text):
        return True
    if fill_ratio >= 0.6 and n_numeric >= 2:
        return True
    # Totals rows count as data even when mostly text.
    return str(filled[0]).strip().lower() in ("total", "totals", "grand total")
# Cell texts compared against the lower-cased cell value by
# is_pivot_marker_string.
PIVOT_MARKERS = {
    "row labels",
    "column labels",
    "values",
    "grand total",
    "report filter",
    "filters",
    "∑ values",
    "σ values",
    "Σ values",
}


def is_pivot_marker_string(s: str) -> bool:
    """Return True when *s* looks like a pivot-table label.

    Matches the entries of ``PIVOT_MARKERS``, aggregate prefixes such as
    ``"sum of "``, and any text ending in ``" total"`` or starting with
    ``"total "`` (all compared on the stripped, lower-cased text).
    """
    if not s:
        return False
    text = str(s).strip().lower()
    return (
        text in PIVOT_MARKERS
        or text.startswith(("sum of ", "count of ", "avg of ", "average of "))
        or text.endswith(" total")
        or text.startswith("total ")
    )
def is_pivot_row(cells) -> bool:
    """Return True when a row contains pivot-table markers.

    A single cell recognized by ``is_pivot_marker_string`` is enough;
    otherwise at least two cells must start with an aggregate prefix.
    """
    texts = [str(v).strip() for v in cells if v not in (None, "")]
    if not texts:
        return False
    for text in texts:
        if is_pivot_marker_string(text):
            return True
    prefixes = ("sum of", "count of", "avg of", "average of", "min of", "max of")
    hits = 0
    for text in texts:
        if text.lower().startswith(prefixes):
            hits += 1
    return hits >= 2
def is_pivot_or_chart_sheet(ws: Worksheet) -> bool:
    """Decide whether a whole sheet should be skipped as pivot/chart content.

    True when the sheet has a truthy ``_charts`` or ``_pivots`` attribute,
    when at least two of its first 40 rows (first 20 columns) look like pivot
    rows, or when its title contains "pivot", "dashboard" or "chart".
    """
    try:
        has_charts = bool(getattr(ws, "_charts", None))
    except Exception:
        # Best effort: a failing attribute lookup just means "no charts seen".
        has_charts = False
    if has_charts:
        return True
    if getattr(ws, "_pivots", None):
        return True

    row_limit = min(ws.max_row, 40)
    col_limit = min(ws.max_column, 20)
    hits = 0
    for r in range(1, row_limit + 1):
        cells = [ws.cell(r, c).value for c in range(1, col_limit + 1)]
        if not is_pivot_row(cells):
            continue
        hits += 1
        if hits >= 2:
            return True

    title = (ws.title or "").lower()
    return any(k in title for k in ("pivot", "dashboard", "chart", "charts"))
def _samples_for_column(rows, col_idx, max_items=20):
    """Collect up to *max_items* non-empty values from column *col_idx*.

    Rows too short to have that column are ignored; ``None`` and ``""`` are
    skipped. Values are returned in row order.
    """
    collected = []
    for record in rows:
        if col_idx < len(record):
            cell = record[col_idx]
            if cell not in (None, ""):
                collected.append(cell)
        if len(collected) >= max_items:
            break
    return collected
def _heuristic_infer_col_name(samples):
    """Guess a generic name for an unnamed column from sample values.

    Checks are tried in order and the first match wins: ``"year"``,
    ``"percentage"``, ``"count"``/``"value"`` (mostly numeric columns),
    ``"question"``, ``"range"``, ``"category"``. Returns ``None`` when
    *samples* is empty or nothing matches.
    """
    if not samples:
        return None
    n = len(samples)

    if sum(1 for v in samples if _is_year_token(v)) >= max(2, int(0.8 * n)):
        return "year"

    # A literal "%" suffix is a full hit; a bare number in 0..100 is half a hit.
    pct_hits = 0
    for v in samples:
        s = str(v).strip()
        if s.endswith("%"):
            pct_hits += 1
            continue
        try:
            f = float(s.replace(",", ""))
        except ValueError:
            continue
        if 0 <= f <= 100:
            pct_hits += 0.5
    if pct_hits >= max(2, int(0.7 * n)):
        return "percentage"

    if sum(1 for v in samples if _is_numlike(v)) >= max(3, int(0.7 * n)):
        intish = 0
        for v in samples:
            try:
                f = float(str(v).replace(",", ""))
                if f == int(f):
                    intish += 1
            except (ValueError, OverflowError):
                # ValueError: not parseable (or NaN); OverflowError: int(inf).
                pass
        if intish >= max(2, int(0.6 * n)):
            return "count"
        return "value"

    uniq = {str(v).strip().lower() for v in samples}
    if len(uniq) <= 3 and max(len(str(v)) for v in samples) >= 30:
        return "question"
    range_like = sum(
        1
        for v in samples
        if re.search(r"\d", str(v)) and ("-" in str(v) or "–" in str(v))
    )
    if range_like >= max(2, int(0.6 * n)):
        return "range"
    if len(uniq) < max(5, int(0.5 * n)):
        return "category"
    return None
def used_bounds(ws: Worksheet) -> Tuple[int,int,int,int]:
    """Return ``(min_row, max_row, min_col, max_col)`` of the non-blank cells.

    A cell counts as blank when its value is ``None`` or its string form is
    empty after stripping. For a sheet with no such cells the result is
    ``(1, 0, 1, 0)``, i.e. ``max_row < min_row``.
    """
    top = left = None
    bottom = right = 0
    for row_cells in ws.iter_rows():
        for cell in row_cells:
            value = cell.value
            if value is None or str(value).strip() == "":
                continue
            top = cell.row if top is None else min(top, cell.row)
            left = cell.column if left is None else min(left, cell.column)
            bottom = max(bottom, cell.row)
            right = max(right, cell.column)
    if top is None:
        return 1, 0, 1, 0
    return top, bottom, left, right
def build_merged_master_map(ws: Worksheet):
    """Map every cell of every merged range to that range's top-left cell.

    Keys and values are ``(row, col)`` tuples taken from the bounds of each
    entry in ``ws.merged_cells.ranges``.
    """
    master_of = {}
    for rng in ws.merged_cells.ranges:
        anchor = (rng.min_row, rng.min_col)
        master_of.update(
            ((row, col), anchor)
            for row in range(rng.min_row, rng.max_row + 1)
            for col in range(rng.min_col, rng.max_col + 1)
        )
    return master_of
def build_value_grid(ws: Worksheet, min_row: int, max_row: int, min_col: int, max_col: int):
    """Read the rectangle ``min_row..max_row`` x ``min_col..max_col`` into a list of rows.

    Cells that belong to a merged range take the value of the range's
    top-left cell, so a merged value is repeated across the whole range.
    """
    merged_map = build_merged_master_map(ws)
    grid = []
    for r in range(min_row, max_row + 1):
        line = []
        for c in range(min_col, max_col + 1):
            src_row, src_col = merged_map.get((r, c)) or (r, c)
            line.append(ws.cell(src_row, src_col).value)
        grid.append(line)
    return grid
def row_vals_from_grid(grid, r, min_row):
    """Return the grid row for row number *r*, where *min_row* is the grid's first row."""
    offset = r - min_row
    return grid[offset]
def is_empty_row_vals(vals):
    """Return True when every value in *vals* is ``None`` or ``""`` (also for an empty row)."""
    return all(v in (None, "") for v in vals)
def is_title_like_row_vals(vals, total_cols=20):
    """Guess whether a row is a title/caption rather than a header or data row.

    True when the row has exactly one non-empty cell, when it is sparse
    (at most 20% of *total_cols* filled) and made only of strings longer than
    20 characters, when all non-empty cells carry the same text, or when any
    cell matches one of a few known caption phrases.
    """
    filled = [v for v in vals if v not in (None, "")]
    if not filled:
        return False
    if len(filled) == 1:
        return True
    sparse = len(filled) / max(1, total_cols) <= 0.2
    if sparse and all(isinstance(v, str) and len(str(v)) > 20 for v in filled):
        return True
    normalized = [str(v).strip().lower() for v in filled]
    if len(set(normalized)) == 1:
        return True
    block = {"local currency unit per us dollar","exchange rate","average annual exchange rate"}
    return any(label in block for label in normalized)
def is_header_candidate_row_vals(vals, total_cols=20):
    """Guess whether a row could be a header row.

    Empty and title-like rows are rejected. A row with at least two year
    tokens plus some text is accepted. A mostly numeric row is accepted only
    when most of its cells are year tokens. Any other remaining row is
    accepted when it has two or more non-empty cells.
    """
    filled = _nonempty(vals)
    if not filled or is_title_like_row_vals(vals, total_cols):
        return False
    numeric_flags = [_is_numlike(v) for v in filled]
    n_numeric = sum(numeric_flags)
    n_years = sum(1 for v in filled if _is_year_token(v))
    has_text = not all(numeric_flags)
    if n_years >= 2 and has_text:
        return True
    if n_numeric >= max(2, len(filled) - n_numeric):
        return n_years >= max(2, int(0.6 * len(filled)))
    text_labels = {
        str(v).strip().lower()
        for v, is_num in zip(filled, numeric_flags)
        if not is_num
    }
    return len(filled) >= 2 or len(text_labels) >= 2
def detect_tables_fast(ws: Worksheet, grid, min_row, max_row, min_col, max_col):
    """Scan the value grid top-down and return the table blocks found.

    Row numbers use the same numbering as *min_row*/*max_row*; ``grid`` is
    indexed relative to *min_row*. Each returned block is a dict with keys
    ``header_rows`` (list of row numbers, top first), ``data_start`` and
    ``data_end`` (inclusive row numbers) and ``title_text`` (text of a
    title-like row just above the headers, or ``None``). An empty list is
    returned when ``is_pivot_or_chart_sheet(ws)`` is true.
    """
    blocks = []
    if is_pivot_or_chart_sheet(ws): return blocks
    total_cols = max_col - min_col + 1
    r = min_row
    while r <= max_row:
        vals = row_vals_from_grid(grid, r, min_row)
        # Rows that are empty, title-like or pivot-like cannot start a block.
        if is_empty_row_vals(vals) or is_title_like_row_vals(vals, total_cols) or is_pivot_row(vals):
            r += 1; continue
        # Neither can rows that do not look like data.
        if not is_probably_data(vals, total_cols):
            r += 1; continue
        data_start = r
        header_rows = []
        # Walk upwards from the first data row looking for header rows.
        up = data_start - 1
        while up >= min_row:
            vup = row_vals_from_grid(grid, up, min_row)
            if is_empty_row_vals(vup): break
            # Titles and pivot rows between headers and data are stepped over.
            if is_title_like_row_vals(vup, total_cols) or is_pivot_row(vup):
                up -= 1; continue
            if is_header_candidate_row_vals(vup, total_cols):
                header_rows = []
                # Collect the contiguous run of header candidates, top row first.
                hdr_row = up
                while hdr_row >= min_row:
                    hdr_vals = row_vals_from_grid(grid, hdr_row, min_row)
                    if is_empty_row_vals(hdr_vals): break
                    if is_header_candidate_row_vals(hdr_vals, total_cols):
                        header_rows.insert(0, hdr_row); hdr_row -= 1
                    else: break
            # The upward scan ends at the first row that is neither empty,
            # title-like nor pivot-like, whether or not it was a header.
            break
        # Extend the block downwards until a footer, pivot row or empty row.
        data_end = data_start
        rr = data_start + 1
        while rr <= max_row:
            v = row_vals_from_grid(grid, rr, min_row)
            if is_probably_footer(v) or is_pivot_row(v): break
            if is_empty_row_vals(v): break
            # data_end only advances on rows that look like data or headers.
            if is_probably_data(v, total_cols) or is_header_candidate_row_vals(v, total_cols):
                data_end = rr
            rr += 1
        title_text = None
        if header_rows:
            top = header_rows[0]
            # Look at up to three rows directly above the top header row and
            # stop at the first title-like one.
            for tr in range(max(min_row, top-3), top):
                tv = row_vals_from_grid(grid, tr, min_row)
                if is_title_like_row_vals(tv, total_cols):
                    first = next((str(x).strip() for x in tv if x not in (None,"")), None)
                    if first: title_text = first
                    break
        # A block needs headers or at least two data rows to be recorded.
        if (header_rows or data_end - data_start >= 1) and data_start <= data_end:
            blocks.append({"header_rows": header_rows, "data_start": data_start, "data_end": data_end, "title_text": title_text})
        # Continue after this block, skipping any empty separator rows.
        r = data_end + 1
        while r <= max_row and is_empty_row_vals(row_vals_from_grid(grid, r, min_row)):
            r += 1
    return blocks
def expand_headers_from_grid(grid, header_rows, min_row, min_col, eff_max_col):
    """Return the header rows as a matrix of strings with blanks filled rightwards.

    For each row number in *header_rows* the first *eff_max_col* cells are
    converted to stripped strings (``None`` becomes ``""``). An empty cell
    after the first column inherits the label to its left, so a parent header
    spans the columns beneath it. *min_col* is accepted but not used.
    """
    if not header_rows:
        return []
    expanded = []
    for r in header_rows:
        source = grid[r - min_row]
        texts = ["" if source[c] is None else str(source[c]).strip() for c in range(eff_max_col)]
        labels = []
        carry = ""
        for pos, text in enumerate(texts):
            if text == "" and pos > 0:
                text = carry
            else:
                carry = text
            labels.append(text)
        expanded.append(labels)
    return expanded
def sheet_block_to_df_fast(ws, grid, min_row, max_row, min_col, max_col, header_rows, data_start, data_end):
    """Turn one detected block into a pandas DataFrame.

    Returns a tuple ``(df, header_mat, kept_cols)``:
    - ``df``: the data rows, with columns that are empty in every row removed.
    - ``header_mat``: the forward-filled header matrix from
      ``expand_headers_from_grid``.
    - ``kept_cols``: the column names of the kept columns as they were before
      the "unnamed_column" renaming step below.
    When no header rows can be determined or the row range is invalid the
    result is ``(empty DataFrame, [], [])``. *ws* is not referenced in the body.
    """
    import pandas as pd
    total_cols = max_col - min_col + 1
    # Fallback 1: no headers given -> try the row directly above the data.
    if (not header_rows) and data_start and data_start > min_row:
        prev = row_vals_from_grid(grid, data_start - 1, min_row)
        if is_header_candidate_row_vals(prev, total_cols):
            header_rows = [data_start - 1]
    # Fallback 2: treat the first "data" row as the header when it looks like
    # one and the row after it looks like data.
    if (not header_rows) and data_start:
        cur = row_vals_from_grid(grid, data_start, min_row)
        nxt = row_vals_from_grid(grid, data_start + 1, min_row) if data_start + 1 <= max_row else []
        if is_header_candidate_row_vals(cur, total_cols) and is_probably_data(nxt, total_cols):
            header_rows = [data_start]; data_start += 1
    if not header_rows or data_start is None or data_end is None or data_end < data_start:
        import pandas as _pd
        return _pd.DataFrame(), [], []
    def used_upto_col():
        # 1-based index of the right-most column that has a value in any
        # header or data row of this block; falls back to the full width.
        maxc = 0
        for r in list(header_rows) + list(range(data_start, data_end+1)):
            vals = row_vals_from_grid(grid, r, min_row)
            for c_off in range(total_cols):
                v = vals[c_off]
                if v not in (None, ""): maxc = max(maxc, c_off+1)
        return maxc or total_cols
    eff_max_col = used_upto_col()
    header_mat = expand_headers_from_grid(grid, header_rows, min_row, min_col, eff_max_col)
    def is_title_level(values):
        # A header level counts as a title when it is nearly empty, or when a
        # single label fills at least 60% of its cells.
        total = len(values)
        filled = [str(v).strip() for v in values if v not in (None, "")]
        if total == 0: return False
        coverage = len(filled) / total
        if coverage <= 0.2 and len(filled) <= 2: return True
        if filled:
            uniq = {v.lower() for v in filled}
            if len(uniq) == 1:
                label = next(iter(uniq))
                dom = sum(1 for v in values if isinstance(v,str) and v.strip().lower() == label)
                if dom / total >= 0.6: return True
        return False
    usable_levels = [i for i in range(len(header_mat)) if not is_title_level(header_mat[i])]
    # If every level looks like a title, still use the bottom one.
    if not usable_levels and header_mat: usable_levels = [len(header_mat) - 1]
    cols = []
    for c_off in range(eff_max_col):
        # Note: every level between the first and last usable level is used,
        # including any title-like level lying in between.
        parts = [header_mat[l][c_off] for l in range(usable_levels[0], usable_levels[-1]+1)] if usable_levels else []
        cols.append(compose_col(parts))
    cols = ensure_unique([clean_col_name(x) for x in cols])
    data_rows = []
    for r in range(data_start, data_end+1):
        vals = row_vals_from_grid(grid, r, min_row)
        row = [vals[c_off] for c_off in range(eff_max_col)]
        # Everything from the first footer-looking row onwards is dropped.
        if is_probably_footer(row): break
        data_rows.append(row[:len(cols)])
    if not data_rows:
        import pandas as _pd
        return _pd.DataFrame(columns=cols), header_mat, cols
    # Drop columns that have no value in any data row.
    keep_mask = [any(row[i] not in (None, "") for row in data_rows) for i in range(len(cols))]
    kept_cols = [c for c,k in zip(cols, keep_mask) if k]
    trimmed_rows = [[v for v,k in zip(row, keep_mask) if k] for row in data_rows]
    import pandas as pd
    df = pd.DataFrame(trimmed_rows, columns=kept_cols)
    # Give "unnamed_column*" columns a guessed name based on their values.
    if any(str(c).startswith("unnamed_column") for c in df.columns):
        new_names = list(df.columns)
        for idx, name in enumerate(list(df.columns)):
            if not str(name).startswith("unnamed_column"): continue
            samples = _samples_for_column(trimmed_rows, idx, max_items=20)
            guess = _heuristic_infer_col_name(samples)
            if guess: new_names[idx] = clean_col_name(guess)
        df.columns = ensure_unique([clean_col_name(x) for x in new_names])
    # NOTE(review): kept_cols holds the names from before the renaming above,
    # so it can differ from df.columns.
    return df, header_mat, kept_cols
# ------------------------- Optional LLM title inference -------------------------
def _llm_infer_table_title(header_mat, sample_rows, sheet_name):
    """Ask an OpenAI chat model for a short table title; return it or ``None``.

    Only active when ``EXCEL_LLM_INFER=1`` and ``OPENAI_API_KEY`` is set. Up
    to 10 composed header names and 5 rows x 6 cells of sample data are placed
    in the prompt. Any failure (import, request, response parsing) results in
    ``None``.
    """
    if os.environ.get("EXCEL_LLM_INFER","0") != "1":
        return None
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        return None

    headers = []
    if header_mat:
        for c in range(len(header_mat[0])):
            pieces = [level[c] for level in header_mat if level[c]]
            if pieces:
                headers.append("_".join(pieces))
    headers = headers[:10]
    samples = [[str(cell) for cell in row[:6]] for row in sample_rows[:5]]
    prompt = (
        "Propose a short, human-readable title for a data table.\n"
        "Keep it 3-6 words, Title Case, no punctuation at the end.\n"
        f"Sheet: {sheet_name}\nHeaders: {headers}\nRow samples: {samples}\n"
        "Answer with JSON: {\"title\": \"...\"}"
    )

    try:
        from openai import OpenAI
        client = OpenAI(api_key=api_key)
        resp = client.chat.completions.create(
            model=os.environ.get("OPENAI_MODEL","gpt-4o-mini"),
            messages=[{"role":"user","content":prompt}], temperature=0.2,
        )
        text = resp.choices[0].message.content.strip()
    except Exception:
        return None

    # The reply may wrap the JSON in extra text; take the outermost braces.
    found = re.search(r"\{.*\}", text, re.S)
    if not found:
        return None
    try:
        payload = json.loads(found.group(0))
        title = payload.get("title","").strip()
    except Exception:
        return None
    return title or None
def _heuristic_table_title(header_mat, sheet_name, idx):
    """Build a table title from header labels, or fall back to the sheet name.

    Uses the non-empty labels of the first two header levels across the first
    six columns, de-duplicated in order, joined with spaces and cut to 60
    characters. Without any label the title is ``"<sheet_name> Table <idx>"``.
    """
    fallback = f"{sheet_name} Table {idx}"
    if not header_mat:
        return fallback
    top_levels = header_mat[:2]
    width = min(6, len(header_mat[0]))
    labels = [
        level[c]
        for c in range(width)
        for level in top_levels
        if level[c]
    ]
    if not labels:
        return fallback
    return " ".join(dict.fromkeys(labels))[:60]
def infer_table_title(header_mat, sample_rows, sheet_name, idx):
    """Return the LLM-proposed title when there is one, else the heuristic title."""
    fallback = _heuristic_table_title(header_mat, sheet_name, idx)
    llm_title = _llm_infer_table_title(header_mat, sample_rows, sheet_name)
    if llm_title:
        return llm_title
    return fallback
# ------------------------- Unified lineage helpers -------------------------
# Names of the two lineage tables shared by the Excel and CSV paths.
FILE_SCHEMA_TABLE = "__file_schema"
FILE_TABLES_TABLE = "__file_tables"


def ensure_lineage_tables(con):
    """Create the column-lineage and block-metadata tables if they are missing.

    *con* only needs an ``execute(sql)`` method; two ``CREATE TABLE IF NOT
    EXISTS`` statements are issued, schema table first.
    """
    schema_ddl = (
        f"\nCREATE TABLE IF NOT EXISTS {FILE_SCHEMA_TABLE} (\n"
        "file_name TEXT,\n"
        "sheet_name TEXT,\n"
        "table_name TEXT,\n"
        "column_ordinal INTEGER,\n"
        "original_name TEXT,\n"
        "sql_column TEXT\n"
        ")\n"
    )
    tables_ddl = (
        f"\nCREATE TABLE IF NOT EXISTS {FILE_TABLES_TABLE} (\n"
        "file_name TEXT,\n"
        "sheet_name TEXT,\n"
        "table_name TEXT,\n"
        "block_index INTEGER,\n"
        "start_row INTEGER,\n"
        "end_row INTEGER,\n"
        "header_rows_json TEXT,\n"
        "inferred_title TEXT,\n"
        "original_title_text TEXT\n"
        ")\n"
    )
    for ddl in (schema_ddl, tables_ddl):
        con.execute(ddl)
def record_table_schema(con, file_name, sheet_name, table_name, columns):
    """Replace the column-lineage rows stored for one table.

    Existing rows for the same (file_name, sheet_name, table_name) are deleted
    first, then one row per column is inserted.

    Args:
        con: Open DuckDB connection.
        file_name: Source file name recorded in the lineage row.
        sheet_name: Sheet name, or ``None`` when there is no sheet (CSV).
        table_name: Name of the DuckDB table the columns belong to.
        columns: Iterable of tuples (column_ordinal, original_name, sql_column).
    """
    ensure_lineage_tables(con)
    # DuckDB doesn't support `IS ?` with NULL; branch the delete
    if sheet_name is None:
        con.execute(
            f"DELETE FROM {FILE_SCHEMA_TABLE} WHERE file_name = ? AND sheet_name IS NULL AND table_name = ?",
            [file_name, table_name],
        )
    else:
        con.execute(
            f"DELETE FROM {FILE_SCHEMA_TABLE} WHERE file_name = ? AND sheet_name = ? AND table_name = ?",
            [file_name, sheet_name, table_name],
        )
    rows = [(file_name, sheet_name, table_name, i, orig, sql) for (i, orig, sql) in columns]
    if not rows:
        # Nothing to insert. Skipping also avoids calling executemany with an
        # empty parameter list, which DuckDB's Python client is understood to
        # reject (NOTE(review): confirm against the installed version).
        return
    con.executemany(
        f"INSERT INTO {FILE_SCHEMA_TABLE} (file_name, sheet_name, table_name, column_ordinal, original_name, sql_column) VALUES (?, ?, ?, ?, ?, ?)",
        rows,
    )
def record_table_block(con, file_name, sheet_name, table_name, block_index, start_row, end_row, header_rows_json, inferred_title, original_title_text):
    """Replace the block-metadata row stored for one table block.

    Any existing row for the same (file_name, sheet_name, table_name,
    block_index) is deleted, then a new row is inserted. A ``None``
    *block_index* is stored as 0; *start_row* / *end_row* are stored as
    integers or NULL.
    """
    ensure_lineage_tables(con)
    block_no = 0 if block_index is None else int(block_index)

    # DuckDB doesn't support `IS ?` with NULL; branch the delete
    if sheet_name is None:
        delete_sql = f"DELETE FROM {FILE_TABLES_TABLE} WHERE file_name = ? AND sheet_name IS NULL AND table_name = ? AND block_index = ?"
        delete_args = [file_name, table_name, block_no]
    else:
        delete_sql = f"DELETE FROM {FILE_TABLES_TABLE} WHERE file_name = ? AND sheet_name = ? AND table_name = ? AND block_index = ?"
        delete_args = [file_name, sheet_name, table_name, block_no]
    con.execute(delete_sql, delete_args)

    first_row = None if start_row is None else int(start_row)
    last_row = None if end_row is None else int(end_row)
    insert_sql = (
        f"INSERT INTO {FILE_TABLES_TABLE}\n"
        "(file_name, sheet_name, table_name, block_index, start_row, end_row, header_rows_json, inferred_title, original_title_text)\n"
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )
    con.execute(
        insert_sql,
        [
            file_name, sheet_name, table_name,
            block_no,
            first_row,
            last_row,
            header_rows_json, inferred_title, original_title_text,
        ],
    )
# --- block coalescing to avoid nested/overlapping duplicates ---
def coalesce_blocks(blocks: List[Dict]) -> List[Dict]:
    """Drop every block whose data row range lies fully inside another block's.

    Blocks are dicts with integer ``data_start`` and ``data_end`` keys. The
    survivors are returned ordered by ``data_start``; partially overlapping
    blocks are all kept. A falsy *blocks* is returned unchanged.
    """
    if not blocks:
        return blocks
    # Widest block first among equal starts, so a shorter block sharing the
    # same start is recognized as contained instead of being kept ahead of it.
    ordered = sorted(blocks, key=lambda b: (b["data_start"], -b["data_end"]))
    result = []
    covered_until = None  # largest data_end among the blocks kept so far
    for b in ordered:
        # Every kept block starts at or before b, so b is contained in one of
        # them exactly when it ends no later than the furthest end seen.
        if covered_until is not None and b["data_end"] <= covered_until:
            continue
        result.append(b)
        covered_until = b["data_end"]
    return result
# ------------------------- Persistence: Excel -------------------------
def persist(excel_path, duckdb_path):
    """Load every table detected in an Excel workbook into a DuckDB file.

    For each worksheet that is not skipped as a chartsheet or pivot/chart-like
    sheet, table blocks are detected, converted to DataFrames and written as
    one DuckDB table each (an existing table of the same name is replaced).
    Column lineage and block metadata are recorded in the lineage tables.
    Blocks whose content is identical to an earlier block on the same sheet
    are skipped.

    Args:
        excel_path: Path of the workbook to read.
        duckdb_path: Path of the DuckDB database file; parent directories are
            created when missing.

    Exits the process with status 1 when DuckDB is not installed or the
    workbook cannot be loaded.
    """
    try:
        from duckdb import connect
    except ImportError:
        print("Error: DuckDB library not installed. Install with: pip install duckdb")
        sys.exit(1)
    try:
        wb = load_workbook(excel_path, data_only=True)
    except FileNotFoundError:
        print(f"Error: Excel file not found: {excel_path}")
        sys.exit(1)
    except Exception as e:
        print(f"Error loading Excel file: {e}")
        sys.exit(1)

    file_name = Path(excel_path).name
    db_path = Path(duckdb_path)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    new_db = not db_path.exists()
    con = connect(str(db_path))
    used_names = set()
    total_tables = 0
    total_rows = 0
    # try/finally so the connection is released even when a sheet fails.
    try:
        if new_db:
            print(f"Created new DuckDB at: {db_path}")
        # Ensure unified lineage tables exist
        ensure_lineage_tables(con)
        for sheet in wb.sheetnames:
            ws = wb[sheet]
            if not isinstance(ws, Worksheet):
                print(f"Skipping chartsheet: {sheet}")
                continue
            if is_pivot_or_chart_sheet(ws):
                print(f"Skipping pivot/chart-like sheet: {sheet}")
                continue
            min_row, max_row, min_col, max_col = used_bounds(ws)
            if max_row < min_row:
                continue  # sheet has no non-blank cells
            grid = build_value_grid(ws, min_row, max_row, min_col, max_col)
            blocks = detect_tables_fast(ws, grid, min_row, max_row, min_col, max_col)
            blocks = coalesce_blocks(blocks)
            if not blocks:
                continue
            # per-sheet content hash set to avoid identical duplicate content
            seen_content = set()
            for idx, blk in enumerate(blocks, start=1):
                df, header_mat, kept_cols = sheet_block_to_df_fast(
                    ws, grid, min_row, max_row, min_col, max_col,
                    blk["header_rows"], blk["data_start"], blk["data_end"]
                )
                if df.empty:
                    continue
                # Content hash (stable CSV representation)
                csv_bytes = df.to_csv(index=False).encode("utf-8")
                h = hashlib.sha256(csv_bytes).hexdigest()
                if h in seen_content:
                    print(f"Skipping duplicate content on sheet {sheet} (block {idx})")
                    continue
                seen_content.add(h)

                # Build original composite header names for lineage mapping
                original_cols = []
                if header_mat:
                    levels = len(header_mat)
                    cols = len(header_mat[0]) if header_mat else 0
                    for c in range(cols):
                        parts = [header_mat[l][c] for l in range(levels)]
                        original_cols.append("_".join([p for p in parts if p]))
                else:
                    original_cols = list(df.columns)
                while len(original_cols) < len(df.columns):
                    original_cols.append("unnamed")

                title_orig = blk.get("title_text")
                title = title_orig or infer_table_title(header_mat, df.values.tolist(), sheet, idx)
                candidate = title if title else f"{sheet} Table {idx}"
                table = ensure_unique_table_name(used_names, candidate)

                # Create/replace table
                con.execute(f'DROP TABLE IF EXISTS "{table}"')
                con.register(f"{table}_temp", df)
                try:
                    con.execute(f'CREATE TABLE "{table}" AS SELECT * FROM {table}_temp')
                finally:
                    # Do not leave the DataFrame registered if the CREATE fails.
                    con.unregister(f"{table}_temp")

                # Record lineage (schema + block)
                # NOTE(review): original_cols has one entry per header column,
                # while df.columns omits columns that were empty in every data
                # row; pairing them by position can mislabel columns located
                # after a dropped one. Fixing this needs the keep mask from
                # sheet_block_to_df_fast.
                schema_rows = []
                for cidx, (orig, sqlc) in enumerate(zip(original_cols[:len(df.columns)], df.columns), start=1):
                    schema_rows.append((cidx, str(orig), str(sqlc)))
                record_table_schema(
                    con,
                    file_name=file_name,
                    sheet_name=sheet,
                    table_name=table,
                    columns=schema_rows,
                )
                record_table_block(
                    con,
                    file_name=file_name,
                    sheet_name=sheet,
                    table_name=table,
                    block_index=idx,
                    start_row=int(blk["data_start"]),
                    end_row=int(blk["data_end"]),
                    header_rows_json=json.dumps(blk["header_rows"]),
                    inferred_title=title if title else None,
                    original_title_text=title_orig if title_orig else None,
                )
                print(f"Created table {table} from sheet {sheet} with {len(df)} rows and {len(df.columns)} columns.")
                total_tables += 1
                total_rows += len(df)
    finally:
        con.close()
    print(
        "\n✅ Completed.\n"
        f"- Created {total_tables} tables with {total_rows} total rows\n"
        f"- Column lineage: {FILE_SCHEMA_TABLE}\n"
        f"- Block metadata: {FILE_TABLES_TABLE}"
    )
# ------------------------- Persistence: CSV -------------------------
def persist_csv(csv_path, duckdb_path):
    """
    Ingest a single CSV file into DuckDB AND write lineage, aligned with Excel.
    - First row is headers.
    - One table named from the CSV file name (an existing table is replaced).
    - Cleans headers and ensures uniqueness.
    - Records lineage in __file_schema and __file_tables using the unified schema (with file_name).
    - The file is read as UTF-8 (BOM tolerated); when that fails to decode it
      is re-read as Latin-1.

    Exits the process with status 1 when DuckDB is not installed or the CSV
    file does not exist.
    """
    import pandas as pd
    try:
        from duckdb import connect
    except ImportError:
        print("Error: DuckDB library not installed. Install with: pip install duckdb")
        sys.exit(1)

    csv_path = Path(csv_path)
    if not csv_path.exists():
        print(f"Error: CSV file not found: {csv_path}")
        sys.exit(1)

    # Keep original header names for lineage before cleaning
    try:
        df_raw = pd.read_csv(csv_path, header=0, encoding="utf-8-sig")
    except UnicodeDecodeError:
        # Retrying with pandas' default encoding (UTF-8) would hit the same
        # undecodable bytes; Latin-1 maps every byte, so this read can decode.
        df_raw = pd.read_csv(csv_path, header=0, encoding="latin-1")
    original_headers = list(df_raw.columns)

    # Clean/normalize column names
    def _clean_hdr(s):
        s = str(s) if s is not None else ""
        s = s.strip()
        s = re.sub(r"\s+", " ", s)
        return clean_col_name(s)

    cleaned_cols = ensure_unique([_clean_hdr(c) for c in original_headers])
    df = df_raw.copy()
    df.columns = cleaned_cols

    # Compute table name from file name
    table = sanitize_table_name(csv_path.stem)

    # Open / create DuckDB
    db_path = Path(duckdb_path)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    new_db = not db_path.exists()
    con = connect(str(db_path))
    # try/finally so the connection is released even when ingestion fails.
    try:
        if new_db:
            print(f"Created new DuckDB at: {db_path}")
        # Ensure unified lineage tables (with file_name) exist
        ensure_lineage_tables(con)

        # Create/replace the data table
        con.execute(f'DROP TABLE IF EXISTS "{table}"')
        con.register(f"{table}_temp_df", df)
        try:
            con.execute(f'CREATE TABLE "{table}" AS SELECT * FROM {table}_temp_df')
        finally:
            # Do not leave the DataFrame registered if the CREATE fails.
            con.unregister(f"{table}_temp_df")

        # Write lineage
        file_name = csv_path.name
        sheet_name = None          # CSV has no sheet
        block_index = 1            # single block/table for CSV
        start_row = 2              # header is row 1, data starts at 2
        end_row = len(df) + 1      # header + data rows
        header_rows_json = "[1]"   # header row index list as JSON
        inferred_title = None
        original_title_text = None

        # Map original->sql columns
        schema_rows = [
            (i, str(orig), str(sql))
            for i, (orig, sql) in enumerate(zip(original_headers, cleaned_cols), start=1)
        ]
        record_table_schema(
            con,
            file_name=file_name,
            sheet_name=sheet_name,
            table_name=table,
            columns=schema_rows
        )
        record_table_block(
            con,
            file_name=file_name,
            sheet_name=sheet_name,
            table_name=table,
            block_index=block_index,
            start_row=start_row,
            end_row=end_row,
            header_rows_json=header_rows_json,
            inferred_title=inferred_title,
            original_title_text=original_title_text
        )
        print(f'Created table {table} from CSV "{csv_path.name}" with {len(df)} rows and {len(df.columns)} columns.')
    finally:
        con.close()
# ------------------------- CLI -------------------------
def ensure_unique_table_name(existing: set, name: str) -> str:
    """Return a sanitized table name not yet in *existing*, and add it to the set.

    Names are compared case-insensitively because DuckDB treats identifiers
    that way: "Sales" and "sales" would otherwise be handed out as two names
    but refer to the same table. On a clash a numeric suffix starting at
    ``_2`` is appended.

    Args:
        existing: Set of names already handed out; mutated in place.
        name: Desired (unsanitized) table name.
    """
    base = sanitize_table_name(name) or "table"
    taken = {n.lower() for n in existing}
    if base.lower() not in taken:
        existing.add(base)
        return base
    i = 2
    while f"{base}_{i}".lower() in taken:
        i += 1
    out = f"{base}_{i}"
    existing.add(out)
    return out
def main():
    """Command-line entry point: pick the ingester from the file extension.

    Exits with status 1 when the input file does not exist and with status 2
    for an unsupported extension.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Excel/CSV → DuckDB (unified --file + lineage).")
    parser.add_argument("--file", required=True, help="Path to .xlsx/.xlsm/.xls or .csv")
    parser.add_argument("--duckdb", required=True, help="Path to DuckDB file")
    args = parser.parse_args()

    if not os.path.exists(args.file):
        print(f"Error: file not found: {args.file}")
        sys.exit(1)

    handlers = {
        ".xlsx": persist,
        ".xlsm": persist,
        ".xls": persist,
        ".csv": persist_csv,
    }
    handler = handlers.get(Path(args.file).suffix.lower())
    if handler is None:
        print("Error: unsupported file type. Use .xlsx/.xlsm/.xls or .csv")
        sys.exit(2)
    handler(args.file, args.duckdb)


if __name__ == "__main__":
    main()
|