precison9 commited on
Commit
fbe81c0
·
verified ·
1 Parent(s): 604c55b

Update flask_Character.py

Browse files
Files changed (1) hide show
  1. flask_Character.py +333 -441
flask_Character.py CHANGED
@@ -1,6 +1,3 @@
1
- # This software is licensed under a **dual-license model**
2
- # For individuals and businesses earning **under $1M per year**, this software is licensed under the **MIT License**
3
- # Businesses or organizations with **annual revenue of $1,000,000 or more** must obtain permission to use this software commercially.
4
  import os
5
  # NUMBA_CACHE_DIR and NUMBA_DISABLE_CACHE are often set for specific environments,
6
  # e.g., if you're experiencing issues with Numba's caching behavior or in containerized environments.
@@ -14,11 +11,13 @@ from datetime import date, datetime, timedelta
14
  from typing import List, Optional, Literal, Dict, Any, Tuple
15
  import traceback
16
  import asyncio
 
17
 
18
  from fastapi import FastAPI, HTTPException, Response, Query, Depends, status
19
  from fastapi.responses import FileResponse
20
  from fastapi.exception_handlers import http_exception_handler
21
  from starlette.exceptions import HTTPException as StarletteHTTPException
 
22
  from langchain.prompts import PromptTemplate
23
  from langchain_groq import ChatGroq
24
  from pydantic import BaseModel, Field, BeforeValidator, model_serializer
@@ -29,15 +28,29 @@ from pymongo import MongoClient
29
  from pymongo.errors import ConnectionFailure, OperationFailure
30
  from bson import ObjectId
31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  # --- MongoDB Configuration ---
33
- # IMPORTANT: Use environment variables for your MONGO_URI in production for security.
34
- # Example: MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
35
- MONGO_URI = "mongodb+srv://precison9:P1LhtFknkT75yg5L@cluster0.isuwpef.mongodb.net"
36
  DB_NAME = "email_assistant_db"
37
  EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
38
  GENERATED_REPLIES_COLLECTION = "generated_replies"
39
 
40
- # Global variables for MongoDB client and collections
41
  client: Optional[MongoClient] = None
42
  db: Optional[Any] = None
43
  extracted_emails_collection: Optional[Any] = None
@@ -45,50 +58,30 @@ generated_replies_collection: Optional[Any] = None
45
 
46
  # --- Pydantic ObjectId Handling ---
47
  class CustomObjectId(str):
48
- """
49
- Custom Pydantic type for handling MongoDB ObjectIds.
50
- It validates that the input is a valid ObjectId string and
51
- ensures it's represented as a string in JSON Schema.
52
- """
53
  @classmethod
54
  def __get_validators__(cls):
55
  yield cls.validate
56
-
57
  @classmethod
58
  def validate(cls, v):
59
- # Allow None or empty string to pass through for optional fields
60
- # This validator is only called if the field is not None
61
- # Pydantic's Optional[PyObjectId] handles the None case before this validator
62
  if v is None or v == "":
63
  return None
64
-
65
  if not isinstance(v, (str, ObjectId)):
66
  raise ValueError("ObjectId must be a string or ObjectId instance")
67
-
68
- # Convert ObjectId to string if it's already an ObjectId instance
69
  if isinstance(v, ObjectId):
70
  return str(v)
71
-
72
- # Validate string format
73
  if not ObjectId.is_valid(v):
74
  raise ValueError("Invalid ObjectId format")
75
- return cls(v) # Return an instance of CustomObjectId (which is a str subclass)
76
-
77
- # This method is crucial for Pydantic v2 to generate correct OpenAPI schema
78
  @classmethod
79
  def __get_pydantic_json_schema__(
80
  cls, _core_schema: core_schema.CoreSchema, handler
81
  ) -> Dict[str, Any]:
82
- # We tell Pydantic that this custom type should be represented as a standard string
83
- # in the generated JSON Schema (OpenAPI documentation).
84
  json_schema = handler(core_schema.str_schema())
85
- json_schema["example"] = "60c728ef238b9c7b9e0f6c2a" # Add an example for clarity
86
  return json_schema
87
 
88
- # Annotated type for convenience in models
89
  PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
90
 
91
-
92
  # ---------------------- Models ----------------------
93
  class Contact(BaseModel):
94
  name: str
@@ -110,28 +103,20 @@ class Task(BaseModel):
110
  due_date: date
111
 
112
  class ExtractedData(BaseModel):
113
- # Use PyObjectId for the _id field
114
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
115
  contacts: List[Contact]
116
  appointments: List[Appointment]
117
  tasks: List[Task]
118
  original_email_text: str
119
  processed_at: datetime = Field(default_factory=datetime.utcnow)
120
-
121
  class Config:
122
- populate_by_name = True # Allow setting 'id' or '_id'
123
- arbitrary_types_allowed = True # Allow CustomObjectId and ObjectId
124
-
125
- # Custom serializer for JSON output to ensure ObjectId is converted to string
126
  @model_serializer(when_used='json')
127
  def serialize_model(self):
128
  data = self.model_dump(by_alias=True, exclude_none=True)
129
- # Ensure _id is a string when serializing to JSON
130
  if "_id" in data and isinstance(data["_id"], ObjectId):
131
  data["_id"] = str(data["_id"])
132
- # Ensure dates are correctly serialized to ISO format if they are date objects
133
- # Pydantic v2 usually handles this automatically for `date` types,
134
- # but explicit conversion can be useful if direct manipulation is expected or for specific formats.
135
  if 'appointments' in data:
136
  for appt in data['appointments']:
137
  if isinstance(appt.get('start_date'), date):
@@ -158,7 +143,6 @@ class GenerateReplyRequest(BaseModel):
158
  emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
159
 
160
  class GeneratedReplyData(BaseModel):
161
- # Use PyObjectId for the _id field
162
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
163
  original_email_text: str
164
  generated_reply_text: str
@@ -168,11 +152,9 @@ class GeneratedReplyData(BaseModel):
168
  tone: str
169
  emoji: str
170
  generated_at: datetime = Field(default_factory=datetime.utcnow)
171
-
172
  class Config:
173
  populate_by_name = True
174
  arbitrary_types_allowed = True
175
-
176
  @model_serializer(when_used='json')
177
  def serialize_model(self):
178
  data = self.model_dump(by_alias=True, exclude_none=True)
@@ -180,7 +162,6 @@ class GeneratedReplyData(BaseModel):
180
  data["_id"] = str(data["_id"])
181
  return data
182
 
183
- # NEW: Response Model for /generate-reply endpoint
184
  class GenerateReplyResponse(BaseModel):
185
  reply: str = Field(..., description="The AI-generated reply text.")
186
  stored_id: str = Field(..., description="The MongoDB ID of the stored reply.")
@@ -188,519 +169,430 @@ class GenerateReplyResponse(BaseModel):
188
 
189
  # --- Query Models for GET Endpoints ---
190
  class ExtractedEmailQuery(BaseModel):
191
- contact_name: Optional[str] = Query(None, description="Filter by contact name (case-insensitive partial match).")
192
- appointment_title: Optional[str] = Query(None, description="Filter by appointment title (case-insensitive partial match).")
193
- task_title: Optional[str] = Query(None, description="Filter by task title (case-insensitive partial match).")
194
- from_date: Optional[date] = Query(None, description="Filter by data processed on or after this date (YYYY-MM-DD).")
195
- to_date: Optional[date] = Query(None, description="Filter by data processed on or before this date (YYYY-MM-DD).")
196
- limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
197
 
198
  class GeneratedReplyQuery(BaseModel):
199
- language: Optional[Literal["Italian", "English"]] = Query(None, description="Filter by reply language.")
200
- style: Optional[str] = Query(None, description="Filter by reply style (e.g., Professional, Casual).")
201
- tone: Optional[str] = Query(None, description="Filter by reply tone (e.g., Friendly, Neutral).")
202
- from_date: Optional[date] = Query(None, description="Filter by data generated on or after this date (YYYY-MM-DD).")
203
- to_date: Optional[date] = Query(None, description="Filter by data generated on or before this date (YYYY-MM-DD).")
204
- limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
205
 
206
  # ---------------------- Utility Functions ----------------------
207
  def extract_last_json_block(text: str) -> Optional[str]:
208
- """
209
- Extracts the last JSON block enclosed in ```json``` from a string,
210
- or a standalone JSON object if no code block is found.
211
- """
212
  pattern = r'```json\s*(.*?)\s*```'
213
  matches = re.findall(pattern, text, re.DOTALL)
214
- if matches:
215
- return matches[-1].strip()
216
- # Fallback: try to find a standalone JSON object
217
  match = re.search(r'\{.*\}', text, re.DOTALL)
218
- if match:
219
- return match.group(0)
220
  return None
221
 
222
  def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
223
- """
224
- Parses a date string, handling 'today', 'tomorrow', and YYYY-MM-DD format.
225
- Returns None if input is None or cannot be parsed into a valid date.
226
- """
227
- if not date_str:
228
- return None
229
  date_str_lower = date_str.lower().strip()
230
- if date_str_lower == "today":
231
- return current_date
232
- if date_str_lower == "tomorrow":
233
- return current_date + timedelta(days=1)
234
- try:
235
- return datetime.strptime(date_str_lower, "%Y-%m-%d").date()
236
- except ValueError:
237
- # If parsing fails, return None. The calling function (normalize_llm_output)
238
- # will then decide the default (e.g., current_date).
239
- return None
240
 
241
  def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
242
- """
243
- Normalizes and validates LLM extracted data into ExtractedData Pydantic model.
244
- Handles defaults for dates and name splitting.
245
- """
246
  def split_name(full_name: str) -> tuple[str, str]:
247
  parts = full_name.strip().split()
248
  name = parts[0] if parts else ""
249
  last_name = " ".join(parts[1:]) if len(parts) > 1 else ""
250
  return name, last_name
251
-
252
- contacts_data = []
253
- for c in data.get("contacts", []):
254
- name_val, last_name_val = split_name(c.get("name", ""))
255
- contacts_data.append(Contact(name=name_val, last_name=last_name_val, email=c.get("email"), phone_number=c.get("phone_number")))
256
-
257
- appointments_data = []
258
- for a in data.get("appointments", []):
259
- # Default start_date to current_date if not provided or invalid
260
- start_date_val = parse_date(a.get("start_date"), current_date) or current_date
261
- # end_date remains optional
262
- end_date_val = parse_date(a.get("end_date"), current_date)
263
-
264
- appointments_data.append(Appointment(
265
- title=a.get("title", "Untitled"), description=a.get("description", "No description"),
266
- start_date=start_date_val, start_time=a.get("start_time"),
267
- end_date=end_date_val, end_time=a.get("end_time")
268
- ))
269
-
270
- tasks_data = []
271
- for t in data.get("tasks", []):
272
- # Default due_date to current_date if not provided or invalid
273
- due_date_val = parse_date(t.get("due_date"), current_date) or current_date
274
- tasks_data.append(Task(
275
- task_title=t.get("task_title", "Untitled"), task_description=t.get("task_description", "No description"),
276
- due_date=due_date_val
277
- ))
278
  return ExtractedData(contacts=contacts_data, appointments=appointments_data, tasks=tasks_data, original_email_text=original_email_text)
279
 
280
  # ---------------------- Core Logic (Internal Functions) ----------------------
281
  def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
282
- """
283
- Internal function to process email text using LLM and extract structured data.
284
- """
285
- if not email_text:
286
- raise ValueError("Email text cannot be empty for processing.")
287
-
288
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
289
-
290
  prompt_today_str = current_date.isoformat()
291
  prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
292
-
293
- prompt_template_str = f"""
294
- You are an expert email assistant tasked with extracting structured information from an Italian email.
295
-
296
- **Your response MUST be a single, complete JSON object, wrapped in a ```json``` block.**
297
- **DO NOT include any conversational text, explanations, or preambles outside the JSON block.**
298
- **The JSON should contain three top-level keys: "contacts", "appointments", and "tasks".**
299
- If a category has no items, its list should be empty (e.g., "contacts": []).
300
-
301
- Here is the required JSON schema for each category:
302
-
303
- - **contacts**: List of Contact objects.
304
- Each Contact object must have:
305
- - `name` (string, full name)
306
- - `last_name` (string, last name) - You should infer this from the full name.
307
- - `email` (string, optional, null if not present)
308
- - `phone_number` (string, optional, null if not present)
309
-
310
- - **appointments**: List of Appointment objects.
311
- Each Appointment object must have:
312
- - `title` (string, short, meaningful title in Italian based on the meeting's purpose)
313
- - `description` (string, summary of the meeting's goal)
314
- - `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow")
315
- - `start_time` (string, optional, e.g., "10:30 AM", null if not present)
316
- - `end_date` (string, YYYY-MM-DD, optional, null if unknown or not applicable)
317
- - `end_time` (string, optional, e.g., "11:00 AM", null if not present)
318
-
319
- - **tasks**: List of Task objects.
320
- Each Task object must have:
321
- - `task_title` (string, short summary of action item)
322
- - `task_description` (string, more detailed explanation)
323
- - `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")
324
-
325
- ---
326
-
327
- Email:
328
- {{email}}
329
- """
330
- prompt_template = PromptTemplate(input_variables=["email", "prompt_today_str", "prompt_tomorrow_str"], template=prompt_template_str)
331
  chain = prompt_template | llm
332
  try:
333
- llm_output = chain.invoke({"email": email_text, "prompt_today_str": prompt_today_str, "prompt_tomorrow_str": prompt_tomorrow_str})
334
  llm_output_str = llm_output.content
335
-
336
  json_str = extract_last_json_block(llm_output_str)
337
-
338
- if not json_str:
339
- raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
340
  json_data = json.loads(json_str)
 
 
 
341
 
342
- extracted_data = normalize_llm_output(json_data, current_date, email_text)
343
- return extracted_data
344
- except json.JSONDecodeError as e:
345
- raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}")
346
- except Exception as e:
347
- traceback.print_exc()
348
- raise Exception(f"An error occurred during email processing: {e}")
349
-
350
- def _generate_response_internal(
351
- email_text: str, api_key: str, language: Literal["Italian", "English"],
352
- length: str, style: str, tone: str, emoji: str
353
- ) -> str:
354
- """
355
- Internal function to generate a reply to an email using LLM.
356
- """
357
- print(f"[{datetime.now()}] _generate_response_internal: Starting LLM call. API Key starts with: {api_key[:5]}...") # Debug log
358
- if not email_text:
359
- print(f"[{datetime.now()}] _generate_response_internal: Email text is empty.")
360
- return "Cannot generate reply for empty email text."
361
-
362
  try:
363
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
364
- prompt_template_str="""
365
- You are an assistant that helps reply to emails.
366
-
367
- Create a response to the following email with the following parameters:
368
- - Language: {language}
369
- - Length: {length}
370
- - Style: {style}
371
- - Tone: {tone}
372
- - Emoji usage: {emoji}
373
-
374
- Email:
375
- {email}
376
-
377
- Write only the reply body. Do not repeat the email or mention any instruction.
378
- """
379
- prompt = PromptTemplate(
380
- input_variables=["email", "language", "length", "style", "tone", "emoji"],
381
- template=prompt_template_str
382
- )
383
  chain = prompt | llm
384
- print(f"[{datetime.now()}] _generate_response_internal: Invoking LLM chain...") # Debug log
385
  output = chain.invoke({"email": email_text, "language": language, "length": length, "style": style, "tone": tone, "emoji": emoji})
386
- print(f"[{datetime.now()}] _generate_response_internal: LLM chain returned. Content length: {len(output.content)}.") # Debug log
387
  return output.content.strip()
388
- except Exception as e:
389
- print(f"[{datetime.now()}] _generate_response_internal: ERROR during LLM invocation: {e}") # Debug log
390
- traceback.print_exc() # Print full traceback to logs
391
- raise # Re-raise the exception so it can be caught by handle_single_reply_request
392
-
393
 
394
  # --- FastAPI Application ---
395
- app = FastAPI(
396
- title="Email Assistant API",
397
- description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, and caching.",
398
- version="1.1.0",
399
- docs_url="/", # Sets Swagger UI to be the root path
400
- redoc_url="/redoc"
401
- )
402
-
403
- # --- Global Exception Handler ---
404
- # Catch Starlette HTTPExceptions (FastAPI uses these internally)
405
  @app.exception_handler(StarletteHTTPException)
406
  async def custom_http_exception_handler_wrapper(request, exc):
407
- """Handles FastAPI's internal HTTP exceptions."""
408
- print(f"[{datetime.now()}] Caught StarletteHTTPException: {exc.status_code} - {exc.detail}")
409
  return await http_exception_handler(request, exc)
410
 
411
- # Catch all other unhandled exceptions
412
  @app.exception_handler(Exception)
413
  async def global_exception_handler_wrapper(request, exc):
414
- """Handles all unhandled exceptions and returns a consistent JSON error response."""
415
- print(f"[{datetime.now()}] Unhandled exception caught by global handler for request: {request.url}")
416
- traceback.print_exc() # Print traceback to console for debugging
417
- # Return a JSON response for consistency, even for unhandled errors
418
- return Response(
419
- content=json.dumps({"detail": f"Internal Server Error: {str(exc)}", "type": "unhandled_exception"}),
420
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
421
- media_type="application/json"
422
- )
423
-
424
-
425
- # --- FastAPI Event Handlers for MongoDB ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
426
  @app.on_event("startup")
427
  async def startup_event():
428
  global client, db, extracted_emails_collection, generated_replies_collection
 
 
429
  print(f"[{datetime.now()}] FastAPI app startup sequence initiated.")
430
  try:
431
- # Connect to MongoDB
432
  client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
433
- client.admin.command('ping') # Test connection
434
  db = client[DB_NAME]
435
  extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
436
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
437
  print(f"[{datetime.now()}] Successfully connected to MongoDB: {DB_NAME}")
438
 
 
 
 
 
 
 
 
439
  except (ConnectionFailure, OperationFailure) as e:
440
  print(f"[{datetime.now()}] ERROR: MongoDB Connection/Operation Failure: {e}")
441
- client = None
442
- db = None
443
- extracted_emails_collection = None
444
- generated_replies_collection = None
445
  except Exception as e:
446
- print(f"[{datetime.now()}] ERROR: An unexpected error occurred during MongoDB connection startup: {e}")
447
  traceback.print_exc()
448
- client = None
449
- db = None
450
- extracted_emails_collection = None
451
- generated_replies_collection = None
452
  finally:
453
- if client is not None and db is not None:
454
- try:
455
- client.admin.command('ping')
456
- except Exception as e:
457
- print(f"[{datetime.now()}] MongoDB ping failed after initial connection attempt during finally block: {e}")
458
- client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
459
- else:
460
- print(f"[{datetime.now()}] MongoDB client or db object is None after connection attempt in startup. Database likely not connected.")
461
- if client is None or db is None:
462
- client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
463
- print(f"[{datetime.now()}] FastAPI app startup sequence completed for MongoDB client initialization.")
464
 
465
 
466
  @app.on_event("shutdown")
467
- async def shutdown_event():
468
- global client
469
  print(f"[{datetime.now()}] FastAPI app shutting down.")
 
 
 
 
 
 
 
 
 
470
  if client:
471
  client.close()
472
  print(f"[{datetime.now()}] MongoDB client closed.")
473
-
474
 
475
  # --- API Endpoints ---
476
  @app.get("/health", summary="Health Check")
477
  async def health_check():
478
- """
479
- Checks the health of the API, including MongoDB connection.
480
- """
481
  db_status = "MongoDB not connected."
482
  db_ok = False
483
- if client is not None and db is not None:
484
  try:
485
- # Use asyncio.to_thread for blocking MongoDB call
486
  await asyncio.to_thread(db.list_collection_names)
487
  db_status = "MongoDB connection OK."
488
  db_ok = True
489
  except Exception as e:
490
  db_status = f"MongoDB connection error: {e}"
491
- db_ok = False
492
-
 
 
 
 
493
  if db_ok:
494
- return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status}
495
  else:
496
- raise HTTPException(
497
- status_code=503,
498
- detail={"message": "Service unavailable.", "database": db_status}
499
- )
500
 
 
 
 
501
 
502
- @app.post("/generate-reply", response_model=GenerateReplyResponse, summary="Generate a smart reply to an email")
503
- async def generate_email_reply(request: GenerateReplyRequest):
504
- """
505
- Generates a smart reply to the provided email text using an LLM.
506
- The generated reply is also stored in MongoDB for caching and historical purposes.
507
- """
508
- if generated_replies_collection is None:
509
- raise HTTPException(status_code=503, detail="MongoDB not available for generated_replies.")
510
 
511
  try:
512
- # Check cache first
513
- cache_query = {
514
- "original_email_text": request.email_text,
515
- "language": request.language,
516
- "length": request.length,
517
- "style": request.style,
518
- "tone": request.tone,
519
- "emoji": request.emoji,
520
- }
521
- print(f"[{datetime.now()}] /generate-reply: Checking cache for reply...")
522
- # Use asyncio.to_thread for blocking MongoDB operations
523
- cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
524
-
525
- if cached_reply_doc:
526
- print(f"[{datetime.now()}] /generate-reply: Reply found in cache. ID: {str(cached_reply_doc['_id'])}")
527
- return GenerateReplyResponse(
528
- reply=cached_reply_doc["generated_reply_text"],
529
- stored_id=str(cached_reply_doc["_id"]),
530
- cached=True
531
- )
532
-
533
- # If not in cache, directly call the internal LLM function
534
- print(f"[{datetime.now()}] /generate-reply: Reply not in cache. Calling LLM for generation...")
535
- reply_content = await asyncio.to_thread(
536
- _generate_response_internal,
537
- request.email_text,
538
- request.groq_api_key,
539
- request.language,
540
- request.length,
541
- request.style,
542
- request.tone,
543
- request.emoji
544
- )
545
- print(f"[{datetime.now()}] /generate-reply: LLM call completed. Storing newly generated reply in MongoDB.")
546
 
547
- # Prepare data for storage
548
- reply_data_to_store = GeneratedReplyData(
549
- original_email_text=request.email_text,
550
- generated_reply_text=reply_content,
551
- language=request.language,
552
- length=request.length,
553
- style=request.style,
554
- tone=request.tone,
555
- emoji=request.emoji
556
- )
557
- # Use model_dump for Pydantic v2. Exclude 'id' as it's generated by MongoDB.
558
- reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
559
 
560
- # Insert into MongoDB
561
- insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
562
- stored_id = str(insert_result.inserted_id) # Convert ObjectId to string for the response
 
 
 
 
 
 
 
563
 
564
- print(f"[{datetime.now()}] /generate-reply: Reply stored in MongoDB. ID: {stored_id}")
565
 
566
- # Return the response as per GenerateReplyResponse model
567
- return GenerateReplyResponse(
568
- reply=reply_content,
569
- stored_id=stored_id,
570
- cached=False # Always False since we just generated it
571
- )
572
- except Exception as e:
573
- traceback.print_exc()
574
- # Ensure consistent error response
575
- raise HTTPException(status_code=500, detail=f"Error generating or storing reply: {str(e)}")
576
 
577
- @app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email")
578
- async def extract_email_data(request: ProcessEmailRequest):
579
- """
580
- Extracts contacts, appointments, and tasks from the provided email text.
581
- """
582
- if extracted_emails_collection is None:
583
- raise HTTPException(status_code=503, detail="MongoDB not available.")
584
 
585
- current_date = date.today() # Get current date for context
 
 
 
 
 
586
 
587
- print(f"[{datetime.now()}] /extract-data: Received request.")
588
- try:
589
- print(f"[{datetime.now()}] /extract-data: Calling internal processing function.")
590
- # Run blocking LLM call in a thread pool
591
- extracted_data = await asyncio.to_thread(_process_email_internal, request.email_text, request.groq_api_key, current_date)
592
 
593
- print(f"[{datetime.now()}] /extract-data: Internal processing complete. Preparing for DB insert.")
594
- # Convert Pydantic model to dictionary for MongoDB insert, handling _id alias
595
- # Use model_dump for Pydantic v2
596
- data_to_insert = extracted_data.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
597
 
598
- # --- NEW CONVERSION FOR MONGODB ---
599
- # MongoDB's BSON doesn't natively support Python's datetime.date type.
600
- # It expects datetime.datetime. Convert all date fields to datetime.datetime.
601
- if 'appointments' in data_to_insert:
602
- for appt in data_to_insert['appointments']:
603
- if isinstance(appt.get('start_date'), date):
604
- appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
605
- if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
606
- appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
607
- if 'tasks' in data_to_insert:
608
- for task_item in data_to_insert['tasks']:
609
- if isinstance(task_item.get('due_date'), date):
610
- task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
611
- # --- END NEW CONVERSION ---
612
 
613
- print(f"[{datetime.now()}] /extract-data: Inserting into MongoDB... Data: {data_to_insert}") # Add data logging
614
- # Use asyncio.to_thread for blocking MongoDB insert operation
615
- insert_result = await asyncio.to_thread(extracted_emails_collection.insert_one, data_to_insert)
616
 
617
- # Update the extracted_data object with the MongoDB-generated ID
618
- extracted_data.id = str(insert_result.inserted_id)
619
- print(f"[{datetime.now()}] /extract-data: Data inserted into MongoDB. ID: {extracted_data.id}")
 
 
 
 
 
 
 
 
620
 
621
- return extracted_data
622
- except ValueError as ve:
623
- raise HTTPException(status_code=400, detail=str(ve))
624
  except Exception as e:
625
- traceback.print_exc() # Print full traceback for debugging
626
- raise HTTPException(status_code=500, detail=f"An internal server error occurred: {e}")
 
 
627
 
628
 
629
  @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query stored extracted email data")
630
  async def query_extracted_emails(query_params: ExtractedEmailQuery = Depends()):
631
- """
632
- Queries extracted email data from MongoDB based on various filters.
633
- """
634
- if extracted_emails_collection is None:
635
- raise HTTPException(status_code=503, detail="MongoDB not available.")
636
-
637
  mongo_query = {}
638
- if query_params.contact_name:
639
- # Case-insensitive partial match on contact name or last name
640
- mongo_query["$or"] = [
641
- {"contacts.name": {"$regex": query_params.contact_name, "$options": "i"}},
642
- {"contacts.last_name": {"$regex": query_params.contact_name, "$options": "i"}}
643
- ]
644
- if query_params.appointment_title:
645
- mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
646
- if query_params.task_title:
647
- mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
648
-
649
- # Date range filtering for processed_at
650
  date_query = {}
651
- if query_params.from_date:
652
- date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
653
- if query_params.to_date:
654
- date_query["$lte"] = datetime.combine(query_params.to_date, datetime.max.time())
655
- if date_query:
656
- mongo_query["processed_at"] = date_query
657
-
658
  try:
659
- # Use asyncio.to_thread for blocking MongoDB find operation
660
  cursor = await asyncio.to_thread(extracted_emails_collection.find, mongo_query)
661
- # Use to_list to limit results and convert to list
662
  results = await asyncio.to_thread(lambda: list(cursor.limit(query_params.limit)))
663
-
664
- # Convert MongoDB documents to ExtractedData Pydantic models
665
  return [ExtractedData(**doc) for doc in results]
666
- except Exception as e:
667
- traceback.print_exc()
668
- raise HTTPException(status_code=500, detail=f"Error querying extracted emails: {e}")
669
-
670
 
671
  @app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query stored generated replies")
672
  async def query_generated_replies(query_params: GeneratedReplyQuery = Depends()):
673
- """
674
- Queries generated email replies from MongoDB based on various filters.
675
- """
676
- if generated_replies_collection is None:
677
- raise HTTPException(status_code=503, detail="MongoDB not available.")
678
-
679
  mongo_query = {}
680
- if query_params.language:
681
- mongo_query["language"] = query_params.language
682
- if query_params.style:
683
- mongo_query["style"] = query_params.style
684
- if query_params.tone:
685
- mongo_query["tone"] = query_params.tone
686
-
687
- # Date range filtering for generated_at
688
  date_query = {}
689
- if query_params.from_date:
690
- date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
691
- if query_params.to_date:
692
- date_query["$lte"] = datetime.combine(query_params.to_date, datetime.max.time())
693
- if date_query:
694
- mongo_query["generated_at"] = date_query
695
-
696
  try:
697
- # Use asyncio.to_thread for blocking MongoDB find operation
698
  cursor = await asyncio.to_thread(generated_replies_collection.find, mongo_query)
699
- # Use to_list to limit results and convert to list
700
  results = await asyncio.to_thread(lambda: list(cursor.limit(query_params.limit)))
701
-
702
- # Convert MongoDB documents to GeneratedReplyData Pydantic models
703
  return [GeneratedReplyData(**doc) for doc in results]
704
- except Exception as e:
705
- traceback.print_exc()
706
- raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")
 
 
 
 
1
  import os
2
  # NUMBA_CACHE_DIR and NUMBA_DISABLE_CACHE are often set for specific environments,
3
  # e.g., if you're experiencing issues with Numba's caching behavior or in containerized environments.
 
11
  from typing import List, Optional, Literal, Dict, Any, Tuple
12
  import traceback
13
  import asyncio
14
+ from uuid import uuid4, UUID # For unique request IDs
15
 
16
  from fastapi import FastAPI, HTTPException, Response, Query, Depends, status
17
  from fastapi.responses import FileResponse
18
  from fastapi.exception_handlers import http_exception_handler
19
  from starlette.exceptions import HTTPException as StarletteHTTPException
20
+
21
  from langchain.prompts import PromptTemplate
22
  from langchain_groq import ChatGroq
23
  from pydantic import BaseModel, Field, BeforeValidator, model_serializer
 
28
  from pymongo.errors import ConnectionFailure, OperationFailure
29
  from bson import ObjectId
30
 
31
# --- Batching Configuration ---
MAX_BATCH_SIZE = 20            # max requests drained per batch cycle
BATCH_INTERVAL_SECONDS = 1.0   # how often the batch workers wake up

# --- Queues and pending request stores for batching ---
# Created in the startup handler so they are bound to the running event loop.
extract_data_queue: Optional[asyncio.Queue] = None
generate_reply_queue: Optional[asyncio.Queue] = None

# Futures for in-flight requests, keyed by unique request ID.
# Endpoint handlers await these; the batch workers resolve them.
extract_pending_requests: Dict[UUID, asyncio.Future] = {}
generate_pending_requests: Dict[UUID, asyncio.Future] = {}

# Set during application shutdown to tell the worker tasks to exit.
shutdown_event = asyncio.Event()


# --- MongoDB Configuration ---
# SECURITY FIX: never hard-code credentials in source. Read the connection
# string from the environment; fall back to a local instance for development.
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
DB_NAME = "email_assistant_db"
EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
GENERATED_REPLIES_COLLECTION = "generated_replies"


# Global PyMongo handles, initialised in the startup handler (None until then).
# String forward-ref avoids evaluating the pymongo name at annotation time.
client: Optional["MongoClient"] = None
db: Optional[Any] = None
extracted_emails_collection: Optional[Any] = None
 
58
 
59
# --- Pydantic ObjectId Handling ---
class CustomObjectId(str):
    """String subclass that validates MongoDB ObjectId values for Pydantic."""

    @classmethod
    def __get_validators__(cls):
        # Pydantic v1-style hook; yields each validator in order.
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Normalise *v* to a hex string; empty/missing input becomes None."""
        if v is None or v == "":
            return None
        # A real ObjectId is converted straight to its hex representation.
        if isinstance(v, ObjectId):
            return str(v)
        if not isinstance(v, str):
            raise ValueError("ObjectId must be a string or ObjectId instance")
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId format")
        return cls(v)

    @classmethod
    def __get_pydantic_json_schema__(
        cls, _core_schema: core_schema.CoreSchema, handler
    ) -> Dict[str, Any]:
        """Expose this type as a plain string (with example) in the OpenAPI schema."""
        schema = handler(core_schema.str_schema())
        schema["example"] = "60c728ef238b9c7b9e0f6c2a"
        return schema
82
 
 
83
# Annotated alias used by the models below: incoming values are coerced to
# str first, then validated by CustomObjectId.
PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
84
 
 
85
  # ---------------------- Models ----------------------
86
  class Contact(BaseModel):
87
  name: str
 
103
  due_date: date
104
 
105
  class ExtractedData(BaseModel):
 
106
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
107
  contacts: List[Contact]
108
  appointments: List[Appointment]
109
  tasks: List[Task]
110
  original_email_text: str
111
  processed_at: datetime = Field(default_factory=datetime.utcnow)
 
112
  class Config:
113
+ populate_by_name = True
114
+ arbitrary_types_allowed = True
 
 
115
  @model_serializer(when_used='json')
116
  def serialize_model(self):
117
  data = self.model_dump(by_alias=True, exclude_none=True)
 
118
  if "_id" in data and isinstance(data["_id"], ObjectId):
119
  data["_id"] = str(data["_id"])
 
 
 
120
  if 'appointments' in data:
121
  for appt in data['appointments']:
122
  if isinstance(appt.get('start_date'), date):
 
143
  emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
144
 
145
  class GeneratedReplyData(BaseModel):
 
146
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
147
  original_email_text: str
148
  generated_reply_text: str
 
152
  tone: str
153
  emoji: str
154
  generated_at: datetime = Field(default_factory=datetime.utcnow)
 
155
  class Config:
156
  populate_by_name = True
157
  arbitrary_types_allowed = True
 
158
  @model_serializer(when_used='json')
159
  def serialize_model(self):
160
  data = self.model_dump(by_alias=True, exclude_none=True)
 
162
  data["_id"] = str(data["_id"])
163
  return data
164
 
 
165
  class GenerateReplyResponse(BaseModel):
166
  reply: str = Field(..., description="The AI-generated reply text.")
167
  stored_id: str = Field(..., description="The MongoDB ID of the stored reply.")
 
169
 
170
  # --- Query Models for GET Endpoints ---
171
class ExtractedEmailQuery(BaseModel):
    """Filter parameters accepted by GET /query-extracted-emails."""

    contact_name: Optional[str] = Query(None)        # case-insensitive regex on first or last name
    appointment_title: Optional[str] = Query(None)   # case-insensitive regex on appointment title
    task_title: Optional[str] = Query(None)          # case-insensitive regex on task title
    from_date: Optional[date] = Query(None)          # processed_at lower bound (inclusive)
    to_date: Optional[date] = Query(None)            # processed_at upper bound (inclusive)
    limit: int = Query(10, ge=1, le=100)             # max documents returned
178
 
179
class GeneratedReplyQuery(BaseModel):
    """Filter parameters accepted by GET /query-generated-replies."""

    language: Optional[Literal["Italian", "English"]] = Query(None)  # exact match
    style: Optional[str] = Query(None)               # exact match
    tone: Optional[str] = Query(None)                # exact match
    from_date: Optional[date] = Query(None)          # generated_at lower bound (inclusive)
    to_date: Optional[date] = Query(None)            # generated_at upper bound (inclusive)
    limit: int = Query(10, ge=1, le=100)             # max documents returned
186
 
187
  # ---------------------- Utility Functions ----------------------
188
def extract_last_json_block(text: str) -> Optional[str]:
    """Pull the last ```json fenced block out of *text*.

    Falls back to the widest ``{...}`` span when no fenced block exists;
    returns None when neither is found.
    """
    fenced = re.findall(r'```json\s*(.*?)\s*```', text, re.DOTALL)
    if fenced:
        return fenced[-1].strip()
    # Greedy fallback: grabs from the first '{' to the last '}'.
    brace_span = re.search(r'\{.*\}', text, re.DOTALL)
    return brace_span.group(0) if brace_span else None
195
 
196
def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
    """Resolve "today"/"tomorrow"/ISO "YYYY-MM-DD" strings to a date.

    Returns None for empty input or unparseable text.
    """
    if not date_str:
        return None

    normalized = date_str.lower().strip()
    if normalized == "today":
        return current_date
    if normalized == "tomorrow":
        return current_date + timedelta(days=1)

    try:
        return datetime.strptime(normalized, "%Y-%m-%d").date()
    except ValueError:
        # Anything the LLM emitted that we cannot interpret.
        return None
 
 
 
 
 
 
203
 
204
def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
    """Convert the raw LLM JSON dict into a validated ExtractedData model.

    Missing titles/descriptions get placeholder defaults; unparseable or
    missing dates fall back to *current_date* where a date is required.
    """

    def split_name(full_name: str) -> tuple[str, str]:
        # "Mario Rossi" -> ("Mario", "Rossi"); a single token keeps last_name "".
        parts = full_name.strip().split()
        name = parts[0] if parts else ""
        last_name = " ".join(parts[1:]) if len(parts) > 1 else ""
        return name, last_name

    contacts_data = []
    for c in data.get("contacts", []):
        first, last = split_name(c.get("name", ""))
        contacts_data.append(
            Contact(name=first, last_name=last, email=c.get("email"), phone_number=c.get("phone_number"))
        )

    appointments_data = []
    for a in data.get("appointments", []):
        appointments_data.append(
            Appointment(
                title=a.get("title", "Untitled"),
                description=a.get("description", "No description"),
                start_date=parse_date(a.get("start_date"), current_date) or current_date,
                start_time=a.get("start_time"),
                end_date=parse_date(a.get("end_date"), current_date),
                end_time=a.get("end_time"),
            )
        )

    tasks_data = []
    for t in data.get("tasks", []):
        tasks_data.append(
            Task(
                task_title=t.get("task_title", "Untitled"),
                task_description=t.get("task_description", "No description"),
                due_date=parse_date(t.get("due_date"), current_date) or current_date,
            )
        )

    return ExtractedData(contacts=contacts_data, appointments=appointments_data, tasks=tasks_data, original_email_text=original_email_text)
214
 
215
  # ---------------------- Core Logic (Internal Functions) ----------------------
216
def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
    """Run the extraction LLM over *email_text* and return structured data.

    Raises:
        ValueError: empty input, missing JSON block, or unparseable JSON —
            callers map these to HTTP 400.
        Exception: any other unexpected failure (mapped to HTTP 500).

    FIX: previously a ValueError raised inside the try-block (no JSON found)
    was caught by the generic `except Exception` and rewrapped, so callers
    could never distinguish parse errors; ValueError now propagates unchanged.
    Exceptions are also chained (`from e`) to preserve the root cause.
    """
    if not email_text:
        raise ValueError("Email text cannot be empty for processing.")

    llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
    prompt_today_str = current_date.isoformat()
    prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
    # The schema instructions are baked in via this f-string; only {email}
    # remains a LangChain template variable ({{email}} escapes the braces).
    prompt_template_str = f"""You are an expert email assistant tasked with extracting structured information from an Italian email.**Your response MUST be a single, complete JSON object, wrapped in a ```json``` block.****DO NOT include any conversational text, explanations, or preambles outside the JSON block.****The JSON should contain three top-level keys: "contacts", "appointments", and "tasks".**If a category has no items, its list should be empty (e.g., "contacts": []).Here is the required JSON schema for each category:- **contacts**: List of Contact objects. Each Contact object must have: - `name` (string, full name) - `last_name` (string, last name) - You should infer this from the full name. - `email` (string, optional, null if not present) - `phone_number` (string, optional, null if not present)- **appointments**: List of Appointment objects. Each Appointment object must have: - `title` (string, short, meaningful title in Italian based on the meeting's purpose) - `description` (string, summary of the meeting's goal) - `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow") - `start_time` (string, optional, e.g., "10:30 AM", null if not present) - `end_date` (string, YYYY-MM-DD, optional, null if unknown or not applicable) - `end_time` (string, optional, e.g., "11:00 AM", null if not present)- **tasks**: List of Task objects. Each Task object must have: - `task_title` (string, short summary of action item) - `task_description` (string, more detailed explanation) - `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")---Email:{{email}}"""
    prompt_template = PromptTemplate(input_variables=["email"], template=prompt_template_str)
    chain = prompt_template | llm

    llm_output_str = ""  # defined up-front so error paths can always reference it
    try:
        llm_output = chain.invoke({"email": email_text})
        llm_output_str = llm_output.content
        json_str = extract_last_json_block(llm_output_str)
        if not json_str:
            raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
        json_data = json.loads(json_str)
        return normalize_llm_output(json_data, current_date, email_text)
    except json.JSONDecodeError as e:
        raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}") from e
    except ValueError:
        # Parse/validation errors propagate unchanged so the endpoint can
        # return a 400 instead of a generic 500.
        raise
    except Exception as e:
        traceback.print_exc()
        raise Exception(f"An error occurred during email processing: {e}") from e
233
 
234
def _generate_response_internal(email_text: str, api_key: str, language: Literal["Italian", "English"], length: str, style: str, tone: str, emoji: str) -> str:
    """Generate a reply body for *email_text* with the requested parameters.

    Returns the stripped reply text; re-raises any LLM failure after logging.
    """
    if not email_text:
        return "Cannot generate reply for empty email text."

    try:
        llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
        prompt_template_str = """ You are an assistant that helps reply to emails. Create a response to the following email with the following parameters: - Language: {language} - Length: {length} - Style: {style} - Tone: {tone} - Emoji usage: {emoji} Email: {email} Write only the reply body. Do not repeat the email or mention any instruction. """
        prompt = PromptTemplate(input_variables=["email", "language", "length", "style", "tone", "emoji"], template=prompt_template_str)
        chain = prompt | llm
        result = chain.invoke({"email": email_text, "language": language, "length": length, "style": style, "tone": tone, "emoji": emoji})
        return result.content.strip()
    except Exception:
        # Log the full traceback, then let the caller decide how to report it.
        traceback.print_exc()
        raise
 
 
 
 
244
 
245
# --- FastAPI Application ---
# Interactive docs are served at "/" (docs_url) and "/redoc".
app = FastAPI(title="Email Assistant API", description="API for extracting structured data and generating replies.", version="1.2.0", docs_url="/", redoc_url="/redoc")


# --- Exception Handlers ---
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler_wrapper(request, exc):
    """Delegate HTTP errors to FastAPI's default handler."""
    return await http_exception_handler(request, exc)


@app.exception_handler(Exception)
async def global_exception_handler_wrapper(request, exc):
    """Last-resort handler: log the traceback and return a JSON 500 body."""
    traceback.print_exc()
    payload = json.dumps({"detail": f"Internal Server Error: {str(exc)}", "type": "unhandled_exception"})
    return Response(content=payload, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, media_type="application/json")
257
+
258
+ # --- Batch Worker Functions ---
259
async def _execute_single_extract_task(request_item: ProcessEmailRequest, request_id: UUID) -> Tuple[UUID, Any]:
    """Run one extraction job off-loop; return (request_id, result-or-exception)."""
    today = date.today()
    try:
        # The LLM call is synchronous, so push it onto the default executor.
        extracted = await asyncio.to_thread(
            _process_email_internal, request_item.email_text, request_item.groq_api_key, today
        )
        return request_id, extracted
    except Exception as exc:
        # Never let one failed item sink the whole batch; report per-request.
        return request_id, exc
270
+
271
async def extract_data_batch_worker():
    """Background task: drain queued extract requests in batches and resolve futures.

    FIX: the previous tick used ``asyncio.wait_for(asyncio.sleep(T), timeout=T+0.1)``
    — the timeout is longer than the sleep, so it could never fire and the
    "woken by shutdown" branch was dead code. Waiting on ``shutdown_event.wait()``
    with the interval as timeout gives both the periodic tick and an immediate
    reaction to shutdown.
    """
    global extract_data_queue, extract_pending_requests, shutdown_event
    print(f"[{datetime.now()}] Extract Data Batch Worker started.")
    while not shutdown_event.is_set():
        try:
            # Wake either when shutdown is signalled or after one batch interval.
            await asyncio.wait_for(shutdown_event.wait(), timeout=BATCH_INTERVAL_SECONDS)
            break  # event set -> leave the loop; queued items are failed below
        except asyncio.TimeoutError:
            pass  # normal periodic tick

        # Drain up to MAX_BATCH_SIZE pending requests without blocking.
        batch_to_process: List[Tuple[ProcessEmailRequest, UUID]] = []
        while len(batch_to_process) < MAX_BATCH_SIZE:
            try:
                request_obj, req_id = extract_data_queue.get_nowait()
                batch_to_process.append((request_obj, req_id))
            except asyncio.QueueEmpty:
                break  # no more items for this batch

        if not batch_to_process:
            continue

        print(f"[{datetime.now()}] Extract Worker: Processing batch of {len(batch_to_process)} requests.")

        # Concurrently execute all LLM calls for the current batch.
        llm_tasks = [_execute_single_extract_task(req_obj, req_id) for req_obj, req_id in batch_to_process]
        results = await asyncio.gather(*llm_tasks)  # (request_id, result_or_exception) pairs

        for req_id, result_or_exc in results:
            future = extract_pending_requests.pop(req_id, None)  # claim the future
            if future and not future.done():
                if isinstance(result_or_exc, Exception):
                    future.set_exception(result_or_exc)
                else:
                    future.set_result(result_or_exc)  # ExtractedData (pre-DB insert)
            elif future and future.done():
                print(f"[{datetime.now()}] Extract Worker: Future for {req_id} was already done (e.g. timed out).")

    print(f"[{datetime.now()}] Extract Data Batch Worker shutting down.")
    # Fail any still-queued requests so their callers are not left hanging.
    while not extract_data_queue.empty():
        try:
            _, req_id = extract_data_queue.get_nowait()
            future = extract_pending_requests.pop(req_id, None)
            if future and not future.done():
                future.set_exception(HTTPException(status_code=503, detail="Service shutting down, request cancelled."))
        except asyncio.QueueEmpty:
            break
317
+
318
+
319
async def _execute_single_generate_reply_task(request_item: GenerateReplyRequest, request_id: UUID) -> Tuple[UUID, Any]:
    """Run one reply-generation job off-loop; return (request_id, reply-or-exception)."""
    try:
        # Blocking LLM call -> default thread-pool executor.
        reply_text = await asyncio.to_thread(
            _generate_response_internal,
            request_item.email_text, request_item.groq_api_key, request_item.language,
            request_item.length, request_item.style, request_item.tone, request_item.emoji
        )
        return request_id, reply_text
    except Exception as exc:
        # Report the failure per-request instead of aborting the batch.
        return request_id, exc
331
+
332
async def generate_reply_batch_worker():
    """Background task: drain queued reply requests in batches and resolve futures.

    FIX: the previous tick used ``asyncio.wait_for(asyncio.sleep(T), timeout=T+0.1)``
    — the timeout is longer than the sleep, so it could never fire and shutdown
    was only noticed after a full interval. Waiting on ``shutdown_event.wait()``
    with the interval as timeout fixes both the tick and shutdown responsiveness.
    """
    global generate_reply_queue, generate_pending_requests, shutdown_event
    print(f"[{datetime.now()}] Generate Reply Batch Worker started.")
    while not shutdown_event.is_set():
        try:
            await asyncio.wait_for(shutdown_event.wait(), timeout=BATCH_INTERVAL_SECONDS)
            break  # event set -> leave the loop; queued items are failed below
        except asyncio.TimeoutError:
            pass  # normal periodic tick

        # Drain up to MAX_BATCH_SIZE pending requests without blocking.
        batch_to_process: List[Tuple[GenerateReplyRequest, UUID]] = []
        while len(batch_to_process) < MAX_BATCH_SIZE:
            try:
                request_obj, req_id = generate_reply_queue.get_nowait()
                batch_to_process.append((request_obj, req_id))
            except asyncio.QueueEmpty:
                break

        if not batch_to_process:
            continue

        print(f"[{datetime.now()}] Reply Worker: Processing batch of {len(batch_to_process)} requests.")

        llm_tasks = [_execute_single_generate_reply_task(req_obj, req_id) for req_obj, req_id in batch_to_process]
        results = await asyncio.gather(*llm_tasks)

        for req_id, result_or_exc in results:
            future = generate_pending_requests.pop(req_id, None)
            if future and not future.done():
                if isinstance(result_or_exc, Exception):
                    future.set_exception(result_or_exc)
                else:
                    future.set_result(result_or_exc)  # the generated reply string
            elif future and future.done():
                print(f"[{datetime.now()}] Reply Worker: Future for {req_id} was already done (e.g. timed out).")

    print(f"[{datetime.now()}] Generate Reply Batch Worker shutting down.")
    # Fail any still-queued requests so their callers are not left hanging.
    while not generate_reply_queue.empty():
        try:
            _, req_id = generate_reply_queue.get_nowait()
            future = generate_pending_requests.pop(req_id, None)
            if future and not future.done():
                future.set_exception(HTTPException(status_code=503, detail="Service shutting down, request cancelled."))
        except asyncio.QueueEmpty:
            break
376
+
377
+ # --- FastAPI Event Handlers ---
378
@app.on_event("startup")
async def startup_event():
    """Connect to MongoDB and launch the batch worker tasks.

    FIX: PyMongo Database/Collection objects raise NotImplementedError on
    truth testing, so the old ``if not (client and db and ...)`` check in the
    ``finally`` block crashed at startup; explicit ``is None`` comparisons
    are required.
    """
    global client, db, extracted_emails_collection, generated_replies_collection
    global extract_data_queue, generate_reply_queue

    print(f"[{datetime.now()}] FastAPI app startup sequence initiated.")
    try:
        client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        client.admin.command('ping')  # fail fast if the server is unreachable
        db = client[DB_NAME]
        extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
        generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
        print(f"[{datetime.now()}] Successfully connected to MongoDB: {DB_NAME}")

        # Queues must be created on the running loop; the workers consume them.
        extract_data_queue = asyncio.Queue()
        generate_reply_queue = asyncio.Queue()
        asyncio.create_task(extract_data_batch_worker())
        asyncio.create_task(generate_reply_batch_worker())
        print(f"[{datetime.now()}] Batch processing workers started.")

    except (ConnectionFailure, OperationFailure) as e:
        print(f"[{datetime.now()}] ERROR: MongoDB Connection/Operation Failure: {e}")
        # Leave the app running; DB-dependent endpoints will return 503.
        client = db = extracted_emails_collection = generated_replies_collection = None
    except Exception as e:
        print(f"[{datetime.now()}] ERROR: An unexpected error during startup: {e}")
        traceback.print_exc()
        client = db = extracted_emails_collection = generated_replies_collection = None
    finally:
        # Explicit None checks: PyMongo handles do not support bool().
        if any(obj is None for obj in (client, db, extracted_emails_collection, generated_replies_collection)):
            print(f"[{datetime.now()}] MongoDB or dependent services (batch queues) might not be fully initialized.")
        print(f"[{datetime.now()}] FastAPI app startup sequence completed.")
 
 
 
 
 
 
 
413
 
414
 
415
@app.on_event("shutdown")
async def shutdown_event_handler():  # named to avoid shadowing the global shutdown_event
    """Signal workers to stop, allow one final batch cycle, then close MongoDB.

    FIX: uses an explicit ``is not None`` check on the client handle instead of
    truth testing, matching the DB-handle checks elsewhere (PyMongo handles do
    not reliably support bool()).
    """
    global client, shutdown_event
    print(f"[{datetime.now()}] FastAPI app shutting down.")

    # Tell both batch workers to exit their loops.
    shutdown_event.set()

    # Give the workers roughly one batch cycle to wind down and fail any
    # still-queued requests. Joining stored worker tasks would be more robust.
    await asyncio.sleep(BATCH_INTERVAL_SECONDS + 0.5)

    if client is not None:
        client.close()
        print(f"[{datetime.now()}] MongoDB client closed.")
    print(f"[{datetime.now()}] FastAPI app shutdown sequence completed.")
432
 
433
  # --- API Endpoints ---
434
@app.get("/health", summary="Health Check")
async def health_check():
    """Report API, MongoDB, and batch-queue status; raise 503 when the DB is down.

    FIX: ``if client and db:`` crashed because PyMongo Database objects raise
    NotImplementedError on truth testing; explicit None checks are required.
    """
    db_status = "MongoDB not connected."
    db_ok = False
    if client is not None and db is not None:
        try:
            # list_collection_names is blocking -> run it off the event loop.
            await asyncio.to_thread(db.list_collection_names)
            db_status = "MongoDB connection OK."
            db_ok = True
        except Exception as e:
            db_status = f"MongoDB connection error: {e}"

    queue_status = {
        "extract_data_queue_size": extract_data_queue.qsize() if extract_data_queue else "N/A",
        "generate_reply_queue_size": generate_reply_queue.qsize() if generate_reply_queue else "N/A"
    }

    if db_ok:
        return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "queues": queue_status}
    else:
        raise HTTPException(status_code=503, detail={"message": "Service unavailable.", "database": db_status, "queues": queue_status})
 
 
 
455
 
456
@app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data (batched)")
async def extract_email_data(request: ProcessEmailRequest):
    """Queue an email for batched extraction, await the result, store it, return it.

    FIX: ``not extracted_emails_collection`` raises NotImplementedError because
    PyMongo Collection objects forbid truth testing — explicit ``is None``
    checks are required. Also uses ``asyncio.get_running_loop()`` instead of the
    deprecated ``get_event_loop()`` inside a coroutine.
    """
    global extract_data_queue, extract_pending_requests, extracted_emails_collection

    if extracted_emails_collection is None or extract_data_queue is None:
        raise HTTPException(status_code=503, detail="Service not available (DB or batch queue).")

    request_id = uuid4()
    future = asyncio.get_running_loop().create_future()
    extract_pending_requests[request_id] = future

    try:
        await extract_data_queue.put((request, request_id))
        print(f"[{datetime.now()}] /extract-data: Queued request {request_id}. Queue size: {extract_data_queue.qsize()}")

        # Wait for the batch worker to resolve our future (bounded wait).
        try:
            extracted_data_obj = await asyncio.wait_for(future, timeout=60.0)
        except asyncio.TimeoutError:
            print(f"[{datetime.now()}] /extract-data: Request {request_id} timed out waiting for worker.")
            # The worker may also pop this entry; both pops are safe (pop with default).
            extract_pending_requests.pop(request_id, None)
            raise HTTPException(status_code=504, detail="Request timed out while awaiting processing in batch.")

        # The worker returned an ExtractedData model; persist it now.
        print(f"[{datetime.now()}] /extract-data: Worker processed {request_id}. Inserting to DB.")

        data_to_insert = extracted_data_obj.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
        # Mongo stores datetimes, not dates: up-convert at midnight.
        if 'appointments' in data_to_insert:
            for appt in data_to_insert['appointments']:
                if isinstance(appt.get('start_date'), date):
                    appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
                if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
                    appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
        if 'tasks' in data_to_insert:
            for task_item in data_to_insert['tasks']:
                if isinstance(task_item.get('due_date'), date):
                    task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())

        insert_result = await asyncio.to_thread(extracted_emails_collection.insert_one, data_to_insert)
        extracted_data_obj.id = str(insert_result.inserted_id)  # reflect the stored _id

        return extracted_data_obj

    except HTTPException:
        raise  # availability / timeout errors pass through unchanged
    except ValueError as ve:
        # Parse/validation problems from the worker map to a client error.
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error during extract data: {e}")
    finally:
        # Drop the pending entry if it is still registered.
        extract_pending_requests.pop(request_id, None)
509
 
 
510
 
511
@app.post("/generate-reply", response_model=GenerateReplyResponse, summary="Generate smart reply (batched)")
async def generate_email_reply(request: GenerateReplyRequest):
    """Return a cached reply when available, otherwise queue for batched generation.

    FIX: ``not generated_replies_collection`` raises NotImplementedError because
    PyMongo Collection objects forbid truth testing — explicit ``is None``
    checks are required. Also uses ``asyncio.get_running_loop()`` instead of the
    deprecated ``get_event_loop()`` inside a coroutine.
    """
    global generate_reply_queue, generate_pending_requests, generated_replies_collection

    if generated_replies_collection is None or generate_reply_queue is None:
        raise HTTPException(status_code=503, detail="Service not available (DB or batch queue).")

    # --- Check cache first (remains outside batching) ---
    cache_query = {"original_email_text": request.email_text, "language": request.language, "length": request.length, "style": request.style, "tone": request.tone, "emoji": request.emoji}
    cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
    if cached_reply_doc:
        print(f"[{datetime.now()}] /generate-reply: Reply found in cache. ID: {str(cached_reply_doc['_id'])}")
        return GenerateReplyResponse(reply=cached_reply_doc["generated_reply_text"], stored_id=str(cached_reply_doc["_id"]), cached=True)

    # --- If not cached, queue for generation ---
    request_id = uuid4()
    future = asyncio.get_running_loop().create_future()
    generate_pending_requests[request_id] = future

    try:
        await generate_reply_queue.put((request, request_id))
        print(f"[{datetime.now()}] /generate-reply: Queued request {request_id}. Queue size: {generate_reply_queue.qsize()}")

        # Wait for the batch worker to resolve our future (bounded wait).
        try:
            reply_content_str = await asyncio.wait_for(future, timeout=60.0)
        except asyncio.TimeoutError:
            print(f"[{datetime.now()}] /generate-reply: Request {request_id} timed out waiting for worker.")
            generate_pending_requests.pop(request_id, None)
            raise HTTPException(status_code=504, detail="Request timed out while awaiting reply generation in batch.")

        # The worker returned the generated reply string; persist it now.
        print(f"[{datetime.now()}] /generate-reply: Worker generated reply for {request_id}. Storing to DB.")

        reply_data_to_store = GeneratedReplyData(
            original_email_text=request.email_text, generated_reply_text=reply_content_str,
            language=request.language, length=request.length, style=request.style,
            tone=request.tone, emoji=request.emoji
        )
        reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})

        insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
        stored_id = str(insert_result.inserted_id)

        return GenerateReplyResponse(reply=reply_content_str, stored_id=stored_id, cached=False)

    except HTTPException:
        raise
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error during generate reply: {e}")
    finally:
        # Drop the pending entry if it is still registered.
        generate_pending_requests.pop(request_id, None)
564
 
565
 
566
@app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query stored extracted email data")
async def query_extracted_emails(query_params: ExtractedEmailQuery = Depends()):
    """Return stored extraction results matching the optional filters."""
    if extracted_emails_collection is None:
        raise HTTPException(status_code=503, detail="MongoDB not available.")

    filters = {}
    # Case-insensitive substring matches on names/titles.
    if query_params.contact_name:
        filters["$or"] = [
            {"contacts.name": {"$regex": query_params.contact_name, "$options": "i"}},
            {"contacts.last_name": {"$regex": query_params.contact_name, "$options": "i"}},
        ]
    if query_params.appointment_title:
        filters["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
    if query_params.task_title:
        filters["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}

    # Optional processed_at date-range filter (inclusive day boundaries).
    range_filter = {}
    if query_params.from_date:
        range_filter["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
    if query_params.to_date:
        range_filter["$lte"] = datetime.combine(query_params.to_date, datetime.max.time())
    if range_filter:
        filters["processed_at"] = range_filter

    try:
        # PyMongo is blocking; run the query off the event loop.
        cursor = await asyncio.to_thread(extracted_emails_collection.find, filters)
        docs = await asyncio.to_thread(lambda: list(cursor.limit(query_params.limit)))
        return [ExtractedData(**doc) for doc in docs]
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Error querying extracted emails: {e}")
 
 
 
582
 
583
@app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query stored generated replies")
async def query_generated_replies(query_params: GeneratedReplyQuery = Depends()):
    """Return stored generated replies matching the optional filters."""
    if generated_replies_collection is None:
        raise HTTPException(status_code=503, detail="MongoDB not available.")

    # Exact-match filters: include only the ones the caller supplied.
    filters = {
        field: getattr(query_params, field)
        for field in ("language", "style", "tone")
        if getattr(query_params, field)
    }

    # Optional generated_at date-range filter (inclusive day boundaries).
    range_filter = {}
    if query_params.from_date:
        range_filter["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
    if query_params.to_date:
        range_filter["$lte"] = datetime.combine(query_params.to_date, datetime.max.time())
    if range_filter:
        filters["generated_at"] = range_filter

    try:
        # PyMongo is blocking; run the query off the event loop.
        cursor = await asyncio.to_thread(generated_replies_collection.find, filters)
        docs = await asyncio.to_thread(lambda: list(cursor.limit(query_params.limit)))
        return [GeneratedReplyData(**doc) for doc in docs]
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")