"""Módulo de conexão e operações com o banco de dados PostgreSQL""" import psycopg2 from psycopg2.extras import RealDictCursor import uuid from datetime import datetime from contextlib import contextmanager import streamlit as st import os import re def extrair_credenciais(): """Extrai as credenciais do secret db_checklist""" # Primeiro tenta o secret db_checklist db_checklist = os.getenv('db_checklist', '') if db_checklist: # Extrai as credenciais do formato usado no secret credenciais = {} # Padrões para extrair cada valor do formato específico padroes = { 'host': r'Db_host:\s*([\d\.]+)', 'port': r'Porta:\s*(\d+)', 'user': r'Username:\s*(\w+)', 'database': r'Database:\s*(\w+)', 'password': r'Password:\s*(\w+)' } for key, padrao in padroes.items(): match = re.search(padrao, db_checklist) if match: if key == 'port': credenciais[key] = int(match.group(1)) else: credenciais[key] = match.group(1) return credenciais # Fallback para variáveis individuais credenciais = { 'host': os.getenv('DB_HOST'), 'port': int(os.getenv('DB_PORT', '5432')), 'database': os.getenv('DB_NAME'), 'user': os.getenv('DB_USER'), 'password': os.getenv('DB_PASSWORD') } # Fallback para valores hardcoded quando variáveis não estão definidas if not credenciais['host']: credenciais.update({ 'host': '77.37.43.160', 'port': 5432, 'database': 'checklist', 'user': 'abimael', 'password': 'ctweek' }) return credenciais def criar_conexao(): """Cria uma nova conexão com o banco de dados""" try: creds = extrair_credenciais() # Debug print(f"Conectando com: host={creds.get('host')}, port={creds.get('port')} (tipo: {type(creds.get('port'))}), database={creds.get('database')}, user={creds.get('user')}") # Garante que a porta seja um inteiro porta = creds.get('port') if isinstance(porta, str): porta = int(porta) conn = psycopg2.connect( host=creds.get('host'), port=porta, database=creds.get('database'), user=creds.get('user'), password=creds.get('password') ) print("✅ Conexão estabelecida com sucesso!") return conn except Exception as e: print(f"❌ Erro ao conectar com PostgreSQL: {e}") print(f"Tipo do erro: {type(e).__name__}") return None @contextmanager def get_db_connection(): """Context manager para conexão com o banco de dados""" conn = None try: conn = criar_conexao() if not conn: raise Exception("Não foi possível estabelecer conexão com o banco") yield conn except Exception as e: if conn: conn.rollback() raise e finally: if conn: conn.close() def create_tables(): """Cria as tabelas do banco se não existirem""" # SQL inline para evitar dependência de arquivo externo sql_script = """ -- Criar tabela de checklists CREATE TABLE IF NOT EXISTS checklists ( id VARCHAR(36) PRIMARY KEY, name VARCHAR(255) NOT NULL, numero_processo VARCHAR(100), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Criar tabela de itens do checklist CREATE TABLE IF NOT EXISTS checklist_items ( id VARCHAR(36) PRIMARY KEY, checklist_id VARCHAR(36) REFERENCES checklists(id) ON DELETE CASCADE, text TEXT NOT NULL, position INTEGER NOT NULL ); -- Criar tabela de interações CREATE TABLE IF NOT EXISTS item_interactions ( id SERIAL PRIMARY KEY, item_id VARCHAR(36) REFERENCES checklist_items(id) ON DELETE CASCADE, checklist_id VARCHAR(36) REFERENCES checklists(id) ON DELETE CASCADE, action VARCHAR(20) NOT NULL, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Criar view para estados atuais dos itens CREATE OR REPLACE VIEW current_item_states AS WITH latest_interactions AS ( SELECT item_id, action, timestamp, ROW_NUMBER() OVER (PARTITION BY item_id ORDER BY timestamp DESC) as rn FROM item_interactions ) SELECT item_id, CASE WHEN action = 'checked' THEN true ELSE false END as is_checked FROM latest_interactions WHERE rn = 1; -- Criar view para análise de tempo por item CREATE OR REPLACE VIEW item_time_analysis AS WITH item_sessions AS ( SELECT ii.item_id, ci.text as item_text, ci.position, ci.checklist_id, ii.timestamp, ii.action, LAG(ii.timestamp) OVER ( PARTITION BY ii.item_id ORDER BY ii.timestamp ) as prev_timestamp, LAG(ii.action) OVER ( PARTITION BY ii.item_id ORDER BY ii.timestamp ) as prev_action FROM item_interactions ii JOIN checklist_items ci ON ii.item_id = ci.id ORDER BY ii.item_id, ii.timestamp ), work_sessions AS ( SELECT item_id, item_text, position, checklist_id, CASE WHEN prev_action = 'unchecked' AND action = 'checked' AND prev_timestamp IS NOT NULL THEN EXTRACT(EPOCH FROM (timestamp - prev_timestamp)) ELSE 0 END as session_seconds FROM item_sessions WHERE prev_timestamp IS NOT NULL ) SELECT item_id, item_text, position, checklist_id, COUNT(CASE WHEN session_seconds > 0 THEN 1 END) as times_worked, COALESCE(SUM(session_seconds), 0) as total_seconds_spent, CASE WHEN COUNT(CASE WHEN session_seconds > 0 THEN 1 END) > 0 THEN COALESCE(SUM(session_seconds), 0) / COUNT(CASE WHEN session_seconds > 0 THEN 1 END) ELSE 0 END as avg_seconds_per_completion FROM work_sessions GROUP BY item_id, item_text, position, checklist_id; """ with get_db_connection() as conn: with conn.cursor() as cur: cur.execute(sql_script) conn.commit() # Tabelas criadas com sucesso - removido print para compatibilidade Streamlit def save_checklist(name, items, numero_processo=None): """Salva um novo checklist no banco de dados""" checklist_id = str(uuid.uuid4()) with get_db_connection() as conn: with conn.cursor() as cur: # Inserir checklist cur.execute(""" INSERT INTO checklists (id, name, numero_processo) VALUES (%s, %s, %s) """, (checklist_id, name, numero_processo)) # Inserir items for position, item_text in enumerate(items): if item_text.strip(): item_id = str(uuid.uuid4()) cur.execute(""" INSERT INTO checklist_items (id, checklist_id, text, position) VALUES (%s, %s, %s, %s) """, (item_id, checklist_id, item_text.strip(), position)) conn.commit() return checklist_id def get_all_checklists(): """Retorna todos os checklists do banco""" with get_db_connection() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" SELECT id, name, numero_processo, created_at FROM checklists ORDER BY created_at DESC """) return cur.fetchall() def get_checklist_with_items(checklist_id): """Retorna um checklist com seus itens e estados atuais""" with get_db_connection() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: # Buscar checklist cur.execute(""" SELECT id, name, numero_processo, created_at FROM checklists WHERE id = %s """, (checklist_id,)) checklist = cur.fetchone() if not checklist: return None # Buscar items com estado atual cur.execute(""" SELECT ci.id, ci.text, ci.position, COALESCE(cis.is_checked, false) as is_checked FROM checklist_items ci LEFT JOIN current_item_states cis ON ci.id = cis.item_id WHERE ci.checklist_id = %s ORDER BY ci.position """, (checklist_id,)) checklist['items'] = cur.fetchall() return checklist def toggle_item(item_id, checklist_id, new_state): """Registra a mudança de estado de um item""" action = 'checked' if new_state else 'unchecked' with get_db_connection() as conn: with conn.cursor() as cur: cur.execute(""" INSERT INTO item_interactions (item_id, checklist_id, action) VALUES (%s, %s, %s) """, (item_id, checklist_id, action)) conn.commit() def get_checklist_analytics(checklist_id): """Retorna análises de tempo do checklist""" with get_db_connection() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: # Estatísticas gerais cur.execute(""" SELECT COUNT(DISTINCT ci.id) as total_items, COUNT(DISTINCT CASE WHEN cis.is_checked THEN ci.id END) as completed_items, MIN(ii.timestamp) as first_interaction, MAX(ii.timestamp) as last_interaction FROM checklist_items ci LEFT JOIN current_item_states cis ON ci.id = cis.item_id LEFT JOIN item_interactions ii ON ci.id = ii.item_id WHERE ci.checklist_id = %s """, (checklist_id,)) stats = cur.fetchone() # Análise de tempo por item cur.execute(""" SELECT item_text, position, times_worked, total_seconds_spent, avg_seconds_per_completion FROM item_time_analysis WHERE checklist_id = %s ORDER BY position """, (checklist_id,)) time_analysis = cur.fetchall() return { 'stats': stats, 'time_analysis': time_analysis } def delete_checklist(checklist_id): """Deleta um checklist e todos os seus dados relacionados""" with get_db_connection() as conn: with conn.cursor() as cur: cur.execute("DELETE FROM checklists WHERE id = %s", (checklist_id,)) conn.commit() def get_all_checklists_with_stats(): """Retorna todos os checklists com estatísticas de progresso""" with get_db_connection() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" SELECT c.id, c.name, c.numero_processo, c.created_at, COUNT(ci.id) as total_items, COUNT(CASE WHEN cis.is_checked THEN 1 END) as completed_items, CASE WHEN COUNT(ci.id) > 0 THEN ROUND((COUNT(CASE WHEN cis.is_checked THEN 1 END)::decimal / COUNT(ci.id)) * 100, 1) ELSE 0 END as progress_percentage FROM checklists c LEFT JOIN checklist_items ci ON c.id = ci.checklist_id LEFT JOIN current_item_states cis ON ci.id = cis.item_id GROUP BY c.id, c.name, c.numero_processo, c.created_at ORDER BY c.created_at DESC """) return cur.fetchall() def get_general_stats(): """Retorna estatísticas gerais do sistema""" with get_db_connection() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" WITH checklist_progress AS ( SELECT c.id, c.numero_processo, c.created_at, COUNT(ci.id) as total_items, COUNT(CASE WHEN cis.is_checked THEN 1 END) as completed_items, CASE WHEN COUNT(ci.id) > 0 THEN ROUND((COUNT(CASE WHEN cis.is_checked THEN 1 END)::decimal / COUNT(ci.id)) * 100, 1) ELSE 0 END as progress_percentage FROM checklists c LEFT JOIN checklist_items ci ON c.id = ci.checklist_id LEFT JOIN current_item_states cis ON ci.id = cis.item_id GROUP BY c.id, c.numero_processo, c.created_at ) SELECT COUNT(*) as total_checklists, COUNT(DISTINCT numero_processo) as total_processos, SUM(total_items) as total_items, SUM(completed_items) as completed_items, COUNT(CASE WHEN progress_percentage = 100 THEN 1 END) as completed_checklists, MIN(created_at) as first_checklist_date, MAX(created_at) as last_checklist_date, ROUND(AVG(progress_percentage), 1) as avg_progress FROM checklist_progress """) return cur.fetchone() def get_checklist_interactions_summary(): """Retorna resumo de interações por checklist""" with get_db_connection() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" SELECT c.id, c.name, c.numero_processo, c.created_at, COUNT(ii.id) as total_interactions, COUNT(DISTINCT ii.item_id) as items_with_interactions, MIN(ii.timestamp) as first_interaction, MAX(ii.timestamp) as last_interaction FROM checklists c LEFT JOIN item_interactions ii ON c.id = ii.checklist_id GROUP BY c.id, c.name, c.numero_processo, c.created_at HAVING COUNT(ii.id) > 0 ORDER BY c.created_at DESC """) return cur.fetchall() def get_process_summary(): """Retorna resumo por número de processo""" with get_db_connection() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" WITH checklist_progress AS ( SELECT c.id, c.numero_processo, COUNT(ci.id) as total_items, COUNT(CASE WHEN cis.is_checked THEN 1 END) as completed_items, CASE WHEN COUNT(ci.id) > 0 THEN ROUND((COUNT(CASE WHEN cis.is_checked THEN 1 END)::decimal / COUNT(ci.id)) * 100, 1) ELSE 0 END as progress_percentage FROM checklists c LEFT JOIN checklist_items ci ON c.id = ci.checklist_id LEFT JOIN current_item_states cis ON ci.id = cis.item_id WHERE c.numero_processo IS NOT NULL GROUP BY c.id, c.numero_processo ) SELECT numero_processo, COUNT(*) as total_checklists, SUM(total_items) as total_items, SUM(completed_items) as completed_items, ROUND(AVG(progress_percentage), 1) as avg_progress FROM checklist_progress GROUP BY numero_processo ORDER BY avg_progress DESC """) return cur.fetchall() def get_fastest_processes(): """Retorna os processos mais rápidos baseado no tempo médio de conclusão""" with get_db_connection() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" WITH checklist_completion_times AS ( SELECT c.id, c.numero_processo, c.name, c.created_at, MIN(ii.timestamp) as first_interaction, MAX(ii.timestamp) as last_interaction, COUNT(DISTINCT ci.id) as total_items, COUNT(DISTINCT CASE WHEN cis.is_checked THEN ci.id END) as completed_items, CASE WHEN COUNT(ci.id) > 0 THEN ROUND((COUNT(CASE WHEN cis.is_checked THEN 1 END)::decimal / COUNT(ci.id)) * 100, 1) ELSE 0 END as progress_percentage FROM checklists c LEFT JOIN checklist_items ci ON c.id = ci.checklist_id LEFT JOIN current_item_states cis ON ci.id = cis.item_id LEFT JOIN item_interactions ii ON c.id = ii.checklist_id WHERE c.numero_processo IS NOT NULL GROUP BY c.id, c.numero_processo, c.name, c.created_at ), process_times AS ( SELECT numero_processo, COUNT(*) as total_checklists, COUNT(CASE WHEN progress_percentage = 100 THEN 1 END) as completed_checklists, AVG(progress_percentage) as avg_progress, AVG( CASE WHEN first_interaction IS NOT NULL AND last_interaction IS NOT NULL THEN EXTRACT(EPOCH FROM (last_interaction - first_interaction)) / 3600.0 -- horas ELSE NULL END ) as avg_completion_hours, AVG( CASE WHEN first_interaction IS NOT NULL THEN EXTRACT(EPOCH FROM (first_interaction - created_at)) / 3600.0 -- horas até primeira interação ELSE NULL END ) as avg_start_hours FROM checklist_completion_times GROUP BY numero_processo ) SELECT numero_processo, total_checklists, completed_checklists, ROUND(avg_progress::numeric, 1) as avg_progress, ROUND(avg_completion_hours::numeric, 2) as avg_completion_hours, ROUND(avg_start_hours::numeric, 2) as avg_start_hours, ROUND((avg_completion_hours + avg_start_hours)::numeric, 2) as total_avg_time_hours FROM process_times WHERE avg_completion_hours IS NOT NULL OR avg_start_hours IS NOT NULL ORDER BY total_avg_time_hours ASC NULLS LAST LIMIT 10 """) return cur.fetchall() def get_process_deadline_analysis(): """Analisa prazos dos processos com deadline de 6 meses""" with get_db_connection() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" WITH process_progress AS ( SELECT c.numero_processo, c.created_at, COUNT(ci.id) as total_items, COUNT(CASE WHEN cis.is_checked THEN 1 END) as completed_items, CASE WHEN COUNT(ci.id) > 0 THEN ROUND((COUNT(CASE WHEN cis.is_checked THEN 1 END)::decimal / COUNT(ci.id)) * 100, 1) ELSE 0 END as progress_percentage, MIN(ii.timestamp) as first_interaction, MAX(ii.timestamp) as last_interaction, COUNT(DISTINCT c.id) as total_checklists FROM checklists c LEFT JOIN checklist_items ci ON c.id = ci.checklist_id LEFT JOIN current_item_states cis ON ci.id = cis.item_id LEFT JOIN item_interactions ii ON c.id = ii.checklist_id WHERE c.numero_processo IS NOT NULL GROUP BY c.numero_processo, c.created_at ), process_summary AS ( SELECT numero_processo, MIN(created_at) as process_start_date, MAX(created_at) as latest_checklist_date, SUM(total_items) as total_items, SUM(completed_items) as completed_items, ROUND(AVG(progress_percentage), 1) as avg_progress, MIN(first_interaction) as first_interaction, MAX(last_interaction) as last_interaction, SUM(total_checklists) as total_checklists FROM process_progress GROUP BY numero_processo ) SELECT numero_processo, process_start_date, latest_checklist_date, total_items, completed_items, avg_progress, first_interaction, last_interaction, total_checklists, -- Cálculos de prazo (6 meses = 180 dias) (process_start_date + INTERVAL '180 days')::date as deadline_date, CURRENT_DATE - process_start_date::date as days_elapsed, (process_start_date + INTERVAL '180 days')::date - CURRENT_DATE as days_remaining, -- Velocidade e projeção CASE WHEN (CURRENT_DATE - process_start_date::date) > 0 AND avg_progress > 0 THEN ROUND((avg_progress / (CURRENT_DATE - process_start_date::date)) * (100 - avg_progress), 0) ELSE NULL END as projected_days_to_complete, -- Status do prazo CASE WHEN avg_progress = 100 THEN 'CONCLUIDO' WHEN (CURRENT_DATE - process_start_date::date) > 180 THEN 'ATRASADO' WHEN (CURRENT_DATE - process_start_date::date) > 0 AND avg_progress > 0 THEN CASE WHEN ((avg_progress / (CURRENT_DATE - process_start_date::date)) * (100 - avg_progress) + (CURRENT_DATE - process_start_date::date)) > 180 THEN 'RISCO_ATRASO' WHEN ((avg_progress / (CURRENT_DATE - process_start_date::date)) * (100 - avg_progress) + (CURRENT_DATE - process_start_date::date)) > 150 THEN 'EM_RISCO' ELSE 'NO_PRAZO' END ELSE 'SEM_DADOS' END as status_prazo FROM process_summary ORDER BY days_remaining ASC NULLS LAST """) return cur.fetchall() def get_comprehensive_analysis_data(): """Retorna dados completos para análise com IA""" with get_db_connection() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: # Primeira query: dados básicos dos processos cur.execute(""" SELECT c.numero_processo, COUNT(DISTINCT c.id) as total_checklists, MIN(c.created_at) as process_start_date, MAX(c.created_at) as latest_checklist_date, COUNT(ci.id) as total_items, COUNT(CASE WHEN cis.is_checked THEN 1 END) as completed_items, CASE WHEN COUNT(ci.id) > 0 THEN ROUND((COUNT(CASE WHEN cis.is_checked THEN 1 END)::decimal / COUNT(ci.id)) * 100, 1) ELSE 0 END as avg_progress_percentage, CURRENT_DATE - MIN(c.created_at)::date as days_elapsed, (MIN(c.created_at) + INTERVAL '180 days')::date - CURRENT_DATE as days_remaining_to_deadline FROM checklists c LEFT JOIN checklist_items ci ON c.id = ci.checklist_id LEFT JOIN current_item_states cis ON ci.id = cis.item_id WHERE c.numero_processo IS NOT NULL GROUP BY c.numero_processo ORDER BY days_remaining_to_deadline ASC """) basic_process_data = cur.fetchall() # Segunda query: dados de interações por processo process_data = [] for process in basic_process_data: cur.execute(""" SELECT COUNT(ii.id) as total_interactions, MIN(ii.timestamp) as first_interaction, MAX(ii.timestamp) as last_interaction FROM checklists c LEFT JOIN item_interactions ii ON c.id = ii.checklist_id WHERE c.numero_processo = %s """, (process['numero_processo'],)) interaction_data = cur.fetchone() # Combinar dados combined_process = dict(process) combined_process.update(interaction_data) # Adicionar campos calculados combined_process['status_prazo'] = ( 'CONCLUIDO' if combined_process['avg_progress_percentage'] == 100 else 'ATRASADO' if combined_process['days_remaining_to_deadline'] < 0 else 'EM_RISCO' if combined_process['days_remaining_to_deadline'] < 30 else 'NO_PRAZO' ) if combined_process['days_elapsed'] > 0 and combined_process['avg_progress_percentage'] > 0: combined_process['projected_days_to_complete'] = round( (combined_process['avg_progress_percentage'] / combined_process['days_elapsed']) * (100 - combined_process['avg_progress_percentage']) ) else: combined_process['projected_days_to_complete'] = None process_data.append(combined_process) # Estatísticas globais - query simples cur.execute(""" SELECT COUNT(DISTINCT c.id) as total_checklists_global, COUNT(DISTINCT c.numero_processo) as total_processes, COUNT(ci.id) as total_items_global, COUNT(CASE WHEN cis.is_checked THEN 1 END) as completed_items_global, COUNT(ii.id) as total_interactions_global, MIN(c.created_at) as earliest_process, MAX(c.created_at) as latest_process FROM checklists c LEFT JOIN checklist_items ci ON c.id = ci.checklist_id LEFT JOIN current_item_states cis ON ci.id = cis.item_id LEFT JOIN item_interactions ii ON c.id = ii.checklist_id """) global_stats = cur.fetchone() return { 'process_data': process_data, 'global_stats': global_stats } # Função para testar conexão def test_connection(): """Testa a conexão com o banco de dados""" try: conn = criar_conexao() if conn: conn.close() return True return False except Exception as e: print(f"❌ Erro no teste de conexão: {e}") return False