Spaces:
Sleeping
Sleeping
File size: 5,230 Bytes
689227c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
import cv2
import supervision as sv
import time
import os
import importlib.util
from id_mapping import mapping
from multiprocessing import Process, Queue
from predict import Predictor
from show_stitched import showAnnotatedStitched
import shutil
from datetime import datetime
config_dir = os.path.abspath(os.path.dirname(__file__))
config_path = os.path.join(config_dir, 'PC_CONFIG.py')
spec = importlib.util.spec_from_file_location("PC_CONFIG", config_path)
PC_CONFIG = importlib.util.module_from_spec(spec)
spec.loader.exec_module(PC_CONFIG)
def start_annotation_process(queue):
image_count = 0
file_dir = os.path.join(PC_CONFIG.FILE_DIRECTORY,"image-rec","annotated_images")
archive_directory_content(file_dir)
while True:
if not queue.empty():
item = queue.get() # Wait for an item from the queue
if item[0] == "STOP": # Check for the termination signal
print("Stopping annotation process.")
break # Exit the loop to end the process
print(f"item: {item}")
image_file_path, results, detection_id = item
image = cv2.imread(image_file_path)
image_count += 1
show_annotation(image, results, detection_id, image_count)
showAnnotatedStitched()
def archive_directory_content(directory_path):
# Moves files with the specified extension from the given directory to an archive directory,
# except for files named .gitkeep.
archive_dir= os.path.join(PC_CONFIG.FILE_DIRECTORY,"image-rec","annotated_archive")
file_extension=".jpg"
os.makedirs(archive_dir, exist_ok=True) # Create the archive directory if it doesn't exist
for filename in os.listdir(directory_path):
if filename.endswith(file_extension) and filename != ".gitkeep":
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
new_filename = f"{filename.rsplit('.', 1)[0]}_{timestamp}.{filename.rsplit('.', 1)[1]}"
src_path = os.path.join(directory_path, filename)
dest_path = os.path.join(archive_dir, new_filename)
shutil.move(src_path, dest_path)
print(f"Moved and renamed: {src_path} to {dest_path}")
def show_annotation(image, results, detection_id, image_count):
print(f"results[0]: {results[0]}")
predictions = results[0].predictions
filtered_predictions = [pred for pred in predictions if pred.detection_id == detection_id]
if not filtered_predictions:
print("No matching detection_id found.")
# Update the predictions list with only the filtered predictions
results[0].predictions = filtered_predictions
# Load the results into the supervision Detections API
detections = sv.Detections.from_inference(results[0].dict(by_alias=True, exclude_none=True))
print(f"detections : {detections}")
class_name = "None"
annotated_image= image
if detections and detections.data and detections.data["class_name"]:
class_name = detections.data["class_name"][0]
class_id = str(mapping.get(class_name, -1))
updated_label = class_name + ", id=" + class_id
# updated_label = class_name + ", " + class_id
# Create supervision annotators
bounding_box_annotator = sv.BoundingBoxAnnotator()
label_annotator = sv.LabelAnnotator()
# Annotate the image with inference results
annotated_image = bounding_box_annotator.annotate(scene=image, detections=detections)
annotated_image = label_annotator.annotate(scene=annotated_image, detections=detections, labels=[updated_label])
# Save the annotated image with a unique name
file_name = f"annotated_image{image_count}.jpg"
file_path = os.path.join(PC_CONFIG.FILE_DIRECTORY,"image-rec","annotated_images",file_name)
cv2.imwrite(file_path, annotated_image)
print(f"Image saved as {file_path}")
# Create a named window
window_name = f"Annotated Image {image_count}"
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
# Move the window to a specific position on the screen
cv2.moveWindow(window_name, 0, 0)
# Display the saved annotated image
cv2.imshow(f"Annotated Image {image_count}", annotated_image)
cv2.waitKey(3000) # Display the image for 3 seconds
cv2.destroyAllWindows() # Close all OpenCV windows
if __name__ == '__main__':
image_folder_path = os.path.join(PC_CONFIG.FILE_DIRECTORY, "image-rec", "sample_images")
show_annotation_queue = Queue()
process = Process(target=start_annotation_process, args=(show_annotation_queue,))
process.start()
predictor = Predictor()
# List all .jpg files in the specified directory
file_path_list = [os.path.join(image_folder_path, f) for f in os.listdir(image_folder_path) if f.endswith('.jpg')]
for file_path in file_path_list:
class_name, results, detection_id = predictor.predict_id(file_path) # Perform prediction
print(f"result: {results}")
show_annotation_queue.put((file_path, results, detection_id))
# Send the termination signal to the process
show_annotation_queue.put(("STOP",))
# Wait for the process to finish
process.join() |