# NOTE(review): removed a web-scrape artifact here ("Spaces:" / "Running" /
# "Running" — a hosting-page header); it was never part of this module.
import contextlib
import json
import os
from pathlib import Path
from shutil import copy2
from typing import Any, Literal

import orjson
import yaml
from loguru import logger
from pydantic import field_validator
from pydantic.fields import FieldInfo
from pydantic_settings import BaseSettings, EnvSettingsSource, PydanticBaseSettingsSource, SettingsConfigDict
from typing_extensions import override

from langflow.services.settings.constants import VARIABLES_TO_GET_FROM_ENVIRONMENT
# Default location of Langflow's bundled components, resolved relative to this
# file (three levels up from services/settings/ to the package root, then
# "components"). Stored as str because it is appended to a list[str] setting.
BASE_COMPONENTS_PATH = str(Path(__file__).parent.parent.parent / "components")
| def is_list_of_any(field: FieldInfo) -> bool: | |
| """Check if the given field is a list or an optional list of any type. | |
| Args: | |
| field (FieldInfo): The field to be checked. | |
| Returns: | |
| bool: True if the field is a list or a list of any type, False otherwise. | |
| """ | |
| if field.annotation is None: | |
| return False | |
| try: | |
| union_args = field.annotation.__args__ if hasattr(field.annotation, "__args__") else [] | |
| return field.annotation.__origin__ is list or any( | |
| arg.__origin__ is list for arg in union_args if hasattr(arg, "__origin__") | |
| ) | |
| except AttributeError: | |
| return False | |
class MyCustomSource(EnvSettingsSource):
    """Env source that accepts comma-separated strings for list-typed fields."""

    def prepare_field_value(self, field_name: str, field: FieldInfo, value: Any, value_is_complex: bool) -> Any:  # type: ignore[misc]
        # For fields annotated as (optional) lists, turn "a,b,c" into
        # ["a", "b", "c"]; anything already list-shaped passes through.
        if is_list_of_any(field):
            parsed = value.split(",") if isinstance(value, str) else value
            if isinstance(parsed, list):
                return parsed
            value = parsed
        # Everything else keeps the default pydantic-settings handling.
        return super().prepare_field_value(field_name, field, value, value_is_complex)
class Settings(BaseSettings):
    """Langflow runtime configuration.

    Values are read from ``LANGFLOW_``-prefixed environment variables via
    ``MyCustomSource`` (see ``settings_customise_sources``), which also splits
    comma-separated strings into lists for list-typed fields.

    NOTE(review): in the reviewed copy the ``@field_validator`` /
    ``@classmethod`` / ``@override`` decorators were missing from the methods
    below even though those names are imported (and otherwise unused) at the
    top of the file — without them none of the validators ever run and the
    custom settings source is never installed. They have been restored with
    field targets inferred from each method name; confirm the exact targets
    and modes against the upstream source.
    """

    # Define the default LANGFLOW_DIR
    config_dir: str | None = None
    # Define if langflow db should be saved in config dir or
    # in the langflow directory
    save_db_in_config_dir: bool = False
    """Define if langflow database should be saved in LANGFLOW_CONFIG_DIR or in the langflow directory
    (i.e. in the package directory)."""

    dev: bool = False
    """If True, Langflow will run in development mode."""
    database_url: str | None = None
    """Database URL for Langflow. If not provided, Langflow will use a SQLite database."""
    pool_size: int = 10
    """The number of connections to keep open in the connection pool. If not provided, the default is 10."""
    max_overflow: int = 20
    """The number of connections to allow that can be opened beyond the pool size.
    If not provided, the default is 20."""
    db_connect_timeout: int = 20
    """The number of seconds to wait before giving up on a lock to released or establishing a connection to the
    database."""

    # sqlite configuration
    sqlite_pragmas: dict | None = {"synchronous": "NORMAL", "journal_mode": "WAL"}
    """SQLite pragmas to use when connecting to the database."""

    # cache configuration
    cache_type: Literal["async", "redis", "memory", "disk"] = "async"
    """The cache type can be 'async' or 'redis'."""
    cache_expire: int = 3600
    """The cache expire in seconds."""
    variable_store: str = "db"
    """The store can be 'db' or 'kubernetes'."""

    prometheus_enabled: bool = False
    """If set to True, Langflow will expose Prometheus metrics."""
    prometheus_port: int = 9090
    """The port on which Langflow will expose Prometheus metrics. 9090 is the default port."""

    remove_api_keys: bool = False
    components_path: list[str] = []
    langchain_cache: str = "InMemoryCache"
    load_flows_path: str | None = None

    # Redis
    redis_host: str = "localhost"
    redis_port: int = 6379
    redis_db: int = 0
    redis_url: str | None = None
    redis_cache_expire: int = 3600

    # Sentry
    sentry_dsn: str | None = None
    sentry_traces_sample_rate: float | None = 1.0
    sentry_profiles_sample_rate: float | None = 1.0

    store: bool | None = True
    store_url: str | None = "https://api.langflow.store"
    download_webhook_url: str | None = "https://api.langflow.store/flows/trigger/ec611a61-8460-4438-b187-a4f65e5559d4"
    like_webhook_url: str | None = "https://api.langflow.store/flows/trigger/64275852-ec00-45c1-984e-3bff814732da"

    storage_type: str = "local"

    celery_enabled: bool = False

    fallback_to_env_var: bool = True
    """If set to True, Global Variables set in the UI will fallback to a environment variable
    with the same name in case Langflow fails to retrieve the variable value."""

    store_environment_variables: bool = True
    """Whether to store environment variables as Global Variables in the database."""
    variables_to_get_from_environment: list[str] = VARIABLES_TO_GET_FROM_ENVIRONMENT
    """List of environment variables to get from the environment and store in the database."""
    worker_timeout: int = 300
    """Timeout for the API calls in seconds."""
    frontend_timeout: int = 0
    """Timeout for the frontend API calls in seconds."""
    user_agent: str = "langflow"
    """User agent for the API calls."""
    backend_only: bool = False
    """If set to True, Langflow will not serve the frontend."""

    # Telemetry
    do_not_track: bool = False
    """If set to True, Langflow will not track telemetry."""
    telemetry_base_url: str = "https://langflow.gateway.scarf.sh"
    transactions_storage_enabled: bool = True
    """If set to True, Langflow will track transactions between flows."""
    vertex_builds_storage_enabled: bool = True
    """If set to True, Langflow will keep track of each vertex builds (outputs) in the UI for any flow."""

    # Config
    host: str = "127.0.0.1"
    """The host on which Langflow will run."""
    port: int = 7860
    """The port on which Langflow will run."""
    workers: int = 1
    """The number of workers to run."""
    log_level: str = "critical"
    """The log level for Langflow."""
    log_file: str | None = "logs/langflow.log"
    """The path to log file for Langflow."""
    alembic_log_file: str = "alembic/alembic.log"
    """The path to log file for Alembic for SQLAlchemy."""
    frontend_path: str | None = None
    """The path to the frontend directory containing build files. This is for development purposes only.."""
    open_browser: bool = False
    """If set to True, Langflow will open the browser on startup."""
    auto_saving: bool = True
    """If set to True, Langflow will auto save flows."""
    auto_saving_interval: int = 1000
    """The interval in ms at which Langflow will auto save flows."""
    health_check_max_retries: int = 5
    """The maximum number of retries for the health check."""
    max_file_size_upload: int = 100
    """The maximum file size for the upload in MB."""
    deactivate_tracing: bool = False
    """If set to True, tracing will be deactivated."""
    max_transactions_to_keep: int = 3000
    """The maximum number of transactions to keep in the database."""
    max_vertex_builds_to_keep: int = 3000
    """The maximum number of vertex builds to keep in the database."""

    @field_validator("dev")
    @classmethod
    def set_dev(cls, value):
        """Mirror the dev flag into the global langflow settings helper."""
        from langflow.settings import set_dev

        set_dev(value)
        return value

    @field_validator("user_agent", mode="after")
    @classmethod
    def set_user_agent(cls, value):
        """Default the user agent and export it via the USER_AGENT env var."""
        if not value:
            value = "Langflow"
        # os is imported at module level; the original re-imported it here.
        os.environ["USER_AGENT"] = value
        logger.debug(f"Setting user agent to {value}")
        return value

    @field_validator("variables_to_get_from_environment", mode="before")
    @classmethod
    def set_variables_to_get_from_environment(cls, value):
        """Merge a (possibly comma-separated) override with the default list."""
        if isinstance(value, str):
            value = value.split(",")
        # set() deduplicates; note this does not preserve order.
        return list(set(VARIABLES_TO_GET_FROM_ENVIRONMENT + value))

    @field_validator("log_file", mode="before")
    @classmethod
    def set_log_file(cls, value):
        """Coerce Path values to str so the stored setting is always a string."""
        if isinstance(value, Path):
            value = str(value)
        return value

    @field_validator("config_dir", mode="before")
    @classmethod
    def set_langflow_dir(cls, value):
        """Resolve (and create) the config directory, defaulting to the user cache dir."""
        if not value:
            from platformdirs import user_cache_dir

            # Define the app name and author
            app_name = "langflow"
            app_author = "langflow"

            # Get the cache directory for the application
            cache_dir = user_cache_dir(app_name, app_author)

            # Create a .langflow directory inside the cache directory
            value = Path(cache_dir)
            value.mkdir(parents=True, exist_ok=True)

        if isinstance(value, str):
            value = Path(value)
        if not value.exists():
            value.mkdir(parents=True, exist_ok=True)

        return str(value)

    @field_validator("database_url", mode="before")
    @classmethod
    def set_database_url(cls, value, info):
        """Fall back to LANGFLOW_DATABASE_URL, then to a SQLite file.

        The SQLite file lives in config_dir (when save_db_in_config_dir is
        set) or in the package directory, and pre-release versions use a
        separate "langflow-pre.db" so they never clobber a stable database.
        Existing databases at legacy locations are copied over.

        Raises:
            ValueError: if no URL is available and config_dir is unset.
        """
        if not value:
            logger.debug("No database_url provided, trying LANGFLOW_DATABASE_URL env variable")
            if langflow_database_url := os.getenv("LANGFLOW_DATABASE_URL"):
                value = langflow_database_url
                logger.debug("Using LANGFLOW_DATABASE_URL env variable.")
            else:
                logger.debug("No database_url env variable, using sqlite database")
                # Originally, we used sqlite:///./langflow.db
                # so we need to migrate to the new format
                # if there is a database in that location
                if not info.data["config_dir"]:
                    msg = "config_dir not set, please set it or provide a database_url"
                    raise ValueError(msg)

                from langflow.utils.version import get_version_info
                from langflow.utils.version import is_pre_release as langflow_is_pre_release

                version = get_version_info()["version"]
                is_pre_release = langflow_is_pre_release(version)

                if info.data["save_db_in_config_dir"]:
                    database_dir = info.data["config_dir"]
                    logger.debug(f"Saving database to config_dir: {database_dir}")
                else:
                    database_dir = Path(__file__).parent.parent.parent.resolve()
                    logger.debug(f"Saving database to langflow directory: {database_dir}")

                pre_db_file_name = "langflow-pre.db"
                db_file_name = "langflow.db"
                new_pre_path = f"{database_dir}/{pre_db_file_name}"
                new_path = f"{database_dir}/{db_file_name}"
                final_path = None
                if is_pre_release:
                    if Path(new_pre_path).exists():
                        final_path = new_pre_path
                    elif Path(new_path).exists() and info.data["save_db_in_config_dir"]:
                        # We need to copy the current db to the new location
                        logger.debug("Copying existing database to new location")
                        copy2(new_path, new_pre_path)
                        logger.debug(f"Copied existing database to {new_pre_path}")
                    elif Path(f"./{db_file_name}").exists() and info.data["save_db_in_config_dir"]:
                        logger.debug("Copying existing database to new location")
                        copy2(f"./{db_file_name}", new_pre_path)
                        logger.debug(f"Copied existing database to {new_pre_path}")
                    else:
                        logger.debug(f"Creating new database at {new_pre_path}")
                    final_path = new_pre_path
                elif Path(new_path).exists():
                    logger.debug(f"Database already exists at {new_path}, using it")
                    final_path = new_path
                elif Path(f"./{db_file_name}").exists():
                    try:
                        logger.debug("Copying existing database to new location")
                        copy2(f"./{db_file_name}", new_path)
                        logger.debug(f"Copied existing database to {new_path}")
                    except Exception:  # noqa: BLE001
                        # Best-effort migration: on failure keep using the legacy path.
                        logger.exception("Failed to copy database, using default path")
                        new_path = f"./{db_file_name}"
                else:
                    final_path = new_path

                if final_path is None:
                    final_path = new_pre_path if is_pre_release else new_path

                value = f"sqlite:///{final_path}"

        return value

    @field_validator("components_path", mode="before")
    @classmethod
    def set_components_path(cls, value):
        """Merge LANGFLOW_COMPONENTS_PATH and the bundled default into the list."""
        if os.getenv("LANGFLOW_COMPONENTS_PATH"):
            logger.debug("Adding LANGFLOW_COMPONENTS_PATH to components_path")
            langflow_component_path = os.getenv("LANGFLOW_COMPONENTS_PATH")
            if Path(langflow_component_path).exists() and langflow_component_path not in value:
                # NOTE(review): os.getenv always returns a str, so this list
                # branch appears unreachable; kept for behavior parity.
                if isinstance(langflow_component_path, list):
                    for path in langflow_component_path:
                        if path not in value:
                            value.append(path)
                    logger.debug(f"Extending {langflow_component_path} to components_path")
                elif langflow_component_path not in value:
                    value.append(langflow_component_path)
                    logger.debug(f"Appending {langflow_component_path} to components_path")

        if not value:
            value = [BASE_COMPONENTS_PATH]
            logger.debug("Setting default components path to components_path")
        elif BASE_COMPONENTS_PATH not in value:
            value.append(BASE_COMPONENTS_PATH)
            logger.debug("Adding default components path to components_path")

        logger.debug(f"Components path: {value}")
        return value

    model_config = SettingsConfigDict(validate_assignment=True, extra="ignore", env_prefix="LANGFLOW_")

    def update_from_yaml(self, file_path: str, *, dev: bool = False) -> None:
        """Reload components_path (and the dev flag) from a YAML settings file.

        Note: only components_path is taken from the file; other keys are ignored.
        """
        new_settings = load_settings_from_yaml(file_path)
        self.components_path = new_settings.components_path or []
        self.dev = dev

    def update_settings(self, **kwargs) -> None:
        """Update known settings in place; list-valued settings are extended.

        Unknown keys are skipped with a debug message. For list settings,
        string values that parse as JSON lists are unpacked and appended
        item by item (deduplicated); Path items are stringified.
        """
        logger.debug("Updating settings")
        for key, value in kwargs.items():
            # value may contain sensitive information, so we don't want to log it
            if not hasattr(self, key):
                logger.debug(f"Key {key} not found in settings")
                continue
            logger.debug(f"Updating {key}")
            if isinstance(getattr(self, key), list):
                # value might be a '[something]' string
                value_ = value
                # orjson.JSONDecodeError subclasses json.JSONDecodeError, so
                # this suppress covers a failed parse of a non-JSON string.
                with contextlib.suppress(json.decoder.JSONDecodeError):
                    value_ = orjson.loads(str(value))
                if isinstance(value_, list):
                    for item in value_:
                        item_ = str(item) if isinstance(item, Path) else item
                        if item_ not in getattr(self, key):
                            getattr(self, key).append(item_)
                    logger.debug(f"Extended {key}")
                else:
                    value_ = str(value_) if isinstance(value_, Path) else value_
                    if value_ not in getattr(self, key):
                        getattr(self, key).append(value_)
                    logger.debug(f"Appended {key}")
            else:
                setattr(self, key, value)
                logger.debug(f"Updated {key}")
            logger.debug(f"{key}: {getattr(self, key)}")

    @classmethod
    @override
    def settings_customise_sources(  # type: ignore[misc]
        cls,
        settings_cls: type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> tuple[PydanticBaseSettingsSource, ...]:
        # Replace all default sources with the comma-splitting env source only.
        return (MyCustomSource(settings_cls),)
def save_settings_to_yaml(settings: Settings, file_path: str) -> None:
    """Serialize *settings* to YAML at *file_path*, overwriting any existing file."""
    with Path(file_path).open("w", encoding="utf-8") as out:
        yaml.dump(settings.model_dump(), out)
def load_settings_from_yaml(file_path: str) -> Settings:
    """Build a Settings instance from a YAML file.

    A bare file name (no "/") is resolved relative to this module's directory;
    anything containing a "/" is treated as a path as-is.

    Raises:
        KeyError: if the YAML contains a key that is not a Settings field.
    """
    # Check if a string is a valid path or a file name
    if "/" not in file_path:
        # Get current path
        current_path = Path(__file__).resolve().parent
        file_path_ = current_path / file_path
    else:
        file_path_ = Path(file_path)

    with file_path_.open(encoding="utf-8") as f:
        settings_dict = yaml.safe_load(f)
        # Normalize keys to lowercase so they match Settings field names.
        # (The original upper-cased them, which made every key fail the
        # model_fields membership check below and mismatch the pydantic
        # constructor, whose kwargs are case-sensitive.)
        settings_dict = {k.lower(): v for k, v in settings_dict.items()}

        for key in settings_dict:
            if key not in Settings.model_fields:
                msg = f"Key {key} not found in settings"
                raise KeyError(msg)
            # Log only the key: calling len() on the value (as before) raised
            # TypeError for unsized values such as ints and booleans.
            logger.debug(f"Loading {key} from {file_path}")

    return Settings(**settings_dict)