Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import random | |
| import time | |
| import os | |
| import requests | |
| import base64 | |
| import random | |
| import pymongo | |
| import certifi | |
def _require_env(name):
    """Return the value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or empty, so a missing secret
            is reported at startup rather than as an opaque auth failure later.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name} is not set; configure it as a "
            "secret instead of hard-coding credentials in this file."
        )
    return value


# Secrets are read from the environment (e.g. the hosting platform's secret
# store) rather than being committed to source.
# NOTE(review): the token and MongoDB password that were previously embedded
# here have been exposed and should be rotated.
token = _require_env("RUNPOD_API_TOKEN")  # bearer token sent by LLM_call
uri = _require_env("MONGODB_URI")  # full mongodb+srv:// connection string

# Create a new client and connect to the server.
client = pymongo.MongoClient(uri, tlsCAFile=certifi.where())

# Send a ping to confirm a successful connection. A failure is only reported,
# not raised, so the module still finishes importing.
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

# Conversation documents live in the `info` collection of the `camila` database.
db = client.get_database('camila')
records = db.info

# ID of the conversation currently in use: read by Chat_call and advanced by
# generate_unique_id.
last_used_id = 0
def generate_unique_id():
    """Advance the module-level ``last_used_id`` and return the new value.

    The counter grows by a random step between 100 and 1000000 (inclusive),
    so each returned ID is strictly greater than the previous one.
    """
    global last_used_id
    step = random.randint(100, 1000000)
    last_used_id = last_used_id + step
    return last_used_id
def clear_button_callback():
    """Start a new conversation ID when the clear button is clicked.

    generate_unique_id() advances the module-level ``last_used_id``; the new
    value is also mirrored onto the ``chatbot`` component's ``ID`` attribute.
    Returns nothing.
    """
    chatbot.ID = generate_unique_id()
def save(ID, response, message_text):
    """Append one user/assistant exchange to the conversation document.

    Args:
        ID: value of the ``ID`` field identifying the conversation document.
        response: the assistant's reply; stored as a string.
        message_text: the user's message; stored as a string.

    Returns:
        ``response``, unchanged, so callers can chain the call.

    If no document matches ``ID`` nothing is written (no upsert).
    """
    exchange = [
        {'role': 'user', 'content': str(message_text)},
        {'role': 'assistant', 'content': str(response)},
    ]
    # One update with $each appends both turns atomically and in order,
    # instead of two separate writes preceded by an unused lookup.
    records.update_one(
        {'ID': ID},
        {'$push': {'message': {'$each': exchange}}},
    )
    return response
| ######################################### | |
| ######################################### | |
def LLM_call(message_log, *, poll_interval=2, max_wait=600, request_timeout=30):
    """Submit a prompt to the RunPod serverless endpoint and wait for the result.

    Args:
        message_log: the prompt payload, sent as ``input.prompt``.
        poll_interval: seconds to sleep between status polls.
        max_wait: maximum total seconds to keep polling before giving up.
        request_timeout: per-request timeout in seconds for each HTTP call.

    Returns:
        The job's ``output`` when it completes; otherwise the string
        ``"Output not available"`` (submission rejected, job ended in a
        non-successful terminal state, or ``max_wait`` elapsed).

    Raises:
        requests.RequestException: if the initial submission cannot be sent.
    """
    serverless_api_id = '4whzcbwuriohqh'
    base_url = f"https://api.runpod.ai/v2/{serverless_api_id}"

    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }

    # Generation parameters forwarded to the endpoint's handler.
    data = {
        "input": {
            "prompt": message_log,
            "max_new_tokens": 4500,
            "temperature": 0.7,
            "top_k": 50,
            "top_p": 0.9,
            "repetition_penalty": 1.2,
            "batch_size": 8,
            "stop": ["</s>"]
        }
    }

    output = "Output not available"

    # Submit the job. A rejected submission (e.g. bad token) carries no job
    # id, so report it instead of failing on a missing key.
    call = requests.post(
        f"{base_url}/run", headers=headers, json=data, timeout=request_timeout
    )
    if not call.ok:
        print("Failed to submit job. HTTP status code:", call.status_code)
        return output
    msg_id = call.json().get('id')
    if not msg_id:
        print("Failed to submit job. Response contained no job id.")
        return output
    print("Message ID:", msg_id)

    status_url = f"{base_url}/status/{msg_id}"
    # Bound the total wait so a stuck job cannot block the caller forever.
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        try:
            response = requests.get(
                status_url, headers=headers, timeout=request_timeout
            )
        except requests.RequestException as exc:
            # Treat network hiccups as transient and keep polling.
            print("Failed to get status. Error:", exc)
        else:
            if response.status_code == 200:
                response_data = response.json()
                status = response_data.get('status')
                if status == 'COMPLETED':
                    output = response_data.get('output', 'Output not available')
                    print("Response content:", output)
                    return output
                # Every non-successful terminal state ends the wait, not
                # just FAILED.
                if status in ('FAILED', 'CANCELLED', 'TIMED_OUT'):
                    error_message = response_data.get('error', 'Unknown error')
                    print("Request failed. Reason:", error_message)
                    return output
            else:
                print("Failed to get status. HTTP status code:", response.status_code)
        # Wait before polling again.
        time.sleep(poll_interval)

    print("Gave up waiting for job after", max_wait, "seconds.")
    return output
def Chat_call(chat, prompt):
    """Produce a reply for ``chat`` and record the exchange.

    The conversation is the document whose ``ID`` equals the module-level
    ``last_used_id``. If none exists yet, an empty one is inserted and the
    reply is the fixed string "Hello" (the LLM is not called). Otherwise the
    LLM is called with the system ``prompt``, at most the five most recent
    stored messages, and the new user message.

    Args:
        chat: the user's new message.
        prompt: text used as the system message.

    Returns:
        The reply, after it has been stored through save().
    """
    conversation_id = last_used_id
    stored_doc = records.find_one({'ID': conversation_id})

    if not stored_doc:
        # First message of a new conversation: create the document only.
        records.insert_one({'ID': conversation_id, 'message': []})
        reply = "Hello"
    else:
        recent_history = stored_doc['message'][-5:]
        message_log = [
            {"role": "system", "content": f"{prompt}"},
            *recent_history,
            {"role": "user", "content": chat},
        ]
        reply = LLM_call(message_log)

    return save(conversation_id, reply, chat)
# Build the UI: a chat window, a message box, a system-prompt box and a
# clear button covering all three.
with gr.Blocks() as demo:
    chatbot = gr.Chatbot()
    # Ad-hoc attribute on the component. Chat_call looks conversations up by
    # the module-level last_used_id, not by this attribute.
    chatbot.ID = 10
    msg = gr.Textbox(label="Chat")
    prompt = gr.Textbox(label="Prompt")
    clear = gr.ClearButton([msg, chatbot, prompt])

    def respond(message, chat_history, prompt):
        """Handle a submitted message.

        Returns an empty string (which blanks the message box), the history
        with the new (message, reply) pair appended, and the prompt unchanged.
        """
        reply = Chat_call(message, prompt)
        exchange = (message, reply)
        chat_history.append(exchange)
        time.sleep(2)
        return "", chat_history, prompt

    msg.submit(
        fn=respond,
        inputs=[msg, chatbot, prompt],
        outputs=[msg, chatbot, prompt],
    )

    # Clicking the clear button also starts a fresh conversation ID.
    clear.click(clear_button_callback)


if __name__ == "__main__":
    demo.launch()