"""Document-conversion and text-to-speech HTTP service.

Endpoints:
    POST /extract                   PDF -> ordered stream of text and tables.
    POST /convert                   Any MarkItDown-supported file -> Markdown.
    POST /export-epub               Any MarkItDown-supported file -> EPUB (via pandoc).
    POST /generate-audio-from-text  Text -> WAV speech (Kokoro TTS).
    GET  /health                    Liveness probe.
"""

import io
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Optional

import numpy as np
import pdfplumber
import soundfile as sf
import torch  # noqa: F401  (not referenced directly in this module; kept from the original imports)
from fastapi import BackgroundTasks, FastAPI, File, HTTPException, UploadFile
from fastapi.responses import FileResponse
from kokoro import KPipeline
from markitdown import MarkItDown
from pydantic import BaseModel

md = MarkItDown()
app = FastAPI()

# Stylesheet embedded into every generated EPUB (dark theme).
EPUB_CSS = """
body {
    font-family: serif;
    line-height: 1.5;
    margin: 5%;
    color: #e0e0e0;
    background-color: #1a1a1a;
}
h1 {
    text-align: center;
    color: #f4a261;
    text-transform: uppercase;
    margin-bottom: 0.2em;
}
h3 {
    border-bottom: 1px solid #333;
    padding-bottom: 5px;
    margin-top: 30px;
    color: #e76f51;
}
blockquote {
    font-style: italic;
    border-left: 3px solid #e76f51;
    padding-left: 15px;
    color: #b0b0b0;
    margin: 1.5em 10px;
}
li {
    margin-bottom: 8px;
}
table {
    width: 100%;
    border-collapse: separate;
    border-spacing: 0;
    margin: 20px 0;
    border: 1px solid #333;
    border-radius: 8px;
    overflow: hidden;
}
th {
    background-color: #2d2d2d;
    color: #f4a261;
    font-weight: bold;
    text-align: left;
    padding: 12px;
    border-bottom: 2px solid #3d3d3d;
}
td {
    padding: 10px 12px;
    border-bottom: 1px solid #2d2d2d;
    vertical-align: top;
    font-size: 0.95em;
}
tr:last-child td {
    border-bottom: none;
}
tr:nth-child(even) {
    background-color: #222222;
}
"""


def _safe_upload_name(filename: Optional[str]) -> str:
    """Return a bare file name that is safe to create inside a temp directory.

    The client controls ``UploadFile.filename``; it may be missing or contain
    directory components (``../../x``, ``C:\\x``). Only the final path
    component is kept, and a fixed fallback is used when nothing usable
    remains. The extension is preserved.
    """
    if not filename:
        return "upload"
    # Normalise Windows separators so basename() strips them on POSIX too.
    name = os.path.basename(filename.replace("\\", "/"))
    if name in ("", ".", ".."):
        return "upload"
    return name


def _remove_quietly(path: str) -> None:
    """Delete *path*, ignoring OS errors (best-effort temp-file cleanup)."""
    try:
        os.remove(path)
    except OSError:
        pass


@app.post("/extract")
async def extract(file: UploadFile = File(...)):
    """Extract text and tables from an uploaded PDF.

    Returns ``{"stream": [...]}`` where each item is
    ``{"type": "text", "content": <str>}`` or
    ``{"type": "table", "content": <table>}``. Items are in page order; within
    a page the text item (if any) precedes that page's tables.

    Raises:
        HTTPException: 500 with the underlying message if the upload cannot be
            parsed, matching the error style of the other endpoints.
    """
    pdf_bytes = await file.read()
    stream = []
    try:
        with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
            for page in pdf.pages:
                text = page.extract_text()
                if text:
                    stream.append({"type": "text", "content": text})
                stream.extend(
                    {"type": "table", "content": table}
                    for table in page.extract_tables()
                )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"stream": stream}


@app.post("/convert")
async def convert_to_markdown(file: UploadFile = File(...)):
    """Convert an uploaded document to Markdown with MarkItDown.

    Returns ``{"filename": <original name>, "markdown": <text>}``.

    Raises:
        HTTPException: 500 with the underlying message on any failure.
    """
    try:
        # A private per-request directory prevents collisions between
        # concurrent uploads that share a name; it is removed on exit.
        with tempfile.TemporaryDirectory(prefix="convert_") as workdir:
            temp_path = os.path.join(workdir, _safe_upload_name(file.filename))
            with open(temp_path, "wb") as buffer:
                buffer.write(await file.read())

            result = md.convert(temp_path)
            return {"filename": file.filename, "markdown": result.text_content}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/export-epub")
async def export_epub(file: UploadFile = File(...)):
    """Convert an uploaded document to a styled EPUB.

    The upload is converted to Markdown with MarkItDown, then piped into
    ``pandoc`` together with ``EPUB_CSS``. The EPUB is returned as a download
    named ``<upload stem>.epub``, with the upload stem used as the title.

    Raises:
        HTTPException: 500 with the underlying message on any failure,
            including a non-zero pandoc exit status.
    """
    upload_name = _safe_upload_name(file.filename)
    base_name = os.path.splitext(upload_name)[0]
    download_name = f"{base_name}.epub"

    # mkdtemp rather than TemporaryDirectory: the directory must outlive this
    # function because FileResponse streams the file after we return.
    workdir = tempfile.mkdtemp(prefix="epub_export_")
    try:
        # Fixed, distinct names so the upload can never clash with the
        # stylesheet or the output file.
        temp_input = os.path.join(workdir, f"input_{upload_name}")
        temp_css = os.path.join(workdir, "style.css")
        output_epub = os.path.join(workdir, "output.epub")

        # 1. Save the upload
        with open(temp_input, "wb") as buffer:
            buffer.write(await file.read())

        # 2. Use MarkItDown to get the Markdown content
        markdown_content = md.convert(temp_input).text_content

        # 3. Create a temporary CSS file for Pandoc
        with open(temp_css, "w", encoding="utf-8") as f:
            f.write(EPUB_CSS)

        # 4. Pipe the Markdown into pandoc's stdin. Arguments are passed as a
        #    list (no shell); UTF-8 is forced because pandoc expects it
        #    regardless of the server locale.
        # NOTE(review): this call blocks the event loop while pandoc runs.
        completed = subprocess.run(
            [
                "pandoc",
                "--from=markdown",
                "--to=epub",
                "--css", temp_css,
                "--metadata", f"title={base_name}",
                "-o", output_epub,
            ],
            input=markdown_content,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
            check=False,
        )
        if completed.returncode != 0:
            raise RuntimeError(f"Pandoc Error: {completed.stderr}")

        # 5. Return the EPUB; the working directory is deleted by a
        #    background task once the response has been sent.
        cleanup = BackgroundTasks()
        cleanup.add_task(shutil.rmtree, workdir, ignore_errors=True)
        return FileResponse(
            path=output_epub,
            filename=download_name,
            media_type="application/epub+zip",
            background=cleanup,
        )
    except Exception as e:
        shutil.rmtree(workdir, ignore_errors=True)
        raise HTTPException(status_code=500, detail=str(e)) from e


# Per-language voice names and the Kokoro pipeline code that serves them.
VOICE_MAP = {
    "en": {"male": "bm_lewis", "female": "bf_emma", "code": "b"},
    "es": {"male": "em_alex", "female": "ef_dora", "code": "e"},
    "fr": {"male": "fr_male", "female": "fr_female", "code": "f"},
    "pt": {"male": "pm_santa", "female": "pf_dora", "code": "p"},  # Portuguese
    "it": {"male": "im_nicola", "female": "if_sara", "code": "i"},
}

# Pipelines are built once at import time so requests do not pay the load cost.
print("Loading TTS Pipelines... please wait.")
PIPELINES = {
    "b": KPipeline(lang_code='b'),  # British English
    "e": KPipeline(lang_code='e'),  # Spanish
    "f": KPipeline(lang_code='f'),  # French
    "p": KPipeline(lang_code='p'),  # Portuguese
    "i": KPipeline(lang_code='i'),  # Italian
}
print("All pipelines loaded and ready!")


class TTSRequest(BaseModel):
    """Request body for ``/generate-audio-from-text``."""

    text: str
    language: str = "en"  # key of VOICE_MAP; unknown values fall back to "en"
    gender: str = "male"  # "male" or "female"; anything else falls back to "male"


@app.post("/generate-audio-from-text")
async def generate_audio_text(data: TTSRequest):
    """Synthesise speech for ``data.text`` and return it as a WAV download.

    Unknown languages fall back to British English; unknown genders fall back
    to the male voice of the selected language.

    Raises:
        HTTPException: 400 if the pipeline produced no audio; 500 with the
            underlying message on any other failure.
    """
    output_path = None
    try:
        # 1. Look up the language configuration
        # Defaults to English (British) if the requested language isn't found
        lang_config = VOICE_MAP.get(data.language.lower(), VOICE_MAP["en"])
        phoneme_code = lang_config["code"]

        # 2. Select the global pipeline
        active_pipeline = PIPELINES.get(phoneme_code, PIPELINES["b"])

        # 3. Select the voice (Male is the base default). Only the two voice
        #    keys are accepted, so gender="code" cannot select the phoneme
        #    code as a voice name.
        gender = data.gender.lower()
        if gender not in ("male", "female"):
            gender = "male"
        voice_name = lang_config[gender]

        # 4. Generate audio chunks
        generator = active_pipeline(data.text, voice=voice_name, speed=1.1)
        audio_chunks = [audio for _, _, audio in generator if audio is not None]

        if not audio_chunks:
            raise HTTPException(status_code=400, detail="TTS generation failed")

        # 5. Concatenate and save to a per-request file so concurrent
        #    requests cannot overwrite each other's output.
        final_audio = np.concatenate(audio_chunks)
        fd, output_path = tempfile.mkstemp(prefix="speech_", suffix=".wav")
        os.close(fd)  # soundfile reopens the file by path
        sf.write(output_path, final_audio, 24000)

        # The temp file is deleted after the response has been sent.
        cleanup = BackgroundTasks()
        cleanup.add_task(_remove_quietly, output_path)
        return FileResponse(
            path=output_path,
            media_type="audio/wav",
            filename="speech_output.wav",
            background=cleanup,
        )
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 400 above) must not become 500s.
        if output_path is not None:
            _remove_quietly(output_path)
        raise
    except Exception as e:
        print(f"Detailed Error: {e}")
        if output_path is not None:
            _remove_quietly(output_path)
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/health")
async def health():
    """Liveness probe; always returns ``{"status": "ok"}``."""
    return {"status": "ok"}