precison9 commited on
Commit
efd4eb0
·
verified ·
1 Parent(s): 6e2b097

Update flask_Character.py

Browse files
Files changed (1) hide show
  1. flask_Character.py +184 -353
flask_Character.py CHANGED
@@ -1,21 +1,12 @@
1
- # This software is licensed under a **dual-license model**
2
- # For individuals and businesses earning **under $1M per year**, this software is licensed under the **MIT License**
3
- # Businesses or organizations with **annual revenue of $1,000,000 or more** must obtain permission to use this software commercially.
4
- import os
5
- # NUMBA_CACHE_DIR and NUMBA_DISABLE_CACHE are often set for specific environments,
6
- # e.g., if you're experiencing issues with Numba's caching behavior or in containerized environments.
7
- # Keep them if they serve a specific purpose in your deployment environment.
8
- os.environ["NUMBA_CACHE_DIR"] = "/tmp/numba_cache"
9
- os.environ["NUMBA_DISABLE_CACHE"] = "1"
10
-
11
  import json
12
  import re
13
  from datetime import date, datetime, timedelta
14
  from typing import List, Optional, Literal, Dict, Any, Tuple
 
15
  import traceback
16
  import asyncio
17
 
18
- from fastapi import FastAPI, HTTPException, Response, Query, Depends, status
19
  from fastapi.responses import FileResponse
20
  from fastapi.exception_handlers import http_exception_handler
21
  from starlette.exceptions import HTTPException as StarletteHTTPException
@@ -23,74 +14,45 @@ from langchain.prompts import PromptTemplate
23
  from langchain_groq import ChatGroq
24
  from pydantic import BaseModel, Field, BeforeValidator, model_serializer
25
  from typing_extensions import Annotated
26
- from pydantic_core import core_schema # Import core_schema for direct use in __get_pydantic_json_schema__
27
 
28
  from pymongo import MongoClient
29
  from pymongo.errors import ConnectionFailure, OperationFailure
30
  from bson import ObjectId
31
 
32
  # --- MongoDB Configuration ---
33
- # IMPORTANT: Use environment variables for your MONGO_URI in production for security.
34
- # Example: MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
35
- MONGO_URI = "mongodb+srv://precison9:P1LhtFknkT75yg5L@cluster0.isuwpef.mongodb.net"
36
  DB_NAME = "email_assistant_db"
37
  EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
38
- GENERATED_REPLIES_COLLECTION = "generated_replies" # Still defined, but not used by generate-reply logic
39
 
40
- # Global variables for MongoDB client and collections
41
  client: Optional[MongoClient] = None
42
  db: Optional[Any] = None
43
  extracted_emails_collection: Optional[Any] = None
44
- # generated_replies_collection is no longer needed for /generate-reply logic,
45
- # but kept for /query-generated-replies endpoint if that's still desired.
46
  generated_replies_collection: Optional[Any] = None
47
 
48
  # --- Pydantic ObjectId Handling ---
49
  class CustomObjectId(str):
50
- """
51
- Custom Pydantic type for handling MongoDB ObjectIds.
52
- It validates that the input is a valid ObjectId string and
53
- ensures it's represented as a string in JSON Schema.
54
- """
55
  @classmethod
56
  def __get_validators__(cls):
57
  yield cls.validate
58
 
59
  @classmethod
60
- def validate(cls, v):
61
- # Allow None or empty string to pass through for optional fields
62
- # This validator is only called if the field is not None
63
- # Pydantic's Optional[PyObjectId] handles the None case before this validator
64
- if v is None or v == "":
65
- return None
66
-
67
- if not isinstance(v, (str, ObjectId)):
68
- raise ValueError("ObjectId must be a string or ObjectId instance")
69
-
70
- # Convert ObjectId to string if it's already an ObjectId instance
71
- if isinstance(v, ObjectId):
72
- return str(v)
73
-
74
- # Validate string format
75
  if not ObjectId.is_valid(v):
76
- raise ValueError("Invalid ObjectId format")
77
- return cls(v) # Return an instance of CustomObjectId (which is a str subclass)
78
 
79
- # This method is crucial for Pydantic v2 to generate correct OpenAPI schema
80
  @classmethod
81
- def __get_pydantic_json_schema__(
82
- cls, _core_schema: core_schema.CoreSchema, handler
83
- ) -> Dict[str, Any]:
84
- # We tell Pydantic that this custom type should be represented as a standard string
85
- # in the generated JSON Schema (OpenAPI documentation).
86
- json_schema = handler(core_schema.str_schema())
87
- json_schema["example"] = "60c728ef238b9c7b9e0f6c2a" # Add an example for clarity
88
  return json_schema
89
 
90
- # Annotated type for convenience in models
91
  PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
92
 
93
-
94
  # ---------------------- Models ----------------------
95
  class Contact(BaseModel):
96
  name: str
@@ -112,7 +74,6 @@ class Task(BaseModel):
112
  due_date: date
113
 
114
  class ExtractedData(BaseModel):
115
- # Use PyObjectId for the _id field
116
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
117
  contacts: List[Contact]
118
  appointments: List[Appointment]
@@ -121,19 +82,14 @@ class ExtractedData(BaseModel):
121
  processed_at: datetime = Field(default_factory=datetime.utcnow)
122
 
123
  class Config:
124
- populate_by_name = True # Allow setting 'id' or '_id'
125
- arbitrary_types_allowed = True # Allow CustomObjectId and ObjectId
126
 
127
- # Custom serializer for JSON output to ensure ObjectId is converted to string
128
  @model_serializer(when_used='json')
129
  def serialize_model(self):
130
  data = self.model_dump(by_alias=True, exclude_none=True)
131
- # Ensure _id is a string when serializing to JSON
132
  if "_id" in data and isinstance(data["_id"], ObjectId):
133
  data["_id"] = str(data["_id"])
134
- # Ensure dates are correctly serialized to ISO format if they are date objects
135
- # Pydantic v2 usually handles this automatically for `date` types,
136
- # but explicit conversion can be useful if direct manipulation is expected or for specific formats.
137
  if 'appointments' in data:
138
  for appt in data['appointments']:
139
  if isinstance(appt.get('start_date'), date):
@@ -148,11 +104,11 @@ class ExtractedData(BaseModel):
148
 
149
  class ProcessEmailRequest(BaseModel):
150
  email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
151
- groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY")
152
 
153
  class GenerateReplyRequest(BaseModel):
154
  email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
155
- groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY")
156
  language: Literal["Italian", "English"] = Field("Italian", examples=["Italian", "English"])
157
  length: str = Field("Auto", examples=["Short", "Medium", "Long", "Auto"])
158
  style: str = Field("Professional", examples=["Professional", "Casual", "Formal", "Informal"])
@@ -160,7 +116,6 @@ class GenerateReplyRequest(BaseModel):
160
  emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
161
 
162
  class GeneratedReplyData(BaseModel):
163
- # Use PyObjectId for the _id field (This model is now only used for the query endpoint)
164
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
165
  original_email_text: str
166
  generated_reply_text: str
@@ -182,12 +137,6 @@ class GeneratedReplyData(BaseModel):
182
  data["_id"] = str(data["_id"])
183
  return data
184
 
185
- # Response Model for /generate-reply endpoint (simplified)
186
- class GenerateReplyResponse(BaseModel):
187
- reply: str = Field(..., description="The AI-generated reply text.")
188
- # 'stored_id' and 'cached' are removed as caching/storage is removed
189
- # from the main generate-reply logic.
190
-
191
  # --- Query Models for GET Endpoints ---
192
  class ExtractedEmailQuery(BaseModel):
193
  contact_name: Optional[str] = Query(None, description="Filter by contact name (case-insensitive partial match).")
@@ -207,44 +156,26 @@ class GeneratedReplyQuery(BaseModel):
207
 
208
  # ---------------------- Utility Functions ----------------------
209
  def extract_last_json_block(text: str) -> Optional[str]:
210
- """
211
- Extracts the last JSON block enclosed in ```json``` from a string,
212
- or a standalone JSON object if no code block is found.
213
- """
214
  pattern = r'```json\s*(.*?)\s*```'
215
  matches = re.findall(pattern, text, re.DOTALL)
216
  if matches:
217
  return matches[-1].strip()
218
- # Fallback: try to find a standalone JSON object
219
  match = re.search(r'\{.*\}', text, re.DOTALL)
220
  if match:
221
  return match.group(0)
222
  return None
223
 
224
  def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
225
- """
226
- Parses a date string, handling 'today', 'tomorrow', and APAC-MM-DD format.
227
- Returns None if input is None or cannot be parsed into a valid date.
228
- """
229
- if not date_str:
230
- return None
231
  date_str_lower = date_str.lower().strip()
232
- if date_str_lower == "today":
233
- return current_date
234
- if date_str_lower == "tomorrow":
235
- return current_date + timedelta(days=1)
236
  try:
237
  return datetime.strptime(date_str_lower, "%Y-%m-%d").date()
238
  except ValueError:
239
- # If parsing fails, return None. The calling function (normalize_llm_output)
240
- # will then decide the default (e.g., current_date).
241
- return None
242
 
243
  def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
244
- """
245
- Normalizes and validates LLM extracted data into ExtractedData Pydantic model.
246
- Handles defaults for dates and name splitting.
247
- """
248
  def split_name(full_name: str) -> tuple[str, str]:
249
  parts = full_name.strip().split()
250
  name = parts[0] if parts else ""
@@ -258,9 +189,7 @@ def normalize_llm_output(data: dict, current_date: date, original_email_text: st
258
 
259
  appointments_data = []
260
  for a in data.get("appointments", []):
261
- # Default start_date to current_date if not provided or invalid
262
- start_date_val = parse_date(a.get("start_date"), current_date) or current_date
263
- # end_date remains optional
264
  end_date_val = parse_date(a.get("end_date"), current_date)
265
 
266
  appointments_data.append(Appointment(
@@ -271,8 +200,7 @@ def normalize_llm_output(data: dict, current_date: date, original_email_text: st
271
 
272
  tasks_data = []
273
  for t in data.get("tasks", []):
274
- # Default due_date to current_date if not provided or invalid
275
- due_date_val = parse_date(t.get("due_date"), current_date) or current_date
276
  tasks_data.append(Task(
277
  task_title=t.get("task_title", "Untitled"), task_description=t.get("task_description", "No description"),
278
  due_date=due_date_val
@@ -281,17 +209,10 @@ def normalize_llm_output(data: dict, current_date: date, original_email_text: st
281
 
282
  # ---------------------- Core Logic (Internal Functions) ----------------------
283
  def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
284
- """
285
- Internal function to process email text using LLM and extract structured data.
286
- """
287
- if not email_text:
288
- raise ValueError("Email text cannot be empty for processing.")
289
-
290
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
291
-
292
  prompt_today_str = current_date.isoformat()
293
  prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
294
-
295
  prompt_template_str = f"""
296
  You are an expert email assistant tasked with extracting structured information from an Italian email.
297
 
@@ -313,34 +234,30 @@ Here is the required JSON schema for each category:
313
  Each Appointment object must have:
314
  - `title` (string, short, meaningful title in Italian based on the meeting's purpose)
315
  - `description` (string, summary of the meeting's goal)
316
- - `start_date` (string, APAC-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow")
317
  - `start_time` (string, optional, e.g., "10:30 AM", null if not present)
318
- - `end_date` (string, APAC-MM-DD, optional, null if unknown or not applicable)
319
  - `end_time` (string, optional, e.g., "11:00 AM", null if not present)
320
 
321
  - **tasks**: List of Task objects.
322
  Each Task object must have:
323
  - `task_title` (string, short summary of action item)
324
  - `task_description` (string, more detailed explanation)
325
- - `due_date` (string, APAC-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")
326
 
327
  ---
328
 
329
  Email:
330
  {{email}}
331
  """
332
- prompt_template = PromptTemplate(input_variables=["email", "prompt_today_str", "prompt_tomorrow_str"], template=prompt_template_str)
333
  chain = prompt_template | llm
334
  try:
335
- llm_output = chain.invoke({"email": email_text, "prompt_today_str": prompt_today_str, "prompt_tomorrow_str": prompt_tomorrow_str})
336
  llm_output_str = llm_output.content
337
-
338
  json_str = extract_last_json_block(llm_output_str)
339
-
340
- if not json_str:
341
- raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
342
  json_data = json.loads(json_str)
343
-
344
  extracted_data = normalize_llm_output(json_data, current_date, email_text)
345
  return extracted_data
346
  except json.JSONDecodeError as e:
@@ -353,48 +270,34 @@ def _generate_response_internal(
353
  email_text: str, api_key: str, language: Literal["Italian", "English"],
354
  length: str, style: str, tone: str, emoji: str
355
  ) -> str:
356
- """
357
- Internal function to generate a reply to an email using LLM.
358
- """
359
- print(f"[{datetime.now()}] _generate_response_internal: Starting LLM call. API Key starts with: {api_key[:5]}...") # Debug log
360
- if not email_text:
361
- print(f"[{datetime.now()}] _generate_response_internal: Email text is empty.")
362
- return "Cannot generate reply for empty email text."
 
 
 
 
363
 
364
- try:
365
- llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
366
- prompt_template_str="""
367
- You are an assistant that helps reply to emails.
368
-
369
- Create a response to the following email with the following parameters:
370
- - Language: {language}
371
- - Length: {length}
372
- - Style: {style}
373
- - Tone: {tone}
374
- - Emoji usage: {emoji}
375
-
376
- Email:
377
- {email}
378
-
379
- Write only the reply body. Do not repeat the email or mention any instruction.
380
- """
381
- prompt = PromptTemplate(
382
- input_variables=["email", "language", "length", "style", "tone", "emoji"],
383
- template=prompt_template_str
384
- )
385
- chain = prompt | llm
386
- print(f"[{datetime.now()}] _generate_response_internal: Invoking LLM chain...") # Debug log
387
- output = chain.invoke({"email": email_text, "language": language, "length": length, "style": style, "tone": tone, "emoji": emoji})
388
- print(f"[{datetime.now()}] _generate_response_internal: LLM chain returned. Content length: {len(output.content)}.") # Debug log
389
- return output.content.strip()
390
- except Exception as e:
391
- print(f"[{datetime.now()}] _generate_response_internal: ERROR during LLM invocation: {e}") # Debug log
392
- traceback.print_exc() # Print full traceback to logs
393
- raise # Re-raise the exception so it can be caught by handle_single_reply_request
394
 
395
- # --- Batching Configuration (Caching/Storage logic removed) ---
396
  MAX_BATCH_SIZE = 20
397
- BATCH_TIMEOUT = 0.5 # seconds (Adjust based on expected LLM response time and desired latency)
398
 
399
  reply_request_queue: List[Tuple[GenerateReplyRequest, asyncio.Future, float]] = []
400
  reply_queue_lock = asyncio.Lock()
@@ -402,101 +305,103 @@ reply_queue_condition = asyncio.Condition(lock=reply_queue_lock)
402
  batch_processor_task: Optional[asyncio.Task] = None
403
 
404
 
405
- # --- Batch Processor and Handler (Simplified) ---
406
  async def handle_single_reply_request(request_data: GenerateReplyRequest, future: asyncio.Future):
407
- """Handles a single request: calls LLM, and sets future with the reply."""
408
- print(f"[{datetime.now()}] Handle single reply: Starting for email_text_start='{request_data.email_text[:50]}'...")
409
  if future.cancelled():
410
- print(f"[{datetime.now()}] Handle single reply: Future cancelled. Aborting.")
411
  return
412
  try:
413
- # Directly call LLM (no cache check or storage)
414
- print(f"[{datetime.now()}] Handle single reply: Calling LLM for reply generation...")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
415
  reply_content = await asyncio.to_thread(
416
  _generate_response_internal,
417
  request_data.email_text,
418
- request_data.groq_api_key,
419
  request_data.language,
420
  request_data.length,
421
  request_data.style,
422
  request_data.tone,
423
  request_data.emoji
424
  )
425
- print(f"[{datetime.now()}] Handle single reply: LLM call completed. Reply length: {len(reply_content)}.")
426
 
427
- # Simplified response as no storage/cache ID
 
 
 
 
 
 
 
 
 
 
 
 
 
428
  final_response = {
429
  "reply": reply_content,
430
- "stored_id": "N/A - Caching disabled", # Indicate that ID is not available
431
  "cached": False
432
  }
433
- if not future.done():
434
- future.set_result(final_response)
435
- print(f"[{datetime.now()}] Handle single reply: Final result set on future.")
436
 
437
  except Exception as e:
438
- print(f"[{datetime.now()}] Handle single reply: EXCEPTION: {e}")
439
- traceback.print_exc() # Print full traceback to logs
440
  if not future.done():
441
- # Set the exception on the future so the client can catch it
442
- future.set_exception(HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to generate reply: {e}"))
443
- print(f"[{datetime.now()}] Handle single reply: Exception set on future.")
444
-
445
 
446
  async def process_reply_batches():
447
  """Continuously processes requests from the reply_request_queue in batches."""
448
  global reply_request_queue
449
- print(f"[{datetime.now()}] Batch processor task started.")
450
  while True:
451
  batch_to_fire: List[Tuple[GenerateReplyRequest, asyncio.Future]] = []
452
  async with reply_queue_condition:
453
  if not reply_request_queue:
454
- print(f"[{datetime.now()}] Batch processor: Queue empty, waiting for requests...")
455
- # Wait for new requests or timeout
456
  await reply_queue_condition.wait()
457
- # After waking up, re-check if queue is still empty
458
  if not reply_request_queue:
459
- print(f"[{datetime.now()}] Batch processor: Woke up, queue still empty. Continuing loop.")
460
  continue
461
 
462
  now = asyncio.get_event_loop().time()
463
- # Safety check: ensure queue is not empty before accessing index 0
464
- if reply_request_queue:
465
- oldest_item_timestamp = reply_request_queue[0][2]
466
- else:
467
- # If queue became empty while waiting, loop again
468
- print(f"[{datetime.now()}] Batch processor: Queue became empty before processing. Restarting loop.")
469
- continue
470
-
471
- print(f"[{datetime.now()}] Batch processor: Woke up. Queue size: {len(reply_request_queue)}. Oldest item age: {now - oldest_item_timestamp:.2f}s")
472
 
473
- # Condition to trigger batch processing: queue is full OR timeout reached for oldest item
474
  if len(reply_request_queue) >= MAX_BATCH_SIZE or \
475
  (now - oldest_item_timestamp >= BATCH_TIMEOUT):
476
  num_to_take = min(len(reply_request_queue), MAX_BATCH_SIZE)
477
  for _ in range(num_to_take):
478
- # Safety check: ensure queue is not empty before popping
479
- if reply_request_queue:
480
- req, fut, _ = reply_request_queue.pop(0)
481
- batch_to_fire.append((req, fut))
482
- print(f"[{datetime.now()}] Batch processor: Firing batch of {len(batch_to_fire)} requests.")
483
  else:
484
- # Calculate time to wait for the next batch or timeout
485
  time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
486
- print(f"[{datetime.now()}] Batch processor: Not enough requests or timeout not reached. Waiting for {time_to_wait:.2f}s.")
487
  try:
488
  await asyncio.wait_for(reply_queue_condition.wait(), timeout=time_to_wait)
489
  except asyncio.TimeoutError:
490
- print(f"[{datetime.now()}] Batch processor: wait timed out.")
491
- pass # Loop will re-evaluate and likely fire the batch
492
 
493
  if batch_to_fire:
494
  tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
495
- print(f"[{datetime.now()}] Batch processor: Awaiting completion of {len(tasks)} single reply tasks.")
496
  await asyncio.gather(*tasks)
497
- print(f"[{datetime.now()}] Batch processor: Batch processing complete.")
498
  else:
499
- # Short sleep to prevent busy-waiting if queue is empty but not waiting
500
  await asyncio.sleep(0.001)
501
 
502
 
@@ -505,62 +410,53 @@ app = FastAPI(
505
  title="Email Assistant API",
506
  description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
507
  version="1.1.0",
508
- docs_url="/", # Sets Swagger UI to be the root path
509
  redoc_url="/redoc"
510
  )
511
 
512
  # --- Global Exception Handler ---
513
- # Catch Starlette HTTPExceptions (FastAPI uses these internally)
514
  @app.exception_handler(StarletteHTTPException)
515
  async def custom_http_exception_handler_wrapper(request, exc):
516
- """Handles FastAPI's internal HTTP exceptions."""
517
- print(f"[{datetime.now()}] Caught StarletteHTTPException: {exc.status_code} - {exc.detail}")
518
  return await http_exception_handler(request, exc)
519
 
520
- # Catch all other unhandled exceptions
521
  @app.exception_handler(Exception)
522
  async def global_exception_handler_wrapper(request, exc):
523
- """Handles all unhandled exceptions and returns a consistent JSON error response."""
524
- print(f"[{datetime.now()}] Unhandled exception caught by global handler for request: {request.url}")
525
- traceback.print_exc() # Print traceback to console for debugging
526
- # Return a JSON response for consistency, even for unhandled errors
527
- return Response(
528
- content=json.dumps({"detail": f"Internal Server Error: {str(exc)}", "type": "unhandled_exception"}),
529
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
530
- media_type="application/json"
531
- )
532
 
533
 
534
  # --- FastAPI Event Handlers for MongoDB & Batch Processor ---
535
  @app.on_event("startup")
536
  async def startup_event():
537
  global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
538
- print(f"[{datetime.now()}] FastAPI app startup sequence initiated.")
539
  try:
540
- # Connect to MongoDB
 
 
 
 
 
541
  client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
542
- client.admin.command('ping') # Test connection
543
  db = client[DB_NAME]
544
  extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
545
- # Keep generated_replies_collection definition if /query-generated-replies is still desired
546
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
547
- print(f"[{datetime.now()}] Successfully connected to MongoDB: {DB_NAME}")
548
 
549
- # Start the batch processor task if not already running
550
- if batch_processor_task is None or batch_processor_task.done():
551
- batch_processor_task = asyncio.create_task(process_reply_batches())
552
- print(f"[{datetime.now()}] Batch processor task for replies started.")
553
- else:
554
- print(f"[{datetime.now()}] Batch processor task for replies is already running or being initialized.")
555
 
556
  except (ConnectionFailure, OperationFailure) as e:
557
- print(f"[{datetime.now()}] ERROR: MongoDB Connection/Operation Failure: {e}")
558
  client = None
559
  db = None
560
  extracted_emails_collection = None
561
  generated_replies_collection = None
562
  except Exception as e:
563
- print(f"[{datetime.now()}] ERROR: An unexpected error occurred during MongoDB connection or batch startup: {e}")
564
  traceback.print_exc()
565
  client = None
566
  db = None
@@ -570,208 +466,147 @@ async def startup_event():
570
  if client is not None and db is not None:
571
  try:
572
  client.admin.command('ping')
573
- except Exception as e:
574
- print(f"[{datetime.now()}] MongoDB ping failed after initial connection attempt during finally block: {e}")
575
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
576
  else:
577
- print(f"[{datetime.now()}] MongoDB client or db object is None after connection attempt in startup. Database likely not connected.")
578
  if client is None or db is None:
579
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
580
- print(f"[{datetime.now()}] FastAPI app startup sequence completed for MongoDB client & Batch Processor initialization.")
581
 
582
 
583
  @app.on_event("shutdown")
584
  async def shutdown_event():
585
  global client, batch_processor_task
586
- print(f"[{datetime.now()}] FastAPI app shutting down.")
587
  if batch_processor_task:
588
  batch_processor_task.cancel()
589
  try:
590
  await batch_processor_task
591
- print(f"[{datetime.now()}] Batch processor task awaited.")
592
  except asyncio.CancelledError:
593
- print(f"[{datetime.now()}] Batch processor task for replies cancelled during shutdown.")
594
  except Exception as e:
595
- print(f"[{datetime.now()}] Error during batch processor task shutdown: {e}")
596
  traceback.print_exc()
597
  batch_processor_task = None
598
 
599
  if client:
600
  client.close()
601
- print(f"[{datetime.now()}] MongoDB client closed.")
602
 
603
 
604
- # --- API Endpoints ---
605
  @app.get("/health", summary="Health Check")
606
  async def health_check():
607
- """
608
- Checks the health of the API, including MongoDB connection and batch processor status.
609
- """
610
- db_status = "MongoDB not connected."
611
  db_ok = False
612
  if client is not None and db is not None:
613
  try:
614
- # Use asyncio.to_thread for blocking MongoDB call
615
- await asyncio.to_thread(db.list_collection_names)
616
  db_status = "MongoDB connection OK."
617
  db_ok = True
618
  except Exception as e:
619
  db_status = f"MongoDB connection error: {e}"
620
- db_ok = False
621
 
622
- batch_processor_status = "Batch processor not running."
623
- if batch_processor_task is not None:
624
  if not batch_processor_task.done():
625
  batch_processor_status = "Batch processor is running."
626
  else:
627
- if batch_processor_task.exception():
628
- batch_processor_status = f"Batch processor task ended with exception: {batch_processor_task.exception()}"
629
- else:
630
- batch_processor_status = "Batch processor task is done (may have completed or cancelled)."
631
- else:
632
- batch_processor_status = "Batch processor task has not been initialized."
633
-
634
  if db_ok:
635
  return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
636
  else:
637
  raise HTTPException(
638
- status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
639
- detail={"message": "Service unavailable due to issues.", "database": db_status, "batch_processor": batch_processor_status}
640
  )
641
 
642
 
643
  @app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
644
  async def extract_email_data(request: ProcessEmailRequest):
645
- """
646
- Receives an email, extracts contacts, appointments, and tasks using an LLM,
647
- and stores the extracted data in MongoDB.
648
- """
649
- print(f"[{datetime.now()}] /extract-data: Received request.")
650
  if extracted_emails_collection is None:
651
- print(f"[{datetime.now()}] /extract-data: MongoDB collection is None.")
652
- raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for extracted email storage. Check server startup logs.")
653
  try:
654
  current_date_val = date.today()
655
- print(f"[{datetime.now()}] /extract-data: Calling internal processing function.")
656
  extracted_data = await asyncio.to_thread(
657
  _process_email_internal, request.email_text, request.groq_api_key, current_date_val
658
  )
659
- print(f"[{datetime.now()}] /extract-data: Internal processing complete. Preparing for DB insert.")
660
-
661
  extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
662
- # Convert date objects to datetime for MongoDB storage if they are just date objects
663
- # Pydantic's default `date` handling might serialize to ISO string, but for
664
- # internal MongoDB storage, sometimes `datetime` is preferred for consistency.
665
- if 'appointments' in extracted_data_dict:
666
- for appt in extracted_data_dict['appointments']:
667
- if isinstance(appt.get('start_date'), date):
668
- appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
669
- if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
670
- appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
671
- if 'tasks' in extracted_data_dict:
672
- for task_item in extracted_data_dict['tasks']:
673
- if isinstance(task_item.get('due_date'), date):
674
- task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
675
 
676
- print(f"[{datetime.now()}] /extract-data: Inserting into MongoDB...")
677
  result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
678
- print(f"[{datetime.now()}] /extract-data: Data inserted into MongoDB. ID: {result.inserted_id}")
679
-
680
- extracted_data.id = result.inserted_id
681
  return extracted_data
682
  except ValueError as e:
683
- print(f"[{datetime.now()}] /extract-data: ValueError: {e}")
684
- raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
685
  except Exception as e:
686
- print(f"[{datetime.now()}] /extract-data: Unhandled Exception: {e}")
687
  traceback.print_exc()
688
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal server error during data extraction: {e}")
689
 
690
 
691
  @app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
692
  async def extract_email_data_excel(request: ProcessEmailRequest):
693
- """
694
- Placeholder for future functionality to extract data and provide as an Excel download.
695
- Currently disabled.
696
- """
697
- raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Excel functionality is currently disabled.")
698
 
699
 
700
- @app.post("/generate-reply", response_model=GenerateReplyResponse, summary="Generate a smart reply to an email (batched)")
701
  async def generate_email_reply(request: GenerateReplyRequest):
702
- """
703
- Generates an intelligent email reply based on specified parameters (language, length, style, tone, emoji).
704
- Uses a batch processing system. Caching and database storage for replies are disabled.
705
- """
706
- print(f"[{datetime.now()}] /generate-reply: Received request.")
707
- # generated_replies_collection check is no longer relevant for this endpoint's logic
708
- if batch_processor_task is None or reply_queue_condition is None:
709
- print(f"[{datetime.now()}] /generate-reply: Service not fully initialized. batch_task={batch_processor_task is not None}, queue_cond={reply_queue_condition is not None}")
710
- raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Reply generation service not fully initialized. Check server logs for batch processor issues.")
711
 
712
  future = asyncio.Future()
713
  current_time = asyncio.get_event_loop().time()
714
 
715
  async with reply_queue_condition:
716
  reply_request_queue.append((request, future, current_time))
717
- reply_queue_condition.notify() # Notify the batch processor that a new request is available
718
- print(f"[{datetime.now()}] /generate-reply: Request added to queue, notifying batch processor. Queue size: {len(reply_request_queue)}")
719
 
720
  try:
721
- # Debugging: Use a very long timeout for now to ensure server-side logs complete
722
- client_timeout = BATCH_TIMEOUT + 300.0 # 5 minutes (0.5s batch + 300s buffer)
723
- print(f"[{datetime.now()}] /generate-reply: Waiting for future result with timeout {client_timeout}s.")
724
  result = await asyncio.wait_for(future, timeout=client_timeout)
725
- print(f"[{datetime.now()}] /generate-reply: Future result received. Returning data.")
726
  return result
727
  except asyncio.TimeoutError:
728
- print(f"[{datetime.now()}] /generate-reply: Client timeout waiting for future after {client_timeout}s. Future done: {future.done()}")
729
  if not future.done():
730
- future.cancel() # Cancel if it's still pending
731
- raise HTTPException(status_code=status.HTTP_504_GATEWAY_TIMEOUT, detail=f"Request timed out after {client_timeout}s waiting for batch processing. The LLM might be busy or the request queue too long. Check server logs for more details.")
732
  except Exception as e:
733
  if isinstance(e, HTTPException):
734
- print(f"[{datetime.now()}] /generate-reply: Caught HTTPException: {e.status_code} - {e.detail}")
735
- raise e # Re-raise FastAPI HTTPExceptions
736
- print(f"[{datetime.now()}] /generate-reply: Unhandled Exception: {e}")
737
  traceback.print_exc()
738
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error processing your reply request: {str(e)}. Check server logs for more details.")
739
 
740
 
741
  @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
742
  async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
743
- print(f"[{datetime.now()}] /query-extracted-emails: Received request with params: {query_params.model_dump_json()}")
744
  if extracted_emails_collection is None:
745
- print(f"[{datetime.now()}] /query-extracted-emails: MongoDB collection is None.")
746
- raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying extracted emails.")
747
  mongo_query: Dict[str, Any] = {}
748
- if query_params.contact_name:
749
- mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"} # Case-insensitive regex
750
- if query_params.appointment_title:
751
- mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
752
- if query_params.task_title:
753
- mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
754
 
755
  if query_params.from_date or query_params.to_date:
756
  date_query: Dict[str, datetime] = {}
757
- if query_params.from_date:
758
- date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
759
- if query_params.to_date:
760
- # Query up to the end of the 'to_date' day
761
- date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
762
- if date_query :
763
- mongo_query["processed_at"] = date_query
764
- print(f"[{datetime.now()}] /query-extracted-emails: MongoDB query built: {mongo_query}")
765
 
766
  try:
767
- # Use await asyncio.to_thread for blocking MongoDB operations
768
  cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
769
  extracted_docs_raw = await asyncio.to_thread(list, cursor)
770
- print(f"[{datetime.now()}] /query-extracted-emails: Found {len(extracted_docs_raw)} documents.")
771
-
772
  results = []
773
  for doc_raw in extracted_docs_raw:
774
- # Convert datetime objects back to date for Pydantic model validation if necessary
 
 
 
775
  if 'appointments' in doc_raw:
776
  for appt in doc_raw['appointments']:
777
  if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
@@ -780,21 +615,16 @@ async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = De
780
  for task_item in doc_raw['tasks']:
781
  if isinstance(task_item.get('due_date'), datetime): task_item['due_date'] = task_item['due_date'].date()
782
  results.append(ExtractedData(**doc_raw))
783
- print(f"[{datetime.now()}] /query-extracted-emails: Returning {len(results)} results.")
784
  return results
785
  except Exception as e:
786
- print(f"[{datetime.now()}] /query-extracted-emails: Unhandled Exception during query: {e}")
787
  traceback.print_exc()
788
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying extracted emails: {e}")
789
 
790
 
791
  @app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
792
  async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
793
- print(f"[{datetime.now()}] /query-generated-replies: Received request with params: {query_params.model_dump_json()}")
794
- # This endpoint still relies on `generated_replies_collection`
795
  if generated_replies_collection is None:
796
- print(f"[{datetime.now()}] /query-generated-replies: MongoDB collection is None.")
797
- raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying generated replies.")
798
  mongo_query: Dict[str, Any] = {}
799
  if query_params.language: mongo_query["language"] = query_params.language
800
  if query_params.style: mongo_query["style"] = query_params.style
@@ -802,25 +632,26 @@ async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = D
802
 
803
  if query_params.from_date or query_params.to_date:
804
  date_query: Dict[str, datetime] = {}
805
- if query_params.from_date:
806
- date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
807
- if query_params.to_date:
808
- date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
809
- if date_query:
810
- mongo_query["generated_at"] = date_query
811
- print(f"[{datetime.now()}] /query-generated-replies: MongoDB query built: {mongo_query}")
812
-
813
  try:
814
- # Use await asyncio.to_thread for blocking MongoDB operations
815
  cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
816
  generated_docs_raw = await asyncio.to_thread(list, cursor)
817
- print(f"[{datetime.now()}] /query-generated-replies: Found {len(generated_docs_raw)} documents.")
818
  results = []
819
  for doc_raw in generated_docs_raw:
 
 
820
  results.append(GeneratedReplyData(**doc_raw))
821
- print(f"[{datetime.now()}] /query-generated-replies: Returning {len(results)} results.")
822
  return results
823
  except Exception as e:
824
- print(f"[{datetime.now()}] /query-generated-replies: Unhandled Exception during query: {e}")
825
  traceback.print_exc()
826
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying generated replies: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import json
2
  import re
3
  from datetime import date, datetime, timedelta
4
  from typing import List, Optional, Literal, Dict, Any, Tuple
5
+ import os
6
  import traceback
7
  import asyncio
8
 
9
+ from fastapi import FastAPI, HTTPException, Response, Query, Depends
10
  from fastapi.responses import FileResponse
11
  from fastapi.exception_handlers import http_exception_handler
12
  from starlette.exceptions import HTTPException as StarletteHTTPException
 
14
  from langchain_groq import ChatGroq
15
  from pydantic import BaseModel, Field, BeforeValidator, model_serializer
16
  from typing_extensions import Annotated
17
+ import uvicorn
18
 
19
  from pymongo import MongoClient
20
  from pymongo.errors import ConnectionFailure, OperationFailure
21
  from bson import ObjectId
22
 
23
  # --- MongoDB Configuration ---
24
+ # Load from environment variables for secure deployment on Hugging Face
25
+ MONGO_URI = os.getenv("MONGO_URI", "mongodb+srv://user:pass@cluster.mongodb.net/dbname")
 
26
  DB_NAME = "email_assistant_db"
27
  EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
28
+ GENERATED_REPLIES_COLLECTION = "generated_replies"
29
 
 
30
  client: Optional[MongoClient] = None
31
  db: Optional[Any] = None
32
  extracted_emails_collection: Optional[Any] = None
 
 
33
  generated_replies_collection: Optional[Any] = None
34
 
35
  # --- Pydantic ObjectId Handling ---
36
  class CustomObjectId(str):
 
 
 
 
 
37
  @classmethod
38
  def __get_validators__(cls):
39
  yield cls.validate
40
 
41
  @classmethod
42
+ def validate(cls, v, info):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43
  if not ObjectId.is_valid(v):
44
+ raise ValueError("Invalid ObjectId")
45
+ return str(v)
46
 
 
47
  @classmethod
48
+ def __get_pydantic_json_schema__(cls, core_schema, handler):
49
+ json_schema = handler(core_schema)
50
+ json_schema["type"] = "string"
51
+ json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"
 
 
 
52
  return json_schema
53
 
 
54
  PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
55
 
 
56
  # ---------------------- Models ----------------------
57
  class Contact(BaseModel):
58
  name: str
 
74
  due_date: date
75
 
76
  class ExtractedData(BaseModel):
 
77
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
78
  contacts: List[Contact]
79
  appointments: List[Appointment]
 
82
  processed_at: datetime = Field(default_factory=datetime.utcnow)
83
 
84
  class Config:
85
+ populate_by_name = True
86
+ arbitrary_types_allowed = True
87
 
 
88
  @model_serializer(when_used='json')
89
  def serialize_model(self):
90
  data = self.model_dump(by_alias=True, exclude_none=True)
 
91
  if "_id" in data and isinstance(data["_id"], ObjectId):
92
  data["_id"] = str(data["_id"])
 
 
 
93
  if 'appointments' in data:
94
  for appt in data['appointments']:
95
  if isinstance(appt.get('start_date'), date):
 
104
 
105
  class ProcessEmailRequest(BaseModel):
106
  email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
107
+ groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY") # Should be handled securely, see notes below
108
 
109
  class GenerateReplyRequest(BaseModel):
110
  email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
111
+ groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY") # Should be handled securely, see notes below
112
  language: Literal["Italian", "English"] = Field("Italian", examples=["Italian", "English"])
113
  length: str = Field("Auto", examples=["Short", "Medium", "Long", "Auto"])
114
  style: str = Field("Professional", examples=["Professional", "Casual", "Formal", "Informal"])
 
116
  emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
117
 
118
  class GeneratedReplyData(BaseModel):
 
119
  id: Optional[PyObjectId] = Field(alias="_id", default=None)
120
  original_email_text: str
121
  generated_reply_text: str
 
137
  data["_id"] = str(data["_id"])
138
  return data
139
 
 
 
 
 
 
 
140
  # --- Query Models for GET Endpoints ---
141
  class ExtractedEmailQuery(BaseModel):
142
  contact_name: Optional[str] = Query(None, description="Filter by contact name (case-insensitive partial match).")
 
156
 
157
  # ---------------------- Utility Functions ----------------------
158
  def extract_last_json_block(text: str) -> Optional[str]:
 
 
 
 
159
  pattern = r'```json\s*(.*?)\s*```'
160
  matches = re.findall(pattern, text, re.DOTALL)
161
  if matches:
162
  return matches[-1].strip()
 
163
  match = re.search(r'\{.*\}', text, re.DOTALL)
164
  if match:
165
  return match.group(0)
166
  return None
167
 
168
  def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
169
+ if not date_str: return None
 
 
 
 
 
170
  date_str_lower = date_str.lower().strip()
171
+ if date_str_lower == "today": return current_date
172
+ if date_str_lower == "tomorrow": return current_date + timedelta(days=1)
 
 
173
  try:
174
  return datetime.strptime(date_str_lower, "%Y-%m-%d").date()
175
  except ValueError:
176
+ return current_date
 
 
177
 
178
  def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
 
 
 
 
179
  def split_name(full_name: str) -> tuple[str, str]:
180
  parts = full_name.strip().split()
181
  name = parts[0] if parts else ""
 
189
 
190
  appointments_data = []
191
  for a in data.get("appointments", []):
192
+ start_date_val = parse_date(a.get("start_date", "today"), current_date) or current_date
 
 
193
  end_date_val = parse_date(a.get("end_date"), current_date)
194
 
195
  appointments_data.append(Appointment(
 
200
 
201
  tasks_data = []
202
  for t in data.get("tasks", []):
203
+ due_date_val = parse_date(t.get("due_date", "today"), current_date) or current_date
 
204
  tasks_data.append(Task(
205
  task_title=t.get("task_title", "Untitled"), task_description=t.get("task_description", "No description"),
206
  due_date=due_date_val
 
209
 
210
  # ---------------------- Core Logic (Internal Functions) ----------------------
211
  def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
212
+ if not email_text: raise ValueError("Email text cannot be empty for processing.")
 
 
 
 
 
213
  llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
 
214
  prompt_today_str = current_date.isoformat()
215
  prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
 
216
  prompt_template_str = f"""
217
  You are an expert email assistant tasked with extracting structured information from an Italian email.
218
 
 
234
  Each Appointment object must have:
235
  - `title` (string, short, meaningful title in Italian based on the meeting's purpose)
236
  - `description` (string, summary of the meeting's goal)
237
+ - `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow")
238
  - `start_time` (string, optional, e.g., "10:30 AM", null if not present)
239
+ - `end_date` (string, YYYY-MM-DD, optional, null if unknown or not applicable)
240
  - `end_time` (string, optional, e.g., "11:00 AM", null if not present)
241
 
242
  - **tasks**: List of Task objects.
243
  Each Task object must have:
244
  - `task_title` (string, short summary of action item)
245
  - `task_description` (string, more detailed explanation)
246
+ - `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")
247
 
248
  ---
249
 
250
  Email:
251
  {{email}}
252
  """
253
+ prompt_template = PromptTemplate(input_variables=["email"], template=prompt_template_str) # Removed prompt_today_str and prompt_tomorrow_str as they are in the template string
254
  chain = prompt_template | llm
255
  try:
256
+ llm_output = chain.invoke({"email": email_text})
257
  llm_output_str = llm_output.content
 
258
  json_str = extract_last_json_block(llm_output_str)
259
+ if not json_str: raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
 
 
260
  json_data = json.loads(json_str)
 
261
  extracted_data = normalize_llm_output(json_data, current_date, email_text)
262
  return extracted_data
263
  except json.JSONDecodeError as e:
 
270
  email_text: str, api_key: str, language: Literal["Italian", "English"],
271
  length: str, style: str, tone: str, emoji: str
272
  ) -> str:
273
+ if not email_text: return "Cannot generate reply for empty email text."
274
+ llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
275
+ prompt_template_str="""
276
+ You are an assistant that helps reply to emails.
277
+
278
+ Create a response to the following email with the following parameters:
279
+ - Language: {language}
280
+ - Length: {length}
281
+ - Style: {style}
282
+ - Tone: {tone}
283
+ - Emoji usage: {emoji}
284
 
285
+ Email:
286
+ {email}
287
+
288
+ Write only the reply body. Do not repeat the email or mention any instruction.
289
+ """
290
+ prompt = PromptTemplate(
291
+ input_variables=["email", "language", "length", "style", "tone", "emoji"],
292
+ template=prompt_template_str
293
+ )
294
+ chain = prompt | llm
295
+ output = chain.invoke({"email": email_text, "language": language, "length": length, "style": style, "tone": tone, "emoji": emoji})
296
+ return output.content.strip()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
297
 
298
+ # --- Batching and Caching Configuration ---
299
  MAX_BATCH_SIZE = 20
300
+ BATCH_TIMEOUT = 0.5
301
 
302
  reply_request_queue: List[Tuple[GenerateReplyRequest, asyncio.Future, float]] = []
303
  reply_queue_lock = asyncio.Lock()
 
305
  batch_processor_task: Optional[asyncio.Task] = None
306
 
307
 
308
+ # --- Batch Processor and Handler ---
309
  async def handle_single_reply_request(request_data: GenerateReplyRequest, future: asyncio.Future):
310
+ """Handles a single request: checks cache, calls LLM, stores result, and sets future."""
 
311
  if future.cancelled():
 
312
  return
313
  try:
314
+ if generated_replies_collection is None:
315
+ raise HTTPException(status_code=503, detail="Database service not available for caching/storage.")
316
+
317
+ cache_query = {
318
+ "original_email_text": request_data.email_text,
319
+ "language": request_data.language,
320
+ "length": request_data.length,
321
+ "style": request_data.style,
322
+ "tone": request_data.tone,
323
+ "emoji": request_data.emoji,
324
+ }
325
+ # Use asyncio.to_thread for blocking MongoDB operations
326
+ cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
327
+
328
+ if cached_reply_doc:
329
+ response = {
330
+ "reply": cached_reply_doc["generated_reply_text"],
331
+ "stored_id": str(cached_reply_doc["_id"]),
332
+ "cached": True
333
+ }
334
+ if not future.done(): future.set_result(response)
335
+ return
336
+
337
  reply_content = await asyncio.to_thread(
338
  _generate_response_internal,
339
  request_data.email_text,
340
+ request_data.groq_api_key, # Groq API key is passed here
341
  request_data.language,
342
  request_data.length,
343
  request_data.style,
344
  request_data.tone,
345
  request_data.emoji
346
  )
 
347
 
348
+ reply_data_to_store = GeneratedReplyData(
349
+ original_email_text=request_data.email_text,
350
+ generated_reply_text=reply_content,
351
+ language=request_data.language,
352
+ length=request_data.length,
353
+ style=request_data.style,
354
+ tone=request_data.tone,
355
+ emoji=request_data.emoji
356
+ )
357
+ reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
358
+
359
+ insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
360
+ stored_id = str(insert_result.inserted_id)
361
+
362
  final_response = {
363
  "reply": reply_content,
364
+ "stored_id": stored_id,
365
  "cached": False
366
  }
367
+ if not future.done(): future.set_result(final_response)
 
 
368
 
369
  except Exception as e:
370
+ traceback.print_exc()
 
371
  if not future.done():
372
+ future.set_exception(e)
 
 
 
373
 
374
  async def process_reply_batches():
375
  """Continuously processes requests from the reply_request_queue in batches."""
376
  global reply_request_queue
 
377
  while True:
378
  batch_to_fire: List[Tuple[GenerateReplyRequest, asyncio.Future]] = []
379
  async with reply_queue_condition:
380
  if not reply_request_queue:
 
 
381
  await reply_queue_condition.wait()
 
382
  if not reply_request_queue:
 
383
  continue
384
 
385
  now = asyncio.get_event_loop().time()
386
+ oldest_item_timestamp = reply_request_queue[0][2]
 
 
 
 
 
 
 
 
387
 
 
388
  if len(reply_request_queue) >= MAX_BATCH_SIZE or \
389
  (now - oldest_item_timestamp >= BATCH_TIMEOUT):
390
  num_to_take = min(len(reply_request_queue), MAX_BATCH_SIZE)
391
  for _ in range(num_to_take):
392
+ req, fut, _ = reply_request_queue.pop(0)
393
+ batch_to_fire.append((req, fut))
 
 
 
394
  else:
 
395
  time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
 
396
  try:
397
  await asyncio.wait_for(reply_queue_condition.wait(), timeout=time_to_wait)
398
  except asyncio.TimeoutError:
399
+ pass
 
400
 
401
  if batch_to_fire:
402
  tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
 
403
  await asyncio.gather(*tasks)
 
404
  else:
 
405
  await asyncio.sleep(0.001)
406
 
407
 
 
410
  title="Email Assistant API",
411
  description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
412
  version="1.1.0",
413
+ docs_url="/",
414
  redoc_url="/redoc"
415
  )
416
 
417
  # --- Global Exception Handler ---
 
418
  @app.exception_handler(StarletteHTTPException)
419
  async def custom_http_exception_handler_wrapper(request, exc):
 
 
420
  return await http_exception_handler(request, exc)
421
 
 
422
  @app.exception_handler(Exception)
423
  async def global_exception_handler_wrapper(request, exc):
424
+ print(f"Unhandled exception caught by global handler for request: {request.url}")
425
+ traceback.print_exc()
426
+ return Response(content=json.dumps({"detail": f"Internal Server Error: {str(exc)}"}), status_code=500, media_type="application/json")
 
 
 
 
 
 
427
 
428
 
429
  # --- FastAPI Event Handlers for MongoDB & Batch Processor ---
430
  @app.on_event("startup")
431
  async def startup_event():
432
  global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
 
433
  try:
434
+ # Check if MONGO_URI is set before attempting connection
435
+ if not MONGO_URI or MONGO_URI == "mongodb+srv://user:pass@cluster.mongodb.net/dbname":
436
+ print("WARNING: MONGO_URI environment variable not set or using default. MongoDB connection will fail.")
437
+ # Optionally raise an exception here or set a flag to disable DB functionality
438
+ # For now, we'll let the connection attempt proceed and catch its failure.
439
+
440
  client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
441
+ client.admin.command('ping')
442
  db = client[DB_NAME]
443
  extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
 
444
  generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
445
+ print(f"Successfully connected to MongoDB: {DB_NAME}")
446
 
447
+ if batch_processor_task is None:
448
+ loop = asyncio.get_event_loop()
449
+ batch_processor_task = loop.create_task(process_reply_batches())
450
+ print("Batch processor task for replies started.")
 
 
451
 
452
  except (ConnectionFailure, OperationFailure) as e:
453
+ print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
454
  client = None
455
  db = None
456
  extracted_emails_collection = None
457
  generated_replies_collection = None
458
  except Exception as e:
459
+ print(f"ERROR: An unexpected error occurred during MongoDB connection or batch startup: {e}")
460
  traceback.print_exc()
461
  client = None
462
  db = None
 
466
  if client is not None and db is not None:
467
  try:
468
  client.admin.command('ping')
469
+ except Exception:
470
+ print("MongoDB ping failed after initial connection attempt during finally block.")
471
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
472
  else:
473
+ print("MongoDB client or db object is None after connection attempt in startup.")
474
  if client is None or db is None:
475
  client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
476
+ print("FastAPI app starting up. MongoDB client & Batch Processor initialization attempted.")
477
 
478
 
479
  @app.on_event("shutdown")
480
  async def shutdown_event():
481
  global client, batch_processor_task
 
482
  if batch_processor_task:
483
  batch_processor_task.cancel()
484
  try:
485
  await batch_processor_task
 
486
  except asyncio.CancelledError:
487
+ print("Batch processor task for replies cancelled.")
488
  except Exception as e:
489
+ print(f"Error during batch processor task shutdown: {e}")
490
  traceback.print_exc()
491
  batch_processor_task = None
492
 
493
  if client:
494
  client.close()
495
+ print("FastAPI app shutting down. MongoDB client closed.")
496
 
497
 
 
498
  @app.get("/health", summary="Health Check")
499
  async def health_check():
500
+ db_status = "MongoDB not connected. Check server startup logs."
 
 
 
501
  db_ok = False
502
  if client is not None and db is not None:
503
  try:
504
+ db.list_collection_names()
 
505
  db_status = "MongoDB connection OK."
506
  db_ok = True
507
  except Exception as e:
508
  db_status = f"MongoDB connection error: {e}"
 
509
 
510
+ batch_processor_status = "Batch processor not running or state unknown."
511
+ if batch_processor_task is not None :
512
  if not batch_processor_task.done():
513
  batch_processor_status = "Batch processor is running."
514
  else:
515
+ batch_processor_status = "Batch processor task is done (may have completed or errored)."
516
+
 
 
 
 
 
517
  if db_ok:
518
  return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
519
  else:
520
  raise HTTPException(
521
+ status_code=503,
522
+ detail={"message": "Service unavailable.", "database": db_status, "batch_processor": batch_processor_status}
523
  )
524
 
525
 
526
  @app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
527
  async def extract_email_data(request: ProcessEmailRequest):
 
 
 
 
 
528
  if extracted_emails_collection is None:
529
+ raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails. Check server logs for connection errors.")
 
530
  try:
531
  current_date_val = date.today()
 
532
  extracted_data = await asyncio.to_thread(
533
  _process_email_internal, request.email_text, request.groq_api_key, current_date_val
534
  )
 
 
535
  extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
536
+ # Convert date objects to datetime for MongoDB storage
537
+ for appt in extracted_data_dict.get('appointments', []):
538
+ if isinstance(appt.get('start_date'), date): appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
539
+ if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None: appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
540
+ for task_item in extracted_data_dict.get('tasks', []):
541
+ if isinstance(task_item.get('due_date'), date): task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
 
 
 
 
 
 
 
542
 
 
543
  result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
544
+ extracted_data.id = str(result.inserted_id) if isinstance(result.inserted_id, ObjectId) else result.inserted_id
 
 
545
  return extracted_data
546
  except ValueError as e:
547
+ raise HTTPException(status_code=400, detail=str(e))
 
548
  except Exception as e:
 
549
  traceback.print_exc()
550
+ raise HTTPException(status_code=500, detail=f"Internal server error during data extraction: {e}")
551
 
552
 
553
  @app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
554
  async def extract_email_data_excel(request: ProcessEmailRequest):
555
+ raise HTTPException(status_code=501, detail="Excel functionality is currently disabled.")
 
 
 
 
556
 
557
 
558
+ @app.post("/generate-reply", summary="Generate a smart reply to an email (batched & cached)")
559
  async def generate_email_reply(request: GenerateReplyRequest):
560
+ if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
561
+ raise HTTPException(status_code=503, detail="Reply generation service not fully initialized. Check server logs.")
 
 
 
 
 
 
 
562
 
563
  future = asyncio.Future()
564
  current_time = asyncio.get_event_loop().time()
565
 
566
  async with reply_queue_condition:
567
  reply_request_queue.append((request, future, current_time))
568
+ reply_queue_condition.notify()
 
569
 
570
  try:
571
+ client_timeout = BATCH_TIMEOUT + 10.0
 
 
572
  result = await asyncio.wait_for(future, timeout=client_timeout)
 
573
  return result
574
  except asyncio.TimeoutError:
 
575
  if not future.done():
576
+ future.cancel()
577
+ raise HTTPException(status_code=504, detail=f"Request timed out after {client_timeout}s waiting for batch processing.")
578
  except Exception as e:
579
  if isinstance(e, HTTPException):
580
+ raise e
 
 
581
  traceback.print_exc()
582
+ raise HTTPException(status_code=500, detail=f"Error processing your reply request: {str(e)}")
583
 
584
 
585
  @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
586
  async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
 
587
  if extracted_emails_collection is None:
588
+ raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails. Check server logs for connection errors.")
 
589
  mongo_query: Dict[str, Any] = {}
590
+ if query_params.contact_name: mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"}
591
+ if query_params.appointment_title: mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
592
+ if query_params.task_title: mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
 
 
 
593
 
594
  if query_params.from_date or query_params.to_date:
595
  date_query: Dict[str, datetime] = {}
596
+ if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
597
+ if query_params.to_date: date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
598
+ if date_query : mongo_query["processed_at"] = date_query
 
 
 
 
 
599
 
600
  try:
 
601
  cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
602
  extracted_docs_raw = await asyncio.to_thread(list, cursor)
603
+
 
604
  results = []
605
  for doc_raw in extracted_docs_raw:
606
+ if isinstance(doc_raw.get("_id"), ObjectId):
607
+ doc_raw["_id"] = str(doc_raw["_id"])
608
+
609
+ # Convert datetime objects back to date objects for Pydantic model fields that are `date`
610
  if 'appointments' in doc_raw:
611
  for appt in doc_raw['appointments']:
612
  if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
 
615
  for task_item in doc_raw['tasks']:
616
  if isinstance(task_item.get('due_date'), datetime): task_item['due_date'] = task_item['due_date'].date()
617
  results.append(ExtractedData(**doc_raw))
 
618
  return results
619
  except Exception as e:
 
620
  traceback.print_exc()
621
+ raise HTTPException(status_code=500, detail=f"Error querying extracted emails: {e}")
622
 
623
 
624
  @app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
625
  async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
 
 
626
  if generated_replies_collection is None:
627
+ raise HTTPException(status_code=503, detail="MongoDB not available for generated_replies. Check server logs for connection errors.")
 
628
  mongo_query: Dict[str, Any] = {}
629
  if query_params.language: mongo_query["language"] = query_params.language
630
  if query_params.style: mongo_query["style"] = query_params.style
 
632
 
633
  if query_params.from_date or query_params.to_date:
634
  date_query: Dict[str, datetime] = {}
635
+ if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
636
+ if query_params.to_date: date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
637
+ if date_query: mongo_query["generated_at"] = date_query
638
+
 
 
 
 
639
  try:
 
640
  cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
641
  generated_docs_raw = await asyncio.to_thread(list, cursor)
 
642
  results = []
643
  for doc_raw in generated_docs_raw:
644
+ if isinstance(doc_raw.get("_id"), ObjectId):
645
+ doc_raw["_id"] = str(doc_raw["_id"])
646
  results.append(GeneratedReplyData(**doc_raw))
 
647
  return results
648
  except Exception as e:
 
649
  traceback.print_exc()
650
+ raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")
651
+
652
+ # --- Main execution for Hugging Face Spaces ---
653
+ # Hugging Face Spaces expects the `app` object to be directly available.
654
+ # The `if __name__ == "__main__":` block is usually not needed for deployment.
655
+ # However, for local testing, you can keep it or use `uvicorn app:app --host 0.0.0.0 --port 8000`
656
+ # if __name__ == "__main__":
657
+ # uvicorn.run(app, host="0.0.0.0", port=8000)