|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
|
|
|
|
|
|
|
|
|
os.environ["NUMBA_CACHE_DIR"] = "/tmp/numba_cache" |
|
|
os.environ["NUMBA_DISABLE_CACHE"] = "1" |
|
|
|
|
|
import json |
|
|
import re |
|
|
from datetime import date, datetime, timedelta, timezone
|
|
from typing import List, Optional, Literal, Dict, Any, Tuple |
|
|
import traceback |
|
|
import asyncio |
|
|
|
|
|
from fastapi import FastAPI, HTTPException, Response, Query, Depends, status |
|
|
from fastapi.responses import FileResponse |
|
|
from fastapi.exception_handlers import http_exception_handler |
|
|
from starlette.exceptions import HTTPException as StarletteHTTPException |
|
|
from langchain.prompts import PromptTemplate |
|
|
from langchain_groq import ChatGroq |
|
|
from pydantic import BaseModel, Field, BeforeValidator, model_serializer |
|
|
from typing_extensions import Annotated |
|
|
from pydantic_core import core_schema |
|
|
|
|
|
from pymongo import MongoClient |
|
|
from pymongo.errors import ConnectionFailure, OperationFailure |
|
|
from bson import ObjectId |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- MongoDB configuration ---
# SECURITY NOTE(review): the fallback connection string below embeds real
# credentials. It is kept only for backward compatibility; deploy with the
# MONGO_URI environment variable set and rotate this password.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb+srv://precison9:P1LhtFknkT75yg5L@cluster0.isuwpef.mongodb.net",
)

DB_NAME = "email_assistant_db"

EXTRACTED_EMAILS_COLLECTION = "extracted_emails"

GENERATED_REPLIES_COLLECTION = "generated_replies"
|
|
|
|
|
|
|
|
# --- Module-level MongoDB handles, populated by startup_event() ---
# All four stay None when the connection attempt fails; every endpoint
# checks for None before use and answers 503 accordingly.
client: Optional[MongoClient] = None

db: Optional[Any] = None  # pymongo Database handle

extracted_emails_collection: Optional[Any] = None  # pymongo Collection handle

generated_replies_collection: Optional[Any] = None  # pymongo Collection handle
|
|
|
|
|
|
|
|
class CustomObjectId(str):
    """
    Custom Pydantic type for handling MongoDB ObjectIds.

    It validates that the input is a valid ObjectId string and
    ensures it's represented as a string in JSON Schema.
    """

    @classmethod
    def __get_validators__(cls):
        # Pydantic v1 hook, kept for backward compatibility only. Pydantic v2
        # (which this file targets: model_serializer / BeforeValidator) never
        # calls this; v2 validation goes through
        # __get_pydantic_core_schema__ below.
        yield cls.validate

    @classmethod
    def __get_pydantic_core_schema__(cls, _source_type, _handler) -> core_schema.CoreSchema:
        # Pydantic v2 hook: route validation through cls.validate so the
        # ObjectId format check actually runs under v2 (previously only the
        # v1-only __get_validators__ existed and the check was skipped).
        return core_schema.no_info_plain_validator_function(cls.validate)

    @classmethod
    def validate(cls, v):
        """Validate *v* as an ObjectId and return its string form (or None)."""
        # Treat missing / empty values as "no id present".
        if v is None or v == "":
            return None

        if not isinstance(v, (str, ObjectId)):
            raise ValueError("ObjectId must be a string or ObjectId instance")

        # A real ObjectId instance is valid by construction; just stringify.
        if isinstance(v, ObjectId):
            return str(v)

        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId format")
        return cls(v)

    @classmethod
    def __get_pydantic_json_schema__(
        cls, _core_schema: core_schema.CoreSchema, handler
    ) -> Dict[str, Any]:
        # Advertise the type as a plain string in the generated JSON Schema,
        # with a representative example value.
        json_schema = handler(core_schema.str_schema())
        json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"
        return json_schema
|
|
|
|
|
|
|
|
# Field alias used on models: incoming values are coerced to str first
# (BeforeValidator), then handled as CustomObjectId.
PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
|
|
|
|
|
|
|
|
|
|
|
class Contact(BaseModel):
    """A person extracted from an email; email/phone are optional extras."""

    name: str  # first name (normalize_llm_output splits the LLM's full name)
    last_name: str  # remainder of the full name; "" when only one word given
    email: Optional[str] = None
    phone_number: Optional[str] = None
|
|
|
|
|
class Appointment(BaseModel):
    """A calendar event extracted from an email."""

    title: str
    description: str
    start_date: date  # normalize_llm_output defaults this to the processing date
    start_time: Optional[str] = None  # free-form time string, e.g. "10:30 AM"
    end_date: Optional[date] = None
    end_time: Optional[str] = None
|
|
|
|
|
class Task(BaseModel):
    """An action item extracted from an email."""

    task_title: str
    task_description: str
    due_date: date  # normalize_llm_output defaults this to the processing date
|
|
|
|
|
class ExtractedData(BaseModel):
    """
    Persistent record of one processed email: the contacts, appointments and
    tasks extracted by the LLM, plus the original text and a UTC timestamp.
    """

    id: Optional[PyObjectId] = Field(alias="_id", default=None)
    contacts: List[Contact]
    appointments: List[Appointment]
    tasks: List[Task]
    original_email_text: str
    # datetime.utcnow() is deprecated and returns a naive value; use an
    # explicit timezone-aware UTC timestamp (same instant when stored in BSON).
    processed_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    class Config:
        populate_by_name = True
        arbitrary_types_allowed = True

    @model_serializer(when_used='json')
    def serialize_model(self):
        """Serialize for JSON responses: stringify _id and ISO-format dates."""
        data = self.model_dump(by_alias=True, exclude_none=True)

        # Defensive: _id normally already dumps as a str subclass.
        if "_id" in data and isinstance(data["_id"], ObjectId):
            data["_id"] = str(data["_id"])

        # The python-mode dump leaves datetime.date objects in place; convert
        # them to ISO strings for JSON output. isinstance already rules out
        # None, so no separate None check is needed.
        if 'appointments' in data:
            for appt in data['appointments']:
                if isinstance(appt.get('start_date'), date):
                    appt['start_date'] = appt['start_date'].isoformat()
                if isinstance(appt.get('end_date'), date):
                    appt['end_date'] = appt['end_date'].isoformat()
        if 'tasks' in data:
            for task_item in data['tasks']:
                if isinstance(task_item.get('due_date'), date):
                    task_item['due_date'] = task_item['due_date'].isoformat()
        return data
|
|
|
|
|
class ProcessEmailRequest(BaseModel):
    """Request body for /extract-data: raw email text plus the caller's Groq key."""

    email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
    groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY")
|
|
|
|
|
class GenerateReplyRequest(BaseModel):
    """Request body for /generate-reply: email text plus generation parameters."""

    email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
    groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY")
    # Only language is a closed set; the remaining knobs accept free-form
    # strings, the listed examples being the expected values.
    language: Literal["Italian", "English"] = Field("Italian", examples=["Italian", "English"])
    length: str = Field("Auto", examples=["Short", "Medium", "Long", "Auto"])
    style: str = Field("Professional", examples=["Professional", "Casual", "Formal", "Informal"])
    tone: str = Field("Friendly", examples=["Friendly", "Neutral", "Urgent", "Empathetic"])
    emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
|
|
|
|
|
class GeneratedReplyData(BaseModel):
    """
    Persistent record of one generated reply plus the generation parameters.
    Also serves as the cache entry: lookups match on the original email text
    together with every parameter field.
    """

    id: Optional[PyObjectId] = Field(alias="_id", default=None)
    original_email_text: str
    generated_reply_text: str
    language: str
    length: str
    style: str
    tone: str
    emoji: str
    # datetime.utcnow() is deprecated and returns a naive value; use an
    # explicit timezone-aware UTC timestamp (same instant when stored in BSON).
    generated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    class Config:
        populate_by_name = True
        arbitrary_types_allowed = True

    @model_serializer(when_used='json')
    def serialize_model(self):
        """Serialize for JSON responses, stringifying the Mongo _id."""
        data = self.model_dump(by_alias=True, exclude_none=True)
        if "_id" in data and isinstance(data["_id"], ObjectId):
            data["_id"] = str(data["_id"])
        return data
|
|
|
|
|
|
|
|
class GenerateReplyResponse(BaseModel):
    """Response body for /generate-reply."""

    reply: str = Field(..., description="The AI-generated reply text.")
    stored_id: str = Field(..., description="The MongoDB ID of the stored reply.")
    cached: bool = Field(..., description="True if the reply was retrieved from cache, False if newly generated.")
|
|
|
|
|
|
|
|
class ExtractedEmailQuery(BaseModel):
    """Query parameters for /query-extracted-emails (used via Depends())."""

    contact_name: Optional[str] = Query(None, description="Filter by contact name (case-insensitive partial match).")
    appointment_title: Optional[str] = Query(None, description="Filter by appointment title (case-insensitive partial match).")
    task_title: Optional[str] = Query(None, description="Filter by task title (case-insensitive partial match).")
    from_date: Optional[date] = Query(None, description="Filter by data processed on or after this date (YYYY-MM-DD).")
    to_date: Optional[date] = Query(None, description="Filter by data processed on or before this date (YYYY-MM-DD).")
    limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
|
|
|
|
|
class GeneratedReplyQuery(BaseModel):
    """Query parameters for /query-generated-replies (used via Depends())."""

    language: Optional[Literal["Italian", "English"]] = Query(None, description="Filter by reply language.")
    style: Optional[str] = Query(None, description="Filter by reply style (e.g., Professional, Casual).")
    tone: Optional[str] = Query(None, description="Filter by reply tone (e.g., Friendly, Neutral).")
    from_date: Optional[date] = Query(None, description="Filter by data generated on or after this date (YYYY-MM-DD).")
    to_date: Optional[date] = Query(None, description="Filter by data generated on or before this date (YYYY-MM-DD).")
    limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
|
|
|
|
|
|
|
|
def extract_last_json_block(text: str) -> Optional[str]:
    """
    Return the last JSON payload found in *text*.

    Prefers the contents of the final ```json ... ``` fenced block; when no
    fence exists, falls back to the widest {...} span. Returns None when
    neither is present.
    """
    fenced = re.findall(r'```json\s*(.*?)\s*```', text, re.DOTALL)
    if fenced:
        return fenced[-1].strip()

    bare = re.search(r'\{.*\}', text, re.DOTALL)
    return bare.group(0) if bare else None
|
|
|
|
|
def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
    """
    Resolve a date string to a concrete date.

    Understands the keywords 'today' and 'tomorrow' (relative to
    *current_date*) plus strict YYYY-MM-DD. None, empty, or unparseable
    input yields None.
    """
    if not date_str:
        return None

    cleaned = date_str.lower().strip()
    relative = {
        "today": current_date,
        "tomorrow": current_date + timedelta(days=1),
    }
    if cleaned in relative:
        return relative[cleaned]

    try:
        return datetime.strptime(cleaned, "%Y-%m-%d").date()
    except ValueError:
        # Unparseable input is treated as "no date" rather than an error.
        return None
|
|
|
|
|
def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
    """
    Convert the raw LLM JSON dict into a validated ExtractedData model.

    Missing or unparseable dates fall back to *current_date*, contact full
    names are split into first name + remainder, and absent titles or
    descriptions receive placeholder defaults.
    """

    def split_name(full_name: str) -> tuple[str, str]:
        # "Mario De Rossi" -> ("Mario", "De Rossi"); "" -> ("", "")
        parts = full_name.strip().split()
        if not parts:
            return "", ""
        return parts[0], " ".join(parts[1:])

    contacts = []
    for raw in data.get("contacts", []):
        first, rest = split_name(raw.get("name", ""))
        contacts.append(
            Contact(
                name=first,
                last_name=rest,
                email=raw.get("email"),
                phone_number=raw.get("phone_number"),
            )
        )

    appointments = []
    for raw in data.get("appointments", []):
        appointments.append(
            Appointment(
                title=raw.get("title", "Untitled"),
                description=raw.get("description", "No description"),
                # A missing/unparseable start date defaults to today.
                start_date=parse_date(raw.get("start_date"), current_date) or current_date,
                start_time=raw.get("start_time"),
                end_date=parse_date(raw.get("end_date"), current_date),
                end_time=raw.get("end_time"),
            )
        )

    tasks = []
    for raw in data.get("tasks", []):
        tasks.append(
            Task(
                task_title=raw.get("task_title", "Untitled"),
                task_description=raw.get("task_description", "No description"),
                due_date=parse_date(raw.get("due_date"), current_date) or current_date,
            )
        )

    return ExtractedData(
        contacts=contacts,
        appointments=appointments,
        tasks=tasks,
        original_email_text=original_email_text,
    )
|
|
|
|
|
|
|
|
def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
    """
    Process email text with the LLM and return structured ExtractedData.

    Raises:
        ValueError: empty input, no JSON block in the LLM output, or
            unparseable JSON (callers map these to HTTP 400).
        Exception: any other failure during LLM invocation/normalization.
    """
    if not email_text:
        raise ValueError("Email text cannot be empty for processing.")

    llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)

    prompt_today_str = current_date.isoformat()
    prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()

    # The concrete dates are interpolated right here via the f-string, so the
    # finished template has exactly one runtime variable: {email}.
    prompt_template_str = f"""
You are an expert email assistant tasked with extracting structured information from an Italian email.

**Your response MUST be a single, complete JSON object, wrapped in a ```json``` block.**
**DO NOT include any conversational text, explanations, or preambles outside the JSON block.**
**The JSON should contain three top-level keys: "contacts", "appointments", and "tasks".**
If a category has no items, its list should be empty (e.g., "contacts": []).

Here is the required JSON schema for each category:

- **contacts**: List of Contact objects.
Each Contact object must have:
- `name` (string, full name)
- `last_name` (string, last name) - You should infer this from the full name.
- `email` (string, optional, null if not present)
- `phone_number` (string, optional, null if not present)

- **appointments**: List of Appointment objects.
Each Appointment object must have:
- `title` (string, short, meaningful title in Italian based on the meeting's purpose)
- `description` (string, summary of the meeting's goal)
- `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow")
- `start_time` (string, optional, e.g., "10:30 AM", null if not present)
- `end_date` (string, YYYY-MM-DD, optional, null if unknown or not applicable)
- `end_time` (string, optional, e.g., "11:00 AM", null if not present)

- **tasks**: List of Task objects.
Each Task object must have:
- `task_title` (string, short summary of action item)
- `task_description` (string, more detailed explanation)
- `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")

---

Email:
{{email}}
"""
    # FIX: the original declared prompt_today_str/prompt_tomorrow_str as
    # input_variables although they no longer appear in the interpolated
    # template; only "email" is a real template variable.
    prompt_template = PromptTemplate(input_variables=["email"], template=prompt_template_str)
    chain = prompt_template | llm
    llm_output_str = ""
    try:
        llm_output = chain.invoke({"email": email_text})
        llm_output_str = llm_output.content

        json_str = extract_last_json_block(llm_output_str)

        if not json_str:
            raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
        json_data = json.loads(json_str)

        extracted_data = normalize_llm_output(json_data, current_date, email_text)
        return extracted_data
    except json.JSONDecodeError as e:
        # Must precede the ValueError clause: JSONDecodeError subclasses ValueError.
        raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}") from e
    except ValueError:
        # FIX: propagate validation errors unchanged so the /extract-data
        # handler can map them to HTTP 400 (previously they were re-wrapped
        # in a bare Exception and surfaced as 500).
        raise
    except Exception as e:
        traceback.print_exc()
        raise Exception(f"An error occurred during email processing: {e}") from e
|
|
|
|
|
def _generate_response_internal(
    email_text: str, api_key: str, language: Literal["Italian", "English"],
    length: str, style: str, tone: str, emoji: str
) -> str:
    """
    Generate a reply to *email_text* with the LLM, honoring the requested
    language/length/style/tone/emoji parameters. Returns the stripped reply
    body, or a fixed message when the input email is empty.
    """
    print(f"[{datetime.now()}] _generate_response_internal: Starting LLM call. API Key starts with: {api_key[:5]}...")
    if not email_text:
        print(f"[{datetime.now()}] _generate_response_internal: Email text is empty.")
        return "Cannot generate reply for empty email text."

    try:
        llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
        reply_template = """
You are an assistant that helps reply to emails.

Create a response to the following email with the following parameters:
- Language: {language}
- Length: {length}
- Style: {style}
- Tone: {tone}
- Emoji usage: {emoji}

Email:
{email}

Write only the reply body. Do not repeat the email or mention any instruction.
"""
        reply_prompt = PromptTemplate(
            input_variables=["email", "language", "length", "style", "tone", "emoji"],
            template=reply_template,
        )
        pipeline = reply_prompt | llm
        print(f"[{datetime.now()}] _generate_response_internal: Invoking LLM chain...")
        result = pipeline.invoke({
            "email": email_text,
            "language": language,
            "length": length,
            "style": style,
            "tone": tone,
            "emoji": emoji,
        })
        print(f"[{datetime.now()}] _generate_response_internal: LLM chain returned. Content length: {len(result.content)}.")
        return result.content.strip()
    except Exception as e:
        print(f"[{datetime.now()}] _generate_response_internal: ERROR during LLM invocation: {e}")
        traceback.print_exc()
        raise
|
|
|
|
|
|
|
|
|
|
|
# FastAPI application instance; interactive Swagger docs are served at "/".
app = FastAPI(
    title="Email Assistant API",
    description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, and caching.",
    version="1.1.0",
    docs_url="/",
    redoc_url="/redoc"
)
|
|
|
|
|
|
|
|
|
|
|
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler_wrapper(request, exc):
    """Handles FastAPI's internal HTTP exceptions.

    Logs the status/detail, then delegates to FastAPI's default handler so
    the response shape stays unchanged.
    """
    print(f"[{datetime.now()}] Caught StarletteHTTPException: {exc.status_code} - {exc.detail}")
    return await http_exception_handler(request, exc)
|
|
|
|
|
|
|
|
@app.exception_handler(Exception)
async def global_exception_handler_wrapper(request, exc):
    """Last-resort handler: log the traceback and answer with a JSON 500."""
    print(f"[{datetime.now()}] Unhandled exception caught by global handler for request: {request.url}")
    traceback.print_exc()

    error_body = {"detail": f"Internal Server Error: {str(exc)}", "type": "unhandled_exception"}
    return Response(
        content=json.dumps(error_body),
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        media_type="application/json",
    )
|
|
|
|
|
|
|
|
|
|
|
@app.on_event("startup") |
|
|
async def startup_event(): |
|
|
global client, db, extracted_emails_collection, generated_replies_collection |
|
|
print(f"[{datetime.now()}] FastAPI app startup sequence initiated.") |
|
|
try: |
|
|
|
|
|
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) |
|
|
client.admin.command('ping') |
|
|
db = client[DB_NAME] |
|
|
extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION] |
|
|
generated_replies_collection = db[GENERATED_REPLIES_COLLECTION] |
|
|
print(f"[{datetime.now()}] Successfully connected to MongoDB: {DB_NAME}") |
|
|
|
|
|
except (ConnectionFailure, OperationFailure) as e: |
|
|
print(f"[{datetime.now()}] ERROR: MongoDB Connection/Operation Failure: {e}") |
|
|
client = None |
|
|
db = None |
|
|
extracted_emails_collection = None |
|
|
generated_replies_collection = None |
|
|
except Exception as e: |
|
|
print(f"[{datetime.now()}] ERROR: An unexpected error occurred during MongoDB connection startup: {e}") |
|
|
traceback.print_exc() |
|
|
client = None |
|
|
db = None |
|
|
extracted_emails_collection = None |
|
|
generated_replies_collection = None |
|
|
finally: |
|
|
if client is not None and db is not None: |
|
|
try: |
|
|
client.admin.command('ping') |
|
|
except Exception as e: |
|
|
print(f"[{datetime.now()}] MongoDB ping failed after initial connection attempt during finally block: {e}") |
|
|
client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None |
|
|
else: |
|
|
print(f"[{datetime.now()}] MongoDB client or db object is None after connection attempt in startup. Database likely not connected.") |
|
|
if client is None or db is None: |
|
|
client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None |
|
|
print(f"[{datetime.now()}] FastAPI app startup sequence completed for MongoDB client initialization.") |
|
|
|
|
|
|
|
|
@app.on_event("shutdown") |
|
|
async def shutdown_event(): |
|
|
global client |
|
|
print(f"[{datetime.now()}] FastAPI app shutting down.") |
|
|
if client: |
|
|
client.close() |
|
|
print(f"[{datetime.now()}] MongoDB client closed.") |
|
|
|
|
|
|
|
|
|
|
|
@app.get("/health", summary="Health Check") |
|
|
async def health_check(): |
|
|
""" |
|
|
Checks the health of the API, including MongoDB connection. |
|
|
""" |
|
|
db_status = "MongoDB not connected." |
|
|
db_ok = False |
|
|
if client is not None and db is not None: |
|
|
try: |
|
|
|
|
|
await asyncio.to_thread(db.list_collection_names) |
|
|
db_status = "MongoDB connection OK." |
|
|
db_ok = True |
|
|
except Exception as e: |
|
|
db_status = f"MongoDB connection error: {e}" |
|
|
db_ok = False |
|
|
|
|
|
if db_ok: |
|
|
return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status} |
|
|
else: |
|
|
raise HTTPException( |
|
|
status_code=503, |
|
|
detail={"message": "Service unavailable.", "database": db_status} |
|
|
) |
|
|
|
|
|
|
|
|
@app.post("/generate-reply", response_model=GenerateReplyResponse, summary="Generate a smart reply to an email") |
|
|
async def generate_email_reply(request: GenerateReplyRequest): |
|
|
""" |
|
|
Generates a smart reply to the provided email text using an LLM. |
|
|
The generated reply is also stored in MongoDB for caching and historical purposes. |
|
|
""" |
|
|
if generated_replies_collection is None: |
|
|
raise HTTPException(status_code=503, detail="MongoDB not available for generated_replies.") |
|
|
|
|
|
try: |
|
|
|
|
|
cache_query = { |
|
|
"original_email_text": request.email_text, |
|
|
"language": request.language, |
|
|
"length": request.length, |
|
|
"style": request.style, |
|
|
"tone": request.tone, |
|
|
"emoji": request.emoji, |
|
|
} |
|
|
print(f"[{datetime.now()}] /generate-reply: Checking cache for reply...") |
|
|
|
|
|
cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query) |
|
|
|
|
|
if cached_reply_doc: |
|
|
print(f"[{datetime.now()}] /generate-reply: Reply found in cache. ID: {str(cached_reply_doc['_id'])}") |
|
|
return GenerateReplyResponse( |
|
|
reply=cached_reply_doc["generated_reply_text"], |
|
|
stored_id=str(cached_reply_doc["_id"]), |
|
|
cached=True |
|
|
) |
|
|
|
|
|
|
|
|
print(f"[{datetime.now()}] /generate-reply: Reply not in cache. Calling LLM for generation...") |
|
|
reply_content = await asyncio.to_thread( |
|
|
_generate_response_internal, |
|
|
request.email_text, |
|
|
request.groq_api_key, |
|
|
request.language, |
|
|
request.length, |
|
|
request.style, |
|
|
request.tone, |
|
|
request.emoji |
|
|
) |
|
|
print(f"[{datetime.now()}] /generate-reply: LLM call completed. Storing newly generated reply in MongoDB.") |
|
|
|
|
|
|
|
|
reply_data_to_store = GeneratedReplyData( |
|
|
original_email_text=request.email_text, |
|
|
generated_reply_text=reply_content, |
|
|
language=request.language, |
|
|
length=request.length, |
|
|
style=request.style, |
|
|
tone=request.tone, |
|
|
emoji=request.emoji |
|
|
) |
|
|
|
|
|
reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'}) |
|
|
|
|
|
|
|
|
insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict) |
|
|
stored_id = str(insert_result.inserted_id) |
|
|
|
|
|
print(f"[{datetime.now()}] /generate-reply: Reply stored in MongoDB. ID: {stored_id}") |
|
|
|
|
|
|
|
|
return GenerateReplyResponse( |
|
|
reply=reply_content, |
|
|
stored_id=stored_id, |
|
|
cached=False |
|
|
) |
|
|
except Exception as e: |
|
|
traceback.print_exc() |
|
|
|
|
|
raise HTTPException(status_code=500, detail=f"Error generating or storing reply: {str(e)}") |
|
|
|
|
|
@app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email") |
|
|
async def extract_email_data(request: ProcessEmailRequest): |
|
|
""" |
|
|
Extracts contacts, appointments, and tasks from the provided email text. |
|
|
""" |
|
|
if extracted_emails_collection is None: |
|
|
raise HTTPException(status_code=503, detail="MongoDB not available.") |
|
|
|
|
|
current_date = date.today() |
|
|
|
|
|
print(f"[{datetime.now()}] /extract-data: Received request.") |
|
|
try: |
|
|
print(f"[{datetime.now()}] /extract-data: Calling internal processing function.") |
|
|
|
|
|
extracted_data = await asyncio.to_thread(_process_email_internal, request.email_text, request.groq_api_key, current_date) |
|
|
|
|
|
print(f"[{datetime.now()}] /extract-data: Internal processing complete. Preparing for DB insert.") |
|
|
|
|
|
|
|
|
data_to_insert = extracted_data.model_dump(by_alias=True, exclude_none=True, exclude={'id'}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if 'appointments' in data_to_insert: |
|
|
for appt in data_to_insert['appointments']: |
|
|
if isinstance(appt.get('start_date'), date): |
|
|
appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time()) |
|
|
if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None: |
|
|
appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time()) |
|
|
if 'tasks' in data_to_insert: |
|
|
for task_item in data_to_insert['tasks']: |
|
|
if isinstance(task_item.get('due_date'), date): |
|
|
task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time()) |
|
|
|
|
|
|
|
|
print(f"[{datetime.now()}] /extract-data: Inserting into MongoDB... Data: {data_to_insert}") |
|
|
|
|
|
insert_result = await asyncio.to_thread(extracted_emails_collection.insert_one, data_to_insert) |
|
|
|
|
|
|
|
|
extracted_data.id = str(insert_result.inserted_id) |
|
|
print(f"[{datetime.now()}] /extract-data: Data inserted into MongoDB. ID: {extracted_data.id}") |
|
|
|
|
|
return extracted_data |
|
|
except ValueError as ve: |
|
|
raise HTTPException(status_code=400, detail=str(ve)) |
|
|
except Exception as e: |
|
|
traceback.print_exc() |
|
|
raise HTTPException(status_code=500, detail=f"An internal server error occurred: {e}") |
|
|
|
|
|
|
|
|
@app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query stored extracted email data") |
|
|
async def query_extracted_emails(query_params: ExtractedEmailQuery = Depends()): |
|
|
""" |
|
|
Queries extracted email data from MongoDB based on various filters. |
|
|
""" |
|
|
if extracted_emails_collection is None: |
|
|
raise HTTPException(status_code=503, detail="MongoDB not available.") |
|
|
|
|
|
mongo_query = {} |
|
|
if query_params.contact_name: |
|
|
|
|
|
mongo_query["$or"] = [ |
|
|
{"contacts.name": {"$regex": query_params.contact_name, "$options": "i"}}, |
|
|
{"contacts.last_name": {"$regex": query_params.contact_name, "$options": "i"}} |
|
|
] |
|
|
if query_params.appointment_title: |
|
|
mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"} |
|
|
if query_params.task_title: |
|
|
mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"} |
|
|
|
|
|
|
|
|
date_query = {} |
|
|
if query_params.from_date: |
|
|
date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time()) |
|
|
if query_params.to_date: |
|
|
date_query["$lte"] = datetime.combine(query_params.to_date, datetime.max.time()) |
|
|
if date_query: |
|
|
mongo_query["processed_at"] = date_query |
|
|
|
|
|
try: |
|
|
|
|
|
cursor = await asyncio.to_thread(extracted_emails_collection.find, mongo_query) |
|
|
|
|
|
results = await asyncio.to_thread(lambda: list(cursor.limit(query_params.limit))) |
|
|
|
|
|
|
|
|
return [ExtractedData(**doc) for doc in results] |
|
|
except Exception as e: |
|
|
traceback.print_exc() |
|
|
raise HTTPException(status_code=500, detail=f"Error querying extracted emails: {e}") |
|
|
|
|
|
|
|
|
@app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query stored generated replies") |
|
|
async def query_generated_replies(query_params: GeneratedReplyQuery = Depends()): |
|
|
""" |
|
|
Queries generated email replies from MongoDB based on various filters. |
|
|
""" |
|
|
if generated_replies_collection is None: |
|
|
raise HTTPException(status_code=503, detail="MongoDB not available.") |
|
|
|
|
|
mongo_query = {} |
|
|
if query_params.language: |
|
|
mongo_query["language"] = query_params.language |
|
|
if query_params.style: |
|
|
mongo_query["style"] = query_params.style |
|
|
if query_params.tone: |
|
|
mongo_query["tone"] = query_params.tone |
|
|
|
|
|
|
|
|
date_query = {} |
|
|
if query_params.from_date: |
|
|
date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time()) |
|
|
if query_params.to_date: |
|
|
date_query["$lte"] = datetime.combine(query_params.to_date, datetime.max.time()) |
|
|
if date_query: |
|
|
mongo_query["generated_at"] = date_query |
|
|
|
|
|
try: |
|
|
|
|
|
cursor = await asyncio.to_thread(generated_replies_collection.find, mongo_query) |
|
|
|
|
|
results = await asyncio.to_thread(lambda: list(cursor.limit(query_params.limit))) |
|
|
|
|
|
|
|
|
return [GeneratedReplyData(**doc) for doc in results] |
|
|
except Exception as e: |
|
|
traceback.print_exc() |
|
|
raise HTTPException(status_code=500, detail=f"Error querying generated replies: {e}") |