codeserver / genz /decode_video.py
ar08's picture
Upload 12 files
266909b verified
import cv2
import json
import os
import sys
import base64
import logging
from pyzbar import pyzbar
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
from checksum import checksum
# Setup basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def apply_preprocessing(frame):
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
_, threshold = cv2.threshold(gray, 128, 255, cv2.THRESH_BINARY)
return threshold
def read_barcode(preprocessed_frame):
barcodes = pyzbar.decode(preprocessed_frame)
for barcode in barcodes:
barcode_info = barcode.data.decode('utf-8')
return True, barcode_info
return False, None
def process_frame(frame):
preprocessed_frame = apply_preprocessing(frame)
success, data = read_barcode(preprocessed_frame)
if not success:
logging.warning("Failed to read QR code")
return None # Return None if no barcode is found
return data
def decode_video(cap, dest_folder):
if not os.path.exists(dest_folder):
os.makedirs(dest_folder)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
pbar = tqdm(total=total_frames, desc="Processing Frames")
ret, first_frame = cap.read()
if not ret:
logging.error("Cannot read first frame")
return
metadata = process_frame(first_frame)
if metadata is None:
logging.error("No QR code in first frame; cannot proceed")
return
meta_data = json.loads(metadata)
dest = os.path.join(dest_folder, meta_data["Filename"])
file = open(dest, "wb")
# Start worker processes
num_workers = cpu_count()
with Pool(num_workers) as pool:
while cap.isOpened():
frames = []
done = False
for _ in range(num_workers):
ret, frame = cap.read()
if ret:
frames.append(frame)
else:
done = True
break
datas = pool.map(process_frame, frames)
pbar.update(len(frames))
for data in datas:
file.write(base64.b64decode(data))
if done:
break
file.close()
cap.release()
pbar.close()
logging.info("Verify file integrity")
md5_sum = checksum(dest)
assert md5_sum == meta_data["Filehash"], "Data corrupted"
logging.info("File integrity verified: ")
logging.info(dest)
def decode(src, dest_folder):
cap = cv2.VideoCapture(src)
decode_video(cap, dest_folder)
if __name__ == '__main__':
if len(sys.argv) < 3:
logging.error("Usage: python script.py source_file.mp4 destination_folder")
sys.exit(1)
src = sys.argv[1]
dest_folder = sys.argv[2]
decode(src, dest_folder)