# data-analysis-agent / source_to_duckdb.py
# (uploaded by arthikrangan; commit aa96015)
"""
Excel/CSV → DuckDB ingestion (generic, robust, multi-table, unified lineage)
- Supports Excel (.xlsx/.xlsm/.xls) and CSV (first row = headers)
- Hierarchical headers with merged-cell parent context (titles removed)
- Merged rows/cols resolved to master (top-left) value for consistent replication
- Multiple tables detected ONLY when separated by at least one completely empty row
- Footer detection (ignore trailing notes/summaries)
- Pivot detection (skip pivot-looking rows; optional sheet-level pivot/charthood skip)
- Optional LLM inference for unnamed columns and table titles (EXCEL_LLM_INFER=1)
- One DuckDB table per detected table block (Excel) or per file (CSV)
- Unified lineage tables for BOTH Excel and CSV:
__file_schema (file_name, sheet_name, table_name, column_ordinal, original_name, sql_column)
__file_tables (file_name, sheet_name, table_name, block_index, start_row, end_row,
header_rows_json, inferred_title, original_title_text)
Usage:
python source_to_duckdb.py --file /path/file.xlsx --duckdb /path/out.duckdb
python source_to_duckdb.py --file /path/file.csv --duckdb /path/out.duckdb
"""
import os
import re
import sys
import json
import hashlib
from pathlib import Path
from typing import List, Tuple, Dict
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
# ------------------------- Small utilities -------------------------
def _nonempty(vals):
return [v for v in vals if v not in (None, "")]
def _is_numlike(x):
if isinstance(x, (int, float)):
return True
s = str(x).strip().replace(",", "")
if s.endswith("%"):
s = s[:-1]
if not s:
return False
if any(c.isalpha() for c in s):
return False
try:
float(s); return True
except: return False
def _is_year_token(x):
if isinstance(x, int) and 1800 <= x <= 2100: return True
s = str(x).strip()
return s.isdigit() and 1800 <= int(s) <= 2100
def sanitize_table_name(name: str) -> str:
    """Normalize *name* into a safe SQL identifier: word characters only,
    collapsed underscores, forced to start with a letter."""
    cleaned = re.sub(r"[^\w]", "_", str(name))
    cleaned = re.sub(r"_+", "_", cleaned)
    cleaned = cleaned.strip("_")
    if cleaned and not cleaned[0].isalpha():
        cleaned = "table_" + cleaned
    if not cleaned:
        return "sheet_data"
    return cleaned
def clean_col_name(s: str) -> str:
    """Turn a raw header cell into a safe column name: strip symbols,
    spell out %/‰/#, collapse whitespace to underscores, prefix digits."""
    text = re.sub(r"[^\w\s%#‰]", "", str(s).strip())
    text = text.replace("%", " pct")
    text = text.replace("‰", " permille")
    text = text.replace("#", " count ")
    text = re.sub(r"\s+", " ", text)
    text = re.sub(r"\s+", "_", text)
    text = re.sub(r"_+", "_", text).strip("_")
    if text and text[0].isdigit():
        text = "col_" + text
    if not text:
        return "unnamed_column"
    return text
def ensure_unique(names):
    """Deduplicate *names* case-insensitively by appending _1, _2, ... suffixes.

    Both the chosen suffixed names and the bases are tracked so later
    collisions with an already-emitted suffixed name are skipped over.
    """
    taken = {}
    result = []
    for name in names:
        key = (name or "unnamed_column").lower()
        if key not in taken:
            taken[key] = 0
            result.append(name)
            continue
        suffix = taken[key] + 1
        while f"{name}_{suffix}".lower() in taken:
            suffix += 1
        taken[key] = suffix
        chosen = f"{name}_{suffix}"
        result.append(chosen)
        taken[chosen.lower()] = 0
    return result
def compose_col(parts):
    """Join header parts into one column name, dropping blanks and
    consecutive case-insensitive duplicates, then sanitize the result."""
    kept = []
    last = None
    for part in parts:
        if not part:
            continue
        text = str(part).strip()
        if last is not None and text.lower() == last.lower():
            continue
        kept.append(text)
        last = text
    if not kept:
        return ""
    return clean_col_name("_".join(kept))
# ------------------------- Heuristics & detection -------------------------
def is_probably_footer(cells):
    """Heuristic: a sparse row (≤2 filled cells) that reads like a trailing
    note/source/summary line, or is a single long free-text blob."""
    filled = [(i, v) for i, v in enumerate(cells) if v not in (None, "")]
    if not filled:
        return False
    if len(filled) > 2:
        return False
    text = " ".join(str(v) for _, v in filled).strip().lower()
    if text.startswith(("note", "notes", "source", "summary", "disclaimer")):
        return True
    return len(text) > 50
def is_probably_data(cells, num_cols):
    """Heuristic: does this row look like a data row (mostly numeric and/or
    dense), rather than a header or banner?"""
    filled = [v for v in cells if v not in (None, "")]
    if not filled:
        return False
    numeric = [v for v in filled if _is_numlike(v)]
    n_num = len(numeric)
    n_text = len(filled) - n_num
    density = len(filled) / max(1, num_cols)
    # 2+ year tokens alongside 2+ labels reads like a header, not data.
    if n_num >= 2 and n_text >= 2 and all(_is_year_token(v) for v in numeric):
        return False
    if n_num >= max(2, n_text):
        return True
    if density >= 0.6 and n_num >= 2:
        return True
    first = str(filled[0]).strip().lower()
    return first in ("total", "totals", "grand total")
# Canonical labels Excel emits inside pivot-table layouts.
PIVOT_MARKERS = {"row labels","column labels","values","grand total","report filter","filters","∑ values","σ values","Σ values"}
def is_pivot_marker_string(s: str) -> bool:
    """True when *s* matches a classic Excel pivot-table label or aggregate tag."""
    if not s:
        return False
    token = str(s).strip().lower()
    if token in PIVOT_MARKERS:
        return True
    if token.startswith(("sum of ", "count of ", "avg of ", "average of ")):
        return True
    if token.endswith(" total"):
        return True
    return token.startswith("total ")
def is_pivot_row(cells) -> bool:
    """True when a row carries pivot-table markers, or 2+ aggregate-style labels."""
    texts = [str(v).strip() for v in cells if v not in (None, "")]
    if not texts:
        return False
    for t in texts:
        if is_pivot_marker_string(t):
            return True
    prefixes = ("sum of", "count of", "avg of", "average of", "min of", "max of")
    hits = sum(1 for t in texts if t.lower().startswith(prefixes))
    return hits >= 2
def is_pivot_or_chart_sheet(ws: Worksheet) -> bool:
    """Decide whether the whole sheet should be skipped as pivot/chart output.

    Checks, in order: embedded charts, pivot caches, a bounded scan of the
    top-left region for pivot-looking rows, and pivot-ish sheet names.
    """
    try:
        # `_charts` is an openpyxl private attribute; guarded because it may
        # be absent or change between library versions.
        if getattr(ws, "_charts", None): return True
    except Exception: pass
    # `_pivots` is likewise private — presence of any pivot cache means skip.
    if hasattr(ws, "_pivots") and getattr(ws, "_pivots"): return True
    # Bound the scan (40 rows x 20 cols) so huge sheets stay fast.
    scan_rows = min(ws.max_row, 40); scan_cols = min(ws.max_column, 20)
    pivotish = 0
    for r in range(1, scan_rows+1):
        row = [ws.cell(r,c).value for c in range(1, scan_cols+1)]
        if is_pivot_row(row):
            pivotish += 1
            # Two pivot-looking rows is enough evidence to skip the sheet.
            if pivotish >= 2: return True
    # Finally, fall back to the sheet's name.
    name = (ws.title or "").lower()
    if any(k in name for k in ("pivot","dashboard","chart","charts")): return True
    return False
def _samples_for_column(rows, col_idx, max_items=20):
vals = []
for row in rows:
if col_idx < len(row):
v = row[col_idx]
if v not in (None, ""): vals.append(v)
if len(vals) >= max_items: break
return vals
def _heuristic_infer_col_name(samples):
    """Guess a generic name for an unnamed column from its sample values.

    Returns one of "year", "percentage", "count", "value", "question",
    "range", "category" — or None when no pattern dominates.
    """
    if not samples: return None
    # Mostly 4-digit year tokens -> "year".
    if sum(1 for v in samples if _is_year_token(v)) >= max(2, int(0.8*len(samples))): return "year"
    # Score percent-ness: explicit "%" counts 1, plain numbers in range count 0.5.
    pct_hits = 0
    for v in samples:
        s = str(v).strip()
        if s.endswith("%"): pct_hits += 1
        else:
            try:
                f = float(s.replace(",",""))
                # NOTE(review): `0 <= f <= 1.0` is subsumed by `0 <= f <= 100`,
                # so every value in 0..100 scores 0.5 — confirm whether
                # fractions were meant to be weighted differently.
                if 0 <= f <= 1.0 or 0 <= f <= 100: pct_hits += 0.5
            except: pass
    if pct_hits >= max(2, int(0.7*len(samples))): return "percentage"
    # Mostly numeric: distinguish integer-heavy ("count") from generic "value".
    if sum(1 for v in samples if _is_numlike(v)) >= max(3, int(0.7*len(samples))):
        intish = 0
        for v in samples:
            try:
                if float(str(v).replace(",","")) == int(float(str(v).replace(",",""))): intish += 1
            except: pass
        if intish >= max(2, int(0.6*len(samples))): return "count"
        return "value"
    uniq = {str(v).strip().lower() for v in samples}
    # Few distinct but very long strings -> survey-style "question" text.
    if len(uniq) <= 3 and max(len(str(v)) for v in samples) >= 30: return "question"
    # Values like "10-20" / "1–5" (digit plus hyphen/en-dash) -> "range".
    if sum(1 for v in samples if re.search(r"\d", str(v)) and ("-" in str(v) or "–" in str(v))) >= max(2, int(0.6*len(samples))): return "range"
    # Low cardinality relative to sample size -> "category".
    if len(uniq) < max(5, int(0.5*len(samples))): return "category"
    return None
def used_bounds(ws: Worksheet) -> Tuple[int,int,int,int]:
    """Return (min_row, max_row, min_col, max_col) spanning all non-blank
    cells of *ws*; returns (1, 0, 1, 0) when the sheet is entirely empty."""
    top = None
    bottom = 0
    left = None
    right = 0
    for row in ws.iter_rows():
        for cell in row:
            value = cell.value
            if value is None or str(value).strip() == "":
                continue
            if top is None or cell.row < top:
                top = cell.row
            bottom = max(bottom, cell.row)
            if left is None or cell.column < left:
                left = cell.column
            right = max(right, cell.column)
    if top is None:
        return 1, 0, 1, 0
    return top, bottom, left, right
def build_merged_master_map(ws: Worksheet):
    """Map every (row, col) covered by a merged range to the range's
    top-left master cell coordinate."""
    master_of = {}
    for rng in ws.merged_cells.ranges:
        anchor = (rng.min_row, rng.min_col)
        for row in range(rng.min_row, rng.max_row + 1):
            for col in range(rng.min_col, rng.max_col + 1):
                master_of[(row, col)] = anchor
    return master_of
def build_value_grid(ws: Worksheet, min_row: int, max_row: int, min_col: int, max_col: int):
    """Materialize the used range as a 2D list of cell values, replicating
    each merged range's master (top-left) value into every covered cell."""
    master_of = build_merged_master_map(ws)
    grid = []
    for r in range(min_row, max_row + 1):
        row_out = []
        for c in range(min_col, max_col + 1):
            anchor = master_of.get((r, c))
            if anchor:
                row_out.append(ws.cell(anchor[0], anchor[1]).value)
            else:
                row_out.append(ws.cell(r, c).value)
        grid.append(row_out)
    return grid
def row_vals_from_grid(grid, r, min_row):
    """Fetch absolute sheet row *r* from *grid*, whose first entry is row *min_row*."""
    offset = r - min_row
    return grid[offset]
def is_empty_row_vals(vals):
    """True when every value in the row is None or the empty string."""
    for v in vals:
        if v not in (None, ""):
            return False
    return True
def is_title_like_row_vals(vals, total_cols=20):
    """Heuristic: row looks like a free-text title/banner rather than data —
    a single filled cell, sparse long strings, one repeated label, or a
    known banner phrase."""
    filled = _nonempty(vals)
    if not filled:
        return False
    if len(filled) == 1:
        return True
    coverage = len(filled) / max(1, total_cols)
    if coverage <= 0.2 and all(isinstance(v, str) and len(str(v)) > 20 for v in filled):
        return True
    distinct = {str(v).strip().lower() for v in filled}
    if len(distinct) == 1:
        return True
    banners = {"local currency unit per us dollar","exchange rate","average annual exchange rate"}
    for v in filled:
        if str(v).strip().lower() in banners:
            return True
    return False
def is_header_candidate_row_vals(vals, total_cols=20):
    """Heuristic: could this row be part of a (possibly multi-level) header band?"""
    filled = _nonempty(vals)
    if not filled:
        return False
    if is_title_like_row_vals(vals, total_cols):
        return False
    num_count = sum(1 for v in filled if _is_numlike(v))
    year_count = sum(1 for v in filled if _is_year_token(v))
    has_text = any(not _is_numlike(v) for v in filled)
    if year_count >= 2 and has_text:
        return True
    if num_count >= max(2, len(filled) - num_count):
        # Mostly numeric: only accept as header when the numbers are years.
        return year_count >= max(2, int(0.6 * len(filled)))
    labels = {str(v).strip().lower() for v in filled if not _is_numlike(v)}
    return (len(filled) >= 2) or (len(labels) >= 2)
def detect_tables_fast(ws: Worksheet, grid, min_row, max_row, min_col, max_col):
    """Scan *grid* top-to-bottom and return the detected table blocks.

    Each block is a dict with keys: "header_rows" (absolute row numbers of
    the contiguous header band), "data_start"/"data_end" (inclusive absolute
    data rows), and "title_text" (nearest title-like row above the headers,
    or None). Blocks are separated by at least one completely empty row.
    """
    blocks = []
    # Whole sheet looks like a pivot/chart artifact -> nothing to extract.
    if is_pivot_or_chart_sheet(ws): return blocks
    total_cols = max_col - min_col + 1
    r = min_row
    while r <= max_row:
        vals = row_vals_from_grid(grid, r, min_row)
        # Skip separators, title banners and pivot residue between tables.
        if is_empty_row_vals(vals) or is_title_like_row_vals(vals, total_cols) or is_pivot_row(vals):
            r += 1; continue
        # Keep scanning until the first row that actually looks like data.
        if not is_probably_data(vals, total_cols):
            r += 1; continue
        data_start = r
        # Walk upward from the first data row to find the header band.
        header_rows = []
        up = data_start - 1
        while up >= min_row:
            vup = row_vals_from_grid(grid, up, min_row)
            if is_empty_row_vals(vup): break
            # Titles/pivot rows above data are skipped, not part of the band.
            if is_title_like_row_vals(vup, total_cols) or is_pivot_row(vup):
                up -= 1; continue
            if is_header_candidate_row_vals(vup, total_cols):
                # Collect the contiguous run of header rows ending at `up`,
                # inserting at the front so they stay in top-down order.
                header_rows = []
                hdr_row = up
                while hdr_row >= min_row:
                    hdr_vals = row_vals_from_grid(grid, hdr_row, min_row)
                    if is_empty_row_vals(hdr_vals): break
                    if is_header_candidate_row_vals(hdr_vals, total_cols):
                        header_rows.insert(0, hdr_row); hdr_row -= 1
                    else: break
            break  # the first non-title row above the data decides the band
        # Extend downward until a footer, pivot row, or empty row ends the block.
        data_end = data_start
        rr = data_start + 1
        while rr <= max_row:
            v = row_vals_from_grid(grid, rr, min_row)
            if is_probably_footer(v) or is_pivot_row(v): break
            if is_empty_row_vals(v): break
            if is_probably_data(v, total_cols) or is_header_candidate_row_vals(v, total_cols):
                data_end = rr
            rr += 1
        # Look up to 3 rows above the header band for a title-like banner.
        title_text = None
        if header_rows:
            top = header_rows[0]
            for tr in range(max(min_row, top-3), top):
                tv = row_vals_from_grid(grid, tr, min_row)
                if is_title_like_row_vals(tv, total_cols):
                    first = next((str(x).strip() for x in tv if x not in (None,"")), None)
                    if first: title_text = first
                    break
        # Accept the block if it has headers or spans at least two data rows.
        if (header_rows or data_end - data_start >= 1) and data_start <= data_end:
            blocks.append({"header_rows": header_rows, "data_start": data_start, "data_end": data_end, "title_text": title_text})
        # Resume scanning past the block, skipping the separating empty rows.
        r = data_end + 1
        while r <= max_row and is_empty_row_vals(row_vals_from_grid(grid, r, min_row)):
            r += 1
    return blocks
def expand_headers_from_grid(grid, header_rows, min_row, min_col, eff_max_col):
    """Build a header matrix (one list per header row, *eff_max_col* wide),
    forward-filling blank cells so merged/implied parent labels span their
    children. *min_col* is accepted for interface compatibility."""
    matrix = []
    for r in header_rows:
        source = row_vals_from_grid(grid, r, min_row)
        row = []
        for c in range(eff_max_col):
            cell = source[c]
            row.append("" if cell is None else str(cell).strip())
        carry = ""
        for i, cell in enumerate(row):
            if cell == "" and i > 0:
                row[i] = carry
            else:
                carry = cell
        matrix.append(row)
    return matrix
def sheet_block_to_df_fast(ws, grid, min_row, max_row, min_col, max_col, header_rows, data_start, data_end):
    """Convert one detected block into a pandas DataFrame.

    Returns (df, header_mat, kept_cols): the data frame, the expanded
    header matrix, and the retained (non-all-empty) SQL column names.
    *ws* is currently unused but kept for interface compatibility.
    """
    # Single pandas import — previously re-imported as `pd`/`_pd` in four
    # separate places inside this function.
    import pandas as pd
    total_cols = max_col - min_col + 1
    # Header rescue 1: the row directly above the data looks like a header.
    if (not header_rows) and data_start and data_start > min_row:
        prev = row_vals_from_grid(grid, data_start - 1, min_row)
        if is_header_candidate_row_vals(prev, total_cols):
            header_rows = [data_start - 1]
    # Header rescue 2: the first "data" row is itself a header and the
    # following row is real data.
    if (not header_rows) and data_start:
        cur = row_vals_from_grid(grid, data_start, min_row)
        nxt = row_vals_from_grid(grid, data_start + 1, min_row) if data_start + 1 <= max_row else []
        if is_header_candidate_row_vals(cur, total_cols) and is_probably_data(nxt, total_cols):
            header_rows = [data_start]
            data_start += 1
    if not header_rows or data_start is None or data_end is None or data_end < data_start:
        return pd.DataFrame(), [], []

    def used_upto_col():
        # Rightmost column offset holding any value in headers or data.
        maxc = 0
        for r in list(header_rows) + list(range(data_start, data_end + 1)):
            vals = row_vals_from_grid(grid, r, min_row)
            for c_off in range(total_cols):
                if vals[c_off] not in (None, ""):
                    maxc = max(maxc, c_off + 1)
        return maxc or total_cols

    eff_max_col = used_upto_col()
    header_mat = expand_headers_from_grid(grid, header_rows, min_row, min_col, eff_max_col)

    def is_title_level(values):
        # A header level that is mostly blank or one repeated label is a
        # title banner, not a real column level.
        total = len(values)
        filled = [str(v).strip() for v in values if v not in (None, "")]
        if total == 0:
            return False
        coverage = len(filled) / total
        if coverage <= 0.2 and len(filled) <= 2:
            return True
        if filled:
            uniq = {v.lower() for v in filled}
            if len(uniq) == 1:
                label = next(iter(uniq))
                dom = sum(1 for v in values if isinstance(v, str) and v.strip().lower() == label)
                if dom / total >= 0.6:
                    return True
        return False

    usable_levels = [i for i in range(len(header_mat)) if not is_title_level(header_mat[i])]
    if not usable_levels and header_mat:
        usable_levels = [len(header_mat) - 1]
    # Compose one column name per kept header level span.
    cols = []
    for c_off in range(eff_max_col):
        parts = [header_mat[l][c_off] for l in range(usable_levels[0], usable_levels[-1] + 1)] if usable_levels else []
        cols.append(compose_col(parts))
    cols = ensure_unique([clean_col_name(x) for x in cols])
    data_rows = []
    for r in range(data_start, data_end + 1):
        vals = row_vals_from_grid(grid, r, min_row)
        row = [vals[c_off] for c_off in range(eff_max_col)]
        if is_probably_footer(row):
            break  # trailing note reached: stop collecting data
        data_rows.append(row[:len(cols)])
    if not data_rows:
        return pd.DataFrame(columns=cols), header_mat, cols
    # Drop columns that are empty across every data row.
    keep_mask = [any(row[i] not in (None, "") for row in data_rows) for i in range(len(cols))]
    kept_cols = [c for c, k in zip(cols, keep_mask) if k]
    trimmed_rows = [[v for v, k in zip(row, keep_mask) if k] for row in data_rows]
    df = pd.DataFrame(trimmed_rows, columns=kept_cols)
    # Give still-unnamed columns a data-driven guess ("year", "percentage", ...).
    if any(str(c).startswith("unnamed_column") for c in df.columns):
        new_names = list(df.columns)
        for idx, name in enumerate(list(df.columns)):
            if not str(name).startswith("unnamed_column"):
                continue
            samples = _samples_for_column(trimmed_rows, idx, max_items=20)
            guess = _heuristic_infer_col_name(samples)
            if guess:
                new_names[idx] = clean_col_name(guess)
        df.columns = ensure_unique([clean_col_name(x) for x in new_names])
    return df, header_mat, kept_cols
# ------------------------- Optional LLM title inference -------------------------
def _llm_infer_table_title(header_mat, sample_rows, sheet_name):
if os.environ.get("EXCEL_LLM_INFER","0") != "1": return None
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key: return None
headers = []
if header_mat:
for c in range(len(header_mat[0])):
parts = [header_mat[l][c] for l in range(len(header_mat))]
parts = [p for p in parts if p]
if parts: headers.append("_".join(parts))
headers = headers[:10]
samples = [[str(x) for x in r[:6]] for r in sample_rows[:5]]
prompt = (
"Propose a short, human-readable title for a data table.\n"
"Keep it 3-6 words, Title Case, no punctuation at the end.\n"
f"Sheet: {sheet_name}\nHeaders: {headers}\nRow samples: {samples}\n"
"Answer with JSON: {\"title\": \"...\"}"
)
try:
from openai import OpenAI
client = OpenAI(api_key=api_key)
resp = client.chat.completions.create(
model=os.environ.get("OPENAI_MODEL","gpt-4o-mini"),
messages=[{"role":"user","content":prompt}], temperature=0.2,
)
text = resp.choices[0].message.content.strip()
except Exception:
return None
import re as _re, json as _json
m = _re.search(r"\{.*\}", text, re.S)
if not m: return None
try:
obj = _json.loads(m.group(0)); title = obj.get("title","").strip()
return title or None
except Exception: return None
def _heuristic_table_title(header_mat, sheet_name, idx):
if header_mat:
parts = []
levels = len(header_mat)
cols = len(header_mat[0]) if header_mat else 0
for c in range(min(6, cols)):
colparts = [header_mat[l][c] for l in range(min(levels, 2)) if header_mat[l][c]]
if colparts: parts.extend(colparts)
if parts:
base = " ".join(dict.fromkeys(parts))
return base[:60]
return f"{sheet_name} Table {idx}"
def infer_table_title(header_mat, sample_rows, sheet_name, idx):
    """Prefer an LLM-inferred title when enabled; otherwise use the heuristic one."""
    fallback = _heuristic_table_title(header_mat, sheet_name, idx)
    inferred = _llm_infer_table_title(header_mat, sample_rows, sheet_name)
    if inferred:
        return inferred
    return fallback
# ------------------------- Unified lineage helpers -------------------------
# Unified lineage table names, shared by the Excel and CSV ingest paths.
FILE_SCHEMA_TABLE = "__file_schema"
FILE_TABLES_TABLE = "__file_tables"
def ensure_lineage_tables(con):
    """Create both lineage tables in *con* if they do not already exist.

    __file_schema maps each ingested column back to its original header;
    __file_tables records where each table block was found in the source.
    """
    con.execute(f"""
        CREATE TABLE IF NOT EXISTS {FILE_SCHEMA_TABLE} (
            file_name TEXT,
            sheet_name TEXT,
            table_name TEXT,
            column_ordinal INTEGER,
            original_name TEXT,
            sql_column TEXT
        )
    """)
    con.execute(f"""
        CREATE TABLE IF NOT EXISTS {FILE_TABLES_TABLE} (
            file_name TEXT,
            sheet_name TEXT,
            table_name TEXT,
            block_index INTEGER,
            start_row INTEGER,
            end_row INTEGER,
            header_rows_json TEXT,
            inferred_title TEXT,
            original_title_text TEXT
        )
    """)
def record_table_schema(con, file_name, sheet_name, table_name, columns):
    """
    Upsert (delete + insert) the column lineage rows for one table.

    columns: list of tuples (column_ordinal, original_name, sql_column)
    sheet_name: None for CSV sources (stored as SQL NULL).
    """
    ensure_lineage_tables(con)
    # DuckDB doesn't support `IS ?` with NULL; branch the delete
    if sheet_name is None:
        con.execute(
            f"DELETE FROM {FILE_SCHEMA_TABLE} WHERE file_name = ? AND sheet_name IS NULL AND table_name = ?",
            [file_name, table_name],
        )
    else:
        con.execute(
            f"DELETE FROM {FILE_SCHEMA_TABLE} WHERE file_name = ? AND sheet_name = ? AND table_name = ?",
            [file_name, sheet_name, table_name],
        )
    # Re-insert the fresh column mapping rows.
    con.executemany(
        f"INSERT INTO {FILE_SCHEMA_TABLE} (file_name, sheet_name, table_name, column_ordinal, original_name, sql_column) VALUES (?, ?, ?, ?, ?, ?)",
        [(file_name, sheet_name, table_name, i, orig, sql) for (i, orig, sql) in columns],
    )
def record_table_block(con, file_name, sheet_name, table_name, block_index, start_row, end_row, header_rows_json, inferred_title, original_title_text):
    """Upsert (delete + insert) the block-level metadata row for one table.

    sheet_name: None for CSV sources (stored as SQL NULL).
    block_index: coerced to int; defaults to 0 when None.
    """
    ensure_lineage_tables(con)
    # DuckDB doesn't support `IS ?` with NULL; branch the delete
    if sheet_name is None:
        con.execute(
            f"DELETE FROM {FILE_TABLES_TABLE} WHERE file_name = ? AND sheet_name IS NULL AND table_name = ? AND block_index = ?",
            [file_name, table_name, int(block_index) if block_index is not None else 0],
        )
    else:
        con.execute(
            f"DELETE FROM {FILE_TABLES_TABLE} WHERE file_name = ? AND sheet_name = ? AND table_name = ? AND block_index = ?",
            [file_name, sheet_name, table_name, int(block_index) if block_index is not None else 0],
        )
    con.execute(
        f"""INSERT INTO {FILE_TABLES_TABLE}
        (file_name, sheet_name, table_name, block_index, start_row, end_row, header_rows_json, inferred_title, original_title_text)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        [
            file_name, sheet_name, table_name,
            int(block_index) if block_index is not None else 0,
            int(start_row) if start_row is not None else None,
            int(end_row) if end_row is not None else None,
            header_rows_json, inferred_title, original_title_text
        ]
    )
# --- block coalescing to avoid nested/overlapping duplicates ---
def coalesce_blocks(blocks: List[Dict]) -> List[Dict]:
    """Drop blocks whose data-row range is fully contained in another block.

    Blocks are visited largest-first (start ascending, end DESCENDING) so a
    maximal block is always kept before anything it contains. The previous
    ascending-end sort processed a contained block before its same-start
    container (e.g. rows 1-5 before 1-10) and therefore kept both.
    """
    if not blocks:
        return blocks
    ordered = sorted(blocks, key=lambda b: (b["data_start"], -b["data_end"]))
    result = []
    for b in ordered:
        contained = any(
            b["data_start"] >= kept["data_start"] and b["data_end"] <= kept["data_end"]
            for kept in result
        )
        if contained:
            continue  # fully inside an already-kept block -> drop
        result.append(b)
    return result
# ------------------------- Persistence: Excel -------------------------
def persist(excel_path, duckdb_path):
    """Ingest every detected table block from an Excel workbook into DuckDB.

    Creates one DuckDB table per detected block and records column/block
    lineage in __file_schema / __file_tables. Exits the process on missing
    dependencies or unreadable input.
    """
    try:
        from duckdb import connect
    except ImportError:
        print("Error: DuckDB library not installed. Install with: pip install duckdb"); sys.exit(1)
    try:
        # data_only=True reads cached formula results instead of formula text.
        wb = load_workbook(excel_path, data_only=True)
    except FileNotFoundError:
        print(f"Error: Excel file not found: {excel_path}"); sys.exit(1)
    except Exception as e:
        print(f"Error loading Excel file: {e}"); sys.exit(1)
    file_name = Path(excel_path).name
    db_path = Path(duckdb_path)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    new_db = not db_path.exists()
    con = connect(str(db_path))
    if new_db: print(f"Created new DuckDB at: {db_path}")
    # Ensure unified lineage tables exist
    ensure_lineage_tables(con)
    used_names = set(); total_tables = 0; total_rows = 0
    for sheet in wb.sheetnames:
        ws = wb[sheet]
        try:
            # Chartsheets are not Worksheet instances and carry no cell data.
            if not isinstance(ws, Worksheet):
                print(f"Skipping chartsheet: {sheet}"); continue
        except Exception: pass
        if is_pivot_or_chart_sheet(ws):
            print(f"Skipping pivot/chart-like sheet: {sheet}"); continue
        min_row, max_row, min_col, max_col = used_bounds(ws)
        if max_row < min_row: continue  # sheet has no non-blank cells
        grid = build_value_grid(ws, min_row, max_row, min_col, max_col)
        blocks = detect_tables_fast(ws, grid, min_row, max_row, min_col, max_col)
        blocks = coalesce_blocks(blocks)
        if not blocks: continue
        # per-sheet content hash set to avoid identical duplicate content
        seen_content = set()
        for idx, blk in enumerate(blocks, start=1):
            df, header_mat, kept_cols = sheet_block_to_df_fast(
                ws, grid, min_row, max_row, min_col, max_col,
                blk["header_rows"], blk["data_start"], blk["data_end"]
            )
            if df.empty: continue
            # Content hash (stable CSV representation)
            csv_bytes = df.to_csv(index=False).encode("utf-8")
            h = hashlib.sha256(csv_bytes).hexdigest()
            if h in seen_content:
                print(f"Skipping duplicate content on sheet {sheet} (block {idx})")
                continue
            seen_content.add(h)
            # Build original composite header names for lineage mapping
            original_cols = []
            if header_mat:
                levels = len(header_mat)
                cols = len(header_mat[0]) if header_mat else 0
                for c in range(cols):
                    parts = [header_mat[l][c] for l in range(levels)]
                    original_cols.append("_".join([p for p in parts if p]))
            else:
                original_cols = list(df.columns)
            # Pad so the zip() below covers every DataFrame column.
            while len(original_cols) < len(df.columns): original_cols.append("unnamed")
            title_orig = blk.get("title_text")
            title = title_orig or infer_table_title(header_mat, df.values.tolist(), sheet, idx)
            candidate = title if title else f"{sheet} Table {idx}"
            table = ensure_unique_table_name(used_names, candidate)
            # Create/replace table via a temporary registered view of the DataFrame.
            con.execute(f'DROP TABLE IF EXISTS "{table}"')
            con.register(f"{table}_temp", df)
            con.execute(f'CREATE TABLE "{table}" AS SELECT * FROM {table}_temp')
            con.unregister(f"{table}_temp")
            # Record lineage (schema + block)
            schema_rows = []
            for cidx, (orig, sqlc) in enumerate(zip(original_cols[:len(df.columns)], df.columns), start=1):
                schema_rows.append((cidx, str(orig), str(sqlc)))
            record_table_schema(
                con,
                file_name=file_name,
                sheet_name=sheet,
                table_name=table,
                columns=schema_rows,
            )
            record_table_block(
                con,
                file_name=file_name,
                sheet_name=sheet,
                table_name=table,
                block_index=idx,
                start_row=int(blk["data_start"]),
                end_row=int(blk["data_end"]),
                header_rows_json=json.dumps(blk["header_rows"]),
                inferred_title=title if title else None,
                original_title_text=title_orig if title_orig else None,
            )
            print(f"Created table {table} from sheet {sheet} with {len(df)} rows and {len(df.columns)} columns.")
            total_tables += 1; total_rows += len(df)
    con.close()
    print(f"""\n✅ Completed.
- Created {total_tables} tables with {total_rows} total rows
- Column lineage: {FILE_SCHEMA_TABLE}
- Block metadata: {FILE_TABLES_TABLE}""")
# ------------------------- Persistence: CSV -------------------------
def persist_csv(csv_path, duckdb_path):
    """
    Ingest a single CSV file into DuckDB AND write lineage, aligned with Excel.
    - First row is headers.
    - One table named from the CSV file name.
    - Cleans headers and ensures uniqueness.
    - Records lineage in __file_schema and __file_tables using the unified schema (with file_name).
    Exits the process when the CSV file does not exist.
    """
    import pandas as pd
    from duckdb import connect
    csv_path = Path(csv_path)
    if not csv_path.exists():
        print(f"Error: CSV file not found: {csv_path}")
        sys.exit(1)
    # Keep original header names for lineage before cleaning
    try:
        # utf-8-sig strips a leading BOM if present
        df_raw = pd.read_csv(csv_path, header=0, encoding="utf-8-sig")
    except UnicodeDecodeError:
        # Fall back to pandas' default encoding
        df_raw = pd.read_csv(csv_path, header=0)
    original_headers = list(df_raw.columns)
    # Clean/normalize column names
    def _clean_hdr(s):
        # Collapse internal whitespace, then apply the shared sanitizer.
        s = str(s) if s is not None else ""
        s = s.strip()
        s = re.sub(r"\s+", " ", s)
        return clean_col_name(s)
    cleaned_cols = ensure_unique([_clean_hdr(c) for c in original_headers])
    df = df_raw.copy()
    df.columns = cleaned_cols
    # Compute table name from file name
    table = sanitize_table_name(csv_path.stem)
    # Open / create DuckDB
    db_path = Path(duckdb_path)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    new_db = not db_path.exists()
    con = connect(str(db_path))
    if new_db:
        print(f"Created new DuckDB at: {db_path}")
    # Ensure unified lineage tables (with file_name) exist
    ensure_lineage_tables(con)
    # Create/replace the data table via a temporary registered view
    con.execute(f'DROP TABLE IF EXISTS "{table}"')
    con.register(f"{table}_temp_df", df)
    con.execute(f'CREATE TABLE "{table}" AS SELECT * FROM {table}_temp_df')
    con.unregister(f"{table}_temp_df")
    # Write lineage
    file_name = csv_path.name
    sheet_name = None  # CSV has no sheet
    block_index = 1  # single block/table for CSV
    start_row = 2  # header is row 1, data starts at 2
    end_row = len(df) + 1  # header + data rows
    header_rows_json = "[1]"  # header row index list as JSON
    inferred_title = None
    original_title_text = None
    # Map original->sql columns
    schema_rows = []
    for i, (orig, sql) in enumerate(zip(original_headers, cleaned_cols), start=1):
        schema_rows.append((i, str(orig), str(sql)))
    record_table_schema(
        con,
        file_name=file_name,
        sheet_name=sheet_name,
        table_name=table,
        columns=schema_rows
    )
    record_table_block(
        con,
        file_name=file_name,
        sheet_name=sheet_name,
        table_name=table,
        block_index=block_index,
        start_row=start_row,
        end_row=end_row,
        header_rows_json=header_rows_json,
        inferred_title=inferred_title,
        original_title_text=original_title_text
    )
    print(f'Created table {table} from CSV "{csv_path.name}" with {len(df)} rows and {len(df.columns)} columns.')
    con.close()
# ------------------------- CLI -------------------------
def ensure_unique_table_name(existing: set, name: str) -> str:
    """Sanitize *name*, suffix it with _2, _3, ... until unused, and record
    the chosen name in *existing* (mutated in place)."""
    base = sanitize_table_name(name) or "table"
    candidate = base
    suffix = 2
    while candidate in existing:
        candidate = f"{base}_{suffix}"
        suffix += 1
    existing.add(candidate)
    return candidate
def main():
    """CLI entry point: dispatch --file to the Excel or CSV ingest path."""
    import argparse
    parser = argparse.ArgumentParser(description="Excel/CSV → DuckDB (unified --file + lineage).")
    parser.add_argument("--file", required=True, help="Path to .xlsx/.xlsm/.xls or .csv")
    parser.add_argument("--duckdb", required=True, help="Path to DuckDB file")
    args = parser.parse_args()
    if not os.path.exists(args.file):
        print(f"Error: file not found: {args.file}")
        sys.exit(1)
    suffix = Path(args.file).suffix.lower()
    if suffix == ".csv":
        persist_csv(args.file, args.duckdb)
    elif suffix in (".xlsx", ".xlsm", ".xls"):
        persist(args.file, args.duckdb)
    else:
        print("Error: unsupported file type. Use .xlsx/.xlsm/.xls or .csv")
        sys.exit(2)
if __name__ == "__main__":
    main()