import asyncio
import base64
import datetime
import ipaddress
import logging
import os
from urllib.parse import quote, urlparse

import httpx
import whois
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
|
|
# Application instance that every route decorator in this module attaches to.
app = FastAPI()

# CORS is left fully open: any origin, method and header is accepted.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)

# VirusTotal API key, read once at import time; None when the variable is unset.
VT_API_KEY = os.environ.get("VT_API_KEY")
|
|
class ScanRequest(BaseModel):
    """JSON body accepted by the scan endpoints.

    Both fields are required strings with no default value.
    """

    # Value to scan; the handlers visible here read it as a URL, an IP
    # address or a domain depending on the endpoint.
    target: str
    # Part of the request schema; not read by the handlers visible here.
    type: str
|
|
@app.get("/")
async def root():
    """Return the static service banner served at the site root."""
    banner = {"message": "Domify Threat Scanner", "status": "running"}
    return banner
|
|
@app.get("/health")
async def health():
    """Liveness probe: always answers with an ``ok`` status."""
    return dict(status="ok")
|
|
| |
|
|
async def get_urlscan_data(domain):
    """Summarise the first URLScan.io search result for a domain.

    This is best-effort enrichment: any failure is logged and reported as
    ``None`` so the calling scan can still return its primary data.

    Args:
        domain: Domain name to search for.

    Returns:
        A dict with ``screenshot``, ``requests`` (a count), ``ips`` (at most
        three), ``asn`` and ``country`` read from the first search result,
        or ``None`` when the request fails, the status is not 200, or the
        search returns no results.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://urlscan.io/api/v1/search/",
                # Let httpx encode the query so characters such as "&" or
                # "#" in the input cannot alter the request.
                params={"q": f"domain:{domain}"},
                timeout=10,
            )
        if response.status_code != 200:
            return None

        results = response.json().get("results") or []
        if not results:
            return None

        latest = results[0]
        # Fields may be present but null; treat that the same as missing
        # instead of discarding the whole result.
        asn_info = latest.get("asn") or {}
        return {
            "screenshot": latest.get("screenshot"),
            "requests": len(latest.get("requests") or []),
            "ips": (latest.get("ips") or [])[:3],
            "asn": asn_info.get("asn") if isinstance(asn_info, dict) else None,
            "country": latest.get("country", ""),
        }
    except Exception as exc:
        # Deliberately broad (enrichment must never fail the scan), but no
        # longer a bare ``except`` that also swallows task cancellation.
        logging.getLogger(__name__).warning(
            "URLScan lookup failed for %s: %s", domain, exc
        )
    return None
|
|
async def get_crtsh_data(domain):
    """Summarise the certificate history crt.sh reports for a domain.

    This is best-effort enrichment: any failure is logged and reported as
    ``None``.

    Args:
        domain: Domain name to look up.

    Returns:
        A dict with ``cert_count`` (number of entries returned) and
        ``issuers`` (up to three distinct issuer names taken, in order, from
        the first ten entries), or ``None`` when the request fails, the
        status is not 200, or the response is not a non-empty list.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://crt.sh/",
                # Encoded by httpx so the input cannot inject extra parameters.
                params={"q": domain, "output": "json"},
                timeout=10,
            )
        if response.status_code != 200:
            return None

        certs = response.json()
        if not isinstance(certs, list) or not certs:
            return None

        issuer_names = [
            cert.get("issuer_name")
            for cert in certs[:10]
            if cert.get("issuer_name")
        ]
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # three issuers reported are the same on every run.
        distinct_issuers = list(dict.fromkeys(issuer_names))
        return {
            "cert_count": len(certs),
            "issuers": distinct_issuers[:3],
        }
    except Exception as exc:
        # Broad on purpose (best-effort), but lets cancellation propagate.
        logging.getLogger(__name__).warning(
            "crt.sh lookup failed for %s: %s", domain, exc
        )
    return None
|
|
async def get_whois_data(domain):
    """Look up WHOIS registration details for a domain.

    This is best-effort enrichment: any failure is logged and reported as
    ``None``.

    Args:
        domain: Domain name to query.

    Returns:
        A dict with ``registrar``, ``creation_date``, ``expiration_date`` and
        ``org`` (each a string or ``None``) plus ``name_servers`` (a list of
        at most three entries), or ``None`` when the lookup raises.
    """
    try:
        # whois.whois() is a synchronous call; run it in the default executor
        # so it does not stall the event loop while it waits.
        loop = asyncio.get_running_loop()
        record = await loop.run_in_executor(None, whois.whois, domain)

        name_servers = record.name_servers or []
        # A single name server given as a plain string must not be sliced
        # character by character.
        if isinstance(name_servers, str):
            name_servers = [name_servers]

        return {
            "registrar": str(record.registrar) if record.registrar else None,
            "creation_date": (
                str(record.creation_date) if record.creation_date else None
            ),
            "expiration_date": (
                str(record.expiration_date) if record.expiration_date else None
            ),
            "name_servers": list(name_servers)[:3],
            "org": str(record.org) if record.org else None,
        }
    except Exception as exc:
        # Broad on purpose (best-effort), but lets cancellation propagate.
        logging.getLogger(__name__).warning(
            "WHOIS lookup failed for %s: %s", domain, exc
        )
        return None
|
|
async def get_threatfox_data(domain):
    """Query ThreatFox for threat entries that match a domain.

    This is best-effort enrichment: any failure is logged and reported as
    ``None``.

    Args:
        domain: Domain name used as the search term.

    Returns:
        A dict with ``threat_count`` (number of entries returned) and
        ``malware`` (distinct malware names from the first three entries, in
        first-seen order), or ``None`` when the request fails, the status is
        not 200, or ``query_status`` is not ``"ok"``.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://threatfox-api.abuse.ch/api/v1/",
                json={"query": "search_domain", "search_term": domain},
                timeout=10,
            )
        if response.status_code != 200:
            return None

        payload = response.json()
        if payload.get("query_status") != "ok":
            return None

        # A present-but-null "data" field is treated as "no threats".
        threats = payload.get("data") or []
        malware_names = [
            threat.get("malware")
            for threat in threats[:3]
            if threat.get("malware")
        ]
        return {
            "threat_count": len(threats),
            # Order-preserving de-duplication keeps the output deterministic.
            "malware": list(dict.fromkeys(malware_names)),
        }
    except Exception as exc:
        # Broad on purpose (best-effort), but lets cancellation propagate.
        logging.getLogger(__name__).warning(
            "ThreatFox lookup failed for %s: %s", domain, exc
        )
    return None
|
|
async def get_ipinfo(ip):
    """Fetch location and network-owner details for an address from ip-api.com.

    This is best-effort enrichment: any failure is logged and reported as
    ``None``.

    Args:
        ip: Address to look up; a falsy value short-circuits to ``None``
            without making a request.

    Returns:
        A dict with ``country``, ``city``, ``isp`` and ``org``, or ``None``
        when no address was given, the request fails, the HTTP status is not
        200, or the service does not report ``status == "success"``.
    """
    if not ip:
        return None
    try:
        # Percent-encode the value so "/", "?" or "#" cannot change the
        # request path; ":" stays literal for IPv6 addresses.
        safe_ip = quote(str(ip), safe=":")
        async with httpx.AsyncClient() as client:
            # NOTE(review): plain HTTP is kept from the original; confirm
            # whether the plan in use offers an HTTPS endpoint.
            response = await client.get(
                f"http://ip-api.com/json/{safe_ip}", timeout=10
            )
        if response.status_code != 200:
            return None

        payload = response.json()
        if payload.get("status") != "success":
            return None

        return {
            "country": payload.get("country"),
            "city": payload.get("city"),
            "isp": payload.get("isp"),
            "org": payload.get("org"),
        }
    except Exception as exc:
        # Broad on purpose (best-effort), but lets cancellation propagate.
        logging.getLogger(__name__).warning(
            "ip-api lookup failed for %s: %s", ip, exc
        )
    return None
|
|
| |
def _extract_hostname(target):
    """Return the bare host of a URL-like string, without port, userinfo or path.

    Falls back to the unmodified ``target`` when no host can be parsed.
    """
    candidate = target.strip()
    # urlparse only fills in the network location when a scheme or a leading
    # "//" is present, so scheme-less input is given the latter.
    if "://" not in candidate and not candidate.startswith("//"):
        candidate = "//" + candidate
    return urlparse(candidate).hostname or target


# Sort rank for vendor verdicts; every category not listed here sorts last.
_CATEGORY_RANK = {"malicious": 0, "suspicious": 1}


@app.post("/api/scan/url")
async def scan_url(request: ScanRequest):
    """Scan a URL with VirusTotal and enrich the verdict with third-party data.

    The URL report is requested first. When VirusTotal answers 404 the URL
    is submitted for analysis and the analysis object is read back after a
    fixed wait. The host part of the target is then looked up on URLScan,
    crt.sh, WHOIS and ThreatFox concurrently, and the first URLScan IP (if
    any) is geolocated.

    Args:
        request: Scan request; only ``request.target`` (the URL) is read.

    Returns:
        On success, a dict with ``status == "success"``, detection ``stats``,
        up to 30 ``vendor_results`` (malicious first, then suspicious),
        ``total_vendors``, ``categories``, ``tags``, ``reputation``,
        ``times_submitted`` and an ``enriched`` dict. On any failure, a dict
        with ``status == "error"`` and a ``message``; nothing is raised.
    """
    if not VT_API_KEY:
        return {"status": "error", "message": "VT_API_KEY not set in secrets"}

    try:
        vt_headers = {"x-apikey": VT_API_KEY}
        async with httpx.AsyncClient() as client:
            # URL identifier: unpadded URL-safe base64 of the target.
            url_id = (
                base64.urlsafe_b64encode(request.target.encode())
                .decode()
                .strip("=")
            )
            response = await client.get(
                f"https://www.virustotal.com/api/v3/urls/{url_id}",
                headers=vt_headers,
            )

            if response.status_code == 404:
                # Unknown URL: submit it, then read back the analysis.
                submit = await client.post(
                    "https://www.virustotal.com/api/v3/urls",
                    headers=vt_headers,
                    data={"url": request.target},
                )
                analysis_id = submit.json().get("data", {}).get("id")
                if not analysis_id:
                    # Without an id there is nothing to poll; fail now
                    # rather than wait and then request "/analyses/None".
                    return {
                        "status": "error",
                        "message": "VirusTotal did not accept the URL for analysis",
                    }
                # Fixed grace period; the analysis may still be unfinished
                # when it is fetched.
                await asyncio.sleep(8)
                analysis = await client.get(
                    f"https://www.virustotal.com/api/v3/analyses/{analysis_id}",
                    headers=vt_headers,
                )
                attributes = analysis.json().get("data", {}).get("attributes", {})
            elif response.status_code == 200:
                attributes = response.json().get("data", {}).get("attributes", {})
            else:
                # Auth, quota or server errors must not be reported as a
                # clean "success" with all-zero stats.
                return {
                    "status": "error",
                    "message": f"VirusTotal request failed with HTTP {response.status_code}",
                }

        domain = _extract_hostname(request.target)

        # The four lookups are independent, so run them concurrently.
        urlscan_data, crtsh_data, whois_data, threatfox_data = await asyncio.gather(
            get_urlscan_data(domain),
            get_crtsh_data(domain),
            get_whois_data(domain),
            get_threatfox_data(domain),
        )

        ip_info = None
        if urlscan_data and urlscan_data.get("ips"):
            ip_info = await get_ipinfo(urlscan_data["ips"][0])

        # URL reports carry "last_analysis_*" keys, whereas analysis objects
        # (the fresh-submission path) carry "results" and "stats".
        results = (
            attributes.get("last_analysis_results")
            or attributes.get("results")
            or {}
        )
        stats = (
            attributes.get("last_analysis_stats")
            or attributes.get("stats")
            or {}
        )

        vendor_results = sorted(
            (
                {
                    "engine": engine,
                    "category": verdict.get("category", "undetected"),
                    "result": verdict.get("result", "clean"),
                }
                for engine, verdict in results.items()
            ),
            key=lambda entry: _CATEGORY_RANK.get(entry["category"], 2),
        )

        return {
            "status": "success",
            "stats": {
                "malicious": stats.get("malicious", 0),
                "suspicious": stats.get("suspicious", 0),
                "harmless": stats.get("harmless", 0),
                "undetected": stats.get("undetected", 0),
            },
            "vendor_results": vendor_results[:30],
            "total_vendors": len(vendor_results),
            "categories": attributes.get("categories", {}),
            "tags": attributes.get("tags", [])[:10],
            "reputation": attributes.get("reputation", 0),
            "times_submitted": attributes.get("times_submitted", 0),
            "enriched": {
                "urlscan": urlscan_data,
                "ssl_certs": crtsh_data,
                "whois": whois_data,
                "threatfox": threatfox_data,
                "ip_info": ip_info,
            },
        }

    except Exception as e:
        # Endpoint contract: failures are returned as an error payload.
        return {"status": "error", "message": str(e)}
|
|
| |
@app.post("/api/scan/ip")
async def scan_ip(request: ScanRequest):
    """Look up an IP address on VirusTotal and add ip-api.com details.

    Args:
        request: Scan request; only ``request.target`` (the address) is read.

    Returns:
        On success, a dict with ``status == "success"``, detection ``stats``,
        ``country``, ``network``, ``as_owner`` and ``ip_info``. On any
        failure, a dict with ``status == "error"`` and a ``message``;
        nothing is raised.
    """
    if not VT_API_KEY:
        return {"status": "error", "message": "VT_API_KEY not set"}

    # The target is placed in a URL path, so accept only a real IPv4/IPv6
    # address; anything else could redirect the request to another endpoint.
    try:
        ip = str(ipaddress.ip_address(request.target.strip()))
    except ValueError:
        return {"status": "error", "message": "Invalid IP address"}

    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"https://www.virustotal.com/api/v3/ip_addresses/{ip}",
                headers={"x-apikey": VT_API_KEY},
            )

        if response.status_code != 200:
            return {"status": "error", "message": "IP not found"}

        attributes = response.json().get("data", {}).get("attributes", {})
        stats = attributes.get("last_analysis_stats", {})

        ip_info = await get_ipinfo(ip)

        return {
            "status": "success",
            "stats": {
                "malicious": stats.get("malicious", 0),
                "suspicious": stats.get("suspicious", 0),
                "harmless": stats.get("harmless", 0),
                "undetected": stats.get("undetected", 0),
            },
            "country": attributes.get("country", "Unknown"),
            "network": attributes.get("network", "Unknown"),
            "as_owner": attributes.get("as_owner", "Unknown"),
            "ip_info": ip_info,
        }

    except Exception as e:
        # Endpoint contract: failures are returned as an error payload.
        return {"status": "error", "message": str(e)}
|
|
| |
@app.post("/api/scan/domain")
async def scan_domain(request: ScanRequest):
    """Look up a domain on VirusTotal and enrich it with third-party data.

    After the VirusTotal report is fetched, crt.sh, WHOIS and ThreatFox are
    queried concurrently for the same domain.

    Args:
        request: Scan request; only ``request.target`` (the domain) is read.

    Returns:
        On success, a dict with ``status == "success"``, detection ``stats``,
        ``creation_date``, ``registrar`` and an ``enriched`` dict. On any
        failure, a dict with ``status == "error"`` and a ``message``;
        nothing is raised.
    """
    if not VT_API_KEY:
        return {"status": "error", "message": "VT_API_KEY not set"}

    domain = request.target.strip()
    if not domain:
        return {"status": "error", "message": "No domain provided"}

    try:
        # Percent-encode the value so "/", "?" or "#" in the input cannot
        # change which VirusTotal endpoint is requested.
        encoded_domain = quote(domain, safe="")
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"https://www.virustotal.com/api/v3/domains/{encoded_domain}",
                headers={"x-apikey": VT_API_KEY},
            )

        if response.status_code != 200:
            return {"status": "error", "message": "Domain not found"}

        attributes = response.json().get("data", {}).get("attributes", {})
        stats = attributes.get("last_analysis_stats", {})

        # The three lookups are independent, so run them concurrently.
        crtsh_data, whois_data, threatfox_data = await asyncio.gather(
            get_crtsh_data(domain),
            get_whois_data(domain),
            get_threatfox_data(domain),
        )

        return {
            "status": "success",
            "stats": {
                "malicious": stats.get("malicious", 0),
                "suspicious": stats.get("suspicious", 0),
                "harmless": stats.get("harmless", 0),
                "undetected": stats.get("undetected", 0),
            },
            "creation_date": attributes.get("creation_date", "Unknown"),
            "registrar": attributes.get("registrar", "Unknown"),
            "enriched": {
                "ssl_certs": crtsh_data,
                "whois": whois_data,
                "threatfox": threatfox_data,
            },
        }

    except Exception as e:
        # Endpoint contract: failures are returned as an error payload.
        return {"status": "error", "message": str(e)}
|
|
if __name__ == "__main__":
    # Imported only when the file is executed directly as a script.
    from uvicorn import run as serve

    # Listen on every interface, port 7860.
    serve(app, host="0.0.0.0", port=7860)