# NOTE: the original paste carried Hugging Face Spaces status-badge residue
# here ("Spaces: Sleeping Sleeping") — UI chrome, not part of the program.
| # app.py | |
| import os | |
| os.environ["GRADIO_HOT_RELOAD"] = "0" # avoid hot-reload scanners on Spaces | |
| from pathlib import Path | |
| import json | |
| import tempfile | |
| import gradio as gr | |
| import pandas as pd | |
| import numpy as np | |
| import tensorflow as tf | |
| ROOT = Path(__file__).parent | |
| WEIGHTS_PATH = ROOT / "kindle_bilstm.weights.h5" | |
| VEC_CONFIG_PATH = ROOT / "vectorizer_config.json" | |
| VEC_VOCAB_PATH = ROOT / "vectorizer_vocab.txt" | |
| # proposal-aligned triage thresholds | |
| POS_TH = 0.60 | |
| NEG_TH = 0.40 | |
| def triage(p_pos: float): | |
| if p_pos >= POS_TH: | |
| return "Positive", "Track" | |
| if p_pos <= NEG_TH: | |
| return "Negative", "Escalate" | |
| return "Uncertain", "Manual review" | |
| # ------------------------- | |
| # FIXED vectorizer loader | |
| # ------------------------- | |
| def load_vectorizer(): | |
| # 1) Load saved config + vocab | |
| with open(VEC_CONFIG_PATH, "r", encoding="utf-8") as f: | |
| cfg = json.load(f) | |
| with open(VEC_VOCAB_PATH, "r", encoding="utf-8") as f: | |
| vocab = [line.rstrip("\n") for line in f] | |
| # 2) Patch: remove dtype policy objects that break HF/TF deserialization | |
| # Your error was: TextVectorization dtype became a DTypePolicy dict. | |
| # We force it back to string and drop policy-related keys. | |
| cfg.pop("dtype", None) # sometimes present | |
| cfg.pop("dtype_policy", None) # sometimes present | |
| cfg.pop("autocast", None) # sometimes present | |
| # Force correct dtype for TextVectorization | |
| cfg["dtype"] = "string" | |
| # 3) Rebuild layer + set vocab (must match training) | |
| vec = tf.keras.layers.TextVectorization.from_config(cfg) | |
| vec.set_vocabulary(vocab) | |
| return vec | |
| def build_model(vocab_size: int, seq_len: int): | |
| # IMPORTANT: token_ids dtype must match what TextVectorization outputs on HF. | |
| # It outputs int64 by default in TF2.15+. | |
| token_ids = tf.keras.Input(shape=(seq_len,), dtype=tf.int64, name="token_ids") | |
| x = tf.keras.layers.Embedding(vocab_size, 128, mask_zero=True, name="embedding")(token_ids) | |
| x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64), name="bilstm")(x) | |
| x = tf.keras.layers.Dropout(0.3)(x) | |
| x = tf.keras.layers.Dense(64, activation="relu")(x) | |
| x = tf.keras.layers.Dropout(0.3)(x) | |
| out = tf.keras.layers.Dense(1, activation="sigmoid", name="p_positive")(x) | |
| return tf.keras.Model(token_ids, out, name="kindle_bilstm") | |
| def load_pipeline(): | |
| if not WEIGHTS_PATH.exists(): | |
| return None, None, f"NOT READY - missing {WEIGHTS_PATH.name}" | |
| if not (VEC_CONFIG_PATH.exists() and VEC_VOCAB_PATH.exists()): | |
| return None, None, "NOT READY - missing vectorizer_config.json / vectorizer_vocab.txt" | |
| # Load vectorizer | |
| try: | |
| vec = load_vectorizer() | |
| # output_sequence_length is stored in config; fallback to 250 | |
| seq_len = int(getattr(vec, "output_sequence_length", 250)) | |
| vocab_size = int(vec.vocabulary_size()) | |
| if vocab_size <= 0: | |
| with open(VEC_VOCAB_PATH, "r", encoding="utf-8") as f: | |
| vocab_size = sum(1 for _ in f) | |
| except Exception as e: | |
| return None, None, f"NOT READY - vectorizer load failed: {e}" | |
| # Build model + load weights | |
| try: | |
| model = build_model(vocab_size=vocab_size, seq_len=seq_len) | |
| _ = model(tf.zeros((1, seq_len), dtype=tf.int64), training=False) # build vars | |
| model.load_weights(WEIGHTS_PATH) | |
| except Exception as e: | |
| return None, None, f"NOT READY - weights load failed: {e}" | |
| return model, vec, "READY" | |
| MODEL, VEC, STATUS = load_pipeline() | |
| def predict_one(review: str): | |
| if MODEL is None or VEC is None: | |
| return STATUS, "", "", "", "App not ready. Upload required files." | |
| review = (review or "").strip() | |
| if not review: | |
| return STATUS, "", "", "", "Please enter a review." | |
| # TextVectorization expects (batch, 1) for shape=(1,) string input in your training setup | |
| x_text = tf.constant([[review]], dtype=tf.string) # (1,1) | |
| x_ids = VEC(x_text) # (1, seq_len) | |
| p = float(MODEL.predict(x_ids, verbose=0).reshape(-1)[0]) | |
| label, action = triage(p) | |
| return STATUS, f"{p:.4f}", label, action, "" | |
| def predict_batch_csv(csv_file): | |
| if MODEL is None or VEC is None: | |
| raise RuntimeError("App not ready. Upload required files.") | |
| df = pd.read_csv(csv_file.name) | |
| # pick a likely text column | |
| text_col = None | |
| for c in ["reviewText", "text", "review", "content", "summary"]: | |
| if c in df.columns: | |
| text_col = c | |
| break | |
| if text_col is None: | |
| # fallback: first object column | |
| for c in df.columns: | |
| if df[c].dtype == object: | |
| text_col = c | |
| break | |
| if text_col is None: | |
| raise ValueError("No text column found. Add a column like reviewText or text.") | |
| texts = df[text_col].fillna("").astype(str).tolist() | |
| # (batch,1) string tensor | |
| x_text = tf.constant([[t] for t in texts], dtype=tf.string) | |
| x_ids = VEC(x_text) | |
| probs = MODEL.predict(x_ids, verbose=0).reshape(-1) | |
| labels, actions = [], [] | |
| for pp in probs: | |
| lab, act = triage(float(pp)) | |
| labels.append(lab) | |
| actions.append(act) | |
| out = df.copy() | |
| out["p_positive"] = probs | |
| out["predicted_label"] = labels | |
| out["action"] = actions | |
| preview_df = out.head(50) | |
| tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv") | |
| out.to_csv(tmp.name, index=False) | |
| return preview_df, tmp.name | |
| with gr.Blocks(title="Kindle Review Sentiment") as demo: | |
| gr.Markdown( | |
| """# Kindle Review Sentiment (BiLSTM) | |
| Paste a review and click **Predict**. | |
| Triage rules: | |
| - **P(Positive) ≥ 0.60 → Track** | |
| - **P(Positive) ≤ 0.40 → Escalate** | |
| - Otherwise → **Manual review** | |
| """ | |
| ) | |
| mode_box = gr.Textbox(label="Model status", value=STATUS, interactive=False) | |
| review_in = gr.Textbox(label="Review text", lines=5, placeholder="Type or paste a Kindle review here...") | |
| btn = gr.Button("Predict") | |
| p_box = gr.Textbox(label="P(Positive)", interactive=False) | |
| label_box = gr.Textbox(label="Predicted label", interactive=False) | |
| action_box = gr.Textbox(label="Action", interactive=False) | |
| msg_box = gr.Textbox(label="Message", interactive=False) | |
| btn.click(predict_one, inputs=review_in, outputs=[mode_box, p_box, label_box, action_box, msg_box]) | |
| gr.Markdown("## Batch prediction (CSV)") | |
| csv_file = gr.File(label="Upload CSV", file_types=[".csv"]) | |
| batch_btn = gr.Button("Run batch prediction") | |
| preview = gr.Dataframe(label="Preview (first 50 rows)", interactive=False) | |
| download = gr.File(label="Download predictions CSV") | |
| batch_btn.click(predict_batch_csv, inputs=csv_file, outputs=[preview, download]) | |
| if __name__ == "__main__": | |
| demo.launch() | |