princemaxp committed on
Commit
89a43f0
·
verified ·
1 Parent(s): e30d91f

Update url_analyzer.py

Browse files
Files changed (1) hide show
  1. url_analyzer.py +156 -47
url_analyzer.py CHANGED
@@ -1,90 +1,199 @@
 
1
  import requests
2
  import os
3
  import re
4
- from urllib.parse import quote
 
5
 
6
  SAFE_BROWSING_API_KEY = os.getenv("SAFE_BROWSING_API_KEY")
7
  OTX_API_KEY = os.getenv("OTX_API_KEY")
8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
  def analyze_urls(urls):
10
  findings = []
11
  score = 0
12
- urls = urls or []
13
 
14
- for url in urls:
15
- # 1) Google Safe Browsing
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
  if SAFE_BROWSING_API_KEY:
17
  try:
18
  payload = {
19
- "client": {"clientId": "email-analysis-tool", "clientVersion": "1.0"},
20
  "threatInfo": {
21
- "threatTypes": ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE"],
 
 
 
 
 
22
  "platformTypes": ["ANY_PLATFORM"],
23
  "threatEntryTypes": ["URL"],
24
  "threatEntries": [{"url": url}],
25
  },
26
  }
 
27
  res = requests.post(
28
  f"https://safebrowsing.googleapis.com/v4/threatMatches:find?key={SAFE_BROWSING_API_KEY}",
29
  json=payload,
30
- timeout=15,
31
  )
32
- data = res.json()
33
- if isinstance(data, dict) and "matches" in data:
34
- findings.append(f"URL: {url} flagged by Google Safe Browsing")
35
- score += 40
36
- else:
37
- findings.append(f"URL: {url} not flagged (Google Safe Browsing)")
38
  except Exception:
39
- findings.append(f"URL: {url} check failed (Google Safe Browsing)")
 
 
 
 
40
 
41
- # 2) AlienVault OTX
42
  if OTX_API_KEY:
43
  try:
44
  headers = {"X-OTX-API-KEY": OTX_API_KEY}
45
- encoded = quote(url, safe="")
46
  res = requests.get(
47
- f"https://otx.alienvault.com/api/v1/indicators/url/{encoded}/general",
48
  headers=headers,
49
- timeout=15,
50
  )
 
51
  if res.status_code == 200:
52
  data = res.json()
53
  if data.get("pulse_info", {}).get("count", 0) > 0:
54
- findings.append(f"URL: {url} flagged in AlienVault OTX")
55
  score += 30
56
- else:
57
- findings.append(f"URL: {url} not found in AlienVault OTX")
58
- else:
59
- findings.append(f"URL: {url} OTX lookup returned {res.status_code}")
60
  except Exception:
61
- findings.append(f"URL: {url} check failed (AlienVault OTX)")
 
 
 
 
62
 
63
- # 3) URLHaus
64
  try:
65
- res = requests.post("https://urlhaus-api.abuse.ch/v1/url/", data={"url": url}, timeout=15)
 
 
 
 
66
  data = res.json()
67
  if data.get("query_status") == "ok":
68
- status = data.get("url_status", "malicious/suspicious")
69
- findings.append(f"URL: {url} flagged as {status} (URLHaus)")
70
- score += 30
71
- else:
72
- findings.append(f"URL: {url} not found in URLHaus")
73
  except Exception:
74
- findings.append(f"URL: {url} check failed (URLHaus)")
75
-
76
- # 4) Heuristics
77
- domain_match = re.search(r"https?://([^/]+)/?", url)
78
- if domain_match:
79
- domain = domain_match.group(1)
80
- if len(domain) > 25 or any(char.isdigit() for char in domain.split(".")[0]):
81
- findings.append(f"URL: {url} has suspicious-looking domain")
82
- score += 15
83
- if "?" in url and len(url.split("?", 1)[1]) > 50:
84
- findings.append(f"URL: {url} has obfuscated query string")
85
- score += 15
86
-
87
- if not findings:
88
- return ["No URLs found in email."], 0
89
 
90
- return findings, score
 
1
+ # url_analyzer.py
2
  import requests
3
  import os
4
  import re
5
+ from urllib.parse import urlparse, unquote
6
+ from difflib import SequenceMatcher
7
 
8
  SAFE_BROWSING_API_KEY = os.getenv("SAFE_BROWSING_API_KEY")
9
  OTX_API_KEY = os.getenv("OTX_API_KEY")
10
 
11
# ---------------------------
# CONFIG
# ---------------------------

# Hosts of public link-shortening services; matched against the full host.
SHORTENERS = {
    "bit.ly",
    "buff.ly",
    "goo.gl",
    "is.gd",
    "ow.ly",
    "rebrand.ly",
    "shorturl.at",
    "t.co",
    "tinyurl.com",
}

# Top-level domains treated as suspicious; compared with the last host label.
SUSPICIOUS_TLDS = {
    "click",
    "info",
    "loan",
    "top",
    "work",
    "xyz",
}

# Brand keyword -> domains on which that keyword is legitimate.
# Insertion order is kept because it determines the order of findings.
BRAND_KEYWORDS = {
    "paypal": ["paypal.com"],
    "amazon": ["amazon.com"],
    "google": ["google.com", "gmail.com"],
    "microsoft": ["microsoft.com", "outlook.com"],
    "apple": ["apple.com"],
}
29
+
30
+ # ---------------------------
31
+ # HELPERS
32
+ # ---------------------------
33
+
34
def normalize_url(url: str) -> str:
    """Return *url* trimmed, percent-decoded and carrying an explicit scheme.

    Args:
        url: A URL as found in the message; it may lack a scheme.

    Returns:
        The stripped, ``unquote``-d URL. ``http://`` is prepended only when
        the text has no ``scheme://`` prefix of its own; a protocol-relative
        ``//host/...`` URL gets ``http:`` prepended.
    """
    url = unquote(url.strip())

    # Protocol-relative link: the authority is already marked by "//".
    if url.startswith("//"):
        return "http:" + url

    # Look for a real "scheme://" prefix rather than the literal text "http".
    # The old startswith("http") test left hosts such as "httpbin.org" without
    # a scheme and turned "ftp://host" or "HTTPS://host" into "http://ftp://host".
    if not re.match(r"[A-Za-z][A-Za-z0-9+.\-]*://", url):
        url = "http://" + url
    return url
40
+
41
def get_domain(url: str) -> str:
    """Return the lower-cased host of *url*, or ``""`` if it cannot be parsed.

    The host is taken from ``urlparse(url).hostname``, which excludes any
    ``user:password@`` prefix and ``:port`` suffix. Reading ``netloc`` instead
    would let ``http://paypal.com@evil.example/`` pass as a PayPal host and
    would keep ``1.2.3.4:8080`` from being recognised as an IP address.

    Args:
        url: An absolute URL (scheme included).

    Returns:
        The host name without a trailing dot, or ``""`` when there is none.
    """
    try:
        host = urlparse(url).hostname or ""
        # "example.com." and "example.com" name the same host.
        return host.rstrip(".")
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed bracketed host; the others: non-string input.
        return ""
46
+
47
def is_ip_address(domain: str) -> bool:
    """Return True when *domain* is an IP literal rather than a host name.

    Accepted forms are dotted-quad IPv4 with every octet in 0-255 (leading
    zeros allowed), and IPv6 with or without square brackets. A trailing
    ``:port`` is ignored.

    Args:
        domain: Host part of a URL, optionally with a port.

    Returns:
        True for an IPv4/IPv6 literal, False otherwise (including ``""``).
    """
    import ipaddress  # local import keeps this block self-contained

    host = domain.strip()
    if host.startswith("["):
        # "[v6]" or "[v6]:port" -> "v6"
        host = host[1:].split("]", 1)[0]
    elif host.count(":") == 1:
        # "1.2.3.4:8080" -> "1.2.3.4"; bare IPv6 always has two or more colons.
        host = host.split(":", 1)[0]

    octets = host.split(".")
    if len(octets) == 4 and all(
        o.isascii() and o.isdigit() and len(o) <= 3 and int(o) <= 255
        for o in octets
    ):
        return True

    try:
        ipaddress.IPv6Address(host)
    except ValueError:
        return False
    return True
49
+
50
def brand_impersonation(domain: str, brands=None):
    """Report signs that *domain* is posing as a well-known brand.

    Two checks run for every brand:

    * keyword check - the brand name appears inside a host that is not one of
      the brand's own domains (e.g. ``paypal-login.example.com``);
    * look-alike check - the host is textually close to one of the brand's
      domains (``SequenceMatcher`` ratio above 0.75, e.g. ``paypa1.com``).

    A host that is one of the brand's domains, or a subdomain of one, is
    skipped for that brand. Excluding only the exact match would report
    ``www.paypal.com`` as a look-alike of ``paypal.com``.

    Args:
        domain: Lower-cased host name to inspect.
        brands: Optional mapping of brand keyword to its list of legitimate
            domains. Defaults to the module-level ``BRAND_KEYWORDS``.

    Returns:
        A list of finding strings; empty when nothing looks suspicious.
    """
    brand_map = BRAND_KEYWORDS if brands is None else brands
    findings = []

    for brand, legit_domains in brand_map.items():
        owned_by_brand = any(
            domain == d or domain.endswith("." + d) for d in legit_domains
        )
        if owned_by_brand:
            continue

        if brand in domain:
            findings.append(f"Brand impersonation suspected: {brand} in {domain}")

        for legit_domain in legit_domains:
            similarity = SequenceMatcher(None, domain, legit_domain).ratio()
            if similarity > 0.75:
                findings.append(
                    f"Look-alike domain detected: {domain} vs {legit_domain}"
                )

    return findings
63
+
64
# ---------------------------
# MAIN ANALYZER
# ---------------------------

def _heuristic_findings(original_url, url, domain):
    """Score one URL with the offline (no-network) checks.

    Args:
        original_url: The URL exactly as received, still percent-encoded.
        url: The normalized URL returned by ``normalize_url``.
        domain: The host returned by ``get_domain(url)``.

    Returns:
        ``(findings, score)`` for this URL.
    """
    findings = []
    score = 0

    host_is_ip = is_ip_address(domain)
    if host_is_ip:
        findings.append(f"URL uses raw IP address ({domain})")
        score += 40

    if domain in SHORTENERS:
        findings.append(f"URL shortener detected ({domain})")
        score += 25

    if domain.rsplit(".", 1)[-1] in SUSPICIOUS_TLDS:
        findings.append(f"Suspicious TLD used ({domain})")
        score += 20

    if len(domain) > 30:
        findings.append(f"Unusually long domain name ({domain})")
        score += 15

    # An IP literal always starts with digits and was already scored above,
    # so it is not counted a second time as a "digit-heavy" name.
    if not host_is_ip and any(ch.isdigit() for ch in domain.split(".")[0]):
        findings.append(f"Digit-heavy domain (possible DGA): {domain}")
        score += 15

    for hit in brand_impersonation(domain):
        findings.append(f"URL: {hit}")
        score += 35

    try:
        query = urlparse(url).query
    except ValueError:
        # urlparse rejects malformed bracketed hosts such as "http://[bad".
        # Treat that as "no query" instead of aborting the whole analysis.
        query = ""

    if len(query) > 60:
        findings.append(f"Long obfuscated query string in URL ({domain})")
        score += 15

    # normalize_url has already percent-decoded `url`, so "%3D"/"%2F" can only
    # survive there when double-encoded. Inspect the query as received too;
    # upper-casing covers the equally valid "%3d"/"%2f" spelling.
    raw_query = original_url.partition("?")[2].partition("#")[0].upper()
    decoded_query = query.upper()
    if any(
        marker in candidate
        for candidate in (raw_query, decoded_query)
        for marker in ("%3D", "%2F")
    ):
        findings.append(f"Encoded parameters used to obscure URL ({domain})")
        score += 10

    return findings, score


def _safe_browsing_findings(url):
    """Look *url* up in Google Safe Browsing v4; return ``(findings, score)``."""
    payload = {
        "client": {"clientId": "email-guardian", "clientVersion": "1.0"},
        "threatInfo": {
            # Only values of the v4 ThreatType enum are accepted. "PHISHING"
            # is not one of them and makes the API answer HTTP 400, which the
            # status check below would silently ignore.
            "threatTypes": [
                "MALWARE",
                "SOCIAL_ENGINEERING",
                "UNWANTED_SOFTWARE",
                "POTENTIALLY_HARMFUL_APPLICATION",
            ],
            "platformTypes": ["ANY_PLATFORM"],
            "threatEntryTypes": ["URL"],
            "threatEntries": [{"url": url}],
        },
    }
    try:
        res = requests.post(
            "https://safebrowsing.googleapis.com/v4/threatMatches:find",
            params={"key": SAFE_BROWSING_API_KEY},
            json=payload,
            timeout=10,
        )
        if res.status_code == 200 and res.json().get("matches"):
            return [f"URL flagged by Google Safe Browsing ({url})"], 45
    except Exception:
        # Deliberately broad: a lookup failure must never abort the analysis.
        return [f"Safe Browsing lookup failed ({url})"], 0
    return [], 0


def _otx_findings(domain):
    """Look *domain* up in AlienVault OTX; return ``(findings, score)``."""
    try:
        res = requests.get(
            f"https://otx.alienvault.com/api/v1/indicators/domain/{domain}/general",
            headers={"X-OTX-API-KEY": OTX_API_KEY},
            timeout=10,
        )
        if res.status_code == 200:
            data = res.json()
            if data.get("pulse_info", {}).get("count", 0) > 0:
                return [f"Domain reported in AlienVault OTX ({domain})"], 30
    except Exception:
        # Deliberately broad: a lookup failure must never abort the analysis.
        return [f"OTX lookup failed ({domain})"], 0
    return [], 0


def _urlhaus_findings(url):
    """Look *url* up in URLHaus; return ``(findings, score)``."""
    try:
        res = requests.post(
            "https://urlhaus-api.abuse.ch/v1/url/",
            data={"url": url},
            timeout=10,
        )
        data = res.json()
        if data.get("query_status") == "ok":
            status = data.get("url_status", "malicious")
            return [f"URL flagged in URLHaus as {status} ({url})"], 35
    except Exception:
        # Deliberately broad: a lookup failure must never abort the analysis.
        return [f"URLHaus lookup failed ({url})"], 0
    return [], 0


def analyze_urls(urls):
    """Analyze the URLs found in an email and return findings with a risk score.

    For every URL the offline heuristics run first, followed by Google Safe
    Browsing (when ``SAFE_BROWSING_API_KEY`` is set), AlienVault OTX (when
    ``OTX_API_KEY`` is set and a host could be extracted) and URLHaus.
    Findings are appended in that order.

    Args:
        urls: Iterable of URL strings, or a falsy value when there are none.
            Entries that are not strings, or are blank, are skipped.

    Returns:
        ``(findings, score)`` where ``findings`` is a list of strings and
        ``score`` is the summed score capped at 100. With no URLs the result
        is ``(["No URLs found in email."], 0)``.
    """
    if not urls:
        return ["No URLs found in email."], 0

    findings = []
    score = 0

    for original_url in urls:
        # Skip entries that cannot be a URL instead of crashing in
        # normalize_url or sending "http://" to the lookup services.
        if not isinstance(original_url, str) or not original_url.strip():
            continue

        url = normalize_url(original_url)
        domain = get_domain(url)

        results = [_heuristic_findings(original_url, url, domain)]
        if SAFE_BROWSING_API_KEY:
            results.append(_safe_browsing_findings(url))
        if OTX_API_KEY and domain:
            results.append(_otx_findings(domain))
        results.append(_urlhaus_findings(url))

        for found, points in results:
            findings.extend(found)
            score += points

    return findings, min(score, 100)