Spaces:
Sleeping
Sleeping
| """ | |
| PhishLens Threat Intelligence API Module. | |
| Integrates five external threat intelligence APIs to enrich URL and IP | |
| analysis with community-sourced reputation data: | |
| 1. VirusTotal — 70+ AV engine votes on URL/domain maliciousness | |
| 2. Google Safe Browsing — Chrome-level phishing/malware database | |
| 3. AbuseIPDB — Sender IP reputation from community reports | |
| 4. URLScan.io — Live page scan with visual phishing detection | |
| 5. URLhaus (no key) — abuse.ch malicious URL database | |
| All API calls use strict timeouts, fallback to safe defaults on failure, | |
| and are designed to be called asynchronously for batch processing. | |
| Security rationale: Combining multiple independent threat feeds using | |
| different detection methodologies (ML-based, signature-based, behavioural) | |
| creates a consensus signal that is extremely hard for attackers to evade — | |
| they would need to remain undetected across all five intelligence sources | |
| simultaneously. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import base64 | |
| import os | |
| from typing import Dict, List, Optional | |
| import aiohttp | |
| import requests | |
| from src.utils.config import API_ENDPOINTS, NETWORK_TIMEOUT | |
| from src.utils.logger import get_logger | |
| log = get_logger(__name__) | |
| # API keys loaded lazily at call time (dotenv may be loaded after module import) | |
| def _vt_key(): return os.getenv("VIRUSTOTAL_API_KEY", "") | |
| def _gsb_key(): return os.getenv("GOOGLE_SAFE_BROWSING_API_KEY", "") | |
| def _abuse_key(): return os.getenv("ABUSEIPDB_API_KEY", "") | |
| def _urlscan_key(): return os.getenv("URLSCAN_API_KEY", "") | |
| def _ipqs_key(): return os.getenv("IPQS_API_KEY", "") | |
| _IPQS_EMAIL_URL = "https://ipqualityscore.com/api/json/email/{key}/{email}" | |
| _IPQS_URL_URL = "https://ipqualityscore.com/api/json/url/{key}/{url}" | |
| _IPQS_IP_URL = "https://ipqualityscore.com/api/json/ip/{key}/{ip}" | |
| # --------------------------------------------------------------------------- | |
| # VirusTotal | |
| # --------------------------------------------------------------------------- | |
def query_virustotal(url: str, timeout: int = NETWORK_TIMEOUT) -> Dict:
    """Query VirusTotal API v3 for URL reputation.

    Security rationale: VirusTotal aggregates verdicts from many independent
    AV/security vendors, so even a single malicious vote on a URL found in an
    email is treated as a significant risk indicator.

    Args:
        url: URL string to query.
        timeout: Request timeout in seconds.

    Returns:
        Dict with vt_malicious, vt_suspicious, vt_clean, vt_reputation.
        When the key is missing, the URL is empty, the URL is unknown to
        VirusTotal (404), or the request fails, the three counters are -1
        and vt_reputation is 0 (see ``_default_vt_features``).
    """
    api_key = _vt_key()
    if not api_key:
        log.debug("VirusTotal API key not configured — skipping VT lookup.")
        return _default_vt_features()
    if not url:
        # Nothing to look up; avoid spending API quota on an empty identifier.
        return _default_vt_features()

    try:
        # The v3 URL identifier is the URL-safe base64 of the URL without padding.
        url_id = base64.urlsafe_b64encode(url.encode()).decode().rstrip("=")
        resp = requests.get(
            API_ENDPOINTS["virustotal_url"].format(url_id=url_id),
            headers={"x-apikey": api_key},
            timeout=timeout,
        )
        if resp.status_code == 200:
            data = resp.json().get("data", {})
            attrs = data.get("attributes", {})
            stats = attrs.get("last_analysis_stats", {})
            return {
                "vt_malicious": stats.get("malicious", 0),
                "vt_suspicious": stats.get("suspicious", 0),
                "vt_clean": stats.get("undetected", 0),
                "vt_reputation": attrs.get("reputation", 0),
            }
        if resp.status_code == 404:
            # URL not in the VT database: submit it for analysis. The submit
            # helper is a blocking best-effort call with its own short timeout;
            # we do not wait for the analysis result itself.
            _submit_url_to_virustotal(url)
            return _default_vt_features()
        log.debug(f"VirusTotal API returned {resp.status_code} for '{url[:80]}'")
    except requests.Timeout:
        log.debug(f"VirusTotal timeout for '{url[:80]}'")
    except Exception as exc:
        # Deliberately broad: enrichment must never break email analysis.
        log.debug(f"VirusTotal error for '{url[:80]}': {exc}")
    return _default_vt_features()
def _submit_url_to_virustotal(url: str) -> None:
    """Submit a new URL to VirusTotal for analysis (best-effort).

    The call is blocking but capped at a 2-second timeout. Failures never
    propagate to the caller; they are recorded at debug level so a broken
    submission path can still be diagnosed.

    Args:
        url: URL string to submit.
    """
    api_key = _vt_key()
    if not api_key or not url:
        return
    try:
        headers = {
            "x-apikey": api_key,
            "content-type": "application/x-www-form-urlencoded",
        }
        resp = requests.post(
            API_ENDPOINTS["virustotal_submit"],
            headers=headers,
            data={"url": url},
            timeout=2,
        )
        if resp.status_code >= 400:
            log.debug(
                f"VirusTotal submission returned {resp.status_code} for '{url[:80]}'"
            )
    except Exception as exc:
        # Best-effort submission; failure is acceptable but should be visible.
        log.debug(f"VirusTotal submission failed for '{url[:80]}': {exc}")
| def _default_vt_features() -> Dict: | |
| return {"vt_malicious": -1, "vt_suspicious": -1, "vt_clean": -1, "vt_reputation": 0} | |
def query_virustotal_domain(domain: str, timeout: int = NETWORK_TIMEOUT) -> Dict:
    """Query VirusTotal API v3 for domain reputation.

    Args:
        domain: Domain name to look up. Surrounding whitespace is ignored.
        timeout: Request timeout in seconds.

    Returns:
        Dict with vt_malicious, vt_suspicious, vt_clean, vt_reputation.
        Falls back to ``_default_vt_features()`` when the key or domain is
        missing, on any non-200 response, or on a request error.
    """
    import urllib.parse

    api_key = _vt_key()
    domain = domain.strip() if domain else ""
    if not api_key or not domain:
        return _default_vt_features()

    try:
        # The domain comes from email content; percent-encode it so characters
        # such as '/', '?' or '#' cannot alter the request path or query.
        encoded_domain = urllib.parse.quote(domain, safe="")
        resp = requests.get(
            f"https://www.virustotal.com/api/v3/domains/{encoded_domain}",
            headers={"x-apikey": api_key},
            timeout=timeout,
        )
        if resp.status_code == 200:
            data = resp.json().get("data", {})
            attrs = data.get("attributes", {})
            stats = attrs.get("last_analysis_stats", {})
            return {
                "vt_malicious": stats.get("malicious", 0),
                "vt_suspicious": stats.get("suspicious", 0),
                "vt_clean": stats.get("undetected", 0),
                "vt_reputation": attrs.get("reputation", 0),
            }
        log.debug(f"VirusTotal domain API returned {resp.status_code} for '{domain}'")
    except requests.Timeout:
        log.debug(f"VirusTotal domain timeout for '{domain}'")
    except Exception as exc:
        # Deliberately broad: enrichment must never break email analysis.
        log.debug(f"VirusTotal domain error for '{domain}': {exc}")
    return _default_vt_features()
| # --------------------------------------------------------------------------- | |
| # Google Safe Browsing | |
| # --------------------------------------------------------------------------- | |
def query_google_safe_browsing(urls: List[str], timeout: int = NETWORK_TIMEOUT) -> Dict:
    """Query Google Safe Browsing API v4 for a batch of URLs.

    Only the first 500 URLs are sent in the single request; any excess is
    dropped and noted at debug level.

    Args:
        urls: List of URL strings to check.
        timeout: Request timeout in seconds.

    Returns:
        Dict with three keys on every path:
        gsb_is_flagged (1 if any URL matched, 0 if none, -1 if unknown),
        gsb_threat_count (number of matches, -1 if unknown), and
        _gsb_flagged_urls (set of matched URL strings, empty if none/unknown).
    """
    max_entries = 500
    api_key = _gsb_key()
    if not api_key:
        log.debug("Google Safe Browsing API key not configured — skipping GSB check.")
        return {"gsb_is_flagged": -1, "gsb_threat_count": -1, "_gsb_flagged_urls": set()}
    if not urls:
        return {"gsb_is_flagged": 0, "gsb_threat_count": 0, "_gsb_flagged_urls": set()}
    if len(urls) > max_entries:
        log.debug(
            f"Google Safe Browsing: checking first {max_entries} of {len(urls)} URLs."
        )

    payload = {
        "client": {"clientId": "PhishLens", "clientVersion": "2.0"},
        "threatInfo": {
            "threatTypes": [
                "MALWARE",
                "SOCIAL_ENGINEERING",
                "UNWANTED_SOFTWARE",
                "POTENTIALLY_HARMFUL_APPLICATION",
            ],
            "platformTypes": ["ANY_PLATFORM"],
            "threatEntryTypes": ["URL"],
            "threatEntries": [{"url": u} for u in urls[:max_entries]],
        },
    }
    try:
        resp = requests.post(
            API_ENDPOINTS["google_safe_browsing"],
            params={"key": api_key},
            json=payload,
            timeout=timeout,
        )
        if resp.status_code == 200:
            # A response without "matches" means nothing was flagged.
            matches = resp.json().get("matches", [])
            flagged_urls = {m.get("threat", {}).get("url", "") for m in matches}
            return {
                "gsb_is_flagged": int(len(matches) > 0),
                "gsb_threat_count": len(matches),
                "_gsb_flagged_urls": flagged_urls,
            }
        log.debug(f"Google Safe Browsing API returned {resp.status_code}")
    except requests.Timeout:
        log.debug("Google Safe Browsing request timed out.")
    except Exception as exc:
        # Deliberately broad: enrichment must never break email analysis.
        log.debug(f"Google Safe Browsing error: {exc}")
    return {"gsb_is_flagged": -1, "gsb_threat_count": -1, "_gsb_flagged_urls": set()}
| # --------------------------------------------------------------------------- | |
| # AbuseIPDB | |
| # --------------------------------------------------------------------------- | |
def query_abuseipdb(ip_address: str, timeout: int = NETWORK_TIMEOUT) -> Dict:
    """Query AbuseIPDB for sender IP reputation.

    Security rationale: phishing infrastructure reuses IP addresses, so the
    volume of community abuse reports against the sending IP is a useful
    signal independent of what the email claims about its origin.

    Args:
        ip_address: IPv4 or IPv6 address extracted from email Received headers.
        timeout: Request timeout in seconds.

    Returns:
        Dict with abuse_confidence_score, abuse_total_reports, abuse_is_tor,
        abuse_country_code and abuse_isp. Falls back to
        ``_default_abuseipdb_features()`` when the key is missing, the IP is
        empty or private, the response is not 200, or the request fails.
    """
    api_key = _abuse_key()
    if not api_key:
        log.debug("AbuseIPDB API key not configured — skipping IP reputation check.")
        return _default_abuseipdb_features()
    if not ip_address or _is_private_ip(ip_address):
        return _default_abuseipdb_features()

    try:
        headers = {
            "Key": api_key,
            "Accept": "application/json",
        }
        params = {
            "ipAddress": ip_address,
            "maxAgeInDays": "90",
            "verbose": "",  # sent as an empty-valued flag parameter
        }
        resp = requests.get(
            API_ENDPOINTS["abuseipdb_check"],
            headers=headers,
            params=params,
            timeout=timeout,
        )
        if resp.status_code == 200:
            data = resp.json().get("data", {})
            # Fields may be present but null in the JSON. Coerce them so a
            # null does not raise (discarding the whole result) or leak None
            # into the string features.
            return {
                "abuse_confidence_score": data.get("abuseConfidenceScore", 0),
                "abuse_total_reports": data.get("totalReports", 0),
                "abuse_is_tor": int(bool(data.get("isTor", False))),
                "abuse_country_code": data.get("countryCode") or "",
                "abuse_isp": data.get("isp") or "",
            }
        log.debug(f"AbuseIPDB API returned {resp.status_code} for IP '{ip_address}'")
    except requests.Timeout:
        log.debug(f"AbuseIPDB timeout for IP '{ip_address}'")
    except Exception as exc:
        # Deliberately broad: enrichment must never break email analysis.
        log.debug(f"AbuseIPDB error for IP '{ip_address}': {exc}")
    return _default_abuseipdb_features()
| def _default_abuseipdb_features() -> Dict: | |
| return { | |
| "abuse_confidence_score": -1, | |
| "abuse_total_reports": -1, | |
| "abuse_is_tor": -1, | |
| "abuse_country_code": "", | |
| "abuse_isp": "", | |
| } | |
| def _is_private_ip(ip: str) -> bool: | |
| """Return True if the IP is a private/reserved range (not useful for abuse check).""" | |
| private_prefixes = ("10.", "192.168.", "127.", "172.16.", "172.17.", | |
| "172.18.", "172.19.", "172.20.", "172.21.", | |
| "172.22.", "172.23.", "172.24.", "172.25.", | |
| "172.26.", "172.27.", "172.28.", "172.29.", | |
| "172.30.", "172.31.", "0.0.0.0", "::1", "fe80:") | |
| return any(ip.startswith(p) for p in private_prefixes) | |
| # --------------------------------------------------------------------------- | |
| # URLScan.io | |
| # --------------------------------------------------------------------------- | |
def query_urlscan(url: str, timeout: int = NETWORK_TIMEOUT) -> Dict:
    """Search URLScan.io for an existing scan result for a URL.

    Only the first search hit is used. No new scan is submitted.

    Args:
        url: URL string to search for.
        timeout: Request timeout in seconds.

    Returns:
        Dict with urlscan_malicious, urlscan_brand_impersonated and
        urlscan_redirect_count. All three are -1 when the key or URL is
        missing, no result is found, or the request fails.
    """
    api_key = _urlscan_key()
    if not api_key:
        log.debug("URLScan.io API key not configured — skipping URLScan lookup.")
        return _default_urlscan_features()
    if not url:
        return _default_urlscan_features()

    try:
        import urllib.parse

        # The URL is embedded in a double-quoted search phrase; escape
        # backslashes and quotes so attacker-chosen URLs cannot terminate the
        # phrase and inject additional query terms.
        phrase = url.replace("\\", "\\\\").replace('"', '\\"')
        query = urllib.parse.quote(f'page.url:"{phrase}"')
        headers = {"API-Key": api_key, "Content-Type": "application/json"}
        resp = requests.get(
            f"{API_ENDPOINTS['urlscan_search']}?q={query}&size=1",
            headers=headers,
            timeout=timeout,
        )
        if resp.status_code == 200:
            results = resp.json().get("results", [])
            if results:
                result = results[0]
                verdicts = result.get("verdicts", {})
                overall = verdicts.get("overall", {})
                brands = verdicts.get("urlscan", {}).get("brands", [])
                redirects = result.get("page", {}).get("redirects", [])
                # bool()/`or []` keep JSON nulls from raising and discarding
                # an otherwise usable result.
                return {
                    "urlscan_malicious": int(bool(overall.get("malicious", False))),
                    "urlscan_brand_impersonated": int(bool(brands)),
                    "urlscan_redirect_count": len(redirects or []),
                }
        else:
            log.debug(f"URLScan.io API returned {resp.status_code} for '{url[:80]}'")
    except requests.Timeout:
        log.debug(f"URLScan.io timeout for '{url[:80]}'")
    except Exception as exc:
        # Deliberately broad: enrichment must never break email analysis.
        log.debug(f"URLScan.io error for '{url[:80]}': {exc}")
    return _default_urlscan_features()
| def _default_urlscan_features() -> Dict: | |
| return { | |
| "urlscan_malicious": -1, | |
| "urlscan_brand_impersonated": -1, | |
| "urlscan_redirect_count": -1, | |
| } | |
| # --------------------------------------------------------------------------- | |
| # URLhaus (no key required) | |
| # --------------------------------------------------------------------------- | |
def query_urlhaus(url: str, timeout: int = NETWORK_TIMEOUT) -> Dict:
    """Query abuse.ch URLhaus for malicious URL classification.

    Args:
        url: URL string to query.
        timeout: Request timeout in seconds.

    Returns:
        Dict with urlhaus_threat: 0 when URLhaus has no record of the URL,
        1 when the URL is listed, -1 when the answer is unknown (empty URL,
        unexpected status, non-200 response, or request failure).
    """
    if not url:
        return {"urlhaus_threat": -1}
    try:
        # NOTE(review): no authentication header is sent here — confirm
        # against the current abuse.ch API requirements.
        resp = requests.post(
            API_ENDPOINTS["urlhaus_lookup"],
            data={"url": url},
            timeout=timeout,
        )
        if resp.status_code == 200:
            query_status = resp.json().get("query_status", "")
            if query_status == "no_results":
                return {"urlhaus_threat": 0}
            # The URL lookup reports a database hit as "ok"; the two older
            # values are kept so previously recognised responses still count.
            if query_status in ("ok", "is_host", "blacklisted"):
                return {"urlhaus_threat": 1}
            log.debug(f"URLhaus query_status '{query_status}' for '{url[:80]}'")
        else:
            log.debug(f"URLhaus API returned {resp.status_code} for '{url[:80]}'")
    except Exception as exc:
        # Deliberately broad: enrichment must never break email analysis.
        log.debug(f"URLhaus error for '{url[:80]}': {exc}")
    return {"urlhaus_threat": -1}
| # --------------------------------------------------------------------------- | |
| # Combined intelligence enrichment | |
| # --------------------------------------------------------------------------- | |
def enrich_email_with_intelligence(
    urls: List[str],
    sender_ip: Optional[str] = None,
) -> Dict:
    """Run every intelligence lookup for one email and merge the results.

    Args:
        urls: List of URLs from the email. At most the first five are sent to
            VirusTotal, URLhaus and URLScan; the full list goes to Google
            Safe Browsing in one batch call.
        sender_ip: Sender IP extracted from Received: headers (optional).

    Returns:
        Merged dict of intelligence features. Per-URL results are stored
        under ``_vt_url_N`` / ``_vt_url_N_url`` / ``_uh_url_N`` / ``_us_url_N``
        (display-only); the flat ML features come from the first URL.
    """
    abuse_fields = ("abuse_confidence_score", "abuse_total_reports", "abuse_is_tor")
    urlscan_fields = (
        "urlscan_malicious",
        "urlscan_brand_impersonated",
        "urlscan_redirect_count",
    )
    merged: Dict = {}

    # Per-URL lookups, kept in VT -> URLhaus -> URLScan order for each URL.
    for position, candidate in enumerate(urls[:5]):
        merged[f"_vt_url_{position}"] = query_virustotal(candidate)
        merged[f"_vt_url_{position}_url"] = candidate
        merged[f"_uh_url_{position}"] = query_urlhaus(candidate)
        merged[f"_us_url_{position}"] = query_urlscan(candidate)

    # Flat ML features mirror the primary (first) URL's results.
    if urls and urls[0]:
        merged.update(merged.get("_vt_url_0", _default_vt_features()))
        urlhaus_primary = merged.get("_uh_url_0", {})
        merged["urlhaus_threat"] = urlhaus_primary.get("urlhaus_threat", -1)
        urlscan_primary = merged.get("_us_url_0", {})
        for field in urlscan_fields:
            merged[field] = urlscan_primary.get(field, -1)

    # Google Safe Browsing checks all URLs in a single request.
    if urls:
        merged.update(query_google_safe_browsing(urls))

    # Only the numeric AbuseIPDB fields are kept; string fields are dropped
    # for the ML pipeline.
    if not sender_ip:
        abuse_result = dict.fromkeys(abuse_fields, -1)
    else:
        abuse_result = query_abuseipdb(sender_ip)
    for field in abuse_fields:
        merged[field] = abuse_result[field]

    return merged
def get_default_intelligence_features() -> Dict:
    """Return placeholder intelligence features for emails without URLs/IPs.

    Every feature is -1 except vt_reputation, which is 0. Key order is fixed.
    """
    ordered_names = (
        "vt_malicious",
        "vt_suspicious",
        "vt_clean",
        "vt_reputation",
        "gsb_is_flagged",
        "gsb_threat_count",
        "urlscan_malicious",
        "urlscan_brand_impersonated",
        "urlscan_redirect_count",
        "urlhaus_threat",
        "abuse_confidence_score",
        "abuse_total_reports",
        "abuse_is_tor",
    )
    placeholders: Dict = dict.fromkeys(ordered_names, -1)
    # Re-assigning an existing key keeps its position in the dict.
    placeholders["vt_reputation"] = 0
    return placeholders
| # --------------------------------------------------------------------------- | |
| # IPQualityScore (IPQS) | |
| # --------------------------------------------------------------------------- | |
def query_ipqs_email(email_address: str, timeout: int = NETWORK_TIMEOUT) -> Dict:
    """Check an email address against the IPQualityScore email verification API.

    All returned values are display-only — NOT in the ML vector.

    Args:
        email_address: The sender email address to verify.
        timeout: Request timeout in seconds.

    Returns:
        Dict with ipqs_email_fraud_score, ipqs_email_disposable,
        ipqs_email_spam_trap, ipqs_email_valid, ipqs_email_recent_abuse,
        ipqs_email_deliverability, ipqs_email_dns_valid. On failure the
        numeric fields are -1, deliverability is "unknown", and an
        ``_ipqs_error`` message is added when the API reports an error, the
        request times out, or an exception occurs.
    """
    result: Dict = {
        "ipqs_email_fraud_score": -1,
        "ipqs_email_disposable": -1,
        "ipqs_email_spam_trap": -1,
        "ipqs_email_valid": -1,
        "ipqs_email_recent_abuse": -1,
        "ipqs_email_deliverability": "unknown",
        "ipqs_email_dns_valid": -1,
    }
    api_key = _ipqs_key()
    if not api_key or not email_address:
        return result

    try:
        import urllib.parse

        url = _IPQS_EMAIL_URL.format(
            key=api_key,
            email=urllib.parse.quote(email_address, safe=""),
        )
        params = {"timeout": 7, "fast": "true", "abuse_strictness": 1}
        resp = requests.get(url, params=params, timeout=timeout)
        if resp.status_code == 200:
            d = resp.json()
            if d.get("success", False):
                # IPQS reports spam_trap_score as a level string
                # ("high"/"medium"/"low"/"none"); comparing that to 50 raised
                # TypeError and threw away the whole result. Numeric values
                # are still accepted with the original > 50 threshold.
                trap_score = d.get("spam_trap_score", "none")
                if isinstance(trap_score, (int, float)):
                    is_spam_trap = trap_score > 50
                else:
                    is_spam_trap = str(trap_score).strip().lower() == "high"
                return {
                    "ipqs_email_fraud_score": d.get("fraud_score", -1),
                    "ipqs_email_disposable": int(bool(d.get("disposable", False))),
                    "ipqs_email_spam_trap": int(is_spam_trap),
                    "ipqs_email_valid": int(bool(d.get("valid", False))),
                    "ipqs_email_recent_abuse": int(bool(d.get("recent_abuse", False))),
                    "ipqs_email_deliverability": d.get("deliverability", "unknown"),
                    "ipqs_email_dns_valid": int(bool(d.get("dns_valid", False))),
                }
            result["_ipqs_error"] = d.get("message", "API error")
            return result
        log.debug(f"IPQS email API returned {resp.status_code} for '{email_address}'")
    except requests.Timeout:
        log.debug(f"IPQS email timeout for '{email_address}'")
        result["_ipqs_error"] = "Request timed out"
    except Exception as exc:
        # The API key is part of the request URL, and HTTP client errors can
        # echo that URL; strip the key before logging or returning the text.
        message = str(exc).replace(api_key, "<redacted>")
        log.debug(f"IPQS email error for '{email_address}': {message}")
        result["_ipqs_error"] = message
    return result
def query_ipqs_url(url: str, timeout: int = NETWORK_TIMEOUT) -> Dict:
    """Scan a URL against the IPQualityScore malicious URL scanner API.

    All returned values are display-only — NOT in the ML vector.

    Args:
        url: URL string to scan.
        timeout: Request timeout in seconds.

    Returns:
        Dict with ipqs_url_phishing, ipqs_url_malware, ipqs_url_suspicious,
        ipqs_url_unsafe, ipqs_url_risk_score, ipqs_url_domain_rank,
        ipqs_url_short_link_redirect, ipqs_url_spamming. On failure every
        field is -1, and an ``_ipqs_error`` message is added when the API
        reports an error, the request times out, or an exception occurs.
    """
    result: Dict = {
        "ipqs_url_phishing": -1,
        "ipqs_url_malware": -1,
        "ipqs_url_suspicious": -1,
        "ipqs_url_unsafe": -1,
        "ipqs_url_risk_score": -1,
        "ipqs_url_domain_rank": -1,
        "ipqs_url_short_link_redirect": -1,
        "ipqs_url_spamming": -1,
    }
    api_key = _ipqs_key()
    if not api_key or not url:
        return result

    try:
        import urllib.parse

        encoded_url = urllib.parse.quote(url, safe="")
        req_url = _IPQS_URL_URL.format(key=api_key, url=encoded_url)
        params = {"strictness": 1, "allow_public_access_points": "true", "fast": "false"}
        resp = requests.get(req_url, params=params, timeout=timeout)
        if resp.status_code == 200:
            d = resp.json()
            if d.get("success", False):
                # bool() keeps a JSON null flag from raising in int().
                return {
                    "ipqs_url_phishing": int(bool(d.get("phishing", False))),
                    "ipqs_url_malware": int(bool(d.get("malware", False))),
                    "ipqs_url_suspicious": int(bool(d.get("suspicious", False))),
                    "ipqs_url_unsafe": int(bool(d.get("unsafe", False))),
                    "ipqs_url_risk_score": d.get("risk_score", -1),
                    "ipqs_url_domain_rank": d.get("domain_rank", -1),
                    "ipqs_url_short_link_redirect": int(
                        bool(d.get("short_link_redirect", False))
                    ),
                    "ipqs_url_spamming": int(bool(d.get("spamming", False))),
                }
            result["_ipqs_error"] = d.get("message", "API error")
            return result
        log.debug(f"IPQS URL API returned {resp.status_code} for '{url[:80]}'")
    except requests.Timeout:
        log.debug(f"IPQS URL timeout for '{url[:80]}'")
        result["_ipqs_error"] = "Request timed out"
    except Exception as exc:
        # The API key is part of the request URL, and HTTP client errors can
        # echo that URL; strip the key before logging or returning the text.
        message = str(exc).replace(api_key, "<redacted>")
        log.debug(f"IPQS URL error for '{url[:80]}': {message}")
        result["_ipqs_error"] = message
    return result
def query_ipqs_ip(ip_address: str, timeout: int = NETWORK_TIMEOUT) -> Dict:
    """Check a sender IP against the IPQualityScore IP reputation API.

    All returned values are display-only — NOT in the ML vector.

    Args:
        ip_address: IPv4 or IPv6 address to check.
        timeout: Request timeout in seconds.

    Returns:
        Dict with ipqs_ip_fraud_score, ipqs_ip_proxy, ipqs_ip_vpn,
        ipqs_ip_tor, ipqs_ip_recent_abuse, ipqs_ip_bot_status,
        ipqs_ip_country_code, ipqs_ip_isp, ipqs_ip_connection_type. On
        failure (or for private IPs) numeric fields are -1 and string fields
        are "unknown"; an ``_ipqs_error`` message is added when the API
        reports an error, the request times out, or an exception occurs.
    """
    result: Dict = {
        "ipqs_ip_fraud_score": -1,
        "ipqs_ip_proxy": -1,
        "ipqs_ip_vpn": -1,
        "ipqs_ip_tor": -1,
        "ipqs_ip_recent_abuse": -1,
        "ipqs_ip_bot_status": -1,
        "ipqs_ip_country_code": "unknown",
        "ipqs_ip_isp": "unknown",
        "ipqs_ip_connection_type": "unknown",
    }
    api_key = _ipqs_key()
    if not api_key or not ip_address:
        return result
    if _is_private_ip(ip_address):
        return result

    try:
        import urllib.parse

        # The IP is parsed from email headers; encode anything that is not
        # valid inside a path segment. ':' is kept so IPv6 literals are sent
        # unchanged.
        encoded_ip = urllib.parse.quote(ip_address, safe=":")
        req_url = _IPQS_IP_URL.format(key=api_key, ip=encoded_ip)
        params = {"strictness": 1, "allow_public_access_points": "true"}
        resp = requests.get(req_url, params=params, timeout=timeout)
        if resp.status_code == 200:
            d = resp.json()
            if d.get("success", False):
                # bool() keeps a JSON null flag from raising in int().
                return {
                    "ipqs_ip_fraud_score": d.get("fraud_score", -1),
                    "ipqs_ip_proxy": int(bool(d.get("proxy", False))),
                    "ipqs_ip_vpn": int(bool(d.get("vpn", False))),
                    "ipqs_ip_tor": int(bool(d.get("tor", False))),
                    "ipqs_ip_recent_abuse": int(bool(d.get("recent_abuse", False))),
                    "ipqs_ip_bot_status": int(bool(d.get("bot_status", False))),
                    "ipqs_ip_country_code": d.get("country_code", "unknown"),
                    # The ISP field is read under either capitalisation.
                    "ipqs_ip_isp": d.get("ISP", d.get("isp", "unknown")),
                    "ipqs_ip_connection_type": d.get("connection_type", "unknown"),
                }
            result["_ipqs_error"] = d.get("message", "API error")
            return result
        log.debug(f"IPQS IP API returned {resp.status_code} for '{ip_address}'")
    except requests.Timeout:
        log.debug(f"IPQS IP timeout for '{ip_address}'")
        result["_ipqs_error"] = "Request timed out"
    except Exception as exc:
        # The API key is part of the request URL, and HTTP client errors can
        # echo that URL; strip the key before logging or returning the text.
        message = str(exc).replace(api_key, "<redacted>")
        log.debug(f"IPQS IP error for '{ip_address}': {message}")
        result["_ipqs_error"] = message
    return result