precison9 committed on
Commit
c313a26
·
verified ·
1 Parent(s): 55fcf87

Update flask_Character.py

Browse files
Files changed (1) hide show
  1. flask_Character.py +655 -18
flask_Character.py CHANGED
@@ -5,29 +5,666 @@
5
  import os
6
  os.environ["NUMBA_CACHE_DIR"] = "/tmp/numba_cache"
7
  os.environ["NUMBA_DISABLE_CACHE"] = "1"
8
- from flask import request, jsonify
9
- import numpy as np
10
- import torch
11
- import flask
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
 
13
- from utils.generate_face_shapes import generate_facial_data_from_bytes
14
- from utils.model.model import load_model
15
- from utils.config import config
16
 
17
- app = flask.Flask(__name__)
 
 
 
 
 
 
 
18
 
19
- device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
20
- print("Activated device:", device)
 
 
 
21
 
22
- model_path = 'utils/model/model.pth'
23
- blendshape_model = load_model(model_path, config, device)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
24
 
25
- @app.route('/audio_to_blendshapes', methods=['POST'])
26
- def audio_to_blendshapes_route():
27
- audio_bytes = request.data
28
- generated_facial_data = generate_facial_data_from_bytes(audio_bytes, blendshape_model, device, config)
29
- generated_facial_data_list = generated_facial_data.tolist() if isinstance(generated_facial_data, np.ndarray) else generated_facial_data
30
 
31
- return jsonify({'blendshapes': generated_facial_data_list})
 
 
 
 
 
 
 
32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
 
5
  import os
6
  os.environ["NUMBA_CACHE_DIR"] = "/tmp/numba_cache"
7
  os.environ["NUMBA_DISABLE_CACHE"] = "1"
8
+ import json
9
+ import re
10
+ from datetime import date, datetime, timedelta
11
+ from typing import List, Optional, Literal, Dict, Any, Tuple
12
+ import os
13
+ import traceback
14
+ import asyncio
15
+
16
+ from fastapi import FastAPI, HTTPException, Response, Query, Depends
17
+ from fastapi.responses import FileResponse
18
+ from fastapi.exception_handlers import http_exception_handler
19
+ from starlette.exceptions import HTTPException as StarletteHTTPException
20
+ from langchain.prompts import PromptTemplate
21
+ from langchain_groq import ChatGroq
22
+ from pydantic import BaseModel, Field, BeforeValidator, model_serializer
23
+ from typing_extensions import Annotated
24
+
25
+ from pymongo import MongoClient
26
+ from pymongo.errors import ConnectionFailure, OperationFailure
27
+ from bson import ObjectId
28
+
29
# --- MongoDB Configuration ---
# SECURITY FIX: the connection string was previously hardcoded with real
# credentials and committed to version control. Read it from the environment
# instead. The leaked credential must be rotated regardless of this change.
MONGO_URI = os.environ.get("MONGO_URI", "")
DB_NAME = "email_assistant_db"
EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
GENERATED_REPLIES_COLLECTION = "generated_replies"

# Global handles populated by the FastAPI startup event; they stay None when
# the MongoDB connection fails, and every endpoint checks for that.
client: Optional[MongoClient] = None
db: Optional[Any] = None  # pymongo Database; annotated Any to avoid importing the class just for typing
extracted_emails_collection: Optional[Any] = None
generated_replies_collection: Optional[Any] = None
39
+
40
# --- Pydantic ObjectId Handling ---
class CustomObjectId(str):
    """String subclass that only accepts valid BSON ObjectId values.

    NOTE(review): this mixes the pydantic v1 ``__get_validators__`` hook with
    the v2 ``BeforeValidator`` used in ``PyObjectId`` below — confirm which
    pydantic major version is deployed; on v2 only the annotated path runs.
    """

    @classmethod
    def __get_validators__(cls):
        # pydantic v1 hook: yield validator callables in order.
        yield cls.validate

    @classmethod
    def validate(cls, v, info):
        # Reject anything bson does not recognize as an ObjectId.
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId")
        return str(v)

    @classmethod
    def __get_pydantic_json_schema__(cls, core_schema, handler):
        # Present the field as a plain string (with an example) in OpenAPI docs.
        json_schema = handler(core_schema)
        json_schema["type"] = "string"
        json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"
        return json_schema

# Coerce incoming values (e.g. raw ObjectId instances from MongoDB documents)
# to str before validation.
PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
60
+
61
# ---------------------- Models ----------------------
class Contact(BaseModel):
    """A contact extracted from an email body."""

    name: str        # first name (split from the full name upstream)
    last_name: str   # remainder of the full name; may be empty
    email: Optional[str] = None
    phone_number: Optional[str] = None
67
+
68
class Appointment(BaseModel):
    """An appointment extracted from an email body."""

    title: str
    description: str
    start_date: date                      # always resolved to a concrete date
    start_time: Optional[str] = None      # free-form time string, e.g. "10:30 AM"
    end_date: Optional[date] = None       # None when the email gives no end date
    end_time: Optional[str] = None
75
+
76
class Task(BaseModel):
    """An action item extracted from an email body."""

    task_title: str
    task_description: str
    due_date: date  # always resolved to a concrete date (defaults applied upstream)
80
+
81
class ExtractedData(BaseModel):
    """Structured data extracted from one email, as stored in MongoDB."""

    # Mongo _id exposed as a string; None until the document is inserted.
    id: Optional[PyObjectId] = Field(alias="_id", default=None)
    contacts: List[Contact]
    appointments: List[Appointment]
    tasks: List[Task]
    original_email_text: str
    # NOTE(review): datetime.utcnow is naive (no tzinfo) and deprecated in
    # Python 3.12+; stored timestamps are implicitly UTC — confirm consumers.
    processed_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        populate_by_name = True          # accept both "id" and "_id" on input
        arbitrary_types_allowed = True

    @model_serializer(when_used='json')
    def serialize_model(self):
        """JSON serialization: stringify _id and ISO-format all date fields."""
        data = self.model_dump(by_alias=True, exclude_none=True)
        if "_id" in data and isinstance(data["_id"], ObjectId):
            data["_id"] = str(data["_id"])
        if 'appointments' in data:
            for appt in data['appointments']:
                if isinstance(appt.get('start_date'), date):
                    appt['start_date'] = appt['start_date'].isoformat()
                if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
                    appt['end_date'] = appt['end_date'].isoformat()
        if 'tasks' in data:
            for task_item in data['tasks']:
                if isinstance(task_item.get('due_date'), date):
                    task_item['due_date'] = task_item['due_date'].isoformat()
        return data
109
+
110
class ProcessEmailRequest(BaseModel):
    """Request body for /extract-data: the raw email plus the caller's Groq key."""

    email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
    groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY")
113
+
114
class GenerateReplyRequest(BaseModel):
    """Request body for /generate-reply; style knobs are free-form strings
    except language, which is a closed set."""

    email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
    groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY")
    language: Literal["Italian", "English"] = Field("Italian", examples=["Italian", "English"])
    length: str = Field("Auto", examples=["Short", "Medium", "Long", "Auto"])
    style: str = Field("Professional", examples=["Professional", "Casual", "Formal", "Informal"])
    tone: str = Field("Friendly", examples=["Friendly", "Neutral", "Urgent", "Empathetic"])
    emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
122
+
123
class GeneratedReplyData(BaseModel):
    """A generated reply as stored in MongoDB; the generation parameters are
    persisted alongside the text so the document doubles as a cache key."""

    # Mongo _id exposed as a string; None until the document is inserted.
    id: Optional[PyObjectId] = Field(alias="_id", default=None)
    original_email_text: str
    generated_reply_text: str
    language: str
    length: str
    style: str
    tone: str
    emoji: str
    # NOTE(review): naive UTC timestamp (datetime.utcnow) — see ExtractedData.
    generated_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        populate_by_name = True          # accept both "id" and "_id" on input
        arbitrary_types_allowed = True

    @model_serializer(when_used='json')
    def serialize_model(self):
        """JSON serialization: stringify _id if it is still an ObjectId."""
        data = self.model_dump(by_alias=True, exclude_none=True)
        if "_id" in data and isinstance(data["_id"], ObjectId):
            data["_id"] = str(data["_id"])
        return data
144
+
145
# --- Query Models for GET Endpoints ---
class ExtractedEmailQuery(BaseModel):
    """Query-string parameters for /query-extracted-emails (via Depends)."""

    contact_name: Optional[str] = Query(None, description="Filter by contact name (case-insensitive partial match).")
    appointment_title: Optional[str] = Query(None, description="Filter by appointment title (case-insensitive partial match).")
    task_title: Optional[str] = Query(None, description="Filter by task title (case-insensitive partial match).")
    from_date: Optional[date] = Query(None, description="Filter by data processed on or after this date (YYYY-MM-DD).")
    to_date: Optional[date] = Query(None, description="Filter by data processed on or before this date (YYYY-MM-DD).")
    limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
153
+
154
class GeneratedReplyQuery(BaseModel):
    """Query-string parameters for /query-generated-replies (via Depends)."""

    language: Optional[Literal["Italian", "English"]] = Query(None, description="Filter by reply language.")
    style: Optional[str] = Query(None, description="Filter by reply style (e.g., Professional, Casual).")
    tone: Optional[str] = Query(None, description="Filter by reply tone (e.g., Friendly, Neutral).")
    from_date: Optional[date] = Query(None, description="Filter by data generated on or after this date (YYYY-MM-DD).")
    to_date: Optional[date] = Query(None, description="Filter by data generated on or before this date (YYYY-MM-DD).")
    limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
161
+
162
# ---------------------- Utility Functions ----------------------
def extract_last_json_block(text: str) -> Optional[str]:
    """Pull the JSON payload out of an LLM response.

    Prefers the last ```json fenced block; otherwise falls back to the widest
    brace-delimited span. Returns None when neither is present.
    """
    fenced = re.findall(r'```json\s*(.*?)\s*```', text, re.DOTALL)
    if fenced:
        return fenced[-1].strip()
    braced = re.search(r'\{.*\}', text, re.DOTALL)
    return braced.group(0) if braced else None
172
+
173
def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
    """Resolve a date string from the LLM into a concrete date.

    Accepts "today"/"tomorrow" (case-insensitive), ISO YYYY-MM-DD, or a
    None/empty value (returned as None so optional fields like end_date stay
    unset). Anything unparseable falls back to *current_date*; callers that
    need a hard default apply it in normalize_llm_output.
    """
    if not date_str:
        return None
    token = date_str.lower().strip()
    relative = {"today": current_date, "tomorrow": current_date + timedelta(days=1)}
    if token in relative:
        return relative[token]
    try:
        return datetime.strptime(token, "%Y-%m-%d").date()
    except ValueError:
        # Lenient fallback: treat unparseable input as "today" rather than fail.
        return current_date
186
+
187
def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
    """Convert the raw JSON dict parsed from the LLM into a validated ExtractedData.

    Defaults: a missing or unparseable start_date/due_date resolves to
    *current_date*; end_date stays None when absent. Missing titles and
    descriptions get placeholder strings so model validation cannot fail on them.
    """
    def split_name(full_name: str) -> tuple[str, str]:
        # First whitespace-separated token is the first name; the rest is the
        # last name (may be empty for single-token names).
        parts = full_name.strip().split()
        name = parts[0] if parts else ""
        last_name = " ".join(parts[1:]) if len(parts) > 1 else ""
        return name, last_name

    contacts_data = []
    for c in data.get("contacts", []):
        name_val, last_name_val = split_name(c.get("name", ""))
        contacts_data.append(Contact(name=name_val, last_name=last_name_val, email=c.get("email"), phone_number=c.get("phone_number")))

    appointments_data = []
    for a in data.get("appointments", []):
        start_date_val = parse_date(a.get("start_date", "today"), current_date) or current_date  # Default to current_date if parse_date returns None
        end_date_val = parse_date(a.get("end_date"), current_date)  # parse_date can return None if end_date is not provided or invalid

        appointments_data.append(Appointment(
            title=a.get("title", "Untitled"), description=a.get("description", "No description"),
            start_date=start_date_val, start_time=a.get("start_time"),
            end_date=end_date_val, end_time=a.get("end_time")
        ))

    tasks_data = []
    for t in data.get("tasks", []):
        due_date_val = parse_date(t.get("due_date", "today"), current_date) or current_date  # Default to current_date
        tasks_data.append(Task(
            task_title=t.get("task_title", "Untitled"), task_description=t.get("task_description", "No description"),
            due_date=due_date_val
        ))
    return ExtractedData(contacts=contacts_data, appointments=appointments_data, tasks=tasks_data, original_email_text=original_email_text)
218
+
219
# ---------------------- Core Logic (Internal Functions) ----------------------
def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
    """Run the extraction LLM over *email_text* and return normalized data.

    Raises ValueError for empty input, missing JSON in the LLM output, or
    unparseable JSON; wraps any other failure in a generic Exception.
    NOTE(review): the prompt is built via an f-string, so the template passed
    to PromptTemplate only contains the {email} placeholder — the
    prompt_today_str/prompt_tomorrow_str entries in input_variables (and in
    the invoke payload) look redundant; confirm LangChain tolerates them.
    """
    if not email_text: raise ValueError("Email text cannot be empty for processing.")
    llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
    prompt_today_str = current_date.isoformat()
    prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
    # Ensure your full, detailed prompt is used here
    prompt_template_str = f"""
You are an expert email assistant tasked with extracting structured information from an Italian email.

**Your response MUST be a single, complete JSON object, wrapped in a ```json``` block.**
**DO NOT include any conversational text, explanations, or preambles outside the JSON block.**
**The JSON should contain three top-level keys: "contacts", "appointments", and "tasks".**
If a category has no items, its list should be empty (e.g., "contacts": []).

Here is the required JSON schema for each category:

- **contacts**: List of Contact objects.
Each Contact object must have:
- `name` (string, full name)
- `last_name` (string, last name) - You should infer this from the full name.
- `email` (string, optional, null if not present)
- `phone_number` (string, optional, null if not present)

- **appointments**: List of Appointment objects.
Each Appointment object must have:
- `title` (string, short, meaningful title in Italian based on the meeting's purpose)
- `description` (string, summary of the meeting's goal)
- `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow")
- `start_time` (string, optional, e.g., "10:30 AM", null if not present)
- `end_date` (string, YYYY-MM-DD, optional, null if unknown or not applicable)
- `end_time` (string, optional, e.g., "11:00 AM", null if not present)

- **tasks**: List of Task objects.
Each Task object must have:
- `task_title` (string, short summary of action item)
- `task_description` (string, more detailed explanation)
- `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")

---

Email:
{{email}}
"""
    prompt_template = PromptTemplate(input_variables=["email", "prompt_today_str", "prompt_tomorrow_str"], template=prompt_template_str)
    chain = prompt_template | llm
    try:
        # print(f"DEBUG: Invoking LLM with email_text length: {len(email_text)} and current_date: {current_date}")
        llm_output = chain.invoke({"email": email_text, "prompt_today_str": prompt_today_str, "prompt_tomorrow_str": prompt_tomorrow_str})
        llm_output_str = llm_output.content
        # print(f"DEBUG: Raw LLM output:\n{llm_output_str[:500]}...")

        json_str = extract_last_json_block(llm_output_str)
        # print(f"DEBUG: Extracted JSON string:\n{json_str}")

        if not json_str: raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
        json_data = json.loads(json_str)
        # print(f"DEBUG: Parsed JSON data: {json.dumps(json_data, indent=2)}")

        extracted_data = normalize_llm_output(json_data, current_date, email_text)
        # print("DEBUG: Data normalized successfully.")
        return extracted_data
    except json.JSONDecodeError as e:
        # Only reachable after llm_output_str is assigned (json.loads is the
        # sole raiser), so the reference below is safe.
        raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}")
    except Exception as e:
        traceback.print_exc()
        raise Exception(f"An error occurred during email processing: {e}")
288
+
289
def _generate_response_internal(
    email_text: str, api_key: str, language: Literal["Italian", "English"],
    length: str, style: str, tone: str, emoji: str
) -> str:
    """Generate a reply to *email_text* with the given style parameters.

    Returns the stripped reply body from the LLM. Empty input short-circuits
    with a fixed message instead of raising, matching the caller's
    best-effort contract.
    """
    if not email_text: return "Cannot generate reply for empty email text."
    llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
    # Ensure your full, detailed prompt is used here
    prompt_template_str="""
You are an assistant that helps reply to emails.

Create a response to the following email with the following parameters:
- Language: {language}
- Length: {length}
- Style: {style}
- Tone: {tone}
- Emoji usage: {emoji}

Email:
{email}

Write only the reply body. Do not repeat the email or mention any instruction.
"""
    prompt = PromptTemplate(
        input_variables=["email", "language", "length", "style", "tone", "emoji"],
        template=prompt_template_str
    )
    chain = prompt | llm
    output = chain.invoke({"email": email_text, "language": language, "length": length, "style": style, "tone": tone, "emoji": emoji})
    return output.content.strip()
318
+
319
# --- Batching and Caching Configuration ---
MAX_BATCH_SIZE = 20   # max reply requests dispatched in one batch
BATCH_TIMEOUT = 0.5 # seconds a partial batch may wait before being flushed

# Pending (request, result-future, enqueue-timestamp) tuples, consumed by
# process_reply_batches; guarded by the lock/condition below.
reply_request_queue: List[Tuple[GenerateReplyRequest, asyncio.Future, float]] = []
reply_queue_lock = asyncio.Lock()
reply_queue_condition = asyncio.Condition(lock=reply_queue_lock)
# Background task created at startup; None until then.
batch_processor_task: Optional[asyncio.Task] = None
327
+
328
+
329
# --- Batch Processor and Handler ---
async def handle_single_reply_request(request_data: GenerateReplyRequest, future: asyncio.Future):
    """Handles a single request: checks cache, calls LLM, stores result, and sets future.

    The cache key is the exact (email_text + all style parameters) tuple; a
    hit returns the stored reply with cached=True. Blocking pymongo and LLM
    calls are pushed to threads so the event loop stays free. Any failure is
    delivered to the caller via future.set_exception.
    """
    if future.cancelled():
        return
    try:
        if generated_replies_collection is None:
            raise HTTPException(status_code=503, detail="Database service not available for caching/storage.")

        cache_query = {
            "original_email_text": request_data.email_text,
            "language": request_data.language,
            "length": request_data.length,
            "style": request_data.style,
            "tone": request_data.tone,
            "emoji": request_data.emoji,
        }
        cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)

        if cached_reply_doc:
            response = {
                "reply": cached_reply_doc["generated_reply_text"],
                "stored_id": str(cached_reply_doc["_id"]),
                "cached": True
            }
            if not future.done(): future.set_result(response)
            return

        # Cache miss: run the (blocking) LLM call off the event loop.
        reply_content = await asyncio.to_thread(
            _generate_response_internal,
            request_data.email_text,
            request_data.groq_api_key,
            request_data.language,
            request_data.length,
            request_data.style,
            request_data.tone,
            request_data.emoji
        )

        reply_data_to_store = GeneratedReplyData(
            original_email_text=request_data.email_text,
            generated_reply_text=reply_content,
            language=request_data.language,
            length=request_data.length,
            style=request_data.style,
            tone=request_data.tone,
            emoji=request_data.emoji
        )
        # Exclude the None id so Mongo assigns a fresh _id on insert.
        reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})

        insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
        stored_id = str(insert_result.inserted_id)

        final_response = {
            "reply": reply_content,
            "stored_id": stored_id,
            "cached": False
        }
        if not future.done(): future.set_result(final_response)

    except Exception as e:
        traceback.print_exc()
        if not future.done():
            future.set_exception(e)
393
+
394
async def process_reply_batches():
    """Continuously processes requests from the reply_request_queue in batches.

    A batch fires when MAX_BATCH_SIZE requests are queued or the oldest item
    has waited BATCH_TIMEOUT seconds; otherwise the loop sleeps on the
    condition until notified or the timeout elapses.
    NOTE(review): the trailing sleep(0.001) only runs when no batch fired,
    but with a non-empty queue that path still spins once per loop until the
    timeout condition is met — acceptable at this scale, worth watching.
    """
    global reply_request_queue
    while True:
        batch_to_fire: List[Tuple[GenerateReplyRequest, asyncio.Future]] = []
        async with reply_queue_condition:
            if not reply_request_queue:
                await reply_queue_condition.wait()
            if not reply_request_queue:
                # Spurious wakeup / notified with an empty queue.
                continue

            now = asyncio.get_event_loop().time()
            oldest_item_timestamp = reply_request_queue[0][2]

            if len(reply_request_queue) >= MAX_BATCH_SIZE or \
               (now - oldest_item_timestamp >= BATCH_TIMEOUT):
                # Drain up to MAX_BATCH_SIZE items in FIFO order.
                num_to_take = min(len(reply_request_queue), MAX_BATCH_SIZE)
                for _ in range(num_to_take):
                    req, fut, _ = reply_request_queue.pop(0)
                    batch_to_fire.append((req, fut))
            else:
                # Batch not ready: wait out the remainder of the oldest
                # item's timeout (or until new work is notified).
                time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
                try:
                    await asyncio.wait_for(reply_queue_condition.wait(), timeout=time_to_wait)
                except asyncio.TimeoutError:
                    pass # Loop will re-evaluate

        if batch_to_fire:
            # Each request resolves its own future; run the batch concurrently.
            tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
            await asyncio.gather(*tasks)
        else:
            await asyncio.sleep(0.001)
426
+
427
+
428
# ---------------------- FastAPI Application ----------------------
# Swagger UI is mounted at the root path; ReDoc at /redoc.
app = FastAPI(
    title="Email Assistant API",
    description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
    version="1.1.0", # Incremented version
    docs_url="/",
    redoc_url="/redoc"
)
436
+
437
# --- Global Exception Handler ---
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler_wrapper(request, exc):
    """Delegate HTTP exceptions to FastAPI's default handler (kept as an
    explicit hook so behavior can be customized later)."""
    return await http_exception_handler(request, exc)

@app.exception_handler(Exception)
async def global_exception_handler_wrapper(request, exc):
    """Catch-all: log the traceback and return a JSON 500 so clients never
    see a bare stack trace."""
    print(f"Unhandled exception caught by global handler for request: {request.url}")
    traceback.print_exc()
    # Ensure it returns a valid FastAPI response
    return Response(content=json.dumps({"detail": f"Internal Server Error: {str(exc)}"}), status_code=500, media_type="application/json")
448
+
449
+
450
# --- FastAPI Event Handlers for MongoDB & Batch Processor ---
@app.on_event("startup")
async def startup_event():
    """Connect to MongoDB and start the background batch processor.

    On any connection failure all DB globals are reset to None; endpoints
    detect that and return 503. NOTE(review): @app.on_event is deprecated in
    current FastAPI in favor of lifespan handlers; also the finally block
    re-pings and can null the globals even after a successful connect —
    convoluted but behavior is preserved here.
    """
    global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
    try:
        # Short server-selection timeout so a bad URI fails startup fast.
        client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        client.admin.command('ping')
        db = client[DB_NAME]
        extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
        generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
        print(f"Successfully connected to MongoDB: {DB_NAME}")

        if batch_processor_task is None:
            loop = asyncio.get_event_loop()
            batch_processor_task = loop.create_task(process_reply_batches())
            print("Batch processor task for replies started.")

    except (ConnectionFailure, OperationFailure) as e:
        print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
        client = None
        db = None
        extracted_emails_collection = None
        generated_replies_collection = None
    except Exception as e:
        print(f"ERROR: An unexpected error occurred during MongoDB connection or batch startup: {e}")
        traceback.print_exc()
        client = None
        db = None
        extracted_emails_collection = None
        generated_replies_collection = None
    finally:
        # Corrected condition for checking client and db
        if client is not None and db is not None:
            try:
                client.admin.command('ping')
            except Exception:
                print("MongoDB ping failed after initial connection attempt during finally block.")
                client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
        else:
            print("MongoDB client or db object is None after connection attempt in startup.")
            if client is None or db is None: # Ensure all are None if one is
                client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
        print("FastAPI app starting up. MongoDB client & Batch Processor initialization attempted.")
493
+
494
+
495
+ @app.on_event("shutdown")
496
+ async def shutdown_event():
497
+ global client, batch_processor_task
498
+ if batch_processor_task:
499
+ batch_processor_task.cancel()
500
+ try:
501
+ await batch_processor_task
502
+ except asyncio.CancelledError:
503
+ print("Batch processor task for replies cancelled.")
504
+ except Exception as e:
505
+ print(f"Error during batch processor task shutdown: {e}")
506
+ traceback.print_exc()
507
+ batch_processor_task = None
508
+
509
+ if client:
510
+ client.close()
511
+ print("FastAPI app shutting down. MongoDB client closed.")
512
+
513
+
514
+ @app.get("/health", summary="Health Check")
515
+ async def health_check():
516
+ db_status = "MongoDB not connected. Check server startup logs."
517
+ db_ok = False
518
+ if client is not None and db is not None: # Corrected check
519
+ try:
520
+ db.list_collection_names()
521
+ db_status = "MongoDB connection OK."
522
+ db_ok = True
523
+ except Exception as e:
524
+ db_status = f"MongoDB connection error: {e}"
525
+
526
+ batch_processor_status = "Batch processor not running or state unknown."
527
+ if batch_processor_task is not None :
528
+ if not batch_processor_task.done():
529
+ batch_processor_status = "Batch processor is running."
530
+ else:
531
+ batch_processor_status = "Batch processor task is done (may have completed or errored)."
532
+
533
+ if db_ok:
534
+ return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
535
+ else:
536
+ # Return a JSON response for HTTPException as well for consistency
537
+ raise HTTPException(
538
+ status_code=503,
539
+ detail={"message": "Service unavailable.", "database": db_status, "batch_processor": batch_processor_status}
540
+ )
541
+
542
+
543
+ @app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
544
+ async def extract_email_data(request: ProcessEmailRequest):
545
+ if extracted_emails_collection is None:
546
+ raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails.")
547
+ try:
548
+ current_date_val = date.today()
549
+ extracted_data = await asyncio.to_thread(
550
+ _process_email_internal, request.email_text, request.groq_api_key, current_date_val
551
+ )
552
+ extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
553
+ for appt in extracted_data_dict.get('appointments', []):
554
+ if isinstance(appt.get('start_date'), date): appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
555
+ if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None: appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
556
+ for task_item in extracted_data_dict.get('tasks', []):
557
+ if isinstance(task_item.get('due_date'), date): task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
558
+
559
+ result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
560
+ # Pydantic model expects string ID, convert from ObjectId before assigning if needed
561
+ # However, PyObjectId should handle this if result.inserted_id is ObjectId
562
+ extracted_data.id = str(result.inserted_id) if isinstance(result.inserted_id, ObjectId) else result.inserted_id
563
+ return extracted_data
564
+ except ValueError as e:
565
+ raise HTTPException(status_code=400, detail=str(e))
566
+ except Exception as e:
567
+ traceback.print_exc()
568
+ raise HTTPException(status_code=500, detail=f"Internal server error during data extraction: {e}")
569
+
570
+
571
+ @app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
572
+ async def extract_email_data_excel(request: ProcessEmailRequest):
573
+ raise HTTPException(status_code=501, detail="Excel functionality is currently disabled.")
574
+
575
+
576
+ @app.post("/generate-reply", summary="Generate a smart reply to an email (batched & cached)")
577
+ async def generate_email_reply(request: GenerateReplyRequest):
578
+ if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
579
+ raise HTTPException(status_code=503, detail="Reply generation service not fully initialized. Check server logs.")
580
+
581
+ future = asyncio.Future()
582
+ current_time = asyncio.get_event_loop().time()
583
+
584
+ async with reply_queue_condition:
585
+ reply_request_queue.append((request, future, current_time))
586
+ reply_queue_condition.notify()
587
+
588
+ try:
589
+ client_timeout = BATCH_TIMEOUT + 10.0
590
+ result = await asyncio.wait_for(future, timeout=client_timeout)
591
+ return result
592
+ except asyncio.TimeoutError:
593
+ if not future.done():
594
+ future.cancel()
595
+ raise HTTPException(status_code=504, detail=f"Request timed out after {client_timeout}s waiting for batch processing.")
596
+ except Exception as e:
597
+ if isinstance(e, HTTPException):
598
+ raise e
599
+ traceback.print_exc()
600
+ raise HTTPException(status_code=500, detail=f"Error processing your reply request: {str(e)}")
601
 
 
 
 
602
 
603
+ @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
604
+ async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
605
+ if extracted_emails_collection is None:
606
+ raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails.")
607
+ mongo_query: Dict[str, Any] = {}
608
+ if query_params.contact_name: mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"}
609
+ if query_params.appointment_title: mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
610
+ if query_params.task_title: mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
611
 
612
+ if query_params.from_date or query_params.to_date:
613
+ date_query: Dict[str, datetime] = {}
614
+ if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
615
+ if query_params.to_date: date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
616
+ if date_query : mongo_query["processed_at"] = date_query
617
 
618
+ try:
619
+ cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
620
+ extracted_docs_raw = await asyncio.to_thread(list, cursor)
621
+
622
+ results = []
623
+ for doc_raw in extracted_docs_raw:
624
+ # Convert _id to string for Pydantic model if it's an ObjectId
625
+ if isinstance(doc_raw.get("_id"), ObjectId):
626
+ doc_raw["_id"] = str(doc_raw["_id"])
627
+
628
+ # Convert datetime objects back to date objects for Pydantic model fields that are `date`
629
+ if 'appointments' in doc_raw:
630
+ for appt in doc_raw['appointments']:
631
+ if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
632
+ if isinstance(appt.get('end_date'), datetime): appt['end_date'] = appt['end_date'].date()
633
+ if 'tasks' in doc_raw:
634
+ for task_item in doc_raw['tasks']:
635
+ if isinstance(task_item.get('due_date'), datetime): task_item['due_date'] = task_item['due_date'].date()
636
+ results.append(ExtractedData(**doc_raw))
637
+ return results
638
+ except Exception as e:
639
+ traceback.print_exc()
640
+ raise HTTPException(status_code=500, detail=f"Error querying extracted emails: {e}")
641
 
 
 
 
 
 
642
 
643
+ @app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
644
+ async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
645
+ if generated_replies_collection is None:
646
+ raise HTTPException(status_code=503, detail="MongoDB not available for generated_replies.")
647
+ mongo_query: Dict[str, Any] = {}
648
+ if query_params.language: mongo_query["language"] = query_params.language
649
+ if query_params.style: mongo_query["style"] = query_params.style
650
+ if query_params.tone: mongo_query["tone"] = query_params.tone
651
 
652
+ if query_params.from_date or query_params.to_date:
653
+ date_query: Dict[str, datetime] = {}
654
+ if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
655
+ if query_params.to_date: date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
656
+ if date_query: mongo_query["generated_at"] = date_query
657
+
658
+ try:
659
+ cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
660
+ generated_docs_raw = await asyncio.to_thread(list, cursor)
661
+ results = []
662
+ for doc_raw in generated_docs_raw:
663
+ if isinstance(doc_raw.get("_id"), ObjectId):
664
+ doc_raw["_id"] = str(doc_raw["_id"])
665
+ results.append(GeneratedReplyData(**doc_raw))
666
+ return results
667
+ except Exception as e:
668
+ traceback.print_exc()
669
+ raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")
670