vox-beta / helper.py
EllenBeta's picture
Update helper.py
e2d7c7f verified
import os
import random
import string
import psycopg2
from huggingface_hub import upload_file
from _db_ import create_connection
import tempfile
import base64
import logging
from pydub import AudioSegment
from moviepy.editor import VideoFileClip
import wave
# -------------------------------
# Logging setup
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Hugging Face credentials
HF_TOKEN = os.getenv("HF_TOKEN")
DATASET_REPO = "EllenBeta/Voxai-data"
# -------------------------------
# Upload to Hugging Face Dataset
# -------------------------------
def save_to_dataset_repo(file_path, hf_path, file_name):
"""Upload audio file to Hugging Face dataset."""
try:
if not os.path.exists(file_path):
raise Exception(f"File not found: {file_path}")
upload_file(
path_or_fileobj=file_path,
path_in_repo=hf_path,
repo_id=DATASET_REPO,
repo_type="dataset",
token=HF_TOKEN
)
uploaded_url = f"https://voxai-api3.onrender.com/my/audios/{file_name}"
logger.info(f"✅ Upload successful: {uploaded_url}")
return uploaded_url
except Exception as e:
logger.error(f"❌ Upload failed: {str(e)}")
raise Exception(f"Upload failed: {str(e)}")
# -------------------------------
# Save metadata to PostgreSQL
# -------------------------------
def save_audio(user_id, audio_url, title, text, duration):
"""Save audio metadata to PostgreSQL."""
conn = create_connection()
try:
if conn:
cursor = conn.cursor()
cursor.execute(
"""
INSERT INTO voxai_generated_voices (user_id, audio, title, text, duration)
VALUES (%s, %s, %s, %s, %s)
""",
(user_id, audio_url, title, text, duration)
)
conn.commit()
logger.info("✅ Audio metadata saved to database")
except Exception as e:
logger.error(f"❌ Database error: {e}")
finally:
if conn:
conn.close()
# -------------------------------
# Utility: Generate random filename
# -------------------------------
def generate_random_filename(extension="mp3", length=12):
letters = string.ascii_letters + string.digits
random_str = ''.join(random.choice(letters) for _ in range(length))
return f"{random_str}.{extension}"
# -------------------------------
# Cut video safely
# -------------------------------
def cut_video(video_path, max_duration=2):
"""Cut video to a maximum duration using MoviePy."""
try:
clip = VideoFileClip(video_path)
if clip.duration > max_duration:
clip = clip.subclip(0, max_duration)
return clip
except Exception as e:
raise Exception(f"Failed to open or trim video: {e}")
# -------------------------------
# Convert video to audio safely
# -------------------------------
def video_to_audio(video_input, output_path=None, max_duration=2):
"""
Convert video (file path, data URL, or base64) to WAV audio using MoviePy.
Handles all input formats safely and avoids oversized subprocess arguments.
"""
temp_video_path = None
clip = None
try:
# ✅ Auto-generate output path if None
if output_path is None:
output_path = tempfile.NamedTemporaryFile(delete=False, suffix=".wav").name
logger.debug(f"Auto-created output path: {output_path}")
# Detect and decode base64 or data URLs
if isinstance(video_input, str):
if video_input.startswith("data:video/"): # full data URL
logger.info("📹 Detected data URL video input")
header, encoded = video_input.split(",", 1)
decoded_bytes = base64.b64decode(encoded)
elif len(video_input) > 500 and all(c.isalnum() or c in "+/=\n" for c in video_input.strip()):
logger.info("📹 Detected raw base64 video input")
decoded_bytes = base64.b64decode(video_input)
elif os.path.exists(video_input): # already a valid path
logger.info(f"📁 Detected video file path: {video_input}")
video_path = video_input
clip = cut_video(video_path, max_duration)
else:
raise Exception("Invalid video input format (not file path or base64)")
else:
raise Exception("Invalid video input type")
# If we got decoded bytes, write to a temp file
if clip is None:
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
tmp.write(decoded_bytes)
temp_video_path = tmp.name
video_path = temp_video_path
clip = cut_video(video_path, max_duration)
# Extract audio safely
clip.audio.write_audiofile(
output_path,
fps=22050,
nbytes=2,
codec="pcm_s16le",
verbose=False,
logger=None
)
clip.close()
if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
raise Exception("Output audio file not created or empty")
logger.info(f"✅ Video converted successfully → {output_path}")
return output_path
except Exception as e:
logger.error(f"❌ Video→Audio conversion failed: {e}")
raise
finally:
if clip:
try:
clip.close()
except:
pass
if temp_video_path and os.path.exists(temp_video_path):
try:
os.remove(temp_video_path)
logger.debug(f"🧹 Deleted temp file {temp_video_path}")
except Exception as e:
logger.warning(f"⚠️ Failed to delete temp file: {e}")
# -------------------------------
# Ensure WAV format
# -------------------------------
def ensure_wav_format(input_path):
"""Convert audio to WAV if not already valid WAV."""
try:
with wave.open(input_path, 'rb'):
return input_path # valid WAV
except:
try:
audio = AudioSegment.from_file(input_path)
wav_path = os.path.splitext(input_path)[0] + "_converted.wav"
audio.export(wav_path, format="wav")
logger.info(f"🎵 Converted {input_path} → WAV ({wav_path})")
return wav_path
except Exception as e:
raise Exception(f"Failed to convert to WAV: {e}")
# -------------------------------
# Validate audio file
# -------------------------------
def validate_audio_file(file_path, max_size_mb=10):
"""Validate audio file: size, format, and duration."""
try:
if not os.path.exists(file_path):
return False, "File not found"
size_mb = os.path.getsize(file_path) / (1024 * 1024)
if size_mb > max_size_mb:
return False, f"File too large ({size_mb:.2f}MB > {max_size_mb}MB)"
if size_mb == 0:
return False, "Empty file"
try:
audio = AudioSegment.from_file(file_path)
if len(audio) < 200: # less than 0.2 sec
return False, "Audio too short"
except Exception as e:
return False, f"Invalid audio format: {e}"
return True, "Valid"
except Exception as e:
return False, f"Validation error: {e}"