|
|
import copy |
|
|
import json |
|
|
import shutil |
|
|
import time |
|
|
from collections import defaultdict |
|
|
from copy import deepcopy |
|
|
from datetime import datetime, timezone |
|
|
from pathlib import Path |
|
|
from uuid import UUID |
|
|
|
|
|
import orjson |
|
|
from aiofile import async_open |
|
|
from emoji import demojize, purely_emoji |
|
|
from loguru import logger |
|
|
from sqlalchemy.exc import NoResultFound |
|
|
from sqlmodel import select |
|
|
|
|
|
from langflow.base.constants import FIELD_FORMAT_ATTRIBUTES, NODE_FORMAT_ATTRIBUTES, ORJSON_OPTIONS |
|
|
from langflow.initial_setup.constants import STARTER_FOLDER_DESCRIPTION, STARTER_FOLDER_NAME |
|
|
from langflow.services.auth.utils import create_super_user |
|
|
from langflow.services.database.models.flow.model import Flow, FlowCreate |
|
|
from langflow.services.database.models.folder.model import Folder, FolderCreate |
|
|
from langflow.services.database.models.folder.utils import ( |
|
|
create_default_folder_if_it_doesnt_exist, |
|
|
get_default_folder_id, |
|
|
) |
|
|
from langflow.services.database.models.user.crud import get_user_by_username |
|
|
from langflow.services.deps import ( |
|
|
async_session_scope, |
|
|
get_settings_service, |
|
|
get_storage_service, |
|
|
get_variable_service, |
|
|
session_scope, |
|
|
) |
|
|
from langflow.template.field.prompt import DEFAULT_PROMPT_INTUT_TYPES |
|
|
from langflow.utils.util import escape_json_dump |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def update_projects_components_with_latest_component_versions(project_data, all_types_dict):
    """Sync the nodes of a project with the latest component definitions.

    Args:
        project_data: Project payload; its ``"nodes"`` list is read, and each node is
            expected to carry ``data.node`` (with a ``template``) and ``data.type``.
        all_types_dict: Mapping of category to ``{component type: latest component}``.

    Returns:
        A deep copy of ``project_data`` with its nodes updated; the argument itself is
        not modified. Every recorded change is handed to ``log_node_changes``.
    """
    # Flatten {category: {type: component}} into {type: component}; on a key clash the
    # category iterated last wins, exactly as with item-by-item assignment.
    all_types_dict_flat = {}
    for category in all_types_dict.values():
        all_types_dict_flat.update(category)

    node_changes_log = defaultdict(list)
    project_data_copy = deepcopy(project_data)

    for node in project_data_copy.get("nodes", []):
        node_data = node.get("data").get("node")
        node_type = node.get("data").get("type")

        # Nodes flagged with tool_mode are left as they are.
        if node_data.get("tool_mode", False):
            continue

        # Nodes that expose an output typed exactly ["Tool"] are left as they are too.
        if any(output.get("types") == ["Tool"] for output in node_data.get("outputs", [])):
            continue

        if node_type not in all_types_dict_flat:
            continue

        latest_node = all_types_dict_flat[node_type]
        latest_template = latest_node.get("template")
        node_data["template"]["code"] = latest_template["code"]

        if "outputs" in latest_node:
            node_data["outputs"] = latest_node["outputs"]

        # Capture the type BEFORE it is overwritten so the change log reports the real
        # previous value instead of the freshly assigned one.
        previous_type = node_data["template"]["_type"]
        if previous_type != latest_template["_type"]:
            node_data["template"]["_type"] = latest_template["_type"]
            if node_type != "Prompt":
                # Non-Prompt nodes take the latest template wholesale.
                node_data["template"] = latest_template
            else:
                # Prompt nodes keep their own fields; only missing keys and fields with a
                # truthy latest "value" are brought over.
                for key, value in latest_template.items():
                    if key not in node_data["template"]:
                        node_changes_log[node_type].append(
                            {
                                "attr": key,
                                "old_value": None,
                                "new_value": value,
                            }
                        )
                        node_data["template"][key] = value
                    elif isinstance(value, dict) and value.get("value"):
                        node_changes_log[node_type].append(
                            {
                                "attr": key,
                                "old_value": node_data["template"][key],
                                "new_value": value,
                            }
                        )
                        node_data["template"][key]["value"] = value["value"]
                # Fields that exist only on the project's Prompt node get the default
                # prompt input types.
                for key in node_data["template"]:
                    if key not in latest_template:
                        node_data["template"][key]["input_types"] = DEFAULT_PROMPT_INTUT_TYPES
            node_changes_log[node_type].append(
                {
                    "attr": "_type",
                    "old_value": previous_type,
                    "new_value": latest_template["_type"],
                }
            )
        else:
            # Same template type: refresh node-level attributes that differ.
            for attr in NODE_FORMAT_ATTRIBUTES:
                if attr in latest_node and latest_node[attr] != node_data.get(attr):
                    node_changes_log[node_type].append(
                        {
                            "attr": attr,
                            "old_value": node_data.get(attr),
                            "new_value": latest_node[attr],
                        }
                    )
                    node_data[attr] = latest_node[attr]

            # Then refresh field-level attributes, adding fields the node lacks.
            for field_name, field_dict in latest_template.items():
                if field_name not in node_data["template"]:
                    node_data["template"][field_name] = field_dict
                    continue

                current_field = node_data["template"].get(field_name)
                for attr in FIELD_FORMAT_ATTRIBUTES:
                    if attr in field_dict and attr in current_field and field_dict[attr] != current_field[attr]:
                        node_changes_log[node_type].append(
                            {
                                "attr": f"{field_name}.{attr}",
                                "old_value": current_field[attr],
                                "new_value": field_dict[attr],
                            }
                        )
                        current_field[attr] = field_dict[attr]

        # Drop fields the latest template no longer defines (Prompt nodes keep theirs).
        if node_type != "Prompt":
            for field_name in list(node_data["template"].keys()):
                if field_name not in latest_template:
                    node_data["template"].pop(field_name)

    log_node_changes(node_changes_log)
    return project_data_copy
|
|
|
|
|
|
|
|
def scape_json_parse(json_string: str) -> dict:
    """Decode a handle string in which ``œ`` stands in for double quotes.

    A value that is already a ``dict`` is returned unchanged; anything else has every
    ``œ`` replaced by ``"`` and is then parsed as JSON.
    """
    if not isinstance(json_string, dict):
        return json.loads(json_string.replace("œ", '"'))
    return json_string
|
|
|
|
|
|
|
|
def update_new_output(data):
    """Return a copy of ``data`` with edges and nodes migrated to the outputs-based format.

    For every edge that has both ``sourceHandle`` and ``targetHandle``, the source handle's
    ``baseClasses`` is replaced by ``output_types`` plus a ``name``, the source node gets an
    ``outputs`` entry when it has none, and duplicate output names are removed. A second
    pass renames source handles after the matching node output. ``data`` is not mutated.
    """
    nodes = copy.deepcopy(data["nodes"])
    edges = copy.deepcopy(data["edges"])

    for edge in edges:
        if "sourceHandle" not in edge or "targetHandle" not in edge:
            continue

        new_source_handle = scape_json_parse(edge["sourceHandle"])
        new_target_handle = scape_json_parse(edge["targetHandle"])
        id_ = new_source_handle["id"]
        # First node whose id matches the handle's id, or None when there is none.
        source_node = next((candidate for candidate in nodes if candidate["id"] == id_), None)

        if "baseClasses" in new_source_handle:
            if "output_types" not in new_source_handle:
                node_declares_types = (
                    source_node and "node" in source_node["data"] and "output_types" in source_node["data"]["node"]
                )
                if node_declares_types:
                    new_source_handle["output_types"] = source_node["data"]["node"]["output_types"]
                else:
                    new_source_handle["output_types"] = new_source_handle["baseClasses"]
            del new_source_handle["baseClasses"]

        # Types the source offers that the target accepts, in source order.
        if new_target_handle.get("inputTypes"):
            accepted = new_target_handle["inputTypes"]
            intersection = [type_ for type_ in new_source_handle["output_types"] if type_ in accepted]
        else:
            intersection = [
                type_ for type_ in new_source_handle["output_types"] if type_ == new_target_handle["type"]
            ]

        selected = intersection[0] if intersection else None
        if "name" not in new_source_handle:
            new_source_handle["name"] = " | ".join(new_source_handle["output_types"])
        new_source_handle["output_types"] = [selected] if selected else []

        if source_node and not source_node["data"]["node"].get("outputs"):
            node_info = source_node["data"]["node"]
            node_info.setdefault("outputs", [])
            types = node_info.get("output_types", node_info.get("base_classes", []))
            already_selected = any(output.get("selected") == selected for output in node_info["outputs"])
            if not already_selected:
                joined_types = " | ".join(types)
                node_info["outputs"].append(
                    {
                        "types": types,
                        "selected": selected,
                        "name": joined_types,
                        "display_name": joined_types,
                    }
                )

        if source_node is None:
            # Stand-in so the de-duplication below has something to work on.
            source_node = {"data": {"node": {"outputs": []}}}

        # Keep only the first output for each name.
        seen_names = []
        deduplicated_outputs = []
        for output in source_node["data"]["node"]["outputs"]:
            if output["name"] not in seen_names:
                seen_names.append(output["name"])
                deduplicated_outputs.append(output)
        source_node["data"]["node"]["outputs"] = deduplicated_outputs

        edge["sourceHandle"] = escape_json_dump(new_source_handle)
        edge["data"]["sourceHandle"] = new_source_handle
        edge["data"]["targetHandle"] = new_target_handle

    # Second pass: align each source handle's name with the node output it refers to.
    for node in nodes:
        if "outputs" not in node["data"]["node"]:
            continue
        for output in node["data"]["node"]["outputs"]:
            for edge in edges:
                if node["id"] == edge["source"] and output.get("method") is not None:
                    source_handle = scape_json_parse(edge["sourceHandle"])
                    same_types = source_handle["output_types"] == output.get("types")
                    if same_types and source_handle["name"] != output["name"]:
                        source_handle["name"] = output["name"]
                        edge["sourceHandle"] = escape_json_dump(source_handle)
                        edge["data"]["sourceHandle"] = source_handle

    data_copy = copy.deepcopy(data)
    data_copy["nodes"] = nodes
    data_copy["edges"] = edges
    return data_copy
|
|
|
|
|
|
|
|
def update_edges_with_latest_component_versions(project_data):
    """Return a copy of ``project_data`` whose edge handles agree with their nodes.

    For each edge, the source handle's ``name``/``output_types`` are aligned with the
    matching output of the source node, and the target handle's ``inputTypes`` with the
    ``input_types`` of the target field. The escaped ``sourceHandle``/``targetHandle``
    strings on the edge are rewritten when they changed. Nodes are looked up in the
    original ``project_data``; only the copy is modified. Changes are passed to
    ``log_node_changes``; edges whose nodes cannot be found are logged as errors.
    """

    def _escaped_or_raw(raw_handle):
        # Re-escape the stored handle when it is plain JSON; otherwise keep it verbatim.
        try:
            return escape_json_dump(json.loads(raw_handle))
        except json.JSONDecodeError:
            return raw_handle

    edge_changes_log = defaultdict(list)
    project_data_copy = deepcopy(project_data)

    for edge in project_data_copy.get("edges", []):
        source_handle = scape_json_parse(edge.get("data").get("sourceHandle"))
        target_handle = scape_json_parse(edge.get("data").get("targetHandle"))

        source_node = next(
            (node for node in project_data.get("nodes", []) if node.get("id") == edge.get("source")),
            None,
        )
        target_node = next(
            (node for node in project_data.get("nodes", []) if node.get("id") == edge.get("target")),
            None,
        )
        if not (source_node and target_node):
            logger.error(f"Source or target node not found for edge: {edge}")
            continue

        source_node_data = source_node.get("data").get("node")
        target_node_data = target_node.get("data").get("node")

        # Match the output by name first, then fall back to its display name.
        output_data = next(
            (output for output in source_node_data.get("outputs", []) if output["name"] == source_handle["name"]),
            None,
        )
        if not output_data:
            output_data = next(
                (
                    output
                    for output in source_node_data.get("outputs", [])
                    if output["display_name"] == source_handle["name"]
                ),
                None,
            )
            if output_data:
                source_handle["name"] = output_data["name"]

        # A single declared type wins; otherwise use the selected type, if any.
        new_output_types = []
        if output_data:
            if len(output_data.get("types")) == 1:
                new_output_types = output_data.get("types")
            elif output_data.get("selected"):
                new_output_types = [output_data.get("selected")]

        if source_handle["output_types"] != new_output_types:
            edge_changes_log[source_node_data["display_name"]].append(
                {
                    "attr": "output_types",
                    "old_value": source_handle["output_types"],
                    "new_value": new_output_types,
                }
            )
            source_handle["output_types"] = new_output_types

        field_name = target_handle.get("fieldName")
        if field_name in target_node_data.get("template"):
            current_input_types = target_handle["inputTypes"]
            latest_input_types = target_node_data.get("template").get(field_name).get("input_types")
            if current_input_types != latest_input_types:
                edge_changes_log[target_node_data["display_name"]].append(
                    {
                        "attr": "inputTypes",
                        "old_value": current_input_types,
                        "new_value": latest_input_types,
                    }
                )
                target_handle["inputTypes"] = latest_input_types

        escaped_source_handle = escape_json_dump(source_handle)
        escaped_target_handle = escape_json_dump(target_handle)
        old_escape_source_handle = _escaped_or_raw(edge["sourceHandle"])
        old_escape_target_handle = _escaped_or_raw(edge["targetHandle"])

        if old_escape_source_handle != escaped_source_handle:
            edge_changes_log[source_node_data["display_name"]].append(
                {
                    "attr": "sourceHandle",
                    "old_value": old_escape_source_handle,
                    "new_value": escaped_source_handle,
                }
            )
            edge["sourceHandle"] = escaped_source_handle
        if old_escape_target_handle != escaped_target_handle:
            edge_changes_log[target_node_data["display_name"]].append(
                {
                    "attr": "targetHandle",
                    "old_value": old_escape_target_handle,
                    "new_value": escaped_target_handle,
                }
            )
            edge["targetHandle"] = escaped_target_handle

    log_node_changes(edge_changes_log)
    return project_data_copy
|
|
|
|
|
|
|
|
def log_node_changes(node_changes_log) -> None:
    """Emit one debug log entry summarising the recorded changes.

    ``node_changes_log`` maps a node name to a list of change dicts carrying ``attr``,
    ``old_value`` and ``new_value``. Each node contributes a header line followed by
    one ``- attr: old -> new`` line per change. Nothing is logged for an empty mapping.
    """
    formatted_messages = []
    for node_name, changes in node_changes_log.items():
        parts = [f"\nNode: {node_name} was updated with the following changes:"]
        parts.extend(f"\n- {change['attr']}: {change['old_value']} -> {change['new_value']}" for change in changes)
        formatted_messages.append("".join(parts))
    if not formatted_messages:
        return
    logger.debug("\n".join(formatted_messages))
|
|
|
|
|
|
|
|
def load_starter_projects(retries=3, delay=1) -> list[tuple[Path, dict]]:
    """Load every ``*.json`` file from the ``starter_projects`` folder next to this module.

    Args:
        retries: Maximum number of read-and-parse attempts per file.
        delay: Seconds to sleep between failed attempts.

    Returns:
        A list of ``(path, parsed_project)`` tuples.

    Raises:
        ValueError: When a file still fails to parse on the last attempt.
    """
    loaded = []
    projects_dir = Path(__file__).parent / "starter_projects"
    for file in projects_dir.glob("*.json"):
        failures = 0
        while failures < retries:
            # The file is re-read on every attempt.
            content = file.read_text(encoding="utf-8")
            try:
                project = orjson.loads(content)
            except orjson.JSONDecodeError as e:
                failures += 1
                if failures >= retries:
                    msg = f"Error loading starter project {file}: {e}"
                    raise ValueError(msg) from e
                time.sleep(delay)
            else:
                loaded.append((file, project))
                logger.info(f"Loaded starter project {file}")
                break
    return loaded
|
|
|
|
|
|
|
|
def copy_profile_pictures() -> None:
    """Copy the bundled ``profile_pictures`` folder into the configured config directory.

    Raises:
        ValueError: When the config dir setting is ``None`` or the bundled source folder
            does not exist.

    Errors raised during the copy itself are logged and not propagated.
    """
    config_dir = get_storage_service().settings_service.settings.config_dir
    if config_dir is None:
        msg = "Config dir is not set in the settings"
        raise ValueError(msg)

    origin = Path(__file__).parent / "profile_pictures"
    target = Path(config_dir) / "profile_pictures"

    origin_present = origin.exists()
    if not origin_present:
        msg = f"The source folder '{origin}' does not exist."
        raise ValueError(msg)

    target_present = target.exists()
    if not target_present:
        target.mkdir(parents=True)

    try:
        shutil.copytree(origin, target, dirs_exist_ok=True)
    except Exception:
        # Best effort: a failed copy is reported but does not abort the caller.
        logger.exception("Error copying the folder")
    else:
        logger.debug(f"Folder copied from '{origin}' to '{target}'")
|
|
|
|
|
|
|
|
def get_project_data(project):
    """Extract the fields of a starter project dict as a 9-tuple.

    Returns ``(name, description, is_component, updated_at, data, icon, icon_bg_color,
    gradient, tags)``. ``updated_at`` is parsed from its ISO string, or set to the
    current UTC time when missing or empty. An icon made purely of emoji is converted
    with ``demojize``; any other icon value is passed through unchanged.
    """
    raw_updated_at = project.get("updated_at")
    if raw_updated_at:
        updated_at_datetime = datetime.fromisoformat(raw_updated_at)
    else:
        updated_at_datetime = datetime.now(tz=timezone.utc)

    icon = project.get("icon")
    if icon and purely_emoji(icon):
        icon = demojize(icon)

    return (
        project.get("name"),
        project.get("description"),
        project.get("is_component"),
        updated_at_datetime,
        project.get("data"),
        icon,
        project.get("icon_bg_color"),
        project.get("gradient"),
        project.get("tags"),
    )
|
|
|
|
|
|
|
|
def update_project_file(project_path: Path, project: dict, updated_project_data) -> None:
    """Store ``updated_project_data`` under ``project["data"]`` and rewrite the project file.

    The project dict is mutated in place, serialised with ``orjson`` using
    ``ORJSON_OPTIONS`` and written to ``project_path`` as UTF-8.
    """
    project["data"] = updated_project_data
    serialized = orjson.dumps(project, option=ORJSON_OPTIONS)
    project_path.write_text(serialized.decode(), encoding="utf-8")
    logger.info(f"Updated starter project {project['name']} file")
|
|
|
|
|
|
|
|
def update_existing_project(
    existing_project,
    project_name,
    project_description,
    project_is_component,
    updated_at_datetime,
    project_data,
    project_icon,
    project_icon_bg_color,
) -> None:
    """Overwrite the fields of ``existing_project`` with the given starter-project values.

    ``folder`` is set to ``STARTER_FOLDER_NAME``; ``project_name`` is only used in the
    log message and is not assigned to the project.
    """
    logger.info(f"Updating starter project {project_name}")
    # Assigned in this order: data, folder, description, is_component, updated_at, icon, icon_bg_color.
    new_values = {
        "data": project_data,
        "folder": STARTER_FOLDER_NAME,
        "description": project_description,
        "is_component": project_is_component,
        "updated_at": updated_at_datetime,
        "icon": project_icon,
        "icon_bg_color": project_icon_bg_color,
    }
    for attribute, value in new_values.items():
        setattr(existing_project, attribute, value)
|
|
|
|
|
|
|
|
def create_new_project(
    session,
    project_name,
    project_description,
    project_is_component,
    updated_at_datetime,
    project_data,
    project_gradient,
    project_tags,
    project_icon,
    project_icon_bg_color,
    new_folder_id,
) -> None:
    """Build a ``Flow`` from the given starter-project values and add it to ``session``.

    The values are first validated through ``FlowCreate`` and then converted to a
    ``Flow``. The session is not committed here.
    """
    logger.debug(f"Creating starter project {project_name}")
    flow_fields = {
        "name": project_name,
        "description": project_description,
        "icon": project_icon,
        "icon_bg_color": project_icon_bg_color,
        "data": project_data,
        "is_component": project_is_component,
        "updated_at": updated_at_datetime,
        "folder_id": new_folder_id,
        "gradient": project_gradient,
        "tags": project_tags,
    }
    new_project = FlowCreate(**flow_fields)
    session.add(Flow.model_validate(new_project, from_attributes=True))
|
|
|
|
|
|
|
|
def get_all_flows_similar_to_project(session, folder_id):
    """Return the flows that belong to the folder with id ``folder_id``.

    Args:
        session: Database session used to run the query.
        folder_id: Id of the folder whose flows are wanted.

    Returns:
        The folder's ``flows`` collection, or an empty list when no folder matches.
    """
    folder = session.exec(select(Folder).where(Folder.id == folder_id)).first()
    # ``first()`` yields None when nothing matches; guard so a missing folder does not
    # surface as an AttributeError on ``.flows``.
    if folder is None:
        return []
    return folder.flows
|
|
|
|
|
|
|
|
def delete_start_projects(session, folder_id) -> None:
    """Delete every flow in the folder with id ``folder_id`` and commit the session.

    Args:
        session: Database session used for the query, the deletes and the commit.
        folder_id: Id of the folder whose flows are removed.

    When no folder matches there is nothing to delete; the session is still committed.
    """
    folder = session.exec(select(Folder).where(Folder.id == folder_id)).first()
    # ``first()`` yields None when nothing matches; treat that as "no flows" rather than
    # failing with an AttributeError on ``.flows``.
    flows = folder.flows if folder is not None else []
    for flow in flows:
        session.delete(flow)
    session.commit()
|
|
|
|
|
|
|
|
def folder_exists(session, folder_name):
    """Return ``True`` when a folder named ``folder_name`` is found through ``session``."""
    query = select(Folder).where(Folder.name == folder_name)
    return session.exec(query).first() is not None
|
|
|
|
|
|
|
|
def create_starter_folder(session):
    """Return the starter folder, creating and committing it first when it is missing.

    Args:
        session: Database session used for the lookup and, if needed, the insert.

    Returns:
        The existing ``Folder`` named ``STARTER_FOLDER_NAME``, or the newly created one
        (refreshed after commit).
    """
    # One lookup serves both the existence check and the fetch, instead of querying for
    # the same row twice.
    existing_folder = session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME)).first()
    if existing_folder is not None:
        return existing_folder

    new_folder = FolderCreate(name=STARTER_FOLDER_NAME, description=STARTER_FOLDER_DESCRIPTION)
    db_folder = Folder.model_validate(new_folder, from_attributes=True)
    session.add(db_folder)
    session.commit()
    session.refresh(db_folder)
    return db_folder
|
|
|
|
|
|
|
|
def _is_valid_uuid(val): |
|
|
try: |
|
|
uuid_obj = UUID(val) |
|
|
except ValueError: |
|
|
return False |
|
|
return str(uuid_obj) == val |
|
|
|
|
|
|
|
|
async def load_flows_from_directory() -> None:
    """On langflow startup, this loads all flows from the directory specified in the settings.

    All flows are uploaded into the default folder for the superuser.
    Note that this feature currently only works if AUTO_LOGIN is enabled in the settings.

    Raises:
        NoResultFound: When the configured superuser is not found in the database.
    """
    settings_service = get_settings_service()
    flows_path = settings_service.settings.load_flows_path
    if not flows_path:
        return
    if not settings_service.auth_settings.AUTO_LOGIN:
        logger.warning("AUTO_LOGIN is disabled, not loading flows from directory")
        return

    async with async_session_scope() as session:
        user = await get_user_by_username(session, settings_service.auth_settings.SUPERUSER)
        if user is None:
            msg = "Superuser not found in the database"
            raise NoResultFound(msg)
        user_id = user.id

        # Only regular files with a ".json" suffix are considered.
        json_files = [entry for entry in Path(flows_path).iterdir() if entry.is_file() and entry.suffix == ".json"]
        for file_path in json_files:
            logger.info(f"Loading flow from file: {file_path.name}")
            async with async_open(file_path, "r", encoding="utf-8") as f:
                content = await f.read()
            flow = orjson.loads(content)

            flow_endpoint_name = flow.get("endpoint_name")
            # A file whose stem is a canonical UUID dictates the flow id.
            stem = file_path.stem
            if _is_valid_uuid(stem):
                flow["id"] = stem
            flow_id = flow.get("id")

            existing = await find_existing_flow(session, flow_id, flow_endpoint_name)
            if not existing:
                logger.info(f"Creating new flow: {flow_id} with endpoint name {flow_endpoint_name}")
                # New flows go into the user's default folder.
                flow["user_id"] = user_id
                flow["folder_id"] = await get_default_folder_id(session, user_id)
                db_flow = Flow.model_validate(flow, from_attributes=True)
                db_flow.updated_at = datetime.now(tz=timezone.utc).astimezone()
                session.add(db_flow)
                continue

            logger.debug(f"Found existing flow: {existing.name}")
            logger.info(f"Updating existing flow: {flow_id} with endpoint name {flow_endpoint_name}")
            # Copy over only the keys the stored flow actually has as attributes.
            for key, value in flow.items():
                if hasattr(existing, key):
                    setattr(existing, key, value)
            existing.updated_at = datetime.now(tz=timezone.utc).astimezone()
            existing.user_id = user_id

            # A stored flow without a folder is attached to the user's default folder.
            if existing.folder_id is None:
                existing.folder_id = await get_default_folder_id(session, user_id)

            session.add(existing)
|
|
|
|
|
|
|
|
async def find_existing_flow(session, flow_id, flow_endpoint_name):
    """Look up a stored flow by endpoint name first, then by id.

    The endpoint-name lookup is only attempted when ``flow_endpoint_name`` is truthy.
    Returns the first match, or ``None`` when neither lookup finds a flow.
    """
    if flow_endpoint_name:
        logger.debug(f"flow_endpoint_name: {flow_endpoint_name}")
        by_endpoint = select(Flow).where(Flow.endpoint_name == flow_endpoint_name)
        existing = (await session.exec(by_endpoint)).first()
        if existing:
            logger.debug(f"Found existing flow by endpoint name: {existing.name}")
            return existing

    by_id = select(Flow).where(Flow.id == flow_id)
    existing = (await session.exec(by_id)).first()
    if not existing:
        return None
    logger.debug(f"Found existing flow by id: {flow_id}")
    return existing
|
|
|
|
|
|
|
|
def create_or_update_starter_projects(all_types_dict: dict) -> None:
    """Recreate the starter projects in the starter folder from the bundled JSON files.

    Within one session scope: ensure the starter folder exists, load the starter project
    files, delete the folder's current flows, copy the profile pictures, then for each
    project refresh its components and edges against ``all_types_dict``, rewrite the
    project file when the data changed, and add the project as a new flow.
    """
    with session_scope() as session:
        starter_folder = create_starter_folder(session)
        starter_projects = load_starter_projects()
        delete_start_projects(session, starter_folder.id)
        copy_profile_pictures()

        for project_path, project in starter_projects:
            (
                project_name,
                description,
                is_component,
                updated_at,
                project_data,
                icon,
                icon_bg_color,
                gradient,
                tags,
            ) = get_project_data(project)

            refreshed_data = update_projects_components_with_latest_component_versions(
                project_data.copy(), all_types_dict
            )
            refreshed_data = update_edges_with_latest_component_versions(refreshed_data)
            if refreshed_data != project_data:
                project_data = refreshed_data
                # Keep the file on disk in step with the refreshed data.
                update_project_file(project_path, project, refreshed_data)

            if not (project_name and project_data):
                continue

            for existing_project in get_all_flows_similar_to_project(session, starter_folder.id):
                session.delete(existing_project)

            create_new_project(
                session=session,
                project_name=project_name,
                project_description=description,
                project_is_component=is_component,
                updated_at_datetime=updated_at,
                project_data=project_data,
                project_icon=icon,
                project_icon_bg_color=icon_bg_color,
                project_gradient=gradient,
                project_tags=tags,
                new_folder_id=starter_folder.id,
            )
|
|
|
|
|
|
|
|
async def initialize_super_user_if_needed() -> None:
    """Create the superuser and its defaults when AUTO_LOGIN is enabled.

    Does nothing when AUTO_LOGIN is off. Otherwise creates the superuser, initialises
    its user variables and ensures its default folder exists, all in one async session.

    Raises:
        ValueError: When AUTO_LOGIN is on but SUPERUSER or SUPERUSER_PASSWORD is empty.
    """
    settings_service = get_settings_service()
    if not settings_service.auth_settings.AUTO_LOGIN:
        return

    username = settings_service.auth_settings.SUPERUSER
    password = settings_service.auth_settings.SUPERUSER_PASSWORD
    if not (username and password):
        msg = "SUPERUSER and SUPERUSER_PASSWORD must be set in the settings if AUTO_LOGIN is true."
        raise ValueError(msg)

    async with async_session_scope() as async_session:
        super_user = await create_super_user(db=async_session, username=username, password=password)
        variable_service = get_variable_service()
        await variable_service.initialize_user_variables(super_user.id, async_session)
        await create_default_folder_if_it_doesnt_exist(async_session, super_user.id)
    logger.info("Super user initialized")
|
|
|