File size: 6,686 Bytes
307aee3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 | """
Script to check and fix widget access permissions in MongoDB.
This script will:
1. Check if access_roles collection exists
2. Check if the admin role has widget_access configured
3. Add widget_access if missing
"""
import asyncio
import sys
import os
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app.nosql import mongo_db
from insightfy_utils.logging import get_logger
logger = get_logger(__name__)
MERCHANT_ID = "IN-NATUR-CHEANN-7D2B-O9BP1"
ROLE_ID = "admin"
# All widget IDs
ALL_WIDGET_IDS = [
# Chart widgets
"wid_revenue_trend_12m_001",
"wid_gross_margin_trend_12m_001",
"wid_channel_mix_001",
"wid_top_5_skus_001",
"wid_inventory_status_001",
"wid_top_selling_products_30d_001",
"wid_staff_performance_001",
"wid_personal_sales_trend_30d_001",
"wid_top_products_sold_by_me_001",
# Table widgets
"wid_recent_orders_001",
"wid_pending_orders_001",
"wid_low_stock_items_001",
"wid_top_customers_30d_001",
"wid_top_refunded_orders_001",
"wid_expiring_stock_001",
"wid_product_reorder_list_001",
# KPI widgets
"wid_total_revenue_001",
"wid_gross_margin_pct_001",
"wid_orders_count_001",
"wid_aov_001",
"wid_repeat_rate_001",
"wid_refund_rate_001"
]
async def check_and_fix_widget_access():
"""Check and fix widget access permissions."""
print(f"\n{'='*80}")
print(f"Checking Widget Access Permissions")
print(f"{'='*80}\n")
# Check if access_roles collection exists
collections = await mongo_db.list_collection_names()
print(f"β Available collections: {', '.join(collections)}\n")
if "access_roles" not in collections:
print("β access_roles collection does not exist!")
print(" Creating collection...\n")
await mongo_db.create_collection("access_roles")
print("β Created access_roles collection\n")
# Check for the admin role document
print(f"Checking for role: merchant_id={MERCHANT_ID}, role_id={ROLE_ID}\n")
role_doc = await mongo_db["access_roles"].find_one({
"merchant_id": MERCHANT_ID,
"role_id": ROLE_ID
})
if not role_doc:
print("β No access_roles document found for admin role!")
print(" Creating document with full permissions...\n")
new_doc = {
"merchant_id": MERCHANT_ID,
"role_id": ROLE_ID,
"permissions": {
"charts": ["view", "create", "update", "delete"],
"tables": ["view", "create", "update", "delete"],
"kpis": ["view"],
"analytics": ["view", "create", "update"],
"reports": ["view", "create", "update", "delete"],
"dashboard": ["view", "create", "update", "delete"]
},
"widget_access": ALL_WIDGET_IDS
}
result = await mongo_db["access_roles"].insert_one(new_doc)
print(f"β Created access_roles document with ID: {result.inserted_id}\n")
role_doc = new_doc
else:
print("β Found access_roles document\n")
print(f"Document ID: {role_doc.get('_id')}")
print(f"Permissions: {role_doc.get('permissions', {})}\n")
# Check widget_access field
widget_access = role_doc.get("widget_access", [])
if not widget_access:
print("β widget_access field is missing or empty!")
print(" Adding all widget IDs...\n")
result = await mongo_db["access_roles"].update_one(
{
"merchant_id": MERCHANT_ID,
"role_id": ROLE_ID
},
{
"$set": {
"widget_access": ALL_WIDGET_IDS
}
}
)
print(f"β Updated document (matched: {result.matched_count}, modified: {result.modified_count})\n")
widget_access = ALL_WIDGET_IDS
else:
print(f"β widget_access field exists with {len(widget_access)} widgets\n")
# Check specific widget
test_widget = "wid_revenue_trend_12m_001"
if test_widget in widget_access:
print(f"β Test widget '{test_widget}' is in widget_access array\n")
else:
print(f"β Test widget '{test_widget}' is NOT in widget_access array!")
print(f" Current widgets: {widget_access}\n")
# Add missing widgets
missing_widgets = [w for w in ALL_WIDGET_IDS if w not in widget_access]
if missing_widgets:
print(f" Adding {len(missing_widgets)} missing widgets...\n")
result = await mongo_db["access_roles"].update_one(
{
"merchant_id": MERCHANT_ID,
"role_id": ROLE_ID
},
{
"$addToSet": {
"widget_access": {"$each": missing_widgets}
}
}
)
print(f"β Updated document (matched: {result.matched_count}, modified: {result.modified_count})\n")
# Test the actual query used by the API
print(f"{'='*80}")
print(f"Testing API Query")
print(f"{'='*80}\n")
test_query = {
"merchant_id": MERCHANT_ID,
"role_id": ROLE_ID,
"widget_access": test_widget
}
print(f"Query: {test_query}\n")
result = await mongo_db["access_roles"].find_one(test_query)
if result:
print("β
SUCCESS! Query returned a document")
print(" The API should now grant access to this widget\n")
else:
print("β FAILED! Query returned None")
print(" The API will return 403 Forbidden\n")
# Debug: show what's actually in the database
all_docs = await mongo_db["access_roles"].find({
"merchant_id": MERCHANT_ID,
"role_id": ROLE_ID
}).to_list(length=10)
print(f"Found {len(all_docs)} document(s) for this merchant/role:")
for doc in all_docs:
print(f"\n Document ID: {doc.get('_id')}")
print(f" merchant_id: {doc.get('merchant_id')}")
print(f" role_id: {doc.get('role_id')}")
print(f" widget_access type: {type(doc.get('widget_access'))}")
print(f" widget_access length: {len(doc.get('widget_access', []))}")
print(f" widget_access sample: {doc.get('widget_access', [])[:3]}")
print(f"\n{'='*80}")
print(f"Check Complete")
print(f"{'='*80}\n")
if __name__ == "__main__":
asyncio.run(check_and_fix_widget_access())
|