import os import base64 import asyncio import httpx from urllib.parse import urlparse from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import whois import datetime app = FastAPI() # CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) # VirusTotal API Key (from Hugging Face Secrets) VT_API_KEY = os.getenv("VT_API_KEY") class ScanRequest(BaseModel): target: str type: str @app.get("/") async def root(): return {"message": "Domify Threat Scanner", "status": "running"} @app.get("/health") async def health(): return {"status": "ok"} # ==================== PUBLIC DATA SOURCES (No API Key) ==================== async def get_urlscan_data(domain): """Get data from URLScan.io""" try: async with httpx.AsyncClient() as client: response = await client.get( f"https://urlscan.io/api/v1/search/?q=domain:{domain}", timeout=10 ) if response.status_code == 200: data = response.json() results = data.get('results', []) if results: latest = results[0] return { "screenshot": latest.get('screenshot'), "requests": len(latest.get('requests', [])), "ips": latest.get('ips', [])[:3], "asn": latest.get('asn', {}).get('asn'), "country": latest.get('country', '') } except: pass return None async def get_crtsh_data(domain): """Get SSL certificate history from crt.sh""" try: async with httpx.AsyncClient() as client: response = await client.get( f"https://crt.sh/?q={domain}&output=json", timeout=10 ) if response.status_code == 200: certs = response.json() if isinstance(certs, list) and certs: issuers = list(set([c.get('issuer_name', '') for c in certs[:10] if c.get('issuer_name')])) return { "cert_count": len(certs), "issuers": issuers[:3] } except: pass return None async def get_whois_data(domain): """Get WHOIS data""" try: w = whois.whois(domain) return { "registrar": str(w.registrar) if w.registrar else None, "creation_date": str(w.creation_date) if w.creation_date else None, "expiration_date": str(w.expiration_date) if w.expiration_date else None, "name_servers": w.name_servers[:3] if w.name_servers else [], "org": str(w.org) if w.org else None } except: return None async def get_threatfox_data(domain): """Get threat data from ThreatFox""" try: async with httpx.AsyncClient() as client: response = await client.post( "https://threatfox-api.abuse.ch/api/v1/", json={"query": "search_domain", "search_term": domain}, timeout=10 ) if response.status_code == 200: data = response.json() if data.get('query_status') == 'ok': threats = data.get('data', []) return { "threat_count": len(threats), "malware": list(set([t.get('malware') for t in threats[:3] if t.get('malware')])) } except: pass return None async def get_ipinfo(ip): """Get IP information""" if not ip: return None try: async with httpx.AsyncClient() as client: response = await client.get(f"http://ip-api.com/json/{ip}", timeout=10) if response.status_code == 200: data = response.json() if data.get('status') == 'success': return { "country": data.get('country'), "city": data.get('city'), "isp": data.get('isp'), "org": data.get('org') } except: pass return None # ==================== URL SCAN ==================== @app.post("/api/scan/url") async def scan_url(request: ScanRequest): if not VT_API_KEY: return {"status": "error", "message": "VT_API_KEY not set in secrets"} try: async with httpx.AsyncClient() as client: # Encode URL for VirusTotal url_id = base64.urlsafe_b64encode(request.target.encode()).decode().strip("=") # Get VirusTotal report response = await client.get( f"https://www.virustotal.com/api/v3/urls/{url_id}", headers={"x-apikey": VT_API_KEY} ) if response.status_code == 404: # URL never scanned - submit for scan submit = await client.post( "https://www.virustotal.com/api/v3/urls", headers={"x-apikey": VT_API_KEY}, data={"url": request.target} ) await asyncio.sleep(8) analysis_id = submit.json().get("data", {}).get("id") analysis = await client.get( f"https://www.virustotal.com/api/v3/analyses/{analysis_id}", headers={"x-apikey": VT_API_KEY} ) attributes = analysis.json().get("data", {}).get("attributes", {}) else: data = response.json() attributes = data.get("data", {}).get("attributes", {}) # Extract domain parsed = urlparse(request.target) domain = parsed.netloc or request.target # Get enriched data (run in parallel) urlscan_data, crtsh_data, whois_data, threatfox_data = await asyncio.gather( get_urlscan_data(domain), get_crtsh_data(domain), get_whois_data(domain), get_threatfox_data(domain) ) # Get IP info if available ip_info = None if urlscan_data and urlscan_data.get('ips'): ip_info = await get_ipinfo(urlscan_data['ips'][0]) # Get vendor results results = attributes.get("last_analysis_results", {}) vendor_results = [] for engine, result in results.items(): vendor_results.append({ "engine": engine, "category": result.get("category", "undetected"), "result": result.get("result", "clean") }) # Sort: malicious first vendor_results.sort(key=lambda x: 0 if x["category"] == "malicious" else 1 if x["category"] == "suspicious" else 2 ) stats = attributes.get("last_analysis_stats", {}) return { "status": "success", "stats": { "malicious": stats.get("malicious", 0), "suspicious": stats.get("suspicious", 0), "harmless": stats.get("harmless", 0), "undetected": stats.get("undetected", 0) }, "vendor_results": vendor_results[:30], "total_vendors": len(vendor_results), "categories": attributes.get("categories", {}), "tags": attributes.get("tags", [])[:10], "reputation": attributes.get("reputation", 0), "times_submitted": attributes.get("times_submitted", 0), "enriched": { "urlscan": urlscan_data, "ssl_certs": crtsh_data, "whois": whois_data, "threatfox": threatfox_data, "ip_info": ip_info } } except Exception as e: return {"status": "error", "message": str(e)} # ==================== IP SCAN ==================== @app.post("/api/scan/ip") async def scan_ip(request: ScanRequest): if not VT_API_KEY: return {"status": "error", "message": "VT_API_KEY not set"} try: async with httpx.AsyncClient() as client: response = await client.get( f"https://www.virustotal.com/api/v3/ip_addresses/{request.target}", headers={"x-apikey": VT_API_KEY} ) if response.status_code != 200: return {"status": "error", "message": "IP not found"} data = response.json() attributes = data.get("data", {}).get("attributes", {}) stats = attributes.get("last_analysis_stats", {}) # Get IP info from free source ip_info = await get_ipinfo(request.target) return { "status": "success", "stats": { "malicious": stats.get("malicious", 0), "suspicious": stats.get("suspicious", 0), "harmless": stats.get("harmless", 0), "undetected": stats.get("undetected", 0) }, "country": attributes.get("country", "Unknown"), "network": attributes.get("network", "Unknown"), "as_owner": attributes.get("as_owner", "Unknown"), "ip_info": ip_info } except Exception as e: return {"status": "error", "message": str(e)} # ==================== DOMAIN SCAN ==================== @app.post("/api/scan/domain") async def scan_domain(request: ScanRequest): if not VT_API_KEY: return {"status": "error", "message": "VT_API_KEY not set"} try: async with httpx.AsyncClient() as client: response = await client.get( f"https://www.virustotal.com/api/v3/domains/{request.target}", headers={"x-apikey": VT_API_KEY} ) if response.status_code != 200: return {"status": "error", "message": "Domain not found"} data = response.json() attributes = data.get("data", {}).get("attributes", {}) stats = attributes.get("last_analysis_stats", {}) # Get enriched data crtsh_data, whois_data, threatfox_data = await asyncio.gather( get_crtsh_data(request.target), get_whois_data(request.target), get_threatfox_data(request.target) ) return { "status": "success", "stats": { "malicious": stats.get("malicious", 0), "suspicious": stats.get("suspicious", 0), "harmless": stats.get("harmless", 0), "undetected": stats.get("undetected", 0) }, "creation_date": attributes.get("creation_date", "Unknown"), "registrar": attributes.get("registrar", "Unknown"), "enriched": { "ssl_certs": crtsh_data, "whois": whois_data, "threatfox": threatfox_data } } except Exception as e: return {"status": "error", "message": str(e)} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)