import os import logging from typing import List, Dict import google.generativeai as genai from pathlib import Path import json import asyncio import random logger = logging.getLogger(__name__) class SourceMatcher: def __init__(self, api_key: str = None, sources_root_dir: str = "public"): """ Khởi tạo SourceMatcher Args: api_key: Gemini API key (nếu không có sẽ lấy từ env) sources_root_dir: Thư mục gốc chứa các source folder (mặc định: public) """ # Cấu hình Gemini AI if api_key: genai.configure(api_key=api_key) else: # Lấy từ environment variable api_key = os.getenv('GEMINI_API_KEY') if api_key: genai.configure(api_key=api_key) self.model = genai.GenerativeModel('gemini-pro') self.use_ai = True logger.info("Gemini AI được kích hoạt") else: logger.warning("GEMINI_API_KEY không được cung cấp, sử dụng keyword matching đơn giản") self.model = None self.use_ai = False self.sources_root_dir = sources_root_dir self.sources_cache = {} # Tạo folder public nếu chưa có os.makedirs(self.sources_root_dir, exist_ok=True) # Scan available sources self._scan_sources() def _scan_sources(self): """ Scan tất cả các folder source có sẵn trong public """ try: self.sources_cache = {} if not os.path.exists(self.sources_root_dir): logger.warning(f"Thư mục sources không tồn tại: {self.sources_root_dir}") os.makedirs(self.sources_root_dir, exist_ok=True) return logger.info(f"Đang scan folder: {self.sources_root_dir}") for item in os.listdir(self.sources_root_dir): item_path = os.path.join(self.sources_root_dir, item) if os.path.isdir(item_path): # Tìm các file media trong folder media_files = [] # Kiểm tra subfolder 'media' media_folder = os.path.join(item_path, "media") if os.path.exists(media_folder): for file in os.listdir(media_folder): file_path = os.path.join(media_folder, file) if os.path.isfile(file_path) and self._is_media_file(file_path): media_files.append(file_path) # Tìm media files trực tiếp trong folder chính for file in os.listdir(item_path): file_path = os.path.join(item_path, file) if os.path.isfile(file_path) and self._is_media_file(file_path): media_files.append(file_path) if media_files: self.sources_cache[item] = { "folder_name": item, "folder_path": item_path, "media_files": media_files, "media_count": len(media_files) } logger.info(f"✓ Tìm thấy source: {item} ({len(media_files)} media files)") else: logger.info(f"✗ Folder trống hoặc không có media: {item}") logger.info(f"Tổng cộng {len(self.sources_cache)} source folders có media") # Log chi tiết các folder được tìm thấy if self.sources_cache: logger.info("Chi tiết sources:") for folder_name, data in self.sources_cache.items(): logger.info(f" - {folder_name}: {data['media_count']} files") except Exception as e: logger.error(f"Lỗi khi scan sources: {str(e)}") def _is_media_file(self, file_path: str) -> bool: """ Kiểm tra xem file có phải là media không """ media_extensions = [ # Video formats '.mp4', '.mov', '.avi', '.mkv', '.webm', '.flv', '.wmv', '.m4v', # Image formats '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff', '.tif' ] return any(file_path.lower().endswith(ext) for ext in media_extensions) async def find_matching_sources( self, keywords: List[str], title: str = "", max_sources: int = 15 ) -> List[str]: """ Tìm các source media phù hợp với keywords Args: keywords: Danh sách keywords title: Tiêu đề video (optional) max_sources: Số lượng source tối đa Returns: Danh sách đường dẫn các file media phù hợp """ try: if not self.sources_cache: logger.warning("Không có source nào available trong folder public") return [] logger.info(f"Tìm kiếm sources cho keywords: {keywords}") logger.info(f"Title: {title}") if self.use_ai and self.model: # Sử dụng Gemini AI selected_folders = await self._ai_matching(keywords, title, max_sources) else: # Sử dụng keyword matching đơn giản selected_folders = self._simple_keyword_matching(keywords, title) # Lấy media files từ các folder đã chọn selected_media_files = [] for folder_info in selected_folders: folder_name = folder_info.get('folder_name', '') if folder_name in self.sources_cache: folder_data = self.sources_cache[folder_name] folder_files = folder_data['media_files'] # Shuffle để tạo sự ngẫu nhiên random.shuffle(folder_files) selected_media_files.extend(folder_files) logger.info( f"✓ Selected folder: {folder_name} " f"(score: {folder_info.get('relevance_score', 'N/A'):.2f}) " f"- {len(folder_files)} files" ) else: logger.warning(f"✗ Folder không tồn tại trong cache: {folder_name}") # Shuffle toàn bộ danh sách để tạo sự đa dạng random.shuffle(selected_media_files) # Giới hạn số lượng files if len(selected_media_files) > max_sources * 5: selected_media_files = selected_media_files[:max_sources * 5] logger.info(f"Tổng cộng {len(selected_media_files)} media files được chọn") return selected_media_files except Exception as e: logger.error(f"Lỗi khi tìm matching sources: {str(e)}") return self.get_fallback_sources() async def _ai_matching(self, keywords: List[str], title: str, max_sources: int) -> List[Dict]: """ Sử dụng Gemini AI để matching """ try: source_names = list(self.sources_cache.keys()) prompt = f""" Bạn là một AI chuyên phân tích và matching nội dung video cho người Việt Nam. NHIỆM VỤ: Tìm các folder source phù hợp nhất với keywords và title đã cho. THÔNG TIN INPUT: - Title: "{title}" - Keywords: {keywords} CÁC FOLDER SOURCE AVAILABLE trong thư mục public: {json.dumps(source_names, ensure_ascii=False, indent=2)} YÊU CẦU: 1. Phân tích semantic meaning của title và keywords (hỗ trợ tiếng Việt) 2. So sánh với tên các folder source 3. Chọn tối đa {max_sources} folder phù hợp nhất 4. Sắp xếp theo độ phù hợp (cao nhất trước) 5. Ưu tiên folder có tên chứa keywords hoặc có ý nghĩa tương tự 6. Xem xét cả từ khóa tiếng Việt và tiếng Anh Trả về CHÍNH XÁC theo format JSON sau: {{ "analysis": "Phân tích ngắn gọn về sự phù hợp", "selected_folders": [ {{ "folder_name": "tên_folder_chính_xác_từ_danh_sách", "relevance_score": 0.95, "reason": "Lý do chọn folder này" }} ] }} LưU Ý: - CHỈ chọn folder names CÓ TRONG danh sách available ở trên - Relevance score từ 0.0 đến 1.0 - Không tạo ra tên folder mới - Ưu tiên folders liên quan đến chủ đề """ logger.info("Đang phân tích sources với Gemini AI...") # Gọi Gemini API response = self.model.generate_content(prompt) response_text = response.text.strip() logger.info(f"Gemini response: {response_text[:200]}...") # Parse JSON response try: # Tìm và extract JSON từ response start_idx = response_text.find('{') end_idx = response_text.rfind('}') + 1 if start_idx >= 0 and end_idx > start_idx: json_str = response_text[start_idx:end_idx] result = json.loads(json_str) selected_folders = result.get('selected_folders', []) logger.info(f"Gemini analysis: {result.get('analysis', 'No analysis')}") return selected_folders else: raise ValueError("Không tìm thấy JSON trong response") except (json.JSONDecodeError, ValueError) as e: logger.error(f"Lỗi parse JSON từ Gemini: {e}") logger.info("Fallback to simple keyword matching") return self._simple_keyword_matching(keywords, title) except Exception as e: logger.error(f"Lỗi Gemini AI: {str(e)}") return self._simple_keyword_matching(keywords, title) def _simple_keyword_matching(self, keywords: List[str], title: str) -> List[Dict]: """ Fallback method: keyword matching đơn giản """ logger.info("Sử dụng simple keyword matching...") all_text_lower = " ".join(keywords + [title]).lower() matches = [] for folder_name, folder_data in self.sources_cache.items(): folder_name_lower = folder_name.lower() # Tính score dựa trên số keywords match score = 0 match_reasons = [] # Kiểm tra từng keyword for keyword in keywords: keyword_lower = keyword.lower() if keyword_lower in folder_name_lower: score += 0.4 match_reasons.append(f"keyword '{keyword}'") # Kiểm tra title if title and title.lower() in folder_name_lower: score += 0.3 match_reasons.append("title") # Kiểm tra các từ phổ biến common_terms = { 'sport': ['sport', 'thể thao', 'bóng đá', 'football'], 'news': ['news', 'tin tức', 'báo'], 'u23': ['u23', 'đội tuyển'], 'vietnam': ['vietnam', 'việt nam', 'vn'], 'music': ['music', 'nhạc', 'âm nhạc'] } for category, terms in common_terms.items(): for term in terms: if term in all_text_lower and term in folder_name_lower: score += 0.2 match_reasons.append(f"{category} term") break if score > 0: matches.append({ "folder_name": folder_name, "relevance_score": min(score, 1.0), "reason": f"Keyword matching: {', '.join(match_reasons)}" }) # Sắp xếp theo score matches.sort(key=lambda x: x['relevance_score'], reverse=True) logger.info(f"Simple matching found {len(matches)} relevant folders") return matches[:10] # Top 10 def get_fallback_sources(self, max_files: int = 20) -> List[str]: """ Lấy một số source ngẫu nhiên làm fallback """ logger.info("Sử dụng fallback sources...") all_files = [] for folder_data in self.sources_cache.values(): all_files.extend(folder_data['media_files']) if all_files: random.shuffle(all_files) selected = all_files[:max_files] logger.info(f"Fallback: chọn {len(selected)} files ngẫu nhiên") return selected return [] async def get_available_sources(self) -> Dict: """ Lấy danh sách tất cả sources available """ self._scan_sources() # Refresh cache return { "total_folders": len(self.sources_cache), "folders": [ { "name": folder_name, "path": data["folder_path"], "media_count": data["media_count"], "sample_files": data["media_files"][:3] # 3 files đầu làm sample } for folder_name, data in self.sources_cache.items() ] } def refresh_sources(self): """ Refresh danh sách sources """ logger.info("Refreshing sources cache...") self._scan_sources() return len(self.sources_cache)