make advanced features , for opc ua it should for production, login password should be admin , admin add logo of aeonx digital and remember i will it as exe applications here is the previous code:import os import sys import asyncio import logging import json import pandas as pd import numpy as np from datetime import datetime, timedelta import subprocess from asyncua import Client, ua from asyncua.ua import UaStatusCodeError from PyQt5.QtWidgets import ( QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QComboBox, QPushButton, QTreeWidget, QTreeWidgetItem, QMessageBox, QProgressBar, QTabWidget, QTableWidget, QTableWidgetItem, QCheckBox, QSpinBox, QFileDialog, QInputDialog, QMenu, QAction, QSplitter, QDockWidget, QTextEdit, QToolBar, QGroupBox, QDialog, QFormLayout ) from PyQt5.QtCore import QTimer, Qt, QThreadPool, QRunnable, QLocale, QTranslator from PyQt5.QtGui import QFont from PyQt5.QtSql import QSqlDatabase, QSqlQuery import matplotlib.pyplot as plt from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from cryptography.fernet import Fernet import sqlite3 import hashlib import time import uuid from google.cloud import pubsub_v1 from google.api_core.exceptions import GoogleAPICallError from google.auth.exceptions import GoogleAuthError # Configure logging logging.basicConfig(filename='opc_ua_client.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("opc_ua_client") # Audit logging audit_handler = logging.FileHandler('audit.log') audit_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(user)s - %(session_id)s - %(message)s')) audit_logger = logging.getLogger("audit") audit_logger.setLevel(logging.INFO) audit_logger.addHandler(audit_handler) class Worker(QRunnable): def __init__(self, fn, *args, **kwargs): super().__init__() self.fn = fn self.args = args self.kwargs = kwargs def run(self): try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(self.fn(*self.args, **self.kwargs)) except Exception as e: logger.error(f"Worker error: {e}") finally: loop.close() class LoginDialog(QDialog): def __init__(self, db): super().__init__() self.db = db self.setWindowTitle("Login") self.setGeometry(500, 300, 300, 150) layout = QFormLayout() self.username_input = QLineEdit() self.password_input = QLineEdit() self.password_input.setEchoMode(QLineEdit.Password) self.login_button = QPushButton("Login") self.login_button.clicked.connect(self.validate_login) layout.addRow("Username:", self.username_input) layout.addRow("Password:", self.password_input) layout.addWidget(self.login_button) self.setLayout(layout) self.current_user = None self.current_role = None self.session_id = str(uuid.uuid4()) def validate_login(self): username = self.username_input.text() password = self.password_input.text() hashed_password = hashlib.sha256(password.encode()).hexdigest() query = QSqlQuery(self.db) query.prepare("SELECT role FROM users WHERE username = ? AND password = ?") query.addBindValue(username) query.addBindValue(hashed_password) if query.exec_() and query.next(): self.current_user = username self.current_role = query.value(0) audit_logger.info(f"User {username} ({self.current_role}) logged in", extra={"user": username, "session_id": self.session_id}) self.accept() else: QMessageBox.critical(self, "Error", "Invalid credentials.") audit_logger.warning(f"Failed login attempt for {username}", extra={"user": "unknown", "session_id": self.session_id}) class HyperAdvancedOPCUClientGUI(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("Hyper Advanced OPC UA Client for Kepware") self.setGeometry(100, 100, 1400, 900) self.clients = {} self.subscriptions = {} self.data_log = [] self.favorites = set() self.metadata_cache = {} self.tag_groups = {} self.running = False self.threadpool = QThreadPool() self.threadpool.setMaxThreadCount(4) self.encryption_key = Fernet.generate_key() self.cipher = Fernet(self.encryption_key) self.current_user = None self.current_role = None self.session_id = None self.uptime_start = None self.tag_update_count = 0 self.anomaly_log = [] self.schedules = [] self.dependencies = {} self.pubsub_publisher = None self.pubsub_topic = "projects/your-gcp-project/topics/opc-ua-data" self.alerts = [] self.reconnect_strategy = {"initial_delay": 2, "max_attempts": 5} self.diagnostic_log = [] self.reconnect_attempts ={} self.event_handlers = {} self.schedule_timer = QTimer() self.schedule_timer.timeout.connect(self.run_scheduled_tasks) self.reconnect_timer = QTimer() self.reconnect_timer.timeout.connect(self.attempt_reconnect) self.alert_timer = QTimer() self.alert_timer.timeout.connect(self.update_alerts) self.diagnostic_timer = QTimer() self.diagnostic_timer.timeout.connect(self.log_diagnostic) #self.load_config() # Load saved configuration on startup try: self.pubsub_publisher = pubsub_v1.PublisherClient() logger.info("Google Cloud Pub/Sub client initialized successfully") except GoogleAuthError as e: logger.error(f"Google Cloud credentials error: {e}") self.show_error("Google Cloud credentials not configured. Please set up Application Default Credentials.") self.pubsub_publisher = None except Exception as e: logger.error(f"Error initializing Pub/Sub client: {e}") self.pubsub_publisher = None self.show_error("Pub/Sub initialization failed. Continuing without Pub/Sub.") self.init_db() if not self.show_login(): sys.exit() self.init_ui() self.load_config() self.timer.start(1000) self.schedule_timer.start(1000) self.alert_timer_start(5000) self.diagnostic_timer.start(60000) def init_db(self): """Initialize SQLite database with settings and diagnostic tables.""" self.db = QSqlDatabase.addDatabase("QSQLITE") self.db.setDatabaseName("opc_ua_client.db") if not self.db.open(): logger.error("Failed to open database") QMessageBox.critical(self, "Error", "Failed to open database.") return query = QSqlQuery() queries = [ """ CREATE TABLE IF NOT EXISTS users ( username TEXT PRIMARY KEY, password TEXT, role TEXT ) """, """ CREATE TABLE IF NOT EXISTS tag_data ( node_id TEXT, timestamp TEXT, value TEXT ) """, """ CREATE TABLE IF NOT EXISTS tag_metadata ( node_id TEXT PRIMARY KEY, units TEXT, scaling_factor REAL, min_value REAL, max_value REAL ) """, """ CREATE TABLE IF NOT EXISTS tag_groups ( group_name TEXT, node_id TEXT ) """, """ CREATE TABLE IF NOT EXISTS schedules ( id TEXT PRIMARY KEY, task_type TEXT, node_id TEXT, value TEXT, interval INTEGER ) """, """ CREATE TABLE IF NOT EXISTS dependencies ( source_node_id TEXT, target_node_id TEXT, threshold REAL, target_value TEXT ) """, """ CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, value TEXT ) """, """ CREATE TABLE IF NOT EXISTS diagnostics ( timestamp TEXT, event TEXT, details TEXT ) """ ] for q in queries: if not query.exec_(q): logger.error(f"Database initialization error: {query.lastError().text()}") query.prepare("INSERT OR IGNORE INTO users (username, password, role) VALUES (?, ?, ?)") query.addBindValue("admin") query.addBindValue(hashlib.sha256("admin123".encode()).hexdigest()) query.addBindValue("Admin") if not query.exec_(): logger.error(f"Failed to initialize default admin user: {query.lastError().text()}") def show_login(self): login_dialog = LoginDialog(self.db) if login_dialog.exec_() == QDialog.Accepted: self.current_user = login_dialog.current_user self.current_role = login_dialog.current_role self.session_id = login_dialog.session_id return True return False def init_ui(self): self.central_widget = QWidget() self.setCentralWidget(self.central_widget) self.layout = QVBoxLayout(self.central_widget) # Toolbar toolbar = QToolBar() self.addToolBar(toolbar) lang_menu = QMenu("Language") for lang in ["English", "Spanish", "German"]: action = QAction(lang, self) action.triggered.connect(lambda checked, l=lang: self.change_language(l)) lang_menu.addAction(action) toolbar.addAction(lang_menu.menuAction()) report_action = QAction("Generate Report", self) report_action.triggered.connect(self.generate_report) toolbar.addAction(report_action) backup_action = QAction("Backup Data", self) backup_action.triggered.connect(self.backup_data) toolbar.addAction(backup_action) restore_action = QAction("Restore Data", self) restore_action.triggered.connect(self.restore_data) toolbar.addAction(restore_action) diagnostics_action = QAction("View Diagnostics", self) diagnostics_action.triggered.connect(self.show_diagnostic_log) toolbar.addAction(diagnostics_action) # Server management server_layout = QVBoxLayout() self.server_combo = QComboBox() self.server_combo.currentIndexChanged.connect(self.switch_server) server_layout.addWidget(QLabel("Active Server:")) server_layout.addWidget(self.server_combo) conn_layout = QHBoxLayout() self.url_label = QLabel("Kepware URL:") self.url_input = QLineEdit() self.secondary_url_label = QLabel("Secondary URL:") self.secondary_url_input = QLineEdit() self.port_label = QLabel("Port:") self.port_input = QLineEdit("49320") self.user_label = QLabel("Username:") self.user_input = QLineEdit() self.pass_label = QLabel("Password:") self.pass_input = QLineEdit() self.pass_input.setEchoMode(QLineEdit.Password) self.security_combo = QComboBox() self.security_combo.addItems(["None", "Basic256Sha256", "Aes128Sha256RsaOaep"]) self.pubsub_label = QLabel("GCP Pub/Sub Topic:") self.pubsub_input = QLineEdit(self.pubsub_topic) conn_layout.addWidget(self.url_label) conn_layout.addWidget(self.url_input) conn_layout.addWidget(self.secondary_url_label) conn_layout.addWidget(self.secondary_url_input) conn_layout.addWidget(self.port_label) conn_layout.addWidget(self.port_input) conn_layout.addWidget(self.user_label) conn_layout.addWidget(self.user_input) conn_layout.addWidget(self.pass_label) conn_layout.addWidget(self.pass_input) conn_layout.addWidget(QLabel("Security:")) conn_layout.addWidget(self.security_combo) conn_layout.addWidget(self.pubsub_label) conn_layout.addWidget(self.pubsub_input) server_layout.addLayout(conn_layout) # Settings settings_layout = QHBoxLayout() self.poll_label = QLabel("Default Polling (ms):") self.poll_spin = QSpinBox() self.poll_spin.setRange(100, 5000) self.poll_spin.setValue(500) self.depth_label = QLabel("Browse Depth:") self.depth_spin = QSpinBox() self.depth_spin.setRange(1, 10) self.depth_spin.setValue(5) self.batch_size_label = QLabel("Subscription Batch Size:") self.batch_size_spin = QSpinBox() self.batch_size_spin.setRange(1, 100) self.batch_size_spin.setValue(10) self.simulate_check = QCheckBox("Simulate Tags") self.simulate_check.stateChanged.connect(self.toggle_simulation) self.validate_data_check = QCheckBox("Validate Data") self.validate_data_check.stateChanged.connect(self.toggle_validation) settings_layout.addWidget(self.poll_label) settings_layout.addWidget(self.poll_spin) settings_layout.addWidget(self.depth_label) settings_layout.addWidget(self.depth_spin) settings_layout.addWidget(self.batch_size_label) settings_layout.addWidget(self.batch_size_spin) settings_layout.addWidget(self.simulate_check) settings_layout.addWidget(self.validate_data_check) server_layout.addLayout(settings_layout) # Buttons button_layout = QHBoxLayout() self.connect_button = QPushButton("Connect to Kepware") self.disconnect_button = QPushButton("Disconnect Server") self.save_config_button = QPushButton("Save Config") self.load_config_button = QPushButton("Load Config") self.export_tags_button = QPushButton("Export Tags") self.import_tags_button = QPushButton("Import Tags") self.batch_write_button = QPushButton("Batch Write from CSV") self.schedule_button = QPushButton("Manage Schedules") self.set_defaults_button = QPushButton("Set Defaults") self.disconnect_button.setEnabled(False) self.connect_button.clicked.connect(self.add_server) self.disconnect_button.clicked.connect(self.disconnect_server) self.save_config_button.clicked.connect(self.save_config) self.load_config_button.clicked.connect(self.load_config) self.export_tags_button.clicked.connect(self.export_tags) self.import_tags_button.clicked.connect(self.import_tags) self.batch_write_button.clicked.connect(self.batch_write) self.schedule_button.clicked.connect(self.manage_schedules) self.set_defaults_button.clicked.connect(self.set_defaults) self.batch_write_button.setEnabled(self.current_role == "Admin") self.schedule_button.setEnabled(self.current_role == "Admin") button_layout.addWidget(self.connect_button) button_layout.addWidget(self.disconnect_button) button_layout.addWidget(self.save_config_button) button_layout.addWidget(self.load_config_button) button_layout.addWidget(self.export_tags_button) button_layout.addWidget(self.import_tags_button) button_layout.addWidget(self.batch_write_button) button_layout.addWidget(self.schedule_button) button_layout.addWidget(self.set_defaults_button) server_layout.addLayout(button_layout) self.progress_bar = QProgressBar() self.progress_bar.setVisible(False) server_layout.addWidget(self.progress_bar) self.layout.addLayout(server_layout) # Tabbed interface self.tabs = QTabWidget() self.browse_tab = QWidget() self.monitor_tab = QWidget() self.write_tab = QWidget() self.history_tab = QWidget() self.events_tab = QWidget() self.script_tab = QWidget() self.kpi_tab = QWidget() self.groups_tab = QWidget() self.alerts_tab = QWidget() self.tabs.addTab(self.browse_tab, "Browse Nodes") self.tabs.addTab(self.monitor_tab, "Monitor Tags") self.tabs.addTab(self.write_tab, "Write Values") self.tabs.addTab(self.history_tab, "Historical Data") self.tabs.addTab(self.events_tab, "Events") self.tabs.addTab(self.script_tab, "Scripting") self.tabs.addTab(self.kpi_tab, "KPI Dashboard") self.tabs.addTab(self.groups_tab, "Tag Groups") self.tabs.addTab(self.alerts_tab, "Alerts") self.layout.addWidget(self.tabs) # Browse tab browse_layout = QVBoxLayout(self.browse_tab) self.filter_input = QLineEdit() self.filter_input.setPlaceholderText("Filter by Node Name or ID") self.filter_input.textChanged.connect(self.filter_nodes) browse_layout.addWidget(self.filter_input) self.browse_tree = QTreeWidget() self.browse_tree.setHeaderLabels(["Node Name", "Node ID", "Data Type", "Description", "Favorite", "Status"]) self.browse_tree.setColumnWidth(0, 250) self.browse_tree.setColumnWidth(1, 200) self.browse_tree.setColumnWidth(2, 100) self.browse_tree.setContextMenuPolicy(Qt.CustomContextMenu) self.browse_tree.customContextMenuRequested.connect(self.open_context_menu) browse_layout.addWidget(self.browse_tree) # Monitor tab monitor_splitter = QSplitter(Qt.Horizontal) self.monitor_tab.setLayout(QVBoxLayout()) self.monitor_table = QTableWidget() self.monitor_table.setColumnCount(9) self.monitor_table.setHorizontalHeaderLabels(["Node Name", "Node ID", "Value", "Timestamp", "Polling (ms)", "Units", "Scaling", "Anomaly", "Last Update Status"]) self.monitor_table.setColumnWidth(0, 250) self.monitor_table.setColumnWidth(1, 200) self.monitor_table.setColumnWidth(2, 150) self.monitor_table.setContextMenuPolicy(Qt.CustomContextMenu) self.monitor_table.customContextMenuRequested.connect(self.monitor_context_menu) self.trend_canvas = FigureCanvas(plt.Figure()) monitor_splitter.addWidget(self.monitor_table) monitor_splitter.addWidget(self.trend_canvas) self.monitor_tab.layout().addWidget(monitor_splitter) self.log_check = QCheckBox("Enable Data Logging to CSV") self.log_check.stateChanged.connect(self.toggle_logging) self.monitor_tab.layout().addWidget(self.log_check) # Write tab write_layout = QVBoxLayout(self.write_tab) self.write_node_input = QLineEdit() self.write_node_input.setPlaceholderText("Enter Node ID to Write") self.write_value_input = QLineEdit() self.write_value_input.setPlaceholderText("Enter Value") self.write_button = QPushButton("Write Value") self.write_button.clicked.connect(self.write_value) write_layout.addWidget(self.write_node_input) write_layout.addWidget(self.write_value_input) write_layout.addWidget(self.write_button) # Historical data tab history_layout = QVBoxLayout(self.history_tab) self.history_node_input = QLineEdit() self.history_node_input.setPlaceholderText("Enter Node ID for Historical Data") self.history_duration = QSpinBox() self.history_duration.setRange(1, 1440) self.history_duration.setValue(60) self.history_button = QPushButton("Fetch Historical Data") self.history_button.clicked.connect(self.fetch_historical_data) history_layout.addWidget(self.history_node_input) history_layout.addWidget(QLabel("Duration (minutes):")) history_layout.addWidget(self.history_duration) history_layout.addWidget(self.history_button) self.history_canvas = FigureCanvas(plt.Figure()) self.history_stats_label = QLabel("Statistics: N/A") history_layout.addWidget(self.history_canvas) history_layout.addWidget(self.history_stats_label) # Events tab events_layout = QVBoxLayout(self.events_tab) self.event_filter_input = QLineEdit() self.event_filter_input.setPlaceholderText("Filter by Event Type or Message") self.event_filter_input.textChanged.connect(self.filter_events) self.events_table = QTableWidget() self.events_table.setColumnCount(5) self.events_table.setHorizontalHeaderLabels(["Event Type", "Message", "Timestamp", "Severity", "Acknowledged By"]) self.events_table.setColumnWidth(0, 200) self.events_table.setColumnWidth(1, 300) self.events_table.setContextMenuPolicy(Qt.CustomContextMenu) self.events_table.customContextMenuRequested.connect(self.event_context_menu) events_layout.addWidget(self.event_filter_input) events_layout.addWidget(self.events_table) # Scripting tab script_layout = QVBoxLayout(self.script_tab) self.script_editor = QTextEdit() self.script_editor.setPlaceholderText("Enter Lua script for tag processing...") self.script_run_button = QPushButton("Run Script") self.script_run_button.clicked.connect(self.run_script) self.script_run_button.setEnabled(self.current_role == "Admin") script_layout.addWidget(self.script_editor) script_layout.addWidget(self.script_run_button) # KPI Dashboard tab kpi_layout = QVBoxLayout(self.kpi_tab) self.kpi_label = QLabel("KPIs: Tag Update Rate: 0/s, Server Uptime: 0s, Connection Success: 0%") kpi_layout.addWidget(self.kpi_label) self.kpi_canvas = FigureCanvas(plt.Figure()) kpi_layout.addWidget(self.kpi_canvas) # Tag Groups tab groups_layout = QVBoxLayout(self.groups_tab) self.group_combo = QComboBox() self.group_combo.currentTextChanged.connect(self.display_group) groups_layout.addWidget(QLabel("Select Group:")) groups_layout.addWidget(self.group_combo) self.group_table = QTableWidget() self.group_table.setColumnCount(4) self.group_table.setHorizontalHeaderLabels(["Node Name", "Node ID", "Latest Value", "Statistics"]) self.group_table.setColumnWidth(0, 250) self.group_table.setColumnWidth(1, 200) groups_layout.addWidget(self.group_table) self.group_export_button = QPushButton("Export Group Data") self.group_export_button.clicked.connect(self.export_group_data) groups_layout.addWidget(self.group_export_button) # Alerts tab alerts_layout = QVBoxLayout(self.alerts_tab) self.alerts_table = QTableWidget() self.alerts_table.setColumnCount(3) self.alerts_table.setHorizontalHeaderLabels(["Timestamp", "Message", "Status"]) self.alerts_table.setColumnWidth(0, 200) self.alerts_table.setColumnWidth(1, 400) alerts_layout.addWidget(self.alerts_table) # Dashboard dock self.dashboard_dock = QDockWidget("Dashboard", self) self.dashboard_widget = QWidget() self.dashboard_layout = QVBoxLayout(self.dashboard_widget) self.add_dock_widget() self.addDockWidget(Qt.RightDockWidgetArea, self.dashboard_dock) # Status and metrics status_layout = QHBoxLayout() self.status_label = QLabel("No Active Servers") self.status_label.setFont(QFont("Arial", 10, QFont.Bold)) self.response_time_label = QLabel("Response Time: N/A") status_layout.addWidget(self.status_label) status_layout.addWidget(self.response_time_label) self.layout.addLayout(status_layout) self.load_groups() self.timer.start(1000) self.schedule_timer.start(1000) self.alert_timer.start(5000) self.diagnostic_timer.start(60000) def add_dock_widget(self): tag_widget = QGroupBox("Tag Value") tag_layout = QVBoxLayout() self.tag_display_label = QLabel("Select a tag to display") tag_layout.addWidget(self.tag_display_label) tag_widget.setLayout(tag_layout) self.dashboard_layout.addWidget(tag_widget) alert_widget = QGroupBox("Current Alerts") alert_layout = QVBoxLayout() self.alert_display_label = QLabel("No active alerts") alert_layout.addWidget(self.alert_display_label) alert_widget.setLayout(alert_layout) self.dashboard_layout.addWidget(alert_widget) def load_language(self): if self.language == "es": self.translator.load(":/translations/es.qm") elif self.language == "de": self.translator.load(":/translations/de.qm") else: self.translator.load("") QApplication.instance().installTranslator(self.translator) self.update_ui_texts() def change_language(self, lang): lang_map = {"English": "en", "Spanish": "es", "German": "de"} self.language = lang_map.get(lang, "en") self.load_language() audit_logger.info(f"Changed language to {self.language}", extra={"user": self.current_user, "session_id": self.session_id}) def update_ui_texts(self): self.setWindowTitle(self.tr("Hyper Advanced OPC UA Client for Kepware")) self.url_label.setText(self.tr("Kepware URL:")) self.secondary_url_label.setText(self.tr("Secondary URL:")) self.port_label.setText(self.tr("Port:")) self.user_label.setText(self.tr("Username:")) self.pass_label.setText(self.tr("Password:")) self.pubsub_label.setText(self.tr("GCP Pub/Sub Topic:")) self.poll_label.setText(self.tr("Default Polling (ms):")) self.depth_label.setText(self.tr("Browse Depth:")) self.batch_size_label.setText(self.tr("Subscription Batch Size:")) self.connect_button.setText(self.tr("Connect to Kepware")) self.disconnect_button.setText(self.tr("Disconnect Server")) self.save_config_button.setText(self.tr("Save Config")) self.load_config_button.setText(self.tr("Load Config")) self.export_tags_button.setText(self.tr("Export Tags")) self.import_tags_button.setText(self.tr("Import Tags")) self.batch_write_button.setText(self.tr("Batch Write from CSV")) self.schedule_button.setText(self.tr("Manage Schedules")) self.set_defaults_button.setText(self.tr("Set Defaults")) self.simulate_check.setText(self.tr("Simulate Tags")) self.validate_data_check.setText(self.tr("Validate Data")) self.tabs.setTabText(0, self.tr("Browse Nodes")) self.tabs.setTabText(1, self.tr("Monitor Tags")) self.tabs.setTabText(2, self.tr("Write Values")) self.tabs.setTabText(3, self.tr("Historical Data")) self.tabs.setTabText(4, self.tr("Events")) self.tabs.setTabText(5, self.tr("Scripting")) self.tabs.setTabText(6, self.tr("KPI Dashboard")) self.tabs.setTabText(7, self.tr("Tag Groups")) self.tabs.setTabText(8, self.tr("Alerts")) self.log_check.setText(self.tr("Enable Data Logging to CSV")) self.write_button.setText(self.tr("Write Value")) self.history_button.setText(self.tr("Fetch Historical Data")) self.script_run_button.setText(self.tr("Run Script")) self.group_export_button.setText(self.tr("Export Group Data")) async def browse_nodes(self, client, node, parent_item, depth=0, max_depth=5): if depth > max_depth: return try: children = await node.get_children() for child in children: node_id = str(child.nodeid) display_name = (await child.read_display_name()).Text try: data_type = (await child.read_data_type()).Identifier data_type_node = client.get_node(ua.NodeId(data_type)) data_type_str = (await data_type_node.read_display_name()).Text except: data_type_str = "Unknown" try: description = (await child.read_description()).Text or "No Description" except: description = "No Description" status = "OK" if await self.check_node_status(client, child) else "Error" if "Channel" in display_name or "CHANNEL" in node_id.upper(): item_type = "Channel" elif "Device" in display_name or "DEVICE" in node_id.upper(): item_type = "Device" else: item_type = "Tag" if item_type in ["Channel", "Device"]: new_item = QTreeWidgetItem(parent_item, [f"{item_type}: {display_name}", node_id, data_type_str, description, "★" if node_id in self.favorites else "☆", status]) else: new_item = QTreeWidgetItem(parent_item, [display_name, node_id, data_type_str, description, "★" if node_id in self.favorites else "☆", status]) new_item.setData(0, Qt.UserRole, (client, child)) if node_id in self.favorites: new_item.setForeground(0, Qt.darkYellow) if status == "Error": new_item.setForeground(5, Qt.red) self.metadata_cache[node_id] = (display_name, data_type_str, description) await self.browse_nodes(client, child, new_item, depth + 1, max_depth) except Exception as e: logger.error(f"Error browsing nodes: {e}") self.add_alert(f"Browse Error: {e}") async def check_node_status(self, client, node): try: await node.read_value() return True except UaStatusCodeError as e: logger.warning(f"Node status check failed: {e}") return False async def setup_event_subscription(self, client): try: event_obj = client.get_node(ua.ObjectIds.Server) handler = self.EventHandler(self.events_table, self.current_user) sub = await client.create_subscription(1000, handler) await sub.subscribe_events(event_obj) self.event_handlers[client] = (sub, handler) except Exception as e: logger.error(f"Error setting up event subscription: {e}") self.add_alert(f"Event Subscription Error: {e}") class SubscriptionHandler: def __init__(self, table, row, canvas, node_id, parent): self.table = table self.row = row self.canvas = canvas self.node_id = node_id self.parent = parent self.values = [] self.timestamps = [] self.last_update = None def datachange_notification(self, node, val, data): try: if self.parent.simulate_check.isChecked(): val = np.sin(time.time()) * 100 scaling = float(self.table.item(self.row, 6).text() or 1) scaled_val = val * scaling anomaly = self.detect_anomaly(val) timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.last_update = "Success" self.table.setItem(self.row, 2, QTableWidgetItem(f"{scaled_val:.2f}")) self.table.setItem(self.row, 3, QTableWidgetItem(timestamp)) self.table.setItem(self.row, 7, QTableWidgetItem("Anomaly" if anomaly else "")) self.table.setItem(self.row, 8, QTableWidgetItem(self.last_update)) if anomaly: self.parent.anomaly_log.append({"node_id": self.node_id, "value": scaled_val, "timestamp": timestamp}) self.parent.add_alert(f"Anomaly Detected: {self.node_id} = {scaled_val}") audit_logger.warning(f"Anomaly detected for {self.node_id}: {scaled_val}", extra={"user": self.parent.current_user, "session_id": self.parent.session_id}) self.values.append(scaled_val) self.timestamps.append(datetime.now()) if len(self.values) > 100: self.values.pop(0) self.timestamps.pop(0) self.update_trend() self.parent.tag_update_count += 1 if self.parent.log_check.isChecked(): self.parent.log_data(self.table.item(self.row, 0).text(), self.node_id, str(scaled_val), timestamp) if self.parent.validate_data_check.isChecked() and not self.validate_value(scaled_val): self.last_update = "Validation Failed" self.table.setItem(self.row, 8, QTableWidgetItem(self.last_update)) self.parent.add_alert(f"Validation Failed: {self.node_id} = {scaled_val}") query = QSqlQuery(self.parent.db) query.prepare("INSERT INTO tag_data (node_id, timestamp, value) VALUES (?, ?, ?)") query.addBindValue(self.node_id) query.addBindValue(timestamp) query.addBindValue(str(val)) if not query.exec_(): logger.error(f"Failed to log tag data: {query.lastError().text()}") if self.parent.pubsub_publisher: try: self.parent.pubsub_publisher.publish( self.parent.pubsub_topic, json.dumps({ "node_id": self.node_id, "value": scaled_val, "timestamp": timestamp, "anomaly": anomaly, "status": self.last_update }).encode() ) except Exception as e: logger.error(f"Error publishing to Pub/Sub: {e}") self.parent.check_dependencies(self.node_id, scaled_val) except Exception as e: self.last_update = f"Error: {e}" self.table.setItem(self.row, 8, QTableWidgetItem(self.last_update)) logger.error(f"Error in datachange notification: {e}") self.parent.add_alert(f"Data Update Error: {self.node_id} - {e}") def detect_anomaly(self, value): if len(self.values) < 10: return False mean = np.mean(self.values) std = np.std(self.values) if std == 0: return False z_score = abs(value - mean) / std return z_score > 3 def update_trend(self): self.canvas.figure.clear() ax = self.canvas.figure.add_subplot(111) ax.plot(self.timestamps, self.values, label=self.table.item(self.row, 0).text()) ax.set_xlabel("Time") ax.set_ylabel("Value") ax.legend() ax.grid(True) plt.setp(ax.get_xticklabels(), rotation=45) self.canvas.figure.tight_layout() self.canvas.draw() def validate_value(self, value): metadata = self.parent.get_metadata(self.node_id) min_val = metadata[1].get("min_value") max_val = metadata[1].get("max_value") return min_val is None or max_val is None or (min_val <= value <= max_val) class EventHandler: def __init__(self, table, user): self.table = table self.user = user def event_notification(self, event): try: row = self.table.rowCount() self.table.insertRow(row) self.table.setItem(row, 0, QTableWidgetItem(str(event.EventType))) self.table.setItem(row, 1, QTableWidgetItem(str(event.Message))) self.table.setItem(row, 2, QTableWidgetItem(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) self.table.setItem(row, 3, QTableWidgetItem(str(getattr(event, "Severity", "N/A")))) self.table.setItem(row, 4, QTableWidgetItem("")) self.table.viewport().update() except Exception as e: logger.error(f"Error in event notification: {e}") def add_server(self): url = f"opc.tcp://{self.url_input.text()}:{self.port_input.text()}" secondary_url = f"opc.tcp://{self.secondary_url_input.text()}:{self.port_input.text()}" if self.secondary_url_input.text() else "" if not url or not self.port_input.text(): self.show_error("Please enter a valid Kepware URL and Port.") return self.connect_button.setEnabled(False) self.disconnect_button.setEnabled(True) self.progress_bar.setVisible(True) self.status_label.setText(f"Connecting to Kepware at {url}...") self.progress_bar.setRange(0, 0) self.uptime_start = datetime.now() worker = Worker(self._async_connect, url, secondary_url) self.threadpool.start(worker) audit_logger.info(f"Attempted to connect to Kepware at {url}", extra={"user": self.current_user, "session_id": self.session_id}) async def _async_connect(self, url, secondary_url): client = None try: client = Client(url) if self.user_input.text() and self.pass_input.text(): client.set_user(self.user_input.text()) client.set_password(self.pass_input.text()) security_policy = self.security_combo.currentText() if security_policy != "None": client.set_security_string(f"Basic256Sha256,SignAndEncrypt,{security_policy}") async with client: start_time = datetime.now() await client.connect() response_time = (datetime.now() - start_time).total_seconds() * 1000 self.clients[url] = (client, secondary_url) self.subscriptions[client] = [] self.reconnect_attempts[url] = 0 self.server_combo.addItem(url) self.server_combo.setCurrentText(url) self.status_label.setText(f"Connected to Kepware at {url}") self.response_time_label.setText(f"Response Time: {response_time:.0f} ms") self.progress_bar.setVisible(False) self.running = True await self.browse_nodes(client, client.get_root_node(), self.browse_tree, max_depth=self.depth_spin.value()) await self.setup_event_subscription(client) self.add_alert(f"Successfully connected to {url}") except Exception as e: logger.error(f"Connection error to Kepware at {url}: {e}") self.add_alert(f"Connection Error to {url}: {e}") self.show_error(f"Connection error to Kepware at {url}: {e}\nCheck URL, port, or credentials.") if secondary_url and self.reconnect_attempts.get(url, 0) < self.reconnect_strategy["max_attempts"]: self.reconnect_attempts[url] = self.reconnect_attempts.get(url, 0) + 1 self.status_label.setText(f"Attempting failover to {secondary_url}... Attempt {self.reconnect_attempts[url]}/{self.reconnect_strategy['max_attempts']}") await asyncio.sleep(self.reconnect_strategy["initial_delay"] * 2 ** (self.reconnect_attempts[url] - 1)) await self._async_connect(secondary_url, "") else: self.attempt_reconnect(url) finally: if client and client not in self.clients.values(): try: await client.disconnect() except: pass def switch_server(self): url = self.server_combo.currentText() if url in self.clients: self.browse_tree.clear() self.monitor_table.setRowCount(0) self.status_label.setText(f"Switched to Kepware at {url}") client, _ = self.clients[url] worker = Worker(self.browse_nodes, client, client.get_root_node(), self.browse_tree, max_depth=self.depth_spin.value()) self.threadpool.start(worker) def attempt_reconnect(self): for url in list(self.clients.keys()): if self.reconnect_attempts.get(url, 0) < self.reconnect_strategy["max_attempts"]: self.reconnect_attempts[url] = self.reconnect_attempts.get(url, 0) + 1 self.status_label.setText(f"Reconnecting to {url}... Attempt {self.reconnect_attempts[url]}/{self.reconnect_strategy['max_attempts']}") client, secondary_url = self.clients[url] worker = Worker(self._async_connect, url, secondary_url) self.threadpool.start(worker) else: self.disconnect_server(url) self.add_alert(f"Max reconnect attempts reached for {url}") def disconnect_server(self, url=None): if not url: url = self.server_combo.currentText() if url not in self.clients: return client, _ = self.clients[url] for sub, _, handle in self.subscriptions.get(client, []): try: loop = asyncio.get_event_loop() loop.run_until_complete(sub.delete()) except Exception as e: logger.error(f"Error unsubscribing: {e}") if client in self.event_handlers: sub, _ = self.event_handlers[client] try: loop = asyncio.get_event_loop() loop.run_until_complete(sub.delete()) except Exception as e: logger.error(f"Error unsubscribing events: {e}") del self.event_handlers[client] try: loop = asyncio.get_event_loop() loop.run_until_complete(client.disconnect()) except Exception as e: logger.error(f"Error disconnecting client: {e}") del self.clients[url] del self.subscriptions[client] self.server_combo.removeItem(self.server_combo.findText(url)) self.status_label.setText(f"Disconnected from Kepware at {url}") self.connect_button.setEnabled(True) self.disconnect_button.setEnabled(len(self.clients) > 0) if not self.clients: self.running = False self.timer.stop() self.add_alert(f"Disconnected from {url}") audit_logger.info(f"Disconnected from Kepware at {url}", extra={"user": self.current_user, "session_id": self.session_id}) def open_context_menu(self, position): item = self.browse_tree.itemAt(position) if not item: return client, node = item.data(0, Qt.UserRole) node_id = item.text(1) menu = QMenu() subscribe_action = QAction("Subscribe", self) unsubscribe_action = QAction("Unsubscribe", self) favorite_action = QAction("Toggle Favorite", self) metadata_action = QAction("Edit Metadata", self) add_to_group_action = QAction("Add to Group", self) dependency_action = QAction("Add Dependency", self) diagnose_action = QAction("Diagnose Node", self) subscribe_action.triggered.connect(lambda: self.subscribe_node(client, node, node_id)) unsubscribe_action.triggered.connect(lambda: self.unsubscribe_node(client, node)) favorite_action.triggered.connect(lambda: self.toggle_favorite(node_id)) metadata_action.triggered.connect(lambda: self.edit_metadata(node_id)) add_to_group_action.triggered.connect(lambda: self.add_to_group(node_id)) dependency_action.triggered.connect(lambda: self.add_dependency(node_id)) diagnose_action.triggered.connect(lambda: self.diagnose_node(client, node, node_id)) menu.addAction(subscribe_action) menu.addAction(unsubscribe_action) menu.addAction(favorite_action) menu.addAction(metadata_action) menu.addAction(add_to_group_action) if self.current_role == "Admin": menu.addAction(dependency_action) menu.addAction(diagnose_action) menu.exec_(self.browse_tree.viewport().mapToGlobal(position)) def monitor_context_menu(self, position): item = self.monitor_table.itemAt(position) if not item: return row = self.monitor_table.row(item) menu = QMenu() update_polling_action = QAction("Update Polling", self) remove_action = QAction("Remove", self) diagnose_action = QAction("Diagnose", self) update_polling_action.triggered.connect(lambda: self.update_polling(row)) remove_action.triggered.connect(lambda: self.remove_monitored_tag(row)) diagnose_action.triggered.connect(lambda: self.diagnose_monitored_tag(row)) menu.addAction(update_polling_action) menu.addAction(remove_action) menu.addAction(diagnose_action) menu.exec_(self.monitor_table.viewport().mapToGlobal(position)) def event_context_menu(self, position): item = self.events_table.itemAt(position) if not item: return row = self.events_table.row(item) menu = QMenu() acknowledge_action = QAction("Acknowledge", self) acknowledge_action.triggered.connect(lambda: self.acknowledge_event(row)) menu.addAction(acknowledge_action) menu.exec_(self.events_table.viewport().mapToGlobal(position)) def subscribe_node(self, client, node, node_id): try: display_name = self.metadata_cache[node_id][0] row = self.monitor_table.rowCount() self.monitor_table.insertRow(row) self.monitor_table.setItem(row, 0, QTableWidgetItem(display_name)) self.monitor_table.setItem(row, 1, QTableWidgetItem(node_id)) self.monitor_table.setItem(row, 2, QTableWidgetItem("")) self.monitor_table.setItem(row, 3, QTableWidgetItem("")) polling_item = QTableWidgetItem(str(self.poll_spin.value())) polling_item.setFlags(Qt.ItemIsEditable | Qt.ItemIsEnabled) self.monitor_table.setItem(row, 4, polling_item) units, scaling = self.get_metadata(node_id) self.monitor_table.setItem(row, 5, QTableWidgetItem(units)) self.monitor_table.setItem(row, 6, QTableWidgetItem(str(scaling))) self.monitor_table.setItem(row, 7, QTableWidgetItem("")) self.monitor_table.setItem(row, 8, QTableWidgetItem("Pending")) handler = self.SubscriptionHandler(self.monitor_table, row, self.trend_canvas, node_id, self) sub = asyncio.run_coroutine_threadsafe(client.create_subscription(self.poll_spin.value(), handler), asyncio.get_event_loop()).result() handle = asyncio.run_coroutine_threadsafe(sub.subscribe_data_change(node), asyncio.get_event_loop()).result() self.subscriptions[client].append((sub, node, handle)) audit_logger.info(f"Subscribed to Kepware node {node_id}", extra={"user": self.current_user, "session_id": self.session_id}) except Exception as e: logger.error(f"Error subscribing to Kepware node {node_id}: {e}") self.show_error(f"Error subscribing to node: {e}") self.monitor_table.removeRow(row) self.add_alert(f"Subscription Error: {node_id} - {e}") def unsubscribe_node(self, client, node): try: for sub, sub_node, handle in self.subscriptions.get(client, []): if sub_node == node: asyncio.run_coroutine_threadsafe(sub.delete(), asyncio.get_event_loop()).result() self.subscriptions[client].remove((sub, sub_node, handle)) for row in range(self.monitor_table.rowCount()): if self.monitor_table.item(row, 1).text() == str(node.nodeid): self.monitor_table.removeRow(row) break audit_logger.info(f"Unsubscribed from Kepware node {str(node.nodeid)}", extra={"user": self.current_user, "session_id": self.session_id}) break except Exception as e: logger.error(f"Error unsubscribing from Kepware node: {e}") self.show_error(f"Error unsubscribing from node: {e}") def toggle_favorite(self, node_id): if node_id in self.favorites: self.favorites.remove(node_id) audit_logger.info(f"Removed favorite: {node_id}", extra={"user": self.current_user, "session_id": self.session_id}) else: self.favorites.add(node_id) audit_logger.info(f"Added favorite: {node_id}", extra={"user": self.current_user, "session_id": self.session_id}) for i in range(self.browse_tree.topLevelItemCount()): item = self.browse_tree.topLevelItem(i) if item.text(1) == node_id: item.setText(4, "★" if node_id in self.favorites else "☆") item.setForeground(0, Qt.darkYellow if node_id in self.favorites else Qt.black) def edit_metadata(self, node_id): dialog = QDialog(self) dialog.setWindowTitle("Edit Metadata") layout = QFormLayout() units_input = QLineEdit() scaling_input = QLineEdit() min_value_input = QLineEdit() max_value_input = QLineEdit() units, scaling = self.get_metadata(node_id) units_input.setText(units) scaling_input.setText(str(scaling)) layout.addRow("Units:", units_input) layout.addRow("Scaling Factor:", scaling_input) layout.addRow("Min Value:", min_value_input) layout.addRow("Max Value:", max_value_input) save_button = QPushButton("Save") save_button.clicked.connect(lambda: self.save_metadata(node_id, units_input.text(), scaling_input.text(), min_value_input.text(), max_value_input.text(), dialog)) layout.addWidget(save_button) dialog.setLayout(layout) dialog.exec_() def save_metadata(self, node_id, units, scaling, min_value, max_value, dialog): try: query = QSqlQuery(self.db) query.prepare(""" INSERT OR REPLACE INTO tag_metadata (node_id, units, scaling_factor, min_value, max_value) VALUES (?, ?, ?, ?, ?) """) query.addBindValue(node_id) query.addBindValue(units) query.addBindValue(float(scaling) if scaling else 1.0) query.addBindValue(float(min_value) if min_value else None) query.addBindValue(float(max_value) if max_value else None) if not query.exec_(): logger.error(f"Error saving metadata: {query.lastError().text()}") self.show_error(f"Error saving metadata: {query.lastError().text()}") return audit_logger.info(f"Updated metadata for Kepware node {node_id}", extra={"user": self.current_user, "session_id": self.session_id}) dialog.accept() except Exception as e: logger.error(f"Error saving metadata: {e}") self.show_error(f"Error saving metadata: {e}") def get_metadata(self, node_id): query = QSqlQuery(self.db) query.prepare("SELECT units, scaling_factor, min_value, max_value FROM tag_metadata WHERE node_id = ?") query.addBindValue(node_id) if query.exec_() and query.next(): return (query.value(0) or "", {"scaling_factor": query.value(1) or 1.0, "min_value": query.value(2), "max_value": query.value(3)}) return ("", {"scaling_factor": 1.0, "min_value": None, "max_value": None}) def add_to_group(self, node_id): group_name, ok = QInputDialog.getText(self, "Add to Group", "Enter Group Name:") if ok and group_name: query = QSqlQuery(self.db) query.prepare("INSERT INTO tag_groups (group_name, node_id) VALUES (?, ?)") query.addBindValue(group_name) query.addBindValue(node_id) if not query.exec_(): logger.error(f"Error adding to group: {query.lastError().text()}") return self.load_groups() audit_logger.info(f"Added Kepware node {node_id} to group {group_name}", extra={"user": self.current_user, "session_id": self.session_id}) def load_groups(self): self.group_combo.clear() query = QSqlQuery(self.db) if query.exec_("SELECT DISTINCT group_name FROM tag_groups"): while query.next(): self.group_combo.addItem(query.value(0)) self.tag_groups.clear() query = QSqlQuery(self.db) if query.exec_("SELECT group_name, node_id FROM tag_groups"): while query.next(): group_name = query.value(0) node_id = query.value(1) if group_name not in self.tag_groups: self.tag_groups[group_name] = [] self.tag_groups[group_name].append(node_id) else: logger.error(f"Error loading groups: {query.lastError().text()}") def display_group(self, group_name): self.group_table.setRowCount(0) if not group_name or group_name not in self.tag_groups: return for node_id in self.tag_groups[group_name]: row = self.group_table.rowCount() self.group_table.insertRow(row) display_name = self.metadata_cache.get(node_id, ["Unknown"])[0] query = QSqlQuery(self.db) query.prepare("SELECT value, timestamp FROM tag_data WHERE node_id = ? ORDER BY timestamp DESC LIMIT 1") query.addBindValue(node_id) value = "N/A" if query.exec_() and query.next(): value = query.value(0) query.prepare("SELECT value FROM tag_data WHERE node_id = ?") query.addBindValue(node_id) values = [] if query.exec_(): while query.next(): try: values.append(float(query.value(0))) except: pass stats = f"Mean: {np.mean(values):.2f}, Std: {np.std(values):.2f}" if values else "N/A" self.group_table.setItem(row, 0, QTableWidgetItem(display_name)) self.group_table.setItem(row, 1, QTableWidgetItem(node_id)) self.group_table.setItem(row, 2, QTableWidgetItem(value)) self.group_table.setItem(row, 3, QTableWidgetItem(stats)) def export_group_data(self): group_name = self.group_combo.currentText() if not group_name: return filename, _ = QFileDialog.getSaveFileName(self, "Export Group Data", f"{group_name}.csv", "CSV Files (*.csv)") if filename: data = [] query = QSqlQuery(self.db) for node_id in self.tag_groups.get(group_name, []): query.prepare("SELECT timestamp, value FROM tag_data WHERE node_id = ?") query.addBindValue(node_id) if query.exec_(): while query.next(): data.append({ "Node ID": node_id, "Timestamp": query.value(0), "Value": query.value(1) }) df = pd.DataFrame(data) df.to_csv(filename, index=False) audit_logger.info(f"Exported Kepware group {group_name} to {filename}", extra={"user": self.current_user, "session_id": self.session_id}) def add_dependency(self, node_id): if self.current_role != "Admin": self.show_error("Admin access required.") return dialog = QDialog(self) dialog.setWindowTitle("Add Dependency") layout = QFormLayout() target_node_input = QLineEdit() threshold_input = QLineEdit() target_value_input = QLineEdit() layout.addRow("Target Node ID:", target_node_input) layout.addRow("Threshold Value:", threshold_input) layout.addRow("Target Value:", target_value_input) save_button = QPushButton("Save") save_button.clicked.connect(lambda: self.save_dependency(node_id, target_node_input.text(), threshold_input.text(), target_value_input.text(), dialog)) layout.addWidget(save_button) dialog.setLayout(layout) dialog.exec_() def save_dependency(self, source_node_id, target_node_id, threshold, target_value, dialog): try: query = QSqlQuery(self.db) query.prepare("INSERT INTO dependencies (source_node_id, target_node_id, threshold, target_value) VALUES (?, ?, ?, ?)") query.addBindValue(source_node_id) query.addBindValue(target_node_id) query.addBindValue(float(threshold)) query.addBindValue(target_value) if not query.exec_(): logger.error(f"Error saving dependency: {query.lastError().text()}") self.show_error(f"Error saving dependency: {query.lastError().text()}") return if source_node_id not in self.dependencies: self.dependencies[source_node_id] = [] self.dependencies[source_node_id].append((target_node_id, float(threshold), target_value)) audit_logger.info(f"Added dependency from Kepware node {source_node_id} to {target_node_id}", extra={"user": self.current_user, "session_id": self.session_id}) dialog.accept() except Exception as e: logger.error(f"Error saving dependency: {e}") self.show_error(f"Error saving dependency: {e}") def check_dependencies(self, node_id, value): if node_id not in self.dependencies: return for target_node_id, threshold, target_value in self.dependencies[node_id]: if value > threshold: for client, _ in self.clients.values(): try: node = client.get_node(target_node_id) asyncio.run_coroutine_threadsafe( node.write_value(ua.DataValue(ua.Variant(float(target_value)))), asyncio.get_event_loop() ).result() if self.pubsub_publisher: try: self.pubsub_publisher.publish( self.pubsub_topic, json.dumps({ "event": "dependency_triggered", "source_node": node_id, "target_node": target_node_id, "threshold": threshold, "value_written": target_value, "timestamp": datetime.now().isoformat() }).encode("utf-8") ) except Exception as e: logger.error(f"Pub/Sub publish failed: {e}") self.add_alert(f"Dependency Triggered: {node_id} -> {target_node_id} = {target_value}") audit_logger.info( f"Dependency triggered: Wrote {target_value} to Kepware node {target_node_id}", extra={"user": self.current_user, "session_id": self.session_id} ) except Exception as e: logger.error(f"Failed to write to Kepware node {target_node_id}: {e}") self.add_alert(f"Dependency Write Error: {target_node_id} - {e}") def write_value(self): node_id = self.write_node_input.text() value = self.write_value_input.text() if not node_id or not value: self.show_error("Please enter Node ID and Value.") return try: for client, _ in self.clients.values(): node = client.get_node(node_id) asyncio.run_coroutine_threadsafe( node.write_value(ua.DataValue(ua.Variant(float(value)))), asyncio.get_event_loop() ).result() self.add_alert(f"Wrote {value} to {node_id}") audit_logger.info(f"Wrote value {value} to Kepware node {node_id}", extra={"user": self.current_user, "session_id": self.session_id}) self.show_info(f"Successfully wrote {value} to {node_id}") except Exception as e: logger.error(f"Error writing value to Kepware node {node_id}: {e}") self.show_error(f"Error writing value: {e}") self.add_alert(f"Write Error: {node_id} - {e}") def fetch_historical_data(self): node_id = self.history_node_input.text() duration = self.history_duration.value() if not node_id: self.show_error("Please enter Node ID.") return query = QSqlQuery(self.db) query.prepare("SELECT timestamp, value FROM tag_data WHERE node_id = ? AND timestamp >= ?") query.addBindValue(node_id) query.addBindValue((datetime.now() - timedelta(minutes=duration)).strftime("%Y-%m-%d %H:%M:%S")) if not query.exec_(): logger.error(f"Error fetching historical data: {query.lastError().text()}") self.show_error("Error fetching historical data.") return timestamps = [] values = [] while query.next(): timestamps.append(datetime.strptime(query.value(0), "%Y-%m-%d %H:%M:%S")) try: values.append(float(query.value(1))) except: continue if not values: self.show_error("No historical data found.") return self.history_canvas.figure.clear() ax = self.history_canvas.figure.add_subplot(111) ax.plot(timestamps, values, label=node_id) ax.set_xlabel("Time") ax.set_ylabel("Value") ax.legend() ax.grid(True) plt.setp(ax.get_xticklabels(), rotation=45) self.history_canvas.figure.tight_layout() self.history_canvas.draw() stats = f"Mean: {np.mean(values):.2f}, Std: {np.std(values):.2f}, Min: {np.min(values):.2f}, Max: {np.max(values):.2f}" self.history_stats_label.setText(f"Statistics: {stats}") audit_logger.info(f"Fetched historical data for Kepware node {node_id}", extra={"user": self.current_user, "session_id": self.session_id}) def filter_nodes(self): filter_text = self.filter_input.text().lower() for i in range(self.browse_tree.topLevelItemCount()): item = self.browse_tree.topLevelItem(i) visible = filter_text in item.text(0).lower() or filter_text in item.text(1).lower() item.setHidden(not visible) def filter_events(self): filter_text = self.event_filter_input.text().lower() for row in range(self.events_table.rowCount()): visible = filter_text in self.events_table.item(row, 0).text().lower() or filter_text in self.events_table.item(row, 1).text().lower() self.events_table.setRowHidden(row, not visible) def acknowledge_event(self, row): self.events_table.setItem(row, 4, QTableWidgetItem(self.current_user)) audit_logger.info(f"Acknowledged event at row {row}", extra={"user": self.current_user, "session_id": self.session_id}) def run_script(self): script = self.script_editor.toPlainText() if not script: self.show_error("No script provided.") return self.show_info("Lua script execution not implemented. Script received:\n" + script) audit_logger.info(f"Attempted to run Lua script on Kepware", extra={"user": self.current_user, "session_id": self.session_id}) def update_metrics(self): if not self.running: return uptime = (datetime.now() - self.uptime_start).total_seconds() / 60 if self.uptime_start else 0 update_rate = self.tag_update_count self.tag_update_count = 0 connection_success = (len(self.clients) / (len(self.clients) + sum(self.reconnect_attempts.values())) * 100) if self.clients else 0 self.kpi_label.setText(f"KPIs: Tag Update Rate: {update_rate:.0f}/s, Server Uptime: {uptime:.1f}m, Connection Success: {connection_success:.0f}%") self.kpi_canvas.figure.clear() ax = self.kpi_canvas.figure.add_subplot(111) ax.bar(["Update Rate", "Uptime (min)", "Conn. Success (%)"], [update_rate, uptime, connection_success], color=['#1f77b4', '#ff7f0e', '#2ca02c']) ax.set_ylabel("Value") self.kpi_canvas.draw() def toggle_logging(self): audit_logger.info(f"Data logging {'enabled' if self.log_check.isChecked() else 'disabled'}", extra={'user': self.current_user, 'session_id': self.session_id}) def log_data(self, name, node_id, value, timestamp): self.data_log.append({"Name": name, "Node ID": node_id, "Value": value, "Timestamp": timestamp}) if len(self.data_log) >= 100: try: df = pd.DataFrame(self.data_log) df.to_csv("opc_ua_data_log.csv", mode='a', index=False, header=not os.path.exists("opc_ua_data_log.csv")) self.data_log = [] except Exception as e: logger.error(f"Error logging data to CSV: {e}") def toggle_simulation(self): audit_logger.info(f"Tag simulation {'enabled' if self.simulate_check.isChecked() else 'disabled'}", extra={'user': self.current_user, 'session_id': self.session_id}) def toggle_validation(self): audit_logger.info(f"Data validation {'enabled' if self.validate_data_check.isChecked() else 'disabled'}", extra={'user': self.current_user, 'session_id': self.session_id}) def save_config(self): filename, _ = QFileDialog.getSaveFileName(self, "Save Config", "config.json", "JSON Files (*.json)") if filename: config = { "servers": list(self.clients.keys()), "favorites": list(self.favorites), "metadata": self.metadata_cache, "groups": self.tag_groups, "pubsub_topic": self.pubsub_input.text(), "settings": { "url": self.url_input.text(), "secondary_url": self.secondary_url_input.text(), "port": self.port_input.text(), "username": self.user_input.text(), "password": self.cipher.encrypt(self.pass_input.text().encode()).decode() if self.pass_input.text() else "", "security_policy": self.security_combo.currentText(), "polling_rate": self.poll_spin.value(), "browse_depth": self.depth_spin.value(), "batch_size": self.batch_size_spin.value(), "simulate_tags": self.simulate_check.isChecked(), "validate_data": self.validate_data_check.isChecked() } } try: with open(filename, 'w') as f: json.dump(config, f, indent=2) audit_logger.info(f"Saved encrypted configuration to {filename}", extra={"user": self.current_user, "session_id": self.session_id}) except Exception as e: logger.error(f"Error saving config: {e}") self.show_error(f"Error saving config: {e}") def load_config(self): filename, _ = QFileDialog.getOpenFileName(self, "Load Config", "", "JSON Files (*.json)") if filename: try: with open(filename, 'r') as f: config = json.load(f) self.favorites = set(config.get("favorites", [])) self.metadata_cache = config.get("metadata", {}) self.tag_groups = config.get("groups", {}) self.pubsub_input.setText(config.get("pubsub_topic", self.pubsub_topic)) settings = config.get("settings", {}) self.url_input.setText(settings.get("url", "")) self.secondary_url_input.setText(settings.get("secondary_url", "")) self.port_input.setText(settings.get("port", "49320")) self.user_input.setText(settings.get("username", "")) self.pass_input.setText(self.cipher.decrypt(settings.get("password", "").encode()).decode() if settings.get("password") else "") self.security_combo.setCurrentText(settings.get("security_policy", "None")) self.poll_spin.setValue(settings.get("polling_rate", 500)) self.depth_spin.setValue(settings.get("browse_depth", 5)) self.batch_size_spin.setValue(settings.get("batch_size", 10)) self.simulate_check.setChecked(settings.get("simulate_tags", False)) self.validate_data_check.setChecked(settings.get("validate_data", False)) self.load_groups() audit_logger.info(f"Loaded configuration from {filename}", extra={"user": self.current_user, "session_id": self.session_id}) except Exception as e: logger.error(f"Error loading config: {e}") self.show_error(f"Error loading config: {e}") def export_tags(self): filename, _ = QFileDialog.getSaveFileName(self, "Export Tags", "tags.csv", "CSV Files (*.csv)") if filename: tags = [] for node_id, (name, dtype, desc) in self.metadata_cache.items(): units, scaling = self.get_metadata(node_id) tags.append({"Node ID": node_id, "Name": name, "Data Type": dtype, "Description": desc, "Units": units, "Scaling": scaling}) try: df = pd.DataFrame(tags) df.to_csv(filename, index=False) audit_logger.info(f"Exported Kepware tags to {filename}", extra={"user": self.current_user, "session_id": self.session_id}) except Exception as e: logger.error(f"Error exporting tags: {e}") self.show_error(f"Error exporting tags: {e}") def import_tags(self): filename, _ = QFileDialog.getOpenFileName(self, "Import Tags", "", "CSV Files (*.csv)") if filename: try: df = pd.read_csv(filename) for _, row in df.iterrows(): node_id = row["Node ID"] self.metadata_cache[node_id] = (row["Name"], row["Data Type"], row["Description"]) self.save_metadata(node_id, row["Units"], str(row["Scaling"]), "", "") audit_logger.info(f"Imported tags from {filename}", extra={"user": self.current_user, "session_id": self.session_id}) except Exception as e: logger.error(f"Error importing tags: {e}") self.show_error(f"Error importing tags: {e}") def batch_write(self): if self.current_role != "Admin": self.show_error("Admin access required.") return filename, _ = QFileDialog.getOpenFileName(self, "Select CSV", "", "CSV Files (*.csv)") if filename: try: df = pd.read_csv(filename) for _, row in df.iterrows(): node_id = row["Node ID"] value = row["Value"] for client, _ in self.clients.values(): node = client.get_node(node_id) asyncio.run_coroutine_threadsafe( node.write_value(ua.DataValue(ua.Variant(float(value)))), asyncio.get_event_loop() ).result() audit_logger.info(f"Batch wrote {value} to Kepware node {node_id}", extra={"user": self.current_user, "session_id": self.session_id}) self.show_info("Batch write completed.") except Exception as e: logger.error(f"Error during batch write: {e}") self.show_error(f"Error during batch write: {e}") def manage_schedules(self): if self.current_role != "Admin": self.show_error("Admin access required.") return dialog = QDialog(self) dialog.setWindowTitle("Manage Schedules") layout = QVBoxLayout() table = QTableWidget() table.setColumnCount(5) table.setHorizontalHeaderLabels(["ID", "Task Type", "Node ID", "Value", "Interval (s)"]) table.setColumnWidth(0, 150) table.setColumnWidth(2, 200) query = QSqlQuery(self.db) if query.exec_("SELECT id, task_type, node_id, value, interval FROM schedules"): row = 0 while query.next(): table.insertRow(row) table.setItem(row, 0, QTableWidgetItem(query.value(0))) table.setItem(row, 1, QTableWidgetItem(query.value(1))) table.setItem(row, 2, QTableWidgetItem(query.value(2))) table.setItem(row, 3, QTableWidgetItem(query.value(3))) table.setItem(row, 4, QTableWidgetItem(str(query.value(4)))) row += 1 add_button = QPushButton("Add Schedule") add_button.clicked.connect(lambda: self.add_schedule(dialog)) delete_button = QPushButton("Delete Selected") delete_button.clicked.connect(lambda: self.delete_schedule(table)) layout.addWidget(table) layout.addWidget(add_button) layout.addWidget(delete_button) dialog.setLayout(layout) dialog.exec_() def add_schedule(self, dialog): task_dialog = QDialog(self) task_dialog.setWindowTitle("Add Schedule") layout = QFormLayout() task_type_input = QComboBox() task_type_input.addItems(["Read", "Write"]) node_id_input = QLineEdit() value_input = QLineEdit() interval_input = QLineEdit() layout.addRow("Task Type:", task_type_input) layout.addRow("Node ID:", node_id_input) layout.addRow("Value (for Write):", value_input) layout.addRow("Interval (s):", interval_input) save_button = QPushButton("Save") save_button.clicked.connect(lambda: self.save_schedule(task_type_input.currentText(), node_id_input.text(), value_input.text(), interval_input.text(), task_dialog, dialog)) layout.addWidget(save_button) task_dialog.setLayout(layout) task_dialog.exec_() def save_schedule(self, task_type, node_id, value, interval, task_dialog, parent_dialog): try: schedule_id = str(uuid.uuid4()) query = QSqlQuery(self.db) query.prepare("INSERT INTO schedules (id, task_type, node_id, value, interval) VALUES (?, ?, ?, ?, ?)") query.addBindValue(schedule_id) query.addBindValue(task_type) query.addBindValue(node_id) query.addBindValue(value) query.addBindValue(int(interval)) if not query.exec_(): logger.error(f"Error saving schedule: {query.lastError().text()}") self.show_error(f"Error saving schedule: {query.lastError().text()}") return self.schedules.append({"id": schedule_id, "task_type": task_type, "node_id": node_id, "value": value, "interval": int(interval), "last_run": 0}) self.schedule_timer.start(1000) audit_logger.info(f"Added schedule {schedule_id} for {task_type} on Kepware node {node_id}", extra={"user": self.current_user, "session_id": self.session_id}) task_dialog.accept() parent_dialog.accept() self.manage_schedules() except Exception as e: logger.error(f"Error saving schedule: {e}") self.show_error(f"Error saving schedule: {e}") def delete_schedule(self, table): selected_rows = [index.row() for index in table.selectionModel().selectedRows()] if not selected_rows: return for row in sorted(selected_rows, reverse=True): schedule_id = table.item(row, 0).text() query = QSqlQuery(self.db) query.prepare("DELETE FROM schedules WHERE id = ?") query.addBindValue(schedule_id) if not query.exec_(): logger.error(f"Error deleting schedule: {query.lastError().text()}") continue self.schedules = [s for s in self.schedules if s["id"] != schedule_id] table.removeRow(row) audit_logger.info(f"Deleted schedule {schedule_id}", extra={"user": self.current_user, "session_id": self.session_id}) if not self.schedules: self.schedule_timer.stop() def run_scheduled_tasks(self): current_time = time.time() for task in self.schedules: if current_time - task["last_run"] >= task["interval"]: if task["task_type"] == "Read": for client, _ in self.clients.values(): try: node = client.get_node(task["node_id"]) value = asyncio.run_coroutine_threadsafe(node.read_value(), asyncio.get_event_loop()).result() audit_logger.info(f"Scheduled read from Kepware node {task['node_id']}: {value}", extra={"user": self.current_user, "session_id": self.session_id}) except Exception as e: logger.error(f"Error in scheduled read: {e}") elif task["task_type"] == "Write": for client, _ in self.clients.values(): try: node = client.get_node(task["node_id"]) asyncio.run_coroutine_threadsafe( node.write_value(ua.DataValue(ua.Variant(float(task["value"])))), asyncio.get_event_loop() ).result() audit_logger.info(f"Scheduled write {task['value']} to Kepware node {task['node_id']}", extra={"user": self.current_user, "session_id": self.session_id}) except Exception as e: logger.error(f"Error in scheduled write: {e}") task["last_run"] = current_time def backup_data(self): filename, _ = QFileDialog.getSaveFileName(self, "Backup Data", "backup.json", "JSON Files (*.json)") if filename: data = { "tag_data": [], "tag_metadata": [], "tag_groups": [], "schedules": [], "dependencies": [] } query = QSqlQuery(self.db) for table in data.keys(): if query.exec_(f"SELECT * FROM {table}"): while query.next(): row = {} for i in range(query.record().count()): row[query.record().fieldName(i)] = query.value(i) data[table].append(row) try: with open(filename, 'w') as f: json.dump(data, f, indent=2) audit_logger.info(f"Backup created at {filename}", extra={"user": self.current_user, "session_id": self.session_id}) except Exception as e: logger.error(f"Error creating backup: {e}") self.show_error(f"Error creating backup: {e}") def restore_data(self): filename, _ = QFileDialog.getOpenFileName(self, "Restore Data", "", "JSON Files (*.json)") if filename: try: with open(filename, 'r') as f: data = json.load(f) query = QSqlQuery(self.db) query.exec_("BEGIN TRANSACTION") for table in data.keys(): query.exec_(f"DELETE FROM {table}") for row in data[table]: placeholders = ", ".join("?" * len(row)) query.prepare(f"INSERT INTO {table} ({', '.join(row.keys())}) VALUES ({placeholders})") for value in row.values(): query.addBindValue(value) if not query.exec_(): logger.error(f"Error restoring {table}: {query.lastError().text()}") query.exec_("COMMIT") self.load_groups() audit_logger.info(f"Data restored from {filename}", extra={"user": self.current_user, "session_id": self.session_id}) self.show_info("Data restored successfully.") except Exception as e: logger.error(f"Error restoring data: {e}") self.show_error(f"Error restoring data: {e}") query.exec_("ROLLBACK") def generate_report(self): filename, _ = QFileDialog.getSaveFileName(self, "Generate Report", "report.pdf", "PDF Files (*.pdf)") if filename: try: import matplotlib.backends.backend_pdf pdf = matplotlib.backends.backend_pdf.PdfPages(filename) fig = plt.Figure() canvas = FigureCanvas(fig) ax = fig.add_subplot(111) query = QSqlQuery(self.db) if query.exec_("SELECT node_id, COUNT(*) as count FROM tag_data GROUP BY node_id"): nodes = [] counts = [] while query.next(): nodes.append(query.value(0)) counts.append(query.value(1)) ax.bar(nodes, counts, color=['#1f77b4', '#ff7f0e', '#2ca02c']) ax.set_title("Tag Data Counts") ax.set_ylabel("Number of Entries") plt.xticks(rotation=45) pdf.savefig(fig) pdf.close() audit_logger.info(f"Report generated at {filename}", extra={"user": self.current_user, "session_id": self.session_id}) self.show_info(f"Report generated and saved to {filename}") except Exception as e: logger.error(f"Error generating report: {e}") self.show_error(f"Error generating report: {e}") def show_error(self, message): QMessageBox.critical(self, "Error", message) def show_info(self, message): QMessageBox.information(self, "Info", message) def add_alert(self, message): timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.alerts.append({"timestamp": timestamp, "message": message, "status": "Active"}) self.update_alerts_table() def update_alerts(self): for alert in self.alerts[:]: if datetime.now() > datetime.strptime(alert["timestamp"], "%Y-%m-%d %H:%M:%S") + timedelta(minutes=15): alert["status"] = "Resolved" self.update_alerts_table() def update_alerts_table(self): self.alerts_table.setRowCount(0) for idx, alert in enumerate(self.alerts): row = self.alerts_table.rowCount() self.alerts_table.insertRow(row) self.alerts_table.setItem(row, 0, QTableWidgetItem(alert["timestamp"])) self.alerts_table.setItem(row, 1, QTableWidgetItem(alert["message"])) self.alerts_table.setItem(row, 2, QTableWidgetItem(alert["status"])) if alert["status"] == "Active": self.alert_display_label.setText(alert["message"]) if alert["status"] == "Resolved": self.alerts.remove(alert) self.alerts_table.removeRow(row) break self.alerts_table.viewport().update() def set_defaults(self): self.url_input.setText("localhost") self.port_input.setText("49320") self.user_input.setText("") self.pass_input.setText("") self.security_combo.setCurrentText("None") self.poll_spin.setValue(500) self.depth_spin.setValue(5) self.batch_size_spin.setValue(10) self.simulate_check.setChecked(False) self.validate_data_check.setChecked(False) audit_logger.info("Reset to default settings", extra={"user": self.current_user, "session_id": self.session_id}) def update_polling(self, row): polling, ok = QInputDialog.getInt(self, "Update Polling", "New Polling Rate (ms):", self.monitor_table.item(row, 4).text(), 100, 5000) if ok: self.monitor_table.setItem(row, 4, QTableWidgetItem(str(polling))) node_id = self.monitor_table.item(row, 1).text() for client, _ in self.clients.values(): for sub, node, handle in self.subscriptions.get(client, []): if str(node.nodeid) == node_id: asyncio.run_coroutine_threadsafe( sub.modify_polling(polling), asyncio.get_event_loop() ).result() audit_logger.info(f"Updated polling rate for Kepware node {node_id} to {polling}ms", extra={"user": self.current_user, "session_id": self.session_id}) break def remove_monitored_tag(self, row): node_id = self.monitor_table.item(row, 1).text() for client, _ in self.clients.values(): for sub, node, handle in self.subscriptions.get(client, []): if str(node.nodeid) == node_id: asyncio.run_coroutine_threadsafe(sub.delete(), asyncio.get_event_loop()).result() self.subscriptions[client].remove((sub, node, handle)) self.monitor_table.removeRow(row) audit_logger.info(f"Removed monitored Kepware node {node_id}", extra={"user": self.current_user, "session_id": self.session_id}) break async def diagnose_node(self, client, node, node_id): try: start_time = time.time() await node.read_value() latency = (time.time() - start_time) * 1000 self.diagnostic_log.append({ "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "event": "Node Diagnosis", "details": f"Node {node_id} responded in {latency:.2f} ms" }) query = QSqlQuery(self.db) query.prepare("INSERT INTO diagnostics (timestamp, event, details) VALUES (?, ?, ?)") query.addBindValue(self.diagnostic_log[-1]["timestamp"]) query.addBindValue(self.diagnostic_log[-1]["event"]) query.addBindValue(self.diagnostic_log[-1]["details"]) if not query.exec_(): logger.error(f"Error logging diagnostics: {query.lastError().text()}") self.add_alert(f"Diagnosis: {node_id} responded in {latency:.2f} ms") audit_logger.info(f"Diagnosed Kepware node {node_id} with latency {latency:.2f}ms", extra={"user": self.current_user, "session_id": self.session_id}) except Exception as e: self.diagnostic_log.append({ "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "event": "Node Diagnosis", "details": f"Node {node_id} failed to respond: {e}" }) query = QSqlQuery(self.db) query.prepare("INSERT INTO diagnostics (timestamp, event, details) VALUES (?, ?, ?)") query.addBindValue(self.diagnostic_log[-1]["timestamp"]) query.addBindValue(self.diagnostic_log[-1]["event"]) query.addBindValue(self.diagnostic_log[-1]["details"]) if not query.exec_(): logger.error(f"Error logging diagnostics: {query.lastError().text()}") self.add_alert(f"Diagnosis Error: {node_id} - {e}") logger.error(f"Diagnosis failed for Kepware node {node_id}: {e}") def diagnose_monitored_tag(self, row): node_id = self.monitor_table.item(row, 1).text() for client, _ in self.clients.values(): try: node = client.get_node(node_id) worker = Worker(self.diagnose_node, client, node, node_id) # Pass the async function self.threadpool.start(worker) except Exception as e: logger.error(f"Error diagnosing monitored tag {node_id}: {e}") self.add_alert(f"Diagnosis Error: {node_id} - {e}") def show_diagnostic_log(self): dialog = QDialog(self) dialog.setWindowTitle("Diagnostic Log") layout = QVBoxLayout() log_table = QTableWidget() log_table.setColumnCount(3) log_table.setHorizontalHeaderLabels(["Timestamp", "Event", "Details"]) log_table.setColumnWidth(0, 200) log_table.setColumnWidth(1, 150) query = QSqlQuery(self.db) if query.exec_("SELECT timestamp, event, details FROM diagnostics ORDER BY timestamp DESC LIMIT 100"): while query.next(): row = log_table.rowCount() log_table.insertRow(row) log_table.setItem(row, 0, QTableWidgetItem(query.value(0))) log_table.setItem(row, 1, QTableWidgetItem(query.value(1))) log_table.setItem(row, 2, QTableWidgetItem(query.value(2))) layout.addWidget(log_table) dialog.setLayout(layout) dialog.exec_() def log_diagnostic(self): if self.running: self.diagnostic_log.append({ "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "event": "System Health", "details": f"Active connections: {len(self.clients)}, Pending tasks: {self.threadpool.activeThreadCount()}" }) query = QSqlQuery(self.db) query.prepare("INSERT INTO diagnostics (timestamp, event, details) VALUES (?, ?, ?)") query.addBindValue(self.diagnostic_log[-1]["timestamp"]) query.addBindValue(self.diagnostic_log[-1]["event"]) query.addBindValue(self.diagnostic_log[-1]["details"]) if not query.exec_(): logger.error(f"Error logging diagnostics: {query.lastError().text()}") # Move this block outside the class if __name__ == "__main__": app = QApplication(sys.argv) translator = QTranslator() window = HyperAdvancedOPCUClientGUI() window.show() sys.exit(app.exec_()) - Initial Deployment
4868374 verified metadata
title: dk
emoji: 🐳
colorFrom: blue
colorTo: blue
sdk: static
pinned: false
tags:
- deepsite
Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference