FactCheckAI / dataretrieval.py
vishwjeet71's picture
Initial deployment
540b123
"""
All the process related to data collection and filtering
will happen here.
"""
import trafilatura, json, tldextract
from concurrent.futures import ThreadPoolExecutor, as_completed
from similarity import ModelFunctions
# ── Standardised response builders ──────────────────────────────────────── #
def _ok(data: dict) -> str:
"""Successful result with matched articles."""
return {"status": "success", "results": data}
def _no_match(fetched: int, scored: int, threshold: float) -> str:
"""Articles were fetched and scored, but none crossed the threshold."""
return {
"status": "no_match",
"reason": (
f"Fetched {fetched} article(s) and scored {scored}, "
f"but none had a similarity score >= {threshold}. "
"The available content may not be directly related to the input topic."
)
}
def _no_content(total_links: int, failed: int) -> str:
"""Every URL either failed to fetch or returned no extractable text."""
return {
"status": "error",
"error": (
f"Could not retrieve content from any of the {total_links} link(s). "
f"{failed} link(s) failed during fetch/extraction. "
"Possible causes: paywalls, network timeouts, bot-blocking, or invalid URLs."
)
}
def _bad_input(detail: str) -> str:
"""Caller passed invalid arguments."""
return {"status": "error", "error": f"Invalid input β€” {detail}"}
def _internal_error(context: str, exc: Exception) -> str:
"""Unexpected exception in a named context."""
return {
"status": "error",
"error": f"Unexpected failure in [{context}]: {type(exc).__name__}: {exc}"
}
# Similarity threshold
SIMILARITY_THRESHOLD = 0.4
class DataCollector():
def __init__(self, ModelFunctionsObj):
self.object = ModelFunctionsObj
# ------------------------------------------------------------------ #
# Fetches a single URL β€” returns (link, text, error_reason) #
# error_reason is None on success, a short string on failure #
# ------------------------------------------------------------------ #
def _fetch_one(self, link: str) -> tuple:
try:
html = trafilatura.fetch_url(link)
if not html:
return link, None, "no HTML returned (possible bot-block or empty page)"
text = trafilatura.extract(html)
if not text:
return link, None, "HTML fetched but no text could be extracted"
return link, text, None
except Exception as e:
return link, None, f"{type(e).__name__}: {e}"
# ------------------------------------------------------------------ #
# Parallel fetch + batch similarity #
# ------------------------------------------------------------------ #
def retriever(self, OriginalContent: str, links: list) -> str:
# validate inputs
if not isinstance(OriginalContent, str) or not OriginalContent.strip():
return _bad_input("OriginalContent must be a non-empty string.")
if not isinstance(links, list) or not links:
return _bad_input("links must be a non-empty list of URL strings.")
try:
# Step 1: Parallel fetch
fetched = {} # link -> raw text
fetch_failures = [] # track failures for diagnostics
with ThreadPoolExecutor(max_workers=20) as executor:
futures = {executor.submit(self._fetch_one, link): link for link in links}
for future in as_completed(futures):
link, text, reason = future.result()
if text:
fetched[link] = text
else:
fetch_failures.append(f"{link} β†’ {reason}")
# Log which URLs failed
if fetch_failures:
print(f"[DataRetrieval] {len(fetch_failures)}/{len(links)} link(s) failed:")
for f in fetch_failures:
print(f" βœ— {f}")
# Zero articles retrieved β€” no point going further
if not fetched:
return _no_content(len(links), len(fetch_failures))
# ── Step 2: Extract titles ───────────────────────────────────
valid_links = []
valid_titles = []
valid_texts = []
for link, text in fetched.items():
try:
title = text.strip().split(".")[0].lower()
except (AttributeError, IndexError):
title = "" # empty string still gets scored, just poorly
valid_links.append(link)
valid_titles.append(title)
valid_texts.append(text)
# ── Step 3: Single batch similarity pass ─────────────────────
try:
scores = self.object.BatchSimilarityScores(OriginalContent, valid_titles)
except Exception as e:
return _internal_error("BatchSimilarityScores", e)
# ── Step 4: Filter by threshold ──────────────────────────────
data = {}
for link, text, score in zip(valid_links, valid_texts, scores):
# print(f"[Score] {score:.4f} {link}") # only for testing dev
if score >= SIMILARITY_THRESHOLD:
try:
data[f"searchresult{len(data) + 1}"] = {
"organization": tldextract.extract(link).domain,
"score": score,
"article": text
}
except Exception as e:
print(f"[DataRetrieval] Could not save result for {link}: {e} β€” skipping.")
continue
# ── Step 5: Return with clear status ─────────────────────────
if not data:
return _no_match(
fetched=len(fetched),
scored=len(valid_titles),
threshold=SIMILARITY_THRESHOLD
)
return _ok(data)
except Exception as e:
return _internal_error("retriever main block", e)
# ------------------------------------------------------------------ #
# top_results β€” handles both old bare-dict and new status-wrapped fmt #
# ------------------------------------------------------------------ #
def top_results(self, data, num_of_articals: int = 2):
try:
if isinstance(data, str):
try:
data = json.loads(data)
except json.JSONDecodeError as e:
print(f"[top_results] Failed to parse JSON input: {e}")
return None
# Unwrap new response format if present
if isinstance(data, dict) and "results" in data:
data = data["results"]
if not isinstance(data, dict) or not data:
print("[top_results] Invalid or empty data β€” nothing to sort.")
return None
sorted_items = sorted(
data.items(),
key=lambda item: item[1]["score"],
reverse=True
)
num_of_articals = min(num_of_articals, len(sorted_items))
top_n = sorted_items[:num_of_articals]
result = {}
for i, (_, value) in enumerate(top_n, start=1):
result[f"searchresult{i}"] = value
return result
except Exception as e:
print(f"[top_results] Unexpected error: {e}")
return None
# ── Standalone helper: fetch and parse a single user-supplied article ─────── #
def get_user_article(user_link: str) -> dict:
if not isinstance(user_link, str) or not user_link.strip():
return {"status": "error", "error": "Invalid or empty URL provided."}
try:
try:
html = trafilatura.fetch_url(user_link)
article_text = trafilatura.extract(html) if html else None
except Exception as e:
msg = f"Network or extraction failure: {type(e).__name__}: {e}"
print(f"[get_user_article] {msg}")
return {"status": "error", "error": msg}
if not article_text:
return {
"status": "error",
"error": (
"Could not extract readable text from the provided URL. "
"The page may be paywalled, JavaScript-rendered, or block scrapers."
)
}
try:
title = article_text.strip().split(".")[0].lower()
except (AttributeError, IndexError, TypeError, ValueError):
title = None
try:
organization = tldextract.extract(user_link).domain
except (AttributeError, TypeError, ValueError):
organization = None
return {
"status": "success",
"organization": organization,
"title": title,
"article": article_text
}
except Exception as e:
return {
"status": "error",
"error": f"Unexpected failure in get_user_article: {type(e).__name__}: {e}"
}