""" Aba de Monitoramento e Métricas Visualiza estatísticas de uso e performance do sistema """ import gradio as gr from src.database import DatabaseManager def create_monitoring_tab(db_manager: DatabaseManager): """Cria aba de monitoramento e métricas""" with gr.Tab(" Monitoramento"): gr.Markdown(""" ## Dashboard de Monitoramento Acompanhe estatísticas de uso e performance: - **Métricas de Query**: Latências e throughput - **Uso da Base**: Documentos e chats - **Performance**: Análise de tempos de resposta """) refresh_all_btn = gr.Button(" Atualizar Todas as Métricas", variant="primary", size="lg") with gr.Row(): with gr.Column(): gr.Markdown("### Estatísticas Gerais") general_stats = gr.JSON(label="Resumo Geral") with gr.Column(): gr.Markdown("### Performance de Queries") query_performance = gr.JSON(label="Métricas de Latência") with gr.Row(): gr.Markdown("### Distribuição de Scores de Similaridade") score_distribution = gr.Markdown("Sem dados disponíveis") with gr.Row(): gr.Markdown("### Últimas Queries") with gr.Row(): recent_queries_limit = gr.Slider( minimum=5, maximum=50, value=10, step=5, label="Quantidade de queries recentes" ) recent_queries_table = gr.Dataframe( headers=["ID", "Query", "Resultados", "Tempo Total (ms)", "Top K", "Data"], label="Histórico de Queries", wrap=True ) # Função para buscar estatísticas gerais def get_general_stats(): conn = db_manager.connect() if not conn: return {"erro": "Não foi possível conectar ao banco"} try: stats = {} with conn.cursor() as cur: # Total de documentos cur.execute("SELECT COUNT(*) FROM documents") stats['total_documentos'] = cur.fetchone()[0] # Total de chats cur.execute("SELECT COUNT(*) FROM chats") stats['total_chats'] = cur.fetchone()[0] # Total de mensagens cur.execute("SELECT COUNT(*) FROM messages") stats['total_mensagens'] = cur.fetchone()[0] # Total de queries registradas cur.execute("SELECT COUNT(*) FROM query_metrics") stats['total_queries'] = cur.fetchone()[0] # Média de mensagens por chat cur.execute(""" SELECT AVG(msg_count) FROM ( SELECT COUNT(*) as msg_count FROM messages GROUP BY chat_id ) as counts """) result = cur.fetchone() stats['media_mensagens_por_chat'] = round(result[0], 2) if result[0] else 0 return stats except Exception as e: return {"erro": str(e)} # Função para calcular métricas de performance def get_query_performance(): conn = db_manager.connect() if not conn: return {"erro": "Não foi possível conectar ao banco"} try: metrics = {} with conn.cursor() as cur: # Latências médias cur.execute(""" SELECT AVG(retrieval_time_ms) as avg_retrieval, AVG(generation_time_ms) as avg_generation, AVG(total_time_ms) as avg_total, MIN(total_time_ms) as min_total, MAX(total_time_ms) as max_total FROM query_metrics """) row = cur.fetchone() if row and row[0] is not None: metrics['latencia_media_retrieval_ms'] = round(row[0], 2) metrics['latencia_media_generation_ms'] = round(row[1], 2) metrics['latencia_media_total_ms'] = round(row[2], 2) metrics['latencia_min_total_ms'] = round(row[3], 2) metrics['latencia_max_total_ms'] = round(row[4], 2) else: metrics['info'] = "Nenhuma query registrada ainda" # Média de resultados por query cur.execute("SELECT AVG(num_results) FROM query_metrics") result = cur.fetchone() metrics['media_resultados_por_query'] = round(result[0], 2) if result[0] else 0 # Top K mais usado cur.execute(""" SELECT top_k, COUNT(*) as count FROM query_metrics GROUP BY top_k ORDER BY count DESC LIMIT 1 """) result = cur.fetchone() if result: metrics['top_k_mais_usado'] = result[0] metrics['top_k_frequency'] = result[1] return metrics except Exception as e: return {"erro": str(e)} # Função para calcular distribuição de scores def get_score_distribution(): conn = db_manager.connect() if not conn: return " Não foi possível conectar ao banco" try: with conn.cursor() as cur: # Busca scores recentes (precisaríamos salvar isso nas métricas) # Por enquanto, retorna mensagem cur.execute("SELECT COUNT(*) FROM query_metrics") count = cur.fetchone()[0] if count == 0: return " Nenhuma query registrada ainda. Execute algumas queries no Chat RAG para ver a distribuição de scores." return f""" **Distribuição de Scores** Total de queries registradas: {count} _Nota: Para visualização completa de scores, seriam necessários gráficos._ _Scores indicam similaridade entre query e documentos (0.0 a 1.0)_ """ except Exception as e: return f" Erro: {str(e)}" # Função para buscar queries recentes def get_recent_queries(limit): conn = db_manager.connect() if not conn: return [] try: with conn.cursor() as cur: cur.execute(""" SELECT id, query, num_results, total_time_ms, top_k, created_at FROM query_metrics ORDER BY created_at DESC LIMIT %s """, (int(limit),)) rows = cur.fetchall() table_data = [] for row in rows: query_preview = row[1][:50] + "..." if len(row[1]) > 50 else row[1] table_data.append([ row[0], query_preview, row[2], f"{row[3]:.0f}", row[4], str(row[5]) ]) return table_data except Exception as e: return [] # Função para atualizar tudo def refresh_all(limit): gen_stats = get_general_stats() perf_stats = get_query_performance() score_dist = get_score_distribution() recent = get_recent_queries(limit) return gen_stats, perf_stats, score_dist, recent # Conecta eventos refresh_all_btn.click( fn=refresh_all, inputs=[recent_queries_limit], outputs=[general_stats, query_performance, score_distribution, recent_queries_table] ) recent_queries_limit.change( fn=get_recent_queries, inputs=[recent_queries_limit], outputs=[recent_queries_table] ) return { "refresh_btn": refresh_all_btn }