|
|
from __future__ import annotations |
|
|
from typing import Dict, TYPE_CHECKING |
|
|
|
|
|
from loguru import logger |
|
|
|
|
|
from .config import get_settings |
|
|
from .message_processor import MessageProcessor |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from .llm import LLMClient |
|
|
from .sheets import SheetsClient |
|
|
from .supabase_db import SupabaseClient |
|
|
from .embedding import EmbeddingClient |
|
|
from .reranker import Reranker |
|
|
|
|
|
|
|
|
class Channel: |
|
|
""" |
|
|
Đại diện cho một kênh giao tiếp độc lập (ví dụ: một Page Facebook). |
|
|
Mỗi channel quản lý nhiều cuộc hội thoại (conversations) với người dùng. |
|
|
Nó sử dụng các client được chia sẻ từ ChannelManager. |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
platform: str, |
|
|
page_id: str, |
|
|
llm_client: LLMClient, |
|
|
sheets_client: SheetsClient, |
|
|
supabase_client: SupabaseClient, |
|
|
embedding_client: EmbeddingClient, |
|
|
reranker: Reranker, |
|
|
): |
|
|
self.platform = platform |
|
|
self.page_id = page_id |
|
|
|
|
|
|
|
|
self.llm = llm_client |
|
|
self.sheets = sheets_client |
|
|
self.supabase = supabase_client |
|
|
self.embedder = embedding_client |
|
|
self.reranker = reranker |
|
|
|
|
|
self.conversations: Dict[str, MessageProcessor] = {} |
|
|
self._page_token: str | None = None |
|
|
|
|
|
def get_or_create_conversation(self, sender_id: str) -> MessageProcessor: |
|
|
"""Lấy hoặc tạo một cuộc hội thoại mới (MessageProcessor) cho một người dùng.""" |
|
|
if sender_id not in self.conversations: |
|
|
|
|
|
self.conversations[sender_id] = MessageProcessor(self, sender_id) |
|
|
return self.conversations[sender_id] |
|
|
|
|
|
def get_sheets_client(self) -> SheetsClient: |
|
|
"""Trả về instance của SheetsClient cho channel này.""" |
|
|
return self.sheets |
|
|
|
|
|
def get_page_token(self, force_refresh: bool = False) -> str | None: |
|
|
"""Lấy page access token cho channel này.""" |
|
|
|
|
|
if not self._page_token or force_refresh: |
|
|
logger.info( |
|
|
f"Cache miss for page token. Fetching from Supabase for page_id: {self.page_id}" |
|
|
) |
|
|
self._page_token = self.supabase.get_page_token(self.page_id) |
|
|
if not self._page_token: |
|
|
logger.warning( |
|
|
f"Không tìm thấy page token cho page_id: {self.page_id} trong Supabase." |
|
|
) |
|
|
return self._page_token |
|
|
|
|
|
def invalidate_page_token(self): |
|
|
"""Vô hiệu hóa page token hiện tại để buộc làm mới.""" |
|
|
self._page_token = None |
|
|
|