be1z / source_matcher.py
TDN-M's picture
Upload 7 files
43b61a2 verified
import os
import logging
from typing import List, Dict
import google.generativeai as genai
from pathlib import Path
import json
import asyncio
import random
logger = logging.getLogger(__name__)
class SourceMatcher:
def __init__(self, api_key: str = None, sources_root_dir: str = "public"):
"""
Khởi tạo SourceMatcher
Args:
api_key: Gemini API key (nếu không có sẽ lấy từ env)
sources_root_dir: Thư mục gốc chứa các source folder (mặc định: public)
"""
# Cấu hình Gemini AI
if api_key:
genai.configure(api_key=api_key)
else:
# Lấy từ environment variable
api_key = os.getenv('GEMINI_API_KEY')
if api_key:
genai.configure(api_key=api_key)
self.model = genai.GenerativeModel('gemini-pro')
self.use_ai = True
logger.info("Gemini AI được kích hoạt")
else:
logger.warning("GEMINI_API_KEY không được cung cấp, sử dụng keyword matching đơn giản")
self.model = None
self.use_ai = False
self.sources_root_dir = sources_root_dir
self.sources_cache = {}
# Tạo folder public nếu chưa có
os.makedirs(self.sources_root_dir, exist_ok=True)
# Scan available sources
self._scan_sources()
def _scan_sources(self):
"""
Scan tất cả các folder source có sẵn trong public
"""
try:
self.sources_cache = {}
if not os.path.exists(self.sources_root_dir):
logger.warning(f"Thư mục sources không tồn tại: {self.sources_root_dir}")
os.makedirs(self.sources_root_dir, exist_ok=True)
return
logger.info(f"Đang scan folder: {self.sources_root_dir}")
for item in os.listdir(self.sources_root_dir):
item_path = os.path.join(self.sources_root_dir, item)
if os.path.isdir(item_path):
# Tìm các file media trong folder
media_files = []
# Kiểm tra subfolder 'media'
media_folder = os.path.join(item_path, "media")
if os.path.exists(media_folder):
for file in os.listdir(media_folder):
file_path = os.path.join(media_folder, file)
if os.path.isfile(file_path) and self._is_media_file(file_path):
media_files.append(file_path)
# Tìm media files trực tiếp trong folder chính
for file in os.listdir(item_path):
file_path = os.path.join(item_path, file)
if os.path.isfile(file_path) and self._is_media_file(file_path):
media_files.append(file_path)
if media_files:
self.sources_cache[item] = {
"folder_name": item,
"folder_path": item_path,
"media_files": media_files,
"media_count": len(media_files)
}
logger.info(f"✓ Tìm thấy source: {item} ({len(media_files)} media files)")
else:
logger.info(f"✗ Folder trống hoặc không có media: {item}")
logger.info(f"Tổng cộng {len(self.sources_cache)} source folders có media")
# Log chi tiết các folder được tìm thấy
if self.sources_cache:
logger.info("Chi tiết sources:")
for folder_name, data in self.sources_cache.items():
logger.info(f" - {folder_name}: {data['media_count']} files")
except Exception as e:
logger.error(f"Lỗi khi scan sources: {str(e)}")
def _is_media_file(self, file_path: str) -> bool:
"""
Kiểm tra xem file có phải là media không
"""
media_extensions = [
# Video formats
'.mp4', '.mov', '.avi', '.mkv', '.webm', '.flv', '.wmv', '.m4v',
# Image formats
'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff', '.tif'
]
return any(file_path.lower().endswith(ext) for ext in media_extensions)
async def find_matching_sources(
self,
keywords: List[str],
title: str = "",
max_sources: int = 15
) -> List[str]:
"""
Tìm các source media phù hợp với keywords
Args:
keywords: Danh sách keywords
title: Tiêu đề video (optional)
max_sources: Số lượng source tối đa
Returns:
Danh sách đường dẫn các file media phù hợp
"""
try:
if not self.sources_cache:
logger.warning("Không có source nào available trong folder public")
return []
logger.info(f"Tìm kiếm sources cho keywords: {keywords}")
logger.info(f"Title: {title}")
if self.use_ai and self.model:
# Sử dụng Gemini AI
selected_folders = await self._ai_matching(keywords, title, max_sources)
else:
# Sử dụng keyword matching đơn giản
selected_folders = self._simple_keyword_matching(keywords, title)
# Lấy media files từ các folder đã chọn
selected_media_files = []
for folder_info in selected_folders:
folder_name = folder_info.get('folder_name', '')
if folder_name in self.sources_cache:
folder_data = self.sources_cache[folder_name]
folder_files = folder_data['media_files']
# Shuffle để tạo sự ngẫu nhiên
random.shuffle(folder_files)
selected_media_files.extend(folder_files)
logger.info(
f"✓ Selected folder: {folder_name} "
f"(score: {folder_info.get('relevance_score', 'N/A'):.2f}) "
f"- {len(folder_files)} files"
)
else:
logger.warning(f"✗ Folder không tồn tại trong cache: {folder_name}")
# Shuffle toàn bộ danh sách để tạo sự đa dạng
random.shuffle(selected_media_files)
# Giới hạn số lượng files
if len(selected_media_files) > max_sources * 5:
selected_media_files = selected_media_files[:max_sources * 5]
logger.info(f"Tổng cộng {len(selected_media_files)} media files được chọn")
return selected_media_files
except Exception as e:
logger.error(f"Lỗi khi tìm matching sources: {str(e)}")
return self.get_fallback_sources()
async def _ai_matching(self, keywords: List[str], title: str, max_sources: int) -> List[Dict]:
"""
Sử dụng Gemini AI để matching
"""
try:
source_names = list(self.sources_cache.keys())
prompt = f"""
Bạn là một AI chuyên phân tích và matching nội dung video cho người Việt Nam.
NHIỆM VỤ: Tìm các folder source phù hợp nhất với keywords và title đã cho.
THÔNG TIN INPUT:
- Title: "{title}"
- Keywords: {keywords}
CÁC FOLDER SOURCE AVAILABLE trong thư mục public:
{json.dumps(source_names, ensure_ascii=False, indent=2)}
YÊU CẦU:
1. Phân tích semantic meaning của title và keywords (hỗ trợ tiếng Việt)
2. So sánh với tên các folder source
3. Chọn tối đa {max_sources} folder phù hợp nhất
4. Sắp xếp theo độ phù hợp (cao nhất trước)
5. Ưu tiên folder có tên chứa keywords hoặc có ý nghĩa tương tự
6. Xem xét cả từ khóa tiếng Việt và tiếng Anh
Trả về CHÍNH XÁC theo format JSON sau:
{{
"analysis": "Phân tích ngắn gọn về sự phù hợp",
"selected_folders": [
{{
"folder_name": "tên_folder_chính_xác_từ_danh_sách",
"relevance_score": 0.95,
"reason": "Lý do chọn folder này"
}}
]
}}
LưU Ý:
- CHỈ chọn folder names CÓ TRONG danh sách available ở trên
- Relevance score từ 0.0 đến 1.0
- Không tạo ra tên folder mới
- Ưu tiên folders liên quan đến chủ đề
"""
logger.info("Đang phân tích sources với Gemini AI...")
# Gọi Gemini API
response = self.model.generate_content(prompt)
response_text = response.text.strip()
logger.info(f"Gemini response: {response_text[:200]}...")
# Parse JSON response
try:
# Tìm và extract JSON từ response
start_idx = response_text.find('{')
end_idx = response_text.rfind('}') + 1
if start_idx >= 0 and end_idx > start_idx:
json_str = response_text[start_idx:end_idx]
result = json.loads(json_str)
selected_folders = result.get('selected_folders', [])
logger.info(f"Gemini analysis: {result.get('analysis', 'No analysis')}")
return selected_folders
else:
raise ValueError("Không tìm thấy JSON trong response")
except (json.JSONDecodeError, ValueError) as e:
logger.error(f"Lỗi parse JSON từ Gemini: {e}")
logger.info("Fallback to simple keyword matching")
return self._simple_keyword_matching(keywords, title)
except Exception as e:
logger.error(f"Lỗi Gemini AI: {str(e)}")
return self._simple_keyword_matching(keywords, title)
def _simple_keyword_matching(self, keywords: List[str], title: str) -> List[Dict]:
"""
Fallback method: keyword matching đơn giản
"""
logger.info("Sử dụng simple keyword matching...")
all_text_lower = " ".join(keywords + [title]).lower()
matches = []
for folder_name, folder_data in self.sources_cache.items():
folder_name_lower = folder_name.lower()
# Tính score dựa trên số keywords match
score = 0
match_reasons = []
# Kiểm tra từng keyword
for keyword in keywords:
keyword_lower = keyword.lower()
if keyword_lower in folder_name_lower:
score += 0.4
match_reasons.append(f"keyword '{keyword}'")
# Kiểm tra title
if title and title.lower() in folder_name_lower:
score += 0.3
match_reasons.append("title")
# Kiểm tra các từ phổ biến
common_terms = {
'sport': ['sport', 'thể thao', 'bóng đá', 'football'],
'news': ['news', 'tin tức', 'báo'],
'u23': ['u23', 'đội tuyển'],
'vietnam': ['vietnam', 'việt nam', 'vn'],
'music': ['music', 'nhạc', 'âm nhạc']
}
for category, terms in common_terms.items():
for term in terms:
if term in all_text_lower and term in folder_name_lower:
score += 0.2
match_reasons.append(f"{category} term")
break
if score > 0:
matches.append({
"folder_name": folder_name,
"relevance_score": min(score, 1.0),
"reason": f"Keyword matching: {', '.join(match_reasons)}"
})
# Sắp xếp theo score
matches.sort(key=lambda x: x['relevance_score'], reverse=True)
logger.info(f"Simple matching found {len(matches)} relevant folders")
return matches[:10] # Top 10
def get_fallback_sources(self, max_files: int = 20) -> List[str]:
"""
Lấy một số source ngẫu nhiên làm fallback
"""
logger.info("Sử dụng fallback sources...")
all_files = []
for folder_data in self.sources_cache.values():
all_files.extend(folder_data['media_files'])
if all_files:
random.shuffle(all_files)
selected = all_files[:max_files]
logger.info(f"Fallback: chọn {len(selected)} files ngẫu nhiên")
return selected
return []
async def get_available_sources(self) -> Dict:
"""
Lấy danh sách tất cả sources available
"""
self._scan_sources() # Refresh cache
return {
"total_folders": len(self.sources_cache),
"folders": [
{
"name": folder_name,
"path": data["folder_path"],
"media_count": data["media_count"],
"sample_files": data["media_files"][:3] # 3 files đầu làm sample
}
for folder_name, data in self.sources_cache.items()
]
}
def refresh_sources(self):
"""
Refresh danh sách sources
"""
logger.info("Refreshing sources cache...")
self._scan_sources()
return len(self.sources_cache)