|
|
from collections import defaultdict |
|
|
from collections import Counter |
|
|
import pandas as pd |
|
|
import random |
|
|
import math |
|
|
import re |
|
|
import io |
|
|
import pypdfium2 as pdfium |
|
|
import fitz |
|
|
from PIL import Image, ImageDraw |
|
|
from PyPDF2 import PdfReader, PdfWriter |
|
|
from PyPDF2.generic import TextStringObject, NameObject, ArrayObject, FloatObject |
|
|
from PyPDF2.generic import NameObject, TextStringObject, DictionaryObject, FloatObject, ArrayObject |
|
|
from PyPDF2 import PdfReader |
|
|
from PyPDF2.generic import TextStringObject |
|
|
import numpy as np |
|
|
import cv2 |
|
|
from collections import defaultdict |
|
|
import random |
|
|
import fitz |
|
|
import PyPDF2 |
|
|
import io |
|
|
from PyPDF2.generic import TextStringObject |
|
|
from PyPDF2 import PdfReader, PdfWriter |
|
|
import zlib |
|
|
import base64 |
|
|
import datetime |
|
|
import uuid |
|
|
from xml.etree.ElementTree import Element, SubElement, tostring, ElementTree |
|
|
from xml.dom.minidom import parseString |
|
|
from collections import defaultdict |
|
|
from xml.etree.ElementTree import Element, SubElement, tostring |
|
|
from azure.ai.formrecognizer import DocumentAnalysisClient |
|
|
from azure.core.credentials import AzureKeyCredential |
|
|
import chardet |
|
|
|
|
|
|
|
|
def convert2img(path):
    """Render the first page of a PDF as an OpenCV-style BGR numpy array.

    Parameters
    ----------
    path : str or bytes
        Anything pypdfium2's PdfDocument accepts (file path or byte stream).
    """
    document = pdfium.PdfDocument(path)
    first_page = document.get_page(0)
    rendered = first_page.render().to_pil()
    # PIL gives RGB; OpenCV consumers expect BGR channel order.
    rgb_array = np.array(rendered)
    return cv2.cvtColor(rgb_array, cv2.COLOR_RGB2BGR)
|
|
|
|
|
def convert2pillow(path):
    """Render the first page of a PDF and return it as a PIL image."""
    document = pdfium.PdfDocument(path)
    return document.get_page(0).render().to_pil()
|
|
|
|
|
def calculate_midpoint(x1, y1, x2, y2):
    """Return the midpoint of segment (x1, y1)-(x2, y2) as an (x, y) int tuple.

    Uses int() truncation (toward zero), matching the rest of the pipeline.
    """
    return (int((x1 + x2) / 2), int((y1 + y2) / 2))
|
|
|
|
|
def read_text(input_pdf_path):
    """Extract word tuples from a PDF supplied as an in-memory byte stream.

    Walks every page with PyMuPDF and collects the page's words via
    ``get_text("words")`` (each tuple: x0, y0, x1, y1, word, block, line, word_no).

    NOTE(review): only the *last* page's words survive the loop, and
    ``apply_redactions()`` is called although no redaction annotations are
    added beforehand — both look like leftovers; confirm intended behavior.
    """
    pdf_document = fitz.open('pdf',input_pdf_path)

    for page_num in range(pdf_document.page_count):
        page = pdf_document[page_num]
        text_instances = page.get_text("words")

        page.apply_redactions()

    return text_instances
|
|
|
|
|
def normalize_text(text):
    """Collapse *text* to lowercase with every whitespace character removed.

    Non-string inputs (e.g. NaN cells from pandas) normalize to "".
    """
    if not isinstance(text, str):
        return ""
    # str.split() with no args splits on any whitespace run and drops it,
    # so joining the pieces strips all whitespace in one pass.
    return "".join(text.split()).lower()
|
|
|
|
|
|
|
|
def build_flexible_regex(term):
    """Compile a regex matching *term* as a whole string, tolerating
    whitespace or light punctuation (., :, -) between its words, but not
    extra words or partial matches.

    Bug fixed: the original split ``normalize_text(term)``, whose output has
    ALL whitespace removed — so ``words`` was always a single token and the
    flexible joiner could never apply. Multi-word terms like "door type"
    therefore failed to match punctuated variants such as "door-type".
    Splitting the lowercased raw term restores the documented behavior.
    """
    if not isinstance(term, str):
        term = ""
    words = term.lower().split()
    # Allow any run of whitespace / light punctuation between words.
    pattern = r'[\s\.\:\-]*'.join(map(re.escape, words))
    return re.compile(rf'^{pattern}$', re.IGNORECASE)
|
|
|
|
|
def flexible_search(df, search_terms):
    """Search for each term in *df*'s column headers and its top rows.

    Only the first three rows are scanned for cell hits (headers sometimes
    land in the data rows of extracted tables).

    Returns
    -------
    dict
        {term: {"col_matches": [col_idx, ...],
                "cell_matches": [(row_idx, col_idx), ...]}}
    """
    results = {term: {"col_matches": [], "cell_matches": []} for term in search_terms}
    n_cols = len(df.columns)
    n_scan_rows = min(3, len(df))

    for term in search_terms:
        regex = build_flexible_regex(term)
        hits = results[term]

        # Header hits.
        for col_idx, header in enumerate(df.columns):
            if regex.search(normalize_text(header)):
                hits["col_matches"].append(col_idx)

        # Cell hits in the leading rows.
        for row_idx in range(n_scan_rows):
            for col_idx in range(n_cols):
                if regex.search(normalize_text(df.iat[row_idx, col_idx])):
                    hits["cell_matches"].append((row_idx, col_idx))

    return results
|
|
|
|
|
|
|
|
"""def generate_current_table_without_cropping(clm_idx, clmn_name, df): |
|
|
selected_df = df.iloc[:, clm_idx] |
|
|
print("hello I generated the selected columns table without cropping") |
|
|
selected_df.columns = clmn_name |
|
|
return selected_df""" |
|
|
|
|
|
def generate_current_table_without_cropping(clm_idx, df):
    """Return the columns of *df* at positions *clm_idx* with rows untouched."""
    subset = df.iloc[:, clm_idx]
    print("hello I generated the selected columns table without cropping")
    return subset
|
|
|
|
|
def crop_rename_table(indices, clmn_name, clmn_idx, df):
    """Drop every row up to and including the deepest header row, then select
    the columns at *clmn_idx* and rename them to *clmn_name*.

    Parameters
    ----------
    indices : list[int]
        Row positions where header text was found; data starts after max().
    clmn_name : list[str]
        Canonical names for the selected columns.
    clmn_idx : list[int]
        Positional indices of the columns to keep.
    """
    first_data_row = max(indices) + 1
    cropped = df.iloc[first_data_row:].reset_index(drop=True)

    selected = cropped.iloc[:, clmn_idx]
    selected.columns = clmn_name
    return selected
|
|
|
|
|
def clean_column_row(row):
    """Strip a leading '<digits>- ' prefix from each cell (e.g. '12- Name' -> 'Name').

    Cells are stringified first, so non-string values are safe.
    """
    prefix = re.compile(r'^\d+-\s*')
    return [prefix.sub('', str(cell)) for cell in row]
|
|
|
|
|
def details_in_another_table(clmn_name, clmn_idx, current_dfs, dfs):
    """Collect the same columns from every *other* table that has the same
    column count as *current_dfs* and stack them into one DataFrame.

    Used when a short table is only a legend and the actual rows live in
    sibling tables of the same shape. Each sibling's header row (which may
    itself hold data in extracted tables) is cleaned of 'N- ' prefixes and
    re-inserted as the first data row so no values are lost.

    Returns None when no sibling table matches.
    """
    # Sibling tables only: same width, but never the current table itself.
    matching_dfs = [
        dff for dff in dfs
        if dff is not current_dfs and current_dfs.shape[1] == dff.shape[1]
    ]

    if not matching_dfs:
        return None

    updated_dfs = []
    for dff in matching_dfs:
        selected_dff = dff.iloc[:, clmn_idx].copy()

        # Keep the sibling's header as a data row (it may hold real values).
        cleaned_header = clean_column_row(selected_dff.columns.tolist())
        col_names_as_row = pd.DataFrame([cleaned_header])

        # Align both pieces on the caller-supplied canonical names.
        selected_dff.columns = clmn_name
        col_names_as_row.columns = clmn_name

        temp_df = pd.concat([col_names_as_row, selected_dff], ignore_index=True)
        updated_dfs.append(temp_df)

    combined_df = pd.concat(updated_dfs, ignore_index=True)

    return combined_df
|
|
|
|
|
def map_user_input_to_standard_labels(user_inputs):
    """Map free-form column names to canonical labels via regex heuristics.

    Each canonical label is claimed by at most one input; the first input
    matching a still-unclaimed label wins, and an input stops at its first
    matching label.

    Returns
    -------
    dict
        {canonical_label: original_input_string}
    """
    patterns = {
        'door_id': r'\b(?:door\s*)?(?:id|no|number)\b|\bdoor\s*name\b',
        'door_type': r'\b(?:\S+\s+)?door\s*type\b|\btype(?:\s+\w+)?\b',
        'structural_opening': r'\bstructural\s+opening\b',
        'width': r'\bwidth\b',
        'height': r'\bheight\b',
    }

    def normalize(text):
        # Collapse internal whitespace runs to single spaces, trim, lowercase.
        return re.sub(r'\s+', ' ', text.strip(), flags=re.MULTILINE).lower()

    mapped = {}
    for raw in user_inputs:
        cleaned = normalize(raw)
        for label, pattern in patterns.items():
            if label in mapped:
                continue
            if re.search(pattern, cleaned, re.IGNORECASE):
                mapped[label] = raw
                break
    return mapped
|
|
|
|
|
def analyse_cell_columns(cell_columns_appearance):
    """Reduce flexible_search output to one hit per search term.

    Returns
    -------
    tuple[list, list]
        (cell_matches, col_matches): the first cell hit and the first
        header hit of every term that has any, in term order.
    """
    cell_matches = [
        hits['cell_matches'][0]
        for hits in cell_columns_appearance.values()
        if hits['cell_matches']
    ]
    col_matches = [
        hits['col_matches'][0]
        for hits in cell_columns_appearance.values()
        if hits['col_matches']
    ]
    return cell_matches, col_matches
|
|
|
|
|
|
|
|
def get_row_column_indices(cell_clmn_indx):
    """Split (row, col) tuples into parallel row-index and column-index lists."""
    if not cell_clmn_indx:
        return [], []
    rows, cols = zip(*cell_clmn_indx)
    return list(rows), list(cols)
|
|
|
|
|
|
|
|
def get_column_index(col_matches):
    """Return *col_matches* as a new list (shallow copy)."""
    return list(col_matches)
|
|
|
|
|
|
|
|
def extract_tables(schedule):
    """Extract every table on every page of a PDF (given as a byte stream)
    as pandas DataFrames.

    Bug fixed: the accumulator list used to be re-created inside the page
    loop, so tables from every page except the last were silently discarded.
    It is now initialized once before iterating pages.
    """
    doc = fitz.open("pdf", schedule)
    dfs = []  # accumulate across ALL pages, not just the last one
    for page in doc:
        for tab in page.find_tables():
            dfs.append(tab.to_pandas())
    return dfs
|
|
|
|
|
def get_selected_columns(dfs, user_patterns):
    """Scan the extracted tables for the user's requested columns and return
    the selected/renamed sub-table, or None when nothing matched.

    A table can match either in its header row ("col" matches) or inside its
    first data rows ("cell" matches — extracted tables often carry the real
    header as a data row). Tables shorter than 10 rows are assumed to be
    legends whose actual rows live in a sibling table of the same shape.
    """
    selected_columns = []  # NOTE(review): unused accumulator — leftover
    selected_columns_new = None

    for i in range(len(dfs)):
        cell_columns_appearance = flexible_search(dfs[i], user_patterns)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)

        # Canonical output names keyed off how many patterns the user gave.
        if len(user_patterns) == 2:
            clmn_name = ["door_id", "door_type"]
        if len(user_patterns) == 4:
            clmn_name = ["door_id", "door_type", "width", "height"]
        if len(user_patterns) == 3:
            clmn_name = ["door_id", "door_type", "structural opening"]
        if len(cell_matches) == 0 and len(col_matches) == 0:
            print(f"this is df {i}, SEARCH IN ANOTHER DF")
        else:
            # Every pattern was found in the header row.
            # NOTE(review): this branch has no break — a later table can
            # overwrite the selection; confirm that is intended.
            if len(col_matches) == len(user_patterns):
                column_index_list = get_column_index(col_matches)
                print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")

                print(column_index_list)
                # Short table => legend; pull the rows from sibling tables.
                if len(dfs[i]) <10:
                    selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)

                # Long table => it already contains the data.
                if len(dfs[i]) >10:
                    selected_columns_new = generate_current_table_without_cropping(column_index_list,dfs[i])

            # Every pattern was found inside the leading data rows.
            if len(cell_matches) == len(user_patterns):
                row_index_list, column_index_list = get_row_column_indices(cell_matches)
                print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")

                if len(dfs[i]) <10:

                    selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
                    break

                if len(dfs[i]) >10:
                    print(f"this is df {i} call crop_rename_table(indices, clmn_name, clmn_idx,df)")
                    selected_columns_new = crop_rename_table(row_index_list, clmn_name, column_index_list,dfs[i])
                    break
    return selected_columns_new
|
|
|
|
|
|
|
|
|
|
|
def separate_main_secondary(input_user_clmn_names):
    """Split user column names into ('main', 'secondary'): the first four
    entries are main (id/type/width/height), the rest are secondary."""
    return input_user_clmn_names[:4], input_user_clmn_names[4:]
|
|
|
|
|
|
|
|
|
|
|
def get_column_name(user_input_m):
    """Translate the four 'main' user inputs into canonical column labels,
    dropping the slots the user left empty.

    Special case: an empty 4th slot (height) whose 3rd slot (width) is
    non-empty means that third column is really a single combined
    'structural_opening' dimension rather than a width.
    """
    empty_slots = [i for i, value in enumerate(user_input_m) if value == '']
    labels = ["door_id", "door_type", "width", "height"]

    for pos, slot in enumerate(empty_slots):
        if slot == 3:
            # pos - 1 wraps to the last element when pos == 0, which still
            # behaves correctly for a lone empty slot 3 (last != 2 there).
            if empty_slots[pos - 1] == 2:
                labels[2] = ""
            else:
                labels[2] = "structural_opening"
        labels[slot] = ""

    return [label for label in labels if label]
|
|
|
|
|
|
|
|
def get_column_name_secondary(user_input_m):
    """Translate the secondary user inputs into canonical labels
    ('fire_rate', 'acoustic_rate'), dropping slots the user left empty."""
    labels = ["fire_rate", "acoustic_rate"]
    for idx, value in enumerate(user_input_m):
        if value == '':
            labels[idx] = ""
    return [label for label in labels if label]
|
|
|
|
|
|
|
|
|
|
|
def extract_tables_model(schedule_byte):
    """Run Azure Document Intelligence 'prebuilt-layout' on a PDF byte stream
    and return every detected table as a pandas DataFrame.

    SECURITY FIX: the endpoint and API key used to be hard-coded in source.
    The key must be rotated and both values are now read from the
    environment (FORM_RECOGNIZER_ENDPOINT / FORM_RECOGNIZER_KEY).

    Raises
    ------
    KeyError
        If FORM_RECOGNIZER_KEY is not set in the environment.
    """
    import os

    endpoint = os.environ.get(
        "FORM_RECOGNIZER_ENDPOINT",
        "https://tabledetection2.cognitiveservices.azure.com/",
    )
    key = os.environ["FORM_RECOGNIZER_KEY"]

    client = DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key))
    poller = client.begin_analyze_document("prebuilt-layout", document=schedule_byte)
    result = poller.result()

    tables = []
    for table in result.tables:
        # Cells are sparse; size the grid from the maximum indices seen.
        max_cols = max(cell.column_index for cell in table.cells) + 1
        max_rows = max(cell.row_index for cell in table.cells) + 1
        table_data = [["" for _ in range(max_cols)] for _ in range(max_rows)]

        for cell in table.cells:
            table_data[cell.row_index][cell.column_index] = cell.content

        tables.append(pd.DataFrame(table_data))
    return tables
|
|
|
|
|
|
|
|
def get_selected_columns_all(dfs, user_patterns):
    """First variant of get_selected_columns_all.

    NOTE(review): DEAD CODE — this definition is shadowed by a second
    ``def get_selected_columns_all`` later in this module, so it can never
    be called by importers. Kept for reference; safe to delete.

    Derives canonical column names from the main/secondary split of
    *user_patterns*, searches each table for the non-empty patterns, and
    selects/combines matching columns.
    """
    selected_columns = []  # NOTE(review): unused accumulator — leftover
    selected_columns_new = None

    for i in range(len(dfs)):
        # Split the user's inputs and derive canonical labels + the
        # non-empty search patterns.
        main_info, secondary_info = separate_main_secondary(user_patterns)
        clmn_name_main = get_column_name(main_info)
        non_empty_main_info = [item for item in main_info if item]

        clmn_name_secondary = get_column_name_secondary(secondary_info)

        non_empty_secondary_info = [item for item in secondary_info if item]

        clmn_name = clmn_name_main + clmn_name_secondary
        non_empty_info = non_empty_main_info + non_empty_secondary_info

        print(f"clmn name: {clmn_name}")
        print(f"non-empty info: {non_empty_info}")

        cell_columns_appearance = flexible_search(dfs[i], non_empty_info)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)

        print(f"length of cell_matches: {len(cell_matches)}")
        print(f"cell_matches: {cell_matches}")

        print(clmn_name)

        if len(cell_matches) == 0 and len(col_matches) == 0:
            print(f"this is df {i}, SEARCH IN ANOTHER DF")

        else:
            # All patterns matched in the header row.
            if len(col_matches) == len(non_empty_info):
                column_index_list = get_column_index(col_matches)
                print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")

                print(column_index_list)
                # Short table => legend; pull rows from sibling tables.
                if len(dfs[i]) <10:
                    selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)

                # Long table => it already contains the data.
                if len(dfs[i]) >10:
                    selected_columns_new = generate_current_table_without_cropping(column_index_list,dfs[i])

            # All patterns matched inside the leading data rows: combine the
            # sibling-table rows with this table cropped below its header.
            if len(cell_matches) == len(non_empty_info):
                row_index_list, column_index_list = get_row_column_indices(cell_matches)
                print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")

                # NOTE(review): details_in_another_table may return None,
                # which would make pd.concat raise — confirm inputs.
                selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
                selected_columns_new2 = crop_rename_table(row_index_list, clmn_name, column_index_list,dfs[i])
                selected_columns_new = pd.concat([selected_columns_new, selected_columns_new2], ignore_index=True)

                break

    return selected_columns_new
|
|
|
|
|
|
|
|
def get_selected_columns_all(dfs, user_patterns):
    """Select and combine the user-requested columns from the extracted
    tables (active variant — it shadows an identical-named def above).

    Inputs beyond the sixth pattern ('extra_info') are appended verbatim to
    the canonical column names. For each table, all non-empty patterns are
    searched in headers and leading data rows; on a full match the columns
    are gathered both from sibling tables and from the table itself, then
    concatenated. Returns None when no table fully matches.
    """
    selected_columns = []  # NOTE(review): unused accumulator — leftover
    selected_columns_new = None

    for i in range(len(dfs)):
        # Patterns past index 5 are passed through as extra column names.
        extra_info = user_patterns[6:]

        main_info, secondary_info = separate_main_secondary(user_patterns)
        clmn_name_main = get_column_name(main_info)
        non_empty_main_info = [item for item in main_info if item]

        clmn_name_secondary = get_column_name_secondary(secondary_info)

        non_empty_secondary_info = [item for item in secondary_info if item]

        clmn_name = clmn_name_main + clmn_name_secondary + extra_info

        non_empty_info = non_empty_main_info + non_empty_secondary_info

        print(f"clmn name: {clmn_name}")
        print(f"non-empty info: {non_empty_info}")

        cell_columns_appearance = flexible_search(dfs[i], non_empty_info)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)

        print(f"length of cell_matches: {len(cell_matches)}")
        print(f"cell_matches: {cell_matches}")
        print(f"col_matches: {col_matches}")

        print(clmn_name)

        if len(cell_matches) == 0 and len(col_matches) == 0:
            print(f"this is df {i}, SEARCH IN ANOTHER DF")

        else:
            # All patterns matched in the header row: merge sibling-table
            # rows with this table's own rows.
            if len(col_matches) == len(non_empty_info):
                column_index_list = get_column_index(col_matches)
                print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")

                print(column_index_list)

                # NOTE(review): details_in_another_table may return None,
                # which would make pd.concat raise — confirm inputs.
                selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
                selected_columns_new2 = generate_current_table_without_cropping(column_index_list,dfs[i])
                selected_columns_new = pd.concat([selected_columns_new, selected_columns_new2], ignore_index=True)
                selected_columns_new.columns = clmn_name

            # All patterns matched inside the leading data rows: combine the
            # sibling-table rows with this table cropped below its header.
            if len(cell_matches) == len(non_empty_info):
                row_index_list, column_index_list = get_row_column_indices(cell_matches)
                print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")

                print(f"column names: {clmn_name}")
                print(f"column index list: {column_index_list}")
                selected_columns_new = details_in_another_table(clmn_name, column_index_list, dfs[i], dfs)
                selected_columns_new2 = crop_rename_table(row_index_list, clmn_name, column_index_list,dfs[i])
                selected_columns_new = pd.concat([selected_columns_new, selected_columns_new2], ignore_index=True)
                break

    return selected_columns_new
|
|
|
|
|
|
|
|
|
|
|
def get_st_op_pattern(selected_columns, user_input):
    """Return the user's original structural-opening label (third input) when
    the selected table carries a 'structural_opening' column, else None."""
    if 'structural_opening' not in selected_columns.columns:
        return None
    return user_input[2]
|
|
|
|
|
|
|
|
def find_text_in_plan(label, x):
    """Find every word tuple in *x* whose text (index 4) equals *label*.

    Returns
    -------
    tuple[list, list, list]
        (midpoints, matched_words, swapped_midpoints) — midpoints come from
        the word's bounding box corners; the swapped variant feeds the same
        corners in (y, x) order.
    """
    substring_coordinates = []
    words = []
    point_list = []

    for tpl in x:
        if tpl[4] != label:
            continue
        x0, y0, x1, y1 = tpl[0], tpl[1], tpl[2], tpl[3]
        substring_coordinates.append(calculate_midpoint(x0, y0, x1, y1))
        point_list.append(calculate_midpoint(y0, x0, y1, x1))
        words.append(tpl[4])

    return substring_coordinates, words, point_list
|
|
|
|
|
|
|
|
def get_selected_columns_by_index(df, column_index_list, user_patterns):
    """Select columns of *df* by position and rename them to the canonical
    labels derived from *user_patterns* (main + secondary split)."""
    selected_df = df.iloc[:, column_index_list]

    main_info, secondary_info = separate_main_secondary(user_patterns)
    clmn_name = get_column_name(main_info) + get_column_name_secondary(secondary_info)

    print(f"clmn_name from the function el 3amla moshkela: {clmn_name}")
    selected_df.columns = clmn_name

    return selected_df
|
|
|
|
|
|
|
|
def get_column_indices_from_dfs_normal(dfs, user_patterns):
    """Return the column indices of the first table that fully matches the
    user's non-empty patterns (header match preferred, then cell match);
    None when no table matches at all.

    NOTE(review): if a table matches only *partially* (some hits, but
    neither count equals the pattern count) on the last iteration,
    ``column_index_list`` may be unbound at the return — potential
    UnboundLocalError; confirm inputs make this impossible.
    """
    for i in range(len(dfs)):
        # Derive canonical labels and the non-empty search patterns.
        main_info, secondary_info = separate_main_secondary(user_patterns)

        clmn_name_main = get_column_name(main_info)
        non_empty_main_info = [item for item in main_info if item]

        clmn_name_secondary = get_column_name_secondary(secondary_info)
        non_empty_secondary_info = [item for item in secondary_info if item]

        clmn_name = clmn_name_main + clmn_name_secondary
        non_empty_info = non_empty_main_info + non_empty_secondary_info

        cell_columns_appearance = flexible_search(dfs[i], non_empty_info)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)

        # No hits: try the next table, unless this was the last one.
        if len(cell_matches) == 0 and len(col_matches) == 0 and i < len(dfs) - 1:
            continue
        elif len(cell_matches) == 0 and len(col_matches) == 0:
            column_index_list = None
        else:
            # Full header match wins.
            if len(col_matches) == len(non_empty_info):
                column_index_list = get_column_index(col_matches)
                print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")

                break

            # Otherwise a full cell match.
            if len(cell_matches) == len(non_empty_info):
                row_index_list, column_index_list = get_row_column_indices(cell_matches)
                print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")

                break

    return column_index_list
|
|
|
|
|
|
|
|
|
|
|
def find_missing_columns(complete_list, non_complete_list):
    """Return the truthy items of *complete_list* that do not appear in
    *non_complete_list* after normalization.

    Normalization removes all whitespace and bracket characters and
    lowercases, so 'Door Id' matches 'door(id)'.

    Fix: the original defined ``normalize_text`` twice back to back; the
    first (which did not strip brackets) was immediately shadowed and has
    been removed, along with two redundant self-assignments.
    """
    def normalize_text(text):
        # Remove whitespace and brackets, lowercase; non-strings -> "".
        if not isinstance(text, str):
            return ""
        text = re.sub(r'\s+', '', text)
        text = re.sub(r'[\(\)\[\]\{\}]', '', text)
        return text.lower()

    normalized_non_complete = [normalize_text(item) for item in non_complete_list]

    missing = [
        item for item in complete_list
        if normalize_text(item) not in normalized_non_complete
    ]

    # Drop empty/falsy placeholder entries.
    return [item for item in missing if item]
|
|
|
|
|
|
|
|
def check_missing(dfs, user_patterns):
    """Report which of the user's patterns were not found in any table.

    For every table, collects the cell values at each pattern's first cell
    hit; the table with the most hits is taken as the best candidate, and
    the patterns absent from it are returned.

    NOTE(review): raises ValueError (max of empty sequence) when *dfs* is
    empty — confirm callers guarantee at least one table.
    """
    all_words = []
    for i in range(len(dfs)):
        # Derive canonical labels and the non-empty search patterns.
        main_info, secondary_info = separate_main_secondary(user_patterns)
        clmn_name_main = get_column_name(main_info)
        non_empty_main_info = [item for item in main_info if item]

        clmn_name_secondary = get_column_name_secondary(secondary_info)

        non_empty_secondary_info = [item for item in secondary_info if item]

        clmn_name = clmn_name_main + clmn_name_secondary
        non_empty_info = non_empty_main_info + non_empty_secondary_info

        cell_columns_appearance = flexible_search(dfs[i], non_empty_info)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)

        # Actual cell text found at each matched position in this table.
        words = [dfs[i].iloc[row, col] for row, col in cell_matches]
        all_words.append(words)

    # Best candidate = the table where the most patterns were found.
    found_words = max(all_words, key=len)
    print(found_words)
    missings = find_missing_columns(user_patterns, found_words)

    return missings
|
|
|
|
|
|
|
|
def get_df_index(dfs, user_patterns):
    """Return the index of the table that best matches the user's patterns.

    For each table, counts both header hits and cell hits and keeps the
    larger list; the table with the overall longest hit list wins.

    NOTE(review): raises ValueError (max of empty sequence) when no table
    has any hits — confirm callers guarantee a match exists.
    """
    df_matches = []
    for i in range(len(dfs)):
        # Derive canonical labels and the non-empty search patterns.
        main_info, secondary_info = separate_main_secondary(user_patterns)
        clmn_name_main = get_column_name(main_info)
        non_empty_main_info = [item for item in main_info if item]

        clmn_name_secondary = get_column_name_secondary(secondary_info)

        non_empty_secondary_info = [item for item in secondary_info if item]

        clmn_name = clmn_name_main + clmn_name_secondary
        non_empty_info = non_empty_main_info + non_empty_secondary_info

        cell_columns_appearance = flexible_search(dfs[i], non_empty_info)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)

        if len(cell_matches) == 0 and len(col_matches) == 0:
            continue
        else:
            # Keep whichever hit list (header vs cell) is longer.
            column_index_list_from_columns = get_column_index(col_matches)
            row_index_list, column_index_list_from_cells = get_row_column_indices(cell_matches)
            if len(column_index_list_from_columns) > len(column_index_list_from_cells):
                df_matches.append((column_index_list_from_columns,i))
            else:
                df_matches.append((column_index_list_from_cells,i))

    # The table holding the longest hit list is the best candidate.
    longest_list = max(df_matches, key=lambda x: len(x[0]))

    index_longest_list = longest_list[1]

    return index_longest_list
|
|
|
|
|
def get_word_locations_plan(flattened_list, plan_texts):
    """Locate each flattened row's label in the plan's word list.

    The tuple arity of the first row decides the record shape:
      2 -> (label, color)               => (coords, label, color)
      3 -> (label, width, color)        => (coords, label, color, width)
      4 -> (label, w, h, color)         => (coords, label, color, w, h)

    Labels with no hit are collected in *not_found* but still appended with
    an empty coords list.

    Returns (locations, not_found).
    """
    locations = []
    not_found = []

    if len(flattened_list[0]) == 2:
        for lbl, clr in flattened_list:
            location,worz, txt_pt = find_text_in_plan(lbl, plan_texts)
            if len(location) ==0:
                not_found.append(lbl)
            locations.append((location, lbl, clr))

    if len(flattened_list[0]) == 3:
        for lbl, w, clr in flattened_list:
            location,worz, txt_pt = find_text_in_plan(lbl, plan_texts)
            if len(location) ==0:
                not_found.append(lbl)
            locations.append((location, lbl, clr, w))
    if len(flattened_list[0]) == 4:
        for lbl, w, h, clr in flattened_list:
            location,worz, txt_pt = find_text_in_plan(lbl, plan_texts)
            if len(location) ==0:
                not_found.append(lbl)
            locations.append((location, lbl, clr, w, h))
    return locations, not_found
|
|
|
|
|
|
|
|
def get_repeated_labels(locations):
    """Return the set of labels (each record's item[1]) that occur more than once."""
    counts = Counter(record[1] for record in locations)
    return {label for label, occurrences in counts.items() if occurrences > 1}
|
|
|
|
|
def get_cleaned_data(locations):
    """De-duplicate multi-hit coordinates per label.

    Each record is (coords, label, *extras) with 3, 4 or 5 items. When a
    label was found at several coordinates, successive records carrying that
    label are assigned the hits round-robin so each duplicate door gets its
    own point. Records with exactly one hit pass through; records with no
    hits are dropped (matching the original behavior).
    """
    uses_per_label = defaultdict(int)
    cleaned = []

    # The original handled only record arities 3-5; preserve that.
    if len(locations[0]) in (3, 4, 5):
        for record in locations:
            coords, label = record[0], record[1]
            extras = record[2:]
            if len(coords) > 1:
                pick = uses_per_label[label] % len(coords)
                cleaned.append(([coords[pick]], label) + extras)
                uses_per_label[label] += 1
            elif len(coords) == 1:
                cleaned.append((coords, label) + extras)

    return cleaned
|
|
|
|
|
|
|
|
|
|
|
def get_width_info_tobeprinted(new_data):
    """Build the printable size caption for each cleaned record.

    Record arity decides the output:
      3 items (no dimensions)  -> "N/A mm wide x N/A mm high"
      4 items (width only)     -> the raw width string, passed through
      5 items (width + height) -> "<w> mm wide x <h> mm high", with commas
                                  stripped and integral floats shown as ints
                                  (non-numeric values like 'N/A' untouched).
    """
    width_info_tobeprinted = []
    if len(new_data[0]) < 4:
        # The trailing comma makes this a 3-target unpack.
        for _,_,_, in new_data:
            width_info_tobeprinted.append("N/A mm wide x N/A mm high")
    if len(new_data[0]) == 4:
        for _,_,_, w in new_data:

            width_info_tobeprinted.append(w)
    if len(new_data[0]) == 5:
        for _,_,_, w,h in new_data:
            # Strip thousands separators before numeric parsing.
            w = re.sub(r",", "", w)
            h = re.sub(r",", "", h)

            # Show '900.0' as '900'; leave non-numeric text as-is.
            if is_not_number(w):
                w = w
            else:
                if float(w).is_integer():
                    w = int(float(w))
                else:
                    w = w

            if is_not_number(h):
                h = h
            else:
                if float(h).is_integer():
                    h = int(float(h))
                else:
                    h = h
            width_info_tobeprinted.append(f"{w} mm wide x {h} mm high")
    return width_info_tobeprinted
|
|
|
|
|
def clean_dimensions(text):
    """Strip 'mm' unit suffixes (with any adjacent commas/whitespace) and
    thousands separators from a dimension string, e.g. '2,100 mm' -> '2100'."""
    without_units = re.sub(r'[,\s]*mm', '', text)
    return without_units.replace(",", "")
|
|
|
|
|
def get_cleaned_width(width_info_tobeprinted):
    """Apply clean_dimensions to every width caption string."""
    return [clean_dimensions(entry) for entry in width_info_tobeprinted]
|
|
|
|
|
|
|
|
def get_widths_bb_format(cleaned_width, kelma):
    """Convert 'WxH' dimension strings into '<w> mm wide x <h> mm high' text.

    If *kelma* mentions a 'W x H'-style header the first number is treated
    as the width; otherwise the order is assumed to be 'H x W' and the two
    numbers are swapped in the caption.
    """
    header_is_w_x_h = re.search(r"\bW(?:idth)?\s*[×x]\s*H(?:eight)?\b", kelma)

    widths = []
    for raw in cleaned_width:
        # Position of the separator; accepts 'x', 'X' or '×'.
        sep_index = max(raw.find(ch) for ch in "x×X")
        first = int(float(raw[:sep_index]))
        second = int(float(raw[sep_index + 1:]))

        if header_is_w_x_h:
            widths.append(f"{first} mm wide x {second} mm high")
        else:
            widths.append(f"{second} mm wide x {first} mm high")
    return widths
|
|
|
|
|
|
|
|
|
|
|
def is_not_number(s: str) -> bool:
    """Return True when *s* cannot be parsed as a float (e.g. 'N/A')."""
    try:
        float(s)
    except ValueError:
        return True
    return False
|
|
|
|
|
|
|
|
def get_width_info_tobeprinted_secondary(new_data, main_info, secondary_info):
    """Build size captions AND secondary (acoustic/fire) info per record.

    The (len(main_info), len(secondary_info)) pair decides the record layout:
      (2,1): (coords, label, acous, color)            -> N/A size caption
      (2,2): (coords, label, acous, fire, color)      -> N/A size caption
      (3,1): (coords, label, width, acous, color)     -> raw width passthrough
      (3,2): (coords, label, width, acous, fire, color)
      (4,1): (coords, label, width, height, acous, color)     -> formatted
      (4,2): (coords, label, width, height, acous, fire, color) -> formatted

    Formatting strips thousands separators and shows integral floats as
    ints; non-numeric values ('N/A') pass through untouched. With two
    secondary fields, secondary info is an (acous, fire) tuple.

    Returns (width_info_tobeprinted, secondary_info_tobeprinted).
    """
    width_info_tobeprinted = []
    secondary_info_tobeprinted = []

    if len(main_info) == 2 and len(secondary_info) == 1:
        for coords, label, acous, color in new_data:
            secondary_info_tobeprinted.append(acous)
            width_info_tobeprinted.append("N/A mm wide x N/A mm high")

    if len(main_info) == 2 and len(secondary_info) == 2:
        for coords, label, acous, fire, color in new_data:
            secondary_info_tobeprinted.append((acous, fire))
            width_info_tobeprinted.append("N/A mm wide x N/A mm high")

    if len(main_info) == 3 and len(secondary_info) == 1:
        for coords, label, width, acous, color in new_data:
            width_info_tobeprinted.append(width)
            secondary_info_tobeprinted.append(acous)

    if len(main_info) == 3 and len(secondary_info) == 2:
        for coords, label, width, acous, fire, color in new_data:
            width_info_tobeprinted.append(width)
            secondary_info_tobeprinted.append((acous, fire))

    if len(main_info) == 4 and len(secondary_info) == 1:
        for coords, label, width, height, acous, color in new_data:
            # Strip thousands separators before numeric parsing.
            w = re.sub(r",", "", width)
            h = re.sub(r",", "", height)

            # Show '900.0' as '900'; leave non-numeric text as-is.
            if is_not_number(w):
                w = w
            else:
                if float(w).is_integer():
                    w = int(float(w))
                else:
                    w = w

            if is_not_number(h):
                h = h
            else:
                if float(h).is_integer():
                    h = int(float(h))
                else:
                    h = h
            width_info_tobeprinted.append(f"{w} mm wide x {h} mm high")
            secondary_info_tobeprinted.append((acous))

    if len(main_info) == 4 and len(secondary_info) == 2:
        for coords, label, width, height, acous, fire, color in new_data:
            # NOTE(review): debug prints — leftovers.
            print(type(width))
            print(type(height))
            w = re.sub(r",", "", width)
            h = re.sub(r",", "", height)

            if is_not_number(w):
                w = w
            else:
                if float(w).is_integer():
                    w = int(float(w))
                else:
                    w = w

            if is_not_number(h):
                h = h
            else:
                if float(h).is_integer():
                    h = int(float(h))
                else:
                    h = h
            width_info_tobeprinted.append(f"{w} mm wide x {h} mm high")
            secondary_info_tobeprinted.append((acous, fire))
    return width_info_tobeprinted, secondary_info_tobeprinted
|
|
|
|
|
def get_word_locations_plan_secondary(flattened_list, plan_texts, main_info, secondary_info):
    """Locate each flattened row's label in the plan, carrying the secondary
    (acoustic/fire) fields through.

    The (len(main_info), len(secondary_info)) pair decides the row layout
    and the record shape appended to *locations*; labels with no hit are
    collected in *not_found* but still appended with empty coords.

    Returns (locations, not_found).
    """
    locations = []
    not_found = []
    len_main = len(main_info)
    len_secondary = len(secondary_info)

    if len_main == 2 and len_secondary == 2:
        for lbl, clr, acoustic, fire in flattened_list:
            location,worz, txt_pt = find_text_in_plan(lbl, plan_texts)
            if len(location) ==0:
                not_found.append(lbl)
            locations.append((location, lbl, clr, acoustic, fire))

    if len_main == 2 and len_secondary == 1:
        for lbl, clr, acoustic in flattened_list:
            location,worz, txt_pt = find_text_in_plan(lbl, plan_texts)
            if len(location) ==0:
                not_found.append(lbl)
            locations.append((location, lbl, clr, acoustic))

    if len_main == 3 and len_secondary == 2:
        for lbl, w, clr, acoustic, fire in flattened_list:
            location,worz, txt_pt = find_text_in_plan(lbl, plan_texts)
            if len(location) ==0:
                not_found.append(lbl)
            locations.append((location, lbl, w, clr, acoustic, fire))

    if len_main == 3 and len_secondary == 1:
        for lbl, w, clr, acoustic in flattened_list:
            location,worz, txt_pt = find_text_in_plan(lbl, plan_texts)
            if len(location) ==0:
                not_found.append(lbl)
            locations.append((location, lbl, w, clr, acoustic))

    if len_main == 4 and len_secondary == 2:
        for lbl, w, h, clr, acoustic, fire in flattened_list:
            location,worz, txt_pt = find_text_in_plan(lbl, plan_texts)
            if len(location) ==0:
                not_found.append(lbl)
            locations.append((location, lbl, w, h, clr, acoustic, fire))

    if len_main == 4 and len_secondary == 1:
        for lbl, w, h, clr, acoustic in flattened_list:
            location,worz, txt_pt = find_text_in_plan(lbl, plan_texts)
            if len(location) ==0:
                not_found.append(lbl)
            locations.append((location, lbl, w, h, clr,acoustic))
    return locations, not_found
|
|
|
|
|
|
|
|
def get_similar_colors_all(selected_columns_new):
    """Group the selected table by door_type and assign one random RGB color
    per type.

    Returns a plain dict: {door_type: {'values': [door ids...],
    '<each column>': [per-row values...], 'color_annot': (r, g, b)}}.

    NOTE: colors come from random.randint, so output differs between runs
    unless the caller seeds the RNG.
    """
    def generate_rgb():
        return (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))

    unique_keys = selected_columns_new['door_type'].unique()
    key_colors = {key: generate_rgb() for key in unique_keys}

    clmns_fields = selected_columns_new.columns.to_list()

    def col_template():
        # Fresh accumulator per door type: 'values' collects door ids,
        # plus one list per table column.
        d = {
            'values': [],
            'color_annot': None
        }
        for field in clmns_fields:
            d[field] = []
        return d

    col_dict = defaultdict(col_template)

    for _, row in selected_columns_new.iterrows():
        key = row['door_type']
        col_dict[key]['values'].append(row['door_id'])

        for field in clmns_fields:
            col_dict[key][field].append(row.get(field, None))

        col_dict[key]['color_annot'] = key_colors[key]

    return dict(col_dict)
|
|
|
|
|
|
|
|
def get_flattened_tuples_list_all(col_dict):
    """Flatten the grouped dict into one tuple per row, colour appended last.

    For every group, each list-valued field (except the bookkeeping
    'door_type'/'values' entries) contributes one element per row; the
    group's 'color_annot' is appended as the final tuple element.
    """
    exclude_fields = ['door_type', 'values']
    flattened_list = []

    for bucket in col_dict.values():
        # data-carrying fields are the list-valued ones not excluded above
        fields = [name for name, value in bucket.items()
                  if isinstance(value, list) and name not in exclude_fields]
        row_count = len(bucket[fields[0]]) if fields else 0

        for idx in range(row_count):
            row = tuple(bucket[field][idx] for field in fields)
            flattened_list.append(row + (bucket['color_annot'],))

    return flattened_list
|
|
|
|
|
def get_flattened_tuples_list_no_doortype(selected_columns):
    """Flatten frame rows into tuples, tagging each with a fixed blue colour.

    Used when no door_type column exists, so every row shares the same
    annotation colour (0, 0, 255).
    """
    default_color = (0, 0, 255)
    rows = selected_columns.itertuples(name=None, index=False)
    return [row + (default_color,) for row in rows]
|
|
|
|
|
|
|
|
def get_cleaned_data_secondary(locations, main_info, secondary_info):
    """Resolve duplicate label hits so each occurrence keeps one coordinate.

    The original implementation repeated the same logic in six branches that
    differed only in tuple arity (one branch per (len(main_info),
    len(secondary_info)) combination); this version handles all arities with
    star-unpacking while preserving the original behaviour:

    - labels found at several coordinates are assigned round-robin, one
      coordinate per occurrence (``processed`` counts occurrences per label);
    - labels found at exactly one coordinate pass through unchanged;
    - labels with no coordinates are dropped;
    - any (main, secondary) length combination outside the supported set
      returns an empty list, exactly as before.

    Parameters
    ----------
    locations : list of tuples ``(coords, label, *extras)`` where ``coords``
        is a list of coordinate points and ``extras`` carries width/height/
        colour/rating fields of varying arity.
    main_info, secondary_info : lists whose lengths select the supported
        tuple shapes.

    Returns
    -------
    list of tuples with the same shape as the inputs, but with ``coords``
    reduced to a single-point list.
    """
    # The six shapes the original branch ladder accepted.
    valid_shapes = {(2, 1), (2, 2), (3, 1), (3, 2), (4, 1), (4, 2)}
    if (len(main_info), len(secondary_info)) not in valid_shapes:
        return []

    processed = defaultdict(int)  # occurrences seen so far, per label
    new_data = []

    for coords, label, *extras in locations:
        if len(coords) > 1:
            # round-robin: the n-th occurrence of a label takes the n-th hit
            index = processed[label] % len(coords)
            new_data.append(([coords[index]], label, *extras))
            processed[label] += 1
        elif len(coords) == 1:
            new_data.append((coords, label, *extras))
        # len(coords) == 0: label was never located on the plan -> dropped

    return new_data
|
|
|
|
|
|
|
|
def merge_pdf_bytes_list(pdfs):
    """Concatenate a list of in-memory PDFs (bytes) into a single PDF byte string.

    Pages are appended in list order using PyPDF2's reader/writer pair.
    """
    writer = PdfWriter()

    for raw in pdfs:
        reader = PdfReader(io.BytesIO(raw))
        for page in reader.pages:
            writer.add_page(page)

    merged = io.BytesIO()
    writer.write(merged)
    return merged.getvalue()
|
|
|
|
|
|
|
|
def calculate_bounding_rect_count(vertices, padding):
    """Return [xmin, ymin, xmax, ymax]: a square box of half-side *padding*
    centred on the first vertex."""
    cx, cy = vertices[0]
    return [cx - padding, cy - padding, cx + padding, cy + padding]
|
|
|
|
|
def rgb_string_to_hex(rgb_string):
    """Convert an 'r g b' string (channels in 0..1) to a '#RRGGBB' hex colour."""
    channels = [int(float(part) * 255) for part in rgb_string.strip().split()]
    return '#{:02X}{:02X}{:02X}'.format(*channels)
|
|
|
|
|
|
|
|
def generate_annotation_xml_block_count(vertices, area_text, author, custom_data: dict, column_order: list, index: int,
                                        label: str = '', height: str = '', width: str = '',
                                        color: str = '', countstyle: str = '', countsize: str = ''):
    """Build one Bluebeam 'Count Measurement' annotation as an XML Element.

    The element's <Raw> child carries a zlib-compressed, hex-encoded PDF
    annotation dictionary (the text block assembled below); the remaining
    children mirror the metadata Bluebeam shows in its markup list.

    Parameters:
        vertices: list of (x, y) points; the first point anchors the count marker.
        area_text: the count text ('1' at the visible call sites); also used as
            /NumCounts and /Contents in the raw dictionary.
        author: markup author name (/T).
        custom_data: custom-column name -> value mapping; serialised both as
            /BSIColumnData (ordered by column_order) and as <Custom> children.
        column_order: ordering of custom columns for /BSIColumnData.
        index: 0-based annotation index on its page (<Index>).
        label/height/width/color/countstyle/countsize: strings spliced verbatim
            into the raw PDF dictionary; color is an 'r g b' string with
            channels in 0..1.

    Returns:
        xml.etree.ElementTree.Element ('Annotation') with an empty <Page>
        child the caller is expected to fill in.
    """
    # NOTE(review): utcnow() is deprecated in Python 3.12+; kept as-is here.
    now = datetime.datetime.utcnow()
    mod_date = now.strftime("D:%Y%m%d%H%M%S+00'00'")   # PDF date format
    creation_date = now.isoformat() + 'Z'               # ISO-8601 for the XML side
    id_str = "fitz-" + uuid.uuid4().hex[:4].upper()     # short unique markup id

    # Flattened vertex list 'x1 y1 x2 y2 ...' with 4-decimal precision.
    vert_str = ' '.join([f'{x:.4f}' for point in vertices for x in point])
    # Each custom column value wrapped in parentheses, concatenated in order.
    ordered_column_values = [f'({custom_data.get(col, "")})' for col in column_order]
    bsi_column_data = ''.join(ordered_column_values)

    type_internal= 'Bluebeam.PDF.Annotations.AnnotationMeasureCount'
    subject ='Count Measurement'
    padding=10
    # Square /Rect around the first vertex so the marker has a clickable area.
    rectvertices=calculate_bounding_rect_count(vertices,padding)
    # Measurement dictionary: 1 mm = 1 mm scale, per-axis number formats.
    bbmeasure = '''<</Type/Measure
/Subtype/RL
/R(1 mm = 1 mm)
/X[<</Type/NumberFormat/U(mm)/C 0.3527778/D 100/SS()>>]
/D[<</Type/NumberFormat/U(mm)/C 1/D 100/SS()>>]
/A[<</Type/NumberFormat/U(sq mm)/C 1/D 100/FD true/SS()>>]
/T[<</Type/NumberFormat/U(\\260)/C 1/D 100/FD true/PS()/SS()>>]
/V[<</Type/NumberFormat/U(cu mm)/C 1/D 100/FD true/SS()>>]
/TargetUnitConversion 0.3527778>>'''

    # Raw PDF annotation dictionary, later compressed into <Raw>.
    raw_text = f'''<<
/Version 1
/DS(font: Helvetica 12pt; text-align:center; line-height:13.8pt; color:#FF0000)
/CountStyle{countstyle}
/CountScale {countsize}
/MeasurementTypes 128
/BBMeasure{bbmeasure}
/NumCounts {area_text}
/AP<</N/BBObjPtr_{uuid.uuid4().hex.upper()}>>
/IT/PolygonCount
/Vertices[{vert_str}]
/IC[{color}]
/T({author})
/CreationDate({mod_date})
/BSIColumnData[{bsi_column_data}]
/RC(<?xml version="1.0"?><body xmlns:xfa="http://www.xfa.org/schema/xfa-data/1.0/" xfa:contentType="text/html" xfa:APIVersion="BluebeamPDFRevu:2018" xfa:spec="2.2.0" style="font:Helvetica 12pt; text-align:center; line-height:13.8pt; color:#FF0000" xmlns="http://www.w3.org/1999/xhtml"><p>{area_text}</p></body>)
/Label({label})
/Height {height}
/Width {width}
/Subj({subject})
/NM({id_str})
/Subtype/Polygon
/Rect[{rectvertices[0]} {rectvertices[1]} {rectvertices[2]} {rectvertices[3]}]
/Contents({area_text})
/F 4
/C[{color}]
/BS<</Type/Border/W 0/S/S>>
/M({mod_date})
>>'''.encode('utf-8')

    compressed = zlib.compress(raw_text)
    # Despite the variable name, this is base-16 (hex) encoding, lowercased.
    base64_raw = base64.b16encode(compressed).lower().decode()

    annotation = Element('Annotation')
    SubElement(annotation, 'Page')  # left empty; caller sets the page number
    SubElement(annotation, 'Contents').text = area_text
    SubElement(annotation, 'ModDate').text = creation_date
    SubElement(annotation, 'Color').text = rgb_string_to_hex(color)
    SubElement(annotation, 'Type').text = 'Polygon'
    SubElement(annotation, 'ID').text = id_str
    SubElement(annotation, 'TypeInternal').text = type_internal
    SubElement(annotation, 'Raw').text = base64_raw
    SubElement(annotation, 'Index').text = str(index)

    # One <Custom> child element per custom column.
    custom = SubElement(annotation, 'Custom')
    for key, value in custom_data.items():
        SubElement(custom, key).text = value

    SubElement(annotation, 'Subject').text = subject
    SubElement(annotation, 'CreationDate').text = creation_date
    SubElement(annotation, 'Author').text = author
    SubElement(annotation, 'Label').text = label
    SubElement(annotation, 'Height').text = height
    SubElement(annotation, 'Width').text = width

    return annotation
|
|
|
|
|
|
|
|
def save_multiple_annotations_count_bax(annotations, output_path, column_order, pdfWidth, pdfHeight, num_pages):
    """Assemble a Bluebeam .bax <Document> XML string from annotation dicts.

    annotations: list of dicts, each with:
        - vertices: [x, y]
        - text: str (label)
        - author: ADR
        - custom_data: dict of custom field values
        - page: 1-based page number (defaults to 1 when absent)
        - label / height / width / color / countstyle / countsize: optional
          strings forwarded to generate_annotation_xml_block_count
    output_path: only echoed in the final print — nothing is written to disk
        by this function; the caller receives the XML string instead.
    column_order: custom-column ordering for /BSIColumnData.
    pdfWidth / pdfHeight: written verbatim into every <Page>.
        NOTE(review): the visible caller (mainRun) passes LISTS here, so each
        page's <Width>/<Height> becomes the stringified whole list rather than
        that page's dimension — looks like these should be indexed per page;
        confirm intended.
    num_pages: number of <Page> elements to emit (0-based Index attribute).
    """
    doc = Element('Document', Version='1')

    # Bucket annotations by their 1-based page number.
    annotations_by_page = defaultdict(list)
    for ann in annotations:
        page_num = ann.get('page', 1)
        annotations_by_page[page_num].append(ann)

    for page_index in range(num_pages):
        page = SubElement(doc, 'Page', Index=str(page_index))
        SubElement(page, 'Label').text = str(page_index + 1)
        SubElement(page, 'Width').text = str(pdfWidth)
        SubElement(page, 'Height').text = str(pdfHeight)

        for i, ann in enumerate(annotations_by_page.get(page_index + 1, [])):
            # NOTE(review): '123' height/width defaults are used whenever the
            # annotation dict carries capitalised 'Height'/'Width' keys
            # (as create_bb_bax_secondary builds them) — confirm intended.
            annotation_xml = generate_annotation_xml_block_count(
                vertices=ann['vertices'],
                area_text=ann['text'],
                author=ann['author'],
                custom_data=ann['custom_data'],
                column_order=column_order,
                index=i,
                label=ann.get('label', 'label1'),
                height=ann.get('height', '123'),
                width=ann.get('width', '123'),
                color=ann.get('color', ''),
                countstyle=ann.get('countstyle', ''),
                countsize=ann.get('countsize','')
            )
            # Fill in the empty <Page> child left by the generator (1-based).
            annotation_xml.find('Page').text = str(page_index+1)
            page.append(annotation_xml)

    pretty_xml= tostring(doc, encoding="unicode", method="xml")
    print(f"Saved {len(annotations)} annotations to {output_path}")
    return pretty_xml
|
|
|
|
|
|
|
|
|
|
|
# Map of user-facing marker shape names to the Bluebeam /CountStyle PDF name
# tokens spliced into the raw annotation dictionary by
# generate_annotation_xml_block_count.
CountStyles = {
    'Circle': '/Circle',
    'Diamond':'/Diamond',
    'Triangle':'/Triangle',
    'Square':'/Square',
    'Checkmark':'/Checkmark',
}
|
|
|
|
|
def convert_to_bytes(input_pdf_path):
    """Return the raw bytes of the file at *input_pdf_path*."""
    with open(input_pdf_path, "rb") as handle:
        return handle.read()
|
|
|
|
|
def mirrored_points(x, y, height_plan):
    """Flip a single point vertically within a plan of height *height_plan*.

    Returns a one-element list of [x, mirrored_y] pairs.
    """
    return [[x, height_plan - y]]
|
|
def point_mupdf_to_pdf(x, y, page):
    """Translate PyMuPDF coordinates (top-left origin) into PDF user space
    (bottom-left origin) using the page's mediabox.

    Returns a one-element list of [pdf_x, pdf_y] pairs.
    """
    box = page.mediabox
    flipped_y = float(box.height) - y
    return [[box.x0 + x, box.y0 + flipped_y]]
|
|
|
|
|
def create_bb_bax_secondary(new_data, widthat, heightat, secondary_tobeprinted, CountStyles, input_user_clmn_names, page_number, height_plan):
    """Build Bluebeam count-annotation dicts for rows carrying secondary info.

    The three original branches built byte-identical dicts except for how
    FireRating/AcousticRating were sourced; they are collapsed into one
    append with the ratings resolved up front.

    Fix: the annotation dicts now use lowercase 'height'/'width' keys (as
    create_bb_bax already does) so save_multiple_annotations_count_bax,
    which reads ann.get('height')/ann.get('width'), no longer falls back to
    its '123' defaults — the capitalised 'Height'/'Width' keys written before
    were never read downstream.

    Parameters
    ----------
    new_data : tuples of (coords, label, ..., (r, g, b)) — coords[0][0] is the
        anchor point, the last element the annotation colour.
    widthat, heightat : per-row width/height strings.
    secondary_tobeprinted : per-row (fire, acoustic) pairs when both flags are
        set, otherwise per-row single values.
    CountStyles : style-name -> Bluebeam token mapping.
    input_user_clmn_names : user column selection; [4] is fire, [5] acoustic.
    page_number : 1-based page for the annotations.
    height_plan : page-like object forwarded to point_mupdf_to_pdf
        (despite the name — presumably a fitz page; TODO confirm callers).
    """
    bax_annotations = []
    has_fire = bool(input_user_clmn_names[4])
    has_acoustic = bool(input_user_clmn_names[5])

    for i, record in enumerate(new_data):
        # Last tuple element is the (r, g, b) colour; emit as '0..1 0..1 0..1'.
        r, g, b = record[-1]
        color = ' '.join(str(float(c / 255)) for c in (r, g, b))
        vertix = point_mupdf_to_pdf(record[0][0][0], record[0][0][1], height_plan)

        # Resolve the two secondary ratings from the user's column selection.
        if has_fire and has_acoustic:
            fire, acoustic = secondary_tobeprinted[i][0], secondary_tobeprinted[i][1]
        elif has_fire:
            fire, acoustic = secondary_tobeprinted[i], 'N/A'
        elif has_acoustic:
            fire, acoustic = 'N/A', secondary_tobeprinted[i]
        else:
            # neither flag set: original code appended nothing for this row
            continue

        bax_annotations.append({
            'vertices': vertix,
            'text': '1',
            'author': 'ADR',
            'custom_data': {'FireRating': fire, 'AcousticRating': acoustic,
                            'Height_': heightat[i], 'Width_': widthat[i]},
            'label': record[1],
            'height': heightat[i],
            'width': widthat[i],
            'page': page_number,
            'color': color,
            'countstyle': CountStyles['Circle'],
            'countsize': '0.8'
        })

    return bax_annotations
|
|
|
|
|
|
|
|
def create_bb_bax(new_data, widthat, heightat, CountStyles, page_number, height_plan):
    """Build Bluebeam count-annotation dicts when no secondary ratings exist.

    Each new_data tuple is (coords, label, (r, g, b)); coords[0][0] anchors
    the marker and the colour is emitted as a '0..1 0..1 0..1' string.
    """
    annotations = []

    for i, record in enumerate(new_data):
        red, green, blue = record[2]
        color_str = ' '.join(str(float(channel / 255)) for channel in (red, green, blue))
        point = point_mupdf_to_pdf(record[0][0][0], record[0][0][1], height_plan)

        annotations.append({
            'vertices': point,
            'text': '1',
            'author': 'ADR',
            'custom_data': {'FireRating': 'N/A', 'AcousticRating': 'N/A',
                            'Height_': heightat[i], 'Width_': widthat[i]},
            'label': record[1],
            'height': heightat[i],
            'width': widthat[i],
            'page': page_number,
            'color': color_str,
            'countstyle': CountStyles['Circle'],
            'countsize': '0.8'
        })

    return annotations
|
|
|
|
|
|
|
|
def add_location(col_dict, plan_texts):
    """Attach plan coordinates to every door id of every group.

    Each group gains a 'location' list parallel to its 'door_id' list; ids
    that could not be located anywhere are collected into not_found.
    Returns (col_dict, not_found).
    """
    not_found = []
    for group in col_dict.values():
        found_here = []
        for door_label in group['door_id']:
            coords, _, _ = find_text_in_plan(door_label, plan_texts)
            if not coords:
                not_found.append(door_label)
            found_here.append(coords)
        group['location'] = found_here
    return col_dict, not_found
|
|
|
|
|
import pandas as pd |
|
|
|
|
|
def _ensure_color_tuple(x): |
|
|
if x is None or isinstance(x, tuple): |
|
|
return x |
|
|
try: |
|
|
return tuple(x) |
|
|
except Exception: |
|
|
return x |
|
|
|
|
|
def _ensure_list_of_tuples(val): |
|
|
if val is None: |
|
|
return [] |
|
|
if isinstance(val, tuple): |
|
|
return [val] |
|
|
if isinstance(val, list): |
|
|
out = [] |
|
|
for item in val: |
|
|
if item is None: |
|
|
continue |
|
|
if isinstance(item, tuple): |
|
|
out.append(item) |
|
|
elif isinstance(item, list): |
|
|
out.append(tuple(item)) |
|
|
else: |
|
|
try: |
|
|
out.append(tuple(item)) |
|
|
except Exception: |
|
|
pass |
|
|
return out |
|
|
try: |
|
|
return [tuple(val)] |
|
|
except Exception: |
|
|
return [] |
|
|
|
|
|
def grouped_to_dataframe_dynamic(grouped, keep_group=False,
                                 explode_locations=False,
                                 drop_empty_locations=False):
    """Flatten the grouped dict-of-lists structure into a pandas DataFrame.

    grouped maps a group key (door_type) to a block dict whose list-valued
    entries are parallel per-row columns ('door_id', 'location', ...) and
    whose scalar entries (e.g. 'color_annot') are repeated onto every row.

    keep_group: add a 'source_group' column carrying the group key.
    drop_empty_locations: remove rows whose 'location' list is empty.
    explode_locations: expand each row into one row per location tuple.

    Returns the flattened DataFrame (possibly empty).
    """
    rows = []

    for group_key, block in grouped.items():
        # Prefer the explicit door_id list, then the legacy 'values' slot.
        ids = block.get('door_id') or block.get('values') or []
        list_lengths = [len(v) for v in block.values() if isinstance(v, list)]
        # Row count is the longest parallel list in the block.
        n = max(list_lengths + [len(ids)]) if (list_lengths or ids) else 0
        if n == 0:
            continue

        for i in range(n):
            row = {}
            # Synthesise an id for rows beyond the ids list (ragged input).
            door_id = ids[i] if i < len(ids) else f"{group_key}:{i}"
            row['door_id'] = door_id

            for k, v in block.items():
                if k == 'values':
                    continue
                # List entries contribute their i-th element (None past the
                # end); scalar entries are broadcast to every row.
                val = (v[i] if isinstance(v, list) and i < len(v)
                       else (v if not isinstance(v, list) else None))
                if k == 'color':
                    val = _ensure_color_tuple(val)
                elif k == 'location':
                    val = _ensure_list_of_tuples(val)
                row[k] = val

            if keep_group:
                row['source_group'] = group_key
            rows.append(row)

    df = pd.DataFrame(rows)

    # Re-normalise locations defensively after the DataFrame round-trip.
    if 'location' in df.columns:
        df['location'] = df['location'].apply(_ensure_list_of_tuples)

    if drop_empty_locations:
        df = df[df['location'].map(lambda xs: len(xs) > 0)].reset_index(drop=True)

    if explode_locations:
        # One output row per location tuple; rows keep their other columns.
        df = df.explode('location', ignore_index=True)

    return df
|
|
|
|
|
|
|
|
def get_width_clean_width_height(width_list, height_list):
    """Normalise width and height strings for annotation output.

    For each entry: strip thousands separators (',') and, when the value is
    numeric (per the module's is_not_number helper) and a whole number,
    render it without a trailing '.0'. Non-numeric entries pass through
    verbatim. The original duplicated this logic in two identical loops;
    it is factored into one helper here.

    Returns (widths, heights) as lists of strings, parallel to the inputs.
    """
    def _clean(raw):
        # "1,200" -> "1200"; commas are the only separator the data uses here
        value = raw.replace(",", "")
        # numeric whole values lose the fractional part: "200.0" -> "200"
        if not is_not_number(value) and float(value).is_integer():
            value = int(float(value))
        return str(value)

    widths = [_clean(w) for w in width_list]
    heights = [_clean(h) for h in height_list]
    return widths, heights
|
|
|
|
|
def get_widths_bb_format_st_op(cleaned_width, kelma):
    """Split 'WxH' structural-opening strings into integer widths and heights.

    When *kelma* matches a 'W x H' (Width x Height) header pattern the first
    number is the width; otherwise the two numbers are assumed swapped.
    Returns (widths, heights) as parallel lists of ints.
    """
    pattern = r"\bW(?:idth)?\s*[×x]\s*H(?:eight)?\b"
    header_match = re.search(pattern, kelma)

    widths = []
    heights = []
    for entry in cleaned_width:
        # locate the separator; several visually-similar 'x' variants are probed
        index = max(entry.find("x"), entry.find("×"), entry.find("x"), entry.find("X"), entry.find("x"))
        first = int(float(entry[:index]))
        second = int(float(entry[index+1:]))
        if header_match:
            widths.append(first)
            heights.append(second)
        else:
            # header says H x W (or is absent): swap
            widths.append(second)
            heights.append(first)
    return widths, heights
|
|
|
|
|
|
|
|
def create_bb_bax_new(df_points, CountStyles, page_number, height_plan):
    """Build Bluebeam count-annotation dicts from the located-points frame.

    Parameters
    ----------
    df_points : DataFrame with at least 'door_id', 'location' ((x, y) tuple)
        and 'color_annot' ((r, g, b)) columns; all other columns become the
        annotation's custom-data fields.
    CountStyles : style-name -> Bluebeam token mapping.
    page_number : 1-based page the annotations belong to.
    height_plan : page-like object forwarded to point_mupdf_to_pdf (the
        visible caller passes a fitz page despite the name — TODO confirm).

    Returns
    -------
    (annotations, custom_data) : the annotation dicts plus the custom-data
    dict of the LAST processed row (callers use its keys as the custom
    column order); {} when the frame is empty.
    """
    bax_annotations = []
    # Fix: start from an empty dict so an empty frame returns ([], {})
    # instead of raising UnboundLocalError at the final return.
    customDta = {}
    exclude = {"location", "color_annot"}

    for _, row in df_points.iterrows():
        rw = row
        # Everything except the plotting columns becomes custom data.
        customDta = row.drop(labels=exclude, errors="ignore").to_dict()
        r, g, b = rw['color_annot']
        R = str(float(r/255))
        G = str(float(g/255))
        B = str(float(b/255))
        x, y = rw['location']
        vertix = point_mupdf_to_pdf(x, y, height_plan)
        bax_annotations.append({
            'vertices': vertix,
            'text': '1',
            'author': 'ADR',
            'custom_data': customDta,
            'label': rw['door_id'],
            'page' : page_number,
            'color': R + ' ' + G + ' ' + B,
            'countstyle': CountStyles['Circle'],
            'countsize': '0.8'
        })

    return bax_annotations, customDta
|
|
|
|
|
|
|
|
|
|
|
def generate_separate_dimensions(widths):
    """Extract numeric width/height strings from '<w> mm wide x <h> mm high'
    phrases; entries that do not match yield "N/A" for both dimensions.

    Returns (widthat, heightat) as parallel lists of strings.
    """
    dim_pattern = re.compile(r'(\d+(?:\.\d+)?)\s*mm wide x\s*(\d+(?:\.\d+)?)\s*mm high')
    widthat = []
    heightat = []
    for text in widths:
        found = dim_pattern.match(text)
        if found:
            widthat.append(found.group(1))
            heightat.append(found.group(2))
        else:
            widthat.append("N/A")
            heightat.append("N/A")
    return widthat, heightat
|
|
|
|
|
def generate_bluebeam_columns_raw(column_names):
    """
    Generate BluebeamUserDefinedColumns XML as raw string, without headers or extra fields.

    Each column becomes a Text-subtype BSIColumnItem whose Index and
    DisplayOrder follow the input order.
    """
    root = Element("BluebeamUserDefinedColumns")

    for position, column_name in enumerate(column_names):
        entry = SubElement(root, "BSIColumnItem", Index=str(position), Subtype="Text")
        for tag, text in (("Name", column_name),
                          ("DisplayOrder", str(position)),
                          ("Deleted", "False"),
                          ("Multiline", "False")):
            SubElement(entry, tag).text = text

    return tostring(root, encoding="unicode", method="xml")
|
|
|
|
|
|
|
|
def pick_approach(schedule, plan, searcharray, flag):
    """Dry-run one table-extraction strategy over every plan/schedule pair.

    flag selects the extractor: 1 -> extract_tables (normal), 2 ->
    extract_tables_model (model-based). For every plan x schedule x user
    input combination the function extracts the selected columns, locates
    the labels on the plan and records which labels were not found.

    Returns (no_tables, not_found_any_plan):
        no_tables        — True when at least one input's columns could not
                           be extracted by either path;
        not_found_any_plan — labels missing from EVERY plan (excluding "N/A").
    """
    not_found_list = []
    missings = []
    no_tables = False
    for p in plan:
        for k in range(len(schedule)):
            # choose the extraction strategy under evaluation
            if flag == 1:
                dfs = extract_tables(schedule[k])
            if flag == 2:
                dfs = extract_tables_model(schedule[k])
            user_input_this_schedule = searcharray[k]
            for j in range(len(user_input_this_schedule)):
                user_input = user_input_this_schedule[j]
                secondary_presence = False
                # positions 4/5 of the user input are fire/acoustic columns
                if user_input[4] or user_input[5]:
                    secondary_presence = True
                main_info_, secondary_info_ = separate_main_secondary(user_input)
                main_info = [item for item in main_info_ if item]
                secondary_info = [item for item in secondary_info_ if item]

                selected_columns_combined = get_selected_columns_all(dfs, user_input)
                if selected_columns_combined is None:
                    # fall back to index-based selection on the normal extraction
                    dfs_normal = extract_tables(schedule[k])
                    column_indices = get_column_indices_from_dfs_normal(dfs_normal, user_input)
                    if column_indices is None:
                        # neither path could find the requested columns
                        missing_clmns = check_missing(dfs, user_input)
                        missing_message = f"{missing_clmns} can't be extracted from table input {j+1} in schedule {k+1}"
                        missings.append(missing_message)
                        no_tables = True
                        continue
                    if len(dfs) == 1:
                        selected_columns_combined = get_selected_columns_by_index(dfs[0], column_indices, user_input)
                    if len(dfs) > 1:
                        index_df = get_df_index(dfs, user_input)
                        selected_columns_combined = get_selected_columns_by_index(dfs[index_df], column_indices, user_input)
                # normalise blanks and case-variant n/a markers to 'N/A'
                selected_columns_combined = selected_columns_combined.applymap(lambda x: 'N/A' if isinstance(x, str) and x.strip() == '' else x)
                selected_columns_combined = selected_columns_combined.fillna('N/A')
                selected_columns_combined = selected_columns_combined.replace(r'(?i)\bn/a\b', 'N/A', regex=True)
                kelma = get_st_op_pattern(selected_columns_combined, user_input)
                if "door_type" in selected_columns_combined.columns:
                    col_dict = get_similar_colors_all(selected_columns_combined)
                    flattened_list = get_flattened_tuples_list_all(col_dict)
                else:
                    if secondary_presence:
                        main_info = main_info + [""]
                    flattened_list = get_flattened_tuples_list_no_doortype(selected_columns_combined)
                plan_texts = read_text(p)

                # locate every label; only the not-found lists are kept here
                if secondary_presence:
                    locations, not_found = get_word_locations_plan_secondary(flattened_list,plan_texts, main_info, secondary_info)
                    not_found_list.append(not_found)
                else:
                    locations, not_found = get_word_locations_plan(flattened_list,plan_texts)
                    not_found_list.append(not_found)

    flattened_not_found_list = [item for sublist in not_found_list for item in sublist]
    from collections import Counter  # NOTE: redundant — Counter is imported at module level
    counts_not_found = Counter(flattened_not_found_list)
    not_found_any_plan = []
    for key, value in counts_not_found.items():
        # a label missing exactly len(plan) times was found on no plan at all
        if value == len(plan):
            not_found_any_plan.append(key)
    not_found_any_plan = [item for item in not_found_any_plan if item != "N/A"]

    return no_tables, not_found_any_plan
|
|
|
|
|
def get_df_csv(sch):
    """Load a CSV schedule, auto-detecting its text encoding with chardet.

    Only the first ~100 kB are sampled for detection; when detection fails
    the reader falls back to UTF-8.
    """
    with open(sch, "rb") as handle:
        sample = handle.read(100_000)
    detection = chardet.detect(sample)
    encoding = detection["encoding"] or "utf-8"
    return pd.read_csv(sch, encoding=encoding)
|
|
|
|
|
def mainRun(schedule, plan, searcharray, sch_csv_pdf):
    """End-to-end pipeline: extract schedule columns, locate labels on the
    plans, build Bluebeam count annotations and render the annotated pages.

    Parameters:
        schedule: list of schedule documents (PDF bytes/paths or CSV paths).
        plan: list of plan PDFs (bytes accepted by fitz.open('pdf', ...)).
        searcharray: per-schedule lists of user column selections; positions
            4 and 5 of each selection flag fire/acoustic secondary columns.
        sch_csv_pdf: True when schedules are PDFs (table extraction), False
            for CSV schedules.

    Returns:
        (annotatedimgs, doc2, list1, repeated_labels, not_found,
         pretty_xml, column_xml)
    """
    if sch_csv_pdf:
        print("shcedule type is PDF")
        # Dry-run both extraction strategies and pick whichever locates more labels.
        no_tables_normal, not_found_any_plan_normal = pick_approach(schedule, plan, searcharray, 1)
        try:
            no_tables_model, not_found_any_plan_model = pick_approach(schedule, plan, searcharray, 2)
        except:
            # NOTE(review): when this fires, no_tables_model /
            # not_found_any_plan_model stay unbound and the comparisons
            # below raise NameError — confirm intended fallback.
            print("Model detection has issue of file too large")
        pick_normal = False
        pick_model = False
        if no_tables_model:
            pick_normal = True
        elif no_tables_normal:
            pick_model = True
        elif no_tables_model and no_tables_normal:
            # NOTE(review): unreachable — both-True is consumed by the first branch.
            print("el etneen bayzeen")
        else:
            # Fewer unfound labels wins; ties go to the normal extractor.
            if len(not_found_any_plan_model) > len(not_found_any_plan_normal):
                pick_normal = True
            elif len(not_found_any_plan_model) < len(not_found_any_plan_normal):
                pick_model = True
            else:
                pick_normal = True
    else:
        print("schedule type is CSV")
        df = get_df_csv(schedule[0])
        print(df)
    print("mainRun is RUNNING")

    # Diagnostic dump of the incoming arguments.
    eltype = type(plan)
    print(f"el type beta3 variable plan:: {eltype}")
    len_plan = len(plan)
    print(f"length of the plan's array is: {len_plan}")
    p1_type = type(plan[0])
    print(f"el mawgood fe p[0]: {p1_type}")
    print(f"length of search array: {len(searcharray)}")
    print(f"type of schedule: {type(schedule)}")
    print(f"length of schedules: {len(schedule)}")

    pdf_widths = []
    pdf_heights = []
    pdfs_count_type = []

    annotation_counter = 0
    page_number = 0
    bax_annotations_all_inputs = []

    not_found_list = []
    repeated_labels_list = []
    missings = []
    for p in plan:
        annotation_counter +=1
        page_number +=1
        pdf_document = fitz.open("pdf", p)

        # Only the first page of each plan is processed.
        page = pdf_document[0]
        rect = page.rect
        width_plan = page.cropbox.width
        height_plan = page.cropbox.height

        for k in range(len(schedule)):
            # Extract tables with whichever strategy won the dry run.
            if sch_csv_pdf and pick_normal:
                dfs = extract_tables(schedule[k])
            if sch_csv_pdf and pick_model:
                dfs = extract_tables_model(schedule[k])
            if sch_csv_pdf == False:
                df = get_df_csv(schedule[k])
                dfs = [df]
            user_input_this_schedule = searcharray[k]
            for j in range(len(user_input_this_schedule)):
                user_input = user_input_this_schedule[j]

                secondary_presence = False
                if user_input[4] or user_input[5]:
                    secondary_presence = True
                    main_info_, secondary_info_ = separate_main_secondary(user_input)
                    main_info = [item for item in main_info_ if item]
                    secondary_info = [item for item in secondary_info_ if item]
                    print("feh secondary information")
                    if user_input[4]:
                        print("Fire rate mawgooda")
                    if user_input[5]:
                        print("Acoustic Rate mawgooda")
                else:
                    print("mafeesh secondary information")

                selected_columns_combined = get_selected_columns_all(dfs, user_input)
                if sch_csv_pdf:
                    # Name-based selection failed: fall back to index-based
                    # selection on a fresh normal extraction.
                    if selected_columns_combined is None:
                        dfs_normal = extract_tables(schedule[k])
                        column_indices = get_column_indices_from_dfs_normal(dfs_normal, user_input)
                        if column_indices is None:
                            missing_clmns = check_missing(dfs, user_input)
                            missing_message = f"{missing_clmns} can't be extracted from table input {j+1} in schedule {k+1}"
                            missings.append(missing_message)
                            continue
                        if len(dfs) == 1:
                            selected_columns_combined = get_selected_columns_by_index(dfs[0], column_indices, user_input)
                        if len(dfs) > 1:
                            index_df = get_df_index(dfs, user_input)
                            selected_columns_combined = get_selected_columns_by_index(dfs[index_df], column_indices, user_input)
                # Normalise blanks, NaNs and case-variant n/a markers to 'N/A'.
                selected_columns_combined = selected_columns_combined.applymap(lambda x: 'N/A' if isinstance(x, str) and x.strip() == '' else x)
                selected_columns_combined = selected_columns_combined.fillna('N/A')
                selected_columns_combined = selected_columns_combined.replace(r'(?i)\bn/a\b', 'N/A', regex=True)
                kelma = get_st_op_pattern(selected_columns_combined, user_input)
                if "door_type" in selected_columns_combined.columns:
                    col_dict = get_similar_colors_all(selected_columns_combined)
                    flattened_list = get_flattened_tuples_list_all(col_dict)
                else:
                    if secondary_presence:
                        main_info = main_info + [""]
                    flattened_list = get_flattened_tuples_list_no_doortype(selected_columns_combined)

                plan_texts = read_text(p)

                # NOTE(review): col_dict is only assigned in the door_type
                # branch above — in the else branch this either reuses a
                # previous iteration's col_dict or raises NameError; confirm.
                col_dict, not_found = add_location(col_dict, plan_texts)
                not_found_list.append(not_found)

                df_points = grouped_to_dataframe_dynamic(col_dict,
                                                         drop_empty_locations=True,
                                                         explode_locations=True)

                # Sanitise column names into identifier-friendly tokens.
                df_points.columns = (df_points.columns
                                     .str.strip()
                                     .str.replace(r"[^\w-]+", "_", regex=True)
                                     .str.replace(r"_+", "_", regex=True)
                                     .str.strip("_"))

                print(f"col_dict: {col_dict}")
                print(f"selected_columns_combined: {selected_columns_combined}")
                print(f"df: {df_points}")
                if df_points.empty:
                    continue

                if 'color_annot' not in df_points:
                    df_points['color_annot'] = (0, 0, 255)
                # Track door ids that appear more than once.
                dupes = df_points['door_id'].value_counts()
                repeated_ids = dupes[dupes > 1].index.to_list()
                repeated_labels_list.append(repeated_ids)

                if ('width' in df_points and 'height' in df_points) or 'structural_opening' in df_points:
                    if kelma:
                        # Structural-opening 'WxH' strings replace width/height.
                        lst_st_op = df_points["structural_opening"].tolist()
                        cleaned_st_op = get_cleaned_width(lst_st_op)
                        widths, heights = get_widths_bb_format_st_op(cleaned_st_op, kelma)

                        df_points = df_points.drop(columns=['structural_opening'])

                        df_points['width'] = widths
                        df_points['height'] = heights
                    else:
                        df_points['width'] = df_points['width'].astype('string')
                        df_points['height'] = df_points['height'].astype('string')

                    lst_width = df_points["width"].tolist()
                    lst_height = df_points["height"].tolist()
                    clean_widths, clean_height = get_width_clean_width_height(lst_width, lst_height)
                    df_points["width"] = clean_widths
                    df_points["height"] = clean_height
                    df_points = df_points.rename(columns={'width': 'Width_', 'height':'Height_'})

                print(f"color_annot: {df_points['color_annot']}")
                print(f"df: {df_points}")
                bax, customDta = create_bb_bax_new(df_points, CountStyles, page_number, page)
                bax_annotations_all_inputs.append(bax)

        pdfs_count_type.append(p)
        pdf_widths.append(width_plan)
        pdf_heights.append(height_plan)
    merged_pdf = merge_pdf_bytes_list(pdfs_count_type)
    print(f"number of pges of merged_pdf is {len(merged_pdf)} and its type is {type(merged_pdf)}")

    bax_annotation = []
    for bax_ann in bax_annotations_all_inputs:
        bax_annotation.extend(bax_ann)

    # NOTE(review): customDta comes from the LAST create_bb_bax_new call; if
    # every df_points was empty it is unbound here — confirm acceptable.
    column_order = []
    for key in customDta.keys():
        column_order.append(key)

    # Labels missing from every processed plan (excluding "N/A").
    flattened_not_found_list = [item for sublist in not_found_list for item in sublist]
    counts_not_found = Counter(flattened_not_found_list)
    not_found_any_plan = []
    for key, value in counts_not_found.items():
        if value == len(pdfs_count_type):
            not_found_any_plan.append(key)

    flattened_repeated_labels_list = [item for sublist in repeated_labels_list for item in sublist]
    # NOTE(review): pdf_widths/pdf_heights are lists, but the .bax writer
    # stringifies them wholesale per page — see its docstring; confirm.
    pretty_xml = save_multiple_annotations_count_bax(bax_annotation, 'count_type_Windows.bax', column_order,pdf_widths,pdf_heights,page_number)
    column_xml = generate_bluebeam_columns_raw(column_order)

    repeated_labels = flattened_repeated_labels_list

    not_found = [item for item in not_found_any_plan if item != "N/A"]
    annotatedimgs=[]
    doc2 =fitz.open('pdf',merged_pdf)
    len_doc2 = len(doc2)
    list1=pd.DataFrame(columns=['content', 'id', 'subject','color'])
    print(f"number of pges of doc2 is {len_doc2} and its type is {type(doc2)}")
    for page in doc2:
        print("now inside page in doc2")

        # Render each merged page to an OpenCV BGR image.
        pix = page.get_pixmap()
        pl=Image.frombytes('RGB', [pix.width,pix.height],pix.samples)
        img=np.array(pl)
        annotatedimg = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        annotatedimgs.append(annotatedimg)

        annotations_page = page.annots()
        print(f"annotations: {annotations_page}")
        '''
        for annot in page.annots():
            # Get the color of the annotation
            print("ann: {annot}")
            annot_color = annot.colors
            if annot_color is not None:
                # annot_color is a dictionary with 'stroke' and 'fill' keys
                print(annot_color)
                stroke_color = annot_color.get('stroke')  # Border color
                fill_color = annot_color.get('fill')  # Fill color
                if fill_color:
                    v='fill'
                    # print('fill')
                if stroke_color:
                    v='stroke'
                x,y,z=int(annot_color.get(v)[0]*255),int(annot_color.get(v)[1]*255),int(annot_color.get(v)[2]*255)
                print(f"x: {x}")
                print(f"y: {y}")
                print(f"z: {z}")
                list1.loc[len(list1)] =[annot.info['content'],annot.info['id'],annot.info['subject'],[x,y,z]]
                print(f"list1 : {list1}")
        '''
    return annotatedimgs, doc2 , list1, repeated_labels , not_found, pretty_xml, column_xml
|
|
|
|
|
|