# ============================================================================== # 1. IMPORTAÇÕES & CONFIGURAÇÃO # ============================================================================== from modules.brain import call_aivion_brain, generate_diagnostic_report, generate_math_steps, reset_math_state, NForgeLabEngine, ArtificialNervousSystem from modules.vision import analyze_image_text from modules.pid_engine import IndustrialPID, ThermalProcess, PressureProcess, LevelProcess, ChemicalReactor, calculate_performance_metrics, run_robustness_test from modules.cmms_engine import render_cmms_tab from modules.calculus import solve_calculus from modules.rag_engine import build_vector_store, query_knowledge_base from modules.plc_studio import PLCEngine from modules.robotics_lab import RoboticArm from modules.logic_lab import solve_logic import streamlit as st import pandas as pd import numpy as np import plotly.graph_objects as go import streamlit.components.v1 as components import time import re import datetime import math st.set_page_config(page_title="NForge Lab", layout="wide", page_icon="🧬") # CSS Avançado st.markdown(""" """, unsafe_allow_html=True) # ============================================================================== # 2. ESTADO E CACHE # ============================================================================== @st.cache_data(show_spinner=False) def cached_solve_calculus(expr, op, lims, params): return solve_calculus(expr, op, lims, params) @st.cache_data(show_spinner=False) def cached_math_steps(expr, op, lims, engine): return generate_math_steps(expr, op, lims, engine) if "nforge_lab_engine" not in st.session_state: st.session_state.nforge_lab_engine = NForgeLabEngine() st.session_state.ans_brain = ArtificialNervousSystem() if "robot" not in st.session_state: st.session_state.robot = RoboticArm() if "chat_history" not in st.session_state: st.session_state.chat_history = [] if "pid_kp" not in st.session_state: st.session_state.pid_kp, st.session_state.pid_ki, st.session_state.pid_kd = 2.0, 0.5, 1.0 if "robot_trace" not in st.session_state: st.session_state.robot_trace = [] if "last_traj" not in st.session_state: st.session_state.last_traj = [] if "teach_points_6" not in st.session_state: st.session_state.teach_points_6 = [] if "robot_trace_3d" not in st.session_state: st.session_state.robot_trace_3d = [] if "plc_engine" not in st.session_state: plc = PLCEngine(); plc.add_rung('I0.0', 'OR', 'Q0.0', 'NORMAL', 'Q0.0'); st.session_state.plc_engine = plc # ESTADOS SCADA (NOVOS) if "scada_live" not in st.session_state: st.session_state.scada_live = False if "scada_alarms" not in st.session_state: st.session_state.scada_alarms = [] if "scada_v" not in st.session_state: st.session_state.scada_v = 2.5 if "scada_t" not in st.session_state: st.session_state.scada_t = 45.0 # ============================================================================== # 3. HELPERS GLOBAIS # ============================================================================== def get_system_context(): ctx = "--- REAL-TIME FACTORY DATA ---\n" if "ans_brain" in st.session_state: hist = st.session_state.ans_brain.history last_vib = hist["vib"][-1] if hist["vib"] else 0.0 ctx += f"[SCADA] Vibration: {last_vib:.2f} mm/s\n" if "pid_metrics" in st.session_state: m = st.session_state.pid_metrics ctx += f"[PID] Stability: {m.get('Estabilidade','N/A')} | Overshoot: {m.get('Overshoot','N/A')}\n" if "plc_engine" in st.session_state: p = st.session_state.plc_engine ctx += f"[PLC] Motor State: {'ON' if p.memory['Q0.0'] else 'OFF'}\n" return ctx def clean_latex_response(text): if not text: return "" text = re.sub(r'\\\[(.*?)\\\]', r'$$\1$$', text, flags=re.DOTALL) text = re.sub(r'\\\((.*?)\\\)', r'$\1$', text, flags=re.DOTALL) return text def render_visual_rung(rung, plc_memory, plc_timers): def get_st(t): if not t or t=="NONE": return False clean = t[1:] if t.startswith('!') else t val = plc_memory.get(clean, False) if clean in plc_timers: val = plc_timers[clean].done return not val if t.startswith('!') else val def comp(t, s, shp=""): if not t or t=="NONE": return "" icon = "|/|" if "!" in t else "| |" if shp=="coil": icon = "( )" if "T" in t and shp=="coil": icon = "(TON)" return f'
{icon}
{t}
' sa = get_st(rung['contact_a']); sb = get_st(rung['contact_b']) wa = sa; wb = False; fo = wa if rung['op'] == 'AND': wb=sb; fo = wa and wb elif rung['op'] == 'OR': fo = wa or sb dest_s = plc_memory.get(rung['dest'], False) if rung['dest'] in plc_timers: dest_s = plc_timers[rung['dest']].done h = f'
' h += f'
' h += comp(rung['contact_a'], sa) if rung['op'] == 'AND': h += f'
{comp(rung["contact_b"], sb)}' elif rung['op'] == 'OR': h += f'
OR
{comp(rung["contact_b"], sb)}' h += f'
' h += comp(rung['dest'], dest_s, "coil ladder-coil") h += f'
' h += '
' return h def swap_units(k1, k2): st.session_state[k1], st.session_state[k2] = st.session_state[k2], st.session_state[k1] def get_ph_color(ph): if ph < 4: return "#ff1a1a" if ph < 6.5: return "#ff9900" if ph <= 7.5: return "#00ff00" if ph < 10: return "#00ccff" return "#8c1aff" ACID_OPTIONS = {"Ácido Sulfúrico (H2SO4) - Forte": 1.5, "Ácido Clorídrico (HCl) - Forte": 1.4, "Ácido Cítrico (C6H8O7) - Fraco": 0.5, "Gás Carbônico (CO2) - Fraco": 0.3} BASE_OPTIONS = {"Hidróxido de Sódio (NaOH) - Forte": 1.5, "Hidróxido de Cálcio (Ca(OH)2) - Forte": 1.3, "Carbonato de Sódio (Na2CO3) - Fraco": 0.6, "Amônia (NH3) - Fraca": 0.4} def generate_motor_hmi_html(v, t): """Gera um Motor Industrial Dinâmico (HTML/CSS) para o SCADA.""" if t < 60: body_color = "#3a4a5a" # Normal/Frio (Azul metálico) elif t < 90: body_color = "#b87333" # Alerta (Cobre/Laranja) else: body_color = "#ff3333" # Crítico (Vermelho incandescente) shake_class = "motor-shake" if v > 4.5 else "" anim_speed = max(0.05, 0.5 - (v/30.0)) # Treme mais rápido com vibração alta return f"""
T: {t:.1f}°C | V: {v:.1f}
""" def generate_3d_pinout_html(hw_name, in_pins, out_pins): if "ESP32 WROOM" in hw_name: pins_per_side, chip_label = 19, "ESP32
WROOM" elif "ESP32-S3" in hw_name: pins_per_side, chip_label = 22, "ESP32-S3
AI CORE" elif "ESP32-C3" in hw_name: pins_per_side, chip_label = 15, "ESP32-C3
RISC-V" elif "ESP8266" in hw_name: pins_per_side, chip_label = 15, "ESP8266
NodeMCU" elif "Uno" in hw_name or "Nano" in hw_name: pins_per_side, chip_label = 15, "ARDUINO
" + ("UNO" if "Uno" in hw_name else "NANO") elif "Mega" in hw_name: pins_per_side, chip_label = 27, "ARDUINO
MEGA" elif "Pico" in hw_name: pins_per_side, chip_label = 20, "RP2040
" + ("PICO W" if "W" in hw_name else "PICO") elif "Raspberry" in hw_name: pins_per_side, chip_label = 20, "RPI 4
HEADER" else: pins_per_side, chip_label = 15, "MCU
GENERIC" board_w = 140 board_h = max(200, pins_per_side * 20 + 20) board_d = 12 html = f"""
■ IN (Sensor)    ■ OUT (Atuador)
🖱️ Clique e arraste para girar a placa em 3D
{chip_label}
NFORGE LAB HIL 3D
""" for i in range(pins_per_side): pin_num = str(i) color = "#00ccff" if pin_num in in_pins else ("#ff6600" if pin_num in out_pins else "#555") glow = f"box-shadow: 0 0 12px {color};" if color != "#555" else "box-shadow: 1px 1px 2px #000;" top_pos = 15 + i * 20 html += f"""
P{pin_num}
""" for i in range(pins_per_side, pins_per_side*2): pin_num = str(i) color = "#00ccff" if pin_num in in_pins else ("#ff6600" if pin_num in out_pins else "#555") glow = f"box-shadow: 0 0 12px {color};" if color != "#555" else "box-shadow: 1px 1px 2px #000;" top_pos = 15 + (i - pins_per_side) * 20 html += f"""
P{pin_num}
""" html += """
""" return html # ============================================================================== # 4. INTERFACE PRINCIPAL # ============================================================================== st.sidebar.title("🧬 NForge Lab") st.sidebar.caption("V137.0 | SCADA Control Room Edition") if st.sidebar.button("🔍 Diagnóstico Geral", key="btn_main_diag"): from modules.diagnostics import display_diagnostic_dashboard display_diagnostic_dashboard() tabs = st.tabs([ "🧠 Brain", "🤖 Robotics", "📡 SCADA", "📈 PID Pro", "⚡ PLC Studio", "🧮 Math", "🛠️ CMMS", "👁️ Vision", "🔄 Conversor", "🧪 Química", "💻 Embedded" ]) # ------------------------------------------------------------------------------ # ABA 1: BRAIN (V144.0 - AUTONOMOUS AGENT, MULTI-AGENT & MERMAID CAD) # ------------------------------------------------------------------------------ with tabs[0]: st.header("🧠 Neuralk Brain (AI Agent)") st.caption("Agente Autônomo, Comitê Multi-Agente e Geração Dinâmica de Diagramas (Mermaid.js).") c_p1, c_p2 = st.columns([3, 1]) with c_p1: persona = st.radio("Persona Principal:", ["👷 Eng. Geral", "⚡ Eletricista", "🛡️ Safety", "💻 Dev", "🧮 Math"], horizontal=True, key="brain_persona") use_committee = st.toggle("🏛️ Ativar Comitê Multi-Agente (Debate de Especialistas)") with c_p2: st.success("AGENTIC SYSTEM ONLINE") st.info("💡 **Dicas:**\n- Diga *'Ligar o motor'* para acionar o CLP.\n- Diga *'Ajustar Kp para 5'* para sintonizar o PID.\n- Diga *'Gera o diagrama de blocos de um PID'*.") # Função Especial para Renderizar Chat + Diagramas Mermaid Dinamicamente def render_chat_message(role, content): with st.chat_message(role): # Divide o texto onde encontrar blocos de diagrama Mermaid parts = re.split(r'```mermaid(.*?)```', content, flags=re.DOTALL) for i, part in enumerate(parts): if i % 2 == 0: st.markdown(part) # Renderiza texto e Markdown normal else: # Injeta o motor Mermaid.js e desenha o diagrama no meio do chat htmlcode = f"""
{part.strip()}
""" components.html(htmlcode, height=350, scrolling=True) # Renderiza o Histórico for msg in st.session_state.chat_history: render_chat_message(msg["role"], msg["content"]) # Input do Utilizador e Processamento Agêntico if user_in := st.chat_input("Comande a fábrica, peça um diagrama ou faça uma pergunta..."): # --- 1. AÇÃO DO AGENTE AUTÔNOMO (ROTEAMENTO E ATUAÇÃO) --- agent_action = None cmd = user_in.lower() # Atuação sobre o CLP (Aba 5) if "ligar motor" in cmd or "liga o motor" in cmd: if "plc_engine" in st.session_state: st.session_state.plc_engine.memory['Q0.0'] = True agent_action = "🔌 **[Agente Autônomo]** Motor Q0.0 LIGADO com sucesso no PLC Studio." elif "desligar motor" in cmd or "desliga o motor" in cmd: if "plc_engine" in st.session_state: st.session_state.plc_engine.memory['Q0.0'] = False agent_action = "🛑 **[Agente Autônomo]** Motor Q0.0 DESLIGADO com sucesso no PLC Studio." # Atuação sobre a Sintonia do PID (Aba 4 e 10) kp_match = re.search(r'kp\s*(?:para|pra|=)?\s*(\d+[\.,]?\d*)', cmd) if kp_match: novo_kp = float(kp_match.group(1).replace(',','.')) st.session_state.kp_apc = novo_kp # PID Aba 4 st.session_state.ph_kp = novo_kp # PID Aba Química agent_action = f"⚙️ **[Agente Autônomo]** Parâmetro Proporcional (Kp) ajustado globalmente para {novo_kp}." # Regista e mostra a entrada do Utilizador st.session_state.chat_history.append({"role": "user", "content": user_in}) render_chat_message("user", user_in) # Se o agente atuou na fábrica, notifica visualmente no chat if agent_action: st.session_state.chat_history.append({"role": "assistant", "content": agent_action}) render_chat_message("assistant", agent_action) # --- 2. GERAÇÃO DE CONTEXTO E PROMPT DE CONTENÇÃO --- ctx = get_system_context() if use_committee: sys_prompt = """[SYSTEM] Atue como um Comitê Multi-Agente de Engenharia. Você é composto por: 👨‍🔬 Eng. de Processos, 🛡️ Eng. de Segurança e 💻 Dev de Software. Discutam o problema. Encerre com um '🧠 Veredito Final'. [REGRA]: Se for pedido código (Python, C++, etc), o Dev deve fornecê-lo na linguagem correta. Se for pedido diagrama, use um bloco ```mermaid.""" else: sys_prompt = f"""[SYSTEM] Persona atual: {persona}. Aja como este especialista. Se a persona for '💻 Dev', atue como um Engenheiro de Software Sênior (Python, C++, IA, Automação). [REGRA DE CÓDIGO E CONTEXTO]: Se o usuário pedir código (ex: Python, SymPy, Pandas), escreva EXATAMENTE na linguagem solicitada usando blocos ```python, ```cpp, etc. NÃO force códigos de Arduino/CLP a menos que seja explicitamente solicitado. [REGRA DE DIAGRAMA]: Só gere blocos ```mermaid se o usuário escrever a palavra 'diagrama', 'fluxograma' ou 'arquitetura'. Caso contrário, responda em texto normal.""" # O truque mestre: Mandar a IA ignorar a fábrica se a pergunta for de programação geral full_prompt = f"{sys_prompt}\n\n[CONTEXTO DA FÁBRICA] (Use apenas se a pergunta for sobre a fábrica. Ignore totalmente se for uma pergunta geral de programação de software):\n{ctx}\n\n[USER] {user_in}" # --- 3. CHAMADA AO MOTOR NEURAL E RESPOSTA --- with st.spinner("Neuralk AI Agent a processar a matriz lógica..."): raw_resp = call_aivion_brain("Engenheiro", full_prompt, st.session_state.chat_history, "llama-3.1-8b-instant") final_resp = clean_latex_response(raw_resp) st.session_state.chat_history.append({"role": "assistant", "content": final_resp}) render_chat_message("assistant", final_resp) # ------------------------------------------------------------------------------ # ABA 2: ROBOTICS (V147.0 - FULL KINEMATICS & AI VISION PATH PLANNING) # ------------------------------------------------------------------------------ with tabs[1]: st.header("🤖 Robotics (6-Axis) & Path Planning") st.caption("Controle total: Cinemática Direta, Inversa, Teach Pendant e Visão Computacional com desvio de obstáculos.") robot = st.session_state.robot # Layout assimétrico para dar mais espaço ao gráfico 3D c_ctrl, c_viz = st.columns([1.2, 2.2]) with c_ctrl: mode = st.radio("Modo de Operação do Robô:", ["Jog Manual", "Cinemática Inversa", "Teach Mode", "Visão & Autonomia (IA)"], horizontal=True, key="rob_mode") st.divider() # Variáveis base para a IA de Visão (Target e Obstáculo) px, py, pz = 2.0, 0.0, 0.5 ox, oy, oz = 1.5, 0.0, 1.0 # --------------------------------------------------------- # 1. JOG MANUAL (Cinemática Direta) # --------------------------------------------------------- if mode == "Jog Manual": st.markdown("#### Controle Articular (Juntas)") j1 = st.slider("J1 (Base)", -170, 170, 0, key="r_j1") j2 = st.slider("J2 (Ombro)", -100, 135, 0, key="r_j2") j3 = st.slider("J3 (Cotovelo)", -100, 150, 0, key="r_j3") j4 = st.slider("J4 (Pulso 1)", -350, 350, 0, key="r_j4") j5 = st.slider("J5 (Pulso 2)", -120, 120, -90, key="r_j5") j6 = st.slider("J6 (Flange)", -350, 350, 0, key="r_j6") joints, T = robot.forward_kinematics(j1, j2, j3, j4, j5, j6) st.session_state.last_angles = (j1, j2, j3, j4, j5, j6) # --------------------------------------------------------- # 2. CINEMÁTICA INVERSA # --------------------------------------------------------- elif mode == "Cinemática Inversa": st.markdown("#### Coordenadas Alvo (TCP)") x = st.number_input("Posição X", -3., 3., 1.5, key="r_x", step=0.1) y = st.number_input("Posição Y", -3., 3., 0., key="r_y", step=0.1) z = st.number_input("Posição Z", 0., 3., 1.5, key="r_z", step=0.1) if st.button("🚀 Calcular Trajetória e Mover", key="r_move", type="primary"): with st.spinner("Resolvendo matrizes Jacobianas..."): ang, msg = robot.inverse_kinematics(x, y, z, 0, 0, 0) if ang: tr = robot.generate_trajectory(st.session_state.get("last_angles", (0,)*6), ang, 15) st.session_state.last_traj = tr st.session_state.last_angles = ang joints, T = robot.forward_kinematics(*ang) st.success("Movimento concluído com sucesso!") else: st.error(f"Falha na Cinemática: {msg}") joints, T = robot.forward_kinematics(*st.session_state.get("last_angles", (0,)*6)) else: joints, T = robot.forward_kinematics(*st.session_state.get("last_angles", (0,)*6)) # --------------------------------------------------------- # 3. TEACH PENDANT (Gravação de Ciclos) # --------------------------------------------------------- elif mode == "Teach Mode": st.markdown("#### Memória de Pontos (Waypoints)") st.info(f"📍 Pontos Gravados na Memória: **{len(st.session_state.teach_points_6)}**") c_t1, c_t2 = st.columns(2) if c_t1.button("⏺️ Gravar Ponto Atual", key="r_rec"): st.session_state.teach_points_6.append(st.session_state.last_angles) st.success("Ponto Adicionado!") if c_t2.button("🗑️ Limpar", key="r_clear"): st.session_state.teach_points_6 = [] st.warning("Memória apagada.") st.markdown("---") if st.button("▶️ Reproduzir Ciclo de Trabalho", key="r_play", type="primary", use_container_width=True): wps = st.session_state.teach_points_6 if len(wps) > 1: full_traj = [] pb = st.progress(0, text="Interpolação de Trajetória...") for i in range(len(wps)-1): tr = robot.generate_trajectory(wps[i], wps[i+1], 10) full_traj.extend(tr) time.sleep(0.05) pb.progress((i+1)/(len(wps)-1), text=f"Movendo para Waypoint {i+2}") st.session_state.last_traj = full_traj st.success("Ciclo de Trabalho Finalizado!") else: st.error("Grave pelo menos 2 pontos para criar uma rota mecânica.") joints, T = robot.forward_kinematics(*st.session_state.get("last_angles", (0,)*6)) # --------------------------------------------------------- # 4. VISÃO COMPUTACIONAL & PATH PLANNING (A*) # --------------------------------------------------------- elif mode == "Visão & Autonomia (IA)": st.markdown("#### Câmera Virtual & Path Planning") st.info("👁️ A IA identificou a Peça Alvo e detetou um Obstáculo na rota linear direta.") c_p_coord, c_o_coord = st.columns(2) with c_p_coord: st.success("🎯 Peça Alvo (Verde)") px = st.number_input("Peça X", value=2.0, step=0.1) py = st.number_input("Peça Y", value=0.0, step=0.1) pz = st.number_input("Peça Z", value=0.2, step=0.1) with c_o_coord: st.error("⚠️ Obstáculo (Vermelho)") st.write(f"**Posição Física:**\nX: {ox} \nY: {oy} \nZ: {oz}") st.markdown("---") if st.button("🧠 Calcular Rota de Evasão (A*)", type="primary", use_container_width=True): with st.spinner("Motor de IA a calcular vetores de evasão 3D..."): # Lógica de Evasão: Criar um Waypoint "Seguro" 80cm acima do obstáculo safe_x, safe_y, safe_z = ox, oy, oz + 0.8 # Calcula as duas pernas da viagem ang_safe, msg1 = robot.inverse_kinematics(safe_x, safe_y, safe_z, 0, 0, 0) ang_target, msg2 = robot.inverse_kinematics(px, py, pz, 0, 0, 0) if ang_safe and ang_target: # Funde as trajetórias (Atual -> Ponto Seguro Acima -> Alvo Final) tr_part1 = robot.generate_trajectory(st.session_state.get("last_angles", (0,)*6), ang_safe, 15) tr_part2 = robot.generate_trajectory(ang_safe, ang_target, 15) st.session_state.last_traj = tr_part1 + tr_part2 st.session_state.last_angles = ang_target joints, T = robot.forward_kinematics(*ang_target) st.success("✅ Caminho Seguro Encontrado! Trajetória Fundida com sucesso.") else: st.error("Falha no Path Planning: Alvo ou Ponto de Fuga fora do alcance mecânico.") joints, T = robot.forward_kinematics(*st.session_state.get("last_angles", (0,)*6)) else: joints, T = robot.forward_kinematics(*st.session_state.get("last_angles", (0,)*6)) # --------------------------------------------------------- # PAINEL DE TELEMETRIA FIXA (Mantido em todos os modos) # --------------------------------------------------------- st.divider() st.markdown("#### Telemetria em Tempo Real") st.code(f"Posição TCP (Efetuador):\nX: {T[0,3]:.3f} m\nY: {T[1,3]:.3f} m\nZ: {T[2,3]:.3f} m", language="yaml") with c_viz: # Renderização do Manipulador Robótico 3D fig = robot.plot_robot(joints, T_tcp=T) # Injeção de Objetos Virtuais (Visão Computacional) if mode == "Visão & Autonomia (IA)": # Renderiza o Obstáculo (Caixa Vermelha) fig.add_trace(go.Scatter3d(x=[ox], y=[oy], z=[oz], mode='markers', marker=dict(size=25, color='red', symbol='square'), name='Caixa Obstáculo')) # Renderiza a Peça Alvo (Círculo Verde) fig.add_trace(go.Scatter3d(x=[px], y=[py], z=[pz], mode='markers', marker=dict(size=8, color='#00ff00', symbol='circle'), name='Peça Alvo')) # Mostra o Robô st.plotly_chart(fig, use_container_width=True) # Renderização do Painel de Dinâmica e Gráficos de Trajetória if st.session_state.last_traj: st.markdown("#### 📈 Perfil de Velocidade e Trajetória Articular") st.plotly_chart(robot.plot_telemetry(st.session_state.last_traj), use_container_width=True) # ------------------------------------------------------------------------------ # ABA 3: SCADA CONTROL ROOM (V210.1 - CLOUD-SAFE, APM & AI ASSET ONBOARDING) # ------------------------------------------------------------------------------ import time import math import datetime import numpy as np import json # Necessário para o conversor da IA with tabs[2]: st.header("📡 SCADA & Asset Performance Management (APM)") st.caption("Gêmeo Digital, ESD Interlock, Sensor Fusion e Configurador Inteligente de Ativos (IA).") # --- 1. GESTÃO DE ESTADO DO SCADA VIVO E REGISTRO DE ATIVOS --- if "scada_live" not in st.session_state: st.session_state.scada_live = False if "scada_interlock" not in st.session_state: st.session_state.scada_interlock = False if "anomaly_score" not in st.session_state: st.session_state.anomaly_score = 0.0 # Base de dados dinâmica de ativos (Substitui o hardcode) if "asset_registry" not in st.session_state: st.session_state.asset_registry = { "WEG W22 Premium": {"Power": "50 CV", "RPM": 1750, "TempMax": 90}, "Siemens SD": {"Power": "75 CV", "RPM": 3600, "TempMax": 105} } # Filas (Buffers) para o Live Historian e FFT if "hist_v" not in st.session_state: st.session_state.hist_v = [st.session_state.scada_v] * 50 if "hist_t" not in st.session_state: st.session_state.hist_t = [st.session_state.scada_t] * 50 if "hist_fft" not in st.session_state: st.session_state.hist_fft = [np.zeros(100) for _ in range(30)] # --- 2. CONFIGURADOR DE ATIVOS VIA INTELIGÊNCIA ARTIFICIAL --- with st.expander("🛠️ Cadastro Inteligente de Ativos (Onboarding por IA)", expanded=False): st.markdown("Descreva a máquina física. O **Neuralk AI** vai inferir a rotação, potência e temperatura de colapso para calibrar o Gêmeo Digital.") col_desc, col_btn = st.columns([3, 1]) with col_desc: new_asset_desc = st.text_input("Descrição do Ativo (Ex: 'Bomba Sulzer de Água Fria 150CV' ou 'Turbina a Gás GE'):", key="ai_asset_input_v3") with col_btn: st.markdown("
", unsafe_allow_html=True) if st.button("✨ Gerar Gêmeo Digital", use_container_width=True, type="secondary", key="btn_infer_ai_v3"): if new_asset_desc: with st.spinner("A modelar física do equipamento..."): # Prompt rigoroso para forçar um JSON limpo prompt = f"""Atue como um Engenheiro Sênior de Confiabilidade. O utilizador quer cadastrar esta máquina no SCADA: '{new_asset_desc}'. Retorne EXATAMENTE UM JSON VÁLIDO (sem texto antes ou depois, sem formatação markdown) com estes limites estimados: {{"Nome": "Nome Curto para Painel", "Power": "Potência", "RPM": 1750, "TempMax": 95}} A 'TempMax' deve ser um valor inteiro em Graus Celsius antes de falha catastrófica. O 'RPM' deve ser inteiro.""" try: # Chama o cérebro (Garante que a função call_aivion_brain está importada) resposta_ia = call_aivion_brain("Engenheiro", prompt, [], "llama-3.1-8b-instant") # Limpeza de markdown teimoso clean_json = resposta_ia.replace("```json", "").replace("```", "").strip() new_specs = json.loads(clean_json) # Injeta na memória da planta st.session_state.asset_registry[new_specs["Nome"]] = { "Power": str(new_specs["Power"]), "RPM": int(new_specs["RPM"]), "TempMax": int(new_specs["TempMax"]) } st.success(f"✅ Ativo '{new_specs['Nome']}' integrado! (RPM: {new_specs['RPM']} | Interlock: {new_specs['TempMax']}°C)") except Exception as e: st.error(f"A IA gerou um formato inválido. Tente reformular a descrição. Erro: {str(e)}\n\nDebug: {resposta_ia}") # --- 3. BARRA DE CONTROLO SUPERIOR --- c_top1, c_top2, c_top3 = st.columns([1.5, 1, 1.5]) with c_top1: # A Seleção agora lê do Dicionário Dinâmico m = st.selectbox("Máquina (Ativo Industrial)", list(st.session_state.asset_registry.keys()), key="scada_mot_dinamico_v3") specs = st.session_state.asset_registry[m] st.caption(f"⚙️ **Especificações Inferidas:** {specs['Power']} | {specs['RPM']} RPM | Alarme: {specs['TempMax']}°C") with c_top2: st.markdown("
", unsafe_allow_html=True) # --- LÓGICA DO INTERLOCK ESD --- if st.session_state.scada_interlock: if st.button("🔴 RESET INTERLOCK", type="primary", use_container_width=True, key="btn_reset_esd_v3"): st.session_state.scada_interlock = False st.session_state.scada_v = 1.0 st.session_state.scada_t = 30.0 st.rerun() elif not st.session_state.scada_live: if st.button("▶️ Ligar Planta (Live)", type="primary", use_container_width=True, key="btn_play_scada_v3"): st.session_state.scada_live = True st.rerun() else: if st.button("⏹️ Parar Planta", use_container_width=True, key="btn_stop_scada_v3"): st.session_state.scada_live = False st.rerun() with c_top3: # IA: RUL Dinâmico (Adaptado à TempMax do novo equipamento) vib_penalty = math.exp(-0.4 * max(0, st.session_state.scada_v - 2.5)) temp_penalty = math.exp(-0.08 * max(0, st.session_state.scada_t - (specs['TempMax'] * 0.6))) rul_days = 3650 * vib_penalty * temp_penalty rul_color = "#ff3333" if rul_days < 100 else ("#ffcc00" if rul_days < 1000 else "#00ff00") rul_status = "CRÍTICO" if rul_days < 100 else ("ATENÇÃO" if rul_days < 1000 else "SAUDÁVEL") fusion_alert = "
⚠️ CORRELAÇÃO FÍSICA QUEBRADA!" if st.session_state.anomaly_score > 80 else "" rul_html = f"""
🤖 IA Preditiva RUL: {rul_status}
{rul_days:.0f} Dias{fusion_alert}
""" st.markdown(rul_html, unsafe_allow_html=True) st.divider() # ========================================================================== # 4. LÓGICA DE TEMPO REAL, SENSOR FUSION E GERAÇÃO FFT # ========================================================================== if st.session_state.scada_live and not st.session_state.scada_interlock: st.session_state.scada_v += np.random.normal(0, 0.05) st.session_state.scada_t += np.random.normal(0, 0.2) st.session_state.scada_v = max(0.0, st.session_state.scada_v) st.session_state.scada_t = max(20.0, st.session_state.scada_t) # IA Sensor Fusion st.session_state.anomaly_score = st.session_state.ans_brain.calculate_anomaly_score(st.session_state.scada_v, st.session_state.scada_t) # Histórico 1D st.session_state.hist_v.append(st.session_state.scada_v) st.session_state.hist_v.pop(0) st.session_state.hist_t.append(st.session_state.scada_t) st.session_state.hist_t.pop(0) # Histórico 3D FFT (O RPM do harmônico ajusta-se à máquina lida da Base de Dados) rpm = specs['RPM'] v_live = st.session_state.scada_v amps = np.abs(np.random.normal(0, 0.2 + (v_live*0.05), 100)) main_idx = int((rpm / 60) / (1000 / 100)) if 0 <= main_idx < 100: amps[main_idx] += v_live * 2.0 if v_live > 4.5 and main_idx*2 < 100: amps[main_idx*2] += v_live * 1.5 st.session_state.hist_fft.append(amps) st.session_state.hist_fft.pop(0) # Interlock ajustado dinamicamente para o TempMax do Ativo if st.session_state.scada_v > 7.1 or st.session_state.scada_t > specs['TempMax']: st.session_state.scada_interlock = True st.session_state.scada_live = False if "pro_mem" in st.session_state: st.session_state.pro_mem['Q0.0'] = False st.session_state.pro_mem['Q0.1'] = True # ========================================================================== # 5. RENDERIZAÇÃO E HMI DINÂMICO # ========================================================================== c_p1, c_p2 = st.columns([1.2, 1.8]) with c_p1: st.subheader("Gêmeo Digital do Ativo") v_val = st.session_state.scada_v t_val = st.session_state.scada_t # Cores térmicas ajustam-se aos limites da máquina if st.session_state.scada_interlock: body_color = "#ff0000" elif st.session_state.anomaly_score > 80: body_color = "#ff33cc" elif t_val < (specs['TempMax'] * 0.6): body_color = "#3a4a5a" elif t_val < (specs['TempMax'] * 0.85): body_color = "#b87333" else: body_color = "#ff3333" shake_class = "motor-shake" if v_val > 4.5 and not st.session_state.scada_interlock else "" anim_speed = max(0.05, 0.5 - (v_val/30.0)) motor_html = f"""
T: {t_val:.1f}°C | V: {v_val:.1f}
""" st.markdown(motor_html, unsafe_allow_html=True) st.markdown("
", unsafe_allow_html=True) st.subheader("Painel de Alarmes (ISA 18.2)") active_msgs = [] if st.session_state.scada_interlock: active_msgs.append("EMERGENCY SHUTDOWN (ESD ATIVADO)") if st.session_state.anomaly_score > 85: active_msgs.append("ALERTA PREDITIVO: Anomalia de Assinatura Multivariável") # Limites de alarme calculados com base na TempMax da máquina ativa if st.session_state.scada_t > specs['TempMax']: active_msgs.append(f"Sobreaquecimento Crítico do Ativo (>{specs['TempMax']}°C)") elif st.session_state.scada_t > specs['TempMax'] * 0.85: active_msgs.append(f"Elevação de Temperatura Anormal (>{int(specs['TempMax']*0.85)}°C)") if st.session_state.scada_v > 4.5: active_msgs.append("Vibração Excessiva (Verificar Mancal)") st.session_state.scada_alarms = [a for a in st.session_state.scada_alarms if a['msg'] in active_msgs] for msg in active_msgs: if not any(a['msg'] == msg for a in st.session_state.scada_alarms): sev = "CRIT" if "Crítico" in msg or "Excessiva" in msg or "ESD" in msg else "WARN" st.session_state.scada_alarms.insert(0, {'time': datetime.datetime.now().strftime("%H:%M:%S"), 'msg': msg, 'sev': sev, 'ack': False}) c_ack1, c_ack2 = st.columns([2, 1]) with c_ack2: if st.button("Reconhecer (ACK)", key="btn_ack_v3"): for a in st.session_state.scada_alarms: a['ack'] = True st.rerun() alarm_html = "
" if not st.session_state.scada_alarms: alarm_html += "Sistema Normal. Nenhuma anomalia no ativo." for a in st.session_state.scada_alarms: color = "#ff0000" if "ESD" in a['msg'] else ("#ff33cc" if "PREDITIVO" in a['msg'] else ("#ff3333" if a['sev'] == "CRIT" else "#ffcc00")) status = "[ACK]" if a['ack'] else "⚠️ [NEW]" opacity = "0.5" if a['ack'] else "1.0" weight = "bold" if "ESD" in a['msg'] or "PREDITIVO" in a['msg'] else "normal" alarm_html += f"
{status} {a['time']} - {a['msg']}
" alarm_html += "
" st.markdown(alarm_html, unsafe_allow_html=True) with c_p2: st.subheader("Painel APM & Telemetria Dinâmica") def update_v(): st.session_state.scada_v = st.session_state.scada_v_widget def update_t(): st.session_state.scada_t = st.session_state.scada_t_widget # Limite máximo de temperatura do slider ajustado dinamicamente (+20% de folga sobre a TempMax) dynamic_t_max = float(max(130, specs['TempMax'] * 1.2)) c_sl1, c_sl2 = st.columns(2) with c_sl1: st.slider("Forçar Vibração (mm/s)", 0.0, 15.0, value=float(st.session_state.scada_v), key="scada_v_widget_v3", on_change=update_v, disabled=st.session_state.scada_interlock) with c_sl2: st.slider("Forçar Temperatura (°C)", 20.0, dynamic_t_max, value=float(st.session_state.scada_t), key="scada_t_widget_v3", on_change=update_t, disabled=st.session_state.scada_interlock) stat, emer, health, msg, load = st.session_state.ans_brain.process_stimuli(st.session_state.scada_v, st.session_state.scada_t, specs['TempMax']) # Gauge da Temperatura com a escala 100% dinâmica! c_g1, c_g2, c_g3 = st.columns(3) c_g1.plotly_chart(st.session_state.ans_brain.create_gauge(st.session_state.scada_v, "Vibração", 15, [4.5, 7.1]), use_container_width=True, key="gauge_vib_v3") c_g2.plotly_chart(st.session_state.ans_brain.create_gauge(st.session_state.scada_t, "Temp", dynamic_t_max, [specs['TempMax']*0.8, specs['TempMax']]), use_container_width=True, key="gauge_temp_v3") c_g3.plotly_chart(st.session_state.ans_brain.create_gauge(st.session_state.anomaly_score, "Anomaly Score", 100, [60, 85], color_theme="#ff33cc"), use_container_width=True, key="gauge_anomaly_v3") c_f1, c_f2 = st.columns(2) c_f1.plotly_chart(st.session_state.ans_brain.create_waterfall_chart(st.session_state.hist_fft), use_container_width=True, key="chart_waterfall_v3") c_f2.plotly_chart(st.session_state.ans_brain.create_trend_chart(st.session_state.hist_v, st.session_state.hist_t), use_container_width=True, key="chart_trend_v3") # ========================================================================== # MOTOR DE RECARREGAMENTO CLOUD-SAFE (PROTEÇÃO CONTRA ERROR 500) # ========================================================================== # IMPORTANTE: Definido para 1.5 segundos. A nuvem respira, processa os gráficos # pesados do Plotly, envia para o navegador e garante estabilidade absoluta. if st.session_state.scada_live and not st.session_state.scada_interlock: time.sleep(1.5) st.rerun() # ------------------------------------------------------------------------------ # ABA 4: PID PRO (V160.0 - LIVE DCS, NOISE FILTER & DERIVATIVE ON PV) # ------------------------------------------------------------------------------ with tabs[3]: st.header("📈 PID APC & Safety Systems (LIVE DCS)") st.caption("Operação Contínua: Strip Chart, Filtro de Ruído, Derivada na PV, Auto-Tuning Z-N e Override.") # ========================================================================== # 0. GESTÃO DE ESTADO DO DCS (MÁQUINA DO TEMPO) # ========================================================================== if "kp_apc" not in st.session_state: st.session_state.kp_apc, st.session_state.ki_apc, st.session_state.kd_apc = 2.0, 0.5, 0.0 if "tune_msg" not in st.session_state: st.session_state.tune_msg = "" # Memória viva da simulação contínua if "apc_live" not in st.session_state: st.session_state.apc_live = False if "apc_state" not in st.session_state: st.session_state.apc_state = { "t": 0.0, "pv_real": 25.0, "model_pv": 25.0, "slave_pv": 0.0, "valve_pos": 0.0, "buffer_real": [25.0]*20, "buffer_model": [25.0]*20, "integral_main": 0.0, "prev_error_main": 0.0, "prev_pv_main": 25.0, "filtered_d": 0.0, "integral_ovr": 0.0, "prev_error_ovr": 0.0, "data": {"t":[], "pv_lido":[], "pv_real":[], "sp":[], "mv":[], "press":[]}, "override_triggered": False } def reset_apc_state(base_val): """Limpa a memória e reinicia a planta""" st.session_state.apc_live = False st.session_state.apc_state = { "t": 0.0, "pv_real": base_val, "model_pv": base_val, "slave_pv": 0.0, "valve_pos": 0.0, "buffer_real": [base_val]*20, "buffer_model": [base_val]*20, "integral_main": 0.0, "prev_error_main": 0.0, "prev_pv_main": base_val, "filtered_d": 0.0, "integral_ovr": 0.0, "prev_error_ovr": 0.0, "data": {"t":[], "pv_lido":[], "pv_real":[], "sp":[], "mv":[], "press":[]}, "override_triggered": False } st.session_state.tune_msg = "" def run_auto_tune(plant_type, dead_t): """Auto-Tuning Algorítmico (Ziegler-Nichols)""" K = 2.5 if "Térmico" in plant_type else 1.0 T = 20.0 if "Térmico" in plant_type else 2.0 L = max(0.1, dead_t) kp_zn = (1.2 * T) / (K * L) st.session_state.kp_apc = min(50.0, max(0.1, round(kp_zn, 2))) st.session_state.ki_apc = min(10.0, max(0.01, round(kp_zn / (2 * L), 3))) st.session_state.kd_apc = min(20.0, max(0.0, round(kp_zn * (0.5 * L), 2))) st.session_state.tune_msg = f"✅ Auto-Tuning Z-N Concluído (Atraso: {L}s)." # ========================================================================== # 1. LAYOUT DA INTERFACE (CONTROLOS VS GRÁFICOS) # ========================================================================== # Botões do Motor de Tempo Real c_live1, c_live2, c_live3 = st.columns([1, 1, 2]) with c_live1: if st.button("▶️ RUN DCS", type="primary", use_container_width=True): st.session_state.apc_live = True with c_live2: if st.button("⏸️ PAUSE DCS", use_container_width=True): st.session_state.apc_live = False with c_live3: if st.button("🔄 Reset Planta", use_container_width=True): base = 25.0 if "Térmico" in st.session_state.get('proc_type_sel', 'Térmico') else 0.0 reset_apc_state(base) st.rerun() st.divider() c1, c2 = st.columns([1.2, 2.5]) with c1: st.subheader("1. Configuração da Malha Principal") proc_type = st.selectbox("Planta Industrial", ["Reator Térmico (Lento)", "Linha de Pressão (Rápido)"], key="proc_type_sel") dead_time = st.slider("⏱️ Tempo Morto (Atraso/s)", 0.0, 10.0, 2.0, step=0.5, key="dt_slider") if st.button("🤖 Auto-Tuning (Ziegler-Nichols)", use_container_width=True): run_auto_tune(st.session_state.proc_type_sel, st.session_state.dt_slider) if st.session_state.tune_msg: st.success(st.session_state.tune_msg) c_p, c_i, c_d = st.columns(3) with c_p: kp = st.number_input("Kp", 0., 100., st.session_state.kp_apc, key="kp_apc") with c_i: ki = st.number_input("Ki", 0., 100., st.session_state.ki_apc, key="ki_apc") with c_d: kd = st.number_input("Kd", 0., 100., st.session_state.kd_apc, key="kd_apc") sp = st.number_input("🎯 SetPoint Live", 0., 200., 100.0) st.markdown("---") st.subheader("2. Mundo Real: Ruído e Segurança") use_noise = st.toggle("🌩️ Injetar Ruído de Sensor (Eletromagnético)") deriv_pv = st.toggle("🛡️ Derivada na PV (Evita Derivative Kick)", value=True, help="Usa a variação do sensor em vez do Erro para a derivada, evitando saltos violentos na válvula ao mudar o SP.") anti_windup = st.toggle("🛡️ Ativar Filtro Anti-Windup", value=True) use_manual = st.toggle("🖐️ Forçar Válvula em Manual") manual_mv = st.slider("Abertura Manual (%)", 0.0, 100.0, 20.0, disabled=not use_manual) use_bumpless = st.toggle("🛤️ Ativar Bumpless Transfer", value=True, disabled=not use_manual) use_override = st.toggle("🛡️ Controle Override (Alta Pressão)") max_press = st.slider("Limite de Pressão (PSI)", 50.0, 150.0, 80.0, disabled=not use_override) st.subheader("3. Estratégias APC (Avançadas)") use_cascade = st.toggle("🔄 Controle em Cascata") use_ff = st.toggle("⚡ Feedforward (Antecipatório)") use_smith = st.toggle("🧠 Preditor de Smith (Domador de Atraso)") stiction = st.slider("⚙️ Atrito da Válvula (Stiction %)", 0.0, 10.0, 0.0, step=0.5) dist_mag = st.slider("🌊 Choque de Carga (Instantâneo)", -50.0, 50.0, 0.0, step=5.0) # ========================================================================== # 2. MOTOR DE SIMULAÇÃO DCS (TEMPO REAL CONTÍNUO) # ========================================================================== if st.session_state.apc_live: s = st.session_state.apc_state base_val = 25.0 if "Térmico" in proc_type else 0.0 tau = 20.0 if "Térmico" in proc_type else 2.0 gain = 2.5 if "Térmico" in proc_type else 1.0 dt = 0.1 # Ajuste dinâmico do buffer de atraso se o utilizador mexer no slider ao vivo delay_steps = max(1, int(dead_time / dt)) while len(s["buffer_real"]) < delay_steps: s["buffer_real"].insert(0, s["pv_real"]) while len(s["buffer_real"]) > delay_steps: s["buffer_real"].pop(0) while len(s["buffer_model"]) < delay_steps: s["buffer_model"].insert(0, s["model_pv"]) while len(s["buffer_model"]) > delay_steps: s["buffer_model"].pop(0) kp_ovr, ki_ovr = 4.0, 0.5 # Ganhos de Segurança # Executa 5 ciclos de 0.1s (0.5s reais) por cada frame da UI para fluidez visual for _ in range(5): s["t"] += dt # 1. Leitura do Sensor (Com injeção de Ruído) noise = np.random.normal(0, 0.5) if use_noise else 0.0 pv_lido = s["buffer_real"][0] + noise # --- PREDITOR DE SMITH --- pv_feedback = (pv_lido + s["model_pv"] - s["buffer_model"][0]) if use_smith else pv_lido # --- BUMPLESS TRANSFER E LÓGICA PID PRINCIPAL --- error_main = sp - pv_feedback if use_manual: mv_master = manual_mv if use_bumpless: p_term = kp * error_main s["integral_main"] = (mv_master - p_term) / ki if ki > 0 else 0.0 else: s["integral_main"] = 0.0 else: p_term = kp * error_main # Saturação / Anti-Windup dinâmico if anti_windup: is_sat_high = (s["valve_pos"] >= 100.0 and error_main > 0) is_sat_low = (s["valve_pos"] <= 0.0 and error_main < 0) if not (is_sat_high or is_sat_low): s["integral_main"] += ki * error_main * dt else: s["integral_main"] += ki * error_main * dt # Derivada (Na PV ou no Erro) if deriv_pv: raw_d = -kd * (pv_feedback - s["prev_pv_main"]) / dt if dt > 0 else 0.0 else: raw_d = kd * (error_main - s["prev_error_main"]) / dt if dt > 0 else 0.0 # Filtro Passa-Baixa de Ruído (Alpha = 0.2) alpha = 0.2 s["filtered_d"] = (alpha * raw_d) + ((1.0 - alpha) * s["filtered_d"]) mv_master = p_term + s["integral_main"] + s["filtered_d"] s["prev_error_main"] = error_main s["prev_pv_main"] = pv_feedback # --- FEEDFORWARD --- mv_ff = -dist_mag / gain if use_ff else 0.0 cmd_mv = max(0.0, min(100.0, mv_master + mv_ff)) # --- CONTROLE OVERRIDE DE SEGURANÇA --- press_real = (s["pv_real"] * 0.4) + (s["valve_pos"] * 0.7) s["override_triggered"] = False if use_override: error_ovr = max_press - press_real p_term_ovr = kp_ovr * error_ovr s["integral_ovr"] += ki_ovr * error_ovr * dt mv_ovr = p_term_ovr + s["integral_ovr"] if mv_ovr < cmd_mv: cmd_mv = max(0.0, mv_ovr) s["override_triggered"] = True else: s["integral_ovr"] = (cmd_mv - p_term_ovr) / ki_ovr if ki_ovr > 0 else 0.0 # --- CONTROLE CASCATA & STICTION --- cmd_final = cmd_mv if use_cascade: slave_error = cmd_mv - s["slave_pv"] cmd_final = max(0.0, min(100.0, 5.0 * slave_error)) if abs(cmd_final - s["valve_pos"]) > stiction: s["valve_pos"] = cmd_final # --- ATUALIZAÇÃO DA FÍSICA FOPDT --- dist_aplicada = dist_mag if not use_ff else dist_mag # Impacto real na planta if use_cascade: s["slave_pv"] += ((s["valve_pos"] * 1.0 - s["slave_pv"]) / 0.5) * dt s["pv_real"] += (((s["slave_pv"] * gain) + base_val + dist_aplicada - s["pv_real"]) / tau) * dt else: s["pv_real"] += (((s["valve_pos"] * gain) + base_val + dist_aplicada - s["pv_real"]) / tau) * dt s["buffer_real"].append(s["pv_real"]); s["buffer_real"].pop(0) s["model_pv"] += (((s["valve_pos"] * gain) + base_val - s["model_pv"]) / tau) * dt s["buffer_model"].append(s["model_pv"]); s["buffer_model"].pop(0) # Salva dados d = s["data"] d["t"].append(s["t"]); d["pv_lido"].append(pv_lido); d["pv_real"].append(s["pv_real"]) d["sp"].append(sp); d["mv"].append(s["valve_pos"]); d["press"].append(press_real) # Efeito STRIP CHART (Mantém apenas os últimos 150 pontos para a esteira rolar) if len(d["t"]) > 150: for k in d.keys(): d[k].pop(0) # ========================================================================== # 3. RENDERIZAÇÃO DOS GRÁFICOS (LIVE UPDATE) # ========================================================================== with c2: d = st.session_state.apc_state["data"] if len(d["t"]) > 0: # Gráfico Principal: PV e Ruído fig1 = go.Figure() fig1.add_trace(go.Scatter(x=d['t'], y=d['pv_real'], name="PV Real", line=dict(color='#00ff00', width=2.5))) if use_noise: fig1.add_trace(go.Scatter(x=d['t'], y=d['pv_lido'], name="Sensor (Ruído)", line=dict(color='rgba(255,255,255,0.3)', width=1))) elif dead_time > 0: fig1.add_trace(go.Scatter(x=d['t'], y=d['pv_lido'], name="Sensor Atrasado", line=dict(color='rgba(255,255,255,0.4)', dash='dot'))) fig1.add_trace(go.Scatter(x=d['t'], y=d['sp'], name="Setpoint", line=dict(dash='dash', color='white'))) fig1.update_layout(title="Painel Principal: Nível do Tanque / Temperatura", template="plotly_dark", height=280, margin=dict(t=30,b=10,l=10,r=10)) st.plotly_chart(fig1, use_container_width=True) # Gráfico Secundário: Válvula & Pressão fig2 = go.Figure() fig2.add_trace(go.Scatter(x=d['t'], y=d['mv'], name="Abertura Válvula (MV %)", line=dict(color='#ff33cc', width=1.5), fill='tozeroy')) fig2.add_hline(y=100, line_width=1, line_dash="dot", line_color="red") fig2.add_hline(y=0, line_width=1, line_dash="dot", line_color="red") if use_override: fig2.add_trace(go.Scatter(x=d['t'], y=d['press'], name="Pressão Interna", line=dict(color='#ff9900', width=2))) fig2.add_hline(y=max_press, line_width=2, line_dash="dash", line_color="#ff0000", annotation_text=f"Limite Pressão") if st.session_state.apc_state["override_triggered"]: fig2.add_annotation(x=d['t'][-1], y=50, text="⚠️ OVERRIDE ATIVO!", showarrow=False, font=dict(color="red", size=15, weight="bold")) fig2.update_layout(title="Painel de Atuação e Segurança (Válvula)", template="plotly_dark", height=220, margin=dict(t=30,b=0,l=0,r=0), yaxis=dict(range=[-10, max(110, max_press*1.1) if use_override else 110])) st.plotly_chart(fig2, use_container_width=True) else: st.info("👈 Clique em **▶️ RUN DCS** para iniciar o Strip Chart em Tempo Real. Ajuste o SetPoint enquanto a planta roda para ver a dinâmica viva!") # --- NOVO LAUDO DE IA ESTRUTURADO --- if st.button("📝 Gerar Relatório Analítico (IA Especialista)", type="secondary"): with st.spinner("Compilando o relatório..."): laudo = call_aivion_brain("Engenheiro", f"Gere laudo DCS resumido. Kp={st.session_state.kp_apc}, Ki={st.session_state.ki_apc}. Ruído={use_noise}, DerivadaPV={deriv_pv}.", [], "llama-3.1-8b-instant") st.info(laudo) # Motor de Recarregamento Visual Contínuo if st.session_state.apc_live: time.sleep(0.05) st.rerun() # ------------------------------------------------------------------------------ # ABA 5: PLC STUDIO PRO (V150.1 - DYNAMIC LADDER, TIMERS & AI DEBUGGER) # ------------------------------------------------------------------------------ with tabs[4]: st.header("⚡ PLC Architect Pro & Edge Gateway") st.caption("IEC 61131-3: Visualizador Ladder Dinâmico, Timers (TON), Sinais Analógicos e Osciloscópio Lógico.") # CSS Injetado para dar vida às "Wires" e "Contacts" do Ladder st.markdown(""" """, unsafe_allow_html=True) # ========================================================================== # 0. ESTADOS DO MOTOR LÓGICO AVANÇADO # ========================================================================== if "pro_live" not in st.session_state: st.session_state.pro_live = False if "modbus_live" not in st.session_state: st.session_state.modbus_live = False if "box_pos" not in st.session_state: st.session_state.box_pos = 0.0 if "pro_rungs" not in st.session_state: st.session_state.pro_rungs = [] if "adv_blocks" not in st.session_state: st.session_state.adv_blocks = {} if "watch_vars" not in st.session_state: st.session_state.watch_vars = {'M0.0': False, 'M0.1': False} # Memória Digital e Analógica if "pro_mem" not in st.session_state: st.session_state.pro_mem = {k: False for k in ['I0.0','I0.1','I0.2','Q0.0','Q0.1','M0.0','M0.1','M0.2','T1','T2','C1','C2']} if "pro_prev_mem" not in st.session_state: st.session_state.pro_prev_mem = st.session_state.pro_mem.copy() if "pro_amem" not in st.session_state: st.session_state.pro_amem = {'IW0': 0.0, 'IW1': 0.0, 'MW10': 50.0} if "pro_history" not in st.session_state: st.session_state.pro_history = {"time": [], "Q0.0": [], "I0.0": [], "M0.0": []} if "scan_tick" not in st.session_state: st.session_state.scan_tick = 0 # ========================================================================== # 1. FUNÇÕES DE RENDERIZAÇÃO E AVALIAÇÃO (BACKEND) # ========================================================================== def eval_contact(addr, mod, mem, prev_mem, amem, comp_val=0): if addr == 'NONE': return False if mod == 'CMP_GT': return amem.get(addr, 0) > comp_val if mod == 'CMP_LT': return amem.get(addr, 0) < comp_val val = mem.get(addr, False) prev = prev_mem.get(addr, False) if mod == 'NO': return val if mod == 'NC': return not val if mod == 'P': return val and not prev if mod == 'N': return not val and prev return False def render_modern_rung(rung, mem, amem, blocks): sym_map = {'NO': '| |', 'NC': '|/|', 'P': '|P|', 'N': '|N|'} out_sym_map = {'COIL': '-( )-', 'INV': '-(/)-', 'SET': '-(S)-', 'RESET': '-(R)-'} def comp(addr, mod, state, val=0): if addr == 'NONE': return "" if 'CMP' in mod: op_str = ">" if mod == 'CMP_GT' else "<" bc = '#00ff00' if state else '#555' bg = '#113311' if state else '#222' return f'
CMP {op_str}
{addr} ({amem.get(addr,0):.1f})
Ref: {val}
' sym = sym_map.get(mod, '| |') return f'
{sym}
{addr}
' sa = eval_contact(rung['c_a'], rung['m_a'], mem, st.session_state.pro_prev_mem, amem, rung.get('val_a', 0)) sb = eval_contact(rung['c_b'], rung['m_b'], mem, st.session_state.pro_prev_mem, amem, rung.get('val_b', 0)) if rung['c_b'] != 'NONE' else False wa = sa; wb = False; fo = wa if rung['op'] == 'AND': wb = sb; fo = wa and wb elif rung['op'] == 'OR': fo = wa or sb dest = rung['dest'] dest_mod = rung['m_dest'] dest_state = mem.get(dest, False) h = f'
' h += f'
' h += comp(rung['c_a'], rung['m_a'], sa, rung.get('val_a', 0)) if rung['op'] == 'AND': h += f'
{comp(rung["c_b"], rung["m_b"], sb, rung.get("val_b", 0))}' elif rung['op'] == 'OR': # Desenha um ramo OR rudimentar h += f'
OR
{comp(rung["c_b"], rung["m_b"], sb, rung.get("val_b", 0))}' h += f'
' if dest_mod in ['TON', 'CTU']: b = blocks.get(dest, {'type': dest_mod, 'preset': 0, 'acc': 0}) bc = '#00ff00' if dest_state else '#555' bg = 'rgba(0,255,0,0.1)' if dest_state else '#222' tc = '#00ff00' if dest_state else '#ddd' h += f"""
{dest_mod} {dest}
PT: {b['preset']} s
AC: {b['acc']} s
""" else: sym = out_sym_map.get(dest_mod, '-( )') h += f'
{sym}
{dest}
' h += f'
' return h # ========================================================================== # 2. HEADER: CONTROLE DA CPU E IA # ========================================================================== c_live1, c_live2, c_live3, c_ia = st.columns([1, 1, 1.5, 1.5]) with c_live1: if st.button("▶️ RUN CPU", type="primary", use_container_width=True): st.session_state.pro_live = True with c_live2: if st.button("⏹️ STOP CPU", use_container_width=True): st.session_state.pro_live = False with c_live3: if st.button("🌐 Modbus TCP: " + ("ON" if st.session_state.modbus_live else "OFF"), use_container_width=True): st.session_state.modbus_live = not st.session_state.modbus_live with c_ia: if st.button("🤖 IA Code Reviewer (Debug)", use_container_width=True): with st.spinner("LLaMA a analisar lógica Ladder..."): rungs_str = str(st.session_state.pro_rungs) prompt = f"Analise o seguinte código Ladder interno (JSON): {rungs_str}. Identifique possíveis bugs lógicos (ex: Timers sem reset) e explique de forma técnica o que esta rotina faz." st.session_state.ai_plc_review = call_aivion_brain("💻 Dev", prompt, [], "llama-3.1-8b-instant") if st.session_state.get("ai_plc_review"): st.info(st.session_state.ai_plc_review) st.divider() # ========================================================================== # 3. EDITOR DE LÓGICA (CONSTRUTOR) # ========================================================================== with st.expander("🛠️ Construtor de Lógica (Adicionar Rung)", expanded=not st.session_state.pro_live): with st.container(border=True): cols = st.columns([2, 1, 2, 2]) with cols[0]: st.markdown("**Contato A**") c_a_mod = st.selectbox("Tipo A", ['NO', 'NC', 'P', 'N', 'CMP_GT', 'CMP_LT'], format_func=lambda x: {'NO':'| | Normal Aberto', 'NC':'|/| Normal Fechado', 'P':'|P| Borda Subida', 'N':'|N| Borda Descida', 'CMP_GT':'[ > ] Maior que', 'CMP_LT':'[ < ] Menor que'}[x], label_visibility="collapsed") if 'CMP' in c_a_mod: c_a_addr = st.selectbox("Endereço A", ['IW0','IW1','MW10'], label_visibility="collapsed") c_a_val = st.number_input("Valor Ref.", value=50.0, key="cmp_a") else: c_a_addr = st.selectbox("Endereço A", ['I0.0','I0.1','I0.2','M0.0','M0.1','T1','C1'], label_visibility="collapsed") c_a_val = 0 with cols[1]: st.markdown("**Associação**") op = st.selectbox("Operador", ['DIRECT', 'AND', 'OR'], label_visibility="collapsed") with cols[2]: st.markdown("**Contato B**") c_b_mod = st.selectbox("Tipo B", ['NO', 'NC', 'P', 'N', 'CMP_GT', 'CMP_LT'], format_func=lambda x: {'NO':'| | Normal Aberto', 'NC':'|/| Normal Fechado', 'P':'|P| Borda Subida', 'N':'|N| Borda Descida', 'CMP_GT':'[ > ] Maior que', 'CMP_LT':'[ < ] Menor que'}[x], label_visibility="collapsed") if 'CMP' in c_b_mod: c_b_addr = st.selectbox("Endereço B", ['NONE','IW0','IW1','MW10'], label_visibility="collapsed") c_b_val = st.number_input("Valor Ref.", value=50.0, key="cmp_b") else: c_b_addr = st.selectbox("Endereço B", ['NONE','I0.0','I0.1','I0.2','M0.0','M0.1','T1','C1'], label_visibility="collapsed") c_b_val = 0 with cols[3]: st.markdown("**Instrução de Saída**") dest_mod = st.selectbox("Tipo Saída", ['COIL', 'INV', 'SET', 'RESET', 'TON', 'CTU'], format_func=lambda x: {'COIL':'-( )- Bobina', 'INV':'-(/)- Invertida', 'SET':'-(S)- Latch', 'RESET':'-(R)- Unlatch', 'TON':'[TON] Timer On Delay', 'CTU':'[CTU] Counter Up'}[x], label_visibility="collapsed") c_dest_addr, c_dest_pt = st.columns(2) with c_dest_addr: if dest_mod in ['TON']: dest_addr = st.selectbox("Bloco", ['T1','T2'], label_visibility="collapsed") elif dest_mod in ['CTU']: dest_addr = st.selectbox("Bloco", ['C1','C2'], label_visibility="collapsed") else: dest_addr = st.selectbox("Endereço", ['Q0.0','Q0.1','M0.0','M0.1','M0.2'], label_visibility="collapsed") with c_dest_pt: preset = 0 if dest_mod in ['TON', 'CTU']: preset = st.number_input("Preset (s)", 1, 100, 5, label_visibility="collapsed") c_btn1, c_btn2 = st.columns([1, 1]) with c_btn1: if st.button("➕ Inserir Network", use_container_width=True, type="secondary"): rung = {'c_a': c_a_addr, 'm_a': c_a_mod, 'val_a': c_a_val, 'op': op, 'c_b': c_b_addr, 'm_b': c_b_mod, 'val_b': c_b_val, 'dest': dest_addr, 'm_dest': dest_mod} st.session_state.pro_rungs.append(rung) if dest_mod in ['TON', 'CTU']: st.session_state.adv_blocks[dest_addr] = {'type': dest_mod, 'preset': preset, 'acc': 0, 'last': False} with c_btn2: if st.button("🗑️ Limpar Programa", use_container_width=True): st.session_state.pro_rungs = [] st.session_state.adv_blocks = {} for k in st.session_state.pro_mem.keys(): st.session_state.pro_mem[k] = False # ========================================================================== # 4. CICLO DE SCAN E LÓGICA DE BACKEND (HMI E LADDER) # ========================================================================== if st.session_state.pro_live: mem = st.session_state.pro_mem amem = st.session_state.pro_amem prev = st.session_state.pro_prev_mem blocks = st.session_state.adv_blocks # A. Dinâmica da Planta Virtual (HMI) if mem.get('Q0.0', False): st.session_state.box_pos += 4.0 if st.session_state.box_pos > 110: st.session_state.box_pos = -10.0 # B. Feedback Físico (Sensor S1 e S2) mem['I0.1'] = 45 <= st.session_state.box_pos <= 55 mem['I0.2'] = st.session_state.box_pos > 100 # C. Forçamento da Watch Table for k, v in st.session_state.watch_vars.items(): mem[k] = v # D. Avaliação do Ladder (Scan Cycle real) for r in st.session_state.pro_rungs: sa = eval_contact(r['c_a'], r['m_a'], mem, prev, amem, r.get('val_a',0)) sb = eval_contact(r['c_b'], r['m_b'], mem, prev, amem, r.get('val_b',0)) if r['c_b'] != 'NONE' else False logic_true = sa if r['op'] == 'AND': logic_true = sa and sb elif r['op'] == 'OR': logic_true = sa or sb d = r['dest'] mod = r['m_dest'] if mod == 'COIL': mem[d] = logic_true elif mod == 'INV': mem[d] = not logic_true elif mod == 'SET': if logic_true: mem[d] = True elif mod == 'RESET': if logic_true: mem[d] = False elif mod == 'TON': b = blocks[d] if logic_true: b['acc'] += 0.2 # Ajustado ao refresh rate do Streamlit (~5Hz) if b['acc'] >= b['preset']: b['acc'] = b['preset'] mem[d] = True else: mem[d] = False else: b['acc'] = 0 mem[d] = False elif mod == 'CTU': b = blocks[d] if logic_true and not b['last']: b['acc'] += 1 b['last'] = logic_true mem[d] = (b['acc'] >= b['preset']) st.session_state.pro_prev_mem = mem.copy() # E. Registro do Osciloscópio st.session_state.scan_tick += 1 st.session_state.pro_history["time"].append(st.session_state.scan_tick) st.session_state.pro_history["Q0.0"].append(1 if mem.get('Q0.0') else 0) st.session_state.pro_history["I0.0"].append(1 if mem.get('I0.0') else 0) st.session_state.pro_history["M0.0"].append(1 if mem.get('M0.0') else 0) if len(st.session_state.pro_history["time"]) > 50: for k in st.session_state.pro_history.keys(): st.session_state.pro_history[k].pop(0) # ========================================================================== # 5. PAINÉIS VISUAIS (LADDER, GÊMEO E MODBUS) # ========================================================================== col_ladder, col_hmi = st.columns([1.8, 1.2]) with col_ladder: st.subheader("📈 Monitor Ladder (Real-Time)") with st.container(border=True): if not st.session_state.pro_rungs: st.info("💡 **Dica:** Insira um [TON] de 3s e use T1 para acionar o Q0.0!") else: for r in st.session_state.pro_rungs: st.markdown(render_modern_rung(r, st.session_state.pro_mem, st.session_state.pro_amem, st.session_state.adv_blocks), unsafe_allow_html=True) # Osciloscópio Lógico if st.session_state.pro_live and len(st.session_state.pro_history["time"]) > 2: st.markdown("#### 📡 Analisador Lógico (Timing Diagram)") fig_logic = go.Figure() t_data = st.session_state.pro_history["time"] fig_logic.add_trace(go.Scatter(x=t_data, y=[y + 4 for y in st.session_state.pro_history["Q0.0"]], mode='lines', line_shape='vh', name='Q0.0 (Motor)', line=dict(color='#00ff00'))) fig_logic.add_trace(go.Scatter(x=t_data, y=[y + 2 for y in st.session_state.pro_history["I0.0"]], mode='lines', line_shape='vh', name='I0.0 (Start)', line=dict(color='#00ccff'))) fig_logic.add_trace(go.Scatter(x=t_data, y=st.session_state.pro_history["M0.0"], mode='lines', line_shape='vh', name='M0.0 (Mem)', line=dict(color='#ffcc00'))) fig_logic.update_layout(template="plotly_dark", height=200, margin=dict(t=10,b=10,l=10,r=10), yaxis=dict(tickmode='array', tickvals=[0.5, 2.5, 4.5], ticktext=['M0.0', 'I0.0', 'Q0.0'], showgrid=False, zeroline=False), xaxis=dict(showgrid=True, gridcolor='#333')) st.plotly_chart(fig_logic, use_container_width=True) with col_hmi: st.subheader("🏭 Gêmeo Digital") bp = st.session_state.box_pos motor_on = st.session_state.pro_mem.get('Q0.0', False) alarm_on = st.session_state.pro_mem.get('Q0.1', False) sens_on = st.session_state.pro_mem.get('I0.1', False) m_color = "#00ff00" if motor_on else "#444" s_color = "#00ccff" if sens_on else "#444" a_color = "#ff3333" if alarm_on else "#222" a_glow = "box-shadow: 0 0 20px #ff3333;" if alarm_on else "" hmi_html = f"""
M1 (Q0.0) S1 (I0.1)
ALARM (Q0.1)
PEÇA
""" st.markdown(hmi_html, unsafe_allow_html=True) st.markdown("
", unsafe_allow_html=True) # Controlos Manuais c_io, c_wt = st.columns(2) with c_io: st.markdown("**I/O Físico**") val_i00 = st.toggle("🟢 Botão Start (I0.0)", value=st.session_state.pro_mem.get('I0.0', False)) st.session_state.pro_mem['I0.0'] = val_i00 val_iw0 = st.slider("Termopar Simul. (IW0)", 0.0, 100.0, float(st.session_state.pro_amem['IW0'])) st.session_state.pro_amem['IW0'] = val_iw0 with c_wt: st.markdown("**Watch Table**") for var in ['M0.0', 'M0.1']: val = st.toggle(f"Forçar {var}", value=st.session_state.watch_vars.get(var, False), key=f"wt_{var}") st.session_state.watch_vars[var] = val st.session_state.pro_mem[var] = val if st.session_state.pro_live: time.sleep(0.2) st.rerun() # ------------------------------------------------------------------------------ # ABA 6: MATH STUDIO (V149.0 - Z-TRANSFORM, POLES/ZEROS & 3D CHAOS) # ------------------------------------------------------------------------------ with tabs[5]: st.header("🧮 Math Studio: P&D e Sinais Avançados") st.caption("Controlo Digital (Transformada Z), Atratores de Caos, Bode, Taylor e Álgebra Linear.") col_menu, col_calc = st.columns([1.2, 2.8]) with col_menu: st.markdown("#### ⚙️ Motor Analítico") op_category = st.selectbox("Categoria Matemática:", [ "Cálculo Clássico & 3D", "Álgebra Linear & Aproximação", "Sinais & Sistemas de Controlo", "Campos Vetoriais (Fluidos)", "IA & Sistemas Dinâmicos" ]) if op_category == "Cálculo Clássico & 3D": op_list = ["Derivada", "Derivada Parcial", "Integral Indefinida", "Integral Definida", "Integral Dupla", "Integral Tripla", "Superfície 3D"] elif op_category == "Álgebra Linear & Aproximação": op_list = ["Álgebra Linear (Matriz 2x2)", "Série de Taylor (Aproximação)"] elif op_category == "Sinais & Sistemas de Controlo": op_list = ["Diagrama de Bode (Frequência)", "Mapa de Polos e Zeros (Digital)", "Transformada de Laplace", "Série de Fourier"] elif op_category == "Campos Vetoriais (Fluidos)": op_list = ["Campo Vetorial (Quiver)", "Gradiente", "Divergente", "Rotacional"] else: op_list = ["Atrator de Caos (Sistema 3D)", "Descida do Gradiente (3D)", "Retrato de Fase (EDO)"] op = st.radio("Selecione a Operação:", op_list, label_visibility="collapsed") with col_calc: st.markdown("#### 📝 Entrada de Dados") with st.container(border=True): # 1. Inputs para Sinais, Bode e Z-Transform if op in ["Série de Fourier", "Transformada de Laplace", "Diagrama de Bode (Frequência)", "Mapa de Polos e Zeros (Digital)"]: if op == "Diagrama de Bode (Frequência)": st.info("📶 **Função de Transferência Analógica H(s):** Use a variável **s**.") expr = st.text_input("Equação H(s)", "100 / (s**2 + 5*s + 100)", key="math_expr") elif op == "Mapa de Polos e Zeros (Digital)": st.info("⭕ **Função de Transferência Digital H(z):** Use a variável **z**. Avalia a estabilidade de código para CLPs e Microcontroladores.") expr = st.text_input("Equação H(z)", "(z - 0.5) / (z**2 - 1.2*z + 0.5)", key="math_expr") else: st.info("📡 **Domínio do Tempo:** Use a variável **t**. O sistema fará a transformação.") expr = st.text_input("Sinal no tempo f(t)", "exp(-t) * sin(2*t)", key="math_expr") # 2. Inputs para Álgebra Linear (Matrizes) elif op == "Álgebra Linear (Matriz 2x2)": st.info("📐 **Matriz de Transformação:** Digite os 4 números separados por VÍRGULA (colunas) e PONTO-E-VÍRGULA (linhas).") expr = st.text_input("Matriz M = [a, b ; c, d]", "2, 1 ; 1, 2", key="math_expr") # 3. Inputs para Série de Taylor elif op == "Série de Taylor (Aproximação)": st.info("🧬 **Polinómio de Taylor:** A IA vai aprender a desenhar a sua função com polinómios.") expr = st.text_input("Função f(x)", "sin(x) * exp(x/5)", key="math_expr") # 4. Inputs para Fluidos, EDOs e Caos 3D elif op in ["Campo Vetorial (Quiver)", "Divergente", "Rotacional", "Retrato de Fase (EDO)", "Atrator de Caos (Sistema 3D)"]: if op == "Atrator de Caos (Sistema 3D)": st.info("🦋 **Teoria do Caos (3D):** Digite 3 equações EDO separadas por VÍRGULA (dx/dt, dy/dt, dz/dt). O padrão é o famoso Atrator de Lorenz.") expr = st.text_input("Sistema Caótico 3D", "10*(y - x), x*(28 - z) - y, x*y - (8/3)*z", key="math_expr") elif op == "Retrato de Fase (EDO)": st.info("🌪️ **Sistema Dinâmico 2D:** Digite duas equações separadas por VÍRGULA (dx/dt, dy/dt).") expr = st.text_input("Sistema: dx/dt, dy/dt", "y, -sin(x) - 0.5*y", key="math_expr") else: st.info("🌪️ **Vetor 2D:** Digite duas funções espaciais separadas por VÍRGULA.") expr = st.text_input("Vetor F(x, y) = P, Q", "-y, x", key="math_expr") # 5. Descida do Gradiente elif op == "Descida do Gradiente (3D)": st.info("🏔️ **Otimização:** Defina o terreno de Custo 3D. A IA vai rolar a bola até ao fundo.") expr = st.text_input("Função de Erro f(x,y)", "x**2 + y**2 + sin(x*3)", key="math_expr") # 6. Cálculo Clássico Normal else: st.info("📐 **Cálculo Base:** Use as variáveis **x**, **y** e **z**.") expr = st.text_input("Equação Matemática", "x**2 + sin(y)", key="math_expr") # --- Parâmetros Dinâmicos --- lims = [(None, -2., 2.), (None, -2., 2.), (None, -2., 2.)] params = {} if op == "Série de Taylor (Aproximação)": c_ta, c_tn = st.columns(2) with c_ta: params["taylor_a"] = st.number_input("Centrado em a =", value=0.0) with c_tn: params["taylor_n"] = st.number_input("Grau da Série (n)", min_value=1, max_value=20, value=5) elif op in ["Integral Definida", "Integral Dupla", "Integral Tripla"]: st.markdown("##### Limites de Integração") n_cols = 1 if op == "Integral Definida" else (2 if op == "Integral Dupla" else 3) cols = st.columns(n_cols) with cols[0]: lx1 = st.number_input("X Inf", -50., 50., -2., key="mx1"); lx2 = st.number_input("X Sup", -50., 50., 2., key="mx2") lims[0] = (None, lx1, lx2) if n_cols >= 2: with cols[1]: ly1 = st.number_input("Y Inf", -50., 50., -2., key="my1"); ly2 = st.number_input("Y Sup", -50., 50., 2., key="my2") lims[1] = (None, ly1, ly2) if n_cols == 3: with cols[2]: lz1 = st.number_input("Z Inf", -50., 50., -2., key="mz1"); lz2 = st.number_input("Z Sup", -50., 50., 2., key="mz2") lims[2] = (None, lz1, lz2) elif op == "Derivada Parcial": params["var"] = st.radio("Derivar em relação a:", ["x", "y"], horizontal=True) st.markdown("
", unsafe_allow_html=True) if st.button("🚀 Renderizar Simulação Computacional", type="primary", use_container_width=True, key="math_calc"): with st.spinner("A resolver sistemas dinâmicos e integrais numéricas..."): res, fig, _ = solve_calculus(expr, op, tuple(lims), params) st.session_state.calc_res = res st.session_state.calc_fig = fig st.session_state.calc_op = op st.session_state.calc_expr = expr if st.session_state.get("calc_res"): st.divider() st.markdown("#### Equação Resultante / Diagnóstico de Estabilidade") st.latex(st.session_state.calc_res) if st.session_state.get("calc_fig"): st.plotly_chart(st.session_state.calc_fig, use_container_width=True) if st.button("📝 Extrair Laudo Analítico da IA (Física e Controlo)", type="secondary", key="btn_laudo_math"): with st.spinner("A Neuralk AI está a redigir o relatório físico/matemático..."): prompt_math = f"""Atue como um Engenheiro Especialista de Sistemas Críticos. Acabei de realizar a operação: '{st.session_state.calc_op}' para a equação/sistema: '{st.session_state.calc_expr}'. Gere um laudo extremamente didático abordando: ### 🧠 Comportamento Físico Visualizado (O que o gráfico mostra) ### 🏭 Aplicação na Indústria e Controlo ### 📊 Diagnóstico de Estabilidade (Apenas se aplicável)""" laudo_math = call_aivion_brain("Engenheiro", prompt_math, [], "llama-3.1-8b-instant") st.info(laudo_math) # ------------------------------------------------------------------------------ # ABAS 7 & 8: CMMS e VISION AI # ------------------------------------------------------------------------------ with tabs[6]: # A lógica complexa desta aba está toda encapsulada no módulo externo render_cmms_tab() with tabs[7]: st.header("👁️ Vision AI & RAG") c_vis, c_rag = st.columns([1, 1]) with c_vis: st.subheader("Análise Visual de Engenharia") up_file = st.file_uploader("Upload de Imagem Técnica (Diagramas, Peças, Manuais)", key="vis_up") if up_file and st.button("Analisar Imagem", key="vis_btn", type="primary"): with st.spinner("Processando visão computacional..."): st.write(analyze_image_text(up_file)) with c_rag: st.subheader("📚 Consultar Manuais (RAG)") rag_q = st.text_input("Pergunta à Base de Conhecimento", key="rag_q", placeholder="Ex: Qual o torque de aperto do motor W22?") if rag_q and st.button("Buscar no Manual", key="rag_btn", type="primary"): with st.spinner("Lendo documentação técnica da fábrica..."): resp = query_knowledge_base(rag_q) st.info(resp) # ------------------------------------------------------------------------------ # ABA 9: CONVERSOR DE UNIDADES E PROJETOS (V141.0 - INDUSTRIAL MASTER) # ------------------------------------------------------------------------------ with tabs[8]: st.header("🔄 Conversor Universal & Projetos") st.caption("A verdadeira calculadora do engenheiro: Grandezas industriais, Padrões ANSI/NEC e Análise Dimensional IA.") # Seletor de Modo de Operação modo_conversor = st.radio("Modo de Operação", [ "🔢 Grandezas Físicas e Industriais", "📏 Normas Industriais (Cabos e Tubos)", "🧠 Analisador Dimensional (IA)" ], horizontal=True) st.divider() # ===================================================================== # MODO 1: GRANDEZAS FÍSICAS E INDUSTRIAIS # ===================================================================== if modo_conversor == "🔢 Grandezas Físicas e Industriais": conversao_data = { "Comprimento": {"Milímetro (mm)": 1e-3, "Centímetro (cm)": 1e-2, "Metro (m)": 1.0, "Quilômetro (km)": 1000.0, "Polegada (in)": 0.0254, "Pé (ft)": 0.03048, "Jarda (yd)": 0.9144, "Milha (mi)": 1609.344}, "Área": {"Milímetro² (mm²)": 1e-6, "Centímetro² (cm²)": 1e-4, "Metro² (m²)": 1.0, "Hectare (ha)": 10000.0, "Quilômetro² (km²)": 1e6, "Acre": 4046.856}, "Volume": {"Mililitro (mL)": 1e-3, "Litro (L)": 1.0, "Metro Cúbico (m³)": 1000.0, "Galão (US)": 3.78541, "Onça Líquida (fl oz)": 0.0295735}, "Massa": {"Miligrama (mg)": 1e-6, "Grama (g)": 1e-3, "Quilograma (kg)": 1.0, "Tonelada (t)": 1000.0, "Onça (oz)": 0.0283495, "Libra (lb)": 0.453592}, "Tempo": {"Milissegundo (ms)": 1e-3, "Segundo (s)": 1.0, "Minuto (min)": 60.0, "Hora (h)": 3600.0, "Dia (d)": 86400.0}, "Velocidade": {"Metro/segundo (m/s)": 1.0, "Quilômetro/hora (km/h)": 1/3.6, "Milha/hora (mph)": 0.44704, "Nó (knot)": 0.514444}, "Pressão (Industrial)": {"Pascal (Pa)": 1.0, "Bar (bar)": 1e5, "PSI (lbf/in²)": 6894.76, "Atmosfera (atm)": 101325.0, "Milímetro de Mercúrio (mmHg)": 133.322}, "Torque (Mecânica)": {"Newton-metro (N.m)": 1.0, "Quilograma-força centímetro (kgf.cm)": 0.0980665, "Libra-força pé (lb.ft)": 1.355818}, "Potência (Mecânica/Elétrica)": {"Watt (W)": 1.0, "Quilowatt (kW)": 1000.0, "Cavalo-vapor (CV)": 735.49875, "Horsepower (HP)": 745.699872, "BTU/h": 0.293071}, "Vazão/Caudal (Fluidos)": {"Metro cúbico/segundo (m³/s)": 1.0, "Metro cúbico/hora (m³/h)": 1/3600, "Litro/minuto (L/min)": 1/60000, "Galão/minuto (GPM)": 0.0000630902} } c_cat, c_val = st.columns([1, 2]) with c_cat: cat = st.selectbox("Categoria Física", list(conversao_data.keys()) + ["Temperatura"], key="conv_cat") with c_val: val_input = st.number_input("Valor para Converter", value=1.0, format="%.6f", key="conv_val") col_from, col_arrow, col_to, col_res = st.columns([2, 0.5, 2, 2]) if cat == "Temperatura": unit_list = ["Celsius (°C)", "Fahrenheit (°F)", "Kelvin (K)"] else: unit_list = list(conversao_data[cat].keys()) k_from = f"u_from_{cat}" k_to = f"u_to_{cat}" if k_from not in st.session_state: st.session_state[k_from] = unit_list[0] if k_to not in st.session_state: st.session_state[k_to] = unit_list[1] if len(unit_list) > 1 else unit_list[0] with col_from: u_from = st.selectbox("De", unit_list, key=k_from) with col_arrow: st.markdown("
", unsafe_allow_html=True) st.button("⇆", on_click=swap_units, args=(k_from, k_to), key=f"btn_swap_{cat}", help="Inverter Unidades") with col_to: u_to = st.selectbox("Para", unit_list, key=k_to) if cat == "Temperatura": res_temp = val_input if u_from == "Fahrenheit (°F)": res_temp = (val_input - 32) * 5/9 elif u_from == "Kelvin (K)": res_temp = val_input - 273.15 if u_to == "Fahrenheit (°F)": res_final = (res_temp * 9/5) + 32 elif u_to == "Kelvin (K)": res_final = res_temp + 273.15 else: res_final = res_temp simbolo = u_to.split('(')[-1].replace(')','') with col_res: st.metric(label="Resultado da Conversão", value=f"{res_final:,.4f} {simbolo}") else: base_val = val_input * conversao_data[cat][u_from] res_final = base_val / conversao_data[cat][u_to] simbolo = u_to.split('(')[-1].replace(')','') if res_final == 0: fmt_res = "0" elif abs(res_final) < 0.0001 or abs(res_final) > 1000000: fmt_res = f"{res_final:.4e}" else: fmt_res = f"{res_final:,.6f}" with col_res: st.metric(label="Resultado da Conversão", value=f"{fmt_res} {simbolo}") # ===================================================================== # MODO 2: NORMAS INDUSTRIAIS (AWG E NPS) # ===================================================================== elif modo_conversor == "📏 Normas Industriais (Cabos e Tubos)": st.subheader("Consultor de Normas (Tabelas AWG e NPS)") c_norm_cat, c_norm_sel = st.columns(2) with c_norm_cat: norm_type = st.selectbox("Categoria de Normalização", ["Fios e Cabos Elétricos (AWG)", "Tubagens de Aço/Fluidos (NPS)"]) with c_norm_sel: if "AWG" in norm_type: awg_dict = { "20 AWG (Telecom/Sensores)": "Seção: 0.52 mm² | Resistência: 33.3 Ω/km | Corrente Máx: ~11A", "18 AWG (Comandos/CLP)": "Seção: 0.82 mm² | Resistência: 21.0 Ω/km | Corrente Máx: ~16A", "16 AWG (Iluminação leve)": "Seção: 1.31 mm² | Resistência: 13.2 Ω/km | Corrente Máx: ~22A", "14 AWG (Tomadas standard)": "Seção: 2.08 mm² | Resistência: 8.28 Ω/km | Corrente Máx: ~32A", "12 AWG (Motores pequenos)": "Seção: 3.31 mm² | Resistência: 5.21 Ω/km | Corrente Máx: ~41A", "10 AWG (Ar condicionado)": "Seção: 5.26 mm² | Resistência: 3.28 Ω/km | Corrente Máx: ~55A", "8 AWG (Quadros gerais)": "Seção: 8.37 mm² | Resistência: 2.06 Ω/km | Corrente Máx: ~73A" } sel_awg = st.selectbox("Medida AWG (American Wire Gauge)", list(awg_dict.keys())) st.info(f"**Especificações de Engenharia:**\n\n{awg_dict[sel_awg]}") else: nps_dict = { "1/4 polegada (NPS 1/4)": "Ø Externo: 13.7 mm | Ø Interno (Sch 40): 9.2 mm", "1/2 polegada (NPS 1/2)": "Ø Externo: 21.3 mm | Ø Interno (Sch 40): 15.8 mm", "1 polegada (NPS 1)": "Ø Externo: 33.4 mm | Ø Interno (Sch 40): 26.6 mm", "2 polegadas (NPS 2)": "Ø Externo: 60.3 mm | Ø Interno (Sch 40): 52.5 mm", "4 polegadas (NPS 4)": "Ø Externo: 114.3 mm | Ø Interno (Sch 40): 102.3 mm", "6 polegadas (NPS 6)": "Ø Externo: 168.3 mm | Ø Interno (Sch 40): 154.1 mm", "8 polegadas (NPS 8)": "Ø Externo: 219.1 mm | Ø Interno (Sch 40): 202.7 mm" } sel_nps = st.selectbox("Tamanho Nominal da Tubagem (Nominal Pipe Size)", list(nps_dict.keys())) st.info(f"**Especificações de Tubagem:**\n\n{nps_dict[sel_nps]}") # ===================================================================== # MODO 3: ANALISADOR DIMENSIONAL COM IA # ===================================================================== elif modo_conversor == "🧠 Analisador Dimensional (IA)": st.subheader("Laboratório de Análise Dimensional") st.markdown("Escreva uma expressão com grandezas e a IA deduzirá a Unidade Física final.") c_dim_in, c_dim_btn = st.columns([3, 1]) with c_dim_in: dim_input = st.text_input("Expressão Dimensional (Use português, ex: (Massa * Velocidade) / Tempo)", "(Quilograma * Metro) / (Segundo^2)", key="dim_input") with c_dim_btn: st.markdown("
", unsafe_allow_html=True) btn_dim = st.button("🧠 Calcular Unidade", type="primary", use_container_width=True) if btn_dim: with st.spinner("A Neuralk AI está a analisar as grandezas e as unidades do SI..."): prompt = f"""Atue como um Engenheiro e Físico de referência. O utilizador pretende fazer a análise dimensional da seguinte combinação de grandezas: {dim_input}. Execute os seguintes passos em Markdown e Emojis: 1. Diga quais as unidades base do SI (kg, m, s, A, etc.) que compõem cada termo. 2. Apresente o cancelamento matemático das dimensões. 3. Conclua de forma CLARA qual é a GRANDEZA FÍSICA final obtida (Ex: Força, Energia, Pressão). 4. Indique o nome da UNIDADE DERIVADA que deve ser utilizada (Ex: Newton, Joule, Pascal).""" resp = call_aivion_brain("Engenheiro", prompt, [], "llama-3.1-8b-instant") st.success("Análise Concluída com Sucesso!") st.markdown(resp) # ------------------------------------------------------------------------------ # ABA 10: CONTROLE DE PH # ------------------------------------------------------------------------------ with tabs[9]: st.header("🧪 Controle Químico Industrial (pH)") st.caption("Ambiente de teste com distúrbios de carga, IA de Gain Scheduling, filtros DSP e HMI animado.") col_ctrl, col_sim = st.columns([1, 2.5]) with col_ctrl: st.subheader("1. Seleção de Reagentes") sel_acid = st.selectbox("Tanque de Ácido", list(ACID_OPTIONS.keys())) sel_base = st.selectbox("Tanque de Base", list(BASE_OPTIONS.keys())) st.subheader("2. Malha de Controle PID") c_kp, c_ki = st.columns(2) with c_kp: ph_kp = st.number_input("Kp Base", 0.0, 100.0, 0.5, key="ph_kp") with c_ki: ph_ki = st.number_input("Ki Base", 0.0, 50.0, 0.05, key="ph_ki") ph_kd = st.number_input("Kd", 0.0, 50.0, 0.0, key="ph_kd") use_gs = st.toggle("🧠 Ativar Gain Scheduling (PID IA)", help="Ajusta o Kp automaticamente.") st.subheader("3. Física do Processo") ph_sp = st.number_input("Setpoint pH", 0.0, 14.0, 10.0, step=0.1, key="ph_sp") delay = st.slider("Atraso de Mistura (Transporte)", 0, 50, 5) is_buffer = st.toggle("Efeito Tampão Químico", value=False) st.subheader("4. Chão de Fábrica (Estresse)") disturb_type = st.selectbox("Injetar Distúrbio (Aos 30s)", ["Nenhum", "Vazamento Ácido (Choque)", "Vazamento Alcalino (Choque)"]) sensor_wear = st.slider("Degradação do Eletrodo (%)", 0, 100, 0, help="Gera ruído no sensor.") use_dsp = st.toggle("📉 Ligar Filtro DSP (Média Móvel)") st.divider() if st.button("🚀 Iniciar Simulação Completa", type="primary", key="btn_run_ph"): pid = IndustrialPID(ph_kp, ph_ki, ph_kd, out_min=0, out_max=100) pid.integral = 50.0 / (ph_ki if ph_ki>0 else 1) reactor = ChemicalReactor(delay_steps=delay, buffer_effect=is_buffer, acid_factor=ACID_OPTIONS[sel_acid], base_factor=BASE_OPTIONS[sel_base]) data_ph = {"t":[], "pv":[], "sensor_pv":[], "sp":[], "acid":[], "base":[], "pid_mv":[]} dsp_buffer = [7.0] * 15 for t_step in range(600): dt = 0.1 if t_step == 300: if "Ácido" in disturb_type: reactor.net_reagent -= 0.8 elif "Alcalino" in disturb_type: reactor.net_reagent += 0.8 noise = np.random.normal(0, (sensor_wear / 100.0) * 0.8) raw_sensor = reactor.pv + noise dsp_buffer.pop(0); dsp_buffer.append(raw_sensor) filtered_sensor = np.mean(dsp_buffer) if use_dsp else raw_sensor if use_gs: pid.kp = ph_kp * 0.3 if abs(filtered_sensor - 7.0) < 1.5 else ph_kp * 2.0 else: pid.kp = ph_kp mv, _, _, _ = pid.update(ph_sp, filtered_sensor, dt) pv, acid, base = reactor.update(mv, dt) data_ph["t"].append(t_step*dt); data_ph["pv"].append(pv); data_ph["sensor_pv"].append(filtered_sensor) data_ph["sp"].append(ph_sp); data_ph["acid"].append(acid); data_ph["base"].append(base); data_ph["pid_mv"].append(mv) st.session_state.ph_data = data_ph st.session_state.acid_name = sel_acid.split(" ")[0] st.session_state.base_name = sel_base.split(" ")[0] st.session_state.has_noise = sensor_wear > 0 with col_sim: if "ph_data" in st.session_state: d = st.session_state.ph_data final_ph = d['pv'][-1]; final_acid = d['acid'][-1]; final_base = d['base'][-1] aname = st.session_state.get('acid_name', 'Ácido') bname = st.session_state.get('base_name', 'Base') color = get_ph_color(final_ph) st.markdown("#### Supervisório HMI (Diagrama P&ID)") hmi_html = f"""

Tanque {aname}

CV-101
{final_acid:.1f}%
pH {final_ph:.2f}

Tanque {bname}

CV-102
{final_base:.1f}%
""" st.markdown(hmi_html, unsafe_allow_html=True) st.markdown("
", unsafe_allow_html=True) fig_ph = go.Figure() fig_ph.add_trace(go.Scatter(x=d['t'], y=d['pv'], mode='lines', name="pH Real", line=dict(color=color, width=3))) if st.session_state.get('has_noise', False): fig_ph.add_trace(go.Scatter(x=d['t'], y=d['sensor_pv'], mode='lines', name="Sensor/CLP", line=dict(color='rgba(255,255,255,0.3)', width=1.5))) fig_ph.add_trace(go.Scatter(x=d['t'], y=d['sp'], mode='lines', name="SetPoint", line=dict(dash='dash', color='white'))) fig_ph.update_layout(title="Curva de Titulação Dinâmica", template="plotly_dark", height=280, yaxis=dict(range=[0, 14]), margin=dict(t=30,b=10,l=10,r=10)) st.plotly_chart(fig_ph, use_container_width=True) fig_v = go.Figure() fig_v.add_trace(go.Scatter(x=d['t'], y=d['acid'], mode='lines', fill='tozeroy', name=f"CV-101 ({aname})", line=dict(color='#ff3333'))) fig_v.add_trace(go.Scatter(x=d['t'], y=d['base'], mode='lines', fill='tozeroy', name=f"CV-102 ({bname})", line=dict(color='#3333ff'))) fig_v.update_layout(title="Válvulas Split-Range (%)", template="plotly_dark", height=180, margin=dict(t=30,b=0,l=0,r=0), yaxis=dict(range=[0, 100])) st.plotly_chart(fig_v, use_container_width=True) else: st.info("👈 Selecione os reagentes químicos e clique em 'Iniciar Simulação'.") # ------------------------------------------------------------------------------ # ABA 11: EMBEDDED STUDIO & EDGE AI (V220.1 - KERNEL PANIC FIX & UNIFIED LOOP) # ------------------------------------------------------------------------------ with tabs[10]: st.header("💻 Embedded Studio & Edge AI") st.caption("IDE Virtual: Pinout 3D, WebSerial HIL, TinyML, Gerador de Código IA e RTOS Profiler.") # --- GESTÃO DE ESTADO DO RTOS E EDGE AI --- if "rtos_live" not in st.session_state: st.session_state.rtos_live = False if "rtos_t" not in st.session_state: st.session_state.rtos_t = 0 if "rtos_heap" not in st.session_state: st.session_state.rtos_heap = 128.0 if "edge_live" not in st.session_state: st.session_state.edge_live = False col_code, col_logic = st.columns([1.2, 1.8]) with col_code: st.subheader("1. Linguagem e Hardware") prog_lang = st.radio("Linguagem de Programação:", ["C++ (Arduino/FreeRTOS)", "Python (MicroPython)"], horizontal=True) HW_LIST = [ "ESP32 WROOM-32 (Tensilica Xtensa)", "ESP32-S3 (Dual Core AI)", "ESP32-C3 (RISC-V)", "ESP8266 (NodeMCU)", "Arduino Uno R3", "Arduino Nano", "Arduino Mega 2560", "Raspberry Pi Pico (RP2040)", "Raspberry Pi Pico W (WiFi)", "Raspberry Pi 4 (Header 40-pin)" ] hw_target = st.selectbox("Hardware Alvo", HW_LIST, index=0) st.subheader("2. Mapeamento Físico (3D Interativo)") c_p1, c_p2 = st.columns(2) with c_p1: in_pins_str = st.text_input("Pinos IN (Azul)", "34, 35") with c_p2: out_pins_str = st.text_input("Pinos OUT (Laranja)", "15, 2, 4") in_pins = [p.strip() for p in in_pins_str.split(',') if p.strip().isdigit()] out_pins = [p.strip() for p in out_pins_str.split(',') if p.strip().isdigit()] in_array_str = ", ".join(in_pins) if in_pins else "0" out_array_str = ", ".join(out_pins) if out_pins else "0" num_in, num_out = len(in_pins), len(out_pins) # Pinout 3D Arrastável (Preservado) components.html(generate_3d_pinout_html(hw_target, in_pins, out_pins), height=470) st.subheader("3. Módulo Lógico e Deploy") logic_module = st.selectbox("Selecione o Firmware:", [ "🧠 Código Customizado via IA", "Inferência de IA (TinyML - TensorFlow Lite)", "Controle PID (Tarefa RTOS/Loop)", "Cliente IoT (WiFi + MQTT)", "Máquina de Estados Finitos (FSM)", "Controle de Servomotores (Robótica)", "Filtro Digital Média Móvel (DSP)" ], label_visibility="collapsed") custom_prompt = "" if logic_module == "🧠 Código Customizado via IA": custom_prompt = st.text_area("🤖 Descreva o que o firmware deve fazer:", placeholder="Ex: Piscar o relé no pino de saída a cada 500ms e ler a temperatura no pino de entrada...") lang_ext = "cpp" if "C++" in prog_lang else "py" lang_syntax = "cpp" if "C++" in prog_lang else "python" plotter_data = None if st.button(f"⚙️ Compilar e Gerar Firmware", type="primary"): if num_in == 0 and num_out == 0: st.error("Insira pelo menos um pino numérico válido!") elif logic_module == "🧠 Código Customizado via IA" and not custom_prompt: st.warning("Por favor, descreva a lógica para a IA gerar o código.") else: st.success(f"Firmware '{logic_module}' gerado com sucesso!") codigo_gerado, term_log = "", "" # --- LÓGICA IA CUSTOMIZADA --- if logic_module == "🧠 Código Customizado via IA": with st.spinner("Neuralk AI está a programar o seu firmware do zero..."): prompt_ia = f"Escreva um código limpo em {prog_lang} para a placa {hw_target}. Pinos de Entrada conectados: {in_array_str}. Pinos de Saída conectados: {out_array_str}. Lógica pedida: {custom_prompt}. Responda APENAS com o código fonte, sem formatação markdown fora do código." codigo_bruto = call_aivion_brain("💻 Dev", prompt_ia, [], "llama-3.1-8b-instant") codigo_gerado = codigo_bruto.replace("```cpp", "").replace("```python", "").replace("```", "").strip() term_log = "[System] Firmware IA carregado. A aguardar dados..." # --- LÓGICA C++ (Templates) --- elif "C++" in prog_lang: if logic_module == "Inferência de IA (TinyML - TensorFlow Lite)": codigo_gerado = f"""// Target: {hw_target} | Module: Edge AI (TinyML)\n#include \n#include "model_data.h" // Modelo treinado exportado\n\nconst int INPUT_PINS[] = {{{in_array_str}}};\nconst int kTensorArenaSize = 4 * 1024; // 4KB RAM para a Rede Neural\nALIGN(16) uint8_t tensor_arena[kTensorArenaSize];\n\nvoid setup() {{\n Serial.begin(115200);\n Serial.println("[TinyML] Inicializando Interpretador TFLite...");\n // Setup do modelo omitido por brevidade\n}}\n\nvoid loop() {{\n // Leitura dos sensores para o Tensor de Entrada\n float raw_sensor = analogRead(INPUT_PINS[0]) / 4095.0;\n \n // Run Inference\n // invoke_status = interpreter->Invoke();\n \n float anomaly_score = random(0, 15) / 100.0;\n if(raw_sensor > 0.8) anomaly_score = 0.95;\n \n Serial.print("Sensor_In: "); Serial.print(raw_sensor);\n Serial.print(" | AI_Anomaly_Score: "); Serial.println(anomaly_score);\n delay(100);\n}}""" term_log = "[TinyML] Carregando Modelo...\nSensor_In: 0.35 | AI_Anomaly_Score: 0.02\nSensor_In: 0.91 | AI_Anomaly_Score: 0.95 !! ATENÇÃO !!" plotter_t = np.linspace(0, 100, 200); s_in = np.sin(plotter_t/5)*0.5 + 0.5; a_sc = np.where(s_in > 0.8, 0.95, np.random.normal(0.05, 0.02, 200)) plotter_data = {"Sensor Input": s_in, "IA Anomaly Score": a_sc} elif logic_module == "Controle PID (Tarefa RTOS/Loop)": codigo_gerado = f"""// Target: {hw_target} | Module: PID RTOS\n#include \n#include "freertos/FreeRTOS.h"\n#include "freertos/task.h"\n\nconst int INPUT_PINS[] = {{{in_array_str}}};\nconst int OUTPUT_PINS[] = {{{out_array_str}}};\n\nvoid vTaskPIDControl(void *pvParameters) {{\n float Kp=1.5, Ki=0.2, Kd=0.1, setpoint=50.0, intg=0, prev=0;\n for(;;) {{\n float pv = (analogRead(INPUT_PINS[0]) / 4095.0) * 100.0;\n float err = setpoint - pv;\n intg += err * 0.1;\n float out = (Kp*err) + (Ki*intg) + (Kd*(err-prev)/0.1);\n if(out>255) out=255; if(out<0) out=0;\n analogWrite(OUTPUT_PINS[0], (int)out);\n prev = err;\n Serial.print("PV:"); Serial.print(pv); Serial.print(",SP:"); Serial.println(setpoint);\n vTaskDelay(100 / portTICK_PERIOD_MS);\n }}\n}}\n\nvoid setup() {{\n Serial.begin(115200);\n xTaskCreatePinnedToCore(vTaskPIDControl, "PID", 2048, NULL, 1, NULL, 1);\n}}\nvoid loop() {{ vTaskDelete(NULL); }}""" term_log = "[RTOS] Booting PID Task...\nPV: 25.0, SP: 50.0\nPV: 28.5, SP: 50.0" plotter_t = np.linspace(0, 50, 100); plotter_data = {"SP": np.ones(100)*50, "PV": 50 - 25*np.exp(-plotter_t/10) * np.cos(plotter_t)} elif logic_module == "Filtro Digital Média Móvel (DSP)": codigo_gerado = f"""// Target: {hw_target} | Module: DSP Filter\n#include \nconst int INPUT_PINS[] = {{{in_array_str}}};\n#define NUM_SAMPLES 15\nint readings[NUM_SAMPLES]; int readIndex=0; long total=0; int average=0;\n\nvoid setup() {{\n Serial.begin(115200);\n for(int i=0; i 0.8 else 0.05 # Mock inference\n print("Sensor_In: {{:.2f}} | IA_Anomaly: {{:.2f}}".format(raw_val, score))\n time.sleep(0.1)""" term_log = "[TinyML] Modelo Neural Carregado.\nSensor_In: 0.35 | IA_Anomaly: 0.05\nSensor_In: 0.91 | IA_Anomaly: 0.95 !! ATENÇÃO !!" plotter_t = np.linspace(0, 100, 200); s_in = np.sin(plotter_t/5)*0.5 + 0.5; a_sc = np.where(s_in > 0.8, 0.95, np.random.normal(0.05, 0.02, 200)) plotter_data = {"Sensor Input": s_in, "IA Anomaly Score": a_sc} elif logic_module == "Controle PID (Tarefa RTOS/Loop)": codigo_gerado = f"""# Target: {hw_target} | Module: PID\nimport machine, time\nINPUT_PINS = [{in_array_str}]\nOUTPUT_PINS = [{out_array_str}]\nsensor = machine.ADC(machine.Pin(INPUT_PINS[0])) if INPUT_PINS else None\npwm = machine.PWM(machine.Pin(OUTPUT_PINS[0])) if OUTPUT_PINS else None\n\nKp, Ki, Kd, sp, intg, prev = 1.5, 0.2, 0.1, 50.0, 0, 0\nwhile True:\n if sensor and pwm:\n pv = (sensor.read()/4095.0)*100.0\n err = sp - pv; intg += err * 0.1\n out = max(0, min(1023, int(((Kp*err + Ki*intg + Kd*(err-prev)/0.1)/255.0)*1023)))\n pwm.duty(out); prev = err\n print("PV: {{}}, SP: {{}}".format(pv, sp))\n time.sleep(0.1)""" term_log = "[MicroPython] PID Loop...\nPV: 25.0, SP: 50.0" plotter_t = np.linspace(0, 50, 100); plotter_data = {"SP": np.ones(100)*50, "PV": 50 - 25*np.exp(-plotter_t/10) * np.cos(plotter_t)} else: codigo_gerado = f"# Modulo Python: {logic_module}\nprint('Iniciado')" term_log = "Script Iniciado..." st.code(codigo_gerado, language=lang_syntax) st.download_button(f"💾 Baixar main.{lang_ext}", data=codigo_gerado, file_name=f"main.{lang_ext}", mime="text/plain") # --- EDGE AI HARDWARE IN THE LOOP (HIL) --- if logic_module == "Inferência de IA (TinyML - TensorFlow Lite)": st.markdown("### 🔌 Simulação HIL (Hardware-In-The-Loop) com SCADA") edge_btn = st.button("▶️ Ligar Inferência Live" if not st.session_state.edge_live else "⏹️ Parar Inferência", key="btn_edge_live") if edge_btn: st.session_state.edge_live = not st.session_state.edge_live st.rerun() if st.session_state.edge_live: # Lê a variável Global do SCADA (Se existir) live_scada_v = st.session_state.get('scada_v', 2.5) + np.random.normal(0, 0.1) # Processamento IA na Borda (Edge) edge_score = (live_scada_v / 7.1) * 100.0 edge_status = "🔴 ANOMALIA!" if edge_score > 80 else "🟢 NORMAL" edge_html = f"""
[ESP32_TinyML] Lendo Sensor de Vibração...
> SCADA_Vib: {live_scada_v:.2f} mm/s
> TFLite_Model_Score: {edge_score:.1f}%
> Ação Local: {edge_status}
""" st.markdown(edge_html, unsafe_allow_html=True) # CORREÇÃO: O st.rerun() foi retirado daqui e movido para o final absoluto da aba! # --- TABS: TERMINAL vs SERIAL PLOTTER --- tab_term, tab_plot = st.tabs(["🖥️ Monitor Serial", "📈 Serial Plotter Virtual"]) with tab_term: st.markdown(f'
{term_log}
_
', unsafe_allow_html=True) with tab_plot: if plotter_data: fig_sp = go.Figure() for k, v in plotter_data.items(): fig_sp.add_trace(go.Scatter(y=v, mode='lines', name=k)) fig_sp.update_layout(template="plotly_dark", height=200, margin=dict(t=10,b=10,l=10,r=10)) st.plotly_chart(fig_sp, use_container_width=True, key="embedded_plotter") else: st.info("O módulo selecionado não gera dados contínuos para o plotter.") # --- CONEXÃO DIRETA WEBSERIAL (HIL REAL) --- st.subheader("4. Conexão WebSerial API (HIL Direto)") st.markdown("Ligue o seu ESP32/Arduino via USB. O navegador comunicará **diretamente** com o hardware!") webserial_html = """
Status: Desconectado
Aguardando Conexão Serial...
""" components.html(webserial_html, height=250) with col_logic: st.subheader("Analisador Lógico & RTOS Profiler") mode_analyzer = st.radio("Modo de Inspeção / Decodificador:", [ "Sinais Básicos (Digital/PWM)", "I2C (Mestre-Escravo)", "UART (RS-232 Asíncrono)", "SPI (Síncrono 4-Vias)", "Profiler FreeRTOS (Tracealyzer)" ], key="rd_logic_mode") # UI Dinâmica dependendo do Modo if mode_analyzer == "Sinais Básicos (Digital/PWM)": c_sig1, c_sig2 = st.columns(2) with c_sig1: duty_cycle = st.slider("Duty Cycle PWM (CH1)", 0, 100, 40, step=10, format="%d%%") with c_sig2: digital_toggle = st.toggle("Relé Digital (CH2)", value=False) elif mode_analyzer == "I2C (Mestre-Escravo)": i2c_addr = st.text_input("Endereço HEX", value="0x68") elif mode_analyzer == "UART (RS-232 Asíncrono)": uart_char = st.text_input("Caractere ASCII a Enviar", value="A", max_chars=1) # --- PROFILER FREERTOS AO VIVO --- elif mode_analyzer == "Profiler FreeRTOS (Tracealyzer)": st.markdown("#### Monitoramento de Recursos (Live RTOS)") c_rtos1, c_rtos2 = st.columns([1, 1]) with c_rtos1: if st.button("▶️ RUN Profiler" if not st.session_state.rtos_live else "⏹️ PAUSE Profiler", key="btn_rtos_run"): st.session_state.rtos_live = not st.session_state.rtos_live st.rerun() with c_rtos2: if st.button("💣 Simular Memory Leak", type="primary", key="btn_rtos_leak"): st.session_state.rtos_heap -= 20.0 # Destrói a RAM progressivamente cpu_load = st.slider("Injetar Carga na Task_IA (CPU Core 1)", 10, 100, 60, key="sld_rtos_cpu") # Avisos críticos do RTOS heap_color = "normal" if st.session_state.rtos_heap > 50 else "inverse" heap_status = "Estável" if st.session_state.rtos_heap > 50 else "CRÍTICO: Stack Overflow Iminente!" c_ram, c_c0, c_c1 = st.columns(3) c_ram.metric("RAM Livre (Heap)", f"{max(0, st.session_state.rtos_heap):.1f} KB", heap_status, delta_color=heap_color) c_c0.metric("Core 0 (PRO_CPU)", "15%", "OS / WiFi", delta_color="normal") c_c1.metric("Core 1 (APP_CPU)", f"{cpu_load}%", "Processamento IA", delta_color="inverse" if cpu_load > 85 else "normal") if st.session_state.rtos_heap <= 0: st.error("💀 KERNEL PANIC: Stack Overflow na Task_IA. O sistema reiniciou (Watchdog Reset).") st.session_state.rtos_heap = 128.0 # Auto-reset do microcontrolador st.session_state.rtos_live = False if st.button("📡 Atualizar Gráfico Analisador", type="secondary", key="btn_logic_update"): if not st.session_state.get('rtos_live', False): with st.spinner("Analisando frequências e barramentos..."): time.sleep(0.5) # --- DESENHO DOS GRÁFICOS DO ANALISADOR E DO RTOS --- fig_logic = go.Figure() # RTOS Profiler (Gantt Animado Contínuo) if mode_analyzer == "Profiler FreeRTOS (Tracealyzer)": def add_task(fig, core, start, duration, name, color): fig.add_trace(go.Scatter(x=[start, start+duration], y=[core, core], mode='lines', line=dict(color=color, width=22), name=name)) fig_logic.add_hline(y=0, line_color="#333", line_width=1); fig_logic.add_hline(y=1, line_color="#333", line_width=1) # O eixo do tempo avança dinamicamente t_base = st.session_state.rtos_t ia_duration = 5 + (cpu_load / 10.0) add_task(fig_logic, 1, t_base + 1, 8, "Task_PID (Alta)", "#00ff00") add_task(fig_logic, 1, t_base + 9, 3, "IDLE", "#444444") add_task(fig_logic, 1, t_base + 12, ia_duration, "Task_TinyML (IA)", "#ff33cc") add_task(fig_logic, 1, t_base + 12 + ia_duration, 5, "IDLE", "#444444") add_task(fig_logic, 0, t_base + 0, 2, "OS_Tick", "#ffcc00") add_task(fig_logic, 0, t_base + 2, 15, "Task_WiFi_MQTT", "#00ccff") add_task(fig_logic, 0, t_base + 17, 2, "OS_Tick", "#ffcc00") add_task(fig_logic, 0, t_base + 19, 16, "IDLE", "#444444") fig_logic.update_layout(title="Tracealyzer: Execução FreeRTOS (Microsegundos)", template="plotly_dark", height=300, margin=dict(t=40,b=20,l=10,r=10), yaxis=dict(tickmode='array', tickvals=[0, 1], ticktext=['Core 0 (OS)', 'Core 1 (App)'], showgrid=False, zeroline=False), xaxis=dict(title="Tempo (ms)", showgrid=True, gridcolor='#333', dtick=5, range=[t_base, t_base+50]), showlegend=False) # Protocolos Clássicos (Sinais Estáticos) else: t_logic = np.linspace(0, 50, 500) if mode_analyzer == "Sinais Básicos (Digital/PWM)": ch1_pwm = np.where((t_logic % 10) < (10 * (duty_cycle/100.0)), 1, 0) ch2_digital = np.ones(500) if digital_toggle else np.zeros(500) ch3_clk = np.where((t_logic % 2) < 1, 1, 0) fig_logic.add_trace(go.Scatter(x=t_logic, y=ch1_pwm + 4, mode='lines', line_shape='vh', name="PWM", line=dict(color='#00ff00'))) fig_logic.add_trace(go.Scatter(x=t_logic, y=ch2_digital + 2, mode='lines', line_shape='vh', name="Relé", line=dict(color='#ff3333'))) fig_logic.add_trace(go.Scatter(x=t_logic, y=ch3_clk, mode='lines', line_shape='vh', name="Clock", line=dict(color='#00ccff'))) y_ticks, y_text = [0.5, 2.5, 4.5], ['CLK', 'DIG', 'PWM'] elif mode_analyzer == "I2C (Mestre-Escravo)": ch_scl = np.where((t_logic % 4) < 2, 1, 0); ch_scl[:30] = 1; ch_scl[-30:] = 1 ch_sda = np.zeros(500); ch_sda[:20] = 1; ch_sda[20:30] = 0; ch_sda[30:200] = np.random.choice([0,1], 170); ch_sda[200:240] = 0; ch_sda[240:400] = np.random.choice([0,1], 160); ch_sda[400:480] = 0; ch_sda[480:] = 1 fig_logic.add_trace(go.Scatter(x=t_logic, y=ch_sda + 2, mode='lines', line_shape='vh', name="SDA", line=dict(color='#ffcc00'))) fig_logic.add_trace(go.Scatter(x=t_logic, y=ch_scl, mode='lines', line_shape='vh', name="SCL", line=dict(color='#00ccff'))) fig_logic.add_annotation(x=25, y=2.5, text="START"); fig_logic.add_annotation(x=220, y=2.2, text="ACK", font=dict(color="green")) y_ticks, y_text = [0.5, 2.5], ['SCL', 'SDA'] elif mode_analyzer == "UART (RS-232 Asíncrono)": ascii_val = ord(uart_char) if uart_char else 65 bin_str = format(ascii_val, '08b')[::-1] ch_tx = np.ones(500); ch_tx[50:100] = 0 for i in range(8): ch_tx[100 + (i*50) : 150 + (i*50)] = int(bin_str[i]) ch_tx[500:550] = 1 fig_logic.add_trace(go.Scatter(x=t_logic, y=ch_tx, mode='lines', line_shape='vh', name="TX", line=dict(color='#ff33cc', width=2))) fig_logic.add_annotation(x=75, y=0.5, text="START (0)", font=dict(color="white")) fig_logic.add_annotation(x=275, y=1.2, text=f"Data: '{uart_char}' (0x{ascii_val:X})", showarrow=False, font=dict(color="white")) y_ticks, y_text = [0.5], ['TX Line'] elif mode_analyzer == "SPI (Síncrono 4-Vias)": ch_cs = np.ones(500); ch_cs[30:450] = 0 ch_sck = np.zeros(500); ch_sck[50:430] = np.where(((t_logic[50:430] - 50) % 40) < 20, 1, 0) ch_mosi = np.zeros(500); ch_mosi[50:430] = np.random.choice([0,1], 380) ch_miso = np.zeros(500); ch_miso[50:430] = np.random.choice([0,1], 380) fig_logic.add_trace(go.Scatter(x=t_logic, y=ch_miso + 6, mode='lines', line_shape='vh', name="MISO", line=dict(color='#ff9900'))) fig_logic.add_trace(go.Scatter(x=t_logic, y=ch_mosi + 4, mode='lines', line_shape='vh', name="MOSI", line=dict(color='#ff33cc'))) fig_logic.add_trace(go.Scatter(x=t_logic, y=ch_sck + 2, mode='lines', line_shape='vh', name="SCK", line=dict(color='#00ccff'))) fig_logic.add_trace(go.Scatter(x=t_logic, y=ch_cs, mode='lines', line_shape='vh', name="CS", line=dict(color='#ffffff'))) fig_logic.add_annotation(x=30, y=0.5, text="CS Drop"); fig_logic.add_annotation(x=450, y=0.5, text="CS High") y_ticks, y_text = [0.5, 2.5, 4.5, 6.5], ['CS', 'SCK', 'MOSI', 'MISO'] fig_logic.update_layout(title=f"Decodificador Lógico: {mode_analyzer}", template="plotly_dark", height=350, margin=dict(t=40,b=20,l=10,r=10), yaxis=dict(tickmode='array', tickvals=y_ticks, ticktext=y_text, showgrid=False, zeroline=False), xaxis=dict(title="Tempo (ms)", showgrid=True, gridcolor='#333', dtick=5)) st.plotly_chart(fig_logic, use_container_width=True, key="logic_analyzer_plot_v2") # ====================================================================== # MOTOR DE RERUN UNIFICADO (FRAME RATE DE TODA A ABA 11) # ====================================================================== # CRÍTICO: Colocado estritamente no final para garantir que o Streamlit # consiga desenhar 100% dos botões e gráficos antes de reiniciar a aba. needs_rerun = False if mode_analyzer == "Profiler FreeRTOS (Tracealyzer)" and st.session_state.get('rtos_live', False): st.session_state.rtos_t += 5 needs_rerun = True if st.session_state.get('edge_live', False): needs_rerun = True if needs_rerun: time.sleep(0.1) # Taxa fixa de 10 FPS segura para o navegador st.rerun()