| | import os
|
| | import logging
|
| | from typing import List, Dict
|
| | import google.generativeai as genai
|
| | from pathlib import Path
|
| | import json
|
| | import asyncio
|
| | import random
|
| |
|
| | logger = logging.getLogger(__name__)
|
| |
|
| | class SourceMatcher:
|
| | def __init__(self, api_key: str = None, sources_root_dir: str = "public"):
|
| | """
|
| | Khởi tạo SourceMatcher
|
| |
|
| | Args:
|
| | api_key: Gemini API key (nếu không có sẽ lấy từ env)
|
| | sources_root_dir: Thư mục gốc chứa các source folder (mặc định: public)
|
| | """
|
| |
|
| | if api_key:
|
| | genai.configure(api_key=api_key)
|
| | else:
|
| |
|
| | api_key = os.getenv('GEMINI_API_KEY')
|
| | if api_key:
|
| | genai.configure(api_key=api_key)
|
| | self.model = genai.GenerativeModel('gemini-pro')
|
| | self.use_ai = True
|
| | logger.info("Gemini AI được kích hoạt")
|
| | else:
|
| | logger.warning("GEMINI_API_KEY không được cung cấp, sử dụng keyword matching đơn giản")
|
| | self.model = None
|
| | self.use_ai = False
|
| |
|
| | self.sources_root_dir = sources_root_dir
|
| | self.sources_cache = {}
|
| |
|
| |
|
| | os.makedirs(self.sources_root_dir, exist_ok=True)
|
| |
|
| |
|
| | self._scan_sources()
|
| |
|
| | def _scan_sources(self):
|
| | """
|
| | Scan tất cả các folder source có sẵn trong public
|
| | """
|
| | try:
|
| | self.sources_cache = {}
|
| |
|
| | if not os.path.exists(self.sources_root_dir):
|
| | logger.warning(f"Thư mục sources không tồn tại: {self.sources_root_dir}")
|
| | os.makedirs(self.sources_root_dir, exist_ok=True)
|
| | return
|
| |
|
| | logger.info(f"Đang scan folder: {self.sources_root_dir}")
|
| |
|
| | for item in os.listdir(self.sources_root_dir):
|
| | item_path = os.path.join(self.sources_root_dir, item)
|
| |
|
| | if os.path.isdir(item_path):
|
| |
|
| | media_files = []
|
| |
|
| |
|
| | media_folder = os.path.join(item_path, "media")
|
| | if os.path.exists(media_folder):
|
| | for file in os.listdir(media_folder):
|
| | file_path = os.path.join(media_folder, file)
|
| | if os.path.isfile(file_path) and self._is_media_file(file_path):
|
| | media_files.append(file_path)
|
| |
|
| |
|
| | for file in os.listdir(item_path):
|
| | file_path = os.path.join(item_path, file)
|
| | if os.path.isfile(file_path) and self._is_media_file(file_path):
|
| | media_files.append(file_path)
|
| |
|
| | if media_files:
|
| | self.sources_cache[item] = {
|
| | "folder_name": item,
|
| | "folder_path": item_path,
|
| | "media_files": media_files,
|
| | "media_count": len(media_files)
|
| | }
|
| | logger.info(f"✓ Tìm thấy source: {item} ({len(media_files)} media files)")
|
| | else:
|
| | logger.info(f"✗ Folder trống hoặc không có media: {item}")
|
| |
|
| | logger.info(f"Tổng cộng {len(self.sources_cache)} source folders có media")
|
| |
|
| |
|
| | if self.sources_cache:
|
| | logger.info("Chi tiết sources:")
|
| | for folder_name, data in self.sources_cache.items():
|
| | logger.info(f" - {folder_name}: {data['media_count']} files")
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Lỗi khi scan sources: {str(e)}")
|
| |
|
| | def _is_media_file(self, file_path: str) -> bool:
|
| | """
|
| | Kiểm tra xem file có phải là media không
|
| | """
|
| | media_extensions = [
|
| |
|
| | '.mp4', '.mov', '.avi', '.mkv', '.webm', '.flv', '.wmv', '.m4v',
|
| |
|
| | '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff', '.tif'
|
| | ]
|
| |
|
| | return any(file_path.lower().endswith(ext) for ext in media_extensions)
|
| |
|
| | async def find_matching_sources(
|
| | self,
|
| | keywords: List[str],
|
| | title: str = "",
|
| | max_sources: int = 15
|
| | ) -> List[str]:
|
| | """
|
| | Tìm các source media phù hợp với keywords
|
| |
|
| | Args:
|
| | keywords: Danh sách keywords
|
| | title: Tiêu đề video (optional)
|
| | max_sources: Số lượng source tối đa
|
| |
|
| | Returns:
|
| | Danh sách đường dẫn các file media phù hợp
|
| | """
|
| | try:
|
| | if not self.sources_cache:
|
| | logger.warning("Không có source nào available trong folder public")
|
| | return []
|
| |
|
| | logger.info(f"Tìm kiếm sources cho keywords: {keywords}")
|
| | logger.info(f"Title: {title}")
|
| |
|
| | if self.use_ai and self.model:
|
| |
|
| | selected_folders = await self._ai_matching(keywords, title, max_sources)
|
| | else:
|
| |
|
| | selected_folders = self._simple_keyword_matching(keywords, title)
|
| |
|
| |
|
| | selected_media_files = []
|
| |
|
| | for folder_info in selected_folders:
|
| | folder_name = folder_info.get('folder_name', '')
|
| |
|
| | if folder_name in self.sources_cache:
|
| | folder_data = self.sources_cache[folder_name]
|
| | folder_files = folder_data['media_files']
|
| |
|
| |
|
| | random.shuffle(folder_files)
|
| | selected_media_files.extend(folder_files)
|
| |
|
| | logger.info(
|
| | f"✓ Selected folder: {folder_name} "
|
| | f"(score: {folder_info.get('relevance_score', 'N/A'):.2f}) "
|
| | f"- {len(folder_files)} files"
|
| | )
|
| | else:
|
| | logger.warning(f"✗ Folder không tồn tại trong cache: {folder_name}")
|
| |
|
| |
|
| | random.shuffle(selected_media_files)
|
| |
|
| |
|
| | if len(selected_media_files) > max_sources * 5:
|
| | selected_media_files = selected_media_files[:max_sources * 5]
|
| |
|
| | logger.info(f"Tổng cộng {len(selected_media_files)} media files được chọn")
|
| |
|
| | return selected_media_files
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Lỗi khi tìm matching sources: {str(e)}")
|
| | return self.get_fallback_sources()
|
| |
|
| | async def _ai_matching(self, keywords: List[str], title: str, max_sources: int) -> List[Dict]:
|
| | """
|
| | Sử dụng Gemini AI để matching
|
| | """
|
| | try:
|
| | source_names = list(self.sources_cache.keys())
|
| |
|
| | prompt = f"""
|
| | Bạn là một AI chuyên phân tích và matching nội dung video cho người Việt Nam.
|
| |
|
| | NHIỆM VỤ: Tìm các folder source phù hợp nhất với keywords và title đã cho.
|
| |
|
| | THÔNG TIN INPUT:
|
| | - Title: "{title}"
|
| | - Keywords: {keywords}
|
| |
|
| | CÁC FOLDER SOURCE AVAILABLE trong thư mục public:
|
| | {json.dumps(source_names, ensure_ascii=False, indent=2)}
|
| |
|
| | YÊU CẦU:
|
| | 1. Phân tích semantic meaning của title và keywords (hỗ trợ tiếng Việt)
|
| | 2. So sánh với tên các folder source
|
| | 3. Chọn tối đa {max_sources} folder phù hợp nhất
|
| | 4. Sắp xếp theo độ phù hợp (cao nhất trước)
|
| | 5. Ưu tiên folder có tên chứa keywords hoặc có ý nghĩa tương tự
|
| | 6. Xem xét cả từ khóa tiếng Việt và tiếng Anh
|
| |
|
| | Trả về CHÍNH XÁC theo format JSON sau:
|
| |
|
| | {{
|
| | "analysis": "Phân tích ngắn gọn về sự phù hợp",
|
| | "selected_folders": [
|
| | {{
|
| | "folder_name": "tên_folder_chính_xác_từ_danh_sách",
|
| | "relevance_score": 0.95,
|
| | "reason": "Lý do chọn folder này"
|
| | }}
|
| | ]
|
| | }}
|
| |
|
| | LưU Ý:
|
| | - CHỈ chọn folder names CÓ TRONG danh sách available ở trên
|
| | - Relevance score từ 0.0 đến 1.0
|
| | - Không tạo ra tên folder mới
|
| | - Ưu tiên folders liên quan đến chủ đề
|
| | """
|
| |
|
| | logger.info("Đang phân tích sources với Gemini AI...")
|
| |
|
| |
|
| | response = self.model.generate_content(prompt)
|
| | response_text = response.text.strip()
|
| |
|
| | logger.info(f"Gemini response: {response_text[:200]}...")
|
| |
|
| |
|
| | try:
|
| |
|
| | start_idx = response_text.find('{')
|
| | end_idx = response_text.rfind('}') + 1
|
| |
|
| | if start_idx >= 0 and end_idx > start_idx:
|
| | json_str = response_text[start_idx:end_idx]
|
| | result = json.loads(json_str)
|
| | selected_folders = result.get('selected_folders', [])
|
| |
|
| | logger.info(f"Gemini analysis: {result.get('analysis', 'No analysis')}")
|
| | return selected_folders
|
| | else:
|
| | raise ValueError("Không tìm thấy JSON trong response")
|
| |
|
| | except (json.JSONDecodeError, ValueError) as e:
|
| | logger.error(f"Lỗi parse JSON từ Gemini: {e}")
|
| | logger.info("Fallback to simple keyword matching")
|
| | return self._simple_keyword_matching(keywords, title)
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Lỗi Gemini AI: {str(e)}")
|
| | return self._simple_keyword_matching(keywords, title)
|
| |
|
| | def _simple_keyword_matching(self, keywords: List[str], title: str) -> List[Dict]:
|
| | """
|
| | Fallback method: keyword matching đơn giản
|
| | """
|
| | logger.info("Sử dụng simple keyword matching...")
|
| |
|
| | all_text_lower = " ".join(keywords + [title]).lower()
|
| | matches = []
|
| |
|
| | for folder_name, folder_data in self.sources_cache.items():
|
| | folder_name_lower = folder_name.lower()
|
| |
|
| |
|
| | score = 0
|
| | match_reasons = []
|
| |
|
| |
|
| | for keyword in keywords:
|
| | keyword_lower = keyword.lower()
|
| | if keyword_lower in folder_name_lower:
|
| | score += 0.4
|
| | match_reasons.append(f"keyword '{keyword}'")
|
| |
|
| |
|
| | if title and title.lower() in folder_name_lower:
|
| | score += 0.3
|
| | match_reasons.append("title")
|
| |
|
| |
|
| | common_terms = {
|
| | 'sport': ['sport', 'thể thao', 'bóng đá', 'football'],
|
| | 'news': ['news', 'tin tức', 'báo'],
|
| | 'u23': ['u23', 'đội tuyển'],
|
| | 'vietnam': ['vietnam', 'việt nam', 'vn'],
|
| | 'music': ['music', 'nhạc', 'âm nhạc']
|
| | }
|
| |
|
| | for category, terms in common_terms.items():
|
| | for term in terms:
|
| | if term in all_text_lower and term in folder_name_lower:
|
| | score += 0.2
|
| | match_reasons.append(f"{category} term")
|
| | break
|
| |
|
| | if score > 0:
|
| | matches.append({
|
| | "folder_name": folder_name,
|
| | "relevance_score": min(score, 1.0),
|
| | "reason": f"Keyword matching: {', '.join(match_reasons)}"
|
| | })
|
| |
|
| |
|
| | matches.sort(key=lambda x: x['relevance_score'], reverse=True)
|
| |
|
| | logger.info(f"Simple matching found {len(matches)} relevant folders")
|
| | return matches[:10]
|
| |
|
| | def get_fallback_sources(self, max_files: int = 20) -> List[str]:
|
| | """
|
| | Lấy một số source ngẫu nhiên làm fallback
|
| | """
|
| | logger.info("Sử dụng fallback sources...")
|
| |
|
| | all_files = []
|
| | for folder_data in self.sources_cache.values():
|
| | all_files.extend(folder_data['media_files'])
|
| |
|
| | if all_files:
|
| | random.shuffle(all_files)
|
| | selected = all_files[:max_files]
|
| | logger.info(f"Fallback: chọn {len(selected)} files ngẫu nhiên")
|
| | return selected
|
| |
|
| | return []
|
| |
|
| | async def get_available_sources(self) -> Dict:
|
| | """
|
| | Lấy danh sách tất cả sources available
|
| | """
|
| | self._scan_sources()
|
| |
|
| | return {
|
| | "total_folders": len(self.sources_cache),
|
| | "folders": [
|
| | {
|
| | "name": folder_name,
|
| | "path": data["folder_path"],
|
| | "media_count": data["media_count"],
|
| | "sample_files": data["media_files"][:3]
|
| | }
|
| | for folder_name, data in self.sources_cache.items()
|
| | ]
|
| | }
|
| |
|
| | def refresh_sources(self):
|
| | """
|
| | Refresh danh sách sources
|
| | """
|
| | logger.info("Refreshing sources cache...")
|
| | self._scan_sources()
|
| | return len(self.sources_cache) |