Spaces:
Sleeping
Sleeping
| using Printf | |
# ==============================================================================
# VSA CSV LOADER — Universal CSV → VDBTable Pipeline
# Handles: quoted fields, multi-value cells, auto type detection
# Produces: VDBTable ready for VSA queries & SQL
# ==============================================================================
# --- CSV Parsing (handles quoted commas) ---
"""
    csv_parse_line(line::AbstractString) -> Vector{String}

Split a single CSV line into fields. Commas inside double-quoted
sections are treated as data, and an RFC 4180 doubled quote (`""`)
inside a quoted field produces a literal `"` in the output. Each
field is stripped of surrounding whitespace. Always returns at least
one field (an empty line yields `[""]`).
"""
function csv_parse_line(line::AbstractString)
    fields = String[]
    current = IOBuffer()
    in_quotes = false
    i = firstindex(line)
    last = lastindex(line)
    while i <= last
        c = line[i]
        if c == '"'
            if in_quotes && i < last && line[nextind(line, i)] == '"'
                # Escaped quote ("") inside a quoted field → literal "
                write(current, '"')
                i = nextind(line, i)  # consume the second quote too
            else
                in_quotes = !in_quotes
            end
        elseif c == ',' && !in_quotes
            # take! empties the buffer, so it can be reused for the next field
            push!(fields, strip(String(take!(current))))
        else
            write(current, c)
        end
        i = nextind(line, i)
    end
    push!(fields, strip(String(take!(current))))
    return fields
end
"""
    csv_read(path; max_rows=0) -> (headers, rows)

Read a CSV file: the first line supplies the header names and every
subsequent non-empty line becomes one row of strings, padded or
truncated to match the header width. When `max_rows > 0` only that
many data lines are scanned. An empty file yields empty headers and
no rows.
"""
function csv_read(path::String; max_rows::Int=0)
    lines = readlines(path)
    isempty(lines) && return (String[], Vector{Vector{String}}())
    headers = csv_parse_line(first(lines))
    ncols = length(headers)
    last_line = max_rows > 0 ? min(max_rows + 1, length(lines)) : length(lines)
    rows = Vector{Vector{String}}()
    for idx in 2:last_line
        raw = strip(lines[idx])
        isempty(raw) && continue
        row = csv_parse_line(raw)
        # Normalize row width so every row matches the header exactly
        nf = length(row)
        if nf < ncols
            append!(row, fill("", ncols - nf))
        elseif nf > ncols
            row = row[1:ncols]
        end
        push!(rows, row)
    end
    return (headers, rows)
end
# --- Auto Type Detection ---
# Decides if a column is numeric (THERMO) or categorical (CAT)
# Per-column statistics gathered from a sample of CSV rows, used to decide
# how the column should be encoded (thermometer vs. categorical).
struct ColumnProfile
    name::String               # column header name
    is_numeric::Bool           # true → thermometer encoding; false → categorical
    min_val::Float64           # smallest parsed numeric value (0.0 when none parsed)
    max_val::Float64           # largest parsed numeric value (100.0 when none parsed)
    unique_values::Set{String} # distinct non-empty raw string values seen
    sample_count::Int          # number of non-empty values sampled
end
# Build the ColumnProfile for a single column (`col` is the 1-based CSV index).
function _profile_column(colname::String, col::Int, rows::Vector{Vector{String}})
    present = String[r[col] for r in rows if col <= length(r) && !isempty(r[col])]
    parsed = Float64[]
    for s in present
        x = tryparse(Float64, s)
        x === nothing || push!(parsed, x)
    end
    distinct = Set(present)
    ratio = isempty(present) ? 0.0 : length(parsed) / length(present)
    # Numeric when >80% of non-empty values parse AND there are >10 distinct values
    numeric = ratio > 0.8 && length(distinct) > 10
    lo = isempty(parsed) ? 0.0 : minimum(parsed)
    hi = isempty(parsed) ? 100.0 : maximum(parsed)
    return ColumnProfile(colname, numeric, lo, hi, distinct, length(present))
end

"""
    profile_columns(headers, rows) -> Vector{ColumnProfile}

Classify each column as numeric or categorical and record its numeric
range and set of distinct values, based on the sampled `rows`.
"""
function profile_columns(headers::Vector{String}, rows::Vector{Vector{String}})
    return ColumnProfile[_profile_column(name, j, rows) for (j, name) in enumerate(headers)]
end
# --- Build VDBTable from CSV ---
| """ | |
| csv_to_table(reg, path; dim, id_col, max_rows, max_categories) | |
| Load a CSV file into a VDBTable. | |
| - `reg`: VSARegistry for atom allocation | |
| - `path`: Path to CSV file | |
| - `dim`: Vector dimension (default 2048) | |
| - `id_col`: Column index to use as record ID (default 1) | |
| - `max_rows`: Maximum rows to load (0 = all) | |
| - `max_categories`: Maximum unique values for a CAT encoder (default 500) | |
| """ | |
| function csv_to_table(reg::VSARegistry, path::String; | |
| dim::Int=2048, | |
| id_col::Int=1, | |
| max_rows::Int=0, | |
| max_categories::Int=500, | |
| table_name::String="") | |
| # Read CSV | |
| headers, rows = csv_read(path; max_rows=max_rows) | |
| isempty(rows) && error("Empty CSV: $path") | |
| # Auto-detect table name from filename | |
| if isempty(table_name) | |
| table_name = replace(basename(path), ".csv" => "") | |
| table_name = replace(table_name, r"[^a-zA-Z0-9_]" => "_") | |
| end | |
| # Profile columns | |
| profiles = profile_columns(headers, rows) | |
| # Build schema (skip the ID column from encoding) | |
| schema = Tuple{String, VSAEncoder}[] | |
| col_indices = Int[] # Which CSV column index maps to which schema column | |
| for (j, prof) in enumerate(profiles) | |
| j == id_col && continue # Skip ID column | |
| enc = if prof.is_numeric | |
| # Thermometer encoding for numeric data | |
| margin = (prof.max_val - prof.min_val) * 0.1 | |
| min_v = prof.min_val - margin | |
| max_v = prof.max_val + margin | |
| ThermometerEncoder(reg, prof.name, min_v, max_v; levels=100) | |
| else | |
| # Categorical encoding — collect top N categories | |
| cats = collect(prof.unique_values) | |
| if length(cats) > max_categories | |
| # Take top by frequency | |
| freq = Dict{String,Int}() | |
| for row in rows | |
| j <= length(row) && !isempty(row[j]) && (freq[row[j]] = get(freq, row[j], 0) + 1) | |
| end | |
| sorted = sort(collect(freq), by=x -> -x.second) | |
| cats = [x.first for x in sorted[1:min(max_categories, length(sorted))]] | |
| end | |
| CategoricalEncoder(reg, prof.name, cats) | |
| end | |
| push!(schema, (prof.name, enc)) | |
| push!(col_indices, j) | |
| end | |
| # Create table | |
| table = create_table(reg, table_name, dim, schema) | |
| # Insert rows | |
| inserted = 0 | |
| for row in rows | |
| id = id_col <= length(row) ? row[id_col] : "row_$(inserted+1)" | |
| isempty(id) && (id = "row_$(inserted+1)") | |
| fields = Dict{String, Any}() | |
| for (si, ci) in enumerate(col_indices) | |
| ci <= length(row) || continue | |
| val_str = row[ci] | |
| isempty(val_str) && continue | |
| col_name = schema[si][1] | |
| enc = schema[si][2] | |
| if enc isa ThermometerEncoder | |
| v = tryparse(Float64, val_str) | |
| v !== nothing && (fields[col_name] = v) | |
| else | |
| fields[col_name] = val_str | |
| end | |
| end | |
| vdb_insert!(table, id, fields) | |
| inserted += 1 | |
| end | |
| return table, inserted | |
| end | |
# --- Summary ---
"""
    csv_summary(path; max_rows=5)

Print a short profile of a CSV file: filename, sampled row count,
column count, and each column's detected type (numeric range or the
number of unique categorical values).
"""
function csv_summary(path::String; max_rows::Int=5)
    headers, rows = csv_read(path; max_rows=max_rows)
    profiles = profile_columns(headers, rows)
    println(" File: $(basename(path))")
    println(" Rows: $(length(rows)) (sampled for profiling)")
    println(" Columns: $(length(headers))")
    println(" ─────────────────────────────────────────────")
    for p in profiles
        if p.is_numeric
            label = @sprintf("NUMERIC [%.1f, %.1f]", p.min_val, p.max_val)
        else
            label = "CATEGORICAL ($(length(p.unique_values)) unique)"
        end
        @printf(" %-25s %s\n", p.name, label)
    end
end
# --- Bulk Load Helper ---
# Load multiple CSVs into a single VSAEngine
"""
    csv_load_all!(engine, paths; max_rows=0, max_categories=500)

Load every CSV in `paths` into `engine` as its own table, printing one
timing line per file. Returns a Dict mapping each table's name to a
`(table=..., rows=...)` NamedTuple.
"""
function csv_load_all!(engine::VSAEngine, paths::Vector{String};
                       max_rows::Int=0, max_categories::Int=500)
    loaded = Dict{String, NamedTuple{(:table, :rows), Tuple{VDBTable, Int}}}()
    for csv_path in paths
        # Time the full load + registration for the progress line below.
        elapsed = @elapsed begin
            tbl, nrows = csv_to_table(engine.reg, csv_path;
                                      dim=engine.dim,
                                      max_rows=max_rows,
                                      max_categories=max_categories)
            engine.tables[tbl.name] = tbl
        end
        @printf(" ✓ %-25s %5d records (%.3f s)\n", tbl.name, nrows, elapsed)
        loaded[tbl.name] = (table=tbl, rows=nrows)
    end
    return loaded
end