import os import html import base64 from datetime import datetime from typing import Tuple, List, Dict, Any, Optional import pandas as pd import streamlit as st from utils import base_utils as bu from utils.conversation_word_export import build_conversation_docx, build_single_response_docx # Ruta base relativa a este archivo (para localizar styles.css) ROOT = os.path.dirname(__file__) CONFIG = bu.load_config("configs/config.json") API_URL = CONFIG.get("ui", {}).get("api_url", "http://127.0.0.1:8000/query") BASE_API_URL = API_URL.rsplit("/", 1)[0] UPLOAD_URL = BASE_API_URL + "/upload_document" PRECHECK_URL = BASE_API_URL + "/precheck_document" PRESENTATION_URL = BASE_API_URL + "/generate_presentation" # ========================================================= # API helpers # ========================================================= def chamar_api(pergunta: str, mode: str = "chatbot") -> Tuple[str, List[Dict[str, Any]]]: """Chama a API apenas com a pergunta e o modo.""" import requests try: payload = {"question": pergunta, "mode": mode} resp = requests.post(API_URL, json=payload, timeout=60) resp.raise_for_status() data = resp.json() return data["answer"], data.get("retrieved", []) except Exception: return "Resposta simulada (API não conectada).", [] def upload_document( filename: str, file_bytes: bytes, mime_type: str = "text/markdown" ) -> Tuple[bool, str, Optional[str]]: """Envia um arquivo para upload/indexação no backend.""" import requests try: files = {"file": (filename, file_bytes, mime_type or "text/markdown")} resp = requests.post(UPLOAD_URL, files=files, timeout=180) data = resp.json() if resp.content else {} if resp.status_code >= 400: detail = data.get("detail") if isinstance(data, dict) else None return False, detail or "Falha no upload/indexação.", None return ( True, data.get("message", "Documento indexado com sucesso."), data.get("document_id"), ) except Exception as exc: return False, f"Erro de conexão com API: {exc}", None def precheck_document( filename: str, file_bytes: bytes, mime_type: str = "text/markdown" ) -> Tuple[bool, Dict[str, Any], str]: """Avalia se o arquivo está no escopo temático antes de indexar.""" import requests try: files = {"file": (filename, file_bytes, mime_type or "text/markdown")} resp = requests.post(PRECHECK_URL, files=files, timeout=60) data = resp.json() if resp.content else {} if resp.status_code >= 400: detail = data.get("detail") if isinstance(data, dict) else None return False, {}, detail or "Falha no pré-chequeo do documento." if not isinstance(data, dict): return False, {}, "Resposta inválida no pré-chequeo do documento." return True, data, "" except Exception as exc: return False, {}, f"Erro de conexão no pré-chequeo: {exc}" def generate_presentation(ideas: List[str], title: str | None = None) -> Tuple[bool, str, bytes | None]: """Chama o endpoint de geração de apresentação e devolve o binário do PPTX. Retorna (ok, mensagem, conteúdo_bytes_ou_None). """ import requests try: payload: Dict[str, Any] = { "ideas": ideas, "title": title, } resp = requests.post(PRESENTATION_URL, json=payload, timeout=180) if resp.status_code >= 400: try: data = resp.json() detail = data.get("detail") if isinstance(data, dict) else None except Exception: detail = None return False, detail or "Falha ao gerar apresentação.", None return True, "Apresentação gerada com sucesso.", resp.content except Exception as exc: return False, f"Erro de conexão com API: {exc}", None # ========================================================= # Texto y tablas # ========================================================= def normalize_message_text(text: str) -> str: import re lines = text.splitlines() result, blank_count = [], 0 is_list_line = lambda l: bool(re.match(r"^\s*(\d+\.|[-*•])\s+", l)) i = 0 while i < len(lines): line = lines[i] if line.strip() == "": blank_count += 1 i += 1 continue if re.match(r"^\s*\d+\.\s*$", line): j = i + 1 while j < len(lines) and lines[j].strip() == "": j += 1 if j < len(lines): line = f"{line.strip()} {lines[j].strip()}" i = j if blank_count > 0 and result: if not (is_list_line(result[-1]) and is_list_line(line)): result.append("") blank_count = 0 result.append(line) i += 1 cleaned = "\n".join(result) return re.sub(r"\n\s*\n\s*\n+", "\n\n", cleaned).strip() def render_markdown_table_if_exists(text: str): import re lines = text.splitlines() start, end = None, None is_md_separator = lambda l: "|" in l.strip() and re.fullmatch(r"[\s\-|:]+", l.strip()) and "-" in l.strip() for i, line in enumerate(lines): if "|" in line: for j in range(i + 1, min(i + 3, len(lines))): if is_md_separator(lines[j]): start = i break if start is not None: end = start for k in range(start, len(lines)): if "|" in lines[k] or is_md_separator(lines[k]): end = k else: break break if start is None or end is None: return None, text table_lines = lines[start : end + 1] header, rows = None, [] for line in table_lines: if is_md_separator(line): continue cells = [c.strip() for c in line.strip().strip("|").split("|")] if header is None: header = cells else: rows.append(cells) if header is None: return None, text max_cols = max([len(header)] + [len(r) for r in rows]) if rows else len(header) header += [""] * (max_cols - len(header)) norm_rows = [r + [""] * (max_cols - len(r)) for r in rows] try: df = pd.DataFrame(norm_rows, columns=header) except Exception: return None, text rest_text = normalize_message_text("\n".join(lines[:start] + lines[end + 1 :])) return df, rest_text def render_message_markdown_html(text: str) -> str: import re lines = (text or "").splitlines() parts = [] for line in lines: raw = line.strip() if not raw: parts.append("
") continue h3 = re.match(r"^###\s+(.*)$", raw) h2 = re.match(r"^##\s+(.*)$", raw) h1 = re.match(r"^#\s+(.*)$", raw) if h3: parts.append(f"

{html.escape(h3.group(1))}

") elif h2: parts.append(f"

{html.escape(h2.group(1))}

") elif h1: parts.append(f"

{html.escape(h1.group(1))}

") else: parts.append(html.escape(raw)) rendered = "
".join(parts) return re.sub(r"(?:
\s*){3,}", "

", rendered) # ========================================================= # Estilos (CSS) # ========================================================= THEMES = { "dark": { "bg": "#1e1e1e", "panel": "#2b2d31", "input_bg": "#303134", "text_primary": "#ffffff", "muted": "#9ca3af", "accent": "#f31260", "chip_bg": "#2b2d31", "chip_border": "#3b3f46", "chip_hover": "#343740", "sidebar_bg": "#171717", "hover_text": "#ffffff", }, "light": { "bg": "#f5f5f7", "panel": "#ffffff", "input_bg": "#f0f1f3", "text_primary": "#111827", "muted": "#6b7280", "accent": "#f31260", "chip_bg": "#eef0f4", "chip_border": "#d1d5db", "chip_hover": "#e5e7eb", "sidebar_bg": "#ffffff", "hover_text": "#111827", }, } def apply_theme_and_css(): """Aplica tema (variables CSS) y carga el archivo styles.css del frontend.""" if "theme" not in st.session_state: st.session_state["theme"] = "dark" theme_name = st.session_state["theme"] if st.session_state["theme"] in THEMES else "dark" colors = THEMES[theme_name] # 1) Definir variables CSS en :root css_vars = "\n".join(f"--{k}: {v};" for k, v in colors.items()) st.markdown( f""" """, unsafe_allow_html=True, ) # 2) Inyectar CSS estático desde styles.css (misma carpeta) css_path = os.path.join(ROOT, "styles.css") try: with open(css_path, "r", encoding="utf-8") as f: styles = f.read() st.markdown(f"", unsafe_allow_html=True) except FileNotFoundError: # En caso de que el archivo no exista, no rompemos la app. pass return colors # ========================================================= # UI Components # ========================================================= def reset_to_home(): st.session_state.messages = [] st.session_state.current_conversation_id = None st.session_state.pending_question = "" st.session_state.pending_suggestion_question = "" st.session_state.pending_chat_submission = None st.session_state.pending_upload_confirmation = None st.session_state.pop("last_mode", None) st.session_state.pop("last_ideas_text", None) def render_sidebar(): with st.sidebar: st.title("🗂️ Conversas") if st.button("➕ Nova Conversa", use_container_width=True, on_click=reset_to_home): st.rerun() st.markdown("---") if st.session_state.conversations: for cid, cdata in reversed(list(st.session_state.conversations.items())): if st.button(cdata["title"], key=f"hist_{cid}", use_container_width=True): st.session_state.messages = cdata["messages"] st.session_state.current_conversation_id = cid st.rerun() def render_header(colors): """Renderiza o cabeçalho. - Tela inicial (sem mensagens): título grande central. - Conversa em andamento: versão compacta no canto superior esquerdo. """ has_messages = bool(st.session_state.get("messages")) # Portada: manter o layout antigo com título centralizado if not has_messages: col_spacer, col_theme = st.columns([10, 2]) with col_theme: c_text, c_btn = st.columns([2, 1]) with c_text: st.markdown( f"
Tema
", unsafe_allow_html=True, ) with c_btn: if st.button("☀️" if st.session_state.theme == "dark" else "🌙", key="theme_toggle"): st.session_state.theme = "light" if st.session_state.theme == "dark" else "dark" st.rerun() st.markdown("

⚛ Chatbot NORM

", unsafe_allow_html=True) st.markdown( "

Assistente Especializado em Química e NORM


", unsafe_allow_html=True, ) return # Conversa em andamento: cabeçalho compacto à esquerda col_title, col_spacer, col_theme = st.columns([4, 6, 2]) with col_title: st.button( "Chatbot NORM", key="header_home", icon="⚛", type="tertiary", width="content", help="Voltar à tela inicial", on_click=reset_to_home, ) with col_theme: c_text, c_btn = st.columns([2, 1]) with c_text: st.markdown( f"
Tema
", unsafe_allow_html=True, ) with c_btn: if st.button("☀️" if st.session_state.theme == "dark" else "🌙", key="theme_toggle"): st.session_state.theme = "light" if st.session_state.theme == "dark" else "dark" st.rerun() def render_chat_history(): """Renders the chat history using custom HTML bubbles for a real chat app look.""" messages = st.session_state.messages def render_conversation_download_button(message_idx: int): try: docx_data = build_conversation_docx(messages) created_at = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"conversacao_chatbot_norm_{created_at}.docx" left_spacer, button_col, right_spacer = st.columns([1, 4, 10]) with button_col: st.download_button( label="⬇️ Baixar conversa", data=docx_data, file_name=filename, mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document", key=f"download_chat_docx_{st.session_state.current_conversation_id}_{message_idx}", ) except Exception: st.caption("Não foi possível gerar o documento Word da conversa.") last_assistant_index = None for rev_idx in range(len(messages) - 1, -1, -1): if messages[rev_idx].get("role") == "Assistente": last_assistant_index = rev_idx break for idx, msg in enumerate(messages): is_user = msg.get("role") == "Você" avatar = "🗣️" if is_user else "⚛" if is_user: user_text_html = html.escape(normalize_message_text(msg.get("content", ""))).replace("\n", "
") st.markdown( f"""
{avatar}
{user_text_html}
""", unsafe_allow_html=True, ) else: df, rest_text = render_markdown_table_if_exists(msg.get("content", "")) rest_text_html = ( render_message_markdown_html(normalize_message_text(rest_text)) if rest_text else "" ) download_inline_html = "" if msg.get("show_last_response_download_button"): try: target_idx = msg.get("download_target_index") target_msg = None if isinstance(target_idx, int) and 0 <= target_idx < len(messages): target_msg = messages[target_idx] if target_msg is not None: docx_data = build_single_response_docx(target_msg) created_at = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"ultima_resposta_chatbot_{created_at}.docx" b64_data = base64.b64encode(docx_data).decode("utf-8") safe_filename = html.escape(filename, quote=True) download_inline_html = ( "
" f"Baixar arquivo" "
" ) except Exception: st.caption("Não foi possível gerar o documento Word da última resposta.") refs_html = ( f"
📚 {msg['references']}
" if msg.get("references") else "" ) assistant_bubble_html = ( f"
" f"
{avatar}
" f"
" f"{rest_text_html}" f"{refs_html}" f"{download_inline_html}" "
" "
" ) st.markdown(assistant_bubble_html, unsafe_allow_html=True) if df is not None: left_indent, table_col = st.columns([1, 7]) with table_col: st.table(df) if last_assistant_index is not None and idx == last_assistant_index: render_conversation_download_button(idx) def _queue_suggestion_question(text: str): st.session_state.pending_suggestion_question = text def render_suggestions(colors): if not st.session_state.messages: st.markdown( f"
Sugestões
", unsafe_allow_html=True, ) _, s_container, _ = st.columns([1, 4, 1]) with s_container: cols = st.columns(4) suggestions = [ "Liste os documentos indexados", "O que é NORM?", "Principais isótopos radioativos", "Resumo sobre Césio-137", ] for col, text in zip(cols, suggestions): with col: st.markdown('
', unsafe_allow_html=True) st.button( text, key=f"sugg_{text}", use_container_width=True, on_click=_queue_suggestion_question, args=(text,), ) st.markdown('
', unsafe_allow_html=True) st.markdown( """
👋 Olá! Como posso ajudá-lo hoje?
Faça perguntas sobre química, NORM ou solicite resumos.
""", unsafe_allow_html=True, ) # ========================================================= # Conversaciones y flujo # ========================================================= def formatar_referencias(fragmentos): import re refs_por_id = {} for m in fragmentos: cit_id = m.get("citation_id") if not cit_id or cit_id in refs_por_id: continue titulo = m.get("document_title") or "Documento" titulo = re.sub(r"\[\d+\]", "", titulo).strip().replace("_", " ").replace("-", " ") autores = m.get("document_authors") or [] pub_year = m.get("publication_year") pub_date = m.get("publication_date") partes_ref = [f"[{cit_id}] {titulo}"] if autores: partes_ref.append("Autores: " + "; ".join(autores)) if pub_year: partes_ref.append(f"Ano: {pub_year}") if pub_date: partes_ref.append(f"Data: {pub_date}") refs_por_id[cit_id] = " | ".join(partes_ref) partes = [refs_por_id[k] for k in sorted(refs_por_id.keys())] return "
".join(partes) def determine_mode(pergunta_lower: str) -> str: import re def contains_trigger(text: str, trigger: str) -> bool: trigger_clean = (trigger or "").strip().lower() if not text or not trigger_clean: return False pattern = re.escape(trigger_clean).replace(r"\ ", r"\s+") return re.search(rf"(? bool: return any(contains_trigger(text, trigger) for trigger in triggers) summary_keywords = ["resumo", "resumen", "summary"] table_keywords = ["tabela", "tabla", "table", "gera uma tabela", "gerar tabela"] doc_keywords = [ "documento", "documentos", "artigo", "artigos", "relatório", "relatorio", "norma", "normas", ] ideias_keywords = ["ideia", "ideias", "idea", "inovar"] summary_sections_kw = [ "introdução", "introducao", "introduccion", "introduction", "metodologia", "metodología", "methodology", "resultados", "results", ] section_summary_hints = ["seção", "secao", "secciones", "sections", "por seção", "por secao"] multi_doc_markers = [ "esses documentos", "documentos listados", "documentos acima", "documentos mostrados", "os documentos", "los documentos", "todos", "todas", "all", "ambos", "both", "os 3", "los 3", "3 documentos", "3 docs", ] has_summary = contains_any_trigger(pergunta_lower, summary_keywords) has_table = contains_any_trigger(pergunta_lower, table_keywords) has_section_target = contains_any_trigger(pergunta_lower, summary_sections_kw) has_section_hint = contains_any_trigger(pergunta_lower, section_summary_hints) has_doc_hint = contains_any_trigger(pergunta_lower, doc_keywords) has_multi = contains_any_trigger(pergunta_lower, multi_doc_markers) if contains_any_trigger(pergunta_lower, ideias_keywords): return "gerar_ideias" if has_table and has_multi: return "table_multi" if has_table: return "table" if has_summary and has_section_target and has_multi: return "summary_sections_multi" if has_summary and has_section_target: return "summary_sections" if has_section_target and has_section_hint: return "summary_sections" if has_summary and has_doc_hint: return "summary" return "chatbot" def is_download_request(question_lower: str) -> bool: download_terms = ["descargar", "download", "baixar"] target_terms = [ "resumen", "resumo", "informacion", "informação", "informacao", "informacion generada", "informação gerada", "informacao gerada", "conversacion", "conversação", "conversacao", "chat", "tabela", "table", "resposta", "respuesta", ] has_download_action = any(term in question_lower for term in download_terms) has_download_target = any(term in question_lower for term in target_terms) return has_download_action and has_download_target def find_last_assistant_response_index(messages: List[Dict[str, Any]]) -> Optional[int]: for idx in range(len(messages) - 1, -1, -1): msg = messages[idx] if msg.get("role") != "Assistente": continue if msg.get("is_download_prompt_response"): continue if msg.get("show_last_response_download_button"): continue return idx return None def process_user_question(question: str): # Se a pergunta já foi registrada no histórico e há um placeholder de # digitação logo depois, não duplicamos a mensagem do usuário. has_existing_user_message = ( bool(st.session_state.messages) and st.session_state.messages[-1].get("role") == "Você" and st.session_state.messages[-1].get("content") == question ) has_user_before_typing_placeholder = ( len(st.session_state.messages) >= 2 and st.session_state.messages[-1].get("is_typing") and st.session_state.messages[-2].get("role") == "Você" and st.session_state.messages[-2].get("content") == question ) if not (has_existing_user_message or has_user_before_typing_placeholder): st.session_state.messages.append({"role": "Você", "content": question}) if st.session_state.current_conversation_id is None: cid = datetime.now().strftime("%Y%m%d_%H%M%S") st.session_state.current_conversation_id = cid st.session_state.conversations[cid] = { "title": question[:30], "messages": [], "created_at": datetime.now().isoformat(), } if is_download_request(question.lower()): target_idx = find_last_assistant_response_index(st.session_state.messages[:-1]) has_target = target_idx is not None response_text = ( "Claro. Aqui está o arquivo da última resposta gerada." if has_target else "Ainda não há uma resposta anterior do chatbot para baixar." ) download_message = { "role": "Assistente", "content": response_text, "references": "", "show_last_response_download_button": has_target, "download_target_index": target_idx, "is_download_prompt_response": True, } if st.session_state.messages and st.session_state.messages[-1].get("is_typing"): st.session_state.messages[-1] = download_message else: st.session_state.messages.append(download_message) st.session_state.conversations[st.session_state.current_conversation_id][ "messages" ] = st.session_state.messages.copy() return # Mostrar o overlay "Analisando..." apenas na primeira resposta real mode = determine_mode(question.lower()) resp_text, fragments = chamar_api(question, mode=mode) refs_text = formatar_referencias(fragments) assistant_message = { "role": "Assistente", "content": resp_text, "references": refs_text, "is_download_prompt_response": False, } if st.session_state.messages and st.session_state.messages[-1].get("is_typing"): st.session_state.messages[-1] = assistant_message else: st.session_state.messages.append(assistant_message) # Guardar modo e ideias para fluxos posteriores (ex.: gerar apresentação) st.session_state["last_mode"] = mode if mode == "gerar_ideias": st.session_state["last_ideas_text"] = resp_text else: st.session_state.pop("last_ideas_text", None) st.session_state.conversations[st.session_state.current_conversation_id][ "messages" ] = st.session_state.messages.copy() def render_presentation_button(): """Mostra um botão para gerar apresentação quando a última resposta foi de ideias.""" if not st.session_state.get("last_mode") == "gerar_ideias": return ideas_text = st.session_state.get("last_ideas_text") or "" if not ideas_text.strip(): return # Simples split por quebras de linha; se o formato das ideias # mudar no futuro, podemos ajustar este parser. ideas = [line.strip("-• ").strip() for line in ideas_text.splitlines() if line.strip()] if not ideas: return with st.expander("📑 Gerar apresentação em PowerPoint a partir destas ideias"): default_title = ideas[0][:80] if ideas else "Apresentação" st.markdown( "

Título da apresentação

", unsafe_allow_html=True, ) title_col, _ = st.columns([3, 1]) with title_col: # Mostra o título sugerido como texto estático, não editável, # para não dar a impressão de ser um campo de chat. safe_title = html.escape(default_title) st.markdown( f"
{safe_title}
", unsafe_allow_html=True, ) title = default_title st.markdown("
", unsafe_allow_html=True) left_spacer, btn_col, right_spacer = st.columns([2, 3, 2]) with btn_col: st.markdown('
', unsafe_allow_html=True) gerar = st.button( "🎞️ Gerar apresentação em PPTX", key="btn_generate_pptx", use_container_width=True, ) st.markdown('
', unsafe_allow_html=True) if gerar: with st.spinner("Gerando apresentação..."): ok, msg, content = generate_presentation(ideas, title=title) if not ok or content is None: st.error(msg) else: st.success(msg) st.download_button( label="⬇️ Baixar apresentação em PowerPoint", data=content, file_name=f"{title or 'apresentacao'}.pptx", mime="application/vnd.openxmlformats-officedocument.presentationml.presentation", type="primary", use_container_width=True, )