# pip install flask google-genai import os, time, base64, struct, re from flask import Flask, request, render_template_string, Response, stream_with_context from google import genai from google.genai import types import json from concurrent.futures import ThreadPoolExecutor from queue import Queue import threading app = Flask(__name__) HTML = """ Gemini Multi (Text → Chunked Streaming TTS)

Gemini Multi (Text + Image → Chunked Streaming TTS)










  
""" client = genai.Client(api_key="AIzaSyDolbPUZBPUPvQUu-RGktJmvnUpkcEKIYo") def wrap_pcm_to_wav(pcm_data: bytes, sample_rate=24000, num_channels=1, bits_per_sample=16) -> bytes: byte_rate = sample_rate * num_channels * bits_per_sample // 8 block_align = num_channels * bits_per_sample // 8 data_size = len(pcm_data) header = b"RIFF" + struct.pack(" str: if getattr(resp, "text", None): return resp.text parts_text = [] for cand in getattr(resp, "candidates", []) or []: content = getattr(cand, "content", None) parts = getattr(content, "parts", None) or [] for p in parts: if getattr(p, "text", None): parts_text.append(p.text) return "\n".join(parts_text).strip() def chunk_text(text, max_words=25): """Split text into sentence-based chunks for parallel TTS generation.""" # Split on sentence boundaries sentences = re.split(r'(?<=[.!?])\s+', text) chunks = [] current_chunk = [] current_words = 0 for sentence in sentences: words = len(sentence.split()) if current_words + words > max_words and current_chunk: chunks.append(' '.join(current_chunk)) current_chunk = [sentence] current_words = words else: current_chunk.append(sentence) current_words += words if current_chunk: chunks.append(' '.join(current_chunk)) return chunks def generate_audio_for_chunk(chunk, voice, accent, tone, chunk_index): """Generate audio for a single text chunk.""" style_prompt = f"Say the following in a {accent} accent with a {tone} tone:\n\n{chunk}" try: tts_resp = client.models.generate_content( model="gemini-2.5-flash-preview-tts", contents=[types.Content(role="user", parts=[types.Part.from_text(text=style_prompt)])], config=types.GenerateContentConfig( response_modalities=["AUDIO"], speech_config=types.SpeechConfig( voice_config=types.VoiceConfig( prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=voice) ) ) ) ) for cand in getattr(tts_resp, "candidates", []) or []: for p in getattr(cand.content, "parts", []): if getattr(p, "inline_data", None) and p.inline_data.data: pcm_bytes = p.inline_data.data wav = wrap_pcm_to_wav(pcm_bytes) return (chunk_index, wav) return (chunk_index, None) except Exception as e: print(f"Error generating audio for chunk {chunk_index}: {e}") return (chunk_index, None) @app.route('/') def index(): return render_template_string(HTML) @app.route('/generate_stream', methods=['POST']) def generate_stream(): def generate(): t_start = time.perf_counter() prompt = (request.form.get("text") or "").strip() file = request.files.get("image") voice = (request.form.get("voice") or "Puck").strip() accent = (request.form.get("accent") or "British").strip() tone = (request.form.get("tone") or "casual and friendly").strip() if not prompt and not file: yield f"data: {json.dumps({'error': 'No input provided'})}\n\n" return # Build multimodal input parts = [] if prompt: parts.append(types.Part.from_text(text=prompt)) if file: parts.append(types.Part.from_bytes(data=file.read(), mime_type=file.mimetype or "image/png")) # 1) Generate text t0 = time.perf_counter() try: gen_resp = client.models.generate_content( model="gemini-2.5-flash-lite", contents=[types.Content(role="user", parts=parts)], config=types.GenerateContentConfig(response_mime_type="text/plain"), ) except Exception as e: yield f"data: {json.dumps({'error': f'text generation failed: {str(e)}'})}\n\n" return t1 = time.perf_counter() final_text = extract_text(gen_resp) if not final_text: yield f"data: {json.dumps({'error': 'Text generation returned empty'})}\n\n" return # Split text into chunks for parallel processing text_chunks = chunk_text(final_text) # Send text immediately with chunk count yield f"data: {json.dumps({'type': 'text', 'text': final_text, 'chunk_count': len(text_chunks)})}\n\n" # 2) Generate audio chunks in parallel with streaming tts_start = time.perf_counter() audio_queue = Queue() def process_chunks(): with ThreadPoolExecutor(max_workers=4) as executor: futures = [] for i, chunk in enumerate(text_chunks): future = executor.submit(generate_audio_for_chunk, chunk, voice, accent, tone, i) futures.append(future) # Process results as they complete (not necessarily in order) for future in futures: try: result = future.result(timeout=30) audio_queue.put(result) except Exception as e: print(f"Error in chunk processing: {e}") audio_queue.put((None, None)) audio_queue.put(('DONE', None)) # Start parallel processing in background thread processing_thread = threading.Thread(target=process_chunks) processing_thread.start() # Stream audio chunks as they become available (in order) completed_chunks = {} next_chunk_to_send = 0 while True: chunk_index, wav_data = audio_queue.get() if chunk_index == 'DONE': break if wav_data is None: continue # Store completed chunk completed_chunks[chunk_index] = wav_data # Send chunks in order while next_chunk_to_send in completed_chunks: audio_b64 = base64.b64encode(completed_chunks[next_chunk_to_send]).decode("ascii") yield f"data: {json.dumps({'type': 'audio_chunk', 'audio_base64': audio_b64, 'chunk_index': next_chunk_to_send})}\n\n" del completed_chunks[next_chunk_to_send] next_chunk_to_send += 1 processing_thread.join() tts_end = time.perf_counter() t_total = time.perf_counter() - t_start # Send completion signal yield f"data: {json.dumps({'type': 'complete', 'timings': {'text_seconds': round(t1 - t0, 3), 'tts_seconds': round(tts_end - tts_start, 3), 'total_seconds': round(t_total, 3)}})}\n\n" return Response(stream_with_context(generate()), mimetype='text/event-stream') if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) app.run(host="0.0.0.0", port=port, threaded=True)