F5-TTS-pt-br / app.py
fuuuzzy's picture
Upload folder using huggingface_hub
7c71fa7 verified
import os
import shutil
import threading
import time
from functools import wraps
import logging
import requests
import yaml
from flask import Flask, request, jsonify, g
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from services.logger import get_app_logger, RequestLogger, task_id_var, get_process_worker_logger, \
get_upload_worker_logger
from services.tts_service import TTSService
from services.queue_manager import QueueManager
from services.r2_uploader import R2Uploader
from services.uvr5_service import UVR5Service
from services.merger_service import MergerService
logger = get_app_logger()
# 加载配置
def load_config(config_path='config.yaml'):
with open(config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
config = load_config()
# Auth Helpers
def check_auth(username, password):
app_config = config.get('app', {})
return username == app_config['api_username'] and password == app_config['api_password']
def authenticate():
return jsonify({'error': 'Authentication required'}), 401, {'WWW-Authenticate': 'Basic realm="Login Required"'}
def requires_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return authenticate()
return f(*args, **kwargs)
return decorated
def send_hook_with_retry(url: str, data: dict, max_retries: int = 3):
session = requests.Session()
retries = Retry(total=max_retries, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
session.mount('http://', HTTPAdapter(max_retries=retries))
session.mount('https://', HTTPAdapter(max_retries=retries))
try:
response = session.post(url, json=data, timeout=10)
response.raise_for_status()
return response
except Exception as e:
logger.error(f"Failed to send hook to {url}: {e}")
pass
def download_file(url: str, path: str):
response = requests.get(url, stream=True, timeout=60)
response.raise_for_status()
with open(path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# Initialize Flask
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024
# Initialize Services
queue_manager = QueueManager(config['redis'])
r2_uploader = R2Uploader(config['r2'])
tts_service = TTSService(config)
uvr5_service = UVR5Service(config)
merger_service = MergerService(config)
# Temp Dir for Videos
VIDEO_TEMP_DIR = 'data/temp_videos'
os.makedirs(VIDEO_TEMP_DIR, exist_ok=True)
# -------------------------------------------------------------------------
# Workers
# -------------------------------------------------------------------------
def process_worker():
"""
Main Pipeline Worker:
1. Fetch Task
2. Download Video
3. Run TTS Generation
4. Run UVR5 Separation (get BGM)
5. Merge (Video + TTS + BGM)
6. Push to Upload Queue
"""
worker_logger = get_process_worker_logger()
worker_logger.info("Main Process Worker started")
while True:
try:
task = queue_manager.get_process_task()
if not task:
time.sleep(1)
continue
task_id = task.get('task_id')
token = task_id_var.set(task_id)
# Context variables for cleanup
local_video_path = None
bgm_path = None
vocals_path = None
task_tts_dir = None
final_output_path = None
success = False
try:
worker_logger.info("Processing started.")
# 1. Download Video
video_url = task['data'].get('video_url')
if not video_url:
raise ValueError("Missing video_url")
local_video_path = os.path.join(VIDEO_TEMP_DIR, f"{task_id}_input.mp4")
worker_logger.info(f"Downloading video from {video_url}")
download_file(video_url, local_video_path)
# 2. Run TTS
worker_logger.info("Running TTS...")
tts_result = tts_service.process_task(task)
segments = tts_result['segments']
task_tts_dir = tts_result['task_dir']
# 3. Run UVR5
worker_logger.info("Running UVR5 Separation...")
vocals_path, bgm_path = uvr5_service.process_audio(local_video_path, task_id)
if not bgm_path or not os.path.exists(bgm_path):
raise Exception("UVR5 failed to produce background music.")
# 4. Merge
worker_logger.info("Merging Audio and Video...")
final_output_path = os.path.join(VIDEO_TEMP_DIR, f"{task_id}_final.mp4")
merger_service.merge_video(
video_path=local_video_path,
bgm_path=bgm_path,
segments=segments,
output_path=final_output_path
)
# 5. Push to Upload
upload_task = {
'task_id': task_id,
'file_path': final_output_path,
'hook_url': task['data'].get('hook_url'),
}
queue_manager.push_upload_task(upload_task)
success = True
except Exception as e:
worker_logger.error(f"Task processing failed: {e}", exc_info=True)
if 'hook_url' in task.get('data', {}):
hook_url = task['data']['hook_url']
failure_payload = {
"task_uuid": task_id,
"status": "failed",
"timestamp": int(time.time()),
"error_message": str(e)
}
send_hook_with_retry(hook_url, failure_payload)
finally:
# Cleanup Logic
try:
if local_video_path and os.path.exists(local_video_path):
os.remove(local_video_path)
if bgm_path and os.path.exists(bgm_path):
os.remove(bgm_path)
if vocals_path and os.path.exists(vocals_path):
os.remove(vocals_path)
if task_tts_dir and os.path.exists(task_tts_dir):
shutil.rmtree(task_tts_dir)
# Only delete final output if we FAILED.
# If success, upload worker handles it.
if not success and final_output_path and os.path.exists(final_output_path):
os.remove(final_output_path)
except Exception as cleanup_err:
worker_logger.warning(f"Cleanup error: {cleanup_err}")
task_id_var.reset(token)
except Exception as e:
worker_logger.error(f"Worker Loop Error: {e}")
time.sleep(5)
def upload_worker():
"""
Upload Worker:
1. Upload Final Video
2. Send Success Callback
3. Cleanup Final Video
"""
worker_logger = get_upload_worker_logger()
worker_logger.info("Upload Worker started")
while True:
try:
result = queue_manager.get_upload_task(timeout=5)
if not result:
continue
task_id = result.get('task_id')
token = task_id_var.set(task_id)
file_path = result.get('file_path')
hook_url = result.get('hook_url')
try:
worker_logger.info(f"Uploading result: {file_path}")
file_url = None
if file_path and os.path.exists(file_path):
object_key = f"{task_id}.mp4"
file_url = r2_uploader.upload_file(file_path, object_key=object_key)
else:
raise FileNotFoundError(f"File to upload not found: {file_path}")
if hook_url:
success_payload = {
"task_uuid": task_id,
"status": "success",
"timestamp": int(time.time()),
"result_url": file_url
}
worker_logger.info(f"Sending success callback to {hook_url}")
send_hook_with_retry(hook_url, success_payload)
except Exception as e:
worker_logger.error(f"Upload failed: {e}", exc_info=True)
if hook_url:
failure_payload = {
"task_uuid": task_id,
"status": "failed",
"timestamp": int(time.time()),
"error_message": str(e)
}
send_hook_with_retry(hook_url, failure_payload)
finally:
# Cleanup the final video file
if file_path and os.path.exists(file_path):
try:
os.remove(file_path)
worker_logger.info(f"Removed final video: {file_path}")
except Exception as e:
worker_logger.warning(f"Failed to remove file: {e}")
task_id_var.reset(token)
except Exception as e:
logger.error(f"Upload Loop Error: {e}")
time.sleep(5)
# -------------------------------------------------------------------------
# Flask Routes
# -------------------------------------------------------------------------
@app.before_request
def before_request():
g.start_time = time.time()
@app.after_request
def after_request(response):
if hasattr(g, 'start_time'):
duration = time.time() - g.start_time
RequestLogger.log_request(request, response, duration)
return response
@app.route('/dubbing/character', methods=['POST'])
@requires_auth
def generate():
try:
data = request.json
# Basic Validation
required = ['character_voice', 'content', 'hook_url', 'video_url']
for field in required:
if not data.get(field):
return jsonify({'error': f'Missing field: {field}'}), 400
priority = data.get('priority', 3)
if priority not in range(1, 6):
return jsonify({'error': 'Priority must be 1-5'}), 400
task_id = queue_manager.add_task(data, priority)
logger.info(f"Created Task: {task_id}")
return jsonify({
'task_uuid': task_id,
'status': 'queued',
'message': 'Task queued successfully'
}), 201
except Exception as e:
logger.error(f"API Error: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/dubbing/character/tasks/<task_id>/cancel', methods=['DELETE'])
@requires_auth
def cancel_task(task_id: str):
try:
if queue_manager.delete_process_task(task_id):
return jsonify({'message': 'Task canceled'}), 200
return jsonify({'message': 'Task not found or already processed'}), 404
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.errorhandler(500)
def internal_error(error):
logger.error(f"500 Error: {error}")
return jsonify({'error': 'Internal server error'}), 500
def main():
logger.info("Starting Service...")
# Directories
os.makedirs(config['tts']['output_dir'], exist_ok=True)
os.makedirs(config['tts']['voices_dir'], exist_ok=True)
os.makedirs(VIDEO_TEMP_DIR, exist_ok=True)
# Threads
threading.Thread(target=process_worker, daemon=True).start()
threading.Thread(target=upload_worker, daemon=True).start()
app.run(
host=config['app']['host'],
port=config['app']['port'],
debug=config['app']['debug']
)
if __name__ == '__main__':
main()