# Luminet — main.py
# Last change: "Update main.py" by koesan (commit aa6fbb0, verified)
import os, re, time, json, socket, logging, threading, hashlib, secrets, ipaddress, subprocess, concurrent.futures, requests, pycountry, ssl
from datetime import datetime, timedelta, timezone
from functools import wraps
from collections import defaultdict, Counter
from flask import Flask, render_template, request, jsonify
from OpenSSL import crypto
from typing import Optional, Dict, List
import whois, tldextract
import dns.resolver, dns.reversename, dns.message, dns.query, dns.rdatatype, dns.flags, dns.edns, dns.zone
from dns.resolver import NoAnswer, NXDOMAIN, Timeout, NoNameservers, YXDOMAIN
from dns.exception import DNSException
from dns.query import BadResponse
# Probe optional dependencies: feature flags record whether scapy/nmap are usable.
try:
    import scapy.all as scapy
    SCAPY_AVAILABLE = True
except (ImportError, PermissionError):  # scapy can raise PermissionError without raw-socket rights
    SCAPY_AVAILABLE = False
try:
    import nmap
    NMAP_AVAILABLE = True
except ImportError:
    NMAP_AVAILABLE = False
# Flask application and configuration
app = Flask(__name__)
# SECRET_KEY comes from the environment; a random per-process key is the fallback.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', secrets.token_hex(32))
app.jinja_env.globals.update(json=json)
app.jinja_env.filters['tojson'] = json.dumps
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# External API endpoints
IANA_IP_BOOTSTRAP_URL = "https://rdap.iana.org/ip/"
IANA_ASN_BOOTSTRAP_URL = "https://rdap.iana.org/autnum/"
RIPESTAT_DATA_URL = "https://stat.ripe.net/data/"
IPINFO_URL = "https://ipinfo.io/"
IP_API_URL = "http://ip-api.com/json/"
OSM_NOMINATIM_URL = "https://nominatim.openstreetmap.org/search"
# Delay between requests to respect API rate limits (used for traceroute enrichment)
REQUEST_DELAY = 0.1
# DNS blacklists grouped by category: {category: {dnsbl zone: human-readable name}}
DNS_BL_LISTS = {
    'spam': {
        'zen.spamhaus.org': 'Spamhaus Zen (SBL+XBL+PBL)',
        'bl.spamcop.net': 'SpamCop Blocking List',
        'bl.spameatingmonkey.net': 'SEM Black',
        'backscatter.spameatingmonkey.net': 'SEM Backscatter',
        'bl.mailspike.net': 'Mailspike',
        'bl.drmx.org': 'DRMX',
        'bl.0spam.org': '0Spam Main BL',
        'rbl.0spam.org': '0Spam Realtime BL',
        'bl.nordspam.com': 'NordSpam IP BL',
    },
    'domain': {
        'dbl.0spam.org': '0Spam Domain BL',
        'dbl.nordspam.com': 'NordSpam Domain BL',
        'uribl.spameatingmonkey.net': 'SEM URI BL',
        'urired.spameatingmonkey.net': 'SEM URI Red',
    },
    'network': {
        'b.barracudacentral.org': 'Barracuda Reputation Block List',
        'dnsbl.sorbs.net': 'SORBS Aggregate',
        'dnsbl-1.uceprotect.net': 'UCEPROTECT Level 1',
        'dnsbl-2.uceprotect.net': 'UCEPROTECT Level 2',
        'dnsbl-3.uceprotect.net': 'UCEPROTECT Level 3',
        'dyna.spamrats.com': 'SpamRats Dynamic IP List',
        'noptr.spamrats.com': 'SpamRats NoPTR List',
        'netbl.spameatingmonkey.net': 'SEM Network BL',
        'bl.ipv6.spameatingmonkey.net': 'SEM IPv6 BL',
        'nbl.0spam.org': '0Spam Network BL',
    },
    'proxy_bot': {
        'dnsbl.dronebl.org': 'DroneBL',
        'tor.dan.me.uk': 'Tor Exit Nodes',
        'torexit.dan.me.uk': 'Tor Exit Nodes Only',
        'openproxy.bls.digibase.ca': 'Digibase OpenProxy',
        'proxyabuse.bls.digibase.ca': 'Digibase Proxy Abuse',
        'spambot.bls.digibase.ca': 'Digibase Spambot',
        'socks.dnsbl.sorbs.net': 'SORBS SOCKS Proxies',
        'zombie.dnsbl.sorbs.net': 'SORBS Zombies',
    },
    'misc': {
        'psbl.surriel.com': 'Passive Spam Block List',
        'db.wpbl.info': 'Weighted Private Block List',
        'bl.blocklist.de': 'Blocklist.de',
        'all.s5h.net': 'S5H Blocklist',
        'rbl.efnet.org': 'EFNet RBL',
        'combined.rbl.msrbl.net': 'MSRBL Combined',
        'bl.mxrbl.com': 'MXRBL',
    },
}
def get_dns_records(domain):
    """Collect a broad set of DNS records (plus WHOIS data and a
    zone-transfer probe) for *domain* and return them in one dict.

    Multi-valued record types hold lists; CNAME/SOA hold a single value or
    None.  Failed lookups simply leave their key empty, so callers always
    get the full key set back.
    """
    records = {
        "A": [], "AAAA": [], "MX": [], "TXT": [], "NS": [], "CNAME": None, "SOA": None,
        "SRV": [], "CAA": [], "DMARC": [], "SPF": [], "PTR": [], "NAPTR": [], "SSHFP": [],
        "TLSA": [], "LOC": [], "DS": [], "DNSKEY": [], "WHOIS": {}, "ZONE_TRANSFER": []
    }
    resolver = dns.resolver.Resolver()
    resolver.timeout = 5
    resolver.lifetime = 5
    # Address records.
    for rdata in safe_resolve(resolver, domain, 'A'):
        records["A"].append(rdata.address)
    for rdata in safe_resolve(resolver, domain, 'AAAA'):
        records["AAAA"].append(rdata.address)
    # Reverse (PTR) lookups for every address found above.
    for ip_addr in records["A"] + records["AAAA"]:
        try:
            rev_name = dns.reversename.from_address(ip_addr)
            for rdata in safe_resolve(resolver, rev_name, 'PTR'):
                records["PTR"].append({"ip": ip_addr, "ptr": str(rdata.target)})
        except Exception:
            pass  # best effort: one bad PTR must not abort the whole scan
    # Mail exchangers.
    for rdata in safe_resolve(resolver, domain, 'MX'):
        records["MX"].append({"preference": rdata.preference, "exchange": rdata.exchange.to_text()})
    # TXT records; SPF policies are TXT strings starting with "v=spf".
    for rdata in safe_resolve(resolver, domain, 'TXT'):
        for txt_string in rdata.strings:
            txt_record = txt_string.decode('utf-8')
            records["TXT"].append(txt_record)
            if txt_record.lower().startswith('v=spf'):
                records["SPF"].append(txt_record)
    # DMARC policy lives under the _dmarc subdomain.
    for rdata in safe_resolve(resolver, f'_dmarc.{domain}', 'TXT'):
        for txt_string in rdata.strings:
            records["DMARC"].append(txt_string.decode('utf-8'))
    # Name servers.
    for rdata in safe_resolve(resolver, domain, 'NS'):
        records["NS"].append(rdata.target.to_text())
    # CNAME (at most one target is kept).
    cname_result = safe_resolve(resolver, domain, 'CNAME')
    if cname_result:
        records["CNAME"] = cname_result[0].target.to_text()
    # SOA, unpacked into its named fields.
    soa_result = safe_resolve(resolver, domain, 'SOA')
    if soa_result:
        soa = soa_result[0]
        records["SOA"] = {
            "mname": soa.mname.to_text(),
            "rname": soa.rname.to_text(),
            "serial": soa.serial,
            "refresh": soa.refresh,
            "retry": soa.retry,
            "expire": soa.expire,
            "minimum": soa.minimum
        }
    # SRV service records.
    for rdata in safe_resolve(resolver, domain, 'SRV'):
        records["SRV"].append({
            "port": rdata.port,
            "target": rdata.target.to_text(),
            "priority": rdata.priority,
            "weight": rdata.weight
        })
    # CAA certificate-authority authorizations.
    for rdata in safe_resolve(resolver, domain, 'CAA'):
        records["CAA"].append({
            "flags": rdata.flags,
            "tag": rdata.tag.decode('utf-8'),
            "value": rdata.value.decode('utf-8')
        })
    # Additional record types, kept as their text representation.
    for rdata in safe_resolve(resolver, domain, 'NAPTR'):
        records["NAPTR"].append(str(rdata))
    for rdata in safe_resolve(resolver, domain, 'SSHFP'):
        records["SSHFP"].append(str(rdata))
    for rdata in safe_resolve(resolver, domain, 'TLSA'):
        records["TLSA"].append(str(rdata))
    for rdata in safe_resolve(resolver, domain, 'LOC'):
        records["LOC"].append(str(rdata))
    for rdata in safe_resolve(resolver, domain, 'DS'):
        records["DS"].append(str(rdata))
    for rdata in safe_resolve(resolver, domain, 'DNSKEY'):
        records["DNSKEY"].append(str(rdata))
    # WHOIS: delegate to the shared helper (it reduces subdomains to the
    # registrable domain and returns {"error": ...} on failure) instead of
    # duplicating that logic here.
    records["WHOIS"] = whois_info(domain)
    # Zone-transfer (AXFR) probe: only succeeds against misconfigured servers.
    try:
        ns_records = safe_resolve(resolver, domain, 'NS')
        for ns in ns_records:
            try:
                ns_ip = safe_resolve(resolver, ns.target, 'A')[0].to_text()
                xfr = dns.query.xfr(ns_ip, domain, timeout=5)
                zone = dns.zone.from_xfr(xfr)
                records["ZONE_TRANSFER"] = list(zone.nodes.keys())
                break  # one successful transfer already yields the whole zone
            except Exception:
                pass
    except Exception:
        pass
    return records
def whois_info(domain):
    """Return a registrar/date/NS/e-mail summary from WHOIS for *domain*.

    Subdomains are first reduced to the registrable domain; any failure is
    reported as a dict with a single "error" key.
    """
    try:
        ext = tldextract.extract(domain)
        registrable = f"{ext.domain}.{ext.suffix}"
        record = whois.whois(registrable)
        return {
            "registrar": record.registrar,
            "creation_date": str(record.creation_date),
            "expiration_date": str(record.expiration_date),
            "name_servers": record.name_servers,
            "emails": record.emails,
        }
    except Exception as exc:
        return {"error": str(exc)}
def safe_resolve(resolver, qname, rdtype):
    """Resolve *qname*/*rdtype* via *resolver*, returning an iterable of rdatas.

    Returns [] on any DNS failure.  With ``raise_on_no_answer=False``
    dnspython returns an Answer whose ``rrset`` is None when the name
    exists but carries no records of the requested type; iterating such an
    answer raises TypeError in callers, so it is normalized to [] here too.
    """
    try:
        answer = resolver.resolve(qname, rdtype, raise_on_no_answer=False)
        return answer if answer.rrset is not None else []
    except DNSException:
        # Base class covers NoAnswer/NXDOMAIN/Timeout/NoNameservers/etc.
        return []
def get_authoritative_dns(domain):
    """Find an authoritative name server for *domain*.

    Walks the DNS delegation chain starting at 198.41.0.4
    (a.root-servers.net), querying NS for suffixes of *domain* from the
    full name downward.  Returns the first NS target found in an
    authority section, or None on any failure.
    """
    try:
        ns_to_query = '198.41.0.4'  # Root server (a.root-servers.net)
        parts = domain.split('.')
        for i in range(len(parts)):
            subdomain = '.'.join(parts[i:])
            q = dns.message.make_query(subdomain, dns.rdatatype.NS)
            res = dns.query.udp(q, ns_to_query, timeout=5)
            if res.authority:
                # Authority section present: return the first NS target.
                ns_record = res.authority[0]
                return str(ns_record[0].target)
            elif res.additional:
                # Follow a glue A record to the next server in the chain.
                # NOTE(review): res.additional holds RRsets, not rdatas —
                # the .rdtype/.address access pattern here may need an inner
                # iteration; verify against dnspython's message API.
                for rdata in res.additional:
                    if rdata.rdtype == dns.rdatatype.A:
                        ns_to_query = rdata.address
                        break
    except Exception:
        pass
    return None
def check_dnssec(domain):
    """Classify the DNSSEC deployment state of *domain*.

    Returns a Turkish status string based on whether DS and/or DNSKEY
    records exist, or an error string when the lookup itself fails.
    """
    try:
        res = dns.resolver.Resolver()
        res.use_edns(edns=True, payload=4096)  # EDNS needed for large DNSSEC answers
        ds = res.resolve(domain, 'DS', raise_on_no_answer=False)
        dnskey = res.resolve(domain, 'DNSKEY', raise_on_no_answer=False)
        if ds and dnskey:
            return "DNSSEC Etkin (DS ve DNSKEY kayıtları bulundu)"
        if ds:
            return "DNSSEC Kısmen Etkin (Sadece DS kaydı bulundu)"
        return "DNSSEC Etkin Değil"
    except (NoAnswer, NXDOMAIN, Timeout, NoNameservers):
        return "DNSSEC Durumu Belirlenemedi"
    except Exception as e:
        return f"Hata: {str(e)}"
def check_dnsbl(ip_address):
    """Query a set of DNS blacklists for *ip_address* concurrently.

    Returns a dict keyed by DNSBL category; each value lists the
    blacklists that produced an answer (a listing) or an error.  An
    address-conversion failure returns {'error': ...} immediately.
    """
    results = defaultdict(list)
    results_lock = threading.Lock()  # guards `results` across worker threads
    try:
        if '.' in ip_address:
            # IPv4: reverse the octets (1.2.3.4 -> 4.3.2.1) for DNSBL queries.
            reversed_ip = '.'.join(reversed(ip_address.split('.')))
        else:
            # IPv6: nibble-reversed PTR form, minus the trailing .ip6.arpa label.
            reversed_ip = dns.reversename.from_address(ip_address).to_text(omit_final_dot=True).replace('.ip6.arpa', '')
    except Exception as e:
        results['error'] = f"IP adresi çevrim hatası: {str(e)}"
        return results
    def query_worker(dnsbl, description, category):
        # One DNSBL lookup; NXDOMAIN/NoAnswer means "not listed" and is silent.
        try:
            resolver = dns.resolver.Resolver()
            resolver.nameservers = ['8.8.8.8', '1.1.1.1']
            resolver.timeout = 5
            resolver.lifetime = 5
            query = f"{reversed_ip}.{dnsbl}"
            answers = resolver.resolve(query, 'A')
            for answer in answers:
                with results_lock:
                    results[category].append({
                        'dnsbl': dnsbl,
                        'description': description,
                        'response': answer.to_text()
                    })
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
            pass
        except dns.resolver.Timeout:
            with results_lock:
                results[category].append({'dnsbl': dnsbl, 'error': 'Timeout'})
        except Exception as e:
            with results_lock:
                results[category].append({'dnsbl': dnsbl, 'error': str(e)})
    # Fan out one worker per blacklist; each handles its own errors above.
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        future_to_dnsbl = {
            executor.submit(query_worker, dnsbl, description, category): dnsbl
            for category, blacklists in DNS_BL_LISTS.items()
            for dnsbl, description in blacklists.items()
        }
        for future in concurrent.futures.as_completed(future_to_dnsbl):
            try:
                future.result()
            except Exception as exc:
                dnsbl_name = future_to_dnsbl[future]
                logger.error(f'{dnsbl_name} sorgusunda beklenmedik hata oluştu: {exc}')
    return results
def process_bgp_data_for_d3(bgp_data, target_asn_str=None):
    """Convert a RIPEstat-style BGP state payload into a D3 node/link graph.

    Returns {"nodes": [...], "links": [...]} or None when there is nothing
    to draw.  Colors are derived deterministically from MD5 digests so the
    same AS path / ASN always renders the same.
    """
    if not bgp_data or not bgp_data.get('bgp_state'):
        return None

    def _path_color(path):
        digest = hashlib.md5('-'.join(map(str, path)).encode()).hexdigest()
        return f"#{digest[:6]}"

    def _asn_color(asn):
        digest = hashlib.md5(str(asn).encode()).hexdigest()
        return f"#{digest[6:12]}"

    nodes = {}
    raw_links = []
    origin_asns = set()
    target_asn = int(target_asn_str) if target_asn_str and target_asn_str.isdigit() else None
    for entry in bgp_data.get('bgp_state', []):
        path = entry.get('path', [])
        if not path:
            continue
        origin_asns.add(path[0])
        color = _path_color(path)
        details = {
            "community": entry.get("community", []),
            "full_path": entry.get("path", []),
            "source_id": entry.get("source_id", "Bilinmiyor"),
            "target_prefix": entry.get("target_prefix", "Bilinmiyor"),
        }
        for asn in path:
            nodes.setdefault(asn, {
                "id": str(asn),
                "label": f"AS{asn}",
                "color": _asn_color(asn),
                "node_type": "transit",
            })
        # One directed link per adjacent pair along the AS path.
        for left, right in zip(path, path[1:]):
            raw_links.append({"source": str(left), "target": str(right), "color": color, **details})
    for asn, node in nodes.items():
        if asn == target_asn:
            node["type"] = "target"
        elif asn in origin_asns:
            node["type"] = "source"
    if not raw_links:
        return None
    # Count parallel links per undirected pair so the renderer can fan them out.
    pair_totals = defaultdict(int)
    for link in raw_links:
        pair_totals[tuple(sorted((link['source'], link['target'])))] += 1
    processed = []
    seen_so_far = defaultdict(int)
    for link in raw_links:
        pair = tuple(sorted((link['source'], link['target'])))
        enriched = dict(link)
        enriched['total_links'] = pair_totals[pair]
        enriched['link_index'] = seen_so_far[pair]
        seen_so_far[pair] += 1
        processed.append(enriched)
    return {"nodes": list(nodes.values()), "links": processed}
def get_country_name(code: str) -> str:
    """Map an ISO 3166-1 alpha-2 code to its country name (Turkish fallbacks)."""
    try:
        match = pycountry.countries.get(alpha_2=code.upper())
        if match is None:
            return "Bilinmeyen Ülke"
        return match.name
    except Exception as e:
        logger.error(f"Ülke ismi alınamadı: {str(e)}")
        return "Geçersiz Kod"
def get_reverse_dns(ip):
    """Return the PTR hostname for *ip*, or a Turkish status/error string."""
    try:
        return socket.gethostbyaddr(ip)[0]
    except socket.herror:
        return "PTR kaydı bulunamadı"
    except Exception as e:
        logger.error(f"Reverse DNS hatası: {str(e)}")
        return f"Hata: {str(e)}"
def get_ping_latency(ip, packet_count=4):
    """Ping *ip* and return min/avg/max (and mdev on Unix) latencies.

    Returns a dict of formatted latency strings on success, or a dict with
    an "error" key (plus raw output where useful) on failure.
    """
    try:
        if os.name != "nt":
            command = ["ping", "-c", str(packet_count), ip]
        else:
            command = ["ping", "-n", str(packet_count), ip]
        # Bug fix: the old regex only matched a Turkish-localized summary
        # ("rtt en düşük/ortalama/..."). Force the C locale so ping prints
        # the canonical English summary regardless of host language.
        env = dict(os.environ, LC_ALL="C", LANG="C")
        result = subprocess.run(command, capture_output=True, text=True, timeout=10,
                                encoding='utf-8', errors='ignore', env=env)
        if result.returncode != 0:
            return {"error": f"Ping komutu hata kodu döndürdü ({result.returncode}): {result.stderr or result.stdout}", "raw": result.stdout + result.stderr}
        output = result.stdout
        if os.name != "nt":
            # iputils summary: "rtt min/avg/max/mdev = a/b/c/d ms".  The label
            # part is matched loosely in case a localized ping still slips through.
            match = re.search(r"rtt [^=]*= ([\d.]+)/([\d.]+)/([\d.]+)/([\d.]+) ms", output)
            if match:
                return {
                    "min": f"{match.group(1)} ms",
                    "avg": f"{match.group(2)} ms",
                    "max": f"{match.group(3)} ms",
                    "mdev": f"{match.group(4)} ms"
                }
        else:
            match = re.search(r"Minimum = (\d+ms), Maximum = (\d+ms), Average = (\d+ms)", output)
            if match:
                return {
                    "min": match.group(1),
                    "max": match.group(2),
                    "avg": match.group(3)
                }
        return {"error": "Ping sonuçları regex ile eşleşmedi", "raw": output}
    except FileNotFoundError:
        return {"error": "Ping komutu bulunamadı. PATH ayarınızı veya komutun konumunu kontrol edin."}
    except subprocess.TimeoutExpired:
        return {"error": "Ping isteği zaman aşımına uğradı"}
    except Exception as e:
        return {"error": f"Beklenmedik hata: {str(e)}"}
def scan_ports(ip):
    """Scan a curated list of common TCP ports on *ip* with nmap.

    Returns {"ports": [...]} describing open ports, or {"error": ...} when
    nmap is unavailable or the scan fails.
    """
    if not NMAP_AVAILABLE:
        return {"error": "Nmap kütüphanesi kurulu değil"}
    common_ports = (
        "1-1024,1433-1434,1521,1720-1723,2049,2375-2379,3306,3389,"
        "5000-5010,5432,5900-5905,6379,8080,8443,9000-9100,69,"
        "161-162,2222,5601,9200-9300,27017,11211,10000,"
        "8000-8010,8081,8082,8888,4848,9001,8086,9418,25,"
        "110,143,993,995,514,1812-1813,2376,2377,4789,"
        "10050-10051,9100,5984"
    )
    try:
        scanner = nmap.PortScanner()
        # -Pn: skip host discovery; -T4: aggressive timing.
        scanner.scan(hosts=ip, arguments=f'-T4 -Pn -p {common_ports}')
        open_ports = []
        if ip in scanner.all_hosts():
            host = scanner[ip]
            for proto in host.all_protocols():
                for port, info in host[proto].items():
                    if info['state'] != 'open':
                        continue
                    open_ports.append({
                        'port': port,
                        'state': info['state'],
                        'name': info.get('name', 'bilinmiyor'),
                        'product': info.get('product', ''),
                        'version': info.get('version', ''),
                    })
        return {"ports": open_ports}
    except nmap.PortScannerError as e:
        logger.error(f"Nmap hatası: {str(e)}")
        return {"error": f"Nmap hatası: {str(e)}"}
    except Exception as e:
        logger.error(f"Port tarama hatası: {str(e)}")
        return {"error": f"Port tarama hatası: {str(e)}"}
def get_http_headers(host):
    """Fetch HTTP response headers for *host*, trying HTTPS before HTTP.

    Follows redirects, recording each hop.  TLS verification is disabled
    deliberately so hosts with broken certificates can still be inspected.
    """
    request_headers = {'User-Agent': 'Luminet/1.0 (Network Analysis Tool)'}
    last_error = None
    for scheme in ("https", "http"):
        try:
            response = requests.get(
                f"{scheme}://{host}",
                headers=request_headers,
                timeout=10,
                allow_redirects=True,
                verify=False,
            )
        except requests.exceptions.RequestException as e:
            last_error = e
            logger.warning(f"HTTP başlık hatası: {str(e)}")
            continue
        history_data = [
            {"url": hop.url, "status_code": hop.status_code, "headers": dict(hop.headers)}
            for hop in response.history
        ]
        return {
            "final_url": response.url,
            "status_code": response.status_code,
            "headers": dict(response.headers),
            "history": history_data,
        }
    return {"error": f"HTTP/HTTPS bağlantısı kurulamadı: {str(last_error)}"}
def is_public_ip(ip: str) -> bool:
    """Check if an IP address is public/routable."""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return False
    # Any of these classifications disqualifies the address from being public.
    return not any((
        addr.is_private,
        addr.is_loopback,
        addr.is_link_local,
        addr.is_multicast,
        addr.is_reserved,
        addr.is_unspecified,
    ))
def get_location(ip: str) -> Optional[Dict]:
    """Get geolocation information for an IP address.

    Queries ip-api.com and returns a normalized dict, or None for
    non-public addresses and any lookup failure.
    """
    if not is_public_ip(ip):
        return None
    try:
        response = requests.get(
            f"{IP_API_URL}{ip}",
            params={"fields": "status,message,continent,continentCode,country,countryCode,region,regionName,city,district,zip,lat,lon,timezone,offset,currency,isp,org,as,asname,reverse,mobile,proxy,hosting,query"},
            timeout=5,
        )
        if response.status_code != 200:
            return None
        data = response.json()
        if data.get("status") != "success":
            logger.warning(f"Location lookup failed for {ip}: {data.get('message', 'Unknown error')}")
            return None
        # Map API field names onto the output keys used by the rest of the app.
        string_fields = {
            "continent": "continent", "continent_code": "continentCode",
            "country": "country", "country_code": "countryCode",
            "region": "region", "region_name": "regionName",
            "city": "city", "district": "district", "zip": "zip",
            "timezone": "timezone", "offset": "offset", "currency": "currency",
            "isp": "isp", "org": "org", "as_number": "as", "as_name": "asname",
            "reverse_dns": "reverse",
        }
        info = {out_key: data.get(api_key, "") for out_key, api_key in string_fields.items()}
        info.update({
            "ip": ip,
            "country_name": get_country_name(data.get("countryCode", "")),
            "lat": data.get("lat"),
            "lon": data.get("lon"),
            "mobile": data.get("mobile", False),
            "proxy": data.get("proxy", False),
            "hosting": data.get("hosting", False),
        })
        return info
    except Exception as e:
        logger.error(f"Failed to get location for {ip}: {str(e)}")
        return None
def get_locations_batch(ips: List[str]) -> List[Optional[Dict]]:
    """Batch geolocation lookup for multiple IPs.

    Returns one entry per input IP, in order: a location dict for IPs the
    batch API resolved, otherwise None.  Falls back to individual lookups
    when the batch endpoint fails.
    """
    if not ips:
        return []
    public_ips = [ip for ip in ips if is_public_ip(ip)]
    if not public_ips:
        return [None] * len(ips)
    try:
        response = requests.post(
            "http://ip-api.com/batch",
            # Bug fix: "asname" is now requested — ip-api only returns the
            # fields explicitly listed, and the mapping below reads it.
            json=[{"query": ip, "fields": "status,message,country,countryCode,region,regionName,city,lat,lon,isp,org,as,asname,query"}
                  for ip in public_ips],
            timeout=10
        )
        if response.status_code != 200:
            # Bug fix: previously a non-200 reply fell off the end of the
            # function and returned None instead of a list; route it into
            # the per-IP fallback below instead.
            raise requests.exceptions.RequestException(f"HTTP {response.status_code}")
        results = response.json()
        # Map results back to the original IP list (order must be preserved).
        result_map = {result["query"]: result for result in results if result.get("status") == "success"}
        locations = []
        for ip in ips:
            data = result_map.get(ip)
            if data is None:
                locations.append(None)
                continue
            locations.append({
                "ip": ip,
                "lat": data["lat"],
                "lon": data["lon"],
                "city": data.get("city", ""),
                "country": data.get("country", ""),
                "country_name": get_country_name(data.get("countryCode", "")),
                "isp": data.get("isp", ""),
                "org": data.get("org", ""),
                "as_number": data.get("as", ""),
                "as_name": data.get("asname", ""),
            })
        return locations
    except Exception as e:
        logger.error(f"Batch location lookup failed: {str(e)}")
        # Fallback to individual lookups if batch fails
        return [get_location(ip) if is_public_ip(ip) else None for ip in ips]
def system_traceroute(ip: str, max_hops: int = 30, timeout: int = 60) -> List[Dict]:
    """Perform a system traceroute and return hops.

    Runs the platform traceroute/tracert binary and parses stdout into a
    list of {"ip": ..., "ttl": ...} dicts, where ttl is the parse order of
    responding hops.  Returns [] on timeout or command failure.
    """
    # Determine OS and appropriate command
    if os.name == "nt":
        cmd = ["tracert", "-d", "-h", str(max_hops), "-w", "1000", ip]
    else:
        # -n: numeric output, -q 1: one probe per hop, -w 1: 1s wait
        cmd = ["traceroute", "-n", "-m", str(max_hops), "-w", "1", "-q", "1", ip]
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=True  # non-zero exit surfaces as CalledProcessError below
        )
        hops = []
        ttl = 1
        for line in proc.stdout.splitlines():
            # Skip header lines
            if not line.strip() or line.startswith(("traceroute", "tracert", "Tracing")):
                continue
            # Parse line based on OS
            if os.name == "nt":
                # Windows tracert format
                # NOTE(review): tracert columns are "hop rtt1 rtt2 rtt3 addr";
                # parts[1] is usually an RTT, not the address — verify this
                # parser against real tracert output.
                parts = line.strip().split()
                if len(parts) >= 3 and parts[1].replace(".", "").isdigit():
                    hop_ip = parts[1]
                    hops.append({"ip": hop_ip, "ttl": ttl})
                    ttl += 1
            else:
                # Unix traceroute format: "N  addr  time ms" (addresses only, due to -n)
                parts = line.strip().split()
                if len(parts) >= 2 and parts[1] != "*":
                    hop_ip = parts[1]
                    # Handle cases where IP is in parentheses
                    if "(" in hop_ip and ")" in hop_ip:
                        hop_ip = hop_ip[hop_ip.find("(")+1:hop_ip.find(")")]
                    hops.append({"ip": hop_ip, "ttl": ttl})
                    ttl += 1
        return hops
    except subprocess.TimeoutExpired:
        logger.warning(f"Traceroute timed out after {timeout} seconds")
        return []
    except subprocess.CalledProcessError as e:
        logger.error(f"Traceroute failed with error: {e.stderr}")
        return []
    except Exception as e:
        logger.error(f"Unexpected traceroute error: {str(e)}")
        return []
def enriched_traceroute(target_ip: str, max_hops: int = 30) -> List[Dict]:
    """Perform traceroute with geolocation information for each hop."""
    hops = system_traceroute(target_ip, max_hops=max_hops)
    if not hops:
        return []
    # One batched geo lookup for all hop IPs, then zip results back per hop.
    locations = get_locations_batch([hop["ip"] for hop in hops])
    return [
        {"ip": hop["ip"], "ttl": hop["ttl"], "location": loc}
        for hop, loc in zip(hops, locations)
    ]
def get_asn_from_ip(ip):
    """Resolve the announcing ASN for *ip* via RIPEstat; None on failure."""
    try:
        resp = requests.get(f"{RIPESTAT_DATA_URL}prefix-overview/data.json?resource={ip}", timeout=5)
        if resp.status_code != 200:
            return None
        asn_list = resp.json().get("data", {}).get("asns", [])
        return str(asn_list[0]["asn"]) if asn_list else None
    except Exception as e:
        logger.error(f"ASN alınamadı: {str(e)}")
        return None
def get_asn_path_locations(asn):
    """Count the country codes of prefixes announced by *asn* (via BGPView).

    Returns a Counter mapping country code -> prefix count; empty on failure.
    """
    try:
        resp = requests.get(f"https://api.bgpview.io/asn/{asn}/prefixes", timeout=10)
        if resp.status_code != 200:
            return Counter()
        payload = resp.json().get("data", {})
        prefixes = payload.get("ipv4_prefixes", []) + payload.get("ipv6_prefixes", [])
        return Counter(p.get("country_code") for p in prefixes if p.get("country_code"))
    except Exception as e:
        logger.error(f"BGPView ASN path hatası: {str(e)}")
        return Counter()
def lookup_coords_osm(country_code):
    """Geocode a country code to (lat, lon) via OSM Nominatim.

    Returns (None, None) for empty input, no results, or any failure.
    """
    if not country_code:
        return None, None
    try:
        country_name = get_country_name(country_code)
        if not country_name:
            return None, None
        resp = requests.get(
            OSM_NOMINATIM_URL,
            params={"q": country_name, "format": "json", "limit": 1},
            headers={"User-Agent": "Luminet/1.0"},  # Nominatim requires an identifying UA
            timeout=5,
        )
        resp.raise_for_status()
        hits = resp.json()
        if not hits:
            return None, None
        return float(hits[0]["lat"]), float(hits[0]["lon"])
    except Exception as e:
        logger.error(f"[OSM Lookup Hatası] {country_code}: {str(e)}")
        return None, None
def get_asn_map_data(ip):
    """Build (asn, map_points) for *ip*: the top announcement countries of
    its ASN, each with coordinates for map rendering.

    Returns (None, []) when the ASN cannot be determined.
    """
    asn = get_asn_from_ip(ip)
    if not asn:
        return (None, [])
    location_counts = get_asn_path_locations(asn)
    map_points = []
    for country_code, count in location_counts.most_common(15):
        lat, lon = lookup_coords_osm(country_code)
        # Bug fix: explicit None checks — 0.0 is a valid coordinate
        # (equator/prime meridian) and must not be dropped by truthiness.
        if lat is not None and lon is not None:
            map_points.append({
                "loc": country_code,
                "count": count,
                "lat": lat,
                "lon": lon,
                "country_name": get_country_name(country_code)
            })
    return (asn, map_points)
def parse_vcard(vcard_array):
    """Flatten a jCard ("vcardArray") structure into a simple contact dict.

    Properties that are absent keep the "Bilinmiyor" placeholder; malformed
    input yields an empty dict.  When the same property repeats, the last
    occurrence wins.
    """
    if not isinstance(vcard_array, list) or len(vcard_array) < 2:
        return {}
    contact = {
        "name": "Bilinmiyor",
        "org": "Bilinmiyor",
        "email": "Bilinmiyor",
        "address": "Bilinmiyor",
        "tel": "Bilinmiyor"
    }
    # Properties whose value maps straight onto one output key.
    simple_props = {"fn": "name", "org": "org", "email": "email", "tel": "tel"}
    for item in vcard_array[1]:
        if len(item) < 4:
            continue
        prop, params, value = item[0], item[1], item[3]
        if prop in simple_props:
            contact[simple_props[prop]] = value
        elif prop == "adr":
            # Prefer the formatted "label" parameter; fall back to joining
            # the structured address components.
            label = params.get("label", "")
            if label:
                contact["address"] = label.replace("\r\n", ", ").replace("\n", ", ")
            elif isinstance(value, list):
                contact["address"] = ", ".join(filter(None, value))
    return contact
def run_mtr_analysis(ip):
    """Run `mtr` in JSON report mode against *ip* and return a parsed result.

    The result always carries "ip" and "timestamp"; on success it gains
    "hops", "destination", "packet_loss" and "total_hops", on failure an
    "error" (plus diagnostic details / raw output where available).
    """
    if not ip or not isinstance(ip, str):
        return {"error": "Geçersiz IP adresi veya hostname."}
    try:
        ipaddress.ip_address(ip)
    except ValueError:
        # Not a literal IP: assume a hostname and let mtr resolve it.
        pass
    command = [
        "mtr",
        "-r",          # report mode: run, then print a summary
        "-c", "10",    # 10 probe cycles
        "--json",
        ip
    ]
    result = {"ip": ip, "timestamp": datetime.now().isoformat()}
    try:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1
        )
        stdout, stderr = process.communicate(timeout=60)
        if process.returncode != 0:
            error_msg = stderr.strip()
            # Translate the most common mtr failure modes into user-facing messages.
            if "Name or service not known" in error_msg or "Temporary failure in name resolution" in error_msg:
                user_error = f"Hedef hostname çözümlenemedi: {ip}. Lütfen doğru bir IP veya hostname girin."
            elif "No such device" in error_msg or "Cannot find device" in error_msg:
                user_error = "Ağ arayüzü hatası: MTR gerekli ağ cihazını bulamadı."
            elif "Operation not permitted" in error_msg or "Permission denied" in error_msg:
                user_error = "İzin hatası: MTR çalıştırmak için yetki gerekli."
            else:
                user_error = "MTR analizi başarısız oldu."
            logger.error(f"MTR command failed for {ip}: {error_msg} (Exit code: {process.returncode})")
            result.update({
                "error": user_error,
                "details": error_msg,
                "raw_output": stdout,
                "raw_error": stderr
            })
            return result
        if not stdout.strip():
            logger.error(f"MTR returned empty output for {ip}.")
            result["error"] = "MTR boş sonuç döndürdü. Hedef erişilemiyor olabilir veya MTR çıktısı beklenenden farklı."
            result["raw_output"] = stdout
            result["raw_error"] = stderr
            return result
        try:
            mtr_data = json.loads(stdout)
        except json.JSONDecodeError as e:
            logger.error(f"MTR JSON decoding error for {ip}: {e}. Raw stdout: {stdout}")
            result.update({
                "error": "MTR çıktısı çözümlenemedi. Biçim hatası.",
                "details": f"JSON çözümleme hatası: {e}",
                "raw_output": stdout,
                "raw_error": stderr
            })
            return result
        report = mtr_data.get('report', {})
        hubs = report.get('hubs', [])
        cleaned_hops = []
        # Bug fix: snt must exist even when `hubs` is empty — it is read
        # after the loop and previously raised NameError in that case.
        snt = 0
        for idx, hop in enumerate(hubs, 1):
            host = hop.get('host', '???').strip()
            is_private = False
            try:
                if host != '???':
                    is_private = ipaddress.ip_address(host).is_private
            except ValueError:
                pass  # host may be a DNS name rather than an address
            loss = float(hop.get('Loss%', 0.0))
            avg = float(hop.get('Avg', 0.0))
            best = float(hop.get('Best', 0.0))
            worst = float(hop.get('Worst', 0.0))
            last = float(hop.get('Last', 0.0))
            snt = int(hop.get('Snt', 0))
            cleaned_hops.append({
                'count': idx,
                'host': host,
                'loss': loss,
                'avg': avg,
                'best': best,
                'worst': worst,
                'last': last,
                'is_private': is_private,
                'packets_sent': snt
            })
        overall_packet_loss = float(report.get('loss', 0.0))
        # No hops and no probes at all: infer total loss only when stderr
        # explicitly says the target was unreachable.
        if not hubs and overall_packet_loss == 0.0 and snt == 0:
            overall_packet_loss = 100.0 if "No route to host" in stderr or "Host unreachable" in stderr else 0.0
        result.update({
            "hops": cleaned_hops,
            "destination": report.get('dst', ip).strip(),
            "packet_loss": overall_packet_loss,
            "total_hops": len(cleaned_hops)
        })
    except subprocess.TimeoutExpired:
        process.kill()
        stdout, stderr = process.communicate()
        error_msg = f"MTR analizi zaman aşımına uğradı ({ip})"
        logger.error(error_msg)
        result.update({
            "error": "MTR analizi zaman aşımına uğradı. Hedefe ulaşmak çok uzun sürdü veya ağ engellendi.",
            "details": "Analiz 60 saniyeden uzun sürdü.",
            "raw_output": stdout,
            "raw_error": stderr
        })
    except FileNotFoundError:
        logger.error("MTR komutu bulunamadı. Lütfen sisteminizde 'mtr' yüklü olduğundan emin olun.")
        result.update({
            "error": "MTR komutu bulunamadı. Sunucuda 'mtr' kurulu değil.",
            "details": "Lütfen sunucunuzda MTR'ı kurun."
        })
    except PermissionError as e:
        logger.error(f"MTR çalıştırmak için izin hatası: {e}")
        result.update({
            "error": "MTR çalıştırmak için yetki hatası.",
            "details": f"MTR komutunu çalıştırmak için gerekli izinler yok. Hata: {e}"
        })
    except Exception as e:
        logger.error(f"MTR beklenmedik hata için {ip}: {e}")
        result.update({
            "error": "Beklenmedik bir hata oluştu.",
            "details": str(e)
        })
    return result
def get_bgpview_prefix_details(ip):
    """Fetch details (prefix, name, description, country) for the most
    specific BGP prefix covering *ip*, using the BGPView API.

    Returns a dict, or None when no data is available or the request fails.
    """
    try:
        url = f"https://api.bgpview.io/ip/{ip}"
        response = robust_get_request(url, timeout=5)
        if response and response.status_code == 200:
            data = response.json().get("data", {})
            # Most specific prefix == longest mask.  Bug fix: compare the
            # mask length as an int — as a string, "9" sorts above "24".
            # The "0/0" fallback also avoids an IndexError on entries
            # without a "/" in the prefix field.
            longest_prefix = max(
                data.get("prefixes", []),
                key=lambda p: int(p.get("prefix", "0/0").split('/')[1]),
                default=None
            )
            if longest_prefix:
                return {
                    "prefix": longest_prefix.get("prefix"),
                    "name": longest_prefix.get("name"),
                    "description": longest_prefix.get("description"),
                    "country_code": longest_prefix.get("country_code")
                }
        return None
    except Exception as e:
        logger.error(f"BGPView prefix detayları alınamadı: {str(e)}")
        return None
def get_peeringdb_info(asn):
    """Fetch basic PeeringDB network info for *asn*.

    Returns a dict with name/website/traffic/prefix-count/policy fields,
    or None when *asn* is falsy, the lookup fails, or no record exists.
    """
    if not asn:
        return None
    try:
        response = robust_get_request(f"https://www.peeringdb.com/api/net?asn={asn}", timeout=7)
        if not response or response.status_code != 200:
            return None
        entries = response.json().get("data", [])
        if not entries:
            return None
        net = entries[0]
        return {
            "name": net.get("name"),
            "website": net.get("website"),
            "traffic_type": net.get("info_traffic"),
            "prefix_count_ipv4": net.get("info_prefixes4"),
            "prefix_count_ipv6": net.get("info_prefixes6"),
            "policy": net.get("policy_general"),
        }
    except Exception as e:
        logger.error(f"PeeringDB bilgisi alınamadı: {str(e)}")
        return None
def parse_rdap_response(data):
    """Parse a raw RDAP JSON response into a structured dict for the UI.

    The result groups a summary, role-bucketed contacts, free-form details,
    and the responsible RIR (guessed from the legacy "port43" whois host).
    """
    parsed_info = {
        "summary": {},
        "contacts": {
            "abuse": [],
            "technical": [],
            "administrative": [],
            "registrant": [],
            "other": []
        },
        "details": {},
        "source_rir": "Bilinmiyor",
    }
    # Identify the source RIR (RIPE, ARIN, ...) from the whois host name.
    port43_val = data.get("port43", "").lower()
    if port43_val:
        rir_mapping = {
            "apnic": "APNIC", "lacnic": "LACNIC", "afrinic": "AfriNIC",
            "arin": "ARIN", "ripe": "RIPE NCC",
        }
        for fragment, rir_name in rir_mapping.items():
            if fragment in port43_val:
                parsed_info["source_rir"] = rir_name
                break
    # Core summary fields; all-empty ranges are removed below.
    summary = {
        "name": data.get("name", "Bilinmiyor"),
        "handle": data.get("handle", "Bilinmiyor"),
        "country": data.get("country", "Bilinmiyor"),
        "ip_range": f"{data.get('startAddress', '')} - {data.get('endAddress', '')}",
        "asn_range": f"{data.get('startAutnum', '')} - {data.get('endAutnum', '')}",
        "type": data.get("type", "Bilinmiyor").title(),
    }
    for range_key in ("ip_range", "asn_range"):
        if summary.get(range_key) == " - ":
            summary.pop(range_key, None)
    parsed_info["summary"] = summary
    # Bucket contacts by RDAP role; an entity may land in several buckets,
    # and entities with no recognized role go to "other".
    for entity in data.get("entities", []):
        vcard_array = entity.get("vcardArray")
        if not vcard_array:
            continue
        contact_details = parse_vcard(vcard_array)
        roles = entity.get("roles", [])
        matched = [r for r in ("registrant", "administrative", "technical", "abuse") if r in roles]
        if matched:
            for role in matched:
                parsed_info["contacts"][role].append(contact_details)
        else:
            parsed_info["contacts"]["other"].append(contact_details)
    parsed_info["details"] = {
        "events": data.get("events", []),
        "remarks": data.get("remarks", []),
        "links": data.get("links", []),
        "notices": data.get("notices", []),
        "object_class_name": data.get("objectClassName", "Bilinmiyor"),
    }
    return parsed_info
def get_authoritative_rdap_url(query):
    """Resolve the authoritative RDAP endpoint URL for an IP or ASN query.

    Tries the IANA bootstrap registry first, then falls back to probing
    each RIR's RDAP service directly.  Returns the URL string or None.
    """
    query = query.strip().upper().replace("AS", "")
    is_ip = "." in query or ":" in query
    rdap_type = "ip" if is_ip else "autnum"
    bootstrap_url = IANA_IP_BOOTSTRAP_URL if is_ip else IANA_ASN_BOOTSTRAP_URL
    try:
        r = robust_get_request(f"{bootstrap_url}{query}", timeout=10, headers={"Accept": "application/rdap+json"})
        # Bug fix: robust_get_request can return None (other call sites guard
        # with `if response and ...`); previously `r.status_code` raised an
        # uncaught AttributeError here.  Treat None as a connection failure.
        if r is None:
            raise requests.exceptions.RequestException("IANA RDAP isteği başarısız")
        if r.status_code == 501:
            raise requests.exceptions.HTTPError("IANA RDAP not implemented", response=r)
        r.raise_for_status()
        iana_data = r.json()
        for link in iana_data.get("links", []):
            if link.get("rel") == "related":
                return link.get("href")
        logger.warning("IANA RDAP sonucu var ama yönlendirme linki bulunamadı.")
    except requests.exceptions.HTTPError as http_err:
        if http_err.response is not None and http_err.response.status_code == 501:
            logger.warning(f"IANA RDAP not implemented, trying fallback RIRs")
        else:
            logger.warning(f"IANA RDAP URL alınamadı: {str(http_err)}")
    except requests.exceptions.RequestException as e:
        logger.warning(f"IANA RDAP bağlantısı başarısız: {str(e)}")
    fallback_rir_urls = {
        "ripe": f"https://rdap.db.ripe.net/{rdap_type}/{query}",
        "apnic": f"https://rdap.apnic.net/{rdap_type}/{query}",
        "lacnic": f"https://rdap.lacnic.net/rdap/{rdap_type}/{query}",
        "afrinic": f"https://rdap.afrinic.net/rdap/{rdap_type}/{query}",
        "arin": f"https://rdap.arin.net/registry/{rdap_type}/{query}"
    }
    for rir, url in fallback_rir_urls.items():
        try:
            r = robust_get_request(url, timeout=10, headers={"Accept": "application/rdap+json"})
            # Guard against a None response here as well.
            if r is not None and r.status_code == 200:
                logger.info(f"{rir.upper()} RDAP kaynağı kullanıldı: {url}")
                return url
        except requests.exceptions.RequestException:
            continue
    logger.warning(f"Tüm RDAP kaynakları başarısız oldu: {query}")
    return None
def get_additional_info_from_ripestat(query):
    """Collect supplementary routing data for *query* from RIPEstat.

    Queries a fixed set of RIPEstat data endpoints and maps each endpoint's
    human-readable title to either its "data" payload or an error dict.
    Never raises: per-endpoint failures are reported inline.
    """
    endpoint_titles = {
        "announced-prefixes": "Anons Edilen Prefixler",
        "routing-status": "Yönlendirme Durumu",
        "bgp-state": "BGP Durumu"
    }

    def fetch(endpoint):
        # One endpoint -> payload dict; network/HTTP problems become error dicts.
        try:
            response = requests.get(f"{RIPESTAT_DATA_URL}{endpoint}/data.json",
                                    params={"resource": query},
                                    timeout=7)
            if response.status_code == 200:
                return response.json().get("data", {})
            return {"error": f"HTTP {response.status_code}: Veri alınamadı"}
        except requests.exceptions.RequestException as exc:
            return {"error": f"İstek hatası: {str(exc)}"}
        except Exception as exc:
            return {"error": f"Beklenmedik hata: {str(exc)}"}

    return {title: fetch(endpoint) for endpoint, title in endpoint_titles.items()}
def get_ipinfo_details(ip):
    """Fetch geolocation/ASN details for *ip* from ipinfo.io.

    Args:
        ip: IPv4 or IPv6 address string.

    Returns:
        A dict of ipinfo fields plus a resolved 'country_name' key, or None
        when the input is not a valid IP address or the request fails.
    """
    # BUGFIX: proper validation instead of a loose regex. The old check
    # accepted octets > 255 (e.g. "999.1.1.1") and let through any string
    # that merely contained a colon (e.g. "host:port").
    try:
        ipaddress.ip_address(ip)
    except ValueError:
        return None
    try:
        r = requests.get(f"{IPINFO_URL}{ip}/json", timeout=5)
        if r.status_code == 200:
            data = r.json()
            country_code = data.get('country', '')
            data['country_name'] = get_country_name(country_code)
            return data
        return None
    except requests.exceptions.RequestException as e:
        logger.warning(f"IPinfo bilgileri alınamadı: {str(e)}")
        return None
def analyze_ssl(hostname, port=443):
    """Inspect the TLS certificate presented by ``hostname:port``.

    Connects with certificate verification disabled — the goal is to report
    on whatever certificate the server presents, including invalid or
    expired ones, rather than fail the handshake.

    Returns:
        A dict with issuer/subject components, validity window, remaining
        days, negotiated protocol and cipher, serial, signature algorithm
        and SAN entries — or ``{"error": ...}`` on any failure.
    """
    try:
        context = ssl.create_default_context()
        # Verification intentionally off (see docstring); TLS 1.2 minimum.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        context.minimum_version = ssl.TLSVersion.TLSv1_2
        with socket.create_connection((hostname, port), timeout=10) as sock:
            with context.wrap_socket(sock, server_hostname=hostname) as ssl_sock:
                cert_bin = ssl_sock.getpeercert(binary_form=True)
                cipher = ssl_sock.cipher()
                # Parse the DER-encoded certificate with pyOpenSSL for
                # access to issuer/subject components and extensions.
                x509 = crypto.load_certificate(
                    crypto.FILETYPE_ASN1,
                    cert_bin
                )
                issuer = x509.get_issuer()
                subject = x509.get_subject()
                # notBefore/notAfter are ASN.1 times with a 'Z' (UTC) suffix;
                # parse naive first, then attach the UTC timezone explicitly.
                not_after_naive = datetime.strptime(x509.get_notAfter().decode('utf-8'), '%Y%m%d%H%M%SZ')
                not_before_naive = datetime.strptime(x509.get_notBefore().decode('utf-8'), '%Y%m%d%H%M%SZ')
                not_after = not_after_naive.replace(tzinfo=timezone.utc)
                not_before = not_before_naive.replace(tzinfo=timezone.utc)
                days_valid = (not_after - datetime.now(timezone.utc)).days
                return {
                    "issuer": {k.decode(): v.decode() for k, v in issuer.get_components()},
                    "subject": {k.decode(): v.decode() for k, v in subject.get_components()},
                    "valid_from": not_before.strftime("%Y-%m-%d"),
                    "valid_until": not_after.strftime("%Y-%m-%d"),
                    "days_remaining": days_valid,
                    "protocol": ssl_sock.version(),
                    "cipher": {
                        "name": cipher[0],
                        "version": cipher[1],
                        "bits": cipher[2]
                    } if cipher else None,  # cipher() may return None pre-handshake
                    "serial_number": str(x509.get_serial_number()),
                    "signature_algorithm": x509.get_signature_algorithm().decode('utf-8'),
                    "is_valid": days_valid > 0,
                    # Missing CN/O fall back to empty strings.
                    "common_name": dict(subject.get_components()).get(b'CN', b'').decode('utf-8'),
                    "organization": dict(subject.get_components()).get(b'O', b'').decode('utf-8'),
                    "alt_names": _get_alt_names(x509)
                }
    except ssl.SSLError as e:
        logger.error(f"SSL hatası ({hostname}:{port}): {str(e)}")
        return {"error": f"SSL hatası: {str(e)}"}
    except socket.timeout:
        logger.error(f"Zaman aşımı ({hostname}:{port})")
        return {"error": "Bağlantı zaman aşımı"}
    except Exception as e:
        logger.error(f"Beklenmedik hata ({hostname}:{port}): {str(e)}")
        return {"error": f"Beklenmedik hata: {str(e)}"}
def _get_alt_names(x509):
alt_names = []
for i in range(x509.get_extension_count()):
ext = x509.get_extension(i)
if ext.get_short_name() == b'subjectAltName':
alt_names.extend([name.strip() for name in str(ext).split(',')])
return alt_names
def generate_security_report(ipinfo_data, http_headers, port_scan, dnsbl_results, ip):
    """Build an aggregated security summary for *ip*.

    Combines anonymity-network detections (live DNSBL probes, ipinfo flags,
    pre-computed DNSBL results), open-port findings and HTTP security
    headers into a single report dict consumed by the template.

    Args:
        ipinfo_data: dict from ipinfo.io (may carry proxy/vpn/tor flags) or None.
        http_headers: dict with a "headers" mapping, or an error dict, or None.
        port_scan: dict with a "ports" list of {"port": int, ...} entries, or None.
        dnsbl_results: {category: [{"response": ...}, ...]} or None.
        ip: the address being analyzed.
    """
    report = {
        "proxy_vpn_tor": False,
        "anonymity_services": [],
        "open_ports": [],
        "security_headers": [],
        "server_info": {},
        "security_issues": []
    }
    # 1) Live DNSBL / Tor-exit probes.
    report["anonymity_services"] = check_anonymity_services(ip)
    # 2) ipinfo.io anonymity flags.
    if ipinfo_data:
        for key, label in (("proxy", "Proxy"), ("vpn", "VPN"), ("tor", "Tor")):
            if ipinfo_data.get(key) and label not in report["anonymity_services"]:
                report["anonymity_services"].append(label)
    # 3) Previously collected DNSBL results.
    if dnsbl_results:
        category_map = {
            'spam': 'Spam',
            'proxy_bot': 'Proxy/Bot',
            'anonymity': 'Anonim Servis'
        }
        for category, listings in dnsbl_results.items():
            for entry in listings:
                if entry.get('response'):
                    service_name = category_map.get(category, category.capitalize())
                    if service_name not in report["anonymity_services"]:
                        report["anonymity_services"].append(service_name)
    # BUGFIX: the flag and the security issue used to be set *before* the
    # ipinfo/DNSBL sources were merged in, so detections from those sources
    # never raised proxy_vpn_tor. Evaluate once, after all sources.
    if report["anonymity_services"]:
        report["proxy_vpn_tor"] = True
        detected_str = ", ".join(report["anonymity_services"])
        report["security_issues"].append(
            f"Anonim ağ kullanımı tespit edildi: {detected_str}"
        )
    # Open-port findings: flag well-known service ports.
    if port_scan and "ports" in port_scan:
        report["open_ports"] = port_scan["ports"]
        critical_ports = {21, 22, 23, 25, 53, 80, 110, 139, 143, 443, 445, 3389, 8080}
        open_critical = [p for p in report["open_ports"] if p["port"] in critical_ports]
        if open_critical:
            report["security_issues"].append(f"{len(open_critical)} kritik port açık bulundu")
    # HTTP security-header audit (case-insensitive header lookup).
    if http_headers and not http_headers.get("error") and "headers" in http_headers:
        headers = {k.lower(): v for k, v in http_headers["headers"].items()}
        security_headers_check = {
            "Content-Security-Policy": "content-security-policy",
            "Strict-Transport-Security": "strict-transport-security",
            "X-Frame-Options": "x-frame-options",
            "X-Content-Type-Options": "x-content-type-options",
            "X-XSS-Protection": "x-xss-protection"
        }
        report["security_headers"] = [
            display_name for display_name, real_name in security_headers_check.items()
            if real_name in headers
        ]
        missing_headers = [
            display_name for display_name, real_name in security_headers_check.items()
            if real_name not in headers
        ]
        if missing_headers:
            report["security_issues"].append(f"{len(missing_headers)} güvenlik başlığı eksik")
    return report
def check_anonymity_services(ip):
    """Query proxy/Tor/botnet DNS blocklists for *ip*.

    Returns a list of human-readable service labels (e.g. "Tor", "Proxy")
    for every blocklist that lists the address; an empty list on failure.
    """
    anonymity_services = []
    proxy_lists = [
        'proxy.dnsbl.sorbs.net',
        'socks.dnsbl.sorbs.net',
        'http.dnsbl.sorbs.net',
        'tor.dan.me.uk',
        'torexit.dan.me.uk',
        'dnsbl.tornevall.org',
        'rbl.efnetrbl.org',
        'noptr.spamrats.com',
        'dyna.spamrats.com',
        'bl.blocklist.de',
        'dnsbl.dronebl.org'
    ]
    try:
        addr = ipaddress.ip_address(ip)
        if addr.version == 4:
            reversed_ip = '.'.join(reversed(ip.split('.')))
        else:
            # BUGFIX: DNSBLs expect IPv6 queries as the fully-expanded
            # address with each hex nibble reversed and dot-separated
            # (RFC 5782). The previous ':'-split reversal produced
            # unusable query labels for every IPv6 address.
            nibbles = addr.exploded.replace(':', '')
            reversed_ip = '.'.join(reversed(nibbles))
        resolver = dns.resolver.Resolver()
        resolver.nameservers = ['8.8.8.8', '1.1.1.1']
        resolver.timeout = 3
        resolver.lifetime = 3
        # Hoisted out of the loop: maps a blocklist-name fragment to a label.
        service_map = {
            'sorbs': 'Proxy',
            'dan': 'Tor',
            'tornevall': 'VPN',
            'efnetrbl': 'IRC Proxy',
            'spamrats': 'Spam Bot',
            'blocklist': 'Hack Saldırısı',
            'dronebl': 'Zombie/Botnet'
        }
        for dnsbl in proxy_lists:
            query = f"{reversed_ip}.{dnsbl}"
            try:
                # Any A-record answer means the IP is listed on this DNSBL.
                answers = resolver.resolve(query, 'A')
                if answers:
                    service_name = next(
                        (service_map[key] for key in service_map if key in dnsbl),
                        dnsbl.split('.')[-2].capitalize()
                    )
                    if service_name not in anonymity_services:
                        anonymity_services.append(service_name)
            except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.Timeout):
                # Not listed (or lookup timed out) — move on to the next list.
                continue
            except Exception as e:
                logger.warning(f"DNSBL query error for {dnsbl}: {str(e)}")
        return anonymity_services
    except Exception as e:
        logger.error(f"Anonimlik kontrol hatası: {str(e)}")
        return []
def get_ip_classification(ip):
    """Classify *ip* into a short category with a human-readable description.

    Returns a ``(category, description)`` tuple; invalid input yields
    ``("Invalid", ...)`` instead of raising.
    """
    try:
        parsed = ipaddress.ip_address(ip)
    except ValueError:
        return "Invalid", "Geçersiz IP adresi"
    # Order matters: private and multicast take precedence over global.
    if parsed.is_private:
        return "Private", "Özel ağ kullanımı (LAN)"
    if parsed.is_multicast:
        return "Multicast", "Çoklu yayın ağı"
    if parsed.is_global:
        return "Public", "Genel internet"
    return "Special", "Özel kullanım"
def robust_get_request(url, retries=3, backoff_factor=0.5, **kwargs):
    """GET *url* with retries and exponential backoff.

    Retries transient failures (connection errors, timeouts, 5xx responses)
    up to *retries* times, sleeping ``backoff_factor * 2**attempt`` between
    attempts. Client errors (4xx) are raised immediately since retrying
    cannot change the outcome.

    Raises:
        requests.exceptions.RequestException: the last failure, when all
        attempts are exhausted (same exception types as before).
    """
    # BUGFIX: copy so a caller-supplied headers dict is never mutated.
    headers = dict(kwargs.get('headers') or {})
    headers.setdefault('User-Agent', 'Luminet/1.0 (Network Analysis Tool)')
    kwargs['headers'] = headers
    for attempt in range(retries):
        try:
            response = requests.get(url, **kwargs)
            response.raise_for_status()
            return response
        except requests.exceptions.HTTPError as e:
            status = e.response.status_code if e.response is not None else None
            if status is not None and 400 <= status < 500:
                raise  # 4xx is not transient — backing off is pointless.
            if attempt == retries - 1:
                raise
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            # Timeouts are now retried too; previously they propagated at once.
            if attempt == retries - 1:
                raise
        time.sleep(backoff_factor * (2 ** attempt))
# Routes
@app.route("/", methods=["GET", "POST"])
def index():
    """Main view: analyze the submitted query (IP address or domain).

    GET renders the empty form. POST resolves the query, fetches RDAP data,
    then fans out geolocation, DNS, routing and security lookups (partly in
    parallel) and renders everything into index.html.
    """
    # Initialize every result holder so the template always receives them.
    query = request.form.get("query", "").strip() if request.method == "POST" else ""
    main_data, additional_data, ipinfo_data, raw_json_str, error = None, None, None, None, None
    traceroute_locations, asn_map_data, origin_asn = None, None, None
    reverse_dns, ping_data, port_scan, http_headers, ssl_info = None, None, None, None, None
    dnsbl_results, security_report, bgp_graph_data = None, None, None
    bgpview_prefix, mtr_data, peeringdb_data = None, None, None
    # Domain-only DNS result holders.
    dns_records, authoritative_dns, dnssec_status = None, None, None
    if request.method == "POST" and query:
        try:
            # Resolve a domain name to an IP address when possible.
            ip_to_query = query
            is_domain = False
            try:
                addr_info = socket.getaddrinfo(query, None)
                ip_to_query = addr_info[0][4][0]
                is_domain = True
            except socket.gaierror:
                pass  # Input is probably already an IP address.
            except Exception as e:
                logger.error(f"DNS çözümleme hatası: {str(e)}")
            # Start with the RDAP lookup.
            authoritative_url = get_authoritative_rdap_url(ip_to_query)
            if authoritative_url:
                try:
                    r = robust_get_request(authoritative_url, timeout=15, headers={"Accept": "application/rdap+json"})
                    raw_data = r.json()
                    main_data = parse_rdap_response(raw_data)
                    ip_class, ip_class_desc = get_ip_classification(ip_to_query)
                    if main_data:
                        main_data["summary"]["ip_class"] = ip_class
                        main_data["summary"]["ip_class_desc"] = ip_class_desc
                    raw_json_str = json.dumps(raw_data, indent=2, ensure_ascii=False)
                except requests.exceptions.HTTPError as http_err:
                    error = f"API Hatası (Kod: {http_err.response.status_code}): '{query}' için kayıt bulunamadı."
                except Exception as e:
                    error = f"RDAP verisi alınamadı: {str(e)}"
            else:
                error = f"Yetkili RDAP sunucusu bulunamadı: {query}"
            if error:
                logger.error(error)
            # If RDAP succeeded, run the remaining analyses in parallel.
            if not error:
                with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
                    # Independent lookups that can run concurrently.
                    tasks = {
                        'ipinfo': executor.submit(get_ipinfo_details, ip_to_query),
                        'ripestat': executor.submit(get_additional_info_from_ripestat, ip_to_query),
                        'reverse_dns': executor.submit(get_reverse_dns, ip_to_query),
                        'ping': executor.submit(get_ping_latency, ip_to_query),
                        'dnsbl': executor.submit(check_dnsbl, ip_to_query),
                        'mtr': executor.submit(run_mtr_analysis, ip_to_query),
                        'bgpview': executor.submit(get_bgpview_prefix_details, ip_to_query),
                        'traceroute': executor.submit(enriched_traceroute, ip_to_query)
                    }
                    # Block until every task finishes and collect the results.
                    results = {name: future.result() for name, future in tasks.items()}
                ipinfo_data = results.get('ipinfo')
                additional_data = results.get('ripestat')
                reverse_dns = results.get('reverse_dns')
                ping_data = results.get('ping')
                mtr_data = results.get('mtr')
                dnsbl_results = results.get('dnsbl')
                bgpview_prefix = results.get('bgpview')
                traceroute_locations = results.get('traceroute')
                # For domain queries, also fetch DNS records.
                if is_domain:
                    try:
                        dns_records = get_dns_records(query)
                        authoritative_dns = get_authoritative_dns(query)
                        dnssec_status = check_dnssec(query)
                    except Exception as e:
                        logger.error(f"DNS sorgu hatası: {str(e)}")
                        # Fall back to empty values on failure.
                        dns_records, authoritative_dns, dnssec_status = {}, None, "Hata"
                # Prefer the ASN from ipinfo; otherwise derive it via RIPEstat.
                if ipinfo_data and ipinfo_data.get('asn'):
                    origin_asn = ipinfo_data['asn'].get('asn', '').replace("AS", "")
                else:
                    origin_asn = get_asn_from_ip(ip_to_query)
                # PeeringDB depends on the ASN, so this must run sequentially.
                peeringdb_data = get_peeringdb_info(origin_asn)
                # Remaining tasks that must run in order.
                target_host = query if is_domain else ip_to_query
                http_headers = get_http_headers(target_host)
                if http_headers and not http_headers.get("error"):
                    ssl_info = analyze_ssl(target_host)
                if NMAP_AVAILABLE:
                    port_scan = scan_ports(ip_to_query)
                else:
                    port_scan = {"error": "Nmap kurulu değil"}
                _, asn_map_data = get_asn_map_data(ip_to_query)
                security_report = generate_security_report(
                    ipinfo_data, http_headers, port_scan, dnsbl_results, ip_to_query
                )
                if additional_data and 'BGP Durumu' in additional_data:
                    bgp_graph_data = process_bgp_data_for_d3(additional_data['BGP Durumu'], origin_asn)
        except requests.exceptions.ConnectionError as conn_err:
            error = f"Bağlantı hatası: {str(conn_err)}"
            logger.error(error)
        except Exception as e:
            error = f"Beklenmedik bir hata oluştu: {str(e)}"
            logger.error(f"Ana işlem hatası: {str(e)}", exc_info=True)
    elif request.method == "POST" and not query:
        error = "Lütfen bir IP adresi veya alan adı girin."
    # Hand everything collected to the template.
    return render_template(
        "index.html",
        query=query,
        data=main_data,
        additional_data=additional_data,
        ipinfo_data=ipinfo_data,
        raw_json=raw_json_str,
        error=error,
        nmap_available=NMAP_AVAILABLE,
        reverse_dns=reverse_dns,
        ping_data=ping_data,
        port_scan=port_scan,
        http_headers=http_headers,
        ssl_info=ssl_info,
        traceroute_locations=traceroute_locations,
        asn_map_data=asn_map_data,
        security_report=security_report,
        bgp_graph_data=bgp_graph_data,
        origin_asn=origin_asn,
        bgpview_prefix=bgpview_prefix,
        mtr_data=mtr_data,
        peeringdb_data=peeringdb_data,
        dns_records=dns_records,
        authoritative_dns=authoritative_dns,
        dnssec_status=dnssec_status
    )
if __name__ == "__main__":
    # Bind to all interfaces; the PORT env var overrides the default 7860.
    port = int(os.environ.get("PORT", 7860))
    app.run(host="0.0.0.0", port=port, debug=False)