| from PIL import Image |
| import io |
| import json |
| import base64 |
| from webpass.crypto_utils import generate_key, encrypt_password, decrypt_password |
|
|
| |
| def str_to_bin(message): |
| return ''.join(format(ord(c), '08b') for c in message) |
|
|
| def bin_to_str(binary): |
| chars = [binary[i:i+8] for i in range(0, len(binary), 8)] |
| return ''.join(chr(int(c, 2)) for c in chars) |
|
|
| |
|
|
| def encode_data(cover_image, data, password, filename=None): |
| """ |
| Universal GCM-Stego Encoder: Handles both TEXT and FILES. |
| 1. Wraps data in a JSON payload. |
| 2. Encrypts the JSON string using AES-GCM-256. |
| 3. Embeds authenticated ciphertext into pixels. |
| """ |
| |
| if filename: |
| |
| b64_data = base64.b64encode(data.read()).decode('utf-8') |
| payload_dict = { |
| "type": "file", |
| "filename": filename, |
| "content": b64_data |
| } |
| else: |
| |
| payload_dict = { |
| "type": "text", |
| "content": data |
| } |
|
|
| |
| raw_payload = json.dumps(payload_dict) |
|
|
| |
| |
| static_salt = b'WebPass_Stego_Salt_GCM' |
| key = generate_key(password, static_salt) |
| encrypted_payload = encrypt_password(raw_payload, key) |
|
|
| |
| final_message = encrypted_payload + "#####END#####" |
| binary_message = str_to_bin(final_message) |
| data_len = len(binary_message) |
|
|
| |
| img = Image.open(cover_image) |
| img = img.convert("RGB") |
| pixels = list(img.getdata()) |
| |
| max_capacity = len(pixels) * 3 |
| if data_len > max_capacity: |
| raise ValueError(f"Payload too large for this image! Needs {data_len} bits.") |
|
|
| new_pixels = [] |
| idx = 0 |
|
|
| |
| for pixel in pixels: |
| r, g, b = pixel |
| if idx < data_len: |
| r = int(format(r, '08b')[:-1] + binary_message[idx], 2) |
| idx += 1 |
| if idx < data_len: |
| g = int(format(g, '08b')[:-1] + binary_message[idx], 2) |
| idx += 1 |
| if idx < data_len: |
| b = int(format(b, '08b')[:-1] + binary_message[idx], 2) |
| idx += 1 |
| new_pixels.append((r, g, b)) |
|
|
| img.putdata(new_pixels) |
| |
| output = io.BytesIO() |
| img.save(output, format="PNG") |
| output.seek(0) |
| return output |
|
|
| def decode_data(stego_image, password): |
| """ |
| Universal GCM-Stego Decoder. |
| Returns a dictionary: {"type": "text"|"file", "content": ..., "filename": ...} |
| """ |
| img = Image.open(stego_image) |
| img = img.convert("RGB") |
| pixels = list(img.getdata()) |
|
|
| binary_data = "" |
| encrypted_payload = "" |
|
|
| |
| chars = [] |
| bit_buffer = "" |
| for pixel in pixels: |
| for val in pixel: |
| bit_buffer += format(val, '08b')[-1] |
| if len(bit_buffer) == 8: |
| char = chr(int(bit_buffer, 2)) |
| chars.append(char) |
| bit_buffer = "" |
| |
| if len(chars) >= 13 and "".join(chars[-13:]) == "#####END#####": |
| encrypted_payload = "".join(chars[:-13]) |
| break |
| if encrypted_payload: |
| break |
| |
| if not encrypted_payload: |
| raise ValueError("No hidden payload found in image pixels.") |
|
|
| |
| try: |
| static_salt = b'WebPass_Stego_Salt_GCM' |
| key = generate_key(password, static_salt) |
| |
| json_payload = decrypt_password(encrypted_payload, key) |
| except Exception: |
| raise ValueError("Incorrect password or image data has been corrupted.") |
|
|
| |
| try: |
| data = json.loads(json_payload) |
| |
| if data['type'] == 'file': |
| file_bytes = base64.b64decode(data['content']) |
| return { |
| "type": "file", |
| "filename": data['filename'], |
| "file_bytes": io.BytesIO(file_bytes) |
| } |
| else: |
| return { |
| "type": "text", |
| "content": data['content'] |
| } |
| except json.JSONDecodeError: |
| return {"type": "text", "content": json_payload} |