# flask_Character.py
import json
import re
from datetime import date, datetime, timedelta
from typing import List, Optional, Literal, Dict, Any, Tuple
import os
import traceback
import asyncio
from fastapi import FastAPI, HTTPException, Response, Query, Depends
from fastapi.responses import FileResponse
from fastapi.exception_handlers import http_exception_handler
from starlette.exceptions import HTTPException as StarletteHTTPException
from langchain.prompts import PromptTemplate
from langchain_groq import ChatGroq
from pydantic import BaseModel, Field, BeforeValidator, model_serializer
from typing_extensions import Annotated
import uvicorn
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure
from bson import ObjectId
# --- MongoDB Configuration ---
MONGO_URI = os.environ.get("MONGO_URI", "")  # Set your MongoDB connection string via the MONGO_URI environment variable; avoid hardcoding credentials
DB_NAME = "email_assistant_db"
EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
GENERATED_REPLIES_COLLECTION = "generated_replies"
client: Optional[MongoClient] = None
db: Optional[Any] = None # Optional[Any] because the pymongo Database type is not imported for annotation here
extracted_emails_collection: Optional[Any] = None
generated_replies_collection: Optional[Any] = None
# --- Pydantic ObjectId Handling ---
class CustomObjectId(str):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v, info):
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return str(v)
@classmethod
def __get_pydantic_json_schema__(cls, core_schema, handler):
json_schema = handler(core_schema)
json_schema["type"] = "string"
json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"
return json_schema
PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
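# PyObjectId coerces incoming values (including bson.ObjectId instances) to strings via BeforeValidator
# before CustomObjectId validation, so ObjectId values read back from MongoDB can be assigned to the
# id fields on the models below.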
# ---------------------- Models ----------------------
class Contact(BaseModel):
name: str
last_name: str
email: Optional[str] = None
phone_number: Optional[str] = None
class Appointment(BaseModel):
title: str
description: str
start_date: date
start_time: Optional[str] = None
end_date: Optional[date] = None
end_time: Optional[str] = None
class Task(BaseModel):
task_title: str
task_description: str
due_date: date
class ExtractedData(BaseModel):
id: Optional[PyObjectId] = Field(alias="_id", default=None)
contacts: List[Contact]
appointments: List[Appointment]
tasks: List[Task]
original_email_text: str
processed_at: datetime = Field(default_factory=datetime.utcnow)
class Config:
populate_by_name = True
arbitrary_types_allowed = True
@model_serializer(when_used='json')
def serialize_model(self):
data = self.model_dump(by_alias=True, exclude_none=True)
if "_id" in data and isinstance(data["_id"], ObjectId):
data["_id"] = str(data["_id"])
if 'appointments' in data:
for appt in data['appointments']:
if isinstance(appt.get('start_date'), date):
appt['start_date'] = appt['start_date'].isoformat()
if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
appt['end_date'] = appt['end_date'].isoformat()
if 'tasks' in data:
for task_item in data['tasks']:
if isinstance(task_item.get('due_date'), date):
task_item['due_date'] = task_item['due_date'].isoformat()
return data
class ProcessEmailRequest(BaseModel):
    email_text: str = Field(..., examples=["Oggetto: Follow-up progetto “Delta”..."])
    groq_api_key: str = Field(..., examples=["YOUR_GROQ_API_KEY"])
class GenerateReplyRequest(BaseModel):
    email_text: str = Field(..., examples=["Oggetto: Follow-up progetto “Delta”..."])
    groq_api_key: str = Field(..., examples=["YOUR_GROQ_API_KEY"])
language: Literal["Italian", "English"] = Field("Italian", examples=["Italian", "English"])
length: str = Field("Auto", examples=["Short", "Medium", "Long", "Auto"])
style: str = Field("Professional", examples=["Professional", "Casual", "Formal", "Informal"])
tone: str = Field("Friendly", examples=["Friendly", "Neutral", "Urgent", "Empathetic"])
emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
class GeneratedReplyData(BaseModel):
id: Optional[PyObjectId] = Field(alias="_id", default=None)
original_email_text: str
generated_reply_text: str
language: str
length: str
style: str
tone: str
emoji: str
generated_at: datetime = Field(default_factory=datetime.utcnow)
class Config:
populate_by_name = True
arbitrary_types_allowed = True
@model_serializer(when_used='json')
def serialize_model(self):
data = self.model_dump(by_alias=True, exclude_none=True)
if "_id" in data and isinstance(data["_id"], ObjectId):
data["_id"] = str(data["_id"])
return data
# --- Query Models for GET Endpoints ---
class ExtractedEmailQuery(BaseModel):
contact_name: Optional[str] = Query(None, description="Filter by contact name (case-insensitive partial match).")
appointment_title: Optional[str] = Query(None, description="Filter by appointment title (case-insensitive partial match).")
task_title: Optional[str] = Query(None, description="Filter by task title (case-insensitive partial match).")
from_date: Optional[date] = Query(None, description="Filter by data processed on or after this date (YYYY-MM-DD).")
to_date: Optional[date] = Query(None, description="Filter by data processed on or before this date (YYYY-MM-DD).")
limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
class GeneratedReplyQuery(BaseModel):
language: Optional[Literal["Italian", "English"]] = Query(None, description="Filter by reply language.")
style: Optional[str] = Query(None, description="Filter by reply style (e.g., Professional, Casual).")
tone: Optional[str] = Query(None, description="Filter by reply tone (e.g., Friendly, Neutral).")
from_date: Optional[date] = Query(None, description="Filter by data generated on or after this date (YYYY-MM-DD).")
to_date: Optional[date] = Query(None, description="Filter by data generated on or before this date (YYYY-MM-DD).")
limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
# ---------------------- Utility Functions ----------------------
def extract_last_json_block(text: str) -> Optional[str]:
pattern = r'```json\s*(.*?)\s*```'
matches = re.findall(pattern, text, re.DOTALL)
if matches:
return matches[-1].strip()
match = re.search(r'\{.*\}', text, re.DOTALL)
if match:
return match.group(0)
return None
def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]: # Allow None to pass through
if not date_str: return None # If input is None (e.g. optional end_date), return None
date_str_lower = date_str.lower().strip()
if date_str_lower == "today": return current_date
if date_str_lower == "tomorrow": return current_date + timedelta(days=1)
try:
return datetime.strptime(date_str_lower, "%Y-%m-%d").date()
except ValueError:
# For "appointments" "start_date", you assumed "today" if parsing failed or not present.
# For "end_date" it was optional. We need to be consistent.
# Given the original normalize_llm_output, start_date defaulted to today, end_date was optional.
# This parse_date is more general. The default handling should be in normalize_llm_output.
return current_date # Fallback, or raise error, or return None depending on strictness
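# Example: parse_date("tomorrow", date(2025, 1, 1)) -> date(2025, 1, 2); parse_date(None, ...) -> None;
# unparseable strings fall back to current_date as noted above.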
def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
def split_name(full_name: str) -> tuple[str, str]:
parts = full_name.strip().split()
name = parts[0] if parts else ""
last_name = " ".join(parts[1:]) if len(parts) > 1 else ""
return name, last_name
contacts_data = []
for c in data.get("contacts", []):
name_val, last_name_val = split_name(c.get("name", ""))
contacts_data.append(Contact(name=name_val, last_name=last_name_val, email=c.get("email"), phone_number=c.get("phone_number")))
appointments_data = []
for a in data.get("appointments", []):
start_date_val = parse_date(a.get("start_date", "today"), current_date) or current_date # Default to current_date if parse_date returns None
end_date_val = parse_date(a.get("end_date"), current_date) # parse_date can return None if end_date is not provided or invalid
appointments_data.append(Appointment(
title=a.get("title", "Untitled"), description=a.get("description", "No description"),
start_date=start_date_val, start_time=a.get("start_time"),
end_date=end_date_val, end_time=a.get("end_time")
))
tasks_data = []
for t in data.get("tasks", []):
due_date_val = parse_date(t.get("due_date", "today"), current_date) or current_date # Default to current_date
tasks_data.append(Task(
task_title=t.get("task_title", "Untitled"), task_description=t.get("task_description", "No description"),
due_date=due_date_val
))
return ExtractedData(contacts=contacts_data, appointments=appointments_data, tasks=tasks_data, original_email_text=original_email_text)
# ---------------------- Core Logic (Internal Functions) ----------------------
def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
if not email_text: raise ValueError("Email text cannot be empty for processing.")
llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)
prompt_today_str = current_date.isoformat()
prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()
    # Extraction prompt: instructs the LLM to return a single JSON block with contacts, appointments, and tasks
prompt_template_str = f"""
You are an expert email assistant tasked with extracting structured information from an Italian email.
**Your response MUST be a single, complete JSON object, wrapped in a ```json``` block.**
**DO NOT include any conversational text, explanations, or preambles outside the JSON block.**
**The JSON should contain three top-level keys: "contacts", "appointments", and "tasks".**
If a category has no items, its list should be empty (e.g., "contacts": []).
Here is the required JSON schema for each category:
- **contacts**: List of Contact objects.
Each Contact object must have:
- `name` (string, full name)
- `last_name` (string, last name) - You should infer this from the full name.
- `email` (string, optional, null if not present)
- `phone_number` (string, optional, null if not present)
- **appointments**: List of Appointment objects.
Each Appointment object must have:
- `title` (string, short, meaningful title in Italian based on the meeting's purpose)
- `description` (string, summary of the meeting's goal)
- `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow")
- `start_time` (string, optional, e.g., "10:30 AM", null if not present)
- `end_date` (string, YYYY-MM-DD, optional, null if unknown or not applicable)
- `end_time` (string, optional, e.g., "11:00 AM", null if not present)
- **tasks**: List of Task objects.
Each Task object must have:
- `task_title` (string, short summary of action item)
- `task_description` (string, more detailed explanation)
- `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")
---
Email:
{{email}}
"""
    # The dates are already interpolated into the f-string above, so "email" is the only template variable.
    prompt_template = PromptTemplate(input_variables=["email"], template=prompt_template_str)
chain = prompt_template | llm
try:
# print(f"DEBUG: Invoking LLM with email_text length: {len(email_text)} and current_date: {current_date}")
        llm_output = chain.invoke({"email": email_text})
llm_output_str = llm_output.content
# print(f"DEBUG: Raw LLM output:\n{llm_output_str[:500]}...")
json_str = extract_last_json_block(llm_output_str)
# print(f"DEBUG: Extracted JSON string:\n{json_str}")
if not json_str: raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
json_data = json.loads(json_str)
# print(f"DEBUG: Parsed JSON data: {json.dumps(json_data, indent=2)}")
extracted_data = normalize_llm_output(json_data, current_date, email_text)
# print("DEBUG: Data normalized successfully.")
return extracted_data
except json.JSONDecodeError as e:
# print(f"ERROR: JSON Decode Error: {e}")
# print(f"ERROR: LLM response that caused error:\n{llm_output_str}")
raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}")
except Exception as e:
traceback.print_exc()
raise Exception(f"An error occurred during email processing: {e}")
def _generate_response_internal(
email_text: str, api_key: str, language: Literal["Italian", "English"],
length: str, style: str, tone: str, emoji: str
) -> str:
if not email_text: return "Cannot generate reply for empty email text."
llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
    # Reply prompt: instructs the LLM to write only the reply body using the requested language, length, style, tone, and emoji usage
prompt_template_str="""
You are an assistant that helps reply to emails.
Create a response to the following email with the following parameters:
- Language: {language}
- Length: {length}
- Style: {style}
- Tone: {tone}
- Emoji usage: {emoji}
Email:
{email}
Write only the reply body. Do not repeat the email or mention any instruction.
"""
prompt = PromptTemplate(
input_variables=["email", "language", "length", "style", "tone", "emoji"],
template=prompt_template_str
)
chain = prompt | llm
output = chain.invoke({"email": email_text, "language": language, "length": length, "style": style, "tone": tone, "emoji": emoji})
return output.content.strip()
# --- Batching and Caching Configuration ---
MAX_BATCH_SIZE = 20
BATCH_TIMEOUT = 0.5 # seconds
reply_request_queue: List[Tuple[GenerateReplyRequest, asyncio.Future, float]] = []
reply_queue_lock = asyncio.Lock()
reply_queue_condition = asyncio.Condition(lock=reply_queue_lock)
batch_processor_task: Optional[asyncio.Task] = None
# --- Batch Processor and Handler ---
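# Flow: /generate-reply enqueues (request, future, timestamp) tuples onto reply_request_queue and awaits the
# future. process_reply_batches drains the queue once MAX_BATCH_SIZE items are pending or the oldest item has
# waited BATCH_TIMEOUT seconds; handle_single_reply_request then resolves each future from the MongoDB cache
# or a fresh LLM call.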
async def handle_single_reply_request(request_data: GenerateReplyRequest, future: asyncio.Future):
"""Handles a single request: checks cache, calls LLM, stores result, and sets future."""
if future.cancelled():
return
try:
if generated_replies_collection is None:
raise HTTPException(status_code=503, detail="Database service not available for caching/storage.")
cache_query = {
"original_email_text": request_data.email_text,
"language": request_data.language,
"length": request_data.length,
"style": request_data.style,
"tone": request_data.tone,
"emoji": request_data.emoji,
}
cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)
if cached_reply_doc:
response = {
"reply": cached_reply_doc["generated_reply_text"],
"stored_id": str(cached_reply_doc["_id"]),
"cached": True
}
if not future.done(): future.set_result(response)
return
reply_content = await asyncio.to_thread(
_generate_response_internal,
request_data.email_text,
request_data.groq_api_key,
request_data.language,
request_data.length,
request_data.style,
request_data.tone,
request_data.emoji
)
reply_data_to_store = GeneratedReplyData(
original_email_text=request_data.email_text,
generated_reply_text=reply_content,
language=request_data.language,
length=request_data.length,
style=request_data.style,
tone=request_data.tone,
emoji=request_data.emoji
)
reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})
insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
stored_id = str(insert_result.inserted_id)
final_response = {
"reply": reply_content,
"stored_id": stored_id,
"cached": False
}
if not future.done(): future.set_result(final_response)
except Exception as e:
traceback.print_exc()
if not future.done():
future.set_exception(e)
async def process_reply_batches():
"""Continuously processes requests from the reply_request_queue in batches."""
global reply_request_queue
while True:
batch_to_fire: List[Tuple[GenerateReplyRequest, asyncio.Future]] = []
async with reply_queue_condition:
if not reply_request_queue:
await reply_queue_condition.wait()
if not reply_request_queue:
continue
now = asyncio.get_event_loop().time()
oldest_item_timestamp = reply_request_queue[0][2]
if len(reply_request_queue) >= MAX_BATCH_SIZE or \
(now - oldest_item_timestamp >= BATCH_TIMEOUT):
num_to_take = min(len(reply_request_queue), MAX_BATCH_SIZE)
for _ in range(num_to_take):
req, fut, _ = reply_request_queue.pop(0)
batch_to_fire.append((req, fut))
else:
time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
try:
await asyncio.wait_for(reply_queue_condition.wait(), timeout=time_to_wait)
except asyncio.TimeoutError:
pass # Loop will re-evaluate
if batch_to_fire:
tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
await asyncio.gather(*tasks)
else:
await asyncio.sleep(0.001)
# ---------------------- FastAPI Application ----------------------
app = FastAPI(
title="Email Assistant API",
description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
version="1.1.0", # Incremented version
docs_url="/",
redoc_url="/redoc"
)
# --- Global Exception Handler ---
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler_wrapper(request, exc):
return await http_exception_handler(request, exc)
@app.exception_handler(Exception)
async def global_exception_handler_wrapper(request, exc):
print(f"Unhandled exception caught by global handler for request: {request.url}")
traceback.print_exc()
# Ensure it returns a valid FastAPI response
return Response(content=json.dumps({"detail": f"Internal Server Error: {str(exc)}"}), status_code=500, media_type="application/json")
# --- FastAPI Event Handlers for MongoDB & Batch Processor ---
@app.on_event("startup")
async def startup_event():
global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
try:
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
client.admin.command('ping')
db = client[DB_NAME]
extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
print(f"Successfully connected to MongoDB: {DB_NAME}")
if batch_processor_task is None:
loop = asyncio.get_event_loop()
batch_processor_task = loop.create_task(process_reply_batches())
print("Batch processor task for replies started.")
except (ConnectionFailure, OperationFailure) as e:
print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
client = None
db = None
extracted_emails_collection = None
generated_replies_collection = None
except Exception as e:
print(f"ERROR: An unexpected error occurred during MongoDB connection or batch startup: {e}")
traceback.print_exc()
client = None
db = None
extracted_emails_collection = None
generated_replies_collection = None
finally:
        # Verify the connection is still usable; if the ping fails, reset all MongoDB handles to None.
if client is not None and db is not None:
try:
client.admin.command('ping')
except Exception:
print("MongoDB ping failed after initial connection attempt during finally block.")
client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
else:
print("MongoDB client or db object is None after connection attempt in startup.")
if client is None or db is None: # Ensure all are None if one is
client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
print("FastAPI app starting up. MongoDB client & Batch Processor initialization attempted.")
@app.on_event("shutdown")
async def shutdown_event():
global client, batch_processor_task
if batch_processor_task:
batch_processor_task.cancel()
try:
await batch_processor_task
except asyncio.CancelledError:
print("Batch processor task for replies cancelled.")
except Exception as e:
print(f"Error during batch processor task shutdown: {e}")
traceback.print_exc()
batch_processor_task = None
if client:
client.close()
print("FastAPI app shutting down. MongoDB client closed.")
@app.get("/health", summary="Health Check")
async def health_check():
db_status = "MongoDB not connected. Check server startup logs."
db_ok = False
    if client is not None and db is not None:
try:
db.list_collection_names()
db_status = "MongoDB connection OK."
db_ok = True
except Exception as e:
db_status = f"MongoDB connection error: {e}"
batch_processor_status = "Batch processor not running or state unknown."
if batch_processor_task is not None :
if not batch_processor_task.done():
batch_processor_status = "Batch processor is running."
else:
batch_processor_status = "Batch processor task is done (may have completed or errored)."
if db_ok:
return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
else:
# Return a JSON response for HTTPException as well for consistency
raise HTTPException(
status_code=503,
detail={"message": "Service unavailable.", "database": db_status, "batch_processor": batch_processor_status}
)
@app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
async def extract_email_data(request: ProcessEmailRequest):
if extracted_emails_collection is None:
raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails.")
try:
current_date_val = date.today()
extracted_data = await asyncio.to_thread(
_process_email_internal, request.email_text, request.groq_api_key, current_date_val
)
extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
for appt in extracted_data_dict.get('appointments', []):
if isinstance(appt.get('start_date'), date): appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None: appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
for task_item in extracted_data_dict.get('tasks', []):
if isinstance(task_item.get('due_date'), date): task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())
result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
        # MongoDB returns an ObjectId; store it as a string so it matches the PyObjectId field on the model.
        extracted_data.id = str(result.inserted_id)
return extracted_data
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Internal server error during data extraction: {e}")
@app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
async def extract_email_data_excel(request: ProcessEmailRequest):
raise HTTPException(status_code=501, detail="Excel functionality is currently disabled.")
@app.post("/generate-reply", summary="Generate a smart reply to an email (batched & cached)")
async def generate_email_reply(request: GenerateReplyRequest):
if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
raise HTTPException(status_code=503, detail="Reply generation service not fully initialized. Check server logs.")
future = asyncio.Future()
current_time = asyncio.get_event_loop().time()
async with reply_queue_condition:
reply_request_queue.append((request, future, current_time))
reply_queue_condition.notify()
try:
client_timeout = BATCH_TIMEOUT + 10.0
result = await asyncio.wait_for(future, timeout=client_timeout)
return result
except asyncio.TimeoutError:
if not future.done():
future.cancel()
raise HTTPException(status_code=504, detail=f"Request timed out after {client_timeout}s waiting for batch processing.")
except Exception as e:
if isinstance(e, HTTPException):
raise e
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Error processing your reply request: {str(e)}")
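# Illustrative request for the endpoint above (hypothetical host/port and placeholder key; adjust for your deployment).
# Omitted fields fall back to the GenerateReplyRequest defaults:
#   curl -X POST http://localhost:8000/generate-reply -H "Content-Type: application/json" \
#     -d '{"email_text": "Oggetto: Follow-up progetto Delta...", "groq_api_key": "YOUR_GROQ_API_KEY", "language": "Italian"}'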
@app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
if extracted_emails_collection is None:
raise HTTPException(status_code=503, detail="MongoDB not available for extracted_emails.")
mongo_query: Dict[str, Any] = {}
if query_params.contact_name: mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"}
if query_params.appointment_title: mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
if query_params.task_title: mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}
if query_params.from_date or query_params.to_date:
date_query: Dict[str, datetime] = {}
if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
if query_params.to_date: date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
if date_query : mongo_query["processed_at"] = date_query
try:
cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
extracted_docs_raw = await asyncio.to_thread(list, cursor)
results = []
for doc_raw in extracted_docs_raw:
# Convert _id to string for Pydantic model if it's an ObjectId
if isinstance(doc_raw.get("_id"), ObjectId):
doc_raw["_id"] = str(doc_raw["_id"])
# Convert datetime objects back to date objects for Pydantic model fields that are `date`
if 'appointments' in doc_raw:
for appt in doc_raw['appointments']:
if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
if isinstance(appt.get('end_date'), datetime): appt['end_date'] = appt['end_date'].date()
if 'tasks' in doc_raw:
for task_item in doc_raw['tasks']:
if isinstance(task_item.get('due_date'), datetime): task_item['due_date'] = task_item['due_date'].date()
results.append(ExtractedData(**doc_raw))
return results
except Exception as e:
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Error querying extracted emails: {e}")
@app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
if generated_replies_collection is None:
raise HTTPException(status_code=503, detail="MongoDB not available for generated_replies.")
mongo_query: Dict[str, Any] = {}
if query_params.language: mongo_query["language"] = query_params.language
if query_params.style: mongo_query["style"] = query_params.style
if query_params.tone: mongo_query["tone"] = query_params.tone
if query_params.from_date or query_params.to_date:
date_query: Dict[str, datetime] = {}
if query_params.from_date: date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
if query_params.to_date: date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
if date_query: mongo_query["generated_at"] = date_query
try:
cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
generated_docs_raw = await asyncio.to_thread(list, cursor)
results = []
for doc_raw in generated_docs_raw:
if isinstance(doc_raw.get("_id"), ObjectId):
doc_raw["_id"] = str(doc_raw["_id"])
results.append(GeneratedReplyData(**doc_raw))
return results
except Exception as e:
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}")
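# Minimal local entry point, a sketch assuming this module is saved as flask_Character.py; port 7860 is the
# conventional Hugging Face Spaces port, so adjust host/port for other deployments.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)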