Create app.py
Browse files
app.py
ADDED
|
@@ -0,0 +1,200 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import os
|
| 2 |
+
import math
|
| 3 |
+
import hashlib
|
| 4 |
+
import numpy as np
|
| 5 |
+
import cv2
|
| 6 |
+
from PIL import Image
|
| 7 |
+
from tqdm import tqdm
|
| 8 |
+
import gradio as gr
|
| 9 |
+
|
| 10 |
+
# Constants
|
| 11 |
+
PADDING_TOKEN = b"<PAD>"
|
| 12 |
+
CHUNK_SIZE = 1000 * 1024 # 758 KB chunk size
|
| 13 |
+
FRAMES_DIR = './frames' # Directory for saving images
|
| 14 |
+
WIDTH, HEIGHT = 1920, 1080 # Image resolution
|
| 15 |
+
REQUIRED_LENGTH = WIDTH * HEIGHT # Number of bits for 1920x1080 resolution
|
| 16 |
+
VIDEO_PATH = './output_video.mp4' # Path to save the video
|
| 17 |
+
|
| 18 |
+
# Ensure frames directory exists
|
| 19 |
+
if not os.path.exists(FRAMES_DIR):
|
| 20 |
+
os.makedirs(FRAMES_DIR)
|
| 21 |
+
|
| 22 |
+
def encode(file_path):
|
| 23 |
+
"""Encode a file by splitting it into chunks, converting those chunks to images, and creating a video."""
|
| 24 |
+
if not os.path.exists('./parts'):
|
| 25 |
+
os.makedirs('./parts')
|
| 26 |
+
file_size = os.path.getsize(file_path)
|
| 27 |
+
num_chunks = math.ceil(file_size / CHUNK_SIZE)
|
| 28 |
+
# Split the file into chunks
|
| 29 |
+
with open(file_path, 'rb') as f:
|
| 30 |
+
for i in tqdm(range(num_chunks), desc="Splitting file", unit="chunk"):
|
| 31 |
+
chunk = f.read(CHUNK_SIZE)
|
| 32 |
+
# If it's the last chunk and smaller than chunk_size, pad it
|
| 33 |
+
if len(chunk) < CHUNK_SIZE:
|
| 34 |
+
padding_size = CHUNK_SIZE - len(chunk)
|
| 35 |
+
padding = np.full(padding_size, PADDING_TOKEN, dtype='S1') # Use NumPy to create padding
|
| 36 |
+
chunk = np.concatenate((np.frombuffer(chunk, dtype='S1'), padding))
|
| 37 |
+
chunk_file_path = os.path.join('./parts', f"{i}.part")
|
| 38 |
+
with open(chunk_file_path, 'wb') as chunk_file:
|
| 39 |
+
chunk_file.write(chunk)
|
| 40 |
+
# Convert each chunk to an image
|
| 41 |
+
chunk_files = [os.path.join('./parts', f"{i}.part") for i in range(num_chunks)]
|
| 42 |
+
for i, chunk_file in tqdm(enumerate(chunk_files), desc="Converting chunks to images", unit="chunk"):
|
| 43 |
+
convert_part_to_image(chunk_file, i)
|
| 44 |
+
# Create a video from the frames
|
| 45 |
+
create_video_from_frames(FRAMES_DIR, VIDEO_PATH)
|
| 46 |
+
return VIDEO_PATH
|
| 47 |
+
|
| 48 |
+
def decode(video_path, output_file_path):
|
| 49 |
+
"""Decode a video back to the original file by extracting frames, converting them back to chunks, and merging them."""
|
| 50 |
+
# Extract frames from the video
|
| 51 |
+
extracted_frames_dir = './extracted_frames'
|
| 52 |
+
extract_frames_from_video(video_path, extracted_frames_dir)
|
| 53 |
+
# List all chunk files
|
| 54 |
+
extracted_frame_files = sorted([os.path.join(extracted_frames_dir, f) for f in os.listdir(extracted_frames_dir) if f.endswith('.png')])
|
| 55 |
+
# Convert frames back to chunk files
|
| 56 |
+
chunk_files = []
|
| 57 |
+
for i, frame_file in tqdm(enumerate(extracted_frame_files), desc="Converting frames to chunks", unit="frame"):
|
| 58 |
+
binary_string = hd_rgb_image_to_binary(frame_file)
|
| 59 |
+
chunk_file_path = os.path.join('./parts', f"{i}.part")
|
| 60 |
+
with open(chunk_file_path, 'wb') as chunk_file:
|
| 61 |
+
chunk_file.write(int(binary_string, 2).to_bytes(len(binary_string) // 8, byteorder='big'))
|
| 62 |
+
chunk_files.append(chunk_file_path)
|
| 63 |
+
# Merge the chunks into the original file
|
| 64 |
+
merge_chunks(chunk_files, output_file_path)
|
| 65 |
+
# Verify file integrity
|
| 66 |
+
original_hash = calculate_sha256(output_file_path)
|
| 67 |
+
return f"SHA-256 hash of the decoded file: {original_hash}"
|
| 68 |
+
|
| 69 |
+
def binary_string_to_rgb_image(binary_string, width=1920, height=1080, output_path='output_rgb.png'):
|
| 70 |
+
"""Convert a binary string to an RGB image and save it."""
|
| 71 |
+
if len(binary_string) != REQUIRED_LENGTH * 3: # 3 bits per pixel for RGB
|
| 72 |
+
raise ValueError(f"Binary string must be exactly {REQUIRED_LENGTH * 3} bits for {width}x{height} resolution")
|
| 73 |
+
# Create a new RGB image
|
| 74 |
+
image = Image.new('RGB', (width, height))
|
| 75 |
+
# Convert binary string to RGB pixel values
|
| 76 |
+
pixels = image.load()
|
| 77 |
+
for y in range(height):
|
| 78 |
+
for x in range(width):
|
| 79 |
+
index = (y * width + x) * 3 # Each pixel has 3 bits
|
| 80 |
+
r = int(binary_string[index:index+1], 2) * 255 # 0 or 255
|
| 81 |
+
g = int(binary_string[index+1:index+2], 2) * 255 # 0 or 255
|
| 82 |
+
b = int(binary_string[index+2:index+3], 2) * 255 # 0 or 255
|
| 83 |
+
pixels[x, y] = (r, g, b)
|
| 84 |
+
# Save the image
|
| 85 |
+
image.save(output_path)
|
| 86 |
+
|
| 87 |
+
def hd_rgb_image_to_binary(image_path):
|
| 88 |
+
"""Convert an RGB image back to a binary string."""
|
| 89 |
+
image = Image.open(image_path).convert('RGB')
|
| 90 |
+
width, height = image.size
|
| 91 |
+
binary_string = ""
|
| 92 |
+
pixels = image.load()
|
| 93 |
+
for y in range(height):
|
| 94 |
+
for x in range(width):
|
| 95 |
+
r, g, b = pixels[x, y]
|
| 96 |
+
binary_string += '1' if r == 255 else '0'
|
| 97 |
+
binary_string += '1' if g == 255 else '0'
|
| 98 |
+
binary_string += '1' if b == 255 else '0'
|
| 99 |
+
return binary_string
|
| 100 |
+
|
| 101 |
+
def convert_part_to_image(chunk_file_path, part_index, width=1920, height=1080):
|
| 102 |
+
"""Convert a chunk file to an image and save it in the /frames directory."""
|
| 103 |
+
with open(chunk_file_path, 'rb') as chunk_file:
|
| 104 |
+
chunk_data = chunk_file.read()
|
| 105 |
+
# Convert the chunk data into a binary string
|
| 106 |
+
binary_string = ''.join([f'{byte:08b}' for byte in chunk_data]) # 8 bits per byte
|
| 107 |
+
# Pad or truncate the binary string to match the required length for the image
|
| 108 |
+
binary_string = binary_string[:REQUIRED_LENGTH * 3].ljust(REQUIRED_LENGTH * 3, '0') # Truncate or pad with '0'
|
| 109 |
+
image_path = os.path.join(FRAMES_DIR, f"frame_{part_index}.png")
|
| 110 |
+
binary_string_to_rgb_image(binary_string, width, height, image_path)
|
| 111 |
+
|
| 112 |
+
def merge_chunks(chunk_files, output_file):
|
| 113 |
+
"""Merge chunk files into a single file, removing padding."""
|
| 114 |
+
with open(output_file, 'wb') as f:
|
| 115 |
+
# Using tqdm to show progress in merging chunks
|
| 116 |
+
for chunk_file in tqdm(chunk_files, desc="Merging chunks", unit="chunk"):
|
| 117 |
+
with open(chunk_file, 'rb') as chunk:
|
| 118 |
+
data = chunk.read()
|
| 119 |
+
# Remove the padding from the last chunk
|
| 120 |
+
data = data.rstrip(PADDING_TOKEN)
|
| 121 |
+
f.write(data)
|
| 122 |
+
|
| 123 |
+
def calculate_sha256(file_path):
|
| 124 |
+
"""Calculate the SHA-256 hash of a file."""
|
| 125 |
+
sha256_hash = hashlib.sha256()
|
| 126 |
+
with open(file_path, 'rb') as f:
|
| 127 |
+
for byte_block in iter(lambda: f.read(4096), b""):
|
| 128 |
+
sha256_hash.update(byte_block)
|
| 129 |
+
return sha256_hash.hexdigest()
|
| 130 |
+
|
| 131 |
+
def create_video_from_frames(frame_folder, video_path, width=1920, height=1080, fps=30):
|
| 132 |
+
"""Create a video from a folder of image frames."""
|
| 133 |
+
frame_files = sorted([os.path.join(frame_folder, f) for f in os.listdir(frame_folder) if f.endswith('.png')])
|
| 134 |
+
# Initialize the video writer with libx264 codec
|
| 135 |
+
fourcc = cv2.VideoWriter_fourcc(*'H264') # Use libx264 codec
|
| 136 |
+
video_writer = cv2.VideoWriter(video_path, fourcc, fps, (width, height))
|
| 137 |
+
# Using tqdm to show progress in creating the video
|
| 138 |
+
for frame_file in tqdm(frame_files, desc="Creating video", unit="frame"):
|
| 139 |
+
frame = cv2.imread(frame_file)
|
| 140 |
+
video_writer.write(frame) # Add the frame to the video
|
| 141 |
+
video_writer.release()
|
| 142 |
+
|
| 143 |
+
def extract_frames_from_video(video_path, output_folder):
|
| 144 |
+
"""Extract frames from a video and save them to the output folder."""
|
| 145 |
+
if not os.path.exists(output_folder):
|
| 146 |
+
os.makedirs(output_folder)
|
| 147 |
+
cap = cv2.VideoCapture(video_path)
|
| 148 |
+
frame_index = 0
|
| 149 |
+
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
| 150 |
+
# Using tqdm to show progress in extracting frames
|
| 151 |
+
while True:
|
| 152 |
+
ret, frame = cap.read()
|
| 153 |
+
if not ret:
|
| 154 |
+
break
|
| 155 |
+
frame_path = os.path.join(output_folder, f"frame_{frame_index}.png")
|
| 156 |
+
cv2.imwrite(frame_path, frame)
|
| 157 |
+
frame_index += 1
|
| 158 |
+
cap.release()
|
| 159 |
+
|
| 160 |
+
def cleanup():
|
| 161 |
+
"""Clean up temporary files and directories."""
|
| 162 |
+
directories_to_remove = ['./parts', './frames', './extracted_frames']
|
| 163 |
+
for directory in directories_to_remove:
|
| 164 |
+
if os.path.exists(directory):
|
| 165 |
+
for file_name in os.listdir(directory):
|
| 166 |
+
file_path = os.path.join(directory, file_name)
|
| 167 |
+
try:
|
| 168 |
+
if os.path.isfile(file_path) or os.path.islink(file_path):
|
| 169 |
+
os.unlink(file_path)
|
| 170 |
+
elif os.path.isdir(file_path):
|
| 171 |
+
os.rmdir(file_path)
|
| 172 |
+
except Exception as e:
|
| 173 |
+
print(f"Failed to delete {file_path}. Reason: {e}")
|
| 174 |
+
os.rmdir(directory)
|
| 175 |
+
|
| 176 |
+
def encode_interface(file):
|
| 177 |
+
video_path = encode(file.name)
|
| 178 |
+
return video_path
|
| 179 |
+
|
| 180 |
+
def decode_interface(video, output_file):
|
| 181 |
+
result = decode(video.name, output_file.name)
|
| 182 |
+
return result
|
| 183 |
+
|
| 184 |
+
# Create Gradio interface
|
| 185 |
+
with gr.Blocks() as demo:
|
| 186 |
+
gr.Markdown("# File Encoding/Decoding Tool")
|
| 187 |
+
with gr.Tab("Encode"):
|
| 188 |
+
file_input = gr.File(label="Upload file to encode")
|
| 189 |
+
encode_button = gr.Button("Encode")
|
| 190 |
+
encode_output = gr.File(label="Encoded Video")
|
| 191 |
+
encode_button.click(encode_interface, inputs=file_input, outputs=encode_output)
|
| 192 |
+
|
| 193 |
+
with gr.Tab("Decode"):
|
| 194 |
+
video_input = gr.File(label="Upload video to decode")
|
| 195 |
+
output_file_input = gr.File(label="Output file path")
|
| 196 |
+
decode_button = gr.Button("Decode")
|
| 197 |
+
decode_output = gr.Textbox(label="Decoding Result")
|
| 198 |
+
decode_button.click(decode_interface, inputs=[video_input, output_file_input], outputs=decode_output)
|
| 199 |
+
|
| 200 |
+
demo.launch()
|