# -*- coding: utf-8 -*-
"""News_RAG_Chatbot_Regional.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1ponOTh-xbMXz2dwwW7LhYlT7q-dgyyLy
"""

import re

import requests
import feedparser
from bs4 import BeautifulSoup
from datetime import timezone
from dateutil import parser as dateparser
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import faiss
from transformers import GPT2LMHeadModel, GPT2Tokenizer

# Load embedding + GPT-2 models (same as notebook)
embed_model = SentenceTransformer("all-MiniLM-L6-v2")
tokenizer = GPT2Tokenizer.from_pretrained("openai-community/gpt2-large")
gpt2 = GPT2LMHeadModel.from_pretrained("openai-community/gpt2-large")
tokenizer.pad_token = tokenizer.eos_token
print("Models loaded.")


def clean_text(raw_html: str) -> str:
    """
    Clean RSS summary / HTML content:
    - remove tags
    - drop images (and their alt text/captions)
    - collapse whitespace
    """
    if not raw_html:
        return ""
    soup = BeautifulSoup(raw_html, "html.parser")
    for img in soup.find_all("img"):
        img.decompose()
    text = soup.get_text(separator=" ")
    text = re.sub(r"\s+", " ", text).strip()
    return text


def safe_feedparse(url, timeout=5):
    """Fetch RSS safely with timeout and error handling."""
    try:
        resp = requests.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
        resp.raise_for_status()
        return feedparser.parse(resp.text)
    except Exception as e:
        print("Failed to fetch:", url, "→", e)
        return feedparser.parse("")


def fetch_articles_from_rss(source_name, feed_url, max_articles=20):
    feed = safe_feedparse(feed_url)
    print(source_name, "| entries in feed:", len(feed.entries))

    docs = []
    for entry in feed.entries[:max_articles]:
        title = entry.get("title", "").strip()
        summary = entry.get("summary", "").strip()
        url = entry.get("link", "")

        clean_summary = clean_text(summary)
        text = clean_summary or title
        if not text:
            continue

        docs.append({
            "source": source_name,
            "title": title,
            "text": text,
            "url": url,
            "published": entry.get("published", ""),
        })
    return docs


NEWS_SOURCES = {
    "Global News BC": "https://globalnews.ca/bc/feed/",
    "CBC BC": "https://www.cbc.ca/webfeed/rss/rss-canada-britishcolumbia",
    "The Province": "https://theprovince.com/feed",
    "CBC Canada": "https://www.cbc.ca/webfeed/rss/rss-canada",
    "CNN World": "http://rss.cnn.com/rss/edition_world.rss",
    "BBC World": "http://feeds.bbci.co.uk/news/world/rss.xml",
}

all_documents = []
for name, url in NEWS_SOURCES.items():
    docs = fetch_articles_from_rss(name, url, max_articles=50)
    print("  → Retrieved:", len(docs))
    all_documents.extend(docs)

print("Total articles:", len(all_documents))

# Just evaluating this in the original notebook; harmless here:
len(all_documents), all_documents[0] if all_documents else None

corpus_texts = [doc["title"] + "\n\n" + doc["text"] for doc in all_documents]
corpus_embeddings = embed_model.encode(corpus_texts, convert_to_numpy=True).astype("float32")

embedding_dim = corpus_embeddings.shape[1]
index = faiss.IndexFlatL2(embedding_dim)
index.add(corpus_embeddings)

print("FAISS index ready. Documents indexed:", len(corpus_texts))
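# --- Illustrative sketch (not in the original notebook) ----------------------
# The FAISS index built above is not queried directly later on (the chatbot
# uses a separate NearestNeighbors index over the BC-only subset), but it can
# be searched like this. `faiss_search` is a hypothetical helper name added
# here only for illustration.
def faiss_search(query, k=3):
    """Return the top-k documents across ALL sources from the FAISS index."""
    if not all_documents:
        return []
    q_emb = embed_model.encode([query], convert_to_numpy=True).astype("float32")
    distances, indices = index.search(q_emb, min(k, len(all_documents)))
    return [all_documents[i] for i in indices[0] if 0 <= i < len(all_documents)]

# Example usage (commented out, in the style of the notebook's sample cells):
# for doc in faiss_search("wildfire evacuation", k=3):
#     print(doc["source"], "-", doc["title"])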
Documents indexed:", len(corpus_texts)) REGION_KEYWORDS = { "vancouver": [ "vancouver", "downtown vancouver", "granville", "kitsilano", "mount pleasant", "gastown", "yvr" ], "bc": [ "b.c.", "british columbia", "vancouver", "surrey", "burnaby", "richmond", "delta", "coquitlam", "langley", "abbotsford", "nanaimo", "kelowna", "kamloops", "prince george", "cranbrook", ], } def is_bc_news(doc): title = doc.get("title", "").lower() summary = doc.get("text", "").lower() bc_keywords = [ "british columbia", "b.c.", " bc ", "vancouver", "surrey", "burnaby", "richmond", "coquitlam", "delta", "new westminster", "north vancouver", "west vancouver", "langley", "abbotsford", "chilliwack", "victoria", "nanaimo", "kelowna", "kamloops", "lytton", "cranbrook", ] return any(k in title or k in summary for k in bc_keywords) bc_documents = [d for d in all_documents if is_bc_news(d)] print("Raw documents loaded:", len(all_documents)) print("BC / Vancouver filtered documents:", len(bc_documents)) from sklearn.neighbors import NearestNeighbors bc_texts_for_index = [ (d.get("title", "") + " " + d.get("text", "")).strip() for d in bc_documents ] if bc_texts_for_index: bc_embeddings = embed_model.encode(bc_texts_for_index, convert_to_numpy=True) nn_model = NearestNeighbors(metric="cosine") nn_model.fit(bc_embeddings) print("Embeddings shape:", bc_embeddings.shape) else: bc_embeddings = None nn_model = None print("No BC docs available for indexing.") print("Raw documents loaded:", len(all_documents)) bc_documents = [d for d in all_documents if is_bc_news(d)] print("BC / Vancouver filtered documents:", len(bc_documents)) def doc_matches_region(doc, region: str) -> bool: """ Return True if this document looks related to the region. Uses simple keyword search over title + text. """ if not region: return True region = region.lower() keywords = REGION_KEYWORDS.get(region, [region]) haystack = (doc.get("title", "") + " " + doc.get("text", "")).lower() return any(kw in haystack for kw in keywords) def retrieve_docs(query, k=3): """ Retrieve top-k BC/Vancouver documents for a query using cosine similarity. Uses bc_documents + bc_embeddings + nn_model. """ if not bc_documents or nn_model is None or bc_embeddings is None: return [] q_emb = embed_model.encode([query], convert_to_numpy=True) k_eff = min(k, len(bc_documents)) distances, indices = nn_model.kneighbors(q_emb, n_neighbors=k_eff) docs = [] for dist, idx in zip(distances[0], indices[0]): if 0 <= idx < len(bc_documents): doc = bc_documents[idx].copy() doc["distance"] = float(dist) docs.append(doc) return docs def extractive_summary(text, n_sentences=2): """ Extractive summary using sentence embeddings. Cleans HTML, avoids tiny fragments, keeps sentence order. 
""" if not text: return "" text_clean = re.sub(r"<[^>]+>", " ", text) text_clean = re.sub(r'\s+', ' ', text_clean).strip() sentences = re.split(r'(?<=[.!?])\s+', text_clean) sentences = [s.strip() for s in sentences if len(s.strip()) > 5] if not sentences: return "" if len(sentences) <= n_sentences: return " ".join(sentences) sent_embs = embed_model.encode(sentences, convert_to_numpy=True) doc_emb = embed_model.encode([text_clean], convert_to_numpy=True)[0] sims = cosine_similarity(sent_embs, doc_emb.reshape(1, -1)).flatten() top_idx = np.argsort(sims)[-n_sentences:] top_idx = sorted(top_idx) return " ".join(sentences[i] for i in top_idx) def summarize_doc(doc, n_sentences=2): source = doc["source"] title = doc["title"] text = doc["text"] url = doc.get("url", "") short = extractive_summary(text, n_sentences=n_sentences) if not short: return "No content to summarize." line = f"{short} (Source: {source} — {title})" if url: line += f"\nLink: {url}" return line # Sample in notebook: # sample = bc_documents[0] # print(summarize_doc(sample)) def get_published_datetime(doc): pub = doc.get("published", "") if not pub: return None try: dt = dateparser.parse(pub) if dt is None: return None if dt.tzinfo is not None: dt = dt.astimezone(timezone.utc).replace(tzinfo=None) return dt except Exception: return None def summarize_latest_news(n=5): dated_docs = [] for d in bc_documents: dt = get_published_datetime(d) if dt is not None: dated_docs.append((dt, d)) if not dated_docs: return "No recent articles available." dated_docs.sort(key=lambda x: x[0], reverse=True) top_docs = [d for _, d in dated_docs[:n]] summaries = [] for doc in top_docs: summaries.append(summarize_doc(doc, n_sentences=2)) if not summaries: return "No recent articles available." return "\n\n".join(summaries) # print(summarize_latest_news(n=5)) def search_articles(query, k=5): retrieved = retrieve_docs(query, k=k) if len(retrieved) == 0: print("No relevant news found for this query.") return [] for idx, doc in enumerate(retrieved): print(f"[{idx}] {doc['title']} ({doc['source']})") if doc.get("published"): print(" Published:", doc["published"]) if doc.get("url"): print(" Link:", doc["url"]) print() return retrieved # candidates = search_articles("pipeline northern BC", k=5) def tidy_summary(text: str) -> str: """ Light cleanup for readability. - Fix common run-ons like 'says The' → 'says. The' - Collapse weird extra spaces """ text = re.sub(r"\b(says|said)\s+([A-Z])", r"\1. \2", text) text = re.sub(r"\s+", " ", text).strip() return text def rewrite_with_gpt2_xl(extractive_text, max_new_tokens=80): """ Use GPT-2 XL to rewrite an extractive summary into a short, fluent paragraph. We tell it explicitly NOT to add new information. """ text = (extractive_text or "").strip() if len(text) < 10: return extractive_text prompt = ( "You are a news summarization assistant.\n" "Rewrite the following text as a concise local news summary (2–3 sentences).\n" "Keep all facts the same. 
        "TEXT:\n"
        f"{text}\n\n"
        "SUMMARY:"
    )

    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=512,
        padding=True,
    )

    output_ids = gpt2.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        repetition_penalty=1.2,
        pad_token_id=tokenizer.eos_token_id,
    )

    generated = output_ids[0][inputs["input_ids"].shape[1]:]
    summary = tokenizer.decode(generated, skip_special_tokens=True).strip()

    if len(summary) < 20:
        return extractive_text
    return summary


def fused_summary_hybrid(docs, n_sentences=4, use_generative=True):
    """
    1) Extractive summarization over retrieved docs (factual).
    2) Optional GPT-2 Large rewrite to improve fluency.
    3) Append source list.
    """
    combined_text = " ".join(d["text"] for d in docs)
    extracted = extractive_summary(combined_text, n_sentences=n_sentences)
    extracted = re.sub(r"\s+", " ", extracted).strip()

    if use_generative:
        rewritten = rewrite_with_gpt2_xl(extracted, max_new_tokens=96)
    else:
        rewritten = extracted

    sources_block = "\n".join([
        f"- {d['source']} — {d['title']} ({d.get('url', '')})"
        for d in docs
    ])

    final = f"{rewritten}\n\nSources:\n{sources_block}"
    return final


def fused_summary(docs, n_sentences=4):
    """
    Combine multiple articles into ONE clean extractive summary.
    """
    if not docs:
        return "No relevant news found for this query."

    combined_text = " ".join(d["text"] for d in docs if d.get("text"))
    if not combined_text.strip():
        return "No relevant news content available."

    extracted = extractive_summary(combined_text, n_sentences=n_sentences)
    extracted = re.sub(r"\s+", " ", extracted).strip()

    sources_block = "\n".join([
        f"- {d['source']} — {d['title']} ({d.get('url', '')})"
        for d in docs
    ])

    final = f"{extracted}\n\nSources:\n{sources_block}"
    return final


def find_keyword_matches(query, docs, max_docs=10):
    """
    Return docs that contain at least one non-trivial word from the query
    in their title or text.
    """
    q = query.lower()
    tokens = [w for w in re.split(r"\W+", q) if len(w) > 3]
    if not tokens:
        return []

    matches = []
    for d in docs:
        title = d.get("title", "").lower()
        text = d.get("text", "").lower()
        for t in tokens:
            if t in title or t in text:
                matches.append(d)
                break
    return matches[:max_docs]


STOPWORDS = {
    "what", "happened", "story", "about", "the", "a", "an", "in", "on", "of",
    "for", "to", "with", "recent", "latest", "summarize", "summary", "tell",
    "me", "news", "case", "issue", "situation", "update", "updates",
}


def get_strong_tokens(query: str):
    """
    Extract 'strong' content words from the user query:
    - lowercase
    - at least 4 characters
    - not in a small stopword list
    """
    tokens = re.findall(r"[a-zA-Z]+", query.lower())
    strong = [t for t in tokens if len(t) >= 4 and t not in STOPWORDS]
    return strong


def filter_docs_by_tokens(docs, strong_tokens, min_matches: int = 2):
    """
    Keep only docs that contain at least `min_matches` of the strong tokens
    in their title+text. If strong_tokens is empty, just return docs unchanged.
    """
    if not strong_tokens:
        return docs

    filtered = []
    for d in docs:
        haystack = (d.get("title", "") + " " + d.get("text", "")).lower()
        count = sum(1 for t in strong_tokens if t in haystack)
        if count >= min_matches:
            filtered.append(d)
    return filtered


def title_keyword_search(strong_tokens, max_docs=3):
    """
    Fallback: search ALL documents (BC + others) by keyword overlap in
    TITLE + TEXT. Returns docs sorted by how many strong tokens they match.
    """
""" if not strong_tokens: return [] candidates = [] for d in all_documents: haystack = (d.get("title", "") + " " + d.get("text", "")).lower() score = sum(1 for t in strong_tokens if t in haystack) if score > 0: candidates.append((score, d)) candidates.sort(key=lambda x: x[0], reverse=True) return [d for score, d in candidates[:max_docs]] def safe_retrieve(query, k=3): """ Try keyword-based retrieval first (good for very specific terms like 'pistachio'). If that fails, fall back to embedding-based nearest-neighbour search. If that still fails, try a loose keyword fallback. """ keyword_matches = find_keyword_matches(query, bc_documents, max_docs=20) if keyword_matches: return keyword_matches[:k] docs = retrieve_docs(query, k=k) if docs: return docs keywords = query.lower().split() alt_docs = [] for kw in keywords: alt_docs.extend(retrieve_docs(kw, k=1)) seen = set() unique = [] for d in alt_docs: key = d.get("url") or d.get("title") if key and key not in seen: seen.add(key) unique.append(d) return unique[:k] def summarize_query_with_rag(query, k=3): retrieved = safe_retrieve(query, k=k) if not retrieved: return f"No articles match your query: “{query}”" return fused_summary(retrieved, n_sentences=4) def rewrite_with_gpt2(summary, max_new_tokens=40): """ Use GPT-2 to lightly rewrite the extractive summary for readability. We prepend strong instructions to avoid adding new facts, but GPT-2 can still hallucinate. """ prompt = ( "Rewrite the following news summary in clear, fluent English. " "Do NOT add any new information or details that are not already stated.\n\n" f"SUMMARY:\n{summary}\n\nREWRITE:\n" ) inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True) output_ids = gpt2.generate( **inputs, max_new_tokens=max_new_tokens, do_sample=False, pad_token_id=tokenizer.eos_token_id, ) generated = output_ids[0][inputs["input_ids"].shape[1]:] rewritten = tokenizer.decode(generated, skip_special_tokens=True).strip() return rewritten def answer_news_query(user_query: str, k: int = 3): """ 1) Retrieve top-k docs (semantic search). 2) Filter them using strong tokens from the query. 3) If that fails, do a title-based keyword search over all docs. 4) Fuse remaining docs into one extractive summary. """ docs = safe_retrieve(user_query, k=k) if not docs: return "No relevant news found for this query." strong_tokens = get_strong_tokens(user_query) filtered = filter_docs_by_tokens(docs, strong_tokens, min_matches=2) if filtered: docs_to_summarize = filtered else: title_docs = title_keyword_search(strong_tokens, max_docs=k) if title_docs: docs_to_summarize = title_docs else: docs_to_summarize = docs[:1] final = fused_summary(docs_to_summarize, n_sentences=3) return final def show_sample_headlines(n=15): print(f"Total articles loaded: {len(bc_documents)}\n") for i, d in enumerate(bc_documents[:n]): print(f"[{i}] {d['source']} — {d['title']}") print(f" Link: {d.get('url', '')}") print() def rag_answer(user_query: str, k: int = 3) -> str: """Wrapper used by Gradio app: returns news answer string for a query.""" return answer_news_query(user_query, k=k)