# -*- coding: utf-8 -*-
"""News_RAG_Chatbot_Regional.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1ponOTh-xbMXz2dwwW7LhYlT7q-dgyyLy
"""
import re
import requests
import feedparser
from bs4 import BeautifulSoup
from datetime import timezone
from dateutil import parser as dateparser
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import faiss
from transformers import GPT2LMHeadModel, GPT2Tokenizer

# Load embedding + GPT-2 models (same as notebook).
# Note: the generation checkpoint is GPT-2 Large (openai-community/gpt2-large),
# even though some helper names further down still say "xl".
embed_model = SentenceTransformer("all-MiniLM-L6-v2")
tokenizer = GPT2Tokenizer.from_pretrained("openai-community/gpt2-large")
gpt2 = GPT2LMHeadModel.from_pretrained("openai-community/gpt2-large")
tokenizer.pad_token = tokenizer.eos_token
print("Models loaded.")

def clean_text(raw_html: str) -> str:
    """
    Clean RSS summary / HTML content:
    - remove tags
    - drop images (and their alt text/captions)
    - collapse whitespace
    """
    if not raw_html:
        return ""
    soup = BeautifulSoup(raw_html, "html.parser")
    for img in soup.find_all("img"):
        img.decompose()
    text = soup.get_text(separator=" ")
    text = re.sub(r"\s+", " ", text).strip()
    return text

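# Quick illustrative check (not in the original notebook; the HTML snippet is
# made up to show what clean_text() strips out):
# clean_text('<p>Road closures <img src="x.jpg" alt="road"> in Surrey.</p>')
# -> 'Road closures in Surrey.'
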
def safe_feedparse(url, timeout=5):
    """Fetch RSS safely with timeout and error handling."""
    try:
        resp = requests.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
        resp.raise_for_status()
        return feedparser.parse(resp.text)
    except Exception as e:
        print("Failed to fetch:", url, "→", e)
        return feedparser.parse("")

def fetch_articles_from_rss(source_name, feed_url, max_articles=20):
    feed = safe_feedparse(feed_url)
    print(source_name, "| entries in feed:", len(feed.entries))
    docs = []
    for entry in feed.entries[:max_articles]:
        title = entry.get("title", "").strip()
        summary = entry.get("summary", "").strip()
        url = entry.get("link", "")
        clean_summary = clean_text(summary)
        text = clean_summary or title
        if not text:
            continue
        docs.append({
            "source": source_name,
            "title": title,
            "text": text,
            "url": url,
            "published": entry.get("published", ""),
        })
    return docs

NEWS_SOURCES = {
    "Global News BC": "https://globalnews.ca/bc/feed/",
    "CBC BC": "https://www.cbc.ca/webfeed/rss/rss-canada-britishcolumbia",
    "The Province": "https://theprovince.com/feed",
    "CBC Canada": "https://www.cbc.ca/webfeed/rss/rss-canada",
    "CNN World": "http://rss.cnn.com/rss/edition_world.rss",
    "BBC World": "http://feeds.bbci.co.uk/news/world/rss.xml",
}

all_documents = []
for name, url in NEWS_SOURCES.items():
    docs = fetch_articles_from_rss(name, url, max_articles=50)
    print(" → Retrieved:", len(docs))
    all_documents.extend(docs)

print("Total articles:", len(all_documents))

# Just evaluating this in the original notebook; harmless here:
len(all_documents), all_documents[0] if all_documents else None

corpus_texts = [
    doc["title"] + "\n\n" + doc["text"]
    for doc in all_documents
]
corpus_embeddings = embed_model.encode(corpus_texts, convert_to_numpy=True).astype("float32")
embedding_dim = corpus_embeddings.shape[1]

# Exact L2 index over ALL articles. Note: the retrieval helpers further down
# use a separate sklearn NearestNeighbors index over the BC-filtered subset;
# a sketch of querying this corpus-wide index directly follows below.
index = faiss.IndexFlatL2(embedding_dim)
index.add(corpus_embeddings)
print("FAISS index ready. Documents indexed:", len(corpus_texts))

REGION_KEYWORDS = {
    "vancouver": [
        "vancouver", "downtown vancouver", "granville", "kitsilano",
        "mount pleasant", "gastown", "yvr",
    ],
    "bc": [
        "b.c.", "british columbia", "vancouver", "surrey", "burnaby",
        "richmond", "delta", "coquitlam", "langley", "abbotsford",
        "nanaimo", "kelowna", "kamloops", "prince george", "cranbrook",
    ],
}

def is_bc_news(doc):
    title = doc.get("title", "").lower()
    summary = doc.get("text", "").lower()
    bc_keywords = [
        "british columbia", "b.c.", " bc ",
        "vancouver", "surrey", "burnaby", "richmond",
        "coquitlam", "delta", "new westminster",
        "north vancouver", "west vancouver",
        "langley", "abbotsford", "chilliwack",
        "victoria", "nanaimo", "kelowna", "kamloops",
        "lytton", "cranbrook",
    ]
    return any(k in title or k in summary for k in bc_keywords)

bc_documents = [d for d in all_documents if is_bc_news(d)]
print("Raw documents loaded:", len(all_documents))
print("BC / Vancouver filtered documents:", len(bc_documents))

from sklearn.neighbors import NearestNeighbors

bc_texts_for_index = [
    (d.get("title", "") + " " + d.get("text", "")).strip()
    for d in bc_documents
]

if bc_texts_for_index:
    bc_embeddings = embed_model.encode(bc_texts_for_index, convert_to_numpy=True)
    nn_model = NearestNeighbors(metric="cosine")
    nn_model.fit(bc_embeddings)
    print("Embeddings shape:", bc_embeddings.shape)
else:
    bc_embeddings = None
    nn_model = None
    print("No BC docs available for indexing.")

| print("Raw documents loaded:", len(all_documents)) | |
| bc_documents = [d for d in all_documents if is_bc_news(d)] | |
| print("BC / Vancouver filtered documents:", len(bc_documents)) | |
def doc_matches_region(doc, region: str) -> bool:
    """
    Return True if this document looks related to the region.
    Uses simple keyword search over title + text.
    """
    if not region:
        return True
    region = region.lower()
    keywords = REGION_KEYWORDS.get(region, [region])
    haystack = (doc.get("title", "") + " " + doc.get("text", "")).lower()
    return any(kw in haystack for kw in keywords)

def retrieve_docs(query, k=3):
    """
    Retrieve top-k BC/Vancouver documents for a query using cosine similarity.
    Uses bc_documents + bc_embeddings + nn_model.
    """
    if not bc_documents or nn_model is None or bc_embeddings is None:
        return []
    q_emb = embed_model.encode([query], convert_to_numpy=True)
    k_eff = min(k, len(bc_documents))
    distances, indices = nn_model.kneighbors(q_emb, n_neighbors=k_eff)
    docs = []
    for dist, idx in zip(distances[0], indices[0]):
        if 0 <= idx < len(bc_documents):
            doc = bc_documents[idx].copy()
            doc["distance"] = float(dist)
            docs.append(doc)
    return docs

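# Example usage (not in the original notebook; the query is illustrative).
# "distance" is sklearn's cosine distance (1 - cosine similarity), so smaller
# values mean closer matches:
# for d in retrieve_docs("housing affordability in Vancouver", k=3):
#     print(round(d["distance"], 3), d["title"])
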
def extractive_summary(text, n_sentences=2):
    """
    Extractive summary using sentence embeddings.
    Cleans HTML, avoids tiny fragments, keeps sentence order.
    """
    if not text:
        return ""
    text_clean = re.sub(r"<[^>]+>", " ", text)
    text_clean = re.sub(r"\s+", " ", text_clean).strip()
    sentences = re.split(r"(?<=[.!?])\s+", text_clean)
    sentences = [s.strip() for s in sentences if len(s.strip()) > 5]
    if not sentences:
        return ""
    if len(sentences) <= n_sentences:
        return " ".join(sentences)
    sent_embs = embed_model.encode(sentences, convert_to_numpy=True)
    doc_emb = embed_model.encode([text_clean], convert_to_numpy=True)[0]
    sims = cosine_similarity(sent_embs, doc_emb.reshape(1, -1)).flatten()
    top_idx = np.argsort(sims)[-n_sentences:]
    top_idx = sorted(top_idx)
    return " ".join(sentences[i] for i in top_idx)

def summarize_doc(doc, n_sentences=2):
    source = doc["source"]
    title = doc["title"]
    text = doc["text"]
    url = doc.get("url", "")
    short = extractive_summary(text, n_sentences=n_sentences)
    if not short:
        return "No content to summarize."
    line = f"{short} (Source: {source} — {title})"
    if url:
        line += f"\nLink: {url}"
    return line

# Sample in notebook:
# sample = bc_documents[0]
# print(summarize_doc(sample))

def get_published_datetime(doc):
    pub = doc.get("published", "")
    if not pub:
        return None
    try:
        dt = dateparser.parse(pub)
        if dt is None:
            return None
        if dt.tzinfo is not None:
            dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
        return dt
    except Exception:
        return None

def summarize_latest_news(n=5):
    dated_docs = []
    for d in bc_documents:
        dt = get_published_datetime(d)
        if dt is not None:
            dated_docs.append((dt, d))
    if not dated_docs:
        return "No recent articles available."
    dated_docs.sort(key=lambda x: x[0], reverse=True)
    top_docs = [d for _, d in dated_docs[:n]]
    summaries = []
    for doc in top_docs:
        summaries.append(summarize_doc(doc, n_sentences=2))
    if not summaries:
        return "No recent articles available."
    return "\n\n".join(summaries)

# print(summarize_latest_news(n=5))

def search_articles(query, k=5):
    retrieved = retrieve_docs(query, k=k)
    if len(retrieved) == 0:
        print("No relevant news found for this query.")
        return []
    for idx, doc in enumerate(retrieved):
        print(f"[{idx}] {doc['title']} ({doc['source']})")
        if doc.get("published"):
            print(" Published:", doc["published"])
        if doc.get("url"):
            print(" Link:", doc["url"])
        print()
    return retrieved

# candidates = search_articles("pipeline northern BC", k=5)

def tidy_summary(text: str) -> str:
    """
    Light cleanup for readability.
    - Fix common run-ons like 'says The' → 'says. The'
    - Collapse weird extra spaces
    """
    text = re.sub(r"\b(says|said)\s+([A-Z])", r"\1. \2", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text

def rewrite_with_gpt2_xl(extractive_text, max_new_tokens=80):
    """
    Use the GPT-2 model loaded above (gpt2-large, despite this function's name)
    to rewrite an extractive summary into a short, fluent paragraph.
    We tell it explicitly NOT to add new information.
    """
    text = (extractive_text or "").strip()
    if len(text) < 10:
        return extractive_text
    prompt = (
        "You are a news summarization assistant.\n"
        "Rewrite the following text as a concise local news summary (2–3 sentences).\n"
        "Keep all facts the same. Do NOT add any new information.\n\n"
        "TEXT:\n"
        f"{text}\n\n"
        "SUMMARY:"
    )
    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=512,
        padding=True,
    )
    output_ids = gpt2.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        repetition_penalty=1.2,
        pad_token_id=tokenizer.eos_token_id,
    )
    generated = output_ids[0][inputs["input_ids"].shape[1]:]
    summary = tokenizer.decode(generated, skip_special_tokens=True).strip()
    if len(summary) < 20:
        return extractive_text
    return summary

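# Example usage (not in the original notebook; bc_documents[0] is just a sample
# input). Greedy decoding with gpt2-large is slow on CPU; moving the model to a
# GPU first (e.g. gpt2.to("cuda") and sending the input tensors to the same
# device) is one common option if a GPU is available.
# draft = extractive_summary(bc_documents[0]["text"], n_sentences=3) if bc_documents else ""
# print(rewrite_with_gpt2_xl(draft))
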
def fused_summary_hybrid(docs, n_sentences=4, use_generative=True):
    """
    1) Extractive summarization over retrieved docs (factual).
    2) Optional GPT-2 Large rewrite to improve fluency.
    3) Append source list.
    """
    if not docs:
        return "No relevant news found for this query."
    combined_text = " ".join(d["text"] for d in docs)
    extracted = extractive_summary(combined_text, n_sentences=n_sentences)
    extracted = re.sub(r"\s+", " ", extracted).strip()
    if use_generative:
        rewritten = rewrite_with_gpt2_xl(extracted, max_new_tokens=96)
    else:
        rewritten = extracted
    sources_block = "\n".join([
        f"- {d['source']} — {d['title']} ({d.get('url', '')})"
        for d in docs
    ])
    final = f"{rewritten}\n\nSources:\n{sources_block}"
    return final

def fused_summary(docs, n_sentences=4):
    """
    Combine multiple articles into ONE clean extractive summary.
    """
    if not docs:
        return "No relevant news found for this query."
    combined_text = " ".join(d["text"] for d in docs if d.get("text"))
    if not combined_text.strip():
        return "No relevant news content available."
    extracted = extractive_summary(combined_text, n_sentences=n_sentences)
    extracted = re.sub(r"\s+", " ", extracted).strip()
    sources_block = "\n".join([
        f"- {d['source']} — {d['title']} ({d.get('url', '')})"
        for d in docs
    ])
    final = f"{extracted}\n\nSources:\n{sources_block}"
    return final

def find_keyword_matches(query, docs, max_docs=10):
    """
    Return docs that contain at least one non-trivial word from the query
    in their title or text.
    """
    q = query.lower()
    tokens = [w for w in re.split(r"\W+", q) if len(w) > 3]
    if not tokens:
        return []
    matches = []
    for d in docs:
        title = d.get("title", "").lower()
        text = d.get("text", "").lower()
        for t in tokens:
            if t in title or t in text:
                matches.append(d)
                break
    return matches[:max_docs]

STOPWORDS = {
    "what", "happened", "story", "about", "the", "a", "an",
    "in", "on", "of", "for", "to", "with", "recent", "latest",
    "summarize", "summary", "tell", "me", "news", "case",
    "issue", "situation", "update", "updates",
}

def get_strong_tokens(query: str):
    """
    Extract 'strong' content words from the user query:
    - lowercase
    - at least 4 characters
    - not in a small stopword list
    """
    tokens = re.findall(r"[a-zA-Z]+", query.lower())
    strong = [
        t for t in tokens
        if len(t) >= 4 and t not in STOPWORDS
    ]
    return strong

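# Illustrative example (not in the original notebook):
# get_strong_tokens("What happened with the pipeline protest in northern BC?")
# -> ['pipeline', 'protest', 'northern']
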
def filter_docs_by_tokens(docs, strong_tokens, min_matches: int = 2):
    """
    Keep only docs that contain at least `min_matches` of the strong tokens
    in their title+text. If strong_tokens is empty, just return docs unchanged.
    """
    if not strong_tokens:
        return docs
    filtered = []
    for d in docs:
        haystack = (d.get("title", "") + " " + d.get("text", "")).lower()
        count = sum(1 for t in strong_tokens if t in haystack)
        if count >= min_matches:
            filtered.append(d)
    return filtered

def title_keyword_search(strong_tokens, max_docs=3):
    """
    Fallback: search ALL documents (BC + others) by keyword overlap
    in TITLE + TEXT. Returns docs sorted by how many strong tokens they match.
    """
    if not strong_tokens:
        return []
    candidates = []
    for d in all_documents:
        haystack = (d.get("title", "") + " " + d.get("text", "")).lower()
        score = sum(1 for t in strong_tokens if t in haystack)
        if score > 0:
            candidates.append((score, d))
    candidates.sort(key=lambda x: x[0], reverse=True)
    return [d for score, d in candidates[:max_docs]]

def safe_retrieve(query, k=3):
    """
    Try keyword-based retrieval first (good for very specific terms like 'pistachio').
    If that fails, fall back to embedding-based nearest-neighbour search.
    If that still fails, try a loose keyword fallback.
    """
    keyword_matches = find_keyword_matches(query, bc_documents, max_docs=20)
    if keyword_matches:
        return keyword_matches[:k]
    docs = retrieve_docs(query, k=k)
    if docs:
        return docs
    keywords = query.lower().split()
    alt_docs = []
    for kw in keywords:
        alt_docs.extend(retrieve_docs(kw, k=1))
    seen = set()
    unique = []
    for d in alt_docs:
        key = d.get("url") or d.get("title")
        if key and key not in seen:
            seen.add(key)
            unique.append(d)
    return unique[:k]

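# Example usage (query strings are illustrative, not from the original notebook):
# safe_retrieve("pistachio recall", k=3)        # keyword path fires if the term appears verbatim
# safe_retrieve("transit funding debate", k=3)  # otherwise falls back to embedding search
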
def summarize_query_with_rag(query, k=3):
    retrieved = safe_retrieve(query, k=k)
    if not retrieved:
        return f"No articles match your query: “{query}”"
    return fused_summary(retrieved, n_sentences=4)

def rewrite_with_gpt2(summary, max_new_tokens=40):
    """
    Use GPT-2 to lightly rewrite the extractive summary for readability.
    We prepend strong instructions to avoid adding new facts, but GPT-2 can still hallucinate.
    """
    prompt = (
        "Rewrite the following news summary in clear, fluent English. "
        "Do NOT add any new information or details that are not already stated.\n\n"
        f"SUMMARY:\n{summary}\n\nREWRITE:\n"
    )
    inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True)
    output_ids = gpt2.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        pad_token_id=tokenizer.eos_token_id,
    )
    generated = output_ids[0][inputs["input_ids"].shape[1]:]
    rewritten = tokenizer.decode(generated, skip_special_tokens=True).strip()
    return rewritten

def answer_news_query(user_query: str, k: int = 3):
    """
    1) Retrieve top-k docs via safe_retrieve (keyword match first, then semantic search).
    2) Filter them using strong tokens from the query.
    3) If that fails, do a keyword search over ALL docs (title + text).
    4) Fuse remaining docs into one extractive summary.
    """
    docs = safe_retrieve(user_query, k=k)
    if not docs:
        return "No relevant news found for this query."
    strong_tokens = get_strong_tokens(user_query)
    filtered = filter_docs_by_tokens(docs, strong_tokens, min_matches=2)
    if filtered:
        docs_to_summarize = filtered
    else:
        title_docs = title_keyword_search(strong_tokens, max_docs=k)
        if title_docs:
            docs_to_summarize = title_docs
        else:
            docs_to_summarize = docs[:1]
    final = fused_summary(docs_to_summarize, n_sentences=3)
    return final

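# End-to-end sample in the style of the commented notebook cells above
# (the query is illustrative):
# print(answer_news_query("wildfire evacuations in the BC interior", k=3))
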
def show_sample_headlines(n=15):
    print(f"BC / Vancouver articles loaded: {len(bc_documents)}\n")
    for i, d in enumerate(bc_documents[:n]):
        print(f"[{i}] {d['source']} — {d['title']}")
        print(f" Link: {d.get('url', '')}")
        print()

def rag_answer(user_query: str, k: int = 3) -> str:
    """Wrapper used by Gradio app: returns news answer string for a query."""
    return answer_news_query(user_query, k=k)

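# The Gradio app itself is not included in this file. A minimal sketch of how
# rag_answer() could be wired up (the labels and layout here are assumptions,
# not the original app code):
#
# import gradio as gr
#
# demo = gr.Interface(
#     fn=rag_answer,
#     inputs=gr.Textbox(label="Ask about BC / Vancouver news"),
#     outputs=gr.Textbox(label="Summary with sources"),
#     title="Regional News RAG Chatbot",
# )
# demo.launch()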