precison9 commited on
Commit
33599e3
·
verified ·
1 Parent(s): 0f1b423

Update flask_Character.py

Browse files
Files changed (1) hide show
  1. flask_Character.py +112 -42
flask_Character.py CHANGED
@@ -354,31 +354,41 @@ def _generate_response_internal(
354
  """
355
  Internal function to generate a reply to an email using LLM.
356
  """
 
357
  if not email_text:
 
358
  return "Cannot generate reply for empty email text."
359
- llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
360
- prompt_template_str="""
361
- You are an assistant that helps reply to emails.
 
 
362
 
363
- Create a response to the following email with the following parameters:
364
- - Language: {language}
365
- - Length: {length}
366
- - Style: {style}
367
- - Tone: {tone}
368
- - Emoji usage: {emoji}
369
 
370
- Email:
371
- {email}
372
 
373
- Write only the reply body. Do not repeat the email or mention any instruction.
374
- """
375
- prompt = PromptTemplate(
376
- input_variables=["email", "language", "length", "style", "tone", "emoji"],
377
- template=prompt_template_str
378
- )
379
- chain = prompt | llm
380
- output = chain.invoke({"email": email_text, "language": language, "length": length, "style": style, "tone": tone, "emoji": emoji})
381
- return output.content.strip()
 
 
 
 
 
 
382
 
383
  # --- Batching and Caching Configuration ---
384
  MAX_BATCH_SIZE = 20
@@ -393,11 +403,13 @@ batch_processor_task: Optional[asyncio.Task] = None
393
  # --- Batch Processor and Handler ---
394
  async def handle_single_reply_request(request_data: GenerateReplyRequest, future: asyncio.Future):
395
  """Handles a single request: checks cache, calls LLM, stores result, and sets future."""
 
396
  if future.cancelled():
 
397
  return
398
  try:
399
  if generated_replies_collection is None:
400
- # If DB is not available, set a specific exception on the future
401
  if not future.done():
402
  future.set_exception(HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database service not available for caching/storage."))
403
  return
@@ -410,10 +422,12 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
410
  "tone": request_data.tone,
411
  "emoji": request_data.emoji,
412
  }
 
413
  # Use await asyncio.to_thread for blocking MongoDB operations
414
  cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
415
 
416
  if cached_reply_doc:
 
417
  response = {
418
  "reply": cached_reply_doc["generated_reply_text"],
419
  "stored_id": str(cached_reply_doc["_id"]),
@@ -421,8 +435,10 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
421
  }
422
  if not future.done():
423
  future.set_result(response)
 
424
  return
425
 
 
426
  reply_content = await asyncio.to_thread(
427
  _generate_response_internal,
428
  request_data.email_text,
@@ -433,6 +449,7 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
433
  request_data.tone,
434
  request_data.emoji
435
  )
 
436
 
437
  reply_data_to_store = GeneratedReplyData(
438
  original_email_text=request_data.email_text,
@@ -443,11 +460,13 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
443
  tone=request_data.tone,
444
  emoji=request_data.emoji
445
  )
 
446
  # Use model_dump for Pydantic v2
447
  reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
448
 
449
  insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
450
  stored_id = str(insert_result.inserted_id)
 
451
 
452
  final_response = {
453
  "reply": reply_content,
@@ -456,25 +475,31 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
456
  }
457
  if not future.done():
458
  future.set_result(final_response)
 
459
 
460
  except Exception as e:
461
- traceback.print_exc()
 
462
  if not future.done():
463
  # Set the exception on the future so the client can catch it
464
  future.set_exception(HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to generate reply: {e}"))
 
465
 
466
 
467
  async def process_reply_batches():
468
  """Continuously processes requests from the reply_request_queue in batches."""
469
  global reply_request_queue
 
470
  while True:
471
  batch_to_fire: List[Tuple[GenerateReplyRequest, asyncio.Future]] = []
472
  async with reply_queue_condition:
473
  if not reply_request_queue:
 
474
  # Wait for new requests or timeout
475
  await reply_queue_condition.wait()
476
  # After waking up, re-check if queue is still empty
477
  if not reply_request_queue:
 
478
  continue
479
 
480
  now = asyncio.get_event_loop().time()
@@ -483,8 +508,11 @@ async def process_reply_batches():
483
  oldest_item_timestamp = reply_request_queue[0][2]
484
  else:
485
  # If queue became empty while waiting, loop again
 
486
  continue
487
 
 
 
488
  # Condition to trigger batch processing: queue is full OR timeout reached for oldest item
489
  if len(reply_request_queue) >= MAX_BATCH_SIZE or \
490
  (now - oldest_item_timestamp >= BATCH_TIMEOUT):
@@ -494,18 +522,22 @@ async def process_reply_batches():
494
  if reply_request_queue:
495
  req, fut, _ = reply_request_queue.pop(0)
496
  batch_to_fire.append((req, fut))
 
497
  else:
498
  # Calculate time to wait for the next batch or timeout
499
  time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
 
500
  try:
501
  await asyncio.wait_for(reply_queue_condition.wait(), timeout=time_to_wait)
502
  except asyncio.TimeoutError:
 
503
  pass # Loop will re-evaluate and likely fire the batch
504
 
505
  if batch_to_fire:
506
  tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
507
- # Use asyncio.gather to run all tasks in the batch concurrently
508
  await asyncio.gather(*tasks)
 
509
  else:
510
  # Short sleep to prevent busy-waiting if queue is empty but not waiting
511
  await asyncio.sleep(0.001)
@@ -525,13 +557,14 @@ app = FastAPI(
525
  @app.exception_handler(StarletteHTTPException)
526
  async def custom_http_exception_handler_wrapper(request, exc):
527
  """Handles FastAPI's internal HTTP exceptions."""
 
528
  return await http_exception_handler(request, exc)
529
 
530
  # Catch all other unhandled exceptions
531
  @app.exception_handler(Exception)
532
  async def global_exception_handler_wrapper(request, exc):
533
  """Handles all unhandled exceptions and returns a consistent JSON error response."""
534
- print(f"Unhandled exception caught by global handler for request: {request.url}")
535
  traceback.print_exc() # Print traceback to console for debugging
536
  # Return a JSON response for consistency, even for unhandled errors
537
  return Response(
@@ -545,7 +578,7 @@ async def global_exception_handler_wrapper(request, exc):
545
  @app.on_event("startup")
546
  async def startup_event():
547
  global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
548
- print("FastAPI app startup sequence initiated.")
549
  try:
550
  # Connect to MongoDB
551
  client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
@@ -553,23 +586,23 @@ async def startup_event():
553
  db = client[DB_NAME]
554
  extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
555
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
556
- print(f"Successfully connected to MongoDB: {DB_NAME}")
557
 
558
  # Start the batch processor task if not already running
559
  if batch_processor_task is None or batch_processor_task.done():
560
  batch_processor_task = asyncio.create_task(process_reply_batches())
561
- print("Batch processor task for replies started.")
562
  else:
563
- print("Batch processor task for replies is already running or being initialized.")
564
 
565
  except (ConnectionFailure, OperationFailure) as e:
566
- print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
567
  client = None
568
  db = None
569
  extracted_emails_collection = None
570
  generated_replies_collection = None
571
  except Exception as e:
572
- print(f"ERROR: An unexpected error occurred during MongoDB connection or batch startup: {e}")
573
  traceback.print_exc()
574
  client = None
575
  db = None
@@ -580,33 +613,34 @@ async def startup_event():
580
  try:
581
  client.admin.command('ping')
582
  except Exception as e:
583
- print(f"MongoDB ping failed after initial connection attempt during finally block: {e}")
584
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
585
  else:
586
- print("MongoDB client or db object is None after connection attempt in startup. Database likely not connected.")
587
  if client is None or db is None:
588
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
589
- print("FastAPI app startup sequence completed for MongoDB client & Batch Processor initialization.")
590
 
591
 
592
  @app.on_event("shutdown")
593
  async def shutdown_event():
594
  global client, batch_processor_task
595
- print("FastAPI app shutting down.")
596
  if batch_processor_task:
597
  batch_processor_task.cancel()
598
  try:
599
  await batch_processor_task
 
600
  except asyncio.CancelledError:
601
- print("Batch processor task for replies cancelled during shutdown.")
602
  except Exception as e:
603
- print(f"Error during batch processor task shutdown: {e}")
604
  traceback.print_exc()
605
  batch_processor_task = None
606
 
607
  if client:
608
  client.close()
609
- print("MongoDB client closed.")
610
 
611
 
612
  # --- API Endpoints ---
@@ -619,6 +653,7 @@ async def health_check():
619
  db_ok = False
620
  if client is not None and db is not None:
621
  try:
 
622
  await asyncio.to_thread(db.list_collection_names)
623
  db_status = "MongoDB connection OK."
624
  db_ok = True
@@ -653,15 +688,22 @@ async def extract_email_data(request: ProcessEmailRequest):
653
  Receives an email, extracts contacts, appointments, and tasks using an LLM,
654
  and stores the extracted data in MongoDB.
655
  """
 
656
  if extracted_emails_collection is None:
 
657
  raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for extracted email storage. Check server startup logs.")
658
  try:
659
  current_date_val = date.today()
 
660
  extracted_data = await asyncio.to_thread(
661
  _process_email_internal, request.email_text, request.groq_api_key, current_date_val
662
  )
 
663
 
664
  extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
 
 
 
665
  if 'appointments' in extracted_data_dict:
666
  for appt in extracted_data_dict['appointments']:
667
  if isinstance(appt.get('start_date'), date):
@@ -673,13 +715,17 @@ async def extract_email_data(request: ProcessEmailRequest):
673
  if isinstance(task_item.get('due_date'), date):
674
  task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
675
 
 
676
  result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
 
677
 
678
  extracted_data.id = result.inserted_id
679
  return extracted_data
680
  except ValueError as e:
 
681
  raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
682
  except Exception as e:
 
683
  traceback.print_exc()
684
  raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal server error during data extraction: {e}")
685
 
@@ -699,7 +745,9 @@ async def generate_email_reply(request: GenerateReplyRequest):
699
  Generates an intelligent email reply based on specified parameters (language, length, style, tone, emoji).
700
  Uses a batch processing system with caching for efficiency.
701
  """
 
702
  if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
 
703
  raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Reply generation service not fully initialized. Check server logs for database or batch processor issues.")
704
 
705
  future = asyncio.Future()
@@ -708,25 +756,34 @@ async def generate_email_reply(request: GenerateReplyRequest):
708
  async with reply_queue_condition:
709
  reply_request_queue.append((request, future, current_time))
710
  reply_queue_condition.notify() # Notify the batch processor that a new request is available
 
711
 
712
  try:
713
- client_timeout = BATCH_TIMEOUT + 10.0 # e.g., 0.5s batch + 10s LLM response buffer
 
 
714
  result = await asyncio.wait_for(future, timeout=client_timeout)
 
715
  return result
716
  except asyncio.TimeoutError:
 
717
  if not future.done():
718
- future.cancel()
719
- raise HTTPException(status_code=status.HTTP_504_GATEWAY_TIMEOUT, detail=f"Request timed out after {client_timeout}s waiting for batch processing. The LLM might be busy or the request queue too long.")
720
  except Exception as e:
721
  if isinstance(e, HTTPException):
722
- raise e
 
 
723
  traceback.print_exc()
724
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error processing your reply request: {str(e)}")
725
 
726
 
727
  @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
728
  async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
 
729
  if extracted_emails_collection is None:
 
730
  raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying extracted emails.")
731
  mongo_query: Dict[str, Any] = {}
732
  if query_params.contact_name:
@@ -745,13 +802,17 @@ async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = De
745
  date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
746
  if date_query :
747
  mongo_query["processed_at"] = date_query
 
748
 
749
  try:
 
750
  cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
751
  extracted_docs_raw = await asyncio.to_thread(list, cursor)
 
752
 
753
  results = []
754
  for doc_raw in extracted_docs_raw:
 
755
  if 'appointments' in doc_raw:
756
  for appt in doc_raw['appointments']:
757
  if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
@@ -760,15 +821,19 @@ async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = De
760
  for task_item in doc_raw['tasks']:
761
  if isinstance(task_item.get('due_date'), datetime): task_item['due_date'] = task_item['due_date'].date()
762
  results.append(ExtractedData(**doc_raw))
 
763
  return results
764
  except Exception as e:
 
765
  traceback.print_exc()
766
  raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying extracted emails: {e}")
767
 
768
 
769
  @app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
770
  async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
 
771
  if generated_replies_collection is None:
 
772
  raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying generated replies.")
773
  mongo_query: Dict[str, Any] = {}
774
  if query_params.language: mongo_query["language"] = query_params.language
@@ -783,14 +848,19 @@ async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = D
783
  date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
784
  if date_query:
785
  mongo_query["generated_at"] = date_query
 
786
 
787
  try:
 
788
  cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
789
  generated_docs_raw = await asyncio.to_thread(list, cursor)
 
790
  results = []
791
  for doc_raw in generated_docs_raw:
792
  results.append(GeneratedReplyData(**doc_raw))
 
793
  return results
794
  except Exception as e:
 
795
  traceback.print_exc()
796
  raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying generated replies: {e}")
 
354
  """
355
  Internal function to generate a reply to an email using LLM.
356
  """
357
+ print(f"[{datetime.now()}] _generate_response_internal: Starting LLM call. API Key starts with: {api_key[:5]}...") # Debug log
358
  if not email_text:
359
+ print(f"[{datetime.now()}] _generate_response_internal: Email text is empty.")
360
  return "Cannot generate reply for empty email text."
361
+
362
+ try:
363
+ llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
364
+ prompt_template_str="""
365
+ You are an assistant that helps reply to emails.
366
 
367
+ Create a response to the following email with the following parameters:
368
+ - Language: {language}
369
+ - Length: {length}
370
+ - Style: {style}
371
+ - Tone: {tone}
372
+ - Emoji usage: {emoji}
373
 
374
+ Email:
375
+ {email}
376
 
377
+ Write only the reply body. Do not repeat the email or mention any instruction.
378
+ """
379
+ prompt = PromptTemplate(
380
+ input_variables=["email", "language", "length", "style", "tone", "emoji"],
381
+ template=prompt_template_str
382
+ )
383
+ chain = prompt | llm
384
+ print(f"[{datetime.now()}] _generate_response_internal: Invoking LLM chain...") # Debug log
385
+ output = chain.invoke({"email": email_text, "language": language, "length": length, "style": style, "tone": tone, "emoji": emoji})
386
+ print(f"[{datetime.now()}] _generate_response_internal: LLM chain returned. Content length: {len(output.content)}.") # Debug log
387
+ return output.content.strip()
388
+ except Exception as e:
389
+ print(f"[{datetime.now()}] _generate_response_internal: ERROR during LLM invocation: {e}") # Debug log
390
+ traceback.print_exc() # Print full traceback to logs
391
+ raise # Re-raise the exception so it can be caught by handle_single_reply_request
392
 
393
  # --- Batching and Caching Configuration ---
394
  MAX_BATCH_SIZE = 20
 
403
  # --- Batch Processor and Handler ---
404
  async def handle_single_reply_request(request_data: GenerateReplyRequest, future: asyncio.Future):
405
  """Handles a single request: checks cache, calls LLM, stores result, and sets future."""
406
+ print(f"[{datetime.now()}] Handle single reply: Starting for email_text_start='{request_data.email_text[:50]}'...")
407
  if future.cancelled():
408
+ print(f"[{datetime.now()}] Handle single reply: Future cancelled. Aborting.")
409
  return
410
  try:
411
  if generated_replies_collection is None:
412
+ print(f"[{datetime.now()}] Handle single reply: DB collection 'generated_replies_collection' is None.")
413
  if not future.done():
414
  future.set_exception(HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database service not available for caching/storage."))
415
  return
 
422
  "tone": request_data.tone,
423
  "emoji": request_data.emoji,
424
  }
425
+ print(f"[{datetime.now()}] Handle single reply: Checking cache for reply...")
426
  # Use await asyncio.to_thread for blocking MongoDB operations
427
  cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
428
 
429
  if cached_reply_doc:
430
+ print(f"[{datetime.now()}] Handle single reply: Reply found in cache. ID: {str(cached_reply_doc['_id'])}")
431
  response = {
432
  "reply": cached_reply_doc["generated_reply_text"],
433
  "stored_id": str(cached_reply_doc["_id"]),
 
435
  }
436
  if not future.done():
437
  future.set_result(response)
438
+ print(f"[{datetime.now()}] Handle single reply: Cache result set on future.")
439
  return
440
 
441
+ print(f"[{datetime.now()}] Handle single reply: Reply not in cache. Calling LLM...")
442
  reply_content = await asyncio.to_thread(
443
  _generate_response_internal,
444
  request_data.email_text,
 
449
  request_data.tone,
450
  request_data.emoji
451
  )
452
+ print(f"[{datetime.now()}] Handle single reply: LLM call completed. Reply length: {len(reply_content)}.")
453
 
454
  reply_data_to_store = GeneratedReplyData(
455
  original_email_text=request_data.email_text,
 
460
  tone=request_data.tone,
461
  emoji=request_data.emoji
462
  )
463
+ print(f"[{datetime.now()}] Handle single reply: Storing reply in DB...")
464
  # Use model_dump for Pydantic v2
465
  reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
466
 
467
  insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
468
  stored_id = str(insert_result.inserted_id)
469
+ print(f"[{datetime.now()}] Handle single reply: Reply stored in DB. ID: {stored_id}")
470
 
471
  final_response = {
472
  "reply": reply_content,
 
475
  }
476
  if not future.done():
477
  future.set_result(final_response)
478
+ print(f"[{datetime.now()}] Handle single reply: Final result set on future.")
479
 
480
  except Exception as e:
481
+ print(f"[{datetime.now()}] Handle single reply: EXCEPTION: {e}")
482
+ traceback.print_exc() # Print full traceback to logs
483
  if not future.done():
484
  # Set the exception on the future so the client can catch it
485
  future.set_exception(HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to generate reply: {e}"))
486
+ print(f"[{datetime.now()}] Handle single reply: Exception set on future.")
487
 
488
 
489
  async def process_reply_batches():
490
  """Continuously processes requests from the reply_request_queue in batches."""
491
  global reply_request_queue
492
+ print(f"[{datetime.now()}] Batch processor task started.")
493
  while True:
494
  batch_to_fire: List[Tuple[GenerateReplyRequest, asyncio.Future]] = []
495
  async with reply_queue_condition:
496
  if not reply_request_queue:
497
+ print(f"[{datetime.now()}] Batch processor: Queue empty, waiting for requests...")
498
  # Wait for new requests or timeout
499
  await reply_queue_condition.wait()
500
  # After waking up, re-check if queue is still empty
501
  if not reply_request_queue:
502
+ print(f"[{datetime.now()}] Batch processor: Woke up, queue still empty. Continuing loop.")
503
  continue
504
 
505
  now = asyncio.get_event_loop().time()
 
508
  oldest_item_timestamp = reply_request_queue[0][2]
509
  else:
510
  # If queue became empty while waiting, loop again
511
+ print(f"[{datetime.now()}] Batch processor: Queue became empty before processing. Restarting loop.")
512
  continue
513
 
514
+ print(f"[{datetime.now()}] Batch processor: Woke up. Queue size: {len(reply_request_queue)}. Oldest item age: {now - oldest_item_timestamp:.2f}s")
515
+
516
  # Condition to trigger batch processing: queue is full OR timeout reached for oldest item
517
  if len(reply_request_queue) >= MAX_BATCH_SIZE or \
518
  (now - oldest_item_timestamp >= BATCH_TIMEOUT):
 
522
  if reply_request_queue:
523
  req, fut, _ = reply_request_queue.pop(0)
524
  batch_to_fire.append((req, fut))
525
+ print(f"[{datetime.now()}] Batch processor: Firing batch of {len(batch_to_fire)} requests.")
526
  else:
527
  # Calculate time to wait for the next batch or timeout
528
  time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
529
+ print(f"[{datetime.now()}] Batch processor: Not enough requests or timeout not reached. Waiting for {time_to_wait:.2f}s.")
530
  try:
531
  await asyncio.wait_for(reply_queue_condition.wait(), timeout=time_to_wait)
532
  except asyncio.TimeoutError:
533
+ print(f"[{datetime.now()}] Batch processor: wait timed out.")
534
  pass # Loop will re-evaluate and likely fire the batch
535
 
536
  if batch_to_fire:
537
  tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
538
+ print(f"[{datetime.now()}] Batch processor: Awaiting completion of {len(tasks)} single reply tasks.")
539
  await asyncio.gather(*tasks)
540
+ print(f"[{datetime.now()}] Batch processor: Batch processing complete.")
541
  else:
542
  # Short sleep to prevent busy-waiting if queue is empty but not waiting
543
  await asyncio.sleep(0.001)
 
557
  @app.exception_handler(StarletteHTTPException)
558
  async def custom_http_exception_handler_wrapper(request, exc):
559
  """Handles FastAPI's internal HTTP exceptions."""
560
+ print(f"[{datetime.now()}] Caught StarletteHTTPException: {exc.status_code} - {exc.detail}")
561
  return await http_exception_handler(request, exc)
562
 
563
  # Catch all other unhandled exceptions
564
  @app.exception_handler(Exception)
565
  async def global_exception_handler_wrapper(request, exc):
566
  """Handles all unhandled exceptions and returns a consistent JSON error response."""
567
+ print(f"[{datetime.now()}] Unhandled exception caught by global handler for request: {request.url}")
568
  traceback.print_exc() # Print traceback to console for debugging
569
  # Return a JSON response for consistency, even for unhandled errors
570
  return Response(
 
578
  @app.on_event("startup")
579
  async def startup_event():
580
  global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
581
+ print(f"[{datetime.now()}] FastAPI app startup sequence initiated.")
582
  try:
583
  # Connect to MongoDB
584
  client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
 
586
  db = client[DB_NAME]
587
  extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
588
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
589
+ print(f"[{datetime.now()}] Successfully connected to MongoDB: {DB_NAME}")
590
 
591
  # Start the batch processor task if not already running
592
  if batch_processor_task is None or batch_processor_task.done():
593
  batch_processor_task = asyncio.create_task(process_reply_batches())
594
+ print(f"[{datetime.now()}] Batch processor task for replies started.")
595
  else:
596
+ print(f"[{datetime.now()}] Batch processor task for replies is already running or being initialized.")
597
 
598
  except (ConnectionFailure, OperationFailure) as e:
599
+ print(f"[{datetime.now()}] ERROR: MongoDB Connection/Operation Failure: {e}")
600
  client = None
601
  db = None
602
  extracted_emails_collection = None
603
  generated_replies_collection = None
604
  except Exception as e:
605
+ print(f"[{datetime.now()}] ERROR: An unexpected error occurred during MongoDB connection or batch startup: {e}")
606
  traceback.print_exc()
607
  client = None
608
  db = None
 
613
  try:
614
  client.admin.command('ping')
615
  except Exception as e:
616
+ print(f"[{datetime.now()}] MongoDB ping failed after initial connection attempt during finally block: {e}")
617
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
618
  else:
619
+ print(f"[{datetime.now()}] MongoDB client or db object is None after connection attempt in startup. Database likely not connected.")
620
  if client is None or db is None:
621
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
622
+ print(f"[{datetime.now()}] FastAPI app startup sequence completed for MongoDB client & Batch Processor initialization.")
623
 
624
 
625
  @app.on_event("shutdown")
626
  async def shutdown_event():
627
  global client, batch_processor_task
628
+ print(f"[{datetime.now()}] FastAPI app shutting down.")
629
  if batch_processor_task:
630
  batch_processor_task.cancel()
631
  try:
632
  await batch_processor_task
633
+ print(f"[{datetime.now()}] Batch processor task awaited.")
634
  except asyncio.CancelledError:
635
+ print(f"[{datetime.now()}] Batch processor task for replies cancelled during shutdown.")
636
  except Exception as e:
637
+ print(f"[{datetime.now()}] Error during batch processor task shutdown: {e}")
638
  traceback.print_exc()
639
  batch_processor_task = None
640
 
641
  if client:
642
  client.close()
643
+ print(f"[{datetime.now()}] MongoDB client closed.")
644
 
645
 
646
  # --- API Endpoints ---
 
653
  db_ok = False
654
  if client is not None and db is not None:
655
  try:
656
+ # Use asyncio.to_thread for blocking MongoDB call
657
  await asyncio.to_thread(db.list_collection_names)
658
  db_status = "MongoDB connection OK."
659
  db_ok = True
 
688
  Receives an email, extracts contacts, appointments, and tasks using an LLM,
689
  and stores the extracted data in MongoDB.
690
  """
691
+ print(f"[{datetime.now()}] /extract-data: Received request.")
692
  if extracted_emails_collection is None:
693
+ print(f"[{datetime.now()}] /extract-data: MongoDB collection is None.")
694
  raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for extracted email storage. Check server startup logs.")
695
  try:
696
  current_date_val = date.today()
697
+ print(f"[{datetime.now()}] /extract-data: Calling internal processing function.")
698
  extracted_data = await asyncio.to_thread(
699
  _process_email_internal, request.email_text, request.groq_api_key, current_date_val
700
  )
701
+ print(f"[{datetime.now()}] /extract-data: Internal processing complete. Preparing for DB insert.")
702
 
703
  extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
704
+ # Convert date objects to datetime for MongoDB storage if they are just date objects
705
+ # Pydantic's default `date` handling might serialize to ISO string, but for
706
+ # internal MongoDB storage, sometimes `datetime` is preferred for consistency.
707
  if 'appointments' in extracted_data_dict:
708
  for appt in extracted_data_dict['appointments']:
709
  if isinstance(appt.get('start_date'), date):
 
715
  if isinstance(task_item.get('due_date'), date):
716
  task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
717
 
718
+ print(f"[{datetime.now()}] /extract-data: Inserting into MongoDB...")
719
  result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
720
+ print(f"[{datetime.now()}] /extract-data: Data inserted into MongoDB. ID: {result.inserted_id}")
721
 
722
  extracted_data.id = result.inserted_id
723
  return extracted_data
724
  except ValueError as e:
725
+ print(f"[{datetime.now()}] /extract-data: ValueError: {e}")
726
  raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
727
  except Exception as e:
728
+ print(f"[{datetime.now()}] /extract-data: Unhandled Exception: {e}")
729
  traceback.print_exc()
730
  raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal server error during data extraction: {e}")
731
 
 
745
  Generates an intelligent email reply based on specified parameters (language, length, style, tone, emoji).
746
  Uses a batch processing system with caching for efficiency.
747
  """
748
+ print(f"[{datetime.now()}] /generate-reply: Received request.")
749
  if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
750
+ print(f"[{datetime.now()}] /generate-reply: Service not initialized. gen_replies_coll={generated_replies_collection is not None}, batch_task={batch_processor_task is not None}, queue_cond={reply_queue_condition is not None}")
751
  raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Reply generation service not fully initialized. Check server logs for database or batch processor issues.")
752
 
753
  future = asyncio.Future()
 
756
  async with reply_queue_condition:
757
  reply_request_queue.append((request, future, current_time))
758
  reply_queue_condition.notify() # Notify the batch processor that a new request is available
759
+ print(f"[{datetime.now()}] /generate-reply: Request added to queue, notifying batch processor. Queue size: {len(reply_request_queue)}")
760
 
761
  try:
762
+ # Debugging: Increase timeout significantly to allow full tracing in logs
763
+ client_timeout = BATCH_TIMEOUT + 60.0 # Example: 0.5s batch + 60s LLM response buffer = 60.5s total timeout
764
+ print(f"[{datetime.now()}] /generate-reply: Waiting for future result with timeout {client_timeout}s.")
765
  result = await asyncio.wait_for(future, timeout=client_timeout)
766
+ print(f"[{datetime.now()}] /generate-reply: Future result received. Returning data.")
767
  return result
768
  except asyncio.TimeoutError:
769
+ print(f"[{datetime.now()}] /generate-reply: Client timeout waiting for future after {client_timeout}s. Future done: {future.done()}")
770
  if not future.done():
771
+ future.cancel() # Cancel if it's still pending
772
+ raise HTTPException(status_code=status.HTTP_504_GATEWAY_TIMEOUT, detail=f"Request timed out after {client_timeout}s waiting for batch processing. The LLM might be busy or the request queue too long. Check server logs for more details.")
773
  except Exception as e:
774
  if isinstance(e, HTTPException):
775
+ print(f"[{datetime.now()}] /generate-reply: Caught HTTPException: {e.status_code} - {e.detail}")
776
+ raise e # Re-raise FastAPI HTTPExceptions
777
+ print(f"[{datetime.now()}] /generate-reply: Unhandled Exception: {e}")
778
  traceback.print_exc()
779
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error processing your reply request: {str(e)}. Check server logs for more details.")
780
 
781
 
782
# NOTE(review): this span is a unified-diff hunk, not runnable Python. Lines
# prefixed "+" are additions in the diff; bare integers between code lines are
# diff gutter line numbers of the original file — neither is program text.
# Endpoint: GET /query-extracted-emails — builds a MongoDB filter from the
# query params, reads matching documents from extracted_emails_collection,
# and returns them validated as ExtractedData models (newest first, limited).
  @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
783
  async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
784
+ print(f"[{datetime.now()}] /query-extracted-emails: Received request with params: {query_params.model_dump_json()}")
785
# Guard: the collection handle is None when MongoDB was unavailable at startup.
  if extracted_emails_collection is None:
786
+ print(f"[{datetime.now()}] /query-extracted-emails: MongoDB collection is None.")
787
  raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying extracted emails.")
788
  mongo_query: Dict[str, Any] = {}
789
  if query_params.contact_name:

# NOTE(review): original file lines 790-801 are elided by this diff hunk —
# presumably they build the remaining mongo_query filters (contact_name body,
# from_date lower bound, initialization of date_query, etc.); date_query is
# used below without a visible definition. TODO: confirm against the full file.
802
# to_date is treated as inclusive: the upper bound is midnight of the
# following day combined with $lt.
  date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
803
  if date_query :
804
  mongo_query["processed_at"] = date_query
805
+ print(f"[{datetime.now()}] /query-extracted-emails: MongoDB query built: {mongo_query}")
806

807
  try:
808
+ # Use await asyncio.to_thread for blocking MongoDB operations
809
  cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
810
# PyMongo is blocking; draining the cursor in a worker thread keeps the
# event loop responsive.
  extracted_docs_raw = await asyncio.to_thread(list, cursor)
811
+ print(f"[{datetime.now()}] /query-extracted-emails: Found {len(extracted_docs_raw)} documents.")
812

813
  results = []
814
  for doc_raw in extracted_docs_raw:
815
+ # Convert datetime objects back to date for Pydantic model validation if necessary
816
  if 'appointments' in doc_raw:
817
  for appt in doc_raw['appointments']:
818
  if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()

# NOTE(review): original file lines 819-820 are elided by the diff here —
# presumably the appointment end_date coercion and the "'tasks' in doc_raw"
# presence check guarding the loop below. TODO: confirm against the full file.
821
  for task_item in doc_raw['tasks']:
822
  if isinstance(task_item.get('due_date'), datetime): task_item['due_date'] = task_item['due_date'].date()
823
  results.append(ExtractedData(**doc_raw))
824
+ print(f"[{datetime.now()}] /query-extracted-emails: Returning {len(results)} results.")
825
  return results
826
# Broad catch: any failure during the query/validation path is logged with a
# full traceback and surfaced to the client as HTTP 500.
  except Exception as e:
827
+ print(f"[{datetime.now()}] /query-extracted-emails: Unhandled Exception during query: {e}")
828
  traceback.print_exc()
829
  raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying extracted emails: {e}")
830
 
831
 
832
# NOTE(review): this span is a unified-diff hunk, not runnable Python. Lines
# prefixed "+" are additions in the diff; bare integers between code lines are
# diff gutter line numbers of the original file — neither is program text.
# Endpoint: GET /query-generated-replies — builds a MongoDB filter from the
# query params, reads matching documents from generated_replies_collection,
# and returns them validated as GeneratedReplyData models (newest first,
# limited by query_params.limit).
  @app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
833
  async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
834
+ print(f"[{datetime.now()}] /query-generated-replies: Received request with params: {query_params.model_dump_json()}")
835
# Guard: the collection handle is None when MongoDB was unavailable at startup.
  if generated_replies_collection is None:
836
+ print(f"[{datetime.now()}] /query-generated-replies: MongoDB collection is None.")
837
  raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying generated replies.")
838
  mongo_query: Dict[str, Any] = {}
839
  if query_params.language: mongo_query["language"] = query_params.language

# NOTE(review): original file lines 840-847 are elided by this diff hunk —
# presumably the remaining filters (length/style/tone, from_date lower bound)
# and the initialization of date_query, which is used below without a visible
# definition. TODO: confirm against the full file.
848
# to_date is treated as inclusive: the upper bound is midnight of the
# following day combined with $lt.
  date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
849
  if date_query:
850
  mongo_query["generated_at"] = date_query
851
+ print(f"[{datetime.now()}] /query-generated-replies: MongoDB query built: {mongo_query}")
852

853
  try:
854
+ # Use await asyncio.to_thread for blocking MongoDB operations
855
  cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
856
# PyMongo is blocking; draining the cursor in a worker thread keeps the
# event loop responsive.
  generated_docs_raw = await asyncio.to_thread(list, cursor)
857
+ print(f"[{datetime.now()}] /query-generated-replies: Found {len(generated_docs_raw)} documents.")
858
  results = []
859
# Documents are passed straight into the Pydantic model — no field coercion
# is done here (unlike the extracted-emails endpoint in the original file).
  for doc_raw in generated_docs_raw:
860
  results.append(GeneratedReplyData(**doc_raw))
861
+ print(f"[{datetime.now()}] /query-generated-replies: Returning {len(results)} results.")
862
  return results
863
# Broad catch: any failure during the query/validation path is logged with a
# full traceback and surfaced to the client as HTTP 500.
  except Exception as e:
864
+ print(f"[{datetime.now()}] /query-generated-replies: Unhandled Exception during query: {e}")
865
  traceback.print_exc()
866
  raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying generated replies: {e}")