"""
Test script for Role Management API flows.

Tests the updated CREATE and UPDATE endpoints with the new widget_access field.
"""

import asyncio
import json
import sys
from datetime import datetime
from typing import Any, Dict, List, Tuple


class MockRoleAPITest:
    """Offline schema checks for Role Management API payloads.

    Each ``test_*`` method builds a hard-coded request or response dictionary,
    checks its shape, and records the outcome through :meth:`log_test`.
    No network call is made; results accumulate in ``self.test_results``.
    """

    # Status glyphs are spelled as escapes so they survive any re-encoding of
    # this file.
    _PASS_ICON = "\u2705"  # white heavy check mark
    _FAIL_ICON = "\u274c"  # cross mark
    _START_ICON = "\U0001f680"  # rocket
    _SUMMARY_ICON = "\U0001f4ca"  # bar chart

    def __init__(self):
        # One dict per executed test: test, status, details, timestamp.
        self.test_results: List[Dict[str, str]] = []

    def log_test(self, test_name: str, status: str, details: str = ""):
        """Record one test outcome and echo it to stdout.

        Args:
            test_name: Human-readable name of the test.
            status: Outcome label; ``"PASS"`` and ``"FAIL"`` are the values
                counted by the summary.
            details: Optional explanation printed on a second line.
        """
        result = {
            "test": test_name,
            "status": status,
            "details": details,
            "timestamp": datetime.now().isoformat(),
        }
        self.test_results.append(result)

        # Anything that is not an explicit PASS is flagged with the fail glyph.
        icon = self._PASS_ICON if status == "PASS" else self._FAIL_ICON
        print(f"{icon} {test_name}: {status}")
        if details:
            print(f" Details: {details}")

    def validate_schema(
        self, data: Dict[str, Any], expected_fields: List[str]
    ) -> Tuple[bool, str]:
        """Check that ``data`` contains every key in ``expected_fields``.

        Args:
            data: Payload to inspect.
            expected_fields: Keys that must be present in ``data``.

        Returns:
            ``(True, "All required fields present")`` when nothing is
            missing, otherwise ``(False, "Missing fields: [...]")`` listing
            the absent keys in the order they were requested.
        """
        missing_fields = [
            field for field in expected_fields if field not in data
        ]
        if missing_fields:
            return False, f"Missing fields: {missing_fields}"
        return True, "All required fields present"

    def test_create_role_schema_validation(self):
        """Test CREATE role request schema validation."""
        test_name = "CREATE Role - Schema Validation"

        create_request = {
            "role_id": "ROLE-MANAGER-001",
            "name": "Branch Manager",
            "description": "Manages branch operations and staff",
            "inherits_from": "ROLE-BASE-MANAGER",
            "permissions": {
                "appointments": ["view", "create", "edit"],
                "customers": ["view", "create", "edit"],
            },
            "scope": {
                "global_scope": False,
                "zones": ["ZONE-NORTH"],
                "branches": ["BR-DELHI-01"],
            },
            "geo_fence": {
                "enabled": True,
                "radius_meters": 250,
            },
            "widget_access": [
                "wid_sales_001",
                "wid_orders_001",
                "wid_customers_001",
                "wid_recent_orders_001",
            ],
        }

        required_fields = ["role_id", "name", "permissions"]
        is_valid, details = self.validate_schema(create_request, required_fields)
        if not is_valid:
            self.log_test(test_name, "FAIL", details)
            return

        # widget_access is not in the required list, so it is checked here.
        if isinstance(create_request.get("widget_access"), list):
            self.log_test(
                test_name,
                "PASS",
                "Schema validation successful, widget_access field present",
            )
        else:
            self.log_test(
                test_name, "FAIL", "widget_access field missing or invalid type"
            )

    def test_create_role_response_schema(self):
        """Test CREATE role response schema."""
        test_name = "CREATE Role - Response Schema"

        create_response = {
            "_id": "507f1f77bcf86cd799439011",
            "role_id": "ROLE-MANAGER-001",
            "name": "Branch Manager",
            "description": "Manages branch operations and staff",
            "inherits_from": "ROLE-BASE-MANAGER",
            "permissions": {
                "appointments": ["view", "create", "edit"],
                "customers": ["view", "create", "edit"],
            },
            "scope": {
                "global_scope": False,
                "zones": ["ZONE-NORTH"],
                "branches": ["BR-DELHI-01"],
            },
            "geo_fence": {
                "enabled": True,
                "radius_meters": 250,
            },
            "widget_access": [
                "wid_sales_001",
                "wid_orders_001",
                "wid_customers_001",
                "wid_recent_orders_001",
            ],
            "created_at": "2024-01-15T10:30:00Z",
            "updated_at": "2024-01-15T10:30:00Z",
            "archived": False,
            "merchant_id": "MERCHANT-001",
            "created_by": "USER-001",
        }

        expected_fields = [
            "_id", "role_id", "name", "permissions", "created_at",
            "updated_at", "archived", "widget_access",
        ]
        is_valid, details = self.validate_schema(create_response, expected_fields)
        if not is_valid:
            self.log_test(test_name, "FAIL", details)
            return

        # Presence alone is not enough: also check types and a non-empty _id.
        types_ok = (
            isinstance(create_response.get("widget_access"), list)
            and isinstance(create_response.get("archived"), bool)
            and bool(create_response.get("_id"))
        )
        if types_ok:
            self.log_test(test_name, "PASS", "Response schema validation successful")
        else:
            self.log_test(test_name, "FAIL", "Field type validation failed")

    def test_update_role_schema_validation(self):
        """Test UPDATE role request schema validation."""
        test_name = "UPDATE Role - Schema Validation"

        update_request = {
            "name": "Senior Branch Manager",
            "description": "Updated description for senior role",
            "widget_access": [
                "wid_sales_001",
                "wid_orders_001",
                "wid_customers_001",
                "wid_recent_orders_001",
                "wid_analytics_001",
            ],
            "permissions": {
                "appointments": ["view", "create", "edit", "delete"],
                "customers": ["view", "create", "edit"],
                "reports": ["view"],
            },
        }

        if isinstance(update_request.get("widget_access"), list):
            widget_count = len(update_request["widget_access"])
            self.log_test(
                test_name,
                "PASS",
                f"Update schema valid, {widget_count} widgets specified",
            )
        else:
            self.log_test(test_name, "FAIL", "widget_access field validation failed")

    def test_update_role_response_schema(self):
        """Test UPDATE role response schema."""
        test_name = "UPDATE Role - Response Schema"

        update_response = {
            "_id": "507f1f77bcf86cd799439011",
            "role_id": "ROLE-MANAGER-001",
            "name": "Senior Branch Manager",
            "description": "Updated description for senior role",
            "inherits_from": "ROLE-BASE-MANAGER",
            "permissions": {
                "appointments": ["view", "create", "edit", "delete"],
                "customers": ["view", "create", "edit"],
                "reports": ["view"],
            },
            "scope": {
                "global_scope": False,
                "zones": ["ZONE-NORTH"],
                "branches": ["BR-DELHI-01"],
            },
            "geo_fence": {
                "enabled": True,
                "radius_meters": 250,
            },
            "widget_access": [
                "wid_sales_001",
                "wid_orders_001",
                "wid_customers_001",
                "wid_recent_orders_001",
                "wid_analytics_001",
            ],
            "created_at": "2024-01-15T10:30:00Z",
            "updated_at": "2024-01-15T11:45:00Z",
            "archived": False,
            "merchant_id": "MERCHANT-001",
            "created_by": "USER-001",
            "updated_by": "USER-002",
        }

        expected_fields = [
            "_id", "role_id", "name", "permissions", "created_at",
            "updated_at", "archived", "widget_access", "updated_by",
        ]
        is_valid, details = self.validate_schema(update_response, expected_fields)
        if not is_valid:
            self.log_test(test_name, "FAIL", details)
            return

        # An update must move updated_at away from created_at.
        if update_response["updated_at"] != update_response["created_at"]:
            self.log_test(
                test_name, "PASS", "Update response schema valid, timestamps updated"
            )
        else:
            self.log_test(test_name, "FAIL", "Timestamps not properly updated")

    def test_widget_access_field_scenarios(self):
        """Test various widget_access field scenarios."""
        test_name = "Widget Access Field - Various Scenarios"

        scenarios = [
            {
                "name": "Empty widget list",
                "widget_access": [],
                "expected": "PASS",
            },
            {
                "name": "Single widget",
                "widget_access": ["wid_sales_001"],
                "expected": "PASS",
            },
            {
                "name": "Multiple widgets",
                "widget_access": ["wid_sales_001", "wid_orders_001", "wid_analytics_001"],
                "expected": "PASS",
            },
            {
                "name": "None value (optional field)",
                "widget_access": None,
                "expected": "PASS",
            },
        ]

        passed_scenarios = 0
        for scenario in scenarios:
            widget_access = scenario["widget_access"]
            accepted = widget_access is None or isinstance(widget_access, list)
            outcome = "PASS" if accepted else "FAIL"
            # Compare against the declared expectation so that a scenario
            # expecting rejection can be added to the table later.
            if outcome == scenario["expected"]:
                passed_scenarios += 1

        if passed_scenarios == len(scenarios):
            self.log_test(
                test_name,
                "PASS",
                f"All {len(scenarios)} widget access scenarios validated",
            )
        else:
            self.log_test(
                test_name,
                "FAIL",
                f"Only {passed_scenarios}/{len(scenarios)} scenarios passed",
            )

    def test_backward_compatibility(self):
        """Test backward compatibility with existing API calls."""
        test_name = "Backward Compatibility"

        # Request shaped like the pre-widget_access format.
        old_format_request = {
            "role_id": "ROLE-OLD-001",
            "name": "Legacy Role",
            "permissions": {
                "customers": ["view", "edit"],
            },
            "description": "Legacy role without widget_access",
        }

        required_fields = ["role_id", "name", "permissions"]
        is_valid, details = self.validate_schema(old_format_request, required_fields)

        if is_valid:
            self.log_test(
                test_name,
                "PASS",
                "Legacy requests without widget_access still supported",
            )
        else:
            self.log_test(test_name, "FAIL", details)

    def test_field_validation_edge_cases(self):
        """Test edge cases for field validation."""
        test_name = "Field Validation - Edge Cases"

        edge_cases = [
            {
                "case": "Very long widget ID",
                "widget_access": ["wid_" + "x" * 100],
                "valid": True,
            },
            {
                "case": "Widget ID with special characters",
                "widget_access": ["wid_sales-001_v2"],
                "valid": True,
            },
            {
                "case": "Large number of widgets",
                "widget_access": [f"wid_{i:03d}" for i in range(50)],
                "valid": True,
            },
        ]

        # A case passes when the list-type check agrees with its "valid" flag.
        passed_cases = sum(
            1
            for case in edge_cases
            if isinstance(case["widget_access"], list) == case["valid"]
        )

        if passed_cases == len(edge_cases):
            self.log_test(
                test_name, "PASS", f"All {len(edge_cases)} edge cases handled"
            )
        else:
            self.log_test(
                test_name,
                "FAIL",
                f"Only {passed_cases}/{len(edge_cases)} edge cases passed",
            )

    def run_all_tests(self) -> bool:
        """Run every test scenario and print a summary.

        Returns:
            ``True`` when every recorded result has status ``"PASS"``.
        """
        print(f"{self._START_ICON} Starting Role Management API Flow Tests")
        print("=" * 50)

        self.test_create_role_schema_validation()
        self.test_create_role_response_schema()
        self.test_update_role_schema_validation()
        self.test_update_role_response_schema()
        self.test_widget_access_field_scenarios()
        self.test_backward_compatibility()
        self.test_field_validation_edge_cases()

        return self._print_summary()

    def _print_summary(self) -> bool:
        """Print totals for ``self.test_results`` and report overall success."""
        print("\n" + "=" * 50)
        print(f"{self._SUMMARY_ICON} Test Summary")
        print("=" * 50)

        passed = sum(1 for r in self.test_results if r["status"] == "PASS")
        failures = [r for r in self.test_results if r["status"] == "FAIL"]
        total = len(self.test_results)
        # Guard the division so an empty result list cannot raise.
        success_rate = (passed / total) * 100 if total else 0.0

        print(f"Total Tests: {total}")
        print(f"Passed: {passed}")
        print(f"Failed: {len(failures)}")
        print(f"Success Rate: {success_rate:.1f}%")

        if failures:
            print(f"\n{self._FAIL_ICON} Failed Tests:")
            for result in failures:
                print(f" - {result['test']}: {result['details']}")
        else:
            print(f"\n{self._PASS_ICON} All tests passed!")

        return passed == total


def main():
    """Run the mock role API checks and exit with a status code.

    Exits the process with code 0 when ``run_all_tests`` reports success and
    with code 1 otherwise.
    """
    tester = MockRoleAPITest()
    success = tester.run_all_tests()

    # A non-zero exit code lets a calling shell or CI job detect a failed run.
    sys.exit(0 if success else 1)


# Run the checks only when executed as a script, not when imported.
if __name__ == "__main__":
    main()