test / video.py
wynai's picture
Upload 5 files
3a02865 verified
from flask import Blueprint, request, jsonify, send_file
import os
import threading
import time
import uuid
from src.mock_scraper import MockVideoScraper
import logging
# Configure logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Blueprint that the video routes in this module are attached to.
video_bp = Blueprint('video', __name__)
# In-memory registry of jobs, keyed by job ID, holding each job's state.
video_jobs = {}
class VideoJob:
    """State record for a single video generation request.

    Attributes:
        job_id: Identifier the job was created with.
        prompt: Prompt the job was created with.
        status: Starts as "pending"; the other values used are
            "processing", "completed" and "failed".
        video_path: None at construction; set later to the generated file.
        error_message: None at construction; set later on failure.
        created_at: time.time() value captured at construction.
    """

    def __init__(self, job_id, prompt):
        self.job_id, self.prompt = job_id, prompt
        # Every job starts out pending with no result and no error.
        self.status = "pending"
        self.video_path = self.error_message = None
        self.created_at = time.time()
def generate_video_async(job_id, prompt):
    """Generate the video for a registered job and record the outcome.

    Looks the job up in ``video_jobs``, marks it "processing", and asks a
    ``MockVideoScraper`` to produce a video for ``prompt``. The job ends up
    "completed" (with ``video_path`` set) when the scraper returns a path
    that exists on disk, and "failed" (with ``error_message`` set)
    otherwise, including when any exception is raised. The scraper is
    closed on every path on which it was created.

    Args:
        job_id: Key of the job in ``video_jobs``.
        prompt: Prompt passed to the scraper.
    """
    job = video_jobs[job_id]
    job.status = "processing"
    scraper = None
    try:
        logger.info(f"Bắt đầu tạo video cho job {job_id} với prompt: {prompt}")
        scraper = MockVideoScraper(headless=True)
        result_path = scraper.generate_video_simple(prompt, timeout=300)

        produced = bool(result_path) and os.path.exists(result_path)
        if not produced:
            job.status = "failed"
            job.error_message = "Không thể tạo video"
            logger.error(f"Job {job_id} thất bại: Không thể tạo video")
            return  # the finally clause below still closes the scraper

        # Publish the path before the status so a reader that sees
        # "completed" also sees the file location.
        job.video_path = result_path
        job.status = "completed"
        logger.info(f"Job {job_id} hoàn thành thành công")
    except Exception as exc:
        job.status = "failed"
        job.error_message = str(exc)
        logger.error(f"Job {job_id} thất bại với lỗi: {exc}")
    finally:
        if scraper:
            scraper.close()
@video_bp.route('/generate/<path:prompt>', methods=['GET'])
def generate_video_endpoint(prompt):
    """Register a job for ``prompt`` and start generating it in the background.

    URL: /api/video/generate/your_prompt_here

    A fresh UUID4 job ID is created, the job is stored in ``video_jobs``,
    and a daemon thread running ``generate_video_async`` is started.

    Returns:
        A JSON body with the job ID plus status and download URLs and HTTP
        202, or a JSON error body and HTTP 500 if anything above raises.
    """
    try:
        job_id = str(uuid.uuid4())
        video_jobs[job_id] = VideoJob(job_id, prompt)

        # Generation runs on its own daemon thread so the request can
        # return immediately.
        worker = threading.Thread(
            target=generate_video_async,
            args=(job_id, prompt),
            daemon=True,
        )
        worker.start()

        accepted = {
            "success": True,
            "job_id": job_id,
            "message": "Video generation started",
            "status_url": f"/api/video/status/{job_id}",
            "download_url": f"/api/video/download/{job_id}",
        }
        return jsonify(accepted), 202
    except Exception as exc:
        logger.error(f"Lỗi khi tạo job: {exc}")
        failure = {
            "success": False,
            "error": str(exc),
        }
        return jsonify(failure), 500
@video_bp.route('/status/<job_id>', methods=['GET'])
def get_job_status(job_id):
    """Report the current state of a job.

    Args:
        job_id: Key of the job in ``video_jobs``.

    Returns:
        A JSON body with the job's status, prompt and creation time. It
        also carries ``error`` for a failed job that has an error message,
        and ``download_url`` / ``video_ready`` for a completed job that has
        a video path. Unknown job IDs get a JSON error body and HTTP 404.
    """
    # One lookup instead of "in" followed by indexing: the cleanup timer
    # thread can delete the entry between the two steps, which would turn
    # a 404 into an unhandled KeyError.
    job = video_jobs.get(job_id)
    if job is None:
        return jsonify({
            "success": False,
            "error": "Job not found"
        }), 404

    # Read the status once so every field below describes the same state
    # even if the worker thread updates the job meanwhile.
    status = job.status
    response = {
        "success": True,
        "job_id": job_id,
        "status": status,
        "prompt": job.prompt,
        "created_at": job.created_at
    }
    if status == "failed" and job.error_message:
        response["error"] = job.error_message
    if status == "completed" and job.video_path:
        response["download_url"] = f"/api/video/download/{job_id}"
        response["video_ready"] = True
    return jsonify(response)
@video_bp.route('/download/<job_id>', methods=['GET'])
def download_video(job_id):
    """Send the generated video of a completed job as an MP4 attachment.

    Args:
        job_id: Key of the job in ``video_jobs``.

    Returns:
        The file response from ``send_file`` on success. Otherwise a JSON
        error body with HTTP 404 (unknown job, or file missing on disk),
        HTTP 400 (job not completed or no video path), or HTTP 500
        (``send_file`` raised).
    """
    # One lookup instead of "in" followed by indexing: the cleanup timer
    # thread can delete the entry between the two steps, which would turn
    # a 404 into an unhandled KeyError.
    job = video_jobs.get(job_id)
    if job is None:
        return jsonify({
            "success": False,
            "error": "Job not found"
        }), 404

    # Read the mutable fields once so all checks below use the same values.
    status = job.status
    video_path = job.video_path
    if status != "completed" or not video_path:
        return jsonify({
            "success": False,
            "error": "Video not ready",
            "status": status
        }), 400

    if not os.path.exists(video_path):
        return jsonify({
            "success": False,
            "error": "Video file not found"
        }), 404

    try:
        return send_file(
            video_path,
            as_attachment=True,
            download_name=f"video_{job_id}.mp4",
            mimetype='video/mp4'
        )
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Lỗi khi tải xuống video: {e}")
        return jsonify({
            "success": False,
            "error": "Failed to download video"
        }), 500
@video_bp.route('/jobs', methods=['GET'])
def list_jobs():
    """List every job currently held in ``video_jobs``.

    Returns:
        A JSON body with one entry per job (ID, prompt, status, creation
        time, plus ``download_url`` for completed jobs) and the total count.
    """
    jobs_list = []
    # Iterate over a snapshot: request threads add jobs and the cleanup
    # timer thread removes them, and iterating the live dict while its size
    # changes raises RuntimeError.
    for job_id, job in list(video_jobs.items()):
        status = job.status  # read once so the entry is self-consistent
        job_info = {
            "job_id": job_id,
            "prompt": job.prompt,
            "status": status,
            "created_at": job.created_at
        }
        if status == "completed":
            job_info["download_url"] = f"/api/video/download/{job_id}"
        jobs_list.append(job_info)

    return jsonify({
        "success": True,
        "jobs": jobs_list,
        "total": len(jobs_list)
    })
@video_bp.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint.

    Returns:
        A JSON body with a fixed message, the number of jobs whose status
        is "processing", and the total number of jobs in ``video_jobs``.
    """
    # Count from a snapshot: other threads add and remove jobs, and
    # iterating the live dict while its size changes raises RuntimeError.
    # Using one snapshot also keeps the two counts consistent.
    jobs = list(video_jobs.values())
    active_jobs = sum(1 for job in jobs if job.status == "processing")
    return jsonify({
        "success": True,
        "message": "Video generation API is running",
        "active_jobs": active_jobs,
        "total_jobs": len(jobs)
    })
# Cleanup old jobs (older than 1 hour by default)
def cleanup_old_jobs(max_age_seconds=3600):
    """Remove expired jobs from ``video_jobs`` and delete their video files.

    A job is expired when more than ``max_age_seconds`` have passed since
    its ``created_at``. File removal is best effort: a failure is logged
    and the job is still dropped from the registry.

    Args:
        max_age_seconds: Age threshold in seconds; defaults to one hour.
    """
    current_time = time.time()
    # Select from a snapshot: request threads insert into video_jobs
    # concurrently, and iterating the live dict while its size changes
    # raises RuntimeError.
    expired = [
        (job_id, job)
        for job_id, job in list(video_jobs.items())
        if current_time - job.created_at > max_age_seconds
    ]

    for job_id, job in expired:
        video_path = job.video_path
        if video_path:
            try:
                os.remove(video_path)
            except FileNotFoundError:
                # Already gone, which is the desired end state.
                pass
            except OSError as exc:
                logger.warning(
                    "Could not remove video file %s for job %s: %s",
                    video_path, job_id, exc,
                )
        # pop() with a default tolerates the entry having been removed
        # by someone else in the meantime.
        video_jobs.pop(job_id, None)

    logger.info(f"Cleaned up {len(expired)} old jobs")
# Run cleanup every 30 minutes.
import atexit

# Timer currently armed for the next cleanup pass (None before the first start).
cleanup_timer = None
# Set by stop_cleanup_timer so a pass already in flight does not re-arm.
_cleanup_stopped = threading.Event()
# Guards cleanup_timer and the stop flag against start/stop/re-arm races.
_cleanup_lock = threading.Lock()


def _schedule_next_cleanup():
    """Arm a daemon timer for the next cleanup pass unless stopped."""
    global cleanup_timer
    with _cleanup_lock:
        if _cleanup_stopped.is_set():
            return
        if cleanup_timer:
            # Avoid two timer chains when start is called more than once;
            # cancelling a timer that has already fired is harmless.
            cleanup_timer.cancel()
        cleanup_timer = threading.Timer(1800, _run_cleanup_cycle)  # 30 minutes
        cleanup_timer.daemon = True
        cleanup_timer.start()


def _run_cleanup_cycle():
    """Run one cleanup pass, then schedule the following one."""
    try:
        cleanup_old_jobs()
    except Exception:
        # A failed pass must not end the periodic schedule.
        logger.exception("Periodic job cleanup failed")
    # threading.Timer fires only once, so it has to be re-armed after
    # every pass for the cleanup to actually repeat.
    _schedule_next_cleanup()


def start_cleanup_timer():
    """Start (or restart) the periodic cleanup of old jobs."""
    _cleanup_stopped.clear()
    _schedule_next_cleanup()


def stop_cleanup_timer():
    """Stop the periodic cleanup and cancel the pending timer, if any."""
    global cleanup_timer
    with _cleanup_lock:
        _cleanup_stopped.set()
        if cleanup_timer:
            cleanup_timer.cancel()


atexit.register(stop_cleanup_timer)
start_cleanup_timer()