mdAmin313 committed on
Commit
32da598
·
verified ·
1 Parent(s): 372170e

Create _data_fetcher.py

Browse files
Files changed (1) hide show
  1. _data_fetcher.py +146 -0
_data_fetcher.py ADDED
@@ -0,0 +1,146 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import requests
3
+ import json
4
+ import base64
5
+ from typing import Dict, Any, Optional, Tuple
6
+ from bs4 import BeautifulSoup
7
+ from functools import lru_cache
8
+ import logging
9
+
10
+ from ._config import SERPAPI_KEY, SAFE_BROWSING_KEY, VIRUSTOTAL_KEY, SOURCE_TRUST
11
+ from ._utils import is_host_public, domain_from_url, sanitize_text
12
+
13
+ logger = logging.getLogger("fact_checker_fetcher")
14
+
15
@lru_cache(maxsize=256)
def serpapi_search(query: str, type: str, num: int = 6) -> Dict[str, Any]:
    """Run a SerpApi Google search or reverse-image search.

    Args:
        query: The search query, or (for ``type == "reverse_image"``) the
            image URL to look up.
        type: Search mode — ``"reverse_image"`` switches to reverse image
            search; any other value performs a normal web search. (The name
            shadows the ``type`` builtin but is kept for backward
            compatibility with existing keyword callers.)
        num: Maximum number of results to request.

    Returns:
        ``{"available": False, "note": ...}`` when no API key is configured;
        ``{"available": True, "result": <parsed JSON>}`` on success;
        ``{"available": True, "error": <message>}`` when the request fails.

    NOTE: results are memoized via ``lru_cache``, so callers must not mutate
    the returned dict — doing so would corrupt the cached value for later
    callers.
    """
    if not SERPAPI_KEY:
        return {"available": False, "note": "SERPAPI_KEY not set"}
    try:
        params = {"engine": "google", "q": query, "num": num, "api_key": SERPAPI_KEY}
        if type == "reverse_image":
            # Reverse image search takes an image_url instead of a text query.
            params.pop("q")
            params["image_url"] = query

        r = requests.get("https://serpapi.com/search.json", params=params, timeout=12)
        r.raise_for_status()
        return {"available": True, "result": r.json()}
    except Exception as e:
        # Lazy %-args: avoid building the message unless the record is emitted.
        logger.exception("SerpApi %s failed", type)
        return {"available": True, "error": str(e)}
32
+
33
def google_safe_browsing_check(url: str) -> dict:
    """Look up *url* in the Google Safe Browsing v4 threat database.

    Returns ``{"safe": bool, "matches": [...]}`` on a successful lookup, or
    ``{"safe": None, "error": ...}`` when the key is missing or the call fails.
    """
    if not SAFE_BROWSING_KEY:
        return {"safe": None, "error": "API Key Missing"}

    payload = {
        "client": {"clientId": "newsorchestra", "clientVersion": "1.0"},
        "threatInfo": {
            "threatTypes": [
                "MALWARE",
                "SOCIAL_ENGINEERING",
                "UNWANTED_SOFTWARE",
                "POTENTIALLY_HARMFUL_APPLICATION",
            ],
            "platformTypes": ["ANY_PLATFORM"],
            "threatEntryTypes": ["URL"],
            "threatEntries": [{"url": url}],
        },
    }
    try:
        resp = requests.post(
            f"https://safebrowsing.googleapis.com/v4/threatMatches:find?key={SAFE_BROWSING_KEY}",
            json=payload,
            timeout=10,
        )
        resp.raise_for_status()
        found = resp.json()
        # An empty response body (no "matches" key) means no threats matched.
        return {"safe": "matches" not in found, "matches": found.get("matches", [])}
    except Exception as exc:
        return {"safe": None, "error": str(exc)}
51
+
52
def virustotal_url_check(url: str) -> dict:
    """Query VirusTotal for the latest analysis verdict on *url*.

    Returns one of:
      ``{"safe": bool, "malicious_votes": int, "suspicious_votes": int}``
      ``{"safe": None, "submitted": True}``  -- URL unknown; queued for scan
      ``{"safe": None, "error": ...}``       -- missing key or request failure
    """
    if not VIRUSTOTAL_KEY:
        return {"safe": None, "error": "API Key Missing"}
    try:
        auth = {"x-apikey": VIRUSTOTAL_KEY}
        # VT addresses URLs by the unpadded urlsafe base64 of the raw URL.
        identifier = base64.urlsafe_b64encode(url.encode()).decode().strip("=")
        resp = requests.get(
            f"https://www.virustotal.com/api/v3/urls/{identifier}",
            headers=auth,
            timeout=15,
        )

        if resp.status_code == 404:
            # Unknown URL: submit it for analysis and report no verdict yet.
            requests.post(
                "https://www.virustotal.com/api/v3/urls",
                headers=auth,
                data={"url": url},
                timeout=15,
            )
            return {"safe": None, "submitted": True}

        resp.raise_for_status()
        stats = resp.json().get("data", {}).get("attributes", {}).get("last_analysis_stats", {})
        bad = stats.get("malicious", 0)
        shady = stats.get("suspicious", 0)
        return {"safe": bad == 0 and shady == 0, "malicious_votes": bad, "suspicious_votes": shady}
    except Exception as exc:
        return {"safe": None, "error": str(exc)}
73
+
74
def phishing_checks(url: str) -> dict:
    """Run both URL-safety checks (Safe Browsing + VirusTotal) for *url*.

    Returns an empty dict for a falsy URL; otherwise a dict with the URL and
    one sub-result per service.
    """
    if not url:
        return {}
    return {
        "url": url,
        "safe_browsing": google_safe_browsing_check(url),
        "virustotal": virustotal_url_check(url),
    }
81
+
82
def fetch_article_text_from_url(url: str) -> Tuple[str, str]:
    """Fetch *url* and extract ``(article_text, headline)``.

    Strategy: prefer JSON-LD (schema.org NewsArticle/Article) metadata, then
    fall back to scraping <article>/<main> paragraphs.

    Args:
        url: Publicly reachable article URL.

    Returns:
        Tuple of (body text, headline); either element may be ``""`` when not
        found. Returns ``("", "")`` for private/internal hosts or on any
        fetch/parse failure.
    """
    # SSRF guard: refuse to fetch private / internal hosts.
    if not is_host_public(url):
        logger.warning("Blocked fetch_article_text_from_url for private host: %s", url)
        return "", ""
    try:
        r = requests.get(url, timeout=10, headers={"User-Agent": "newsorchestra/1.0"})
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")

        # 1. Try JSON-LD/Schema.org extraction first.
        for s in soup.find_all("script", type="application/ld+json"):
            try:
                parsed = json.loads(s.string or s.get_text())
                items = parsed if isinstance(parsed, list) else [parsed]
                for item in items:
                    if not isinstance(item, dict):
                        continue
                    # schema.org allows "@type" to be a string OR a list of
                    # strings; accept the article types in either form.
                    raw_type = item.get("@type")
                    types = raw_type if isinstance(raw_type, list) else [raw_type]
                    if not any(t in ("NewsArticle", "Article") for t in types):
                        continue
                    headline = item.get("headline") or item.get("name") or ""
                    body = item.get("articleBody") or item.get("description") or ""
                    if isinstance(body, list):
                        body = " ".join([str(x) for x in body if x])
                    if body:
                        return sanitize_text(str(body)), sanitize_text(str(headline) or "")
            except Exception:
                # Malformed JSON-LD block: skip it and try the next one.
                continue

        # 2. Fallback to general HTML parsing.
        article_tag = soup.find("article")
        if article_tag:
            paras = [p.get_text(" ", strip=True) for p in article_tag.find_all("p")]
        else:
            main = soup.find("main") or soup.find(id="main") or soup
            paras = [p.get_text(" ", strip=True) for p in main.find_all("p")]
        # Keep only substantial paragraphs to drop nav/boilerplate fragments.
        article_text = "\n\n".join([p for p in paras if len(p) > 40])
        headline = soup.title.get_text(strip=True) if soup.title else ""

        return article_text or "", headline or ""
    except Exception:
        logger.exception("fetch_article_text_from_url failed")
        return "", ""
120
+
121
def aggregate_search_results(serpapi_result: dict) -> Dict[str, Any]:
    """Summarize SERP organic results into evidence items and a consensus.

    Each evidence item carries title/snippet/link plus its domain and trust
    weight; the consensus reports the average trust of the top three hits and
    the five most frequent domains.
    """
    from collections import Counter  # local import for this function

    usable = (
        serpapi_result
        and serpapi_result.get("available")
        and serpapi_result.get("result")
    )
    if not usable:
        return {"evidence": [], "consensus": {"top_trust_avg": 0.5, "top_domains": {}}}

    hits = serpapi_result["result"].get("organic_results", []) or []
    evidence = []
    domain_counts = Counter()
    for hit in hits[:12]:
        link = hit.get("link") or hit.get("displayed_link") or ""
        domain = domain_from_url(link)
        # Domains absent from SOURCE_TRUST get a neutral-ish default of 0.6.
        trust = SOURCE_TRUST.get(domain, 0.6)
        evidence.append(
            {
                "title": hit.get("title", ""),
                "snippet": hit.get("snippet", ""),
                "link": link,
                "domain": domain,
                "trust": round(trust, 2),
            }
        )
        if domain:
            domain_counts[domain] += 1

    leading = evidence[:3]
    avg_trust = sum(e["trust"] for e in leading) / len(leading) if leading else 0.5

    return {
        "evidence": evidence,
        "consensus": {
            "top_trust_avg": round(avg_trust, 2),
            "top_domains": dict(domain_counts.most_common(5)),
        },
    }