from typing import Dict, Any
# import sparknlp
# from sparknlp.base import DocumentAssembler, Pipeline
# from sparknlp.annotator import MultiDateMatcher
# from pyspark.sql.types import StringType
from langchain_community.embeddings import HuggingFaceEmbeddings
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import re
import gradio as gr
import spacy
import dateparser
# Initialize Spark NLP
# spark = sparknlp.start()
# Spark NLP Pipeline for Date Extraction
# document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
# multi_date_matcher = MultiDateMatcher().setInputCols("document").setOutputCol("dates").setOutputFormat("yyyy/MM/dd")
# nlpPipeline = Pipeline(stages=[document_assembler, multi_date_matcher])
# Embeddings setup
embedder = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
PROBLEM_LABELS = ["battery problem", "communication problem", "sensor failure", "general maintenance"]
SENSORS = [
"Atmospheric pressure", "Depth of water", "Electrical conductivity of precipitation",
"Electrical conductivity of water", "Lightning distance", "Lightning events",
"Shortwave radiation", "Soil moisture content", "Soil temperature",
"Surface air temperature", "Vapor pressure", "Wind gusts", "Wind speed",
"Temperature of humidity sensor", "X-axis level", "Y-axis level",
"Logger battery percentage", "Logger reference pressure", "Logger temperature",
"Cumulative precipitation", "Water level", "Water velocity", "Precipitation",
"Relative humidity", "Wind direction", "Soil electrical conductivity",
"Water temperature", "Water discharge", "Matric potential",
"Precipitation drop count", "Precipitation tip count", "Tilt angle",
] # simplified list
# Precompute embeddings for all labels once at startup
problem_embs = np.array([embedder.embed_query(lbl) for lbl in PROBLEM_LABELS])
sensor_embs = np.array([embedder.embed_query(sensor) for sensor in SENSORS])
# Helper Functions
def softmax(x):
    """Numerically stable softmax over a score vector (currently unused helper)."""
    e_x = np.exp(x - np.max(x))
    return e_x / e_x.sum()
def normalize_scores(scores: np.ndarray) -> np.ndarray:
    """Min-max normalize scores to the [0, 1] range."""
    min_s, max_s = np.min(scores), np.max(scores)
    if max_s == min_s:  # Avoid division by zero when all scores are equal
        return np.zeros_like(scores)
    return (scores - min_s) / (max_s - min_s)
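# Quick sanity check (illustrative, not from the original code): min-max scaling maps
# the smallest score to 0 and the largest to 1, e.g.
#   normalize_scores(np.array([0.2, 0.5, 0.8]))  ->  array([0. , 0.5, 1. ])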
def classify_log(text: str) -> Dict[str, Any]:
    """Classify a technician log against problem and sensor labels via embedding similarity."""
    vec = np.array(embedder.embed_query(text))
    # Compute cosine similarities against the precomputed label embeddings
    problem_sim = cosine_similarity([vec], problem_embs)[0]
    sensor_sim = cosine_similarity([vec], sensor_embs)[0]
    # Normalize similarities to [0, 1]
    problem_scores = normalize_scores(problem_sim)
    sensor_scores = normalize_scores(sensor_sim)
    threshold = 0.8  # similarity threshold (applied to the normalized scores)
    # Select labels above the threshold
    problem_labels = [PROBLEM_LABELS[i] for i, score in enumerate(problem_scores) if score >= threshold]
    sensor_labels = [SENSORS[i] for i, score in enumerate(sensor_scores) if score >= threshold]
    # If no label passes the threshold, fall back to the top-1 label
    if not problem_labels:
        problem_labels = [PROBLEM_LABELS[np.argmax(problem_scores)]]
    if not sensor_labels:
        sensor_labels = [SENSORS[np.argmax(sensor_scores)]]
    # Keyword flags for out-of-range readings
    flags = []
    if re.search(r"(low|under|below|blocked|clogged|zero)", text, re.I):
        flags.append("pr-1")
    if re.search(r"(high|over|exceed|too much)", text, re.I):
        flags.append("pr-2")
    return {
        "problem_labels": problem_labels,
        "sensor_labels": sensor_labels,
        "flags": flags,
    }
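# Illustrative call (the returned labels depend on the embedding model, so treat
# this as a sketch rather than a guaranteed output):
#   classify_log("Battery voltage reading is too low on the water level station")
#   -> {"problem_labels": ["battery problem"], "sensor_labels": ["Water level"], "flags": ["pr-1"]}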
# Load the spaCy model once at import time (reloading it on every call is slow)
nlp = spacy.load('en_core_web_lg')

def date_extractor(text):
    """Extract dates from free text with spaCy NER and normalize them with dateparser."""
    doc = nlp(text)
    # Exclude IP-address/version-like strings that spaCy sometimes mislabels as DATE
    ip_like_pattern = re.compile(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')
    dates = set(ent.text for ent in doc.ents if ent.label_ == 'DATE')
    filtered_dates = set(date for date in dates if not ip_like_pattern.match(date))
    # Parse the date strings using dateparser
    parsed_dates = []
    for date_str in filtered_dates:
        parsed = dateparser.parse(date_str, settings={"PREFER_DAY_OF_MONTH": "first"})
        if parsed:
            # If the original string does not specify a day, force day=1
            if not re.search(r"\b\d{1,2}\b", date_str):
                parsed = parsed.replace(day=1)
            parsed_dates.append(parsed.strftime('%Y-%m-%d'))
    # Newest first
    return sorted(parsed_dates, reverse=True)
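# Illustrative call (the extracted spans depend on the spaCy model, so treat this
# as a sketch): a month-only mention gets day=1, and results sort newest first:
#   date_extractor("Replaced the battery on 12 March 2024; next inspection in April 2024")
#   -> ["2024-04-01", "2024-03-12"]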
# Unified function to process logs
def process_log(input_text: str) -> Dict[str, Any]:
    """Run date extraction and classification, returning a single structured result."""
    extracted_dates = date_extractor(input_text)
    classification = classify_log(input_text)
    classification["extracted_dates"] = extracted_dates
    return classification
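# Illustrative end-to-end call (label values depend on the models, so treat this as a sketch):
#   process_log("Sensor reported zero flow on 5 Jan 2024")
#   -> {"problem_labels": [...], "sensor_labels": [...], "flags": ["pr-1"],
#       "extracted_dates": ["2024-01-05"]}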
# Earlier Spark NLP variant, kept for reference:
# def process_log(input_text: str) -> Dict[str, Any]:
#     spark_df = spark.createDataFrame([input_text], StringType()).toDF("text")
#     result = nlpPipeline.fit(spark_df).transform(spark_df)
#     extracted_dates = result.selectExpr("dates.result as extracted_dates").collect()[0][0]
#     classification = classify_log(input_text)
#     classification["extracted_dates"] = extracted_dates
#     return classification
# Gradio Interface
demo = gr.Interface(
    fn=process_log,
    inputs=gr.Textbox(lines=5, label="Technician Log", placeholder="Enter technician log here..."),
    outputs=gr.JSON(label="Structured Output"),
    title="Technician Log Analyzer",
    description="Analyze technician logs to extract dates and classify problems and sensors.",
)
if __name__ == "__main__":
    demo.launch(debug=True)