precison9 committed on
Commit
1a0e62f
·
verified ·
1 Parent(s): 1fc423c

Update flask_Character.py

Browse files
Files changed (1) hide show
  1. flask_Character.py +262 -113
flask_Character.py CHANGED
@@ -1,12 +1,21 @@
 
 
 
 
 
 
 
 
 
 
1
  import json
2
  import re
3
  from datetime import date, datetime, timedelta
4
  from typing import List, Optional, Literal, Dict, Any, Tuple
5
- import os
6
  import traceback
7
  import asyncio
8
 
9
- from fastapi import FastAPI, HTTPException, Response, Query, Depends
10
  from fastapi.responses import FileResponse
11
  from fastapi.exception_handlers import http_exception_handler
12
  from starlette.exceptions import HTTPException as StarletteHTTPException
@@ -14,44 +23,72 @@ from langchain.prompts import PromptTemplate
14
  from langchain_groq import ChatGroq
15
  from pydantic import BaseModel, Field, BeforeValidator, model_serializer
16
  from typing_extensions import Annotated
17
- import uvicorn
18
 
19
  from pymongo import MongoClient
20
  from pymongo.errors import ConnectionFailure, OperationFailure
21
  from bson import ObjectId
22
 
23
  # --- MongoDB Configuration ---
24
- MONGO_URI = "mongodb+srv://precison9:P1LhtFknkT75yg5L@cluster0.isuwpef.mongodb.net" # Replace with your actual URI
 
 
25
  DB_NAME = "email_assistant_db"
26
  EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
27
  GENERATED_REPLIES_COLLECTION = "generated_replies"
28
 
 
29
  client: Optional[MongoClient] = None
30
- db: Optional[Any] = None # Changed to Optional[Any] as Database type is not directly imported for annotation here
31
  extracted_emails_collection: Optional[Any] = None
32
  generated_replies_collection: Optional[Any] = None
33
 
34
  # --- Pydantic ObjectId Handling ---
35
  class CustomObjectId(str):
 
 
 
 
 
36
  @classmethod
37
  def __get_validators__(cls):
38
  yield cls.validate
39
 
40
  @classmethod
41
- def validate(cls, v, info):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42
  if not ObjectId.is_valid(v):
43
- raise ValueError("Invalid ObjectId")
44
- return str(v)
45
 
 
46
  @classmethod
47
- def __get_pydantic_json_schema__(cls, core_schema, handler):
48
- json_schema = handler(core_schema)
49
- json_schema["type"] = "string"
50
- json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"
 
 
 
51
  return json_schema
52
 
 
53
  PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
54
 
 
55
  # ---------------------- Models ----------------------
56
  class Contact(BaseModel):
57
  name: str
@@ -73,6 +110,7 @@ class Task(BaseModel):
73
  due_date: date
74
 
75
  class ExtractedData(BaseModel):
 
76
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
77
  contacts: List[Contact]
78
  appointments: List[Appointment]
@@ -81,14 +119,19 @@ class ExtractedData(BaseModel):
81
  processed_at: datetime = Field(default_factory=datetime.utcnow)
82
 
83
  class Config:
84
- populate_by_name = True
85
- arbitrary_types_allowed = True
86
 
 
87
  @model_serializer(when_used='json')
88
  def serialize_model(self):
89
  data = self.model_dump(by_alias=True, exclude_none=True)
 
90
  if "_id" in data and isinstance(data["_id"], ObjectId):
91
  data["_id"] = str(data["_id"])
 
 
 
92
  if 'appointments' in data:
93
  for appt in data['appointments']:
94
  if isinstance(appt.get('start_date'), date):
@@ -115,6 +158,7 @@ class GenerateReplyRequest(BaseModel):
115
  emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
116
 
117
  class GeneratedReplyData(BaseModel):
 
118
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
119
  original_email_text: str
120
  generated_reply_text: str
@@ -155,30 +199,44 @@ class GeneratedReplyQuery(BaseModel):
155
 
156
  # ---------------------- Utility Functions ----------------------
157
  def extract_last_json_block(text: str) -> Optional[str]:
 
 
 
 
158
  pattern = r'```json\s*(.*?)\s*```'
159
  matches = re.findall(pattern, text, re.DOTALL)
160
  if matches:
161
  return matches[-1].strip()
 
162
  match = re.search(r'\{.*\}', text, re.DOTALL)
163
  if match:
164
  return match.group(0)
165
  return None
166
 
167
- def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]: # Allow None to pass through
168
- if not date_str: return None # If input is None (e.g. optional end_date), return None
 
 
 
 
 
169
  date_str_lower = date_str.lower().strip()
170
- if date_str_lower == "today": return current_date
171
- if date_str_lower == "tomorrow": return current_date + timedelta(days=1)
 
 
172
  try:
173
  return datetime.strptime(date_str_lower, "%Y-%m-%d").date()
174
  except ValueError:
175
- # For "appointments" "start_date", you assumed "today" if parsing failed or not present.
176
- # For "end_date" it was optional. We need to be consistent.
177
- # Given the original normalize_llm_output, start_date defaulted to today, end_date was optional.
178
- # This parse_date is more general. The default handling should be in normalize_llm_output.
179
- return current_date # Fallback, or raise error, or return None depending on strictness
180
 
181
  def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
 
 
 
 
182
  def split_name(full_name: str) -> tuple[str, str]:
183
  parts = full_name.strip().split()
184
  name = parts[0] if parts else ""
@@ -192,8 +250,10 @@ def normalize_llm_output(data: dict, current_date: date, original_email_text: st
192
 
193
  appointments_data = []
194
  for a in data.get("appointments", []):
195
- start_date_val = parse_date(a.get("start_date", "today"), current_date) or current_date # Default to current_date if parse_date returns None
196
- end_date_val = parse_date(a.get("end_date"), current_date) # parse_date can return None if end_date is not provided or invalid
 
 
197
 
198
  appointments_data.append(Appointment(
199
  title=a.get("title", "Untitled"), description=a.get("description", "No description"),
@@ -203,7 +263,8 @@ def normalize_llm_output(data: dict, current_date: date, original_email_text: st
203
 
204
  tasks_data = []
205
  for t in data.get("tasks", []):
206
- due_date_val = parse_date(t.get("due_date", "today"), current_date) or current_date # Default to current_date
 
207
  tasks_data.append(Task(
208
  task_title=t.get("task_title", "Untitled"), task_description=t.get("task_description", "No description"),
209
  due_date=due_date_val
@@ -212,11 +273,17 @@ def normalize_llm_output(data: dict, current_date: date, original_email_text: st
212
 
213
  # ---------------------- Core Logic (Internal Functions) ----------------------
214
  def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
215
- if not email_text: raise ValueError("Email text cannot be empty for processing.")
 
 
 
 
 
216
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
 
217
  prompt_today_str = current_date.isoformat()
218
  prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
219
- # Ensure your full, detailed prompt is used here
220
  prompt_template_str = f"""
221
  You are an expert email assistant tasked with extracting structured information from an Italian email.
222
 
@@ -257,24 +324,18 @@ Email:
257
  prompt_template = PromptTemplate(input_variables=["email", "prompt_today_str", "prompt_tomorrow_str"], template=prompt_template_str)
258
  chain = prompt_template | llm
259
  try:
260
- # print(f"DEBUG: Invoking LLM with email_text length: {len(email_text)} and current_date: {current_date}")
261
  llm_output = chain.invoke({"email": email_text, "prompt_today_str": prompt_today_str, "prompt_tomorrow_str": prompt_tomorrow_str})
262
  llm_output_str = llm_output.content
263
- # print(f"DEBUG: Raw LLM output:\n{llm_output_str[:500]}...")
264
 
265
  json_str = extract_last_json_block(llm_output_str)
266
- # print(f"DEBUG: Extracted JSON string:\n{json_str}")
267
 
268
- if not json_str: raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
 
269
  json_data = json.loads(json_str)
270
- # print(f"DEBUG: Parsed JSON data: {json.dumps(json_data, indent=2)}")
271
 
272
  extracted_data = normalize_llm_output(json_data, current_date, email_text)
273
- # print("DEBUG: Data normalized successfully.")
274
  return extracted_data
275
  except json.JSONDecodeError as e:
276
- # print(f"ERROR: JSON Decode Error: {e}")
277
- # print(f"ERROR: LLM response that caused error:\n{llm_output_str}")
278
  raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}")
279
  except Exception as e:
280
  traceback.print_exc()
@@ -284,9 +345,12 @@ def _generate_response_internal(
284
  email_text: str, api_key: str, language: Literal["Italian", "English"],
285
  length: str, style: str, tone: str, emoji: str
286
  ) -> str:
287
- if not email_text: return "Cannot generate reply for empty email text."
 
 
 
 
288
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
289
- # Ensure your full, detailed prompt is used here
290
  prompt_template_str="""
291
  You are an assistant that helps reply to emails.
292
 
@@ -312,7 +376,7 @@ Write only the reply body. Do not repeat the email or mention any instruction.
312
 
313
  # --- Batching and Caching Configuration ---
314
  MAX_BATCH_SIZE = 20
315
- BATCH_TIMEOUT = 0.5 # seconds
316
 
317
  reply_request_queue: List[Tuple[GenerateReplyRequest, asyncio.Future, float]] = []
318
  reply_queue_lock = asyncio.Lock()
@@ -327,7 +391,7 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
327
  return
328
  try:
329
  if generated_replies_collection is None:
330
- raise HTTPException(status_code=503, detail="Database service not available for caching/storage.")
331
 
332
  cache_query = {
333
  "original_email_text": request_data.email_text,
@@ -337,6 +401,7 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
337
  "tone": request_data.tone,
338
  "emoji": request_data.emoji,
339
  }
 
340
  cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
341
 
342
  if cached_reply_doc:
@@ -345,7 +410,8 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
345
  "stored_id": str(cached_reply_doc["_id"]),
346
  "cached": True
347
  }
348
- if not future.done(): future.set_result(response)
 
349
  return
350
 
351
  reply_content = await asyncio.to_thread(
@@ -368,6 +434,7 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
368
  tone=request_data.tone,
369
  emoji=request_data.emoji
370
  )
 
371
  reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
372
 
373
  insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
@@ -378,12 +445,15 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
378
  "stored_id": stored_id,
379
  "cached": False
380
  }
381
- if not future.done(): future.set_result(final_response)
 
382
 
383
  except Exception as e:
384
  traceback.print_exc()
385
  if not future.done():
386
- future.set_exception(e)
 
 
387
 
388
  async def process_reply_batches():
389
  """Continuously processes requests from the reply_request_queue in batches."""
@@ -392,13 +462,16 @@ async def process_reply_batches():
392
  batch_to_fire: List[Tuple[GenerateReplyRequest, asyncio.Future]] = []
393
  async with reply_queue_condition:
394
  if not reply_request_queue:
 
395
  await reply_queue_condition.wait()
 
396
  if not reply_request_queue:
397
  continue
398
 
399
  now = asyncio.get_event_loop().time()
400
  oldest_item_timestamp = reply_request_queue[0][2]
401
 
 
402
  if len(reply_request_queue) >= MAX_BATCH_SIZE or \
403
  (now - oldest_item_timestamp >= BATCH_TIMEOUT):
404
  num_to_take = min(len(reply_request_queue), MAX_BATCH_SIZE)
@@ -406,60 +479,78 @@ async def process_reply_batches():
406
  req, fut, _ = reply_request_queue.pop(0)
407
  batch_to_fire.append((req, fut))
408
  else:
 
409
  time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
410
  try:
411
  await asyncio.wait_for(reply_queue_condition.wait(), timeout=time_to_wait)
412
  except asyncio.TimeoutError:
413
- pass # Loop will re-evaluate
414
 
415
  if batch_to_fire:
416
  tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
 
417
  await asyncio.gather(*tasks)
418
  else:
419
- await asyncio.sleep(0.001)
 
 
420
 
421
 
422
  # ---------------------- FastAPI Application ----------------------
423
  app = FastAPI(
424
  title="Email Assistant API",
425
  description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
426
- version="1.1.0", # Incremented version
427
- docs_url="/",
428
  redoc_url="/redoc"
429
  )
430
 
431
  # --- Global Exception Handler ---
 
432
  @app.exception_handler(StarletteHTTPException)
433
  async def custom_http_exception_handler_wrapper(request, exc):
 
434
  return await http_exception_handler(request, exc)
435
 
 
436
  @app.exception_handler(Exception)
437
  async def global_exception_handler_wrapper(request, exc):
 
438
  print(f"Unhandled exception caught by global handler for request: {request.url}")
439
- traceback.print_exc()
440
- # Ensure it returns a valid FastAPI response
441
- return Response(content=json.dumps({"detail": f"Internal Server Error: {str(exc)}"}), status_code=500, media_type="application/json")
 
 
 
 
442
 
443
 
444
  # --- FastAPI Event Handlers for MongoDB & Batch Processor ---
445
  @app.on_event("startup")
446
  async def startup_event():
447
  global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
 
448
  try:
 
449
  client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
450
- client.admin.command('ping')
451
  db = client[DB_NAME]
452
  extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
453
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
454
  print(f"Successfully connected to MongoDB: {DB_NAME}")
455
 
456
- if batch_processor_task is None:
457
- loop = asyncio.get_event_loop()
458
- batch_processor_task = loop.create_task(process_reply_batches())
 
459
  print("Batch processor task for replies started.")
 
 
460
 
461
  except (ConnectionFailure, OperationFailure) as e:
462
  print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
 
463
  client = None
464
  db = None
465
  extracted_emails_collection = None
@@ -472,154 +563,208 @@ async def startup_event():
472
  extracted_emails_collection = None
473
  generated_replies_collection = None
474
  finally:
475
- # Corrected condition for checking client and db
476
  if client is not None and db is not None:
477
  try:
 
478
  client.admin.command('ping')
479
- except Exception:
480
- print("MongoDB ping failed after initial connection attempt during finally block.")
481
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
482
  else:
483
- print("MongoDB client or db object is None after connection attempt in startup.")
484
- if client is None or db is None: # Ensure all are None if one is
 
485
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
486
- print("FastAPI app starting up. MongoDB client & Batch Processor initialization attempted.")
487
 
488
 
489
  @app.on_event("shutdown")
490
  async def shutdown_event():
491
  global client, batch_processor_task
 
 
492
  if batch_processor_task:
493
  batch_processor_task.cancel()
494
  try:
 
495
  await batch_processor_task
496
  except asyncio.CancelledError:
497
- print("Batch processor task for replies cancelled.")
498
  except Exception as e:
499
  print(f"Error during batch processor task shutdown: {e}")
500
  traceback.print_exc()
501
  batch_processor_task = None
502
 
 
503
  if client:
504
  client.close()
505
- print("FastAPI app shutting down. MongoDB client closed.")
506
 
507
 
 
508
  @app.get("/health", summary="Health Check")
509
  async def health_check():
510
- db_status = "MongoDB not connected. Check server startup logs."
 
 
 
511
  db_ok = False
512
- if client is not None and db is not None: # Corrected check
513
  try:
514
- db.list_collection_names()
 
 
515
  db_status = "MongoDB connection OK."
516
  db_ok = True
517
  except Exception as e:
518
  db_status = f"MongoDB connection error: {e}"
 
519
 
520
- batch_processor_status = "Batch processor not running or state unknown."
521
- if batch_processor_task is not None :
522
  if not batch_processor_task.done():
523
- batch_processor_status = "Batch processor is running."
524
  else:
525
- batch_processor_status = "Batch processor task is done (may have completed or errored)."
526
-
 
 
 
 
 
 
527
  if db_ok:
528
- return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
529
  else:
530
- # Return a JSON response for HTTPException as well for consistency
531
  raise HTTPException(
532
- status_code=503,
533
- detail={"message": "Service unavailable.", "database": db_status, "batch_processor": batch_processor_status}
534
  )
535
 
536
 
537
  @app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
538
  async def extract_email_data(request: ProcessEmailRequest):
 
 
 
 
539
  if extracted_emails_collection is None:
540
- raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails.")
541
  try:
542
  current_date_val = date.today()
 
543
  extracted_data = await asyncio.to_thread(
544
  _process_email_internal, request.email_text, request.groq_api_key, current_date_val
545
  )
 
 
546
  extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
547
- for appt in extracted_data_dict.get('appointments', []):
548
- if isinstance(appt.get('start_date'), date): appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
549
- if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None: appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
550
- for task_item in extracted_data_dict.get('tasks', []):
551
- if isinstance(task_item.get('due_date'), date): task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
 
 
 
 
 
 
552
 
 
553
  result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
554
- # Pydantic model expects string ID, convert from ObjectId before assigning if needed
555
- # However, PyObjectId should handle this if result.inserted_id is ObjectId
556
- extracted_data.id = str(result.inserted_id) if isinstance(result.inserted_id, ObjectId) else result.inserted_id
 
557
  return extracted_data
558
  except ValueError as e:
559
- raise HTTPException(status_code=400, detail=str(e))
560
  except Exception as e:
561
  traceback.print_exc()
562
- raise HTTPException(status_code=500, detail=f"Internal server error during data extraction: {e}")
563
 
564
 
565
  @app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
566
  async def extract_email_data_excel(request: ProcessEmailRequest):
567
- raise HTTPException(status_code=501, detail="Excel functionality is currently disabled.")
 
 
 
 
568
 
569
 
570
  @app.post("/generate-reply", summary="Generate a smart reply to an email (batched & cached)")
571
  async def generate_email_reply(request: GenerateReplyRequest):
 
 
 
 
572
  if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
573
- raise HTTPException(status_code=503, detail="Reply generation service not fully initialized. Check server logs.")
574
 
575
  future = asyncio.Future()
576
  current_time = asyncio.get_event_loop().time()
577
 
578
  async with reply_queue_condition:
579
  reply_request_queue.append((request, future, current_time))
580
- reply_queue_condition.notify()
581
 
582
  try:
583
- client_timeout = BATCH_TIMEOUT + 10.0
 
 
584
  result = await asyncio.wait_for(future, timeout=client_timeout)
585
  return result
586
  except asyncio.TimeoutError:
 
587
  if not future.done():
588
  future.cancel()
589
- raise HTTPException(status_code=504, detail=f"Request timed out after {client_timeout}s waiting for batch processing.")
590
  except Exception as e:
 
591
  if isinstance(e, HTTPException):
592
  raise e
593
  traceback.print_exc()
594
- raise HTTPException(status_code=500, detail=f"Error processing your reply request: {str(e)}")
595
 
596
 
597
  @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
598
  async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
599
  if extracted_emails_collection is None:
600
- raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails.")
601
  mongo_query: Dict[str, Any] = {}
602
- if query_params.contact_name: mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"}
603
- if query_params.appointment_title: mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
604
- if query_params.task_title: mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
 
 
 
605
 
606
  if query_params.from_date or query_params.to_date:
607
  date_query: Dict[str, datetime] = {}
608
- if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
609
- if query_params.to_date: date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
610
- if date_query : mongo_query["processed_at"] = date_query
 
 
 
 
611
 
612
  try:
613
  cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
614
  extracted_docs_raw = await asyncio.to_thread(list, cursor)
615
-
616
  results = []
617
  for doc_raw in extracted_docs_raw:
618
  # Convert _id to string for Pydantic model if it's an ObjectId
619
- if isinstance(doc_raw.get("_id"), ObjectId):
620
- doc_raw["_id"] = str(doc_raw["_id"])
621
-
622
- # Convert datetime objects back to date objects for Pydantic model fields that are `date`
 
623
  if 'appointments' in doc_raw:
624
  for appt in doc_raw['appointments']:
625
  if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
@@ -631,13 +776,13 @@ async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = De
631
  return results
632
  except Exception as e:
633
  traceback.print_exc()
634
- raise HTTPException(status_code=500, detail=f"Error querying extracted emails: {e}")
635
 
636
 
637
  @app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
638
  async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
639
  if generated_replies_collection is None:
640
- raise HTTPException(status_code=503, detail="MongoDB not available for generated_replies.")
641
  mongo_query: Dict[str, Any] = {}
642
  if query_params.language: mongo_query["language"] = query_params.language
643
  if query_params.style: mongo_query["style"] = query_params.style
@@ -645,20 +790,24 @@ async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = D
645
 
646
  if query_params.from_date or query_params.to_date:
647
  date_query: Dict[str, datetime] = {}
648
- if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
649
- if query_params.to_date: date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
650
- if date_query: mongo_query["generated_at"] = date_query
651
-
 
 
 
 
652
  try:
653
  cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
654
  generated_docs_raw = await asyncio.to_thread(list, cursor)
655
  results = []
656
  for doc_raw in generated_docs_raw:
657
- if isinstance(doc_raw.get("_id"), ObjectId):
658
- doc_raw["_id"] = str(doc_raw["_id"])
 
659
  results.append(GeneratedReplyData(**doc_raw))
660
  return results
661
  except Exception as e:
662
  traceback.print_exc()
663
- raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")
664
-
 
1
+ # This software is licensed under a **dual-license model**
2
+ # For individuals and businesses earning **under $1M per year**, this software is licensed under the **MIT License**
3
+ # Businesses or organizations with **annual revenue of $1,000,000 or more** must obtain permission to use this software commercially.
4
+ import os
5
+ # NUMBA_CACHE_DIR and NUMBA_DISABLE_CACHE are often set for specific environments,
6
+ # e.g., if you're experiencing issues with Numba's caching behavior or in containerized environments.
7
+ # Keep them if they serve a specific purpose in your deployment environment.
8
+ os.environ["NUMBA_CACHE_DIR"] = "/tmp/numba_cache"
9
+ os.environ["NUMBA_DISABLE_CACHE"] = "1"
10
+
11
  import json
12
  import re
13
  from datetime import date, datetime, timedelta
14
  from typing import List, Optional, Literal, Dict, Any, Tuple
 
15
  import traceback
16
  import asyncio
17
 
18
+ from fastapi import FastAPI, HTTPException, Response, Query, Depends, status
19
  from fastapi.responses import FileResponse
20
  from fastapi.exception_handlers import http_exception_handler
21
  from starlette.exceptions import HTTPException as StarletteHTTPException
 
23
  from langchain_groq import ChatGroq
24
  from pydantic import BaseModel, Field, BeforeValidator, model_serializer
25
  from typing_extensions import Annotated
26
+ from pydantic_core import core_schema # Import core_schema for direct use in __get_pydantic_json_schema__
27
 
28
  from pymongo import MongoClient
29
  from pymongo.errors import ConnectionFailure, OperationFailure
30
  from bson import ObjectId
31
 
32
  # --- MongoDB Configuration ---
33
+ # IMPORTANT: Use environment variables for your MONGO_URI in production for security.
34
+ # Example: MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
35
+ MONGO_URI = "mongodb+srv://precison9:P1LhtFknkT75yg5L@cluster0.isuwpef.mongodb.net"
36
  DB_NAME = "email_assistant_db"
37
  EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
38
  GENERATED_REPLIES_COLLECTION = "generated_replies"
39
 
40
+ # Global variables for MongoDB client and collections
41
  client: Optional[MongoClient] = None
42
+ db: Optional[Any] = None
43
  extracted_emails_collection: Optional[Any] = None
44
  generated_replies_collection: Optional[Any] = None
45
 
46
  # --- Pydantic ObjectId Handling ---
47
  class CustomObjectId(str):
48
+ """
49
+ Custom Pydantic type for handling MongoDB ObjectIds.
50
+ It validates that the input is a valid ObjectId string and
51
+ ensures it's represented as a string in JSON Schema.
52
+ """
53
  @classmethod
54
  def __get_validators__(cls):
55
  yield cls.validate
56
 
57
  @classmethod
58
+ def validate(cls, v):
59
+ # Allow None or empty string to pass through for optional fields
60
+ # This validator is only called if the field is not None
61
+ # Pydantic's Optional[PyObjectId] handles the None case before this validator
62
+ if v is None or v == "":
63
+ return None # Should not be reached if Optional[PyObjectId] is used correctly
64
+
65
+ if not isinstance(v, (str, ObjectId)):
66
+ raise ValueError("ObjectId must be a string or ObjectId instance")
67
+
68
+ # Convert ObjectId to string if it's already an ObjectId instance
69
+ if isinstance(v, ObjectId):
70
+ return str(v)
71
+
72
+ # Validate string format
73
  if not ObjectId.is_valid(v):
74
+ raise ValueError("Invalid ObjectId format")
75
+ return cls(v) # Return an instance of CustomObjectId (which is a str subclass)
76
 
77
+ # This method is crucial for Pydantic v2 to generate correct OpenAPI schema
78
  @classmethod
79
+ def __get_pydantic_json_schema__(
80
+ cls, _core_schema: core_schema.CoreSchema, handler
81
+ ) -> Dict[str, Any]:
82
+ # We tell Pydantic that this custom type should be represented as a standard string
83
+ # in the generated JSON Schema (OpenAPI documentation).
84
+ json_schema = handler(core_schema.str_schema())
85
+ json_schema["example"] = "60c728ef238b9c7b9e0f6c2a" # Add an example for clarity
86
  return json_schema
87
 
88
+ # Annotated type for convenience in models
89
  PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
90
 
91
+
92
  # ---------------------- Models ----------------------
93
  class Contact(BaseModel):
94
  name: str
 
110
  due_date: date
111
 
112
  class ExtractedData(BaseModel):
113
+ # Use PyObjectId for the _id field
114
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
115
  contacts: List[Contact]
116
  appointments: List[Appointment]
 
119
  processed_at: datetime = Field(default_factory=datetime.utcnow)
120
 
121
  class Config:
122
+ populate_by_name = True # Allow setting 'id' or '_id'
123
+ arbitrary_types_allowed = True # Allow CustomObjectId and ObjectId
124
 
125
+ # Custom serializer for JSON output to ensure ObjectId is converted to string
126
  @model_serializer(when_used='json')
127
  def serialize_model(self):
128
  data = self.model_dump(by_alias=True, exclude_none=True)
129
+ # Ensure _id is a string when serializing to JSON
130
  if "_id" in data and isinstance(data["_id"], ObjectId):
131
  data["_id"] = str(data["_id"])
132
+ # Ensure dates are correctly serialized to ISO format if they are date objects
133
+ # Pydantic v2 usually handles this automatically for `date` types,
134
+ # but explicit conversion can be useful if direct manipulation is expected or for specific formats.
135
  if 'appointments' in data:
136
  for appt in data['appointments']:
137
  if isinstance(appt.get('start_date'), date):
 
158
  emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
159
 
160
  class GeneratedReplyData(BaseModel):
161
+ # Use PyObjectId for the _id field
162
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
163
  original_email_text: str
164
  generated_reply_text: str
 
199
 
200
  # ---------------------- Utility Functions ----------------------
201
  def extract_last_json_block(text: str) -> Optional[str]:
202
+ """
203
+ Extracts the last JSON block enclosed in ```json``` from a string,
204
+ or a standalone JSON object if no code block is found.
205
+ """
206
  pattern = r'```json\s*(.*?)\s*```'
207
  matches = re.findall(pattern, text, re.DOTALL)
208
  if matches:
209
  return matches[-1].strip()
210
+ # Fallback: try to find a standalone JSON object
211
  match = re.search(r'\{.*\}', text, re.DOTALL)
212
  if match:
213
  return match.group(0)
214
  return None
215
 
216
+ def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
217
+ """
218
+ Parses a date string, handling 'today', 'tomorrow', and YYYY-MM-DD format.
219
+ Returns None if input is None or cannot be parsed into a valid date.
220
+ """
221
+ if not date_str:
222
+ return None
223
  date_str_lower = date_str.lower().strip()
224
+ if date_str_lower == "today":
225
+ return current_date
226
+ if date_str_lower == "tomorrow":
227
+ return current_date + timedelta(days=1)
228
  try:
229
  return datetime.strptime(date_str_lower, "%Y-%m-%d").date()
230
  except ValueError:
231
+ # If parsing fails, return None. The calling function (normalize_llm_output)
232
+ # will then decide the default (e.g., current_date).
233
+ return None
 
 
234
 
235
  def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
236
+ """
237
+ Normalizes and validates LLM extracted data into ExtractedData Pydantic model.
238
+ Handles defaults for dates and name splitting.
239
+ """
240
  def split_name(full_name: str) -> tuple[str, str]:
241
  parts = full_name.strip().split()
242
  name = parts[0] if parts else ""
 
250
 
251
  appointments_data = []
252
  for a in data.get("appointments", []):
253
+ # Default start_date to current_date if not provided or invalid
254
+ start_date_val = parse_date(a.get("start_date"), current_date) or current_date
255
+ # end_date remains optional
256
+ end_date_val = parse_date(a.get("end_date"), current_date)
257
 
258
  appointments_data.append(Appointment(
259
  title=a.get("title", "Untitled"), description=a.get("description", "No description"),
 
263
 
264
  tasks_data = []
265
  for t in data.get("tasks", []):
266
+ # Default due_date to current_date if not provided or invalid
267
+ due_date_val = parse_date(t.get("due_date"), current_date) or current_date
268
  tasks_data.append(Task(
269
  task_title=t.get("task_title", "Untitled"), task_description=t.get("task_description", "No description"),
270
  due_date=due_date_val
 
273
 
274
  # ---------------------- Core Logic (Internal Functions) ----------------------
275
  def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
276
+ """
277
+ Internal function to process email text using LLM and extract structured data.
278
+ """
279
+ if not email_text:
280
+ raise ValueError("Email text cannot be empty for processing.")
281
+
282
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
283
+
284
  prompt_today_str = current_date.isoformat()
285
  prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
286
+
287
  prompt_template_str = f"""
288
  You are an expert email assistant tasked with extracting structured information from an Italian email.
289
 
 
324
  prompt_template = PromptTemplate(input_variables=["email", "prompt_today_str", "prompt_tomorrow_str"], template=prompt_template_str)
325
  chain = prompt_template | llm
326
  try:
 
327
  llm_output = chain.invoke({"email": email_text, "prompt_today_str": prompt_today_str, "prompt_tomorrow_str": prompt_tomorrow_str})
328
  llm_output_str = llm_output.content
 
329
 
330
  json_str = extract_last_json_block(llm_output_str)
 
331
 
332
+ if not json_str:
333
+ raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
334
  json_data = json.loads(json_str)
 
335
 
336
  extracted_data = normalize_llm_output(json_data, current_date, email_text)
 
337
  return extracted_data
338
  except json.JSONDecodeError as e:
 
 
339
  raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}")
340
  except Exception as e:
341
  traceback.print_exc()
 
345
  email_text: str, api_key: str, language: Literal["Italian", "English"],
346
  length: str, style: str, tone: str, emoji: str
347
  ) -> str:
348
+ """
349
+ Internal function to generate a reply to an email using LLM.
350
+ """
351
+ if not email_text:
352
+ return "Cannot generate reply for empty email text."
353
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
 
354
  prompt_template_str="""
355
  You are an assistant that helps reply to emails.
356
 
 
376
 
377
# --- Batching and Caching Configuration ---
MAX_BATCH_SIZE = 20  # maximum number of reply requests processed per batch
BATCH_TIMEOUT = 0.5 # seconds (Adjust based on expected LLM response time and desired latency)

# Pending reply requests awaiting batch processing:
# each entry is (request payload, future resolved with the reply, enqueue timestamp).
reply_request_queue: List[Tuple[GenerateReplyRequest, asyncio.Future, float]] = []
reply_queue_lock = asyncio.Lock()
 
391
  return
392
  try:
393
  if generated_replies_collection is None:
394
+ raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database service not available for caching/storage.")
395
 
396
  cache_query = {
397
  "original_email_text": request_data.email_text,
 
401
  "tone": request_data.tone,
402
  "emoji": request_data.emoji,
403
  }
404
+ # Use await asyncio.to_thread for blocking MongoDB operations
405
  cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
406
 
407
  if cached_reply_doc:
 
410
  "stored_id": str(cached_reply_doc["_id"]),
411
  "cached": True
412
  }
413
+ if not future.done():
414
+ future.set_result(response)
415
  return
416
 
417
  reply_content = await asyncio.to_thread(
 
434
  tone=request_data.tone,
435
  emoji=request_data.emoji
436
  )
437
+ # Use model_dump for Pydantic v2
438
  reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
439
 
440
  insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
 
445
  "stored_id": stored_id,
446
  "cached": False
447
  }
448
+ if not future.done():
449
+ future.set_result(final_response)
450
 
451
  except Exception as e:
452
  traceback.print_exc()
453
  if not future.done():
454
+ # Set the exception on the future so the client can catch it
455
+ future.set_exception(HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to generate reply: {e}"))
456
+
457
 
458
  async def process_reply_batches():
459
  """Continuously processes requests from the reply_request_queue in batches."""
 
462
  batch_to_fire: List[Tuple[GenerateReplyRequest, asyncio.Future]] = []
463
  async with reply_queue_condition:
464
  if not reply_request_queue:
465
+ # Wait for new requests or timeout
466
  await reply_queue_condition.wait()
467
+ # After waking up, re-check if queue is still empty
468
  if not reply_request_queue:
469
  continue
470
 
471
  now = asyncio.get_event_loop().time()
472
  oldest_item_timestamp = reply_request_queue[0][2]
473
 
474
+ # Condition to trigger batch processing: queue is full OR timeout reached for oldest item
475
  if len(reply_request_queue) >= MAX_BATCH_SIZE or \
476
  (now - oldest_item_timestamp >= BATCH_TIMEOUT):
477
  num_to_take = min(len(reply_request_queue), MAX_BATCH_SIZE)
 
479
  req, fut, _ = reply_request_queue.pop(0)
480
  batch_to_fire.append((req, fut))
481
  else:
482
+ # Calculate time to wait for the next batch or timeout
483
  time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
484
  try:
485
  await asyncio.wait_for(reply_queue_condition.wait(), timeout=time_to_wait)
486
  except asyncio.TimeoutError:
487
+ pass # Loop will re-evaluate and likely fire the batch
488
 
489
  if batch_to_fire:
490
  tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
491
+ # Use asyncio.gather to run all tasks in the batch concurrently
492
  await asyncio.gather(*tasks)
493
  else:
494
+ # Short sleep to prevent busy-waiting if queue is empty but not waiting
495
+ # (e.g., if a notify happened just before the wait, but queue was already processed)
496
+ await asyncio.sleep(0.001)
497
 
498
 
499
  # ---------------------- FastAPI Application ----------------------
500
# Application instance; endpoint decorators below attach to this object.
app = FastAPI(
    title="Email Assistant API",
    description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
    version="1.1.0",
    docs_url="/", # Sets Swagger UI to be the root path
    redoc_url="/redoc"
)
507
 
508
  # --- Global Exception Handler ---
509
+ # Catch Starlette HTTPExceptions (FastAPI uses these internally)
510
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler_wrapper(request, exc):
    """Forward framework HTTP exceptions to FastAPI's stock handler so the
    standard JSON error shape is preserved."""
    response = await http_exception_handler(request, exc)
    return response
514
 
515
+ # Catch all other unhandled exceptions
516
@app.exception_handler(Exception)
async def global_exception_handler_wrapper(request, exc):
    """Last-resort handler: log the traceback and return a uniform JSON 500 body."""
    print(f"Unhandled exception caught by global handler for request: {request.url}")
    traceback.print_exc() # Print traceback to console for debugging
    error_payload = {
        "detail": f"Internal Server Error: {str(exc)}",
        "type": "unhandled_exception",
    }
    # Hand back JSON even for unhandled errors so clients get a consistent shape.
    return Response(
        content=json.dumps(error_payload),
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        media_type="application/json",
    )
527
 
528
 
529
  # --- FastAPI Event Handlers for MongoDB & Batch Processor ---
530
  @app.on_event("startup")
531
  async def startup_event():
532
  global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
533
+ print("FastAPI app startup sequence initiated.")
534
  try:
535
+ # Connect to MongoDB
536
  client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
537
+ client.admin.command('ping') # Test connection
538
  db = client[DB_NAME]
539
  extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
540
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
541
  print(f"Successfully connected to MongoDB: {DB_NAME}")
542
 
543
+ # Start the batch processor task if not already running
544
+ # Using asyncio.create_task is the correct way to run background tasks in FastAPI
545
+ if batch_processor_task is None or batch_processor_task.done():
546
+ batch_processor_task = asyncio.create_task(process_reply_batches())
547
  print("Batch processor task for replies started.")
548
+ else:
549
+ print("Batch processor task for replies is already running or being initialized.")
550
 
551
  except (ConnectionFailure, OperationFailure) as e:
552
  print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
553
+ # Ensure all DB related globals are reset to None if connection fails
554
  client = None
555
  db = None
556
  extracted_emails_collection = None
 
563
  extracted_emails_collection = None
564
  generated_replies_collection = None
565
  finally:
566
+ # Final check and logging for MongoDB connection status
567
  if client is not None and db is not None:
568
  try:
569
+ # One last ping to confirm connection before app fully starts
570
  client.admin.command('ping')
571
+ except Exception as e:
572
+ print(f"MongoDB ping failed after initial connection attempt during finally block: {e}")
573
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
574
  else:
575
+ print("MongoDB client or db object is None after connection attempt in startup. Database likely not connected.")
576
+ # Ensure all are None if one is, to avoid partial state
577
+ if client is None or db is None:
578
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
579
+ print("FastAPI app startup sequence completed for MongoDB client & Batch Processor initialization.")
580
 
581
 
582
@app.on_event("shutdown")
async def shutdown_event():
    """Stop the background batch processor and release the MongoDB client."""
    global client, batch_processor_task
    print("FastAPI app shutting down.")

    if batch_processor_task:
        # Request cancellation, then await the task so it can unwind cleanly.
        batch_processor_task.cancel()
        try:
            await batch_processor_task
        except asyncio.CancelledError:
            print("Batch processor task for replies cancelled during shutdown.")
        except Exception as err:
            print(f"Error during batch processor task shutdown: {err}")
            traceback.print_exc()
        batch_processor_task = None

    if client:
        client.close()
        print("MongoDB client closed.")
603
 
604
 
605
+ # --- API Endpoints ---
606
@app.get("/health", summary="Health Check")
async def health_check():
    """
    Report API health: MongoDB connectivity and the reply batch processor state.

    Returns:
        200 with diagnostic details when the database is reachable.

    Raises:
        HTTPException 503: database unreachable; payload carries the same diagnostics.
    """
    db_status = "MongoDB not connected."
    db_ok = False
    if client is not None and db is not None:
        try:
            # list_collection_names is a blocking driver call; keep it off the event loop.
            await asyncio.to_thread(db.list_collection_names)
            db_status = "MongoDB connection OK."
            db_ok = True
        except Exception as e:
            db_status = f"MongoDB connection error: {e}"
            db_ok = False

    if batch_processor_task is None:
        batch_processor_status = "Batch processor task has not been initialized."
    elif not batch_processor_task.done():
        batch_processor_status = "Batch processor is running."
    elif batch_processor_task.cancelled():
        # BUGFIX: Task.exception() raises CancelledError on a cancelled task,
        # which would make this endpoint itself fail; check cancelled() first.
        batch_processor_status = "Batch processor task is done (may have completed or cancelled)."
    elif batch_processor_task.exception():
        batch_processor_status = f"Batch processor task ended with exception: {batch_processor_task.exception()}"
    else:
        batch_processor_status = "Batch processor task is done (may have completed or cancelled)."

    if db_ok:
        return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
    # Database is down: surface a 503 carrying the diagnostics.
    raise HTTPException(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        detail={"message": "Service unavailable due to issues.", "database": db_status, "batch_processor": batch_processor_status}
    )
645
 
646
 
647
@app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
async def extract_email_data(request: ProcessEmailRequest):
    """
    Extract contacts, appointments, and tasks from an email via the LLM and
    persist the result in MongoDB.

    Raises:
        HTTPException 503: extracted-emails collection unavailable.
        HTTPException 400: processing failed (empty text, bad LLM output).
        HTTPException 500: any other unexpected failure.
    """
    if extracted_emails_collection is None:
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for extracted email storage. Check server startup logs.")
    try:
        current_date_val = date.today()
        # The LLM call is blocking; run it off the event loop.
        extracted_data = await asyncio.to_thread(
            _process_email_internal, request.email_text, request.groq_api_key, current_date_val
        )

        # MongoDB cannot store datetime.date values: promote each date field
        # to a midnight datetime before insertion. The extra datetime guard
        # avoids double-converting (datetime is a subclass of date); the
        # original's separate `is not None` check was redundant with isinstance.
        extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
        for appt in extracted_data_dict.get('appointments', []):
            for key in ('start_date', 'end_date'):
                value = appt.get(key)
                if isinstance(value, date) and not isinstance(value, datetime):
                    appt[key] = datetime.combine(value, datetime.min.time())
        for task_item in extracted_data_dict.get('tasks', []):
            value = task_item.get('due_date')
            if isinstance(value, date) and not isinstance(value, datetime):
                task_item['due_date'] = datetime.combine(value, datetime.min.time())

        result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
        # Surface the MongoDB-assigned ObjectId in the response model;
        # PyObjectId handles the ObjectId -> str conversion on serialization.
        extracted_data.id = result.inserted_id
        return extracted_data
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal server error during data extraction: {e}")
688
 
689
 
690
@app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
async def extract_email_data_excel(request: ProcessEmailRequest):
    """
    Reserved endpoint for exporting extracted data as an Excel download.

    Not yet implemented; always responds with HTTP 501.
    """
    raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Excel functionality is currently disabled.")
697
 
698
 
699
@app.post("/generate-reply", summary="Generate a smart reply to an email (batched & cached)")
async def generate_email_reply(request: GenerateReplyRequest):
    """
    Queue a reply-generation request for batched processing and await the result.

    The request is appended to the shared queue; the background batch
    processor resolves the future either from the MongoDB cache or by
    invoking the LLM.

    Raises:
        HTTPException 503: database or batch processor not initialized.
        HTTPException 504: no result within the client timeout window.
        HTTPException 500: any other failure while awaiting the reply.
    """
    if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Reply generation service not fully initialized. Check server logs for database or batch processor issues.")

    future = asyncio.Future()
    current_time = asyncio.get_event_loop().time()

    async with reply_queue_condition:
        reply_request_queue.append((request, future, current_time))
        reply_queue_condition.notify()  # wake the batch processor

    # Allow one batching window plus a generous buffer for the LLM response.
    client_timeout = BATCH_TIMEOUT + 10.0
    try:
        return await asyncio.wait_for(future, timeout=client_timeout)
    except asyncio.TimeoutError:
        # Client gave up; cancel the pending future if the worker hasn't resolved it.
        if not future.done():
            future.cancel()
        raise HTTPException(status_code=status.HTTP_504_GATEWAY_TIMEOUT, detail=f"Request timed out after {client_timeout}s waiting for batch processing. The LLM might be busy or the request queue too long.")
    except HTTPException:
        # Worker-side failures arrive as HTTPExceptions set on the future;
        # propagate them unchanged (replaces the isinstance re-raise dance).
        raise
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error processing your reply request: {str(e)}")
732
 
733
 
734
  @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
735
  async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
736
  if extracted_emails_collection is None:
737
+ raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying extracted emails.")
738
  mongo_query: Dict[str, Any] = {}
739
+ if query_params.contact_name:
740
+ mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"} # Case-insensitive regex
741
+ if query_params.appointment_title:
742
+ mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
743
+ if query_params.task_title:
744
+ mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
745
 
746
  if query_params.from_date or query_params.to_date:
747
  date_query: Dict[str, datetime] = {}
748
+ if query_params.from_date:
749
+ date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
750
+ if query_params.to_date:
751
+ # Query up to the end of the 'to_date' day
752
+ date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
753
+ if date_query :
754
+ mongo_query["processed_at"] = date_query
755
 
756
  try:
757
  cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
758
  extracted_docs_raw = await asyncio.to_thread(list, cursor)
759
+
760
  results = []
761
  for doc_raw in extracted_docs_raw:
762
  # Convert _id to string for Pydantic model if it's an ObjectId
763
+ # PyObjectId type hint handles this on model parsing
764
+ # if isinstance(doc_raw.get("_id"), ObjectId):
765
+ # doc_raw["_id"] = str(doc_raw["_id"])
766
+
767
+ # Convert datetime objects from MongoDB back to date objects for Pydantic model fields that are `date`
768
  if 'appointments' in doc_raw:
769
  for appt in doc_raw['appointments']:
770
  if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
 
776
  return results
777
  except Exception as e:
778
  traceback.print_exc()
779
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying extracted emails: {e}")
780
 
781
 
782
  @app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
783
  async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
784
  if generated_replies_collection is None:
785
+ raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying generated replies.")
786
  mongo_query: Dict[str, Any] = {}
787
  if query_params.language: mongo_query["language"] = query_params.language
788
  if query_params.style: mongo_query["style"] = query_params.style
 
790
 
791
  if query_params.from_date or query_params.to_date:
792
  date_query: Dict[str, datetime] = {}
793
+ if query_params.from_date:
794
+ date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
795
+ if query_params.to_date:
796
+ # Query up to the end of the 'to_date' day
797
+ date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
798
+ if date_query:
799
+ mongo_query["generated_at"] = date_query
800
+
801
  try:
802
  cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
803
  generated_docs_raw = await asyncio.to_thread(list, cursor)
804
  results = []
805
  for doc_raw in generated_docs_raw:
806
+ # PyObjectId type hint handles this on model parsing
807
+ # if isinstance(doc_raw.get("_id"), ObjectId):
808
+ # doc_raw["_id"] = str(doc_raw["_id"])
809
  results.append(GeneratedReplyData(**doc_raw))
810
  return results
811
  except Exception as e:
812
  traceback.print_exc()
813
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying generated replies: {e}")