Spaces:
Sleeping
Sleeping
| import cv2 | |
| import supervision as sv | |
| import time | |
| import os | |
| import importlib.util | |
| from id_mapping import mapping | |
| from multiprocessing import Process, Queue | |
| from predict import Predictor | |
| from show_stitched import showAnnotatedStitched | |
| import shutil | |
| from datetime import datetime | |
| config_dir = os.path.abspath(os.path.dirname(__file__)) | |
| config_path = os.path.join(config_dir, 'PC_CONFIG.py') | |
| spec = importlib.util.spec_from_file_location("PC_CONFIG", config_path) | |
| PC_CONFIG = importlib.util.module_from_spec(spec) | |
| spec.loader.exec_module(PC_CONFIG) | |
| def start_annotation_process(queue): | |
| image_count = 0 | |
| file_dir = os.path.join(PC_CONFIG.FILE_DIRECTORY,"image-rec","annotated_images") | |
| archive_directory_content(file_dir) | |
| while True: | |
| if not queue.empty(): | |
| item = queue.get() # Wait for an item from the queue | |
| if item[0] == "STOP": # Check for the termination signal | |
| print("Stopping annotation process.") | |
| break # Exit the loop to end the process | |
| print(f"item: {item}") | |
| image_file_path, results, detection_id = item | |
| image = cv2.imread(image_file_path) | |
| image_count += 1 | |
| show_annotation(image, results, detection_id, image_count) | |
| showAnnotatedStitched() | |
| def archive_directory_content(directory_path): | |
| # Moves files with the specified extension from the given directory to an archive directory, | |
| # except for files named .gitkeep. | |
| archive_dir= os.path.join(PC_CONFIG.FILE_DIRECTORY,"image-rec","annotated_archive") | |
| file_extension=".jpg" | |
| os.makedirs(archive_dir, exist_ok=True) # Create the archive directory if it doesn't exist | |
| for filename in os.listdir(directory_path): | |
| if filename.endswith(file_extension) and filename != ".gitkeep": | |
| timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") | |
| new_filename = f"{filename.rsplit('.', 1)[0]}_{timestamp}.{filename.rsplit('.', 1)[1]}" | |
| src_path = os.path.join(directory_path, filename) | |
| dest_path = os.path.join(archive_dir, new_filename) | |
| shutil.move(src_path, dest_path) | |
| print(f"Moved and renamed: {src_path} to {dest_path}") | |
| def show_annotation(image, results, detection_id, image_count): | |
| print(f"results[0]: {results[0]}") | |
| predictions = results[0].predictions | |
| filtered_predictions = [pred for pred in predictions if pred.detection_id == detection_id] | |
| if not filtered_predictions: | |
| print("No matching detection_id found.") | |
| # Update the predictions list with only the filtered predictions | |
| results[0].predictions = filtered_predictions | |
| # Load the results into the supervision Detections API | |
| detections = sv.Detections.from_inference(results[0].dict(by_alias=True, exclude_none=True)) | |
| print(f"detections : {detections}") | |
| class_name = "None" | |
| annotated_image= image | |
| if detections and detections.data and detections.data["class_name"]: | |
| class_name = detections.data["class_name"][0] | |
| class_id = str(mapping.get(class_name, -1)) | |
| updated_label = class_name + ", id=" + class_id | |
| # updated_label = class_name + ", " + class_id | |
| # Create supervision annotators | |
| bounding_box_annotator = sv.BoundingBoxAnnotator() | |
| label_annotator = sv.LabelAnnotator() | |
| # Annotate the image with inference results | |
| annotated_image = bounding_box_annotator.annotate(scene=image, detections=detections) | |
| annotated_image = label_annotator.annotate(scene=annotated_image, detections=detections, labels=[updated_label]) | |
| # Save the annotated image with a unique name | |
| file_name = f"annotated_image{image_count}.jpg" | |
| file_path = os.path.join(PC_CONFIG.FILE_DIRECTORY,"image-rec","annotated_images",file_name) | |
| cv2.imwrite(file_path, annotated_image) | |
| print(f"Image saved as {file_path}") | |
| # Create a named window | |
| window_name = f"Annotated Image {image_count}" | |
| cv2.namedWindow(window_name, cv2.WINDOW_NORMAL) | |
| # Move the window to a specific position on the screen | |
| cv2.moveWindow(window_name, 0, 0) | |
| # Display the saved annotated image | |
| cv2.imshow(f"Annotated Image {image_count}", annotated_image) | |
| cv2.waitKey(3000) # Display the image for 3 seconds | |
| cv2.destroyAllWindows() # Close all OpenCV windows | |
| if __name__ == '__main__': | |
| image_folder_path = os.path.join(PC_CONFIG.FILE_DIRECTORY, "image-rec", "sample_images") | |
| show_annotation_queue = Queue() | |
| process = Process(target=start_annotation_process, args=(show_annotation_queue,)) | |
| process.start() | |
| predictor = Predictor() | |
| # List all .jpg files in the specified directory | |
| file_path_list = [os.path.join(image_folder_path, f) for f in os.listdir(image_folder_path) if f.endswith('.jpg')] | |
| for file_path in file_path_list: | |
| class_name, results, detection_id = predictor.predict_id(file_path) # Perform prediction | |
| print(f"result: {results}") | |
| show_annotation_queue.put((file_path, results, detection_id)) | |
| # Send the termination signal to the process | |
| show_annotation_queue.put(("STOP",)) | |
| # Wait for the process to finish | |
| process.join() |