rag_template / ui /monitoring_tab.py
Guilherme Favaron
Initial commit of local project
f5eb34f
"""
Aba de Monitoramento e Métricas
Visualiza estatísticas de uso e performance do sistema
"""
import gradio as gr
from src.database import DatabaseManager
def create_monitoring_tab(db_manager: DatabaseManager):
"""Cria aba de monitoramento e métricas"""
with gr.Tab(" Monitoramento"):
gr.Markdown("""
## Dashboard de Monitoramento
Acompanhe estatísticas de uso e performance:
- **Métricas de Query**: Latências e throughput
- **Uso da Base**: Documentos e chats
- **Performance**: Análise de tempos de resposta
""")
refresh_all_btn = gr.Button(" Atualizar Todas as Métricas", variant="primary", size="lg")
with gr.Row():
with gr.Column():
gr.Markdown("### Estatísticas Gerais")
general_stats = gr.JSON(label="Resumo Geral")
with gr.Column():
gr.Markdown("### Performance de Queries")
query_performance = gr.JSON(label="Métricas de Latência")
with gr.Row():
gr.Markdown("### Distribuição de Scores de Similaridade")
score_distribution = gr.Markdown("Sem dados disponíveis")
with gr.Row():
gr.Markdown("### Últimas Queries")
with gr.Row():
recent_queries_limit = gr.Slider(
minimum=5,
maximum=50,
value=10,
step=5,
label="Quantidade de queries recentes"
)
recent_queries_table = gr.Dataframe(
headers=["ID", "Query", "Resultados", "Tempo Total (ms)", "Top K", "Data"],
label="Histórico de Queries",
wrap=True
)
# Função para buscar estatísticas gerais
def get_general_stats():
conn = db_manager.connect()
if not conn:
return {"erro": "Não foi possível conectar ao banco"}
try:
stats = {}
with conn.cursor() as cur:
# Total de documentos
cur.execute("SELECT COUNT(*) FROM documents")
stats['total_documentos'] = cur.fetchone()[0]
# Total de chats
cur.execute("SELECT COUNT(*) FROM chats")
stats['total_chats'] = cur.fetchone()[0]
# Total de mensagens
cur.execute("SELECT COUNT(*) FROM messages")
stats['total_mensagens'] = cur.fetchone()[0]
# Total de queries registradas
cur.execute("SELECT COUNT(*) FROM query_metrics")
stats['total_queries'] = cur.fetchone()[0]
# Média de mensagens por chat
cur.execute("""
SELECT AVG(msg_count)
FROM (
SELECT COUNT(*) as msg_count
FROM messages
GROUP BY chat_id
) as counts
""")
result = cur.fetchone()
stats['media_mensagens_por_chat'] = round(result[0], 2) if result[0] else 0
return stats
except Exception as e:
return {"erro": str(e)}
# Função para calcular métricas de performance
def get_query_performance():
conn = db_manager.connect()
if not conn:
return {"erro": "Não foi possível conectar ao banco"}
try:
metrics = {}
with conn.cursor() as cur:
# Latências médias
cur.execute("""
SELECT
AVG(retrieval_time_ms) as avg_retrieval,
AVG(generation_time_ms) as avg_generation,
AVG(total_time_ms) as avg_total,
MIN(total_time_ms) as min_total,
MAX(total_time_ms) as max_total
FROM query_metrics
""")
row = cur.fetchone()
if row and row[0] is not None:
metrics['latencia_media_retrieval_ms'] = round(row[0], 2)
metrics['latencia_media_generation_ms'] = round(row[1], 2)
metrics['latencia_media_total_ms'] = round(row[2], 2)
metrics['latencia_min_total_ms'] = round(row[3], 2)
metrics['latencia_max_total_ms'] = round(row[4], 2)
else:
metrics['info'] = "Nenhuma query registrada ainda"
# Média de resultados por query
cur.execute("SELECT AVG(num_results) FROM query_metrics")
result = cur.fetchone()
metrics['media_resultados_por_query'] = round(result[0], 2) if result[0] else 0
# Top K mais usado
cur.execute("""
SELECT top_k, COUNT(*) as count
FROM query_metrics
GROUP BY top_k
ORDER BY count DESC
LIMIT 1
""")
result = cur.fetchone()
if result:
metrics['top_k_mais_usado'] = result[0]
metrics['top_k_frequency'] = result[1]
return metrics
except Exception as e:
return {"erro": str(e)}
# Função para calcular distribuição de scores
def get_score_distribution():
conn = db_manager.connect()
if not conn:
return " Não foi possível conectar ao banco"
try:
with conn.cursor() as cur:
# Busca scores recentes (precisaríamos salvar isso nas métricas)
# Por enquanto, retorna mensagem
cur.execute("SELECT COUNT(*) FROM query_metrics")
count = cur.fetchone()[0]
if count == 0:
return " Nenhuma query registrada ainda. Execute algumas queries no Chat RAG para ver a distribuição de scores."
return f"""
**Distribuição de Scores**
Total de queries registradas: {count}
_Nota: Para visualização completa de scores, seriam necessários gráficos._
_Scores indicam similaridade entre query e documentos (0.0 a 1.0)_
"""
except Exception as e:
return f" Erro: {str(e)}"
# Função para buscar queries recentes
def get_recent_queries(limit):
conn = db_manager.connect()
if not conn:
return []
try:
with conn.cursor() as cur:
cur.execute("""
SELECT
id,
query,
num_results,
total_time_ms,
top_k,
created_at
FROM query_metrics
ORDER BY created_at DESC
LIMIT %s
""", (int(limit),))
rows = cur.fetchall()
table_data = []
for row in rows:
query_preview = row[1][:50] + "..." if len(row[1]) > 50 else row[1]
table_data.append([
row[0],
query_preview,
row[2],
f"{row[3]:.0f}",
row[4],
str(row[5])
])
return table_data
except Exception as e:
return []
# Função para atualizar tudo
def refresh_all(limit):
gen_stats = get_general_stats()
perf_stats = get_query_performance()
score_dist = get_score_distribution()
recent = get_recent_queries(limit)
return gen_stats, perf_stats, score_dist, recent
# Conecta eventos
refresh_all_btn.click(
fn=refresh_all,
inputs=[recent_queries_limit],
outputs=[general_stats, query_performance, score_distribution, recent_queries_table]
)
recent_queries_limit.change(
fn=get_recent_queries,
inputs=[recent_queries_limit],
outputs=[recent_queries_table]
)
return {
"refresh_btn": refresh_all_btn
}