precison9 commited on
Commit
0f1b423
·
verified ·
1 Parent(s): 1a0e62f

Update flask_Character.py

Browse files
Files changed (1) hide show
  1. flask_Character.py +24 -41
flask_Character.py CHANGED
@@ -60,8 +60,8 @@ class CustomObjectId(str):
60
  # This validator is only called if the field is not None
61
  # Pydantic's Optional[PyObjectId] handles the None case before this validator
62
  if v is None or v == "":
63
- return None # Should not be reached if Optional[PyObjectId] is used correctly
64
-
65
  if not isinstance(v, (str, ObjectId)):
66
  raise ValueError("ObjectId must be a string or ObjectId instance")
67
 
@@ -180,6 +180,12 @@ class GeneratedReplyData(BaseModel):
180
  data["_id"] = str(data["_id"])
181
  return data
182
 
 
 
 
 
 
 
183
  # --- Query Models for GET Endpoints ---
184
  class ExtractedEmailQuery(BaseModel):
185
  contact_name: Optional[str] = Query(None, description="Filter by contact name (case-insensitive partial match).")
@@ -391,7 +397,10 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
391
  return
392
  try:
393
  if generated_replies_collection is None:
394
- raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database service not available for caching/storage.")
 
 
 
395
 
396
  cache_query = {
397
  "original_email_text": request_data.email_text,
@@ -469,15 +478,22 @@ async def process_reply_batches():
469
  continue
470
 
471
  now = asyncio.get_event_loop().time()
472
- oldest_item_timestamp = reply_request_queue[0][2]
 
 
 
 
 
473
 
474
  # Condition to trigger batch processing: queue is full OR timeout reached for oldest item
475
  if len(reply_request_queue) >= MAX_BATCH_SIZE or \
476
  (now - oldest_item_timestamp >= BATCH_TIMEOUT):
477
  num_to_take = min(len(reply_request_queue), MAX_BATCH_SIZE)
478
  for _ in range(num_to_take):
479
- req, fut, _ = reply_request_queue.pop(0)
480
- batch_to_fire.append((req, fut))
 
 
481
  else:
482
  # Calculate time to wait for the next batch or timeout
483
  time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
@@ -492,7 +508,6 @@ async def process_reply_batches():
492
  await asyncio.gather(*tasks)
493
  else:
494
  # Short sleep to prevent busy-waiting if queue is empty but not waiting
495
- # (e.g., if a notify happened just before the wait, but queue was already processed)
496
  await asyncio.sleep(0.001)
497
 
498
 
@@ -541,7 +556,6 @@ async def startup_event():
541
  print(f"Successfully connected to MongoDB: {DB_NAME}")
542
 
543
  # Start the batch processor task if not already running
544
- # Using asyncio.create_task is the correct way to run background tasks in FastAPI
545
  if batch_processor_task is None or batch_processor_task.done():
546
  batch_processor_task = asyncio.create_task(process_reply_batches())
547
  print("Batch processor task for replies started.")
@@ -550,7 +564,6 @@ async def startup_event():
550
 
551
  except (ConnectionFailure, OperationFailure) as e:
552
  print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
553
- # Ensure all DB related globals are reset to None if connection fails
554
  client = None
555
  db = None
556
  extracted_emails_collection = None
@@ -563,17 +576,14 @@ async def startup_event():
563
  extracted_emails_collection = None
564
  generated_replies_collection = None
565
  finally:
566
- # Final check and logging for MongoDB connection status
567
  if client is not None and db is not None:
568
  try:
569
- # One last ping to confirm connection before app fully starts
570
  client.admin.command('ping')
571
  except Exception as e:
572
  print(f"MongoDB ping failed after initial connection attempt during finally block: {e}")
573
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
574
  else:
575
  print("MongoDB client or db object is None after connection attempt in startup. Database likely not connected.")
576
- # Ensure all are None if one is, to avoid partial state
577
  if client is None or db is None:
578
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
579
  print("FastAPI app startup sequence completed for MongoDB client & Batch Processor initialization.")
@@ -583,11 +593,9 @@ async def startup_event():
583
  async def shutdown_event():
584
  global client, batch_processor_task
585
  print("FastAPI app shutting down.")
586
- # Cancel the batch processor task
587
  if batch_processor_task:
588
  batch_processor_task.cancel()
589
  try:
590
- # Await the task to ensure it has a chance to clean up/handle cancellation
591
  await batch_processor_task
592
  except asyncio.CancelledError:
593
  print("Batch processor task for replies cancelled during shutdown.")
@@ -596,7 +604,6 @@ async def shutdown_event():
596
  traceback.print_exc()
597
  batch_processor_task = None
598
 
599
- # Close MongoDB client connection
600
  if client:
601
  client.close()
602
  print("MongoDB client closed.")
@@ -612,21 +619,18 @@ async def health_check():
612
  db_ok = False
613
  if client is not None and db is not None:
614
  try:
615
- # Attempt a simple database operation to confirm connectivity
616
- # For async functions, ensure you use await asyncio.to_thread for blocking MongoDB operations
617
  await asyncio.to_thread(db.list_collection_names)
618
  db_status = "MongoDB connection OK."
619
  db_ok = True
620
  except Exception as e:
621
  db_status = f"MongoDB connection error: {e}"
622
- db_ok = False # Explicitly set to False on error
623
 
624
  batch_processor_status = "Batch processor not running."
625
  if batch_processor_task is not None:
626
  if not batch_processor_task.done():
627
  batch_processor_status = "Batch processor is running."
628
  else:
629
- # Check if it finished with an exception
630
  if batch_processor_task.exception():
631
  batch_processor_status = f"Batch processor task ended with exception: {batch_processor_task.exception()}"
632
  else:
@@ -637,7 +641,6 @@ async def health_check():
637
  if db_ok:
638
  return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
639
  else:
640
- # If DB is not OK, return a 503 Service Unavailable
641
  raise HTTPException(
642
  status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
643
  detail={"message": "Service unavailable due to issues.", "database": db_status, "batch_processor": batch_processor_status}
@@ -654,16 +657,13 @@ async def extract_email_data(request: ProcessEmailRequest):
654
  raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for extracted email storage. Check server startup logs.")
655
  try:
656
  current_date_val = date.today()
657
- # Call the internal processing function in a separate thread to not block the event loop
658
  extracted_data = await asyncio.to_thread(
659
  _process_email_internal, request.email_text, request.groq_api_key, current_date_val
660
  )
661
 
662
- # Prepare data for MongoDB insertion: convert date objects to datetime for storage
663
  extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
664
  if 'appointments' in extracted_data_dict:
665
  for appt in extracted_data_dict['appointments']:
666
- # MongoDB stores dates as datetime.datetime, so convert
667
  if isinstance(appt.get('start_date'), date):
668
  appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
669
  if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
@@ -673,11 +673,8 @@ async def extract_email_data(request: ProcessEmailRequest):
673
  if isinstance(task_item.get('due_date'), date):
674
  task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
675
 
676
- # Insert into MongoDB
677
  result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
678
 
679
- # Update the Pydantic model's ID with the generated MongoDB ObjectId for the response
680
- # PyObjectId's __get_validators__ should handle this conversion from ObjectId to str
681
  extracted_data.id = result.inserted_id
682
  return extracted_data
683
  except ValueError as e:
@@ -696,7 +693,7 @@ async def extract_email_data_excel(request: ProcessEmailRequest):
696
  raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Excel functionality is currently disabled.")
697
 
698
 
699
- @app.post("/generate-reply", summary="Generate a smart reply to an email (batched & cached)")
700
  async def generate_email_reply(request: GenerateReplyRequest):
701
  """
702
  Generates an intelligent email reply based on specified parameters (language, length, style, tone, emoji).
@@ -713,18 +710,14 @@ async def generate_email_reply(request: GenerateReplyRequest):
713
  reply_queue_condition.notify() # Notify the batch processor that a new request is available
714
 
715
  try:
716
- # Give a reasonable timeout for the client to wait for a reply
717
- # This timeout should be greater than BATCH_TIMEOUT
718
  client_timeout = BATCH_TIMEOUT + 10.0 # e.g., 0.5s batch + 10s LLM response buffer
719
  result = await asyncio.wait_for(future, timeout=client_timeout)
720
  return result
721
  except asyncio.TimeoutError:
722
- # If the client times out, ensure the future is cancelled if not already done
723
  if not future.done():
724
  future.cancel()
725
  raise HTTPException(status_code=status.HTTP_504_GATEWAY_TIMEOUT, detail=f"Request timed out after {client_timeout}s waiting for batch processing. The LLM might be busy or the request queue too long.")
726
  except Exception as e:
727
- # Re-raise HTTPException if it's already one, otherwise wrap in 500
728
  if isinstance(e, HTTPException):
729
  raise e
730
  traceback.print_exc()
@@ -759,12 +752,6 @@ async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = De
759
 
760
  results = []
761
  for doc_raw in extracted_docs_raw:
762
- # Convert _id to string for Pydantic model if it's an ObjectId
763
- # PyObjectId type hint handles this on model parsing
764
- # if isinstance(doc_raw.get("_id"), ObjectId):
765
- # doc_raw["_id"] = str(doc_raw["_id"])
766
-
767
- # Convert datetime objects from MongoDB back to date objects for Pydantic model fields that are `date`
768
  if 'appointments' in doc_raw:
769
  for appt in doc_raw['appointments']:
770
  if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
@@ -793,7 +780,6 @@ async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = D
793
  if query_params.from_date:
794
  date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
795
  if query_params.to_date:
796
- # Query up to the end of the 'to_date' day
797
  date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
798
  if date_query:
799
  mongo_query["generated_at"] = date_query
@@ -803,9 +789,6 @@ async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = D
803
  generated_docs_raw = await asyncio.to_thread(list, cursor)
804
  results = []
805
  for doc_raw in generated_docs_raw:
806
- # PyObjectId type hint handles this on model parsing
807
- # if isinstance(doc_raw.get("_id"), ObjectId):
808
- # doc_raw["_id"] = str(doc_raw["_id"])
809
  results.append(GeneratedReplyData(**doc_raw))
810
  return results
811
  except Exception as e:
 
60
  # This validator is only called if the field is not None
61
  # Pydantic's Optional[PyObjectId] handles the None case before this validator
62
  if v is None or v == "":
63
+ return None
64
+
65
  if not isinstance(v, (str, ObjectId)):
66
  raise ValueError("ObjectId must be a string or ObjectId instance")
67
 
 
180
  data["_id"] = str(data["_id"])
181
  return data
182
 
183
+ # NEW: Response Model for /generate-reply endpoint
184
+ class GenerateReplyResponse(BaseModel):
185
+ reply: str = Field(..., description="The AI-generated reply text.")
186
+ stored_id: str = Field(..., description="The MongoDB ID of the stored reply.")
187
+ cached: bool = Field(..., description="True if the reply was retrieved from cache, False if newly generated.")
188
+
189
  # --- Query Models for GET Endpoints ---
190
  class ExtractedEmailQuery(BaseModel):
191
  contact_name: Optional[str] = Query(None, description="Filter by contact name (case-insensitive partial match).")
 
397
  return
398
  try:
399
  if generated_replies_collection is None:
400
+ # If DB is not available, set a specific exception on the future
401
+ if not future.done():
402
+ future.set_exception(HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database service not available for caching/storage."))
403
+ return
404
 
405
  cache_query = {
406
  "original_email_text": request_data.email_text,
 
478
  continue
479
 
480
  now = asyncio.get_event_loop().time()
481
+ # Safety check: ensure queue is not empty before accessing index 0
482
+ if reply_request_queue:
483
+ oldest_item_timestamp = reply_request_queue[0][2]
484
+ else:
485
+ # If queue became empty while waiting, loop again
486
+ continue
487
 
488
  # Condition to trigger batch processing: queue is full OR timeout reached for oldest item
489
  if len(reply_request_queue) >= MAX_BATCH_SIZE or \
490
  (now - oldest_item_timestamp >= BATCH_TIMEOUT):
491
  num_to_take = min(len(reply_request_queue), MAX_BATCH_SIZE)
492
  for _ in range(num_to_take):
493
+ # Safety check: ensure queue is not empty before popping
494
+ if reply_request_queue:
495
+ req, fut, _ = reply_request_queue.pop(0)
496
+ batch_to_fire.append((req, fut))
497
  else:
498
  # Calculate time to wait for the next batch or timeout
499
  time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
 
508
  await asyncio.gather(*tasks)
509
  else:
510
  # Short sleep to prevent busy-waiting if queue is empty but not waiting
 
511
  await asyncio.sleep(0.001)
512
 
513
 
 
556
  print(f"Successfully connected to MongoDB: {DB_NAME}")
557
 
558
  # Start the batch processor task if not already running
 
559
  if batch_processor_task is None or batch_processor_task.done():
560
  batch_processor_task = asyncio.create_task(process_reply_batches())
561
  print("Batch processor task for replies started.")
 
564
 
565
  except (ConnectionFailure, OperationFailure) as e:
566
  print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
 
567
  client = None
568
  db = None
569
  extracted_emails_collection = None
 
576
  extracted_emails_collection = None
577
  generated_replies_collection = None
578
  finally:
 
579
  if client is not None and db is not None:
580
  try:
 
581
  client.admin.command('ping')
582
  except Exception as e:
583
  print(f"MongoDB ping failed after initial connection attempt during finally block: {e}")
584
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
585
  else:
586
  print("MongoDB client or db object is None after connection attempt in startup. Database likely not connected.")
 
587
  if client is None or db is None:
588
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
589
  print("FastAPI app startup sequence completed for MongoDB client & Batch Processor initialization.")
 
593
  async def shutdown_event():
594
  global client, batch_processor_task
595
  print("FastAPI app shutting down.")
 
596
  if batch_processor_task:
597
  batch_processor_task.cancel()
598
  try:
 
599
  await batch_processor_task
600
  except asyncio.CancelledError:
601
  print("Batch processor task for replies cancelled during shutdown.")
 
604
  traceback.print_exc()
605
  batch_processor_task = None
606
 
 
607
  if client:
608
  client.close()
609
  print("MongoDB client closed.")
 
619
  db_ok = False
620
  if client is not None and db is not None:
621
  try:
 
 
622
  await asyncio.to_thread(db.list_collection_names)
623
  db_status = "MongoDB connection OK."
624
  db_ok = True
625
  except Exception as e:
626
  db_status = f"MongoDB connection error: {e}"
627
+ db_ok = False
628
 
629
  batch_processor_status = "Batch processor not running."
630
  if batch_processor_task is not None:
631
  if not batch_processor_task.done():
632
  batch_processor_status = "Batch processor is running."
633
  else:
 
634
  if batch_processor_task.exception():
635
  batch_processor_status = f"Batch processor task ended with exception: {batch_processor_task.exception()}"
636
  else:
 
641
  if db_ok:
642
  return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
643
  else:
 
644
  raise HTTPException(
645
  status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
646
  detail={"message": "Service unavailable due to issues.", "database": db_status, "batch_processor": batch_processor_status}
 
657
  raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for extracted email storage. Check server startup logs.")
658
  try:
659
  current_date_val = date.today()
 
660
  extracted_data = await asyncio.to_thread(
661
  _process_email_internal, request.email_text, request.groq_api_key, current_date_val
662
  )
663
 
 
664
  extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
665
  if 'appointments' in extracted_data_dict:
666
  for appt in extracted_data_dict['appointments']:
 
667
  if isinstance(appt.get('start_date'), date):
668
  appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
669
  if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
 
673
  if isinstance(task_item.get('due_date'), date):
674
  task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
675
 
 
676
  result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
677
 
 
 
678
  extracted_data.id = result.inserted_id
679
  return extracted_data
680
  except ValueError as e:
 
693
  raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Excel functionality is currently disabled.")
694
 
695
 
696
+ @app.post("/generate-reply", response_model=GenerateReplyResponse, summary="Generate a smart reply to an email (batched & cached)")
697
  async def generate_email_reply(request: GenerateReplyRequest):
698
  """
699
  Generates an intelligent email reply based on specified parameters (language, length, style, tone, emoji).
 
710
  reply_queue_condition.notify() # Notify the batch processor that a new request is available
711
 
712
  try:
 
 
713
  client_timeout = BATCH_TIMEOUT + 10.0 # e.g., 0.5s batch + 10s LLM response buffer
714
  result = await asyncio.wait_for(future, timeout=client_timeout)
715
  return result
716
  except asyncio.TimeoutError:
 
717
  if not future.done():
718
  future.cancel()
719
  raise HTTPException(status_code=status.HTTP_504_GATEWAY_TIMEOUT, detail=f"Request timed out after {client_timeout}s waiting for batch processing. The LLM might be busy or the request queue too long.")
720
  except Exception as e:
 
721
  if isinstance(e, HTTPException):
722
  raise e
723
  traceback.print_exc()
 
752
 
753
  results = []
754
  for doc_raw in extracted_docs_raw:
 
 
 
 
 
 
755
  if 'appointments' in doc_raw:
756
  for appt in doc_raw['appointments']:
757
  if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
 
780
  if query_params.from_date:
781
  date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
782
  if query_params.to_date:
 
783
  date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
784
  if date_query:
785
  mongo_query["generated_at"] = date_query
 
789
  generated_docs_raw = await asyncio.to_thread(list, cursor)
790
  results = []
791
  for doc_raw in generated_docs_raw:
 
 
 
792
  results.append(GeneratedReplyData(**doc_raw))
793
  return results
794
  except Exception as e: