| | import base64 |
| | import os |
| | import platform |
| | from collections import OrderedDict |
| | from io import BytesIO |
| | from pathlib import Path |
| | from typing import Sequence |
| |
|
| | import requests |
| | from PIL import Image |
| |
|
| |
|
class LRUCache:
    """Fixed-capacity key/value cache with least-recently-used eviction.

    Entries live in an ``OrderedDict`` whose order encodes recency: the
    first entry is the next eviction candidate and the last entry is the
    most recently used one.
    """

    def __init__(self, capacity: int):
        """Create an empty cache that keeps at most ``capacity`` entries."""
        self.cache = OrderedDict()
        self.capacity = capacity

    def has(self, key) -> bool:
        """Return True if ``key`` is cached; recency is left untouched."""
        return key in self.cache

    def get(self, key, default=None):
        """Return the value stored for ``key`` and mark it most recently used.

        Args:
            key: Key to look up.
            default: Value returned when ``key`` is not cached.

        Returns:
            The cached value, or ``default`` when the key is absent.
        """
        if key not in self.cache:
            return default
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key, value) -> None:
        """Insert or overwrite ``key`` and mark it most recently used.

        When the cache grows beyond ``capacity`` the least recently used
        entry (the first one in the ordered dict) is evicted.
        """
        self.cache[key] = value
        self.cache.move_to_end(key)
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)

    def pop(self, key, value=None):
        """Remove ``key`` from the cache and return its value.

        ``value`` used to be a required argument that was silently ignored.
        It is now optional and serves as the fallback returned when ``key``
        is not cached, so existing two-argument calls keep working.

        Args:
            key: Key to remove.
            value: Fallback returned when ``key`` is absent.

        Returns:
            The removed value, or ``value`` when the key was not cached.
            A missing key never raises.
        """
        return self.cache.pop(key, value)
| |
|
| |
|
def handle_response(res, json, url):
    """Return the decoded JSON body of a successful response or raise VQLError.

    Args:
        res: Response object exposing ``status_code`` and ``json()``
            (presumably a ``requests.Response`` - confirm against callers).
        json: Request body that was sent; only used in log messages.
        url: Request URL; only used in log messages.

    Returns:
        ``res.json()`` when the status code is below 300.

    Raises:
        VQLError: code 516 for HTTP 404, code 517 for HTTP 422, and code 511
            (with the response payload as ``detail``) for any other status.
    """
    # Imported locally so this function does not rely on a module-level
    # ``import logging`` being present.
    import logging

    status = res.status_code

    # ``< 300`` rather than ``< 299`` so that status 299 is also treated as
    # a success.
    if status < 300:
        return res.json()

    if status == 404:
        # Log text: "request error: wrong address".
        logging.error("Request URL: {} | Error[404]: 请求错误: 错误的地址".format(url))
        raise VQLError(516)

    if status == 422:
        # Log text: "request error: malformed request".
        logging.error(
            "Request URL: {} | Request body: {} | Error[422]: 请求错误: 错误的请求格式".format(
                url, json
            )
        )
        raise VQLError(517)

    # Error bodies are not guaranteed to be JSON (e.g. an HTML error page).
    # Fall back to the raw text so the caller still gets VQLError(511)
    # instead of a JSON decoding error.
    try:
        info = res.json()
    except ValueError:
        info = getattr(res, "text", "")

    logging.error(
        "Request URL: {} | Request body: {} | Error: {}".format(url, json, info)
    )
    raise VQLError(511, detail=info)
| |
|
| |
|
def chunks(l: Sequence, win_len: int, stride_len: int):
    """Yield sliding windows over ``l``.

    Windows start at offsets ``0, stride_len, 2 * stride_len, ...`` and each
    holds up to ``win_len`` items; the final window may be shorter.
    Iteration stops once a window reaches the end of the sequence or the
    next start offset falls past it, so no empty trailing window is produced
    when ``stride_len`` is larger than ``win_len``.

    An empty sequence yields a single empty window, as before.

    Args:
        l: Sequence to slice; must support ``len()`` and slicing.
        win_len: Maximum number of items per window; must be positive.
        stride_len: Distance between consecutive window starts; must be
            positive.

    Yields:
        Slices of ``l``.

    Raises:
        ValueError: If ``win_len`` or ``stride_len`` is not positive. Because
            this is a generator, the error surfaces on first iteration.
    """
    if win_len <= 0:
        raise ValueError(f"win_len must be positive, got {win_len}")
    if stride_len <= 0:
        # A non-positive stride would never advance and loop forever.
        raise ValueError(f"stride_len must be positive, got {stride_len}")

    total = len(l)
    start = 0
    while True:
        end = min(start + win_len, total)
        yield l[start:end]

        if end == total:
            break

        start += stride_len
        if start >= total:
            # The stride jumped past the end; nothing is left to yield.
            break
| |
|
| |
|
def encode_image(input):
    """Encode one image, or a list/tuple of images, as base64 JPEG text.

    Each image is converted to RGB, saved as JPEG into an in-memory buffer
    and base64-encoded.

    Args:
        input: An image object exposing ``convert()`` and ``save()``
            (presumably a ``PIL.Image.Image`` - confirm against callers),
            or a list/tuple of such objects.

    Returns:
        A base64 ``str`` for a single image, or a ``list`` of base64 strings
        (in input order) when a list or tuple is given.
    """

    def _encode(img):
        # Convert per image: the previous code called ``convert`` on the
        # argument itself, which failed for list input before the list
        # branch could ever run.
        buffer = BytesIO()
        img.convert("RGB").save(buffer, format="JPEG")
        return base64.b64encode(buffer.getvalue()).decode("utf-8")

    if isinstance(input, (list, tuple)):
        return [_encode(img) for img in input]
    return _encode(input)
| |
|
| |
|
def get_platform() -> str:
    """Return the operating system name, reporting "Darwin" as "MacOS".

    Every other value from ``platform.system()`` is returned unchanged.
    """
    detected = platform.system()
    renamed = {"Darwin": "MacOS"}
    return renamed.get(detected, detected)
| |
|
| |
|
def read_image(input_source, *, timeout: float = 30.0) -> Image.Image:
    """
    Read an image from a local path, URL, PIL Image object, or Path object.

    Args:
        input_source (str or PIL.Image.Image or Path): The source of the image.
            Can be a local file path, a URL, a PIL Image object, or a Path object.
        timeout (float): Seconds to wait for the HTTP request when
            ``input_source`` is a URL. Without a timeout the request could
            block indefinitely on an unresponsive server.

    Returns:
        PIL.Image.Image: The image as a PIL Image object. A PIL Image passed
        in is returned as-is.

    Raises:
        ValueError: If the input source is invalid or the image cannot be read.
    """
    if isinstance(input_source, Image.Image):
        return input_source

    if not isinstance(input_source, (str, Path)):
        raise ValueError(
            "Invalid input type. Must be a string (URL or file path), Path object, or PIL Image object."
        )

    is_url = isinstance(input_source, str) and input_source.startswith(
        ("http://", "https://")
    )
    if is_url:
        try:
            response = requests.get(input_source, timeout=timeout)
            response.raise_for_status()
        except requests.RequestException as e:
            raise ValueError(f"Failed to fetch image from URL: {e}") from e

        # Decoding is handled separately from the download so that a payload
        # that is not a readable image is also reported as ValueError, as the
        # docstring promises.
        try:
            return Image.open(BytesIO(response.content))
        except OSError as e:
            raise ValueError(f"Failed to decode image fetched from URL: {e}") from e

    if os.path.isfile(str(input_source)):
        try:
            return Image.open(input_source)
        except OSError as e:
            raise ValueError(f"Failed to open local image file: {e}") from e

    raise ValueError("Invalid input source. Must be a valid URL or local file path.")
| |
|