Spaces:
Running
Running
| import cv2 | |
| import numpy as np | |
| from PIL import Image, PngImagePlugin, ImageDraw | |
| import json | |
| from datetime import datetime | |
| from cryptography.fernet import Fernet | |
| import base64 | |
| import hashlib | |
class WatermarkProcessor:
    """Embed, extract and evaluate text watermarks in images.

    Three mechanisms are offered:

    * a visible text overlay preview (``create_preview``),
    * a PNG text chunk named ``"TXT"`` (``png_encode``),
    * least-significant-bit (LSB) steganography (``encode`` / ``decode``).

    LSB layout: the first ``_HEADER_BITS`` channel samples (row-major order,
    channels innermost) carry the payload length in bytes as a big-endian
    unsigned integer; the following ``8 * length`` samples carry the payload,
    most significant bit first.  The payload is a UTF-8 encoded JSON document.
    """

    # Number of leading LSBs that store the payload length (in bytes).
    _HEADER_BITS = 32

    def __init__(self, encryption_key=None):
        """Create the Fernet cipher used by this processor.

        Args:
            encryption_key: A Fernet key.  When omitted (or falsy) a fresh key
                is generated.

        The key in use is kept on ``self.encryption_key``; without it, data
        encrypted with an auto-generated key could never be decrypted again.
        """
        if encryption_key:
            self.encryption_key = encryption_key
        else:
            self.encryption_key = Fernet.generate_key()
        self.fernet = Fernet(self.encryption_key)

    def to_bin(self, data):
        """Convert *data* to its binary representation.

        Args:
            data: ``str`` (encoded as UTF-8 first), ``bytes``/``bytearray``,
                a NumPy array of integers, or a single integer.

        Returns:
            A string of ``'0'``/``'1'`` characters, 8 per byte, for text,
            bytes and single integers; a list of such 8-character strings
            (one per element) for NumPy arrays.

        Raises:
            TypeError: If *data* is of an unsupported type.
        """
        if isinstance(data, str):
            # Go through UTF-8 so every unit is exactly 8 bits wide;
            # format(ord(ch), '08b') yields more than 8 bits above U+00FF.
            data = data.encode("utf-8")
        if isinstance(data, (bytes, bytearray)):
            return "".join(format(byte, "08b") for byte in data)
        if isinstance(data, np.ndarray):
            return [format(value, "08b") for value in data]
        if isinstance(data, (int, np.integer)):
            return format(data, "08b")
        raise TypeError("Type not supported.")

    def create_preview(self, image_path, watermark_text, opacity=0.3):
        """Render *watermark_text* over the image as a semi-transparent overlay.

        Args:
            image_path: Path of the image to preview.
            watermark_text: Text drawn starting at mid-height, horizontally
                centred.
            opacity: Text opacity; values are clamped to the range 0..1.

        Returns:
            A new RGBA ``PIL.Image.Image``, or ``None`` if the preview could
            not be produced (unreadable file, unsupported text, ...).
        """
        try:
            # convert() loads the pixel data, so the file can be closed
            # as soon as the RGBA copy exists.
            with Image.open(image_path) as source:
                base = source.convert('RGBA')

            overlay = Image.new('RGBA', base.size, (255, 255, 255, 0))
            draw = ImageDraw.Draw(overlay)

            # Calculate text position
            text_width = draw.textlength(watermark_text)
            text_x = (base.width - text_width) // 2
            text_y = base.height // 2

            # Keep the alpha channel inside 0..255 whatever the caller passes.
            alpha = int(255 * max(0.0, min(1.0, opacity)))
            draw.text((text_x, text_y), watermark_text,
                      fill=(255, 255, 255, alpha))

            # Combine layers
            return Image.alpha_composite(base, overlay)
        except Exception:
            # Best effort by contract: callers test the result for None.
            return None

    def png_encode(self, im_name, extra, output_path="test.png"):
        """Store *extra* in a PNG text chunk named ``"TXT"``.

        Args:
            im_name: Path of the source image.
            extra: Text to store in the chunk.
            output_path: Where the PNG is written (defaults to ``"test.png"``).

        Returns:
            ``(output_path, "Watermark added successfully")`` on success,
            otherwise ``(im_name, "Error adding watermark: ...")``.
        """
        try:
            with Image.open(im_name) as im:
                info = PngImagePlugin.PngInfo()
                info.add_text("TXT", extra)
                im.save(output_path, pnginfo=info)
            return output_path, "Watermark added successfully"
        except Exception as e:
            return im_name, f"Error adding watermark: {str(e)}"

    @classmethod
    def _embed_payload(cls, image, payload):
        """Return a copy of *image* with header + *payload* written to its LSBs.

        Raises ValueError when the image has too few samples for the data.
        """
        header = len(payload).to_bytes(cls._HEADER_BITS // 8, "big")
        bits = np.unpackbits(np.frombuffer(header + payload, dtype=np.uint8))
        if bits.size > image.size:
            raise ValueError("Image too small for watermark data")

        # Row-major flattening visits samples in (row, column, channel) order.
        flat = np.array(image, dtype=np.uint8).reshape(-1)
        # Clear each target LSB, then set it to the corresponding data bit.
        flat[:bits.size] = (flat[:bits.size] & 0xFE) | bits
        return flat.reshape(image.shape)

    @classmethod
    def _extract_payload(cls, image):
        """Read the length header and return the payload bytes from the LSBs.

        Raises ValueError when the header cannot be read or declares more
        data than the image can hold.
        """
        flat = image.reshape(-1)
        if flat.size < cls._HEADER_BITS:
            raise ValueError("Not enough data to read header")

        header_bits = (flat[:cls._HEADER_BITS] & 1).astype(np.uint8)
        length = int.from_bytes(np.packbits(header_bits).tobytes(), "big")

        end = cls._HEADER_BITS + 8 * length
        if end > flat.size:
            # An image without a watermark has an arbitrary header; refuse
            # lengths that cannot possibly fit instead of returning noise.
            raise ValueError(
                "No valid watermark found (declared length exceeds image capacity)"
            )

        body_bits = (flat[cls._HEADER_BITS:end] & 1).astype(np.uint8)
        return np.packbits(body_bits).tobytes()

    @staticmethod
    def _psnr(original, watermarked):
        """Return the peak signal-to-noise ratio in dB (inf for identical images)."""
        # Subtract in float64: uint8 arithmetic wraps around on underflow
        # and again when squaring.
        delta = original.astype(np.float64) - watermarked.astype(np.float64)
        mse = float(np.mean(delta ** 2))
        if mse == 0:
            return float('inf')
        return float(20 * np.log10(255.0 / np.sqrt(mse)))

    def encode(self, image_path, watermark_text, metadata=None):
        """Encode a watermark using simple LSB steganography with a header.

        The payload is a JSON document with the keys ``text``, ``timestamp``
        and ``metadata``, encoded as UTF-8.  The 32-bit header stores the
        payload length in bytes.

        Args:
            image_path: Path of the image to watermark.
            watermark_text: Text to embed.
            metadata: Optional JSON-serialisable dict stored with the text.

        Returns:
            ``(output_path, "Watermark added successfully")`` on success; on
            failure ``(image_path, message)`` where *message* starts with
            ``"Error"``.
        """
        try:
            image = cv2.imread(image_path)
            if image is None:
                raise ValueError("Could not read image file")

            # Prepare watermark data
            record = {
                'text': watermark_text,
                'timestamp': datetime.now().isoformat(),
                'metadata': metadata or {}
            }
            # Serialise to bytes so that every unit occupies exactly 8 bits,
            # including non-ASCII text kept verbatim by ensure_ascii=False.
            payload = json.dumps(record, ensure_ascii=False).encode("utf-8")

            # Check capacity
            if self._HEADER_BITS + 8 * len(payload) > image.size:
                return image_path, "Error: Image too small for watermark data"

            stamped = self._embed_payload(image, payload)

            # Save result image (PNG: lossless).  Microseconds keep two calls
            # within the same second from overwriting each other.
            output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.png"
            # imwrite reports failure through its return value, not an exception.
            if not cv2.imwrite(output_path, stamped):
                raise OSError(f"Could not write {output_path}")
            return output_path, "Watermark added successfully"
        except Exception as e:
            return image_path, f"Error in encoding: {str(e)}"

    def decode(self, image_path):
        """Decode a watermark written by ``png_encode`` or ``encode``.

        The PNG ``"TXT"`` text chunk is checked first.  Otherwise the 32-bit
        LSB header is read to obtain the payload length, and only that many
        payload bits are read and turned back into text.

        Args:
            image_path: Path of the image to inspect.

        Returns:
            The ``"TXT"`` chunk if present; otherwise the embedded JSON
            pretty-printed, or the raw text when it is not valid JSON.  On
            failure a string starting with ``"Error"``.
        """
        try:
            # Check the PNG metadata first.
            try:
                with Image.open(image_path) as im:
                    if "TXT" in im.info:
                        return im.info["TXT"]
            except Exception:
                # Metadata lookup is best effort; fall back to the LSB data.
                pass

            image = cv2.imread(image_path)
            if image is None:
                raise ValueError("Could not read image file")

            try:
                payload = self._extract_payload(image)
            except ValueError as exc:
                return f"Error: {exc}"

            try:
                text = payload.decode("utf-8")
            except UnicodeDecodeError:
                # Older images stored one code point per byte; Latin-1 maps
                # every byte to the character with the same code point.
                text = payload.decode("latin-1")

            try:
                data = json.loads(text)
                return json.dumps(data, ensure_ascii=False, indent=2)
            except json.JSONDecodeError:
                return text
        except Exception as e:
            return f"Error in decoding: {str(e)}"

    def analyze_quality(self, original_path, watermarked_path):
        """Compare an image with its watermarked version.

        Args:
            original_path: Path of the unmodified image.
            watermarked_path: Path of the watermarked image.

        Returns:
            A JSON string with ``psnr`` (dB), ``histogram_similarity``
            (correlation of the channel-0 histograms), ``modified_pixels``
            (pixels that differ in at least one channel), ``image_size`` and
            ``quality_score`` (0..100); or a string starting with
            ``"Error in quality analysis:"`` on failure.
        """
        try:
            original = cv2.imread(original_path)
            watermarked = cv2.imread(watermarked_path)
            if original is None or watermarked is None:
                raise ValueError("Could not read image files")
            if original.shape != watermarked.shape:
                raise ValueError("Images must have the same dimensions")

            # Calculate PSNR
            psnr = self._psnr(original, watermarked)

            # Calculate histogram similarity (channel index 0 only)
            hist_original = cv2.calcHist([original], [0], None, [256], [0, 256])
            hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256])
            hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL)

            # Count pixels (not individual channel samples) that changed.
            changed = np.any(original != watermarked, axis=-1)
            modified_pixels = int(np.count_nonzero(changed))

            if psnr == float('inf'):
                quality_score = 100
            else:
                # 50 dB or better counts as a perfect score.
                quality_score = round(min(psnr / 50, 1.0) * 100, 2)

            report = {
                # NOTE: identical images give an infinite PSNR, which
                # json.dumps writes as the non-standard token Infinity.
                'psnr': round(psnr, 2),
                'histogram_similarity': round(hist_correlation, 4),
                'modified_pixels': modified_pixels,
                'image_size': original.shape,
                'quality_score': quality_score
            }
            return json.dumps(report, indent=2)
        except Exception as e:
            return f"Error in quality analysis: {str(e)}"