| import types |
|
|
| import pytest |
|
|
| import app |
|
|
|
|
| |
| |
| |
|
|
def reset_model_state():
    """Set the app module's ``_model``, ``_tokenizer`` and ``_current_model_id`` to None."""
    for attribute in ("_model", "_tokenizer", "_current_model_id"):
        setattr(app, attribute, None)
|
|
|
|
def assistant_text(result):
    """Return the ``"content"`` value of the last entry in ``result[0]``."""
    messages = result[0]
    last_message = messages[-1]
    return last_message["content"]
|
|
|
|
def sql_output(result):
    """Return the element at index 4 of *result* (where these tests read the SQL text)."""
    sql_index = 4
    return result[sql_index]
|
|
|
|
def status_html(result):
    """Return the element at index 6 of *result* (where these tests read the status HTML)."""
    status_index = 6
    return result[status_index]
|
|
|
|
@pytest.fixture(autouse=True)
def clean_model_state():
    """Autouse fixture that calls ``reset_model_state()`` before and after each test."""
    reset_model_state()  # start every test with the model globals cleared
    yield
    reset_model_state()  # clear whatever the test assigned to the model globals
|
|
|
|
| |
| |
| |
|
|
@pytest.mark.parametrize(
    ("message", "expected"),
    [
        (
            "crie tabela pesquisadores com id nome artigo e curriculo",
            [
                "CREATE TABLE pesquisadores",
                "id INTEGER",
                "nome TEXT",
                "artigo TEXT",
                "curriculo TEXT",
            ],
        ),
        (
            "cria tabela animal com nome tamanho peso especie",
            [
                "CREATE TABLE animal",
                "nome TEXT",
                "tamanho TEXT",
                "peso NUMERIC",
                "especie TEXT",
            ],
        ),
        (
            "faça tabela clientes com id nome email",
            ["CREATE TABLE clientes", "id INTEGER", "nome TEXT", "email TEXT"],
        ),
        (
            "create table researchers with id, name, articles and cv",
            [
                "CREATE TABLE researchers",
                "id INTEGER",
                "name TEXT",
                "articles TEXT",
                "cv TEXT",
            ],
        ),
        (
            "crie tabela alunos com id int nome text nota numeric",
            ["CREATE TABLE alunos", "id INTEGER", "nome TEXT", "nota NUMERIC"],
        ),
        (
            "crie tabela pesquisadores com id nome artigo curriculo",
            [
                "CREATE TABLE pesquisadores",
                "id INTEGER",
                "nome TEXT",
                "artigo TEXT",
                "curriculo TEXT",
            ],
        ),
    ],
)
def test_create_table_without_model(message, expected, monkeypatch):
    """Each create-table phrasing yields the expected DDL fragments.

    ``_run_generation`` is patched to fail the test if it is ever invoked.
    """

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)

    result = app.generate_response(message, [], "", None, None)

    for fragment in expected:
        assert fragment in sql_output(result), f"missing: {fragment!r}"
    assert "validator-ok" in result[5]
    status = status_html(result)
    assert app.SOURCE_DETERMINISTIC_SCHEMA_PARSER in status
    assert "without calling the model" in status
|
|
|
|
| |
| |
| |
|
|
def test_create_table_from_active_preset(monkeypatch):
    """With the employees preset as schema, "gere esta tabela" returns its CREATE TABLE."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)
    preset_schema = app.PRESETS["employees"]

    result = app.generate_response("gere esta tabela", [], preset_schema, None, None)

    assert "CREATE TABLE employees" in sql_output(result)
    status = status_html(result)
    assert app.SOURCE_DETERMINISTIC_SCHEMA_PARSER in status
    assert "without calling the model" in status
|
|
|
|
def test_create_table_response_updates_schema_context_display(monkeypatch):
    """A new CREATE TABLE shows up in ``result[2]`` and ``result[8]`` despite a different preset."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)
    request = "create table animals with id name species weight"
    active_schema = app.PRESETS["products"]

    result = app.generate_response(request, [], active_schema, None, None)

    assert "CREATE TABLE animals" in result[2]
    context_display = result[8]
    assert "Context: animals" in context_display
    assert "CREATE TABLE animals" in context_display
|
|
|
|
| |
| |
| |
|
|
@pytest.mark.parametrize(
    ("message", "expected_col"),
    [
        ("adicione cpf", "cpf TEXT"),
        ("add email", "email TEXT"),
        ("inclua telefone", "telefone TEXT"),
        ("acrescente campo bonus numeric", "bonus NUMERIC"),
        ("adicione: matricula", "matricula TEXT"),
    ],
)
def test_add_column_variants(message, expected_col, monkeypatch):
    """Each add-column phrasing appends the expected column to the table from history."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)
    created = app.generate_response(
        "crie tabela funcionarios com id nome salario", [], "", None, None
    )
    history = created[0]

    result = app.generate_response(message, history, "", None, None)

    assert expected_col in sql_output(result)
    assert "CREATE TABLE funcionarios" in sql_output(result)
    status = status_html(result)
    assert app.SOURCE_DETERMINISTIC_SCHEMA_PARSER in status
    assert "without calling the model" in status
|
|
|
|
| |
| |
| |
|
|
@pytest.mark.parametrize(
    ("message", "removed_col"),
    [
        ("remova salario", "salario"),
        ("remove nome", "nome"),
        ("delete salario", "salario"),
        ("drop coluna id", "id"),
    ],
)
def test_remove_column_variants(message, removed_col, monkeypatch):
    """Each remove-column phrasing drops the named column from the table in history."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)
    created = app.generate_response(
        "crie tabela funcionarios com id nome salario", [], "", None, None
    )
    history = created[0]

    result = app.generate_response(message, history, "", None, None)

    assert "CREATE TABLE funcionarios" in sql_output(result)
    assert removed_col not in sql_output(result)
    assert "validator-ok" in result[5]
    assert app.SOURCE_DETERMINISTIC_SCHEMA_PARSER in status_html(result)
|
|
|
|
| |
| |
| |
|
|
@pytest.mark.parametrize(
    "edit_message",
    ["altere para ter também email", "mude adicionando telefone"],
)
def test_edit_intent_recognizes_pt_conjugations(edit_message, monkeypatch):
    """Portuguese "altere"/"mude" edit requests are handled without the model."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)
    created = app.generate_response("crie tabela x com id nome", [], "", None, None)
    history = created[0]

    result = app.generate_response(edit_message, history, "", None, None)

    assert "CREATE TABLE x" in sql_output(result)
    status = status_html(result)
    assert app.SOURCE_DETERMINISTIC_SCHEMA_PARSER in status
    assert "without calling the model" in status
|
|
|
|
| |
| |
| |
|
|
def test_add_multiple_columns(monkeypatch):
    """"add email and phone" appends both columns in a single edit."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)
    created = app.generate_response(
        "crie tabela pesquisadores com id nome artigo e curriculo",
        [],
        "",
        None,
        None,
    )

    result = app.generate_response("add email and phone", created[0], "", None, None)

    schema = sql_output(result)
    assert "email TEXT" in schema
    assert "phone TEXT" in schema
|
|
|
|
def test_remove_column(monkeypatch):
    """Removing a column after an add keeps the added column and the remaining originals."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)
    created = app.generate_response(
        "crie tabela pesquisadores com id nome artigo e curriculo",
        [],
        "",
        None,
        None,
    )
    with_cpf = app.generate_response("adicione cpf", created[0], "", None, None)

    result = app.generate_response("remover curriculo", with_cpf[0], "", None, None)

    schema = sql_output(result)
    assert "curriculo TEXT" not in schema
    assert "cpf TEXT" in schema
    assert "id INTEGER" in schema
    assert "validator-ok" in result[5]
|
|
|
|
| |
| |
| |
|
|
@pytest.mark.parametrize(
    "history",
    [
        # Fenced SQL as a plain string.
        [
            {
                "role": "assistant",
                "content": "```sql\nCREATE TABLE pesquisadores (\n id INTEGER,\n nome TEXT\n);\n```",
            }
        ],
        # Fenced SQL inside a list of {"text": ...} parts.
        [
            {
                "role": "assistant",
                "content": [
                    {"text": "```sql\nCREATE TABLE pesquisadores (\n id INTEGER,\n nome TEXT\n);\n```"}
                ],
            }
        ],
        # Unfenced SQL as a plain string.
        [
            {
                "role": "assistant",
                "content": "CREATE TABLE pesquisadores (\n id INTEGER,\n nome TEXT\n);",
            }
        ],
    ],
)
def test_edit_from_history_content_shapes(history, monkeypatch):
    """An edit request finds the prior CREATE TABLE in each supported history content shape."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)
    request = "edita ela para ter um novo elemento: cpf"

    result = app.generate_response(request, history, "", None, None)

    schema = sql_output(result)
    assert "CREATE TABLE pesquisadores" in schema
    assert "cpf TEXT" in schema
    assert "id INTEGER" in schema
    assert "validator-ok" in result[5]
|
|
|
|
| |
| |
| |
|
|
def test_edit_from_active_schema_no_history(monkeypatch):
    """With empty history, an add request edits the schema passed as the active schema."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)
    active_schema = app.PRESETS["employees"]

    result = app.generate_response("adicione bonus", [], active_schema, None, None)

    schema = sql_output(result)
    assert "CREATE TABLE employees" in schema
    assert "bonus" in schema
    status = status_html(result)
    assert app.SOURCE_DETERMINISTIC_SCHEMA_PARSER in status
    assert "without calling the model" in status
|
|
|
|
| |
| |
| |
|
|
def test_last_create_table_returns_most_recent():
    """``last_create_table_from_history`` returns the later of two CREATE TABLE messages."""
    older = {"role": "assistant", "content": "```sql\nCREATE TABLE old (x TEXT);\n```"}
    request = {"role": "user", "content": "adicione id"}
    newer = {"role": "assistant", "content": "```sql\nCREATE TABLE new (id INTEGER);\n```"}

    latest = app.last_create_table_from_history([older, request, newer])

    assert "CREATE TABLE new" in latest
    assert "CREATE TABLE old" not in latest
|
|
|
|
| |
| |
| |
|
|
def test_full_schema_build_flow(monkeypatch):
    """Create, extend and trim a schema, then ask a question with no model loaded."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)

    # Step 1: create the table.
    created = app.generate_response(
        "crie tabela produtos com id nome preco", [], "", None, None
    )
    created_sql = sql_output(created)
    assert "CREATE TABLE produtos" in created_sql
    assert "preco NUMERIC" in created_sql

    # Step 2: add two columns in one request.
    extended = app.generate_response(
        "adicione categoria e estoque", created[0], "", None, None
    )
    extended_sql = sql_output(extended)
    assert "categoria TEXT" in extended_sql
    assert "estoque INTEGER" in extended_sql
    assert "id INTEGER" in extended_sql

    # Step 3: remove a column.
    trimmed = app.generate_response("remova preco", extended[0], "", None, None)
    trimmed_sql = sql_output(trimmed)
    assert "preco" not in trimmed_sql
    assert "categoria TEXT" in trimmed_sql

    # Step 4: ask a question against the resulting schema.
    answered = app.generate_response(
        "qual o produto mais caro?", trimmed[0], trimmed_sql, None, None
    )
    assert "Load a model" in status_html(answered)
|
|
|
|
| |
| |
| |
|
|
def test_sql_prompt_uses_schema_template():
    """The generation prompt embeds the schema and both chat role markers."""
    question = "What is the average salary per department?"

    prompt = app.build_generation_prompt(app.PRESETS["employees"], question)

    for marker in ("CREATE TABLE employees", "<|user|>", "<|assistant|>"):
        assert marker in prompt
|
|
|
|
def test_sql_prompt_fallback_schema_when_empty():
    """An empty schema is replaced in the prompt by a placeholder ``unknown`` table."""
    empty_schema = ""

    prompt = app.build_generation_prompt(empty_schema, "select all rows")

    assert "CREATE TABLE unknown (id INTEGER)" in prompt
|
|
|
|
def test_sql_prompt_normalizes_pt_br_query_to_english():
    """The pt-BR "most expensive product" question appears in the prompt in English only."""
    schema = app.PRESETS["products"]

    prompt = app.build_generation_prompt(schema, "qual o produto mais caro?")

    assert "Question: what is the most expensive product?" in prompt
    assert "qual o produto mais caro" not in prompt
|
|
|
|
def test_example_prompts_translate_pt_br_phrase():
    """Rendered example prompts contain the English phrase, not the pt-BR one."""
    rendered = app.render_example_prompts()

    assert "what is the most expensive product?" in rendered
    assert "qual o produto mais caro" not in rendered
|
|
|
|
def test_model_metadata_uses_compact_source_contract_grid():
    """Model metadata HTML has exactly four ``stat-card`` elements and no ``source-row``."""
    markup = app.model_metadata()

    stat_card_count = markup.count('class="stat-card"')
    assert stat_card_count == 4
    assert "source-row" not in markup
|
|
|
|
def test_sql_prompt_normalizes_grouped_average_to_english():
    """The pt-BR grouped-average question appears in the prompt in English only."""
    schema = app.PRESETS["employees"]
    question = "qual a media de salario por departamento"

    prompt = app.build_generation_prompt(schema, question)

    assert "Question: what is the average salary by department?" in prompt
    assert "media de salario" not in prompt
|
|
|
|
def test_deterministic_sql_query_handles_most_expensive_product():
    """The "most expensive product" question maps to an ORDER BY price DESC LIMIT 1 query."""
    schema = app.PRESETS["products"]

    generated = app.sql_core.deterministic_sql_query("qual o produto mais caro?", schema)

    assert generated == "SELECT * FROM products ORDER BY price DESC LIMIT 1;"
    assert "validator-ok" in app.validate_sql(generated)
|
|
|
|
def test_deterministic_sql_query_handles_pt_br_grouped_average():
    """The pt-BR average-salary-per-department question maps to a GROUP BY query."""
    question = "qual a media de salario por departamento"
    schema = app.PRESETS["employees"]

    generated = app.sql_core.deterministic_sql_query(question, schema)

    expected = (
        "SELECT department, AVG(salary) AS average_salary FROM employees GROUP BY department;"
    )
    assert generated == expected
    assert "validator-ok" in app.validate_sql(generated)
|
|
|
|
def test_deterministic_sql_query_handles_pt_br_comparison():
    """The pt-BR "nota maior que 8" question maps to a ``WHERE grade > 8`` query."""
    question = "mostre os alunos com nota maior que 8"
    schema = app.PRESETS["students"]

    generated = app.sql_core.deterministic_sql_query(question, schema)

    assert generated == "SELECT * FROM students WHERE grade > 8;"
    assert "validator-ok" in app.validate_sql(generated)
|
|
|
|
def test_deterministic_sql_query_handles_custom_weight_ranking():
    """"qual o maior peso?" on a custom animal schema orders by ``peso`` descending."""
    custom_schema = (
        "CREATE TABLE animal (nome TEXT, nome_cientifico TEXT, peso NUMERIC, especie TEXT)"
    )

    generated = app.sql_core.deterministic_sql_query("qual o maior peso?", custom_schema)

    assert generated == "SELECT * FROM animal ORDER BY peso DESC LIMIT 1;"
    assert "validator-ok" in app.validate_sql(generated)
|
|
|
|
def test_custom_animal_weight_flow_without_model(monkeypatch):
    """Create a custom table, then answer a ranking question via the SQL template path.

    ``_generate_model_text`` is patched to fail the test if it is ever invoked.
    """

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_generate_model_text", fail_if_called)

    created = app.generate_response(
        "crie tabela animal com nome, nome_cientifico, peso e especie",
        [],
        "",
        None,
        None,
    )
    history, schema, extra_state = created[0], created[2], created[7]
    result = app.generate_response("qual o maior peso?", history, schema, None, extra_state)

    assert "CREATE TABLE animal" in sql_output(created)
    assert sql_output(result) == "SELECT * FROM animal ORDER BY peso DESC LIMIT 1;"
    assert "validator-ok" in result[5]
    status = status_html(result)
    assert app.SOURCE_DETERMINISTIC_SQL_TEMPLATE in status
    assert "deterministic SQL template" in status
|
|
|
|
def test_pt_br_template_query_does_not_require_model(monkeypatch):
    """A templated pt-BR question is answered with ``_generate_model_text`` patched to fail."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_generate_model_text", fail_if_called)
    schema = app.PRESETS["products"]

    result = app.generate_response("qual o produto mais caro?", [], schema, None, None)

    assert sql_output(result) == "SELECT * FROM products ORDER BY price DESC LIMIT 1;"
    status = status_html(result)
    assert app.SOURCE_DETERMINISTIC_SQL_TEMPLATE in status
    assert "deterministic SQL template" in status
|
|
|
|
def test_sql_prompt_translates_column_aliases_to_english():
    """The pt-BR "nota maior que" phrasing appears in the prompt as "grade greater than"."""
    schema = app.PRESETS["students"]
    question = "mostre os alunos com nota maior que 8"

    prompt = app.build_generation_prompt(schema, question)

    assert "Question: show the students with grade greater than 8?" in prompt
    assert "nota maior que" not in prompt
|
|
|
|
def test_pt_br_sql_query_reaches_model_with_english_prompt(monkeypatch):
    """A pt-BR question reaches the faked model as an English SQL-generation prompt.

    The fake only records its arguments; every expectation is asserted after
    ``generate_response`` returns. An ``AssertionError`` raised inside the fake
    is an ordinary ``Exception`` and could be intercepted by error handling in
    the code under test, hiding the real cause of a failure.
    """
    app._model = types.SimpleNamespace(
        generation_config=types.SimpleNamespace(eos_token_id=0)
    )
    app._tokenizer = types.SimpleNamespace(eos_token_id=0, pad_token_id=0)
    app._current_model_id = app.FINE_TUNED_MODEL_ID
    seen = {}

    def fake_generate(prompt, generation_kind):
        # Record only; assertions happen in the test body below.
        seen["prompt"] = prompt
        seen["generation_kind"] = generation_kind
        return "SELECT name FROM products ORDER BY price DESC LIMIT 1;", 1

    monkeypatch.setattr(app, "_generate_model_text", fake_generate)

    result = app.generate_response(
        "liste produtos por categoria",
        [],
        app.PRESETS["products"],
        app.FINE_TUNED_MODEL_KEY,
        None,
    )

    assert "prompt" in seen, "the model fake was never called"
    assert seen["generation_kind"] == app.model_core.SQL_GENERATION
    assert "SELECT name FROM products" in sql_output(result)
    assert "Question: list products by category?" in seen["prompt"]
    assert "liste produtos" not in seen["prompt"]
    assert app.SOURCE_FINE_TUNED_MODEL in status_html(result)
|
|
|
|
def test_sql_model_rejects_destructive_output(monkeypatch):
    """A ``DROP TABLE`` returned by the faked model is rejected and never shown."""
    fake_config = types.SimpleNamespace(eos_token_id=0)
    app._model = types.SimpleNamespace(generation_config=fake_config)
    app._tokenizer = types.SimpleNamespace(eos_token_id=0, pad_token_id=0)
    app._current_model_id = app.FINE_TUNED_MODEL_ID

    def fake_generate(*args, **kwargs):
        return ("DROP TABLE employees;", 1)

    monkeypatch.setattr(app, "_generate_model_text", fake_generate)

    result = app.generate_response(
        "find employees named Alice",
        [],
        app.PRESETS["employees"],
        app.FINE_TUNED_MODEL_KEY,
        None,
    )

    assert sql_output(result) == ""
    reply = assistant_text(result)
    assert "DROP TABLE" not in reply
    assert "rejected" in reply.lower()
    status = status_html(result)
    assert app.SOURCE_FINE_TUNED_MODEL in status
    assert "Rejected non-SELECT/WITH model output" in status
|
|
|
|
def test_sql_model_rejects_non_sql_output_as_chat_capability(monkeypatch):
    """Conversational text returned by the faked model is rejected, not relayed as chat."""
    fake_config = types.SimpleNamespace(eos_token_id=0)
    app._model = types.SimpleNamespace(generation_config=fake_config)
    app._tokenizer = types.SimpleNamespace(eos_token_id=0, pad_token_id=0)
    app._current_model_id = app.FINE_TUNED_MODEL_ID

    def fake_generate(*args, **kwargs):
        return ("I can help with that, but I need more details.", 1)

    monkeypatch.setattr(app, "_generate_model_text", fake_generate)

    result = app.generate_response(
        "find employees named Alice",
        [],
        app.PRESETS["employees"],
        app.FINE_TUNED_MODEL_KEY,
        None,
    )

    assert sql_output(result) == ""
    assert "I can help" not in assistant_text(result)
    status = status_html(result)
    assert "chat response" not in status.lower()
    assert app.SOURCE_FINE_TUNED_MODEL in status
    assert "Rejected non-SELECT/WITH model output" in status
|
|
|
|
def test_sql_model_rejects_hallucinated_schema_column(monkeypatch):
    """A SELECT on ``email`` is rejected against the employees preset as schema-invalid."""
    fake_config = types.SimpleNamespace(eos_token_id=0)
    app._model = types.SimpleNamespace(generation_config=fake_config)
    app._tokenizer = types.SimpleNamespace(eos_token_id=0, pad_token_id=0)
    app._current_model_id = app.FINE_TUNED_MODEL_ID

    def fake_generate(*args, **kwargs):
        return ("SELECT email FROM employees WHERE name = 'Alice';", 1)

    monkeypatch.setattr(app, "_generate_model_text", fake_generate)

    result = app.generate_response(
        "find employees named Alice",
        [],
        app.PRESETS["employees"],
        app.FINE_TUNED_MODEL_KEY,
        None,
    )

    assert sql_output(result) == ""
    reply = assistant_text(result)
    assert "model output failed SQL/schema validation" in reply
    assert "email" not in reply
    status = status_html(result)
    assert "schema-invalid model output" in status
    assert app.SOURCE_FINE_TUNED_MODEL in status
|
|
|
|
def test_sql_model_accepts_valid_schema_column_output(monkeypatch):
    """A valid SELECT from the faked model is passed through and validated OK."""
    fake_config = types.SimpleNamespace(eos_token_id=0)
    app._model = types.SimpleNamespace(generation_config=fake_config)
    app._tokenizer = types.SimpleNamespace(eos_token_id=0, pad_token_id=0)
    app._current_model_id = app.FINE_TUNED_MODEL_ID

    def fake_generate(*args, **kwargs):
        return ("SELECT * FROM employees WHERE name = 'Alice';", 1)

    monkeypatch.setattr(app, "_generate_model_text", fake_generate)

    result = app.generate_response(
        "find employees named Alice",
        [],
        app.PRESETS["employees"],
        app.FINE_TUNED_MODEL_KEY,
        None,
    )

    assert "SELECT * FROM employees WHERE name = 'Alice';" in sql_output(result)
    assert "validator-ok" in result[5]
    assert app.SOURCE_FINE_TUNED_MODEL in status_html(result)
|
|
|
|
def test_sql_intent_detected():
    """English and pt-BR data questions are classified as SQL intent."""
    cases = (
        ("What is the average salary per department?", "employees"),
        ("what is the most expensive product?", "products"),
        ("liste todos os funcionários", "employees"),
        ("mostre os alunos com nota maior que 8", "students"),
    )
    for question, preset_key in cases:
        assert app.is_sql_intent(question, app.PRESETS[preset_key])
|
|
|
|
def test_greeting_not_sql_intent():
    """Greetings are not SQL intent, with or without a schema."""
    with_schema = app.is_sql_intent("oi", app.PRESETS["employees"])
    assert not with_schema
    without_schema = app.is_sql_intent("hello", "")
    assert not without_schema
|
|
|
|
| |
| |
| |
|
|
@pytest.mark.parametrize(
    ("raw", "expected"),
    [
        ("```sql\nSELECT * FROM x\n```", "SELECT * FROM x"),
        ("SELECT id FROM t<|end|>", "SELECT id FROM t"),
        ("SQL: SELECT name FROM t", "SELECT name FROM t"),
        ("```\nSELECT 1\n```", "SELECT 1"),
    ],
)
def test_clean_generation_strips_artifacts(raw, expected):
    """Code fences, the ``<|end|>`` marker and a leading ``SQL:`` label are stripped."""
    cleaned = app.clean_generation(raw)
    assert cleaned == expected
|
|
|
|
def test_format_generation_result_sql_path():
    """A plain SELECT is returned as SQL with empty chat text and an OK validator badge."""
    statement = "SELECT * FROM employees"

    sql_text, chat_text, badge = app.format_generation_result(statement)

    assert sql_text == "SELECT * FROM employees"
    assert chat_text == ""
    assert "validator-ok" in badge
|
|
|
|
def test_format_generation_result_rejects_non_sql_output():
    """Free text yields empty SQL, empty chat text and the empty validator badge."""
    sql_text, chat_text, badge = app.format_generation_result("I don't know, try again.")

    assert sql_text == ""
    assert chat_text == ""
    assert badge == app.EMPTY_VALIDATOR
|
|
|
|
@pytest.mark.parametrize(
    "raw",
    [
        "DROP TABLE employees;",
        "ALTER TABLE employees ADD COLUMN email TEXT;",
        "UPDATE employees SET salary = 0;",
        "DELETE FROM employees;",
        "INSERT INTO employees VALUES (1);",
        "CREATE TABLE employees_copy (id INTEGER);",
    ],
)
def test_format_generation_result_rejects_non_select_model_sql(raw):
    """DDL and DML statements yield empty SQL, empty chat text and the empty badge."""
    sql_text, chat_text, badge = app.format_generation_result(raw)

    assert sql_text == ""
    assert chat_text == ""
    assert badge == app.EMPTY_VALIDATOR
|
|
|
|
@pytest.mark.parametrize(
    "raw",
    [
        "SELECT * FROM employees; DROP TABLE employees;",
        "SELECT 1; DELETE FROM employees;",
        "WITH high_salary AS (SELECT * FROM employees) SELECT * FROM high_salary; UPDATE employees SET salary = 0;",
        "SELECT * FROM employees DROP TABLE employees",
    ],
)
def test_format_generation_result_rejects_stacked_model_sql(raw):
    """A SELECT/WITH followed by a mutating statement yields empty SQL and the empty badge."""
    sql_text, chat_text, badge = app.format_generation_result(raw)

    assert sql_text == ""
    assert chat_text == ""
    assert badge == app.EMPTY_VALIDATOR
|
|
|
|
def test_format_generation_result_rejects_incomplete_not_clause():
    """A query ending in a dangling ``AND NOT`` yields empty SQL and the empty badge."""
    truncated = 'SELECT COUNT(*) FROM employees WHERE name = "Alice" AND NOT'

    sql_text, chat_text, badge = app.format_generation_result(truncated)

    assert sql_text == ""
    assert chat_text == ""
    assert badge == app.EMPTY_VALIDATOR
|
|
|
|
def test_format_generation_result_rejects_bare_negated_predicate():
    """A query ending in ``AND NOT department`` yields empty SQL and the empty badge."""
    statement = 'SELECT COUNT(*) FROM employees WHERE name = "Alice" AND NOT department'

    sql_text, chat_text, badge = app.format_generation_result(statement)

    assert sql_text == ""
    assert chat_text == ""
    assert badge == app.EMPTY_VALIDATOR
|
|
|
|
def test_format_generation_result_rejects_trailing_comparison_operator():
    """A query ending in a dangling ``=`` yields empty SQL and the empty badge."""
    truncated = "SELECT id AS email FROM employees WHERE 1 ="
    schema = app.PRESETS["employees"]

    sql_text, chat_text, badge = app.format_generation_result(truncated, schema)

    assert sql_text == ""
    assert chat_text == ""
    assert badge == app.EMPTY_VALIDATOR
|
|
|
|
def test_format_generation_result_accepts_with_query():
    """A single WITH ... SELECT statement is accepted and validated OK."""
    cte_query = "WITH high_salary AS (SELECT * FROM employees) SELECT * FROM high_salary;"

    sql_text, chat_text, badge = app.format_generation_result(cte_query)

    assert sql_text.startswith("WITH high_salary")
    assert chat_text == ""
    assert "validator-ok" in badge
|
|
|
|
def test_format_generation_result_rejects_unknown_column_against_schema():
    """Selecting ``email`` against the employees preset yields empty SQL and the empty badge."""
    schema = app.PRESETS["employees"]

    sql_text, chat_text, badge = app.format_generation_result(
        "SELECT email FROM employees;", schema
    )

    assert sql_text == ""
    assert chat_text == ""
    assert badge == app.EMPTY_VALIDATOR
|
|
|
|
def test_format_generation_result_accepts_double_quoted_comparison_literal():
    """A double-quoted value on the right of ``=`` is accepted against the schema."""
    statement = 'SELECT COUNT(*) FROM employees WHERE name = "Alice";'
    schema = app.PRESETS["employees"]

    sql_text, chat_text, badge = app.format_generation_result(statement, schema)

    assert sql_text == 'SELECT COUNT(*) FROM employees WHERE name = "Alice";'
    assert chat_text == ""
    assert "validator-ok" in badge
|
|
|
|
| |
| |
| |
|
|
@pytest.mark.parametrize(
    "stmt",
    [
        "SELECT * FROM employees",
        "CREATE TABLE t (id INTEGER)",
        "INSERT INTO t VALUES (1)",
        "WITH cte AS (SELECT 1) SELECT * FROM cte",
        "DROP TABLE t",
        "UPDATE t SET x = 1 WHERE id = 1",
    ],
)
def test_validate_sql_valid_starters(stmt):
    """Statements starting with each listed SQL keyword get a validator-ok badge."""
    badge = app.validate_sql(stmt)
    assert "validator-ok" in badge
|
|
|
|
def test_validate_sql_garbage_returns_warn():
    """Non-SQL text gets a validator-warn badge."""
    badge = app.validate_sql("isto nao e sql %$#")
    assert "validator-warn" in badge
|
|
|
|
def test_validate_sql_incomplete_trailing_clause_returns_warn():
    """A query ending in a bare ``ORDER`` gets a validator-warn badge."""
    truncated = "SELECT MAX(price), id FROM products GROUP BY 1 ORDER"
    badge = app.validate_sql(truncated)
    assert "validator-warn" in badge
|
|
|
|
def test_validate_sql_bare_negated_predicate_returns_warn():
    """A query ending in ``AND NOT department`` gets a validator-warn badge."""
    statement = 'SELECT COUNT(*) FROM employees WHERE name = "Alice" AND NOT department'
    badge = app.validate_sql(statement)
    assert "validator-warn" in badge
|
|
|
|
def test_validate_sql_trailing_comparison_operator_returns_warn():
    """A query ending in a dangling ``=`` gets a validator-warn badge."""
    truncated = "SELECT id AS email FROM employees WHERE 1 ="
    badge = app.validate_sql(truncated, app.PRESETS["employees"])
    assert "validator-warn" in badge
|
|
|
|
def test_validate_sql_rejects_unknown_schema_column():
    """Selecting ``email`` against the employees preset gets a validator-warn badge."""
    schema = app.PRESETS["employees"]
    badge = app.validate_sql("SELECT email FROM employees;", schema)
    assert "validator-warn" in badge
|
|
|
|
def test_validate_sql_rejects_double_quoted_unknown_schema_column():
    """Selecting a double-quoted ``"email"`` against the employees preset gets a warn badge."""
    schema = app.PRESETS["employees"]
    badge = app.validate_sql('SELECT "email" FROM employees;', schema)
    assert "validator-warn" in badge
|
|
|
|
def test_validate_sql_accepts_double_quoted_comparison_literal():
    """A double-quoted value on the right of ``=`` gets a validator-ok badge."""
    statement = 'SELECT COUNT(*) FROM employees WHERE name = "Alice";'
    badge = app.validate_sql(statement, app.PRESETS["employees"])
    assert "validator-ok" in badge
|
|
|
|
def test_validate_sql_rejects_unknown_schema_table():
    """Selecting from ``departments`` against the employees preset gets a warn badge."""
    schema = app.PRESETS["employees"]
    badge = app.validate_sql("SELECT name FROM departments;", schema)
    assert "validator-warn" in badge
|
|
|
|
def test_validate_sql_rejects_date_when_not_in_schema():
    """Selecting ``date`` against the employees preset gets a validator-warn badge."""
    schema = app.PRESETS["employees"]
    badge = app.validate_sql("SELECT date FROM employees;", schema)
    assert "validator-warn" in badge
|
|
|
|
def test_validate_sql_accepts_schema_alias_and_output_alias():
    """A table alias (``e``) and an output alias (``total``) both pass schema validation."""
    statement = (
        "SELECT e.name, COUNT(*) AS total FROM employees e GROUP BY e.name ORDER BY total DESC;"
    )
    badge = app.validate_sql(statement, app.PRESETS["employees"])
    assert "validator-ok" in badge
|
|
|
|
def test_validate_sql_empty_returns_empty_badge():
    """An empty string returns exactly ``EMPTY_VALIDATOR``."""
    badge = app.validate_sql("")
    assert badge == app.EMPTY_VALIDATOR
|
|
|
|
| |
| |
| |
|
|
@pytest.mark.parametrize(
    ("raw", "expected_type"),
    [
        ("price DECIMAL", "NUMERIC"),
        ("active BOOL", "BOOLEAN"),
        ("qty INT", "INTEGER"),
        ("score REAL", "REAL"),
        # Column names that coincide with type keywords (`date`, `int`).
        ("date DATE", "DATE"),
        ("int INTEGER", "INTEGER"),
        ("name TEXT", "TEXT"),
    ],
)
def test_parse_column_explicit_type_normalization(raw, expected_type):
    """An explicit column type is normalised to the expected canonical type.

    ``parse_column_definition`` must return a two-item result whose second
    item is the normalised type.
    """
    parsed = app.parse_column_definition(raw)
    assert parsed is not None, f"could not parse {raw!r}"
    # A single check: the original asserted the same value twice.
    _, col_type = parsed
    assert col_type == expected_type
|
|
|
|
| |
| |
| |
|
|
def test_trim_chat_history_caps_at_max_exchanges():
    """Thirty alternating messages are trimmed down to twenty."""
    roles = ("user", "assistant")
    history = []
    for index in range(30):
        history.append({"role": roles[index % 2], "content": str(index)})

    trimmed = app.trim_chat_history(history)

    assert len(trimmed) == 20
|
|
|
|
| |
| |
| |
|
|
def test_empty_input_returns_error():
    """An empty message leaves the history empty and asks the user to type a message."""
    result = app.generate_response("", [], "", None, None)

    history = result[0]
    assert history == []
    assert "Type a message" in status_html(result)
|
|
|
|
def test_malformed_create_table_returns_error():
    """"crie tabela" with no name or columns yields no SQL and a "CREATE TABLE needs" status."""
    result = app.generate_response("crie tabela", [], "", None, None)

    assert sql_output(result) == ""
    status = status_html(result)
    assert "CREATE TABLE needs" in status
    assert app.SOURCE_DETERMINISTIC_SCHEMA_PARSER in status
|
|
|
|
def test_edit_without_existing_table_returns_error():
    """An add request with no history and no schema reports a missing CREATE TABLE."""
    result = app.generate_response("adicione cpf", [], "", None, None)

    assert sql_output(result) == ""
    status = status_html(result)
    assert "existing CREATE TABLE" in status
    assert app.SOURCE_DETERMINISTIC_SCHEMA_PARSER in status
|
|
|
|
@pytest.mark.parametrize(
    "message",
    [
        # delete / drop phrasings
        "delete all animals",
        "delete animals",
        "DELETE FROM animals",
        "drop table animals",
        "drop animals table",
        "drop the animals table",
        "drop animals",
        # update / insert phrasings
        "update animals set weight = 0",
        "insert into animals values (1)",
        "insert animal",
        "insert row into animals",
        "add row to animals",
        "add animal record",
    ],
)
def test_unsupported_data_mutation_returns_static_fallback(message, monkeypatch):
    """Data-mutation requests get the static unsupported-mutation reply and no SQL."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)
    schema = "CREATE TABLE animals (id INTEGER, name TEXT, weight NUMERIC)"

    result = app.generate_response(message, [], schema, None, None)

    assert sql_output(result) == ""
    assert result[5] == app.EMPTY_VALIDATOR
    assert app.UNSUPPORTED_MUTATION_RESPONSE in assistant_text(result)
    status = status_html(result)
    assert app.SOURCE_STATIC_FALLBACK in status
    assert app.SOURCE_DETERMINISTIC_SCHEMA_PARSER not in status
|
|
|
|
def test_remove_nonexistent_columns_does_not_return_unchanged_schema():
    """An edit that matches no column returns an empty string, not the original schema."""
    schema = "CREATE TABLE animals (id INTEGER, name TEXT, species TEXT);"

    edited = app.sql_core.edit_create_table_from_message("delete all animals", [], schema)

    assert edited == ""
|
|
|
|
def test_noop_schema_edit_reports_no_matching_column(monkeypatch):
    """Deleting a column absent from the schema yields no SQL and a "no matching column" status."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)
    schema = "CREATE TABLE animals (id INTEGER, name TEXT, species TEXT)"

    result = app.generate_response("delete salary", [], schema, None, None)

    assert sql_output(result) == ""
    status = status_html(result)
    assert "No matching schema column was changed." in status
    assert app.SOURCE_DETERMINISTIC_SCHEMA_PARSER in status
|
|
|
|
def test_data_mutation_uses_schema_from_history_when_active_schema_empty(monkeypatch):
    """With an empty active schema, a CREATE TABLE in history still triggers the mutation fallback."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)
    prior_reply = {
        "role": "assistant",
        "content": "```sql\nCREATE TABLE animals (id INTEGER, species TEXT);\n```",
    }

    result = app.generate_response("delete animals", [prior_reply], "", None, None)

    assert sql_output(result) == ""
    assert app.UNSUPPORTED_MUTATION_RESPONSE in assistant_text(result)
    assert app.SOURCE_STATIC_FALLBACK in status_html(result)
|
|
|
|
@pytest.mark.parametrize(
    "message",
    [
        "create table rows with id name",
        "create table records with id name",
    ],
)
def test_create_table_names_that_overlap_row_terms_still_create_schema(message, monkeypatch):
    """Tables named ``rows`` or ``records`` are still created as schemas."""

    def fail_if_called(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", fail_if_called)

    result = app.generate_response(message, [], "", None, None)

    schema = sql_output(result)
    assert "CREATE TABLE" in schema
    assert "id INTEGER" in schema
    assert "name TEXT" in schema
    assert app.SOURCE_DETERMINISTIC_SCHEMA_PARSER in status_html(result)
|
|
|
|
def test_non_template_sql_intent_without_model_returns_load_error():
    """"find employees named Alice" with no model loaded asks the user to load a model."""
    schema = app.PRESETS["employees"]

    result = app.generate_response("find employees named Alice", [], schema, None, None)

    status = status_html(result)
    assert "Load a model" in status
    assert app.SOURCE_FINE_TUNED_MODEL in status
|
|
|
|
def test_model_id_mismatch_returns_inconsistency_error():
    """A Phi-3 ``_current_model_id`` with the fine-tuned key requested is reported as inconsistent.

    No explicit cleanup is needed here: the module's autouse
    ``clean_model_state`` fixture resets the model globals after every test,
    as the other tests that assign them already rely on.
    """
    app._model = types.SimpleNamespace(
        generation_config=types.SimpleNamespace(eos_token_id=0)
    )
    app._tokenizer = object()
    app._current_model_id = "microsoft/Phi-3-mini-4k-instruct"

    result = app.generate_response(
        "select all", [], app.PRESETS["employees"], app.FINE_TUNED_MODEL_KEY, None
    )

    status = status_html(result)
    assert "inconsistent" in status
    assert app.SOURCE_FINE_TUNED_MODEL in status
|
|
|
|
def test_busy_generation_lock_raises():
    """``_run_generation`` raises RuntimeError while the model activity lock is held."""
    # Acquire outside the assert: under ``python -O`` assert statements are
    # stripped, which would skip the acquire and make the release below fail.
    acquired = app._model_activity_lock.acquire(blocking=False)
    assert acquired, "model activity lock was already held before the test"
    try:
        with pytest.raises(RuntimeError, match="Another model operation"):
            app._run_generation(object(), {}, {})
    finally:
        app._model_activity_lock.release()
|
|
|
|
def test_generation_exception_is_rendered_not_raised(monkeypatch):
    """A RuntimeError from ``_run_generation`` is shown in the status HTML, not propagated."""

    class DummyTokenizer:
        """Minimal tokenizer stand-in: callable, with eos/pad token ids."""

        eos_token_id = 0
        pad_token_id = 0

        def __call__(self, prompt, return_tensors):
            return {"input_ids": types.SimpleNamespace(shape=(1, 1))}

    def raise_timeout(*args, **kwargs):
        # A plain function that raises, replacing the lambda/generator-throw trick.
        raise RuntimeError("timeout")

    monkeypatch.setattr(app, "import_model_runtime", lambda: (object(), None, None, None))
    monkeypatch.setattr(app, "_run_generation", raise_timeout)
    app._model = types.SimpleNamespace(
        generation_config=types.SimpleNamespace(eos_token_id=0)
    )
    app._tokenizer = DummyTokenizer()
    app._current_model_id = app.FINE_TUNED_MODEL_ID

    result = app.generate_response(
        "select all rows", [], "", app.FINE_TUNED_MODEL_KEY, None
    )

    assert sql_output(result) == ""
    status = status_html(result)
    assert "Generation failed: RuntimeError: timeout" in status
    assert app.SOURCE_FINE_TUNED_MODEL in status
|
|
|
|
| |
| |
| |
|
|
def test_off_topic_message_returns_fallback(monkeypatch):
    """An off-topic request gets the static fallback reply and no SQL output."""

    def model_must_not_run(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", model_must_not_run)

    response = app.generate_response("me conte uma piada", [], "", None, None)

    assert sql_output(response) == ""
    assert app.FALLBACK_RESPONSE in assistant_text(response)
    assert app.SOURCE_STATIC_FALLBACK in status_html(response)
|
|
|
|
def test_greeting_returns_fallback(monkeypatch):
    """A bare greeting ("oi") gets the static fallback reply and no SQL output."""

    def model_must_not_run(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", model_must_not_run)

    response = app.generate_response("oi", [], "", None, None)

    assert sql_output(response) == ""
    assert app.FALLBACK_RESPONSE in assistant_text(response)
    assert app.SOURCE_STATIC_FALLBACK in status_html(response)
|
|
|
|
| |
| |
| |
|
|
def test_stopwords_not_treated_as_columns(monkeypatch):
    """Adding a column must not produce columns named after stopwords (to/as/from).

    NOTE(review): the edit message "add peso" contains none of the stopwords
    checked below — confirm this input exercises the intended path.
    """

    def model_must_not_run(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", model_must_not_run)

    created = app.generate_response(
        "crie tabela animal com nome especie", [], "", None, None
    )
    # Element 0 of the result is the chat history, fed back in for the follow-up edit.
    history = created[0]

    edited = app.generate_response("add peso", history, "", None, None)

    ddl = sql_output(edited)
    assert "peso NUMERIC" in ddl
    for stopword_column in (" to TEXT", " as TEXT", " from TEXT"):
        assert stopword_column not in ddl
|
|
|
|
| |
| |
| |
|
|
def test_rename_column_basic(monkeypatch):
    """"rename X to Y" renames one column and leaves the others in place."""

    def model_must_not_run(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", model_must_not_run)

    created = app.generate_response(
        "crie tabela animal com nome cientifico especie", [], "", None, None
    )
    history = created[0]

    renamed = app.generate_response(
        "rename cientifico to nome_cientifico", history, "", None, None
    )

    ddl = sql_output(renamed)
    assert "nome_cientifico TEXT" in ddl
    # The old column must not survive as a standalone column line.
    assert "\n cientifico TEXT" not in ddl
    for untouched_column in ("nome TEXT", "especie TEXT"):
        assert untouched_column in ddl
    assert "validator-ok" in renamed[5]
|
|
|
|
def test_rename_column_pt(monkeypatch):
    """The Portuguese form "renomeie X para Y" renames the column and keeps its type."""

    def model_must_not_run(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", model_must_not_run)

    created = app.generate_response(
        "crie tabela produto com id nome preco", [], "", None, None
    )
    history = created[0]

    renamed = app.generate_response(
        "renomeie preco para valor", history, "", None, None
    )

    ddl = sql_output(renamed)
    assert "valor NUMERIC" in ddl
    assert "preco" not in ddl
|
|
|
|
| |
| |
| |
|
|
def test_compound_add_and_rename(monkeypatch):
    """One message can both add a column and rename another."""

    def model_must_not_run(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", model_must_not_run)

    created = app.generate_response(
        "crie tabela animal com nome cientifico especie", [], "", None, None
    )
    history = created[0]

    edited = app.generate_response(
        "add peso and rename cientifico to nome_cientifico", history, "", None, None
    )

    ddl = sql_output(edited)
    assert "peso" in ddl
    assert "nome_cientifico TEXT" in ddl
    # Neither the old column nor stray command words may appear as columns.
    for unwanted in ("\n cientifico TEXT", "edit TEXT", " to TEXT"):
        assert unwanted not in ddl
    assert "validator-ok" in edited[5]
|
|
|
|
def test_compound_add_and_remove(monkeypatch):
    """One message can both add a column and remove another."""

    def model_must_not_run(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", model_must_not_run)

    created = app.generate_response(
        "crie tabela funcionarios com id nome salario departamento", [], "", None, None
    )
    history = created[0]

    edited = app.generate_response(
        "add email and remove salario", history, "", None, None
    )

    ddl = sql_output(edited)
    assert "email TEXT" in ddl
    assert "salario" not in ddl
    for kept_column in ("id INTEGER", "nome TEXT"):
        assert kept_column in ddl
|
|
|
|
| |
| |
| |
|
|
def test_rename_preserves_column_type(monkeypatch):
    """Renaming a NUMERIC column keeps the NUMERIC type under the new name."""

    def model_must_not_run(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", model_must_not_run)

    created = app.generate_response(
        "crie tabela vendas com id total date", [], "", None, None
    )
    history = created[0]

    renamed = app.generate_response(
        "rename total to valor_total", history, "", None, None
    )

    ddl = sql_output(renamed)
    assert "valor_total NUMERIC" in ddl
    # The original column line must be gone, not duplicated alongside the new one.
    assert "\n total NUMERIC" not in ddl
|
|
|
|
| |
| |
| |
|
|
@pytest.mark.parametrize(
    ("message", "schema"),
    [
        ("troca tipo por medida", "CREATE TABLE comida (id INTEGER)"),
        ("renomeia nome para titulo", "CREATE TABLE livro (id INTEGER, nome TEXT)"),
        ("muda preco para numeric", "CREATE TABLE produto (id INTEGER, preco TEXT)"),
        ("altera coluna idade para integer", "CREATE TABLE pessoa (id INTEGER, idade TEXT)"),
    ],
)
def test_edit_terms_routed_to_off_topic(message, schema, monkeypatch):
    """Edit-style messages with a schema must not end in the load-a-model error.

    NOTE(review): the name mentions off-topic routing, but only the absence
    of "Load a model" in the status is asserted — confirm the intended check.
    """

    def model_must_not_run(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", model_must_not_run)

    response = app.generate_response(message, [], schema, None, None)

    assert "Load a model" not in status_html(response)
| |
|
|
|
|
| |
| |
| |
|
|
def test_build_generation_prompt_excludes_raw_chat_history():
    """The prompt carries the schema and the question, but no earlier chat turns."""
    schema = "CREATE TABLE comida (id INTEGER, nome TEXT, sabor TEXT)"
    message = "liste tudo ordenado por nome"
    earlier_turns = [
        ("user", "crie tabela comida com nome sabor"),
        ("assistant", "```sql\nCREATE TABLE comida (id INTEGER, nome TEXT, sabor TEXT)\n```"),
        ("user", "adiciona coluna peso"),
        ("assistant", "```sql\nALTER TABLE comida ADD COLUMN peso NUMERIC\n```"),
    ]
    chat_history = [{"role": role, "content": text} for role, text in earlier_turns]

    prompt = app.build_generation_prompt(schema, message, chat_history)

    # Neither a history header nor the earlier user messages may leak into the prompt.
    for leaked in ("Previous conversation:", "crie tabela comida", "adiciona coluna peso"):
        assert leaked not in prompt
    assert "CREATE TABLE comida" in prompt
    assert "Question: list tudo ordered by name?" in prompt
|
|
|
|
def test_build_generation_prompt_no_history_no_context():
    """With ``None`` as history the prompt has no conversation section."""
    prompt = app.build_generation_prompt(
        "CREATE TABLE comida (id INTEGER)",
        "liste todos",
        None,
    )

    assert "Previous conversation:" not in prompt
    # The schema and the question, as asserted here, are still present.
    for required in ("comida", "Question: list all?"):
        assert required in prompt
|
|
|
|
| |
| |
| |
|
|
def test_troca_x_por_y_rename(monkeypatch):
    """"troca X por Y" renames column X to Y and leaves the other columns as they were."""

    def model_must_not_run(*args, **kwargs):
        pytest.fail("model should not run")

    monkeypatch.setattr(app, "_run_generation", model_must_not_run)

    created = app.generate_response(
        "crie tabela comida com id nome sabor peso tipo", [], "", None, None
    )
    # Elements 0 and 2 of the first result are passed back as history and schema.
    history, current_schema = created[0], created[2]

    renamed = app.generate_response(
        "troca tipo por medida", history, current_schema, None, None
    )

    ddl = sql_output(renamed)
    assert "medida TEXT" in ddl
    assert "tipo TEXT" not in ddl
    for kept_column in ("id INTEGER", "nome TEXT", "sabor TEXT", "peso NUMERIC"):
        assert kept_column in ddl
|
|