Testys's picture
feat: adding verifact services with backend code and multi-agent workflow
55086fb
import whois
import tldextract
import aiohttp
import datetime
import re
import asyncio
from urllib.parse import urlparse
from typing import Optional, Dict, Any
import os
from dotenv import load_dotenv
from langchain_core.tools import tool
load_dotenv()
from app.core.config import config
# class Config:
# GOOGLE_APIS_KEY: Optional[str] = os.getenv("GOOGLE_APIS_KEY")
# FIRECRAWL_API_KEY: Optional[str] = os.getenv("FIRECRAWL_API_KEY")
# URLSCAN_API_KEY: Optional[str] = os.getenv("URLSCAN_API_KEY")
# config = Config()
import logging
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# NOTE(review): basicConfig() configures the *root* logger as a side effect of
# importing this module (it is a no-op if the root logger already has
# handlers). Applications usually do this once at their entry point instead.
logging.basicConfig(level=logging.INFO)
class SourceCredibilityTool:
    """
    A collection of tools for verifying source URLs.

    The public entry point is ``check_source_credibility``, a LangChain tool
    that submits a URL to urlscan.io, polls for the finished scan and reduces
    the raw scan document to a flat dictionary of credibility signals.
    All members are static; the class is only used as a namespace.
    """

    # Upper bound (seconds) for a single HTTP round-trip to urlscan.io.
    _REQUEST_TIMEOUT_SECONDS = 30
    # Polling budget for a submitted scan: attempts x interval (seconds).
    _POLL_ATTEMPTS = 10
    _POLL_INTERVAL_SECONDS = 5
    # Registered-domain labels / public suffixes flagged by
    # ``redirects_to_suspicious`` (same heuristic lists as before).
    _SHORTENER_DOMAINS = frozenset({"bit", "tinyurl"})
    _SUSPICIOUS_SUFFIXES = frozenset({"ru", "xyz", "top"})

    @staticmethod
    def extract_domain(url: str) -> str:
        """
        Extract the registered domain (``domain.suffix``) from a given URL.

        Returns the string ``"unknown"`` when tldextract finds no public
        suffix in ``url`` (e.g. bare host names or IP addresses).
        """
        extracted = tldextract.extract(url)
        logger.info(f"Extracted components: {extracted}")
        if not extracted.suffix:
            logger.warning(f"No suffix found for URL: {url}")
            return "unknown"
        domain = f"{extracted.domain}.{extracted.suffix}"
        logger.info(f"Extracted domain: {domain}")
        return domain

    @staticmethod
    async def _submit_to_urlscan(url: str) -> Optional[str]:
        """
        Submit a URL to urlscan.io for analysis.

        Returns the API URL from which the scan result can later be fetched,
        or ``None`` when the API key is missing, the request fails or times
        out, or the response does not identify the scan.
        """
        api_key = config.URLSCAN_API_KEY
        if not api_key:
            logger.error("URLSCAN_API_KEY is not set in the environment variables.")
            return None

        submit_url = "https://urlscan.io/api/v1/scan/"
        headers = {
            'Content-Type': 'application/json',
            'API-Key': api_key,
        }
        # NOTE(review): 'public' visibility makes the submitted URL visible to
        # other urlscan.io users - confirm this is intended for user input.
        payload = {
            'url': url,
            'visibility': 'public',
        }
        timeout = aiohttp.ClientTimeout(
            total=SourceCredibilityTool._REQUEST_TIMEOUT_SECONDS
        )
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(submit_url, json=payload, headers=headers) as response:
                    if response.status != 200:
                        text = await response.text()
                        logger.error(f"Failed to submit URL to urlscan.io, status code: {response.status} {text}")
                        return None
                    resp_json = await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # asyncio.TimeoutError is not an aiohttp.ClientError, so it has to
            # be listed explicitly for the timeout above to be handled here.
            logger.error(f"Error submitting URL to urlscan.io: {e}")
            return None

        if not isinstance(resp_json, dict):
            logger.error("Unexpected urlscan.io submission response (not a JSON object).")
            return None

        # Build the result URL from the scan uuid. The previous code looked up
        # "result" on the *request payload* (always missing) and would happily
        # return ".../result/None/" when the response carried no uuid.
        scan_id = resp_json.get('uuid')
        if scan_id:
            return f"https://urlscan.io/api/v1/result/{scan_id}/"
        # Fall back to the API result link if the response provides one.
        api_url = resp_json.get('api')
        if api_url:
            return api_url
        logger.error("urlscan.io submission response did not identify the scan.")
        return None

    @staticmethod
    async def _fetch_urlscan_result(result_url: str) -> Optional[Dict[str, Any]]:
        """
        Fetch the result of a urlscan.io analysis.

        Returns the decoded JSON document on HTTP 200, otherwise ``None``
        (missing API key, scan not finished yet, HTTP error, network error
        or timeout).
        """
        api_key = config.URLSCAN_API_KEY
        if not api_key:
            logger.error("URLSCAN_API_KEY is not set in the environment variables.")
            return None

        headers = {
            'API-Key': api_key,
        }
        timeout = aiohttp.ClientTimeout(
            total=SourceCredibilityTool._REQUEST_TIMEOUT_SECONDS
        )
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(result_url, headers=headers) as response:
                    if response.status == 200:
                        return await response.json()
                    if response.status == 404:
                        # The caller polls this endpoint; a 404 while the scan
                        # is still running is expected and not an error.
                        logger.info(f"urlscan.io result not ready yet: {result_url}")
                        return None
                    text = await response.text()
                    logger.error(f"Failed to fetch urlscan.io result, status code: {response.status} {text}")
                    return None
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            logger.error(f"Error fetching urlscan.io result: {e}")
            return None

    @staticmethod
    def _as_dict(parent: Dict[str, Any], key: str) -> Dict[str, Any]:
        """Return ``parent[key]`` if it is a dict, else an empty dict."""
        value = parent.get(key)
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _as_list(parent: Dict[str, Any], key: str) -> list:
        """Return ``parent[key]`` if it is a list, else an empty list."""
        value = parent.get(key)
        return value if isinstance(value, list) else []

    @staticmethod
    def _request_url(entry: Any) -> str:
        """Return the URL recorded for one request entry, or ``""``."""
        request = entry.get("request") if isinstance(entry, dict) else None
        if not isinstance(request, dict):
            return ""
        url = request.get("url")
        if not url:
            # NOTE(review): urlscan.io results appear to nest the URL one
            # level deeper (request.request.url) - both shapes are accepted;
            # confirm against the Result API schema.
            inner = request.get("request")
            if isinstance(inner, dict):
                url = inner.get("url")
        return url if isinstance(url, str) else ""

    @staticmethod
    def extract_credibility_signals(urlscan_result: Dict[str, Any]) -> Dict[str, Any]:
        """
        Reduce a raw urlscan.io result document to flat credibility signals.

        Missing or ``null`` sections are treated as empty, so a partial
        document yields defaults instead of raising.

        Returns a dictionary with the keys: url, scan_date, screenshot_url,
        malicious_detected, engine_detections, suspicious_categories,
        domain_age_days, tls_age_days, is_new_domain, is_brand_new_tls,
        secure_percentage, uses_mixed_content, server, asn, asn_name, ip,
        total_requests, third_party_domains, has_data_urls,
        redirects_to_suspicious, umbrella_rank.
        """
        cls = SourceCredibilityTool
        data = urlscan_result if isinstance(urlscan_result, dict) else {}

        page = cls._as_dict(data, "page")
        stats = cls._as_dict(data, "stats")
        verdicts = cls._as_dict(data, "verdicts")
        task = cls._as_dict(data, "task")
        lists = cls._as_dict(data, "lists")
        overall = cls._as_dict(verdicts, "overall")
        engines = cls._as_dict(verdicts, "engines")

        # Unknown ages are reported as 0 days but never count as "new";
        # this asymmetry is kept for compatibility with existing consumers.
        domain_age = page.get("apexDomainAgeDays")
        tls_age = page.get("tlsAgeDays")

        secure_pct = stats.get("securePercentage")
        if secure_pct is None:
            secure_pct = 100

        total_requests = sum(
            (entry.get("count") or 0)
            for entry in cls._as_list(stats, "resourceStats")
            if isinstance(entry, dict)
        )

        # The scanned site itself is in the domain list, hence the -1;
        # clamp so an empty list does not report -1 third parties.
        third_party_domains = max(len(cls._as_list(lists, "domains")) - 1, 0)

        requests = cls._as_list(cls._as_dict(data, "data"), "requests")
        # A data: URL is identified by its scheme, not by the substring
        # appearing anywhere in an ordinary URL.
        has_data_urls = any(
            cls._request_url(entry).startswith("data:") for entry in requests
        )

        redirects_to_suspicious = False
        for link_domain in cls._as_list(lists, "linkDomains"):
            if not isinstance(link_domain, str):
                continue
            parts = tldextract.extract(link_domain)  # parse once per domain
            if parts.domain in cls._SHORTENER_DOMAINS or parts.suffix in cls._SUSPICIOUS_SUFFIXES:
                redirects_to_suspicious = True
                break

        umbrella = cls._as_dict(
            cls._as_dict(cls._as_dict(data, "meta"), "processors"), "umbrella"
        )
        page_domain = page.get("domain")
        umbrella_rank = next(
            (
                item.get("rank")
                for item in cls._as_list(umbrella, "data")
                if isinstance(item, dict) and item.get("hostname") == page_domain
            ),
            None,
        )

        return {
            "url": task.get("url"),
            "scan_date": task.get("time"),
            "screenshot_url": task.get("screenshotURL"),
            # Critical verdicts
            "malicious_detected": overall.get("malicious", False),
            "engine_detections": engines.get("maliciousTotal", 0),
            "suspicious_categories": overall.get("categories", []),
            # Domain & TLS age
            "domain_age_days": 0 if domain_age is None else domain_age,
            "tls_age_days": 0 if tls_age is None else tls_age,
            "is_new_domain": domain_age is not None and domain_age < 180,
            "is_brand_new_tls": tls_age is not None and tls_age < 60,
            # Security posture
            "secure_percentage": secure_pct,
            "uses_mixed_content": secure_pct < 98,
            # Hosting
            "server": page.get("server"),
            "asn": page.get("asn"),
            "asn_name": page.get("asnname"),
            "ip": page.get("ip"),
            # Privacy / trackers (approximate)
            "total_requests": total_requests,
            "third_party_domains": third_party_domains,
            # Suspicious patterns
            "has_data_urls": has_data_urls,
            "redirects_to_suspicious": redirects_to_suspicious,
            # Bonus: popularity
            "umbrella_rank": umbrella_rank,
        }

    @staticmethod
    @tool("check_source_credibility")
    async def check_source_credibility(url: str) -> Dict[str, Any]:
        """
        Check the credibility of a source URL using urlscan.io.
        Returns a dictionary with credibility information.
        """
        # NOTE: the docstring above is presumably what the agent sees as the
        # tool description, so it is kept short and unchanged.
        #
        # Skeleton returned whenever no scan result could be obtained, so the
        # caller always gets at least the URL and its domain back.
        fallback: Dict[str, Any] = {
            "url": url,
            "domain": SourceCredibilityTool.extract_domain(url),
            "urlscan_result": None,
            "verdict": None,
            "is_malicious": None,
            "suspicious": None,
            "categories": [],
        }

        result_url = await SourceCredibilityTool._submit_to_urlscan(url)
        if not result_url:
            logger.error(f"Could not submit URL to urlscan.io: {url}")
            return fallback

        # Scans take a while to finish: wait first, then poll.
        for _ in range(SourceCredibilityTool._POLL_ATTEMPTS):
            await asyncio.sleep(SourceCredibilityTool._POLL_INTERVAL_SECONDS)
            urlscan_data = await SourceCredibilityTool._fetch_urlscan_result(result_url)
            if urlscan_data:
                return SourceCredibilityTool.extract_credibility_signals(urlscan_data)

        # Previously an empty dict was returned here, which was inconsistent
        # with the submit-failure path and dropped even the URL/domain.
        logger.warning(f"Timed out waiting for urlscan.io result: {result_url}")
        return fallback
# # # Example usage:
# async def main():
# url = "https://bit.ly/3X9kP2m/"
# identifier = SourceCredibilityTool()
# domain = identifier.extract_domain(url)
# print(f"Extracted domain: {domain}")
# credibility = await identifier.check_source_credibility.ainvoke(url)
# print(f"Source credibility report: {credibility}")
# if __name__ == "__main__":
# import asyncio
# asyncio.run(main())