| | import io |
| | import json |
| | import zipfile |
| | from datetime import datetime, timezone |
| | from typing import Annotated |
| |
|
| | import orjson |
| | from fastapi import APIRouter, Depends, File, HTTPException, Response, UploadFile, status |
| | from fastapi.encoders import jsonable_encoder |
| | from fastapi.responses import StreamingResponse |
| | from fastapi_pagination import Params |
| | from fastapi_pagination.ext.sqlmodel import paginate |
| | from sqlalchemy import or_, update |
| | from sqlalchemy.orm import selectinload |
| | from sqlmodel import select |
| |
|
| | from langflow.api.utils import CurrentActiveUser, DbSession, cascade_delete_flow, custom_params, remove_api_keys |
| | from langflow.api.v1.flows import create_flows |
| | from langflow.api.v1.schemas import FlowListCreate |
| | from langflow.helpers.flow import generate_unique_flow_name |
| | from langflow.helpers.folders import generate_unique_folder_name |
| | from langflow.initial_setup.constants import STARTER_FOLDER_NAME |
| | from langflow.services.database.models.flow.model import Flow, FlowCreate, FlowRead |
| | from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME |
| | from langflow.services.database.models.folder.model import ( |
| | Folder, |
| | FolderCreate, |
| | FolderRead, |
| | FolderReadWithFlows, |
| | FolderUpdate, |
| | ) |
| | from langflow.services.database.models.folder.pagination_model import FolderWithPaginatedFlows |
| |
|
| | router = APIRouter(prefix="/folders", tags=["Folders"]) |
| |
|
| |
|
| | @router.post("/", response_model=FolderRead, status_code=201) |
| | async def create_folder( |
| | *, |
| | session: DbSession, |
| | folder: FolderCreate, |
| | current_user: CurrentActiveUser, |
| | ): |
| | try: |
| | new_folder = Folder.model_validate(folder, from_attributes=True) |
| | new_folder.user_id = current_user.id |
| | |
| | |
| | |
| | |
| | |
| | if ( |
| | await session.exec( |
| | statement=select(Folder).where(Folder.name == new_folder.name).where(Folder.user_id == current_user.id) |
| | ) |
| | ).first(): |
| | folder_results = await session.exec( |
| | select(Folder).where( |
| | Folder.name.like(f"{new_folder.name}%"), |
| | Folder.user_id == current_user.id, |
| | ) |
| | ) |
| | if folder_results: |
| | folder_names = [folder.name for folder in folder_results] |
| | folder_numbers = [int(name.split("(")[-1].split(")")[0]) for name in folder_names if "(" in name] |
| | if folder_numbers: |
| | new_folder.name = f"{new_folder.name} ({max(folder_numbers) + 1})" |
| | else: |
| | new_folder.name = f"{new_folder.name} (1)" |
| |
|
| | session.add(new_folder) |
| | await session.commit() |
| | await session.refresh(new_folder) |
| |
|
| | if folder.components_list: |
| | update_statement_components = ( |
| | update(Flow).where(Flow.id.in_(folder.components_list)).values(folder_id=new_folder.id) |
| | ) |
| | await session.exec(update_statement_components) |
| | await session.commit() |
| |
|
| | if folder.flows_list: |
| | update_statement_flows = update(Flow).where(Flow.id.in_(folder.flows_list)).values(folder_id=new_folder.id) |
| | await session.exec(update_statement_flows) |
| | await session.commit() |
| |
|
| | except Exception as e: |
| | raise HTTPException(status_code=500, detail=str(e)) from e |
| |
|
| | return new_folder |
| |
|
| |
|
| | @router.get("/", response_model=list[FolderRead], status_code=200) |
| | async def read_folders( |
| | *, |
| | session: DbSession, |
| | current_user: CurrentActiveUser, |
| | ): |
| | try: |
| | folders = ( |
| | await session.exec( |
| | select(Folder).where( |
| | or_(Folder.user_id == current_user.id, Folder.user_id == None) |
| | ) |
| | ) |
| | ).all() |
| | folders = [folder for folder in folders if folder.name != STARTER_FOLDER_NAME] |
| | return sorted(folders, key=lambda x: x.name != DEFAULT_FOLDER_NAME) |
| | except Exception as e: |
| | raise HTTPException(status_code=500, detail=str(e)) from e |
| |
|
| |
|
| | @router.get("/{folder_id}", response_model=FolderWithPaginatedFlows | FolderReadWithFlows, status_code=200) |
| | async def read_folder( |
| | *, |
| | session: DbSession, |
| | folder_id: str, |
| | current_user: CurrentActiveUser, |
| | params: Annotated[Params | None, Depends(custom_params)], |
| | is_component: bool = False, |
| | is_flow: bool = False, |
| | search: str = "", |
| | ): |
| | try: |
| | folder = ( |
| | await session.exec( |
| | select(Folder) |
| | .options(selectinload(Folder.flows)) |
| | .where(Folder.id == folder_id, Folder.user_id == current_user.id) |
| | ) |
| | ).first() |
| | except Exception as e: |
| | if "No result found" in str(e): |
| | raise HTTPException(status_code=404, detail="Folder not found") from e |
| | raise HTTPException(status_code=500, detail=str(e)) from e |
| |
|
| | if not folder: |
| | raise HTTPException(status_code=404, detail="Folder not found") |
| |
|
| | try: |
| | if params and params.page and params.size: |
| | stmt = select(Flow).where(Flow.folder_id == folder_id) |
| |
|
| | if Flow.updated_at is not None: |
| | stmt = stmt.order_by(Flow.updated_at.desc()) |
| | if is_component: |
| | stmt = stmt.where(Flow.is_component == True) |
| | if is_flow: |
| | stmt = stmt.where(Flow.is_component == False) |
| | if search: |
| | stmt = stmt.where(Flow.name.like(f"%{search}%")) |
| | paginated_flows = await paginate(session, stmt, params=params) |
| |
|
| | return FolderWithPaginatedFlows(folder=FolderRead.model_validate(folder), flows=paginated_flows) |
| |
|
| | except Exception as e: |
| | raise HTTPException(status_code=500, detail=str(e)) from e |
| |
|
| | flows_from_current_user_in_folder = [flow for flow in folder.flows if flow.user_id == current_user.id] |
| | folder.flows = flows_from_current_user_in_folder |
| | return folder |
| |
|
| |
|
| | @router.patch("/{folder_id}", response_model=FolderRead, status_code=200) |
| | async def update_folder( |
| | *, |
| | session: DbSession, |
| | folder_id: str, |
| | folder: FolderUpdate, |
| | current_user: CurrentActiveUser, |
| | ): |
| | try: |
| | existing_folder = ( |
| | await session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)) |
| | ).first() |
| | except Exception as e: |
| | raise HTTPException(status_code=500, detail=str(e)) from e |
| |
|
| | if not existing_folder: |
| | raise HTTPException(status_code=404, detail="Folder not found") |
| |
|
| | try: |
| | if folder.name and folder.name != existing_folder.name: |
| | existing_folder.name = folder.name |
| | session.add(existing_folder) |
| | await session.commit() |
| | await session.refresh(existing_folder) |
| | return existing_folder |
| |
|
| | folder_data = existing_folder.model_dump(exclude_unset=True) |
| | for key, value in folder_data.items(): |
| | if key not in {"components", "flows"}: |
| | setattr(existing_folder, key, value) |
| | session.add(existing_folder) |
| | await session.commit() |
| | await session.refresh(existing_folder) |
| |
|
| | concat_folder_components = folder.components + folder.flows |
| |
|
| | flows_ids = (await session.exec(select(Flow.id).where(Flow.folder_id == existing_folder.id))).all() |
| |
|
| | excluded_flows = list(set(flows_ids) - set(concat_folder_components)) |
| |
|
| | my_collection_folder = (await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME))).first() |
| | if my_collection_folder: |
| | update_statement_my_collection = ( |
| | update(Flow).where(Flow.id.in_(excluded_flows)).values(folder_id=my_collection_folder.id) |
| | ) |
| | await session.exec(update_statement_my_collection) |
| | await session.commit() |
| |
|
| | if concat_folder_components: |
| | update_statement_components = ( |
| | update(Flow).where(Flow.id.in_(concat_folder_components)).values(folder_id=existing_folder.id) |
| | ) |
| | await session.exec(update_statement_components) |
| | await session.commit() |
| |
|
| | except Exception as e: |
| | raise HTTPException(status_code=500, detail=str(e)) from e |
| |
|
| | return existing_folder |
| |
|
| |
|
| | @router.delete("/{folder_id}", status_code=204) |
| | async def delete_folder( |
| | *, |
| | session: DbSession, |
| | folder_id: str, |
| | current_user: CurrentActiveUser, |
| | ): |
| | try: |
| | flows = ( |
| | await session.exec(select(Flow).where(Flow.folder_id == folder_id, Flow.user_id == current_user.id)) |
| | ).all() |
| | if len(flows) > 0: |
| | for flow in flows: |
| | await cascade_delete_flow(session, flow.id) |
| |
|
| | folder = ( |
| | await session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)) |
| | ).first() |
| | except Exception as e: |
| | raise HTTPException(status_code=500, detail=str(e)) from e |
| |
|
| | if not folder: |
| | raise HTTPException(status_code=404, detail="Folder not found") |
| |
|
| | try: |
| | await session.delete(folder) |
| | await session.commit() |
| | return Response(status_code=status.HTTP_204_NO_CONTENT) |
| | except Exception as e: |
| | raise HTTPException(status_code=500, detail=str(e)) from e |
| |
|
| |
|
| | @router.get("/download/{folder_id}", status_code=200) |
| | async def download_file( |
| | *, |
| | session: DbSession, |
| | folder_id: str, |
| | current_user: CurrentActiveUser, |
| | ): |
| | """Download all flows from folder as a zip file.""" |
| | try: |
| | query = select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id) |
| | result = await session.exec(query) |
| | folder = result.first() |
| |
|
| | if not folder: |
| | raise HTTPException(status_code=404, detail="Folder not found") |
| |
|
| | flows_query = select(Flow).where(Flow.folder_id == folder_id) |
| | flows_result = await session.exec(flows_query) |
| | flows = [FlowRead.model_validate(flow, from_attributes=True) for flow in flows_result.all()] |
| |
|
| | if not flows: |
| | raise HTTPException(status_code=404, detail="No flows found in folder") |
| |
|
| | flows_without_api_keys = [remove_api_keys(flow.model_dump()) for flow in flows] |
| | zip_stream = io.BytesIO() |
| |
|
| | with zipfile.ZipFile(zip_stream, "w") as zip_file: |
| | for flow in flows_without_api_keys: |
| | flow_json = json.dumps(jsonable_encoder(flow)) |
| | zip_file.writestr(f"{flow['name']}.json", flow_json) |
| |
|
| | zip_stream.seek(0) |
| |
|
| | current_time = datetime.now(tz=timezone.utc).astimezone().strftime("%Y%m%d_%H%M%S") |
| | filename = f"{current_time}_{folder.name}_flows.zip" |
| |
|
| | return StreamingResponse( |
| | zip_stream, |
| | media_type="application/x-zip-compressed", |
| | headers={"Content-Disposition": f"attachment; filename={filename}"}, |
| | ) |
| |
|
| | except Exception as e: |
| | if "No result found" in str(e): |
| | raise HTTPException(status_code=404, detail="Folder not found") from e |
| | raise HTTPException(status_code=500, detail=str(e)) from e |
| |
|
| |
|
| | @router.post("/upload/", response_model=list[FlowRead], status_code=201) |
| | async def upload_file( |
| | *, |
| | session: DbSession, |
| | file: Annotated[UploadFile, File(...)], |
| | current_user: CurrentActiveUser, |
| | ): |
| | """Upload flows from a file.""" |
| | contents = await file.read() |
| | data = orjson.loads(contents) |
| |
|
| | if not data: |
| | raise HTTPException(status_code=400, detail="No flows found in the file") |
| |
|
| | folder_name = await generate_unique_folder_name(data["folder_name"], current_user.id, session) |
| |
|
| | data["folder_name"] = folder_name |
| |
|
| | folder = FolderCreate(name=data["folder_name"], description=data["folder_description"]) |
| |
|
| | new_folder = Folder.model_validate(folder, from_attributes=True) |
| | new_folder.id = None |
| | new_folder.user_id = current_user.id |
| | session.add(new_folder) |
| | await session.commit() |
| | await session.refresh(new_folder) |
| |
|
| | del data["folder_name"] |
| | del data["folder_description"] |
| |
|
| | if "flows" in data: |
| | flow_list = FlowListCreate(flows=[FlowCreate(**flow) for flow in data["flows"]]) |
| | else: |
| | raise HTTPException(status_code=400, detail="No flows found in the data") |
| | |
| | for flow in flow_list.flows: |
| | flow_name = await generate_unique_flow_name(flow.name, current_user.id, session) |
| | flow.name = flow_name |
| | flow.user_id = current_user.id |
| | flow.folder_id = new_folder.id |
| |
|
| | return await create_flows(session=session, flow_list=flow_list, current_user=current_user) |
| |
|