# Atomic-VSA — src/vsa_csv_loader.jl
using Printf
# ==============================================================================
# VSA CSV LOADER — Universal CSV → VDBTable Pipeline
# Handles: quoted fields, multi-value cells, auto type detection
# Produces: VDBTable ready for VSA queries & SQL
# ==============================================================================
# --- CSV Parsing (handles quoted commas) ---
"""
    csv_parse_line(line::AbstractString) -> Vector{String}

Split one CSV line into fields.

Commas inside double-quoted sections do not split fields. Per RFC 4180,
a doubled quote (`""`) inside a quoted field yields one literal `"`.
Every field is stripped of surrounding whitespace.
"""
function csv_parse_line(line::AbstractString)
    fields = String[]
    current = IOBuffer()
    in_quotes = false
    i = firstindex(line)
    last_i = lastindex(line)
    while i <= last_i
        c = line[i]
        if c == '"'
            if in_quotes && i < last_i && line[nextind(line, i)] == '"'
                # Escaped quote ("") inside a quoted field → literal "
                write(current, '"')
                i = nextind(line, i)  # consume the second quote too
            else
                in_quotes = !in_quotes
            end
        elseif c == ',' && !in_quotes
            # take! empties the buffer, so it can be reused for the next field
            push!(fields, strip(String(take!(current))))
        else
            write(current, c)
        end
        i = nextind(line, i)
    end
    push!(fields, strip(String(take!(current))))
    return fields
end
"""
    csv_read(path::String; max_rows::Int=0) -> (headers, rows)

Read a CSV file, returning a `Vector{String}` of headers and a vector of
per-row field vectors. Blank lines are skipped; every row is padded with
`""` or trimmed so its length matches the header count. `max_rows > 0`
caps the number of data lines scanned. A leading UTF-8 BOM on the header
line is stripped so the first header name parses cleanly.
"""
function csv_read(path::String; max_rows::Int=0)
    lines = readlines(path)
    isempty(lines) && return (String[], Vector{Vector{String}}())
    # Strip a UTF-8 byte-order mark, if present, from the header line
    headers = csv_parse_line(lstrip(lines[1], '\ufeff'))
    rows = Vector{Vector{String}}()
    limit = max_rows > 0 ? min(max_rows + 1, length(lines)) : length(lines)
    for i in 2:limit
        line = strip(lines[i])
        isempty(line) && continue
        fields = csv_parse_line(line)
        # Pad short rows / trim long rows to match the header count
        while length(fields) < length(headers)
            push!(fields, "")
        end
        if length(fields) > length(headers)
            fields = fields[1:length(headers)]
        end
        push!(rows, fields)
    end
    return (headers, rows)
end
# --- Auto Type Detection ---
# Decides if a column is numeric (THERMO) or categorical (CAT)
"""
    ColumnProfile

Per-column statistics gathered from sampled CSV rows, used to choose an
encoder for the column (thermometer vs. categorical).

Fields:
- `name`: column header string.
- `is_numeric`: true when the column looks numeric (see `profile_columns`
  for the detection heuristic).
- `min_val` / `max_val`: numeric range observed; default to 0.0 / 100.0
  when no values parsed as numbers.
- `unique_values`: set of distinct non-empty string values seen.
- `sample_count`: number of non-empty values profiled.
"""
struct ColumnProfile
    name::String
    is_numeric::Bool
    min_val::Float64
    max_val::Float64
    unique_values::Set{String}
    sample_count::Int
end
"""
    profile_columns(headers, rows) -> Vector{ColumnProfile}

Build a `ColumnProfile` for every header by scanning the sampled rows.
A column is flagged numeric when more than 80% of its non-empty values
parse as `Float64` AND it has more than 10 distinct values; otherwise it
is treated as categorical.
"""
function profile_columns(headers::Vector{String}, rows::Vector{Vector{String}})
    return map(enumerate(headers)) do (col, name)
        cells = String[row[col] for row in rows if col <= length(row)]
        present = [v for v in cells if !isempty(v)]
        # Collect the values that parse as floats
        parsed = Float64[]
        for v in present
            x = tryparse(Float64, v)
            x === nothing || push!(parsed, x)
        end
        ratio = isempty(present) ? 0.0 : length(parsed) / length(present)
        distinct = Set(present)
        # Numeric iff >80% parse as numbers AND more than 10 unique values
        numeric = ratio > 0.8 && length(distinct) > 10
        lo = isempty(parsed) ? 0.0 : minimum(parsed)
        hi = isempty(parsed) ? 100.0 : maximum(parsed)
        ColumnProfile(name, numeric, lo, hi, distinct, length(present))
    end
end
# --- Build VDBTable from CSV ---
"""
csv_to_table(reg, path; dim, id_col, max_rows, max_categories)
Load a CSV file into a VDBTable.
- `reg`: VSARegistry for atom allocation
- `path`: Path to CSV file
- `dim`: Vector dimension (default 2048)
- `id_col`: Column index to use as record ID (default 1)
- `max_rows`: Maximum rows to load (0 = all)
- `max_categories`: Maximum unique values for a CAT encoder (default 500)
"""
function csv_to_table(reg::VSARegistry, path::String;
dim::Int=2048,
id_col::Int=1,
max_rows::Int=0,
max_categories::Int=500,
table_name::String="")
# Read CSV
headers, rows = csv_read(path; max_rows=max_rows)
isempty(rows) && error("Empty CSV: $path")
# Auto-detect table name from filename
if isempty(table_name)
table_name = replace(basename(path), ".csv" => "")
table_name = replace(table_name, r"[^a-zA-Z0-9_]" => "_")
end
# Profile columns
profiles = profile_columns(headers, rows)
# Build schema (skip the ID column from encoding)
schema = Tuple{String, VSAEncoder}[]
col_indices = Int[] # Which CSV column index maps to which schema column
for (j, prof) in enumerate(profiles)
j == id_col && continue # Skip ID column
enc = if prof.is_numeric
# Thermometer encoding for numeric data
margin = (prof.max_val - prof.min_val) * 0.1
min_v = prof.min_val - margin
max_v = prof.max_val + margin
ThermometerEncoder(reg, prof.name, min_v, max_v; levels=100)
else
# Categorical encoding — collect top N categories
cats = collect(prof.unique_values)
if length(cats) > max_categories
# Take top by frequency
freq = Dict{String,Int}()
for row in rows
j <= length(row) && !isempty(row[j]) && (freq[row[j]] = get(freq, row[j], 0) + 1)
end
sorted = sort(collect(freq), by=x -> -x.second)
cats = [x.first for x in sorted[1:min(max_categories, length(sorted))]]
end
CategoricalEncoder(reg, prof.name, cats)
end
push!(schema, (prof.name, enc))
push!(col_indices, j)
end
# Create table
table = create_table(reg, table_name, dim, schema)
# Insert rows
inserted = 0
for row in rows
id = id_col <= length(row) ? row[id_col] : "row_$(inserted+1)"
isempty(id) && (id = "row_$(inserted+1)")
fields = Dict{String, Any}()
for (si, ci) in enumerate(col_indices)
ci <= length(row) || continue
val_str = row[ci]
isempty(val_str) && continue
col_name = schema[si][1]
enc = schema[si][2]
if enc isa ThermometerEncoder
v = tryparse(Float64, val_str)
v !== nothing && (fields[col_name] = v)
else
fields[col_name] = val_str
end
end
vdb_insert!(table, id, fields)
inserted += 1
end
return table, inserted
end
# --- Summary ---
"""
    csv_summary(path::String; max_rows::Int=5)

Print a quick profile of a CSV file: file name, sampled row count, column
count, and — per column — whether it was detected as numeric (with its
range) or categorical (with its unique-value count). Only `max_rows` data
rows are sampled for profiling.
"""
function csv_summary(path::String; max_rows::Int=5)
    headers, rows = csv_read(path; max_rows=max_rows)
    profiles = profile_columns(headers, rows)
    println(" File: $(basename(path))")
    println(" Rows: $(length(rows)) (sampled for profiling)")
    println(" Columns: $(length(headers))")
    println(" ─────────────────────────────────────────────")
    for prof in profiles
        if prof.is_numeric
            desc = @sprintf("NUMERIC [%.1f, %.1f]", prof.min_val, prof.max_val)
        else
            desc = "CATEGORICAL ($(length(prof.unique_values)) unique)"
        end
        @printf(" %-25s %s\n", prof.name, desc)
    end
end
# --- Bulk Load Helper ---
# Load multiple CSVs into a single VSAEngine
"""
    csv_load_all!(engine, paths; max_rows=0, max_categories=500)

Load several CSV files into `engine`, registering each resulting table in
`engine.tables` under its name. Prints one timed status line per file and
returns a Dict mapping table name to `(table=…, rows=…)`.
"""
function csv_load_all!(engine::VSAEngine, paths::Vector{String};
                       max_rows::Int=0, max_categories::Int=500)
    results = Dict{String, NamedTuple{(:table, :rows), Tuple{VDBTable, Int}}}()
    for csv_path in paths
        # @elapsed times the load; the assignments inside escape to this scope
        elapsed = @elapsed begin
            table, n = csv_to_table(engine.reg, csv_path;
                                    dim=engine.dim,
                                    max_rows=max_rows,
                                    max_categories=max_categories)
            engine.tables[table.name] = table
        end
        @printf(" ✓ %-25s %5d records (%.3f s)\n", table.name, n, elapsed)
        results[table.name] = (table=table, rows=n)
    end
    return results
end