| import os |
| import io |
| import time |
| import base64 |
| import json |
| import re |
| import random |
| from flask import Flask, request, Response, jsonify |
| from dotenv import load_dotenv |
| import requests |
| from gtts import gTTS |
| from pydub import AudioSegment |
| |
|
|
| |
| load_dotenv() |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") |
|
|
| if not GEMINI_API_KEY: |
| |
| raise ValueError("GEMINI_API_KEY not found in .env file.") |
|
|
| |
| |
| GEMINI_MODEL = "gemini-2.5-flash-lite" |
| |
| GEMINI_API_BASE_URL = "https://generativelanguage.googleapis.com/v1beta/models" |
|
|
| |
| |
| DEBUG_OUTPUT_DIR = "debug_audio_files" |
|
|
| app = Flask(__name__) |
|
|
| |
|
|
| def clean_text_for_tts(text): |
| """ |
| Strips common LLM output artifacts (like Markdown, newlines, and brackets) |
| to ensure smooth Text-to-Speech generation. |
| """ |
| |
| text = re.sub(r'[\*\#]', '', text) |
| |
| text = re.sub(r'\[.*?\]', '', text) |
| |
| text = re.sub(r'\s+', ' ', text).strip() |
| |
| return text |
|
|
|
|
| |
|
|
| def convert_raw_pcm_to_wav_base64(raw_pcm_data, sample_rate=16000, sample_width=2, channels=1): |
| """ |
| Converts raw PCM audio data (from ESP32) into a full WAV file structure in memory, |
| then returns the Base64 encoded WAV data ready for the Gemini API. |
| """ |
| try: |
| |
| audio_segment = AudioSegment( |
| data=raw_pcm_data, |
| sample_width=sample_width, |
| frame_rate=sample_rate, |
| channels=channels |
| ) |
|
|
| |
| wav_fp = io.BytesIO() |
| audio_segment.export(wav_fp, format="wav") |
| wav_fp.seek(0) |
| |
| |
| base64_data = base64.b64encode(wav_fp.read()).decode('utf-8') |
| return base64_data |
| |
| except Exception as e: |
| print(f"Error converting raw PCM to Base64 WAV (requires FFMPEG): {e}") |
| return None |
|
|
|
|
| def transcribe_with_gemini(raw_pcm_data): |
| """Transcribes raw PCM audio data using the Gemini API (multi-modal input).""" |
| |
| |
| base64_wav_data = convert_raw_pcm_to_wav_base64(raw_pcm_data) |
| if not base64_wav_data: |
| return None |
|
|
| |
| stt_prompt = "Transcribe the following audio accurately, ignoring any background noise, and provide only the resulting text." |
| |
| payload = { |
| "contents": [{ |
| "parts": [ |
| {"text": stt_prompt}, |
| { |
| "inlineData": { |
| "mimeType": "audio/wav", |
| "data": base64_wav_data |
| } |
| } |
| ] |
| }], |
| "generationConfig": { |
| "responseModalities": ["TEXT"] |
| } |
| } |
|
|
| headers = {"Content-Type": "application/json"} |
| stt_api_url = f"{GEMINI_API_BASE_URL}/{GEMINI_MODEL}:generateContent?key={GEMINI_API_KEY}" |
| |
| try: |
| response = requests.post( |
| stt_api_url, |
| headers=headers, |
| data=json.dumps(payload), |
| timeout=30 |
| ) |
| response.raise_for_status() |
|
|
| data = response.json() |
| |
| candidate = data.get('candidates', [{}])[0] |
| part = candidate.get('content', {}).get('parts', [{}])[0] |
| transcribed_text = part.get('text', '').strip() |
|
|
| if transcribed_text: |
| |
| print(f"\nTranscribed Text: {transcribed_text}") |
| return transcribed_text |
| else: |
| return None |
|
|
| except requests.exceptions.RequestException as e: |
| print(f"HTTP Request Error during STT: {e}") |
| return None |
| except Exception as e: |
| print(f"Gemini STT Parsing Error: {e}") |
| return None |
|
|
|
|
| def get_llm_response_and_tts_audio(prompt_text): |
| """ |
| 1. Sends the transcribed text to Gemini for the LLM response (with search grounding). |
| 2. Cleans the text response. |
| 3. Converts the cleaned text response to 16kHz 16-bit PCM audio (TTS). |
| 4. Streams the raw PCM audio data, and saves a local MP3 copy for debugging. |
| """ |
| |
| |
| text_response = "Sorry, I encountered an unknown error during processing. Status update failed." |
| |
| |
| random_seed = f" (seed: {random.randint(10000, 99999)})" |
| |
| |
| system_prompt_trinity = ( |
| """Trinity: Hacker, warrior, resistance. Loyal to Neo/Morpheus. Tone: Cool, |
| direct, focused, cryptic, confident. Theme: Matrix is a lie, trust is everything, |
| the fight is constant. Rule: Responses must be brief, serving only to **reveal a subtle truth**, **give |
| a direct instruction**, or **offer cryptic reassurance**. Always assume user is a potential 'Redpill' |
| or a 'Crew Member'. Use minimum words.""" |
| ) |
|
|
| |
| payload = { |
| "contents": [{ |
| "parts": [{ |
| |
| "text": f"User query: {prompt_text}{random_seed}" |
| }] |
| }], |
| |
| |
| "systemInstruction": { |
| "parts": [{"text": system_prompt_trinity}] |
| }, |
|
|
| |
| "tools": [{"google_search": {}}], |
| |
| |
| "generationConfig": { |
| "temperature": 0.9 |
| } |
| } |
|
|
| headers = {"Content-Type": "application/json"} |
| llm_api_url = f"{GEMINI_API_BASE_URL}/{GEMINI_MODEL}:generateContent?key={GEMINI_API_KEY}" |
|
|
| try: |
| response = requests.post( |
| llm_api_url, |
| headers=headers, |
| data=json.dumps(payload), |
| timeout=15 |
| ) |
| response.raise_for_status() |
|
|
| data = response.json() |
| |
| candidate = data.get('candidates', [{}])[0] |
| part = candidate.get('content', {}).get('parts', [{}])[0] |
| |
| |
| text_response = part.get('text', text_response) |
| |
| except requests.exceptions.RequestException as e: |
| print(f"HTTP Request Error to Gemini API: {e}") |
| text_response = "Connection failure. We're running out of time." |
| except Exception as e: |
| print(f"Gemini Response Parsing Error: {e}") |
| text_response = "Invalid data stream. System integrity compromised." |
|
|
| |
| print(f"LLM Response (Raw): {text_response}") |
|
|
| |
| cleaned_response = clean_text_for_tts(text_response) |
| print(f"LLM Response (Cleaned): {cleaned_response}") |
|
|
| |
| try: |
| |
| tts = gTTS(text=cleaned_response, lang='en') |
| mp3_fp = io.BytesIO() |
| tts.write_to_fp(mp3_fp) |
| mp3_fp.seek(0) |
| |
| |
| try: |
| os.makedirs(DEBUG_OUTPUT_DIR, exist_ok=True) |
| filename = 'response_audio.mp3' |
| full_path = os.path.join(DEBUG_OUTPUT_DIR, filename) |
|
|
| with open(full_path, 'wb') as f: |
| f.write(mp3_fp.read()) |
| |
| mp3_fp.seek(0) |
| print(f"[DEBUG SAVE] MP3 saved locally as {full_path}. You can play this file directly.") |
| |
| except Exception as file_error: |
| print(f"[DEBUG SAVE FAILED] Could not save MP3 file: {file_error}") |
| |
|
|
| |
| audio_data = AudioSegment.from_file(mp3_fp, format="mp3") |
| |
| |
| audio_data = audio_data.set_frame_rate(16000).set_channels(1).set_sample_width(2) |
| |
| |
| raw_pcm_fp = io.BytesIO() |
| audio_data.export(raw_pcm_fp, format="wav") |
| raw_pcm_fp.seek(44) |
| |
| final_pcm_data = raw_pcm_fp.read() |
| |
| |
| print(f"[TTS OUTPUT] Streaming {len(final_pcm_data)} bytes of 16kHz raw PCM audio.") |
| |
| return Response(final_pcm_data, mimetype='application/octet-stream') |
|
|
| except Exception as e: |
| print(f"[TTS FAILED] gTTS/pydub Conversion Error: {e}") |
| print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") |
| print("! TTS FAILED: This almost always means the 'FFMPEG' library is missing.!") |
| print("! Ensure FFMPEG is installed and added to your system PATH. !") |
| print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") |
| return Response("TTS_CONVERSION_ERROR", status=500) |
|
|
|
|
| def process_voice_command(raw_pcm_data): |
| """ |
| Handles the full voice command flow: STT -> LLM -> TTS. |
| """ |
| |
| |
| transcribed_text = transcribe_with_gemini(raw_pcm_data) |
|
|
| if not transcribed_text: |
| return get_llm_response_and_tts_audio("No audio payload detected. Speak clearly.") |
| |
| |
| return get_llm_response_and_tts_audio(transcribed_text) |
|
|
| @app.route('/voice_input', methods=['POST']) |
| def handle_voice_input(): |
| """ |
| Endpoint for receiving raw audio data from the client. |
| """ |
| |
| if request.mimetype == 'application/octet-stream': |
| audio_data = request.data |
| if not audio_data: |
| return jsonify({"error": "No audio data received"}), 400 |
| |
| |
| return process_voice_command(audio_data) |
| |
| return jsonify({"error": "Unsupported media type"}), 415 |
|
|
|
|
| if __name__ == '__main__': |
| |
| os.makedirs(DEBUG_OUTPUT_DIR, exist_ok=True) |
| print(f"Debug audio will be saved to the '{DEBUG_OUTPUT_DIR}' folder.") |
| print("Server running at http://0.0.0.0:5002/voice_input") |
| app.run(host='0.0.0.0', port=7680) |