from flask import Flask, request, jsonify, render_template
from urllib.request import Request, urlopen
from bs4 import BeautifulSoup
import nltk
import re
import socket
from urllib.parse import urlparse
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
import numpy as np

# Ensure NLTK data exists
nltk.download("punkt", quiet=True)
nltk.download("punkt_tab", quiet=True)

from nltk.tokenize import word_tokenize, sent_tokenize

app = Flask(__name__)
# -------------------------
# Helper: fetch page safely
# -------------------------
def fetch_page(url, timeout=15):
    """
    Fetch URL content using urllib with a browser-like User-Agent.
    Returns cleaned text or raises an exception on failure.
    """
    req = Request(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/120.0 Safari/537.36"
        },
    )
    resp = urlopen(req, timeout=timeout)
    raw = resp.read()
    soup = BeautifulSoup(raw, "html.parser")
    # remove scripts/styles etc before extracting visible text
    for tag in soup(["script", "style", "noscript", "iframe", "header", "footer"]):
        tag.extract()
    text = soup.get_text(separator=" ")
    text = re.sub(r"\s+", " ", text).strip()
    return text
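# Usage sketch (illustrative only; example.com stands in for any page):
#   text = fetch_page("https://example.com")
#   print(text[:200])  # first 200 characters of the page's visible text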
# -------------------------
# Helper: extract heading tag text
# -------------------------
def extract_heading_text(soup, tag):
    elements = soup.find_all(tag)
    return " ".join(el.get_text(" ", strip=True) for el in elements).strip()
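# Illustrative example (hypothetical markup, not from a live page):
#   soup = BeautifulSoup("<h1>Top story</h1><h1>Second story</h1>", "html.parser")
#   extract_heading_text(soup, "h1")  # -> "Top story Second story"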
# -------------------------
# Clean / normalize text
# -------------------------
def clean_text(t):
    return re.sub(r"\s+", " ", t or "").strip()
# -------------------------
# Summarize (extractive)
# -------------------------
def summarize(text, num_sentences=3):
    sentences = sent_tokenize(text)
    if len(sentences) <= num_sentences:
        return " ".join(sentences)
    try:
        # Score each sentence by the sum of its TF-IDF term weights,
        # then keep the top scorers in their original order.
        vec = TfidfVectorizer(stop_words="english")
        X = vec.fit_transform(sentences)
        scores = np.array(X.sum(axis=1)).ravel()
        top_idx = scores.argsort()[-num_sentences:][::-1]
        top_sentences = [sentences[i] for i in sorted(top_idx)]
        return " ".join(top_sentences)
    except Exception:
        # fallback: simple lead summary
        return " ".join(sentences[:num_sentences])
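# Sketch of expected behavior (made-up text; which sentences win depends on
# the IDF weights, so treat the result as indicative, not exact):
#   text = ("TF-IDF scores each sentence by summing its term weights. "
#           "Sentences with more distinctive words score higher. "
#           "Top sentences are returned in document order. "
#           "This is extractive, not abstractive, summarization.")
#   summarize(text, num_sentences=2)  # -> two highest-scoring sentences, in order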
# -------------------------
# Topic clustering
# -------------------------
def cluster_texts(texts, n_clusters=3):
    if len(texts) == 0:
        return []
    if len(texts) <= 1:
        return [0] * len(texts)
    k = min(n_clusters, len(texts))
    try:
        vec = TfidfVectorizer(stop_words="english")
        X = vec.fit_transform(texts)
        kmeans = KMeans(n_clusters=k, random_state=0, n_init=10)
        labels = kmeans.fit_predict(X)
        return labels.tolist()
    except ValueError:
        # fit_transform raises on an empty vocabulary (e.g. every text is
        # empty after a failed fetch); fall back to a single cluster.
        return [0] * len(texts)
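# Illustrative example (toy texts; KMeans label values are arbitrary, only
# the grouping matters):
#   texts = [
#       "stock market shares trading investors",
#       "football match goal referee penalty",
#       "investors trading on the stock market",
#       "referee awards a penalty in the football match",
#   ]
#   cluster_texts(texts, n_clusters=2)  # -> e.g. [0, 1, 0, 1]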
# -------------------------
# Duplicate detection (cosine)
# -------------------------
def detect_duplicates(texts, threshold=0.55):
    n = len(texts)
    if n <= 1:
        return []
    try:
        vec = TfidfVectorizer(stop_words="english")
        X = vec.fit_transform(texts)
        sim = cosine_similarity(X)
    except ValueError:
        # empty vocabulary (all texts empty) -> no duplicates to report
        return []
    groups = []
    used = set()
    for i in range(n):
        if i in used:
            continue
        group = [i]
        used.add(i)
        for j in range(i + 1, n):
            if sim[i, j] >= threshold:
                group.append(j)
                used.add(j)
        if len(group) > 1:
            groups.append(group)
    return groups
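# Sketch of expected behavior (made-up headlines): after stop-word removal the
# first two texts share the same content words, so their cosine similarity is
# well above the 0.55 threshold; the third shares none.
#   texts = [
#       "The prime minister announced a new budget today.",
#       "A new budget was announced by the prime minister today.",
#       "Local team wins the championship after a penalty shootout.",
#   ]
#   detect_duplicates(texts)  # -> [[0, 1]]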
# -------------------------
# Sentence-level change detection (exact-match)
# -------------------------
def changed_sentences(textA, textB):
    sA = [s.strip() for s in sent_tokenize(textA) if s.strip()]
    sB = [s.strip() for s in sent_tokenize(textB) if s.strip()]
    setA = set(sA)
    setB = set(sB)
    changedA = [s for s in sA if s not in setB]
    changedB = [s for s in sB if s not in setA]
    return changedA, changedB
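# Illustrative example (made-up sentences):
#   a = "Markets rose today. The weather was sunny."
#   b = "Markets rose today. The weather was rainy."
#   changed_sentences(a, b)
#   # -> (["The weather was sunny."], ["The weather was rainy."])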
# -------------------------
# Return hostname helper
# -------------------------
def hostname(url):
    try:
        p = urlparse(url)
        return p.netloc or url
    except Exception:
        return url
# -------------------------
# Routes
# -------------------------
# The route paths "/", "/process" and "/compare" are assumptions; adjust them
# to match whatever templates/index.html actually calls.
@app.route("/")
def home():
    # list of preselected sites (you can add/remove)
    sites = {
        "Indian Express": "https://indianexpress.com/",
        "Times of India": "https://timesofindia.indiatimes.com/",
        "NDTV": "https://www.ndtv.com/",
        "BBC News": "https://www.bbc.com/news",
        "CNN": "https://www.cnn.com/",
        "The Hindu": "https://www.thehindu.com/",
    }
    return render_template("index.html", sites=sites)
@app.route("/process", methods=["POST"])  # path is an assumption, see note above
def process_urls():
    payload = request.get_json(force=True)
    urls = payload.get("urls", []) or []
    mode = payload.get("mode", "tokenize")
    results = []
    texts_for_clustering = []
    for raw_url in urls:
        url = raw_url.strip()
        if not url:
            continue
        try:
            # fetch page raw
            req = Request(
                url,
                headers={
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                                  "AppleWebKit/537.36 (KHTML, like Gecko) "
                                  "Chrome/120.0 Safari/537.36"
                },
            )
            resp = urlopen(req, timeout=15)
            soup = BeautifulSoup(resp.read(), "html.parser")
            # choose extraction according to mode (H1..H6 or full text)
            if mode in ["H1", "H2", "H3", "H4", "H5", "H6"]:
                tag = mode.lower()
                extracted = extract_heading_text(soup, tag)
            else:
                # full text
                for tag_rm in soup(["script", "style", "noscript", "iframe", "header", "footer"]):
                    tag_rm.extract()
                extracted = soup.get_text(separator=" ")
            extracted = clean_text(extracted)
            words = []
            sentences = []
            if extracted:
                # tokenization may throw on odd content, so guard it
                try:
                    words = word_tokenize(extracted)
                except Exception:
                    words = extracted.split()
                try:
                    sentences = sent_tokenize(extracted)
                except Exception:
                    sentences = [s.strip() for s in re.split(r'(?<=[.!?]) +', extracted) if s.strip()]
            summary = summarize(extracted) if extracted else ""
            texts_for_clustering.append(extracted)
            results.append({
                "url": url,
                "host": hostname(url),
                "text": extracted,
                "words": words,
                "sentences": sentences,
                "summary": summary,
            })
        except Exception as e:
            results.append({
                "url": url,
                "host": hostname(url),
                "text": "",
                "words": [],
                "sentences": [],
                "summary": "",
                "error": str(e),
            })
    # clustering
    texts_only = [r.get("text", "") for r in results]
    clusters = cluster_texts(texts_only, n_clusters=3) if len(texts_only) > 0 else []
    # attach clusters, padding with 0 if the sizes ever mismatch
    if len(clusters) != len(results):
        clusters = [clusters[i] if i < len(clusters) else 0 for i in range(len(results))]
    for i, r in enumerate(results):
        r["cluster"] = int(clusters[i]) if i < len(clusters) else 0
    # duplicate groups (convert index groups to url groups)
    dup_idx_groups = detect_duplicates(texts_only, threshold=0.55)
    dup_url_groups = [[results[i]["url"] for i in grp] for grp in dup_idx_groups]
    return jsonify({
        "articles": results,
        "duplicate_groups": dup_url_groups,
    })
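# Example request (assuming the /process path above and the port from app.run):
#   curl -X POST http://localhost:7860/process \
#        -H "Content-Type: application/json" \
#        -d '{"urls": ["https://www.bbc.com/news"], "mode": "H1"}'
# The response carries, per URL, the extracted text, tokens, a summary, a
# cluster id, plus any duplicate URL groups found across the batch.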
@app.route("/compare", methods=["POST"])  # path is an assumption, see note above
def compare_texts_route():
    data = request.get_json(force=True)
    text1 = data.get("text1", "") or ""
    text2 = data.get("text2", "") or ""
    # compute changed sentences (exact-match)
    changedA, changedB = changed_sentences(text1, text2)

    # build html: show only the changed sentences, highlighted, in the order
    # they appear in the original text
    def highlight_html(original_text, changed_set):
        sents = [s.strip() for s in sent_tokenize(original_text) if s.strip()]
        pieces = []
        for s in sents:
            if s in changed_set:
                pieces.append(f"<p class='changed'>{escape_html(s)}</p>")
        return "".join(pieces)

    left_html = highlight_html(text1, set(changedA))
    right_html = highlight_html(text2, set(changedB))
    return jsonify({
        "left": left_html,
        "right": right_html,
        "changedA_count": len(changedA),
        "changedB_count": len(changedB),
    })
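# Example request (assuming the /compare path above):
#   curl -X POST http://localhost:7860/compare \
#        -H "Content-Type: application/json" \
#        -d '{"text1": "Markets rose. It rained.", "text2": "Markets rose. It was dry."}'
# -> left/right HTML fragments containing only the sentences unique to each side.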
# small helper used in templates/JS if needed
def escape_html(s):
    return (s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
            .replace('"', "&quot;").replace("'", "&#39;"))
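# Example:
#   escape_html('<b>"hi"</b>')  # -> '&lt;b&gt;&quot;hi&quot;&lt;/b&gt;'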
if __name__ == "__main__":
    # increase default socket timeout a bit
    socket.setdefaulttimeout(20)
    app.run(host="0.0.0.0", port=7860, debug=False)