Spaces:
Paused
Paused
| import base64 | |
| from configs import dify_config | |
| from core.file import file_repository | |
| from core.helper import ssrf_proxy | |
| from core.model_runtime.entities import AudioPromptMessageContent, ImagePromptMessageContent | |
| from extensions.ext_database import db | |
| from extensions.ext_storage import storage | |
| from . import helpers | |
| from .enums import FileAttribute | |
| from .models import File, FileTransferMethod, FileType | |
| from .tool_file_parser import ToolFileParser | |
| def get_attr(*, file: File, attr: FileAttribute): | |
| match attr: | |
| case FileAttribute.TYPE: | |
| return file.type.value | |
| case FileAttribute.SIZE: | |
| return file.size | |
| case FileAttribute.NAME: | |
| return file.filename | |
| case FileAttribute.MIME_TYPE: | |
| return file.mime_type | |
| case FileAttribute.TRANSFER_METHOD: | |
| return file.transfer_method.value | |
| case FileAttribute.URL: | |
| return file.remote_url | |
| case FileAttribute.EXTENSION: | |
| return file.extension | |
| case _: | |
| raise ValueError(f"Invalid file attribute: {attr}") | |
| def to_prompt_message_content(f: File, /): | |
| """ | |
| Convert a File object to an ImagePromptMessageContent object. | |
| This function takes a File object and converts it to an ImagePromptMessageContent | |
| object, which can be used as a prompt for image-based AI models. | |
| Args: | |
| file (File): The File object to convert. Must be of type FileType.IMAGE. | |
| Returns: | |
| ImagePromptMessageContent: An object containing the image data and detail level. | |
| Raises: | |
| ValueError: If the file is not an image or if the file data is missing. | |
| Note: | |
| The detail level of the image prompt is determined by the file's extra_config. | |
| If not specified, it defaults to ImagePromptMessageContent.DETAIL.LOW. | |
| """ | |
| match f.type: | |
| case FileType.IMAGE: | |
| if dify_config.MULTIMODAL_SEND_IMAGE_FORMAT == "url": | |
| data = _to_url(f) | |
| else: | |
| data = _to_base64_data_string(f) | |
| if f._extra_config and f._extra_config.image_config and f._extra_config.image_config.detail: | |
| detail = f._extra_config.image_config.detail | |
| else: | |
| detail = ImagePromptMessageContent.DETAIL.LOW | |
| return ImagePromptMessageContent(data=data, detail=detail) | |
| case FileType.AUDIO: | |
| encoded_string = _file_to_encoded_string(f) | |
| if f.extension is None: | |
| raise ValueError("Missing file extension") | |
| return AudioPromptMessageContent(data=encoded_string, format=f.extension.lstrip(".")) | |
| case _: | |
| raise ValueError(f"file type {f.type} is not supported") | |
| def download(f: File, /): | |
| if f.transfer_method == FileTransferMethod.TOOL_FILE: | |
| tool_file = file_repository.get_tool_file(session=db.session(), file=f) | |
| return _download_file_content(tool_file.file_key) | |
| elif f.transfer_method == FileTransferMethod.LOCAL_FILE: | |
| upload_file = file_repository.get_upload_file(session=db.session(), file=f) | |
| return _download_file_content(upload_file.key) | |
| # remote file | |
| response = ssrf_proxy.get(f.remote_url, follow_redirects=True) | |
| response.raise_for_status() | |
| return response.content | |
| def _download_file_content(path: str, /): | |
| """ | |
| Download and return the contents of a file as bytes. | |
| This function loads the file from storage and ensures it's in bytes format. | |
| Args: | |
| path (str): The path to the file in storage. | |
| Returns: | |
| bytes: The contents of the file as a bytes object. | |
| Raises: | |
| ValueError: If the loaded file is not a bytes object. | |
| """ | |
| data = storage.load(path, stream=False) | |
| if not isinstance(data, bytes): | |
| raise ValueError(f"file {path} is not a bytes object") | |
| return data | |
| def _get_encoded_string(f: File, /): | |
| match f.transfer_method: | |
| case FileTransferMethod.REMOTE_URL: | |
| response = ssrf_proxy.get(f.remote_url) | |
| response.raise_for_status() | |
| content = response.content | |
| encoded_string = base64.b64encode(content).decode("utf-8") | |
| return encoded_string | |
| case FileTransferMethod.LOCAL_FILE: | |
| upload_file = file_repository.get_upload_file(session=db.session(), file=f) | |
| data = _download_file_content(upload_file.key) | |
| encoded_string = base64.b64encode(data).decode("utf-8") | |
| return encoded_string | |
| case FileTransferMethod.TOOL_FILE: | |
| tool_file = file_repository.get_tool_file(session=db.session(), file=f) | |
| data = _download_file_content(tool_file.file_key) | |
| encoded_string = base64.b64encode(data).decode("utf-8") | |
| return encoded_string | |
| case _: | |
| raise ValueError(f"Unsupported transfer method: {f.transfer_method}") | |
| def _to_base64_data_string(f: File, /): | |
| encoded_string = _get_encoded_string(f) | |
| return f"data:{f.mime_type};base64,{encoded_string}" | |
| def _file_to_encoded_string(f: File, /): | |
| match f.type: | |
| case FileType.IMAGE: | |
| return _to_base64_data_string(f) | |
| case FileType.AUDIO: | |
| return _get_encoded_string(f) | |
| case _: | |
| raise ValueError(f"file type {f.type} is not supported") | |
| def _to_url(f: File, /): | |
| if f.transfer_method == FileTransferMethod.REMOTE_URL: | |
| if f.remote_url is None: | |
| raise ValueError("Missing file remote_url") | |
| return f.remote_url | |
| elif f.transfer_method == FileTransferMethod.LOCAL_FILE: | |
| if f.related_id is None: | |
| raise ValueError("Missing file related_id") | |
| return helpers.get_signed_file_url(upload_file_id=f.related_id) | |
| elif f.transfer_method == FileTransferMethod.TOOL_FILE: | |
| # add sign url | |
| if f.related_id is None or f.extension is None: | |
| raise ValueError("Missing file related_id or extension") | |
| return ToolFileParser.get_tool_file_manager().sign_file(tool_file_id=f.related_id, extension=f.extension) | |
| else: | |
| raise ValueError(f"Unsupported transfer method: {f.transfer_method}") | |