precison9 commited on
Commit
3557eaa
·
1 Parent(s): 5f6f924

integrate RabbitMQ with CloudAMQP

Browse files
app/__pycache__/config.cpython-311.pyc CHANGED
Binary files a/app/__pycache__/config.cpython-311.pyc and b/app/__pycache__/config.cpython-311.pyc differ
 
app/__pycache__/main.cpython-311.pyc CHANGED
Binary files a/app/__pycache__/main.cpython-311.pyc and b/app/__pycache__/main.cpython-311.pyc differ
 
app/auth/__pycache__/routes.cpython-311.pyc CHANGED
Binary files a/app/auth/__pycache__/routes.cpython-311.pyc and b/app/auth/__pycache__/routes.cpython-311.pyc differ
 
app/auth/routes.py CHANGED
@@ -1,5 +1,4 @@
1
  from datetime import datetime, timezone
2
- from typing import Optional
3
  from fastapi import APIRouter, Depends, HTTPException, status
4
  from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
5
  from motor.motor_asyncio import AsyncIOMotorDatabase
@@ -17,11 +16,18 @@ from app.auth.jwt_handler import (
17
  verify_refresh_token,
18
  )
19
  from app.config import settings
 
 
 
 
 
 
20
 
21
  router = APIRouter(prefix="/auth", tags=["Authentication"])
22
  pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
23
  oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
24
 
 
25
  async def get_current_user(
26
  token: str = Depends(oauth2_scheme),
27
  db: AsyncIOMotorDatabase = Depends(get_db),
@@ -37,6 +43,7 @@ async def get_current_user(
37
  raise HTTPException(status_code=401, detail="User not found")
38
  return UserPublic(**user)
39
 
 
40
  @router.post("/register", response_model=UserPublic, status_code=status.HTTP_201_CREATED)
41
  async def register(user: UserCreate, db: AsyncIOMotorDatabase = Depends(get_db)):
42
  username = user.username.strip().lower()
@@ -52,8 +59,13 @@ async def register(user: UserCreate, db: AsyncIOMotorDatabase = Depends(get_db))
52
  "created_at": datetime.utcnow(),
53
  }
54
  await db.users.insert_one(doc)
 
 
 
 
55
  return UserPublic(**doc)
56
 
 
57
  @router.post("/login", response_model=Token)
58
  async def login(form_data: OAuth2PasswordRequestForm = Depends(), db=Depends(get_db)):
59
  username = form_data.username.strip().lower()
@@ -63,24 +75,28 @@ async def login(form_data: OAuth2PasswordRequestForm = Depends(), db=Depends(get
63
  access_token = create_access_token(username)
64
  refresh_token = create_refresh_token(username)
65
  payload = decode_token(refresh_token)
66
- await db.sessions.insert_one(
67
- {
68
- "user_id": username,
69
- "refresh_token_hash": hash_refresh_token(refresh_token),
70
- "created_at": datetime.now(timezone.utc),
71
- "expires_at": datetime.fromtimestamp(payload["exp"], tz=timezone.utc),
72
- "revoked_at": None,
73
- }
74
- )
 
 
75
  return Token(
76
  access_token=access_token,
77
  refresh_token=refresh_token,
78
  expires_in=settings.access_token_expire_minutes * 60,
79
  )
80
 
 
81
  class RefreshIn(BaseModel):
82
  refresh_token: str
83
 
 
84
  @router.post("/refresh", response_model=Token)
85
  async def refresh_token(payload: RefreshIn, db=Depends(get_db)):
86
  try:
@@ -91,11 +107,7 @@ async def refresh_token(payload: RefreshIn, db=Depends(get_db)):
91
  raise HTTPException(status_code=401, detail="Invalid token type")
92
  username = decoded.get("sub")
93
  session_doc = await db.sessions.find_one(
94
- {
95
- "user_id": username,
96
- "revoked_at": None,
97
- "expires_at": {"$gt": datetime.now(timezone.utc)},
98
- },
99
  sort=[("created_at", -1)],
100
  )
101
  if not session_doc or not verify_refresh_token(payload.refresh_token, session_doc["refresh_token_hash"]):
@@ -103,19 +115,27 @@ async def refresh_token(payload: RefreshIn, db=Depends(get_db)):
103
  new_access = create_access_token(username)
104
  new_refresh = create_refresh_token(username)
105
  await db.sessions.update_one(
106
- {"_id": session_doc["_id"]}, {"$set": {"revoked_at": datetime.now(timezone.utc)}}
 
107
  )
108
  payload_new = decode_token(new_refresh)
109
- await db.sessions.insert_one(
110
- {
111
- "user_id": username,
112
- "refresh_token_hash": hash_refresh_token(new_refresh),
113
- "created_at": datetime.now(timezone.utc),
114
- "expires_at": datetime.fromtimestamp(payload_new["exp"], tz=timezone.utc),
115
- "revoked_at": None,
116
- }
 
 
 
 
 
 
 
117
  )
118
- return Token(access_token=new_access, refresh_token=new_refresh, expires_in=settings.access_token_expire_minutes * 60)
119
 
120
  @router.post("/logout")
121
  async def logout(payload: RefreshIn, db=Depends(get_db)):
@@ -125,21 +145,21 @@ async def logout(payload: RefreshIn, db=Depends(get_db)):
125
  return {"ok": True}
126
  username = decoded.get("sub")
127
  session_doc = await db.sessions.find_one(
128
- {
129
- "user_id": username,
130
- "revoked_at": None,
131
- "expires_at": {"$gt": datetime.now(timezone.utc)},
132
- },
133
  sort=[("created_at", -1)],
134
  )
135
  if not session_doc:
136
  return {"ok": True}
137
  if verify_refresh_token(payload.refresh_token, session_doc["refresh_token_hash"]):
138
  await db.sessions.update_one(
139
- {"_id": session_doc["_id"]}, {"$set": {"revoked_at": datetime.now(timezone.utc)}}
 
140
  )
 
 
141
  return {"ok": True}
142
 
 
143
  @router.get("/profile", response_model=UserPublic)
144
  async def read_users_me(current_user: UserPublic = Depends(get_current_user)):
145
  return current_user
 
1
  from datetime import datetime, timezone
 
2
  from fastapi import APIRouter, Depends, HTTPException, status
3
  from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
4
  from motor.motor_asyncio import AsyncIOMotorDatabase
 
16
  verify_refresh_token,
17
  )
18
  from app.config import settings
19
+ from app.rabbitmq.publishers import (
20
+ publish_user_registered,
21
+ publish_user_logged_in,
22
+ publish_user_logged_out,
23
+ publish_token_refreshed,
24
+ )
25
 
26
  router = APIRouter(prefix="/auth", tags=["Authentication"])
27
  pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
28
  oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
29
 
30
+
31
  async def get_current_user(
32
  token: str = Depends(oauth2_scheme),
33
  db: AsyncIOMotorDatabase = Depends(get_db),
 
43
  raise HTTPException(status_code=401, detail="User not found")
44
  return UserPublic(**user)
45
 
46
+
47
  @router.post("/register", response_model=UserPublic, status_code=status.HTTP_201_CREATED)
48
  async def register(user: UserCreate, db: AsyncIOMotorDatabase = Depends(get_db)):
49
  username = user.username.strip().lower()
 
59
  "created_at": datetime.utcnow(),
60
  }
61
  await db.users.insert_one(doc)
62
+
63
+ # Publish registration event
64
+ await publish_user_registered(username, email, user.company)
65
+
66
  return UserPublic(**doc)
67
 
68
+
69
  @router.post("/login", response_model=Token)
70
  async def login(form_data: OAuth2PasswordRequestForm = Depends(), db=Depends(get_db)):
71
  username = form_data.username.strip().lower()
 
75
  access_token = create_access_token(username)
76
  refresh_token = create_refresh_token(username)
77
  payload = decode_token(refresh_token)
78
+ await db.sessions.insert_one({
79
+ "user_id": username,
80
+ "refresh_token_hash": hash_refresh_token(refresh_token),
81
+ "created_at": datetime.now(timezone.utc),
82
+ "expires_at": datetime.fromtimestamp(payload["exp"], tz=timezone.utc),
83
+ "revoked_at": None,
84
+ })
85
+
86
+ # Publish login event
87
+ await publish_user_logged_in(username)
88
+
89
  return Token(
90
  access_token=access_token,
91
  refresh_token=refresh_token,
92
  expires_in=settings.access_token_expire_minutes * 60,
93
  )
94
 
95
+
96
  class RefreshIn(BaseModel):
97
  refresh_token: str
98
 
99
+
100
  @router.post("/refresh", response_model=Token)
101
  async def refresh_token(payload: RefreshIn, db=Depends(get_db)):
102
  try:
 
107
  raise HTTPException(status_code=401, detail="Invalid token type")
108
  username = decoded.get("sub")
109
  session_doc = await db.sessions.find_one(
110
+ {"user_id": username, "revoked_at": None, "expires_at": {"$gt": datetime.now(timezone.utc)}},
 
 
 
 
111
  sort=[("created_at", -1)],
112
  )
113
  if not session_doc or not verify_refresh_token(payload.refresh_token, session_doc["refresh_token_hash"]):
 
115
  new_access = create_access_token(username)
116
  new_refresh = create_refresh_token(username)
117
  await db.sessions.update_one(
118
+ {"_id": session_doc["_id"]},
119
+ {"$set": {"revoked_at": datetime.now(timezone.utc)}}
120
  )
121
  payload_new = decode_token(new_refresh)
122
+ await db.sessions.insert_one({
123
+ "user_id": username,
124
+ "refresh_token_hash": hash_refresh_token(new_refresh),
125
+ "created_at": datetime.now(timezone.utc),
126
+ "expires_at": datetime.fromtimestamp(payload_new["exp"], tz=timezone.utc),
127
+ "revoked_at": None,
128
+ })
129
+
130
+ # Publish token refresh event
131
+ await publish_token_refreshed(username)
132
+
133
+ return Token(
134
+ access_token=new_access,
135
+ refresh_token=new_refresh,
136
+ expires_in=settings.access_token_expire_minutes * 60,
137
  )
138
+
139
 
140
  @router.post("/logout")
141
  async def logout(payload: RefreshIn, db=Depends(get_db)):
 
145
  return {"ok": True}
146
  username = decoded.get("sub")
147
  session_doc = await db.sessions.find_one(
148
+ {"user_id": username, "revoked_at": None, "expires_at": {"$gt": datetime.now(timezone.utc)}},
 
 
 
 
149
  sort=[("created_at", -1)],
150
  )
151
  if not session_doc:
152
  return {"ok": True}
153
  if verify_refresh_token(payload.refresh_token, session_doc["refresh_token_hash"]):
154
  await db.sessions.update_one(
155
+ {"_id": session_doc["_id"]},
156
+ {"$set": {"revoked_at": datetime.now(timezone.utc)}}
157
  )
158
+ await publish_user_logged_out(username)
159
+
160
  return {"ok": True}
161
 
162
+
163
  @router.get("/profile", response_model=UserPublic)
164
  async def read_users_me(current_user: UserPublic = Depends(get_current_user)):
165
  return current_user
app/config.py CHANGED
@@ -2,14 +2,17 @@ import os
2
  from pydantic_settings import BaseSettings
3
 
4
  class Settings(BaseSettings):
5
- mongo_uri: str = os.getenv("MONGO_URI")
6
- database_name: str = os.getenv("DATABASE_NAME")
7
- groq_api_key: str = os.getenv("GROQ_API_KEY")
8
- secret_key: str = os.getenv("SECRET_KEY")
9
- algorithm: str = os.getenv("ALGORITHM")
10
  access_token_expire_minutes: int = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", 30))
11
  refresh_token_expire_days: int = int(os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", 7))
12
 
 
 
 
13
  class Config:
14
  env_file = ".env"
15
  env_file_encoding = "utf-8"
 
2
  from pydantic_settings import BaseSettings
3
 
4
  class Settings(BaseSettings):
5
+ mongo_uri: str = os.getenv("MONGO_URI", "")
6
+ database_name: str = os.getenv("DATABASE_NAME", "echoloft")
7
+ groq_api_key: str = os.getenv("GROQ_API_KEY", "")
8
+ secret_key: str = os.getenv("SECRET_KEY", "changeme")
9
+ algorithm: str = os.getenv("ALGORITHM", "HS256")
10
  access_token_expire_minutes: int = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", 30))
11
  refresh_token_expire_days: int = int(os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", 7))
12
 
13
+ # RabbitMQ — defaults to CloudAMQP free tier URL or local
14
+ rabbitmq_url: str = os.getenv("RABBITMQ_URL", "amqp://guest:guest@localhost/")
15
+
16
  class Config:
17
  env_file = ".env"
18
  env_file_encoding = "utf-8"
app/main.py CHANGED
@@ -1,16 +1,42 @@
 
 
1
  from fastapi import FastAPI
2
  from fastapi.middleware.cors import CORSMiddleware
3
  from app.auth.routes import router as auth_router
4
  from app.rag.routes import router as rag_router
5
  from app.logging_config import setup_logging
 
 
6
 
7
  setup_logging()
 
8
 
9
- app = FastAPI(title="GrokRAG API", description="SaaS RAG Chat with Groq", version="1.1.0")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10
 
11
  app.add_middleware(
12
  CORSMiddleware,
13
- allow_origins=["https://echoloftai.vercel.app/"],
 
 
 
14
  allow_credentials=True,
15
  allow_methods=["*"],
16
  allow_headers=["*"],
@@ -19,6 +45,19 @@ app.add_middleware(
19
  app.include_router(auth_router)
20
  app.include_router(rag_router, prefix="/rag")
21
 
 
22
  @app.get("/")
23
  async def root():
24
- return {"message": "Welcome to GrokRAG API"}
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from contextlib import asynccontextmanager
3
  from fastapi import FastAPI
4
  from fastapi.middleware.cors import CORSMiddleware
5
  from app.auth.routes import router as auth_router
6
  from app.rag.routes import router as rag_router
7
  from app.logging_config import setup_logging
8
+ from app.rabbitmq.connection import rabbitmq
9
+ from app.rabbitmq.consumers import start_consumers
10
 
11
  setup_logging()
12
+ logger = logging.getLogger(__name__)
13
 
14
+
15
+ @asynccontextmanager
16
+ async def lifespan(app: FastAPI):
17
+ # ── Startup ──
18
+ logger.info("Connecting to RabbitMQ...")
19
+ await rabbitmq.connect()
20
+ await start_consumers()
21
+ yield
22
+ # ── Shutdown ──
23
+ logger.info("Disconnecting from RabbitMQ...")
24
+ await rabbitmq.disconnect()
25
+
26
+
27
+ app = FastAPI(
28
+ title="EchoLoft AI API",
29
+ description="SaaS RAG Chat with Groq + RabbitMQ event streaming",
30
+ version="1.2.0",
31
+ lifespan=lifespan,
32
+ )
33
 
34
  app.add_middleware(
35
  CORSMiddleware,
36
+ allow_origins=[
37
+ "http://localhost:3000",
38
+ "https://echoloftai.vercel.app",
39
+ ],
40
  allow_credentials=True,
41
  allow_methods=["*"],
42
  allow_headers=["*"],
 
45
  app.include_router(auth_router)
46
  app.include_router(rag_router, prefix="/rag")
47
 
48
+
49
  @app.get("/")
50
  async def root():
51
+ return {
52
+ "message": "Welcome to EchoLoft AI API",
53
+ "version": "1.2.0",
54
+ "rabbitmq": "connected" if rabbitmq.is_connected else "disconnected",
55
+ }
56
+
57
+
58
+ @app.get("/health")
59
+ async def health():
60
+ return {
61
+ "status": "ok",
62
+ "rabbitmq": "connected" if rabbitmq.is_connected else "disconnected",
63
+ }
app/rabbitmq/__init__.py ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from .connection import rabbitmq
2
+ from .publishers import (
3
+ publish_user_registered,
4
+ publish_user_logged_in,
5
+ publish_user_logged_out,
6
+ publish_token_refreshed,
7
+ publish_conversation_created,
8
+ publish_message_sent,
9
+ )
10
+
11
+ __all__ = [
12
+ "rabbitmq",
13
+ "publish_user_registered",
14
+ "publish_user_logged_in",
15
+ "publish_user_logged_out",
16
+ "publish_token_refreshed",
17
+ "publish_conversation_created",
18
+ "publish_message_sent",
19
+ ]
app/rabbitmq/__pycache__/__init__.cpython-311.pyc ADDED
Binary file (587 Bytes). View file
 
app/rabbitmq/__pycache__/connection.cpython-311.pyc ADDED
Binary file (6.46 kB). View file
 
app/rabbitmq/__pycache__/consumers.cpython-311.pyc ADDED
Binary file (5.05 kB). View file
 
app/rabbitmq/__pycache__/publishers.cpython-311.pyc ADDED
Binary file (4.43 kB). View file
 
app/rabbitmq/__pycache__/queues.cpython-311.pyc ADDED
Binary file (1.44 kB). View file
 
app/rabbitmq/connection.py ADDED
@@ -0,0 +1,93 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import logging
3
+ import json
4
+ from typing import Optional, Callable
5
+ import aio_pika
6
+ from aio_pika import Message, DeliveryMode
7
+ from aio_pika.abc import AbstractRobustConnection, AbstractChannel, AbstractQueue
8
+ from app.config import settings
9
+
10
+ logger = logging.getLogger(__name__)
11
+
12
+ class RabbitMQManager:
13
+ def __init__(self):
14
+ self.connection: Optional[AbstractRobustConnection] = None
15
+ self.channel: Optional[AbstractChannel] = None
16
+
17
+ async def connect(self):
18
+ try:
19
+ self.connection = await aio_pika.connect_robust(
20
+ settings.rabbitmq_url,
21
+ reconnect_interval=5,
22
+ )
23
+ self.channel = await self.connection.channel()
24
+ await self.channel.set_qos(prefetch_count=10)
25
+ logger.info("RabbitMQ connected successfully")
26
+ except Exception as e:
27
+ logger.error(f"RabbitMQ connection failed: {e}")
28
+ self.connection = None
29
+ self.channel = None
30
+
31
+ async def disconnect(self):
32
+ try:
33
+ if self.connection and not self.connection.is_closed:
34
+ await self.connection.close()
35
+ logger.info("RabbitMQ disconnected")
36
+ except Exception as e:
37
+ logger.error(f"RabbitMQ disconnect error: {e}")
38
+
39
+ async def declare_queue(self, queue_name: str, durable: bool = True) -> Optional[AbstractQueue]:
40
+ if not self.channel:
41
+ logger.warning("RabbitMQ channel not available")
42
+ return None
43
+ try:
44
+ queue = await self.channel.declare_queue(
45
+ queue_name,
46
+ durable=durable,
47
+ arguments={"x-message-ttl": 86400000} # 24hr TTL
48
+ )
49
+ return queue
50
+ except Exception as e:
51
+ logger.error(f"Queue declare error: {e}")
52
+ return None
53
+
54
+ async def publish(self, queue_name: str, payload: dict) -> bool:
55
+ if not self.channel:
56
+ logger.warning("RabbitMQ not connected — skipping publish")
57
+ return False
58
+ try:
59
+ await self.declare_queue(queue_name)
60
+ message = Message(
61
+ body=json.dumps(payload).encode(),
62
+ delivery_mode=DeliveryMode.PERSISTENT,
63
+ content_type="application/json",
64
+ )
65
+ await self.channel.default_exchange.publish(
66
+ message,
67
+ routing_key=queue_name,
68
+ )
69
+ logger.info(f"Published to {queue_name}: {payload.get('event', 'unknown')}")
70
+ return True
71
+ except Exception as e:
72
+ logger.error(f"Publish error: {e}")
73
+ return False
74
+
75
+ async def consume(self, queue_name: str, callback: Callable):
76
+ if not self.channel:
77
+ logger.warning("RabbitMQ not connected — skipping consume")
78
+ return
79
+ try:
80
+ queue = await self.declare_queue(queue_name)
81
+ if queue:
82
+ await queue.consume(callback)
83
+ logger.info(f"Consuming from queue: {queue_name}")
84
+ except Exception as e:
85
+ logger.error(f"Consume error: {e}")
86
+
87
+ @property
88
+ def is_connected(self) -> bool:
89
+ return self.connection is not None and not self.connection.is_closed
90
+
91
+
92
+ # Singleton instance
93
+ rabbitmq = RabbitMQManager()
app/rabbitmq/consumers.py ADDED
@@ -0,0 +1,59 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import logging
3
+ from aio_pika.abc import AbstractIncomingMessage
4
+ from app.rabbitmq.connection import rabbitmq
5
+ from app.rabbitmq.queues import (
6
+ QUEUE_AUTH_EVENTS, QUEUE_CHAT_EVENTS,
7
+ QUEUE_AUDIT_LOG, QUEUE_NOTIFICATIONS
8
+ )
9
+
10
+ logger = logging.getLogger(__name__)
11
+
12
+
13
+ async def handle_auth_event(message: AbstractIncomingMessage):
14
+ async with message.process():
15
+ try:
16
+ data = json.loads(message.body.decode())
17
+ event = data.get("event")
18
+ logger.info(f"[AUTH EVENT] {event} — user: {data.get('username')}")
19
+ # Add your business logic here:
20
+ # e.g. send welcome email on user.registered
21
+ # e.g. alert on suspicious login patterns
22
+ except Exception as e:
23
+ logger.error(f"Auth event handler error: {e}")
24
+
25
+
26
+ async def handle_chat_event(message: AbstractIncomingMessage):
27
+ async with message.process():
28
+ try:
29
+ data = json.loads(message.body.decode())
30
+ event = data.get("event")
31
+ logger.info(f"[CHAT EVENT] {event} — conv: {data.get('conversation_id')}")
32
+ # Add your business logic here:
33
+ # e.g. track usage metrics
34
+ # e.g. send notifications
35
+ except Exception as e:
36
+ logger.error(f"Chat event handler error: {e}")
37
+
38
+
39
+ async def handle_audit_log(message: AbstractIncomingMessage):
40
+ async with message.process():
41
+ try:
42
+ data = json.loads(message.body.decode())
43
+ logger.info(f"[AUDIT] {data.get('event')} — user: {data.get('user')} at {data.get('timestamp')}")
44
+ # Add your business logic here:
45
+ # e.g. write to audit collection in MongoDB
46
+ # e.g. forward to external SIEM
47
+ except Exception as e:
48
+ logger.error(f"Audit log handler error: {e}")
49
+
50
+
51
+ async def start_consumers():
52
+ """Register all consumers. Called on app startup."""
53
+ if not rabbitmq.is_connected:
54
+ logger.warning("RabbitMQ not connected — consumers not started")
55
+ return
56
+ await rabbitmq.consume(QUEUE_AUTH_EVENTS, handle_auth_event)
57
+ await rabbitmq.consume(QUEUE_CHAT_EVENTS, handle_chat_event)
58
+ await rabbitmq.consume(QUEUE_AUDIT_LOG, handle_audit_log)
59
+ logger.info("All RabbitMQ consumers started")
app/rabbitmq/publishers.py ADDED
@@ -0,0 +1,70 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from datetime import datetime, timezone
2
+ from app.rabbitmq.connection import rabbitmq
3
+ from app.rabbitmq.queues import (
4
+ QUEUE_AUTH_EVENTS, QUEUE_CHAT_EVENTS,
5
+ QUEUE_AUDIT_LOG, AuthEvent, ChatEvent
6
+ )
7
+
8
+
9
+ async def publish_user_registered(username: str, email: str, company: str):
10
+ await rabbitmq.publish(QUEUE_AUTH_EVENTS, {
11
+ "event": AuthEvent.USER_REGISTERED,
12
+ "username": username,
13
+ "email": email,
14
+ "company": company,
15
+ "timestamp": datetime.now(timezone.utc).isoformat(),
16
+ })
17
+ await rabbitmq.publish(QUEUE_AUDIT_LOG, {
18
+ "event": AuthEvent.USER_REGISTERED,
19
+ "user": username,
20
+ "timestamp": datetime.now(timezone.utc).isoformat(),
21
+ })
22
+
23
+
24
+ async def publish_user_logged_in(username: str):
25
+ await rabbitmq.publish(QUEUE_AUTH_EVENTS, {
26
+ "event": AuthEvent.USER_LOGGED_IN,
27
+ "username": username,
28
+ "timestamp": datetime.now(timezone.utc).isoformat(),
29
+ })
30
+ await rabbitmq.publish(QUEUE_AUDIT_LOG, {
31
+ "event": AuthEvent.USER_LOGGED_IN,
32
+ "user": username,
33
+ "timestamp": datetime.now(timezone.utc).isoformat(),
34
+ })
35
+
36
+
37
+ async def publish_user_logged_out(username: str):
38
+ await rabbitmq.publish(QUEUE_AUTH_EVENTS, {
39
+ "event": AuthEvent.USER_LOGGED_OUT,
40
+ "username": username,
41
+ "timestamp": datetime.now(timezone.utc).isoformat(),
42
+ })
43
+
44
+
45
+ async def publish_token_refreshed(username: str):
46
+ await rabbitmq.publish(QUEUE_AUTH_EVENTS, {
47
+ "event": AuthEvent.TOKEN_REFRESHED,
48
+ "username": username,
49
+ "timestamp": datetime.now(timezone.utc).isoformat(),
50
+ })
51
+
52
+
53
+ async def publish_conversation_created(user_id: str, conversation_id: str):
54
+ await rabbitmq.publish(QUEUE_CHAT_EVENTS, {
55
+ "event": ChatEvent.CONVERSATION_CREATED,
56
+ "user_id": user_id,
57
+ "conversation_id": conversation_id,
58
+ "timestamp": datetime.now(timezone.utc).isoformat(),
59
+ })
60
+
61
+
62
+ async def publish_message_sent(user_id: str, conversation_id: str, model: str, message_preview: str):
63
+ await rabbitmq.publish(QUEUE_CHAT_EVENTS, {
64
+ "event": ChatEvent.MESSAGE_SENT,
65
+ "user_id": user_id,
66
+ "conversation_id": conversation_id,
67
+ "model": model,
68
+ "message_preview": message_preview[:100],
69
+ "timestamp": datetime.now(timezone.utc).isoformat(),
70
+ })
app/rabbitmq/queues.py ADDED
@@ -0,0 +1,24 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Queue Names
2
+ QUEUE_CHAT_EVENTS = "chat.events"
3
+ QUEUE_AUTH_EVENTS = "auth.events"
4
+ QUEUE_RAG_JOBS = "rag.jobs"
5
+ QUEUE_NOTIFICATIONS = "notifications"
6
+ QUEUE_AUDIT_LOG = "audit.log"
7
+
8
+ # Event Types
9
+ class AuthEvent:
10
+ USER_REGISTERED = "user.registered"
11
+ USER_LOGGED_IN = "user.logged_in"
12
+ USER_LOGGED_OUT = "user.logged_out"
13
+ TOKEN_REFRESHED = "token.refreshed"
14
+
15
+ class ChatEvent:
16
+ MESSAGE_SENT = "message.sent"
17
+ MESSAGE_RECEIVED = "message.received"
18
+ CONVERSATION_CREATED = "conversation.created"
19
+
20
+ class RagEvent:
21
+ JOB_QUEUED = "rag.job.queued"
22
+ JOB_STARTED = "rag.job.started"
23
+ JOB_COMPLETED = "rag.job.completed"
24
+ JOB_FAILED = "rag.job.failed"
app/rag/__pycache__/routes.cpython-311.pyc CHANGED
Binary files a/app/rag/__pycache__/routes.cpython-311.pyc and b/app/rag/__pycache__/routes.cpython-311.pyc differ
 
app/rag/routes.py CHANGED
@@ -15,6 +15,10 @@ from app.auth.models import UserPublic
15
  from app.rag.models import ALLOWED_MODELS, Message
16
  from app.rag.rag_processor import build_context_from_files, web_search
17
  from app.config import settings
 
 
 
 
18
 
19
  router = APIRouter(tags=["RAG Chat"])
20
  logger = logging.getLogger(__name__)
@@ -34,6 +38,7 @@ WEB_SEARCH_TOOL = {
34
  },
35
  }
36
 
 
37
  @router.post("/conversations", status_code=status.HTTP_201_CREATED)
38
  async def create_conversation(
39
  current_user: UserPublic = Depends(get_current_user),
@@ -41,8 +46,13 @@ async def create_conversation(
41
  ):
42
  conv = ConversationDB(user_id=current_user.username)
43
  result = await db.conversations.insert_one(conv.dict(exclude={"id"}))
44
- conv.id = str(result.inserted_id)
45
- return {"conversation_id": conv.id}
 
 
 
 
 
46
 
47
  @router.get("/conversations/{conv_id}")
48
  async def get_conversation(
@@ -52,7 +62,7 @@ async def get_conversation(
52
  ):
53
  try:
54
  oid = ObjectId(conv_id)
55
- except:
56
  raise HTTPException(status_code=400, detail="Invalid conversation ID")
57
  conv = await db.conversations.find_one({"_id": oid, "user_id": current_user.username})
58
  if not conv:
@@ -61,6 +71,7 @@ async def get_conversation(
61
  del conv["_id"]
62
  return conv
63
 
 
64
  @router.post("/conversations/{conv_id}/messages")
65
  async def send_message(
66
  conv_id: str,
@@ -75,39 +86,41 @@ async def send_message(
75
  raise HTTPException(status_code=400, detail="Invalid model")
76
  try:
77
  oid = ObjectId(conv_id)
78
- except:
79
  raise HTTPException(status_code=400, detail="Invalid conversation ID")
80
  conv = await db.conversations.find_one({"_id": oid, "user_id": current_user.username})
81
  if not conv:
82
  raise HTTPException(status_code=404, detail="Conversation not found")
83
 
84
- # Load messages
85
  messages = [Message(**m) for m in conv.get("messages", [])]
86
 
87
- # Build RAG context if files
88
  rag_context = ""
89
  if files:
90
  rag_context = build_context_from_files(files, message)
91
-
92
- # System prompt with context
93
- system_msg = {"role": "system", "content": SYSTEM_PROMPT + (f"\n\nContext: {rag_context}" if rag_context else "")}
94
 
95
- # Append user message
 
 
 
 
96
  user_msg = Message(role="user", content=message)
97
  messages.append(user_msg)
98
 
99
- # Groq client
100
- client = Groq(api_key=settings.groq_api_key)
 
 
 
 
 
101
 
102
- # Tools if enabled
103
  tools = [WEB_SEARCH_TOOL] if enable_web_search else None
104
 
105
- # Tool loop for reasoning and multiple calls (up to 3 iterations)
106
- chat_history = [
107
- system_msg if isinstance(system_msg, dict) else system_msg.dict()
108
- ] + [
109
- m if isinstance(m, dict) else m.dict() for m in messages
110
- ]
111
  max_tool_loops = 3
112
  for _ in range(max_tool_loops):
113
  completion = client.chat.completions.create(
@@ -122,25 +135,21 @@ async def send_message(
122
  )
123
  choice = completion.choices[0].message
124
  if not choice.tool_calls:
125
- # No more tools, prepare to stream
126
  break
127
  for tool_call in choice.tool_calls:
128
  if tool_call.function.name == "web_search":
129
  args = json.loads(tool_call.function.arguments)
130
- query = args["query"]
131
- result = web_search(query)
132
- tool_response = {
133
  "role": "tool",
134
  "tool_call_id": tool_call.id,
135
  "name": "web_search",
136
  "content": result,
137
- }
138
- chat_history.append(tool_response)
139
  else:
140
  logger.warning("Max tool loops reached")
141
  raise HTTPException(status_code=500, detail="Too many tool calls")
142
 
143
- # Final streaming call
144
  completion = client.chat.completions.create(
145
  model=model,
146
  messages=chat_history,
@@ -151,18 +160,19 @@ async def send_message(
151
  stop=None,
152
  )
153
 
154
- # Stream response
155
  async def generate():
156
  response_content = ""
157
  for chunk in completion:
158
  content = chunk.choices[0].delta.content or ""
159
  response_content += content
160
  yield content
161
- # Save to DB
162
  messages.append(Message(role="assistant", content=response_content))
163
  await db.conversations.update_one(
164
  {"_id": oid},
165
- {"$set": {"messages": [m.dict() for m in messages], "updated_at": datetime.utcnow()}}
 
 
 
166
  )
167
 
168
  return StreamingResponse(generate(), media_type="text/event-stream")
 
15
  from app.rag.models import ALLOWED_MODELS, Message
16
  from app.rag.rag_processor import build_context_from_files, web_search
17
  from app.config import settings
18
+ from app.rabbitmq.publishers import (
19
+ publish_conversation_created,
20
+ publish_message_sent,
21
+ )
22
 
23
  router = APIRouter(tags=["RAG Chat"])
24
  logger = logging.getLogger(__name__)
 
38
  },
39
  }
40
 
41
+
42
  @router.post("/conversations", status_code=status.HTTP_201_CREATED)
43
  async def create_conversation(
44
  current_user: UserPublic = Depends(get_current_user),
 
46
  ):
47
  conv = ConversationDB(user_id=current_user.username)
48
  result = await db.conversations.insert_one(conv.dict(exclude={"id"}))
49
+ conv_id = str(result.inserted_id)
50
+
51
+ # Publish conversation created event
52
+ await publish_conversation_created(current_user.username, conv_id)
53
+
54
+ return {"conversation_id": conv_id}
55
+
56
 
57
  @router.get("/conversations/{conv_id}")
58
  async def get_conversation(
 
62
  ):
63
  try:
64
  oid = ObjectId(conv_id)
65
+ except Exception:
66
  raise HTTPException(status_code=400, detail="Invalid conversation ID")
67
  conv = await db.conversations.find_one({"_id": oid, "user_id": current_user.username})
68
  if not conv:
 
71
  del conv["_id"]
72
  return conv
73
 
74
+
75
  @router.post("/conversations/{conv_id}/messages")
76
  async def send_message(
77
  conv_id: str,
 
86
  raise HTTPException(status_code=400, detail="Invalid model")
87
  try:
88
  oid = ObjectId(conv_id)
89
+ except Exception:
90
  raise HTTPException(status_code=400, detail="Invalid conversation ID")
91
  conv = await db.conversations.find_one({"_id": oid, "user_id": current_user.username})
92
  if not conv:
93
  raise HTTPException(status_code=404, detail="Conversation not found")
94
 
 
95
  messages = [Message(**m) for m in conv.get("messages", [])]
96
 
 
97
  rag_context = ""
98
  if files:
99
  rag_context = build_context_from_files(files, message)
 
 
 
100
 
101
+ system_msg = {
102
+ "role": "system",
103
+ "content": SYSTEM_PROMPT + (f"\n\nContext:\n{rag_context}" if rag_context else "")
104
+ }
105
+
106
  user_msg = Message(role="user", content=message)
107
  messages.append(user_msg)
108
 
109
+ # Publish message sent event
110
+ await publish_message_sent(
111
+ user_id=current_user.username,
112
+ conversation_id=conv_id,
113
+ model=model,
114
+ message_preview=message,
115
+ )
116
 
117
+ client = Groq(api_key=settings.groq_api_key)
118
  tools = [WEB_SEARCH_TOOL] if enable_web_search else None
119
 
120
+ chat_history = [system_msg] + [
121
+ m if isinstance(m, dict) else m.dict() for m in messages
122
+ ]
123
+
 
 
124
  max_tool_loops = 3
125
  for _ in range(max_tool_loops):
126
  completion = client.chat.completions.create(
 
135
  )
136
  choice = completion.choices[0].message
137
  if not choice.tool_calls:
 
138
  break
139
  for tool_call in choice.tool_calls:
140
  if tool_call.function.name == "web_search":
141
  args = json.loads(tool_call.function.arguments)
142
+ result = web_search(args["query"])
143
+ chat_history.append({
 
144
  "role": "tool",
145
  "tool_call_id": tool_call.id,
146
  "name": "web_search",
147
  "content": result,
148
+ })
 
149
  else:
150
  logger.warning("Max tool loops reached")
151
  raise HTTPException(status_code=500, detail="Too many tool calls")
152
 
 
153
  completion = client.chat.completions.create(
154
  model=model,
155
  messages=chat_history,
 
160
  stop=None,
161
  )
162
 
 
163
  async def generate():
164
  response_content = ""
165
  for chunk in completion:
166
  content = chunk.choices[0].delta.content or ""
167
  response_content += content
168
  yield content
 
169
  messages.append(Message(role="assistant", content=response_content))
170
  await db.conversations.update_one(
171
  {"_id": oid},
172
+ {"$set": {
173
+ "messages": [m.dict() for m in messages],
174
+ "updated_at": datetime.utcnow()
175
+ }}
176
  )
177
 
178
  return StreamingResponse(generate(), media_type="text/event-stream")
requirements.txt CHANGED
@@ -19,5 +19,5 @@ openpyxl==3.1.2
19
  pandas==2.2.2
20
  duckduckgo-search==6.1.9
21
  python-multipart==0.0.9
22
-
23
  numpy==1.26.4
 
19
  pandas==2.2.2
20
  duckduckgo-search==6.1.9
21
  python-multipart==0.0.9
22
+ pika==9.4.1
23
  numpy==1.26.4