Buckets:
| from __future__ import annotations | |
| from io import BytesIO | |
| from . import Image, ImageFile | |
| try: | |
| from . import _webp | |
| SUPPORTED = True | |
| except ImportError: | |
| SUPPORTED = False | |
| TYPE_CHECKING = False | |
| if TYPE_CHECKING: | |
| from typing import IO, Any | |
| _VP8_MODES_BY_IDENTIFIER = { | |
| b"VP8 ": "RGB", | |
| b"VP8X": "RGBA", | |
| b"VP8L": "RGBA", # lossless | |
| } | |
| def _accept(prefix: bytes) -> bool | str: | |
| is_riff_file_format = prefix.startswith(b"RIFF") | |
| is_webp_file = prefix[8:12] == b"WEBP" | |
| is_valid_vp8_mode = prefix[12:16] in _VP8_MODES_BY_IDENTIFIER | |
| if is_riff_file_format and is_webp_file and is_valid_vp8_mode: | |
| if not SUPPORTED: | |
| return ( | |
| "image file could not be identified because WEBP support not installed" | |
| ) | |
| return True | |
| return False | |
| class WebPImageFile(ImageFile.ImageFile): | |
| format = "WEBP" | |
| format_description = "WebP image" | |
| __loaded = 0 | |
| __logical_frame = 0 | |
| def _open(self) -> None: | |
| # Use the newer AnimDecoder API to parse the (possibly) animated file, | |
| # and access muxed chunks like ICC/EXIF/XMP. | |
| assert self.fp is not None | |
| self._decoder = _webp.WebPAnimDecoder(self.fp.read()) | |
| # Get info from decoder | |
| self._size, self.info["loop"], bgcolor, self.n_frames, self.rawmode = ( | |
| self._decoder.get_info() | |
| ) | |
| self.info["background"] = ( | |
| (bgcolor >> 16) & 0xFF, # R | |
| (bgcolor >> 8) & 0xFF, # G | |
| bgcolor & 0xFF, # B | |
| (bgcolor >> 24) & 0xFF, # A | |
| ) | |
| self.is_animated = self.n_frames > 1 | |
| self._mode = "RGB" if self.rawmode == "RGBX" else self.rawmode | |
| # Attempt to read ICC / EXIF / XMP chunks from file | |
| for key, chunk_name in { | |
| "icc_profile": "ICCP", | |
| "exif": "EXIF", | |
| "xmp": "XMP ", | |
| }.items(): | |
| if value := self._decoder.get_chunk(chunk_name): | |
| self.info[key] = value | |
| # Initialize seek state | |
| self._reset(reset=False) | |
| def _getexif(self) -> dict[int, Any] | None: | |
| if "exif" not in self.info: | |
| return None | |
| return self.getexif()._get_merged_dict() | |
| def seek(self, frame: int) -> None: | |
| if not self._seek_check(frame): | |
| return | |
| # Set logical frame to requested position | |
| self.__logical_frame = frame | |
| def _reset(self, reset: bool = True) -> None: | |
| if reset: | |
| self._decoder.reset() | |
| self.__physical_frame = 0 | |
| self.__loaded = -1 | |
| self.__timestamp = 0 | |
| def _get_next(self) -> tuple[bytes, int, int]: | |
| # Get next frame | |
| ret = self._decoder.get_next() | |
| self.__physical_frame += 1 | |
| # Check if an error occurred | |
| if ret is None: | |
| self._reset() # Reset just to be safe | |
| self.seek(0) | |
| msg = "failed to decode next frame in WebP file" | |
| raise EOFError(msg) | |
| # Compute duration | |
| data, timestamp = ret | |
| duration = timestamp - self.__timestamp | |
| self.__timestamp = timestamp | |
| # libwebp gives frame end, adjust to start of frame | |
| timestamp -= duration | |
| return data, timestamp, duration | |
| def _seek(self, frame: int) -> None: | |
| if self.__physical_frame == frame: | |
| return # Nothing to do | |
| if frame < self.__physical_frame: | |
| self._reset() # Rewind to beginning | |
| while self.__physical_frame < frame: | |
| self._get_next() # Advance to the requested frame | |
| def load(self) -> Image.core.PixelAccess | None: | |
| if self.__loaded != self.__logical_frame: | |
| self._seek(self.__logical_frame) | |
| # We need to load the image data for this frame | |
| data, self.info["timestamp"], self.info["duration"] = self._get_next() | |
| self.__loaded = self.__logical_frame | |
| # Set tile | |
| if self.fp and self._exclusive_fp: | |
| self.fp.close() | |
| self.fp = BytesIO(data) | |
| self.tile = [ImageFile._Tile("raw", (0, 0) + self.size, 0, self.rawmode)] | |
| return super().load() | |
| def load_seek(self, pos: int) -> None: | |
| pass | |
| def tell(self) -> int: | |
| return self.__logical_frame | |
| def _convert_frame(im: Image.Image) -> Image.Image: | |
| # Make sure image mode is supported | |
| if im.mode not in ("RGBX", "RGBA", "RGB"): | |
| im = im.convert("RGBA" if im.has_transparency_data else "RGB") | |
| return im | |
| def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None: | |
| encoderinfo = im.encoderinfo.copy() | |
| append_images = list(encoderinfo.get("append_images", [])) | |
| # If total frame count is 1, then save using the legacy API, which | |
| # will preserve non-alpha modes | |
| total = 0 | |
| for ims in [im] + append_images: | |
| total += getattr(ims, "n_frames", 1) | |
| if total == 1: | |
| _save(im, fp, filename) | |
| return | |
| background: int | tuple[int, ...] = (0, 0, 0, 0) | |
| if "background" in encoderinfo: | |
| background = encoderinfo["background"] | |
| elif "background" in im.info: | |
| background = im.info["background"] | |
| if isinstance(background, int): | |
| # GifImagePlugin stores a global color table index in | |
| # info["background"]. So it must be converted to an RGBA value | |
| palette = im.getpalette() | |
| if palette: | |
| r, g, b = palette[background * 3 : (background + 1) * 3] | |
| background = (r, g, b, 255) | |
| else: | |
| background = (background, background, background, 255) | |
| duration = im.encoderinfo.get("duration", im.info.get("duration", 0)) | |
| loop = im.encoderinfo.get("loop", 0) | |
| minimize_size = im.encoderinfo.get("minimize_size", False) | |
| kmin = im.encoderinfo.get("kmin", None) | |
| kmax = im.encoderinfo.get("kmax", None) | |
| allow_mixed = im.encoderinfo.get("allow_mixed", False) | |
| verbose = False | |
| lossless = im.encoderinfo.get("lossless", False) | |
| quality = im.encoderinfo.get("quality", 80) | |
| alpha_quality = im.encoderinfo.get("alpha_quality", 100) | |
| method = im.encoderinfo.get("method", 0) | |
| icc_profile = im.encoderinfo.get("icc_profile") or "" | |
| exif = im.encoderinfo.get("exif", "") | |
| if isinstance(exif, Image.Exif): | |
| exif = exif.tobytes() | |
| xmp = im.encoderinfo.get("xmp", "") | |
| if allow_mixed: | |
| lossless = False | |
| # Sensible keyframe defaults are from gif2webp.c script | |
| if kmin is None: | |
| kmin = 9 if lossless else 3 | |
| if kmax is None: | |
| kmax = 17 if lossless else 5 | |
| # Validate background color | |
| if ( | |
| not isinstance(background, (list, tuple)) | |
| or len(background) != 4 | |
| or not all(0 <= v < 256 for v in background) | |
| ): | |
| msg = f"Background color is not an RGBA tuple clamped to (0-255): {background}" | |
| raise OSError(msg) | |
| # Convert to packed uint | |
| bg_r, bg_g, bg_b, bg_a = background | |
| background = (bg_a << 24) | (bg_r << 16) | (bg_g << 8) | (bg_b << 0) | |
| # Setup the WebP animation encoder | |
| enc = _webp.WebPAnimEncoder( | |
| im.size, | |
| background, | |
| loop, | |
| minimize_size, | |
| kmin, | |
| kmax, | |
| allow_mixed, | |
| verbose, | |
| ) | |
| # Add each frame | |
| frame_idx = 0 | |
| timestamp = 0 | |
| cur_idx = im.tell() | |
| try: | |
| for ims in [im] + append_images: | |
| # Get number of frames in this image | |
| nfr = getattr(ims, "n_frames", 1) | |
| for idx in range(nfr): | |
| ims.seek(idx) | |
| frame = _convert_frame(ims) | |
| # Append the frame to the animation encoder | |
| enc.add( | |
| frame.getim(), | |
| round(timestamp), | |
| lossless, | |
| quality, | |
| alpha_quality, | |
| method, | |
| ) | |
| # Update timestamp and frame index | |
| if isinstance(duration, (list, tuple)): | |
| timestamp += duration[frame_idx] | |
| else: | |
| timestamp += duration | |
| frame_idx += 1 | |
| finally: | |
| im.seek(cur_idx) | |
| # Force encoder to flush frames | |
| enc.add(None, round(timestamp), lossless, quality, alpha_quality, 0) | |
| # Get the final output from the encoder | |
| data = enc.assemble(icc_profile, exif, xmp) | |
| if data is None: | |
| msg = "cannot write file as WebP (encoder returned None)" | |
| raise OSError(msg) | |
| fp.write(data) | |
| def _save(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None: | |
| lossless = im.encoderinfo.get("lossless", False) | |
| quality = im.encoderinfo.get("quality", 80) | |
| alpha_quality = im.encoderinfo.get("alpha_quality", 100) | |
| icc_profile = im.encoderinfo.get("icc_profile") or "" | |
| exif = im.encoderinfo.get("exif", b"") | |
| if isinstance(exif, Image.Exif): | |
| exif = exif.tobytes() | |
| if exif.startswith(b"Exif\x00\x00"): | |
| exif = exif[6:] | |
| xmp = im.encoderinfo.get("xmp", "") | |
| method = im.encoderinfo.get("method", 4) | |
| exact = 1 if im.encoderinfo.get("exact") else 0 | |
| im = _convert_frame(im) | |
| data = _webp.WebPEncode( | |
| im.getim(), | |
| lossless, | |
| float(quality), | |
| float(alpha_quality), | |
| icc_profile, | |
| method, | |
| exact, | |
| exif, | |
| xmp, | |
| ) | |
| if data is None: | |
| msg = "cannot write file as WebP (encoder returned None)" | |
| raise OSError(msg) | |
| fp.write(data) | |
| Image.register_open(WebPImageFile.format, WebPImageFile, _accept) | |
| if SUPPORTED: | |
| Image.register_save(WebPImageFile.format, _save) | |
| Image.register_save_all(WebPImageFile.format, _save_all) | |
| Image.register_extension(WebPImageFile.format, ".webp") | |
| Image.register_mime(WebPImageFile.format, "image/webp") | |
Xet Storage Details
- Size:
- 9.85 kB
- Xet hash:
- 63e77273d375a56eca9b0f7222c558454b2f84cd4e3efc9d81b1960039d897e9
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.