precison9 committed on
Commit
ec704f6
·
verified ·
1 Parent(s): b33297e

Update flask_Character.py

Browse files
Files changed (1) hide show
  1. flask_Character.py +228 -99
flask_Character.py CHANGED
@@ -1,4 +1,3 @@
1
-
2
  # This software is licensed under a **dual-license model**
3
  # For individuals and businesses earning **under $1M per year**, this software is licensed under the **MIT License**
4
  # Businesses or organizations with **annual revenue of $1,000,000 or more** must obtain permission to use this software commercially.
@@ -20,44 +19,75 @@ from starlette.exceptions import HTTPException as StarletteHTTPException
20
  from langchain.prompts import PromptTemplate
21
  from langchain_groq import ChatGroq
22
  from pydantic import BaseModel, Field, BeforeValidator, model_serializer
 
23
  from typing_extensions import Annotated
 
24
 
25
  from pymongo import MongoClient
26
  from pymongo.errors import ConnectionFailure, OperationFailure
27
  from bson import ObjectId
28
 
29
  # --- MongoDB Configuration ---
30
- MONGO_URI = "mongodb+srv://<username>:<password>@cluster0.isuwpef.mongodb.net" # credentials redacted — never commit secrets to source control
 
 
31
  DB_NAME = "email_assistant_db"
32
  EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
33
  GENERATED_REPLIES_COLLECTION = "generated_replies"
34
 
 
35
  client: Optional[MongoClient] = None
36
- db: Optional[Any] = None # Changed to Optional[Any] as Database type is not directly imported for annotation here
37
  extracted_emails_collection: Optional[Any] = None
38
  generated_replies_collection: Optional[Any] = None
39
 
40
  # --- Pydantic ObjectId Handling ---
41
  class CustomObjectId(str):
 
 
 
 
 
42
  @classmethod
43
  def __get_validators__(cls):
44
  yield cls.validate
45
 
46
  @classmethod
47
- def validate(cls, v, info):
 
 
 
 
 
 
 
 
 
 
 
 
 
48
  if not ObjectId.is_valid(v):
49
- raise ValueError("Invalid ObjectId")
50
- return str(v)
51
 
 
 
52
  @classmethod
53
- def __get_pydantic_json_schema__(cls, core_schema, handler):
54
- json_schema = handler(core_schema)
55
- json_schema["type"] = "string"
56
- json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"
 
 
 
 
57
  return json_schema
58
 
 
59
  PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
60
 
 
61
  # ---------------------- Models ----------------------
62
  class Contact(BaseModel):
63
  name: str
@@ -79,6 +109,7 @@ class Task(BaseModel):
79
  due_date: date
80
 
81
  class ExtractedData(BaseModel):
 
82
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
83
  contacts: List[Contact]
84
  appointments: List[Appointment]
@@ -87,14 +118,17 @@ class ExtractedData(BaseModel):
87
  processed_at: datetime = Field(default_factory=datetime.utcnow)
88
 
89
  class Config:
90
- populate_by_name = True
91
- arbitrary_types_allowed = True
92
 
 
93
  @model_serializer(when_used='json')
94
  def serialize_model(self):
95
  data = self.model_dump(by_alias=True, exclude_none=True)
 
96
  if "_id" in data and isinstance(data["_id"], ObjectId):
97
  data["_id"] = str(data["_id"])
 
98
  if 'appointments' in data:
99
  for appt in data['appointments']:
100
  if isinstance(appt.get('start_date'), date):
@@ -121,6 +155,7 @@ class GenerateReplyRequest(BaseModel):
121
  emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
122
 
123
  class GeneratedReplyData(BaseModel):
 
124
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
125
  original_email_text: str
126
  generated_reply_text: str
@@ -161,30 +196,42 @@ class GeneratedReplyQuery(BaseModel):
161
 
162
  # ---------------------- Utility Functions ----------------------
163
  def extract_last_json_block(text: str) -> Optional[str]:
 
 
 
 
164
  pattern = r'```json\s*(.*?)\s*```'
165
  matches = re.findall(pattern, text, re.DOTALL)
166
  if matches:
167
  return matches[-1].strip()
 
168
  match = re.search(r'\{.*\}', text, re.DOTALL)
169
  if match:
170
  return match.group(0)
171
  return None
172
 
173
- def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]: # Allow None to pass through
174
- if not date_str: return None # If input is None (e.g. optional end_date), return None
 
 
 
 
 
175
  date_str_lower = date_str.lower().strip()
176
- if date_str_lower == "today": return current_date
177
- if date_str_lower == "tomorrow": return current_date + timedelta(days=1)
 
 
178
  try:
179
  return datetime.strptime(date_str_lower, "%Y-%m-%d").date()
180
  except ValueError:
181
- # For "appointments" "start_date", you assumed "today" if parsing failed or not present.
182
- # For "end_date" it was optional. We need to be consistent.
183
- # Given the original normalize_llm_output, start_date defaulted to today, end_date was optional.
184
- # This parse_date is more general. The default handling should be in normalize_llm_output.
185
- return current_date # Fallback, or raise error, or return None depending on strictness
186
 
187
  def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
 
 
 
 
188
  def split_name(full_name: str) -> tuple[str, str]:
189
  parts = full_name.strip().split()
190
  name = parts[0] if parts else ""
@@ -198,8 +245,10 @@ def normalize_llm_output(data: dict, current_date: date, original_email_text: st
198
 
199
  appointments_data = []
200
  for a in data.get("appointments", []):
201
- start_date_val = parse_date(a.get("start_date", "today"), current_date) or current_date # Default to current_date if parse_date returns None
202
- end_date_val = parse_date(a.get("end_date"), current_date) # parse_date can return None if end_date is not provided or invalid
 
 
203
 
204
  appointments_data.append(Appointment(
205
  title=a.get("title", "Untitled"), description=a.get("description", "No description"),
@@ -209,7 +258,8 @@ def normalize_llm_output(data: dict, current_date: date, original_email_text: st
209
 
210
  tasks_data = []
211
  for t in data.get("tasks", []):
212
- due_date_val = parse_date(t.get("due_date", "today"), current_date) or current_date # Default to current_date
 
213
  tasks_data.append(Task(
214
  task_title=t.get("task_title", "Untitled"), task_description=t.get("task_description", "No description"),
215
  due_date=due_date_val
@@ -218,11 +268,17 @@ def normalize_llm_output(data: dict, current_date: date, original_email_text: st
218
 
219
  # ---------------------- Core Logic (Internal Functions) ----------------------
220
  def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
221
- if not email_text: raise ValueError("Email text cannot be empty for processing.")
 
 
 
 
 
222
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
 
223
  prompt_today_str = current_date.isoformat()
224
  prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
225
- # Ensure your full, detailed prompt is used here
226
  prompt_template_str = f"""
227
  You are an expert email assistant tasked with extracting structured information from an Italian email.
228
 
@@ -263,24 +319,18 @@ Email:
263
  prompt_template = PromptTemplate(input_variables=["email", "prompt_today_str", "prompt_tomorrow_str"], template=prompt_template_str)
264
  chain = prompt_template | llm
265
  try:
266
- # print(f"DEBUG: Invoking LLM with email_text length: {len(email_text)} and current_date: {current_date}")
267
  llm_output = chain.invoke({"email": email_text, "prompt_today_str": prompt_today_str, "prompt_tomorrow_str": prompt_tomorrow_str})
268
  llm_output_str = llm_output.content
269
- # print(f"DEBUG: Raw LLM output:\n{llm_output_str[:500]}...")
270
 
271
  json_str = extract_last_json_block(llm_output_str)
272
- # print(f"DEBUG: Extracted JSON string:\n{json_str}")
273
 
274
- if not json_str: raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
 
275
  json_data = json.loads(json_str)
276
- # print(f"DEBUG: Parsed JSON data: {json.dumps(json_data, indent=2)}")
277
 
278
  extracted_data = normalize_llm_output(json_data, current_date, email_text)
279
- # print("DEBUG: Data normalized successfully.")
280
  return extracted_data
281
  except json.JSONDecodeError as e:
282
- # print(f"ERROR: JSON Decode Error: {e}")
283
- # print(f"ERROR: LLM response that caused error:\n{llm_output_str}")
284
  raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}")
285
  except Exception as e:
286
  traceback.print_exc()
@@ -290,9 +340,12 @@ def _generate_response_internal(
290
  email_text: str, api_key: str, language: Literal["Italian", "English"],
291
  length: str, style: str, tone: str, emoji: str
292
  ) -> str:
293
- if not email_text: return "Cannot generate reply for empty email text."
 
 
 
 
294
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
295
- # Ensure your full, detailed prompt is used here
296
  prompt_template_str="""
297
  You are an assistant that helps reply to emails.
298
 
@@ -343,6 +396,7 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
343
  "tone": request_data.tone,
344
  "emoji": request_data.emoji,
345
  }
 
346
  cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
347
 
348
  if cached_reply_doc:
@@ -351,7 +405,8 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
351
  "stored_id": str(cached_reply_doc["_id"]),
352
  "cached": True
353
  }
354
- if not future.done(): future.set_result(response)
 
355
  return
356
 
357
  reply_content = await asyncio.to_thread(
@@ -374,6 +429,7 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
374
  tone=request_data.tone,
375
  emoji=request_data.emoji
376
  )
 
377
  reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
378
 
379
  insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
@@ -384,7 +440,8 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
384
  "stored_id": stored_id,
385
  "cached": False
386
  }
387
- if not future.done(): future.set_result(final_response)
 
388
 
389
  except Exception as e:
390
  traceback.print_exc()
@@ -399,6 +456,7 @@ async def process_reply_batches():
399
  async with reply_queue_condition:
400
  if not reply_request_queue:
401
  await reply_queue_condition.wait()
 
402
  if not reply_request_queue:
403
  continue
404
 
@@ -422,28 +480,31 @@ async def process_reply_batches():
422
  tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
423
  await asyncio.gather(*tasks)
424
  else:
425
- await asyncio.sleep(0.001)
 
426
 
427
 
428
  # ---------------------- FastAPI Application ----------------------
429
  app = FastAPI(
430
  title="Email Assistant API",
431
  description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
432
- version="1.1.0", # Incremented version
433
- docs_url="/",
434
  redoc_url="/redoc"
435
  )
436
 
437
  # --- Global Exception Handler ---
 
438
  @app.exception_handler(StarletteHTTPException)
439
  async def custom_http_exception_handler_wrapper(request, exc):
440
  return await http_exception_handler(request, exc)
441
 
 
442
  @app.exception_handler(Exception)
443
  async def global_exception_handler_wrapper(request, exc):
444
  print(f"Unhandled exception caught by global handler for request: {request.url}")
445
- traceback.print_exc()
446
- # Ensure it returns a valid FastAPI response
447
  return Response(content=json.dumps({"detail": f"Internal Server Error: {str(exc)}"}), status_code=500, media_type="application/json")
448
 
449
 
@@ -452,20 +513,23 @@ async def global_exception_handler_wrapper(request, exc):
452
  async def startup_event():
453
  global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
454
  try:
 
455
  client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
456
- client.admin.command('ping')
457
  db = client[DB_NAME]
458
  extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
459
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
460
  print(f"Successfully connected to MongoDB: {DB_NAME}")
461
 
 
462
  if batch_processor_task is None:
463
- loop = asyncio.get_event_loop()
464
- batch_processor_task = loop.create_task(process_reply_batches())
465
  print("Batch processor task for replies started.")
466
 
467
  except (ConnectionFailure, OperationFailure) as e:
468
  print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
 
469
  client = None
470
  db = None
471
  extracted_emails_collection = None
@@ -478,87 +542,117 @@ async def startup_event():
478
  extracted_emails_collection = None
479
  generated_replies_collection = None
480
  finally:
481
- # Corrected condition for checking client and db
482
  if client is not None and db is not None:
483
  try:
 
484
  client.admin.command('ping')
485
- except Exception:
486
- print("MongoDB ping failed after initial connection attempt during finally block.")
487
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
488
  else:
489
- print("MongoDB client or db object is None after connection attempt in startup.")
490
- if client is None or db is None: # Ensure all are None if one is
 
491
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
492
- print("FastAPI app starting up. MongoDB client & Batch Processor initialization attempted.")
493
 
494
 
495
  @app.on_event("shutdown")
496
  async def shutdown_event():
497
  global client, batch_processor_task
 
498
  if batch_processor_task:
499
  batch_processor_task.cancel()
500
  try:
 
501
  await batch_processor_task
502
  except asyncio.CancelledError:
503
- print("Batch processor task for replies cancelled.")
504
  except Exception as e:
505
  print(f"Error during batch processor task shutdown: {e}")
506
  traceback.print_exc()
507
  batch_processor_task = None
508
 
 
509
  if client:
510
  client.close()
511
  print("FastAPI app shutting down. MongoDB client closed.")
512
 
513
 
 
514
  @app.get("/health", summary="Health Check")
515
  async def health_check():
516
- db_status = "MongoDB not connected. Check server startup logs."
 
 
 
517
  db_ok = False
518
- if client is not None and db is not None: # Corrected check
519
  try:
520
- db.list_collection_names()
 
521
  db_status = "MongoDB connection OK."
522
  db_ok = True
523
  except Exception as e:
524
  db_status = f"MongoDB connection error: {e}"
 
525
 
526
- batch_processor_status = "Batch processor not running or state unknown."
527
- if batch_processor_task is not None :
528
  if not batch_processor_task.done():
529
- batch_processor_status = "Batch processor is running."
530
  else:
531
- batch_processor_status = "Batch processor task is done (may have completed or errored)."
532
-
 
 
 
 
 
 
533
  if db_ok:
534
- return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
535
  else:
536
- # Return a JSON response for HTTPException as well for consistency
537
  raise HTTPException(
538
  status_code=503,
539
- detail={"message": "Service unavailable.", "database": db_status, "batch_processor": batch_processor_status}
540
  )
541
 
542
 
543
  @app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
544
  async def extract_email_data(request: ProcessEmailRequest):
 
 
 
 
545
  if extracted_emails_collection is None:
546
- raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails.")
547
  try:
548
  current_date_val = date.today()
 
549
  extracted_data = await asyncio.to_thread(
550
  _process_email_internal, request.email_text, request.groq_api_key, current_date_val
551
  )
 
 
552
  extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
553
- for appt in extracted_data_dict.get('appointments', []):
554
- if isinstance(appt.get('start_date'), date): appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
555
- if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None: appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
556
- for task_item in extracted_data_dict.get('tasks', []):
557
- if isinstance(task_item.get('due_date'), date): task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
 
 
 
 
 
558
 
 
559
  result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
560
- # Pydantic model expects string ID, convert from ObjectId before assigning if needed
561
- # However, PyObjectId should handle this if result.inserted_id is ObjectId
562
  extracted_data.id = str(result.inserted_id) if isinstance(result.inserted_id, ObjectId) else result.inserted_id
563
  return extracted_data
564
  except ValueError as e:
@@ -570,69 +664,95 @@ async def extract_email_data(request: ProcessEmailRequest):
570
 
571
  @app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
572
  async def extract_email_data_excel(request: ProcessEmailRequest):
 
 
 
 
573
  raise HTTPException(status_code=501, detail="Excel functionality is currently disabled.")
574
 
575
 
576
  @app.post("/generate-reply", summary="Generate a smart reply to an email (batched & cached)")
577
  async def generate_email_reply(request: GenerateReplyRequest):
 
 
 
 
578
  if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
579
- raise HTTPException(status_code=503, detail="Reply generation service not fully initialized. Check server logs.")
580
 
581
  future = asyncio.Future()
582
  current_time = asyncio.get_event_loop().time()
583
 
584
  async with reply_queue_condition:
585
  reply_request_queue.append((request, future, current_time))
586
- reply_queue_condition.notify()
587
 
588
  try:
 
589
  client_timeout = BATCH_TIMEOUT + 10.0
590
  result = await asyncio.wait_for(future, timeout=client_timeout)
591
  return result
592
  except asyncio.TimeoutError:
 
593
  if not future.done():
594
  future.cancel()
595
- raise HTTPException(status_code=504, detail=f"Request timed out after {client_timeout}s waiting for batch processing.")
596
  except Exception as e:
597
  if isinstance(e, HTTPException):
598
- raise e
599
  traceback.print_exc()
600
  raise HTTPException(status_code=500, detail=f"Error processing your reply request: {str(e)}")
601
 
602
 
603
  @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
604
  async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
 
 
 
605
  if extracted_emails_collection is None:
606
- raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails.")
 
607
  mongo_query: Dict[str, Any] = {}
608
- if query_params.contact_name: mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"}
609
- if query_params.appointment_title: mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
610
- if query_params.task_title: mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
 
 
 
611
 
612
  if query_params.from_date or query_params.to_date:
613
  date_query: Dict[str, datetime] = {}
614
- if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
615
- if query_params.to_date: date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
616
- if date_query : mongo_query["processed_at"] = date_query
 
 
 
 
 
617
 
618
  try:
 
619
  cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
620
  extracted_docs_raw = await asyncio.to_thread(list, cursor)
621
-
622
  results = []
623
  for doc_raw in extracted_docs_raw:
624
  # Convert _id to string for Pydantic model if it's an ObjectId
625
  if isinstance(doc_raw.get("_id"), ObjectId):
626
  doc_raw["_id"] = str(doc_raw["_id"])
627
-
628
- # Convert datetime objects back to date objects for Pydantic model fields that are `date`
629
  if 'appointments' in doc_raw:
630
  for appt in doc_raw['appointments']:
631
- if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
632
- if isinstance(appt.get('end_date'), datetime): appt['end_date'] = appt['end_date'].date()
 
 
633
  if 'tasks' in doc_raw:
634
  for task_item in doc_raw['tasks']:
635
- if isinstance(task_item.get('due_date'), datetime): task_item['due_date'] = task_item['due_date'].date()
 
636
  results.append(ExtractedData(**doc_raw))
637
  return results
638
  except Exception as e:
@@ -642,19 +762,29 @@ async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = De
642
 
643
  @app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
644
  async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
 
 
 
645
  if generated_replies_collection is None:
646
- raise HTTPException(status_code=503, detail="MongoDB not available for generated_replies.")
 
647
  mongo_query: Dict[str, Any] = {}
648
- if query_params.language: mongo_query["language"] = query_params.language
649
- if query_params.style: mongo_query["style"] = query_params.style
650
- if query_params.tone: mongo_query["tone"] = query_params.tone
 
 
 
651
 
652
  if query_params.from_date or query_params.to_date:
653
  date_query: Dict[str, datetime] = {}
654
- if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
655
- if query_params.to_date: date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
656
- if date_query: mongo_query["generated_at"] = date_query
657
-
 
 
 
658
  try:
659
  cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
660
  generated_docs_raw = await asyncio.to_thread(list, cursor)
@@ -666,5 +796,4 @@ async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = D
666
  return results
667
  except Exception as e:
668
  traceback.print_exc()
669
- raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")
670
-
 
 
1
  # This software is licensed under a **dual-license model**
2
  # For individuals and businesses earning **under $1M per year**, this software is licensed under the **MIT License**
3
  # Businesses or organizations with **annual revenue of $1,000,000 or more** must obtain permission to use this software commercially.
 
19
  from langchain.prompts import PromptTemplate
20
  from langchain_groq import ChatGroq
21
  from pydantic import BaseModel, Field, BeforeValidator, model_serializer
22
+ # Ensure you have pydantic >= 2.0.0 for core_schema and typing_extensions for Annotated
23
  from typing_extensions import Annotated
24
+ from pydantic_core import core_schema # Import core_schema for direct use in __get_pydantic_json_schema__
25
 
26
  from pymongo import MongoClient
27
  from pymongo.errors import ConnectionFailure, OperationFailure
28
  from bson import ObjectId
29
 
30
# --- MongoDB Configuration ---
# SECURITY: never hard-code credentials in source control. The connection URI
# (which embeds the Atlas username/password) must come from the environment.
# The localhost fallback keeps local development working without any setup.
import os

MONGO_URI = os.environ.get("MONGO_URI", "mongodb://localhost:27017")
DB_NAME = "email_assistant_db"
EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
GENERATED_REPLIES_COLLECTION = "generated_replies"
37
 
38
+ # Global variables for MongoDB client and collections
39
  client: Optional[MongoClient] = None
40
+ db: Optional[Any] = None
41
  extracted_emails_collection: Optional[Any] = None
42
  generated_replies_collection: Optional[Any] = None
43
 
44
  # --- Pydantic ObjectId Handling ---
45
class CustomObjectId(str):
    """
    Custom Pydantic type for MongoDB ObjectIds.

    Accepts a valid ObjectId hex string or a ``bson.ObjectId`` instance and
    normalizes it to a ``CustomObjectId`` (a ``str`` subclass), so the value
    is always JSON-serializable. Rendered as a plain string in the OpenAPI
    schema.
    """

    @classmethod
    def __get_validators__(cls):
        # Pydantic v1 hook, kept for backward compatibility. Under Pydantic v2
        # validation is driven by the BeforeValidator in the PyObjectId alias.
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Validate *v* and return it as a CustomObjectId, or None if empty."""
        # Allow None / empty string through for optional _id fields.
        if v is None or v == "":
            return None
        # Normalize ObjectId instances to the same str subclass as the
        # string branch below (original returned a plain str here).
        if isinstance(v, ObjectId):
            return cls(str(v))
        if not isinstance(v, str):
            raise ValueError("ObjectId must be a string or ObjectId instance")
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId format")
        return cls(v)

    # Crucial for Pydantic v2 to generate a correct OpenAPI schema.
    @classmethod
    def __get_pydantic_json_schema__(
        cls, _core_schema: core_schema.CoreSchema, handler
    ) -> Dict[str, Any]:
        # Represent this custom type as a standard string in JSON Schema.
        json_schema = handler(core_schema.str_schema())
        json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"
        return json_schema
86
 
87
+ # Annotated type for convenience in models
88
  PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
89
 
90
+
91
  # ---------------------- Models ----------------------
92
  class Contact(BaseModel):
93
  name: str
 
109
  due_date: date
110
 
111
  class ExtractedData(BaseModel):
112
+ # Use PyObjectId for the _id field
113
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
114
  contacts: List[Contact]
115
  appointments: List[Appointment]
 
118
  processed_at: datetime = Field(default_factory=datetime.utcnow)
119
 
120
  class Config:
121
+ populate_by_name = True # Allow setting 'id' or '_id'
122
+ arbitrary_types_allowed = True # Allow CustomObjectId and ObjectId
123
 
124
+ # Custom serializer for JSON output to ensure ObjectId is converted to string
125
  @model_serializer(when_used='json')
126
  def serialize_model(self):
127
  data = self.model_dump(by_alias=True, exclude_none=True)
128
+ # Ensure _id is a string when serializing to JSON
129
  if "_id" in data and isinstance(data["_id"], ObjectId):
130
  data["_id"] = str(data["_id"])
131
+ # Ensure dates are correctly serialized to ISO format if they are date objects
132
  if 'appointments' in data:
133
  for appt in data['appointments']:
134
  if isinstance(appt.get('start_date'), date):
 
155
  emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
156
 
157
  class GeneratedReplyData(BaseModel):
158
+ # Use PyObjectId for the _id field
159
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
160
  original_email_text: str
161
  generated_reply_text: str
 
196
 
197
  # ---------------------- Utility Functions ----------------------
198
def extract_last_json_block(text: str) -> Optional[str]:
    """
    Return the most likely JSON payload embedded in *text*.

    Prefers the last ```json fenced code block; if none exists, falls back to
    the widest standalone ``{...}`` span. Returns None when neither is found.
    """
    fenced = re.findall(r'```json\s*(.*?)\s*```', text, re.DOTALL)
    if fenced:
        return fenced[-1].strip()
    # Fallback: greedy brace match catches a bare (unfenced) JSON object.
    bare = re.search(r'\{.*\}', text, re.DOTALL)
    return bare.group(0) if bare else None
212
 
213
def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
    """
    Parse a date string into a ``date``.

    Supported inputs (case-insensitive, surrounding whitespace ignored):
      * ``"today"`` / ``"tomorrow"`` — relative to *current_date*
      * ISO format ``YYYY-MM-DD``
      * European format ``DD/MM/YYYY`` — common in the Italian emails this
        service processes (new, backward-compatible: previously returned None)

    Returns None when *date_str* is falsy or cannot be parsed; callers choose
    the default (normalize_llm_output falls back to *current_date*).
    """
    if not date_str:
        return None
    normalized = date_str.strip().lower()
    if normalized == "today":
        return current_date
    if normalized == "tomorrow":
        return current_date + timedelta(days=1)
    for fmt in ("%Y-%m-%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(normalized, fmt).date()
        except ValueError:
            continue
    return None
 
 
 
 
229
 
230
  def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
231
+ """
232
+ Normalizes and validates LLM extracted data into ExtractedData Pydantic model.
233
+ Handles defaults for dates and name splitting.
234
+ """
235
  def split_name(full_name: str) -> tuple[str, str]:
236
  parts = full_name.strip().split()
237
  name = parts[0] if parts else ""
 
245
 
246
  appointments_data = []
247
  for a in data.get("appointments", []):
248
+ # Default start_date to current_date if not provided or invalid
249
+ start_date_val = parse_date(a.get("start_date"), current_date) or current_date
250
+ # end_date remains optional
251
+ end_date_val = parse_date(a.get("end_date"), current_date)
252
 
253
  appointments_data.append(Appointment(
254
  title=a.get("title", "Untitled"), description=a.get("description", "No description"),
 
258
 
259
  tasks_data = []
260
  for t in data.get("tasks", []):
261
+ # Default due_date to current_date if not provided or invalid
262
+ due_date_val = parse_date(t.get("due_date"), current_date) or current_date
263
  tasks_data.append(Task(
264
  task_title=t.get("task_title", "Untitled"), task_description=t.get("task_description", "No description"),
265
  due_date=due_date_val
 
268
 
269
  # ---------------------- Core Logic (Internal Functions) ----------------------
270
  def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
271
+ """
272
+ Internal function to process email text using LLM and extract structured data.
273
+ """
274
+ if not email_text:
275
+ raise ValueError("Email text cannot be empty for processing.")
276
+
277
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
278
+
279
  prompt_today_str = current_date.isoformat()
280
  prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
281
+
282
  prompt_template_str = f"""
283
  You are an expert email assistant tasked with extracting structured information from an Italian email.
284
 
 
319
  prompt_template = PromptTemplate(input_variables=["email", "prompt_today_str", "prompt_tomorrow_str"], template=prompt_template_str)
320
  chain = prompt_template | llm
321
  try:
 
322
  llm_output = chain.invoke({"email": email_text, "prompt_today_str": prompt_today_str, "prompt_tomorrow_str": prompt_tomorrow_str})
323
  llm_output_str = llm_output.content
 
324
 
325
  json_str = extract_last_json_block(llm_output_str)
 
326
 
327
+ if not json_str:
328
+ raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
329
  json_data = json.loads(json_str)
 
330
 
331
  extracted_data = normalize_llm_output(json_data, current_date, email_text)
 
332
  return extracted_data
333
  except json.JSONDecodeError as e:
 
 
334
  raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}")
335
  except Exception as e:
336
  traceback.print_exc()
 
340
  email_text: str, api_key: str, language: Literal["Italian", "English"],
341
  length: str, style: str, tone: str, emoji: str
342
  ) -> str:
343
+ """
344
+ Internal function to generate a reply to an email using LLM.
345
+ """
346
+ if not email_text:
347
+ return "Cannot generate reply for empty email text."
348
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
 
349
  prompt_template_str="""
350
  You are an assistant that helps reply to emails.
351
 
 
396
  "tone": request_data.tone,
397
  "emoji": request_data.emoji,
398
  }
399
+ # Use await asyncio.to_thread for blocking MongoDB operations
400
  cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
401
 
402
  if cached_reply_doc:
 
405
  "stored_id": str(cached_reply_doc["_id"]),
406
  "cached": True
407
  }
408
+ if not future.done():
409
+ future.set_result(response)
410
  return
411
 
412
  reply_content = await asyncio.to_thread(
 
429
  tone=request_data.tone,
430
  emoji=request_data.emoji
431
  )
432
+ # Use model_dump for Pydantic v2
433
  reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
434
 
435
  insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
 
440
  "stored_id": stored_id,
441
  "cached": False
442
  }
443
+ if not future.done():
444
+ future.set_result(final_response)
445
 
446
  except Exception as e:
447
  traceback.print_exc()
 
456
  async with reply_queue_condition:
457
  if not reply_request_queue:
458
  await reply_queue_condition.wait()
459
+ # After waking up, re-check if queue is still empty (e.g., if notified but then emptied by another worker)
460
  if not reply_request_queue:
461
  continue
462
 
 
480
  tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
481
  await asyncio.gather(*tasks)
482
  else:
483
+ # Short sleep to prevent busy-waiting if queue is empty but not waiting
484
+ await asyncio.sleep(0.001)
485
 
486
 
487
# ---------------------- FastAPI Application ----------------------
# Application object; Swagger UI is served from the root path ("/").
app = FastAPI(
    title="Email Assistant API",
    description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
    version="1.1.0",
    docs_url="/",
    redoc_url="/redoc",
)
495
 
496
# --- Global Exception Handler ---
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler_wrapper(request, exc):
    """Delegate Starlette/FastAPI HTTP exceptions to FastAPI's default handler."""
    return await http_exception_handler(request, exc)
501
 
502
# Catch-all for exceptions no other handler claimed.
@app.exception_handler(Exception)
async def global_exception_handler_wrapper(request, exc):
    """Log the traceback of any unhandled exception and answer with a JSON 500."""
    print(f"Unhandled exception caught by global handler for request: {request.url}")
    traceback.print_exc()  # keep the full stack in the server log for debugging
    # Hand back JSON (not HTML) so error responses stay machine-readable.
    body = json.dumps({"detail": f"Internal Server Error: {str(exc)}"})
    return Response(content=body, status_code=500, media_type="application/json")
509
 
510
 
 
513
  async def startup_event():
514
  global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
515
  try:
516
+ # Connect to MongoDB
517
  client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
518
+ client.admin.command('ping') # Test connection
519
  db = client[DB_NAME]
520
  extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
521
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
522
  print(f"Successfully connected to MongoDB: {DB_NAME}")
523
 
524
+ # Start the batch processor task if not already running
525
  if batch_processor_task is None:
526
+ # Use asyncio.create_task for proper task management in an async app
527
+ batch_processor_task = asyncio.create_task(process_reply_batches())
528
  print("Batch processor task for replies started.")
529
 
530
  except (ConnectionFailure, OperationFailure) as e:
531
  print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
532
+ # Ensure all DB related globals are reset to None if connection fails
533
  client = None
534
  db = None
535
  extracted_emails_collection = None
 
542
  extracted_emails_collection = None
543
  generated_replies_collection = None
544
  finally:
545
+ # Final check and logging for MongoDB connection status
546
  if client is not None and db is not None:
547
  try:
548
+ # One last ping to confirm connection before app fully starts
549
  client.admin.command('ping')
550
+ except Exception as e:
551
+ print(f"MongoDB ping failed after initial connection attempt during finally block: {e}")
552
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
553
  else:
554
+ print("MongoDB client or db object is None after connection attempt in startup. Database likely not connected.")
555
+ # Ensure all are None if one is, to avoid partial state
556
+ if client is None or db is None:
557
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
558
+ print("FastAPI app startup sequence completed for MongoDB client & Batch Processor initialization.")
559
 
560
 
561
@app.on_event("shutdown")
async def shutdown_event():
    """Stop the reply batch-processor task and close the MongoDB connection."""
    global client, batch_processor_task

    # Cancel the background task, then await it so it can observe the
    # cancellation and run its cleanup before the process exits.
    if batch_processor_task:
        batch_processor_task.cancel()
        try:
            await batch_processor_task
        except asyncio.CancelledError:
            print("Batch processor task for replies cancelled during shutdown.")
        except Exception as e:
            print(f"Error during batch processor task shutdown: {e}")
            traceback.print_exc()
        batch_processor_task = None

    # Release the MongoDB connection pool, if one was ever established.
    if client:
        client.close()
        print("FastAPI app shutting down. MongoDB client closed.")
581
 
582
 
583
+ # --- API Endpoints ---
584
@app.get("/health", summary="Health Check")
async def health_check():
    """
    Checks the health of the API, including MongoDB connection and batch processor status.
    """
    db_status = "MongoDB not connected."
    db_ok = False
    if client is not None and db is not None:
        try:
            # Cheap round-trip to prove the connection is alive; run in a
            # worker thread so the event loop is never blocked.
            await asyncio.to_thread(db.list_collection_names)
            db_status = "MongoDB connection OK."
            db_ok = True
        except Exception as e:
            db_status = f"MongoDB connection error: {e}"
            db_ok = False

    # Summarize the state of the background reply batch processor.
    if batch_processor_task is None:
        batch_processor_status = "Batch processor task has not been initialized."
    elif not batch_processor_task.done():
        batch_processor_status = "Batch processor is running."
    elif batch_processor_task.exception():
        batch_processor_status = f"Batch processor task ended with exception: {batch_processor_task.exception()}"
    else:
        batch_processor_status = "Batch processor task is done (may have completed or cancelled)."

    if db_ok:
        return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}

    # Without a working database the service cannot do useful work: 503.
    raise HTTPException(
        status_code=503,
        detail={"message": "Service unavailable due to issues.", "database": db_status, "batch_processor": batch_processor_status},
    )
 
623
 
624
  @app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
625
  async def extract_email_data(request: ProcessEmailRequest):
626
+ """
627
+ Receives an email, extracts contacts, appointments, and tasks using an LLM,
628
+ and stores the extracted data in MongoDB.
629
+ """
630
  if extracted_emails_collection is None:
631
+ raise HTTPException(status_code=503, detail="MongoDB not available for extracted email storage.")
632
  try:
633
  current_date_val = date.today()
634
+ # Call the internal processing function in a separate thread to not block the event loop
635
  extracted_data = await asyncio.to_thread(
636
  _process_email_internal, request.email_text, request.groq_api_key, current_date_val
637
  )
638
+
639
+ # Prepare data for MongoDB insertion: convert date objects to datetime for storage
640
  extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
641
+ if 'appointments' in extracted_data_dict:
642
+ for appt in extracted_data_dict['appointments']:
643
+ if isinstance(appt.get('start_date'), date):
644
+ appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
645
+ if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
646
+ appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
647
+ if 'tasks' in extracted_data_dict:
648
+ for task_item in extracted_data_dict['tasks']:
649
+ if isinstance(task_item.get('due_date'), date):
650
+ task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
651
 
652
+ # Insert into MongoDB
653
  result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
654
+
655
+ # Update the Pydantic model's ID with the generated MongoDB ObjectId for the response
656
  extracted_data.id = str(result.inserted_id) if isinstance(result.inserted_id, ObjectId) else result.inserted_id
657
  return extracted_data
658
  except ValueError as e:
 
664
 
665
@app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
async def extract_email_data_excel(request: ProcessEmailRequest):
    """
    Placeholder for future functionality to extract data and provide as an Excel download.
    Currently disabled.
    """
    # Endpoint is intentionally stubbed out; always answers 501 Not Implemented.
    raise HTTPException(status_code=501, detail="Excel functionality is currently disabled.")
672
 
673
 
674
@app.post("/generate-reply", summary="Generate a smart reply to an email (batched & cached)")
async def generate_email_reply(request: GenerateReplyRequest):
    """
    Generates an intelligent email reply based on specified parameters (language, length, style, tone, emoji).
    Uses a batch processing system with caching for efficiency.
    """
    # Refuse early if the DB collection, the batch-processor task, or the
    # queue condition variable never came up during startup.
    if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
        raise HTTPException(status_code=503, detail="Reply generation service not fully initialized. Check server logs for database or batch processor errors.")

    future = asyncio.Future()
    current_time = asyncio.get_event_loop().time()

    # Hand the request to the batch processor and wake it up.
    async with reply_queue_condition:
        reply_request_queue.append((request, future, current_time))
        reply_queue_condition.notify()

    # Give the batch window a generous margin before giving up on the client side.
    client_timeout = BATCH_TIMEOUT + 10.0
    try:
        return await asyncio.wait_for(future, timeout=client_timeout)
    except asyncio.TimeoutError:
        # Cancel the pending future so the processor does not waste work on it.
        if not future.done():
            future.cancel()
        raise HTTPException(status_code=504, detail=f"Request timed out after {client_timeout}s waiting for batch processing. This may indicate high load or an issue with the batch processor.")
    except Exception as e:
        if isinstance(e, HTTPException):
            raise e
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Error processing your reply request: {str(e)}")
705
 
706
 
707
  @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
708
  async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
709
+ """
710
+ Queries stored extracted email data from MongoDB based on various filters.
711
+ """
712
  if extracted_emails_collection is None:
713
+ raise HTTPException(status_code=503, detail="MongoDB not available for querying extracted emails.")
714
+
715
  mongo_query: Dict[str, Any] = {}
716
+ if query_params.contact_name:
717
+ mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"} # Case-insensitive regex
718
+ if query_params.appointment_title:
719
+ mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
720
+ if query_params.task_title:
721
+ mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
722
 
723
  if query_params.from_date or query_params.to_date:
724
  date_query: Dict[str, datetime] = {}
725
+ if query_params.from_date:
726
+ # Query for documents processed on or after the start of from_date
727
+ date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
728
+ if query_params.to_date:
729
+ # Query for documents processed before the start of the day *after* to_date
730
+ date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
731
+ if date_query:
732
+ mongo_query["processed_at"] = date_query
733
 
734
  try:
735
+ # Sort by processed_at in descending order (most recent first)
736
  cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
737
  extracted_docs_raw = await asyncio.to_thread(list, cursor)
738
+
739
  results = []
740
  for doc_raw in extracted_docs_raw:
741
  # Convert _id to string for Pydantic model if it's an ObjectId
742
  if isinstance(doc_raw.get("_id"), ObjectId):
743
  doc_raw["_id"] = str(doc_raw["_id"])
744
+
745
+ # Convert datetime objects from MongoDB back to date objects for Pydantic model fields that are `date`
746
  if 'appointments' in doc_raw:
747
  for appt in doc_raw['appointments']:
748
+ if isinstance(appt.get('start_date'), datetime):
749
+ appt['start_date'] = appt['start_date'].date()
750
+ if isinstance(appt.get('end_date'), datetime) and appt.get('end_date') is not None:
751
+ appt['end_date'] = appt['end_date'].date()
752
  if 'tasks' in doc_raw:
753
  for task_item in doc_raw['tasks']:
754
+ if isinstance(task_item.get('due_date'), datetime):
755
+ task_item['due_date'] = task_item['due_date'].date()
756
  results.append(ExtractedData(**doc_raw))
757
  return results
758
  except Exception as e:
 
762
 
763
  @app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
764
  async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
765
+ """
766
+ Queries stored generated email replies from MongoDB based on various filters.
767
+ """
768
  if generated_replies_collection is None:
769
+ raise HTTPException(status_code=503, detail="MongoDB not available for querying generated replies.")
770
+
771
  mongo_query: Dict[str, Any] = {}
772
+ if query_params.language:
773
+ mongo_query["language"] = query_params.language
774
+ if query_params.style:
775
+ mongo_query["style"] = query_params.style
776
+ if query_params.tone:
777
+ mongo_query["tone"] = query_params.tone
778
 
779
  if query_params.from_date or query_params.to_date:
780
  date_query: Dict[str, datetime] = {}
781
+ if query_params.from_date:
782
+ date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
783
+ if query_params.to_date:
784
+ date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
785
+ if date_query:
786
+ mongo_query["generated_at"] = date_query
787
+
788
  try:
789
  cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
790
  generated_docs_raw = await asyncio.to_thread(list, cursor)
 
796
  return results
797
  except Exception as e:
798
  traceback.print_exc()
799
+ raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")