precison9 committed on
Commit
e99343e
·
verified ·
1 Parent(s): 66aaed6

Update flask_Character.py

Browse files
Files changed (1) hide show
  1. flask_Character.py +839 -123
flask_Character.py CHANGED
@@ -1,150 +1,866 @@
 
 
 
1
  import os
2
- import logging
3
- from fastapi import FastAPI, Response, status
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4
  from pymongo import MongoClient
5
- from pymongo.errors import ConfigurationError, ConnectionFailure
6
- import time # For simulating a batch processor initialization delay
7
-
8
- # --- 1. Configuration and Global Variables ---
9
- # Use a proper logger for better debugging and visibility
10
- logger = logging.getLogger(__name__)
11
- logger.setLevel(logging.INFO) # Set desired logging level (INFO, DEBUG, WARNING, ERROR, CRITICAL)
12
- handler = logging.StreamHandler()
13
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
14
- handler.setFormatter(formatter)
15
- logger.addHandler(handler)
16
-
17
- # Retrieve MONGO_URI from environment variable. If not set, it will be None.
18
- MONGO_URI = os.getenv("MONGO_URI")
19
- DB_NAME = "email_assistant_db" # Replace with your actual database name
20
-
21
- # Global variables to hold MongoDB client and database instance
22
- # These will be initialized in the startup event
23
- mongo_client = None
24
- mongo_db = None
25
-
26
- # Placeholder for your Batch Processor (if applicable)
27
- batch_processor_initialized = False
28
-
29
- # --- 2. FastAPI Application Instance ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30
  app = FastAPI(
31
- title="Character API",
32
- description="API for managing character data and processing emails.",
33
- version="0.0.1",
 
 
34
  )
35
 
36
- # --- 3. Application Startup Event Handler ---
37
- @app.on_event("startup")
38
- async def startup_event():
39
- global mongo_client, mongo_db, batch_processor_initialized
 
 
 
40
 
41
- logger.info("Application startup initiated.")
 
 
 
 
 
 
 
 
 
 
 
42
 
43
- # Check for MONGO_URI BEFORE attempting connection
44
- if not MONGO_URI:
45
- logger.critical("CRITICAL ERROR: MONGO_URI environment variable is NOT SET.")
46
- logger.critical("Please set the MONGO_URI environment variable with your MongoDB Atlas connection string.")
47
- logger.critical("Example: mongodb+srv://<user>:<pass>@cluster.mongodb.net/dbname?retryWrites=true&w=majority")
48
- # Depending on your application's criticality, you might want to exit or raise an exception here
49
- # raise ValueError("MONGO_URI environment variable is not set. Cannot connect to MongoDB.")
50
- return # Prevent further execution if critical dependency is missing
51
 
 
 
 
 
 
52
  try:
53
- logger.info("Attempting to connect to MongoDB...")
54
- # Initialize MongoClient
55
- client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) # 5-second timeout for server selection
56
-
57
- # The ismaster command is a lightweight way to check connectivity and auth.
58
- # It triggers the actual connection attempt and DNS resolution.
59
- client.admin.command('ismaster')
60
-
61
- mongo_client = client
62
- mongo_db = client[DB_NAME]
63
- logger.info(f"SUCCESS: Successfully connected to MongoDB database: '{DB_NAME}'")
64
-
65
- # --- Initialize Batch Processor (if applicable) ---
66
- logger.info("Initializing Batch Processor...")
67
- # Simulate some work
68
- time.sleep(1) # Simulate delay for batch processor setup
69
- batch_processor_initialized = True
70
- logger.info("Batch Processor initialized successfully.")
71
-
72
- except ConfigurationError as e:
73
- logger.critical(f"FATAL ERROR: MongoDB Configuration Error: {e}")
74
- logger.critical("Please check your MONGO_URI format and ensure the hostname is correct and resolvable.")
75
- mongo_client = None
76
- mongo_db = None
77
- except ConnectionFailure as e:
78
- logger.critical(f"FATAL ERROR: MongoDB Connection Failure: {e}")
79
- logger.critical("Check network connectivity, MongoDB Atlas IP access list, and firewall rules.")
80
- logger.critical("Also verify that your MongoDB user credentials are correct.")
81
- mongo_client = None
82
- mongo_db = None
83
- except Exception as e:
84
- logger.critical(f"FATAL ERROR: An unexpected error occurred during MongoDB connection or batch startup: {e}", exc_info=True)
85
- mongo_client = None
86
- mongo_db = None
87
 
88
- if mongo_client is None or mongo_db is None:
89
- logger.critical("CRITICAL: MongoDB client or db object is None after connection attempt in startup. Application might not function correctly.")
90
- else:
91
- logger.info("Application startup complete. MongoDB client & Batch Processor initialization attempted.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92
 
93
 
94
- # --- 4. Application Shutdown Event Handler (Good Practice) ---
95
  @app.on_event("shutdown")
96
  async def shutdown_event():
97
- global mongo_client
98
- if mongo_client:
99
- mongo_client.close()
100
- logger.info("MongoDB connection closed.")
101
- logger.info("Application shutdown complete.")
 
 
 
 
 
 
 
 
 
 
 
 
102
 
103
- # --- 5. API Endpoints ---
104
 
105
- # Health check endpoint
106
- @app.get("/health")
107
  async def health_check():
108
- db_status = "Connected" if mongo_db else "Disconnected"
109
- batch_status = "Initialized" if batch_processor_initialized else "Not Initialized"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
 
111
- if mongo_db and batch_processor_initialized:
112
- return {"status": "ok", "mongodb": db_status, "batch_processor": batch_status}
113
  else:
114
- # Return 503 Service Unavailable if critical dependencies are not met
115
- return Response(
116
- content={"status": "degraded", "mongodb": db_status, "batch_processor": batch_status},
117
  status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
118
- media_type="application/json"
119
  )
120
 
121
- # Example endpoint - requires MongoDB connection
122
- @app.get("/characters/{character_id}")
123
- async def get_character(character_id: str):
124
- if not mongo_db:
125
- logger.error(f"Attempted to access /characters/{character_id} but MongoDB is not connected.")
126
- return Response(content="MongoDB is not connected.", status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
127
 
 
 
 
 
 
 
 
 
 
 
128
  try:
129
- # Example: Fetch a character from a 'characters' collection
130
- # Replace with your actual collection and query
131
- character_data = await mongo_db.get_collection("characters").find_one({"_id": character_id})
132
- # Note: PyMongo's find_one is synchronous. If using async, you'd need motor or an async wrapper.
133
- # For a truly async FastAPI app with MongoDB, consider 'motor' (async pymongo driver).
134
- # For now, we'll assume this is fine in an 'await' context for simplicity,
135
- # but in a real-world async app, you'd want to run blocking operations in a thread pool.
136
- # e.g., result = await loop.run_in_executor(None, mongo_db.collection.find_one, {"_id": character_id})
137
-
138
- if character_data:
139
- # MongoDB's _id might be an ObjectId, convert to string for JSON serialization
140
- character_data["_id"] = str(character_data["_id"])
141
- return character_data
142
- else:
143
- return Response(content="Character not found.", status_code=status.HTTP_404_NOT_FOUND)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
144
  except Exception as e:
145
- logger.error(f"Error fetching character {character_id}: {e}", exc_info=True)
146
- return Response(content="Internal server error.", status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
 
 
 
 
147
 
148
- # You would add more endpoints here for your application logic
149
- # e.g., @app.post("/email-process") etc.
150
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # This software is licensed under a **dual-license model**
2
+ # For individuals and businesses earning **under $1M per year**, this software is licensed under the **MIT License**
3
+ # Businesses or organizations with **annual revenue of $1,000,000 or more** must obtain permission to use this software commercially.
4
  import os
5
# Numba cache controls: point Numba's on-disk cache at a writable tmp directory
# and disable caching entirely. These are set immediately after `import os`,
# before any other imports, so they take effect before any numba-using library
# loads. Presumably needed for a read-only / containerized filesystem — keep
# only if your deployment environment requires them.
os.environ["NUMBA_CACHE_DIR"] = "/tmp/numba_cache"
os.environ["NUMBA_DISABLE_CACHE"] = "1"
10
+
11
+ import json
12
+ import re
13
+ from datetime import date, datetime, timedelta
14
+ from typing import List, Optional, Literal, Dict, Any, Tuple
15
+ import traceback
16
+ import asyncio
17
+
18
+ from fastapi import FastAPI, HTTPException, Response, Query, Depends, status
19
+ from fastapi.responses import FileResponse
20
+ from fastapi.exception_handlers import http_exception_handler
21
+ from starlette.exceptions import HTTPException as StarletteHTTPException
22
+ from langchain.prompts import PromptTemplate
23
+ from langchain_groq import ChatGroq
24
+ from pydantic import BaseModel, Field, BeforeValidator, model_serializer
25
+ from typing_extensions import Annotated
26
+ from pydantic_core import core_schema # Import core_schema for direct use in __get_pydantic_json_schema__
27
+
28
  from pymongo import MongoClient
29
+ from pymongo.errors import ConnectionFailure, OperationFailure
30
+ from bson import ObjectId
31
+
32
# --- MongoDB Configuration ---
# SECURITY FIX: the connection string previously hard-coded a username and
# password in source. Credentials must come from the environment; the old
# committed URI should be treated as compromised and rotated in Atlas.
# Falls back to a local, credential-free MongoDB for development.
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
DB_NAME = "email_assistant_db"
EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
GENERATED_REPLIES_COLLECTION = "generated_replies"

# Global handles for the MongoDB client and collections. They remain None
# until a connection is established (presumably during application startup —
# the initialization code is outside this block).
client: Optional["MongoClient"] = None
db: Optional[Any] = None
extracted_emails_collection: Optional[Any] = None
generated_replies_collection: Optional[Any] = None
45
+
46
# --- Pydantic ObjectId Handling ---
class CustomObjectId(str):
    """
    Custom Pydantic type for handling MongoDB ObjectIds.

    Accepts either a 24-hex-char string or a bson.ObjectId instance and
    normalizes both to a string subclass, so the value JSON-serializes as a
    plain string. Also customizes the generated OpenAPI schema to show the
    field as a string with an example value.

    NOTE(review): `__get_validators__` is the Pydantic *v1* hook and is
    ignored by Pydantic v2 (which this file otherwise targets via
    `pydantic_core` / `model_serializer`). In v2, validation of PyObjectId
    fields effectively relies on the `BeforeValidator(str)` below — confirm
    that `validate` is actually invoked in your Pydantic version.
    """
    @classmethod
    def __get_validators__(cls):
        # Pydantic v1-style validator registration (see class NOTE above).
        yield cls.validate

    @classmethod
    def validate(cls, v):
        # Allow None or empty string to pass through for optional fields.
        # Pydantic's Optional[PyObjectId] normally handles the None case
        # before this validator is reached.
        # NOTE(review): the BeforeValidator(str) in PyObjectId turns None into
        # the string "None" before this runs — verify None round-trips as
        # intended for optional fields.
        if v is None or v == "":
            return None

        if not isinstance(v, (str, ObjectId)):
            raise ValueError("ObjectId must be a string or ObjectId instance")

        # Convert ObjectId to string if it's already an ObjectId instance.
        if isinstance(v, ObjectId):
            return str(v)

        # Validate the 24-hex-character string format.
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId format")
        return cls(v)  # Return an instance of CustomObjectId (a str subclass)

    # Crucial for Pydantic v2 to generate a correct OpenAPI schema:
    # represent this custom type as a standard string in JSON Schema.
    @classmethod
    def __get_pydantic_json_schema__(
        cls, _core_schema: core_schema.CoreSchema, handler
    ) -> Dict[str, Any]:
        json_schema = handler(core_schema.str_schema())
        json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"  # Example for API docs
        return json_schema

# Annotated alias used by the models below; the BeforeValidator coerces the
# raw input to str before CustomObjectId validation.
PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
90
+
91
+
92
+ # ---------------------- Models ----------------------
93
class Contact(BaseModel):
    """A person extracted from an email; name is split into first/last by normalize_llm_output."""
    name: str
    last_name: str
    # Optional contact details; None when the email does not mention them.
    email: Optional[str] = None
    phone_number: Optional[str] = None
98
+
99
class Appointment(BaseModel):
    """A meeting extracted from an email. start_date is always set (defaults to the processing date); the end and the times are optional."""
    title: str
    description: str
    start_date: date
    # Times are free-form strings as produced by the LLM (e.g. "10:30 AM"),
    # not parsed time objects.
    start_time: Optional[str] = None
    end_date: Optional[date] = None
    end_time: Optional[str] = None
106
+
107
class Task(BaseModel):
    """An action item extracted from an email; due_date always set (defaults to the processing date)."""
    task_title: str
    task_description: str
    due_date: date
111
+
112
class ExtractedData(BaseModel):
    """
    Record of one processed email: the contacts, appointments and tasks the
    LLM extracted, plus the raw email text and a processing timestamp.
    Presumably persisted in the 'extracted_emails' collection — confirm
    against the endpoint code.
    """
    # MongoDB's _id, exposed as 'id' in Python; None until the doc is inserted.
    id: Optional[PyObjectId] = Field(alias="_id", default=None)
    contacts: List[Contact]
    appointments: List[Appointment]
    tasks: List[Task]
    original_email_text: str
    # NOTE(review): datetime.utcnow returns a *naive* datetime and is
    # deprecated since Python 3.12; timestamps are presumably UTC — confirm
    # before comparing against timezone-aware values.
    processed_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        populate_by_name = True  # Allow setting 'id' or '_id'
        arbitrary_types_allowed = True  # Allow CustomObjectId and ObjectId

    # Custom serializer applied only when serializing to JSON: stringifies the
    # ObjectId and ISO-formats date objects so the payload is JSON-safe.
    @model_serializer(when_used='json')
    def serialize_model(self):
        # model_dump() runs in python mode here, so this JSON-mode serializer
        # is not re-entered recursively.
        data = self.model_dump(by_alias=True, exclude_none=True)
        # Ensure _id is a string when serializing to JSON
        if "_id" in data and isinstance(data["_id"], ObjectId):
            data["_id"] = str(data["_id"])
        # Pydantic v2 usually serializes `date` fields automatically; the
        # explicit conversions below guard the case where raw date objects
        # survive the dump.
        if 'appointments' in data:
            for appt in data['appointments']:
                if isinstance(appt.get('start_date'), date):
                    appt['start_date'] = appt['start_date'].isoformat()
                if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
                    appt['end_date'] = appt['end_date'].isoformat()
        if 'tasks' in data:
            for task_item in data['tasks']:
                if isinstance(task_item.get('due_date'), date):
                    task_item['due_date'] = task_item['due_date'].isoformat()
        return data
146
+
147
class ProcessEmailRequest(BaseModel):
    """Request body for the email-extraction endpoint."""
    email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
    # NOTE(review): the caller's Groq API key travels in the request body and
    # may end up in logs — consider a header or server-side key instead.
    groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY")
150
+
151
class GenerateReplyRequest(BaseModel):
    """
    Request body for reply generation. All style knobs default to sensible
    values; together with email_text they form the exact-match cache key used
    by handle_single_reply_request.
    """
    email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
    # NOTE(review): API key in the request body — see ProcessEmailRequest.
    groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY")
    language: Literal["Italian", "English"] = Field("Italian", examples=["Italian", "English"])
    # The remaining knobs are free-form strings passed verbatim into the
    # LLM prompt; the examples are suggestions, not an enforced enum.
    length: str = Field("Auto", examples=["Short", "Medium", "Long", "Auto"])
    style: str = Field("Professional", examples=["Professional", "Casual", "Formal", "Informal"])
    tone: str = Field("Friendly", examples=["Friendly", "Neutral", "Urgent", "Empathetic"])
    emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
159
+
160
class GeneratedReplyData(BaseModel):
    """
    Persisted record of one generated reply plus the generation parameters.
    The (original_email_text, language, length, style, tone, emoji) tuple is
    what handle_single_reply_request queries to serve cached replies.
    """
    # MongoDB's _id, exposed as 'id'; None until the document is inserted.
    id: Optional[PyObjectId] = Field(alias="_id", default=None)
    original_email_text: str
    generated_reply_text: str
    language: str
    length: str
    style: str
    tone: str
    emoji: str
    # NOTE(review): naive UTC timestamp via deprecated datetime.utcnow —
    # see ExtractedData.processed_at.
    generated_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        populate_by_name = True
        arbitrary_types_allowed = True

    # JSON-mode serializer: ensure the ObjectId is emitted as a string.
    @model_serializer(when_used='json')
    def serialize_model(self):
        data = self.model_dump(by_alias=True, exclude_none=True)
        if "_id" in data and isinstance(data["_id"], ObjectId):
            data["_id"] = str(data["_id"])
        return data
182
+
183
+ # NEW: Response Model for /generate-reply endpoint
184
class GenerateReplyResponse(BaseModel):
    """Response model for the reply-generation endpoint."""
    reply: str = Field(..., description="The AI-generated reply text.")
    stored_id: str = Field(..., description="The MongoDB ID of the stored reply.")
    cached: bool = Field(..., description="True if the reply was retrieved from cache, False if newly generated.")
188
+
189
+ # --- Query Models for GET Endpoints ---
190
# --- Query Models for GET Endpoints ---
class ExtractedEmailQuery(BaseModel):
    """
    Query-parameter bundle for listing extracted emails; presumably injected
    as a FastAPI dependency (Depends) so each field becomes a query param —
    confirm at the endpoint.
    """
    contact_name: Optional[str] = Query(None, description="Filter by contact name (case-insensitive partial match).")
    appointment_title: Optional[str] = Query(None, description="Filter by appointment title (case-insensitive partial match).")
    task_title: Optional[str] = Query(None, description="Filter by task title (case-insensitive partial match).")
    from_date: Optional[date] = Query(None, description="Filter by data processed on or after this date (YYYY-MM-DD).")
    to_date: Optional[date] = Query(None, description="Filter by data processed on or before this date (YYYY-MM-DD).")
    limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
197
+
198
class GeneratedReplyQuery(BaseModel):
    """Query-parameter bundle for listing generated replies; see ExtractedEmailQuery."""
    language: Optional[Literal["Italian", "English"]] = Query(None, description="Filter by reply language.")
    style: Optional[str] = Query(None, description="Filter by reply style (e.g., Professional, Casual).")
    tone: Optional[str] = Query(None, description="Filter by reply tone (e.g., Friendly, Neutral).")
    from_date: Optional[date] = Query(None, description="Filter by data generated on or after this date (YYYY-MM-DD).")
    to_date: Optional[date] = Query(None, description="Filter by data generated on or before this date (YYYY-MM-DD).")
    limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
205
+
206
+ # ---------------------- Utility Functions ----------------------
207
def extract_last_json_block(text: str) -> Optional[str]:
    """
    Pull a JSON payload out of LLM output.

    Prefers the content of the last ```json ... ``` fence in *text*; if no
    fence exists, falls back to the widest {...} span (greedy, across
    newlines). Returns None when neither is present.
    """
    fenced_blocks = re.findall(r'```json\s*(.*?)\s*```', text, re.DOTALL)
    if fenced_blocks:
        return fenced_blocks[-1].strip()
    standalone = re.search(r'\{.*\}', text, re.DOTALL)
    return standalone.group(0) if standalone else None
221
+
222
def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
    """
    Resolve a date string to a concrete date.

    Understands the relative keywords "today" and "tomorrow" (case- and
    whitespace-insensitive) as well as ISO YYYY-MM-DD. Returns None for
    missing/empty input or anything unparseable; the caller decides the
    fallback (normalize_llm_output substitutes current_date).
    """
    if not date_str:
        return None
    token = date_str.strip().lower()
    relative = {
        "today": current_date,
        "tomorrow": current_date + timedelta(days=1),
    }
    if token in relative:
        return relative[token]
    try:
        return datetime.strptime(token, "%Y-%m-%d").date()
    except ValueError:
        # Unparseable — signal with None rather than guessing here.
        return None
240
+
241
def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
    """
    Convert the raw LLM JSON dict into a validated ExtractedData model.

    Fills in defaults where the LLM omitted fields: missing/invalid
    start_date and due_date fall back to current_date, titles default to
    "Untitled" and descriptions to "No description". Contact names are split
    into first name + remainder-as-surname.
    """
    def _split_full_name(full_name: str) -> tuple[str, str]:
        # First token -> given name; everything after -> surname (may be "").
        tokens = full_name.strip().split()
        if not tokens:
            return "", ""
        return tokens[0], (" ".join(tokens[1:]) if len(tokens) > 1 else "")

    contacts = []
    for raw in data.get("contacts", []):
        first, surname = _split_full_name(raw.get("name", ""))
        contacts.append(Contact(
            name=first,
            last_name=surname,
            email=raw.get("email"),
            phone_number=raw.get("phone_number"),
        ))

    appointments = []
    for raw in data.get("appointments", []):
        appointments.append(Appointment(
            title=raw.get("title", "Untitled"),
            description=raw.get("description", "No description"),
            # start_date is mandatory: fall back to the processing date.
            start_date=parse_date(raw.get("start_date"), current_date) or current_date,
            start_time=raw.get("start_time"),
            # end_date stays optional (None when absent or unparseable).
            end_date=parse_date(raw.get("end_date"), current_date),
            end_time=raw.get("end_time"),
        ))

    tasks = [
        Task(
            task_title=raw.get("task_title", "Untitled"),
            task_description=raw.get("task_description", "No description"),
            due_date=parse_date(raw.get("due_date"), current_date) or current_date,
        )
        for raw in data.get("tasks", [])
    ]

    return ExtractedData(
        contacts=contacts,
        appointments=appointments,
        tasks=tasks,
        original_email_text=original_email_text,
    )
279
+
280
+ # ---------------------- Core Logic (Internal Functions) ----------------------
281
def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
    """
    Extract structured data (contacts, appointments, tasks) from an email
    via a Groq-hosted LLM.

    Args:
        email_text: Raw email body (Italian expected by the prompt).
        api_key: Caller-supplied Groq API key.
        current_date: Anchor date used to resolve "today"/"tomorrow".

    Returns:
        A validated ExtractedData instance.

    Raises:
        ValueError: empty input, no JSON in the LLM output, or unparseable JSON.
        Exception: any other failure, re-wrapped with context.
    """
    if not email_text:
        raise ValueError("Email text cannot be empty for processing.")

    # Deterministic extraction: temperature 0.
    llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)

    prompt_today_str = current_date.isoformat()
    prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()

    # f-string: today/tomorrow are baked in NOW; only {{email}} survives as a
    # literal {email} placeholder for PromptTemplate to fill later.
    prompt_template_str = f"""
You are an expert email assistant tasked with extracting structured information from an Italian email.

**Your response MUST be a single, complete JSON object, wrapped in a ```json``` block.**
**DO NOT include any conversational text, explanations, or preambles outside the JSON block.**
**The JSON should contain three top-level keys: "contacts", "appointments", and "tasks".**
If a category has no items, its list should be empty (e.g., "contacts": []).

Here is the required JSON schema for each category:

- **contacts**: List of Contact objects.
  Each Contact object must have:
  - `name` (string, full name)
  - `last_name` (string, last name) - You should infer this from the full name.
  - `email` (string, optional, null if not present)
  - `phone_number` (string, optional, null if not present)

- **appointments**: List of Appointment objects.
  Each Appointment object must have:
  - `title` (string, short, meaningful title in Italian based on the meeting's purpose)
  - `description` (string, summary of the meeting's goal)
  - `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow")
  - `start_time` (string, optional, e.g., "10:30 AM", null if not present)
  - `end_date` (string, YYYY-MM-DD, optional, null if unknown or not applicable)
  - `end_time` (string, optional, e.g., "11:00 AM", null if not present)

- **tasks**: List of Task objects.
  Each Task object must have:
  - `task_title` (string, short summary of action item)
  - `task_description` (string, more detailed explanation)
  - `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")

---

Email:
{{email}}
"""
    # NOTE(review): prompt_today_str/prompt_tomorrow_str were already
    # interpolated by the f-string above, so declaring them as template
    # variables (and passing them in invoke) is redundant — confirm your
    # PromptTemplate version tolerates declared-but-unused variables.
    prompt_template = PromptTemplate(input_variables=["email", "prompt_today_str", "prompt_tomorrow_str"], template=prompt_template_str)
    chain = prompt_template | llm
    try:
        llm_output = chain.invoke({"email": email_text, "prompt_today_str": prompt_today_str, "prompt_tomorrow_str": prompt_tomorrow_str})
        llm_output_str = llm_output.content

        json_str = extract_last_json_block(llm_output_str)

        if not json_str:
            raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
        json_data = json.loads(json_str)

        extracted_data = normalize_llm_output(json_data, current_date, email_text)
        return extracted_data
    except json.JSONDecodeError as e:
        # NOTE(review): consider `raise ... from e` here and below to
        # preserve the exception chain.
        raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}")
    except Exception as e:
        # NOTE(review): this also re-wraps the ValueError raised above,
        # turning it into a bare Exception — confirm callers don't rely on
        # the original type.
        traceback.print_exc()
        raise Exception(f"An error occurred during email processing: {e}")
349
+
350
def _generate_response_internal(
    email_text: str, api_key: str, language: Literal["Italian", "English"],
    length: str, style: str, tone: str, emoji: str
) -> str:
    """
    Generate a reply to an email via a Groq-hosted LLM (blocking call).

    Args:
        email_text: Email being replied to.
        api_key: Caller-supplied Groq API key.
        language/length/style/tone/emoji: Free-form style knobs injected
            verbatim into the prompt.

    Returns:
        The stripped reply text, or a fixed placeholder string when
        email_text is empty (no exception in that case).

    Raises:
        Re-raises any LLM/provider error after logging it, so the caller
        (handle_single_reply_request) can surface it on the future.
    """
    print(f"[{datetime.now()}] _generate_response_internal: Starting LLM call. API Key starts with: {api_key[:5]}...")  # Debug log
    if not email_text:
        print(f"[{datetime.now()}] _generate_response_internal: Email text is empty.")
        # Empty input is reported as a sentinel string, not an error.
        return "Cannot generate reply for empty email text."

    try:
        # Creative generation: non-zero temperature, shorter output budget.
        llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
        prompt_template_str="""
You are an assistant that helps reply to emails.

Create a response to the following email with the following parameters:
- Language: {language}
- Length: {length}
- Style: {style}
- Tone: {tone}
- Emoji usage: {emoji}

Email:
{email}

Write only the reply body. Do not repeat the email or mention any instruction.
"""
        prompt = PromptTemplate(
            input_variables=["email", "language", "length", "style", "tone", "emoji"],
            template=prompt_template_str
        )
        chain = prompt | llm
        print(f"[{datetime.now()}] _generate_response_internal: Invoking LLM chain...")  # Debug log
        output = chain.invoke({"email": email_text, "language": language, "length": length, "style": style, "tone": tone, "emoji": emoji})
        print(f"[{datetime.now()}] _generate_response_internal: LLM chain returned. Content length: {len(output.content)}.")  # Debug log
        return output.content.strip()
    except Exception as e:
        print(f"[{datetime.now()}] _generate_response_internal: ERROR during LLM invocation: {e}")  # Debug log
        traceback.print_exc()  # Print full traceback to logs
        raise  # Re-raise the exception so it can be caught by handle_single_reply_request
392
+
393
# --- Batching and Caching Configuration ---
MAX_BATCH_SIZE = 20  # Max requests drained from the queue per batch.
BATCH_TIMEOUT = 0.5  # seconds (Adjust based on expected LLM response time and desired latency)

# Pending reply requests as (request, future-to-resolve, enqueue-timestamp)
# triples; consumed FIFO by process_reply_batches.
reply_request_queue: List[Tuple[GenerateReplyRequest, asyncio.Future, float]] = []
reply_queue_lock = asyncio.Lock()
# Condition sharing the queue lock; the batch processor waits on it for work.
# NOTE(review): creating asyncio primitives at import time binds them to the
# running loop on Python < 3.10 — confirm the target runtime.
reply_queue_condition = asyncio.Condition(lock=reply_queue_lock)
# Handle to the background batch-processing task; presumably created at
# application startup — TODO confirm.
batch_processor_task: Optional[asyncio.Task] = None
401
+
402
+
403
+ # --- Batch Processor and Handler ---
404
async def handle_single_reply_request(request_data: GenerateReplyRequest, future: asyncio.Future):
    """
    Resolve one queued reply request end-to-end.

    Flow: look up the generated-replies collection for an identical
    (email_text, language, length, style, tone, emoji) tuple; on a cache hit,
    resolve `future` with the stored reply; otherwise run the blocking LLM
    call in a worker thread, persist the new reply, and resolve `future`
    with it. Failures are communicated by setting an HTTPException on
    `future` instead of raising, so the enqueuing endpoint can await it.
    Every future mutation is guarded by `future.done()` to avoid
    InvalidStateError on already-resolved/cancelled futures.
    """
    print(f"[{datetime.now()}] Handle single reply: Starting for email_text_start='{request_data.email_text[:50]}'...")
    if future.cancelled():
        # Client gave up while queued; nothing to do.
        print(f"[{datetime.now()}] Handle single reply: Future cancelled. Aborting.")
        return
    try:
        # Without the DB we can neither consult the cache nor persist a result.
        if generated_replies_collection is None:
            print(f"[{datetime.now()}] Handle single reply: DB collection 'generated_replies_collection' is None.")
            if not future.done():
                future.set_exception(HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database service not available for caching/storage."))
            return

        # Exact-match cache key: full email text plus every generation knob.
        cache_query = {
            "original_email_text": request_data.email_text,
            "language": request_data.language,
            "length": request_data.length,
            "style": request_data.style,
            "tone": request_data.tone,
            "emoji": request_data.emoji,
        }
        print(f"[{datetime.now()}] Handle single reply: Checking cache for reply...")
        # PyMongo is blocking; asyncio.to_thread keeps the event loop free.
        cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)

        if cached_reply_doc:
            print(f"[{datetime.now()}] Handle single reply: Reply found in cache. ID: {str(cached_reply_doc['_id'])}")
            response = {
                "reply": cached_reply_doc["generated_reply_text"],
                "stored_id": str(cached_reply_doc["_id"]),
                "cached": True
            }
            if not future.done():
                future.set_result(response)
                print(f"[{datetime.now()}] Handle single reply: Cache result set on future.")
            return

        print(f"[{datetime.now()}] Handle single reply: Reply not in cache. Calling LLM...")
        # Blocking LLM call off the event loop as well.
        reply_content = await asyncio.to_thread(
            _generate_response_internal,
            request_data.email_text,
            request_data.groq_api_key,
            request_data.language,
            request_data.length,
            request_data.style,
            request_data.tone,
            request_data.emoji
        )
        print(f"[{datetime.now()}] Handle single reply: LLM call completed. Reply length: {len(reply_content)}.")

        reply_data_to_store = GeneratedReplyData(
            original_email_text=request_data.email_text,
            generated_reply_text=reply_content,
            language=request_data.language,
            length=request_data.length,
            style=request_data.style,
            tone=request_data.tone,
            emoji=request_data.emoji
        )
        print(f"[{datetime.now()}] Handle single reply: Storing reply in DB...")
        # Pydantic v2 dump; 'id' excluded so MongoDB assigns the _id itself.
        reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})

        insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
        stored_id = str(insert_result.inserted_id)
        print(f"[{datetime.now()}] Handle single reply: Reply stored in DB. ID: {stored_id}")

        final_response = {
            "reply": reply_content,
            "stored_id": stored_id,
            "cached": False
        }
        if not future.done():
            future.set_result(final_response)
            print(f"[{datetime.now()}] Handle single reply: Final result set on future.")

    except Exception as e:
        print(f"[{datetime.now()}] Handle single reply: EXCEPTION: {e}")
        traceback.print_exc()  # Print full traceback to logs
        if not future.done():
            # Deliver the failure to the awaiting client as a 500.
            future.set_exception(HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to generate reply: {e}"))
            print(f"[{datetime.now()}] Handle single reply: Exception set on future.")
487
+
488
+
489
async def process_reply_batches():
    """Continuously processes requests from the reply_request_queue in batches.

    Runs forever as a background task (started in the app's startup handler).
    Each queue entry is a (GenerateReplyRequest, Future, enqueue_timestamp)
    triple appended by the /generate-reply endpoint. A batch is fired when
    either MAX_BATCH_SIZE requests are queued or the oldest request has waited
    BATCH_TIMEOUT seconds. Results/exceptions are delivered via the futures,
    which the enqueuing endpoint awaits.
    """
    global reply_request_queue
    print(f"[{datetime.now()}] Batch processor task started.")
    while True:
        # Requests pulled out of the queue for this iteration's batch.
        batch_to_fire: List[Tuple[GenerateReplyRequest, asyncio.Future]] = []
        async with reply_queue_condition:
            if not reply_request_queue:
                print(f"[{datetime.now()}] Batch processor: Queue empty, waiting for requests...")
                # Wait for new requests or timeout
                await reply_queue_condition.wait()
                # After waking up, re-check if queue is still empty
                # (spurious wakeups / another consumer may have drained it).
                if not reply_request_queue:
                    print(f"[{datetime.now()}] Batch processor: Woke up, queue still empty. Continuing loop.")
                    continue

            now = asyncio.get_event_loop().time()
            # Safety check: ensure queue is not empty before accessing index 0
            if reply_request_queue:
                # Third tuple element is the enqueue timestamp (loop time).
                oldest_item_timestamp = reply_request_queue[0][2]
            else:
                # If queue became empty while waiting, loop again
                print(f"[{datetime.now()}] Batch processor: Queue became empty before processing. Restarting loop.")
                continue

            print(f"[{datetime.now()}] Batch processor: Woke up. Queue size: {len(reply_request_queue)}. Oldest item age: {now - oldest_item_timestamp:.2f}s")

            # Condition to trigger batch processing: queue is full OR timeout reached for oldest item
            if len(reply_request_queue) >= MAX_BATCH_SIZE or \
               (now - oldest_item_timestamp >= BATCH_TIMEOUT):
                num_to_take = min(len(reply_request_queue), MAX_BATCH_SIZE)
                for _ in range(num_to_take):
                    # Safety check: ensure queue is not empty before popping
                    if reply_request_queue:
                        # NOTE(review): list.pop(0) is O(n); a collections.deque
                        # would make this O(1) if the queue ever grows large.
                        req, fut, _ = reply_request_queue.pop(0)
                        batch_to_fire.append((req, fut))
                print(f"[{datetime.now()}] Batch processor: Firing batch of {len(batch_to_fire)} requests.")
            else:
                # Calculate time to wait for the next batch or timeout
                time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
                print(f"[{datetime.now()}] Batch processor: Not enough requests or timeout not reached. Waiting for {time_to_wait:.2f}s.")
                try:
                    # NOTE(review): wrapping Condition.wait() in wait_for cancels
                    # the wait on timeout; the condition must then reacquire its
                    # lock before the async-with exits — verify this is safe on
                    # the targeted Python version (known to be fragile pre-3.11).
                    await asyncio.wait_for(reply_queue_condition.wait(), timeout=time_to_wait)
                except asyncio.TimeoutError:
                    print(f"[{datetime.now()}] Batch processor: wait timed out.")
                    pass  # Loop will re-evaluate and likely fire the batch

        # The condition lock is released here, so LLM calls below do not block
        # producers from enqueuing new requests.
        if batch_to_fire:
            tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
            print(f"[{datetime.now()}] Batch processor: Awaiting completion of {len(tasks)} single reply tasks.")
            await asyncio.gather(*tasks)
            print(f"[{datetime.now()}] Batch processor: Batch processing complete.")
        else:
            # Short sleep to prevent busy-waiting if queue is empty but not waiting
            await asyncio.sleep(0.001)
544
+
545
+
546
# ---------------------- FastAPI Application ----------------------
# API metadata kept in one place; Swagger UI is served at the root path.
_API_INFO = {
    "title": "Email Assistant API",
    "description": "API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
    "version": "1.1.0",
    "docs_url": "/",  # Sets Swagger UI to be the root path
    "redoc_url": "/redoc",
}

app = FastAPI(**_API_INFO)
554
 
555
# --- Global Exception Handler ---
# Catch Starlette HTTPExceptions (FastAPI raises these internally).
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler_wrapper(request, exc):
    """Log FastAPI's internal HTTP exceptions, then delegate to the default handler."""
    print(f"[{datetime.now()}] Caught StarletteHTTPException: {exc.status_code} - {exc.detail}")
    # Default Starlette handler builds the standard JSON error response.
    default_response = http_exception_handler(request, exc)
    return await default_response
562
 
563
# Catch-all for every exception no other handler claimed.
@app.exception_handler(Exception)
async def global_exception_handler_wrapper(request, exc):
    """Handles all unhandled exceptions and returns a consistent JSON error response."""
    print(f"[{datetime.now()}] Unhandled exception caught by global handler for request: {request.url}")
    traceback.print_exc()  # Full traceback to the console for debugging
    # Hand back a JSON body even for unexpected failures so clients always
    # receive a machine-readable error shape.
    error_body = {"detail": f"Internal Server Error: {str(exc)}", "type": "unhandled_exception"}
    return Response(
        content=json.dumps(error_body),
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        media_type="application/json",
    )
575
 
 
 
 
 
 
 
 
 
576
 
577
# --- FastAPI Event Handlers for MongoDB & Batch Processor ---
def _reset_db_globals() -> None:
    """Reset every MongoDB-related global to its 'not connected' state.

    Endpoints check these globals for None and answer 503, so clearing all of
    them together guarantees we never serve requests with a half-initialized
    client.
    """
    global client, db, extracted_emails_collection, generated_replies_collection
    client = None
    db = None
    extracted_emails_collection = None
    generated_replies_collection = None


@app.on_event("startup")
async def startup_event():
    """Connect to MongoDB and start the background reply batch processor.

    The blocking PyMongo ping runs via asyncio.to_thread so the event loop is
    not stalled during startup (consistent with the rest of the file). On any
    failure the DB globals are reset via _reset_db_globals(); the previous
    implementation additionally re-pinged in a `finally` block a connection it
    had verified two statements earlier and contained an always-true no-op
    branch — both removed.
    """
    global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
    print(f"[{datetime.now()}] FastAPI app startup sequence initiated.")
    try:
        # MongoClient is lazy; the explicit ping forces and verifies the connection.
        client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        await asyncio.to_thread(client.admin.command, 'ping')
        db = client[DB_NAME]
        extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
        generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
        print(f"[{datetime.now()}] Successfully connected to MongoDB: {DB_NAME}")

        # Start the batch processor task if not already running.
        if batch_processor_task is None or batch_processor_task.done():
            batch_processor_task = asyncio.create_task(process_reply_batches())
            print(f"[{datetime.now()}] Batch processor task for replies started.")
        else:
            print(f"[{datetime.now()}] Batch processor task for replies is already running or being initialized.")

    except (ConnectionFailure, OperationFailure) as e:
        # Expected driver-level failures: log and disable DB-backed endpoints.
        print(f"[{datetime.now()}] ERROR: MongoDB Connection/Operation Failure: {e}")
        _reset_db_globals()
    except Exception as e:
        # Anything else during startup: log with traceback and disable DB access.
        print(f"[{datetime.now()}] ERROR: An unexpected error occurred during MongoDB connection or batch startup: {e}")
        traceback.print_exc()
        _reset_db_globals()

    print(f"[{datetime.now()}] FastAPI app startup sequence completed for MongoDB client & Batch Processor initialization.")
623
 
624
 
 
625
@app.on_event("shutdown")
async def shutdown_event():
    """Cancel the background batch processor, then release the MongoDB client."""
    global client, batch_processor_task
    print(f"[{datetime.now()}] FastAPI app shutting down.")

    task = batch_processor_task
    if task:
        # Cancel first, then await so the task gets a chance to unwind cleanly.
        task.cancel()
        try:
            await task
            print(f"[{datetime.now()}] Batch processor task awaited.")
        except asyncio.CancelledError:
            # Normal outcome of cancelling the infinite processing loop.
            print(f"[{datetime.now()}] Batch processor task for replies cancelled during shutdown.")
        except Exception as e:
            print(f"[{datetime.now()}] Error during batch processor task shutdown: {e}")
            traceback.print_exc()
        batch_processor_task = None

    if client:
        client.close()
        print(f"[{datetime.now()}] MongoDB client closed.")
644
 
 
645
 
646
# --- API Endpoints ---
@app.get("/health", summary="Health Check")
async def health_check():
    """
    Checks the health of the API, including MongoDB connection and batch processor status.

    Returns 200 with status details when the database responds; raises 503
    (with the same detail fields) when it does not.
    """
    # --- Database probe ---
    db_ok = False
    db_status = "MongoDB not connected."
    if client is not None and db is not None:
        try:
            # Blocking PyMongo call, pushed off the event loop.
            await asyncio.to_thread(db.list_collection_names)
            db_ok = True
            db_status = "MongoDB connection OK."
        except Exception as e:
            db_ok = False
            db_status = f"MongoDB connection error: {e}"

    # --- Batch processor probe ---
    if batch_processor_task is None:
        batch_processor_status = "Batch processor task has not been initialized."
    elif not batch_processor_task.done():
        batch_processor_status = "Batch processor is running."
    elif batch_processor_task.exception():
        # NOTE(review): Task.exception() raises CancelledError if the task was
        # cancelled rather than failed — same behavior as the original code;
        # confirm cancellation cannot occur while the app is still serving.
        batch_processor_status = f"Batch processor task ended with exception: {batch_processor_task.exception()}"
    else:
        batch_processor_status = "Batch processor task is done (may have completed or cancelled)."

    if not db_ok:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail={"message": "Service unavailable due to issues.", "database": db_status, "batch_processor": batch_processor_status}
        )
    return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
683
 
 
 
 
 
 
 
684
 
685
+ @app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
686
+ async def extract_email_data(request: ProcessEmailRequest):
687
+ """
688
+ Receives an email, extracts contacts, appointments, and tasks using an LLM,
689
+ and stores the extracted data in MongoDB.
690
+ """
691
+ print(f"[{datetime.now()}] /extract-data: Received request.")
692
+ if extracted_emails_collection is None:
693
+ print(f"[{datetime.now()}] /extract-data: MongoDB collection is None.")
694
+ raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for extracted email storage. Check server startup logs.")
695
  try:
696
+ current_date_val = date.today()
697
+ print(f"[{datetime.now()}] /extract-data: Calling internal processing function.")
698
+ extracted_data = await asyncio.to_thread(
699
+ _process_email_internal, request.email_text, request.groq_api_key, current_date_val
700
+ )
701
+ print(f"[{datetime.now()}] /extract-data: Internal processing complete. Preparing for DB insert.")
702
+
703
+ extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
704
+ # Convert date objects to datetime for MongoDB storage if they are just date objects
705
+ # Pydantic's default `date` handling might serialize to ISO string, but for
706
+ # internal MongoDB storage, sometimes `datetime` is preferred for consistency.
707
+ if 'appointments' in extracted_data_dict:
708
+ for appt in extracted_data_dict['appointments']:
709
+ if isinstance(appt.get('start_date'), date):
710
+ appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
711
+ if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
712
+ appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
713
+ if 'tasks' in extracted_data_dict:
714
+ for task_item in extracted_data_dict['tasks']:
715
+ if isinstance(task_item.get('due_date'), date):
716
+ task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
717
+
718
+ print(f"[{datetime.now()}] /extract-data: Inserting into MongoDB...")
719
+ result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
720
+ print(f"[{datetime.now()}] /extract-data: Data inserted into MongoDB. ID: {result.inserted_id}")
721
+
722
+ extracted_data.id = result.inserted_id
723
+ return extracted_data
724
+ except ValueError as e:
725
+ print(f"[{datetime.now()}] /extract-data: ValueError: {e}")
726
+ raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
727
+ except Exception as e:
728
+ print(f"[{datetime.now()}] /extract-data: Unhandled Exception: {e}")
729
+ traceback.print_exc()
730
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal server error during data extraction: {e}")
731
+
732
+
733
@app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
async def extract_email_data_excel(request: ProcessEmailRequest):
    """
    Placeholder for future functionality to extract data and provide as an Excel download.
    Currently disabled: every call answers 501 Not Implemented.
    """
    not_implemented = HTTPException(
        status_code=status.HTTP_501_NOT_IMPLEMENTED,
        detail="Excel functionality is currently disabled.",
    )
    raise not_implemented
740
+
741
+
742
+ @app.post("/generate-reply", response_model=GenerateReplyResponse, summary="Generate a smart reply to an email (batched & cached)")
743
+ async def generate_email_reply(request: GenerateReplyRequest):
744
+ """
745
+ Generates an intelligent email reply based on specified parameters (language, length, style, tone, emoji).
746
+ Uses a batch processing system with caching for efficiency.
747
+ """
748
+ print(f"[{datetime.now()}] /generate-reply: Received request.")
749
+ if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
750
+ print(f"[{datetime.now()}] /generate-reply: Service not initialized. gen_replies_coll={generated_replies_collection is not None}, batch_task={batch_processor_task is not None}, queue_cond={reply_queue_condition is not None}")
751
+ raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Reply generation service not fully initialized. Check server logs for database or batch processor issues.")
752
+
753
+ future = asyncio.Future()
754
+ current_time = asyncio.get_event_loop().time()
755
+
756
+ async with reply_queue_condition:
757
+ reply_request_queue.append((request, future, current_time))
758
+ reply_queue_condition.notify() # Notify the batch processor that a new request is available
759
+ print(f"[{datetime.now()}] /generate-reply: Request added to queue, notifying batch processor. Queue size: {len(reply_request_queue)}")
760
+
761
+ try:
762
+ # Debugging: Increase timeout significantly to allow full tracing in logs
763
+ client_timeout = BATCH_TIMEOUT + 60.0 # Example: 0.5s batch + 60s LLM response buffer = 60.5s total timeout
764
+ print(f"[{datetime.now()}] /generate-reply: Waiting for future result with timeout {client_timeout}s.")
765
+ result = await asyncio.wait_for(future, timeout=client_timeout)
766
+ print(f"[{datetime.now()}] /generate-reply: Future result received. Returning data.")
767
+ return result
768
+ except asyncio.TimeoutError:
769
+ print(f"[{datetime.now()}] /generate-reply: Client timeout waiting for future after {client_timeout}s. Future done: {future.done()}")
770
+ if not future.done():
771
+ future.cancel() # Cancel if it's still pending
772
+ raise HTTPException(status_code=status.HTTP_504_GATEWAY_TIMEOUT, detail=f"Request timed out after {client_timeout}s waiting for batch processing. The LLM might be busy or the request queue too long. Check server logs for more details.")
773
  except Exception as e:
774
+ if isinstance(e, HTTPException):
775
+ print(f"[{datetime.now()}] /generate-reply: Caught HTTPException: {e.status_code} - {e.detail}")
776
+ raise e # Re-raise FastAPI HTTPExceptions
777
+ print(f"[{datetime.now()}] /generate-reply: Unhandled Exception: {e}")
778
+ traceback.print_exc()
779
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error processing your reply request: {str(e)}. Check server logs for more details.")
780
 
 
 
781
 
782
+ @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
783
+ async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
784
+ print(f"[{datetime.now()}] /query-extracted-emails: Received request with params: {query_params.model_dump_json()}")
785
+ if extracted_emails_collection is None:
786
+ print(f"[{datetime.now()}] /query-extracted-emails: MongoDB collection is None.")
787
+ raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying extracted emails.")
788
+ mongo_query: Dict[str, Any] = {}
789
+ if query_params.contact_name:
790
+ mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"} # Case-insensitive regex
791
+ if query_params.appointment_title:
792
+ mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
793
+ if query_params.task_title:
794
+ mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
795
+
796
+ if query_params.from_date or query_params.to_date:
797
+ date_query: Dict[str, datetime] = {}
798
+ if query_params.from_date:
799
+ date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
800
+ if query_params.to_date:
801
+ # Query up to the end of the 'to_date' day
802
+ date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
803
+ if date_query :
804
+ mongo_query["processed_at"] = date_query
805
+ print(f"[{datetime.now()}] /query-extracted-emails: MongoDB query built: {mongo_query}")
806
+
807
+ try:
808
+ # Use await asyncio.to_thread for blocking MongoDB operations
809
+ cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
810
+ extracted_docs_raw = await asyncio.to_thread(list, cursor)
811
+ print(f"[{datetime.now()}] /query-extracted-emails: Found {len(extracted_docs_raw)} documents.")
812
+
813
+ results = []
814
+ for doc_raw in extracted_docs_raw:
815
+ # Convert datetime objects back to date for Pydantic model validation if necessary
816
+ if 'appointments' in doc_raw:
817
+ for appt in doc_raw['appointments']:
818
+ if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
819
+ if isinstance(appt.get('end_date'), datetime): appt['end_date'] = appt['end_date'].date()
820
+ if 'tasks' in doc_raw:
821
+ for task_item in doc_raw['tasks']:
822
+ if isinstance(task_item.get('due_date'), datetime): task_item['due_date'] = task_item['due_date'].date()
823
+ results.append(ExtractedData(**doc_raw))
824
+ print(f"[{datetime.now()}] /query-extracted-emails: Returning {len(results)} results.")
825
+ return results
826
+ except Exception as e:
827
+ print(f"[{datetime.now()}] /query-extracted-emails: Unhandled Exception during query: {e}")
828
+ traceback.print_exc()
829
+ raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying extracted emails: {e}")
830
+
831
+
832
@app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
    """Search stored generated replies by language/style/tone and generated-at date range."""
    print(f"[{datetime.now()}] /query-generated-replies: Received request with params: {query_params.model_dump_json()}")
    if generated_replies_collection is None:
        print(f"[{datetime.now()}] /query-generated-replies: MongoDB collection is None.")
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying generated replies.")

    mongo_query: Dict[str, Any] = {}
    # Exact-match filters on the stored reply attributes.
    for field in ("language", "style", "tone"):
        value = getattr(query_params, field)
        if value:
            mongo_query[field] = value

    if query_params.from_date or query_params.to_date:
        date_query: Dict[str, datetime] = {}
        if query_params.from_date:
            date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
        if query_params.to_date:
            # Include the whole to_date day: strictly before the following midnight.
            date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
        if date_query:
            mongo_query["generated_at"] = date_query
    print(f"[{datetime.now()}] /query-generated-replies: MongoDB query built: {mongo_query}")

    try:
        cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
        # Materializing the cursor is the blocking step; run it off the loop.
        generated_docs_raw = await asyncio.to_thread(list, cursor)
        print(f"[{datetime.now()}] /query-generated-replies: Found {len(generated_docs_raw)} documents.")
        results = [GeneratedReplyData(**doc_raw) for doc_raw in generated_docs_raw]
        print(f"[{datetime.now()}] /query-generated-replies: Returning {len(results)} results.")
        return results
    except Exception as e:
        print(f"[{datetime.now()}] /query-generated-replies: Unhandled Exception during query: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying generated replies: {e}")