Spaces:
Runtime error
Runtime error
| import os | |
| import whisper | |
| from io import BytesIO | |
| import base64 | |
| import boto3 | |
| from pydub import AudioSegment | |
| from pydub.playback import play | |
| import logging | |
| from langchain import OpenAI | |
| from langchain.chains import RetrievalQA | |
| from langchain.vectorstores import Chroma | |
| from langchain.document_loaders import DirectoryLoader | |
| from langchain.embeddings.openai import OpenAIEmbeddings | |
| from langchain.text_splitter import CharacterTextSplitter | |
| OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') | |
| AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID') | |
| AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY') | |
| AWS_REGION_NAME = 'ap-south-1' | |
| logging.basicConfig(level="INFO", | |
| filename='conversations.log', | |
| filemode='a', | |
| format='%(asctime)s %(message)s', | |
| datefmt='%H:%M:%S') | |
| def buzz_user(): | |
| input_prompt = AudioSegment.from_mp3('assets/timeout_audio.mp3') | |
| play(input_prompt) | |
| def initialize_knowledge_base(): | |
| loader = DirectoryLoader('profiles', glob='**/*.txt') | |
| docs = loader.load() | |
| char_text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0) | |
| doc_texts = char_text_splitter.split_documents(docs) | |
| openAI_embeddings = OpenAIEmbeddings() | |
| vStore = Chroma.from_documents(doc_texts, openAI_embeddings) | |
| conv_model = RetrievalQA.from_chain_type( | |
| llm=OpenAI(), | |
| chain_type="stuff", | |
| retriever=vStore.as_retriever( | |
| search_kwargs={"k": 1} | |
| ) | |
| ) | |
| voice_model = whisper.load_model("tiny") | |
| return conv_model, voice_model | |
| def text_to_speech_gen(answer): | |
| polly = boto3.client('polly', | |
| aws_access_key_id=AWS_ACCESS_KEY_ID, | |
| aws_secret_access_key=AWS_SECRET_ACCESS_KEY, | |
| region_name=AWS_REGION_NAME) | |
| response = polly.synthesize_speech( | |
| Text=answer, | |
| VoiceId='Matthew', | |
| OutputFormat='mp3', | |
| Engine = "neural") | |
| audio_stream = response['AudioStream'].read() | |
| audio_html = audio_to_html(audio_stream) | |
| return audio_html | |
| def audio_to_html(audio_bytes): | |
| audio_io = BytesIO(audio_bytes) | |
| audio_io.seek(0) | |
| audio_base64 = base64.b64encode(audio_io.read()).decode("utf-8") | |
| audio_html = f'<audio src="data:audio/mpeg;base64,{audio_base64}" controls autoplay></audio>' | |
| return audio_html | |
| def get_chat_history(user_message, history): | |
| return "", history + [[user_message, None]] | |