| | from collections import defaultdict |
| | import pandas as pd |
| | import random |
| | import re |
| | import io |
| | import pypdfium2 as pdfium |
| | import fitz |
| | from PIL import Image, ImageDraw |
| | from PyPDF2 import PdfReader, PdfWriter |
| | from PyPDF2.generic import TextStringObject, NameObject, ArrayObject, FloatObject |
| | from PyPDF2.generic import NameObject, TextStringObject, DictionaryObject, FloatObject, ArrayObject |
| | from PyPDF2 import PdfReader |
| | from PyPDF2.generic import TextStringObject |
| | import numpy as np |
| | import cv2 |
| | from collections import defaultdict |
| | import random |
| | import fitz |
| | import PyPDF2 |
| | import io |
| | from PyPDF2.generic import TextStringObject |
| | from PyPDF2 import PdfReader, PdfWriter |
| |
|
| |
|
def convert2img(path):
    """Render the first page of the PDF at *path* as an OpenCV BGR image.

    Returns a numpy array in BGR channel order (OpenCV convention).
    """
    first_page = pdfium.PdfDocument(path).get_page(0)
    rgb_pixels = np.array(first_page.render().to_pil())
    return cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)
| |
|
def convert2pillow(path):
    """Render the first page of the PDF at *path* as a PIL image."""
    document = pdfium.PdfDocument(path)
    return document.get_page(0).render().to_pil()
| |
|
def calculate_midpoint(x1, y1, x2, y2):
    """Return the midpoint of segment (x1, y1)-(x2, y2), truncated to ints."""
    return (int((x1 + x2) / 2), int((y1 + y2) / 2))
| |
|
def read_text(input_pdf_path):
    """Extract word tuples from a PDF supplied as an in-memory payload.

    NOTE(review): only the LAST page's word list is returned — the loop
    variable is overwritten on every iteration.  Presumably the plans are
    single-page; verify for multi-page input.

    NOTE(review): apply_redactions() is called although no redaction
    annotations were added, which looks like a no-op leftover — confirm.
    """
    pdf_document = fitz.open('pdf',input_pdf_path)


    for page_num in range(pdf_document.page_count):
        page = pdf_document[page_num]
        # Word tuples; indices 0-3 are the bbox and index 4 the word text
        # (that is how find_text_in_plan consumes them below).
        text_instances = page.get_text("words")


        page.apply_redactions()
    return text_instances
| |
|
def normalize_text(text):
    """Strip every whitespace character from *text* and lowercase it.

    Any non-string input normalises to the empty string.
    """
    if isinstance(text, str):
        return re.sub(r'\s+', '', text).lower()
    return ""
| |
|
| |
|
def build_flexible_regex(term):
    """
    Match the full string, allowing whitespace or light punctuation between words,
    but not allowing extra words or partial matches.

    Bug fix: the original normalised *term* first (which strips ALL
    whitespace) and only then split it, so ``words`` was always a single
    chunk and the flexible separator was never inserted.  Split the raw term
    on whitespace first; each word is then lowercased (split pieces contain
    no whitespace, so this equals the old normalisation per word).
    """
    words = str(term).lower().split()
    # Words may be separated by nothing, whitespace, '.', ':' or '-'.
    pattern = r'[\s\.\:\-]*'.join(map(re.escape, words))
    full_pattern = rf'^{pattern}$'
    return re.compile(full_pattern, re.IGNORECASE)
| |
|
def flexible_search(df, search_terms):
    """
    Search for terms in column names and top N rows.
    Returns matched column indices and cell positions.
    """
    results = {term: {"col_matches": [], "cell_matches": []} for term in search_terms}
    # Normalise headers once; cells are normalised lazily per lookup.
    norm_headers = [normalize_text(header) for header in df.columns]
    rows_to_scan = min(3, len(df))
    total_cols = len(df.columns)

    for term in search_terms:
        rx = build_flexible_regex(term)
        bucket = results[term]

        bucket["col_matches"].extend(
            pos for pos, header in enumerate(norm_headers) if rx.search(header)
        )

        for r in range(rows_to_scan):
            for c in range(total_cols):
                if rx.search(normalize_text(df.iat[r, c])):
                    bucket["cell_matches"].append((r, c))

    return results
| |
|
| |
|
def generate_current_table_without_cropping(clm_idx, clmn_name, df):
    """Select columns *clm_idx* from *df* and rename them to *clmn_name*,
    keeping every row (no cropping)."""
    subset = df.iloc[:, clm_idx]
    print("hello I generated the selected columns table without cropping")
    subset.columns = clmn_name
    return subset
| |
|
| |
|
| |
|
def crop_rename_table(indices, clmn_name, clmn_idx, df):
    """Drop every row up to and including the deepest matched row (*indices*),
    then select columns *clmn_idx* and rename them to *clmn_name*."""
    first_data_row = max(indices) + 1
    trimmed = df.iloc[first_data_row:].reset_index(drop=True)
    picked = trimmed.iloc[:, clmn_idx]
    picked.columns = clmn_name
    return picked
| |
|
def clean_column_row(row):
    """Strip a leading '<digits>- ' prefix from each cell (stringified first)."""
    cleaned = []
    for cell in row:
        cleaned.append(re.sub(r'^\d+-\s*', '', str(cell)))
    return cleaned
| |
|
def details_in_another_table(clmn_name, clmn_idx, current_dfs, dfs):
    """Gather the selected columns from every OTHER table with the same column
    count, pushing each table's (cleaned) header down as a data row, then
    stack everything.  Returns None when no other table matches."""
    candidates = [
        other for other in dfs
        if other is not current_dfs and current_dfs.shape[1] == other.shape[1]
    ]
    if not candidates:
        return None

    pieces = []
    for other in candidates:
        subset = other.iloc[:, clmn_idx].copy()

        # The header of the other table is real data here: demote it to a row.
        header_as_row = pd.DataFrame([clean_column_row(subset.columns.tolist())])

        subset.columns = clmn_name
        header_as_row.columns = clmn_name

        pieces.append(pd.concat([header_as_row, subset], ignore_index=True))

    return pd.concat(pieces, ignore_index=True)
| |
|
def map_user_input_to_standard_labels(user_inputs):
    """Map free-form user column names onto the canonical labels used downstream.

    Each canonical label is claimed at most once, by the first input whose
    normalised text matches that label's pattern; unmatched inputs are dropped.
    """
    patterns = {
        'door_id': r'\b(?:door\s*)?(?:id|no|number)\b|\bdoor\s*name\b',
        'door_type': r'\b(?:\S+\s+)?door\s*type\b|\btype(?:\s+\w+)?\b',
        'structural_opening': r'\bstructural\s+opening\b',
        'width': r'\bwidth\b',
        'height': r'\bheight\b',
    }

    def squash(text):
        # Collapse internal whitespace, trim, lowercase.
        return re.sub(r'\s+', ' ', text.strip(), flags=re.MULTILINE).lower()

    mapped = {}
    for raw in user_inputs:
        candidate = squash(raw)
        for label, pattern in patterns.items():
            if label in mapped:
                continue
            if re.search(pattern, candidate, re.IGNORECASE):
                mapped[label] = raw
                break

    return mapped
| |
|
def analyse_cell_columns(cell_columns_appearance):
    """Reduce a flexible_search result to the FIRST cell hit and FIRST column
    hit per term, in term order."""
    cell_matches = []
    col_matches = []
    for hits in cell_columns_appearance.values():
        if hits['cell_matches']:
            cell_matches.append(hits['cell_matches'][0])
        if hits['col_matches']:
            col_matches.append(hits['col_matches'][0])
    return cell_matches, col_matches
| |
|
| | |
def get_row_column_indices(cell_clmn_indx):
    """Split (row, col) pairs into parallel row and column index lists."""
    rows = [pair[0] for pair in cell_clmn_indx]
    cols = [pair[1] for pair in cell_clmn_indx]
    return rows, cols
| |
|
| | |
def get_column_index(col_matches):
    """Return the column matches as a plain list (shallow copy)."""
    return list(col_matches)
| |
|
| |
|
def extract_tables(schedule):
    """Extract every table on every page of *schedule* as pandas DataFrames.

    Bug fix: the ``dfs`` accumulator used to be re-created inside the page
    loop, so tables from all but the LAST page were silently discarded.
    """
    doc = fitz.open("pdf", schedule)
    dfs = []
    for page in doc:
        for tab in page.find_tables():
            dfs.append(tab.to_pandas())
    return dfs
| |
|
def get_selected_columns(dfs, user_patterns):
    """Locate the table containing the user's columns and return it with the
    standardised column names; None when nothing matches.

    Bug fix: the header-match branch called
    generate_current_table_without_cropping() without its *clmn_name*
    argument, which raised a TypeError at runtime.
    """
    selected_columns_new = None

    # Standard output names keyed on how many patterns the user supplied;
    # invariant across tables, so computed once outside the loop.
    if len(user_patterns) == 2:
        clmn_name = ["door_id", "door_type"]
    elif len(user_patterns) == 3:
        clmn_name = ["door_id", "door_type", "structural opening"]
    elif len(user_patterns) == 4:
        clmn_name = ["door_id", "door_type", "width", "height"]
    else:
        clmn_name = []  # unexpected pattern count; nothing sensible to rename to

    for i in range(len(dfs)):
        cell_columns_appearance = flexible_search(dfs[i], user_patterns)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)

        if len(cell_matches) == 0 and len(col_matches) == 0:
            print(f"this is df {i}, SEARCH IN ANOTHER DF")
            continue

        # Every pattern matched a header cell of this table.
        if len(col_matches) == len(user_patterns):
            column_index_list = get_column_index(col_matches)
            print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")
            print(column_index_list)
            if len(dfs[i]) < 10:
                # Short table: the details probably live in another table.
                selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
            if len(dfs[i]) > 10:
                selected_columns_new = generate_current_table_without_cropping(column_index_list, clmn_name, dfs[i])

        # Every pattern matched a BODY cell (header is embedded in the data).
        if len(cell_matches) == len(user_patterns):
            row_index_list, column_index_list = get_row_column_indices(cell_matches)
            print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")
            if len(dfs[i]) < 10:
                selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
                break
            if len(dfs[i]) > 10:
                print(f"this is df {i} call crop_rename_table(indices, clmn_name, clmn_idx,df)")
                selected_columns_new = crop_rename_table(row_index_list, clmn_name, column_index_list, dfs[i])
                break
    return selected_columns_new
| |
|
| |
|
| |
|
def separate_main_secondary(input_user_clmn_names):
    """Split user column names: first four are 'main', the rest 'secondary'."""
    return input_user_clmn_names[:4], input_user_clmn_names[4:]
| |
|
| | |
| | |
def get_column_name(user_input_m):
    """Derive the standard MAIN column labels, honouring blanks in the input.

    A blank at position 3 (height) collapses width/height into a single
    'structural_opening' column; every blank position is dropped.
    """
    fixed_list = ["door_id", "door_type", "width", "height"]
    for pos, value in enumerate(user_input_m):
        if value == '':
            if pos == 3:
                fixed_list[2] = "structural_opening"
            fixed_list[pos] = ""
    return [name for name in fixed_list if name]
| |
|
| | |
def get_column_name_secondary(user_input_m):
    """Derive the standard SECONDARY column labels, dropping blank positions."""
    fixed_list = ["fire_rate", "acoustic_rate"]
    for pos, value in enumerate(user_input_m):
        if value == '':
            fixed_list[pos] = ""
    return [name for name in fixed_list if name]
| |
|
| |
|
| | |
def get_selected_columns_all(dfs, user_patterns):
    """Locate the table holding the user's main + secondary columns and return
    it with standardised names; None when nothing matches.

    Bug fixes:
    - the header-match branch called generate_current_table_without_cropping()
      without its *clmn_name* argument (TypeError);
    - the cell-match branch called details_in_another_table() and immediately
      overwrote the result with crop_rename_table(); the dead first call has
      been removed.
    """
    selected_columns_new = None

    # Everything below depends only on user_patterns, so hoist it out of the loop.
    main_info, secondary_info = separate_main_secondary(user_patterns)
    clmn_name_main = get_column_name(main_info)
    non_empty_main_info = [item for item in main_info if item]

    clmn_name_secondary = get_column_name_secondary(secondary_info)
    non_empty_secondary_info = [item for item in secondary_info if item]

    clmn_name = clmn_name_main + clmn_name_secondary
    non_empty_info = non_empty_main_info + non_empty_secondary_info

    print(f"clmn name: {clmn_name}")
    print(f"non-empty info: {non_empty_info}")

    for i in range(len(dfs)):
        cell_columns_appearance = flexible_search(dfs[i], non_empty_info)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)

        print(f"length of cell_matches: {len(cell_matches)}")
        print(f"cell_matches: {cell_matches}")
        print(clmn_name)

        if len(cell_matches) == 0 and len(col_matches) == 0:
            print(f"this is df {i}, SEARCH IN ANOTHER DF")
            continue

        # Every term matched a header cell of this table.
        if len(col_matches) == len(non_empty_info):
            column_index_list = get_column_index(col_matches)
            print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")
            print(column_index_list)
            if len(dfs[i]) < 10:
                # Short table: the details probably live in another table.
                selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
            if len(dfs[i]) > 10:
                selected_columns_new = generate_current_table_without_cropping(column_index_list, clmn_name, dfs[i])

        # Every term matched a BODY cell (header embedded in the data).
        if len(cell_matches) == len(non_empty_info):
            row_index_list, column_index_list = get_row_column_indices(cell_matches)
            print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")
            selected_columns_new = crop_rename_table(row_index_list, clmn_name, column_index_list, dfs[i])
            break
    return selected_columns_new
| |
|
| |
|
| | |
| | |
def get_st_op_pattern(selected_columns, user_input):
    """Return the user's structural-opening pattern (user_input[2]) when the
    selected table actually carries a 'structural_opening' column, else None."""
    if 'structural_opening' not in selected_columns.columns:
        return None
    return user_input[2]
| |
|
| |
|
def find_text_in_plan(label, x):
    """Collect every word tuple in *x* whose text (index 4) equals *label*.

    Returns three parallel lists: bbox midpoints in (x, y) order, the matched
    words, and the same midpoints with axes swapped, i.e. (y, x).
    """
    substring_coordinates = []
    words = []
    point_list = []
    for tpl in x:
        if tpl[4] != label:
            continue
        x0, y0, x1, y1 = tpl[0], tpl[1], tpl[2], tpl[3]
        substring_coordinates.append((int((x0 + x1) / 2), int((y0 + y1) / 2)))
        point_list.append((int((y0 + y1) / 2), int((x0 + x1) / 2)))
        words.append(tpl[4])
    return substring_coordinates, words, point_list
| |
|
| |
|
| |
|
def get_word_locations_plan(flattened_list, plan_texts):
    """Resolve each flattened tuple's label to plan coordinates.

    The tuple arity (2, 3 or 4 fields) decides which extra fields ride along
    with each location entry; labels with no hit are collected in not_found.
    """
    locations = []
    not_found = []
    arity = len(flattened_list[0])

    if arity == 2:
        for lbl, clr in flattened_list:
            coords, _words, _pts = find_text_in_plan(lbl, plan_texts)
            if not coords:
                not_found.append(lbl)
            locations.append((coords, lbl, clr))

    if arity == 3:
        for lbl, w, clr in flattened_list:
            coords, _words, _pts = find_text_in_plan(lbl, plan_texts)
            if not coords:
                not_found.append(lbl)
            locations.append((coords, lbl, clr, w))

    if arity == 4:
        for lbl, w, h, clr in flattened_list:
            coords, _words, _pts = find_text_in_plan(lbl, plan_texts)
            if not coords:
                not_found.append(lbl)
            locations.append((coords, lbl, clr, w, h))

    return locations, not_found
| | |
def get_repeated_labels(locations):
    """Return the set of labels (item[1]) that occur more than once."""
    counts = defaultdict(int)
    for item in locations:
        counts[item[1]] += 1
    return {label for label, n in counts.items() if n > 1}
| |
|
def get_cleaned_data(locations):
    """De-duplicate multi-hit coordinates.

    A label seen several times cycles through its candidate coordinates so
    successive occurrences pick different points.  Entries with an empty
    coordinate list are dropped; all trailing tuple fields are carried through
    unchanged.  Only arities 3/4/5 are handled (matching the producers).
    """
    processed = defaultdict(int)
    new_data = []
    if len(locations[0]) not in (3, 4, 5):
        return new_data

    for item in locations:
        coords, label = item[0], item[1]
        extras = item[2:]
        if len(coords) > 1:
            pick = coords[processed[label] % len(coords)]
            new_data.append(([pick], label) + extras)
            processed[label] += 1
        if len(coords) == 1:
            new_data.append((coords, label) + extras)

    return new_data
| |
|
| |
|
| | |
def get_width_info_tobeprinted(new_data):
    """Build the printable width strings.

    Arity-4 tuples already carry a preformatted width string; arity-5 tuples
    carry raw width/height values that are stripped of thousands separators
    and rendered as '<w> mm wide x <h> mm high' (whole floats shown as ints).
    """
    width_info_tobeprinted = []
    arity = len(new_data[0])

    def _fmt(value):
        # Drop thousands separators; show whole numbers without a decimal part.
        cleaned = re.sub(r",", "", value)
        return int(float(cleaned)) if float(cleaned).is_integer() else cleaned

    if arity == 4:
        for _, _, _, w in new_data:
            width_info_tobeprinted.append(w)
    if arity == 5:
        for _, _, _, w, h in new_data:
            width_info_tobeprinted.append(f"{_fmt(w)} mm wide x {_fmt(h)} mm high")
    return width_info_tobeprinted
| | |
def clean_dimensions(text):
    """Remove ' mm' unit markers (with any preceding commas/spaces) and then
    every remaining comma."""
    without_units = re.sub(r'[,\s]*mm', '', text)
    return without_units.replace(",", "")
| |
|
def get_cleaned_width(width_info_tobeprinted):
    """Apply clean_dimensions to every width string."""
    return [clean_dimensions(entry) for entry in width_info_tobeprinted]
| |
|
| |
|
def get_widths_bb_format(cleaned_width, kelma):
    """Convert 'WxH' strings into '<w> mm wide x <h> mm high' phrases.

    *kelma* tells us whether the schedule states dimensions as Width x Height;
    when it does not, the two numbers are assumed to be reversed.
    """
    width_first = re.search(r"\bW(?:idth)?\s*[×x]\s*H(?:eight)?\b", kelma) is not None
    widths = []
    for entry in cleaned_width:
        # Split on the rightmost of the recognised separator characters.
        split_at = max(entry.find(sep) for sep in ("x", "×", "X"))
        first = int(float(entry[:split_at]))
        second = int(float(entry[split_at + 1:]))
        if width_first:
            widths.append(f"{first} mm wide x {second} mm high")
        else:
            widths.append(f"{second} mm wide x {first} mm high")
    return widths
| |
|
| |
|
def get_width_info_tobeprinted_secondary(new_data, main_info, secondary_info):
    """Split each cleaned tuple into printable width info and secondary
    rating info.

    Tuple layout is positional:
    (coords, label, [width[, height]], <secondary rating fields>, color) —
    the secondary fields follow the selected table's column order.
    """
    width_info_tobeprinted = []
    secondary_info_tobeprinted = []
    n_main = len(main_info)
    n_sec = len(secondary_info)

    if n_main not in (2, 3, 4) or n_sec not in (1, 2):
        return width_info_tobeprinted, secondary_info_tobeprinted

    def _fmt(value):
        # Drop thousands separators; show whole numbers without a decimal part.
        cleaned = re.sub(r",", "", value)
        return int(float(cleaned)) if float(cleaned).is_integer() else cleaned

    n_dims = n_main - 2  # 0, 1 (width only) or 2 (width + height)
    for item in new_data:
        dims = item[2:2 + n_dims]
        sec = item[2 + n_dims:-1]  # everything between the dims and the color
        if n_dims == 1:
            width_info_tobeprinted.append(dims[0])
        elif n_dims == 2:
            width_info_tobeprinted.append(f"{_fmt(dims[0])} mm wide x {_fmt(dims[1])} mm high")
        if n_sec == 1:
            secondary_info_tobeprinted.append(sec[0])
        else:
            secondary_info_tobeprinted.append((sec[0], sec[1]))
    return width_info_tobeprinted, secondary_info_tobeprinted
| |
|
def get_word_locations_plan_secondary(flattened_list, plan_texts, main_info, secondary_info):
    """Resolve each flattened tuple's label (its first field) to plan
    coordinates, prepending the coordinate list to the tuple.

    In every handled (main, secondary) size combination the output entry is
    exactly ``(coordinates,) + original_tuple``; labels with no hit are
    collected in not_found.  Unhandled size combinations yield empty results.
    """
    locations = []
    not_found = []

    if len(main_info) in (2, 3, 4) and len(secondary_info) in (1, 2):
        for item in flattened_list:
            lbl = item[0]
            coords, _words, _pts = find_text_in_plan(lbl, plan_texts)
            if not coords:
                not_found.append(lbl)
            locations.append((coords,) + tuple(item))

    return locations, not_found
| |
|
| | |
| | def get_similar_colors_all(selected_columns_new): |
| | def generate_rgb(): |
| | return (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)) |
| |
|
| | unique_keys = selected_columns_new['door_type'].unique() |
| | key_colors = {key: generate_rgb() for key in unique_keys} |
| |
|
| | |
| | clmns_fields = selected_columns_new.columns.to_list() |
| |
|
| | def col_template(): |
| | d = { |
| | 'values': [], |
| | 'color': None |
| | } |
| | for field in clmns_fields: |
| | d[field] = [] |
| | return d |
| |
|
| | col_dict = defaultdict(col_template) |
| |
|
| | for _, row in selected_columns_new.iterrows(): |
| | key = row['door_type'] |
| | col_dict[key]['values'].append(row['door_id']) |
| |
|
| | for field in clmns_fields: |
| | col_dict[key][field].append(row.get(field, None)) |
| |
|
| | col_dict[key]['color'] = key_colors[key] |
| |
|
| | return dict(col_dict) |
| |
|
| | |
def get_flattened_tuples_list_all(col_dict):
    """Flatten the grouped dict into per-row tuples.

    Every list-valued field except 'door_type'/'values' contributes one
    element per row (in dict insertion order), with the group colour appended
    as the last element.
    """
    exclude_fields = ['door_type', 'values']
    flattened_list = []

    for group in col_dict.values():
        fields = [name for name, value in group.items()
                  if isinstance(value, list) and name not in exclude_fields]
        if not fields:
            continue
        for row_idx in range(len(group[fields[0]])):
            flattened_list.append(
                tuple(group[name][row_idx] for name in fields) + (group['color'],)
            )

    return flattened_list
| |
|
| |
|
| | |
def get_cleaned_data_secondary(locations, main_info, secondary_info):
    """De-duplicate multi-hit coordinates for tuples that also carry secondary
    rating fields.

    Works positionally: each entry is (coords, label, *rest); a repeated label
    cycles through its candidate coordinates, zero-coordinate entries are
    dropped, and *rest* is carried through unchanged.  Only main sizes 2-4
    with 1-2 secondary fields are handled.
    """
    processed = defaultdict(int)
    new_data = []
    if len(main_info) not in (2, 3, 4) or len(secondary_info) not in (1, 2):
        return new_data

    for item in locations:
        coords, label = item[0], item[1]
        extras = item[2:]
        if len(coords) > 1:
            pick = coords[processed[label] % len(coords)]
            new_data.append(([pick], label) + extras)
            processed[label] += 1
        if len(coords) == 1:
            new_data.append((coords, label) + extras)

    return new_data
| |
|
def get_secondary_tobeprinted_clean(selected_secondary_info, secondary_tobeprinted, secondary_info):
    """Render secondary ratings as display strings.

    With one secondary column, the selected column's name decides whether the
    values are acoustic or fire ratings.  With two columns each entry is a
    pair whose FIRST element is printed as the fire rating and SECOND as the
    acoustic rating.  NOTE(review): upstream variable names suggest the pairs
    are (acoustic, fire), but the fire-then-acoustic column ordering appears
    to cancel that out — verify against the producers before changing.
    """
    secondary_printed_clean = []

    if len(secondary_info) == 1:
        column_names = selected_secondary_info.columns
        if any('acoustic' in col for col in column_names):
            secondary_printed_clean.extend(
                f"acoustic rating: {value};" for value in secondary_tobeprinted
            )
        if any('fire' in col for col in column_names):
            secondary_printed_clean.extend(
                f"fire rating: {value};" for value in secondary_tobeprinted
            )

    if len(secondary_info) == 2:
        for first_rating, second_rating in secondary_tobeprinted:
            new_text = f"fire rating: {first_rating}; acoustic rating: {second_rating};"
            secondary_printed_clean.append(new_text)
            print(new_text)

    return secondary_printed_clean
| |
|
| |
|
def mix_width_secondary(widths, secondary_printed_clean):
    """Join each width string with its matching secondary-rating string.

    Indexes the second list explicitly so a shorter secondary list still
    raises IndexError, as before.
    """
    all_print = []
    for position, width_text in enumerate(widths):
        all_print.append(f"{width_text}; {secondary_printed_clean[position]}")
    return all_print
| |
|
def add_bluebeam_count_annotations_secondary(pdf_bytes, locations, main_info, secondary_info):
    """Stamp a Bluebeam-style 'Count' circle annotation at every located point
    on page 0 and return the annotated PDF as bytes.

    The (main, secondary) sizes decide the arity of each location tuple; in
    every combination the first field is the coordinate list, the second the
    label, and the last the 0-255 RGB colour.
    """
    pdf_document = fitz.open("pdf", io.BytesIO(pdf_bytes).read())
    page = pdf_document[0]

    def _stamp(points, label, rgb):
        # One white-filled circle per point, tagged so Bluebeam counts it.
        stroke = (rgb[0] / 255, rgb[1] / 255, rgb[2] / 255)
        for point in points:
            annot = page.add_circle_annot(
                fitz.Rect(point[0] - 10, point[1] - 10, point[0] + 10, point[1] + 10)
            )
            annot.set_colors(stroke=stroke, fill=(1, 1, 1))
            annot.set_border(width=2)
            annot.set_opacity(1)
            annot.set_info("name", label)
            annot.set_info("subject", "Count")
            annot.set_info("title", label)
            annot.update()

    n_main, n_sec = len(main_info), len(secondary_info)
    if n_main == 2 and n_sec == 1:
        for coor, lbl, acous, clr in locations:
            _stamp(coor, lbl, clr)
    elif n_main == 2 and n_sec == 2:
        for coor, lbl, acous, fire, clr in locations:
            _stamp(coor, lbl, clr)
    elif n_main == 3 and n_sec == 1:
        for loc in locations:
            # This combination alone tolerates (skips) malformed entries.
            if len(loc) != 5:
                continue
            coor, lbl, w, acous, clr = loc
            _stamp(coor, lbl, clr)
    elif n_main == 3 and n_sec == 2:
        for coor, lbl, w, acous, fire, clr in locations:
            _stamp(coor, lbl, clr)
    elif n_main == 4 and n_sec == 1:
        for coor, lbl, w, h, acous, clr in locations:
            _stamp(coor, lbl, clr)
    elif n_main == 4 and n_sec == 2:
        for coor, lbl, w, h, acous, fire, clr in locations:
            _stamp(coor, lbl, clr)

    output_stream = io.BytesIO()
    pdf_document.save(output_stream)
    pdf_document.close()

    return output_stream.getvalue()
| |
|
| |
|
def modify_author_in_pypdf2(pdf_bytes, new_authors):
    """Rewrite each annotation's author (/T) field, one author per annotation.

    Authors are consumed in document order across ALL pages; once the list is
    exhausted the last author is reused.  An empty list leaves annotations
    untouched.

    NOTE(review): an identical definition appears again later in this file
    and shadows this one at import time.
    """
    pdf_stream = io.BytesIO(pdf_bytes)
    reader = PyPDF2.PdfReader(pdf_stream)
    writer = PyPDF2.PdfWriter()

    # Global cursor into new_authors, shared across pages.
    author_index = 0

    for page in reader.pages:
        if "/Annots" in page:
            for annot in page["/Annots"]:
                annot_obj = annot.get_object()

                if len(new_authors) == 0:
                    break  # only exits this page's annotation loop
                if author_index < len(new_authors):
                    annot_obj.update({"/T": TextStringObject(new_authors[author_index])})
                    author_index += 1

                else:
                    # Ran out of authors: reuse the last one for the remainder.
                    annot_obj.update({"/T": TextStringObject(new_authors[-1])})

        writer.add_page(page)

    output_stream = io.BytesIO()
    writer.write(output_stream)
    output_stream.seek(0)

    return output_stream.read()
| |
|
| |
|
| |
|
| |
|
| |
|
def add_bluebeam_count_annotations(pdf_bytes, locations):
    """Stamp a Bluebeam-style 'Count' circle at every located point on page 0
    and return the annotated PDF as bytes.

    Each location tuple is (coords, label, color[, w[, h]]) — arities 3, 4
    and 5 are handled identically here; any width/height extras are ignored.
    """
    pdf_document = fitz.open("pdf", io.BytesIO(pdf_bytes).read())
    page = pdf_document[0]
    print(f"length of locations 0 from not sec presence: {len(locations[0])}")

    def _stamp(points, label, rgb):
        # One white-filled circle per point, tagged so Bluebeam counts it.
        stroke = (rgb[0] / 255, rgb[1] / 255, rgb[2] / 255)
        for point in points:
            annot = page.add_circle_annot(
                fitz.Rect(point[0] - 10, point[1] - 10, point[0] + 10, point[1] + 10)
            )
            annot.set_colors(stroke=stroke, fill=(1, 1, 1))
            annot.set_border(width=2)
            annot.set_opacity(1)
            annot.set_info("name", label)
            annot.set_info("subject", "Count")
            annot.set_info("title", label)
            annot.update()

    for loc in locations:
        if len(loc) in (3, 4, 5):
            _stamp(loc[0], loc[1], loc[2])

    output_stream = io.BytesIO()
    pdf_document.save(output_stream)
    pdf_document.close()

    return output_stream.getvalue()
| |
|
| |
|
| | |
def modify_author_in_pypdf2(pdf_bytes, new_authors):
    """Rewrite the ``/T`` (author) entry of every annotation in the PDF.

    Annotations across all pages are assigned authors from ``new_authors``
    in order; once the list is exhausted the last author is reused for the
    remaining annotations. An empty ``new_authors`` leaves all annotations
    untouched. Returns the rewritten PDF as bytes.
    """
    reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
    writer = PyPDF2.PdfWriter()

    next_author = 0

    for page in reader.pages:
        if "/Annots" in page:
            for annot_ref in page["/Annots"]:
                # Nothing to assign: stop touching this page's annotations.
                if not new_authors:
                    break
                annot = annot_ref.get_object()
                if next_author < len(new_authors):
                    annot.update({"/T": TextStringObject(new_authors[next_author])})
                    next_author += 1
                else:
                    # List exhausted: fall back to the final author.
                    annot.update({"/T": TextStringObject(new_authors[-1])})

        writer.add_page(page)

    buffer = io.BytesIO()
    writer.write(buffer)
    buffer.seek(0)

    return buffer.read()
| |
|
| |
|
| |
|
def merge_pdf_bytes_list(pdfs):
    """Concatenate several in-memory PDFs into one.

    ``pdfs`` is an iterable of raw PDF byte strings; every page of every
    input is appended, in order, to a single output document, which is
    returned as bytes.
    """
    writer = PdfWriter()

    for raw in pdfs:
        reader = PdfReader(io.BytesIO(raw))
        for page in reader.pages:
            writer.add_page(page)

    merged = io.BytesIO()
    writer.write(merged)

    return merged.getvalue()
| |
|
| |
|
def process_pdf_secondary(input_pdf_path, output_pdf_path, locations, new_authors, main_info, secondary_info):
    """Annotate a plan PDF (main + secondary info variant) and retag authors.

    ``input_pdf_path`` may be either raw PDF bytes or a filesystem path.
    ``output_pdf_path`` is accepted for interface compatibility but unused;
    the result is returned as bytes rather than written to disk.
    """
    if isinstance(input_pdf_path, bytes):
        source_bytes = input_pdf_path
    else:
        with open(input_pdf_path, "rb") as fh:
            source_bytes = fh.read()

    # Draw the count annotations first, then rewrite annotation authors.
    annotated = add_bluebeam_count_annotations_secondary(source_bytes, locations, main_info, secondary_info)
    return modify_author_in_pypdf2(annotated, new_authors)
| |
|
| |
|
def process_pdf(input_pdf_path, output_pdf_path, locations, new_authors):
    """Annotate a plan PDF with count markers and retag annotation authors.

    ``input_pdf_path`` may be either raw PDF bytes or a filesystem path.
    ``output_pdf_path`` is accepted for interface compatibility but unused;
    the result is returned as bytes rather than written to disk.
    """
    if isinstance(input_pdf_path, bytes):
        source_bytes = input_pdf_path
    else:
        with open(input_pdf_path, "rb") as fh:
            source_bytes = fh.read()

    # Draw the count annotations first, then rewrite annotation authors.
    annotated = add_bluebeam_count_annotations(source_bytes, locations)
    return modify_author_in_pypdf2(annotated, new_authors)
| |
|
def mainRun(schedule, plan, searcharray):
    """Drive the full schedule-to-annotated-plan pipeline.

    For every plan PDF, and for every search record in ``searcharray``,
    this extracts matching schedule columns, locates the corresponding
    words on the plan, stamps count annotations (via ``process_pdf`` /
    ``process_pdf_secondary``), merges the results, and finally harvests
    annotation metadata into a DataFrame.

    Parameters
    ----------
    schedule :
        Input forwarded to ``extract_tables`` — presumably the schedule
        PDF or its bytes; confirm against the caller.
    plan : sequence
        Plan documents; each element is opened with ``fitz.open("pdf", p)``,
        so raw PDF bytes are expected.
    searcharray : sequence
        One user-input record per search; indices 4 and 5 flag the presence
        of fire-rate / acoustic-rate "secondary" information.

    Returns
    -------
    tuple
        ``(annotatedimg, doc2, list1, repeated_labels, not_found)`` —
        a BGR numpy image of the merged PDF's first page, the merged
        fitz document, a DataFrame of annotation content/id/subject/color,
        the repeated labels from the LAST search iteration, and an
        (always empty) ``not_found`` list.

    NOTE(review): ``repeated_labels``, ``main_info``, ``secondary_info`` and
    ``final_pdf_bytes`` leak out of loop scope and would be unbound if
    ``plan`` or ``searcharray`` is empty — confirm inputs are non-empty.
    """
    print("mainRun is RUNNING")

    # Diagnostic dump of the incoming arguments (mixed-language debug prints
    # are preserved verbatim).
    eltype = type(plan)
    print(f"el type beta3 variable plan:: {eltype}")
    len_plan = len(plan)
    print(f"length of the plan's array is: {len_plan}")
    p1_type = type(plan[0])
    print(f"el mawgood fe p[0]: {p1_type}")

    print(f"search array: {searcharray}")

    # Parse the schedule into DataFrames once; reused for every search.
    dfs = extract_tables(schedule)

    pdfs = []
    for p in plan:
        pdf_document = fitz.open("pdf", p)

        page = pdf_document[0]
        rect = page.rect

        width_pdf = rect.width
        height_pdf = rect.height

        print(f"plan width: {width_pdf}")
        print(f"plan height: {height_pdf}")

        # Per-plan accumulators; pdf_outputs chains each search's output PDF
        # into the next iteration so annotations accumulate on one document.
        all_new_data = []
        all_widths = []
        pdf_outputs = []

        for j in range(len(searcharray)):
            user_input = searcharray[j]

            # Indices 4/5 of the record flag fire-rate / acoustic-rate data.
            secondary_presence = False
            if user_input[4] or user_input[5]:
                secondary_presence = True
                main_info_, secondary_info_ = separate_main_secondary(user_input)
                # Drop falsy placeholders from both halves.
                main_info = [item for item in main_info_ if item]
                secondary_info = [item for item in secondary_info_ if item]
                print("feh secondary information")
                if user_input[4]:
                    print("Fire rate mawgooda")
                if user_input[5]:
                    print("Acoustic Rate mawgooda")
            else:
                print("mafeesh secondary information")

            # Schedule columns relevant to this search, the width pattern
            # keyword (kelma), and the color-grouped search terms.
            selected_columns_combined = get_selected_columns_all(dfs, user_input)
            kelma = get_st_op_pattern(selected_columns_combined, user_input)
            col_dict = get_similar_colors_all(selected_columns_combined)
            flattened_list = get_flattened_tuples_list_all(col_dict)
            plan_texts = read_text(p)

            if secondary_presence:
                # Re-read plan words (redundant with the call above; kept).
                plan_texts = read_text(p)
                locations, not_found = get_word_locations_plan_secondary(flattened_list,plan_texts, main_info, secondary_info)
                new_data3 = get_cleaned_data_secondary(locations,main_info,secondary_info)

                all_new_data.append(new_data3)
                repeated_labels = get_repeated_labels(locations)
                # No pattern keyword: helper already returns widths directly.
                if kelma == None:
                    widths, secondary_tobeprinted = get_width_info_tobeprinted_secondary(new_data3, main_info, secondary_info)
                else:
                    width_info_tobeprinted, secondary_tobeprinted = get_width_info_tobeprinted_secondary(new_data3, main_info, secondary_info)
                    cleaned_width = get_cleaned_width(width_info_tobeprinted)
                    widths = get_widths_bb_format(cleaned_width, kelma)

                # Two-column schedules carry no width data to print.
                if selected_columns_combined.shape[1] == 2:
                    widths = []

                secondary_printed_clean = get_secondary_tobeprinted_clean(selected_columns_combined, secondary_tobeprinted, secondary_info)
                all_print = mix_width_secondary(widths, secondary_printed_clean)

                all_widths.append(all_print)

                # Chain onto the previous search's output so annotations
                # accumulate; first search starts from the raw plan bytes.
                if pdf_outputs:
                    final_pdf_bytes = process_pdf_secondary(pdf_outputs[j-1], "final_output_multiple_input_new2.pdf", all_new_data[j], all_widths[j], main_info, secondary_info)
                    pdf_outputs.append(final_pdf_bytes)
                else:
                    final_pdf_bytes = process_pdf_secondary(p, "final_output_multiple_input_new2.pdf", all_new_data[j], all_widths[j], main_info, secondary_info)
                    pdf_outputs.append(final_pdf_bytes)

            else:
                locations, not_found = get_word_locations_plan(flattened_list,plan_texts)
                new_data = get_cleaned_data(locations)

                all_new_data.append(new_data)
                repeated_labels = get_repeated_labels(locations)
                if kelma == None:
                    widths = get_width_info_tobeprinted(new_data)
                else:
                    width_info_tobeprinted = get_width_info_tobeprinted(new_data)
                    cleaned_width = get_cleaned_width(width_info_tobeprinted)
                    widths = get_widths_bb_format(cleaned_width, kelma)

                # Two-column schedules carry no width data to print.
                if selected_columns_combined.shape[1] == 2:
                    widths = []

                all_widths.append(widths)

                # NOTE(review): these flattened lists are computed but never
                # used afterwards — confirm whether they are dead code.
                flat_list_new_data = [item for sublist in all_new_data for item in sublist]
                flat_list_widths = [item for sublist in all_widths for item in sublist]

                # Chain onto the previous search's output (see above).
                if pdf_outputs:
                    final_pdf_bytes = process_pdf(pdf_outputs[j-1], "final_output_width_trial.pdf", all_new_data[j], all_widths[j])
                    pdf_outputs.append(final_pdf_bytes)
                else:
                    final_pdf_bytes = process_pdf(p, "final_output_width_trial.pdf", all_new_data[j], all_widths[j])
                    pdf_outputs.append(final_pdf_bytes)

        # Only the fully-chained result of the LAST search is kept per plan.
        pdfs.append(final_pdf_bytes)

    merged_pdf = merge_pdf_bytes_list(pdfs)
    # len(merged_pdf) is the byte count of the merged PDF, not a page count.
    print(f"number of pges of merged_pdf is {len(merged_pdf)} and its type is {type(merged_pdf)}")

    # Overrides the loop-scoped not_found; the returned value is always [].
    not_found = []
    doc2 =fitz.open('pdf',merged_pdf)
    len_doc2 = len(doc2)
    print(f"number of pges of doc2 is {len_doc2} and its type is {type(doc2)}")
    # Render page 0 to a PIL image, then convert to an OpenCV BGR array.
    page=doc2[0]
    pix = page.get_pixmap()
    pl=Image.frombytes('RGB', [pix.width,pix.height],pix.samples)
    img=np.array(pl)
    annotatedimg = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)

    # Collect annotation metadata across all pages of the merged document.
    list1=pd.DataFrame(columns=['content', 'id', 'subject','color'])

    for page in doc2:

        for annot in page.annots():

            annot_color = annot.colors
            if annot_color is not None:

                stroke_color = annot_color.get('stroke')
                fill_color = annot_color.get('fill')
                # Prefer stroke over fill when both exist (stroke wins since
                # its assignment comes last).
                if fill_color:
                    v='fill'

                if stroke_color:
                    v='stroke'
                # Scale the 0-1 fitz color back to 0-255 integers.
                x,y,z=int(annot_color.get(v)[0]*255),int(annot_color.get(v)[1]*255),int(annot_color.get(v)[2]*255)
                list1.loc[len(list1)] =[annot.info['content'],annot.info['id'],annot.info['subject'],[x,y,z]]
    return annotatedimg, doc2 , list1, repeated_labels , not_found
| |
|