from collections import defaultdict from collections import Counter import pandas as pd import random import math import re import io import pypdfium2 as pdfium import fitz from PIL import Image, ImageDraw from PyPDF2 import PdfReader, PdfWriter from PyPDF2.generic import TextStringObject, NameObject, ArrayObject, FloatObject from PyPDF2.generic import NameObject, TextStringObject, DictionaryObject, FloatObject, ArrayObject from PyPDF2 import PdfReader from PyPDF2.generic import TextStringObject import numpy as np import cv2 from collections import defaultdict import random import fitz # PyMuPDF import PyPDF2 import io from PyPDF2.generic import TextStringObject # ✅ Required for setting string values from PyPDF2 import PdfReader, PdfWriter import zlib import base64 import datetime import uuid from xml.etree.ElementTree import Element, SubElement, tostring, ElementTree from xml.dom.minidom import parseString from collections import defaultdict from xml.etree.ElementTree import Element, SubElement, tostring from azure.ai.formrecognizer import DocumentAnalysisClient from azure.core.credentials import AzureKeyCredential import chardet def convert2img(path): pdf = pdfium.PdfDocument(path) page = pdf.get_page(0) pil_image = page.render().to_pil() pl1=np.array(pil_image) img = cv2.cvtColor(pl1, cv2.COLOR_RGB2BGR) return img def convert2pillow(path): pdf = pdfium.PdfDocument(path) page = pdf.get_page(0) pil_image = page.render().to_pil() return pil_image def calculate_midpoint(x1,y1,x2,y2): xm = int((x1 + x2) / 2) ym = int((y1 + y2) / 2) return (xm, ym) def read_text(input_pdf_path): pdf_document = fitz.open('pdf',input_pdf_path) for page_num in range(pdf_document.page_count): page = pdf_document[page_num] text_instances = page.get_text("words") page.apply_redactions() return text_instances def normalize_text(text): """ Normalize text by removing all whitespace characters and converting to lowercase. """ if not isinstance(text, str): return "" # Remove all whitespace characters (spaces, tabs, newlines) text = re.sub(r'\s+', '', text) return text.lower() def build_flexible_regex(term): """ Match the full string, allowing whitespace or light punctuation between words, but not allowing extra words or partial matches. """ words = normalize_text(term).split() pattern = r'[\s\.\:\-]*'.join(map(re.escape, words)) full_pattern = rf'^{pattern}$' return re.compile(full_pattern, re.IGNORECASE) def flexible_search(df, search_terms): """ Search for terms in column names and top N rows. Returns matched column indices and cell positions. """ normalized_columns = [normalize_text(col) for col in df.columns] results = {term: {"col_matches": [], "cell_matches": []} for term in search_terms} for term in search_terms: regex = build_flexible_regex(term) # Search in column names for col_idx, col_text in enumerate(df.columns): norm_col = normalize_text(col_text) if regex.search(norm_col): results[term]["col_matches"].append(col_idx) # Search in top N rows for row_idx in range(min(3, len(df))): for col_idx in range(len(df.columns)): cell_text = normalize_text(df.iat[row_idx, col_idx]) if regex.search(cell_text): results[term]["cell_matches"].append((row_idx, col_idx)) return results """def generate_current_table_without_cropping(clm_idx, clmn_name, df): selected_df = df.iloc[:, clm_idx] print("hello I generated the selected columns table without cropping") selected_df.columns = clmn_name return selected_df""" def generate_current_table_without_cropping(clm_idx,df): selected_df = df.iloc[:, clm_idx] print("hello I generated the selected columns table without cropping") return selected_df def crop_rename_table(indices, clmn_name, clmn_idx,df): #crop_at = (max(set(indices), key=indices.count)) + 1 crop_at = max(indices) + 1 df = df.iloc[crop_at:] # Starts from row index 5 (zero-based index) df.reset_index(drop=True, inplace=True) # Reset index after cropping slctd_clms = df.iloc[:, clmn_idx] # Select columns by index slctd_clms.columns = clmn_name # Rename selected columns return slctd_clms def clean_column_row(row): return [re.sub(r'^\d+-\s*', '', str(cell)) for cell in row] def details_in_another_table(clmn_name, clmn_idx, current_dfs, dfs): matching_dfs = [ dff for dff in dfs if dff is not current_dfs and current_dfs.shape[1] == dff.shape[1] ] if not matching_dfs: return None updated_dfs = [] for dff in matching_dfs: selected_dff = dff.iloc[:, clmn_idx].copy() # Clean the column names and make them a row cleaned_header = clean_column_row(selected_dff.columns.tolist()) col_names_as_row = pd.DataFrame([cleaned_header]) # Rename columns selected_dff.columns = clmn_name col_names_as_row.columns = clmn_name # Combine the cleaned row with data temp_df = pd.concat([col_names_as_row, selected_dff], ignore_index=True) updated_dfs.append(temp_df) combined_df = pd.concat(updated_dfs, ignore_index=True) return combined_df def map_user_input_to_standard_labels(user_inputs): patterns = { 'door_id': r'\b(?:door\s*)?(?:id|no|number)\b|\bdoor\s*name\b', 'door_type': r'\b(?:\S+\s+)?door\s*type\b|\btype(?:\s+\w+)?\b', 'structural_opening': r'\bstructural\s+opening\b', 'width': r'\bwidth\b', 'height': r'\bheight\b', } def normalize(text): return re.sub(r'\s+', ' ', text.strip(), flags=re.MULTILINE).lower() mapped = {} for item in user_inputs: normalized_item = normalize(item) matched = False for label, pattern in patterns.items(): if label not in mapped and re.search(pattern, normalized_item, re.IGNORECASE): mapped[label] = item matched = True break #if not matched: # mapped[normalized_item] = None return mapped def analyse_cell_columns(cell_columns_appearance): cell_matches = [] col_matches = [] for key in cell_columns_appearance.keys(): if len(cell_columns_appearance[key]['cell_matches']) >0: cell_matches.append(cell_columns_appearance[key]['cell_matches'][0]) if len(cell_columns_appearance[key]['col_matches']) >0: col_matches.append(cell_columns_appearance[key]['col_matches'][0]) return cell_matches, col_matches # when column names are located in the cells def get_row_column_indices(cell_clmn_indx): row_index = [] column_index = [] for t in cell_clmn_indx: row_index.append(t[0]) column_index.append(t[1]) return row_index, column_index # when column names are located in the coulmns itself def get_column_index(col_matches): idx = [] for t in col_matches: idx.append(t) return idx def extract_tables(schedule): doc = fitz.open("pdf",schedule) for page in doc: tabs = page.find_tables() dfs = [] for tab in tabs: df = tab.to_pandas() dfs.append(df) return dfs def get_selected_columns(dfs, user_patterns): selected_columns = [] selected_columns_new = None # Initialize selected_columns_new to None for i in range(len(dfs)): cell_columns_appearance = flexible_search(dfs[i], user_patterns) cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance) if len(user_patterns) == 2: clmn_name = ["door_id", "door_type"] if len(user_patterns) == 4: clmn_name = ["door_id", "door_type", "width", "height"] if len(user_patterns) == 3: clmn_name = ["door_id", "door_type", "structural opening"] if len(cell_matches) == 0 and len(col_matches) == 0: print(f"this is df {i}, SEARCH IN ANOTHER DF") else: #IN COLUMNS if len(col_matches) == len(user_patterns): column_index_list = get_column_index(col_matches) print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany") print(column_index_list) if len(dfs[i]) <10: selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs) #details in the same table if len(dfs[i]) >10: selected_columns_new = generate_current_table_without_cropping(column_index_list,dfs[i]) #break #IN CELLS if len(cell_matches) == len(user_patterns): row_index_list, column_index_list = get_row_column_indices(cell_matches) print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany") #details in another table if len(dfs[i]) <10: #selected_columns_new = details_in_another_table(clmn_name, clmn_idx, dfs[i], dfs) selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs) break #details in the same table if len(dfs[i]) >10: print(f"this is df {i} call crop_rename_table(indices, clmn_name, clmn_idx,df)") selected_columns_new = crop_rename_table(row_index_list, clmn_name, column_index_list,dfs[i]) break return selected_columns_new def separate_main_secondary(input_user_clmn_names): main_info = input_user_clmn_names[:4] secondary_info = input_user_clmn_names[4:] return main_info, secondary_info # take main info def get_column_name(user_input_m): #get empty indices empty_indices = [i for i, v in enumerate(user_input_m) if v == ''] # fixed column names fixed_list = ["door_id", "door_type", "width", "height"] for i in range(len(empty_indices)): if empty_indices[i] == 3 and empty_indices[i - 1] == 2: fixed_list[2] = "" if empty_indices[i] == 3 and not empty_indices[i - 1] == 2: fixed_list[2] = "structural_opening" fixed_list[empty_indices[i]] = "" #finalize the column name structure clmn_name_m = [i for i in fixed_list if i] return clmn_name_m # take secondary info def get_column_name_secondary(user_input_m): #get empty indices empty_indices = [i for i, v in enumerate(user_input_m) if v == ''] # fixed column names fixed_list = ["fire_rate", "acoustic_rate"] for i in range(len(empty_indices)): fixed_list[empty_indices[i]] = "" #finalize the column name structure clmn_name_m = [i for i in fixed_list if i] return clmn_name_m ### byte type not path def extract_tables_model(schedule_byte): # Set your Azure credentials endpoint = "https://tabledetection2.cognitiveservices.azure.com/" key = "5lr94dODMJihbGOMw2Vdz29zXRBiqt528fSGoGmzSJHTrWtHSnRdJQQJ99BEACYeBjFXJ3w3AAALACOGBANH" # Create client client = DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key)) poller = client.begin_analyze_document("prebuilt-layout", document=schedule_byte) # Get result result = poller.result() #print(result) import pandas as pd tables = [] for table in result.tables: max_cols = max(cell.column_index for cell in table.cells) + 1 max_rows = max(cell.row_index for cell in table.cells) + 1 table_data = [["" for _ in range(max_cols)] for _ in range(max_rows)] for cell in table.cells: table_data[cell.row_index][cell.column_index] = cell.content df = pd.DataFrame(table_data) tables.append(df) return tables #handling both main and secondary info together in one table def get_selected_columns_all(dfs, user_patterns): selected_columns = [] selected_columns_new = None # Initialize selected_columns_new to None for i in range(len(dfs)): main_info, secondary_info = separate_main_secondary(user_patterns) clmn_name_main = get_column_name(main_info) non_empty_main_info = [item for item in main_info if item] clmn_name_secondary = get_column_name_secondary(secondary_info) non_empty_secondary_info = [item for item in secondary_info if item] clmn_name = clmn_name_main + clmn_name_secondary non_empty_info = non_empty_main_info + non_empty_secondary_info #print(f"main info: {main_info}") print(f"clmn name: {clmn_name}") print(f"non-empty info: {non_empty_info}") #print(f"length of non-empty info: {len(non_empty_main_info)}") cell_columns_appearance = flexible_search(dfs[i], non_empty_info) cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance) print(f"length of cell_matches: {len(cell_matches)}") print(f"cell_matches: {cell_matches}") #clmn_name = map_user_input_to_standard_labels(user_patterns) #if len(clmn_name) < len(user_patterns): print(clmn_name) if len(cell_matches) == 0 and len(col_matches) == 0: print(f"this is df {i}, SEARCH IN ANOTHER DF") else: #IN COLUMNS if len(col_matches) == len(non_empty_info): column_index_list = get_column_index(col_matches) print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany") #print(len(clm_idx)) #details in another table print(column_index_list) if len(dfs[i]) <10: selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs) #break #other_matches = details_in_another_table_mod(clmn_name, clmn_idx, dfs[i], dfs) #details in the same table if len(dfs[i]) >10: selected_columns_new = generate_current_table_without_cropping(column_index_list,dfs[i]) #break #IN CELLS if len(cell_matches) == len(non_empty_info): row_index_list, column_index_list = get_row_column_indices(cell_matches) print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany") #details in another table #if len(dfs[i]) <2: #selected_columns_new = details_in_another_table(clmn_name, clmn_idx, dfs[i], dfs) selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs) selected_columns_new2 = crop_rename_table(row_index_list, clmn_name, column_index_list,dfs[i]) selected_columns_new = pd.concat([selected_columns_new, selected_columns_new2], ignore_index=True) break #other_matches = details_in_another_table_mod(clmn_name, clmn_idx, dfs[i], dfs) ##details in the same table #if len(dfs[i]) >2: # #print(f"this is df {i} call crop_rename_table(indices, clmn_name, clmn_idx,df)") #break return selected_columns_new #for new dictionary logic def get_selected_columns_all(dfs, user_patterns): selected_columns = [] selected_columns_new = None # Initialize selected_columns_new to None for i in range(len(dfs)): extra_info = user_patterns[6:] main_info, secondary_info = separate_main_secondary(user_patterns) clmn_name_main = get_column_name(main_info) non_empty_main_info = [item for item in main_info if item] clmn_name_secondary = get_column_name_secondary(secondary_info) non_empty_secondary_info = [item for item in secondary_info if item] #clmn_name = clmn_name_main + clmn_name_secondary clmn_name = clmn_name_main + clmn_name_secondary + extra_info non_empty_info = non_empty_main_info + non_empty_secondary_info #print(f"main info: {main_info}") print(f"clmn name: {clmn_name}") print(f"non-empty info: {non_empty_info}") #print(f"length of non-empty info: {len(non_empty_main_info)}") cell_columns_appearance = flexible_search(dfs[i], non_empty_info) cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance) print(f"length of cell_matches: {len(cell_matches)}") print(f"cell_matches: {cell_matches}") print(f"col_matches: {col_matches}") #clmn_name = map_user_input_to_standard_labels(user_patterns) #if len(clmn_name) < len(user_patterns): print(clmn_name) if len(cell_matches) == 0 and len(col_matches) == 0: print(f"this is df {i}, SEARCH IN ANOTHER DF") else: #IN COLUMNS if len(col_matches) == len(non_empty_info): column_index_list = get_column_index(col_matches) print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany") #print(len(clm_idx)) #details in another table print(column_index_list) #if len(dfs[i]) <10: #break #other_matches = details_in_another_table_mod(clmn_name, clmn_idx, dfs[i], dfs) #details in the same table #if len(dfs[i]) >10: selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs) selected_columns_new2 = generate_current_table_without_cropping(column_index_list,dfs[i]) selected_columns_new = pd.concat([selected_columns_new, selected_columns_new2], ignore_index=True) selected_columns_new.columns = clmn_name # must match number of columns #break #IN CELLS if len(cell_matches) == len(non_empty_info): row_index_list, column_index_list = get_row_column_indices(cell_matches) print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany") #details in another table #if len(dfs[i]) <2: #selected_columns_new = details_in_another_table(clmn_name, clmn_idx, dfs[i], dfs) print(f"column names: {clmn_name}") print(f"column index list: {column_index_list}") selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs) selected_columns_new2 = crop_rename_table(row_index_list, clmn_name, column_index_list,dfs[i]) selected_columns_new = pd.concat([selected_columns_new, selected_columns_new2], ignore_index=True) break #other_matches = details_in_another_table_mod(clmn_name, clmn_idx, dfs[i], dfs) ##details in the same table #if len(dfs[i]) >2: # #print(f"this is df {i} call crop_rename_table(indices, clmn_name, clmn_idx,df)") #break return selected_columns_new # 3ayz akhaleehaa te search fel selected_columns column names nafsaha # 7ab2a 3ayz a3raf bardo maktooba ezay fel df el 7a2e2ya (akeed za ma el user medakhalha bezabt) def get_st_op_pattern(selected_columns, user_input): target = 'structural_opening' if target in selected_columns.columns: name = user_input[2] return name return None def find_text_in_plan(label, x): substring_coordinates = [] words = [] point_list = [] #None, None, None for tpl in x: if tpl[4] == label: substring_coordinates.append(calculate_midpoint(tpl[0],tpl[1],tpl[2],tpl[3]))# for pdf point_list.append(calculate_midpoint(tpl[1],tpl[0],tpl[3],tpl[2]))# for rotated words.append(tpl[4]) return substring_coordinates, words, point_list def get_selected_columns_by_index(df, column_index_list, user_patterns): selected_df = df.iloc[:, column_index_list] # Rename columns to match the structure of the clr_dictionary main_info, secondary_info = separate_main_secondary(user_patterns) clmn_name_main = get_column_name(main_info) clmn_name_secondary = get_column_name_secondary(secondary_info) clmn_name = clmn_name_main + clmn_name_secondary print(f"clmn_name from the function el 3amla moshkela: {clmn_name}") selected_df.columns = clmn_name return selected_df ## Get the column indices from extract_tables(schedule) def get_column_indices_from_dfs_normal(dfs, user_patterns): for i in range(len(dfs)): main_info, secondary_info = separate_main_secondary(user_patterns) clmn_name_main = get_column_name(main_info) non_empty_main_info = [item for item in main_info if item] clmn_name_secondary = get_column_name_secondary(secondary_info) non_empty_secondary_info = [item for item in secondary_info if item] clmn_name = clmn_name_main + clmn_name_secondary non_empty_info = non_empty_main_info + non_empty_secondary_info cell_columns_appearance = flexible_search(dfs[i], non_empty_info) cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance) if len(cell_matches) == 0 and len(col_matches) == 0 and i < len(dfs) - 1: continue elif len(cell_matches) == 0 and len(col_matches) == 0: column_index_list = None else: #IN COLUMNS if len(col_matches) == len(non_empty_info): column_index_list = get_column_index(col_matches) print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany") #print(f"column index list: {column_index_list}") break #IN CELLS if len(cell_matches) == len(non_empty_info): row_index_list, column_index_list = get_row_column_indices(cell_matches) print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany") #print(f"column index list: {column_index_list}") break return column_index_list def find_missing_columns(complete_list, non_complete_list): def normalize_text(text): if not isinstance(text, str): return "" text = re.sub(r'\s+', '', text) # Remove all whitespace return text.lower() def normalize_text(text): """ Normalize text by removing all whitespace, brackets, and converting to lowercase. """ if not isinstance(text, str): return "" # Remove all whitespace characters (spaces, tabs, newlines) text = re.sub(r'\s+', '', text) # Remove brackets of any type text = re.sub(r'[\(\)\[\]\{\}]', '', text) return text.lower() complete_list = complete_list non_complete = non_complete_list # Normalize non_complete just once for speed normalized_non_complete = [normalize_text(item) for item in non_complete] missing = [] for item in complete_list: normalized_item = normalize_text(item) if normalized_item not in normalized_non_complete: missing.append(item) #delete empty fields as it is the 6 fixed fields approach missing = [item for item in missing if item] #print(f"{missing} can't be found in the schedule, make sure you entered it right or try entering the first row information instead of the column names") return missing # Returns the columns the code failed to locate on the schedule def check_missing(dfs, user_patterns): all_words = [] for i in range(len(dfs)): main_info, secondary_info = separate_main_secondary(user_patterns) clmn_name_main = get_column_name(main_info) non_empty_main_info = [item for item in main_info if item] clmn_name_secondary = get_column_name_secondary(secondary_info) non_empty_secondary_info = [item for item in secondary_info if item] clmn_name = clmn_name_main + clmn_name_secondary non_empty_info = non_empty_main_info + non_empty_secondary_info cell_columns_appearance = flexible_search(dfs[i], non_empty_info) cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance) words = [dfs[i].iloc[row, col] for row, col in cell_matches] all_words.append(words) found_words = max(all_words, key=len) print(found_words) missings = find_missing_columns(user_patterns, found_words) return missings # get the index of dataframe that has the maximum column matches in the dfs from model table detection def get_df_index(dfs, user_patterns): df_matches = [] for i in range(len(dfs)): main_info, secondary_info = separate_main_secondary(user_patterns) clmn_name_main = get_column_name(main_info) non_empty_main_info = [item for item in main_info if item] clmn_name_secondary = get_column_name_secondary(secondary_info) non_empty_secondary_info = [item for item in secondary_info if item] clmn_name = clmn_name_main + clmn_name_secondary non_empty_info = non_empty_main_info + non_empty_secondary_info cell_columns_appearance = flexible_search(dfs[i], non_empty_info) cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance) if len(cell_matches) == 0 and len(col_matches) == 0: continue else: column_index_list_from_columns = get_column_index(col_matches) row_index_list, column_index_list_from_cells = get_row_column_indices(cell_matches) if len(column_index_list_from_columns) > len(column_index_list_from_cells): df_matches.append((column_index_list_from_columns,i)) else: df_matches.append((column_index_list_from_cells,i)) longest_list = max(df_matches, key=lambda x: len(x[0])) #index of the longest list will be the df number index_longest_list = longest_list[1] return index_longest_list def get_word_locations_plan(flattened_list, plan_texts): locations = [] not_found = [] if len(flattened_list[0]) == 2: for lbl, clr in flattened_list: location,worz, txt_pt = find_text_in_plan(lbl, plan_texts) if len(location) ==0: not_found.append(lbl) locations.append((location, lbl, clr)) if len(flattened_list[0]) == 3: for lbl, w, clr in flattened_list: location,worz, txt_pt = find_text_in_plan(lbl, plan_texts) if len(location) ==0: not_found.append(lbl) locations.append((location, lbl, clr, w)) if len(flattened_list[0]) == 4: for lbl, w, h, clr in flattened_list: location,worz, txt_pt = find_text_in_plan(lbl, plan_texts) if len(location) ==0: not_found.append(lbl) locations.append((location, lbl, clr, w, h)) return locations, not_found def get_repeated_labels(locations): seen_labels = set() repeated_labels = set() for item in locations: label = item[1] if label in seen_labels: repeated_labels.add(label) else: seen_labels.add(label) return repeated_labels def get_cleaned_data(locations): processed = defaultdict(int) new_data = [] if len(locations[0]) == 3: for coords, label, color in locations: if len(coords)>1: index = processed[label] % len(coords) # Round-robin indexing new_coord = [coords[index]] # Pick the correct coordinate new_data.append((new_coord, label, color)) processed[label] += 1 # Move to the next coordinate for this label if len(coords)==1: new_data.append((coords, label, color)) if len(locations[0]) == 4: for coords, label, color, w in locations: if len(coords)>1: index = processed[label] % len(coords) # Round-robin indexing new_coord = [coords[index]] # Pick the correct coordinate new_data.append((new_coord, label, color, w)) processed[label] += 1 # Move to the next coordinate for this label if len(coords)==1: new_data.append((coords, label, color, w)) if len(locations[0]) == 5: for coords, label, color, w, h in locations: if len(coords)>1: index = processed[label] % len(coords) # Round-robin indexing new_coord = [coords[index]] # Pick the correct coordinate new_data.append((new_coord, label, color, w, h)) processed[label] += 1 # Move to the next coordinate for this label if len(coords)==1: new_data.append((coords, label, color, w, h)) return new_data # law 0.5 maslan tetkatab we law mesh keda yesheel el decimal point def get_width_info_tobeprinted(new_data): width_info_tobeprinted = [] if len(new_data[0]) < 4: for _,_,_, in new_data: width_info_tobeprinted.append("N/A mm wide x N/A mm high") if len(new_data[0]) == 4: for _,_,_, w in new_data: #w = re.sub(r",", "", w) #w = int(float(w)) width_info_tobeprinted.append(w) if len(new_data[0]) == 5: for _,_,_, w,h in new_data: w = re.sub(r",", "", w) h = re.sub(r",", "", h) #if w == "N/A": #if w.isalpha(): if is_not_number(w): w = w else: if float(w).is_integer(): w = int(float(w)) else: w = w #if h == "N/A": #if h.isalpha(): if is_not_number(h): h = h else: if float(h).is_integer(): h = int(float(h)) else: h = h width_info_tobeprinted.append(f"{w} mm wide x {h} mm high") return width_info_tobeprinted def clean_dimensions(text): # Remove commas and "mm" text = re.sub(r'[,\s]*mm', '', text) # Remove "mm" with optional spaces or commas before it text = text.replace(",", "") # Remove remaining commas if any return text def get_cleaned_width(width_info_tobeprinted): cleaned_width = [] for w in width_info_tobeprinted: cleaned_width.append(clean_dimensions(w)) return cleaned_width def get_widths_bb_format(cleaned_width, kelma): pattern = r"\bW(?:idth)?\s*[×x]\s*H(?:eight)?\b" match = re.search(pattern, kelma) widths = [] for widthaa in cleaned_width: index = max(widthaa.find("x"), widthaa.find("×"), widthaa.find("x"), widthaa.find("X"), widthaa.find("x")) width_name = widthaa[:index] height_name = widthaa[index+1:] width_name = int(float(width_name)) height_name = int(float(height_name)) if match: full_text = f"{width_name} mm wide x {height_name} mm high" else: full_text = f"{height_name} mm wide x {width_name} mm high" widths.append(full_text) return widths def is_not_number(s: str) -> bool: try: float(s) # accepts ints, floats, scientific notation return False # it *is* a number except ValueError: return True # not a number def get_width_info_tobeprinted_secondary(new_data, main_info, secondary_info): width_info_tobeprinted = [] secondary_info_tobeprinted = [] if len(main_info) == 2 and len(secondary_info) == 1: for coords, label, acous, color in new_data: secondary_info_tobeprinted.append(acous) width_info_tobeprinted.append("N/A mm wide x N/A mm high") if len(main_info) == 2 and len(secondary_info) == 2: for coords, label, acous, fire, color in new_data: secondary_info_tobeprinted.append((acous, fire)) width_info_tobeprinted.append("N/A mm wide x N/A mm high") if len(main_info) == 3 and len(secondary_info) == 1: for coords, label, width, acous, color in new_data: width_info_tobeprinted.append(width) secondary_info_tobeprinted.append(acous) if len(main_info) == 3 and len(secondary_info) == 2: for coords, label, width, acous, fire, color in new_data: width_info_tobeprinted.append(width) secondary_info_tobeprinted.append((acous, fire)) if len(main_info) == 4 and len(secondary_info) == 1: for coords, label, width, height, acous, color in new_data: w = re.sub(r",", "", width) h = re.sub(r",", "", height) #if w.isalpha(): if is_not_number(w): w = w else: if float(w).is_integer(): w = int(float(w)) else: w = w #if h == "N/A": #if h.isalpha(): if is_not_number(h): h = h else: if float(h).is_integer(): h = int(float(h)) else: h = h width_info_tobeprinted.append(f"{w} mm wide x {h} mm high") secondary_info_tobeprinted.append((acous)) if len(main_info) == 4 and len(secondary_info) == 2: for coords, label, width, height, acous, fire, color in new_data: print(type(width)) print(type(height)) w = re.sub(r",", "", width) h = re.sub(r",", "", height) #if w == "N/A": #if w.isalpha(): if is_not_number(w): w = w else: if float(w).is_integer(): w = int(float(w)) else: w = w #if h == "N/A": #if h.isalpha(): if is_not_number(h): h = h else: if float(h).is_integer(): h = int(float(h)) else: h = h width_info_tobeprinted.append(f"{w} mm wide x {h} mm high") secondary_info_tobeprinted.append((acous, fire)) return width_info_tobeprinted, secondary_info_tobeprinted def get_word_locations_plan_secondary(flattened_list, plan_texts, main_info, secondary_info): #hena fe 7alet en keda keda fe secondary information locations = [] not_found = [] len_main = len(main_info) #3 or #4 #sometimes maybe 2 len_secondary = len(secondary_info) #2 or #1 if len_main == 2 and len_secondary == 2: for lbl, clr, acoustic, fire in flattened_list: location,worz, txt_pt = find_text_in_plan(lbl, plan_texts) if len(location) ==0: not_found.append(lbl) locations.append((location, lbl, clr, acoustic, fire)) if len_main == 2 and len_secondary == 1: for lbl, clr, acoustic in flattened_list: location,worz, txt_pt = find_text_in_plan(lbl, plan_texts) if len(location) ==0: not_found.append(lbl) locations.append((location, lbl, clr, acoustic)) if len_main == 3 and len_secondary == 2: for lbl, w, clr, acoustic, fire in flattened_list: location,worz, txt_pt = find_text_in_plan(lbl, plan_texts) if len(location) ==0: not_found.append(lbl) locations.append((location, lbl, w, clr, acoustic, fire)) if len_main == 3 and len_secondary == 1: for lbl, w, clr, acoustic in flattened_list: location,worz, txt_pt = find_text_in_plan(lbl, plan_texts) if len(location) ==0: not_found.append(lbl) locations.append((location, lbl, w, clr, acoustic)) if len_main == 4 and len_secondary == 2: for lbl, w, h, clr, acoustic, fire in flattened_list: location,worz, txt_pt = find_text_in_plan(lbl, plan_texts) if len(location) ==0: not_found.append(lbl) locations.append((location, lbl, w, h, clr, acoustic, fire)) if len_main == 4 and len_secondary == 1: for lbl, w, h, clr, acoustic in flattened_list: location,worz, txt_pt = find_text_in_plan(lbl, plan_texts) if len(location) ==0: not_found.append(lbl) locations.append((location, lbl, w, h, clr,acoustic)) return locations, not_found ### newest, accept combined table def get_similar_colors_all(selected_columns_new): def generate_rgb(): return (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)) unique_keys = selected_columns_new['door_type'].unique() key_colors = {key: generate_rgb() for key in unique_keys} #Column fields clmns_fields = selected_columns_new.columns.to_list() def col_template(): d = { 'values': [], 'color_annot': None } for field in clmns_fields: d[field] = [] return d col_dict = defaultdict(col_template) for _, row in selected_columns_new.iterrows(): key = row['door_type'] col_dict[key]['values'].append(row['door_id']) for field in clmns_fields: col_dict[key][field].append(row.get(field, None)) col_dict[key]['color_annot'] = key_colors[key] return dict(col_dict) ### newest, accept combined table def get_flattened_tuples_list_all(col_dict): exclude_fields = ['door_type', 'values'] flattened_list = [] for values_dict in col_dict.values(): # All fields that are lists and not in the excluded fields list_fields = [k for k, v in values_dict.items() if isinstance(v, list) and k not in exclude_fields] n_rows = len(values_dict[list_fields[0]]) if list_fields else 0 for i in range(n_rows): tuple_row = tuple(values_dict[field][i] for field in list_fields) + (values_dict['color_annot'],) flattened_list.append(tuple_row) return flattened_list def get_flattened_tuples_list_no_doortype(selected_columns): flattened_list_no_color = list(selected_columns.itertuples(name=None, index=False)) col = (0,0,255) new_fl_list = [] for tu in flattened_list_no_color: new_fl_list.append(tu + (col,)) return new_fl_list #SECONDARY def get_cleaned_data_secondary(locations, main_info, secondary_info): processed = defaultdict(int) new_data = [] if len(main_info) == 2 and len(secondary_info) == 1: for coords, label, color, acous in locations: if len(coords)>1: index = processed[label] % len(coords) # Round-robin indexing new_coord = [coords[index]] # Pick the correct coordinate new_data.append((new_coord, label, color, acous)) processed[label] += 1 # Move to the next coordinate for this label if len(coords)==1: new_data.append((coords, label, color, acous)) if len(main_info) == 2 and len(secondary_info) == 2: for coords, label, color, acous, fire in locations: if len(coords)>1: index = processed[label] % len(coords) # Round-robin indexing new_coord = [coords[index]] # Pick the correct coordinate new_data.append((new_coord, label, color, acous, fire)) processed[label] += 1 # Move to the next coordinate for this label if len(coords)==1: new_data.append((coords, label, color, acous, fire)) if len(main_info) == 3 and len(secondary_info) == 1: for coords, label, width, color, acous in locations: if len(coords)>1: index = processed[label] % len(coords) # Round-robin indexing new_coord = [coords[index]] # Pick the correct coordinate new_data.append((new_coord, label, width, color, acous)) processed[label] += 1 # Move to the next coordinate for this label if len(coords)==1: new_data.append((coords, label, width, color, acous)) if len(main_info) == 3 and len(secondary_info) == 2: for coords, label, width, color, acous, fire in locations: if len(coords)>1: index = processed[label] % len(coords) # Round-robin indexing new_coord = [coords[index]] # Pick the correct coordinate new_data.append((new_coord, label, width, color, acous, fire)) processed[label] += 1 # Move to the next coordinate for this label if len(coords)==1: new_data.append((coords, label, width, color, acous, fire)) if len(main_info) == 4 and len(secondary_info) == 1: for coords, label, width, height, color, acous in locations: if len(coords)>1: index = processed[label] % len(coords) # Round-robin indexing new_coord = [coords[index]] # Pick the correct coordinate new_data.append((new_coord, label, width, height, color, acous)) processed[label] += 1 # Move to the next coordinate for this label if len(coords)==1: new_data.append((coords, label, width, height, color, acous)) if len(main_info) == 4 and len(secondary_info) == 2: for coords, label, width, height, color, acous, fire in locations: if len(coords)>1: index = processed[label] % len(coords) # Round-robin indexing new_coord = [coords[index]] # Pick the correct coordinate new_data.append((new_coord, label, width, height, color, acous, fire)) processed[label] += 1 # Move to the next coordinate for this label if len(coords)==1: new_data.append((coords, label, width, height, color, acous, fire)) return new_data def merge_pdf_bytes_list(pdfs): writer = PdfWriter() for pdf_bytes in pdfs: pdf_stream = io.BytesIO(pdf_bytes) reader = PdfReader(pdf_stream) for page in reader.pages: writer.add_page(page) output_stream = io.BytesIO() writer.write(output_stream) output_stream.seek(0) return output_stream.read() def calculate_bounding_rect_count(vertices,padding): x, y = vertices[0] xmin = x - padding ymin = y - padding xmax = x + padding ymax = y + padding return [xmin, ymin, xmax, ymax] def rgb_string_to_hex(rgb_string): r, g, b = map(float, rgb_string.strip().split()) return '#{:02X}{:02X}{:02X}'.format(int(r * 255), int(g * 255), int(b * 255)) def generate_annotation_xml_block_count(vertices, area_text, author, custom_data: dict, column_order: list, index: int, label: str = '',height:str='',width:str='', color:str='',countstyle:str='',countsize:str=''): now = datetime.datetime.utcnow() mod_date = now.strftime("D:%Y%m%d%H%M%S+00'00'") creation_date = now.isoformat() + 'Z' id_str = "fitz-" + uuid.uuid4().hex[:4].upper() vert_str = ' '.join([f'{x:.4f}' for point in vertices for x in point]) ordered_column_values = [f'({custom_data.get(col, "")})' for col in column_order] bsi_column_data = ''.join(ordered_column_values) type_internal= 'Bluebeam.PDF.Annotations.AnnotationMeasureCount' subject ='Count Measurement' padding=10 rectvertices=calculate_bounding_rect_count(vertices,padding) bbmeasure = '''<>] /D[<>] /A[<>] /T[<>] /V[<>] /TargetUnitConversion 0.3527778>>''' raw_text = f'''<< /Version 1 /DS(font: Helvetica 12pt; text-align:center; line-height:13.8pt; color:#FF0000) /CountStyle{countstyle} /CountScale {countsize} /MeasurementTypes 128 /BBMeasure{bbmeasure} /NumCounts {area_text} /AP<> /IT/PolygonCount /Vertices[{vert_str}] /IC[{color}] /T({author}) /CreationDate({mod_date}) /BSIColumnData[{bsi_column_data}] /RC(

{area_text}

) /Label({label}) /Height {height} /Width {width} /Subj({subject}) /NM({id_str}) /Subtype/Polygon /Rect[{rectvertices[0]} {rectvertices[1]} {rectvertices[2]} {rectvertices[3]}] /Contents({area_text}) /F 4 /C[{color}] /BS<> /M({mod_date}) >>'''.encode('utf-8') compressed = zlib.compress(raw_text) base64_raw = base64.b16encode(compressed).lower().decode() annotation = Element('Annotation') SubElement(annotation, 'Page') ############## newline ##################### SubElement(annotation, 'Contents').text = area_text SubElement(annotation, 'ModDate').text = creation_date SubElement(annotation, 'Color').text = rgb_string_to_hex(color) ############## newline ##################### SubElement(annotation, 'Type').text = 'Polygon' SubElement(annotation, 'ID').text = id_str SubElement(annotation, 'TypeInternal').text = type_internal SubElement(annotation, 'Raw').text = base64_raw SubElement(annotation, 'Index').text = str(index) custom = SubElement(annotation, 'Custom') for key, value in custom_data.items(): SubElement(custom, key).text = value SubElement(annotation, 'Subject').text = subject SubElement(annotation, 'CreationDate').text = creation_date SubElement(annotation, 'Author').text = author SubElement(annotation, 'Label').text = label SubElement(annotation, 'Height').text = height SubElement(annotation, 'Width').text = width return annotation def save_multiple_annotations_count_bax(annotations, output_path, column_order,pdfWidth,pdfHeight, num_pages): ##new parameter for page number handling """ annotations: list of dicts, each with: - vertices: [x, y] - text: str (label) - author: ADR - custom_data: dict of custom field values - type_internal: str (e.g., Bluebeam.PDF.Annotations.AnnotationMeasureCount) - subject: str (e.g., Count Measurement) """ doc = Element('Document', Version='1') #group annotations by page number annotations_by_page = defaultdict(list) for ann in annotations: page_num = ann.get('page', 1) annotations_by_page[page_num].append(ann) # Loop through ALL pages # for page_index, (page_num, page_annotations) in enumerate(sorted(annotations_by_page.items())): for page_index in range(num_pages): ##new line for page handling page = SubElement(doc, 'Page', Index=str(page_index)) SubElement(page, 'Label').text = str(page_index + 1) ##new line for page handling SubElement(page, 'Width').text = str(pdfWidth) SubElement(page, 'Height').text = str(pdfHeight) #adding annotations only if they exist for i, ann in enumerate(annotations_by_page.get(page_index + 1, [])): ## adjusted for page handling annotation_xml = generate_annotation_xml_block_count( vertices=ann['vertices'], area_text=ann['text'], author=ann['author'], custom_data=ann['custom_data'], column_order=column_order, index=i, label=ann.get('label', 'label1'), height=ann.get('height', '123'), width=ann.get('width', '123'), color=ann.get('color', ''), countstyle=ann.get('countstyle', ''), countsize=ann.get('countsize','') ) annotation_xml.find('Page').text = str(page_index+1) ## adjusted for page handling page.append(annotation_xml) # pretty_xml = parseString(tostring(doc)).toprettyxml(indent=" ") # with open(output_path, 'w', encoding='utf-8') as f: # f.write(pretty_xml) pretty_xml= tostring(doc, encoding="unicode", method="xml") print(f"Saved {len(annotations)} annotations to {output_path}") return pretty_xml #templates of countstyles so u can call e.g. CountStyles['Circle'] CountStyles = { 'Circle': '/Circle', 'Diamond':'/Diamond', 'Triangle':'/Triangle', 'Square':'/Square', 'Checkmark':'/Checkmark', } def convert_to_bytes(input_pdf_path): with open(input_pdf_path, "rb") as file: original_pdf_bytes = file.read() return original_pdf_bytes def mirrored_points(x, y, height_plan): #'vertices': [[new_data[i][0][0][0], new_data[i][0][0][1]]], mirrored = [] mirrored.append([x, height_plan - y]) return mirrored def point_mupdf_to_pdf(x, y, page): mediabox = page.mediabox H = float(mediabox.height) # Use mediabox height, not rect height pdf_x = mediabox.x0 + x pdf_y = mediabox.y0 + (H - y) return [[pdf_x, pdf_y]] # Modified to adjust mirrored points def create_bb_bax_secondary(new_data, widthat, heightat, secondary_tobeprinted, CountStyles, input_user_clmn_names, page_number, height_plan): bax_annotations = [] for i in range(len(new_data)): r,g,b = new_data[i][len(new_data[i])-1] # colorr R = str(float(r/255)) G = str(float(g/255)) B = str(float(b/255)) #vertix = mirrored_points(new_data[i][0][0][0], new_data[i][0][0][1], height_plan) vertix = point_mupdf_to_pdf(new_data[i][0][0][0], new_data[i][0][0][1], height_plan) if input_user_clmn_names[4] and input_user_clmn_names[5]: bax_annotations.append({ 'vertices': vertix, 'text': '1', #number of counts in one time (in markup written as count 1) -> if u want to change it we can look for a way 'author': 'ADR', 'custom_data': {'FireRating': secondary_tobeprinted[i][0], 'AcousticRating': secondary_tobeprinted[i][1], 'Height_': heightat[i],'Width_': widthat[i]} , #identify custom colums here as( Column name: Text to add ) 'label': new_data[i][1], #change label to whatever u want 'Height': heightat[i], #for tameem to change - i added any values' 'Width':widthat[i], 'page' : page_number, 'color':R+ ' '+G + ' '+B,# normalized (RGB --> R/255 G/255 B/255) 'countstyle': CountStyles['Circle'], 'countsize':'0.8' #how big or small is the count icon }) else: # Fire mawgooda if input_user_clmn_names[4]: bax_annotations.append({ 'vertices': vertix, 'text': '1', #number of counts in one time (in markup written as count 1) -> if u want to change it we can look for a way 'author': 'ADR', 'custom_data': {'FireRating': secondary_tobeprinted[i], 'AcousticRating': 'N/A', 'Height_': heightat[i],'Width_': widthat[i]} , #identify custom colums here as( Column name: Text to add ) 'label': new_data[i][1], #change label to whatever u want 'Height': heightat[i], #for tameem to change - i added any values' 'Width':widthat[i], 'page' : page_number, 'color':R+ ' '+G + ' '+B,# normalized (RGB --> R/255 G/255 B/255) 'countstyle': CountStyles['Circle'], 'countsize':'0.8' #how big or small is the count icon }) elif input_user_clmn_names[5]: bax_annotations.append({ 'vertices': vertix, 'text': '1', #number of counts in one time (in markup written as count 1) -> if u want to change it we can look for a way 'author': 'ADR', 'custom_data': {'FireRating': 'N/A', 'AcousticRating': secondary_tobeprinted[i], 'Height_': heightat[i],'Width_': widthat[i]} , #identify custom colums here as( Column name: Text to add ) 'label': new_data[i][1], #change label to whatever u want 'Height': heightat[i], #for tameem to change - i added any values' 'Width':widthat[i], 'page' : page_number, 'color':R+ ' '+G + ' '+B,# normalized (RGB --> R/255 G/255 B/255) 'countstyle': CountStyles['Circle'], 'countsize':'0.8' #how big or small is the count icon }) return bax_annotations # Modified to adjust mirrored points def create_bb_bax(new_data, widthat, heightat, CountStyles, page_number, height_plan): bax_annotations = [] for i in range(len(new_data)): #r,g,b = new_data[i][len(new_data[i])-2] # colorr r,g,b = new_data[i][2] # colorr R = str(float(r/255)) G = str(float(g/255)) B = str(float(b/255)) #vertix = mirrored_points(new_data[i][0][0][0], new_data[i][0][0][1], height_plan) vertix = point_mupdf_to_pdf(new_data[i][0][0][0], new_data[i][0][0][1], height_plan) bax_annotations.append({ 'vertices': vertix, 'text': '1', #number of counts in one time (in markup written as count 1) -> if u want to change it we can look for a way 'author': 'ADR', 'custom_data': {'FireRating': 'N/A', 'AcousticRating': 'N/A', 'Height_': heightat[i],'Width_': widthat[i]} , #identify custom colums here as( Column name: Text to add ) 'label': new_data[i][1], #change label to whatever u want 'height': heightat[i], #for tameem to change - i added any values' 'width':widthat[i], 'page' : page_number, 'color':R+ ' '+G + ' '+B,# normalized (RGB --> R/255 G/255 B/255) 'countstyle': CountStyles['Circle'], 'countsize':'0.8' #how big or small is the count icon }) return bax_annotations def add_location(col_dict, plan_texts): not_found = [] for key_outer, value_outer in col_dict.items(): locations = [] for id in value_outer['door_id']: location, _,_ = find_text_in_plan(id, plan_texts) if len(location) == 0: not_found.append(id) locations.append(location) value_outer['location'] = locations return col_dict, not_found import pandas as pd def _ensure_color_tuple(x): if x is None or isinstance(x, tuple): return x try: return tuple(x) except Exception: return x def _ensure_list_of_tuples(val): if val is None: return [] if isinstance(val, tuple): return [val] if isinstance(val, list): out = [] for item in val: if item is None: continue if isinstance(item, tuple): out.append(item) elif isinstance(item, list): out.append(tuple(item)) else: try: out.append(tuple(item)) except Exception: pass return out try: return [tuple(val)] except Exception: return [] def grouped_to_dataframe_dynamic(grouped, keep_group=False, explode_locations=False, drop_empty_locations=False): rows = [] for group_key, block in grouped.items(): ids = block.get('door_id') or block.get('values') or [] list_lengths = [len(v) for v in block.values() if isinstance(v, list)] n = max(list_lengths + [len(ids)]) if (list_lengths or ids) else 0 if n == 0: continue for i in range(n): row = {} door_id = ids[i] if i < len(ids) else f"{group_key}:{i}" row['door_id'] = door_id for k, v in block.items(): if k == 'values': continue val = (v[i] if isinstance(v, list) and i < len(v) else (v if not isinstance(v, list) else None)) if k == 'color': val = _ensure_color_tuple(val) elif k == 'location': val = _ensure_list_of_tuples(val) row[k] = val if keep_group: row['source_group'] = group_key rows.append(row) df = pd.DataFrame(rows) # dynamic union of keys # If there's a 'location' column, normalize + optionally drop empties / explode if 'location' in df.columns: df['location'] = df['location'].apply(_ensure_list_of_tuples) if drop_empty_locations: df = df[df['location'].map(lambda xs: len(xs) > 0)].reset_index(drop=True) if explode_locations: # after filtering empties, explode so each row has a single (x,y) tuple df = df.explode('location', ignore_index=True) return df # Modify it to return widths and height from width, height columns def get_width_clean_width_height(width_list, height_list): widths = [] heights = [] for width in width_list: w = re.sub(r",", "", width) if is_not_number(w): w = w else: if float(w).is_integer(): w = int(float(w)) else: w = w w = str(w) widths.append(w) for height in height_list: h = re.sub(r",", "", height) if is_not_number(h): h = h else: if float(h).is_integer(): h = int(float(h)) else: h = h h = str(h) heights.append(h) return widths, heights def get_widths_bb_format_st_op(cleaned_width, kelma): pattern = r"\bW(?:idth)?\s*[×x]\s*H(?:eight)?\b" match = re.search(pattern, kelma) widths = [] heights = [] for widthaa in cleaned_width: index = max(widthaa.find("x"), widthaa.find("×"), widthaa.find("x"), widthaa.find("X"), widthaa.find("x")) width_name = widthaa[:index] height_name = widthaa[index+1:] width_name = int(float(width_name)) height_name = int(float(height_name)) if match: full_text = f"{width_name} mm wide x {height_name} mm high" width = width_name height = height_name else: width = height_name height = width_name widths.append(width) heights.append(height) return widths, heights # New for new dictionary logic def create_bb_bax_new(df_points, CountStyles, page_number, height_plan): bax_annotations = [] exclude = {"location", "color_annot"} for _, row in df_points.iterrows(): rw = row customDta = row.drop(labels=exclude, errors="ignore").to_dict() r,g,b = rw['color_annot'] R = str(float(r/255)) G = str(float(g/255)) B = str(float(b/255)) x, y = rw['location'] vertix = point_mupdf_to_pdf(x, y, height_plan) bax_annotations.append({ 'vertices': vertix, 'text': '1', #number of counts in one time (in markup written as count 1) -> if u want to change it we can look for a way 'author': 'ADR', 'custom_data': customDta, #identify custom colums here as( Column name: Text to add ) 'label': rw['door_id'], #change label to whatever u want 'page' : page_number, 'color':R+ ' '+G + ' '+B,# normalized (RGB --> R/255 G/255 B/255) 'countstyle': CountStyles['Circle'], 'countsize':'0.8' #how big or small is the count icon }) return bax_annotations, customDta #Handle missing widths or heights in some rows def generate_separate_dimensions(widths): widthat = [] heightat = [] #pattern = r'(\d+)\s*mm wide x\s*(\d+)\s*mm high' pattern = r'(\d+(?:\.\d+)?)\s*mm wide x\s*(\d+(?:\.\d+)?)\s*mm high' for s in widths: match = re.match(pattern, s) if match: width = match.group(1) height = match.group(2) widthat.append(width) heightat.append(height) else: widthat.append("N/A") heightat.append("N/A") return widthat, heightat def generate_bluebeam_columns_raw(column_names): """ Generate BluebeamUserDefinedColumns XML as raw string, without headers or extra fields. """ root = Element("BluebeamUserDefinedColumns") for idx, name in enumerate(column_names): item = SubElement(root, "BSIColumnItem", Index=str(idx), Subtype="Text") SubElement(item, "Name").text = name SubElement(item, "DisplayOrder").text = str(idx) SubElement(item, "Deleted").text = "False" SubElement(item, "Multiline").text = "False" # Convert to string and decode raw bytes return tostring(root, encoding="unicode", method="xml") def pick_approach(schedule, plan, searcharray, flag): not_found_list = [] missings = [] no_tables = False for p in plan: for k in range(len(schedule)): if flag == 1: dfs = extract_tables(schedule[k]) if flag == 2: dfs = extract_tables_model(schedule[k]) user_input_this_schedule = searcharray[k] for j in range(len(user_input_this_schedule)): user_input = user_input_this_schedule[j] secondary_presence = False if user_input[4] or user_input[5]: secondary_presence = True main_info_, secondary_info_ = separate_main_secondary(user_input) main_info = [item for item in main_info_ if item] secondary_info = [item for item in secondary_info_ if item] selected_columns_combined = get_selected_columns_all(dfs, user_input) if selected_columns_combined is None: dfs_normal = extract_tables(schedule[k]) column_indices = get_column_indices_from_dfs_normal(dfs_normal, user_input) if column_indices is None: missing_clmns = check_missing(dfs, user_input) missing_message = f"{missing_clmns} can't be extracted from table input {j+1} in schedule {k+1}" missings.append(missing_message) no_tables = True continue # continue to the next user input if len(dfs) == 1: selected_columns_combined = get_selected_columns_by_index(dfs[0], column_indices, user_input) if len(dfs) > 1: index_df = get_df_index(dfs, user_input) selected_columns_combined = get_selected_columns_by_index(dfs[index_df], column_indices, user_input) selected_columns_combined = selected_columns_combined.applymap(lambda x: 'N/A' if isinstance(x, str) and x.strip() == '' else x) selected_columns_combined = selected_columns_combined.fillna('N/A') selected_columns_combined = selected_columns_combined.replace(r'(?i)\bn/a\b', 'N/A', regex=True) kelma = get_st_op_pattern(selected_columns_combined, user_input) if "door_type" in selected_columns_combined.columns: col_dict = get_similar_colors_all(selected_columns_combined) flattened_list = get_flattened_tuples_list_all(col_dict) else: if secondary_presence: main_info = main_info + [""] flattened_list = get_flattened_tuples_list_no_doortype(selected_columns_combined) plan_texts = read_text(p) if secondary_presence: locations, not_found = get_word_locations_plan_secondary(flattened_list,plan_texts, main_info, secondary_info) not_found_list.append(not_found) else: locations, not_found = get_word_locations_plan(flattened_list,plan_texts) not_found_list.append(not_found) ## Getting the not found in all plans flattened_not_found_list = [item for sublist in not_found_list for item in sublist] from collections import Counter counts_not_found = Counter(flattened_not_found_list) not_found_any_plan = [] for key, value in counts_not_found.items(): if value == len(plan): not_found_any_plan.append(key) not_found_any_plan = [item for item in not_found_any_plan if item != "N/A"] return no_tables, not_found_any_plan def get_df_csv(sch): with open(sch, "rb") as f: raw = f.read(100_000) # read first 100 KB (enough for detection) guess = chardet.detect(raw) #print(guess) # {'encoding': 'Windows-1252', 'confidence': 0.73, ...} encoding = guess["encoding"] or "utf-8" # fallback df = pd.read_csv(sch, encoding=encoding) return df def mainRun(schedule, plan, searcharray, sch_csv_pdf): if sch_csv_pdf: print("shcedule type is PDF") no_tables_normal, not_found_any_plan_normal = pick_approach(schedule, plan, searcharray, 1) try: no_tables_model, not_found_any_plan_model = pick_approach(schedule, plan, searcharray, 2) except: print("Model detection has issue of file too large") #no_tables_model = True pick_normal = False pick_model = False if no_tables_model: pick_normal = True #print("choose normal") elif no_tables_normal: pick_model = True #print("choose model") elif no_tables_model and no_tables_normal: print("el etneen bayzeen") else: ## Decide according to the not found labels #print("el etneen shaghaleen") if len(not_found_any_plan_model) > len(not_found_any_plan_normal): #print("choose not_found_any_plan_normal") pick_normal = True elif len(not_found_any_plan_model) < len(not_found_any_plan_normal): pick_model = True #print("choose not_found_any_plan_model") else: # law ad ba3d choose the older approach (fitz) pick_normal = True #print("choose any") else: print("schedule type is CSV") df = get_df_csv(schedule[0]) print(df) print("mainRun is RUNNING") #print(type(plan)) eltype = type(plan) print(f"el type beta3 variable plan:: {eltype}") len_plan = len(plan) print(f"length of the plan's array is: {len_plan}") p1_type = type(plan[0]) print(f"el mawgood fe p[0]: {p1_type}") print(f"length of search array: {len(searcharray)}") #dfs = extract_tables(schedule) print(f"type of schedule: {type(schedule)}") print(f"length of schedules: {len(schedule)}") pdf_widths = [] pdf_heights = [] pdfs_count_type = [] annotation_counter = 0 page_number = 0 bax_annotations_all_inputs = [] #for the same plan #pdfs = [] not_found_list = [] repeated_labels_list = [] missings = [] for p in plan: annotation_counter +=1 page_number +=1 pdf_document = fitz.open("pdf", p) # Get the first page (0-indexed) page = pdf_document[0] rect = page.rect # Rectangle: contains x0, y0, x1, y1 width_plan = page.cropbox.width # or: width = rect.x1 - rect.x0 height_plan = page.cropbox.height # or: height = rect.y1 - rect.y0 #width_plan = math.ceil(width_plan) #height_plan = math.ceil(height_plan) for k in range(len(schedule)): if sch_csv_pdf and pick_normal: dfs = extract_tables(schedule[k]) if sch_csv_pdf and pick_model: dfs = extract_tables_model(schedule[k]) if sch_csv_pdf == False: df = get_df_csv(schedule[k]) dfs = [df] user_input_this_schedule = searcharray[k] for j in range(len(user_input_this_schedule)): user_input = user_input_this_schedule[j] secondary_presence = False if user_input[4] or user_input[5]: secondary_presence = True main_info_, secondary_info_ = separate_main_secondary(user_input) main_info = [item for item in main_info_ if item] secondary_info = [item for item in secondary_info_ if item] print("feh secondary information") if user_input[4]: print("Fire rate mawgooda") if user_input[5]: print("Acoustic Rate mawgooda") else: print("mafeesh secondary information") selected_columns_combined = get_selected_columns_all(dfs, user_input) if sch_csv_pdf: if selected_columns_combined is None: dfs_normal = extract_tables(schedule[k]) column_indices = get_column_indices_from_dfs_normal(dfs_normal, user_input) if column_indices is None: missing_clmns = check_missing(dfs, user_input) missing_message = f"{missing_clmns} can't be extracted from table input {j+1} in schedule {k+1}" missings.append(missing_message) continue # continue to the next user input if len(dfs) == 1: selected_columns_combined = get_selected_columns_by_index(dfs[0], column_indices, user_input) if len(dfs) > 1: index_df = get_df_index(dfs, user_input) selected_columns_combined = get_selected_columns_by_index(dfs[index_df], column_indices, user_input) selected_columns_combined = selected_columns_combined.applymap(lambda x: 'N/A' if isinstance(x, str) and x.strip() == '' else x) selected_columns_combined = selected_columns_combined.fillna('N/A') selected_columns_combined = selected_columns_combined.replace(r'(?i)\bn/a\b', 'N/A', regex=True) kelma = get_st_op_pattern(selected_columns_combined, user_input) if "door_type" in selected_columns_combined.columns: col_dict = get_similar_colors_all(selected_columns_combined) flattened_list = get_flattened_tuples_list_all(col_dict) else: if secondary_presence: main_info = main_info + [""] # new logic can handle it #col_dict = get_similar_colors_all(selected_columns_combined) flattened_list = get_flattened_tuples_list_no_doortype(selected_columns_combined) plan_texts = read_text(p) #locations, not_found = get_word_locations_plan_secondary(flattened_list,plan_texts, main_info, secondary_info) #not_found_list.append(not_found) #new_data3 = get_cleaned_data_secondary(locations,main_info,secondary_info) #repeated_labels = get_repeated_labels(locations) #repeated_labels = list(repeated_labels) #repeated_labels_list.append(repeated_labels) col_dict, not_found = add_location(col_dict, plan_texts) not_found_list.append(not_found) df_points = grouped_to_dataframe_dynamic(col_dict, drop_empty_locations=True, explode_locations=True) #df_points.columns = df_points.columns.str.strip().str.replace(r"\s+", "_", regex=True) # Clean column names df_points.columns = (df_points.columns .str.strip() .str.replace(r"[^\w-]+", "_", regex=True) .str.replace(r"_+", "_", regex=True) .str.strip("_")) print(f"col_dict: {col_dict}") print(f"selected_columns_combined: {selected_columns_combined}") print(f"df: {df_points}") if df_points.empty: continue # to the next user input # handling no door type in the new dictionary logic if 'color_annot' not in df_points: df_points['color_annot'] = (0, 0, 255) dupes = df_points['door_id'].value_counts() repeated_ids = dupes[dupes > 1].index.to_list() repeated_labels_list.append(repeated_ids) if ('width' in df_points and 'height' in df_points) or 'structural_opening' in df_points: if kelma: lst_st_op = df_points["structural_opening"].tolist() cleaned_st_op = get_cleaned_width(lst_st_op) widths, heights = get_widths_bb_format_st_op(cleaned_st_op, kelma) # remove a column (returns a new df) df_points = df_points.drop(columns=['structural_opening']) # add two columns (scalars, lists/arrays/Series of length len(df), or expressions) df_points['width'] = widths # e.g., a list/Series/np.array or a scalar df_points['height'] = heights else: # make sure they are strings first to keep the flow of get_width_clean_width_height function df_points['width'] = df_points['width'].astype('string') df_points['height'] = df_points['height'].astype('string') lst_width = df_points["width"].tolist() lst_height = df_points["height"].tolist() clean_widths, clean_height = get_width_clean_width_height(lst_width, lst_height) df_points["width"] = clean_widths df_points["height"] = clean_height df_points = df_points.rename(columns={'width': 'Width_', 'height':'Height_'}) #if kelma == None: #widths, secondary_tobeprinted = get_width_info_tobeprinted_secondary(new_data3, main_info, secondary_info) #else: #width_info_tobeprinted, secondary_tobeprinted = get_width_info_tobeprinted_secondary(new_data3, main_info, secondary_info) #cleaned_width = get_cleaned_width(width_info_tobeprinted) #widths = get_widths_bb_format(cleaned_width, kelma) #Count type annotation #widht_count, height_count = generate_separate_dimensions(widths) #bax = create_bb_bax_secondary(new_data3, widht_count, height_count, secondary_tobeprinted, CountStyles, user_input, page_number, page) #bax_annotations_all_inputs.append(bax) print(f"color_annot: {df_points['color_annot']}") print(f"df: {df_points}") bax, customDta = create_bb_bax_new(df_points, CountStyles, page_number, page) bax_annotations_all_inputs.append(bax) # if it is not byte type #pdfs_count_type.append(convert_to_bytes(p)) pdfs_count_type.append(p) pdf_widths.append(width_plan) pdf_heights.append(height_plan) merged_pdf = merge_pdf_bytes_list(pdfs_count_type) print(f"number of pges of merged_pdf is {len(merged_pdf)} and its type is {type(merged_pdf)}") bax_annotation = [] for bax_ann in bax_annotations_all_inputs: bax_annotation.extend(bax_ann) #column_order = ['FireRating', 'AcousticRating', 'Height_', 'Width_'] column_order = [] for key in customDta.keys(): column_order.append(key) ## Getting the not found in all plans flattened_not_found_list = [item for sublist in not_found_list for item in sublist] counts_not_found = Counter(flattened_not_found_list) not_found_any_plan = [] for key, value in counts_not_found.items(): if value == len(pdfs_count_type): not_found_any_plan.append(key) flattened_repeated_labels_list = [item for sublist in repeated_labels_list for item in sublist] pretty_xml = save_multiple_annotations_count_bax(bax_annotation, 'count_type_Windows.bax', column_order,pdf_widths,pdf_heights,page_number) column_xml = generate_bluebeam_columns_raw(column_order) repeated_labels = flattened_repeated_labels_list ##### SHOULD return pretty_xml, column_xml, merged_pdf not_found = [item for item in not_found_any_plan if item != "N/A"] annotatedimgs=[] doc2 =fitz.open('pdf',merged_pdf) len_doc2 = len(doc2) list1=pd.DataFrame(columns=['content', 'id', 'subject','color']) print(f"number of pges of doc2 is {len_doc2} and its type is {type(doc2)}") for page in doc2: print("now inside page in doc2") # page=doc2[0] pix = page.get_pixmap() # render page to an image pl=Image.frombytes('RGB', [pix.width,pix.height],pix.samples) img=np.array(pl) annotatedimg = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) annotatedimgs.append(annotatedimg) # Iterate through annotations on the page annotations_page = page.annots() print(f"annotations: {annotations_page}") ''' for annot in page.annots(): # Get the color of the annotation print("ann: {annot}") annot_color = annot.colors if annot_color is not None: # annot_color is a dictionary with 'stroke' and 'fill' keys print(annot_color) stroke_color = annot_color.get('stroke') # Border color fill_color = annot_color.get('fill') # Fill color if fill_color: v='fill' # print('fill') if stroke_color: v='stroke' x,y,z=int(annot_color.get(v)[0]*255),int(annot_color.get(v)[1]*255),int(annot_color.get(v)[2]*255) print(f"x: {x}") print(f"y: {y}") print(f"z: {z}") list1.loc[len(list1)] =[annot.info['content'],annot.info['id'],annot.info['subject'],[x,y,z]] print(f"list1 : {list1}") ''' return annotatedimgs, doc2 , list1, repeated_labels , not_found, pretty_xml, column_xml