manga / main.py
Akane710's picture
Update main.py
3ec3597 verified
import mangadex as md
from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from reportlab.pdfgen import canvas
import tempfile
import os
from PIL import Image as PILImage
import httpx
import pikepdf
from pathlib import Path
app = FastAPI()
# Serve static files from the "static" directory
app.mount("/static", StaticFiles(directory="static"), name="static")
# Ensure the "static" directory exists
Path("static").mkdir(exist_ok=True)
auth = md.auth.Auth()
@app.get("/mangadex")
async def fetch_and_upload_pdf(
chapter_id: str = Query(..., description="Comma-separated MangaDex chapter IDs to fetch images for"),
as_pdf: bool = Query(True, description="Set to False to return image URLs without creating a PDF"),
page: str = Query(None, description="Specific page(s) to fetch, e.g., '1' or '1-5'")
):
"""
Fetch images from specified chapters and optionally combine them into a single PDF.
"""
try:
chapter_ids = chapter_id.split(",")
all_images = []
# Fetch images for each chapter
chapter = md.series.Chapter(auth=auth)
for chapter_id in chapter_ids:
chapter_data = chapter.get_chapter_by_id(chapter_id=chapter_id)
images = chapter_data.fetch_chapter_images()
if not images:
raise HTTPException(status_code=404, detail=f"No images found for chapter ID {chapter_id}.")
all_images.extend(images)
# Filter images by page if applicable
if page:
pages = []
try:
if '-' in page:
start, end = map(int, page.split('-'))
pages = list(range(start, end + 1))
else:
pages = [int(page)]
except ValueError:
raise HTTPException(status_code=400, detail="Invalid page query format.")
all_images = [all_images[i - 1] for i in pages if 1 <= i <= len(all_images)]
if not all_images:
raise HTTPException(status_code=404, detail="No images found for the selected pages.")
# Return image URLs if not converting to PDF
if not as_pdf:
return JSONResponse(content={"image_urls": all_images})
# Generate PDF from images
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf:
pdf_path = temp_pdf.name
c = canvas.Canvas(pdf_path)
async with httpx.AsyncClient() as client:
for image_url in all_images:
response = await client.get(image_url)
response.raise_for_status()
# Save image to temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_image:
temp_image.write(response.content)
temp_image_path = temp_image.name
# Process image
img = PILImage.open(temp_image_path)
img = img.convert("RGB")
# Save a converted image (JPEG format)
converted_image_path = temp_image_path + "_converted.jpg"
img.save(converted_image_path, "JPEG")
# Get image dimensions
width, height = img.size
# Add image to PDF
c.setPageSize((width, height))
c.drawImage(converted_image_path, 0, 0, width, height)
c.showPage()
# Clean up temporary files
os.remove(temp_image_path)
os.remove(converted_image_path)
c.save()
# Compress the PDF
compressed_pdf_path = f"static/{chapter_id}_compressed.pdf"
with pikepdf.open(pdf_path) as pdf:
pdf.save(compressed_pdf_path)
# Clean up temporary PDF file
os.remove(pdf_path)
# Return the URL to the compressed PDF
return JSONResponse(content={"file_url": f"/static/{chapter_id}_compressed.pdf"})
except Exception as e:
return JSONResponse(content={"error": f"Failed to process the chapters: {str(e)}"}, status_code=500)
@app.get("/name")
async def fetch_manga_details(
title: str = Query(..., description="Manga title to search for")
):
"""
Fetch manga details by title from MangaDex.
"""
try:
manga = md.series.Manga(auth=auth)
manga_list = manga.get_manga_list(title=title)
if not manga_list:
return JSONResponse(
content={"error": f"No manga found with the title '{title}'"},
status_code=404
)
manga_details = []
for m in manga_list:
manga_details.append({
"id": m.manga_id,
"title": m.title.get("en", "No English Title"),
"alt_titles": m.altTitles,
"description": m.description.get("en", "No English Description"),
"status": m.status,
"content_rating": m.contentRating,
"last_volume": m.lastVolume,
"last_chapter": m.lastChapter,
"year": m.year,
"original_language": m.originalLanguage,
"created_at": str(m.createdAt) if m.createdAt else None,
"uploaded_at": str(getattr(m, "uploadedAt", None))
})
return JSONResponse(content={"manga_list": manga_details})
except Exception as e:
return JSONResponse(
content={"error": f"Failed to fetch manga details: {str(e)}"},
status_code=500
)
@app.get("/")
def root():
"""
Basic root endpoint.
"""
return {"message": "يعني ايه"}
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, workers=4)