import os
import html
import base64
from datetime import datetime
from typing import Tuple, List, Dict, Any, Optional
import pandas as pd
import streamlit as st
from utils import base_utils as bu
from utils.conversation_word_export import build_conversation_docx, build_single_response_docx
# Directory containing this file (used to locate styles.css).
ROOT = os.path.dirname(__file__)

# Project configuration, loaded once when the module is imported.
CONFIG = bu.load_config("configs/config.json")

# Query endpoint; the other endpoints share everything before its last
# path segment.
API_URL = CONFIG.get("ui", {}).get("api_url", "http://127.0.0.1:8000/query")
BASE_API_URL = API_URL.rsplit("/", 1)[0]
UPLOAD_URL = f"{BASE_API_URL}/upload_document"
PRECHECK_URL = f"{BASE_API_URL}/precheck_document"
PRESENTATION_URL = f"{BASE_API_URL}/generate_presentation"
# =========================================================
# API helpers
# =========================================================
def chamar_api(pergunta: str, mode: str = "chatbot") -> Tuple[str, List[Dict[str, Any]]]:
    """Send a question to the query endpoint and return the answer with its sources.

    Args:
        pergunta: Question text, sent as the ``question`` field.
        mode: Processing mode, sent as the ``mode`` field.

    Returns:
        ``(answer, retrieved)`` read from the JSON response; ``retrieved``
        defaults to an empty list when the key is absent. Any failure
        (connection problem, HTTP error status, invalid JSON, missing
        ``answer`` key) yields a fixed placeholder answer and an empty list
        instead of raising.
    """
    import requests

    try:
        response = requests.post(
            API_URL,
            json={"question": pergunta, "mode": mode},
            timeout=60,
        )
        response.raise_for_status()
        body = response.json()
        answer = body["answer"]
        retrieved = body.get("retrieved", [])
    except Exception:
        # Every failure degrades to the placeholder answer.
        return "Resposta simulada (API não conectada).", []
    return answer, retrieved
def upload_document(
    filename: str, file_bytes: bytes, mime_type: str = "text/markdown"
) -> Tuple[bool, str, Optional[str]]:
    """Send a file to the backend upload/indexing endpoint.

    Args:
        filename: Name reported to the backend for the uploaded file.
        file_bytes: Raw file content.
        mime_type: Content type of the file; a falsy value falls back to
            ``"text/markdown"``.

    Returns:
        ``(ok, message, document_id)``. ``document_id`` is ``None`` on any
        failure. On an HTTP error status the backend's ``detail`` field is
        used as the message when the body is a JSON object that provides one.
    """
    import requests

    try:
        files = {"file": (filename, file_bytes, mime_type or "text/markdown")}
        resp = requests.post(UPLOAD_URL, files=files, timeout=180)
    except Exception as exc:
        return False, f"Erro de conexão com API: {exc}", None

    # Parse the body separately so that a non-JSON payload (e.g. an HTML
    # error page) is not reported as a connection error.
    try:
        data = resp.json() if resp.content else {}
    except ValueError:
        data = None

    if resp.status_code >= 400:
        detail = data.get("detail") if isinstance(data, dict) else None
        return False, detail or "Falha no upload/indexação.", None

    # Same guard as precheck_document: a successful status with a body that
    # is not a JSON object cannot be interpreted.
    if not isinstance(data, dict):
        return False, "Resposta inválida no upload do documento.", None

    return (
        True,
        data.get("message", "Documento indexado com sucesso."),
        data.get("document_id"),
    )
def precheck_document(
    filename: str, file_bytes: bytes, mime_type: str = "text/markdown"
) -> Tuple[bool, Dict[str, Any], str]:
    """Ask the backend pre-check endpoint to evaluate a file before indexing.

    Args:
        filename: Name reported to the backend for the file.
        file_bytes: Raw file content.
        mime_type: Content type of the file; a falsy value falls back to
            ``"text/markdown"``.

    Returns:
        ``(ok, data, error)``. On success ``data`` is the JSON object returned
        by the backend and ``error`` is empty; on failure ``data`` is an empty
        dict and ``error`` carries the message.
    """
    import requests

    upload = {"file": (filename, file_bytes, mime_type or "text/markdown")}
    try:
        response = requests.post(PRECHECK_URL, files=upload, timeout=60)
        parsed = response.json() if response.content else {}
        is_mapping = isinstance(parsed, dict)
        if response.status_code >= 400:
            reason = parsed.get("detail") if is_mapping else None
            return False, {}, reason or "Falha no pré-chequeo do documento."
        if is_mapping:
            return True, parsed, ""
        return False, {}, "Resposta inválida no pré-chequeo do documento."
    except Exception as exc:
        return False, {}, f"Erro de conexão no pré-chequeo: {exc}"
def generate_presentation(ideas: List[str], title: str | None = None) -> Tuple[bool, str, bytes | None]:
    """Call the presentation endpoint and return the response body.

    Args:
        ideas: Idea strings, sent as the ``ideas`` field.
        title: Optional title, sent as the ``title`` field.

    Returns:
        ``(ok, message, content)`` where ``content`` is the raw response body
        on success and ``None`` otherwise.
    """
    import requests

    try:
        response = requests.post(
            PRESENTATION_URL,
            json={"ideas": ideas, "title": title},
            timeout=180,
        )
        if response.status_code < 400:
            return True, "Apresentação gerada com sucesso.", response.content

        # Error status: use the backend's "detail" when the body is a JSON object.
        reason = None
        try:
            error_body = response.json()
            if isinstance(error_body, dict):
                reason = error_body.get("detail")
        except Exception:
            reason = None
        return False, reason or "Falha ao gerar apresentação.", None
    except Exception as exc:
        return False, f"Erro de conexão com API: {exc}", None
# =========================================================
# Texto y tablas
# =========================================================
def normalize_message_text(text: str) -> str:
    """Tidy the blank lines and numbered-list markers of a message.

    * Runs of blank lines collapse to a single blank line.
    * A blank gap between two list lines (numbered or ``-``/``*``/``•``) is removed.
    * A line holding only a number marker such as ``1.`` is joined with the
      next non-blank line.
    * The result is stripped of leading and trailing whitespace.
    """
    import re

    list_item_re = re.compile(r"^\s*(\d+\.|[-*•])\s+")
    bare_number_re = re.compile(r"^\s*\d+\.\s*$")

    def _is_list_item(candidate: str) -> bool:
        return list_item_re.match(candidate) is not None

    source_lines = text.splitlines()
    total = len(source_lines)
    output = []
    pending_blank = False
    pos = 0
    while pos < total:
        current = source_lines[pos]
        pos += 1
        if not current.strip():
            pending_blank = True
            continue

        if bare_number_re.match(current):
            # Look past blank lines for the text that belongs to this marker.
            nxt = pos
            while nxt < total and not source_lines[nxt].strip():
                nxt += 1
            if nxt < total:
                current = f"{current.strip()} {source_lines[nxt].strip()}"
                pos = nxt + 1

        keeps_list_tight = bool(output) and _is_list_item(output[-1]) and _is_list_item(current)
        if pending_blank and output and not keeps_list_tight:
            output.append("")
        pending_blank = False
        output.append(current)

    collapsed = re.sub(r"\n\s*\n\s*\n+", "\n\n", "\n".join(output))
    return collapsed.strip()
def render_markdown_table_if_exists(text: str):
    """Extract the first Markdown table found in ``text``.

    A table starts at the first line containing ``|`` that has a separator
    row (only whitespace, ``-``, ``|`` and ``:``, with at least one ``-`` and
    one ``|``) within the next two lines, and extends over the following
    consecutive lines that contain ``|``.

    Returns:
        ``(df, rest_text)`` where ``df`` is a ``pandas.DataFrame`` built from
        the table and ``rest_text`` is the remaining text passed through
        ``normalize_message_text``; ``(None, text)`` when no table is found or
        the DataFrame cannot be built.
    """
    import re

    def _is_separator(candidate: str) -> bool:
        stripped = candidate.strip()
        return bool(
            "|" in stripped
            and re.fullmatch(r"[\s\-|:]+", stripped)
            and "-" in stripped
        )

    all_lines = text.splitlines()
    line_count = len(all_lines)

    # Locate the first line of the table.
    first = None
    for idx, candidate in enumerate(all_lines):
        if "|" not in candidate:
            continue
        lookahead = all_lines[idx + 1 : min(idx + 3, line_count)]
        if any(_is_separator(following) for following in lookahead):
            first = idx
            break
    if first is None:
        return None, text

    # Extend the table over the consecutive pipe-bearing lines.
    last = first
    for idx in range(first, line_count):
        if "|" not in all_lines[idx] and not _is_separator(all_lines[idx]):
            break
        last = idx

    # The first non-separator row is the header; the others are data rows.
    header = None
    body_rows = []
    for raw in all_lines[first : last + 1]:
        if _is_separator(raw):
            continue
        cells = [cell.strip() for cell in raw.strip().strip("|").split("|")]
        if header is None:
            header = cells
        else:
            body_rows.append(cells)
    if header is None:
        return None, text

    # Pad every row with empty cells up to the widest row.
    width = max(len(row) for row in [header, *body_rows])
    header = header + [""] * (width - len(header))
    padded_rows = [row + [""] * (width - len(row)) for row in body_rows]

    try:
        df = pd.DataFrame(padded_rows, columns=header)
    except Exception:
        return None, text

    remaining = all_lines[:first] + all_lines[last + 1 :]
    return df, normalize_message_text("\n".join(remaining))
# Convert message text into an HTML string, one source line at a time.
# A None/empty input is treated as an empty string. Every piece of message
# text that is emitted goes through html.escape first.
# NOTE(review): several string literals in this function are split across
# lines and appear to have lost their HTML markup in this copy of the file;
# confirm the exact fragments against the original before editing.
def render_message_markdown_html(text: str) -> str:
import re
lines = (text or "").splitlines()
parts = []
for line in lines:
raw = line.strip()
if not raw:
# Blank source line: emit a separator fragment instead of text.
parts.append("
")
continue
# Heading detection: one, two or three leading "#" followed by whitespace.
h3 = re.match(r"^###\s+(.*)$", raw)
h2 = re.match(r"^##\s+(.*)$", raw)
h1 = re.match(r"^#\s+(.*)$", raw)
if h3:
parts.append(f"
{html.escape(h3.group(1))}
")
elif h2:
parts.append(f"{html.escape(h2.group(1))}
")
elif h1:
parts.append(f"{html.escape(h1.group(1))}
")
else:
# Ordinary line: escaped verbatim.
parts.append(html.escape(raw))
rendered = "
".join(parts)
# Collapse runs of three or more consecutive separators.
return re.sub(r"(?:
\s*){3,}", "
", rendered)
# =========================================================
# Estilos (CSS)
# =========================================================
# Colour palettes keyed by theme name. Each entry name becomes a CSS custom
# property (``--<name>``) in apply_theme_and_css, so both themes expose the
# same keys in the same order.
THEMES = {
    "dark": dict(
        bg="#1e1e1e",
        panel="#2b2d31",
        input_bg="#303134",
        text_primary="#ffffff",
        muted="#9ca3af",
        accent="#f31260",
        chip_bg="#2b2d31",
        chip_border="#3b3f46",
        chip_hover="#343740",
        sidebar_bg="#171717",
        hover_text="#ffffff",
    ),
    "light": dict(
        bg="#f5f5f7",
        panel="#ffffff",
        input_bg="#f0f1f3",
        text_primary="#111827",
        muted="#6b7280",
        accent="#f31260",
        chip_bg="#eef0f4",
        chip_border="#d1d5db",
        chip_hover="#e5e7eb",
        sidebar_bg="#ffffff",
        hover_text="#111827",
    ),
}
def apply_theme_and_css():
    """Apply the active colour theme and inject the frontend stylesheet.

    The theme name is read from ``st.session_state["theme"]`` (initialised to
    ``"dark"`` when missing; unknown names also fall back to ``"dark"``).
    The palette is published as CSS custom properties on ``:root`` and the
    contents of ``styles.css`` (next to this file) are injected afterwards.
    A missing ``styles.css`` is ignored.

    Returns:
        The colour mapping of the active theme.
    """
    if "theme" not in st.session_state:
        st.session_state["theme"] = "dark"
    theme_name = st.session_state["theme"]
    if theme_name not in THEMES:
        theme_name = "dark"
    colors = THEMES[theme_name]

    # 1) Define the CSS variables on :root. The declarations must be wrapped
    #    in a <style> element, otherwise nothing reaches the page.
    css_vars = "\n".join(f"--{k}: {v};" for k, v in colors.items())
    st.markdown(
        f"<style>\n:root {{\n{css_vars}\n}}\n</style>",
        unsafe_allow_html=True,
    )

    # 2) Inject the static CSS from styles.css (same folder as this file).
    css_path = os.path.join(ROOT, "styles.css")
    try:
        with open(css_path, "r", encoding="utf-8") as f:
            styles = f.read()
    except FileNotFoundError:
        # The stylesheet is optional: do not break the app when it is absent.
        return colors
    st.markdown(f"<style>\n{styles}\n</style>", unsafe_allow_html=True)
    return colors
# =========================================================
# UI Components
# =========================================================
def reset_to_home():
    """Return the session to the landing state.

    Clears the current messages and conversation id, resets every pending
    input/confirmation slot, and drops the remembered mode and ideas text.
    """
    state = st.session_state
    state["messages"] = []
    state["current_conversation_id"] = None
    state["pending_question"] = ""
    state["pending_suggestion_question"] = ""
    state["pending_chat_submission"] = None
    state["pending_upload_confirmation"] = None
    for stale_key in ("last_mode", "last_ideas_text"):
        state.pop(stale_key, None)
def render_sidebar():
    """Render the sidebar: a "new conversation" button and the conversation list.

    Stored conversations are listed newest-inserted first; clicking one loads
    its messages into the session and triggers a rerun.
    """
    with st.sidebar:
        st.title("🗂️ Conversas")
        new_chat_clicked = st.button(
            "➕ Nova Conversa",
            use_container_width=True,
            on_click=reset_to_home,
        )
        if new_chat_clicked:
            st.rerun()

        st.markdown("---")

        conversations = st.session_state.conversations
        if not conversations:
            return
        newest_first = list(conversations.items())[::-1]
        for conv_id, conv in newest_first:
            clicked = st.button(conv["title"], key=f"hist_{conv_id}", use_container_width=True)
            if not clicked:
                continue
            st.session_state.messages = conv["messages"]
            st.session_state.current_conversation_id = conv_id
            st.rerun()
# NOTE(review): the markup strings passed to st.markdown in this function are
# split across lines and appear to have lost their HTML in this copy of the
# file; `colors` is not referenced in the visible code and was presumably used
# inside that markup — confirm against the original.
def render_header(colors):
"""Render the header.
- Landing screen (no messages): large centered title.
- Ongoing conversation: compact version in the top-left corner.
"""
has_messages = bool(st.session_state.get("messages"))
# Landing page: keep the previous layout with the centered title.
if not has_messages:
col_spacer, col_theme = st.columns([10, 2])
with col_theme:
c_text, c_btn = st.columns([2, 1])
with c_text:
st.markdown(
f"Tema
",
unsafe_allow_html=True,
)
with c_btn:
# The button shows the icon of the theme it switches to, then reruns.
if st.button("☀️" if st.session_state.theme == "dark" else "🌙", key="theme_toggle"):
st.session_state.theme = "light" if st.session_state.theme == "dark" else "dark"
st.rerun()
st.markdown("⚛ Chatbot NORM
", unsafe_allow_html=True)
st.markdown(
"Assistente Especializado em Química e NORM
",
unsafe_allow_html=True,
)
return
# Ongoing conversation: compact header on the left.
col_title, col_spacer, col_theme = st.columns([4, 6, 2])
with col_title:
# Clicking the title runs reset_to_home as the button callback.
st.button(
"Chatbot NORM",
key="header_home",
icon="⚛",
type="tertiary",
width="content",
help="Voltar à tela inicial",
on_click=reset_to_home,
)
with col_theme:
# Same theme toggle as on the landing screen (same widget key).
c_text, c_btn = st.columns([2, 1])
with c_text:
st.markdown(
f"Tema
",
unsafe_allow_html=True,
)
with c_btn:
if st.button("☀️" if st.session_state.theme == "dark" else "🌙", key="theme_toggle"):
st.session_state.theme = "light" if st.session_state.theme == "dark" else "dark"
st.rerun()
# NOTE(review): the HTML fragments built in this function are split across
# lines and appear to have lost their markup in this copy of the file (for
# example, b64_data and safe_filename are computed but not used in the visible
# code); confirm the exact strings against the original before editing.
def render_chat_history():
"""Renders the chat history using custom HTML bubbles for a real chat app look."""
messages = st.session_state.messages
# Helper: builds a Word document of the whole conversation and shows a
# download button for it; falls back to a caption when generation fails.
def render_conversation_download_button(message_idx: int):
try:
docx_data = build_conversation_docx(messages)
created_at = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"conversacao_chatbot_norm_{created_at}.docx"
left_spacer, button_col, right_spacer = st.columns([1, 4, 10])
with button_col:
st.download_button(
label="⬇️ Baixar conversa",
data=docx_data,
file_name=filename,
mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
# Key combines the conversation id and the message index.
key=f"download_chat_docx_{st.session_state.current_conversation_id}_{message_idx}",
)
except Exception:
st.caption("Não foi possível gerar o documento Word da conversa.")
# Index of the last message whose role is "Assistente" (None when absent).
last_assistant_index = None
for rev_idx in range(len(messages) - 1, -1, -1):
if messages[rev_idx].get("role") == "Assistente":
last_assistant_index = rev_idx
break
for idx, msg in enumerate(messages):
is_user = msg.get("role") == "Você"
avatar = "🗣️" if is_user else "⚛"
if is_user:
# User text is normalized, HTML-escaped, then its newlines are replaced.
user_text_html = html.escape(normalize_message_text(msg.get("content", ""))).replace("\n", "
")
st.markdown(
f"""
{avatar}
{user_text_html}
""",
unsafe_allow_html=True,
)
else:
# Non-user message: pull out the first Markdown table (if any) and
# render the remaining text as HTML.
df, rest_text = render_markdown_table_if_exists(msg.get("content", ""))
rest_text_html = (
render_message_markdown_html(normalize_message_text(rest_text)) if rest_text else ""
)
download_inline_html = ""
if msg.get("show_last_response_download_button"):
try:
# Only an in-range integer index selects a target message.
target_idx = msg.get("download_target_index")
target_msg = None
if isinstance(target_idx, int) and 0 <= target_idx < len(messages):
target_msg = messages[target_idx]
if target_msg is not None:
docx_data = build_single_response_docx(target_msg)
created_at = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"ultima_resposta_chatbot_{created_at}.docx"
b64_data = base64.b64encode(docx_data).decode("utf-8")
safe_filename = html.escape(filename, quote=True)
download_inline_html = (
""
)
except Exception:
st.caption("Não foi possível gerar o documento Word da última resposta.")
# NOTE(review): msg['references'] is interpolated without escaping here.
refs_html = (
f"📚 {msg['references']}
"
if msg.get("references")
else ""
)
assistant_bubble_html = (
f""
f"
{avatar}
"
f"
"
f"{rest_text_html}"
f"{refs_html}"
f"{download_inline_html}"
"
"
"
"
)
st.markdown(assistant_bubble_html, unsafe_allow_html=True)
# The extracted table is rendered separately, in the wider column.
if df is not None:
left_indent, table_col = st.columns([1, 7])
with table_col:
st.table(df)
# The whole-conversation download button follows the last assistant message.
if last_assistant_index is not None and idx == last_assistant_index:
render_conversation_download_button(idx)
def _queue_suggestion_question(text: str):
    """Button callback: store the clicked suggestion as the pending question."""
    st.session_state["pending_suggestion_question"] = text
# Shown only while the conversation has no messages: a "Sugestões" heading,
# four suggestion buttons and a greeting text.
# NOTE(review): the markup strings passed to st.markdown below are split
# across lines or empty and appear to have lost their HTML in this copy of the
# file; `colors` is not referenced in the visible code — confirm against the
# original.
def render_suggestions(colors):
if not st.session_state.messages:
st.markdown(
f"Sugestões
",
unsafe_allow_html=True,
)
_, s_container, _ = st.columns([1, 4, 1])
with s_container:
cols = st.columns(4)
suggestions = [
"Liste os documentos indexados",
"O que é NORM?",
"Principais isótopos radioativos",
"Resumo sobre Césio-137",
]
# One button per column; a click queues the text through the callback.
for col, text in zip(cols, suggestions):
with col:
st.markdown('', unsafe_allow_html=True)
st.button(
text,
key=f"sugg_{text}",
use_container_width=True,
on_click=_queue_suggestion_question,
args=(text,),
)
st.markdown('
', unsafe_allow_html=True)
st.markdown(
"""
👋 Olá! Como posso ajudá-lo hoje?
Faça perguntas sobre química, NORM ou solicite resumos.
""",
unsafe_allow_html=True,
)
# =========================================================
# Conversaciones y flujo
# =========================================================
# Build the references text for a list of retrieved fragments: one entry per
# distinct citation id, ordered by id, each formatted as
# "[id] title | Autores: ... | Ano: ... | Data: ..." (optional parts are
# included only when present).
# NOTE(review): the separator literal in the final join is split across lines
# in this copy of the file and was presumably an HTML line break — confirm.
def formatar_referencias(fragmentos):
import re
refs_por_id = {}
for m in fragmentos:
cit_id = m.get("citation_id")
# Skip fragments without a (truthy) citation id and ids already seen;
# the first fragment seen for an id provides its metadata.
if not cit_id or cit_id in refs_por_id:
continue
titulo = m.get("document_title") or "Documento"
# Drop "[n]" markers and turn underscores/hyphens into spaces.
titulo = re.sub(r"\[\d+\]", "", titulo).strip().replace("_", " ").replace("-", " ")
autores = m.get("document_authors") or []
pub_year = m.get("publication_year")
pub_date = m.get("publication_date")
partes_ref = [f"[{cit_id}] {titulo}"]
if autores:
# assumes document_authors is a list of strings — TODO confirm
partes_ref.append("Autores: " + "; ".join(autores))
if pub_year:
partes_ref.append(f"Ano: {pub_year}")
if pub_date:
partes_ref.append(f"Data: {pub_date}")
refs_por_id[cit_id] = " | ".join(partes_ref)
# NOTE(review): sorted() requires all citation ids to be mutually comparable.
partes = [refs_por_id[k] for k in sorted(refs_por_id.keys())]
return "
".join(partes)
# Classify an (already lower-cased) question into a mode string based on
# keyword triggers. Checks run in priority order: ideas, table, section
# summaries, document summary; anything else is "chatbot".
def determine_mode(pergunta_lower: str) -> str:
import re
def contains_trigger(text: str, trigger: str) -> bool:
trigger_clean = (trigger or "").strip().lower()
if not text or not trigger_clean:
return False
# Escape the trigger and let each space match any run of whitespace.
pattern = re.escape(trigger_clean).replace(r"\ ", r"\s+")
# NOTE(review): the next line is truncated in this copy of the file: the rest
# of the regular expression and the header of `contains_any_trigger` (which is
# called further down) appear to be missing — restore from the original.
return re.search(rf"(? bool:
return any(contains_trigger(text, trigger) for trigger in triggers)
summary_keywords = ["resumo", "resumen", "summary"]
table_keywords = ["tabela", "tabla", "table", "gera uma tabela", "gerar tabela"]
doc_keywords = [
"documento",
"documentos",
"artigo",
"artigos",
"relatório",
"relatorio",
"norma",
"normas",
]
ideias_keywords = ["ideia", "ideias", "idea", "inovar"]
summary_sections_kw = [
"introdução",
"introducao",
"introduccion",
"introduction",
"metodologia",
"metodología",
"methodology",
"resultados",
"results",
]
section_summary_hints = ["seção", "secao", "secciones", "sections", "por seção", "por secao"]
multi_doc_markers = [
"esses documentos",
"documentos listados",
"documentos acima",
"documentos mostrados",
"os documentos",
"los documentos",
"todos",
"todas",
"all",
"ambos",
"both",
"os 3",
"los 3",
"3 documentos",
"3 docs",
]
has_summary = contains_any_trigger(pergunta_lower, summary_keywords)
has_table = contains_any_trigger(pergunta_lower, table_keywords)
has_section_target = contains_any_trigger(pergunta_lower, summary_sections_kw)
has_section_hint = contains_any_trigger(pergunta_lower, section_summary_hints)
has_doc_hint = contains_any_trigger(pergunta_lower, doc_keywords)
has_multi = contains_any_trigger(pergunta_lower, multi_doc_markers)
# Idea generation takes precedence over every other mode.
if contains_any_trigger(pergunta_lower, ideias_keywords):
return "gerar_ideias"
# Table requests come next; the multi-document variant is checked first.
if has_table and has_multi:
return "table_multi"
if has_table:
return "table"
# Section summaries: explicit summary + section name, or section name + hint.
if has_summary and has_section_target and has_multi:
return "summary_sections_multi"
if has_summary and has_section_target:
return "summary_sections"
if has_section_target and has_section_hint:
return "summary_sections"
# A plain summary needs a document-type word as well.
if has_summary and has_doc_hint:
return "summary"
return "chatbot"
def is_download_request(question_lower: str) -> bool:
    """Tell whether a lower-cased question asks to download generated content.

    The question must contain, as plain substrings, at least one download
    verb and at least one word naming what to download.
    """
    action_words = ("descargar", "download", "baixar")
    subject_words = (
        "resumen",
        "resumo",
        "informacion",
        "informação",
        "informacao",
        "informacion generada",
        "informação gerada",
        "informacao gerada",
        "conversacion",
        "conversação",
        "conversacao",
        "chat",
        "tabela",
        "table",
        "resposta",
        "respuesta",
    )

    def _mentions_any(words) -> bool:
        return any(word in question_lower for word in words)

    return _mentions_any(action_words) and _mentions_any(subject_words)
def find_last_assistant_response_index(messages: List[Dict[str, Any]]) -> Optional[int]:
    """Return the index of the most recent real assistant answer.

    Scans from the end for a message whose role is ``"Assistente"`` and that
    is neither flagged ``is_download_prompt_response`` nor
    ``show_last_response_download_button``. Returns ``None`` when there is none.
    """
    skip_flags = ("is_download_prompt_response", "show_last_response_download_button")
    position = len(messages)
    for message in reversed(messages):
        position -= 1
        if message.get("role") != "Assistente":
            continue
        if any(message.get(flag) for flag in skip_flags):
            continue
        return position
    return None
def process_user_question(question: str):
    """Record a user question, produce the assistant reply and save the conversation.

    Steps:
      1. Append the question to the history unless it is already the last
         message, or sits right before a trailing typing placeholder.
      2. Create a conversation entry when none is active.
      3. For a download request, answer with a message that points at the
         last real assistant answer (if any) and stop.
      4. Otherwise pick the mode, call the API, store the answer with its
         references, and remember the mode (and the ideas text for the
         ``gerar_ideias`` mode).
    """
    state = st.session_state

    def _commit_reply(reply: Dict[str, Any]) -> None:
        # A trailing typing placeholder is overwritten rather than kept.
        if state.messages and state.messages[-1].get("is_typing"):
            state.messages[-1] = reply
        else:
            state.messages.append(reply)

    def _persist_conversation() -> None:
        # Store a shallow copy of the history under the active conversation.
        state.conversations[state.current_conversation_id]["messages"] = state.messages.copy()

    # 1) Avoid duplicating a question that was already registered.
    already_last = (
        bool(state.messages)
        and state.messages[-1].get("role") == "Você"
        and state.messages[-1].get("content") == question
    )
    before_typing_placeholder = (
        len(state.messages) >= 2
        and state.messages[-1].get("is_typing")
        and state.messages[-2].get("role") == "Você"
        and state.messages[-2].get("content") == question
    )
    if not (already_last or before_typing_placeholder):
        state.messages.append({"role": "Você", "content": question})

    # 2) Start a conversation entry, identified by a timestamp.
    if state.current_conversation_id is None:
        new_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        state.current_conversation_id = new_id
        state.conversations[new_id] = {
            "title": question[:30],
            "messages": [],
            "created_at": datetime.now().isoformat(),
        }

    # 3) Download requests are answered locally, without calling the API.
    if is_download_request(question.lower()):
        target_idx = find_last_assistant_response_index(state.messages[:-1])
        has_target = target_idx is not None
        if has_target:
            response_text = "Claro. Aqui está o arquivo da última resposta gerada."
        else:
            response_text = "Ainda não há uma resposta anterior do chatbot para baixar."
        _commit_reply(
            {
                "role": "Assistente",
                "content": response_text,
                "references": "",
                "show_last_response_download_button": has_target,
                "download_target_index": target_idx,
                "is_download_prompt_response": True,
            }
        )
        _persist_conversation()
        return

    # 4) Regular question: query the backend with the detected mode.
    mode = determine_mode(question.lower())
    resp_text, fragments = chamar_api(question, mode=mode)
    _commit_reply(
        {
            "role": "Assistente",
            "content": resp_text,
            "references": formatar_referencias(fragments),
            "is_download_prompt_response": False,
        }
    )

    # Keep the mode and ideas for later flows (e.g. generating a presentation).
    state["last_mode"] = mode
    if mode == "gerar_ideias":
        state["last_ideas_text"] = resp_text
    else:
        state.pop("last_ideas_text", None)

    _persist_conversation()
# NOTE(review): the markup strings passed to st.markdown in this function are
# split across lines or empty and appear to have lost their HTML in this copy
# of the file — confirm against the original before editing.
def render_presentation_button():
"""Show a button to generate a presentation when the last answer was an ideas answer."""
# Only offered when the last processed mode was "gerar_ideias" and the
# stored ideas text is non-blank.
if not st.session_state.get("last_mode") == "gerar_ideias":
return
ideas_text = st.session_state.get("last_ideas_text") or ""
if not ideas_text.strip():
return
# Simple split on line breaks; if the format of the ideas changes in the
# future, this parser can be adjusted.
ideas = [line.strip("-• ").strip() for line in ideas_text.splitlines() if line.strip()]
if not ideas:
return
with st.expander("📑 Gerar apresentação em PowerPoint a partir destas ideias"):
# The suggested title is the first idea, cut to 80 characters.
default_title = ideas[0][:80] if ideas else "Apresentação"
st.markdown(
"Título da apresentação
",
unsafe_allow_html=True,
)
title_col, _ = st.columns([3, 1])
with title_col:
# Show the suggested title as static, non-editable text so that it
# does not look like a chat input field.
safe_title = html.escape(default_title)
st.markdown(
f"{safe_title}
",
unsafe_allow_html=True,
)
title = default_title
st.markdown("", unsafe_allow_html=True)
left_spacer, btn_col, right_spacer = st.columns([2, 3, 2])
with btn_col:
st.markdown('', unsafe_allow_html=True)
gerar = st.button(
"🎞️ Gerar apresentação em PPTX",
key="btn_generate_pptx",
use_container_width=True,
)
st.markdown('
', unsafe_allow_html=True)
if gerar:
with st.spinner("Gerando apresentação..."):
ok, msg, content = generate_presentation(ideas, title=title)
if not ok or content is None:
st.error(msg)
else:
st.success(msg)
# NOTE(review): the download button is only created in the run where
# `gerar` is true, and the file name is built from the raw title —
# confirm both are intended.
st.download_button(
label="⬇️ Baixar apresentação em PowerPoint",
data=content,
file_name=f"{title or 'apresentacao'}.pptx",
mime="application/vnd.openxmlformats-officedocument.presentationml.presentation",
type="primary",
use_container_width=True,
)