Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import cv2 | |
| from shapely import Point, Polygon | |
| from shapely.geometry import box | |
| from shapely.wkt import dumps | |
def mask_to_polygons(mask):
    """Convert a mask array into a polygon tracing one of its outer contours.

    Args:
        mask: NumPy array; it is cast to ``uint8`` and handed to
            ``cv2.findContours`` with ``RETR_EXTERNAL``, so only outermost
            contours are considered.

    Returns:
        A shapely ``Polygon`` built from the first external contour that has
        at least three points, or ``None`` when no such contour exists.
    """
    contours, _ = cv2.findContours(
        mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    for contour in contours:
        # Contours come back shaped (N, 1, 2). reshape keeps a single-point
        # contour two-dimensional, whereas squeeze() would flatten it to (2,).
        points = contour.reshape(-1, 2)
        # A polygon ring needs at least three vertices; skip specks and lines
        # instead of letting Polygon() raise.
        if len(points) >= 3:
            return Polygon(points)
    return None
def calculate_centroid(bbox):
    """Return the midpoint ``(cx, cy)`` of a four-value box ``(x1, y1, x2, y2)``."""
    left, top, right, bottom = bbox
    return ((left + right) / 2, (top + bottom) / 2)
def text_in_segmentation(df_ocr, df_masks, box_col="box", text_col="text"):
    """Group OCR texts by the segmentation mask that contains them.

    An OCR entry belongs to a mask when the centroid of its bounding box lies
    strictly inside the mask polygon (``Polygon.contains``), e.g. to find
    labels such as "Soverom" or "Bad" inside a room mask.

    Args:
        df_ocr (pandas.DataFrame): OCR results with a ``text_col`` column and
            a ``box_col`` column holding ``(x1, y1, x2, y2)`` boxes.
        df_masks (pandas.DataFrame): One row per mask with a ``mask_id``
            column and a ``polygon`` column of coordinates accepted by
            ``shapely.Polygon``.
        box_col (str): Name of the OCR bounding-box column.
        text_col (str): Name of the OCR text column.

    Returns:
        pandas.DataFrame: One row per mask that contains at least one OCR
        entry, with columns ``text_col`` (list of matched texts), ``box_col``
        (list of matched boxes) and ``"mask_id"``. The same columns are
        present when nothing matches or an input is missing/empty.
    """
    columns = [text_col, box_col, "mask_id"]
    if df_ocr is None or df_ocr.empty or df_masks is None or df_masks.empty:
        return pd.DataFrame(columns=columns)

    # The OCR centroids do not depend on the mask, so build them once instead
    # of once per mask.
    ocr_items = [
        (Point(*calculate_centroid(ocr_box)), text, ocr_box)
        for text, ocr_box in zip(df_ocr[text_col], df_ocr[box_col])
    ]

    results = []
    for mask_row in df_masks.itertuples(index=False):
        poly = Polygon(mask_row.polygon)
        hits = [
            (text, ocr_box)
            for point, text, ocr_box in ocr_items
            if poly.contains(point)
        ]
        if hits:
            results.append({
                text_col: [text for text, _ in hits],
                box_col: [ocr_box for _, ocr_box in hits],
                "mask_id": mask_row.mask_id,
            })

    # Passing ``columns`` keeps the schema stable even when ``results`` is empty.
    return pd.DataFrame(results, columns=columns)
def text_in_bboxes(df_ocr, df_objdet, box_col="box", text_col="text"):
    """Pair each OCR text with every detection box that contains its centroid.

    Containment is inclusive: a centroid lying exactly on a detection box
    edge counts as inside. An OCR entry whose centroid falls inside several
    detection boxes yields one output row per box.

    Args:
        df_ocr (pandas.DataFrame): OCR results with a ``text_col`` column and
            a ``box_col`` column holding ``(x1, y1, x2, y2)`` boxes.
        df_objdet (pandas.DataFrame): Detection results with a ``mask_id``
            column and a ``bboxes`` column holding ``(x1, y1, x2, y2)`` boxes.
        box_col (str): Name of the OCR bounding-box column.
        text_col (str): Name of the OCR text column.

    Returns:
        pandas.DataFrame: Columns ``text_col``, ``box_col`` and ``"mask_id"``,
        one row per (OCR entry, containing detection) pair. The same columns
        are present when nothing matches or an input is missing/empty.
    """
    columns = [text_col, box_col, "mask_id"]
    if df_ocr is None or df_ocr.empty or df_objdet is None or df_objdet.empty:
        return pd.DataFrame(columns=columns)

    # Materialise the detections once rather than re-iterating the frame for
    # every OCR row.
    detections = [
        (row.mask_id, row.bboxes) for row in df_objdet.itertuples(index=False)
    ]

    results = []
    for text, ocr_box in zip(df_ocr[text_col], df_ocr[box_col]):
        ocr_cx, ocr_cy = calculate_centroid(ocr_box)
        for mask_id, objdet_box in detections:
            inside_x = objdet_box[0] <= ocr_cx <= objdet_box[2]
            inside_y = objdet_box[1] <= ocr_cy <= objdet_box[3]
            if inside_x and inside_y:
                results.append({
                    text_col: text,
                    box_col: ocr_box,
                    "mask_id": mask_id,
                })

    # Passing ``columns`` keeps the schema stable even when ``results`` is empty.
    return pd.DataFrame(results, columns=columns)
def oppholdsrom_plantegning(df_oppholdsrom, detected_boxes):
    """Count "Oppholdsrom" OCR entries that lie inside a detected floor plan.

    Used to count the number of bruksenheter (dwelling units) per floor plan.
    An entry is counted once when the centroid of its box lies strictly
    inside at least one detected box.

    Args:
        df_oppholdsrom (pandas.DataFrame): Rows with a ``"box"`` column
            holding ``(x1, y1, x2, y2)`` boxes.
        detected_boxes: Iterable of ``(x1, y1, x2, y2)`` boxes (list, tuple
            or array); ``None`` and empty iterables are treated as no boxes.

    Returns:
        int: Number of rows whose centroid is inside any detected box;
        ``0`` when either input is missing or empty.
    """
    if df_oppholdsrom is None or df_oppholdsrom.empty:
        return 0
    if detected_boxes is None:
        return 0
    # Materialise first: truth-testing a NumPy array directly is ambiguous,
    # and a list gives a reliable emptiness check for any iterable.
    boxes = list(detected_boxes)
    if not boxes:
        return 0

    detected_polygons = [box(x1, y1, x2, y2) for (x1, y1, x2, y2) in boxes]

    count = 0
    for ocr_box in df_oppholdsrom["box"]:
        centroid = Point(*calculate_centroid(ocr_box))
        if any(polygon.contains(centroid) for polygon in detected_polygons):
            count += 1
    return count
def safe_group_counts(df, key_col, list_col, count_name):
    """Sum the lengths of the lists in ``list_col`` for each ``key_col`` value.

    Args:
        df (pandas.DataFrame | None): Input rows.
        key_col (str): Column to group by.
        list_col (str): Column whose cells are sized objects (e.g. lists).
        count_name (str): Name of the resulting count column.

    Returns:
        pandas.DataFrame: Columns ``key_col`` and ``count_name`` with one row
        per distinct key. An empty frame with those two integer columns is
        returned when ``df`` is ``None``/empty or either column is missing.
    """
    unusable = (
        df is None
        or df.empty
        or key_col not in df.columns
        # A missing list column would raise KeyError below; treat it like any
        # other unusable input.
        or list_col not in df.columns
    )
    if unusable:
        return pd.DataFrame({
            key_col: pd.Series(dtype=int),
            count_name: pd.Series(dtype=int),
        })

    lengths = df[list_col].map(len)
    return lengths.groupby(df[key_col]).sum().reset_index(name=count_name)
def floor_levels_duplicates(floor_levels_df):
    """Collapse repeated floor numbers so each 'etasje' value appears once.

    When a floor number occurs more than once, the floor plan is assumed to
    belong to that floor, and only the first row for each floor is kept.

    Args:
        floor_levels_df (pandas.DataFrame | None): Rows with an ``'etasje'``
            (floor number) column.

    Returns:
        pandas.DataFrame | None: A de-duplicated copy when any floor number
        repeats; otherwise the input object itself, unchanged (including when
        it is ``None`` or empty).
    """
    if floor_levels_df is None or floor_levels_df.empty:
        return floor_levels_df

    floor_counts = floor_levels_df['etasje'].value_counts()
    if (floor_counts > 1).any():
        return floor_levels_df.drop_duplicates(subset='etasje', keep='first')
    return floor_levels_df
def _sorted_by_box_origin(df, box_col):
    """Return a copy of *df* ordered by the (xmin, ymin) corner of *box_col*."""
    coord_cols = ['xmin', 'ymin', 'xmax', 'ymax']
    ordered = df.copy()
    # Naming the columns keeps the expansion four columns wide even when the
    # frame has no rows; otherwise the assignment below would raise.
    ordered[coord_cols] = pd.DataFrame(
        ordered[box_col].tolist(),
        index=ordered.index,
        columns=coord_cols,
    )
    return ordered.sort_values(['xmin', 'ymin']).reset_index(drop=True)


def floor_levels_logic(floor_levels_df, df_oppholdsrom_filtered, df_mask):
    """Attach a floor number ('etasje') to each matched room text.

    Args:
        floor_levels_df (pandas.DataFrame): Detected floor labels with an
            ``'etasje'`` column and, for the multi-floor case, a ``'box'``
            column of ``(xmin, ymin, xmax, ymax)`` values.
        df_oppholdsrom_filtered (pandas.DataFrame): Rows with ``'text'`` and
            ``'mask_id'`` columns.
        df_mask (pandas.DataFrame): Mask rows with a ``'mask_id'`` column;
            only used in the multi-floor case, where the merged result must
            expose a ``'bboxes'`` column of ``(xmin, ymin, xmax, ymax)``
            values (presumably supplied by this frame - verify with caller).

    Returns:
        pandas.DataFrame | None: Columns ``'text'``, ``'mask_id'`` and
        ``'etasje'``; ``None`` when ``floor_levels_df`` has no rows.
    """
    antall_etasjer = len(floor_levels_df)

    if antall_etasjer == 1:
        # One floor and one floor plan: every room row belongs to that floor.
        # Assign the scalar so it is broadcast to all rows; combining Series
        # with different indexes would align on labels and leave NaN gaps.
        new_dataframe = df_oppholdsrom_filtered[['text', 'mask_id']].copy()
        new_dataframe['etasje'] = floor_levels_df['etasje'].iloc[0]
        return new_dataframe

    if antall_etasjer > 1:
        # Several floors: order mask boxes and floor-label boxes by their
        # top-left corner and pair them positionally, assuming the n-th mask
        # corresponds to the n-th floor label.
        merged = pd.merge(
            df_oppholdsrom_filtered, df_mask, on='mask_id', how='inner'
        )
        result_sorted = _sorted_by_box_origin(merged, 'bboxes')
        floor_levels_sorted = _sorted_by_box_origin(floor_levels_df, 'box')

        # Both frames carry a fresh 0..n-1 index, so the Series align by
        # position; if the lengths differ the shorter side is NaN-padded.
        return pd.DataFrame({
            'text': result_sorted['text'],
            'mask_id': result_sorted['mask_id'],
            'etasje': floor_levels_sorted['etasje'],
        })

    # No floor labels detected: nothing to assign.
    # TODO: the two-floors / two-floor-plans case requires a segmentation mask
    # containing "oppholdsrom"; the intended result has columns
    # floor_num, text, bbox, mask id.
    return None