# app.py # Koala DX — Single-Page In-Memory Dashboard + Responder Console (Passcode-gated) # - Sources: # (A) Cloudflare Worker (encrypted AES-GCM) pull → decrypt → in-memory # (B) CSV upload fallback (reads into memory only) # - Presets & KB: # - Load from a single Excel: preset.xlsx # - Each preset is a separate sheet (sheet name = preset name). # Expected columns (flexible): summary, actions, qna, rewrite # Alternative format: rows with columns [key, value] # - Optional sheet "Hotlines" with columns: name, dial/url, hours, note # - Q&A sheet: "Q&A" (or "QA") with columns: keyword, question, answer # - RA sheet: "RA" with columns: keyword, action (or actions) # - Volunteer Assist (Summary / Actions / Q&A): # - Uses OPENAI_API_KEY from environment (HF Secret) and OPENAI_MODEL (default: gpt-4o-mini) # - Sends last ~500 tokens of context (won’t cut sentences) # - For Actions/Q&A: LLM picks 1–3 relevant keywords (JSON) and we display rows from Excel. import os import re import io import time import json import base64 import socket import datetime as dt from datetime import datetime, timedelta, timezone from typing import List, Dict, Tuple, Optional from urllib.parse import urlparse import requests import pandas as pd import streamlit as st from cryptography.hazmat.primitives.ciphers.aead import AESGCM from streamlit.components.v1 import html as st_html import html as _pyhtml # for safe escaping of message text from stats_tab import render_stats_tab # ── Streamlit config MUST be the first Streamlit call ────────────────────────── st.set_page_config(page_title="Koala DX — Dashboard + Responder", layout="wide") # ── Compat helper for Streamlit rerun ────────────────────────────────────────── rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun", None) # ── JST timezone for console timestamps ─────────────────────────────────────── try: from zoneinfo import ZoneInfo # Python 3.9+ JPN_TZ = ZoneInfo("Asia/Tokyo") except Exception: JPN_TZ = dt.timezone(dt.timedelta(hours=9), name="JST") def fmt_ts_ms_to_jst(ms): try: utc_dt = dt.datetime.fromtimestamp(int(ms) / 1000, tz=dt.timezone.utc) jst_dt = utc_dt.astimezone(JPN_TZ) return jst_dt.strftime("%Y-%m-%d %H:%M") except Exception: return str(ms) def _esc(s: str) -> str: return _pyhtml.escape(str(s or "")) def render_conversation_bubbles(messages: list[dict], height_px: int = 520, key: str | None = None) -> None: """ Render all messages into a single scrollable HTML container (inside the iframe), with its own CSS so formatting is preserved and the area always scrolls. """ rows = [] SYSTEM_TYPES = {"follow", "unfollow", "join", "leave", "block", "unblock", "member_joined", "member_left"} for m in messages: role = (m.get("role") or "").lower().strip() text = (m.get("text") or "").strip() msg_type = (m.get("type") or "").lower().strip() ts_str = fmt_ts_ms_to_jst(m.get("ts") or m.get("ts_ms")) responder = (m.get("responder") or "").strip() is_system = False effective_text = text if not effective_text and msg_type: effective_text = f"[{msg_type}]" is_system = True if is_system: align = "center"; bubble_cls = "system"; meta = _esc(ts_str) else: align = "left" if role == "user" else "right" bubble_cls = "user" if role == "user" else "account" meta = _esc(ts_str + (f" · {responder}" if role != "user" and responder else "")) rows.append( f'
' f'
{_esc(effective_text)}' f'
{meta}
' f'
' f'
' ) block_id = (key or "chat") + "_scroll" css = f""" """ html_block = f""" {css}
{''.join(rows) if rows else '
(メッセージはまだありません)
'}
""" # Some Streamlit builds don’t support key= on html(); try with key then without. try: if key is not None: st_html(html_block, height=height_px + 24, scrolling=False, key=key) else: st_html(html_block, height=height_px + 24, scrolling=False) except TypeError: st_html(html_block, height=height_px + 24, scrolling=False) # Debug log (keep last 200 lines) # ---- Session state handle (must exist before any ss.* usage) ---- ss = st.session_state ss.setdefault("debug_log", []) def dbg(msg: str): try: ts = dt.datetime.now(JPN_TZ).strftime("%Y-%m-%d %H:%M:%S") except Exception: ts = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") ss["debug_log"].append(f"[{ts}] {msg}") ss["debug_log"] = ss["debug_log"][-200:] # ========================= # Defaults / Config (env overrides optional) # ========================= # 🔒 Locked Worker base for PUBLIC HF Space (do not take from user input) WORKER_BASE_LOCKED = os.getenv("WORKER_BASE_LOCKED", "https://c93a29b9.koalarely-alt.pages.dev/").strip() def _normalize_and_lock_worker_base(url: str) -> str: u = urlparse((url or "").strip()) if u.scheme != "https": raise ValueError("WORKER_BASE_LOCKED must be https.") host = (u.hostname or "").lower() if not host: raise ValueError("WORKER_BASE_LOCKED missing hostname.") if u.username or u.password: raise ValueError("Credentials in WORKER_BASE_LOCKED are not allowed.") if u.port not in (None, 443): raise ValueError("Unexpected port in WORKER_BASE_LOCKED.") # Important: drop any path/query/fragment return f"https://{host}" try: WORKER_BASE_DEFAULT = _normalize_and_lock_worker_base(WORKER_BASE_LOCKED) except Exception as e: # Fail closed: if misconfigured, don't run against arbitrary URLs raise RuntimeError(f"Invalid WORKER_BASE_LOCKED: {e}") # Optional (still secret): used only for /messages pull (NOT responder console) READ_API_KEY_DEFAULT = os.getenv("READ_API_KEY", "") SINCE_MS_DEFAULT = int(os.getenv("SINCE_MS", "0")) FETCH_LIMIT_DEFAULT = int(os.getenv("FETCH_LIMIT", "1000")) MAX_CTX_CHARS = int(os.getenv("MAX_CTX_CHARS", "3500")) KEY_TTL_MIN = int(os.getenv("D1_KEY_TTL_MIN", "30")) # AES key session TTL (minutes) OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5-nano") OPENAI_URL = os.getenv("OPENAI_URL", "https://api.openai.com/v1/responses") def _extract_responses_output_text(resp_json: dict) -> str: """ Extract assistant text from Responses API JSON. Works for: - SDK-like helper field: resp_json["output_text"] (if present) - Raw HTTP shape: resp_json["output"] items with message.content[type=output_text] """ if not isinstance(resp_json, dict): return "" # Some clients may include this helper-like field ot = resp_json.get("output_text") if isinstance(ot, str) and ot.strip(): return ot.strip() out_chunks: list[str] = [] for item in (resp_json.get("output") or []): if not isinstance(item, dict): continue if item.get("type") != "message": continue for c in (item.get("content") or []): if not isinstance(c, dict): continue if c.get("type") == "output_text": t = c.get("text") if isinstance(t, str) and t: out_chunks.append(t) return "".join(out_chunks).strip() def _trim(s: str, limit: int = MAX_CTX_CHARS) -> str: s = s or "" return s if len(s) <= limit else s[-limit:] # ========================= # Session state (in-memory DB + auth + presets) # ========================= ss = st.session_state ss.setdefault("messages_df", pd.DataFrame(columns=["ts", "user_id", "display_name", "role", "text", "risk_level", "risk_terms"])) ss.setdefault("sec_cache", {}) ss.setdefault("worker_base", WORKER_BASE_DEFAULT) ss.setdefault("token", "") ss.setdefault("scope", "") ss.setdefault("responder", "") ss.setdefault("selected_user", "") ss.setdefault("selected_user_label", "") ss.setdefault("nickname_current", "") ss.setdefault("last_refresh", 0.0) ss.setdefault("auto_set_nick_once_for", "") ss.setdefault("presets", {}) # {sheet_name: {"summary":..., "actions":..., "qna":..., "rewrite":...}} ss.setdefault("hotlines", []) # [{"name":...,"dial":...,"url":...,"hours":...,"note":...}] ss.setdefault("active_preset", "") # selected sheet name ss.setdefault("user_index", {}) # user_id -> {"nickname":..., "display_name":..., "suffix":...} ss.setdefault("llm_out", {}) # per-user outputs: {"uid": {"summary":"...", "actions":"...", "qna":"..."}} # KB state ss.setdefault("qa_rows", pd.DataFrame(columns=["keyword","question","answer"])) ss.setdefault("ra_rows", pd.DataFrame(columns=["keyword","action"])) ss.setdefault("qa_keywords", []) ss.setdefault("ra_keywords", []) ss.setdefault("qa_selected", []) ss.setdefault("ra_selected", []) # ========================= # Demo KB defaults (used when preset.xlsx not loaded) # ========================= DEFAULT_PRESET_NAME = "Demo (Built-in)" DEFAULT_PRESETS = { DEFAULT_PRESET_NAME: { "summary": ( "" ), "actions": ( "(デモ用)\n" "- まず安全確認(今いる場所・加害者が近くにいるか・怪我の有無)\n" "- 今夜の居場所/食事/お金/通信手段の確保\n" "- 相談先(自治体・支援団体・医療)へつなぐ\n" ), "qna": ( "(デモ用)\n" "- すぐに役立つQ&Aの候補を表示します(キーワード選択)\n" ), "rewrite": ( "(デモ用・安全寄りの言い換えテンプレ)\n" "まず、ここまで一人で抱えてきたこと自体がとても大変でしたね。\n" "今の安全を確認したいです。今いらっしゃる場所は安全ですか?(加害者や危険な人は近くにいますか?)\n" "もし今すぐ危険がある/怪我がある場合は、ためらわずに緊急連絡(119/110)も選択肢です。\n" "差し支えなければ、①年齢 ②妊娠週数(分かれば) ③同居/家族状況 ④今日困っていること(お金/住まい/暴力/体調)を教えてください。\n" ), } } # Q&A demo rows (keyword, question, answer) DEFAULT_QA_ROWS = [ # Underage pregnancy { "keyword": "未成年の妊娠", "question": "未成年で妊娠したかもしれません。まず何をすればいい?", "answer": ( "不安でいっぱいになりますよね。まずは妊娠の確認(検査薬/産婦人科)と、体調の安全確保が優先です。\n" "次に、信頼できる大人(家族・学校の養護教諭・自治体の相談窓口・支援団体)に早めに相談して、" "選択肢(継続/中断/出産後の支援)を一緒に整理するのが安全です。" ), }, { "keyword": "未成年の妊娠", "question": "親に言えない・怒られそうで怖い", "answer": ( "言い出しにくいのは自然です。まずは“安全に相談できる大人”を確保しましょう。\n" "学校(養護教諭/スクールカウンセラー)や自治体の女性相談など、家族以外の窓口から入るのも方法です。" ), }, # DV / domestic abuse { "keyword": "DV・家庭内暴力", "question": "パートナーが怖い。暴力や脅しがあるかもしれない", "answer": ( "怖い中で相談してくれてありがとう。最優先は“今の安全”です。\n" "加害者が近くにいる/今すぐ危険なら緊急連絡(110/119)も選択肢です。\n" "可能なら、①今いる場所は安全か ②怪我の有無 ③避難できる先(友人/家族/支援)を一緒に確認しましょう。" ), }, { "keyword": "DV・家庭内暴力", "question": "スマホを見られているかも。連絡すると危ない", "answer": ( "それは重要なサインです。安全のため、履歴が残りにくい手段(別端末/安全な場所のWi-Fi/短いやり取り)を検討しましょう。\n" "“いま危険かどうか”の確認を優先し、避難計画(荷物・身分証・現金・連絡先)を最小限で整えるのも有効です。" ), }, # Financial hardship { "keyword": "お金がない・生活困窮", "question": "お金がなくて病院や生活が不安。どうしたらいい?", "answer": ( "つらい状況ですね。まず“今日〜今週を乗り切る支え”を確保しましょう。\n" "自治体の相談(福祉/生活支援)、妊娠中なら母子保健の窓口、食料や物資支援の団体など、使える制度があります。\n" "差し支えなければ、①住まい ②収入状況 ③妊娠週数 ④今いちばん足りないもの(食費/家賃/交通/受診)を教えてください。" ), }, # Mental health { "keyword": "メンタル不調・不安", "question": "不安で眠れない、涙が止まらない。どうしたらいい?", "answer": ( "しんどい中でよく話してくれました。まず“いま少し楽になること”を一緒に探しましょう。\n" "睡眠や食事が崩れている時は、短い休息(呼吸/水分/温かい飲み物)→支援につなぐ順が安全です。\n" "もし自分を傷つけたい気持ちが強い/今すぐ危険なら、緊急連絡や身近な人への連絡を優先してください。" ), }, # Housing / safety { "keyword": "住まい・避難", "question": "家にいられない。今夜の居場所がない", "answer": ( "それはとても切迫しています。まず今夜の安全な居場所を確保することが最優先です。\n" "自治体の窓口や支援団体、状況によっては緊急の避難先(保護)につながる可能性があります。\n" "今いる場所の安全と、移動できる手段(交通費・連絡手段)を確認しましょう。" ), }, ] # RA demo rows (keyword, action) DEFAULT_RA_ROWS = [ # Underage pregnancy {"keyword": "未成年の妊娠", "action": "妊娠確認(検査薬/産婦人科)と体調の安全確保を優先する"}, {"keyword": "未成年の妊娠", "action": "安全に相談できる大人(学校/自治体/支援団体)を確保する"}, {"keyword": "未成年の妊娠", "action": "選択肢(継続/中断/出産後支援)を“急がせず”整理する"}, # DV {"keyword": "DV・家庭内暴力", "action": "今すぐ危険か確認(加害者が近い/怪我/監視)。緊急なら110/119も選択肢"}, {"keyword": "DV・家庭内暴力", "action": "避難計画:身分証・現金・薬・連絡先を最小限で準備"}, {"keyword": "DV・家庭内暴力", "action": "デジタル安全:履歴が残りにくい連絡手段を提案(状況に応じて)"}, # Money {"keyword": "お金がない・生活困窮", "action": "今日〜今週の優先順位を確認(食事/家賃/受診/交通)"}, {"keyword": "お金がない・生活困窮", "action": "自治体の福祉・母子保健・生活支援につなぐ(制度の案内)"}, {"keyword": "お金がない・生活困窮", "action": "緊急物資/食料支援の選択肢を提示(地域により差あり)"}, # Mental {"keyword": "メンタル不調・不安", "action": "睡眠/食事/安全の簡易チェック(緊急性の見立て)"}, {"keyword": "メンタル不調・不安", "action": "自傷の意図がある場合は安全確保を最優先(身近な人/緊急連絡/相談窓口)"}, {"keyword": "メンタル不調・不安", "action": "医療/相談機関につなぐ前に、短い落ち着ける行動(呼吸/水分/休息)を提案"}, # Housing {"keyword": "住まい・避難", "action": "今夜の居場所の確保を最優先(安全な場所へ移動できるか確認)"}, {"keyword": "住まい・避難", "action": "避難先候補(友人/家族/支援)と移動手段の確認"}, ] DEFAULT_HOTLINES = [ {"name": "緊急", "dial": "110 / 119", "url": "", "hours": "24h", "note": "今すぐ危険・怪我がある場合"}, {"name": "自治体の女性相談・母子保健(例)", "dial": "", "url": "", "hours": "", "note": "地域の窓口へ(市区町村で名称が異なります)"}, ] def _demo_unique_keywords(df: pd.DataFrame, col: str = "keyword", cap: int = 500) -> List[str]: if df is None or df.empty or (col not in df.columns): return [] kws = sorted({str(x).strip() for x in df[col].tolist() if str(x).strip()}) return kws[:cap] def _load_demo_kb_if_empty(): """ Load built-in demo KB only when nothing is loaded from preset.xlsx yet. - If user later loads Excel, the Excel KB will overwrite these. """ if ss.get("_demo_kb_bootstrapped", False): return # Presets if not ss.get("presets"): ss["presets"] = dict(DEFAULT_PRESETS) ss["active_preset"] = DEFAULT_PRESET_NAME # Hotlines if not ss.get("hotlines"): ss["hotlines"] = list(DEFAULT_HOTLINES) # Q&A if ss.get("qa_rows") is None or ss["qa_rows"].empty: ss["qa_rows"] = pd.DataFrame(DEFAULT_QA_ROWS) ss["qa_keywords"] = _demo_unique_keywords(ss["qa_rows"], "keyword") # RA if ss.get("ra_rows") is None or ss["ra_rows"].empty: ss["ra_rows"] = pd.DataFrame(DEFAULT_RA_ROWS) ss["ra_keywords"] = _demo_unique_keywords(ss["ra_rows"], "keyword") ss["_demo_kb_bootstrapped"] = True # Call once at startup (after session_state defaults exist) # ========================= # Risk detection # ========================= # ========================= # Risk detection patterns (JP) — expanded for Koala DX dummy wording # ========================= # HIGH: できるだけ「明確な自傷・自殺意図」「切迫」「直近の実行を示唆」「重篤な緊急症状」を中心に RISK_PATTERNS_HIGH = [ # 明確な希死念慮/自殺表明 r"死にたい", r"自殺(する|したい)", r"消えたい", r"生きていたくない", r"もう生きられない", r"もう終わりにしたい", r"楽になりたい.*(死|消)", r"天国(に|へ)行きたい", # 具体性・切迫感(時間/今すぐ 等) r"(今すぐ|今日|今夜|これから).*(死|自殺|消え|終わり)", r"(最後|遺書|さよなら).*(書|残|言)", r"(もう|ほんとに).*(限界|無理).*(死|消え|終わり)", # 自傷(明示) r"リスカ", r"自傷(する|した)", r"(切った|切ってしまった|傷(を)?作った)", r"(血が|出血).*(止まらない|止まらん|やばい)", r"(意識が|気を)失(った|いそう)", ] # MEDIUM: 直接ワードが弱い/曖昧でも「強い絶望」「助け求め」「DV/虐待の危険」を拾う RISK_PATTERNS_MEDIUM = [ # 希死念慮の婉曲表現 r"いなくなりたい", r"消えてしまいたい", r"全部やめたい", r"もう無理", r"もういや", r"限界", r"しんどい", r"苦しい", r"つらい", r"壊れそう", r"頭がおかしくなりそう", r"何も感じない", r"何もできない", # 強い孤立/救援要請(あなたのダミーで出やすい) r"助けて", r"誰か助けて", r"一人(ぼっち|きり)で", r"頼れる人がいない", r"話せる人がいない", r"もうどうしたらいいかわからない", r"限界.*助けて", r"泣(いて|き)ばかり", r"眠(れない|れなくて)", r"食べ(られない|れなくて)", # 産後/育児のメンタル(過度に広げすぎない範囲) r"産後(うつ|鬱)", r"育児(うつ|鬱)", r"パニック", r"発作", r"過呼吸", r"動悸が", r"息(が)?苦しい", r"ぼーっとして", r"現実感がない", r"涙が止まらない", # DV/家族関係での危険(“今危ない”ニュアンスを拾う) r"(殴られた|叩かれた|蹴られた)", r"(怒鳴られ|脅され|恐喝され)", r"家(に)?帰れない", r"(怖い|こわい).*(夫|旦那|パートナー|家族)", r"(監視|束縛|鍵を|スマホを).*(取られ|壊され|見られ)", r"(逃げたい|避難したい)", # 子どもへの危険サイン(直接的すぎないが要注意ワード) r"子ども(を)?(叩いて|殴って|怒鳴って)しまった", r"手が出そう", r"(放置|置き去り)してしまいそう", r"子どもが(怖い|苦手)", ] RISK_PATTERNS = RISK_PATTERNS_HIGH + RISK_PATTERNS_MEDIUM def detect_risk(text: str, role: str | None = None) -> Tuple[str, List[str]]: """ Risk detection should ONLY evaluate end-user messages. If role is not "user", returns LOW regardless of text. """ r = (role or "").lower().strip() if r != "user": return "LOW", [] text_norm = str(text or "").strip() hits = [] for pat in RISK_PATTERNS: if re.search(pat, text_norm, flags=re.IGNORECASE): hits.append(pat) if hits: high_terms = [h for h in hits if re.search(r"(死にたい|自殺|リスカ|飛び降り|首を)", h)] level = "HIGH" if high_terms else "MEDIUM" return level, hits return "LOW", [] # ========================= # AES key handling (session, TTL) # ========================= def _b64_to_bytes(s: Optional[str]) -> Optional[bytes]: if not s: return None t = "".join(str(s).split()) t += "=" * ((4 - len(t) % 4) % 4) return base64.b64decode(t) def _set_session_key(key_bytes: bytes): ss["d1_key_b64"] = base64.b64encode(key_bytes).decode() ss["d1_key_set_ts"] = datetime.utcnow().timestamp() def _get_session_key() -> Optional[bytes]: b64 = ss.get("d1_key_b64") ts = ss.get("d1_key_set_ts", 0) if not b64: return None if (datetime.utcnow().timestamp() - ts) > KEY_TTL_MIN * 60: ss.pop("d1_key_b64", None) ss.pop("d1_key_set_ts", None) return None return base64.b64decode(b64) def _clear_session_key(): ss.pop("d1_key_b64", None) ss.pop("d1_key_set_ts", None) # ========================= # Cloudflare Worker pull + decrypt (in-memory) # ========================= def _http_get(url: str, headers: Optional[dict] = None, params: Optional[dict] = None, timeout: int = 30): r = requests.get(url, headers=headers or {}, params=params or {}, timeout=timeout) r.raise_for_status() return r def fetch_since(base_url: str, api_key: str, since_ms: int = 0, limit: int = 1000) -> List[Dict]: url = f"{base_url.rstrip('/')}/messages" headers = {"x-api-key": api_key} if api_key else {} params = {"since": int(since_ms), "limit": int(limit)} r = requests.get(url, headers=headers, params=params, timeout=30) r.raise_for_status() data = r.json() if isinstance(data, dict) and "results" in data: return data["results"] or [] return data if isinstance(data, list) else [] def decrypt_row(aes_key_bytes: bytes, row: Dict) -> Optional[str]: ct_b64 = row.get("ciphertext_b64") iv_b64 = row.get("iv_b64") if not ct_b64 or not iv_b64: return None try: ct = _b64_to_bytes(ct_b64) iv = _b64_to_bytes(iv_b64) if not ct or not iv: return None return AESGCM(aes_key_bytes).decrypt(iv, ct, None).decode("utf-8") except Exception: return None def import_worker_to_memory( base_url: str, api_key: str, aes_key_b64: str, since_ms: int = 0, limit: int = 1000, overwrite: bool = False, ) -> Tuple[bool, str]: """ Fetch encrypted rows from Worker (/messages), decrypt (AES-GCM), and load into ss["messages_df"]. Improvements vs previous version: - Enforces since_ms + limit (assuming fetch_since() actually sends query params) - Robust handling for mixed row schemas: * accepts plaintext row["text"] if present * tries alternate ciphertext/iv field names if Worker schema differs - Returns detailed counters: fetched vs kept vs skipped (missing vs decrypt_fail) """ # ---- validate AES key ---- try: aes_key_bytes = _b64_to_bytes((aes_key_b64 or "").strip()) if not aes_key_bytes or len(aes_key_bytes) not in (16, 24, 32): return False, "Invalid AES key: must be base64 of 16/24/32 bytes." except Exception: return False, "Invalid AES key: base64 decode error." # ---- fetch rows (Worker) ---- try: rows = fetch_since(base_url, api_key, since_ms=int(since_ms), limit=int(limit)) except Exception as e: return False, f"Fetch failed: {e}" if not rows: return True, "No rows returned from Worker." # ---- local helpers for schema-flex decrypt ---- def _first_present(d: Dict, keys: List[str]) -> Optional[str]: for k in keys: v = d.get(k) if v is None: continue s = str(v).strip() if s: return s return None def _decrypt_or_plain(row: Dict) -> Tuple[Optional[str], str]: """ Returns (text, reason) reason in {"ok_plain", "ok_decrypt", "missing_cipher", "decrypt_fail"} """ # 1) plaintext fallback if Worker already gives it plain = _first_present(row, ["text", "plain", "message", "body"]) if plain: return plain, "ok_plain" # 2) try decrypt with flexible field names ct_b64 = _first_present(row, ["ciphertext_b64", "cipher_b64", "ciphertext", "ct_b64", "ct"]) iv_b64 = _first_present(row, ["iv_b64", "nonce_b64", "iv", "nonce"]) if not ct_b64 or not iv_b64: return None, "missing_cipher" # temporarily map into the expected keys for decrypt_row() tmp = dict(row) tmp["ciphertext_b64"] = ct_b64 tmp["iv_b64"] = iv_b64 out = decrypt_row(aes_key_bytes, tmp) if out is None: return None, "decrypt_fail" return out, "ok_decrypt" # ---- instrumentation counters ---- total = len(rows) kept = 0 ok_plain = 0 ok_decrypt = 0 missing_cipher = 0 decrypt_fail = 0 example_missing_keys: List[str] = [] example_fail_keys: List[str] = [] # Optional: early sanity check (but don’t hard-fail if first row is a non-message row) # We'll just proceed and report counters. new_records = [] for row in rows: text, reason = _decrypt_or_plain(row) if text is None: if reason == "missing_cipher": missing_cipher += 1 if len(example_missing_keys) < 3: example_missing_keys.append(", ".join(sorted(list(row.keys()))[:25])) else: decrypt_fail += 1 if len(example_fail_keys) < 3: example_fail_keys.append(", ".join(sorted(list(row.keys()))[:25])) continue kept += 1 if reason == "ok_plain": ok_plain += 1 else: ok_decrypt += 1 # ts handling: prefer ts_ms (epoch ms). Fall back to ts iso, else now. ts_iso = None ts_ms = row.get("ts_ms") if ts_ms is not None and str(ts_ms).strip() != "": try: ts_iso = datetime.fromtimestamp(int(ts_ms) / 1000, tz=timezone.utc).isoformat() except Exception: ts_iso = None if not ts_iso: ts_raw = row.get("ts") if ts_raw and str(ts_raw).strip(): # If already ISO-like, keep it; otherwise fallback to now ts_iso = str(ts_raw).strip() else: ts_iso = datetime.now(timezone.utc).isoformat() user_id = str(row.get("user_id") or "U_unknown") display_name = str(row.get("display_name") or "").strip() role = str(row.get("role") or "user").strip().lower() # allow Worker to supply lvl, terms = detect_risk(text, role=role) new_records.append( { "ts": ts_iso, "user_id": user_id, "display_name": display_name, "role": role, "text": text, "risk_level": lvl, "risk_terms": terms, } ) new_df = pd.DataFrame(new_records) if overwrite: ss["messages_df"] = new_df.reset_index(drop=True) else: ss["messages_df"] = pd.concat([ss["messages_df"], new_df], ignore_index=True) msg = ( f"Fetched {total} rows from Worker. " f"Kept {kept} (plain={ok_plain}, decrypted={ok_decrypt}). " f"Skipped {missing_cipher} (missing iv/cipher) + {decrypt_fail} (decrypt failed)." ) # Add tiny hints if we skipped a lot (no huge dumps) if missing_cipher > 0 and example_missing_keys: msg += " Example missing-key row keys: [" + " | ".join(example_missing_keys) + "]" if decrypt_fail > 0 and example_fail_keys: msg += " Example decrypt-fail row keys: [" + " | ".join(example_fail_keys) + "]" return True, msg # ========================= # CSV → memory (no disk) # ========================= def import_csv_to_memory(file, overwrite: bool = True) -> Tuple[bool, str]: try: df = pd.read_csv(file, dtype=str).fillna("") except Exception as e: return False, f"Failed to read CSV: {e}" required = {"user_id", "role", "text"} if not required.issubset(set(df.columns)): return False, f"CSV must contain columns: {', '.join(sorted(required))}" now = datetime.now(timezone.utc) if "ts" not in df.columns or all(str(x).strip() == "" for x in df["ts"]): base_time = now - timedelta(minutes=len(df)) df["ts"] = [(base_time + timedelta(minutes=i)).isoformat() for i in range(len(df))] if "display_name" not in df.columns: df["display_name"] = "" levels, terms_list = [], [] for role, text in zip(df["role"].tolist(), df["text"].tolist()): lvl, terms = detect_risk(text, role=role) levels.append(lvl) terms_list.append(terms) df = df[["ts", "user_id", "display_name", "role", "text", "risk_level", "risk_terms"]] if overwrite: ss["messages_df"] = df.reset_index(drop=True) else: ss["messages_df"] = pd.concat([ss["messages_df"], df], ignore_index=True) return True, f"Imported {len(df)} rows into memory." # ========================= # Presets & KB from Excel (no hardcoded) # ========================= def parse_preset_sheet(df: pd.DataFrame) -> Dict[str, str]: """ Accept either: A) Row-wise key/value columns: ["key","value"] (case-insensitive) OR B) Column-wise single-row: columns include any of {summary, actions, qna, rewrite} If multiple rows, join non-empty cells with two newlines. """ out = {"summary": "", "actions": "", "qna": "", "rewrite": ""} cols_lower = {c.lower(): c for c in df.columns} # A) key/value if ("key" in cols_lower) and ("value" in cols_lower): key_col = cols_lower["key"] val_col = cols_lower["value"] for _, r in df.iterrows(): k = str(r.get(key_col, "")).strip().lower() v = str(r.get(val_col, "")).strip() if k in out: out[k] = v return out # B) column-wise for k in list(out.keys()): if k in cols_lower: col = cols_lower[k] vals = [str(x).strip() for x in df[col].tolist() if str(x).strip()] out[k] = "\n\n".join(vals) return out def _normalize_cols(df: pd.DataFrame) -> pd.DataFrame: df2 = df.copy() df2.columns = [str(c).strip() for c in df2.columns] return df2 def load_presets_and_kb_from_xlsx(filelike) -> Tuple[Dict[str, Dict[str, str]], List[Dict[str, str]], pd.DataFrame, pd.DataFrame]: """ Returns (presets_dict, hotlines_list, qa_df, ra_df) - Presets: any sheet EXCEPT "Hotlines", "Q&A", "QA", "RA" Expected columns: summary, actions, qna, rewrite (or key/value) - Hotlines: optional sheet "Hotlines" with name, dial/url, hours, note - Q&A: sheet "Q&A" (preferred) or "QA" with columns: keyword, question, answer - RA: sheet "RA" with columns: keyword, action (or actions) """ presets: Dict[str, Dict[str, str]] = {} hotlines: List[Dict[str, str]] = [] qa_df = pd.DataFrame(columns=["keyword", "question", "answer"]) ra_df = pd.DataFrame(columns=["keyword", "action"]) xls = pd.ExcelFile(filelike) for sheet in xls.sheet_names: df = _normalize_cols(xls.parse(sheet).fillna("")) low = sheet.strip().lower() if low == "hotlines": name_col = next((c for c in df.columns if c.lower() == "name"), None) dial_col = next((c for c in df.columns if c.lower() in ("dial","phone","tel")), None) url_col = next((c for c in df.columns if c.lower() in ("url","link")), None) hours_col= next((c for c in df.columns if c.lower() == "hours"), None) note_col = next((c for c in df.columns if c.lower() == "note"), None) for _, r in df.iterrows(): name = str(r.get(name_col,"")).strip() if name_col else "" if not name: continue hotlines.append({ "name": name, "dial": str(r.get(dial_col,"")).strip() if dial_col else "", "url": str(r.get(url_col,"")).strip() if url_col else "", "hours": str(r.get(hours_col,"")).strip() if hours_col else "", "note": str(r.get(note_col,"")).strip() if note_col else "", }) continue if low in ("q&a","qa"): # Columns: keyword, question, answer kcol = next((c for c in df.columns if c.lower() == "keyword"), None) qcol = next((c for c in df.columns if c.lower() == "question"), None) acol = next((c for c in df.columns if c.lower() == "answer"), None) if kcol and (qcol or acol): tmp = pd.DataFrame({ "keyword": df[kcol].astype(str).str.strip(), "question": df[qcol].astype(str).str.strip() if qcol else "", "answer": df[acol].astype(str).str.strip() if acol else "", }) qa_df = tmp[(tmp["keyword"]!="") & ((tmp["question"]!="") | (tmp["answer"]!=""))].reset_index(drop=True) continue if low == "ra": # Columns: keyword, action (or actions) kcol = next((c for c in df.columns if c.lower() == "keyword"), None) acol = next((c for c in df.columns if c.lower() in ("action","actions")), None) if kcol and acol: tmp = pd.DataFrame({ "keyword": df[kcol].astype(str).str.strip(), "action": df[acol].astype(str).str.strip(), }) ra_df = tmp[(tmp["keyword"]!="") & (tmp["action"]!="")].reset_index(drop=True) continue # Otherwise: treat as a preset sheet presets[sheet] = parse_preset_sheet(df) return presets, hotlines, qa_df, ra_df def unique_keywords_from_df(df: pd.DataFrame, col: str = "keyword") -> List[str]: if df is None or df.empty or (col not in df.columns): return [] kws = sorted({str(x).strip() for x in df[col].tolist() if str(x).strip()}) return kws[:500] # sanity cap def _strip_json_codefences(s: str) -> str: # remove ```json ... ``` fences if present s = str(s or "") if "```" in s: s = re.sub(r"```(?:json)?\s*(.*?)\s*```", r"\1", s, flags=re.DOTALL | re.IGNORECASE) return s.strip() def _extract_first_json_blob(s: str) -> str: """ Try to pull the first JSON-looking object/array from a messy response. Returns "" if nothing plausible found. """ s = _strip_json_codefences(s) # Fast path: already pure JSON if (s.startswith("{") and s.endswith("}")) or (s.startswith("[") and s.endswith("]")): return s # Try to find an array first, then an object (greedy, but usually fine here) m = re.search(r"\[.*\]", s, flags=re.DOTALL) if m: return m.group(0).strip() m = re.search(r"\{.*\}", s, flags=re.DOTALL) if m: return m.group(0).strip() return "" def safe_parse_json_keywords(s: str) -> List[str]: """ Accepts either: - ["kw1","kw2"] - {"keywords":["kw1","kw2"]} Returns list[str] (deduped, up to 3). """ try: blob = _extract_first_json_blob(s) if not blob: return [] data = json.loads(blob) if isinstance(data, list): arr = data elif isinstance(data, dict): # common variants if "keywords" in data: arr = data["keywords"] elif "keyword" in data: arr = data["keyword"] elif "selected" in data: arr = data["selected"] else: return [] else: return [] # normalize to list if isinstance(arr, (str, int, float)): arr = [arr] out: List[str] = [] for x in (arr or []): sx = str(x).strip() if sx and sx not in out: out.append(sx) return out[:3] except Exception: return [] def _fallback_keywords_from_text(resp_text: str, all_keywords: List[str], top_k: int = 3) -> List[str]: """ Lenient fallback: If the model didn't return valid JSON, try to pick keywords that literally appear in the response text (case-insensitive for ASCII; JP unaffected). """ text = str(resp_text or "").strip() if not text: return [] text_low = text.lower() hits: List[Tuple[int, int, str]] = [] # (pos, -len, keyword) for kw in all_keywords: k = str(kw or "").strip() if not k: continue k_low = k.lower() pos = text_low.find(k_low) if pos >= 0: hits.append((pos, -len(k_low), k)) if not hits: return [] hits.sort() picked: List[str] = [] for _, __, k in hits: if k not in picked: picked.append(k) if len(picked) >= top_k: break return picked def llm_pick_keywords(context_text: str, all_keywords: List[str], kind: str = "Q&A", top_k: int = 3) -> List[str]: """ Calls OpenAI (Responses API) to select 1–3 keywords from all_keywords that best match the context_text. More lenient parsing: 1) Try strict JSON parse 2) If not parseable, fallback to literal keyword mentions in the model output Returns a (possibly empty) list of selected keywords. """ if not OPENAI_API_KEY: return [] if not all_keywords: return [] # Keep prompt reasonable kw_list = ", ".join(all_keywords[:2000]) sys_prompt = ( "You are a careful assistant. Given a context and a list of available keywords, " f"pick 1–{top_k} {kind} keywords that are MOST relevant. " "Output ONLY JSON, no commentary. Either a JSON array of strings, or {\"keywords\": [..]}." ) user_prompt = ( f"Context (truncated):\n\n{context_text}\n\n" f"Available {kind} keywords (choose at most {top_k}):\n{kw_list}\n\n" "Return JSON only." ) payload = { "model": OPENAI_MODEL, "input": [ {"role": "system", "content": [{"type": "input_text", "text": sys_prompt}]}, {"role": "user", "content": [{"type": "input_text", "text": user_prompt}]}, ], "store": False, } try: r = requests.post( OPENAI_URL, headers={"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"}, json=payload, timeout=60, ) if r.status_code != 200: return [] data = r.json() raw = (_extract_responses_output_text(data) or "").strip() if not raw: return [] # 1) Strict JSON parse picked = safe_parse_json_keywords(raw) # 2) Lenient fallback: literal mentions in model output if not picked: picked = _fallback_keywords_from_text(raw, all_keywords, top_k=top_k) # Keep only those present in the known list (case-insensitive match) aset = {str(k).lower(): k for k in all_keywords} final: List[str] = [] for p in picked: v = aset.get(str(p).lower()) if v and v not in final: final.append(v) return final[:top_k] except Exception: return [] def get_active_preset_content() -> Dict[str, str]: name = ss.get("active_preset") or "" return (ss["presets"].get(name) or {"summary": "", "actions": "", "qna": "", "rewrite": ""}) _load_demo_kb_if_empty() # ========================= # Utility: views & context # ========================= def db_all_messages() -> pd.DataFrame: return ss["messages_df"] def db_recent_messages(user_id: Optional[str]=None, minutes: int=1440) -> pd.DataFrame: df = ss["messages_df"] if df.empty: return df since = (datetime.utcnow() - timedelta(minutes=minutes)).replace(tzinfo=timezone.utc) dtcol = pd.to_datetime(df["ts"], utc=True, errors="coerce") mask = (dtcol >= since) if user_id: mask &= (df["user_id"] == user_id) return df[mask].copy().sort_values(by="ts", ascending=True) def db_distinct_users() -> List[str]: df = ss["messages_df"] if df.empty: return [] return sorted([u for u in df["user_id"].dropna().unique() if str(u).strip()]) def resolve_user_label(user_id: str, df_all: pd.DataFrame) -> str: # Prefer nickname from ss.user_index; else most recent non-empty display_name from df; else suffix of user_id. nm = (ss.get("user_index", {}).get(user_id, {}) or {}).get("nickname", "").strip() if nm: base = nm else: dfx = df_all[df_all["user_id"] == user_id] dn = "" if not dfx.empty: dn_series = dfx[dfx["display_name"].astype(str).str.strip() != ""].tail(1)["display_name"] if not dn_series.empty: dn = str(dn_series.iloc[0]).strip() base = dn or user_id suffix = user_id[-6:] if len(user_id) >= 6 else user_id return f"{base} ({suffix})" def split_sentences(text: str) -> List[str]: # Rough splitter for JA/EN punctuation; keeps punctuation with sentence. parts = re.split(r'(?<=[。.\.!\?!?])\s*', text) return [p for p in parts if p.strip()] def approx_tokens(s: str) -> int: # crude but works cross-lang: ~4 chars per token on average return max(1, int(len(s) / 4)) def last_context_with_sentence_guard(dfu: pd.DataFrame, max_tokens: int = 500) -> str: # Build role: text lines from recent convo lines = [] for _, r in dfu.tail(200).iterrows(): text_clean = str(r["text"]).strip().replace("\n", " ") role = r.get("role", "user") lines.append(f"{role}: {text_clean}") blob = "\n".join(lines).strip() if not blob: return "" # If already short, return if approx_tokens(blob) <= max_tokens: return blob # Sentence guard: accumulate from end by full sentences sents = split_sentences(blob) picked = [] count = 0 for sent in reversed(sents): picked.append(sent) count += approx_tokens(sent) if count >= max_tokens: break return "\n".join(reversed(picked)).strip() # ========================= # Probes (optional) # ========================= def _dns_check(host: str): try: infos = socket.getaddrinfo(host, 443, proto=socket.IPPROTO_TCP) addrs = sorted({f"{i[4][0]}:{i[4][1]}" for i in infos}) return True, addrs except Exception as e: return False, [f"DNS error: {e}"] def _http_probe(url: str, headers: dict | None = None): try: r = requests.get(url, headers=headers or {}, timeout=15) return True, f"{r.status_code} {r.reason}", r.text[:300] except Exception as e: return False, str(e), "" # ========================= # FOLLOW-UPS — D1 via Worker API (storage & helpers) # ========================= import os, datetime as dt from typing import Optional import requests import streamlit as st # Assumes you already have these elsewhere: # - ss = st.session_state # - JPN_TZ = zoneinfo.ZoneInfo("Asia/Tokyo") FOLLOWUPS_API_BASE = (os.getenv("FOLLOWUPS_API_BASE") or ss.get("worker_base") or "").rstrip("/") # Optional: x-api-key auth fallback (only needed if you are NOT using Bearer token) FOLLOWUPS_READ_KEY = os.getenv("FOLLOWUPS_READ_KEY", "") FOLLOWUPS_WRITE_KEY = os.getenv("FOLLOWUPS_WRITE_KEY", "") ss.setdefault("followups_cache", []) ss.setdefault("followups_loaded", False) ss.setdefault("followup_alert_modal_shown", False) def _now_jst() -> dt.datetime: return dt.datetime.now(JPN_TZ) def _ensure_tz(dt_obj: dt.datetime) -> dt.datetime: if dt_obj.tzinfo is None: return dt_obj.replace(tzinfo=JPN_TZ) return dt_obj def _api_headers(required_scope: str) -> dict: """ required_scope: "read" or "write" Prefers Bearer token if present, else x-api-key. """ h = {"content-type": "application/json"} token = ss.get("api_token") or ss.get("token") or ss.get("worker_token") or "" if token: h["authorization"] = f"Bearer {token}" return h # fallback: x-api-key if required_scope == "write" and FOLLOWUPS_WRITE_KEY: h["x-api-key"] = FOLLOWUPS_WRITE_KEY elif required_scope == "read" and FOLLOWUPS_READ_KEY: h["x-api-key"] = FOLLOWUPS_READ_KEY elif FOLLOWUPS_WRITE_KEY: # allow write key for reads too h["x-api-key"] = FOLLOWUPS_WRITE_KEY return h def _api_get(path: str, params: Optional[dict] = None, scope: str = "read"): if not FOLLOWUPS_API_BASE: raise RuntimeError("FOLLOWUPS_API_BASE not set (and ss.worker_base is empty)") url = f"{FOLLOWUPS_API_BASE}{path}" return requests.get(url, headers=_api_headers(scope), params=params or {}, timeout=20) def _api_post(path: str, payload: dict, scope: str = "write"): if not FOLLOWUPS_API_BASE: raise RuntimeError("FOLLOWUPS_API_BASE not set (and ss.worker_base is empty)") url = f"{FOLLOWUPS_API_BASE}{path}" return requests.post(url, headers=_api_headers(scope), json=payload, timeout=20) def load_followups_into_state(force: bool = False): if ss.get("followups_loaded") and not force: return try: resp = _api_get("/api/followups/open", scope="read") if not resp.ok: st.warning(f"Follow-ups: API read error ({resp.status_code}): {resp.text[:200]}") ss["followups_cache"] = [] ss["followups_loaded"] = True return data = resp.json() if resp.text else [] ss["followups_cache"] = data if isinstance(data, list) else [] ss["followups_loaded"] = True except Exception as e: st.warning(f"Follow-ups: load error ({e}).") ss["followups_cache"] = [] ss["followups_loaded"] = True def add_followup(user_id: str, due_dt: dt.datetime, note: str = "") -> str: due_dt = _ensure_tz(due_dt) payload = {"user_id": str(user_id), "due_iso": due_dt.isoformat(), "note": (note or "").strip()} resp = _api_post("/api/followups/set", payload, scope="write") if not resp.ok: raise RuntimeError(f"Follow-up set failed ({resp.status_code}): {resp.text[:200]}") out = resp.json() if resp.text else {} load_followups_into_state(force=True) return str(out.get("id") or "") def resolve_followup(fid: str): resp = _api_post("/api/followups/resolve", {"id": str(fid)}, scope="write") if not resp.ok: raise RuntimeError(f"Resolve failed ({resp.status_code}): {resp.text[:200]}") load_followups_into_state(force=True) def list_overdue_followups(now_dt: Optional[dt.datetime] = None) -> list: now_dt = _ensure_tz(now_dt or _now_jst()) try: resp = _api_get("/api/followups/overdue", params={"now_iso": now_dt.isoformat()}, scope="read") if not resp.ok: st.warning(f"Follow-ups: overdue query error ({resp.status_code}): {resp.text[:200]}") return [] data = resp.json() if resp.text else [] return data if isinstance(data, list) else [] except Exception as e: st.warning(f"Follow-ups: overdue query error ({e}).") return [] def list_user_open_followups(user_id: str) -> list: uid = str(user_id) out = [it for it in ss.get("followups_cache", []) if str(it.get("user_id", "")) == uid and not bool(it.get("resolved", False))] out.sort(key=lambda x: x.get("due_iso", "")) return out # ========================= # Worker API helpers (Responder Console) # ========================= def auth_headers() -> dict: return {"Authorization": f"Bearer {ss.token}"} if ss.token else {} def api_get(path: str, params: dict | None = None): base = ss.worker_base.strip().rstrip("/") try: return requests.get(f"{base}{path}", params=params, headers=auth_headers(), timeout=20) except Exception as e: return type("Obj", (), {"status_code": 599, "text": str(e)}) def api_post(path: str, json_data: dict): base = ss.worker_base.strip().rstrip("/") try: h = {"Content-Type": "application/json"}; h.update(auth_headers()) return requests.post(f"{base}{path}", json=json_data, headers=h, timeout=20) except Exception as e: return type("Obj", (), {"status_code": 599, "text": str(e)}) # ========================= # Global CSS for chat bubbles # ========================= st.markdown(""" """, unsafe_allow_html=True) st.markdown(""" """, unsafe_allow_html=True) # ========================= # PASSCODE GATE — Entire App (LOCKED base + brute-force throttle) # ========================= st.markdown("

🐨 Koala DX — Dashboard + Responder

", unsafe_allow_html=True) # --- Brute-force throttle (per-session) --- ss.setdefault("auth_fail_count", 0) ss.setdefault("auth_block_until", 0.0) def _auth_is_blocked_now() -> tuple[bool, int]: now = time.time() until = float(ss.get("auth_block_until", 0.0) or 0.0) if now < until: return True, int(until - now) return False, 0 def _auth_register_fail(): # Exponential backoff up to 5 minutes (per session) ss["auth_fail_count"] = int(ss.get("auth_fail_count", 0) or 0) + 1 delay = min(300, 2 ** min(ss["auth_fail_count"], 8)) # 2,4,8,... up to 256s, capped at 300 ss["auth_block_until"] = time.time() + delay def _auth_register_success(): ss["auth_fail_count"] = 0 ss["auth_block_until"] = 0.0 # Force the locked base into session state (prevents tampering) ss.worker_base = WORKER_BASE_DEFAULT if not ss.token: blocked, remain_s = _auth_is_blocked_now() if blocked: st.error(f"Too many attempts. Try again in ~{remain_s}s.") st.stop() with st.form("login_gate", clear_on_submit=False): st.subheader("🔐 Sign in") st.caption(f"Worker: {WORKER_BASE_DEFAULT} (locked)") passcode = st.text_input("Passcode", type="password", placeholder="Enter READ or WRITE passcode") submitted = st.form_submit_button("Sign in") if submitted: # Always add a tiny delay to make guessing slower time.sleep(0.5) base = WORKER_BASE_DEFAULT # 🔒 locked try: r = requests.post( f"{base}/api/auth", json={"passcode": passcode}, timeout=20, allow_redirects=False, # 🔒 avoid redirect-to-attacker leakage ) except Exception as e: _auth_register_fail() st.error(f"Network error: {e}") st.stop() if r.status_code != 200: _auth_register_fail() st.error(f"Auth failed: {r.text}") st.stop() # Success try: data = r.json() except Exception: _auth_register_fail() st.error("Auth failed: invalid JSON response.") st.stop() ss.worker_base = base ss.token = data.get("token", "") or "" ss.scope = data.get("scope", "") or "" if not ss.token: _auth_register_fail() st.error("Auth failed: missing token.") st.stop() _auth_register_success() st.success(f"Signed in as {ss.scope.upper()} — {ss.worker_base}") if rerun: rerun() st.stop() # ========================= # FOLLOW-UPS — load & notify once per login (no st.modal) # ========================= load_followups_into_state(force=False) def _fmt_user_label(uid: str) -> str: uid = (uid or "").strip() rec = (ss.get("user_index") or {}).get(uid, {}) or {} nick = (rec.get("nickname") or "").strip() disp = (rec.get("display_name") or "").strip() suffix = (rec.get("suffix") or (uid[-6:] if len(uid) >= 6 else uid)) name = nick or disp or f"unfollowed({suffix})" return f"{name} ({suffix})" def _fmt_due_jst(due_iso: str) -> str: s = (due_iso or "").strip() if not s: return "(no due date)" try: d = dt.datetime.fromisoformat(s) # handles "+09:00" offsets too if d.tzinfo is None: d = d.replace(tzinfo=JPN_TZ) else: d = d.astimezone(JPN_TZ) return d.strftime("%Y-%m-%d %H:%M") except Exception: # fallback: show raw if parsing fails return s if not ss["followup_alert_modal_shown"]: overdue = list_overdue_followups() if overdue: st.warning("⏰ Follow-ups due — please review below.") with st.expander("Follow-ups due (click to open)", expanded=True): st.write("Resolve now or keep to be reminded again next login.") st.divider() for it in overdue: uid = it.get("user_id", "") fid = it.get("id") note = (it.get("note", "") or "").strip() due_pretty = _fmt_due_jst(it.get("due_iso", "")) st.checkbox( f"Resolve • {_fmt_user_label(uid)} • due={due_pretty} • note={note or '(no note)'}", key=f"ck_resolve_{fid}", value=False, ) c1, c2 = st.columns(2) with c1: if st.button("✅ Update (resolve checked)", key="fu_update"): any_change = False for it in overdue: fid = it.get("id") if fid and st.session_state.get(f"ck_resolve_{fid}", False): try: resolve_followup(str(fid)) any_change = True except Exception as e: st.error(f"Resolve failed for {fid}: {e}") if any_change: st.success("Updated follow-ups.") else: st.info("No changes made.") ss["followup_alert_modal_shown"] = True if rerun: rerun() with c2: if st.button("🔁 Keep all for later", key="fu_keep"): ss["followup_alert_modal_shown"] = True st.info("Okay — you’ll be reminded again on next login.") if rerun: rerun() else: ss["followup_alert_modal_shown"] = True # ========================= # Sidebar — (visible only after login) # ========================= with st.sidebar: st.header("Data Fetch") # --- Backend config (hidden from main UI) --- # Keep these in env/secrets; not exposed in the sidebar UI base_url = (ss.get("worker_base") or WORKER_BASE_DEFAULT).strip().rstrip("/") api_key = READ_API_KEY_DEFAULT # keep in env/secrets # --- Default: fetch past 1 year --- now_jst = dt.datetime.now(JPN_TZ) default_since_dt = (now_jst - dt.timedelta(days=365)).replace(hour=0, minute=0, second=0, microsecond=0) # --- Inputs the user asked to keep --- aes_key_in = st.text_input("AES key (base64, 16/24/32 bytes)", type="password") limit = st.number_input("Limit", min_value=1, max_value=5000, value=int(FETCH_LIMIT_DEFAULT), step=100) # Human-readable since date (JST) + computed since_ms (UTC epoch ms) since_date = st.date_input("Since (JST date)", value=default_since_dt.date()) # Optional time control (kept minimal; defaults midnight JST) since_time = dt.time(0, 0) since_dt_jst = dt.datetime.combine(since_date, since_time).replace(tzinfo=JPN_TZ) since_ms = int(since_dt_jst.astimezone(dt.timezone.utc).timestamp() * 1000) st.caption(f"Computed since_ms: `{since_ms}` (UTC ms) · JST: {since_dt_jst.strftime('%Y-%m-%d %H:%M')}") c1, c2 = st.columns([1.2, 1.0]) with c1: if st.button("☁️ Fetch & Decrypt → Memory", type="primary"): # Validate AES key and store into session key cache kb = None if aes_key_in: try: kb = _b64_to_bytes(aes_key_in.strip()) except Exception: kb = None if not kb or len(kb) not in (16, 24, 32): st.error("Invalid AES key. Must be base64 for 16/24/32 bytes.") else: _set_session_key(kb) # Recommended: overwrite to avoid duplicates on repeated fetches ok, msg = import_worker_to_memory( base_url=base_url, api_key=api_key, aes_key_b64=base64.b64encode(kb).decode(), since_ms=int(since_ms), limit=int(limit), overwrite=True, ) if ok: st.success(msg) st.toast("Worker import complete.", icon="✅") else: st.error(msg) with c2: if st.button("🧹 Clear memory"): ss["messages_df"] = ss["messages_df"].iloc[0:0].copy() ss["sec_cache"].clear() st.success("Cleared in-memory data.") st.divider() # --- Backend imports kept but hidden/collapsed --- with st.expander("Backend imports (CSV / preset.xlsx)", expanded=False): st.subheader("CSV Import") up = st.file_uploader("Upload CSV (user_id, role, text[, ts, display_name])", type=["csv"]) if st.button("📥 Import CSV → Memory"): if up is None: st.warning("Please choose a CSV file first.") else: ok, msg = import_csv_to_memory(up, overwrite=True) (st.success if ok else st.error)(msg) st.divider() st.subheader("Presets from Excel") preset_file = st.file_uploader("preset.xlsx", type=["xlsx"]) if st.button("📚 Load presets"): if not preset_file: st.warning("Select an .xlsx file first.") else: try: data = preset_file.read() presets, hotlines, qa_df, ra_df = load_presets_and_kb_from_xlsx(io.BytesIO(data)) ss["presets"] = presets ss["hotlines"] = hotlines ss["qa_rows"] = qa_df ss["ra_rows"] = ra_df ss["qa_keywords"] = unique_keywords_from_df(qa_df, "keyword") ss["ra_keywords"] = unique_keywords_from_df(ra_df, "keyword") if presets: ss["active_preset"] = sorted(presets.keys())[0] st.success( f"Loaded {len(presets)} presets, {len(hotlines)} hotlines, " f"{len(ss['qa_keywords'])} Q&A keywords, {len(ss['ra_keywords'])} RA keywords." ) except Exception as e: st.error(f"Failed to load presets: {e}") if ss.get("presets"): preset_names = sorted(ss["presets"].keys()) ss["active_preset"] = st.selectbox( "Active preset", options=preset_names, index=max( 0, preset_names.index(ss.get("active_preset", "")) if ss.get("active_preset", "") in preset_names else 0 ) ) tab_main, tab_stats = st.tabs(["🏠 Main", "📊 Stats"]) with tab_main: # ========================= # Single-Page Layout — Left (Responder Console) | Right (Dashboard) # ========================= left, right = st.columns([1, 1]) # left half for chat console # ==== LEFT: Responder Console (uses Worker passcode token) ===================== with left: st.subheader("💬 Responder Console") st.caption(f"Signed in as {ss.scope.upper()} — {ss.worker_base}") st.text_input("Responder name(担当者名)", key="responder", placeholder="e.g., Ken / 山田") st.markdown("**Select a user**") resp = api_get("/api/users") if resp.status_code != 200: st.error(f"Failed to load users: {resp.text}") users = [] else: users = resp.json() or [] # Build uid -> display info map (stable) user_map = {} uid_options = [] for row in users: uid = (row.get("user_id") or "").strip() if not uid: continue nickname = (row.get("nickname") or "").strip() display = (row.get("display_name") or "").strip() suffix = uid[-6:] if len(uid) >= 6 else uid user_map[uid] = {"nickname": nickname, "display_name": display, "suffix": suffix} uid_options.append(uid) ss["user_index"] = user_map # persist for use on the right column & messages table def _last_ts(uid: str) -> int: for row in users: if (row.get("user_id") or "").strip() == uid: try: return int(row.get("last_ts") or 0) except Exception: return 0 return 0 # Sort by last_ts desc (stable values, only order changes) uid_options = sorted(uid_options, key=_last_ts, reverse=True) def fmt_uid(uid: str) -> str: rec = user_map.get(uid, {}) or {} nickname = (rec.get("nickname") or "").strip() display = (rec.get("display_name") or "").strip() suffix = rec.get("suffix") or (uid[-6:] if len(uid) >= 6 else uid) name = nickname or display or f"unfollowed({suffix})" last_ts = _last_ts(uid) return f"{name} ({suffix}) · last: {fmt_ts_ms_to_jst(last_ts)}" if uid_options: # Keep previous selection if possible default_uid = ss.get("selected_user") if ss.get("selected_user") in uid_options else uid_options[0] default_idx = uid_options.index(default_uid) picked_uid = st.selectbox( "Users", options=uid_options, index=default_idx, format_func=fmt_uid, key="user_select_uid", # IMPORTANT: stable widget key ) ss.selected_user = picked_uid ss.selected_user_label = fmt_uid(picked_uid) ss.nickname_current = (user_map.get(ss.selected_user, {}) or {}).get("nickname") or "" else: st.info("No users yet.") # Nickname editor if ss.selected_user: st.markdown("**Nickname (表示名・ニックネーム)**") cols = st.columns([3,1]) with cols[0]: new_nick = st.text_input("Set nickname for this user", value=ss.nickname_current, key="nick_input", placeholder="e.g., 山田さん / Yamada") with cols[1]: if st.button("Save"): if ss.scope != "write": st.warning("You are signed in with READ-only scope.") else: r = api_post("/api/nickname", {"user_id": ss.selected_user, "nickname": new_nick.strip() or None}) if r.status_code == 200: st.success("Nickname updated ✅") ss.nickname_current = new_nick.strip() time.sleep(0.3) if rerun: rerun() else: st.error(f"Failed to update nickname: {r.text}") # Conversation messages = [] if ss.selected_user: cols = st.columns([1,1,2]) with cols[0]: if st.button("↻ Refresh"): ss.last_refresh = time.time() if rerun: rerun() with cols[1]: st.caption("Auto-refresh ~20s") if time.time() - ss.last_refresh > 20: ss.last_refresh = time.time() if rerun: rerun() r = api_get("/api/messages", params={"user_id": ss.selected_user}) if r.status_code != 200: st.error(f"Failed to load messages: {r.text}") else: messages = r.json() or [] # Auto-set unfollowed nickname if applicable if ss.selected_user and ss.scope == "write": uinfo = user_map.get(ss.selected_user, {}) if 'user_map' in locals() else {} if (not (uinfo.get("nickname") or uinfo.get("display_name")) and any((m.get("type") or "").lower().strip() == "unfollow" for m in messages) and ss.auto_set_nick_once_for != ss.selected_user): fallback_nick = f"unfollowed({uinfo.get('suffix','')})" resp_set = api_post("/api/nickname", {"user_id": ss.selected_user, "nickname": fallback_nick}) if resp_set.status_code == 200: ss.auto_set_nick_once_for = ss.selected_user ss.nickname_current = fallback_nick st.info(f"Nickname auto-set to **{fallback_nick}** for unfollowed user.") time.sleep(0.3) if rerun: rerun() else: st.warning(f"Attempted to auto-set unfollow nickname but failed: {resp_set.text}") shown_label = ss.selected_user_label or ss.selected_user or "(none)" st.markdown(f"**Conversation — {shown_label}**") render_conversation_bubbles(messages, height_px=520, key=f"chat_{ss.selected_user or 'none'}") # Send reply if ss.selected_user: st.markdown("**Send a reply**") if ss.scope != "write": st.info("You are signed in with READ-only scope. Replies are disabled.") else: reply = st.chat_input("Type your reply…") if reply: if not ss.responder.strip(): st.warning("Please enter your Responder name(担当者名)above before sending.") else: payload = {"user_id": ss.selected_user, "text": reply.strip(), "responder": ss.responder.strip()} r = api_post("/api/messages", payload) if r.status_code == 200: try: out = r.json() or {} except Exception: out = {} status = (out.get("delivery_status") or "").lower().strip() if status == "sent": st.success("Sent ✅") elif status == "quota": st.warning("⚠️今月の無料送信数を超えています。手動でラインを送ってください.") else: st.error("Error ❌ (will auto-retry once after 10s)") # Optional: refresh conversation so you can see delivery_status fields updated if rerun: rerun() else: st.error(f"Send failed: {r.text}") with st.expander("🔎 Debug log", expanded=False): if st.button("Clear debug log", key="btn_clear_debug"): ss["debug_log"] = [] st.code("\n".join(ss.get("debug_log", [])) or "(empty)") # ==== RIGHT: Dashboard (Risk, Volunteer Assist with KB/LLM, then Messages) ===== SUMMARIZE_SYSTEM_PROMPT = ( "You are a careful support assistant for volunteer responders. " "Write in Japanese. Be supportive, neutral, and anonymized. " "Do not give medical/legal certainty. If self-harm risk is present, prioritize safety steps and hotlines." ) with right: st.subheader("⚠️ High/Medium Risk") df_all = db_all_messages() if not df_all.empty: risky = df_all[df_all["risk_level"].isin(["HIGH","MEDIUM"])] if risky.empty: st.success("No risk flagged.") else: st.dataframe( risky[["ts","user_id","display_name","text","risk_level"]].tail(200), use_container_width=True, height=220 ) else: st.info("No messages loaded yet.") # --- Volunteer Assist target user (fix NameError: pick_uid) --- pick_uid = (ss.get("selected_user") or "").strip() ss["assist_pick_uid"] = pick_uid # keep follow-up section consistent # Build context for summarization (prefer live Worker messages; fallback to in-memory) ctx = "" if pick_uid: rr = api_get("/api/messages", params={"user_id": pick_uid}) if rr.status_code == 200: msgs_ctx = rr.json() or [] lines = [] for m in msgs_ctx[-200:]: role = (m.get("role") or "user").lower().strip() text = str(m.get("text") or "").strip().replace("\n", " ") if text: lines.append(f"{role}: {text}") blob = "\n".join(lines).strip() # sentence-guard to ~500 tokens (uses your existing helpers) if approx_tokens(blob) <= 500: ctx = blob else: sents = split_sentences(blob) picked = [] count = 0 for sent in reversed(sents): picked.append(sent) count += approx_tokens(sent) if count >= 500: break ctx = "\n".join(reversed(picked)).strip() else: # fallback: in-memory df (only if you imported CSV/Worker→memory) dfu = db_recent_messages(user_id=pick_uid, minutes=60 * 24 * 30) ctx = last_context_with_sentence_guard(dfu, max_tokens=500) if not pick_uid: st.info("左側でユーザーを選択すると、要約/提案が有効になります。") # ---- ① Conversation Summary (robust: updates text_area after rerun) ---- st.markdown("**① Conversation Summary**") # Keys: # - sum_widget_key: the actual widget key (Streamlit owns this once created) # - sum_value_key: our backing store (safe to update any time) # - sum_pending_key: a "mailbox" for updates that will be applied BEFORE widget instantiation sum_value_key = f"summary_value_{pick_uid}" sum_widget_key = f"summary_widget_{pick_uid}" sum_pending_key = f"_pending_summary_{pick_uid}" # Seed initial text (preset -> previous saved -> empty) preset = get_active_preset_content() out_map = (ss.get("llm_out", {}).get(pick_uid, {}) or {}) initial = (out_map.get("summary") or preset.get("summary") or "").strip() if sum_value_key not in ss: ss[sum_value_key] = initial # ✅ Apply pending update BEFORE creating the widget (this is the crucial part) if ss.get(sum_pending_key): new_text = ss.pop(sum_pending_key) ss[sum_value_key] = new_text ss[sum_widget_key] = new_text # safe here because widget isn't created yet ss.setdefault("llm_out", {}).setdefault(pick_uid, {})["summary"] = new_text # Ensure widget key exists before first render if sum_widget_key not in ss: ss[sum_widget_key] = ss.get(sum_value_key, "") # Render widget (after this point, DO NOT assign ss[sum_widget_key] directly) summary_txt = st.text_area("Summary", key=sum_widget_key, height=150) c_sum1, c_sum2 = st.columns([1, 1]) with c_sum1: if st.button("🧠 Summarize (OpenAI)", key=f"btn_summarize_{pick_uid}"): dbg("Summarize clicked") if not OPENAI_API_KEY: dbg("OPENAI_API_KEY missing") st.error("OPENAI_API_KEY is not set.") else: dbg(f"OPENAI_URL={OPENAI_URL}") dbg(f"OPENAI_MODEL={OPENAI_MODEL}") dbg(f"ctx_chars={len(ctx or '')}") # Optional DNS hint try: host = (OPENAI_URL or "").split("/")[2] ok_dns, addrs = _dns_check(host) dbg(f"dns_ok={ok_dns} addrs={addrs[:3]}") except Exception as e: dbg(f"dns_check_error={e}") # ✅ Yes: the summarize system prompt is written here (explicit) sys_prompt = ( "You are a careful assistant supporting volunteer responders for caregiver chats in Japan. " "Summarize the conversation into 5–8 bullet points in Japanese. " ) user_prompt = ( "Conversation context:\n\n" f"{ctx}\n\n" "Output only the summary bullets." ) payload = { "model": OPENAI_MODEL, "input": [ {"role": "system", "content": [{"type": "input_text", "text": sys_prompt}]}, {"role": "user", "content": [{"type": "input_text", "text": user_prompt}]}, ], "store": False, } t0 = time.time() with st.spinner("Calling OpenAI..."): try: r = requests.post( OPENAI_URL, headers={ "Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json", }, json=payload, timeout=60, ) dt_s = time.time() - t0 dbg(f"openai_http_done status={r.status_code} elapsed={dt_s:.2f}s") dbg(f"openai_resp_head={r.text[:300].replace(chr(10),' ')}") if r.status_code != 200: st.error(f"LLM error: {r.status_code}\n\n{r.text}") else: data = r.json() content = (_extract_responses_output_text(data) or "").strip() dbg(f"parsed_output_chars={len(content)}") if not content: st.warning("Got HTTP 200 but no text output was parsed. Check Debug log.") else: # ✅ DO NOT set ss[sum_widget_key] here (widget already exists) # Instead, stash it and rerun so it applies BEFORE widget instantiation. ss[sum_pending_key] = content ss[sum_value_key] = content ss.setdefault("llm_out", {}).setdefault(pick_uid, {})["summary"] = content st.success("Summary updated.") if rerun: rerun() except Exception as e: dbg(f"openai_request_exception={repr(e)}") st.exception(e) with c_sum2: if st.button("💾 Save current text", key=f"btn_save_summary_{pick_uid}"): cur = (ss.get(sum_widget_key, "") or "").strip() ss[sum_value_key] = cur ss.setdefault("llm_out", {}).setdefault(pick_uid, {})["summary"] = cur st.success("Saved to session.") # ---- ② Recommended Actions (KB-driven) ---- st.markdown("**② Recommended Actions (from RA sheet via keywords)**") ra_available = ss.get("ra_keywords", []) if not ss.get("ra_rows", pd.DataFrame()).empty else [] if not ra_available: st.info("No RA sheet loaded. Add an 'RA' sheet with columns: keyword, action.") # ✅ Make suggestion actually select the tags in the multiselect ra_widget_key = f"ra_select_{pick_uid}" # this is the multiselect's key ra_pending_key = f"_pending_ra_{pick_uid}" # pending mailbox (applied BEFORE widget) # Apply pending suggestion before widget instantiation if ra_pending_key in ss: ss[ra_widget_key] = ss.pop(ra_pending_key) or [] c_ra1, c_ra2 = st.columns([1, 1]) # ensure the widget key exists (so first render is empty, no warning) ss.setdefault(ra_widget_key, []) with c_ra1: pick_ra = st.multiselect( "Pick action keywords", options=ra_available, key=ra_widget_key, ) # (optional) show what LLM picked last time last_ra = ss.get(f"_last_ra_suggest_{pick_uid}", []) if last_ra: st.caption("LLM selected: " + ", ".join(last_ra)) with c_ra2: if st.button("🔎 Suggest Action Keywords (OpenAI)", key=f"btn_ra_{pick_uid}"): if not OPENAI_API_KEY: st.error("OPENAI_API_KEY is not set.") elif not ra_available: st.warning("No RA keywords loaded.") else: picked = llm_pick_keywords(ctx, ra_available, kind="action") if not picked: st.warning("No keywords suggested.") else: # ✅ write into pending → rerun so widget reflects it ss[ra_pending_key] = picked st.success(f"Suggested: {', '.join(picked)}") if rerun: rerun() ra_selected_now = ss.get(ra_widget_key, []) or [] ss["ra_selected"] = ra_selected_now # keep your old state key if other code uses it if ra_selected_now and not ss["ra_rows"].empty: show = ss["ra_rows"][ss["ra_rows"]["keyword"].isin(ra_selected_now)].copy() if show.empty: st.info("No rows match the selected keywords.") else: st.write("**Actions**") for kw in ra_selected_now: sub = show[show["keyword"] == kw] if not sub.empty: with st.expander(f"Keyword: {kw} · {len(sub)} item(s)", expanded=True): for _, r in sub.iterrows(): st.markdown(f"- {r['action']}") else: st.caption("Pick 1–3 keywords (or use the suggestion button).") # ---- ③ Relevant Q&A Snippets (KB-driven) ---- st.markdown("**③ Relevant Q&A (from Q&A sheet via keywords)**") qa_available = ss.get("qa_keywords", []) if not ss.get("qa_rows", pd.DataFrame()).empty else [] if not qa_available: st.info("No Q&A sheet loaded. Add a 'Q&A' (or 'QA') sheet with columns: keyword, question, answer.") # ✅ Same fix for Q&A multiselect qa_widget_key = f"qa_select_{pick_uid}" qa_pending_key = f"_pending_qa_{pick_uid}" if qa_pending_key in ss: ss[qa_widget_key] = ss.pop(qa_pending_key) or [] c_q1, c_q2 = st.columns([1, 1]) ss.setdefault(qa_widget_key, []) with c_q1: pick_qa = st.multiselect( "Pick Q&A keywords", options=qa_available, key=qa_widget_key, ) last_qa = ss.get(f"_last_qa_suggest_{pick_uid}", []) if last_qa: st.caption("LLM selected: " + ", ".join(last_qa)) with c_q2: if st.button("🔎 Suggest Q&A Keywords (OpenAI)", key=f"btn_qa_{pick_uid}"): if not OPENAI_API_KEY: st.error("OPENAI_API_KEY is not set.") elif not qa_available: st.warning("No Q&A keywords loaded.") else: picked = llm_pick_keywords(ctx, qa_available, kind="Q&A") if not picked: st.warning("No keywords suggested.") else: ss[qa_pending_key] = picked st.success(f"Suggested: {', '.join(picked)}") if rerun: rerun() qa_selected_now = ss.get(qa_widget_key, []) or [] ss["qa_selected"] = qa_selected_now if qa_selected_now and not ss["qa_rows"].empty: show = ss["qa_rows"][ss["qa_rows"]["keyword"].isin(qa_selected_now)].copy() if show.empty: st.info("No Q&A rows match the selected keywords.") else: for kw in qa_selected_now: sub = show[show["keyword"] == kw] if not sub.empty: with st.expander(f"Keyword: {kw} · {len(sub)} Q&A", expanded=True): for _, r in sub.iterrows(): q = str(r.get("question", "")).strip() a = str(r.get("answer", "")).strip() if q or a: st.markdown(f"**Q:** {q if q else '(no question)'}") st.markdown(f"**A:** {a if a else '(no answer)'}") st.markdown("---") else: st.caption("Pick 1–3 keywords (or use the suggestion button).") # Hotlines from Excel (optional) if ss.get("hotlines"): st.markdown("**Hotlines / External Resources**") for h in ss["hotlines"]: parts = [h.get("name", "")] if h.get("dial"): parts.append(h.get("dial")) elif h.get("url"): parts.append(h.get("url")) if h.get("hours"): parts.append(f"({h['hours']})") if h.get("note"): parts.append(f"- {h['note']}") st.write(" ".join([p for p in parts if p])) st.markdown("---") st.markdown("**Draft a reply → Safety check & rewrite (preset)**") draft = st.text_area( "Your draft (before sending)", height=120, placeholder="(例)昨夜は本当に大変でしたね…。今はどのあたりにいらっしゃいますか?まず安全を一緒に確認させてください。", ) if st.button("🛡️ Apply Preset Rewrite", key=f"rewrite_{pick_uid}"): rewrite = get_active_preset_content().get("rewrite", "").strip() st.text_area("Rewritten (safer)", (rewrite or draft), height=120, key=f"rewritten_{pick_uid}") st.toast("Preset rewrite applied.", icon="✅") # ---- ⏰ Follow-up (per-user) ---- st.markdown("**⏰ Follow-up**") if users: # we already computed pick_uid earlier in Volunteer Assist if "assist_pick_uid" in ss and ss["assist_pick_uid"]: fu_uid = ss["assist_pick_uid"] mode = st.radio("When", ["In X days", "Pick date & time (JST)"], horizontal=True, key=f"fu_mode_{fu_uid}") note = st.text_input("Optional note", key=f"fu_note_{fu_uid}", placeholder="e.g., confirm safety plan / share clinic info") if mode == "In X days": xdays = st.number_input("X days", min_value=1, max_value=60, value=2, step=1, key=f"fu_days_{fu_uid}") if st.button("➕ Add follow-up", key=f"fu_add_days_{fu_uid}"): due = _now_jst() + dt.timedelta(days=int(xdays)) fid = add_followup(fu_uid, due, note) st.success(f"Follow-up added for {due.strftime('%Y-%m-%d %H:%M')} JST (id={fid}).") else: today = _now_jst().date() + dt.timedelta(days=2) d = st.date_input("Date (JST)", value=today, key=f"fu_date_{fu_uid}") t = st.time_input("Time (JST)", value=dt.time(10, 0), key=f"fu_time_{fu_uid}") if st.button("➕ Add follow-up", key=f"fu_add_dt_{fu_uid}"): due_dt = dt.datetime.combine(d, t).replace(tzinfo=JPN_TZ) fid = add_followup(fu_uid, due_dt, note) st.success(f"Follow-up added for {due_dt.strftime('%Y-%m-%d %H:%M')} JST (id={fid}).") # List open follow-ups for this user with resolve buttons open_items = list_user_open_followups(fu_uid) if open_items: st.caption("Open follow-ups for selected user:") for it in open_items: due_local = it["due_iso"] short_note = it.get("note","") cols = st.columns([3,5,2]) cols[0].markdown(f"**Due:** {due_local}") cols[1].markdown(f"**Note:** {short_note or '(no note)'}") if cols[2].button("Resolve", key=f"fu_resolve_{it['id']}"): resolve_followup(it["id"]) st.toast("Follow-up resolved.", icon="✅") if rerun: rerun() else: st.caption("No open follow-ups for this user.") else: st.info("Select a user above to add a follow-up.") st.markdown("---") # ---- 🔎 Conversation Search (Tag or Text) ---- st.subheader("🔎 Conversation Search") # Helpers (local to this UI block) def _tag_options_from_kb() -> list[str]: # Prefer explicit tag sheet later; for now derive from Q&A + RA keywords qa = ss.get("qa_keywords", []) or [] ra = ss.get("ra_keywords", []) or [] uniq = sorted({x.strip() for x in (qa + ra) if str(x).strip()}) return uniq[:500] def _compile_pattern_from_tags(tags: list[str]) -> re.Pattern | None: if not tags: return None # Simple OR of literal tags; if you later add a Tags sheet with custom patterns, plug them here. safe = [re.escape(t) for t in tags if str(t).strip()] if not safe: return None return re.compile(r"(" + "|".join(safe) + r")", flags=re.IGNORECASE) def _compile_pattern_from_text(q: str, mode: str) -> re.Pattern | None: q = str(q or "").strip() if not q: return None if mode == "Exact (literal)": return re.compile(re.escape(q), flags=re.IGNORECASE) # "Any word" → split on whitespace, OR them parts = [re.escape(p) for p in re.split(r"\s+", q) if p.strip()] if not parts: return None return re.compile(r"(" + "|".join(parts) + r")", flags=re.IGNORECASE) def _search_with_context( df: pd.DataFrame, pat: re.Pattern, context: int = 2, max_snippets: int = 30, since_minutes: int | None = None, ) -> list[dict]: """ Returns a list of snippets: { 'user_id':..., 'rows': DataFrame segment, 'hit_idx_in_segment': int } Each snippet is a contiguous slice around a hit within a single user's thread. """ if df.empty or pat is None: return [] # Optional time window work = df.copy() if since_minutes: since = (datetime.utcnow() - timedelta(minutes=int(since_minutes))).replace(tzinfo=timezone.utc) tsdt = pd.to_datetime(work["ts"], utc=True, errors="coerce") work = work[tsdt >= since] # Sort for stable context slicing work = work.sort_values(by=["user_id", "ts"]).reset_index(drop=True) out: list[dict] = [] for uid, g in work.groupby("user_id", sort=False): g = g.reset_index(drop=True) texts = g["text"].astype(str).fillna("") for i, txt in enumerate(texts): if not txt: continue if pat.search(txt): start = max(0, i - context) end = min(len(g) - 1, i + context) seg = g.iloc[start:end+1].copy() out.append({ "user_id": uid, "rows": seg, "hit_idx_in_segment": i - start, }) if len(out) >= max_snippets: return out return out # UI controls scope_col, ctx_col, lim_col = st.columns([1.1, 0.9, 0.9]) with scope_col: mode = st.radio("Mode", ["Text", "Tag"], horizontal=True, key="conv_search_mode") with ctx_col: ctx_messages = st.slider("Context window (msgs)", min_value=1, max_value=5, value=2, step=1) with lim_col: max_snips = st.slider("Max snippets", min_value=5, max_value=100, value=30, step=5) timewin = st.selectbox("Time window", ["All", "Last day", "Last week", "Last month"], index=1) since_map = {"All": None, "Last day": 60*24, "Last week": 60*24*7, "Last month": 60*24*30} since_minutes = since_map.get(timewin) pat = None if mode == "Tag": tag_opts = _tag_options_from_kb() if not tag_opts: st.info("タグ情報がありません(Q&A/RAのキーワードを読み込むとタグ検索が使えます)。") tag_select = [] else: tag_select = st.multiselect("Tags (1–3推奨)", options=tag_opts, default=[], key="conv_search_tags") do_search = st.button("🔎 Search by Tag") if do_search: pat = _compile_pattern_from_tags(tag_select) else: text_q = st.text_input("Text query", value="", placeholder="例:育児 不安 眠れない 里帰り など…") text_mode = st.radio("Match", ["Any word", "Exact (literal)"], horizontal=True, key="conv_search_textmode") do_search = st.button("🔎 Search Text") if do_search: pat = _compile_pattern_from_text(text_q, text_mode) # Run search if do_search: if pat is None: st.warning("検索条件を入力してください。") else: hits = _search_with_context(df_all, pat, context=ctx_messages, max_snippets=max_snips, since_minutes=since_minutes) if not hits: st.info("該当する会話は見つかりませんでした。") else: st.write(f"**{len(hits)} 件ヒット**(最大 {max_snips} 件を表示)") # Render each snippet as a small conversation block for j, snip in enumerate(hits, start=1): uid = snip["user_id"] seg = snip["rows"] hit_rel = snip["hit_idx_in_segment"] # Header header = f"[{j}] {resolve_user_label(uid, df_all)}" t0 = str(seg.iloc[0]["ts"]) t1 = str(seg.iloc[-1]["ts"]) st.markdown(f"**{header}** `{t0} 〜 {t1}`") # Show bubbles (reuse your chat style) for k, r in seg.iterrows(): role = str(r.get("role","user")).lower().strip() text = str(r.get("text","")).strip() ts_str = str(r.get("ts","")) is_hit = (k == seg.index[0] + hit_rel) # Highlight the matching segment lightly if is_hit and text: try: text = re.sub(pat, r"**\g<0>**", text) except Exception: pass is_system = False msg_type = "" # not stored in df_all; we only have text/role effective_text = text if not effective_text and msg_type: effective_text = f"[{msg_type}]" is_system = True if is_system: align = "center"; bubble_cls = "system"; meta = ts_str else: align = "left" if role == "user" else "right" bubble_cls = "user" if role == "user" else "account" meta = ts_str st.markdown( f'
' f'
{effective_text}
{meta}
' f'
', unsafe_allow_html=True ) st.markdown("---") # ── 📡 Messages (in memory) — moved to the lowest part (under rewrite) ───── st.subheader("📡 Messages Export") if df_all.empty: st.info("No messages loaded. Use Cloud Pull or CSV import from the sidebar.") else: # Attach nickname column by mapping from ss.user_index idx_map = ss.get("user_index", {}) nick_col = df_all["user_id"].map(lambda u: (idx_map.get(u, {}) or {}).get("nickname", "") if isinstance(u, str) else "") view = df_all.copy() insert_pos = view.columns.get_loc("display_name")+1 if "display_name" in view.columns else 2 view.insert(insert_pos, "nickname", nick_col) view = view[["ts","user_id","display_name","nickname","role","text","risk_level"]] st.dataframe(view.tail(300), use_container_width=True, height=360) st.caption("⚠️ Demo only. Not a clinical tool. For emergencies in Japan, call local hotlines or 119.") with tab_stats: render_stats_tab(db_all_messages(), ss)