npv2k1 commited on
Commit
bc0521a
·
1 Parent(s): d875f25

feat: implement WebSocket support and RabbitMQ integration for message handling

Browse files
src/modules/api/__init__.py CHANGED
@@ -1,3 +1,14 @@
1
- """API module initialization."""
2
- print("API module loaded.")
3
- import src.modules.api.index
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import uvicorn
2
+ from .app import app as api_app
3
+ import logging
4
+
5
+ async def server():
6
+ """Main application entry point."""
7
+ try:
8
+ print("Start api...")
9
+ config = uvicorn.Config(api_app, host="0.0.0.0", port=8080, log_level="info")
10
+ server = uvicorn.Server(config)
11
+ await server.serve()
12
+ except Exception as e:
13
+ logging.error(f"Application failed to start: {e}", exc_info=True)
14
+ raise
src/modules/api/app.py CHANGED
@@ -1,10 +1,26 @@
1
  """FastAPI application factory."""
2
 
3
- from fastapi import FastAPI
4
  from fastapi.middleware.cors import CORSMiddleware
5
 
6
  from .routes import v1_router
7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8
 
9
  def create_app() -> FastAPI:
10
  """Create and configure the FastAPI application."""
@@ -31,6 +47,18 @@ def create_app() -> FastAPI:
31
  async def shutdown_event():
32
  """Run shutdown events."""
33
  pass
 
 
 
 
 
 
 
 
 
 
 
 
34
 
35
  return app
36
 
 
1
  """FastAPI application factory."""
2
 
3
+ from fastapi import FastAPI, WebSocket, WebSocketDisconnect
4
  from fastapi.middleware.cors import CORSMiddleware
5
 
6
  from .routes import v1_router
7
 
8
+ class ConnectionManager:
9
+ def __init__(self):
10
+ self.active_connections: List[WebSocket] = []
11
+
12
+ async def connect(self, websocket: WebSocket):
13
+ await websocket.accept()
14
+ self.active_connections.append(websocket)
15
+
16
+ def disconnect(self, websocket: WebSocket):
17
+ self.active_connections.remove(websocket)
18
+
19
+ async def broadcast(self, message: str):
20
+ for connection in self.active_connections:
21
+ await connection.send_text(message)
22
+
23
+ manager = ConnectionManager()
24
 
25
  def create_app() -> FastAPI:
26
  """Create and configure the FastAPI application."""
 
47
  async def shutdown_event():
48
  """Run shutdown events."""
49
  pass
50
+
51
+ @app.websocket("/ws")
52
+ async def websocket_endpoint(websocket: WebSocket):
53
+ await manager.connect(websocket)
54
+ try:
55
+ while True:
56
+ data = await websocket.receive_text()
57
+ print(f"Received: {data}")
58
+ await manager.broadcast(f"Message from client: {data}")
59
+ except WebSocketDisconnect:
60
+ manager.disconnect(websocket)
61
+ await manager.broadcast("A client disconnected")
62
 
63
  return app
64
 
src/modules/api/index.py DELETED
@@ -1,21 +0,0 @@
1
- import uvicorn
2
- from .app import app as api_app
3
- import logging
4
-
5
-
6
- def main():
7
- """Main application entry point."""
8
- try:
9
- print("Start api...")
10
- uvicorn.run(
11
- api_app,
12
- host="0.0.0.0",
13
- port=8080,
14
- reload=False,
15
- )
16
- except Exception as e:
17
- logging.error(f"Application failed to start: {e}", exc_info=True)
18
- raise
19
-
20
-
21
- main()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/modules/api/routes/v1.py CHANGED
@@ -2,7 +2,7 @@
2
 
3
  from fastapi import APIRouter
4
  from typing import Dict, List
5
-
6
 
7
  # Create v1 router
8
  router = APIRouter(prefix='/v1', tags=['v1'])
@@ -10,6 +10,7 @@ router = APIRouter(prefix='/v1', tags=['v1'])
10
  @router.get("/hello")
11
  async def hello_world() -> Dict[str, str]:
12
  """Hello world endpoint."""
 
13
  return {"message": "Hello, World!"}
14
 
15
  @router.get("/health")
 
2
 
3
  from fastapi import APIRouter
4
  from typing import Dict, List
5
+ from src.modules.transporter import publish_message
6
 
7
  # Create v1 router
8
  router = APIRouter(prefix='/v1', tags=['v1'])
 
10
  @router.get("/hello")
11
  async def hello_world() -> Dict[str, str]:
12
  """Hello world endpoint."""
13
+ publish_message("hello-python", "Hello from FastAPI!")
14
  return {"message": "Hello, World!"}
15
 
16
  @router.get("/health")
src/modules/transporter/__init__.py ADDED
@@ -0,0 +1,26 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/utils/__init__.py
2
+
3
+ import os
4
+ import importlib
5
+ import pkgutil
6
+
7
+ # Lấy tên package hiện tại
8
+ __all__ = []
9
+
10
+ # Duyệt tất cả module con trong thư mục hiện tại (không đệ quy)
11
+ package_dir = os.path.dirname(__file__)
12
+ for _, module_name, is_pkg in pkgutil.iter_modules([package_dir]):
13
+ if not is_pkg and module_name != "__init__":
14
+ module = importlib.import_module(f".{module_name}", package=__name__)
15
+
16
+ # Nếu module có biến __all__, import những gì nó khai báo
17
+ if hasattr(module, "__all__"):
18
+ for attr in module.__all__:
19
+ globals()[attr] = getattr(module, attr)
20
+ __all__.append(attr)
21
+ else:
22
+ # Import tất cả public attributes (không bắt đầu bằng _)
23
+ for attr in dir(module):
24
+ if not attr.startswith("_"):
25
+ globals()[attr] = getattr(module, attr)
26
+ __all__.append(attr)
src/modules/transporter/rabbitmq.py ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pika
2
+ import threading
3
+
4
+ credentials = pika.PlainCredentials('guest', 'guest')
5
+ parameters = pika.ConnectionParameters(
6
+ host='ubuntu',
7
+ port=5672, # default RabbitMQ port
8
+ virtual_host='/', # default vhost, change if needed
9
+ credentials=credentials
10
+ )
11
+
12
+ def queue_listener(queue_name, host="localhost"):
13
+ def decorator(func):
14
+ def start_consumer():
15
+ connection = pika.BlockingConnection(parameters)
16
+ channel = connection.channel()
17
+ channel.queue_declare(queue=queue_name, durable=True)
18
+
19
+ def callback(ch, method, properties, body):
20
+ try:
21
+ func(body.decode()) # Gọi hàm xử lý
22
+ ch.basic_ack(delivery_tag=method.delivery_tag)
23
+ except Exception as e:
24
+ print(f"Error handling message: {e}")
25
+ ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
26
+
27
+ channel.basic_qos(prefetch_count=1)
28
+ channel.basic_consume(queue=queue_name, on_message_callback=callback)
29
+ print(f"[+] Listening on queue '{queue_name}'")
30
+ channel.start_consuming()
31
+
32
+ # chạy consumer trên thread riêng (không chặn main thread)
33
+ threading.Thread(target=start_consumer, daemon=True).start()
34
+ return func
35
+ return decorator
36
+
37
+ @queue_listener("hello-python")
38
+ def process_message(msg):
39
+ print(f"[x] Received: {msg}")
40
+
41
+
42
+ # publish a message to the queue
43
+ def publish_message(queue_name, message):
44
+ connection = pika.BlockingConnection(parameters)
45
+ channel = connection.channel()
46
+ # channel.queue_declare(queue=queue_name, durable=True)
47
+ channel.basic_publish(
48
+ exchange='',
49
+ routing_key=queue_name,
50
+ body=message,
51
+ properties=pika.BasicProperties(
52
+ delivery_mode=2, # make message persistent
53
+ )
54
+ )
55
+ print(f"[x] Sent: {message}")
56
+
57
+ publish_message("hello-python", "RabbitMQ is running!")