# app.py
# Koala DX — Single-Page In-Memory Dashboard + Responder Console (Passcode-gated)
# - Sources:
# (A) Cloudflare Worker (encrypted AES-GCM) pull → decrypt → in-memory
# (B) CSV upload fallback (reads into memory only)
# - Presets & KB:
# - Load from a single Excel: preset.xlsx
# - Each preset is a separate sheet (sheet name = preset name).
# Expected columns (flexible): summary, actions, qna, rewrite
# Alternative format: rows with columns [key, value]
# - Optional sheet "Hotlines" with columns: name, dial/url, hours, note
# - Q&A sheet: "Q&A" (or "QA") with columns: keyword, question, answer
# - RA sheet: "RA" with columns: keyword, action (or actions)
# - Volunteer Assist (Summary / Actions / Q&A):
# - Uses OPENAI_API_KEY from environment (HF Secret) and OPENAI_MODEL (default: gpt-5-nano)
# - Sends last ~500 tokens of context (won’t cut sentences)
# - For Actions/Q&A: LLM picks 1–3 relevant keywords (JSON) and we display rows from Excel.
import os
import re
import io
import time
import json
import base64
import socket
import datetime as dt
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Tuple, Optional
from urllib.parse import urlparse
import requests
import pandas as pd
import streamlit as st
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from streamlit.components.v1 import html as st_html
import html as _pyhtml # for safe escaping of message text
from stats_tab import render_stats_tab
# ── Streamlit config MUST be the first Streamlit call ──────────────────────────
st.set_page_config(page_title="Koala DX — Dashboard + Responder", layout="wide")
# ── Compat helper for Streamlit rerun ──────────────────────────────────────────
# Newer Streamlit exposes st.rerun; older builds only have st.experimental_rerun.
rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun", None)
# ── JST timezone for console timestamps ───────────────────────────────────────
try:
    from zoneinfo import ZoneInfo  # Python 3.9+
    JPN_TZ = ZoneInfo("Asia/Tokyo")
except Exception:
    # Fallback for runtimes without zoneinfo/tzdata: fixed UTC+9 offset.
    JPN_TZ = dt.timezone(dt.timedelta(hours=9), name="JST")
def fmt_ts_ms_to_jst(ms):
    """Format an epoch-milliseconds value as 'YYYY-MM-DD HH:MM' in JST.

    Anything that cannot be interpreted as a number falls back to str(ms).
    """
    try:
        seconds = int(ms) / 1000
        as_utc = dt.datetime.fromtimestamp(seconds, tz=dt.timezone.utc)
        return as_utc.astimezone(JPN_TZ).strftime("%Y-%m-%d %H:%M")
    except Exception:
        return str(ms)
def _esc(s: str) -> str:
return _pyhtml.escape(str(s or ""))
def render_conversation_bubbles(messages: list[dict], height_px: int = 520, key: str | None = None) -> None:
    """
    Render all messages into a single scrollable HTML container (inside the iframe),
    with its own CSS so formatting is preserved and the area always scrolls.
    """
    rows = []
    # Event types intended to render as centered "system" notices rather than bubbles.
    SYSTEM_TYPES = {"follow", "unfollow", "join", "leave", "block", "unblock", "member_joined", "member_left"}
    for m in messages:
        role = (m.get("role") or "").lower().strip()
        text = (m.get("text") or "").strip()
        msg_type = (m.get("type") or "").lower().strip()
        ts_str = fmt_ts_ms_to_jst(m.get("ts") or m.get("ts_ms"))
        responder = (m.get("responder") or "").strip()
        is_system = False
        effective_text = text
        # Messages with no text but a type (e.g. "[follow]") are shown as system rows.
        # NOTE(review): SYSTEM_TYPES is defined but never consulted — presumably
        # is_system should also be set when msg_type is in SYSTEM_TYPES; confirm.
        if not effective_text and msg_type:
            effective_text = f"[{msg_type}]"
            is_system = True
        if is_system:
            align = "center"; bubble_cls = "system"; meta = _esc(ts_str)
        else:
            # End-user messages on the left, account/responder replies on the right.
            align = "left" if role == "user" else "right"
            bubble_cls = "user" if role == "user" else "account"
            meta = _esc(ts_str + (f" · {responder}" if role != "user" and responder else ""))
        # NOTE(review): the f-string fragments below appear garbled — the HTML
        # markup between the quotes seems to have been lost in this copy of the
        # file. Restore the original <div> template before shipping.
        rows.append(
            f'
            '
            f'
            {_esc(effective_text)}'
            f'
            {meta}
            '
            f'
            '
            f'
            '
        )
    block_id = (key or "chat") + "_scroll"
    # NOTE(review): CSS/HTML payloads below are empty in this copy — verify against VCS.
    css = f"""
    """
    html_block = f"""
    {css}
    """
    # Some Streamlit builds don’t support key= on html(); try with key then without.
    try:
        if key is not None:
            st_html(html_block, height=height_px + 24, scrolling=False, key=key)
        else:
            st_html(html_block, height=height_px + 24, scrolling=False)
    except TypeError:
        st_html(html_block, height=height_px + 24, scrolling=False)
# Debug log (keep last 200 lines)
# ---- Session state handle (must exist before any ss.* usage) ----
ss = st.session_state  # module-wide alias used throughout this file
ss.setdefault("debug_log", [])  # ring buffer of recent debug lines (trimmed to 200 in dbg())
def dbg(msg: str):
    """Append a timestamped line to the in-session debug log (keeps last 200)."""
    fmt = "%Y-%m-%d %H:%M:%S"
    try:
        stamp = dt.datetime.now(JPN_TZ).strftime(fmt)
    except Exception:
        # Timezone unavailable: fall back to naive UTC.
        stamp = datetime.utcnow().strftime(fmt)
    log = ss["debug_log"]
    log.append(f"[{stamp}] {msg}")
    ss["debug_log"] = log[-200:]
# =========================
# Defaults / Config (env overrides optional)
# =========================
# 🔒 Locked Worker base for PUBLIC HF Space (do not take from user input)
# Validated and normalized by _normalize_and_lock_worker_base() below.
WORKER_BASE_LOCKED = os.getenv("WORKER_BASE_LOCKED", "https://c93a29b9.koalarely-alt.pages.dev/").strip()
def _normalize_and_lock_worker_base(url: str) -> str:
u = urlparse((url or "").strip())
if u.scheme != "https":
raise ValueError("WORKER_BASE_LOCKED must be https.")
host = (u.hostname or "").lower()
if not host:
raise ValueError("WORKER_BASE_LOCKED missing hostname.")
if u.username or u.password:
raise ValueError("Credentials in WORKER_BASE_LOCKED are not allowed.")
if u.port not in (None, 443):
raise ValueError("Unexpected port in WORKER_BASE_LOCKED.")
# Important: drop any path/query/fragment
return f"https://{host}"
try:
    WORKER_BASE_DEFAULT = _normalize_and_lock_worker_base(WORKER_BASE_LOCKED)
except Exception as e:
    # Fail closed: if misconfigured, don't run against arbitrary URLs
    raise RuntimeError(f"Invalid WORKER_BASE_LOCKED: {e}")
# Optional (still secret): used only for /messages pull (NOT responder console)
READ_API_KEY_DEFAULT = os.getenv("READ_API_KEY", "")
SINCE_MS_DEFAULT = int(os.getenv("SINCE_MS", "0"))  # epoch-ms lower bound for Worker pulls
FETCH_LIMIT_DEFAULT = int(os.getenv("FETCH_LIMIT", "1000"))  # max rows per pull
MAX_CTX_CHARS = int(os.getenv("MAX_CTX_CHARS", "3500"))  # hard cap on LLM context length
KEY_TTL_MIN = int(os.getenv("D1_KEY_TTL_MIN", "30"))  # AES key session TTL (minutes)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5-nano")
OPENAI_URL = os.getenv("OPENAI_URL", "https://api.openai.com/v1/responses")
def _extract_responses_output_text(resp_json: dict) -> str:
"""
Extract assistant text from Responses API JSON.
Works for:
- SDK-like helper field: resp_json["output_text"] (if present)
- Raw HTTP shape: resp_json["output"] items with message.content[type=output_text]
"""
if not isinstance(resp_json, dict):
return ""
# Some clients may include this helper-like field
ot = resp_json.get("output_text")
if isinstance(ot, str) and ot.strip():
return ot.strip()
out_chunks: list[str] = []
for item in (resp_json.get("output") or []):
if not isinstance(item, dict):
continue
if item.get("type") != "message":
continue
for c in (item.get("content") or []):
if not isinstance(c, dict):
continue
if c.get("type") == "output_text":
t = c.get("text")
if isinstance(t, str) and t:
out_chunks.append(t)
return "".join(out_chunks).strip()
def _trim(s: str, limit: int = MAX_CTX_CHARS) -> str:
s = s or ""
return s if len(s) <= limit else s[-limit:]
# =========================
# Session state (in-memory DB + auth + presets)
# =========================
ss = st.session_state
# In-memory message table; schema mirrors what the import_* functions emit.
ss.setdefault("messages_df", pd.DataFrame(columns=["ts", "user_id", "display_name", "role", "text", "risk_level", "risk_terms"]))
ss.setdefault("sec_cache", {})
ss.setdefault("worker_base", WORKER_BASE_DEFAULT)
ss.setdefault("token", "")
ss.setdefault("scope", "")
ss.setdefault("responder", "")
ss.setdefault("selected_user", "")
ss.setdefault("selected_user_label", "")
ss.setdefault("nickname_current", "")
ss.setdefault("last_refresh", 0.0)
ss.setdefault("auto_set_nick_once_for", "")
ss.setdefault("presets", {})  # {sheet_name: {"summary":..., "actions":..., "qna":..., "rewrite":...}}
ss.setdefault("hotlines", [])  # [{"name":...,"dial":...,"url":...,"hours":...,"note":...}]
ss.setdefault("active_preset", "")  # selected sheet name
ss.setdefault("user_index", {})  # user_id -> {"nickname":..., "display_name":..., "suffix":...}
ss.setdefault("llm_out", {})  # per-user outputs: {"uid": {"summary":"...", "actions":"...", "qna":"..."}}
# KB state
ss.setdefault("qa_rows", pd.DataFrame(columns=["keyword","question","answer"]))
ss.setdefault("ra_rows", pd.DataFrame(columns=["keyword","action"]))
ss.setdefault("qa_keywords", [])
ss.setdefault("ra_keywords", [])
ss.setdefault("qa_selected", [])
ss.setdefault("ra_selected", [])
# =========================
# Demo KB defaults (used when preset.xlsx not loaded)
# =========================
DEFAULT_PRESET_NAME = "Demo (Built-in)"
# Built-in demo presets (Japanese UI text); overwritten when preset.xlsx is loaded.
DEFAULT_PRESETS = {
    DEFAULT_PRESET_NAME: {
        "summary": (
            ""
        ),
        "actions": (
            "(デモ用)\n"
            "- まず安全確認(今いる場所・加害者が近くにいるか・怪我の有無)\n"
            "- 今夜の居場所/食事/お金/通信手段の確保\n"
            "- 相談先(自治体・支援団体・医療)へつなぐ\n"
        ),
        "qna": (
            "(デモ用)\n"
            "- すぐに役立つQ&Aの候補を表示します(キーワード選択)\n"
        ),
        "rewrite": (
            "(デモ用・安全寄りの言い換えテンプレ)\n"
            "まず、ここまで一人で抱えてきたこと自体がとても大変でしたね。\n"
            "今の安全を確認したいです。今いらっしゃる場所は安全ですか?(加害者や危険な人は近くにいますか?)\n"
            "もし今すぐ危険がある/怪我がある場合は、ためらわずに緊急連絡(119/110)も選択肢です。\n"
            "差し支えなければ、①年齢 ②妊娠週数(分かれば) ③同居/家族状況 ④今日困っていること(お金/住まい/暴力/体調)を教えてください。\n"
        ),
    }
}
# Q&A demo rows (keyword, question, answer)
DEFAULT_QA_ROWS = [
    # Underage pregnancy
    {
        "keyword": "未成年の妊娠",
        "question": "未成年で妊娠したかもしれません。まず何をすればいい?",
        "answer": (
            "不安でいっぱいになりますよね。まずは妊娠の確認(検査薬/産婦人科)と、体調の安全確保が優先です。\n"
            "次に、信頼できる大人(家族・学校の養護教諭・自治体の相談窓口・支援団体)に早めに相談して、"
            "選択肢(継続/中断/出産後の支援)を一緒に整理するのが安全です。"
        ),
    },
    {
        "keyword": "未成年の妊娠",
        "question": "親に言えない・怒られそうで怖い",
        "answer": (
            "言い出しにくいのは自然です。まずは“安全に相談できる大人”を確保しましょう。\n"
            "学校(養護教諭/スクールカウンセラー)や自治体の女性相談など、家族以外の窓口から入るのも方法です。"
        ),
    },
    # DV / domestic abuse
    {
        "keyword": "DV・家庭内暴力",
        "question": "パートナーが怖い。暴力や脅しがあるかもしれない",
        "answer": (
            "怖い中で相談してくれてありがとう。最優先は“今の安全”です。\n"
            "加害者が近くにいる/今すぐ危険なら緊急連絡(110/119)も選択肢です。\n"
            "可能なら、①今いる場所は安全か ②怪我の有無 ③避難できる先(友人/家族/支援)を一緒に確認しましょう。"
        ),
    },
    {
        "keyword": "DV・家庭内暴力",
        "question": "スマホを見られているかも。連絡すると危ない",
        "answer": (
            "それは重要なサインです。安全のため、履歴が残りにくい手段(別端末/安全な場所のWi-Fi/短いやり取り)を検討しましょう。\n"
            "“いま危険かどうか”の確認を優先し、避難計画(荷物・身分証・現金・連絡先)を最小限で整えるのも有効です。"
        ),
    },
    # Financial hardship
    {
        "keyword": "お金がない・生活困窮",
        "question": "お金がなくて病院や生活が不安。どうしたらいい?",
        "answer": (
            "つらい状況ですね。まず“今日〜今週を乗り切る支え”を確保しましょう。\n"
            "自治体の相談(福祉/生活支援)、妊娠中なら母子保健の窓口、食料や物資支援の団体など、使える制度があります。\n"
            "差し支えなければ、①住まい ②収入状況 ③妊娠週数 ④今いちばん足りないもの(食費/家賃/交通/受診)を教えてください。"
        ),
    },
    # Mental health
    {
        "keyword": "メンタル不調・不安",
        "question": "不安で眠れない、涙が止まらない。どうしたらいい?",
        "answer": (
            "しんどい中でよく話してくれました。まず“いま少し楽になること”を一緒に探しましょう。\n"
            "睡眠や食事が崩れている時は、短い休息(呼吸/水分/温かい飲み物)→支援につなぐ順が安全です。\n"
            "もし自分を傷つけたい気持ちが強い/今すぐ危険なら、緊急連絡や身近な人への連絡を優先してください。"
        ),
    },
    # Housing / safety
    {
        "keyword": "住まい・避難",
        "question": "家にいられない。今夜の居場所がない",
        "answer": (
            "それはとても切迫しています。まず今夜の安全な居場所を確保することが最優先です。\n"
            "自治体の窓口や支援団体、状況によっては緊急の避難先(保護)につながる可能性があります。\n"
            "今いる場所の安全と、移動できる手段(交通費・連絡手段)を確認しましょう。"
        ),
    },
]
# RA demo rows (keyword, action)
DEFAULT_RA_ROWS = [
    # Underage pregnancy
    {"keyword": "未成年の妊娠", "action": "妊娠確認(検査薬/産婦人科)と体調の安全確保を優先する"},
    {"keyword": "未成年の妊娠", "action": "安全に相談できる大人(学校/自治体/支援団体)を確保する"},
    {"keyword": "未成年の妊娠", "action": "選択肢(継続/中断/出産後支援)を“急がせず”整理する"},
    # DV
    {"keyword": "DV・家庭内暴力", "action": "今すぐ危険か確認(加害者が近い/怪我/監視)。緊急なら110/119も選択肢"},
    {"keyword": "DV・家庭内暴力", "action": "避難計画:身分証・現金・薬・連絡先を最小限で準備"},
    {"keyword": "DV・家庭内暴力", "action": "デジタル安全:履歴が残りにくい連絡手段を提案(状況に応じて)"},
    # Money
    {"keyword": "お金がない・生活困窮", "action": "今日〜今週の優先順位を確認(食事/家賃/受診/交通)"},
    {"keyword": "お金がない・生活困窮", "action": "自治体の福祉・母子保健・生活支援につなぐ(制度の案内)"},
    {"keyword": "お金がない・生活困窮", "action": "緊急物資/食料支援の選択肢を提示(地域により差あり)"},
    # Mental
    {"keyword": "メンタル不調・不安", "action": "睡眠/食事/安全の簡易チェック(緊急性の見立て)"},
    {"keyword": "メンタル不調・不安", "action": "自傷の意図がある場合は安全確保を最優先(身近な人/緊急連絡/相談窓口)"},
    {"keyword": "メンタル不調・不安", "action": "医療/相談機関につなぐ前に、短い落ち着ける行動(呼吸/水分/休息)を提案"},
    # Housing
    {"keyword": "住まい・避難", "action": "今夜の居場所の確保を最優先(安全な場所へ移動できるか確認)"},
    {"keyword": "住まい・避難", "action": "避難先候補(友人/家族/支援)と移動手段の確認"},
]
# Demo hotline entries shown when no "Hotlines" sheet has been loaded.
DEFAULT_HOTLINES = [
    {"name": "緊急", "dial": "110 / 119", "url": "", "hours": "24h", "note": "今すぐ危険・怪我がある場合"},
    {"name": "自治体の女性相談・母子保健(例)", "dial": "", "url": "", "hours": "", "note": "地域の窓口へ(市区町村で名称が異なります)"},
]
def _demo_unique_keywords(df: pd.DataFrame, col: str = "keyword", cap: int = 500) -> List[str]:
if df is None or df.empty or (col not in df.columns):
return []
kws = sorted({str(x).strip() for x in df[col].tolist() if str(x).strip()})
return kws[:cap]
def _load_demo_kb_if_empty():
    """
    Load built-in demo KB only when nothing is loaded from preset.xlsx yet.
    - If user later loads Excel, the Excel KB will overwrite these.
    """
    # One-shot guard so Streamlit reruns don't clobber user-loaded data.
    if ss.get("_demo_kb_bootstrapped", False):
        return
    # Presets
    if not ss.get("presets"):
        ss["presets"] = dict(DEFAULT_PRESETS)
        ss["active_preset"] = DEFAULT_PRESET_NAME
    # Hotlines
    if not ss.get("hotlines"):
        ss["hotlines"] = list(DEFAULT_HOTLINES)
    # Q&A
    if ss.get("qa_rows") is None or ss["qa_rows"].empty:
        ss["qa_rows"] = pd.DataFrame(DEFAULT_QA_ROWS)
        ss["qa_keywords"] = _demo_unique_keywords(ss["qa_rows"], "keyword")
    # RA
    if ss.get("ra_rows") is None or ss["ra_rows"].empty:
        ss["ra_rows"] = pd.DataFrame(DEFAULT_RA_ROWS)
        ss["ra_keywords"] = _demo_unique_keywords(ss["ra_rows"], "keyword")
    ss["_demo_kb_bootstrapped"] = True
# Call once at startup (after session_state defaults exist)
# =========================
# Risk detection
# =========================
# =========================
# Risk detection patterns (JP) — expanded for Koala DX dummy wording
# =========================
# HIGH: focuses on explicit self-harm/suicidal intent, urgency, imminent
# action, or severe emergency symptoms.
RISK_PATTERNS_HIGH = [
    # Explicit suicidal ideation / statements
    r"死にたい",
    r"自殺(する|したい)",
    r"消えたい",
    r"生きていたくない",
    r"もう生きられない",
    r"もう終わりにしたい",
    r"楽になりたい.*(死|消)",
    r"天国(に|へ)行きたい",
    # Specificity / urgency (time words: now, today, tonight, ...)
    r"(今すぐ|今日|今夜|これから).*(死|自殺|消え|終わり)",
    r"(最後|遺書|さよなら).*(書|残|言)",
    r"(もう|ほんとに).*(限界|無理).*(死|消え|終わり)",
    # Self-harm (explicit)
    r"リスカ",
    r"自傷(する|した)",
    r"(切った|切ってしまった|傷(を)?作った)",
    r"(血が|出血).*(止まらない|止まらん|やばい)",
    r"(意識が|気を)失(った|いそう)",
]
# MEDIUM: catches weaker/indirect wording — strong despair, calls for help,
# and DV/abuse danger signals.
RISK_PATTERNS_MEDIUM = [
    # Euphemistic expressions of suicidal ideation
    r"いなくなりたい",
    r"消えてしまいたい",
    r"全部やめたい",
    r"もう無理",
    r"もういや",
    r"限界",
    r"しんどい",
    r"苦しい",
    r"つらい",
    r"壊れそう",
    r"頭がおかしくなりそう",
    r"何も感じない",
    r"何もできない",
    # Strong isolation / calls for help (common in the dummy data)
    r"助けて",
    r"誰か助けて",
    r"一人(ぼっち|きり)で",
    r"頼れる人がいない",
    r"話せる人がいない",
    r"もうどうしたらいいかわからない",
    r"限界.*助けて",
    r"泣(いて|き)ばかり",
    r"眠(れない|れなくて)",
    r"食べ(られない|れなくて)",
    # Postpartum/parenting mental health (kept deliberately narrow)
    r"産後(うつ|鬱)",
    r"育児(うつ|鬱)",
    r"パニック",
    r"発作",
    r"過呼吸",
    r"動悸が",
    r"息(が)?苦しい",
    r"ぼーっとして",
    r"現実感がない",
    r"涙が止まらない",
    # Danger in DV/family context (captures "in danger right now" nuance)
    r"(殴られた|叩かれた|蹴られた)",
    r"(怒鳴られ|脅され|恐喝され)",
    r"家(に)?帰れない",
    r"(怖い|こわい).*(夫|旦那|パートナー|家族)",
    r"(監視|束縛|鍵を|スマホを).*(取られ|壊され|見られ)",
    r"(逃げたい|避難したい)",
    # Warning signs of danger to children (indirect but noteworthy wording)
    r"子ども(を)?(叩いて|殴って|怒鳴って)しまった",
    r"手が出そう",
    r"(放置|置き去り)してしまいそう",
    r"子どもが(怖い|苦手)",
]
# Combined list scanned by detect_risk(); order: HIGH patterns first.
RISK_PATTERNS = RISK_PATTERNS_HIGH + RISK_PATTERNS_MEDIUM
def detect_risk(text: str, role: str | None = None) -> Tuple[str, List[str]]:
    """
    Classify a single message's risk level.

    Risk detection should ONLY evaluate end-user messages: for any other role
    the function returns ("LOW", []) regardless of text.

    Returns:
        (level, hits): level is "HIGH"/"MEDIUM"/"LOW"; hits is the list of
        regex patterns (from RISK_PATTERNS) that matched the text.
    """
    r = (role or "").lower().strip()
    if r != "user":
        return "LOW", []
    text_norm = str(text or "").strip()
    hits = [pat for pat in RISK_PATTERNS if re.search(pat, text_norm, flags=re.IGNORECASE)]
    if not hits:
        return "LOW", []
    # BUG FIX: the old code re-matched each *pattern string* against a short
    # hard-coded regex (死にたい|自殺|リスカ|飛び降り|首を), which silently demoted
    # most HIGH patterns (e.g. r"消えたい", r"もう終わりにしたい") to MEDIUM.
    # Classify by membership in the HIGH list instead.
    high_set = set(RISK_PATTERNS_HIGH)
    level = "HIGH" if any(h in high_set for h in hits) else "MEDIUM"
    return level, hits
# =========================
# AES key handling (session, TTL)
# =========================
def _b64_to_bytes(s: Optional[str]) -> Optional[bytes]:
if not s: return None
t = "".join(str(s).split())
t += "=" * ((4 - len(t) % 4) % 4)
return base64.b64decode(t)
def _set_session_key(key_bytes: bytes):
    """Store the AES key (base64) in session state and stamp the time for TTL checks."""
    encoded = base64.b64encode(key_bytes).decode()
    ss["d1_key_b64"] = encoded
    ss["d1_key_set_ts"] = datetime.utcnow().timestamp()
def _get_session_key() -> Optional[bytes]:
    """Return the session AES key bytes, or None if absent or older than KEY_TTL_MIN.

    Expired key material is purged from session state as a side effect.
    """
    encoded = ss.get("d1_key_b64")
    if not encoded:
        return None
    set_at = ss.get("d1_key_set_ts", 0)
    age_seconds = datetime.utcnow().timestamp() - set_at
    if age_seconds > KEY_TTL_MIN * 60:
        ss.pop("d1_key_b64", None)
        ss.pop("d1_key_set_ts", None)
        return None
    return base64.b64decode(encoded)
def _clear_session_key():
    """Remove any stored AES key material from session state."""
    for state_key in ("d1_key_b64", "d1_key_set_ts"):
        ss.pop(state_key, None)
# =========================
# Cloudflare Worker pull + decrypt (in-memory)
# =========================
def _http_get(url: str, headers: Optional[dict] = None, params: Optional[dict] = None, timeout: int = 30):
    """GET *url* and return the Response; raises requests.HTTPError on non-2xx."""
    resp = requests.get(url, headers=headers or {}, params=params or {}, timeout=timeout)
    resp.raise_for_status()
    return resp
def fetch_since(base_url: str, api_key: str, since_ms: int = 0, limit: int = 1000) -> List[Dict]:
    """Pull message rows from the Worker's /messages endpoint.

    Sends since/limit as query params and the API key (if any) via x-api-key.
    Accepts either a bare JSON list or a {"results": [...]} envelope; anything
    else yields [].
    """
    endpoint = f"{base_url.rstrip('/')}/messages"
    headers = {"x-api-key": api_key} if api_key else {}
    query = {"since": int(since_ms), "limit": int(limit)}
    resp = requests.get(endpoint, headers=headers, params=query, timeout=30)
    resp.raise_for_status()
    payload = resp.json()
    if isinstance(payload, dict) and "results" in payload:
        return payload["results"] or []
    return payload if isinstance(payload, list) else []
def decrypt_row(aes_key_bytes: bytes, row: Dict) -> Optional[str]:
    """AES-GCM-decrypt one Worker row (ciphertext_b64 + iv_b64) to UTF-8 text.

    Returns None when fields are missing, base64 is invalid, or decryption fails.
    """
    ct_b64 = row.get("ciphertext_b64")
    iv_b64 = row.get("iv_b64")
    if not (ct_b64 and iv_b64):
        return None
    try:
        ciphertext = _b64_to_bytes(ct_b64)
        nonce = _b64_to_bytes(iv_b64)
        if not (ciphertext and nonce):
            return None
        plaintext = AESGCM(aes_key_bytes).decrypt(nonce, ciphertext, None)
        return plaintext.decode("utf-8")
    except Exception:
        return None
def import_worker_to_memory(
    base_url: str,
    api_key: str,
    aes_key_b64: str,
    since_ms: int = 0,
    limit: int = 1000,
    overwrite: bool = False,
) -> Tuple[bool, str]:
    """
    Fetch encrypted rows from Worker (/messages), decrypt (AES-GCM), and load into ss["messages_df"].
    Improvements vs previous version:
    - Enforces since_ms + limit (assuming fetch_since() actually sends query params)
    - Robust handling for mixed row schemas:
      * accepts plaintext row["text"] if present
      * tries alternate ciphertext/iv field names if Worker schema differs
    - Returns detailed counters: fetched vs kept vs skipped (missing vs decrypt_fail)

    Returns:
        (ok, message): ok is False only for key/fetch errors; per-row failures
        are counted and reported in the message instead.
    """
    # ---- validate AES key ----
    try:
        aes_key_bytes = _b64_to_bytes((aes_key_b64 or "").strip())
        if not aes_key_bytes or len(aes_key_bytes) not in (16, 24, 32):
            return False, "Invalid AES key: must be base64 of 16/24/32 bytes."
    except Exception:
        return False, "Invalid AES key: base64 decode error."
    # ---- fetch rows (Worker) ----
    try:
        rows = fetch_since(base_url, api_key, since_ms=int(since_ms), limit=int(limit))
    except Exception as e:
        return False, f"Fetch failed: {e}"
    if not rows:
        return True, "No rows returned from Worker."
    # ---- local helpers for schema-flex decrypt ----
    def _first_present(d: Dict, keys: List[str]) -> Optional[str]:
        # First non-empty stringified value among *keys*, else None.
        for k in keys:
            v = d.get(k)
            if v is None:
                continue
            s = str(v).strip()
            if s:
                return s
        return None
    def _decrypt_or_plain(row: Dict) -> Tuple[Optional[str], str]:
        """
        Returns (text, reason)
        reason in {"ok_plain", "ok_decrypt", "missing_cipher", "decrypt_fail"}
        """
        # 1) plaintext fallback if Worker already gives it
        plain = _first_present(row, ["text", "plain", "message", "body"])
        if plain:
            return plain, "ok_plain"
        # 2) try decrypt with flexible field names
        ct_b64 = _first_present(row, ["ciphertext_b64", "cipher_b64", "ciphertext", "ct_b64", "ct"])
        iv_b64 = _first_present(row, ["iv_b64", "nonce_b64", "iv", "nonce"])
        if not ct_b64 or not iv_b64:
            return None, "missing_cipher"
        # temporarily map into the expected keys for decrypt_row()
        tmp = dict(row)
        tmp["ciphertext_b64"] = ct_b64
        tmp["iv_b64"] = iv_b64
        out = decrypt_row(aes_key_bytes, tmp)
        if out is None:
            return None, "decrypt_fail"
        return out, "ok_decrypt"
    # ---- instrumentation counters ----
    total = len(rows)
    kept = 0
    ok_plain = 0
    ok_decrypt = 0
    missing_cipher = 0
    decrypt_fail = 0
    example_missing_keys: List[str] = []
    example_fail_keys: List[str] = []
    # Optional: early sanity check (but don’t hard-fail if first row is a non-message row)
    # We'll just proceed and report counters.
    new_records = []
    for row in rows:
        text, reason = _decrypt_or_plain(row)
        if text is None:
            # Keep up to 3 examples of the failing rows' key sets for debugging.
            if reason == "missing_cipher":
                missing_cipher += 1
                if len(example_missing_keys) < 3:
                    example_missing_keys.append(", ".join(sorted(list(row.keys()))[:25]))
            else:
                decrypt_fail += 1
                if len(example_fail_keys) < 3:
                    example_fail_keys.append(", ".join(sorted(list(row.keys()))[:25]))
            continue
        kept += 1
        if reason == "ok_plain":
            ok_plain += 1
        else:
            ok_decrypt += 1
        # ts handling: prefer ts_ms (epoch ms). Fall back to ts iso, else now.
        ts_iso = None
        ts_ms = row.get("ts_ms")
        if ts_ms is not None and str(ts_ms).strip() != "":
            try:
                ts_iso = datetime.fromtimestamp(int(ts_ms) / 1000, tz=timezone.utc).isoformat()
            except Exception:
                ts_iso = None
        if not ts_iso:
            ts_raw = row.get("ts")
            if ts_raw and str(ts_raw).strip():
                # If already ISO-like, keep it; otherwise fallback to now
                ts_iso = str(ts_raw).strip()
            else:
                ts_iso = datetime.now(timezone.utc).isoformat()
        user_id = str(row.get("user_id") or "U_unknown")
        display_name = str(row.get("display_name") or "").strip()
        role = str(row.get("role") or "user").strip().lower()  # allow Worker to supply
        lvl, terms = detect_risk(text, role=role)
        new_records.append(
            {
                "ts": ts_iso,
                "user_id": user_id,
                "display_name": display_name,
                "role": role,
                "text": text,
                "risk_level": lvl,
                "risk_terms": terms,
            }
        )
    new_df = pd.DataFrame(new_records)
    if overwrite:
        ss["messages_df"] = new_df.reset_index(drop=True)
    else:
        ss["messages_df"] = pd.concat([ss["messages_df"], new_df], ignore_index=True)
    msg = (
        f"Fetched {total} rows from Worker. "
        f"Kept {kept} (plain={ok_plain}, decrypted={ok_decrypt}). "
        f"Skipped {missing_cipher} (missing iv/cipher) + {decrypt_fail} (decrypt failed)."
    )
    # Add tiny hints if we skipped a lot (no huge dumps)
    if missing_cipher > 0 and example_missing_keys:
        msg += " Example missing-key row keys: [" + " | ".join(example_missing_keys) + "]"
    if decrypt_fail > 0 and example_fail_keys:
        msg += " Example decrypt-fail row keys: [" + " | ".join(example_fail_keys) + "]"
    return True, msg
# =========================
# CSV → memory (no disk)
# =========================
def import_csv_to_memory(file, overwrite: bool = True) -> Tuple[bool, str]:
    """Read an uploaded CSV into ss["messages_df"] (memory only, nothing on disk).

    Required columns: user_id, role, text. Optional: ts (a 1-minute-spaced
    series ending now is synthesized if absent/empty) and display_name.
    Risk level/terms are computed per row via detect_risk().

    Returns:
        (ok, message) — ok is False with a reason when the CSV is unusable.
    """
    try:
        df = pd.read_csv(file, dtype=str).fillna("")
    except Exception as e:
        return False, f"Failed to read CSV: {e}"
    required = {"user_id", "role", "text"}
    if not required.issubset(set(df.columns)):
        return False, f"CSV must contain columns: {', '.join(sorted(required))}"
    now = datetime.now(timezone.utc)
    if "ts" not in df.columns or all(str(x).strip() == "" for x in df["ts"]):
        base_time = now - timedelta(minutes=len(df))
        df["ts"] = [(base_time + timedelta(minutes=i)).isoformat() for i in range(len(df))]
    if "display_name" not in df.columns:
        df["display_name"] = ""
    levels, terms_list = [], []
    for role, text in zip(df["role"].tolist(), df["text"].tolist()):
        lvl, terms = detect_risk(text, role=role)
        levels.append(lvl)
        terms_list.append(terms)
    # BUG FIX: the computed risk columns were never written back to df, so the
    # column selection below raised KeyError on "risk_level"/"risk_terms".
    df["risk_level"] = levels
    df["risk_terms"] = terms_list
    df = df[["ts", "user_id", "display_name", "role", "text", "risk_level", "risk_terms"]]
    if overwrite:
        ss["messages_df"] = df.reset_index(drop=True)
    else:
        ss["messages_df"] = pd.concat([ss["messages_df"], df], ignore_index=True)
    return True, f"Imported {len(df)} rows into memory."
# =========================
# Presets & KB from Excel (no hardcoded)
# =========================
def parse_preset_sheet(df: pd.DataFrame) -> Dict[str, str]:
    """
    Parse one preset sheet into {"summary","actions","qna","rewrite"} strings.

    Accepts either:
      A) Row-wise key/value columns ["key","value"] (case-insensitive), OR
      B) Column-wise: columns include any of {summary, actions, qna, rewrite};
         non-empty cells in each column are joined with blank lines.
    """
    out = {"summary": "", "actions": "", "qna": "", "rewrite": ""}
    by_lower = {c.lower(): c for c in df.columns}
    # A) key/value rows: later rows overwrite earlier ones for the same key.
    if "key" in by_lower and "value" in by_lower:
        kc = by_lower["key"]
        vc = by_lower["value"]
        for _, row in df.iterrows():
            name = str(row.get(kc, "")).strip().lower()
            if name in out:
                out[name] = str(row.get(vc, "")).strip()
        return out
    # B) one column per field
    for field in list(out.keys()):
        src = by_lower.get(field)
        if src is not None:
            cells = [str(v).strip() for v in df[src].tolist() if str(v).strip()]
            out[field] = "\n\n".join(cells)
    return out
def _normalize_cols(df: pd.DataFrame) -> pd.DataFrame:
df2 = df.copy()
df2.columns = [str(c).strip() for c in df2.columns]
return df2
def load_presets_and_kb_from_xlsx(filelike) -> Tuple[Dict[str, Dict[str, str]], List[Dict[str, str]], pd.DataFrame, pd.DataFrame]:
    """
    Returns (presets_dict, hotlines_list, qa_df, ra_df)
    - Presets: any sheet EXCEPT "Hotlines", "Q&A", "QA", "RA"
      Expected columns: summary, actions, qna, rewrite (or key/value)
    - Hotlines: optional sheet "Hotlines" with name, dial/url, hours, note
    - Q&A: sheet "Q&A" (preferred) or "QA" with columns: keyword, question, answer
    - RA: sheet "RA" with columns: keyword, action (or actions)
    """
    presets: Dict[str, Dict[str, str]] = {}
    hotlines: List[Dict[str, str]] = []
    qa_df = pd.DataFrame(columns=["keyword", "question", "answer"])
    ra_df = pd.DataFrame(columns=["keyword", "action"])
    xls = pd.ExcelFile(filelike)
    for sheet in xls.sheet_names:
        df = _normalize_cols(xls.parse(sheet).fillna(""))
        low = sheet.strip().lower()
        if low == "hotlines":
            # Column lookup is case-insensitive; dial/phone/tel and url/link are synonyms.
            name_col = next((c for c in df.columns if c.lower() == "name"), None)
            dial_col = next((c for c in df.columns if c.lower() in ("dial","phone","tel")), None)
            url_col = next((c for c in df.columns if c.lower() in ("url","link")), None)
            hours_col= next((c for c in df.columns if c.lower() == "hours"), None)
            note_col = next((c for c in df.columns if c.lower() == "note"), None)
            for _, r in df.iterrows():
                name = str(r.get(name_col,"")).strip() if name_col else ""
                if not name:
                    continue  # a hotline row without a name is unusable
                hotlines.append({
                    "name": name,
                    "dial": str(r.get(dial_col,"")).strip() if dial_col else "",
                    "url": str(r.get(url_col,"")).strip() if url_col else "",
                    "hours": str(r.get(hours_col,"")).strip() if hours_col else "",
                    "note": str(r.get(note_col,"")).strip() if note_col else "",
                })
            continue
        if low in ("q&a","qa"):
            # Columns: keyword, question, answer
            kcol = next((c for c in df.columns if c.lower() == "keyword"), None)
            qcol = next((c for c in df.columns if c.lower() == "question"), None)
            acol = next((c for c in df.columns if c.lower() == "answer"), None)
            if kcol and (qcol or acol):
                tmp = pd.DataFrame({
                    "keyword": df[kcol].astype(str).str.strip(),
                    "question": df[qcol].astype(str).str.strip() if qcol else "",
                    "answer": df[acol].astype(str).str.strip() if acol else "",
                })
                # Keep rows that have a keyword plus at least a question or an answer.
                qa_df = tmp[(tmp["keyword"]!="") & ((tmp["question"]!="") | (tmp["answer"]!=""))].reset_index(drop=True)
            continue
        if low == "ra":
            # Columns: keyword, action (or actions)
            kcol = next((c for c in df.columns if c.lower() == "keyword"), None)
            acol = next((c for c in df.columns if c.lower() in ("action","actions")), None)
            if kcol and acol:
                tmp = pd.DataFrame({
                    "keyword": df[kcol].astype(str).str.strip(),
                    "action": df[acol].astype(str).str.strip(),
                })
                ra_df = tmp[(tmp["keyword"]!="") & (tmp["action"]!="")].reset_index(drop=True)
            continue
        # Otherwise: treat as a preset sheet
        presets[sheet] = parse_preset_sheet(df)
    return presets, hotlines, qa_df, ra_df
def unique_keywords_from_df(df: pd.DataFrame, col: str = "keyword") -> List[str]:
    """Sorted unique non-empty values of *col*, capped at 500 entries for sanity."""
    if df is None or df.empty or col not in df.columns:
        return []
    distinct = {str(v).strip() for v in df[col].tolist()}
    distinct.discard("")
    return sorted(distinct)[:500]
def _strip_json_codefences(s: str) -> str:
# remove ```json ... ``` fences if present
s = str(s or "")
if "```" in s:
s = re.sub(r"```(?:json)?\s*(.*?)\s*```", r"\1", s, flags=re.DOTALL | re.IGNORECASE)
return s.strip()
def _extract_first_json_blob(s: str) -> str:
    """
    Pull the first JSON-looking object/array out of a messy model response.
    Code fences are stripped first; returns "" if nothing plausible is found.
    """
    cleaned = _strip_json_codefences(s)
    # Fast path: the whole string already looks like a JSON object or array.
    looks_like_obj = cleaned.startswith("{") and cleaned.endswith("}")
    looks_like_arr = cleaned.startswith("[") and cleaned.endswith("]")
    if looks_like_obj or looks_like_arr:
        return cleaned
    # Prefer an array, then an object (greedy match, but usually fine here).
    for pattern in (r"\[.*\]", r"\{.*\}"):
        m = re.search(pattern, cleaned, flags=re.DOTALL)
        if m:
            return m.group(0).strip()
    return ""
def safe_parse_json_keywords(s: str) -> List[str]:
    """
    Parse model output into a keyword list. Accepts either:
      - ["kw1","kw2"]
      - {"keywords": [...]} (also "keyword" / "selected" variants)
    Returns a deduped list of up to 3 strings; [] on any parse problem.
    """
    try:
        blob = _extract_first_json_blob(s)
        if not blob:
            return []
        data = json.loads(blob)
        if isinstance(data, list):
            arr = data
        elif isinstance(data, dict):
            # common variants, first match wins
            for field in ("keywords", "keyword", "selected"):
                if field in data:
                    arr = data[field]
                    break
            else:
                return []
        else:
            return []
        # normalize scalars to a one-element list
        if isinstance(arr, (str, int, float)):
            arr = [arr]
        out: List[str] = []
        for item in (arr or []):
            token = str(item).strip()
            if token and token not in out:
                out.append(token)
        return out[:3]
    except Exception:
        return []
def _fallback_keywords_from_text(resp_text: str, all_keywords: List[str], top_k: int = 3) -> List[str]:
"""
Lenient fallback:
If the model didn't return valid JSON, try to pick keywords that literally appear
in the response text (case-insensitive for ASCII; JP unaffected).
"""
text = str(resp_text or "").strip()
if not text:
return []
text_low = text.lower()
hits: List[Tuple[int, int, str]] = [] # (pos, -len, keyword)
for kw in all_keywords:
k = str(kw or "").strip()
if not k:
continue
k_low = k.lower()
pos = text_low.find(k_low)
if pos >= 0:
hits.append((pos, -len(k_low), k))
if not hits:
return []
hits.sort()
picked: List[str] = []
for _, __, k in hits:
if k not in picked:
picked.append(k)
if len(picked) >= top_k:
break
return picked
def llm_pick_keywords(context_text: str, all_keywords: List[str], kind: str = "Q&A", top_k: int = 3) -> List[str]:
    """
    Calls OpenAI (Responses API) to select 1–3 keywords from all_keywords that best match the context_text.
    More lenient parsing:
    1) Try strict JSON parse
    2) If not parseable, fallback to literal keyword mentions in the model output
    Returns a (possibly empty) list of selected keywords. Fails soft: any API
    or parsing error yields [] rather than raising.
    """
    if not OPENAI_API_KEY:
        return []
    if not all_keywords:
        return []
    # Keep prompt reasonable
    kw_list = ", ".join(all_keywords[:2000])
    sys_prompt = (
        "You are a careful assistant. Given a context and a list of available keywords, "
        f"pick 1–{top_k} {kind} keywords that are MOST relevant. "
        "Output ONLY JSON, no commentary. Either a JSON array of strings, or {\"keywords\": [..]}."
    )
    user_prompt = (
        f"Context (truncated):\n\n{context_text}\n\n"
        f"Available {kind} keywords (choose at most {top_k}):\n{kw_list}\n\n"
        "Return JSON only."
    )
    payload = {
        "model": OPENAI_MODEL,
        "input": [
            {"role": "system", "content": [{"type": "input_text", "text": sys_prompt}]},
            {"role": "user", "content": [{"type": "input_text", "text": user_prompt}]},
        ],
        "store": False,  # don't persist the request on OpenAI's side
    }
    try:
        r = requests.post(
            OPENAI_URL,
            headers={"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"},
            json=payload,
            timeout=60,
        )
        if r.status_code != 200:
            return []
        data = r.json()
        raw = (_extract_responses_output_text(data) or "").strip()
        if not raw:
            return []
        # 1) Strict JSON parse
        picked = safe_parse_json_keywords(raw)
        # 2) Lenient fallback: literal mentions in model output
        if not picked:
            picked = _fallback_keywords_from_text(raw, all_keywords, top_k=top_k)
        # Keep only those present in the known list (case-insensitive match)
        aset = {str(k).lower(): k for k in all_keywords}
        final: List[str] = []
        for p in picked:
            v = aset.get(str(p).lower())
            if v and v not in final:
                final.append(v)
        return final[:top_k]
    except Exception:
        return []
def get_active_preset_content() -> Dict[str, str]:
    """Return the currently selected preset's fields, or blank fields if none."""
    blank = {"summary": "", "actions": "", "qna": "", "rewrite": ""}
    name = ss.get("active_preset") or ""
    return ss["presets"].get(name) or blank
# Seed the built-in demo KB once, after all session_state defaults exist.
_load_demo_kb_if_empty()
# =========================
# Utility: views & context
# =========================
def db_all_messages() -> pd.DataFrame:
    """Return the full in-memory message table (no copy)."""
    return ss["messages_df"]
def db_recent_messages(user_id: Optional[str] = None, minutes: int = 1440) -> pd.DataFrame:
    """Messages from the last *minutes* (default 24h), optionally for one user,
    sorted ascending by ts. Rows whose ts can't be parsed are excluded."""
    df = ss["messages_df"]
    if df.empty:
        return df
    cutoff = (datetime.utcnow() - timedelta(minutes=minutes)).replace(tzinfo=timezone.utc)
    parsed_ts = pd.to_datetime(df["ts"], utc=True, errors="coerce")
    keep = parsed_ts >= cutoff  # NaT compares False, dropping unparseable rows
    if user_id:
        keep &= df["user_id"] == user_id
    return df[keep].copy().sort_values(by="ts", ascending=True)
def db_distinct_users() -> List[str]:
    """Sorted list of non-blank user_ids present in the in-memory table."""
    df = ss["messages_df"]
    if df.empty:
        return []
    uniques = df["user_id"].dropna().unique()
    return sorted(uid for uid in uniques if str(uid).strip())
def resolve_user_label(user_id: str, df_all: pd.DataFrame) -> str:
    """Human-readable label for a user: nickname from ss.user_index, else the
    most recent non-empty display_name in *df_all*, else the raw id — always
    suffixed with the id's last 6 characters for disambiguation."""
    entry = ss.get("user_index", {}).get(user_id, {}) or {}
    nickname = entry.get("nickname", "").strip()
    if nickname:
        base = nickname
    else:
        rows = df_all[df_all["user_id"] == user_id]
        display = ""
        if not rows.empty:
            named = rows[rows["display_name"].astype(str).str.strip() != ""]
            tail = named.tail(1)["display_name"]
            if not tail.empty:
                display = str(tail.iloc[0]).strip()
        base = display or user_id
    suffix = user_id[-6:] if len(user_id) >= 6 else user_id
    return f"{base} ({suffix})"
def split_sentences(text: str) -> List[str]:
    """Split text into sentences on JA/EN terminal punctuation.

    The lookbehind keeps the punctuation attached to its sentence; blank
    fragments produced by the split are dropped.
    """
    result: List[str] = []
    for segment in re.split(r'(?<=[。.\.!\?!?])\s*', text):
        if segment.strip():
            result.append(segment)
    return result
def approx_tokens(s: str) -> int:
    """Crude cross-language token estimate: ~4 characters per token, minimum 1."""
    estimate = len(s) // 4
    return estimate if estimate >= 1 else 1
def last_context_with_sentence_guard(dfu: pd.DataFrame, max_tokens: int = 500) -> str:
    """Render recent rows as "role: text" lines, trimmed to ~max_tokens.

    Takes at most the last 200 rows; when the transcript exceeds the budget,
    whole sentences are kept from the end so nothing is cut mid-sentence.
    """
    rendered = []
    for _, row in dfu.tail(200).iterrows():
        body = str(row["text"]).strip().replace("\n", " ")
        speaker = row.get("role", "user")
        rendered.append(f"{speaker}: {body}")
    transcript = "\n".join(rendered).strip()
    if not transcript:
        return ""
    if approx_tokens(transcript) <= max_tokens:
        return transcript
    # Over budget: accumulate full sentences from the end until we hit the cap.
    kept: List[str] = []
    budget_used = 0
    for sentence in reversed(split_sentences(transcript)):
        kept.append(sentence)
        budget_used += approx_tokens(sentence)
        if budget_used >= max_tokens:
            break
    kept.reverse()
    return "\n".join(kept).strip()
# =========================
# Probes (optional)
# =========================
def _dns_check(host: str):
    """Resolve host:443 over TCP; return (True, sorted "ip:port" list) or (False, [error])."""
    try:
        records = socket.getaddrinfo(host, 443, proto=socket.IPPROTO_TCP)
        return True, sorted({f"{rec[4][0]}:{rec[4][1]}" for rec in records})
    except Exception as exc:
        return False, [f"DNS error: {exc}"]
def _http_probe(url: str, headers: dict | None = None):
    """GET the url; return (ok, status line or error text, first 300 body chars)."""
    try:
        response = requests.get(url, headers=headers or {}, timeout=15)
        return True, f"{response.status_code} {response.reason}", response.text[:300]
    except Exception as exc:
        return False, str(exc), ""
# =========================
# FOLLOW-UPS — D1 via Worker API (storage & helpers)
# =========================
# NOTE(review): these re-imports duplicate the top-of-file imports; harmless
# but could be removed once the section is merged into the main module.
import os, datetime as dt
from typing import Optional
import requests
import streamlit as st
# Assumes you already have these elsewhere:
# - ss = st.session_state
# - JPN_TZ = zoneinfo.ZoneInfo("Asia/Tokyo")
# API base: env var wins; otherwise fall back to the Worker base in session state.
FOLLOWUPS_API_BASE = (os.getenv("FOLLOWUPS_API_BASE") or ss.get("worker_base") or "").rstrip("/")
# Optional: x-api-key auth fallback (only needed if you are NOT using Bearer token)
FOLLOWUPS_READ_KEY = os.getenv("FOLLOWUPS_READ_KEY", "")
FOLLOWUPS_WRITE_KEY = os.getenv("FOLLOWUPS_WRITE_KEY", "")
# Session defaults: cached open follow-ups and the once-per-login reminder flag.
ss.setdefault("followups_cache", [])
ss.setdefault("followups_loaded", False)
ss.setdefault("followup_alert_modal_shown", False)
def _now_jst() -> dt.datetime:
    """Current timezone-aware datetime in JST."""
    return dt.datetime.now(tz=JPN_TZ)
def _ensure_tz(dt_obj: dt.datetime) -> dt.datetime:
    """Attach JST to naive datetimes; aware datetimes pass through unchanged."""
    return dt_obj if dt_obj.tzinfo is not None else dt_obj.replace(tzinfo=JPN_TZ)
def _api_headers(required_scope: str) -> dict:
    """Build auth headers for the follow-ups Worker API.

    required_scope: "read" or "write".
    A Bearer token from session state wins; otherwise fall back to x-api-key
    (scope-specific key first, then the write key also covers reads).
    """
    headers = {"content-type": "application/json"}
    bearer = ss.get("api_token") or ss.get("token") or ss.get("worker_token") or ""
    if bearer:
        headers["authorization"] = f"Bearer {bearer}"
        return headers
    # fallback: x-api-key
    if required_scope == "write" and FOLLOWUPS_WRITE_KEY:
        headers["x-api-key"] = FOLLOWUPS_WRITE_KEY
    elif required_scope == "read" and FOLLOWUPS_READ_KEY:
        headers["x-api-key"] = FOLLOWUPS_READ_KEY
    elif FOLLOWUPS_WRITE_KEY:  # allow write key for reads too
        headers["x-api-key"] = FOLLOWUPS_WRITE_KEY
    return headers
def _api_get(path: str, params: Optional[dict] = None, scope: str = "read"):
    """GET {FOLLOWUPS_API_BASE}{path}; raises RuntimeError when no base is configured."""
    if not FOLLOWUPS_API_BASE:
        raise RuntimeError("FOLLOWUPS_API_BASE not set (and ss.worker_base is empty)")
    return requests.get(
        f"{FOLLOWUPS_API_BASE}{path}",
        headers=_api_headers(scope),
        params=params or {},
        timeout=20,
    )
def _api_post(path: str, payload: dict, scope: str = "write"):
    """POST JSON to {FOLLOWUPS_API_BASE}{path}; raises RuntimeError when no base is configured."""
    if not FOLLOWUPS_API_BASE:
        raise RuntimeError("FOLLOWUPS_API_BASE not set (and ss.worker_base is empty)")
    return requests.post(
        f"{FOLLOWUPS_API_BASE}{path}",
        headers=_api_headers(scope),
        json=payload,
        timeout=20,
    )
def load_followups_into_state(force: bool = False):
    """Fetch open follow-ups into ss["followups_cache"].

    No-op when already loaded this session unless force=True. Any API or
    network failure is reported via st.warning and leaves an empty cache;
    the loaded flag is always set so we do not retry every rerun.
    """
    if ss.get("followups_loaded") and not force:
        return
    cache: list = []
    try:
        resp = _api_get("/api/followups/open", scope="read")
        if resp.ok:
            payload = resp.json() if resp.text else []
            if isinstance(payload, list):
                cache = payload
        else:
            st.warning(f"Follow-ups: API read error ({resp.status_code}): {resp.text[:200]}")
    except Exception as e:
        st.warning(f"Follow-ups: load error ({e}).")
    ss["followups_cache"] = cache
    ss["followups_loaded"] = True
def add_followup(user_id: str, due_dt: dt.datetime, note: str = "") -> str:
    """Create a follow-up via the Worker API and refresh the cache.

    Returns the new follow-up id ("" when the API omits it); raises
    RuntimeError on a non-OK response.
    """
    body = {
        "user_id": str(user_id),
        "due_iso": _ensure_tz(due_dt).isoformat(),
        "note": (note or "").strip(),
    }
    resp = _api_post("/api/followups/set", body, scope="write")
    if not resp.ok:
        raise RuntimeError(f"Follow-up set failed ({resp.status_code}): {resp.text[:200]}")
    created = resp.json() if resp.text else {}
    load_followups_into_state(force=True)
    return str(created.get("id") or "")
def resolve_followup(fid: str):
    """Mark one follow-up as resolved via the API, then refresh the cache.

    Raises RuntimeError on a non-OK response.
    """
    response = _api_post("/api/followups/resolve", {"id": str(fid)}, scope="write")
    if not response.ok:
        raise RuntimeError(f"Resolve failed ({response.status_code}): {response.text[:200]}")
    load_followups_into_state(force=True)
def list_overdue_followups(now_dt: Optional[dt.datetime] = None) -> list:
    """Query the API for follow-ups due at/before now_dt (defaults to now, JST).

    Returns [] on any API/network error (reported via st.warning).
    """
    reference = _ensure_tz(now_dt or _now_jst())
    try:
        resp = _api_get("/api/followups/overdue", params={"now_iso": reference.isoformat()}, scope="read")
        if not resp.ok:
            st.warning(f"Follow-ups: overdue query error ({resp.status_code}): {resp.text[:200]}")
            return []
        payload = resp.json() if resp.text else []
        return payload if isinstance(payload, list) else []
    except Exception as e:
        st.warning(f"Follow-ups: overdue query error ({e}).")
        return []
def list_user_open_followups(user_id: str) -> list:
    """Open (unresolved) cached follow-ups for one user, ordered by due_iso."""
    wanted = str(user_id)
    matches = []
    for item in ss.get("followups_cache", []):
        if str(item.get("user_id", "")) != wanted:
            continue
        if bool(item.get("resolved", False)):
            continue
        matches.append(item)
    matches.sort(key=lambda entry: entry.get("due_iso", ""))
    return matches
# =========================
# Worker API helpers (Responder Console)
# =========================
def auth_headers() -> dict:
    """Bearer header for the responder-console Worker API; {} when not signed in."""
    if not ss.token:
        return {}
    return {"Authorization": f"Bearer {ss.token}"}
def api_get(path: str, params: dict | None = None):
    """GET against the session Worker base.

    Network failures are converted into a stub object carrying
    status_code=599 and the error text, so callers can treat every result
    uniformly via .status_code / .text.
    """
    root = ss.worker_base.strip().rstrip("/")
    try:
        return requests.get(f"{root}{path}", params=params, headers=auth_headers(), timeout=20)
    except Exception as exc:
        # NOTE(review): this returns the class object, not an instance; attribute
        # access still works for callers because the values are class attributes.
        return type("Obj", (), {"status_code": 599, "text": str(exc)})
def api_post(path: str, json_data: dict):
    """POST JSON against the session Worker base.

    Mirrors api_get: network failures become a stub with status_code=599
    and the error text.
    """
    root = ss.worker_base.strip().rstrip("/")
    try:
        headers = {"Content-Type": "application/json"}
        headers.update(auth_headers())
        return requests.post(f"{root}{path}", json=json_data, headers=headers, timeout=20)
    except Exception as exc:
        return type("Obj", (), {"status_code": 599, "text": str(exc)})
# =========================
# Global CSS for chat bubbles
# =========================
# NOTE(review): both markdown calls render empty strings — the CSS payload
# appears to have been stripped from this copy; confirm the intended <style>.
st.markdown("""
""", unsafe_allow_html=True)
st.markdown("""
""", unsafe_allow_html=True)
# =========================
# PASSCODE GATE — Entire App (LOCKED base + brute-force throttle)
# =========================
# NOTE(review): the heading markup looks mangled by extraction (string spans
# two lines); confirm against the original source before editing.
st.markdown("🐨 Koala DX — Dashboard + Responder
", unsafe_allow_html=True)
# --- Brute-force throttle (per-session) ---
ss.setdefault("auth_fail_count", 0)
ss.setdefault("auth_block_until", 0.0)
def _auth_is_blocked_now() -> tuple[bool, int]:
    """Return (blocked, seconds_remaining) based on the session backoff deadline."""
    deadline = float(ss.get("auth_block_until", 0.0) or 0.0)
    remaining = deadline - time.time()
    if remaining > 0:
        return True, int(remaining)
    return False, 0
def _auth_register_fail():
    """Record a failed sign-in and extend the per-session exponential lockout."""
    failures = int(ss.get("auth_fail_count", 0) or 0) + 1
    ss["auth_fail_count"] = failures
    # 2, 4, 8, ... up to 256s from the exponent cap, then clamped at 300s.
    backoff = min(300, 2 ** min(failures, 8))
    ss["auth_block_until"] = time.time() + backoff
def _auth_register_success():
    """Reset the brute-force counters after a successful sign-in."""
    ss["auth_block_until"] = 0.0
    ss["auth_fail_count"] = 0
# Force the locked base into session state (prevents tampering)
ss.worker_base = WORKER_BASE_DEFAULT
if not ss.token:
    # Not signed in: enforce the throttle first, then render the passcode form.
    blocked, remain_s = _auth_is_blocked_now()
    if blocked:
        st.error(f"Too many attempts. Try again in ~{remain_s}s.")
        st.stop()
    with st.form("login_gate", clear_on_submit=False):
        st.subheader("🔐 Sign in")
        st.caption(f"Worker: {WORKER_BASE_DEFAULT} (locked)")
        passcode = st.text_input("Passcode", type="password", placeholder="Enter READ or WRITE passcode")
        submitted = st.form_submit_button("Sign in")
    if submitted:
        # Always add a tiny delay to make guessing slower
        time.sleep(0.5)
        base = WORKER_BASE_DEFAULT  # 🔒 locked
        try:
            r = requests.post(
                f"{base}/api/auth",
                json={"passcode": passcode},
                timeout=20,
                allow_redirects=False,  # 🔒 avoid redirect-to-attacker leakage
            )
        except Exception as e:
            _auth_register_fail()
            st.error(f"Network error: {e}")
            st.stop()
        if r.status_code != 200:
            _auth_register_fail()
            st.error(f"Auth failed: {r.text}")
            st.stop()
        # Success
        try:
            data = r.json()
        except Exception:
            _auth_register_fail()
            st.error("Auth failed: invalid JSON response.")
            st.stop()
        ss.worker_base = base
        ss.token = data.get("token", "") or ""
        ss.scope = data.get("scope", "") or ""
        if not ss.token:
            _auth_register_fail()
            st.error("Auth failed: missing token.")
            st.stop()
        _auth_register_success()
        st.success(f"Signed in as {ss.scope.upper()} — {ss.worker_base}")
        if rerun:
            rerun()
    # Halt rendering for unauthenticated sessions.
    st.stop()
# =========================
# FOLLOW-UPS — load & notify once per login (no st.modal)
# =========================
load_followups_into_state(force=False)
def _fmt_user_label(uid: str) -> str:
    """Human label "name (suffix)" for a user id, taken from the cached user index."""
    uid = (uid or "").strip()
    info = (ss.get("user_index") or {}).get(uid, {}) or {}
    tail = info.get("suffix") or (uid[-6:] if len(uid) >= 6 else uid)
    nickname = (info.get("nickname") or "").strip()
    display = (info.get("display_name") or "").strip()
    label = nickname or display or f"unfollowed({tail})"
    return f"{label} ({tail})"
def _fmt_due_jst(due_iso: str) -> str:
    """Render an ISO due timestamp as "YYYY-MM-DD HH:MM" in JST.

    Naive values are assumed to already be JST; unparsable input is echoed
    back raw, and an empty value yields a placeholder.
    """
    raw = (due_iso or "").strip()
    if not raw:
        return "(no due date)"
    try:
        when = dt.datetime.fromisoformat(raw)  # handles "+09:00" offsets too
        when = when.replace(tzinfo=JPN_TZ) if when.tzinfo is None else when.astimezone(JPN_TZ)
        return when.strftime("%Y-%m-%d %H:%M")
    except Exception:
        # fallback: show raw if parsing fails
        return raw
# Once per login: surface overdue follow-ups so responders can resolve or defer.
if not ss["followup_alert_modal_shown"]:
    overdue = list_overdue_followups()
    if overdue:
        st.warning("⏰ Follow-ups due — please review below.")
        with st.expander("Follow-ups due (click to open)", expanded=True):
            st.write("Resolve now or keep to be reminded again next login.")
            st.divider()
            # One checkbox per overdue item, keyed by follow-up id.
            for it in overdue:
                uid = it.get("user_id", "")
                fid = it.get("id")
                note = (it.get("note", "") or "").strip()
                due_pretty = _fmt_due_jst(it.get("due_iso", ""))
                st.checkbox(
                    f"Resolve • {_fmt_user_label(uid)} • due={due_pretty} • note={note or '(no note)'}",
                    key=f"ck_resolve_{fid}",
                    value=False,
                )
            c1, c2 = st.columns(2)
            with c1:
                if st.button("✅ Update (resolve checked)", key="fu_update"):
                    any_change = False
                    for it in overdue:
                        fid = it.get("id")
                        if fid and st.session_state.get(f"ck_resolve_{fid}", False):
                            try:
                                resolve_followup(str(fid))
                                any_change = True
                            except Exception as e:
                                st.error(f"Resolve failed for {fid}: {e}")
                    if any_change:
                        st.success("Updated follow-ups.")
                    else:
                        st.info("No changes made.")
                    ss["followup_alert_modal_shown"] = True
                    if rerun: rerun()
            with c2:
                if st.button("🔁 Keep all for later", key="fu_keep"):
                    # Defer: the reminder reappears on the next login.
                    ss["followup_alert_modal_shown"] = True
                    st.info("Okay — you’ll be reminded again on next login.")
                    if rerun: rerun()
    else:
        ss["followup_alert_modal_shown"] = True
# =========================
# Sidebar — (visible only after login)
# =========================
with st.sidebar:
    st.header("Data Fetch")
    # --- Backend config (hidden from main UI) ---
    # Keep these in env/secrets; not exposed in the sidebar UI
    base_url = (ss.get("worker_base") or WORKER_BASE_DEFAULT).strip().rstrip("/")
    api_key = READ_API_KEY_DEFAULT  # keep in env/secrets
    # --- Default: fetch past 1 year ---
    now_jst = dt.datetime.now(JPN_TZ)
    default_since_dt = (now_jst - dt.timedelta(days=365)).replace(hour=0, minute=0, second=0, microsecond=0)
    # --- Inputs the user asked to keep ---
    aes_key_in = st.text_input("AES key (base64, 16/24/32 bytes)", type="password")
    limit = st.number_input("Limit", min_value=1, max_value=5000, value=int(FETCH_LIMIT_DEFAULT), step=100)
    # Human-readable since date (JST) + computed since_ms (UTC epoch ms)
    since_date = st.date_input("Since (JST date)", value=default_since_dt.date())
    # Optional time control (kept minimal; defaults midnight JST)
    since_time = dt.time(0, 0)
    since_dt_jst = dt.datetime.combine(since_date, since_time).replace(tzinfo=JPN_TZ)
    since_ms = int(since_dt_jst.astimezone(dt.timezone.utc).timestamp() * 1000)
    st.caption(f"Computed since_ms: `{since_ms}` (UTC ms) · JST: {since_dt_jst.strftime('%Y-%m-%d %H:%M')}")
    c1, c2 = st.columns([1.2, 1.0])
    with c1:
        if st.button("☁️ Fetch & Decrypt → Memory", type="primary"):
            # Validate AES key and store into session key cache
            kb = None
            if aes_key_in:
                try:
                    kb = _b64_to_bytes(aes_key_in.strip())
                except Exception:
                    kb = None
            if not kb or len(kb) not in (16, 24, 32):
                st.error("Invalid AES key. Must be base64 for 16/24/32 bytes.")
            else:
                _set_session_key(kb)
                # Recommended: overwrite to avoid duplicates on repeated fetches
                ok, msg = import_worker_to_memory(
                    base_url=base_url,
                    api_key=api_key,
                    aes_key_b64=base64.b64encode(kb).decode(),
                    since_ms=int(since_ms),
                    limit=int(limit),
                    overwrite=True,
                )
                if ok:
                    st.success(msg)
                    st.toast("Worker import complete.", icon="✅")
                else:
                    st.error(msg)
    with c2:
        if st.button("🧹 Clear memory"):
            # Drop all in-memory rows and the decryption cache.
            ss["messages_df"] = ss["messages_df"].iloc[0:0].copy()
            ss["sec_cache"].clear()
            st.success("Cleared in-memory data.")
    st.divider()
    # --- Backend imports kept but hidden/collapsed ---
    with st.expander("Backend imports (CSV / preset.xlsx)", expanded=False):
        st.subheader("CSV Import")
        up = st.file_uploader("Upload CSV (user_id, role, text[, ts, display_name])", type=["csv"])
        if st.button("📥 Import CSV → Memory"):
            if up is None:
                st.warning("Please choose a CSV file first.")
            else:
                ok, msg = import_csv_to_memory(up, overwrite=True)
                (st.success if ok else st.error)(msg)
        st.divider()
        st.subheader("Presets from Excel")
        preset_file = st.file_uploader("preset.xlsx", type=["xlsx"])
        if st.button("📚 Load presets"):
            if not preset_file:
                st.warning("Select an .xlsx file first.")
            else:
                try:
                    # Load presets + knowledge base (hotlines, Q&A, RA) from one workbook.
                    data = preset_file.read()
                    presets, hotlines, qa_df, ra_df = load_presets_and_kb_from_xlsx(io.BytesIO(data))
                    ss["presets"] = presets
                    ss["hotlines"] = hotlines
                    ss["qa_rows"] = qa_df
                    ss["ra_rows"] = ra_df
                    ss["qa_keywords"] = unique_keywords_from_df(qa_df, "keyword")
                    ss["ra_keywords"] = unique_keywords_from_df(ra_df, "keyword")
                    if presets:
                        ss["active_preset"] = sorted(presets.keys())[0]
                    st.success(
                        f"Loaded {len(presets)} presets, {len(hotlines)} hotlines, "
                        f"{len(ss['qa_keywords'])} Q&A keywords, {len(ss['ra_keywords'])} RA keywords."
                    )
                except Exception as e:
                    st.error(f"Failed to load presets: {e}")
    if ss.get("presets"):
        preset_names = sorted(ss["presets"].keys())
        ss["active_preset"] = st.selectbox(
            "Active preset",
            options=preset_names,
            index=max(
                0,
                preset_names.index(ss.get("active_preset", "")) if ss.get("active_preset", "") in preset_names else 0
            )
        )
tab_main, tab_stats = st.tabs(["🏠 Main", "📊 Stats"])
with tab_main:
    # =========================
    # Single-Page Layout — Left (Responder Console) | Right (Dashboard)
    # =========================
    left, right = st.columns([1, 1])  # left half for chat console
    # ==== LEFT: Responder Console (uses Worker passcode token) =====================
    with left:
        st.subheader("💬 Responder Console")
        st.caption(f"Signed in as {ss.scope.upper()} — {ss.worker_base}")
        st.text_input("Responder name(担当者名)", key="responder", placeholder="e.g., Ken / 山田")
        st.markdown("**Select a user**")
        resp = api_get("/api/users")
        if resp.status_code != 200:
            st.error(f"Failed to load users: {resp.text}")
            users = []
        else:
            users = resp.json() or []
        # Build uid -> display info map (stable)
        user_map = {}
        uid_options = []
        for row in users:
            uid = (row.get("user_id") or "").strip()
            if not uid:
                continue
            nickname = (row.get("nickname") or "").strip()
            display = (row.get("display_name") or "").strip()
            suffix = uid[-6:] if len(uid) >= 6 else uid
            user_map[uid] = {"nickname": nickname, "display_name": display, "suffix": suffix}
            uid_options.append(uid)
        ss["user_index"] = user_map  # persist for use on the right column & messages table
        def _last_ts(uid: str) -> int:
            # Last-message timestamp (ms) for a uid from the users payload; 0 when missing/unparsable.
            for row in users:
                if (row.get("user_id") or "").strip() == uid:
                    try:
                        return int(row.get("last_ts") or 0)
                    except Exception:
                        return 0
            return 0
        # Sort by last_ts desc (stable values, only order changes)
        uid_options = sorted(uid_options, key=_last_ts, reverse=True)
        def fmt_uid(uid: str) -> str:
            # Selectbox label: "name (suffix) · last: <JST time>".
            rec = user_map.get(uid, {}) or {}
            nickname = (rec.get("nickname") or "").strip()
            display = (rec.get("display_name") or "").strip()
            suffix = rec.get("suffix") or (uid[-6:] if len(uid) >= 6 else uid)
            name = nickname or display or f"unfollowed({suffix})"
            last_ts = _last_ts(uid)
            return f"{name} ({suffix}) · last: {fmt_ts_ms_to_jst(last_ts)}"
        if uid_options:
            # Keep previous selection if possible
            default_uid = ss.get("selected_user") if ss.get("selected_user") in uid_options else uid_options[0]
            default_idx = uid_options.index(default_uid)
            picked_uid = st.selectbox(
                "Users",
                options=uid_options,
                index=default_idx,
                format_func=fmt_uid,
                key="user_select_uid",  # IMPORTANT: stable widget key
            )
            ss.selected_user = picked_uid
            ss.selected_user_label = fmt_uid(picked_uid)
            ss.nickname_current = (user_map.get(ss.selected_user, {}) or {}).get("nickname") or ""
        else:
            st.info("No users yet.")
        # Nickname editor
        if ss.selected_user:
            st.markdown("**Nickname (表示名・ニックネーム)**")
            cols = st.columns([3,1])
            with cols[0]:
                new_nick = st.text_input("Set nickname for this user", value=ss.nickname_current, key="nick_input", placeholder="e.g., 山田さん / Yamada")
            with cols[1]:
                if st.button("Save"):
                    if ss.scope != "write":
                        st.warning("You are signed in with READ-only scope.")
                    else:
                        r = api_post("/api/nickname", {"user_id": ss.selected_user, "nickname": new_nick.strip() or None})
                        if r.status_code == 200:
                            st.success("Nickname updated ✅")
                            ss.nickname_current = new_nick.strip()
                            time.sleep(0.3)
                            if rerun: rerun()
                        else:
                            st.error(f"Failed to update nickname: {r.text}")
        # Conversation
        messages = []
        if ss.selected_user:
            cols = st.columns([1,1,2])
            with cols[0]:
                if st.button("↻ Refresh"):
                    ss.last_refresh = time.time()
                    if rerun: rerun()
            with cols[1]:
                st.caption("Auto-refresh ~20s")
            # Poll-style auto refresh: rerun when the last refresh is stale.
            if time.time() - ss.last_refresh > 20:
                ss.last_refresh = time.time()
                if rerun: rerun()
            r = api_get("/api/messages", params={"user_id": ss.selected_user})
            if r.status_code != 200:
                st.error(f"Failed to load messages: {r.text}")
            else:
                messages = r.json() or []
        # Auto-set unfollowed nickname if applicable
        if ss.selected_user and ss.scope == "write":
            uinfo = user_map.get(ss.selected_user, {}) if 'user_map' in locals() else {}
            if (not (uinfo.get("nickname") or uinfo.get("display_name"))
                and any((m.get("type") or "").lower().strip() == "unfollow" for m in messages)
                and ss.auto_set_nick_once_for != ss.selected_user):
                fallback_nick = f"unfollowed({uinfo.get('suffix','')})"
                resp_set = api_post("/api/nickname", {"user_id": ss.selected_user, "nickname": fallback_nick})
                if resp_set.status_code == 200:
                    ss.auto_set_nick_once_for = ss.selected_user
                    ss.nickname_current = fallback_nick
                    st.info(f"Nickname auto-set to **{fallback_nick}** for unfollowed user.")
                    time.sleep(0.3)
                    if rerun: rerun()
                else:
                    st.warning(f"Attempted to auto-set unfollow nickname but failed: {resp_set.text}")
        shown_label = ss.selected_user_label or ss.selected_user or "(none)"
        st.markdown(f"**Conversation — {shown_label}**")
        render_conversation_bubbles(messages, height_px=520, key=f"chat_{ss.selected_user or 'none'}")
        # Send reply
        if ss.selected_user:
            st.markdown("**Send a reply**")
            if ss.scope != "write":
                st.info("You are signed in with READ-only scope. Replies are disabled.")
            else:
                reply = st.chat_input("Type your reply…")
                if reply:
                    if not ss.responder.strip():
                        st.warning("Please enter your Responder name(担当者名)above before sending.")
                    else:
                        payload = {"user_id": ss.selected_user, "text": reply.strip(), "responder": ss.responder.strip()}
                        r = api_post("/api/messages", payload)
                        if r.status_code == 200:
                            try:
                                out = r.json() or {}
                            except Exception:
                                out = {}
                            status = (out.get("delivery_status") or "").lower().strip()
                            if status == "sent":
                                st.success("Sent ✅")
                            elif status == "quota":
                                st.warning("⚠️今月の無料送信数を超えています。手動でラインを送ってください.")
                            else:
                                st.error("Error ❌ (will auto-retry once after 10s)")
                            # Optional: refresh conversation so you can see delivery_status fields updated
                            if rerun: rerun()
                        else:
                            st.error(f"Send failed: {r.text}")
        with st.expander("🔎 Debug log", expanded=False):
            if st.button("Clear debug log", key="btn_clear_debug"):
                ss["debug_log"] = []
            st.code("\n".join(ss.get("debug_log", [])) or "(empty)")
# ==== RIGHT: Dashboard (Risk, Volunteer Assist with KB/LLM, then Messages) =====
# System prompt constant for the Volunteer Assist LLM calls.
SUMMARIZE_SYSTEM_PROMPT = (
    "You are a careful support assistant for volunteer responders. "
    "Write in Japanese. Be supportive, neutral, and anonymized. "
    "Do not give medical/legal certainty. If self-harm risk is present, prioritize safety steps and hotlines."
)
with right:
    st.subheader("⚠️ High/Medium Risk")
    df_all = db_all_messages()
    if not df_all.empty:
        risky = df_all[df_all["risk_level"].isin(["HIGH","MEDIUM"])]
        if risky.empty:
            st.success("No risk flagged.")
        else:
            st.dataframe(
                risky[["ts","user_id","display_name","text","risk_level"]].tail(200),
                use_container_width=True, height=220
            )
    else:
        st.info("No messages loaded yet.")
    # --- Volunteer Assist target user (fix NameError: pick_uid) ---
    pick_uid = (ss.get("selected_user") or "").strip()
    ss["assist_pick_uid"] = pick_uid  # keep follow-up section consistent
    # Build context for summarization (prefer live Worker messages; fallback to in-memory)
    ctx = ""
    if pick_uid:
        rr = api_get("/api/messages", params={"user_id": pick_uid})
        if rr.status_code == 200:
            msgs_ctx = rr.json() or []
            lines = []
            for m in msgs_ctx[-200:]:
                role = (m.get("role") or "user").lower().strip()
                text = str(m.get("text") or "").strip().replace("\n", " ")
                if text:
                    lines.append(f"{role}: {text}")
            blob = "\n".join(lines).strip()
            # sentence-guard to ~500 tokens (uses your existing helpers)
            if approx_tokens(blob) <= 500:
                ctx = blob
            else:
                # Keep whole sentences from the end until the token budget is hit.
                sents = split_sentences(blob)
                picked = []
                count = 0
                for sent in reversed(sents):
                    picked.append(sent)
                    count += approx_tokens(sent)
                    if count >= 500:
                        break
                ctx = "\n".join(reversed(picked)).strip()
        else:
            # fallback: in-memory df (only if you imported CSV/Worker→memory)
            dfu = db_recent_messages(user_id=pick_uid, minutes=60 * 24 * 30)
            ctx = last_context_with_sentence_guard(dfu, max_tokens=500)
    if not pick_uid:
        st.info("左側でユーザーを選択すると、要約/提案が有効になります。")
    # ---- ① Conversation Summary (robust: updates text_area after rerun) ----
    st.markdown("**① Conversation Summary**")
    # Keys:
    # - sum_widget_key: the actual widget key (Streamlit owns this once created)
    # - sum_value_key: our backing store (safe to update any time)
    # - sum_pending_key: a "mailbox" for updates that will be applied BEFORE widget instantiation
    sum_value_key = f"summary_value_{pick_uid}"
    sum_widget_key = f"summary_widget_{pick_uid}"
    sum_pending_key = f"_pending_summary_{pick_uid}"
    # Seed initial text (preset -> previous saved -> empty)
    preset = get_active_preset_content()
    out_map = (ss.get("llm_out", {}).get(pick_uid, {}) or {})
    initial = (out_map.get("summary") or preset.get("summary") or "").strip()
    if sum_value_key not in ss:
        ss[sum_value_key] = initial
    # ✅ Apply pending update BEFORE creating the widget (this is the crucial part)
    if ss.get(sum_pending_key):
        new_text = ss.pop(sum_pending_key)
        ss[sum_value_key] = new_text
        ss[sum_widget_key] = new_text  # safe here because widget isn't created yet
        ss.setdefault("llm_out", {}).setdefault(pick_uid, {})["summary"] = new_text
    # Ensure widget key exists before first render
    if sum_widget_key not in ss:
        ss[sum_widget_key] = ss.get(sum_value_key, "")
    # Render widget (after this point, DO NOT assign ss[sum_widget_key] directly)
    summary_txt = st.text_area("Summary", key=sum_widget_key, height=150)
    c_sum1, c_sum2 = st.columns([1, 1])
    with c_sum1:
        if st.button("🧠 Summarize (OpenAI)", key=f"btn_summarize_{pick_uid}"):
            dbg("Summarize clicked")
            if not OPENAI_API_KEY:
                dbg("OPENAI_API_KEY missing")
                st.error("OPENAI_API_KEY is not set.")
            else:
                dbg(f"OPENAI_URL={OPENAI_URL}")
                dbg(f"OPENAI_MODEL={OPENAI_MODEL}")
                dbg(f"ctx_chars={len(ctx or '')}")
                # Optional DNS hint
                try:
                    host = (OPENAI_URL or "").split("/")[2]
                    ok_dns, addrs = _dns_check(host)
                    dbg(f"dns_ok={ok_dns} addrs={addrs[:3]}")
                except Exception as e:
                    dbg(f"dns_check_error={e}")
                # ✅ Yes: the summarize system prompt is written here (explicit)
                sys_prompt = (
                    "You are a careful assistant supporting volunteer responders for caregiver chats in Japan. "
                    "Summarize the conversation into 5–8 bullet points in Japanese. "
                )
                user_prompt = (
                    "Conversation context:\n\n"
                    f"{ctx}\n\n"
                    "Output only the summary bullets."
                )
                # Payload shaped for the OpenAI Responses-style endpoint at OPENAI_URL.
                payload = {
                    "model": OPENAI_MODEL,
                    "input": [
                        {"role": "system", "content": [{"type": "input_text", "text": sys_prompt}]},
                        {"role": "user", "content": [{"type": "input_text", "text": user_prompt}]},
                    ],
                    "store": False,
                }
                t0 = time.time()
                with st.spinner("Calling OpenAI..."):
                    try:
                        r = requests.post(
                            OPENAI_URL,
                            headers={
                                "Authorization": f"Bearer {OPENAI_API_KEY}",
                                "Content-Type": "application/json",
                            },
                            json=payload,
                            timeout=60,
                        )
                        dt_s = time.time() - t0
                        dbg(f"openai_http_done status={r.status_code} elapsed={dt_s:.2f}s")
                        dbg(f"openai_resp_head={r.text[:300].replace(chr(10),' ')}")
                        if r.status_code != 200:
                            st.error(f"LLM error: {r.status_code}\n\n{r.text}")
                        else:
                            data = r.json()
                            content = (_extract_responses_output_text(data) or "").strip()
                            dbg(f"parsed_output_chars={len(content)}")
                            if not content:
                                st.warning("Got HTTP 200 but no text output was parsed. Check Debug log.")
                            else:
                                # ✅ DO NOT set ss[sum_widget_key] here (widget already exists)
                                # Instead, stash it and rerun so it applies BEFORE widget instantiation.
                                ss[sum_pending_key] = content
                                ss[sum_value_key] = content
                                ss.setdefault("llm_out", {}).setdefault(pick_uid, {})["summary"] = content
                                st.success("Summary updated.")
                                if rerun:
                                    rerun()
                    except Exception as e:
                        dbg(f"openai_request_exception={repr(e)}")
                        st.exception(e)
    with c_sum2:
        if st.button("💾 Save current text", key=f"btn_save_summary_{pick_uid}"):
            # Persist whatever the responder hand-edited in the widget.
            cur = (ss.get(sum_widget_key, "") or "").strip()
            ss[sum_value_key] = cur
            ss.setdefault("llm_out", {}).setdefault(pick_uid, {})["summary"] = cur
            st.success("Saved to session.")
    # ---- ② Recommended Actions (KB-driven) ----
    st.markdown("**② Recommended Actions (from RA sheet via keywords)**")
    ra_available = ss.get("ra_keywords", []) if not ss.get("ra_rows", pd.DataFrame()).empty else []
    if not ra_available:
        st.info("No RA sheet loaded. Add an 'RA' sheet with columns: keyword, action.")
    # ✅ Make suggestion actually select the tags in the multiselect
    ra_widget_key = f"ra_select_{pick_uid}"  # this is the multiselect's key
    ra_pending_key = f"_pending_ra_{pick_uid}"  # pending mailbox (applied BEFORE widget)
    # Apply pending suggestion before widget instantiation
    if ra_pending_key in ss:
        ss[ra_widget_key] = ss.pop(ra_pending_key) or []
    c_ra1, c_ra2 = st.columns([1, 1])
    # ensure the widget key exists (so first render is empty, no warning)
    ss.setdefault(ra_widget_key, [])
    with c_ra1:
        pick_ra = st.multiselect(
            "Pick action keywords",
            options=ra_available,
            key=ra_widget_key,
        )
        # (optional) show what LLM picked last time
        last_ra = ss.get(f"_last_ra_suggest_{pick_uid}", [])
        if last_ra:
            st.caption("LLM selected: " + ", ".join(last_ra))
    with c_ra2:
        if st.button("🔎 Suggest Action Keywords (OpenAI)", key=f"btn_ra_{pick_uid}"):
            if not OPENAI_API_KEY:
                st.error("OPENAI_API_KEY is not set.")
            elif not ra_available:
                st.warning("No RA keywords loaded.")
            else:
                picked = llm_pick_keywords(ctx, ra_available, kind="action")
                if not picked:
                    st.warning("No keywords suggested.")
                else:
                    # ✅ write into pending → rerun so widget reflects it
                    ss[ra_pending_key] = picked
                    st.success(f"Suggested: {', '.join(picked)}")
                    if rerun:
                        rerun()
    ra_selected_now = ss.get(ra_widget_key, []) or []
    ss["ra_selected"] = ra_selected_now  # keep your old state key if other code uses it
    if ra_selected_now and not ss["ra_rows"].empty:
        # Show the RA sheet rows for each selected keyword, grouped per keyword.
        show = ss["ra_rows"][ss["ra_rows"]["keyword"].isin(ra_selected_now)].copy()
        if show.empty:
            st.info("No rows match the selected keywords.")
        else:
            st.write("**Actions**")
            for kw in ra_selected_now:
                sub = show[show["keyword"] == kw]
                if not sub.empty:
                    with st.expander(f"Keyword: {kw} · {len(sub)} item(s)", expanded=True):
                        for _, r in sub.iterrows():
                            st.markdown(f"- {r['action']}")
    else:
        st.caption("Pick 1–3 keywords (or use the suggestion button).")
    # ---- ③ Relevant Q&A Snippets (KB-driven) ----
    st.markdown("**③ Relevant Q&A (from Q&A sheet via keywords)**")
    qa_available = ss.get("qa_keywords", []) if not ss.get("qa_rows", pd.DataFrame()).empty else []
    if not qa_available:
        st.info("No Q&A sheet loaded. Add a 'Q&A' (or 'QA') sheet with columns: keyword, question, answer.")
    # ✅ Same fix for Q&A multiselect (pending mailbox applied before widget creation)
    qa_widget_key = f"qa_select_{pick_uid}"
    qa_pending_key = f"_pending_qa_{pick_uid}"
    if qa_pending_key in ss:
        ss[qa_widget_key] = ss.pop(qa_pending_key) or []
    c_q1, c_q2 = st.columns([1, 1])
    ss.setdefault(qa_widget_key, [])
    with c_q1:
        pick_qa = st.multiselect(
            "Pick Q&A keywords",
            options=qa_available,
            key=qa_widget_key,
        )
        last_qa = ss.get(f"_last_qa_suggest_{pick_uid}", [])
        if last_qa:
            st.caption("LLM selected: " + ", ".join(last_qa))
    with c_q2:
        if st.button("🔎 Suggest Q&A Keywords (OpenAI)", key=f"btn_qa_{pick_uid}"):
            if not OPENAI_API_KEY:
                st.error("OPENAI_API_KEY is not set.")
            elif not qa_available:
                st.warning("No Q&A keywords loaded.")
            else:
                picked = llm_pick_keywords(ctx, qa_available, kind="Q&A")
                if not picked:
                    st.warning("No keywords suggested.")
                else:
                    ss[qa_pending_key] = picked
                    st.success(f"Suggested: {', '.join(picked)}")
                    if rerun:
                        rerun()
    qa_selected_now = ss.get(qa_widget_key, []) or []
    ss["qa_selected"] = qa_selected_now
    if qa_selected_now and not ss["qa_rows"].empty:
        # Show the Q&A rows for each selected keyword, grouped per keyword.
        show = ss["qa_rows"][ss["qa_rows"]["keyword"].isin(qa_selected_now)].copy()
        if show.empty:
            st.info("No Q&A rows match the selected keywords.")
        else:
            for kw in qa_selected_now:
                sub = show[show["keyword"] == kw]
                if not sub.empty:
                    with st.expander(f"Keyword: {kw} · {len(sub)} Q&A", expanded=True):
                        for _, r in sub.iterrows():
                            q = str(r.get("question", "")).strip()
                            a = str(r.get("answer", "")).strip()
                            if q or a:
                                st.markdown(f"**Q:** {q if q else '(no question)'}")
                                st.markdown(f"**A:** {a if a else '(no answer)'}")
                                st.markdown("---")
    else:
        st.caption("Pick 1–3 keywords (or use the suggestion button).")
    # Hotlines from Excel (optional)
    if ss.get("hotlines"):
        st.markdown("**Hotlines / External Resources**")
        for h in ss["hotlines"]:
            # Assemble "name dial-or-url (hours) - note", skipping empty parts.
            parts = [h.get("name", "")]
            if h.get("dial"):
                parts.append(h.get("dial"))
            elif h.get("url"):
                parts.append(h.get("url"))
            if h.get("hours"):
                parts.append(f"({h['hours']})")
            if h.get("note"):
                parts.append(f"- {h['note']}")
            st.write(" ".join([p for p in parts if p]))
    st.markdown("---")
    st.markdown("**Draft a reply → Safety check & rewrite (preset)**")
    draft = st.text_area(
        "Your draft (before sending)",
        height=120,
        placeholder="(例)昨夜は本当に大変でしたね…。今はどのあたりにいらっしゃいますか?まず安全を一緒に確認させてください。",
    )
    if st.button("🛡️ Apply Preset Rewrite", key=f"rewrite_{pick_uid}"):
        # Uses the active preset's canned "rewrite" text; falls back to the draft itself.
        rewrite = get_active_preset_content().get("rewrite", "").strip()
        st.text_area("Rewritten (safer)", (rewrite or draft), height=120, key=f"rewritten_{pick_uid}")
        st.toast("Preset rewrite applied.", icon="✅")
    # ---- ⏰ Follow-up (per-user) ----
    st.markdown("**⏰ Follow-up**")
    if users:
        # we already computed pick_uid earlier in Volunteer Assist
        if "assist_pick_uid" in ss and ss["assist_pick_uid"]:
            fu_uid = ss["assist_pick_uid"]
            mode = st.radio("When", ["In X days", "Pick date & time (JST)"], horizontal=True, key=f"fu_mode_{fu_uid}")
            note = st.text_input("Optional note", key=f"fu_note_{fu_uid}", placeholder="e.g., confirm safety plan / share clinic info")
            if mode == "In X days":
                xdays = st.number_input("X days", min_value=1, max_value=60, value=2, step=1, key=f"fu_days_{fu_uid}")
                if st.button("➕ Add follow-up", key=f"fu_add_days_{fu_uid}"):
                    due = _now_jst() + dt.timedelta(days=int(xdays))
                    fid = add_followup(fu_uid, due, note)
                    st.success(f"Follow-up added for {due.strftime('%Y-%m-%d %H:%M')} JST (id={fid}).")
            else:
                # Explicit date & time picker (JST); defaults to 10:00 two days out.
                today = _now_jst().date() + dt.timedelta(days=2)
                d = st.date_input("Date (JST)", value=today, key=f"fu_date_{fu_uid}")
                t = st.time_input("Time (JST)", value=dt.time(10, 0), key=f"fu_time_{fu_uid}")
                if st.button("➕ Add follow-up", key=f"fu_add_dt_{fu_uid}"):
                    due_dt = dt.datetime.combine(d, t).replace(tzinfo=JPN_TZ)
                    fid = add_followup(fu_uid, due_dt, note)
                    st.success(f"Follow-up added for {due_dt.strftime('%Y-%m-%d %H:%M')} JST (id={fid}).")
            # List open follow-ups for this user with resolve buttons
            open_items = list_user_open_followups(fu_uid)
            if open_items:
                st.caption("Open follow-ups for selected user:")
                for it in open_items:
                    due_local = it["due_iso"]
                    short_note = it.get("note","")
                    cols = st.columns([3,5,2])
                    cols[0].markdown(f"**Due:** {due_local}")
                    cols[1].markdown(f"**Note:** {short_note or '(no note)'}")
                    if cols[2].button("Resolve", key=f"fu_resolve_{it['id']}"):
                        resolve_followup(it["id"])
                        st.toast("Follow-up resolved.", icon="✅")
                        if rerun: rerun()
            else:
                st.caption("No open follow-ups for this user.")
        else:
            st.info("Select a user above to add a follow-up.")
    st.markdown("---")
    # ---- 🔎 Conversation Search (Tag or Text) ----
    st.subheader("🔎 Conversation Search")
    # Helpers (local to this UI block)
def _tag_options_from_kb() -> list[str]:
    """Collect tag candidates for the Tag-search multiselect.

    Merges the Q&A and RA keyword lists from session state, strips and
    de-duplicates them, and returns at most 500 sorted entries.
    """
    # Prefer explicit tag sheet later; for now derive from Q&A + RA keywords
    combined = (ss.get("qa_keywords", []) or []) + (ss.get("ra_keywords", []) or [])
    options = sorted({kw.strip() for kw in combined if str(kw).strip()})
    return options[:500]
def _compile_pattern_from_tags(tags: list[str]) -> re.Pattern | None:
if not tags:
return None
# Simple OR of literal tags; if you later add a Tags sheet with custom patterns, plug them here.
safe = [re.escape(t) for t in tags if str(t).strip()]
if not safe:
return None
return re.compile(r"(" + "|".join(safe) + r")", flags=re.IGNORECASE)
def _compile_pattern_from_text(q: str, mode: str) -> re.Pattern | None:
q = str(q or "").strip()
if not q:
return None
if mode == "Exact (literal)":
return re.compile(re.escape(q), flags=re.IGNORECASE)
# "Any word" → split on whitespace, OR them
parts = [re.escape(p) for p in re.split(r"\s+", q) if p.strip()]
if not parts:
return None
return re.compile(r"(" + "|".join(parts) + r")", flags=re.IGNORECASE)
def _search_with_context(
df: pd.DataFrame,
pat: re.Pattern,
context: int = 2,
max_snippets: int = 30,
since_minutes: int | None = None,
) -> list[dict]:
"""
Returns a list of snippets:
{ 'user_id':..., 'rows': DataFrame segment, 'hit_idx_in_segment': int }
Each snippet is a contiguous slice around a hit within a single user's thread.
"""
if df.empty or pat is None:
return []
# Optional time window
work = df.copy()
if since_minutes:
since = (datetime.utcnow() - timedelta(minutes=int(since_minutes))).replace(tzinfo=timezone.utc)
tsdt = pd.to_datetime(work["ts"], utc=True, errors="coerce")
work = work[tsdt >= since]
# Sort for stable context slicing
work = work.sort_values(by=["user_id", "ts"]).reset_index(drop=True)
out: list[dict] = []
for uid, g in work.groupby("user_id", sort=False):
g = g.reset_index(drop=True)
texts = g["text"].astype(str).fillna("")
for i, txt in enumerate(texts):
if not txt:
continue
if pat.search(txt):
start = max(0, i - context)
end = min(len(g) - 1, i + context)
seg = g.iloc[start:end+1].copy()
out.append({
"user_id": uid,
"rows": seg,
"hit_idx_in_segment": i - start,
})
if len(out) >= max_snippets:
return out
return out
# UI controls
# Three-column layout: search mode, context window size, snippet cap.
scope_col, ctx_col, lim_col = st.columns([1.1, 0.9, 0.9])
with scope_col:
    mode = st.radio("Mode", ["Text", "Tag"], horizontal=True, key="conv_search_mode")
with ctx_col:
    ctx_messages = st.slider("Context window (msgs)", min_value=1, max_value=5, value=2, step=1)
with lim_col:
    max_snips = st.slider("Max snippets", min_value=5, max_value=100, value=30, step=5)
# Time window filter, translated to minutes for _search_with_context
# (None means "All" — no time filtering). Default is "Last day".
timewin = st.selectbox("Time window", ["All", "Last day", "Last week", "Last month"], index=1)
since_map = {"All": None, "Last day": 60*24, "Last week": 60*24*7, "Last month": 60*24*30}
since_minutes = since_map.get(timewin)
# pat stays None until the user presses a search button with valid input.
pat = None
if mode == "Tag":
    tag_opts = _tag_options_from_kb()
    if not tag_opts:
        # No keywords loaded yet → tag search unavailable (message in Japanese).
        st.info("タグ情報がありません(Q&A/RAのキーワードを読み込むとタグ検索が使えます)。")
        tag_select = []
    else:
        tag_select = st.multiselect("Tags (1–3推奨)", options=tag_opts, default=[], key="conv_search_tags")
    do_search = st.button("🔎 Search by Tag")
    if do_search:
        pat = _compile_pattern_from_tags(tag_select)
else:
    text_q = st.text_input("Text query", value="", placeholder="例:育児 不安 眠れない 里帰り など…")
    text_mode = st.radio("Match", ["Any word", "Exact (literal)"], horizontal=True, key="conv_search_textmode")
    do_search = st.button("🔎 Search Text")
    if do_search:
        pat = _compile_pattern_from_text(text_q, text_mode)
# Run search
if do_search:
    if pat is None:
        # No usable query/tags were provided ("Please enter search criteria.")
        st.warning("検索条件を入力してください。")
    else:
        hits = _search_with_context(df_all, pat, context=ctx_messages, max_snippets=max_snips, since_minutes=since_minutes)
        if not hits:
            # "No matching conversations found."
            st.info("該当する会話は見つかりませんでした。")
        else:
            # "{n} hits (showing up to {max} snippets)"
            st.write(f"**{len(hits)} 件ヒット**(最大 {max_snips} 件を表示)")
            # Render each snippet as a small conversation block
            for j, snip in enumerate(hits, start=1):
                uid = snip["user_id"]
                seg = snip["rows"]
                hit_rel = snip["hit_idx_in_segment"]
                # Header
                header = f"[{j}] {resolve_user_label(uid, df_all)}"
                t0 = str(seg.iloc[0]["ts"])
                t1 = str(seg.iloc[-1]["ts"])
                st.markdown(f"**{header}** `{t0} 〜 {t1}`")
                # Show bubbles (reuse your chat style)
                for k, r in seg.iterrows():
                    role = str(r.get("role","user")).lower().strip()
                    text = str(r.get("text","")).strip()
                    ts_str = str(r.get("ts",""))
                    # seg keeps its positional index from the per-user frame,
                    # so the hit row's label is first-label + relative offset.
                    is_hit = (k == seg.index[0] + hit_rel)
                    # Highlight the matching segment lightly
                    if is_hit and text:
                        try:
                            # Wrap every match in markdown bold.
                            text = re.sub(pat, r"**\g<0>**", text)
                        except Exception:
                            pass
                    is_system = False
                    msg_type = ""  # not stored in df_all; we only have text/role
                    effective_text = text
                    # msg_type is always "" here, so this branch never fires;
                    # kept as a placeholder for when message types are stored.
                    if not effective_text and msg_type:
                        effective_text = f"[{msg_type}]"
                        is_system = True
                    if is_system:
                        align = "center"; bubble_cls = "system"; meta = ts_str
                    else:
                        align = "left" if role == "user" else "right"
                        bubble_cls = "user" if role == "user" else "account"
                        meta = ts_str
                    # NOTE(review): the f-string below is empty, so nothing is
                    # rendered per message — the chat-bubble HTML (using align,
                    # bubble_cls, meta, effective_text) appears to have been
                    # lost from this copy. TODO: restore the bubble markup.
                    st.markdown(
                        f'',
                        unsafe_allow_html=True
                    )
st.markdown("---")
# ── 📡 Messages (in memory) — moved to the lowest part (under rewrite) ─────
# Read-only table of the most recent messages with a nickname column joined in.
st.subheader("📡 Messages Export")
if df_all.empty:
    st.info("No messages loaded. Use Cloud Pull or CSV import from the sidebar.")
else:
    # Attach nickname column by mapping from ss.user_index
    idx_map = ss.get("user_index", {})
    # Non-string user_ids map to ""; missing index entries also yield "".
    nick_col = df_all["user_id"].map(lambda u: (idx_map.get(u, {}) or {}).get("nickname", "") if isinstance(u, str) else "")
    view = df_all.copy()
    # Place "nickname" right after "display_name" when present, else at column 2.
    insert_pos = view.columns.get_loc("display_name")+1 if "display_name" in view.columns else 2
    view.insert(insert_pos, "nickname", nick_col)
    view = view[["ts","user_id","display_name","nickname","role","text","risk_level"]]
    # Show only the last 300 rows to keep rendering fast.
    st.dataframe(view.tail(300), use_container_width=True, height=360)
st.caption("⚠️ Demo only. Not a clinical tool. For emergencies in Japan, call local hotlines or 119.")
# Statistics tab: delegated entirely to stats_tab.render_stats_tab,
# fed with all stored messages and the session state.
with tab_stats:
    render_stats_tab(db_all_messages(), ss)