File size: 2,251 Bytes
7ff9600
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from __future__ import annotations

from pathlib import Path

import duckdb
import pytest

from hakari_bench.viewer.sql import (
    column_or_null,
    coalesce_existing_columns,
    qualified_column,
    quote_identifier,
    table_columns,
    table_exists,
)


def test_sql_identifier_helpers_quote_known_good_identifiers() -> None:
    columns = {"benchmark", "score"}

    assert quote_identifier("task_results", allowed={"task_results"}) == '"task_results"'
    assert qualified_column("tr", "score", allowed_columns=columns) == '"tr"."score"'
    assert column_or_null(columns, "benchmark", alias="tr") == '"tr"."benchmark"'
    assert column_or_null(columns, "missing", alias="tr") == "NULL"
    assert coalesce_existing_columns(columns, ["finished_at_utc", "benchmark"], alias="tr") == '"tr"."benchmark"'


def test_sql_identifier_helpers_reject_untrusted_identifiers() -> None:
    with pytest.raises(ValueError, match="not allowlisted"):
        quote_identifier("task_results", allowed={"dataset_metadata"})
    with pytest.raises(ValueError, match="Invalid SQL identifier"):
        quote_identifier("task_results; DROP TABLE task_results")
    with pytest.raises(ValueError, match="Invalid SQL identifier"):
        qualified_column("tr; DROP TABLE task_results", "score", allowed_columns={"score"})
    with pytest.raises(ValueError, match="not allowlisted"):
        qualified_column("tr", "score", allowed_columns={"benchmark"})


def test_table_introspection_uses_allowlisted_identifiers(tmp_path: Path) -> None:
    db_path = tmp_path / "results.duckdb"
    con = duckdb.connect(str(db_path))
    try:
        con.execute("CREATE TABLE task_results (benchmark VARCHAR, score DOUBLE)")

        assert table_exists(con, "task_results", allowed_tables={"task_results"}) is True
        assert table_columns(con, "task_results", allowed_tables={"task_results"}) == {"benchmark", "score"}
        with pytest.raises(ValueError, match="not allowlisted"):
            table_exists(con, "task_results", allowed_tables={"dataset_metadata"})
        with pytest.raises(ValueError, match="Invalid SQL identifier"):
            table_columns(con, "task_results; DROP TABLE task_results", allowed_tables=None)
    finally:
        con.close()