Spaces:
Runtime error
Runtime error
| import json | |
| import os | |
| from concurrent.futures import ThreadPoolExecutor | |
| from pathlib import Path | |
| from typing import Any | |
| import cachetools | |
| import gradio as gr | |
| import requests | |
| import urllib3 | |
| from dns import message | |
# DNS-over-HTTPS endpoint that receives the wire-format DNS queries.
_DNS_SERVER = "https://dns.google/dns-query"  # can use others
# Record types requested, in this order, when enumerating a domain.
_DNS_RECORD_TYPES = [
    "A",
    "AAAA",
    "CNAME",
    "MX",
    "NS",
    "SOA",
    "TXT",
    "RP",
    "LOC",
    "CAA",
    "SPF",
    "SRV",
    "NSEC",
    "RRSIG",
]
# Text file of common subdomain prefixes, read one prefix per line.
# The path is relative, so it is resolved against the working directory.
_COMMON_SUBDOMAINS_TXT_PATH = Path("./subdomains/subdomains.txt")
# NOTE(review): not referenced in the visible part of the file; presumably
# the size and entry lifetime of a cachetools cache defined elsewhere - confirm.
_CACHE_MAX_SIZE = 4096
_CACHE_TTL_SECONDS = 3600
def get_geolocation(ip: str) -> dict[str, Any] | str:
    """Get location information from an ip address.

    Returns the following information on an ip address:
        1. IPv4
        2. city
        3. country_code
        4. country_name
        5. latitude
        6. longitude
        7. postal
        8. state

    Example:
        >>> from pprint import pprint
        >>> pprint(get_geolocation("103.100.104.0"))  # doctest: +SKIP
        {'IPv4': '103.100.104.0',
         'city': None,
         'country_code': 'NZ',
         'country_name': 'New Zealand',
         'latitude': -41,
         'longitude': 174,
         'postal': None,
         'state': None}

    Args:
        ip: ip address; surrounding whitespace is ignored.

    Returns:
        Location information on the ip address, as decoded from the JSON
        body of the response. If the request or the JSON decoding fails,
        the error message is returned as a string instead of raising.
    """
    from urllib.parse import quote  # local import keeps the block self-contained

    # The value comes straight from user input: percent-encode it so that
    # characters such as "/", "?" or "#" cannot change the path or query of
    # the request. ":" stays literal so IPv6 addresses are sent unchanged.
    target = quote(ip.strip(), safe=":")
    try:
        return requests.get(
            f"https://geolocation-db.com/json/{target}",
            timeout=1,
        ).json()
    except Exception as e:  # noqa: BLE001
        # Best effort by design: the caller gets the reason as text.
        return str(e)
def _request_dns_record(
    domain: str,
    record_type: str,
    timeout: float = 0.5,
) -> list[str]:
    """Utility to build dns resolve requests that do not use port 53.

    The query is serialised to DNS wire format and POSTed to ``_DNS_SERVER``
    as ``application/dns-message``; the binary response is parsed back into
    a DNS message.

    Args:
        domain: domain to investigate
        record_type: record type
        timeout: seconds to wait for the HTTP request before giving up

    Returns:
        The text form of every record in the answer section of the
        response, or an empty list when the answer section is empty.
        All answer RRsets are included, so a lookup that goes through an
        alias yields the alias target followed by the records it leads to.

    Raises:
        requests.RequestException: the request failed, timed out, or the
            server replied with an HTTP error status.
        Exception: whatever the DNS library raises for a query it cannot
            build or a response it cannot parse.
    """
    query = message.make_query(domain, record_type)
    response = requests.post(
        _DNS_SERVER,
        headers={
            "Content-Type": "application/dns-message",
            "Accept": "application/dns-message",
        },
        data=query.to_wire(),
        verify=True,
        timeout=timeout,
    )
    # Fail with a clear HTTP error instead of handing an error page to the
    # wire-format parser below.
    response.raise_for_status()
    dns_message = message.from_wire(response.content)
    # Flatten every RRset of the answer section, not just the first one:
    # the first RRset may only be a CNAME pointing at the actual records.
    return [str(rdata) for rrset in dns_message.answer for rdata in rrset]
| # see: https://thepythoncode.com/article/dns-enumeration-with-python | |
| # https://dnspython.readthedocs.io | |
| def enumerate_dns(domain_name: str) -> dict[str, Any] | None: | |
| r"""Enumerates information about a specific domain's DNS configuration. | |
| Information collected about the domain name: | |
| 1. A records: the IPv4 associated with the domain | |
| 2. AAAA records: the IPv6 associated with the domain | |
| 3. CAA records: used by owners to specify which Certificate Authorities | |
| are authorized to issue SSL/TLS certificates for their domains. | |
| 4. CNAME records: alias of one name to another - the DNS lookup will | |
| continue by retrying the lookup with the new name. | |
| 5. LOC records: geographic location associated with a domain name. | |
| 6. MX records: associated email servers to the domain. | |
| 7. NS records: DNS servers that are authoritative for a particular domain. | |
| These may be use to inquire information about the domain. | |
| 8. SOA records: defines authoritative information about a DNS zone, | |
| including zone transfers and cache expiration. | |
| 9. TXT records: used for domain verification and email security. | |
| 10. RP records: the responsible person for a domain. | |
| 11. SPF records: defines authorized email servers. | |
| 12. SRV records: specifies location of specific services | |
| (port and host) for the domain. | |
| 14. NSEC records: proves non-existence of DNS records | |
| and prevents zone enumeration. | |
| 15. RRSIG records: contains cryptographic signatures for DNSSEC-signed | |
| records, providing authentication and integrity. | |
| Example: | |
| >>> from pprint import pprint | |
| >>> pprint(enumerate_dns("youtube.com")) | |
| ... {'A': 'youtube.com. 300 IN A 142.250.200.142', | |
| 'AAAA': 'youtube.com. 286 IN AAAA 2a00:1450:4003:80f::200e', | |
| 'CAA': 'youtube.com. 14352 IN CAA 0 issue "pki.goog"', | |
| 'CNAME': None, | |
| 'LOC': None, | |
| 'MX': 'youtube.com. 300 IN MX 0 smtp.google.com.', | |
| 'NS': 'youtube.com. 21600 IN NS ns4.google.com.\n' | |
| 'youtube.com. 21600 IN NS ns1.google.com.\n' | |
| 'youtube.com. 21600 IN NS ns2.google.com.\n' | |
| 'youtube.com. 21600 IN NS ns3.google.com.', | |
| 'NSEC': None, | |
| 'RP': None, | |
| 'RRSIG': None, | |
| 'SOA': 'youtube.com. 60 IN SOA ns1.google.com. dns-admin.google.com. ' | |
| '766113658 900 900 1800 60', | |
| 'SPF': None, | |
| 'SRV': None, | |
| 'TXT': 'youtube.com. 3586 IN TXT "v=spf1 include:google.com mx -all"\n' | |
| 'youtube.com. 3586 IN TXT ' | |
| '"facebook-domain-verification=64jdes7le4h7e7lfpi22rijygx58j1"\n' | |
| 'youtube.com. 3586 IN TXT ' | |
| '"google-site-verification=QtQWEwHWM8tHiJ4s-jJWzEQrD_fF3luPnpzNDH-Nw-w"'} | |
| Args: | |
| domain_name: domain name for which to | |
| enumerate the DNS configuration. | |
| Returns: | |
| The domain's DNS configuration. | |
| """ | |
| enumeration = {} | |
| for record_type in _DNS_RECORD_TYPES: | |
| try: | |
| record = _request_dns_record(domain_name.strip(), record_type, timeout=1) | |
| if record: | |
| enumeration[record_type] = record | |
| except Exception as e: # noqa: BLE001, PERF203 | |
| enumeration[record_type] = [str(e)] | |
| return enumeration if enumeration else None | |
def resolve_subdomain_ipv4(domain: str) -> str | None:
    """Check whether a domain resolves to at least one A (IPv4) record.

    Args:
        domain: domain name

    Returns:
        The domain name itself (not the address) provided the lookup
        returned at least one record. ``None`` when the lookup returned
        nothing or failed for any reason.
    """
    try:
        records = _request_dns_record(domain, "A", timeout=0.6)
    except Exception:  # noqa: BLE001
        # A failed lookup is treated the same as a non-existing domain.
        return None
    return domain if records else None
def scrap_subdomains_for_domain(domain_name: str) -> list[str]:
    """Retrieves subdomains associated to a domain if any.

    The information retrieved from a domain is its subdomains
    provided they are the top 1000 subdomain prefixes as
    indicated by https://github.com/rbsec/dnscan/tree/master

    Importantly, it finds subdomains only if their prefixes
    are among the prefixes listed in the local subdomains file.
    Hence, it may not yield all the subdomains associated to
    the domain.

    Example:
        >>> scrap_subdomains_for_domain("github.com")  # doctest: +SKIP
        ['www.github.com', 'smtp.github.com', 'ns1.github.com',
         'ns2.github.com', 'autodiscover.github.com', 'test.github.com',
         'blog.github.com', 'admin.github.com', 'support.github.com',
         'docs.github.com', 'shop.github.com', 'wiki.github.com',
         'api.github.com', 'live.github.com', 'help.github.com',
         'jobs.github.com', 'services.github.com', 'de.github.com',
         'cs.github.com', 'fr.github.com', 'ssh.github.com',
         'partner.github.com', 'community.github.com',
         'mailer.github.com', 'training.github.com', ...]

    Args:
        domain_name: domain name for which to retrieve a
            list of subdomains; surrounding whitespace is ignored.

    Returns:
        List of subdomains that resolved, in the order of the prefixes
        file. Empty when the domain name is blank, when the prefixes file
        is missing, or when no candidate resolved.
    """
    # Normalise once instead of once per prefix.
    domain = domain_name.strip()
    if not domain:
        # Avoid firing a lookup per prefix for names like "www.".
        return []

    try:
        # Explicit encoding so the result does not depend on the locale.
        with _COMMON_SUBDOMAINS_TXT_PATH.open(encoding="utf-8") as file:
            prefixes = [prefix for line in file if (prefix := line.strip())]
    except FileNotFoundError:
        return []

    candidates = [f"{prefix}.{domain}" for prefix in prefixes]
    # The lookups are network-bound, so they are run on a thread pool.
    with ThreadPoolExecutor(max_workers=None) as executor:
        resolved = list(executor.map(resolve_subdomain_ipv4, candidates))
    return [name for name in resolved if name]
def retrieve_ioc_from_threatfox(potentially_ioc: str) -> str:
    r"""Retrieves information about a potential IoC from ThreatFox.

    It may be used to retrieve information of indicators of compromise
    (IOCs) associated with malware, with the infosec community, AV
    vendors and cyber threat intelligence providers.

    The API key is read from the ``THREATFOX_APIKEY`` environment variable.

    Examples:
        >>> retrieve_ioc_from_threatfox("139.180.203.104")  # doctest: +SKIP
        {
            "query_status": "ok",
            "data": [
                {
                    "id": "12",
                    "ioc": "139.180.203.104:443",
                    "threat_type": "botnet_cc",
                    "threat_type_desc": "Indicator that identifies a botnet command&control...",
                    "ioc_type": "ip:port",
                    "ioc_type_desc": "ip:port combination that is used for botnet Command&...",
                    "malware": "win.cobalt_strike",
                    "malware_printable": "Cobalt Strike",
                    "malware_alias": "Agentemis,BEACON,CobaltStrike",
                    "malware_malpedia": "https:\/\/malpedia.caad.fkie.fraunhofer.de\/...",
                    "confidence_level": 75,
                    "first_seen": "2020-12-06 09:10:23 UTC",
                    "last_seen": null,
                    "reference": null,
                    "reporter": "abuse_ch",
                    "tags": null,
                    "malware_samples": [
                        {
                            "time_stamp": "2021-03-23 08:18:06 UTC",
                            "md5_hash": "5b7e82e051ade4b14d163eea2a17bf8b",
                            "sha256_hash": "b325c92fa540edeb89b95dbfd4400c1cb33599c66859....",
                            "malware_bazaar": "https:\/\/bazaar.abuse.ch\/sample\/b325c...\/"
                        },
                    ]
                }
            ]
        }

    Args:
        potentially_ioc: this can be a url, a domain, a hash,
            or any other type of IoC. Surrounding whitespace is ignored.

    Returns:
        The raw response body as text. It carries the information of the
        input as an IoC: threat type, malware type and samples, confidence
        level, first/last seen dates, and more IoC information.
        If the API key is not configured or the request fails, a
        message describing the problem is returned instead.
    """
    api_key = os.environ.get("THREATFOX_APIKEY")
    if not api_key:
        # Report the problem as text, like any other failure of this
        # function, instead of letting a KeyError escape to the caller.
        return "THREATFOX_APIKEY environment variable is not set"

    payload = json.dumps(
        {
            "query": "search_ioc",
            "search_term": potentially_ioc.strip(),
        },
    )
    try:
        # The pool is created per call, so close it on the way out to
        # release its connections instead of leaking them.
        with urllib3.HTTPSConnectionPool(
            "threatfox-api.abuse.ch",
            port=443,
            maxsize=50,
            headers={"Auth-Key": api_key},
            timeout=5,
        ) as pool:
            response = pool.request("POST", "/api/v1/", body=payload)
            return response.data.decode("utf-8", "ignore")
    except Exception as e:  # noqa: BLE001
        return str(e)
# Gradio interface around get_geolocation: one text box for the ip address,
# result shown as JSON.
geo_location_tool = gr.Interface(
    fn=get_geolocation,
    inputs=gr.Textbox(label="ip"),
    outputs=gr.JSON(label="Geolocation of IP"),
    title="Domain Associated Geolocation Finder",
    description="Retrieves the geolocation associated to an input ip address",
    theme="default",
    examples=["1.0.3.255", "59.34.7.3"],
)
# Gradio interface around enumerate_dns: one text box for the domain,
# collected DNS records shown as JSON.
dns_enumeration_tool = gr.Interface(
    fn=enumerate_dns,
    inputs=gr.Textbox(label="domain"),
    outputs=gr.JSON(label="DNS records"),
    title="DNS record enumerator of domains",
    description="Retrieves several dns record types for the input domain names",
    theme="default",
    examples=["owasp.org", "nist.gov"],
)
# Gradio interface around scrap_subdomains_for_domain: one text box for the
# domain, list of resolved subdomains shown as JSON.
scrap_subdomains_tool = gr.Interface(
    fn=scrap_subdomains_for_domain,
    inputs=gr.Textbox(label="domain"),
    outputs=gr.JSON(label="Subdomains managed by domain"),
    title="Subdomains Extractor of domains",
    description="Retrieves the subdomains for the input domain if they are common",
    theme="default",
    examples=["github.com", "netacea.com"],
)
# Gradio interface around retrieve_ioc_from_threatfox: one text box for the
# candidate IoC, response shown as plain text (the function returns a string).
extractor_of_ioc_from_threatfox_tool = gr.Interface(
    fn=retrieve_ioc_from_threatfox,
    inputs=gr.Textbox(label="IoC - url, domains or hash"),
    outputs=gr.Text(label="Entity information as an IoC"),
    title="IoC information extractor associated to particular entities",
    description=(
        "If information as an Indicator of Compromise (IoC) exists "
        "for the input url, domain or hash, it retrieves it"
    ),
    theme="default",
    examples=["advertipros.com", "dev.couplesparks.com"],
    # One label per entry of `examples`, in the same order.
    example_labels=["👾 IoC 1", "👾 IoC 2"],
)