# NOTE(review): removed stray paste artifacts ("Spaces:" / "Build error" build-log lines) that preceded the source.
| # pip install python-dotenv first install this package to be able to call custom env variables from .env | |
| # don't forget to call the load_dotenv() function to initialize the getenv() method of os module | |
| import openai | |
| import os | |
| from dotenv import load_dotenv | |
| import httpx | |
| import asyncio | |
| import aiometer | |
| from functools import partial | |
| from elevenlabs import generate, save, set_api_key | |
| from base64 import b64decode | |
| import re | |
| from moviepy.editor import ImageClip, AudioFileClip, CompositeVideoClip, concatenate_videoclips, concatenate_audioclips, TextClip, CompositeAudioClip | |
| from random import choice | |
| from uuid import uuid4 | |
| TIMEOUT = 300 | |
| RATE_LIMIT = 0.15 # 9 requests per minute | |
| # Load environment variables from the .env file | |
| load_dotenv() | |
| set_api_key(os.getenv("elevenlabs_api_key")) | |
| openai.api_key = os.getenv("openai_api_key") | |
class ChatCompletion:
    """Wrapper around OpenAI chat completions (gpt-3.5-turbo).

    Supports one-off synchronous calls (``single_response``) and rate-limited
    concurrent batches (``multi_response``), plus small game helpers for
    moderation, gender selection, and portrait generation. Cumulative token
    usage across all calls is tracked in ``total_tokens``.
    """

    def __init__(self, temperature=0.8):
        # BUG FIX: the original `self.model = "gpt-3.5-turbo",` carried a
        # trailing comma, making self.model a 1-tuple; the attribute was then
        # never used (the model name was hard-coded at each call site).
        self.model = "gpt-3.5-turbo"
        self.temperature = temperature  # now actually forwarded to the API
        self.total_tokens = 0  # running total of tokens billed across calls

    def single_response(self, hist):
        """Send one chat history synchronously.

        Args:
            hist: list of {"role": ..., "content": ...} message dicts.

        Returns:
            The assistant reply text, or -1 if the response carried an error.
        """
        response = openai.ChatCompletion.create(
            model=self.model,
            temperature=self.temperature,
            messages=hist,
        )
        try:
            print("Tokens used: " + str(response["usage"]["total_tokens"]))
            self.total_tokens += response["usage"]["total_tokens"]
        except (KeyError, TypeError):
            # No "usage" key means the API returned an error payload instead.
            print("Error: " + str(response["error"]["message"]))
            return -1
        return response["choices"][0]["message"]["content"]

    async def _async_response(self, payload):
        """POST ``payload`` to the chat-completions endpoint.

        Returns the raw httpx response (not yet parsed).
        """
        async with httpx.AsyncClient() as client:
            return await client.post(
                url="https://api.openai.com/v1/chat/completions",
                json=payload,
                # BUG FIX: the original sent a header literally named
                # "content_type", which is not a real HTTP header; httpx
                # already sets Content-Type when json= is used.
                headers={"Authorization": f"Bearer {openai.api_key}"},
                timeout=TIMEOUT,
            )

    async def _request(self, hist):
        """Async variant of single_response; returns reply text or -1 on error."""
        response = await self._async_response({
            "model": self.model,
            "messages": hist,
            "temperature": self.temperature,
        })
        data = response.json()  # parse the body once instead of three times
        try:
            print("Tokens used: " + str(data["usage"]["total_tokens"]))
            self.total_tokens += data["usage"]["total_tokens"]
            return data["choices"][0]["message"]["content"]
        except KeyError:
            print("Error: " + str(data["error"]["message"]))
            return -1

    async def _multi_response(self, hists):
        # aiometer throttles the concurrent coroutines to RATE_LIMIT req/s.
        return await aiometer.run_all(
            [partial(self._request, hist) for hist in hists],
            max_per_second=RATE_LIMIT,
        )

    def multi_response(self, hists):
        """Send several chat histories concurrently.

        Returns a list (same order as ``hists``) whose entries are the reply
        text, or -1 for any request that failed.
        """
        return asyncio.run(self._multi_response(hists))

    def safety_check(self, message):
        """Cheap moderation stand-in: accept anything up to 2000 characters.

        An LLM-based appropriateness check used to live here but was disabled
        (presumably to save tokens); only the length guard remains.
        """
        return len(message) <= 2000

    def decide_gender(self, message):
        """Pick "male" or "female" at random for the named character.

        An LLM-based choice used to live here but was disabled; ``message``
        is currently unused.
        """
        return choice(["male", "female"])

    def generate_image(self, desc, speaker):
        """Generate a 256x256 image for ``desc`` and save it as "<speaker>.png".

        Returns the file name of the written PNG.
        """
        response = openai.Image.create(
            prompt=desc,
            n=1,
            size="256x256",
            response_format="b64_json",
        )
        image_b64 = response["data"][0]["b64_json"]
        with open(f"{speaker}.png", "wb") as img:
            img.write(b64decode(image_b64))
        return f"{speaker}.png"
| def _str_check(message): | |
| unwanted = re.findall(r"[^A-Za-z\s0-9]", message) | |
| if len(unwanted) > 0: | |
| return False | |
| return True | |
def generate_audio(speaker, message):
    """Synthesize ``message`` with the ElevenLabs voice named ``speaker``.

    Saves the audio under a unique file name and returns that name,
    or -1 if synthesis fails.
    """
    try:
        audio = generate(text=message, voice=speaker, model="eleven_monolingual_v1")
    except Exception as e:
        print("Error:" + str(e))
        return -1
    file_name = f"{speaker}{uuid4()}.wav"
    save(audio, file_name)
    return file_name
def get_user_name(chat, user_name):
    """Validate a player name and return it if acceptable.

    Runs the chat moderation check first, then the character whitelist.
    Returns ``user_name`` on success, -1 for an inappropriate name,
    -2 for a name with disallowed characters.
    """
    if chat.safety_check(f"My name is {user_name}"):
        if _str_check(user_name):
            return user_name
        print("Invalid name.")
        return -2
    print("Inappropriate name.")
    return -1
def generate_video(triples, output_path):
    """Stitch (text, audio_path, image_path) triples into one video file.

    Each image is shown for the duration of its paired audio clip; the
    per-scene audio tracks are concatenated into the final soundtrack.
    The temporary audio files are deleted after a successful write.

    Args:
        triples: non-empty iterable of (text, audio_path, image_path).
        output_path: destination video file (written at 24 fps).

    Raises:
        ValueError: if ``triples`` is empty (moviepy would otherwise fail
            with an opaque error inside concatenate_*).
    """
    triples = list(triples)
    if not triples:
        raise ValueError("triples must be non-empty")
    video_clips = []
    audio_clips = []
    for _, audio_path, image_path in triples:
        audio = AudioFileClip(audio_path)
        # Each still image is displayed exactly as long as its narration.
        image = ImageClip(image_path).set_duration(audio.duration)
        video_clips.append(CompositeVideoClip([image]).set_audio(audio))
        audio_clips.append(audio)
    final_video = concatenate_videoclips(video_clips, method="compose")
    final_video = final_video.set_audio(concatenate_audioclips(audio_clips))
    try:
        final_video.write_videofile(output_path, fps=24, verbose=False, logger=None)
    finally:
        # BUG FIX: the original never closed any clip, leaking file handles
        # (and breaking the os.remove below on platforms that lock open files).
        final_video.close()
        for clip in video_clips:
            clip.close()
        for clip in audio_clips:
            clip.close()
    # Clean up the per-scene narration files only after a successful write.
    for _, audio_path, _ in triples:
        os.remove(audio_path)