# SUMA / utils.py
# (originally uploaded to a Hugging Face repo; commit d7f53b3)
import logging
import os
import re
import tempfile
from typing import List
from urllib.parse import urlparse
import requests
from bs4 import BeautifulSoup
from readability import Document
from newspaper import Article
# Hostnames from which article URLs may be scraped (exact, case-insensitive match).
ALLOWED_DOMAINS = {
    "cnn.com",
    "www.cnn.com",
    "edition.cnn.com",
    "nbcnews.com",
    "www.nbcnews.com",
    "bbc.com",
    "www.bbc.com",
    "bbc.co.uk",
    "www.bbc.co.uk",
}
def is_valid_news_url(url: str) -> bool:
    """Return True if *url* is an http(s) link whose host is allow-listed.

    Uses ``urlparse(...).hostname`` instead of ``netloc``: ``netloc`` keeps an
    explicit port ("cnn.com:443") and any userinfo ("user@cnn.com"), either of
    which would make a legitimate URL fail — or let a crafted URL dodge — the
    exact-match check.  ``hostname`` strips both and is already lower-cased.
    """
    try:
        parsed = urlparse(url)
        if parsed.scheme not in {"http", "https"}:
            return False
        # hostname is None for URLs with no network location.
        return (parsed.hostname or "") in ALLOWED_DOMAINS
    except Exception:
        # urlparse can raise ValueError on malformed ports; treat as invalid.
        logging.exception("URL validation failed for %s", url)
        return False
def clean_html(raw_html: str) -> str:
    """Strip markup from *raw_html* and return whitespace-normalized text.

    <script>, <style> and <noscript> elements are removed wholesale first so
    their contents never leak into the extracted text.  None/"" input yields "".
    """
    soup = BeautifulSoup(raw_html or "", "html.parser")
    for unwanted in soup.find_all(["script", "style", "noscript"]):
        unwanted.decompose()
    return clean_text(soup.get_text(" ", strip=True))
def clean_text(text: str) -> str:
    """Collapse every run of whitespace in *text* to a single space and trim.

    Falsy input (None or "") yields the empty string.
    """
    if not text:
        return ""
    # split() with no args splits on any whitespace run and drops edge blanks,
    # so rejoining with single spaces both collapses and strips in one pass.
    return " ".join(text.split())
def extract_article_content(url: str) -> str:
    """Fetch and return the main body text of the article at *url*.

    Extraction strategies, tried in order:
      1. newspaper3k's Article download/parse pipeline.
      2. readability's Document summary over a plain HTTP fetch.
      3. All <p> tags concatenated via BeautifulSoup, as a last resort.

    Failures are logged, never raised; "" is returned when every strategy
    comes up empty.
    """
    try:
        story = Article(url)
        story.download()
        story.parse()
        text = clean_text(story.text)
        logging.info("Article scraped via newspaper3k")
        if text:
            return text
    except Exception:
        logging.exception("Primary article scrape failed, falling back to readability/BeautifulSoup")

    text = ""
    try:
        response = requests.get(url, timeout=12, headers={"User-Agent": "Mozilla/5.0"})
        response.raise_for_status()
        html = response.text
        text = clean_html(Document(html).summary())
        if not text:
            # Readability produced nothing usable; fall back to raw paragraphs.
            soup = BeautifulSoup(html, "html.parser")
            paragraph_texts = [p.get_text(" ", strip=True) for p in soup.find_all("p")]
            text = clean_text(" ".join(paragraph_texts))
    except Exception:
        logging.exception("Fallback scraping failed")
    return text
def chunk_text(text: str, max_words: int = 800) -> List[str]:
    """Split *text* into chunks of at most *max_words* whitespace-separated words.

    Each chunk is the words rejoined with single spaces.  Empty or
    whitespace-only input yields an empty list.
    """
    words = text.split()
    # range over an empty word list produces no chunks, covering the empty case.
    return [
        " ".join(words[start : start + max_words])
        for start in range(0, len(words), max_words)
    ]
def summarize_text(text: str, summarizer) -> str:
    """Summarize *text* using *summarizer*, a callable returning
    ``[{"summary_text": ...}]`` (e.g. a transformers pipeline — TODO confirm).

    The text is split into ~800-word chunks, each chunk is summarized
    independently, and — when more than one chunk was produced — the merged
    partial summaries are condensed by one final summarizer pass.  A chunk
    whose summarization fails is logged and skipped.  Returns "" when no
    summary could be produced at all.
    """
    pieces = chunk_text(text)
    if not pieces:
        return ""

    # Identical generation settings for the per-chunk and final passes.
    generate_kwargs = dict(max_length=300, min_length=120, do_sample=False, truncation=True)

    partials: List[str] = []
    for piece in pieces:
        try:
            result = summarizer(piece, **generate_kwargs)
            partials.append(clean_text(result[0]["summary_text"]))
        except Exception:
            logging.exception("Summarization failed for chunk")

    combined = clean_text(" ".join(partials))
    if not combined:
        return ""
    if len(partials) == 1:
        # Single chunk: its summary is already final; skip the merge pass.
        return combined

    try:
        condensed = summarizer(combined, **generate_kwargs)[0]["summary_text"]
        return clean_text(condensed)
    except Exception:
        logging.exception("Final summarization merge failed")
        return combined
def find_first_wav(path: str) -> str:
    """Return the path of a ``.wav`` file at *path*, or "" if none is found.

    If *path* is itself a .wav file (case-insensitive extension check) it is
    returned unchanged.  If *path* is a directory, the lexicographically first
    .wav entry is returned: ``os.listdir`` order is platform-dependent, so the
    entries are sorted to make "first" deterministic across runs and systems.
    Subdirectories are not searched.
    """
    if os.path.isfile(path) and path.lower().endswith(".wav"):
        return path
    if os.path.isdir(path):
        # sorted() pins a stable, platform-independent pick.
        for entry in sorted(os.listdir(path)):
            candidate = os.path.join(path, entry)
            if os.path.isfile(candidate) and candidate.lower().endswith(".wav"):
                return candidate
    return ""
def temp_audio_path() -> str:
    """Create a fresh temp directory and return an output path template in it.

    The returned path ends in the literal ``audio.%(ext)s`` — presumably a
    youtube-dl/yt-dlp output template whose ``%(ext)s`` placeholder is filled
    in by the downloader; verify against the caller.  Each call makes a new
    ``yt_audio_*`` directory, so concurrent downloads cannot collide.
    """
    workdir = tempfile.mkdtemp(prefix="yt_audio_")
    return os.path.join(workdir, "audio.%(ext)s")