Spaces:
Sleeping
Sleeping
# Standard library
import io
import os
import shutil
import subprocess
import tempfile
from pathlib import Path

# Third-party
import numpy as np
import pdfplumber
import soundfile as sf
import torch
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.responses import FileResponse
from kokoro import KPipeline
from markitdown import MarkItDown
from pydantic import BaseModel
# Shared MarkItDown converter instance, reused by every conversion endpoint.
md = MarkItDown()
# FastAPI application object; route decorators attach handlers to it.
app = FastAPI()
# Dark-theme stylesheet embedded into generated EPUBs (written to a temp
# file and passed to pandoc via --css in export_epub). Covers body text,
# headings, blockquotes, list items, and rounded-corner tables.
EPUB_CSS = """
body {
    font-family: serif;
    line-height: 1.5;
    margin: 5%;
    color: #e0e0e0;
    background-color: #1a1a1a;
}
h1 {
    text-align: center;
    color: #f4a261;
    text-transform: uppercase;
    margin-bottom: 0.2em;
}
h3 {
    border-bottom: 1px solid #333;
    padding-bottom: 5px;
    margin-top: 30px;
    color: #e76f51;
}
blockquote {
    font-style: italic;
    border-left: 3px solid #e76f51;
    padding-left: 15px;
    color: #b0b0b0;
    margin: 1.5em 10px;
}
li {
    margin-bottom: 8px;
}
table {
    width: 100%;
    border-collapse: separate;
    border-spacing: 0;
    margin: 20px 0;
    border: 1px solid #333;
    border-radius: 8px;
    overflow: hidden;
}
th {
    background-color: #2d2d2d;
    color: #f4a261;
    font-weight: bold;
    text-align: left;
    padding: 12px;
    border-bottom: 2px solid #3d3d3d;
}
td {
    padding: 10px 12px;
    border-bottom: 1px solid #2d2d2d;
    vertical-align: top;
    font-size: 0.95em;
}
tr:last-child td {
    border-bottom: none;
}
tr:nth-child(even) {
    background-color: #222222;
}
"""
async def extract(file: UploadFile = File(...)):
    """Parse an uploaded PDF into an ordered stream of text and table items.

    For each page, the page's text (when present) is emitted first,
    followed by one entry per detected table.
    """
    raw = await file.read()
    items = []
    with pdfplumber.open(io.BytesIO(raw)) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
            if page_text:
                items.append({"type": "text", "content": page_text})
            items.extend(
                {"type": "table", "content": tbl}
                for tbl in page.extract_tables()
            )
    return {"stream": items}
async def convert_to_markdown(file: UploadFile = File(...)):
    """Convert an uploaded document to Markdown via MarkItDown.

    Returns:
        dict with the original ``filename`` and the extracted ``markdown``.

    Raises:
        HTTPException(500): if saving or conversion fails.
    """
    # Sanitize the client-supplied filename — it may contain path
    # separators such as "../" — while keeping the extension so
    # MarkItDown can detect the input format.
    safe_name = Path(file.filename or "upload").name
    # Unique per-request directory: the old f"temp_{filename}" scheme
    # collided when concurrent uploads shared a filename.
    temp_dir = tempfile.mkdtemp(prefix="md_convert_")
    temp_path = os.path.join(temp_dir, safe_name)
    try:
        with open(temp_path, "wb") as buffer:
            buffer.write(await file.read())
        # MarkItDown conversion
        result = md.convert(temp_path)
        return {
            "filename": file.filename,
            "markdown": result.text_content,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
async def export_epub(file: UploadFile = File(...)):
    """Convert an uploaded document to a styled EPUB via MarkItDown + pandoc.

    The upload is converted to Markdown, which is piped into pandoc's
    stdin together with the dark-theme ``EPUB_CSS`` stylesheet.

    Raises:
        HTTPException(500): on save, conversion, or pandoc failure.
    """
    # Sanitize the client filename: it may contain path separators.
    safe_name = Path(file.filename or "upload").name
    base_name = os.path.splitext(safe_name)[0]
    # Per-request scratch directory: the previous fixed "style.css" /
    # f"temp_{filename}" names collided under concurrent requests.
    work_dir = tempfile.mkdtemp(prefix="epub_export_")
    temp_input = os.path.join(work_dir, safe_name)
    temp_css = os.path.join(work_dir, "style.css")
    # NOTE: the output EPUB must outlive this function so FileResponse
    # can stream it; like the original code, it is left in the CWD.
    output_epub = f"{base_name}.epub"
    try:
        # 1. Persist the upload so MarkItDown can sniff its format.
        with open(temp_input, "wb") as buffer:
            buffer.write(await file.read())

        # 2. Extract Markdown from the uploaded document.
        markdown_content = md.convert(temp_input).text_content

        # 3. Temporary stylesheet for pandoc's --css option.
        with open(temp_css, "w") as f:
            f.write(EPUB_CSS)

        # 4. Pipe the Markdown string straight into pandoc's stdin.
        #    subprocess.run replaces the Popen/communicate boilerplate.
        proc = subprocess.run(
            ['pandoc', '--from=markdown', '--to=epub', '--css', temp_css,
             '--metadata', f'title={base_name}', '-o', output_epub],
            input=markdown_content,
            capture_output=True,
            text=True,
        )
        if proc.returncode != 0:
            raise Exception(f"Pandoc Error: {proc.stderr}")

        # 5. Stream the generated EPUB back to the client.
        return FileResponse(
            path=output_epub,
            filename=output_epub,
            media_type='application/epub+zip',
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Cleanup of all per-request scratch files in one shot.
        shutil.rmtree(work_dir, ignore_errors=True)
# Maps an ISO-639-1 language key to its Kokoro voice names ("male"/"female")
# and the single-letter pipeline "code" used to index PIPELINES below.
VOICE_MAP = {
    "en": {"male": "bm_lewis", "female": "bf_emma", "code": "b"},
    "es": {"male": "em_alex", "female": "ef_dora", "code": "e"},
    "fr": {"male": "fr_male", "female": "fr_female", "code": "f"},
    "pt": {"male": "pm_santa", "female": "pf_dora", "code": "p"},  # Portuguese
    "it": {"male": "im_nicola", "female": "if_sara", "code": "i"},
}
print("Loading TTS Pipelines... please wait.")
# Kokoro pipelines built once at import time (model load is expensive),
# keyed by the "code" values in VOICE_MAP.
PIPELINES = {
    "b": KPipeline(lang_code='b'),  # British English
    "e": KPipeline(lang_code='e'),  # Spanish
    "f": KPipeline(lang_code='f'),  # French
    "p": KPipeline(lang_code='p'),  # Portuguese
    "i": KPipeline(lang_code='i'),  # Italian
}
print("All pipelines loaded and ready!")
class TTSRequest(BaseModel):
    """Request body for text-to-speech generation."""
    # Text to synthesize.
    text: str
    # Language key into VOICE_MAP; unknown values fall back to "en".
    language: str = "en"
    # "male" or "female"; unknown values fall back to the male voice.
    gender: str = "male"
async def generate_audio_text(data: TTSRequest):
    """Synthesize speech for ``data.text`` and return it as a WAV file.

    Language and gender select a Kokoro voice via VOICE_MAP; unknown
    values fall back to British English / the male voice.

    Raises:
        HTTPException(400): if the pipeline produced no audio.
        HTTPException(500): on any other synthesis failure.
    """
    output_path = os.path.join(tempfile.gettempdir(), "speech_output.wav")
    try:
        # 1. Look up the language configuration.
        #    Defaults to English (British) if the language isn't found.
        lang_config = VOICE_MAP.get(data.language.lower(), VOICE_MAP["en"])
        phoneme_code = lang_config["code"]

        # 2. Select the preloaded global pipeline.
        active_pipeline = PIPELINES.get(phoneme_code, PIPELINES["b"])

        # 3. Select the voice (male is the base default).
        voice_name = lang_config.get(data.gender.lower(), lang_config["male"])

        # 4. Generate audio chunks; the pipeline yields
        #    (graphemes, phonemes, audio) triples.
        generator = active_pipeline(
            data.text,
            voice=voice_name,
            speed=1.1
        )
        audio_chunks = [audio for _, _, audio in generator if audio is not None]
        if not audio_chunks:
            raise HTTPException(status_code=400, detail="TTS generation failed")

        # 5. Concatenate and save as a single 24 kHz WAV.
        final_audio = np.concatenate(audio_chunks)
        sf.write(output_path, final_audio, 24000)
        return FileResponse(
            path=output_path,
            media_type="audio/wav",
            # Basename only: the original leaked the full temp-directory
            # path as the suggested download filename.
            filename=os.path.basename(output_path),
        )
    except HTTPException:
        # Preserve deliberate HTTP errors (e.g. the 400 above) instead of
        # letting the generic handler re-wrap them as 500s.
        raise
    except Exception as e:
        print(f"Detailed Error: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/health")
async def health():
    """Liveness probe endpoint."""
    return {
        "status": "ok"
    }