Gaykar commited on
Commit
9cb712d
·
1 Parent(s): 4ed95ae

added parts

Browse files
app/{memory_store → agent_memory_store}/embeddings.py RENAMED
File without changes
app/{memory_store → agent_memory_store}/memory_store.py RENAMED
@@ -1,7 +1,7 @@
1
  from psycopg_pool import ConnectionPool
2
  from langgraph.store.postgres import PostgresStore
3
  from langgraph.checkpoint.postgres import PostgresSaver
4
- from app.memory_store.embeddings import remote_embeddings
5
 
6
  from app.core.config import settings
7
 
@@ -14,5 +14,7 @@ pool = ConnectionPool(
14
  max_size=10,
15
  kwargs={"autocommit": True}
16
  )
 
 
17
 
18
  memory_store = PostgresStore(pool, index={"dims": 384, "embed": remote_embeddings})
 
1
  from psycopg_pool import ConnectionPool
2
  from langgraph.store.postgres import PostgresStore
3
  from langgraph.checkpoint.postgres import PostgresSaver
4
+ from app.agent_memory_store.embeddings import remote_embeddings
5
 
6
  from app.core.config import settings
7
 
 
14
  max_size=10,
15
  kwargs={"autocommit": True}
16
  )
17
+ 4
18
+
19
 
20
  memory_store = PostgresStore(pool, index={"dims": 384, "embed": remote_embeddings})
app/agents/memory_manager_agent.py CHANGED
@@ -2,6 +2,7 @@ import types
2
  from langchain_groq import ChatGroq
3
  from langmem import create_memory_store_manager
4
  from app.schemas.memory_agent_schema import EmailMemory
 
5
  import os
6
  from app.core.config import settings
7
 
@@ -45,11 +46,12 @@ def patch_groq_for_extractions(model: ChatGroq):
45
  model=ChatGroq(model="openai/gpt-oss-20b", temperature=0.2)
46
  model = patch_groq_for_extractions(model)
47
 
 
48
  memory_manager_agent = create_memory_store_manager(
49
  model,
50
  schemas=[EmailMemory],
51
  namespace=namespace,
52
- store=p_store,
53
  instructions="Extract required info from incoming mail and its reply .",
54
  enable_inserts=True,
55
  enable_deletes=True,
 
2
  from langchain_groq import ChatGroq
3
  from langmem import create_memory_store_manager
4
  from app.schemas.memory_agent_schema import EmailMemory
5
+ from app.agent_memory_store import memory_store
6
  import os
7
  from app.core.config import settings
8
 
 
46
  model=ChatGroq(model="openai/gpt-oss-20b", temperature=0.2)
47
  model = patch_groq_for_extractions(model)
48
 
49
+ namespace = ("emails", "{user_id}", "collection")
50
  memory_manager_agent = create_memory_store_manager(
51
  model,
52
  schemas=[EmailMemory],
53
  namespace=namespace,
54
+ store=memory_store,
55
  instructions="Extract required info from incoming mail and its reply .",
56
  enable_inserts=True,
57
  enable_deletes=True,
app/core/config.py CHANGED
@@ -12,6 +12,11 @@ class Settings(BaseSettings):
12
  CLOUDINARY_CLOUD_NAME: str
13
  CLOUDINARY_API_KEY: str
14
  CLOUDINARY_API_SECRET: str
 
 
 
 
 
15
 
16
  model_config = SettingsConfigDict(
17
  env_file=str(BASE_DIR / ".env"),
@@ -19,4 +24,6 @@ class Settings(BaseSettings):
19
  extra="ignore"
20
  )
21
 
22
- settings = Settings()
 
 
 
12
  CLOUDINARY_CLOUD_NAME: str
13
  CLOUDINARY_API_KEY: str
14
  CLOUDINARY_API_SECRET: str
15
+
16
+
17
+ DB_URL_FOR_CHECKPOINTER_STORE: str
18
+
19
+ DB_URL_FOR_SQL_AL:str
20
 
21
  model_config = SettingsConfigDict(
22
  env_file=str(BASE_DIR / ".env"),
 
24
  extra="ignore"
25
  )
26
 
27
+ settings = Settings()
28
+
29
+
app/database/connection.py ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy import create_engine
2
+ from sqlalchemy.orm import sessionmaker, Session
3
+ from app.core.config import DB_URL_FOR_SQL_AL
4
+ engine = create_engine(
5
+ DB_URL_FOR_SQL_AL,
6
+ echo=False,
7
+ pool_pre_ping=True,
8
+ pool_recycle=1800
9
+ )
10
+
11
+ SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
12
+
13
+ def get_session() -> Session:
14
+ return SessionLocal()
app/database/models.py ADDED
@@ -0,0 +1,49 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy import (
2
+ Column, Integer, Text, Boolean, DateTime, ForeignKey, String, func
3
+
4
+ )
5
+ from sqlalchemy.orm import DeclarativeBase, relationship
6
+
7
+ class Base(DeclarativeBase):
8
+ pass
9
+
10
+
11
+ class User(Base):
12
+ __tablename__ = "users"
13
+
14
+ id = Column(Integer, primary_key=True, autoincrement=True)
15
+ email = Column(String, unique=True, nullable=False)
16
+ created_at = Column(DateTime, default=func.now())
17
+
18
+ received_emails = relationship("ReceivedEmail", back_populates="owner")
19
+ sent_emails = relationship("SentEmail", back_populates="sender")
20
+
21
+
22
+ class ReceivedEmail(Base):
23
+ __tablename__ = "received_emails"
24
+
25
+ id = Column(Integer, primary_key=True, autoincrement=True)
26
+ thread_id = Column(String, nullable=False)
27
+ owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)
28
+ sender_email = Column(Text, nullable=False)
29
+ subject = Column(Text)
30
+ body = Column(Text)
31
+ received_at = Column(DateTime, default=func.now())
32
+ is_safe = Column(Boolean, default=False)
33
+ owner = relationship("User", back_populates="received_emails")
34
+
35
+
36
+ class SentEmail(Base):
37
+ __tablename__ = "sent_emails"
38
+
39
+ id = Column(Integer, primary_key=True, autoincrement=True)
40
+ thread_id = Column(String, nullable=False)
41
+ sender_id = Column(Integer, ForeignKey("users.id"), nullable=False)
42
+ recipient_email = Column(Text, nullable=False)
43
+ subject = Column(Text)
44
+ body = Column(Text)
45
+ sent_at = Column(DateTime, default=func.now())
46
+
47
+ sender = relationship("User", back_populates="sent_emails")
48
+
49
+
app/database/utils.py ADDED
@@ -0,0 +1,52 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from app.database.models import User, ReceivedEmail, SentEmail
2
+ from sqlalchemy.orm import Session
3
+
4
+ def get_or_create_user(session, email: str):
5
+ user = session.query(User).filter_by(email=email).first()
6
+
7
+ if not user:
8
+ user = User(email=email)
9
+ session.add(user)
10
+ session.commit()
11
+ session.refresh(user)
12
+
13
+ return user
14
+
15
+
16
+
17
+
18
+ def save_received_email(session: Session, owner_id: int,thread_id: str, state: dict) -> ReceivedEmail:
19
+ """
20
+ Persists the incoming email and the agent's initial triage results.
21
+ """
22
+ email = ReceivedEmail(
23
+ owner_id = owner_id,
24
+ thread_id = thread_id,
25
+ sender_email = state.get("sender_email_id"),
26
+ subject = state.get("sender_subject"),
27
+ body = state.get("sender_email_body"),
28
+ is_safe = state.get("is_safe"),
29
+ )
30
+ session.add(email)
31
+ # session.flush() # Use flush to get the ID without closing the transaction
32
+ session.commit() # Only commit here if this is a standalone operation
33
+ return email
34
+
35
+
36
+ def save_sent_email(session: Session, sender_id: int, thread_id: str,state: dict) -> SentEmail:
37
+ """
38
+ Persists the final outbound reply sent by the agent.
39
+ """
40
+ # Ensure we don't save an empty email if the agent decided not to reply
41
+
42
+ email = SentEmail(
43
+ sender_id = sender_id,
44
+ thread_id = thread_id,
45
+ recipient_email = state.get("sender_email_id"),
46
+ subject = state.get("reply_subject") or f"Re: {state.get('sender_subject')}",
47
+ body = state.get("reply_email_body"),
48
+ )
49
+ session.add(email)
50
+ session.commit()
51
+ return email
52
+
app/nodes/store_memory_data_node.py CHANGED
@@ -4,6 +4,8 @@ from app.agents.memory_manager_agent import memory_manager_agent
4
  from app.prompts.memory_manager_agent_prompt import memory_agent_template
5
  from app.state.state import EmailAgentState
6
  from app.agents.memory_manager_agent import memory_manager_agent
 
 
7
 
8
  def store_memory_and_data_node(state: EmailAgentState, config: RunnableConfig):
9
  """
 
4
  from app.prompts.memory_manager_agent_prompt import memory_agent_template
5
  from app.state.state import EmailAgentState
6
  from app.agents.memory_manager_agent import memory_manager_agent
7
+ from app.database.connection import get_session
8
+ from app.database.utils import save_sent_email, save_received_email
9
 
10
  def store_memory_and_data_node(state: EmailAgentState, config: RunnableConfig):
11
  """
app/nodes/unsafe_email_nodes.py CHANGED
@@ -1,14 +1,32 @@
1
  from app.state.state import EmailAgentState
2
  from langchain_core.runnables.config import RunnableConfig
 
 
3
 
4
- def unsafe_emails_node(state: EmailAgentState,config: RunnableConfig) -> dict:
5
- print(f"[QUARANTINE] {state['sender_subject']} — reason: {state['safety_reason']}")
 
 
 
 
 
6
 
7
- session=get_session()
 
8
 
9
- user_id=state['user_id']
 
 
 
10
 
11
- thread_id = config.get("configurable", {}).get("thread_id")
 
 
 
 
 
 
 
 
12
 
13
- save_received_email(session, user_id, thread_id,state)
14
- return {}
 
1
  from app.state.state import EmailAgentState
2
  from langchain_core.runnables.config import RunnableConfig
3
+ from app.database.connection import get_session
4
+ from app.database.utils import save_received_email
5
 
6
+ def unsafe_emails_node(state: EmailAgentState, config: RunnableConfig) -> dict:
7
+ """
8
+ Handles emails flagged as unsafe by persisting them to the database
9
+ and logging the security reason.
10
+ """
11
+ print(f"--- [QUARANTINE SIGNAL] {state['sender_subject']} ---")
12
+ print(f"Reason: {state['safety_reason']}")
13
 
14
+ user_id = state['user_id']
15
+ thread_id = config.get("configurable", {}).get("thread_id")
16
 
17
+ with get_session() as session:
18
+ try:
19
+ # 2. Persist the received email even if it's unsafe (for records/logging)
20
+ save_received_email(session, user_id, thread_id, state)
21
 
22
+ session.commit()
23
+ print("--- [QUARANTINE] Data persisted successfully ---")
24
+
25
+ except Exception as e:
26
+ # 4. Rollback in case of an OperationalError or SSL timeout
27
+ session.rollback()
28
+ print(f"--- [QUARANTINE ERROR] Failed to persist unsafe email: {e} ---")
29
+
30
+ raise e
31
 
32
+ return {}