| from fastapi import FastAPI, HTTPException, BackgroundTasks
|
| from fastapi.middleware.cors import CORSMiddleware
|
| from pydantic import BaseModel
|
| from typing import List, Dict
|
| import os
|
| import uuid
|
| import aiohttp
|
| import asyncio
|
| import logging
|
| import tempfile
|
| import openai
|
| from pathlib import Path
|
| import subprocess
|
| import shutil
|
| import ssl
|
| import json
|
| from fastapi.staticfiles import StaticFiles
|
| from pydub import AudioSegment
|
| import shlex
|
| from ffmpeg import probe as ffmpeg_probe
|
| import time
|
|
|
|
|
# Emit INFO-and-above records for the whole process and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Runtime configuration, read once from the environment at import time.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
# Origin prepended to the "/storage/..." paths returned to clients.
BASE_URL = os.getenv("BASE_URL", "http://localhost:8000")

if not OPENAI_API_KEY:
    # Speech generation cannot authenticate without a key; say so at startup
    # instead of only failing on the first request.
    logger.warning("OPENAI_API_KEY is not set; speech generation will fail")

openai.api_key = OPENAI_API_KEY
if OPENAI_BASE_URL:
    # The module-level client behind `openai.audio.speech.create` (openai>=1.0)
    # reads `openai.base_url`; `api_base` is the pre-1.0 attribute name and is
    # ignored by that client, so a custom endpoint was never applied.
    openai.base_url = OPENAI_BASE_URL
    # Kept so anything that still reads the legacy attribute sees the same value.
    openai.api_base = OPENAI_BASE_URL
|
|
|
app = FastAPI()

# StaticFiles checks that the directory exists when it is constructed and
# raises otherwise, which would make the module un-importable on a fresh
# checkout. Create the directory first.
os.makedirs("storage", exist_ok=True)
app.mount("/storage", StaticFiles(directory="storage"), name="storage")

# NOTE(review): every origin is allowed *and* credentials are enabled. That is
# convenient for development but very permissive; consider restricting
# `allow_origins` to the known front-end origins before exposing this publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
class ComicData(BaseModel):
    """Request body of ``POST /api/generate-video``.

    The lists are consumed positionally: entry ``i`` of ``captions`` and
    ``speeches`` is paired up when the audio track is built, and ``panels`` is
    iterated in order when the images are downloaded.
    """

    # Narration text, one entry per panel.
    captions: List[str]
    # Spoken-dialogue text, one entry per panel.
    speeches: List[str]
    # Image URLs, one entry per panel.
    panels: List[str]
|
|
|
|
|
async def download_image(url, output_path):
    """Fetch ``url`` and store the response body at ``output_path``.

    Returns ``output_path`` when the server answers with status 200, and
    ``None`` for any other status or when an exception occurs. Both failure
    modes are logged rather than raised.
    """
    try:
        # NOTE(review): hostname checking and certificate verification are
        # switched off here, so TLS gives no protection against an impostor
        # host. Left as-is to preserve behaviour; confirm it is intentional.
        insecure_context = ssl.create_default_context()
        insecure_context.check_hostname = False
        insecure_context.verify_mode = ssl.CERT_NONE
        connector = aiohttp.TCPConnector(ssl=insecure_context)

        async with aiohttp.ClientSession(connector=connector) as session:
            async with session.get(url) as response:
                if response.status != 200:
                    logger.error(f"Failed to download image: {response.status}")
                    return None

                with open(output_path, 'wb') as image_file:
                    image_file.write(await response.read())
                return output_path
    except Exception as e:
        logger.error(f"Error downloading image: {e}")
        return None
|
|
|
|
|
async def generate_speech(text, voice="alloy", output_path=None):
    """Synthesize ``text`` with the OpenAI ``tts-1`` model and save it to disk.

    Args:
        text: Text to speak.
        voice: Voice identifier forwarded unchanged to the speech endpoint.
        output_path: Destination file. When falsy, a ``<uuid4>.mp3`` name
            relative to the current working directory is used.

    Returns:
        The path that was written, or ``None`` if anything failed (the error
        is logged, not raised).
    """
    try:
        if not output_path:
            output_path = f"{uuid.uuid4()}.mp3"

        def _synthesize():
            # Both the SDK call and the file write are blocking.
            response = openai.audio.speech.create(
                model="tts-1",
                voice=voice,
                input=text,
            )
            with open(output_path, "wb") as f:
                f.write(response.content)

        # Run the blocking work in the default executor so the event loop can
        # keep serving other requests while the audio is being generated.
        await asyncio.get_running_loop().run_in_executor(None, _synthesize)
        return output_path
    except Exception as e:
        logger.error(f"Error generating speech: {e}")
        return None
|
|
|
|
|
def get_audio_duration(audio_path):
    """Return the length of the audio file at ``audio_path`` in seconds.

    The file is loaded with pydub, whose segment length is in milliseconds.
    If loading fails the error is logged and ``5.0`` is returned as a fallback.
    """
    try:
        length_ms = len(AudioSegment.from_file(audio_path))
    except Exception as e:
        logger.error(f"Error getting audio duration: {e}")
        return 5.0
    return length_ms / 1000.0
|
|
|
|
|
| ASS_STYLE_HEADER = """[Script Info]
|
| WrapStyle: 0
|
| ScaledBorderAndShadow: yes
|
| PlayResX: 1920
|
| PlayResY: 1080
|
| [V4+ Styles]
|
| Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
|
| Style: Caption,Noto Sans CJK SC,54,&H00FFFFFF,&H000000FF,&H00333333,&H00000000,0,0,0,0,100,100,0,0,1,2,3,2,100,100,50,0
|
| Style: Speech,Noto Sans CJK SC,48,&H00FFFFFF,&H000000FF,&H00333333,&H00000000,0,0,0,0,100,100,0,0,1,2,3,8,100,100,50,0
|
| [Events]
|
| Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
|
| """
|
def smart_wrap(text, video_width=1920):
    """Wrap ``text`` for an ASS Dialogue event, joining lines with ``\\N``.

    Width is counted in cells: a character whose code point is above 255
    counts as two cells, any other character as one. A line may hold up to
    ``(video_width // 38) * 2`` cells (100 for the default width).

    Line breaks already present in ``text`` are treated as forced breaks and
    rendered as ``\\N``. A raw newline must never reach the subtitle file,
    because each Dialogue event has to stay on a single line.

    Args:
        text: Text to wrap.
        video_width: Width used to derive the per-line budget.

    Returns:
        The wrapped text with ASS hard line breaks (``\\N``) between lines.
    """
    # Never let the budget drop below one wide character, otherwise tiny
    # widths would emit an empty leading line and one character per line.
    max_cells = max(2, int(video_width // 38) * 2)

    wrapped_lines = []
    for paragraph in text.replace("\r\n", "\n").replace("\r", "\n").split("\n"):
        current_line = []
        current_cells = 0
        for char in paragraph:
            char_cells = 2 if ord(char) > 255 else 1
            if current_line and current_cells + char_cells > max_cells:
                wrapped_lines.append(''.join(current_line))
                current_line = [char]
                current_cells = char_cells
            else:
                current_line.append(char)
                current_cells += char_cells
        wrapped_lines.append(''.join(current_line))

    return r'\N'.join(wrapped_lines)
|
|
|
|
|
def create_caption_subtitle_file(project_dir, captions, panel_start_times, panel_durations):
    """Write ``captions.ass`` into ``project_dir`` and return its path.

    The file starts with ``ASS_STYLE_HEADER`` followed by one ``Caption``-styled
    Dialogue event per (caption, start, duration) triple. Each event runs from
    ``start`` to ``start + duration`` and its text is wrapped with
    ``smart_wrap``. The three sequences are zipped, so extra entries in the
    longer ones are ignored.

    Returns ``None`` (after logging) if anything goes wrong.
    """
    try:
        subtitle_file = os.path.join(project_dir, "captions.ass")

        with open(subtitle_file, "w", encoding="utf-8") as out:
            out.write(ASS_STYLE_HEADER)
            for text, begin, length in zip(captions, panel_start_times, panel_durations):
                body = smart_wrap(text)
                timing = f"Dialogue: 0,{format_time(begin)},{format_time(begin + length)},"
                out.write(timing + f"Caption,,0,0,0,,{body}\n")

        return subtitle_file
    except Exception as e:
        logger.error(f"Error creating caption subtitle file: {e}")
        return None
|
|
|
|
|
def create_speech_subtitle_file(project_dir, speeches, panel_start_times, panel_durations):
    """Write ``speeches.ass`` into ``project_dir`` and return its path.

    Mirrors the caption writer but uses the ``Speech`` style: after
    ``ASS_STYLE_HEADER`` comes one Dialogue event per zipped
    (speech, start, duration) triple, spanning ``start`` to
    ``start + duration`` with the text wrapped by ``smart_wrap``.

    Returns ``None`` (after logging) if anything goes wrong.
    """
    try:
        subtitle_file = os.path.join(project_dir, "speeches.ass")

        with open(subtitle_file, "w", encoding="utf-8") as out:
            out.write(ASS_STYLE_HEADER)
            for line, begin, length in zip(speeches, panel_start_times, panel_durations):
                body = smart_wrap(line)
                first = format_time(begin)
                last = format_time(begin + length)
                out.write(f"Dialogue: 0,{first},{last},Speech,,0,0,0,,{body}\n")

        return subtitle_file
    except Exception as e:
        logger.error(f"Error creating speech subtitle file: {e}")
        return None
|
|
|
|
|
def format_time(seconds):
    """Format a duration in seconds as an ASS timestamp ``H:MM:SS.cc``.

    The value is rounded to the nearest centisecond *before* it is split into
    fields. Truncating the fractional part separately loses a centisecond to
    binary floating-point error (``0.29`` would become ``.28``) and cannot
    carry into the seconds field. Negative input is clamped to zero.

    Args:
        seconds: Non-negative time offset in seconds (int or float).

    Returns:
        The timestamp string, e.g. ``"1:01:01.50"`` for ``3661.5``.
    """
    total_centisecs = max(0, int(round(seconds * 100)))
    total_secs, centisecs = divmod(total_centisecs, 100)
    total_minutes, secs = divmod(total_secs, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours}:{minutes:02}:{secs:02}.{centisecs:02}"
|
|
|
|
|
async def _synthesize_clip(text, voice, clip_path):
    """Render ``text`` to ``clip_path``; return its length in seconds or ``None``.

    ``None`` means no clip was produced: the text was empty or speech
    generation failed.
    """
    if not text:
        return None
    if not await generate_speech(text, voice, clip_path):
        return None
    return get_audio_duration(clip_path)


async def create_audio_file(project_dir, captions, speeches):
    """Build the narration track for all panels and compute the panel timeline.

    ``captions`` and ``speeches`` are zipped, so only as many panels as the
    shorter list are processed. For each panel the caption clip
    (``caption_<i>.mp3``) and then the speech clip (``speech_<i>.mp3``) are
    synthesized and concatenated into ``panel_<i>_combined.mp3``. A panel that
    ends up with no audio gets a 5-second duration *and* 5 seconds of silence
    in the track, so every later panel's audio stays aligned with the start
    times reported to the subtitle and video steps.

    Side effects: writes the clips, ``combined_audio.mp3``,
    ``audio_durations.json`` and ``panel_times.json`` into ``project_dir``.

    Returns:
        ``(combined_audio_path, audio_durations, panel_start_times,
        panel_durations)``, or ``(None, {}, [], [])`` when no speech could be
        generated at all or an error occurred (errors are logged).
    """
    silent_panel_seconds = 5.0
    # (key prefix, voice) for the two clips of a panel, in playback order.
    clip_voices = (
        ("caption", "zh-CN-YunjianNeural"),
        ("speech", "zh-CN-XiaoxiaoNeural"),
    )

    try:
        audio_durations = {}
        panel_start_times = []
        panel_durations = []
        current_time = 0
        any_speech_generated = False
        final_combined = AudioSegment.empty()

        for i, panel_texts in enumerate(zip(captions, speeches)):
            panel_audio = AudioSegment.empty()
            panel_duration = 0

            for (kind, voice), text in zip(clip_voices, panel_texts):
                clip_path = os.path.join(project_dir, f"{kind}_{i}.mp3")
                duration = await _synthesize_clip(text, voice, clip_path)
                if duration is None:
                    continue
                audio_durations[f"{kind}_{i}"] = duration
                panel_audio += AudioSegment.from_file(clip_path)
                panel_duration += duration
                any_speech_generated = True

            if panel_duration == 0:
                # Keep the audio timeline in step with the panel timeline:
                # without this padding every later clip would play early.
                panel_duration = silent_panel_seconds
                panel_audio += AudioSegment.silent(
                    duration=int(silent_panel_seconds * 1000)
                )

            panel_combined = os.path.join(project_dir, f"panel_{i}_combined.mp3")
            panel_audio.export(panel_combined, format="mp3")
            # Accumulate the decoded audio directly instead of re-reading the
            # exported MP3, which avoids a second lossy encode/decode cycle.
            final_combined += panel_audio

            # One start time per processed panel, independent of list lengths.
            panel_start_times.append(current_time)
            panel_durations.append(panel_duration)
            current_time += panel_duration

        if not any_speech_generated:
            logger.error("No audio parts generated")
            return None, {}, [], []

        combined_audio = os.path.join(project_dir, "combined_audio.mp3")
        final_combined.export(combined_audio, format="mp3")

        durations_file = os.path.join(project_dir, "audio_durations.json")
        with open(durations_file, "w") as f:
            json.dump(audio_durations, f)

        panel_times_file = os.path.join(project_dir, "panel_times.json")
        with open(panel_times_file, "w") as f:
            json.dump({"start_times": panel_start_times, "durations": panel_durations}, f)

        return combined_audio, audio_durations, panel_start_times, panel_durations
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"Error creating audio file: {e}")
        return None, {}, [], []
|
|
|
def get_video_dimensions(video_path):
    """Return ``(width, height)`` of the first video stream of ``video_path``.

    The values are read from ``ffprobe``'s JSON output. If probing or parsing
    fails for any reason, a warning is logged and ``(1920, 1080)`` is returned.
    """
    probe_cmd = [
        "ffprobe", "-v", "error", "-select_streams", "v:0",
        "-show_entries", "stream=width,height", "-of", "json", video_path,
    ]
    try:
        probe = subprocess.run(probe_cmd, capture_output=True, text=True)
        first_stream = json.loads(probe.stdout)['streams'][0]
        width = int(first_stream['width'])
        height = int(first_stream['height'])
        return (width, height)
    except Exception as e:
        logger.warning(f"Video dimension detection failed: {e}")
        return (1920, 1080)
|
|
|
def process_sub_path(path):
    """Escape a subtitle file path for use as an FFmpeg filter option value.

    FFmpeg parses a filtergraph string in two passes, so the path is escaped
    once per pass:

    1. Option level: ``\\``, ``'`` and ``:`` are backslash-escaped so the
       path is not split at ``:`` or cut short at a quote.
    2. Filtergraph level: the result is wrapped in single quotes, inside
       which every character (including ``,``, ``[``, ``]``, spaces and
       backslashes) is literal. A quote cannot appear inside a quoted string,
       so each one is written as ``'\\''`` (close, escaped quote, reopen).

    Args:
        path: File system path of the subtitle file.

    Returns:
        The single-quoted, escaped path, ready to be placed after
        ``subtitles=`` in a ``-vf`` argument.
    """
    posix_path = Path(path).as_posix()

    # Pass 1: characters that are special inside a filter option value.
    option_escaped = posix_path.translate(str.maketrans({
        '\\': r'\\',
        "'": r"\'",
        ':': r'\:',
    }))

    # Pass 2: quote for the filtergraph parser.
    graph_escaped = option_escaped.replace("'", "'\\''")
    return f"'{graph_escaped}'"
|
|
|
|
|
def create_video(project_dir, image_paths, caption_subtitle_file, speech_subtitle_file,
                 audio_file, output_video, audio_durations, panel_start_times, panel_durations):
    """Render the final video: image slideshow + audio, then burned-in subtitles.

    Two FFmpeg passes are run:

    1. A concat list (``frames.txt``) pairs each image with its panel duration
       and is muxed with ``audio_file`` into ``temp_video.mp4``.
    2. Both subtitle files are burned into that video with the ``subtitles``
       filter, producing ``output_video``. The temporary video is then removed.

    Args:
        project_dir: Working directory for intermediate files.
        image_paths: Panel image paths, in display order (must be non-empty).
        caption_subtitle_file: Path of the caption ``.ass`` file.
        speech_subtitle_file: Path of the speech ``.ass`` file.
        audio_file: Path of the combined audio track.
        output_video: Destination path of the finished video.
        audio_durations: Not used by this function; kept for the call signature.
        panel_start_times: Not used by this function; kept for the call signature.
        panel_durations: Display time in seconds for each image.

    Returns:
        ``output_video`` on success, ``None`` on failure (errors are logged).
    """
    def concat_entry(image_path):
        # Concat-list paths are single-quoted; an embedded quote has to be
        # written as '\'' or it would terminate the quoted path.
        quoted = image_path.replace(os.sep, '/').replace("'", "'\\''")
        return f"file '{quoted}'\n"

    # Capture FFmpeg's output so that a failure can actually be logged;
    # undecodable bytes are replaced instead of raising.
    run_options = dict(check=True, capture_output=True, text=True, errors="replace")

    try:
        frames_list = os.path.join(project_dir, "frames.txt")
        with open(frames_list, "w", encoding="utf-8") as f:
            for img, duration in zip(image_paths, panel_durations):
                f.write(concat_entry(img))
                f.write(f"duration {duration}\n")
            # The last image is listed once more without a duration, as in the
            # original list layout.
            f.write(concat_entry(image_paths[-1]))

        temp_video = os.path.join(project_dir, "temp_video.mp4")
        cmd1 = [
            "ffmpeg", "-y",
            "-f", "concat", "-safe", "0", "-i", frames_list,
            "-i", audio_file,
            "-c:v", "libx264", "-pix_fmt", "yuv420p",
            "-c:a", "aac", "-strict", "experimental",
            "-vsync", "vfr",
            "-async", "1",
            temp_video,
        ]
        subprocess.run(cmd1, **run_options)

        video_width, video_height = get_video_dimensions(temp_video)
        base_fontsize = max(24, video_width // 50)

        # Build the style strings on their own so the subtitle paths are never
        # passed through str.format(); a '{' or '}' in a path would break it.
        caption_style = (
            f"Fontsize={int(base_fontsize * 0.6)},Alignment=2,MarginV={video_height // 100}"
        )
        speech_style = (
            f"Fontsize={int(base_fontsize * 0.5)},Alignment=8,MarginV={video_height // 10}"
        )
        filter_chain = ",".join((
            f"subtitles={process_sub_path(caption_subtitle_file)}:force_style='{caption_style}'",
            f"subtitles={process_sub_path(speech_subtitle_file)}:force_style='{speech_style}'",
        ))

        cmd_combined = [
            "ffmpeg", "-y",
            "-i", temp_video,
            "-vf", filter_chain,
            "-c:a", "copy",
            "-c:v", "libx264",
            "-preset", "fast",
            "-movflags", "+faststart",
            output_video,
        ]

        start_time = time.time()
        subprocess.run(cmd_combined, **run_options)
        logger.info(f"Video processed in {time.time()-start_time:.2f}s")

        os.remove(temp_video)
        return output_video
    except subprocess.CalledProcessError as e:
        logger.error(f"FFmpeg failed with cmd: {' '.join(e.cmd)}")
        logger.error(f"FFmpeg stderr: {e.stderr}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return None
|
|
|
|
|
def upload_to_local_storage(local_path, relative_path):
    """Copy ``local_path`` into the ``storage`` directory and return its URL.

    The file is copied (with metadata, via ``shutil.copy2``) to
    ``storage/<relative_path>`` under the current working directory; missing
    directories are created. The returned URL is
    ``BASE_URL + "/storage/" + relative_path`` with OS path separators turned
    into ``/``.

    Returns ``None`` (after logging the error and traceback) on failure.
    """
    try:
        storage_root = os.path.abspath("storage")
        os.makedirs(storage_root, exist_ok=True)

        destination = os.path.join(storage_root, relative_path)
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        shutil.copy2(local_path, destination)

        url_path = relative_path.replace(os.sep, '/')
        return f"{BASE_URL}/storage/{url_path}"
    except Exception as e:
        logger.error(f"Error copying to local storage: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return None
|
|
|
@app.post("/api/generate-video")
async def generate_video(comic_data: ComicData, background_tasks: BackgroundTasks):
    """Turn the submitted comic into a narrated, subtitled video.

    Steps: download the panel images, synthesize the audio track, write the
    caption and speech subtitle files, render the video, and copy it into
    local storage. All intermediate files live in ``temp/<project_id>``, which
    is removed when the request finishes, whether it succeeded or not.

    Args:
        comic_data: Captions, speeches and panel image URLs.
        background_tasks: Accepted for the route signature; not used here.

    Returns:
        ``{"videoUrl": <url>, "projectId": <uuid>}``.

    Raises:
        HTTPException: 500 with a step-specific ``detail`` when a step fails,
            or with the exception text for unexpected errors.
    """
    project_id = str(uuid.uuid4())
    project_dir = os.path.abspath(os.path.join("temp", project_id))
    os.makedirs(project_dir, exist_ok=True)
    logger.info(f"Created project directory: {project_dir}")

    try:
        image_paths = []
        for i, panel_url in enumerate(comic_data.panels):
            output_path = os.path.join(project_dir, f"panel_{i}.jpg")
            result = await download_image(panel_url, output_path)
            if result:
                image_paths.append(result)
            else:
                # A skipped panel shifts every later image against its
                # caption/speech timing; make that visible in the logs.
                logger.warning(f"Skipping panel {i}: image download failed")

        if not image_paths:
            raise HTTPException(status_code=500, detail="Failed to download images")
        logger.info(f"Downloaded {len(image_paths)} images")

        audio_file, audio_durations, panel_start_times, panel_durations = await create_audio_file(
            project_dir, comic_data.captions, comic_data.speeches
        )
        if not audio_file:
            raise HTTPException(status_code=500, detail="Failed to create audio file")
        logger.info(f"Created audio file: {audio_file}")

        caption_subtitle_file = create_caption_subtitle_file(
            project_dir, comic_data.captions, panel_start_times, panel_durations
        )
        if not caption_subtitle_file:
            raise HTTPException(status_code=500, detail="Failed to create caption subtitle file")

        speech_subtitle_file = create_speech_subtitle_file(
            project_dir, comic_data.speeches, panel_start_times, panel_durations
        )
        if not speech_subtitle_file:
            raise HTTPException(status_code=500, detail="Failed to create speech subtitle file")
        logger.info(f"Created subtitle files: {caption_subtitle_file}, {speech_subtitle_file}")

        output_video = os.path.join(project_dir, "output.mp4")
        # Rendering runs FFmpeg synchronously; do it in the default executor
        # so the event loop keeps serving other requests in the meantime.
        result = await asyncio.get_running_loop().run_in_executor(
            None,
            create_video,
            project_dir, image_paths, caption_subtitle_file, speech_subtitle_file,
            audio_file, output_video, audio_durations, panel_start_times, panel_durations,
        )
        if not result:
            raise HTTPException(status_code=500, detail="Failed to create video")
        logger.info(f"Created video: {output_video}")

        video_url = upload_to_local_storage(output_video, f"{project_id}/video.mp4")
        if not video_url:
            # Without this check a failed copy returned 200 with a null URL.
            raise HTTPException(status_code=500, detail="Failed to store video")

        return {
            "videoUrl": video_url,
            "projectId": project_id,
        }
    except HTTPException as e:
        # Already a well-formed API error: log it and pass it through
        # unchanged instead of re-wrapping its string form.
        logger.error(f"Error generating video: {e.detail}")
        raise
    except Exception as e:
        logger.exception(f"Error generating video: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Single cleanup point for both the success and the failure path.
        shutil.rmtree(project_dir, ignore_errors=True)
|
|
|
|
|
@app.get("/")
async def health_check():
    """Liveness endpoint: always answers with the fixed payload ``{"status": "ok"}``."""
    payload = {"status": "ok"}
    return payload
|
|
|
# Script entry point: serve the app with uvicorn on all interfaces, port 8000.
# uvicorn is imported here so it is only needed when the file is run directly.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")