import os
import random
import string
import psycopg2
from huggingface_hub import upload_file
from _db_ import create_connection
import tempfile
import base64
import logging
from pydub import AudioSegment
from moviepy.editor import VideoFileClip
import wave
# -------------------------------
# Logging setup
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# NOTE: basicConfig runs at import time and configures the root logger
# (INFO level, timestamped format) if it has no handlers yet.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Hugging Face credentials
# Read from the HF_TOKEN environment variable; None when the variable is unset.
HF_TOKEN = os.getenv("HF_TOKEN")
# Repository id passed to upload_file() with repo_type="dataset".
DATASET_REPO = "EllenBeta/Voxai-data"
# -------------------------------
# Upload to Hugging Face Dataset
# -------------------------------
def save_to_dataset_repo(file_path, hf_path, file_name):
    """Upload a local file to the Hugging Face dataset repository.

    Args:
        file_path: Path of the local file to upload.
        hf_path: Destination path of the file inside the dataset repository.
        file_name: Name used to build the URL that is returned.

    Returns:
        The string ``https://voxai-api3.onrender.com/my/audios/{file_name}``.
        Note that this is the application's own URL, not a Hugging Face URL.

    Raises:
        Exception: If ``file_path`` does not exist or the upload fails. The
            message is prefixed with ``"Upload failed: "`` and the original
            error is attached as ``__cause__``.
    """
    try:
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        upload_file(
            path_or_fileobj=file_path,
            path_in_repo=hf_path,
            repo_id=DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
        )

        uploaded_url = f"https://voxai-api3.onrender.com/my/audios/{file_name}"
        logger.info(f"✅ Upload successful: {uploaded_url}")
        return uploaded_url
    except Exception as e:
        logger.error(f"❌ Upload failed: {str(e)}")
        # Keep the generic Exception type callers already catch, but chain the
        # real cause so the traceback shows what actually went wrong.
        raise Exception(f"Upload failed: {str(e)}") from e
# -------------------------------
# Save metadata to PostgreSQL
# -------------------------------
def save_audio(user_id, audio_url, title, text, duration):
    """Insert one row of audio metadata into ``voxai_generated_voices``.

    This is a best-effort operation: database errors are logged and swallowed,
    never raised to the caller.

    Args:
        user_id: Value stored in the ``user_id`` column.
        audio_url: Value stored in the ``audio`` column.
        title: Value stored in the ``title`` column.
        text: Value stored in the ``text`` column.
        duration: Value stored in the ``duration`` column.

    Returns:
        None.
    """
    conn = create_connection()
    if not conn:
        # Previously this case was a silent no-op; make it visible in the logs.
        logger.warning("⚠️ No database connection available; audio metadata not saved")
        return

    try:
        cursor = conn.cursor()
        try:
            # Parameterized query: values are passed separately from the SQL.
            cursor.execute(
                """
                INSERT INTO voxai_generated_voices (user_id, audio, title, text, duration)
                VALUES (%s, %s, %s, %s, %s)
                """,
                (user_id, audio_url, title, text, duration),
            )
            conn.commit()
        finally:
            # Release the cursor whether or not the insert succeeded.
            cursor.close()
        logger.info("✅ Audio metadata saved to database")
    except Exception as e:
        logger.error(f"❌ Database error: {e}")
        try:
            # Discard the failed transaction explicitly before closing.
            conn.rollback()
        except Exception as rollback_error:
            logger.warning(f"⚠️ Rollback failed: {rollback_error}")
    finally:
        conn.close()
# -------------------------------
# Utility: Generate random filename
# -------------------------------
def generate_random_filename(extension="mp3", length=12):
    """Return a random alphanumeric file name with the given extension.

    Args:
        extension: File extension, with or without a leading dot
            (``"mp3"`` and ``".mp3"`` both yield ``"<name>.mp3"``).
        length: Number of random characters in the name part.

    Returns:
        A string of the form ``"<random>.<extension>"``.

    Note:
        Names are drawn from the ``random`` module, so they are not
        unpredictable. Do not rely on them as a secret.
    """
    alphabet = string.ascii_letters + string.digits
    stem = "".join(random.choice(alphabet) for _ in range(length))
    # Strip leading dots so ".mp3" does not produce a double dot ("name..mp3").
    suffix = extension.lstrip(".")
    return f"{stem}.{suffix}"
# -------------------------------
# Cut video safely
# -------------------------------
def cut_video(video_path, max_duration=2):
    """Open a video with MoviePy and trim it to at most ``max_duration``.

    Args:
        video_path: Path of the video file to open.
        max_duration: Maximum clip length; clips longer than this are cut to
            the range ``[0, max_duration]``.

    Returns:
        The opened clip, trimmed if it exceeded ``max_duration``. The caller
        is responsible for closing it.

    Raises:
        Exception: If the video cannot be opened or trimmed. The original
            error is attached as ``__cause__``.
    """
    clip = None
    try:
        clip = VideoFileClip(video_path)
        if clip.duration > max_duration:
            clip = clip.subclip(0, max_duration)
        return clip
    except Exception as e:
        # The clip may already be open when the duration check or the trim
        # fails; close it so the underlying reader is not leaked.
        if clip is not None:
            try:
                clip.close()
            except Exception as close_error:
                logger.warning(f"⚠️ Failed to close clip: {close_error}")
        raise Exception(f"Failed to open or trim video: {e}") from e
# -------------------------------
# Convert video to audio safely
# -------------------------------
def video_to_audio(video_input, output_path=None, max_duration=2):
    """Convert a video to a WAV audio file using MoviePy.

    Args:
        video_input: A string that is one of:
            * a ``data:video/...`` data URL,
            * a raw base64 payload (longer than 500 characters and made only
              of alphanumerics, ``+``, ``/``, ``=`` and newlines),
            * a path to an existing video file.
        output_path: Where to write the WAV file. When ``None`` a temporary
            ``.wav`` file is created and its path is returned.
        max_duration: Maximum length passed to ``cut_video``.

    Returns:
        The path of the written WAV file.

    Raises:
        Exception: If the input is not a string, matches none of the accepted
            formats, has no audio track, or the output file is missing or
            empty. Errors from decoding or MoviePy are logged and re-raised
            unchanged.
    """
    temp_video_path = None
    clip = None
    created_output = False
    succeeded = False
    try:
        # Auto-generate the output path if none was given. The handle is
        # closed right away; only the reserved path is needed.
        if output_path is None:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as out_tmp:
                output_path = out_tmp.name
            created_output = True
            logger.debug(f"Auto-created output path: {output_path}")

        if not isinstance(video_input, str):
            raise Exception("Invalid video input type")

        # Detect and decode base64 or data URLs; otherwise treat as a path.
        decoded_bytes = None
        if video_input.startswith("data:video/"):  # full data URL
            logger.info("📹 Detected data URL video input")
            _, encoded = video_input.split(",", 1)
            decoded_bytes = base64.b64decode(encoded)
        elif len(video_input) > 500 and all(
            c.isalnum() or c in "+/=\n" for c in video_input.strip()
        ):
            logger.info("📹 Detected raw base64 video input")
            decoded_bytes = base64.b64decode(video_input)
        elif os.path.exists(video_input):  # already a valid path
            logger.info(f"📁 Detected video file path: {video_input}")
            clip = cut_video(video_input, max_duration)
        else:
            raise Exception("Invalid video input format (not file path or base64)")

        # Decoded payloads go through a temp file so MoviePy can open them.
        if decoded_bytes is not None:
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
            # Record the path before writing so cleanup still happens if the
            # write itself fails.
            temp_video_path = tmp.name
            with tmp:
                tmp.write(decoded_bytes)
            clip = cut_video(temp_video_path, max_duration)

        # A silent video has no audio attribute to export; fail with a clear
        # message instead of an AttributeError on None.
        if clip.audio is None:
            raise Exception("Video has no audio track")

        clip.audio.write_audiofile(
            output_path,
            fps=22050,
            nbytes=2,
            codec="pcm_s16le",
            verbose=False,
            logger=None,
        )
        clip.close()
        clip = None  # already closed; skip the second close in ``finally``

        if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
            raise Exception("Output audio file not created or empty")

        logger.info(f"✅ Video converted successfully → {output_path}")
        succeeded = True
        return output_path
    except Exception as e:
        logger.error(f"❌ Video→Audio conversion failed: {e}")
        raise
    finally:
        if clip is not None:
            try:
                clip.close()
            except Exception as e:
                logger.warning(f"⚠️ Failed to close clip: {e}")
        if temp_video_path and os.path.exists(temp_video_path):
            try:
                os.remove(temp_video_path)
                logger.debug(f"🧹 Deleted temp file {temp_video_path}")
            except Exception as e:
                logger.warning(f"⚠️ Failed to delete temp file: {e}")
        # Remove the output file only when this function created it and the
        # conversion failed; a caller-supplied path is never deleted.
        if created_output and not succeeded and os.path.exists(output_path):
            try:
                os.remove(output_path)
            except Exception as e:
                logger.warning(f"⚠️ Failed to delete temp file: {e}")
# -------------------------------
# Ensure WAV format
# -------------------------------
def ensure_wav_format(input_path):
    """Return a path to a WAV version of ``input_path``.

    If the ``wave`` module can open the file, the original path is returned
    unchanged. Otherwise the file is loaded with pydub and exported as
    ``<original name without extension>_converted.wav`` next to the input.

    Args:
        input_path: Path of the audio file to check.

    Returns:
        ``input_path`` if it is already a readable WAV file, otherwise the
        path of the newly written WAV file.

    Raises:
        Exception: If the file cannot be loaded or exported. The original
            error is attached as ``__cause__``.
    """
    try:
        with wave.open(input_path, "rb"):
            return input_path  # valid WAV
    except Exception:
        # Not readable as WAV; fall through to conversion. Catching Exception
        # rather than everything lets KeyboardInterrupt/SystemExit propagate.
        pass

    try:
        audio = AudioSegment.from_file(input_path)
        wav_path = os.path.splitext(input_path)[0] + "_converted.wav"
        audio.export(wav_path, format="wav")
        logger.info(f"🎵 Converted {input_path} → WAV ({wav_path})")
        return wav_path
    except Exception as e:
        raise Exception(f"Failed to convert to WAV: {e}") from e
# -------------------------------
# Validate audio file
# -------------------------------
def validate_audio_file(file_path, max_size_mb=10, min_duration_ms=200):
    """Validate an audio file by size, decodability, and minimum duration.

    Args:
        file_path: Path of the audio file to check.
        max_size_mb: Largest accepted file size, in megabytes.
        min_duration_ms: Shortest accepted audio length, in milliseconds.
            Defaults to 200 (0.2 seconds).

    Returns:
        A ``(is_valid, message)`` tuple. ``message`` is ``"Valid"`` on
        success and a description of the problem otherwise. This function
        reports failures through the return value rather than raising.
    """
    try:
        if not os.path.exists(file_path):
            return False, "File not found"

        size_bytes = os.path.getsize(file_path)
        size_mb = size_bytes / (1024 * 1024)
        if size_mb > max_size_mb:
            return False, f"File too large ({size_mb:.2f}MB > {max_size_mb}MB)"
        # Compare the integer byte count instead of a float megabyte value.
        if size_bytes == 0:
            return False, "Empty file"

        try:
            audio = AudioSegment.from_file(file_path)
        except Exception as e:
            return False, f"Invalid audio format: {e}"

        # The length of the loaded segment is compared in milliseconds.
        if len(audio) < min_duration_ms:
            return False, "Audio too short"

        return True, "Valid"
    except Exception as e:
        return False, f"Validation error: {e}"