precison9 commited on
Commit
66aaed6
·
verified ·
1 Parent(s): efd4eb0

Update flask_Character.py

Browse files
Files changed (1) hide show
  1. flask_Character.py +123 -630
flask_Character.py CHANGED
@@ -1,657 +1,150 @@
1
- import json
2
- import re
3
- from datetime import date, datetime, timedelta
4
- from typing import List, Optional, Literal, Dict, Any, Tuple
5
  import os
6
- import traceback
7
- import asyncio
8
-
9
- from fastapi import FastAPI, HTTPException, Response, Query, Depends
10
- from fastapi.responses import FileResponse
11
- from fastapi.exception_handlers import http_exception_handler
12
- from starlette.exceptions import HTTPException as StarletteHTTPException
13
- from langchain.prompts import PromptTemplate
14
- from langchain_groq import ChatGroq
15
- from pydantic import BaseModel, Field, BeforeValidator, model_serializer
16
- from typing_extensions import Annotated
17
- import uvicorn
18
-
19
  from pymongo import MongoClient
20
- from pymongo.errors import ConnectionFailure, OperationFailure
21
- from bson import ObjectId
22
-
23
- # --- MongoDB Configuration ---
24
- # Load from environment variables for secure deployment on Hugging Face
25
- MONGO_URI = os.getenv("MONGO_URI", "mongodb+srv://user:pass@cluster.mongodb.net/dbname")
26
- DB_NAME = "email_assistant_db"
27
- EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
28
- GENERATED_REPLIES_COLLECTION = "generated_replies"
29
-
30
- client: Optional[MongoClient] = None
31
- db: Optional[Any] = None
32
- extracted_emails_collection: Optional[Any] = None
33
- generated_replies_collection: Optional[Any] = None
34
-
35
- # --- Pydantic ObjectId Handling ---
36
- class CustomObjectId(str):
37
- @classmethod
38
- def __get_validators__(cls):
39
- yield cls.validate
40
-
41
- @classmethod
42
- def validate(cls, v, info):
43
- if not ObjectId.is_valid(v):
44
- raise ValueError("Invalid ObjectId")
45
- return str(v)
46
-
47
- @classmethod
48
- def __get_pydantic_json_schema__(cls, core_schema, handler):
49
- json_schema = handler(core_schema)
50
- json_schema["type"] = "string"
51
- json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"
52
- return json_schema
53
-
54
- PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
55
-
56
- # ---------------------- Models ----------------------
57
- class Contact(BaseModel):
58
- name: str
59
- last_name: str
60
- email: Optional[str] = None
61
- phone_number: Optional[str] = None
62
-
63
- class Appointment(BaseModel):
64
- title: str
65
- description: str
66
- start_date: date
67
- start_time: Optional[str] = None
68
- end_date: Optional[date] = None
69
- end_time: Optional[str] = None
70
-
71
- class Task(BaseModel):
72
- task_title: str
73
- task_description: str
74
- due_date: date
75
-
76
- class ExtractedData(BaseModel):
77
- id: Optional[PyObjectId] = Field(alias="_id", default=None)
78
- contacts: List[Contact]
79
- appointments: List[Appointment]
80
- tasks: List[Task]
81
- original_email_text: str
82
- processed_at: datetime = Field(default_factory=datetime.utcnow)
83
-
84
- class Config:
85
- populate_by_name = True
86
- arbitrary_types_allowed = True
87
-
88
- @model_serializer(when_used='json')
89
- def serialize_model(self):
90
- data = self.model_dump(by_alias=True, exclude_none=True)
91
- if "_id" in data and isinstance(data["_id"], ObjectId):
92
- data["_id"] = str(data["_id"])
93
- if 'appointments' in data:
94
- for appt in data['appointments']:
95
- if isinstance(appt.get('start_date'), date):
96
- appt['start_date'] = appt['start_date'].isoformat()
97
- if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
98
- appt['end_date'] = appt['end_date'].isoformat()
99
- if 'tasks' in data:
100
- for task_item in data['tasks']:
101
- if isinstance(task_item.get('due_date'), date):
102
- task_item['due_date'] = task_item['due_date'].isoformat()
103
- return data
104
-
105
- class ProcessEmailRequest(BaseModel):
106
- email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
107
- groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY") # Should be handled securely, see notes below
108
-
109
- class GenerateReplyRequest(BaseModel):
110
- email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
111
- groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY") # Should be handled securely, see notes below
112
- language: Literal["Italian", "English"] = Field("Italian", examples=["Italian", "English"])
113
- length: str = Field("Auto", examples=["Short", "Medium", "Long", "Auto"])
114
- style: str = Field("Professional", examples=["Professional", "Casual", "Formal", "Informal"])
115
- tone: str = Field("Friendly", examples=["Friendly", "Neutral", "Urgent", "Empathetic"])
116
- emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
117
-
118
- class GeneratedReplyData(BaseModel):
119
- id: Optional[PyObjectId] = Field(alias="_id", default=None)
120
- original_email_text: str
121
- generated_reply_text: str
122
- language: str
123
- length: str
124
- style: str
125
- tone: str
126
- emoji: str
127
- generated_at: datetime = Field(default_factory=datetime.utcnow)
128
-
129
- class Config:
130
- populate_by_name = True
131
- arbitrary_types_allowed = True
132
-
133
- @model_serializer(when_used='json')
134
- def serialize_model(self):
135
- data = self.model_dump(by_alias=True, exclude_none=True)
136
- if "_id" in data and isinstance(data["_id"], ObjectId):
137
- data["_id"] = str(data["_id"])
138
- return data
139
-
140
- # --- Query Models for GET Endpoints ---
141
- class ExtractedEmailQuery(BaseModel):
142
- contact_name: Optional[str] = Query(None, description="Filter by contact name (case-insensitive partial match).")
143
- appointment_title: Optional[str] = Query(None, description="Filter by appointment title (case-insensitive partial match).")
144
- task_title: Optional[str] = Query(None, description="Filter by task title (case-insensitive partial match).")
145
- from_date: Optional[date] = Query(None, description="Filter by data processed on or after this date (YYYY-MM-DD).")
146
- to_date: Optional[date] = Query(None, description="Filter by data processed on or before this date (YYYY-MM-DD).")
147
- limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
148
-
149
- class GeneratedReplyQuery(BaseModel):
150
- language: Optional[Literal["Italian", "English"]] = Query(None, description="Filter by reply language.")
151
- style: Optional[str] = Query(None, description="Filter by reply style (e.g., Professional, Casual).")
152
- tone: Optional[str] = Query(None, description="Filter by reply tone (e.g., Friendly, Neutral).")
153
- from_date: Optional[date] = Query(None, description="Filter by data generated on or after this date (YYYY-MM-DD).")
154
- to_date: Optional[date] = Query(None, description="Filter by data generated on or before this date (YYYY-MM-DD).")
155
- limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
156
-
157
- # ---------------------- Utility Functions ----------------------
158
- def extract_last_json_block(text: str) -> Optional[str]:
159
- pattern = r'```json\s*(.*?)\s*```'
160
- matches = re.findall(pattern, text, re.DOTALL)
161
- if matches:
162
- return matches[-1].strip()
163
- match = re.search(r'\{.*\}', text, re.DOTALL)
164
- if match:
165
- return match.group(0)
166
- return None
167
-
168
- def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
169
- if not date_str: return None
170
- date_str_lower = date_str.lower().strip()
171
- if date_str_lower == "today": return current_date
172
- if date_str_lower == "tomorrow": return current_date + timedelta(days=1)
173
- try:
174
- return datetime.strptime(date_str_lower, "%Y-%m-%d").date()
175
- except ValueError:
176
- return current_date
177
-
178
- def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
179
- def split_name(full_name: str) -> tuple[str, str]:
180
- parts = full_name.strip().split()
181
- name = parts[0] if parts else ""
182
- last_name = " ".join(parts[1:]) if len(parts) > 1 else ""
183
- return name, last_name
184
-
185
- contacts_data = []
186
- for c in data.get("contacts", []):
187
- name_val, last_name_val = split_name(c.get("name", ""))
188
- contacts_data.append(Contact(name=name_val, last_name=last_name_val, email=c.get("email"), phone_number=c.get("phone_number")))
189
-
190
- appointments_data = []
191
- for a in data.get("appointments", []):
192
- start_date_val = parse_date(a.get("start_date", "today"), current_date) or current_date
193
- end_date_val = parse_date(a.get("end_date"), current_date)
194
-
195
- appointments_data.append(Appointment(
196
- title=a.get("title", "Untitled"), description=a.get("description", "No description"),
197
- start_date=start_date_val, start_time=a.get("start_time"),
198
- end_date=end_date_val, end_time=a.get("end_time")
199
- ))
200
-
201
- tasks_data = []
202
- for t in data.get("tasks", []):
203
- due_date_val = parse_date(t.get("due_date", "today"), current_date) or current_date
204
- tasks_data.append(Task(
205
- task_title=t.get("task_title", "Untitled"), task_description=t.get("task_description", "No description"),
206
- due_date=due_date_val
207
- ))
208
- return ExtractedData(contacts=contacts_data, appointments=appointments_data, tasks=tasks_data, original_email_text=original_email_text)
209
-
210
- # ---------------------- Core Logic (Internal Functions) ----------------------
211
- def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
212
- if not email_text: raise ValueError("Email text cannot be empty for processing.")
213
- llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
214
- prompt_today_str = current_date.isoformat()
215
- prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
216
- prompt_template_str = f"""
217
- You are an expert email assistant tasked with extracting structured information from an Italian email.
218
-
219
- **Your response MUST be a single, complete JSON object, wrapped in a ```json``` block.**
220
- **DO NOT include any conversational text, explanations, or preambles outside the JSON block.**
221
- **The JSON should contain three top-level keys: "contacts", "appointments", and "tasks".**
222
- If a category has no items, its list should be empty (e.g., "contacts": []).
223
-
224
- Here is the required JSON schema for each category:
225
-
226
- - **contacts**: List of Contact objects.
227
- Each Contact object must have:
228
- - `name` (string, full name)
229
- - `last_name` (string, last name) - You should infer this from the full name.
230
- - `email` (string, optional, null if not present)
231
- - `phone_number` (string, optional, null if not present)
232
-
233
- - **appointments**: List of Appointment objects.
234
- Each Appointment object must have:
235
- - `title` (string, short, meaningful title in Italian based on the meeting's purpose)
236
- - `description` (string, summary of the meeting's goal)
237
- - `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow")
238
- - `start_time` (string, optional, e.g., "10:30 AM", null if not present)
239
- - `end_date` (string, YYYY-MM-DD, optional, null if unknown or not applicable)
240
- - `end_time` (string, optional, e.g., "11:00 AM", null if not present)
241
-
242
- - **tasks**: List of Task objects.
243
- Each Task object must have:
244
- - `task_title` (string, short summary of action item)
245
- - `task_description` (string, more detailed explanation)
246
- - `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")
247
-
248
- ---
249
-
250
- Email:
251
- {{email}}
252
- """
253
- prompt_template = PromptTemplate(input_variables=["email"], template=prompt_template_str) # Removed prompt_today_str and prompt_tomorrow_str as they are in the template string
254
- chain = prompt_template | llm
255
- try:
256
- llm_output = chain.invoke({"email": email_text})
257
- llm_output_str = llm_output.content
258
- json_str = extract_last_json_block(llm_output_str)
259
- if not json_str: raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
260
- json_data = json.loads(json_str)
261
- extracted_data = normalize_llm_output(json_data, current_date, email_text)
262
- return extracted_data
263
- except json.JSONDecodeError as e:
264
- raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}")
265
- except Exception as e:
266
- traceback.print_exc()
267
- raise Exception(f"An error occurred during email processing: {e}")
268
-
269
- def _generate_response_internal(
270
- email_text: str, api_key: str, language: Literal["Italian", "English"],
271
- length: str, style: str, tone: str, emoji: str
272
- ) -> str:
273
- if not email_text: return "Cannot generate reply for empty email text."
274
- llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
275
- prompt_template_str="""
276
- You are an assistant that helps reply to emails.
277
-
278
- Create a response to the following email with the following parameters:
279
- - Language: {language}
280
- - Length: {length}
281
- - Style: {style}
282
- - Tone: {tone}
283
- - Emoji usage: {emoji}
284
-
285
- Email:
286
- {email}
287
-
288
- Write only the reply body. Do not repeat the email or mention any instruction.
289
- """
290
- prompt = PromptTemplate(
291
- input_variables=["email", "language", "length", "style", "tone", "emoji"],
292
- template=prompt_template_str
293
- )
294
- chain = prompt | llm
295
- output = chain.invoke({"email": email_text, "language": language, "length": length, "style": style, "tone": tone, "emoji": emoji})
296
- return output.content.strip()
297
-
298
- # --- Batching and Caching Configuration ---
299
- MAX_BATCH_SIZE = 20
300
- BATCH_TIMEOUT = 0.5
301
-
302
- reply_request_queue: List[Tuple[GenerateReplyRequest, asyncio.Future, float]] = []
303
- reply_queue_lock = asyncio.Lock()
304
- reply_queue_condition = asyncio.Condition(lock=reply_queue_lock)
305
- batch_processor_task: Optional[asyncio.Task] = None
306
-
307
-
308
- # --- Batch Processor and Handler ---
309
- async def handle_single_reply_request(request_data: GenerateReplyRequest, future: asyncio.Future):
310
- """Handles a single request: checks cache, calls LLM, stores result, and sets future."""
311
- if future.cancelled():
312
- return
313
- try:
314
- if generated_replies_collection is None:
315
- raise HTTPException(status_code=503, detail="Database service not available for caching/storage.")
316
-
317
- cache_query = {
318
- "original_email_text": request_data.email_text,
319
- "language": request_data.language,
320
- "length": request_data.length,
321
- "style": request_data.style,
322
- "tone": request_data.tone,
323
- "emoji": request_data.emoji,
324
- }
325
- # Use asyncio.to_thread for blocking MongoDB operations
326
- cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
327
-
328
- if cached_reply_doc:
329
- response = {
330
- "reply": cached_reply_doc["generated_reply_text"],
331
- "stored_id": str(cached_reply_doc["_id"]),
332
- "cached": True
333
- }
334
- if not future.done(): future.set_result(response)
335
- return
336
-
337
- reply_content = await asyncio.to_thread(
338
- _generate_response_internal,
339
- request_data.email_text,
340
- request_data.groq_api_key, # Groq API key is passed here
341
- request_data.language,
342
- request_data.length,
343
- request_data.style,
344
- request_data.tone,
345
- request_data.emoji
346
- )
347
-
348
- reply_data_to_store = GeneratedReplyData(
349
- original_email_text=request_data.email_text,
350
- generated_reply_text=reply_content,
351
- language=request_data.language,
352
- length=request_data.length,
353
- style=request_data.style,
354
- tone=request_data.tone,
355
- emoji=request_data.emoji
356
- )
357
- reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
358
-
359
- insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
360
- stored_id = str(insert_result.inserted_id)
361
-
362
- final_response = {
363
- "reply": reply_content,
364
- "stored_id": stored_id,
365
- "cached": False
366
- }
367
- if not future.done(): future.set_result(final_response)
368
-
369
- except Exception as e:
370
- traceback.print_exc()
371
- if not future.done():
372
- future.set_exception(e)
373
-
374
- async def process_reply_batches():
375
- """Continuously processes requests from the reply_request_queue in batches."""
376
- global reply_request_queue
377
- while True:
378
- batch_to_fire: List[Tuple[GenerateReplyRequest, asyncio.Future]] = []
379
- async with reply_queue_condition:
380
- if not reply_request_queue:
381
- await reply_queue_condition.wait()
382
- if not reply_request_queue:
383
- continue
384
-
385
- now = asyncio.get_event_loop().time()
386
- oldest_item_timestamp = reply_request_queue[0][2]
387
-
388
- if len(reply_request_queue) >= MAX_BATCH_SIZE or \
389
- (now - oldest_item_timestamp >= BATCH_TIMEOUT):
390
- num_to_take = min(len(reply_request_queue), MAX_BATCH_SIZE)
391
- for _ in range(num_to_take):
392
- req, fut, _ = reply_request_queue.pop(0)
393
- batch_to_fire.append((req, fut))
394
- else:
395
- time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
396
- try:
397
- await asyncio.wait_for(reply_queue_condition.wait(), timeout=time_to_wait)
398
- except asyncio.TimeoutError:
399
- pass
400
-
401
- if batch_to_fire:
402
- tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
403
- await asyncio.gather(*tasks)
404
- else:
405
- await asyncio.sleep(0.001)
406
-
407
-
408
- # ---------------------- FastAPI Application ----------------------
409
  app = FastAPI(
410
- title="Email Assistant API",
411
- description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
412
- version="1.1.0",
413
- docs_url="/",
414
- redoc_url="/redoc"
415
  )
416
 
417
- # --- Global Exception Handler ---
418
- @app.exception_handler(StarletteHTTPException)
419
- async def custom_http_exception_handler_wrapper(request, exc):
420
- return await http_exception_handler(request, exc)
421
-
422
- @app.exception_handler(Exception)
423
- async def global_exception_handler_wrapper(request, exc):
424
- print(f"Unhandled exception caught by global handler for request: {request.url}")
425
- traceback.print_exc()
426
- return Response(content=json.dumps({"detail": f"Internal Server Error: {str(exc)}"}), status_code=500, media_type="application/json")
427
-
428
-
429
- # --- FastAPI Event Handlers for MongoDB & Batch Processor ---
430
  @app.on_event("startup")
431
  async def startup_event():
432
- global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
433
- try:
434
- # Check if MONGO_URI is set before attempting connection
435
- if not MONGO_URI or MONGO_URI == "mongodb+srv://user:pass@cluster.mongodb.net/dbname":
436
- print("WARNING: MONGO_URI environment variable not set or using default. MongoDB connection will fail.")
437
- # Optionally raise an exception here or set a flag to disable DB functionality
438
- # For now, we'll let the connection attempt proceed and catch its failure.
439
 
440
- client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
441
- client.admin.command('ping')
442
- db = client[DB_NAME]
443
- extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
444
- generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
445
- print(f"Successfully connected to MongoDB: {DB_NAME}")
446
 
447
- if batch_processor_task is None:
448
- loop = asyncio.get_event_loop()
449
- batch_processor_task = loop.create_task(process_reply_batches())
450
- print("Batch processor task for replies started.")
 
 
 
 
451
 
452
- except (ConnectionFailure, OperationFailure) as e:
453
- print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
454
- client = None
455
- db = None
456
- extracted_emails_collection = None
457
- generated_replies_collection = None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
458
  except Exception as e:
459
- print(f"ERROR: An unexpected error occurred during MongoDB connection or batch startup: {e}")
460
- traceback.print_exc()
461
- client = None
462
- db = None
463
- extracted_emails_collection = None
464
- generated_replies_collection = None
465
- finally:
466
- if client is not None and db is not None:
467
- try:
468
- client.admin.command('ping')
469
- except Exception:
470
- print("MongoDB ping failed after initial connection attempt during finally block.")
471
- client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
472
- else:
473
- print("MongoDB client or db object is None after connection attempt in startup.")
474
- if client is None or db is None:
475
- client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
476
- print("FastAPI app starting up. MongoDB client & Batch Processor initialization attempted.")
477
 
478
 
 
479
  @app.on_event("shutdown")
480
  async def shutdown_event():
481
- global client, batch_processor_task
482
- if batch_processor_task:
483
- batch_processor_task.cancel()
484
- try:
485
- await batch_processor_task
486
- except asyncio.CancelledError:
487
- print("Batch processor task for replies cancelled.")
488
- except Exception as e:
489
- print(f"Error during batch processor task shutdown: {e}")
490
- traceback.print_exc()
491
- batch_processor_task = None
492
 
493
- if client:
494
- client.close()
495
- print("FastAPI app shutting down. MongoDB client closed.")
496
 
497
-
498
- @app.get("/health", summary="Health Check")
499
  async def health_check():
500
- db_status = "MongoDB not connected. Check server startup logs."
501
- db_ok = False
502
- if client is not None and db is not None:
503
- try:
504
- db.list_collection_names()
505
- db_status = "MongoDB connection OK."
506
- db_ok = True
507
- except Exception as e:
508
- db_status = f"MongoDB connection error: {e}"
509
 
510
- batch_processor_status = "Batch processor not running or state unknown."
511
- if batch_processor_task is not None :
512
- if not batch_processor_task.done():
513
- batch_processor_status = "Batch processor is running."
514
- else:
515
- batch_processor_status = "Batch processor task is done (may have completed or errored)."
516
-
517
- if db_ok:
518
- return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
519
  else:
520
- raise HTTPException(
521
- status_code=503,
522
- detail={"message": "Service unavailable.", "database": db_status, "batch_processor": batch_processor_status}
523
- )
524
-
525
-
526
- @app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
527
- async def extract_email_data(request: ProcessEmailRequest):
528
- if extracted_emails_collection is None:
529
- raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails. Check server logs for connection errors.")
530
- try:
531
- current_date_val = date.today()
532
- extracted_data = await asyncio.to_thread(
533
- _process_email_internal, request.email_text, request.groq_api_key, current_date_val
534
  )
535
- extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
536
- # Convert date objects to datetime for MongoDB storage
537
- for appt in extracted_data_dict.get('appointments', []):
538
- if isinstance(appt.get('start_date'), date): appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
539
- if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None: appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
540
- for task_item in extracted_data_dict.get('tasks', []):
541
- if isinstance(task_item.get('due_date'), date): task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
542
-
543
- result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
544
- extracted_data.id = str(result.inserted_id) if isinstance(result.inserted_id, ObjectId) else result.inserted_id
545
- return extracted_data
546
- except ValueError as e:
547
- raise HTTPException(status_code=400, detail=str(e))
548
- except Exception as e:
549
- traceback.print_exc()
550
- raise HTTPException(status_code=500, detail=f"Internal server error during data extraction: {e}")
551
-
552
-
553
- @app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
554
- async def extract_email_data_excel(request: ProcessEmailRequest):
555
- raise HTTPException(status_code=501, detail="Excel functionality is currently disabled.")
556
-
557
-
558
- @app.post("/generate-reply", summary="Generate a smart reply to an email (batched & cached)")
559
- async def generate_email_reply(request: GenerateReplyRequest):
560
- if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
561
- raise HTTPException(status_code=503, detail="Reply generation service not fully initialized. Check server logs.")
562
 
563
- future = asyncio.Future()
564
- current_time = asyncio.get_event_loop().time()
565
-
566
- async with reply_queue_condition:
567
- reply_request_queue.append((request, future, current_time))
568
- reply_queue_condition.notify()
569
-
570
- try:
571
- client_timeout = BATCH_TIMEOUT + 10.0
572
- result = await asyncio.wait_for(future, timeout=client_timeout)
573
- return result
574
- except asyncio.TimeoutError:
575
- if not future.done():
576
- future.cancel()
577
- raise HTTPException(status_code=504, detail=f"Request timed out after {client_timeout}s waiting for batch processing.")
578
- except Exception as e:
579
- if isinstance(e, HTTPException):
580
- raise e
581
- traceback.print_exc()
582
- raise HTTPException(status_code=500, detail=f"Error processing your reply request: {str(e)}")
583
-
584
-
585
- @app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
586
- async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
587
- if extracted_emails_collection is None:
588
- raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails. Check server logs for connection errors.")
589
- mongo_query: Dict[str, Any] = {}
590
- if query_params.contact_name: mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"}
591
- if query_params.appointment_title: mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
592
- if query_params.task_title: mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
593
-
594
- if query_params.from_date or query_params.to_date:
595
- date_query: Dict[str, datetime] = {}
596
- if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
597
- if query_params.to_date: date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
598
- if date_query : mongo_query["processed_at"] = date_query
599
 
600
  try:
601
- cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
602
- extracted_docs_raw = await asyncio.to_thread(list, cursor)
603
-
604
- results = []
605
- for doc_raw in extracted_docs_raw:
606
- if isinstance(doc_raw.get("_id"), ObjectId):
607
- doc_raw["_id"] = str(doc_raw["_id"])
608
-
609
- # Convert datetime objects back to date objects for Pydantic model fields that are `date`
610
- if 'appointments' in doc_raw:
611
- for appt in doc_raw['appointments']:
612
- if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
613
- if isinstance(appt.get('end_date'), datetime): appt['end_date'] = appt['end_date'].date()
614
- if 'tasks' in doc_raw:
615
- for task_item in doc_raw['tasks']:
616
- if isinstance(task_item.get('due_date'), datetime): task_item['due_date'] = task_item['due_date'].date()
617
- results.append(ExtractedData(**doc_raw))
618
- return results
619
  except Exception as e:
620
- traceback.print_exc()
621
- raise HTTPException(status_code=500, detail=f"Error querying extracted emails: {e}")
622
-
623
 
624
- @app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
625
- async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
626
- if generated_replies_collection is None:
627
- raise HTTPException(status_code=503, detail="MongoDB not available for generated_replies. Check server logs for connection errors.")
628
- mongo_query: Dict[str, Any] = {}
629
- if query_params.language: mongo_query["language"] = query_params.language
630
- if query_params.style: mongo_query["style"] = query_params.style
631
- if query_params.tone: mongo_query["tone"] = query_params.tone
632
-
633
- if query_params.from_date or query_params.to_date:
634
- date_query: Dict[str, datetime] = {}
635
- if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
636
- if query_params.to_date: date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
637
- if date_query: mongo_query["generated_at"] = date_query
638
-
639
- try:
640
- cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
641
- generated_docs_raw = await asyncio.to_thread(list, cursor)
642
- results = []
643
- for doc_raw in generated_docs_raw:
644
- if isinstance(doc_raw.get("_id"), ObjectId):
645
- doc_raw["_id"] = str(doc_raw["_id"])
646
- results.append(GeneratedReplyData(**doc_raw))
647
- return results
648
- except Exception as e:
649
- traceback.print_exc()
650
- raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")
651
 
652
- # --- Main execution for Hugging Face Spaces ---
653
- # Hugging Face Spaces expects the `app` object to be directly available.
654
- # The `if __name__ == "__main__":` block is usually not needed for deployment.
655
- # However, for local testing, you can keep it or use `uvicorn app:app --host 0.0.0.0 --port 8000`
656
- # if __name__ == "__main__":
657
- # uvicorn.run(app, host="0.0.0.0", port=8000)
 
 
 
 
 
1
  import os
2
+ import logging
3
+ from fastapi import FastAPI, Response, status
 
 
 
 
 
 
 
 
 
 
 
4
  from pymongo import MongoClient
5
+ from pymongo.errors import ConfigurationError, ConnectionFailure
6
+ import time # For simulating a batch processor initialization delay
7
+
8
+ # --- 1. Configuration and Global Variables ---
9
+ # Use a proper logger for better debugging and visibility
10
+ logger = logging.getLogger(__name__)
11
+ logger.setLevel(logging.INFO) # Set desired logging level (INFO, DEBUG, WARNING, ERROR, CRITICAL)
12
+ handler = logging.StreamHandler()
13
+ formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
14
+ handler.setFormatter(formatter)
15
+ logger.addHandler(handler)
16
+
17
+ # Retrieve MONGO_URI from environment variable. If not set, it will be None.
18
+ MONGO_URI = os.getenv("MONGO_URI")
19
+ DB_NAME = "email_assistant_db" # Replace with your actual database name
20
+
21
+ # Global variables to hold MongoDB client and database instance
22
+ # These will be initialized in the startup event
23
+ mongo_client = None
24
+ mongo_db = None
25
+
26
+ # Placeholder for your Batch Processor (if applicable)
27
+ batch_processor_initialized = False
28
+
29
+ # --- 2. FastAPI Application Instance ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30
  app = FastAPI(
31
+ title="Character API",
32
+ description="API for managing character data and processing emails.",
33
+ version="0.0.1",
 
 
34
  )
35
 
36
+ # --- 3. Application Startup Event Handler ---
 
 
 
 
 
 
 
 
 
 
 
 
37
  @app.on_event("startup")
38
  async def startup_event():
39
+ global mongo_client, mongo_db, batch_processor_initialized
 
 
 
 
 
 
40
 
41
+ logger.info("Application startup initiated.")
 
 
 
 
 
42
 
43
+ # Check for MONGO_URI BEFORE attempting connection
44
+ if not MONGO_URI:
45
+ logger.critical("CRITICAL ERROR: MONGO_URI environment variable is NOT SET.")
46
+ logger.critical("Please set the MONGO_URI environment variable with your MongoDB Atlas connection string.")
47
+ logger.critical("Example: mongodb+srv://<user>:<pass>@cluster.mongodb.net/dbname?retryWrites=true&w=majority")
48
+ # Depending on your application's criticality, you might want to exit or raise an exception here
49
+ # raise ValueError("MONGO_URI environment variable is not set. Cannot connect to MongoDB.")
50
+ return # Prevent further execution if critical dependency is missing
51
 
52
+ try:
53
+ logger.info("Attempting to connect to MongoDB...")
54
+ # Initialize MongoClient
55
+ client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) # 5-second timeout for server selection
56
+
57
+ # The ismaster command is a lightweight way to check connectivity and auth.
58
+ # It triggers the actual connection attempt and DNS resolution.
59
+ client.admin.command('ismaster')
60
+
61
+ mongo_client = client
62
+ mongo_db = client[DB_NAME]
63
+ logger.info(f"SUCCESS: Successfully connected to MongoDB database: '{DB_NAME}'")
64
+
65
+ # --- Initialize Batch Processor (if applicable) ---
66
+ logger.info("Initializing Batch Processor...")
67
+ # Simulate some work
68
+ time.sleep(1) # Simulate delay for batch processor setup
69
+ batch_processor_initialized = True
70
+ logger.info("Batch Processor initialized successfully.")
71
+
72
+ except ConfigurationError as e:
73
+ logger.critical(f"FATAL ERROR: MongoDB Configuration Error: {e}")
74
+ logger.critical("Please check your MONGO_URI format and ensure the hostname is correct and resolvable.")
75
+ mongo_client = None
76
+ mongo_db = None
77
+ except ConnectionFailure as e:
78
+ logger.critical(f"FATAL ERROR: MongoDB Connection Failure: {e}")
79
+ logger.critical("Check network connectivity, MongoDB Atlas IP access list, and firewall rules.")
80
+ logger.critical("Also verify that your MongoDB user credentials are correct.")
81
+ mongo_client = None
82
+ mongo_db = None
83
  except Exception as e:
84
+ logger.critical(f"FATAL ERROR: An unexpected error occurred during MongoDB connection or batch startup: {e}", exc_info=True)
85
+ mongo_client = None
86
+ mongo_db = None
87
+
88
+ if mongo_client is None or mongo_db is None:
89
+ logger.critical("CRITICAL: MongoDB client or db object is None after connection attempt in startup. Application might not function correctly.")
90
+ else:
91
+ logger.info("Application startup complete. MongoDB client & Batch Processor initialization attempted.")
 
 
 
 
 
 
 
 
 
 
92
 
93
 
94
+ # --- 4. Application Shutdown Event Handler (Good Practice) ---
95
  @app.on_event("shutdown")
96
  async def shutdown_event():
97
+ global mongo_client
98
+ if mongo_client:
99
+ mongo_client.close()
100
+ logger.info("MongoDB connection closed.")
101
+ logger.info("Application shutdown complete.")
 
 
 
 
 
 
102
 
103
+ # --- 5. API Endpoints ---
 
 
104
 
105
+ # Health check endpoint
106
+ @app.get("/health")
107
  async def health_check():
108
+ db_status = "Connected" if mongo_db else "Disconnected"
109
+ batch_status = "Initialized" if batch_processor_initialized else "Not Initialized"
 
 
 
 
 
 
 
110
 
111
+ if mongo_db and batch_processor_initialized:
112
+ return {"status": "ok", "mongodb": db_status, "batch_processor": batch_status}
 
 
 
 
 
 
 
113
  else:
114
+ # Return 503 Service Unavailable if critical dependencies are not met
115
+ return Response(
116
+ content={"status": "degraded", "mongodb": db_status, "batch_processor": batch_status},
117
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
118
+ media_type="application/json"
 
 
 
 
 
 
 
 
 
119
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
 
121
+ # Example endpoint - requires MongoDB connection
122
+ @app.get("/characters/{character_id}")
123
+ async def get_character(character_id: str):
124
+ if not mongo_db:
125
+ logger.error(f"Attempted to access /characters/{character_id} but MongoDB is not connected.")
126
+ return Response(content="MongoDB is not connected.", status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
127
 
128
  try:
129
+ # Example: Fetch a character from a 'characters' collection
130
+ # Replace with your actual collection and query
131
+ character_data = await mongo_db.get_collection("characters").find_one({"_id": character_id})
132
+ # Note: PyMongo's find_one is synchronous. If using async, you'd need motor or an async wrapper.
133
+ # For a truly async FastAPI app with MongoDB, consider 'motor' (async pymongo driver).
134
+ # For now, we'll assume this is fine in an 'await' context for simplicity,
135
+ # but in a real-world async app, you'd want to run blocking operations in a thread pool.
136
+ # e.g., result = await loop.run_in_executor(None, mongo_db.collection.find_one, {"_id": character_id})
137
+
138
+ if character_data:
139
+ # MongoDB's _id might be an ObjectId, convert to string for JSON serialization
140
+ character_data["_id"] = str(character_data["_id"])
141
+ return character_data
142
+ else:
143
+ return Response(content="Character not found.", status_code=status.HTTP_404_NOT_FOUND)
 
 
 
144
  except Exception as e:
145
+ logger.error(f"Error fetching character {character_id}: {e}", exc_info=True)
146
+ return Response(content="Internal server error.", status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
 
147
 
148
+ # You would add more endpoints here for your application logic
149
+ # e.g., @app.post("/email-process") etc.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
150