| """ | |
| Utilities module for DigiTwin Analytics | |
| Contains common functions, decorators, and data processing utilities | |
| """ | |
| import logging | |
| import pandas as pd | |
| from functools import wraps | |
| from PyPDF2 import PdfReader | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain.schema import Document as LCDocument | |
| import streamlit as st | |
| from config import ( | |
| NI_keywords, NC_keywords, module_keywords, rack_keywords, | |
| living_quarters_keywords, flare_keywords, fwd_keywords, hexagons_keywords, | |
| NI_keyword_map, NC_keyword_map | |
| ) | |
| import matplotlib.patches as patches | |
| import math | |
| import matplotlib.transforms as transforms | |
# PAZ-specific keywords for data processing
paz_module_keywords = ['P1', 'P2', 'P3', 'P4', 'P5', 'P6', 'P7', 'P8',
                       'S1', 'S2', 'S3', 'S4', 'S5', 'S6', 'S7', 'S8']
paz_rack_keywords = ['R1', 'R2', 'R3', 'R4', 'R5', 'R6']

# PAZ keyword mapping for preprocessing (an identity mapping, since PAZ
# labels are already in canonical form)
paz_keyword_map = {kw: kw for kw in paz_module_keywords + paz_rack_keywords}
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- DECORATORS ---
def log_execution(func):
    """Decorator that logs function entry, success, and failure."""
    @wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        logger.info(f"Executing {func.__name__} with args: {args}, kwargs: {kwargs}")
        try:
            result = func(*args, **kwargs)
            logger.info(f"{func.__name__} executed successfully")
            return result
        except Exception as e:
            logger.error(f"Error in {func.__name__}: {str(e)}")
            raise
    return wrapper
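
# Illustrative usage of log_execution (the function below is hypothetical):
#
#     @log_execution
#     def load_notifications(path):
#         return pd.read_excel(path)
#
# Entry, success, and failure are logged; exceptions are re-raised unchanged.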
# --- DATA PROCESSING FUNCTIONS ---
def parse_pdf(file):
    """Parse a PDF file and return its concatenated text content."""
    reader = PdfReader(file)
    # Extract each page's text once and skip pages with no extractable text
    return "\n".join(text for page in reader.pages if (text := page.extract_text()))
@st.cache_resource
def build_faiss_vectorstore(_docs):
    """Build a FAISS vectorstore from documents, cached across Streamlit reruns.

    The leading underscore on `_docs` tells Streamlit not to hash the argument.
    """
    embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    chunks = []
    for i, doc in enumerate(_docs):
        for chunk in splitter.split_text(doc.page_content):
            chunks.append(LCDocument(page_content=chunk, metadata={"source": f"doc_{i}"}))
    return FAISS.from_documents(chunks, embeddings)
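
# Sketch of intended usage (assumes `docs` is a list of LCDocument objects,
# e.g. the parsed PDFs returned by process_uploaded_files below):
#
#     vectorstore = build_faiss_vectorstore(docs)
#     hits = vectorstore.similarity_search("flare tip inspection findings", k=3)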
def preprocess_keywords(description):
    """Normalize a description string before keyword extraction."""
    description = str(description).upper()
    # Collapse living-quarters variants into the canonical 'LQ' label
    for lq_variant in living_quarters_keywords:
        if lq_variant != 'LQ':
            description = description.replace(lq_variant, 'LQ')
    # Handle CLV module keywords: prefix a bare module number with its letter,
    # but only if the full module label is not already present (otherwise
    # replacing the number would duplicate the prefix)
    for module in module_keywords:
        number = module[1:]
        if number in description and module not in description:
            description = description.replace(number, module)
    # PAZ module and rack keywords (P1-P8, S1-S8, R1-R6) are already in
    # canonical form, so no replacement is needed for them
    for original, grouped in {**NI_keyword_map, **NC_keyword_map}.items():
        description = description.replace(original, grouped)
    return description
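
# Worked example (the keyword lists here are assumptions, not the real config
# values): with living_quarters_keywords = ['LQ', 'LIVING QUARTERS'] and
# module_keywords = ['M110'],
#     preprocess_keywords("Corrosion at 110 near living quarters")
# returns "CORROSION AT M110 NEAR LQ".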
def extract_ni_nc_keywords(row, notif_type_col, desc_col):
    """Extract NI or NC keywords from a notification row."""
    description = preprocess_keywords(row[desc_col])
    notif_type = row[notif_type_col]
    keywords = [kw for kw in (NI_keywords if notif_type == 'NI' else NC_keywords) if kw in description]
    return ', '.join(keywords) if keywords else 'None'
def extract_location_keywords(row, desc_col, keyword_list):
    """Extract location keywords from a notification row."""
    description = preprocess_keywords(row[desc_col])
    # Living quarters are reported as a single 'LQ' flag rather than a list
    if keyword_list == living_quarters_keywords:
        return 'LQ' if any(kw in description for kw in living_quarters_keywords) else 'None'
    locations = [kw for kw in keyword_list if kw in description]
    return ', '.join(locations) if locations else 'None'
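
# Example on a single row (keyword hits depend on the config lists, so the
# outputs shown are illustrative):
#
#     row = pd.Series({'Notifictn type': 'NI', 'Description': 'Valve leak in LQ'})
#     extract_ni_nc_keywords(row, 'Notifictn type', 'Description')            # e.g. 'LEAK'
#     extract_location_keywords(row, 'Description', living_quarters_keywords) # 'LQ'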
def create_pivot_table(df, index, columns, aggfunc='size', fill_value=0):
    """Explode a comma-separated keyword column and pivot counts against `index`.

    `columns` names the dataframe column holding the comma-separated keywords.
    """
    df_exploded = df.assign(Keywords=df[columns].str.split(', ')).explode('Keywords')
    df_exploded = df_exploded[df_exploded['Keywords'] != 'None']
    pivot = pd.pivot_table(df_exploded, index=index, columns='Keywords', aggfunc=aggfunc, fill_value=fill_value)
    return pivot
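
# Typical call, counting notifications per FPSO and keyword (column names match
# those produced by process_uploaded_files below):
#
#     pivot = create_pivot_table(df, index='FPSO', columns='Extracted_Keywords')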
def apply_fpso_colors(df):
    """Return a frame of CSS background-color styles keyed by FPSO row label."""
    styles = pd.DataFrame('', index=df.index, columns=df.columns)
    color_map = {'GIR': '#FFA07A', 'DAL': '#ADD8E6', 'PAZ': '#D8BFD8', 'CLV': '#90EE90'}
    for fpso, color in color_map.items():
        if fpso in df.index:
            styles.loc[fpso] = f'background-color: {color}'
    return styles
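
# apply_fpso_colors returns a same-shaped frame of CSS strings, so it plugs
# into pandas Styler with axis=None (style the table as a whole):
#
#     st.dataframe(pivot.style.apply(apply_fpso_colors, axis=None))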
def process_uploaded_files(files):
    """Process uploaded files and return (PDF documents, Excel dataframe)."""
    pdf_files = [f for f in files if f.type == "application/pdf"]
    excel_files = [f for f in files if f.type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"]

    # Process PDF files
    parsed_docs = []
    if pdf_files:
        parsed_docs = [LCDocument(page_content=parse_pdf(f), metadata={"name": f.name}) for f in pdf_files]
        st.sidebar.success(f"{len(parsed_docs)} PDF reports indexed.")

    # Process Excel files
    df = None
    if excel_files:
        try:
            # Use the first Excel file if multiple are uploaded
            uploaded_xlsx = excel_files[0]
            df = pd.read_excel(uploaded_xlsx, sheet_name='Global Notifications')
            df.columns = df.columns.str.strip()
            expected_columns = {
                'Notifictn type': 'Notifictn type',
                'Created on': 'Created on',
                'Description': 'Description',
                'FPSO': 'FPSO'
            }
            missing_columns = [col for col in expected_columns.values() if col not in df.columns]
            if missing_columns:
                st.error(f"Missing columns: {missing_columns}")
                return parsed_docs, None
            df = df[list(expected_columns.values())]
            df.columns = list(expected_columns.keys())
            df = df[df['FPSO'].isin(['GIR', 'DAL', 'PAZ', 'CLV'])]
            df['Extracted_Keywords'] = df.apply(extract_ni_nc_keywords, axis=1, args=('Notifictn type', 'Description'))
            for loc_type, keywords in [
                ('Modules', module_keywords + paz_module_keywords),
                ('Racks', rack_keywords + paz_rack_keywords),
                ('LivingQuarters', living_quarters_keywords),
                ('Flare', flare_keywords),
                ('FWD', fwd_keywords),
                ('HeliDeck', hexagons_keywords)
            ]:
                df[f'Extracted_{loc_type}'] = df.apply(extract_location_keywords, axis=1, args=('Description', keywords))
            st.sidebar.success("Excel file processed successfully.")
        except Exception as e:
            st.error(f"Error processing Excel: {e}")
            return parsed_docs, None
    return parsed_docs, df
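
# Sketch of how the app side typically wires this up (widget label assumed):
#
#     files = st.sidebar.file_uploader("Upload reports", type=["pdf", "xlsx"],
#                                      accept_multiple_files=True)
#     if files:
#         parsed_docs, df = process_uploaded_files(files)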
def add_rectangle(ax, xy, width, height, **kwargs):
    """Add an axis-aligned rectangle patch to the axes."""
    rectangle = patches.Rectangle(xy, width, height, **kwargs)
    ax.add_patch(rectangle)

def add_chamfered_rectangle(ax, xy, width, height, chamfer, **kwargs):
    """Add a rectangle with all four corners chamfered by `chamfer`."""
    x, y = xy
    coords = [
        (x + chamfer, y),
        (x + width - chamfer, y),
        (x + width, y + chamfer),
        (x + width, y + height - chamfer),
        (x + width - chamfer, y + height),
        (x + chamfer, y + height),
        (x, y + height - chamfer),
        (x, y + chamfer)
    ]
    polygon = patches.Polygon(coords, closed=True, **kwargs)
    ax.add_patch(polygon)

def add_hexagon(ax, xy, radius, **kwargs):
    """Add a regular hexagon centered at `xy` with the given circumradius."""
    x, y = xy
    vertices = [(x + radius * math.cos(2 * math.pi * n / 6), y + radius * math.sin(2 * math.pi * n / 6)) for n in range(6)]
    hexagon = patches.Polygon(vertices, closed=True, **kwargs)
    ax.add_patch(hexagon)

def add_fwd(ax, xy, width, height, **kwargs):
    """Add the FWD (forward) trapezoid, rotated 90 degrees, with its label."""
    x, y = xy
    top_width = width * 0.80
    coords = [
        (0, 0),
        (width, 0),
        (width - (width - top_width) / 2, height),
        ((width - top_width) / 2, height)
    ]
    trapezoid = patches.Polygon(coords, closed=True, **kwargs)
    t = transforms.Affine2D().rotate_deg(90).translate(x, y)
    trapezoid.set_transform(t + ax.transData)
    ax.add_patch(trapezoid)
    text_t = transforms.Affine2D().rotate_deg(90).translate(x + height / 2, y + width / 2)
    ax.text(0, -1, "FWD", ha='center', va='center', fontsize=7, weight='bold', transform=text_t + ax.transData)
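
if __name__ == "__main__":
    # Minimal smoke test of the drawing helpers; sizes and coordinates are
    # arbitrary and purely illustrative.
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots(figsize=(8, 4))
    add_rectangle(ax, (0, 0), 4, 2, edgecolor='black', facecolor='none')
    add_chamfered_rectangle(ax, (5, 0), 3, 2, chamfer=0.3, facecolor='#90EE90')
    add_hexagon(ax, (10, 1), 1, facecolor='#D8BFD8')
    add_fwd(ax, (13, 0), 2, 1, facecolor='#ADD8E6')
    ax.set_xlim(-1, 16)
    ax.set_ylim(-3, 4)
    ax.set_aspect('equal')
    plt.show()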