precison9 commited on
Commit
20cb3f9
·
verified ·
1 Parent(s): e99343e

Update flask_Character.py

Browse files
Files changed (1) hide show
  1. flask_Character.py +169 -344
flask_Character.py CHANGED
@@ -301,14 +301,14 @@ If a category has no items, its list should be empty (e.g., "contacts": []).
301
  Here is the required JSON schema for each category:
302
 
303
  - **contacts**: List of Contact objects.
304
- Each Contact object must have:
305
  - `name` (string, full name)
306
  - `last_name` (string, last name) - You should infer this from the full name.
307
  - `email` (string, optional, null if not present)
308
  - `phone_number` (string, optional, null if not present)
309
 
310
  - **appointments**: List of Appointment objects.
311
- Each Appointment object must have:
312
  - `title` (string, short, meaningful title in Italian based on the meeting's purpose)
313
  - `description` (string, summary of the meeting's goal)
314
  - `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow")
@@ -317,7 +317,7 @@ Here is the required JSON schema for each category:
317
  - `end_time` (string, optional, e.g., "11:00 AM", null if not present)
318
 
319
  - **tasks**: List of Task objects.
320
- Each Task object must have:
321
  - `task_title` (string, short summary of action item)
322
  - `task_description` (string, more detailed explanation)
323
  - `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")
@@ -390,163 +390,11 @@ def _generate_response_internal(
390
  traceback.print_exc() # Print full traceback to logs
391
  raise # Re-raise the exception so it can be caught by handle_single_reply_request
392
 
393
- # --- Batching and Caching Configuration ---
394
- MAX_BATCH_SIZE = 20
395
- BATCH_TIMEOUT = 0.5 # seconds (Adjust based on expected LLM response time and desired latency)
396
 
397
- reply_request_queue: List[Tuple[GenerateReplyRequest, asyncio.Future, float]] = []
398
- reply_queue_lock = asyncio.Lock()
399
- reply_queue_condition = asyncio.Condition(lock=reply_queue_lock)
400
- batch_processor_task: Optional[asyncio.Task] = None
401
-
402
-
403
- # --- Batch Processor and Handler ---
404
- async def handle_single_reply_request(request_data: GenerateReplyRequest, future: asyncio.Future):
405
- """Handles a single request: checks cache, calls LLM, stores result, and sets future."""
406
- print(f"[{datetime.now()}] Handle single reply: Starting for email_text_start='{request_data.email_text[:50]}'...")
407
- if future.cancelled():
408
- print(f"[{datetime.now()}] Handle single reply: Future cancelled. Aborting.")
409
- return
410
- try:
411
- if generated_replies_collection is None:
412
- print(f"[{datetime.now()}] Handle single reply: DB collection 'generated_replies_collection' is None.")
413
- if not future.done():
414
- future.set_exception(HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database service not available for caching/storage."))
415
- return
416
-
417
- cache_query = {
418
- "original_email_text": request_data.email_text,
419
- "language": request_data.language,
420
- "length": request_data.length,
421
- "style": request_data.style,
422
- "tone": request_data.tone,
423
- "emoji": request_data.emoji,
424
- }
425
- print(f"[{datetime.now()}] Handle single reply: Checking cache for reply...")
426
- # Use await asyncio.to_thread for blocking MongoDB operations
427
- cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
428
-
429
- if cached_reply_doc:
430
- print(f"[{datetime.now()}] Handle single reply: Reply found in cache. ID: {str(cached_reply_doc['_id'])}")
431
- response = {
432
- "reply": cached_reply_doc["generated_reply_text"],
433
- "stored_id": str(cached_reply_doc["_id"]),
434
- "cached": True
435
- }
436
- if not future.done():
437
- future.set_result(response)
438
- print(f"[{datetime.now()}] Handle single reply: Cache result set on future.")
439
- return
440
-
441
- print(f"[{datetime.now()}] Handle single reply: Reply not in cache. Calling LLM...")
442
- reply_content = await asyncio.to_thread(
443
- _generate_response_internal,
444
- request_data.email_text,
445
- request_data.groq_api_key,
446
- request_data.language,
447
- request_data.length,
448
- request_data.style,
449
- request_data.tone,
450
- request_data.emoji
451
- )
452
- print(f"[{datetime.now()}] Handle single reply: LLM call completed. Reply length: {len(reply_content)}.")
453
-
454
- reply_data_to_store = GeneratedReplyData(
455
- original_email_text=request_data.email_text,
456
- generated_reply_text=reply_content,
457
- language=request_data.language,
458
- length=request_data.length,
459
- style=request_data.style,
460
- tone=request_data.tone,
461
- emoji=request_data.emoji
462
- )
463
- print(f"[{datetime.now()}] Handle single reply: Storing reply in DB...")
464
- # Use model_dump for Pydantic v2
465
- reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
466
-
467
- insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
468
- stored_id = str(insert_result.inserted_id)
469
- print(f"[{datetime.now()}] Handle single reply: Reply stored in DB. ID: {stored_id}")
470
-
471
- final_response = {
472
- "reply": reply_content,
473
- "stored_id": stored_id,
474
- "cached": False
475
- }
476
- if not future.done():
477
- future.set_result(final_response)
478
- print(f"[{datetime.now()}] Handle single reply: Final result set on future.")
479
-
480
- except Exception as e:
481
- print(f"[{datetime.now()}] Handle single reply: EXCEPTION: {e}")
482
- traceback.print_exc() # Print full traceback to logs
483
- if not future.done():
484
- # Set the exception on the future so the client can catch it
485
- future.set_exception(HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to generate reply: {e}"))
486
- print(f"[{datetime.now()}] Handle single reply: Exception set on future.")
487
-
488
-
489
- async def process_reply_batches():
490
- """Continuously processes requests from the reply_request_queue in batches."""
491
- global reply_request_queue
492
- print(f"[{datetime.now()}] Batch processor task started.")
493
- while True:
494
- batch_to_fire: List[Tuple[GenerateReplyRequest, asyncio.Future]] = []
495
- async with reply_queue_condition:
496
- if not reply_request_queue:
497
- print(f"[{datetime.now()}] Batch processor: Queue empty, waiting for requests...")
498
- # Wait for new requests or timeout
499
- await reply_queue_condition.wait()
500
- # After waking up, re-check if queue is still empty
501
- if not reply_request_queue:
502
- print(f"[{datetime.now()}] Batch processor: Woke up, queue still empty. Continuing loop.")
503
- continue
504
-
505
- now = asyncio.get_event_loop().time()
506
- # Safety check: ensure queue is not empty before accessing index 0
507
- if reply_request_queue:
508
- oldest_item_timestamp = reply_request_queue[0][2]
509
- else:
510
- # If queue became empty while waiting, loop again
511
- print(f"[{datetime.now()}] Batch processor: Queue became empty before processing. Restarting loop.")
512
- continue
513
-
514
- print(f"[{datetime.now()}] Batch processor: Woke up. Queue size: {len(reply_request_queue)}. Oldest item age: {now - oldest_item_timestamp:.2f}s")
515
-
516
- # Condition to trigger batch processing: queue is full OR timeout reached for oldest item
517
- if len(reply_request_queue) >= MAX_BATCH_SIZE or \
518
- (now - oldest_item_timestamp >= BATCH_TIMEOUT):
519
- num_to_take = min(len(reply_request_queue), MAX_BATCH_SIZE)
520
- for _ in range(num_to_take):
521
- # Safety check: ensure queue is not empty before popping
522
- if reply_request_queue:
523
- req, fut, _ = reply_request_queue.pop(0)
524
- batch_to_fire.append((req, fut))
525
- print(f"[{datetime.now()}] Batch processor: Firing batch of {len(batch_to_fire)} requests.")
526
- else:
527
- # Calculate time to wait for the next batch or timeout
528
- time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
529
- print(f"[{datetime.now()}] Batch processor: Not enough requests or timeout not reached. Waiting for {time_to_wait:.2f}s.")
530
- try:
531
- await asyncio.wait_for(reply_queue_condition.wait(), timeout=time_to_wait)
532
- except asyncio.TimeoutError:
533
- print(f"[{datetime.now()}] Batch processor: wait timed out.")
534
- pass # Loop will re-evaluate and likely fire the batch
535
-
536
- if batch_to_fire:
537
- tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
538
- print(f"[{datetime.now()}] Batch processor: Awaiting completion of {len(tasks)} single reply tasks.")
539
- await asyncio.gather(*tasks)
540
- print(f"[{datetime.now()}] Batch processor: Batch processing complete.")
541
- else:
542
- # Short sleep to prevent busy-waiting if queue is empty but not waiting
543
- await asyncio.sleep(0.001)
544
-
545
-
546
- # ---------------------- FastAPI Application ----------------------
547
  app = FastAPI(
548
  title="Email Assistant API",
549
- description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
550
  version="1.1.0",
551
  docs_url="/", # Sets Swagger UI to be the root path
552
  redoc_url="/redoc"
@@ -574,10 +422,10 @@ async def global_exception_handler_wrapper(request, exc):
574
  )
575
 
576
 
577
- # --- FastAPI Event Handlers for MongoDB & Batch Processor ---
578
  @app.on_event("startup")
579
  async def startup_event():
580
- global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
581
  print(f"[{datetime.now()}] FastAPI app startup sequence initiated.")
582
  try:
583
  # Connect to MongoDB
@@ -588,13 +436,6 @@ async def startup_event():
588
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
589
  print(f"[{datetime.now()}] Successfully connected to MongoDB: {DB_NAME}")
590
 
591
- # Start the batch processor task if not already running
592
- if batch_processor_task is None or batch_processor_task.done():
593
- batch_processor_task = asyncio.create_task(process_reply_batches())
594
- print(f"[{datetime.now()}] Batch processor task for replies started.")
595
- else:
596
- print(f"[{datetime.now()}] Batch processor task for replies is already running or being initialized.")
597
-
598
  except (ConnectionFailure, OperationFailure) as e:
599
  print(f"[{datetime.now()}] ERROR: MongoDB Connection/Operation Failure: {e}")
600
  client = None
@@ -602,7 +443,7 @@ async def startup_event():
602
  extracted_emails_collection = None
603
  generated_replies_collection = None
604
  except Exception as e:
605
- print(f"[{datetime.now()}] ERROR: An unexpected error occurred during MongoDB connection or batch startup: {e}")
606
  traceback.print_exc()
607
  client = None
608
  db = None
@@ -619,25 +460,13 @@ async def startup_event():
619
  print(f"[{datetime.now()}] MongoDB client or db object is None after connection attempt in startup. Database likely not connected.")
620
  if client is None or db is None:
621
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
622
- print(f"[{datetime.now()}] FastAPI app startup sequence completed for MongoDB client & Batch Processor initialization.")
623
 
624
 
625
  @app.on_event("shutdown")
626
  async def shutdown_event():
627
- global client, batch_processor_task
628
  print(f"[{datetime.now()}] FastAPI app shutting down.")
629
- if batch_processor_task:
630
- batch_processor_task.cancel()
631
- try:
632
- await batch_processor_task
633
- print(f"[{datetime.now()}] Batch processor task awaited.")
634
- except asyncio.CancelledError:
635
- print(f"[{datetime.now()}] Batch processor task for replies cancelled during shutdown.")
636
- except Exception as e:
637
- print(f"[{datetime.now()}] Error during batch processor task shutdown: {e}")
638
- traceback.print_exc()
639
- batch_processor_task = None
640
-
641
  if client:
642
  client.close()
643
  print(f"[{datetime.now()}] MongoDB client closed.")
@@ -647,7 +476,7 @@ async def shutdown_event():
647
  @app.get("/health", summary="Health Check")
648
  async def health_check():
649
  """
650
- Checks the health of the API, including MongoDB connection and batch processor status.
651
  """
652
  db_status = "MongoDB not connected."
653
  db_ok = False
@@ -661,206 +490,202 @@ async def health_check():
661
  db_status = f"MongoDB connection error: {e}"
662
  db_ok = False
663
 
664
- batch_processor_status = "Batch processor not running."
665
- if batch_processor_task is not None:
666
- if not batch_processor_task.done():
667
- batch_processor_status = "Batch processor is running."
668
- else:
669
- if batch_processor_task.exception():
670
- batch_processor_status = f"Batch processor task ended with exception: {batch_processor_task.exception()}"
671
- else:
672
- batch_processor_status = "Batch processor task is done (may have completed or cancelled)."
673
- else:
674
- batch_processor_status = "Batch processor task has not been initialized."
675
-
676
  if db_ok:
677
- return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
678
  else:
679
  raise HTTPException(
680
- status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
681
- detail={"message": "Service unavailable due to issues.", "database": db_status, "batch_processor": batch_processor_status}
682
  )
683
 
684
 
685
- @app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
686
- async def extract_email_data(request: ProcessEmailRequest):
687
  """
688
- Receives an email, extracts contacts, appointments, and tasks using an LLM,
689
- and stores the extracted data in MongoDB.
690
  """
691
- print(f"[{datetime.now()}] /extract-data: Received request.")
692
- if extracted_emails_collection is None:
693
- print(f"[{datetime.now()}] /extract-data: MongoDB collection is None.")
694
- raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for extracted email storage. Check server startup logs.")
695
  try:
696
- current_date_val = date.today()
697
- print(f"[{datetime.now()}] /extract-data: Calling internal processing function.")
698
- extracted_data = await asyncio.to_thread(
699
- _process_email_internal, request.email_text, request.groq_api_key, current_date_val
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
700
  )
701
- print(f"[{datetime.now()}] /extract-data: Internal processing complete. Preparing for DB insert.")
702
 
703
- extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
704
- # Convert date objects to datetime for MongoDB storage if they are just date objects
705
- # Pydantic's default `date` handling might serialize to ISO string, but for
706
- # internal MongoDB storage, sometimes `datetime` is preferred for consistency.
707
- if 'appointments' in extracted_data_dict:
708
- for appt in extracted_data_dict['appointments']:
709
- if isinstance(appt.get('start_date'), date):
710
- appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
711
- if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
712
- appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
713
- if 'tasks' in extracted_data_dict:
714
- for task_item in extracted_data_dict['tasks']:
715
- if isinstance(task_item.get('due_date'), date):
716
- task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
717
 
718
- print(f"[{datetime.now()}] /extract-data: Inserting into MongoDB...")
719
- result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
720
- print(f"[{datetime.now()}] /extract-data: Data inserted into MongoDB. ID: {result.inserted_id}")
721
 
722
- extracted_data.id = result.inserted_id
723
- return extracted_data
724
- except ValueError as e:
725
- print(f"[{datetime.now()}] /extract-data: ValueError: {e}")
726
- raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
 
 
 
727
  except Exception as e:
728
- print(f"[{datetime.now()}] /extract-data: Unhandled Exception: {e}")
729
  traceback.print_exc()
730
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal server error during data extraction: {e}")
731
-
732
 
733
- @app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
734
- async def extract_email_data_excel(request: ProcessEmailRequest):
735
- """
736
- Placeholder for future functionality to extract data and provide as an Excel download.
737
- Currently disabled.
738
- """
739
- raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Excel functionality is currently disabled.")
740
-
741
-
742
- @app.post("/generate-reply", response_model=GenerateReplyResponse, summary="Generate a smart reply to an email (batched & cached)")
743
- async def generate_email_reply(request: GenerateReplyRequest):
744
  """
745
- Generates an intelligent email reply based on specified parameters (language, length, style, tone, emoji).
746
- Uses a batch processing system with caching for efficiency.
747
  """
748
- print(f"[{datetime.now()}] /generate-reply: Received request.")
749
- if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
750
- print(f"[{datetime.now()}] /generate-reply: Service not initialized. gen_replies_coll={generated_replies_collection is not None}, batch_task={batch_processor_task is not None}, queue_cond={reply_queue_condition is not None}")
751
- raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Reply generation service not fully initialized. Check server logs for database or batch processor issues.")
752
-
753
- future = asyncio.Future()
754
- current_time = asyncio.get_event_loop().time()
755
-
756
- async with reply_queue_condition:
757
- reply_request_queue.append((request, future, current_time))
758
- reply_queue_condition.notify() # Notify the batch processor that a new request is available
759
- print(f"[{datetime.now()}] /generate-reply: Request added to queue, notifying batch processor. Queue size: {len(reply_request_queue)}")
760
 
 
761
  try:
762
- # Debugging: Increase timeout significantly to allow full tracing in logs
763
- client_timeout = BATCH_TIMEOUT + 60.0 # Example: 0.5s batch + 60s LLM response buffer = 60.5s total timeout
764
- print(f"[{datetime.now()}] /generate-reply: Waiting for future result with timeout {client_timeout}s.")
765
- result = await asyncio.wait_for(future, timeout=client_timeout)
766
- print(f"[{datetime.now()}] /generate-reply: Future result received. Returning data.")
767
- return result
768
- except asyncio.TimeoutError:
769
- print(f"[{datetime.now()}] /generate-reply: Client timeout waiting for future after {client_timeout}s. Future done: {future.done()}")
770
- if not future.done():
771
- future.cancel() # Cancel if it's still pending
772
- raise HTTPException(status_code=status.HTTP_504_GATEWAY_TIMEOUT, detail=f"Request timed out after {client_timeout}s waiting for batch processing. The LLM might be busy or the request queue too long. Check server logs for more details.")
 
 
 
 
 
 
 
 
 
773
  except Exception as e:
774
- if isinstance(e, HTTPException):
775
- print(f"[{datetime.now()}] /generate-reply: Caught HTTPException: {e.status_code} - {e.detail}")
776
- raise e # Re-raise FastAPI HTTPExceptions
777
- print(f"[{datetime.now()}] /generate-reply: Unhandled Exception: {e}")
778
- traceback.print_exc()
779
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error processing your reply request: {str(e)}. Check server logs for more details.")
780
 
781
 
782
- @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
783
- async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
784
- print(f"[{datetime.now()}] /query-extracted-emails: Received request with params: {query_params.model_dump_json()}")
 
 
785
  if extracted_emails_collection is None:
786
- print(f"[{datetime.now()}] /query-extracted-emails: MongoDB collection is None.")
787
- raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying extracted emails.")
788
- mongo_query: Dict[str, Any] = {}
789
  if query_params.contact_name:
790
- mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"} # Case-insensitive regex
 
 
 
 
791
  if query_params.appointment_title:
792
  mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
793
  if query_params.task_title:
794
  mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
795
 
796
- if query_params.from_date or query_params.to_date:
797
- date_query: Dict[str, datetime] = {}
798
- if query_params.from_date:
799
- date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
800
- if query_params.to_date:
801
- # Query up to the end of the 'to_date' day
802
- date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
803
- if date_query :
804
- mongo_query["processed_at"] = date_query
805
- print(f"[{datetime.now()}] /query-extracted-emails: MongoDB query built: {mongo_query}")
806
 
807
  try:
808
- # Use await asyncio.to_thread for blocking MongoDB operations
809
- cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
810
- extracted_docs_raw = await asyncio.to_thread(list, cursor)
811
- print(f"[{datetime.now()}] /query-extracted-emails: Found {len(extracted_docs_raw)} documents.")
812
-
813
- results = []
814
- for doc_raw in extracted_docs_raw:
815
- # Convert datetime objects back to date for Pydantic model validation if necessary
816
- if 'appointments' in doc_raw:
817
- for appt in doc_raw['appointments']:
818
- if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
819
- if isinstance(appt.get('end_date'), datetime): appt['end_date'] = appt['end_date'].date()
820
- if 'tasks' in doc_raw:
821
- for task_item in doc_raw['tasks']:
822
- if isinstance(task_item.get('due_date'), datetime): task_item['due_date'] = task_item['due_date'].date()
823
- results.append(ExtractedData(**doc_raw))
824
- print(f"[{datetime.now()}] /query-extracted-emails: Returning {len(results)} results.")
825
- return results
826
  except Exception as e:
827
- print(f"[{datetime.now()}] /query-extracted-emails: Unhandled Exception during query: {e}")
828
  traceback.print_exc()
829
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying extracted emails: {e}")
830
 
831
 
832
- @app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
833
- async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
834
- print(f"[{datetime.now()}] /query-generated-replies: Received request with params: {query_params.model_dump_json()}")
 
 
835
  if generated_replies_collection is None:
836
- print(f"[{datetime.now()}] /query-generated-replies: MongoDB collection is None.")
837
- raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying generated replies.")
838
- mongo_query: Dict[str, Any] = {}
839
- if query_params.language: mongo_query["language"] = query_params.language
840
- if query_params.style: mongo_query["style"] = query_params.style
841
- if query_params.tone: mongo_query["tone"] = query_params.tone
842
-
843
- if query_params.from_date or query_params.to_date:
844
- date_query: Dict[str, datetime] = {}
845
- if query_params.from_date:
846
- date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
847
- if query_params.to_date:
848
- date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
849
- if date_query:
850
- mongo_query["generated_at"] = date_query
851
- print(f"[{datetime.now()}] /query-generated-replies: MongoDB query built: {mongo_query}")
 
 
852
 
853
  try:
854
- # Use await asyncio.to_thread for blocking MongoDB operations
855
- cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
856
- generated_docs_raw = await asyncio.to_thread(list, cursor)
857
- print(f"[{datetime.now()}] /query-generated-replies: Found {len(generated_docs_raw)} documents.")
858
- results = []
859
- for doc_raw in generated_docs_raw:
860
- results.append(GeneratedReplyData(**doc_raw))
861
- print(f"[{datetime.now()}] /query-generated-replies: Returning {len(results)} results.")
862
- return results
863
  except Exception as e:
864
- print(f"[{datetime.now()}] /query-generated-replies: Unhandled Exception during query: {e}")
865
  traceback.print_exc()
866
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying generated replies: {e}")
 
301
  Here is the required JSON schema for each category:
302
 
303
  - **contacts**: List of Contact objects.
304
+ Each Contact object must have:
305
  - `name` (string, full name)
306
  - `last_name` (string, last name) - You should infer this from the full name.
307
  - `email` (string, optional, null if not present)
308
  - `phone_number` (string, optional, null if not present)
309
 
310
  - **appointments**: List of Appointment objects.
311
+ Each Appointment object must have:
312
  - `title` (string, short, meaningful title in Italian based on the meeting's purpose)
313
  - `description` (string, summary of the meeting's goal)
314
  - `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow")
 
317
  - `end_time` (string, optional, e.g., "11:00 AM", null if not present)
318
 
319
  - **tasks**: List of Task objects.
320
+ Each Task object must have:
321
  - `task_title` (string, short summary of action item)
322
  - `task_description` (string, more detailed explanation)
323
  - `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")
 
390
  traceback.print_exc() # Print full traceback to logs
391
  raise # Re-raise the exception so it can be caught by handle_single_reply_request
392
 
 
 
 
393
 
394
+ # --- FastAPI Application ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
395
  app = FastAPI(
396
  title="Email Assistant API",
397
+ description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, and caching.",
398
  version="1.1.0",
399
  docs_url="/", # Sets Swagger UI to be the root path
400
  redoc_url="/redoc"
 
422
  )
423
 
424
 
425
+ # --- FastAPI Event Handlers for MongoDB ---
426
  @app.on_event("startup")
427
  async def startup_event():
428
+ global client, db, extracted_emails_collection, generated_replies_collection
429
  print(f"[{datetime.now()}] FastAPI app startup sequence initiated.")
430
  try:
431
  # Connect to MongoDB
 
436
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
437
  print(f"[{datetime.now()}] Successfully connected to MongoDB: {DB_NAME}")
438
 
 
 
 
 
 
 
 
439
  except (ConnectionFailure, OperationFailure) as e:
440
  print(f"[{datetime.now()}] ERROR: MongoDB Connection/Operation Failure: {e}")
441
  client = None
 
443
  extracted_emails_collection = None
444
  generated_replies_collection = None
445
  except Exception as e:
446
+ print(f"[{datetime.now()}] ERROR: An unexpected error occurred during MongoDB connection startup: {e}")
447
  traceback.print_exc()
448
  client = None
449
  db = None
 
460
  print(f"[{datetime.now()}] MongoDB client or db object is None after connection attempt in startup. Database likely not connected.")
461
  if client is None or db is None:
462
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
463
+ print(f"[{datetime.now()}] FastAPI app startup sequence completed for MongoDB client initialization.")
464
 
465
 
466
  @app.on_event("shutdown")
467
  async def shutdown_event():
468
+ global client
469
  print(f"[{datetime.now()}] FastAPI app shutting down.")
 
 
 
 
 
 
 
 
 
 
 
 
470
  if client:
471
  client.close()
472
  print(f"[{datetime.now()}] MongoDB client closed.")
 
476
  @app.get("/health", summary="Health Check")
477
  async def health_check():
478
  """
479
+ Checks the health of the API, including MongoDB connection.
480
  """
481
  db_status = "MongoDB not connected."
482
  db_ok = False
 
490
  db_status = f"MongoDB connection error: {e}"
491
  db_ok = False
492
 
 
 
 
 
 
 
 
 
 
 
 
 
493
  if db_ok:
494
+ return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status}
495
  else:
496
  raise HTTPException(
497
+ status_code=503,
498
+ detail={"message": "Service unavailable.", "database": db_status}
499
  )
500
 
501
 
502
+ @app.post("/generate-reply", response_model=GenerateReplyResponse, summary="Generate a smart reply to an email")
503
async def generate_email_reply(request: GenerateReplyRequest):
    """
    Generates a smart reply to the provided email text using an LLM.

    Identical requests (same email text and the same generation options) are
    served straight from the MongoDB cache; freshly generated replies are
    persisted so later identical requests can reuse them.
    """
    if generated_replies_collection is None:
        raise HTTPException(status_code=503, detail="MongoDB not available for generated_replies.")

    try:
        # A cached reply is reusable only when every generation knob matches.
        lookup_filter = dict(
            original_email_text=request.email_text,
            language=request.language,
            length=request.length,
            style=request.style,
            tone=request.tone,
            emoji=request.emoji,
        )
        print(f"[{datetime.now()}] /generate-reply: Checking cache for reply...")
        # find_one is a blocking driver call, so run it off the event loop.
        hit = await asyncio.to_thread(generated_replies_collection.find_one, lookup_filter)

        if hit:
            hit_id = str(hit["_id"])
            print(f"[{datetime.now()}] /generate-reply: Reply found in cache. ID: {hit_id}")
            return GenerateReplyResponse(
                reply=hit["generated_reply_text"],
                stored_id=hit_id,
                cached=True,
            )

        # Cache miss: synthesize a new reply via the blocking LLM helper.
        print(f"[{datetime.now()}] /generate-reply: Reply not in cache. Calling LLM for generation...")
        reply_content = await asyncio.to_thread(
            _generate_response_internal,
            request.email_text,
            request.groq_api_key,
            request.language,
            request.length,
            request.style,
            request.tone,
            request.emoji,
        )
        print(f"[{datetime.now()}] /generate-reply: LLM call completed. Storing newly generated reply in MongoDB.")

        # Persist the new reply. MongoDB assigns the _id itself, so our
        # model's 'id' field is excluded from the stored document.
        record = GeneratedReplyData(
            original_email_text=request.email_text,
            generated_reply_text=reply_content,
            language=request.language,
            length=request.length,
            style=request.style,
            tone=request.tone,
            emoji=request.emoji,
        ).model_dump(by_alias=True, exclude_none=True, exclude={'id'})

        insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, record)
        stored_id = str(insert_result.inserted_id)
        print(f"[{datetime.now()}] /generate-reply: Reply stored in MongoDB. ID: {stored_id}")

        return GenerateReplyResponse(
            reply=reply_content,
            stored_id=stored_id,
            cached=False,  # just generated, by definition not a cache hit
        )
    except Exception as e:
        traceback.print_exc()
        # Ensure consistent error response
        raise HTTPException(status_code=500, detail=f"Error generating or storing reply: {str(e)}")
@app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email")
async def extract_email_data(request: ProcessEmailRequest):
    """
    Extracts contacts, appointments, and tasks from the provided email text.

    The extraction result is persisted to MongoDB and returned to the caller
    with the newly assigned document id filled in.
    """
    if extracted_emails_collection is None:
        raise HTTPException(status_code=503, detail="MongoDB not available.")

    # Today's date anchors relative expressions in the email ("tomorrow", ...).
    current_date = date.today()

    print(f"[{datetime.now()}] /extract-data: Received request.")
    try:
        print(f"[{datetime.now()}] /extract-data: Calling internal processing function.")
        # The LLM call blocks, so keep it off the event loop.
        extracted_data = await asyncio.to_thread(
            _process_email_internal, request.email_text, request.groq_api_key, current_date
        )

        print(f"[{datetime.now()}] /extract-data: Internal processing complete. Preparing for DB insert.")
        # MongoDB generates the _id itself; exclude the model's 'id' field
        # from the document being stored (Pydantic v2 model_dump).
        document = extracted_data.model_dump(by_alias=True, exclude_none=True, exclude={'id'})

        print(f"[{datetime.now()}] /extract-data: Inserting into MongoDB...")
        # Blocking driver call -> worker thread.
        insert_result = await asyncio.to_thread(extracted_emails_collection.insert_one, document)

        # Echo the stored id back on the response model.
        extracted_data.id = str(insert_result.inserted_id)
        print(f"[{datetime.now()}] /extract-data: Data inserted into MongoDB. ID: {extracted_data.id}")

        return extracted_data
    except ValueError as ve:
        # Input/validation problems surface as client errors.
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        traceback.print_exc()  # Print full traceback for debugging
        raise HTTPException(status_code=500, detail=f"An internal server error occurred: {e}")
@app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query stored extracted email data")
async def query_extracted_emails(query_params: ExtractedEmailQuery = Depends()):
    """
    Queries extracted email data from MongoDB based on various filters.

    Supports case-insensitive partial matching on contact name/last name,
    appointment title, and task title, plus an inclusive whole-day range on
    `processed_at`. Results are capped at `query_params.limit`.
    """
    if extracted_emails_collection is None:
        raise HTTPException(status_code=503, detail="MongoDB not available.")

    mongo_query = {}
    if query_params.contact_name:
        # Case-insensitive partial match on contact name or last name
        mongo_query["$or"] = [
            {"contacts.name": {"$regex": query_params.contact_name, "$options": "i"}},
            {"contacts.last_name": {"$regex": query_params.contact_name, "$options": "i"}}
        ]
    if query_params.appointment_title:
        mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
    if query_params.task_title:
        mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}

    # Date range filtering for processed_at (expanded to whole days).
    date_query = {}
    if query_params.from_date:
        date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
    if query_params.to_date:
        date_query["$lte"] = datetime.combine(query_params.to_date, datetime.max.time())
    if date_query:
        mongo_query["processed_at"] = date_query

    try:
        # PyMongo cursors are lazy: find() itself performs no I/O; the query
        # only executes while the cursor is iterated. Build the cursor, apply
        # the limit, and materialize the list in ONE worker thread so all the
        # actual blocking I/O stays off the event loop. (Previously the no-op
        # find() call was wrapped in to_thread while the blocking iteration
        # ran in a separate call.)
        results = await asyncio.to_thread(
            lambda: list(extracted_emails_collection.find(mongo_query).limit(query_params.limit))
        )

        # Re-hydrate raw documents into the response model.
        return [ExtractedData(**doc) for doc in results]
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Error querying extracted emails: {e}")
@app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query stored generated replies")
async def query_generated_replies(query_params: GeneratedReplyQuery = Depends()):
    """
    Queries generated email replies from MongoDB based on various filters.

    Exact-match filters on language/style/tone, plus an inclusive whole-day
    range on `generated_at`. Results are capped at `query_params.limit`.
    """
    if generated_replies_collection is None:
        raise HTTPException(status_code=503, detail="MongoDB not available.")

    mongo_query = {}
    if query_params.language:
        mongo_query["language"] = query_params.language
    if query_params.style:
        mongo_query["style"] = query_params.style
    if query_params.tone:
        mongo_query["tone"] = query_params.tone

    # Date range filtering for generated_at (expanded to whole days).
    date_query = {}
    if query_params.from_date:
        date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
    if query_params.to_date:
        date_query["$lte"] = datetime.combine(query_params.to_date, datetime.max.time())
    if date_query:
        mongo_query["generated_at"] = date_query

    try:
        # PyMongo cursors are lazy: find() itself performs no I/O; the query
        # only executes while the cursor is iterated. Build the cursor, apply
        # the limit, and materialize the list in ONE worker thread so all the
        # actual blocking I/O stays off the event loop. (Previously the no-op
        # find() call was wrapped in to_thread while the blocking iteration
        # ran in a separate call.)
        results = await asyncio.to_thread(
            lambda: list(generated_replies_collection.find(mongo_query).limit(query_params.limit))
        )

        # Re-hydrate raw documents into the response model.
        return [GeneratedReplyData(**doc) for doc in results]
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")