Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """MartheDeployment_Doors_fasterRCNN.ipynb | |
| Automatically generated by Colab. | |
| Original file is located at | |
| https://colab.research.google.com/drive/1kgEtpfNt0jxSwPRhOzODIC6P_prg-c4L | |
| ## Libraries | |
| """ | |
| import cv2 | |
| import numpy as np | |
| import pandas as pd | |
| import statistics | |
| from statistics import mode | |
| from PIL import Image | |
| # pip install PyPDF2 | |
| # pip install PyMuPDF | |
| # pip install PyMuPDF==1.19.0 | |
| import io | |
| # !pip install pypdfium2 | |
| import pypdfium2 as pdfium | |
| import fitz # PyMuPDF | |
| import os | |
| #drive.mount("/content/drive", force_remount=True) | |
| import torch | |
| from torchvision.models.detection.faster_rcnn import FastRCNNPredictor | |
| from PIL import Image, ImageDraw | |
| import torchvision.transforms.functional as F | |
| import matplotlib.pyplot as plt | |
| import google_sheet_Legend | |
| import torch | |
| import torchvision | |
| from torchvision.models.detection.faster_rcnn import FastRCNNPredictor | |
| from PyPDF2 import PdfReader, PdfWriter | |
| from PyPDF2.generic import TextStringObject, NameObject, ArrayObject, FloatObject | |
| from PyPDF2.generic import NameObject, TextStringObject, DictionaryObject, FloatObject, ArrayObject | |
| from PyPDF2 import PdfReader | |
| from io import BytesIO | |
def convert2pillow(path):
    """Render the first page of a PDF as a Pillow image.

    Args:
        path: Input handed straight to ``pdfium.PdfDocument``. The comment on
            ``main_run`` says its caller passes the PDF data rather than a
            filesystem path -- presumably raw bytes; confirm with the caller.

    Returns:
        The result of ``render().to_pil()`` for page index 0, using
        pypdfium2's default render settings.
    """
    document = pdfium.PdfDocument(path)
    first_page = document.get_page(0)
    bitmap = first_page.render()
    return bitmap.to_pil()
| # Function to get the model | |
def get_model(num_classes):
    """Build a Faster R-CNN (ResNet-50-FPN) detector with a fresh box head.

    Args:
        num_classes: Number of outputs for the replacement box predictor
            (the callers in this file count the background as one class).

    Returns:
        The torchvision detection model whose ``roi_heads.box_predictor`` has
        been replaced by a new ``FastRCNNPredictor``.
    """
    # Start from the pre-trained detector shipped with torchvision.
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

    # The new head must accept the same feature width as the head it replaces.
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)
    return detector
def ev_model(img, model, device, threshold):
    """Run a detector on one image and keep detections above a score cutoff.

    Args:
        img: Image accepted by torchvision's ``to_tensor`` (``main_run``
            passes a Pillow image).
        model: Detection model. It is switched to eval mode here; it is NOT
            moved to ``device`` by this function.
        device: Device the input tensor is moved to.
        threshold: Only detections with ``score > threshold`` are kept.

    Returns:
        A list of ``(box, label)`` tuples, where ``box`` is the box tensor
        converted with ``tolist()`` and ``label`` is a Python scalar.
    """
    batch = F.to_tensor(img).unsqueeze(0).to(device)

    model.eval()
    with torch.no_grad():
        # A single image was submitted, so only the first result is relevant.
        result = model(batch)[0]

    kept = []
    for box, score, label in zip(result['boxes'], result['scores'], result['labels']):
        if score.item() > threshold:
            kept.append((box.tolist(), label.item()))
    return kept
def distance(point1, point2):
    """Return the Euclidean distance between two 2-D points.

    Args:
        point1: ``(x, y)`` pair.
        point2: ``(x, y)`` pair.

    Returns:
        The distance as computed by ``np.sqrt``.
    """
    dx = point1[0] - point2[0]
    dy = point1[1] - point2[1]
    # Validate the 2-element shape the same way tuple unpacking would.
    _, _ = point1
    _, _ = point2
    return np.sqrt(dx ** 2 + dy ** 2)
def calculate_midpoint(p1, p2):
    """Return the integer midpoint of the segment ``p1``-``p2``.

    Each coordinate is averaged and then passed through ``int()``, so the
    fractional part is truncated toward zero (not floored).

    Args:
        p1: ``(x, y)`` pair.
        p2: ``(x, y)`` pair.

    Returns:
        ``(xm, ym)`` tuple of ints.
    """
    (ax, ay), (bx, by) = p1, p2
    mid_x = int((ax + bx) / 2)
    mid_y = int((ay + by) / 2)
    return (mid_x, mid_y)
def get_door_info(doors_info):
    """Turn single-door detections into measurement lines.

    For every detection one edge of its bounding box is selected according
    to the class label:

        2 -> bottom edge  (horz_bottom)
        3 -> top edge     (horz_upper)
        4 -> right edge   (vert_right)
        5 -> left edge    (vert_left)

    Detections with any other label are ignored.

    Args:
        doors_info: Iterable of items whose element 0 is a box
            ``(xmin, ymin, xmax, ymax)`` and element 1 is the class label
            (the format produced by ``ev_model``).

    Returns:
        Tuple ``(width_pixels, lines, line_midpoint, singles, door_type)``:
            width_pixels: edge length per door, from the un-rounded box
                coordinates.
            lines: ``(start, end)`` integer pixel points of each edge.
            line_midpoint: integer midpoint of each edge.
            singles: number of doors recorded.
            door_type: ``0`` for every recorded door (marks a single door).
    """
    width_pixels = []
    lines = []
    line_midpoint = []
    door_type = []
    singles = 0

    for detection in doors_info:
        xmin, ymin, xmax, ymax = detection[0]
        label = detection[1]

        if label == 2:      # horz_bottom
            edge_start, edge_end = (xmin, ymax), (xmax, ymax)
        elif label == 3:    # horz_upper
            edge_start, edge_end = (xmin, ymin), (xmax, ymin)
        elif label == 4:    # vert_right
            edge_start, edge_end = (xmax, ymin), (xmax, ymax)
        elif label == 5:    # vert_left
            edge_start, edge_end = (xmin, ymin), (xmin, ymax)
        else:
            continue

        # Integer pixel coordinates are used for drawing ...
        point_st = (int(edge_start[0]), int(edge_start[1]))
        point_end = (int(edge_end[0]), int(edge_end[1]))
        lines.append((point_st, point_end))
        line_midpoint.append(calculate_midpoint(point_st, point_end))

        # ... while the width is measured on the original float coordinates.
        width_pixels.append(distance(edge_start, edge_end))
        door_type.append(0)
        singles += 1

    return width_pixels, lines, line_midpoint, singles, door_type
def get_door_info_double(doors_info_double, width_pixels, lines, line_midpoint, door_type):
    """Append double-door detections to the lists built for single doors.

    One edge of each bounding box is selected according to the class label:

        1 -> bottom edge  (double_bottom)
        7 -> top edge     (double_upper)
        8 -> right edge   (double_right)
        4 -> left edge    (double_left)

    Detections with any other label are ignored.

    NOTE(review): the label ids come from the double-door model's label map,
    which is not visible in this file -- confirm that 4 really is
    ``double_left``.

    Args:
        doors_info_double: Iterable of items whose element 0 is a box
            ``(xmin, ymin, xmax, ymax)`` and element 1 is the class label.
        width_pixels: List extended in place with each edge length.
        lines: List extended in place with ``(start, end)`` integer points.
        line_midpoint: List extended in place with each edge midpoint.
        door_type: List extended in place with ``1`` per double door.

    Returns:
        Tuple ``(width_pixels, lines, line_midpoint, doubles, door_type)``;
        the lists are the same objects that were passed in and ``doubles`` is
        the number of doors added by this call.
    """
    doubles = 0

    for detection in doors_info_double:
        xmin, ymin, xmax, ymax = detection[0]
        label = detection[1]

        if label == 1:      # double_bottom
            edge_start, edge_end = (xmin, ymax), (xmax, ymax)
        elif label == 7:    # double_upper
            edge_start, edge_end = (xmin, ymin), (xmax, ymin)
        elif label == 8:    # double_right
            edge_start, edge_end = (xmax, ymin), (xmax, ymax)
        elif label == 4:    # double_left
            edge_start, edge_end = (xmin, ymin), (xmin, ymax)
        else:
            continue

        # Integer pixel coordinates are used for drawing ...
        point_st = (int(edge_start[0]), int(edge_start[1]))
        point_end = (int(edge_end[0]), int(edge_end[1]))
        lines.append((point_st, point_end))
        line_midpoint.append(calculate_midpoint(point_st, point_end))

        # ... while the width is measured on the original float coordinates.
        width_pixels.append(distance(edge_start, edge_end))
        door_type.append(1)
        doubles += 1

    return width_pixels, lines, line_midpoint, doubles, door_type
def pxl2meter(width_pixels, ratio):
    """Scale pixel widths by ``ratio`` and round to two decimals.

    Args:
        width_pixels: Iterable of widths measured in pixels.
        ratio: Multiplier applied to every width (presumably metres per
            pixel, going by the function name -- confirm with the caller).

    Returns:
        List of ``round(width * ratio, 2)`` values, in input order.
    """
    return [round(pixels * ratio, 2) for pixels in width_pixels]
def width_as_char(real_width):
    """Format every width as a string for use in annotation text.

    Args:
        real_width: Iterable of widths.

    Returns:
        List with the f-string rendering of each width, in input order.
    """
    return [f"{value}" for value in real_width]
def adjustannotations(OutputPdfStage1):
    """Tag line annotations of a PDF as length measurements.

    The PDF is copied page by page into a new writer. Every annotation whose
    ``/Subtype`` is ``/Line`` and whose ``/Contents`` contains the letter
    ``m`` receives a ``/Measure`` dictionary, ``/IT`` set to
    ``/LineDimension`` and ``/Subj`` set to ``"Length Measurement"``.

    Args:
        OutputPdfStage1: The PDF document as bytes.

    Returns:
        The rewritten PDF as bytes.
    """
    reader = PdfReader(BytesIO(OutputPdfStage1))
    writer = PdfWriter()
    writer.append_pages_from_reader(reader)

    # A PDF without an /Info dictionary yields metadata None, which
    # add_metadata cannot process -- only copy metadata when it exists.
    metadata = reader.metadata
    if metadata is not None:
        writer.add_metadata(metadata)

    for page in writer.pages:
        if "/Annots" not in page:
            continue
        for annot in page["/Annots"]:
            obj = annot.get_object()
            if obj.get("/Subtype") != "/Line":
                continue
            print("AWL ANNOT IF")
            # Only annotations whose text mentions "m" are treated as
            # measurements.
            if "/Contents" in obj and "m" in obj["/Contents"]:
                print("Tany IF")
                obj.update({
                    NameObject("/Measure"): DictionaryObject({
                        NameObject("/Type"): NameObject("/Measure"),
                        NameObject("/L"): DictionaryObject({
                            NameObject("/G"): FloatObject(1),
                            # NOTE(review): "sq m" is an area unit but this is
                            # a length measurement -- confirm what downstream
                            # viewers expect before changing it.
                            NameObject("/U"): TextStringObject("sq m"),
                        }),
                    }),
                    NameObject("/IT"): NameObject("/LineDimension"),
                    NameObject("/Subj"): TextStringObject("Length Measurement"),
                })
                print("After Update:", obj)

    # Serialise the modified document to memory and hand back the raw bytes.
    output_pdf_io = BytesIO()
    writer.write(output_pdf_io)
    print("Annotations updated and saved ")
    return output_pdf_io.getvalue()
def add_annotations_to_pdf(image, pdf_name, lines, char_width, line_midpoint, door_type):
    """Draw door measurements on the first page of a PDF as annotations.

    Three groups of annotations are added, in this order:
      1. a red free-text annotation with the width at every line midpoint,
      2. a red polyline annotation along every door line, whose content is
         ``"<width> m"``,
      3. a text (sticky-note) annotation reading "Single Door" or
         "Double Door" at every line midpoint.

    Points are multiplied by the page's derotation matrix before use, and the
    page rotation is set to 0 while annotating and restored afterwards.

    Args:
        image: Object with a ``size`` attribute (a Pillow image in
            ``main_run``); only ``size`` is read and it is not used further.
        pdf_name: PDF data passed to ``fitz.open('pdf', ...)``.
        lines: Sequence of ``(start, end)`` points for every door line.
        char_width: Width labels, indexed in parallel with ``lines`` and
            ``line_midpoint``.
        line_midpoint: Sequence of ``(x, y)`` midpoints.
        door_type: Sequence of flags; ``0`` means single door, anything else
            double door.

    Returns:
        The open ``fitz`` document carrying the new annotations.
    """
    _image_width, _image_height = image.size

    pdf_document = fitz.open('pdf', pdf_name)
    page = pdf_document[0]
    rotationOld = page.rotation
    # Taken before the rotation is reset so that it still describes the
    # page's original rotation.
    derotationMatrix = page.derotation_matrix
    print('rotationOld', rotationOld)
    if page.rotation != 0:
        page.set_rotation(0)
        print('rotationnew', page.rotation)

    # 1) Width label next to every door line.
    for idx, (mid_x, mid_y) in enumerate(line_midpoint):
        anchor = fitz.Point(mid_x, mid_y) * derotationMatrix
        label_box = fitz.Rect(anchor.x, anchor.y, anchor.x + 200, anchor.y + 50)
        label = page.add_freetext_annot(
            label_box, char_width[idx], fontsize=10, fontname="helv", text_color=(1, 0, 0)
        )
        label.update()

    # 2) The door line itself, drawn as a markup.
    for idx, segment in enumerate(lines):
        endpoints = [
            fitz.Point(*segment[0]) * derotationMatrix,
            fitz.Point(*segment[1]) * derotationMatrix,
        ]
        markup = page.add_polyline_annot(endpoints)
        markup.set_border(width=2, dashes=None)
        markup.set_colors(stroke=(1, 0, 0))
        markup.set_info(
            content=str(char_width[idx]) + ' m',
            subject='Perimeter Measurement',
            title="ADR Team",
        )
        markup.update()

    # 3) Sticky note naming the door kind.
    for idx, kind in enumerate(door_type):
        mid_x, mid_y = line_midpoint[idx]
        anchor = fitz.Point(mid_x, mid_y) * derotationMatrix
        note_text = "Single Door" if kind == 0 else "Double Door"
        note = page.add_text_annot((anchor.x, anchor.y), note_text)
        note.set_border(width=0.2, dashes=(1, 2))
        note.set_colors(stroke=(1, 0, 0), fill=None)
        note.update()

    page.set_rotation(rotationOld)
    return pdf_document
def _load_detector(num_classes, weights_path, device):
    """Build a detector, load its weights and put it on ``device`` in eval mode.

    Returns the model, or ``None`` (after printing the error) when loading
    the state dict raises ``RuntimeError``.
    """
    model = get_model(num_classes)
    try:
        # NOTE: torch.load unpickles the file -- only load trusted weights.
        # map_location lets GPU-saved weights load on a CPU-only machine.
        model.load_state_dict(torch.load(weights_path, map_location=device), strict=False)
    except RuntimeError as e:
        print(f"Error loading model state_dict: {e}")
        return None
    model.eval()
    model.to(device)
    return model


def main_run(img_pillow, pdf_fullpath, weights_path, weights_path2, pdf_name, pdfpath, ratio):
    """Detect single and double doors in a PDF plan and annotate their widths.

    Args:
        img_pillow: Ignored -- the page is always re-rendered from
            ``pdf_fullpath``.
        pdf_fullpath: The PDF *data* (not a path), per the original author's
            note.
        weights_path: Weights file for the single-door detector (10 classes
            including background).
        weights_path2: Weights file for the double-door detector (12 classes
            including background).
        pdf_name: Forwarded to ``google_sheet_Legend.legendGoogleSheets``.
        pdfpath: Forwarded to ``google_sheet_Legend.legendGoogleSheets``.
        ratio: Multiplier converting pixel widths to real widths.

    Returns:
        ``(annotatedimg, doc2, spreadsheet_url, list1, df_doors)`` where
        ``annotatedimg`` is a BGR array of the annotated first page, ``doc2``
        is the open annotated ``fitz`` document, ``list1`` is a DataFrame
        listing the annotations and ``df_doors`` holds the door counts.
        Returns ``None`` when either set of weights fails to load with a
        ``RuntimeError``.
    """
    img_pillow = convert2pillow(pdf_fullpath)

    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    # Single-door detector.
    model = _load_detector(10, weights_path, device)
    if model is None:
        return None

    # Double-door detector.
    model2 = _load_detector(12, weights_path2, device)
    if model2 is None:
        return None

    # Inference: each detector has its own score cutoff.
    doors_info = ev_model(img_pillow, model, device, 0.9)
    doors_info_double = ev_model(img_pillow, model2, device, 0.8)

    width_pixels, lines, line_midpoint, single_count, door_type = get_door_info(doors_info)
    width_pixels, lines, line_midpoint, double_count, door_type = get_door_info_double(
        doors_info_double, width_pixels, lines, line_midpoint, door_type
    )

    real_width = pxl2meter(width_pixels, ratio)
    char_width = width_as_char(real_width)

    pdf_document = add_annotations_to_pdf(
        img_pillow, pdf_fullpath, lines, char_width, line_midpoint, door_type
    )
    try:
        modified_pdf_data = pdf_document.tobytes()

        # Render the annotated first page for the preview image.
        pix = pdf_document[0].get_pixmap()
        preview = Image.frombytes('RGB', [pix.width, pix.height], pix.samples)
        annotatedimg = cv2.cvtColor(np.array(preview), cv2.COLOR_RGB2BGR)
    finally:
        # The stage-1 document is not needed past this point; release it
        # instead of leaking it on every call.
        pdf_document.close()

    OutputPdfStage2 = adjustannotations(modified_pdf_data)
    doc2 = fitz.open('pdf', OutputPdfStage2)

    # Door counts for the legend sheet.
    doors_count = {
        'Type': ['Single Doors', 'Double Doors'],
        'Quantity': [single_count, double_count],
    }
    df_doors = pd.DataFrame(doors_count).fillna(' ')
    _, _, _, spreadsheet_url, _ = google_sheet_Legend.legendGoogleSheets(
        df_doors, pdf_name, pdfpath
    )

    # List every annotation of the final document.
    # NOTE(review): the source's indentation was lost; this reading records a
    # row for every annotation that has a colour dict (colour fixed to red)
    # and prints the table once at the end. Confirm against the original.
    list1 = pd.DataFrame(columns=['content', 'id', 'subject', 'color'])
    for doc_page in doc2:
        for annot in doc_page.annots():
            annot_color = annot.colors
            if annot_color is None:
                continue
            stroke_color = annot_color.get('stroke')
            print('strokeee', stroke_color)
            list1.loc[len(list1)] = [
                annot.info['content'],
                annot.info['id'],
                annot.info['subject'],
                [255, 0, 0],
            ]
    print('list1', list1)

    return annotatedimg, doc2, spreadsheet_url, list1, df_doors
| # model_path = '/content/drive/MyDrive/combined.pth' | |
| # #pdf_name = data | |
| # for i in range(len(fullpath)): | |
| # main_run(fullpath[i], model_path, pdf_name[i]) |