# file2video encoder: converts an arbitrary file into a video of QR-code frames.
from itertools import islice
import os
import sys
import base64
import math
import json
import qrcode
import numpy as np
from multiprocessing import Pool, cpu_count
import av
from PIL import Image
from tqdm import tqdm
from checksum import checksum
# Constants
# Payload bytes carried per QR frame. The chunk is base64-expanded (~4/3)
# before encoding; the QR version auto-fits, so larger chunks produce denser codes.
chunk_size = 500 # Adjust this as per the data capacity of QR codes and your specific requirements
frame_rate = 20.0  # output video frames per second
width = 1080   # output frame width in pixels (QR image is resized to this)
height = 1080  # output frame height in pixels
def read_in_chunks(file_object, chunk_size=1024):
    """Yield successive chunks read from *file_object* until EOF.

    Each yielded value is whatever ``file_object.read(chunk_size)`` returned;
    iteration stops on the first empty read.
    """
    while chunk := file_object.read(chunk_size):
        yield chunk
def create_qr(data_str, box_size=10, error_correction_level=qrcode.constants.ERROR_CORRECT_L):
    """Render *data_str* as a QR code and return it as an RGB NumPy array.

    The QR version starts at 1 and the library grows it to fit the data; the
    rendered image is scaled to the module-level ``width`` x ``height``.
    """
    code = qrcode.QRCode(
        version=1,
        error_correction=error_correction_level,
        box_size=box_size,
        border=4,
    )
    code.add_data(data_str)
    rgb_img = code.make_image(fill_color="black", back_color="white").convert('RGB')
    # NEAREST keeps module edges sharp (no interpolation blur), which helps decoders.
    scaled = rgb_img.resize((width, height), Image.Resampling.NEAREST)
    return np.array(scaled)
def process_chunk(data):
    """Base64-encode one raw byte chunk and render it as a QR frame (RGB ndarray)."""
    encoded = base64.b64encode(data).decode('ascii')
    return create_qr(encoded)
def encode_and_write_frames(frames, stream, container):
    """Encode each RGB ndarray in *frames* with *stream* and mux the packets into *container*."""
    for rgb_array in frames:
        av_frame = av.VideoFrame.from_ndarray(rgb_array, format='rgb24')
        for pkt in stream.encode(av_frame):
            container.mux(pkt)
def create_video(src, dest, read_file_lazy = False):
    """Encode the file at *src* into a QR-code video written to *dest*.

    The first frame is a JSON metadata QR (filename, chunk count, checksum,
    converter info); every following frame carries one base64-encoded
    ``chunk_size``-byte slice of the file. Chunk-to-QR rendering is fanned out
    across a process pool in batches of ``cpu_count()`` frames.

    Parameters:
        src: path of the input file to encode.
        dest: output video path; container format is inferred by PyAV.
        read_file_lazy: if True, stream the file chunk-by-chunk instead of
            reading it fully into memory first.
    """
    md5_checksum = checksum(src)
    file_stats = os.stat(src)
    file_size = file_stats.st_size
    # Number of data frames that will follow the metadata frame.
    chunk_count = math.ceil(file_size / chunk_size)
    pbar = tqdm(total=chunk_count, desc="Generating Frames")
    meta_data = {
        "Filename": os.path.basename(src),
        "ChunkCount": chunk_count,
        "Filehash": md5_checksum,
        "ConverterUrl": "https://github.com/karaketir16/file2video",
        "ConverterVersion": "python_v1"
    }
    first_frame_data = json.dumps(meta_data, indent=4)
    first_frame = create_qr(first_frame_data)
    # Open output file
    container = av.open(dest, mode='w')
    # NOTE(review): a float rate works with recent PyAV; older releases expect
    # an int/Fraction — confirm against the pinned PyAV version.
    stream = container.add_stream('h264', rate=frame_rate)
    stream.width = width
    stream.height = height
    stream.pix_fmt = 'yuv420p'
    # High CRF (low quality) is acceptable: QR codes are high-contrast
    # black/white blocks and survive aggressive compression.
    stream.options = {'crf': '40'}
    # Write the first frame
    video_frame = av.VideoFrame.from_ndarray(first_frame, format='rgb24')
    for packet in stream.encode(video_frame):
        container.mux(packet)
    # Process chunks in batches using multiprocessing
    with open(src, 'rb') as f, Pool(cpu_count()) as pool:
        entire_file = []
        if not read_file_lazy:
            # Eager path: load the whole file once and slice it below.
            entire_file = f.read()
        i = 0
        while True:
            chunks = []
            if read_file_lazy:
                # Lazy path: pull at most one pool-load of chunks from the file.
                chunks = list(islice(read_in_chunks(f, chunk_size), cpu_count()))
            else:
                for _ in range(cpu_count()):
                    chunks.append(entire_file[i: i + chunk_size])
                    # An empty slice means we ran past EOF; drop the sentinel
                    # and stop filling this batch.
                    if not chunks[-1]: #empty
                        chunks.pop()
                        break
                    i = i + chunk_size
            if not chunks:
                break
            # Render this batch of QR frames in parallel (order preserved by map).
            frames = pool.map(process_chunk, chunks)
            encode_and_write_frames(frames, stream, container)
            pbar.update(len(frames))
    pbar.close()
    # Finalize the video file
    # encode() with no frame flushes any packets still buffered in the encoder.
    for packet in stream.encode():
        container.mux(packet)
    container.close()
if __name__ == '__main__':
    # Minimal CLI: require an input file and an output video path.
    if len(sys.argv) < 3:
        print("Usage: python file2video.py source_file output_file.mp4")
        sys.exit(1)
    source_path, output_path = sys.argv[1], sys.argv[2]
    create_video(source_path, output_path)