Chatopus / app /channel.py
VietCat's picture
refactor channel flow
c56dd72
from __future__ import annotations
from typing import Dict, TYPE_CHECKING
from loguru import logger
from .config import get_settings
from .message_processor import MessageProcessor
if TYPE_CHECKING:
from .llm import LLMClient
from .sheets import SheetsClient
from .supabase_db import SupabaseClient
from .embedding import EmbeddingClient
from .reranker import Reranker
class Channel:
"""
Đại diện cho một kênh giao tiếp độc lập (ví dụ: một Page Facebook).
Mỗi channel quản lý nhiều cuộc hội thoại (conversations) với người dùng.
Nó sử dụng các client được chia sẻ từ ChannelManager.
"""
def __init__(
self,
platform: str,
page_id: str,
llm_client: LLMClient,
sheets_client: SheetsClient,
supabase_client: SupabaseClient,
embedding_client: EmbeddingClient,
reranker: Reranker,
):
self.platform = platform
self.page_id = page_id
# Gán các client được chia sẻ từ ChannelManager
self.llm = llm_client
self.sheets = sheets_client
self.supabase = supabase_client
self.embedder = embedding_client
self.reranker = reranker
self.conversations: Dict[str, MessageProcessor] = {}
self._page_token: str | None = None
def get_or_create_conversation(self, sender_id: str) -> MessageProcessor:
"""Lấy hoặc tạo một cuộc hội thoại mới (MessageProcessor) cho một người dùng."""
if sender_id not in self.conversations:
# MessageProcessor sẽ sử dụng các client của Channel (self)
self.conversations[sender_id] = MessageProcessor(self, sender_id)
return self.conversations[sender_id]
def get_sheets_client(self) -> SheetsClient:
"""Trả về instance của SheetsClient cho channel này."""
return self.sheets
def get_page_token(self, force_refresh: bool = False) -> str | None:
"""Lấy page access token cho channel này."""
# Sửa lỗi: Khôi phục lại logic lấy token từ Supabase thay vì từ config.
if not self._page_token or force_refresh:
logger.info(
f"Cache miss for page token. Fetching from Supabase for page_id: {self.page_id}"
)
self._page_token = self.supabase.get_page_token(self.page_id)
if not self._page_token:
logger.warning(
f"Không tìm thấy page token cho page_id: {self.page_id} trong Supabase."
)
return self._page_token
def invalidate_page_token(self):
"""Vô hiệu hóa page token hiện tại để buộc làm mới."""
self._page_token = None