| """Tests that verify SQL injection mitigations in insights and state modules.""" |
|
|
| import re |
|
|
| from agent.insights import InsightsEngine |
|
|
|
|
| def test_session_cols_no_injection_chars(): |
| """_SESSION_COLS must not contain SQL injection vectors.""" |
| cols = InsightsEngine._SESSION_COLS |
| assert ";" not in cols |
| assert "--" not in cols |
| assert "'" not in cols |
| assert "DROP" not in cols.upper() |
|
|
|
|
| def test_get_sessions_all_query_is_parameterized(): |
| """_GET_SESSIONS_ALL must use a ? placeholder for the cutoff value.""" |
| query = InsightsEngine._GET_SESSIONS_ALL |
| assert "?" in query |
| assert "started_at >= ?" in query |
| |
| assert "{" not in query |
|
|
|
|
| def test_get_sessions_with_source_query_is_parameterized(): |
| """_GET_SESSIONS_WITH_SOURCE must use ? placeholders for both parameters.""" |
| query = InsightsEngine._GET_SESSIONS_WITH_SOURCE |
| assert query.count("?") == 2 |
| assert "started_at >= ?" in query |
| assert "source = ?" in query |
| assert "{" not in query |
|
|
|
|
| def test_session_col_names_are_safe_identifiers(): |
| """Every column name listed in _SESSION_COLS must be a simple identifier.""" |
| cols = InsightsEngine._SESSION_COLS |
| identifiers = [c.strip() for c in cols.split(",")] |
| safe_identifier = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$") |
| for col in identifiers: |
| assert safe_identifier.match(col), ( |
| f"Column name {col!r} is not a safe SQL identifier" |
| ) |
|
|