File size: 2,436 Bytes
89f4e6a
 
 
 
0c5c2a7
 
 
89f4e6a
0c5c2a7
 
b2531e4
0c5c2a7
 
 
 
22d9bf5
0c5c2a7
b2531e4
 
0c5c2a7
b2531e4
0c5c2a7
 
 
 
 
89f4e6a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0c5c2a7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from dataclasses import dataclass
from typing import Optional, Dict
import librosa
import numpy as np
import parameters
from datetime import datetime
from S3_bucket import AWS

# Shared S3 helper, created once when this module is imported;
# upload_voice_clone_audio calls its s3_upload_wav method.
# NOTE(review): AWS() may need credentials or network access at import time —
# confirm in S3_bucket.
aws = AWS()

def upload_voice_clone_audio(audio_file, clone_id):
    """Upload a voice-clone recording to S3 under a timestamped key.

    The key has the form ``<voice_clone_data_key>/<timestamp>_<clone_id>.wav``,
    where the timestamp is the local time formatted as
    ``%Y_%b_%d_%H_%M_%S``. When ``clone_id`` is ``None`` the literal
    ``"failed"`` is used in its place.

    Args:
        audio_file: Object handed to ``aws.s3_upload_wav`` as ``obj``
            (presumably a file-like object holding WAV bytes — confirm
            against ``AWS.s3_upload_wav``).
        clone_id: Identifier embedded in the key, or ``None``.

    Returns:
        The S3 key on success, or ``None`` if the upload raised. Upload
        errors are printed, never propagated (best-effort upload).
    """
    label = "failed" if clone_id is None else clone_id
    prefix = parameters.voice_clone_data_key
    stamp = datetime.now().strftime('%Y_%b_%d_%H_%M_%S')
    s3_key = f"{prefix}/{stamp}_{label}.wav"

    try:
        aws.s3_upload_wav(obj=audio_file, s3_key=s3_key)
        print(f"Uploaded to s3:{s3_key}")
    except Exception as e:
        print(f"Error uploading voice clone audio: {e}")
        return None
    else:
        return s3_key


@dataclass
class AudioInfo:
    """Container for one loaded recording and its basic properties."""
    # Audio samples as a NumPy array (AudioStateManager stores float32 data).
    raw_audio: np.ndarray
    # Sample rate of raw_audio, as returned by the loader.
    sr: int
    # Length of the recording; AudioStateManager computes len(raw_audio) / sr.
    duration: float
    # Number of audio channels; AudioStateManager always passes 1.
    channels: int
    # path: Optional[str] = None


class AudioStateManager:
    """Keep track of the most recently loaded recording.

    A recording is loaded from disk, resampled to ``TARGET_SR``, peak-normalized
    if it exceeds full scale, and stored as an ``AudioInfo``. The loader also
    reports whether the recording length falls inside the accepted window.

    The resampling rate and the duration window are class attributes so that a
    subclass (or an instance) can override them without touching the method.
    """

    # Sample rate every recording is resampled to on load.
    TARGET_SR: int = 22000
    # Inclusive bounds, in seconds, on an acceptable recording length.
    MIN_DURATION_S: float = 5
    MAX_DURATION_S: float = 30

    def __init__(self):
        self.current_recording_info: Optional[AudioInfo] = None
        self.save_uploads: bool = False

    def update_current_recording(self, filepath: str, save: Optional[bool] = None):
        """Load ``filepath`` and make it the current recording.

        Args:
            filepath: Path of the audio file, passed to ``librosa.load``.
            save: Currently unused; accepted so existing callers that pass it
                keep working.

        Returns:
            ``True`` when the recording lasts between ``MIN_DURATION_S`` and
            ``MAX_DURATION_S`` seconds (inclusive), ``False`` otherwise or when
            loading failed. Note that a recording outside the window is still
            stored as the current recording; only a load failure clears it.
        """
        try:
            print(
                "filepath in update_current_recording:---",
                filepath,
                "::and this is type of it:-",
                type(filepath),
            )
            audio, sr = librosa.load(path=filepath, sr=self.TARGET_SR)
            audio = audio.astype(np.float32)

            # Reject empty audio explicitly instead of relying on NumPy's
            # zero-size reduction error from max(); the outcome is the same
            # (state cleared, False returned) but the message is clear.
            if audio.size == 0:
                raise ValueError("audio file contains no samples")

            # Scale down only when the signal exceeds full scale; compute the
            # peak once rather than once for the test and once for the divide.
            peak = np.abs(audio).max()
            if peak > 1:
                audio = audio / peak

            duration = len(audio) / sr

            self.current_recording_info = AudioInfo(
                raw_audio=audio,
                sr=sr,
                duration=duration,
                channels=1,
            )

            return self.MIN_DURATION_S <= duration <= self.MAX_DURATION_S
        except Exception as e:
            # Best-effort boundary: any failure invalidates the current
            # recording and is reported as "not usable" rather than raised.
            print(f"Error processing audio: {e}")
            self.current_recording_info = None
            return False

    def get_current_audio_info(self) -> Optional[Dict]:
        """Get current audio information for TTS inference.

        Returns:
            A dict with keys ``raw_audio``, ``sr``, ``channels`` and
            ``duration`` taken from the current recording, or ``None`` when no
            recording is loaded.
        """
        info = self.current_recording_info
        if info is None:
            return None
        return {
            "raw_audio": info.raw_audio,
            "sr": info.sr,
            "channels": info.channels,
            "duration": info.duration,
        }