|
|
import os, struct, math |
|
|
import numpy as np |
|
|
from PIL import Image, ImageDraw |
|
|
|
|
|
PHI = (1.0 + 5.0**0.5) / 2.0 |
|
|
MAGIC = b"FLC1\x00\x00\x00\x00" |
|
|
VER_V2 = 2 |
|
|
|
|
|
|
|
|
def fibonacci_sequence(n): |
|
|
fibs = [1, 2] |
|
|
while len(fibs) < n: |
|
|
fibs.append(fibs[-1] + fibs[-2]) |
|
|
return np.array(fibs[:n], dtype=np.int64) |
|
|
|
|
|
def fibonacci_sequence_std(n): |
|
|
fibs = [1, 1] |
|
|
while len(fibs) < n: |
|
|
fibs.append(fibs[-1] + fibs[-2]) |
|
|
return np.array(fibs[:n], dtype=np.int64) |
|
|
|
|
|
def fibonacci_frequency_boundaries(n_coeffs: int, n_bands: int): |
|
|
if n_bands < 2: return [0, n_coeffs] |
|
|
fibs = fibonacci_sequence(n_bands).astype(np.float64) |
|
|
w = fibs / (fibs.sum() + 1e-12) |
|
|
cum = np.cumsum(w) |
|
|
b = [0] |
|
|
for i in range(n_bands - 1): |
|
|
b.append(int(round(n_coeffs * cum[i]))) |
|
|
b.append(n_coeffs) |
|
|
|
|
|
for i in range(1, len(b)): |
|
|
if b[i] <= b[i-1]: b[i] = b[i-1] + 1 |
|
|
return b |
|
|
|
|
|
|
|
|
def dct_ortho_1d(x: np.ndarray) -> np.ndarray: |
|
|
N = x.shape[0] |
|
|
v = np.concatenate([x, x[::-1]]) |
|
|
V = np.fft.fft(v) |
|
|
k = np.arange(N) |
|
|
X = np.real(V[:N] * np.exp(-1j * np.pi * k / (2 * N))) |
|
|
X *= 2.0 |
|
|
X[0] *= (1.0 / math.sqrt(4 * N)) |
|
|
X[1:] *= (1.0 / math.sqrt(2 * N)) |
|
|
return X |
|
|
|
|
|
def idct_ortho_1d(X: np.ndarray) -> np.ndarray: |
|
|
N = X.shape[0] |
|
|
x0, xr = X[0] * math.sqrt(4 * N), X[1:] * math.sqrt(2 * N) |
|
|
c = np.empty(N, dtype=np.complex128) |
|
|
c[0], c[1:] = x0 / 2.0, xr / 2.0 |
|
|
k = np.arange(N) |
|
|
c = c * np.exp(1j * np.pi * k / (2 * N)) |
|
|
V = np.zeros(2 * N, dtype=np.complex128) |
|
|
V[:N] = c |
|
|
V[N+1:] = np.conj(c[1:][::-1]) |
|
|
return np.fft.ifft(V).real[:N] |
|
|
|
|
|
def dct_blocks_ortho(x_blocks: np.ndarray) -> np.ndarray: |
|
|
return np.array([dct_ortho_1d(b) for b in x_blocks]) |
|
|
|
|
|
def idct_blocks_ortho(X_blocks: np.ndarray) -> np.ndarray: |
|
|
return np.array([idct_ortho_1d(B) for B in X_blocks]) |
|
|
|
|
|
|
|
|
class BitWriter: |
|
|
def __init__(self): |
|
|
self.buf, self.acc, self.nbits = bytearray(), 0, 0 |
|
|
def write_bit(self, b: int): |
|
|
self.acc = (self.acc << 1) | (b & 1) |
|
|
self.nbits += 1 |
|
|
if self.nbits == 8: |
|
|
self.buf.append(self.acc); self.acc = 0; self.nbits = 0 |
|
|
def finish(self): |
|
|
if self.nbits: self.buf.append(self.acc << (8 - self.nbits)) |
|
|
return bytes(self.buf) |
|
|
|
|
|
class BitReader: |
|
|
def __init__(self, data: bytes): |
|
|
self.data, self.i, self.acc, self.nbits = data, 0, 0, 0 |
|
|
def read_bit(self): |
|
|
if self.nbits == 0: |
|
|
self.acc = self.data[self.i]; self.i += 1; self.nbits = 8 |
|
|
b = (self.acc >> (self.nbits - 1)) & 1 |
|
|
self.nbits -= 1 |
|
|
return b |
|
|
|
|
|
def fib_encode_nonneg(bw, n): |
|
|
m = int(n) + 1 |
|
|
fibs = [1, 2] |
|
|
while fibs[-1] <= m: fibs.append(fibs[-1] + fibs[-2]) |
|
|
bits = [0] * (len(fibs) - 1) |
|
|
for i in reversed(range(len(bits))): |
|
|
if fibs[i] <= m: bits[i] = 1; m -= fibs[i] |
|
|
for i in range(max((i for i, b in enumerate(bits) if b), default=0) + 1): |
|
|
bw.write_bit(bits[i]) |
|
|
bw.write_bit(1) |
|
|
|
|
|
def fib_decode_nonneg(br): |
|
|
fibs, bits, prev = [1, 2], [], 0 |
|
|
while True: |
|
|
b = br.read_bit(); bits.append(b) |
|
|
if prev == 1 and b == 1: break |
|
|
prev = b |
|
|
if len(bits) > len(fibs): fibs.append(fibs[-1] + fibs[-2]) |
|
|
m = sum(fibs[i] for i, bi in enumerate(bits[:-1]) if bi) |
|
|
return m - 1 |
|
|
|
|
|
def rle_fib_encode_ints(ints): |
|
|
bw = BitWriter() |
|
|
zrun = 0 |
|
|
for v in ints: |
|
|
if v == 0: zrun += 1; continue |
|
|
if zrun: bw.write_bit(0); fib_encode_nonneg(bw, zrun); zrun = 0 |
|
|
bw.write_bit(1); fib_encode_nonneg(bw, (v << 1) ^ (v >> 63)) |
|
|
if zrun: bw.write_bit(0); fib_encode_nonneg(bw, zrun) |
|
|
return bw.finish() |
|
|
|
|
|
def rle_fib_decode_ints(payload, n_out): |
|
|
br, out, i = BitReader(payload), np.zeros(n_out, dtype=np.int64), 0 |
|
|
while i < n_out: |
|
|
if br.read_bit() == 0: i = min(n_out, i + fib_decode_nonneg(br)) |
|
|
else: |
|
|
u = fib_decode_nonneg(br) |
|
|
out[i] = (u >> 1) ^ (-(u & 1)); i += 1 |
|
|
return out |
|
|
|
|
|
|
|
|
def band_quantize_dct(coeffs, boundaries, base_step): |
|
|
q = np.zeros_like(coeffs, dtype=np.int32) |
|
|
for bi in range(len(boundaries) - 1): |
|
|
a, b = boundaries[bi], boundaries[bi + 1] |
|
|
step = base_step * (PHI ** bi) |
|
|
q[:, a:b] = np.round(coeffs[:, a:b] / step) |
|
|
return q |
|
|
|
|
|
def band_dequantize_dct(q, boundaries, base_step): |
|
|
coeffs = np.zeros_like(q, dtype=np.float64) |
|
|
for bi in range(len(boundaries) - 1): |
|
|
a, b = boundaries[bi], boundaries[bi + 1] |
|
|
step = base_step * (PHI ** bi) |
|
|
coeffs[:, a:b] = q[:, a:b] * step |
|
|
return coeffs |
|
|
|
|
|
def hologram_spectrum_image(zints, max_symbols=262144): |
|
|
z = zints[:max_symbols]; v = np.tanh(z / 32.0) |
|
|
theta = (2 * math.pi / (PHI**2)) * np.arange(v.size) + 2.0 * math.pi * (v * 0.25) |
|
|
r = 1.0 + 0.35 * np.abs(v) |
|
|
syms = r * np.cos(theta) + 1j * r * np.sin(theta) |
|
|
N = int(2**math.ceil(math.log2(math.sqrt(syms.size or 1)))) |
|
|
U = np.pad(syms, (0, N*N - syms.size)).reshape(N, N) |
|
|
mag = np.log1p(np.abs(np.fft.fftshift(np.fft.fft2(U)))) |
|
|
mag = (mag - mag.min()) / (mag.max() - mag.min() + 1e-12) |
|
|
return (mag * 255).astype(np.uint8) |
|
|
|
|
|
def bytes_to_fib_spiral_image(data, max_pixels=262144): |
|
|
arr = np.frombuffer(data, dtype=np.uint8)[:max_pixels] |
|
|
fibs = fibonacci_sequence_std(32) |
|
|
sizes, area = [], 0 |
|
|
for s in fibs: |
|
|
sizes.append(int(s)); area += s*s |
|
|
if area >= arr.size: break |
|
|
|
|
|
tiles, minx, miny, maxx, maxy = [], 0, 0, 0, 0 |
|
|
curr_x, curr_y = 0, 0 |
|
|
for i, s in enumerate(sizes): |
|
|
d = (i-1)%4 |
|
|
if i>0: |
|
|
if d==0: curr_x = maxx; curr_y = miny |
|
|
elif d==1: curr_x = maxx-s; curr_y = maxy |
|
|
elif d==2: curr_x = minx-s; curr_y = maxy-s |
|
|
else: curr_x = minx; curr_y = miny-s |
|
|
tiles.append((curr_x, curr_y, s)) |
|
|
minx, miny = min(minx, curr_x), min(miny, curr_y) |
|
|
maxx, maxy = max(maxx, curr_x+s), max(maxy, curr_y+s) |
|
|
|
|
|
W, H = maxx-minx, maxy-miny |
|
|
img = np.zeros((H, W), dtype=np.uint8) |
|
|
idx = 0 |
|
|
for x, y, s in tiles: |
|
|
take = min(s*s, arr.size - idx) |
|
|
if take <= 0: break |
|
|
block = np.pad(arr[idx:idx+take], (0, s*s-take)).reshape(s, s) |
|
|
img[H-(y-miny+s):H-(y-miny), x-minx:x-minx+s] = block |
|
|
idx += take |
|
|
return img, tiles, (minx, miny, maxx, maxy) |
|
|
|
|
|
|
|
|
def flc_encode_file(in_path, out_flc, preview_png=None, unzip_gif=None, block_len=1024, n_bands=10, base_step=0.004, **kwargs): |
|
|
raw = open(in_path, "rb").read() |
|
|
x = (np.frombuffer(raw, dtype=np.uint8).astype(np.float64) - 127.5) / 127.5 |
|
|
pad = (-x.size) % block_len |
|
|
X = np.pad(x, (0, pad)).reshape(-1, block_len) |
|
|
C = dct_blocks_ortho(X) |
|
|
bnds = fibonacci_frequency_boundaries(block_len, n_bands) |
|
|
Q = band_quantize_dct(C, bnds, base_step) |
|
|
payload = rle_fib_encode_ints(np.diff(Q.flatten(), prepend=0)) |
|
|
|
|
|
header = struct.pack("<8sH Q I I H d d H", MAGIC, VER_V2, len(raw), block_len, X.shape[0], n_bands, base_step, 127.5, len(bnds)) |
|
|
with open(out_flc, "wb") as f: |
|
|
f.write(header); f.write(struct.pack("<"+ "I"*len(bnds), *bnds)) |
|
|
f.write(struct.pack("<I", len(payload))); f.write(payload) |
|
|
|
|
|
if unzip_gif: |
|
|
frames = [] |
|
|
for t in range(1, n_bands + 1): |
|
|
Q_p = np.zeros_like(Q) |
|
|
for bi in range(t): Q_p[:, bnds[bi]:bnds[bi+1]] = Q[:, bnds[bi]:bnds[bi+1]] |
|
|
X_p = idct_blocks_ortho(band_dequantize_dct(Q_p, bnds, base_step)) |
|
|
recon = np.clip((X_p.flatten()[:len(raw)] * 127.5) + 127.5, 0, 255).astype(np.uint8) |
|
|
|
|
|
|
|
|
h_img = Image.fromarray(hologram_spectrum_image(Q_p.flatten())).resize((256, 256)) |
|
|
s_img_arr, _, _ = bytes_to_fib_spiral_image(recon.tobytes()) |
|
|
s_img = Image.fromarray(s_img_arr).resize((256, 256)) |
|
|
|
|
|
frame = Image.new("RGB", (512, 280), (10, 10, 15)) |
|
|
frame.paste(h_img, (0, 24)); frame.paste(s_img, (256, 24)) |
|
|
frames.append(frame) |
|
|
frames[0].save(unzip_gif, save_all=True, append_images=frames[1:], duration=100, loop=0) |
|
|
|
|
|
return {"n_bytes": len(raw), "payload_len": len(payload), "ratio": len(payload)/len(raw)} |
|
|
|
|
|
def flc_decode_file(in_flc, out_path): |
|
|
blob = open(in_flc, "rb").read() |
|
|
h_sz = struct.calcsize("<8sH Q I I H d d H") |
|
|
magic, ver, n_bytes, b_len, n_blks, n_bnds, step, mu, bnd_l = struct.unpack_from("<8sH Q I I H d d H", blob) |
|
|
off = h_sz |
|
|
bnds = struct.unpack_from("<" + "I"*bnd_l, blob, off); off += 4*bnd_l |
|
|
p_len = struct.unpack_from("<I", blob, off)[0]; off += 4 |
|
|
d = rle_fib_decode_ints(blob[off:off+p_len], n_blks * b_len) |
|
|
Q = np.cumsum(d).reshape(n_blks, b_len) |
|
|
X = idct_blocks_ortho(band_dequantize_dct(Q, bnds, step)) |
|
|
res = np.clip((X.flatten()[:n_bytes] * 127.5) + mu, 0, 255).astype(np.uint8) |
|
|
with open(out_path, "wb") as f: f.write(res.tobytes()) |
|
|
return {"n_bytes": n_bytes} |
|
|
|
|
|
def cosine_similarity_bytes(a, b): |
|
|
x = np.frombuffer(a, dtype=np.uint8).astype(float) |
|
|
y = np.frombuffer(b, dtype=np.uint8).astype(float) |
|
|
n = min(len(x), len(y)) |
|
|
x, y = x[:n]-x[:n].mean(), y[:n]-y[:n].mean() |
|
|
return np.dot(x, y) / (np.linalg.norm(x) * np.linalg.norm(y) + 1e-12) |