import streamlit as st import google.generativeai as genai from io import BytesIO import json import matplotlib.pyplot as plt import re import nltk from nltk.corpus import stopwords from collections import Counter from wordcloud import WordCloud from fpdf import FPDF import langdetect from datetime import datetime # Configure API Key securely from Streamlit's secrets genai.configure(api_key=st.secrets["GOOGLE_API_KEY"]) # App Configuration st.set_page_config(page_title="Escalytics", page_icon="📧", layout="wide") st.title("⚡Escalytics by EverTech") st.write("Extract insights, root causes, and actionable steps from emails.") # Sidebar for Features st.sidebar.header("Settings") features = { "sentiment": st.sidebar.checkbox("Perform Sentiment Analysis"), "highlights": st.sidebar.checkbox("Highlight Key Phrases"), "response": st.sidebar.checkbox("Generate Suggested Response"), "wordcloud": st.sidebar.checkbox("Generate Word Cloud"), "grammar_check": st.sidebar.checkbox("Grammar Check"), "key_phrases": st.sidebar.checkbox("Extract Key Phrases"), "actionable_items": st.sidebar.checkbox("Extract Actionable Items"), "root_cause": st.sidebar.checkbox("Root Cause Detection"), "risk_assessment": st.sidebar.checkbox("Risk Assessment"), "severity_detection": st.sidebar.checkbox("Severity Detection"), "critical_keywords": st.sidebar.checkbox("Critical Keyword Identification"), "escalation_trigger": st.sidebar.checkbox("Escalation Trigger Detection"), "culprit_identification": st.sidebar.checkbox("Culprit Identification"), "email_summary": st.sidebar.checkbox("Email Summary"), "language_detection": st.sidebar.checkbox("Language Detection"), "entity_recognition": st.sidebar.checkbox("Entity Recognition"), "response_time_analysis": st.sidebar.checkbox("Response Time Analysis"), "attachment_analysis": st.sidebar.checkbox("Attachment Analysis"), "customer_tone_analysis": st.sidebar.checkbox("Customer Tone Analysis"), "department_identification": st.sidebar.checkbox("Department Identification"), "priority_identification": st.sidebar.checkbox("Priority Identification"), "urgency_assessment": st.sidebar.checkbox("Urgency Assessment"), "action_item_priority": st.sidebar.checkbox("Action Item Priority"), "deadline_detection": st.sidebar.checkbox("Deadline Detection"), "email_chain_analysis": st.sidebar.checkbox("Email Chain Analysis"), "executive_summary": st.sidebar.checkbox("Executive Summary"), "actionable_resolution": st.sidebar.checkbox("Actionable Resolution Detection"), "response_completeness": st.sidebar.checkbox("Response Completeness"), "agreement_identification": st.sidebar.checkbox("Agreement Identification"), "feedback_analysis": st.sidebar.checkbox("Feedback Analysis"), "threat_detection": st.sidebar.checkbox("Threat Detection"), "response_quality_assessment": st.sidebar.checkbox("Response Quality Assessment"), } # Input Email Section email_content = st.text_area("Paste your email content here:", height=200) MAX_EMAIL_LENGTH = 1000 # Cache the AI responses to improve performance @st.cache_data(ttl=3600) def get_ai_response(prompt, email_content): try: model = genai.GenerativeModel("gemini-1.5-flash") response = model.generate_content(prompt + email_content[:MAX_EMAIL_LENGTH]) return response.text.strip() except Exception as e: st.error(f"Error: {e}") return "" # Sentiment Analysis def get_sentiment(email_content): positive_keywords = ["happy", "good", "great", "excellent", "love"] negative_keywords = ["sad", "bad", "hate", "angry", "disappointed"] sentiment_score = 0 for word in email_content.split(): if word.lower() in positive_keywords: sentiment_score += 1 elif word.lower() in negative_keywords: sentiment_score -= 1 return sentiment_score # Grammar Check (basic spelling correction) def grammar_check(text): corrections = { "recieve": "receive", "adress": "address", "teh": "the", "occured": "occurred" } for word, correct in corrections.items(): text = text.replace(word, correct) return text # Key Phrase Extraction def extract_key_phrases(text): key_phrases = re.findall(r"\b[A-Za-z]{4,}\b", text) return list(set(key_phrases)) # Remove duplicates # Word Cloud Generation def generate_wordcloud(text): wordcloud = WordCloud(width=800, height=400, background_color='white').generate(text) return wordcloud # Export to PDF def export_pdf(text): pdf = FPDF() pdf.add_page() pdf.set_font("Arial", size=12) pdf.multi_cell(0, 10, text) return pdf.output(dest='S').encode('latin1') # Actionable Items Extraction def extract_actionable_items(text): actions = [line for line in text.split("\n") if "to" in line.lower() or "action" in line.lower()] return actions # Root Cause Detection def detect_root_cause(text): if "lack of communication" in text.lower(): return "Root Cause: Lack of communication between teams." elif "delayed response" in text.lower(): return "Root Cause: Delayed response from the team." return "Root Cause: Unknown" # Risk Assessment def assess_risk(text): if "urgent" in text.lower(): return "Risk Assessment: High risk due to urgency of the issue." return "Risk Assessment: Normal risk." # Severity Detection def detect_severity(text): if "urgent" in text.lower(): return "Severity: High" return "Severity: Normal" # Critical Keyword Identification def identify_critical_keywords(text): critical_keywords = ["urgent", "problem", "issue", "failure"] critical_terms = [word for word in text.split() if word.lower() in critical_keywords] return critical_terms # Escalation Trigger Detection def detect_escalation_trigger(text): if "escalate" in text.lower() or "critical" in text.lower(): return "Escalation Trigger: Immediate escalation required." return "Escalation Trigger: No immediate escalation needed." # Culprit Identification def identify_culprit(text): if "manager" in text.lower(): return "Culprit: The manager might be responsible." elif "team" in text.lower(): return "Culprit: The team might be responsible." return "Culprit: Unknown" # Language Detection def detect_language(text): try: lang = langdetect.detect(text) return lang except Exception as e: return "Unknown" # Entity Recognition (dummy for illustration) def entity_recognition(text): entities = ["Email", "Action", "Team", "Manager"] return entities # Response Time Analysis def response_time_analysis(text): if "responded" in text.lower(): return "Response Time Analysis: Response time is within acceptable range." return "Response Time Analysis: Response time is not clear." # Attachment Analysis (Dummy) def attachment_analysis(text): if "attached" in text.lower(): return "Attachment Analysis: The email contains an attachment." return "Attachment Analysis: No attachment found." # Customer Tone Analysis def customer_tone_analysis(text): positive_tone_keywords = ["thank you", "appreciate", "grateful"] negative_tone_keywords = ["disappointed", "frustrated", "unhappy"] tone = "Neutral" if any(word in text.lower() for word in positive_tone_keywords): tone = "Positive" elif any(word in text.lower() for word in negative_tone_keywords): tone = "Negative" return f"Customer Tone Analysis: {tone}" # Department Identification (Dummy) def department_identification(text): if "sales" in text.lower(): return "Department Identification: Sales" elif "support" in text.lower(): return "Department Identification: Support" return "Department Identification: Unknown" # Priority Identification def identify_priority(text): if "high priority" in text.lower(): return "Priority: High" elif "low priority" in text.lower(): return "Priority: Low" return "Priority: Normal" # Urgency Assessment def assess_urgency(text): if "urgent" in text.lower(): return "Urgency Assessment: High urgency." return "Urgency Assessment: Normal urgency." # Action Item Priority def action_item_priority(text): if "urgent" in text.lower() or "immediate" in text.lower(): return "Action Item Priority: High" return "Action Item Priority: Normal" # Deadline Detection def detect_deadline(text): deadlines = ["due", "deadline", "by"] if any(word in text.lower() for word in deadlines): return "Deadline Detection: Contains a deadline." return "Deadline Detection: No deadline mentioned." # Email Chain Analysis def email_chain_analysis(text): if "forwarded" in text.lower() or "re:" in text.lower(): return "Email Chain Analysis: This is part of an email chain." return "Email Chain Analysis: This email is standalone." # Executive Summary def executive_summary(text): return f"Executive Summary: {text[:200]}..." # Actionable Resolution Detection def actionable_resolution(text): if "resolve" in text.lower() or "solution" in text.lower(): return "Actionable Resolution: The email includes a resolution or solution." return "Actionable Resolution: No actionable resolution found." # Response Completeness def response_completeness(text): if "thank you" in text.lower() and "best regards" in text.lower(): return "Response Completeness: Response is complete." return "Response Completeness: Response is incomplete." # Agreement Identification def agreement_identification(text): if "agree" in text.lower(): return "Agreement Identification: The email includes an agreement." return "Agreement Identification: No agreement found." # Feedback Analysis def feedback_analysis(text): if "feedback" in text.lower(): return "Feedback Analysis: Feedback present." return "Feedback Analysis: No feedback found." # Threat Detection def threat_detection(text): threat_keywords = ["threat", "warning", "danger"] if any(word in text.lower() for word in threat_keywords): return "Threat Detection: Threat detected." return "Threat Detection: No threat detected." # Response Quality Assessment def response_quality_assessment(text): if len(text.split()) < 20: return "Response Quality: Incomplete response." return "Response Quality: Complete response." # Visualizing Word Frequency def visualize_word_frequency(word_counts): plt.figure(figsize=(10, 5)) plt.bar(word_counts.keys(), word_counts.values()) plt.xticks(rotation=45) plt.title("Word Frequency") plt.tight_layout() # Exporting Insights as JSON, Text, and PDF def export_insights(text_data, summary): pdf_buffer = BytesIO(export_pdf(text_data)) buffer_txt = BytesIO(text_data.encode("utf-8")) buffer_json = BytesIO(json.dumps(summary, indent=4).encode("utf-8")) st.download_button("Download as Text", data=buffer_txt, file_name="analysis.txt", mime="text/plain") st.download_button("Download as PDF", data=pdf_buffer, file_name="analysis.pdf", mime="application/pdf") st.download_button("Download as JSON", data=buffer_json, file_name="analysis.json", mime="application/json") # File upload for email content uploaded_file = st.file_uploader("Upload Email File", type=["txt", "eml"]) if uploaded_file is not None: email_content = uploaded_file.read().decode("utf-8") # Layout for displaying results if email_content and st.button("Generate Insights"): try: # Generate AI-like responses (using google.generativeai for content generation) summary = get_ai_response("Summarize the email in a concise, actionable format:\n\n", email_content) response = get_ai_response("Draft a professional response to this email:\n\n", email_content) if features["response"] else "" highlights = get_ai_response("Highlight key points and actions in this email:\n\n", email_content) if features["highlights"] else "" # Sentiment Analysis sentiment = get_sentiment(email_content) sentiment_label = "Positive" if sentiment > 0 else "Negative" if sentiment < 0 else "Neutral" # Generate Word Cloud wordcloud = generate_wordcloud(email_content) wordcloud_fig = plt.figure(figsize=(10, 5)) plt.imshow(wordcloud, interpolation="bilinear") plt.axis("off") plt.show() # Display Results st.subheader("AI Summary") st.write(summary) if features["response"]: st.subheader("Suggested Response") st.write(response) if features["highlights"]: st.subheader("Key Highlights") st.write(highlights) st.subheader("Sentiment Analysis") st.write(f"**Sentiment:** {sentiment_label} (Score: {sentiment})") if features["grammar_check"]: corrected_text = grammar_check(email_content) st.subheader("Grammar Check") st.write("Corrected Text:") st.write(corrected_text) if features["key_phrases"]: key_phrases = extract_key_phrases(email_content) st.subheader("Key Phrases Extracted") st.write(key_phrases) if features["wordcloud"]: st.subheader("Word Cloud") st.pyplot(wordcloud_fig) if features["actionable_items"]: actionable_items = extract_actionable_items(email_content) st.subheader("Actionable Items") st.write(actionable_items) # RCA and Insights Features if features["root_cause"]: root_cause = detect_root_cause(email_content) st.subheader("Root Cause Detection") st.write(root_cause) if features["risk_assessment"]: risk = assess_risk(email_content) st.subheader("Risk Assessment") st.write(risk) if features["severity_detection"]: severity = detect_severity(email_content) st.subheader("Severity Detection") st.write(severity) if features["critical_keywords"]: critical_terms = identify_critical_keywords(email_content) st.subheader("Critical Keywords Identified") st.write(critical_terms) if features["escalation_trigger"]: escalation_trigger = detect_escalation_trigger(email_content) st.subheader("Escalation Trigger") st.write(escalation_trigger) if features["culprit_identification"]: culprit = identify_culprit(email_content) st.subheader("Culprit Identification") st.write(culprit) if features["response_time_analysis"]: response_time = response_time_analysis(email_content) st.subheader("Response Time Analysis") st.write(response_time) if features["attachment_analysis"]: attachment = attachment_analysis(email_content) st.subheader("Attachment Analysis") st.write(attachment) if features["customer_tone_analysis"]: tone = customer_tone_analysis(email_content) st.subheader("Customer Tone Analysis") st.write(tone) if features["department_identification"]: department = department_identification(email_content) st.subheader("Department Identification") st.write(department) if features["priority_identification"]: priority = identify_priority(email_content) st.subheader("Priority Identification") st.write(priority) if features["urgency_assessment"]: urgency = assess_urgency(email_content) st.subheader("Urgency Assessment") st.write(urgency) if features["action_item_priority"]: item_priority = action_item_priority(email_content) st.subheader("Action Item Priority") st.write(item_priority) if features["deadline_detection"]: deadline = detect_deadline(email_content) st.subheader("Deadline Detection") st.write(deadline) if features["email_chain_analysis"]: email_chain = email_chain_analysis(email_content) st.subheader("Email Chain Analysis") st.write(email_chain) if features["executive_summary"]: executive = executive_summary(email_content) st.subheader("Executive Summary") st.write(executive) if features["actionable_resolution"]: resolution = actionable_resolution(email_content) st.subheader("Actionable Resolution Detection") st.write(resolution) if features["response_completeness"]: completeness = response_completeness(email_content) st.subheader("Response Completeness") st.write(completeness) if features["agreement_identification"]: agreement = agreement_identification(email_content) st.subheader("Agreement Identification") st.write(agreement) if features["feedback_analysis"]: feedback = feedback_analysis(email_content) st.subheader("Feedback Analysis") st.write(feedback) if features["threat_detection"]: threat = threat_detection(email_content) st.subheader("Threat Detection") st.write(threat) if features["response_quality_assessment"]: quality = response_quality_assessment(email_content) st.subheader("Response Quality Assessment") st.write(quality) # Export options export_content = f"Summary:\n{summary}\n\nResponse:\n{response}\n\nHighlights:\n{highlights}\n\nSentiment Analysis: {sentiment_label} (Score: {sentiment})" export_insights(export_content, {"summary": summary, "response": response, "highlights": highlights, "sentiment": sentiment_label}) except Exception as e: st.error(f"Error: {e}") else: st.info("Paste email content or upload an email file and click 'Generate Insights' to start.")