Spaces:
Running
Running
| import copy | |
| import json | |
| import shutil | |
| import time | |
| from collections import defaultdict | |
| from copy import deepcopy | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from uuid import UUID | |
| import orjson | |
| from aiofile import async_open | |
| from emoji import demojize, purely_emoji | |
| from loguru import logger | |
| from sqlalchemy.exc import NoResultFound | |
| from sqlmodel import select | |
| from langflow.base.constants import FIELD_FORMAT_ATTRIBUTES, NODE_FORMAT_ATTRIBUTES, ORJSON_OPTIONS | |
| from langflow.initial_setup.constants import STARTER_FOLDER_DESCRIPTION, STARTER_FOLDER_NAME | |
| from langflow.services.auth.utils import create_super_user | |
| from langflow.services.database.models.flow.model import Flow, FlowCreate | |
| from langflow.services.database.models.folder.model import Folder, FolderCreate | |
| from langflow.services.database.models.folder.utils import ( | |
| create_default_folder_if_it_doesnt_exist, | |
| get_default_folder_id, | |
| ) | |
| from langflow.services.database.models.user.crud import get_user_by_username | |
| from langflow.services.deps import ( | |
| async_session_scope, | |
| get_settings_service, | |
| get_storage_service, | |
| get_variable_service, | |
| session_scope, | |
| ) | |
| from langflow.template.field.prompt import DEFAULT_PROMPT_INTUT_TYPES | |
| from langflow.utils.util import escape_json_dump | |
| # In the folder ./starter_projects we have a few JSON files that represent | |
| # starter projects. We want to load these into the database so that users | |
| # can use them as a starting point for their own projects. | |
| def update_projects_components_with_latest_component_versions(project_data, all_types_dict): | |
| # Flatten the all_types_dict for easy access | |
| all_types_dict_flat = {} | |
| for category in all_types_dict.values(): | |
| for key, component in category.items(): | |
| all_types_dict_flat[key] = component # noqa: PERF403 | |
| node_changes_log = defaultdict(list) | |
| project_data_copy = deepcopy(project_data) | |
| for node in project_data_copy.get("nodes", []): | |
| node_data = node.get("data").get("node") | |
| node_type = node.get("data").get("type") | |
| # Skip updating if tool_mode is True | |
| if node_data.get("tool_mode", False): | |
| continue | |
| # Skip nodes with outputs of the specified format | |
| # NOTE: to account for the fact that the Simple Agent has dynamic outputs | |
| if any(output.get("types") == ["Tool"] for output in node_data.get("outputs", [])): | |
| continue | |
| if node_type in all_types_dict_flat: | |
| latest_node = all_types_dict_flat.get(node_type) | |
| latest_template = latest_node.get("template") | |
| node_data["template"]["code"] = latest_template["code"] | |
| if "outputs" in latest_node: | |
| node_data["outputs"] = latest_node["outputs"] | |
| if node_data["template"]["_type"] != latest_template["_type"]: | |
| node_data["template"]["_type"] = latest_template["_type"] | |
| if node_type != "Prompt": | |
| node_data["template"] = latest_template | |
| else: | |
| for key, value in latest_template.items(): | |
| if key not in node_data["template"]: | |
| node_changes_log[node_type].append( | |
| { | |
| "attr": key, | |
| "old_value": None, | |
| "new_value": value, | |
| } | |
| ) | |
| node_data["template"][key] = value | |
| elif isinstance(value, dict) and value.get("value"): | |
| node_changes_log[node_type].append( | |
| { | |
| "attr": key, | |
| "old_value": node_data["template"][key], | |
| "new_value": value, | |
| } | |
| ) | |
| node_data["template"][key]["value"] = value["value"] | |
| for key in node_data["template"]: | |
| if key not in latest_template: | |
| node_data["template"][key]["input_types"] = DEFAULT_PROMPT_INTUT_TYPES | |
| node_changes_log[node_type].append( | |
| { | |
| "attr": "_type", | |
| "old_value": node_data["template"]["_type"], | |
| "new_value": latest_template["_type"], | |
| } | |
| ) | |
| else: | |
| for attr in NODE_FORMAT_ATTRIBUTES: | |
| if ( | |
| attr in latest_node | |
| # Check if it needs to be updated | |
| and latest_node[attr] != node_data.get(attr) | |
| ): | |
| node_changes_log[node_type].append( | |
| { | |
| "attr": attr, | |
| "old_value": node_data.get(attr), | |
| "new_value": latest_node[attr], | |
| } | |
| ) | |
| node_data[attr] = latest_node[attr] | |
| for field_name, field_dict in latest_template.items(): | |
| if field_name not in node_data["template"]: | |
| node_data["template"][field_name] = field_dict | |
| continue | |
| # The idea here is to update some attributes of the field | |
| to_check_attributes = FIELD_FORMAT_ATTRIBUTES | |
| for attr in to_check_attributes: | |
| if ( | |
| attr in field_dict | |
| and attr in node_data["template"].get(field_name) | |
| # Check if it needs to be updated | |
| and field_dict[attr] != node_data["template"][field_name][attr] | |
| ): | |
| node_changes_log[node_type].append( | |
| { | |
| "attr": f"{field_name}.{attr}", | |
| "old_value": node_data["template"][field_name][attr], | |
| "new_value": field_dict[attr], | |
| } | |
| ) | |
| node_data["template"][field_name][attr] = field_dict[attr] | |
| # Remove fields that are not in the latest template | |
| if node_type != "Prompt": | |
| for field_name in list(node_data["template"].keys()): | |
| if field_name not in latest_template: | |
| node_data["template"].pop(field_name) | |
| log_node_changes(node_changes_log) | |
| return project_data_copy | |
| def scape_json_parse(json_string: str) -> dict: | |
| if isinstance(json_string, dict): | |
| return json_string | |
| parsed_string = json_string.replace("œ", '"') | |
| return json.loads(parsed_string) | |
| def update_new_output(data): | |
| nodes = copy.deepcopy(data["nodes"]) | |
| edges = copy.deepcopy(data["edges"]) | |
| for edge in edges: | |
| if "sourceHandle" in edge and "targetHandle" in edge: | |
| new_source_handle = scape_json_parse(edge["sourceHandle"]) | |
| new_target_handle = scape_json_parse(edge["targetHandle"]) | |
| id_ = new_source_handle["id"] | |
| source_node_index = next((index for (index, d) in enumerate(nodes) if d["id"] == id_), -1) | |
| source_node = nodes[source_node_index] if source_node_index != -1 else None | |
| if "baseClasses" in new_source_handle: | |
| if "output_types" not in new_source_handle: | |
| if source_node and "node" in source_node["data"] and "output_types" in source_node["data"]["node"]: | |
| new_source_handle["output_types"] = source_node["data"]["node"]["output_types"] | |
| else: | |
| new_source_handle["output_types"] = new_source_handle["baseClasses"] | |
| del new_source_handle["baseClasses"] | |
| if new_target_handle.get("inputTypes"): | |
| intersection = [ | |
| type_ for type_ in new_source_handle["output_types"] if type_ in new_target_handle["inputTypes"] | |
| ] | |
| else: | |
| intersection = [ | |
| type_ for type_ in new_source_handle["output_types"] if type_ == new_target_handle["type"] | |
| ] | |
| selected = intersection[0] if intersection else None | |
| if "name" not in new_source_handle: | |
| new_source_handle["name"] = " | ".join(new_source_handle["output_types"]) | |
| new_source_handle["output_types"] = [selected] if selected else [] | |
| if source_node and not source_node["data"]["node"].get("outputs"): | |
| if "outputs" not in source_node["data"]["node"]: | |
| source_node["data"]["node"]["outputs"] = [] | |
| types = source_node["data"]["node"].get( | |
| "output_types", source_node["data"]["node"].get("base_classes", []) | |
| ) | |
| if not any(output.get("selected") == selected for output in source_node["data"]["node"]["outputs"]): | |
| source_node["data"]["node"]["outputs"].append( | |
| { | |
| "types": types, | |
| "selected": selected, | |
| "name": " | ".join(types), | |
| "display_name": " | ".join(types), | |
| } | |
| ) | |
| deduplicated_outputs = [] | |
| if source_node is None: | |
| source_node = {"data": {"node": {"outputs": []}}} | |
| for output in source_node["data"]["node"]["outputs"]: | |
| if output["name"] not in [d["name"] for d in deduplicated_outputs]: | |
| deduplicated_outputs.append(output) | |
| source_node["data"]["node"]["outputs"] = deduplicated_outputs | |
| edge["sourceHandle"] = escape_json_dump(new_source_handle) | |
| edge["data"]["sourceHandle"] = new_source_handle | |
| edge["data"]["targetHandle"] = new_target_handle | |
| # The above sets the edges but some of the sourceHandles do not have valid name | |
| # which can be found in the nodes. We need to update the sourceHandle with the | |
| # name from node['data']['node']['outputs'] | |
| for node in nodes: | |
| if "outputs" in node["data"]["node"]: | |
| for output in node["data"]["node"]["outputs"]: | |
| for edge in edges: | |
| if node["id"] != edge["source"] or output.get("method") is None: | |
| continue | |
| source_handle = scape_json_parse(edge["sourceHandle"]) | |
| if source_handle["output_types"] == output.get("types") and source_handle["name"] != output["name"]: | |
| source_handle["name"] = output["name"] | |
| if isinstance(source_handle, str): | |
| source_handle = scape_json_parse(source_handle) | |
| edge["sourceHandle"] = escape_json_dump(source_handle) | |
| edge["data"]["sourceHandle"] = source_handle | |
| data_copy = copy.deepcopy(data) | |
| data_copy["nodes"] = nodes | |
| data_copy["edges"] = edges | |
| return data_copy | |
| def update_edges_with_latest_component_versions(project_data): | |
| edge_changes_log = defaultdict(list) | |
| project_data_copy = deepcopy(project_data) | |
| for edge in project_data_copy.get("edges", []): | |
| source_handle = edge.get("data").get("sourceHandle") | |
| source_handle = scape_json_parse(source_handle) | |
| target_handle = edge.get("data").get("targetHandle") | |
| target_handle = scape_json_parse(target_handle) | |
| # Now find the source and target nodes in the nodes list | |
| source_node = next( | |
| (node for node in project_data.get("nodes", []) if node.get("id") == edge.get("source")), | |
| None, | |
| ) | |
| target_node = next( | |
| (node for node in project_data.get("nodes", []) if node.get("id") == edge.get("target")), | |
| None, | |
| ) | |
| if source_node and target_node: | |
| source_node_data = source_node.get("data").get("node") | |
| target_node_data = target_node.get("data").get("node") | |
| output_data = next( | |
| (output for output in source_node_data.get("outputs", []) if output["name"] == source_handle["name"]), | |
| None, | |
| ) | |
| if not output_data: | |
| output_data = next( | |
| ( | |
| output | |
| for output in source_node_data.get("outputs", []) | |
| if output["display_name"] == source_handle["name"] | |
| ), | |
| None, | |
| ) | |
| if output_data: | |
| source_handle["name"] = output_data["name"] | |
| if output_data: | |
| if len(output_data.get("types")) == 1: | |
| new_output_types = output_data.get("types") | |
| elif output_data.get("selected"): | |
| new_output_types = [output_data.get("selected")] | |
| else: | |
| new_output_types = [] | |
| else: | |
| new_output_types = [] | |
| if source_handle["output_types"] != new_output_types: | |
| edge_changes_log[source_node_data["display_name"]].append( | |
| { | |
| "attr": "output_types", | |
| "old_value": source_handle["output_types"], | |
| "new_value": new_output_types, | |
| } | |
| ) | |
| source_handle["output_types"] = new_output_types | |
| field_name = target_handle.get("fieldName") | |
| if field_name in target_node_data.get("template") and target_handle["inputTypes"] != target_node_data.get( | |
| "template" | |
| ).get(field_name).get("input_types"): | |
| edge_changes_log[target_node_data["display_name"]].append( | |
| { | |
| "attr": "inputTypes", | |
| "old_value": target_handle["inputTypes"], | |
| "new_value": target_node_data.get("template").get(field_name).get("input_types"), | |
| } | |
| ) | |
| target_handle["inputTypes"] = target_node_data.get("template").get(field_name).get("input_types") | |
| escaped_source_handle = escape_json_dump(source_handle) | |
| escaped_target_handle = escape_json_dump(target_handle) | |
| try: | |
| old_escape_source_handle = escape_json_dump(json.loads(edge["sourceHandle"])) | |
| except json.JSONDecodeError: | |
| old_escape_source_handle = edge["sourceHandle"] | |
| try: | |
| old_escape_target_handle = escape_json_dump(json.loads(edge["targetHandle"])) | |
| except json.JSONDecodeError: | |
| old_escape_target_handle = edge["targetHandle"] | |
| if old_escape_source_handle != escaped_source_handle: | |
| edge_changes_log[source_node_data["display_name"]].append( | |
| { | |
| "attr": "sourceHandle", | |
| "old_value": old_escape_source_handle, | |
| "new_value": escaped_source_handle, | |
| } | |
| ) | |
| edge["sourceHandle"] = escaped_source_handle | |
| if old_escape_target_handle != escaped_target_handle: | |
| edge_changes_log[target_node_data["display_name"]].append( | |
| { | |
| "attr": "targetHandle", | |
| "old_value": old_escape_target_handle, | |
| "new_value": escaped_target_handle, | |
| } | |
| ) | |
| edge["targetHandle"] = escaped_target_handle | |
| else: | |
| logger.error(f"Source or target node not found for edge: {edge}") | |
| log_node_changes(edge_changes_log) | |
| return project_data_copy | |
| def log_node_changes(node_changes_log) -> None: | |
| # The idea here is to log the changes that were made to the nodes in debug | |
| # Something like: | |
| # Node: "Node Name" was updated with the following changes: | |
| # attr_name: old_value -> new_value | |
| # let's create one log per node | |
| formatted_messages = [] | |
| for node_name, changes in node_changes_log.items(): | |
| message = f"\nNode: {node_name} was updated with the following changes:" | |
| for change in changes: | |
| message += f"\n- {change['attr']}: {change['old_value']} -> {change['new_value']}" | |
| formatted_messages.append(message) | |
| if formatted_messages: | |
| logger.debug("\n".join(formatted_messages)) | |
| def load_starter_projects(retries=3, delay=1) -> list[tuple[Path, dict]]: | |
| starter_projects = [] | |
| folder = Path(__file__).parent / "starter_projects" | |
| for file in folder.glob("*.json"): | |
| attempt = 0 | |
| while attempt < retries: | |
| content = file.read_text(encoding="utf-8") | |
| try: | |
| project = orjson.loads(content) | |
| starter_projects.append((file, project)) | |
| logger.info(f"Loaded starter project {file}") | |
| break # Break if load is successful | |
| except orjson.JSONDecodeError as e: | |
| attempt += 1 | |
| if attempt >= retries: | |
| msg = f"Error loading starter project {file}: {e}" | |
| raise ValueError(msg) from e | |
| time.sleep(delay) # Wait before retrying | |
| return starter_projects | |
| def copy_profile_pictures() -> None: | |
| config_dir = get_storage_service().settings_service.settings.config_dir | |
| if config_dir is None: | |
| msg = "Config dir is not set in the settings" | |
| raise ValueError(msg) | |
| origin = Path(__file__).parent / "profile_pictures" | |
| target = Path(config_dir) / "profile_pictures" | |
| if not origin.exists(): | |
| msg = f"The source folder '{origin}' does not exist." | |
| raise ValueError(msg) | |
| if not target.exists(): | |
| target.mkdir(parents=True) | |
| try: | |
| shutil.copytree(origin, target, dirs_exist_ok=True) | |
| logger.debug(f"Folder copied from '{origin}' to '{target}'") | |
| except Exception: # noqa: BLE001 | |
| logger.exception("Error copying the folder") | |
| def get_project_data(project): | |
| project_name = project.get("name") | |
| project_description = project.get("description") | |
| project_is_component = project.get("is_component") | |
| project_updated_at = project.get("updated_at") | |
| if not project_updated_at: | |
| updated_at_datetime = datetime.now(tz=timezone.utc) | |
| else: | |
| updated_at_datetime = datetime.fromisoformat(project_updated_at) | |
| project_data = project.get("data") | |
| project_icon = project.get("icon") | |
| project_icon = demojize(project_icon) if project_icon and purely_emoji(project_icon) else project_icon | |
| project_icon_bg_color = project.get("icon_bg_color") | |
| project_gradient = project.get("gradient") | |
| project_tags = project.get("tags") | |
| return ( | |
| project_name, | |
| project_description, | |
| project_is_component, | |
| updated_at_datetime, | |
| project_data, | |
| project_icon, | |
| project_icon_bg_color, | |
| project_gradient, | |
| project_tags, | |
| ) | |
| def update_project_file(project_path: Path, project: dict, updated_project_data) -> None: | |
| project["data"] = updated_project_data | |
| project_path.write_text(orjson.dumps(project, option=ORJSON_OPTIONS).decode(), encoding="utf-8") | |
| logger.info(f"Updated starter project {project['name']} file") | |
| def update_existing_project( | |
| existing_project, | |
| project_name, | |
| project_description, | |
| project_is_component, | |
| updated_at_datetime, | |
| project_data, | |
| project_icon, | |
| project_icon_bg_color, | |
| ) -> None: | |
| logger.info(f"Updating starter project {project_name}") | |
| existing_project.data = project_data | |
| existing_project.folder = STARTER_FOLDER_NAME | |
| existing_project.description = project_description | |
| existing_project.is_component = project_is_component | |
| existing_project.updated_at = updated_at_datetime | |
| existing_project.icon = project_icon | |
| existing_project.icon_bg_color = project_icon_bg_color | |
| def create_new_project( | |
| session, | |
| project_name, | |
| project_description, | |
| project_is_component, | |
| updated_at_datetime, | |
| project_data, | |
| project_gradient, | |
| project_tags, | |
| project_icon, | |
| project_icon_bg_color, | |
| new_folder_id, | |
| ) -> None: | |
| logger.debug(f"Creating starter project {project_name}") | |
| new_project = FlowCreate( | |
| name=project_name, | |
| description=project_description, | |
| icon=project_icon, | |
| icon_bg_color=project_icon_bg_color, | |
| data=project_data, | |
| is_component=project_is_component, | |
| updated_at=updated_at_datetime, | |
| folder_id=new_folder_id, | |
| gradient=project_gradient, | |
| tags=project_tags, | |
| ) | |
| db_flow = Flow.model_validate(new_project, from_attributes=True) | |
| session.add(db_flow) | |
| def get_all_flows_similar_to_project(session, folder_id): | |
| return session.exec(select(Folder).where(Folder.id == folder_id)).first().flows | |
| def delete_start_projects(session, folder_id) -> None: | |
| flows = session.exec(select(Folder).where(Folder.id == folder_id)).first().flows | |
| for flow in flows: | |
| session.delete(flow) | |
| session.commit() | |
| def folder_exists(session, folder_name): | |
| folder = session.exec(select(Folder).where(Folder.name == folder_name)).first() | |
| return folder is not None | |
| def create_starter_folder(session): | |
| if not folder_exists(session, STARTER_FOLDER_NAME): | |
| new_folder = FolderCreate(name=STARTER_FOLDER_NAME, description=STARTER_FOLDER_DESCRIPTION) | |
| db_folder = Folder.model_validate(new_folder, from_attributes=True) | |
| session.add(db_folder) | |
| session.commit() | |
| session.refresh(db_folder) | |
| return db_folder | |
| return session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME)).first() | |
| def _is_valid_uuid(val): | |
| try: | |
| uuid_obj = UUID(val) | |
| except ValueError: | |
| return False | |
| return str(uuid_obj) == val | |
| async def load_flows_from_directory() -> None: | |
| """On langflow startup, this loads all flows from the directory specified in the settings. | |
| All flows are uploaded into the default folder for the superuser. | |
| Note that this feature currently only works if AUTO_LOGIN is enabled in the settings. | |
| """ | |
| settings_service = get_settings_service() | |
| flows_path = settings_service.settings.load_flows_path | |
| if not flows_path: | |
| return | |
| if not settings_service.auth_settings.AUTO_LOGIN: | |
| logger.warning("AUTO_LOGIN is disabled, not loading flows from directory") | |
| return | |
| async with async_session_scope() as session: | |
| user = await get_user_by_username(session, settings_service.auth_settings.SUPERUSER) | |
| if user is None: | |
| msg = "Superuser not found in the database" | |
| raise NoResultFound(msg) | |
| user_id = user.id | |
| flows_path_ = Path(flows_path) | |
| files = [f for f in flows_path_.iterdir() if f.is_file()] | |
| for file_path in files: | |
| if file_path.suffix != ".json": | |
| continue | |
| logger.info(f"Loading flow from file: {file_path.name}") | |
| async with async_open(file_path, "r", encoding="utf-8") as f: | |
| content = await f.read() | |
| flow = orjson.loads(content) | |
| no_json_name = file_path.stem | |
| flow_endpoint_name = flow.get("endpoint_name") | |
| if _is_valid_uuid(no_json_name): | |
| flow["id"] = no_json_name | |
| flow_id = flow.get("id") | |
| existing = await find_existing_flow(session, flow_id, flow_endpoint_name) | |
| if existing: | |
| logger.debug(f"Found existing flow: {existing.name}") | |
| logger.info(f"Updating existing flow: {flow_id} with endpoint name {flow_endpoint_name}") | |
| for key, value in flow.items(): | |
| if hasattr(existing, key): | |
| # flow dict from json and db representation are not 100% the same | |
| setattr(existing, key, value) | |
| existing.updated_at = datetime.now(tz=timezone.utc).astimezone() | |
| existing.user_id = user_id | |
| # Generally, folder_id should not be None, but we must check this due to the previous | |
| # behavior where flows could be added and folder_id was None, orphaning | |
| # them within Langflow. | |
| if existing.folder_id is None: | |
| folder_id = await get_default_folder_id(session, user_id) | |
| existing.folder_id = folder_id | |
| session.add(existing) | |
| else: | |
| logger.info(f"Creating new flow: {flow_id} with endpoint name {flow_endpoint_name}") | |
| # Current behavior loads all new flows into default folder | |
| folder_id = await get_default_folder_id(session, user_id) | |
| flow["user_id"] = user_id | |
| flow["folder_id"] = folder_id | |
| flow = Flow.model_validate(flow, from_attributes=True) | |
| flow.updated_at = datetime.now(tz=timezone.utc).astimezone() | |
| session.add(flow) | |
| async def find_existing_flow(session, flow_id, flow_endpoint_name): | |
| if flow_endpoint_name: | |
| logger.debug(f"flow_endpoint_name: {flow_endpoint_name}") | |
| stmt = select(Flow).where(Flow.endpoint_name == flow_endpoint_name) | |
| if existing := (await session.exec(stmt)).first(): | |
| logger.debug(f"Found existing flow by endpoint name: {existing.name}") | |
| return existing | |
| stmt = select(Flow).where(Flow.id == flow_id) | |
| if existing := (await session.exec(stmt)).first(): | |
| logger.debug(f"Found existing flow by id: {flow_id}") | |
| return existing | |
| return None | |
| def create_or_update_starter_projects(all_types_dict: dict) -> None: | |
| with session_scope() as session: | |
| new_folder = create_starter_folder(session) | |
| starter_projects = load_starter_projects() | |
| delete_start_projects(session, new_folder.id) | |
| copy_profile_pictures() | |
| for project_path, project in starter_projects: | |
| ( | |
| project_name, | |
| project_description, | |
| project_is_component, | |
| updated_at_datetime, | |
| project_data, | |
| project_icon, | |
| project_icon_bg_color, | |
| project_gradient, | |
| project_tags, | |
| ) = get_project_data(project) | |
| updated_project_data = update_projects_components_with_latest_component_versions( | |
| project_data.copy(), all_types_dict | |
| ) | |
| updated_project_data = update_edges_with_latest_component_versions(updated_project_data) | |
| if updated_project_data != project_data: | |
| project_data = updated_project_data | |
| # We also need to update the project data in the file | |
| update_project_file(project_path, project, updated_project_data) | |
| if project_name and project_data: | |
| for existing_project in get_all_flows_similar_to_project(session, new_folder.id): | |
| session.delete(existing_project) | |
| create_new_project( | |
| session=session, | |
| project_name=project_name, | |
| project_description=project_description, | |
| project_is_component=project_is_component, | |
| updated_at_datetime=updated_at_datetime, | |
| project_data=project_data, | |
| project_icon=project_icon, | |
| project_icon_bg_color=project_icon_bg_color, | |
| project_gradient=project_gradient, | |
| project_tags=project_tags, | |
| new_folder_id=new_folder.id, | |
| ) | |
| async def initialize_super_user_if_needed() -> None: | |
| settings_service = get_settings_service() | |
| if not settings_service.auth_settings.AUTO_LOGIN: | |
| return | |
| username = settings_service.auth_settings.SUPERUSER | |
| password = settings_service.auth_settings.SUPERUSER_PASSWORD | |
| if not username or not password: | |
| msg = "SUPERUSER and SUPERUSER_PASSWORD must be set in the settings if AUTO_LOGIN is true." | |
| raise ValueError(msg) | |
| async with async_session_scope() as async_session: | |
| super_user = await create_super_user(db=async_session, username=username, password=password) | |
| await get_variable_service().initialize_user_variables(super_user.id, async_session) | |
| await create_default_folder_if_it_doesnt_exist(async_session, super_user.id) | |
| logger.info("Super user initialized") | |