|
|
import base64
import contextlib
import functools
import hashlib
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Any

from fastapi import UploadFile
from platformdirs import user_cache_dir
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from langflow.api.v1.schemas import BuildStatus |
|
|
|
|
|
CACHE: dict[str, Any] = {} |
|
|
|
|
|
CACHE_DIR = user_cache_dir("langflow", "langflow") |
|
|
|
|
|
PREFIX = "langflow_cache" |
|
|
|
|
|
|
|
|
class CacheMiss:
    """Sentinel type whose instances mark a cache lookup that found nothing.

    Instances are always falsy, so callers can distinguish a genuinely cached
    ``None``/``False`` from an absent entry by identity or truthiness checks.
    """

    def __bool__(self) -> bool:
        """A cache miss is always falsy."""
        return False

    def __repr__(self) -> str:
        """Fixed marker string for logs and debugging output."""
        return "<CACHE_MISS>"
|
|
|
|
|
|
|
|
def create_cache_folder(func):
    """Decorator that ensures the cache folder exists before calling *func*.

    Creates ``CACHE_DIR / PREFIX`` (including parent directories) on every
    call, then delegates to the wrapped function unchanged.

    Args:
        func: The function to wrap.

    Returns:
        The wrapped function.
    """

    # functools.wraps preserves the wrapped function's name/docstring, which
    # the original wrapper lost.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Create the destination folder up front so decorated functions can
        # assume it exists.
        cache_path = Path(CACHE_DIR) / PREFIX
        cache_path.mkdir(parents=True, exist_ok=True)
        return func(*args, **kwargs)

    return wrapper
|
|
|
|
|
|
|
|
@create_cache_folder
def clear_old_cache_files(max_cache_size: int = 3) -> None:
    """Delete all but the *max_cache_size* most recent ``.dill`` files in the temp cache dir.

    Args:
        max_cache_size: Number of newest cache files to keep.
    """
    # NOTE(review): this globs under tempfile.gettempdir()/PREFIX while the
    # @create_cache_folder decorator creates CACHE_DIR/PREFIX — confirm the
    # .dill files are actually written to the temp dir by the cache service.
    cache_dir = Path(tempfile.gettempdir()) / PREFIX
    cache_files = list(cache_dir.glob("*.dill"))

    if len(cache_files) > max_cache_size:
        # Sort newest-first so the slice past max_cache_size is the oldest files.
        cache_files_sorted_by_mtime = sorted(cache_files, key=lambda x: x.stat().st_mtime, reverse=True)

        # Deletion is best-effort; a file removed or locked by another process
        # is not an error.
        for cache_file in cache_files_sorted_by_mtime[max_cache_size:]:
            with contextlib.suppress(OSError):
                cache_file.unlink()
|
|
|
|
|
|
|
|
def filter_json(json_data):
    """Return a copy of a flow JSON dict stripped of UI-only fields.

    Removes the top-level ``viewport`` and ``chatHistory`` keys, and the
    ``position``, ``positionAbsolute``, ``selected`` and ``dragging`` keys
    from every entry in ``nodes``.

    Args:
        json_data: Flow data as a dict (e.g. a parsed React-Flow export).

    Returns:
        A new dict without the UI-only fields; *json_data* is left unmodified.
    """
    filtered_data = json_data.copy()

    filtered_data.pop("viewport", None)
    filtered_data.pop("chatHistory", None)

    if "nodes" in filtered_data:
        # Copy each node before stripping keys: the previous implementation
        # used a shallow copy and deleted keys from the caller's node dicts,
        # mutating the input.
        cleaned_nodes = []
        for node in filtered_data["nodes"]:
            node = dict(node)
            for key in ("position", "positionAbsolute", "selected", "dragging"):
                node.pop(key, None)
            cleaned_nodes.append(node)
        filtered_data["nodes"] = cleaned_nodes

    return filtered_data
|
|
|
|
|
|
|
|
@create_cache_folder
def save_binary_file(content: str, file_name: str, accepted_types: list[str]) -> str:
    """Decode a base64 data-URL payload and write it into the cache folder.

    Args:
        content: The file content as a data-URL-style string, e.g.
            ``"data:<mime>;base64,<payload>"`` — the segment after the first
            comma is decoded as base64.
        file_name: The name of the file, including its extension.
        accepted_types: A list of accepted file-name suffixes.

    Returns:
        The path to the saved file as a string.

    Raises:
        ValueError: If the file type is not accepted, or the content is empty
            or lacks the expected ``...,<base64>`` form.
    """
    if not any(file_name.endswith(suffix) for suffix in accepted_types):
        msg = f"File {file_name} is not accepted"
        raise ValueError(msg)

    cache_path = Path(CACHE_DIR) / PREFIX
    if not content:
        msg = "Please, reload the file in the loader."
        raise ValueError(msg)
    # The payload sits after the first comma; previously a missing comma
    # surfaced as an opaque IndexError instead of a ValueError.
    parts = content.split(",")
    if len(parts) < 2:
        msg = "Please, reload the file in the loader."
        raise ValueError(msg)
    decoded_bytes = base64.b64decode(parts[1])

    file_path = cache_path / file_name
    file_path.write_bytes(decoded_bytes)

    return str(file_path)
|
|
|
|
|
|
|
|
@create_cache_folder
def save_uploaded_file(file: UploadFile, folder_name):
    """Save an uploaded file with a SHA-256 hash of its content as the file name.

    Hashing first makes the stored name stable per content, so identical
    uploads deduplicate to one file.

    Args:
        file: The uploaded file object.
        folder_name: The name of the folder (under the cache dir) to save into.

    Returns:
        The path to the saved file as a ``Path``.
    """
    cache_path = Path(CACHE_DIR)
    folder_path = cache_path / folder_name
    filename = file.filename
    file_extension = Path(filename).suffix if isinstance(filename, str | Path) else ""
    file_object = file.file

    # exist_ok avoids a check-then-create race when two uploads arrive
    # concurrently; parents covers a missing cache dir.
    folder_path.mkdir(parents=True, exist_ok=True)

    # First pass: hash the content in chunks to derive the file name.
    sha256_hash = hashlib.sha256()
    file_object.seek(0)
    while chunk := file_object.read(8192):
        sha256_hash.update(chunk)

    file_name = f"{sha256_hash.hexdigest()}{file_extension}"

    # Second pass: rewind and stream the bytes into the cache file.
    file_object.seek(0)
    file_path = folder_path / file_name
    with file_path.open("wb") as new_file:
        while chunk := file_object.read(8192):
            new_file.write(chunk)

    return file_path
|
|
|
|
|
|
|
|
def update_build_status(cache_service, flow_id: str, status: "BuildStatus") -> None:
    """Set the build status on a cached flow and write it back to the cache.

    Args:
        cache_service: Mapping-like cache supporting ``[]`` get/set.
        flow_id: Key of the flow entry to update.
        status: New build status to record.

    Raises:
        ValueError: If the cache returns ``None`` for *flow_id*.
    """
    cached_flow = cache_service[flow_id]
    if cached_flow is None:
        msg = f"Flow {flow_id} not found in cache"
        raise ValueError(msg)
    # Write the mutated entry back so cache backends that copy on read still
    # persist the change. The original duplicated this pair of statements;
    # doing it once is equivalent.
    cached_flow["status"] = status
    cache_service[flow_id] = cached_flow
|
|
|
|
|
|
|
|
CACHE_MISS = CacheMiss() |
|
|
|