Spaces:
Paused
Paused
| from collections import defaultdict | |
| import pandas as pd | |
| import random | |
| import re | |
| import io | |
| import pypdfium2 as pdfium | |
| import fitz | |
| from PIL import Image, ImageDraw | |
| from PyPDF2 import PdfReader, PdfWriter | |
| from PyPDF2.generic import TextStringObject, NameObject, ArrayObject, FloatObject | |
| from PyPDF2.generic import NameObject, TextStringObject, DictionaryObject, FloatObject, ArrayObject | |
| from PyPDF2 import PdfReader | |
| from PyPDF2.generic import TextStringObject | |
| import numpy as np | |
| import cv2 | |
def convert2img(path):
    """Render page 0 of a PDF and return it as a BGR NumPy array.

    ``path`` is passed unchanged to ``pdfium.PdfDocument``.
    """
    document = pdfium.PdfDocument(path)
    first_page = document.get_page(0)
    rgb_pixels = np.array(first_page.render().to_pil())
    # Reorder the channels from RGB to BGR.
    return cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)
def convert2pillow(path):
    """Render page 0 of a PDF and return it as a PIL image.

    ``path`` is passed unchanged to ``pdfium.PdfDocument``.
    """
    document = pdfium.PdfDocument(path)
    first_page = document.get_page(0)
    return first_page.render().to_pil()
def calculate_midpoint(x1, y1, x2, y2):
    """Return the midpoint of (x1, y1)-(x2, y2) as a tuple of ints.

    Each coordinate is averaged and then passed through ``int()``, so the
    fractional part is truncated toward zero.
    """
    mid_x = (x1 + x2) / 2
    mid_y = (y1 + y2) / 2
    return int(mid_x), int(mid_y)
def read_text(input_pdf_path):
    """Return the "words" tuples PyMuPDF extracts from a PDF.

    ``input_pdf_path`` is passed as the second positional argument of
    ``fitz.open`` with ``'pdf'`` first -- presumably the raw PDF bytes with
    ``'pdf'`` acting as the file-type hint; confirm against the caller.

    Every page is visited, but each page overwrites the previous result, so
    the words of the last page are returned. An empty list is returned for a
    document without pages (previously this raised ``UnboundLocalError``).
    """
    pdf_document = fitz.open('pdf', input_pdf_path)
    text_instances = []
    for page_num in range(pdf_document.page_count):
        page = pdf_document[page_num]
        text_instances = page.get_text("words")
        page.apply_redactions()
    return text_instances
def normalize_text(text):
    """Strip every whitespace character from ``text`` and lowercase it.

    Anything that is not a ``str`` (numbers, ``None``, NaN, ...) normalizes
    to the empty string.
    """
    if isinstance(text, str):
        # Drop spaces, tabs and newlines wherever they occur.
        return re.sub(r'\s+', '', text).lower()
    return ""
def build_flexible_regex(term):
    """Compile an anchored, case-insensitive regex for a search term.

    The term is split into words on whitespace. Between consecutive words the
    pattern accepts any run of whitespace, '.', ':' or '-', so "Door Type"
    matches "doortype", "door type", "door-type" and "door.type", but not
    "doortypes" or "the door type".

    The words are taken from the raw term. Splitting after all whitespace had
    been removed always produced a single word, so the separator pattern was
    never used.

    A non-string or blank term yields the pattern ``^$`` (matches only the
    empty string), as before.
    """
    words = term.lower().split() if isinstance(term, str) else []
    pattern = r'[\s\.\:\-]*'.join(map(re.escape, words))
    return re.compile(rf'^{pattern}$', re.IGNORECASE)
def flexible_search(df, search_terms):
    """Locate each search term in the column names and the first rows of ``df``.

    Returns a dict keyed by term. Each value holds:
      * ``"col_matches"``  - positional indices of matching column names
      * ``"cell_matches"`` - ``(row, col)`` positions of matching cells within
        the first three rows (fewer if the frame is shorter)

    Column names and cells are normalized once up front instead of once per
    term; the matches are the same.
    """
    normalized_columns = [normalize_text(col) for col in df.columns]
    top_rows = min(3, len(df))
    normalized_cells = [
        (row_idx, col_idx, normalize_text(df.iat[row_idx, col_idx]))
        for row_idx in range(top_rows)
        for col_idx in range(len(df.columns))
    ]

    results = {term: {"col_matches": [], "cell_matches": []} for term in search_terms}
    for term in search_terms:
        regex = build_flexible_regex(term)
        hits = results[term]
        hits["col_matches"].extend(
            col_idx
            for col_idx, col_text in enumerate(normalized_columns)
            if regex.search(col_text)
        )
        hits["cell_matches"].extend(
            (row_idx, col_idx)
            for row_idx, col_idx, cell_text in normalized_cells
            if regex.search(cell_text)
        )
    return results
def generate_current_table_without_cropping(clm_idx, clmn_name, df):
    """Select columns of ``df`` by position and rename them.

    ``clm_idx`` are the positional column indices to keep and ``clmn_name``
    the new labels assigned to them, in the same order. All rows are kept.
    """
    chosen = df.iloc[:, clm_idx]
    print("hello I generated the selected columns table without cropping")
    chosen.columns = clmn_name
    return chosen
def crop_rename_table(indices, clmn_name, clmn_idx, df):
    """Drop the header rows of ``df``, then select and rename columns.

    ``indices`` are the row positions where header cells were found; every
    row up to and including the largest one is discarded. ``clmn_idx`` are
    the positional column indices to keep and ``clmn_name`` their new labels.

    A new frame with a fresh 0-based index is returned; ``df`` itself is not
    modified. Raises ``ValueError`` if ``indices`` is empty.
    """
    crop_at = max(indices) + 1
    # Build a new object instead of resetting the index of a slice in place.
    body = df.iloc[crop_at:].reset_index(drop=True)
    slctd_clms = body.iloc[:, clmn_idx].copy()
    slctd_clms.columns = clmn_name
    return slctd_clms
def clean_column_row(row):
    """Return the cells of ``row`` as strings without a leading "<digits>-" prefix.

    Only a prefix at the very start of the cell (digits, a dash, optional
    whitespace) is removed.
    """
    cleaned = []
    for cell in row:
        cleaned.append(re.sub(r'^\d+-\s*', '', str(cell)))
    return cleaned
def details_in_another_table(clmn_name, clmn_idx, current_dfs, dfs):
    """Collect the selected columns from every other table of the same width.

    Tables in ``dfs`` other than ``current_dfs`` (compared by identity) that
    have the same number of columns are used. For each one the columns at
    positions ``clmn_idx`` are taken, the table's own column labels are
    cleaned with ``clean_column_row`` and put back as the first data row, and
    the columns are renamed to ``clmn_name``.

    Returns the row-wise concatenation of all such tables, or ``None`` when
    no other table has a matching width.
    """
    siblings = [
        other for other in dfs
        if other is not current_dfs and current_dfs.shape[1] == other.shape[1]
    ]
    if not siblings:
        return None

    pieces = []
    for other in siblings:
        body = other.iloc[:, clmn_idx].copy()
        # The original column labels are kept as an ordinary row.
        header_row = pd.DataFrame([clean_column_row(body.columns.tolist())])
        body.columns = clmn_name
        header_row.columns = clmn_name
        pieces.append(pd.concat([header_row, body], ignore_index=True))
    return pd.concat(pieces, ignore_index=True)
def map_user_input_to_standard_labels(user_inputs):
    """Map free-form column names onto the standard door-schedule labels.

    Each input is stripped, has its whitespace runs collapsed and is
    lowercased, then tested against the label patterns in declaration order.
    It is assigned to the first label that matches and is not already taken.
    Inputs that match no free label are left out.

    Returns a dict ``{standard_label: original_input}``.
    """
    patterns = {
        'door_id': r'\b(?:door\s*)?(?:id|no|number)\b|\bdoor\s*name\b',
        'door_type': r'\b(?:\S+\s+)?door\s*type\b|\btype(?:\s+\w+)?\b',
        'structural_opening': r'\bstructural\s+opening\b',
        'width': r'\bwidth\b',
        'height': r'\bheight\b',
    }

    mapped = {}
    for item in user_inputs:
        canonical = re.sub(r'\s+', ' ', item.strip(), flags=re.MULTILINE).lower()
        label = next(
            (
                name for name, pattern in patterns.items()
                if name not in mapped and re.search(pattern, canonical, re.IGNORECASE)
            ),
            None,
        )
        if label is not None:
            mapped[label] = item
    return mapped
def analyse_cell_columns(cell_columns_appearance):
    """Take the first cell match and first column match of every term.

    ``cell_columns_appearance`` is a dict whose values hold ``'cell_matches'``
    and ``'col_matches'`` lists. Terms with an empty list contribute nothing
    to the corresponding output.

    Returns ``(cell_matches, col_matches)``.
    """
    cell_matches, col_matches = [], []
    for hits in cell_columns_appearance.values():
        in_cells = hits['cell_matches']
        if len(in_cells) > 0:
            cell_matches.append(in_cells[0])
        in_columns = hits['col_matches']
        if len(in_columns) > 0:
            col_matches.append(in_columns[0])
    return cell_matches, col_matches
# Used when the column names are located in the cells.
def get_row_column_indices(cell_clmn_indx):
    """Split ``(row, column)`` pairs into a list of rows and a list of columns."""
    pairs = [(pair[0], pair[1]) for pair in cell_clmn_indx]
    row_index = [row for row, _ in pairs]
    column_index = [col for _, col in pairs]
    return row_index, column_index
# Used when the column names are located in the columns themselves.
def get_column_index(col_matches):
    """Return the matched column indices as a new list."""
    return list(col_matches)
def extract_tables(schedule):
    """Extract every table PyMuPDF detects in a PDF as pandas DataFrames.

    ``schedule`` is passed as the second positional argument of ``fitz.open``
    with ``"pdf"`` first -- presumably the raw PDF bytes; confirm against the
    caller.

    Tables are collected across all pages in page order. The result list used
    to be reset on every page, so only the last page's tables survived and a
    document without pages raised ``UnboundLocalError``. Returns an empty list
    when no table is found.
    """
    doc = fitz.open("pdf", schedule)
    dfs = []
    for page in doc:
        for tab in page.find_tables():
            dfs.append(tab.to_pandas())
    return dfs
def get_selected_columns(dfs, user_patterns):
    """Find the user's columns in the extracted tables and return them renamed.

    ``user_patterns`` holds 2, 3 or 4 header texts, mapped respectively to
    ``door_id, door_type`` / ``+ structural_opening`` / ``+ width, height``.

    Each table is searched with ``flexible_search``:
      * headers found among the column labels: a table shorter than 10 rows is
        treated as header-only and the data is taken from the other tables of
        the same width; otherwise the columns are taken from the table itself.
        The search continues, so a later table may override the result.
      * headers found in the first rows' cells: same split, but a table of 10
        rows or more is cropped below the header rows, and the search stops.

    Fixes relative to the previous version: ``generate_current_table_without_cropping``
    is now called with the column names (it raised ``TypeError``); tables with
    exactly 10 rows are handled instead of skipped; the third column is named
    ``structural_opening`` as the rest of the module expects.

    Returns the selected DataFrame, or ``None`` if nothing matched. Raises
    ``ValueError`` if a table matches but the number of patterns is not
    2, 3 or 4.
    """
    names_by_count = {
        2: ["door_id", "door_type"],
        3: ["door_id", "door_type", "structural_opening"],
        4: ["door_id", "door_type", "width", "height"],
    }
    clmn_name = names_by_count.get(len(user_patterns))

    def _require_names():
        # Only complain once a table actually needs the names.
        if clmn_name is None:
            raise ValueError(
                f"expected 2, 3 or 4 column patterns, got {len(user_patterns)}"
            )
        return clmn_name

    selected_columns_new = None
    for i, df in enumerate(dfs):
        cell_columns_appearance = flexible_search(df, user_patterns)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)

        if len(cell_matches) == 0 and len(col_matches) == 0:
            print(f"this is df {i}, SEARCH IN ANOTHER DF")
            continue

        # Headers are the column labels.
        if len(col_matches) == len(user_patterns):
            column_index_list = get_column_index(col_matches)
            print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")
            print(column_index_list)
            if len(df) < 10:
                # Details live in another table.
                selected_columns_new = details_in_another_table(_require_names(), column_index_list, df, dfs)
            else:
                # Details live in this table.
                selected_columns_new = generate_current_table_without_cropping(column_index_list, _require_names(), df)

        # Headers are cells in the first rows.
        if len(cell_matches) == len(user_patterns):
            row_index_list, column_index_list = get_row_column_indices(cell_matches)
            print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")
            if len(df) < 10:
                # Details live in another table.
                selected_columns_new = details_in_another_table(_require_names(), column_index_list, df, dfs)
            else:
                # Details live in this table, below the header rows.
                print(f"this is df {i} call crop_rename_table(indices, clmn_name, clmn_idx,df)")
                selected_columns_new = crop_rename_table(row_index_list, _require_names(), column_index_list, df)
            break
    return selected_columns_new
def separate_main_secondary(input_user_clmn_names):
    """Split the user's column names into the first four and the remainder.

    Returns ``(main_info, secondary_info)``.
    """
    split_at = 4
    return input_user_clmn_names[:split_at], input_user_clmn_names[split_at:]
# Takes the main info.
def get_column_name(user_input_m):
    """Return the standard names for the non-empty main-info slots.

    The slots are, in order, ``door_id``, ``door_type``, ``width``, ``height``.
    A slot whose entry is the empty string is dropped. When the fourth slot
    is empty but the third is filled, the third is reported as
    ``structural_opening`` instead of ``width``.

    If both the third and fourth slots are empty, neither is reported.
    Previously the third slot came back as ``structural_opening``, giving one
    name more than there were non-empty inputs.
    """
    fixed_list = ["door_id", "door_type", "width", "height"]
    empty_indices = {i for i, v in enumerate(user_input_m) if v == ''}
    if 3 in empty_indices and 2 not in empty_indices:
        fixed_list[2] = "structural_opening"
    return [name for i, name in enumerate(fixed_list) if i not in empty_indices]
# Takes the secondary info.
def get_column_name_secondary(user_input_m):
    """Return the standard names for the non-empty secondary-info slots.

    The slots are, in order, ``fire_rate`` and ``acoustic_rate``; a slot whose
    entry is the empty string is dropped.
    """
    names = ["fire_rate", "acoustic_rate"]
    for position, entry in enumerate(user_input_m):
        if entry == '':
            names[position] = ""
    return [name for name in names if name]
# Handles main and secondary info together in one table.
def get_selected_columns_all(dfs, user_patterns):
    """Find main and secondary columns in the extracted tables.

    ``user_patterns`` is split by ``separate_main_secondary``. Empty strings
    mark columns the user did not supply; the standard names for the rest come
    from ``get_column_name`` and ``get_column_name_secondary``.

    Each table is searched with ``flexible_search``:
      * headers found among the column labels: a table shorter than 10 rows is
        treated as header-only and the data is taken from the other tables of
        the same width; otherwise the columns are taken from the table itself.
        The search continues, so a later table may override the result.
      * headers found in the first rows' cells: the table is cropped below the
        header rows and the search stops.

    Fixes relative to the previous version: ``generate_current_table_without_cropping``
    is now called with the column names (it raised ``TypeError``); tables with
    exactly 10 rows are handled instead of skipped; a ``details_in_another_table``
    call whose result was immediately overwritten is removed; the name lists
    are computed once instead of per table.

    Returns the selected DataFrame, or ``None`` if nothing matched.
    """
    main_info, secondary_info = separate_main_secondary(user_patterns)
    clmn_name = get_column_name(main_info) + get_column_name_secondary(secondary_info)
    non_empty_info = [item for item in main_info if item] + [item for item in secondary_info if item]

    selected_columns_new = None
    for i, df in enumerate(dfs):
        print(f"clmn name: {clmn_name}")
        print(f"non-empty info: {non_empty_info}")
        cell_columns_appearance = flexible_search(df, non_empty_info)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)
        print(f"length of cell_matches: {len(cell_matches)}")
        print(f"cell_matches: {cell_matches}")
        print(clmn_name)

        if len(cell_matches) == 0 and len(col_matches) == 0:
            print(f"this is df {i}, SEARCH IN ANOTHER DF")
            continue

        # Headers are the column labels.
        if len(col_matches) == len(non_empty_info):
            column_index_list = get_column_index(col_matches)
            print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")
            print(column_index_list)
            if len(df) < 10:
                # Details live in another table.
                selected_columns_new = details_in_another_table(clmn_name, column_index_list, df, dfs)
            else:
                # Details live in this table.
                selected_columns_new = generate_current_table_without_cropping(column_index_list, clmn_name, df)

        # Headers are cells in the first rows: always crop this table.
        if len(cell_matches) == len(non_empty_info):
            row_index_list, column_index_list = get_row_column_indices(cell_matches)
            print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")
            selected_columns_new = crop_rename_table(row_index_list, clmn_name, column_index_list, df)
            break
    return selected_columns_new
# Intended to search the column names of selected_columns itself.
# The caller also needs the header as it is written in the real table
# (presumably exactly what the user typed).
def get_st_op_pattern(selected_columns, user_input):
    """Return the user's structural-opening header, if that column was selected.

    When ``selected_columns`` has a ``structural_opening`` column the third
    entry of ``user_input`` is returned; otherwise ``None``.
    """
    if 'structural_opening' not in selected_columns.columns:
        return None
    return user_input[2]
def get_similar_colors(selected_columns_new):
    """Group door ids by door type and give every type a random RGB color.

    Returns ``{door_type: {'values': [...], 'color': (r, g, b), ...}}``.
    With a ``structural_opening`` column each group also has ``'widths'``
    (filled from that column). Otherwise, with more than two columns, it has
    ``'widths'`` and ``'heights'`` from the ``width`` and ``height`` columns.
    A two-column frame yields only ``'values'`` and ``'color'``.
    """
    has_opening = 'structural_opening' in selected_columns_new.columns
    n_cols = selected_columns_new.shape[1]

    def random_rgb():
        return tuple(random.randint(0, 255) for _ in range(3))

    # One color per distinct door type.
    palette = {
        door_type: random_rgb()
        for door_type in selected_columns_new['door_type'].unique()
    }

    def fresh_entry():
        if n_cols == 2:
            return {'values': [], 'color': None}
        if has_opening:
            return {'values': [], 'color': None, 'widths': []}
        return {'values': [], 'color': None, 'widths': [], 'heights': []}

    grouped = defaultdict(fresh_entry)
    for _, row in selected_columns_new.iterrows():
        door_type = row['door_type']
        entry = grouped[door_type]
        entry['values'].append(row['door_id'])
        if has_opening:
            entry['widths'].append(row['structural_opening'])
        elif n_cols > 2:
            entry['widths'].append(row['width'])
            entry['heights'].append(row['height'])
        entry['color'] = palette[door_type]
    return dict(grouped)
def get_flattened_tuples_list(col_dict):
    """Flatten the per-type groups into one list of per-door tuples.

    Depending on which keys a group has, each door becomes
    ``(id, width, height, color)``, ``(id, width, color)`` or ``(id, color)``.
    Groups are emitted in dict order, doors in their stored order.
    """
    flat = []
    for entry in col_dict.values():
        door_ids = entry['values']
        if 'heights' in entry and 'widths' in entry:
            flat.extend(
                (door, width, height, entry["color"])
                for door, width, height in zip(door_ids, entry['widths'], entry['heights'])
            )
        elif 'widths' in entry:
            flat.extend(
                (door, width, entry["color"])
                for door, width in zip(door_ids, entry['widths'])
            )
        else:
            flat.extend((door, entry["color"]) for door in door_ids)
    return flat
def find_text_in_plan(label, x):
    """Find every word tuple in ``x`` whose text (index 4) equals ``label``.

    Each tuple is indexed as ``(v0, v1, v2, v3, text, ...)``. Returns three
    parallel lists: the midpoints computed from ``(v0, v1, v2, v3)``, the
    matched texts, and the midpoints computed with the coordinate pairs
    swapped (used for the rotated case).
    """
    hits = [tpl for tpl in x if tpl[4] == label]
    substring_coordinates = [
        calculate_midpoint(tpl[0], tpl[1], tpl[2], tpl[3]) for tpl in hits  # for pdf
    ]
    point_list = [
        calculate_midpoint(tpl[1], tpl[0], tpl[3], tpl[2]) for tpl in hits  # for rotated
    ]
    words = [tpl[4] for tpl in hits]
    return substring_coordinates, words, point_list
def get_word_locations_plan(flattened_list, plan_texts):
    """Look up every door label of ``flattened_list`` in the plan's words.

    Entries are ``(label, color)``, ``(label, width, color)`` or
    ``(label, width, height, color)``; the shape is taken from the first
    entry. For each entry the result holds ``(locations, label, color)``
    followed by the width and height when present.

    Returns ``(locations, not_found)``, where ``not_found`` lists the labels
    with no occurrence in ``plan_texts``. An empty input returns two empty
    lists (it used to raise ``IndexError``), as does an unsupported entry
    length. Raises ``ValueError`` if an entry's length differs from the first.
    """
    locations, not_found = [], []
    if not flattened_list:
        return locations, not_found

    arity = len(flattened_list[0])
    if arity not in (2, 3, 4):
        return locations, not_found

    for entry in flattened_list:
        if len(entry) != arity:
            raise ValueError(f"expected {arity} values per entry, got {len(entry)}")
        # The color is always last; anything between label and color is a dimension.
        lbl, *dims, clr = entry
        location, _words, _txt_pt = find_text_in_plan(lbl, plan_texts)
        if len(location) == 0:
            not_found.append(lbl)
        locations.append((location, lbl, clr, *dims))
    return locations, not_found
def get_repeated_labels(locations):
    """Return the set of labels (index 1 of each item) that occur more than once."""
    seen_once = set()
    repeated = set()
    for entry in locations:
        lbl = entry[1]
        bucket = repeated if lbl in seen_once else seen_once
        bucket.add(lbl)
    return repeated
def get_cleaned_data(locations):
    """Reduce every entry to a single plan coordinate.

    Entries are ``(coords, label, color)`` optionally followed by a width and
    a height; the shape is taken from the first entry.

      * one coordinate: the entry is kept unchanged
      * several coordinates: the n-th entry with that label gets the n-th
        coordinate, wrapping around (round-robin)
      * no coordinate: the entry is dropped

    An empty input returns ``[]`` (it used to raise ``IndexError``), as does
    an unsupported entry length. Raises ``ValueError`` if an entry's length
    differs from the first.
    """
    if not locations:
        return []
    arity = len(locations[0])
    if arity not in (3, 4, 5):
        return []

    processed = defaultdict(int)  # label -> multi-coordinate entries seen so far
    new_data = []
    for entry in locations:
        if len(entry) != arity:
            raise ValueError(f"expected {arity} values per entry, got {len(entry)}")
        coords, label, *rest = entry
        if len(coords) > 1:
            index = processed[label] % len(coords)
            new_data.append(([coords[index]], label, *rest))
            processed[label] += 1
        elif len(coords) == 1:
            new_data.append((coords, label, *rest))
    return new_data
# A fractional value such as 0.5 is printed as is; a whole number loses its
# decimal point.
def get_width_info_tobeprinted(new_data):
    """Build the dimension text printed for every door.

    * 4-item entries ``(coords, label, color, w)``: ``w`` is returned unchanged.
    * 5-item entries ``(coords, label, color, w, h)``: commas are removed from
      both values, whole numbers are printed without a decimal part, and the
      text is ``"<w> mm wide x <h> mm high"``.

    The shape is taken from the first entry; any other shape yields ``[]``.
    An empty input returns ``[]`` (it used to raise ``IndexError``). Numeric
    cells are accepted as well as strings (they used to raise ``TypeError``).
    Raises ``ValueError`` for a value that is not a number.
    """
    def _tidy(value):
        text = re.sub(r",", "", str(value))
        number = float(text)
        return int(number) if number.is_integer() else text

    if not new_data:
        return []
    arity = len(new_data[0])
    if arity == 4:
        return [w for _, _, _, w in new_data]
    if arity == 5:
        return [
            f"{_tidy(w)} mm wide x {_tidy(h)} mm high"
            for _, _, _, w, h in new_data
        ]
    return []
def clean_dimensions(text):
    """Remove "mm" units and commas from a dimension string.

    Every "mm", together with any commas or whitespace directly before it, is
    deleted; afterwards all remaining commas are removed.
    """
    without_units = re.sub(r'[,\s]*mm', '', text)
    return without_units.replace(",", "")
def get_cleaned_width(width_info_tobeprinted):
    """Apply ``clean_dimensions`` to every entry and return the results as a list."""
    return [clean_dimensions(entry) for entry in width_info_tobeprinted]
def get_widths_bb_format(cleaned_width, kelma):
    """Turn "<a> x <b>" dimension strings into "<w> mm wide x <h> mm high".

    ``kelma`` is the column header. If it reads as width-by-height
    ("W x H", "Width × Height", any letter case) the first number is the
    width; otherwise the numbers are taken as height-by-width and swapped.
    Both numbers are truncated to integers.

    The values are split at the first 'x', '×' or 'X'. Raises ``ValueError``
    when a value has no separator (it used to be sliced at position -1,
    silently producing wrong numbers) or when a part is not a number.
    """
    width_first = re.search(
        r"\bW(?:idth)?\s*[×x]\s*H(?:eight)?\b", kelma, re.IGNORECASE
    )
    widths = []
    for widthaa in cleaned_width:
        parts = re.split(r"[x×X]", widthaa, maxsplit=1)
        if len(parts) != 2:
            raise ValueError(f"no 'x' separator in dimension text: {widthaa!r}")
        first = int(float(parts[0]))
        second = int(float(parts[1]))
        if width_first:
            full_text = f"{first} mm wide x {second} mm high"
        else:
            full_text = f"{second} mm wide x {first} mm high"
        widths.append(full_text)
    return widths
def get_secondary_info(dfs, user_patterns):
    """Find the user's secondary columns in the extracted tables.

    Works like the main-column search, except that the selected columns are
    renamed to ``user_patterns`` themselves.

    Each table is searched with ``flexible_search``:
      * headers found among the column labels: a table shorter than 10 rows is
        treated as header-only and the data is taken from the other tables of
        the same width; otherwise the columns are taken from the table itself.
        The search continues, so a later table may override the result.
      * headers found in the first rows' cells: same split, but a table of 10
        rows or more is cropped below the header rows, and the search stops.

    Fixes relative to the previous version: ``generate_current_table_without_cropping``
    is now called with the column names (it raised ``TypeError``), and tables
    with exactly 10 rows are handled instead of skipped.

    Returns the selected DataFrame, or ``None`` if nothing matched.
    """
    clmn_name = user_patterns
    selected_columns_new = None
    for i, df in enumerate(dfs):
        cell_columns_appearance = flexible_search(df, user_patterns)
        cell_matches, col_matches = analyse_cell_columns(cell_columns_appearance)

        if len(cell_matches) == 0 and len(col_matches) == 0:
            print(f"this is df {i}, SEARCH IN ANOTHER DF")
            continue

        # Headers are the column labels.
        if len(col_matches) == len(user_patterns):
            column_index_list = get_column_index(col_matches)
            print(f"this is df {i} mawgooda fel columns, check el df length 3ashan law el details fe table tany")
            print(column_index_list)
            if len(df) < 10:
                # Details live in another table.
                selected_columns_new = details_in_another_table(clmn_name, column_index_list, df, dfs)
            else:
                # Details live in this table.
                selected_columns_new = generate_current_table_without_cropping(column_index_list, clmn_name, df)

        # Headers are cells in the first rows.
        if len(cell_matches) == len(user_patterns):
            row_index_list, column_index_list = get_row_column_indices(cell_matches)
            print(f"this is df {i} mawgooda fel cells, check el df length 3ashan law el details fe table tany")
            if len(df) < 10:
                # Details live in another table.
                selected_columns_new = details_in_another_table(clmn_name, column_index_list, df, dfs)
            else:
                # Details live in this table, below the header rows.
                print(f"this is df {i} call crop_rename_table(indices, clmn_name, clmn_idx,df)")
                selected_columns_new = crop_rename_table(row_index_list, clmn_name, column_index_list, df)
            break
    return selected_columns_new
def get_similar_colors_secondary(selected_columns_new, user_input):
    """Group doors by type, with a random color and the secondary fields.

    Every group starts as ``{'values': [], 'color': None}``. With a
    ``structural_opening`` column it also gets ``'widths'`` (from that
    column); otherwise, with more than two columns, it gets ``'widths'`` and
    ``'heights'`` read from ``width`` / ``height`` (0 when a row lacks them).
    Every non-``None`` entry of ``user_input[4:]`` adds a list under that
    name, filled from the row's value for that name (``None`` when absent).

    Returns a plain dict ``{door_type: group}``.
    """
    def random_rgb():
        return tuple(random.randint(0, 255) for _ in range(3))

    palette = {
        door_type: random_rgb()
        for door_type in selected_columns_new['door_type'].unique()
    }
    # Only real None values are excluded; an empty-string name is kept.
    extra_fields = [name for name in user_input[4:] if name is not None]

    has_opening = 'structural_opening' in selected_columns_new.columns
    has_dimensions = (not has_opening) and selected_columns_new.shape[1] > 2

    def new_bucket():
        bucket = {'values': [], 'color': None}
        if has_opening or has_dimensions:
            bucket['widths'] = []
        if has_dimensions:
            bucket['heights'] = []
        for name in extra_fields:
            bucket[name] = []
        return bucket

    grouped = defaultdict(new_bucket)
    for _, row in selected_columns_new.iterrows():
        door_type = row['door_type']
        bucket = grouped[door_type]
        bucket['values'].append(row['door_id'])
        if has_opening:
            bucket['widths'].append(row['structural_opening'])
        elif has_dimensions:
            bucket['widths'].append(row.get('width', 0))
            bucket['heights'].append(row.get('height', 0))
        for name in extra_fields:
            bucket[name].append(row.get(name, None))
        bucket['color'] = palette[door_type]
    return dict(grouped)
| '''def get_width_info_tobeprinted_secondary(new_data, main_info, secondary_info): | |
| width_info_tobeprinted = [] | |
| secondary_info_tobeprinted = [] | |
| if len(main_info) == 2 and len(secondary_info) == 1: | |
| for coords, label, color, acous in new_data: | |
| secondary_info_tobeprinted.append(acous) | |
| if len(main_info) == 2 and len(secondary_info) == 2: | |
| for coords, label, color, acous, fire in new_data: | |
| secondary_info_tobeprinted.append((acous, fire)) | |
| if len(main_info) == 3 and len(secondary_info) == 1: | |
| for coords, label, width, color, acous in new_data: | |
| width_info_tobeprinted.append(width) | |
| secondary_info_tobeprinted.append(acous) | |
| if len(main_info) == 3 and len(secondary_info) == 2: | |
| for coords, label, width, color, acous, fire in new_data: | |
| width_info_tobeprinted.append(width) | |
| secondary_info_tobeprinted.append((acous, fire)) | |
| if len(main_info) == 4 and len(secondary_info) == 1: | |
| for coords, label, width, height, color, acous in new_data: | |
| w = re.sub(r",", "", width) | |
| h = re.sub(r",", "", height) | |
| w = int(float(w)) | |
| h = int(float(h)) | |
| width_info_tobeprinted.append(f"{w} mm wide x {h} mm high") | |
| secondary_info_tobeprinted.append(acous) | |
| if len(main_info) == 4 and len(secondary_info) == 2: | |
| for coords, label, width, height, color, acous, fire in new_data: | |
| w = re.sub(r",", "", width) | |
| h = re.sub(r",", "", height) | |
| w = int(float(w)) | |
| h = int(float(h)) | |
| width_info_tobeprinted.append(f"{w} mm wide x {h} mm high") | |
| secondary_info_tobeprinted.append((acous, fire)) | |
| return width_info_tobeprinted, secondary_info_tobeprinted''' | |
def get_width_info_tobeprinted_secondary(new_data, main_info, secondary_info):
    """Extract the dimension text and secondary values from ``new_data``.

    The expected entry layout depends on ``len(main_info)`` (2, 3 or 4) and
    ``len(secondary_info)`` (1 or 2):
    ``(coords, label, [width, [height,]] acoustic, [fire,] color)``.

      * 3 main items: the width value is passed through unchanged
      * 4 main items: commas are stripped, both numbers are truncated to
        ints and formatted as ``"<w> mm wide x <h> mm high"``
      * secondary output is the acoustic value, or an ``(acoustic, fire)``
        tuple when there are two secondary items

    Returns ``(width_info_tobeprinted, secondary_info_tobeprinted)``; both
    are empty for any other combination of lengths.
    """
    def dims_text(width, height):
        w = re.sub(r",", "", width)
        h = re.sub(r",", "", height)
        w = int(float(w))
        h = int(float(h))
        return f"{w} mm wide x {h} mm high"

    n_main, n_secondary = len(main_info), len(secondary_info)
    width_info_tobeprinted = []
    secondary_info_tobeprinted = []

    if n_main == 2 and n_secondary == 1:
        for _coords, _label, acous, _color in new_data:
            secondary_info_tobeprinted.append(acous)
    elif n_main == 2 and n_secondary == 2:
        for _coords, _label, acous, fire, _color in new_data:
            secondary_info_tobeprinted.append((acous, fire))
    elif n_main == 3 and n_secondary == 1:
        for _coords, _label, width, acous, _color in new_data:
            width_info_tobeprinted.append(width)
            secondary_info_tobeprinted.append(acous)
    elif n_main == 3 and n_secondary == 2:
        for _coords, _label, width, acous, fire, _color in new_data:
            width_info_tobeprinted.append(width)
            secondary_info_tobeprinted.append((acous, fire))
    elif n_main == 4 and n_secondary == 1:
        for _coords, _label, width, height, acous, _color in new_data:
            width_info_tobeprinted.append(dims_text(width, height))
            secondary_info_tobeprinted.append(acous)
    elif n_main == 4 and n_secondary == 2:
        for _coords, _label, width, height, acous, fire, _color in new_data:
            width_info_tobeprinted.append(dims_text(width, height))
            secondary_info_tobeprinted.append((acous, fire))
    return width_info_tobeprinted, secondary_info_tobeprinted
def get_flattened_tuples_list_SECONDARY(col_dict):
    """Flatten the per-type groups into per-door tuples with secondary info.

    In each group, the first key containing "acoustic" and the first key
    containing "fire" (case-insensitive) supply the secondary values; a
    missing key contributes ``None`` for every door.

    Depending on the group's keys, each door becomes
    ``(id, width, height, color, acoustic, fire)``,
    ``(id, width, color, acoustic, fire)`` or ``(id, color, acoustic, fire)``.
    """
    def first_key_containing(bucket, needle):
        for name in bucket:
            if needle in name.lower():
                return name
        return None

    flat = []
    for bucket in col_dict.values():
        door_ids = bucket['values']
        blanks = [None] * len(door_ids)
        acoustic_key = first_key_containing(bucket, 'acoustic')
        fire_key = first_key_containing(bucket, 'fire')
        acoustic_values = bucket[acoustic_key] if acoustic_key else blanks
        fire_values = bucket[fire_key] if fire_key else blanks

        if 'heights' in bucket and 'widths' in bucket:
            flat.extend(
                (door, width, height, bucket["color"], acoustic, fire)
                for door, width, height, acoustic, fire in zip(
                    door_ids, bucket['widths'], bucket['heights'],
                    acoustic_values, fire_values,
                )
            )
        elif 'widths' in bucket:
            flat.extend(
                (door, width, bucket["color"], acoustic, fire)
                for door, width, acoustic, fire in zip(
                    door_ids, bucket['widths'], acoustic_values, fire_values,
                )
            )
        else:
            flat.extend(
                (door, bucket["color"], acoustic, fire)
                for door, acoustic, fire in zip(door_ids, acoustic_values, fire_values)
            )
    return flat
def get_word_locations_plan_secondary(flattened_list, plan_texts, main_info, secondary_info):
    """Look up every door label in the plan's words, keeping secondary info.

    Used when secondary information is present. Supported shapes are
    ``len(main_info)`` in 2..4 combined with ``len(secondary_info)`` in 1..2;
    each entry must then hold ``len(main_info) + len(secondary_info)`` values
    with the label first. Every result item is the entry with the label's
    plan locations prepended.

    Returns ``(locations, not_found)``, where ``not_found`` lists the labels
    with no occurrence in ``plan_texts``. Unsupported length combinations
    yield two empty lists. Raises ``ValueError`` if an entry has the wrong
    number of values.
    """
    locations = []
    not_found = []
    len_main = len(main_info)            # 3 or 4, sometimes 2
    len_secondary = len(secondary_info)  # 2 or 1
    if len_main not in (2, 3, 4) or len_secondary not in (1, 2):
        return locations, not_found

    arity = len_main + len_secondary
    for entry in flattened_list:
        fields = tuple(entry)
        if len(fields) != arity:
            raise ValueError(f"expected {arity} values per entry, got {len(fields)}")
        lbl = fields[0]
        location, _words, _txt_pt = find_text_in_plan(lbl, plan_texts)
        if len(location) == 0:
            not_found.append(lbl)
        locations.append((location,) + fields)
    return locations, not_found
| ### newest, accept combined table | |
| from collections import defaultdict | |
| import random | |
def get_similar_colors_all(selected_columns_new):
    """Group every column by door type and give each type a random RGB color.

    Returns ``{door_type: group}``. A group holds ``'values'`` (the door ids),
    ``'color'`` and one list per column of the frame, keyed by column name,
    with that column's values for the doors of the type.
    """
    def random_rgb():
        return tuple(random.randint(0, 255) for _ in range(3))

    palette = {
        door_type: random_rgb()
        for door_type in selected_columns_new['door_type'].unique()
    }
    # Column fields
    clmns_fields = selected_columns_new.columns.to_list()

    def new_bucket():
        bucket = {'values': [], 'color': None}
        bucket.update((field, []) for field in clmns_fields)
        return bucket

    grouped = defaultdict(new_bucket)
    for _, row in selected_columns_new.iterrows():
        door_type = row['door_type']
        bucket = grouped[door_type]
        bucket['values'].append(row['door_id'])
        for field in clmns_fields:
            bucket[field].append(row.get(field, None))
        bucket['color'] = palette[door_type]
    return dict(grouped)
| ### newest, accept combined table | |
def get_flattened_tuples_list_all(col_dict):
    """Flatten the per-door-type dictionary into one tuple per schedule row.

    Args:
        col_dict: mapping of door type to a dict of column lists plus a
            ``'color'`` entry.

    Returns:
        list of tuples. Each tuple holds the row's value from every
        list-valued field except ``'door_type'`` and ``'values'`` (in the
        dict's key order), followed by the group's colour.
    """
    skipped = ('door_type', 'values')
    rows = []
    for group in col_dict.values():
        # Keep only list-valued fields that are not bookkeeping columns.
        columns = []
        for name, content in group.items():
            if name in skipped or not isinstance(content, list):
                continue
            columns.append(name)
        if not columns:
            continue
        # The first kept column decides how many rows the group has.
        row_count = len(group[columns[0]])
        rows.extend(
            tuple(group[name][idx] for name in columns) + (group['color'],)
            for idx in range(row_count)
        )
    return rows
| #SECONDARY | |
def get_cleaned_data_secondary(locations, main_info, secondary_info):
    """Reduce every located entry to a single coordinate.

    A label that was found at several places on the plan appears once per
    schedule row, each time carrying the full list of candidate coordinates.
    The candidates are handed out round-robin per label so that successive
    rows with the same label land on successive coordinates. Entries with
    exactly one coordinate are kept as they are; entries with no coordinate
    are dropped.

    Args:
        locations: iterable of tuples ``(coords, label, *extra)``. The extra
            fields are passed through untouched, so their meaning does not
            matter here.
        main_info: selected main columns; only its length (2, 3 or 4) is used.
        secondary_info: selected secondary columns; only its length (1 or 2)
            is used.

    Returns:
        list of tuples with the same layout as the input entries, whose first
        element is a one-item coordinate list. An unsupported combination of
        lengths yields an empty list.

    Raises:
        ValueError: if an entry does not have
            ``len(main_info) + len(secondary_info) + 1`` fields.
    """
    shape = (len(main_info), len(secondary_info))
    if shape not in {(2, 1), (2, 2), (3, 1), (3, 2), (4, 1), (4, 2)}:
        return []

    # coords + one field per selected column (label included) + colour slot
    expected_len = shape[0] + shape[1] + 1
    next_slot = defaultdict(int)
    cleaned = []
    for entry in locations:
        fields = tuple(entry)
        if len(fields) != expected_len:
            raise ValueError(
                f"expected {expected_len} fields per location, got {len(fields)}"
            )
        coords, label = fields[0], fields[1]
        if len(coords) > 1:
            slot = next_slot[label] % len(coords)  # round-robin per label
            cleaned.append(([coords[slot]],) + fields[1:])
            next_slot[label] += 1
        elif len(coords) == 1:
            cleaned.append(fields)
    return cleaned
| from collections import defaultdict | |
def get_cleaned_data_gpt(locations):
    """Pick one coordinate per entry, cycling through candidates per label.

    Args:
        locations: iterable of tuples ``(coords, label, *extra)``.

    Returns:
        list of tuples with the same trailing fields. When an entry offers
        more than one coordinate, only the next one in round-robin order for
        its label is kept (as a one-item list); otherwise the coordinate list
        is passed through unchanged, including when it is empty.
    """
    usage = {}
    result = []
    for entry in locations:
        coords, label = entry[0], entry[1]
        seen = usage.get(label, 0)
        if len(coords) > 1:
            picked = [coords[seen % len(coords)]]
            usage[label] = seen + 1
        else:
            picked = coords
        # Same entry, only the coordinate slot is replaced.
        result.append((picked,) + entry[1:])
    return result
| '''def get_secondary_tobeprinted_clean(selected_secondary_info, secondary_tobeprinted, secondary_info): | |
| secondary_printed_clean = [] | |
| if len(secondary_info) == 1: | |
| if any('Acoustic' in col for col in selected_secondary_info.columns): | |
| for acous in secondary_tobeprinted: | |
| new_text = f"acoustic rating: {acous}" | |
| secondary_printed_clean.append(new_text) | |
| if any('Fire' in col for col in selected_secondary_info.columns): | |
| for fire in secondary_tobeprinted: | |
| new_text = f"fire rating: {fire}" | |
| secondary_printed_clean.append(new_text) | |
| if len(secondary_info) == 2: | |
| for acous, fire in secondary_tobeprinted: | |
| new_text = f"fire rating: {fire}; acoustic rating: {acous}" | |
| secondary_printed_clean.append(new_text) | |
| print(new_text) | |
| return secondary_printed_clean''' | |
def get_secondary_tobeprinted_clean(selected_secondary_info, secondary_tobeprinted, secondary_info):
    """Turn raw secondary values into the text lines attached to annotations.

    Args:
        selected_secondary_info: object with a ``columns`` attribute (a
            DataFrame in practice); the column names decide whether a single
            secondary value is an acoustic or a fire rating.
        secondary_tobeprinted: the raw values; plain values when one
            secondary column is selected, ``(fire, acoustic)`` pairs when two
            are selected.
        secondary_info: the selected secondary columns; only its length is
            used.

    Returns:
        list of strings such as ``"fire rating: X;"`` or
        ``"fire rating: X; acoustic rating: Y;"``. The two-column lines are
        also echoed to stdout.
    """
    selected = len(secondary_info)
    lines = []
    if selected == 1:
        column_names = selected_secondary_info.columns
        if any('acoustic' in name for name in column_names):
            lines += [f"acoustic rating: {rating};" for rating in secondary_tobeprinted]
        if any('fire' in name for name in column_names):
            lines += [f"fire rating: {rating};" for rating in secondary_tobeprinted]
    elif selected == 2:
        for fire_rating, acoustic_rating in secondary_tobeprinted:
            combined = f"fire rating: {fire_rating}; acoustic rating: {acoustic_rating};"
            lines.append(combined)
            print(combined)
    return lines
def mix_width_secondary(widths, secondary_printed_clean):
    """Prefix every secondary text with the width text at the same position.

    Args:
        widths: width strings, one per annotation.
        secondary_printed_clean: secondary-info strings, indexed like
            ``widths``.

    Returns:
        list of ``"<width>; <secondary>"`` strings, one per entry of
        ``widths``. Surplus secondary entries are ignored; an empty
        ``widths`` yields an empty list.
    """
    return [
        f"{width}; {secondary_printed_clean[position]}"
        for position, width in enumerate(widths)
    ]
def add_bluebeam_count_annotations_secondary(pdf_bytes, locations, main_info, secondary_info):
    """Draw one count-style circle per located door on the first PDF page.

    Args:
        pdf_bytes: PDF content as a bytes-like object.
        locations: iterable of tuples laid out as
            ``(coords, label, ..., color)``: ``coords`` is a list of
            ``(x, y)`` points, ``color`` an RGB triple in 0..255 stored in
            the last slot. The fields in between are not used here.
        main_info: selected main columns; only its length (2, 3 or 4) is used.
        secondary_info: selected secondary columns; only its length (1 or 2)
            is used.

    Returns:
        bytes of the PDF with the annotations added. For an unsupported
        combination of lengths the document is re-saved without annotations.

    Raises:
        ValueError: if an entry does not have
            ``len(main_info) + len(secondary_info) + 1`` fields. The
            3-main/1-secondary layout has always skipped such entries instead
            of failing, and that tolerance is kept.
    """
    shape = (len(main_info), len(secondary_info))
    supported = shape in {(2, 1), (2, 2), (3, 1), (3, 2), (4, 1), (4, 2)}
    expected_len = shape[0] + shape[1] + 1
    skip_malformed = shape == (3, 1)

    pdf_document = fitz.open("pdf", io.BytesIO(pdf_bytes).read())  # in-memory open
    try:
        page = pdf_document[0]  # only the first page is annotated
        if supported:
            for loc in locations:
                if len(loc) != expected_len:
                    if skip_malformed:
                        continue
                    raise ValueError(
                        f"expected {expected_len} fields per location, got {len(loc)}"
                    )
                coor, lbl, clr = loc[0], loc[1], loc[-1]
                stroke = (clr[0] / 255, clr[1] / 255, clr[2] / 255)  # PDF colours are 0..1
                for cor in coor:
                    # 20x20 circle centred on the located point
                    annot = page.add_circle_annot(
                        fitz.Rect(cor[0] - 10, cor[1] - 10, cor[0] + 10, cor[1] + 10)
                    )
                    annot.set_colors(stroke=stroke, fill=(1, 1, 1))
                    annot.set_border(width=2)
                    annot.set_opacity(1)
                    # set_info() takes keyword fields; a positional
                    # ("subject", "Count") call only ever sets the content.
                    annot.set_info(content=lbl, title=lbl, subject="Count")
                    annot.update()
        output_stream = io.BytesIO()
        pdf_document.save(output_stream)
    finally:
        pdf_document.close()
    return output_stream.getvalue()
def get_user_input(user_words):
    """Return the first element of every item in ``user_words``.

    Args:
        user_words: iterable of indexable items (tuples, lists, strings).

    Returns:
        list holding ``item[0]`` for each item, in order.
    """
    return [entry[0] for entry in user_words]
def modify_author_in_pypdf2(pdf_bytes, new_authors):
    """Rewrite the author (``/T``) entry of the annotations in a PDF.

    Annotations are visited page by page in ``/Annots`` order, starting with
    the very first annotation of the document, whether or not it was already
    present in the input. The i-th annotation receives ``new_authors[i]``;
    once the list is used up, every remaining annotation receives its last
    element. With an empty list no annotation is changed.

    Args:
        pdf_bytes: PDF content as bytes.
        new_authors: sequence of author strings.

    Returns:
        bytes of the PDF re-written through PyPDF2.
    """
    reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
    writer = PyPDF2.PdfWriter()
    next_author = 0  # position in new_authors, shared across pages
    for page in reader.pages:
        annotation_refs = page["/Annots"] if "/Annots" in page else ()
        for ref in annotation_refs:
            annotation = ref.get_object()
            if not len(new_authors):
                break
            if next_author >= len(new_authors):
                author = new_authors[-1]  # list exhausted: repeat the last one
            else:
                author = new_authors[next_author]
                next_author += 1
            annotation.update({"/T": TextStringObject(author)})
        writer.add_page(page)
    buffer = io.BytesIO()
    writer.write(buffer)
    return buffer.getvalue()
def process_pdf_secondary(input_pdf_path, output_pdf_path, locations, new_authors, main_info, secondary_info):
    """Annotate a plan PDF (secondary-info layout) and set annotation authors.

    Args:
        input_pdf_path: the PDF either as a bytes-like object or as a
            filesystem path to read it from.
        output_pdf_path: unused; kept for signature compatibility (nothing is
            written to disk).
        locations: located entries forwarded to
            ``add_bluebeam_count_annotations_secondary``.
        new_authors: author strings forwarded to ``modify_author_in_pypdf2``.
        main_info: selected main columns.
        secondary_info: selected secondary columns.

    Returns:
        bytes of the annotated PDF.
    """
    # The parameter is named like a path but used to be treated as bytes
    # only; accept both.
    if isinstance(input_pdf_path, (bytes, bytearray, memoryview)):
        original_pdf_bytes = input_pdf_path
    else:
        with open(input_pdf_path, "rb") as file:
            original_pdf_bytes = file.read()
    annotated_pdf_bytes = add_bluebeam_count_annotations_secondary(
        original_pdf_bytes, locations, main_info, secondary_info
    )
    return modify_author_in_pypdf2(annotated_pdf_bytes, new_authors)
| import fitz # PyMuPDF | |
| import PyPDF2 | |
| import io | |
| from PyPDF2.generic import TextStringObject # ✅ Required for setting string values | |
| '''def add_bluebeam_count_annotations(pdf_bytes, locations): | |
| pdf_stream = io.BytesIO(pdf_bytes) # Load PDF from bytes | |
| pdf_document = fitz.open("pdf", pdf_stream.read()) # Open PDF in memory | |
| page = pdf_document[0] # First page | |
| if len(locations[0]) == 3: | |
| for loc in locations: | |
| coor, lbl, clr = loc | |
| clr = (clr[0] / 255, clr[1] / 255, clr[2] / 255) | |
| for cor in coor: | |
| #Create a Circle annotation (Count Markup) | |
| annot = page.add_circle_annot( | |
| fitz.Rect(cor[0] - 10, cor[1] - 10, cor[0] + 10, cor[1] + 10) # Small circle | |
| ) | |
| #Assign required Bluebeam metadata | |
| annot.set_colors(stroke=clr, fill=(1, 1, 1)) # Set stroke color and fill white | |
| annot.set_border(width=2) # Border thickness | |
| annot.set_opacity(1) # Fully visible | |
| #Set annotation properties for Bluebeam Count detection | |
| annot.set_info("name", lbl) # Unique name for each count | |
| annot.set_info("subject", "Count") #Bluebeam uses "Count" for Count markups | |
| annot.set_info("title", lbl) # Optional | |
| annot.update() # Apply changes | |
| if len(locations[0]) == 4: | |
| for loc in locations: | |
| coor, lbl, clr,w = loc | |
| clr = (clr[0] / 255, clr[1] / 255, clr[2] / 255) | |
| for cor in coor: | |
| #Create a Circle annotation (Count Markup) | |
| annot = page.add_circle_annot( | |
| fitz.Rect(cor[0] - 10, cor[1] - 10, cor[0] + 10, cor[1] + 10) # Small circle | |
| ) | |
| #Assign required Bluebeam metadata | |
| annot.set_colors(stroke=clr, fill=(1, 1, 1)) # Set stroke color and fill white | |
| annot.set_border(width=2) # Border thickness | |
| annot.set_opacity(1) # Fully visible | |
| #Set annotation properties for Bluebeam Count detection | |
| annot.set_info("name", lbl) # Unique name for each count | |
| annot.set_info("subject", "Count") #Bluebeam uses "Count" for Count markups | |
| annot.set_info("title", lbl) # Optional | |
| annot.update() # Apply changes | |
| if len(locations[0]) == 5: | |
| for loc in locations: | |
| coor, lbl, clr,w,h = loc | |
| clr = (clr[0] / 255, clr[1] / 255, clr[2] / 255) | |
| for cor in coor: | |
| #Create a Circle annotation (Count Markup) | |
| annot = page.add_circle_annot( | |
| fitz.Rect(cor[0] - 10, cor[1] - 10, cor[0] + 10, cor[1] + 10) # Small circle | |
| ) | |
| #Assign required Bluebeam metadata | |
| annot.set_colors(stroke=clr, fill=(1, 1, 1)) # Set stroke color and fill white | |
| annot.set_border(width=2) # Border thickness | |
| annot.set_opacity(1) # Fully visible | |
| #Set annotation properties for Bluebeam Count detection | |
| annot.set_info("name", lbl) # Unique name for each count | |
| annot.set_info("subject", "Count") #Bluebeam uses "Count" for Count markups | |
| annot.set_info("title", lbl) # Optional | |
| annot.update() # Apply changes | |
| #Save modified PDF to a variable instead of a file | |
| output_stream = io.BytesIO() | |
| pdf_document.save(output_stream) | |
| pdf_document.close() | |
| return output_stream.getvalue() # Return the modified PDF as bytes | |
| ''' | |
def add_bluebeam_count_annotations(pdf_bytes, locations):
    """Draw one count-style circle per located door on the first PDF page.

    Args:
        pdf_bytes: PDF content as a bytes-like object.
        locations: sequence of tuples with 3, 4 or 5 fields laid out as
            ``(coords, label, color, ...)``: ``coords`` is a list of
            ``(x, y)`` points and ``color`` an RGB triple in 0..255. Any
            fourth and fifth field is ignored here. Entries of any other
            length are skipped.

    Returns:
        bytes of the PDF with the annotations added.
    """
    pdf_document = fitz.open("pdf", io.BytesIO(pdf_bytes).read())  # in-memory open
    try:
        page = pdf_document[0]  # only the first page is annotated
        if len(locations) > 0:  # guard: the debug line must not fail on an empty list
            print(f"length of locations 0 from not sec presence: {len(locations[0])}")
        for loc in locations:
            if len(loc) not in (3, 4, 5):
                continue
            coor, lbl, clr = loc[0], loc[1], loc[2]
            stroke = (clr[0] / 255, clr[1] / 255, clr[2] / 255)  # PDF colours are 0..1
            for cor in coor:
                # 20x20 circle centred on the located point
                annot = page.add_circle_annot(
                    fitz.Rect(cor[0] - 10, cor[1] - 10, cor[0] + 10, cor[1] + 10)
                )
                annot.set_colors(stroke=stroke, fill=(1, 1, 1))
                annot.set_border(width=2)
                annot.set_opacity(1)
                # set_info() takes keyword fields; a positional
                # ("subject", "Count") call only ever sets the content.
                annot.set_info(content=lbl, title=lbl, subject="Count")
                annot.update()
        output_stream = io.BytesIO()
        pdf_document.save(output_stream)
    finally:
        pdf_document.close()
    return output_stream.getvalue()
def get_user_input(user_words):
    """Collect the leading element of each entry of ``user_words``.

    Args:
        user_words: iterable of indexable items (tuples, lists, strings).

    Returns:
        list holding ``item[0]`` for each item, in order.
    """
    return [entry[0] for entry in user_words]
def modify_author_in_pypdf2(pdf_bytes, new_authors):
    """Rewrite the author (``/T``) entry of the annotations in a PDF.

    Annotations are visited page by page in ``/Annots`` order, starting with
    the very first annotation of the document, whether or not it was already
    present in the input. The i-th annotation receives ``new_authors[i]``;
    once the list is used up, every remaining annotation receives its last
    element. With an empty list no annotation is changed.

    Args:
        pdf_bytes: PDF content as bytes.
        new_authors: sequence of author strings.

    Returns:
        bytes of the PDF re-written through PyPDF2.
    """
    reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
    writer = PyPDF2.PdfWriter()
    next_author = 0  # position in new_authors, shared across pages
    for page in reader.pages:
        annotation_refs = page["/Annots"] if "/Annots" in page else ()
        for ref in annotation_refs:
            annotation = ref.get_object()
            if not len(new_authors):
                break
            if next_author >= len(new_authors):
                author = new_authors[-1]  # list exhausted: repeat the last one
            else:
                author = new_authors[next_author]
                next_author += 1
            annotation.update({"/T": TextStringObject(author)})
        writer.add_page(page)
    buffer = io.BytesIO()
    writer.write(buffer)
    return buffer.getvalue()
| from PyPDF2 import PdfReader, PdfWriter | |
def merge_pdf_bytes_list(pdfs):
    """Concatenate several in-memory PDFs into a single PDF.

    Args:
        pdfs: iterable of PDF documents, each given as bytes.

    Returns:
        bytes of one PDF containing all pages of the inputs, in input order.
    """
    merged = PdfWriter()
    for blob in pdfs:
        for page in PdfReader(io.BytesIO(blob)).pages:
            merged.add_page(page)
    sink = io.BytesIO()
    merged.write(sink)
    return sink.getvalue()
def process_pdf(input_pdf_path, output_pdf_path, locations, new_authors):
    """Annotate a plan PDF and set the annotation authors.

    Args:
        input_pdf_path: the PDF either as a bytes-like object or as a
            filesystem path to read it from.
        output_pdf_path: unused; kept for signature compatibility (nothing is
            written to disk).
        locations: located entries forwarded to
            ``add_bluebeam_count_annotations``.
        new_authors: author strings forwarded to ``modify_author_in_pypdf2``.

    Returns:
        bytes of the annotated PDF.
    """
    # The parameter is named like a path but used to be treated as bytes
    # only; accept both.
    if isinstance(input_pdf_path, (bytes, bytearray, memoryview)):
        original_pdf_bytes = input_pdf_path
    else:
        with open(input_pdf_path, "rb") as file:
            original_pdf_bytes = file.read()
    annotated_pdf_bytes = add_bluebeam_count_annotations(original_pdf_bytes, locations)
    return modify_author_in_pypdf2(annotated_pdf_bytes, new_authors)
def process_pdf_secondary(input_pdf_path, output_pdf_path, locations, new_authors, main_info, secondary_info):
    """Annotate a plan PDF (secondary-info layout) and set annotation authors.

    Args:
        input_pdf_path: the PDF as ``bytes``, or a path that is opened and
            read in binary mode.
        output_pdf_path: not used by this function.
        locations: located entries forwarded to
            ``add_bluebeam_count_annotations_secondary``.
        new_authors: author strings forwarded to ``modify_author_in_pypdf2``.
        main_info: selected main columns.
        secondary_info: selected secondary columns.

    Returns:
        bytes of the annotated PDF.
    """
    pdf_payload = input_pdf_path
    if not isinstance(pdf_payload, bytes):
        with open(pdf_payload, "rb") as handle:
            pdf_payload = handle.read()
    # Circles first, then the author field of every annotation.
    with_circles = add_bluebeam_count_annotations_secondary(
        pdf_payload, locations, main_info, secondary_info
    )
    return modify_author_in_pypdf2(with_circles, new_authors)
def process_pdf(input_pdf_path, output_pdf_path, locations, new_authors):
    """Annotate a plan PDF and set the annotation authors.

    Args:
        input_pdf_path: the PDF as ``bytes``, or a path that is opened and
            read in binary mode.
        output_pdf_path: not used by this function.
        locations: located entries forwarded to
            ``add_bluebeam_count_annotations``.
        new_authors: author strings forwarded to ``modify_author_in_pypdf2``.

    Returns:
        bytes of the annotated PDF.
    """
    pdf_payload = input_pdf_path
    if not isinstance(pdf_payload, bytes):
        with open(pdf_payload, "rb") as handle:
            pdf_payload = handle.read()
    # Circles first, then the author field of every annotation.
    with_circles = add_bluebeam_count_annotations(pdf_payload, locations)
    return modify_author_in_pypdf2(with_circles, new_authors)
def mainRun(schedule, plan, searcharray):
    """Annotate every plan PDF with the doors found for each search request.

    For each plan, the search requests are applied one after another: the
    output of one request is the input of the next, so the final PDF of a
    plan carries the annotations of all requests. The per-plan results are
    merged into one document.

    Args:
        schedule: door-schedule source handed to ``extract_tables``.
        plan: non-empty sequence of plan PDFs (``plan[0]`` is read
            unconditionally). Each one is opened as an in-memory PDF stream.
        searcharray: sequence of search requests. Index 4 (fire rating) and
            index 5 (acoustic rating) of a request decide whether the
            secondary-information path is used.

    Returns:
        tuple ``(annotatedimg, doc2, list1, repeated_labels, not_found)``:
        ``annotatedimg`` is a BGR image array of the first merged page;
        ``doc2`` is the merged document, left open for the caller;
        ``list1`` is a DataFrame with columns content/id/subject/color, one
        row per annotation that has a stroke or fill colour;
        ``repeated_labels`` comes from the last processed request (empty
        list if there was none); ``not_found`` is always an empty list.
    """
    #print(type(plan))
    eltype = type(plan)
    print(f"el type beta3 variable plan:: {eltype}")
    len_plan = len(plan)
    print(f"length of the plan's array is: {len_plan}")
    p1_type = type(plan[0])
    print(f"el mawgood fe p[0]: {p1_type}")
    print(f"search array: {searcharray}")

    dfs = extract_tables(schedule)
    pdfs = []
    repeated_labels = []  # stays empty when there is no search request at all
    for p in plan:
        # Output of the previous request; starts as the untouched plan so a
        # plan with no requests is passed through instead of raising.
        # NOTE(review): modify_author_in_pypdf2 numbers annotations from the
        # first one in the file, so authors written by an earlier request are
        # reassigned by the next one - confirm this is intended.
        current_pdf = p
        for user_input in searcharray:
            secondary_presence = bool(user_input[4] or user_input[5])
            if secondary_presence:
                main_info_, secondary_info_ = separate_main_secondary(user_input)
                main_info = [item for item in main_info_ if item]
                secondary_info = [item for item in secondary_info_ if item]
                print("feh secondary information")
                if user_input[4]:
                    print("Fire rate mawgooda")
                if user_input[5]:
                    print("Acoustic Rate mawgooda")
            else:
                print("mafeesh secondary information")

            selected_columns_combined = get_selected_columns_all(dfs, user_input)
            kelma = get_st_op_pattern(selected_columns_combined, user_input)
            col_dict = get_similar_colors_all(selected_columns_combined)
            flattened_list = get_flattened_tuples_list_all(col_dict)
            # Text is always taken from the original plan, re-read per request.
            plan_texts = read_text(p)
            # Schedules without dimensions (width and height) print no widths.
            no_dimensions = selected_columns_combined.shape[1] == 2

            if secondary_presence:
                locations, _ = get_word_locations_plan_secondary(
                    flattened_list, plan_texts, main_info, secondary_info
                )
                new_data3 = get_cleaned_data_secondary(locations, main_info, secondary_info)
                repeated_labels = get_repeated_labels(locations)
                width_info_tobeprinted, secondary_tobeprinted = get_width_info_tobeprinted_secondary(
                    new_data3, main_info, secondary_info
                )
                if kelma is None:
                    widths = width_info_tobeprinted
                else:
                    cleaned_width = get_cleaned_width(width_info_tobeprinted)
                    widths = get_widths_bb_format(cleaned_width, kelma)
                if no_dimensions:
                    widths = []
                secondary_printed_clean = get_secondary_tobeprinted_clean(
                    selected_columns_combined, secondary_tobeprinted, secondary_info
                )
                all_print = mix_width_secondary(widths, secondary_printed_clean)
                current_pdf = process_pdf_secondary(
                    current_pdf, "final_output_multiple_input_new2.pdf",
                    new_data3, all_print, main_info, secondary_info
                )
            else:
                locations, _ = get_word_locations_plan(flattened_list, plan_texts)
                new_data = get_cleaned_data(locations)
                repeated_labels = get_repeated_labels(locations)
                width_info_tobeprinted = get_width_info_tobeprinted(new_data)
                if kelma is None:
                    widths = width_info_tobeprinted
                else:
                    cleaned_width = get_cleaned_width(width_info_tobeprinted)
                    widths = get_widths_bb_format(cleaned_width, kelma)
                if no_dimensions:
                    widths = []
                current_pdf = process_pdf(
                    current_pdf, "final_output_width_trial.pdf", new_data, widths
                )
        pdfs.append(current_pdf)

    merged_pdf = merge_pdf_bytes_list(pdfs)
    print(f"number of pges of merged_pdf is {len(merged_pdf)} and its type is {type(merged_pdf)}")
    # NOTE(review): the misses reported by the location helpers are not
    # propagated; the returned list has always been reset here.
    not_found = []
    doc2 = fitz.open('pdf', merged_pdf)
    len_doc2 = len(doc2)
    print(f"number of pges of doc2 is {len_doc2} and its type is {type(doc2)}")

    # Preview image: first merged page rendered and converted RGB -> BGR.
    page = doc2[0]
    pix = page.get_pixmap()
    pl = Image.frombytes('RGB', [pix.width, pix.height], pix.samples)
    img = np.array(pl)
    annotatedimg = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)

    list1 = pd.DataFrame(columns=['content', 'id', 'subject', 'color'])
    for page in doc2:
        for annot in page.annots():
            annot_color = annot.colors
            if annot_color is None:
                continue
            # Stroke wins over fill; annotations with neither are skipped
            # (they used to hit an unbound or stale colour key).
            rgb = annot_color.get('stroke') or annot_color.get('fill')
            if not rgb:
                continue
            x, y, z = int(rgb[0] * 255), int(rgb[1] * 255), int(rgb[2] * 255)
            list1.loc[len(list1)] = [
                annot.info['content'], annot.info['id'], annot.info['subject'], [x, y, z]
            ]
    return annotatedimg, doc2, list1, repeated_labels, not_found