|
|
from flask import Flask, request, jsonify |
|
|
from flask_cors import CORS |
|
|
import os |
|
|
from transformers import pipeline |
|
|
import numpy as np |
|
|
import torch |
|
|
import re |
|
|
from werkzeug.utils import secure_filename |
|
|
import uuid |
|
|
import platform |
|
|
|
|
|
|
|
|
# Choose a per-platform directory for the Hugging Face Transformers model cache.
if platform.system() == "Windows":
    print("Windows detected. Assigning cache directory to Transformers in AppData\\Local.")
    # LOCALAPPDATA can be unset (e.g. in a stripped-down service environment);
    # os.path.join(None, ...) would raise TypeError, so fall back to the
    # conventional per-user location under the home directory.
    transformers_cache_directory = os.path.join(
        os.getenv('LOCALAPPDATA')
        or os.path.join(os.path.expanduser('~'), 'AppData', 'Local'),
        'transformers_cache',
    )
else:
    print("Non-Windows system detected. Assigning cache directory to /tmp/transformers_cache.")
    transformers_cache_directory = '/tmp/transformers_cache'

# Create the directory up front. A failure is reported but deliberately not
# fatal: the environment variable below is still exported.
if not os.path.exists(transformers_cache_directory):
    try:
        os.makedirs(transformers_cache_directory, exist_ok=True)
        print(f"Directory '{transformers_cache_directory}' created successfully.")
    except OSError as e:
        print(f"Error creating directory '{transformers_cache_directory}': {e}")
else:
    print(f"Directory '{transformers_cache_directory}' already exists.")

# NOTE(review): this runs after `from transformers import pipeline` at the top
# of the file. If transformers resolves TRANSFORMERS_CACHE at import time the
# value set here may be ignored -- confirm, and consider exporting it before
# the import.
os.environ['TRANSFORMERS_CACHE'] = transformers_cache_directory
print(f"Environment variable TRANSFORMERS_CACHE set to '{transformers_cache_directory}'.")
|
|
|
|
|
|
|
|
class Config:
    """Flask settings; ``create_app`` loads them with ``app.config.from_object``."""

    # Header name for CORS handling (presumably consumed by flask-cors -- confirm).
    CORS_HEADERS = 'Content-Type'

    # Request body size limit: 16 MiB.
    MAX_CONTENT_LENGTH = 16 * 1024 ** 2

    # The second component is an absolute path, so on POSIX os.path.join
    # discards the module directory and this evaluates to '/tmp/uploads'.
    UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), '/tmp/uploads')
|
|
|
|
|
|
|
|
|
|
|
class DialogueSentimentAnalyzer:
    """Score the sentiment and engagement of a ``Speaker: text`` transcript.

    Two Hugging Face pipelines are loaded on construction: a text-classification
    pipeline for ``model_name`` (its top score is reported as the engagement
    score) and an SST-2 DistilBERT sentiment pipeline. The sentiment score is
    then nudged by simple keyword counts.
    """

    # Substrings (matched case-insensitively) that raise / lower the score.
    POSITIVE_PHRASES = (
        'thank you', 'thanks', 'appreciate', 'great', 'perfect',
        'looking forward', 'flexible', 'competitive',
    )
    NEGATIVE_PHRASES = (
        'concerned', 'worry', 'issue', 'problem', 'difficult',
        'unfortunately', 'sorry',
    )
    # Score adjustment applied per net positive/negative phrase.
    PHRASE_WEIGHT = 0.1

    # A line opens a new utterance when it has text followed by a colon.
    _SPEAKER_PATTERN = re.compile(r'^([^:]+):')

    def __init__(self, model_name: str = "microsoft/DialogRPT-updown"):
        """Load both pipelines, on GPU 0 when CUDA is available, else on CPU.

        Args:
            model_name: Model used for the dialogue (engagement) pipeline.
        """
        # Pipelines take a CUDA device index, or -1 to run on the CPU.
        self.device = 0 if torch.cuda.is_available() else -1
        self.dialogue_model = pipeline(
            'text-classification',
            model=model_name,  # previously hard-coded, ignoring the argument
            device=self.device,
        )
        self.sentiment_model = pipeline(
            'sentiment-analysis',
            model="distilbert-base-uncased-finetuned-sst-2-english",
            device=self.device,
        )
        # Token limit applied (via truncation) to every utterance.
        self.max_length = 512

    @staticmethod
    def _append_utterance(dialogue, speaker, fragments):
        """Append the pending utterance to *dialogue* if it has any text."""
        text = ' '.join(part for part in fragments if part)
        if speaker and text:
            dialogue.append({'speaker': speaker, 'text': text})

    def parse_dialogue(self, text: str):
        """Split a transcript into ``{'speaker', 'text'}`` utterances.

        A line of the form ``Name: words`` starts a new utterance; following
        lines without a ``Name:`` prefix are joined onto it with spaces. Blank
        lines and lines before the first speaker are ignored. Speaker names are
        stripped of surrounding whitespace and utterances with no text are
        dropped.

        Returns:
            A list of dicts in transcript order (empty if nothing parsed).
        """
        dialogue = []
        speaker = None
        fragments = []

        for raw_line in text.strip().split('\n'):
            line = raw_line.strip()
            if not line:
                continue

            match = self._SPEAKER_PATTERN.match(line)
            if match:
                # New speaker: flush whatever the previous one said.
                self._append_utterance(dialogue, speaker, fragments)
                speaker = match.group(1).strip()
                fragments = [line[match.end():].strip()]
            elif speaker is not None:
                fragments.append(line)

        self._append_utterance(dialogue, speaker, fragments)
        return dialogue

    def analyze_utterance(self, utterance):
        """Score one parsed utterance.

        Args:
            utterance: Dict with ``'speaker'`` and ``'text'`` keys.

        Returns:
            Dict with the speaker, the text, ``sentiment_score`` (positive-class
            probability adjusted by keyword counts, clamped to [0, 1]),
            ``engagement_score`` and the positive/negative phrase counts.
        """
        text = utterance['text']

        # Truncate so over-long utterances cannot exceed the models' input size.
        tokenizer_args = {'truncation': True, 'max_length': self.max_length}
        dialogue_score = self.dialogue_model(text, **tokenizer_args)[0]
        sentiment = self.sentiment_model(text, **tokenizer_args)[0]

        lowered = text.lower()
        positive_count = sum(1 for phrase in self.POSITIVE_PHRASES if phrase in lowered)
        negative_count = sum(1 for phrase in self.NEGATIVE_PHRASES if phrase in lowered)

        # The pipeline reports the confidence of the winning label; convert it
        # to "probability of positive".
        sentiment_score = float(sentiment['score'])
        if sentiment['label'] == 'NEGATIVE':
            sentiment_score = 1 - sentiment_score

        final_score = sentiment_score
        net_phrases = positive_count - negative_count
        if net_phrases:
            final_score = min(1.0, max(0.0, sentiment_score + self.PHRASE_WEIGHT * net_phrases))

        return {
            'speaker': utterance['speaker'],
            'text': text,
            'sentiment_score': final_score,
            'engagement_score': float(dialogue_score['score']),
            'positive_phrases': positive_count,
            'negative_phrases': negative_count,
        }

    def analyze_dialogue(self, text: str):
        """Analyze a whole transcript and summarize it as label/score pairs.

        Returns:
            A list of ``{'label', 'score'}`` dicts: overall sentiment,
            confidence (1 minus the standard deviation of the utterance
            sentiment scores, floored at 0), engagement, then one
            ``'<speaker> Sentiment'`` entry per speaker in first-seen order.

        Raises:
            ValueError: If no ``Speaker: text`` utterance can be parsed; the
                averages would otherwise be NaN.
        """
        dialogue = self.parse_dialogue(text)
        if not dialogue:
            raise ValueError("No 'Speaker: text' utterances found in transcript")

        utterance_results = [self.analyze_utterance(utterance) for utterance in dialogue]
        sentiment_scores = [result['sentiment_score'] for result in utterance_results]

        overall_sentiment = np.mean(sentiment_scores)
        overall_engagement = np.mean([result['engagement_score'] for result in utterance_results])
        sentiment_spread = np.std(sentiment_scores)
        confidence = max(0.0, 1.0 - sentiment_spread)

        speaker_sentiments = {}
        for result in utterance_results:
            speaker_sentiments.setdefault(result['speaker'], []).append(result['sentiment_score'])

        summary = [
            {'label': 'Overall Sentiment', 'score': float(overall_sentiment)},
            {'label': 'Confidence', 'score': float(confidence)},
            {'label': 'Engagement', 'score': float(overall_engagement)},
        ]
        summary.extend(
            {'label': f'{speaker} Sentiment', 'score': float(np.mean(scores))}
            for speaker, scores in speaker_sentiments.items()
        )
        return summary
|
|
|
|
|
|
|
|
def save_uploaded_file(content, upload_folder):
    """Write *content* to a new, uniquely named ``.txt`` file.

    The file name is a random UUID4 hex string with a ``.txt`` suffix, passed
    through ``secure_filename``; the file is written as UTF-8 text inside
    *upload_folder*.

    Returns:
        The path of the file that was written.
    """
    unique_name = secure_filename(f"{uuid.uuid4().hex}.txt")
    destination = os.path.join(upload_folder, unique_name)
    with open(destination, 'w', encoding='utf-8') as handle:
        handle.write(content)
    return destination
|
|
|
|
|
|
|
|
# Process-wide analyzer, built on first use so the models are loaded once
# and not again on every call.
_analyzer = None


def _get_analyzer():
    """Return the shared DialogueSentimentAnalyzer, creating it on first call."""
    # NOTE(review): no locking -- two concurrent first calls may each build an
    # analyzer (wasteful, last one wins). Confirm whether that matters here.
    global _analyzer
    if _analyzer is None:
        _analyzer = DialogueSentimentAnalyzer()
    return _analyzer


def analyze_sentiment(file_path: str):
    """Run dialogue sentiment analysis on the UTF-8 transcript at *file_path*.

    Returns:
        The list of ``{'label', 'score'}`` dicts from
        ``DialogueSentimentAnalyzer.analyze_dialogue``. On any failure the
        error is printed and ``[{'label': 'Error', 'score': 0.5}]`` is
        returned; this function never raises.
    """
    try:
        # Read first so an unreadable file fails before any model is loaded.
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
        return _get_analyzer().analyze_dialogue(text)
    except Exception as e:
        print(f"Error in sentiment analysis: {str(e)}")
        return [{'label': 'Error', 'score': 0.5}]
|
|
|
|
|
|
|
|
|
|
|
def create_app():
    """Build the Flask application and register the ``/upload`` endpoint.

    Settings come from ``Config``; the upload folder is created if missing.

    Returns:
        The configured Flask app.
    """
    app = Flask(__name__)
    app.config.from_object(Config)

    # NOTE(review): flask_cors.CORS is imported by this module and Config sets
    # CORS_HEADERS, but CORS is never applied to the app -- confirm whether
    # CORS(app) was intended.

    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

    @app.route('/upload', methods=['POST'])
    def upload_transcript():
        """Analyze the ``transcript`` form field and return the scores as JSON.

        Responds 400 when the field is missing or blank, 500 with the error
        text on an unexpected failure, otherwise 200 with
        ``{'sentiment': [...]}``.
        """
        try:
            transcript = request.form.get('transcript')
            if not transcript or not transcript.strip():
                return jsonify({'error': 'No transcript received'}), 400

            # A unique UTF-8 file per request: concurrent uploads no longer
            # overwrite one shared 'transcript.txt', and the encoding matches
            # what analyze_sentiment reads.
            file_path = save_uploaded_file(transcript, app.config['UPLOAD_FOLDER'])
            try:
                sentiment_result = analyze_sentiment(file_path)
            finally:
                # Remove the temporary file even if analysis fails.
                os.remove(file_path)

            return jsonify({'sentiment': sentiment_result}), 200
        except Exception as e:
            return jsonify({'error': str(e)}), 500

    return app
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Run directly as a script: build the app and serve it with Flask's
    # built-in server on every network interface, port 5000.
    app = create_app()
    app.run(port=5000, host="0.0.0.0")
|
|
|