| | |
| | |
| | |
import os

# Redirect (and disable) the Numba JIT cache before anything can import numba:
# on read-only deployments the default cache location is not writable.
# NOTE(review): nothing in this file imports numba directly — presumably a
# transitive dependency needs this; confirm before removing.
os.environ["NUMBA_CACHE_DIR"] = "/tmp/numba_cache"
os.environ["NUMBA_DISABLE_CACHE"] = "1"
| |
|
| | import json |
| | import re |
| | from datetime import date, datetime, timedelta |
| | from typing import List, Optional, Literal, Dict, Any, Tuple |
| | import traceback |
| | import asyncio |
| |
|
| | from fastapi import FastAPI, HTTPException, Response, Query, Depends, status |
| | from fastapi.responses import FileResponse |
| | from fastapi.exception_handlers import http_exception_handler |
| | from starlette.exceptions import HTTPException as StarletteHTTPException |
| | from langchain.prompts import PromptTemplate |
| | from langchain_groq import ChatGroq |
| | from pydantic import BaseModel, Field, BeforeValidator, model_serializer |
| | from typing_extensions import Annotated |
| | from pydantic_core import core_schema |
| |
|
| | from pymongo import MongoClient |
| | from pymongo.errors import ConnectionFailure, OperationFailure |
| | from bson import ObjectId |
| |
|
| | |
| | |
| | |
# MongoDB connection settings.
# SECURITY: the connection string previously hard-coded a username/password in
# source. Read it from the environment instead; the embedded fallback is kept
# only for backward compatibility — rotate this credential and remove it.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb+srv://precison9:P1LhtFknkT75yg5L@cluster0.isuwpef.mongodb.net",
)
DB_NAME = "email_assistant_db"                      # application database
EXTRACTED_EMAILS_COLLECTION = "extracted_emails"    # /extract-data results
GENERATED_REPLIES_COLLECTION = "generated_replies"  # /generate-reply store + cache
| |
|
| | |
# Global MongoDB handles, initialised in startup_event() and reset to None on
# any connection failure; endpoints treat `is None` as "database unavailable".
client: Optional[MongoClient] = None
db: Optional[Any] = None
extracted_emails_collection: Optional[Any] = None
generated_replies_collection: Optional[Any] = None
| |
|
| | |
class CustomObjectId(str):
    """
    Custom Pydantic type for handling MongoDB ObjectIds.
    It validates that the input is a valid ObjectId string and
    ensures it's represented as a string in JSON Schema.

    NOTE(review): ``__get_validators__`` is the Pydantic v1 discovery hook;
    this file otherwise targets Pydantic v2 (``model_serializer`` is used
    below), where this hook is ignored and coercion happens via the
    ``BeforeValidator`` in ``PyObjectId`` — confirm ``validate`` actually runs.
    """
    @classmethod
    def __get_validators__(cls):
        # Pydantic v1 validator discovery hook.
        yield cls.validate

    @classmethod
    def validate(cls, v):
        # Treat missing/empty values as "no id" rather than an error.
        if v is None or v == "":
            return None

        if not isinstance(v, (str, ObjectId)):
            raise ValueError("ObjectId must be a string or ObjectId instance")

        # A genuine ObjectId is valid by construction; just stringify it.
        if isinstance(v, ObjectId):
            return str(v)

        # String input must be a well-formed ObjectId (24 hex chars).
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId format")
        return cls(v)

    @classmethod
    def __get_pydantic_json_schema__(
        cls, _core_schema: core_schema.CoreSchema, handler
    ) -> Dict[str, Any]:
        # Advertise the field as a plain string in the OpenAPI schema,
        # with a representative example value.
        json_schema = handler(core_schema.str_schema())
        json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"
        return json_schema


# Annotated alias used by the models below: values are coerced to str before
# validation. NOTE(review): ``BeforeValidator(str)`` would turn an explicit
# None into the string "None"; the default=None fields below avoid this in
# practice — confirm no caller passes _id=None explicitly.
PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
| |
|
| |
|
| | |
class Contact(BaseModel):
    """A person extracted from an email; names are split by normalize_llm_output."""
    name: str                          # first whitespace-separated token of the full name
    last_name: str                     # remainder of the full name ("" when single token)
    email: Optional[str] = None
    phone_number: Optional[str] = None
| |
|
class Appointment(BaseModel):
    """A calendar event extracted from an email."""
    title: str
    description: str
    start_date: date                  # defaulted to "today" by normalize_llm_output when absent
    start_time: Optional[str] = None  # free-form, e.g. "10:30 AM"
    end_date: Optional[date] = None   # left None when unknown
    end_time: Optional[str] = None
| |
|
class Task(BaseModel):
    """An action item extracted from an email."""
    task_title: str
    task_description: str
    due_date: date  # defaulted to "today" by normalize_llm_output when absent
| |
|
class ExtractedData(BaseModel):
    """
    Persisted result of the /extract-data pipeline: contacts, appointments and
    tasks pulled from one email, plus the raw email text and a timestamp.
    """
    id: Optional[PyObjectId] = Field(alias="_id", default=None)  # Mongo _id as string
    contacts: List[Contact]
    appointments: List[Appointment]
    tasks: List[Task]
    original_email_text: str
    # NOTE(review): utcnow() is naive (and deprecated in 3.12); kept naive for
    # compatibility with existing documents and the naive range filters used
    # by the query endpoints.
    processed_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        populate_by_name = True        # allow population via the "_id" alias
        arbitrary_types_allowed = True

    @model_serializer(when_used='json')
    def serialize_model(self):
        """JSON-mode serializer: stringify _id and ISO-format all date fields."""
        data = self.model_dump(by_alias=True, exclude_none=True)
        # Defensive: _id should already be a str (PyObjectId), but stringify a
        # raw ObjectId if one ever slips through.
        if "_id" in data and isinstance(data["_id"], ObjectId):
            data["_id"] = str(data["_id"])
        # model_dump (python mode) leaves date objects intact; make them
        # JSON-safe. isinstance(None, date) is False, so no separate None
        # checks are needed (the originals were redundant).
        for appt in data.get('appointments', []):
            if isinstance(appt.get('start_date'), date):
                appt['start_date'] = appt['start_date'].isoformat()
            if isinstance(appt.get('end_date'), date):
                appt['end_date'] = appt['end_date'].isoformat()
        for task_item in data.get('tasks', []):
            if isinstance(task_item.get('due_date'), date):
                task_item['due_date'] = task_item['due_date'].isoformat()
        return data
| |
|
class ProcessEmailRequest(BaseModel):
    """Request body for /extract-data."""
    # `examples=` is the recognized Pydantic v2 Field parameter; the previous
    # `example=` kwarg was an unrecognized extra (the query models in this file
    # already use `examples=` — this makes the request models consistent).
    email_text: str = Field(..., examples=["Oggetto: Follow-up progetto “Delta”..."])
    groq_api_key: str = Field(..., examples=["YOUR_GROQ_API_KEY"])
| |
|
class GenerateReplyRequest(BaseModel):
    """Request body for /generate-reply; length/style/tone/emoji are free-form knobs."""
    # Consistency fix: this model mixed `example=` (not a recognized Pydantic v2
    # Field parameter) with `examples=`; use `examples=` throughout.
    email_text: str = Field(..., examples=["Oggetto: Follow-up progetto “Delta”..."])
    groq_api_key: str = Field(..., examples=["YOUR_GROQ_API_KEY"])
    language: Literal["Italian", "English"] = Field("Italian", examples=["Italian", "English"])
    length: str = Field("Auto", examples=["Short", "Medium", "Long", "Auto"])
    style: str = Field("Professional", examples=["Professional", "Casual", "Formal", "Informal"])
    tone: str = Field("Friendly", examples=["Friendly", "Neutral", "Urgent", "Empathetic"])
    emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
| |
|
class GeneratedReplyData(BaseModel):
    """
    Persisted /generate-reply result; the parameter fields double as the cache
    key (see handle_single_reply_request's cache_query).
    """
    id: Optional[PyObjectId] = Field(alias="_id", default=None)  # Mongo _id as string
    original_email_text: str
    generated_reply_text: str
    language: str
    length: str
    style: str
    tone: str
    emoji: str
    # NOTE(review): naive UTC timestamp (utcnow is deprecated); kept naive to
    # match the naive range filters in /query-generated-replies.
    generated_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        populate_by_name = True        # allow population via the "_id" alias
        arbitrary_types_allowed = True

    @model_serializer(when_used='json')
    def serialize_model(self):
        # JSON-mode serializer: ensure _id is emitted as a plain string even if
        # a raw ObjectId slipped into the field.
        data = self.model_dump(by_alias=True, exclude_none=True)
        if "_id" in data and isinstance(data["_id"], ObjectId):
            data["_id"] = str(data["_id"])
        return data
| |
|
| | |
class GenerateReplyResponse(BaseModel):
    """Response body for /generate-reply."""
    reply: str = Field(..., description="The AI-generated reply text.")
    stored_id: str = Field(..., description="The MongoDB ID of the stored reply.")
    cached: bool = Field(..., description="True if the reply was retrieved from cache, False if newly generated.")
| |
|
| | |
class ExtractedEmailQuery(BaseModel):
    """Query-string parameters for /query-extracted-emails (bound via Depends).

    Text filters are case-insensitive partial matches; from_date/to_date bound
    processed_at inclusively on both ends.
    """
    contact_name: Optional[str] = Query(None, description="Filter by contact name (case-insensitive partial match).")
    appointment_title: Optional[str] = Query(None, description="Filter by appointment title (case-insensitive partial match).")
    task_title: Optional[str] = Query(None, description="Filter by task title (case-insensitive partial match).")
    from_date: Optional[date] = Query(None, description="Filter by data processed on or after this date (YYYY-MM-DD).")
    to_date: Optional[date] = Query(None, description="Filter by data processed on or before this date (YYYY-MM-DD).")
    limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
| |
|
class GeneratedReplyQuery(BaseModel):
    """Query-string parameters for /query-generated-replies (bound via Depends).

    language/style/tone are exact-match filters; from_date/to_date bound
    generated_at inclusively on both ends.
    """
    language: Optional[Literal["Italian", "English"]] = Query(None, description="Filter by reply language.")
    style: Optional[str] = Query(None, description="Filter by reply style (e.g., Professional, Casual).")
    tone: Optional[str] = Query(None, description="Filter by reply tone (e.g., Friendly, Neutral).")
    from_date: Optional[date] = Query(None, description="Filter by data generated on or after this date (YYYY-MM-DD).")
    to_date: Optional[date] = Query(None, description="Filter by data generated on or before this date (YYYY-MM-DD).")
    limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
| |
|
| | |
def extract_last_json_block(text: str) -> Optional[str]:
    """
    Pull a JSON payload out of LLM output.

    Prefers the last ```json ... ``` fenced block; failing that, falls back to
    the widest brace-delimited span in the text. Returns None when neither
    is found.
    """
    fenced = re.findall(r'```json\s*(.*?)\s*```', text, re.DOTALL)
    if fenced:
        return fenced[-1].strip()
    # Greedy match: from the first "{" to the last "}" across newlines.
    bare = re.search(r'\{.*\}', text, re.DOTALL)
    return bare.group(0) if bare else None
| |
|
def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
    """
    Resolve a date string to a concrete date.

    Accepts the relative keywords "today"/"tomorrow" (case-insensitive,
    surrounding whitespace ignored) and absolute YYYY-MM-DD strings.
    Returns None for empty input or anything unparseable.
    """
    if not date_str:
        return None
    token = date_str.lower().strip()
    relative = {
        "today": current_date,
        "tomorrow": current_date + timedelta(days=1),
    }
    if token in relative:
        return relative[token]
    try:
        parsed = datetime.strptime(token, "%Y-%m-%d")
    except ValueError:
        # Unparseable dates count as "not provided" — callers apply defaults.
        return None
    return parsed.date()
| |
|
def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
    """
    Convert the raw dict decoded from the LLM's JSON into a validated
    ExtractedData model.

    Missing or unparseable start/due dates fall back to `current_date`; each
    contact's full name is split into a first name plus the remainder.
    """
    def split_name(full_name: str) -> tuple[str, str]:
        # First token is the given name; everything after is the last name.
        parts = full_name.strip().split()
        if not parts:
            return "", ""
        return parts[0], " ".join(parts[1:])

    contacts = []
    for raw in data.get("contacts", []):
        first, last = split_name(raw.get("name", ""))
        contacts.append(Contact(
            name=first,
            last_name=last,
            email=raw.get("email"),
            phone_number=raw.get("phone_number"),
        ))

    appointments = []
    for raw in data.get("appointments", []):
        appointments.append(Appointment(
            title=raw.get("title", "Untitled"),
            description=raw.get("description", "No description"),
            # start_date is mandatory on the model: default to "today".
            start_date=parse_date(raw.get("start_date"), current_date) or current_date,
            start_time=raw.get("start_time"),
            # end_date is optional: leave it None when absent/unparseable.
            end_date=parse_date(raw.get("end_date"), current_date),
            end_time=raw.get("end_time"),
        ))

    tasks = []
    for raw in data.get("tasks", []):
        tasks.append(Task(
            task_title=raw.get("task_title", "Untitled"),
            task_description=raw.get("task_description", "No description"),
            due_date=parse_date(raw.get("due_date"), current_date) or current_date,
        ))

    return ExtractedData(
        contacts=contacts,
        appointments=appointments,
        tasks=tasks,
        original_email_text=original_email_text,
    )
| |
|
| | |
def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
    """
    Run the extraction LLM over `email_text` and return validated ExtractedData.

    Raises:
        ValueError: empty input, no JSON block in the LLM output, or malformed
            JSON (the endpoint maps these to HTTP 400).
        Exception: any other failure (mapped to HTTP 500).
    """
    if not email_text:
        raise ValueError("Email text cannot be empty for processing.")

    llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)

    prompt_today_str = current_date.isoformat()
    prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()

    # The date strings are baked into the template right here via the f-string,
    # so the only runtime prompt variable is {email}. (Previously the template
    # also declared prompt_today_str/prompt_tomorrow_str as input variables,
    # which do not appear in the rendered template.)
    prompt_template_str = f"""
You are an expert email assistant tasked with extracting structured information from an Italian email.

**Your response MUST be a single, complete JSON object, wrapped in a ```json``` block.**
**DO NOT include any conversational text, explanations, or preambles outside the JSON block.**
**The JSON should contain three top-level keys: "contacts", "appointments", and "tasks".**
If a category has no items, its list should be empty (e.g., "contacts": []).

Here is the required JSON schema for each category:

- **contacts**: List of Contact objects.
  Each Contact object must have:
  - `name` (string, full name)
  - `last_name` (string, last name) - You should infer this from the full name.
  - `email` (string, optional, null if not present)
  - `phone_number` (string, optional, null if not present)

- **appointments**: List of Appointment objects.
  Each Appointment object must have:
  - `title` (string, short, meaningful title in Italian based on the meeting's purpose)
  - `description` (string, summary of the meeting's goal)
  - `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow")
  - `start_time` (string, optional, e.g., "10:30 AM", null if not present)
  - `end_date` (string, YYYY-MM-DD, optional, null if unknown or not applicable)
  - `end_time` (string, optional, e.g., "11:00 AM", null if not present)

- **tasks**: List of Task objects.
  Each Task object must have:
  - `task_title` (string, short summary of action item)
  - `task_description` (string, more detailed explanation)
  - `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")

---

Email:
{{email}}
"""
    prompt_template = PromptTemplate(input_variables=["email"], template=prompt_template_str)
    chain = prompt_template | llm
    llm_output_str = ""
    try:
        llm_output = chain.invoke({"email": email_text})
        llm_output_str = llm_output.content

        json_str = extract_last_json_block(llm_output_str)
        if not json_str:
            raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
        json_data = json.loads(json_str)

        return normalize_llm_output(json_data, current_date, email_text)
    except json.JSONDecodeError as e:
        # JSONDecodeError is a ValueError subclass; keep this branch first.
        raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}") from e
    except ValueError:
        # BUGFIX: these were previously swallowed by the generic handler below
        # and re-raised as plain Exception, turning 400-class errors into 500s.
        raise
    except Exception as e:
        traceback.print_exc()
        raise Exception(f"An error occurred during email processing: {e}") from e
| |
|
def _generate_response_internal(
    email_text: str, api_key: str, language: Literal["Italian", "English"],
    length: str, style: str, tone: str, emoji: str
) -> str:
    """
    Ask the LLM for a reply to `email_text` shaped by the given parameters.

    Returns the reply body as plain text; empty input short-circuits with a
    fixed message instead of raising.
    """
    if not email_text:
        return "Cannot generate reply for empty email text."

    model = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)

    reply_template = """
You are an assistant that helps reply to emails.

Create a response to the following email with the following parameters:
- Language: {language}
- Length: {length}
- Style: {style}
- Tone: {tone}
- Emoji usage: {emoji}

Email:
{email}

Write only the reply body. Do not repeat the email or mention any instruction.
"""
    reply_prompt = PromptTemplate(
        input_variables=["email", "language", "length", "style", "tone", "emoji"],
        template=reply_template,
    )
    payload = {
        "email": email_text,
        "language": language,
        "length": length,
        "style": style,
        "tone": tone,
        "emoji": emoji,
    }
    result = (reply_prompt | model).invoke(payload)
    return result.content.strip()
| |
|
| | |
# Batching knobs: a batch fires when MAX_BATCH_SIZE requests are queued or the
# oldest queued request has waited BATCH_TIMEOUT seconds, whichever comes first.
MAX_BATCH_SIZE = 20
BATCH_TIMEOUT = 0.5

# FIFO of (request, future to resolve, enqueue time from the event-loop clock).
reply_request_queue: List[Tuple[GenerateReplyRequest, asyncio.Future, float]] = []
reply_queue_lock = asyncio.Lock()
# The condition shares the lock above; producers notify it, the batch
# processor waits on it.
reply_queue_condition = asyncio.Condition(lock=reply_queue_lock)
# Long-running consumer task, created in startup_event().
batch_processor_task: Optional[asyncio.Task] = None
| |
|
| |
|
| | |
async def handle_single_reply_request(request_data: GenerateReplyRequest, future: asyncio.Future):
    """Handles a single request: checks cache, calls LLM, stores result, and sets future.

    Resolution order:
      1. Bail out if the caller already gave up (future cancelled on timeout).
      2. Serve from MongoDB when an identical (email + all parameters) reply
         exists.
      3. Otherwise generate via the LLM, persist, and resolve the future with
         the new document's id.
    Any failure is delivered to the awaiting endpoint as an HTTPException set
    on the future, never raised out of this coroutine.
    """
    if future.cancelled():
        return
    try:
        if generated_replies_collection is None:
            # DB unavailable; surface a 503 to this caller only.
            if not future.done():
                future.set_exception(HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database service not available for caching/storage."))
            return

        # The full parameter set is the cache key: any knob change is a miss.
        cache_query = {
            "original_email_text": request_data.email_text,
            "language": request_data.language,
            "length": request_data.length,
            "style": request_data.style,
            "tone": request_data.tone,
            "emoji": request_data.emoji,
        }
        # pymongo is blocking; keep the event loop free via a worker thread.
        cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)

        if cached_reply_doc:
            response = {
                "reply": cached_reply_doc["generated_reply_text"],
                "stored_id": str(cached_reply_doc["_id"]),
                "cached": True
            }
            if not future.done():
                future.set_result(response)
            return

        # Cache miss: run the (blocking) LLM call off the event loop.
        reply_content = await asyncio.to_thread(
            _generate_response_internal,
            request_data.email_text,
            request_data.groq_api_key,
            request_data.language,
            request_data.length,
            request_data.style,
            request_data.tone,
            request_data.emoji
        )

        reply_data_to_store = GeneratedReplyData(
            original_email_text=request_data.email_text,
            generated_reply_text=reply_content,
            language=request_data.language,
            length=request_data.length,
            style=request_data.style,
            tone=request_data.tone,
            emoji=request_data.emoji
        )
        # Exclude the unset id so MongoDB assigns a fresh _id on insert.
        reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})

        insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
        stored_id = str(insert_result.inserted_id)

        final_response = {
            "reply": reply_content,
            "stored_id": stored_id,
            "cached": False
        }
        if not future.done():
            future.set_result(final_response)

    except Exception as e:
        traceback.print_exc()
        if not future.done():
            # Deliver the failure to the awaiting endpoint instead of letting
            # it crash the batch processor.
            future.set_exception(HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to generate reply: {e}"))
| |
|
| |
|
async def process_reply_batches():
    """Continuously processes requests from the reply_request_queue in batches.

    Invariant: the queue is only inspected/mutated while holding
    reply_queue_condition's lock. A batch fires when either MAX_BATCH_SIZE
    requests are queued or the oldest request has waited BATCH_TIMEOUT
    seconds. Runs until cancelled at shutdown.
    """
    global reply_request_queue
    while True:
        batch_to_fire: List[Tuple[GenerateReplyRequest, asyncio.Future]] = []
        async with reply_queue_condition:
            if not reply_request_queue:
                # Sleep until a producer enqueues and notifies.
                await reply_queue_condition.wait()
                # Spurious wakeup: still nothing to do.
                if not reply_request_queue:
                    continue

            now = asyncio.get_event_loop().time()
            # Timestamp of the request that has been waiting the longest.
            if reply_request_queue:
                oldest_item_timestamp = reply_request_queue[0][2]
            else:
                # Defensive: queue drained between checks.
                continue

            # Fire when the batch is full or the oldest request has aged out.
            if len(reply_request_queue) >= MAX_BATCH_SIZE or \
               (now - oldest_item_timestamp >= BATCH_TIMEOUT):
                num_to_take = min(len(reply_request_queue), MAX_BATCH_SIZE)
                for _ in range(num_to_take):
                    # pop(0) keeps FIFO order; the timestamp is no longer needed.
                    if reply_request_queue:
                        req, fut, _ = reply_request_queue.pop(0)
                        batch_to_fire.append((req, fut))
            else:
                # Not full yet: wait out the remainder of the oldest item's
                # BATCH_TIMEOUT window (or until another producer notifies).
                time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
                try:
                    await asyncio.wait_for(reply_queue_condition.wait(), timeout=time_to_wait)
                except asyncio.TimeoutError:
                    pass

        if batch_to_fire:
            tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
            # Requests in a batch are independent; run them concurrently.
            await asyncio.gather(*tasks)
        else:
            # Yield briefly so this loop cannot starve the event loop.
            await asyncio.sleep(0.001)
| |
|
| |
|
| | |
# Application object; Swagger UI is mounted at "/" and ReDoc at "/redoc".
app = FastAPI(
    title="Email Assistant API",
    description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
    version="1.1.0",
    docs_url="/",
    redoc_url="/redoc"
)
| |
|
| | |
| | |
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler_wrapper(request, exc):
    """Handles FastAPI's internal HTTP exceptions by delegating to the
    framework's default handler."""
    return await http_exception_handler(request, exc)
| |
|
| | |
@app.exception_handler(Exception)
async def global_exception_handler_wrapper(request, exc):
    """Handles all unhandled exceptions and returns a consistent JSON error response."""
    print(f"Unhandled exception caught by global handler for request: {request.url}")
    traceback.print_exc()
    # NOTE(review): str(exc) is echoed to the client and may leak internal
    # details; confirm this is acceptable for this deployment.
    return Response(
        content=json.dumps({"detail": f"Internal Server Error: {str(exc)}", "type": "unhandled_exception"}),
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        media_type="application/json"
    )
| |
|
| |
|
| | |
@app.on_event("startup")
async def startup_event():
    """Connect to MongoDB and launch the background batch-processor task.

    On any failure every DB global is reset to None so endpoints report 503
    instead of crashing; the app itself still starts.
    """
    global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
    print("FastAPI app startup sequence initiated.")
    try:
        # Short server-selection timeout so a bad URI fails fast at startup.
        client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        client.admin.command('ping')
        db = client[DB_NAME]
        extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
        generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
        print(f"Successfully connected to MongoDB: {DB_NAME}")

        # Start the reply batch consumer (or restart it if a previous one died).
        if batch_processor_task is None or batch_processor_task.done():
            batch_processor_task = asyncio.create_task(process_reply_batches())
            print("Batch processor task for replies started.")
        else:
            print("Batch processor task for replies is already running or being initialized.")

    except (ConnectionFailure, OperationFailure) as e:
        print(f"ERROR: MongoDB Connection/Operation Failure: {e}")
        client = None
        db = None
        extracted_emails_collection = None
        generated_replies_collection = None
    except Exception as e:
        print(f"ERROR: An unexpected error occurred during MongoDB connection or batch startup: {e}")
        traceback.print_exc()
        client = None
        db = None
        extracted_emails_collection = None
        generated_replies_collection = None
    finally:
        # Belt-and-braces re-ping: demote to "not connected" if the connection
        # dropped between the initial ping and here.
        if client is not None and db is not None:
            try:
                client.admin.command('ping')
            except Exception as e:
                print(f"MongoDB ping failed after initial connection attempt during finally block: {e}")
                client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
        else:
            print("MongoDB client or db object is None after connection attempt in startup. Database likely not connected.")
            # Redundant re-clear kept as-is (globals are already None here).
            if client is None or db is None:
                client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
    print("FastAPI app startup sequence completed for MongoDB client & Batch Processor initialization.")
| |
|
| |
|
@app.on_event("shutdown")
async def shutdown_event():
    """Cancel the batch processor and close the MongoDB client on shutdown."""
    global client, batch_processor_task
    print("FastAPI app shutting down.")
    if batch_processor_task:
        batch_processor_task.cancel()
        try:
            # Await so cancellation is fully processed before the loop closes.
            await batch_processor_task
        except asyncio.CancelledError:
            print("Batch processor task for replies cancelled during shutdown.")
        except Exception as e:
            print(f"Error during batch processor task shutdown: {e}")
            traceback.print_exc()
        batch_processor_task = None

    # Consistency fix: the rest of this module tests `client is not None`
    # explicitly; pymongo objects should not be truth-tested (Database and
    # Collection raise NotImplementedError on bool()).
    if client is not None:
        client.close()
        print("MongoDB client closed.")
| |
|
| |
|
| | |
@app.get("/health", summary="Health Check")
async def health_check():
    """
    Report API health: MongoDB connectivity and batch-processor task state.

    Returns 200 with status details when the database is reachable; otherwise
    raises a 503 carrying the same details.
    """
    db_ok = False
    db_status = "MongoDB not connected."
    if client is not None and db is not None:
        try:
            # Cheap round-trip to prove the connection is alive; pymongo is
            # blocking, so run it in a worker thread.
            await asyncio.to_thread(db.list_collection_names)
        except Exception as e:
            db_status = f"MongoDB connection error: {e}"
        else:
            db_status = "MongoDB connection OK."
            db_ok = True

    if batch_processor_task is None:
        batch_processor_status = "Batch processor task has not been initialized."
    elif not batch_processor_task.done():
        batch_processor_status = "Batch processor is running."
    elif batch_processor_task.exception():
        batch_processor_status = f"Batch processor task ended with exception: {batch_processor_task.exception()}"
    else:
        batch_processor_status = "Batch processor task is done (may have completed or cancelled)."

    if not db_ok:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail={"message": "Service unavailable due to issues.", "database": db_status, "batch_processor": batch_processor_status}
        )
    return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
| |
|
| |
|
@app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
async def extract_email_data(request: ProcessEmailRequest):
    """
    Receives an email, extracts contacts, appointments, and tasks using an LLM,
    and stores the extracted data in MongoDB.

    Raises 503 when the DB is down, 400 for extraction/validation problems,
    500 for anything else.
    """
    if extracted_emails_collection is None:
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for extracted email storage. Check server startup logs.")
    try:
        current_date_val = date.today()
        # The LLM call is blocking; run it off the event loop.
        extracted_data = await asyncio.to_thread(
            _process_email_internal, request.email_text, request.groq_api_key, current_date_val
        )

        # BSON has no plain-date type: widen every date to a midnight datetime
        # so Mongo stores ISODate values (the query endpoint relies on this).
        extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)
        if 'appointments' in extracted_data_dict:
            for appt in extracted_data_dict['appointments']:
                if isinstance(appt.get('start_date'), date):
                    appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
                if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
                    appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
        if 'tasks' in extracted_data_dict:
            for task_item in extracted_data_dict['tasks']:
                if isinstance(task_item.get('due_date'), date):
                    task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())

        result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)

        # Echo the stored document back with its freshly assigned id.
        extracted_data.id = result.inserted_id
        return extracted_data
    except ValueError as e:
        # Extraction-level problems (empty email, bad LLM JSON) are client errors.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal server error during data extraction: {e}")
| |
|
| |
|
@app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
async def extract_email_data_excel(request: ProcessEmailRequest):
    """
    Placeholder for future functionality to extract data and provide as an Excel download.
    Currently disabled — always responds 501 Not Implemented.
    """
    raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Excel functionality is currently disabled.")
| |
|
| |
|
@app.post("/generate-reply", response_model=GenerateReplyResponse, summary="Generate a smart reply to an email (batched & cached)")
async def generate_email_reply(request: GenerateReplyRequest):
    """
    Generates an intelligent email reply based on specified parameters (language, length, style, tone, emoji).
    Uses a batch processing system with caching for efficiency: the request is
    enqueued for process_reply_batches() and this coroutine awaits the future
    that the batch worker resolves.
    """
    if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Reply generation service not fully initialized. Check server logs for database or batch processor issues.")

    future = asyncio.Future()
    current_time = asyncio.get_event_loop().time()

    # Enqueue under the shared lock and wake the batch processor.
    async with reply_queue_condition:
        reply_request_queue.append((request, future, current_time))
        reply_queue_condition.notify()

    try:
        # Worst-case wait: one full batching window plus LLM headroom.
        client_timeout = BATCH_TIMEOUT + 10.0
        result = await asyncio.wait_for(future, timeout=client_timeout)
        return result
    except asyncio.TimeoutError:
        # Cancel so the batch worker skips this request if it hasn't started.
        if not future.done():
            future.cancel()
        raise HTTPException(status_code=status.HTTP_504_GATEWAY_TIMEOUT, detail=f"Request timed out after {client_timeout}s waiting for batch processing. The LLM might be busy or the request queue too long.")
    except Exception as e:
        # The worker delivers HTTPExceptions via the future; pass them through.
        if isinstance(e, HTTPException):
            raise e
        traceback.print_exc()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error processing your reply request: {str(e)}")
| |
|
| |
|
@app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
    """Search stored extractions by contact/appointment/task text and a
    processed_at date range; newest first, capped at query_params.limit."""
    if extracted_emails_collection is None:
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying extracted emails.")
    mongo_query: Dict[str, Any] = {}
    # Case-insensitive substring matches against the embedded arrays.
    if query_params.contact_name:
        mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"}
    if query_params.appointment_title:
        mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
    if query_params.task_title:
        mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}

    if query_params.from_date or query_params.to_date:
        date_query: Dict[str, datetime] = {}
        if query_params.from_date:
            date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
        if query_params.to_date:
            # Exclusive bound at next midnight makes to_date itself inclusive.
            date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
        if date_query :
            mongo_query["processed_at"] = date_query

    try:
        cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
        # Materialize the blocking cursor in a worker thread.
        extracted_docs_raw = await asyncio.to_thread(list, cursor)

        results = []
        for doc_raw in extracted_docs_raw:
            # Mongo stored midnight datetimes (see /extract-data); narrow them
            # back to plain dates for the Pydantic models.
            if 'appointments' in doc_raw:
                for appt in doc_raw['appointments']:
                    if isinstance(appt.get('start_date'), datetime): appt['start_date'] = appt['start_date'].date()
                    if isinstance(appt.get('end_date'), datetime): appt['end_date'] = appt['end_date'].date()
            if 'tasks' in doc_raw:
                for task_item in doc_raw['tasks']:
                    if isinstance(task_item.get('due_date'), datetime): task_item['due_date'] = task_item['due_date'].date()
            results.append(ExtractedData(**doc_raw))
        return results
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying extracted emails: {e}")
| |
|
| |
|
@app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
    """
    Search stored replies by language/style/tone and a generated_at date range,
    newest first, capped at query_params.limit.
    """
    if generated_replies_collection is None:
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying generated replies.")

    # Exact-match filters; empty/None values are skipped.
    filters: Dict[str, Any] = {}
    for field_name in ("language", "style", "tone"):
        value = getattr(query_params, field_name)
        if value:
            filters[field_name] = value

    date_bounds: Dict[str, datetime] = {}
    if query_params.from_date:
        date_bounds["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
    if query_params.to_date:
        # Exclusive upper bound at the following midnight keeps to_date inclusive.
        date_bounds["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
    if date_bounds:
        filters["generated_at"] = date_bounds

    try:
        cursor = generated_replies_collection.find(filters).sort("generated_at", -1).limit(query_params.limit)
        # Materialize the blocking cursor in a worker thread.
        raw_docs = await asyncio.to_thread(list, cursor)
        return [GeneratedReplyData(**doc) for doc in raw_docs]
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying generated replies: {e}")