Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| import os | |
| import io | |
| import pathlib | |
| import argparse | |
| import filetype | |
| import numpy as np | |
| from imwatermark import WatermarkEncoder, WatermarkDecoder | |
| from PIL import Image | |
| from PIL.ExifTags import TAGS | |
| from PIL.TiffImagePlugin import ImageFileDirectory_v2 | |
| from util import log, Map | |
| import piexif | |
| import piexif.helper | |
| options = Map({ 'method': 'dwtDctSvd', 'type': 'bytes' }) | |
| def get_exif(image): | |
| # using piexif | |
| res1 = {} | |
| try: | |
| exif = piexif.load(image.info["exif"]) | |
| exif = exif.get("Exif", {}) | |
| for k, v in exif.items(): | |
| key = list(vars(piexif.ExifIFD).keys())[list(vars(piexif.ExifIFD).values()).index(k)] | |
| res1[key] = piexif.helper.UserComment.load(v) | |
| except Exception: | |
| pass | |
| # using pillow | |
| res2 = {} | |
| try: | |
| res2 = { TAGS[k]: v for k, v in image.getexif().items() if k in TAGS } | |
| except Exception: | |
| pass | |
| return {**res1, **res2} | |
| def set_exif(d: dict): | |
| ifd = ImageFileDirectory_v2() | |
| _TAGS = {v: k for k, v in TAGS.items()} # enumerate possible exif tags | |
| for k, v in d.items(): | |
| ifd[_TAGS[k]] = v | |
| exif_stream = io.BytesIO() | |
| ifd.save(exif_stream) | |
| encoded = b'Exif\x00\x00' + exif_stream.getvalue() | |
| return encoded | |
| def get_watermark(image, params): | |
| data = np.asarray(image) | |
| decoder = WatermarkDecoder(options.type, params.length) | |
| decoded = decoder.decode(data, options.method) | |
| wm = decoded.decode(encoding='ascii', errors='ignore') | |
| return wm | |
| def set_watermark(image, params): | |
| data = np.asarray(image) | |
| encoder = WatermarkEncoder() | |
| length = params.length // 8 | |
| text = f"{params.wm:<{length}}"[:length] | |
| bytearr = text.encode(encoding='ascii', errors='ignore') | |
| encoder.set_watermark(options.type, bytearr) | |
| encoded = encoder.encode(data, options.method) | |
| image = Image.fromarray(encoded) | |
| return image | |
| def watermark(params, file): | |
| if not os.path.exists(file): | |
| log.error({ 'watermark': 'file not found' }) | |
| return | |
| if not filetype.is_image(file): | |
| log.error({ 'watermark': 'file is not an image' }) | |
| return | |
| image = Image.open(file) | |
| if image.width * image.height < 256 * 256: | |
| log.error({ 'watermark': 'image too small' }) | |
| return | |
| exif = get_exif(image) | |
| if params.command == 'read': | |
| fn = params.input | |
| wm = get_watermark(image, params) | |
| elif params.command == 'write': | |
| metadata = b'' if params.strip else set_exif(exif) | |
| if params.output != '': | |
| pathlib.Path(params.output).mkdir(parents = True, exist_ok = True) | |
| image=set_watermark(image, params) | |
| fn = os.path.join(params.output, file) | |
| image.save(fn, exif=metadata) | |
| if params.verify: | |
| image = Image.open(fn) | |
| data = np.asarray(image) | |
| decoder = WatermarkDecoder(options.type, params.length) | |
| decoded = decoder.decode(data, options.method) | |
| wm = decoded.decode(encoding='ascii', errors='ignore') | |
| else: | |
| wm = params.wm | |
| log.info({ 'file': fn }) | |
| log.info({ 'resolution': f'{image.width}x{image.height}' }) | |
| log.info({ 'watermark': wm }) | |
| log.info({ 'exif': None if params.strip else exif }) | |
| if __name__ == '__main__': | |
| parser = argparse.ArgumentParser(description = 'image watermarking') | |
| parser.add_argument('command', choices = ['read', 'write']) | |
| parser.add_argument('--wm', type=str, required=False, default='sdnext', help='watermark string') | |
| parser.add_argument('--strip', default=False, action='store_true', help = "strip existing exif data") | |
| parser.add_argument('--verify', default=False, action='store_true', help = "verify watermark during write") | |
| parser.add_argument('--length', type=int, default=32, help="watermark length in bits") | |
| parser.add_argument('--output', type=str, required=False, default='', help='folder to store images, default is overwrite in-place') | |
| parser.add_argument('input', type=str, nargs='*') | |
| args = parser.parse_args() | |
| # log.info({ 'watermark args': vars(args), 'options': options }) | |
| for arg in args.input: | |
| if os.path.isfile(arg): | |
| watermark(args, arg) | |
| elif os.path.isdir(arg): | |
| for root, _dirs, files in os.walk(arg): | |
| for f in files: | |
| watermark(args, os.path.join(root, f)) | |