testchat / app.py
vcollos's picture
testchgat
262692a verified
import html
import io
import re
import textwrap
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional, Tuple
import gradio as gr
import pandas as pd
# CSV read when the user supplies neither a path nor an upload.
DEFAULT_PATH = Path("base/wpp.csv")
# Inclusive bounds applied to the "messages per screen" setting.
MIN_PAGE_SIZE = 50
MAX_PAGE_SIZE = 2000
# Upper bound for each of the "context before/after" search settings.
MAX_CONTEXT = 5000
# Hard cap on how many messages are rendered around a focused message.
MAX_WINDOW = 5000
@dataclass(frozen=True)
class Columns:
    """Immutable record of the three column names the viewer works with."""

    name: str  # column holding the contact name
    kind: str  # column holding the message type
    content: str  # column holding the message text
def _normalize_columns(df: pd.DataFrame) -> Tuple[pd.DataFrame, Columns]:
    """Rename the contact/type/content columns to their canonical names.

    Headers are matched case-insensitively after trimming whitespace, using
    the aliases listed below. The three canonical columns of the returned
    frame always hold strings; missing values become empty strings rather
    than the literal text "nan"/"None".

    Args:
        df: Raw frame as read from CSV or Parquet.

    Returns:
        A renamed copy of ``df`` and a ``Columns`` record with the canonical
        names ("Nome", "Tipo", "Conteúdo").

    Raises:
        ValueError: If any of the three required columns cannot be found.
    """
    # Case-folded, trimmed header -> original label (last duplicate wins).
    by_folded = {str(label).strip().casefold(): label for label in df.columns}

    def pick(*candidates: str) -> Optional[Any]:
        for cand in candidates:
            key = cand.casefold()
            if key in by_folded:
                return by_folded[key]
        return None

    found = {
        "Nome": pick("Nome", "Name"),
        "Tipo": pick("Tipo", "Type"),
        "Conteúdo": pick("Conteúdo", "Conteudo", "Content", "Mensagem", "Message"),
    }
    missing = [canonical for canonical, label in found.items() if label is None]
    if missing:
        raise ValueError(f"Arquivo precisa das colunas {missing}. Colunas encontradas: {list(df.columns)}")

    # Rename with the original labels so headers carrying stray whitespace
    # (or non-string labels) are still matched.
    out = df.rename(columns={label: canonical for canonical, label in found.items()})

    def as_text(series: pd.Series) -> pd.Series:
        # Blank out missing values *before* stringifying; going through
        # object dtype keeps this safe for categorical columns as well.
        return series.astype(object).where(series.notna(), "").astype(str)

    out["Nome"] = as_text(out["Nome"]).str.strip()
    out["Tipo"] = as_text(out["Tipo"]).str.strip()
    out["Conteúdo"] = as_text(out["Conteúdo"])
    return out, Columns(name="Nome", kind="Tipo", content="Conteúdo")
def load_data_from_path(path: str) -> pd.DataFrame:
    """Read a CSV or Parquet file from disk and normalize its columns.

    Files whose suffix is ``.parquet`` (any letter case) are read as Parquet;
    everything else is read as CSV with delimiter sniffing and all values
    kept as strings.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    source = Path(path)
    if not source.exists():
        raise FileNotFoundError(f"Arquivo não encontrado: {source}")

    is_parquet = source.suffix.casefold() == ".parquet"
    if is_parquet:
        frame = pd.read_parquet(str(source))
    else:
        frame = pd.read_csv(str(source), sep=None, engine="python", dtype=str, keep_default_na=False)

    normalized, _columns = _normalize_columns(frame)
    return normalized
def load_data_from_upload(file: Any) -> pd.DataFrame:
    """Load an uploaded CSV/Parquet file, whatever shape the upload takes.

    Accepted inputs:
      * a ``str`` or ``Path`` naming a file on disk;
      * a file-like object exposing ``read()``;
      * an object exposing the payload as ``.value``;
      * an object whose ``.name`` attribute is a path to an existing file.

    Args:
        file: The value delivered by the upload component.

    Returns:
        The normalized data frame.

    Raises:
        ValueError: If nothing was uploaded or the content cannot be read.
    """
    if file is None:
        raise ValueError("Nenhum arquivo enviado.")

    if isinstance(file, (str, Path)):
        # The upload is itself a path: use it in full (``Path.name`` would
        # only give the basename).
        name = str(file)
    else:
        name = str(getattr(file, "name", "") or "")

    data: Any = None
    if hasattr(file, "read"):
        data = file.read()
    if data is None and hasattr(file, "value"):
        data = file.value

    if data is None:
        # No in-memory payload: fall back to the file on disk, if any.
        if name and Path(name).exists():
            return load_data_from_path(name)
        raise ValueError("Não consegui ler o conteúdo do upload.")

    if isinstance(data, str):
        # A handle opened in text mode yields str; BytesIO needs bytes.
        data = data.encode("utf-8")

    if Path(name).suffix.casefold() == ".parquet":
        df = pd.read_parquet(io.BytesIO(data))
    else:
        df = pd.read_csv(io.BytesIO(data), sep=None, engine="python", dtype=str, keep_default_na=False)
    df, _ = _normalize_columns(df)
    return df
def message_side(kind: str) -> str:
    """Map a message-type label to the chat side it is drawn on.

    Returns "sent" for the known outgoing labels (case and surrounding
    whitespace ignored) and "received" for everything else, including
    empty or unknown labels.
    """
    label = (kind or "").strip().casefold()
    # Anything not explicitly marked as outgoing is shown as received.
    return "sent" if label in ("enviada", "enviado", "sent") else "received"
def clamp_int(value: int, min_value: int, max_value: int) -> int:
    """Convert ``value`` to int and limit it to ``[min_value, max_value]``.

    When the bounds are inverted (``min_value > max_value``) the lower bound
    wins.
    """
    capped = min(int(value), max_value)
    if capped < min_value:
        return min_value
    return capped
def normalize_page_size(value: int) -> int:
    """Coerce ``value`` to an int within [MIN_PAGE_SIZE, MAX_PAGE_SIZE]."""
    requested = int(value)
    return clamp_int(requested, min_value=MIN_PAGE_SIZE, max_value=MAX_PAGE_SIZE)
def normalize_context(value: int) -> int:
    """Coerce ``value`` to an int within [0, MAX_CONTEXT]."""
    requested = int(value)
    return clamp_int(requested, min_value=0, max_value=MAX_CONTEXT)
def page_start_for_focus(focus: int, total: int, page_size: int) -> int:
    """Return the start index of a page roughly centered on ``focus``.

    The result is kept within ``[0, total - page_size]`` (never negative);
    an empty conversation always yields 0.
    """
    if total <= 0:
        return 0
    size = max(1, int(page_size))
    last_valid_start = max(0, total - size)
    # Put half a page before the focused message.
    return clamp_int(int(focus) - size // 2, 0, last_valid_start)
def build_chat_html(chat_slice: pd.DataFrame, query: str, focus_index: Optional[int], start_offset: int) -> str:
    """Render a slice of the conversation as messenger-style HTML.

    Args:
        chat_slice: Rows to render; the "Tipo" and "Conteúdo" columns are
            read (a missing column is treated as empty text).
        query: Literal text to highlight, case-insensitively. Falsy disables
            highlighting.
        focus_index: Global index of the message to outline, or None.
        start_offset: Global index of the first row of ``chat_slice``.

    Returns:
        A single HTML string with one ``msg-<global index>`` element per row.
    """
    pattern = re.compile(re.escape(query), flags=re.IGNORECASE) if query else None
    focused = None if focus_index is None else int(focus_index)
    first_index = int(start_offset)

    def render_text(raw: str) -> str:
        # Match against the RAW text and escape each piece separately.
        # Searching the escaped text would let a query such as "amp" split
        # an "&amp;" entity, and would miss queries containing & < > or
        # quotes altogether.
        if pattern is None:
            return html.escape(raw)
        pieces = []
        cursor = 0
        for hit in pattern.finditer(raw):
            pieces.append(html.escape(raw[cursor:hit.start()]))
            pieces.append(f"<mark>{html.escape(hit.group(0))}</mark>")
            cursor = hit.end()
        pieces.append(html.escape(raw[cursor:]))
        return "".join(pieces)

    row_count = len(chat_slice)
    columns = chat_slice.columns
    kinds = chat_slice["Tipo"].tolist() if "Tipo" in columns else [""] * row_count
    texts = chat_slice["Conteúdo"].tolist() if "Conteúdo" in columns else [""] * row_count

    parts = ['<div class="chat-wrap">']
    for global_i, (kind_value, text_value) in enumerate(zip(kinds, texts), start=first_index):
        kind = str(kind_value or "")
        side = message_side(kind)
        body = render_text(str(text_value or ""))
        bubble_style = "outline: 2px solid rgba(255, 153, 0, 0.55);" if global_i == focused else ""
        # Separate the message number from its type label.
        meta = f"msg {global_i + 1}"
        if kind:
            meta = f"{meta} • {html.escape(kind)}"
        # Assemble the markup line by line: no dedent/strip pass is run over
        # the interpolated message, so its own whitespace is left untouched.
        parts.append(
            "\n".join(
                (
                    f'<div id="msg-{global_i}" class="msg-row {side}">',
                    '<div class="bubble-wrap">',
                    f'<div class="bubble" style="{bubble_style}">{body}</div>',
                    f'<div class="meta">{meta}</div>',
                    "</div>",
                    "</div>",
                )
            )
        )
    parts.append("</div>")
    return "\n".join(parts)
def compute_matches(chat: pd.DataFrame, query: str) -> list[int]:
    """Return the row positions whose "Conteúdo" contains ``query``.

    The search is a case-insensitive literal substring match (no regex).
    Positions are 0-based offsets into ``chat`` and do not depend on the
    frame's index labels, so they can be used directly with ``iloc``.

    Args:
        chat: Conversation frame, or None.
        query: Text to look for; surrounding whitespace is ignored.

    Returns:
        Ascending list of matching positions; empty when there is no chat or
        no query.
    """
    needle = (query or "").strip()
    if chat is None or chat.empty or not needle:
        return []
    hits = chat["Conteúdo"].str.contains(needle, case=False, na=False, regex=False)
    # Use positions rather than index labels: the rest of the viewer slices
    # with iloc, and labels only coincide with positions on a default index.
    return hits.to_numpy(dtype=bool).nonzero()[0].tolist()
def render_view(
    chat: Optional[pd.DataFrame],
    query: str,
    matches: list[int],
    match_pos: int,
    page_start: int,
    page_size: int,
    focus: Optional[int],
    context_before: int,
    context_after: int,
) -> Tuple[str, str, list[int], int, int, int, Optional[int]]:
    """Compute the visible window of the conversation and render it.

    With a ``focus`` message the window spans ``context_before`` messages
    before it and ``context_after`` after it, capped at MAX_WINDOW messages.
    Without one, the window is the page starting at ``page_start``.

    Args:
        chat: Conversation to show; None or empty yields a placeholder.
        query: Text to highlight; None is treated as no query.
        matches: Positions of the search hits.
        match_pos: Index into ``matches`` of the current hit.
        page_start: First message of the current page.
        page_size: Messages per page (clamped to the allowed range).
        focus: Position of the message to center on, or None.
        context_before: Messages shown before the focused one.
        context_after: Messages shown after the focused one.

    Returns:
        ``(html, info, matches, match_pos, page_start, page_size, focus)``
        with every numeric value normalized. When a focus window is shown,
        ``page_start`` is moved to that window so later paging continues
        from what is on screen.
    """
    if chat is None or chat.empty:
        return (
            "<div class='empty'>Carregue um arquivo e selecione um contato.</div>",
            "Sem conversa carregada.",
            [],
            0,
            0,
            int(page_size),
            None,
        )

    query = query or ""  # tolerate None coming from an unset state
    total = int(len(chat))
    page_size = normalize_page_size(int(page_size))
    max_start = max(0, total - page_size)
    page_start = clamp_int(int(page_start), 0, max_start)

    focus_index = focus
    if focus_index is not None:
        focus_index = clamp_int(int(focus_index), 0, max(0, total - 1))
        context_before = normalize_context(int(context_before))
        context_after = normalize_context(int(context_after))
        start = max(0, focus_index - context_before)
        end = min(total, focus_index + context_after + 1)
        # Hard cap to avoid DOM explosion on huge chats.
        if (end - start) > MAX_WINDOW:
            start = page_start_for_focus(focus_index, total=total, page_size=MAX_WINDOW)
            end = min(total, start + MAX_WINDOW)
        # Keep the paging cursor on the displayed window; otherwise the next
        # "page" action would jump from wherever the user was before.
        page_start = clamp_int(start, 0, max_start)
    else:
        start = page_start
        end = min(total, start + page_size)

    chat_html = build_chat_html(chat.iloc[start:end], query=query, focus_index=focus_index, start_offset=start)

    occ = ""
    if query.strip():
        if matches:
            match_pos = clamp_int(int(match_pos), 0, len(matches) - 1)
            occ = f"{match_pos + 1}/{len(matches)} ocorrência(s)"
        else:
            match_pos = 0
            occ = "0 ocorrência(s)"

    # An explicit range separator: without it the two numbers fuse together.
    info = f"Mostrando msgs {start + 1}–{end} de {total} (janela {end - start}). {occ}".strip()
    return chat_html, info, matches, int(match_pos), int(page_start), int(page_size), focus_index
def on_load(
    path: str, upload: Any
) -> Tuple[pd.DataFrame, Any, pd.DataFrame, str, list[int], int, int, int, Optional[int], str, str, str, str]:
    """Load the data source and show the last page of the first contact.

    An upload takes precedence over ``path``; a blank path falls back to
    DEFAULT_PATH. The returned tuple follows the order of the outputs wired
    to the "load" button: full frame, contact dropdown update, selected
    chat, chat HTML, matches, match position, page start, page size, focus,
    info text, source description, search box text, committed query.

    Raises:
        ValueError: If the data contains no non-blank contact name.
    """
    if upload is None:
        chosen_path = (path or "").strip() or str(DEFAULT_PATH)
        df = load_data_from_path(chosen_path)
        source_desc = f"Arquivo: {chosen_path}"
    else:
        df = load_data_from_upload(upload)
        source_desc = f"Upload: {getattr(upload, 'name', '')}"

    candidates = df["Nome"].dropna().unique().tolist()
    names = sorted(n for n in candidates if str(n).strip() != "")
    if not names:
        raise ValueError("Não encontrei nenhum valor em `Nome`.")

    selected = names[0]
    chat = df[df["Nome"] == selected].reset_index(drop=True)

    # Start on the most recent messages with a fresh (empty) search state.
    page_size = 200
    page_start = max(0, int(len(chat)) - page_size)
    matches: list[int] = []
    match_pos = 0
    focus = None
    rendered = render_view(chat, "", matches, match_pos, page_start, page_size, focus, 20, 200)
    html_chat, info = rendered[0], rendered[1]

    return (
        df,
        gr.update(choices=names, value=selected),
        chat,
        html_chat,
        matches,
        match_pos,
        page_start,
        page_size,
        focus,
        info,
        source_desc,
        "",
        "",
    )
def on_select_contact(df: pd.DataFrame, name: str, page_size: int) -> Tuple[pd.DataFrame, str, list[int], int, int, int, Optional[int], str]:
    """Switch to the conversation of ``name`` and show its last page.

    Returns ``(chat, html, matches, match_pos, page_start, page_size, focus,
    info)``; search state is reset. With no data loaded, a placeholder view
    is returned and ``chat`` is None.
    """
    if df is None or df.empty:
        placeholder = "<div class='empty'>Carregue um arquivo primeiro.</div>"
        return None, placeholder, [], 0, 0, int(page_size), None, "Sem dados."

    belongs_to_contact = df["Nome"] == name
    chat = df[belongs_to_contact].reset_index(drop=True)
    size = normalize_page_size(int(page_size))
    last_page_start = max(0, int(len(chat)) - size)

    html_chat, info, matches, match_pos, page_start, size, focus = render_view(
        chat, "", [], 0, last_page_start, size, None, 20, 200
    )
    return chat, html_chat, matches, match_pos, page_start, size, focus, info
def on_search(
    chat: pd.DataFrame, query_ui: str, page_size: int, context_before: int, context_after: int
) -> Tuple[str, list[int], int, int, int, Optional[int], str, str]:
    """Run a search and focus the view on its first hit.

    Returns ``(html, matches, match_pos, page_start, page_size, focus, info,
    committed_query)`` where ``committed_query`` is the trimmed search text.
    """
    committed = (query_ui or "").strip()
    hits = compute_matches(chat, committed) if committed else []
    first_hit = hits[0] if hits else None

    message_count = 0 if chat is None else int(len(chat))
    size = normalize_page_size(int(page_size))
    last_page_start = max(0, message_count - size)

    html_chat, info, hits, position, page_start, size, focus = render_view(
        chat, committed, hits, 0, last_page_start, size, first_hit, context_before, context_after
    )
    return html_chat, hits, position, page_start, size, focus, info, committed
def on_prev_next(
    chat: pd.DataFrame,
    query: str,
    matches: list[int],
    match_pos: int,
    page_start: int,
    page_size: int,
    context_before: int,
    context_after: int,
    direction: int,
) -> Tuple[str, list[int], int, int, int, Optional[int], str]:
    """Step to the previous/next search hit and re-render around it.

    ``direction`` is added to ``match_pos`` and the result is clamped to the
    valid range (no wrap-around). Without hits the current page is simply
    re-rendered with an empty match state.

    Returns ``(html, matches, match_pos, page_start, page_size, focus, info)``.
    """
    if matches:
        match_pos = clamp_int(int(match_pos) + int(direction), 0, len(matches) - 1)
        target = matches[match_pos]
    else:
        matches, match_pos, target = [], 0, None

    html_chat, info, matches, match_pos, page_start, page_size, focus = render_view(
        chat, query, matches, match_pos, page_start, page_size, target, context_before, context_after
    )
    return html_chat, matches, match_pos, page_start, page_size, focus, info
def on_clear(
    chat: pd.DataFrame, page_size: int, context_before: int, context_after: int
) -> Tuple[str, list[int], int, int, int, Optional[int], str, str, str]:
    """Drop the current search and show the last page of the conversation.

    Returns ``(html, matches, match_pos, page_start, page_size, focus, info,
    search_box_text, committed_query)``; the last two are always empty.
    """
    message_count = 0 if chat is None else int(len(chat))
    size = normalize_page_size(int(page_size))
    last_page_start = max(0, message_count - size)

    html_chat, info, matches, match_pos, page_start, size, focus = render_view(
        chat, "", [], 0, last_page_start, size, None, context_before, context_after
    )
    return html_chat, matches, match_pos, page_start, size, focus, info, "", ""
def on_page(
    chat: pd.DataFrame,
    query: str,
    matches: list[int],
    match_pos: int,
    page_start: int,
    page_size: int,
    context_before: int,
    context_after: int,
    action: str,
    goto_msg: int,
) -> Tuple[str, list[int], int, int, int, Optional[int], str]:
    """Handle the paging buttons and the "go to message" action.

    ``action`` is one of "prev", "next", "end" or "goto"; any other value
    re-renders the current page. "goto" focuses the 1-based message number
    ``goto_msg`` (clamped to the conversation) instead of moving the page.

    Returns ``(html, matches, match_pos, page_start, page_size, focus, info)``.
    """
    message_count = 0 if chat is None else int(len(chat))
    size = normalize_page_size(int(page_size))
    last_page_start = max(0, message_count - size)
    current = clamp_int(int(page_start), 0, last_page_start)

    target: Optional[int] = None
    if action == "goto":
        # The UI numbers messages from 1; positions are 0-based.
        target = clamp_int(int(goto_msg) - 1, 0, max(0, message_count - 1))
    else:
        page_moves = {
            "prev": max(0, current - size),
            "next": min(last_page_start, current + size),
            "end": last_page_start,
        }
        current = page_moves.get(action, current)

    html_chat, info, matches, match_pos, current, size, focus = render_view(
        chat, query, matches, match_pos, current, size, target, context_before, context_after
    )
    return html_chat, matches, match_pos, current, size, focus, info
def export_parquet(df: pd.DataFrame, out_path: str) -> str:
    """Write ``df`` to a Parquet file and return the path that was written.

    A blank ``out_path`` falls back to DEFAULT_PATH with a ``.parquet``
    suffix. Missing parent directories are created.

    Raises:
        ValueError: If there is no data to export.
    """
    if df is None or df.empty:
        raise ValueError("Nada carregado para exportar.")

    requested = (out_path or "").strip()
    if not requested:
        requested = str(DEFAULT_PATH.with_suffix(".parquet"))
    target = Path(requested)

    target.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(str(target), index=False)
    return str(target)
# Stylesheet handed to gr.Blocks: pins the search row (#findbar) to the top of
# the page, reserves space for it through --findbar-offset, and styles the chat
# bubbles emitted by build_chat_html.
CSS = """
:root {
--findbar-offset: 92px;
}
@media (max-width: 900px) {
:root { --findbar-offset: 140px; }
}
body, #root, .gradio-container {
padding-top: var(--findbar-offset);
}
/* Barra "Localizar" */
#findbar {
position: fixed;
top: 0;
left: 0;
right: 0;
z-index: 1000;
box-sizing: border-box;
background: rgba(255,255,255,0.96);
backdrop-filter: blur(6px);
border-bottom: 1px solid rgba(0,0,0,0.08);
padding: 10px 12px 6px 12px;
}
.chat-wrap { max-width: 1280px; margin: 0 auto; padding: 10px 0 40px 0; }
.msg-row { display: flex; margin: 6px 0; }
.msg-row.received { justify-content: flex-start; }
.msg-row.sent { justify-content: flex-end; }
.bubble-wrap { max-width: 88%; }
.bubble {
padding: 10px 12px;
border-radius: 14px;
line-height: 1.25;
white-space: pre-wrap;
word-wrap: break-word;
border: 1px solid rgba(0,0,0,0.07);
}
.received .bubble { background: #f2f3f5; color: #111; border-top-left-radius: 6px; }
.sent .bubble { background: #d9fdd3; color: #111; border-top-right-radius: 6px; }
.meta { font-size: 12px; opacity: 0.65; margin: 2px 8px 0; }
mark { padding: 0 2px; border-radius: 3px; }
.empty { opacity: 0.7; padding: 18px; }
"""
with gr.Blocks(title="Chat CSV", css=CSS) as demo:
    # ---- Per-session state ------------------------------------------------
    df_state = gr.State(None)  # full data frame
    chat_state = gr.State(None)  # rows of the selected contact
    matches_state = gr.State([])  # positions of the search hits
    match_pos_state = gr.State(0)
    page_start_state = gr.State(0)
    page_size_state = gr.State(200)
    focus_state = gr.State(None)  # Optional[int]
    query_state = gr.State("")  # committed query

    # ---- Layout -----------------------------------------------------------
    gr.Markdown("# Chat CSV → visualização estilo mensageiro (Gradio)")

    with gr.Row():
        with gr.Column(scale=2):
            path_in = gr.Textbox(label="Caminho (csv/parquet)", value=str(DEFAULT_PATH))
        with gr.Column(scale=2):
            upload_in = gr.File(label="Ou envie um arquivo (csv/parquet)", file_types=[".csv", ".parquet"])
        with gr.Column(scale=1, min_width=160):
            load_btn = gr.Button("Carregar", variant="primary")

    with gr.Row():
        contact = gr.Dropdown(label="Contato (Nome)", choices=[], value=None, interactive=True)
        source_info = gr.Textbox(label="Fonte", interactive=False)

    with gr.Row(elem_id="findbar"):
        q_in = gr.Textbox(label="Localizar", placeholder="Digite e clique Buscar", scale=5)
        search_btn = gr.Button("Buscar", scale=1, variant="primary")
        prev_btn = gr.Button("◀", scale=1)
        next_btn = gr.Button("▶", scale=1)
        clear_btn = gr.Button("Limpar", scale=1)

    with gr.Row():
        info = gr.Markdown("Carregue um arquivo para começar.")

    with gr.Row():
        chat_html = gr.HTML("<div class='empty'>Carregue um arquivo e selecione um contato.</div>")

    with gr.Accordion("Navegação (para chats grandes)", open=False):
        with gr.Row():
            page_size_in = gr.Number(label="Msgs/tela", value=200, precision=0)
            goto_in = gr.Number(label="Ir para msg #", value=1, precision=0)
            before_in = gr.Number(label="Contexto antes (busca)", value=20, precision=0)
            after_in = gr.Number(label="Contexto depois (busca)", value=200, precision=0)
        with gr.Row():
            page_prev_btn = gr.Button("Página ◀")
            page_next_btn = gr.Button("Página ▶")
            page_end_btn = gr.Button("Ir para o fim")
            goto_btn = gr.Button("Ir")

    with gr.Accordion("Exportar (opcional)", open=False):
        out_path = gr.Textbox(label="Salvar parquet em", value=str(DEFAULT_PATH.with_suffix(".parquet")))
        export_btn = gr.Button("Exportar para Parquet")
        export_out = gr.Textbox(label="Salvo em", interactive=False)

    # ---- Shared wiring pieces ----------------------------------------------
    # Outputs written by every handler that re-renders the chat view.
    _view_outputs = [
        chat_html,
        matches_state,
        match_pos_state,
        page_start_state,
        page_size_state,
        focus_state,
        info,
    ]
    # Inputs of the previous/next-hit buttons (page size comes from state).
    _match_nav_inputs = [
        chat_state,
        query_state,
        matches_state,
        match_pos_state,
        page_start_state,
        page_size_state,
        before_in,
        after_in,
    ]
    # Inputs of the paging buttons (page size comes from the number box).
    _page_nav_inputs = [
        chat_state,
        query_state,
        matches_state,
        match_pos_state,
        page_start_state,
        page_size_in,
        before_in,
        after_in,
        goto_in,
    ]

    def _match_stepper(direction: int) -> Any:
        # Bind the step direction; the returned callable takes the eight
        # values listed in _match_nav_inputs.
        return lambda chat, q, matches, pos, ps, psz, cb, ca: on_prev_next(
            chat, q, matches, pos, ps, psz, cb, ca, direction=direction
        )

    def _pager(action: str) -> Any:
        # Bind the paging action; the returned callable takes the nine
        # values listed in _page_nav_inputs.
        return lambda chat, q, matches, pos, ps, psz, cb, ca, goto: on_page(
            chat, q, matches, pos, ps, psz, cb, ca, action, goto
        )

    # ---- Events (registered in the same order as before) -------------------
    load_btn.click(
        on_load,
        inputs=[path_in, upload_in],
        outputs=[
            df_state,
            contact,
            chat_state,
            chat_html,
            matches_state,
            match_pos_state,
            page_start_state,
            page_size_state,
            focus_state,
            info,
            source_info,
            q_in,
            query_state,
        ],
    )
    contact.change(
        on_select_contact,
        inputs=[df_state, contact, page_size_in],
        outputs=[
            chat_state,
            chat_html,
            matches_state,
            match_pos_state,
            page_start_state,
            page_size_state,
            focus_state,
            info,
        ],
    )
    search_btn.click(
        on_search,
        inputs=[chat_state, q_in, page_size_in, before_in, after_in],
        outputs=_view_outputs + [query_state],
    )
    prev_btn.click(_match_stepper(-1), inputs=_match_nav_inputs, outputs=_view_outputs)
    next_btn.click(_match_stepper(+1), inputs=_match_nav_inputs, outputs=_view_outputs)
    clear_btn.click(
        on_clear,
        inputs=[chat_state, page_size_in, before_in, after_in],
        outputs=_view_outputs + [q_in, query_state],
    )
    for _button, _action in (
        (page_prev_btn, "prev"),
        (page_next_btn, "next"),
        (page_end_btn, "end"),
        (goto_btn, "goto"),
    ):
        _button.click(_pager(_action), inputs=_page_nav_inputs, outputs=_view_outputs)
    export_btn.click(export_parquet, inputs=[df_state, out_path], outputs=[export_out])
def _patch_gradio_client_bool_jsonschema() -> None:
"""
Workaround: gradio_client utils can't parse boolean JSON Schemas (e.g.
additionalProperties: false), causing Gradio startup to crash.
"""
try:
from gradio_client import utils as client_utils
except Exception:
return
original = client_utils._json_schema_to_python_type
def patched(schema: Any, defs: Any) -> str:
if isinstance(schema, bool):
return "Any"
return original(schema, defs)
client_utils._json_schema_to_python_type = patched # type: ignore[assignment]
# Script entry point: apply the gradio_client workaround, then start the app.
if __name__ == "__main__":
    _patch_gradio_client_bool_jsonschema()
    # Serve on the loopback address only, with sharing disabled.
    demo.launch(server_name="127.0.0.1", share=False)