princemaxp committed on
Commit
748e27f
·
verified ·
1 Parent(s): a4c47a0

Update parse_email.py

Browse files
Files changed (1) hide show
  1. parse_email.py +72 -140
parse_email.py CHANGED
@@ -1,177 +1,109 @@
1
  # parse_email.py
2
  import email
3
  from email import policy
4
- from email.parser import BytesParser
5
- from email.utils import parsedate_to_datetime
6
  from bs4 import BeautifulSoup
7
  import re
8
  import base64
9
- import quopri
10
 
11
- # ------------------------------------------------
12
- # Helpers
13
- # ------------------------------------------------
14
 
15
# Pre-compiled matcher for http/https URLs embedded in free text.
URL_REGEX = re.compile(r"https?://[^\s\"'<>]+")


def normalize_text(text: str) -> str:
    """Collapse every run of whitespace in *text* to a single space and trim.

    Falsy input (None, "") yields the empty string.
    """
    if not text:
        return ""
    return re.sub(r"\s+", " ", text).strip()
22
-
23
def decode_payload(part):
    """Decode a MIME part's body to text.

    Returns "" when the part has no decodable payload (e.g. a multipart
    container). The part's declared charset is used when present; decoding
    errors are replaced rather than raised, and an unknown charset falls
    back to UTF-8.
    """
    raw = part.get_payload(decode=True)
    if raw is None:
        return ""

    charset = part.get_content_charset() or "utf-8"
    try:
        return raw.decode(charset, errors="replace")
    except Exception:
        # Unknown/invalid charset name: decode as UTF-8 instead.
        return raw.decode("utf-8", errors="replace")
33
-
34
def extract_urls_from_text(text):
    """Return the set of http(s) URLs appearing in *text* ("" and None yield an empty set)."""
    return set(re.findall(r"https?://[^\s\"'<>]+", text or ""))
36
-
37
- # ------------------------------------------------
38
- # HTML ANALYSIS
39
- # ------------------------------------------------
40
-
41
def analyze_html(html):
    """Analyze an HTML email body for phishing-relevant features.

    Returns a 4-tuple:
        text (str)              - whitespace-normalized visible text
        urls (set[str])         - absolute http(s) hrefs found on <a> tags
        hidden_links (list)     - {"displayed", "actual"} pairs where the
                                  anchor text looks like a URL but differs
                                  from the real href (classic phishing trick)
        inline_images (list[bytes]) - decoded base64 images from data: URIs
    """
    soup = BeautifulSoup(html or "", "html.parser")
    text = soup.get_text(" ", strip=True)

    urls = set()
    hidden_links = []
    for tag in soup.find_all("a"):
        href = tag.get("href", "").strip()
        anchor_text = normalize_text(tag.get_text())

        if href.startswith("http"):
            urls.add(href)

        # Anchor mismatch: the URL the user *sees* differs from the destination.
        if anchor_text and anchor_text.startswith("http") and anchor_text not in href:
            hidden_links.append({
                "displayed": anchor_text,
                "actual": href,
            })

    # Inline base64-encoded images embedded via data: URIs.
    inline_images = []
    for img in soup.find_all("img"):
        src = img.get("src", "")
        # Require the trailing slash so only real image media types
        # ("data:image/png;base64,...") match, not arbitrary "data:image..." strings.
        if src.startswith("data:image/"):
            try:
                _, b64 = src.split(",", 1)
                inline_images.append(base64.b64decode(b64))
            except Exception:
                # Malformed data URI: skip it rather than abort the analysis.
                pass

    return normalize_text(text), urls, hidden_links, inline_images
74
-
75
- # ------------------------------------------------
76
- # MAIN PARSER
77
- # ------------------------------------------------
78
 
79
def parse_email(file_path):
    """Parse an .eml file and extract phishing-analysis features.

    Returns a 7-tuple:
        headers: dict
        metadata: dict (subject/from/to/date/message_id)
        body: str (whitespace-normalized)
        urls: list
        hidden_links: list
        attachments: list
        images: list (bytes)
    """
    with open(file_path, "rb") as f:
        msg = BytesParser(policy=policy.default).parse(f)

    # ----- headers & metadata -----
    headers = dict(msg.items())

    metadata = {
        "subject": headers.get("Subject", ""),
        "from": headers.get("From", ""),
        "to": headers.get("To", ""),
        "date": None,
        "message_id": headers.get("Message-ID", ""),
    }
    try:
        metadata["date"] = parsedate_to_datetime(headers.get("Date"))
    except Exception:
        pass  # missing/malformed Date header: leave as None

    # ----- walk the MIME tree -----
    full_text = ""
    urls = set()
    hidden_links = []
    images = []
    attachments = []

    for part in msg.walk():
        ctype = part.get_content_type()
        disposition = (part.get("Content-Disposition") or "").lower()

        # Attachments: record metadata only, do not inline their content.
        if "attachment" in disposition:
            attachments.append({
                "filename": part.get_filename(),
                "content_type": ctype,
                "size": len(part.get_payload(decode=True) or b""),
            })
            continue

        # Image parts become raw byte blobs.
        if ctype.startswith("image/"):
            try:
                blob = part.get_payload(decode=True)
                if blob:
                    images.append(blob)
            except Exception:
                pass
            continue

        if ctype == "text/plain":
            plain = normalize_text(decode_payload(part))
            full_text += " " + plain
            urls.update(extract_urls_from_text(plain))
        elif ctype == "text/html":
            text, found_urls, hidden, inline_imgs = analyze_html(decode_payload(part))
            full_text += " " + text
            urls.update(found_urls)
            hidden_links.extend(hidden)
            images.extend(inline_imgs)

    # ----- URLs hiding inside header values -----
    for value in headers.values():
        try:
            urls.update(extract_urls_from_text(str(value)))
        except Exception:
            pass

    return (
        headers,
        metadata,
        normalize_text(full_text),
        list(urls),
        hidden_links,
        attachments,
        images,
    )
 
1
  # parse_email.py
2
  import email
3
  from email import policy
 
 
4
  from bs4 import BeautifulSoup
5
  import re
6
  import base64
 
7
 
 
 
 
8
 
9
def _extract_inline_images_from_html(html):
    """Decode every base64 ``data:image/...`` URI in *html* into raw bytes.

    Malformed data URIs are skipped silently; returns a (possibly empty) list.
    """
    decoded = []
    soup = BeautifulSoup(html or "", "html.parser")
    for tag in soup.find_all("img"):
        source = tag.get("src", "")
        if not source.startswith("data:image/"):
            continue
        try:
            # "data:image/png;base64,AAAA..." -> payload after the first comma
            decoded.append(base64.b64decode(source.split(",", 1)[1]))
        except Exception:
            # No comma, or invalid base64: skip this image.
            pass
    return decoded
21
 
 
 
 
 
 
22
 
23
def parse_email(file_path):
    """Parse an .eml file from disk.

    Returns a 6-tuple:
        headers (dict)        - raw header name -> value
        subject (str)         - Subject header ("" when absent)
        body (str)            - concatenated text of every text part, stripped
        urls (list[str])      - de-duplicated http(s) URLs from body + headers
        images (list[bytes])  - raw bytes of image parts and inline data: URIs
        attachments (list[dict]) - filename / content_type / size / data
    """
    # Compile once; the same pattern is applied to the body and every header.
    url_pattern = re.compile(r"https?://[^\s\"'<>]+")

    with open(file_path, "rb") as f:
        msg = email.message_from_binary_file(f, policy=policy.default)

    headers = dict(msg.items())
    subject = headers.get("Subject", "") or ""

    body = ""
    images = []
    attachments = []
    urls = set()

    if msg.is_multipart():
        for part in msg.walk():
            ctype = part.get_content_type()
            disp = str(part.get("Content-Disposition") or "").lower()

            # ---------- ATTACHMENTS ----------
            if "attachment" in disp:
                try:
                    data = part.get_payload(decode=True)
                    attachments.append({
                        "filename": part.get_filename(),
                        "content_type": ctype,
                        "size": len(data) if data else 0,
                        "data": data,
                    })
                except Exception:
                    pass  # best-effort: one broken attachment must not kill parsing
                continue

            # ---------- INLINE IMAGES ----------
            if ctype.startswith("image/"):
                try:
                    data = part.get_payload(decode=True)
                    if data:
                        images.append(data)
                except Exception:
                    pass
                continue  # an image part carries no text; skip the text branch

            # ---------- TEXT ----------
            try:
                if ctype == "text/plain":
                    body += part.get_content() + "\n"

                elif ctype == "text/html":
                    html = part.get_content()
                    images += _extract_inline_images_from_html(html)
                    soup = BeautifulSoup(html, "html.parser")
                    body += soup.get_text(" ", strip=True) + "\n"
            except Exception:
                pass
    else:
        try:
            if msg.get_content_type() == "text/html":
                html = msg.get_content()
                images += _extract_inline_images_from_html(html)
                soup = BeautifulSoup(html, "html.parser")
                body = soup.get_text(" ", strip=True)
            else:
                content = msg.get_content()
                # Non-text singletons (e.g. application/*, message/rfc822)
                # decode to bytes or a Message object; keep body a str as
                # the documented contract promises.
                body = content if isinstance(content, str) else ""
        except Exception:
            pass

    # ---------- URL EXTRACTION ----------
    urls.update(url_pattern.findall(body))

    for value in headers.values():
        try:
            urls.update(url_pattern.findall(str(value)))
        except Exception:
            pass

    return headers, subject, body.strip(), list(urls), images, attachments