princemaxp committed on
Commit
e30d91f
·
verified ·
1 Parent(s): b00d456

Update parse_email.py

Browse files
Files changed (1) hide show
  1. parse_email.py +146 -65
parse_email.py CHANGED
@@ -1,96 +1,177 @@
1
  # parse_email.py
2
  import email
3
  from email import policy
 
 
4
  from bs4 import BeautifulSoup
5
  import re
6
  import base64
7
- import io
8
 
9
def _extract_inline_images_from_html(html):
    """Return decoded image bytes for every base64 ``data:image/...`` URI in *html*.

    Only ``<img>`` tags whose ``src`` is a data URI are considered; regular
    http(s) image references are ignored (nothing is fetched).
    """
    images = []
    # ``html or ""`` guards against None being passed in.
    soup = BeautifulSoup(html or "", "html.parser")
    for img in soup.find_all("img"):
        src = img.get("src", "")
        if src.startswith("data:image/"):
            # e.g. data:image/png;base64,iVBORw0...
            try:
                # Split "data:image/png;base64" header from the payload.
                header, b64 = src.split(",", 1)
                data = base64.b64decode(b64)
                images.append(data)
            except Exception:
                # Malformed data URI — skip this image and keep going.
                continue
    return images
 
 
 
 
 
23
 
24
def parse_email(file_path):
    """Parse an RFC 822 message file into its interesting pieces.

    Returns: headers(dict), subject(str), body(str), urls(list), images(list of bytes)
    """
    with open(file_path, "rb") as f:
        msg = email.message_from_binary_file(f, policy=policy.default)

    # NOTE(review): dict(msg.items()) keeps only the LAST value when a header
    # repeats (e.g. Received) — confirm callers do not need all occurrences.
    headers = dict(msg.items())
    subject = headers.get("Subject", "") or ""

    body = ""
    images = []

    # Walk parts - handle multipart and attachments
    if msg.is_multipart():
        for part in msg.walk():
            ctype = part.get_content_type()
            disp = str(part.get("Content-Disposition") or "").lower()
            # attachments that are images
            if ctype.startswith("image/"):
                try:
                    data = part.get_payload(decode=True)
                    if data:
                        images.append(data)
                except Exception:
                    pass

            # text/plain
            if ctype == "text/plain" and "attachment" not in disp:
                try:
                    body += part.get_content()
                except Exception:
                    pass

            # text/html
            if ctype == "text/html" and "attachment" not in disp:
                try:
                    html_body = part.get_content()
                    # extract inline images from this html (data URIs)
                    images += _extract_inline_images_from_html(html_body)
                    # convert html to text
                    soup = BeautifulSoup(html_body, "html.parser")
                    body += soup.get_text(" ", strip=True)
                except Exception:
                    pass
    else:
        # not multipart
        try:
            if msg.get_content_type() == "text/html":
                html_body = msg.get_content()
                images += _extract_inline_images_from_html(html_body)
                soup = BeautifulSoup(html_body, "html.parser")
                body = soup.get_text(" ", strip=True)
            else:
                body = msg.get_content()
        except Exception:
            # Any decode failure leaves an empty body rather than raising.
            body = ""

    # URL extraction (from combined body)
    urls = set()
    try:
        urls.update(re.findall(r"https?://[^\s\"'<>]+", body))
    except Exception:
        pass

    # Also try to find URLs in headers (e.g., List-Unsubscribe) or other parts
    for k, v in headers.items():
        try:
            urls.update(re.findall(r"https?://[^\s\"'<>]+", str(v)))
        except Exception:
            pass

    return headers, subject, body, list(urls), images
 
 
 
 
 
 
 
 
 
1
  # parse_email.py
2
  import email
3
  from email import policy
4
+ from email.parser import BytesParser
5
+ from email.utils import parsedate_to_datetime
6
  from bs4 import BeautifulSoup
7
  import re
8
  import base64
9
+ import quopri
10
 
11
# ------------------------------------------------
# Helpers
# ------------------------------------------------

# Pre-compiled http(s) URL matcher shared by all extraction paths.
# A match runs until the first whitespace, quote, or angle bracket.
URL_REGEX = re.compile(r"https?://[^\s\"'<>]+")
16
+
17
def normalize_text(text: str) -> str:
    """Collapse every run of whitespace into one space and strip the ends.

    Falsy input (``None`` or ``""``) yields ``""``.
    """
    return re.sub(r"\s+", " ", text).strip() if text else ""
22
+
23
def decode_payload(part):
    """Decode a MIME part's payload to text.

    Uses the part's declared charset when available, falling back to UTF-8.
    Because ``errors="replace"`` suppresses decode errors, the only realistic
    failure is an unknown/bogus charset label, so the except clause is
    narrowed to ``LookupError``/``UnicodeError`` instead of swallowing
    every exception.

    Args:
        part: an ``email.message.Message`` (or ``EmailMessage``) part.

    Returns:
        str: the decoded body, or ``""`` when the part has no payload.
    """
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""

    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except (LookupError, UnicodeError):
        # Unknown charset label in the message: fall back to UTF-8.
        return payload.decode("utf-8", errors="replace")
33
+
34
def extract_urls_from_text(text):
    """Return the deduplicated set of http/https URLs appearing in *text*."""
    matches = URL_REGEX.findall(text or "")
    return set(matches)
36
+
37
+ # ------------------------------------------------
38
+ # HTML ANALYSIS
39
+ # ------------------------------------------------
40
+
41
def analyze_html(html):
    """Analyze one HTML body for phishing-relevant signals.

    Returns a 4-tuple:
        text          -- whitespace-normalized visible text
        urls          -- set of http(s) hrefs found on <a> tags
        hidden_links  -- [{"displayed": ..., "actual": ...}] where the anchor
                         text looks like a URL but differs from the real href
        inline_images -- decoded bytes of base64 ``data:image`` <img> sources
    """
    soup = BeautifulSoup(html or "", "html.parser")

    urls = set()
    hidden_links = []
    for anchor in soup.find_all("a"):
        href = anchor.get("href", "").strip()
        shown = normalize_text(anchor.get_text())

        if href.startswith("http"):
            urls.add(href)

        # Anchor mismatch (classic phishing trick): the link text displays
        # one URL while the href points somewhere else.
        if shown and shown.startswith("http") and shown not in href:
            hidden_links.append({"displayed": shown, "actual": href})

    inline_images = []
    for image in soup.find_all("img"):
        source = image.get("src", "")
        if not source.startswith("data:image"):
            continue
        try:
            # "data:image/png;base64,<payload>" -> decode the payload half.
            _, encoded = source.split(",", 1)
            inline_images.append(base64.b64decode(encoded))
        except Exception:
            pass

    text = soup.get_text(" ", strip=True)
    return normalize_text(text), urls, hidden_links, inline_images
74
+
75
+ # ------------------------------------------------
76
+ # MAIN PARSER
77
+ # ------------------------------------------------
78
 
79
def parse_email(file_path):
    """Parse an email file on disk into analysis-ready pieces.

    Args:
        file_path: path to an RFC 822 message (.eml) read in binary mode.

    Returns (7-tuple):
        headers: dict           -- raw header name -> value (last wins on repeats)
        metadata: dict          -- subject/from/to/date/message_id convenience view
        body: str               -- normalized text of all text/plain + text/html parts
        urls: list              -- unique URLs from body parts and header values
        hidden_links: list      -- anchor-text/href mismatches from HTML parts
        attachments: list       -- {"filename", "content_type", "size"} per attachment
        images: list (bytes)    -- image-part payloads plus inline data-URI images
    """

    with open(file_path, "rb") as f:
        msg = BytesParser(policy=policy.default).parse(f)

    # -----------------------
    # HEADERS
    # -----------------------
    # NOTE(review): dict() keeps only the LAST value for repeated headers
    # (e.g. Received) — confirm callers do not need every occurrence.
    headers = dict(msg.items())

    metadata = {
        "subject": headers.get("Subject", ""),
        "from": headers.get("From", ""),
        "to": headers.get("To", ""),
        "date": None,
        "message_id": headers.get("Message-ID", "")
    }

    # Best-effort date parse; a missing or malformed Date header leaves None.
    try:
        metadata["date"] = parsedate_to_datetime(headers.get("Date"))
    except Exception:
        pass

    # -----------------------
    # CONTENT EXTRACTION
    # -----------------------
    full_text = ""
    urls = set()
    hidden_links = []
    images = []
    attachments = []

    # walk() visits the message itself plus every nested part, so both
    # singlepart and multipart messages take the same path here.
    for part in msg.walk():
        ctype = part.get_content_type()
        disp = (part.get("Content-Disposition") or "").lower()

        # -------- Attachments --------
        # Recorded as metadata only (name/type/size); payload is not kept.
        if "attachment" in disp:
            attachments.append({
                "filename": part.get_filename(),
                "content_type": ctype,
                "size": len(part.get_payload(decode=True) or b""),
            })
            continue

        # -------- Images --------
        # Non-attachment image parts (e.g. cid-referenced inline images).
        if ctype.startswith("image/"):
            try:
                data = part.get_payload(decode=True)
                if data:
                    images.append(data)
            except Exception:
                pass
            continue

        # -------- Text Plain --------
        if ctype == "text/plain":
            text = decode_payload(part)
            text = normalize_text(text)
            full_text += " " + text
            urls.update(extract_urls_from_text(text))

        # -------- HTML --------
        elif ctype == "text/html":
            html = decode_payload(part)
            # analyze_html also yields anchor mismatches and data-URI images.
            text, found_urls, hidden, inline_imgs = analyze_html(html)
            full_text += " " + text
            urls.update(found_urls)
            hidden_links.extend(hidden)
            images.extend(inline_imgs)

    # -----------------------
    # HEADER URL EXTRACTION
    # -----------------------
    # Headers such as List-Unsubscribe often carry URLs too.
    for k, v in headers.items():
        try:
            urls.update(extract_urls_from_text(str(v)))
        except Exception:
            pass

    return (
        headers,
        metadata,
        normalize_text(full_text),
        list(urls),
        hidden_links,
        attachments,
        images,
    )