# File size: 5,740 Bytes | cfb0fa4
from open_webui.routers.images import (
get_image_data,
upload_image,
)
from fastapi import (
APIRouter,
Depends,
HTTPException,
Request,
UploadFile,
)
from typing import Optional
from pathlib import Path
from open_webui.storage.provider import Storage
from open_webui.models.chats import Chats
from open_webui.models.files import Files
from open_webui.routers.files import upload_file_handler
from open_webui.retrieval.web.utils import validate_url
import mimetypes
import base64
import io
import re
import requests
# Matches a "data:image/<subtype>;base64," data-URL prefix, case-insensitively.
# The pattern itself is not anchored; callers that use .match() get a
# start-of-string check. NOTE: "\w+" only covers word characters, so a subtype
# containing "+" or "-" (e.g. "svg+xml") does not match.
BASE64_IMAGE_URL_PREFIX = re.compile(r"data:image/\w+;base64,", re.IGNORECASE)
# Matches a markdown image of the form "![alt](target)". Group 1 is the alt
# text (may be empty) and group 2 is the target; both groups are non-greedy.
MARKDOWN_IMAGE_URL_PATTERN = re.compile(r"!\[(.*?)\]\((.+?)\)", re.IGNORECASE)
def get_image_base64_from_url(url: str, timeout: float = 30.0) -> Optional[str]:
    """Return the image behind *url* as a ``data:`` URL, or ``None`` on failure.

    ``url`` is either a remote address (anything starting with ``"http"``),
    which is downloaded, or the ID of a stored file, which is looked up with
    ``Files.get_file_by_id`` and read from the path ``Storage.get_file`` returns.

    Args:
        url: Remote image URL or stored file ID.
        timeout: Seconds to wait on the remote server before giving up.

    Returns:
        A ``data:<content-type>;base64,<payload>`` string, or ``None`` when the
        file record or file is missing, or when any step raises (URL
        validation, download, HTTP error status, file access, ...).
    """
    # Used when the response has no Content-Type header or the file name has no
    # recognisable extension, so the result never reads "data:None;base64,...".
    fallback_content_type = "image/png"

    try:
        if url.startswith("http"):
            # Validate URL to prevent SSRF attacks against local/private networks
            validate_url(url)
            # Download the image from the URL. Without a timeout an
            # unresponsive server would block the caller indefinitely.
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            image_bytes = response.content
            content_type = response.headers.get("Content-Type", fallback_content_type)
        else:
            file = Files.get_file_by_id(url)
            if not file:
                return None

            file_path = Path(Storage.get_file(file.path))
            if not file_path.is_file():
                return None

            image_bytes = file_path.read_bytes()
            # guess_type() returns (None, None) for unknown extensions.
            content_type = (
                mimetypes.guess_type(file_path.name)[0] or fallback_content_type
            )

        encoded_string = base64.b64encode(image_bytes).decode("utf-8")
        return f"data:{content_type};base64,{encoded_string}"
    except Exception:
        # Best effort: callers only distinguish "got a data URL" from "did not".
        return None
def get_image_url_from_base64(request, base64_image_string, metadata, user):
    """Upload an inline base64 image and return the URL of the uploaded copy.

    Returns ``None`` when ``base64_image_string`` does not start with a
    ``data:image/...;base64,`` prefix, an empty string when the prefix matches
    but ``get_image_data`` yields no image data, and otherwise the URL produced
    by ``upload_image``.
    """
    # Guard clause: anything that is not an image data URL is not handled here.
    if not BASE64_IMAGE_URL_PREFIX.match(base64_image_string):
        return None

    # Extract base64 image data from the line
    decoded_image, mime_type = get_image_data(base64_image_string)
    if decoded_image is None:
        return ""

    # upload_image returns a pair; only its second item (the URL) is needed.
    _, uploaded_url = upload_image(request, decoded_image, mime_type, metadata, user)
    return uploaded_url
def convert_markdown_base64_images(request, content: str, metadata, user):
    """Replace large inline images in markdown with links to uploaded copies.

    Every ``![alt](target)`` occurrence whose target is longer than 1024
    characters is passed to ``get_image_url_from_base64``. When that returns a
    URL, the occurrence is rewritten as ``![alt](url)`` with the original alt
    text; otherwise the occurrence is left exactly as it was.

    Args:
        request: Forwarded to ``get_image_url_from_base64``.
        content: Markdown text to scan.
        metadata: Forwarded to ``get_image_url_from_base64``.
        user: Forwarded to ``get_image_url_from_base64``.

    Returns:
        The markdown text with the converted image references.
    """
    # Targets at or below this length are never sent for upload.
    min_replacement_url_length = 1024

    def replace(match):
        alt_text, target = match.group(1), match.group(2)
        if len(target) > min_replacement_url_length:
            url = get_image_url_from_base64(request, target, metadata, user)
            if url:
                # Keep the image in the document, now pointing at the upload.
                # (Returning an empty string here would silently drop it.)
                return f"![{alt_text}]({url})"
        return match.group(0)

    return MARKDOWN_IMAGE_URL_PATTERN.sub(replace, content)
def load_b64_audio_data(b64_str):
    """Decode base64 audio given as a data URL or as a bare base64 payload.

    Args:
        b64_str: Either ``"<header>,<base64>"`` (for example
            ``"data:audio/wav;base64,...."``) or just the base64 payload.

    Returns:
        ``(audio_bytes, content_type)``. The content type is taken from the
        part of the header before the first ``";"``; it falls back to
        ``"audio/wav"`` when there is no header, the header has no ``";"``, or
        the media type is empty. Returns ``(None, None)`` if the input cannot
        be decoded.
    """
    default_content_type = "audio/wav"
    try:
        if "," in b64_str:
            header, b64_data = b64_str.split(",", 1)
        else:
            header, b64_data = "", b64_str

        audio_data = base64.b64decode(b64_data)

        content_type = default_content_type
        if ";" in header:
            media_type = header.split(";")[0]
            # Drop the "data:" scheme when present. A header without it
            # (e.g. "audio/ogg;base64") is still usable and must not make
            # the whole decode fail.
            scheme_parts = media_type.split(":")
            if len(scheme_parts) > 1:
                media_type = scheme_parts[1]
            content_type = media_type or default_content_type

        return audio_data, content_type
    except Exception as e:
        print(f"Error decoding base64 audio data: {e}")
        return None, None
def upload_audio(request, audio_data, content_type, metadata, user):
    """Upload raw audio bytes as a file and return the URL path of its content.

    Args:
        request: Current request; forwarded to ``upload_file_handler`` and used
            to resolve the ``get_file_content_by_id`` route.
        audio_data: Audio bytes to upload.
        content_type: MIME type of the audio, e.g. ``"audio/wav"``.
        metadata: Forwarded to ``upload_file_handler``.
        user: Forwarded to ``upload_file_handler``.

    Returns:
        The URL path of the ``get_file_content_by_id`` route for the new file.
    """
    # guess_extension() returns None when the MIME table does not know the
    # type, which would otherwise put the literal text "None" in the filename.
    # Cover the WAV spellings explicitly and fall back to no extension.
    wav_extensions = {
        "audio/wav": ".wav",
        "audio/x-wav": ".wav",
        "audio/wave": ".wav",
    }
    audio_format = (
        mimetypes.guess_extension(content_type)
        or wav_extensions.get(content_type.lower(), "")
    )

    file = UploadFile(
        file=io.BytesIO(audio_data),
        filename=f"generated-{audio_format}",  # will be converted to a unique ID on upload_file
        headers={
            "content-type": content_type,
        },
    )
    file_item = upload_file_handler(
        request,
        file=file,
        metadata=metadata,
        process=False,
        user=user,
    )
    return request.app.url_path_for("get_file_content_by_id", id=file_item.id)
def get_audio_url_from_base64(request, base64_audio_string, metadata, user):
    """Upload inline base64 WAV audio and return the URL of the uploaded copy.

    Returns ``None`` when the string does not contain ``data:audio/wav;base64``,
    an empty string when it does but the audio cannot be decoded, and otherwise
    whatever ``upload_audio`` returns.
    """
    # Guard clause: only WAV data URLs are handled here.
    if "data:audio/wav;base64" not in base64_audio_string:
        return None

    # Extract base64 audio data from the line
    decoded_audio, mime_type = load_b64_audio_data(base64_audio_string)
    if decoded_audio is None:
        return ""

    return upload_audio(request, decoded_audio, mime_type, metadata, user)
def get_file_url_from_base64(request, base64_file_string, metadata, user):
    """Upload an inline base64 image or WAV audio and return the uploaded URL.

    Image data URLs of any subtype accepted by ``BASE64_IMAGE_URL_PREFIX``
    (not only PNG) are handed to ``get_image_url_from_base64``; strings
    containing ``data:audio/wav;base64`` are handed to
    ``get_audio_url_from_base64``.

    Returns:
        The result of the matching helper, or ``None`` when the string is
        neither a supported image nor a supported audio data URL.
    """
    # Use the same prefix check as get_image_url_from_base64 so that every
    # image type that helper can upload is actually routed to it.
    if BASE64_IMAGE_URL_PREFIX.match(base64_file_string):
        return get_image_url_from_base64(request, base64_file_string, metadata, user)
    if "data:audio/wav;base64" in base64_file_string:
        return get_audio_url_from_base64(request, base64_file_string, metadata, user)
    return None
def get_image_base64_from_file_id(id: str) -> Optional[str]:
    """Return the stored file with the given ID as a ``data:`` URL.

    Args:
        id: ID passed to ``Files.get_file_by_id``.

    Returns:
        A ``data:<content-type>;base64,<payload>`` string, or ``None`` when no
        file record exists, the resolved path is not a regular file, or reading
        it fails. The content type is guessed from the file name and falls back
        to ``"image/png"`` when it cannot be guessed.
    """
    file = Files.get_file_by_id(id)
    if not file:
        return None

    try:
        file_path = Path(Storage.get_file(file.path))
        # Only proceed when the path returned by Storage is an existing file.
        if not file_path.is_file():
            return None

        encoded_string = base64.b64encode(file_path.read_bytes()).decode("utf-8")
        # guess_type() returns (None, None) for unknown extensions; without a
        # fallback the result would read "data:None;base64,...".
        content_type = mimetypes.guess_type(file_path.name)[0] or "image/png"
        return f"data:{content_type};base64,{encoded_string}"
    except Exception:
        # Best effort: storage or read errors are reported as "no image".
        return None
|