File size: 4,876 Bytes
307aee3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
Test the complete authentication flow to debug the 403 error.
"""
import asyncio
import sys
import os

# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from app.dependencies.auth import get_current_user, require_widget_access, user_has_widget_access
from app.utils.jwt import decode_jwt_token
from app.nosql import mongo_db
from insightfy_utils.logging import get_logger

logger = get_logger(__name__)

# The JWT token from the curl command
JWT_TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJibG9vbSIsIm1lcmNoYW50X2lkIjoiSU4tTkFUVVItQ0hFQU5OLTdEMkItTzlCUDEiLCJhc3NvY2lhdGVfaWQiOiJBU1QwMTEiLCJyb2xlX2lkIjoiYWRtaW4iLCJicmFuY2hfaWQiOiJocSIsImV4cCI6MTc2MzI5OTY3M30.ypc3TEbUox3tp_0BTZz1GBk9WeCkQgWGx1fv_yiHPdQ"

WIDGET_ID = "wid_revenue_trend_12m_001"


async def test_auth_flow():
    """Test the complete authentication flow."""
    
    print(f"\n{'='*80}")
    print(f"Testing Authentication Flow")
    print(f"{'='*80}\n")
    
    # Step 1: Decode JWT token
    print("Step 1: Decoding JWT Token")
    print("-" * 80)
    try:
        payload = decode_jwt_token(JWT_TOKEN)
        print(f"βœ“ Token decoded successfully")
        print(f"  Payload: {payload}\n")
        
        current_user = {
            "associate_id": payload["associate_id"],
            "merchant_id": payload["merchant_id"],
            "branch_id": payload["branch_id"],
            "role_id": payload.get("role_id", "user")
        }
        print(f"  Current user: {current_user}\n")
    except Exception as e:
        print(f"❌ Failed to decode token: {e}\n")
        return
    
    # Step 2: Extract credentials
    print("Step 2: Extracting Credentials")
    print("-" * 80)
    merchant_id = current_user.get("merchant_id")
    user_id = current_user.get("associate_id")
    role_id = current_user.get("role_id")
    
    print(f"  merchant_id: {merchant_id}")
    print(f"  user_id: {user_id}")
    print(f"  role_id: {role_id}")
    print(f"  widget_id: {WIDGET_ID}\n")
    
    if not user_id or not merchant_id or not role_id:
        print(f"❌ Missing credentials!\n")
        return
    
    # Step 3: Check MongoDB for access_roles document
    print("Step 3: Checking MongoDB for access_roles document")
    print("-" * 80)
    
    query = {
        "merchant_id": merchant_id,
        "role_id": role_id
    }
    print(f"  Query: {query}\n")
    
    role_doc = await mongo_db["access_roles"].find_one(query)
    
    if not role_doc:
        print(f"❌ No access_roles document found!\n")
        return
    
    print(f"βœ“ Found access_roles document")
    print(f"  Document ID: {role_doc.get('_id')}")
    print(f"  widget_access type: {type(role_doc.get('widget_access'))}")
    print(f"  widget_access length: {len(role_doc.get('widget_access', []))}\n")
    
    # Step 4: Check if widget_id is in widget_access array
    print("Step 4: Checking widget_access array")
    print("-" * 80)
    
    widget_access = role_doc.get("widget_access", [])
    
    if WIDGET_ID in widget_access:
        print(f"βœ“ Widget '{WIDGET_ID}' found in widget_access array\n")
    else:
        print(f"❌ Widget '{WIDGET_ID}' NOT found in widget_access array")
        print(f"  Available widgets: {widget_access[:5]}...\n")
        return
    
    # Step 5: Test the exact query used by user_has_widget_access
    print("Step 5: Testing user_has_widget_access() query")
    print("-" * 80)
    
    test_query = {
        "merchant_id": merchant_id,
        "role_id": role_id,
        "widget_access": WIDGET_ID
    }
    print(f"  Query: {test_query}\n")
    
    result = await mongo_db["access_roles"].find_one(test_query)
    
    if result:
        print(f"βœ“ Query returned a document\n")
    else:
        print(f"❌ Query returned None\n")
        return
    
    # Step 6: Test the actual function
    print("Step 6: Testing user_has_widget_access() function")
    print("-" * 80)
    
    has_access = await user_has_widget_access(merchant_id, role_id, WIDGET_ID)
    
    if has_access:
        print(f"βœ“ user_has_widget_access() returned True\n")
    else:
        print(f"❌ user_has_widget_access() returned False\n")
        return
    
    # Step 7: Test require_widget_access
    print("Step 7: Testing require_widget_access() function")
    print("-" * 80)
    
    try:
        result = await require_widget_access(WIDGET_ID, current_user)
        print(f"βœ“ require_widget_access() succeeded")
        print(f"  Returned: {result}\n")
    except Exception as e:
        print(f"❌ require_widget_access() raised exception: {e}\n")
        return
    
    # Final result
    print(f"{'='*80}")
    print(f"βœ… ALL TESTS PASSED!")
    print(f"The API should grant access to this widget.")
    print(f"{'='*80}\n")


if __name__ == "__main__":
    asyncio.run(test_auth_flow())