Spaces:
Configuration error
Configuration error
| from datetime import timedelta | |
| import os | |
| import uuid | |
| import torch | |
| from rest_framework import status | |
| from rest_framework.response import Response | |
| from rest_framework.generics import CreateAPIView,ListAPIView | |
| from TTS.api import TTS # Ensure this import is correct based on your TTS library/package | |
| from rest_framework.authentication import TokenAuthentication | |
| from rest_framework.permissions import IsAuthenticated | |
| from texttovoice.models import TextToSpeech | |
| from .serializers import TextToSpeechSerializer, TextToSpeechSerializerResponse ,TextToSpeechSerializerResponseWithURL # Ensure this import matches your file structure | |
| from rest_framework.parsers import MultiPartParser | |
| from drf_yasg.utils import swagger_auto_schema | |
| from drf_yasg import openapi | |
| from rest_framework.exceptions import NotFound as NOT_FOUND | |
| from .minio_utils import get_minio_client # Ensure this import matches your file structure | |
| minio_client = get_minio_client() | |
| BUCKET_NAME = "voice-clone" | |
| class TextToSpeechCreateView(CreateAPIView): | |
| serializer_class = TextToSpeechSerializer | |
| authentication_classes = [TokenAuthentication] | |
| permission_classes = [IsAuthenticated] | |
| parser_classes = [MultiPartParser] | |
| def create(self, request, *args, **kwargs): | |
| serializer = self.get_serializer(data=request.data) | |
| if serializer.is_valid(): | |
| try: | |
| gpu_available = torch.cuda.is_available() | |
| text = serializer.validated_data.get("text") | |
| speaker_wav = serializer.validated_data.get("speaker_wav") | |
| language = serializer.validated_data.get("language") | |
| # Temporary file paths | |
| speaker_file_path = os.path.join('/tmp', f"{uuid.uuid4()}{speaker_wav.name}") | |
| output_filename = os.path.join('/tmp', f"{uuid.uuid4()}.wav") | |
| # Save speaker WAV file | |
| with open(speaker_file_path, "wb") as destination: | |
| for chunk in speaker_wav.chunks(): | |
| destination.write(chunk) | |
| # TTS processing | |
| tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2", gpu=gpu_available) | |
| tts.tts_to_file(text=text, file_path=output_filename, speaker_wav=speaker_file_path, language=language) | |
| # Upload files to MinIO and cleanup | |
| public_url,speaker_wav_path = self.upload_file_to_minio(speaker_file_path, 'speakers/') | |
| public_url_output ,output_wav_path = self.upload_file_to_minio(output_filename, 'output/') | |
| # Create DB entry | |
| tts_instance = TextToSpeech.objects.create( | |
| text=text, | |
| speaker_wav=speaker_wav_path, | |
| output_wav=output_wav_path, | |
| language=language, | |
| created_by=request.user | |
| ) | |
| # Serialize and return the created instance | |
| response_serializer = TextToSpeechSerializerResponse(tts_instance) | |
| response_data = { | |
| **response_serializer.data, | |
| "speaker_wav": public_url, | |
| "output_wav": public_url_output | |
| } | |
| return Response(response_data, status=status.HTTP_201_CREATED) | |
| except Exception as e: | |
| print("Error due to ",e) | |
| return Response({"error": "An error occurred processing your request."}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) | |
| finally: | |
| # Ensure cleanup happens | |
| self.cleanup_files([speaker_file_path, output_filename]) | |
| else: | |
| return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) | |
| def upload_file_to_minio(self, file_path, prefix): | |
| """Uploads a file to MinIO and returns a pre-signed URL for secure, temporary access.""" | |
| file_name = os.path.basename(file_path) | |
| object_name = f"{prefix}{file_name}" | |
| with open(file_path, "rb") as file_data: | |
| minio_client.put_object(BUCKET_NAME, object_name, file_data, os.path.getsize(file_path)) | |
| # Generate a pre-signed URL for the uploaded object | |
| pre_signed_url = minio_client.presigned_get_object(BUCKET_NAME, object_name, expires=timedelta(days=1)) | |
| return pre_signed_url ,f"{BUCKET_NAME}/{object_name}" | |
| def cleanup_files(self, file_paths): | |
| """Removes files from the filesystem.""" | |
| for file_path in file_paths: | |
| try: | |
| os.remove(file_path) | |
| except Exception as e: | |
| print(e) | |
| pass | |
| class TextToSpeechListView(ListAPIView): | |
| serializer_class = TextToSpeechSerializerResponseWithURL | |
| authentication_classes = [TokenAuthentication] | |
| permission_classes = [IsAuthenticated] | |
| def get_queryset(self): | |
| return TextToSpeech.objects.filter(created_by=self.request.user) | |
| def list(self, request, *args, **kwargs): | |
| queryset = self.get_queryset() | |
| if not queryset.exists(): | |
| raise NOT_FOUND('No text-to-speech data found for the current user.') | |
| # Directly serialize the data, pre-signed URLs are handled by the serializer | |
| serializer = self.get_serializer(queryset, many=True, context={'view': self}) | |
| return Response(serializer.data, status=status.HTTP_200_OK) | |
| def generate_presigned_url(self, object_path): | |
| # Ensure this logic correctly splits your `object_path` to get the bucket name and object name | |
| # This example assumes `object_path` is in the format "bucket_name/object_name" | |
| try: | |
| bucket, object_name = object_path.split('/', 1) | |
| presigned_url = minio_client.presigned_get_object(bucket, object_name, expires=timedelta(hours=1)) | |
| return presigned_url | |
| except Exception as e: | |
| print(e) | |
| return None | |