import os
import io
import re
import sys
import uuid
import math
import traceback
from datetime import datetime

import numpy as np
import pandas as pd

# Headless matplotlib: the backend must be selected before pyplot is imported.
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

import gradio as gr

# ------------------ NLP / Modeling ------------------
import nltk
from nltk.corpus import stopwords
from nltk.sentiment import SentimentIntensityAnalyzer

# Transformers sentiment (optional: advanced)
from transformers import pipeline

# Time-series & stats
import ruptures as rpt

# PDF reporting
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.lib.units import cm
from reportlab.lib.utils import ImageReader


# ------------------ NLTK bootstrap ------------------
def _ensure_nltk():
    """Download the NLTK resources this module relies on if they are missing.

    Each resource is looked up with ``nltk.data.find``; a ``LookupError``
    triggers a quiet download of the matching package.
    """
    required = (
        ("tokenizers/punkt", "punkt"),
        ("corpora/stopwords", "stopwords"),
        ("sentiment/vader_lexicon.zip", "vader_lexicon"),
    )
    for resource_path, package in required:
        try:
            nltk.data.find(resource_path)
        except LookupError:
            nltk.download(package, quiet=True)


_ensure_nltk()

# Fall back to an empty stop-word set so tokenisation still works when the
# corpus cannot be loaded.
try:
    EN_STOPWORDS = set(stopwords.words("english"))
except Exception:
    EN_STOPWORDS = set()


def init_vader():
    """Return a ``SentimentIntensityAnalyzer``, downloading its lexicon on failure.

    If the first construction attempt raises, the VADER lexicon is downloaded
    and construction is retried once; a second failure propagates.
    """
    try:
        analyzer = SentimentIntensityAnalyzer()
    except Exception:
        nltk.download("vader_lexicon", quiet=True)
        analyzer = SentimentIntensityAnalyzer()
    return analyzer


VADER = init_vader()

# ------------------ Transformers init (lazy) ------------------
_cached_pipe = None


def get_roberta_pipeline():
    """Return the shared sentiment pipeline, building it on first use.

    The ``-latest`` CardiffNLP checkpoint is tried first; if creating that
    pipeline raises, the older checkpoint is loaded instead. The result is
    stored in the module-level ``_cached_pipe`` and reused on later calls.
    """
    global _cached_pipe
    if _cached_pipe is not None:
        return _cached_pipe

    def _load(checkpoint):
        # Model and tokenizer always come from the same checkpoint.
        return pipeline(
            "sentiment-analysis",
            model=checkpoint,
            tokenizer=checkpoint,
            truncation=True,
        )

    try:
        _cached_pipe = _load("cardiffnlp/twitter-roberta-base-sentiment-latest")
    except Exception:
        _cached_pipe = _load("cardiffnlp/twitter-roberta-base-sentiment")
    return _cached_pipe


# ------------------ Helpers ------------------
TOKEN_PATTERN = re.compile(r"[A-Za-z']+")
URL_RE = re.compile(r"https?://\S+")
_HASHTAG_RE = re.compile(r"#\w+")
_MENTION_RE = re.compile(r"@\w+")

# Label names emitted by the older "twitter-roberta-base-sentiment" checkpoint
# (per its model card: 0 = negative, 1 = neutral, 2 = positive).
_GENERIC_LABEL_SIGN = {"LABEL_0": -1.0, "LABEL_1": 0.0, "LABEL_2": 1.0}


def tokenize(text: str):
    """Split *text* into lowercase word tokens.

    URLs are removed first. Tokens are runs matched by ``TOKEN_PATTERN``;
    stop words and single-character tokens are discarded. Non-string input
    is treated as ``""`` when it is NA and converted with ``str`` otherwise.
    """
    if not isinstance(text, str):
        text = "" if pd.isna(text) else str(text)
    text = URL_RE.sub("", text)
    lowered = (tok.lower() for tok in TOKEN_PATTERN.findall(text))
    return [tok for tok in lowered if len(tok) > 1 and tok not in EN_STOPWORDS]


def read_csv_safe(path):
    """Read a header-less CSV, retrying with several encodings.

    The pandas default is tried first, then ``utf-8``, ``utf-8-sig`` and
    ``latin-1``. The error from the last failed attempt is re-raised when
    every attempt fails.
    """
    last_err = None
    for enc in (None, "utf-8", "utf-8-sig", "latin-1"):
        kwargs = {} if enc is None else {"encoding": enc}
        try:
            return pd.read_csv(path, header=None, **kwargs)
        except Exception as e:  # any failure moves on to the next encoding
            last_err = e
    raise last_err


def coerce_sentiment140(df):
    """Name the first six columns using the Sentiment140 layout.

    Frames with fewer than six columns are returned unchanged. Otherwise a
    copy of the first six columns is returned, so the caller's frame is not
    relabelled as a side effect.
    """
    if df.shape[1] >= 6:
        df = df.iloc[:, :6].copy()
        df.columns = ["target", "ids", "date", "flag", "user", "text"]
    return df


def vader_score(text):
    """Return the VADER compound score of *text* (non-strings score as ``""``)."""
    vs = VADER.polarity_scores(text if isinstance(text, str) else "")
    return vs["compound"]


def classify_label(score, pos_thr=0.05, neg_thr=-0.05):
    """Map a numeric score to ``"Positive"``, ``"Negative"`` or ``"Neutral"``."""
    if score >= pos_thr:
        return "Positive"
    if score <= neg_thr:
        return "Negative"
    return "Neutral"


def aggregate_ts(df, date_col, score_col, freq="D", ma_window=7, ci=True):
    """Resample scores over time and add a moving average (and optional CI).

    Args:
        df: Frame containing *date_col* and *score_col*.
        date_col: Column parsed with ``pd.to_datetime``; unparsable rows are
            dropped.
        score_col: Numeric column to average per period.
        freq: Pandas resampling frequency.
        ma_window: Moving-average window in periods. ``None``, 0 and 1 all
            mean "no smoothing".
        ci: When true, add ``ci_low`` / ``ci_high`` columns at +/- 1.96
            standard errors around ``ma``.

    Returns:
        A frame indexed by period with *score_col* (period mean), ``ma`` and,
        if requested, the two CI columns.
    """
    s = df[[date_col, score_col]].dropna().copy()
    s[date_col] = pd.to_datetime(s[date_col], errors="coerce")
    s = s.dropna(subset=[date_col]).set_index(date_col).sort_index()

    # Normalise the window once so the rolling calls below never receive
    # None/0 (which pandas rejects).
    window = int(ma_window) if ma_window and ma_window > 1 else 1

    agg = s.resample(freq).mean()
    if window > 1:
        agg["ma"] = agg[score_col].rolling(window, min_periods=1).mean()
    else:
        agg["ma"] = agg[score_col]

    if ci:
        counts = s[score_col].resample(freq).count()
        if window > 1:
            std = agg[score_col].rolling(window, min_periods=2).std(ddof=1)
            n = counts.rolling(window, min_periods=1).sum()
        else:
            # With no smoothing a rolling std with min_periods=2 is invalid;
            # use the spread of the raw scores inside each period instead.
            std = s[score_col].resample(freq).std(ddof=1)
            n = counts
        se = std / np.sqrt(np.maximum(n, 1))
        agg["ci_low"] = agg["ma"] - 1.96 * se
        agg["ci_high"] = agg["ma"] + 1.96 * se
    return agg


def rolling_z_anomalies(series, window=14, z=2.5):
    """Flag points whose rolling z-score magnitude is at least *z*.

    Returns a boolean NumPy array aligned positionally with *series*. Series
    shorter than ``max(5, window)`` yield an all-False array. Positions with
    an undefined z-score (too few observations, zero spread) are not flagged.
    """
    x = pd.Series(series.values.astype(float))
    if len(x) < max(5, window):
        return np.zeros(len(x), dtype=bool)
    # min_periods may not exceed the window, otherwise pandas raises.
    rolling = x.rolling(window, min_periods=min(5, window))
    roll_std = rolling.std(ddof=1).replace(0, np.nan)
    zscores = (x - rolling.mean()) / roll_std
    return (zscores.abs() >= z).fillna(False).values


def changepoints(series, penalty=6):
    """Return index labels where PELT (rbf cost) detects a change.

    NaNs are dropped before fitting, and the detected breakpoints are mapped
    back through the index of that cleaned series so labels stay aligned with
    the values the algorithm actually saw. Fewer than 10 usable points, or a
    failure during prediction, yields an empty list.
    """
    clean = series.dropna()
    x = clean.values.astype(float)
    if len(x) < 10:
        return []
    algo = rpt.Pelt(model="rbf").fit(x)
    try:
        result = algo.predict(pen=penalty)
    except Exception:
        return []
    last = len(clean) - 1
    # The final breakpoint is the end of the signal, not a change.
    return [clean.index[min(last, i - 1)] for i in result[:-1]]


def _save_fig(fig, name):
    """Save *fig* as a uniquely named PNG under ``charts/`` and return its path.

    The figure is closed even when saving fails so it does not accumulate in
    pyplot's figure registry.
    """
    os.makedirs("charts", exist_ok=True)
    path = os.path.join("charts", f"{name}_{uuid.uuid4().hex}.png")
    try:
        fig.savefig(path, format="png", dpi=150, bbox_inches="tight")
    finally:
        plt.close(fig)
    return path


def plot_trend(agg, title="Sentiment Trend", show_ci=True, anomalies=None, cps=None):
    """Plot the moving average, period mean, CI band, anomalies and change-points.

    Args:
        agg: Frame whose first column is the period mean and which has an
            ``ma`` column (plus ``ci_low`` / ``ci_high`` for the band).
        title: Chart title.
        show_ci: Draw the band when both CI columns are present.
        anomalies: Optional positional boolean mask over ``agg`` rows.
        cps: Optional iterable of x positions for vertical change-point lines.

    Returns:
        Path of the saved PNG.
    """
    fig, ax = plt.subplots()
    ax.plot(agg.index, agg["ma"], label="Moving Avg")
    ax.plot(agg.index, agg.iloc[:, 0], alpha=0.3, label="Mean")
    if show_ci and "ci_low" in agg and "ci_high" in agg:
        ax.fill_between(agg.index, agg["ci_low"], agg["ci_high"], alpha=0.2, label="95% CI")
    if anomalies is not None:
        mask = np.asarray(anomalies, dtype=bool)
        if mask.any():
            ax.scatter(agg.index[mask], agg["ma"].values[mask], marker="x", s=40, label="Anomaly")
    for pos, cp in enumerate(cps or ()):
        # Only the first line carries the label so the legend shows a single
        # "Change-point" entry instead of one per line.
        ax.axvline(cp, linestyle="--", alpha=0.6,
                   label="Change-point" if pos == 0 else "_nolegend_")
    ax.set_title(title)
    ax.set_ylabel("Sentiment (−1 to 1)")
    ax.set_xlabel("Date")
    ax.legend(loc="best")
    fig.autofmt_xdate()
    return _save_fig(fig, "trend")


def plot_pie(series, title="Sentiment Distribution"):
    """Draw a pie chart of the value counts of *series* and return the PNG path."""
    counts = series.value_counts()
    fig, ax = plt.subplots()
    ax.pie(counts.values, labels=counts.index, autopct="%1.1f%%", startangle=90)
    ax.set_title(title)
    return _save_fig(fig, "pie")


def top_terms(df_text, top_k=20):
    """Count the most common tokens, hashtags and mentions in *df_text*.

    Non-string items are skipped. Returns three ``(term, count)`` lists, each
    limited to *top_k* entries: tokens, hashtags, mentions.
    """
    from collections import Counter

    tokens, hashtags, mentions = Counter(), Counter(), Counter()
    for t in df_text:
        if not isinstance(t, str):
            continue
        hashtags.update(h.lower() for h in _HASHTAG_RE.findall(t))
        mentions.update(m.lower() for m in _MENTION_RE.findall(t))
        tokens.update(tokenize(t))
    return (
        tokens.most_common(top_k),
        hashtags.most_common(top_k),
        mentions.most_common(top_k),
    )


def ngram_top(df_text, n=2, top_k=15):
    """Return the *top_k* most common word n-grams across *df_text*.

    Raises:
        ValueError: If *n* is smaller than 1.
    """
    from collections import Counter

    if n < 1:
        raise ValueError("n must be >= 1")
    ngrams = Counter()
    for t in df_text:
        toks = tokenize(t)
        ngrams.update(" ".join(toks[i:i + n]) for i in range(len(toks) - n + 1))
    return ngrams.most_common(top_k)


# ------------------ Filters ------------------
def apply_keyword_filter(df, tcol, mode, kw_text):
    """Keep rows whose text column matches the keyword specification.

    Args:
        df: Input frame.
        tcol: Name of the text column.
        mode: ``"Any keyword (OR)"``, ``"All keywords (AND)"``; any other
            value treats *kw_text* as one regular expression.
        kw_text: Keywords separated by commas and/or newlines, or a regex.

    Returns:
        ``(filtered_copy, keywords)``; ``keywords`` is ``None`` when no filter
        was applied.

    Raises:
        ValueError: If regex mode is used with an invalid pattern.
    """
    if not isinstance(kw_text, str) or not kw_text.strip():
        return df.copy(), None
    # Split on commas and real newlines only (the textbox is multi-line).
    kws = [k.strip() for k in re.split(r"[,\n]+", kw_text) if k.strip()]
    if not kws:
        return df.copy(), None

    # Fill before casting: astype(str) would turn missing values into the
    # literal text "nan", which keywords could then match.
    s = df[tcol].fillna("").astype(str)
    if mode == "Any keyword (OR)":
        pattern = "|".join(re.escape(k) for k in kws)
        mask = s.str.contains(pattern, case=False, na=False)
    elif mode == "All keywords (AND)":
        mask = pd.Series(True, index=s.index)
        for k in kws:
            mask &= s.str.contains(re.escape(k), case=False, na=False)
    else:  # Regex
        try:
            mask = s.str.contains(kw_text, case=False, na=False, regex=True)
        except re.error as err:
            raise ValueError(f"Invalid keyword regex {kw_text!r}: {err}") from err
    return df[mask].copy(), kws


def _parse_date_bound(value, label):
    """Parse one date bound; return ``None`` when it is blank.

    Raises:
        ValueError: If a non-blank value cannot be parsed as a date.
    """
    if value is None or (isinstance(value, str) and not value.strip()):
        return None
    parsed = pd.to_datetime(value, errors="coerce")
    if pd.isna(parsed):
        raise ValueError(f"Invalid {label} date: {value!r}")
    return parsed


def apply_date_range(df, dcol, start, end):
    """Restrict *df* to rows whose *dcol* lies within ``[start, end]``.

    Blank bounds are ignored. Rows whose date cannot be parsed are dropped as
    soon as at least one bound is applied. The input frame is returned
    untouched when there is no date column or no bound; otherwise a filtered
    copy is returned.

    Raises:
        ValueError: If a non-blank bound cannot be parsed as a date.
    """
    if not dcol:
        return df
    start_dt = _parse_date_bound(start, "start")
    end_dt = _parse_date_bound(end, "end")
    if start_dt is None and end_dt is None:
        return df

    dates = pd.to_datetime(df[dcol], errors="coerce")  # parsed once for both bounds
    keep = np.ones(len(df), dtype=bool)
    if start_dt is not None:
        keep &= (dates >= start_dt).to_numpy()
    if end_dt is not None:
        keep &= (dates <= end_dt).to_numpy()
    return df[keep].copy()


# ------------------ PDF Report ------------------
def _draw_wrapped_text(c, text, x, y, max_width_cm=17, leading=14):
    """Draw *text* as a wrapped paragraph on canvas *c*.

    *x*, *y* and *max_width_cm* are given in centimetres. Line breaks in
    *text* (real newlines or the two-character sequence ``\\n``) are turned
    into ``<br/>`` tags, because ReportLab paragraphs treat plain newlines as
    ordinary whitespace.

    NOTE(review): the original replacement string was damaged in this file;
    ``<br/>`` is the assumed intent - confirm. The frame is 100 cm tall and
    positioned by its lower-left corner, so also confirm the intended
    vertical anchor before using this helper.
    """
    from reportlab.platypus import Frame, Paragraph
    from reportlab.lib.units import cm
    from reportlab.lib.styles import ParagraphStyle
    from reportlab.lib import colors

    style = ParagraphStyle(name="Body", fontName="Helvetica", fontSize=10,
                           leading=leading, textColor=colors.black)
    frame = Frame(x * cm, y * cm, max_width_cm * cm, 100 * cm, showBoundary=0)
    markup = text.replace("\\n", "\n").replace("\n", "<br/>")
    frame.addFromList([Paragraph(markup, style)], c)


def build_pdf_report(out_path, title, meta, trend_img, pie_img, terms, ngrams):
    """Write the multi-page PDF report and return *out_path*.

    Pages: cover (title plus *meta* lines), trend chart, distribution chart
    (each only if the image file exists), term lists, and - when *ngrams* is
    non-empty - a separate bigram page.

    Args:
        out_path: Destination PDF path.
        title: Cover title.
        meta: Iterable of text lines for the cover.
        trend_img: Path of the trend chart image, or a falsy value.
        pie_img: Path of the pie chart image, or a falsy value.
        terms: Mapping of section title to ``(term, count)`` pairs.
        ngrams: List of ``(ngram, count)`` pairs.
    """
    c = canvas.Canvas(out_path, pagesize=A4)
    W, H = A4
    left = 2 * cm
    top_y = H - 2 * cm
    bottom_y = 3 * cm

    def heading(text, size=12):
        # Page heading at the top margin, then back to the body font.
        c.setFont("Helvetica-Bold", size)
        c.drawString(left, top_y, text)
        c.setFont("Helvetica", 10)

    def draw_pairs(pairs, y):
        # Draw up to 25 "- term: count" rows starting at y; return the next y.
        for w, cnt in pairs[:25]:
            if y < bottom_y:
                c.showPage()
                # showPage() resets the canvas font, so restore the body font.
                c.setFont("Helvetica", 10)
                y = top_y
            c.drawString(2.8 * cm, y, f"- {w}: {cnt}")
            y -= 0.45 * cm
        return y

    # Cover
    heading(title, size=16)
    y = H - 3 * cm
    for line in meta:
        c.drawString(left, y, line)
        y -= 0.6 * cm
    c.showPage()

    # Trend
    if trend_img and os.path.exists(trend_img):
        heading("Sentiment Trend")
        c.drawImage(ImageReader(trend_img), left, 4 * cm, width=W - 4 * cm,
                    height=H - 7 * cm, preserveAspectRatio=True, anchor='c')
        c.showPage()

    # Pie
    if pie_img and os.path.exists(pie_img):
        heading("Sentiment Distribution")
        c.drawImage(ImageReader(pie_img), left, 6 * cm, width=W - 4 * cm,
                    height=H - 9 * cm, preserveAspectRatio=True, anchor='c')
        c.showPage()

    # Terms
    heading("Top Terms / Hashtags / Mentions")
    y = H - 3 * cm
    for sec_title, pairs in terms.items():
        if y < bottom_y:
            c.showPage()
            y = top_y
        c.setFont("Helvetica-Bold", 11)
        c.drawString(left, y, sec_title)
        y -= 0.5 * cm
        c.setFont("Helvetica", 10)
        y = draw_pairs(pairs, y)
        y -= 0.3 * cm

    # Bigrams get their own page; drawing the heading at the top of the terms
    # page would overprint that page's heading.
    if ngrams:
        c.showPage()
        heading("Top Bigrams")
        draw_pairs(ngrams, H - 3 * cm)

    c.save()
    return out_path


# ------------------ Scoring / upload helpers ------------------
def _uploaded_path(f):
    """Return the filesystem path for a Gradio upload value.

    Depending on the Gradio version and component settings the handler gets
    either a path string or a file-like wrapper exposing ``.name``.
    """
    if isinstance(f, (str, os.PathLike)):
        return os.fspath(f)
    return f.name


def _signed_score(label, score):
    """Convert a classifier ``(label, confidence)`` pair to a score in [-1, 1].

    Negative labels give ``-score``, positive labels ``+score`` and anything
    else 0.0. Both descriptive labels (containing NEG / POS) and the generic
    ``LABEL_0/1/2`` names are recognised.
    """
    lbl = str(label).upper()
    if lbl in _GENERIC_LABEL_SIGN:
        sign = _GENERIC_LABEL_SIGN[lbl]
    elif "NEG" in lbl:
        sign = -1.0
    elif "POS" in lbl:
        sign = 1.0
    else:
        sign = 0.0
    return sign * float(score)


def _roberta_scores(texts, batch=64):
    """Score *texts* with the transformer pipeline, *batch* items at a time."""
    pipe = get_roberta_pipeline()
    scores = []
    for i in range(0, len(texts), batch):
        for r in pipe(texts[i:i + batch], truncation=True):
            scores.append(_signed_score(r["label"], r["score"]))
    return scores


# ------------------ Gradio UI ------------------
with gr.Blocks(title="Advanced Sentiment Trend Analyzer") as demo:
    gr.Markdown("# 📈 Advanced Customer Sentiment Trend Analyzer\nIndustry-grade tool for tracking sentiment over time using Sentiment140 or similar datasets.")
    with gr.Row():
        with gr.Column():
            file = gr.File(label="Upload Sentiment140 CSV (or similar). 6 columns expected.", file_count="single", file_types=[".csv"])
            engine = gr.Radio(choices=["VADER (fast)", "RoBERTa (accurate)"], value="VADER (fast)", label="Sentiment Engine")
            text_col = gr.Dropdown(label="Text column", choices=[], value=None)
            date_col = gr.Dropdown(label="Date column", choices=[], value=None, allow_custom_value=True)

            gr.Markdown("### Filters")
            kw_text = gr.Textbox(label="Keyword filter (comma-separated OR regex)", placeholder="e.g., refund, delayed OR ^outage|downtime", lines=2)
            kw_mode = gr.Radio(choices=["Any keyword (OR)", "All keywords (AND)", "Regex"], value="Any keyword (OR)", label="Keyword mode")
            start_date = gr.Textbox(label="Start date (YYYY-MM-DD)", placeholder="e.g., 2009-04-06")
            end_date = gr.Textbox(label="End date (YYYY-MM-DD)", placeholder="e.g., 2009-04-20")

            gr.Markdown("### Time Series")
            agg_freq = gr.Radio(choices=["D", "W", "M"], value="D", label="Aggregate by (D/W/M)")
            ma_window = gr.Slider(3, 60, value=7, step=1, label="Moving average window (days)")
            show_ci = gr.Checkbox(value=True, label="Show 95% confidence band")
            z_window = gr.Slider(7, 90, value=21, step=1, label="Anomaly rolling window")
            z_thresh = gr.Slider(1.5, 4.0, value=2.5, step=0.1, label="Anomaly z-score threshold")
            cp_penalty = gr.Slider(2, 20, value=6, step=1, label="Change-point penalty (higher=fewer)")

            gr.Markdown("### Insights")
            top_k = gr.Slider(5, 50, value=20, step=1, label="Top tokens/hashtags/mentions")
            gen_ngrams = gr.Checkbox(value=True, label="Show Top Bigrams")

            run = gr.Button("Run Analysis 🚀", variant="primary")

        with gr.Column():
            trend_img = gr.Image(label="Trend Chart", type="filepath")
            pie_img = gr.Image(label="Sentiment Distribution", type="filepath")
            terms_md = gr.Markdown(label="Top Terms / Hashtags / Mentions")
            ngrams_md = gr.Markdown(label="Top Bigrams")
            debug_md = gr.Markdown(label="Debug Info")
            export = gr.File(label="Download Enriched CSV")
            pdf_out = gr.File(label="Download PDF Report")

    def on_upload(f):
        """Populate the column dropdowns from the uploaded CSV."""
        if f is None:
            return gr.update(choices=[], value=None), gr.update(choices=[], value=None)
        df = coerce_sentiment140(read_csv_safe(_uploaded_path(f)))
        cols = df.columns.tolist()
        text_guess = "text" if "text" in cols else (cols[-1] if cols else None)
        date_guess = "date" if "date" in cols else None
        return gr.update(choices=cols, value=text_guess), gr.update(choices=cols, value=date_guess)

    file.change(on_upload, inputs=[file], outputs=[text_col, date_col])

    def run_pipeline(f, eng, tcol, dcol, kwtext, kwmode, sd, ed, freq, maw, showci,
                     zwin, zthr, cpp, topk, want_ngrams):
        """Run the full analysis and return the values for the output widgets.

        Returns, in order: trend chart path, pie chart path, terms markdown,
        bigram markdown, debug markdown, enriched CSV path, PDF path.

        Raises:
            gr.Error: For invalid input, or wrapping any unexpected failure
                (whose traceback is printed to stderr).
        """
        if f is None:
            raise gr.Error("Please upload a CSV.")
        try:
            df = coerce_sentiment140(read_csv_safe(_uploaded_path(f)))
            cols = df.columns.tolist()
            if tcol not in cols:
                raise gr.Error(f"Text column '{tcol}' not in {cols}")
            if dcol and dcol not in cols:
                raise gr.Error(f"Date column '{dcol}' not in {cols}")
            # Checked before scoring so a missing date column fails fast
            # instead of after the (potentially slow) model run.
            if not dcol:
                raise gr.Error("Please choose a date column for trend analysis.")

            # Parse date column early for filters
            df[dcol] = pd.to_datetime(df[dcol], errors="coerce")

            # Keyword filter
            df, used_kws = apply_keyword_filter(df, tcol, kwmode, kwtext)
            # Date range filter
            df = apply_date_range(df, dcol, sd, ed)
            if df.empty:
                raise gr.Error("No rows after applying filters. Relax filters or clear them.")

            # Scoring
            if eng.startswith("VADER"):
                df["_score"] = df[tcol].astype(str).apply(vader_score)
            else:
                df["_score"] = _roberta_scores(df[tcol].astype(str).tolist())
            df["_label"] = df["_score"].apply(classify_label)

            agg = aggregate_ts(df, dcol, "_score", freq=freq, ma_window=int(maw), ci=showci)
            if agg.empty:
                raise gr.Error(f"No parsable dates in column '{dcol}'.")
            anoms = rolling_z_anomalies(agg["ma"], window=int(zwin), z=float(zthr))
            cps = changepoints(agg["ma"], penalty=int(cpp))

            trend_path = plot_trend(agg, title=f"Sentiment Trend ({eng}, {freq}-agg, MA={maw})",
                                    show_ci=showci, anomalies=anoms, cps=cps)
            pie_path = plot_pie(df["_label"], title="Overall Sentiment Distribution")

            # Terms
            tok_top, hash_top, ment_top = top_terms(df[tcol], top_k=int(topk))
            terms_lines = ["### Top Tokens", ""] + [f"- {w}: {c}" for w, c in tok_top]
            terms_lines += ["", "### Top Hashtags", ""] + [f"- {w}: {c}" for w, c in hash_top]
            terms_lines += ["", "### Top Mentions", ""] + [f"- {w}: {c}" for w, c in ment_top]
            terms_text = "\n".join(terms_lines)

            # N-grams
            if want_ngrams:
                ng_list = ngram_top(df[tcol], n=2, top_k=15)
                ngrams_text = "### Top Bigrams\n\n" + "\n".join([f"- {w}: {c}" for w, c in ng_list])
            else:
                ng_list = []
                ngrams_text = "### Top Bigrams\n\n(Disabled)"

            # Export CSV
            export_path = "enriched_sentiment.csv"
            df.to_csv(export_path, index=False)

            # Build PDF
            meta = [
                f"Engine: {eng}",
                f"Rows (after filters): {len(df)}",
                f"Date agg: {freq}, MA window: {maw}, CI: {bool(showci)}",
                f"Anomaly window: {zwin}, z-threshold: {zthr}, CP penalty: {cpp}",
                f"Filters: keywords={kwtext or 'None'} mode={kwmode}; date_range={sd or 'N/A'} to {ed or 'N/A'}",
            ]
            terms_dict = {"Top Tokens": tok_top, "Top Hashtags": hash_top, "Top Mentions": ment_top}
            pdf_path = "sentiment_report.pdf"
            build_pdf_report(pdf_path, "Customer Sentiment Trend Report", meta,
                             trend_path, pie_path, terms_dict, ng_list)

            dbg = "#### Data shape\n" + str(df.shape) + "\n\n#### Columns\n" + str(df.dtypes) + "\n"
            return trend_path, pie_path, terms_text, ngrams_text, dbg, export_path, pdf_path
        except gr.Error:
            # Already a user-facing message; do not wrap it a second time.
            raise
        except Exception as e:
            tb = traceback.format_exc()
            print(tb, file=sys.stderr)
            raise gr.Error(f"RuntimeError: {type(e).__name__}: {e}") from e

    run.click(
        run_pipeline,
        inputs=[file, engine, text_col, date_col, kw_text, kw_mode, start_date, end_date,
                agg_freq, ma_window, show_ci, z_window, z_thresh, cp_penalty, top_k, gen_ngrams],
        outputs=[trend_img, pie_img, terms_md, ngrams_md, debug_md, export, pdf_out],
    )

if __name__ == "__main__":
    port = int(os.environ.get("PORT", "7860"))
    demo.launch(server_name="0.0.0.0", server_port=port)