# beta-NORM / app / frontend / layout.py
# Synced from GitHub master by GitHub Actions (commit 92145af).
import os
import html
import base64
from datetime import datetime
from typing import Tuple, List, Dict, Any, Optional
import pandas as pd
import streamlit as st
from utils import base_utils as bu
from utils.conversation_word_export import build_conversation_docx, build_single_response_docx
# Directory containing this file (used to locate styles.css).
ROOT = os.path.dirname(__file__)
# Project configuration, read once when this module is imported.
CONFIG = bu.load_config("configs/config.json")
# Query endpoint; falls back to a local default when "ui" / "api_url" is absent.
API_URL = CONFIG.get("ui", {}).get("api_url", "http://127.0.0.1:8000/query")
# API_URL without its last path segment; the sibling endpoints are built on it.
BASE_API_URL = API_URL.rsplit("/", 1)[0]
UPLOAD_URL = BASE_API_URL + "/upload_document"
PRECHECK_URL = BASE_API_URL + "/precheck_document"
PRESENTATION_URL = BASE_API_URL + "/generate_presentation"
# =========================================================
# API helpers
# =========================================================
def chamar_api(pergunta: str, mode: str = "chatbot") -> Tuple[str, List[Dict[str, Any]]]:
    """Send a question and its mode to the query endpoint.

    Returns:
        ``(answer, retrieved)`` read from the JSON response; ``retrieved``
        defaults to an empty list when the key is absent. Any failure
        (transport error, HTTP error status, malformed payload) yields a
        placeholder answer and an empty list instead of raising.
    """
    import requests

    request_body = {"question": pergunta, "mode": mode}
    try:
        response = requests.post(API_URL, json=request_body, timeout=60)
        response.raise_for_status()
        parsed = response.json()
        answer = parsed["answer"]
        retrieved = parsed.get("retrieved", [])
    except Exception:
        # Deliberate best-effort: the UI keeps working without the backend.
        return "Resposta simulada (API não conectada).", []
    return answer, retrieved
def upload_document(
    filename: str, file_bytes: bytes, mime_type: str = "text/markdown"
) -> Tuple[bool, str, Optional[str]]:
    """Send a file to the backend upload/indexing endpoint.

    Args:
        filename: Name reported to the backend for the uploaded file.
        file_bytes: Raw file content.
        mime_type: Content type of the file; an empty value falls back to
            ``"text/markdown"``.

    Returns:
        ``(ok, message, document_id)``. ``document_id`` is ``None`` on any
        failure or when the backend does not report one. The function does
        not raise for transport or payload problems; they are reported
        through ``ok=False`` and ``message``.
    """
    import requests

    try:
        files = {"file": (filename, file_bytes, mime_type or "text/markdown")}
        resp = requests.post(UPLOAD_URL, files=files, timeout=180)

        # A non-JSON body (e.g. a proxy error page) must not be reported as a
        # connection error, so JSON decoding failures are handled here.
        try:
            data = resp.json() if resp.content else {}
        except ValueError:
            data = None

        if resp.status_code >= 400:
            detail = data.get("detail") if isinstance(data, dict) else None
            if detail and not isinstance(detail, str):
                # Structured error details are flattened to keep the str contract.
                detail = str(detail)
            return False, detail or "Falha no upload/indexação.", None

        if not isinstance(data, dict):
            # Same validation precheck_document applies to its payload.
            return False, "Resposta inválida no upload do documento.", None

        return (
            True,
            data.get("message", "Documento indexado com sucesso."),
            data.get("document_id"),
        )
    except Exception as exc:
        return False, f"Erro de conexão com API: {exc}", None
def precheck_document(
    filename: str, file_bytes: bytes, mime_type: str = "text/markdown"
) -> Tuple[bool, Dict[str, Any], str]:
    """Ask the backend whether a file is in thematic scope before indexing.

    Args:
        filename: Name reported to the backend for the file.
        file_bytes: Raw file content.
        mime_type: Content type of the file; an empty value falls back to
            ``"text/markdown"``.

    Returns:
        ``(ok, payload, error_message)``. On success ``payload`` is the JSON
        object returned by the backend and ``error_message`` is empty; on
        failure ``payload`` is an empty dict. The function does not raise for
        transport or payload problems.
    """
    import requests

    try:
        files = {"file": (filename, file_bytes, mime_type or "text/markdown")}
        resp = requests.post(PRECHECK_URL, files=files, timeout=60)

        # A non-JSON body (e.g. a proxy error page) must not be reported as a
        # connection error, so JSON decoding failures are handled here.
        try:
            data = resp.json() if resp.content else {}
        except ValueError:
            data = None

        if resp.status_code >= 400:
            detail = data.get("detail") if isinstance(data, dict) else None
            if detail and not isinstance(detail, str):
                # Structured error details are flattened to keep the str contract.
                detail = str(detail)
            return False, {}, detail or "Falha no pré-chequeo do documento."

        if not isinstance(data, dict):
            return False, {}, "Resposta inválida no pré-chequeo do documento."
        return True, data, ""
    except Exception as exc:
        return False, {}, f"Erro de conexão no pré-chequeo: {exc}"
def generate_presentation(ideas: List[str], title: str | None = None) -> Tuple[bool, str, bytes | None]:
    """Call the presentation endpoint and return the generated PPTX bytes.

    Returns:
        ``(ok, message, content)`` where ``content`` is the raw response body
        on success and ``None`` on any failure. Transport and payload
        problems are reported through ``ok=False`` rather than raised.
    """
    import requests

    try:
        response = requests.post(
            PRESENTATION_URL,
            json={"ideas": ideas, "title": title},
            timeout=180,
        )
        if response.status_code < 400:
            return True, "Apresentação gerada com sucesso.", response.content

        # Error path: use the backend's "detail" when the body is a JSON object.
        detail = None
        try:
            error_body = response.json()
            if isinstance(error_body, dict):
                detail = error_body.get("detail")
        except Exception:
            detail = None
        return False, detail or "Falha ao gerar apresentação.", None
    except Exception as exc:
        return False, f"Erro de conexão com API: {exc}", None
# =========================================================
# Texto y tablas
# =========================================================
def normalize_message_text(text: str) -> str:
    """Tidy blank lines and broken numbered items in a chat message.

    - Runs of blank lines collapse to a single blank line.
    - Blank lines between two consecutive list items are removed.
    - A line holding only a number marker (``"1."``) is merged with the next
      non-blank line.
    - Leading and trailing whitespace of the whole text is stripped.

    Args:
        text: Message text; ``None`` or an empty string yields ``""``.

    Returns:
        The normalized text.
    """
    import re

    bare_number = re.compile(r"^\s*\d+\.\s*$")
    list_item = re.compile(r"^\s*(\d+\.|[-*•])\s+")

    def is_list_line(candidate: str) -> bool:
        return list_item.match(candidate) is not None

    # Guard against None so a message without content cannot crash rendering.
    lines = (text or "").splitlines()
    result = []
    pending_blank = False
    i = 0
    while i < len(lines):
        line = lines[i]
        if not line.strip():
            pending_blank = True
            i += 1
            continue

        if bare_number.match(line):
            # Pull the item text up onto the line that carries its number.
            j = i + 1
            while j < len(lines) and not lines[j].strip():
                j += 1
            if j < len(lines):
                line = f"{line.strip()} {lines[j].strip()}"
                i = j

        if pending_blank and result:
            if not (is_list_line(result[-1]) and is_list_line(line)):
                result.append("")
        # NOTE(review): the source indentation was lost; the reset is applied
        # unconditionally so leading blank lines never leak a separator
        # between the first two lines.
        pending_blank = False

        result.append(line)
        i += 1

    cleaned = "\n".join(result)
    return re.sub(r"\n\s*\n\s*\n+", "\n\n", cleaned).strip()
def render_markdown_table_if_exists(text: str):
    """Extract the first Markdown table in *text* as a DataFrame.

    A table starts at the first line containing ``|`` that has a separator
    row (only whitespace, ``-``, ``|`` and ``:``) within the next two lines,
    and extends while consecutive lines contain ``|``.

    Returns:
        ``(df, rest_text)`` where ``rest_text`` is the normalized text with
        the table lines removed, or ``(None, text)`` when no table is found
        or the DataFrame cannot be built.
    """
    import re

    def is_md_separator(candidate: str) -> bool:
        stripped = candidate.strip()
        return bool(
            "|" in stripped
            and re.fullmatch(r"[\s\-|:]+", stripped)
            and "-" in stripped
        )

    lines = text.splitlines()

    # First "|" line followed (within two lines) by a separator row.
    start = next(
        (
            position
            for position, candidate in enumerate(lines)
            if "|" in candidate
            and any(is_md_separator(following) for following in lines[position + 1 : position + 3])
        ),
        None,
    )
    if start is None:
        return None, text

    # Extend over the contiguous run of "|" lines (separator rows contain "|").
    end = start
    while end + 1 < len(lines) and "|" in lines[end + 1]:
        end += 1

    parsed_rows = [
        [cell.strip() for cell in table_line.strip().strip("|").split("|")]
        for table_line in lines[start : end + 1]
        if not is_md_separator(table_line)
    ]
    if not parsed_rows:
        return None, text
    header, rows = parsed_rows[0], parsed_rows[1:]

    # Pad header and rows so every row has the same number of cells.
    width = max([len(header), *(len(row) for row in rows)])
    header = header + [""] * (width - len(header))
    padded_rows = [row + [""] * (width - len(row)) for row in rows]

    try:
        df = pd.DataFrame(padded_rows, columns=header)
    except Exception:
        return None, text

    remaining_lines = lines[:start] + lines[end + 1 :]
    return df, normalize_message_text("\n".join(remaining_lines))
def render_message_markdown_html(text: str) -> str:
    """Convert message text to minimal, HTML-escaped markup.

    Lines starting with one to three ``#`` followed by whitespace become
    ``<h1>``–``<h3>``; every other non-blank line is HTML-escaped; blank
    lines become ``<br>``. Lines are joined with ``<br>`` and runs of three
    or more ``<br>`` collapse to two. ``None`` is treated as empty text.
    """
    import re

    heading = re.compile(r"^(#{1,3})\s+(.*)$")

    def render_line(source_line: str) -> str:
        stripped = source_line.strip()
        if not stripped:
            return "<br>"
        found = heading.match(stripped)
        if found is None:
            return html.escape(stripped)
        level = len(found.group(1))
        return f"<h{level}>{html.escape(found.group(2))}</h{level}>"

    joined = "<br>".join(render_line(source_line) for source_line in (text or "").splitlines())
    return re.sub(r"(?:<br>\s*){3,}", "<br><br>", joined)
# =========================================================
# Estilos (CSS)
# =========================================================
# Color palettes keyed by theme name. Each entry is emitted as a CSS custom
# property (``--<key>: <value>;``) by apply_theme_and_css, so key order and
# spelling must match what styles.css expects.
THEMES = {
    "dark": dict(
        bg="#1e1e1e",
        panel="#2b2d31",
        input_bg="#303134",
        text_primary="#ffffff",
        muted="#9ca3af",
        accent="#f31260",
        chip_bg="#2b2d31",
        chip_border="#3b3f46",
        chip_hover="#343740",
        sidebar_bg="#171717",
        hover_text="#ffffff",
    ),
    "light": dict(
        bg="#f5f5f7",
        panel="#ffffff",
        input_bg="#f0f1f3",
        text_primary="#111827",
        muted="#6b7280",
        accent="#f31260",
        chip_bg="#eef0f4",
        chip_border="#d1d5db",
        chip_hover="#e5e7eb",
        sidebar_bg="#ffffff",
        hover_text="#111827",
    ),
}
def apply_theme_and_css():
    """Apply the active theme's CSS variables and inject styles.css.

    The theme name is read from ``st.session_state["theme"]`` (initialised to
    ``"dark"`` when missing); unknown names fall back to ``"dark"``.

    Returns:
        The color dictionary of the theme that was applied.
    """
    if "theme" not in st.session_state:
        st.session_state["theme"] = "dark"

    selected = st.session_state["theme"]
    if selected not in THEMES:
        selected = "dark"
    palette = THEMES[selected]

    # 1) Publish the palette as CSS custom properties on :root.
    declarations = "\n".join(f"--{name}: {value};" for name, value in palette.items())
    root_block = "\n<style>\n:root {\n" + declarations + "\n}\n</style>\n"
    st.markdown(root_block, unsafe_allow_html=True)

    # 2) Inject the static stylesheet that lives next to this file.
    stylesheet_path = os.path.join(ROOT, "styles.css")
    try:
        with open(stylesheet_path, "r", encoding="utf-8") as handle:
            static_css = handle.read()
        st.markdown(f"<style>{static_css}</style>", unsafe_allow_html=True)
    except FileNotFoundError:
        # A missing stylesheet must not break the app.
        pass

    return palette
# =========================================================
# UI Components
# =========================================================
def reset_to_home():
    """Clear the active conversation state held in the Streamlit session.

    Empties the message list, detaches the current conversation id, resets
    the pending-input slots and forgets the last mode / ideas text.
    """
    state = st.session_state
    state["messages"] = []
    state["current_conversation_id"] = None
    for text_slot in ("pending_question", "pending_suggestion_question"):
        state[text_slot] = ""
    for object_slot in ("pending_chat_submission", "pending_upload_confirmation"):
        state[object_slot] = None
    for stale_key in ("last_mode", "last_ideas_text"):
        state.pop(stale_key, None)
def render_sidebar():
    """Render the sidebar: a "new conversation" button and the history list.

    Stored conversations are listed newest-first (reverse insertion order).
    Selecting one loads its messages into the session and reruns the app.
    """
    with st.sidebar:
        st.title("🗂️ Conversas")
        if st.button("➕ Nova Conversa", use_container_width=True, on_click=reset_to_home):
            st.rerun()
        st.markdown("---")

        conversations = st.session_state.conversations
        if conversations:
            for cid, cdata in reversed(list(conversations.items())):
                if st.button(cdata["title"], key=f"hist_{cid}", use_container_width=True):
                    # Load a copy: sharing the stored list would let later
                    # appends (e.g. typing placeholders) mutate the saved
                    # history before it is explicitly persisted again.
                    st.session_state.messages = list(cdata["messages"])
                    st.session_state.current_conversation_id = cid
                    st.rerun()
def _render_theme_toggle(container, colors):
    """Draw the "Tema" label and the light/dark toggle button in *container*."""
    with container:
        label_col, button_col = st.columns([2, 1])
        with label_col:
            st.markdown(
                f"<div style='text-align:right; padding-top:10px; color:{colors['muted']}'>Tema</div>",
                unsafe_allow_html=True,
            )
        with button_col:
            is_dark = st.session_state.theme == "dark"
            if st.button("☀️" if is_dark else "🌙", key="theme_toggle"):
                st.session_state.theme = "light" if is_dark else "dark"
                st.rerun()


def render_header(colors):
    """Render the page header.

    - Home screen (no messages): theme toggle plus a large centered title.
    - Ongoing conversation: compact title button at the top left (it resets
      to the home screen) plus the theme toggle.

    Args:
        colors: Active theme palette; only ``colors["muted"]`` is read here.
    """
    has_messages = bool(st.session_state.get("messages"))

    # Home screen: keep the centered title layout.
    if not has_messages:
        _, col_theme = st.columns([10, 2])
        _render_theme_toggle(col_theme, colors)
        st.markdown("<h1 class=\"main-title\">⚛ Chatbot NORM</h1>", unsafe_allow_html=True)
        st.markdown(
            "<p class=\"subtitle\">Assistente Especializado em Química e NORM</p><br>",
            unsafe_allow_html=True,
        )
        return

    # Ongoing conversation: compact header on the left.
    col_title, _, col_theme = st.columns([4, 6, 2])
    with col_title:
        st.button(
            "Chatbot NORM",
            key="header_home",
            icon="⚛",
            type="tertiary",
            width="content",
            help="Voltar à tela inicial",
            on_click=reset_to_home,
        )
    _render_theme_toggle(col_theme, colors)
def render_chat_history():
    """Render the chat history as custom HTML bubbles.

    User messages are escaped and shown as plain bubbles. Assistant messages
    may carry a Markdown table (rendered below the bubble with ``st.table``),
    a references footer and an inline link to download a single response as
    a Word file. A "download conversation" button follows the last assistant
    message.
    """
    history = st.session_state.messages
    docx_mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"

    def render_conversation_download_button(message_idx: int):
        try:
            conversation_docx = build_conversation_docx(history)
            stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            _, button_col, _ = st.columns([1, 4, 10])
            with button_col:
                st.download_button(
                    label="⬇️ Baixar conversa",
                    data=conversation_docx,
                    file_name=f"conversacao_chatbot_norm_{stamp}.docx",
                    mime=docx_mime,
                    key=f"download_chat_docx_{st.session_state.current_conversation_id}_{message_idx}",
                )
        except Exception:
            st.caption("Não foi possível gerar o documento Word da conversa.")

    # Position of the most recent assistant message, if any.
    last_assistant_index = next(
        (
            position
            for position in range(len(history) - 1, -1, -1)
            if history[position].get("role") == "Assistente"
        ),
        None,
    )

    for idx, msg in enumerate(history):
        if msg.get("role") == "Você":
            bubble_text = html.escape(normalize_message_text(msg.get("content", ""))).replace("\n", "<br>")
            st.markdown(
                "\n"
                '<div class="chat-row user">\n'
                '<div class="chat-avatar">🗣️</div>\n'
                f'<div class="chat-bubble user">{bubble_text}</div>\n'
                "</div>\n",
                unsafe_allow_html=True,
            )
            continue

        table_df, remainder = render_markdown_table_if_exists(msg.get("content", ""))
        body_html = ""
        if remainder:
            body_html = render_message_markdown_html(normalize_message_text(remainder))

        # Optional inline link to download one earlier response as .docx.
        download_html = ""
        if msg.get("show_last_response_download_button"):
            try:
                target_idx = msg.get("download_target_index")
                target_msg = None
                if isinstance(target_idx, int) and 0 <= target_idx < len(history):
                    target_msg = history[target_idx]
                if target_msg is not None:
                    response_docx = build_single_response_docx(target_msg)
                    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    encoded = base64.b64encode(response_docx).decode("utf-8")
                    safe_name = html.escape(f"ultima_resposta_chatbot_{stamp}.docx", quote=True)
                    download_html = (
                        "<div style='margin-top: 12px;'>"
                        f"<a class='download-inline-btn' href='data:{docx_mime};base64,{encoded}' "
                        f"download='{safe_name}'>Baixar arquivo</a>"
                        "</div>"
                    )
            except Exception:
                st.caption("Não foi possível gerar o documento Word da última resposta.")

        refs_html = ""
        if msg.get("references"):
            refs_html = (
                f"<div style='margin-top: 15px; font-size: 0.85em; color: gray;'>📚 {msg['references']}</div>"
            )

        bubble_parts = [
            "<div class='chat-row assistant'>",
            "<div class='chat-avatar'>⚛</div>",
            "<div class='chat-bubble assistant'>",
            body_html,
            refs_html,
            download_html,
            "</div>",
            "</div>",
        ]
        st.markdown("".join(bubble_parts), unsafe_allow_html=True)

        if table_df is not None:
            _, table_col = st.columns([1, 7])
            with table_col:
                st.table(table_df)

        if last_assistant_index is not None and idx == last_assistant_index:
            render_conversation_download_button(idx)
def _queue_suggestion_question(text: str):
    """Button callback: store the clicked suggestion as the pending question."""
    st.session_state["pending_suggestion_question"] = text
def render_suggestions(colors):
    """Render the suggestion chips and the welcome note on the empty chat.

    Nothing is drawn once the session holds at least one message.

    Args:
        colors: Active theme palette; only ``colors["muted"]`` is read here.
    """
    if st.session_state.messages:
        return

    st.markdown(
        f"<div style='text-align:center; color:{colors['muted']}; margin-top:30px; margin-bottom:20px; font-size:1rem;'>Sugestões</div>",
        unsafe_allow_html=True,
    )

    _, chips_area, _ = st.columns([1, 4, 1])
    with chips_area:
        chip_columns = st.columns(4)
        suggestions = (
            "Liste os documentos indexados",
            "O que é NORM?",
            "Principais isótopos radioativos",
            "Resumo sobre Césio-137",
        )
        for chip_column, suggestion in zip(chip_columns, suggestions):
            with chip_column:
                st.markdown('<div class="suggestion-btn">', unsafe_allow_html=True)
                st.button(
                    suggestion,
                    key=f"sugg_{suggestion}",
                    use_container_width=True,
                    on_click=_queue_suggestion_question,
                    args=(suggestion,),
                )
                st.markdown('</div>', unsafe_allow_html=True)

    # NOTE(review): source indentation was lost; this assumes the welcome note
    # sits outside the suggestion column but inside the empty-chat branch.
    welcome_html = (
        "\n"
        '<div class="bottom-welcome">\n'
        '<span class="welcome-icon">👋</span> <b>Olá! Como posso ajudá-lo hoje?</b><br>\n'
        "Faça perguntas sobre química, NORM ou solicite resumos.\n"
        "</div>\n"
    )
    st.markdown(welcome_html, unsafe_allow_html=True)
# =========================================================
# Conversaciones y flujo
# =========================================================
def formatar_referencias(fragmentos):
    """Build the HTML reference list shown under an assistant answer.

    One entry is produced per distinct ``citation_id`` (first occurrence
    wins); fragments without a ``citation_id`` are ignored. Entries are
    ordered by citation id and joined with ``<br>``.

    The result is injected into the page as raw HTML, so every value coming
    from the backend (id, title, authors, year, date) is HTML-escaped here.

    Args:
        fragmentos: Iterable of fragment dicts; ``None`` is treated as empty.

    Returns:
        The HTML string, or ``""`` when there is nothing to cite.
    """
    import re

    refs_por_id = {}
    for m in fragmentos or []:
        cit_id = m.get("citation_id")
        if not cit_id or cit_id in refs_por_id:
            continue

        titulo = m.get("document_title") or "Documento"
        # Drop "[n]" markers and turn file-name separators into spaces.
        titulo = re.sub(r"\[\d+\]", "", titulo).strip().replace("_", " ").replace("-", " ")

        autores = m.get("document_authors") or []
        if isinstance(autores, str):
            # A bare string would otherwise be joined character by character.
            autores = [autores]
        nomes = [str(autor) for autor in autores if autor]

        pub_year = m.get("publication_year")
        pub_date = m.get("publication_date")

        partes_ref = [f"[{html.escape(str(cit_id))}] {html.escape(titulo)}"]
        if nomes:
            partes_ref.append("Autores: " + html.escape("; ".join(nomes)))
        if pub_year:
            partes_ref.append(f"Ano: {html.escape(str(pub_year))}")
        if pub_date:
            partes_ref.append(f"Data: {html.escape(str(pub_date))}")
        refs_por_id[cit_id] = " | ".join(partes_ref)

    try:
        ordem = sorted(refs_por_id)
    except TypeError:
        # Mixed id types (e.g. int and str) cannot be compared; keep the
        # order in which the citations first appeared.
        ordem = list(refs_por_id)
    return "<br>".join(refs_por_id[chave] for chave in ordem)
def determine_mode(pergunta_lower: str) -> str:
    """Choose the backend mode for a (lower-cased) question.

    Keywords are matched as whole words/phrases (not inside longer words),
    with any run of whitespace accepted between the words of a phrase.
    Precedence, first hit wins: ideas → table (multi-document variant when a
    multi-document marker is present) → per-section summary (multi variant
    likewise) → document summary → plain chatbot.
    """
    import re

    def mentions(triggers) -> bool:
        if not pergunta_lower:
            return False
        for trigger in triggers:
            needle = (trigger or "").strip().lower()
            if not needle:
                continue
            # Escaped spaces become \s+ so phrases tolerate extra whitespace.
            body = re.escape(needle).replace(r"\ ", r"\s+")
            if re.search(rf"(?<!\w){body}(?!\w)", pergunta_lower) is not None:
                return True
        return False

    ideias_keywords = ("ideia", "ideias", "idea", "inovar")
    table_keywords = ("tabela", "tabla", "table", "gera uma tabela", "gerar tabela")
    summary_keywords = ("resumo", "resumen", "summary")
    summary_sections_kw = (
        "introdução",
        "introducao",
        "introduccion",
        "introduction",
        "metodologia",
        "metodología",
        "methodology",
        "resultados",
        "results",
    )
    section_summary_hints = ("seção", "secao", "secciones", "sections", "por seção", "por secao")
    doc_keywords = (
        "documento",
        "documentos",
        "artigo",
        "artigos",
        "relatório",
        "relatorio",
        "norma",
        "normas",
    )
    multi_doc_markers = (
        "esses documentos",
        "documentos listados",
        "documentos acima",
        "documentos mostrados",
        "os documentos",
        "los documentos",
        "todos",
        "todas",
        "all",
        "ambos",
        "both",
        "os 3",
        "los 3",
        "3 documentos",
        "3 docs",
    )

    if mentions(ideias_keywords):
        return "gerar_ideias"

    wants_multi = mentions(multi_doc_markers)
    if mentions(table_keywords):
        return "table_multi" if wants_multi else "table"

    wants_summary = mentions(summary_keywords)
    names_section = mentions(summary_sections_kw)
    if wants_summary and names_section:
        return "summary_sections_multi" if wants_multi else "summary_sections"
    if names_section and mentions(section_summary_hints):
        return "summary_sections"
    if wants_summary and mentions(doc_keywords):
        return "summary"
    return "chatbot"
def is_download_request(question_lower: str) -> bool:
    """Tell whether a (lower-cased) question asks to download chat content.

    True when the text contains both a download verb and a downloadable
    target; matching is by plain substring.
    """
    actions = ("descargar", "download", "baixar")
    targets = (
        "resumen",
        "resumo",
        "informacion",
        "informação",
        "informacao",
        "informacion generada",
        "informação gerada",
        "informacao gerada",
        "conversacion",
        "conversação",
        "conversacao",
        "chat",
        "tabela",
        "table",
        "resposta",
        "respuesta",
    )

    def mentions_any(terms) -> bool:
        return any(term in question_lower for term in terms)

    return mentions_any(actions) and mentions_any(targets)
def find_last_assistant_response_index(messages: List[Dict[str, Any]]) -> Optional[int]:
    """Return the index of the latest real assistant answer, or ``None``.

    Assistant messages that only answer a download request (flagged with
    ``is_download_prompt_response`` or ``show_last_response_download_button``)
    are skipped.
    """
    skip_flags = ("is_download_prompt_response", "show_last_response_download_button")
    for position in reversed(range(len(messages))):
        candidate = messages[position]
        if candidate.get("role") != "Assistente":
            continue
        if any(candidate.get(flag) for flag in skip_flags):
            continue
        return position
    return None
def process_user_question(question: str):
    """Handle one user question end to end and update the session state.

    Steps:
      1. Append the user message unless it is already the last message, or
         the one just before a typing placeholder.
      2. Create a conversation record when none is active.
      3. Download requests are answered locally with a message pointing at
         the last real assistant answer; everything else is sent to the API
         using the mode picked by ``determine_mode``.
      4. The reply replaces a trailing typing placeholder (or is appended)
         and the conversation's stored messages are refreshed with a copy.
    """
    state = st.session_state
    history = state.messages

    def is_this_question(entry) -> bool:
        return entry.get("role") == "Você" and entry.get("content") == question

    def deliver(reply) -> None:
        # Replace the "typing" placeholder when present, otherwise append.
        if history and history[-1].get("is_typing"):
            history[-1] = reply
        else:
            history.append(reply)

    def persist() -> None:
        state.conversations[state.current_conversation_id]["messages"] = history.copy()

    # Do not duplicate a user message that was already recorded.
    logged_as_last = bool(history) and is_this_question(history[-1])
    logged_before_placeholder = (
        len(history) >= 2
        and bool(history[-1].get("is_typing"))
        and is_this_question(history[-2])
    )
    if not logged_as_last and not logged_before_placeholder:
        history.append({"role": "Você", "content": question})

    if state.current_conversation_id is None:
        cid = datetime.now().strftime("%Y%m%d_%H%M%S")
        state.current_conversation_id = cid
        state.conversations[cid] = {
            "title": question[:30],
            "messages": [],
            "created_at": datetime.now().isoformat(),
        }

    if is_download_request(question.lower()):
        target_idx = find_last_assistant_response_index(history[:-1])
        has_target = target_idx is not None
        if has_target:
            response_text = "Claro. Aqui está o arquivo da última resposta gerada."
        else:
            response_text = "Ainda não há uma resposta anterior do chatbot para baixar."
        deliver(
            {
                "role": "Assistente",
                "content": response_text,
                "references": "",
                "show_last_response_download_button": has_target,
                "download_target_index": target_idx,
                "is_download_prompt_response": True,
            }
        )
        persist()
        return

    mode = determine_mode(question.lower())
    resp_text, fragments = chamar_api(question, mode=mode)
    deliver(
        {
            "role": "Assistente",
            "content": resp_text,
            "references": formatar_referencias(fragments),
            "is_download_prompt_response": False,
        }
    )

    # Remember mode and ideas for follow-up flows (e.g. building a presentation).
    state["last_mode"] = mode
    if mode == "gerar_ideias":
        state["last_ideas_text"] = resp_text
    else:
        state.pop("last_ideas_text", None)

    persist()
def render_presentation_button():
    """Offer PPTX generation when the last answer was produced in ideas mode.

    The ideas text stored in the session is split into one idea per line,
    with leading/trailing bullet characters (``-``, ``•``) and spaces
    removed. Lines that are empty after cleaning (for example a ``---``
    rule) are discarded so no blank idea reaches the backend or becomes the
    title. The first idea, truncated to 80 characters, is used as the
    presentation title.
    """
    if st.session_state.get("last_mode") != "gerar_ideias":
        return

    ideas_text = st.session_state.get("last_ideas_text") or ""
    # Simple line split; adjust this parser if the ideas format changes.
    cleaned_lines = (line.strip("-• ").strip() for line in ideas_text.splitlines())
    ideas = [idea for idea in cleaned_lines if idea]
    if not ideas:
        return

    with st.expander("📑 Gerar apresentação em PowerPoint a partir destas ideias"):
        title = ideas[0][:80]

        st.markdown(
            "<p style='margin-bottom: 4px; font-size: 0.9rem;'>Título da apresentação</p>",
            unsafe_allow_html=True,
        )
        title_col, _ = st.columns([3, 1])
        with title_col:
            # Shown as static text (not an input) so it is not mistaken for
            # a chat field.
            st.markdown(
                f"<div class='ppt-title-display'>{html.escape(title)}</div>",
                unsafe_allow_html=True,
            )

        st.markdown("<div style='height: 8px;'></div>", unsafe_allow_html=True)
        _, btn_col, _ = st.columns([2, 3, 2])
        with btn_col:
            st.markdown('<div class="suggestion-btn">', unsafe_allow_html=True)
            gerar = st.button(
                "🎞️ Gerar apresentação em PPTX",
                key="btn_generate_pptx",
                use_container_width=True,
            )
            st.markdown('</div>', unsafe_allow_html=True)

        # NOTE(review): source indentation was lost; this assumes the result
        # is rendered at expander level rather than inside the button column.
        if gerar:
            with st.spinner("Gerando apresentação..."):
                ok, msg, content = generate_presentation(ideas, title=title)
            if not ok or content is None:
                st.error(msg)
            else:
                st.success(msg)
                st.download_button(
                    label="⬇️ Baixar apresentação em PowerPoint",
                    data=content,
                    file_name=f"{title or 'apresentacao'}.pptx",
                    mime="application/vnd.openxmlformats-officedocument.presentationml.presentation",
                    type="primary",
                    use_container_width=True,
                )