# gopalaKrishna1236's picture
# Upload 2 files
# 364a23f verified
import os
import io
import re
import sys
import uuid
import math
import traceback
from datetime import datetime
import numpy as np
import pandas as pd
# Headless matplotlib
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import gradio as gr
# ------------------ NLP / Modeling ------------------
import nltk
from nltk.corpus import stopwords
from nltk.sentiment import SentimentIntensityAnalyzer
# Transformers sentiment (optional: advanced)
from transformers import pipeline
# Time-series & stats
import ruptures as rpt
# PDF reporting
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.lib.units import cm
from reportlab.lib.utils import ImageReader
# ------------------ NLTK bootstrap ------------------
def _ensure_nltk():
    """Make sure the NLTK resources used by this module are installed.

    Each resource is looked up with ``nltk.data.find``; when the lookup
    raises ``LookupError`` the matching package is downloaded quietly.
    """
    required = (
        ("tokenizers/punkt", "punkt"),
        ("corpora/stopwords", "stopwords"),
        ("sentiment/vader_lexicon.zip", "vader_lexicon"),
    )
    for resource_path, package in required:
        try:
            nltk.data.find(resource_path)
        except LookupError:
            nltk.download(package, quiet=True)


# Executed at import time, before the module-level objects below are built.
_ensure_nltk()
def _load_english_stopwords():
    """Return NLTK's English stopwords as a set, or an empty set on any failure."""
    try:
        words = stopwords.words("english")
    except Exception:
        # Without the corpus, tokenization still works; it just filters nothing.
        return set()
    return set(words)


EN_STOPWORDS = _load_english_stopwords()
def init_vader():
    """Build the VADER analyzer, downloading its lexicon and retrying once on failure."""
    try:
        analyzer = SentimentIntensityAnalyzer()
    except Exception:
        # Presumably the lexicon is missing; fetch it and try exactly once more.
        nltk.download("vader_lexicon", quiet=True)
        analyzer = SentimentIntensityAnalyzer()
    return analyzer


# Shared analyzer instance used by vader_score().
VADER = init_vader()
# ------------------ Transformers init (lazy) ------------------
_cached_pipe = None


def get_roberta_pipeline():
    """Return the process-wide sentiment pipeline, creating it on first call.

    The "-latest" checkpoint is tried first; if building it raises, the
    older checkpoint is loaded instead. The result is cached in
    ``_cached_pipe`` so later calls return the same object.
    """
    global _cached_pipe
    if _cached_pipe is not None:
        return _cached_pipe

    preferred = "cardiffnlp/twitter-roberta-base-sentiment-latest"
    fallback = "cardiffnlp/twitter-roberta-base-sentiment"
    try:
        _cached_pipe = pipeline("sentiment-analysis", model=preferred, tokenizer=preferred, truncation=True)
    except Exception:
        _cached_pipe = pipeline("sentiment-analysis", model=fallback, tokenizer=fallback, truncation=True)
    return _cached_pipe
# ------------------ Helpers ------------------
TOKEN_PATTERN = re.compile(r"[A-Za-z']+")
URL_RE = re.compile(r"https?://\S+")


def tokenize(text: str):
    """Split *text* into lowercase word tokens.

    Non-string input is stringified (missing values become the empty
    string), URLs are removed, and tokens that are stopwords or a single
    character long are dropped.
    """
    if not isinstance(text, str):
        text = "" if pd.isna(text) else str(text)
    without_urls = URL_RE.sub("", text)

    kept = []
    for raw in TOKEN_PATTERN.findall(without_urls):
        word = raw.lower()
        if len(word) > 1 and word not in EN_STOPWORDS:
            kept.append(word)
    return kept
def read_csv_safe(path):
    """Read a header-less CSV, retrying with progressively laxer encodings.

    The file is first read with pandas' default encoding, then with
    utf-8, utf-8-sig and latin-1. The first successful parse is returned;
    if every attempt fails, the last exception is re-raised.
    """
    attempts = (
        {},
        {"encoding": "utf-8"},
        {"encoding": "utf-8-sig"},
        {"encoding": "latin-1"},
    )
    failure = None
    for extra in attempts:
        try:
            frame = pd.read_csv(path, header=None, **extra)
        except Exception as exc:
            failure = exc
        else:
            return frame
    raise failure
def coerce_sentiment140(df):
    """Give a frame with six or more columns the Sentiment140 column names.

    Only the first six columns are kept and they are renamed to
    target/ids/date/flag/user/text. Frames with fewer than six columns are
    returned untouched.
    """
    names = ["target", "ids", "date", "flag", "user", "text"]
    if df.shape[1] < len(names):
        return df
    trimmed = df.iloc[:, :len(names)]
    trimmed.columns = names
    return trimmed
def vader_score(text):
    """Return VADER's compound score for *text*; non-strings are scored as ""."""
    content = text if isinstance(text, str) else ""
    return VADER.polarity_scores(content)["compound"]
def classify_label(score, pos_thr=0.05, neg_thr=-0.05):
    """Bucket a numeric score into "Positive", "Negative" or "Neutral".

    Scores at or above *pos_thr* are positive, scores at or below
    *neg_thr* are negative, and everything else (including NaN, which
    fails both comparisons) is neutral.
    """
    if score >= pos_thr:
        return "Positive"
    return "Negative" if score <= neg_thr else "Neutral"
def aggregate_ts(df, date_col, score_col, freq="D", ma_window=7, ci=True):
    """Resample scores into a time series with a moving average and optional band.

    Args:
        df: Frame holding at least *date_col* and *score_col*.
        date_col: Column with timestamps; unparsable values are dropped.
        score_col: Column with numeric scores.
        freq: pandas resampling frequency for the buckets.
        ma_window: Moving-average window in buckets. Values of 1 or less
            (or ``None``) disable smoothing.
        ci: When true, add ``ci_low`` / ``ci_high`` columns.

    Returns:
        DataFrame indexed by bucket timestamp with columns *score_col*
        (bucket mean), ``ma`` and, if requested, ``ci_low`` / ``ci_high``.
        The band is ``ma +/- 1.96 * se`` where ``se`` is the rolling standard
        deviation of bucket means divided by the square root of the number
        of rows in the window.
    """
    s = df[[date_col, score_col]].dropna()
    s[date_col] = pd.to_datetime(s[date_col], errors="coerce")
    s = s.dropna(subset=[date_col]).set_index(date_col).sort_index()

    agg = s.resample(freq).mean()

    smoothing = bool(ma_window) and ma_window > 1
    if smoothing:
        window = int(ma_window)
        agg["ma"] = agg[score_col].rolling(window, min_periods=1).mean()
    else:
        agg["ma"] = agg[score_col]

    if ci:
        if smoothing:
            std = agg[score_col].rolling(window, min_periods=2).std(ddof=1)
            n = s.resample(freq).count()[score_col].rolling(window, min_periods=1).sum()
            se = std / np.sqrt(np.maximum(n, 1))
            agg["ci_low"] = agg["ma"] - 1.96 * se
            agg["ci_high"] = agg["ma"] + 1.96 * se
        else:
            # A rolling std needs at least two buckets; with no smoothing
            # window there is no spread to estimate, so the band is left
            # undefined instead of raising from rolling(min_periods=2).
            agg["ci_low"] = np.nan
            agg["ci_high"] = np.nan
    return agg
def rolling_z_anomalies(series, window=14, z=2.5):
    """Flag points whose rolling z-score magnitude is at least *z*.

    Args:
        series: Numeric values in time order (a pandas Series or array-like).
        window: Trailing window length used for the rolling mean and std.
        z: Absolute z-score threshold.

    Returns:
        Boolean numpy array with one entry per input value. Inputs shorter
        than ``max(5, window)`` and windows below 2 yield all-False.
    """
    x = np.asarray(series, dtype=float)
    window = int(window)
    if window < 2 or len(x) < max(5, window):
        # Always a bool array, including for empty input.
        return np.zeros(len(x), dtype=bool)

    values = pd.Series(x)
    # min_periods may not exceed the window, otherwise pandas raises.
    rolling = values.rolling(window, min_periods=min(5, window))
    # A zero spread would divide by zero; treat those points as "no score".
    spread = rolling.std(ddof=1).replace(0, np.nan)
    zscores = (values - rolling.mean()) / spread
    return (zscores.abs() >= z).to_numpy(dtype=bool)
def changepoints(series, penalty=6):
    """Detect change-points in *series* with PELT (rbf cost).

    Args:
        series: pandas Series; NaN entries are ignored.
        penalty: PELT penalty passed to ``predict``.

    Returns:
        List of index labels, one per detected segment end (the final
        end-of-signal breakpoint is excluded). Empty when fewer than 10
        non-NaN values are available or when prediction fails.
    """
    clean = series.dropna()
    x = clean.values.astype(float)
    if len(x) < 10:
        return []
    algo = rpt.Pelt(model="rbf").fit(x)
    try:
        result = algo.predict(pen=penalty)
    except Exception:
        return []
    # Breakpoints are positions within the NaN-free signal, so they must be
    # mapped through the NaN-free index; using the original index shifts
    # every label whenever the series contains gaps.
    last = len(clean) - 1
    return [clean.index[min(last, i - 1)] for i in result[:-1]]
def _save_fig(fig, name):
    """Write *fig* as a uniquely named PNG under ``charts/`` and close it.

    Returns the path of the written file.
    """
    out_dir = "charts"
    os.makedirs(out_dir, exist_ok=True)
    filename = f"{name}_{uuid.uuid4().hex}.png"
    path = os.path.join(out_dir, filename)
    fig.savefig(path, format="png", dpi=150, bbox_inches="tight")
    plt.close(fig)
    return path
def plot_trend(agg, title="Sentiment Trend", show_ci=True, anomalies=None, cps=None):
    """Render the sentiment trend chart and return the saved PNG path.

    Args:
        agg: Frame whose first column is the per-bucket mean and which has
            an ``ma`` column (plus ``ci_low`` / ``ci_high`` for the band).
        title: Chart title.
        show_ci: Draw the band when both CI columns are present.
        anomalies: Optional boolean mask aligned with ``agg`` rows.
        cps: Optional iterable of x positions for change-point lines.
    """
    fig = plt.figure()
    ax = fig.gca()
    ax.plot(agg.index, agg["ma"], label="Moving Avg")
    ax.plot(agg.index, agg.iloc[:, 0], alpha=0.3, label="Mean")
    if show_ci and "ci_low" in agg and "ci_high" in agg:
        ax.fill_between(agg.index, agg["ci_low"], agg["ci_high"], alpha=0.2, label="95% CI")
    if anomalies is not None and anomalies.any():
        ax.scatter(agg.index[anomalies], agg["ma"][anomalies], marker="x", s=40, label="Anomaly")
    for position, cp in enumerate(cps or []):
        # Label only the first line; underscore-prefixed labels are skipped
        # by the legend, so it shows a single "Change-point" entry.
        label = "Change-point" if position == 0 else "_nolegend_"
        ax.axvline(cp, linestyle="--", alpha=0.6, label=label)
    ax.set_title(title)
    ax.set_ylabel("Sentiment (−1 to 1)")
    ax.set_xlabel("Date")
    ax.legend(loc="best")
    fig.autofmt_xdate()
    return _save_fig(fig, "trend")
def plot_pie(series, title="Sentiment Distribution"):
    """Draw a pie chart of the value counts of *series*; return the PNG path."""
    counts = series.value_counts()
    fig = plt.figure()
    ax = fig.gca()
    ax.pie(counts.values, labels=counts.index, autopct="%1.1f%%", startangle=90)
    ax.set_title(title)
    return _save_fig(fig, "pie")
def top_terms(df_text, top_k=20):
    """Count the most frequent tokens, hashtags and mentions in *df_text*.

    Non-string entries are skipped. Returns three lists of
    ``(term, count)`` pairs: tokens, hashtags, mentions, each limited to
    *top_k* entries.
    """
    from collections import Counter

    token_counts = Counter()
    hashtag_counts = Counter()
    mention_counts = Counter()
    for entry in df_text:
        if not isinstance(entry, str):
            continue
        hashtag_counts.update(tag.lower() for tag in re.findall(r"#\w+", entry))
        mention_counts.update(handle.lower() for handle in re.findall(r"@\w+", entry))
        token_counts.update(tokenize(entry))

    return (
        token_counts.most_common(top_k),
        hashtag_counts.most_common(top_k),
        mention_counts.most_common(top_k),
    )
def ngram_top(df_text, n=2, top_k=15):
    """Return the *top_k* most common n-grams (space-joined) over *df_text*."""
    from collections import Counter

    counts = Counter()
    for entry in df_text:
        words = tokenize(entry)
        counts.update(
            " ".join(words[start:start + n])
            for start in range(len(words) - n + 1)
        )
    return counts.most_common(top_k)
# ------------------ Filters ------------------
def apply_keyword_filter(df, tcol, mode, kw_text):
    """Keep the rows of *df* whose text column matches the keyword filter.

    Args:
        df: Input frame.
        tcol: Name of the text column.
        mode: "Any keyword (OR)", "All keywords (AND)"; any other value
            treats *kw_text* as a single regular expression.
        kw_text: Keywords separated by commas and/or newlines, or a regex.

    Returns:
        ``(filtered_copy, keywords)``. When no usable keyword is supplied
        the result is ``(df.copy(), None)``. Matching is case-insensitive.
    """
    if not isinstance(kw_text, str) or not kw_text.strip():
        return df.copy(), None
    # Split on commas and real newlines. The previous class "[,\\n]" also
    # contained a backslash and the letter "n", which chopped keywords apart.
    kws = [k.strip() for k in re.split(r"[,\n]+", kw_text) if k.strip()]
    if not kws:
        return df.copy(), None

    column = df[tcol]
    # Blank out missing values after stringifying so that they cannot match
    # keywords such as "nan" or "none".
    s = column.astype(str).where(column.notna(), "")

    if mode == "Any keyword (OR)":
        pattern = "|".join(re.escape(k) for k in kws)
        mask = s.str.contains(pattern, case=False, na=False)
    elif mode == "All keywords (AND)":
        mask = pd.Series(True, index=s.index)
        for k in kws:
            mask &= s.str.contains(re.escape(k), case=False, na=False)
    else:  # Regex
        try:
            mask = s.str.contains(kw_text, case=False, na=False, regex=True)
        except Exception:
            # An invalid pattern matches nothing. Kept broad because the
            # exception type depends on the string backend in use.
            mask = pd.Series(False, index=s.index)
    return df[mask].copy(), kws
def _parse_date_bound(value, which):
    """Parse one optional date bound; return None when it is blank."""
    if isinstance(value, str):
        value = value.strip()
    if not value:
        return None
    parsed = pd.to_datetime(value, errors="coerce")
    if pd.isna(parsed):
        raise ValueError(f"Could not parse {which} date: {value!r}")
    return parsed


def apply_date_range(df, dcol, start, end):
    """Restrict *df* to rows whose *dcol* falls between *start* and *end*.

    Args:
        df: Input frame.
        dcol: Date column name; a falsy value disables filtering.
        start: Inclusive lower bound (string or timestamp), or blank.
        end: Inclusive upper bound, or blank. A bound without a
            time-of-day covers that entire day.

    Returns:
        *df* itself when nothing is filtered, otherwise the matching rows.
        Rows whose date cannot be parsed are dropped once a bound applies.

    Raises:
        ValueError: If a non-blank bound cannot be parsed as a date.
    """
    if not dcol:
        return df
    start_dt = _parse_date_bound(start, "start")
    end_dt = _parse_date_bound(end, "end")
    if start_dt is None and end_dt is None:
        return df

    # Parse the column once and reuse it for both bounds.
    dates = pd.to_datetime(df[dcol], errors="coerce")
    keep = np.ones(len(df), dtype=bool)
    if start_dt is not None:
        keep &= (dates >= start_dt).to_numpy()
    if end_dt is not None:
        if end_dt == end_dt.normalize():
            # "YYYY-MM-DD" parses to midnight; compare against the start of
            # the next day so the whole end day is included.
            keep &= (dates < end_dt + pd.Timedelta(days=1)).to_numpy()
        else:
            keep &= (dates <= end_dt).to_numpy()
    return df[keep]
# ------------------ PDF Report ------------------
def _draw_wrapped_text(c, text, x, y, max_width_cm=17, leading=14):
    """Flow *text* as a wrapped paragraph into a frame on canvas *c*.

    Args:
        c: ReportLab canvas to draw on.
        text: Paragraph text; line breaks become ``<br/>`` tags.
        x, y: Frame origin in centimetres.
        max_width_cm: Frame width in centimetres.
        leading: Line spacing in points.

    NOTE(review): the frame is 100 cm tall starting at ``y``, so content is
    laid out from the top of that tall frame -- confirm this places the text
    where callers expect.
    """
    from reportlab.lib import colors
    from reportlab.lib.styles import ParagraphStyle
    from reportlab.lib.units import cm
    from reportlab.platypus import Frame, Paragraph

    style = ParagraphStyle(
        name="Body",
        fontName="Helvetica",
        fontSize=10,
        leading=leading,
        textColor=colors.black,
    )
    frame = Frame(x * cm, y * cm, max_width_cm * cm, 100 * cm, showBoundary=0)
    # Convert real line breaks. The old code only replaced the two-character
    # sequence backslash + "n"; that form is still honoured for compatibility.
    markup = (
        text.replace("\r\n", "\n")
        .replace("\\n", "\n")
        .replace("\n", "<br/>")
    )
    frame.addFromList([Paragraph(markup, style)], c)
def _pdf_image_page(c, heading, image_path, bottom, page_w, page_h):
    """Emit one page with *heading* and the image, if the image file exists."""
    if not (image_path and os.path.exists(image_path)):
        return
    c.setFont("Helvetica-Bold", 12)
    c.drawString(2 * cm, page_h - 2 * cm, heading)
    c.drawImage(
        ImageReader(image_path),
        2 * cm,
        bottom,
        width=page_w - 4 * cm,
        height=page_h - bottom - 3 * cm,
        preserveAspectRatio=True,
        anchor='c',
    )
    c.showPage()


def _pdf_count_rows(c, pairs, y, page_h):
    """Draw "- term: count" rows downward from *y*; return the next free y."""
    for term, count in pairs:
        if y < 3 * cm:
            c.showPage()
            # showPage() discards the font, so restore it on the new page.
            c.setFont("Helvetica", 10)
            y = page_h - 2 * cm
        c.drawString(2.8 * cm, y, f"- {term}: {count}")
        y -= 0.45 * cm
    return y


def build_pdf_report(out_path, title, meta, trend_img, pie_img, terms, ngrams):
    """Write the multi-page PDF report and return *out_path*.

    Args:
        out_path: Destination file path.
        title: Report title drawn on the cover page.
        meta: Iterable of text lines listed under the title.
        trend_img: Path of the trend chart image (page skipped if missing).
        pie_img: Path of the pie chart image (page skipped if missing).
        terms: Mapping of section title to ``(term, count)`` pairs; at most
            25 pairs per section are printed.
        ngrams: ``(ngram, count)`` pairs; at most 25 are printed.
    """
    c = canvas.Canvas(out_path, pagesize=A4)
    W, H = A4

    # Cover
    c.setFont("Helvetica-Bold", 16)
    c.drawString(2 * cm, H - 2 * cm, title)
    c.setFont("Helvetica", 10)
    y = H - 3 * cm
    for line in meta:
        c.drawString(2 * cm, y, line)
        y -= 0.6 * cm
    c.showPage()

    # Charts
    _pdf_image_page(c, "Sentiment Trend", trend_img, 4 * cm, W, H)
    _pdf_image_page(c, "Sentiment Distribution", pie_img, 6 * cm, W, H)

    # Terms
    c.setFont("Helvetica-Bold", 12)
    c.drawString(2 * cm, H - 2 * cm, "Top Terms / Hashtags / Mentions")
    y = H - 3 * cm
    for sec_title, pairs in terms.items():
        if y < 3.5 * cm:
            # Not enough room for a section title plus one row.
            c.showPage()
            y = H - 2 * cm
        c.setFont("Helvetica-Bold", 11)
        c.drawString(2 * cm, y, sec_title)
        y -= 0.5 * cm
        c.setFont("Helvetica", 10)
        y = _pdf_count_rows(c, pairs[:25], y, H) - 0.3 * cm

    # Bigrams get their own page; without this break the heading and rows
    # were drawn on top of the terms page.
    c.showPage()
    c.setFont("Helvetica-Bold", 12)
    c.drawString(2 * cm, H - 2 * cm, "Top Bigrams")
    c.setFont("Helvetica", 10)
    _pdf_count_rows(c, ngrams[:25], H - 3 * cm, H)

    c.save()
    return out_path
# ------------------ Gradio UI ------------------
def _signed_sentiment(label, confidence):
    """Map a classifier label and its confidence onto a signed score.

    Handles both named labels (containing "NEG"/"POS") and the generic
    LABEL_0 / LABEL_2 names, which the fallback checkpoint is understood to
    use for negative / positive. Everything else scores 0.0.
    """
    tag = str(label).upper()
    if "NEG" in tag or tag == "LABEL_0":
        return -confidence
    if "POS" in tag or tag == "LABEL_2":
        return confidence
    return 0.0


def _upload_path(upload):
    """Return the path of an upload given as a path string or an object with ``.name``."""
    return getattr(upload, "name", upload)


with gr.Blocks(title="Advanced Sentiment Trend Analyzer") as demo:
    gr.Markdown("# 📈 Advanced Customer Sentiment Trend Analyzer\nIndustry-grade tool for tracking sentiment over time using Sentiment140 or similar datasets.")
    with gr.Row():
        with gr.Column():
            file = gr.File(label="Upload Sentiment140 CSV (or similar). 6 columns expected.", file_count="single", file_types=[".csv"])
            engine = gr.Radio(choices=["VADER (fast)", "RoBERTa (accurate)"], value="VADER (fast)", label="Sentiment Engine")
            text_col = gr.Dropdown(label="Text column", choices=[], value=None)
            date_col = gr.Dropdown(label="Date column", choices=[], value=None, allow_custom_value=True)
            gr.Markdown("### Filters")
            kw_text = gr.Textbox(label="Keyword filter (comma-separated OR regex)", placeholder="e.g., refund, delayed OR ^outage|downtime", lines=2)
            kw_mode = gr.Radio(choices=["Any keyword (OR)", "All keywords (AND)", "Regex"], value="Any keyword (OR)", label="Keyword mode")
            start_date = gr.Textbox(label="Start date (YYYY-MM-DD)", placeholder="e.g., 2009-04-06")
            end_date = gr.Textbox(label="End date (YYYY-MM-DD)", placeholder="e.g., 2009-04-20")
            gr.Markdown("### Time Series")
            agg_freq = gr.Radio(choices=["D","W","M"], value="D", label="Aggregate by (D/W/M)")
            ma_window = gr.Slider(3, 60, value=7, step=1, label="Moving average window (days)")
            show_ci = gr.Checkbox(value=True, label="Show 95% confidence band")
            z_window = gr.Slider(7, 90, value=21, step=1, label="Anomaly rolling window")
            z_thresh = gr.Slider(1.5, 4.0, value=2.5, step=0.1, label="Anomaly z-score threshold")
            cp_penalty = gr.Slider(2, 20, value=6, step=1, label="Change-point penalty (higher=fewer)")
            gr.Markdown("### Insights")
            top_k = gr.Slider(5, 50, value=20, step=1, label="Top tokens/hashtags/mentions")
            gen_ngrams = gr.Checkbox(value=True, label="Show Top Bigrams")
            run = gr.Button("Run Analysis 🚀", variant="primary")
        with gr.Column():
            trend_img = gr.Image(label="Trend Chart", type="filepath")
            pie_img = gr.Image(label="Sentiment Distribution", type="filepath")
            terms_md = gr.Markdown(label="Top Terms / Hashtags / Mentions")
            ngrams_md = gr.Markdown(label="Top Bigrams")
            debug_md = gr.Markdown(label="Debug Info")
            export = gr.File(label="Download Enriched CSV")
            pdf_out = gr.File(label="Download PDF Report")

    def on_upload(f):
        """Fill the column dropdowns from the uploaded CSV's columns."""
        if f is None:
            return gr.update(choices=[], value=None), gr.update(choices=[], value=None)
        df = coerce_sentiment140(read_csv_safe(_upload_path(f)))
        cols = df.columns.tolist()
        text_guess = "text" if "text" in cols else (cols[-1] if cols else None)
        date_guess = "date" if "date" in cols else None
        return gr.update(choices=cols, value=text_guess), gr.update(choices=cols, value=date_guess)

    file.change(on_upload, inputs=[file], outputs=[text_col, date_col])

    def run_pipeline(f, eng, tcol, dcol, kwtext, kwmode, sd, ed, freq, maw, showci, zwin, zthr, cpp, topk, want_ngrams):
        """Run filtering, scoring, trend analysis and exports for one request.

        Returns the trend chart path, pie chart path, terms markdown,
        bigram markdown, debug markdown, enriched CSV path and PDF path, in
        the order of the components wired to the Run button.

        Raises:
            gr.Error: For invalid selections, empty filter results, or any
                unexpected failure (whose traceback is printed to stderr).
        """
        import tempfile

        if f is None:
            raise gr.Error("Please upload a CSV.")
        try:
            df = coerce_sentiment140(read_csv_safe(_upload_path(f)))
            cols = df.columns.tolist()
            if tcol not in cols:
                raise gr.Error(f"Text column '{tcol}' not in {cols}")
            # Validate the date column before the expensive scoring step.
            if not dcol:
                raise gr.Error("Please choose a date column for trend analysis.")
            if dcol not in cols:
                raise gr.Error(f"Date column '{dcol}' not in {cols}")

            # Parse date column early for filters
            df[dcol] = pd.to_datetime(df[dcol], errors="coerce")
            # Keyword filter
            df, used_kws = apply_keyword_filter(df, tcol, kwmode, kwtext)
            # Date range filter; copy so new columns are not set on a slice
            df = apply_date_range(df, dcol, sd, ed).copy()
            if df.empty:
                raise gr.Error("No rows after applying filters. Relax filters or clear them.")

            # Scoring
            if eng.startswith("VADER"):
                df["_score"] = df[tcol].astype(str).apply(vader_score)
            else:
                pipe = get_roberta_pipeline()
                texts = df[tcol].astype(str).tolist()
                batch = 64
                scores = []
                for i in range(0, len(texts), batch):
                    for r in pipe(texts[i:i + batch], truncation=True):
                        scores.append(_signed_sentiment(r["label"], float(r["score"])))
                df["_score"] = scores
            df["_label"] = df["_score"].apply(classify_label)

            # Trend analysis
            agg = aggregate_ts(df, dcol, "_score", freq=freq, ma_window=int(maw), ci=showci)
            anoms = rolling_z_anomalies(agg["ma"], window=int(zwin), z=float(zthr))
            cps = changepoints(agg["ma"], penalty=int(cpp))
            trend_path = plot_trend(agg, title=f"Sentiment Trend ({eng}, {freq}-agg, MA={maw})", show_ci=showci, anomalies=anoms, cps=cps)
            pie_path = plot_pie(df["_label"], title="Overall Sentiment Distribution")

            # Terms
            tok_top, hash_top, ment_top = top_terms(df[tcol], top_k=int(topk))
            terms_lines = ["### Top Tokens", ""] + [f"- {w}: {c}" for w, c in tok_top]
            terms_lines += ["", "### Top Hashtags", ""] + [f"- {w}: {c}" for w, c in hash_top]
            terms_lines += ["", "### Top Mentions", ""] + [f"- {w}: {c}" for w, c in ment_top]
            terms_text = "\n".join(terms_lines)

            # N-grams
            if want_ngrams:
                ng_list = ngram_top(df[tcol], n=2, top_k=15)
                ngrams_text = "### Top Bigrams\n\n" + "\n".join([f"- {w}: {c}" for w, c in ng_list])
            else:
                ng_list = []
                ngrams_text = "### Top Bigrams\n\n(Disabled)"

            # Each run writes into its own temp directory so concurrent
            # sessions cannot overwrite each other's downloads.
            out_dir = tempfile.mkdtemp(prefix="sentiment_")

            # Export CSV
            export_path = os.path.join(out_dir, "enriched_sentiment.csv")
            df.to_csv(export_path, index=False)

            # Build PDF
            meta = [
                f"Engine: {eng}",
                f"Rows (after filters): {len(df)}",
                f"Date agg: {freq}, MA window: {maw}, CI: {bool(showci)}",
                f"Anomaly window: {zwin}, z-threshold: {zthr}, CP penalty: {cpp}",
                f"Filters: keywords={kwtext or 'None'} mode={kwmode}; date_range={sd or 'N/A'} to {ed or 'N/A'}",
            ]
            terms_dict = {"Top Tokens": tok_top, "Top Hashtags": hash_top, "Top Mentions": ment_top}
            pdf_path = os.path.join(out_dir, "sentiment_report.pdf")
            build_pdf_report(pdf_path, "Customer Sentiment Trend Report", meta, trend_path, pie_path, terms_dict, ng_list)

            dbg = "#### Data shape\n" + str(df.shape) + "\n\n#### Columns\n" + str(df.dtypes) + "\n"
            return trend_path, pie_path, terms_text, ngrams_text, dbg, export_path, pdf_path
        except gr.Error:
            # Already a user-facing message; do not re-wrap it.
            raise
        except Exception as e:
            print(traceback.format_exc(), file=sys.stderr)
            raise gr.Error(f"RuntimeError: {type(e).__name__}: {e}") from e

    run.click(
        run_pipeline,
        inputs=[file, engine, text_col, date_col, kw_text, kw_mode, start_date, end_date, agg_freq, ma_window, show_ci, z_window, z_thresh, cp_penalty, top_k, gen_ngrams],
        outputs=[trend_img, pie_img, terms_md, ngrams_md, debug_md, export, pdf_out],
    )
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from $PORT, defaulting to 7860.
    listen_port = int(os.environ.get("PORT", "7860"))
    demo.launch(server_port=listen_port, server_name="0.0.0.0")