Spaces:
Paused
Paused
| import cv2 | |
| import json | |
| import os | |
| import sys | |
| import base64 | |
| import logging | |
| from pyzbar import pyzbar | |
| from tqdm import tqdm | |
| from multiprocessing import Pool, cpu_count | |
| from checksum import checksum | |
# Configure the root logger at import time: INFO level, with a timestamp and
# the level name in front of every message.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def apply_preprocessing(frame):
    """Return a binarized grayscale version of a BGR frame.

    The frame is converted to grayscale and then thresholded with
    ``cv2.THRESH_BINARY`` using a threshold of 128 and a maximum value of 255.
    """
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # cv2.threshold returns (threshold_used, image); only the image is needed.
    binarized = cv2.threshold(grayscale, 128, 255, cv2.THRESH_BINARY)[1]
    return binarized
def read_barcode(preprocessed_frame):
    """Decode the first barcode that pyzbar finds in the frame.

    Returns:
        ``(True, text)`` with the UTF-8 decoded payload of the first detected
        barcode, or ``(False, None)`` when nothing was detected.
    """
    first_hit = next(iter(pyzbar.decode(preprocessed_frame)), None)
    if first_hit is None:
        return False, None
    return True, first_hit.data.decode('utf-8')
def process_frame(frame):
    """Preprocess one frame and return its barcode payload.

    Returns the decoded text, or ``None`` (after logging a warning) when no
    barcode could be read from the frame.
    """
    found, payload = read_barcode(apply_preprocessing(frame))
    if found:
        return payload
    logging.warning("Failed to read QR code")
    return None  # Signals "no barcode" to the caller
def decode_video(cap, dest_folder):
    """Rebuild a file from a video whose frames carry QR-encoded data.

    The first frame must decode to a JSON object with the keys ``"Filename"``
    and ``"Filehash"``. Each following frame is decoded by ``process_frame``
    and its payload is base64-decoded and appended, in frame order, to
    ``dest_folder/<Filename>``. Afterwards ``checksum`` of the written file is
    compared with ``"Filehash"``.

    Args:
        cap: An opened video capture (as created by ``cv2.VideoCapture``).
            It is released before this function returns or raises.
        dest_folder: Directory to write the output file into; created when
            it does not exist.

    Returns:
        None. If the first frame cannot be read, carries no QR code, or names
        an unusable file, an error is logged and nothing is written.

    Raises:
        AssertionError: If the checksum of the written file does not match
            the ``"Filehash"`` value from the metadata frame.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(dest_folder, exist_ok=True)

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    pbar = tqdm(total=total_frames, desc="Processing Frames")
    try:
        ret, first_frame = cap.read()
        if not ret:
            logging.error("Cannot read first frame")
            return
        # The metadata frame is part of total_frames, so count it as well.
        pbar.update(1)

        metadata = process_frame(first_frame)
        if metadata is None:
            logging.error("No QR code in first frame; cannot proceed")
            return

        meta_data = json.loads(metadata)
        # The file name is read from the video itself, so treat it as
        # untrusted: keep only the final path component to make sure the
        # output cannot be written outside dest_folder.
        filename = os.path.basename(meta_data["Filename"])
        if filename in ("", ".", ".."):
            logging.error("Invalid file name in metadata; cannot proceed")
            return
        dest = os.path.join(dest_folder, filename)

        skipped = 0
        num_workers = cpu_count()
        # Both the output file and the worker pool are cleaned up even when
        # decoding raises part-way through.
        with open(dest, "wb") as out_file, Pool(num_workers) as pool:
            while cap.isOpened():
                # Read one batch of up to num_workers frames.
                frames = []
                done = False
                for _ in range(num_workers):
                    ret, frame = cap.read()
                    if not ret:
                        done = True
                        break
                    frames.append(frame)

                # pool.map preserves input order, so chunks stay in sequence.
                datas = pool.map(process_frame, frames)
                pbar.update(len(frames))
                for data in datas:
                    if data is None:
                        # process_frame already logged the failure; passing
                        # None to b64decode would raise TypeError. The
                        # integrity check below reports any resulting loss.
                        skipped += 1
                        continue
                    out_file.write(base64.b64decode(data))
                if done:
                    break

        if skipped:
            logging.warning("Skipped %d frame(s) without a readable QR code", skipped)
    finally:
        # Runs on the early returns and on errors too.
        cap.release()
        pbar.close()

    logging.info("Verify file integrity")
    md5_sum = checksum(dest)
    # Raised explicitly instead of using `assert`, which is removed when
    # Python runs with -O and would silently skip the integrity check.
    if md5_sum != meta_data["Filehash"]:
        raise AssertionError("Data corrupted")
    logging.info("File integrity verified: ")
    logging.info(dest)
def decode(src, dest_folder):
    """Open ``src`` as a video capture and decode it into ``dest_folder``."""
    decode_video(cv2.VideoCapture(src), dest_folder)
if __name__ == '__main__':
    # Command-line entry point: expects a source video and a destination folder.
    cli_args = sys.argv[1:]
    if len(cli_args) < 2:
        logging.error("Usage: python script.py source_file.mp4 destination_folder")
        sys.exit(1)

    # Extra arguments beyond the first two are ignored.
    src, dest_folder = cli_args[0], cli_args[1]
    decode(src, dest_folder)