checklist / utils /database.py
Abimael Torcate
Fix secret parsing to match Hugging Face Space configuration
32be338
"""Módulo de conexão e operações com o banco de dados PostgreSQL"""
import psycopg2
from psycopg2.extras import RealDictCursor
import uuid
from datetime import datetime
from contextlib import contextmanager
import streamlit as st
import os
import re
def extrair_credenciais():
"""Extrai as credenciais do secret db_checklist"""
# Primeiro tenta o secret db_checklist
db_checklist = os.getenv('db_checklist', '')
if db_checklist:
# Extrai as credenciais do formato usado no secret
credenciais = {}
# Padrões para extrair cada valor do formato específico
padroes = {
'host': r'Db_host:\s*([\d\.]+)',
'port': r'Porta:\s*(\d+)',
'user': r'Username:\s*(\w+)',
'database': r'Database:\s*(\w+)',
'password': r'Password:\s*(\w+)'
}
for key, padrao in padroes.items():
match = re.search(padrao, db_checklist)
if match:
if key == 'port':
credenciais[key] = int(match.group(1))
else:
credenciais[key] = match.group(1)
return credenciais
# Fallback para variáveis individuais
credenciais = {
'host': os.getenv('DB_HOST'),
'port': int(os.getenv('DB_PORT', '5432')),
'database': os.getenv('DB_NAME'),
'user': os.getenv('DB_USER'),
'password': os.getenv('DB_PASSWORD')
}
# Fallback para valores hardcoded quando variáveis não estão definidas
if not credenciais['host']:
credenciais.update({
'host': '77.37.43.160',
'port': 5432,
'database': 'checklist',
'user': 'abimael',
'password': 'ctweek'
})
return credenciais
def criar_conexao():
"""Cria uma nova conexão com o banco de dados"""
try:
creds = extrair_credenciais()
# Debug
print(f"Conectando com: host={creds.get('host')}, port={creds.get('port')} (tipo: {type(creds.get('port'))}), database={creds.get('database')}, user={creds.get('user')}")
# Garante que a porta seja um inteiro
porta = creds.get('port')
if isinstance(porta, str):
porta = int(porta)
conn = psycopg2.connect(
host=creds.get('host'),
port=porta,
database=creds.get('database'),
user=creds.get('user'),
password=creds.get('password')
)
print("✅ Conexão estabelecida com sucesso!")
return conn
except Exception as e:
print(f"❌ Erro ao conectar com PostgreSQL: {e}")
print(f"Tipo do erro: {type(e).__name__}")
return None
@contextmanager
def get_db_connection():
"""Context manager para conexão com o banco de dados"""
conn = None
try:
conn = criar_conexao()
if not conn:
raise Exception("Não foi possível estabelecer conexão com o banco")
yield conn
except Exception as e:
if conn:
conn.rollback()
raise e
finally:
if conn:
conn.close()
def create_tables():
"""Cria as tabelas do banco se não existirem"""
# SQL inline para evitar dependência de arquivo externo
sql_script = """
-- Criar tabela de checklists
CREATE TABLE IF NOT EXISTS checklists (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
numero_processo VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Criar tabela de itens do checklist
CREATE TABLE IF NOT EXISTS checklist_items (
id VARCHAR(36) PRIMARY KEY,
checklist_id VARCHAR(36) REFERENCES checklists(id) ON DELETE CASCADE,
text TEXT NOT NULL,
position INTEGER NOT NULL
);
-- Criar tabela de interações
CREATE TABLE IF NOT EXISTS item_interactions (
id SERIAL PRIMARY KEY,
item_id VARCHAR(36) REFERENCES checklist_items(id) ON DELETE CASCADE,
checklist_id VARCHAR(36) REFERENCES checklists(id) ON DELETE CASCADE,
action VARCHAR(20) NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Criar view para estados atuais dos itens
CREATE OR REPLACE VIEW current_item_states AS
WITH latest_interactions AS (
SELECT
item_id,
action,
timestamp,
ROW_NUMBER() OVER (PARTITION BY item_id ORDER BY timestamp DESC) as rn
FROM item_interactions
)
SELECT
item_id,
CASE WHEN action = 'checked' THEN true ELSE false END as is_checked
FROM latest_interactions
WHERE rn = 1;
-- Criar view para análise de tempo por item
CREATE OR REPLACE VIEW item_time_analysis AS
WITH item_sessions AS (
SELECT
ii.item_id,
ci.text as item_text,
ci.position,
ci.checklist_id,
ii.timestamp,
ii.action,
LAG(ii.timestamp) OVER (
PARTITION BY ii.item_id
ORDER BY ii.timestamp
) as prev_timestamp,
LAG(ii.action) OVER (
PARTITION BY ii.item_id
ORDER BY ii.timestamp
) as prev_action
FROM item_interactions ii
JOIN checklist_items ci ON ii.item_id = ci.id
ORDER BY ii.item_id, ii.timestamp
),
work_sessions AS (
SELECT
item_id,
item_text,
position,
checklist_id,
CASE
WHEN prev_action = 'unchecked' AND action = 'checked' AND prev_timestamp IS NOT NULL
THEN EXTRACT(EPOCH FROM (timestamp - prev_timestamp))
ELSE 0
END as session_seconds
FROM item_sessions
WHERE prev_timestamp IS NOT NULL
)
SELECT
item_id,
item_text,
position,
checklist_id,
COUNT(CASE WHEN session_seconds > 0 THEN 1 END) as times_worked,
COALESCE(SUM(session_seconds), 0) as total_seconds_spent,
CASE
WHEN COUNT(CASE WHEN session_seconds > 0 THEN 1 END) > 0
THEN COALESCE(SUM(session_seconds), 0) / COUNT(CASE WHEN session_seconds > 0 THEN 1 END)
ELSE 0
END as avg_seconds_per_completion
FROM work_sessions
GROUP BY item_id, item_text, position, checklist_id;
"""
with get_db_connection() as conn:
with conn.cursor() as cur:
cur.execute(sql_script)
conn.commit()
# Tabelas criadas com sucesso - removido print para compatibilidade Streamlit
def save_checklist(name, items, numero_processo=None):
"""Salva um novo checklist no banco de dados"""
checklist_id = str(uuid.uuid4())
with get_db_connection() as conn:
with conn.cursor() as cur:
# Inserir checklist
cur.execute("""
INSERT INTO checklists (id, name, numero_processo)
VALUES (%s, %s, %s)
""", (checklist_id, name, numero_processo))
# Inserir items
for position, item_text in enumerate(items):
if item_text.strip():
item_id = str(uuid.uuid4())
cur.execute("""
INSERT INTO checklist_items (id, checklist_id, text, position)
VALUES (%s, %s, %s, %s)
""", (item_id, checklist_id, item_text.strip(), position))
conn.commit()
return checklist_id
def get_all_checklists():
"""Retorna todos os checklists do banco"""
with get_db_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT id, name, numero_processo, created_at
FROM checklists
ORDER BY created_at DESC
""")
return cur.fetchall()
def get_checklist_with_items(checklist_id):
"""Retorna um checklist com seus itens e estados atuais"""
with get_db_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
# Buscar checklist
cur.execute("""
SELECT id, name, numero_processo, created_at
FROM checklists
WHERE id = %s
""", (checklist_id,))
checklist = cur.fetchone()
if not checklist:
return None
# Buscar items com estado atual
cur.execute("""
SELECT
ci.id,
ci.text,
ci.position,
COALESCE(cis.is_checked, false) as is_checked
FROM checklist_items ci
LEFT JOIN current_item_states cis ON ci.id = cis.item_id
WHERE ci.checklist_id = %s
ORDER BY ci.position
""", (checklist_id,))
checklist['items'] = cur.fetchall()
return checklist
def toggle_item(item_id, checklist_id, new_state):
"""Registra a mudança de estado de um item"""
action = 'checked' if new_state else 'unchecked'
with get_db_connection() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO item_interactions (item_id, checklist_id, action)
VALUES (%s, %s, %s)
""", (item_id, checklist_id, action))
conn.commit()
def get_checklist_analytics(checklist_id):
"""Retorna análises de tempo do checklist"""
with get_db_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
# Estatísticas gerais
cur.execute("""
SELECT
COUNT(DISTINCT ci.id) as total_items,
COUNT(DISTINCT CASE WHEN cis.is_checked THEN ci.id END) as completed_items,
MIN(ii.timestamp) as first_interaction,
MAX(ii.timestamp) as last_interaction
FROM checklist_items ci
LEFT JOIN current_item_states cis ON ci.id = cis.item_id
LEFT JOIN item_interactions ii ON ci.id = ii.item_id
WHERE ci.checklist_id = %s
""", (checklist_id,))
stats = cur.fetchone()
# Análise de tempo por item
cur.execute("""
SELECT
item_text,
position,
times_worked,
total_seconds_spent,
avg_seconds_per_completion
FROM item_time_analysis
WHERE checklist_id = %s
ORDER BY position
""", (checklist_id,))
time_analysis = cur.fetchall()
return {
'stats': stats,
'time_analysis': time_analysis
}
def delete_checklist(checklist_id):
"""Deleta um checklist e todos os seus dados relacionados"""
with get_db_connection() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM checklists WHERE id = %s", (checklist_id,))
conn.commit()
def get_all_checklists_with_stats():
"""Retorna todos os checklists com estatísticas de progresso"""
with get_db_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT
c.id,
c.name,
c.numero_processo,
c.created_at,
COUNT(ci.id) as total_items,
COUNT(CASE WHEN cis.is_checked THEN 1 END) as completed_items,
CASE
WHEN COUNT(ci.id) > 0 THEN
ROUND((COUNT(CASE WHEN cis.is_checked THEN 1 END)::decimal / COUNT(ci.id)) * 100, 1)
ELSE 0
END as progress_percentage
FROM checklists c
LEFT JOIN checklist_items ci ON c.id = ci.checklist_id
LEFT JOIN current_item_states cis ON ci.id = cis.item_id
GROUP BY c.id, c.name, c.numero_processo, c.created_at
ORDER BY c.created_at DESC
""")
return cur.fetchall()
def get_general_stats():
"""Retorna estatísticas gerais do sistema"""
with get_db_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
WITH checklist_progress AS (
SELECT
c.id,
c.numero_processo,
c.created_at,
COUNT(ci.id) as total_items,
COUNT(CASE WHEN cis.is_checked THEN 1 END) as completed_items,
CASE
WHEN COUNT(ci.id) > 0 THEN
ROUND((COUNT(CASE WHEN cis.is_checked THEN 1 END)::decimal / COUNT(ci.id)) * 100, 1)
ELSE 0
END as progress_percentage
FROM checklists c
LEFT JOIN checklist_items ci ON c.id = ci.checklist_id
LEFT JOIN current_item_states cis ON ci.id = cis.item_id
GROUP BY c.id, c.numero_processo, c.created_at
)
SELECT
COUNT(*) as total_checklists,
COUNT(DISTINCT numero_processo) as total_processos,
SUM(total_items) as total_items,
SUM(completed_items) as completed_items,
COUNT(CASE WHEN progress_percentage = 100 THEN 1 END) as completed_checklists,
MIN(created_at) as first_checklist_date,
MAX(created_at) as last_checklist_date,
ROUND(AVG(progress_percentage), 1) as avg_progress
FROM checklist_progress
""")
return cur.fetchone()
def get_checklist_interactions_summary():
"""Retorna resumo de interações por checklist"""
with get_db_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT
c.id,
c.name,
c.numero_processo,
c.created_at,
COUNT(ii.id) as total_interactions,
COUNT(DISTINCT ii.item_id) as items_with_interactions,
MIN(ii.timestamp) as first_interaction,
MAX(ii.timestamp) as last_interaction
FROM checklists c
LEFT JOIN item_interactions ii ON c.id = ii.checklist_id
GROUP BY c.id, c.name, c.numero_processo, c.created_at
HAVING COUNT(ii.id) > 0
ORDER BY c.created_at DESC
""")
return cur.fetchall()
def get_process_summary():
"""Retorna resumo por número de processo"""
with get_db_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
WITH checklist_progress AS (
SELECT
c.id,
c.numero_processo,
COUNT(ci.id) as total_items,
COUNT(CASE WHEN cis.is_checked THEN 1 END) as completed_items,
CASE
WHEN COUNT(ci.id) > 0 THEN
ROUND((COUNT(CASE WHEN cis.is_checked THEN 1 END)::decimal / COUNT(ci.id)) * 100, 1)
ELSE 0
END as progress_percentage
FROM checklists c
LEFT JOIN checklist_items ci ON c.id = ci.checklist_id
LEFT JOIN current_item_states cis ON ci.id = cis.item_id
WHERE c.numero_processo IS NOT NULL
GROUP BY c.id, c.numero_processo
)
SELECT
numero_processo,
COUNT(*) as total_checklists,
SUM(total_items) as total_items,
SUM(completed_items) as completed_items,
ROUND(AVG(progress_percentage), 1) as avg_progress
FROM checklist_progress
GROUP BY numero_processo
ORDER BY avg_progress DESC
""")
return cur.fetchall()
def get_fastest_processes():
"""Retorna os processos mais rápidos baseado no tempo médio de conclusão"""
with get_db_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
WITH checklist_completion_times AS (
SELECT
c.id,
c.numero_processo,
c.name,
c.created_at,
MIN(ii.timestamp) as first_interaction,
MAX(ii.timestamp) as last_interaction,
COUNT(DISTINCT ci.id) as total_items,
COUNT(DISTINCT CASE WHEN cis.is_checked THEN ci.id END) as completed_items,
CASE
WHEN COUNT(ci.id) > 0 THEN
ROUND((COUNT(CASE WHEN cis.is_checked THEN 1 END)::decimal / COUNT(ci.id)) * 100, 1)
ELSE 0
END as progress_percentage
FROM checklists c
LEFT JOIN checklist_items ci ON c.id = ci.checklist_id
LEFT JOIN current_item_states cis ON ci.id = cis.item_id
LEFT JOIN item_interactions ii ON c.id = ii.checklist_id
WHERE c.numero_processo IS NOT NULL
GROUP BY c.id, c.numero_processo, c.name, c.created_at
),
process_times AS (
SELECT
numero_processo,
COUNT(*) as total_checklists,
COUNT(CASE WHEN progress_percentage = 100 THEN 1 END) as completed_checklists,
AVG(progress_percentage) as avg_progress,
AVG(
CASE
WHEN first_interaction IS NOT NULL AND last_interaction IS NOT NULL
THEN EXTRACT(EPOCH FROM (last_interaction - first_interaction)) / 3600.0 -- horas
ELSE NULL
END
) as avg_completion_hours,
AVG(
CASE
WHEN first_interaction IS NOT NULL
THEN EXTRACT(EPOCH FROM (first_interaction - created_at)) / 3600.0 -- horas até primeira interação
ELSE NULL
END
) as avg_start_hours
FROM checklist_completion_times
GROUP BY numero_processo
)
SELECT
numero_processo,
total_checklists,
completed_checklists,
ROUND(avg_progress::numeric, 1) as avg_progress,
ROUND(avg_completion_hours::numeric, 2) as avg_completion_hours,
ROUND(avg_start_hours::numeric, 2) as avg_start_hours,
ROUND((avg_completion_hours + avg_start_hours)::numeric, 2) as total_avg_time_hours
FROM process_times
WHERE avg_completion_hours IS NOT NULL
OR avg_start_hours IS NOT NULL
ORDER BY total_avg_time_hours ASC NULLS LAST
LIMIT 10
""")
return cur.fetchall()
def get_process_deadline_analysis():
"""Analisa prazos dos processos com deadline de 6 meses"""
with get_db_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
WITH process_progress AS (
SELECT
c.numero_processo,
c.created_at,
COUNT(ci.id) as total_items,
COUNT(CASE WHEN cis.is_checked THEN 1 END) as completed_items,
CASE
WHEN COUNT(ci.id) > 0 THEN
ROUND((COUNT(CASE WHEN cis.is_checked THEN 1 END)::decimal / COUNT(ci.id)) * 100, 1)
ELSE 0
END as progress_percentage,
MIN(ii.timestamp) as first_interaction,
MAX(ii.timestamp) as last_interaction,
COUNT(DISTINCT c.id) as total_checklists
FROM checklists c
LEFT JOIN checklist_items ci ON c.id = ci.checklist_id
LEFT JOIN current_item_states cis ON ci.id = cis.item_id
LEFT JOIN item_interactions ii ON c.id = ii.checklist_id
WHERE c.numero_processo IS NOT NULL
GROUP BY c.numero_processo, c.created_at
),
process_summary AS (
SELECT
numero_processo,
MIN(created_at) as process_start_date,
MAX(created_at) as latest_checklist_date,
SUM(total_items) as total_items,
SUM(completed_items) as completed_items,
ROUND(AVG(progress_percentage), 1) as avg_progress,
MIN(first_interaction) as first_interaction,
MAX(last_interaction) as last_interaction,
SUM(total_checklists) as total_checklists
FROM process_progress
GROUP BY numero_processo
)
SELECT
numero_processo,
process_start_date,
latest_checklist_date,
total_items,
completed_items,
avg_progress,
first_interaction,
last_interaction,
total_checklists,
-- Cálculos de prazo (6 meses = 180 dias)
(process_start_date + INTERVAL '180 days')::date as deadline_date,
CURRENT_DATE - process_start_date::date as days_elapsed,
(process_start_date + INTERVAL '180 days')::date - CURRENT_DATE as days_remaining,
-- Velocidade e projeção
CASE
WHEN (CURRENT_DATE - process_start_date::date) > 0 AND avg_progress > 0 THEN
ROUND((avg_progress / (CURRENT_DATE - process_start_date::date)) *
(100 - avg_progress), 0)
ELSE NULL
END as projected_days_to_complete,
-- Status do prazo
CASE
WHEN avg_progress = 100 THEN 'CONCLUIDO'
WHEN (CURRENT_DATE - process_start_date::date) > 180 THEN 'ATRASADO'
WHEN (CURRENT_DATE - process_start_date::date) > 0 AND avg_progress > 0 THEN
CASE
WHEN ((avg_progress / (CURRENT_DATE - process_start_date::date)) *
(100 - avg_progress) + (CURRENT_DATE - process_start_date::date)) > 180
THEN 'RISCO_ATRASO'
WHEN ((avg_progress / (CURRENT_DATE - process_start_date::date)) *
(100 - avg_progress) + (CURRENT_DATE - process_start_date::date)) > 150
THEN 'EM_RISCO'
ELSE 'NO_PRAZO'
END
ELSE 'SEM_DADOS'
END as status_prazo
FROM process_summary
ORDER BY days_remaining ASC NULLS LAST
""")
return cur.fetchall()
def get_comprehensive_analysis_data():
"""Retorna dados completos para análise com IA"""
with get_db_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
# Primeira query: dados básicos dos processos
cur.execute("""
SELECT
c.numero_processo,
COUNT(DISTINCT c.id) as total_checklists,
MIN(c.created_at) as process_start_date,
MAX(c.created_at) as latest_checklist_date,
COUNT(ci.id) as total_items,
COUNT(CASE WHEN cis.is_checked THEN 1 END) as completed_items,
CASE
WHEN COUNT(ci.id) > 0 THEN
ROUND((COUNT(CASE WHEN cis.is_checked THEN 1 END)::decimal / COUNT(ci.id)) * 100, 1)
ELSE 0
END as avg_progress_percentage,
CURRENT_DATE - MIN(c.created_at)::date as days_elapsed,
(MIN(c.created_at) + INTERVAL '180 days')::date - CURRENT_DATE as days_remaining_to_deadline
FROM checklists c
LEFT JOIN checklist_items ci ON c.id = ci.checklist_id
LEFT JOIN current_item_states cis ON ci.id = cis.item_id
WHERE c.numero_processo IS NOT NULL
GROUP BY c.numero_processo
ORDER BY days_remaining_to_deadline ASC
""")
basic_process_data = cur.fetchall()
# Segunda query: dados de interações por processo
process_data = []
for process in basic_process_data:
cur.execute("""
SELECT
COUNT(ii.id) as total_interactions,
MIN(ii.timestamp) as first_interaction,
MAX(ii.timestamp) as last_interaction
FROM checklists c
LEFT JOIN item_interactions ii ON c.id = ii.checklist_id
WHERE c.numero_processo = %s
""", (process['numero_processo'],))
interaction_data = cur.fetchone()
# Combinar dados
combined_process = dict(process)
combined_process.update(interaction_data)
# Adicionar campos calculados
combined_process['status_prazo'] = (
'CONCLUIDO' if combined_process['avg_progress_percentage'] == 100
else 'ATRASADO' if combined_process['days_remaining_to_deadline'] < 0
else 'EM_RISCO' if combined_process['days_remaining_to_deadline'] < 30
else 'NO_PRAZO'
)
if combined_process['days_elapsed'] > 0 and combined_process['avg_progress_percentage'] > 0:
combined_process['projected_days_to_complete'] = round(
(combined_process['avg_progress_percentage'] / combined_process['days_elapsed']) *
(100 - combined_process['avg_progress_percentage'])
)
else:
combined_process['projected_days_to_complete'] = None
process_data.append(combined_process)
# Estatísticas globais - query simples
cur.execute("""
SELECT
COUNT(DISTINCT c.id) as total_checklists_global,
COUNT(DISTINCT c.numero_processo) as total_processes,
COUNT(ci.id) as total_items_global,
COUNT(CASE WHEN cis.is_checked THEN 1 END) as completed_items_global,
COUNT(ii.id) as total_interactions_global,
MIN(c.created_at) as earliest_process,
MAX(c.created_at) as latest_process
FROM checklists c
LEFT JOIN checklist_items ci ON c.id = ci.checklist_id
LEFT JOIN current_item_states cis ON ci.id = cis.item_id
LEFT JOIN item_interactions ii ON c.id = ii.checklist_id
""")
global_stats = cur.fetchone()
return {
'process_data': process_data,
'global_stats': global_stats
}
# Função para testar conexão
def test_connection():
"""Testa a conexão com o banco de dados"""
try:
conn = criar_conexao()
if conn:
conn.close()
return True
return False
except Exception as e:
print(f"❌ Erro no teste de conexão: {e}")
return False