VietCat commited on
Commit
704e2f8
·
1 Parent(s): d86fb66

refactor channel flow

Browse files
Files changed (3) hide show
  1. app/channel.py +71 -0
  2. app/channel_manager.py +49 -8
  3. app/main.py +13 -1
app/channel.py ADDED
@@ -0,0 +1,71 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+ from typing import Dict, TYPE_CHECKING
3
+
4
+ from loguru import logger
5
+
6
+ from .config import get_settings
7
+ from .message_processor import MessageProcessor
8
+
9
+ if TYPE_CHECKING:
10
+ from .llm import LLMClient
11
+ from .sheets import SheetsClient
12
+ from .supabase_db import SupabaseClient
13
+ from .embedding import EmbeddingClient
14
+ from .reranker import Reranker
15
+
16
+
17
+ class Channel:
18
+ """
19
+ Đại diện cho một kênh giao tiếp độc lập (ví dụ: một Page Facebook).
20
+ Mỗi channel quản lý nhiều cuộc hội thoại (conversations) với người dùng.
21
+ Nó sử dụng các client được chia sẻ từ ChannelManager.
22
+ """
23
+
24
+ def __init__(
25
+ self,
26
+ platform: str,
27
+ page_id: str,
28
+ llm_client: LLMClient,
29
+ sheets_client: SheetsClient,
30
+ supabase_client: SupabaseClient,
31
+ embedding_client: EmbeddingClient,
32
+ reranker: Reranker,
33
+ ):
34
+ self.platform = platform
35
+ self.page_id = page_id
36
+
37
+ # Gán các client được chia sẻ từ ChannelManager
38
+ self.llm = llm_client
39
+ self.sheets = sheets_client
40
+ self.supabase = supabase_client
41
+ self.embedder = embedding_client
42
+ self.reranker = reranker
43
+
44
+ self.conversations: Dict[str, MessageProcessor] = {}
45
+ self._page_token: str | None = None
46
+
47
+ def get_or_create_conversation(self, sender_id: str) -> MessageProcessor:
48
+ """Lấy hoặc tạo một cuộc hội thoại mới (MessageProcessor) cho một người dùng."""
49
+ if sender_id not in self.conversations:
50
+ # MessageProcessor sẽ sử dụng các client của Channel (self)
51
+ self.conversations[sender_id] = MessageProcessor(self, sender_id)
52
+ return self.conversations[sender_id]
53
+
54
+ def get_sheets_client(self) -> SheetsClient:
55
+ """Trả về instance của SheetsClient cho channel này."""
56
+ return self.sheets
57
+
58
+ def get_page_token(self, force_refresh: bool = False) -> str | None:
59
+ """Lấy page access token cho channel này."""
60
+ if not self._page_token or force_refresh:
61
+ page_tokens = get_settings().facebook_page_tokens
62
+ self._page_token = page_tokens.get(self.page_id) if page_tokens else None
63
+ if not self._page_token:
64
+ logger.warning(
65
+ f"Không tìm thấy page token cho page_id: {self.page_id} trong config."
66
+ )
67
+ return self._page_token
68
+
69
+ def invalidate_page_token(self):
70
+ """Vô hiệu hóa page token hiện tại để buộc làm mới."""
71
+ self._page_token = None
app/channel_manager.py CHANGED
@@ -1,13 +1,54 @@
1
- from app.chat_channel import ChatChannel
 
 
 
 
 
 
 
 
 
2
 
3
  class ChannelManager:
 
 
 
 
4
  def __init__(self):
5
- self.channels = {} # {(channel_type, page_id): ChatChannel}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6
 
7
- def get_or_create_channel(self, channel_type: str, page_id: str) -> ChatChannel:
8
- key = (channel_type, page_id)
9
- if key not in self.channels:
10
- self.channels[key] = ChatChannel(page_id, channel_type)
11
- return self.channels[key]
12
 
13
- channel_manager = ChannelManager()
 
 
1
+ from __future__ import annotations
2
+ from typing import Dict, Optional, TYPE_CHECKING
3
+
4
+ if TYPE_CHECKING:
5
+ from .llm import LLMClient
6
+ from .sheets import SheetsClient
7
+ from .supabase_db import SupabaseClient
8
+ from .embedding import EmbeddingClient
9
+ from .reranker import Reranker
10
+ from .channel import Channel
11
 
12
  class ChannelManager:
13
+ """
14
+ Quản lý tất cả các channel (ví dụ: các Page Facebook) đang hoạt động.
15
+ Lưu trữ các client dùng chung để truyền cho các channel, tối ưu hóa tài nguyên.
16
+ """
17
  def __init__(self):
18
+ self.channels: Dict[str, Channel] = {}
19
+ # Khởi tạo các client dùng chung là None, sẽ được gán từ main.py
20
+ self.llm_client: Optional[LLMClient] = None
21
+ self.sheets_client: Optional[SheetsClient] = None
22
+ self.supabase_client: Optional[SupabaseClient] = None
23
+ self.embedding_client: Optional[EmbeddingClient] = None
24
+ self.reranker: Optional[Reranker] = None
25
+
26
+ def set_shared_clients(
27
+ self,
28
+ llm_client: LLMClient,
29
+ sheets_client: SheetsClient,
30
+ supabase_client: SupabaseClient,
31
+ embedding_client: EmbeddingClient,
32
+ reranker: Reranker,
33
+ ):
34
+ """Gán các client được chia sẻ từ main.py."""
35
+ self.llm_client = llm_client
36
+ self.sheets_client = sheets_client
37
+ self.supabase_client = supabase_client
38
+ self.embedding_client = embedding_client
39
+ self.reranker = reranker
40
+
41
+ def get_or_create_channel(self, platform: str, page_id: str) -> Channel:
42
+ """Lấy hoặc tạo một channel mới và truyền các client dùng chung vào."""
43
+ channel_key = f"{platform}:{page_id}"
44
+ if channel_key not in self.channels:
45
+ # Kiểm tra để đảm bảo các client đã được gán
46
+ if not all([self.llm_client, self.sheets_client, self.supabase_client, self.embedding_client, self.reranker]):
47
+ raise RuntimeError("Một hoặc nhiều client dùng chung chưa được thiết lập cho ChannelManager.")
48
 
49
+ from .channel import Channel
50
+ self.channels[channel_key] = Channel(platform=platform, page_id=page_id, llm_client=self.llm_client, sheets_client=self.sheets_client, supabase_client=self.supabase_client, embedding_client=self.embedding_client, reranker=self.reranker)
51
+ return self.channels[channel_key]
 
 
52
 
53
+ # Khởi tạo instance singleton để sử dụng trong toàn bộ ứng dụng
54
+ channel_manager = ChannelManager()
app/main.py CHANGED
@@ -88,12 +88,24 @@ llm_client = create_llm_client(
88
  ),
89
  )
90
 
91
- reranker = Reranker()
92
 
93
  # Khởi tạo LawDocumentChunker
94
  law_chunker = LawDocumentChunker()
95
  law_chunker.llm_client = llm_client
96
 
 
 
 
 
 
 
 
 
 
 
 
 
97
  logger.info("[STARTUP] Mount health router...")
98
  app.include_router(health_router)
99
 
 
88
  ),
89
  )
90
 
91
+ reranker = Reranker() # Khởi tạo Reranker
92
 
93
  # Khởi tạo LawDocumentChunker
94
  law_chunker = LawDocumentChunker()
95
  law_chunker.llm_client = llm_client
96
 
97
+ # Gán các client dùng chung cho ChannelManager.
98
+ # Đây là một bước refactor quan trọng theo nguyên tắc Dependency Injection.
99
+ # Thay vì mỗi Channel tự tạo client, chúng ta tạo một lần và chia sẻ,
100
+ # giúp tối ưu tài nguyên và làm cho code dễ quản lý, dễ test hơn.
101
+ channel_manager.set_shared_clients(
102
+ llm_client=llm_client,
103
+ sheets_client=sheets_client,
104
+ supabase_client=supabase_client,
105
+ embedding_client=embedding_client,
106
+ reranker=reranker,
107
+ )
108
+
109
  logger.info("[STARTUP] Mount health router...")
110
  app.include_router(health_router)
111