import os

# Must run before anything that (transitively) imports numba — presumably
# pulled in by lib.pipeline (UMAP) — so the JIT cache lands in a writable
# directory inside containers. setdefault keeps an operator-provided value.
# TODO(review): confirm numba is actually imported via lib.pipeline.
os.environ.setdefault("NUMBA_CACHE_DIR", "/tmp/numba_cache")
|
|
import json
from typing import Any, List, Tuple

import gradio as gr
import pandas as pd

from lib.pipeline import analyze_texts
from lib.utils import load_sample_df, now_utc_str
from lib.viz import scatter_2d
|
|
# App title shown in the browser tab (gr.Blocks title=) and the page header.
APP_NAME = "🔎 SNS Analyzer v3(クラスタ+要約+感情+重複排除)— OpenAI API"
|
|
|
|
| |
| def _file_path(file_obj) -> str | None: |
| |
| if file_obj is None: |
| return None |
| if isinstance(file_obj, str): |
| return file_obj |
| if hasattr(file_obj, "name"): |
| return file_obj.name |
| if isinstance(file_obj, dict): |
| return file_obj.get("path") or file_obj.get("name") |
| return None |
|
|
def read_csv_flex(file_obj) -> pd.DataFrame:
    """Load an uploaded CSV, auto-detecting encoding and delimiter.

    Walks a fixed matrix of common encodings × separators (``sep=None``
    lets the python engine sniff the delimiter) and returns the first
    parse that yields a DataFrame with at least one column.

    Raises:
        RuntimeError: when no path can be resolved or every combination
            fails to parse.
    """
    path = _file_path(file_obj)
    if not path or not os.path.exists(path):
        raise RuntimeError("アップロードされた CSV のパスが取得できませんでした。")
    last_err = None
    for enc in ("utf-8", "utf-8-sig", "cp932", "shift_jis", "latin1"):
        for sep in (None, ",", "\t", ";", "|"):
            try:
                frame = pd.read_csv(path, encoding=enc, sep=sep, engine="python", on_bad_lines="skip")
            except Exception as err:
                last_err = err
                continue
            # Accept the first parse that produced a usable table.
            if isinstance(frame, pd.DataFrame) and frame.shape[1] >= 1:
                return frame
    raise RuntimeError(f"CSV 読み込みに失敗しました(encoding/区切りの自動判定に失敗): {last_err}")
|
|
| def guess_text_col(df: pd.DataFrame, hint: str | None) -> str: |
| if hint and hint in df.columns: |
| return hint |
| candidates = [ |
| "text","TEXT","Text","review","comment","content","body","message", |
| "本文","内容","テキスト","レビュー","クチコミ","コメント" |
| ] |
| for c in candidates: |
| if c in df.columns: |
| return c |
| for c in df.columns: |
| if df[c].dtype == "object": |
| return c |
| return df.columns[0] |
| |
|
|
|
|
def run_analysis(
    text_blob: str,
    k: int,
    max_items: int,
    seed: int,
    dedup_on: bool,
    dedup_thr: float,
    csv_file,
    csv_text_col: str,
    out_lang_ui: str,
) -> Tuple[str, Any, pd.DataFrame, str]:
    """Run the clustering/summary pipeline and build the four UI outputs.

    Returns:
        (markdown report, plotly figure, editable cluster table, result
        JSON string). On input errors the report carries a ⚠️ message and
        the figure is None.
    """
    # Map the UI radio label to the pipeline's language code.
    lang_map = {"日本語": "ja", "English": "en", "自動": "auto"}
    out_lang = lang_map.get(out_lang_ui, "ja")

    # --- Collect input texts: an uploaded CSV wins over the textbox. ---
    texts: List[str] = []
    if csv_file is not None:
        try:
            df = read_csv_flex(csv_file)
            col = guess_text_col(df, csv_text_col.strip() if csv_text_col else None)
            if col not in df.columns:
                return f"⚠️ CSVに列 `{col}` が見つかりません。列名を指定してください。", None, pd.DataFrame(), ""
            # fillna() must run BEFORE astype(str): astype(str) turns NaN
            # into the literal string "nan", which slips past the filter.
            texts = [str(s).strip() for s in df[col].fillna("").astype(str).tolist() if str(s).strip()]
        except Exception as e:
            return f"⚠️ CSV読み込みでエラー: {e}", None, pd.DataFrame(), ""
    else:
        texts = [t.strip() for t in (text_blob or "").split("\n") if t.strip()]

    # 0 (or empty) means "no cap" for the max-items slider.
    if max_items and max_items > 0:
        texts = texts[:max_items]
    if len(texts) < 2:
        return "⚠️ 2件以上のテキストを入力してください。", None, pd.DataFrame(), ""

    result = analyze_texts(
        texts=texts,
        k=k,
        random_state=seed,
        dedup=dedup_on,
        dedup_threshold=dedup_thr,
        output_lang=out_lang,
    )

    # --- Markdown report: dedup note, then one section per cluster. ---
    lines = []
    if result.get("dedup", {}).get("removed", 0) > 0:
        kept = result["dedup"]["kept"]
        removed = result["dedup"]["removed"]
        lines.append(f"> 近似重複を {removed} 件除外(残り {kept} 件)\n")

    for c in result["clusters"]:
        # User label beats generated summary title beats a generic name.
        title = c.get("label") or c.get("summary", {}).get("title") or f"クラスタ {c['id']}"
        lines.append(f"### クラスタ {c['id']} (size={c['size']})")
        lines.append(f"**{title}**")
        if c.get("summary", {}).get("overview"):
            lines.append(c["summary"]["overview"])
        if "sentiment" in c:
            s = c["sentiment"]
            lines.append(f"感情比率 — 👍 {s['positive']:.0%} / 😐 {s['neutral']:.0%} / 👎 {s['negative']:.0%}")
        if c.get("summary", {}).get("actions"):
            lines.append("- **推奨アクション**")
            for a in c["summary"]["actions"]:
                lines.append(f"  - {a}")
        lines.append("")

    # 2-D projection scatter, colored by cluster assignment.
    fig = scatter_2d(result["umap"], result["labels"])

    # Editable cluster table shown in the UI (label column is editable).
    df_out = pd.DataFrame([
        {
            "id": c["id"],
            "size": c["size"],
            "label": (c.get("label") or c.get("summary", {}).get("title") or f"クラスタ {c['id']}"),
            "top_terms": ", ".join(c["top_terms"]),
        }
        for c in result["clusters"]
    ])

    # Full result as JSON — fed to the hidden gr.JSON and the exporters.
    dl_json = json.dumps(result, ensure_ascii=False, indent=2)
    return "\n".join(lines), fig, df_out, dl_json
|
|
|
|
def on_load_sample():
    """Return the bundled demo dataset as newline-joined text for the textbox."""
    sample = load_sample_df()
    rows = sample["text"].astype(str).tolist()
    return "\n".join(rows)
|
|
|
|
def apply_labels(json_str: str, edited_df: pd.DataFrame) -> str:
    """Merge user-edited cluster labels from the table back into the result JSON.

    Rows with an empty/NaN label or a non-numeric id are skipped instead of
    aborting the whole merge. On any parse problem the original JSON string
    is returned unchanged (best-effort, never raises to the UI).
    """
    if not json_str:
        return json_str
    try:
        payload = json.loads(json_str)
    except Exception:
        return json_str
    if not isinstance(edited_df, pd.DataFrame) or "id" not in edited_df or "label" not in edited_df:
        return json_str

    id2label: dict[int, str] = {}
    for _, row in edited_df.iterrows():
        label = row["label"]
        # Gradio's Dataframe hands back NaN for cleared cells; str() would
        # otherwise turn that into the literal label "nan".
        if pd.isna(label) or not str(label).strip():
            continue
        try:
            id2label[int(row["id"])] = str(label)
        except (TypeError, ValueError):
            continue  # malformed id cell: skip this row, keep the rest

    for c in payload.get("clusters", []):
        try:
            cid = int(c["id"])
        except (KeyError, TypeError, ValueError):
            continue
        if cid in id2label:
            # Keep label and summary.title in sync so every view agrees.
            c["label"] = id2label[cid]
            c.setdefault("summary", {})
            c["summary"]["title"] = id2label[cid]
    return json.dumps(payload, ensure_ascii=False, indent=2)
|
|
|
|
def to_json_file(json_str: str):
    """Write *json_str* to a timestamped file under /tmp for download.

    Returns a gr.File update: hidden when there is nothing to export,
    otherwise visible and pointing at the freshly written file.
    """
    if not json_str:
        return gr.File.update(visible=False)
    out_path = f"/tmp/result_{now_utc_str()}.json"
    with open(out_path, "w", encoding="utf-8") as handle:
        handle.write(json_str)
    return gr.File.update(value=out_path, visible=True)
|
|
|
|
def to_csv_file(json_str: str):
    """Export per-text cluster assignments from the result JSON as a CSV.

    Best-effort: any malformed payload (missing keys, length mismatch,
    bad ids) simply hides the download widget instead of raising.
    """
    if not json_str:
        return gr.File.update(visible=False)
    try:
        payload = json.loads(json_str)
        kept_texts = payload.get("texts_kept", [])
        cluster_ids = payload.get("labels", [])
        # Map cluster id -> display name, preferring user label over title.
        name_of = {
            int(c["id"]): (c.get("label") or c.get("summary", {}).get("title") or f"クラスタ {c['id']}")
            for c in payload.get("clusters", [])
        }
        records = [
            {
                "text": text,
                "cluster_id": int(cluster_ids[idx]),
                "cluster_label": name_of.get(int(cluster_ids[idx]), f"クラスタ {int(cluster_ids[idx])}"),
            }
            for idx, text in enumerate(kept_texts)
        ]
        out_path = f"/tmp/result_{now_utc_str()}.csv"
        pd.DataFrame(records).to_csv(out_path, index=False)
        return gr.File.update(value=out_path, visible=True)
    except Exception:
        return gr.File.update(visible=False)
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI: layout first, then event wiring. Component identity matters —
# the same objects are referenced in the click() bindings below.
# ---------------------------------------------------------------------------
with gr.Blocks(title=APP_NAME, theme=gr.themes.Soft()) as demo:
    gr.Markdown(f"# {APP_NAME}")

    with gr.Row():
        # Left column: text/CSV input and analysis parameters.
        with gr.Column(scale=6):
            text_in = gr.Textbox(label="テキスト(1行1件)", lines=14, placeholder="1行1件で貼り付け。CSVを使う場合は下でファイル選択。")
            with gr.Row():
                csv_in = gr.File(label="CSVをアップロード(任意)", file_types=[".csv"])
                csv_text_col = gr.Textbox(label="CSVのテキスト列名(空なら自動判定)", placeholder="text")
            with gr.Row():
                # 0 means "auto"/"no cap" for cluster count and max items.
                k_in = gr.Slider(label="クラスタ数(0=自動)", minimum=0, maximum=30, step=1, value=0)
                max_items_in = gr.Slider(label="最大件数", minimum=0, maximum=2000, step=50, value=0)
                seed_in = gr.Slider(label="乱数Seed", minimum=0, maximum=9999, step=1, value=42)
            with gr.Row():
                dedup_in = gr.Checkbox(label="近似重複を除外する", value=True)
                dedup_thr_in = gr.Slider(label="重複しきい値(コサイン類似 ≥)", minimum=0.80, maximum=0.99, step=0.01, value=0.95)
            with gr.Row():
                lang_in = gr.Radio(label="出力言語", choices=["日本語", "English", "自動"], value="日本語")
            with gr.Row():
                btn_sample = gr.Button("🧪 デモデータを読み込む")
                btn_run = gr.Button("🧭 解析する", variant="primary")

        # Right column: rendered markdown report.
        with gr.Column(scale=6):
            summary_out = gr.Markdown(label="クラスタ要約")

    # Full-width outputs below the two columns.
    fig_out = gr.Plot(label="2D配置(UMAP)")
    table_out = gr.Dataframe(label="クラスタ編集(ラベルは編集可)", interactive=True, headers=["id", "size", "label", "top_terms"])
    # Hidden state holding the canonical result JSON (source for exports).
    json_out = gr.JSON(label="結果JSON(内部)", visible=False)

    with gr.Row():
        btn_apply = gr.Button("✏️ ラベル変更を適用")
        # Download widgets stay hidden until an export is produced.
        file_json = gr.File(label="結果JSONダウンロード", visible=False)
        file_csv = gr.File(label="結果CSVダウンロード", visible=False)

    # Demo button fills the textbox with sample data.
    btn_sample.click(on_load_sample, outputs=text_in)

    # Run: analysis populates the four outputs, then the chained .then()
    # steps derive the downloadable JSON/CSV files from json_out.
    outputs = [summary_out, fig_out, table_out, json_out]
    btn_run.click(
        run_analysis,
        inputs=[text_in, k_in, max_items_in, seed_in, dedup_in, dedup_thr_in, csv_in, csv_text_col, lang_in],
        outputs=outputs,
        api_name="analyze",
    ).then(
        to_json_file, inputs=json_out, outputs=file_json
    ).then(
        to_csv_file, inputs=json_out, outputs=file_csv
    )

    # Apply edited labels back into the JSON, then refresh the JSON export.
    # NOTE(review): the CSV export is NOT refreshed here, so file_csv keeps
    # the pre-edit labels — confirm whether that is intentional.
    btn_apply.click(
        apply_labels,
        inputs=[json_out, table_out],
        outputs=json_out,
    ).then(
        to_json_file, inputs=json_out, outputs=file_json
    )


if __name__ == "__main__":
    # Bind to all interfaces (container-friendly); PORT env overrides 7860.
    demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")))
|
|