import whois
import tldextract
import aiohttp
import datetime
import re
import asyncio
from urllib.parse import urlparse
from typing import Optional, Dict, Any
import os
from dotenv import load_dotenv
from langchain_core.tools import tool
load_dotenv()
from app.core.config import config
# Logging setup for this module. (A commented-out local Config fallback
# previously lived here; runtime config now comes from app.core.config.)
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class SourceCredibilityTool:
    """
    A collection of tools for verifying source URLs.

    Combines lightweight registered-domain extraction (tldextract) with a
    full urlscan.io scan to produce a flat dictionary of credibility
    signals for a URL.
    """

    @staticmethod
    def extract_domain(url: str) -> str:
        """
        Extract the registered domain (domain + public suffix) from a URL.

        Returns:
            An "example.com"-style domain, or "unknown" when the URL has
            no recognizable public suffix (bare hostnames, malformed input).
        """
        extracted = tldextract.extract(url)
        logger.info("Extracted components: %s", extracted)
        if not extracted.suffix:
            logger.warning("No suffix found for URL: %s", url)
            return "unknown"
        domain = f"{extracted.domain}.{extracted.suffix}"
        logger.info("Extracted domain: %s", domain)
        return domain

    @staticmethod
    async def _submit_to_urlscan(url: str) -> Optional[str]:
        """
        Submit a URL to urlscan.io for a public scan.

        Returns:
            The API result URL to poll for the finished scan, or None when
            the API key is missing or the submission fails.
        """
        api_key = config.URLSCAN_API_KEY
        if not api_key:
            logger.error("URLSCAN_API_KEY is not set in the environment variables.")
            return None
        submit_url = "https://urlscan.io/api/v1/scan/"
        headers = {
            'Content-Type': 'application/json',
            'API-Key': api_key,
        }
        data = {
            'url': url,
            'visibility': 'public'
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(submit_url, json=data, headers=headers) as response:
                    if response.status == 200:
                        resp_json = await response.json()
                        scan_id = resp_json.get('uuid')
                        result_url = f"https://urlscan.io/api/v1/result/{scan_id}/"
                        # BUG FIX: read "result" from the API *response*.
                        # The original read it from the request payload
                        # (`data`), which never has that key, so the
                        # server-provided result link was always ignored.
                        return resp_json.get("result") or result_url
                    text = await response.text()
                    logger.error(
                        "Failed to submit URL to urlscan.io, status code: %s %s",
                        response.status, text,
                    )
                    return None
        except aiohttp.ClientError as e:
            logger.error("Error submitting URL to urlscan.io: %s", e)
            return None

    @staticmethod
    async def _fetch_urlscan_result(result_url: str) -> Optional[Dict[str, Any]]:
        """
        Fetch the result of a urlscan.io analysis.

        Returns:
            The parsed JSON result, or None when the scan is not ready yet
            (non-200 response) or a transport error occurs.
        """
        api_key = config.URLSCAN_API_KEY
        if not api_key:
            logger.error("URLSCAN_API_KEY is not set in the environment variables.")
            return None
        headers = {
            'API-Key': api_key,
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(result_url, headers=headers) as response:
                    if response.status == 200:
                        return await response.json()
                    text = await response.text()
                    logger.error(
                        "Failed to fetch urlscan.io result, status code: %s %s",
                        response.status, text,
                    )
                    return None
        except aiohttp.ClientError as e:
            logger.error("Error fetching urlscan.io result: %s", e)
            return None

    @staticmethod  # FIX: was missing, so instance calls would mis-bind `self`.
    def extract_credibility_signals(urlscan_result: Dict[str, Any]) -> Dict[str, Any]:
        """
        Flatten a raw urlscan.io result into a dict of credibility signals.

        All lookups are defensive (`.get` with defaults) because urlscan
        omits sections for some scans. Threshold values (180-day domain
        age, 60-day TLS age, 98% secure requests) are heuristics, not
        urlscan verdicts.
        """
        page = urlscan_result.get("page", {})
        stats = urlscan_result.get("stats", {})
        verdicts = urlscan_result.get("verdicts", {})
        task = urlscan_result.get("task", {})
        lists = urlscan_result.get("lists", {})

        def _is_suspicious_domain(candidate: str) -> bool:
            # Flag well-known shortener domains and high-abuse TLDs.
            # (Single tldextract call per candidate; the original parsed
            # each candidate twice.)
            parts = tldextract.extract(candidate)
            return parts.domain in ("bit", "tinyurl") or parts.suffix in ("ru", "xyz", "top")

        return {
            "url": task.get("url"),
            "scan_date": task.get("time"),
            "screenshot_url": task.get("screenshotURL"),
            # Critical verdicts
            "malicious_detected": verdicts.get("overall", {}).get("malicious", False),
            "engine_detections": verdicts.get("engines", {}).get("maliciousTotal", 0),
            "suspicious_categories": verdicts.get("overall", {}).get("categories", []),
            # Domain & TLS age (missing age defaults to "old" so a gap in
            # the data does not flag the site as new)
            "domain_age_days": page.get("apexDomainAgeDays", 0),
            "tls_age_days": page.get("tlsAgeDays", 0),
            "is_new_domain": page.get("apexDomainAgeDays", 9999) < 180,
            "is_brand_new_tls": page.get("tlsAgeDays", 9999) < 60,
            # Security posture
            "secure_percentage": stats.get("securePercentage", 100),
            "uses_mixed_content": stats.get("securePercentage", 100) < 98,
            # Hosting
            "server": page.get("server"),
            "asn": page.get("asn"),
            "asn_name": page.get("asnname"),
            "ip": page.get("ip"),
            # Privacy / trackers (approximate)
            "total_requests": sum(s.get("count", 0) for s in stats.get("resourceStats", [])),
            # FIX: clamp at 0 — the original returned -1 for an empty list.
            "third_party_domains": max(len(lists.get("domains", [])) - 1, 0),
            # Suspicious patterns
            "has_data_urls": any(
                "data:" in r.get("request", {}).get("url", "")
                for r in urlscan_result.get("data", {}).get("requests", [])
            ),
            "redirects_to_suspicious": any(
                _is_suspicious_domain(linked) for linked in lists.get("linkDomains", [])
            ),
            # Bonus: popularity (Cisco Umbrella rank for the scanned domain)
            "umbrella_rank": next(
                (
                    item.get("rank")
                    for item in urlscan_result.get("meta", {})
                    .get("processors", {})
                    .get("umbrella", {})
                    .get("data", [])
                    # FIX: .get avoids a KeyError on entries missing "hostname".
                    if item.get("hostname") == page.get("domain")
                ),
                None,
            ),
        }

    @staticmethod
    @tool("check_source_credibility")
    async def check_source_credibility(url: str) -> Dict[str, Any]:
        """
        Check the credibility of a source URL using urlscan.io.

        Submits the URL for a public scan, polls for the result (up to 10
        attempts, 5 s apart), and returns a dictionary containing the base
        fields (url, domain, raw scan, verdict placeholders) merged with
        the extracted credibility signals. On submission or polling
        failure the base fields are returned with null scan data.
        """
        result: Dict[str, Any] = {
            "url": url,
            "domain": SourceCredibilityTool.extract_domain(url),
            "urlscan_result": None,
            "verdict": None,
            "is_malicious": None,
            "suspicious": None,
            "categories": [],
        }
        result_url = await SourceCredibilityTool._submit_to_urlscan(url)
        if not result_url:
            logger.error("Could not submit URL to urlscan.io: %s", url)
            return result

        urlscan_data: Optional[Dict[str, Any]] = None
        for _ in range(10):  # The scan needs time to finish; poll up to 10 times.
            await asyncio.sleep(5)
            urlscan_data = await SourceCredibilityTool._fetch_urlscan_result(result_url)
            if urlscan_data:
                break

        if urlscan_data:
            result["urlscan_result"] = urlscan_data
            # FIX: the original returned only the extracted signals (and an
            # empty dict when polling timed out), silently dropping the base
            # fields it had just built. Merge instead, signal keys winning,
            # so every code path returns a consistent dict shape.
            result.update(SourceCredibilityTool.extract_credibility_signals(urlscan_data))
        return result
# Example usage:
# async def main():
# url = "https://bit.ly/3X9kP2m/"
# identifier = SourceCredibilityTool()
# domain = identifier.extract_domain(url)
# print(f"Extracted domain: {domain}")
# credibility = await identifier.check_source_credibility.ainvoke(url)
# print(f"Source credibility report: {credibility}")
# if __name__ == "__main__":
# import asyncio
# asyncio.run(main()) |