# abdull90's picture
# update app.py
# f5e8bd0 verified
import html
import json
import os
import re
from functools import lru_cache

import gradio as gr
import pandas as pd
import torch
from tqdm import tqdm

# Models
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
# Target inference device; this Space assumes CPU only.
# NOTE(review): aspdevice is never read below — get_senti_pipeline uses its own
# device="cpu" default; confirm whether this constant should be passed through.
aspdevice = "cpu"
# Aspect extraction Model logic
# Hugging Face model IDs offered in the UI dropdown (first entry is the default).
available_models = [
    "gauneg/roberta-base-absa-ate-sentiment",
    "yangheng/deberta-v3-base-end2end-absa",
    "gauneg/deberta-v3-base-absa-ate-sentiment",
    # Add more model IDs here
]
@lru_cache(maxsize=2)
def get_senti_pipeline(model_id, device="cpu"):
    """Build a token-classification ('ner') pipeline for ``model_id``.

    BUG FIX: previously the tokenizer and model were re-downloaded/re-loaded
    from scratch on every single submit or file upload. Results are now cached
    per (model_id, device); maxsize is kept small because each pipeline holds a
    full transformer model in memory.

    Args:
        model_id: Hugging Face model identifier (one of ``available_models``).
        device: device passed to the pipeline (default "cpu").

    Returns:
        A transformers pipeline with aggregation_strategy='simple', so entities
        come back grouped with 'word', 'entity_group', 'score', 'start', 'end'.

    Raises:
        ValueError: if no model is selected in the dropdown.
    """
    if not model_id or model_id == "None":
        raise ValueError("No model selected. Please choose a model from the dropdown.")
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model = AutoModelForTokenClassification.from_pretrained(model_id)
    return pipeline(
        task='ner',
        model=model,
        tokenizer=tokenizer,
        device=device,
        aggregation_strategy='simple',
    )
# Compiled once at module load: matches a single word character (\w).
# Hoisted out of extract_full_word, which previously re-evaluated
# re.match(r'\w', ...) for every character it scanned.
_WORD_CHAR = re.compile(r"\w")

def extract_full_word(text, start, end):
    """Expand the span [start, end) of ``text`` outward to full word boundaries.

    The NER pipeline's aggregated spans can cut words mid-token; this walks
    left from ``start`` and right from ``end`` while the neighbouring character
    is a word character (\\w), then returns the stripped expanded slice.

    Args:
        text: the string the offsets refer to.
        start, end: character offsets of the (possibly partial) span.

    Returns:
        The whole word(s) containing the span, stripped of surrounding space.
    """
    word_start = start
    while word_start > 0 and _WORD_CHAR.match(text[word_start - 1]):
        word_start -= 1
    word_end = end
    while word_end < len(text) and _WORD_CHAR.match(text[word_end]):
        word_end += 1
    return text[word_start:word_end].strip()
def extract_full_analysis(reviews, senti_pipeline):
    """Run aspect-term/sentiment extraction over a batch of reviews.

    Args:
        reviews: a pandas DataFrame with a 'text' column, or a list of strings.
        senti_pipeline: callable token-classification pipeline; given a list of
            sentences it returns, per sentence, a list of dicts with keys such
            as 'word', 'entity_group', 'score', 'start', 'end'.

    Returns:
        A list of dicts with keys 'review', 'extracted_words', 'aspect_words',
        'sentiment', 'score' (scores formatted to 4 decimal places).

    Raises:
        TypeError: if ``reviews`` is neither a DataFrame nor a list.
    """
    rev_arr = []
    for i in tqdm(range(len(reviews))):
        # Always resolve review_text to a string.
        if isinstance(reviews, pd.DataFrame):
            review_text = reviews['text'].iloc[i]
        elif isinstance(reviews, list):
            review_text = reviews[i]
        else:
            # BUG FIX: review_text was left unbound for any other input type.
            raise TypeError("reviews must be a pandas DataFrame with a 'text' column or a list of strings")
        # Naive sentence split for pipeline input (empty segments dropped).
        sentences = [seg.strip() for seg in review_text.split(".") if seg.strip()]
        predictions = senti_pipeline(sentences) if sentences else []
        # Normalise to one prediction-list per sentence; a single-sentence call
        # may come back as a flat list of dicts rather than a list of lists.
        if predictions and isinstance(predictions[0], dict):
            per_sentence = [list(predictions)]
        else:
            per_sentence = [p if isinstance(p, list) else [p] for p in predictions]
        extracted_words, sentiments, scores, refined_aspects = [], [], [], []
        for sentence, preds in zip(sentences, per_sentence):
            for d in preds:
                if not isinstance(d, dict):
                    continue
                if "word" in d:
                    extracted_words.append(d["word"].strip())
                if "entity_group" in d:
                    sentiments.append(d["entity_group"])
                if "score" in d:
                    scores.append(f"{d['score']:.4f}")
                if "word" in d and "start" in d and "end" in d:
                    # BUG FIX: 'start'/'end' are offsets into the *sentence*
                    # the pipeline saw, not into the full review text; the old
                    # code expanded them against review_text, producing wrong
                    # words for every sentence after the first.
                    full_word = extract_full_word(sentence, d["start"], d["end"])
                    if len(full_word) >= 3:  # drop very short fragments
                        refined_aspects.append(full_word)
        # De-duplicate aspect words while preserving first-seen order.
        refined_aspects = list(dict.fromkeys(refined_aspects))
        rev_arr.append({
            "review": review_text,
            "extracted_words": extracted_words,
            "aspect_words": refined_aspects,
            "sentiment": sentiments,
            "score": scores,
        })
    return rev_arr
def format_review(review_analyses):
    """Render review analyses as HTML cards.

    Args:
        review_analyses: list of dicts as produced by ``extract_full_analysis``;
            each must provide 'review', 'aspect_words' and 'sentiment'.

    Returns:
        One HTML string — a bordered card per review, with aspect words and
        sentiments laid out via the .cell-grid/.cell-item CSS classes.
    """
    html_parts = []
    for analysis in review_analyses:
        # BUG FIX: the raw review text was interpolated unescaped while the
        # aspect/sentiment cells were escaped — a review containing markup
        # (or script) would be rendered verbatim. Escape it like the rest.
        text = html.escape(analysis['review'])
        aspects = analysis['aspect_words']
        sentiments = analysis['sentiment']
        aspects_html = (
            '<div class="cell-grid">'
            + "".join(f'<div class="cell-item">{html.escape(asp)}</div>' for asp in aspects)
            + '</div>'
        )
        sentiments_html = (
            '<div class="cell-grid">'
            + "".join(f'<div class="cell-item">{html.escape(senti)}</div>' for senti in sentiments)
            + '</div>'
        )
        html_part = f"""
<div style="border: 1px solid #ccc; padding: 10px; margin-bottom: 10px;">
<h4>Review</h4>
<p>{text}</p>
<h3>Aspect Words</h3>
{aspects_html}
<h3>Sentiments</h3>
{sentiments_html}
</div>
"""
        html_parts.append(html_part)
    return "".join(html_parts)
def submit_new_re(text):
    """Legacy single-review handler (no model dropdown); not wired into the UI.

    BUG FIX: this called ``extract_full_analysis([text])`` without the required
    ``senti_pipeline`` argument, so any non-empty input raised a TypeError.
    It now falls back to the first model in ``available_models``.

    Args:
        text: the raw review text.

    Returns:
        (html_string, analyses) — or a placeholder message and [] when the
        input is empty/whitespace.
    """
    if not text.strip():
        return "Please enter a review.", []
    senti_pipeline = get_senti_pipeline(available_models[0])
    new_dynamic_reviews = extract_full_analysis([text], senti_pipeline)
    return format_review(new_dynamic_reviews), new_dynamic_reviews
def submit_new_review(text, model_id):
    """Analyze one review with the selected model and render it as HTML.

    Args:
        text: raw review text from the textbox.
        model_id: Hugging Face model id chosen in the dropdown.

    Returns:
        (html_string, analyses) — or a prompt message and [] for empty input.
    """
    # Guard clause: nothing to analyze.
    if not text.strip():
        return "Please enter a review.", []
    analyses = extract_full_analysis([text], get_senti_pipeline(model_id))
    return format_review(analyses), analyses
def process_uploaded_file(file, model_id):
    """Analyze a CSV/JSON file of reviews and return the rendered HTML.

    The file must contain a 'text' column. JSON is tried as JSON-lines first,
    then as a plain JSON document.

    BUG FIXES:
    - The model pipeline was loaded *before* validating the upload, wasting a
      full model load (and raising model errors) even when no file was given.
    - Error paths returned ("message", []) tuples while the success path (and
      the single Gradio HTML output this handler is wired to) expects one
      string; all paths now return a single string.

    Args:
        file: Gradio file object (has a ``.name`` path) or None.
        model_id: Hugging Face model id chosen in the dropdown.

    Returns:
        An HTML string with the analysis, or a plain error message string.
    """
    if file is None:
        return "No file uploaded."
    if file.name.endswith('.csv'):
        try:
            df = pd.read_csv(file.name)
        except Exception as e:
            return f"Error reading CSV file: {str(e)}"
    elif file.name.endswith('.json'):
        try:
            # Prefer JSON-lines (one record per line), the common export shape.
            df = pd.read_json(file.name, lines=True)
        except ValueError:
            try:
                df = pd.read_json(file.name)
            except Exception as e:
                return f"Error reading JSON file: {str(e)}"
    else:
        return "Unsupported file type. Please upload a CSV or JSON file."
    if 'text' not in df.columns:
        return "The file must contain a 'text' column with reviews."
    # Only load the (expensive) model once the input is known to be valid.
    senti_pipeline = get_senti_pipeline(model_id)
    review_analyses = extract_full_analysis(df, senti_pipeline)
    return format_review(review_analyses)
# Build the Gradio UI. The CSS lays aspect/sentiment cells out in a
# six-column grid (.cell-grid) of bordered boxes (.cell-item).
with gr.Blocks(css="""
.cell-grid {
display: grid;
grid-template-columns: repeat(6, 1fr);
gap: 10px;
width:100%;
height:100%
}
.cell-item {
padding: 10px;
border: 1px solid #ccc;
text-align: center;
}
""") as demo:
    # Header
    gr.Markdown("# Yelp Review Demonstration for ATE Extracted")
    # Single review submission
    gr.Markdown("## Submit Your Review")
    with gr.Row():
        with gr.Column(scale=4): #dropdown for model selection
            model_dropdown = gr.Dropdown(label="Select Model", choices=available_models, value=available_models[0])
        with gr.Column(scale=8):
            submit_text = gr.Textbox(label="Write your review", lines=2)
    submit_button = gr.Button("Submit")
    # HTML panel that shows the analysis of the review just submitted.
    dynamic_display = gr.HTML(label="Submitted Review Analysis", padding=10, max_height=600)
    # File upload
    gr.Markdown("## Upload and Process Reviews from File")
    with gr.Row():
        with gr.Column(scale=8):
            uploaded_file = gr.File(label="Upload CSV or JSON file (max 10MB)", file_types=None, file_count="single",)
    process_button = gr.Button("Process File")
    with gr.Row():
        with gr.Column(scale=12):
            file_output = gr.HTML(label="Analysis of Uploaded Reviews",padding=10, max_height=600)
    # States
    # Holds the structured analysis (list of dicts) of the last submitted review.
    dynamic_reviews_state = gr.State()
    # Wire handlers: single-review submit returns (html, state);
    # file processing feeds one HTML component.
    submit_button.click(submit_new_review,inputs=[submit_text, model_dropdown],outputs=[dynamic_display, dynamic_reviews_state])
    process_button.click(process_uploaded_file, inputs=[uploaded_file, model_dropdown], outputs=file_output)
# Launch
if __name__ == "__main__":
    # share=True requests a public Gradio tunnel URL in addition to the local server.
    demo.launch(share=True)