# Di12's picture
# Update app.py
# aac3e0b verified
import torch.nn as nn
import torch
import torchtext.vocab as vocab
import torch.nn.functional as F
import pandas as pd
import numpy as np
from underthesea import word_tokenize
import unicodedata
import re
from tqdm import tqdm
import gradio as gr
from huggingface_hub import hf_hub_download
import io
import matplotlib.pyplot as plt
from docx import Document
# Device configuration: run on GPU when available, otherwise CPU.
# Every tensor fed to the model must be moved to this same device.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Dictionary for common Vietnamese slang/abbreviations, expanded to their
# full forms before tokenization so the embedding lookup sees real words.
abbreviations = {
    "ko": "không",
    "sp": "sản phẩm",
    "k": "không",
    "m": "mình",
    "đc": "được",
    "dc": "được",
    "h": "giờ",
    "trloi": "trả lời",
    "cg": "cũng",
    "bt": "bình thường",
    "dt": "điện thoại",
    "mt": "máy tính",
    "m.n": "mọi người"
    # add more slang mappings
}

# Regex patterns, compiled once at import time (the original recompiled
# several of them on every clean_text call).
url_pattern = re.compile(r"http\S+|www\S+")  # URLs
user_pattern = re.compile(r"@\w+")           # @usernames
emoji_pattern = re.compile(
    "["  # start
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # flags
    "]+", flags=re.UNICODE)
emoticon_pattern = re.compile(r"[:;=8][\-o\*']?[\)\]\(\[dDpP/:}\{@\|\\]")  # text emoticons
repeat_pattern = re.compile(r"(.)\1{2,}")  # 3 or more repeated characters

# Abbreviation-expansion pattern, built once instead of on every call.
_abbr_pattern = re.compile(
    r"\b(" + "|".join(map(re.escape, abbreviations.keys())) + r")\b"
) if abbreviations else None


def clean_text(text: str) -> str:
    """Normalize a raw Vietnamese comment for tokenization.

    Pipeline: NFC Unicode normalization, lowercasing, URL/username removal,
    emoji/emoticon stripping, slang expansion, repeated-character squashing,
    punctuation removal, and whitespace collapsing.

    Args:
        text: raw input; coerced to str first.

    Returns:
        The cleaned, lowercase string.
    """
    # Explicit Unicode normalization so composed/decomposed forms compare equal.
    text = unicodedata.normalize('NFC', str(text))
    text = text.lower()
    # Remove URLs and @usernames.
    text = url_pattern.sub('', text)
    text = user_pattern.sub('', text)
    # Remove emojis and ASCII emoticons.
    text = emoji_pattern.sub(' ', text)
    text = emoticon_pattern.sub(' ', text)
    # Expand common abbreviations ("ko" -> "không", ...).
    if _abbr_pattern is not None:
        text = _abbr_pattern.sub(lambda m: abbreviations.get(m.group(0), m.group(0)), text)
    # Squash characters repeated 3+ times (e.g. "quaaa" -> "qua").
    text = repeat_pattern.sub(r"\1", text)
    # Remove punctuation. \w already keeps Vietnamese letters under Unicode
    # matching; the explicit Latin range is retained for safety.
    text = re.sub(r"[^\w\s\u00C0-\u024F]", ' ', text)
    # Collapse runs of whitespace.
    return re.sub(r"\s+", ' ', text).strip()
class Vocabulary:
    """Word <-> index mapping with reserved tokens <pad>=0 and <unk>=1.

    ``id2word`` is a plain dict attribute (index -> word), kept in sync by
    :meth:`add`. The original class also defined an ``id2word`` *method*,
    but the instance attribute assigned in ``__init__`` shadowed it, making
    the method unreachable dead code; it has been removed.
    """

    def __init__(self):
        # word -> index; indices 0 and 1 are reserved.
        self.word2id = dict()
        self.word2id['<pad>'] = 0  # Pad Token
        self.word2id['<unk>'] = 1  # Unknown Token
        self.unk_id = self.word2id['<unk>']
        # index -> word, the inverse mapping of word2id.
        self.id2word = {v: k for k, v in self.word2id.items()}

    def __getitem__(self, word):
        """Return the index of *word*, falling back to the <unk> index."""
        return self.word2id.get(word, self.unk_id)

    def __contains__(self, word):
        return word in self.word2id

    def __len__(self):
        return len(self.word2id)

    def add(self, word):
        """Insert *word* if unseen and return its index."""
        if word not in self:
            word_index = self.word2id[word] = len(self.word2id)
            self.id2word[word_index] = word
            return word_index
        return self[word]

    @staticmethod
    def tokenize_corpus(corpus):
        """Tokenize each document with underthesea; multi-word tokens are
        joined with underscores to match the embedding vocabulary."""
        print("Tokenize the corpus...")
        tokenized_corpus = list()
        for document in tqdm(corpus):
            tokenized_document = [word.replace(" ", "_") for word in word_tokenize(document)]
            tokenized_corpus.append(tokenized_document)
        return tokenized_corpus

    def corpus_to_tensor(self, corpus, is_tokenized=False):
        """Map a corpus (raw strings, or token lists if *is_tokenized*) to a
        list of int64 index tensors, one tensor per document."""
        tokenized_corpus = corpus if is_tokenized else self.tokenize_corpus(corpus)
        indicies_corpus = list()
        for document in tqdm(tokenized_corpus):
            indicies_document = torch.tensor([self[word] for word in document],
                                             dtype=torch.int64)
            indicies_corpus.append(indicies_document)
        return indicies_corpus

    def tensor_to_corpus(self, tensor):
        """Inverse of corpus_to_tensor: index tensors back to word lists."""
        corpus = list()
        for indicies in tqdm(tensor):
            corpus.append([self.id2word[index.item()] for index in indicies])
        return corpus
class RNN(nn.Module):
    """LSTM-based sentence classifier.

    Embeds token indices, runs a (possibly bidirectional) LSTM over the
    packed sequence, and classifies from the final hidden state(s).
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, n_layers,
                 bidirectional, dropout, pad_idx, n_classes):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        # Inter-layer dropout is only meaningful with more than one layer.
        rnn_dropout = dropout if n_layers > 1 else 0
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers,
                           bidirectional=bidirectional, dropout=rnn_dropout)
        self.dropout = nn.Dropout(dropout)
        fc_in = hidden_dim * 2 if bidirectional else hidden_dim
        self.fc = nn.Linear(fc_in, n_classes)

    def forward(self, text, text_lengths):
        # text: (seq_len, batch) token indices; text_lengths: true lengths
        # (must live on CPU for pack_padded_sequence).
        emb = self.dropout(self.embedding(text))
        packed = nn.utils.rnn.pack_padded_sequence(
            emb, text_lengths.to('cpu'), enforce_sorted=False)
        _, (hidden, _) = self.rnn(packed)
        if self.rnn.bidirectional:
            # Concatenate last forward and last backward hidden states.
            final = torch.cat((hidden[-2], hidden[-1]), dim=1)
        else:
            final = hidden[-1]
        return self.fc(self.dropout(final))
# Download trained weights and the reduced word2vec embeddings from the
# Hugging Face Space repo (network I/O at import time).
model_path = hf_hub_download(repo_id="Di12/sentiment_analysis", filename="model.pt", repo_type="space")
embedding_path = hf_hub_download(repo_id="Di12/sentiment_analysis", filename="vi_word2vec_reduced.txt", repo_type="space")
# Load pretrained embeddings and build vocab
word_embedding = vocab.Vectors(
    name=embedding_path,
    unk_init=torch.Tensor.normal_  # unseen words get random-normal vectors
)
# NOTE(review): the assignment below rebinds the `torchtext.vocab` module
# alias imported at the top of the file. It works only because the module is
# used (vocab.Vectors) before the rebinding — consider renaming one of them.
vocab = Vocabulary()
for w in word_embedding.stoi.keys(): vocab.add(w)
# Model hyperparams
# NOTE(review): input_dim is the embedding-file vocabulary size; assumes it
# already accounts for the <pad>/<unk> rows that Vocabulary reserves at
# ids 0/1 — TODO confirm against the checkpoint's embedding shape.
input_dim = word_embedding.vectors.shape[0]
embedding_dim = 100
hidden_dim = 8
n_layers = 2
bidirectional = False
dropout = 0.3
pad_idx = vocab["<pad>"]
unk_idx = vocab["<unk>"]
n_classes = 3
# Class index -> Vietnamese label: negative / neutral / positive.
label_map = {0: 'tiêu cực', 1: 'bình thường', 2: 'tích cực'}
def load_model(path: str):
    """Build the RNN with the module-level hyperparameters, load trained
    weights from *path*, and return the model in eval mode on `device`."""
    net = RNN(input_dim, embedding_dim, hidden_dim, n_layers,
              bidirectional, dropout, pad_idx, n_classes)
    state = torch.load(path, map_location=device)
    net.load_state_dict(state)
    return net.to(device).eval()


model = load_model(model_path)
# Seed keyword lists per aspect: each aspect key maps to surface phrases that
# signal it in a comment. Keys use underscores to match tokenizer output;
# aspects are shipping, packaging, and product quality respectively.
seed_aspects = {
    'vận_chuyển': ['giao hàng', 'giao', 'ship', 'nhận hàng', 'vận chuyển'],
    'đóng_gói': ['đóng gói', 'đóng_gói', 'gói', 'bao_bì'],
    'sản_phẩm': ['sách', 'sản_phẩm', 'chất_lượng']
}
def tokenize_underthesea(text):
    """Tokenize *text* with underthesea and always return a list of tokens.

    underthesea.word_tokenize may return either a list of tokens or one
    space-joined string depending on options; normalize to a list here.
    """
    result = word_tokenize(text)  # underthesea
    return result.split() if isinstance(result, str) else result
def extract_aspects_from_text(text, seed_aspects, tokenizer=tokenize_underthesea):
    """Detect aspect mentions in *text* using the seed keyword lists.

    Returns:
        tokens: list[str] — tokens of the cleaned text
        found: list of tuples (asp_key, matched_phrase, start_idx, end_idx)

    Matching is the union of:
      - token-sequence matching over the tokenized text, and
      - a fallback substring match on the cleaned text, used when
        tokenization variants prevent a token-level hit.
    """
    # 1) Normalize + tokenize
    cleaned = clean_text(text)
    tokens = tokenizer(cleaned)
    t_low = [t.lower() for t in tokens]
    found = []
    found_set = set()  # avoid duplicates (asp_key, start, end)
    # Prepare seed token lists for token-sequence matching; underscores in
    # keywords are turned into spaces so they align with token boundaries.
    seed_tokenlists = []
    for asp_key, kws in seed_aspects.items():
        for kw in kws:
            kw_proc = kw.lower().replace('_', ' ').strip()
            kw_tokens = kw_proc.split()
            seed_tokenlists.append((asp_key, kw_tokens, kw_proc))
    # 2) Token sequence match: slide each keyword token-run over the text.
    for asp_key, kw_tokens, kw_proc in seed_tokenlists:
        L = len(kw_tokens)
        if L == 0:
            continue
        for i in range(len(t_low) - L + 1):
            if t_low[i:i+L] == kw_tokens:
                phrase = " ".join(tokens[i:i+L])
                key = (asp_key, i, i+L-1)
                if key not in found_set:
                    found.append((asp_key, phrase, i, i+L-1))
                    found_set.add(key)
    # 3) Fallback: substring match on cleaned text (helps when tokenization
    # produced variants). Only applied to aspects with no token-level hit.
    lower_cleaned = cleaned.lower()
    for asp_key, kws in seed_aspects.items():
        # if aspect already found at least once, skip fallback for that aspect
        already = any(f[0] == asp_key for f in found)
        if already:
            continue
        for kw in kws:
            kw_norm = kw.lower().replace('_', ' ').strip()
            # use simple substring check (word-boundary)
            if re.search(r'\b' + re.escape(kw_norm) + r'\b', lower_cleaned):
                # find approximate index in tokens to return start/end (best-effort)
                kw_tokens = kw_norm.split()
                L = len(kw_tokens)
                start = None
                for i in range(len(t_low) - L + 1):
                    if t_low[i:i+L] == kw_tokens:
                        start = i
                        end = i + L - 1
                        break
                if start is None:
                    # fallback: find first token that contains first keyword substring
                    first_kw = kw_tokens[0]
                    for i, tok in enumerate(t_low):
                        if first_kw in tok:
                            start = i
                            end = min(len(t_low)-1, i + L - 1)
                            break
                if start is None:
                    # worst-case: no locatable span — skip this keyword entirely
                    continue
                phrase = " ".join(tokens[start:end+1])
                key = (asp_key, start, end)
                if key not in found_set:
                    found.append((asp_key, phrase, start, end))
                    found_set.add(key)
                break  # don't try other kws for this aspect once matched
    return tokens, found
def get_context_string_from_tokens(tokens, start, end, window=3):
    """Return tokens[start..end] padded with up to *window* neighbouring
    tokens on each side, joined by single spaces."""
    lo = max(0, start - window)
    hi = min(len(tokens) - 1, end + window)
    return " ".join(tokens[lo:hi + 1])
def predict_sentiment(model, sentence, vocab, label_mapping=None):
    """Predict the sentiment of one (already cleaned) sentence.

    Args:
        model: trained RNN classifier (in eval mode).
        sentence: input string; converted to indices via *vocab*.
        vocab: Vocabulary used for word -> index lookup.
        label_mapping: optional {class_index: label} dict; when given, the
            label string is returned instead of the class index.

    Returns:
        (label_or_index, probs) where probs are the softmax probabilities
        rounded to 2 decimals for display.
    """
    tensor = vocab.corpus_to_tensor([sentence])[0]
    length = torch.LongTensor([tensor.size(0)]).to(device)
    # seq_len x batch(=1), moved to the model's device — the original left
    # the input on CPU, which fails when the model is on CUDA.
    tensor = tensor.unsqueeze(1).to(device)
    with torch.no_grad():
        logits = model(tensor, length).squeeze(0)
    probs_t = F.softmax(logits, dim=-1)
    # Argmax on the full-precision probabilities: the original rounded to
    # 2 decimals first, which can flip the prediction on close classes.
    idx = int(probs_t.argmax())
    probs = [round(p, 2) for p in probs_t.cpu().tolist()]
    return (label_mapping[idx], probs) if label_mapping else (idx, probs)
def _split_into_sentences(content):
    """Split raw text into comment candidates on sentence enders or newlines."""
    parts = re.split(r'[.?!]\s*|\n+', content)
    return [p.strip() for p in parts if p and p.strip()]


def _extract_comments(text_input, file):
    """Collect raw comments from the textbox or an uploaded file path.

    Returns an empty list when neither source is provided.
    Raises gr.Error for unsupported file objects.
    """
    if text_input:
        return _split_into_sentences(text_input + "\n")
    if file:
        if isinstance(file, str):
            if file.lower().endswith('.csv'):
                # CSV treated as one comment per line (no header handling).
                # `with` ensures the handle is closed (original leaked it).
                with open(file, 'r', encoding='utf-8', errors='ignore') as fh:
                    lines = fh.read().splitlines()
                return [line.strip() for line in lines if line.strip()]
            if file.lower().endswith('.docx'):
                doc = Document(file)
                content = "\n".join([p.text for p in doc.paragraphs])
                return _split_into_sentences(content)
            # Plain-text fallback for any other extension.
            with open(file, 'r', encoding='utf-8') as fh:
                return _split_into_sentences(fh.read())
        raise gr.Error("Định dạng tệp không được hỗ trợ.")
    return []


def process_input_with_aspects(text_input, file):
    """Read input text or an uploaded file, split into comments, extract
    aspects per comment, predict sentiment per aspect (with a sentence-level
    fallback) and return the styled table plus aspect-level summaries.

    Returns:
        styler: pandas Styler over the per-comment table (no probability
            columns — they are intentionally hidden).
        df2: per-comment DataFrame (Comment / Dự đoán / Aspects).
        df_aspects: flattened aspect-level rows for aggregation.
        aspect_dist_pct: per-aspect sentiment distribution in percent.

    Raises:
        gr.Error: when no comments could be read from either source.
    """
    comments = _extract_comments(text_input, file)
    if len(comments) == 0:
        raise gr.Error("Vui lòng nhập ít nhất một bình luận hoặc tải lên tệp chứa bình luận.")

    table_rows = []
    aspect_rows = []  # flattened aspect-level entries for aggregation
    for comment in comments:
        tokens, aspects = extract_aspects_from_text(comment, seed_aspects)
        # Sentence-level prediction is always computed (shown in 'Dự đoán').
        sent_label, _ = predict_sentiment(model, clean_text(comment), vocab, label_map)
        if len(aspects) == 0:
            table_rows.append({
                'Comment': comment,
                'Dự đoán': sent_label,
                'Aspects': ''
            })
            continue
        asp_info_list = []
        for asp_key, asp_phrase, s, e in aspects:
            # Classify a small context window around the aspect phrase.
            context = get_context_string_from_tokens(tokens, s, e, window=3)
            sent, _ = predict_sentiment(model, clean_text(context), vocab, label_map)
            asp_info_list.append(f"{asp_key}: {sent}")
            aspect_rows.append({
                'Comment': comment,
                'Aspect': asp_key,
                'Phrase': asp_phrase,
                'Context': context,
                'Sentiment': sent
            })
        table_rows.append({
            'Comment': comment,
            'Dự đoán': sent_label,
            'Aspects': " | ".join(asp_info_list)
        })

    df2 = pd.DataFrame(table_rows)
    # No probability columns => simpler styler
    styler = df2.style
    if aspect_rows:
        df_aspects = pd.DataFrame(aspect_rows)
        aspect_dist = (df_aspects.groupby(['Aspect', 'Sentiment']).size()
                       .unstack(fill_value=0))
        # Row-normalize counts to percentages per aspect.
        aspect_dist_pct = aspect_dist.div(aspect_dist.sum(axis=1), axis=0) * 100
    else:
        df_aspects = pd.DataFrame(columns=['Comment', 'Aspect', 'Phrase', 'Context', 'Sentiment'])
        aspect_dist_pct = pd.DataFrame()
    return styler, df2, df_aspects, aspect_dist_pct
def plot_distribution(dist):
    """Bar-plot the sentence-level sentiment distribution (percentages)."""
    fig, axis = plt.subplots()
    # Color order follows the label order: negative / neutral / positive.
    dist.plot.bar(ax=axis, color=['red', 'gray', 'green'])
    axis.set_ylabel("Tỷ lệ (%)")
    axis.set_title("Phân phối cảm xúc (toàn câu)")
    for which in ('x', 'y'):
        axis.tick_params(axis=which, labelrotation=0)
    plt.tight_layout()
    return fig
def summarize_distribution_from_df(df):
    """Percentage distribution of sentence-level predictions, in the fixed
    order negative / neutral / positive (missing labels reported as 0)."""
    counts = df['Dự đoán'].value_counts(normalize=True).mul(100)
    order = ['tiêu cực', 'bình thường', 'tích cực']
    return counts.reindex(order, fill_value=0)
def full_process(text_input, file_input):
    """Gradio callback: run the full pipeline and return the styled results
    table plus the sentence-level sentiment distribution figure."""
    styler, df2, _df_aspects, _aspect_dist = process_input_with_aspects(text_input, file_input)
    fig_main = plot_distribution(summarize_distribution_from_df(df2))
    return styler, fig_main
# --- Gradio UI: textbox or file upload -> prediction table + distribution plot
with gr.Blocks() as demo:
    gr.Markdown("## Phân tích cảm xúc")
    gr.Markdown("Nhập bình luận:")
    text_input = gr.Textbox(lines=6, placeholder="Nhập bình luận tại đây...", label="")
    gr.Markdown("Hoặc tải lên tệp .txt, .docx hoặc .csv chứa các bình luận:")
    file_input = gr.File(label="Tải tệp", file_types=[".txt", ".csv", ".docx"])
    predict_button = gr.Button("Dự đoán")
    # Per-comment results: comment text, sentence-level prediction,
    # and the detected aspect -> sentiment pairs.
    output_table = gr.Dataframe(headers=["Comment", "Dự đoán", 'Aspects'],
                                interactive=False,
                                wrap=True,
                                max_chars=60,
                                column_widths=["45%", "20%", "35%"])
    dist_plot = gr.Plot()
    predict_button.click(
        fn=full_process,
        inputs=[text_input, file_input],
        outputs=[output_table, dist_plot]
    )
demo.launch()