Spaces:
Running
Running
| import asyncio | |
| import os | |
| import sys | |
| from dotenv import load_dotenv | |
| # Load env variables from .env file | |
| load_dotenv() | |
| # Add current directory to path so python can find the modules | |
| sys.path.append(os.path.dirname(os.path.abspath(__file__))) | |
| from database import async_session | |
| from crud import create_user, get_user_by_username | |
| from schemas import UserCreate | |
| async def main(): | |
| print("=========================================") | |
| print(" KRAQEN ADMIN USER CREATOR ") | |
| print("=========================================\n") | |
| username = input("Enter new admin username: ").strip() | |
| if not username: | |
| print("β Username cannot be empty.") | |
| return | |
| import getpass | |
| password = getpass.getpass("Enter new admin password: ").strip() | |
| password_confirm = getpass.getpass("Confirm password: ").strip() | |
| if password != password_confirm: | |
| print("β Passwords do not match.") | |
| return | |
| if len(password) < 6: | |
| print("β Password must be at least 6 characters long.") | |
| return | |
| print("\nβ³ Connecting to your database...") | |
| async with async_session() as session: | |
| try: | |
| # Check if user exists | |
| existing = await get_user_by_username(session, username) | |
| if existing: | |
| print(f"β User '{username}' already exists in database.") | |
| return | |
| user_data = UserCreate(username=username, password=password) | |
| await create_user(session, user_data) | |
| print(f"\nπ Admin user '{username}' created successfully in Neon Postgres database!") | |
| except Exception as e: | |
| print(f"β Error creating user: {e}") | |
| if __name__ == "__main__": | |
| # Fix for Windows asyncio loop policy | |
| if sys.platform == 'win32': | |
| asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) | |
| asyncio.run(main()) | |