| |
|
| | import os |
| | import io |
| | import re |
| | import sys |
| | import uuid |
| | import math |
| | import traceback |
| | from datetime import datetime |
| |
|
| | import numpy as np |
| | import pandas as pd |
| |
|
| | |
| | import matplotlib |
| | matplotlib.use("Agg") |
| | import matplotlib.pyplot as plt |
| |
|
| | import gradio as gr |
| |
|
| | |
| | import nltk |
| | from nltk.corpus import stopwords |
| | from nltk.sentiment import SentimentIntensityAnalyzer |
| |
|
| | |
| | from transformers import pipeline |
| |
|
| | |
| | import ruptures as rpt |
| |
|
| | |
| | from reportlab.lib.pagesizes import A4 |
| | from reportlab.pdfgen import canvas |
| | from reportlab.lib.units import cm |
| | from reportlab.lib.utils import ImageReader |
| |
|
| | |
def _ensure_nltk():
    """Download the NLTK resources this app relies on, skipping any already present."""
    required = [
        ("tokenizers/punkt", "punkt"),
        ("corpora/stopwords", "stopwords"),
        ("sentiment/vader_lexicon.zip", "vader_lexicon"),
    ]
    for locator, package in required:
        try:
            nltk.data.find(locator)
        except LookupError:
            nltk.download(package, quiet=True)
| |
|
# Fetch NLTK data once at import time so later calls don't hit LookupError.
_ensure_nltk()
try:
    EN_STOPWORDS = set(stopwords.words("english"))
except Exception:
    # If the stopwords corpus still isn't available (e.g. fully offline),
    # fall back to no stopword filtering rather than crashing on import.
    EN_STOPWORDS = set()
| |
|
def init_vader():
    """Return a SentimentIntensityAnalyzer, downloading the VADER lexicon on demand.

    Makes one retry: if the first construction fails (lexicon missing), the
    lexicon is downloaded and construction attempted once more; a second
    failure propagates to the caller.
    """
    for attempt in range(2):
        try:
            return SentimentIntensityAnalyzer()
        except Exception:
            if attempt:
                raise
            nltk.download("vader_lexicon", quiet=True)
| |
|
| | VADER = init_vader() |
| |
|
| | |
# Lazily-built HuggingFace sentiment pipeline, cached for the process lifetime.
_cached_pipe = None
def get_roberta_pipeline():
    """Return the cached RoBERTa sentiment pipeline, constructing it on first use.

    Tries the Cardiff NLP "-latest" checkpoint first; if that fails, falls back
    to the older checkpoint. If both fail, the last exception propagates.
    """
    global _cached_pipe
    if _cached_pipe is not None:
        return _cached_pipe
    candidates = (
        "cardiffnlp/twitter-roberta-base-sentiment-latest",
        "cardiffnlp/twitter-roberta-base-sentiment",
    )
    last_exc = None
    for model_name in candidates:
        try:
            _cached_pipe = pipeline(
                "sentiment-analysis",
                model=model_name,
                tokenizer=model_name,
                truncation=True,
            )
            return _cached_pipe
        except Exception as exc:
            last_exc = exc
    raise last_exc
| |
|
| | |
# Word-ish tokens: runs of letters/apostrophes. URLs are stripped before tokenizing.
TOKEN_PATTERN = re.compile(r"[A-Za-z']+")
URL_RE = re.compile(r"https?://\S+")
| |
|
def tokenize(text: str):
    """Return lowercase word tokens from *text*.

    URLs are removed first; stopwords and single-character tokens are dropped.
    Non-string input (including NaN) is treated as empty text.
    """
    if not isinstance(text, str):
        text = "" if pd.isna(text) else str(text)
    cleaned = URL_RE.sub("", text)
    return [
        tok
        for tok in (match.lower() for match in TOKEN_PATTERN.findall(cleaned))
        if len(tok) > 1 and tok not in EN_STOPWORDS
    ]
| |
|
def read_csv_safe(path):
    """Read a headerless CSV, trying pandas' default encoding then common fallbacks.

    The first successful parse wins; if every attempt fails, the last exception
    is re-raised so the caller sees a concrete error.
    """
    last_err = None
    for enc in (None, "utf-8", "utf-8-sig", "latin-1"):
        kwargs = {"header": None}
        if enc is not None:
            kwargs["encoding"] = enc
        try:
            return pd.read_csv(path, **kwargs)
        except Exception as exc:
            last_err = exc
    raise last_err
| |
|
def coerce_sentiment140(df):
    """Normalize a Sentiment140-style frame to the canonical six columns.

    Frames with six or more columns are trimmed to the first six and renamed
    to target/ids/date/flag/user/text. Narrower frames are returned unchanged:
    previously the six-name rename was attempted regardless of width, which
    raised a confusing length-mismatch ValueError for small/odd CSVs.

    Returns a new DataFrame (or the input unchanged when too narrow); never
    mutates the caller's frame.
    """
    if df.shape[1] < 6:
        return df
    out = df.iloc[:, :6].copy()
    out.columns = ["target", "ids", "date", "flag", "user", "text"]
    return out
| |
|
def vader_score(text):
    """Compound VADER polarity in [-1, 1]; non-string input scores as empty text."""
    if not isinstance(text, str):
        text = ""
    return VADER.polarity_scores(text)["compound"]
| |
|
def classify_label(score, pos_thr=0.05, neg_thr=-0.05):
    """Map a compound sentiment score to "Positive"/"Negative"/"Neutral".

    Scores at or above *pos_thr* are Positive, at or below *neg_thr* Negative,
    and everything in between Neutral.
    """
    if score >= pos_thr:
        return "Positive"
    if score <= neg_thr:
        return "Negative"
    return "Neutral"
| |
|
def aggregate_ts(df, date_col, score_col, freq="D", ma_window=7, ci=True):
    """Resample per-row scores into a time series with moving average and optional CI.

    Returns a DataFrame indexed by resampled period containing the mean score,
    an "ma" column (rolling mean over *ma_window* periods; the raw mean when
    the window is falsy or <= 1), and "ci_low"/"ci_high" approximate 95%
    bands when *ci* is true. Rows with unparseable dates are dropped.
    """
    ts = df[[date_col, score_col]].dropna()
    ts[date_col] = pd.to_datetime(ts[date_col], errors="coerce")
    ts = ts.dropna(subset=[date_col]).set_index(date_col).sort_index()

    agg = ts.resample(freq).mean()
    smooth = ma_window and ma_window > 1
    agg["ma"] = (
        agg[score_col].rolling(ma_window, min_periods=1).mean()
        if smooth
        else agg[score_col]
    )
    if ci:
        # Band = ma +/- 1.96 * (rolling std of period means / sqrt(rolling obs count)).
        std = agg[score_col].rolling(ma_window, min_periods=2).std(ddof=1)
        counts = ts.resample(freq).count()[score_col].rolling(ma_window, min_periods=1).sum()
        stderr = std / np.sqrt(np.maximum(counts, 1))
        agg["ci_low"] = agg["ma"] - 1.96 * stderr
        agg["ci_high"] = agg["ma"] + 1.96 * stderr
    return agg
| |
|
def rolling_z_anomalies(series, window=14, z=2.5):
    """Boolean mask flagging points whose rolling z-score magnitude reaches *z*.

    Requires at least max(5, window) points; shorter input yields all-False.
    Windows with zero variance produce NaN z-scores and are never flagged.
    """
    values = pd.Series(series.values.astype(float))
    if len(values) < max(5, window):
        return np.array([False] * len(values))
    center = values.rolling(window, min_periods=5).mean()
    spread = values.rolling(window, min_periods=5).std(ddof=1).replace(0, np.nan)
    zscores = (values - center) / spread
    return (zscores.abs() >= z).fillna(False).values
| |
|
def changepoints(series, penalty=6):
    """Detect change-points in *series* using PELT with an RBF cost model.

    NaNs are dropped before detection, and the returned values are index
    labels of the cleaned series. (Previously, positions from the cleaned
    array were looked up in the *original* index, so labels drifted whenever
    NaNs had been dropped.)

    Returns [] when fewer than 10 clean points exist or detection fails.
    """
    clean = series.dropna()
    x = clean.values.astype(float)
    if len(x) < 10:
        return []
    algo = rpt.Pelt(model="rbf").fit(x)
    try:
        result = algo.predict(pen=penalty)
    except Exception:
        return []
    # ruptures returns segment end positions, with the final entry == len(x);
    # map each internal boundary to the label of the segment's last point.
    return [clean.index[min(len(clean) - 1, i - 1)] for i in result[:-1]]
| |
|
def _save_fig(fig, name):
    """Save *fig* as a uniquely-named PNG under charts/ and return its path."""
    out_dir = "charts"
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, f"{name}_{uuid.uuid4().hex}.png")
    fig.savefig(out_path, format="png", dpi=150, bbox_inches="tight")
    plt.close(fig)
    return out_path
| |
|
def plot_trend(agg, title="Sentiment Trend", show_ci=True, anomalies=None, cps=None):
    """Render the aggregated sentiment series to a PNG and return the file path.

    Plots the moving average and the raw resampled mean; optionally overlays
    the 95% confidence band, anomaly markers, and dashed change-point lines.
    """
    fig = plt.figure()
    ax = plt.gca()
    ax.plot(agg.index, agg["ma"], label="Moving Avg")
    # First column of the aggregate frame holds the resampled mean score.
    ax.plot(agg.index, agg.iloc[:, 0], alpha=0.3, label="Mean")
    if show_ci and "ci_low" in agg and "ci_high" in agg:
        ax.fill_between(agg.index, agg["ci_low"], agg["ci_high"], alpha=0.2, label="95% CI")
    if anomalies is not None and anomalies.any():
        ax.scatter(agg.index[anomalies], agg["ma"][anomalies], marker="x", s=40, label="Anomaly")
    for cp in (cps or []):
        ax.axvline(cp, linestyle="--", alpha=0.6, label="Change-point")
    ax.set_title(title)
    ax.set_ylabel("Sentiment (−1 to 1)")
    ax.set_xlabel("Date")
    ax.legend(loc="best")
    fig.autofmt_xdate()
    return _save_fig(fig, "trend")
| |
|
def plot_pie(series, title="Sentiment Distribution"):
    """Save a pie chart of label frequencies in *series* and return the PNG path."""
    label_counts = series.value_counts()
    fig = plt.figure()
    plt.pie(
        label_counts.values,
        labels=label_counts.index,
        autopct="%1.1f%%",
        startangle=90,
    )
    plt.title(title)
    return _save_fig(fig, "pie")
| |
|
def top_terms(df_text, top_k=20):
    """Most common tokens, #hashtags, and @mentions across an iterable of texts.

    Non-string entries are skipped. Returns three (term, count) lists, each at
    most *top_k* long, in the order (tokens, hashtags, mentions).
    """
    from collections import Counter
    token_counts = Counter()
    hashtag_counts = Counter()
    mention_counts = Counter()
    for raw in df_text:
        if not isinstance(raw, str):
            continue
        hashtag_counts.update(h.lower() for h in re.findall(r"#\w+", raw))
        mention_counts.update(m.lower() for m in re.findall(r"@\w+", raw))
        token_counts.update(tokenize(raw))
    return (
        token_counts.most_common(top_k),
        hashtag_counts.most_common(top_k),
        mention_counts.most_common(top_k),
    )
| |
|
def ngram_top(df_text, n=2, top_k=15):
    """Top *top_k* space-joined n-grams (default bigrams) over the tokenized texts."""
    from collections import Counter
    counts = Counter()
    for entry in df_text:
        toks = tokenize(entry)
        counts.update(" ".join(toks[i:i + n]) for i in range(len(toks) - n + 1))
    return counts.most_common(top_k)
| |
|
| | |
def apply_keyword_filter(df, tcol, mode, kw_text):
    """Filter rows of *df* whose text column *tcol* matches the user's keywords.

    *mode* selects the match semantics (all case-insensitive):
      - "Any keyword (OR)":   row matches if it contains any keyword.
      - "All keywords (AND)": row must contain every keyword.
      - anything else:        *kw_text* is treated as a raw regex; an invalid
                              pattern matches nothing instead of raising.

    Returns (filtered copy, keyword list), or (full copy, None) when no
    usable keywords were supplied.

    Fixes two defects: the splitter regex was r"[,\\n]+" — a character class
    of comma, backslash, and the literal letter "n" — so keywords were split
    on every "n"; it now splits on commas and newlines only. Also, NaN cells
    are blanked *before* the str cast, so the literal string "nan" produced
    by astype(str) can no longer match keywords.
    """
    if not isinstance(kw_text, str) or not kw_text.strip():
        return df.copy(), None
    kws = [k.strip() for k in re.split(r"[,\n]+", kw_text) if k.strip()]
    if not kws:
        return df.copy(), None
    s = df[tcol].fillna("").astype(str)
    if mode == "Any keyword (OR)":
        pattern = "|".join(re.escape(k) for k in kws)
        mask = s.str.contains(pattern, case=False, na=False)
    elif mode == "All keywords (AND)":
        mask = pd.Series(True, index=s.index)
        for k in kws:
            mask &= s.str.contains(re.escape(k), case=False, na=False)
    else:
        try:
            mask = s.str.contains(kw_text, case=False, na=False, regex=True)
        except Exception:
            # Invalid user regex: match nothing rather than crash the run.
            mask = pd.Series(False, index=s.index)
    return df[mask].copy(), kws
| |
|
def apply_date_range(df, dcol, start, end):
    """Restrict *df* to rows whose *dcol* value lies within [start, end].

    A falsy *dcol* or two falsy bounds return the frame unchanged. Bounds and
    column values are parsed with pandas; values that fail to parse never
    satisfy an active bound and are therefore dropped.
    """
    if not dcol or (not start and not end):
        return df
    parsed = pd.to_datetime(df[dcol], errors="coerce")
    keep = pd.Series(True, index=df.index)
    if start:
        keep &= parsed >= pd.to_datetime(start, errors="coerce")
    if end:
        keep &= parsed <= pd.to_datetime(end, errors="coerce")
    return df[keep]
| |
|
| | |
def _draw_wrapped_text(c, text, x, y, max_width_cm=17, leading=14):
    """Flow *text* onto canvas *c* as a wrapped paragraph anchored at (x, y) in cm.

    A platypus Frame wraps long text within *max_width_cm*; the frame height
    is effectively unbounded (100 cm), so overflow is not paginated here.
    """
    from reportlab.lib.styles import getSampleStyleSheet
    from reportlab.platypus import Paragraph
    from reportlab.lib.units import cm
    from reportlab.lib.styles import ParagraphStyle
    from reportlab.lib import colors
    style = ParagraphStyle(name="Body", fontName="Helvetica", fontSize=10, leading=leading, textColor=colors.black)
    from reportlab.platypus import Frame
    frame = Frame(x*cm, y*cm, max_width_cm*cm, 100*cm, showBoundary=0)
    # NOTE(review): "\\n" here replaces the two-character sequence backslash+n,
    # not real newline characters — if callers pass genuine newlines this was
    # probably meant to be "\n". Confirm against the call sites before changing.
    story = [Paragraph(text.replace("\\n","<br/>"), style)]
    frame.addFromList(story, c)
| |
|
def build_pdf_report(out_path, title, meta, trend_img, pie_img, terms, ngrams):
    """Assemble the multi-page PDF report and return *out_path*.

    Page layout: (1) title + metadata lines, (2) trend chart if the image file
    exists, (3) pie chart if it exists, (4) term/hashtag/mention lists (up to
    25 entries per section, paginated), (5) bigram list. *terms* is a mapping
    of section title -> list of (term, count); *ngrams* a list of (ngram, count).
    """
    c = canvas.Canvas(out_path, pagesize=A4)
    W, H = A4

    # --- Page 1: title and run metadata ---
    c.setFont("Helvetica-Bold", 16)
    c.drawString(2*cm, H-2*cm, title)
    c.setFont("Helvetica", 10)
    y = H-3*cm
    for line in meta:
        c.drawString(2*cm, y, line)
        y -= 0.6*cm
    c.showPage()

    # --- Page 2 (optional): trend chart, scaled to fit within margins ---
    if trend_img and os.path.exists(trend_img):
        c.drawString(2*cm, H-2*cm, "Sentiment Trend")
        img = ImageReader(trend_img)
        c.drawImage(img, 2*cm, 4*cm, width=W-4*cm, height=H-7*cm, preserveAspectRatio=True, anchor='c')
        c.showPage()

    # --- Page 3 (optional): sentiment distribution pie chart ---
    if pie_img and os.path.exists(pie_img):
        c.drawString(2*cm, H-2*cm, "Sentiment Distribution")
        img = ImageReader(pie_img)
        c.drawImage(img, 2*cm, 6*cm, width=W-4*cm, height=H-9*cm, preserveAspectRatio=True, anchor='c')
        c.showPage()

    # --- Terms / hashtags / mentions, with manual pagination at a 3 cm floor ---
    # NOTE(review): showPage() resets canvas graphics state, so after a mid-list
    # page break the font presumably reverts to the default until the next
    # setFont call — verify the rendered output if font consistency matters.
    c.setFont("Helvetica-Bold", 12)
    c.drawString(2*cm, H-2*cm, "Top Terms / Hashtags / Mentions")
    c.setFont("Helvetica", 10)
    y = H-3*cm
    for sec_title, pairs in terms.items():
        c.setFont("Helvetica-Bold", 11)
        c.drawString(2*cm, y, sec_title)
        y -= 0.5*cm
        c.setFont("Helvetica", 10)
        for w, cnt in pairs[:25]:
            c.drawString(2.8*cm, y, f"- {w}: {cnt}")
            y -= 0.45*cm
            if y < 3*cm:
                c.showPage()
                y = H-2*cm
        y -= 0.3*cm
        if y < 3*cm:
            c.showPage()
            y = H-2*cm

    # --- Bigrams section (same pagination scheme) ---
    c.setFont("Helvetica-Bold", 12)
    c.drawString(2*cm, H-2*cm, "Top Bigrams")
    c.setFont("Helvetica", 10)
    y = H-3*cm
    for w, cnt in ngrams[:25]:
        c.drawString(2.8*cm, y, f"- {w}: {cnt}")
        y -= 0.45*cm
        if y < 3*cm:
            c.showPage()
            y = H-2*cm

    c.save()
    return out_path
| |
|
| | |
# --- Gradio UI: left column holds inputs/controls, right column the outputs ---
with gr.Blocks(title="Advanced Sentiment Trend Analyzer") as demo:
    gr.Markdown("# 📈 Advanced Customer Sentiment Trend Analyzer\nIndustry-grade tool for tracking sentiment over time using Sentiment140 or similar datasets.")

    with gr.Row():
        with gr.Column():
            # Data source and engine selection. Column dropdowns are populated
            # on upload by on_upload (wired below via file.change).
            file = gr.File(label="Upload Sentiment140 CSV (or similar). 6 columns expected.", file_count="single", file_types=[".csv"])
            engine = gr.Radio(choices=["VADER (fast)", "RoBERTa (accurate)"], value="VADER (fast)", label="Sentiment Engine")
            text_col = gr.Dropdown(label="Text column", choices=[], value=None)
            date_col = gr.Dropdown(label="Date column", choices=[], value=None, allow_custom_value=True)

            # Row filters applied before scoring.
            gr.Markdown("### Filters")
            kw_text = gr.Textbox(label="Keyword filter (comma-separated OR regex)", placeholder="e.g., refund, delayed OR ^outage|downtime", lines=2)
            kw_mode = gr.Radio(choices=["Any keyword (OR)", "All keywords (AND)", "Regex"], value="Any keyword (OR)", label="Keyword mode")
            start_date = gr.Textbox(label="Start date (YYYY-MM-DD)", placeholder="e.g., 2009-04-06")
            end_date = gr.Textbox(label="End date (YYYY-MM-DD)", placeholder="e.g., 2009-04-20")

            # Time-series aggregation, anomaly, and change-point parameters.
            gr.Markdown("### Time Series")
            agg_freq = gr.Radio(choices=["D","W","M"], value="D", label="Aggregate by (D/W/M)")
            ma_window = gr.Slider(3, 60, value=7, step=1, label="Moving average window (days)")
            show_ci = gr.Checkbox(value=True, label="Show 95% confidence band")
            z_window = gr.Slider(7, 90, value=21, step=1, label="Anomaly rolling window")
            z_thresh = gr.Slider(1.5, 4.0, value=2.5, step=0.1, label="Anomaly z-score threshold")
            cp_penalty = gr.Slider(2, 20, value=6, step=1, label="Change-point penalty (higher=fewer)")

            # Term-frequency insight controls.
            gr.Markdown("### Insights")
            top_k = gr.Slider(5, 50, value=20, step=1, label="Top tokens/hashtags/mentions")
            gen_ngrams = gr.Checkbox(value=True, label="Show Top Bigrams")

            run = gr.Button("Run Analysis 🚀", variant="primary")
        with gr.Column():
            # Output widgets filled by run_pipeline (wired below via run.click).
            trend_img = gr.Image(label="Trend Chart", type="filepath")
            pie_img = gr.Image(label="Sentiment Distribution", type="filepath")
            terms_md = gr.Markdown(label="Top Terms / Hashtags / Mentions")
            ngrams_md = gr.Markdown(label="Top Bigrams")
            debug_md = gr.Markdown(label="Debug Info")
            export = gr.File(label="Download Enriched CSV")
            pdf_out = gr.File(label="Download PDF Report")
| |
|
    def on_upload(f):
        """Populate the text/date column dropdowns from the uploaded CSV.

        Guesses "text" and "date" columns when present; otherwise falls back
        to the last column for text and leaves the date unset. Clearing the
        file empties both dropdowns.
        """
        if f is None:
            return gr.update(choices=[], value=None), gr.update(choices=[], value=None)
        df = read_csv_safe(f.name)
        df = coerce_sentiment140(df)
        cols = df.columns.tolist()
        text_guess = "text" if "text" in cols else (cols[-1] if cols else None)
        date_guess = "date" if "date" in cols else None
        return gr.update(choices=cols, value=text_guess), gr.update(choices=cols, value=date_guess)

    file.change(on_upload, inputs=[file], outputs=[text_col, date_col])
| |
|
    def run_pipeline(f, eng, tcol, dcol, kwtext, kwmode, sd, ed, freq, maw, showci, zwin, zthr, cpp, topk, want_ngrams):
        """End-to-end analysis: load, filter, score, chart, and export.

        Reads the uploaded CSV, applies keyword/date filters, scores each row
        with VADER or RoBERTa, aggregates the scores into a time series with
        anomalies and change-points, and produces the trend chart, pie chart,
        term/bigram summaries, an enriched CSV, and a PDF report.

        Returns the tuple of outputs wired to run.click below. All failures
        are surfaced to the UI as gr.Error (traceback printed to stderr).
        """
        if f is None:
            raise gr.Error("Please upload a CSV.")
        try:
            df = read_csv_safe(f.name)
            df = coerce_sentiment140(df)
            cols = df.columns.tolist()
            if tcol not in cols:
                raise gr.Error(f"Text column '{tcol}' not in {cols}")
            if dcol and dcol not in cols:
                raise gr.Error(f"Date column '{dcol}' not in {cols}")

            # Parse dates up front so downstream filtering/aggregation is consistent.
            if dcol:
                df[dcol] = pd.to_datetime(df[dcol], errors="coerce")

            df, used_kws = apply_keyword_filter(df, tcol, kwmode, kwtext)

            df = apply_date_range(df, dcol, sd, ed)
            if df.empty:
                raise gr.Error("No rows after applying filters. Relax filters or clear them.")

            # Score every remaining row into [-1, 1] under "_score".
            if eng.startswith("VADER"):
                df["_score"] = df[tcol].astype(str).apply(vader_score)
            else:
                # RoBERTa path: batch texts through the HF pipeline and map
                # label+confidence to a signed score (NEG -> -p, POS -> +p, else 0).
                pipe = get_roberta_pipeline()
                texts = df[tcol].astype(str).tolist()
                scores = []
                batch = 64
                for i in range(0, len(texts), batch):
                    chunk = texts[i:i+batch]
                    res = pipe(chunk, truncation=True)
                    for r in res:
                        lbl, sc = r["label"].upper(), float(r["score"])
                        if "NEG" in lbl:
                            scores.append(-sc)
                        elif "POS" in lbl:
                            scores.append(sc)
                        else:
                            scores.append(0.0)
                df["_score"] = scores
            df["_label"] = df["_score"].apply(classify_label)

            # Trend analysis requires a usable date column.
            if not dcol:
                raise gr.Error("Please choose a date column for trend analysis.")
            agg = aggregate_ts(df, dcol, "_score", freq=freq, ma_window=int(maw), ci=showci)
            anoms = rolling_z_anomalies(agg["ma"], window=int(zwin), z=float(zthr))
            cps = changepoints(agg["ma"], penalty=int(cpp))
            trend_path = plot_trend(agg, title=f"Sentiment Trend ({eng}, {freq}-agg, MA={maw})", show_ci=showci, anomalies=anoms, cps=cps)
            pie_path = plot_pie(df["_label"], title="Overall Sentiment Distribution")

            # Term-frequency insights rendered as markdown for the UI panels.
            tok_top, hash_top, ment_top = top_terms(df[tcol], top_k=int(topk))
            terms_lines = ["### Top Tokens", ""] + [f"- {w}: {c}" for w,c in tok_top]
            terms_lines += ["", "### Top Hashtags", ""] + [f"- {w}: {c}" for w,c in hash_top]
            terms_lines += ["", "### Top Mentions", ""] + [f"- {w}: {c}" for w,c in ment_top]
            terms_md = "\n".join(terms_lines)

            if want_ngrams:
                ng = ngram_top(df[tcol], n=2, top_k=15)
                ngrams_md = "### Top Bigrams\n\n" + "\n".join([f"- {w}: {c}" for w,c in ng])
                ng_list = ng
            else:
                ngrams_md = "### Top Bigrams\n\n(Disabled)"
                ng_list = []

            # Enriched CSV (scores + labels) written to the working directory.
            export_path = "enriched_sentiment.csv"
            df.to_csv(export_path, index=False)

            # PDF report mirrors the on-screen results.
            meta = [
                f"Engine: {eng}",
                f"Rows (after filters): {len(df)}",
                f"Date agg: {freq}, MA window: {maw}, CI: {bool(showci)}",
                f"Anomaly window: {zwin}, z-threshold: {zthr}, CP penalty: {cpp}",
                f"Filters: keywords={kwtext or 'None'} mode={kwmode}; date_range={sd or 'N/A'} to {ed or 'N/A'}",
            ]
            terms_dict = {"Top Tokens": tok_top, "Top Hashtags": hash_top, "Top Mentions": ment_top}
            pdf_path = "sentiment_report.pdf"
            build_pdf_report(pdf_path, "Customer Sentiment Trend Report", meta, trend_path, pie_path, terms_dict, ng_list)

            dbg = "#### Data shape\n" + str(df.shape) + "\n\n#### Columns\n" + str(df.dtypes) + "\n"
            return trend_path, pie_path, terms_md, ngrams_md, dbg, export_path, pdf_path
        except Exception as e:
            # Log the full traceback server-side; show a compact message in the UI.
            tb = traceback.format_exc()
            print(tb, file=sys.stderr)
            raise gr.Error(f"RuntimeError: {type(e).__name__}: {e}")

    run.click(
        run_pipeline,
        inputs=[file, engine, text_col, date_col, kw_text, kw_mode, start_date, end_date, agg_freq, ma_window, show_ci, z_window, z_thresh, cp_penalty, top_k, gen_ngrams],
        outputs=[trend_img, pie_img, terms_md, ngrams_md, debug_md, export, pdf_out]
    )
| |
|
if __name__ == "__main__":
    # Bind to all interfaces; the PORT env var overrides Gradio's default 7860.
    port = int(os.environ.get("PORT", "7860"))
    demo.launch(server_name="0.0.0.0", server_port=port)
| |
|