docs(widget-permissions): Add comprehensive documentation and debugging tools for widget access control
307aee3 | """ | |
| Script to check and fix widget access permissions in MongoDB. | |
| This script will: | |
| 1. Check if access_roles collection exists | |
| 2. Check if the admin role has widget_access configured | |
| 3. Add widget_access if missing | |
| """ | |
| import asyncio | |
| import sys | |
| import os | |
| # Add parent directory to path | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from app.nosql import mongo_db | |
| from insightfy_utils.logging import get_logger | |
| logger = get_logger(__name__) | |
| MERCHANT_ID = "IN-NATUR-CHEANN-7D2B-O9BP1" | |
| ROLE_ID = "admin" | |
| # All widget IDs | |
| ALL_WIDGET_IDS = [ | |
| # Chart widgets | |
| "wid_revenue_trend_12m_001", | |
| "wid_gross_margin_trend_12m_001", | |
| "wid_channel_mix_001", | |
| "wid_top_5_skus_001", | |
| "wid_inventory_status_001", | |
| "wid_top_selling_products_30d_001", | |
| "wid_staff_performance_001", | |
| "wid_personal_sales_trend_30d_001", | |
| "wid_top_products_sold_by_me_001", | |
| # Table widgets | |
| "wid_recent_orders_001", | |
| "wid_pending_orders_001", | |
| "wid_low_stock_items_001", | |
| "wid_top_customers_30d_001", | |
| "wid_top_refunded_orders_001", | |
| "wid_expiring_stock_001", | |
| "wid_product_reorder_list_001", | |
| # KPI widgets | |
| "wid_total_revenue_001", | |
| "wid_gross_margin_pct_001", | |
| "wid_orders_count_001", | |
| "wid_aov_001", | |
| "wid_repeat_rate_001", | |
| "wid_refund_rate_001" | |
| ] | |
| async def check_and_fix_widget_access(): | |
| """Check and fix widget access permissions.""" | |
| print(f"\n{'='*80}") | |
| print(f"Checking Widget Access Permissions") | |
| print(f"{'='*80}\n") | |
| # Check if access_roles collection exists | |
| collections = await mongo_db.list_collection_names() | |
| print(f"β Available collections: {', '.join(collections)}\n") | |
| if "access_roles" not in collections: | |
| print("β access_roles collection does not exist!") | |
| print(" Creating collection...\n") | |
| await mongo_db.create_collection("access_roles") | |
| print("β Created access_roles collection\n") | |
| # Check for the admin role document | |
| print(f"Checking for role: merchant_id={MERCHANT_ID}, role_id={ROLE_ID}\n") | |
| role_doc = await mongo_db["access_roles"].find_one({ | |
| "merchant_id": MERCHANT_ID, | |
| "role_id": ROLE_ID | |
| }) | |
| if not role_doc: | |
| print("β No access_roles document found for admin role!") | |
| print(" Creating document with full permissions...\n") | |
| new_doc = { | |
| "merchant_id": MERCHANT_ID, | |
| "role_id": ROLE_ID, | |
| "permissions": { | |
| "charts": ["view", "create", "update", "delete"], | |
| "tables": ["view", "create", "update", "delete"], | |
| "kpis": ["view"], | |
| "analytics": ["view", "create", "update"], | |
| "reports": ["view", "create", "update", "delete"], | |
| "dashboard": ["view", "create", "update", "delete"] | |
| }, | |
| "widget_access": ALL_WIDGET_IDS | |
| } | |
| result = await mongo_db["access_roles"].insert_one(new_doc) | |
| print(f"β Created access_roles document with ID: {result.inserted_id}\n") | |
| role_doc = new_doc | |
| else: | |
| print("β Found access_roles document\n") | |
| print(f"Document ID: {role_doc.get('_id')}") | |
| print(f"Permissions: {role_doc.get('permissions', {})}\n") | |
| # Check widget_access field | |
| widget_access = role_doc.get("widget_access", []) | |
| if not widget_access: | |
| print("β widget_access field is missing or empty!") | |
| print(" Adding all widget IDs...\n") | |
| result = await mongo_db["access_roles"].update_one( | |
| { | |
| "merchant_id": MERCHANT_ID, | |
| "role_id": ROLE_ID | |
| }, | |
| { | |
| "$set": { | |
| "widget_access": ALL_WIDGET_IDS | |
| } | |
| } | |
| ) | |
| print(f"β Updated document (matched: {result.matched_count}, modified: {result.modified_count})\n") | |
| widget_access = ALL_WIDGET_IDS | |
| else: | |
| print(f"β widget_access field exists with {len(widget_access)} widgets\n") | |
| # Check specific widget | |
| test_widget = "wid_revenue_trend_12m_001" | |
| if test_widget in widget_access: | |
| print(f"β Test widget '{test_widget}' is in widget_access array\n") | |
| else: | |
| print(f"β Test widget '{test_widget}' is NOT in widget_access array!") | |
| print(f" Current widgets: {widget_access}\n") | |
| # Add missing widgets | |
| missing_widgets = [w for w in ALL_WIDGET_IDS if w not in widget_access] | |
| if missing_widgets: | |
| print(f" Adding {len(missing_widgets)} missing widgets...\n") | |
| result = await mongo_db["access_roles"].update_one( | |
| { | |
| "merchant_id": MERCHANT_ID, | |
| "role_id": ROLE_ID | |
| }, | |
| { | |
| "$addToSet": { | |
| "widget_access": {"$each": missing_widgets} | |
| } | |
| } | |
| ) | |
| print(f"β Updated document (matched: {result.matched_count}, modified: {result.modified_count})\n") | |
| # Test the actual query used by the API | |
| print(f"{'='*80}") | |
| print(f"Testing API Query") | |
| print(f"{'='*80}\n") | |
| test_query = { | |
| "merchant_id": MERCHANT_ID, | |
| "role_id": ROLE_ID, | |
| "widget_access": test_widget | |
| } | |
| print(f"Query: {test_query}\n") | |
| result = await mongo_db["access_roles"].find_one(test_query) | |
| if result: | |
| print("β SUCCESS! Query returned a document") | |
| print(" The API should now grant access to this widget\n") | |
| else: | |
| print("β FAILED! Query returned None") | |
| print(" The API will return 403 Forbidden\n") | |
| # Debug: show what's actually in the database | |
| all_docs = await mongo_db["access_roles"].find({ | |
| "merchant_id": MERCHANT_ID, | |
| "role_id": ROLE_ID | |
| }).to_list(length=10) | |
| print(f"Found {len(all_docs)} document(s) for this merchant/role:") | |
| for doc in all_docs: | |
| print(f"\n Document ID: {doc.get('_id')}") | |
| print(f" merchant_id: {doc.get('merchant_id')}") | |
| print(f" role_id: {doc.get('role_id')}") | |
| print(f" widget_access type: {type(doc.get('widget_access'))}") | |
| print(f" widget_access length: {len(doc.get('widget_access', []))}") | |
| print(f" widget_access sample: {doc.get('widget_access', [])[:3]}") | |
| print(f"\n{'='*80}") | |
| print(f"Check Complete") | |
| print(f"{'='*80}\n") | |
| if __name__ == "__main__": | |
| asyncio.run(check_and_fix_widget_access()) | |