| | import mangadex as md |
| | from fastapi import FastAPI, Query, HTTPException |
| | from fastapi.responses import JSONResponse, FileResponse |
| | from fastapi.staticfiles import StaticFiles |
| | from reportlab.pdfgen import canvas |
| | import tempfile |
| | import os |
| | from PIL import Image as PILImage |
| | import httpx |
| | import pikepdf |
| | from pathlib import Path |
| |
|
app = FastAPI()

# StaticFiles checks that its directory exists when it is constructed, so the
# folder has to be created *before* it is mounted; otherwise a fresh checkout
# fails at import time.
Path("static").mkdir(exist_ok=True)
app.mount("/static", StaticFiles(directory="static"), name="static")

# Single MangaDex auth object shared by the endpoints below.
auth = md.auth.Auth()
| |
|
def _select_pages(images, page):
    """Return the images selected by a 1-based page spec.

    ``page`` is either a single number (``"3"``) or an inclusive range
    (``"2-5"``). Page numbers outside ``1..len(images)`` are dropped, so the
    result may be empty.

    Raises:
        ValueError: if ``page`` is not of the form ``N`` or ``N-M``.
    """
    if "-" in page:
        start, end = map(int, page.split("-"))
    else:
        start = end = int(page)

    # Clamp to the available pages before slicing so that a request such as
    # "1-999999999" never builds a giant list of page numbers.
    first = max(start, 1)
    last = min(end, len(images))
    if first > last:
        return []
    return images[first - 1:last]


async def _render_pdf(image_urls, output_path):
    """Download each image and write them, one per page, to ``output_path``.

    Every image is re-encoded as an RGB JPEG and placed on a page sized to the
    image. All scratch files are removed even when a download or conversion
    fails part-way through.

    Raises:
        httpx.HTTPStatusError: if an image download returns an error status.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf:
        pdf_path = temp_pdf.name

    try:
        c = canvas.Canvas(pdf_path)

        async with httpx.AsyncClient() as client:
            for image_url in image_urls:
                response = await client.get(image_url)
                response.raise_for_status()

                with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_image:
                    temp_image_path = temp_image.name
                    temp_image.write(response.content)
                converted_image_path = temp_image_path + "_converted.jpg"

                try:
                    # The context manager closes the source file handle;
                    # convert() returns an independent in-memory image.
                    with PILImage.open(temp_image_path) as raw_image:
                        img = raw_image.convert("RGB")
                    img.save(converted_image_path, "JPEG")
                    width, height = img.size

                    c.setPageSize((width, height))
                    c.drawImage(converted_image_path, 0, 0, width, height)
                    c.showPage()
                finally:
                    Path(temp_image_path).unlink(missing_ok=True)
                    Path(converted_image_path).unlink(missing_ok=True)

        c.save()

        # Re-save through pikepdf to produce the final file under static/.
        with pikepdf.open(pdf_path) as pdf:
            pdf.save(output_path)
    finally:
        Path(pdf_path).unlink(missing_ok=True)


@app.get("/mangadex")
async def fetch_and_upload_pdf(
    chapter_id: str = Query(..., description="Comma-separated MangaDex chapter IDs to fetch images for"),
    as_pdf: bool = Query(True, description="Set to False to return image URLs without creating a PDF"),
    page: str = Query(None, description="Specific page(s) to fetch, e.g., '1' or '1-5'")
):
    """
    Fetch images from specified chapters and optionally combine them into a single PDF.

    Args:
        chapter_id: Comma-separated chapter IDs; their images are concatenated
            in the order given.
        as_pdf: When false, return the image URLs instead of building a PDF.
        page: Optional 1-based selector applied to the combined image list,
            either ``N`` or an inclusive range ``N-M``.

    Returns:
        A JSON response containing ``image_urls`` or ``file_url``. Unexpected
        failures yield ``{"error": ...}`` with status 500.

    Raises:
        HTTPException: 400 for a malformed chapter ID or page selector,
            404 when no images are found.
    """
    # NOTE(review): the MangaDex client and Pillow calls are made without
    # ``await``, so they occupy the event loop while they run.
    try:
        # Use a separate loop name so the ``chapter_id`` parameter is not
        # overwritten while iterating.
        chapter_ids = [cid.strip() for cid in chapter_id.split(",") if cid.strip()]
        if not chapter_ids:
            raise HTTPException(status_code=400, detail="No chapter ID provided.")

        # The ID ends up in a path under static/, so refuse anything that is
        # not plain letters, digits and hyphens (no separators or dots).
        # NOTE(review): assumes MangaDex IDs are UUID-like; confirm.
        for cid in chapter_ids:
            if not cid.replace("-", "").isalnum():
                raise HTTPException(status_code=400, detail="Invalid chapter ID format.")

        all_images = []
        chapter = md.series.Chapter(auth=auth)
        for cid in chapter_ids:
            chapter_data = chapter.get_chapter_by_id(chapter_id=cid)
            images = chapter_data.fetch_chapter_images()
            if not images:
                raise HTTPException(status_code=404, detail=f"No images found for chapter ID {cid}.")
            all_images.extend(images)

        if page:
            try:
                all_images = _select_pages(all_images, page)
            except ValueError:
                raise HTTPException(status_code=400, detail="Invalid page query format.")

        if not all_images:
            raise HTTPException(status_code=404, detail="No images found for the selected pages.")

        if not as_pdf:
            return JSONResponse(content={"image_urls": all_images})

        # The file is named after the last requested chapter, which keeps the
        # URLs this endpoint has always produced.
        output_name = f"{chapter_ids[-1]}_compressed.pdf"
        await _render_pdf(all_images, f"static/{output_name}")

        return JSONResponse(content={"file_url": f"/static/{output_name}"})

    except HTTPException:
        # Deliberate 4xx errors must reach FastAPI unchanged instead of being
        # rewritten as a 500 by the generic handler below.
        raise
    except Exception as e:
        return JSONResponse(content={"error": f"Failed to process the chapters: {str(e)}"}, status_code=500)
| |
|
@app.get("/name")
async def fetch_manga_details(
    title: str = Query(..., description="Manga title to search for")
):
    """
    Fetch manga details by title from MangaDex.

    Args:
        title: Title text passed to the MangaDex manga search.

    Returns:
        A JSON response with ``manga_list`` (one dict per match), a 404
        ``error`` payload when nothing matches, or a 500 ``error`` payload
        when the lookup fails.
    """
    try:
        manga = md.series.Manga(auth=auth)
        manga_list = manga.get_manga_list(title=title)

        if not manga_list:
            return JSONResponse(
                content={"error": f"No manga found with the title '{title}'"},
                status_code=404
            )

        manga_details = []
        for m in manga_list:
            # Looked up defensively because the attribute may be absent.
            # NOTE(review): confirm the attribute name against the mangadex
            # library; it may be exposed as ``updatedAt``.
            uploaded_at = getattr(m, "uploadedAt", None)
            manga_details.append({
                "id": m.manga_id,
                "title": m.title.get("en", "No English Title"),
                "alt_titles": m.altTitles,
                "description": m.description.get("en", "No English Description"),
                "status": m.status,
                "content_rating": m.contentRating,
                "last_volume": m.lastVolume,
                "last_chapter": m.lastChapter,
                "year": m.year,
                "original_language": m.originalLanguage,
                "created_at": str(m.createdAt) if m.createdAt else None,
                # Emit JSON null rather than the literal string "None" when
                # the value is missing, matching ``created_at``.
                "uploaded_at": str(uploaded_at) if uploaded_at else None,
            })

        return JSONResponse(content={"manga_list": manga_details})

    except Exception as e:
        return JSONResponse(
            content={"error": f"Failed to fetch manga details: {str(e)}"},
            status_code=500
        )
| |
|
@app.get("/")
def root():
    """Return the fixed greeting payload served at the site root."""
    greeting = {"message": "يعني ايه"}
    return greeting
| |
|
if __name__ == "__main__":
    # Launch the server only when this file is executed directly.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000, "workers": 4}
    uvicorn.run("main:app", **server_options)