Spaces:
Sleeping
Sleeping
| import torch.nn as nn | |
| import torch | |
| import torchtext.vocab as vocab | |
| import torch.nn.functional as F | |
| import pandas as pd | |
| import numpy as np | |
| from underthesea import word_tokenize | |
| import unicodedata | |
| import re | |
| from tqdm import tqdm | |
| import gradio as gr | |
| from huggingface_hub import hf_hub_download | |
| import io | |
| import matplotlib.pyplot as plt | |
| from docx import Document | |
# Device configuration: use CUDA when torch reports it as available,
# otherwise fall back to the CPU. This single `device` object is what the
# checkpoint is mapped onto and what the model is moved to.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Dictionary for common Vietnamese slang/abbreviations.
# Keys are the short forms as they appear in lowercased text; values are the
# full phrases they are expanded to during text cleaning.
abbreviations = {
    "ko": "không",
    "sp": "sản phẩm",
    "k": "không",
    "m": "mình",
    "đc": "được",
    "dc": "được",
    "h": "giờ",
    "trloi": "trả lời",
    "cg": "cũng",
    "bt": "bình thường",
    "dt": "điện thoại",
    "mt": "máy tính",
    "m.n": "mọi người"
    # add more slang mappings
}
# Regex patterns used by the text-cleaning step
url_pattern = r"http\S+|www\S+"  # URLs
user_pattern = r"@\w+"  # usernames
# Character class covering four emoji code-point ranges; one or more in a row.
emoji_pattern = re.compile(
    "["  # start
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # flags
    "]+", flags=re.UNICODE)
# Text emoticons: eyes, optional nose, mouth.
# NOTE(review): the "eyes" class contains "8" and the "mouth" class contains
# "/" and ":", so sequences such as "8/" or "8:" also match -- confirm that
# dropping them from ratings/times like "8/10" is acceptable.
emoticon_pattern = r"[:;=8][\-o\*']?[\)\]\(\[dDpP/:}\{@\|\\]"  # emoticons
# NOTE(review): "." matches any character, so runs of identical digits
# (e.g. "000") are collapsed as well as stretched letters.
repeat_pattern = re.compile(r"(.)\1{2,}")  # 3 or more repeats
def clean_text(text: str) -> str:
    """Normalize a raw comment before tokenization.

    The steps run in this order: NFC Unicode normalization, lowercasing,
    URL and @username removal, emoji/emoticon removal, abbreviation
    expansion, collapsing of characters repeated three or more times,
    punctuation removal and whitespace squeezing.

    Args:
        text: Raw input; it is coerced with ``str()`` so non-string values
            are accepted.

    Returns:
        The cleaned, lowercased text with single spaces and no leading or
        trailing whitespace.
    """
    # Unicode normalization so visually identical strings compare equal
    text = unicodedata.normalize('NFC', str(text))
    text = text.lower()

    # Remove URLs and usernames
    text = re.sub(url_pattern, '', text)
    text = re.sub(user_pattern, '', text)

    # Replace emojis and emoticons with a space so neighbours stay separated
    text = emoji_pattern.sub(' ', text)
    text = re.sub(emoticon_pattern, ' ', text)

    # Expand common abbreviations
    if abbreviations:
        # Regex alternation is tried left to right, so a short key such as
        # "m" would win over "m.n" if it came first. Sorting longest-first
        # lets the most specific abbreviation match; the sort is stable, so
        # keys of equal length keep their dictionary order.
        keys = sorted(abbreviations, key=len, reverse=True)
        pattern = re.compile(r"\b(" + "|".join(map(re.escape, keys)) + r")\b")
        text = pattern.sub(
            lambda match: abbreviations.get(match.group(0), match.group(0)),
            text,
        )

    # Collapse repeated characters (e.g., "quaaa" -> "qua")
    text = repeat_pattern.sub(r"\1", text)
    # Remove punctuation (keep word characters, whitespace and Latin
    # extended letters)
    text = re.sub(r"[^\w\s\u00C0-\u024F]", ' ', text)
    # Squeeze whitespace
    return re.sub(r"\s+", ' ', text).strip()
class Vocabulary:
    """Two-way mapping between words and integer ids.

    Ids 0 and 1 are reserved for ``<pad>`` and ``<unk>``. Looking up a word
    that was never added returns the ``<unk>`` id.

    Attributes:
        word2id: dict mapping word -> id.
        id2word: dict mapping id -> word (index it, e.g. ``v.id2word[3]``).
        unk_id: id returned for unknown words.
    """

    def __init__(self):
        self.word2id = {'<pad>': 0, '<unk>': 1}  # pad / unknown tokens
        self.unk_id = self.word2id['<unk>']
        self.id2word = {index: word for word, index in self.word2id.items()}

    def __getitem__(self, word):
        """Return the id of ``word``, or the ``<unk>`` id if it is unknown."""
        return self.word2id.get(word, self.unk_id)

    def __contains__(self, word):
        return word in self.word2id

    def __len__(self):
        return len(self.word2id)

    # The original class also defined a method named ``id2word``. The
    # instance attribute of the same name set in ``__init__`` shadowed it, so
    # it could never be called on an instance; the lookup is offered here
    # under a non-conflicting name instead.
    def lookup_word(self, word_index):
        """Return the word stored under ``word_index`` (KeyError if absent)."""
        return self.id2word[word_index]

    def add(self, word):
        """Register ``word`` if it is new and return its id."""
        if word in self:
            return self[word]
        word_index = self.word2id[word] = len(self.word2id)
        self.id2word[word_index] = word
        return word_index

    @staticmethod
    def tokenize_corpus(corpus):
        """Tokenize every document with ``word_tokenize``.

        Spaces inside a token are replaced by underscores so that each
        returned token is a single whitespace-free string.

        Declared static because it takes no ``self``; it is invoked as
        ``self.tokenize_corpus(corpus)`` by ``corpus_to_tensor``, which would
        otherwise pass the instance as ``corpus``.
        """
        print("Tokenize the corpus...")
        tokenized_corpus = []
        for document in tqdm(corpus):
            tokenized_corpus.append(
                [word.replace(" ", "_") for word in word_tokenize(document)]
            )
        return tokenized_corpus

    def corpus_to_tensor(self, corpus, is_tokenized=False):
        """Convert documents into a list of 1-D int64 id tensors.

        Args:
            corpus: Iterable of raw strings, or of token lists when
                ``is_tokenized`` is true.
            is_tokenized: Skip tokenization when the corpus already holds
                token lists.

        Returns:
            One tensor of word ids per document.
        """
        tokenized_corpus = corpus if is_tokenized else self.tokenize_corpus(corpus)
        indices_corpus = []
        for document in tqdm(tokenized_corpus):
            indices_corpus.append(
                torch.tensor([self[word] for word in document], dtype=torch.int64)
            )
        return indices_corpus

    def tensor_to_corpus(self, tensor):
        """Map an iterable of id tensors back to lists of words."""
        corpus = []
        for indices in tqdm(tensor):
            corpus.append([self.id2word[index.item()] for index in indices])
        return corpus
class RNN(nn.Module):
    """Embedding -> (multi-layer, optionally bidirectional) LSTM -> linear head.

    The final hidden state of the last layer (both directions concatenated
    when bidirectional) is fed through dropout and a linear layer that
    outputs ``n_classes`` scores.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, n_layers,
                 bidirectional, dropout, pad_idx, n_classes):
        super().__init__()
        # nn.LSTM only applies dropout between stacked layers, so it is
        # switched off for a single-layer network.
        inter_layer_dropout = dropout if n_layers > 1 else 0
        directions = 2 if bidirectional else 1

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.rnn = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=inter_layer_dropout,
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim * directions, n_classes)

    def forward(self, text, text_lengths):
        """Return class scores for a batch of id sequences.

        Args:
            text: Integer id tensor; time-major (seq_len, batch) because the
                LSTM is built without ``batch_first``.
            text_lengths: Length of each sequence in the batch; moved to the
                CPU before packing.
        """
        embedded = self.dropout(self.embedding(text))
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, text_lengths.to('cpu'), enforce_sorted=False
        )
        _, (hidden, _) = self.rnn(packed)

        if self.rnn.bidirectional:
            # last layer: forward and backward final states side by side
            summary = torch.cat((hidden[-2], hidden[-1]), dim=1)
        else:
            summary = hidden[-1]
        return self.fc(self.dropout(summary))
# Fetch the checkpoint and the word-vector file from the Hugging Face Space
# repository; hf_hub_download returns the local path of each file.
model_path = hf_hub_download(repo_id="Di12/sentiment_analysis", filename="model.pt", repo_type="space")
embedding_path = hf_hub_download(repo_id="Di12/sentiment_analysis", filename="vi_word2vec_reduced.txt", repo_type="space")
# Load pretrained embeddings and build vocab
word_embedding = vocab.Vectors(
    name=embedding_path,
    unk_init=torch.Tensor.normal_
)
# NOTE: from here on the name `vocab` refers to this Vocabulary instance and
# no longer to the `torchtext.vocab` module imported under the same alias, so
# `vocab.Vectors` must be used before this line.
vocab = Vocabulary()
for w in word_embedding.stoi.keys(): vocab.add(w)
# Model hyperparams
# NOTE(review): input_dim is the number of embedding rows, while Vocabulary
# reserves <pad>/<unk> before the embedding words are added. Confirm that
# every id produced by `vocab` stays below input_dim (i.e. that the embedding
# file itself contains those two tokens).
input_dim = word_embedding.vectors.shape[0]
embedding_dim = 100
hidden_dim = 8
n_layers = 2
bidirectional = False
dropout = 0.3
pad_idx = vocab["<pad>"]
unk_idx = vocab["<unk>"]
n_classes = 3
# Class index -> Vietnamese label (negative / neutral / positive)
label_map = {0: 'tiêu cực', 1: 'bình thường', 2: 'tích cực'}
def load_model(path: str):
    """Build the RNN from the module-level hyperparameters and load weights.

    Args:
        path: Location of a saved ``state_dict``.

    Returns:
        The model, placed on ``device`` and switched to evaluation mode.
    """
    # NOTE(review): torch.load unpickles the file, and the checkpoint used by
    # this app is downloaded from the Hub. Consider `weights_only=True` if the
    # installed torch version supports it.
    state = torch.load(path, map_location=device)
    net = RNN(
        input_dim,
        embedding_dim,
        hidden_dim,
        n_layers,
        bidirectional,
        dropout,
        pad_idx,
        n_classes,
    )
    net.load_state_dict(state)
    net.to(device)
    net.eval()
    return net
# Instantiate the classifier once at import time; it is reused for every request.
model = load_model(model_path)
# Seed keywords per aspect. Keys are the aspect names shown in the results;
# values are the phrases looked for in a comment (underscores and spaces are
# treated alike by extract_aspects_from_text, which normalizes "_" to " ").
seed_aspects = {
    'vận_chuyển': ['giao hàng', 'giao', 'ship', 'nhận hàng', 'vận chuyển'],
    'đóng_gói': ['đóng gói', 'đóng_gói', 'gói', 'bao_bì'],
    'sản_phẩm': ['sách', 'sản_phẩm', 'chất_lượng']
}
def tokenize_underthesea(text):
    """Tokenize ``text`` with underthesea's ``word_tokenize``.

    When the tokenizer hands back a single string, it is split on whitespace
    into a list; any other return value is passed through unchanged.
    """
    result = word_tokenize(text)  # underthesea
    return result.split() if isinstance(result, str) else result
def extract_aspects_from_text(text, seed_aspects, tokenizer=tokenize_underthesea):
    """Locate seed-aspect keywords inside a comment.

    The text is cleaned with ``clean_text`` and tokenized, then two passes run:

    1. Token-sequence matching: every position where the keyword's
       whitespace-split tokens equal a run of (lowercased) tokens is recorded.
    2. Fallback for aspects with no hit so far: a word-boundary substring
       search on the cleaned text; the span is then located on a best-effort
       basis (exact token run first, otherwise the first token containing the
       keyword's first word). Only the first matching keyword per aspect is
       used.

    Args:
        text: Raw comment.
        seed_aspects: Mapping of aspect key -> iterable of keyword phrases.
        tokenizer: Callable turning the cleaned text into a list of tokens.

    Returns:
        ``(tokens, found)`` where ``found`` is a list of
        ``(aspect_key, matched_phrase, start_idx, end_idx)`` tuples with
        inclusive token indices and no duplicate ``(aspect, start, end)``.
    """
    cleaned = clean_text(text)
    tokens = tokenizer(cleaned)
    lowered = [tok.lower() for tok in tokens]

    found = []
    seen = set()  # (aspect, start, end) spans already recorded

    def normalize_keyword(keyword):
        return keyword.lower().replace('_', ' ').strip()

    def window_starts(needle):
        # every index at which `needle` occurs as a contiguous token run
        width = len(needle)
        for pos in range(len(lowered) - width + 1):
            if lowered[pos:pos + width] == needle:
                yield pos

    def record(aspect, first, last):
        span = (aspect, first, last)
        if span not in seen:
            found.append((aspect, " ".join(tokens[first:last + 1]), first, last))
            seen.add(span)

    # Pass 1: exact token-sequence matches for every keyword of every aspect
    for aspect, keywords in seed_aspects.items():
        for keyword in keywords:
            needle = normalize_keyword(keyword).split()
            if not needle:
                continue
            for pos in window_starts(needle):
                record(aspect, pos, pos + len(needle) - 1)

    # Pass 2: substring fallback, only for aspects pass 1 did not find
    haystack = cleaned.lower()
    for aspect, keywords in seed_aspects.items():
        if any(entry[0] == aspect for entry in found):
            continue
        for keyword in keywords:
            phrase_norm = normalize_keyword(keyword)
            if not re.search(r'\b' + re.escape(phrase_norm) + r'\b', haystack):
                continue

            needle = phrase_norm.split()
            width = len(needle)
            first = next(window_starts(needle), None)
            if first is not None:
                last = first + width - 1
            else:
                # approximate: first token that contains the keyword's first word
                head = needle[0]
                hit = next(
                    (idx for idx, tok in enumerate(lowered) if head in tok),
                    None,
                )
                if hit is None:
                    # no usable span for this keyword; try the next one
                    continue
                first = hit
                last = min(len(lowered) - 1, hit + width - 1)

            record(aspect, first, last)
            break  # one fallback match per aspect is enough

    return tokens, found
def get_context_string_from_tokens(tokens, start, end, window=3):
    """Join the tokens of span ``[start, end]`` plus ``window`` tokens per side.

    The window is clipped to the bounds of ``tokens``; the pieces are joined
    with single spaces.
    """
    first = max(0, start - window)
    stop = min(len(tokens), end + window + 1)  # exclusive upper bound
    return " ".join(tokens[first:stop])
def predict_sentiment(model, sentence, vocab, label_mapping=None):
    """Classify one sentence and return its label with class probabilities.

    Args:
        model: Callable module taking ``(ids, lengths)`` where ``ids`` is a
            (seq_len, 1) int64 tensor and returning one row of class scores.
        sentence: Text to classify (already cleaned by the caller).
        vocab: Object providing ``corpus_to_tensor([sentence])`` and
            ``vocab["<unk>"]``.
        label_mapping: Optional mapping from class index to label.

    Returns:
        ``(label, probs)`` where ``label`` is ``label_mapping[idx]`` when a
        non-empty mapping is given, else the class index, and ``probs`` is the
        list of softmax probabilities rounded to two decimals.
    """
    ids = vocab.corpus_to_tensor([sentence])[0]
    if ids.numel() == 0:
        # A sentence with no tokens (e.g. only emojis before cleaning) would
        # produce a zero-length sequence, which pack_padded_sequence rejects.
        # Feed a single unknown token instead of crashing the whole request.
        ids = torch.tensor([vocab["<unk>"]], dtype=torch.int64)

    # Run on whatever device the model lives on, so CPU-built id tensors do
    # not clash with a CUDA-resident model.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else torch.device('cpu')

    length = torch.tensor([ids.size(0)], dtype=torch.int64).to(target)
    batch = ids.unsqueeze(1).to(target)  # seq_len x batch

    with torch.no_grad():
        logits = model(batch, length).squeeze(0)
        probabilities = F.softmax(logits, dim=-1)

    # Pick the class from the unrounded probabilities: rounding first can
    # create artificial ties that argmax resolves to the wrong (lowest) index.
    idx = int(probabilities.argmax())
    probs = [round(p, 2) for p in probabilities.cpu().tolist()]
    return (label_mapping[idx], probs) if label_mapping else (idx, probs)
def _split_into_comments(content):
    """Split free text into comments on '.', '?', '!' or line breaks."""
    parts = re.split(r'[.?!]\s*|\n+', content)
    return [part.strip() for part in parts if part and part.strip()]


def _read_comments(text_input, file):
    """Return the list of comments from the textbox or, failing that, the file."""
    # Whitespace-only text counts as "no text" so an uploaded file is not
    # silently ignored.
    if text_input and text_input.strip():
        return _split_into_comments(text_input + "\n")
    if not file:
        return []

    # Accept a plain path as well as an upload object that exposes its
    # temporary path through `.name`.
    path = file if isinstance(file, str) else getattr(file, "name", None)
    if not isinstance(path, str):
        raise gr.Error("Định dạng tệp không được hỗ trợ.")

    lowered = path.lower()
    if lowered.endswith('.csv'):
        # one comment per non-empty line; undecodable bytes are dropped
        with open(path, 'r', encoding='utf-8', errors='ignore') as handle:
            lines = handle.read().splitlines()
        return [line.strip() for line in lines if line.strip()]
    if lowered.endswith('.docx'):
        doc = Document(path)
        return _split_into_comments("\n".join(p.text for p in doc.paragraphs))
    with open(path, 'r', encoding='utf-8') as handle:
        return _split_into_comments(handle.read())


def process_input_with_aspects(text_input, file):
    """Predict sentence- and aspect-level sentiment for every comment.

    Reads the input text or, when no text is given, the uploaded file;
    splits it into comments; extracts aspects for each comment; predicts a
    sentiment for the context window around each aspect and for the whole
    comment. Probability columns are not included in the output.

    Args:
        text_input: Text typed by the user (may be empty or None).
        file: Path of an uploaded .csv/.docx/text file, or an object whose
            ``.name`` is that path (may be None).

    Returns:
        ``(styler, df2, df_aspects, aspect_dist_pct)``: the pandas Styler of
        the per-comment table, the per-comment DataFrame
        (Comment / Dự đoán / Aspects), the per-aspect DataFrame, and the
        per-aspect sentiment distribution in percent (empty DataFrame when no
        aspect was found).

    Raises:
        gr.Error: If the file argument cannot be resolved to a path or no
            comment could be extracted.
    """
    comments = _read_comments(text_input, file)
    if not comments:
        raise gr.Error("Vui lòng nhập ít nhất một bình luận hoặc tải lên tệp chứa bình luận.")

    table_rows = []
    aspect_rows = []  # flattened aspect-level entries for aggregation
    for comment in comments:
        tokens, aspects = extract_aspects_from_text(comment, seed_aspects)

        asp_info_list = []
        for asp_key, asp_phrase, s, e in aspects:
            # classify only the few tokens around the aspect mention
            context = get_context_string_from_tokens(tokens, s, e, window=3)
            sent, _ = predict_sentiment(model, clean_text(context), vocab, label_map)
            asp_info_list.append(f"{asp_key}: {sent}")
            aspect_rows.append({
                'Comment': comment,
                'Aspect': asp_key,
                'Phrase': asp_phrase,
                'Context': context,
                'Sentiment': sent,
            })

        # sentence-level prediction; 'Aspects' is '' when none were found
        sent_label, _ = predict_sentiment(model, clean_text(comment), vocab, label_map)
        table_rows.append({
            'Comment': comment,
            'Dự đoán': sent_label,
            'Aspects': " | ".join(asp_info_list),
        })

    df2 = pd.DataFrame(table_rows)
    # No probability columns => plain styler
    styler = df2.style

    if aspect_rows:
        df_aspects = pd.DataFrame(aspect_rows)
        aspect_dist = (df_aspects.groupby(['Aspect', 'Sentiment']).size()
                       .unstack(fill_value=0))
        # row-wise share of each sentiment within an aspect, in percent
        aspect_dist_pct = aspect_dist.div(aspect_dist.sum(axis=1), axis=0) * 100
    else:
        df_aspects = pd.DataFrame(columns=['Comment', 'Aspect', 'Phrase', 'Context', 'Sentiment'])
        aspect_dist_pct = pd.DataFrame()
    return styler, df2, df_aspects, aspect_dist_pct
def plot_distribution(dist):
    """Draw ``dist`` as a bar chart and return the matplotlib figure.

    Bars are coloured red, gray and green in the order of ``dist``'s entries;
    tick labels on both axes are kept unrotated.
    """
    figure, axes = plt.subplots()
    dist.plot.bar(ax=axes, color=['red', 'gray', 'green'])
    axes.set_title("Phân phối cảm xúc (toàn câu)")
    axes.set_ylabel("Tỷ lệ (%)")
    for axis_name in ('x', 'y'):
        axes.tick_params(axis=axis_name, labelrotation=0)
    plt.tight_layout()
    return figure
def summarize_distribution_from_df(df):
    """Return the percentage of each predicted sentence-level label.

    The result is a Series indexed by the three labels in the fixed order
    negative / neutral / positive; labels absent from ``df`` get 0.
    """
    label_order = ['tiêu cực', 'bình thường', 'tích cực']
    shares = df['Dự đoán'].value_counts(normalize=True)
    percentages = shares * 100
    return percentages.reindex(label_order, fill_value=0)
def full_process(text_input, file_input):
    """Gradio callback: analyse the input and return (styled table, bar chart).

    The chart shows the sentence-level label distribution; the aspect-level
    outputs of the analysis are not used here.
    """
    results = process_input_with_aspects(text_input, file_input)
    styled_table, sentence_df = results[0], results[1]
    figure = plot_distribution(summarize_distribution_from_df(sentence_df))
    return styled_table, figure
# Gradio UI. Components are rendered in the order they are created inside the
# Blocks context: heading, text box, file upload, button, result table, plot.
with gr.Blocks() as demo:
    gr.Markdown("## Phân tích cảm xúc")
    gr.Markdown("Nhập bình luận:")
    text_input = gr.Textbox(lines=6, placeholder="Nhập bình luận tại đây...", label="")
    gr.Markdown("Hoặc tải lên tệp .txt, .docx hoặc .csv chứa các bình luận:")
    file_input = gr.File(label="Tải tệp", file_types=[".txt", ".csv", ".docx"])
    predict_button = gr.Button("Dự đoán")
    # Read-only table; its headers match the columns of the per-comment
    # DataFrame built by process_input_with_aspects.
    output_table = gr.Dataframe(headers=["Comment", "Dự đoán", 'Aspects'],
                                interactive=False,
                                wrap=True,
                                max_chars=60,
                                column_widths=["45%", "20%", "35%"])
    dist_plot = gr.Plot()
    # full_process returns (styled table, figure), in the same order as `outputs`.
    predict_button.click(
        fn=full_process,
        inputs=[text_input, file_input],
        outputs=[output_table, dist_plot]
    )
# Start the app when the module is executed/imported.
demo.launch()