Spaces:
Runtime error
Runtime error
| from httpx import AsyncClient | |
| import os | |
| import requests | |
| import gradio as gr | |
| from fastapi import Depends, FastAPI, Request | |
| from app.db import User, create_db_and_tables | |
| from app.schemas import UserCreate, UserRead, UserUpdate | |
| from app.users import auth_backend, current_active_user, fastapi_users | |
| from dotenv import load_dotenv | |
| from tenacity import retry, stop_after_attempt, wait_exponential | |
| import examples as chatbot_examples | |
# Select the runtime environment; "dev" is used when APP_ENV is not set.
current_environment = os.getenv("APP_ENV", "dev")

# Reject unknown environment names first, then load the matching .env file.
if current_environment not in ("dev", "test", "prod"):
    raise ValueError("Invalid environment specified")
load_dotenv(
    {"dev": ".env.dev", "test": ".env.test", "prod": ".env.prod"}[current_environment]
)
| import openai | |
def api_login(email, password):
    """Log in against this service's own JWT login endpoint.

    The endpoint URL is built from the APP_SCHEME, APP_HOST and APP_PORT
    environment variables, and the credentials are posted form-encoded.

    Args:
        email: Account e-mail, sent as the ``username`` form field.
        password: Account password.

    Returns:
        A ``(successful, value)`` tuple. On success ``value`` is the
        ``access_token`` from the response; otherwise it is the ``detail``
        field of the error response, or a short description of what went
        wrong when the endpoint could not be reached or answered with an
        unexpected body.
    """
    scheme = os.getenv("APP_SCHEME")
    host = os.getenv("APP_HOST")
    port = os.getenv("APP_PORT")
    url = f"{scheme}://{host}:{port}/auth/jwt/login"
    payload = {"username": email, "password": password}
    headers = {"Content-Type": "application/x-www-form-urlencoded"}

    try:
        # Without a timeout, requests waits indefinitely on an unresponsive host.
        response = requests.post(url, data=payload, headers=headers, timeout=30)
    except requests.RequestException as exc:
        return False, f"Login request failed: {exc}"

    try:
        response_json = response.json()
    except ValueError:
        # The body was not JSON (e.g. an HTML error page from a proxy).
        return False, f"Unexpected login response (HTTP {response.status_code})"

    if not isinstance(response_json, dict):
        return False, f"Unexpected login response (HTTP {response.status_code})"

    if response.status_code == 200 and "access_token" in response_json:
        return True, response_json["access_token"]

    return False, response_json.get(
        "detail", f"Login failed (HTTP {response.status_code})"
    )
def get_api_key(email, password):
    """Exchange credentials for the API base URL and an access token.

    Args:
        email: Account e-mail passed on to ``api_login``.
        password: Account password passed on to ``api_login``.

    Returns:
        A ``(api_base, api_key)`` tuple, where ``api_base`` is the value of the
        APP_API_BASE environment variable and ``api_key`` is the token returned
        by ``api_login``.

    Raises:
        gr.Error: When ``api_login`` reports failure; the error carries the
            message returned by ``api_login``.
    """
    successful, message = api_login(email, password)
    if not successful:
        raise gr.Error(message)
    # On success, ``message`` holds the access token.
    return os.getenv("APP_API_BASE"), message
| # Define a function to get the AI's reply using the OpenAI API | |
def get_ai_reply(
    message,
    model="gpt-3.5-turbo",
    system_message=None,
    temperature=0,
    message_history=None,
):
    """Request a single chat completion and return the reply text.

    Args:
        message: The user's new message.
        model: Model name passed to ``openai.ChatCompletion.create``.
        system_message: Optional system prompt; when given it is placed first.
        temperature: Sampling temperature passed to the API.
        message_history: Optional list of earlier ``{"role", "content"}``
            messages, inserted between the system prompt and the new message.
            ``None`` means no history.

    Returns:
        The content of the first choice, stripped of surrounding whitespace.
    """
    # Always start from a fresh list so the caller's history is never mutated.
    messages = []
    if system_message is not None:
        messages.append({"role": "system", "content": system_message})
    if message_history is not None:
        messages.extend(message_history)
    messages.append({"role": "user", "content": message})

    completion = openai.ChatCompletion.create(
        model=model, messages=messages, temperature=temperature
    )
    return completion.choices[0].message.content.strip()
def get_ai_image(prompt, size="512x512"):
    """Request one generated image for ``prompt`` and return its URL.

    Args:
        prompt: Text description passed to ``openai.Image.create``.
        size: Image size string passed to the API.

    Returns:
        The ``"url"`` entry of the first item in the response data.
    """
    first_image = openai.Image.create(prompt=prompt, n=1, size=size).data[0]
    return first_image["url"]
def get_ai_transcript(path_to_audio, language=None):
    """Transcribe an audio file with the ``whisper-1`` model.

    Args:
        path_to_audio: Path of the audio file to read.
        language: Optional language value forwarded to the transcription call.

    Returns:
        The ``text`` attribute of the transcription response.
    """
    # The context manager closes the file handle even if the API call raises.
    with open(path_to_audio, "rb") as audio_file:
        transcript = openai.Audio.transcribe(
            "whisper-1", audio_file, language=language
        )
    return transcript.text
def generate_transcription(path_to_audio_file):
    """Gradio handler: transcribe the given audio file.

    Args:
        path_to_audio_file: Path passed on to ``get_ai_transcript``.

    Returns:
        The transcript returned by ``get_ai_transcript``.

    Raises:
        gr.Error: Wraps any exception raised during transcription; the
            original exception is kept as the cause.
    """
    try:
        return get_ai_transcript(path_to_audio_file)
    except Exception as e:
        raise gr.Error(e) from e
def generate_image(prompt):
    """Gradio handler: generate an image for ``prompt``.

    Args:
        prompt: Text description passed on to ``get_ai_image``.

    Returns:
        The image URL returned by ``get_ai_image``.

    Raises:
        gr.Error: Wraps any exception raised during generation; the original
            exception is kept as the cause.
    """
    try:
        return get_ai_image(prompt)
    except Exception as e:
        raise gr.Error(e) from e
| # Define a function to handle the chat interaction with the AI model | |
def chat(message, chatbot_messages, model, temperature, system_message):
    """Produce the assistant's reply for one chat turn.

    Args:
        message: The user's new message.
        chatbot_messages: Iterable of ``(human, assistant)`` pairs holding the
            conversation so far.
        model: Model name forwarded to ``get_ai_reply``.
        temperature: Sampling temperature forwarded to ``get_ai_reply``.
        system_message: System prompt forwarded to ``get_ai_reply``.

    Returns:
        The reply text returned by ``get_ai_reply``.

    Raises:
        gr.Error: Wraps any exception raised while fetching the reply.
    """
    # Flatten the (human, assistant) pairs into role-tagged message dicts.
    history_openai_format = []
    for human, assistant in chatbot_messages:
        history_openai_format.extend(
            (
                {"role": "user", "content": human},
                {"role": "assistant", "content": assistant},
            )
        )

    try:
        return get_ai_reply(
            message,
            model=model,
            system_message=system_message,
            message_history=history_openai_format,
            temperature=temperature,
        )
    except Exception as e:
        # Re-raise as a Gradio error.
        raise gr.Error(e)
| # Define a function to launch the chatbot interface using Gradio | |
def get_chatbot_app(additional_examples=None):
    """Build the Gradio Blocks UI for the service.

    The UI has four tabs: a chat conversation, image generation,
    speech-to-text, and a form that exchanges credentials for an API key.

    NOTE(review): the source this was rebuilt from had lost its indentation,
    so the nesting of the ``with`` layout contexts below (in particular
    whether the chat interface sits inside the ``gr.Row``) is a
    reconstruction -- confirm against the intended layout.

    Args:
        additional_examples: Optional list of extra examples passed to
            ``chatbot_examples.load_examples``. ``None`` means no extras.

    Returns:
        The constructed ``gr.Blocks`` app.
    """
    if additional_examples is None:
        additional_examples = []

    # Load chatbot examples and merge with any additional examples provided.
    examples = chatbot_examples.load_examples(additional=additional_examples)

    # NOTE(review): the two helpers below are not referenced anywhere else in
    # this function; they are kept as-is in case they are meant to be wired up.
    def get_examples():
        """Return the names of the loaded examples."""
        return [example["name"] for example in examples]

    def choose_example(index):
        """Return (system_message, user_message, [], []) for an example index."""
        if index is None:
            return "", "", [], []
        system_message = examples[index]["system_message"].strip()
        user_message = examples[index]["message"].strip()
        return system_message, user_message, [], []

    # Create the Gradio interface using the Blocks layout.
    with gr.Blocks() as app:
        with gr.Tab("Conversation"):
            with gr.Row():
                # Textbox for the system message (prompt).
                system_message = gr.TextArea(
                    label="System Message (Prompt)",
                    value="You are a helpful assistant.",
                    lines=20,
                    max_lines=400,
                )
                # Chat interface; the additional inputs are listed in the same
                # order as chat()'s model, temperature and system_message
                # parameters.
                chatbot = gr.ChatInterface(
                    chat,
                    additional_inputs=[
                        gr.Dropdown(
                            ["gpt-3.5-turbo", "gpt-3.5-turbo-16k"],
                            label="Model",
                            value="gpt-3.5-turbo",
                        ),
                        gr.Slider(
                            label="Temperature",
                            minimum=0,
                            maximum=2,
                            step=0.1,
                            value=0,
                        ),
                        system_message,
                    ],
                )

        with gr.Tab("Image Generation"):
            image_prompt = gr.Textbox(
                label="Prompt", placeholder="A cute puppy wearing sunglasses."
            )
            image_btn = gr.Button(value="Generate")
            image = gr.Image(label="Result", interactive=False, type="filepath")
            image_btn.click(generate_image, inputs=[image_prompt], outputs=[image])

        with gr.Tab("Speech-to-text"):
            audio_file = gr.Audio(label="Audio", source="microphone", type="filepath")
            transcribe = gr.Button(value="Transcribe")
            audio_transcript = gr.Textbox(label="Transcription", interactive=False)
            transcribe.click(
                generate_transcription, inputs=[audio_file], outputs=[audio_transcript]
            )

        with gr.Tab("Get API Key"):
            email_box = gr.Textbox(label="Email Address", placeholder="Student Email")
            password_box = gr.Textbox(
                label="Password", type="password", placeholder="Student ID"
            )
            btn = gr.Button(value="Generate")
            api_host_box = gr.Textbox(label="OpenAI API Base", interactive=False)
            api_key_box = gr.Textbox(label="OpenAI API Key", interactive=False)
            btn.click(
                get_api_key,
                inputs=[email_box, password_box],
                outputs=[api_host_box, api_key_box],
            )

    # Return the app.
    return app
# FastAPI application that hosts the auth/user routers.
app = FastAPI()

# JWT login/logout routes from fastapi-users, served under /auth/jwt.
app.include_router(
    fastapi_users.get_auth_router(auth_backend),
    tags=["auth"],
    prefix="/auth/jwt",
)

# Account registration routes, served under /auth.
app.include_router(
    fastapi_users.get_register_router(UserRead, UserCreate),
    tags=["auth"],
    prefix="/auth",
)

# User management routes, served under /users.
app.include_router(
    fastapi_users.get_users_router(UserRead, UserUpdate),
    tags=["users"],
    prefix="/users",
)
# NOTE(review): no route decorator is visible on this handler in the source
# at hand -- confirm how (or whether) it is registered on the app.
async def authenticated_route(user: User = Depends(current_active_user)):
    """Return a greeting for the user resolved by ``current_active_user``."""
    greeting = f"Hello {user.email}!"
    return {"message": greeting}
# NOTE(review): a second function with this same name is defined right after
# this one; unless each is registered through a decorator that is not visible
# in the source at hand, the later definition replaces this one -- confirm.
async def openai_api_embeddings_passthrough(
    request: Request,
    user: User = Depends(fastapi_users.current_user()),
):
    """Forward the request's JSON body to OpenAI's ``/v1/embeddings`` endpoint.

    The caller's own credentials are not forwarded; the upstream request is
    authorized with the server-side OPENAI_API_KEY environment variable.

    Args:
        request: Incoming request whose JSON body is forwarded unchanged.
        user: User resolved by the ``fastapi_users.current_user()`` dependency.

    Returns:
        The decoded JSON body of the upstream response. The upstream HTTP
        status code is not propagated.

    Raises:
        HTTPException: 401 when no user was resolved.
    """
    if not user:
        # HTTPException is not among the module-level fastapi imports.
        from fastapi import HTTPException

        raise HTTPException(status_code=401, detail="Unauthorized")

    request_data = await request.json()
    openai_api_key = os.getenv("OPENAI_API_KEY")
    # Fall back to JSON when the client sent no Content-Type, so that None is
    # never passed as a header value.
    content_type = request.headers.get("Content-Type", "application/json")

    # Forward the request to the OpenAI API.
    async with AsyncClient() as client:
        response = await client.post(
            "https://api.openai.com/v1/embeddings",
            json=request_data,
            headers={
                "Content-Type": content_type,
                "Authorization": f"Bearer {openai_api_key}",
            },
            timeout=120.0,
        )
    return response.json()
# NOTE(review): a function with this same name is defined immediately before
# this one; unless each is registered through a decorator that is not visible
# in the source at hand, this definition replaces the earlier one -- confirm.
async def openai_api_embeddings_passthrough(
    request: Request,
    user: User = Depends(fastapi_users.current_user()),
):
    """Forward the request's JSON body to OpenAI's engine-scoped embeddings URL.

    The upstream URL is ``/v1/engines/text-embedding-ada-002/embeddings``.
    The caller's own credentials are not forwarded; the upstream request is
    authorized with the server-side OPENAI_API_KEY environment variable.

    Args:
        request: Incoming request whose JSON body is forwarded unchanged.
        user: User resolved by the ``fastapi_users.current_user()`` dependency.

    Returns:
        The decoded JSON body of the upstream response. The upstream HTTP
        status code is not propagated.

    Raises:
        HTTPException: 401 when no user was resolved.
    """
    if not user:
        # HTTPException is not among the module-level fastapi imports.
        from fastapi import HTTPException

        raise HTTPException(status_code=401, detail="Unauthorized")

    request_data = await request.json()
    openai_api_key = os.getenv("OPENAI_API_KEY")
    # Fall back to JSON when the client sent no Content-Type, so that None is
    # never passed as a header value.
    content_type = request.headers.get("Content-Type", "application/json")

    # Forward the request to the OpenAI API.
    async with AsyncClient() as client:
        response = await client.post(
            "https://api.openai.com/v1/engines/text-embedding-ada-002/embeddings",
            json=request_data,
            headers={
                "Content-Type": content_type,
                "Authorization": f"Bearer {openai_api_key}",
            },
            timeout=120.0,
        )
    return response.json()
async def openai_api_completions_passthrough(
    request: Request,
    user: User = Depends(fastapi_users.current_user()),
):
    """Forward the request's JSON body to OpenAI's ``/v1/completions`` endpoint.

    The caller's own credentials are not forwarded; the upstream request is
    authorized with the server-side OPENAI_API_KEY environment variable.

    Args:
        request: Incoming request whose JSON body is forwarded unchanged.
        user: User resolved by the ``fastapi_users.current_user()`` dependency.

    Returns:
        The decoded JSON body of the upstream response. The upstream HTTP
        status code is not propagated.

    Raises:
        HTTPException: 401 when no user was resolved.
    """
    if not user:
        # HTTPException is not among the module-level fastapi imports.
        from fastapi import HTTPException

        raise HTTPException(status_code=401, detail="Unauthorized")

    request_data = await request.json()
    openai_api_key = os.getenv("OPENAI_API_KEY")
    # Fall back to JSON when the client sent no Content-Type, so that None is
    # never passed as a header value.
    content_type = request.headers.get("Content-Type", "application/json")

    # Forward the request to the OpenAI API.
    async with AsyncClient() as client:
        response = await client.post(
            "https://api.openai.com/v1/completions",
            json=request_data,
            headers={
                "Content-Type": content_type,
                "Authorization": f"Bearer {openai_api_key}",
            },
            timeout=120.0,
        )
    return response.json()
async def openai_api_chat_completions_passthrough(
    request: Request,
    user: User = Depends(fastapi_users.current_user()),
):
    """Forward the request's JSON body to OpenAI's ``/v1/chat/completions``.

    Any requested model whose name contains ``"gpt-4"`` is replaced with
    ``"gpt-3.5-turbo"`` before forwarding. The caller's own credentials are
    not forwarded; the upstream request is authorized with the server-side
    OPENAI_API_KEY environment variable.

    Args:
        request: Incoming request whose JSON body is forwarded.
        user: User resolved by the ``fastapi_users.current_user()`` dependency.

    Returns:
        The decoded JSON body of the upstream response. The upstream HTTP
        status code is not propagated.

    Raises:
        HTTPException: 401 when no user was resolved.
    """
    if not user:
        # HTTPException is not among the module-level fastapi imports.
        from fastapi import HTTPException

        raise HTTPException(status_code=401, detail="Unauthorized")

    request_data = await request.json()
    openai_api_key = os.getenv("OPENAI_API_KEY")
    # Fall back to JSON when the client sent no Content-Type, so that None is
    # never passed as a header value.
    content_type = request.headers.get("Content-Type", "application/json")

    # Downgrade gpt-4 requests. A body without a string "model" is forwarded
    # untouched so the upstream API reports the validation error.
    requested_model = (
        request_data.get("model") if isinstance(request_data, dict) else None
    )
    if isinstance(requested_model, str) and "gpt-4" in requested_model:
        request_data["model"] = "gpt-3.5-turbo"

    # Forward the request to the OpenAI API.
    async with AsyncClient() as client:
        response = await client.post(
            "https://api.openai.com/v1/chat/completions",
            json=request_data,
            headers={
                "Content-Type": content_type,
                "Authorization": f"Bearer {openai_api_key}",
            },
            timeout=120.0,
        )
    return response.json()
| # @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30)) | |
| # @app.post("/v1/images/generations") | |
| # async def openai_api_chat_completions_passthrough( | |
| # request: Request, | |
| # user: User = Depends(fastapi_users.current_user()), | |
| # ): | |
| # if not user: | |
| # raise HTTPException(status_code=401, detail="Unauthorized") | |
| # # Get the request data and headers | |
| # request_data = await request.json() | |
| # request_headers = request.headers | |
| # openai_api_key = os.getenv("OPENAI_API_KEY") | |
| # # Forward the request to the OpenAI API | |
| # async with AsyncClient() as client: | |
| # response = await client.post( | |
| # "https://api.openai.com/v1/images/generations", | |
| # json=request_data, | |
| # headers={ | |
| # "Content-Type": request_headers.get("Content-Type"), | |
| # "Authorization": f"Bearer {openai_api_key}", | |
| # }, | |
| # timeout=120.0, | |
| # ) | |
| # # Return the OpenAI API response | |
| # return response.json() | |
| # @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30)) | |
| # @app.post("/v1/audio/speech") | |
| # async def openai_api_audio_speech_passthrough( | |
| # request: Request, | |
| # user: User = Depends(fastapi_users.current_user()), | |
| # ): | |
| # if not user: | |
| # raise HTTPException(status_code=401, detail="Unauthorized") | |
| # # Get the request data and headers | |
| # request_data = await request.json() | |
| # request_headers = request.headers | |
| # openai_api_key = os.getenv("OPENAI_API_KEY") | |
| # # Forward the request to the OpenAI API | |
| # async with AsyncClient() as client: | |
| # response = await client.post( | |
| # "https://api.openai.com/v1/audio/speech", | |
| # json=request_data, | |
| # headers={ | |
| # "Content-Type": request_headers.get("Content-Type"), | |
| # "Authorization": f"Bearer {openai_api_key}", | |
| # }, | |
| # timeout=120.0, | |
| # ) | |
| # # Return the OpenAI API response | |
| # return response.json() | |
# NOTE(review): no startup-hook decorator is visible on this function in the
# source at hand -- confirm how it is registered with the app.
async def on_startup():
    """Create the database and its tables by awaiting ``create_db_and_tables``."""
    # A migration system such as Alembic would make this step unnecessary.
    await create_db_and_tables()
gradio_gui = get_chatbot_app()


def _gradio_auth(email, password):
    """Return True only when ``api_login`` accepts the credentials.

    ``api_login`` returns a ``(successful, value)`` tuple. A non-empty tuple
    is always truthy, so using ``api_login`` directly as the auth callable
    would make a failed login look like a successful one to any truthiness
    check. Only the boolean flag is passed on here.
    """
    successful, _ = api_login(email, password)
    return bool(successful)


# Protect the Gradio UI with the service's own login.
gradio_gui.auth = _gradio_auth
gradio_gui.auth_message = "Welcome, to the 4341 OpenAI Service"

# Serve the Gradio UI from the root path of the FastAPI app.
app = gr.mount_gradio_app(app, gradio_gui, path="/")