Ultronprime commited on
Commit
51e3cb1
·
verified ·
1 Parent(s): 70d427a

Update email_fetcher: add fetch_all_emails_by_date and fetch_headers_only for historical processing

Browse files
Files changed (1) hide show
  1. modules/email_fetcher.py +89 -6
modules/email_fetcher.py CHANGED
@@ -92,7 +92,6 @@ class EmailFetcher:
92
  logger.error(f"Failed to select folder: {folder}")
93
  return []
94
 
95
- # Search for unread emails
96
  status, message_ids = self.connection.search(None, "UNSEEN")
97
  if status != "OK":
98
  logger.error("Failed to search for unread emails")
@@ -115,24 +114,33 @@ class EmailFetcher:
115
 
116
  return emails_data
117
 
118
- def fetch_emails_by_date(self, since_date: str, folder: str = "INBOX") -> List[Dict[str, Any]]:
119
- """Fetch emails since a specific date (format: DD-Mon-YYYY, e.g., '01-Jan-2024')."""
 
 
 
 
 
120
  if not self.connection:
121
  if not self.connect():
122
  return []
123
 
124
  emails_data = []
125
  try:
 
126
  status, _ = self.connection.select(folder, readonly=True)
127
  if status != "OK":
 
128
  return []
129
 
 
130
  status, message_ids = self.connection.search(None, f'SINCE {since_date}')
131
  if status != "OK":
 
132
  return []
133
 
134
  ids = message_ids[0].split()
135
- logger.info(f"Found {len(ids)} emails since {since_date}")
136
 
137
  for msg_id in ids:
138
  try:
@@ -148,6 +156,78 @@ class EmailFetcher:
148
 
149
  return emails_data
150
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
151
  def _fetch_single_email(self, msg_id: bytes, mark_read: bool = False) -> Optional[Dict[str, Any]]:
152
  """Fetch and parse a single email."""
153
  status, msg_data = self.connection.fetch(msg_id, "(RFC822)")
@@ -184,9 +264,12 @@ class EmailFetcher:
184
  # Extract attachments
185
  attachments = self._extract_attachments(msg, message_id)
186
 
187
- # Mark as read if requested
188
  if mark_read:
189
- self.connection.store(msg_id, "+FLAGS", "\\Seen")
 
 
 
190
 
191
  return {
192
  "message_id": message_id,
 
92
  logger.error(f"Failed to select folder: {folder}")
93
  return []
94
 
 
95
  status, message_ids = self.connection.search(None, "UNSEEN")
96
  if status != "OK":
97
  logger.error("Failed to search for unread emails")
 
114
 
115
  return emails_data
116
 
117
+ def fetch_all_emails_by_date(self, since_date: str, folder: str = "INBOX") -> List[Dict[str, Any]]:
118
+ """
119
+ Fetch ALL emails (read + unread) since a date.
120
+ This is the key method for historical processing.
121
+ Date format: DD-Mon-YYYY (e.g., '01-Jan-2024')
122
+ Opens mailbox as READONLY so read status is NOT changed.
123
+ """
124
  if not self.connection:
125
  if not self.connect():
126
  return []
127
 
128
  emails_data = []
129
  try:
130
+ # READONLY mode - does NOT mark emails as read
131
  status, _ = self.connection.select(folder, readonly=True)
132
  if status != "OK":
133
+ logger.error(f"Failed to select folder: {folder}")
134
  return []
135
 
136
+ # Search ALL emails since date (not just UNSEEN)
137
  status, message_ids = self.connection.search(None, f'SINCE {since_date}')
138
  if status != "OK":
139
+ logger.error(f"Failed to search emails since {since_date}")
140
  return []
141
 
142
  ids = message_ids[0].split()
143
+ logger.info(f"Found {len(ids)} total emails since {since_date}")
144
 
145
  for msg_id in ids:
146
  try:
 
156
 
157
  return emails_data
158
 
159
+ def fetch_email_headers_by_date(self, since_date: str, folder: str = "INBOX") -> List[Dict[str, Any]]:
160
+ """
161
+ Fetch only email HEADERS (fast) since a date - for preview/listing.
162
+ Does NOT download attachments or full body. Very fast.
163
+ Date format: DD-Mon-YYYY (e.g., '01-Apr-2025')
164
+ """
165
+ if not self.connection:
166
+ if not self.connect():
167
+ return []
168
+
169
+ headers_list = []
170
+ try:
171
+ status, _ = self.connection.select(folder, readonly=True)
172
+ if status != "OK":
173
+ return []
174
+
175
+ status, message_ids = self.connection.search(None, f'SINCE {since_date}')
176
+ if status != "OK":
177
+ return []
178
+
179
+ ids = message_ids[0].split()
180
+ logger.info(f"Found {len(ids)} emails since {since_date} (headers only)")
181
+
182
+ for msg_id in ids:
183
+ try:
184
+ # Fetch only headers (ENVELOPE) - much faster than full RFC822
185
+ status, msg_data = self.connection.fetch(msg_id, "(BODY[HEADER.FIELDS (FROM TO SUBJECT DATE MESSAGE-ID)])")
186
+ if status != "OK":
187
+ continue
188
+
189
+ header_bytes = msg_data[0][1]
190
+ msg = email.message_from_bytes(header_bytes)
191
+
192
+ message_id = msg.get("Message-ID", "").strip()
193
+ if not message_id:
194
+ message_id = f"generated_{msg_id.decode()}_{datetime.now().timestamp()}"
195
+
196
+ subject = self._decode_header(msg.get("Subject", ""))
197
+ sender = self._decode_header(msg.get("From", ""))
198
+ recipients = self._decode_header(msg.get("To", ""))
199
+
200
+ date_str = msg.get("Date", "")
201
+ try:
202
+ date_received = parsedate_to_datetime(date_str)
203
+ except Exception:
204
+ date_received = None
205
+
206
+ headers_list.append({
207
+ "msg_id": msg_id,
208
+ "message_id": message_id,
209
+ "subject": subject,
210
+ "sender": sender,
211
+ "recipients": recipients,
212
+ "date_received": date_received.isoformat() if date_received else None,
213
+ })
214
+
215
+ except Exception as e:
216
+ logger.error(f"Error fetching header for {msg_id}: {e}")
217
+ continue
218
+
219
+ except Exception as e:
220
+ logger.error(f"Error fetching headers: {e}")
221
+
222
+ return headers_list
223
+
224
+ def fetch_single_email_by_msg_id(self, msg_id: bytes) -> Optional[Dict[str, Any]]:
225
+ """Fetch a single email by its IMAP message ID (for selective processing)."""
226
+ if not self.connection:
227
+ if not self.connect():
228
+ return None
229
+ return self._fetch_single_email(msg_id, mark_read=False)
230
+
231
  def _fetch_single_email(self, msg_id: bytes, mark_read: bool = False) -> Optional[Dict[str, Any]]:
232
  """Fetch and parse a single email."""
233
  status, msg_data = self.connection.fetch(msg_id, "(RFC822)")
 
264
  # Extract attachments
265
  attachments = self._extract_attachments(msg, message_id)
266
 
267
+ # Mark as read if requested (only works if mailbox opened in read-write mode)
268
  if mark_read:
269
+ try:
270
+ self.connection.store(msg_id, "+FLAGS", "\\Seen")
271
+ except Exception:
272
+ pass
273
 
274
  return {
275
  "message_id": message_id,