Spaces:
Sleeping
Sleeping
| from collections import defaultdict | |
| from collections import Counter | |
| import pandas as pd | |
| import random | |
| import math | |
| import re | |
| import io | |
| import pypdfium2 as pdfium | |
| import fitz | |
| from PIL import Image, ImageDraw | |
| from PyPDF2 import PdfReader, PdfWriter | |
| from PyPDF2.generic import TextStringObject, NameObject, ArrayObject, FloatObject | |
| from PyPDF2.generic import NameObject, TextStringObject, DictionaryObject, FloatObject, ArrayObject | |
| from PyPDF2 import PdfReader | |
| from PyPDF2.generic import TextStringObject | |
| import numpy as np | |
| import cv2 | |
| from collections import defaultdict | |
| import random | |
| import fitz # PyMuPDF | |
| import PyPDF2 | |
| import io | |
| from PyPDF2.generic import TextStringObject # ✅ Required for setting string values | |
| from PyPDF2 import PdfReader, PdfWriter | |
| import zlib | |
| import base64 | |
| import datetime | |
| import uuid | |
| from xml.etree.ElementTree import Element, SubElement, tostring, ElementTree | |
| from xml.dom.minidom import parseString | |
| from collections import defaultdict | |
| from xml.etree.ElementTree import Element, SubElement, tostring | |
| from azure.ai.formrecognizer import DocumentAnalysisClient | |
| from azure.core.credentials import AzureKeyCredential | |
| import chardet | |
def convert2img(path):
    """Render page 0 of the PDF at *path* as an OpenCV-style BGR numpy image."""
    document = pdfium.PdfDocument(path)
    first_page = document.get_page(0)
    rgb_array = np.array(first_page.render().to_pil())
    return cv2.cvtColor(rgb_array, cv2.COLOR_RGB2BGR)
def convert2pillow(path):
    """Render page 0 of the PDF at *path* and return it as a PIL image."""
    document = pdfium.PdfDocument(path)
    return document.get_page(0).render().to_pil()
def calculate_midpoint(x1, y1, x2, y2):
    """Return the integer midpoint of the segment (x1, y1)-(x2, y2)."""
    return (int((x1 + x2) / 2), int((y1 + y2) / 2))
def read_text(input_pdf_path):
    """Extract word-level text tuples from a PDF supplied as raw bytes.

    Parameters
    ----------
    input_pdf_path : bytes
        Raw PDF data — fitz.open('pdf', ...) takes a bytes stream, not a path.

    Returns
    -------
    list
        PyMuPDF "words" tuples (x0, y0, x1, y1, word, ...).
        NOTE(review): text_instances is reassigned on every iteration and
        returned after the loop, so only the LAST page's words come back —
        confirm whether accumulating all pages was intended.
    """
    pdf_document = fitz.open('pdf', input_pdf_path)
    for page_num in range(pdf_document.page_count):
        page = pdf_document[page_num]
        text_instances = page.get_text("words")
        # NOTE(review): no redaction annotations are added before this call,
        # so apply_redactions() is presumably a no-op here — confirm intent.
        page.apply_redactions()
    return text_instances
def normalize_text(text):
    """
    Lowercase *text* with every whitespace character removed; non-strings yield "".
    """
    if not isinstance(text, str):
        return ""
    # Collapse spaces, tabs and newlines away before lowercasing.
    return re.sub(r'\s+', '', text).lower()
def build_flexible_regex(term):
    """Build an anchored, case-insensitive regex matching *term* as a whole string.

    Words of *term* may be separated in the target by whitespace, dots, colons
    or hyphens, but extra words and partial matches are rejected (^...$).

    Bug fix: the original stripped ALL whitespace from the term BEFORE calling
    .split(), so a multi-word term always collapsed into a single token and the
    inter-word separator pattern never applied (e.g. "door id" could not match
    "door.id").  Split first, then lowercase each word.

    Parameters
    ----------
    term : str
        Search term, e.g. "door id".

    Returns
    -------
    re.Pattern
    """
    if not isinstance(term, str):
        term = ""
    # Split on whitespace first, then lowercase each word individually.
    words = [w.lower() for w in term.split()]
    # Optional whitespace / light punctuation allowed between consecutive words.
    pattern = r'[\s\.\:\-]*'.join(map(re.escape, words))
    full_pattern = rf'^{pattern}$'
    return re.compile(full_pattern, re.IGNORECASE)
def flexible_search(df, search_terms):
    """Search *search_terms* in df column headers and the first 3 data rows.

    Parameters
    ----------
    df : pandas.DataFrame
    search_terms : iterable of str

    Returns
    -------
    dict
        term -> {"col_matches": [col_idx, ...],
                 "cell_matches": [(row_idx, col_idx), ...]}

    Fix: removed the unused `normalized_columns` list the original built and
    never read.
    """
    results = {term: {"col_matches": [], "cell_matches": []} for term in search_terms}
    for term in search_terms:
        regex = build_flexible_regex(term)
        # Match against the normalized column headers.
        for col_idx, col_text in enumerate(df.columns):
            if regex.search(normalize_text(col_text)):
                results[term]["col_matches"].append(col_idx)
        # Match against the cells of the first min(3, len(df)) rows.
        for row_idx in range(min(3, len(df))):
            for col_idx in range(len(df.columns)):
                cell_text = normalize_text(df.iat[row_idx, col_idx])
                if regex.search(cell_text):
                    results[term]["cell_matches"].append((row_idx, col_idx))
    return results
| """def generate_current_table_without_cropping(clm_idx, clmn_name, df): | |
| selected_df = df.iloc[:, clm_idx] | |
| print("hello I generated the selected columns table without cropping") | |
| selected_df.columns = clmn_name | |
| return selected_df""" | |
def generate_current_table_without_cropping(clm_idx, df):
    """Return the columns of *df* selected by the position list *clm_idx*, uncropped."""
    chosen = df.iloc[:, clm_idx]
    print("hello I generated the selected columns table without cropping")
    return chosen
def crop_rename_table(indices, clmn_name, clmn_idx, df):
    """Crop header rows away, then select and rename the requested columns.

    Parameters
    ----------
    indices : list[int]
        Row indices where header text was found; every row up to and
        including max(indices) is dropped.
    clmn_name : list[str]
        Standard names for the selected columns (len must equal len(clmn_idx)).
    clmn_idx : list[int]
        Positional indices of the columns to keep.
    df : pandas.DataFrame

    Returns
    -------
    pandas.DataFrame
        The cropped selection with a fresh 0..n index and renamed columns.

    Fix: select onto an explicit .copy() — assigning .columns on an .iloc
    view can raise SettingWithCopyWarning or silently fail on a view.
    """
    crop_at = max(indices) + 1
    cropped = df.iloc[crop_at:].reset_index(drop=True)
    selected = cropped.iloc[:, clmn_idx].copy()
    selected.columns = clmn_name
    return selected
def clean_column_row(row):
    """Strip a leading "<digits>- " prefix from every cell (cells coerced to str)."""
    cleaned = []
    for cell in row:
        cleaned.append(re.sub(r'^\d+-\s*', '', str(cell)))
    return cleaned
def details_in_another_table(clmn_name, clmn_idx, current_dfs, dfs):
    """Collect the matching columns from every OTHER table of the same width.

    Used when the header table and the data rows were extracted as separate
    tables: siblings with the same column count are treated as continuations.

    Parameters
    ----------
    clmn_name : list[str]
        Standardized names to assign to the selected columns.
    clmn_idx : list[int]
        Positional indices of the columns to select in each candidate table.
    current_dfs : pandas.DataFrame
        The table the headers were found in (excluded via identity check).
    dfs : list[pandas.DataFrame]
        All extracted tables.

    Returns
    -------
    pandas.DataFrame or None
        Concatenation over all same-width siblings, each sibling's cleaned
        header row prepended as a data row; None when no sibling matches.
    """
    # Candidates: every other table with the same number of columns.
    matching_dfs = [
        dff for dff in dfs
        if dff is not current_dfs and current_dfs.shape[1] == dff.shape[1]
    ]
    if not matching_dfs:
        return None
    updated_dfs = []
    for dff in matching_dfs:
        selected_dff = dff.iloc[:, clmn_idx].copy()
        # A sibling's "header" row usually holds real data (e.g. "12- D01"),
        # so clean the numeric prefix and keep it as the first data row.
        cleaned_header = clean_column_row(selected_dff.columns.tolist())
        col_names_as_row = pd.DataFrame([cleaned_header])
        # Rename both pieces to the standard labels so they align in concat.
        selected_dff.columns = clmn_name
        col_names_as_row.columns = clmn_name
        # Combine the cleaned header row with the sibling's data.
        temp_df = pd.concat([col_names_as_row, selected_dff], ignore_index=True)
        updated_dfs.append(temp_df)
    combined_df = pd.concat(updated_dfs, ignore_index=True)
    return combined_df
def map_user_input_to_standard_labels(user_inputs):
    """Map free-form column labels to the standard door-schedule field names.

    Each entry of *user_inputs* is assigned to the first still-unclaimed
    standard label whose pattern it matches; unmatched entries are dropped.
    Returns a dict: standard label -> original user string.
    """
    patterns = {
        'door_id': r'\b(?:door\s*)?(?:id|no|number)\b|\bdoor\s*name\b',
        'door_type': r'\b(?:\S+\s+)?door\s*type\b|\btype(?:\s+\w+)?\b',
        'structural_opening': r'\bstructural\s+opening\b',
        'width': r'\bwidth\b',
        'height': r'\bheight\b',
    }

    def squash(text):
        # Collapse whitespace runs and lowercase to tolerate typing noise.
        return re.sub(r'\s+', ' ', text.strip(), flags=re.MULTILINE).lower()

    mapped = {}
    for raw in user_inputs:
        candidate = squash(raw)
        for label, pattern in patterns.items():
            if label in mapped:
                continue
            if re.search(pattern, candidate, re.IGNORECASE):
                mapped[label] = raw
                break
    return mapped
def analyse_cell_columns(cell_columns_appearance):
    """Take the FIRST cell hit and the FIRST column hit recorded per term.

    Returns (cell_matches, col_matches): a list of (row, col) tuples and a
    list of column indices — one entry per term that had hits of that kind.
    """
    cell_matches = [
        hits['cell_matches'][0]
        for hits in cell_columns_appearance.values()
        if hits['cell_matches']
    ]
    col_matches = [
        hits['col_matches'][0]
        for hits in cell_columns_appearance.values()
        if hits['col_matches']
    ]
    return cell_matches, col_matches
# when column names are located in the cells
def get_row_column_indices(cell_clmn_indx):
    """Split (row, col) tuples into parallel row-index and column-index lists."""
    row_index = [pair[0] for pair in cell_clmn_indx]
    column_index = [pair[1] for pair in cell_clmn_indx]
    return row_index, column_index
# when column names are located in the columns themselves
def get_column_index(col_matches):
    """Return the column matches as a plain list (shallow copy)."""
    return list(col_matches)
def extract_tables(schedule):
    """Extract the tables of a PDF (given as raw bytes) as pandas DataFrames.

    NOTE(review): `dfs` is re-initialized inside the page loop and returned
    after it, so only the LAST page's tables are returned — confirm whether
    accumulating across pages was intended (original indentation was lost
    in the export this file came from).
    """
    doc = fitz.open("pdf", schedule)
    for page in doc:
        tabs = page.find_tables()
        dfs = []
        for tab in tabs:
            df = tab.to_pandas()
            dfs.append(df)
    return dfs
def get_selected_columns(dfs, user_patterns):
    """Locate the requested door-schedule columns across the extracted tables.

    For each table, the user's terms are searched both in column headers and
    in the first rows (headers sometimes land in data cells).  Short tables
    (< 10 rows) are treated as header-only, so the data is pulled from
    sibling tables; long tables (> 10 rows) carry their own data.

    Parameters
    ----------
    dfs : list[pandas.DataFrame]
        Tables from extract_tables().
    user_patterns : list[str]
        2, 3 or 4 search terms; the count fixes the standard label set.

    Returns
    -------
    pandas.DataFrame or None
        The selected/renamed columns, or None when nothing matched fully.

    NOTE(review): the 3-term label uses "structural opening" (space) while
    other code checks 'structural_opening' (underscore) — confirm which is
    canonical.  Also, only the cells branch breaks out of the scan; the
    columns branch keeps scanning and a later table may overwrite the result.
    """
    selected_columns = []
    selected_columns_new = None  # Initialize selected_columns_new to None
    for i in range(len(dfs)):
        cell_columns_appearance = flexible_search(dfs[i], user_patterns)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)
        # The number of terms decides which standard labels apply.
        if len(user_patterns) == 2:
            clmn_name = ["door_id", "door_type"]
        if len(user_patterns) == 4:
            clmn_name = ["door_id", "door_type", "width", "height"]
        if len(user_patterns) == 3:
            clmn_name = ["door_id", "door_type", "structural opening"]
        if len(cell_matches) == 0 and len(col_matches) == 0:
            print(f"this is df {i}, SEARCH IN ANOTHER DF")
        else:
            #IN COLUMNS: every term matched a column header.
            if len(col_matches) == len(user_patterns):
                column_index_list = get_column_index(col_matches)
                print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")
                print(column_index_list)
                # Short table => header-only; data lives in a sibling table.
                if len(dfs[i]) <10:
                    selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
                #details in the same table
                if len(dfs[i]) >10:
                    selected_columns_new = generate_current_table_without_cropping(column_index_list,dfs[i])
                #break
            #IN CELLS: every term matched inside a data cell.
            if len(cell_matches) == len(user_patterns):
                row_index_list, column_index_list = get_row_column_indices(cell_matches)
                print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")
                #details in another table
                if len(dfs[i]) <10:
                    selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
                    break
                #details in the same table: crop everything above the header row.
                if len(dfs[i]) >10:
                    print(f"this is df {i} call crop_rename_table(indices, clmn_name, clmn_idx,df)")
                    selected_columns_new = crop_rename_table(row_index_list, clmn_name, column_index_list,dfs[i])
                    break
    return selected_columns_new
def separate_main_secondary(input_user_clmn_names):
    """Split the user's fields into the first four (main) and the rest (secondary)."""
    return input_user_clmn_names[:4], input_user_clmn_names[4:]
# take main info
def get_column_name(user_input_m):
    """Derive the standard main-field names from the user's 4-slot main input.

    Empty slots are dropped from the fixed template; when the height slot
    (index 3) is empty but the width slot (index 2) is not, slot 2 is
    relabelled "structural_opening".
    """
    blank_positions = [pos for pos, value in enumerate(user_input_m) if value == '']
    template = ["door_id", "door_type", "width", "height"]
    for k, pos in enumerate(blank_positions):
        if pos == 3:
            # k-1 wraps to the last blank when k == 0, mirroring the
            # original lookup order exactly.
            if blank_positions[k - 1] == 2:
                template[2] = ""
            else:
                template[2] = "structural_opening"
        template[pos] = ""
    return [name for name in template if name]
# take secondary info
def get_column_name_secondary(user_input_m):
    """Derive the standard secondary-field names, dropping slots left empty."""
    template = ["fire_rate", "acoustic_rate"]
    for pos, value in enumerate(user_input_m):
        if value == '':
            template[pos] = ""
    return [name for name in template if name]
### byte type not path
def extract_tables_model(schedule_byte):
    """Run Azure Document Intelligence "prebuilt-layout" on raw PDF bytes.

    Parameters
    ----------
    schedule_byte : bytes
        The PDF document content (not a file path).

    Returns
    -------
    list[pandas.DataFrame]
        One DataFrame per detected table, cells placed by (row, col) index.

    Raises
    ------
    KeyError
        If the required environment variables are not set.

    SECURITY FIX: the endpoint and API key were hard-coded in source; they
    are now read from AZURE_FORMREC_ENDPOINT / AZURE_FORMREC_KEY.  The key
    previously committed here must be considered leaked — rotate it.
    """
    import os
    endpoint = os.environ["AZURE_FORMREC_ENDPOINT"]
    key = os.environ["AZURE_FORMREC_KEY"]
    # Create client and submit the document for layout analysis.
    client = DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key))
    poller = client.begin_analyze_document("prebuilt-layout", document=schedule_byte)
    result = poller.result()
    tables = []
    for table in result.tables:
        # Cells are sparse; size the grid from the maximum indices seen.
        max_cols = max(cell.column_index for cell in table.cells) + 1
        max_rows = max(cell.row_index for cell in table.cells) + 1
        table_data = [["" for _ in range(max_cols)] for _ in range(max_rows)]
        for cell in table.cells:
            table_data[cell.row_index][cell.column_index] = cell.content
        tables.append(pd.DataFrame(table_data))
    return tables
#handling both main and secondary info together in one table
# NOTE(review): this definition is SHADOWED by the re-definition of
# get_selected_columns_all further down in this module — the later def wins
# at import time, so this one is dead code kept for reference.
def get_selected_columns_all(dfs, user_patterns):
    """Locate main + secondary door-schedule columns across tables (old logic).

    user_patterns: 4 main slots + up to 2 secondary slots; empty strings mean
    "not requested".  Matches are searched in both column headers and cells.
    Returns the selected/renamed DataFrame, or None when nothing matched.
    """
    selected_columns = []
    selected_columns_new = None  # Initialize selected_columns_new to None
    for i in range(len(dfs)):
        main_info, secondary_info = separate_main_secondary(user_patterns)
        clmn_name_main = get_column_name(main_info)
        non_empty_main_info = [item for item in main_info if item]
        clmn_name_secondary = get_column_name_secondary(secondary_info)
        non_empty_secondary_info = [item for item in secondary_info if item]
        clmn_name = clmn_name_main + clmn_name_secondary
        non_empty_info = non_empty_main_info + non_empty_secondary_info
        print(f"clmn name: {clmn_name}")
        print(f"non-empty info: {non_empty_info}")
        cell_columns_appearance = flexible_search(dfs[i], non_empty_info)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)
        print(f"length of cell_matches: {len(cell_matches)}")
        print(f"cell_matches: {cell_matches}")
        print(clmn_name)
        if len(cell_matches) == 0 and len(col_matches) == 0:
            print(f"this is df {i}, SEARCH IN ANOTHER DF")
        else:
            #IN COLUMNS: every requested field matched a column header.
            if len(col_matches) == len(non_empty_info):
                column_index_list = get_column_index(col_matches)
                print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")
                #details in another table
                print(column_index_list)
                # Short table => header-only; data lives in sibling tables.
                if len(dfs[i]) <10:
                    selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
                #details in the same table
                if len(dfs[i]) >10:
                    selected_columns_new = generate_current_table_without_cropping(column_index_list,dfs[i])
                #break
            #IN CELLS: every requested field matched inside a data cell.
            if len(cell_matches) == len(non_empty_info):
                row_index_list, column_index_list = get_row_column_indices(cell_matches)
                print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")
                # Merge sibling-table data with this table's cropped rows.
                # (pd.concat silently drops a None first argument.)
                selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
                selected_columns_new2 = crop_rename_table(row_index_list, clmn_name, column_index_list,dfs[i])
                selected_columns_new = pd.concat([selected_columns_new, selected_columns_new2], ignore_index=True)
                break
    return selected_columns_new
#for new dictionary logic
# NOTE(review): this RE-DEFINES get_selected_columns_all declared earlier in
# this module; at import time this later definition is the one in effect.
def get_selected_columns_all(dfs, user_patterns):
    """Locate main + secondary + extra dictionary columns across all tables.

    user_patterns layout: 4 main slots, 2 secondary slots, then any extra
    dictionary fields from index 6 on.  Empty slots mean "not requested".
    Matches found as column headers and matches found inside cells are both
    handled; data from sibling tables is concatenated with this table's.

    Returns the combined DataFrame, or None when nothing matched fully.

    NOTE(review): separate_main_secondary puts everything from index 4 on
    into secondary_info, so extra fields are counted there too — confirm
    that is intended alongside the explicit user_patterns[6:] slice.
    """
    selected_columns = []
    selected_columns_new = None  # Initialize selected_columns_new to None
    for i in range(len(dfs)):
        extra_info = user_patterns[6:]
        main_info, secondary_info = separate_main_secondary(user_patterns)
        clmn_name_main = get_column_name(main_info)
        non_empty_main_info = [item for item in main_info if item]
        clmn_name_secondary = get_column_name_secondary(secondary_info)
        non_empty_secondary_info = [item for item in secondary_info if item]
        # Extra dictionary fields keep their user-supplied names.
        clmn_name = clmn_name_main + clmn_name_secondary + extra_info
        non_empty_info = non_empty_main_info + non_empty_secondary_info
        print(f"clmn name: {clmn_name}")
        print(f"non-empty info: {non_empty_info}")
        cell_columns_appearance = flexible_search(dfs[i], non_empty_info)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)
        print(f"length of cell_matches: {len(cell_matches)}")
        print(f"cell_matches: {cell_matches}")
        print(f"col_matches: {col_matches}")
        print(clmn_name)
        if len(cell_matches) == 0 and len(col_matches) == 0:
            print(f"this is df {i}, SEARCH IN ANOTHER DF")
        else:
            #IN COLUMNS: every requested field matched a column header.
            if len(col_matches) == len(non_empty_info):
                column_index_list = get_column_index(col_matches)
                print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")
                print(column_index_list)
                # Merge sibling-table data with this table's own columns.
                # (pd.concat silently drops a None first argument.)
                selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
                selected_columns_new2 = generate_current_table_without_cropping(column_index_list,dfs[i])
                selected_columns_new = pd.concat([selected_columns_new, selected_columns_new2], ignore_index=True)
                selected_columns_new.columns = clmn_name # must match number of columns
                #break
            #IN CELLS: every requested field matched inside a data cell.
            if len(cell_matches) == len(non_empty_info):
                row_index_list, column_index_list = get_row_column_indices(cell_matches)
                print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")
                print(f"column names: {clmn_name}")
                print(f"column index list: {column_index_list}")
                selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
                selected_columns_new2 = crop_rename_table(row_index_list, clmn_name, column_index_list,dfs[i])
                selected_columns_new = pd.concat([selected_columns_new, selected_columns_new2], ignore_index=True)
                break
    return selected_columns_new
# TODO: make this search the selected_columns column names themselves, and
# also find out how the value is actually written in the real df (probably
# exactly as the user typed it).
def get_st_op_pattern(selected_columns, user_input):
    """Return the user's structural-opening label (slot 2) when that column exists."""
    if 'structural_opening' in selected_columns.columns:
        return user_input[2]
    return None
def find_text_in_plan(label, x):
    """Find every word tuple in *x* whose text (index 4) equals *label*.

    Returns three parallel lists: box midpoints in PDF orientation, the
    matched words, and midpoints with the axes swapped (for rotated plans).
    """
    substring_coordinates = []
    words = []
    point_list = []
    for tpl in x:
        if tpl[4] != label:
            continue
        x0, y0, x1, y1, word = tpl[0], tpl[1], tpl[2], tpl[3], tpl[4]
        substring_coordinates.append(calculate_midpoint(x0, y0, x1, y1))  # for pdf
        point_list.append(calculate_midpoint(y0, x0, y1, x1))  # for rotated
        words.append(word)
    return substring_coordinates, words, point_list
def get_selected_columns_by_index(df, column_index_list, user_patterns):
    """Select columns of *df* by position and rename them to the standard labels."""
    picked = df.iloc[:, column_index_list]
    # Derive the standard label list from the user's main/secondary slots.
    main_info, secondary_info = separate_main_secondary(user_patterns)
    clmn_name = get_column_name(main_info) + get_column_name_secondary(secondary_info)
    print(f"clmn_name from the function el 3amla moshkela: {clmn_name}")
    picked.columns = clmn_name
    return picked
## Get the column indices from extract_tables(schedule)
def get_column_indices_from_dfs_normal(dfs, user_patterns):
    """Scan the tables and return the column indices of the first full match.

    A "full match" means every requested (non-empty) field was found either
    entirely as column headers or entirely inside cells.  Returns None when
    no table produced any match at all.

    NOTE(review): if a table has PARTIAL matches (some hits, but neither
    count equals the number of requested fields) and the loop runs out,
    `column_index_list` can be referenced before assignment — confirm
    callers never hit that case.
    """
    for i in range(len(dfs)):
        main_info, secondary_info = separate_main_secondary(user_patterns)
        clmn_name_main = get_column_name(main_info)
        non_empty_main_info = [item for item in main_info if item]
        clmn_name_secondary = get_column_name_secondary(secondary_info)
        non_empty_secondary_info = [item for item in secondary_info if item]
        clmn_name = clmn_name_main + clmn_name_secondary
        non_empty_info = non_empty_main_info + non_empty_secondary_info
        cell_columns_appearance = flexible_search(dfs[i], non_empty_info)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)
        # No hits: keep scanning unless this is the last table.
        if len(cell_matches) == 0 and len(col_matches) == 0 and i < len(dfs) - 1:
            continue
        elif len(cell_matches) == 0 and len(col_matches) == 0:
            column_index_list = None
        else:
            #IN COLUMNS
            if len(col_matches) == len(non_empty_info):
                column_index_list = get_column_index(col_matches)
                print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")
                break
            #IN CELLS
            if len(cell_matches) == len(non_empty_info):
                row_index_list, column_index_list = get_row_column_indices(cell_matches)
                print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")
                break
    return column_index_list
def find_missing_columns(complete_list, non_complete_list):
    """Return the entries of *complete_list* absent from *non_complete_list*.

    Comparison uses a normalized form: lowercase, with all whitespace and
    every bracket type removed.  Empty entries are dropped from the result
    (they correspond to unused slots in the fixed 6-field layout).

    Parameters
    ----------
    complete_list : list[str]
        The fields the user asked for.
    non_complete_list : list[str]
        The fields actually found in the schedule.

    Returns
    -------
    list[str]
        The requested fields (original spelling) that were not found.

    Fixes: removed a dead duplicate of the nested normalizer (the first
    definition was immediately shadowed by the second) and the redundant
    self-assignments of both parameters.
    """
    def _normalize(text):
        # Lowercase; strip all whitespace and every bracket type.
        if not isinstance(text, str):
            return ""
        text = re.sub(r'\s+', '', text)
        text = re.sub(r'[\(\)\[\]\{\}]', '', text)
        return text.lower()

    # Normalize the reference list just once for speed.
    normalized_non_complete = [_normalize(item) for item in non_complete_list]
    missing = [
        item for item in complete_list
        if item and _normalize(item) not in normalized_non_complete
    ]
    return missing
# Returns the columns the code failed to locate on the schedule
def check_missing(dfs, user_patterns):
    """Report which requested fields could not be found in any table.

    Every table is searched; the cell texts of the table with the MOST cell
    hits are kept, and the user's field list is diffed against those words.

    NOTE(review): raises ValueError on empty *dfs* (max of an empty list),
    and only CELL matches are credited — fields found as real column
    headers are not counted here.  Confirm both are intended.
    """
    all_words = []
    for i in range(len(dfs)):
        main_info, secondary_info = separate_main_secondary(user_patterns)
        clmn_name_main = get_column_name(main_info)
        non_empty_main_info = [item for item in main_info if item]
        clmn_name_secondary = get_column_name_secondary(secondary_info)
        non_empty_secondary_info = [item for item in secondary_info if item]
        clmn_name = clmn_name_main + clmn_name_secondary
        non_empty_info = non_empty_main_info + non_empty_secondary_info
        cell_columns_appearance = flexible_search(dfs[i], non_empty_info)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)
        # Pull the actual cell text for every cell-level hit in this table.
        words = [dfs[i].iloc[row, col] for row, col in cell_matches]
        all_words.append(words)
    # Keep the hit list of the best-matching table.
    found_words = max(all_words, key=len)
    print(found_words)
    missings = find_missing_columns(user_patterns, found_words)
    return missings
# get the index of dataframe that has the maximum column matches in the dfs from model table detection
def get_df_index(dfs, user_patterns):
    """Return the index of the table with the most matched fields.

    For every table, the requested fields are matched against column headers
    and against cells; whichever hit list is longer is kept, and the table
    owning the longest list overall wins.

    NOTE(review): raises ValueError when NO table matches at all (max of an
    empty list) — confirm callers pre-check e.g. via check_missing().
    """
    df_matches = []
    for i in range(len(dfs)):
        main_info, secondary_info = separate_main_secondary(user_patterns)
        clmn_name_main = get_column_name(main_info)
        non_empty_main_info = [item for item in main_info if item]
        clmn_name_secondary = get_column_name_secondary(secondary_info)
        non_empty_secondary_info = [item for item in secondary_info if item]
        clmn_name = clmn_name_main + clmn_name_secondary
        non_empty_info = non_empty_main_info + non_empty_secondary_info
        cell_columns_appearance = flexible_search(dfs[i], non_empty_info)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)
        if len(cell_matches) == 0 and len(col_matches) == 0:
            continue
        else:
            column_index_list_from_columns = get_column_index(col_matches)
            row_index_list, column_index_list_from_cells = get_row_column_indices(cell_matches)
            # Keep whichever hit list is longer, tagged with the df index.
            if len(column_index_list_from_columns) > len(column_index_list_from_cells):
                df_matches.append((column_index_list_from_columns,i))
            else:
                df_matches.append((column_index_list_from_cells,i))
    longest_list = max(df_matches, key=lambda x: len(x[0]))
    #index of the longest list will be the df number
    index_longest_list = longest_list[1]
    return index_longest_list
def get_word_locations_plan(flattened_list, plan_texts):
    """Look up the plan coordinates for each schedule entry.

    The arity of the FIRST entry decides the payload layout for all entries:
    (label, color), (label, width, color) or (label, width, height, color).
    Each emitted location tuple is (midpoints, label, color[, w[, h]]).

    Returns
    -------
    (locations, not_found)
        not_found lists labels with zero occurrences on the plan (they are
        still appended to locations with an empty midpoint list).

    NOTE(review): IndexError when flattened_list is empty — confirm callers
    guarantee at least one entry.
    """
    locations = []
    not_found = []
    if len(flattened_list[0]) == 2:
        for lbl, clr in flattened_list:
            location,worz, txt_pt = find_text_in_plan(lbl, plan_texts)
            if len(location) ==0:
                not_found.append(lbl)
            locations.append((location, lbl, clr))
    if len(flattened_list[0]) == 3:
        for lbl, w, clr in flattened_list:
            location,worz, txt_pt = find_text_in_plan(lbl, plan_texts)
            if len(location) ==0:
                not_found.append(lbl)
            locations.append((location, lbl, clr, w))
    if len(flattened_list[0]) == 4:
        for lbl, w, h, clr in flattened_list:
            location,worz, txt_pt = find_text_in_plan(lbl, plan_texts)
            if len(location) ==0:
                not_found.append(lbl)
            locations.append((location, lbl, clr, w, h))
    return locations, not_found
def get_repeated_labels(locations):
    """Return the set of labels (item[1]) appearing more than once in *locations*."""
    counts = Counter(item[1] for item in locations)
    return {label for label, n in counts.items() if n > 1}
def get_cleaned_data(locations):
    """De-duplicate coordinates for repeated labels via round-robin assignment.

    Each entry is (coords, label, *extras).  When a label was found at
    several coordinates, successive entries carrying that label each take
    the next coordinate in turn, so repeated door ids are spread over the
    plan instead of stacking on a single point.  Entries whose coords list
    is empty are dropped, exactly as before.

    Generalized: the original special-cased entry arities 3, 4 and 5 with
    three copies of the same loop (and returned [] for any other arity);
    this version handles any arity >= 2 uniformly with identical results
    for the original arities.

    Parameters
    ----------
    locations : list[tuple]
        Output of get_word_locations_plan*: (coords, label, extras...).

    Returns
    -------
    list[tuple]
        Same layout, each entry now holding exactly one coordinate.
    """
    processed = defaultdict(int)  # label -> entries already served
    new_data = []
    for coords, label, *extras in locations:
        if len(coords) > 1:
            index = processed[label] % len(coords)  # Round-robin indexing
            new_data.append(([coords[index]], label, *extras))
            processed[label] += 1  # Move to the next coordinate for this label
        elif len(coords) == 1:
            new_data.append((coords, label, *extras))
    return new_data
# If the value is e.g. 0.5 it is written as-is; otherwise the trailing
# decimal point (".0") is dropped.
def get_width_info_tobeprinted(new_data):
    """Build the "<w> mm wide x <h> mm high" display text per plan entry.

    The arity of the FIRST entry decides what is available:
      * < 4 fields: no dimensions -> "N/A mm wide x N/A mm high"
      * == 4: a single pre-formatted width string is passed through as-is
      * == 5: width and height strings are de-comma'd; integer-valued
        numbers lose their trailing ".0"; non-numeric values (e.g. "N/A")
        pass through unchanged.

    NOTE(review): width/height are assumed to be strings — re.sub would
    raise TypeError on ints/floats; confirm upstream always yields str.
    """
    width_info_tobeprinted = []
    if len(new_data[0]) < 4:
        # No dimension info requested at all.
        for _,_,_, in new_data:
            width_info_tobeprinted.append("N/A mm wide x N/A mm high")
    if len(new_data[0]) == 4:
        for _,_,_, w in new_data:
            width_info_tobeprinted.append(w)
    if len(new_data[0]) == 5:
        for _,_,_, w,h in new_data:
            w = re.sub(r",", "", w)
            h = re.sub(r",", "", h)
            # Non-numeric (e.g. "N/A") passes through unchanged.
            if is_not_number(w):
                w = w
            else:
                # Drop the decimal point for integer-valued numbers.
                if float(w).is_integer():
                    w = int(float(w))
                else:
                    w = w
            if is_not_number(h):
                h = h
            else:
                if float(h).is_integer():
                    h = int(float(h))
                else:
                    h = h
            width_info_tobeprinted.append(f"{w} mm wide x {h} mm high")
    return width_info_tobeprinted
def clean_dimensions(text):
    """Strip "mm" unit markers (with any preceding commas/spaces) and all commas."""
    # Remove "mm" together with optional spaces or commas directly before it.
    without_units = re.sub(r'[,\s]*mm', '', text)
    # Remove remaining commas if any
    return without_units.replace(",", "")
def get_cleaned_width(width_info_tobeprinted):
    """Apply clean_dimensions to every width display string."""
    return [clean_dimensions(entry) for entry in width_info_tobeprinted]
def get_widths_bb_format(cleaned_width, kelma):
    """Convert "WxH" dimension strings into "<w> mm wide x <h> mm high" phrases.

    *kelma* decides which side of the separator is the width: when it
    contains a "W x H"-style marker, the first number is the width;
    otherwise the two numbers are swapped.
    """
    pattern = r"\bW(?:idth)?\s*[×x]\s*H(?:eight)?\b"
    match = re.search(pattern, kelma)
    widths = []
    for entry in cleaned_width:
        # Split at the right-most of any supported separator characters.
        sep_at = max(entry.find("x"), entry.find("×"), entry.find("X"))
        first = int(float(entry[:sep_at]))
        second = int(float(entry[sep_at + 1:]))
        if match:
            full_text = f"{first} mm wide x {second} mm high"
        else:
            full_text = f"{second} mm wide x {first} mm high"
        widths.append(full_text)
    return widths
def is_not_number(s: str) -> bool:
    """True when *s* cannot be parsed as a float (ints, floats, scientific all parse)."""
    try:
        float(s)
    except ValueError:
        return True  # not a number
    return False  # it *is* a number
def get_width_info_tobeprinted_secondary(new_data, main_info, secondary_info):
    """Build width/height display strings plus the secondary-info payloads.

    Branches on how many main fields (2/3/4) and secondary fields (1/2) the
    user supplied, because that fixes the arity and unpack order of each
    tuple in *new_data* (coords, label, [width[, height]], acous[, fire],
    color).

    Returns
    -------
    (list[str], list)
        width_info_tobeprinted: "<w> mm wide x <h> mm high" per entry (or
        "N/A ..." when no dimensions were requested);
        secondary_info_tobeprinted: the acoustic value, or (acous, fire)
        tuples, per entry.

    NOTE(review): width/height must be str — re.sub raises on numbers.
    NOTE(review): in the 4+1 branch, `(acous)` is NOT a tuple (parentheses
    are redundant) — confirm a 1-tuple was not intended.
    """
    width_info_tobeprinted = []
    secondary_info_tobeprinted = []
    if len(main_info) == 2 and len(secondary_info) == 1:
        for coords, label, acous, color in new_data:
            secondary_info_tobeprinted.append(acous)
            width_info_tobeprinted.append("N/A mm wide x N/A mm high")
    if len(main_info) == 2 and len(secondary_info) == 2:
        for coords, label, acous, fire, color in new_data:
            secondary_info_tobeprinted.append((acous, fire))
            width_info_tobeprinted.append("N/A mm wide x N/A mm high")
    if len(main_info) == 3 and len(secondary_info) == 1:
        for coords, label, width, acous, color in new_data:
            width_info_tobeprinted.append(width)
            secondary_info_tobeprinted.append(acous)
    if len(main_info) == 3 and len(secondary_info) == 2:
        for coords, label, width, acous, fire, color in new_data:
            width_info_tobeprinted.append(width)
            secondary_info_tobeprinted.append((acous, fire))
    if len(main_info) == 4 and len(secondary_info) == 1:
        for coords, label, width, height, acous, color in new_data:
            w = re.sub(r",", "", width)
            h = re.sub(r",", "", height)
            # Non-numeric values (e.g. "N/A") pass through unchanged.
            if is_not_number(w):
                w = w
            else:
                # Integer-valued numbers lose their trailing ".0".
                if float(w).is_integer():
                    w = int(float(w))
                else:
                    w = w
            if is_not_number(h):
                h = h
            else:
                if float(h).is_integer():
                    h = int(float(h))
                else:
                    h = h
            width_info_tobeprinted.append(f"{w} mm wide x {h} mm high")
            secondary_info_tobeprinted.append((acous))
    if len(main_info) == 4 and len(secondary_info) == 2:
        for coords, label, width, height, acous, fire, color in new_data:
            print(type(width))
            print(type(height))
            w = re.sub(r",", "", width)
            h = re.sub(r",", "", height)
            if is_not_number(w):
                w = w
            else:
                if float(w).is_integer():
                    w = int(float(w))
                else:
                    w = w
            if is_not_number(h):
                h = h
            else:
                if float(h).is_integer():
                    h = int(float(h))
                else:
                    h = h
            width_info_tobeprinted.append(f"{w} mm wide x {h} mm high")
            secondary_info_tobeprinted.append((acous, fire))
    return width_info_tobeprinted, secondary_info_tobeprinted
def get_word_locations_plan_secondary(flattened_list, plan_texts, main_info, secondary_info):
    """Locate each label on the plan when secondary info is present.

    Every entry of *flattened_list* starts with the label; the remaining
    fields (widths/heights, colour, acoustic/fire ratings) are carried
    through unchanged after the found coordinates, so the six
    per-combination unpacking branches of the original collapse into one
    loop.  *main_info*/*secondary_info* are kept for interface
    compatibility; the tuple arity already encodes the combination.

    Returns:
        locations: list of (coords, label, *rest) tuples.
        not_found: labels with no match on this plan.
    """
    locations = []
    not_found = []
    for entry in flattened_list:
        lbl = entry[0]
        location, _worz, _txt_pt = find_text_in_plan(lbl, plan_texts)
        if len(location) == 0:
            not_found.append(lbl)
        locations.append((location,) + tuple(entry))
    return locations, not_found
| ### newest, accept combined table | |
def get_similar_colors_all(selected_columns_new):
    """Group rows of *selected_columns_new* by 'door_type'.

    Each group collects every column's values into parallel lists (plus a
    'values' list of door ids) and is tagged with one random RGB colour
    under 'color_annot'.
    """
    field_names = list(selected_columns_new.columns)
    colour_by_type = {
        door_type: (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
        for door_type in selected_columns_new['door_type'].unique()
    }
    grouped = {}
    for _, row in selected_columns_new.iterrows():
        door_type = row['door_type']
        if door_type not in grouped:
            bucket = {'values': [], 'color_annot': None}
            for name in field_names:
                bucket[name] = []
            grouped[door_type] = bucket
        bucket = grouped[door_type]
        bucket['values'].append(row['door_id'])
        for name in field_names:
            bucket[name].append(row.get(name, None))
        bucket['color_annot'] = colour_by_type[door_type]
    return grouped
def get_fixed_color_all(selected_columns_new, fixed_color=(0, 0, 255)):
    """Single-group variant of get_similar_colors_all.

    No 'door_type' column is required: every row lands under one "ALL"
    key, and all rows share the same *fixed_color*.  The main 'values'
    entry uses 'door_id' when present, otherwise the first column.
    """
    field_names = list(selected_columns_new.columns)
    bucket = {'values': [], 'color_annot': fixed_color}
    for name in field_names:
        bucket[name] = []
    has_door_id = 'door_id' in selected_columns_new.columns
    result = {}
    for _, row in selected_columns_new.iterrows():
        result['ALL'] = bucket
        bucket['values'].append(row['door_id'] if has_door_id else row.iloc[0])
        for name in field_names:
            bucket[name].append(row.get(name, None))
    return result
| ### newest, accept combined table | |
def get_flattened_tuples_list_all(col_dict):
    """Flatten grouped column data into per-row tuples.

    For each group, every list-valued field except 'door_type' and
    'values' contributes one element per tuple, and the group's
    'color_annot' is appended as the final element.
    """
    skipped = ('door_type', 'values')
    rows = []
    for group in col_dict.values():
        fields = [name for name, value in group.items()
                  if isinstance(value, list) and name not in skipped]
        count = len(group[fields[0]]) if fields else 0
        colour = group['color_annot']
        for idx in range(count):
            rows.append(tuple(group[f][idx] for f in fields) + (colour,))
    return rows
def get_flattened_tuples_list_no_doortype(selected_columns):
    """Turn each DataFrame row into a plain tuple with a fixed blue colour
    (0, 0, 255) appended — used when no 'door_type' grouping exists."""
    blue = (0, 0, 255)
    return [row + (blue,)
            for row in selected_columns.itertuples(index=False, name=None)]
| #SECONDARY | |
def get_cleaned_data_secondary(locations, main_info, secondary_info):
    """Assign one coordinate per occurrence of a multi-location label.

    Each entry of *locations* is (coords, label, *rest).  Labels found at
    several coordinates are dealt one coordinate per occurrence in
    round-robin order; single-coordinate entries pass through unchanged;
    entries with no coordinates are dropped.  The six per-combination
    branches of the original performed this identical logic, so they are
    unified; unsupported main/secondary length combinations still yield
    an empty list.
    """
    if len(main_info) not in (2, 3, 4) or len(secondary_info) not in (1, 2):
        return []
    processed = defaultdict(int)
    new_data = []
    for entry in locations:
        coords, label = entry[0], entry[1]
        rest = tuple(entry[2:])
        if len(coords) > 1:
            index = processed[label] % len(coords)  # round-robin indexing
            new_data.append(([coords[index]], label) + rest)
            processed[label] += 1  # next occurrence gets the next coordinate
        elif len(coords) == 1:
            new_data.append((coords, label) + rest)
    return new_data
def merge_pdf_bytes_list(pdfs):
    """Concatenate several in-memory PDFs (each given as bytes) into one
    PDF, preserving page order, and return the merged document as bytes."""
    merged = PdfWriter()
    for raw in pdfs:
        for page in PdfReader(io.BytesIO(raw)).pages:
            merged.add_page(page)
    buffer = io.BytesIO()
    merged.write(buffer)
    return buffer.getvalue()
def calculate_bounding_rect_count(vertices, padding):
    """Return [xmin, ymin, xmax, ymax] of a square box of half-width
    *padding* centred on the first vertex."""
    cx, cy = vertices[0]
    return [cx - padding, cy - padding, cx + padding, cy + padding]
def rgb_string_to_hex(rgb_string):
    """Convert an 'R G B' string of 0-1 floats to an uppercase '#RRGGBB'
    hex colour (channels truncated, not rounded)."""
    r, g, b = (float(part) for part in rgb_string.strip().split())
    return '#' + ''.join(f'{int(channel * 255):02X}' for channel in (r, g, b))
def generate_annotation_xml_block_count(vertices, area_text, author, custom_data: dict, column_order: list, index: int,
                                        label: str = '',height:str='',width:str='',
                                        color:str='',countstyle:str='',countsize:str=''):
    """Build one Bluebeam "Count Measurement" annotation as an XML Element.

    The essential payload is the <Raw> child: a PDF annotation dictionary
    is assembled as text, zlib-compressed and base16 (hex) encoded —
    the format Bluebeam Revu uses for markup import/export.

    Args:
        vertices: list of (x, y) points in PDF coordinates (count markers
            use a single point).
        area_text: the count text, used for /NumCounts, /Contents and the
            rich-text /RC body.
        author: author string for /T and the <Author> child.
        custom_data: mapping of custom-column name -> value; emitted as
            /BSIColumnData (in *column_order*) and as <Custom> children.
        column_order: order of custom columns inside /BSIColumnData.
        index: position of this annotation on its page (<Index> child).
        label, height, width: markup metadata copied verbatim into both the
            raw dictionary and the XML children.
        color: 'R G B' string of 0-1 floats (space separated).
        countstyle: Bluebeam count style token, e.g. '/Circle'.
        countsize: scale factor of the count icon.

    Returns:
        xml.etree.ElementTree.Element for the <Annotation> node.
    """
    # NOTE(review): utcnow() is deprecated since 3.12;
    # datetime.datetime.now(datetime.timezone.utc) is the modern spelling.
    now = datetime.datetime.utcnow()
    mod_date = now.strftime("D:%Y%m%d%H%M%S+00'00'")  # PDF date string format
    creation_date = now.isoformat() + 'Z'
    id_str = "fitz-" + uuid.uuid4().hex[:4].upper()  # short unique annotation id
    # Flatten vertices to "x1 y1 x2 y2 ..." with 4-decimal precision.
    vert_str = ' '.join([f'{x:.4f}' for point in vertices for x in point])
    ordered_column_values = [f'({custom_data.get(col, "")})' for col in column_order]
    bsi_column_data = ''.join(ordered_column_values)
    type_internal= 'Bluebeam.PDF.Annotations.AnnotationMeasureCount'
    subject ='Count Measurement'
    padding=10
    # Square bounding box around the (first) vertex for the /Rect entry.
    rectvertices=calculate_bounding_rect_count(vertices,padding)
    # Measurement dictionary: 1 mm = 1 mm scale with mm-based number formats.
    bbmeasure = '''<</Type/Measure
/Subtype/RL
/R(1 mm = 1 mm)
/X[<</Type/NumberFormat/U(mm)/C 0.3527778/D 100/SS()>>]
/D[<</Type/NumberFormat/U(mm)/C 1/D 100/SS()>>]
/A[<</Type/NumberFormat/U(sq mm)/C 1/D 100/FD true/SS()>>]
/T[<</Type/NumberFormat/U(\\260)/C 1/D 100/FD true/PS()/SS()>>]
/V[<</Type/NumberFormat/U(cu mm)/C 1/D 100/FD true/SS()>>]
/TargetUnitConversion 0.3527778>>'''
    # Full PDF annotation dictionary, serialized as text.
    raw_text = f'''<<
/Version 1
/DS(font: Helvetica 12pt; text-align:center; line-height:13.8pt; color:#FF0000)
/CountStyle{countstyle}
/CountScale {countsize}
/MeasurementTypes 128
/BBMeasure{bbmeasure}
/NumCounts {area_text}
/AP<</N/BBObjPtr_{uuid.uuid4().hex.upper()}>>
/IT/PolygonCount
/Vertices[{vert_str}]
/IC[{color}]
/T({author})
/CreationDate({mod_date})
/BSIColumnData[{bsi_column_data}]
/RC(<?xml version="1.0"?><body xmlns:xfa="http://www.xfa.org/schema/xfa-data/1.0/" xfa:contentType="text/html" xfa:APIVersion="BluebeamPDFRevu:2018" xfa:spec="2.2.0" style="font:Helvetica 12pt; text-align:center; line-height:13.8pt; color:#FF0000" xmlns="http://www.w3.org/1999/xhtml"><p>{area_text}</p></body>)
/Label({label})
/Height {height}
/Width {width}
/Subj({subject})
/NM({id_str})
/Subtype/Polygon
/Rect[{rectvertices[0]} {rectvertices[1]} {rectvertices[2]} {rectvertices[3]}]
/Contents({area_text})
/F 4
/C[{color}]
/BS<</Type/Border/W 0/S/S>>
/M({mod_date})
>>'''.encode('utf-8')
    compressed = zlib.compress(raw_text)
    # Despite the name, this is base16 (hex), lowercase — not base64.
    base64_raw = base64.b16encode(compressed).lower().decode()
    annotation = Element('Annotation')
    # Page text is filled in later by the caller (save_multiple_annotations_*).
    SubElement(annotation, 'Page')
    SubElement(annotation, 'Contents').text = area_text
    SubElement(annotation, 'ModDate').text = creation_date
    SubElement(annotation, 'Color').text = rgb_string_to_hex(color)
    SubElement(annotation, 'Type').text = 'Polygon'
    SubElement(annotation, 'ID').text = id_str
    SubElement(annotation, 'TypeInternal').text = type_internal
    SubElement(annotation, 'Raw').text = base64_raw
    SubElement(annotation, 'Index').text = str(index)
    custom = SubElement(annotation, 'Custom')
    for key, value in custom_data.items():
        SubElement(custom, key).text = value
    SubElement(annotation, 'Subject').text = subject
    SubElement(annotation, 'CreationDate').text = creation_date
    SubElement(annotation, 'Author').text = author
    SubElement(annotation, 'Label').text = label
    SubElement(annotation, 'Height').text = height
    SubElement(annotation, 'Width').text = width
    return annotation
def save_multiple_annotations_count_bax(annotations, output_path, column_order,pdfWidth,pdfHeight, num_pages):
    """Assemble all count annotations into a Bluebeam <Document> XML string.

    Each annotation dict provides: vertices, text, author, custom_data,
    plus optional label/height/width/color/countstyle/countsize and
    'page' (1-based page number, default 1).  Every page of the PDF gets
    a <Page> node even when it carries no annotations.

    Returns the serialized XML as a unicode string.
    """
    # Group the annotations by their 1-based page number.
    per_page = defaultdict(list)
    for ann in annotations:
        per_page[ann.get('page', 1)].append(ann)
    doc = Element('Document', Version='1')
    # Emit one <Page> per PDF page, annotated or not.
    for page_index in range(num_pages):
        page_node = SubElement(doc, 'Page', Index=str(page_index))
        SubElement(page_node, 'Label').text = str(page_index + 1)
        SubElement(page_node, 'Width').text = str(pdfWidth)
        SubElement(page_node, 'Height').text = str(pdfHeight)
        for pos, ann in enumerate(per_page.get(page_index + 1, [])):
            node = generate_annotation_xml_block_count(
                vertices=ann['vertices'],
                area_text=ann['text'],
                author=ann['author'],
                custom_data=ann['custom_data'],
                column_order=column_order,
                index=pos,
                label=ann.get('label', 'label1'),
                height=ann.get('height', '123'),
                width=ann.get('width', '123'),
                color=ann.get('color', ''),
                countstyle=ann.get('countstyle', ''),
                countsize=ann.get('countsize', ''),
            )
            # Fill the placeholder <Page> child with the 1-based page number.
            node.find('Page').text = str(page_index + 1)
            page_node.append(node)
    pretty_xml = tostring(doc, encoding="unicode", method="xml")
    print(f"Saved {len(annotations)} annotations to {output_path}")
    return pretty_xml
# Count-icon style tokens understood by Bluebeam's /CountStyle key.
# Index by name, e.g. CountStyles['Circle'] -> '/Circle'.
CountStyles = {
    'Circle': '/Circle',
    'Diamond':'/Diamond',
    'Triangle':'/Triangle',
    'Square':'/Square',
    'Checkmark':'/Checkmark',
}
def convert_to_bytes(input_pdf_path):
    """Read the file at *input_pdf_path* and return its raw bytes."""
    with open(input_pdf_path, "rb") as handle:
        return handle.read()
def mirrored_points(x, y, height_plan):
    """Flip a point vertically within a plan of height *height_plan*.

    Returns the flipped point wrapped in a one-element list:
    [[x, height_plan - y]].
    """
    return [[x, height_plan - y]]
def point_mupdf_to_pdf(x, y, page):
    """Convert a PyMuPDF top-left-origin point to PDF bottom-left-origin
    coordinates using the page's mediabox; returns [[pdf_x, pdf_y]]."""
    box = page.mediabox
    # The mediabox height (not the rect height) defines the vertical flip.
    flipped_y = float(box.height) - y
    return [[box.x0 + x, box.y0 + flipped_y]]
| # Modified to adjust mirrored points | |
def create_bb_bax_secondary(new_data, widthat, heightat, secondary_tobeprinted, CountStyles, input_user_clmn_names, page_number, height_plan):
    """Build Bluebeam count-annotation dicts when secondary (fire/acoustic)
    columns are present.

    Each entry of *new_data* is (coords, label, ..., color) with the group
    colour stored last.  input_user_clmn_names[4] / [5] flag which
    secondary columns the user supplied: when both are set,
    secondary_tobeprinted[i] is a 2-tuple; otherwise it is a single value
    and the missing rating is emitted as 'N/A'.  The three near-identical
    branches of the original are collapsed into one append.

    NOTE(review): *height_plan* is forwarded as the `page` argument of
    point_mupdf_to_pdf, so callers must pass a page object, not a height
    — confirm and consider renaming.
    NOTE(review): the capitalised 'Height'/'Width' keys written here are
    never read by save_multiple_annotations_count_bax, which looks up
    lowercase 'height'/'width' (falling back to '123'); kept as-is.
    """
    bax_annotations = []
    has_col4 = bool(input_user_clmn_names[4])
    has_col5 = bool(input_user_clmn_names[5])
    for i, entry in enumerate(new_data):
        # Secondary payload depends on which user columns are present.
        if has_col4 and has_col5:
            fire, acoustic = secondary_tobeprinted[i][0], secondary_tobeprinted[i][1]
        elif has_col4:
            fire, acoustic = secondary_tobeprinted[i], 'N/A'
        elif has_col5:
            fire, acoustic = 'N/A', secondary_tobeprinted[i]
        else:
            continue  # no secondary columns at all -> nothing emitted (as before)
        r, g, b = entry[-1]  # group colour, last element of the tuple
        colour = ' '.join(str(float(c / 255)) for c in (r, g, b))  # normalized 0-1 RGB
        vertix = point_mupdf_to_pdf(entry[0][0][0], entry[0][0][1], height_plan)
        bax_annotations.append({
            'vertices': vertix,
            'text': '1',  # one count per marker
            'author': 'ADR',
            'custom_data': {'FireRating': fire, 'AcousticRating': acoustic,
                            'Height_': heightat[i], 'Width_': widthat[i]},
            'label': entry[1],
            'Height': heightat[i],
            'Width': widthat[i],
            'page': page_number,
            'color': colour,
            'countstyle': CountStyles['Circle'],
            'countsize': '0.8',  # scale of the count icon
        })
    return bax_annotations
| # Modified to adjust mirrored points | |
def create_bb_bax(new_data, widthat, heightat, CountStyles, page_number, height_plan):
    """Build Bluebeam count-annotation dicts when no secondary (fire/
    acoustic) info exists; both ratings are emitted as 'N/A'.

    NOTE(review): *height_plan* is forwarded as the `page` argument of
    point_mupdf_to_pdf, so callers must pass a page object — confirm.
    """
    annotations = []
    for i, entry in enumerate(new_data):
        r, g, b = entry[2]  # the colour sits at index 2 in this tuple layout
        colour = ' '.join(str(float(c / 255)) for c in (r, g, b))  # normalized 0-1 RGB
        point = point_mupdf_to_pdf(entry[0][0][0], entry[0][0][1], height_plan)
        annotations.append({
            'vertices': point,
            'text': '1',  # one count per marker
            'author': 'ADR',
            'custom_data': {'FireRating': 'N/A', 'AcousticRating': 'N/A',
                            'Height_': heightat[i], 'Width_': widthat[i]},
            'label': entry[1],
            'height': heightat[i],
            'width': widthat[i],
            'page': page_number,
            'color': colour,
            'countstyle': CountStyles['Circle'],
            'countsize': '0.8',  # scale of the count icon
        })
    return annotations
def add_location(col_dict, plan_texts):
    """Look up every door_id of every group on the plan and store the found
    coordinates under a new 'location' key of that group.

    Returns the (mutated) dict plus a list of ids not found on this plan.
    """
    missing = []
    for group in col_dict.values():
        found = []
        for door_id in group['door_id']:
            coords, _, _ = find_text_in_plan(door_id, plan_texts)
            if len(coords) == 0:
                missing.append(door_id)
            found.append(coords)
        group['location'] = found
    return col_dict, missing
| import pandas as pd | |
def _ensure_color_tuple(value):
    """Coerce *value* to a tuple when possible.

    None and tuples pass through untouched; values that cannot be coerced
    are returned unchanged.
    """
    if isinstance(value, tuple) or value is None:
        return value
    try:
        return tuple(value)
    except Exception:
        return value
def _ensure_list_of_tuples(val):
    """Normalize *val* into a list of tuples.

    None -> []; a bare tuple -> [that tuple]; a list -> each non-None item
    coerced to a tuple (un-coercible items are skipped); anything else is
    coerced to a single-tuple list, or [] when that fails.
    """
    if val is None:
        return []
    if isinstance(val, tuple):
        return [val]
    if not isinstance(val, list):
        try:
            return [tuple(val)]
        except Exception:
            return []
    result = []
    for item in val:
        if item is None:
            continue
        if isinstance(item, tuple):
            result.append(item)
            continue
        try:
            result.append(tuple(item))
        except Exception:
            pass
    return result
def grouped_to_dataframe_dynamic(grouped, keep_group=False,
                                 explode_locations=False,
                                 drop_empty_locations=False):
    """Flatten a grouped {group_key: {field: list, ...}} structure into a
    DataFrame with one row per list index.

    'values' lists are skipped; door ids come from 'door_id' or 'values'
    (with a synthetic "<group>:<i>" fallback).  Scalar fields broadcast to
    every row; 'color' cells are coerced to tuples and 'location' cells to
    lists of (x, y) tuples.  Optionally tags rows with their source group,
    drops rows with no locations, and/or explodes locations so each row
    carries a single point.
    """
    records = []
    for group_key, fields in grouped.items():
        ids = fields.get('door_id') or fields.get('values') or []
        lengths = [len(v) for v in fields.values() if isinstance(v, list)]
        row_count = max(lengths + [len(ids)]) if (lengths or ids) else 0
        for idx in range(row_count):
            record = {'door_id': ids[idx] if idx < len(ids) else f"{group_key}:{idx}"}
            for name, value in fields.items():
                if name == 'values':
                    continue
                if isinstance(value, list):
                    cell = value[idx] if idx < len(value) else None
                else:
                    cell = value  # scalar field: broadcast to every row
                if name == 'color':
                    cell = _ensure_color_tuple(cell)
                elif name == 'location':
                    cell = _ensure_list_of_tuples(cell)
                record[name] = cell
            if keep_group:
                record['source_group'] = group_key
            records.append(record)
    frame = pd.DataFrame(records)  # dynamic union of keys
    # Normalise 'location' cells, then optionally filter/explode them.
    if 'location' in frame.columns:
        frame['location'] = frame['location'].apply(_ensure_list_of_tuples)
        if drop_empty_locations:
            frame = frame[frame['location'].map(lambda xs: len(xs) > 0)].reset_index(drop=True)
        if explode_locations:
            # After filtering empties, each row ends up with a single (x, y).
            frame = frame.explode('location', ignore_index=True)
    return frame
| # Modify it to return widths and height from width, height columns | |
def get_width_clean_width_height(width_list, height_list):
    """Normalize width/height strings for display.

    Thousands separators are stripped and a trailing '.0' is dropped from
    whole numbers; non-numeric values (e.g. 'N/A') pass through untouched.
    The duplicated width/height loops of the original are folded into one
    helper (also removing the dependency on is_not_number).

    Returns:
        (widths, heights): two lists of strings.
    """
    def _clean(value):
        # "1,000" -> "1000"; "2100.0" -> "2100"; "2.5" stays "2.5"; "N/A" stays.
        text = re.sub(r",", "", value)
        try:
            number = float(text)
        except ValueError:
            return text
        return str(int(number)) if number.is_integer() else text
    return [_clean(w) for w in width_list], [_clean(h) for h in height_list]
def get_widths_bb_format_st_op(cleaned_width, kelma):
    """Split 'WxH' strings into integer (widths, heights) lists.

    *kelma* declares the column order: when it matches 'W x H' (or
    'Width x Height'), the first number is the width; otherwise the pair
    is swapped.  The separator may be 'x', 'X' or '×'.  The original
    passed duplicate "x" arguments to max() and computed an unused
    full_text string; both removed.

    NOTE(review): a string with no separator yields index -1 and raises on
    int(float(...)) — callers are expected to pass well-formed values.
    """
    order_is_w_first = re.search(r"\bW(?:idth)?\s*[×x]\s*H(?:eight)?\b", kelma) is not None
    widths = []
    heights = []
    for dims in cleaned_width:
        # Rightmost occurrence of any recognised separator.
        sep = max(dims.find("x"), dims.find("X"), dims.find("×"))
        first = int(float(dims[:sep]))
        second = int(float(dims[sep + 1:]))
        if order_is_w_first:
            widths.append(first)
            heights.append(second)
        else:
            widths.append(second)
            heights.append(first)
    return widths, heights
| # New for new dictionary logic | |
def create_bb_bax_new(df_points, CountStyles, page_number, height_plan):
    """Build Bluebeam count-annotation dicts from a points DataFrame.

    Every column except 'location' and 'color_annot' is copied into the
    annotation's custom_data.  Returns (annotations, last_custom_data);
    the second element is {} when *df_points* has no rows (the original
    raised UnboundLocalError in that case).

    NOTE(review): *height_plan* is forwarded as the `page` argument of
    point_mupdf_to_pdf, so callers must pass a page object — confirm.
    """
    bax_annotations = []
    customDta = {}  # fix: defined even when the frame has no rows
    exclude = {"location", "color_annot"}
    for _, row in df_points.iterrows():
        customDta = row.drop(labels=exclude, errors="ignore").to_dict()
        r, g, b = row['color_annot']
        colour = ' '.join(str(float(c / 255)) for c in (r, g, b))  # normalized 0-1 RGB
        x, y = row['location']
        bax_annotations.append({
            'vertices': point_mupdf_to_pdf(x, y, height_plan),
            'text': '1',  # one count per marker
            'author': 'ADR',
            'custom_data': customDta,
            'label': row['door_id'],
            'page': page_number,
            'color': colour,
            'countstyle': CountStyles['Circle'],
            'countsize': '0.8',  # scale of the count icon
        })
    return bax_annotations, customDta
| #Handle missing widths or heights in some rows | |
def generate_separate_dimensions(widths):
    """Pull the numeric width/height out of '<w> mm wide x <h> mm high'
    strings; entries that don't match yield ('N/A', 'N/A') so rows with
    missing dimensions stay aligned."""
    dim_re = re.compile(r'(\d+(?:\.\d+)?)\s*mm wide x\s*(\d+(?:\.\d+)?)\s*mm high')
    widthat = []
    heightat = []
    for text in widths:
        found = dim_re.match(text)
        widthat.append(found.group(1) if found else "N/A")
        heightat.append(found.group(2) if found else "N/A")
    return widthat, heightat
def generate_bluebeam_columns_raw(column_names):
    """Render the BluebeamUserDefinedColumns XML (text columns only, no
    headers or extra fields) and return it as a raw unicode string."""
    root = Element("BluebeamUserDefinedColumns")
    for position, column_name in enumerate(column_names):
        entry = SubElement(root, "BSIColumnItem", Index=str(position), Subtype="Text")
        SubElement(entry, "Name").text = column_name
        SubElement(entry, "DisplayOrder").text = str(position)
        SubElement(entry, "Deleted").text = "False"
        SubElement(entry, "Multiline").text = "False"
    return tostring(root, encoding="unicode", method="xml")
def pick_approach(schedule, plan, searcharray, flag):
    """Dry-run one table-extraction approach over every plan/schedule pair.

    flag == 1 uses the plain extractor (extract_tables); flag == 2 uses
    the model-based extractor (extract_tables_model).  For every user
    input the requested columns are pulled from the schedule tables
    (falling back to index-based selection when name matching fails),
    labels are searched for on each plan, and the labels missing from a
    plan are recorded.  The per-input results themselves are discarded —
    only the success/failure signals are returned so the caller can pick
    the better approach.

    Returns:
        no_tables: True when at least one user input's columns could not
            be extracted at all.
        not_found_any_plan: labels (other than 'N/A') that were missing
            from EVERY plan.
    """
    not_found_list = []
    missings = []
    no_tables = False
    for p in plan:
        for k in range(len(schedule)):
            # Choose the extraction backend for this run.
            if flag == 1:
                dfs = extract_tables(schedule[k])
            if flag == 2:
                dfs = extract_tables_model(schedule[k])
            user_input_this_schedule = searcharray[k]
            for j in range(len(user_input_this_schedule)):
                user_input = user_input_this_schedule[j]
                # Positions 4 and 5 hold the secondary (fire/acoustic) columns.
                secondary_presence = False
                if user_input[4] or user_input[5]:
                    secondary_presence = True
                main_info_, secondary_info_ = separate_main_secondary(user_input)
                main_info = [item for item in main_info_ if item]
                secondary_info = [item for item in secondary_info_ if item]
                selected_columns_combined = get_selected_columns_all(dfs, user_input)
                if selected_columns_combined is None:
                    # Name-based selection failed: retry by column index on
                    # the plain extraction.
                    dfs_normal = extract_tables(schedule[k])
                    column_indices = get_column_indices_from_dfs_normal(dfs_normal, user_input)
                    if column_indices is None:
                        missing_clmns = check_missing(dfs, user_input)
                        missing_message = f"{missing_clmns} can't be extracted from table input {j+1} in schedule {k+1}"
                        missings.append(missing_message)
                        no_tables = True
                        continue # continue to the next user input
                    if len(dfs) == 1:
                        selected_columns_combined = get_selected_columns_by_index(dfs[0], column_indices, user_input)
                    if len(dfs) > 1:
                        index_df = get_df_index(dfs, user_input)
                        selected_columns_combined = get_selected_columns_by_index(dfs[index_df], column_indices, user_input)
                # Normalise blanks, NaN and any case-variant of "n/a" to 'N/A'.
                # NOTE(review): DataFrame.applymap is deprecated in pandas 2.1
                # in favour of DataFrame.map — confirm pinned pandas version.
                selected_columns_combined = selected_columns_combined.applymap(lambda x: 'N/A' if isinstance(x, str) and x.strip() == '' else x)
                selected_columns_combined = selected_columns_combined.fillna('N/A')
                selected_columns_combined = selected_columns_combined.replace(r'(?i)\bn/a\b', 'N/A', regex=True)
                kelma = get_st_op_pattern(selected_columns_combined, user_input)
                if "door_type" in selected_columns_combined.columns:
                    col_dict = get_similar_colors_all(selected_columns_combined)
                    flattened_list = get_flattened_tuples_list_all(col_dict)
                else:
                    # Pad main_info so downstream arity checks line up when
                    # secondary columns exist but door_type does not.
                    if secondary_presence:
                        main_info = main_info + [""]
                    flattened_list = get_flattened_tuples_list_no_doortype(selected_columns_combined)
                plan_texts = read_text(p)
                if secondary_presence:
                    locations, not_found = get_word_locations_plan_secondary(flattened_list,plan_texts, main_info, secondary_info)
                    not_found_list.append(not_found)
                else:
                    locations, not_found = get_word_locations_plan(flattened_list,plan_texts)
                    not_found_list.append(not_found)
    # A label counts as globally "not found" only when it was missing in
    # every plan (it appears once in not_found per plan it misses).
    flattened_not_found_list = [item for sublist in not_found_list for item in sublist]
    from collections import Counter
    counts_not_found = Counter(flattened_not_found_list)
    not_found_any_plan = []
    for key, value in counts_not_found.items():
        if value == len(plan):
            not_found_any_plan.append(key)
    not_found_any_plan = [item for item in not_found_any_plan if item != "N/A"]
    return no_tables, not_found_any_plan
def get_df_csv(sch):
    """Load a CSV whose encoding is unknown.

    The encoding is sniffed with chardet over the first 100 KB (enough for
    reliable detection) with a UTF-8 fallback when detection fails.
    """
    with open(sch, "rb") as handle:
        sample = handle.read(100_000)
    detected = chardet.detect(sample)["encoding"] or "utf-8"
    return pd.read_csv(sch, encoding=detected)
| #if number has mm or any text beside it | |
def clean_width_height(clean_widths, clean_height):
    """Extract the first number from each width/height string (handles
    values like '2100 mm').

    Entries containing no digits at all (e.g. 'N/A') are passed through
    unchanged — the original raised IndexError on `numbers[0]` for them.

    Returns:
        (new_width, new_height): two lists of strings.
    """
    number_re = re.compile(r"\d+(?:\.\d+)?")
    def _first_number(text):
        found = number_re.search(text)
        return found.group(0) if found else text
    return ([_first_number(t) for t in clean_widths],
            [_first_number(t) for t in clean_height])
def mainRun(schedule, plan, searcharray, sch_csv_pdf):
    """Annotate door/window schedule data onto floor-plan PDFs.

    For each plan page, schedule columns selected by the user are matched to
    text locations on the plan, turned into Bluebeam "count" annotations, and
    the plans are merged into a single annotated PDF.

    Args:
        schedule: list of schedule files (PDF bytes when sch_csv_pdf is True,
            CSV paths otherwise).
        plan: list of plan PDFs (bytes-like, opened via fitz.open("pdf", p)).
        searcharray: per-schedule list of user inputs; indices 4/5 of each
            input flag fire-rating / acoustic-rating "secondary" info.
        sch_csv_pdf: True when schedules are PDFs, False when they are CSVs.

    Returns:
        Tuple (annotatedimgs, doc2, list1, repeated_labels, not_found,
        pretty_xml, column_xml): rendered page images, the merged fitz
        document, an (unpopulated) annotation DataFrame, duplicated door ids,
        labels found on no plan, and the two Bluebeam XML payloads.
    """
    # These flags must exist on every path: the per-schedule loop below tests
    # both_failed / pick_normal / pick_model even in CSV mode, where the
    # original code never assigned them (NameError).
    pick_normal = False
    pick_model = False
    both_failed = False
    error_message = None  # NOTE(review): recorded but never returned or raised
    if sch_csv_pdf:
        print("shcedule type is PDF")
        # Approach 1: plain (fitz-based) table extraction.
        no_tables_normal, not_found_any_plan_normal = pick_approach(schedule, plan, searcharray, 1)
        print(f"NO TABLES NORMAL: {no_tables_normal}")
        try:
            # Approach 2: model-based table extraction; may fail on large files.
            no_tables_model, not_found_any_plan_model = pick_approach(schedule, plan, searcharray, 2)
        except Exception:
            print("Model detection has issue of file too large")
            no_tables_model = True
        print(f"NO TABLES MODEL: {no_tables_model}")
        if no_tables_model and no_tables_normal:
            both_failed = True
            print("Both approaches failed")
        elif no_tables_model:
            pick_normal = True
        elif no_tables_normal:
            pick_model = True
        else:
            # Both approaches produced tables: prefer the one that located
            # more labels on the plans (shorter "not found" list).
            if len(not_found_any_plan_model) > len(not_found_any_plan_normal):
                pick_normal = True
            elif len(not_found_any_plan_model) < len(not_found_any_plan_normal):
                pick_model = True
            else:
                # Tie: fall back to the older (fitz) approach.
                pick_normal = True
    else:
        print("schedule type is CSV")
        df = get_df_csv(schedule[0])
        print(df)
    print("mainRun is RUNNING")
    eltype = type(plan)
    print(f"el type beta3 variable plan:: {eltype}")
    len_plan = len(plan)
    print(f"length of the plan's array is: {len_plan}")
    p1_type = type(plan[0])
    print(f"el mawgood fe p[0]: {p1_type}")
    print(f"length of search array: {len(searcharray)}")
    print(f"type of schedule: {type(schedule)}")
    print(f"length of schedules: {len(schedule)}")
    pdf_widths = []
    pdf_heights = []
    pdfs_count_type = []
    annotation_counter = 0
    page_number = 0
    bax_annotations_all_inputs = []  # annotation groups across all plans/inputs
    column_order_all = []
    not_found_list = []  # per (plan, input): labels not found on that plan
    repeated_labels_list = []  # per input: door ids appearing more than once
    missings = []  # human-readable messages about unextractable columns
    for p in plan:
        annotation_counter += 1
        page_number += 1
        pdf_document = fitz.open("pdf", p)
        # Only the first page of each plan is processed.
        page = pdf_document[0]
        rect = page.rect  # Rectangle: contains x0, y0, x1, y1
        width_plan = page.cropbox.width
        height_plan = page.cropbox.height
        for k in range(len(schedule)):
            if not both_failed:
                if sch_csv_pdf and pick_normal:
                    dfs = extract_tables(schedule[k])
                if sch_csv_pdf and pick_model:
                    dfs = extract_tables_model(schedule[k])
            else:
                print("BOTH FAILED, BREAK THE CODE AND IMPORT CSV FILE")
                error_message = f"Can't read the pdf schedule number {k+1}, please upload csv file instead"
                break
            if not sch_csv_pdf:
                df = get_df_csv(schedule[k])
                dfs = [df]
            user_input_this_schedule = searcharray[k]
            for j in range(len(user_input_this_schedule)):
                user_input = user_input_this_schedule[j]
                secondary_presence = False
                # Indices 4/5 flag fire-rating / acoustic-rating columns.
                if user_input[4] or user_input[5]:
                    secondary_presence = True
                    main_info_, secondary_info_ = separate_main_secondary(user_input)
                    main_info = [item for item in main_info_ if item]
                    secondary_info = [item for item in secondary_info_ if item]
                    print("feh secondary information")
                    if user_input[4]:
                        print("Fire rate mawgooda")
                    if user_input[5]:
                        print("Acoustic Rate mawgooda")
                else:
                    print("mafeesh secondary information")
                selected_columns_combined = get_selected_columns_all(dfs, user_input)
                if sch_csv_pdf:
                    if selected_columns_combined is None:
                        # Name-based selection failed: re-extract plainly and
                        # map the requested column names to indices instead.
                        dfs_normal = extract_tables(schedule[k])
                        column_indices = get_column_indices_from_dfs_normal(dfs_normal, user_input)
                        if column_indices is None:
                            missing_clmns = check_missing(dfs, user_input)
                            missing_message = f"{missing_clmns} can't be extracted from table input {j+1} in schedule {k+1}"
                            missings.append(missing_message)
                            continue  # continue to the next user input
                        if len(dfs) == 1:
                            selected_columns_combined = get_selected_columns_by_index(dfs[0], column_indices, user_input)
                        if len(dfs) > 1:
                            index_df = get_df_index(dfs, user_input)
                            selected_columns_combined = get_selected_columns_by_index(dfs[index_df], column_indices, user_input)
                # NOTE(review): in CSV mode a None result from
                # get_selected_columns_all is not handled and would crash below
                # — confirm whether that can occur for CSV schedules.
                # Normalize blanks, NaNs and case-variant "n/a" to one sentinel.
                selected_columns_combined = selected_columns_combined.applymap(lambda x: 'N/A' if isinstance(x, str) and x.strip() == '' else x)
                selected_columns_combined = selected_columns_combined.fillna('N/A')
                selected_columns_combined = selected_columns_combined.replace(r'(?i)\bn/a\b', 'N/A', regex=True)
                kelma = get_st_op_pattern(selected_columns_combined, user_input)
                if "door_type" in selected_columns_combined.columns:
                    col_dict = get_similar_colors_all(selected_columns_combined)
                    flattened_list = get_flattened_tuples_list_all(col_dict)
                else:
                    if secondary_presence:
                        main_info = main_info + [""]
                    col_dict = get_fixed_color_all(selected_columns_combined)
                    flattened_list = get_flattened_tuples_list_no_doortype(selected_columns_combined)
                plan_texts = read_text(p)
                col_dict, not_found = add_location(col_dict, plan_texts)
                not_found_list.append(not_found)
                df_points = grouped_to_dataframe_dynamic(col_dict,
                                                         drop_empty_locations=True,
                                                         explode_locations=True)
                # Clean column names: collapse non-word runs to "_".
                df_points.columns = (df_points.columns
                                     .str.strip()
                                     .str.replace(r"[^\w-]+", "_", regex=True)
                                     .str.replace(r"_+", "_", regex=True)
                                     .str.strip("_"))
                # Blank out placeholder cells such as "Col1", "col2", ...
                pattern2 = r'(?i)\bcol\d+\b'
                mask2 = df_points.applymap(lambda x: bool(re.search(pattern2, str(x))))
                df_points = df_points.mask(mask2, "N/A")
                print(f"col_dict: {col_dict}")
                print(f"selected_columns_combined: {selected_columns_combined}")
                print(f"df: {df_points}")
                if df_points.empty:
                    continue  # to the next user input
                # Handling no door type in the new dictionary logic: default
                # every row to blue. The original assigned the bare tuple,
                # which pandas treats as a length-3 list-like and rejects
                # unless the frame happens to have exactly 3 rows.
                if 'color_annot' not in df_points:
                    df_points['color_annot'] = [(0, 0, 255)] * len(df_points)
                dupes = df_points['door_id'].value_counts()
                repeated_ids = dupes[dupes > 1].index.to_list()
                repeated_labels_list.append(repeated_ids)
                if ('width' in df_points and 'height' in df_points) or 'structural_opening' in df_points:
                    if kelma:
                        # Width and height are encoded together in the
                        # "structural_opening" column; split them out.
                        lst_st_op = df_points["structural_opening"].tolist()
                        cleaned_st_op = get_cleaned_width(lst_st_op)
                        widths, heights = get_widths_bb_format_st_op(cleaned_st_op, kelma)
                        df_points = df_points.drop(columns=['structural_opening'])
                        df_points['width'] = widths
                        df_points['height'] = heights
                    else:
                        # Force string dtype first to keep the flow of
                        # get_width_clean_width_height intact.
                        df_points['width'] = df_points['width'].astype('string')
                        df_points['height'] = df_points['height'].astype('string')
                        lst_width = df_points["width"].tolist()
                        lst_height = df_points["height"].tolist()
                        clean_widths, clean_height = get_width_clean_width_height(lst_width, lst_height)
                        df_points["width"] = clean_widths
                        df_points["height"] = clean_height
                    df_points = df_points.rename(columns={'width': 'Width_', 'height': 'Height_'})
                print(f"color_annot: {df_points['color_annot']}")
                print(f"df: {df_points}")
                bax, customDta = create_bb_bax_new(df_points, CountStyles, page_number, page)
                bax_annotations_all_inputs.append(bax)
                for key in customDta.keys():
                    column_order_all.append(key)
        pdfs_count_type.append(p)
        pdf_widths.append(width_plan)
        pdf_heights.append(height_plan)
    merged_pdf = merge_pdf_bytes_list(pdfs_count_type)
    print(f"number of pges of merged_pdf is {len(merged_pdf)} and its type is {type(merged_pdf)}")
    bax_annotation = []
    for bax_ann in bax_annotations_all_inputs:
        bax_annotation.extend(bax_ann)
    column_order = []
    for cols in column_order_all:
        column_order.append(cols)
    # NOTE(review): set() loses ordering — confirm the XML consumers do not
    # depend on column order.
    column_order = list(set(column_order))
    print(f"lenght of column_order is: {len(column_order)} and it's content: {column_order}")
    ## Getting the labels not found in all plans
    flattened_not_found_list = [item for sublist in not_found_list for item in sublist]
    counts_not_found = Counter(flattened_not_found_list)
    not_found_any_plan = []
    for key, value in counts_not_found.items():
        # A label counts as missing only when it was missing on every plan.
        if value == len(pdfs_count_type):
            not_found_any_plan.append(key)
    flattened_repeated_labels_list = [item for sublist in repeated_labels_list for item in sublist]
    pretty_xml = save_multiple_annotations_count_bax(bax_annotation, 'count_type_Windows.bax', column_order, pdf_widths, pdf_heights, page_number)
    column_xml = generate_bluebeam_columns_raw(column_order)
    repeated_labels = flattened_repeated_labels_list
    not_found = [item for item in not_found_any_plan if item != "N/A"]
    annotatedimgs = []
    doc2 = fitz.open('pdf', merged_pdf)
    len_doc2 = len(doc2)
    # list1 is returned for interface compatibility; the code that populated
    # it from annotation colors has been retired, so it stays empty.
    list1 = pd.DataFrame(columns=['content', 'id', 'subject', 'color'])
    print(f"number of pges of doc2 is {len_doc2} and its type is {type(doc2)}")
    for page in doc2:
        print("now inside page in doc2")
        pix = page.get_pixmap()  # render page to an image
        pl = Image.frombytes('RGB', [pix.width, pix.height], pix.samples)
        img = np.array(pl)
        annotatedimg = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        annotatedimgs.append(annotatedimg)
        # Iterate through annotations on the page (currently only logged).
        annotations_page = page.annots()
        print(f"annotations: {annotations_page}")
    return annotatedimgs, doc2, list1, repeated_labels, not_found, pretty_xml, column_xml