Spaces:
Running
Running
| import io | |
| import json | |
| import zipfile | |
| from datetime import datetime, timezone | |
| from typing import Annotated | |
| import orjson | |
| from fastapi import APIRouter, Depends, File, HTTPException, Response, UploadFile, status | |
| from fastapi.encoders import jsonable_encoder | |
| from fastapi.responses import StreamingResponse | |
| from fastapi_pagination import Params | |
| from fastapi_pagination.ext.sqlmodel import paginate | |
| from sqlalchemy import or_, update | |
| from sqlalchemy.orm import selectinload | |
| from sqlmodel import select | |
| from langflow.api.utils import CurrentActiveUser, DbSession, cascade_delete_flow, custom_params, remove_api_keys | |
| from langflow.api.v1.flows import create_flows | |
| from langflow.api.v1.schemas import FlowListCreate | |
| from langflow.helpers.flow import generate_unique_flow_name | |
| from langflow.helpers.folders import generate_unique_folder_name | |
| from langflow.initial_setup.constants import STARTER_FOLDER_NAME | |
| from langflow.services.database.models.flow.model import Flow, FlowCreate, FlowRead | |
| from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME | |
| from langflow.services.database.models.folder.model import ( | |
| Folder, | |
| FolderCreate, | |
| FolderRead, | |
| FolderReadWithFlows, | |
| FolderUpdate, | |
| ) | |
| from langflow.services.database.models.folder.pagination_model import FolderWithPaginatedFlows | |
# APIRouter for folder operations: paths are prefixed with "/folders" and tagged "Folders".
# NOTE(review): no route decorators are visible on the handlers in this view —
# confirm they are registered on this router.
router = APIRouter(prefix="/folders", tags=["Folders"])
async def create_folder(
    *,
    session: DbSession,
    folder: FolderCreate,
    current_user: CurrentActiveUser,
):
    """Create a folder owned by the current user and optionally move flows into it.

    If the user already has a folder with the requested name, a numeric suffix
    is appended ("Name (1)", "Name (2)", ...), using the highest suffix already
    in use for that exact base name plus one.

    Args:
        session: Async database session.
        folder: Creation payload. ``components_list`` and ``flows_list`` hold
            the ids of flows to move into the new folder.
        current_user: Authenticated user who will own the folder.

    Returns:
        The persisted ``Folder``, refreshed after commit.

    Raises:
        HTTPException: 500 carrying the underlying error message on any failure.
    """
    try:
        new_folder = Folder.model_validate(folder, from_attributes=True)
        new_folder.user_id = current_user.id
        base_name = new_folder.name

        name_taken = (
            await session.exec(
                select(Folder).where(Folder.name == base_name).where(Folder.user_id == current_user.id)
            )
        ).first()
        if name_taken:
            # LIKE only narrows the candidates; the exact "<base> (<n>)" shape is
            # checked in Python so that names such as "<base> (copy)" or
            # "<base>X (7)" neither crash int() nor skew the counter.
            candidates = await session.exec(
                select(Folder).where(
                    Folder.name.like(f"{base_name}%"),  # type: ignore[attr-defined]
                    Folder.user_id == current_user.id,
                )
            )
            prefix = f"{base_name} ("
            used_numbers = []
            for existing in candidates:
                name = existing.name
                if not (name.startswith(prefix) and name.endswith(")")):
                    continue
                digits = name[len(prefix) : -1]
                # isascii() guards against Unicode digits that int() rejects.
                if digits.isascii() and digits.isdigit():
                    used_numbers.append(int(digits))
            next_number = max(used_numbers) + 1 if used_numbers else 1
            new_folder.name = f"{base_name} ({next_number})"

        session.add(new_folder)
        await session.commit()
        await session.refresh(new_folder)

        # Components and flows are both stored as Flow rows, so the same update
        # applies to each list. The ids come from the request body, hence the
        # ownership filter: only the caller's own flows may be moved.
        for flow_ids in (folder.components_list, folder.flows_list):
            if not flow_ids:
                continue
            move_statement = (
                update(Flow)
                .where(Flow.id.in_(flow_ids), Flow.user_id == current_user.id)  # type: ignore[attr-defined]
                .values(folder_id=new_folder.id)
            )
            await session.exec(move_statement)
            await session.commit()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return new_folder
async def read_folders(
    *,
    session: DbSession,
    current_user: CurrentActiveUser,
):
    """List the folders visible to the current user.

    Selects folders owned by the user together with folders that have no owner,
    drops the one named ``STARTER_FOLDER_NAME``, and returns the rest with the
    folder named ``DEFAULT_FOLDER_NAME`` first.

    Raises:
        HTTPException: 500 carrying the underlying error message on any failure.
    """
    try:
        query = select(Folder).where(
            or_(Folder.user_id == current_user.id, Folder.user_id == None)  # noqa: E711
        )
        result = await session.exec(query)
        visible = [row for row in result.all() if row.name != STARTER_FOLDER_NAME]
        # The key is False for the default folder and True otherwise; False sorts
        # first and the sort is stable, so all other folders keep their order.
        visible.sort(key=lambda row: row.name != DEFAULT_FOLDER_NAME)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    else:
        return visible
async def read_folder(
    *,
    session: DbSession,
    folder_id: str,
    current_user: CurrentActiveUser,
    params: Annotated[Params | None, Depends(custom_params)],
    is_component: bool = False,
    is_flow: bool = False,
    search: str = "",
):
    """Return one of the current user's folders, with its flows.

    When ``params`` carries both a page and a size, the flows are returned as a
    paginated result ordered by ``updated_at`` descending; otherwise the folder
    is returned with its eagerly loaded flows. In both modes only flows owned
    by the current user are included.

    Args:
        session: Async database session.
        folder_id: Id of the folder to read.
        current_user: Authenticated user; the folder must belong to them.
        params: Optional pagination parameters.
        is_component: Paginated mode only; keep rows with ``is_component`` true.
        is_flow: Paginated mode only; keep rows with ``is_component`` false.
            Setting both flags applies contradictory filters and yields no rows.
        search: Paginated mode only; substring matched against the flow name
            with SQL LIKE, so ``%`` and ``_`` in the value act as wildcards.

    Returns:
        ``FolderWithPaginatedFlows`` in paginated mode, otherwise the ``Folder``.

    Raises:
        HTTPException: 404 if the folder does not exist for this user, 500 on
            any other failure.
    """
    try:
        folder = (
            await session.exec(
                select(Folder)
                .options(selectinload(Folder.flows))
                .where(Folder.id == folder_id, Folder.user_id == current_user.id)
            )
        ).first()
    except Exception as e:
        if "No result found" in str(e):
            raise HTTPException(status_code=404, detail="Folder not found") from e
        raise HTTPException(status_code=500, detail=str(e)) from e

    if not folder:
        raise HTTPException(status_code=404, detail="Folder not found")

    try:
        if params and params.page and params.size:
            # Same ownership rule as the non-paginated path below. The ordering
            # is unconditional: the previous `Flow.updated_at is not None` check
            # tested the mapped class attribute, which is never None.
            stmt = (
                select(Flow)
                .where(Flow.folder_id == folder_id, Flow.user_id == current_user.id)
                .order_by(Flow.updated_at.desc())  # type: ignore[attr-defined]
            )
            if is_component:
                stmt = stmt.where(Flow.is_component == True)  # noqa: E712
            if is_flow:
                stmt = stmt.where(Flow.is_component == False)  # noqa: E712
            if search:
                stmt = stmt.where(Flow.name.like(f"%{search}%"))  # type: ignore[attr-defined]
            paginated_flows = await paginate(session, stmt, params=params)

            return FolderWithPaginatedFlows(folder=FolderRead.model_validate(folder), flows=paginated_flows)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Non-paginated mode: expose only the caller's flows on the returned folder.
    flows_from_current_user_in_folder = [flow for flow in folder.flows if flow.user_id == current_user.id]
    folder.flows = flows_from_current_user_in_folder
    return folder
async def update_folder(
    *,
    session: DbSession,
    folder_id: str,
    folder: FolderUpdate,  # Assuming FolderUpdate is a Pydantic model defining updatable fields
    current_user: CurrentActiveUser,
):
    """Update one of the current user's folders and reconcile its flows.

    A request that changes the name is treated as a pure rename: the new name
    is saved and the function returns immediately, without touching other
    fields or flow membership. Otherwise the fields explicitly set on the
    payload are applied, flows currently in the folder but absent from
    ``folder.components + folder.flows`` are moved to the user's folder named
    ``DEFAULT_FOLDER_NAME`` (if one exists), and the listed flows are moved
    into this folder.

    Args:
        session: Async database session.
        folder_id: Id of the folder to update.
        folder: Update payload.
        current_user: Authenticated user; the folder must belong to them.

    Returns:
        The updated ``Folder``.

    Raises:
        HTTPException: 404 if the folder does not exist for this user, 500 on
            any other failure.
    """
    try:
        existing_folder = (
            await session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id))
        ).first()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    if not existing_folder:
        raise HTTPException(status_code=404, detail="Folder not found")

    try:
        if folder.name and folder.name != existing_folder.name:
            existing_folder.name = folder.name
            session.add(existing_folder)
            await session.commit()
            await session.refresh(existing_folder)
            return existing_folder

        # Apply the fields the client actually sent. This used to dump
        # `existing_folder` onto itself, which made the loop a no-op.
        # "components"/"flows" are handled below as flow moves; "name" is either
        # unchanged or empty at this point, so it is skipped as well.
        folder_data = folder.model_dump(exclude_unset=True)
        for key, value in folder_data.items():
            if key not in {"components", "flows", "name"}:
                setattr(existing_folder, key, value)
        session.add(existing_folder)
        await session.commit()
        await session.refresh(existing_folder)

        concat_folder_components = folder.components + folder.flows

        flows_ids = (await session.exec(select(Flow.id).where(Flow.folder_id == existing_folder.id))).all()

        # Flows currently in the folder that the payload no longer lists.
        excluded_flows = list(set(flows_ids) - set(concat_folder_components))

        # Scope the default-folder lookup to this user; without the user filter
        # the first folder with that name from any user would be picked.
        my_collection_folder = (
            await session.exec(
                select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME, Folder.user_id == current_user.id)
            )
        ).first()
        if my_collection_folder and excluded_flows:
            update_statement_my_collection = (
                update(Flow).where(Flow.id.in_(excluded_flows)).values(folder_id=my_collection_folder.id)  # type: ignore[attr-defined]
            )
            await session.exec(update_statement_my_collection)
            await session.commit()

        if concat_folder_components:
            # The ids come from the request body: only move the caller's own flows.
            update_statement_components = (
                update(Flow)
                .where(Flow.id.in_(concat_folder_components), Flow.user_id == current_user.id)  # type: ignore[attr-defined]
                .values(folder_id=existing_folder.id)
            )
            await session.exec(update_statement_components)
            await session.commit()

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return existing_folder
async def delete_folder(
    *,
    session: DbSession,
    folder_id: str,
    current_user: CurrentActiveUser,
):
    """Delete one of the current user's folders together with the user's flows in it.

    The folder is looked up first, so nothing is deleted when it does not exist
    or belongs to someone else.

    Args:
        session: Async database session.
        folder_id: Id of the folder to delete.
        current_user: Authenticated user; the folder must belong to them.

    Returns:
        An empty 204 ``Response`` on success.

    Raises:
        HTTPException: 404 if the folder does not exist for this user, 500 on
            any other failure.
    """
    try:
        folder = (
            await session.exec(select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id))
        ).first()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    if not folder:
        raise HTTPException(status_code=404, detail="Folder not found")

    try:
        flows = (
            await session.exec(select(Flow).where(Flow.folder_id == folder_id, Flow.user_id == current_user.id))
        ).all()
        # Each flow goes through cascade_delete_flow before the folder row is removed.
        for flow in flows:
            await cascade_delete_flow(session, flow.id)

        await session.delete(folder)
        await session.commit()

        return Response(status_code=status.HTTP_204_NO_CONTENT)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def download_file(
    *,
    session: DbSession,
    folder_id: str,
    current_user: CurrentActiveUser,
):
    """Download all flows from folder as a zip file.

    Each flow is passed through ``remove_api_keys`` and written as
    ``<flow name>.json`` into an in-memory zip archive. The archive is named
    ``<timestamp>_<folder name>_flows.zip``.

    Args:
        session: Async database session.
        folder_id: Id of the folder to export.
        current_user: Authenticated user; the folder must belong to them.

    Returns:
        A ``StreamingResponse`` carrying the zip archive as an attachment.

    Raises:
        HTTPException: 404 if the folder does not exist for this user or holds
            no flows, 500 on any other failure.
    """
    # Local import: this function is the only user of the name.
    from urllib.parse import quote

    try:
        query = select(Folder).where(Folder.id == folder_id, Folder.user_id == current_user.id)
        result = await session.exec(query)
        folder = result.first()

        if not folder:
            raise HTTPException(status_code=404, detail="Folder not found")

        flows_query = select(Flow).where(Flow.folder_id == folder_id)
        flows_result = await session.exec(flows_query)
        flows = [FlowRead.model_validate(flow, from_attributes=True) for flow in flows_result.all()]

        if not flows:
            raise HTTPException(status_code=404, detail="No flows found in folder")

        flows_without_api_keys = [remove_api_keys(flow.model_dump()) for flow in flows]
        zip_stream = io.BytesIO()

        with zipfile.ZipFile(zip_stream, "w") as zip_file:
            for flow in flows_without_api_keys:
                flow_json = json.dumps(jsonable_encoder(flow))
                zip_file.writestr(f"{flow['name']}.json", flow_json)

        zip_stream.seek(0)

        current_time = datetime.now(tz=timezone.utc).astimezone().strftime("%Y%m%d_%H%M%S")
        filename = f"{current_time}_{folder.name}_flows.zip"

        # Percent-encode the name and send it in the RFC 6266 `filename*` form:
        # a raw `filename=` value breaks on spaces and cannot carry characters
        # outside latin-1, which is the encoding used for header values.
        encoded_filename = quote(filename)

        return StreamingResponse(
            zip_stream,
            media_type="application/x-zip-compressed",
            headers={"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"},
        )

    except HTTPException:
        # Let the deliberate 404s above through instead of rewrapping them as 500.
        raise
    except Exception as e:
        if "No result found" in str(e):
            raise HTTPException(status_code=404, detail="Folder not found") from e
        raise HTTPException(status_code=500, detail=str(e)) from e
async def upload_file(
    *,
    session: DbSession,
    file: Annotated[UploadFile, File(...)],
    current_user: CurrentActiveUser,
):
    """Upload flows from a file.

    The file must contain a JSON object with ``folder_name``, a ``flows`` list
    and optionally ``folder_description``. The payload is validated in full
    before anything is written, so a rejected upload leaves no empty folder
    behind. A new folder is then created for the current user and every flow is
    assigned to that folder and user before being handed to ``create_flows``.

    Args:
        session: Async database session.
        file: Uploaded JSON file.
        current_user: Authenticated user who will own the folder and flows.

    Returns:
        Whatever ``create_flows`` returns for the uploaded flows.

    Raises:
        HTTPException: 400 if the file is not valid JSON, is empty, lacks
            ``folder_name`` or ``flows``, or contains invalid flow data.
    """
    contents = await file.read()
    try:
        data = orjson.loads(contents)
    except orjson.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail="Uploaded file is not valid JSON") from e

    if not data:
        raise HTTPException(status_code=400, detail="No flows found in the file")
    if not isinstance(data, dict) or "folder_name" not in data:
        raise HTTPException(status_code=400, detail="No folder name found in the file")
    if "flows" not in data:
        raise HTTPException(status_code=400, detail="No flows found in the data")

    try:
        flow_list = FlowListCreate(flows=[FlowCreate(**flow) for flow in data["flows"]])
    except (TypeError, ValueError) as e:
        # TypeError covers a non-mapping entry or a non-iterable "flows" value.
        # ValueError is assumed to cover model validation failures — TODO confirm
        # against the validation error type FlowCreate raises.
        raise HTTPException(status_code=400, detail=f"Invalid flow data: {e}") from e

    # presumably returns a name not yet used by this user; verify against the helper
    folder_name = await generate_unique_folder_name(data["folder_name"], current_user.id, session)

    folder = FolderCreate(name=folder_name, description=data.get("folder_description"))

    new_folder = Folder.model_validate(folder, from_attributes=True)
    new_folder.id = None
    new_folder.user_id = current_user.id
    session.add(new_folder)
    await session.commit()
    await session.refresh(new_folder)

    # Now we set the user_id for all flows
    for flow in flow_list.flows:
        flow_name = await generate_unique_flow_name(flow.name, current_user.id, session)
        flow.name = flow_name
        flow.user_id = current_user.id
        flow.folder_id = new_folder.id

    return await create_flows(session=session, flow_list=flow_list, current_user=current_user)