|
|
import os |
|
|
import shutil |
|
|
import threading |
|
|
import time |
|
|
from functools import wraps |
|
|
import logging |
|
|
|
|
|
import requests |
|
|
import yaml |
|
|
from flask import Flask, request, jsonify, g |
|
|
from requests.adapters import HTTPAdapter |
|
|
from urllib3.util.retry import Retry |
|
|
|
|
|
from services.logger import get_app_logger, RequestLogger, task_id_var, get_process_worker_logger, \ |
|
|
get_upload_worker_logger |
|
|
from services.tts_service import TTSService |
|
|
from services.queue_manager import QueueManager |
|
|
from services.r2_uploader import R2Uploader |
|
|
from services.uvr5_service import UVR5Service |
|
|
from services.merger_service import MergerService |
|
|
|
|
|
logger = get_app_logger() |
|
|
|
|
|
|
|
|
def load_config(config_path='config.yaml'): |
|
|
with open(config_path, 'r', encoding='utf-8') as f: |
|
|
return yaml.safe_load(f) |
|
|
|
|
|
config = load_config() |
|
|
|
|
|
|
|
|
def check_auth(username, password): |
|
|
app_config = config.get('app', {}) |
|
|
return username == app_config['api_username'] and password == app_config['api_password'] |
|
|
|
|
|
def authenticate(): |
|
|
return jsonify({'error': 'Authentication required'}), 401, {'WWW-Authenticate': 'Basic realm="Login Required"'} |
|
|
|
|
|
def requires_auth(f): |
|
|
@wraps(f) |
|
|
def decorated(*args, **kwargs): |
|
|
auth = request.authorization |
|
|
if not auth or not check_auth(auth.username, auth.password): |
|
|
return authenticate() |
|
|
return f(*args, **kwargs) |
|
|
return decorated |
|
|
|
|
|
def send_hook_with_retry(url: str, data: dict, max_retries: int = 3): |
|
|
session = requests.Session() |
|
|
retries = Retry(total=max_retries, backoff_factor=1, status_forcelist=[500, 502, 503, 504]) |
|
|
session.mount('http://', HTTPAdapter(max_retries=retries)) |
|
|
session.mount('https://', HTTPAdapter(max_retries=retries)) |
|
|
try: |
|
|
response = session.post(url, json=data, timeout=10) |
|
|
response.raise_for_status() |
|
|
return response |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to send hook to {url}: {e}") |
|
|
pass |
|
|
|
|
|
def download_file(url: str, path: str): |
|
|
response = requests.get(url, stream=True, timeout=60) |
|
|
response.raise_for_status() |
|
|
with open(path, 'wb') as f: |
|
|
for chunk in response.iter_content(chunk_size=8192): |
|
|
f.write(chunk) |
|
|
|
|
|
|
|
|
app = Flask(__name__) |
|
|
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024 |
|
|
|
|
|
|
|
|
queue_manager = QueueManager(config['redis']) |
|
|
r2_uploader = R2Uploader(config['r2']) |
|
|
tts_service = TTSService(config) |
|
|
uvr5_service = UVR5Service(config) |
|
|
merger_service = MergerService(config) |
|
|
|
|
|
|
|
|
VIDEO_TEMP_DIR = 'data/temp_videos' |
|
|
os.makedirs(VIDEO_TEMP_DIR, exist_ok=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_worker(): |
|
|
""" |
|
|
Main Pipeline Worker: |
|
|
1. Fetch Task |
|
|
2. Download Video |
|
|
3. Run TTS Generation |
|
|
4. Run UVR5 Separation (get BGM) |
|
|
5. Merge (Video + TTS + BGM) |
|
|
6. Push to Upload Queue |
|
|
""" |
|
|
worker_logger = get_process_worker_logger() |
|
|
worker_logger.info("Main Process Worker started") |
|
|
|
|
|
while True: |
|
|
try: |
|
|
task = queue_manager.get_process_task() |
|
|
if not task: |
|
|
time.sleep(1) |
|
|
continue |
|
|
|
|
|
task_id = task.get('task_id') |
|
|
token = task_id_var.set(task_id) |
|
|
|
|
|
|
|
|
local_video_path = None |
|
|
bgm_path = None |
|
|
vocals_path = None |
|
|
task_tts_dir = None |
|
|
final_output_path = None |
|
|
success = False |
|
|
|
|
|
try: |
|
|
worker_logger.info("Processing started.") |
|
|
|
|
|
|
|
|
video_url = task['data'].get('video_url') |
|
|
if not video_url: |
|
|
raise ValueError("Missing video_url") |
|
|
|
|
|
local_video_path = os.path.join(VIDEO_TEMP_DIR, f"{task_id}_input.mp4") |
|
|
worker_logger.info(f"Downloading video from {video_url}") |
|
|
download_file(video_url, local_video_path) |
|
|
|
|
|
|
|
|
worker_logger.info("Running TTS...") |
|
|
tts_result = tts_service.process_task(task) |
|
|
segments = tts_result['segments'] |
|
|
task_tts_dir = tts_result['task_dir'] |
|
|
|
|
|
|
|
|
worker_logger.info("Running UVR5 Separation...") |
|
|
vocals_path, bgm_path = uvr5_service.process_audio(local_video_path, task_id) |
|
|
|
|
|
if not bgm_path or not os.path.exists(bgm_path): |
|
|
raise Exception("UVR5 failed to produce background music.") |
|
|
|
|
|
|
|
|
worker_logger.info("Merging Audio and Video...") |
|
|
final_output_path = os.path.join(VIDEO_TEMP_DIR, f"{task_id}_final.mp4") |
|
|
|
|
|
merger_service.merge_video( |
|
|
video_path=local_video_path, |
|
|
bgm_path=bgm_path, |
|
|
segments=segments, |
|
|
output_path=final_output_path |
|
|
) |
|
|
|
|
|
|
|
|
upload_task = { |
|
|
'task_id': task_id, |
|
|
'file_path': final_output_path, |
|
|
'hook_url': task['data'].get('hook_url'), |
|
|
} |
|
|
queue_manager.push_upload_task(upload_task) |
|
|
success = True |
|
|
|
|
|
except Exception as e: |
|
|
worker_logger.error(f"Task processing failed: {e}", exc_info=True) |
|
|
|
|
|
if 'hook_url' in task.get('data', {}): |
|
|
hook_url = task['data']['hook_url'] |
|
|
failure_payload = { |
|
|
"task_uuid": task_id, |
|
|
"status": "failed", |
|
|
"timestamp": int(time.time()), |
|
|
"error_message": str(e) |
|
|
} |
|
|
send_hook_with_retry(hook_url, failure_payload) |
|
|
|
|
|
finally: |
|
|
|
|
|
try: |
|
|
if local_video_path and os.path.exists(local_video_path): |
|
|
os.remove(local_video_path) |
|
|
|
|
|
if bgm_path and os.path.exists(bgm_path): |
|
|
os.remove(bgm_path) |
|
|
|
|
|
if vocals_path and os.path.exists(vocals_path): |
|
|
os.remove(vocals_path) |
|
|
|
|
|
if task_tts_dir and os.path.exists(task_tts_dir): |
|
|
shutil.rmtree(task_tts_dir) |
|
|
|
|
|
|
|
|
|
|
|
if not success and final_output_path and os.path.exists(final_output_path): |
|
|
os.remove(final_output_path) |
|
|
|
|
|
except Exception as cleanup_err: |
|
|
worker_logger.warning(f"Cleanup error: {cleanup_err}") |
|
|
|
|
|
task_id_var.reset(token) |
|
|
|
|
|
except Exception as e: |
|
|
worker_logger.error(f"Worker Loop Error: {e}") |
|
|
time.sleep(5) |
|
|
|
|
|
def upload_worker(): |
|
|
""" |
|
|
Upload Worker: |
|
|
1. Upload Final Video |
|
|
2. Send Success Callback |
|
|
3. Cleanup Final Video |
|
|
""" |
|
|
worker_logger = get_upload_worker_logger() |
|
|
worker_logger.info("Upload Worker started") |
|
|
|
|
|
while True: |
|
|
try: |
|
|
result = queue_manager.get_upload_task(timeout=5) |
|
|
if not result: |
|
|
continue |
|
|
|
|
|
task_id = result.get('task_id') |
|
|
token = task_id_var.set(task_id) |
|
|
|
|
|
file_path = result.get('file_path') |
|
|
hook_url = result.get('hook_url') |
|
|
|
|
|
try: |
|
|
worker_logger.info(f"Uploading result: {file_path}") |
|
|
|
|
|
file_url = None |
|
|
if file_path and os.path.exists(file_path): |
|
|
object_key = f"{task_id}.mp4" |
|
|
file_url = r2_uploader.upload_file(file_path, object_key=object_key) |
|
|
else: |
|
|
raise FileNotFoundError(f"File to upload not found: {file_path}") |
|
|
|
|
|
if hook_url: |
|
|
success_payload = { |
|
|
"task_uuid": task_id, |
|
|
"status": "success", |
|
|
"timestamp": int(time.time()), |
|
|
"result_url": file_url |
|
|
} |
|
|
worker_logger.info(f"Sending success callback to {hook_url}") |
|
|
send_hook_with_retry(hook_url, success_payload) |
|
|
|
|
|
except Exception as e: |
|
|
worker_logger.error(f"Upload failed: {e}", exc_info=True) |
|
|
if hook_url: |
|
|
failure_payload = { |
|
|
"task_uuid": task_id, |
|
|
"status": "failed", |
|
|
"timestamp": int(time.time()), |
|
|
"error_message": str(e) |
|
|
} |
|
|
send_hook_with_retry(hook_url, failure_payload) |
|
|
finally: |
|
|
|
|
|
if file_path and os.path.exists(file_path): |
|
|
try: |
|
|
os.remove(file_path) |
|
|
worker_logger.info(f"Removed final video: {file_path}") |
|
|
except Exception as e: |
|
|
worker_logger.warning(f"Failed to remove file: {e}") |
|
|
|
|
|
task_id_var.reset(token) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Upload Loop Error: {e}") |
|
|
time.sleep(5) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.before_request |
|
|
def before_request(): |
|
|
g.start_time = time.time() |
|
|
|
|
|
@app.after_request |
|
|
def after_request(response): |
|
|
if hasattr(g, 'start_time'): |
|
|
duration = time.time() - g.start_time |
|
|
RequestLogger.log_request(request, response, duration) |
|
|
return response |
|
|
|
|
|
@app.route('/dubbing/character', methods=['POST']) |
|
|
@requires_auth |
|
|
def generate(): |
|
|
try: |
|
|
data = request.json |
|
|
|
|
|
required = ['character_voice', 'content', 'hook_url', 'video_url'] |
|
|
for field in required: |
|
|
if not data.get(field): |
|
|
return jsonify({'error': f'Missing field: {field}'}), 400 |
|
|
|
|
|
priority = data.get('priority', 3) |
|
|
if priority not in range(1, 6): |
|
|
return jsonify({'error': 'Priority must be 1-5'}), 400 |
|
|
|
|
|
task_id = queue_manager.add_task(data, priority) |
|
|
logger.info(f"Created Task: {task_id}") |
|
|
|
|
|
return jsonify({ |
|
|
'task_uuid': task_id, |
|
|
'status': 'queued', |
|
|
'message': 'Task queued successfully' |
|
|
}), 201 |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"API Error: {e}") |
|
|
return jsonify({'error': str(e)}), 500 |
|
|
|
|
|
@app.route('/dubbing/character/tasks/<task_id>/cancel', methods=['DELETE']) |
|
|
@requires_auth |
|
|
def cancel_task(task_id: str): |
|
|
try: |
|
|
if queue_manager.delete_process_task(task_id): |
|
|
return jsonify({'message': 'Task canceled'}), 200 |
|
|
return jsonify({'message': 'Task not found or already processed'}), 404 |
|
|
except Exception as e: |
|
|
return jsonify({'error': str(e)}), 500 |
|
|
|
|
|
@app.errorhandler(500) |
|
|
def internal_error(error): |
|
|
logger.error(f"500 Error: {error}") |
|
|
return jsonify({'error': 'Internal server error'}), 500 |
|
|
|
|
|
def main(): |
|
|
logger.info("Starting Service...") |
|
|
|
|
|
|
|
|
os.makedirs(config['tts']['output_dir'], exist_ok=True) |
|
|
os.makedirs(config['tts']['voices_dir'], exist_ok=True) |
|
|
os.makedirs(VIDEO_TEMP_DIR, exist_ok=True) |
|
|
|
|
|
|
|
|
threading.Thread(target=process_worker, daemon=True).start() |
|
|
threading.Thread(target=upload_worker, daemon=True).start() |
|
|
|
|
|
app.run( |
|
|
host=config['app']['host'], |
|
|
port=config['app']['port'], |
|
|
debug=config['app']['debug'] |
|
|
) |
|
|
|
|
|
if __name__ == '__main__': |
|
|
main() |
|
|
|