import argparse
import base64
import concurrent.futures
import copy
import os
import sys
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple, Union

import yaml
import yt_dlp
from dotenv import load_dotenv
from langchain.output_parsers import OutputFixingParser
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.language_models.llms import LLM
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser
from langchain_core.prompts import load_prompt
from pydantic import BaseModel, Field

# Resolve the kit and repo roots relative to this file so local packages import
# regardless of the current working directory.
current_dir = os.path.dirname(os.path.abspath(__file__))
kit_dir = os.path.dirname(current_dir)
repo_dir = os.path.dirname(kit_dir)

sys.path.append(kit_dir)
sys.path.append(repo_dir)

# NOTE: must come after the sys.path mutation above — it is a repo-local package.
from utils.model_wrappers.api_gateway import APIGateway

load_dotenv(os.path.join(repo_dir, '.env'))

CONFIG_PATH = os.path.join(kit_dir, 'config.yaml')
MAX_FILE_SIZE = 25 * 1024 * 1024  # 25 MB in bytes


class FileSizeExceededError(Exception):
    # Raised when an input file exceeds MAX_FILE_SIZE (presumably checked by
    # code later in this file — not visible in this chunk).
    pass


class Transcript(BaseModel):
    # Structured output schema for audio transcriptions.
    transcript: str = Field(description='audio transcription')


class Scribe:
    """Downloading, transcription, question answering and summarization class"""

    def __init__(
        self,
        sambanova_api_key: Optional[str] = None,
    ) -> None:
        """
        Create a new Scribe class

        Args:
            sambanova_api_key (str): sambanova Cloud env api key
        """
        config = self.get_config_info()
        self.llm_info = config[0]
        self.audio_model_info = config[1]
        self.prod_mode = config[2]
        self.sambanova_api_key: Optional[str] = sambanova_api_key
        self.audio_model = self.set_audio_model()
        self.llm = self.set_llm()
        # Start with a fresh conversation history for query_audio().
        self.reset_query_audio_conversation()

    def get_config_info(self) -> Tuple[Dict[str, Any], Dict[str, Any], bool]:
        """
        Load the YAML config file at CONFIG_PATH.

        Returns:
            Tuple of (llm_info, audio_model_info, prod_mode) read from the
            'llm', 'audio_model' and 'prod_mode' keys of config.yaml.
        """
        with open(CONFIG_PATH, 'r') as yaml_file:
            config = yaml.safe_load(yaml_file)
        llm_info = config['llm']
        audio_model_info = config['audio_model']
        prod_mode = config['prod_mode']
        return llm_info, audio_model_info, prod_mode

    def set_audio_model(self) -> BaseChatModel:
        """
        Sets the audio model.

        Returns:
            audio_model: The audio model, built from the 'audio_model' config
            section via APIGateway.load_chat.
        """
        audio_model = APIGateway.load_chat(
            type=self.audio_model_info['type'],
            streaming=False,
            max_tokens=self.audio_model_info['max_tokens'],
            temperature=self.audio_model_info['temperature'],
            model=self.audio_model_info['model'],
            sambanova_api_key=self.sambanova_api_key,
        )
        return audio_model

    def set_llm(self) -> Union[LLM, BaseChatModel]:
        """
        Sets the sncloud, or sambastudio LLM based on the llm type attribute.

        Returns:
            LLM: The SambaStudio Cloud or Sambastudio Langchain ChatModel.
        """
        llm = APIGateway.load_chat(
            type=self.llm_info['type'],
            streaming=False,
            do_sample=self.llm_info['do_sample'],
            max_tokens=self.llm_info['max_tokens'],
            temperature=self.llm_info['temperature'],
            model=self.llm_info['model'],
            process_prompt=False,
            sambanova_api_key=self.sambanova_api_key,
        )
        return llm

    def summarize(self, text: str, num: int = 5) -> str:
        """
        Create a bullet-point summary of the text input.

        Args:
            text (str): The text to summarize.
            num (int, optional): The number of bullet points to generate. Defaults to 5.

        Returns:
            str: The bullet points summary of the text.
        """
        prompt_template = load_prompt(os.path.join(kit_dir, 'prompts', 'summary.yaml'))
        chain = prompt_template | self.llm | StrOutputParser()
        summary = chain.invoke({'text': text, 'num': num})
        return summary

    def reset_query_audio_conversation(self) -> None:
        """Reset the query_audio conversation history to the initial persona message."""
        self.query_audio_conversation: List[Union[HumanMessage, AIMessage, SystemMessage]] = [
            AIMessage(
                'You are helpful assistant called Scribe developed by SambaNova Systems, '
                'you are helping users in general purpose tasks'
            )
        ]

    def query_audio(self, audio: Optional[Union[BytesIO, str]] = None, query: Optional[str] = None) -> str:
        """
        Append the optional audio and/or text query to the running conversation,
        invoke the audio chat model over the whole history, record its reply and
        return it.

        Args:
            audio: Optional audio as a file path or BytesIO; base64-embedded as
                an 'audio_content' message part when provided.
            query: Optional user question appended as a human message.

        Returns:
            str: The audio model's response.
        """
        if audio is not None:
            b64_audio = self.load_encode_audio(audio)
            self.query_audio_conversation.append(
                HumanMessage(
                    content=[
                        {'type': 'audio_content', 'audio_content': {'content': f'data:audio/mp3;base64,{b64_audio}'}}
                    ]
                )
            )
        if query is not None:
            self.query_audio_conversation.append(HumanMessage(f'{query}, explain your response'))
        chain = self.audio_model | StrOutputParser()
        response = chain.invoke(self.query_audio_conversation)
        # Keep the model reply in the history so follow-up calls have context.
        self.query_audio_conversation.append(AIMessage(response))
        return response

    def query_audio_pipeline(self, audio: Union[BytesIO, str], query: str) -> str:
        """
        Answer a query about an audio file by running transcription and a direct
        audio-model query concurrently, then asking the text LLM for a final
        answer that combines the transcript and the intermediate audio response.

        Args:
            audio: Audio input as a file path or BytesIO.
            query: The user question about the audio.

        Returns:
            str: The LLM's final response.
        """
        # Run both audio-model calls in parallel; .result() blocks until done.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            transcription_future = executor.submit(self.transcribe_audio, audio)
            audio_result_future = executor.submit(self.query_audio, audio, query)
            transcription = transcription_future.result()
            audio_result = audio_result_future.result()
        conversation = [
            SystemMessage("""
            You are helpful assistant called Scribe developed by SambaNova Systems.
            the user will ask information about an audio.
            You will get the user query, the audio transcription and an intermediate response generated by a model capable of listening the audio
            Whit those give a final response to the user query
            """),  # noqa: E501
            HumanMessage(
                f"""
                Transcript: {transcription}

                Intermediate Audio Response: {audio_result}

                Query: {query}
                """
            ),
        ]
        chain = self.llm | StrOutputParser()
        response = chain.invoke(conversation)
        return response

    def encode_to_base64(self, content: bytes) -> str:
        """Encode audio file to base64"""
        return base64.b64encode(content).decode('utf-8')

    def load_encode_audio(self, audio: Union[BytesIO, str]) -> str:
        """
        Read audio bytes from a file path or a BytesIO object and return them
        base64-encoded.

        Args:
            audio: A filesystem path (str) or an in-memory BytesIO buffer.

        Returns:
            str: Base64 encoding of the audio bytes.
        """
        if isinstance(audio, str):
            with open(audio, 'rb') as file:
                audio_bytes = file.read()
        else:
            # make a copy given BytesIO object is not thread safe
            audio_copy = copy.deepcopy(audio)
            audio_bytes = audio_copy.read()
        b64_audio = self.encode_to_base64(content=audio_bytes)
        return b64_audio

    def transcribe_audio(self, audio_file: Union[BytesIO, str]) -> str:
        """
        Ask the audio chat model to transcribe the given audio, requesting the
        output in a JSON-like format (see the prompt below).
        """
        b64_audio = self.load_encode_audio(audio_file)
        conversation = [
            AIMessage('You are Automatic Speech Recognition tool'),
            HumanMessage(
                content=[{'type': 'audio_content', 'audio_content': {'content': f'data:audio/mp3;base64,{b64_audio}'}}]
            ),
            HumanMessage(
                f"""Please transcribe the previous audio in the following format
                ```
                {{
                    "transcript":"