Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| FastAPI Routes for Email Query System | |
| """ | |
| from fastapi import APIRouter, HTTPException | |
| from pydantic import BaseModel, EmailStr | |
| from typing import List, Dict, Optional | |
| import json | |
| # Import our modules | |
| from query_parser import parse_email_query, store_name_email_mapping | |
| from email_scraper import scrape_emails_from_sender | |
| router = APIRouter() | |
| # Pydantic models | |
# Request body carrying one free-text email query.
# (Documented with comments, not a docstring, so the generated JSON schema
# for this model is left exactly as it was.)
class NaturalQuery(BaseModel):
    # Natural-language query text; handed to parse_email_query by the handlers.
    query: str
# Request body pairing a name with an email address.
# (Documented with comments, not a docstring, so the generated JSON schema
# for this model is left exactly as it was.)
class EmailMappingInput(BaseModel):
    # Name to register; passed as the first argument to store_name_email_mapping.
    name: str
    # Address to associate with the name; typed as pydantic's EmailStr.
    email: EmailStr
# One email entry as returned inside EmailsResponse.emails.
# Every field is a plain string; the exact date/time formats are decided by
# the scraper and are not visible here — TODO confirm against email_scraper.
class EmailResponse(BaseModel):
    date: str
    time: str
    subject: str
    content: str
    # Copied from the scraped record's "message_id" key.
    message_id: str
# Result of parsing a query, built by expanding parse_email_query's return
# value as keyword arguments. Only `status` and `message` are required; the
# remaining fields default to None when the parser does not supply them.
class QueryParseResponse(BaseModel):
    # Parser outcome; handlers in this module check for "need_email_input"
    # and "error".
    status: str
    sender_intent: Optional[str] = None
    resolved_email: Optional[str] = None
    start_date: Optional[str] = None
    end_date: Optional[str] = None
    message: str
    error: Optional[str] = None
# Response for a completed parse-and-scrape request: the parsed query fields
# plus the emails that were found. All fields are required.
class EmailsResponse(BaseModel):
    status: str
    sender_intent: str
    resolved_email: str
    start_date: str
    end_date: str
    # Number of emails returned by the scraper.
    total_emails: int
    # One EmailResponse per scraped email.
    emails: List[EmailResponse]
    message: str
def parse_email_query_endpoint(input_data: NaturalQuery):
    """
    Run the query parser on the submitted text and wrap its result.

    The mapping returned by ``parse_email_query`` is expanded into a
    ``QueryParseResponse``. Any exception raised while parsing or while
    building the response is reported as an HTTP 400.
    """
    try:
        parsed = parse_email_query(input_data.query)
        response = QueryParseResponse(**parsed)
    except Exception as exc:
        reason = str(exc)
        raise HTTPException(status_code=400, detail=f"Query parsing failed: {reason}")
    return response
def add_email_mapping(mapping: EmailMappingInput):
    """
    Store a name → email pair and return a confirmation payload.

    Any exception raised while storing the pair is reported as an HTTP 400.
    """
    try:
        name, address = mapping.name, mapping.email
        store_name_email_mapping(name, address)
        confirmation = f"Mapping added: '{name}' → '{address}'"
        return {"status": "success", "message": confirmation}
    except Exception as exc:
        failure = f"Failed to add mapping: {str(exc)}"
        raise HTTPException(status_code=400, detail=failure)
def get_emails_from_query(input_data: NaturalQuery):
    """
    Full pipeline: parse the query, then scrape and return matching emails.

    Responds with HTTP 400 when the parser reports ``need_email_input``
    (structured detail) or ``error`` (the parser's message). Any other
    failure is reported as HTTP 500.
    """
    try:
        # Step 1: interpret the query text.
        parsed = parse_email_query(input_data.query)
        status = parsed["status"]

        if status == "need_email_input":
            # The sender could not be resolved; hand the parser's details back.
            raise HTTPException(
                status_code=400,
                detail={
                    "type": "need_email_input",
                    "sender_intent": parsed["sender_intent"],
                    "message": parsed["message"],
                },
            )
        if status == "error":
            raise HTTPException(status_code=400, detail=parsed["message"])

        sender = parsed["resolved_email"]
        start, end = parsed["start_date"], parsed["end_date"]

        # Step 2: fetch the emails for the resolved sender and date range.
        messages = scrape_emails_from_sender(sender, start, end)

        # Step 3: convert each scraped record into the response model.
        wanted = ("date", "time", "subject", "content", "message_id")
        items = [
            EmailResponse(**{key: record[key] for key in wanted})
            for record in messages
        ]

        intent = parsed["sender_intent"]
        count = len(messages)
        return EmailsResponse(
            status="success",
            sender_intent=intent,
            resolved_email=sender,
            start_date=start,
            end_date=end,
            total_emails=count,
            emails=items,
            message=f"Found {count} emails from {sender}",
        )
    except HTTPException:
        # Deliberate HTTP errors raised above pass through unchanged.
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Email retrieval failed: {str(exc)}")
def view_name_mappings():
    """
    Return every stored name → email mapping together with a count.

    Any failure while importing the loader or loading the mappings is
    reported as an HTTP 500.
    """
    try:
        # Imported here, inside the try, so an import failure is also
        # reported as a 500.
        from query_parser import _load_name_mapping

        stored = _load_name_mapping()
        return dict(
            status="success",
            total_mappings=len(stored),
            mappings=stored,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to load mappings: {str(exc)}")
def health_check():
    """
    Report that the service is up.

    Returns a fixed payload; nothing is actually probed.
    """
    return dict(status="healthy", message="Email query system is running")
# For testing - manual endpoint to add mapping and then query
def complete_email_flow(input_data: dict):
    """
    Test endpoint for the complete flow with an optional mapping.

    Expected input:
    {
        "query": "emails from john last week",
        "mapping": {"name": "john", "email": "john@example.com"}  # optional
    }

    Returns:
        A dict with status "need_mapping" (plus the parser's message and
        sender_intent) when the parser reports "need_email_input";
        otherwise a dict with status "success", the query, the parsed
        result, the total email count and at most the first five emails.

    Raises:
        HTTPException: 400 when "query" is missing or empty, or when the
            parser reports status "error"; 500 for any other failure.
    """
    try:
        query = input_data.get("query")
        mapping = input_data.get("mapping")

        if not query:
            raise HTTPException(status_code=400, detail="Query is required")

        # Add mapping if provided
        if mapping:
            store_name_email_mapping(mapping["name"], mapping["email"])

        # Parse and get emails
        parsed_result = parse_email_query(query)
        status = parsed_result["status"]

        if status == "need_email_input":
            return {
                "status": "need_mapping",
                "message": parsed_result["message"],
                "sender_intent": parsed_result["sender_intent"],
            }
        if status == "error":
            # Same handling as get_emails_from_query: a parser error is the
            # client's problem, and scraping with unresolved fields would
            # only fail later with a less useful message.
            raise HTTPException(status_code=400, detail=parsed_result["message"])

        # Get emails
        emails = scrape_emails_from_sender(
            parsed_result["resolved_email"],
            parsed_result["start_date"],
            parsed_result["end_date"],
        )

        return {
            "status": "success",
            "query": query,
            "parsed": parsed_result,
            "total_emails": len(emails),
            "emails": emails[:5],  # Return first 5 emails
        }
    except HTTPException:
        # HTTPException is an Exception subclass; without this clause the
        # deliberate 400s above would be rewrapped as 500s by the handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e