"""
Script to check and fix widget access permissions in MongoDB.

This script will:
1. Check if the access_roles collection exists (creating it if it does not)
2. Check if the admin role has widget_access configured
3. Add widget_access if it is missing, empty, or not an array, and top up
   any widget IDs from ALL_WIDGET_IDS that the stored array lacks
4. Re-run a lookup by merchant, role and widget, and report the outcome
"""

import asyncio
import os
import sys

# Add parent directory to path so the project packages below resolve when the
# script is run directly. This must happen before the project imports.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from app.nosql import mongo_db  # noqa: E402
from insightfy_utils.logging import get_logger  # noqa: E402

logger = get_logger(__name__)

MERCHANT_ID = "IN-NATUR-CHEANN-7D2B-O9BP1"
ROLE_ID = "admin"

# All widget IDs
ALL_WIDGET_IDS = [
    # Chart widgets
    "wid_revenue_trend_12m_001",
    "wid_gross_margin_trend_12m_001",
    "wid_channel_mix_001",
    "wid_top_5_skus_001",
    "wid_inventory_status_001",
    "wid_top_selling_products_30d_001",
    "wid_staff_performance_001",
    "wid_personal_sales_trend_30d_001",
    "wid_top_products_sold_by_me_001",
    # Table widgets
    "wid_recent_orders_001",
    "wid_pending_orders_001",
    "wid_low_stock_items_001",
    "wid_top_customers_30d_001",
    "wid_top_refunded_orders_001",
    "wid_expiring_stock_001",
    "wid_product_reorder_list_001",
    # KPI widgets
    "wid_total_revenue_001",
    "wid_gross_margin_pct_001",
    "wid_orders_count_001",
    "wid_aov_001",
    "wid_repeat_rate_001",
    "wid_refund_rate_001",
]


async def _dump_role_documents(role_filter):
    """Print a short diagnostic summary of the documents matching *role_filter*.

    At most 10 documents are fetched. The stored ``widget_access`` value is
    inspected defensively: it may be absent, null, or a non-array type, and
    none of those cases should abort the dump.
    """
    all_docs = await mongo_db["access_roles"].find(role_filter).to_list(length=10)
    print(f"Found {len(all_docs)} document(s) for this merchant/role:")
    for doc in all_docs:
        stored = doc.get("widget_access")
        print(f"\n Document ID: {doc.get('_id')}")
        print(f" merchant_id: {doc.get('merchant_id')}")
        print(f" role_id: {doc.get('role_id')}")
        print(f" widget_access type: {type(stored)}")
        if isinstance(stored, list):
            print(f" widget_access length: {len(stored)}")
            print(f" widget_access sample: {stored[:3]}")
        else:
            # len() / slicing would raise on None or a mapping; show the raw
            # value instead so the malformed field is still visible.
            print(f" widget_access value (not an array): {stored!r}")


async def check_and_fix_widget_access() -> None:
    """Check and fix widget access permissions.

    Works on the ``access_roles`` document identified by ``MERCHANT_ID`` and
    ``ROLE_ID``:

    * creates the collection and/or the role document when absent;
    * replaces ``widget_access`` with ``ALL_WIDGET_IDS`` when the field is
      missing, empty, or not an array;
    * adds every ID from ``ALL_WIDGET_IDS`` that the stored array lacks;
    * finally queries by merchant, role and a probe widget ID and prints
      whether a document matched, dumping diagnostics when it did not.

    Progress is reported with ``print``; nothing is returned. Database errors
    are not caught and propagate to the caller.
    """
    rule = "=" * 80
    roles = mongo_db["access_roles"]
    role_filter = {"merchant_id": MERCHANT_ID, "role_id": ROLE_ID}

    print(f"\n{rule}")
    print("Checking Widget Access Permissions")
    print(f"{rule}\n")

    # Check if access_roles collection exists
    collections = await mongo_db.list_collection_names()
    print(f"✓ Available collections: {', '.join(collections)}\n")

    if "access_roles" not in collections:
        print("❌ access_roles collection does not exist!")
        print(" Creating collection...\n")
        await mongo_db.create_collection("access_roles")
        print("✓ Created access_roles collection\n")

    # Check for the admin role document
    print(f"Checking for role: merchant_id={MERCHANT_ID}, role_id={ROLE_ID}\n")
    role_doc = await roles.find_one(role_filter)

    if not role_doc:
        print("❌ No access_roles document found for admin role!")
        print(" Creating document with full permissions...\n")

        new_doc = {
            **role_filter,
            "permissions": {
                "charts": ["view", "create", "update", "delete"],
                "tables": ["view", "create", "update", "delete"],
                "kpis": ["view"],
                "analytics": ["view", "create", "update"],
                "reports": ["view", "create", "update", "delete"],
                "dashboard": ["view", "create", "update", "delete"],
            },
            "widget_access": ALL_WIDGET_IDS,
        }

        result = await roles.insert_one(new_doc)
        print(f"✓ Created access_roles document with ID: {result.inserted_id}\n")
        # Continue with the freshly created document so the checks below run
        # on the same code path as for a pre-existing one.
        role_doc = new_doc
    else:
        print("✓ Found access_roles document\n")
        print(f"Document ID: {role_doc.get('_id')}")
        print(f"Permissions: {role_doc.get('permissions', {})}\n")

    # Check widget_access field. A non-array value is treated like a missing
    # one: membership tests on it are meaningless (substring / key lookup) and
    # the $addToSet below cannot be applied to a non-array field.
    widget_access = role_doc.get("widget_access")
    is_array = isinstance(widget_access, list)
    if not is_array or not widget_access:
        if widget_access is not None and not is_array:
            print(
                "❌ widget_access field is not an array "
                f"(found {type(widget_access).__name__})!"
            )
        else:
            print("❌ widget_access field is missing or empty!")
        print(" Adding all widget IDs...\n")

        result = await roles.update_one(
            role_filter,
            {"$set": {"widget_access": ALL_WIDGET_IDS}},
        )
        print(f"✓ Updated document (matched: {result.matched_count}, modified: {result.modified_count})\n")
        widget_access = ALL_WIDGET_IDS
    else:
        print(f"✓ widget_access field exists with {len(widget_access)} widgets\n")

    # Check specific widget
    test_widget = "wid_revenue_trend_12m_001"
    if test_widget in widget_access:
        print(f"✓ Test widget '{test_widget}' is in widget_access array\n")
    else:
        print(f"❌ Test widget '{test_widget}' is NOT in widget_access array!")
        print(f" Current widgets: {widget_access}\n")

    # Add missing widgets. This runs whether or not the probe widget was
    # present: an array that contains the probe but lacks other IDs still
    # needs topping up.
    missing_widgets = [w for w in ALL_WIDGET_IDS if w not in widget_access]
    if missing_widgets:
        print(f" Adding {len(missing_widgets)} missing widgets...\n")
        result = await roles.update_one(
            role_filter,
            {"$addToSet": {"widget_access": {"$each": missing_widgets}}},
        )
        print(f"✓ Updated document (matched: {result.matched_count}, modified: {result.modified_count})\n")

    # Test the actual query used by the API
    print(rule)
    print("Testing API Query")
    print(f"{rule}\n")

    test_query = {**role_filter, "widget_access": test_widget}
    print(f"Query: {test_query}\n")

    result = await roles.find_one(test_query)
    if result:
        print("✅ SUCCESS! Query returned a document")
        print(" The API should now grant access to this widget\n")
    else:
        print("❌ FAILED! Query returned None")
        print(" The API will return 403 Forbidden\n")

        # Debug: show what's actually in the database
        await _dump_role_documents(role_filter)

    print(f"\n{rule}")
    print("Check Complete")
    print(f"{rule}\n")


if __name__ == "__main__":
    asyncio.run(check_and_fix_widget_access())