| |
| |
| |
| import os |
| |
| |
| |
| os.environ["NUMBA_CACHE_DIR"] = "/tmp/numba_cache" |
| os.environ["NUMBA_DISABLE_CACHE"] = "1" |
|
|
| import json |
| import re |
| from datetime import date, datetime, timedelta |
| from typing import List, Optional, Literal, Dict, Any, Tuple |
| import traceback |
| import asyncio |
|
|
| from fastapi import FastAPI, HTTPException, Response, Query, Depends, status |
| from fastapi.responses import FileResponse |
| from fastapi.exception_handlers import http_exception_handler |
| from starlette.exceptions import HTTPException as StarletteHTTPException |
| from langchain.prompts import PromptTemplate |
| from langchain_groq import ChatGroq |
| from pydantic import BaseModel, Field, BeforeValidator, model_serializer |
| from typing_extensions import Annotated |
| from pydantic_core import core_schema |
|
|
| from pymongo import MongoClient |
| from pymongo.errors import ConnectionFailure, OperationFailure |
| from bson import ObjectId |
|
|
| |
| |
| |
# Connection settings.
# SECURITY: a previous revision hard-coded live Atlas credentials below as the
# only source of truth.  Prefer supplying MONGO_URI via the environment; the
# committed credential should be rotated.  The env lookup keeps the old value
# as a fallback so existing deployments keep working.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb+srv://precison9:P1LhtFknkT75yg5L@cluster0.isuwpef.mongodb.net",
)
DB_NAME = "email_assistant_db"
EXTRACTED_EMAILS_COLLECTION = "extracted_emails"
GENERATED_REPLIES_COLLECTION = "generated_replies"
|
|
| |
# Module-level handles populated by the startup event.  They stay None until
# (and unless) the MongoDB connection succeeds; endpoints treat None as
# "database unavailable" and respond 503.
client: Optional[MongoClient] = None
db: Optional[Any] = None
extracted_emails_collection: Optional[Any] = None
generated_replies_collection: Optional[Any] = None
|
|
| |
class CustomObjectId(str):
    """
    Custom Pydantic type for handling MongoDB ObjectIds.
    It validates that the input is a valid ObjectId string and
    ensures it's represented as a string in JSON Schema.
    """
    # NOTE(review): __get_validators__ is the pydantic *v1* hook; the rest of
    # this file uses pydantic v2 APIs (model_serializer, core_schema).  In v2
    # this class is consumed through the Annotated alias below, whose
    # BeforeValidator stringifies values first -- confirm validate() is
    # actually invoked by the installed pydantic version.
    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        # Treat missing/blank ids as "no id" rather than a validation error.
        if v is None or v == "":
            return None

        if not isinstance(v, (str, ObjectId)):
            raise ValueError("ObjectId must be a string or ObjectId instance")

        # A genuine ObjectId instance is trusted as-is; just stringify it.
        if isinstance(v, ObjectId):
            return str(v)

        # Strings must be a valid ObjectId representation per bson's checker.
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId format")
        return cls(v)

    @classmethod
    def __get_pydantic_json_schema__(
        cls, _core_schema: core_schema.CoreSchema, handler
    ) -> Dict[str, Any]:
        # Advertise the field as a plain string in the OpenAPI schema, with a
        # representative example value.
        json_schema = handler(core_schema.str_schema())
        json_schema["example"] = "60c728ef238b9c7b9e0f6c2a"
        return json_schema


# Annotated alias used by the models: BeforeValidator(str) stringifies raw
# ObjectId values before CustomObjectId sees them.
PyObjectId = Annotated[CustomObjectId, BeforeValidator(str)]
|
|
|
|
| |
class Contact(BaseModel):
    """A person mentioned in an email; the full name is split upstream."""
    name: str  # first name (first whitespace-separated token of the full name)
    last_name: str  # remaining tokens of the full name; may be empty
    email: Optional[str] = None
    phone_number: Optional[str] = None
|
|
class Appointment(BaseModel):
    """A meeting extracted from an email.  Only dates are strongly typed;
    times are free-form strings (e.g. "10:30 AM") as produced by the LLM."""
    title: str
    description: str
    start_date: date
    start_time: Optional[str] = None
    end_date: Optional[date] = None
    end_time: Optional[str] = None
|
|
class Task(BaseModel):
    """An action item extracted from an email, with a mandatory due date."""
    task_title: str
    task_description: str
    due_date: date
|
|
class ExtractedData(BaseModel):
    """
    One email-extraction result as stored in MongoDB: the contacts,
    appointments and tasks found in the email, the original text, and a
    processing timestamp.  `id` maps to the Mongo `_id` field via its alias.
    """
    id: Optional[PyObjectId] = Field(alias="_id", default=None)
    contacts: List[Contact]
    appointments: List[Appointment]
    tasks: List[Task]
    original_email_text: str
    # Naive UTC timestamp.  NOTE(review): datetime.utcnow is deprecated since
    # Python 3.12 -- consider datetime.now(timezone.utc) if tz-aware values
    # are compatible with already-stored documents.
    processed_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        # Allow construction by field name as well as by the "_id" alias.
        populate_by_name = True
        arbitrary_types_allowed = True

    @model_serializer(when_used='json')
    def serialize_model(self):
        """JSON-mode serializer: stringify `_id` and ISO-format all dates."""
        # model_dump() runs in python mode, so it does not re-enter this
        # json-only serializer.
        data = self.model_dump(by_alias=True, exclude_none=True)
        # _id may be a raw ObjectId when assigned directly after insertion
        # (see /extract-data); normalize it to a string for JSON output.
        if "_id" in data and isinstance(data["_id"], ObjectId):
            data["_id"] = str(data["_id"])

        # date objects survive python-mode dumping; convert to ISO strings.
        if 'appointments' in data:
            for appt in data['appointments']:
                if isinstance(appt.get('start_date'), date):
                    appt['start_date'] = appt['start_date'].isoformat()
                if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
                    appt['end_date'] = appt['end_date'].isoformat()
        if 'tasks' in data:
            for task_item in data['tasks']:
                if isinstance(task_item.get('due_date'), date):
                    task_item['due_date'] = task_item['due_date'].isoformat()
        return data
|
|
class ProcessEmailRequest(BaseModel):
    """Request body for /extract-data: raw email text plus the caller's Groq key."""
    email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
    groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY")
|
|
class GenerateReplyRequest(BaseModel):
    """
    Request body for /generate-reply.  Only `language` is a closed set;
    length/style/tone/emoji are free strings (the examples are suggestions,
    not validated choices) and are forwarded verbatim to the LLM prompt.
    """
    email_text: str = Field(..., example="Oggetto: Follow-up progetto “Delta”...")
    groq_api_key: str = Field(..., example="YOUR_GROQ_API_KEY")
    language: Literal["Italian", "English"] = Field("Italian", examples=["Italian", "English"])
    length: str = Field("Auto", examples=["Short", "Medium", "Long", "Auto"])
    style: str = Field("Professional", examples=["Professional", "Casual", "Formal", "Informal"])
    tone: str = Field("Friendly", examples=["Friendly", "Neutral", "Urgent", "Empathetic"])
    emoji: str = Field("Auto", examples=["Auto", "None", "Occasional", "Frequent"])
|
|
class GeneratedReplyData(BaseModel):
    """
    A stored LLM reply plus the parameters that produced it.  The parameter
    fields double as the cache key: an identical (email, language, length,
    style, tone, emoji) combination is served from MongoDB instead of the LLM.
    """
    id: Optional[PyObjectId] = Field(alias="_id", default=None)
    original_email_text: str
    generated_reply_text: str
    language: str
    length: str
    style: str
    tone: str
    emoji: str
    # Naive UTC timestamp; see the NOTE on ExtractedData.processed_at.
    generated_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        # Allow construction by field name as well as by the "_id" alias.
        populate_by_name = True
        arbitrary_types_allowed = True

    @model_serializer(when_used='json')
    def serialize_model(self):
        """JSON-mode serializer: stringify a raw ObjectId `_id` if present."""
        data = self.model_dump(by_alias=True, exclude_none=True)
        if "_id" in data and isinstance(data["_id"], ObjectId):
            data["_id"] = str(data["_id"])
        return data
|
|
| |
class GenerateReplyResponse(BaseModel):
    """Response payload for /generate-reply."""
    reply: str = Field(..., description="The AI-generated reply text.")
    stored_id: str = Field(..., description="The MongoDB ID of the stored reply.")
    cached: bool = Field(..., description="True if the reply was retrieved from cache, False if newly generated.")
|
|
| |
class ExtractedEmailQuery(BaseModel):
    """
    Query-string parameters for /query-extracted-emails (bound via Depends()).
    The Query(...) defaults supply OpenAPI descriptions and numeric bounds.
    """
    contact_name: Optional[str] = Query(None, description="Filter by contact name (case-insensitive partial match).")
    appointment_title: Optional[str] = Query(None, description="Filter by appointment title (case-insensitive partial match).")
    task_title: Optional[str] = Query(None, description="Filter by task title (case-insensitive partial match).")
    from_date: Optional[date] = Query(None, description="Filter by data processed on or after this date (YYYY-MM-DD).")
    to_date: Optional[date] = Query(None, description="Filter by data processed on or before this date (YYYY-MM-DD).")
    limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
|
|
class GeneratedReplyQuery(BaseModel):
    """
    Query-string parameters for querying stored replies (bound via Depends()).
    """
    language: Optional[Literal["Italian", "English"]] = Query(None, description="Filter by reply language.")
    style: Optional[str] = Query(None, description="Filter by reply style (e.g., Professional, Casual).")
    tone: Optional[str] = Query(None, description="Filter by reply tone (e.g., Friendly, Neutral).")
    from_date: Optional[date] = Query(None, description="Filter by data generated on or after this date (YYYY-MM-DD).")
    to_date: Optional[date] = Query(None, description="Filter by data generated on or before this date (YYYY-MM-DD).")
    limit: int = Query(10, ge=1, le=100, description="Maximum number of results to return.")
|
|
| |
def extract_last_json_block(text: str) -> Optional[str]:
    """Pull the last fenced ```json``` payload out of *text*.

    Falls back to the widest brace-delimited span when no fenced block
    exists; returns None when neither form is present.
    """
    fenced = re.findall(r'```json\s*(.*?)\s*```', text, re.DOTALL)
    if fenced:
        return fenced[-1].strip()
    # No fenced block: greedily grab everything from the first '{' to the
    # last '}' as a best-effort bare JSON object.
    bare = re.search(r'\{.*\}', text, re.DOTALL)
    return bare.group(0) if bare else None
|
|
def parse_date(date_str: Optional[str], current_date: date) -> Optional[date]:
    """Resolve a raw date string to a concrete date.

    Recognizes the keywords "today" and "tomorrow" (case-insensitive,
    surrounding whitespace ignored) relative to *current_date*, plus
    YYYY-MM-DD strings.  Anything else -- including None or empty input --
    yields None.
    """
    if not date_str:
        return None
    normalized = date_str.lower().strip()
    relative = {
        "today": current_date,
        "tomorrow": current_date + timedelta(days=1),
    }
    if normalized in relative:
        return relative[normalized]
    try:
        parsed = datetime.strptime(normalized, "%Y-%m-%d")
    except ValueError:
        # Unparseable formats are deliberately swallowed: callers substitute
        # their own default date.
        return None
    return parsed.date()
|
|
def normalize_llm_output(data: dict, current_date: date, original_email_text: str) -> ExtractedData:
    """Coerce raw LLM JSON into a validated ExtractedData instance.

    Missing titles/descriptions receive placeholder text, date strings are
    resolved via parse_date (with required dates defaulting to
    *current_date*), and each contact's full name is split into first/last
    parts.
    """
    def split_name(full_name: str) -> tuple[str, str]:
        pieces = full_name.strip().split()
        if not pieces:
            return "", ""
        return pieces[0], " ".join(pieces[1:])

    contacts = []
    for raw in data.get("contacts", []):
        first, rest = split_name(raw.get("name", ""))
        contacts.append(Contact(
            name=first,
            last_name=rest,
            email=raw.get("email"),
            phone_number=raw.get("phone_number"),
        ))

    appointments = []
    for raw in data.get("appointments", []):
        appointments.append(Appointment(
            title=raw.get("title", "Untitled"),
            description=raw.get("description", "No description"),
            # start_date is mandatory -> fall back to today; end_date may
            # legitimately stay None.
            start_date=parse_date(raw.get("start_date"), current_date) or current_date,
            start_time=raw.get("start_time"),
            end_date=parse_date(raw.get("end_date"), current_date),
            end_time=raw.get("end_time"),
        ))

    tasks = [
        Task(
            task_title=raw.get("task_title", "Untitled"),
            task_description=raw.get("task_description", "No description"),
            due_date=parse_date(raw.get("due_date"), current_date) or current_date,
        )
        for raw in data.get("tasks", [])
    ]

    return ExtractedData(
        contacts=contacts,
        appointments=appointments,
        tasks=tasks,
        original_email_text=original_email_text,
    )
|
|
| |
def _process_email_internal(email_text: str, api_key: str, current_date: date) -> ExtractedData:
    """
    Process an email with the Groq LLM and return structured ExtractedData.

    Args:
        email_text: Raw email text to analyze.
        api_key: Groq API key used to authenticate the LLM call.
        current_date: Reference date for resolving "today"/"tomorrow".

    Raises:
        ValueError: Empty input, no JSON block in the LLM output, or
            unparseable JSON (the endpoint maps this to HTTP 400).
        Exception: Any other failure during the LLM round-trip.
    """
    if not email_text:
        raise ValueError("Email text cannot be empty for processing.")

    llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, max_tokens=2000, groq_api_key=api_key)

    prompt_today_str = current_date.isoformat()
    prompt_tomorrow_str = (current_date + timedelta(days=1)).isoformat()

    # This is an f-string: the today/tomorrow dates are baked into the
    # template text right here; only {{email}} survives as a LangChain
    # template variable ({email} after f-string escaping).
    prompt_template_str = f"""
You are an expert email assistant tasked with extracting structured information from an Italian email.

**Your response MUST be a single, complete JSON object, wrapped in a ```json``` block.**
**DO NOT include any conversational text, explanations, or preambles outside the JSON block.**
**The JSON should contain three top-level keys: "contacts", "appointments", and "tasks".**
If a category has no items, its list should be empty (e.g., "contacts": []).

Here is the required JSON schema for each category:

- **contacts**: List of Contact objects.
Each Contact object must have:
- `name` (string, full name)
- `last_name` (string, last name) - You should infer this from the full name.
- `email` (string, optional, null if not present)
- `phone_number` (string, optional, null if not present)

- **appointments**: List of Appointment objects.
Each Appointment object must have:
- `title` (string, short, meaningful title in Italian based on the meeting's purpose)
- `description` (string, summary of the meeting's goal)
- `start_date` (string, YYYY-MM-DD. If not explicitly mentioned, use "{prompt_today_str}" for "today", or "{prompt_tomorrow_str}" for "tomorrow")
- `start_time` (string, optional, e.g., "10:30 AM", null if not present)
- `end_date` (string, YYYY-MM-DD, optional, null if unknown or not applicable)
- `end_time` (string, optional, e.g., "11:00 AM", null if not present)

- **tasks**: List of Task objects.
Each Task object must have:
- `task_title` (string, short summary of action item)
- `task_description` (string, more detailed explanation)
- `due_date` (string, YYYY-MM-DD. Infer from context, e.g., "entro domani" becomes "{prompt_tomorrow_str}", "today" becomes "{prompt_today_str}")

---

Email:
{{email}}
"""
    # FIX: only "email" is a real template variable -- the date strings were
    # already interpolated by the f-string above; declaring them previously
    # was misleading and passed dead inputs through the chain.
    prompt_template = PromptTemplate(input_variables=["email"], template=prompt_template_str)
    chain = prompt_template | llm
    llm_output_str = ""
    try:
        llm_output = chain.invoke({"email": email_text})
        llm_output_str = llm_output.content

        json_str = extract_last_json_block(llm_output_str)
        if not json_str:
            raise ValueError(f"No JSON block found in LLM output. LLM response: {llm_output_str}")
        json_data = json.loads(json_str)

        return normalize_llm_output(json_data, current_date, email_text)
    except json.JSONDecodeError as e:
        raise ValueError(f"Failed to parse JSON from LLM output: {e}\nLLM response was:\n{llm_output_str}") from e
    except ValueError:
        # FIX: let validation errors propagate unchanged.  Previously the
        # generic handler below re-wrapped them in a bare Exception, so the
        # endpoint returned 500 instead of the intended 400.
        raise
    except Exception as e:
        traceback.print_exc()
        raise Exception(f"An error occurred during email processing: {e}") from e
|
|
def _generate_response_internal(
    email_text: str, api_key: str, language: Literal["Italian", "English"],
    length: str, style: str, tone: str, emoji: str
) -> str:
    """
    Synchronously ask the Groq LLM for a reply honoring the given knobs.

    Returns the reply body as stripped plain text; empty input short-circuits
    with a fixed message, and any LLM failure is logged and re-raised.
    """
    print(f"[{datetime.now()}] _generate_response_internal: Starting LLM call. API Key starts with: {api_key[:5]}...")
    if not email_text:
        print(f"[{datetime.now()}] _generate_response_internal: Email text is empty.")
        return "Cannot generate reply for empty email text."

    try:
        model = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.7, max_tokens=800, groq_api_key=api_key)
        template_body = """
You are an assistant that helps reply to emails.

Create a response to the following email with the following parameters:
- Language: {language}
- Length: {length}
- Style: {style}
- Tone: {tone}
- Emoji usage: {emoji}

Email:
{email}

Write only the reply body. Do not repeat the email or mention any instruction.
"""
        reply_prompt = PromptTemplate(
            template=template_body,
            input_variables=["email", "language", "length", "style", "tone", "emoji"],
        )
        pipeline = reply_prompt | model
        print(f"[{datetime.now()}] _generate_response_internal: Invoking LLM chain...")
        result = pipeline.invoke({
            "email": email_text,
            "language": language,
            "length": length,
            "style": style,
            "tone": tone,
            "emoji": emoji,
        })
        print(f"[{datetime.now()}] _generate_response_internal: LLM chain returned. Content length: {len(result.content)}.")
        return result.content.strip()
    except Exception as e:
        print(f"[{datetime.now()}] _generate_response_internal: ERROR during LLM invocation: {e}")
        traceback.print_exc()
        raise
|
|
| |
# Batching configuration: a batch is flushed once it reaches MAX_BATCH_SIZE
# requests or once the oldest queued request is BATCH_TIMEOUT seconds old.
MAX_BATCH_SIZE = 20
BATCH_TIMEOUT = 0.5

# FIFO of (request, future-to-resolve, enqueue timestamp) shared between the
# /generate-reply endpoint (producer) and process_reply_batches (consumer).
reply_request_queue: List[Tuple[GenerateReplyRequest, asyncio.Future, float]] = []
reply_queue_lock = asyncio.Lock()
# Condition built on the lock above.  NOTE(review): created at import time --
# on older Python versions asyncio primitives bind the then-current event
# loop; confirm this matches the loop FastAPI actually runs on.
reply_queue_condition = asyncio.Condition(lock=reply_queue_lock)
# Handle to the background consumer task; started in the startup event.
batch_processor_task: Optional[asyncio.Task] = None
|
|
|
|
| |
async def handle_single_reply_request(request_data: GenerateReplyRequest, future: asyncio.Future):
    """
    Resolve one queued reply request end to end.

    Flow: abort if the waiting client already gave up (future cancelled) ->
    look for an identical cached reply in MongoDB (keyed on email text plus
    every style knob) -> on a miss, call the LLM in a worker thread, store
    the new reply, and resolve the future with {reply, stored_id, cached}.
    Failures are surfaced to the waiting endpoint by setting an
    HTTPException on the future instead of raising.
    """
    print(f"[{datetime.now()}] Handle single reply: Starting for email_text_start='{request_data.email_text[:50]}'...")
    if future.cancelled():
        print(f"[{datetime.now()}] Handle single reply: Future cancelled. Aborting.")
        return
    try:
        # The DB globals may be None if startup failed; report 503 upstream.
        if generated_replies_collection is None:
            print(f"[{datetime.now()}] Handle single reply: DB collection 'generated_replies_collection' is None.")
            if not future.done():
                future.set_exception(HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Database service not available for caching/storage."))
            return

        # Cache key: exact email text plus every generation parameter.
        cache_query = {
            "original_email_text": request_data.email_text,
            "language": request_data.language,
            "length": request_data.length,
            "style": request_data.style,
            "tone": request_data.tone,
            "emoji": request_data.emoji,
        }
        print(f"[{datetime.now()}] Handle single reply: Checking cache for reply...")
        # pymongo is blocking; run it in a worker thread to keep the loop free.
        cached_reply_doc = await asyncio.to_thread(generated_replies_collection.find_one, cache_query)

        if cached_reply_doc:
            print(f"[{datetime.now()}] Handle single reply: Reply found in cache. ID: {str(cached_reply_doc['_id'])}")
            response = {
                "reply": cached_reply_doc["generated_reply_text"],
                "stored_id": str(cached_reply_doc["_id"]),
                "cached": True
            }
            if not future.done():
                future.set_result(response)
                print(f"[{datetime.now()}] Handle single reply: Cache result set on future.")
            return

        # Cache miss: run the (blocking) LLM call off the event loop.
        print(f"[{datetime.now()}] Handle single reply: Reply not in cache. Calling LLM...")
        reply_content = await asyncio.to_thread(
            _generate_response_internal,
            request_data.email_text,
            request_data.groq_api_key,
            request_data.language,
            request_data.length,
            request_data.style,
            request_data.tone,
            request_data.emoji
        )
        print(f"[{datetime.now()}] Handle single reply: LLM call completed. Reply length: {len(reply_content)}.")

        reply_data_to_store = GeneratedReplyData(
            original_email_text=request_data.email_text,
            generated_reply_text=reply_content,
            language=request_data.language,
            length=request_data.length,
            style=request_data.style,
            tone=request_data.tone,
            emoji=request_data.emoji
        )
        print(f"[{datetime.now()}] Handle single reply: Storing reply in DB...")
        # Drop the (None) id so MongoDB assigns a fresh _id on insert.
        reply_data_dict = reply_data_to_store.model_dump(by_alias=True, exclude_none=True, exclude={'id'})

        insert_result = await asyncio.to_thread(generated_replies_collection.insert_one, reply_data_dict)
        stored_id = str(insert_result.inserted_id)
        print(f"[{datetime.now()}] Handle single reply: Reply stored in DB. ID: {stored_id}")

        final_response = {
            "reply": reply_content,
            "stored_id": stored_id,
            "cached": False
        }
        if not future.done():
            future.set_result(final_response)
            print(f"[{datetime.now()}] Handle single reply: Final result set on future.")

    except Exception as e:
        print(f"[{datetime.now()}] Handle single reply: EXCEPTION: {e}")
        traceback.print_exc()
        if not future.done():
            # Deliver the failure to the waiting endpoint as a 500.
            future.set_exception(HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to generate reply: {e}"))
            print(f"[{datetime.now()}] Handle single reply: Exception set on future.")
|
|
|
|
async def process_reply_batches():
    """
    Background consumer: drain reply_request_queue in batches.

    Runs forever.  Under the queue condition it waits for work, then fires a
    batch when either MAX_BATCH_SIZE requests are queued or the oldest
    request has aged past BATCH_TIMEOUT; otherwise it sleeps until the
    timeout would elapse.  Batches are processed concurrently via
    handle_single_reply_request and gathered outside the lock.
    """
    global reply_request_queue
    print(f"[{datetime.now()}] Batch processor task started.")
    while True:
        batch_to_fire: List[Tuple[GenerateReplyRequest, asyncio.Future]] = []
        async with reply_queue_condition:
            if not reply_request_queue:
                print(f"[{datetime.now()}] Batch processor: Queue empty, waiting for requests...")
                # Releases the lock while waiting; re-acquires on wakeup.
                await reply_queue_condition.wait()
                # Guard against spurious wakeups / stolen work.
                if not reply_request_queue:
                    print(f"[{datetime.now()}] Batch processor: Woke up, queue still empty. Continuing loop.")
                    continue

            now = asyncio.get_event_loop().time()
            if reply_request_queue:
                oldest_item_timestamp = reply_request_queue[0][2]
            else:
                # Defensive: another coroutine emptied the queue between checks.
                print(f"[{datetime.now()}] Batch processor: Queue became empty before processing. Restarting loop.")
                continue

            print(f"[{datetime.now()}] Batch processor: Woke up. Queue size: {len(reply_request_queue)}. Oldest item age: {now - oldest_item_timestamp:.2f}s")

            # Flush when the batch is full OR the oldest request is stale.
            if len(reply_request_queue) >= MAX_BATCH_SIZE or \
               (now - oldest_item_timestamp >= BATCH_TIMEOUT):
                num_to_take = min(len(reply_request_queue), MAX_BATCH_SIZE)
                for _ in range(num_to_take):
                    # NOTE(review): list.pop(0) is O(n); a deque would be
                    # cheaper if queue depth ever grows large.
                    if reply_request_queue:
                        req, fut, _ = reply_request_queue.pop(0)
                        batch_to_fire.append((req, fut))
                print(f"[{datetime.now()}] Batch processor: Firing batch of {len(batch_to_fire)} requests.")
            else:
                # Not enough work yet: sleep only until the oldest request
                # would hit BATCH_TIMEOUT, or until a new request arrives.
                time_to_wait = BATCH_TIMEOUT - (now - oldest_item_timestamp)
                print(f"[{datetime.now()}] Batch processor: Not enough requests or timeout not reached. Waiting for {time_to_wait:.2f}s.")
                try:
                    await asyncio.wait_for(reply_queue_condition.wait(), timeout=time_to_wait)
                except asyncio.TimeoutError:
                    print(f"[{datetime.now()}] Batch processor: wait timed out.")
                    pass

        # Process outside the lock so new requests can keep queueing.
        if batch_to_fire:
            tasks = [handle_single_reply_request(req_data, fut) for req_data, fut in batch_to_fire]
            print(f"[{datetime.now()}] Batch processor: Awaiting completion of {len(tasks)} single reply tasks.")
            await asyncio.gather(*tasks)
            print(f"[{datetime.now()}] Batch processor: Batch processing complete.")
        else:
            # Yield briefly to avoid a hot spin when nothing was flushed.
            await asyncio.sleep(0.001)
|
|
|
|
| |
# FastAPI application instance; interactive Swagger docs are served at the
# site root ("/") and ReDoc at "/redoc".
app = FastAPI(
    title="Email Assistant API",
    description="API for extracting structured data from emails and generating intelligent replies using Groq LLMs, with MongoDB integration, dynamic date handling, batching, and caching.",
    version="1.1.0",
    docs_url="/",
    redoc_url="/redoc"
)
|
|
| |
| |
@app.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler_wrapper(request, exc):
    """Log framework HTTP errors, then defer to FastAPI's default handler."""
    print(f"[{datetime.now()}] Caught StarletteHTTPException: {exc.status_code} - {exc.detail}")
    response = await http_exception_handler(request, exc)
    return response
|
|
| |
@app.exception_handler(Exception)
async def global_exception_handler_wrapper(request, exc):
    """Last-resort handler: log the traceback, emit a uniform 500 JSON body."""
    print(f"[{datetime.now()}] Unhandled exception caught by global handler for request: {request.url}")
    traceback.print_exc()

    payload = {"detail": f"Internal Server Error: {str(exc)}", "type": "unhandled_exception"}
    return Response(
        content=json.dumps(payload),
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        media_type="application/json",
    )
|
|
|
|
| |
@app.on_event("startup")
async def startup_event():
    """
    Connect to MongoDB and launch the background batch-processor task.

    On any connection failure every DB global is reset to None so endpoints
    can respond 503 instead of crashing on a dead handle.
    """
    global client, db, extracted_emails_collection, generated_replies_collection, batch_processor_task
    print(f"[{datetime.now()}] FastAPI app startup sequence initiated.")
    try:
        # Fail fast (5s server selection) rather than hanging startup.
        client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
        client.admin.command('ping')
        db = client[DB_NAME]
        extracted_emails_collection = db[EXTRACTED_EMAILS_COLLECTION]
        generated_replies_collection = db[GENERATED_REPLIES_COLLECTION]
        print(f"[{datetime.now()}] Successfully connected to MongoDB: {DB_NAME}")

        # Start (or restart) the single consumer that drains the reply queue.
        if batch_processor_task is None or batch_processor_task.done():
            batch_processor_task = asyncio.create_task(process_reply_batches())
            print(f"[{datetime.now()}] Batch processor task for replies started.")
        else:
            print(f"[{datetime.now()}] Batch processor task for replies is already running or being initialized.")

    except (ConnectionFailure, OperationFailure) as e:
        print(f"[{datetime.now()}] ERROR: MongoDB Connection/Operation Failure: {e}")
        client = None
        db = None
        extracted_emails_collection = None
        generated_replies_collection = None
    except Exception as e:
        print(f"[{datetime.now()}] ERROR: An unexpected error occurred during MongoDB connection or batch startup: {e}")
        traceback.print_exc()
        client = None
        db = None
        extracted_emails_collection = None
        generated_replies_collection = None
    finally:
        # Belt-and-braces: re-ping after the attempt; any failure here also
        # clears the DB globals so later checks see a consistent None state.
        if client is not None and db is not None:
            try:
                client.admin.command('ping')
            except Exception as e:
                print(f"[{datetime.now()}] MongoDB ping failed after initial connection attempt during finally block: {e}")
                client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
        else:
            print(f"[{datetime.now()}] MongoDB client or db object is None after connection attempt in startup. Database likely not connected.")
            if client is None or db is None:
                client = None; db = None; extracted_emails_collection = None; generated_replies_collection = None
    print(f"[{datetime.now()}] FastAPI app startup sequence completed for MongoDB client & Batch Processor initialization.")
|
|
|
|
@app.on_event("shutdown")
async def shutdown_event():
    """Cancel the batch-processor task and close the MongoDB client."""
    global client, batch_processor_task
    print(f"[{datetime.now()}] FastAPI app shutting down.")
    if batch_processor_task:
        batch_processor_task.cancel()
        try:
            # Await so the CancelledError is consumed and the task can finish
            # any cleanup before we drop the reference.
            await batch_processor_task
            print(f"[{datetime.now()}] Batch processor task awaited.")
        except asyncio.CancelledError:
            print(f"[{datetime.now()}] Batch processor task for replies cancelled during shutdown.")
        except Exception as e:
            print(f"[{datetime.now()}] Error during batch processor task shutdown: {e}")
            traceback.print_exc()
        batch_processor_task = None

    # FIX: explicit None check instead of truth-testing the MongoClient --
    # consistent with every other `client is not None` check in this file.
    if client is not None:
        client.close()
        print(f"[{datetime.now()}] MongoDB client closed.")
|
|
|
|
| |
@app.get("/health", summary="Health Check")
async def health_check():
    """
    Report API health: MongoDB reachability plus batch-processor task state.

    Responds 200 with a status payload when the database answers a
    list-collections probe; raises 503 (with the same diagnostics) otherwise.
    """
    db_ok = False
    db_status = "MongoDB not connected."
    if client is not None and db is not None:
        try:
            # Cheap round-trip to prove the connection is alive.
            await asyncio.to_thread(db.list_collection_names)
        except Exception as e:
            db_status = f"MongoDB connection error: {e}"
        else:
            db_status = "MongoDB connection OK."
            db_ok = True

    if batch_processor_task is None:
        batch_processor_status = "Batch processor task has not been initialized."
    elif not batch_processor_task.done():
        batch_processor_status = "Batch processor is running."
    elif batch_processor_task.exception():
        batch_processor_status = f"Batch processor task ended with exception: {batch_processor_task.exception()}"
    else:
        batch_processor_status = "Batch processor task is done (may have completed or cancelled)."

    if not db_ok:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail={"message": "Service unavailable due to issues.", "database": db_status, "batch_processor": batch_processor_status}
        )
    return {"status": "ok", "message": "Email Assistant API is up.", "database": db_status, "batch_processor": batch_processor_status}
|
|
|
|
@app.post("/extract-data", response_model=ExtractedData, summary="Extract structured data from an email and store in MongoDB")
async def extract_email_data(request: ProcessEmailRequest):
    """
    Receives an email, extracts contacts, appointments, and tasks using an LLM,
    and stores the extracted data in MongoDB.

    Raises 503 when the DB is unavailable, 400 for extraction/validation
    errors (ValueError from the internal pipeline), 500 otherwise.
    """
    print(f"[{datetime.now()}] /extract-data: Received request.")
    if extracted_emails_collection is None:
        print(f"[{datetime.now()}] /extract-data: MongoDB collection is None.")
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for extracted email storage. Check server startup logs.")
    try:
        current_date_val = date.today()
        print(f"[{datetime.now()}] /extract-data: Calling internal processing function.")
        # The LLM pipeline is blocking; run it in a worker thread.
        extracted_data = await asyncio.to_thread(
            _process_email_internal, request.email_text, request.groq_api_key, current_date_val
        )
        print(f"[{datetime.now()}] /extract-data: Internal processing complete. Preparing for DB insert.")

        extracted_data_dict = extracted_data.model_dump(by_alias=True, exclude_none=True)

        # Promote bare `date` values to midnight `datetime`s before insertion
        # (BSON encodes datetimes, not date-only values).
        if 'appointments' in extracted_data_dict:
            for appt in extracted_data_dict['appointments']:
                if isinstance(appt.get('start_date'), date):
                    appt['start_date'] = datetime.combine(appt['start_date'], datetime.min.time())
                if isinstance(appt.get('end_date'), date) and appt.get('end_date') is not None:
                    appt['end_date'] = datetime.combine(appt['end_date'], datetime.min.time())
        if 'tasks' in extracted_data_dict:
            for task_item in extracted_data_dict['tasks']:
                if isinstance(task_item.get('due_date'), date):
                    task_item['due_date'] = datetime.combine(task_item['due_date'], datetime.min.time())

        print(f"[{datetime.now()}] /extract-data: Inserting into MongoDB...")
        result = await asyncio.to_thread(extracted_emails_collection.insert_one, extracted_data_dict)
        print(f"[{datetime.now()}] /extract-data: Data inserted into MongoDB. ID: {result.inserted_id}")

        # Raw ObjectId assigned directly (no validate-on-assignment here);
        # the model's JSON serializer stringifies it for the response.
        extracted_data.id = result.inserted_id
        return extracted_data
    except ValueError as e:
        print(f"[{datetime.now()}] /extract-data: ValueError: {e}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        print(f"[{datetime.now()}] /extract-data: Unhandled Exception: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal server error during data extraction: {e}")
|
|
|
|
@app.post("/extract-data-excel", summary="Extract structured data and download as Excel (also stores in MongoDB)")
async def extract_email_data_excel(request: ProcessEmailRequest):
    """
    Reserved endpoint for exporting extracted data as an Excel download.

    The feature is not implemented yet, so every call responds 501.
    """
    raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Excel functionality is currently disabled.")
|
|
|
|
@app.post("/generate-reply", response_model=GenerateReplyResponse, summary="Generate a smart reply to an email (batched & cached)")
async def generate_email_reply(request: GenerateReplyRequest):
    """
    Generates an intelligent email reply based on specified parameters
    (language, length, style, tone, emoji).

    The request is enqueued for the background batch processor together with
    a Future; this handler then awaits that Future (with a generous timeout)
    and returns whatever handle_single_reply_request resolved it with --
    either a cached or freshly generated reply.
    """
    print(f"[{datetime.now()}] /generate-reply: Received request.")
    # reply_queue_condition is created at import time and can't be None here;
    # the check is kept as cheap defensive symmetry with the other globals.
    if generated_replies_collection is None or batch_processor_task is None or reply_queue_condition is None:
        print(f"[{datetime.now()}] /generate-reply: Service not initialized. gen_replies_coll={generated_replies_collection is not None}, batch_task={batch_processor_task is not None}, queue_cond={reply_queue_condition is not None}")
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Reply generation service not fully initialized. Check server logs for database or batch processor issues.")

    # Future resolved by the batch processor; timestamp drives batch aging.
    future = asyncio.Future()
    current_time = asyncio.get_event_loop().time()

    async with reply_queue_condition:
        reply_request_queue.append((request, future, current_time))
        reply_queue_condition.notify()
        print(f"[{datetime.now()}] /generate-reply: Request added to queue, notifying batch processor. Queue size: {len(reply_request_queue)}")

    try:
        # Batch delay plus a generous allowance for the LLM round-trip.
        client_timeout = BATCH_TIMEOUT + 60.0
        print(f"[{datetime.now()}] /generate-reply: Waiting for future result with timeout {client_timeout}s.")
        result = await asyncio.wait_for(future, timeout=client_timeout)
        print(f"[{datetime.now()}] /generate-reply: Future result received. Returning data.")
        return result
    except asyncio.TimeoutError:
        print(f"[{datetime.now()}] /generate-reply: Client timeout waiting for future after {client_timeout}s. Future done: {future.done()}")
        # Cancel so the batch processor skips the abandoned request.
        if not future.done():
            future.cancel()
        raise HTTPException(status_code=status.HTTP_504_GATEWAY_TIMEOUT, detail=f"Request timed out after {client_timeout}s waiting for batch processing. The LLM might be busy or the request queue too long. Check server logs for more details.")
    except Exception as e:
        # HTTPExceptions set on the future by the worker pass through intact.
        if isinstance(e, HTTPException):
            print(f"[{datetime.now()}] /generate-reply: Caught HTTPException: {e.status_code} - {e.detail}")
            raise e
        print(f"[{datetime.now()}] /generate-reply: Unhandled Exception: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error processing your reply request: {str(e)}. Check server logs for more details.")
|
|
|
|
@app.get("/query-extracted-emails", response_model=List[ExtractedData], summary="Query extracted emails from MongoDB")
async def query_extracted_emails_endpoint(query_params: ExtractedEmailQuery = Depends()):
    """
    Query extracted emails with optional case-insensitive regex filters
    (contact name, appointment title, task title) and an inclusive
    processed_at date range, returned newest first.

    Raises:
        HTTPException 503: if MongoDB is not available.
        HTTPException 500: on any query or validation failure.
    """
    print(f"[{datetime.now()}] /query-extracted-emails: Received request with params: {query_params.model_dump_json()}")
    if extracted_emails_collection is None:
        print(f"[{datetime.now()}] /query-extracted-emails: MongoDB collection is None.")
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying extracted emails.")

    mongo_query: Dict[str, Any] = {}
    # Case-insensitive substring matches on the nested array fields.
    if query_params.contact_name:
        mongo_query["contacts.name"] = {"$regex": query_params.contact_name, "$options": "i"}
    if query_params.appointment_title:
        mongo_query["appointments.title"] = {"$regex": query_params.appointment_title, "$options": "i"}
    if query_params.task_title:
        mongo_query["tasks.task_title"] = {"$regex": query_params.task_title, "$options": "i"}

    if query_params.from_date or query_params.to_date:
        date_query: Dict[str, datetime] = {}
        if query_params.from_date:
            date_query["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
        if query_params.to_date:
            # $lt on the next midnight so the whole to_date day is included.
            date_query["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
        if date_query:
            mongo_query["processed_at"] = date_query
    print(f"[{datetime.now()}] /query-extracted-emails: MongoDB query built: {mongo_query}")

    try:
        cursor = extracted_emails_collection.find(mongo_query).sort("processed_at", -1).limit(query_params.limit)
        # PyMongo is synchronous; materialize the cursor off the event loop.
        extracted_docs_raw = await asyncio.to_thread(list, cursor)
        print(f"[{datetime.now()}] /query-extracted-emails: Found {len(extracted_docs_raw)} documents.")

        results = []
        for doc_raw in extracted_docs_raw:
            # Mongo stores dates as datetimes; coerce back to date objects for
            # the response model. `get(...) or []` also guards against a field
            # that is present but stored as null (the old `in` check raised
            # TypeError when iterating a None value).
            for appt in doc_raw.get('appointments') or []:
                if isinstance(appt.get('start_date'), datetime):
                    appt['start_date'] = appt['start_date'].date()
                if isinstance(appt.get('end_date'), datetime):
                    appt['end_date'] = appt['end_date'].date()
            for task_item in doc_raw.get('tasks') or []:
                if isinstance(task_item.get('due_date'), datetime):
                    task_item['due_date'] = task_item['due_date'].date()
            results.append(ExtractedData(**doc_raw))
        print(f"[{datetime.now()}] /query-extracted-emails: Returning {len(results)} results.")
        return results
    except Exception as e:
        print(f"[{datetime.now()}] /query-extracted-emails: Unhandled Exception during query: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying extracted emails: {e}")
|
|
|
|
@app.get("/query-generated-replies", response_model=List[GeneratedReplyData], summary="Query generated replies from MongoDB")
async def query_generated_replies_endpoint(query_params: GeneratedReplyQuery = Depends()):
    """
    Query stored generated replies, filtered by exact language/style/tone
    values and an inclusive generated_at date range, returned newest first.

    Raises:
        HTTPException 503: if MongoDB is not available.
        HTTPException 500: on any query or validation failure.
    """
    print(f"[{datetime.now()}] /query-generated-replies: Received request with params: {query_params.model_dump_json()}")
    if generated_replies_collection is None:
        print(f"[{datetime.now()}] /query-generated-replies: MongoDB collection is None.")
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="MongoDB not available for querying generated replies.")

    # Exact-match filters: only include fields the caller actually supplied.
    mongo_query: Dict[str, Any] = {}
    for field_name in ("language", "style", "tone"):
        field_value = getattr(query_params, field_name)
        if field_value:
            mongo_query[field_name] = field_value

    # Half-open window [from_date 00:00, to_date + 1 day 00:00) so the
    # to_date day is fully included.
    if query_params.from_date or query_params.to_date:
        window: Dict[str, datetime] = {}
        if query_params.from_date:
            window["$gte"] = datetime.combine(query_params.from_date, datetime.min.time())
        if query_params.to_date:
            window["$lt"] = datetime.combine(query_params.to_date + timedelta(days=1), datetime.min.time())
        if window:
            mongo_query["generated_at"] = window
    print(f"[{datetime.now()}] /query-generated-replies: MongoDB query built: {mongo_query}")

    try:
        cursor = generated_replies_collection.find(mongo_query).sort("generated_at", -1).limit(query_params.limit)
        # Run the blocking PyMongo cursor materialization in a worker thread.
        raw_docs = await asyncio.to_thread(list, cursor)
        print(f"[{datetime.now()}] /query-generated-replies: Found {len(raw_docs)} documents.")
        results = [GeneratedReplyData(**raw_doc) for raw_doc in raw_docs]
        print(f"[{datetime.now()}] /query-generated-replies: Returning {len(results)} results.")
        return results
    except Exception as e:
        print(f"[{datetime.now()}] /query-generated-replies: Unhandled Exception during query: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error querying generated replies: {e}")