Spaces:
Runtime error
Runtime error
File size: 6,680 Bytes
4d860a2 57d78b2 4d860a2 d9012b6 4d860a2 57d78b2 aa34f6b 4d860a2 aa34f6b 57d78b2 4d860a2 aa34f6b 4d860a2 aa34f6b 4d860a2 57d78b2 4d860a2 44c6641 4d860a2 57d78b2 4d860a2 d9012b6 4d860a2 d9012b6 4d860a2 57d78b2 4d860a2 57d78b2 4d860a2 57d78b2 4d860a2 57d78b2 4d860a2 57d78b2 4d860a2 070a87d 57d78b2 070a87d 4d860a2 57d78b2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 | from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, EmailStr
from typing import Optional, Dict, Any, TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import add_messages
from langgraph.types import Command
import uuid
import logging
from app.graph import graph
from app.state.state import EmailAgentState
from app.database.connection import get_session
from app.database.utils import get_or_create_user
from sqlalchemy.orm import Session
from app.database.connection import SessionLocal
logger = logging.getLogger(__name__)
def get_session():
db = SessionLocal()
try:
yield db
finally:
db.close()
app = FastAPI(title="AI Email Agent API")
# --- Schemas ---
class EmailProcessRequest(BaseModel):
thread_id: str
user_email: EmailStr
sender_email_id: EmailStr
sender_subject: str
sender_email_body: str
class ReviewActionRequest(BaseModel):
thread_id: str
user_id: str
status: str # "approved" or "rejected"
feedback: Optional[str] = None
class SendEmailRequest(BaseModel):
thread_id: str
user_id: str
human_message: str
# --- Helper Functions ---
def parse_interrupt(final_state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Parse interrupt from graph state."""
if "__interrupt__" not in final_state:
return None
interrupt_state = final_state.get("__interrupt__")
if not interrupt_state:
return None
interrupt = interrupt_state[0]
value = getattr(interrupt, "value", {}) or {}
return {
"action": value.get("action"),
"data": value.get("data", {})
}
# --- Endpoints ---
@app.post("/process-email")
def process_email(request: EmailProcessRequest, db: Session = Depends(get_session)) -> Dict[str, Any]:
"""Process email through the graph pipeline."""
try:
user = get_or_create_user(db, request.user_email)
thread_id = request.thread_id
config = {
"configurable": {
"thread_id": thread_id,
"user_id": str(user.id)
}
}
input_data = {
"user_email_id": request.user_email,
"user_id": user.id,
"user_name": "Atharva",
"sender_email_id": request.sender_email_id,
"sender_subject": request.sender_subject,
"sender_email_body": request.sender_email_body,
}
final_state = graph.invoke(input_data, config=config)
if final_state.get('triage_label') == "FOLLOW_UP_REQUIRED":
if "__interrupt__" in final_state and not final_state.get("draft_id"):
parsed_interrupt = parse_interrupt(final_state)
if parsed_interrupt:
data = parsed_interrupt["data"]
return {
"status": "needs_review",
"thread_id": thread_id,
"messages": final_state.get("messages", []),
"triage_label": final_state.get("triage_label"),
"action": parsed_interrupt["action"],
"email_draft": {
"to": data.get("to"),
"subject": data.get("subject"),
"body": data.get("body"),
}
}
return {
"thread_id": thread_id,
"triage_label": final_state.get("triage_label"),
}
except Exception as e:
logger.error(f"Error processing email: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/review-action")
def review_action(request: ReviewActionRequest) -> Dict[str, Any]:
"""Resume graph execution based on user review."""
try:
config = {
"configurable": {
"thread_id": request.thread_id,
"user_id": request.user_id
}
}
if request.status == "rejected":
payload = Command(resume={
"status": "rejected",
"feedback": request.feedback
})
elif request.status == "approved":
payload = Command(resume={
"status": "approved"
})
else:
raise HTTPException(status_code=400, detail="Invalid status")
intermediate_state = graph.invoke(payload, config=config)
# Still in review phase
if "__interrupt__" in intermediate_state and not intermediate_state.get("draft_id"):
parsed_interrupt = parse_interrupt(intermediate_state)
if parsed_interrupt:
data = parsed_interrupt["data"]
return {
"status": "needs_review",
"thread_id": request.thread_id,
"triage_label": intermediate_state.get("triage_label"),
"action": parsed_interrupt["action"],
"email_draft": {
"to": data.get("to"),
"subject": data.get("subject"),
"body": data.get("body"),
}
}
# Draft created, review complete
if intermediate_state.get("draft_id"):
return {
"thread_id": request.thread_id,
"draft_id": intermediate_state["draft_id"],
"messages": intermediate_state.get("messages", []),
"reply_subject": intermediate_state.get("reply_subject"),
"reply_email_body": intermediate_state.get("reply_email_body"),
}
except Exception as e:
logger.error(f"Error in review action: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/send_email")
def send_email(request: SendEmailRequest) -> Dict[str, Any]:
config = {
"configurable": {
"thread_id": request.thread_id,
"user_id": request.user_id
}
}
graph.update_state(
config,
{"messages": [HumanMessage(content=request.human_message)]},
as_node="prepare_context_node"
)
final_state = graph.invoke(None, config=config)
return {
"thread_id": request.thread_id,
"messages": final_state.get("messages", []),
"sent_message_id": final_state.get("sent_message_id")
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000) |