"""
All the process related to data collection and filtering
will happen here.
"""
import trafilatura, json, tldextract
from concurrent.futures import ThreadPoolExecutor, as_completed
from similarity import ModelFunctions
# ── Standardised response builders ──────────────────────────────────────── #
def _ok(data: dict) -> str:
"""Successful result with matched articles."""
return {"status": "success", "results": data}
def _no_match(fetched: int, scored: int, threshold: float) -> str:
"""Articles were fetched and scored, but none crossed the threshold."""
return {
"status": "no_match",
"reason": (
f"Fetched {fetched} article(s) and scored {scored}, "
f"but none had a similarity score >= {threshold}. "
"The available content may not be directly related to the input topic."
)
}
def _no_content(total_links: int, failed: int) -> str:
"""Every URL either failed to fetch or returned no extractable text."""
return {
"status": "error",
"error": (
f"Could not retrieve content from any of the {total_links} link(s). "
f"{failed} link(s) failed during fetch/extraction. "
"Possible causes: paywalls, network timeouts, bot-blocking, or invalid URLs."
)
}
def _bad_input(detail: str) -> str:
"""Caller passed invalid arguments."""
return {"status": "error", "error": f"Invalid input β {detail}"}
def _internal_error(context: str, exc: Exception) -> str:
"""Unexpected exception in a named context."""
return {
"status": "error",
"error": f"Unexpected failure in [{context}]: {type(exc).__name__}: {exc}"
}
# Minimum similarity score (inclusive) an article must reach in
# DataCollector.retriever for it to be kept as a match.
SIMILARITY_THRESHOLD = 0.4
class DataCollector:
    """Fetches candidate articles in parallel and keeps only those whose
    derived title is sufficiently similar to the original content."""

    def __init__(self, ModelFunctionsObj):
        # Similarity helper; must expose BatchSimilarityScores(text, titles).
        self.object = ModelFunctionsObj

    # ------------------------------------------------------------------ #
    # Fetches a single URL -> returns (link, text, error_reason)         #
    # error_reason is None on success, a short string on failure         #
    # ------------------------------------------------------------------ #
    def _fetch_one(self, link: str) -> tuple:
        """Download one URL and extract its main text.

        Returns:
            (link, text, None) on success, or (link, None, reason) on failure.
        """
        try:
            html = trafilatura.fetch_url(link)
            if not html:
                return link, None, "no HTML returned (possible bot-block or empty page)"
            text = trafilatura.extract(html)
            if not text:
                return link, None, "HTML fetched but no text could be extracted"
            return link, text, None
        except Exception as e:
            # A bad URL must never raise out of the worker thread.
            return link, None, f"{type(e).__name__}: {e}"

    # ------------------------------------------------------------------ #
    # Parallel fetch + batch similarity                                  #
    # ------------------------------------------------------------------ #
    def retriever(self, OriginalContent: str, links: list) -> dict:
        """Fetch all links, score them against OriginalContent, and return
        a status-wrapped dict (success / no_match / error).

        Args:
            OriginalContent: Non-empty reference text to compare against.
            links: Non-empty list of URL strings.

        Returns:
            A dict built by one of the module's response builders.
            (Annotation corrected: this returns a dict, not a str.)
        """
        # Validate inputs before spinning up any threads.
        if not isinstance(OriginalContent, str) or not OriginalContent.strip():
            return _bad_input("OriginalContent must be a non-empty string.")
        if not isinstance(links, list) or not links:
            return _bad_input("links must be a non-empty list of URL strings.")
        try:
            # Step 1: parallel fetch (one task per URL, capped at 20 workers).
            fetched = {}          # link -> raw extracted text
            fetch_failures = []   # human-readable diagnostics for failed links
            with ThreadPoolExecutor(max_workers=20) as executor:
                futures = {executor.submit(self._fetch_one, link): link for link in links}
                for future in as_completed(futures):
                    link, text, reason = future.result()
                    if text:
                        fetched[link] = text
                    else:
                        # "→" restores the arrow that mojibake had corrupted.
                        fetch_failures.append(f"{link} → {reason}")
            # Log which URLs failed so silent drops remain visible.
            if fetch_failures:
                print(f"[DataRetrieval] {len(fetch_failures)}/{len(links)} link(s) failed:")
                for f in fetch_failures:
                    print(f"   → {f}")
            # Zero articles retrieved — no point going further.
            if not fetched:
                return _no_content(len(links), len(fetch_failures))
            # Step 2: derive a crude "title" per article (first sentence,
            # lowercased); an empty title still gets scored, just poorly.
            valid_links = []
            valid_titles = []
            valid_texts = []
            for link, text in fetched.items():
                try:
                    title = text.strip().split(".")[0].lower()
                except (AttributeError, IndexError):
                    title = ""
                valid_links.append(link)
                valid_titles.append(title)
                valid_texts.append(text)
            # Step 3: one batch similarity pass over all titles.
            try:
                scores = self.object.BatchSimilarityScores(OriginalContent, valid_titles)
            except Exception as e:
                return _internal_error("BatchSimilarityScores", e)
            # Step 4: keep only articles at or above the threshold.
            data = {}
            for link, text, score in zip(valid_links, valid_texts, scores):
                if score >= SIMILARITY_THRESHOLD:
                    try:
                        data[f"searchresult{len(data) + 1}"] = {
                            "organization": tldextract.extract(link).domain,
                            "score": score,
                            "article": text
                        }
                    except Exception as e:
                        print(f"[DataRetrieval] Could not save result for {link}: {e} — skipping.")
                        continue
            # Step 5: return with a clear status.
            if not data:
                return _no_match(
                    fetched=len(fetched),
                    scored=len(valid_titles),
                    threshold=SIMILARITY_THRESHOLD
                )
            return _ok(data)
        except Exception as e:
            return _internal_error("retriever main block", e)

    # ------------------------------------------------------------------ #
    # top_results — handles both old bare-dict and new status-wrapped fmt #
    # ------------------------------------------------------------------ #
    def top_results(self, data, num_of_articals: int = 2):
        """Return the top-N results by score, re-keyed searchresult1..N.

        Args:
            data: Either a JSON string, a bare results dict, or a
                status-wrapped dict containing a "results" key.
            num_of_articals: Max number of results to keep (name keeps the
                historical "articals" spelling for keyword-caller compat).

        Returns:
            Dict of the highest-scoring entries, or None on bad input.
        """
        try:
            if isinstance(data, str):
                try:
                    data = json.loads(data)
                except json.JSONDecodeError as e:
                    print(f"[top_results] Failed to parse JSON input: {e}")
                    return None
            # Unwrap new response format if present.
            if isinstance(data, dict) and "results" in data:
                data = data["results"]
            if not isinstance(data, dict) or not data:
                print("[top_results] Invalid or empty data — nothing to sort.")
                return None
            sorted_items = sorted(
                data.items(),
                key=lambda item: item[1]["score"],
                reverse=True
            )
            top_n = sorted_items[:min(num_of_articals, len(sorted_items))]
            return {
                f"searchresult{i}": value
                for i, (_, value) in enumerate(top_n, start=1)
            }
        except Exception as e:
            print(f"[top_results] Unexpected error: {e}")
            return None
# ── Standalone helper: fetch and parse a single user-supplied article ─────── #
def get_user_article(user_link: str) -> dict:
    """Fetch one user-supplied URL and return its parsed article.

    Returns a dict with status "success" (organization, title, article)
    or status "error" plus a human-readable explanation.
    """
    # Guard: reject anything that is not a non-blank string.
    if not (isinstance(user_link, str) and user_link.strip()):
        return {"status": "error", "error": "Invalid or empty URL provided."}
    try:
        # Fetch + extract; network problems become an error response.
        try:
            page = trafilatura.fetch_url(user_link)
            body = trafilatura.extract(page) if page else None
        except Exception as exc:
            msg = f"Network or extraction failure: {type(exc).__name__}: {exc}"
            print(f"[get_user_article] {msg}")
            return {"status": "error", "error": msg}

        if not body:
            return {
                "status": "error",
                "error": (
                    "Could not extract readable text from the provided URL. "
                    "The page may be paywalled, JavaScript-rendered, or block scrapers."
                )
            }

        # Crude title: first sentence, lowercased; None when that fails.
        try:
            headline = body.strip().split(".")[0].lower()
        except (AttributeError, IndexError, TypeError, ValueError):
            headline = None

        # Publisher = registrable domain of the URL; None when unparsable.
        try:
            publisher = tldextract.extract(user_link).domain
        except (AttributeError, TypeError, ValueError):
            publisher = None

        return {
            "status": "success",
            "organization": publisher,
            "title": headline,
            "article": body
        }
    except Exception as exc:
        return {
            "status": "error",
            "error": f"Unexpected failure in get_user_article: {type(exc).__name__}: {exc}"
        }