File size: 5,850 Bytes
c509b44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""
Tests for enhanced admin rules with regex and severity support
"""

import sys
from pathlib import Path

# Add backend directory to Python path
backend_dir = Path(__file__).parent.parent
sys.path.insert(0, str(backend_dir))

import pytest
import tempfile
import os
import re

from api.storage.rules_store import RulesStore


@pytest.fixture
def temp_rules_db():
    """Create a temporary database for testing."""
    with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as f:
        db_path = f.name
    yield db_path
    # Cleanup
    if os.path.exists(db_path):
        os.unlink(db_path)


@pytest.fixture
def rules_store(temp_rules_db):
    """Create a RulesStore instance with temporary database."""
    # RulesStore uses a fixed path, so we'll just use the default
    # For tests, it will create/use data/admin_rules.db
    # Each test should use unique tenant_id to avoid conflicts
    store = RulesStore()
    yield store
    # Cleanup: Delete test data after each test
    # Note: In a real scenario, you'd want to clean up specific tenant data
    # For now, tests use unique tenant IDs to avoid conflicts


def test_add_rule_with_regex_and_severity(rules_store):
    """Test adding a rule with regex pattern and severity."""
    tenant_id = "test_tenant_regex_severity"  # Unique tenant ID
    success = rules_store.add_rule(
        tenant_id=tenant_id,
        rule="Block password queries",
        pattern=r".*password.*|.*pwd.*",
        severity="high",
        description="Blocks any queries containing password or pwd",
        enabled=True
    )
    
    assert success is True
    
    # Get detailed rules
    rules = rules_store.get_rules_detailed(tenant_id)
    assert len(rules) == 1
    assert rules[0]["pattern"] == r".*password.*|.*pwd.*"
    assert rules[0]["severity"] == "high"
    assert rules[0]["description"] == "Blocks any queries containing password or pwd"
    assert rules[0]["enabled"] == 1


def test_add_rule_without_pattern_uses_rule_text(rules_store):
    """Test that if pattern is not provided, rule text is used as pattern."""
    tenant_id = "test_tenant_no_pattern"  # Unique tenant ID
    rules_store.add_rule(
        tenant_id=tenant_id,
        rule="Block sensitive data",
        severity="medium"
    )
    
    rules = rules_store.get_rules_detailed(tenant_id)
    assert len(rules) == 1
    assert rules[0]["pattern"] == "Block sensitive data"
    assert rules[0]["severity"] == "medium"


def test_get_rules_backward_compatibility(rules_store):
    """Test that get_rules() still returns simple list for backward compatibility."""
    tenant_id = "test_tenant_backward_compat"  # Unique tenant ID
    rules_store.add_rule(
        tenant_id=tenant_id,
        rule="Rule 1",
        severity="low"
    )
    rules_store.add_rule(
        tenant_id=tenant_id,
        rule="Rule 2",
        severity="high"
    )
    
    rules = rules_store.get_rules(tenant_id)
    assert isinstance(rules, list)
    assert len(rules) == 2
    assert "Rule 1" in rules
    assert "Rule 2" in rules


def test_regex_pattern_matching(rules_store):
    """Test that regex patterns work correctly."""
    tenant_id = "test_tenant_regex_match"  # Unique tenant ID
    rules_store.add_rule(
        tenant_id=tenant_id,
        rule="Email pattern",
        pattern=r".*@.*\..*",
        severity="medium"
    )
    
    rules = rules_store.get_rules_detailed(tenant_id)
    assert len(rules) == 1
    pattern = rules[0]["pattern"]
    
    # Test regex matching
    test_cases = [
        ("user@example.com", True),
        ("contact me at test@domain.org", True),
        ("no email here", False),
        ("just text", False)
    ]
    
    regex = re.compile(pattern, re.IGNORECASE)
    for text, should_match in test_cases:
        assert (regex.search(text) is not None) == should_match, f"Failed for: {text}"


def test_severity_levels(rules_store):
    """Test different severity levels."""
    tenant_id = "test_tenant_severity"  # Unique tenant ID
    severities = ["low", "medium", "high", "critical"]
    
    for i, severity in enumerate(severities):
        rules_store.add_rule(
            tenant_id=tenant_id,
            rule=f"Rule {severity}",
            severity=severity
        )
    
    rules = rules_store.get_rules_detailed(tenant_id)
    assert len(rules) == len(severities)
    
    for rule in rules:
        assert rule["severity"] in severities


def test_disabled_rules_not_returned(rules_store):
    """Test that disabled rules are not returned by get_rules()."""
    tenant_id = "test_tenant_disabled"  # Unique tenant ID
    rules_store.add_rule(
        tenant_id=tenant_id,
        rule="Enabled rule",
        enabled=True
    )
    rules_store.add_rule(
        tenant_id=tenant_id,
        rule="Disabled rule",
        enabled=False
    )
    
    rules = rules_store.get_rules(tenant_id)
    assert len(rules) == 1
    assert "Enabled rule" in rules
    assert "Disabled rule" not in rules
    
    # But disabled rules should still exist in detailed view (if we add a method for that)
    # For now, we rely on enabled column filtering


def test_multiple_tenants_isolation(rules_store):
    """Test that rules are properly isolated by tenant."""
    rules_store.add_rule(
        tenant_id="tenant1",
        rule="Tenant 1 rule",
        severity="low"
    )
    rules_store.add_rule(
        tenant_id="tenant2",
        rule="Tenant 2 rule",
        severity="high"
    )
    
    tenant1_rules = rules_store.get_rules("tenant1")
    tenant2_rules = rules_store.get_rules("tenant2")
    
    assert len(tenant1_rules) == 1
    assert "Tenant 1 rule" in tenant1_rules
    assert "Tenant 2 rule" not in tenant1_rules
    
    assert len(tenant2_rules) == 1
    assert "Tenant 2 rule" in tenant2_rules
    assert "Tenant 1 rule" not in tenant2_rules