#!/usr/bin/env python3
# File size: 5,451 Bytes | 3533d55
import sys
import os.path
import math
import random
import hashlib
from PIL import Image
from ctypes import *
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
def prepare_message(text, password):
    """Encrypt *text* and return the result as a flat list of bits.

    The text is UTF-8 encoded and handed to ``encrypt`` together with
    *password*. Every byte of the encrypted blob is then expanded into eight
    integers (each 0 or 1), least-significant bit first.
    """
    payload = encrypt(text.encode('utf-8'), password)
    return [(byte >> shift) & 1 for byte in payload for shift in range(8)]
def encrypt(plain_text, password):
    """Encrypt *plain_text* (bytes) with AES-GCM under a password-derived key.

    A fresh 16-byte salt is generated on every call and used with scrypt
    (n=2**14, r=8, p=1) to stretch *password* into a 32-byte AES key.

    Returns a single bytes object laid out as
    ``salt (16) + nonce (16) + tag (16) + ciphertext``, which is the layout
    ``decrypt`` slices apart again.
    """
    salt = get_random_bytes(16)
    key = hashlib.scrypt(
        password.encode(), salt=salt, n=2**14, r=8, p=1, dklen=32)
    gcm = AES.new(key, AES.MODE_GCM)
    body, tag = gcm.encrypt_and_digest(plain_text)
    # The nonce is chosen by the cipher object itself, so read it back here.
    return b''.join((salt, gcm.nonce, tag, body))
def decrypt(data, password, key=None):
    """Decrypt and authenticate a blob produced by ``encrypt``.

    *data* is split at fixed offsets into salt (bytes 0-15), nonce (16-31),
    tag (32-47) and ciphertext (the remainder).

    When *key* is ``None`` the AES key is re-derived from *password* and the
    embedded salt with scrypt; otherwise the supplied *key* is used as-is and
    *password* is not touched, which lets callers derive the key once and
    reuse it across several attempts.

    Returns the decrypted bytes. Any exception raised by
    ``decrypt_and_verify`` on a failed tag check propagates to the caller.
    """
    salt, nonce, tag, body = data[:16], data[16:32], data[32:48], data[48:]
    if key is None:
        key = hashlib.scrypt(
            password.encode(), salt=salt, n=2**14, r=8, p=1, dklen=32)
    return AES.new(key, AES.MODE_GCM, nonce=nonce).decrypt_and_verify(body, tag)
def embed(input_img_path, cost_matrix, msg_file_path, password, output_img_path):
    """Hide an encrypted message in a greyscale image via the native STC library.

    Parameters:
        input_img_path: path of the cover image; it must open in mode ``'L'``.
        cost_matrix: per-pixel embedding costs, indexed as
            ``cost_matrix[row, column]`` (presumably a 2-D array of shape
            (height, width) -- confirm against the caller).
        msg_file_path: forwarded unchanged to ``prepare_message`` as the text
            to hide. NOTE(review): despite the name, no file is read in this
            function -- confirm that callers pass the message text itself.
        password: password used to encrypt the message.
        output_img_path: where the stego image is saved.

    Raises:
        ValueError: if the cover image is not in mode ``'L'``. Previously such
            images fell through to an unbound ``width``/``height`` and crashed
            with ``UnboundLocalError``.

    If the message does not fit at the largest payload rate, "Message too
    long" is printed and the process exits via ``sys.exit(0)``.
    NOTE(review): exit status 0 on a failure looks unintended, but it is
    preserved because callers may rely on it.
    """
    me = os.path.abspath(os.path.dirname(__file__))
    lib = cdll.LoadLibrary(os.path.join(me, "lib", "stc.so"))

    # The context manager guarantees the image is closed on every exit path,
    # including the error paths below.
    with Image.open(input_img_path) as im:
        if im.mode != 'L':
            raise ValueError(
                f"Unsupported image mode {im.mode!r}: "
                "only greyscale ('L') images are supported")
        width, height = im.size
        n_pixels = width * height
        pixels = im.load()

        # Prepare cover image: flatten the pixels row by row.
        cover = (c_int * n_pixels)()
        idx = 0
        for j in range(height):
            for i in range(width):
                cover[idx] = pixels[i, j]
                idx += 1

        # Prepare costs: three entries per pixel. The middle entry is always
        # 0; the first is made infinite for pixel value 0 and the last for
        # pixel value 255 (presumably the -1 / unchanged / +1 costs, so that
        # the embedder never leaves the 0..255 range -- confirm against the
        # STC library).
        INF = 2**31 - 1
        costs = (c_float * (n_pixels * 3))()
        idx = 0
        for j in range(height):
            for i in range(width):
                rho = cost_matrix[j, i]
                pixel = cover[idx]
                base = 3 * idx
                costs[base] = INF if pixel == 0 else rho
                costs[base + 1] = 0
                costs[base + 2] = INF if pixel == 255 else rho
                idx += 1

        # Prepare message
        PAYLOAD_RATES = [0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.10]
        msg_bits = prepare_message(msg_file_path, password)

        # Pick the smallest rate that fits the message
        m = None
        for rate in PAYLOAD_RATES:
            candidate = int(n_pixels * rate)
            if candidate >= len(msg_bits):
                m = candidate
                print(f"Selected payload rate: {rate} ({len(msg_bits)} bits into {m} capacity)")
                break
        if m is None:
            print("Message too long")
            sys.exit(0)

        # Real message bits first, then random filler up to the capacity m.
        message = (c_ubyte * m)()
        for pos, bit in enumerate(msg_bits):
            message[pos] = bit
        for pos in range(len(msg_bits), m):
            message[pos] = random.getrandbits(1)

        # Hide message. The return value of stc_hide was never inspected and
        # its meaning is not visible here, so it is not stored.
        stego = (c_int * n_pixels)()
        lib.stc_hide(n_pixels, cover, costs, m, message, stego)

        # Write the stego pixels back into the image and save it.
        idx = 0
        for j in range(height):
            for i in range(width):
                im.putpixel((i, j), stego[idx])
                idx += 1
        im.save(output_img_path)
def extract(stego_img_path, password, output_msg_path):
    """Recover a hidden message from a stego image and write it to a file.

    The payload rate and the exact ciphertext length are not stored in the
    image. Each candidate rate is therefore tried in turn, and for each rate
    every possible ciphertext length, until ``decrypt`` succeeds.

    Parameters:
        stego_img_path: path of the stego image; it must open in mode ``'L'``.
        password: password the message was encrypted with.
        output_msg_path: text file that receives the recovered message,
            written as UTF-8 to match the encoding used when embedding.

    Raises:
        ValueError: if the image is not in mode ``'L'`` (previously an
            ``UnboundLocalError`` on ``width``/``height``), or if no
            rate/length combination decrypts successfully.
    """
    PAYLOAD_RATES = [0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.10]
    me = os.path.abspath(os.path.dirname(__file__))
    lib = cdll.LoadLibrary(os.path.join(me, "lib", "stc.so"))

    # Prepare stego image. The pixels are copied into a ctypes array, so the
    # image can be closed as soon as the copy is done.
    with Image.open(stego_img_path) as im:
        if im.mode != 'L':
            raise ValueError(
                f"Unsupported image mode {im.mode!r}: "
                "only greyscale ('L') images are supported")
        width, height = im.size
        n = width * height
        pixels = im.load()
        stego = (c_int * n)()
        idx = 0
        for j in range(height):
            for i in range(width):
                stego[idx] = pixels[i, j]
                idx += 1

    # Try each candidate rate until GCM tag verification succeeds
    for rate in PAYLOAD_RATES:
        m = int(n * rate)
        extracted_message = (c_ubyte * m)()
        lib.stc_unhide(n, stego, m, extracted_message)

        # Convert bits to bytes, least-significant bit first; a trailing
        # group of fewer than 8 bits is dropped.
        bits = list(extracted_message)
        enc = bytearray()
        for start in range(0, len(bits) - 7, 8):
            value = 0
            for pos in range(8):
                value |= bits[start + pos] << pos
            enc.append(value)

        if len(enc) < 49:
            continue  # too short for salt+nonce+tag + 1 byte

        # Derive key once per rate (scrypt is slow)
        salt = bytes(enc[:16])
        key = hashlib.scrypt(
            password.encode(), salt=salt, n=2**14, r=8, p=1, dklen=32)

        # Try different ciphertext lengths (GCM needs exact length)
        blob = bytes(enc)
        for end in range(len(blob), 48, -1):
            try:
                text = decrypt(blob[:end], password, key=key).decode()
            except (ValueError, UnicodeDecodeError):
                continue
            # Only touch the output file once a candidate has decrypted and
            # decoded, so a failed attempt never truncates it. The write sits
            # outside the try so I/O errors are not mistaken for a wrong guess.
            with open(output_msg_path, 'w', encoding='utf-8') as f:
                f.write(text)
            return

    raise ValueError('Unable to decode message: no valid payload rate found')
|