# webscraping1 / app.py
from flask import Flask, request, jsonify, render_template
from urllib.request import Request, urlopen
from bs4 import BeautifulSoup
import nltk
import re
import socket
from urllib.parse import urlparse
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
import numpy as np
# Ensure NLTK data exists
nltk.download("punkt", quiet=True)
nltk.download("punkt_tab", quiet=True)
from nltk.tokenize import word_tokenize, sent_tokenize
app = Flask(__name__)
# -------------------------
# Helper: fetch page safely
# -------------------------
def fetch_page(url, timeout=15):
    """
    Fetch URL content using urllib with a browser-like User-Agent.
    Returns cleaned text; network or parse errors propagate to the caller.
    """
    req = Request(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/120.0 Safari/537.36"
        },
    )
    with urlopen(req, timeout=timeout) as resp:
        raw = resp.read()
    soup = BeautifulSoup(raw, "html.parser")
    # remove scripts/styles and other non-content tags
    for tag in soup(["script", "style", "noscript", "iframe", "header", "footer"]):
        tag.extract()
    text = soup.get_text(separator=" ")
    return re.sub(r"\s+", " ", text).strip()
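# Note: the routes below fetch inline (they also need the parsed soup for
# heading extraction), so fetch_page is a standalone utility. Usage sketch,
# with a placeholder URL, kept as a comment so it does not run at import time:
#
#     text = fetch_page("https://example.com", timeout=10)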
# -------------------------
# Helper: extract heading tag text
# -------------------------
def extract_heading_text(soup, tag):
    elements = soup.find_all(tag)
    return " ".join(el.get_text(" ", strip=True) for el in elements).strip()
# -------------------------
# Clean / normalize text
# -------------------------
def clean_text(t):
    return re.sub(r"\s+", " ", t or "").strip()
# -------------------------
# Summarize (extractive)
# -------------------------
def summarize(text, num_sentences=3):
    """Extractive summary: keep the sentences with the highest total TF-IDF weight."""
    sentences = sent_tokenize(text)
    if len(sentences) <= num_sentences:
        return " ".join(sentences)
    try:
        vec = TfidfVectorizer(stop_words="english")
        X = vec.fit_transform(sentences)
        # score each sentence by the sum of its TF-IDF term weights
        scores = np.array(X.sum(axis=1)).ravel()
        top_idx = scores.argsort()[-num_sentences:][::-1]
        # re-sort the chosen indices so the summary keeps document order
        top_sentences = [sentences[i] for i in sorted(top_idx)]
        return " ".join(top_sentences)
    except Exception:
        # fall back to the leading sentences if vectorization fails
        return " ".join(sentences[:num_sentences])
# -------------------------
# Topic clustering
# -------------------------
def cluster_texts(texts, n_clusters=3):
    if len(texts) == 0:
        return []
    if len(texts) <= 1:
        return [0] * len(texts)
    k = min(n_clusters, len(texts))
    vec = TfidfVectorizer(stop_words="english")
    X = vec.fit_transform(texts)
    kmeans = KMeans(n_clusters=k, random_state=0, n_init=10)
    labels = kmeans.fit_predict(X)
    return labels.tolist()
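# Illustrative call (hypothetical inputs): one label per text, aligned by
# index. Label numbering is arbitrary, so similar texts merely share a label:
#
#     cluster_texts(["stocks fell", "markets dropped", "heavy rain today"], n_clusters=2)
#     # e.g. [0, 0, 1]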
# -------------------------
# Duplicate detection (cosine)
# -------------------------
def detect_duplicates(texts, threshold=0.55):
    n = len(texts)
    if n <= 1:
        return []
    vec = TfidfVectorizer(stop_words="english")
    X = vec.fit_transform(texts)
    sim = cosine_similarity(X)
    groups = []
    used = set()
    for i in range(n):
        if i in used:
            continue
        group = [i]
        used.add(i)
        for j in range(i + 1, n):
            # skip texts already assigned, so no text lands in two groups
            if j not in used and sim[i, j] >= threshold:
                group.append(j)
                used.add(j)
        if len(group) > 1:
            groups.append(group)
    return groups
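# Illustrative call (hypothetical inputs): indices of near-duplicates come
# back grouped; texts with no partner above the threshold are omitted:
#
#     detect_duplicates(["PM visits the city today",
#                        "PM visits city today",
#                        "Cricket scores update"])
#     # would typically return [[0, 1]]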
# -------------------------
# Sentence-level change detection (exact-match)
# -------------------------
def changed_sentences(textA, textB):
    sA = [s.strip() for s in sent_tokenize(textA) if s.strip()]
    sB = [s.strip() for s in sent_tokenize(textB) if s.strip()]
    setA = set(sA)
    setB = set(sB)
    changedA = [s for s in sA if s not in setB]
    changedB = [s for s in sB if s not in setA]
    return changedA, changedB
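# Illustrative call: only sentences absent from the other text (by exact
# string match) are reported, in their original order:
#
#     changed_sentences("It rained. Schools closed.", "It rained. Roads flooded.")
#     # (["Schools closed."], ["Roads flooded."])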
# -------------------------
# Helper: hostname from URL
# -------------------------
def hostname(url):
    try:
        p = urlparse(url)
        return p.netloc or url
    except Exception:
        return url
# -------------------------
# Routes
# -------------------------
@app.route("/")
def home():
# list of preselected sites (you can add/remove)
sites = {
"Indian Express": "https://indianexpress.com/",
"Times of India": "https://timesofindia.indiatimes.com/",
"NDTV": "https://www.ndtv.com/",
"BBC News": "https://www.bbc.com/news",
"CNN": "https://www.cnn.com/",
"The Hindu": "https://www.thehindu.com/",
}
return render_template("index.html", sites=sites)
@app.route("/process_urls", methods=["POST"])
def process_urls():
payload = request.get_json(force=True)
urls = payload.get("urls", []) or []
mode = payload.get("mode", "tokenize")
results = []
texts_for_clustering = []
for raw_url in urls:
url = raw_url.strip()
if not url:
continue
try:
# fetch page raw
req = Request(
url,
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0 Safari/537.36"}
)
resp = urlopen(req, timeout=15)
soup = BeautifulSoup(resp.read(), "html.parser")
# choose extraction according to mode (H1..H6 or full)
if mode in ["H1", "H2", "H3", "H4", "H5", "H6"]:
tag = mode.lower()
extracted = extract_heading_text(soup, tag)
else:
# full text
for tag_rm in soup(["script", "style", "noscript", "iframe", "header", "footer"]):
tag_rm.extract()
extracted = soup.get_text(separator=" ")
extracted = clean_text(extracted)
words = []
sentences = []
if extracted:
# tokenization may throw in weird content, guard it
try:
words = word_tokenize(extracted)
except Exception:
words = extracted.split()
try:
sentences = sent_tokenize(extracted)
except Exception:
sentences = [s.strip() for s in re.split(r'(?<=[.!?]) +', extracted) if s.strip()]
summary = summarize(extracted) if extracted else ""
texts_for_clustering.append(extracted)
results.append({
"url": url,
"host": hostname(url),
"text": extracted,
"words": words,
"sentences": sentences,
"summary": summary,
})
except Exception as e:
results.append({
"url": url,
"host": hostname(url),
"text": "",
"words": [],
"sentences": [],
"summary": "",
"error": str(e)
})
# clustering
texts_only = [r.get("text", "") for r in results]
clusters = cluster_texts(texts_only, n_clusters=3) if len(texts_only) > 0 else []
# attach clusters (fill default 0 if sizes mismatch)
if len(clusters) != len(results):
clusters = [int(c) if i < len(clusters) else 0 for i, c in enumerate(range(len(results)))]
for i, r in enumerate(results):
r["cluster"] = int(clusters[i]) if i < len(clusters) else 0
# duplicate groups (convert index groups to url groups)
dup_idx_groups = detect_duplicates(texts_only, threshold=0.55)
dup_url_groups = [[results[i]["url"] for i in grp] for grp in dup_idx_groups]
return jsonify({
"articles": results,
"duplicate_groups": dup_url_groups
})
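# Client sketch for /process_urls, assuming the app is running locally on
# port 7860 (the URL in the payload is just a placeholder):
#
#     import json, urllib.request
#     req = urllib.request.Request(
#         "http://localhost:7860/process_urls",
#         data=json.dumps({"urls": ["https://www.bbc.com/news"], "mode": "H2"}).encode(),
#         headers={"Content-Type": "application/json"},
#     )
#     print(json.load(urllib.request.urlopen(req)))
#
# The response carries one entry per URL (text, words, sentences, summary,
# cluster label) plus a top-level duplicate_groups list of URL groups.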
@app.route("/compare_texts", methods=["POST"])
def compare_texts_route():
data = request.get_json(force=True)
text1 = data.get("text1", "") or ""
text2 = data.get("text2", "") or ""
# compute changed sentences (exact-match)
changedA, changedB = changed_sentences(text1, text2)
# build html: show only changed sentences highlighted, and keep order from original
def highlight_html(original_text, changed_set):
sents = [s.strip() for s in sent_tokenize(original_text) if s.strip()]
pieces = []
for s in sents:
if s in changed_set:
pieces.append(f"<p class='changed'>{escape_html(s)}</p>")
return "".join(pieces)
left_html = highlight_html(text1, set(changedA))
right_html = highlight_html(text2, set(changedB))
return jsonify({"left": left_html, "right": right_html, "changedA_count": len(changedA), "changedB_count": len(changedB)})
# Small helper used by compare_texts_route above (Python resolves the name at
# call time, so defining it after the route is fine); also handy in templates/JS.
def escape_html(s):
    return (s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
            .replace('"', "&quot;").replace("'", "&#39;"))
if __name__ == "__main__":
    # raise the default socket timeout a bit for slow sites
    socket.setdefaulttimeout(20)
    app.run(host="0.0.0.0", port=7860, debug=False)