precison9 committed on
Commit
1fc423c
·
verified ·
1 Parent(s): 86ca6e5

Update flask_Character.py

Browse files
Files changed (1) hide show
  1. flask_Character.py +92 -217
flask_Character.py CHANGED
@@ -1,9 +1,3 @@
1
- # This software is licensed under a **dual-license model**
2
- # For individuals and businesses earning **under $1M per year**, this software is licensed under the **MIT License**
3
- # Businesses or organizations with **annual revenue of $1,000,000 or more** must obtain permission to use this software commercially.
4
- import os
5
- os.environ["NUMBA_CACHE_DIR"] = "/tmp/numba_cache"
6
- os.environ["NUMBA_DISABLE_CACHE"] = "1"
7
  import json
8
  import re
9
  from datetime import date, datetime, timedelta
@@ -19,75 +13,45 @@ from starlette.exceptions import HTTPException as StarletteHTTPException
19
  from langchain.prompts import PromptTemplate
20
  from langchain_groq import ChatGroq
21
  from pydantic import BaseModel, Field, BeforeValidator, model_serializer
22
- # Ensure you have pydantic >= 2.0.0 for core_schema and typing_extensions for Annotated
23
  from typing_extensions import Annotated
24
- from pydantic_core import core_schema # Import core_schema for direct use in __get_pydantic_json_schema__
25
 
26
  from pymongo import MongoClient
27
  from pymongo.errors import ConnectionFailure, OperationFailure
28
  from bson import ObjectId
29
 
30
  # --- MongoDB Configuration ---
31
- # IMPORTANT: Replace with your actual URI.
32
- # For security, consider using environment variables for your MONGO_URI in production.
33
- MONGO_URI = "mongodb+srv://precison9:P1LhtFknkT75yg5L@cluster0.isuwpef.mongodb.net"
34
  DB_NAME = "email_assistant_db"
35
  EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
36
  GENERATED_REPLIES_COLLECTION = "generated_replies"
37
 
38
- # Global variables for MongoDB client and collections
39
  client: Optional[MongoClient] = None
40
- db: Optional[Any] = None
41
  extracted_emails_collection: Optional[Any] = None
42
  generated_replies_collection: Optional[Any] = None
43
 
44
  # --- Pydantic ObjectId Handling ---
45
  class CustomObjectId(str):
46
- """
47
- Custom Pydantic type for handling MongoDB ObjectIds.
48
- It validates that the input is a valid ObjectId string and
49
- ensures it's represented as a string in JSON Schema.
50
- """
51
  @classmethod
52
  def __get_validators__(cls):
53
  yield cls.validate
54
 
55
  @classmethod
56
- def validate(cls, v):
57
- # Allow None or empty string to pass through for optional fields if not handled by Optional[PyObjectId]
58
- if v is None or v == "":
59
- return None # Or raise ValueError if not allowed
60
-
61
- # Ensure input is a string or convertible to string for ObjectId.is_valid
62
- if not isinstance(v, (str, ObjectId)):
63
- raise ValueError("ObjectId must be a string or ObjectId instance")
64
-
65
- # Convert ObjectId to string if it's already an ObjectId instance
66
- if isinstance(v, ObjectId):
67
- return str(v)
68
-
69
- # Validate string format
70
  if not ObjectId.is_valid(v):
71
- raise ValueError("Invalid ObjectId format")
72
- return cls(v) # Return an instance of CustomObjectId (which is a str subclass)
73
 
74
-
75
- # This method is crucial for Pydantic v2 to generate correct OpenAPI schema
76
  @classmethod
77
- def __get_pydantic_json_schema__(
78
- cls, _core_schema: core_schema.CoreSchema, handler
79
- ) -> Dict[str, Any]:
80
- # We tell Pydantic that this custom type should be represented as a standard string
81
- # in the generated JSON Schema (OpenAPI documentation).
82
- # We use handler to process a simple string schema.
83
- json_schema = handler(core_schema.str_schema())
84
- json_schema["example"] = "60c728ef238b9c7b9e0f6c2a" # Add an example for clarity
85
  return json_schema
86
 
87
- # Annotated type for convenience in models
88
  PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
89
 
90
-
91
  # ---------------------- Models ----------------------
92
  class Contact(BaseModel):
93
  name: str
@@ -109,7 +73,6 @@ class Task(BaseModel):
109
  due_date: date
110
 
111
  class ExtractedData(BaseModel):
112
- # Use PyObjectId for the _id field
113
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
114
  contacts: List[Contact]
115
  appointments: List[Appointment]
@@ -118,17 +81,14 @@ class ExtractedData(BaseModel):
118
  processed_at: datetime = Field(default_factory=datetime.utcnow)
119
 
120
  class Config:
121
- populate_by_name = True # Allow setting 'id' or '_id'
122
- arbitrary_types_allowed = True # Allow CustomObjectId and ObjectId
123
 
124
- # Custom serializer for JSON output to ensure ObjectId is converted to string
125
  @model_serializer(when_used='json')
126
  def serialize_model(self):
127
  data = self.model_dump(by_alias=True, exclude_none=True)
128
- # Ensure _id is a string when serializing to JSON
129
  if "_id" in data and isinstance(data["_id"], ObjectId):
130
  data["_id"] = str(data["_id"])
131
- # Ensure dates are correctly serialized to ISO format if they are date objects
132
  if 'appointments' in data:
133
  for appt in data['appointments']:
134
  if isinstance(appt.get('start_date'), date):
@@ -155,7 +115,6 @@ class GenerateReplyRequest(BaseModel):
155
  emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
156
 
157
  class GeneratedReplyData(BaseModel):
158
- # Use PyObjectId for the _id field
159
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
160
  original_email_text: str
161
  generated_reply_text: str
@@ -196,42 +155,30 @@ class GeneratedReplyQuery(BaseModel):
196
 
197
  # ---------------------- Utility Functions ----------------------
198
def extract_last_json_block(text: str) -> Optional[str]:
    """Return the JSON payload of the last ```json ...``` fenced block in *text*.

    When no fenced block is present, fall back to the widest ``{...}`` span
    found anywhere in the text. Returns None when neither form is present.
    """
    fenced_blocks = re.findall(r'```json\s*(.*?)\s*```', text, re.DOTALL)
    if fenced_blocks:
        # The LLM may emit several blocks; the last one is the final answer.
        return fenced_blocks[-1].strip()
    bare_object = re.search(r'\{.*\}', text, re.DOTALL)
    return bare_object.group(0) if bare_object else None
212
 
213
- def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
214
- """
215
- Parses a date string, handling 'today', 'tomorrow', and YYYY-MM-DD format.
216
- Returns None if input is None or cannot be parsed into a date.
217
- """
218
- if not date_str:
219
- return None
220
  date_str_lower = date_str.lower().strip()
221
- if date_str_lower == "today":
222
- return current_date
223
- if date_str_lower == "tomorrow":
224
- return current_date + timedelta(days=1)
225
  try:
226
  return datetime.strptime(date_str_lower, "%Y-%m-%d").date()
227
  except ValueError:
228
- return None # Return None if parsing fails, let normalize_llm_output handle defaults
 
 
 
 
229
 
230
  def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
231
- """
232
- Normalizes and validates LLM extracted data into ExtractedData Pydantic model.
233
- Handles defaults for dates and name splitting.
234
- """
235
  def split_name(full_name: str) -> tuple[str, str]:
236
  parts = full_name.strip().split()
237
  name = parts[0] if parts else ""
@@ -245,10 +192,8 @@ def normalize_llm_output(data: dict, current_date: date, original_email_text: st
245
 
246
  appointments_data = []
247
  for a in data.get("appointments", []):
248
- # Default start_date to current_date if not provided or invalid
249
- start_date_val = parse_date(a.get("start_date"), current_date) or current_date
250
- # end_date remains optional
251
- end_date_val = parse_date(a.get("end_date"), current_date)
252
 
253
  appointments_data.append(Appointment(
254
  title=a.get("title", "Untitled"), description=a.get("description", "No description"),
@@ -258,8 +203,7 @@ def normalize_llm_output(data: dict, current_date: date, original_email_text: st
258
 
259
  tasks_data = []
260
  for t in data.get("tasks", []):
261
- # Default due_date to current_date if not provided or invalid
262
- due_date_val = parse_date(t.get("due_date"), current_date) or current_date
263
  tasks_data.append(Task(
264
  task_title=t.get("task_title", "Untitled"), task_description=t.get("task_description", "No description"),
265
  due_date=due_date_val
@@ -268,17 +212,11 @@ def normalize_llm_output(data: dict, current_date: date, original_email_text: st
268
 
269
  # ---------------------- Core Logic (Internal Functions) ----------------------
270
  def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
271
- """
272
- Internal function to process email text using LLM and extract structured data.
273
- """
274
- if not email_text:
275
- raise ValueError("Email text cannot be empty for processing.")
276
-
277
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
278
-
279
  prompt_today_str = current_date.isoformat()
280
  prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
281
-
282
  prompt_template_str = f"""
283
  You are an expert email assistant tasked with extracting structured information from an Italian email.
284
 
@@ -319,18 +257,24 @@ Email:
319
  prompt_template = PromptTemplate(input_variables=["email", "prompt_today_str", "prompt_tomorrow_str"], template=prompt_template_str)
320
  chain = prompt_template | llm
321
  try:
 
322
  llm_output = chain.invoke({"email": email_text, "prompt_today_str": prompt_today_str, "prompt_tomorrow_str": prompt_tomorrow_str})
323
  llm_output_str = llm_output.content
 
324
 
325
  json_str = extract_last_json_block(llm_output_str)
 
326
 
327
- if not json_str:
328
- raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
329
  json_data = json.loads(json_str)
 
330
 
331
  extracted_data = normalize_llm_output(json_data, current_date, email_text)
 
332
  return extracted_data
333
  except json.JSONDecodeError as e:
 
 
334
  raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}")
335
  except Exception as e:
336
  traceback.print_exc()
@@ -340,12 +284,9 @@ def _generate_response_internal(
340
  email_text: str, api_key: str, language: Literal["Italian", "English"],
341
  length: str, style: str, tone: str, emoji: str
342
  ) -> str:
343
- """
344
- Internal function to generate a reply to an email using LLM.
345
- """
346
- if not email_text:
347
- return "Cannot generate reply for empty email text."
348
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
 
349
  prompt_template_str="""
350
  You are an assistant that helps reply to emails.
351
 
@@ -371,7 +312,7 @@ Write only the reply body. Do not repeat the email or mention any instruction.
371
 
372
  # --- Batching and Caching Configuration ---
373
  MAX_BATCH_SIZE = 20
374
- BATCH_TIMEOUT = 9000000 # seconds
375
 
376
  reply_request_queue: List[Tuple[GenerateReplyRequest, asyncio.Future, float]] = []
377
  reply_queue_lock = asyncio.Lock()
@@ -396,7 +337,6 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
396
  "tone": request_data.tone,
397
  "emoji": request_data.emoji,
398
  }
399
- # Use await asyncio.to_thread for blocking MongoDB operations
400
  cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
401
 
402
  if cached_reply_doc:
@@ -405,8 +345,7 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
405
  "stored_id": str(cached_reply_doc["_id"]),
406
  "cached": True
407
  }
408
- if not future.done():
409
- future.set_result(response)
410
  return
411
 
412
  reply_content = await asyncio.to_thread(
@@ -429,7 +368,6 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
429
  tone=request_data.tone,
430
  emoji=request_data.emoji
431
  )
432
- # Use model_dump for Pydantic v2
433
  reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
434
 
435
  insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
@@ -440,8 +378,7 @@ async def handle_single_reply_request(request_data: GenerateReplyRequest, future
440
  "stored_id": stored_id,
441
  "cached": False
442
  }
443
- if not future.done():
444
- future.set_result(final_response)
445
 
446
  except Exception as e:
447
  traceback.print_exc()
@@ -456,7 +393,6 @@ async def process_reply_batches():
456
  async with reply_queue_condition:
457
  if not reply_request_queue:
458
  await reply_queue_condition.wait()
459
- # After waking up, re-check if queue is still empty (e.g., if notified but then emptied by another worker)
460
  if not reply_request_queue:
461
  continue
462
 
@@ -480,31 +416,28 @@ async def process_reply_batches():
480
  tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
481
  await asyncio.gather(*tasks)
482
  else:
483
- # Short sleep to prevent busy-waiting if queue is empty but not waiting
484
- await asyncio.sleep(0.001)
485
 
486
 
487
  # ---------------------- FastAPI Application ----------------------
488
  app = FastAPI(
489
  title="Email Assistant API",
490
  description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
491
- version="1.1.0",
492
- docs_url="/", # Sets Swagger UI to be the root path
493
  redoc_url="/redoc"
494
  )
495
 
496
  # --- Global Exception Handler ---
497
- # Catch Starlette HTTPExceptions (FastAPI uses these internally)
498
  @app.exception_handler(StarletteHTTPException)
499
  async def custom_http_exception_handler_wrapper(request, exc):
500
  return await http_exception_handler(request, exc)
501
 
502
- # Catch all other unhandled exceptions
503
  @app.exception_handler(Exception)
504
  async def global_exception_handler_wrapper(request, exc):
505
  print(f"Unhandled exception caught by global handler for request: {request.url}")
506
- traceback.print_exc() # Print traceback to console for debugging
507
- # Return a JSON response for consistency, even for unhandled errors
508
  return Response(content=json.dumps({"detail": f"Internal Server Error: {str(exc)}"}), status_code=500, media_type="application/json")
509
 
510
 
@@ -513,23 +446,20 @@ async def global_exception_handler_wrapper(request, exc):
513
  async def startup_event():
514
  global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
515
  try:
516
- # Connect to MongoDB
517
  client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
518
- client.admin.command('ping') # Test connection
519
  db = client[DB_NAME]
520
  extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
521
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
522
  print(f"Successfully connected to MongoDB: {DB_NAME}")
523
 
524
- # Start the batch processor task if not already running
525
  if batch_processor_task is None:
526
- # Use asyncio.create_task for proper task management in an async app
527
- batch_processor_task = asyncio.create_task(process_reply_batches())
528
  print("Batch processor task for replies started.")
529
 
530
  except (ConnectionFailure, OperationFailure) as e:
531
  print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
532
- # Ensure all DB related globals are reset to None if connection fails
533
  client = None
534
  db = None
535
  extracted_emails_collection = None
@@ -542,117 +472,87 @@ async def startup_event():
542
  extracted_emails_collection = None
543
  generated_replies_collection = None
544
  finally:
545
- # Final check and logging for MongoDB connection status
546
  if client is not None and db is not None:
547
  try:
548
- # One last ping to confirm connection before app fully starts
549
  client.admin.command('ping')
550
- except Exception as e:
551
- print(f"MongoDB ping failed after initial connection attempt during finally block: {e}")
552
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
553
  else:
554
- print("MongoDB client or db object is None after connection attempt in startup. Database likely not connected.")
555
- # Ensure all are None if one is, to avoid partial state
556
- if client is None or db is None:
557
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
558
- print("FastAPI app startup sequence completed for MongoDB client & Batch Processor initialization.")
559
 
560
 
561
  @app.on_event("shutdown")
562
  async def shutdown_event():
563
  global client, batch_processor_task
564
- # Cancel the batch processor task
565
  if batch_processor_task:
566
  batch_processor_task.cancel()
567
  try:
568
- # Await the task to ensure it has a chance to clean up/handle cancellation
569
  await batch_processor_task
570
  except asyncio.CancelledError:
571
- print("Batch processor task for replies cancelled during shutdown.")
572
  except Exception as e:
573
  print(f"Error during batch processor task shutdown: {e}")
574
  traceback.print_exc()
575
  batch_processor_task = None
576
 
577
- # Close MongoDB client connection
578
  if client:
579
  client.close()
580
  print("FastAPI app shutting down. MongoDB client closed.")
581
 
582
 
583
- # --- API Endpoints ---
584
  @app.get("/health", summary="Health Check")
585
  async def health_check():
586
- """
587
- Checks the health of the API, including MongoDB connection and batch processor status.
588
- """
589
- db_status = "MongoDB not connected."
590
  db_ok = False
591
- if client is not None and db is not None:
592
  try:
593
- # Attempt a simple database operation to confirm connectivity
594
- await asyncio.to_thread(db.list_collection_names)
595
  db_status = "MongoDB connection OK."
596
  db_ok = True
597
  except Exception as e:
598
  db_status = f"MongoDB connection error: {e}"
599
- db_ok = False # Explicitly set to False on error
600
 
601
- batch_processor_status = "Batch processor not running."
602
- if batch_processor_task is not None:
603
  if not batch_processor_task.done():
604
- batch_processor_status = "Batch processor is running."
605
  else:
606
- # Check if it finished with an exception
607
- if batch_processor_task.exception():
608
- batch_processor_status = f"Batch processor task ended with exception: {batch_processor_task.exception()}"
609
- else:
610
- batch_processor_status = "Batch processor task is done (may have completed or cancelled)."
611
- else:
612
- batch_processor_status = "Batch processor task has not been initialized."
613
-
614
  if db_ok:
615
- return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
616
  else:
617
- # If DB is not OK, return a 503 Service Unavailable
618
  raise HTTPException(
619
  status_code=503,
620
- detail={"message": "Service unavailable due to issues.", "database": db_status, "batch_processor": batch_processor_status}
621
  )
622
 
623
 
624
  @app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
625
  async def extract_email_data(request: ProcessEmailRequest):
626
- """
627
- Receives an email, extracts contacts, appointments, and tasks using an LLM,
628
- and stores the extracted data in MongoDB.
629
- """
630
  if extracted_emails_collection is None:
631
- raise HTTPException(status_code=503, detail="MongoDB not available for extracted email storage.")
632
  try:
633
  current_date_val = date.today()
634
- # Call the internal processing function in a separate thread to not block the event loop
635
  extracted_data = await asyncio.to_thread(
636
  _process_email_internal, request.email_text, request.groq_api_key, current_date_val
637
  )
638
-
639
- # Prepare data for MongoDB insertion: convert date objects to datetime for storage
640
  extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
641
- if 'appointments' in extracted_data_dict:
642
- for appt in extracted_data_dict['appointments']:
643
- if isinstance(appt.get('start_date'), date):
644
- appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
645
- if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
646
- appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
647
- if 'tasks' in extracted_data_dict:
648
- for task_item in extracted_data_dict['tasks']:
649
- if isinstance(task_item.get('due_date'), date):
650
- task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
651
 
652
- # Insert into MongoDB
653
  result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
654
-
655
- # Update the Pydantic model's ID with the generated MongoDB ObjectId for the response
656
  extracted_data.id = str(result.inserted_id) if isinstance(result.inserted_id, ObjectId) else result.inserted_id
657
  return extracted_data
658
  except ValueError as e:
@@ -664,95 +564,69 @@ async def extract_email_data(request: ProcessEmailRequest):
664
 
665
  @app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
666
  async def extract_email_data_excel(request: ProcessEmailRequest):
667
- """
668
- Placeholder for future functionality to extract data and provide as an Excel download.
669
- Currently disabled.
670
- """
671
  raise HTTPException(status_code=501, detail="Excel functionality is currently disabled.")
672
 
673
 
674
  @app.post("/generate-reply", summary="Generate a smart reply to an email (batched & cached)")
675
  async def generate_email_reply(request: GenerateReplyRequest):
676
- """
677
- Generates an intelligent email reply based on specified parameters (language, length, style, tone, emoji).
678
- Uses a batch processing system with caching for efficiency.
679
- """
680
  if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
681
- raise HTTPException(status_code=503, detail="Reply generation service not fully initialized. Check server logs for database or batch processor errors.")
682
 
683
  future = asyncio.Future()
684
  current_time = asyncio.get_event_loop().time()
685
 
686
  async with reply_queue_condition:
687
  reply_request_queue.append((request, future, current_time))
688
- reply_queue_condition.notify() # Notify the batch processor that there's a new item
689
 
690
  try:
691
- # A generous timeout for the client waiting for a response
692
  client_timeout = BATCH_TIMEOUT + 10.0
693
  result = await asyncio.wait_for(future, timeout=client_timeout)
694
  return result
695
  except asyncio.TimeoutError:
696
- # If the client times out, cancel the future to clean up
697
  if not future.done():
698
  future.cancel()
699
- raise HTTPException(status_code=504, detail=f"Request timed out after {client_timeout}s waiting for batch processing. This may indicate high load or an issue with the batch processor.")
700
  except Exception as e:
701
  if isinstance(e, HTTPException):
702
- raise e # Re-raise FastAPI HTTPExceptions directly
703
  traceback.print_exc()
704
  raise HTTPException(status_code=500, detail=f"Error processing your reply request: {str(e)}")
705
 
706
 
707
  @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
708
  async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
709
- """
710
- Queries stored extracted email data from MongoDB based on various filters.
711
- """
712
  if extracted_emails_collection is None:
713
- raise HTTPException(status_code=503, detail="MongoDB not available for querying extracted emails.")
714
-
715
  mongo_query: Dict[str, Any] = {}
716
- if query_params.contact_name:
717
- mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"} # Case-insensitive regex
718
- if query_params.appointment_title:
719
- mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
720
- if query_params.task_title:
721
- mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
722
 
723
  if query_params.from_date or query_params.to_date:
724
  date_query: Dict[str, datetime] = {}
725
- if query_params.from_date:
726
- # Query for documents processed on or after the start of from_date
727
- date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
728
- if query_params.to_date:
729
- # Query for documents processed before the start of the day *after* to_date
730
- date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
731
- if date_query:
732
- mongo_query["processed_at"] = date_query
733
 
734
  try:
735
- # Sort by processed_at in descending order (most recent first)
736
  cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
737
  extracted_docs_raw = await asyncio.to_thread(list, cursor)
738
-
739
  results = []
740
  for doc_raw in extracted_docs_raw:
741
  # Convert _id to string for Pydantic model if it's an ObjectId
742
  if isinstance(doc_raw.get("_id"), ObjectId):
743
  doc_raw["_id"] = str(doc_raw["_id"])
744
-
745
- # Convert datetime objects from MongoDB back to date objects for Pydantic model fields that are `date`
746
  if 'appointments' in doc_raw:
747
  for appt in doc_raw['appointments']:
748
- if isinstance(appt.get('start_date'), datetime):
749
- appt['start_date'] = appt['start_date'].date()
750
- if isinstance(appt.get('end_date'), datetime) and appt.get('end_date') is not None:
751
- appt['end_date'] = appt['end_date'].date()
752
  if 'tasks' in doc_raw:
753
  for task_item in doc_raw['tasks']:
754
- if isinstance(task_item.get('due_date'), datetime):
755
- task_item['due_date'] = task_item['due_date'].date()
756
  results.append(ExtractedData(**doc_raw))
757
  return results
758
  except Exception as e:
@@ -760,7 +634,7 @@ async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = De
760
  raise HTTPException(status_code=500, detail=f"Error querying extracted emails: {e}")
761
 
762
 
763
- app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
764
  async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
765
  if generated_replies_collection is None:
766
  raise HTTPException(status_code=503, detail="MongoDB not available for generated_replies.")
@@ -787,3 +661,4 @@ async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = D
787
  except Exception as e:
788
  traceback.print_exc()
789
  raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")
 
 
 
 
 
 
 
 
1
  import json
2
  import re
3
  from datetime import date, datetime, timedelta
 
13
  from langchain.prompts import PromptTemplate
14
  from langchain_groq import ChatGroq
15
  from pydantic import BaseModel, Field, BeforeValidator, model_serializer
 
16
  from typing_extensions import Annotated
17
+ import uvicorn
18
 
19
  from pymongo import MongoClient
20
  from pymongo.errors import ConnectionFailure, OperationFailure
21
  from bson import ObjectId
22
 
23
  # --- MongoDB Configuration ---
24
+ MONGO_URI = "mongodb+srv://precison9:P1LhtFknkT75yg5L@cluster0.isuwpef.mongodb.net" # Replace with your actual URI
 
 
25
  DB_NAME = "email_assistant_db"
26
  EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
27
  GENERATED_REPLIES_COLLECTION = "generated_replies"
28
 
 
29
  client: Optional[MongoClient] = None
30
+ db: Optional[Any] = None # Changed to Optional[Any] as Database type is not directly imported for annotation here
31
  extracted_emails_collection: Optional[Any] = None
32
  generated_replies_collection: Optional[Any] = None
33
 
34
  # --- Pydantic ObjectId Handling ---
35
  class CustomObjectId(str):
 
 
 
 
 
36
  @classmethod
37
  def __get_validators__(cls):
38
  yield cls.validate
39
 
40
  @classmethod
41
+ def validate(cls, v, info):
 
 
 
 
 
 
 
 
 
 
 
 
 
42
  if not ObjectId.is_valid(v):
43
+ raise ValueError("Invalid ObjectId")
44
+ return str(v)
45
 
 
 
46
  @classmethod
47
+ def __get_pydantic_json_schema__(cls, core_schema, handler):
48
+ json_schema = handler(core_schema)
49
+ json_schema["type"] = "string"
50
+ json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"
 
 
 
 
51
  return json_schema
52
 
 
53
  PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
54
 
 
55
  # ---------------------- Models ----------------------
56
  class Contact(BaseModel):
57
  name: str
 
73
  due_date: date
74
 
75
  class ExtractedData(BaseModel):
 
76
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
77
  contacts: List[Contact]
78
  appointments: List[Appointment]
 
81
  processed_at: datetime = Field(default_factory=datetime.utcnow)
82
 
83
  class Config:
84
+ populate_by_name = True
85
+ arbitrary_types_allowed = True
86
 
 
87
  @model_serializer(when_used='json')
88
  def serialize_model(self):
89
  data = self.model_dump(by_alias=True, exclude_none=True)
 
90
  if "_id" in data and isinstance(data["_id"], ObjectId):
91
  data["_id"] = str(data["_id"])
 
92
  if 'appointments' in data:
93
  for appt in data['appointments']:
94
  if isinstance(appt.get('start_date'), date):
 
115
  emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
116
 
117
  class GeneratedReplyData(BaseModel):
 
118
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
119
  original_email_text: str
120
  generated_reply_text: str
 
155
 
156
  # ---------------------- Utility Functions ----------------------
157
  def extract_last_json_block(text: str) -> Optional[str]:
 
 
 
 
158
  pattern = r'```json\s*(.*?)\s*```'
159
  matches = re.findall(pattern, text, re.DOTALL)
160
  if matches:
161
  return matches[-1].strip()
 
162
  match = re.search(r'\{.*\}', text, re.DOTALL)
163
  if match:
164
  return match.group(0)
165
  return None
166
 
167
def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
    """Parse a date string from LLM output into a ``date``.

    Supports the relative keywords "today" and "tomorrow" (case-insensitive,
    whitespace-tolerant) and the ISO format YYYY-MM-DD.

    Args:
        date_str: Raw date string; may be None (e.g. an omitted end_date).
        current_date: Reference date used to resolve the relative keywords.

    Returns:
        The parsed date, or None when date_str is missing or unparseable.
        Returning None (rather than defaulting to current_date here) keeps
        optional fields such as end_date truly optional; callers that need a
        default already apply it via ``parse_date(...) or current_date``.
    """
    if not date_str:
        return None
    normalized = date_str.lower().strip()
    if normalized == "today":
        return current_date
    if normalized == "tomorrow":
        return current_date + timedelta(days=1)
    try:
        return datetime.strptime(normalized, "%Y-%m-%d").date()
    except ValueError:
        # Unrecognized format: signal "no date" instead of silently
        # substituting today's date, which would corrupt optional end_dates.
        return None
180
 
181
  def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
 
 
 
 
182
  def split_name(full_name: str) -> tuple[str, str]:
183
  parts = full_name.strip().split()
184
  name = parts[0] if parts else ""
 
192
 
193
  appointments_data = []
194
  for a in data.get("appointments", []):
195
+ start_date_val = parse_date(a.get("start_date", "today"), current_date) or current_date # Default to current_date if parse_date returns None
196
+ end_date_val = parse_date(a.get("end_date"), current_date) # parse_date can return None if end_date is not provided or invalid
 
 
197
 
198
  appointments_data.append(Appointment(
199
  title=a.get("title", "Untitled"), description=a.get("description", "No description"),
 
203
 
204
  tasks_data = []
205
  for t in data.get("tasks", []):
206
+ due_date_val = parse_date(t.get("due_date", "today"), current_date) or current_date # Default to current_date
 
207
  tasks_data.append(Task(
208
  task_title=t.get("task_title", "Untitled"), task_description=t.get("task_description", "No description"),
209
  due_date=due_date_val
 
212
 
213
  # ---------------------- Core Logic (Internal Functions) ----------------------
214
  def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
215
+ if not email_text: raise ValueError("Email text cannot be empty for processing.")
 
 
 
 
 
216
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
 
217
  prompt_today_str = current_date.isoformat()
218
  prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
219
+ # Ensure your full, detailed prompt is used here
220
  prompt_template_str = f"""
221
  You are an expert email assistant tasked with extracting structured information from an Italian email.
222
 
 
257
  prompt_template = PromptTemplate(input_variables=["email", "prompt_today_str", "prompt_tomorrow_str"], template=prompt_template_str)
258
  chain = prompt_template | llm
259
  try:
260
+ # print(f"DEBUG: Invoking LLM with email_text length: {len(email_text)} and current_date: {current_date}")
261
  llm_output = chain.invoke({"email": email_text, "prompt_today_str": prompt_today_str, "prompt_tomorrow_str": prompt_tomorrow_str})
262
  llm_output_str = llm_output.content
263
+ # print(f"DEBUG: Raw LLM output:\n{llm_output_str[:500]}...")
264
 
265
  json_str = extract_last_json_block(llm_output_str)
266
+ # print(f"DEBUG: Extracted JSON string:\n{json_str}")
267
 
268
+ if not json_str: raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
 
269
  json_data = json.loads(json_str)
270
+ # print(f"DEBUG: Parsed JSON data: {json.dumps(json_data, indent=2)}")
271
 
272
  extracted_data = normalize_llm_output(json_data, current_date, email_text)
273
+ # print("DEBUG: Data normalized successfully.")
274
  return extracted_data
275
  except json.JSONDecodeError as e:
276
+ # print(f"ERROR: JSON Decode Error: {e}")
277
+ # print(f"ERROR: LLM response that caused error:\n{llm_output_str}")
278
  raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}")
279
  except Exception as e:
280
  traceback.print_exc()
 
284
  email_text: str, api_key: str, language: Literal["Italian", "English"],
285
  length: str, style: str, tone: str, emoji: str
286
  ) -> str:
287
+ if not email_text: return "Cannot generate reply for empty email text."
 
 
 
 
288
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
289
+ # Ensure your full, detailed prompt is used here
290
  prompt_template_str="""
291
  You are an assistant that helps reply to emails.
292
 
 
312
 
313
# --- Batching and Caching Configuration ---
MAX_BATCH_SIZE = 20   # max reply requests dispatched in one batch
BATCH_TIMEOUT = 0.5 # seconds a queued request may wait before a partial batch fires

# Pending reply requests: (request payload, future resolved with the reply, enqueue time).
reply_request_queue: List[Tuple[GenerateReplyRequest, asyncio.Future, float]] = []
# Guards reply_request_queue against concurrent mutation by request handlers and the batch worker.
reply_queue_lock = asyncio.Lock()
 
337
  "tone": request_data.tone,
338
  "emoji": request_data.emoji,
339
  }
 
340
  cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
341
 
342
  if cached_reply_doc:
 
345
  "stored_id": str(cached_reply_doc["_id"]),
346
  "cached": True
347
  }
348
+ if not future.done(): future.set_result(response)
 
349
  return
350
 
351
  reply_content = await asyncio.to_thread(
 
368
  tone=request_data.tone,
369
  emoji=request_data.emoji
370
  )
 
371
  reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
372
 
373
  insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
 
378
  "stored_id": stored_id,
379
  "cached": False
380
  }
381
+ if not future.done(): future.set_result(final_response)
 
382
 
383
  except Exception as e:
384
  traceback.print_exc()
 
393
  async with reply_queue_condition:
394
  if not reply_request_queue:
395
  await reply_queue_condition.wait()
 
396
  if not reply_request_queue:
397
  continue
398
 
 
416
  tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
417
  await asyncio.gather(*tasks)
418
  else:
419
+ await asyncio.sleep(0.001)
 
420
 
421
 
422
# ---------------------- FastAPI Application ----------------------
app = FastAPI(
    title="Email Assistant API",
    description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
    version="1.1.0", # Incremented version
    docs_url="/",  # serve the interactive Swagger UI at the site root
    redoc_url="/redoc"
)
430
 
431
  # --- Global Exception Handler ---
 
432
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler_wrapper(request, exc):
    """Delegate HTTP exceptions to FastAPI's default handler (kept as an explicit hook)."""
    return await http_exception_handler(request, exc)
435
 
 
436
@app.exception_handler(Exception)
async def global_exception_handler_wrapper(request, exc):
    """Last-resort handler: log any uncaught exception and return a JSON 500 response."""
    print(f"Unhandled exception caught by global handler for request: {request.url}")
    traceback.print_exc()
    # Build the JSON body by hand so the handler always returns a valid FastAPI response.
    return Response(content=json.dumps({"detail": f"Internal Server Error: {str(exc)}"}), status_code=500, media_type="application/json")
442
 
443
 
 
446
  async def startup_event():
447
  global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
448
  try:
 
449
  client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
450
+ client.admin.command('ping')
451
  db = client[DB_NAME]
452
  extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
453
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
454
  print(f"Successfully connected to MongoDB: {DB_NAME}")
455
 
 
456
  if batch_processor_task is None:
457
+ loop = asyncio.get_event_loop()
458
+ batch_processor_task = loop.create_task(process_reply_batches())
459
  print("Batch processor task for replies started.")
460
 
461
  except (ConnectionFailure, OperationFailure) as e:
462
  print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
 
463
  client = None
464
  db = None
465
  extracted_emails_collection = None
 
472
  extracted_emails_collection = None
473
  generated_replies_collection = None
474
  finally:
475
+ # Corrected condition for checking client and db
476
  if client is not None and db is not None:
477
  try:
 
478
  client.admin.command('ping')
479
+ except Exception:
480
+ print("MongoDB ping failed after initial connection attempt during finally block.")
481
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
482
  else:
483
+ print("MongoDB client or db object is None after connection attempt in startup.")
484
+ if client is None or db is None: # Ensure all are None if one is
 
485
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
486
+ print("FastAPI app starting up. MongoDB client & Batch Processor initialization attempted.")
487
 
488
 
489
@app.on_event("shutdown")
async def shutdown_event():
    """Gracefully stop the reply batch processor and close the MongoDB client."""
    global client, batch_processor_task
    if batch_processor_task:
        batch_processor_task.cancel()
        try:
            # Await the task so cancellation (or any pending error) is surfaced here.
            await batch_processor_task
        except asyncio.CancelledError:
            print("Batch processor task for replies cancelled.")
        except Exception as e:
            print(f"Error during batch processor task shutdown: {e}")
            traceback.print_exc()
        batch_processor_task = None

    # Explicit None check: pymongo discourages truth-testing its objects, and the
    # rest of this module consistently compares the client against None.
    if client is not None:
        client.close()
        client = None
    print("FastAPI app shutting down. MongoDB client closed.")
506
 
507
 
 
508
@app.get("/health", summary="Health Check")
async def health_check():
    """Report API liveness plus the state of MongoDB and the reply batch processor.

    Returns 200 with status details when MongoDB is reachable; raises a 503
    carrying the same diagnostics otherwise.
    """
    db_status = "MongoDB not connected. Check server startup logs."
    db_ok = False
    if client is not None and db is not None:
        try:
            # list_collection_names() is a blocking driver call; run it off the
            # event loop (as other endpoints do) so the probe cannot stall requests.
            await asyncio.to_thread(db.list_collection_names)
            db_status = "MongoDB connection OK."
            db_ok = True
        except Exception as e:
            db_status = f"MongoDB connection error: {e}"

    batch_processor_status = "Batch processor not running or state unknown."
    if batch_processor_task is not None:
        if not batch_processor_task.done():
            batch_processor_status = "Batch processor is running."
        else:
            batch_processor_status = "Batch processor task is done (may have completed or errored)."

    if db_ok:
        return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
    else:
        # Surface the diagnostics in the 503 payload for consistency with the 200 path.
        raise HTTPException(
            status_code=503,
            detail={"message": "Service unavailable.", "database": db_status, "batch_processor": batch_processor_status}
        )
535
 
536
 
537
  @app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
538
  async def extract_email_data(request: ProcessEmailRequest):
 
 
 
 
539
  if extracted_emails_collection is None:
540
+ raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails.")
541
  try:
542
  current_date_val = date.today()
 
543
  extracted_data = await asyncio.to_thread(
544
  _process_email_internal, request.email_text, request.groq_api_key, current_date_val
545
  )
 
 
546
  extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
547
+ for appt in extracted_data_dict.get('appointments', []):
548
+ if isinstance(appt.get('start_date'), date): appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
549
+ if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None: appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
550
+ for task_item in extracted_data_dict.get('tasks', []):
551
+ if isinstance(task_item.get('due_date'), date): task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
 
 
 
 
 
552
 
 
553
  result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
554
+ # Pydantic model expects string ID, convert from ObjectId before assigning if needed
555
+ # However, PyObjectId should handle this if result.inserted_id is ObjectId
556
  extracted_data.id = str(result.inserted_id) if isinstance(result.inserted_id, ObjectId) else result.inserted_id
557
  return extracted_data
558
  except ValueError as e:
 
564
 
565
@app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
async def extract_email_data_excel(request: ProcessEmailRequest):
    # Excel export is disabled; keep the route registered so existing clients
    # receive an explicit 501 instead of a 404.
    raise HTTPException(status_code=501, detail="Excel functionality is currently disabled.")
568
 
569
 
570
@app.post("/generate-reply", summary="Generate a smart reply to an email (batched & cached)")
async def generate_email_reply(request: GenerateReplyRequest):
    """Queue a reply request for batched processing and await its result.

    The request is appended to the shared queue; the batch processor resolves
    the future with either a cached or freshly generated reply.  Raises 503
    when the service is not initialized, 504 on timeout, 500 on unexpected
    failures.
    """
    if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
        raise HTTPException(status_code=503, detail="Reply generation service not fully initialized. Check server logs.")

    # get_running_loop() is the supported way to reach the loop from inside a
    # coroutine (get_event_loop() is deprecated for this use in modern Python).
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    current_time = loop.time()

    async with reply_queue_condition:
        reply_request_queue.append((request, future, current_time))
        reply_queue_condition.notify()

    # Hoisted out of the try block so the except clause never reads a name
    # that is only bound inside it.
    client_timeout = BATCH_TIMEOUT + 10.0
    try:
        result = await asyncio.wait_for(future, timeout=client_timeout)
        return result
    except asyncio.TimeoutError:
        if not future.done():
            future.cancel()
        raise HTTPException(status_code=504, detail=f"Request timed out after {client_timeout}s waiting for batch processing.")
    except Exception as e:
        if isinstance(e, HTTPException):
            # Bare raise preserves the original traceback of the HTTPException.
            raise
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Error processing your reply request: {str(e)}")
595
 
596
 
597
  @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
598
  async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
 
 
 
599
  if extracted_emails_collection is None:
600
+ raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails.")
 
601
  mongo_query: Dict[str, Any] = {}
602
+ if query_params.contact_name: mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"}
603
+ if query_params.appointment_title: mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
604
+ if query_params.task_title: mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
 
 
 
605
 
606
  if query_params.from_date or query_params.to_date:
607
  date_query: Dict[str, datetime] = {}
608
+ if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
609
+ if query_params.to_date: date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
610
+ if date_query : mongo_query["processed_at"] = date_query
 
 
 
 
 
611
 
612
  try:
 
613
  cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
614
  extracted_docs_raw = await asyncio.to_thread(list, cursor)
615
+
616
  results = []
617
  for doc_raw in extracted_docs_raw:
618
  # Convert _id to string for Pydantic model if it's an ObjectId
619
  if isinstance(doc_raw.get("_id"), ObjectId):
620
  doc_raw["_id"] = str(doc_raw["_id"])
621
+
622
+ # Convert datetime objects back to date objects for Pydantic model fields that are `date`
623
  if 'appointments' in doc_raw:
624
  for appt in doc_raw['appointments']:
625
+ if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
626
+ if isinstance(appt.get('end_date'), datetime): appt['end_date'] = appt['end_date'].date()
 
 
627
  if 'tasks' in doc_raw:
628
  for task_item in doc_raw['tasks']:
629
+ if isinstance(task_item.get('due_date'), datetime): task_item['due_date'] = task_item['due_date'].date()
 
630
  results.append(ExtractedData(**doc_raw))
631
  return results
632
  except Exception as e:
 
634
  raise HTTPException(status_code=500, detail=f"Error querying extracted emails: {e}")
635
 
636
 
637
+ @app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
638
  async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
639
  if generated_replies_collection is None:
640
  raise HTTPException(status_code=503, detail="MongoDB not available for generated_replies.")
 
661
  except Exception as e:
662
  traceback.print_exc()
663
  raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")
664
+