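"""Source credibility checks for URLs, backed by urlscan.io scans."""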
import tldextract
import aiohttp
import asyncio
import os
import logging
from typing import Optional, Dict, Any
from dotenv import load_dotenv
from langchain_core.tools import tool

load_dotenv()

from app.core.config import config
# class Config:
#     GOOGLE_APIS_KEY: Optional[str] = os.getenv("GOOGLE_APIS_KEY")
#     FIRECRAWL_API_KEY: Optional[str] = os.getenv("FIRECRAWL_API_KEY")
#     URLSCAN_API_KEY: Optional[str] = os.getenv("URLSCAN_API_KEY")
# config = Config()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SourceCredibilityTool:
    """
    A collection of tools for verifying source URLs.
    """

    @staticmethod
    def extract_domain(url: str) -> str:
        """
        Extract the registered domain from a given URL.
        """
        extracted = tldextract.extract(url)
        logger.info(f"Extracted components: {extracted}")
        if not extracted.suffix:
            logger.warning(f"No suffix found for URL: {url}")
            return "unknown"
        domain = f"{extracted.domain}.{extracted.suffix}"
        logger.info(f"Extracted domain: {domain}")
        return domain

    @staticmethod
    async def _submit_to_urlscan(url: str) -> Optional[str]:
        """
        Submit a URL to urlscan.io for analysis and return the result API URL.
        """
        api_key = config.URLSCAN_API_KEY
        if not api_key:
            logger.error("URLSCAN_API_KEY is not set in the environment variables.")
            return None
        submit_url = "https://urlscan.io/api/v1/scan/"
        headers = {
            'Content-Type': 'application/json',
            'API-Key': api_key,
        }
        data = {
            'url': url,
            'visibility': 'public',
        }
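        # Note: 'public' visibility makes the scan and the submitted URL visible in
        # urlscan.io's public results; use 'unlisted' for links you don't want indexed.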
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(submit_url, json=data, headers=headers) as response:
                    if response.status == 200:
                        resp_json = await response.json()
                        scan_id = resp_json.get('uuid')
                        result_url = f"https://urlscan.io/api/v1/result/{scan_id}/"
                        # Prefer the API result URL reported by urlscan itself,
                        # falling back to the one built from the scan UUID.
                        return resp_json.get("api") or result_url
                    else:
                        text = await response.text()
                        logger.error(f"Failed to submit URL to urlscan.io, status code: {response.status} {text}")
                        return None
        except aiohttp.ClientError as e:
            logger.error(f"Error submitting URL to urlscan.io: {e}")
            return None

    @staticmethod
    async def _fetch_urlscan_result(result_url: str) -> Optional[Dict[str, Any]]:
        """
        Fetch the result of a urlscan.io analysis.
        """
        api_key = config.URLSCAN_API_KEY
        if not api_key:
            logger.error("URLSCAN_API_KEY is not set in the environment variables.")
            return None
        headers = {
            'API-Key': api_key,
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(result_url, headers=headers) as response:
                    if response.status == 200:
                        resp_json = await response.json()
                        return resp_json
                    else:
                        text = await response.text()
                        logger.error(f"Failed to fetch urlscan.io result, status code: {response.status} {text}")
                        return None
        except aiohttp.ClientError as e:
            logger.error(f"Error fetching urlscan.io result: {e}")
            return None

    @staticmethod
    def extract_credibility_signals(urlscan_result: Dict[str, Any]) -> Dict[str, Any]:
        """
        Distill a raw urlscan.io result into a flat dict of credibility signals.
        """
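        # A urlscan result groups its findings under a few top-level keys:
        #   task     - scan metadata (submitted URL, time, screenshot)
        #   page     - resolved page facts (domain, IP, ASN, TLS/domain age)
        #   verdicts - urlscan's overall verdict plus per-engine detections
        #   stats    - aggregate request/security statistics
        #   lists    - flat lists of contacted domains, URLs and IPs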
        data = urlscan_result
        page = data.get("page", {})
        stats = data.get("stats", {})
        verdicts = data.get("verdicts", {})
        task = data.get("task", {})
        lists = data.get("lists", {})
        return {
            "url": task.get("url"),
            "scan_date": task.get("time"),
            "screenshot_url": task.get("screenshotURL"),
            # Critical verdicts
            "malicious_detected": verdicts.get("overall", {}).get("malicious", False),
            "engine_detections": verdicts.get("engines", {}).get("maliciousTotal", 0),
            "suspicious_categories": verdicts.get("overall", {}).get("categories", []),
            # Domain & TLS age (missing data defaults high so it is not flagged as new)
            "domain_age_days": page.get("apexDomainAgeDays", 0),
            "tls_age_days": page.get("tlsAgeDays", 0),
            "is_new_domain": page.get("apexDomainAgeDays", 9999) < 180,
            "is_brand_new_tls": page.get("tlsAgeDays", 9999) < 60,
            # Security posture
            "secure_percentage": stats.get("securePercentage", 100),
            "uses_mixed_content": stats.get("securePercentage", 100) < 98,
            # Hosting
            "server": page.get("server"),
            "asn": page.get("asn"),
            "asn_name": page.get("asnname"),
            "ip": page.get("ip"),
            # Privacy / trackers (approximate)
            "total_requests": sum(s.get("count", 0) for s in stats.get("resourceStats", [])),
            # Exclude the page's own domain from the third-party count.
            "third_party_domains": max(0, len(lists.get("domains", [])) - 1),
            # Suspicious patterns
            "has_data_urls": any(
                "data:" in r.get("request", {}).get("url", "")
                for r in data.get("data", {}).get("requests", [])
            ),
            "redirects_to_suspicious": any(
                tldextract.extract(link).domain in ["bit", "tinyurl"]
                or tldextract.extract(link).suffix in ["ru", "xyz", "top"]
                for link in lists.get("linkDomains", [])
            ),
            # Bonus: popularity
            "umbrella_rank": next(
                (
                    item["rank"]
                    for item in data.get("meta", {}).get("processors", {}).get("umbrella", {}).get("data", [])
                    if item.get("hostname") == page.get("domain")
                ),
                None,
            ),
        }

    # Exposed as a LangChain tool (hence the `tool` import); invoke with .ainvoke(url).
    @tool
    async def check_source_credibility(url: str) -> Dict[str, Any]:
        """
        Check the credibility of a source URL using urlscan.io.
        Returns a dictionary with credibility information.
        """
        result = {
            "url": url,
            "domain": SourceCredibilityTool.extract_domain(url),
            "urlscan_result": None,
            "verdict": None,
            "is_malicious": None,
            "suspicious": None,
            "categories": [],
        }
        result_url = await SourceCredibilityTool._submit_to_urlscan(url)
        if not result_url:
            logger.error(f"Could not submit URL to urlscan.io: {url}")
            return result
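        # urlscan.io scans take a while to finish (typically tens of seconds); the
        # result endpoint returns 404 until the scan completes, so poll with a delay.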
        urlscan_data = None
        for _ in range(10):  # retry up to 10 times
            await asyncio.sleep(5)  # wait before each attempt
            urlscan_data = await SourceCredibilityTool._fetch_urlscan_result(result_url)
            if urlscan_data:
                break
        urlscan_insights = {}
        if urlscan_data:
            result["urlscan_result"] = urlscan_data
            credibility_signals = SourceCredibilityTool.extract_credibility_signals(urlscan_data)
            urlscan_insights.update(credibility_signals)
        return urlscan_insights

# Example usage:
# async def main():
#     url = "https://bit.ly/3X9kP2m/"
#     identifier = SourceCredibilityTool()
#     domain = identifier.extract_domain(url)
#     print(f"Extracted domain: {domain}")
#     credibility = await identifier.check_source_credibility.ainvoke(url)
#     print(f"Source credibility report: {credibility}")
#
# if __name__ == "__main__":
#     asyncio.run(main())
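# Running the example requires URLSCAN_API_KEY to be available to
# app.core.config (e.g. via the .env file loaded above); without it,
# _submit_to_urlscan returns None and the report comes back empty.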