Spaces:
Paused
Paused
| import os | |
| import shutil | |
| import fitz # PyMuPDF | |
| import logging | |
| import asyncio | |
| from uuid import uuid4 | |
| from fastapi import FastAPI, File, UploadFile, Form | |
| from telegram import Bot | |
| from telegram.constants import ParseMode | |
| from starlette.middleware.cors import CORSMiddleware | |
# Logging setup: grab this module's logger, then configure the root logger
# so that INFO-level records and above are emitted.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Telegram bot setup
# Credentials are read from the environment first so a deployment can inject
# and rotate them without touching the source.
# SECURITY: the literal fallbacks are kept only so existing deployments keep
# working. A bot token committed to source should be treated as compromised:
# revoke it, set TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID in the environment,
# and then delete these defaults.
TELEGRAM_BOT_TOKEN = os.environ.get(
    "TELEGRAM_BOT_TOKEN",
    "8046554458:AAGxVuq_xrTXihLmxE_vcopsdg11zPvv1_I",
)
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "5173085859")
bot = Bot(token=TELEGRAM_BOT_TOKEN)
# Directories setup: working folders are created at import time if missing.
UPLOAD_DIR = "uploads"  # uploaded PDFs are written here
IMAGES_DIR = "images"  # rendered pages and uploaded images are written here
for _directory in (UPLOAD_DIR, IMAGES_DIR):
    os.makedirs(_directory, exist_ok=True)
# FastAPI setup
app = FastAPI()

# CORS policy: every origin, method and header is accepted, with credentials
# allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# very permissive policy - confirm this is intended for this deployment.
_CORS_SETTINGS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)
# NOTE(review): no route decorator is visible on this function in this view;
# confirm that it is actually registered on `app`.
def check_status() -> dict:
    """Return a fixed payload reporting that the API is up."""
    payload = {"status": "API is working"}
    return payload
# ----- Original PDF Upload Endpoint -----
# NOTE(review): no route decorator is visible on this function in this view;
# confirm that it is actually registered on `app`.
async def upload_pdf(file: UploadFile = File(...)):
    """Save an uploaded PDF, send its pages to Telegram as images, clean up.

    Args:
        file: The uploaded PDF.

    Returns:
        ``{"status": "Images sent to Telegram", "images": [...]}`` on
        success, or ``{"status": "error", "message": "<text>"}`` when any
        step raises. Errors are reported in the payload, not re-raised.
    """
    file_path = os.path.join(UPLOAD_DIR, f"{uuid4()}.pdf")
    loop = asyncio.get_running_loop()

    def _write_upload():
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

    try:
        # The copy is blocking disk I/O, so it runs in a worker thread
        # instead of stalling the event loop for other requests.
        await loop.run_in_executor(None, _write_upload)
        image_links = await convert_pdf_to_images(file_path)
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Error processing PDF: %s", e)
        return {"status": "error", "message": str(e)}
    finally:
        # Single cleanup point: runs on success and on failure alike.
        if os.path.exists(file_path):
            os.remove(file_path)
    return {"status": "Images sent to Telegram", "images": image_links}
async def convert_pdf_to_images(pdf_path):
    """Render each page of a PDF to a PNG and send it to Telegram.

    Pages are processed one at a time in order: render, send with the
    caption ``Page <n>``, then delete the temporary PNG.

    Args:
        pdf_path: Path of the PDF file to convert.

    Returns:
        A list with one entry per page, each being whatever
        ``send_image_to_telegram`` returned for that page.

    Raises:
        Any exception raised while opening or rendering the PDF or while
        sending an image; the document and the current temporary PNG are
        still cleaned up.
    """
    loop = asyncio.get_running_loop()
    doc = await loop.run_in_executor(None, fitz.open, pdf_path)

    def _render_page(page_index, image_path):
        # Load, rasterise and save in one worker call so a page's PyMuPDF
        # objects are not handed between different pool threads.
        doc.load_page(page_index).get_pixmap().save(image_path)

    image_links = []
    try:
        page_count = len(doc)
        for page_number in range(1, page_count + 1):
            image_path = os.path.join(IMAGES_DIR, f"{uuid4()}.png")
            try:
                await loop.run_in_executor(
                    None, _render_page, page_number - 1, image_path
                )
                # Send image to Telegram with a page number caption.
                image_link = await send_image_to_telegram(
                    image_path, f"Page {page_number}"
                )
                image_links.append(image_link)
            finally:
                # The PNG is only a staging file for the upload; remove it
                # so IMAGES_DIR does not grow with every request.
                if os.path.exists(image_path):
                    os.remove(image_path)
    finally:
        # Close the document so its resources are released before the
        # caller deletes the source PDF.
        doc.close()
    return image_links
# -------------------------------------------------------
# ----- New Endpoint: Image Upload with Caption -----
# NOTE(review): no route decorator is visible on this function in this view;
# confirm that it is actually registered on `app`.
async def upload_image(
    file: UploadFile = File(...),
    caption: str = Form("Uploaded image"),
):
    """Upload an image along with a caption and send it to Telegram.

    Args:
        file: The uploaded image.
        caption: Caption sent with the image; defaults to "Uploaded image".

    Returns:
        ``{"status": "Image sent to Telegram", "image_link": <link>}`` on
        success, or ``{"status": "error", "message": "<text>"}`` when any
        step raises. Errors are reported in the payload, not re-raised.
    """
    # The client-supplied filename may be missing and must not be trusted:
    # keep only an alphanumeric extension taken from its base name.
    _, dotted_ext = os.path.splitext(os.path.basename(file.filename or ""))
    extension = "".join(ch for ch in dotted_ext if ch.isalnum())
    file_id = str(uuid4())
    image_name = f"{file_id}.{extension}" if extension else file_id
    image_path = os.path.join(IMAGES_DIR, image_name)
    loop = asyncio.get_running_loop()

    def _write_upload():
        with open(image_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

    try:
        # The copy is blocking disk I/O, so it runs in a worker thread
        # instead of stalling the event loop for other requests.
        await loop.run_in_executor(None, _write_upload)
        image_link = await send_image_to_telegram(image_path, caption)
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Error processing image: %s", e)
        return {"status": "error", "message": str(e)}
    finally:
        # The local copy is only a staging file for the Telegram upload;
        # remove it on success as well as on failure.
        if os.path.exists(image_path):
            os.remove(image_path)
    return {"status": "Image sent to Telegram", "image_link": image_link}
# -------------------------------------------------------
async def send_image_to_telegram(image_path, caption):
    """Post an image file to the configured Telegram chat and return its link.

    The photo is sent with ``caption`` using HTML parse mode. If the
    resulting message carries photo sizes, the file for the last one is
    looked up and its ``file_path`` is returned as a string; otherwise the
    literal text "No photo found in message." is returned.
    """
    with open(image_path, "rb") as photo_stream:
        sent = await bot.send_photo(
            chat_id=TELEGRAM_CHAT_ID,
            photo=photo_stream,
            caption=caption,
            parse_mode=ParseMode.HTML,
        )

    sizes = sent.photo
    if not sizes:
        return "No photo found in message."

    # The last entry of the size list is taken as the largest rendition.
    remote_file = await bot.get_file(sizes[-1].file_id)
    # NOTE(review): in recent python-telegram-bot versions file_path appears
    # to be a full download URL that embeds the bot token - confirm before
    # exposing this value to untrusted API clients.
    return f"{remote_file.file_path}"
if __name__ == "__main__":
    import uvicorn

    # reload=True is only honoured when uvicorn is given an import string
    # ("module:app"); with an app object it logs a warning and exits, so the
    # server never started. Serve the in-process app object without reload.
    uvicorn.run(app, host="0.0.0.0", port=7860, log_level="info")