pdf_analysis / app.py
randusertry's picture
Update app.py
629b912 verified
from fastapi.responses import FileResponse
import subprocess
import pdfplumber
import io
import os
from markitdown import MarkItDown
import shutil
from pathlib import Path
from fastapi import FastAPI, UploadFile, File, HTTPException
import soundfile as sf
import numpy as np
import os
import torch
from kokoro import KPipeline
from pydantic import BaseModel
import tempfile
# Single MarkItDown converter shared by the /convert and /export-epub endpoints.
md = MarkItDown()
# FastAPI application object; every route decorator in this file registers on it.
app = FastAPI()
# Stylesheet written to disk by /export-epub and handed to pandoc via --css.
# Dark theme: light text (#e0e0e0) on a #1a1a1a background, with orange accents
# for headings, blockquote borders and table headers.
EPUB_CSS = """
body {
font-family: serif;
line-height: 1.5;
margin: 5%;
color: #e0e0e0;
background-color: #1a1a1a;
}
h1 {
text-align: center;
color: #f4a261;
text-transform: uppercase;
margin-bottom: 0.2em;
}
h3 {
border-bottom: 1px solid #333;
padding-bottom: 5px;
margin-top: 30px;
color: #e76f51;
}
blockquote {
font-style: italic;
border-left: 3px solid #e76f51;
padding-left: 15px;
color: #b0b0b0;
margin: 1.5em 10px;
}
li {
margin-bottom: 8px;
}
table {
width: 100%;
border-collapse: separate;
border-spacing: 0;
margin: 20px 0;
border: 1px solid #333;
border-radius: 8px;
overflow: hidden;
}
th {
background-color: #2d2d2d;
color: #f4a261;
font-weight: bold;
text-align: left;
padding: 12px;
border-bottom: 2px solid #3d3d3d;
}
td {
padding: 10px 12px;
border-bottom: 1px solid #2d2d2d;
vertical-align: top;
font-size: 0.95em;
}
tr:last-child td {
border-bottom: none;
}
tr:nth-child(even) {
background-color: #222222;
}
"""
@app.post("/extract")
async def extract(file: UploadFile = File(...)):
    """Return the text and tables of an uploaded PDF as one ordered stream.

    Pages are visited in document order. Each page contributes a ``"text"``
    entry first (omitted when the page yields no text), followed by one
    ``"table"`` entry per table found on that page.

    Returns:
        ``{"stream": [...]}`` where every item has a ``"type"`` of ``"text"``
        or ``"table"`` and the extracted value under ``"content"``.
    """
    raw = await file.read()
    items = []
    with pdfplumber.open(io.BytesIO(raw)) as document:
        for pg in document.pages:
            page_text = pg.extract_text()
            if page_text:
                items.append({"type": "text", "content": page_text})
            # Tables always follow the text of the page they belong to.
            items.extend(
                {"type": "table", "content": tbl} for tbl in pg.extract_tables()
            )
    return {"stream": items}
@app.post("/convert")
async def convert_to_markdown(file: UploadFile = File(...)):
    """Convert an uploaded document to Markdown with MarkItDown.

    The upload is spooled to a uniquely named temporary file so that the
    converter can read it from disk; the file is removed again before the
    request finishes, whether or not conversion succeeded.

    Returns:
        ``{"filename": <client-supplied name>, "markdown": <converted text>}``.

    Raises:
        HTTPException: status 500 carrying the underlying error message when
            saving or converting the upload fails.
    """
    # Only the extension of the client-supplied name is reused (the converter
    # may rely on it to pick a parser). The name itself is untrusted: it can
    # contain path separators or ".." and may be missing entirely, so it must
    # never be used to build the on-disk path.
    suffix = Path(file.filename or "").suffix
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as buffer:
            temp_path = buffer.name
            buffer.write(await file.read())
        # The file is closed before conversion so it can be reopened on any OS.
        result = md.convert(temp_path)
        return {
            "filename": file.filename,
            "markdown": result.text_content,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if temp_path is not None and os.path.exists(temp_path):
            os.remove(temp_path)
@app.post("/export-epub")
async def export_epub(file: UploadFile = File(...)):
    """Convert an uploaded document to a styled EPUB and return it.

    Pipeline: save the upload, turn it into Markdown with MarkItDown, then
    feed that Markdown to pandoc on stdin together with ``EPUB_CSS``.

    All intermediate files and the generated EPUB live in a per-request
    temporary directory, so concurrent requests cannot overwrite each other
    and nothing is written next to the application. The directory is deleted
    once the response has been sent, or immediately on failure.

    Returns:
        A ``FileResponse`` streaming ``<upload stem>.epub``.

    Raises:
        HTTPException: status 500 carrying the underlying error message
            (including pandoc's stderr when pandoc exits non-zero).
    """
    # Function-scope import: FastAPI is already a dependency of this module
    # and only the cleanup-after-response logic needs this name.
    from fastapi import BackgroundTasks

    # The client-supplied filename is untrusted: strip any directory part and
    # use it only for the book title, the download name and the extension.
    upload_name = Path(file.filename or "").name
    base_name = Path(upload_name).stem or "document"

    work_dir = Path(tempfile.mkdtemp(prefix="epub_export_"))
    try:
        temp_input = work_dir / f"input{Path(upload_name).suffix}"
        temp_css = work_dir / "style.css"
        output_epub = work_dir / "output.epub"

        # 1. Save the upload
        temp_input.write_bytes(await file.read())

        # 2. Use MarkItDown to get the Markdown content
        markdown_content = md.convert(str(temp_input)).text_content

        # 3. Create a temporary CSS file for Pandoc
        temp_css.write_text(EPUB_CSS, encoding="utf-8")

        # 4. Pipe the Markdown into pandoc's stdin. pandoc expects UTF-8, so
        #    the pipe encoding is pinned instead of following the locale.
        completed = subprocess.run(
            [
                "pandoc", "--from=markdown", "--to=epub",
                "--css", str(temp_css),
                "--metadata", f"title={base_name}",
                "-o", str(output_epub),
            ],
            input=markdown_content,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
        if completed.returncode != 0:
            raise RuntimeError(f"Pandoc Error: {completed.stderr}")

        # 5. Return the generated EPUB; the working directory is removed only
        #    after the file has been streamed to the client.
        cleanup = BackgroundTasks()
        cleanup.add_task(shutil.rmtree, work_dir, ignore_errors=True)
        return FileResponse(
            path=str(output_epub),
            filename=f"{base_name}.epub",
            media_type="application/epub+zip",
            background=cleanup,
        )
    except Exception as e:
        # No response will stream from work_dir, so clean up right away.
        shutil.rmtree(work_dir, ignore_errors=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
# Per-language TTS configuration, keyed by the request's language code.
# "male" / "female" are Kokoro voice names; "code" is the Kokoro pipeline
# language code used to pick the matching entry in PIPELINES.
VOICE_MAP = {
    "en": {"male": "bm_lewis", "female": "bf_emma", "code": "b"},
    "es": {"male": "em_alex", "female": "ef_dora", "code": "e"},
    # Kokoro publishes a single French voice, ff_siwis; the previous
    # "fr_male" / "fr_female" names do not exist and fail at voice load time.
    # Both genders therefore resolve to the one available voice.
    "fr": {"male": "ff_siwis", "female": "ff_siwis", "code": "f"},
    "pt": {"male": "pm_santa", "female": "pf_dora", "code": "p"},  # Portuguese
    "it": {"male": "im_nicola", "female": "if_sara", "code": "i"},
}
print("Loading TTS Pipelines... please wait.")
# One Kokoro pipeline per language code, all constructed eagerly at import
# time and looked up by the "code" value of a VOICE_MAP entry.
PIPELINES = {
    lang_code: KPipeline(lang_code=lang_code)
    for lang_code in (
        "b",  # British English
        "e",  # Spanish
        "f",  # French
        "p",  # Portuguese
        "i",  # Italian
    )
}
print("All pipelines loaded and ready!")
class TTSRequest(BaseModel):
    """JSON body accepted by the text-to-speech endpoint."""

    # Text to synthesize.
    text: str
    # Language key looked up in VOICE_MAP (case-insensitively by the endpoint).
    language: str = "en"
    # Voice gender key looked up inside the selected VOICE_MAP entry.
    gender: str = "male"
@app.post("/generate-audio-from-text")
async def generate_audio_text(data: TTSRequest):
    """Synthesize speech for the given text and return it as a WAV file.

    The language selects a VOICE_MAP entry (unknown languages fall back to
    English) and through it the pipeline; the gender selects the voice within
    that entry (anything other than "male"/"female" falls back to the male
    voice). Audio is written at 24 kHz to a per-request temporary file that
    is deleted once the response has been sent.

    Returns:
        A ``FileResponse`` with media type ``audio/wav``.

    Raises:
        HTTPException: status 400 when the pipeline produced no audio;
            status 500 carrying the underlying error message otherwise.
    """
    # Function-scope import: FastAPI is already a dependency of this module
    # and only the cleanup-after-response logic needs this name.
    from fastapi import BackgroundTasks

    try:
        # 1. Look up the language configuration
        #    Defaults to English (British) if the requested language isn't found
        lang_config = VOICE_MAP.get(data.language.lower(), VOICE_MAP["en"])

        # 2. Select the global pipeline
        active_pipeline = PIPELINES.get(lang_config["code"], PIPELINES["b"])

        # 3. Select the voice (male is the base default). The gender is
        #    checked explicitly because the config dict also holds a "code"
        #    key, which must never be returned as a voice name.
        gender = data.gender.lower()
        if gender not in ("male", "female"):
            gender = "male"
        voice_name = lang_config[gender]

        # 4. Generate audio chunks
        generator = active_pipeline(
            data.text,
            voice=voice_name,
            speed=1.1,
        )
        audio_chunks = [audio for _, _, audio in generator if audio is not None]
        if not audio_chunks:
            raise HTTPException(status_code=400, detail="TTS generation failed")

        # 5. Concatenate and save to a file unique to this request, so that
        #    concurrent requests cannot overwrite each other's audio.
        final_audio = np.concatenate(audio_chunks)
        fd, output_path = tempfile.mkstemp(prefix="speech_", suffix=".wav")
        os.close(fd)
        try:
            sf.write(output_path, final_audio, 24000)
        except Exception:
            Path(output_path).unlink(missing_ok=True)
            raise

        # Delete the file only after it has been streamed to the client.
        cleanup = BackgroundTasks()
        cleanup.add_task(os.remove, output_path)
        return FileResponse(
            path=output_path,
            media_type="audio/wav",
            # Fixed download name: the server-side temp path is not exposed.
            filename="speech_output.wav",
            background=cleanup,
        )
    except HTTPException:
        # Deliberate HTTP errors (the 400 above) keep their status code
        # instead of being rewrapped as a 500 by the handler below.
        raise
    except Exception as e:
        print(f"Detailed Error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/health")
async def health():
    """Liveness probe; always reports ``{"status": "ok"}``."""
    return {"status": "ok"}