Vooo / app.py
TDN-M's picture
Upload 40 files
7ad1067 verified
import sys
import os
import uuid
import gradio as gr
from pathlib import Path
import time
import threading
from datetime import datetime
# Thêm đường dẫn để import các module cục bộ
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
# Import các module đã được tạo
from sports_news import get_sports_news_content
import tts_module
from gemini_news_parser_with_search import configure_gemini, process_news_data_for_podcast_with_search
# --- IMPORT MODULE VIDEO MỚI ---
from video_generator_module import generate_multiple_video_versions, DEFAULT_SAVE_DIR as VIDEO_SAVE_DIR
# =============================================================================
# KHỞI TẠO VÀ CẤU HÌNH (Đơn giản hóa)
# =============================================================================
# Các hằng số và thiết lập chung cho ứng dụng Gradio
MAX_ARTICLES = 10
SAVE_DIR = "generated_content"
Path(SAVE_DIR).mkdir(exist_ok=True)
# Thư mục lưu video được quản lý bởi module video
Path(VIDEO_SAVE_DIR).mkdir(exist_ok=True)
Path(os.path.join(SAVE_DIR, "images")).mkdir(exist_ok=True)
# Khởi tạo Gemini API
try:
configure_gemini()
GEMINI_AVAILABLE = True
print("✅ Gemini API đã được cấu hình thành công")
except Exception as e:
GEMINI_AVAILABLE = False
print(f"❌ Không thể khởi tạo Gemini API: {e}")
# =============================================================================
# UTILITY CLASSES FOR PROGRESS TRACKING
# =============================================================================
class ProgressTracker:
"""Class để theo dõi tiến trình và hiển thị progress"""
def __init__(self):
self.current_step = 0
self.total_steps = 0
self.current_message = ""
self.is_running = False
self.start_time = None
def start(self, total_steps, initial_message="Bắt đầu xử lý..."):
self.current_step = 0
self.total_steps = total_steps
self.current_message = initial_message
self.is_running = True
self.start_time = time.time()
def update(self, step_increment=1, message=""):
if self.is_running:
self.current_step += step_increment
if message:
self.current_message = message
def finish(self, final_message="Hoàn thành!"):
self.is_running = False
self.current_message = final_message
self.current_step = self.total_steps
def get_progress(self):
if self.total_steps == 0:
return 0, self.current_message
progress = min(self.current_step / self.total_steps, 1.0)
# Thêm thời gian đã chạy
if self.start_time:
elapsed = time.time() - self.start_time
elapsed_str = f" ({elapsed:.1f}s)"
else:
elapsed_str = ""
progress_text = f"[{self.current_step}/{self.total_steps}] {self.current_message}{elapsed_str}"
return progress, progress_text
# Global progress tracker
progress_tracker = ProgressTracker()
# =============================================================================
# BUTTON STATE MANAGEMENT
# =============================================================================
class ButtonStateManager:
"""Quản lý trạng thái các nút để tránh spam click"""
def __init__(self):
self.button_states = {}
def set_processing(self, button_id):
self.button_states[button_id] = True
def set_ready(self, button_id):
self.button_states[button_id] = False
def is_processing(self, button_id):
return self.button_states.get(button_id, False)
button_manager = ButtonStateManager()
# =============================================================================
# CÁC HÀM TIỆN ÍCH (Utility Functions)
# =============================================================================
def show_progress_message(message, message_type="info"):
"""Hiển thị thông báo với timestamp"""
timestamp = datetime.now().strftime("%H:%M:%S")
formatted_message = f"[{timestamp}] {message}"
if message_type == "info":
gr.Info(formatted_message)
elif message_type == "success":
gr.Success(formatted_message)
elif message_type == "warning":
gr.Warning(formatted_message)
elif message_type == "error":
gr.Error(formatted_message)
def get_available_audio_files():
"""Lấy danh sách các file audio có sẵn từ thư mục audio."""
audio_dir = "audio"
audio_files = []
if not os.path.exists(audio_dir):
return []
supported_formats = ['.wav', '.mp3', '.m4a', '.ogg', '.flac']
try:
for file in os.listdir(audio_dir):
if any(file.lower().endswith(ext) for ext in supported_formats):
audio_files.append(os.path.join(audio_dir, file))
# Sắp xếp theo tên file
audio_files.sort()
return audio_files
except Exception as e:
print(f"Lỗi khi quét thư mục audio: {e}")
return []
def get_audio_file_display_name(file_path):
"""Lấy tên hiển thị cho file audio."""
return os.path.basename(file_path)
def create_progress_html(progress, message):
"""Tạo HTML cho thanh progress bar"""
progress_percent = int(progress * 100)
return f"""
<div style="width: 100%; background-color: #f0f0f0; border-radius: 10px; padding: 3px;">
<div style="width: {progress_percent}%; background-color: #4CAF50; height: 20px; border-radius: 7px; text-align: center; line-height: 20px; color: white; font-size: 12px;">
{progress_percent}%
</div>
<div style="margin-top: 5px; font-size: 12px; color: #666;">
{message}
</div>
</div>
"""
# =============================================================================
# CÁC HÀM XỬ LÝ CỐT LÕI (Core Functions)
# =============================================================================
def get_sports_news_contents(news_type, language, limit):
"""
Lấy nội dung tin tức thể thao từ module sports_news.
"""
show_progress_message(f"Đang tải {limit} tin tức loại '{news_type}' bằng ngôn ngữ '{language}'...", "info")
try:
contents = get_sports_news_content(news_type, language, limit)
# Định dạng lại nội dung để hiển thị
content = ["Tiêu đề: " + item['title'] + "\nTóm tắt: " + item['summary'] for item in contents]
return content, contents # Trả về cả formatted content và raw data
except Exception as e:
print(f"Lỗi khi lấy tin tức: {e}")
show_progress_message(f"Lỗi khi lấy tin tức: {str(e)}", "error")
return [], []
# =============================================================================
# CÁC HÀM XỬ LÝ PODCAST (Podcast Processing Functions)
# =============================================================================
def process_selected_news_for_podcast(*args):
"""
Xử lý các tin tức đã chọn để tạo kịch bản podcast bằng Gemini.
"""
if button_manager.is_processing("generate_podcast"):
show_progress_message("Đang xử lý kịch bản khác, vui lòng chờ...", "warning")
return [gr.Group(visible=True)] + [gr.Column(visible=False)] * MAX_ARTICLES + [gr.Textbox(value="", visible=False)] * MAX_ARTICLES + [gr.Textbox(value="", visible=False)] * MAX_ARTICLES
if not GEMINI_AVAILABLE:
show_progress_message("Gemini API chưa được cấu hình. Vui lòng kiểm tra file .env và GEMINI_API_KEY.", "error")
raise gr.Error("Gemini API chưa được cấu hình.")
button_manager.set_processing("generate_podcast")
try:
raw_news_data = args[0]
checkboxes = args[1:MAX_ARTICLES+1]
selected_indices = [i for i, cb in enumerate(checkboxes) if cb and i < len(raw_news_data)]
if not selected_indices:
show_progress_message("Bạn chưa chọn tin tức nào để xử lý.", "warning")
raise gr.Error("Bạn chưa chọn tin tức nào để xử lý.")
progress_tracker.start(len(selected_indices), "Khởi tạo xử lý kịch bản podcast...")
show_progress_message(f"Bắt đầu tạo kịch bản cho {len(selected_indices)} tin tức đã chọn", "info")
podcast_col_updates = [gr.Column(visible=False)] * MAX_ARTICLES
podcast_textbox_updates = [gr.Textbox(value="", visible=False)] * MAX_ARTICLES
podcast_status_updates = [gr.Textbox(value="", visible=False)] * MAX_ARTICLES
processed_scripts = []
for idx in selected_indices:
if idx < MAX_ARTICLES and idx < len(raw_news_data):
try:
progress_tracker.update(0, f"Đang xử lý tin #{idx+1} với Gemini AI...")
show_progress_message(f"Đang tạo kịch bản cho tin #{idx+1}...", "info")
news_item = raw_news_data[idx]
podcast_script = process_news_data_for_podcast_with_search(news_item)
if podcast_script and not podcast_script.startswith("Lỗi:"):
processed_scripts.append({'index': idx, 'script': podcast_script})
podcast_col_updates[idx] = gr.Column(visible=True)
podcast_textbox_updates[idx] = gr.Textbox(value=podcast_script, label=f"Kịch bản Podcast cho Tin #{idx+1}", lines=8, interactive=True, visible=True)
podcast_status_updates[idx] = gr.Textbox(value=f"✅ Hoàn thành ({len(podcast_script)} ký tự)", visible=True)
show_progress_message(f"✅ Hoàn thành kịch bản tin #{idx+1}", "success")
else:
error_msg = podcast_script if podcast_script else "Không thể tạo kịch bản"
podcast_col_updates[idx] = gr.Column(visible=True)
podcast_textbox_updates[idx] = gr.Textbox(value=f"Lỗi: {error_msg}", visible=True)
podcast_status_updates[idx] = gr.Textbox(value=f"❌ Lỗi khi xử lý", visible=True)
show_progress_message(f"❌ Lỗi khi xử lý tin #{idx+1}: {error_msg}", "error")
progress_tracker.update(1)
except Exception as e:
print(f"Lỗi khi xử lý tin #{idx+1}: {e}")
podcast_col_updates[idx] = gr.Column(visible=True)
podcast_textbox_updates[idx] = gr.Textbox(value=f"Lỗi: {str(e)}", visible=True)
podcast_status_updates[idx] = gr.Textbox(value=f"❌ Lỗi: {str(e)}", visible=True)
show_progress_message(f"❌ Lỗi xử lý tin #{idx+1}: {str(e)}", "error")
progress_tracker.update(1)
progress_tracker.finish(f"Hoàn thành! Đã tạo {len(processed_scripts)} kịch bản thành công")
if processed_scripts:
show_progress_message(f"🎉 Đã tạo thành công {len(processed_scripts)}/{len(selected_indices)} kịch bản podcast!", "success")
else:
show_progress_message("⚠️ Không có kịch bản nào được tạo thành công.", "warning")
return [gr.Group(visible=True)] + podcast_col_updates + podcast_textbox_updates + podcast_status_updates
finally:
button_manager.set_ready("generate_podcast")
# =============================================================================
# CÁC HÀM ĐIỀU KHIỂN GIAO DIỆN (Gradio Logic Functions)
# =============================================================================
def process_and_display_news(news_type, language, limit):
"""
Xử lý việc lấy tin tức và cập nhật giao diện Gradio.
"""
if button_manager.is_processing("generate_news"):
show_progress_message("Đang tải tin tức khác, vui lòng chờ...", "warning")
return [[], gr.Group(visible=False), gr.Group(visible=False), gr.Group(visible=False), gr.Group(visible=False)] + [gr.Row(visible=False)] * MAX_ARTICLES + [gr.Textbox(value="")] * MAX_ARTICLES
button_manager.set_processing("generate_news")
try:
show_progress_message(f"Bắt đầu tải {limit} tin tức loại '{news_type}'...", "info")
articles, raw_data = get_sports_news_contents(news_type, language, int(limit))
state_update = raw_data
news_group_update = gr.Group(visible=False)
podcast_group_update = gr.Group(visible=False)
audio_group_update = gr.Group(visible=False)
# --- Tắt group video khi tạo tin mới ---
video_group_update = gr.Group(visible=False)
row_updates = [gr.Row(visible=False)] * MAX_ARTICLES
textbox_updates = [gr.Textbox(value="")] * MAX_ARTICLES
if not articles:
show_progress_message("Không tìm thấy tin tức nào. Vui lòng thử lại hoặc thay đổi tiêu chí tìm kiếm.", "warning")
else:
show_progress_message(f"🎉 Đã tải thành công {len(articles)} tin tức!", "success")
news_group_update = gr.Group(visible=True)
if GEMINI_AVAILABLE:
podcast_group_update = gr.Group(visible=True)
audio_group_update = gr.Group(visible=True)
for i in range(len(articles)):
if i < MAX_ARTICLES:
row_updates[i] = gr.Row(visible=True)
textbox_updates[i] = gr.Textbox(value=articles[i])
return [state_update, news_group_update, podcast_group_update, audio_group_update, video_group_update] + row_updates + textbox_updates
finally:
button_manager.set_ready("generate_news")
def generate_audio_from_podcast_wrapper(voice_ref, *args):
"""
Hàm bao bọc tạo audio và kích hoạt bước tạo video.
"""
if button_manager.is_processing("generate_audio"):
show_progress_message("Đang tạo audio khác, vui lòng chờ...", "warning")
return [gr.Group(visible=True)] + [gr.Column(visible=False)] * MAX_ARTICLES + [gr.Audio(value=None, visible=False)] * MAX_ARTICLES + [gr.Textbox(value="", visible=False)] * MAX_ARTICLES + [gr.Group(visible=False), gr.Dropdown(choices=[], value=None), gr.CheckboxGroup(choices=[], value=[]), gr.CheckboxGroup(choices=[], value=[]), gr.Textbox(value=""), gr.Textbox(value=""), gr.File(value=None), gr.File(value=None)]
button_manager.set_processing("generate_audio")
try:
show_progress_message("Bắt đầu quá trình tạo audio...", "info")
podcast_scripts = args[0:MAX_ARTICLES]
valid_scripts, script_indices = [], []
for i, script in enumerate(podcast_scripts):
if script and script.strip() and not script.startswith("Lỗi:") and len(script.strip()) > 50:
valid_scripts.append(script.strip())
script_indices.append(i)
if not valid_scripts:
show_progress_message("Không có kịch bản podcast hợp lệ nào để tạo audio.", "error")
raise gr.Error("Không có kịch bản podcast hợp lệ nào để tạo audio.")
if not voice_ref:
show_progress_message("Vui lòng cung cấp file giọng nói tham chiếu (Giọng MC).", "error")
raise gr.Error("Vui lòng cung cấp file giọng nói tham chiếu (Giọng MC).")
progress_tracker.start(len(valid_scripts) + 1, "Khởi tạo tạo audio...")
show_progress_message(f"Chuẩn bị tạo audio cho {len(valid_scripts)} kịch bản...", "info")
try:
progress_tracker.update(0, "Đang phân tích giọng nói của MC...")
show_progress_message("Đang phân tích giọng nói của MC...", "info")
gpt_cond_latent, speaker_embedding = tts_module.get_voice_conditioning(voice_ref)
progress_tracker.update(1, "Phân tích giọng nói thành công!")
show_progress_message("✅ Phân tích giọng nói thành công!", "success")
except Exception as e:
show_progress_message(f"Có lỗi với file giọng nói tham chiếu: {e}", "error")
raise gr.Error(f"Có lỗi với file giọng nói tham chiếu: {e}")
audio_col_updates = [gr.Column(visible=False)] * MAX_ARTICLES
audio_player_updates = [gr.Audio(value=None, visible=False)] * MAX_ARTICLES
audio_status_updates = [gr.Textbox(value="", visible=False)] * MAX_ARTICLES
successful_audio_paths = [] # --- Lưu các audio thành công
for i, (script_idx, script) in enumerate(zip(script_indices, valid_scripts)):
if script_idx < MAX_ARTICLES:
try:
progress_tracker.update(0, f"Đang tạo audio cho kịch bản #{script_idx+1}... ({i+1}/{len(valid_scripts)})")
show_progress_message(f"Đang tạo audio cho kịch bản #{script_idx+1}... ({i+1}/{len(valid_scripts)})", "info")
output_filename = f"news_{script_idx+1}_{uuid.uuid4()}.wav"
audio_path = tts_module.predict_tts(
text=script, language="vi", output_filename=output_filename,
gpt_cond_latent=gpt_cond_latent, speaker_embedding=speaker_embedding
)
successful_audio_paths.append(audio_path) # Thêm vào danh sách
audio_col_updates[script_idx] = gr.Column(visible=True)
audio_player_updates[script_idx] = gr.Audio(value=audio_path, label=f"Audio Podcast #{script_idx+1}", visible=True)
# Tính thời lượng file audio
try:
import librosa
duration = librosa.get_duration(filename=audio_path)
duration_str = f" - {duration:.1f}s"
except:
duration_str = ""
audio_status_updates[script_idx] = gr.Textbox(value=f"✅ Thành công: {os.path.basename(audio_path)}{duration_str}", visible=True)
show_progress_message(f"✅ Hoàn thành audio #{script_idx+1}{duration_str}", "success")
progress_tracker.update(1)
except Exception as e:
print(f"Lỗi khi tạo audio cho kịch bản #{script_idx+1}: {e}")
audio_col_updates[script_idx] = gr.Column(visible=True)
audio_player_updates[script_idx] = gr.Audio(value=None, visible=False)
audio_status_updates[script_idx] = gr.Textbox(value=f"❌ Lỗi: {e}", visible=True)
show_progress_message(f"❌ Lỗi tạo audio #{script_idx+1}: {str(e)}", "error")
progress_tracker.update(1)
progress_tracker.finish(f"Hoàn thành! Tạo được {len(successful_audio_paths)} audio")
show_progress_message(f"🎉 Hoàn tất! Tạo thành công {len(successful_audio_paths)} audio!", "success")
# --- Cập nhật giao diện cho Bước 4 ---
video_group_visible = gr.Group(visible=True) if successful_audio_paths else gr.Group(visible=False)
audio_dropdown_update = gr.Dropdown(choices=successful_audio_paths, value=successful_audio_paths[0] if successful_audio_paths else None, interactive=True)
# Khởi tạo titles và load content cho title đầu tiên nếu có
titles = scan_public_titles()
title_checkboxes_update = gr.CheckboxGroup(choices=titles, value=titles[:1] if titles else [], interactive=True)
# Load media cho title đầu tiên
if titles:
media = load_title_content(titles[:1])
else:
media = gr.CheckboxGroup(choices=[], value=[])
# Khởi tạo các field mới cho thumbnail overlay
description_input = gr.Textbox(value="Breaking News - Tin Tức Thể Thao Mới Nhất", placeholder="Nhập mô tả cho video...")
date_input = gr.Textbox(value=datetime.now().strftime("%Y-%m-%d"), placeholder="YYYY-MM-DD")
thumbnail_upload = gr.File(value=None, file_types=["image"])
logo_upload = gr.File(value=None, file_types=["image"])
return [gr.Group(visible=True)] + audio_col_updates + audio_player_updates + audio_status_updates + [video_group_visible, audio_dropdown_update, title_checkboxes_update, media, description_input, date_input, thumbnail_upload, logo_upload]
finally:
button_manager.set_ready("generate_audio")
# =============================================================================
# --- CÁC HÀM MỚI CHO VIỆC TẠO VIDEO THEO CẤU TRÚC PUBLIC ---
# =============================================================================
def scan_public_titles():
"""Quét thư mục public để tìm các title có sẵn."""
public_dir = "public"
titles = []
if not os.path.exists(public_dir):
show_progress_message("Thư mục 'public' không tồn tại.", "warning")
return []
try:
for item in os.listdir(public_dir):
item_path = os.path.join(public_dir, item)
if os.path.isdir(item_path):
# Chỉ kiểm tra xem có thư mục media không
media_dir = os.path.join(item_path, "media")
if os.path.exists(media_dir):
titles.append(item)
if not titles:
show_progress_message("Không tìm thấy title nào trong thư mục public.", "warning")
else:
show_progress_message(f"Tìm thấy {len(titles)} title có sẵn", "info")
return titles
except Exception as e:
show_progress_message(f"Lỗi khi quét thư mục public: {e}", "error")
return []
def load_title_content(selected_titles):
"""Tải media files từ các title đã chọn."""
if not selected_titles:
return gr.CheckboxGroup(choices=[], value=[])
public_dir = "public"
supported_media_exts = ['.mp4', '.mov', '.avi', '.mkv', '.webm', '.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.tiff']
all_media_files = []
# Xử lý từng title được chọn
for selected_title in selected_titles:
title_dir = os.path.join(public_dir, selected_title)
media_dir = os.path.join(title_dir, "media")
# Tải media files từ title này
if os.path.exists(media_dir):
try:
for item in os.listdir(media_dir):
if os.path.splitext(item)[1].lower() in supported_media_exts:
file_path = os.path.join(media_dir, item)
# Thêm tên title vào hiển thị để dễ phân biệt
display_name = f"{selected_title}/media/{item}"
all_media_files.append((display_name, file_path))
except Exception as e:
show_progress_message(f"Lỗi khi tải media files từ {selected_title}: {e}", "warning")
# Thông báo kết quả
if all_media_files:
show_progress_message(f"Tìm thấy {len(all_media_files)} media files từ {len(selected_titles)} title(s)", "info")
else:
show_progress_message(f"Không tìm thấy file nào từ các title đã chọn", "warning")
# Chuyển đổi để sử dụng với CheckboxGroup
media_choices = [(display, path) for display, path in all_media_files]
# Trả về cập nhật cho CheckboxGroup
media_update = gr.CheckboxGroup(
choices=media_choices,
value=[path for _, path in all_media_files],
interactive=True
)
return media_update
def refresh_titles_checkboxes():
"""Làm mới danh sách titles có sẵn."""
show_progress_message("Đang làm mới danh sách title...", "info")
titles = scan_public_titles()
return gr.CheckboxGroup(choices=titles, value=titles[:1] if titles else [], interactive=True)
def generate_video_wrapper(audio_path, source_media_paths, aspect_ratio, num_versions, description, date, thumbnail_file, logo_file):
"""Hàm bao bọc để gọi module tạo video và cập nhật giao diện."""
if not audio_path:
raise gr.Error("Vui lòng chọn một file audio từ Bước 3.")
# source_media_paths là list các đường dẫn file từ CheckboxGroup
selected_media = source_media_paths if source_media_paths else []
if not selected_media:
raise gr.Error("Vui lòng chọn ít nhất một file media nguồn.")
gr.Info(f"Bắt đầu tạo {num_versions} phiên bản video với tỷ lệ '{aspect_ratio}'...")
gr.Info(f"Sử dụng {len(selected_media)} media files")
try:
# Xử lý thumbnail và logo paths
thumbnail_path = thumbnail_file.name if thumbnail_file else None
logo_path = logo_file.name if logo_file else None
base_filename = f"video_{Path(audio_path).stem}"
generated_videos = generate_multiple_video_versions(
audio_path=audio_path,
source_media_paths=selected_media,
aspect_ratio_key=aspect_ratio,
num_versions=int(num_versions),
base_filename=base_filename,
output_dir=VIDEO_SAVE_DIR,
target_height=1080,
# Thông tin thumbnail overlay
thumbnail_path=thumbnail_path,
logo_path=logo_path,
description=description,
date=date
)
if not generated_videos:
show_progress_message("Không có video nào được tạo. Vui lòng kiểm tra log.", "warning")
return gr.Group(visible=False), gr.Gallery(value=None)
show_progress_message(f"Đã tạo thành công {len(generated_videos)} video!", "success")
# Hiển thị video trong Gallery
return gr.Group(visible=True), gr.Gallery(value=generated_videos, label="Các phiên bản video")
except Exception as e:
print(f"Lỗi nghiêm trọng khi tạo video: {e}")
import traceback
traceback.print_exc()
raise gr.Error(f"Lỗi khi tạo video: {e}")
# =============================================================================
# CÁC HÀM QUẢN LÝ MEDIA (Media Management Functions)
# =============================================================================
def get_title_list():
"""Lấy danh sách các title có sẵn trong thư mục public."""
public_dir = Path("public")
if not public_dir.exists():
return []
return [d.name for d in public_dir.iterdir() if d.is_dir()]
def create_new_title(title_name):
"""Tạo thư mục mới cho title."""
try:
title_dir = Path("public") / title_name
title_dir.mkdir(parents=True, exist_ok=True)
(title_dir / "media").mkdir(exist_ok=True)
return True, f"Đã tạo title '{title_name}' thành công!"
except Exception as e:
return False, f"Lỗi khi tạo title: {str(e)}"
def delete_title(title_name):
"""Xóa title và tất cả dữ liệu liên quan."""
try:
import shutil
title_dir = Path("public") / title_name
if title_dir.exists():
shutil.rmtree(title_dir)
return True, f"Đã xóa title '{title_name}' thành công!"
return False, f"Không tìm thấy title '{title_name}'"
except Exception as e:
return False, f"Lỗi khi xóa title: {str(e)}"
def get_media_files(title_name):
"""Lấy danh sách các file media trong thư mục của title."""
if not title_name:
return []
media_dir = Path("public") / title_name / "media"
if not media_dir.exists():
return []
files = []
for ext in ['*.jpg', '*.jpeg', '*.png', '*.gif', '*.mp4', '*.mov', '*.avi']:
files.extend([str(f.absolute()) for f in media_dir.glob(ext)])
return files
def save_uploaded_files(uploaded_files, title_name, media_type):
"""Lưu các file đã tải lên vào thư mục tương ứng."""
if not title_name or not uploaded_files:
return False, "Vui lòng chọn title và file cần tải lên!"
try:
save_dir = Path("public") / title_name / media_type
save_dir.mkdir(parents=True, exist_ok=True)
saved_files = []
for file_info in uploaded_files:
if isinstance(file_info, dict):
# Handle Gradio file object (when using file_count="multiple")
file_path = Path(file_info["name"]).name
file_content = file_info["data"]
save_path = save_dir / file_path
with open(save_path, "wb") as f:
if isinstance(file_content, str):
file_content = file_content.encode()
f.write(file_content)
saved_files.append(str(save_path.absolute()))
else:
# Handle direct file paths (for backward compatibility)
file_path = Path(file_info)
save_path = save_dir / file_path.name
if file_path.exists():
import shutil
shutil.copy2(file_path, save_path)
saved_files.append(str(save_path.absolute()))
return True, f"Đã tải lên {len(saved_files)} file thành công!"
except Exception as e:
return False, f"Lỗi khi tải lên file: {str(e)}"
# =============================================================================
# GIAO DIỆN GRADIO (Interface)
# =============================================================================
with gr.Blocks(title="Hệ thống sản xuất Video") as demo:
gr.Markdown("# 🎬 Hệ thống sản xuất Video Thể thao theo từng bước")
with gr.Tabs() as tabs:
with gr.TabItem("🎥 Tạo Video"):
gr.Markdown("**Quy trình:** `Bước 1: Tạo Tin Tức` → `Bước 2: Lựa chọn & Chỉnh sửa` → `Bước 2.5: Tạo Kịch bản Podcast` → `Bước 3: Tạo Âm thanh` → `Bước 4: Tạo Video`")
state_news_content = gr.State([])
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("## ⚽ Bước 1: Tạo Tin Tức")
sports_news_type = gr.Dropdown(label="Loại tin thể thao", choices=["all", "football", "tennis", "basketball"], value="all")
sports_language = gr.Dropdown(label="Ngôn ngữ nguồn", choices=["vi", "en"], value="vi")
sports_limit = gr.Slider(label="Số lượng tin", minimum=1, maximum=MAX_ARTICLES, value=3, step=1)
generate_news_btn = gr.Button("📰 Tạo Tin Tức", variant="primary")
with gr.Group(visible=False) as news_display_group:
gr.Markdown("## ✍️ Bước 2: Lựa chọn và Chỉnh sửa Tin Tức")
gr.Markdown("Tích vào các tin bạn muốn sử dụng để tạo kịch bản podcast...")
news_rows, news_checkboxes, news_textboxes = [], [], []
for i in range(MAX_ARTICLES):
with gr.Row(visible=False) as row:
cb = gr.Checkbox(label=f"Tin #{i+1}", value=True, interactive=True, scale=1)
tb = gr.Textbox(lines=3, interactive=True, scale=9)
news_rows.append(row); news_checkboxes.append(cb); news_textboxes.append(tb)
with gr.Group(visible=False) as podcast_generation_group:
gr.Markdown("## 🎭 Bước 2.5: Tạo Kịch bản Podcast với Gemini AI")
if not GEMINI_AVAILABLE: gr.Markdown("❌ Gemini AI chưa được cấu hình.")
generate_podcast_btn = gr.Button("🎭 Tạo Kịch bản Podcast", variant="secondary", interactive=GEMINI_AVAILABLE)
with gr.Group(visible=False) as podcast_results_group:
gr.Markdown("## 📝 Kịch bản Podcast (Có thể chỉnh sửa)")
podcast_result_cols, podcast_textboxes, podcast_statuses = [], [], []
for i in range(MAX_ARTICLES):
with gr.Column(visible=False) as col:
textbox = gr.Textbox(label=f"Kịch bản Podcast cho Tin #{i+1}", lines=8, interactive=True)
status = gr.Textbox(label="Trạng thái", interactive=False)
podcast_result_cols.append(col); podcast_textboxes.append(textbox); podcast_statuses.append(status)
with gr.Group(visible=False) as audio_generation_group:
gr.Markdown("## 🎙️ Bước 3: Tạo Âm Thanh từ Kịch bản")
gr.Markdown("### Chọn giọng MC từ thư mục audio có sẵn:")
# Lấy danh sách audio files có sẵn
available_audio_files = get_available_audio_files()
audio_choices = [(get_audio_file_display_name(f), f) for f in available_audio_files]
with gr.Row():
voice_ref_audio_dropdown = gr.Dropdown(
label="🎤 Chọn Giọng MC",
choices=audio_choices,
value=available_audio_files[0] if available_audio_files else None,
interactive=True,
scale=3
)
refresh_audio_btn = gr.Button("🔄 Làm mới", size="sm", scale=1)
# Thêm audio player để preview
voice_preview_player = gr.Audio(
label="🔊 Nghe thử giọng đã chọn",
visible=True if available_audio_files else False
)
generate_audio_from_podcast_btn = gr.Button("🔊 Tạo Audio từ Kịch bản", variant="primary")
with gr.Group(visible=False) as audio_results_group:
gr.Markdown("## 🎧 Kết quả Âm thanh")
audio_result_cols, audio_players, audio_statuses = [], [], []
for i in range(MAX_ARTICLES):
with gr.Column(visible=False) as col:
player = gr.Audio(label=f"Audio Podcast #{i+1}")
status = gr.Textbox(label="Trạng thái", interactive=False)
audio_result_cols.append(col); audio_players.append(player); audio_statuses.append(status)
# --- GIAO DIỆN BƯỚC 4: TẠO VIDEO ---
with gr.Group(visible=False) as video_generation_group:
gr.Markdown("## 🎬 Bước 4: Tạo Video từ Audio và Media")
gr.Markdown("Chọn title từ thư mục public để sử dụng media có sẵn và cấu hình thumbnail overlay.")
with gr.Row():
with gr.Column(scale=1):
video_audio_input = gr.Dropdown(label="Chọn Audio đã tạo từ Bước 3", interactive=True)
video_aspect_ratio = gr.Dropdown(label="Tỷ lệ khung hình", choices=["ngang", "doc", "vuong"], value="doc")
video_num_versions = gr.Slider(label="Số phiên bản Video", minimum=1, maximum=5, value=1, step=1)
with gr.Column(scale=1):
# Khởi tạo với multiple selection cho titles
initial_titles = scan_public_titles()
video_title_checkboxes = gr.CheckboxGroup(
label="Chọn Title(s) - Có thể chọn nhiều",
choices=initial_titles,
value=initial_titles[:1] if initial_titles else [],
interactive=True
)
refresh_titles_btn = gr.Button("🔄 Làm mới danh sách", size="sm")
gr.Markdown("### 📁 Media từ thư mục: `public/{title}/media`")
video_source_media_checkboxes = gr.CheckboxGroup(label="Chọn Media Files", info="Video và ảnh từ thư mục public/{title}/media", interactive=True)
gr.Markdown("### 🎨 Cấu hình Thumbnail Overlay (hiển thị 5 giây đầu)")
with gr.Row():
with gr.Column(scale=1):
video_description_input = gr.Textbox(
label="Mô tả (hiển thị trên thumbnail)",
value="Breaking News - Tin Tức Thể Thao Mới Nhất",
placeholder="Nhập mô tả cho video...",
lines=2
)
video_date_input = gr.Textbox(
label="Ngày tháng (góc phải thumbnail)",
value=datetime.now().strftime("%Y-%m-%d"),
placeholder="YYYY-MM-DD"
)
with gr.Column(scale=1):
video_thumbnail_upload = gr.File(
label="Tải lên Thumbnail (1/3 dưới video)",
file_types=["image"],
file_count="single"
)
video_logo_upload = gr.File(
label="Tải lên Logo (góc phải trên)",
file_types=["image"],
file_count="single"
)
generate_video_btn = gr.Button("🎬 Tạo Video", variant="primary")
# --- KẾT QUẢ VIDEO ---
with gr.Group(visible=False) as video_results_group:
gr.Markdown("## 🎥 Kết quả Video")
video_gallery_output = gr.Gallery(label="Các phiên bản video đã tạo", show_label=True, object_fit="contain", height="auto")
# =========================================================================
# KẾT NỐI CÁC SỰ KIỆN (Event Handlers)
# =========================================================================
generate_news_btn.click(
fn=process_and_display_news,
inputs=[sports_news_type, sports_language, sports_limit],
outputs=[state_news_content, news_display_group, podcast_generation_group, audio_generation_group, video_generation_group] + news_rows + news_textboxes
)
generate_podcast_btn.click(
fn=process_selected_news_for_podcast,
inputs=[state_news_content] + news_checkboxes,
outputs=[podcast_results_group] + podcast_result_cols + podcast_textboxes + podcast_statuses
)
# Thêm function để refresh audio dropdown
def refresh_audio_dropdown():
"""Làm mới danh sách audio files có sẵn."""
available_audio_files = get_available_audio_files()
audio_choices = [(get_audio_file_display_name(f), f) for f in available_audio_files]
return gr.Dropdown(choices=audio_choices, value=available_audio_files[0] if available_audio_files else None)
def update_audio_preview(selected_audio):
"""Cập nhật audio player để preview."""
if selected_audio and os.path.exists(selected_audio):
return gr.Audio(value=selected_audio, visible=True)
return gr.Audio(value=None, visible=False)
# Nút Tạo Audio giờ sẽ cập nhật cả kết quả audio và giao diện Bước 4
audio_outputs = [audio_results_group] + audio_result_cols + audio_players + audio_statuses
video_setup_outputs = [video_generation_group, video_audio_input, video_title_checkboxes, video_source_media_checkboxes, video_description_input, video_date_input, video_thumbnail_upload, video_logo_upload]
generate_audio_from_podcast_btn.click(
fn=generate_audio_from_podcast_wrapper,
inputs=[voice_ref_audio_dropdown] + podcast_textboxes,
outputs=audio_outputs + video_setup_outputs
)
# Thêm event handlers cho audio dropdown
refresh_audio_btn.click(
fn=refresh_audio_dropdown,
inputs=[],
outputs=[voice_ref_audio_dropdown]
)
voice_ref_audio_dropdown.change(
fn=update_audio_preview,
inputs=[voice_ref_audio_dropdown],
outputs=[voice_preview_player]
)
# --- KẾT NỐI SỰ KIỆN CHO BƯỚC 4 ---
refresh_titles_btn.click(
fn=refresh_titles_checkboxes,
inputs=[],
outputs=[video_title_checkboxes]
)
video_title_checkboxes.change(
fn=load_title_content,
inputs=[video_title_checkboxes],
outputs=[video_source_media_checkboxes]
)
generate_video_btn.click(
fn=generate_video_wrapper,
inputs=[
video_audio_input,
video_source_media_checkboxes,
video_aspect_ratio,
video_num_versions,
video_description_input,
video_date_input,
video_thumbnail_upload,
video_logo_upload
],
outputs=[video_results_group, video_gallery_output]
)
with gr.TabItem("📁 Quản lý Media"):
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("## 📂 Quản lý Title")
with gr.Row():
new_title_input = gr.Textbox(label="Tên Title mới", placeholder="Nhập tên title...", scale=3)
create_title_btn = gr.Button("Tạo mới", variant="primary", scale=1)
title_dropdown = gr.Dropdown(
label="Chọn Title",
choices=get_title_list(),
interactive=True
)
refresh_titles_mgmt_btn = gr.Button("🔄 Làm mới danh sách")
delete_title_btn = gr.Button("🗑️ Xóa Title", variant="stop")
title_status = gr.Textbox(label="Trạng thái", interactive=False)
with gr.Column(scale=2):
gr.Markdown("## 🎬 Quản lý Media")
with gr.Row():
with gr.Column(scale=1):
media_upload = gr.File(
label="Tải lên Media (hình ảnh/video)",
file_types=["image", "video"],
file_count="multiple"
)
upload_media_btn = gr.Button("Tải lên Media", variant="primary")
media_gallery = gr.Gallery(
label="Media hiện có",
show_label=True,
object_fit="contain",
height="400px"
)
# =========================================================================
# KẾT NỐI CÁC SỰ KIỆN (Event Handlers) cho Media Management
# =========================================================================
def refresh_title_list():
titles = get_title_list()
return gr.Dropdown(choices=titles, value=titles[0] if titles else None)
def update_media_galleries(selected_title):
if not selected_title:
return gr.Gallery(value=[]), ""
media = get_media_files(selected_title)
return media, f"Đã tải {len(media)} media files"
def handle_upload_media(files, title):
if not title:
return None, "Vui lòng chọn title trước khi tải lên!"
success, message = save_uploaded_files(files, title, "media")
if success:
# Refresh the gallery after upload
media = get_media_files(title)
return media, message
return None, message
def handle_create_title(title_name):
if not title_name.strip():
return "", "Vui lòng nhập tên title hợp lệ!", gr.Dropdown()
success, message = create_new_title(title_name.strip())
if success:
return "", message, gr.Dropdown(choices=get_title_list(), value=title_name.strip())
return "", message, gr.Dropdown()
def handle_delete_title(title_name):
if not title_name:
return "", "Vui lòng chọn title để xóa!", gr.Dropdown()
success, message = delete_title(title_name)
if success:
titles = get_title_list()
return "", message, gr.Dropdown(choices=titles, value=titles[0] if titles else None)
return "", message, gr.Dropdown()
# Connect event handlers
refresh_titles_mgmt_btn.click(
fn=refresh_title_list,
inputs=[],
outputs=[title_dropdown]
)
title_dropdown.change(
fn=update_media_galleries,
inputs=[title_dropdown],
outputs=[media_gallery, title_status]
)
create_title_btn.click(
fn=handle_create_title,
inputs=[new_title_input],
outputs=[new_title_input, title_status, title_dropdown]
)
delete_title_btn.click(
fn=handle_delete_title,
inputs=[title_dropdown],
outputs=[title_status, title_dropdown]
)
upload_media_btn.click(
fn=handle_upload_media,
inputs=[media_upload, title_dropdown],
outputs=[media_gallery, title_status]
)
# =============================================================================
# KHỞI CHẠY ỨNG DỤNG
# =============================================================================
if __name__ == "__main__":
demo.queue()
demo.launch(
debug=True,
show_api=False,
share=False,
server_name="0.0.0.0",
server_port=7860,
)