|
|
import ffmpeg |
|
|
import os |
|
|
import time |
|
|
from PIL import Image |
|
|
import re |
|
|
import tempfile |
|
|
import shutil |
|
|
import threading |
|
|
import requests |
|
|
import json |
|
|
from fastapi import FastAPI, File, Form, UploadFile, HTTPException, Request |
|
|
from fastapi.responses import FileResponse, JSONResponse |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
import uvicorn |
|
|
import asyncio |
|
|
from threading import Thread |
|
|
import nest_asyncio |
|
|
|
|
|
|
|
|
nest_asyncio.apply() |
|
|
|
|
|
app = FastAPI(title="Video Conversion API") |
|
|
|
|
|
|
|
|
app.add_middleware( |
|
|
CORSMiddleware, |
|
|
allow_origins=["*"], |
|
|
allow_credentials=True, |
|
|
allow_methods=["*"], |
|
|
allow_headers=["*"], |
|
|
) |
|
|
|
|
|
|
|
|
supported_formats = ['ASF', 'AVI', 'FLV', 'M2TS', 'M4V', 'MKV', 'MOV', 'MP4', 'MPEG', 'MPG', 'MTS', 'TS', 'VOB', 'WEBM', 'WMV'] |
|
|
audio_formats = ['AAC', 'AIFF', 'ALAC', 'CAF', 'FLAC', 'M4A', 'MP3', 'OGG', 'OPUS', 'SPX', 'TTA', 'WAV', 'WMA', 'WV'] |
|
|
gif_formats = ['GIF'] |
|
|
image_formats = ['BMP', 'DIB', 'EPS', 'GIF', 'ICNS', 'ICO', 'IM', 'JPEG', 'JPEG2000', 'MPO', 'MSP', 'PALM', 'PCX', 'PDF', 'PNG', 'PPM', 'SGI', 'SPIDER', 'TGA', 'TIFF', 'WEBP', 'WMX', 'XBM'] |
|
|
|
|
|
|
|
|
CACHE_DIR = tempfile.mkdtemp() |
|
|
|
|
|
|
|
|
AUDIO_CODECS = { |
|
|
'MP3': {'acodec': 'libmp3lame', 'audio_bitrate': '192k'}, |
|
|
'AAC': {'acodec': 'aac', 'audio_bitrate': '192k'}, |
|
|
'WAV': {'acodec': 'pcm_s16le'}, |
|
|
'FLAC': {'acodec': 'flac'}, |
|
|
'OGG': {'acodec': 'libvorbis', 'audio_bitrate': '192k'}, |
|
|
'M4A': {'acodec': 'aac', 'audio_bitrate': '192k'}, |
|
|
'ALAC': {'acodec': 'alac'}, |
|
|
'WMA': {'acodec': 'wmav2', 'audio_bitrate': '192k'}, |
|
|
'AIFF': {'acodec': 'pcm_s16be'}, |
|
|
'OPUS': {'acodec': 'libopus', 'audio_bitrate': '128k'}, |
|
|
'CAF': {'acodec': 'alac'}, |
|
|
'SPX': {'acodec': 'libspeex', 'audio_bitrate': '32k'}, |
|
|
'WV': {'acodec': 'wavpack'}, |
|
|
} |
|
|
|
|
|
|
|
|
VIDEO_CODECS = { |
|
|
'MP4': {'vcodec': 'libx264', 'acodec': 'aac', 'preset': 'medium', 'crf': '23'}, |
|
|
'AVI': {'vcodec': 'libxvid', 'acodec': 'libmp3lame'}, |
|
|
'MOV': {'vcodec': 'libx264', 'acodec': 'aac'}, |
|
|
'MKV': {'vcodec': 'libx264', 'acodec': 'aac'}, |
|
|
'WEBM': {'vcodec': 'libvpx-vp9', 'acodec': 'libopus'}, |
|
|
'FLV': {'vcodec': 'libx264', 'acodec': 'aac'}, |
|
|
'WMV': {'vcodec': 'wmv2', 'acodec': 'wmav2'}, |
|
|
'M4V': {'vcodec': 'libx264', 'acodec': 'aac'}, |
|
|
'MPG': {'vcodec': 'mpeg2video', 'acodec': 'mp2'}, |
|
|
'MPEG': {'vcodec': 'mpeg2video', 'acodec': 'mp2'}, |
|
|
'VOB': {'vcodec': 'mpeg2video', 'acodec': 'mp2'}, |
|
|
'ASF': {'vcodec': 'wmv2', 'acodec': 'wmav2'}, |
|
|
'TS': {'vcodec': 'libx264', 'acodec': 'aac'}, |
|
|
'M2TS': {'vcodec': 'libx264', 'acodec': 'aac'}, |
|
|
'MTS': {'vcodec': 'libx264', 'acodec': 'aac'}, |
|
|
} |
|
|
|
|
|
def delete_temp_dir(directory, delay=900): |
|
|
"""Delete temporary directory after delay""" |
|
|
def cleanup(): |
|
|
try: |
|
|
if os.path.exists(directory): |
|
|
shutil.rmtree(directory) |
|
|
print(f"Cleaned up temporary directory: {directory}") |
|
|
except Exception as e: |
|
|
print(f"Error cleaning up directory {directory}: {e}") |
|
|
|
|
|
timer = threading.Timer(delay, cleanup) |
|
|
timer.daemon = True |
|
|
timer.start() |
|
|
|
|
|
|
|
|
delete_temp_dir(CACHE_DIR, delay=900) |
|
|
|
|
|
def sanitize_filename(filename): |
|
|
"""Sanitize filename by removing special characters and spaces.""" |
|
|
return re.sub(r'[^a-zA-Z0-9_.-]', '_', filename) |
|
|
|
|
|
def get_video_duration(video_path): |
|
|
"""Get video duration in seconds using ffmpeg.""" |
|
|
try: |
|
|
probe = ffmpeg.probe(video_path, v='error', select_streams='v:0', show_entries='stream=duration') |
|
|
return float(probe['streams'][0]['duration']) |
|
|
except: |
|
|
return 60 |
|
|
|
|
|
def convert_video(video_path, target_format, conversion_type, time_in_seconds=None): |
|
|
try: |
|
|
base_name = os.path.splitext(os.path.basename(video_path))[0] |
|
|
sanitized_base_name = sanitize_filename(base_name) |
|
|
|
|
|
output_dir = os.path.join(CACHE_DIR, "outputs") |
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
|
|
if conversion_type == 'Video to Video': |
|
|
output_file = os.path.join(output_dir, f"converted_{sanitized_base_name}.{target_format.lower()}") |
|
|
|
|
|
|
|
|
codec_settings = VIDEO_CODECS.get(target_format.upper(), {}) |
|
|
|
|
|
if codec_settings: |
|
|
|
|
|
input_stream = ffmpeg.input(video_path) |
|
|
output_kwargs = {} |
|
|
|
|
|
|
|
|
if 'vcodec' in codec_settings: |
|
|
output_kwargs['vcodec'] = codec_settings['vcodec'] |
|
|
|
|
|
|
|
|
if 'acodec' in codec_settings: |
|
|
output_kwargs['acodec'] = codec_settings['acodec'] |
|
|
|
|
|
|
|
|
for key, value in codec_settings.items(): |
|
|
if key not in ['vcodec', 'acodec']: |
|
|
output_kwargs[key] = value |
|
|
|
|
|
ffmpeg.output(input_stream, output_file, **output_kwargs).overwrite_output().run(quiet=True) |
|
|
else: |
|
|
|
|
|
ffmpeg.input(video_path).output(output_file).overwrite_output().run(quiet=True) |
|
|
|
|
|
return output_file |
|
|
|
|
|
elif conversion_type == 'Video to Audio': |
|
|
output_file = os.path.join(output_dir, f"audio_{sanitized_base_name}.{target_format.lower()}") |
|
|
|
|
|
|
|
|
codec_settings = AUDIO_CODECS.get(target_format.upper(), {}) |
|
|
|
|
|
if codec_settings: |
|
|
|
|
|
input_stream = ffmpeg.input(video_path) |
|
|
output_kwargs = { |
|
|
'vn': None |
|
|
} |
|
|
|
|
|
|
|
|
if 'acodec' in codec_settings: |
|
|
output_kwargs['acodec'] = codec_settings['acodec'] |
|
|
|
|
|
|
|
|
if 'audio_bitrate' in codec_settings: |
|
|
output_kwargs['audio_bitrate'] = codec_settings['audio_bitrate'] |
|
|
|
|
|
|
|
|
for key, value in codec_settings.items(): |
|
|
if key not in ['acodec', 'audio_bitrate']: |
|
|
output_kwargs[key] = value |
|
|
|
|
|
ffmpeg.output(input_stream, output_file, **output_kwargs).overwrite_output().run(quiet=True) |
|
|
else: |
|
|
|
|
|
ffmpeg.input(video_path).output(output_file, vn=None).overwrite_output().run(quiet=True) |
|
|
|
|
|
return output_file |
|
|
|
|
|
elif conversion_type == 'Video to GIF': |
|
|
output_file = os.path.join(output_dir, f"gif_{sanitized_base_name}.gif") |
|
|
|
|
|
( |
|
|
ffmpeg |
|
|
.input(video_path) |
|
|
.output(output_file, |
|
|
vf="fps=15,scale=480:-1:flags=lanczos,palettegen=stats_mode=diff", |
|
|
loop=0) |
|
|
.overwrite_output() |
|
|
.run(quiet=True) |
|
|
) |
|
|
return output_file |
|
|
|
|
|
elif conversion_type == 'Video to Image': |
|
|
if time_in_seconds is None: |
|
|
time_in_seconds = 0 |
|
|
|
|
|
|
|
|
temp_png_file = os.path.join(output_dir, f"temp_image_{sanitized_base_name}_{time_in_seconds}s.png") |
|
|
|
|
|
|
|
|
( |
|
|
ffmpeg |
|
|
.input(video_path, ss=time_in_seconds) |
|
|
.output(temp_png_file, vframes=1, **{'qscale:v': '1'}) |
|
|
.overwrite_output() |
|
|
.run(quiet=True, capture_stdout=True, capture_stderr=True) |
|
|
) |
|
|
|
|
|
|
|
|
if target_format.upper() == 'PNG': |
|
|
return temp_png_file |
|
|
|
|
|
|
|
|
output_file = os.path.join(output_dir, f"image_{sanitized_base_name}_{time_in_seconds}s.{target_format.lower()}") |
|
|
|
|
|
try: |
|
|
|
|
|
with Image.open(temp_png_file) as img: |
|
|
|
|
|
if target_format.upper() == 'PDF': |
|
|
|
|
|
if img.mode != 'RGB': |
|
|
img = img.convert('RGB') |
|
|
img.save(output_file, format='PDF', save_all=True, quality=95) |
|
|
elif target_format.upper() in ['JPEG', 'JPG']: |
|
|
|
|
|
if img.mode != 'RGB': |
|
|
img = img.convert('RGB') |
|
|
img.save(output_file, format='JPEG', quality=95, optimize=True) |
|
|
elif target_format.upper() == 'TIFF': |
|
|
img.save(output_file, format='TIFF', compression='tiff_lzw') |
|
|
elif target_format.upper() == 'WEBP': |
|
|
img.save(output_file, format='WEBP', quality=95, method=6) |
|
|
elif target_format.upper() == 'BMP': |
|
|
|
|
|
if img.mode != 'RGB': |
|
|
img = img.convert('RGB') |
|
|
img.save(output_file, format='BMP') |
|
|
else: |
|
|
|
|
|
try: |
|
|
img.save(output_file, format=target_format.upper()) |
|
|
except: |
|
|
|
|
|
if img.mode != 'RGB': |
|
|
img = img.convert('RGB') |
|
|
img.save(output_file, format=target_format.upper()) |
|
|
|
|
|
|
|
|
if os.path.exists(temp_png_file): |
|
|
os.remove(temp_png_file) |
|
|
return output_file |
|
|
|
|
|
except Exception as pil_error: |
|
|
|
|
|
print(f"PIL conversion error: {pil_error}, returning PNG instead") |
|
|
return temp_png_file |
|
|
|
|
|
except ffmpeg.Error as e: |
|
|
error_message = f"FFmpeg error: {e.stderr.decode() if e.stderr else str(e)}" |
|
|
raise Exception(error_message) |
|
|
except Exception as e: |
|
|
raise Exception(f"Conversion error: {str(e)}") |
|
|
|
|
|
|
|
|
@app.post("/api/convert") |
|
|
async def api_convert( |
|
|
file: UploadFile = File(...), |
|
|
conversion_type: str = Form("Video to Video"), |
|
|
target_format: str = Form("MP4"), |
|
|
time_in_seconds: int = Form(0) |
|
|
): |
|
|
try: |
|
|
print(f"Received conversion request:") |
|
|
print(f"- File: {file.filename} ({file.content_type})") |
|
|
print(f"- Conversion type: {conversion_type}") |
|
|
print(f"- Target format: {target_format}") |
|
|
print(f"- Time: {time_in_seconds}") |
|
|
|
|
|
|
|
|
if not file.filename: |
|
|
raise HTTPException(status_code=400, detail="No file provided") |
|
|
|
|
|
|
|
|
if conversion_type == 'Video to Audio' and target_format.upper() not in [f.upper() for f in audio_formats]: |
|
|
raise HTTPException(status_code=400, detail=f"Unsupported audio format: {target_format}") |
|
|
elif conversion_type == 'Video to Video' and target_format.upper() not in [f.upper() for f in supported_formats]: |
|
|
raise HTTPException(status_code=400, detail=f"Unsupported video format: {target_format}") |
|
|
elif conversion_type == 'Video to Image' and target_format.upper() not in [f.upper() for f in image_formats]: |
|
|
raise HTTPException(status_code=400, detail=f"Unsupported image format: {target_format}") |
|
|
|
|
|
|
|
|
temp_dir = tempfile.mkdtemp() |
|
|
file_path = os.path.join(temp_dir, file.filename) |
|
|
|
|
|
print(f"Saving file to: {file_path}") |
|
|
|
|
|
with open(file_path, "wb") as f: |
|
|
content = await file.read() |
|
|
f.write(content) |
|
|
print(f"File saved, size: {len(content)} bytes") |
|
|
|
|
|
|
|
|
if not os.path.exists(file_path) or os.path.getsize(file_path) == 0: |
|
|
raise HTTPException(status_code=400, detail="Failed to save uploaded file") |
|
|
|
|
|
|
|
|
print(f"Starting conversion...") |
|
|
output_file = convert_video(file_path, target_format, conversion_type, time_in_seconds) |
|
|
|
|
|
|
|
|
if not os.path.exists(output_file): |
|
|
raise HTTPException(status_code=500, detail="Conversion failed - output file not created") |
|
|
|
|
|
print(f"Conversion successful. Output file: {output_file} (size: {os.path.getsize(output_file)} bytes)") |
|
|
|
|
|
|
|
|
content_type = "application/octet-stream" |
|
|
if conversion_type == 'Video to Video': |
|
|
content_type = f"video/{target_format.lower()}" |
|
|
elif conversion_type == 'Video to Audio': |
|
|
if target_format.upper() == 'OGG': |
|
|
content_type = "audio/ogg" |
|
|
elif target_format.upper() == 'M4A': |
|
|
content_type = "audio/mp4" |
|
|
else: |
|
|
content_type = f"audio/{target_format.lower()}" |
|
|
elif conversion_type == 'Video to GIF': |
|
|
content_type = "image/gif" |
|
|
elif conversion_type == 'Video to Image': |
|
|
|
|
|
if target_format.upper() == 'PDF': |
|
|
content_type = "application/pdf" |
|
|
elif target_format.upper() in ['JPEG', 'JPG']: |
|
|
content_type = "image/jpeg" |
|
|
elif target_format.upper() == 'PNG': |
|
|
content_type = "image/png" |
|
|
elif target_format.upper() == 'TIFF': |
|
|
content_type = "image/tiff" |
|
|
elif target_format.upper() == 'WEBP': |
|
|
content_type = "image/webp" |
|
|
else: |
|
|
content_type = f"image/{target_format.lower()}" |
|
|
|
|
|
|
|
|
try: |
|
|
os.unlink(file_path) |
|
|
os.rmdir(temp_dir) |
|
|
except: |
|
|
pass |
|
|
|
|
|
return FileResponse( |
|
|
output_file, |
|
|
media_type=content_type, |
|
|
filename=os.path.basename(output_file), |
|
|
headers={ |
|
|
"Content-Disposition": f"attachment; filename=\"{os.path.basename(output_file)}\"", |
|
|
"Cache-Control": "no-cache" |
|
|
} |
|
|
) |
|
|
|
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
print(f"Conversion error: {str(e)}") |
|
|
raise HTTPException(status_code=500, detail=f"Conversion failed: {str(e)}") |
|
|
|
|
|
@app.get("/api/formats") |
|
|
async def get_formats(): |
|
|
return { |
|
|
"video_formats": supported_formats, |
|
|
"audio_formats": audio_formats, |
|
|
"image_formats": image_formats, |
|
|
"gif_formats": gif_formats, |
|
|
"supported_audio_codecs": list(AUDIO_CODECS.keys()), |
|
|
"supported_video_codecs": list(VIDEO_CODECS.keys()) |
|
|
} |
|
|
|
|
|
|
|
|
@app.get("/_stcore/health") |
|
|
async def stcore_health(): |
|
|
return {"status": "healthy"} |
|
|
|
|
|
@app.get("/_stcore/host-config") |
|
|
async def stcore_host_config(): |
|
|
return { |
|
|
"version": "1.0", |
|
|
"config": { |
|
|
"enableCors": False, |
|
|
"enableXsrfProtection": False |
|
|
} |
|
|
} |
|
|
|
|
|
@app.get("/health") |
|
|
async def health(): |
|
|
return {"status": "healthy", "message": "Video Conversion API is running"} |
|
|
|
|
|
@app.get("/") |
|
|
async def root(): |
|
|
return {"message": "Video Conversion API is running", "docs": "/docs", "status": "healthy"} |
|
|
|
|
|
|
|
|
def main(): |
|
|
print("Starting Video Conversion API...") |
|
|
print(f"Cache directory: {CACHE_DIR}") |
|
|
print(f"Supported video formats: {supported_formats}") |
|
|
print(f"Supported audio formats: {audio_formats}") |
|
|
|
|
|
|
|
|
port = int(os.environ.get("PORT", 7860)) |
|
|
host = "0.0.0.0" |
|
|
|
|
|
print(f"Starting server on {host}:{port}") |
|
|
|
|
|
|
|
|
uvicorn.run( |
|
|
app, |
|
|
host=host, |
|
|
port=port, |
|
|
log_level="info", |
|
|
access_log=True |
|
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |