| import json
|
| import logging
|
| import os
|
| import tempfile
|
|
|
| import torch
|
| from audio_transcription import perform_forced_alignment
|
| from media_transcription_processor import MediaTranscriptionProcessor
|
| from transcription_status import transcription_status
|
| from omnilingual_asr.models.wav2vec2_llama.lang_ids import supported_langs
|
|
|
| from env_vars import API_LOG_LEVEL, MODEL_NAME
|
| from flask import Blueprint, jsonify, request, send_file
|
| from video_utils import check_ffmpeg_available, combine_video_with_subtitles
|
|
|
| transcriptions_blueprint = Blueprint(
|
| "transcriptions_blueprint",
|
| __name__,
|
| )
|
|
|
| logger = logging.getLogger(__name__)
|
| logger.level = API_LOG_LEVEL
|
| logging.getLogger("boto3").setLevel(API_LOG_LEVEL)
|
| logging.getLogger("botocore").setLevel(API_LOG_LEVEL)
|
|
|
| MAX_SHORTFORM_DURATION = 10
|
|
|
|
|
| @transcriptions_blueprint.route("/health")
|
| def health():
|
| """Comprehensive health check endpoint"""
|
| device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
|
| cuda_available = torch.cuda.is_available()
|
| ffmpeg_available = check_ffmpeg_available()
|
|
|
|
|
| transcription_info = MediaTranscriptionProcessor.get_server_status()
|
|
|
|
|
| gpu_info = {}
|
| if cuda_available:
|
| gpu_info = {
|
| "gpu_count": torch.cuda.device_count(),
|
| "current_device": torch.cuda.current_device(),
|
| "gpu_name": (
|
| torch.cuda.get_device_name(0)
|
| if torch.cuda.device_count() > 0
|
| else "Unknown"
|
| ),
|
| }
|
|
|
|
|
| try:
|
| current_device = torch.cuda.current_device()
|
| memory_allocated = torch.cuda.memory_allocated(current_device)
|
| memory_reserved = torch.cuda.memory_reserved(current_device)
|
| memory_total = torch.cuda.get_device_properties(current_device).total_memory
|
|
|
| gpu_info.update(
|
| {
|
| "gpu_memory_allocated_mb": round(memory_allocated / 1024 / 1024, 1),
|
| "gpu_memory_reserved_mb": round(memory_reserved / 1024 / 1024, 1),
|
| "gpu_memory_total_mb": round(memory_total / 1024 / 1024, 1),
|
| "gpu_memory_free_mb": round(
|
| (memory_total - memory_reserved) / 1024 / 1024, 1
|
| ),
|
| }
|
| )
|
| except Exception as e:
|
| logger.warning(f"Could not get GPU memory info: {e}")
|
|
|
| return {
|
| "status": "healthy",
|
| "message": "MMS Transcription API is running",
|
| "version": "1.0.0",
|
| "service": "mms-transcription",
|
| "device": str(device),
|
| "cuda_available": cuda_available,
|
| "ffmpeg_available": ffmpeg_available,
|
| "transcription_status": transcription_info,
|
| **gpu_info,
|
| }
|
|
|
|
|
| @transcriptions_blueprint.route("/supported-languages")
|
| def get_supported_languages():
|
| """Get list of supported languages for transcription"""
|
| try:
|
| return jsonify({
|
| "supported_languages": supported_langs,
|
| })
|
| except Exception as e:
|
| logger.error(f"Error getting supported languages: {str(e)}")
|
| return jsonify({
|
| "error": "Could not retrieve supported languages",
|
| "message": str(e)
|
| }), 500
|
|
|
|
|
| @transcriptions_blueprint.route("/status")
|
| def get_transcription_status():
|
| """Get current transcription status"""
|
| return jsonify(MediaTranscriptionProcessor.get_server_status())
|
|
|
|
|
| @transcriptions_blueprint.route("/transcribe", methods=["POST"])
|
| def transcribe_audio():
|
| """Transcribe media using the MMS model with intelligent chunking for all audio/video files"""
|
| try:
|
|
|
| if MediaTranscriptionProcessor.is_server_busy():
|
| status = MediaTranscriptionProcessor.get_server_status()
|
| return (
|
| jsonify(
|
| {
|
| "error": "Server is currently processing another transcription",
|
| "status": "busy",
|
| "current_operation": status.get("current_operation"),
|
| }
|
| ),
|
| 503,
|
| )
|
|
|
|
|
| if "media" not in request.files:
|
| return jsonify({"error": "No media file provided"}), 400
|
|
|
| media_file = request.files["media"]
|
| if media_file.filename == "":
|
| return jsonify({"error": "No file selected"}), 400
|
|
|
|
|
| language_with_script = request.form.get("language", None)
|
|
|
| if language_with_script:
|
| logger.info(f"Language specified: {language_with_script}")
|
| else:
|
| logger.info("No language specified, using auto-detection")
|
|
|
|
|
| include_preprocessed = (
|
| request.form.get("include_preprocessed", "false").lower() == "true" or
|
| request.args.get("include_preprocessed", "false").lower() == "true"
|
| )
|
| if include_preprocessed:
|
| logger.info("Preprocessed audio will be included in response")
|
|
|
|
|
|
|
|
|
|
|
| media_bytes = media_file.read()
|
|
|
| try:
|
|
|
| with MediaTranscriptionProcessor(media_bytes, media_file.filename, language_with_script) as processor:
|
|
|
| processor.start_transcription()
|
|
|
|
|
| processor.convert_media()
|
| logger.info(f"Media conversion completed for: {media_file.filename}")
|
|
|
|
|
| processor.transcribe_full_pipeline()
|
|
|
|
|
| results = processor.get_results(include_preprocessed_audio=include_preprocessed)
|
|
|
| logger.info(f"Transcription completed: {results.get('num_chunks', 0)} chunks")
|
|
|
|
|
| response = {
|
| "transcription": results.get("transcription", ""),
|
| "aligned_segments": results.get("aligned_segments", []),
|
| "chunks": results.get("chunks", []),
|
| "total_duration": results.get("total_duration", 0.0),
|
| "num_chunks": results.get("num_chunks", 0),
|
| "num_segments": results.get("num_segments", 0),
|
| "model": MODEL_NAME,
|
| "device": str(torch.device("cuda:0" if torch.cuda.is_available() else "cpu")),
|
| "status": results.get("status", "success"),
|
| }
|
|
|
|
|
| if "preprocessed_audio" in results:
|
| response["preprocessed_audio"] = results["preprocessed_audio"]
|
|
|
| if "error" in results:
|
| response["error"] = results["error"]
|
| logger.error(f"Transcription response with error: {response}")
|
| return jsonify(response), 500
|
|
|
|
|
| logger.info("=== TRANSCRIBE RESPONSE ===")
|
|
|
| logger.info("=== END TRANSCRIBE RESPONSE ===")
|
|
|
| return jsonify(response)
|
|
|
|
|
| except Exception as e:
|
| logger.error(f"Media conversion/transcription error: {str(e)}")
|
| return jsonify({"error": f"Media processing failed: {str(e)}"}), 500
|
|
|
| except Exception as e:
|
| logger.error(f"Transcription error: {str(e)}")
|
| return jsonify({"error": f"Transcription failed: {str(e)}"}), 500
|
|
|
|
|
| @transcriptions_blueprint.route("/combine-video-subtitles", methods=["POST"])
|
| def combine_video_subtitles():
|
| """Combine video with subtitles using FFmpeg"""
|
| try:
|
|
|
| if MediaTranscriptionProcessor.is_server_busy():
|
| status = MediaTranscriptionProcessor.get_server_status()
|
| return (
|
| jsonify(
|
| {
|
| "error": "Server is currently processing another request",
|
| "status": "busy",
|
| "current_operation": status.get("current_operation"),
|
| }
|
| ),
|
| 503,
|
| )
|
|
|
|
|
| if "video" not in request.files:
|
| return jsonify({"error": "No video file provided"}), 400
|
|
|
| if "subtitles" not in request.form:
|
| return jsonify({"error": "No subtitles provided"}), 400
|
|
|
| video_file = request.files["video"]
|
| subtitles = request.form["subtitles"]
|
|
|
| if video_file.filename == "":
|
| return jsonify({"error": "No video file selected"}), 400
|
|
|
|
|
| subtitle_format = request.form.get("format", "srt")
|
| output_format = request.form.get("output_format", "mp4")
|
| language = request.form.get("language", "eng")
|
|
|
|
|
| transcription_status.start_transcription("combine_video", video_file.filename)
|
|
|
| try:
|
| transcription_status.update_progress(0.1)
|
|
|
|
|
| with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(video_file.filename)[1]) as temp_video:
|
| video_file.save(temp_video.name)
|
| temp_video_path = temp_video.name
|
|
|
| transcription_status.update_progress(0.3)
|
|
|
| try:
|
|
|
| output_path = combine_video_with_subtitles(
|
| temp_video_path, subtitles, subtitle_format, output_format, language
|
| )
|
|
|
| transcription_status.update_progress(0.9)
|
|
|
| logger.info(f"Video combination completed: {output_path}")
|
|
|
|
|
| return send_file(
|
| output_path,
|
| as_attachment=True,
|
| download_name=f"{video_file.filename.rsplit('.', 1)[0]}_with_subtitles.{output_format}",
|
| mimetype=f"video/{output_format}",
|
| )
|
|
|
| finally:
|
|
|
| try:
|
| os.unlink(temp_video_path)
|
| except OSError:
|
| pass
|
|
|
| finally:
|
|
|
| transcription_status.finish_transcription()
|
|
|
| except Exception as e:
|
| transcription_status.finish_transcription()
|
| logger.error(f"Video combination error: {str(e)}")
|
| return jsonify({"error": f"Video combination failed: {str(e)}"}), 500
|
|
|