Spaces:
Sleeping
Sleeping
| from langchain_mongodb import MongoDBAtlasVectorSearch | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| # from dotenv import load_dotenv | |
| import os | |
| import pymongo | |
| import logging | |
| import nest_asyncio | |
| from langchain.docstore.document import Document | |
| import redis | |
| import threading | |
| import asyncio | |
| import gradio as gr | |
| import pandas as pd | |
| import plotly.express as px | |
| from datetime import datetime, timedelta | |
# config
# nest_asyncio.apply()
logging.basicConfig(level=logging.INFO)

# MongoDB database / collection holding the embedded alerts, and the Redis
# stream the alerts are read from.
database = "AlertSimAndRemediation"
collection = "alert_embed"
stream_name = "alerts"

# Global variables to store alert information.
# They are written by the alert listener and read by the dashboard callback.
latest_alert = "No alerts yet."
alert_count = 0

# embedding model
embedding_args = {
    "model_name": "BAAI/bge-large-en-v1.5",
    "model_kwargs": {"device": "cpu"},
    "encode_kwargs": {"normalize_embeddings": True},
}
embedding_model = HuggingFaceEmbeddings(**embedding_args)

# Mongo Connection (requires MONGO_URI in the environment)
connection = pymongo.MongoClient(os.environ["MONGO_URI"])
alert_collection = connection[database][collection]

# Redis connection (requires REDIS_HOST and REDIS_PWD in the environment).
# The port can be overridden with REDIS_PORT; it defaults to the value that
# used to be hard-coded here so existing deployments keep working unchanged.
r = redis.Redis(
    host=os.environ["REDIS_HOST"],
    password=os.environ["REDIS_PWD"],
    port=int(os.environ.get("REDIS_PORT", "16652")),
)
# Preprocessing
async def create_textual_description(entry_data):
    """Turn one raw alert entry into a sentence plus its decoded fields.

    Args:
        entry_data: Mapping of field name to value for a single alert. Keys
            and values may be ``bytes`` (decoded with the default UTF-8
            codec) or already-decoded ``str``; both forms are accepted.

    Returns:
        A ``(description, entry_dict)`` tuple: the human-readable sentence
        and the field mapping with text keys and values.

    Raises:
        KeyError: If any of ``Category``, ``CreatedAt``, ``Acknowledged``,
            ``Remedy``, ``Severity``, ``Source`` or ``node`` is missing.
    """

    def _as_text(value):
        # Only bytes-like values need decoding; a client configured to
        # decode responses hands over str, which must pass through as is.
        if isinstance(value, (bytes, bytearray)):
            return value.decode()
        return value

    entry_dict = {_as_text(k): _as_text(v) for k, v in entry_data.items()}

    category = entry_dict["Category"]
    created_at = entry_dict["CreatedAt"]
    # The stream encodes the acknowledged flag as the string "1".
    acknowledged = "Acknowledged" if entry_dict["Acknowledged"] == "1" else "Not Acknowledged"
    remedy = entry_dict["Remedy"]
    severity = entry_dict["Severity"]
    source = entry_dict["Source"]
    node = entry_dict["node"]

    description = f"A {severity} alert of category {category} was raised from the {source} source for node {node} at {created_at}. The alert is {acknowledged}. The recommended remedy is: {remedy}."
    return description, entry_dict
# Saving alert doc
async def save(entry):
    """Store one alert in the alert collection via the Atlas vector store.

    Args:
        entry: Mapping with a ``"content"`` entry (used as the document's
            page content) and a ``"metadata"`` entry (stored as the
            document's metadata).
    """
    alert_doc = Document(
        page_content=entry["content"],
        metadata=entry["metadata"],
    )
    # The vector store object returned by from_documents is not used; the
    # call is made for its effect on the collection.
    MongoDBAtlasVectorSearch.from_documents(
        documents=[alert_doc],
        embedding=embedding_model,
        collection=alert_collection,
        index_name="alert_index",
    )
    logging.info("Alerts stored successfully!")
# Listening to alert stream
async def listen_to_alerts(r):
    """Consume the alert stream forever, storing each alert as it arrives.

    Reading starts at the special ID ``$`` and continues from the ID of the
    last entry seen. For every entry the textual description is built, the
    alert is saved, and the module-level ``latest_alert`` / ``alert_count``
    used by the dashboard are updated.

    An entry that cannot be processed (for example a missing field or a
    storage error) is logged and skipped, so one bad alert does not stop the
    listener.

    Args:
        r: Redis client exposing ``xread``.
    """
    global latest_alert, alert_count
    try:
        last_id = "$"
        while True:
            # block=0 asks the server to wait until new entries are available.
            entries = r.xread({stream_name: last_id}, block=0, count=None)
            if entries:
                _stream, new_entries = entries[0]
                for entry_id, entry_data in new_entries:
                    try:
                        description, entry_dict = await create_textual_description(entry_data)
                        await save({
                            "content": description,
                            "metadata": entry_dict,
                        })
                    except Exception:
                        # Boundary of per-alert work: record the failure and
                        # keep the listener alive for subsequent alerts.
                        logging.exception("Failed to process alert %s; skipping it.", entry_id)
                    else:
                        print(description)
                        latest_alert = description
                        alert_count += 1
                    finally:
                        # Update the last ID read, even for a skipped entry,
                        # so the same entry is never requested again.
                        last_id = entry_id
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("Exiting...")
def run_alert_listener():
    """Thread entry point: run the alert listener on its own event loop."""
    asyncio.run(listen_to_alerts(r))


# Start the alert listener thread.
# It is a daemon thread because the listener loops forever on a blocking
# stream read; a non-daemon thread would keep the interpreter alive after the
# dashboard has shut down.
alert_thread = threading.Thread(target=run_alert_listener, daemon=True)
alert_thread.start()
# dashboard
def get_latest_alert():
    """Return the most recent alert text and the running alert count.

    Returns:
        A ``(latest_alert, alert_count)`` tuple read from the module globals.
    """
    # Reading module globals needs no ``global`` declaration.
    snapshot = (latest_alert, alert_count)
    return snapshot
# Dashboard layout: a read-only text box and counter plus a manual refresh
# button, all fed by get_latest_alert.
with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🚨 Alert Dashboard 🚨")

    with gr.Row(), gr.Column():
        latest_alert_box = gr.Textbox(label="Latest Alert", lines=3, interactive=False)
        alert_count_box = gr.Number(label="Alert Count", interactive=False)
        refresh_button = gr.Button("Refresh", variant="primary")

    # Every refresh path writes to the same two components.
    dashboard_outputs = [latest_alert_box, alert_count_box]

    # Manual refresh.
    refresh_button.click(get_latest_alert, inputs=None, outputs=dashboard_outputs)
    # Populate the boxes when the page is opened.
    app.load(get_latest_alert, inputs=None, outputs=dashboard_outputs)
    # Auto-refresh every 30 seconds
    app.load(get_latest_alert, inputs=None, outputs=dashboard_outputs, every=30)

# Launch the app
# if __name__ == "__main__":
app.launch()