| | import os |
| | import random |
| | import string |
| | import psycopg2 |
| | from huggingface_hub import upload_file |
| | from _db_ import create_connection |
| | import tempfile |
| | import base64 |
| | import logging |
| | from pydub import AudioSegment |
| | from moviepy.editor import VideoFileClip |
| | import wave |
| |
|
| | |
| | |
# Root logging setup: INFO level with timestamp, level name and message.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger shared by every helper in this file.
logger = logging.getLogger(__name__)

# Target Hugging Face dataset repository and the access token read from the
# environment (None when HF_TOKEN is not set).
DATASET_REPO = "EllenBeta/Voxai-data"
HF_TOKEN = os.getenv("HF_TOKEN")
| |
|
| | |
| | |
| | |
def save_to_dataset_repo(file_path, hf_path, file_name):
    """Upload a local file to the Hugging Face dataset repo and return its URL.

    Args:
        file_path: Path of the local file to upload.
        hf_path: Destination path inside the ``DATASET_REPO`` dataset.
        file_name: Name used to build the returned URL.

    Returns:
        ``https://voxai-api3.onrender.com/my/audios/<file_name>``. Note that
        the URL is built from ``file_name`` only, not from ``hf_path``.

    Raises:
        Exception: If ``file_path`` does not exist or the upload fails. The
            message is prefixed with ``"Upload failed: "`` and the original
            error is chained as ``__cause__`` so its traceback is preserved.
    """
    try:
        if not os.path.exists(file_path):
            raise Exception(f"File not found: {file_path}")

        upload_file(
            path_or_fileobj=file_path,
            path_in_repo=hf_path,
            repo_id=DATASET_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        uploaded_url = f"https://voxai-api3.onrender.com/my/audios/{file_name}"
        logger.info(f"✅ Upload successful: {uploaded_url}")
        return uploaded_url
    except Exception as e:
        logger.error(f"❌ Upload failed: {str(e)}")
        # Chain the root cause instead of discarding it.
        raise Exception(f"Upload failed: {str(e)}") from e
| |
|
| | |
| | |
| | |
def save_audio(user_id, audio_url, title, text, duration):
    """Insert one row of audio metadata into ``voxai_generated_voices``.

    This is best-effort: database errors are logged and not re-raised, so the
    function always returns ``None``.

    Args:
        user_id: Value stored in the ``user_id`` column.
        audio_url: Value stored in the ``audio`` column.
        title: Value stored in the ``title`` column.
        text: Value stored in the ``text`` column.
        duration: Value stored in the ``duration`` column.
    """
    conn = create_connection()
    if not conn:
        # Previously this case was a silent no-op; make the data loss visible.
        logger.error("❌ Database error: no connection available, audio metadata not saved")
        return

    try:
        cursor = conn.cursor()
        try:
            cursor.execute(
                """
                INSERT INTO voxai_generated_voices (user_id, audio, title, text, duration)
                VALUES (%s, %s, %s, %s, %s)
                """,
                (user_id, audio_url, title, text, duration),
            )
            conn.commit()
        finally:
            # Release the cursor whether or not the insert succeeded.
            cursor.close()
        logger.info("✅ Audio metadata saved to database")
    except Exception as e:
        logger.error(f"❌ Database error: {e}")
    finally:
        conn.close()
| |
|
| | |
| | |
| | |
def generate_random_filename(extension="mp3", length=12):
    """Return ``<random alphanumeric stem>.<extension>``.

    The stem has ``length`` characters drawn from ASCII letters and digits
    using the ``random`` module.
    """
    alphabet = string.ascii_letters + string.digits
    stem_chars = []
    for _ in range(length):
        stem_chars.append(random.choice(alphabet))
    return "{}.{}".format("".join(stem_chars), extension)
| |
|
| | |
| | |
| | |
def cut_video(video_path, max_duration=2):
    """Open a video with MoviePy and trim it to at most ``max_duration``.

    Args:
        video_path: Path of the video file to open.
        max_duration: Upper bound on the clip length, compared against
            ``clip.duration`` (seconds in MoviePy).

    Returns:
        The opened clip, or its ``subclip(0, max_duration)`` when the source
        is longer. The caller is responsible for closing it.

    Raises:
        Exception: If the video cannot be opened or trimmed. The original
            error is chained as ``__cause__``.
    """
    clip = None
    try:
        clip = VideoFileClip(video_path)
        if clip.duration > max_duration:
            clip = clip.subclip(0, max_duration)
        return clip
    except Exception as e:
        # Do not leak the opened clip when trimming fails after opening.
        if clip is not None:
            try:
                clip.close()
            except Exception:
                # Best-effort cleanup; the original error below matters more.
                pass
        raise Exception(f"Failed to open or trim video: {e}") from e
| |
|
| | |
| | |
| | |
def video_to_audio(video_input, output_path=None, max_duration=2):
    """Extract the audio track of a video into a 16-bit PCM WAV file.

    Args:
        video_input: A string that is one of: a ``data:video/...`` data URL,
            a raw base64 payload (longer than 500 characters), or the path of
            an existing video file. Detection is attempted in that order.
        output_path: Destination WAV path. When ``None`` a temporary ``.wav``
            file is created; the caller owns it after a successful return.
        max_duration: Maximum clip length passed to ``cut_video``.

    Returns:
        The path of the written WAV file.

    Raises:
        Exception: On an unsupported input, a video without an audio track,
            or an empty/missing output file. Errors from decoding or MoviePy
            are logged and re-raised unchanged.
    """
    temp_video_path = None
    clip = None
    # Track ownership of the output file so a failed run does not leave an
    # orphaned temp file behind.
    owns_output = output_path is None
    succeeded = False

    try:
        if owns_output:
            # Close the handle right away; only the reserved name is needed.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as out_tmp:
                output_path = out_tmp.name
            logger.debug(f"Auto-created output path: {output_path}")

        if not isinstance(video_input, str):
            raise Exception("Invalid video input type")

        decoded_bytes = None
        if video_input.startswith("data:video/"):
            logger.info("📹 Detected data URL video input")
            _, encoded = video_input.split(",", 1)
            decoded_bytes = base64.b64decode(encoded)
        elif len(video_input) > 500 and all(
            c.isalnum() or c in "+/=\n" for c in video_input.strip()
        ):
            logger.info("📹 Detected raw base64 video input")
            decoded_bytes = base64.b64decode(video_input)
        elif os.path.exists(video_input):
            logger.info(f"📁 Detected video file path: {video_input}")
            clip = cut_video(video_input, max_duration)
        else:
            raise Exception("Invalid video input format (not file path or base64)")

        if clip is None:
            # Decoded payloads go through a temp file so MoviePy reads from
            # disk rather than receiving the bytes as an argument.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
                tmp.write(decoded_bytes)
                temp_video_path = tmp.name
            clip = cut_video(temp_video_path, max_duration)

        if clip.audio is None:
            # Without this check the failure is an opaque AttributeError.
            raise Exception("Video has no audio track")

        clip.audio.write_audiofile(
            output_path,
            fps=22050,
            nbytes=2,
            codec="pcm_s16le",
            verbose=False,
            logger=None,
        )

        if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
            raise Exception("Output audio file not created or empty")

        logger.info(f"✅ Video converted successfully → {output_path}")
        succeeded = True
        return output_path

    except Exception as e:
        logger.error(f"❌ Video→Audio conversion failed: {e}")
        raise

    finally:
        if clip is not None:
            try:
                clip.close()
            except Exception as e:
                logger.debug(f"Failed to close clip: {e}")
        if temp_video_path and os.path.exists(temp_video_path):
            try:
                os.remove(temp_video_path)
                logger.debug(f"🧹 Deleted temp file {temp_video_path}")
            except Exception as e:
                logger.warning(f"⚠️ Failed to delete temp file: {e}")
        if owns_output and not succeeded and output_path and os.path.exists(output_path):
            # Only remove output files this call created itself.
            try:
                os.remove(output_path)
            except Exception as e:
                logger.warning(f"⚠️ Failed to delete temp file: {e}")
| | |
| | |
| | |
| | |
def ensure_wav_format(input_path):
    """Return a path to a WAV version of ``input_path``.

    If the file already opens with the standard ``wave`` module it is returned
    unchanged. Otherwise it is decoded with pydub and exported next to the
    original as ``<stem>_converted.wav``.

    Args:
        input_path: Path of the audio file to check or convert.

    Returns:
        ``input_path`` itself, or the path of the newly written WAV file.

    Raises:
        Exception: If the file cannot be converted. The original error is
            chained as ``__cause__``.
    """
    try:
        with wave.open(input_path, "rb"):
            return input_path
    except Exception:
        # Not a readable WAV (or not readable at all): try converting below.
        # `except Exception` rather than a bare `except` so KeyboardInterrupt
        # and SystemExit are not swallowed.
        pass

    try:
        audio = AudioSegment.from_file(input_path)
        wav_path = os.path.splitext(input_path)[0] + "_converted.wav"
        audio.export(wav_path, format="wav")
        logger.info(f"🎵 Converted {input_path} → WAV ({wav_path})")
        return wav_path
    except Exception as e:
        raise Exception(f"Failed to convert to WAV: {e}") from e
| |
|
| | |
| | |
| | |
def validate_audio_file(file_path, max_size_mb=10, min_duration_ms=200):
    """Check that an audio file exists, is within size limits and decodes.

    Args:
        file_path: Path of the file to validate.
        max_size_mb: Largest accepted size in mebibytes (bytes / 1024 / 1024).
        min_duration_ms: Shortest accepted length, compared against
            ``len(audio)`` of the pydub segment (milliseconds in pydub).
            Defaults to the previously hard-coded 200.

    Returns:
        A ``(is_valid, message)`` tuple. ``message`` is ``"Valid"`` on success
        and a short reason otherwise. This function does not raise; unexpected
        errors are reported as ``(False, "Validation error: ...")``.
    """
    try:
        if not os.path.exists(file_path):
            return False, "File not found"

        size_bytes = os.path.getsize(file_path)
        size_mb = size_bytes / (1024 * 1024)
        if size_mb > max_size_mb:
            return False, f"File too large ({size_mb:.2f}MB > {max_size_mb}MB)"
        # Compare the integer byte count rather than a float quotient.
        if size_bytes == 0:
            return False, "Empty file"

        try:
            audio = AudioSegment.from_file(file_path)
            if len(audio) < min_duration_ms:
                return False, "Audio too short"
        except Exception as e:
            return False, f"Invalid audio format: {e}"

        return True, "Valid"
    except Exception as e:
        return False, f"Validation error: {e}"