# Hugging Face Spaces app: multilingual support-ticket classification (Streamlit).
import base64
import os
import pickle
import re
import traceback

import nltk
import numpy as np
import streamlit as st
import tensorflow as tf
from gensim.models import FastText
from nltk.corpus import stopwords
from nltk.tokenize import TreebankWordTokenizer
from tensorflow.keras import layers
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.sequence import pad_sequences
# ------------------- Config -------------------
# Filenames of the trained artifacts expected alongside this script.
MODEL_PATH = "multi_task_bilstm_attention.h5"
FASTTEXT_PATH = "fasttext_domain.model"
TOKENIZER_PKL = "tokenizer.pkl"
LE_TYPE_PKL = "le_type.pkl"
LE_QUEUE_PKL = "le_queue.pkl"
MLB_PKL = "mlb.pkl"
META_PKL = "hierarchy_meta.pkl"
# Maximum token-sequence length fed to the model (see pad_sequences in infer).
MAX_LEN = 120
# ------------------- NLTK -------------------
NLTK_DIR = "/root/nltk_data"
STOPWORDS_DIR = os.path.join(NLTK_DIR, "corpora", "stopwords")

# Create the NLTK data directory up front so downloads have a target.
os.makedirs(NLTK_DIR, exist_ok=True)

# Download corpora only when they are not already present on disk.
if not os.path.exists(STOPWORDS_DIR):
    nltk.download("stopwords", download_dir=NLTK_DIR)
if not os.path.exists(os.path.join(NLTK_DIR, "tokenizers", "punkt")):
    nltk.download("punkt", download_dir=NLTK_DIR)

# Sanity-check that punkt is actually loadable; if the resource lookup
# fails, fetch it into NLTK's default search path.
# Fix: the original used a bare `except:`; LookupError is what
# nltk raises for missing resources.
try:
    _ = nltk.word_tokenize("test")
except LookupError:
    nltk.download("punkt")

# Fix: the original assigned these twice (before and after the punkt check).
stop_words = set(stopwords.words("english"))
tokenizer_nltk = TreebankWordTokenizer()
def clean_text(text):
    """Strip HTML tags and non-alphanumeric characters, collapse
    whitespace, and return the lowercased result."""
    no_tags = re.sub(r"<.*?>", " ", str(text))
    alnum_only = re.sub(r"[^A-Za-z0-9 ]", " ", no_tags)
    collapsed = re.sub(r"\s+", " ", alnum_only).strip()
    return collapsed.lower()
def preprocess_text(text):
    """Clean *text*, tokenize it, and drop stopwords and 1-character tokens."""
    kept = [
        tok
        for tok in tokenizer_nltk.tokenize(clean_text(text))
        if tok not in stop_words and len(tok) > 1
    ]
    return " ".join(kept)
# ------------------- Custom Attention -------------------
class AttentionLayer(layers.Layer):
    """Additive attention pooling: softmax-weights timesteps and sums them.

    NOTE(review): softmax over axis 1 assumes input shaped
    (batch, time, features) — confirm against the trained model.
    """

    def build(self, input_shape):
        feat_dim = input_shape[-1]
        # Projection matrix and scoring vector, trained with the model.
        self.W = self.add_weight(
            shape=(feat_dim, feat_dim),
            initializer="glorot_uniform",
            trainable=True,
        )
        self.v = self.add_weight(
            shape=(feat_dim,),
            initializer="glorot_uniform",
            trainable=True,
        )
        super().build(input_shape)

    def call(self, x):
        scores = tf.tanh(tf.tensordot(x, self.W, axes=1))
        weights = tf.nn.softmax(tf.tensordot(scores, self.v, axes=1), axis=1)
        # Weighted sum over the time axis collapses it away.
        return tf.reduce_sum(x * tf.expand_dims(weights, -1), axis=1)
# ------------------- Safe Loaders -------------------
def safe_pickle(p):
    """Unpickle and return the object at path *p*, or None if *p* is missing.

    NOTE(review): pickle.load executes arbitrary code on untrusted files;
    the artifacts loaded here are presumed to ship with the app.
    """
    if not os.path.exists(p):
        return None
    # Fix: the original opened the file without ever closing it; use a
    # context manager so the handle is released.
    with open(p, "rb") as fh:
        return pickle.load(fh)
def safe_model(p):
    """Load the Keras model at *p* (registering the custom attention
    layer), or return None when the file does not exist."""
    if not os.path.exists(p):
        return None
    custom = {"AttentionLayer": AttentionLayer}
    with tf.keras.utils.custom_object_scope(custom):
        return load_model(p, compile=False)
def safe_fasttext(p):
    """Load a gensim FastText model from *p*, or None when the file is absent."""
    if not os.path.exists(p):
        return None
    return FastText.load(p)
# Load every artifact; each safe_* loader returns None when its file is missing.
tokenizer = safe_pickle(TOKENIZER_PKL)
le_type = safe_pickle(LE_TYPE_PKL)
le_queue = safe_pickle(LE_QUEUE_PKL)
mlb = safe_pickle(MLB_PKL)
meta = safe_pickle(META_PKL)
model = safe_model(MODEL_PATH)
fasttext = safe_fasttext(FASTTEXT_PATH)
# Unpack hierarchy metadata: masks that restrict queue/tag choices per
# predicted type, plus the tuned tag-probability threshold.
# Fall back to permissive defaults when the meta pickle was not found.
if meta is None:
    type_queue_mask = None
    type_queue_tag_mask = None
    best_thr = 0.5
else:
    type_queue_mask = meta.get("type_queue_mask", None)
    type_queue_tag_mask = meta.get("type_queue_tag_mask", None)
    best_thr = float(meta.get("best_thr", 0.5))
# Fallback encoders used when the real artifacts were not found on disk.
class DummyLE:
    """Stand-in label encoder: renders class indices as their string form."""

    def inverse_transform(self, X):
        return [str(int(item)) for item in X]
class DummyMLB:
    """Stand-in multi-label binarizer: always yields one empty tag tuple."""

    def inverse_transform(self, X):
        return [()]
# Substitute harmless defaults for any artifact that failed to load, so
# the UI stays responsive even without the trained files.
if tokenizer is None:
    from tensorflow.keras.preprocessing.text import Tokenizer

    tokenizer = Tokenizer(num_words=20000, oov_token="<OOV>")
if le_type is None:
    le_type = DummyLE()
if le_queue is None:
    le_queue = DummyLE()
if mlb is None:
    mlb = DummyMLB()
# ------------------- Inference -------------------
def infer(text):
    """Classify a ticket message.

    Returns a (type_label, queue_label, tag_list) triple.
    Raises RuntimeError when the model artifact was not loaded.
    """
    if model is None:
        raise RuntimeError("Model not loaded")
    seq = tokenizer.texts_to_sequences([preprocess_text(text)])
    seq = pad_sequences(seq, maxlen=MAX_LEN)
    # Zero placeholder for the two auxiliary integer inputs the
    # multi-input model variant expects.
    extra = np.zeros((1, 2), dtype=np.int32)
    if len(model.inputs) > 1:
        preds = model.predict([seq, extra], verbose=0)
    else:
        preds = model.predict(seq, verbose=0)
    if isinstance(preds, (list, tuple)):
        # Multi-head model: one output array per task.
        p_type, p_queue, p_tags = preds[0][0], preds[1][0], preds[2][0]
    else:
        # Single concatenated output: split into three equal thirds.
        arr = preds[0]
        n = len(arr)
        t = max(1, n // 3)
        p_type, p_queue, p_tags = arr[:t], arr[t:2 * t], arr[2 * t:]
    t_idx = np.argmax(p_type)
    type_lbl = le_type.inverse_transform([t_idx])[0]
    q_idx = np.argmax(p_queue)
    queue_lbl = le_queue.inverse_transform([q_idx])[0]
    # Restrict tag probabilities to tags valid for this (type, queue)
    # pair — unless the mask would zero everything out.
    if type_queue_tag_mask is not None:
        mask = type_queue_tag_mask[t_idx, q_idx]
        mod = p_tags * mask if mask.sum() != 0 else p_tags
    else:
        mod = p_tags
    pred_bin = (mod >= best_thr).astype(int).reshape(1, -1)
    # Fix: the original used a bare `except:`; catch Exception so system
    # exits and keyboard interrupts still propagate.
    try:
        tags = mlb.inverse_transform(pred_bin)[0]
    except Exception:
        # A shape mismatch with the fitted binarizer simply yields no tags.
        tags = ()
    return type_lbl, queue_lbl, list(tags)
# ------------------- UI -------------------
st.set_page_config(page_title="Multilingual Ticket Classification")

# Background + UI styling + BLACK fonts
if os.path.exists("bg.jpg"):
    # Fix: the original opened bg.jpg without closing it; use a context
    # manager so the handle is released after the base64 encode.
    with open("bg.jpg", "rb") as img:
        b64 = base64.b64encode(img.read()).decode()
    st.markdown(f"""
    <style>
    .stApp {{
        background-image: url("data:image/jpg;base64,{b64}");
        background-size: cover;
    }}
    * {{ color: black !important; }}
    .card {{
        background: rgba(255,255,255,0.92);
        border-radius: 12px;
        padding: 22px;
    }}
    </style>
    """, unsafe_allow_html=True)

st.markdown("<h1 style='text-align:center;'>Multilingual Ticket Classification</h1>", unsafe_allow_html=True)
st.markdown("<div class='card'>", unsafe_allow_html=True)
message = st.text_area("Enter ticket message:", height=200)
# Run inference when the user submits a non-empty message and render
# the three prediction sections; surface any failure with a traceback.
if st.button("Predict"):
    if not message.strip():
        st.warning("Please enter a ticket message.")
    else:
        try:
            pred_type, pred_queue, pred_tags = infer(message)
            sections = (
                ("TYPE", pred_type),
                ("QUEUE", pred_queue),
                ("TAGS", ", ".join(pred_tags) if pred_tags else "No tags predicted."),
            )
            for heading, value in sections:
                st.subheader(heading)
                st.success(value)
        except Exception:
            st.error("Prediction failed — model or artifacts missing.")
            st.text(traceback.format_exc())
st.markdown("</div>", unsafe_allow_html=True)
# Invisible debug — exists internally but 100% hidden
# Fix: the original CSS block had a stray "s" after </style>, which
# Streamlit rendered as visible text on the page.
st.markdown("""
<style>
div[data-testid="stExpander"] {visibility: hidden; height: 0px;}
</style>
""", unsafe_allow_html=True)
with st.expander("debug_info_hidden"):
    st.write("hidden diagnostics active")