# Earlier version of this app, kept commented out for reference:
#
# from fastapi import FastAPI, Response
# from fastapi.responses import FileResponse
# from kokoro import KPipeline
# import soundfile as sf
# import os
# import numpy as np
# import torch
# from huggingface_hub import InferenceClient
#
# def llm_chat_response(text):
#     HF_TOKEN = os.getenv("HF_TOKEN")
#     client = InferenceClient(api_key=HF_TOKEN)
#     messages = [
#         {
#             "role": "user",
#             "content": [
#                 {
#                     "type": "text",
#                     "text": text + 'describe in one line only'
#                 }
#                 # {
#                 #     "type": "image_url",
#                 #     "image_url": {
#                 #         "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg"
#                 #     }
#                 # }
#             ]
#         }
#     ]
#     response_from_llama = client.chat.completions.create(
#         model="meta-llama/Llama-3.2-11B-Vision-Instruct",
#         messages=messages,
#         max_tokens=500)
#     return response_from_llama.choices[0].message['content']
#
# app = FastAPI()
#
# # Initialize pipeline once at startup
# pipeline = KPipeline(lang_code='a')
#
# @app.post("/generate")
# async def generate_audio(text: str, voice: str = "af_heart", speed: float = 1.0):
#     text_reply = llm_chat_response(text)
#
#     # Generate audio
#     generator = pipeline(
#         text_reply,
#         voice=voice,
#         speed=speed,
#         split_pattern=r'\n+'
#     )
#
#     # # Save first segment only for demo
#     # for i, (gs, ps, audio) in enumerate(generator):
#     #     sf.write(f"output_{i}.wav", audio, 24000)
#     #     return FileResponse(
#     #         f"output_{i}.wav",
#     #         media_type="audio/wav",
#     #         filename="output.wav"
#     #     )
#     # return Response("No audio generated", status_code=400)
#
#     # Process only the first segment for demo
#     for i, (gs, ps, audio) in enumerate(generator):
#         # Convert the PyTorch tensor to a NumPy array
#         audio_numpy = audio.cpu().numpy()
#
#         # Clip to [-1, 1], then convert to 16-bit signed integers (PCM)
#         audio_numpy = np.clip(audio_numpy, -1, 1)
#         pcm_data = (audio_numpy * 32767).astype(np.int16)
#
#         # Convert to bytes (row-major order)
#         raw_audio = pcm_data.tobytes()
#
#         # Return PCM data with minimal necessary headers
#         return Response(
#             content=raw_audio,
#             media_type="application/octet-stream",
#             headers={
#                 "Content-Disposition": 'attachment; filename="output.pcm"',
#                 "X-Sample-Rate": "24000",
#                 "X-Bits-Per-Sample": "16",
#                 "X-Endianness": "little"
#             }
#         )
#
#     return Response("No audio generated", status_code=400)
from fastapi import FastAPI, Response, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from kokoro import KPipeline
import os
import numpy as np
import torch
from huggingface_hub import InferenceClient
from pydantic import BaseModel
import base64
import logging
from typing import Optional, ClassVar, List
import uuid

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TextImageRequest(BaseModel):
    text: Optional[str] = None
    image_base64: Optional[str] = None
    voice: str = "af_heart"  # Default voice that we know exists
    speed: float = 1.0

    # Annotate as a ClassVar so Pydantic ignores it as a field.
    AVAILABLE_VOICES: ClassVar[List[str]] = ["af_heart"]

    def validate_voice(self):
        if self.voice not in self.AVAILABLE_VOICES:
            return "af_heart"
        return self.voice
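# Example request body for POST /generate (illustrative values; image_base64 is
# the raw base64 string of the image bytes, without a "data:image/..." prefix):
# {
#     "text": "What is in this picture?",
#     "image_base64": "<base64-encoded image bytes>",
#     "voice": "af_heart",
#     "speed": 1.0
# }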
class AudioResponse(BaseModel):
    status: str
    message: str

class ErrorResponse(BaseModel):
    error: str
    detail: Optional[str] = None
# Initialize FastAPI app
app = FastAPI(
    title="Text-to-Speech API with Vision Support",
    description="API for generating speech from text with optional image analysis",
    version="1.0.0"
)

# Create and mount a static images directory so uploaded images are accessible via URL
STATIC_DIR = "static_images"
if not os.path.exists(STATIC_DIR):
    os.makedirs(STATIC_DIR)
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
def llm_chat_response(text, image_base64=None):
    """Get a response from the LLM for a text prompt and, optionally, an image."""
    try:
        HF_TOKEN = os.getenv("HF_TOKEN")
        logger.info("Checking HF_TOKEN...")
        if not HF_TOKEN:
            logger.error("HF_TOKEN not found in environment variables")
            raise HTTPException(status_code=500, detail="HF_TOKEN not configured")

        logger.info("Initializing InferenceClient...")
        client = InferenceClient(
            provider="hf-inference",  # Use the hf-inference provider, as in the reference sample
            api_key=HF_TOKEN
        )

        if image_base64:
            logger.info("Processing request with image")
            # Save the base64 image to the static folder
            filename = f"{uuid.uuid4()}.jpg"
            image_path = os.path.join(STATIC_DIR, filename)
            try:
                image_data = base64.b64decode(image_base64)
            except Exception as e:
                logger.error(f"Error decoding base64 image: {str(e)}")
                raise HTTPException(status_code=400, detail="Invalid base64 image data")
            with open(image_path, "wb") as f:
                f.write(image_data)

            # Construct the image URL (uses the BASE_URL environment variable,
            # defaulting to localhost)
            base_url = os.getenv("BASE_URL", "http://localhost:8000")
            image_url = f"{base_url}/static/{filename}"

            prompt = text if text else "Describe this image in one sentence."
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": image_url}}
                    ]
                }
            ]
        else:
            logger.info("Processing text-only request")
            messages = [
                {
                    "role": "user",
                    "content": text + " Describe in one line only."
                }
            ]

        logger.info("Sending request to model...")
        logger.info(f"Message structure: {messages}")

        completion = client.chat.completions.create(
            model="meta-llama/Llama-3.2-11B-Vision-Instruct",
            messages=messages,
            max_tokens=500
        )

        logger.info("Received response from model")
        logger.info(f"Model response received: {completion}")

        try:
            response = completion.choices[0].message.content
            logger.info(f"Extracted response content: {response}")
            return response
        except Exception as e:
            logger.error(f"Error extracting message content: {str(e)}")
            # Fall back to dict-style access in case the response is a plain mapping
            try:
                return completion.choices[0]["message"]["content"]
            except Exception as e2:
                logger.error(f"All extraction methods failed: {str(e2)}")
                return "I couldn't process that input. Please try again with a different query."
    except HTTPException:
        # Re-raise HTTP errors unchanged so a 400 stays a 400
        raise
    except Exception as e:
        logger.error(f"Error in llm_chat_response: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
# Initialize the audio generation pipeline once at startup
try:
    logger.info("Initializing KPipeline...")
    pipeline = KPipeline(lang_code='a')
    logger.info("KPipeline initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize KPipeline: {str(e)}")
    # The app still starts, but audio generation will fail until this is fixed
    pipeline = None
@app.post("/generate")
async def generate_audio(request: TextImageRequest):
    """
    Generate audio from text and optionally analyze an image.

    - If text is provided, it is used as input.
    - If an image is provided (base64), it is saved and a URL is generated for processing.
    - The LLM response is then converted to speech.
    """
    try:
        logger.info("Received audio generation request")

        user_text = request.text if request.text is not None else ""
        if not user_text and request.image_base64:
            user_text = "Describe what you see in the image"
        elif not user_text and not request.image_base64:
            logger.error("Neither text nor image provided in request")
            return JSONResponse(
                status_code=400,
                content={"error": "Request must include either text or image_base64"}
            )

        logger.info("Getting LLM response...")
        text_reply = llm_chat_response(user_text, request.image_base64)
        logger.info(f"LLM response: {text_reply}")

        validated_voice = request.validate_voice()
        if validated_voice != request.voice:
            logger.warning(f"Requested voice '{request.voice}' not available, using '{validated_voice}' instead")

        if pipeline is None:
            logger.error("KPipeline is not available")
            return JSONResponse(
                status_code=500,
                content={"error": "Audio pipeline unavailable", "detail": "KPipeline failed to initialize at startup"}
            )

        logger.info(f"Generating audio using voice={validated_voice}, speed={request.speed}")
        try:
            generator = pipeline(
                text_reply,
                voice=validated_voice,
                speed=request.speed,
                split_pattern=r'\n+'
            )

            # Return the first generated segment as raw 16-bit PCM
            for i, (gs, ps, audio) in enumerate(generator):
                logger.info(f"Audio generated successfully: segment {i}")

                # Convert PyTorch tensor to NumPy array
                audio_numpy = audio.cpu().numpy()

                # Clip values to the range [-1, 1] and convert to 16-bit PCM
                audio_numpy = np.clip(audio_numpy, -1, 1)
                pcm_data = (audio_numpy * 32767).astype(np.int16)
                raw_audio = pcm_data.tobytes()

                return Response(
                    content=raw_audio,
                    media_type="application/octet-stream",
                    headers={
                        "Content-Disposition": 'attachment; filename="output.pcm"',
                        "X-Sample-Rate": "24000",
                        "X-Bits-Per-Sample": "16",
                        "X-Endianness": "little"
                    }
                )

            logger.error("No audio segments generated")
            return JSONResponse(
                status_code=400,
                content={"error": "No audio generated", "detail": "The pipeline did not produce any audio"}
            )
        except Exception as e:
            logger.error(f"Error generating audio: {str(e)}")
            return JSONResponse(
                status_code=500,
                content={"error": "Audio generation failed", "detail": str(e)}
            )
    except HTTPException:
        # Let FastAPI render HTTPExceptions from llm_chat_response (e.g. invalid base64 → 400)
        raise
    except Exception as e:
        logger.error(f"Unexpected error in generate_audio endpoint: {str(e)}")
        return JSONResponse(
            status_code=500,
            content={"error": "Internal server error", "detail": str(e)}
        )
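# Example client usage (a sketch; assumes the server is reachable at
# localhost:8000 and that requests/numpy/soundfile are installed on the client):
#
#     import requests, numpy as np, soundfile as sf
#     resp = requests.post("http://localhost:8000/generate",
#                          json={"text": "Tell me a fun fact"})
#     sr = int(resp.headers["X-Sample-Rate"])
#     pcm = np.frombuffer(resp.content, dtype="<i2")  # little-endian 16-bit PCM
#     sf.write("reply.wav", pcm, sr)               # wrap the raw PCM in a WAV file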
@app.get("/")
async def root():
    return {"message": "Welcome to the Text-to-Speech API with Vision Support. Use POST /generate with 'text' and optionally 'image_base64' for queries."}
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
    return JSONResponse(
        status_code=404,
        content={"error": "Endpoint not found. Please use POST /generate for queries."}
    )

@app.exception_handler(405)
async def method_not_allowed_handler(request: Request, exc):
    return JSONResponse(
        status_code=405,
        content={"error": "Method not allowed. Please check the API documentation."}
    )
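# Local development entrypoint (a sketch: Hugging Face Spaces normally launches
# the app with its own runner, so this block is only a convenience for local
# testing; port 8000 matches the BASE_URL default above).
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)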