from typing import Dict, Any
# import sparknlp
# from sparknlp.base import DocumentAssembler, Pipeline
# from sparknlp.annotator import MultiDateMatcher
# from pyspark.sql.types import StringType
from langchain_community.embeddings import HuggingFaceEmbeddings
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import re
import gradio as gr
import spacy
import dateparser

# Initialize Spark NLP
# spark = sparknlp.start()

# Spark NLP Pipeline for Date Extraction
# document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
# multi_date_matcher = MultiDateMatcher().setInputCols("document").setOutputCol("dates").setOutputFormat("yyyy/MM/dd")
# nlpPipeline = Pipeline(stages=[document_assembler, multi_date_matcher])

# Embeddings setup
embedder = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

PROBLEM_LABELS = ["battery problem", "communication problem", "sensor failure", "general maintenance"]
SENSORS = [
    "Atmospheric pressure", "Depth of water", "Electrical conductivity of precipitation",
    "Electrical conductivity of water", "Lightning distance", "Lightning events",
    "Shortwave radiation", "Soil moisture content", "Soil temperature",
    "Surface air temperature", "Vapor pressure", "Wind gusts", "Wind speed",
    "Temperature of humidity sensor", "X-axis level", "Y-axis level",
    "Logger battery percentage", "Logger reference pressure", "Logger temperature",
    "Cumulative precipitation", "Water level", "Water velocity", "Precipitation",
    "Relative humidity", "Wind direction", "Soil electrical conductivity",
    "Water temperature", "Water discharge", "Matric potential",
    "Precipitation drop count", "Precipitation tip count", "Tilt angle",
]  # simplified list
problem_embs = np.array([embedder.embed_query(lbl) for lbl in PROBLEM_LABELS])
sensor_embs = np.array([embedder.embed_query(sensor) for sensor in SENSORS])
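# With all-MiniLM-L6-v2, each embedding is 384-dimensional, so for the lists
# above problem_embs has shape (4, 384) and sensor_embs (32, 384); both are
# computed once at startup and reused for every incoming log.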
# Helper Functions
def softmax(x):
    # NOTE: defined but not currently called anywhere in this script.
    e_x = np.exp(x - np.max(x))
    return e_x / e_x.sum()

def normalize_scores(scores: np.ndarray) -> np.ndarray:
    """Min-max normalize scores to the [0, 1] range."""
    min_s, max_s = np.min(scores), np.max(scores)
    if max_s == min_s:  # Avoid division by zero
        return np.zeros_like(scores)
    return (scores - min_s) / (max_s - min_s)
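# A quick illustration of how min-max normalization interacts with the 0.8
# threshold used in classify_log (numbers here are made up for the example):
#
#   normalize_scores(np.array([0.10, 0.35, 0.42]))  # -> [0.0, 0.78125, 1.0]
#
# Because the best-scoring label always normalizes to exactly 1.0, at least one
# label clears the threshold whenever the raw scores are not all identical, so
# the top-1 fallback below only matters in that degenerate case.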
def classify_log(text: str) -> Dict[str, Any]:
    vec = np.array(embedder.embed_query(text))
    # Compute cosine similarities against the precomputed label embeddings
    problem_sim = cosine_similarity([vec], problem_embs)[0]
    sensor_sim = cosine_similarity([vec], sensor_embs)[0]
    # Normalize similarities to [0, 1]
    problem_scores = normalize_scores(problem_sim)
    sensor_scores = normalize_scores(sensor_sim)
    threshold = 0.8  # normalized-similarity threshold
    # Select labels above the threshold
    problem_labels = [PROBLEM_LABELS[i] for i, score in enumerate(problem_scores) if score >= threshold]
    sensor_labels = [SENSORS[i] for i, score in enumerate(sensor_scores) if score >= threshold]
    # If no label passes the threshold, fall back to the top-1 label
    if not problem_labels:
        problem_labels = [PROBLEM_LABELS[np.argmax(problem_scores)]]
    if not sensor_labels:
        sensor_labels = [SENSORS[np.argmax(sensor_scores)]]
    # Heuristic keyword flags: "pr-1" for low/blocked readings, "pr-2" for high/over-range readings
    flags = []
    if re.search(r"(low|under|below|blocked|clogged|zero)", text, re.I):
        flags.append("pr-1")
    if re.search(r"(high|over|exceed|too much)", text, re.I):
        flags.append("pr-2")
    return {
        "problem_labels": problem_labels,
        "sensor_labels": sensor_labels,
        "flags": flags,
    }
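# Illustrative call (hypothetical log text; the labels actually returned depend
# on the embedding model's similarity scores, so treat this as a sketch rather
# than a guaranteed output):
#
#   classify_log("Logger battery reading is below 20% and dropping")
#   # -> {"problem_labels": ["battery problem"],
#   #     "sensor_labels": ["Logger battery percentage"],
#   #     "flags": ["pr-1"]}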
# Load the spaCy model once at import time rather than on every call.
nlp = spacy.load('en_core_web_lg')

def date_extractor(text):
    doc = nlp(text)
    # Guard against dotted numeric strings (e.g. 10.0.0.1) that spaCy may
    # tag as DATE entities.
    dotted_quad = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')
    dates = set(ent.text for ent in doc.ents if ent.label_ == 'DATE')
    filtered_dates = set(d for d in dates if not dotted_quad.match(d))
    # Parse dates using dateparser
    parsed_dates = []
    for date_str in filtered_dates:
        parsed = dateparser.parse(date_str, settings={"PREFER_DAY_OF_MONTH": "first"})
        if parsed:
            # If the original string does not specify a day, force day=1
            if not re.search(r"\b\d{1,2}\b", date_str):
                parsed = parsed.replace(day=1)
            parsed_dates.append(parsed.strftime('%Y-%m-%d'))
    return sorted(parsed_dates, reverse=True)
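# Illustrative call (assumes en_core_web_lg tags both mentions as DATE):
#
#   date_extractor("Replaced the probe on 14 March 2023; last check June 2022")
#   # -> ['2023-03-14', '2022-06-01']  (month-only dates default to day 1)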
# Unified function to process logs
def process_log(input_text: str) -> Dict[str, Any]:
    extracted_dates = date_extractor(input_text)
    classification = classify_log(input_text)
    classification["extracted_dates"] = extracted_dates
    return classification
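# End-to-end sketch (hypothetical input; the exact output depends on the spaCy
# and embedding models actually loaded):
#
#   process_log("Water level sensor clogged since 3 May 2024")
#   # -> {"problem_labels": [...], "sensor_labels": ["Water level"],
#   #     "flags": ["pr-1"], "extracted_dates": ["2024-05-03"]}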
# Spark NLP alternative, kept for reference (requires the commented-out setup above):
# def process_log(input_text: str) -> Dict[str, Any]:
#     spark_df = spark.createDataFrame([input_text], StringType()).toDF("text")
#     result = nlpPipeline.fit(spark_df).transform(spark_df)
#     extracted_dates = result.selectExpr("dates.result as extracted_dates").collect()[0][0]
#     classification = classify_log(input_text)
#     classification["extracted_dates"] = extracted_dates
#     return classification
# Gradio Interface
demo = gr.Interface(
    fn=process_log,
    inputs=gr.Textbox(lines=5, label="Technician Log", placeholder="Enter technician log here..."),
    outputs=gr.JSON(label="Structured Output"),
    title="Technician Log Analyzer",
    description="Analyze technician logs to extract dates and classify problems and sensors.",
)

if __name__ == "__main__":
    demo.launch(debug=True)