from fastapi import FastAPI, WebSocket, WebSocketDisconnect import random import asyncio app = FastAPI() # Predefined list of sentences to simulate a transcript SENTENCES = [ "Hello, how are you doing today?", "This is a simulated transcript.", "FastAPI makes building APIs fun and fast.", "Let's create a mock real-time transcript generator.", "This sentence is unique in the stream.", "Here's another unique sentence for variety.", "Real-time transcripts can be exciting to work with!", "The API ensures no consecutive repeats of sentences.", ] def get_random_sentence(previous_sentence: str = None) -> str: """ Returns a random sentence from the list, ensuring no consecutive repeats. """ while True: sentence = random.choice(SENTENCES) if sentence != previous_sentence: return sentence # Store the previous sentence for the normal API previous_sentence = None @app.get("/normal-transcript") def normal_transcript(): """ Normal API endpoint that returns a new sentence each time it is called. """ global previous_sentence sentence = get_random_sentence(previous_sentence) previous_sentence = sentence return {"transcript": sentence} @app.websocket("/ws/realtime-transcript") async def websocket_endpoint(websocket: WebSocket): """ WebSocket endpoint for real-time transcript streaming. Sends sentences one at a time with a delay. """ await websocket.accept() previous_sentence = None try: while True: # Generate a random sentence current_sentence = get_random_sentence(previous_sentence) # Send the sentence to the client await websocket.send_text(current_sentence) # Update the previous sentence to avoid repetition previous_sentence = current_sentence # Simulate delay for real-time streaming await asyncio.sleep(0.5) # 0.5 second delay except WebSocketDisconnect: print("Client disconnected")