Spaces:
Running
Running
| """ | |
| All processing related to data collection and filtering | |
| happens in this module. | |
| """ | |
| import trafilatura, json, tldextract | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from similarity import ModelFunctions | |
| # ── Standardised response builders ──────────────────────────────────────── # | |
| def _ok(data: dict) -> str: | |
| """Successful result with matched articles.""" | |
| return {"status": "success", "results": data} | |
| def _no_match(fetched: int, scored: int, threshold: float) -> str: | |
| """Articles were fetched and scored, but none crossed the threshold.""" | |
| return { | |
| "status": "no_match", | |
| "reason": ( | |
| f"Fetched {fetched} article(s) and scored {scored}, " | |
| f"but none had a similarity score >= {threshold}. " | |
| "The available content may not be directly related to the input topic." | |
| ) | |
| } | |
| def _no_content(total_links: int, failed: int) -> str: | |
| """Every URL either failed to fetch or returned no extractable text.""" | |
| return { | |
| "status": "error", | |
| "error": ( | |
| f"Could not retrieve content from any of the {total_links} link(s). " | |
| f"{failed} link(s) failed during fetch/extraction. " | |
| "Possible causes: paywalls, network timeouts, bot-blocking, or invalid URLs." | |
| ) | |
| } | |
| def _bad_input(detail: str) -> str: | |
| """Caller passed invalid arguments.""" | |
| return {"status": "error", "error": f"Invalid input β {detail}"} | |
| def _internal_error(context: str, exc: Exception) -> str: | |
| """Unexpected exception in a named context.""" | |
| return { | |
| "status": "error", | |
| "error": f"Unexpected failure in [{context}]: {type(exc).__name__}: {exc}" | |
| } | |
# Minimum similarity score a fetched article must reach to be kept.
SIMILARITY_THRESHOLD: float = 0.4
class DataCollector():
    """Fetch article text from URLs and keep the ones similar to a reference.

    The similarity backend is injected through the constructor. It is only
    used via ``BatchSimilarityScores(original, candidates)``, which is
    expected to yield one score per candidate, in candidate order
    (NOTE(review): contract inferred from usage — confirm against the
    backend).
    """

    # Upper bound on the number of concurrent fetch threads in ``retriever``.
    MAX_FETCH_WORKERS = 20

    def __init__(self, ModelFunctionsObj):
        """Store the similarity backend used for scoring."""
        self.object = ModelFunctionsObj

    # ------------------------------------------------------------------ #
    #  Fetches a single URL -> returns (link, text, error_reason)          #
    #  error_reason is None on success, a short string on failure          #
    # ------------------------------------------------------------------ #
    def _fetch_one(self, link: str) -> tuple:
        """Download one URL and extract its text.

        Returns:
            ``(link, text, error_reason)``. On success ``error_reason`` is
            ``None``; on failure ``text`` is ``None`` and ``error_reason``
            is a short description.

        Never raises: any exception is converted into an error reason so
        that every worker thread yields a result tuple.
        """
        try:
            html = trafilatura.fetch_url(link)
            if not html:
                return link, None, "no HTML returned (possible bot-block or empty page)"
            text = trafilatura.extract(html)
            if not text:
                return link, None, "HTML fetched but no text could be extracted"
            return link, text, None
        except Exception as e:
            return link, None, f"{type(e).__name__}: {e}"

    @staticmethod
    def _title_of(text) -> str:
        """Approximate a title: the lowercased text before the first period."""
        try:
            return text.strip().partition(".")[0].lower()
        except AttributeError:
            # Non-string content: an empty title still gets scored, just poorly.
            return ""

    # ------------------------------------------------------------------ #
    #  Parallel fetch + batch similarity                                   #
    # ------------------------------------------------------------------ #
    def retriever(self, OriginalContent: str, links: list) -> dict:
        """Fetch all links, score them against ``OriginalContent``, keep matches.

        Only the approximate title of each article (text before its first
        period) is sent to the similarity backend, not the full text.

        Args:
            OriginalContent: Non-empty reference text.
            links: Non-empty list of URLs to fetch.

        Returns:
            A status dict, one of:
            * ``{"status": "success", "results": {...}}`` where each
              ``"searchresultN"`` entry holds ``organization``, ``score``
              and ``article``;
            * ``{"status": "no_match", "reason": ...}`` when nothing reached
              ``SIMILARITY_THRESHOLD``;
            * ``{"status": "error", "error": ...}`` for invalid input, no
              retrievable content, or an unexpected failure.

        Never raises; unexpected exceptions are reported as an error dict.
        """
        # Validate inputs.
        if not isinstance(OriginalContent, str) or not OriginalContent.strip():
            return _bad_input("OriginalContent must be a non-empty string.")
        if not isinstance(links, list) or not links:
            return _bad_input("links must be a non-empty list of URL strings.")

        try:
            # Step 1: parallel fetch.
            fetched = {}          # link -> raw text
            fetch_failures = []   # failures, kept for diagnostics
            with ThreadPoolExecutor(max_workers=self.MAX_FETCH_WORKERS) as executor:
                # ``map`` yields results in input order, so the numbering of
                # the final results no longer depends on which download
                # happens to finish first. ``_fetch_one`` never raises.
                for link, text, reason in executor.map(self._fetch_one, links):
                    if text:
                        fetched[link] = text
                    else:
                        fetch_failures.append(f"{link} — {reason}")

            # Log which URLs failed.
            if fetch_failures:
                print(f"[DataRetrieval] {len(fetch_failures)}/{len(links)} link(s) failed:")
                for failure in fetch_failures:
                    print(f"  - {failure}")

            # Zero articles retrieved: no point going further.
            if not fetched:
                return _no_content(len(links), len(fetch_failures))

            # Step 2: derive a title for every fetched article.
            valid_links = list(fetched)
            valid_texts = list(fetched.values())
            valid_titles = [self._title_of(text) for text in valid_texts]

            # Step 3: single batch similarity pass.
            try:
                scores = self.object.BatchSimilarityScores(OriginalContent, valid_titles)
            except Exception as e:
                return _internal_error("BatchSimilarityScores", e)

            # Step 4: filter by threshold.
            data = {}
            for link, text, score in zip(valid_links, valid_texts, scores):
                if score >= SIMILARITY_THRESHOLD:
                    try:
                        data[f"searchresult{len(data) + 1}"] = {
                            "organization": tldextract.extract(link).domain,
                            # Stored as a plain float so the payload does not
                            # carry backend-specific scalar types
                            # (NOTE(review): assumes scores are float-convertible).
                            "score": float(score),
                            "article": text,
                        }
                    except Exception as e:
                        print(f"[DataRetrieval] Could not save result for {link}: {e} — skipping.")
                        continue

            # Step 5: return with a clear status.
            if not data:
                return _no_match(
                    fetched=len(fetched),
                    scored=len(valid_titles),
                    threshold=SIMILARITY_THRESHOLD,
                )
            return _ok(data)

        except Exception as e:
            return _internal_error("retriever main block", e)

    # ------------------------------------------------------------------ #
    #  top_results: handles both old bare-dict and new status-wrapped fmt  #
    # ------------------------------------------------------------------ #
    def top_results(self, data, num_of_articals: int = 2):
        """Return the highest-scoring entries, renumbered from 1.

        Args:
            data: Either a bare ``{key: {"score": ..., ...}}`` dict, a
                status-wrapped response containing ``"results"``, or a JSON
                string encoding one of those.
            num_of_articals: Maximum number of entries to return. Values
                above the number of entries are capped; values below zero
                are treated as zero.

        Returns:
            ``{"searchresult1": ..., "searchresult2": ...}`` ordered by
            descending score, or ``None`` when the input cannot be used
            (unparseable JSON, non-success response, empty/invalid data, or
            an unexpected error).
        """
        try:
            if isinstance(data, str):
                try:
                    data = json.loads(data)
                except json.JSONDecodeError as e:
                    print(f"[top_results] Failed to parse JSON input: {e}")
                    return None

            if isinstance(data, dict):
                if "results" in data:
                    # Unwrap the status-wrapped response format.
                    data = data["results"]
                elif isinstance(data.get("status"), str):
                    # An "error"/"no_match" response carries no articles.
                    # Say so explicitly instead of failing while sorting.
                    print(
                        f"[top_results] Response has status '{data['status']}' "
                        "and no results — nothing to sort."
                    )
                    return None

            if not isinstance(data, dict) or not data:
                print("[top_results] Invalid or empty data — nothing to sort.")
                return None

            ranked = sorted(
                data.values(),
                key=lambda entry: entry["score"],
                reverse=True,
            )

            # Clamp to [0, len]: a negative count would otherwise slice from
            # the end and return "all but the last N" entries.
            limit = max(0, min(num_of_articals, len(ranked)))

            return {
                f"searchresult{position}": entry
                for position, entry in enumerate(ranked[:limit], start=1)
            }

        except Exception as e:
            print(f"[top_results] Unexpected error: {e}")
            return None
| # ── Standalone helper: fetch and parse a single user-supplied article ─────── # | |
def get_user_article(user_link: str) -> dict:
    """Fetch a single user-supplied URL and return its extracted article.

    Args:
        user_link: URL of the article. Surrounding whitespace is ignored.

    Returns:
        On success::

            {"status": "success", "organization": ..., "title": ..., "article": ...}

        where ``title`` is the lowercased text before the first period of
        the article and ``organization`` is the registered domain name of
        the URL (either may be ``None`` if it could not be derived).
        On failure: ``{"status": "error", "error": <message>}``.

    Never raises; unexpected exceptions are reported as an error dict.
    """
    if not isinstance(user_link, str) or not user_link.strip():
        return {"status": "error", "error": "Invalid or empty URL provided."}

    # A padded URL passes the check above, so fetch the stripped form rather
    # than handing stray whitespace to the downloader.
    url = user_link.strip()

    try:
        try:
            html = trafilatura.fetch_url(url)
            article_text = trafilatura.extract(html) if html else None
        except Exception as e:
            msg = f"Network or extraction failure: {type(e).__name__}: {e}"
            print(f"[get_user_article] {msg}")
            return {"status": "error", "error": msg}

        if not article_text:
            return {
                "status": "error",
                "error": (
                    "Could not extract readable text from the provided URL. "
                    "The page may be paywalled, JavaScript-rendered, or block scrapers."
                ),
            }

        # Best effort: a missing title or organization must not fail the call.
        try:
            title = article_text.strip().partition(".")[0].lower()
        except (AttributeError, TypeError, ValueError):
            title = None

        try:
            organization = tldextract.extract(url).domain
        except (AttributeError, TypeError, ValueError):
            organization = None

        return {
            "status": "success",
            "organization": organization,
            "title": title,
            "article": article_text,
        }

    except Exception as e:
        return {
            "status": "error",
            "error": f"Unexpected failure in get_user_article: {type(e).__name__}: {e}",
        }