# Source: LatestDuplicate_Working / Doors_Schedule.py
# Last commit: "Update Doors_Schedule.py" (fb5d442, verified), by Marthee
from collections import defaultdict
from collections import Counter
import pandas as pd
import random
import math
import re
import io
import pypdfium2 as pdfium
import fitz
from PIL import Image, ImageDraw
from PyPDF2 import PdfReader, PdfWriter
from PyPDF2.generic import TextStringObject, NameObject, ArrayObject, FloatObject
from PyPDF2.generic import NameObject, TextStringObject, DictionaryObject, FloatObject, ArrayObject
from PyPDF2 import PdfReader
from PyPDF2.generic import TextStringObject
import numpy as np
import cv2
from collections import defaultdict
import random
import fitz # PyMuPDF
import PyPDF2
import io
from PyPDF2.generic import TextStringObject # ✅ Required for setting string values
from PyPDF2 import PdfReader, PdfWriter
import zlib
import base64
import datetime
import uuid
from xml.etree.ElementTree import Element, SubElement, tostring, ElementTree
from xml.dom.minidom import parseString
from collections import defaultdict
from xml.etree.ElementTree import Element, SubElement, tostring
from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential
import chardet
def convert2img(path):
    """Render page 0 of a PDF and return it as a BGR image array.

    Args:
        path: Input handed unchanged to ``pdfium.PdfDocument``.

    Returns:
        The array produced by ``cv2.cvtColor(..., cv2.COLOR_RGB2BGR)`` for
        the rendered first page.
    """
    document = pdfium.PdfDocument(path)
    rendered = document.get_page(0).render().to_pil()
    # The PIL image is converted with COLOR_RGB2BGR, i.e. it is treated as RGB.
    return cv2.cvtColor(np.array(rendered), cv2.COLOR_RGB2BGR)
def convert2pillow(path):
    """Render page 0 of a PDF and return the result of ``to_pil()``.

    Args:
        path: Input handed unchanged to ``pdfium.PdfDocument``.
    """
    document = pdfium.PdfDocument(path)
    first_page = document.get_page(0)
    return first_page.render().to_pil()
def calculate_midpoint(x1,y1,x2,y2):
    """Return the midpoint of (x1, y1)-(x2, y2) as a tuple of ints.

    Each coordinate is averaged and then passed through ``int()``, which
    truncates towards zero.
    """
    return int((x1 + x2) / 2), int((y1 + y2) / 2)
def read_text(input_pdf_path):
    """Return the word tuples PyMuPDF extracts from the last page of a PDF.

    Args:
        input_pdf_path: Passed as the second argument of
            ``fitz.open('pdf', ...)`` (presumably the PDF bytes despite the
            parameter name; confirm against the caller).

    Returns:
        The result of ``page.get_text("words")`` for the last page visited,
        or an empty list when the document has no pages.
    """
    pdf_document = fitz.open('pdf', input_pdf_path)
    # Start from an empty result so a zero-page document no longer raises
    # UnboundLocalError at the return statement.
    text_instances = []
    for page in pdf_document:
        # Every page overwrites the previous result, so only the final
        # page's words are returned (unchanged behaviour).
        text_instances = page.get_text("words")
        # NOTE(review): no redaction annotations are added in this function,
        # and the document is discarded afterwards; kept to preserve behaviour.
        page.apply_redactions()
    return text_instances
def normalize_text(text):
    """
    Return *text* lower-cased with every whitespace run removed.

    Anything that is not a ``str`` is normalised to the empty string.
    """
    if isinstance(text, str):
        # \s+ covers spaces, tabs and newlines alike.
        return re.sub(r'\s+', '', text).lower()
    return ""
def build_flexible_regex(term):
    """
    Build a case-insensitive regex that matches *term* as a whole string.

    The words of *term* may be separated in the searched text by any run of
    whitespace, dots, colons or hyphens (including nothing at all), but no
    extra words or partial matches are accepted.

    Args:
        term: Search term; a non-string yields a pattern that only matches
            the empty string.

    Returns:
        A compiled ``re.Pattern`` anchored with ``^`` and ``$``.
    """
    # Split BEFORE any whitespace is removed. The previous code split an
    # already whitespace-free string, so it always saw a single "word" and
    # the separator pattern below was never used.
    words = term.lower().split() if isinstance(term, str) else []
    pattern = r'[\s\.\:\-]*'.join(map(re.escape, words))
    return re.compile(rf'^{pattern}$', re.IGNORECASE)
def flexible_search(df, search_terms):
    """
    Search for terms in the column names and in the first three rows.

    Args:
        df: DataFrame to inspect.
        search_terms: Iterable of terms; each one is compiled with
            ``build_flexible_regex``.

    Returns:
        ``{term: {"col_matches": [col_idx, ...],
                  "cell_matches": [(row_idx, col_idx), ...]}}``
        with cell matches listed row by row.
    """
    # Normalise headers and the top rows once instead of once per term
    # (the old per-function `normalized_columns` list was built but never used).
    header_texts = [normalize_text(col) for col in df.columns]
    n_cols = len(df.columns)
    top_cells = [
        ((row_idx, col_idx), normalize_text(df.iat[row_idx, col_idx]))
        for row_idx in range(min(3, len(df)))
        for col_idx in range(n_cols)
    ]

    results = {term: {"col_matches": [], "cell_matches": []} for term in search_terms}
    for term in search_terms:
        regex = build_flexible_regex(term)
        hits = results[term]
        hits["col_matches"].extend(
            col_idx for col_idx, text in enumerate(header_texts) if regex.search(text)
        )
        hits["cell_matches"].extend(
            position for position, text in top_cells if regex.search(text)
        )
    return results
"""def generate_current_table_without_cropping(clm_idx, clmn_name, df):
selected_df = df.iloc[:, clm_idx]
print("hello I generated the selected columns table without cropping")
selected_df.columns = clmn_name
return selected_df"""
def generate_current_table_without_cropping(clm_idx,df):
    """Return the columns of *df* at positions *clm_idx*, keeping every row.

    A fixed progress message is printed after the selection.
    """
    chosen_columns = df.iloc[:, clm_idx]
    print("hello I generated the selected columns table without cropping")
    return chosen_columns
def crop_rename_table(indices, clmn_name, clmn_idx,df):
    """Drop the header rows of *df* and return the selected, renamed columns.

    Args:
        indices: Row positions of the header cells; rows up to and including
            ``max(indices)`` are discarded.
        clmn_name: New column labels, one per selected column.
        clmn_idx: Positional indices of the columns to keep.
        df: Source DataFrame; it is not modified.

    Returns:
        A new DataFrame whose index restarts at 0.

    Raises:
        ValueError: If *indices* is empty (raised by ``max``).
    """
    first_data_row = max(indices) + 1
    # Non-in-place reset plus an explicit copy: the old code called
    # reset_index(inplace=True) and assigned .columns on slices of the
    # caller's frame, which is what pandas' chained-assignment warnings are about.
    body = df.iloc[first_data_row:].reset_index(drop=True)
    selected = body.iloc[:, clmn_idx].copy()
    selected.columns = clmn_name
    return selected
def clean_column_row(row):
    """Return the cells of *row* as strings without a leading ``<digits>-`` prefix.

    Whitespace that directly follows the hyphen is removed as well.
    """
    cleaned = []
    for cell in row:
        cleaned.append(re.sub(r'^\d+-\s*', '', str(cell)))
    return cleaned
def details_in_another_table(clmn_name, clmn_idx, current_dfs, dfs):
    """Collect the selected columns from every other table of the same width.

    Args:
        clmn_name: Labels given to the selected columns.
        clmn_idx: Positional indices of the columns to take.
        current_dfs: The table that is skipped (compared by identity).
        dfs: All candidate tables.

    Returns:
        One DataFrame stacking, for each other table with the same number of
        columns as *current_dfs*, its cleaned header as a first row followed
        by its data; ``None`` when no such table exists.
    """
    width = current_dfs.shape[1]
    candidates = [other for other in dfs
                  if other is not current_dfs and other.shape[1] == width]
    if len(candidates) == 0:
        return None

    pieces = []
    for other in candidates:
        picked = other.iloc[:, clmn_idx].copy()
        # The original header is kept as a data row once its "<n>-" prefixes
        # are stripped.
        header_row = pd.DataFrame([clean_column_row(picked.columns.tolist())])
        picked.columns = clmn_name
        header_row.columns = clmn_name
        pieces.append(pd.concat([header_row, picked], ignore_index=True))
    return pd.concat(pieces, ignore_index=True)
def map_user_input_to_standard_labels(user_inputs):
    """Map free-text column names onto the standard door-schedule labels.

    Each input is whitespace-collapsed and lower-cased, then tested against
    the label patterns in declaration order; the first label that matches and
    has not been claimed yet receives the ORIGINAL input string.

    Returns:
        ``{label: original_input}`` for every label that was matched; inputs
        that match nothing are left out.
    """
    patterns = {
        'door_id': r'\b(?:door\s*)?(?:id|no|number)\b|\bdoor\s*name\b',
        'door_type': r'\b(?:\S+\s+)?door\s*type\b|\btype(?:\s+\w+)?\b',
        'structural_opening': r'\bstructural\s+opening\b',
        'width': r'\bwidth\b',
        'height': r'\bheight\b',
    }

    mapped = {}
    for raw in user_inputs:
        squeezed = re.sub(r'\s+', ' ', raw.strip(), flags=re.MULTILINE).lower()
        for label, pattern in patterns.items():
            if label in mapped:
                # A label is assigned at most once; try the next one.
                continue
            if re.search(pattern, squeezed, re.IGNORECASE):
                mapped[label] = raw
                break
    return mapped
def analyse_cell_columns(cell_columns_appearance):
    """Keep only the first cell match and first column match of every term.

    Args:
        cell_columns_appearance: Mapping whose values hold ``'cell_matches'``
            and ``'col_matches'`` lists (the shape ``flexible_search`` returns).

    Returns:
        ``(cell_matches, col_matches)`` - two lists with one entry per term
        that had at least one match of that kind.
    """
    first_cells, first_cols = [], []
    for hits in cell_columns_appearance.values():
        if hits['cell_matches']:
            first_cells.append(hits['cell_matches'][0])
        if hits['col_matches']:
            first_cols.append(hits['col_matches'][0])
    return first_cells, first_cols
# Used when the column names are located in the cells
def get_row_column_indices(cell_clmn_indx):
    """Split ``(row, column)`` pairs into a row list and a column list."""
    rows = [pair[0] for pair in cell_clmn_indx]
    columns = [pair[1] for pair in cell_clmn_indx]
    return rows, columns
# Used when the column names are located in the column headers themselves
def get_column_index(col_matches):
    """Return the matched column indices as a new list."""
    return [index for index in col_matches]
def extract_tables(schedule):
    """Return every table PyMuPDF detects in a PDF as pandas DataFrames.

    Args:
        schedule: Passed as the second argument of ``fitz.open("pdf", ...)``
            (presumably the PDF bytes; confirm against the caller).

    Returns:
        A list with one DataFrame per detected table, in page order; empty
        when the document has no pages or no tables.
    """
    doc = fitz.open("pdf", schedule)
    # Created once, outside the page loop. It used to be re-created for every
    # page, which dropped the tables of all pages except the last and left
    # the name undefined for a document without pages.
    dfs = []
    for page in doc:
        for tab in page.find_tables():
            dfs.append(tab.to_pandas())
    return dfs
# Select the requested door-schedule columns from a list of extracted tables.
#
# NOTE(review): indentation is not visible in this view of the file, so the
# comments below describe what each statement does instead of asserting how
# the statements are nested.
#
# dfs: tables to scan; each one is searched with flexible_search.
# user_patterns: header texts to look for; their count (2, 3 or 4) picks the
#     output column names.
# Returns selected_columns_new, which stays None unless one of the
# assignments below runs.
def get_selected_columns(dfs, user_patterns):
# NOTE(review): selected_columns is never read in this function.
selected_columns = []
selected_columns_new = None # Initialize selected_columns_new to None
for i in range(len(dfs)):
# Look for every pattern in this table's column names and in its first rows.
cell_columns_appearance = flexible_search(dfs[i], user_patterns)
cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)
# NOTE(review): clmn_name is only assigned for 2, 3 or 4 patterns; any other
# count leaves it unassigned for the calls further down.
if len(user_patterns) == 2:
clmn_name = ["door_id", "door_type"]
if len(user_patterns) == 4:
clmn_name = ["door_id", "door_type", "width", "height"]
if len(user_patterns) == 3:
# NOTE(review): written with a space here, while get_column_name and
# get_st_op_pattern in this file use "structural_opening" - confirm which
# spelling is intended.
clmn_name = ["door_id", "door_type", "structural opening"]
if len(cell_matches) == 0 and len(col_matches) == 0:
print(f"this is df {i}, SEARCH IN ANOTHER DF")
else:
#IN COLUMNS
# Every pattern was found among the column names.
if len(col_matches) == len(user_patterns):
column_index_list = get_column_index(col_matches)
print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")
print(column_index_list)
# NOTE(review): a table with exactly 10 rows satisfies neither `<10` nor
# `>10`, so nothing is selected for it.
if len(dfs[i]) <10:
selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
#details in the same table
if len(dfs[i]) >10:
selected_columns_new = generate_current_table_without_cropping(column_index_list,dfs[i])
#break
#IN CELLS
# Every pattern was found in a cell of the first rows.
if len(cell_matches) == len(user_patterns):
row_index_list, column_index_list = get_row_column_indices(cell_matches)
print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")
#details in another table
if len(dfs[i]) <10:
#selected_columns_new = details_in_another_table(clmn_name, clmn_idx, dfs[i], dfs)
selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
break
#details in the same table
if len(dfs[i]) >10:
print(f"this is df {i} call crop_rename_table(indices, clmn_name, clmn_idx,df)")
# Header rows are cut off and the kept columns are renamed to clmn_name.
selected_columns_new = crop_rename_table(row_index_list, clmn_name, column_index_list,dfs[i])
break
return selected_columns_new
def separate_main_secondary(input_user_clmn_names):
    """Split the user's column names into ``(first four, everything after)``."""
    return input_user_clmn_names[:4], input_user_clmn_names[4:]
# Takes the main info
def get_column_name(user_input_m):
    """Return the standard names of the main fields the user filled in.

    The four positions stand for ``door_id``, ``door_type``, ``width`` and
    ``height``; a position holding ``''`` is dropped. When the fourth
    position is empty but the third is not, the third is reported as
    ``structural_opening`` instead of ``width``.
    """
    blank_positions = {pos for pos, value in enumerate(user_input_m) if value == ''}
    labels = ["door_id", "door_type", "width", "height"]
    if 3 in blank_positions and 2 not in blank_positions:
        labels[2] = "structural_opening"
    for pos in blank_positions:
        labels[pos] = ""
    # Keep only the names that were not blanked out.
    return [label for label in labels if label]
# Takes the secondary info
def get_column_name_secondary(user_input_m):
    """Return the standard names of the secondary fields the user filled in.

    Position 0 stands for ``fire_rate`` and position 1 for ``acoustic_rate``;
    a position holding ``''`` is dropped.
    """
    labels = ["fire_rate", "acoustic_rate"]
    for pos, value in enumerate(user_input_m):
        if value == '':
            labels[pos] = ""
    return [label for label in labels if label]
### Takes the document as bytes, not as a path
def extract_tables_model(schedule_byte):
    """Detect tables with Azure's ``prebuilt-layout`` model.

    Args:
        schedule_byte: Document content passed as ``document=`` to
            ``begin_analyze_document``.

    Returns:
        One DataFrame per detected table. Cells are placed by their
        ``row_index`` / ``column_index``; positions without a cell hold ``""``.

    Environment:
        ``AZURE_FORM_RECOGNIZER_ENDPOINT`` and ``AZURE_FORM_RECOGNIZER_KEY``
        override the built-in endpoint and key when set.
    """
    import os  # local import: only this function reads the environment

    # NOTE(review): SECURITY - an API key is committed in this file. It should
    # be rotated and supplied only through the environment; the literal
    # fallbacks remain solely so existing deployments keep working.
    endpoint = os.environ.get(
        "AZURE_FORM_RECOGNIZER_ENDPOINT",
        "https://tabledetection2.cognitiveservices.azure.com/",
    )
    key = os.environ.get(
        "AZURE_FORM_RECOGNIZER_KEY",
        "5lr94dODMJihbGOMw2Vdz29zXRBiqt528fSGoGmzSJHTrWtHSnRdJQQJ99BEACYeBjFXJ3w3AAALACOGBANH",
    )

    client = DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key))
    poller = client.begin_analyze_document("prebuilt-layout", document=schedule_byte)
    result = poller.result()

    tables = []
    # `or []` guards against a result that carries no tables attribute value.
    for table in result.tables or []:
        # default=-1 yields a 0x0 grid for a table without cells instead of
        # letting max() raise ValueError on an empty sequence.
        n_cols = max((cell.column_index for cell in table.cells), default=-1) + 1
        n_rows = max((cell.row_index for cell in table.cells), default=-1) + 1
        grid = [["" for _ in range(n_cols)] for _ in range(n_rows)]
        for cell in table.cells:
            grid[cell.row_index][cell.column_index] = cell.content
        tables.append(pd.DataFrame(grid))
    return tables
#handling both main and secondary info together in one table
#
# NOTE(review): a second `def get_selected_columns_all` appears later in this
# file; being defined later, it replaces this definition when the module is
# imported, so this version is effectively dead code.
# NOTE(review): indentation is not visible in this view of the file, so the
# comments below describe what each statement does instead of asserting how
# the statements are nested.
#
# dfs: tables to scan. user_patterns: the user's column texts; the first four
# are treated as main info and the rest as secondary info.
# Returns selected_columns_new, which stays None unless a branch below assigns it.
def get_selected_columns_all(dfs, user_patterns):
# NOTE(review): selected_columns is never read in this function.
selected_columns = []
selected_columns_new = None # Initialize selected_columns_new to None
for i in range(len(dfs)):
# None of the next seven assignments depends on i.
main_info, secondary_info = separate_main_secondary(user_patterns)
clmn_name_main = get_column_name(main_info)
non_empty_main_info = [item for item in main_info if item]
clmn_name_secondary = get_column_name_secondary(secondary_info)
non_empty_secondary_info = [item for item in secondary_info if item]
# Standard output names, and the non-empty user texts that are searched for.
clmn_name = clmn_name_main + clmn_name_secondary
non_empty_info = non_empty_main_info + non_empty_secondary_info
#print(f"main info: {main_info}")
print(f"clmn name: {clmn_name}")
print(f"non-empty info: {non_empty_info}")
#print(f"length of non-empty info: {len(non_empty_main_info)}")
cell_columns_appearance = flexible_search(dfs[i], non_empty_info)
cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)
print(f"length of cell_matches: {len(cell_matches)}")
print(f"cell_matches: {cell_matches}")
#clmn_name = map_user_input_to_standard_labels(user_patterns)
#if len(clmn_name) < len(user_patterns):
print(clmn_name)
if len(cell_matches) == 0 and len(col_matches) == 0:
print(f"this is df {i}, SEARCH IN ANOTHER DF")
else:
#IN COLUMNS
# Every searched text was found among the column names.
if len(col_matches) == len(non_empty_info):
column_index_list = get_column_index(col_matches)
print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")
#print(len(clm_idx))
#details in another table
print(column_index_list)
# NOTE(review): a table with exactly 10 rows satisfies neither `<10` nor `>10`.
if len(dfs[i]) <10:
selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
#break
#other_matches = details_in_another_table_mod(clmn_name, clmn_idx, dfs[i], dfs)
#details in the same table
if len(dfs[i]) >10:
selected_columns_new = generate_current_table_without_cropping(column_index_list,dfs[i])
#break
#IN CELLS
# Every searched text was found in a cell of the first rows.
if len(cell_matches) == len(non_empty_info):
row_index_list, column_index_list = get_row_column_indices(cell_matches)
print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")
#details in another table
#if len(dfs[i]) <2:
#selected_columns_new = details_in_another_table(clmn_name, clmn_idx, dfs[i], dfs)
# Rows from the other same-width tables and the cropped rows of this table
# are concatenated into one frame.
selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
selected_columns_new2 = crop_rename_table(row_index_list, clmn_name, column_index_list,dfs[i])
selected_columns_new = pd.concat([selected_columns_new, selected_columns_new2], ignore_index=True)
break
#other_matches = details_in_another_table_mod(clmn_name, clmn_idx, dfs[i], dfs)
##details in the same table
#if len(dfs[i]) >2:
# #print(f"this is df {i} call crop_rename_table(indices, clmn_name, clmn_idx,df)")
#break
return selected_columns_new
#for new dictionary logic
#
# Select the main, secondary and extra columns from the extracted tables.
#
# NOTE(review): indentation is not visible in this view of the file, so the
# comments below describe what each statement does instead of asserting how
# the statements are nested.
#
# dfs: tables to scan. user_patterns: the user's column texts; entries from
# position 6 onwards are appended verbatim to the output column names.
# Returns selected_columns_new, which stays None unless a branch below assigns it.
def get_selected_columns_all(dfs, user_patterns):
# NOTE(review): selected_columns is never read in this function.
selected_columns = []
selected_columns_new = None # Initialize selected_columns_new to None
for i in range(len(dfs)):
# None of the next eight assignments depends on i.
extra_info = user_patterns[6:]
# separate_main_secondary slices [:4] and [4:], so secondary_info also
# contains the entries that extra_info holds.
main_info, secondary_info = separate_main_secondary(user_patterns)
clmn_name_main = get_column_name(main_info)
non_empty_main_info = [item for item in main_info if item]
clmn_name_secondary = get_column_name_secondary(secondary_info)
non_empty_secondary_info = [item for item in secondary_info if item]
#clmn_name = clmn_name_main + clmn_name_secondary
clmn_name = clmn_name_main + clmn_name_secondary + extra_info
non_empty_info = non_empty_main_info + non_empty_secondary_info
#print(f"main info: {main_info}")
print(f"clmn name: {clmn_name}")
print(f"non-empty info: {non_empty_info}")
#print(f"length of non-empty info: {len(non_empty_main_info)}")
cell_columns_appearance = flexible_search(dfs[i], non_empty_info)
cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)
print(f"length of cell_matches: {len(cell_matches)}")
print(f"cell_matches: {cell_matches}")
print(f"col_matches: {col_matches}")
#clmn_name = map_user_input_to_standard_labels(user_patterns)
#if len(clmn_name) < len(user_patterns):
print(clmn_name)
if len(cell_matches) == 0 and len(col_matches) == 0:
print(f"this is df {i}, SEARCH IN ANOTHER DF")
else:
#IN COLUMNS
# Every searched text was found among the column names.
if len(col_matches) == len(non_empty_info):
column_index_list = get_column_index(col_matches)
print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")
#print(len(clm_idx))
#details in another table
print(column_index_list)
#if len(dfs[i]) <10:
#break
#other_matches = details_in_another_table_mod(clmn_name, clmn_idx, dfs[i], dfs)
#details in the same table
#if len(dfs[i]) >10:
# Rows from the other same-width tables (details_in_another_table returns
# None when there are none) and this table's own selected columns are
# concatenated, then relabelled with clmn_name.
selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
selected_columns_new2 = generate_current_table_without_cropping(column_index_list,dfs[i])
selected_columns_new = pd.concat([selected_columns_new, selected_columns_new2], ignore_index=True)
selected_columns_new.columns = clmn_name # must match number of columns
#break
#IN CELLS
# Every searched text was found in a cell of the first rows.
if len(cell_matches) == len(non_empty_info):
row_index_list, column_index_list = get_row_column_indices(cell_matches)
print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")
#details in another table
#if len(dfs[i]) <2:
#selected_columns_new = details_in_another_table(clmn_name, clmn_idx, dfs[i], dfs)
print(f"column names: {clmn_name}")
print(f"column index list: {column_index_list}")
# Same combination as above, but this table is first cropped below its
# header rows by crop_rename_table.
selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
selected_columns_new2 = crop_rename_table(row_index_list, clmn_name, column_index_list,dfs[i])
selected_columns_new = pd.concat([selected_columns_new, selected_columns_new2], ignore_index=True)
break
#other_matches = details_in_another_table_mod(clmn_name, clmn_idx, dfs[i], dfs)
##details in the same table
#if len(dfs[i]) >2:
# #print(f"this is df {i} call crop_rename_table(indices, clmn_name, clmn_idx,df)")
#break
return selected_columns_new
# The lookup is done on the column names of selected_columns themselves.
# The caller also needs to know how the column is written in the real table
# (presumably exactly as the user typed it).
def get_st_op_pattern(selected_columns, user_input):
    """Return the user's text for the structural-opening column, if present.

    Returns:
        ``user_input[2]`` when *selected_columns* has a column named
        ``'structural_opening'``; otherwise ``None``.
    """
    if 'structural_opening' not in selected_columns.columns:
        return None
    return user_input[2]
def find_text_in_plan(label, x):
    """Locate every word tuple in *x* whose text equals *label*.

    Args:
        label: Text compared with element 4 of each tuple.
        x: Iterable of tuples whose elements 0-3 are coordinates and whose
            element 4 is the text.

    Returns:
        ``(midpoints, words, swapped_midpoints)`` - the midpoint of elements
        (0, 1)-(2, 3), the matched text, and the midpoint computed with the
        two axes exchanged (the variant the original comment marks as
        "for rotated").
    """
    hits = [entry for entry in x if entry[4] == label]
    midpoints = []
    swapped_midpoints = []
    matched_words = []
    for entry in hits:
        midpoints.append(calculate_midpoint(entry[0], entry[1], entry[2], entry[3]))
        swapped_midpoints.append(calculate_midpoint(entry[1], entry[0], entry[3], entry[2]))
        matched_words.append(entry[4])
    return midpoints, matched_words, swapped_midpoints
def get_selected_columns_by_index(df, column_index_list, user_patterns):
    """Select columns by position and label them with the standard names.

    The names come from ``get_column_name`` (first four user patterns) plus
    ``get_column_name_secondary`` (remaining patterns); their count must
    equal the number of selected columns.
    """
    selected_df = df.iloc[:, column_index_list]
    # Rename the columns to match the structure of the clr_dictionary.
    main_info, secondary_info = separate_main_secondary(user_patterns)
    clmn_name = get_column_name(main_info) + get_column_name_secondary(secondary_info)
    print(f"clmn_name from the function el 3amla moshkela: {clmn_name}")
    selected_df.columns = clmn_name
    return selected_df
## Get the column indices from extract_tables(schedule)
def get_column_indices_from_dfs_normal(dfs, user_patterns):
    """Return the column positions of the first table that matches every pattern.

    Args:
        dfs: Tables to scan in order.
        user_patterns: The user's column texts; empty entries are ignored.

    Returns:
        The list of column indices taken from the first table in which all
        non-empty patterns are found among the column names or, failing
        that, among the cells of its first rows; ``None`` when no table
        qualifies.
    """
    # The search terms do not depend on the table, so build them once.
    main_info, secondary_info = separate_main_secondary(user_patterns)
    non_empty_info = [item for item in main_info if item] + \
                     [item for item in secondary_info if item]

    # Explicit default. Previously the name stayed unassigned (and the return
    # raised UnboundLocalError) when dfs was empty or when the last table
    # matched only some of the patterns.
    column_index_list = None
    for i, df in enumerate(dfs):
        cell_matches, col_matches = analyse_cell_columns(flexible_search(df, non_empty_info))
        if not cell_matches and not col_matches:
            continue
        #IN COLUMNS
        if len(col_matches) == len(non_empty_info):
            column_index_list = get_column_index(col_matches)
            print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")
            break
        #IN CELLS
        if len(cell_matches) == len(non_empty_info):
            _, column_index_list = get_row_column_indices(cell_matches)
            print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")
            break
    return column_index_list
def find_missing_columns(complete_list, non_complete_list):
    """Return the non-empty items of *complete_list* absent from *non_complete_list*.

    Items are compared after removing all whitespace and every bracket
    character (``()[]{}``) and lower-casing; non-strings compare as ``""``.

    Returns:
        The missing items in their original form and order. Falsy items
        (such as the empty placeholders of the fixed-field input) are never
        reported.
    """
    def _normalize(text):
        if not isinstance(text, str):
            return ""
        text = re.sub(r'\s+', '', text)             # all whitespace
        text = re.sub(r'[\(\)\[\]\{\}]', '', text)  # brackets of any type
        return text.lower()

    # A set gives O(1) membership tests. The earlier, immediately shadowed
    # copy of the normaliser and the no-op self-assignments were removed.
    found = {_normalize(item) for item in non_complete_list}
    return [item for item in complete_list if item and _normalize(item) not in found]
# Returns the columns the code failed to locate on the schedule
def check_missing(dfs, user_patterns):
    """Report which user patterns could not be located in any table.

    For every table the cell values at the matched cell positions are
    collected; the table with the most matches is compared against
    *user_patterns* by ``find_missing_columns``.

    Returns:
        The list of patterns that were not found. With no tables at all,
        every non-empty pattern is reported.
    """
    # The search terms are the same for every table, so compute them once.
    main_info, secondary_info = separate_main_secondary(user_patterns)
    non_empty_info = [item for item in main_info if item] + \
                     [item for item in secondary_info if item]

    all_words = []
    for df in dfs:
        # NOTE(review): only cell matches are used here; matches found in the
        # column names are ignored - confirm that this is intended.
        cell_matches, _ = analyse_cell_columns(flexible_search(df, non_empty_info))
        all_words.append([df.iloc[row, col] for row, col in cell_matches])

    # default=[] keeps an empty `dfs` from raising ValueError in max().
    found_words = max(all_words, key=len, default=[])
    print(found_words)
    return find_missing_columns(user_patterns, found_words)
# Get the index of the dataframe that has the maximum column matches among
# the dfs returned by the model table detection
def get_df_index(dfs, user_patterns):
    """Return the position in *dfs* of the table matching the most patterns.

    For each table the larger of its column-name matches and its cell
    matches is counted (cell matches win a tie); the first table with the
    highest count is chosen.

    Raises:
        ValueError: If no table matches any pattern. The same exception type
            was raised before by ``max()`` on an empty list; it now carries
            an explanatory message.
    """
    # The search terms are the same for every table, so compute them once.
    main_info, secondary_info = separate_main_secondary(user_patterns)
    non_empty_info = [item for item in main_info if item] + \
                     [item for item in secondary_info if item]

    df_matches = []
    for i, df in enumerate(dfs):
        cell_matches, col_matches = analyse_cell_columns(flexible_search(df, non_empty_info))
        if not cell_matches and not col_matches:
            continue
        from_columns = get_column_index(col_matches)
        _, from_cells = get_row_column_indices(cell_matches)
        best = from_columns if len(from_columns) > len(from_cells) else from_cells
        df_matches.append((best, i))

    if not df_matches:
        raise ValueError("none of the tables matched any of the requested columns")
    # The second element of the winning pair is the table's position in dfs.
    return max(df_matches, key=lambda match: len(match[0]))[1]
def get_word_locations_plan(flattened_list, plan_texts):
    """Look up every schedule label in the plan's word list.

    Args:
        flattened_list: Tuples of 2, 3 or 4 elements whose first element is
            the label and whose last element is the colour; anything in
            between (one or two dimension values) is carried through. The
            width of the first tuple decides whether the list is processed.
        plan_texts: Word tuples forwarded to ``find_text_in_plan``.

    Returns:
        ``(locations, not_found)`` where each location is
        ``(coords, label, colour, *dimensions)`` and *not_found* lists the
        labels for which no coordinates were found. Both are empty for empty
        input or an unsupported tuple width.
    """
    locations = []
    not_found = []
    # Empty input used to raise IndexError on flattened_list[0].
    if not flattened_list or len(flattened_list[0]) not in (2, 3, 4):
        return locations, not_found

    # One loop replaces three copy-pasted branches that differed only in how
    # many dimension values sit between the label and the colour.
    for lbl, *dims, clr in flattened_list:
        location, _, _ = find_text_in_plan(lbl, plan_texts)
        if len(location) == 0:
            not_found.append(lbl)
        locations.append((location, lbl, clr, *dims))
    return locations, not_found
def get_repeated_labels(locations):
    """Return the set of labels (element 1 of each item) that occur more than once."""
    occurrences = Counter(item[1] for item in locations)
    return {label for label, count in occurrences.items() if count > 1}
def get_cleaned_data(locations):
    """Reduce every location entry to a single coordinate.

    Args:
        locations: Tuples ``(coords, label, *rest)`` of width 3, 4 or 5; the
            width of the first tuple decides whether the list is processed.

    Returns:
        The entries with exactly one coordinate unchanged; entries with
        several coordinates get one of them, assigned round-robin per label
        so repeated labels receive successive coordinates; entries without
        coordinates are dropped. Empty input or an unsupported width gives ``[]``.
    """
    # Empty input used to raise IndexError on locations[0].
    if not locations or len(locations[0]) not in (3, 4, 5):
        return []

    use_count = defaultdict(int)
    new_data = []
    # One loop replaces three copy-pasted branches that differed only in how
    # many trailing values each tuple carries.
    for coords, label, *rest in locations:
        if len(coords) > 1:
            pick = use_count[label] % len(coords)  # round-robin indexing
            new_data.append(([coords[pick]], label, *rest))
            use_count[label] += 1  # next occurrence takes the next coordinate
        elif len(coords) == 1:
            new_data.append((coords, label, *rest))
    return new_data
# A value with a real fractional part (e.g. 0.5) is printed as written;
# otherwise the decimal point is dropped.
def get_width_info_tobeprinted(new_data):
    """Build the dimension text shown for every entry.

    Args:
        new_data: Tuples whose width is read from the first entry:
            fewer than 4 elements - no dimensions; 4 - element 3 is used
            verbatim; 5 - elements 3 and 4 are width and height strings.

    Returns:
        One string per entry (``"<w> mm wide x <h> mm high"`` in the
        5-element case, with thousands commas removed and whole numbers
        printed without a decimal part). Empty input or any other width
        gives ``[]``.
    """
    def _tidy(value):
        # Strip thousands separators; print whole numbers as ints; leave
        # non-numeric text (e.g. "N/A") and fractional values untouched.
        text = re.sub(r",", "", value)
        try:
            number = float(text)
        except ValueError:
            return text
        return int(number) if number.is_integer() else text

    # Empty input used to raise IndexError on new_data[0].
    if not new_data:
        return []

    row_len = len(new_data[0])
    if row_len < 4:
        return ["N/A mm wide x N/A mm high" for _ in new_data]
    if row_len == 4:
        return [row[3] for row in new_data]
    if row_len == 5:
        return [f"{_tidy(row[3])} mm wide x {_tidy(row[4])} mm high" for row in new_data]
    return []
def clean_dimensions(text):
    """Strip every ``mm`` unit and every comma from a dimension string.

    Commas or whitespace directly in front of an ``mm`` are removed together
    with it.
    """
    without_units = re.sub(r'[,\s]*mm', '', text)
    return without_units.replace(",", "")
def get_cleaned_width(width_info_tobeprinted):
    """Apply ``clean_dimensions`` to every entry and return the new list."""
    return [clean_dimensions(entry) for entry in width_info_tobeprinted]
def get_widths_bb_format(cleaned_width, kelma):
    """Turn ``"<a> x <b>"`` strings into ``"<w> mm wide x <h> mm high"``.

    Args:
        cleaned_width: Dimension strings holding two numbers separated by
            ``x``, ``X`` or ``×``.
        kelma: The schedule's header text. When it contains a
            ``W... x H...`` pattern the first number is the width; otherwise
            the two numbers are swapped.

    Returns:
        One formatted string per input, with both numbers truncated to ints.

    Raises:
        ValueError: If an entry has no separator, or either side is not a number.
    """
    width_first = re.search(r"\bW(?:idth)?\s*[×x]\s*H(?:eight)?\b", kelma)
    widths = []
    for widthaa in cleaned_width:
        # Same rule as before (the largest of the first positions of each
        # separator), minus the repeated lookups of "x".
        index = max(widthaa.find(sep) for sep in ("x", "×", "X"))
        if index == -1:
            # Without this check the -1 was used as a slice bound, silently
            # turning e.g. "900" into width 90 and height 900.
            raise ValueError(f"no 'x' separator found in dimension text {widthaa!r}")
        first = int(float(widthaa[:index]))
        second = int(float(widthaa[index + 1:]))
        if width_first:
            full_text = f"{first} mm wide x {second} mm high"
        else:
            full_text = f"{second} mm wide x {first} mm high"
        widths.append(full_text)
    return widths
def is_not_number(s: str) -> bool:
    """Return True when ``float(s)`` rejects *s* with ``ValueError``.

    Ints, floats and scientific notation all count as numbers.
    """
    try:
        float(s)
    except ValueError:
        return True
    else:
        return False
def get_width_info_tobeprinted_secondary(new_data, main_info, secondary_info):
    """Build the dimension text and the secondary info for every entry.

    Each entry of *new_data* is read positionally as
    ``(coords, label, *dimensions, *secondary, last)`` with
    ``len(main_info) - 2`` dimension values and ``len(secondary_info)``
    secondary values.

    Args:
        new_data: Entries laid out as described above.
        main_info: Main column names in use (2, 3 or 4 of them).
        secondary_info: Secondary column names in use (1 or 2 of them).

    Returns:
        ``(width_texts, secondary_values)``. The width text is a fixed
        "N/A" sentence for 2 main columns, the raw dimension value for 3,
        and ``"<w> mm wide x <h> mm high"`` for 4. A secondary value is the
        bare value for one secondary column and a 2-tuple for two. Any other
        combination of lengths gives two empty lists.

    Raises:
        ValueError: If an entry does not have the expected number of elements.
    """
    def _tidy(value):
        # Strip thousands separators; print whole numbers as ints; leave
        # non-numeric text and fractional values untouched.
        text = re.sub(r",", "", value)
        try:
            number = float(text)
        except ValueError:
            return text
        return int(number) if number.is_integer() else text

    n_main, n_secondary = len(main_info), len(secondary_info)
    width_info_tobeprinted = []
    secondary_info_tobeprinted = []
    if n_main not in (2, 3, 4) or n_secondary not in (1, 2):
        return width_info_tobeprinted, secondary_info_tobeprinted

    # One loop replaces six copy-pasted branches; the leftover
    # print(type(...)) debugging output was dropped.
    expected_len = n_main + n_secondary + 1
    for entry in new_data:
        if len(entry) != expected_len:
            raise ValueError(
                f"expected {expected_len} values per entry, got {len(entry)}")
        if n_main == 2:
            width_text = "N/A mm wide x N/A mm high"
        elif n_main == 3:
            width_text = entry[2]
        else:
            width_text = f"{_tidy(entry[2])} mm wide x {_tidy(entry[3])} mm high"
        width_info_tobeprinted.append(width_text)

        secondary = tuple(entry[n_main:n_main + n_secondary])
        secondary_info_tobeprinted.append(secondary[0] if n_secondary == 1 else secondary)
    return width_info_tobeprinted, secondary_info_tobeprinted
def get_word_locations_plan_secondary(flattened_list, plan_texts, main_info, secondary_info):
    """Look up every label in the plan when secondary information is present.

    Args:
        flattened_list: Entries with ``len(main_info) + len(secondary_info)``
            elements each, the first being the label.
        plan_texts: Word tuples forwarded to ``find_text_in_plan``.
        main_info: Main column names in use (2, 3 or 4 of them).
        secondary_info: Secondary column names in use (1 or 2 of them).

    Returns:
        ``(locations, not_found)`` where each location is the entry with the
        found coordinates prepended, element order otherwise unchanged, and
        *not_found* lists the labels without coordinates. Any other
        combination of lengths gives two empty lists.

    Raises:
        ValueError: If an entry does not have the expected number of elements.
    """
    locations = []
    not_found = []
    len_main = len(main_info)            # 2, 3 or 4
    len_secondary = len(secondary_info)  # 1 or 2
    if len_main not in (2, 3, 4) or len_secondary not in (1, 2):
        return locations, not_found

    # One loop replaces six copy-pasted branches; all of them prepended the
    # coordinates to the entry without reordering it.
    expected_len = len_main + len_secondary
    for entry in flattened_list:
        if len(entry) != expected_len:
            raise ValueError(
                f"expected {expected_len} values per entry, got {len(entry)}")
        lbl = entry[0]
        location, _, _ = find_text_in_plan(lbl, plan_texts)
        if len(location) == 0:
            not_found.append(lbl)
        locations.append((location, *entry))
    return locations, not_found
### newest, accepts the combined table
def get_similar_colors_all(selected_columns_new):
    """Group the schedule rows by ``door_type`` and give each type a random colour.

    Args:
        selected_columns_new: DataFrame with at least ``door_type`` and
            ``door_id`` columns.

    Returns:
        ``{door_type: group}`` where every group holds ``'values'`` (the door
        ids), ``'color_annot'`` (an ``(r, g, b)`` tuple of ints in 0-255) and
        one list per DataFrame column with that group's values in row order.
    """
    def random_rgb():
        return tuple(random.randint(0, 255) for _ in range(3))

    # One colour per distinct door type, drawn in order of first appearance.
    palette = {}
    for door_type in selected_columns_new['door_type'].unique():
        palette[door_type] = random_rgb()

    field_names = selected_columns_new.columns.to_list()

    def new_group():
        group = {'values': [], 'color_annot': None}
        for name in field_names:
            group[name] = []
        return group

    groups = defaultdict(new_group)
    for _, row in selected_columns_new.iterrows():
        door_type = row['door_type']
        group = groups[door_type]
        group['values'].append(row['door_id'])
        for name in field_names:
            group[name].append(row.get(name, None))
        group['color_annot'] = palette[door_type]
    return dict(groups)
### newest, accepts the combined table
def get_flattened_tuples_list_all(col_dict):
    """Flatten the per-type groups into one tuple per schedule row.

    For every group, the list-valued fields other than ``'door_type'`` and
    ``'values'`` are read in dictionary order; row *i* becomes the tuple of
    their *i*-th elements followed by the group's ``'color_annot'``.

    Returns:
        The list of row tuples, groups in dictionary order.
    """
    skipped = ('door_type', 'values')
    rows = []
    for group in col_dict.values():
        columns = [values for name, values in group.items()
                   if isinstance(values, list) and name not in skipped]
        if not columns:
            continue
        # The first list field defines how many rows the group has.
        for i in range(len(columns[0])):
            rows.append(tuple(column[i] for column in columns) + (group['color_annot'],))
    return rows
def get_flattened_tuples_list_no_doortype(selected_columns):
    """Return every row as a tuple with the fixed colour ``(0, 0, 255)`` appended.

    Used when there is no door type to derive a per-type colour from.
    """
    fixed_colour = (0, 0, 255)
    return [row + (fixed_colour,)
            for row in selected_columns.itertuples(name=None, index=False)]
#SECONDARY
def get_cleaned_data_secondary(locations, main_info, secondary_info):
    """Reduce every location entry to a single coordinate.

    Each entry of ``locations`` is ``(coords, label, *other_fields)``.  When
    a label was found at several coordinates, successive entries carrying
    that label are assigned the coordinates in round-robin order.  Entries
    with exactly one coordinate are kept as they are and entries with no
    coordinate are dropped.

    Args:
        locations: Iterable of tuples whose length must equal
            ``len(main_info) + len(secondary_info) + 1``.
        main_info: Main column selection; only its length (2, 3 or 4) is used.
        secondary_info: Secondary column selection; only its length (1 or 2)
            is used.

    Returns:
        A list of tuples shaped like the input entries, each with a
        one-element coordinate list.  Any other length combination of
        ``main_info`` / ``secondary_info`` yields an empty list.

    Raises:
        ValueError: If an entry does not have the expected number of fields.
    """
    n_main = len(main_info)
    n_secondary = len(secondary_info)
    if n_main not in (2, 3, 4) or n_secondary not in (1, 2):
        return []

    # coords + label + remaining main fields + colour + secondary fields
    expected_fields = n_main + n_secondary + 1
    processed = defaultdict(int)  # label -> how many multi-hit entries were served
    new_data = []
    for entry in locations:
        fields = tuple(entry)
        if len(fields) != expected_fields:
            raise ValueError(
                f"expected {expected_fields} fields per location entry, got {len(fields)}"
            )
        coords, label, *others = fields
        if len(coords) > 1:
            index = processed[label] % len(coords)  # round-robin over the hits
            new_data.append(([coords[index]], label, *others))
            processed[label] += 1
        elif len(coords) == 1:
            new_data.append((coords, label, *others))
    return new_data
def merge_pdf_bytes_list(pdfs):
    """Concatenate several in-memory PDFs into a single PDF.

    Args:
        pdfs: Iterable of ``bytes`` objects, each one a complete PDF file.

    Returns:
        The bytes of one PDF containing every page of every input, in order.
    """
    merged = PdfWriter()
    for raw_pdf in pdfs:
        source = PdfReader(io.BytesIO(raw_pdf))
        for sheet in source.pages:
            merged.add_page(sheet)

    buffer = io.BytesIO()
    merged.write(buffer)
    return buffer.getvalue()
def calculate_bounding_rect_count(vertices, padding):
    """Return a square box of half-size ``padding`` around the first vertex.

    Args:
        vertices: Sequence of ``(x, y)`` points; only ``vertices[0]`` is used.
        padding: Distance added on every side of the point.

    Returns:
        ``[xmin, ymin, xmax, ymax]``.
    """
    centre_x, centre_y = vertices[0]
    return [
        centre_x - padding,
        centre_y - padding,
        centre_x + padding,
        centre_y + padding,
    ]
def rgb_string_to_hex(rgb_string):
    """Convert a ``"r g b"`` string of 0..1 floats into ``#RRGGBB``.

    Each component is scaled to 0..255 and *rounded* (not truncated), so a
    value that was produced as ``v / 255`` maps back to ``v`` even when
    floating-point round-off leaves it a hair below the integer.  Components
    outside 0..1 are clamped so the result always has exactly six hex digits.

    Args:
        rgb_string: Three whitespace-separated numbers, e.g. ``"1.0 0 0.5"``.

    Returns:
        Upper-case hex colour string such as ``"#FF0080"``.

    Raises:
        ValueError: If the string does not hold exactly three numbers.
    """
    channels = []
    for component in rgb_string.split():
        level = round(float(component) * 255)
        channels.append(min(255, max(0, level)))
    r, g, b = channels  # raises ValueError unless exactly three components
    return '#{:02X}{:02X}{:02X}'.format(r, g, b)
def generate_annotation_xml_block_count(vertices, area_text, author, custom_data: dict, column_order: list, index: int,
                                        label: str = '', height: str = '', width: str = '',
                                        color: str = '', countstyle: str = '', countsize: str = ''):
    """Build the ``<Annotation>`` XML element for one count markup.

    The element carries a ``Raw`` child holding the annotation dictionary
    text, zlib-compressed and hex-encoded, next to plain children (Contents,
    Color, ID, Custom, ...) with the same information.

    Args:
        vertices: List of ``[x, y]`` points; all are written to ``/Vertices``
            and the first one is used to compute ``/Rect``.
        area_text: Text written to ``/NumCounts``, ``/Contents`` and the
            ``Contents`` element.
        author: Written to ``/T`` and the ``Author`` element.
        custom_data: Column name -> value; each becomes a child of ``Custom``
            and feeds ``/BSIColumnData``.
        column_order: Column names in the order used for ``/BSIColumnData``;
            names missing from ``custom_data`` contribute an empty value.
        index: Written to the ``Index`` element.
        label, height, width: Written to ``/Label``, ``/Height``, ``/Width``
            and the matching elements.
        color: ``"r g b"`` string of 0..1 floats, used for ``/IC``, ``/C``
            and (converted to hex) the ``Color`` element.
        countstyle: Token appended to ``/CountStyle`` (e.g. ``'/Circle'``).
        countsize: Value written to ``/CountScale``.

    Returns:
        An ``xml.etree.ElementTree.Element`` named ``Annotation``.  Its
        ``Page`` child is left empty for the caller to fill in.
    """
    def _as_text(value):
        # ElementTree can only serialise str (or None) as element text; ints,
        # floats etc. would make tostring() raise TypeError later on.
        if value is None or isinstance(value, str):
            return value
        return str(value)

    # Naive UTC timestamp; same value utcnow() produced, without the deprecated call.
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    mod_date = now.strftime("D:%Y%m%d%H%M%S+00'00'")
    creation_date = now.isoformat() + 'Z'
    id_str = "fitz-" + uuid.uuid4().hex[:4].upper()
    vert_str = ' '.join([f'{x:.4f}' for point in vertices for x in point])
    ordered_column_values = [f'({custom_data.get(col, "")})' for col in column_order]
    bsi_column_data = ''.join(ordered_column_values)
    type_internal = 'Bluebeam.PDF.Annotations.AnnotationMeasureCount'
    subject = 'Count Measurement'
    padding = 10
    rectvertices = calculate_bounding_rect_count(vertices, padding)
    bbmeasure = '''<</Type/Measure
/Subtype/RL
/R(1 mm = 1 mm)
/X[<</Type/NumberFormat/U(mm)/C 0.3527778/D 100/SS()>>]
/D[<</Type/NumberFormat/U(mm)/C 1/D 100/SS()>>]
/A[<</Type/NumberFormat/U(sq mm)/C 1/D 100/FD true/SS()>>]
/T[<</Type/NumberFormat/U(\\260)/C 1/D 100/FD true/PS()/SS()>>]
/V[<</Type/NumberFormat/U(cu mm)/C 1/D 100/FD true/SS()>>]
/TargetUnitConversion 0.3527778>>'''
    raw_text = f'''<<
/Version 1
/DS(font: Helvetica 12pt; text-align:center; line-height:13.8pt; color:#FF0000)
/CountStyle{countstyle}
/CountScale {countsize}
/MeasurementTypes 128
/BBMeasure{bbmeasure}
/NumCounts {area_text}
/AP<</N/BBObjPtr_{uuid.uuid4().hex.upper()}>>
/IT/PolygonCount
/Vertices[{vert_str}]
/IC[{color}]
/T({author})
/CreationDate({mod_date})
/BSIColumnData[{bsi_column_data}]
/RC(<?xml version="1.0"?><body xmlns:xfa="http://www.xfa.org/schema/xfa-data/1.0/" xfa:contentType="text/html" xfa:APIVersion="BluebeamPDFRevu:2018" xfa:spec="2.2.0" style="font:Helvetica 12pt; text-align:center; line-height:13.8pt; color:#FF0000" xmlns="http://www.w3.org/1999/xhtml"><p>{area_text}</p></body>)
/Label({label})
/Height {height}
/Width {width}
/Subj({subject})
/NM({id_str})
/Subtype/Polygon
/Rect[{rectvertices[0]} {rectvertices[1]} {rectvertices[2]} {rectvertices[3]}]
/Contents({area_text})
/F 4
/C[{color}]
/BS<</Type/Border/W 0/S/S>>
/M({mod_date})
>>'''.encode('utf-8')
    # Raw payload: zlib-compressed dictionary text, hex-encoded in lower case.
    compressed = zlib.compress(raw_text)
    base64_raw = base64.b16encode(compressed).lower().decode()

    annotation = Element('Annotation')
    SubElement(annotation, 'Page')  # filled in by the caller
    SubElement(annotation, 'Contents').text = _as_text(area_text)
    SubElement(annotation, 'ModDate').text = creation_date
    SubElement(annotation, 'Color').text = rgb_string_to_hex(color)
    SubElement(annotation, 'Type').text = 'Polygon'
    SubElement(annotation, 'ID').text = id_str
    SubElement(annotation, 'TypeInternal').text = type_internal
    SubElement(annotation, 'Raw').text = base64_raw
    SubElement(annotation, 'Index').text = str(index)
    custom = SubElement(annotation, 'Custom')
    for key, value in custom_data.items():
        SubElement(custom, key).text = _as_text(value)
    SubElement(annotation, 'Subject').text = subject
    SubElement(annotation, 'CreationDate').text = creation_date
    SubElement(annotation, 'Author').text = _as_text(author)
    SubElement(annotation, 'Label').text = _as_text(label)
    SubElement(annotation, 'Height').text = _as_text(height)
    SubElement(annotation, 'Width').text = _as_text(width)
    return annotation
def save_multiple_annotations_count_bax(annotations, output_path, column_order, pdfWidth, pdfHeight, num_pages):
    """Build the markup XML document for all pages and return it as a string.

    Nothing is written to disk by this function; ``output_path`` only
    appears in the printed status line.

    Args:
        annotations: List of dicts, each with ``vertices``, ``text``,
            ``author`` and ``custom_data``, and optionally ``page``
            (1-based, default 1), ``label``, ``height``, ``width``,
            ``color``, ``countstyle`` and ``countsize``.
        output_path: Name used in the status message.
        column_order: Custom column names, forwarded to
            ``generate_annotation_xml_block_count``.
        pdfWidth: Page width.  Either one value used for every page or a
            list/tuple with one value per page (the last value is reused if
            the sequence is shorter than ``num_pages``).
        pdfHeight: Page height, same conventions as ``pdfWidth``.
        num_pages: Number of ``<Page>`` elements to emit; pages without
            annotations are still written.

    Returns:
        The serialised ``<Document>`` XML as a ``str``.
    """
    def _dimension_for(dimension, page_index):
        # Callers pass per-page lists; writing str(list) into every page
        # would produce e.g. "<Width>[595.0, 842.0]</Width>".
        if isinstance(dimension, (list, tuple)) and dimension:
            return dimension[min(page_index, len(dimension) - 1)]
        return dimension

    doc = Element('Document', Version='1')

    # Group annotations by their 1-based page number.
    annotations_by_page = defaultdict(list)
    for ann in annotations:
        annotations_by_page[ann.get('page', 1)].append(ann)

    for page_index in range(num_pages):
        page = SubElement(doc, 'Page', Index=str(page_index))
        SubElement(page, 'Label').text = str(page_index + 1)
        SubElement(page, 'Width').text = str(_dimension_for(pdfWidth, page_index))
        SubElement(page, 'Height').text = str(_dimension_for(pdfHeight, page_index))
        # Annotations are attached only when the page has any.
        for i, ann in enumerate(annotations_by_page.get(page_index + 1, [])):
            annotation_xml = generate_annotation_xml_block_count(
                vertices=ann['vertices'],
                area_text=ann['text'],
                author=ann['author'],
                custom_data=ann['custom_data'],
                column_order=column_order,
                index=i,
                label=ann.get('label', 'label1'),
                height=ann.get('height', '123'),
                width=ann.get('width', '123'),
                color=ann.get('color', ''),
                countstyle=ann.get('countstyle', ''),
                countsize=ann.get('countsize', '')
            )
            annotation_xml.find('Page').text = str(page_index + 1)
            page.append(annotation_xml)

    pretty_xml = tostring(doc, encoding="unicode", method="xml")
    print(f"Saved {len(annotations)} annotations to {output_path}")
    return pretty_xml
# Count-marker shapes mapped to the token written after /CountStyle,
# e.g. CountStyles['Circle'] -> '/Circle'.
CountStyles = {
    shape: '/' + shape
    for shape in ('Circle', 'Diamond', 'Triangle', 'Square', 'Checkmark')
}
def convert_to_bytes(input_pdf_path):
    """Read the file at ``input_pdf_path`` in binary mode and return its bytes."""
    with open(input_pdf_path, "rb") as pdf_file:
        return pdf_file.read()
def mirrored_points(x, y, height_plan):
    """Flip ``y`` against ``height_plan`` and return the point as ``[[x, y']]``.

    The result is a one-element list of ``[x, height_plan - y]``, i.e. the
    shape used for an annotation's ``'vertices'`` entry.
    """
    return [[x, height_plan - y]]
def point_mupdf_to_pdf(x, y, page):
    """Flip a point vertically inside the page's mediabox.

    Args:
        x, y: Point coordinates (presumably PyMuPDF page coordinates with
            the origin at the top-left -- confirm against the caller).
        page: Object exposing ``mediabox`` with ``height``, ``x0`` and ``y0``.

    Returns:
        ``[[pdf_x, pdf_y]]`` where ``pdf_x = mediabox.x0 + x`` and
        ``pdf_y = mediabox.y0 + (mediabox.height - y)``.
    """
    box = page.mediabox
    # The mediabox height is used on purpose, not the page rect height.
    page_height = float(box.height)
    return [[box.x0 + x, box.y0 + (page_height - y)]]
# Modified to adjust mirrored points
def create_bb_bax_secondary(new_data, widthat, heightat, secondary_tobeprinted, CountStyles, input_user_clmn_names, page_number, height_plan):
    """Build count-annotation dicts for doors that carry secondary information.

    Args:
        new_data: List of tuples ``(coords, label, ..., colour)`` where
            ``coords[0]`` is the ``(x, y)`` hit and the last field is an
            ``(r, g, b)`` tuple of 0..255 values.
        widthat: Width text per entry (same order as ``new_data``).
        heightat: Height text per entry.
        secondary_tobeprinted: Per entry either a ``(fire, acoustic)`` pair
            (when both columns were requested) or a single value.
        CountStyles: Mapping of style name -> style token; ``'Circle'`` is used.
        input_user_clmn_names: User column selection; index 4 is the fire
            rating column and index 5 the acoustic rating column.
        page_number: 1-based page number stored on each annotation.
        height_plan: Despite its name this is the page object handed to
            ``point_mupdf_to_pdf`` (it must expose ``mediabox``).

    Returns:
        A list of annotation dicts.  Entries are skipped when neither the
        fire nor the acoustic column was requested.  Dimensions are stored
        under ``'height'``/``'width'`` (the keys the XML writer reads) and,
        for backward compatibility, also under ``'Height'``/``'Width'``.
    """
    bax_annotations = []
    for i, entry in enumerate(new_data):
        r, g, b = entry[-1]  # the colour is always the last field
        # normalised colour string: "R/255 G/255 B/255"
        color = ' '.join(str(float(channel / 255)) for channel in (r, g, b))
        vertix = point_mupdf_to_pdf(entry[0][0][0], entry[0][0][1], height_plan)

        fire_present = input_user_clmn_names[4]
        acoustic_present = input_user_clmn_names[5]
        if fire_present and acoustic_present:
            fire_rating = secondary_tobeprinted[i][0]
            acoustic_rating = secondary_tobeprinted[i][1]
        elif fire_present:
            fire_rating, acoustic_rating = secondary_tobeprinted[i], 'N/A'
        elif acoustic_present:
            fire_rating, acoustic_rating = 'N/A', secondary_tobeprinted[i]
        else:
            continue  # no secondary column requested: nothing to emit

        bax_annotations.append({
            'vertices': vertix,
            'text': '1',  # number of counts represented by one markup
            'author': 'ADR',
            # custom columns as {column name: text}
            'custom_data': {'FireRating': fire_rating, 'AcousticRating': acoustic_rating,
                            'Height_': heightat[i], 'Width_': widthat[i]},
            'label': entry[1],
            'Height': heightat[i],
            'Width': widthat[i],
            # lower-case keys are the ones save_multiple_annotations_count_bax reads
            'height': heightat[i],
            'width': widthat[i],
            'page': page_number,
            'color': color,
            'countstyle': CountStyles['Circle'],
            'countsize': '0.8'  # size of the count icon
        })
    return bax_annotations
# Modified to adjust mirrored points
def create_bb_bax(new_data, widthat, heightat, CountStyles, page_number, height_plan):
    """Build count-annotation dicts for doors without secondary information.

    Args:
        new_data: List of tuples ``(coords, label, colour, ...)`` where
            ``coords[0]`` is the ``(x, y)`` hit and ``colour`` (index 2) is
            an ``(r, g, b)`` tuple of 0..255 values.
        widthat: Width text per entry (same order as ``new_data``).
        heightat: Height text per entry.
        CountStyles: Mapping of style name -> style token; ``'Circle'`` is used.
        page_number: 1-based page number stored on each annotation.
        height_plan: Despite its name this is the page object handed to
            ``point_mupdf_to_pdf``.

    Returns:
        A list with one annotation dict per entry; fire and acoustic rating
        are always ``'N/A'``.
    """
    annotations = []
    for position, entry in enumerate(new_data):
        red, green, blue = entry[2]
        # normalised colour components (channel / 255) as strings
        red_text = str(float(red / 255))
        green_text = str(float(green / 255))
        blue_text = str(float(blue / 255))
        vertix = point_mupdf_to_pdf(entry[0][0][0], entry[0][0][1], height_plan)
        annotation = {
            'vertices': vertix,
            'text': '1',  # number of counts represented by one markup
            'author': 'ADR',
            # custom columns as {column name: text}
            'custom_data': {'FireRating': 'N/A', 'AcousticRating': 'N/A',
                            'Height_': heightat[position], 'Width_': widthat[position]},
            'label': entry[1],
            'height': heightat[position],
            'width': widthat[position],
            'page': page_number,
            'color': red_text + ' ' + green_text + ' ' + blue_text,
            'countstyle': CountStyles['Circle'],
            'countsize': '0.8'  # size of the count icon
        }
        annotations.append(annotation)
    return annotations
def add_location(col_dict, plan_texts):
    """Attach the plan locations of every door id to its group, in place.

    Args:
        col_dict: Mapping of group key -> dict with a ``'door_id'`` list.
            Each inner dict gains a ``'location'`` list holding, per door id,
            the first value returned by ``find_text_in_plan``.
        plan_texts: Text data of the plan, forwarded to ``find_text_in_plan``.

    Returns:
        ``(col_dict, not_found)`` where ``not_found`` lists the door ids for
        which ``find_text_in_plan`` returned an empty location.
    """
    missing = []
    for group in col_dict.values():
        hits = []
        for door_id in group['door_id']:
            found, _, _ = find_text_in_plan(door_id, plan_texts)
            if len(found) == 0:
                missing.append(door_id)
            hits.append(found)
        group['location'] = hits
    return col_dict, missing
import pandas as pd
def _ensure_color_tuple(x):
    """Return ``x`` as a tuple when possible.

    ``None`` and tuples are returned untouched; any other iterable is
    converted with ``tuple()``; values that cannot be converted are
    returned unchanged.
    """
    if x is not None and not isinstance(x, tuple):
        try:
            x = tuple(x)
        except Exception:
            # best effort: leave non-iterable values as they are
            pass
    return x
def _ensure_list_of_tuples(val):
    """Normalise ``val`` into a list of tuples.

    * ``None`` -> ``[]``
    * a tuple -> ``[val]``
    * a list -> its items, each converted to a tuple; ``None`` items and
      items that cannot be converted are dropped
    * anything else -> ``[tuple(val)]``, or ``[]`` if that fails
    """
    if val is None:
        return []
    if isinstance(val, tuple):
        return [val]
    if not isinstance(val, list):
        try:
            return [tuple(val)]
        except Exception:
            return []

    converted = []
    for entry in val:
        if entry is None:
            continue
        if isinstance(entry, tuple):
            converted.append(entry)
            continue
        try:
            converted.append(tuple(entry))
        except Exception:
            # silently skip items that are not iterable
            pass
    return converted
def grouped_to_dataframe_dynamic(grouped, keep_group=False,
                                 explode_locations=False,
                                 drop_empty_locations=False):
    """Expand a grouped dictionary into a DataFrame with one row per door.

    Args:
        grouped: Mapping of group key -> dict.  List-valued entries are read
            positionally (one item per row); non-list entries are repeated on
            every row.  The ``'values'`` entry is never copied to the rows.
        keep_group: Add a ``source_group`` column holding the group key.
        explode_locations: When a ``location`` column exists, explode it so
            each row holds a single location.
        drop_empty_locations: When a ``location`` column exists, drop rows
            whose location list is empty.

    Returns:
        A DataFrame whose columns are the union of the keys met in the rows.
        Rows beyond the length of a list get ``None`` for that field, and a
        missing door id is replaced by ``"<group_key>:<row index>"``.
    """
    records = []
    for group_key, block in grouped.items():
        ids = block.get('door_id') or block.get('values') or []
        lengths = [len(v) for v in block.values() if isinstance(v, list)]
        if not (lengths or ids):
            continue
        n_rows = max(lengths + [len(ids)])
        if n_rows == 0:
            continue

        for i in range(n_rows):
            record = {'door_id': ids[i] if i < len(ids) else f"{group_key}:{i}"}
            for field, content in block.items():
                if field == 'values':
                    continue
                if not isinstance(content, list):
                    cell = content  # scalar: same value on every row
                elif i < len(content):
                    cell = content[i]
                else:
                    cell = None  # list shorter than the longest one
                if field == 'color':
                    cell = _ensure_color_tuple(cell)
                elif field == 'location':
                    cell = _ensure_list_of_tuples(cell)
                record[field] = cell
            if keep_group:
                record['source_group'] = group_key
            records.append(record)

    df = pd.DataFrame(records)  # columns are the dynamic union of the keys

    # Normalise the location column, then optionally filter and explode it.
    if 'location' in df.columns:
        df['location'] = df['location'].apply(_ensure_list_of_tuples)
        if drop_empty_locations:
            has_location = df['location'].map(lambda xs: len(xs) > 0)
            df = df[has_location].reset_index(drop=True)
        if explode_locations:
            # after filtering, each row ends up with a single (x, y) tuple
            df = df.explode('location', ignore_index=True)
    return df
# Modify it to return widths and height from width, height columns
def get_width_clean_width_height(width_list, height_list):
    """Tidy the textual width and height values of a schedule.

    For every entry the commas are removed.  Entries that ``is_not_number``
    rejects are kept as that comma-free text; numeric entries with an
    integral value are rewritten without a fractional part (``"900.0"`` ->
    ``"900"``); other numeric entries are kept as text.

    Args:
        width_list: List of width strings.
        height_list: List of height strings.

    Returns:
        ``(widths, heights)`` -- two lists of strings in the input order.
    """
    def _tidy(raw):
        text = re.sub(r",", "", raw)
        if not is_not_number(text) and float(text).is_integer():
            text = int(float(text))
        return str(text)

    widths = [_tidy(raw_width) for raw_width in width_list]
    heights = [_tidy(raw_height) for raw_height in height_list]
    return widths, heights
def get_widths_bb_format_st_op(cleaned_width, kelma):
    """Split combined "A x B" opening sizes into separate widths and heights.

    Args:
        cleaned_width: List of strings such as ``"900x2100"``; the separator
            may be ``x``, ``X`` or ``×``.
        kelma: Header text of the column.  When it matches "W x H" /
            "Width x Height" the first number is the width; otherwise the
            first number is taken as the height.

    Returns:
        ``(widths, heights)`` -- two lists aligned with ``cleaned_width``.
        Parsed values are ints (fractions are truncated).  An entry without
        a separator, or whose parts are not numbers (for example ``"N/A"``),
        yields ``"N/A"`` in both lists instead of a mis-sliced number or an
        exception.
    """
    pattern = r"\bW(?:idth)?\s*[×x]\s*H(?:eight)?\b"
    width_comes_first = re.search(pattern, kelma) is not None

    widths = []
    heights = []
    for text in cleaned_width:
        # Position of the separator; -1 means none of them is present.
        split_at = max(text.find(separator) for separator in ("x", "×", "X"))

        first = second = None
        if split_at >= 0:
            try:
                first = int(float(text[:split_at]))
                second = int(float(text[split_at + 1:]))
            except (ValueError, OverflowError):
                first = second = None

        if first is None or second is None:
            widths.append("N/A")
            heights.append("N/A")
        elif width_comes_first:
            widths.append(first)
            heights.append(second)
        else:
            widths.append(second)
            heights.append(first)
    return widths, heights
# New for new dictionary logic
def create_bb_bax_new(df_points, CountStyles, page_number, height_plan):
    """Build count-annotation dicts from the per-location DataFrame.

    Args:
        df_points: DataFrame with one row per door location.  Needs the
            columns ``door_id``, ``location`` (an ``(x, y)`` pair) and
            ``color_annot`` (an ``(r, g, b)`` tuple of 0..255 values); every
            other column becomes a custom column of the annotation.
        CountStyles: Mapping of style name -> style token; ``'Circle'`` is used.
        page_number: 1-based page number stored on each annotation.
        height_plan: Despite its name this is the page object handed to
            ``point_mupdf_to_pdf``.

    Returns:
        ``(bax_annotations, customDta)``: the list of annotation dicts and
        the custom-data dict of the *last* row (callers use its keys as the
        column order).  For a DataFrame without rows this is ``([], {})``.
    """
    bax_annotations = []
    excluded = ["location", "color_annot"]
    customDta = {}  # stays empty when df_points has no rows
    for _, row in df_points.iterrows():
        customDta = row.drop(labels=excluded, errors="ignore").to_dict()
        r, g, b = row['color_annot']
        # normalised colour string: "R/255 G/255 B/255"
        color = ' '.join(str(float(channel / 255)) for channel in (r, g, b))
        x, y = row['location']
        vertix = point_mupdf_to_pdf(x, y, height_plan)
        bax_annotations.append({
            'vertices': vertix,
            'text': '1',  # number of counts represented by one markup
            'author': 'ADR',
            'custom_data': customDta,  # custom columns as {column name: value}
            'label': row['door_id'],
            'page': page_number,
            'color': color,
            'countstyle': CountStyles['Circle'],
            'countsize': '0.8'  # size of the count icon
        })
    return bax_annotations, customDta
#Handle missing widths or heights in some rows
def generate_separate_dimensions(widths):
    """Split "<w> mm wide x <h> mm high" strings into widths and heights.

    Args:
        widths: List of dimension strings.

    Returns:
        ``(widthat, heightat)`` -- two lists of strings aligned with the
        input.  Strings that do not start with the expected pattern give
        ``"N/A"`` in both lists.
    """
    dimension_re = re.compile(r'(\d+(?:\.\d+)?)\s*mm wide x\s*(\d+(?:\.\d+)?)\s*mm high')
    widthat, heightat = [], []
    for text in widths:
        found = dimension_re.match(text)
        width_text, height_text = found.groups() if found else ("N/A", "N/A")
        widthat.append(width_text)
        heightat.append(height_text)
    return widthat, heightat
def generate_bluebeam_columns_raw(column_names):
    """Serialise the custom column definitions as a bare XML string.

    Every name becomes a ``BSIColumnItem`` (``Subtype="Text"``) whose index
    and display order equal its position in ``column_names``.  The result
    has no XML declaration.

    Args:
        column_names: Iterable of column name strings.

    Returns:
        The ``<BluebeamUserDefinedColumns>`` element as a ``str``.
    """
    root = Element("BluebeamUserDefinedColumns")
    for position, column_name in enumerate(column_names):
        order = str(position)
        item = SubElement(root, "BSIColumnItem", Index=order, Subtype="Text")
        children = (
            ("Name", column_name),
            ("DisplayOrder", order),
            ("Deleted", "False"),
            ("Multiline", "False"),
        )
        for tag, text in children:
            SubElement(item, tag).text = text
    return tostring(root, encoding="unicode", method="xml")
def pick_approach(schedule, plan, searcharray, flag):
    """Dry-run one table-extraction approach and report how well it worked.

    Args:
        schedule: Sequence of schedule inputs; each is passed to
            ``extract_tables`` / ``extract_tables_model``.
        plan: Sequence of plan inputs; each is passed to ``read_text``.
        searcharray: Per schedule, a list of user inputs.  Each user input is
            indexed up to position 5; positions 4 and 5 flag the presence of
            secondary information.
        flag: ``1`` uses ``extract_tables``, ``2`` uses
            ``extract_tables_model``.

    Returns:
        ``(no_tables, not_found_any_plan)``.  ``no_tables`` is True when, for
        at least one user input, the requested columns could not be located.
        ``not_found_any_plan`` lists the labels that were reported as not
        found exactly ``len(plan)`` times, with ``"N/A"`` removed.
    """
    not_found_list = []
    missings = []  # messages are collected here but not returned
    no_tables = False
    for p in plan:
        for k in range(len(schedule)):
            # flag selects which extractor produces the candidate tables
            if flag == 1:
                dfs = extract_tables(schedule[k])
            if flag == 2:
                dfs = extract_tables_model(schedule[k])
            user_input_this_schedule = searcharray[k]
            for j in range(len(user_input_this_schedule)):
                user_input = user_input_this_schedule[j]
                secondary_presence = False
                if user_input[4] or user_input[5]:
                    secondary_presence = True
                    main_info_, secondary_info_ = separate_main_secondary(user_input)
                    # keep only the non-empty entries
                    main_info = [item for item in main_info_ if item]
                    secondary_info = [item for item in secondary_info_ if item]
                selected_columns_combined = get_selected_columns_all(dfs, user_input)
                if selected_columns_combined is None:
                    # Fallback: locate the columns by index using the tables
                    # from extract_tables, then select them from dfs.
                    dfs_normal = extract_tables(schedule[k])
                    column_indices = get_column_indices_from_dfs_normal(dfs_normal, user_input)
                    if column_indices is None:
                        missing_clmns = check_missing(dfs, user_input)
                        missing_message = f"{missing_clmns} can't be extracted from table input {j+1} in schedule {k+1}"
                        missings.append(missing_message)
                        no_tables = True
                        continue # continue to the next user input
                    if len(dfs) == 1:
                        selected_columns_combined = get_selected_columns_by_index(dfs[0], column_indices, user_input)
                    if len(dfs) > 1:
                        index_df = get_df_index(dfs, user_input)
                        selected_columns_combined = get_selected_columns_by_index(dfs[index_df], column_indices, user_input)
                # Normalise blanks, NaN and any spelling of "n/a" to 'N/A'.
                selected_columns_combined = selected_columns_combined.applymap(lambda x: 'N/A' if isinstance(x, str) and x.strip() == '' else x)
                selected_columns_combined = selected_columns_combined.fillna('N/A')
                selected_columns_combined = selected_columns_combined.replace(r'(?i)\bn/a\b', 'N/A', regex=True)
                kelma = get_st_op_pattern(selected_columns_combined, user_input)  # result is not used in this function
                if "door_type" in selected_columns_combined.columns:
                    col_dict = get_similar_colors_all(selected_columns_combined)
                    flattened_list = get_flattened_tuples_list_all(col_dict)
                else:
                    if secondary_presence:
                        # pad main_info by one empty entry when there is no door type
                        # NOTE(review): presumably to keep the arity expected by
                        # get_word_locations_plan_secondary -- confirm
                        main_info = main_info + [""]
                    flattened_list = get_flattened_tuples_list_no_doortype(selected_columns_combined)
                plan_texts = read_text(p)
                if secondary_presence:
                    locations, not_found = get_word_locations_plan_secondary(flattened_list,plan_texts, main_info, secondary_info)
                    not_found_list.append(not_found)
                else:
                    locations, not_found = get_word_locations_plan(flattened_list,plan_texts)
                    not_found_list.append(not_found)
    ## Getting the not found in all plans
    flattened_not_found_list = [item for sublist in not_found_list for item in sublist]
    from collections import Counter
    counts_not_found = Counter(flattened_not_found_list)
    not_found_any_plan = []
    for key, value in counts_not_found.items():
        # a label counted len(plan) times is treated as missing from every plan
        if value == len(plan):
            not_found_any_plan.append(key)
    not_found_any_plan = [item for item in not_found_any_plan if item != "N/A"]
    return no_tables, not_found_any_plan
def get_df_csv(sch):
    """Load a CSV schedule into a DataFrame using a detected text encoding.

    The encoding is guessed by ``chardet`` from the first 100 000 bytes of
    the file; ``utf-8`` is used when no encoding is reported.

    Args:
        sch: Path of the CSV file.

    Returns:
        The DataFrame produced by ``pandas.read_csv``.
    """
    with open(sch, "rb") as handle:
        sample = handle.read(100_000)  # a prefix is enough for detection
    detected = chardet.detect(sample)["encoding"]
    return pd.read_csv(sch, encoding=detected or "utf-8")
def mainRun(schedule, plan, searcharray, sch_csv_pdf):
if sch_csv_pdf:
print("shcedule type is PDF")
no_tables_normal, not_found_any_plan_normal = pick_approach(schedule, plan, searcharray, 1)
try:
no_tables_model, not_found_any_plan_model = pick_approach(schedule, plan, searcharray, 2)
except:
print("Model detection has issue of file too large")
#no_tables_model = True
pick_normal = False
pick_model = False
if no_tables_model:
pick_normal = True
#print("choose normal")
elif no_tables_normal:
pick_model = True
#print("choose model")
elif no_tables_model and no_tables_normal:
print("el etneen bayzeen")
else:
## Decide according to the not found labels
#print("el etneen shaghaleen")
if len(not_found_any_plan_model) > len(not_found_any_plan_normal):
#print("choose not_found_any_plan_normal")
pick_normal = True
elif len(not_found_any_plan_model) < len(not_found_any_plan_normal):
pick_model = True
#print("choose not_found_any_plan_model")
else: # law ad ba3d choose the older approach (fitz)
pick_normal = True
#print("choose any")
else:
print("schedule type is CSV")
df = get_df_csv(schedule[0])
print(df)
print("mainRun is RUNNING")
#print(type(plan))
eltype = type(plan)
print(f"el type beta3 variable plan:: {eltype}")
len_plan = len(plan)
print(f"length of the plan's array is: {len_plan}")
p1_type = type(plan[0])
print(f"el mawgood fe p[0]: {p1_type}")
print(f"length of search array: {len(searcharray)}")
#dfs = extract_tables(schedule)
print(f"type of schedule: {type(schedule)}")
print(f"length of schedules: {len(schedule)}")
pdf_widths = []
pdf_heights = []
pdfs_count_type = []
annotation_counter = 0
page_number = 0
bax_annotations_all_inputs = [] #for the same plan
#pdfs = []
not_found_list = []
repeated_labels_list = []
missings = []
for p in plan:
annotation_counter +=1
page_number +=1
pdf_document = fitz.open("pdf", p)
# Get the first page (0-indexed)
page = pdf_document[0]
rect = page.rect # Rectangle: contains x0, y0, x1, y1
width_plan = page.cropbox.width # or: width = rect.x1 - rect.x0
height_plan = page.cropbox.height # or: height = rect.y1 - rect.y0
#width_plan = math.ceil(width_plan)
#height_plan = math.ceil(height_plan)
for k in range(len(schedule)):
if sch_csv_pdf and pick_normal:
dfs = extract_tables(schedule[k])
if sch_csv_pdf and pick_model:
dfs = extract_tables_model(schedule[k])
if sch_csv_pdf == False:
df = get_df_csv(schedule[k])
dfs = [df]
user_input_this_schedule = searcharray[k]
for j in range(len(user_input_this_schedule)):
user_input = user_input_this_schedule[j]
secondary_presence = False
if user_input[4] or user_input[5]:
secondary_presence = True
main_info_, secondary_info_ = separate_main_secondary(user_input)
main_info = [item for item in main_info_ if item]
secondary_info = [item for item in secondary_info_ if item]
print("feh secondary information")
if user_input[4]:
print("Fire rate mawgooda")
if user_input[5]:
print("Acoustic Rate mawgooda")
else:
print("mafeesh secondary information")
selected_columns_combined = get_selected_columns_all(dfs, user_input)
if sch_csv_pdf:
if selected_columns_combined is None:
dfs_normal = extract_tables(schedule[k])
column_indices = get_column_indices_from_dfs_normal(dfs_normal, user_input)
if column_indices is None:
missing_clmns = check_missing(dfs, user_input)
missing_message = f"{missing_clmns} can't be extracted from table input {j+1} in schedule {k+1}"
missings.append(missing_message)
continue # continue to the next user input
if len(dfs) == 1:
selected_columns_combined = get_selected_columns_by_index(dfs[0], column_indices, user_input)
if len(dfs) > 1:
index_df = get_df_index(dfs, user_input)
selected_columns_combined = get_selected_columns_by_index(dfs[index_df], column_indices, user_input)
selected_columns_combined = selected_columns_combined.applymap(lambda x: 'N/A' if isinstance(x, str) and x.strip() == '' else x)
selected_columns_combined = selected_columns_combined.fillna('N/A')
selected_columns_combined = selected_columns_combined.replace(r'(?i)\bn/a\b', 'N/A', regex=True)
kelma = get_st_op_pattern(selected_columns_combined, user_input)
if "door_type" in selected_columns_combined.columns:
col_dict = get_similar_colors_all(selected_columns_combined)
flattened_list = get_flattened_tuples_list_all(col_dict)
else:
if secondary_presence:
main_info = main_info + [""]
# new logic can handle it
#col_dict = get_similar_colors_all(selected_columns_combined)
flattened_list = get_flattened_tuples_list_no_doortype(selected_columns_combined)
plan_texts = read_text(p)
#locations, not_found = get_word_locations_plan_secondary(flattened_list,plan_texts, main_info, secondary_info)
#not_found_list.append(not_found)
#new_data3 = get_cleaned_data_secondary(locations,main_info,secondary_info)
#repeated_labels = get_repeated_labels(locations)
#repeated_labels = list(repeated_labels)
#repeated_labels_list.append(repeated_labels)
col_dict, not_found = add_location(col_dict, plan_texts)
not_found_list.append(not_found)
df_points = grouped_to_dataframe_dynamic(col_dict,
drop_empty_locations=True,
explode_locations=True)
#df_points.columns = df_points.columns.str.strip().str.replace(r"\s+", "_", regex=True)
# Clean column names
df_points.columns = (df_points.columns
.str.strip()
.str.replace(r"[^\w-]+", "_", regex=True)
.str.replace(r"_+", "_", regex=True)
.str.strip("_"))
print(f"col_dict: {col_dict}")
print(f"selected_columns_combined: {selected_columns_combined}")
print(f"df: {df_points}")
if df_points.empty:
continue # to the next user input
# handling no door type in the new dictionary logic
if 'color_annot' not in df_points:
df_points['color_annot'] = (0, 0, 255)
dupes = df_points['door_id'].value_counts()
repeated_ids = dupes[dupes > 1].index.to_list()
repeated_labels_list.append(repeated_ids)
if ('width' in df_points and 'height' in df_points) or 'structural_opening' in df_points:
if kelma:
lst_st_op = df_points["structural_opening"].tolist()
cleaned_st_op = get_cleaned_width(lst_st_op)
widths, heights = get_widths_bb_format_st_op(cleaned_st_op, kelma)
# remove a column (returns a new df)
df_points = df_points.drop(columns=['structural_opening'])
# add two columns (scalars, lists/arrays/Series of length len(df), or expressions)
df_points['width'] = widths # e.g., a list/Series/np.array or a scalar
df_points['height'] = heights
else:
# make sure they are strings first to keep the flow of get_width_clean_width_height function
df_points['width'] = df_points['width'].astype('string')
df_points['height'] = df_points['height'].astype('string')
lst_width = df_points["width"].tolist()
lst_height = df_points["height"].tolist()
clean_widths, clean_height = get_width_clean_width_height(lst_width, lst_height)
df_points["width"] = clean_widths
df_points["height"] = clean_height
df_points = df_points.rename(columns={'width': 'Width_', 'height':'Height_'})
#if kelma == None:
#widths, secondary_tobeprinted = get_width_info_tobeprinted_secondary(new_data3, main_info, secondary_info)
#else:
#width_info_tobeprinted, secondary_tobeprinted = get_width_info_tobeprinted_secondary(new_data3, main_info, secondary_info)
#cleaned_width = get_cleaned_width(width_info_tobeprinted)
#widths = get_widths_bb_format(cleaned_width, kelma)
#Count type annotation
#widht_count, height_count = generate_separate_dimensions(widths)
#bax = create_bb_bax_secondary(new_data3, widht_count, height_count, secondary_tobeprinted, CountStyles, user_input, page_number, page)
#bax_annotations_all_inputs.append(bax)
print(f"color_annot: {df_points['color_annot']}")
print(f"df: {df_points}")
bax, customDta = create_bb_bax_new(df_points, CountStyles, page_number, page)
bax_annotations_all_inputs.append(bax)
# if it is not byte type
#pdfs_count_type.append(convert_to_bytes(p))
pdfs_count_type.append(p)
pdf_widths.append(width_plan)
pdf_heights.append(height_plan)
merged_pdf = merge_pdf_bytes_list(pdfs_count_type)
print(f"number of pges of merged_pdf is {len(merged_pdf)} and its type is {type(merged_pdf)}")
bax_annotation = []
for bax_ann in bax_annotations_all_inputs:
bax_annotation.extend(bax_ann)
#column_order = ['FireRating', 'AcousticRating', 'Height_', 'Width_']
column_order = []
for key in customDta.keys():
column_order.append(key)
## Collect the labels whose not-found count equals the number of plans (i.e. labels reported as not found for every plan)
flattened_not_found_list = [item for sublist in not_found_list for item in sublist]
counts_not_found = Counter(flattened_not_found_list)
not_found_any_plan = []
for key, value in counts_not_found.items():
if value == len(pdfs_count_type):
not_found_any_plan.append(key)
flattened_repeated_labels_list = [item for sublist in repeated_labels_list for item in sublist]
pretty_xml = save_multiple_annotations_count_bax(bax_annotation, 'count_type_Windows.bax', column_order,pdf_widths,pdf_heights,page_number)
column_xml = generate_bluebeam_columns_raw(column_order)
repeated_labels = flattened_repeated_labels_list
##### SHOULD return pretty_xml, column_xml, merged_pdf
not_found = [item for item in not_found_any_plan if item != "N/A"]
annotatedimgs=[]
doc2 =fitz.open('pdf',merged_pdf)
len_doc2 = len(doc2)
list1=pd.DataFrame(columns=['content', 'id', 'subject','color'])
print(f"number of pges of doc2 is {len_doc2} and its type is {type(doc2)}")
for page in doc2:
print("now inside page in doc2")
# page=doc2[0]
pix = page.get_pixmap() # render page to an image
pl=Image.frombytes('RGB', [pix.width,pix.height],pix.samples)
img=np.array(pl)
annotatedimg = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
annotatedimgs.append(annotatedimg)
# Fetch the page's annotations for debugging output; the per-annotation loop below is disabled (wrapped in a string literal)
annotations_page = page.annots()
print(f"annotations: {annotations_page}")
'''
for annot in page.annots():
# Get the color of the annotation
print("ann: {annot}")
annot_color = annot.colors
if annot_color is not None:
# annot_color is a dictionary with 'stroke' and 'fill' keys
print(annot_color)
stroke_color = annot_color.get('stroke') # Border color
fill_color = annot_color.get('fill') # Fill color
if fill_color:
v='fill'
# print('fill')
if stroke_color:
v='stroke'
x,y,z=int(annot_color.get(v)[0]*255),int(annot_color.get(v)[1]*255),int(annot_color.get(v)[2]*255)
print(f"x: {x}")
print(f"y: {y}")
print(f"z: {z}")
list1.loc[len(list1)] =[annot.info['content'],annot.info['id'],annot.info['subject'],[x,y,z]]
print(f"list1 : {list1}")
'''
return annotatedimgs, doc2 , list1, repeated_labels , not_found, pretty_xml, column_xml