Chatopus / app /channel_manager.py
VietCat's picture
refactor channel flow
704e2f8
from __future__ import annotations
from typing import Dict, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from .llm import LLMClient
from .sheets import SheetsClient
from .supabase_db import SupabaseClient
from .embedding import EmbeddingClient
from .reranker import Reranker
from .channel import Channel
class ChannelManager:
"""
Quản lý tất cả các channel (ví dụ: các Page Facebook) đang hoạt động.
Lưu trữ các client dùng chung để truyền cho các channel, tối ưu hóa tài nguyên.
"""
def __init__(self):
self.channels: Dict[str, Channel] = {}
# Khởi tạo các client dùng chung là None, sẽ được gán từ main.py
self.llm_client: Optional[LLMClient] = None
self.sheets_client: Optional[SheetsClient] = None
self.supabase_client: Optional[SupabaseClient] = None
self.embedding_client: Optional[EmbeddingClient] = None
self.reranker: Optional[Reranker] = None
def set_shared_clients(
self,
llm_client: LLMClient,
sheets_client: SheetsClient,
supabase_client: SupabaseClient,
embedding_client: EmbeddingClient,
reranker: Reranker,
):
"""Gán các client được chia sẻ từ main.py."""
self.llm_client = llm_client
self.sheets_client = sheets_client
self.supabase_client = supabase_client
self.embedding_client = embedding_client
self.reranker = reranker
def get_or_create_channel(self, platform: str, page_id: str) -> Channel:
"""Lấy hoặc tạo một channel mới và truyền các client dùng chung vào."""
channel_key = f"{platform}:{page_id}"
if channel_key not in self.channels:
# Kiểm tra để đảm bảo các client đã được gán
if not all([self.llm_client, self.sheets_client, self.supabase_client, self.embedding_client, self.reranker]):
raise RuntimeError("Một hoặc nhiều client dùng chung chưa được thiết lập cho ChannelManager.")
from .channel import Channel
self.channels[channel_key] = Channel(platform=platform, page_id=page_id, llm_client=self.llm_client, sheets_client=self.sheets_client, supabase_client=self.supabase_client, embedding_client=self.embedding_client, reranker=self.reranker)
return self.channels[channel_key]
# Khởi tạo instance singleton để sử dụng trong toàn bộ ứng dụng
channel_manager = ChannelManager()