Spaces:
Sleeping
Sleeping
| from PIL import Image | |
| from PIL import ImageFile | |
| ImageFile.LOAD_TRUNCATED_IMAGES = True | |
| import torch | |
| import random | |
| import numpy as np | |
| from transformers import MaskFormerFeatureExtractor, MaskFormerForInstanceSegmentation,MaskFormerImageProcessor | |
| from enum import Enum | |
| import json | |
| from flask import Flask | |
| from flask import request | |
| from flask_cors import CORS, cross_origin | |
| import base64 | |
| import io | |
| import os | |
| import cv2 | |
| from io import BytesIO | |
| import copy | |
| import sympy | |
| import imutils | |
| import gc | |
| import statistics | |
model_name = "facebook/maskformer-swin-large-ade"
os.environ['SENTENCE_TRANSFORMERS_HOME'] = '/code/.cache'
# BUG FIX: the key previously had a trailing space ('TRANSFORMERS_CACHE ')
# so the cache-directory setting was silently ignored by transformers.
os.environ['TRANSFORMERS_CACHE'] = '/code/.cache'
# NOTE(review): these env vars are set AFTER `transformers` is imported;
# HF reads them at import time, so consider moving them above the imports — verify.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MaskFormerForInstanceSegmentation.from_pretrained(model_name).to(device)
model.eval()  # inference only
processor1 = MaskFormerFeatureExtractor.from_pretrained(model_name)
processor2 = MaskFormerImageProcessor.from_pretrained(model_name)
# Kept for backward compatibility; identical to processor2.
processor = MaskFormerImageProcessor.from_pretrained(model_name)
class LABEL_TYPE(Enum):
    """Segmentation class ids used by the service.

    NOTE(review): values are assumed to follow the ADE20K label map used by
    facebook/maskformer-swin-large-ade — confirm against the model config.
    """
    WINDOW = 8
    WALL = 0
    FLOOR = 3
    CEILING = 5
| #================================Optimizer ===========================================================================================================# | |
def enhance_image(image, alpha=1.5, beta=20, sharpening_factor=1.5):
    """Adjust contrast and brightness of an image.

    Parameters:
    - image: input image (numpy array)
    - alpha: contrast gain (1.0 means no change)
    - beta: brightness offset added to every pixel
    - sharpening_factor: currently unused — the unsharp-mask step is
      disabled; kept in the signature for caller compatibility

    Returns:
    - the contrast/brightness adjusted image
    """
    # convertScaleAbs computes |alpha*pixel + beta| and saturates to uint8.
    return cv2.convertScaleAbs(image, alpha=alpha, beta=beta)
def enhance_image_quality(image, denoise_strength=10, sharpening_factor=1.5):
    """Denoise a colour image with non-local means filtering.

    Parameters:
    - image: input colour image (uint8)
    - denoise_strength: luminance filter strength (higher = more smoothing)
    - sharpening_factor: currently unused — the unsharp-mask step is
      disabled; kept in the signature for caller compatibility

    Returns:
    - the denoised image
    """
    # Args after the strength are: colour strength, template window, search window.
    return cv2.fastNlMeansDenoisingColored(image, None, denoise_strength, 10, 7, 21)
def enhance_image_with_erosion(image, erosion_kernel_size=3, iterations=1):
    """Apply morphological erosion to shrink bright regions.

    Parameters:
    - image: input image or mask
    - erosion_kernel_size: side length of the square structuring element
    - iterations: how many erosion passes to run

    Returns:
    - the eroded image
    """
    structuring = np.ones((erosion_kernel_size, erosion_kernel_size), np.uint8)
    return cv2.erode(image, structuring, iterations=iterations)
def optimize_image(image, erosion_kernel_size=3, dilation_kernel_size=3, iterations=1):
    """Clean an image with an opening pass followed by an extra dilation.

    Parameters:
    - image: input image or mask
    - erosion_kernel_size: kernel side for the opening (erode+dilate) step
    - dilation_kernel_size: kernel side for the extra dilation step
    - iterations: passes applied to each operation

    Returns:
    - the cleaned ("optimized") image
    """
    open_kernel = np.ones((erosion_kernel_size, erosion_kernel_size), np.uint8)
    grow_kernel = np.ones((dilation_kernel_size, dilation_kernel_size), np.uint8)
    # Opening removes small bright specks without shrinking larger regions...
    opened = cv2.morphologyEx(image, cv2.MORPH_OPEN, open_kernel, iterations=iterations)
    # ...then an additional dilation thickens whatever survived.
    return cv2.dilate(opened, grow_kernel, iterations=iterations)
| #================================Optimizer ===========================================================================================================# | |
| #================================Process Function ===================================================================================================# | |
def process_image(img, processor_type=1):
    """Run MaskFormer panoptic segmentation and locate the window segment.

    Parameters:
    - img: HxW(x3) numpy image array (RGB assumed — TODO confirm with callers)
    - processor_type: truthy selects processor1 (feature extractor),
      falsy selects processor2 (image processor)

    Returns:
    - dict with "mask" (HxW numpy array of panoptic segment ids) and
      "label_id" (the window segment id, or False when no window was found)
    """
    chosen = processor1 if processor_type else processor2
    pil_img = Image.fromarray(img)
    inputs = chosen(images=pil_img, return_tensors="pt")
    # Inference only — disable autograd (consistent with query_image below).
    with torch.no_grad():
        outputs = model(**inputs.to(device))
    # PIL .size is (W, H); target_sizes expects (H, W), hence the reversal.
    result = chosen.post_process_panoptic_segmentation(
        outputs, target_sizes=[pil_img.size[::-1]]
    )[0]
    segmentation_id = False
    for seg in result['segments_info']:
        # Use the enum instead of the magic number 8 (window class id).
        if seg['label_id'] == LABEL_TYPE.WINDOW.value:
            segmentation_id = seg['id']
    mask = result['segmentation'].detach().cpu().numpy()
    return {"mask": mask, "label_id": segmentation_id}
def process_mask(maskobj):
    """Turn a panoptic segmentation map into a 0/255 binary mask.

    Parameters:
    - maskobj: dict with "mask" (segment-id map) and "label_id" (target id)

    Returns:
    - uint8 array, 255 where the pixel belongs to the target segment, else 0
    """
    seg_map = np.array(maskobj["mask"])
    target_id = maskobj["label_id"]
    # Boolean match scaled up to the usual 0/255 mask convention.
    return ((seg_map == target_id) * 255).astype(np.uint8)
def apply_errsion(mask):
    """Shrink the mask inward by a fixed margin using erosion.

    Parameters:
    - mask: uint8 binary mask

    Returns:
    - the eroded mask (regions pulled in by ~13px of margin)
    """
    margin = 13
    structuring = np.ones((margin, margin), dtype=np.uint8)
    return cv2.erode(mask, structuring, iterations=1)
def clear_counter_noise(mask):
    """Zero out small contours that are likely noise.

    Any contour whose bounding-box area is below half of the average
    bounding-box area is erased from the mask in place.

    Parameters:
    - mask: uint8 binary mask (modified in place)

    Returns:
    - the cleaned mask
    """
    found = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    found = imutils.grab_contours(found)
    found = sorted(found, key=cv2.contourArea, reverse=True)
    box_areas = [w * h for (_, _, w, h) in (cv2.boundingRect(c) for c in found)]
    if box_areas:
        mean_area = statistics.mean(box_areas)
        for c in found:
            x, y, w, h = cv2.boundingRect(c)
            if w * h < 0.5 * mean_area:
                # Blank the whole bounding box of the small contour.
                mask[y:y + h, x:x + w] = 0
    return mask
def appx_best_fit_ngon(contour, n: int = 4) :
    """Approximate a contour by its best-fit convex n-gon (default quadrilateral).

    Starting from the convex hull, edges are repeatedly removed: deleting an
    edge replaces its two endpoints with the intersection of the adjacent
    edges, and the edge whose removal adds the least area is chosen each
    round, until only n vertices remain. Exact arithmetic is done in sympy.

    Parameters:
    - contour: OpenCV contour (array of points)
    - n: target number of vertices

    Returns:
    - list of [x, y] integer vertex pairs

    Raises:
    - ValueError if no edge can be removed (no candidate found in a round)
    """
    hull = cv2.convexHull(contour)
    hull = np.array(hull).reshape((len(hull), 2))
    # to sympy land
    hull = [sympy.Point(*pt) for pt in hull]
    # run until we cut down to n vertices
    while len(hull) > n:
        best_candidate = None
        # for all edges in hull ( <edge_idx_1>, <edge_idx_2> ) ->
        for edge_idx_1 in range(len(hull)):
            edge_idx_2 = (edge_idx_1 + 1) % len(hull)
            adj_idx_1 = (edge_idx_1 - 1) % len(hull)
            adj_idx_2 = (edge_idx_1 + 2) % len(hull)
            edge_pt_1 = sympy.Point(*hull[edge_idx_1])
            edge_pt_2 = sympy.Point(*hull[edge_idx_2])
            adj_pt_1 = sympy.Point(*hull[adj_idx_1])
            adj_pt_2 = sympy.Point(*hull[adj_idx_2])
            subpoly = sympy.Polygon(adj_pt_1, edge_pt_1, edge_pt_2, adj_pt_2)
            angle1 = subpoly.angles[edge_pt_1]
            angle2 = subpoly.angles[edge_pt_2]
            # we need to first make sure that the sum of the interior angles the edge
            # makes with the two adjacent edges is more than 180°
            # (otherwise the adjacent edges diverge and never intersect on the
            # correct side)
            if sympy.N(angle1 + angle2) <= sympy.pi:
                continue
            # find the new vertex if we delete this edge
            adj_edge_1 = sympy.Line(adj_pt_1, edge_pt_1)
            adj_edge_2 = sympy.Line(edge_pt_2, adj_pt_2)
            intersect = adj_edge_1.intersection(adj_edge_2)[0]
            # the area of the triangle we'll be adding
            area = sympy.N(sympy.Triangle(edge_pt_1, intersect, edge_pt_2).area)
            # should be the lowest
            if best_candidate and best_candidate[1] < area:
                continue
            # delete the edge and add the intersection of adjacent edges to the hull
            better_hull = list(hull)
            better_hull[edge_idx_1] = intersect
            del better_hull[edge_idx_2]
            best_candidate = (better_hull, area)
        if not best_candidate:
            raise ValueError("Could not find the best fit n-gon!")
        hull = best_candidate[0]
    # back to python land (truncate exact sympy coordinates to ints)
    hull = [[int(x), int(y)] for x, y in hull]
    return hull
def get_window_rect_box(mask):
    """Extract one best-fit quadrilateral per external contour in the mask.

    Parameters:
    - mask: uint8 binary mask of window regions

    Returns:
    - list of quadrilaterals, each a list of four [x, y] vertices
    """
    found, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
    return [appx_best_fit_ngon(c) for c in found]
def generator(img, model_name, processor_type, enhancer_fun=1, apply_errsion_p=True, clear_noise=True):
    """Full pipeline: enhance -> segment -> binarize -> clean -> extract window boxes.

    Parameters:
    - img: input image as a numpy array
    - model_name: unused here (model is a module-level global); kept in the
      signature for caller compatibility
    - processor_type: forwarded to process_image (selects the processor)
    - enhancer_fun: 1=optimize_image, 2=erosion, 3=denoise, other=contrast
    - apply_errsion_p: shrink the mask by a margin before box extraction
    - clear_noise: drop small noisy contours before box extraction

    Returns:
    - list of window quadrilaterals (possibly empty when no window is found)
    """
    torch.cuda.empty_cache()
    gc.collect()
    if enhancer_fun == 1:
        enhance_img = optimize_image(img)
    elif enhancer_fun == 2:
        enhance_img = enhance_image_with_erosion(img)
    elif enhancer_fun == 3:
        enhance_img = enhance_image_quality(img)
    else:
        enhance_img = enhance_image(img)
    maskobj = process_image(enhance_img, processor_type)
    if maskobj['label_id'] != False:
        mask = process_mask(maskobj)
    else:
        # Enhanced image missed the window — retry on the raw image.
        maskobj = process_image(img, processor_type)
        if maskobj['label_id'] == False:
            # BUG FIX: previously a double miss still called process_mask with
            # label_id == False, and `mask == False` matched segment id 0,
            # producing bogus boxes. Return no windows instead.
            return []
        mask = process_mask(maskobj)
    if apply_errsion_p:
        mask = apply_errsion(mask)
    if clear_noise:
        mask = clear_counter_noise(mask)
    return get_window_rect_box(mask)
| #========================================Process function =============================================================================================# | |
def query_image(img):
    """Run semantic segmentation and return a per-pixel class-id map.

    Parameters:
    - img: HxW(x3) numpy image array

    Returns:
    - HxW numpy array where each value is the argmax class id
    """
    target_size = (img.shape[0], img.shape[1])
    inputs = processor1(images=img, return_tensors="pt")
    with torch.no_grad():
        # BUG FIX: inputs were left on the CPU while the model may live on
        # CUDA, crashing with a device mismatch. Move them to the model's device.
        outputs = model(**inputs.to(device))
    results = processor1.post_process_segmentation(outputs=outputs, target_size=target_size)[0].cpu().detach()
    # Collapse the per-class score maps to a single class-id map.
    results = torch.argmax(results, dim=0).numpy()
    return results
def create_layer(label, data, image):
    """Build an RGBA cut-out of the requested region from a segmented image.

    Parameters:
    - label: LABEL_TYPE value; WALL selects wall pixels, anything else
      selects "everything that is not wall/window/ceiling" (the foreground)
    - data: numpy image array fed to the segmentation model
    - image: PIL image used as the colour source for the output layer

    Returns:
    - HxWx4 numpy array; alpha is 255 inside the selected region, 0 outside.
      BUG FIX: previously returned None when no contour was found, which
      crashed callers in Image.fromarray; now always returns an RGBA array
      (fully transparent when nothing matched).
    """
    seg_map = query_image(data)
    np_image = np.array(image)
    if label == LABEL_TYPE.WALL.value:
        binary_mask = (seg_map == LABEL_TYPE.WALL.value)
    else:
        # Foreground = everything that is not wall, window or ceiling.
        binary_mask = (
            (seg_map != LABEL_TYPE.WALL.value)
            & (seg_map != LABEL_TYPE.WINDOW.value)
            & (seg_map != LABEL_TYPE.CEILING.value)
        )
    object_mask = (binary_mask * 255).astype(np.uint8)
    # NOTE(review): the source is a PIL image (RGB); BGR2BGRA only appends an
    # alpha channel without swapping colours, so this is effectively RGB->RGBA.
    rgba_image = cv2.cvtColor(np_image, cv2.COLOR_BGR2BGRA)
    contour_mask = np.zeros_like(object_mask)
    contours, _ = cv2.findContours(object_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    for contour in contours:
        # Slightly simplify each contour before filling it into the alpha mask.
        epsilon = 0.003 * cv2.arcLength(contour, True)
        approx = cv2.approxPolyDP(contour, epsilon, True)
        cv2.drawContours(contour_mask, [approx], -1, (255), thickness=cv2.FILLED)
    # Use the filled contours as the alpha channel.
    rgba_image[:, :, 3] = contour_mask
    return rgba_image
app = Flask(__name__)
cors = CORS(app)
# Wide-open CORS: allow any origin/header (intended for a public demo service).
app.config.update(
    CORS_HEADERS='*',
    CORS_ORIGINS='*',
    CORS_EXPOSE_HEADERS='*',
)
def get_window_rec_points():
    """Flask handler: detect window rectangles in a base64-encoded image.

    Expects JSON body with "img" (base64, no data-URI prefix) and optional
    "processor_type", "enhance_function", "apply_errsion", "clear_noise".
    Returns JSON {"windows": [...], "num_of_windows": N}.

    NOTE(review): no @app.route decorator is visible here — confirm this
    handler is registered with the app elsewhere.
    """
    body = request.get_json()
    processor_type = 1
    enhance_function = 1
    apply_errsion = True
    clear_noise = True
    if body.get("processor_type"):
        processor_type = int(body['processor_type'])
    # BUG FIX: the three options below previously overwrote processor_type
    # instead of their own variables, so they were silently ignored.
    if body.get("enhance_function"):
        enhance_function = int(body['enhance_function'])
    if body.get("apply_errsion"):
        apply_errsion = bool(body['apply_errsion'])
    if body.get("clear_noise"):
        clear_noise = bool(int(body['clear_noise']))
    base64_str = body['img']
    img = Image.open(io.BytesIO(base64.decodebytes(bytes(base64_str, "utf-8"))))
    numpydata = np.asarray(img)
    print(f"=================In ================get_window_rec_points parameters {model_name} ,processor_type={processor_type} ,enhance_function={enhance_function},apply_errsion={apply_errsion},clear_noise={clear_noise}")
    vertices = generator(numpydata, model_name, processor_type, enhance_function, apply_errsion, clear_noise)
    print("==========================================================================")
    print(vertices)
    print("=============================================")
    response = app.make_response(json.dumps({"windows": vertices, "num_of_windows": len(vertices)}))
    response.content_type = "application/json"
    return response
def get_window_layer(is_floor):
    """Flask handler: return an RGBA layer (floor or wall) as base64 PNG.

    Parameters:
    - is_floor: "0" selects the WALL layer; anything else selects FLOOR

    Expects JSON body with "img": base64 image data WITHOUT the
    'data:image/jpeg;base64,' prefix. Returns JSON {"img": <base64 PNG>}.

    NOTE(review): no @app.route decorator is visible here — confirm this
    handler is registered with the app elsewhere.
    """
    body = request.get_json()
    base64_str = body['img']
    img = Image.open(io.BytesIO(base64.decodebytes(bytes(base64_str, "utf-8"))))
    numpydata = np.asarray(img)
    label = LABEL_TYPE.FLOOR.value
    if is_floor == "0":
        label = LABEL_TYPE.WALL.value
    # FIX: the image was previously base64-decoded and opened a second time
    # here for no reason; reuse the one decoded above.
    # (Also renamed `object` -> `layer`: it shadowed the builtin.)
    layer = create_layer(label, numpydata, img)
    img_cv = Image.fromarray(layer)
    buffered = BytesIO()
    img_cv.save(buffered, format="PNG")
    img_str = base64.b64encode(buffered.getvalue())
    response = app.make_response(json.dumps({
        "img": img_str.decode("utf-8")
    }))
    response.content_type = "application/json"
    return response
# NOTE(review): debug=True enables the Werkzeug debugger/reloader — do not
# ship to production; consider an `if __name__ == "__main__":` guard so the
# server does not start on import (e.g. under gunicorn) — verify deployment.
app.run(debug=True,port="7860")