Spaces:
Runtime error
Runtime error
| import base64 | |
| import io | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from transformers import BertModel, BertTokenizer, BertConfig | |
| from werkzeug.utils import secure_filename | |
| import os | |
| os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE' | |
| import pandas as pd | |
| from openpyxl import load_workbook | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import matplotlib | |
| matplotlib.use('Agg') | |
| import plotly.express as px | |
| # Load the model | |
| from huggingface_hub import hf_hub_download | |
| app = Flask(__name__) | |
| CORS(app) # Enable CORS for all routes | |
| # Define class_names and device if not already defined | |
| class_names = ['Negative', 'Neutral', 'Positive'] | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| # Create a modified BERT model with the correct vocabulary size | |
| class ModifiedBertForSentiment(nn.Module): | |
| def __init__(self, config, n_classes): | |
| super(ModifiedBertForSentiment, self).__init__() | |
| self.bert = BertModel(config) | |
| self.drop = nn.Dropout(p=0.3) | |
| self.out = nn.Linear(config.hidden_size, n_classes) | |
| def forward(self, input_ids, attention_mask): | |
| outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask) | |
| pooled_output = outputs.last_hidden_state.mean(dim=1) | |
| output = self.drop(pooled_output) | |
| return self.out(output) | |
| # Load the model | |
| tokenizer = BertTokenizer.from_pretrained('nlptown/bert-base-multilingual-uncased-sentiment') | |
| config = BertConfig.from_pretrained('nlptown/bert-base-multilingual-uncased-sentiment') | |
| model = ModifiedBertForSentiment(config, len(class_names)) | |
| # Download model from Hugging Face if not exists locally | |
| model_filename = 'roman_Sentiment.pth' | |
| if not os.path.exists(model_filename): | |
| print("Downloading model from Hugging Face...") | |
| model_filename = hf_hub_download( | |
| repo_id="makbar023/roman-sentiment-model", | |
| filename="roman_Sentiment.pth" | |
| ) | |
| print(f"Model downloaded to: {model_filename}") | |
| else: | |
| print("Using local model file") | |
| model.load_state_dict(torch.load(model_filename, map_location=device)) | |
| model.to(device) | |
| model.eval() | |
| # Helper function to tokenize text | |
| def tokenize_text(text): | |
| inputs = tokenizer(text, padding=True, truncation=True, return_tensors='pt', max_length=512) | |
| return inputs['input_ids'], inputs['attention_mask'] | |
| # Sentiment analysis function | |
| def predict_single_sentence_sentiment(review_text): | |
| input_ids, attention_mask = tokenize_text(review_text) | |
| input_ids = input_ids.to(device) | |
| attention_mask = attention_mask.to(device) | |
| with torch.no_grad(): | |
| outputs = model(input_ids=input_ids, attention_mask=attention_mask) | |
| _, preds = torch.max(outputs, dim=1) | |
| probs = F.softmax(outputs, dim=1) | |
| sentiment = class_names[preds.item()] | |
| return sentiment, probs | |
| def analyze_sentiment_route(): | |
| try: | |
| data = request.get_json() | |
| review = data['review'] | |
| sentiment, _ = predict_single_sentence_sentiment(review) | |
| return jsonify(sentiment) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}) | |
| # Sentiment analysis function | |
| def predict_sentiment(review_text): | |
| input_ids, attention_mask = tokenize_text(review_text) | |
| input_ids = input_ids.to(device) | |
| attention_mask = attention_mask.to(device) | |
| with torch.no_grad(): | |
| outputs = model(input_ids=input_ids, attention_mask=attention_mask) | |
| _, preds = torch.max(outputs, dim=1) | |
| probs = F.softmax(outputs, dim=1) | |
| sentiment = class_names[preds.item()] | |
| return sentiment, probs | |
| def analyze_multi_sentences_route(): | |
| try: | |
| data = request.get_json() | |
| sentences = data['sentences'] | |
| results = [] | |
| for sentence in sentences: | |
| sentiment, probabilities = predict_sentiment(sentence) | |
| result = { | |
| 'sentence': sentence, | |
| 'sentiment': sentiment, | |
| 'probabilities': {class_names[i]: float(probabilities[0][i]) for i in range(len(class_names))} | |
| } | |
| results.append(result) | |
| return jsonify(results) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}) | |
| # Define your prediction data (you should replace this with actual data) | |
| prediction_data = ['Negative', 'Neutral', 'Positive'] | |
| def analyze_sentiment_file(): | |
| if 'file' not in request.files: | |
| return jsonify({'error': 'No file part'}) | |
| file = request.files['file'] | |
| if file.filename == '': | |
| return jsonify({'error': 'No selected file'}) | |
| if file: | |
| filename = secure_filename(file.filename) | |
| file.save(filename) | |
| data = None | |
| # Handle different file formats | |
| if filename.endswith('.csv'): | |
| data = pd.read_csv(filename) | |
| elif filename.endswith('.xlsx'): | |
| wb = load_workbook(filename) | |
| sheet = wb.active | |
| data = pd.DataFrame(sheet.values) | |
| elif filename.endswith('.txt'): | |
| # Read text content from a .txt file | |
| with open(filename, 'r') as txt_file: | |
| data = [line.strip() for line in txt_file] | |
| line_count = len(data) | |
| sentiments = [] | |
| sentiment_counts = {'Negative': 0, 'Neutral': 0, 'Positive': 0} | |
| reviews = [] | |
| for text_data in data: | |
| sentiment, _ = predict_single_sentence_sentiment(text_data) | |
| sentiments.append(sentiment) | |
| sentiment_counts[sentiment] += 1 | |
| reviews.append(text_data) | |
| # Create a pie chart | |
| fig = px.pie( | |
| names=class_names, | |
| values=[sentiment_counts['Negative'], sentiment_counts['Neutral'], sentiment_counts['Positive']], | |
| title='Sentiment Distribution' | |
| ) | |
| fig.write_image("pie_chart.png", width=800, height=400) | |
| with open("pie_chart.png", "rb") as image_file: | |
| encoded_image = base64.b64encode(image_file.read()).decode('utf-8') | |
| os.remove(filename) | |
| os.remove("pie_chart.png") | |
| return jsonify({ | |
| 'line_count': line_count, | |
| 'sentiment_counts': sentiment_counts, | |
| 'sentiments': sentiments, | |
| 'reviews': reviews, | |
| 'pie_chart_path': encoded_image | |
| }) | |
| def health_check(): | |
| return jsonify({'status': 'healthy', 'message': 'SentimentSense API is running'}) | |
| if __name__ == '__main__': | |
| if not os.path.exists('uploads'): | |
| os.makedirs('uploads') | |
| port = int(os.environ.get("PORT", 5000)) | |
| app.run(host="0.0.0.0", port=port) | |