| import gradio as gr |
| import json |
| import re |
| import html |
| import os |
| import pandas as pd |
| import re |
| from tqdm import tqdm |
| |
| from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline |
|
|
| import torch |
# Inference device; "cpu" keeps the demo runnable without a GPU.
# NOTE(review): this constant is not referenced anywhere in this file —
# get_senti_pipeline() takes its own `device` argument instead. Confirm it is
# not used elsewhere before removing.
aspdevice = "cpu"

# Hugging Face token-classification ABSA models offered in the UI dropdown.
available_models = [
    "gauneg/roberta-base-absa-ate-sentiment",
    "yangheng/deberta-v3-base-end2end-absa",
    "gauneg/deberta-v3-base-absa-ate-sentiment",

]
|
|
# Cache of already-built pipelines keyed by (model_id, device). Building a
# pipeline downloads/loads the full model, and the UI handlers call this on
# every button click — without a cache each submit repeats that work.
_PIPELINE_CACHE = {}


def get_senti_pipeline(model_id, device="cpu"):
    """Return a token-classification ('ner') pipeline for *model_id*.

    Args:
        model_id: Hugging Face model identifier chosen in the UI dropdown.
        device: device string passed through to ``transformers.pipeline``.

    Returns:
        A ``transformers`` pipeline with ``aggregation_strategy='simple'``
        (entities are merged into word-level groups).

    Raises:
        ValueError: if no model was selected in the dropdown.
    """
    if not model_id or model_id == "None":
        raise ValueError("No model selected. Please choose a model from the dropdown.")
    key = (model_id, device)
    if key not in _PIPELINE_CACHE:
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        model = AutoModelForTokenClassification.from_pretrained(model_id)
        _PIPELINE_CACHE[key] = pipeline(
            task='ner',
            model=model,
            tokenizer=tokenizer,
            device=device,
            aggregation_strategy='simple',
        )
    return _PIPELINE_CACHE[key]
|
|
|
|
def extract_full_word(text, start, end):
    """Expand the half-open span [start, end) outward to full word boundaries.

    The NER pipeline may return a span covering only part of a token; this
    widens it left and right while adjacent characters are word characters
    (``\\w``) and returns the stripped word from *text*.
    """
    word_char = re.compile(r"\w")

    left = start
    while left > 0 and word_char.match(text[left - 1]):
        left -= 1

    right = end
    length = len(text)
    while right < length and word_char.match(text[right]):
        right += 1

    return text[left:right].strip()
|
|
|
|
def extract_full_analysis(reviews, senti_pipeline):
    """Run aspect-term extraction + sentiment over a batch of reviews.

    Args:
        reviews: a ``pandas.DataFrame`` with a ``'text'`` column, or a list
            of review strings.
        senti_pipeline: callable accepting a list of sentence strings and
            returning NER-style prediction dicts (see get_senti_pipeline).

    Returns:
        A list of dicts, one per review, with keys ``review``,
        ``extracted_words``, ``aspect_words``, ``sentiment`` and ``score``.

    Raises:
        TypeError: for unsupported ``reviews`` container types (the original
        code left ``review_text`` unbound in that case and crashed later
        with a confusing NameError).
    """
    # Normalize the input into a plain list of strings up front so the loop
    # body is independent of the container type.
    if isinstance(reviews, pd.DataFrame):
        texts = reviews["text"].tolist()
    elif isinstance(reviews, list):
        texts = reviews
    else:
        raise TypeError(
            "reviews must be a pandas DataFrame with a 'text' column or a list of strings"
        )

    rev_arr = []
    for review_text in tqdm(texts):
        # Cheap sentence segmentation on '.'; drop empty fragments.
        sentences = [rev.strip() for rev in review_text.split(".") if rev.strip()]
        asppredictions = senti_pipeline(sentences)

        # The pipeline returns one list per sentence; flatten to a single
        # list of prediction dicts.
        flat_predictions = []
        for item in asppredictions:
            if isinstance(item, list):
                flat_predictions.extend(item)
            else:
                flat_predictions.append(item)

        extracted_words = [
            d["word"].strip() for d in flat_predictions
            if isinstance(d, dict) and "word" in d
        ]
        aspect_word = [
            {"word": d["word"].strip(), "start": d["start"], "end": d["end"]}
            for d in flat_predictions
            if isinstance(d, dict) and "word" in d and "start" in d and "end" in d
        ]
        sentiments = [
            d["entity_group"] for d in flat_predictions
            if isinstance(d, dict) and "entity_group" in d
        ]
        scores = [
            f"{d['score']:.4f}" for d in flat_predictions
            if isinstance(d, dict) and "score" in d
        ]

        # NOTE(review): start/end offsets come from per-*sentence* predictions
        # but are applied against the full review text here, so spans from any
        # sentence after the first may expand the wrong word. Confirm whether
        # this is intended before changing it.
        refined_aspects = []
        for aspect in aspect_word:
            full_word = extract_full_word(review_text, aspect["start"], aspect["end"])
            if len(full_word) >= 3:  # drop very short fragments / tokenizer artifacts
                refined_aspects.append(full_word)
        # De-duplicate while preserving first-seen order.
        refined_aspects = list(dict.fromkeys(refined_aspects))

        rev_arr.append({
            "review": review_text,
            "extracted_words": extracted_words,
            "aspect_words": refined_aspects,
            "sentiment": sentiments,
            "score": scores,
        })
    return rev_arr
|
|
|
|
def format_review(review_analyses):
    """Render analysis dicts (from extract_full_analysis) as HTML cards.

    Each card shows the review text plus 6-column grids of aspect words and
    sentiments (grid styling comes from the ``.cell-grid`` / ``.cell-item``
    CSS declared on the Blocks app).

    Args:
        review_analyses: list of dicts with ``review``, ``aspect_words`` and
            ``sentiment`` keys.

    Returns:
        A single HTML string (empty string for an empty list).
    """
    html_parts = []
    for analysis in review_analyses:
        text = analysis['review']
        aspects = analysis['aspect_words']
        sentiments = analysis['sentiment']
        aspects_html = (
            '<div class="cell-grid">'
            + "".join(f'<div class="cell-item">{html.escape(asp)}</div>' for asp in aspects)
            + '</div>'
        )
        sentiments_html = (
            '<div class="cell-grid">'
            + "".join(f'<div class="cell-item">{html.escape(senti)}</div>' for senti in sentiments)
            + '</div>'
        )
        # Escape the raw review text too — the original interpolated it
        # unescaped, so user-supplied markup could break or inject into the
        # rendered page while aspects/sentiments were already escaped.
        html_part = f"""
        <div style="border: 1px solid #ccc; padding: 10px; margin-bottom: 10px;">
            <h4>Review</h4>
            <p>{html.escape(text)}</p>
            <h3>Aspect Words</h3>
            {aspects_html}
            <h3>Sentiments</h3>
            {sentiments_html}

        </div>
        """
        html_parts.append(html_part)
    return "".join(html_parts)
|
|
def submit_new_re(text):
    """Single-review handler using the default model (not wired to the UI).

    Fix: the original called ``extract_full_analysis([text])`` without the
    required ``senti_pipeline`` argument, so any non-empty submission raised
    TypeError. Delegate to submit_new_review with the first available model.

    Returns:
        Tuple of (HTML string, list of analysis dicts); for blank input a
        prompt message and an empty list.
    """
    if not text.strip():
        return "Please enter a review.", []
    return submit_new_review(text, available_models[0])
|
|
def submit_new_review(text, model_id):
    """Gradio handler for the single-review Submit button.

    Returns a (rendered HTML, analysis dicts) pair; blank input yields a
    prompt message and an empty list.
    """
    if not text.strip():
        return "Please enter a review.", []
    analyses = extract_full_analysis([text], get_senti_pipeline(model_id))
    return format_review(analyses), analyses
def process_uploaded_file(file, model_id):
    """Gradio handler for the "Process File" button.

    Reads a CSV or JSON upload containing a ``'text'`` column of reviews,
    runs the selected model over it and renders the results.

    Fixes vs. the original:
      * the model pipeline is now loaded *after* all file validation, so a
        missing/bad file fails fast without triggering a model download;
      * every path returns a single HTML/message string — the click wiring
        has exactly one output component, but the original's error paths
        returned ``("msg", [])`` tuples, which Gradio cannot map onto one
        output.

    Args:
        file: Gradio file object (has a ``.name`` path) or None.
        model_id: model selected in the dropdown.

    Returns:
        An HTML string (analysis cards) or a plain error message string.
    """
    if file is None:
        return "No file uploaded."

    if file.name.endswith('.csv'):
        try:
            df = pd.read_csv(file.name)
        except Exception as e:
            return f"Error reading CSV file: {str(e)}"
    elif file.name.endswith('.json'):
        try:
            # Try JSON-lines first (one record per line), then plain JSON.
            df = pd.read_json(file.name, lines=True)
        except ValueError:
            try:
                df = pd.read_json(file.name)
            except Exception as e:
                return f"Error reading JSON file: {str(e)}"
    else:
        return "Unsupported file type. Please upload a CSV or JSON file."

    if 'text' not in df.columns:
        return "The file must contain a 'text' column with reviews."

    senti_pipeline = get_senti_pipeline(model_id)
    review_analyses = extract_full_analysis(df, senti_pipeline)
    return format_review(review_analyses)
|
|
# --- Gradio UI -------------------------------------------------------------
# Two sections: (1) free-text single-review analysis and (2) CSV/JSON batch
# upload. The CSS below renders the aspect/sentiment lists produced by
# format_review() as 6-column grids (.cell-grid / .cell-item).
with gr.Blocks(css="""
.cell-grid {
    display: grid;
    grid-template-columns: repeat(6, 1fr);
    gap: 10px;
    width:100%;
    height:100%
}
.cell-item {
    padding: 10px;
    border: 1px solid #ccc;
    text-align: center;
}
""") as demo:

    gr.Markdown("# Yelp Review Demonstration for ATE Extracted")

    gr.Markdown("## Submit Your Review")
    with gr.Row():
        with gr.Column(scale=4):
            # Model picker shared by both the single-review and file flows.
            model_dropdown = gr.Dropdown(label="Select Model", choices=available_models, value=available_models[0])

        with gr.Column(scale=8):
            submit_text = gr.Textbox(label="Write your review", lines=2)
    submit_button = gr.Button("Submit")
    dynamic_display = gr.HTML(label="Submitted Review Analysis", padding=10, max_height=600)

    gr.Markdown("## Upload and Process Reviews from File")
    with gr.Row():
        with gr.Column(scale=8):
            # NOTE(review): the label promises a 10MB limit but nothing here
            # enforces it — confirm whether a size check is expected.
            uploaded_file = gr.File(label="Upload CSV or JSON file (max 10MB)", file_types=None, file_count="single",)
            process_button = gr.Button("Process File")
    with gr.Row():
        with gr.Column(scale=12):
            file_output = gr.HTML(label="Analysis of Uploaded Reviews",padding=10, max_height=600)

    # State component holding the last analysis dicts (second return value of
    # submit_new_review); not read back anywhere in this file.
    dynamic_reviews_state = gr.State()
    submit_button.click(submit_new_review,inputs=[submit_text, model_dropdown],outputs=[dynamic_display, dynamic_reviews_state])
    process_button.click(process_uploaded_file, inputs=[uploaded_file, model_dropdown], outputs=file_output)

# share=True exposes a temporary public Gradio link in addition to localhost.
if __name__ == "__main__":
    demo.launch(share=True)