Spaces:
Running
Running
import cgi
import http.server
import json
import os
import re
import socketserver
import urllib.parse
from html import escape

from curl_cffi import requests
from openai import OpenAI
# TCP port the playground listens on.
PORT = 7860

# Cloudflare multi-modal playground endpoints used for speech-to-text and
# text-to-speech.  The STT URL selects the model through its query string;
# the TTS model is named in the JSON payload sent by the request handler.
STT_URL = "https://multi-modal.ai.cloudflare.com/api/inference?model=@cf/deepgram/nova-3&field=audio"
TTS_URL = "https://multi-modal.ai.cloudflare.com/api/inference"

# SECURITY: the API key is a credential and must not live in source control.
# It is read from the environment instead; any key that was previously
# committed in this file should be treated as leaked and revoked.
_NVIDIA_API_KEY = os.environ.get("NVIDIA_API_KEY")
if not _NVIDIA_API_KEY:
    # Fail at startup with a clear message rather than on the first chat call.
    raise RuntimeError(
        "NVIDIA_API_KEY is not set; export it (or configure it as a secret) "
        "before starting the server."
    )

# OpenAI-compatible client pointed at NVIDIA's hosted inference API.
client = OpenAI(
    base_url="https://integrate.api.nvidia.com/v1",
    api_key=_NVIDIA_API_KEY,
)
def simple_md(text):
    """Convert a tiny subset of Markdown in *text* to inline HTML.

    The substitutions run in a fixed order: ``**bold**`` first, then
    ``*italic*``, then backtick-delimited code, and finally every newline
    becomes ``<br>``.  The input is not HTML-escaped here; callers that
    render untrusted text must escape it before calling this function.
    """
    # Order matters: bold has to be consumed before the single-star rule
    # gets a chance to match its delimiters.
    inline_rules = (
        (r'\*\*(.*?)\*\*', r'<b>\1</b>'),
        (r'\*(.*?)\*', r'<i>\1</i>'),
        (r'`(.*?)`', r'<code>\1</code>'),
    )
    rendered = text
    for pattern, markup in inline_rules:
        rendered = re.sub(pattern, markup, rendered)
    return rendered.replace('\n', '<br>')
# Landing page: three plain HTML forms (speech-to-text upload, text-to-speech,
# single-turn chat) that POST to /stt, /tts and /chat, plus a link to the
# JavaScript voice-chat page.  The page declares UTF-8 explicitly because the
# server encodes every response with str.encode(), and the TTS heading names
# the model the /tts route actually requests.
MAIN_HTML = """
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Multi-Modal Playground</title>
    <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.4.0/css/all.min.css">
</head>
<body>
    <h1>Multi-Modal Playground</h1>
    <h2>STT (Nova-3)</h2>
    <form action="/stt" method="post" enctype="multipart/form-data">
        <p>Upload audio:</p>
        <input type="file" name="audio" accept="audio/*" required>
        <button type="submit">Transcribe</button>
    </form>
    <h2>TTS (MeloTTS)</h2>
    <form action="/tts" method="post">
        <p>Enter text:</p>
        <input type="text" name="text" placeholder="Enter text to speak" style="width:100%;" required>
        <button type="submit">Generate Audio</button>
    </form>
    <h2>Chat (Llama) - Single Turn</h2>
    <form action="/chat" method="post">
        <p>Enter message:</p>
        <textarea name="message" placeholder="Type your message..." style="width:100%; height:60px;" required></textarea>
        <button type="submit">Send</button>
    </form>
    <h2>Voice Chat</h2>
    <p><a href="/voicechat">Go to Voice Chat</a></p>
    <hr>
    <a href="/">Refresh Playground</a>
</body>
</html>
"""
# Voice-chat page: records microphone audio in the browser, then chains
# /api/stt -> /api/chat -> /api/tts and plays the synthesized reply.
#
# This MUST be a raw string.  The embedded JavaScript contains regex literals
# such as /\n/g and /\*\*(.*?)\*\*/g; in a normal Python string "\n" would be
# turned into a real line break inside the regex literal, which is a
# JavaScript syntax error that disables the whole script.
#
# Transcripts and model replies are HTML-escaped before the Markdown
# substitutions run, because the result is assigned to innerHTML.
#
# NOTE(review): the audio data URL is labelled audio/mpeg on the assumption
# that the MeloTTS endpoint returns base64 MP3 - confirm against the model
# reference.
VOICECHAT_HTML = r"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Voice Chat</title>
    <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.4.0/css/all.min.css">
</head>
<body>
    <h1>Voice Chat</h1>
    <div id="messages" style="height:300px;overflow-y:scroll;border:1px solid #ccc;padding:10px;margin-bottom:10px;background:#eee;"></div>
    <button id="micBtn" onclick="toggleRecord()" style="font-size:48px;"><i class="fas fa-microphone"></i></button>
    <p>Status: <span id="statusVoice">Click to start recording</span></p>
    <audio id="voicePlayer" style="display:none;"></audio>
    <p><a href="/">Back to Playground</a></p>
    <script>
        let chatMessages = [];
        let mediaRecorder;
        let audioChunks = [];
        let voiceStream;

        function escapeHTML(text) {
            return text.replace(/&/g, '&amp;')
                .replace(/</g, '&lt;')
                .replace(/>/g, '&gt;')
                .replace(/"/g, '&quot;')
                .replace(/'/g, '&#x27;');
        }

        function renderMD(text) {
            return escapeHTML(text)
                .replace(/\*\*(.*?)\*\*/g, '<b>$1</b>')
                .replace(/\*(.*?)\*/g, '<i>$1</i>')
                .replace(/`(.*?)`/g, '<code>$1</code>')
                .replace(/\n/g, '<br>');
        }

        function addMessage(role, content) {
            const div = document.getElementById('messages');
            const msg = document.createElement('div');
            msg.innerHTML = `<strong>${role}:</strong> ${renderMD(content)}`;
            div.appendChild(msg);
            div.scrollTop = div.scrollHeight;
        }

        async function toggleRecord() {
            const btn = document.getElementById('micBtn');
            if (!mediaRecorder || mediaRecorder.state === 'inactive') {
                try {
                    voiceStream = await navigator.mediaDevices.getUserMedia({audio: true});
                    mediaRecorder = new MediaRecorder(voiceStream);
                    audioChunks = [];
                    mediaRecorder.ondataavailable = e => audioChunks.push(e.data);
                    mediaRecorder.onstop = processVoice;
                    mediaRecorder.start();
                    btn.style.color = 'red';
                    document.getElementById('statusVoice').innerText = 'Recording... Click to stop';
                } catch (e) {
                    console.error('Mic error:', e);
                    document.getElementById('statusVoice').innerText = 'Error accessing mic';
                }
            } else {
                mediaRecorder.stop();
                btn.style.color = 'black';
                document.getElementById('statusVoice').innerText = 'Processing...';
            }
        }

        async function processVoice() {
            const audioBlob = new Blob(audioChunks, {type: 'audio/webm'});
            if (voiceStream) {
                voiceStream.getTracks().forEach(track => track.stop());
            }
            document.getElementById('statusVoice').innerText = 'Transcribing...';
            try {
                const sttRes = await fetch('/api/stt', {method: 'POST', body: audioBlob});
                if (!sttRes.ok) {
                    throw new Error('STT request failed: ' + sttRes.status);
                }
                const sttData = await sttRes.json();
                let userText = '';
                if (sttData.results && sttData.results.channels && sttData.results.channels[0] &&
                    sttData.results.channels[0].alternatives && sttData.results.channels[0].alternatives[0]) {
                    userText = sttData.results.channels[0].alternatives[0].transcript;
                }
                if (!userText) {
                    document.getElementById('statusVoice').innerText = 'No speech detected';
                    return;
                }
                addMessage('user', userText);
                chatMessages.push({role: 'user', content: userText});
                document.getElementById('statusVoice').innerText = 'Thinking...';
                const chatRes = await fetch('/api/chat', {
                    method: 'POST',
                    headers: {'Content-Type': 'application/json'},
                    body: JSON.stringify({messages: chatMessages})
                });
                if (!chatRes.ok) {
                    throw new Error('Chat request failed: ' + chatRes.status);
                }
                const chatData = await chatRes.json();
                const response = chatData.response;
                if (typeof response !== 'string') {
                    throw new Error('Chat reply missing from server response');
                }
                addMessage('assistant', response);
                chatMessages.push({role: 'assistant', content: response});
                document.getElementById('statusVoice').innerText = 'Generating speech...';
                const ttsRes = await fetch('/api/tts', {
                    method: 'POST',
                    headers: {'Content-Type': 'application/json'},
                    body: JSON.stringify({text: response})
                });
                if (!ttsRes.ok) {
                    throw new Error('TTS request failed: ' + ttsRes.status);
                }
                const ttsData = await ttsRes.json();
                if (!ttsData.audio) {
                    throw new Error('TTS audio missing from server response');
                }
                const audioPlayer = document.getElementById('voicePlayer');
                audioPlayer.src = 'data:audio/mpeg;base64,' + ttsData.audio;
                audioPlayer.play();
                document.getElementById('statusVoice').innerText = 'Done';
            } catch (e) {
                console.error('Voice process error:', e);
                document.getElementById('statusVoice').innerText = 'Error';
            }
        }
    </script>
</body>
</html>
"""
# Result page for the /stt form.  It has a single str.format placeholder,
# {result}, which the handler fills with the already-escaped transcription
# JSON (or the raw upstream text when that is not JSON).  UTF-8 is declared
# so a non-ASCII raw-text fallback is decoded correctly by the browser.
STT_RESULT_HTML = """
<!DOCTYPE html>
<html>
<head><meta charset="utf-8"><title>STT Result</title></head>
<body>
    <h1>STT Transcription Result</h1>
    <pre style="background:#eee;padding:10px;white-space:pre-wrap;">{result}</pre>
    <a href="/">Back to Playground</a>
</body>
</html>
"""
# Result page for the /tts form.  It has a single str.format placeholder,
# {audio_b64}, which the handler fills with the base64 audio returned by the
# TTS endpoint; the audio is embedded as a data URL.
#
# NOTE(review): the MIME type is audio/mpeg on the assumption that the MeloTTS
# endpoint returns MP3 - confirm against the model reference.  A wrong
# `type` hint makes some browsers skip the <source> without trying to play it.
TTS_RESULT_HTML = """
<!DOCTYPE html>
<html>
<head><meta charset="utf-8"><title>TTS Result</title></head>
<body>
    <h1>TTS Generated Audio</h1>
    <audio controls style="width:100%;">
        <source src="data:audio/mpeg;base64,{audio_b64}" type="audio/mpeg">
        Your browser does not support the audio element.
    </audio>
    <p><a href="/">Back to Playground</a></p>
</body>
</html>
"""
# Result page for the /chat form.  It has two str.format placeholders:
# {user_message} (escaped user text) and {response} (escaped model text with
# the simple Markdown markup applied).  Both routinely contain non-ASCII
# characters, so the page declares UTF-8 to match how the server encodes it.
CHAT_RESULT_HTML = """
<!DOCTYPE html>
<html>
<head><meta charset="utf-8"><title>Chat Result</title></head>
<body>
    <h1>Chat Response</h1>
    <div style="border:1px solid #ccc;padding:10px;margin-bottom:10px;background:#eee;">
        <strong>You:</strong> {user_message}<br><br>
        <strong>Assistant:</strong> {response}
    </div>
    <p><a href="/">Back to Playground</a></p>
</body>
</html>
"""
class Handler(http.server.BaseHTTPRequestHandler):
    """Serve the playground pages, their HTML form endpoints and the JSON API.

    GET ``/`` and ``/voicechat`` return the static pages.  POST ``/stt``,
    ``/tts`` and ``/chat`` accept HTML form submissions and answer with a
    rendered result page; POST ``/api/stt``, ``/api/tts`` and ``/api/chat``
    are the JSON counterparts used by the voice-chat page.

    Query strings are ignored when routing, for POST as well as GET.
    Malformed requests get a 400 and failed upstream calls get a 502 (JSON
    for ``/api/*`` routes, an HTML error page otherwise) instead of an
    unhandled exception that drops the connection.
    """

    _HTML = "text/html; charset=utf-8"
    _JSON = "application/json"
    _CHAT_MODEL = "meta/llama-3.2-1b-instruct"
    _TTS_MODEL = "@cf/myshell-ai/melotts"

    # POST route -> name of the bound method that serves it.
    _POST_ROUTES = {
        '/api/stt': '_post_api_stt',
        '/stt': '_post_stt',
        '/tts': '_post_tts',
        '/chat': '_post_chat',
        '/api/tts': '_post_api_tts',
        '/api/chat': '_post_api_chat',
    }

    # ------------------------------------------------------------------
    # Entry points
    # ------------------------------------------------------------------
    def do_GET(self):
        """Serve the landing page or the voice-chat page; 404 otherwise."""
        route = self._route()
        if route == '/':
            self._send(200, self._HTML, MAIN_HTML.encode())
        elif route == '/voicechat':
            self._send(200, self._HTML, VOICECHAT_HTML.encode())
        else:
            self.send_error(404)

    def do_POST(self):
        """Dispatch a POST to the handler registered for its path; 404 otherwise."""
        method_name = self._POST_ROUTES.get(self._route())
        if method_name is None:
            self.send_error(404)
            return
        getattr(self, method_name)()

    # ------------------------------------------------------------------
    # Request / response helpers
    # ------------------------------------------------------------------
    def _route(self):
        """Return the request path with any query string removed."""
        return self.path.split('?', 1)[0]

    def _send(self, status, content_type, payload):
        """Write a complete response with the given status, type and body bytes."""
        self.send_response(status)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _send_json(self, obj, status=200):
        """Serialize *obj* as JSON and send it with the given status."""
        self._send(status, self._JSON, json.dumps(obj).encode())

    def _read_body(self):
        """Return the raw request body, or None after replying 400.

        A missing or non-numeric Content-Length is a client error, not a
        reason to raise inside the handler.
        """
        try:
            length = int(self.headers.get('Content-Length'))
        except (TypeError, ValueError):
            length = -1
        if length < 0:
            self._send_json({"error": "Missing or invalid Content-Length"}, 400)
            return None
        return self.rfile.read(length)

    def _read_json(self):
        """Return the request body parsed as a JSON object, or None after replying 400."""
        body = self._read_body()
        if body is None:
            return None
        try:
            data = json.loads(body.decode('utf-8'))
        except ValueError:
            # UnicodeDecodeError and json.JSONDecodeError both derive from ValueError.
            self._send_json({"error": "Request body must be UTF-8 encoded JSON"}, 400)
            return None
        if not isinstance(data, dict):
            self._send_json({"error": "Request body must be a JSON object"}, 400)
            return None
        return data

    def _read_form(self):
        """Parse the request body as an HTML form submission.

        NOTE: the ``cgi`` module is deprecated and removed in Python 3.13;
        this keeps the file's existing dependency rather than replacing it.
        """
        return cgi.FieldStorage(
            fp=self.rfile,
            headers=self.headers,
            environ={'REQUEST_METHOD': 'POST'},
        )

    @staticmethod
    def _form_field(form, name):
        """Return the first field called *name* in *form*, or None if absent."""
        try:
            if name not in form:
                return None
            field = form[name]
        except TypeError:
            # FieldStorage raises TypeError on lookups when the body was not
            # a form submission at all.
            return None
        if isinstance(field, list):
            # Repeated field: use the first occurrence.
            field = field[0]
        return field

    def _form_text(self, form, name):
        """Return the stripped text value of form field *name*, or '' if unusable."""
        field = self._form_field(form, name)
        value = None if field is None else field.value
        return value.strip() if isinstance(value, str) else ''

    def _upstream(self, as_json, call, *args):
        """Run ``call(*args)`` against an upstream service.

        Returns ``(True, result)`` on success.  On any failure the error is
        logged, a 502 is sent (JSON when *as_json* is true, an HTML error
        page otherwise) and ``(False, None)`` is returned.
        """
        try:
            return True, call(*args)
        except Exception as exc:
            # Service boundary: the HTTP and LLM clients raise their own
            # exception types, so anything raised here is reported as a bad
            # gateway rather than crashing the request.
            self.log_error("upstream call %s failed: %r", call.__name__, exc)
            if as_json:
                self._send_json({"error": "Upstream service request failed"}, 502)
            else:
                self.send_error(502, "Upstream service request failed")
            return False, None

    # ------------------------------------------------------------------
    # Upstream calls
    # ------------------------------------------------------------------
    @staticmethod
    def _transcribe(audio):
        """POST raw audio bytes to the STT endpoint and return its response."""
        return requests.post(STT_URL, data=audio, impersonate="chrome")

    @classmethod
    def _synthesize(cls, text):
        """Return the base64 audio the TTS endpoint produces for *text*."""
        payload = {"model": cls._TTS_MODEL, "params": {"prompt": text}}
        reply = requests.post(TTS_URL, json=payload, impersonate="chrome")
        return reply.json()["response"]["audio"]

    @classmethod
    def _complete(cls, messages):
        """Return the assistant text for a list of chat *messages*."""
        completion = client.chat.completions.create(
            model=cls._CHAT_MODEL,
            messages=messages,
            temperature=0.2,
            top_p=0.7,
            max_tokens=1024,
            stream=False,
        )
        # The content field may be None; normalize to an empty string.
        return completion.choices[0].message.content or ""

    # ------------------------------------------------------------------
    # POST routes
    # ------------------------------------------------------------------
    def _post_api_stt(self):
        """JSON API: forward the raw audio body to STT and relay its response."""
        audio = self._read_body()
        if audio is None:
            return
        ok, reply = self._upstream(True, self._transcribe, audio)
        if ok:
            self._send(200, self._JSON, reply.content)

    def _post_stt(self):
        """Form route: transcribe the uploaded ``audio`` file and render the result."""
        field = self._form_field(self._read_form(), 'audio')
        if field is None or not field.file:
            self.send_error(400, "No audio file")
            return
        ok, reply = self._upstream(False, self._transcribe, field.file.read())
        if not ok:
            return
        try:
            rendered = json.dumps(reply.json(), indent=2)
        except ValueError:
            # Upstream did not return JSON; show its raw text instead.
            rendered = str(reply.text)
        page = STT_RESULT_HTML.format(result=escape(rendered))
        self._send(200, self._HTML, page.encode())

    def _post_tts(self):
        """Form route: synthesize the ``text`` field and render an audio player."""
        text = self._form_text(self._read_form(), 'text')
        if not text:
            self.send_error(400, "No text provided")
            return
        ok, audio_b64 = self._upstream(False, self._synthesize, text)
        if ok:
            page = TTS_RESULT_HTML.format(audio_b64=escape(audio_b64))
            self._send(200, self._HTML, page.encode())

    def _post_chat(self):
        """Form route: send the ``message`` field as a single-turn chat."""
        user_message = self._form_text(self._read_form(), 'message')
        if not user_message:
            self.send_error(400, "No message provided")
            return
        messages = [{"role": "user", "content": user_message}]
        ok, reply = self._upstream(False, self._complete, messages)
        if ok:
            page = CHAT_RESULT_HTML.format(
                user_message=escape(user_message),
                # Escape first, then add markup, so model output cannot
                # inject HTML of its own.
                response=simple_md(escape(reply)),
            )
            self._send(200, self._HTML, page.encode())

    def _post_api_tts(self):
        """JSON API: ``{"text": ...}`` -> ``{"audio": <base64>}``."""
        data = self._read_json()
        if data is None:
            return
        text = data.get('text')
        if not isinstance(text, str) or not text.strip():
            self._send_json({"error": "Field 'text' must be a non-empty string"}, 400)
            return
        ok, audio_b64 = self._upstream(True, self._synthesize, text)
        if ok:
            self._send_json({"audio": audio_b64})

    def _post_api_chat(self):
        """JSON API: ``{"messages": [...]}`` -> ``{"response": <text>}``."""
        data = self._read_json()
        if data is None:
            return
        messages = data.get('messages')
        if not isinstance(messages, list) or not messages:
            self._send_json({"error": "Field 'messages' must be a non-empty list"}, 400)
            return
        ok, reply = self._upstream(True, self._complete, messages)
        if ok:
            self._send_json({"response": reply})
class _PlaygroundServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTP server that handles each request in its own thread.

    ``HTTPServer`` enables address reuse, so restarting the process does not
    fail with "Address already in use" while old sockets linger.  Threading
    keeps one slow upstream call (or an idle browser connection) from
    blocking every other request.
    """

    # Do not let in-flight request threads keep the process alive on shutdown.
    daemon_threads = True


def main():
    """Bind to all interfaces on PORT and serve requests until interrupted."""
    with _PlaygroundServer(("0.0.0.0", PORT), Handler) as httpd:
        print(f"Server: {PORT}")
        httpd.serve_forever()


if __name__ == "__main__":
    main()