# test-2 / app.py
# Author: GamerC0der
# Commit afc5457 (verified): "Update app.py"
import cgi
import http.server
import json
import os
import re
import socketserver
import urllib.parse
from html import escape

from curl_cffi import requests
from openai import OpenAI
# TCP port the HTTP server listens on.
PORT = 7860

# Cloudflare multi-modal inference endpoints. Speech-to-text receives the raw
# audio bytes as the request body (the model is selected in the query string);
# text-to-speech receives a JSON payload that names the model.
STT_URL = "https://multi-modal.ai.cloudflare.com/api/inference?model=@cf/deepgram/nova-3&field=audio"
TTS_URL = "https://multi-modal.ai.cloudflare.com/api/inference"

# OpenAI-compatible client pointed at NVIDIA's hosted inference API.
#
# SECURITY: the key is read from the NVIDIA_API_KEY environment variable.
# The literal below is kept ONLY so existing deployments keep working; it has
# been committed to source control and must be treated as leaked.
# TODO: rotate the key, set NVIDIA_API_KEY in the deployment, delete the literal.
client = OpenAI(
    base_url="https://integrate.api.nvidia.com/v1",
    api_key=(
        os.environ.get("NVIDIA_API_KEY")
        or "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"
    ),
)
def simple_md(text):
    """Convert a small Markdown subset in *text* to inline HTML.

    Supported constructs:

    * ```code```  -> ``<code>code</code>`` (content is left untouched)
    * ``**bold**``  -> ``<b>bold</b>``
    * ``*italic*``  -> ``<i>italic</i>``
    * newline       -> ``<br>``

    Emphasis delimiters only count when they hug non-space text, so plain
    asterisks such as ``2 * 3 * 4``, ``2 ** 3`` or ``* list item`` are kept
    as written instead of being turned into empty or misplaced tags.

    The function performs no HTML escaping; callers are expected to pass
    text that has already been escaped.

    Args:
        text: Source string (may be empty).

    Returns:
        The string with the constructs above replaced by HTML tags.
    """
    # re.split with one capture group alternates: plain, code, plain, code, ...
    # Splitting first keeps emphasis markers inside code spans from being rewritten.
    segments = re.split(r'`(.+?)`', text)
    rendered = []
    for index, segment in enumerate(segments):
        if index % 2:
            rendered.append(f'<code>{segment}</code>')
            continue
        # Bold must run before italic so "**" is not consumed as two "*".
        segment = re.sub(r'\*\*(?=\S)(.+?)(?<=\S)\*\*', r'<b>\1</b>', segment)
        segment = re.sub(r'\*(?=\S)(.+?)(?<=\S)\*', r'<i>\1</i>', segment)
        rendered.append(segment)
    return ''.join(rendered).replace('\n', '<br>')
# Landing page served at "/". It contains three plain HTML forms that post to
# /stt (multipart audio upload), /tts and /chat, plus a link to /voicechat.
# The text is sent to the browser as-is (no placeholders are substituted).
# NOTE(review): the TTS heading says "AURA-1" while the request handler sends
# the "@cf/myshell-ai/melotts" model -- confirm which label is intended.
MAIN_HTML = """
<!DOCTYPE html>
<html>
<head>
<title>Multi-Modal Playground</title>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.4.0/css/all.min.css">
</head>
<body>
<h1>Multi-Modal Playground</h1>
<h2>STT (Nova-3)</h2>
<form action="/stt" method="post" enctype="multipart/form-data">
<p>Upload audio:</p>
<input type="file" name="audio" accept="audio/*" required>
<button type="submit">Transcribe</button>
</form>
<h2>TTS (AURA-1)</h2>
<form action="/tts" method="post">
<p>Enter text:</p>
<input type="text" name="text" placeholder="Enter text to speak" style="width:100%;" required>
<button type="submit">Generate Audio</button>
</form>
<h2>Chat (Llama) - Single Turn</h2>
<form action="/chat" method="post">
<p>Enter message:</p>
<textarea name="message" placeholder="Type your message..." style="width:100%; height:60px;" required></textarea>
<button type="submit">Send</button>
</form>
<h2>Voice Chat</h2>
<p><a href="/voicechat">Go to Voice Chat</a></p>
<hr>
<a href="/">Refresh Playground</a>
</body>
</html>
"""
# Voice-chat page served at "/voicechat". The embedded script records audio,
# posts it to /api/stt, sends the running conversation to /api/chat, then
# plays the speech returned by /api/tts.
#
# This MUST be a raw string: the script contains JavaScript regex literals
# with backslashes. In a normal string Python would turn the "\n" inside
# /\n/g into a real newline, which splits the regex literal across two lines
# and makes the whole <script> fail to parse in the browser.
VOICECHAT_HTML = r"""
<!DOCTYPE html>
<html>
<head>
<title>Voice Chat</title>
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.4.0/css/all.min.css">
</head>
<body>
<h1>Voice Chat</h1>
<div id="messages" style="height:300px;overflow-y:scroll;border:1px solid #ccc;padding:10px;margin-bottom:10px;background:#eee;"></div>
<button id="micBtn" onclick="toggleRecord()" style="font-size:48px;"><i class="fas fa-microphone"></i></button>
<p>Status: <span id="statusVoice">Click to start recording</span></p>
<audio id="voicePlayer" style="display:none;"></audio>
<p><a href="/">Back to Playground</a></p>
<script>
let chatMessages = [];
let mediaRecorder;
let audioChunks = [];
let voiceStream;
// Neutralise HTML in transcripts and model output before it reaches innerHTML.
function escapeHTML(text) {
  return text.replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;');
}
// Minimal Markdown rendering (bold, italic, code, line breaks) on escaped text.
function renderMD(text) {
  return escapeHTML(text).replace(/\*\*(.*?)\*\*/g, '<b>$1</b>')
    .replace(/\*(.*?)\*/g, '<i>$1</i>')
    .replace(/`(.*?)`/g, '<code>$1</code>')
    .replace(/\n/g, '<br>');
}
function addMessage(role, content) {
  const div = document.getElementById('messages');
  const msg = document.createElement('div');
  msg.innerHTML = `<strong>${role}:</strong> ${renderMD(content)}`;
  div.appendChild(msg);
  div.scrollTop = div.scrollHeight;
}
async function toggleRecord() {
  const btn = document.getElementById('micBtn');
  if (!mediaRecorder || mediaRecorder.state === 'inactive') {
    try {
      voiceStream = await navigator.mediaDevices.getUserMedia({audio: true});
      mediaRecorder = new MediaRecorder(voiceStream);
      audioChunks = [];
      mediaRecorder.ondataavailable = e => audioChunks.push(e.data);
      mediaRecorder.onstop = processVoice;
      mediaRecorder.start();
      btn.style.color = 'red';
      document.getElementById('statusVoice').innerText = 'Recording... Click to stop';
    } catch (e) {
      console.error('Mic error:', e);
      document.getElementById('statusVoice').innerText = 'Error accessing mic';
    }
  } else {
    mediaRecorder.stop();
    btn.style.color = 'black';
    document.getElementById('statusVoice').innerText = 'Processing...';
  }
}
async function processVoice() {
  const audioBlob = new Blob(audioChunks, {type: 'audio/webm'});
  if (voiceStream) {
    voiceStream.getTracks().forEach(track => track.stop());
  }
  document.getElementById('statusVoice').innerText = 'Transcribing...';
  try {
    const sttRes = await fetch('/api/stt', {method: 'POST', body: audioBlob});
    const sttData = await sttRes.json();
    let userText = '';
    if (sttData.results && sttData.results.channels && sttData.results.channels[0] &&
        sttData.results.channels[0].alternatives && sttData.results.channels[0].alternatives[0]) {
      userText = sttData.results.channels[0].alternatives[0].transcript;
    }
    if (!userText) {
      document.getElementById('statusVoice').innerText = 'No speech detected';
      return;
    }
    addMessage('user', userText);
    chatMessages.push({role: 'user', content: userText});
    document.getElementById('statusVoice').innerText = 'Thinking...';
    const chatRes = await fetch('/api/chat', {
      method: 'POST',
      headers: {'Content-Type': 'application/json'},
      body: JSON.stringify({messages: chatMessages})
    });
    const chatData = await chatRes.json();
    const response = chatData.response;
    addMessage('assistant', response);
    chatMessages.push({role: 'assistant', content: response});
    document.getElementById('statusVoice').innerText = 'Generating speech...';
    const ttsRes = await fetch('/api/tts', {
      method: 'POST',
      headers: {'Content-Type': 'application/json'},
      body: JSON.stringify({text: response})
    });
    const ttsData = await ttsRes.json();
    const audioPlayer = document.getElementById('voicePlayer');
    audioPlayer.src = 'data:audio/webm;base64,' + ttsData.audio;
    audioPlayer.play();
    document.getElementById('statusVoice').innerText = 'Done';
  } catch (e) {
    console.error('Voice process error:', e);
    document.getElementById('statusVoice').innerText = 'Error';
  }
}
</script>
</body>
</html>
"""
# Result page for the /stt form. Rendered with str.format(); the single
# placeholder {result} receives the HTML-escaped transcription output
# (pretty-printed JSON, or the raw response text when it is not JSON).
# Because str.format() is used, any literal brace added to this template
# must be doubled ("{{" / "}}").
STT_RESULT_HTML = """
<!DOCTYPE html>
<html>
<head><title>STT Result</title></head>
<body>
<h1>STT Transcription Result</h1>
<pre style="background:#eee;padding:10px;white-space:pre-wrap;">{result}</pre>
<a href="/">Back to Playground</a>
</body>
</html>
"""
# Result page for the /tts form. Rendered with str.format(); the single
# placeholder {audio_b64} receives the base64 audio string returned by the
# TTS service, embedded as a data: URI so no second request is needed.
# NOTE(review): the MIME type is declared as audio/webm -- confirm this
# matches the format the TTS model actually returns.
TTS_RESULT_HTML = """
<!DOCTYPE html>
<html>
<head><title>TTS Result</title></head>
<body>
<h1>TTS Generated Audio</h1>
<audio controls style="width:100%;">
<source src="data:audio/webm;base64,{audio_b64}" type="audio/webm">
Your browser does not support the audio element.
</audio>
<p><a href="/">Back to Playground</a></p>
</body>
</html>
"""
# Result page for the /chat form. Rendered with str.format() with two
# placeholders:
#   {user_message} - the submitted message, HTML-escaped by the caller
#   {response}     - the model reply as HTML (escaped, then passed through
#                    simple_md by the caller), so it is inserted unescaped here
CHAT_RESULT_HTML = """
<!DOCTYPE html>
<html>
<head><title>Chat Result</title></head>
<body>
<h1>Chat Response</h1>
<div style="border:1px solid #ccc;padding:10px;margin-bottom:10px;background:#eee;">
<strong>You:</strong> {user_message}<br><br>
<strong>Assistant:</strong> {response}
</div>
<p><a href="/">Back to Playground</a></p>
</body>
</html>
"""
class Handler(http.server.BaseHTTPRequestHandler):
    """Serve the playground pages and proxy STT, TTS and chat requests.

    GET routes:
        ``/``            main playground page
        ``/voicechat``   voice-chat page

    POST routes (query strings are ignored when matching):
        ``/stt``, ``/tts``, ``/chat``
            HTML form submissions; answered with a rendered result page.
        ``/api/stt``, ``/api/tts``, ``/api/chat``
            Raw-audio / JSON endpoints; answered with JSON.

    Every POST route method returns ``(content_type, body_bytes)`` and writes
    nothing to the socket itself. ``do_POST`` is therefore the single place
    where a response is started, which lets it turn failures into a proper
    HTTP error instead of a dropped connection:

    * unusable client input  -> 400 with a short message
    * any other failure while building the response (in practice the
      upstream call or the shape of its reply) -> logged, then 502
    """

    class _BadRequest(Exception):
        """The client's request cannot be processed; answered with HTTP 400."""

    # Pages are encoded as UTF-8, so say so explicitly for non-ASCII replies.
    _HTML_TYPE = "text/html; charset=utf-8"
    _JSON_TYPE = "application/json"

    # ------------------------------------------------------------------ #
    # HTTP verbs
    # ------------------------------------------------------------------ #
    def do_GET(self):
        """Serve one of the two static pages; any other path is a 404."""
        route = self._route()
        if route == '/':
            page = MAIN_HTML
        elif route == '/voicechat':
            page = VOICECHAT_HTML
        else:
            self.send_error(404)
            return
        self._send_body(self._HTML_TYPE, page.encode())

    def do_POST(self):
        """Dispatch to the matching route method and send its result."""
        routes = {
            '/api/stt': self._api_stt,
            '/stt': self._form_stt,
            '/tts': self._form_tts,
            '/chat': self._form_chat,
            '/api/tts': self._api_tts,
            '/api/chat': self._api_chat,
        }
        route_handler = routes.get(self._route())
        if route_handler is None:
            self.send_error(404)
            return
        try:
            content_type, body = route_handler()
        except self._BadRequest as exc:
            self.send_error(400, str(exc))
            return
        except Exception as exc:
            # Request boundary: nothing has been written yet, so report the
            # failure to the client rather than letting the connection drop.
            self.log_error("POST %s failed: %r", self.path, exc)
            self.send_error(502, "Upstream request failed")
            return
        self._send_body(content_type, body)

    # ------------------------------------------------------------------ #
    # POST routes -- each returns (content_type, body_bytes)
    # ------------------------------------------------------------------ #
    def _api_stt(self):
        """Forward the raw request body to STT and relay the reply bytes."""
        reply = self._transcribe(self._read_body())
        return self._JSON_TYPE, reply.content

    def _form_stt(self):
        """Transcribe the uploaded ``audio`` file and render the result page."""
        reply = self._transcribe(self._form_file('audio', "No audio file"))
        try:
            shown = json.dumps(reply.json(), indent=2)
        except ValueError:
            # Not JSON (decode errors subclass ValueError): show the raw text.
            shown = str(reply.text)
        page = STT_RESULT_HTML.format(result=escape(shown))
        return self._HTML_TYPE, page.encode()

    def _form_tts(self):
        """Synthesize the ``text`` form field and render the audio page."""
        text = self._form_text('text', "No text provided")
        page = TTS_RESULT_HTML.format(audio_b64=escape(self._synthesize(text)))
        return self._HTML_TYPE, page.encode()

    def _form_chat(self):
        """Send the ``message`` form field as a single-turn chat and render it."""
        user_message = self._form_text('message', "No message provided")
        answer = self._complete([{"role": "user", "content": user_message}])
        page = CHAT_RESULT_HTML.format(
            user_message=escape(user_message),
            # Escape first, then add markup, so model output cannot inject HTML.
            response=simple_md(escape(answer)),
        )
        return self._HTML_TYPE, page.encode()

    def _api_tts(self):
        """JSON in ``{"text": ...}`` -> JSON out ``{"audio": <base64>}``."""
        text = self._read_json().get('text')
        if not isinstance(text, str) or not text.strip():
            raise self._BadRequest("No text provided")
        payload = {"audio": self._synthesize(text)}
        return self._JSON_TYPE, json.dumps(payload).encode()

    def _api_chat(self):
        """JSON in ``{"messages": [...]}`` -> JSON out ``{"response": <text>}``."""
        messages = self._read_json().get('messages')
        if not isinstance(messages, list) or not messages:
            raise self._BadRequest("No messages provided")
        payload = {"response": self._complete(messages)}
        return self._JSON_TYPE, json.dumps(payload).encode()

    # ------------------------------------------------------------------ #
    # Upstream calls
    # ------------------------------------------------------------------ #
    def _transcribe(self, audio):
        """POST *audio* to the STT endpoint and return the upstream response."""
        return requests.post(STT_URL, data=audio, impersonate="chrome")

    def _synthesize(self, text):
        """Return the base64 audio string the TTS endpoint produces for *text*."""
        tts_payload = {"model": "@cf/myshell-ai/melotts", "params": {"prompt": text}}
        reply = requests.post(TTS_URL, json=tts_payload, impersonate="chrome")
        return reply.json()["response"]["audio"]

    def _complete(self, messages):
        """Return the chat model's reply text for *messages* ('' if it is empty)."""
        completion = client.chat.completions.create(
            model="meta/llama-3.2-1b-instruct",
            messages=messages,
            temperature=0.2,
            top_p=0.7,
            max_tokens=1024,
            stream=False,
        )
        # content may be None; callers escape/serialize it as a string.
        return completion.choices[0].message.content or ""

    # ------------------------------------------------------------------ #
    # Request parsing
    # ------------------------------------------------------------------ #
    def _route(self):
        """Return the request path without its query string."""
        return self.path.split('?')[0]

    def _read_body(self):
        """Read exactly Content-Length bytes; 400 if the header is unusable."""
        try:
            length = int(self.headers.get('Content-Length'))
        except (TypeError, ValueError):
            raise self._BadRequest("Missing or invalid Content-Length") from None
        if length < 0:
            # A negative size would make read() wait for end-of-stream.
            raise self._BadRequest("Missing or invalid Content-Length")
        return self.rfile.read(length)

    def _read_json(self):
        """Parse the request body as a JSON object; 400 on anything else."""
        try:
            data = json.loads(self._read_body().decode('utf-8'))
        except ValueError as exc:
            # UnicodeDecodeError and JSONDecodeError both subclass ValueError.
            raise self._BadRequest("Invalid JSON body") from exc
        if not isinstance(data, dict):
            raise self._BadRequest("JSON body must be an object")
        return data

    def _read_form(self):
        """Parse the request body as an HTML form submission."""
        try:
            return cgi.FieldStorage(
                fp=self.rfile,
                headers=self.headers,
                environ={'REQUEST_METHOD': 'POST'},
            )
        except ValueError as exc:
            raise self._BadRequest("Malformed form data") from exc

    def _form_text(self, field, missing_message):
        """Return the stripped text of form *field*; 400 if absent or blank."""
        form = self._read_form()
        try:
            # getfirst copes with a field that was submitted more than once.
            value = form.getfirst(field, '')
        except TypeError:
            # FieldStorage is "not indexable" when the body was not a form.
            value = ''
        text = value.strip() if isinstance(value, str) else ''
        if not text:
            raise self._BadRequest(missing_message)
        return text

    def _form_file(self, field, missing_message):
        """Return the uploaded content of form *field*; 400 if absent or empty."""
        form = self._read_form()
        try:
            item = form[field]
        except (KeyError, TypeError):
            raise self._BadRequest(missing_message) from None
        if isinstance(item, list):
            # Field submitted more than once: use the first upload.
            item = item[0]
        data = item.file.read() if item.file else b''
        if not data:
            raise self._BadRequest(missing_message)
        return data

    # ------------------------------------------------------------------ #
    # Response writing
    # ------------------------------------------------------------------ #
    def _send_body(self, content_type, body, status=200):
        """Write a complete response with *body* (bytes) as its payload."""
        self.send_response(status)
        self.send_header("Content-type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
class _PlaygroundServer(socketserver.ThreadingTCPServer):
    """TCP server that handles each connection in its own thread.

    Requests spend most of their time waiting on upstream STT/TTS/chat calls;
    with a single-threaded server one slow call would stall every other
    client, including plain page loads.
    """

    # Allow rebinding the port right after a restart instead of failing with
    # "Address already in use" while old sockets sit in TIME_WAIT.
    allow_reuse_address = True
    # Do not let in-flight request threads keep the process alive on shutdown.
    daemon_threads = True


# Bind on all interfaces and serve until the process is stopped.
with _PlaygroundServer(("0.0.0.0", PORT), Handler) as httpd:
    print(f"Server: {PORT}")
    httpd.serve_forever()