# -*- coding: utf-8 -*- import streamlit as st from datetime import date, datetime, timedelta from typing import Dict, List, Optional from banco import SessionLocal from models import EventoCalendario from utils_permissoes import verificar_permissao from utils_auditoria import registrar_log from utils_datas import formatar_data_br # Tentativa segura do componente de calendário try: from streamlit_calendar import calendar _HAS_CAL = True _CAL_ERR = None except Exception as _e: _HAS_CAL = False _CAL_ERR = _e # Ambiente atual (opcional) try: from db_router import current_db_choice _HAS_ROUTER = True except Exception: _HAS_ROUTER = False def current_db_choice() -> str: return "prod" # ===================================================== # 📅 CALENDÁRIO + CRONOGRAMA ANUAL (D-3 / D-2 / D-1 / D 🚢) # ===================================================== # ------------------------------ # ⚙️ Regras de embarque (fase/seed e passo) # ------------------------------ REGRAS_FPSO: Dict[str, Dict[str, int]] = { "ATD": {"seed_day": 1, "step": 5}, "ADG": {"seed_day": 1, "step": 5}, "CDM": {"seed_day": 2, "step": 5}, "CDP": {"seed_day": 2, "step": 5}, "CDS": {"seed_day": 2, "step": 5}, "CDI": {"seed_day": 5, "step": 5}, # (ACDI → CDI) "CDA": {"seed_day": 5, "step": 5}, "SEP": {"seed_day": 4, "step": 4}, # sem dia vazio "ESS": {"seed_day": 3, "step": 7}, # blocos com pausa maior } # 🎨 Paleta COLOR_MAP = { "D-3": "#00B050", # verde "D-2": "#FF0000", # vermelho "D-1": "#C00000", # vermelho escuro "D": "#7F7F7F", # cinza } EMOJI_NAVIO = " 🚢" # adicionado aos títulos no dia D # ------------------------------ # Helpers de ambiente e auditoria # ------------------------------ def _usuario_atual() -> str: return (st.session_state.get("usuario") or "sistema") def _audit(acao: str, registro_id: Optional[int] = None): """Chama registrar_log com ambiente quando possível (sem quebrar a UX).""" try: registrar_log( usuario=_usuario_atual(), acao=acao, tabela="eventos_calendario", registro_id=registro_id, ambiente=current_db_choice() if _HAS_ROUTER else "prod", ) except Exception: # Mantém a UX mesmo se auditoria falhar pass def _can_access(mod_key: str = "calendario") -> bool: """ Verifica permissão com assinatura ampla (perfil/usuario/ambiente) e, se necessário, faz fallback para assinatura simples verificar_permissao(mod_key). """ try: return bool( verificar_permissao( perfil=st.session_state.get("perfil", "usuario"), modulo_key=mod_key, usuario=st.session_state.get("usuario"), ambiente=current_db_choice() if _HAS_ROUTER else "prod", ) ) except TypeError: # Fallback assinatura simples try: return bool(verificar_permissao(mod_key)) except Exception: return False except Exception: return False # ------------------------------ # Builders de eventos do cronograma # ------------------------------ def _criar_evento_fc(title: str, dt: date, color: str, extra: Dict = None) -> dict: """Monta um evento no formato FullCalendar/streamlit_calendar.""" ev = { "id": f"auto::{title}::{dt.isoformat()}", "title": title, "start": dt.isoformat(), "allDay": True, "color": color, "extendedProps": {"gerado_auto": True}, } if extra: ev["extendedProps"].update(extra) return ev def _rotulo_antes_de_d(dias: int) -> str: """Converte o deslocamento até D para rótulo: 0->D, 1->D-1, 2->D-2, 3->D-3, outros->''""" if dias == 0: return "D" if dias in (1, 2, 3): return f"D-{dias}" return "" def _gerar_cronograma_ano( ano: int, fpsos_sel: List[str], incluir_anteriores: bool = True, apenas_D: bool = False, ) -> List[dict]: """ Gera eventos 'D-3/D-2/D-1/D 🚢' para TODO o ano. - incluir_anteriores: inclui D-1..D-3 que caem no começo do ano (vindo do D-semente). - apenas_D: se True, somente 'D 🚢'. """ events: List[dict] = [] dt_ini = date(ano, 1, 1) dt_fim = date(ano, 12, 31) for fpso in fpsos_sel: cfg = REGRAS_FPSO.get(fpso) if not cfg: continue seed_day = max(1, min(cfg["seed_day"], 28)) # segurança (fev) seed = date(ano, 1, seed_day) step = int(cfg["step"]) # Todos os D do ano d = seed while d <= dt_fim: if d >= dt_ini: # D (com emoji) + cor titulo_d = f"{fpso} – D{EMOJI_NAVIO}" events.append( _criar_evento_fc( titulo_d, d, COLOR_MAP["D"], {"tipo": "D", "fpso": fpso} ) ) if not apenas_D: # D-1..D-3 for k in (1, 2, 3): dk = d - timedelta(days=k) if dt_ini <= dk <= dt_fim: label = f"D-{k}" events.append( _criar_evento_fc( f"{fpso} – {label}", dk, COLOR_MAP[label], {"tipo": label, "fpso": fpso}, ) ) d += timedelta(days=step) # Cobertura no início do ano (apenas rótulos anteriores ao D-semente) if incluir_anteriores and not apenas_D: for k in (1, 2, 3): dk = seed - timedelta(days=k) if dt_ini <= dk <= dt_fim: label = f"D-{k}" events.append( _criar_evento_fc( f"{fpso} – {label}", dk, COLOR_MAP[label], {"tipo": label, "fpso": fpso}, ) ) return events def _gerar_cronograma_intervalo( ano_ini: int, ano_fim: int, fpsos_sel: List[str], apenas_D: bool = False, ) -> List[dict]: """Gera eventos para [ano_ini..ano_fim].""" out: List[dict] = [] for y in range(ano_ini, ano_fim + 1): out.extend(_gerar_cronograma_ano(y, fpsos_sel, incluir_anteriores=True, apenas_D=apenas_D)) return out def _titulo_normalizado(titulo: str) -> str: """Remove o emoji ' 🚢' apenas para comparação/deduplicação.""" return titulo.replace(EMOJI_NAVIO, "") def _dedup_chave(titulo: str, data_evt: date) -> str: """Chave de de-duplicação (título normalizado + data).""" return f"{_titulo_normalizado(titulo)}::{data_evt.isoformat()}" # ------------------------------ # Persistência no banco # ------------------------------ def _gravar_cronograma_no_banco(db, eventos_fc: List[dict]) -> int: """ Grava no banco eventos 'gerado_auto' evitando duplicados (ignorando emoji). Retorna contagem de inserções. """ if not eventos_fc: return 0 min_day = min(date.fromisoformat(ev["start"][:10]) for ev in eventos_fc) max_day = max(date.fromisoformat(ev["start"][:10]) for ev in eventos_fc) existentes = ( db.query(EventoCalendario) .filter(EventoCalendario.data_evento >= min_day) .filter(EventoCalendario.data_evento <= max_day) .filter(EventoCalendario.ativo.is_(True)) .all() ) idx_existentes = { _dedup_chave(e.titulo, e.data_evento): e.id for e in existentes } ins = 0 for ev in eventos_fc: if not ev.get("extendedProps", {}).get("gerado_auto"): continue titulo = ev["title"] dt = date.fromisoformat(ev["start"][:10]) k = _dedup_chave(titulo, dt) if k in idx_existentes: continue novo = EventoCalendario( titulo=titulo, # mantém o emoji nos D descricao=f"Cronograma automático ({ev['extendedProps'].get('tipo','')})", data_evento=dt, data_lembrete=None, ativo=True, usuario_criacao=_usuario_atual(), data_criacao=datetime.now(), ) db.add(novo) try: db.commit() ins += 1 except Exception: db.rollback() return ins def _remover_cronograma_do_banco_intervalo(db, fpsos_sel: List[str], ano_ini: int, ano_fim: int) -> int: """ Remove do banco os eventos gerados por este módulo, para [ano_ini..ano_fim] e FPSOs. Busca por títulos (' – D' / ' – D 🚢' / ' – D-1/2/3') e data no intervalo. """ ini = date(ano_ini, 1, 1) fim = date(ano_fim, 12, 31) total = 0 for fpso in fpsos_sel: base = [f"{fpso} – D", f"{fpso} – D-1", f"{fpso} – D-2", f"{fpso} – D-3"] # inclui com emoji para D variantes = base + [f"{fpso} – D{EMOJI_NAVIO}"] to_del = ( db.query(EventoCalendario) .filter(EventoCalendario.data_evento >= ini) .filter(EventoCalendario.data_evento <= fim) .filter(EventoCalendario.titulo.in_(variantes)) .all() ) for e in to_del: db.delete(e) total += 1 try: db.commit() except Exception: db.rollback() return total # ------------------------------ # UI principal # ------------------------------ def main(): # ===================================================== # 🔒 PROTEÇÃO POR PERFIL # ===================================================== if not _can_access("calendario"): st.error("⛔ Acesso não autorizado.") return if not _HAS_CAL: st.warning( f"O componente `streamlit-calendar` não está disponível ({_CAL_ERR}). " f"Adicione `streamlit-calendar==1.0.0` ao requirements.txt." ) return st.title("📅 Calendário e Lembretes") hoje = date.today() db = SessionLocal() # Helper: cor por status (eventos do banco) def _cor_evento_db(e: "EventoCalendario") -> str: if not e.ativo: return "#95a5a6" # Cinza if e.data_evento < hoje: return "#e74c3c" # Vermelho (passado) if e.data_lembrete and e.data_lembrete == hoje: return "#f39c12" # Laranja (lembrete hoje) return "#2ecc71" # Verde (ativo futuro) # Converte EventoCalendario do banco → FullCalendar def _to_fc_event_db(e: "EventoCalendario") -> dict: return { "id": str(e.id), "title": e.titulo, "start": e.data_evento.isoformat(), "allDay": True, "color": _cor_evento_db(e), "extendedProps": { "descricao": (e.descricao or ""), "data_evento": e.data_evento.isoformat(), "data_lembrete": e.data_lembrete.isoformat() if e.data_lembrete else None, "ativo": e.ativo, "gerado_auto": False, }, } try: # ===================================================== # 🔔 LEMBRETES DO DIA # ===================================================== st.subheader("⏰ Lembretes de Hoje | Adicione o calendário da sua embarcação") lembretes = ( db.query(EventoCalendario) .filter(EventoCalendario.data_lembrete == hoje) .filter(EventoCalendario.ativo.is_(True)) .order_by(EventoCalendario.data_evento) .all() ) if lembretes: for l in lembretes: st.warning(f"🔔 **{l.titulo}** — Evento em {formatar_data_br(l.data_evento)}") else: st.info("Nenhum lembrete para hoje.") st.divider() # ===================================================== # 🎛️ CONTROLES DO CRONOGRAMA # ===================================================== st.subheader("🛠️ Cronograma de Embarques (D-3 / D-2 / D-1 / D 🚢)") col_a, col_b, col_c = st.columns([1, 2, 2]) with col_a: ano_sel = st.number_input( "Ano", min_value=2000, max_value=2100, value=hoje.year, step=1, key="cal_ano_sel" ) fpsos_all = list(REGRAS_FPSO.keys()) with col_b: fpsos_sel = st.multiselect( "FPSOs", options=fpsos_all, default=fpsos_all, key="cal_fpsos_sel", ) if not fpsos_sel: fpsos_sel = fpsos_all with col_c: apenas_D = st.checkbox("Exibir apenas dias de Embarque (D)", value=False) # Gera cronograma em memória para o ANO selecionado (visualização) eventos_auto = _gerar_cronograma_ano( ano_sel, fpsos_sel, incluir_anteriores=True, apenas_D=apenas_D ) # 🔁 Ações de banco: ANO col_b1, col_b2, col_b3, col_b4 = st.columns([1.7, 1.7, 2, 2]) with col_b1: if st.button("💾 Gravar cronograma (ano) no banco"): qtd = _gravar_cronograma_no_banco(db, eventos_auto) if qtd > 0: _audit("CRIAR", None) st.success(f"Cronograma do ano {ano_sel} gravado/atualizado. Inserções: {qtd}.") st.rerun() with col_b2: if st.button("🧹 Remover cronograma (ano) do banco"): qtd = _remover_cronograma_do_banco_intervalo(db, fpsos_sel, ano_sel, ano_sel) if qtd > 0: _audit("EXCLUIR", None) st.warning(f"Eventos removidos do banco (ano {ano_sel}): {qtd}.") st.rerun() # 🔁 Ações de banco: INTERVALO ATÉ 2030 with col_b3: if st.button("💾 Gravar cronograma até 2030 (banco)"): eventos_lote = _gerar_cronograma_intervalo( ano_ini=ano_sel, ano_fim=2030, fpsos_sel=fpsos_sel, apenas_D=apenas_D ) qtd = _gravar_cronograma_no_banco(db, eventos_lote) if qtd > 0: _audit("CRIAR", None) st.success(f"Cronogramas {ano_sel}–2030 gravados/atualizados. Inserções: {qtd}.") st.rerun() with col_b4: if st.button("🧹 Remover cronograma até 2030 (banco)"): qtd = _remover_cronograma_do_banco_intervalo(db, fpsos_sel, ano_sel, 2030) if qtd > 0: _audit("EXCLUIR", None) st.warning(f"Eventos removidos do banco ({ano_sel}–2030): {qtd}.") st.rerun() st.caption( "• A geração automática **não** altera seus eventos manuais. " "Use os botões para **gravar** ou **remover** do banco apenas os eventos criados por este módulo. " "Nos dias de **D**, o título inclui o ícone de navio (🚢)." ) st.divider() # ===================================================== # ➕ NOVO EVENTO / LEMBRETE (manual) # ===================================================== with st.expander("➕ Novo Evento / Lembrete"): with st.form("form_evento"): titulo = st.text_input("Título *") descricao = st.text_area("Descrição") data_evento = st.date_input("Data do Evento", value=hoje, format="DD/MM/YYYY") informar_lembrete = st.checkbox("Definir lembrete?") data_lembrete = None if informar_lembrete: data_lembrete = st.date_input( "Data do Lembrete", value=hoje, format="DD/MM/YYYY", key="dt_lembrete_novo" ) ativo = st.checkbox("Evento ativo", value=True) salvar = st.form_submit_button("💾 Salvar Evento") if salvar: if not titulo.strip(): st.error("⚠️ O título é obrigatório.") elif data_lembrete and (data_lembrete > data_evento): st.error("⚠️ O lembrete não pode ser após a data do evento.") else: evento = EventoCalendario( titulo=titulo.strip(), descricao=(descricao or "").strip(), data_evento=data_evento, data_lembrete=data_lembrete, ativo=ativo, usuario_criacao=_usuario_atual(), data_criacao=datetime.now() ) db.add(evento) try: db.commit() except Exception as e: db.rollback() st.error(f"❌ Erro ao salvar evento: {e}") else: _audit("CRIAR", getattr(evento, "id", None)) st.success("✅ Evento criado com sucesso!") st.rerun() st.divider() # ===================================================== # 📆 CALENDÁRIO (eventos do banco + cronograma do ANO selecionado) # ===================================================== st.subheader("📆 Calendário (clique no dia ou no evento para ver a observação)") # Banco (apenas ano selecionado na visualização) ini_year = date(ano_sel, 1, 1) end_year = date(ano_sel, 12, 31) eventos_db = ( db.query(EventoCalendario) .filter(EventoCalendario.data_evento >= ini_year) .filter(EventoCalendario.data_evento <= end_year) .order_by(EventoCalendario.data_evento.asc()) .all() ) eventos_fc_db = [_to_fc_event_db(e) for e in eventos_db] # Junta cronograma automático (memória) + banco (para a visualização do ano) eventos_fc = eventos_fc_db + eventos_auto options = { "initialView": "dayGridMonth", "locale": "pt-br", "height": 700, "firstDay": 1, "weekNumbers": False, "headerToolbar": { "left": "prev,next today", "center": "title", "right": "dayGridMonth,dayGridWeek,listWeek" }, "buttonText": { "today": "Hoje", "month": "Mês", "week": "Semana", "day": "Dia", "list": "Lista" }, "dayMaxEventRows": True, "navLinks": True, } state = calendar( events=eventos_fc, options=options, custom_css="", key=f"calendario_eventos_{ano_sel}" ) # Legenda with st.container(): cols = st.columns([1.2, 1.2, 1.2, 1.2, 2.2, 3]) cols[0].markdown("⬛ **D** (cinza) " + EMOJI_NAVIO) cols[1].markdown("🟥 **D‑1** (vinho)") cols[2].markdown("🟥 **D‑2** (vermelho)") cols[3].markdown("🟩 **D‑3** (verde)") cols[4].markdown("🟧 **Lembrete hoje (eventos do banco)**") cols[5].markdown("🟦 **Outros eventos (banco)**") st.divider() # ===================================================== # 🔎 Detalhe por clique (evento ou dia) # ===================================================== clicked_event = None if isinstance(state, dict): clicked_event = (state.get("eventClick") or {}).get("event") clicked_date_str = (state.get("dateClick") or {}).get("dateStr") else: clicked_date_str = None if clicked_event: ev_id = clicked_event.get("id") ev_title = clicked_event.get("title") ev_start = clicked_event.get("start") ev_ext = clicked_event.get("extendedProps") or {} # Se for do banco, traz detalhes atualizados e = None if ev_id and not str(ev_id).startswith("auto::"): try: # SQLAlchemy 2.x: use Session.get(Model, pk) e = db.get(EventoCalendario, int(ev_id)) except Exception: e = None st.subheader(f"📌 {ev_title or 'Evento'}") if e: st.markdown( f""" **Descrição:** {e.descricao or "_Sem descrição_"} **📅 Data do Evento:** {formatar_data_br(e.data_evento)} **⏰ Data do Lembrete:** {formatar_data_br(e.data_lembrete) if e.data_lembrete else "_Sem lembrete_"} **📌 Status:** {"Ativo ✅" if e.ativo else "Inativo ❌"} """ ) if _can_access("administracao"): col1, col2 = st.columns(2) with col1: if e.ativo and st.button("🚫 Desativar", key=f"desativar_{e.id}"): e.ativo = False try: db.commit() except Exception as ex: db.rollback() st.error(f"Erro ao desativar: {ex}") else: _audit("DESATIVAR", e.id) st.success("Evento desativado.") st.rerun() with col2: if st.button("🗑️ Excluir", key=f"excluir_{e.id}"): db.delete(e) try: db.commit() except Exception as ex: db.rollback() st.error(f"Erro ao excluir: {ex}") else: _audit("EXCLUIR", e.id) st.success("Evento excluído.") st.rerun() else: # Evento do cronograma automático (memória) try: dt_evt = date.fromisoformat(ev_start[:10]) except Exception: dt_evt = None st.markdown( f""" **FPSO:** {ev_title.split(' – ')[0] if ev_title and ' – ' in ev_title else '—'} **Tipo:** {ev_ext.get('tipo', '—')} **📅 Data:** {formatar_data_br(dt_evt) if dt_evt else '—'} **Origem:** _Cronograma automático (não gravado no banco)_ """ ) elif clicked_date_str: try: data_clicada = date.fromisoformat(clicked_date_str) except Exception: data_clicada = None if data_clicada: st.subheader(f"🗓️ Eventos em {formatar_data_br(data_clicada)}") # Banco eventos_no_dia_db = ( db.query(EventoCalendario) .filter(EventoCalendario.data_evento == data_clicada) .order_by(EventoCalendario.id.desc()) .all() ) if not eventos_no_dia_db: st.info("Nenhum evento do banco para este dia.") else: st.markdown("**📦 Eventos do banco**") for e in eventos_no_dia_db: with st.expander(f"📌 {e.titulo}"): st.markdown( f""" **Descrição:** {e.descricao or "_Sem descrição_"} **📅 Data do Evento:** {formatar_data_br(e.data_evento)} **⏰ Data do Lembrete:** {formatar_data_br(e.data_lembrete) if e.data_lembrete else "_Sem lembrete_"} **📌 Status:** {"Ativo ✅" if e.ativo else "Inativo ❌"} """ ) if _can_access("administracao"): c1, c2 = st.columns(2) with c1: if e.ativo and st.button("🚫 Desativar", key=f"desativar_list_{e.id}"): e.ativo = False try: db.commit() except Exception as ex: db.rollback() st.error(f"Erro ao desativar: {ex}") else: _audit("DESATIVAR", e.id) st.success("Evento desativado.") st.rerun() with c2: if st.button("🗑️ Excluir", key=f"excluir_list_{e.id}"): db.delete(e) try: db.commit() except Exception as ex: db.rollback() st.error(f"Erro ao excluir: {ex}") else: _audit("EXCLUIR", e.id) st.success("Evento excluído.") st.rerun() # Cronograma automático (memória) – ano selecionado eventos_auto_no_dia = [ ev for ev in eventos_auto if ev.get("start", "")[:10] == data_clicada.isoformat() ] if eventos_auto_no_dia: st.markdown("**🛠️ Eventos gerados automaticamente (cronograma)**") for ev in sorted(eventos_auto_no_dia, key=lambda x: x.get("title","")): fps = ev.get("title","").split(" – ")[0] if " – " in ev.get("title","") else "—" tipo = ev.get("extendedProps", {}).get("tipo", "—") st.write(f"• **{fps}** — **{tipo}** ({formatar_data_br(data_clicada)})") st.divider() # ===================================================== # 📆 Consultar Eventos por Data (modo antigo) — inclui AUTO # ===================================================== with st.expander("📆 Consultar Eventos por Data (modo antigo)"): data_consulta = st.date_input("Selecione uma data", value=hoje, format="DD/MM/YYYY", key="consulta_antiga") # Banco eventos = ( db.query(EventoCalendario) .filter(EventoCalendario.data_evento == data_consulta) .order_by(EventoCalendario.id.desc()) .all() ) if not eventos: st.info("Nenhum evento do banco para esta data.") else: st.markdown("**📦 Eventos do banco**") for e in eventos: with st.expander(f"📌 {e.titulo}"): st.markdown( f""" **Descrição:** {e.descricao or "_Sem descrição_"} **📅 Data do Evento:** {formatar_data_br(e.data_evento)} **⏰ Data do Lembrete:** {formatar_data_br(e.data_lembrete) if e.data_lembrete else "_Sem lembrete_"} **📌 Status:** {"Ativo ✅" if e.ativo else "Inativo ❌"} """ ) if _can_access("administracao"): col1, col2 = st.columns(2) with col1: if e.ativo and st.button("🚫 Desativar", key=f"desativar_old_{e.id}"): e.ativo = False try: db.commit() except Exception as ex: db.rollback() st.error(f"Erro ao desativar: {ex}") else: _audit("DESATIVAR", e.id) st.success("Evento desativado.") st.rerun() with col2: if st.button("🗑️ Excluir", key=f"excluir_old_{e.id}"): db.delete(e) try: db.commit() except Exception as ex: db.rollback() st.error(f"Erro ao excluir: {ex}") else: _audit("EXCLUIR", e.id) st.success("Evento excluído.") st.rerun() # AUTO (memória) no ano selecionado eventos_auto_antigo = [ ev for ev in eventos_auto if ev.get("start", "")[:10] == data_consulta.isoformat() ] if eventos_auto_antigo: st.markdown("**🛠️ Eventos gerados automaticamente (cronograma)**") for ev in sorted(eventos_auto_antigo, key=lambda x: x.get("title","")): fps = ev.get("title","").split(" – ")[0] if " – " in ev.get("title","") else "—" tipo = ev.get("extendedProps", {}).get("tipo", "—") st.write(f"• **{fps}** — **{tipo}** ({formatar_data_br(data_consulta)})") finally: try: db.close() except Exception: pass