Spaces:
Paused
Paused
File size: 2,822 Bytes
266909b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
import cv2
import json
import os
import sys
import base64
import logging
from pyzbar import pyzbar
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
from checksum import checksum
# Setup basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def apply_preprocessing(frame):
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
_, threshold = cv2.threshold(gray, 128, 255, cv2.THRESH_BINARY)
return threshold
def read_barcode(preprocessed_frame):
barcodes = pyzbar.decode(preprocessed_frame)
for barcode in barcodes:
barcode_info = barcode.data.decode('utf-8')
return True, barcode_info
return False, None
def process_frame(frame):
preprocessed_frame = apply_preprocessing(frame)
success, data = read_barcode(preprocessed_frame)
if not success:
logging.warning("Failed to read QR code")
return None # Return None if no barcode is found
return data
def decode_video(cap, dest_folder):
if not os.path.exists(dest_folder):
os.makedirs(dest_folder)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
pbar = tqdm(total=total_frames, desc="Processing Frames")
ret, first_frame = cap.read()
if not ret:
logging.error("Cannot read first frame")
return
metadata = process_frame(first_frame)
if metadata is None:
logging.error("No QR code in first frame; cannot proceed")
return
meta_data = json.loads(metadata)
dest = os.path.join(dest_folder, meta_data["Filename"])
file = open(dest, "wb")
# Start worker processes
num_workers = cpu_count()
with Pool(num_workers) as pool:
while cap.isOpened():
frames = []
done = False
for _ in range(num_workers):
ret, frame = cap.read()
if ret:
frames.append(frame)
else:
done = True
break
datas = pool.map(process_frame, frames)
pbar.update(len(frames))
for data in datas:
file.write(base64.b64decode(data))
if done:
break
file.close()
cap.release()
pbar.close()
logging.info("Verify file integrity")
md5_sum = checksum(dest)
assert md5_sum == meta_data["Filehash"], "Data corrupted"
logging.info("File integrity verified: ")
logging.info(dest)
def decode(src, dest_folder):
cap = cv2.VideoCapture(src)
decode_video(cap, dest_folder)
if __name__ == '__main__':
if len(sys.argv) < 3:
logging.error("Usage: python script.py source_file.mp4 destination_folder")
sys.exit(1)
src = sys.argv[1]
dest_folder = sys.argv[2]
decode(src, dest_folder)
|