|
|
from __future__ import annotations |
|
|
|
|
|
import io |
|
|
import json |
|
|
import re |
|
|
import zipfile |
|
|
from datetime import datetime, timezone |
|
|
from typing import Annotated |
|
|
from uuid import UUID |
|
|
|
|
|
import orjson |
|
|
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile |
|
|
from fastapi.encoders import jsonable_encoder |
|
|
from fastapi.responses import StreamingResponse |
|
|
from fastapi_pagination import Page, Params, add_pagination |
|
|
from fastapi_pagination.ext.sqlalchemy import paginate |
|
|
from sqlmodel import and_, col, select |
|
|
from sqlmodel.ext.asyncio.session import AsyncSession |
|
|
|
|
|
from langflow.api.utils import ( |
|
|
CurrentActiveUser, |
|
|
DbSession, |
|
|
cascade_delete_flow, |
|
|
remove_api_keys, |
|
|
validate_is_component, |
|
|
) |
|
|
from langflow.api.v1.schemas import FlowListCreate |
|
|
from langflow.initial_setup.constants import STARTER_FOLDER_NAME |
|
|
from langflow.services.database.models.flow import Flow, FlowCreate, FlowRead, FlowUpdate |
|
|
from langflow.services.database.models.flow.model import FlowHeader |
|
|
from langflow.services.database.models.flow.utils import get_webhook_component_in_flow |
|
|
from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME |
|
|
from langflow.services.database.models.folder.model import Folder |
|
|
from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id |
|
|
from langflow.services.database.models.vertex_builds.crud import get_vertex_builds_by_flow_id |
|
|
from langflow.services.deps import get_settings_service |
|
|
from langflow.services.settings.service import SettingsService |
|
|
|
|
|
|
|
|
router = APIRouter(prefix="/flows", tags=["Flows"]) |
|
|
|
|
|
|
|
|
async def _new_flow( |
|
|
*, |
|
|
session: AsyncSession, |
|
|
flow: FlowCreate, |
|
|
user_id: UUID, |
|
|
): |
|
|
try: |
|
|
"""Create a new flow.""" |
|
|
if flow.user_id is None: |
|
|
flow.user_id = user_id |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (await session.exec(select(Flow).where(Flow.name == flow.name).where(Flow.user_id == user_id))).first(): |
|
|
flows = ( |
|
|
await session.exec( |
|
|
select(Flow).where(Flow.name.like(f"{flow.name} (%")).where(Flow.user_id == user_id) |
|
|
) |
|
|
).all() |
|
|
if flows: |
|
|
extract_number = re.compile(r"\((\d+)\)$") |
|
|
numbers = [] |
|
|
for _flow in flows: |
|
|
result = extract_number.search(_flow.name) |
|
|
if result: |
|
|
numbers.append(int(result.groups(1)[0])) |
|
|
if numbers: |
|
|
flow.name = f"{flow.name} ({max(numbers) + 1})" |
|
|
else: |
|
|
flow.name = f"{flow.name} (1)" |
|
|
|
|
|
if ( |
|
|
flow.endpoint_name |
|
|
and ( |
|
|
await session.exec( |
|
|
select(Flow).where(Flow.endpoint_name == flow.endpoint_name).where(Flow.user_id == user_id) |
|
|
) |
|
|
).first() |
|
|
): |
|
|
flows = ( |
|
|
await session.exec( |
|
|
select(Flow) |
|
|
.where(Flow.endpoint_name.like(f"{flow.endpoint_name}-%")) |
|
|
.where(Flow.user_id == user_id) |
|
|
) |
|
|
).all() |
|
|
if flows: |
|
|
|
|
|
|
|
|
|
|
|
numbers = [int(flow.endpoint_name.split("-")[-1]) for flow in flows] |
|
|
flow.endpoint_name = f"{flow.endpoint_name}-{max(numbers) + 1}" |
|
|
else: |
|
|
flow.endpoint_name = f"{flow.endpoint_name}-1" |
|
|
|
|
|
db_flow = Flow.model_validate(flow, from_attributes=True) |
|
|
db_flow.updated_at = datetime.now(timezone.utc) |
|
|
|
|
|
if db_flow.folder_id is None: |
|
|
|
|
|
default_folder = ( |
|
|
await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME, Folder.user_id == user_id)) |
|
|
).first() |
|
|
if default_folder: |
|
|
db_flow.folder_id = default_folder.id |
|
|
|
|
|
session.add(db_flow) |
|
|
except Exception as e: |
|
|
|
|
|
if hasattr(e, "errors"): |
|
|
raise HTTPException(status_code=400, detail=str(e)) from e |
|
|
if isinstance(e, HTTPException): |
|
|
raise |
|
|
raise HTTPException(status_code=500, detail=str(e)) from e |
|
|
|
|
|
return db_flow |
|
|
|
|
|
|
|
|
@router.post("/", response_model=FlowRead, status_code=201) |
|
|
async def create_flow( |
|
|
*, |
|
|
session: DbSession, |
|
|
flow: FlowCreate, |
|
|
current_user: CurrentActiveUser, |
|
|
): |
|
|
try: |
|
|
db_flow = await _new_flow(session=session, flow=flow, user_id=current_user.id) |
|
|
await session.commit() |
|
|
await session.refresh(db_flow) |
|
|
except Exception as e: |
|
|
if "UNIQUE constraint failed" in str(e): |
|
|
|
|
|
columns = str(e).split("UNIQUE constraint failed: ")[1].split(".")[1].split("\n")[0] |
|
|
|
|
|
|
|
|
|
|
|
column = columns.split(",")[1] if "id" in columns.split(",")[0] else columns.split(",")[0] |
|
|
|
|
|
raise HTTPException( |
|
|
status_code=400, detail=f"{column.capitalize().replace('_', ' ')} must be unique" |
|
|
) from e |
|
|
if isinstance(e, HTTPException): |
|
|
raise |
|
|
raise HTTPException(status_code=500, detail=str(e)) from e |
|
|
return db_flow |
|
|
|
|
|
|
|
|
@router.get("/", response_model=list[FlowRead] | Page[FlowRead] | list[FlowHeader], status_code=200) |
|
|
async def read_flows( |
|
|
*, |
|
|
current_user: CurrentActiveUser, |
|
|
session: DbSession, |
|
|
remove_example_flows: bool = False, |
|
|
components_only: bool = False, |
|
|
get_all: bool = True, |
|
|
folder_id: UUID | None = None, |
|
|
params: Annotated[Params, Depends()], |
|
|
header_flows: bool = False, |
|
|
): |
|
|
"""Retrieve a list of flows with pagination support. |
|
|
|
|
|
Args: |
|
|
current_user (User): The current authenticated user. |
|
|
session (Session): The database session. |
|
|
settings_service (SettingsService): The settings service. |
|
|
components_only (bool, optional): Whether to return only components. Defaults to False. |
|
|
|
|
|
get_all (bool, optional): Whether to return all flows without pagination. Defaults to True. |
|
|
**This field must be True because of backward compatibility with the frontend - Release: 1.0.20** |
|
|
|
|
|
folder_id (UUID, optional): The folder ID. Defaults to None. |
|
|
params (Params): Pagination parameters. |
|
|
remove_example_flows (bool, optional): Whether to remove example flows. Defaults to False. |
|
|
header_flows (bool, optional): Whether to return only specific headers of the flows. Defaults to False. |
|
|
|
|
|
Returns: |
|
|
list[FlowRead] | Page[FlowRead] | list[FlowHeader] |
|
|
A list of flows or a paginated response containing the list of flows or a list of flow headers. |
|
|
""" |
|
|
try: |
|
|
auth_settings = get_settings_service().auth_settings |
|
|
|
|
|
default_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first() |
|
|
default_folder_id = default_folder.id if default_folder else None |
|
|
|
|
|
starter_folder = (await session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME))).first() |
|
|
starter_folder_id = starter_folder.id if starter_folder else None |
|
|
|
|
|
if not starter_folder and not default_folder: |
|
|
raise HTTPException( |
|
|
status_code=404, |
|
|
detail="Starter folder and default folder not found. Please create a folder and add flows to it.", |
|
|
) |
|
|
|
|
|
if not folder_id: |
|
|
folder_id = default_folder_id |
|
|
|
|
|
if auth_settings.AUTO_LOGIN: |
|
|
stmt = select(Flow).where( |
|
|
(Flow.user_id == None) | (Flow.user_id == current_user.id) |
|
|
) |
|
|
else: |
|
|
stmt = select(Flow).where(Flow.user_id == current_user.id) |
|
|
|
|
|
if remove_example_flows: |
|
|
stmt = stmt.where(Flow.folder_id != starter_folder_id) |
|
|
|
|
|
if components_only: |
|
|
stmt = stmt.where(Flow.is_component == True) |
|
|
|
|
|
if get_all: |
|
|
flows = (await session.exec(stmt)).all() |
|
|
flows = validate_is_component(flows) |
|
|
if components_only: |
|
|
flows = [flow for flow in flows if flow.is_component] |
|
|
if remove_example_flows and starter_folder_id: |
|
|
flows = [flow for flow in flows if flow.folder_id != starter_folder_id] |
|
|
if header_flows: |
|
|
return [ |
|
|
{ |
|
|
"id": flow.id, |
|
|
"name": flow.name, |
|
|
"folder_id": flow.folder_id, |
|
|
"is_component": flow.is_component, |
|
|
"description": flow.description, |
|
|
} |
|
|
for flow in flows |
|
|
] |
|
|
return flows |
|
|
|
|
|
stmt = stmt.where(Flow.folder_id == folder_id) |
|
|
return await paginate(session, stmt, params=params) |
|
|
|
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) from e |
|
|
|
|
|
|
|
|
async def _read_flow( |
|
|
session: AsyncSession, |
|
|
flow_id: UUID, |
|
|
user_id: UUID, |
|
|
settings_service: SettingsService, |
|
|
): |
|
|
"""Read a flow.""" |
|
|
auth_settings = settings_service.auth_settings |
|
|
stmt = select(Flow).where(Flow.id == flow_id) |
|
|
if auth_settings.AUTO_LOGIN: |
|
|
|
|
|
|
|
|
stmt = stmt.where( |
|
|
(Flow.user_id == user_id) | (Flow.user_id == None) |
|
|
) |
|
|
return (await session.exec(stmt)).first() |
|
|
|
|
|
|
|
|
@router.get("/{flow_id}", response_model=FlowRead, status_code=200) |
|
|
async def read_flow( |
|
|
*, |
|
|
session: DbSession, |
|
|
flow_id: UUID, |
|
|
current_user: CurrentActiveUser, |
|
|
): |
|
|
"""Read a flow.""" |
|
|
if user_flow := await _read_flow(session, flow_id, current_user.id, get_settings_service()): |
|
|
return user_flow |
|
|
raise HTTPException(status_code=404, detail="Flow not found") |
|
|
|
|
|
|
|
|
@router.patch("/{flow_id}", response_model=FlowRead, status_code=200) |
|
|
async def update_flow( |
|
|
*, |
|
|
session: DbSession, |
|
|
flow_id: UUID, |
|
|
flow: FlowUpdate, |
|
|
current_user: CurrentActiveUser, |
|
|
): |
|
|
"""Update a flow.""" |
|
|
settings_service = get_settings_service() |
|
|
try: |
|
|
db_flow = await _read_flow( |
|
|
session=session, |
|
|
flow_id=flow_id, |
|
|
user_id=current_user.id, |
|
|
settings_service=settings_service, |
|
|
) |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) from e |
|
|
|
|
|
if not db_flow: |
|
|
raise HTTPException(status_code=404, detail="Flow not found") |
|
|
|
|
|
try: |
|
|
flow_data = flow.model_dump(exclude_unset=True) |
|
|
if settings_service.settings.remove_api_keys: |
|
|
flow_data = remove_api_keys(flow_data) |
|
|
for key, value in flow_data.items(): |
|
|
setattr(db_flow, key, value) |
|
|
webhook_component = get_webhook_component_in_flow(db_flow.data) |
|
|
db_flow.webhook = webhook_component is not None |
|
|
db_flow.updated_at = datetime.now(timezone.utc) |
|
|
|
|
|
if db_flow.folder_id is None: |
|
|
default_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first() |
|
|
if default_folder: |
|
|
db_flow.folder_id = default_folder.id |
|
|
session.add(db_flow) |
|
|
await session.commit() |
|
|
await session.refresh(db_flow) |
|
|
except Exception as e: |
|
|
|
|
|
if hasattr(e, "errors"): |
|
|
raise HTTPException(status_code=400, detail=str(e)) from e |
|
|
if "UNIQUE constraint failed" in str(e): |
|
|
|
|
|
columns = str(e).split("UNIQUE constraint failed: ")[1].split(".")[1].split("\n")[0] |
|
|
|
|
|
|
|
|
|
|
|
column = columns.split(",")[1] if "id" in columns.split(",")[0] else columns.split(",")[0] |
|
|
|
|
|
raise HTTPException( |
|
|
status_code=400, detail=f"{column.capitalize().replace('_', ' ')} must be unique" |
|
|
) from e |
|
|
raise HTTPException(status_code=500, detail=str(e)) from e |
|
|
|
|
|
return db_flow |
|
|
|
|
|
|
|
|
@router.delete("/{flow_id}", status_code=200) |
|
|
async def delete_flow( |
|
|
*, |
|
|
session: DbSession, |
|
|
flow_id: UUID, |
|
|
current_user: CurrentActiveUser, |
|
|
): |
|
|
"""Delete a flow.""" |
|
|
flow = await _read_flow( |
|
|
session=session, |
|
|
flow_id=flow_id, |
|
|
user_id=current_user.id, |
|
|
settings_service=get_settings_service(), |
|
|
) |
|
|
if not flow: |
|
|
raise HTTPException(status_code=404, detail="Flow not found") |
|
|
await cascade_delete_flow(session, flow.id) |
|
|
await session.commit() |
|
|
return {"message": "Flow deleted successfully"} |
|
|
|
|
|
|
|
|
@router.post("/batch/", response_model=list[FlowRead], status_code=201) |
|
|
async def create_flows( |
|
|
*, |
|
|
session: DbSession, |
|
|
flow_list: FlowListCreate, |
|
|
current_user: CurrentActiveUser, |
|
|
): |
|
|
"""Create multiple new flows.""" |
|
|
db_flows = [] |
|
|
for flow in flow_list.flows: |
|
|
flow.user_id = current_user.id |
|
|
db_flow = Flow.model_validate(flow, from_attributes=True) |
|
|
session.add(db_flow) |
|
|
db_flows.append(db_flow) |
|
|
await session.commit() |
|
|
for db_flow in db_flows: |
|
|
await session.refresh(db_flow) |
|
|
return db_flows |
|
|
|
|
|
|
|
|
@router.post("/upload/", response_model=list[FlowRead], status_code=201) |
|
|
async def upload_file( |
|
|
*, |
|
|
session: DbSession, |
|
|
file: Annotated[UploadFile, File(...)], |
|
|
current_user: CurrentActiveUser, |
|
|
folder_id: UUID | None = None, |
|
|
): |
|
|
"""Upload flows from a file.""" |
|
|
contents = await file.read() |
|
|
data = orjson.loads(contents) |
|
|
response_list = [] |
|
|
flow_list = FlowListCreate(**data) if "flows" in data else FlowListCreate(flows=[FlowCreate(**data)]) |
|
|
|
|
|
for flow in flow_list.flows: |
|
|
flow.user_id = current_user.id |
|
|
if folder_id: |
|
|
flow.folder_id = folder_id |
|
|
response = await _new_flow(session=session, flow=flow, user_id=current_user.id) |
|
|
response_list.append(response) |
|
|
|
|
|
try: |
|
|
await session.commit() |
|
|
for db_flow in response_list: |
|
|
await session.refresh(db_flow) |
|
|
except Exception as e: |
|
|
if "UNIQUE constraint failed" in str(e): |
|
|
|
|
|
columns = str(e).split("UNIQUE constraint failed: ")[1].split(".")[1].split("\n")[0] |
|
|
|
|
|
|
|
|
|
|
|
column = columns.split(",")[1] if "id" in columns.split(",")[0] else columns.split(",")[0] |
|
|
|
|
|
raise HTTPException( |
|
|
status_code=400, detail=f"{column.capitalize().replace('_', ' ')} must be unique" |
|
|
) from e |
|
|
if isinstance(e, HTTPException): |
|
|
raise |
|
|
raise HTTPException(status_code=500, detail=str(e)) from e |
|
|
|
|
|
return response_list |
|
|
|
|
|
|
|
|
@router.delete("/") |
|
|
async def delete_multiple_flows( |
|
|
flow_ids: list[UUID], |
|
|
user: CurrentActiveUser, |
|
|
db: DbSession, |
|
|
): |
|
|
"""Delete multiple flows by their IDs. |
|
|
|
|
|
Args: |
|
|
flow_ids (List[str]): The list of flow IDs to delete. |
|
|
user (User, optional): The user making the request. Defaults to the current active user. |
|
|
db (Session, optional): The database session. |
|
|
|
|
|
Returns: |
|
|
dict: A dictionary containing the number of flows deleted. |
|
|
|
|
|
""" |
|
|
try: |
|
|
flows_to_delete = ( |
|
|
await db.exec(select(Flow).where(col(Flow.id).in_(flow_ids)).where(Flow.user_id == user.id)) |
|
|
).all() |
|
|
for flow in flows_to_delete: |
|
|
transactions_to_delete = await get_transactions_by_flow_id(db, flow.id) |
|
|
for transaction in transactions_to_delete: |
|
|
await db.delete(transaction) |
|
|
|
|
|
builds_to_delete = await get_vertex_builds_by_flow_id(db, flow.id) |
|
|
for build in builds_to_delete: |
|
|
await db.delete(build) |
|
|
|
|
|
await db.delete(flow) |
|
|
|
|
|
await db.commit() |
|
|
return {"deleted": len(flows_to_delete)} |
|
|
except Exception as exc: |
|
|
raise HTTPException(status_code=500, detail=str(exc)) from exc |
|
|
|
|
|
|
|
|
@router.post("/download/", status_code=200) |
|
|
async def download_multiple_file( |
|
|
flow_ids: list[UUID], |
|
|
user: CurrentActiveUser, |
|
|
db: DbSession, |
|
|
): |
|
|
"""Download all flows as a zip file.""" |
|
|
flows = (await db.exec(select(Flow).where(and_(Flow.user_id == user.id, Flow.id.in_(flow_ids))))).all() |
|
|
|
|
|
if not flows: |
|
|
raise HTTPException(status_code=404, detail="No flows found.") |
|
|
|
|
|
flows_without_api_keys = [remove_api_keys(flow.model_dump()) for flow in flows] |
|
|
|
|
|
if len(flows_without_api_keys) > 1: |
|
|
|
|
|
zip_stream = io.BytesIO() |
|
|
|
|
|
|
|
|
with zipfile.ZipFile(zip_stream, "w") as zip_file: |
|
|
for flow in flows_without_api_keys: |
|
|
|
|
|
flow_json = json.dumps(jsonable_encoder(flow)) |
|
|
|
|
|
|
|
|
zip_file.writestr(f"{flow['name']}.json", flow_json) |
|
|
|
|
|
|
|
|
zip_stream.seek(0) |
|
|
|
|
|
|
|
|
current_time = datetime.now(tz=timezone.utc).astimezone().strftime("%Y%m%d_%H%M%S") |
|
|
filename = f"{current_time}_langflow_flows.zip" |
|
|
|
|
|
return StreamingResponse( |
|
|
zip_stream, |
|
|
media_type="application/x-zip-compressed", |
|
|
headers={"Content-Disposition": f"attachment; filename={filename}"}, |
|
|
) |
|
|
return flows_without_api_keys[0] |
|
|
|
|
|
|
|
|
@router.get("/basic_examples/", response_model=list[FlowRead], status_code=200) |
|
|
async def read_basic_examples( |
|
|
*, |
|
|
session: DbSession, |
|
|
): |
|
|
"""Retrieve a list of basic example flows. |
|
|
|
|
|
Args: |
|
|
session (Session): The database session. |
|
|
|
|
|
Returns: |
|
|
list[FlowRead]: A list of basic example flows. |
|
|
""" |
|
|
try: |
|
|
|
|
|
starter_folder = (await session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME))).first() |
|
|
|
|
|
if not starter_folder: |
|
|
return [] |
|
|
|
|
|
|
|
|
return (await session.exec(select(Flow).where(Flow.folder_id == starter_folder.id))).all() |
|
|
|
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) from e |
|
|
|
|
|
|
|
|
add_pagination(router) |
|
|
|