Spaces:
Running
Running
| from __future__ import annotations | |
| import asyncio | |
| from typing import TYPE_CHECKING | |
| from loguru import logger | |
| from sqlalchemy import delete | |
| from sqlalchemy import exc as sqlalchemy_exc | |
| from sqlmodel import col, select | |
| from langflow.services.auth.utils import create_super_user, verify_password | |
| from langflow.services.cache.factory import CacheServiceFactory | |
| from langflow.services.database.models.transactions.model import TransactionTable | |
| from langflow.services.database.models.vertex_builds.model import VertexBuildTable | |
| from langflow.services.database.utils import initialize_database | |
| from langflow.services.schema import ServiceType | |
| from langflow.services.settings.constants import DEFAULT_SUPERUSER, DEFAULT_SUPERUSER_PASSWORD | |
| from .deps import get_db_service, get_service, get_settings_service | |
| if TYPE_CHECKING: | |
| from sqlmodel.ext.asyncio.session import AsyncSession | |
| from langflow.services.settings.manager import SettingsService | |
| async def get_or_create_super_user(session: AsyncSession, username, password, is_default): | |
| from langflow.services.database.models.user.model import User | |
| stmt = select(User).where(User.username == username) | |
| user = (await session.exec(stmt)).first() | |
| if user and user.is_superuser: | |
| return None # Superuser already exists | |
| if user and is_default: | |
| if user.is_superuser: | |
| if verify_password(password, user.password): | |
| return None | |
| # Superuser exists but password is incorrect | |
| # which means that the user has changed the | |
| # base superuser credentials. | |
| # This means that the user has already created | |
| # a superuser and changed the password in the UI | |
| # so we don't need to do anything. | |
| logger.debug( | |
| "Superuser exists but password is incorrect. " | |
| "This means that the user has changed the " | |
| "base superuser credentials." | |
| ) | |
| return None | |
| logger.debug("User with superuser credentials exists but is not a superuser.") | |
| return None | |
| if user: | |
| if verify_password(password, user.password): | |
| msg = "User with superuser credentials exists but is not a superuser." | |
| raise ValueError(msg) | |
| msg = "Incorrect superuser credentials" | |
| raise ValueError(msg) | |
| if is_default: | |
| logger.debug("Creating default superuser.") | |
| else: | |
| logger.debug("Creating superuser.") | |
| try: | |
| return await create_super_user(username, password, db=session) | |
| except Exception as exc: # noqa: BLE001 | |
| if "UNIQUE constraint failed: user.username" in str(exc): | |
| # This is to deal with workers running this | |
| # at startup and trying to create the superuser | |
| # at the same time. | |
| logger.opt(exception=True).debug("Superuser already exists.") | |
| return None | |
| logger.opt(exception=True).debug("Error creating superuser.") | |
| async def setup_superuser(settings_service, session: AsyncSession) -> None: | |
| if settings_service.auth_settings.AUTO_LOGIN: | |
| logger.debug("AUTO_LOGIN is set to True. Creating default superuser.") | |
| else: | |
| # Remove the default superuser if it exists | |
| await teardown_superuser(settings_service, session) | |
| username = settings_service.auth_settings.SUPERUSER | |
| password = settings_service.auth_settings.SUPERUSER_PASSWORD | |
| is_default = (username == DEFAULT_SUPERUSER) and (password == DEFAULT_SUPERUSER_PASSWORD) | |
| try: | |
| user = await get_or_create_super_user( | |
| session=session, username=username, password=password, is_default=is_default | |
| ) | |
| if user is not None: | |
| logger.debug("Superuser created successfully.") | |
| except Exception as exc: | |
| logger.exception(exc) | |
| msg = "Could not create superuser. Please create a superuser manually." | |
| raise RuntimeError(msg) from exc | |
| finally: | |
| settings_service.auth_settings.reset_credentials() | |
| async def teardown_superuser(settings_service, session: AsyncSession) -> None: | |
| """Teardown the superuser.""" | |
| # If AUTO_LOGIN is True, we will remove the default superuser | |
| # from the database. | |
| if not settings_service.auth_settings.AUTO_LOGIN: | |
| try: | |
| logger.debug("AUTO_LOGIN is set to False. Removing default superuser if exists.") | |
| username = DEFAULT_SUPERUSER | |
| from langflow.services.database.models.user.model import User | |
| stmt = select(User).where(User.username == username) | |
| user = (await session.exec(stmt)).first() | |
| # Check if super was ever logged in, if not delete it | |
| # if it has logged in, it means the user is using it to login | |
| if user and user.is_superuser is True and not user.last_login_at: | |
| await session.delete(user) | |
| await session.commit() | |
| logger.debug("Default superuser removed successfully.") | |
| except Exception as exc: | |
| logger.exception(exc) | |
| await session.rollback() | |
| msg = "Could not remove default superuser." | |
| raise RuntimeError(msg) from exc | |
| async def teardown_services() -> None: | |
| """Teardown all the services.""" | |
| try: | |
| async with get_db_service().with_async_session() as session: | |
| await teardown_superuser(get_settings_service(), session) | |
| except Exception as exc: # noqa: BLE001 | |
| logger.exception(exc) | |
| try: | |
| from langflow.services.manager import service_manager | |
| await service_manager.teardown() | |
| except Exception as exc: # noqa: BLE001 | |
| logger.exception(exc) | |
| def initialize_settings_service() -> None: | |
| """Initialize the settings manager.""" | |
| from langflow.services.settings import factory as settings_factory | |
| get_service(ServiceType.SETTINGS_SERVICE, settings_factory.SettingsServiceFactory()) | |
| def initialize_session_service() -> None: | |
| """Initialize the session manager.""" | |
| from langflow.services.cache import factory as cache_factory | |
| from langflow.services.session import factory as session_service_factory | |
| initialize_settings_service() | |
| get_service( | |
| ServiceType.CACHE_SERVICE, | |
| cache_factory.CacheServiceFactory(), | |
| ) | |
| get_service( | |
| ServiceType.SESSION_SERVICE, | |
| session_service_factory.SessionServiceFactory(), | |
| ) | |
| async def clean_transactions(settings_service: SettingsService, session: AsyncSession) -> None: | |
| """Clean up old transactions from the database. | |
| This function deletes transactions that exceed the maximum number to keep (configured in settings). | |
| It orders transactions by timestamp descending and removes the oldest ones beyond the limit. | |
| Args: | |
| settings_service: The settings service containing configuration like max_transactions_to_keep | |
| session: The database session to use for the deletion | |
| Returns: | |
| None | |
| """ | |
| try: | |
| # Delete transactions using bulk delete | |
| delete_stmt = delete(TransactionTable).where( | |
| col(TransactionTable.id).in_( | |
| select(TransactionTable.id) | |
| .order_by(col(TransactionTable.timestamp).desc()) | |
| .offset(settings_service.settings.max_transactions_to_keep) | |
| ) | |
| ) | |
| await session.exec(delete_stmt) | |
| await session.commit() | |
| logger.debug("Successfully cleaned up old transactions") | |
| except (sqlalchemy_exc.SQLAlchemyError, asyncio.TimeoutError) as exc: | |
| logger.error(f"Error cleaning up transactions: {exc!s}") | |
| await session.rollback() | |
| # Don't re-raise since this is a cleanup task | |
| async def clean_vertex_builds(settings_service: SettingsService, session: AsyncSession) -> None: | |
| """Clean up old vertex builds from the database. | |
| This function deletes vertex builds that exceed the maximum number to keep (configured in settings). | |
| It orders vertex builds by timestamp descending and removes the oldest ones beyond the limit. | |
| Args: | |
| settings_service: The settings service containing configuration like max_vertex_builds_to_keep | |
| session: The database session to use for the deletion | |
| Returns: | |
| None | |
| """ | |
| try: | |
| # Delete vertex builds using bulk delete | |
| delete_stmt = delete(VertexBuildTable).where( | |
| col(VertexBuildTable.id).in_( | |
| select(VertexBuildTable.id) | |
| .order_by(col(VertexBuildTable.timestamp).desc()) | |
| .offset(settings_service.settings.max_vertex_builds_to_keep) | |
| ) | |
| ) | |
| await session.exec(delete_stmt) | |
| await session.commit() | |
| logger.debug("Successfully cleaned up old vertex builds") | |
| except (sqlalchemy_exc.SQLAlchemyError, asyncio.TimeoutError) as exc: | |
| logger.error(f"Error cleaning up vertex builds: {exc!s}") | |
| await session.rollback() | |
| # Don't re-raise since this is a cleanup task | |
| async def initialize_services(*, fix_migration: bool = False) -> None: | |
| """Initialize all the services needed.""" | |
| # Test cache connection | |
| get_service(ServiceType.CACHE_SERVICE, default=CacheServiceFactory()) | |
| # Setup the superuser | |
| await asyncio.to_thread(initialize_database, fix_migration=fix_migration) | |
| async with get_db_service().with_async_session() as session: | |
| settings_service = get_service(ServiceType.SETTINGS_SERVICE) | |
| await setup_superuser(settings_service, session) | |
| try: | |
| await get_db_service().assign_orphaned_flows_to_superuser() | |
| except sqlalchemy_exc.IntegrityError as exc: | |
| logger.warning(f"Error assigning orphaned flows to the superuser: {exc!s}") | |
| await clean_transactions(settings_service, session) | |
| await clean_vertex_builds(settings_service, session) | |