# -*- coding: utf-8 -*-
"""News_RAG_Chatbot_Regional.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1ponOTh-xbMXz2dwwW7LhYlT7q-dgyyLy
"""
import re
import requests
import feedparser
from bs4 import BeautifulSoup
from datetime import timezone
from dateutil import parser as dateparser
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import faiss
from transformers import GPT2LMHeadModel, GPT2Tokenizer
# Load embedding + GPT-2 models (same as notebook)
embed_model = SentenceTransformer("all-MiniLM-L6-v2")
tokenizer = GPT2Tokenizer.from_pretrained("openai-community/gpt2-large")
gpt2 = GPT2LMHeadModel.from_pretrained("openai-community/gpt2-large")
tokenizer.pad_token = tokenizer.eos_token
print("Models loaded.")
def clean_text(raw_html: str) -> str:
"""
Clean RSS summary / HTML content:
- remove tags
- drop images (and their alt text/captions)
- collapse whitespace
"""
if not raw_html:
return ""
soup = BeautifulSoup(raw_html, "html.parser")
for img in soup.find_all("img"):
img.decompose()
text = soup.get_text(separator=" ")
text = re.sub(r"\s+", " ", text).strip()
return text
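# Example (hypothetical HTML snippet, for illustration only):
# clean_text('<p>Crews battle a wildfire <img src="x.jpg" alt="smoke"/> near Kamloops.</p>')
# -> 'Crews battle a wildfire near Kamloops.'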
def safe_feedparse(url, timeout=5):
"""Fetch RSS safely with timeout and error handling."""
try:
resp = requests.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
resp.raise_for_status()
return feedparser.parse(resp.text)
except Exception as e:
print("Failed to fetch:", url, "→", e)
return feedparser.parse("")
def fetch_articles_from_rss(source_name, feed_url, max_articles=20):
feed = safe_feedparse(feed_url)
print(source_name, "| entries in feed:", len(feed.entries))
docs = []
for entry in feed.entries[:max_articles]:
title = entry.get("title", "").strip()
summary = entry.get("summary", "").strip()
url = entry.get("link", "")
clean_summary = clean_text(summary)
text = clean_summary or title
if not text:
continue
docs.append({
"source": source_name,
"title": title,
"text": text,
"url": url,
"published": entry.get("published", "")
})
return docs
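# Example (single feed, using a URL from NEWS_SOURCES below):
# docs = fetch_articles_from_rss("CBC BC", "https://www.cbc.ca/webfeed/rss/rss-canada-britishcolumbia", max_articles=10)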
NEWS_SOURCES = {
"Global News BC": "https://globalnews.ca/bc/feed/",
"CBC BC": "https://www.cbc.ca/webfeed/rss/rss-canada-britishcolumbia",
"The Province": "https://theprovince.com/feed",
"CBC Canada": "https://www.cbc.ca/webfeed/rss/rss-canada",
"CNN World": "http://rss.cnn.com/rss/edition_world.rss",
"BBC World": "http://feeds.bbci.co.uk/news/world/rss.xml",
}
all_documents = []
for name, url in NEWS_SOURCES.items():
docs = fetch_articles_from_rss(name, url, max_articles=50)
print(" → Retrieved:", len(docs))
all_documents.extend(docs)
print("Total articles:", len(all_documents))
# Just evaluating this in the original notebook; harmless here:
len(all_documents), all_documents[0] if all_documents else None
corpus_texts = [
doc["title"] + "\n\n" + doc["text"]
for doc in all_documents
]
corpus_embeddings = embed_model.encode(corpus_texts, convert_to_numpy=True).astype("float32")
embedding_dim = corpus_embeddings.shape[1]
index = faiss.IndexFlatL2(embedding_dim)
index.add(corpus_embeddings)
print("FAISS index ready. Documents indexed:", len(corpus_texts))
REGION_KEYWORDS = {
"vancouver": [
"vancouver", "downtown vancouver", "granville", "kitsilano",
"mount pleasant", "gastown", "yvr"
],
"bc": [
"b.c.", "british columbia", "vancouver", "surrey", "burnaby",
"richmond", "delta", "coquitlam", "langley", "abbotsford",
"nanaimo", "kelowna", "kamloops", "prince george", "cranbrook",
],
}
def is_bc_news(doc):
title = doc.get("title", "").lower()
summary = doc.get("text", "").lower()
bc_keywords = [
"british columbia", "b.c.", " bc ",
"vancouver", "surrey", "burnaby", "richmond",
"coquitlam", "delta", "new westminster",
"north vancouver", "west vancouver",
"langley", "abbotsford", "chilliwack",
"victoria", "nanaimo", "kelowna", "kamloops",
"lytton", "cranbrook",
]
return any(k in title or k in summary for k in bc_keywords)
bc_documents = [d for d in all_documents if is_bc_news(d)]
print("Raw documents loaded:", len(all_documents))
print("BC / Vancouver filtered documents:", len(bc_documents))
from sklearn.neighbors import NearestNeighbors
bc_texts_for_index = [
(d.get("title", "") + " " + d.get("text", "")).strip()
for d in bc_documents
]
if bc_texts_for_index:
bc_embeddings = embed_model.encode(bc_texts_for_index, convert_to_numpy=True)
nn_model = NearestNeighbors(metric="cosine")
nn_model.fit(bc_embeddings)
print("Embeddings shape:", bc_embeddings.shape)
else:
bc_embeddings = None
nn_model = None
print("No BC docs available for indexing.")
print("Raw documents loaded:", len(all_documents))
bc_documents = [d for d in all_documents if is_bc_news(d)]
print("BC / Vancouver filtered documents:", len(bc_documents))
def doc_matches_region(doc, region: str) -> bool:
"""
Return True if this document looks related to the region.
Uses simple keyword search over title + text.
"""
if not region:
return True
region = region.lower()
keywords = REGION_KEYWORDS.get(region, [region])
haystack = (doc.get("title", "") + " " + doc.get("text", "")).lower()
return any(kw in haystack for kw in keywords)
def retrieve_docs(query, k=3):
"""
Retrieve top-k BC/Vancouver documents for a query using cosine similarity.
Uses bc_documents + bc_embeddings + nn_model.
"""
if not bc_documents or nn_model is None or bc_embeddings is None:
return []
q_emb = embed_model.encode([query], convert_to_numpy=True)
k_eff = min(k, len(bc_documents))
distances, indices = nn_model.kneighbors(q_emb, n_neighbors=k_eff)
docs = []
for dist, idx in zip(distances[0], indices[0]):
if 0 <= idx < len(bc_documents):
doc = bc_documents[idx].copy()
doc["distance"] = float(dist)
docs.append(doc)
return docs
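# Example (hypothetical query):
# for d in retrieve_docs("BC Ferries sailing cancellations", k=3):
#     print(d["title"], "| distance:", round(d["distance"], 3))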
def extractive_summary(text, n_sentences=2):
"""
Extractive summary using sentence embeddings.
Cleans HTML, avoids tiny fragments, keeps sentence order.
"""
if not text:
return ""
text_clean = re.sub(r"<[^>]+>", " ", text)
text_clean = re.sub(r'\s+', ' ', text_clean).strip()
sentences = re.split(r'(?<=[.!?])\s+', text_clean)
sentences = [s.strip() for s in sentences if len(s.strip()) > 5]
if not sentences:
return ""
if len(sentences) <= n_sentences:
return " ".join(sentences)
sent_embs = embed_model.encode(sentences, convert_to_numpy=True)
doc_emb = embed_model.encode([text_clean], convert_to_numpy=True)[0]
sims = cosine_similarity(sent_embs, doc_emb.reshape(1, -1)).flatten()
top_idx = np.argsort(sims)[-n_sentences:]
top_idx = sorted(top_idx)
return " ".join(sentences[i] for i in top_idx)
def summarize_doc(doc, n_sentences=2):
source = doc["source"]
title = doc["title"]
text = doc["text"]
url = doc.get("url", "")
short = extractive_summary(text, n_sentences=n_sentences)
if not short:
return "No content to summarize."
line = f"{short} (Source: {source}{title})"
if url:
line += f"\nLink: {url}"
return line
# Sample in notebook:
# sample = bc_documents[0]
# print(summarize_doc(sample))
def get_published_datetime(doc):
pub = doc.get("published", "")
if not pub:
return None
try:
dt = dateparser.parse(pub)
if dt is None:
return None
if dt.tzinfo is not None:
dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
return dt
except Exception:
return None
def summarize_latest_news(n=5):
dated_docs = []
for d in bc_documents:
dt = get_published_datetime(d)
if dt is not None:
dated_docs.append((dt, d))
if not dated_docs:
return "No recent articles available."
dated_docs.sort(key=lambda x: x[0], reverse=True)
top_docs = [d for _, d in dated_docs[:n]]
summaries = []
for doc in top_docs:
summaries.append(summarize_doc(doc, n_sentences=2))
if not summaries:
return "No recent articles available."
return "\n\n".join(summaries)
# print(summarize_latest_news(n=5))
def search_articles(query, k=5):
retrieved = retrieve_docs(query, k=k)
if len(retrieved) == 0:
print("No relevant news found for this query.")
return []
for idx, doc in enumerate(retrieved):
print(f"[{idx}] {doc['title']} ({doc['source']})")
if doc.get("published"):
print(" Published:", doc["published"])
if doc.get("url"):
print(" Link:", doc["url"])
print()
return retrieved
# candidates = search_articles("pipeline northern BC", k=5)
def tidy_summary(text: str) -> str:
"""
Light cleanup for readability.
- Fix common run-ons like 'says The' → 'says. The'
- Collapse weird extra spaces
"""
text = re.sub(r"\b(says|said)\s+([A-Z])", r"\1. \2", text)
text = re.sub(r"\s+", " ", text).strip()
return text
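# Note: tidy_summary is defined here but not called by the pipeline below. Example of
# the run-on fix it applies (hypothetical input):
# tidy_summary("The mayor says The project will go ahead next spring.")
# -> "The mayor says. The project will go ahead next spring."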
def rewrite_with_gpt2_large(extractive_text, max_new_tokens=80):
    """
    Use the GPT-2 model loaded above (gpt2-large) to rewrite an extractive summary
    into a short, fluent paragraph.
We tell it explicitly NOT to add new information.
"""
text = (extractive_text or "").strip()
if len(text) < 10:
return extractive_text
prompt = (
"You are a news summarization assistant.\n"
"Rewrite the following text as a concise local news summary (2–3 sentences).\n"
"Keep all facts the same. Do NOT add any new information.\n\n"
"TEXT:\n"
f"{text}\n\n"
"SUMMARY:"
)
inputs = tokenizer(
prompt,
return_tensors="pt",
truncation=True,
max_length=512,
padding=True,
)
output_ids = gpt2.generate(
**inputs,
max_new_tokens=max_new_tokens,
do_sample=False,
repetition_penalty=1.2,
pad_token_id=tokenizer.eos_token_id,
)
generated = output_ids[0][inputs["input_ids"].shape[1]:]
summary = tokenizer.decode(generated, skip_special_tokens=True).strip()
if len(summary) < 20:
return extractive_text
return summary
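# Example (hypothetical input; greedy decoding, so the rewrite is deterministic,
# though GPT-2 can still drift from the source text, as noted for rewrite_with_gpt2 below):
# rewrite_with_gpt2_large("Crews contained the fire near Kamloops. No injuries were reported.")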
def fused_summary_hybrid(docs, n_sentences=4, use_generative=True):
"""
1) Extractive summarization over retrieved docs (factual).
    2) Optional GPT-2 (gpt2-large) rewrite to improve fluency.
3) Append source list.
"""
combined_text = " ".join(d["text"] for d in docs)
extracted = extractive_summary(combined_text, n_sentences=n_sentences)
extracted = re.sub(r'\s+', ' ', extracted).strip()
if use_generative:
        rewritten = rewrite_with_gpt2_large(extracted, max_new_tokens=96)
else:
rewritten = extracted
sources_block = "\n".join([
f"- {d['source']}{d['title']} ({d.get('url','')})"
for d in docs
])
final = f"{rewritten}\n\nSources:\n{sources_block}"
return final
def fused_summary(docs, n_sentences=4):
"""
Combine multiple articles into ONE clean extractive summary.
"""
if not docs:
return "No relevant news found for this query."
combined_text = " ".join(d["text"] for d in docs if d.get("text"))
if not combined_text.strip():
return "No relevant news content available."
extracted = extractive_summary(combined_text, n_sentences=n_sentences)
extracted = re.sub(r"\s+", " ", extracted).strip()
sources_block = "\n".join([
f"- {d['source']}{d['title']} ({d.get('url', '')})"
for d in docs
])
final = f"{extracted}\n\nSources:\n{sources_block}"
return final
def find_keyword_matches(query, docs, max_docs=10):
"""
Return docs that contain at least one non-trivial word from the query
in their title or text.
"""
q = query.lower()
tokens = [w for w in re.split(r"\W+", q) if len(w) > 3]
if not tokens:
return []
matches = []
for d in docs:
title = d.get("title", "").lower()
text = d.get("text", "").lower()
for t in tokens:
if t in title or t in text:
matches.append(d)
break
return matches[:max_docs]
STOPWORDS = {
"what", "happened", "story", "about", "the", "a", "an",
"in", "on", "of", "for", "to", "with", "recent", "latest",
"summarize", "summary", "tell", "me", "news", "case",
"issue", "situation", "update", "updates"
}
def get_strong_tokens(query: str):
"""
Extract 'strong' content words from the user query:
- lowercase
- at least 4 characters
- not in a small stopword list
"""
tokens = re.findall(r"[a-zA-Z]+", query.lower())
strong = [
t for t in tokens
if len(t) >= 4 and t not in STOPWORDS
]
return strong
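# Example:
# get_strong_tokens("What happened with the Surrey transit strike?")
# -> ["surrey", "transit", "strike"]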
def filter_docs_by_tokens(docs, strong_tokens, min_matches: int = 2):
"""
Keep only docs that contain at least `min_matches` of the strong tokens
in their title+text. If strong_tokens is empty, just return docs unchanged.
"""
if not strong_tokens:
return docs
filtered = []
for d in docs:
haystack = (d.get("title", "") + " " + d.get("text", "")).lower()
count = sum(1 for t in strong_tokens if t in haystack)
if count >= min_matches:
filtered.append(d)
return filtered
def title_keyword_search(strong_tokens, max_docs=3):
"""
Fallback: search ALL documents (BC + others) by keyword overlap
in TITLE + TEXT. Returns docs sorted by how many strong tokens they match.
"""
if not strong_tokens:
return []
candidates = []
for d in all_documents:
haystack = (d.get("title", "") + " " + d.get("text", "")).lower()
score = sum(1 for t in strong_tokens if t in haystack)
if score > 0:
candidates.append((score, d))
candidates.sort(key=lambda x: x[0], reverse=True)
return [d for score, d in candidates[:max_docs]]
def safe_retrieve(query, k=3):
"""
Try keyword-based retrieval first (good for very specific terms like 'pistachio').
If that fails, fall back to embedding-based nearest-neighbour search.
If that still fails, try a loose keyword fallback.
"""
keyword_matches = find_keyword_matches(query, bc_documents, max_docs=20)
if keyword_matches:
return keyword_matches[:k]
docs = retrieve_docs(query, k=k)
if docs:
return docs
keywords = query.lower().split()
alt_docs = []
for kw in keywords:
alt_docs.extend(retrieve_docs(kw, k=1))
seen = set()
unique = []
for d in alt_docs:
key = d.get("url") or d.get("title")
if key and key not in seen:
seen.add(key)
unique.append(d)
return unique[:k]
def summarize_query_with_rag(query, k=3):
retrieved = safe_retrieve(query, k=k)
if not retrieved:
return f"No articles match your query: “{query}”"
return fused_summary(retrieved, n_sentences=4)
def rewrite_with_gpt2(summary, max_new_tokens=40):
"""
Use GPT-2 to lightly rewrite the extractive summary for readability.
We prepend strong instructions to avoid adding new facts, but GPT-2 can still hallucinate.
"""
prompt = (
"Rewrite the following news summary in clear, fluent English. "
"Do NOT add any new information or details that are not already stated.\n\n"
f"SUMMARY:\n{summary}\n\nREWRITE:\n"
)
inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True)
output_ids = gpt2.generate(
**inputs,
max_new_tokens=max_new_tokens,
do_sample=False,
pad_token_id=tokenizer.eos_token_id,
)
generated = output_ids[0][inputs["input_ids"].shape[1]:]
rewritten = tokenizer.decode(generated, skip_special_tokens=True).strip()
return rewritten
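# Note: like tidy_summary above, rewrite_with_gpt2 is not called by answer_news_query
# below; it can be applied manually to an extractive summary. Example (hypothetical input):
# rewrite_with_gpt2("Fire crews remained on scene overnight. The highway has since reopened.")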
def answer_news_query(user_query: str, k: int = 3):
"""
1) Retrieve top-k docs (semantic search).
2) Filter them using strong tokens from the query.
3) If that fails, do a title-based keyword search over all docs.
4) Fuse remaining docs into one extractive summary.
"""
docs = safe_retrieve(user_query, k=k)
if not docs:
return "No relevant news found for this query."
strong_tokens = get_strong_tokens(user_query)
filtered = filter_docs_by_tokens(docs, strong_tokens, min_matches=2)
if filtered:
docs_to_summarize = filtered
else:
title_docs = title_keyword_search(strong_tokens, max_docs=k)
if title_docs:
docs_to_summarize = title_docs
else:
docs_to_summarize = docs[:1]
final = fused_summary(docs_to_summarize, n_sentences=3)
return final
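# Example (hypothetical query):
# print(answer_news_query("latest on the Vancouver port strike", k=3))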
def show_sample_headlines(n=15):
print(f"Total articles loaded: {len(bc_documents)}\n")
for i, d in enumerate(bc_documents[:n]):
print(f"[{i}] {d['source']}{d['title']}")
print(f" Link: {d.get('url', '')}")
print()
def rag_answer(user_query: str, k: int = 3) -> str:
"""Wrapper used by Gradio app: returns news answer string for a query."""
return answer_news_query(user_query, k=k)
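# The Gradio UI lives outside this module; rag_answer is the function it calls.
# A minimal sketch of how it might be wired up (assumes `gradio` is installed;
# the interface details here are illustrative, not the actual app code):
#
# import gradio as gr
# demo = gr.Interface(
#     fn=rag_answer,
#     inputs=gr.Textbox(label="Ask about BC / Vancouver news"),
#     outputs=gr.Textbox(label="Answer"),
#     title="News RAG Chat",
# )
# demo.launch()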