precison9 committed on
Commit
99b21b2
·
verified ·
1 Parent(s): b4edd10

Update flask_Character.py

Browse files
Files changed (1) hide show
  1. flask_Character.py +453 -254
flask_Character.py CHANGED
@@ -1,3 +1,6 @@
 
 
 
1
  import os
2
  # NUMBA_CACHE_DIR and NUMBA_DISABLE_CACHE are often set for specific environments,
3
  # e.g., if you're experiencing issues with Numba's caching behavior or in containerized environments.
@@ -11,43 +14,30 @@ from datetime import date, datetime, timedelta
11
  from typing import List, Optional, Literal, Dict, Any, Tuple
12
  import traceback
13
  import asyncio
14
- from uuid import uuid4, UUID
15
 
16
  from fastapi import FastAPI, HTTPException, Response, Query, Depends, status
 
17
  from fastapi.exception_handlers import http_exception_handler
18
  from starlette.exceptions import HTTPException as StarletteHTTPException
19
-
20
  from langchain.prompts import PromptTemplate
21
  from langchain_groq import ChatGroq
22
  from pydantic import BaseModel, Field, BeforeValidator, model_serializer
23
  from typing_extensions import Annotated
24
- from pydantic_core import core_schema
25
 
26
  from pymongo import MongoClient
27
  from pymongo.errors import ConnectionFailure, OperationFailure
28
  from bson import ObjectId
29
 
30
# --- Batching Configuration ---
MAX_BATCH_SIZE = 20            # maximum requests drained from a queue per batch tick
BATCH_INTERVAL_SECONDS = 1.0   # how long a worker waits between batch drains

# --- Queues and pending request stores for batching ---
# Queues are created during app startup (inside the running event loop);
# they stay None until then, and endpoints must check for that.
extract_data_queue: Optional[asyncio.Queue[Tuple[Any, UUID]]] = None
generate_reply_queue: Optional[asyncio.Queue[Tuple[Any, UUID]]] = None

# request_id -> Future; the batch worker resolves each future with the
# corresponding result (or exception) so the waiting endpoint can return it.
extract_pending_requests: Dict[UUID, asyncio.Future] = {}
generate_pending_requests: Dict[UUID, asyncio.Future] = {}

# Shutdown event for worker tasks.
# NOTE(review): constructed at import time, before any event loop runs —
# safe on Python 3.10+ where asyncio.Event no longer binds a loop at
# construction; confirm the deployment's Python version.
shutdown_event = asyncio.Event()
43
-
44
-
45
# --- MongoDB Configuration ---
# SECURITY(review): a live username/password is hard-coded and committed here.
# Rotate this credential and load the URI from the environment instead,
# e.g. MONGO_URI = os.getenv("MONGO_URI").
MONGO_URI = "mongodb+srv://precison9:P1LhtFknkT75yg5L@cluster0.isuwpef.mongodb.net"
DB_NAME = "email_assistant_db"
EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
GENERATED_REPLIES_COLLECTION = "generated_replies"

# Module-level handles, populated by the startup hook; each stays None
# until the MongoDB connection succeeds.
client: Optional[MongoClient] = None
db: Optional[Any] = None
extracted_emails_collection: Optional[Any] = None
@@ -55,30 +45,50 @@ generated_replies_collection: Optional[Any] = None
55
 
56
  # --- Pydantic ObjectId Handling ---
57
class CustomObjectId(str):
    """Pydantic-compatible string type for MongoDB ObjectIds.

    Accepts either a 24-hex-char string or a bson.ObjectId instance and
    normalizes both to a plain string; presents itself as a string in the
    generated JSON Schema / OpenAPI docs.
    """

    @classmethod
    def __get_validators__(cls):
        # Pydantic hook: yields the validators to run for this type.
        yield cls.validate

    @classmethod
    def validate(cls, v):
        # Treat missing/empty values as "no id" (used for optional _id fields).
        if v is None or v == "":
            return None
        if not isinstance(v, (str, ObjectId)):
            raise ValueError("ObjectId must be a string or ObjectId instance")
        # An actual ObjectId is already valid — just stringify it.
        if isinstance(v, ObjectId):
            return str(v)
        # Strings must be a well-formed 24-char hex ObjectId.
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId format")
        return cls(v)

    @classmethod
    def __get_pydantic_json_schema__(
        cls, _core_schema: core_schema.CoreSchema, handler
    ) -> Dict[str, Any]:
        # Represent this custom type as a plain string in OpenAPI output.
        json_schema = handler(core_schema.str_schema())
        json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"
        return json_schema
79
 
 
80
  PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
81
 
 
82
  # ---------------------- Models ----------------------
83
  class Contact(BaseModel):
84
  name: str
@@ -100,20 +110,28 @@ class Task(BaseModel):
100
  due_date: date
101
 
102
  class ExtractedData(BaseModel):
 
103
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
104
  contacts: List[Contact]
105
  appointments: List[Appointment]
106
  tasks: List[Task]
107
  original_email_text: str
108
  processed_at: datetime = Field(default_factory=datetime.utcnow)
 
109
  class Config:
110
- populate_by_name = True
111
- arbitrary_types_allowed = True
 
 
112
  @model_serializer(when_used='json')
113
  def serialize_model(self):
114
  data = self.model_dump(by_alias=True, exclude_none=True)
 
115
  if "_id" in data and isinstance(data["_id"], ObjectId):
116
  data["_id"] = str(data["_id"])
 
 
 
117
  if 'appointments' in data:
118
  for appt in data['appointments']:
119
  if isinstance(appt.get('start_date'), date):
@@ -140,6 +158,7 @@ class GenerateReplyRequest(BaseModel):
140
  emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
141
 
142
  class GeneratedReplyData(BaseModel):
 
143
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
144
  original_email_text: str
145
  generated_reply_text: str
@@ -149,9 +168,11 @@ class GeneratedReplyData(BaseModel):
149
  tone: str
150
  emoji: str
151
  generated_at: datetime = Field(default_factory=datetime.utcnow)
 
152
  class Config:
153
  populate_by_name = True
154
  arbitrary_types_allowed = True
 
155
  @model_serializer(when_used='json')
156
  def serialize_model(self):
157
  data = self.model_dump(by_alias=True, exclude_none=True)
@@ -159,349 +180,527 @@ class GeneratedReplyData(BaseModel):
159
  data["_id"] = str(data["_id"])
160
  return data
161
 
 
162
class GenerateReplyResponse(BaseModel):
    """Response payload for POST /generate-reply."""
    reply: str = Field(..., description="The AI-generated reply text.")
    stored_id: str = Field(..., description="The MongoDB ID of the stored reply.")
    cached: bool = Field(..., description="True if the reply was retrieved from cache, False if newly generated.")
166
 
 
167
  class ExtractedEmailQuery(BaseModel):
168
- contact_name: Optional[str] = Query(None, description="Filter by contact name.")
169
- appointment_title: Optional[str] = Query(None, description="Filter by appointment title.")
170
- task_title: Optional[str] = Query(None, description="Filter by task title.")
171
- from_date: Optional[date] = Query(None, description="Filter by processed date (start).")
172
- to_date: Optional[date] = Query(None, description="Filter by processed date (end).")
173
- limit: int = Query(10, ge=1, le=100)
174
 
175
class GeneratedReplyQuery(BaseModel):
    """Query-string filters for GET /query-generated-replies."""
    language: Optional[Literal["Italian", "English"]] = Query(None, description="Filter by language.")
    style: Optional[str] = Query(None, description="Filter by style.")
    tone: Optional[str] = Query(None, description="Filter by tone.")
    from_date: Optional[date] = Query(None, description="Filter by generation date (start).")
    to_date: Optional[date] = Query(None, description="Filter by generation date (end).")
    limit: int = Query(10, ge=1, le=100)  # max documents returned per call
182
-
183
- # --- Utility Functions ---
184
def extract_last_json_block(text: str) -> Optional[str]:
    """Pull the most likely JSON payload out of an LLM response.

    Prefers the last fenced ```json ...``` block; otherwise falls back to
    the widest `{...}` span in the text. Returns None when neither exists.
    """
    fenced_blocks = re.findall(r'```json\s*(.*?)\s*```', text, re.DOTALL)
    if fenced_blocks:
        return fenced_blocks[-1].strip()
    brace_span = re.search(r'\{.*\}', text, re.DOTALL)
    return brace_span.group(0) if brace_span else None
191
 
192
def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
    """Resolve a date string to a date relative to *current_date*.

    Understands "today", "tomorrow", and ISO "YYYY-MM-DD"; returns None
    for empty input or anything unparseable.
    """
    if not date_str:
        return None
    token = date_str.lower().strip()
    relative = {
        "today": current_date,
        "tomorrow": current_date + timedelta(days=1),
    }
    if token in relative:
        return relative[token]
    try:
        return datetime.strptime(token, "%Y-%m-%d").date()
    except ValueError:
        return None
 
 
 
 
 
 
199
 
200
def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
    """Convert the raw dict parsed from LLM JSON into an ExtractedData model.

    Missing dates fall back to *current_date*; missing titles/descriptions
    get placeholder text so model validation never fails on absent keys.
    """
    def split_name(full_name: str) -> tuple[str, str]:
        # First whitespace token is the given name; the remainder is the last name.
        parts = full_name.strip().split()
        if not parts:
            return "", ""
        return parts[0], " ".join(parts[1:])

    contacts_data = []
    for raw in data.get("contacts", []):
        first, last = split_name(raw.get("name", ""))
        contacts_data.append(
            Contact(
                name=first,
                last_name=last,
                email=raw.get("email"),
                phone_number=raw.get("phone_number"),
            )
        )

    appointments_data = []
    for raw in data.get("appointments", []):
        appointments_data.append(
            Appointment(
                title=raw.get("title", "Untitled"),
                description=raw.get("description", "No description"),
                start_date=parse_date(raw.get("start_date"), current_date) or current_date,
                start_time=raw.get("start_time"),
                end_date=parse_date(raw.get("end_date"), current_date),
                end_time=raw.get("end_time"),
            )
        )

    tasks_data = [
        Task(
            task_title=raw.get("task_title", "Untitled"),
            task_description=raw.get("task_description", "No description"),
            due_date=parse_date(raw.get("due_date"), current_date) or current_date,
        )
        for raw in data.get("tasks", [])
    ]

    return ExtractedData(contacts=contacts_data, appointments=appointments_data, tasks=tasks_data, original_email_text=original_email_text)
210
 
211
- # --- Core Logic (Internal Functions) ---
212
def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
    """Extract contacts/appointments/tasks from an Italian email via Groq LLM.

    Runs synchronously (callers off-load it with asyncio.to_thread).

    Raises:
        ValueError: empty input, no JSON block in the LLM output, or unparseable JSON.
        Exception: any other failure during the LLM call, wrapped with context.
    """
    if not email_text: raise ValueError("Email text cannot be empty for processing.")
    # Deterministic extraction: temperature 0.
    llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
    # Concrete dates are baked into the prompt so the model can resolve
    # relative expressions like "today"/"domani" itself.
    prompt_today_str = current_date.isoformat()
    prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
    prompt_template_str = f"""You are an expert email assistant tasked with extracting structured information from an Italian email.**Your response MUST be a single, complete JSON object, wrapped in a ```json``` block.****DO NOT include any conversational text, explanations, or preambles outside the JSON block.****The JSON should contain three top-level keys: "contacts", "appointments", and "tasks".**If a category has no items, its list should be empty (e.g., "contacts": []).Here is the required JSON schema for each category:- **contacts**: List of Contact objects. Each Contact object must have: - `name` (string, full name) - `last_name` (string, last name) - You should infer this from the full name. - `email` (string, optional, null if not present) - `phone_number` (string, optional, null if not present)- **appointments**: List of Appointment objects. Each Appointment object must have: - `title` (string, short, meaningful title in Italian based on the meeting's purpose) - `description` (string, summary of the meeting's goal) - `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow") - `start_time` (string, optional, e.g., "10:30 AM", null if not present) - `end_date` (string, YYYY-MM-DD, optional, null if unknown or not applicable) - `end_time` (string, optional, e.g., "11:00 AM", null if not present)- **tasks**: List of Task objects. Each Task object must have: - `task_title` (string, short summary of action item) - `task_description` (string, more detailed explanation) - `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")---Email:{{email}}"""
    prompt_template = PromptTemplate(input_variables=["email"], template=prompt_template_str)
    chain = prompt_template | llm
    try:
        llm_output = chain.invoke({"email": email_text})
        llm_output_str = llm_output.content
        # The model is told to fence its JSON; tolerate bare-brace output too.
        json_str = extract_last_json_block(llm_output_str)
        if not json_str: raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
        json_data = json.loads(json_str)
        return normalize_llm_output(json_data, current_date, email_text)
    except json.JSONDecodeError as e: raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}")
    except Exception as e: traceback.print_exc(); raise Exception(f"An error occurred during email processing: {e}")
229
 
230
def _generate_response_internal(email_text: str, api_key: str, language: Literal["Italian", "English"], length: str, style: str, tone: str, emoji: str) -> str:
    """Generate a reply to *email_text* via Groq LLM with the given stylistic knobs.

    Runs synchronously (callers off-load it with asyncio.to_thread).
    Returns a fixed placeholder string for empty input instead of raising.
    Re-raises any LLM/chain failure after printing the traceback.
    """
    if not email_text: return "Cannot generate reply for empty email text."
    try:
        # Creative generation: non-zero temperature, shorter output budget.
        llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
        prompt_template_str=""" You are an assistant that helps reply to emails. Create a response to the following email with the following parameters: - Language: {language} - Length: {length} - Style: {style} - Tone: {tone} - Emoji usage: {emoji} Email: {email} Write only the reply body. Do not repeat the email or mention any instruction. """
        prompt = PromptTemplate(input_variables=["email", "language", "length", "style", "tone", "emoji"], template=prompt_template_str)
        chain = prompt | llm
        output = chain.invoke({"email": email_text, "language": language, "length": length, "style": style, "tone": tone, "emoji": emoji})
        return output.content.strip()
    except Exception as e: traceback.print_exc(); raise
 
 
 
240
 
241
# --- FastAPI Application ---
# NOTE: the in-process batching system relies on a single asyncio.Queue per
# endpoint, so the server MUST run with a single worker process, e.g.:
#   uvicorn main:app --workers 1
app = FastAPI(title="Email Assistant API", description="API with batch processing.", version="1.3.0", docs_url="/", redoc_url="/redoc")
246
 
247
- # --- Exception Handlers ---
 
 
 
 
 
 
 
 
 
 
248
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler_wrapper(request, exc):
    """Delegate HTTP exceptions to FastAPI's default handler (explicit hook kept for future customization)."""
    return await http_exception_handler(request, exc)
251
 
 
252
@app.exception_handler(Exception)
async def global_exception_handler_wrapper(request, exc):
    """Last-resort handler: print the traceback and return a JSON 500 body."""
    traceback.print_exc()
    # NOTE(review): echoing str(exc) to clients can leak internal details;
    # consider returning a generic message in production.
    return Response(content=json.dumps({"detail": f"Internal Server Error: {str(exc)}", "type": "unhandled_exception"}), status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, media_type="application/json")
256
-
257
- # --- Batch Worker Helper Functions ---
258
async def _execute_single_extract_task(request_item: ProcessEmailRequest, request_id: UUID) -> Tuple[UUID, Any]:
    """Run one extraction in a thread; pair the outcome (result or exception) with its request id."""
    today = date.today()
    try:
        outcome: Any = await asyncio.to_thread(
            _process_email_internal,
            request_item.email_text,
            request_item.groq_api_key,
            today,
        )
    except Exception as exc:
        # Failures travel back as values so the batch worker can set them on the future.
        outcome = exc
    return request_id, outcome
265
-
266
async def _execute_single_generate_reply_task(request_item: GenerateReplyRequest, request_id: UUID) -> Tuple[UUID, Any]:
    """Run one reply generation in a thread; pair the outcome (result or exception) with its request id."""
    try:
        outcome: Any = await asyncio.to_thread(
            _generate_response_internal,
            request_item.email_text,
            request_item.groq_api_key,
            request_item.language,
            request_item.length,
            request_item.style,
            request_item.tone,
            request_item.emoji,
        )
    except Exception as exc:
        # Failures travel back as values so the batch worker can set them on the future.
        outcome = exc
    return request_id, outcome
272
-
273
- # --- Batch Worker Functions ---
274
async def batch_worker(
    queue: asyncio.Queue,
    pending_requests: Dict[UUID, asyncio.Future],
    execution_function: callable,
    worker_name: str
):
    """Drain *queue* in batches of up to MAX_BATCH_SIZE every BATCH_INTERVAL_SECONDS.

    Each queued item is (request_object, request_id); *execution_function* is
    awaited per item and its (request_id, result_or_exception) pair is used to
    resolve the matching future in *pending_requests*. On shutdown, all still
    pending futures are failed with a 503 so waiting endpoints unblock.
    """
    print(f"[{datetime.now()}] {worker_name} started.")
    while not shutdown_event.is_set():
        # Sleep for one interval, but wake immediately if shutdown is signalled.
        try:
            await asyncio.wait_for(shutdown_event.wait(), timeout=BATCH_INTERVAL_SECONDS)
            if shutdown_event.is_set(): break
        except asyncio.TimeoutError:
            pass # Interval elapsed, time to process

        # Drain without blocking: take whatever is queued, up to the batch cap.
        batch_to_process = []
        while len(batch_to_process) < MAX_BATCH_SIZE and not queue.empty():
            try:
                batch_to_process.append(queue.get_nowait())
            except asyncio.QueueEmpty:
                break

        if not batch_to_process:
            continue

        print(f"[{datetime.now()}] {worker_name}: Processing batch of {len(batch_to_process)} requests.")

        # Run the whole batch concurrently; execution functions return
        # exceptions as values, so gather() never short-circuits here.
        tasks = [execution_function(req_obj, req_id) for req_obj, req_id in batch_to_process]
        results = await asyncio.gather(*tasks)

        for req_id, result_or_exc in results:
            # pop(): the endpoint may already have timed out and removed itself.
            future = pending_requests.pop(req_id, None)
            if future and not future.done():
                if isinstance(result_or_exc, Exception):
                    future.set_exception(result_or_exc)
                else:
                    future.set_result(result_or_exc)

    # Shutdown path: fail anything still waiting so callers don't hang.
    print(f"[{datetime.now()}] {worker_name} shutting down. Cancelling pending requests...")
    for req_id, future in list(pending_requests.items()):
        if not future.done():
            future.set_exception(HTTPException(status_code=503, detail="Service shutting down"))
        pending_requests.pop(req_id, None)
    print(f"[{datetime.now()}] {worker_name} stopped.")
317
-
318
- # --- FastAPI Event Handlers ---
319
- worker_tasks: List[asyncio.Task] = []
320
-
321
  @app.on_event("startup")
322
  async def startup_event():
323
  global client, db, extracted_emails_collection, generated_replies_collection
324
- global extract_data_queue, generate_reply_queue, worker_tasks
325
-
326
  print(f"[{datetime.now()}] FastAPI app startup sequence initiated.")
327
  try:
 
328
  client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
329
- client.admin.command('ping')
330
  db = client[DB_NAME]
331
  extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
332
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
333
  print(f"[{datetime.now()}] Successfully connected to MongoDB: {DB_NAME}")
334
 
335
- extract_data_queue = asyncio.Queue()
336
- generate_reply_queue = asyncio.Queue()
337
-
338
- task1 = asyncio.create_task(batch_worker(extract_data_queue, extract_pending_requests, _execute_single_extract_task, "Extract Worker"))
339
- task2 = asyncio.create_task(batch_worker(generate_reply_queue, generate_pending_requests, _execute_single_generate_reply_task, "Reply Worker"))
340
- worker_tasks.extend([task1, task2])
341
- print(f"[{datetime.now()}] Batch processing workers started.")
342
-
343
  except (ConnectionFailure, OperationFailure) as e:
344
  print(f"[{datetime.now()}] ERROR: MongoDB Connection/Operation Failure: {e}")
345
- client = db = extracted_emails_collection = generated_replies_collection = None
 
 
 
346
  except Exception as e:
347
- print(f"[{datetime.now()}] ERROR: An unexpected error during startup: {e}")
348
  traceback.print_exc()
349
- client = db = extracted_emails_collection = generated_replies_collection = None
 
 
 
350
  finally:
351
- # CORRECTED: Use 'is not None' for pymongo objects
352
- if not (client is not None and db is not None and extracted_emails_collection is not None and generated_replies_collection is not None):
353
- print(f"[{datetime.now()}] WARNING: MongoDB might not be fully initialized.")
354
- print(f"[{datetime.now()}] FastAPI app startup sequence completed.")
 
 
 
 
 
 
 
355
 
356
 
357
  @app.on_event("shutdown")
358
- async def shutdown_event_handler():
359
- global client, shutdown_event, worker_tasks
360
  print(f"[{datetime.now()}] FastAPI app shutting down.")
361
-
362
- if not shutdown_event.is_set():
363
- shutdown_event.set()
364
-
365
- if worker_tasks:
366
- print(f"[{datetime.now()}] Waiting for worker tasks to complete...")
367
- await asyncio.gather(*worker_tasks, return_exceptions=True)
368
- print(f"[{datetime.now()}] Worker tasks completed.")
369
-
370
- if client is not None:
371
  client.close()
372
  print(f"[{datetime.now()}] MongoDB client closed.")
373
- print(f"[{datetime.now()}] FastAPI app shutdown sequence completed.")
374
 
375
  # --- API Endpoints ---
376
@app.get("/health", summary="Health Check")
async def health_check():
    """Report API liveness, MongoDB reachability, and batching-queue depths.

    Returns 200 with status details when the DB ping succeeds; raises 503
    (with the DB error message) otherwise.
    """
    db_status = "MongoDB not connected."
    db_ok = False
    # pymongo objects forbid truthiness tests, hence 'is not None'.
    if client is not None and db is not None:
        try:
            # Ping in a thread so the blocking driver call doesn't stall the loop.
            await asyncio.to_thread(client.admin.command, 'ping')
            db_status = "MongoDB connection OK."
            db_ok = True
        except Exception as e:
            db_status = f"MongoDB connection error: {e}"

    # Queue statistics are only meaningful once startup created the queues.
    queues = {}
    if extract_data_queue is not None and generate_reply_queue is not None:
        queues = {
            "extract_data_queue_size": extract_data_queue.qsize(),
            "generate_reply_queue_size": generate_reply_queue.qsize(),
            "extract_pending_requests": len(extract_pending_requests),
            "generate_pending_requests": len(generate_pending_requests)
        }

    if db_ok:
        return {"status": "ok", "message": "API is up.", "database": db_status, "queues": queues}
    else:
        raise HTTPException(status_code=503, detail={"message": "Service unavailable.", "database": db_status})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
402
 
403
@app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data (batched)")
async def extract_email_data(request: ProcessEmailRequest):
    """Queue an extraction request, await its batched result, and persist it.

    Flow: enqueue (request, id) -> batch worker resolves our future ->
    convert dates to datetimes (BSON has no bare date type) -> insert into
    MongoDB -> return the model with its new id.
    Raises 503 if DB/queue are unavailable, 504 on a 60s batch timeout.
    """
    # pymongo collections forbid truthiness tests, hence 'is None'.
    if extracted_emails_collection is None or extract_data_queue is None:
        raise HTTPException(status_code=503, detail="Service not available (DB or batch queue).")

    request_id = uuid4()
    future: asyncio.Future[ExtractedData] = asyncio.get_event_loop().create_future()
    extract_pending_requests[request_id] = future

    try:
        await extract_data_queue.put((request, request_id))
        extracted_data_obj = await asyncio.wait_for(future, timeout=60.0)

        # Exclude 'id': MongoDB assigns _id on insert.
        data_to_insert = extracted_data_obj.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
        # BSON cannot store datetime.date — promote to midnight datetimes.
        if 'appointments' in data_to_insert:
            for appt in data_to_insert['appointments']:
                if isinstance(appt.get('start_date'), date): appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
                if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None: appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
        if 'tasks' in data_to_insert:
            for task_item in data_to_insert['tasks']:
                if isinstance(task_item.get('due_date'), date): task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())

        insert_result = await asyncio.to_thread(extracted_emails_collection.insert_one, data_to_insert)
        extracted_data_obj.id = str(insert_result.inserted_id)

        return extracted_data_obj
    except asyncio.TimeoutError:
        raise HTTPException(status_code=504, detail="Request timed out while awaiting processing in batch.")
    except Exception as e:
        # Re-raise exceptions set by the worker (like HTTPException or others)
        raise e
    finally:
        # Always deregister, whether we succeeded, timed out, or errored.
        extract_pending_requests.pop(request_id, None)
437
-
438
@app.post("/generate-reply", response_model=GenerateReplyResponse, summary="Generate smart reply (batched)")
async def generate_email_reply(request: GenerateReplyRequest):
    """Return a cached reply when one exists, otherwise generate via the batch queue.

    Cache key is the exact (email text + all style parameters) tuple. New
    replies are persisted before being returned. Raises 503 if DB/queue are
    unavailable, 504 on a 60s batch timeout.
    """
    # pymongo collections forbid truthiness tests, hence 'is None'.
    if generated_replies_collection is None or generate_reply_queue is None:
        raise HTTPException(status_code=503, detail="Service not available (DB or batch queue).")

    # Cache lookup: identical request parameters reuse the stored reply.
    cache_query = {"original_email_text": request.email_text, "language": request.language, "length": request.length, "style": request.style, "tone": request.tone, "emoji": request.emoji}
    cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
    if cached_reply_doc:
        return GenerateReplyResponse(reply=cached_reply_doc["generated_reply_text"], stored_id=str(cached_reply_doc["_id"]), cached=True)

    request_id = uuid4()
    future: asyncio.Future[str] = asyncio.get_event_loop().create_future()
    generate_pending_requests[request_id] = future

    try:
        await generate_reply_queue.put((request, request_id))
        reply_content_str = await asyncio.wait_for(future, timeout=60.0)

        reply_data_to_store = GeneratedReplyData(
            original_email_text=request.email_text, generated_reply_text=reply_content_str,
            language=request.language, length=request.length, style=request.style,
            tone=request.tone, emoji=request.emoji
        )
        # Exclude 'id': MongoDB assigns _id on insert.
        reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})

        insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
        stored_id = str(insert_result.inserted_id)

        return GenerateReplyResponse(reply=reply_content_str, stored_id=stored_id, cached=False)
    except asyncio.TimeoutError:
        raise HTTPException(status_code=504, detail="Request timed out while awaiting reply generation in batch.")
    except Exception as e:
        raise e
    finally:
        # Always deregister, whether we succeeded, timed out, or errored.
        generate_pending_requests.pop(request_id, None)
474
 
475
@app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query stored extracted email data")
async def query_extracted_emails(query_params: ExtractedEmailQuery = Depends()):
    """Search stored extractions by contact/appointment/task text and date range.

    Text filters are case-insensitive regex partial matches; date filters
    bound the processed_at timestamp. Results are capped at query_params.limit.
    """
    if extracted_emails_collection is None: raise HTTPException(status_code=503, detail="MongoDB not available.")
    mongo_query = {}
    # Contact name may match either the first or the last name field.
    if query_params.contact_name: mongo_query["$or"] = [{"contacts.name": {"$regex": query_params.contact_name, "$options": "i"}}, {"contacts.last_name": {"$regex": query_params.contact_name, "$options": "i"}}]
    if query_params.appointment_title: mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
    if query_params.task_title: mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
    # Inclusive date window: start-of-day for $gte, end-of-day for $lte.
    date_query = {}
    if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
    if query_params.to_date: date_query["$lte"] = datetime.combine(query_params.to_date, datetime.max.time())
    if date_query: mongo_query["processed_at"] = date_query
    try:
        # Blocking driver calls run in a thread to keep the event loop free.
        cursor = await asyncio.to_thread(extracted_emails_collection.find, mongo_query)
        results = await asyncio.to_thread(lambda: list(cursor.limit(query_params.limit)))
        return [ExtractedData(**doc) for doc in results]
    except Exception as e: traceback.print_exc(); raise HTTPException(status_code=500, detail=f"Error querying extracted emails: {e}")
 
 
 
491
 
492
@app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query stored generated replies")
async def query_generated_replies(query_params: GeneratedReplyQuery = Depends()):
    """Search stored replies by language/style/tone and generation date range.

    language/style/tone are exact matches; date filters bound the
    generated_at timestamp. Results are capped at query_params.limit.
    """
    if generated_replies_collection is None: raise HTTPException(status_code=503, detail="MongoDB not available.")
    mongo_query = {}
    if query_params.language: mongo_query["language"] = query_params.language
    if query_params.style: mongo_query["style"] = query_params.style
    if query_params.tone: mongo_query["tone"] = query_params.tone
    # Inclusive date window: start-of-day for $gte, end-of-day for $lte.
    date_query = {}
    if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
    if query_params.to_date: date_query["$lte"] = datetime.combine(query_params.to_date, datetime.max.time())
    if date_query: mongo_query["generated_at"] = date_query
    try:
        # Blocking driver calls run in a thread to keep the event loop free.
        cursor = await asyncio.to_thread(generated_replies_collection.find, mongo_query)
        results = await asyncio.to_thread(lambda: list(cursor.limit(query_params.limit)))
        return [GeneratedReplyData(**doc) for doc in results]
    except Exception as e: traceback.print_exc(); raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")
 
 
 
1
+ # This software is licensed under a **dual-license model**
2
+ # For individuals and businesses earning **under $1M per year**, this software is licensed under the **MIT License**
3
+ # Businesses or organizations with **annual revenue of $1,000,000 or more** must obtain permission to use this software commercially.
4
  import os
5
  # NUMBA_CACHE_DIR and NUMBA_DISABLE_CACHE are often set for specific environments,
6
  # e.g., if you're experiencing issues with Numba's caching behavior or in containerized environments.
 
14
  from typing import List, Optional, Literal, Dict, Any, Tuple
15
  import traceback
16
  import asyncio
 
17
 
18
  from fastapi import FastAPI, HTTPException, Response, Query, Depends, status
19
+ from fastapi.responses import FileResponse
20
  from fastapi.exception_handlers import http_exception_handler
21
  from starlette.exceptions import HTTPException as StarletteHTTPException
 
22
  from langchain.prompts import PromptTemplate
23
  from langchain_groq import ChatGroq
24
  from pydantic import BaseModel, Field, BeforeValidator, model_serializer
25
  from typing_extensions import Annotated
26
+ from pydantic_core import core_schema # Import core_schema for direct use in __get_pydantic_json_schema__
27
 
28
  from pymongo import MongoClient
29
  from pymongo.errors import ConnectionFailure, OperationFailure
30
  from bson import ObjectId
31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  # --- MongoDB Configuration ---
33
+ # IMPORTANT: Use environment variables for your MONGO_URI in production for security.
34
+ # Example: MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
35
+ MONGO_URI = "mongodb+srv://precison9:P1LhtFknkT75yg5L@cluster0.isuwpef.mongodb.net"
36
  DB_NAME = "email_assistant_db"
37
  EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
38
  GENERATED_REPLIES_COLLECTION = "generated_replies"
39
 
40
+ # Global variables for MongoDB client and collections
41
  client: Optional[MongoClient] = None
42
  db: Optional[Any] = None
43
  extracted_emails_collection: Optional[Any] = None
 
45
 
46
  # --- Pydantic ObjectId Handling ---
47
class CustomObjectId(str):
    """
    Custom Pydantic type for handling MongoDB ObjectIds.
    It validates that the input is a valid ObjectId string and
    ensures it's represented as a string in JSON Schema.
    """
    @classmethod
    def __get_validators__(cls):
        # Pydantic hook: yields the validators to run for this type.
        yield cls.validate

    @classmethod
    def validate(cls, v):
        # Allow None or empty string to pass through for optional fields.
        # Pydantic's Optional[PyObjectId] handles the None case before this validator.
        if v is None or v == "":
            return None

        if not isinstance(v, (str, ObjectId)):
            raise ValueError("ObjectId must be a string or ObjectId instance")

        # Convert ObjectId to string if it's already an ObjectId instance.
        if isinstance(v, ObjectId):
            return str(v)

        # Validate string format.
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId format")
        return cls(v)  # Return an instance of CustomObjectId (which is a str subclass)

    # This method is crucial for Pydantic v2 to generate correct OpenAPI schema.
    @classmethod
    def __get_pydantic_json_schema__(
        cls, _core_schema: core_schema.CoreSchema, handler
    ) -> Dict[str, Any]:
        # Represent this custom type as a standard string in the generated
        # JSON Schema (OpenAPI documentation).
        json_schema = handler(core_schema.str_schema())
        json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"  # Add an example for clarity
        return json_schema
87
 
88
+ # Annotated type for convenience in models
89
  PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
90
 
91
+
92
  # ---------------------- Models ----------------------
93
  class Contact(BaseModel):
94
  name: str
 
110
  due_date: date
111
 
112
  class ExtractedData(BaseModel):
113
+ # Use PyObjectId for the _id field
114
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
115
  contacts: List[Contact]
116
  appointments: List[Appointment]
117
  tasks: List[Task]
118
  original_email_text: str
119
  processed_at: datetime = Field(default_factory=datetime.utcnow)
120
+
121
  class Config:
122
+ populate_by_name = True # Allow setting 'id' or '_id'
123
+ arbitrary_types_allowed = True # Allow CustomObjectId and ObjectId
124
+
125
+ # Custom serializer for JSON output to ensure ObjectId is converted to string
126
  @model_serializer(when_used='json')
127
  def serialize_model(self):
128
  data = self.model_dump(by_alias=True, exclude_none=True)
129
+ # Ensure _id is a string when serializing to JSON
130
  if "_id" in data and isinstance(data["_id"], ObjectId):
131
  data["_id"] = str(data["_id"])
132
+ # Ensure dates are correctly serialized to ISO format if they are date objects
133
+ # Pydantic v2 usually handles this automatically for `date` types,
134
+ # but explicit conversion can be useful if direct manipulation is expected or for specific formats.
135
  if 'appointments' in data:
136
  for appt in data['appointments']:
137
  if isinstance(appt.get('start_date'), date):
 
158
  emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
159
 
160
  class GeneratedReplyData(BaseModel):
161
+ # Use PyObjectId for the _id field
162
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
163
  original_email_text: str
164
  generated_reply_text: str
 
168
  tone: str
169
  emoji: str
170
  generated_at: datetime = Field(default_factory=datetime.utcnow)
171
+
172
  class Config:
173
  populate_by_name = True
174
  arbitrary_types_allowed = True
175
+
176
  @model_serializer(when_used='json')
177
  def serialize_model(self):
178
  data = self.model_dump(by_alias=True, exclude_none=True)
 
180
  data["_id"] = str(data["_id"])
181
  return data
182
 
183
# Response model for the /generate-reply endpoint.
class GenerateReplyResponse(BaseModel):
    reply: str = Field(..., description="The AI-generated reply text.")
    stored_id: str = Field(..., description="The MongoDB ID of the stored reply.")
    cached: bool = Field(..., description="True if the reply was retrieved from cache, False if newly generated.")
188
 
189
# --- Query Models for GET Endpoints ---
class ExtractedEmailQuery(BaseModel):
    # Free-text filters (all case-insensitive partial matches).
    contact_name: Optional[str] = Query(None, description="Filter by contact name (case-insensitive partial match).")
    appointment_title: Optional[str] = Query(None, description="Filter by appointment title (case-insensitive partial match).")
    task_title: Optional[str] = Query(None, description="Filter by task title (case-insensitive partial match).")
    # Inclusive processed_at date window.
    from_date: Optional[date] = Query(None, description="Filter by data processed on or after this date (YYYY-MM-DD).")
    to_date: Optional[date] = Query(None, description="Filter by data processed on or before this date (YYYY-MM-DD).")
    limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
 
198
class GeneratedReplyQuery(BaseModel):
    # Exact-match filters on the stored generation parameters.
    language: Optional[Literal["Italian", "English"]] = Query(None, description="Filter by reply language.")
    style: Optional[str] = Query(None, description="Filter by reply style (e.g., Professional, Casual).")
    tone: Optional[str] = Query(None, description="Filter by reply tone (e.g., Friendly, Neutral).")
    # Inclusive generated_at date window.
    from_date: Optional[date] = Query(None, description="Filter by data generated on or after this date (YYYY-MM-DD).")
    to_date: Optional[date] = Query(None, description="Filter by data generated on or before this date (YYYY-MM-DD).")
    limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
205
+
206
# ---------------------- Utility Functions ----------------------
def extract_last_json_block(text: str) -> Optional[str]:
    """Return the last ```json fenced block in *text*, stripped of whitespace.

    Falls back to the widest bare `{...}` span when no fenced block exists;
    returns None when neither is present.
    """
    fenced = re.findall(r'```json\s*(.*?)\s*```', text, re.DOTALL)
    if fenced:
        return fenced[-1].strip()
    bare = re.search(r'\{.*\}', text, re.DOTALL)
    return bare.group(0) if bare else None
221
 
222
def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
    """Resolve a date string to a concrete date.

    Accepts 'today', 'tomorrow' (relative to *current_date*) or YYYY-MM-DD.
    Returns None for empty or unparseable input; the caller decides which
    default, if any, to substitute.
    """
    if not date_str:
        return None
    token = date_str.lower().strip()
    relative = {"today": current_date, "tomorrow": current_date + timedelta(days=1)}
    if token in relative:
        return relative[token]
    try:
        return datetime.strptime(token, "%Y-%m-%d").date()
    except ValueError:
        # Unparseable: let the caller apply its own default.
        return None
240
 
241
def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
    """Coerce raw LLM JSON into a validated ExtractedData model.

    Missing or unparseable dates default to *current_date* (except the
    optional end_date), and full contact names are split into first/last
    components.
    """
    def _split(full_name: str) -> tuple[str, str]:
        parts = full_name.strip().split()
        if not parts:
            return "", ""
        return parts[0], " ".join(parts[1:])

    contacts = []
    for raw in data.get("contacts", []):
        first, last = _split(raw.get("name", ""))
        contacts.append(Contact(
            name=first, last_name=last,
            email=raw.get("email"), phone_number=raw.get("phone_number"),
        ))

    appointments = [
        Appointment(
            title=raw.get("title", "Untitled"),
            description=raw.get("description", "No description"),
            # start_date falls back to today; end_date stays optional.
            start_date=parse_date(raw.get("start_date"), current_date) or current_date,
            start_time=raw.get("start_time"),
            end_date=parse_date(raw.get("end_date"), current_date),
            end_time=raw.get("end_time"),
        )
        for raw in data.get("appointments", [])
    ]

    tasks = [
        Task(
            task_title=raw.get("task_title", "Untitled"),
            task_description=raw.get("task_description", "No description"),
            due_date=parse_date(raw.get("due_date"), current_date) or current_date,
        )
        for raw in data.get("tasks", [])
    ]

    return ExtractedData(
        contacts=contacts, appointments=appointments,
        tasks=tasks, original_email_text=original_email_text,
    )
279
 
280
# ---------------------- Core Logic (Internal Functions) ----------------------
def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
    """Run the extraction LLM over *email_text* and return structured data.

    Raises:
        ValueError: empty input, no JSON block in the LLM reply, or malformed
            JSON — propagated unchanged so the endpoint can map it to HTTP 400.
        Exception: any other failure during the LLM call or normalization.
    """
    if not email_text:
        raise ValueError("Email text cannot be empty for processing.")

    llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)

    prompt_today_str = current_date.isoformat()
    prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()

    # f-string: today/tomorrow are substituted NOW; only {email} remains a
    # template placeholder ({{email}} escapes to {email}).
    prompt_template_str = f"""
    You are an expert email assistant tasked with extracting structured information from an Italian email.

    **Your response MUST be a single, complete JSON object, wrapped in a ```json``` block.**
    **DO NOT include any conversational text, explanations, or preambles outside the JSON block.**
    **The JSON should contain three top-level keys: "contacts", "appointments", and "tasks".**
    If a category has no items, its list should be empty (e.g., "contacts": []).

    Here is the required JSON schema for each category:

    - **contacts**: List of Contact objects.
      Each Contact object must have:
      - `name` (string, full name)
      - `last_name` (string, last name) - You should infer this from the full name.
      - `email` (string, optional, null if not present)
      - `phone_number` (string, optional, null if not present)

    - **appointments**: List of Appointment objects.
      Each Appointment object must have:
      - `title` (string, short, meaningful title in Italian based on the meeting's purpose)
      - `description` (string, summary of the meeting's goal)
      - `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow")
      - `start_time` (string, optional, e.g., "10:30 AM", null if not present)
      - `end_date` (string, YYYY-MM-DD, optional, null if unknown or not applicable)
      - `end_time` (string, optional, e.g., "11:00 AM", null if not present)

    - **tasks**: List of Task objects.
      Each Task object must have:
      - `task_title` (string, short summary of action item)
      - `task_description` (string, more detailed explanation)
      - `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")

    ---

    Email:
    {{email}}
    """
    # FIX: only "email" is a real template variable — today/tomorrow were
    # already baked in by the f-string above, so declaring (and passing) them
    # was dead weight that can trip LangChain's template validation.
    prompt_template = PromptTemplate(input_variables=["email"], template=prompt_template_str)
    chain = prompt_template | llm
    llm_output_str = ""
    try:
        llm_output = chain.invoke({"email": email_text})
        llm_output_str = llm_output.content

        json_str = extract_last_json_block(llm_output_str)
        if not json_str:
            raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
        json_data = json.loads(json_str)

        return normalize_llm_output(json_data, current_date, email_text)
    except json.JSONDecodeError as e:
        # Must precede the ValueError clause: JSONDecodeError subclasses it.
        raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}") from e
    except ValueError:
        # FIX: previously swallowed by the generic handler below and re-raised
        # as a bare Exception, which turned "bad LLM output" (HTTP 400 at the
        # endpoint) into an HTTP 500. Propagate unchanged.
        raise
    except Exception as e:
        traceback.print_exc()
        raise Exception(f"An error occurred during email processing: {e}") from e
349
+
350
+ def _generate_response_internal(
351
+ email_text: str, api_key: str, language: Literal["Italian", "English"],
352
+ length: str, style: str, tone: str, emoji: str
353
+ ) -> str:
354
+ """
355
+ Internal function to generate a reply to an email using LLM.
356
+ """
357
+ print(f"[{datetime.now()}] _generate_response_internal: Starting LLM call. API Key starts with: {api_key[:5]}...") # Debug log
358
+ if not email_text:
359
+ print(f"[{datetime.now()}] _generate_response_internal: Email text is empty.")
360
+ return "Cannot generate reply for empty email text."
361
+
362
  try:
363
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
364
+ prompt_template_str="""
365
+ You are an assistant that helps reply to emails.
366
+
367
+ Create a response to the following email with the following parameters:
368
+ - Language: {language}
369
+ - Length: {length}
370
+ - Style: {style}
371
+ - Tone: {tone}
372
+ - Emoji usage: {emoji}
373
+
374
+ Email:
375
+ {email}
376
+
377
+ Write only the reply body. Do not repeat the email or mention any instruction.
378
+ """
379
+ prompt = PromptTemplate(
380
+ input_variables=["email", "language", "length", "style", "tone", "emoji"],
381
+ template=prompt_template_str
382
+ )
383
  chain = prompt | llm
384
+ print(f"[{datetime.now()}] _generate_response_internal: Invoking LLM chain...") # Debug log
385
  output = chain.invoke({"email": email_text, "language": language, "length": length, "style": style, "tone": tone, "emoji": emoji})
386
+ print(f"[{datetime.now()}] _generate_response_internal: LLM chain returned. Content length: {len(output.content)}.") # Debug log
387
  return output.content.strip()
388
+ except Exception as e:
389
+ print(f"[{datetime.now()}] _generate_response_internal: ERROR during LLM invocation: {e}") # Debug log
390
+ traceback.print_exc() # Print full traceback to logs
391
+ raise # Re-raise the exception so it can be caught by handle_single_reply_request
392
 
 
 
 
 
 
393
 
394
# --- FastAPI Application ---
app = FastAPI(
    title="Email Assistant API",
    description=(
        "API for extracting structured data from emails and generating "
        "intelligent replies using Groq LLMs, with MongoDB integration, "
        "dynamic date handling, and caching."
    ),
    version="1.1.0",
    docs_url="/",  # Swagger UI served from the root path
    redoc_url="/redoc",
)
402
+
403
# --- Global Exception Handler ---
# Starlette HTTPExceptions cover FastAPI's own HTTP errors as well.
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler_wrapper(request, exc):
    """Log framework HTTP errors, then delegate to FastAPI's default handler."""
    print(f"[{datetime.now()}] Caught StarletteHTTPException: {exc.status_code} - {exc.detail}")
    return await http_exception_handler(request, exc)
410
 
411
# Last-resort handler for anything the more specific handlers missed.
@app.exception_handler(Exception)
async def global_exception_handler_wrapper(request, exc):
    """Log the traceback and answer every unhandled exception with a JSON 500."""
    print(f"[{datetime.now()}] Unhandled exception caught by global handler for request: {request.url}")
    traceback.print_exc()  # Full traceback to the console for debugging
    payload = json.dumps({"detail": f"Internal Server Error: {str(exc)}", "type": "unhandled_exception"})
    return Response(
        content=payload,
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        media_type="application/json",
    )
423
+
424
+
425
+ # --- FastAPI Event Handlers for MongoDB ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
426
@app.on_event("startup")
async def startup_event():
    """Open the MongoDB connection and bind the module-level collection handles.

    On any failure every handle is reset to None so endpoints can respond 503
    instead of crashing on a dead client.

    FIX: the previous version re-pinged inside a `finally` block a connection
    that had just been verified, and duplicated the None-reset across
    contradictory branches; the net effect (globals set iff connect+ping
    succeeded) is preserved without the redundancy.
    """
    global client, db, extracted_emails_collection, generated_replies_collection
    print(f"[{datetime.now()}] FastAPI app startup sequence initiated.")
    try:
        client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        client.admin.command('ping')  # fail fast if the server is unreachable
        db = client[DB_NAME]
        extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
        generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
        print(f"[{datetime.now()}] Successfully connected to MongoDB: {DB_NAME}")
    except (ConnectionFailure, OperationFailure) as e:
        print(f"[{datetime.now()}] ERROR: MongoDB Connection/Operation Failure: {e}")
        client = db = extracted_emails_collection = generated_replies_collection = None
    except Exception as e:
        print(f"[{datetime.now()}] ERROR: An unexpected error occurred during MongoDB connection startup: {e}")
        traceback.print_exc()
        client = db = extracted_emails_collection = generated_replies_collection = None
    print(f"[{datetime.now()}] FastAPI app startup sequence completed for MongoDB client initialization.")
464
 
465
 
466
@app.on_event("shutdown")
async def shutdown_event():
    """Close the MongoDB client on application shutdown.

    FIX: use `is not None` instead of truth-testing the client — PyMongo
    discourages boolean evaluation of its objects, and every other check in
    this file already uses the explicit comparison.
    """
    global client
    print(f"[{datetime.now()}] FastAPI app shutting down.")
    if client is not None:
        client.close()
        print(f"[{datetime.now()}] MongoDB client closed.")
473
+
474
 
475
# --- API Endpoints ---
@app.get("/health", summary="Health Check")
async def health_check():
    """Report API liveness and MongoDB reachability; 503 when the DB is down."""
    db_status = "MongoDB not connected."
    db_ok = False
    if client is not None and db is not None:
        try:
            # list_collection_names is blocking; run it off the event loop.
            await asyncio.to_thread(db.list_collection_names)
            db_status, db_ok = "MongoDB connection OK.", True
        except Exception as exc:
            db_status, db_ok = f"MongoDB connection error: {exc}", False

    if not db_ok:
        raise HTTPException(
            status_code=503,
            detail={"message": "Service unavailable.", "database": db_status},
        )
    return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status}
500
+
501
+
502
@app.post("/generate-reply", response_model=GenerateReplyResponse, summary="Generate a smart reply to an email")
async def generate_email_reply(request: GenerateReplyRequest):
    """
    Generates a smart reply to the provided email text using an LLM.
    Identical requests (same email + generation parameters) are served from the
    MongoDB cache; new replies are generated, stored, and returned with their
    document ID.
    """
    if generated_replies_collection is None:
        raise HTTPException(status_code=503, detail="MongoDB not available for generated_replies.")

    try:
        # A cache hit requires every generation parameter to match exactly.
        lookup = {
            "original_email_text": request.email_text,
            "language": request.language,
            "length": request.length,
            "style": request.style,
            "tone": request.tone,
            "emoji": request.emoji,
        }
        print(f"[{datetime.now()}] /generate-reply: Checking cache for reply...")
        # Blocking PyMongo call: keep it off the event loop.
        hit = await asyncio.to_thread(generated_replies_collection.find_one, lookup)

        if hit:
            print(f"[{datetime.now()}] /generate-reply: Reply found in cache. ID: {str(hit['_id'])}")
            return GenerateReplyResponse(
                reply=hit["generated_reply_text"],
                stored_id=str(hit["_id"]),
                cached=True,
            )

        print(f"[{datetime.now()}] /generate-reply: Reply not in cache. Calling LLM for generation...")
        reply_text = await asyncio.to_thread(
            _generate_response_internal,
            request.email_text,
            request.groq_api_key,
            request.language,
            request.length,
            request.style,
            request.tone,
            request.emoji,
        )
        print(f"[{datetime.now()}] /generate-reply: LLM call completed. Storing newly generated reply in MongoDB.")

        record = GeneratedReplyData(
            original_email_text=request.email_text,
            generated_reply_text=reply_text,
            language=request.language,
            length=request.length,
            style=request.style,
            tone=request.tone,
            emoji=request.emoji,
        )
        # 'id' is excluded: MongoDB assigns the _id on insert.
        document = record.model_dump(by_alias=True, exclude_none=True, exclude={'id'})

        inserted = await asyncio.to_thread(generated_replies_collection.insert_one, document)
        new_id = str(inserted.inserted_id)

        print(f"[{datetime.now()}] /generate-reply: Reply stored in MongoDB. ID: {new_id}")

        return GenerateReplyResponse(reply=reply_text, stored_id=new_id, cached=False)
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Error generating or storing reply: {str(e)}")
576
+
577
@app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email")
async def extract_email_data(request: ProcessEmailRequest):
    """
    Extracts contacts, appointments, and tasks from the provided email text,
    stores the result in MongoDB, and returns it with its new document ID.

    Responses: 503 when MongoDB is unavailable, 400 for bad input or
    unparseable LLM output (ValueError), 500 for anything else.
    """
    if extracted_emails_collection is None:
        raise HTTPException(status_code=503, detail="MongoDB not available.")

    current_date = date.today()  # "today"/"tomorrow" in the email resolve against this

    print(f"[{datetime.now()}] /extract-data: Received request.")
    try:
        print(f"[{datetime.now()}] /extract-data: Calling internal processing function.")
        # The LLM call is blocking; keep it off the event loop.
        extracted_data = await asyncio.to_thread(_process_email_internal, request.email_text, request.groq_api_key, current_date)

        print(f"[{datetime.now()}] /extract-data: Internal processing complete. Preparing for DB insert.")
        data_to_insert = extracted_data.model_dump(by_alias=True, exclude_none=True, exclude={'id'})

        # BSON has no plain-date type: promote date fields to midnight datetimes.
        # FIX: the old `isinstance(...) and ... is not None` was redundant —
        # isinstance(None, date) is already False — and the conversion pattern
        # is hoisted into one helper instead of being repeated three times.
        def _midnight(d: date) -> datetime:
            return datetime.combine(d, datetime.min.time())

        for appt in data_to_insert.get('appointments', []):
            if isinstance(appt.get('start_date'), date):
                appt['start_date'] = _midnight(appt['start_date'])
            if isinstance(appt.get('end_date'), date):
                appt['end_date'] = _midnight(appt['end_date'])
        for task_item in data_to_insert.get('tasks', []):
            if isinstance(task_item.get('due_date'), date):
                task_item['due_date'] = _midnight(task_item['due_date'])

        print(f"[{datetime.now()}] /extract-data: Inserting into MongoDB... Data: {data_to_insert}")
        insert_result = await asyncio.to_thread(extracted_emails_collection.insert_one, data_to_insert)

        # Reflect the MongoDB-generated ID back into the response model.
        extracted_data.id = str(insert_result.inserted_id)
        print(f"[{datetime.now()}] /extract-data: Data inserted into MongoDB. ID: {extracted_data.id}")

        return extracted_data
    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"An internal server error occurred: {e}")
627
+
628
 
629
@app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query stored extracted email data")
async def query_extracted_emails(query_params: ExtractedEmailQuery = Depends()):
    """Search stored extraction results by contact/appointment/task text and
    an inclusive processed_at date window."""
    if extracted_emails_collection is None:
        raise HTTPException(status_code=503, detail="MongoDB not available.")

    filters: Dict[str, Any] = {}
    if query_params.contact_name:
        # Match either first or last name, case-insensitively.
        name_rx = {"$regex": query_params.contact_name, "$options": "i"}
        filters["$or"] = [{"contacts.name": name_rx}, {"contacts.last_name": name_rx}]
    if query_params.appointment_title:
        filters["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
    if query_params.task_title:
        filters["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}

    processed_window = {}
    if query_params.from_date:
        processed_window["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
    if query_params.to_date:
        processed_window["$lte"] = datetime.combine(query_params.to_date, datetime.max.time())
    if processed_window:
        filters["processed_at"] = processed_window

    try:
        # Blocking PyMongo calls: keep them off the event loop.
        cursor = await asyncio.to_thread(extracted_emails_collection.find, filters)
        docs = await asyncio.to_thread(lambda: list(cursor.limit(query_params.limit)))
        return [ExtractedData(**doc) for doc in docs]
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Error querying extracted emails: {e}")
669
+
670
 
671
@app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query stored generated replies")
async def query_generated_replies(query_params: GeneratedReplyQuery = Depends()):
    """Search stored replies by language/style/tone and an inclusive
    generated_at date window."""
    if generated_replies_collection is None:
        raise HTTPException(status_code=503, detail="MongoDB not available.")

    filters: Dict[str, Any] = {}
    # Exact-match filters on the stored generation parameters.
    for field_name in ("language", "style", "tone"):
        value = getattr(query_params, field_name)
        if value:
            filters[field_name] = value

    generated_window = {}
    if query_params.from_date:
        generated_window["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
    if query_params.to_date:
        generated_window["$lte"] = datetime.combine(query_params.to_date, datetime.max.time())
    if generated_window:
        filters["generated_at"] = generated_window

    try:
        # Blocking PyMongo calls: keep them off the event loop.
        cursor = await asyncio.to_thread(generated_replies_collection.find, filters)
        docs = await asyncio.to_thread(lambda: list(cursor.limit(query_params.limit)))
        return [GeneratedReplyData(**doc) for doc in docs]
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")