"""Vietnamese sentiment-analysis demo.

Cleans and word-segments Vietnamese comments, maps tokens to ids via a
word2vec-derived vocabulary, classifies each comment (and each detected
aspect mention) with a pretrained LSTM, and serves results through a
Gradio UI.
"""

import io
import re
import unicodedata

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
# FIX: aliased so the module reference is not shadowed later when a
# `Vocabulary` instance named `vocab` is created at module level.
import torchtext.vocab as torchtext_vocab
from docx import Document
from huggingface_hub import hf_hub_download
from tqdm import tqdm
from underthesea import word_tokenize

# Single device for model and tensors so inputs never sit on a different
# device than the weights.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Common Vietnamese slang/abbreviations expanded during cleaning.
abbreviations = {
    "ko": "không",
    "sp": "sản phẩm",
    "k": "không",
    "m": "mình",
    "đc": "được",
    "dc": "được",
    "h": "giờ",
    "trloi": "trả lời",
    "cg": "cũng",
    "bt": "bình thường",
    "dt": "điện thoại",
    "mt": "máy tính",
    "m.n": "mọi người"
    # add more slang mappings
}

# Regex patterns used by clean_text().
url_pattern = r"http\S+|www\S+"  # URLs
user_pattern = r"@\w+"           # usernames
emoji_pattern = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # flags
    "]+",
    flags=re.UNICODE)
emoticon_pattern = r"[:;=8][\-o\*']?[\)\]\(\[dDpP/:}\{@\|\\]"  # text emoticons
repeat_pattern = re.compile(r"(.)\1{2,}")  # 3 or more repeated characters


def clean_text(text: str) -> str:
    """Normalize a raw comment.

    Steps: Unicode NFC normalization, lowercasing, removal of URLs /
    usernames / emojis / emoticons, slang expansion, squeezing of 3+
    repeated characters, punctuation removal (Vietnamese letters and
    digits kept), whitespace collapse.
    """
    text = str(text)
    text = unicodedata.normalize('NFC', text)  # canonical Unicode form
    text = text.lower()
    # Remove URLs and usernames
    text = re.sub(url_pattern, '', text)
    text = re.sub(user_pattern, '', text)
    # Remove emojis and emoticons
    text = emoji_pattern.sub(' ', text)
    text = re.sub(emoticon_pattern, ' ', text)

    # Expand common abbreviations (whole-word matches only)
    def expand(match):
        word = match.group(0)
        return abbreviations.get(word, word)

    if abbreviations:
        pattern = re.compile(r"\b(" + "|".join(map(re.escape, abbreviations.keys())) + r")\b")
        text = pattern.sub(expand, text)
    # Collapse repeated characters (e.g. "quaaa" -> "qua")
    text = repeat_pattern.sub(r"\1", text)
    # Remove punctuation (keep Vietnamese letters & numbers)
    text = re.sub(r"[^\w\s\u00C0-\u024F]", ' ', text)
    # Remove extra whitespace
    text = re.sub(r"\s+", ' ', text).strip()
    return text


class Vocabulary:
    """Word <-> id mapping; id 0 is the pad token, id 1 the unknown token."""

    def __init__(self):
        # FIX: both special tokens were the empty string (the '<pad>' /
        # '<unk>' markers were lost in transcription), so the second
        # assignment overwrote the first and the first real word added
        # received the same id as the unknown token.
        self.word2id = dict()
        self.word2id['<pad>'] = 0  # Pad Token
        self.word2id['<unk>'] = 1  # Unknown Token
        self.unk_id = self.word2id['<unk>']
        self.id2word = {v: k for k, v in self.word2id.items()}
        # NOTE: the original also defined an id2word() *method*, which this
        # dict attribute permanently shadowed (dead code) — removed.

    def __getitem__(self, word):
        # Unknown words map to unk_id.
        return self.word2id.get(word, self.unk_id)

    def __contains__(self, word):
        return word in self.word2id

    def __len__(self):
        return len(self.word2id)

    def add(self, word):
        """Insert `word` if new and return its id."""
        if word not in self:
            word_index = self.word2id[word] = len(self.word2id)
            self.id2word[word_index] = word
            return word_index
        return self[word]

    @staticmethod
    def tokenize_corpus(corpus):
        """Word-segment each document; multi-word tokens joined with '_'."""
        print("Tokenize the corpus...")
        tokenized_corpus = list()
        for document in tqdm(corpus):
            tokenized_document = [word.replace(" ", "_") for word in word_tokenize(document)]
            tokenized_corpus.append(tokenized_document)
        return tokenized_corpus

    def corpus_to_tensor(self, corpus, is_tokenized=False):
        """Convert documents into 1-D int64 tensors of word ids."""
        tokenized_corpus = corpus if is_tokenized else self.tokenize_corpus(corpus)
        indices_corpus = list()
        for document in tqdm(tokenized_corpus):
            indices_document = torch.tensor(
                list(map(lambda word: self[word], document)), dtype=torch.int64)
            indices_corpus.append(indices_document)
        return indices_corpus

    def tensor_to_corpus(self, tensor):
        """Inverse of corpus_to_tensor: id tensors back to token lists."""
        corpus = list()
        for indices in tqdm(tensor):
            document = list(map(lambda index: self.id2word[index.item()], indices))
            corpus.append(document)
        return corpus


class RNN(nn.Module):
    """(Bi)LSTM sentence classifier over padded word-id sequences."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, n_layers,
                 bidirectional, dropout, pad_idx, n_classes):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.rnn = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            # LSTM inter-layer dropout is only valid with more than one layer
            dropout=dropout if n_layers > 1 else 0
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim * (2 if bidirectional else 1), n_classes)

    def forward(self, text, text_lengths):
        # text: seq_len x batch of word ids
        embedded = self.dropout(self.embedding(text))
        # pack_padded_sequence requires the lengths tensor on CPU
        packed_embedded = nn.utils.rnn.pack_padded_sequence(
            embedded, text_lengths.to('cpu'), enforce_sorted=False
        )
        packed_output, (hidden, cell) = self.rnn(packed_embedded)
        if self.rnn.bidirectional:
            # concatenate last forward and last backward hidden states
            hidden = self.dropout(torch.cat((hidden[-2], hidden[-1]), dim=1))
        else:
            hidden = self.dropout(hidden[-1])
        return self.fc(hidden)


model_path = hf_hub_download(repo_id="Di12/sentiment_analysis", filename="model.pt",
                             repo_type="space")
embedding_path = hf_hub_download(repo_id="Di12/sentiment_analysis",
                                 filename="vi_word2vec_reduced.txt", repo_type="space")

# Load pretrained embeddings and build vocab
word_embedding = torchtext_vocab.Vectors(
    name=embedding_path,
    unk_init=torch.Tensor.normal_
)

vocab = Vocabulary()
for w in word_embedding.stoi.keys():
    vocab.add(w)

# Model hyperparams — must match the trained checkpoint's architecture.
input_dim = word_embedding.vectors.shape[0]
embedding_dim = 100
hidden_dim = 8
n_layers = 2
bidirectional = False
dropout = 0.3
pad_idx = vocab["<pad>"]
unk_idx = vocab["<unk>"]
n_classes = 3
label_map = {0: 'tiêu cực', 1: 'bình thường', 2: 'tích cực'}


def load_model(path: str):
    """Build the RNN, load the checkpoint onto `device`, set eval mode."""
    model = RNN(input_dim, embedding_dim, hidden_dim, n_layers, bidirectional,
                dropout, pad_idx, n_classes)
    model.load_state_dict(torch.load(path, map_location=device))
    model.to(device)
    model.eval()
    return model


model = load_model(model_path)

# Seed keyword lists used for aspect extraction.
seed_aspects = {
    'vận_chuyển': ['giao hàng', 'giao', 'ship', 'nhận hàng', 'vận chuyển'],
    'đóng_gói': ['đóng gói', 'đóng_gói', 'gói', 'bao_bì'],
    'sản_phẩm': ['sách', 'sản_phẩm', 'chất_lượng']
}


def tokenize_underthesea(text):
    """Tokenize with underthesea; it may return a space-joined string,
    so always return a list of tokens."""
    toks = word_tokenize(text)
    if isinstance(toks, str):
        toks = toks.split()
    return toks


def extract_aspects_from_text(text, seed_aspects, tokenizer=tokenize_underthesea):
    """Find aspect mentions in `text`.

    Returns:
        tokens: list[str] — tokens of the cleaned text
        found:  list of tuples (asp_key, matched_phrase, start_idx, end_idx)

    Combines token-sequence matching with a substring fallback on the
    cleaned text for tokenization variants the first pass misses.
    """
    # 1) Normalize + tokenize
    cleaned = clean_text(text)
    tokens = tokenizer(cleaned)
    t_low = [t.lower() for t in tokens]
    found = []
    found_set = set()  # avoid duplicate (asp_key, start, end) entries

    # Prepare seed token lists for token-sequence matching
    seed_tokenlists = []
    for asp_key, kws in seed_aspects.items():
        for kw in kws:
            kw_proc = kw.lower().replace('_', ' ').strip()
            kw_tokens = kw_proc.split()
            seed_tokenlists.append((asp_key, kw_tokens, kw_proc))

    # 2) Token sequence match
    for asp_key, kw_tokens, kw_proc in seed_tokenlists:
        L = len(kw_tokens)
        if L == 0:
            continue
        for i in range(len(t_low) - L + 1):
            if t_low[i:i + L] == kw_tokens:
                phrase = " ".join(tokens[i:i + L])
                key = (asp_key, i, i + L - 1)
                if key not in found_set:
                    found.append((asp_key, phrase, i, i + L - 1))
                    found_set.add(key)

    # 3) Fallback: substring match on cleaned text (helps with tokenization
    # variants). Only used for aspects the token pass did not find.
    lower_cleaned = cleaned.lower()
    for asp_key, kws in seed_aspects.items():
        already = any(f[0] == asp_key for f in found)
        if already:
            continue
        for kw in kws:
            kw_norm = kw.lower().replace('_', ' ').strip()
            # simple word-boundary substring check
            if re.search(r'\b' + re.escape(kw_norm) + r'\b', lower_cleaned):
                # best-effort localization of the match in the token list
                kw_tokens = kw_norm.split()
                L = len(kw_tokens)
                start = None
                for i in range(len(t_low) - L + 1):
                    if t_low[i:i + L] == kw_tokens:
                        start = i
                        end = i + L - 1
                        break
                if start is None:
                    # fallback: first token containing the first keyword
                    first_kw = kw_tokens[0]
                    for i, tok in enumerate(t_low):
                        if first_kw in tok:
                            start = i
                            end = min(len(t_low) - 1, i + L - 1)
                            break
                if start is None:
                    # worst case: cannot localize the span — skip
                    continue
                phrase = " ".join(tokens[start:end + 1])
                key = (asp_key, start, end)
                if key not in found_set:
                    found.append((asp_key, phrase, start, end))
                    found_set.add(key)
                break  # stop at the first matched keyword for this aspect
    return tokens, found


def get_context_string_from_tokens(tokens, start, end, window=3):
    """Return the tokens around [start, end], `window` tokens each side,
    clamped to the token list and joined with spaces."""
    left = max(0, start - window)
    right = min(len(tokens) - 1, end + window)
    return " ".join(tokens[left:right + 1])


def predict_sentiment(model, sentence, vocab, label_mapping=None):
    """Classify a single (already cleaned) sentence.

    Returns (label, probs): label is label_mapping[argmax] when a mapping
    is given, otherwise the class index; probs is the per-class softmax
    rounded to 2 decimals.
    """
    tensor = vocab.corpus_to_tensor([sentence])[0]
    length = torch.LongTensor([tensor.size(0)])
    # FIX: the input tensor must be on the same device as the model —
    # previously only `length` was moved, which crashed under CUDA.
    tensor = tensor.unsqueeze(1).to(device)  # seq_len x batch(=1)
    with torch.no_grad():
        logits = model(tensor, length).squeeze(0)
    probs_t = F.softmax(logits, dim=-1).cpu()
    # FIX: argmax on raw probabilities, not the 2-decimal rounded copies,
    # so rounding ties cannot flip the prediction.
    idx = int(probs_t.argmax())
    probs = [round(p, 2) for p in probs_t.tolist()]
    return (label_mapping[idx], probs) if label_mapping else (idx, probs)


def process_input_with_aspects(text_input, file):
    """Read input text or an uploaded file, split into sentences/comments,
    extract aspects per comment, predict sentiment per aspect (with a
    sentence-level fallback) and return a styled DataFrame plus
    aspect-level summaries. Probability columns are hidden."""
    content = ""
    comments = []
    if text_input:
        content += text_input + "\n"
        parts = re.split(r'[.?!]\s*|\n+', content)
        comments = [p.strip() for p in parts if p and p.strip()]
    elif file:
        if isinstance(file, str):
            if file.lower().endswith('.csv'):
                # one comment per non-empty line
                with open(file, 'r', encoding='utf-8', errors='ignore') as fh:
                    content = fh.read()
                lines = content.splitlines()
                comments = [line.strip() for line in lines if line.strip()]
            elif file.lower().endswith('.docx'):
                doc = Document(file)
                content = "\n".join([p.text for p in doc.paragraphs])
                parts = re.split(r'[.?!]\s*|\n+', content)
                comments = [p.strip() for p in parts if p.strip()]
            else:
                # plain text (.txt and similar)
                with open(file, 'r', encoding='utf-8') as fh:
                    content = fh.read()
                parts = re.split(r'[.?!]\s*|\n+', content)
                comments = [p.strip() for p in parts if p.strip()]
        else:
            raise gr.Error("Định dạng tệp không được hỗ trợ.")

    if len(comments) == 0:
        raise gr.Error("Vui lòng nhập ít nhất một bình luận hoặc tải lên tệp chứa bình luận.")

    # RESULTS
    table_rows = []
    aspect_rows = []  # flattened aspect-level entries for aggregation
    for comment in comments:
        # aspect extraction
        tokens, aspects = extract_aspects_from_text(comment, seed_aspects)
        if len(aspects) == 0:
            # fallback: sentence-level prediction only
            sent_label, _ = predict_sentiment(model, clean_text(comment), vocab, label_map)
            table_rows.append({
                'Comment': comment,
                'Dự đoán': sent_label,
                'Aspects': ''
            })
        else:
            asp_info_list = []
            for asp_key, asp_phrase, s, e in aspects:
                context = get_context_string_from_tokens(tokens, s, e, window=3)
                sent, _ = predict_sentiment(model, clean_text(context), vocab, label_map)
                asp_info_list.append(f"{asp_key}: {sent}")
                aspect_rows.append({
                    'Comment': comment,
                    'Aspect': asp_key,
                    'Phrase': asp_phrase,
                    'Context': context,
                    'Sentiment': sent
                })
            aspects_str = " | ".join(asp_info_list)
            sent_label, _ = predict_sentiment(model, clean_text(comment), vocab, label_map)
            table_rows.append({
                'Comment': comment,
                'Dự đoán': sent_label,
                'Aspects': aspects_str
            })

    df2 = pd.DataFrame(table_rows)
    styler = df2.style  # no probability columns => simpler styler

    if len(aspect_rows) > 0:
        df_aspects = pd.DataFrame(aspect_rows)
        aspect_dist = (df_aspects.groupby(['Aspect', 'Sentiment']).size()
                       .unstack(fill_value=0))
        aspect_dist_pct = aspect_dist.div(aspect_dist.sum(axis=1), axis=0) * 100
    else:
        df_aspects = pd.DataFrame(columns=['Comment', 'Aspect', 'Phrase', 'Context', 'Sentiment'])
        aspect_dist_pct = pd.DataFrame()

    return styler, df2, df_aspects, aspect_dist_pct


def plot_distribution(dist):
    """Bar chart of the sentence-level sentiment distribution (percent)."""
    fig, ax = plt.subplots()
    dist.plot.bar(ax=ax, color=['red', 'gray', 'green'])
    ax.set_ylabel("Tỷ lệ (%)")
    ax.set_title("Phân phối cảm xúc (toàn câu)")
    ax.tick_params(axis='x', labelrotation=0)
    ax.tick_params(axis='y', labelrotation=0)
    plt.tight_layout()
    return fig


def summarize_distribution_from_df(df):
    """Percentage distribution of sentence-level predicted labels, in a
    fixed negative/neutral/positive order."""
    dist = df['Dự đoán'].value_counts(normalize=True) * 100
    dist = dist.reindex(['tiêu cực', 'bình thường', 'tích cực'], fill_value=0)
    return dist


def full_process(text_input, file_input):
    """Gradio callback: return the styled table and the distribution plot."""
    styler, df2, df_aspects, aspect_dist_pct = process_input_with_aspects(text_input, file_input)
    dist = summarize_distribution_from_df(df2)
    fig_main = plot_distribution(dist)
    return styler, fig_main


with gr.Blocks() as demo:
    gr.Markdown("## Phân tích cảm xúc")
    gr.Markdown("Nhập bình luận:")
    text_input = gr.Textbox(lines=6, placeholder="Nhập bình luận tại đây...", label="")
    gr.Markdown("Hoặc tải lên tệp .txt, .docx hoặc .csv chứa các bình luận:")
    file_input = gr.File(label="Tải tệp", file_types=[".txt", ".csv", ".docx"])
    predict_button = gr.Button("Dự đoán")
    output_table = gr.Dataframe(headers=["Comment", "Dự đoán", 'Aspects'],
                                interactive=False, wrap=True, max_chars=60,
                                column_widths=["45%", "20%", "35%"])
    dist_plot = gr.Plot()
    predict_button.click(
        fn=full_process,
        inputs=[text_input, file_input],
        outputs=[output_table, dist_plot]
    )

if __name__ == "__main__":
    demo.launch()