ishaq101 Claude Sonnet 4.6 (1M context) commited on
Commit
08df5ae
·
1 Parent(s): 4083bd8

feat: chat history, room soft-delete, and user migration to PostgreSQL

Browse files

## Chat History
- Persist user and assistant messages to `chat_messages` table after every
chat interaction (both LLM-generated and direct/greeting responses)
- GET /room/{room_id} now returns full chat history via eager-loaded
`messages` relation, sorted by created_at ascending
- Added `ChatMessageResponse` Pydantic model to room response schema

## Room Soft Delete
- Added `status` column (active | inactive) to Room model
- GET /rooms/{user_id} filters to only return active rooms
- New DELETE /room/{room_id}?user_id=... endpoint: sets status to inactive
instead of physically deleting the row (data preserved for audit/recovery)
- Added idempotent migration in init_db.py:
ALTER TABLE rooms ADD COLUMN IF NOT EXISTS status VARCHAR NOT NULL DEFAULT 'active'

## User Migration: MongoDB → PostgreSQL
- Migrated user storage from MongoDB to PostgreSQL (User model added to models.py)
- get_user() now queries PostgreSQL via AsyncSessionLocal instead of MongoClient
- Replaced all emarcal--*--* env var aliases with emarcal__*__* format
across settings.py, users.py (hash_password, encode_jwt, decode_jwt)
- Login endpoint now returns 403 for inactive accounts
- Removed playground_chat.py and playground_flush_cache.py (moved to playground/)
- Updated uvicorn entry point from src.main:app to main:app, port 7860

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

.gitignore CHANGED
@@ -26,7 +26,9 @@ test/users/user_accounts.csv
26
  .env.prd
27
  .env.example
28
 
 
29
  playground_retriever.py
30
- playground_chat.pyc
31
- playground_flush_cache.pyc
 
32
  API_CONTRACT.md
 
26
  .env.prd
27
  .env.example
28
 
29
+ playground/
30
  playground_retriever.py
31
+ playground_chat.py
32
+ playground_flush_cache.py
33
+ playground_create_user.py
34
  API_CONTRACT.md
main.py CHANGED
@@ -63,8 +63,8 @@ async def health_check():
63
 
64
  if __name__ == "__main__":
65
  uvicorn.run(
66
- "src.main:app",
67
  host="0.0.0.0",
68
- port=8000,
69
  reload=True
70
  )
 
63
 
64
  if __name__ == "__main__":
65
  uvicorn.run(
66
+ "main:app",
67
  host="0.0.0.0",
68
+ port=7860,
69
  reload=True
70
  )
src/api/v1/chat.py CHANGED
@@ -1,9 +1,11 @@
1
  """Chat endpoint with streaming support."""
2
 
3
  import asyncio
 
4
  from fastapi import APIRouter, Depends, HTTPException
5
  from sqlalchemy.ext.asyncio import AsyncSession
6
  from src.db.postgres.connection import get_db
 
7
  from src.agents.orchestration import orchestrator
8
  from src.agents.chatbot import chatbot
9
  from src.rag.retriever import retriever
@@ -81,6 +83,13 @@ async def cache_response(redis, cache_key: str, response: str):
81
  await redis.setex(cache_key, 86400, json.dumps(response))
82
 
83
 
 
 
 
 
 
 
 
84
  @router.post("/chat/stream")
85
  @log_execution(logger)
86
  async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
@@ -143,6 +152,7 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
143
  if intent_result.get("direct_response"):
144
  response = intent_result["direct_response"]
145
  await cache_response(redis, cache_key, response)
 
146
 
147
  async def stream_direct():
148
  yield {"event": "sources", "data": json.dumps([])}
@@ -161,6 +171,7 @@ async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
161
  yield {"event": "chunk", "data": token}
162
  yield {"event": "done", "data": ""}
163
  await cache_response(redis, cache_key, full_response)
 
164
 
165
  return EventSourceResponse(stream_response())
166
 
 
1
  """Chat endpoint with streaming support."""
2
 
3
  import asyncio
4
+ import uuid
5
  from fastapi import APIRouter, Depends, HTTPException
6
  from sqlalchemy.ext.asyncio import AsyncSession
7
  from src.db.postgres.connection import get_db
8
+ from src.db.postgres.models import ChatMessage
9
  from src.agents.orchestration import orchestrator
10
  from src.agents.chatbot import chatbot
11
  from src.rag.retriever import retriever
 
83
  await redis.setex(cache_key, 86400, json.dumps(response))
84
 
85
 
86
+ async def save_messages(db: AsyncSession, room_id: str, user_content: str, assistant_content: str):
87
+ """Persist user and assistant messages to chat_messages table."""
88
+ db.add(ChatMessage(id=str(uuid.uuid4()), room_id=room_id, role="user", content=user_content))
89
+ db.add(ChatMessage(id=str(uuid.uuid4()), room_id=room_id, role="assistant", content=assistant_content))
90
+ await db.commit()
91
+
92
+
93
  @router.post("/chat/stream")
94
  @log_execution(logger)
95
  async def chat_stream(request: ChatRequest, db: AsyncSession = Depends(get_db)):
 
152
  if intent_result.get("direct_response"):
153
  response = intent_result["direct_response"]
154
  await cache_response(redis, cache_key, response)
155
+ await save_messages(db, request.room_id, request.message, response)
156
 
157
  async def stream_direct():
158
  yield {"event": "sources", "data": json.dumps([])}
 
171
  yield {"event": "chunk", "data": token}
172
  yield {"event": "done", "data": ""}
173
  await cache_response(redis, cache_key, full_response)
174
+ await save_messages(db, request.room_id, request.message, full_response)
175
 
176
  return EventSourceResponse(stream_response())
177
 
src/api/v1/room.py CHANGED
@@ -3,8 +3,9 @@
3
  from fastapi import APIRouter, Depends, HTTPException, status
4
  from sqlalchemy.ext.asyncio import AsyncSession
5
  from sqlalchemy import select
 
6
  from src.db.postgres.connection import get_db
7
- from src.db.postgres.models import Room
8
  from src.middlewares.logging import get_logger, log_execution
9
  from pydantic import BaseModel
10
  from typing import List
@@ -16,11 +17,19 @@ logger = get_logger("room_api")
16
  router = APIRouter(prefix="/api/v1", tags=["Rooms"])
17
 
18
 
 
 
 
 
 
 
 
19
  class RoomResponse(BaseModel):
20
  id: str
21
  title: str
22
  created_at: str
23
  updated_at: str | None
 
24
 
25
 
26
  class CreateRoomRequest(BaseModel):
@@ -37,7 +46,7 @@ async def list_rooms(
37
  """List all rooms for a user."""
38
  result = await db.execute(
39
  select(Room)
40
- .where(Room.user_id == user_id)
41
  .order_by(Room.updated_at.desc())
42
  )
43
  rooms = result.scalars().all()
@@ -59,9 +68,11 @@ async def get_room(
59
  room_id: str,
60
  db: AsyncSession = Depends(get_db)
61
  ):
62
- """Get a specific room."""
63
  result = await db.execute(
64
- select(Room).where(Room.id == room_id)
 
 
65
  )
66
  room = result.scalars().first()
67
 
@@ -71,12 +82,48 @@ async def get_room(
71
  detail="Room not found"
72
  )
73
 
 
 
74
  return RoomResponse(
75
  id=room.id,
76
  title=room.title,
77
  created_at=room.created_at.isoformat(),
78
- updated_at=room.updated_at.isoformat() if room.updated_at else None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  )
 
 
 
 
 
 
 
 
 
 
 
 
80
 
81
 
82
  @router.post("/room/create")
 
3
  from fastapi import APIRouter, Depends, HTTPException, status
4
  from sqlalchemy.ext.asyncio import AsyncSession
5
  from sqlalchemy import select
6
+ from sqlalchemy.orm import selectinload
7
  from src.db.postgres.connection import get_db
8
+ from src.db.postgres.models import Room, ChatMessage
9
  from src.middlewares.logging import get_logger, log_execution
10
  from pydantic import BaseModel
11
  from typing import List
 
17
  router = APIRouter(prefix="/api/v1", tags=["Rooms"])
18
 
19
 
20
+ class ChatMessageResponse(BaseModel):
21
+ id: str
22
+ role: str
23
+ content: str
24
+ created_at: str
25
+
26
+
27
  class RoomResponse(BaseModel):
28
  id: str
29
  title: str
30
  created_at: str
31
  updated_at: str | None
32
+ messages: List[ChatMessageResponse] = []
33
 
34
 
35
  class CreateRoomRequest(BaseModel):
 
46
  """List all rooms for a user."""
47
  result = await db.execute(
48
  select(Room)
49
+ .where(Room.user_id == user_id, Room.status == "active")
50
  .order_by(Room.updated_at.desc())
51
  )
52
  rooms = result.scalars().all()
 
68
  room_id: str,
69
  db: AsyncSession = Depends(get_db)
70
  ):
71
+ """Get a specific room with its chat history."""
72
  result = await db.execute(
73
+ select(Room)
74
+ .where(Room.id == room_id)
75
+ .options(selectinload(Room.messages))
76
  )
77
  room = result.scalars().first()
78
 
 
82
  detail="Room not found"
83
  )
84
 
85
+ messages = sorted(room.messages, key=lambda m: m.created_at)
86
+
87
  return RoomResponse(
88
  id=room.id,
89
  title=room.title,
90
  created_at=room.created_at.isoformat(),
91
+ updated_at=room.updated_at.isoformat() if room.updated_at else None,
92
+ messages=[
93
+ ChatMessageResponse(
94
+ id=msg.id,
95
+ role=msg.role,
96
+ content=msg.content,
97
+ created_at=msg.created_at.isoformat()
98
+ )
99
+ for msg in messages
100
+ ]
101
+ )
102
+
103
+
104
+ @router.delete("/room/{room_id}")
105
+ @log_execution(logger)
106
+ async def delete_room(
107
+ room_id: str,
108
+ user_id: str,
109
+ db: AsyncSession = Depends(get_db)
110
+ ):
111
+ """Soft-delete a room by setting its status to inactive."""
112
+ result = await db.execute(
113
+ select(Room).where(Room.id == room_id)
114
  )
115
+ room = result.scalars().first()
116
+
117
+ if not room:
118
+ raise HTTPException(status_code=404, detail="Room not found")
119
+
120
+ if room.user_id != user_id:
121
+ raise HTTPException(status_code=403, detail="Access denied")
122
+
123
+ room.status = "inactive"
124
+ await db.commit()
125
+
126
+ return {"status": "success", "message": "Room deleted successfully"}
127
 
128
 
129
  @router.post("/room/create")
src/api/v1/users.py CHANGED
@@ -50,8 +50,12 @@ async def login(payload: ILogin):
50
  status_code=status.HTTP_404_NOT_FOUND,
51
  detail="Email not found"
52
  )
53
-
54
- user_profile.pop("_id", None)
 
 
 
 
55
 
56
  is_verified = verify_password(
57
  password=payload.password,
 
50
  status_code=status.HTTP_404_NOT_FOUND,
51
  detail="Email not found"
52
  )
53
+
54
+ if user_profile.get("status") == "inactive":
55
+ raise HTTPException(
56
+ status_code=status.HTTP_403_FORBIDDEN,
57
+ detail="Account is inactive"
58
+ )
59
 
60
  is_verified = verify_password(
61
  password=payload.password,
src/config/settings.py CHANGED
@@ -51,15 +51,15 @@ class Settings(BaseSettings):
51
  LANGFUSE_HOST: str
52
 
53
  # MongoDB (for users - existing)
54
- emarcal_mongo_endpoint_url: str = Field(alias="emarcal--mongo--endpoint--url", default="")
55
- emarcal_buma_mongo_dbname: str = Field(alias="emarcal--buma--mongo--dbname", default="")
56
 
57
  # JWT (for users - existing)
58
- emarcal_jwt_secret_key: str = Field(alias="emarcal--jwt--secret-key", default="")
59
- emarcal_jwt_algorithm: str = Field(alias="emarcal--jwt--algorithm", default="HS256")
60
 
61
  # Bcrypt salt (for users - existing)
62
- emarcal_bcrypt_salt: str = Field(alias="emarcal--bcrypt--salt", default="")
63
 
64
 
65
  # Singleton instance
 
51
  LANGFUSE_HOST: str
52
 
53
  # MongoDB (for users - existing)
54
+ emarcal_mongo_endpoint_url: str = Field(alias="emarcal__mongo__endpoint__url", default="")
55
+ emarcal_buma_mongo_dbname: str = Field(alias="emarcal__buma__mongo__dbname", default="")
56
 
57
  # JWT (for users - existing)
58
+ emarcal_jwt_secret_key: str = Field(alias="emarcal__jwt__secret_key", default="")
59
+ emarcal_jwt_algorithm: str = Field(alias="emarcal__jwt__algorithm", default="HS256")
60
 
61
  # Bcrypt salt (for users - existing)
62
+ emarcal_bcrypt_salt: str = Field(alias="emarcal__bcrypt__salt", default="")
63
 
64
 
65
  # Singleton instance
src/db/postgres/init_db.py CHANGED
@@ -2,7 +2,7 @@
2
 
3
  from sqlalchemy import text
4
  from src.db.postgres.connection import engine, Base
5
- from src.db.postgres.models import Document, Room, ChatMessage
6
 
7
 
8
  async def init_db():
@@ -16,3 +16,8 @@ async def init_db():
16
 
17
  # Create application tables
18
  await conn.run_sync(Base.metadata.create_all)
 
 
 
 
 
 
2
 
3
  from sqlalchemy import text
4
  from src.db.postgres.connection import engine, Base
5
+ from src.db.postgres.models import Document, Room, ChatMessage, User
6
 
7
 
8
  async def init_db():
 
16
 
17
  # Create application tables
18
  await conn.run_sync(Base.metadata.create_all)
19
+
20
+ # Schema migrations (idempotent — safe to run on every startup)
21
+ await conn.execute(text(
22
+ "ALTER TABLE rooms ADD COLUMN IF NOT EXISTS status VARCHAR NOT NULL DEFAULT 'active'"
23
+ ))
src/db/postgres/models.py CHANGED
@@ -7,8 +7,21 @@ from sqlalchemy.sql import func
7
  from src.db.postgres.connection import Base
8
 
9
 
10
- # Note: Users are managed in MongoDB (src/users/users.py).
11
- # user_id columns here are plain strings — no FK to a postgres users table.
 
 
 
 
 
 
 
 
 
 
 
 
 
12
 
13
 
14
  class Document(Base):
@@ -37,6 +50,8 @@ class Room(Base):
37
  created_at = Column(DateTime(timezone=True), server_default=func.now())
38
  updated_at = Column(DateTime(timezone=True), onupdate=func.now())
39
 
 
 
40
  messages = relationship("ChatMessage", back_populates="room", cascade="all, delete-orphan")
41
 
42
 
 
7
  from src.db.postgres.connection import Base
8
 
9
 
10
+ class User(Base):
11
+ """User model."""
12
+ __tablename__ = "users"
13
+
14
+ id = Column(String, primary_key=True, default=lambda: str(uuid4()))
15
+ fullname = Column(String, nullable=False)
16
+ email = Column(String, nullable=False, unique=True, index=True)
17
+ password = Column(String, nullable=False) # bcrypt-hashed
18
+ company = Column(String)
19
+ company_size = Column(String)
20
+ function = Column(String)
21
+ site = Column(String)
22
+ role = Column(String)
23
+ status = Column(String, nullable=False, default="active") # active | inactive
24
+ created_at = Column(DateTime(timezone=True), server_default=func.now())
25
 
26
 
27
  class Document(Base):
 
50
  created_at = Column(DateTime(timezone=True), server_default=func.now())
51
  updated_at = Column(DateTime(timezone=True), onupdate=func.now())
52
 
53
+ status = Column(String, nullable=False, default="active") # active | inactive
54
+
55
  messages = relationship("ChatMessage", back_populates="room", cascade="all, delete-orphan")
56
 
57
 
src/users/users.py CHANGED
@@ -56,7 +56,7 @@ def hash_password(password: str) -> str:
56
  Returns:
57
  str: Password yang sudah di-hash dan di-salt dengan aman.
58
  """
59
- salt = bytes(os.environ.get('emarcal--bcrypt--salt'), encoding='utf-8')
60
  bpassword = bytes(password, encoding='utf-8')
61
  hashed_password = bcrypt.hashpw(bpassword, salt=salt)
62
  hashed_password = hashed_password.decode('utf-8') # convert byte to str
@@ -95,21 +95,9 @@ import jwt
95
  import uuid
96
  from datetime import datetime
97
  from jwt.exceptions import ExpiredSignatureError, DecodeError
98
- from src.models.user_info import UserCreate
99
- from pymongo import MongoClient
100
-
101
-
102
-
103
- async def get_mongo_conn(collection_name:str = 'users'):
104
- mongo_client = MongoClient(
105
- host=os.environ.get('emarcal--mongo--endpoint--url'),
106
- # serverSelectionTimeoutMS=5000,
107
- # connectTimeoutMS=5000,
108
- # socketTimeoutMS=5000,
109
- )
110
- db = mongo_client[os.environ.get('emarcal--buma--mongo--dbname')]
111
- users_collection = db[collection_name]
112
- return users_collection
113
 
114
 
115
  # @trace_runtime
@@ -118,23 +106,23 @@ def encode_jwt(input:Dict) -> str:
118
  k: (str(v) if isinstance(v, (uuid.UUID, datetime)) else v)
119
  for k, v in input.items()
120
  }
121
- encoded_jwt = jwt.encode(safe_payload, os.environ.get("emarcal--jwt--secret-key"), algorithm=os.environ.get("emarcal--jwt--algorithm"))
122
  return encoded_jwt
123
 
124
 
125
  # @trace_runtime
126
  def decode_jwt(encoded_input:str) -> Any:
127
- decoded_payload = jwt.decode(encoded_input, os.environ.get("emarcal--jwt--secret-key"), algorithms=[os.environ.get("emarcal--jwt--algorithm")])
128
  return decoded_payload
129
 
130
 
131
- async def get_user(email: str) -> dict:
132
  try:
133
- users_collection = await get_mongo_conn(collection_name="users")
134
- user_profile:dict = users_collection.find_one({"email":email})
135
- if user_profile:
136
- return user_profile
137
- else:
138
  return None
139
  except Exception as E:
140
  print(f"❌ get user error, {E}")
@@ -142,19 +130,3 @@ async def get_user(email: str) -> dict:
142
  fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
143
  print(exc_type, fname, exc_tb.tb_lineno)
144
  raise
145
-
146
-
147
- # input = UserCreate(
148
- # fullname="Harry",
149
- # email="harryyanto.ia@bukittechnology.com",
150
- # password="#$ema.harry#$",
151
- # company="BUMA ID",
152
- # company_size="5000+",
153
- # function="MANAGEMENT",
154
- # site="HO",
155
- # role="admin"
156
- # )
157
-
158
- # xxx= input.model_dump()
159
- # encoded_jwt = encode_jwt(xxx)
160
- # decoded_jwt = decode_jwt(encoded_jwt)
 
56
  Returns:
57
  str: Password yang sudah di-hash dan di-salt dengan aman.
58
  """
59
+ salt = bytes(os.environ.get('emarcal__bcrypt__salt'), encoding='utf-8')
60
  bpassword = bytes(password, encoding='utf-8')
61
  hashed_password = bcrypt.hashpw(bpassword, salt=salt)
62
  hashed_password = hashed_password.decode('utf-8') # convert byte to str
 
95
  import uuid
96
  from datetime import datetime
97
  from jwt.exceptions import ExpiredSignatureError, DecodeError
98
+ from sqlalchemy import select
99
+ from src.db.postgres.connection import AsyncSessionLocal
100
+ from src.db.postgres.models import User
 
 
 
 
 
 
 
 
 
 
 
 
101
 
102
 
103
  # @trace_runtime
 
106
  k: (str(v) if isinstance(v, (uuid.UUID, datetime)) else v)
107
  for k, v in input.items()
108
  }
109
+ encoded_jwt = jwt.encode(safe_payload, os.environ.get("emarcal__jwt__secret_key"), algorithm=os.environ.get("emarcal__jwt__algorithm"))
110
  return encoded_jwt
111
 
112
 
113
  # @trace_runtime
114
  def decode_jwt(encoded_input:str) -> Any:
115
+ decoded_payload = jwt.decode(encoded_input, os.environ.get("emarcal__jwt__secret_key"), algorithms=[os.environ.get("emarcal__jwt__algorithm")])
116
  return decoded_payload
117
 
118
 
119
+ async def get_user(email: str) -> dict | None:
120
  try:
121
+ async with AsyncSessionLocal() as session:
122
+ result = await session.execute(select(User).where(User.email == email))
123
+ user = result.scalars().first()
124
+ if user:
125
+ return {c.key: getattr(user, c.key) for c in user.__mapper__.column_attrs}
126
  return None
127
  except Exception as E:
128
  print(f"❌ get user error, {E}")
 
130
  fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
131
  print(exc_type, fname, exc_tb.tb_lineno)
132
  raise