Gaykar commited on
Commit
21f1bdf
·
1 Parent(s): b99e403

made chnages in the nodes and prompts

Browse files
app/core/config.py CHANGED
@@ -7,22 +7,22 @@ class Settings(BaseSettings):
7
  PROJECT_NAME: str = "Email Assistant Project"
8
 
9
  GROQ_API_KEY: str
10
- PINECONE_API_KEY: str
11
 
12
- CLOUDINARY_CLOUD_NAME: str
13
- CLOUDINARY_API_KEY: str
14
- CLOUDINARY_API_SECRET: str
15
 
16
 
17
- DB_URL_FOR_CHECKPOINTER_STORE: str
18
 
19
- DB_URL_FOR_SQL_AL:str
20
 
21
- model_config = SettingsConfigDict(
22
- env_file=str(BASE_DIR / ".env"),
23
- env_file_encoding="utf-8",
24
- extra="ignore"
25
- )
26
 
27
  settings = Settings()
28
 
 
7
  PROJECT_NAME: str = "Email Assistant Project"
8
 
9
  GROQ_API_KEY: str
10
+ # PINECONE_API_KEY: str
11
 
12
+ # CLOUDINARY_CLOUD_NAME: str
13
+ # CLOUDINARY_API_KEY: str
14
+ # CLOUDINARY_API_SECRET: str
15
 
16
 
17
+ # DB_URL_FOR_CHECKPOINTER_STORE: str
18
 
19
+ # DB_URL_FOR_SQL_AL:str
20
 
21
+ # model_config = SettingsConfigDict(
22
+ # env_file=str(BASE_DIR / ".env"),
23
+ # env_file_encoding="utf-8",
24
+ # extra="ignore"
25
+ # )
26
 
27
  settings = Settings()
28
 
app/graph.py CHANGED
@@ -11,25 +11,38 @@ from app.nodes.parse_node import parse_response_node
11
  from app.nodes.context_node import prepare_context_node
12
  from app.nodes.store_memory_data_node import store_memory_and_data_node
13
  from app.nodes.unsafe_email_node import unsafe_emails_node
 
14
  from langgraph.types import RetryPolicy
15
  from psycopg import OperationalError # Or sqlalchemy.exc.OperationalError depending on your driver
 
 
 
16
 
17
  # Define a standard retry policy for database-heavy nodes
18
  db_retry_policy = RetryPolicy(
19
  retry_on=OperationalError,
20
- max_attempts=3,
21
- backoff_factor=2 # Waits 2s, then 4s, then 8s between retries
 
 
 
 
 
 
 
22
  )
23
 
 
24
  builder = StateGraph(EmailAgentState)
25
 
26
  # Nodes
27
  builder.add_node("safety_check_node", safety_classifier_node)
 
28
  builder.add_node("triage_node", triage_node)
29
  builder.add_node("prepare_context_node", prepare_context_node)
30
  builder.add_node("email_writing_agent", email_writing_agent_node)
31
 
32
- # --- APPLY RETRY POLICIES HERE ---
33
  builder.add_node(
34
  "store_memory_and_data_node",
35
  store_memory_and_data_node,
@@ -41,9 +54,9 @@ builder.add_node(
41
  retry=db_retry_policy
42
  )
43
 
44
- builder.add_node("archive_node", archive_node)
45
  builder.add_node("parse_node", parse_response_node)
46
- builder.add_node("tools", ToolNode(email_writing_agent_tools))
47
 
48
  # Edges (Same as your original logic)
49
  builder.add_edge(START, "safety_check_node")
@@ -54,12 +67,21 @@ builder.add_conditional_edges("safety_check_node", after_safety, {
54
  })
55
 
56
  builder.add_conditional_edges("triage_node", route_after_triage, {
57
- "prepare_context_node": "prepare_context_node",
58
  "archive_node": "archive_node",
59
  })
60
 
 
 
 
 
 
 
 
 
61
  builder.add_edge("prepare_context_node", "email_writing_agent")
62
 
 
63
  builder.add_conditional_edges(
64
  "email_writing_agent",
65
  tools_condition,
@@ -81,4 +103,10 @@ builder.add_conditional_edges(
81
  builder.add_edge("parse_node", "store_memory_and_data_node")
82
  builder.add_edge("store_memory_and_data_node", END)
83
  builder.add_edge("unsafe_emails_node", END)
84
- builder.add_edge("archive_node", END)
 
 
 
 
 
 
 
11
  from app.nodes.context_node import prepare_context_node
12
  from app.nodes.store_memory_data_node import store_memory_and_data_node
13
  from app.nodes.unsafe_email_node import unsafe_emails_node
14
+ from app.nodes.check_email_exist_node import *
15
  from langgraph.types import RetryPolicy
16
  from psycopg import OperationalError # Or sqlalchemy.exc.OperationalError depending on your driver
17
+ # imoprt display
18
+ from IPython.display import Image, display
19
+
20
 
21
  # Define a standard retry policy for database-heavy nodes
22
  db_retry_policy = RetryPolicy(
23
  retry_on=OperationalError,
24
+ max_attempts=4,
25
+ backoff_factor=1 # Waits 2s, then 4s, then 8s between retries
26
+ )
27
+
28
+ tool_node_retry_policy = RetryPolicy(
29
+ max_attempts=4,
30
+ initial_interval=1.0,
31
+ backoff_factor=2.0,
32
+ jitter=True
33
  )
34
 
35
+
36
  builder = StateGraph(EmailAgentState)
37
 
38
  # Nodes
39
  builder.add_node("safety_check_node", safety_classifier_node)
40
+ builder.add_node("check_previous_email_exist_node", check_previous_email_exist_node)
41
  builder.add_node("triage_node", triage_node)
42
  builder.add_node("prepare_context_node", prepare_context_node)
43
  builder.add_node("email_writing_agent", email_writing_agent_node)
44
 
45
+
46
  builder.add_node(
47
  "store_memory_and_data_node",
48
  store_memory_and_data_node,
 
54
  retry=db_retry_policy
55
  )
56
 
57
+ builder.add_node("archive_node", archive_node,retry=db_retry_policy)
58
  builder.add_node("parse_node", parse_response_node)
59
+ builder.add_node("tools", ToolNode(email_writing_agent_tools), retry_policy=tool_node_retry_policy)
60
 
61
  # Edges (Same as your original logic)
62
  builder.add_edge(START, "safety_check_node")
 
67
  })
68
 
69
  builder.add_conditional_edges("triage_node", route_after_triage, {
70
+ "check_previous_email_exist_node": "check_previous_email_exist_node",
71
  "archive_node": "archive_node",
72
  })
73
 
74
+ builder.add_conditional_edges(
75
+ "check_previous_email_exist_node",
76
+ after_mail_check,
77
+ {
78
+ "email_writing_agent": "email_writing_agent",
79
+ "prepare_context_node": "prepare_context_node"
80
+ }
81
+ )
82
  builder.add_edge("prepare_context_node", "email_writing_agent")
83
 
84
+
85
  builder.add_conditional_edges(
86
  "email_writing_agent",
87
  tools_condition,
 
103
  builder.add_edge("parse_node", "store_memory_and_data_node")
104
  builder.add_edge("store_memory_and_data_node", END)
105
  builder.add_edge("unsafe_emails_node", END)
106
+ builder.add_edge("archive_node", END)
107
+
108
+
109
+
110
+ graph = builder.compile()
111
+
112
+ display(graph)
app/nodes/archive_node.py CHANGED
@@ -1,5 +1,8 @@
1
  from app.state.state import EmailAgentState
2
  from langchain_core.runnables.config import RunnableConfig
 
 
 
3
 
4
  def archive_node(state: EmailAgentState,config: RunnableConfig) -> dict:
5
 
@@ -12,6 +15,20 @@ def archive_node(state: EmailAgentState,config: RunnableConfig) -> dict:
12
 
13
  thread_id = config.get("configurable", {}).get("thread_id")
14
 
15
- save_received_email(session, user_id, thread_id,state)
 
 
 
 
 
 
 
 
 
 
 
 
 
16
 
 
17
  return {}
 
1
  from app.state.state import EmailAgentState
2
  from langchain_core.runnables.config import RunnableConfig
3
+ from app.database.connection import get_session
4
+ from app.database.utils import save_received_email
5
+
6
 
7
  def archive_node(state: EmailAgentState,config: RunnableConfig) -> dict:
8
 
 
15
 
16
  thread_id = config.get("configurable", {}).get("thread_id")
17
 
18
+ with get_session() as session:
19
+ try:
20
+ # 2. Persist the received email even if it's unsafe (for records/logging)
21
+ save_received_email(session, user_id, thread_id, state)
22
+
23
+ session.commit()
24
+ print("--- [QUARANTINE] Data persisted successfully ---")
25
+
26
+ except Exception as e:
27
+ # 4. Rollback in case of an OperationalError or SSL timeout
28
+ session.rollback()
29
+ print(f"--- [QUARANTINE ERROR] Failed to persist unsafe email: {e} ---")
30
+
31
+ raise e
32
 
33
+
34
  return {}
app/nodes/check_email_exist_node.py ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ from app.state.state import EmailAgentState
3
+ from langchain_google_community import GmailToolkit
4
+
5
+ def check_previous_email_exist_node(state: EmailAgentState):
6
+ # Search for previous interactions with this sender
7
+ toolkit = GmailToolkit()
8
+ search_tool = [t for t in toolkit.get_tools() if t.name == "search_gmail"][0]
9
+ results = search_tool.invoke(f"from:{state['sender_email_id']}")
10
+
11
+ if len(results) == 0:
12
+ # No history found - flag this for the conditional edge
13
+ return {"draft_context": "No relevant past context found."}
14
+
15
+ def after_mail_check(state: EmailAgentState):
16
+ # Check the actual state value
17
+ if state.get("draft_context") == "No relevant past context found.":
18
+ return "email_writing_agent"
19
+ return "prepare_context_node"
app/nodes/email_writing_node.py CHANGED
@@ -2,40 +2,44 @@ import json
2
  from langchain_core.runnables import RunnableConfig
3
  from app.state.state import EmailAgentState
4
  from app.agents.email_writing_agent import email_agent
5
- from app.prompts.email_writing_agent_prompt import email_agent_template
6
  from langchain_core.messages import ToolMessage
7
 
8
-
9
  def email_writing_agent_node(state: EmailAgentState) -> dict:
10
  print("--- DEBUG: ENTERING EMAIL_NODE ---")
 
11
 
 
 
 
 
 
 
 
12
 
13
- messages= state["messages"]
14
-
15
- if len(messages) <= 1:
16
-
17
- final_prompt = email_agent_template.invoke({
18
- "user_name": state.get("user_name"),
19
- "sender_email_id": state.get("sender_email_id"),
20
  "sender_subject": state.get("sender_subject"),
21
  "sender_email_body": state.get("sender_email_body"),
22
- "draft_context": state.get("draft_context") or "No relevant past context found.",
23
- })
24
-
25
- final_prompt = final_prompt.to_messages()
26
 
27
- else:
 
28
 
29
- final_prompt = messages
 
 
 
 
30
 
31
-
32
  response = email_agent.invoke(final_prompt)
33
 
34
-
35
-
36
  return {"messages": [response]}
37
 
38
 
 
39
  def route_after_tools(state: EmailAgentState):
40
  # Iterate backwards to find the latest ToolMessage
41
  # This handles cases where the LLM might have sent a text follow-up
 
2
  from langchain_core.runnables import RunnableConfig
3
  from app.state.state import EmailAgentState
4
  from app.agents.email_writing_agent import email_agent
5
+ from app.prompts.email_writing_agent_prompt import *
6
  from langchain_core.messages import ToolMessage
7
 
 
8
  def email_writing_agent_node(state: EmailAgentState) -> dict:
9
  print("--- DEBUG: ENTERING EMAIL_NODE ---")
10
+ messages = state.get("messages", [])
11
 
12
+ # 1. Prepare Prompts (Your Style)
13
+ # .to_messages() converts the template into a list: [SystemMessage(...)]
14
+ system_prompt = system_prompt_email_agent_template.invoke({
15
+ "user_name": state.get("user_name"),
16
+ "draft_context": state.get("draft_context") or "No relevant past context found.",
17
+ "sender_email_id": state.get("sender_email_id"),
18
+ }).to_messages()
19
 
20
+ human_prompt = human_prompt_email_agent_template.invoke({
 
 
 
 
 
 
21
  "sender_subject": state.get("sender_subject"),
22
  "sender_email_body": state.get("sender_email_body"),
23
+ "sender_email_id": state.get("sender_email_id"),
24
+ }).to_messages()
 
 
25
 
26
+
27
+ if len(messages) == 0:
28
 
29
+ final_prompt = system_prompt + human_prompt
30
+
31
+ else:
32
+
33
+ final_prompt = system_prompt + messages
34
 
35
+ # 3. Invoke
36
  response = email_agent.invoke(final_prompt)
37
 
38
+ # 4. Return
 
39
  return {"messages": [response]}
40
 
41
 
42
+
43
  def route_after_tools(state: EmailAgentState):
44
  # Iterate backwards to find the latest ToolMessage
45
  # This handles cases where the LLM might have sent a text follow-up
app/nodes/store_memory_data_node.py CHANGED
@@ -44,4 +44,4 @@ def store_memory_and_data_node(state: EmailAgentState, config: RunnableConfig):
44
  print(f"--- Memory Node Error: {e} ---")
45
  raise e
46
 
47
- return {"memory_agent_messages": [AIMessage(content="Memory successfully persisted.")]}
 
44
  print(f"--- Memory Node Error: {e} ---")
45
  raise e
46
 
47
+ return {"memory_agent_messages": [AIMessage(content="Memory successfully persisted.")],"email_sent": True}
app/nodes/triage_node.py CHANGED
@@ -26,6 +26,6 @@ def route_after_triage(state: EmailAgentState):
26
  label = state["triage_label"]
27
  # Go to the node that formats the prompt
28
  if label == "FOLLOW_UP_REQUIRED":
29
- return "prepare_context_node"
30
 
31
  return "archive_node"
 
26
  label = state["triage_label"]
27
  # Go to the node that formats the prompt
28
  if label == "FOLLOW_UP_REQUIRED":
29
+ return "check_previous_email_exist_node"
30
 
31
  return "archive_node"
app/prompts/email_writing_agent_prompt.py CHANGED
@@ -1,6 +1,7 @@
1
  from langchain_core.prompts import ChatPromptTemplate
2
 
3
- email_agent_template = ChatPromptTemplate([
 
4
  ("system", """
5
  <role>
6
  You are {user_name}'s email assistant. MUST use tools for every action.
@@ -12,24 +13,30 @@ You are {user_name}'s email assistant. MUST use tools for every action.
12
 
13
  <one_shot_example>
14
  User: "Reply to client."
15
- Agent Tool: create_gmail_draft_tool(...)
16
  Output: "Success! ID: draft_999"
17
  User: "Send it."
18
- Agent Tool: send_draft_by_id_tool(draft_id="draft_999")
19
  Output: "SUCCESS: Sent! Message ID: msg_123"
20
  Output: "Stored."
21
  </one_shot_example>
22
 
23
  <rules>
24
- 1. NEW DRAFT: Call `create_gmail_draft_tool` first.
25
  2. REJECTION: If "DRAFT REJECTED", rewrite and call `create_gmail_draft_tool` again.
26
- 3. SENDING: Only call `send_draft_by_id_tool` when user explicitly orders.
27
- 4. ARCHIVING: After `send_draft_by_id_tool` returns a Message ID.
28
  important:You are not allowed to send until the user explicitly orders to send.
29
  5.Use **{sender_email_id}** as the recipient not the name.
30
  </rules>
31
 
32
- """),
 
 
 
 
 
 
33
  ("human", """
34
  <incoming_email>
35
  Subject : {sender_subject}
@@ -40,3 +47,6 @@ Body : {sender_email_body}
40
 
41
  """)
42
  ])
 
 
 
 
1
  from langchain_core.prompts import ChatPromptTemplate
2
 
3
+
4
+ system_prompt_email_agent_template = ChatPromptTemplate([
5
  ("system", """
6
  <role>
7
  You are {user_name}'s email assistant. MUST use tools for every action.
 
13
 
14
  <one_shot_example>
15
  User: "Reply to client."
16
+ Agent Tool: create_gmail_draft(...)
17
  Output: "Success! ID: draft_999"
18
  User: "Send it."
19
+ Agent Tool: send_draft_by_id(draft_id="draft_999")
20
  Output: "SUCCESS: Sent! Message ID: msg_123"
21
  Output: "Stored."
22
  </one_shot_example>
23
 
24
  <rules>
25
+ 1. NEW DRAFT: Call `create_gmail_draft` first.
26
  2. REJECTION: If "DRAFT REJECTED", rewrite and call `create_gmail_draft_tool` again.
27
+ 3. SENDING: Only call `send_draft_by_id` when user explicitly orders to send. **dont call send_draft_by_id until user allows you to send the draft**
28
+ 4. ARCHIVING: After `send_draft_by_id` returns a Message ID.
29
  important:You are not allowed to send until the user explicitly orders to send.
30
  5.Use **{sender_email_id}** as the recipient not the name.
31
  </rules>
32
 
33
+ """)
34
+ ])
35
+
36
+
37
+
38
+ human_prompt_email_agent_template = ChatPromptTemplate([
39
+
40
  ("human", """
41
  <incoming_email>
42
  Subject : {sender_subject}
 
47
 
48
  """)
49
  ])
50
+
51
+
52
+
app/schemas/memory_agent_schema.py CHANGED
@@ -14,7 +14,7 @@ class EmailMemory(BaseModel):
14
  description="The email address of the person with whom user is communicating."
15
  )
16
 
17
- content: str = Field(
18
  ...,
19
  description="A concise summary of the key information, intent, and decisions found in the emails.The content should be stored using user name"
20
  )
 
14
  description="The email address of the person with whom user is communicating."
15
  )
16
 
17
+ summary: str = Field(
18
  ...,
19
  description="A concise summary of the key information, intent, and decisions found in the emails.The content should be stored using user name"
20
  )
app/state/state.py CHANGED
@@ -56,6 +56,8 @@ class EmailAgentState(TypedDict):
56
 
57
  context_agent_messages:Annotated[Sequence[BaseMessage],add_messages]
58
 
 
 
59
 
60
  human_approved: Optional[bool]
61
  reply_email_body:Optional[str]
 
56
 
57
  context_agent_messages:Annotated[Sequence[BaseMessage],add_messages]
58
 
59
+ email_sent: Optional[bool]
60
+
61
 
62
  human_approved: Optional[bool]
63
  reply_email_body:Optional[str]
app/tools/email_writing_agent_tools.py CHANGED
@@ -8,6 +8,12 @@ from langchain_google_community import GmailToolkit
8
  @tool(args_schema=CreateDraftSchema)
9
  def create_gmail_draft(to: str, subject: str, body: str):
10
  """Creates a new Gmail draft after human approval."""
 
 
 
 
 
 
11
 
12
  # 1. Pause and ask for review
13
  response = interrupt({
 
8
  @tool(args_schema=CreateDraftSchema)
9
  def create_gmail_draft(to: str, subject: str, body: str):
10
  """Creates a new Gmail draft after human approval."""
11
+
12
+ if isinstance(to, list):
13
+ if len(to) > 0:
14
+ to = str(to[0])
15
+ else:
16
+ return "ERROR: 'to' parameter is an empty list. Please provide a valid email string."
17
 
18
  # 1. Pause and ask for review
19
  response = interrupt({