# inspekta_deck/src/utils.py
"""
Utilities module for DigiTwin Analytics
Contains common functions, decorators, and data processing utilities
"""
import logging
import pandas as pd
from functools import wraps
from PyPDF2 import PdfReader
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.schema import Document as LCDocument
import streamlit as st
from config import (
NI_keywords, NC_keywords, module_keywords, rack_keywords,
living_quarters_keywords, flare_keywords, fwd_keywords, hexagons_keywords,
NI_keyword_map, NC_keyword_map
)
import matplotlib.patches as patches
import math
import matplotlib.transforms as transforms
# PAZ-specific keywords for data processing
paz_module_keywords = ['P1', 'P2', 'P3', 'P4', 'P5', 'P6', 'P7', 'P8', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6', 'S7', 'S8']
paz_rack_keywords = ['R1', 'R2', 'R3', 'R4', 'R5', 'R6']
# PAZ keyword mapping for preprocessing. Currently an identity map (each
# keyword maps to itself); kept as a placeholder for future alias handling,
# mirroring NI_keyword_map / NC_keyword_map.
paz_keyword_map = {
'P1': 'P1', 'P2': 'P2', 'P3': 'P3', 'P4': 'P4', 'P5': 'P5', 'P6': 'P6', 'P7': 'P7', 'P8': 'P8',
'S1': 'S1', 'S2': 'S2', 'S3': 'S3', 'S4': 'S4', 'S5': 'S5', 'S6': 'S6', 'S7': 'S7', 'S8': 'S8',
'R1': 'R1', 'R2': 'R2', 'R3': 'R3', 'R4': 'R4', 'R5': 'R5', 'R6': 'R6'
}
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- DECORATORS ---
def log_execution(func):
"""Decorator to log function execution for debugging"""
@wraps(func)
def wrapper(*args, **kwargs):
logger.info(f"Executing {func.__name__} with args: {args}, kwargs: {kwargs}")
try:
result = func(*args, **kwargs)
logger.info(f"{func.__name__} executed successfully")
return result
except Exception as e:
logger.error(f"Error in {func.__name__}: {str(e)}")
raise
return wrapper
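# Usage sketch for log_execution (illustrative; `add` is a hypothetical
# function, not part of this module):
#
#     @log_execution
#     def add(a, b):
#         return a + b
#
#     add(2, 3)  # logs the call and its success at INFO level, returns 5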
# --- DATA PROCESSING FUNCTIONS ---
@log_execution
def parse_pdf(file):
"""Parse PDF file and extract text content"""
    reader = PdfReader(file)
    # extract_text() can return None or '' for image-only pages; skip those
    page_texts = (page.extract_text() for page in reader.pages)
    return "\n".join(text for text in page_texts if text)
@st.cache_resource
def build_faiss_vectorstore(_docs):
    """Build FAISS vectorstore from documents with caching.

    The leading underscore on `_docs` tells st.cache_resource not to hash
    the (unhashable) document objects when computing the cache key.
    """
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = []
for i, doc in enumerate(_docs):
for chunk in splitter.split_text(doc.page_content):
chunks.append(LCDocument(page_content=chunk, metadata={"source": f"doc_{i}"}))
return FAISS.from_documents(chunks, embeddings)
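# Usage sketch (illustrative document text and query). The returned store
# supports the standard LangChain retrieval calls, e.g. similarity_search:
#
#     docs = [LCDocument(page_content="Rack R3 shows coating damage.",
#                        metadata={"name": "report_1.pdf"})]
#     vectorstore = build_faiss_vectorstore(docs)
#     hits = vectorstore.similarity_search("coating damage on racks", k=2)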
@log_execution
def preprocess_keywords(description):
"""Preprocess description text for keyword extraction"""
description = str(description).upper()
for lq_variant in living_quarters_keywords:
if lq_variant != 'LQ':
description = description.replace(lq_variant, 'LQ')
    # Handle CLV module keywords: prefix bare module numbers (e.g. '110')
    # with their letter code (e.g. 'M110'), unless the full name is present
    for module in module_keywords:
        number = module[1:]
        if number in description and module not in description:
            description = description.replace(number, module)
    # PAZ module and rack keywords ('P1'..'S8', 'R1'..'R6') already appear
    # in canonical form, so they need no replacement here
for original, grouped in {**NI_keyword_map, **NC_keyword_map}.items():
description = description.replace(original, grouped)
return description
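# Example transformation (illustrative; assumes 'LIVING QUARTERS' is in
# living_quarters_keywords, 'M110' is in module_keywords, and no NI/NC
# alias matches):
#
#     preprocess_keywords("Leak near living quarters, module 110")
#     # -> "LEAK NEAR LQ, MODULE M110"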
@log_execution
def extract_ni_nc_keywords(row, notif_type_col, desc_col):
"""Extract NI/NC keywords from notification row"""
description = preprocess_keywords(row[desc_col])
notif_type = row[notif_type_col]
keywords = [kw for kw in (NI_keywords if notif_type == 'NI' else NC_keywords) if kw in description]
return ', '.join(keywords) if keywords else 'None'
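# Example (illustrative row; assumes 'CRACK' appears in NI_keywords):
#
#     row = pd.Series({'Notifictn type': 'NI',
#                      'Description': 'Crack found on deck plate'})
#     extract_ni_nc_keywords(row, 'Notifictn type', 'Description')
#     # -> 'CRACK'  (or 'None' when no configured keyword matches)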
@log_execution
def extract_location_keywords(row, desc_col, keyword_list):
"""Extract location keywords from notification row"""
description = preprocess_keywords(row[desc_col])
if keyword_list == living_quarters_keywords:
return 'LQ' if any(kw in description for kw in living_quarters_keywords) else 'None'
locations = [kw for kw in keyword_list if kw in description]
return ', '.join(locations) if locations else 'None'
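# Example (illustrative row; assumes 'R3' appears in the supplied
# rack_keywords list):
#
#     row = pd.Series({'Description': 'Corrosion under rack R3 piping'})
#     extract_location_keywords(row, 'Description', rack_keywords)
#     # -> 'R3'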
@log_execution
def create_pivot_table(df, index, columns, aggfunc='size', fill_value=0):
    """Build a count pivot table by exploding the comma-separated keyword
    column named by `columns`, ignoring 'None' entries."""
df_exploded = df.assign(Keywords=df[columns].str.split(', ')).explode('Keywords')
df_exploded = df_exploded[df_exploded['Keywords'] != 'None']
pivot = pd.pivot_table(df_exploded, index=index, columns='Keywords', aggfunc=aggfunc, fill_value=fill_value)
return pivot
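# Usage sketch (column names as produced by process_uploaded_files below):
#
#     pivot = create_pivot_table(df, index='FPSO', columns='Extracted_Keywords')
#     # one row per FPSO, one column per keyword, cell = notification count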
@log_execution
def apply_fpso_colors(df):
"""Apply color styling to FPSO dataframe"""
styles = pd.DataFrame('', index=df.index, columns=df.columns)
color_map = {'GIR': '#FFA07A', 'DAL': '#ADD8E6', 'PAZ': '#D8BFD8', 'CLV': '#90EE90'}
for fpso, color in color_map.items():
if fpso in df.index:
styles.loc[fpso] = f'background-color: {color}'
return styles
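# Usage sketch: the signature matches what pandas Styler.apply expects with
# axis=None (whole-DataFrame styling); rows indexed by an FPSO name get that
# FPSO's color, all other cells stay unstyled:
#
#     styled = pivot.style.apply(apply_fpso_colors, axis=None)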
@log_execution
def process_uploaded_files(files):
"""Process uploaded files and return PDF documents and Excel dataframe"""
pdf_files = [f for f in files if f.type == "application/pdf"]
excel_files = [f for f in files if f.type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"]
# Process PDF files
parsed_docs = []
if pdf_files:
parsed_docs = [LCDocument(page_content=parse_pdf(f), metadata={"name": f.name}) for f in pdf_files]
st.sidebar.success(f"{len(parsed_docs)} PDF reports indexed.")
# Process Excel files
df = None
if excel_files:
try:
# Use the first Excel file if multiple are uploaded
uploaded_xlsx = excel_files[0]
df = pd.read_excel(uploaded_xlsx, sheet_name='Global Notifications')
df.columns = df.columns.str.strip()
            # Identity mapping today; kept so source headers can be renamed
            # in one place if the Excel export format changes
            expected_columns = {
'Notifictn type': 'Notifictn type',
'Created on': 'Created on',
'Description': 'Description',
'FPSO': 'FPSO'
}
missing_columns = [col for col in expected_columns.values() if col not in df.columns]
if missing_columns:
st.error(f"Missing columns: {missing_columns}")
return parsed_docs, None
df = df[list(expected_columns.values())]
df.columns = list(expected_columns.keys())
df = df[df['FPSO'].isin(['GIR', 'DAL', 'PAZ', 'CLV'])]
df['Extracted_Keywords'] = df.apply(extract_ni_nc_keywords, axis=1, args=('Notifictn type', 'Description'))
for loc_type, keywords in [
('Modules', module_keywords + paz_module_keywords), ('Racks', rack_keywords + paz_rack_keywords), ('LivingQuarters', living_quarters_keywords),
('Flare', flare_keywords), ('FWD', fwd_keywords), ('HeliDeck', hexagons_keywords)
]:
df[f'Extracted_{loc_type}'] = df.apply(extract_location_keywords, axis=1, args=('Description', keywords))
st.sidebar.success("Excel file processed successfully.")
except Exception as e:
st.error(f"Error processing Excel: {e}")
return parsed_docs, None
return parsed_docs, df
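# Usage sketch inside a Streamlit app (illustrative widget label):
#
#     files = st.sidebar.file_uploader("Upload reports",
#                                      type=["pdf", "xlsx"],
#                                      accept_multiple_files=True)
#     if files:
#         parsed_docs, df = process_uploaded_files(files)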
def add_rectangle(ax, xy, width, height, **kwargs):
    """Draw a plain rectangle patch on the given axes."""
rectangle = patches.Rectangle(xy, width, height, **kwargs)
ax.add_patch(rectangle)
def add_chamfered_rectangle(ax, xy, width, height, chamfer, **kwargs):
    """Draw a rectangle with chamfered (cut-off) corners as a closed polygon."""
x, y = xy
coords = [
(x + chamfer, y),
(x + width - chamfer, y),
(x + width, y + chamfer),
(x + width, y + height - chamfer),
(x + width - chamfer, y + height),
(x + chamfer, y + height),
(x, y + height - chamfer),
(x, y + chamfer)
]
polygon = patches.Polygon(coords, closed=True, **kwargs)
ax.add_patch(polygon)
def add_hexagon(ax, xy, radius, **kwargs):
    """Draw a regular hexagon centered at xy with the given circumradius."""
x, y = xy
vertices = [(x + radius * math.cos(2 * math.pi * n / 6), y + radius * math.sin(2 * math.pi * n / 6)) for n in range(6)]
hexagon = patches.Polygon(vertices, closed=True, **kwargs)
ax.add_patch(hexagon)
def add_fwd(ax, xy, width, height, **kwargs):
    """Draw the FWD (forward) section as a 90-degree-rotated trapezoid with a label."""
x, y = xy
top_width = width * 0.80
coords = [
(0, 0),
(width, 0),
(width - (width - top_width) / 2, height),
((width - top_width) / 2, height)
]
trapezoid = patches.Polygon(coords, closed=True, **kwargs)
t = transforms.Affine2D().rotate_deg(90).translate(x, y)
trapezoid.set_transform(t + ax.transData)
ax.add_patch(trapezoid)
text_t = transforms.Affine2D().rotate_deg(90).translate(x + height / 2, y + width / 2)
ax.text(0, -1, "FWD", ha='center', va='center', fontsize=7, weight='bold', transform=text_t + ax.transData)
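# Usage sketch for the drawing helpers (illustrative coordinates and colors;
# assumes `import matplotlib.pyplot as plt`):
#
#     fig, ax = plt.subplots()
#     add_rectangle(ax, (0, 0), 4, 2, facecolor='#90EE90', edgecolor='black')
#     add_chamfered_rectangle(ax, (5, 0), 4, 2, 0.3, facecolor='#D8BFD8',
#                             edgecolor='black')
#     add_hexagon(ax, (11, 1), 1, facecolor='#ADD8E6', edgecolor='black')
#     add_fwd(ax, (14, 0), 2, 1.5, facecolor='#FFA07A', edgecolor='black')
#     ax.set_xlim(-1, 18); ax.set_ylim(-2, 4); ax.set_aspect('equal')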