| from fastapi import FastAPI, UploadFile, File, Form, HTTPException |
| from fastapi.responses import JSONResponse, FileResponse, Response |
| from fastapi.middleware.cors import CORSMiddleware |
| import uvicorn |
| import os |
| import io |
| import requests |
| from PIL import Image |
| import json |
| import textwrap |
| import time |
| from datetime import datetime |
| import tempfile |
| import subprocess |
| import shutil |
|
|
# The ASGI application object that every route below is registered on.
app = FastAPI(title="CONVERTLY API", version="2.0.0")

# CORS: every origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive combination -- confirm it is intended for this deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Bearer token for the Hugging Face calls; empty string when the variable is unset.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
|
|
| |
|
|
def _split_clean_paragraphs(text_content):
    """Split text on blank lines and collapse whitespace inside each paragraph."""
    paragraphs = []
    for raw in text_content.split('\n\n'):
        collapsed = ' '.join(raw.split())
        if collapsed:
            paragraphs.append(collapsed)
    return paragraphs


def _looks_like_heading(para):
    """Heuristic: a short paragraph that does not end like a sentence."""
    short_line = len(para) < 60 and not para.endswith(('.', '?', '!'))
    few_words = len(para.split()) < 8 and not para.endswith('.')
    return short_line or few_words


def create_clean_pdf(text_content):
    """Render plain text as a simple A4 PDF and return the PDF bytes.

    The text is split into paragraphs on blank lines. Paragraphs that look
    like headings (see ``_looks_like_heading``) and are not the last
    paragraph use the heading style; body paragraphs longer than 800
    characters are broken into one paragraph per sentence.

    Args:
        text_content: The text to render. ``None`` or an empty string yields
            a document with no paragraphs.

    Returns:
        bytes: The generated PDF. If PDF generation fails for any reason
        (including ReportLab being unavailable) the error is printed and the
        UTF-8 encoded input text is returned instead (best effort).
    """
    # Function-scope import, matching the lazy-import style of this file.
    from xml.sax.saxutils import escape

    try:
        from reportlab.lib.pagesizes import A4
        from reportlab.lib.units import inch, cm
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.enums import TA_LEFT, TA_JUSTIFY
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
        from reportlab.lib import colors

        buffer = io.BytesIO()

        doc = SimpleDocTemplate(
            buffer,
            pagesize=A4,
            rightMargin=2.54 * cm,
            leftMargin=2.54 * cm,
            topMargin=2.54 * cm,
            bottomMargin=2.54 * cm,
        )

        styles = getSampleStyleSheet()

        body_style = ParagraphStyle(
            'CustomBody',
            parent=styles['Normal'],
            fontSize=11,
            fontName='Helvetica',
            alignment=TA_JUSTIFY,
            spaceAfter=8,
            leading=16,
            textColor=colors.HexColor('#333333')
        )

        heading_style = ParagraphStyle(
            'CustomHeading',
            parent=styles['Heading2'],
            fontSize=14,
            fontName='Helvetica-Bold',
            alignment=TA_LEFT,
            spaceAfter=10,
            spaceBefore=15,
            textColor=colors.HexColor('#2d2d44')
        )

        story = []

        def add_paragraph(text, style):
            # Paragraph interprets its text as XML-like markup, so literal
            # '&', '<' and '>' from user content must be escaped first.
            story.append(Paragraph(escape(text), style))

        paragraphs = _split_clean_paragraphs(text_content) if text_content else []
        last_index = len(paragraphs) - 1

        for i, para in enumerate(paragraphs):
            # The final paragraph is never promoted to a heading.
            if _looks_like_heading(para) and i < last_index:
                add_paragraph(para, heading_style)
            elif len(para) > 800:
                sentences = para.split('. ')
                for j, sentence in enumerate(sentences):
                    if not sentence.strip():
                        continue
                    # split('. ') removed the full stop from every piece
                    # except the last one; put it back.
                    suffix = '.' if j < len(sentences) - 1 else ''
                    add_paragraph(sentence + suffix, body_style)
            else:
                add_paragraph(para, body_style)

            if i < last_index:
                story.append(Spacer(1, 0.05 * inch))

        doc.build(story)
        return buffer.getvalue()

    except Exception as e:
        print(f"PDF error: {e}")
        # Best-effort fallback; must not raise on None or bytes input.
        if isinstance(text_content, bytes):
            return text_content
        return str(text_content or "").encode('utf-8')
|
|
| |
|
|
def convert_docx_to_pdf(docx_bytes):
    """Extract the text of a DOCX file and render it through create_clean_pdf.

    Paragraph text is collected first, then every table row is appended as a
    single line with its non-empty cells joined by ' | '. On any failure the
    error is printed and the original bytes are returned unchanged.
    """
    try:
        from docx import Document

        document = Document(io.BytesIO(docx_bytes))

        # Non-blank body paragraphs, in document order.
        chunks = [p.text.strip() for p in document.paragraphs if p.text.strip()]

        # Tables are flattened to one text line per row.
        for tbl in document.tables:
            for tbl_row in tbl.rows:
                cells = [c.text.strip() for c in tbl_row.cells if c.text.strip()]
                if cells:
                    chunks.append(' | '.join(cells))

        if not chunks:
            return create_clean_pdf("No text content found.")
        return create_clean_pdf('\n\n'.join(chunks))
    except Exception as e:
        print(f"DOCX to PDF error: {e}")
        return docx_bytes
|
|
def convert_pdf_to_docx(pdf_bytes):
    """Build a DOCX whose paragraphs are the text extracted from each PDF page.

    Page text is split on blank lines and whitespace inside each piece is
    collapsed. A page with no extractable text contributes the placeholder
    "[No text extracted]". On any failure the error is printed and the
    original bytes are returned unchanged.
    """
    try:
        from PyPDF2 import PdfReader
        from docx import Document

        reader = PdfReader(io.BytesIO(pdf_bytes))
        document = Document()

        for pdf_page in reader.pages:
            page_text = pdf_page.extract_text()

            # Guard clause: nothing usable on this page.
            if not (page_text and page_text.strip()):
                document.add_paragraph("[No text extracted]")
                continue

            for piece in page_text.split('\n\n'):
                if piece.strip():
                    document.add_paragraph(' '.join(piece.split()))

        out = io.BytesIO()
        document.save(out)
        return out.getvalue()
    except Exception as e:
        print(f"PDF to DOCX error: {e}")
        return pdf_bytes
|
|
def convert_ppt_to_pdf(pptx_bytes):
    """Convert a PPTX presentation to a text-only PDF, one page per slide.

    For every slide the text of each shape is collected. The first text
    becomes the page title ("Slide N: ..."), the remaining texts become body
    paragraphs. Slides without text get a "Slide N - No text" title.

    Args:
        pptx_bytes: Raw bytes of the presentation file.

    Returns:
        bytes: The generated PDF. If anything fails, the error is printed and
        a PDF produced by ``create_clean_pdf`` describing the error is
        returned instead.
    """
    # Function-scope import, matching the lazy-import style of this file.
    from xml.sax.saxutils import escape

    try:
        from pptx import Presentation
        from reportlab.lib.pagesizes import A4
        from reportlab.lib.units import inch, cm
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.enums import TA_LEFT, TA_CENTER
        from reportlab.lib import colors

        prs = Presentation(io.BytesIO(pptx_bytes))

        buffer = io.BytesIO()
        doc = SimpleDocTemplate(
            buffer,
            pagesize=A4,
            rightMargin=2.54 * cm,
            leftMargin=2.54 * cm,
            topMargin=2.54 * cm,
            bottomMargin=2.54 * cm,
        )

        styles = getSampleStyleSheet()

        title_style = ParagraphStyle(
            'SlideTitle',
            parent=styles['Heading2'],
            fontSize=16,
            fontName='Helvetica-Bold',
            alignment=TA_CENTER,
            spaceAfter=15,
            textColor=colors.HexColor('#1a1a2e')
        )

        body_style = ParagraphStyle(
            'SlideBody',
            parent=styles['Normal'],
            fontSize=11,
            fontName='Helvetica',
            alignment=TA_LEFT,
            spaceAfter=6,
            leading=16,
            textColor=colors.HexColor('#333333')
        )

        def body_markup(text):
            # Escape markup characters (Paragraph parses XML-like tags), then
            # keep the shape's own line breaks instead of letting them collapse.
            escaped = escape(text)
            return escaped.replace('\n', '<br/>').replace('\v', '<br/>')

        story = []
        slide_count = len(prs.slides)

        for slide_num, slide in enumerate(prs.slides, 1):
            slide_texts = []
            for shape in slide.shapes:
                # Not every shape type exposes a ``text`` attribute.
                text = (getattr(shape, "text", "") or "").strip()
                if text:
                    slide_texts.append(text)

            if slide_texts:
                title = escape(slide_texts[0])
                story.append(Paragraph(f"Slide {slide_num}: {title}", title_style))
                story.append(Spacer(1, 0.1 * inch))

                for text in slide_texts[1:]:
                    story.append(Paragraph(body_markup(text), body_style))
                    story.append(Spacer(1, 0.05 * inch))
            else:
                story.append(Paragraph(f"Slide {slide_num} - No text", title_style))

            # One slide per page; no trailing blank page after the last slide.
            if slide_num < slide_count:
                story.append(PageBreak())

        doc.build(story)
        return buffer.getvalue()

    except Exception as e:
        print(f"PPT to PDF error: {e}")
        return create_clean_pdf(f"PowerPoint Presentation\n\nError extracting slides: {str(e)}")
|
|
def convert_pdf_to_ppt(pdf_bytes):
    """Create a PPTX with one slide per PDF page holding that page's text.

    Each slide is titled "Page N". Page text longer than 1000 characters is
    cut to 1000 characters plus "..."; pages without text get a placeholder
    message. On any failure the error is printed and the original bytes are
    returned unchanged.
    """
    try:
        from pptx import Presentation
        from PyPDF2 import PdfReader

        deck = Presentation()
        layout = deck.slide_layouts[1]

        reader = PdfReader(io.BytesIO(pdf_bytes))

        page_no = 0
        for pdf_page in reader.pages:
            page_no += 1
            new_slide = deck.slides.add_slide(layout)
            heading = new_slide.shapes.title
            body = new_slide.placeholders[1]

            heading.text = f"Page {page_no}"
            page_text = pdf_page.extract_text()

            if not (page_text and page_text.strip()):
                body.text = "No text extracted from this page"
            elif len(page_text) > 1000:
                body.text = page_text[:1000] + "..."
            else:
                body.text = page_text

        out = io.BytesIO()
        deck.save(out)
        return out.getvalue()

    except Exception as e:
        print(f"PDF to PPT error: {e}")
        return pdf_bytes
|
|
def convert_pptx_to_ppt(pptx_bytes):
    """Placeholder for PPTX -> PPT conversion.

    No conversion is implemented: the input bytes are handed back untouched.
    """
    unchanged = pptx_bytes
    return unchanged
|
|
def convert_ppt_to_pptx(ppt_bytes):
    """Placeholder for PPT -> PPTX conversion.

    No conversion is implemented: the input bytes are handed back untouched.
    """
    unchanged = ppt_bytes
    return unchanged
|
|
def convert_pdf_to_excel(pdf_bytes):
    """Dump the text lines of a PDF into a two-column XLSX sheet.

    The sheet "Extracted Data" gets a styled header row ("Page", "Content")
    followed by one row per non-empty text line, holding the 1-based page
    number and the line text (at most 500 characters). Column widths are
    fitted to the content, capped at 50.

    Args:
        pdf_bytes: Raw bytes of the PDF file.

    Returns:
        bytes: The XLSX workbook. On any failure the error is printed and the
        original bytes are returned unchanged.
    """
    import re

    # ASCII control characters are not allowed in worksheet cells; a single
    # one in the extracted text would otherwise abort the whole conversion.
    illegal_chars = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")

    try:
        import openpyxl
        from openpyxl.styles import Font, PatternFill
        from PyPDF2 import PdfReader

        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = "Extracted Data"

        header_font = Font(bold=True, color="FFFFFF")
        header_fill = PatternFill(start_color="4F46E5", end_color="4F46E5", fill_type="solid")

        pdf = PdfReader(io.BytesIO(pdf_bytes))

        for col, label in enumerate(("Page", "Content"), start=1):
            header_cell = ws.cell(row=1, column=col, value=label)
            header_cell.font = header_font
            header_cell.fill = header_fill

        row = 2
        for page_num, page in enumerate(pdf.pages, 1):
            text = page.extract_text()
            if not text:
                continue
            for line in text.split('\n'):
                value = illegal_chars.sub('', line).strip()[:500]
                if value:
                    ws.cell(row=row, column=1, value=page_num)
                    ws.cell(row=row, column=2, value=value)
                    row += 1

        for column in ws.columns:
            max_length = max(
                (len(str(cell.value)) for cell in column if cell.value is not None),
                default=0,
            )
            ws.column_dimensions[column[0].column_letter].width = min(max_length + 2, 50)

        buffer = io.BytesIO()
        wb.save(buffer)
        return buffer.getvalue()

    except Exception as e:
        print(f"PDF to Excel error: {e}")
        return pdf_bytes
|
|
def convert_excel_to_pdf(excel_bytes):
    """Render the active worksheet of an XLSX workbook as a table in a PDF.

    Every non-empty row of the active sheet becomes a table row on a
    landscape A4 page; the first row is styled as a header.

    Args:
        excel_bytes: Raw bytes of the workbook.

    Returns:
        bytes: The generated PDF. On any failure the error is printed and the
        original bytes are returned unchanged.
    """
    try:
        import openpyxl
        from reportlab.lib.pagesizes import A4, landscape
        from reportlab.lib import colors
        from reportlab.lib.units import cm
        from reportlab.platypus import SimpleDocTemplate, Table, TableStyle

        wb = openpyxl.load_workbook(io.BytesIO(excel_bytes))
        ws = wb.active

        buffer = io.BytesIO()
        doc = SimpleDocTemplate(
            buffer,
            pagesize=landscape(A4),
            rightMargin=1.5 * cm,
            leftMargin=1.5 * cm,
            topMargin=2 * cm,
            bottomMargin=2 * cm,
        )

        story = []

        data = []
        # The keyword is ``values_only``; the previous ``values=True`` raised
        # TypeError, so the conversion could never succeed.
        for row in ws.iter_rows(values_only=True):
            # Only None means "empty"; 0 and False are real values to print.
            row_data = ['' if cell is None else str(cell) for cell in row]
            if any(row_data):
                data.append(row_data)

        if data:
            table = Table(data)
            table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
                ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('FONTSIZE', (0, 0), (-1, 0), 10),
                ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
                ('BACKGROUND', (0, 1), (-1, -1), colors.beige),
                ('GRID', (0, 0), (-1, -1), 1, colors.black)
            ]))
            story.append(table)

        doc.build(story)
        return buffer.getvalue()

    except Exception as e:
        print(f"Excel to PDF error: {e}")
        return excel_bytes
|
|
def convert_image_to_other(img_bytes, target_format):
    """Re-encode an image in another format.

    Args:
        img_bytes: Raw bytes of the source image.
        target_format: Target format name, case-insensitive (e.g. "png",
            "jpg", "jpeg", "webp", "tif"). "JPG" and "TIF" are treated as
            "JPEG" and "TIFF".

    Returns:
        bytes: The re-encoded image. On any failure (unreadable image,
        unknown format, ...) the error is printed and the original bytes are
        returned unchanged.
    """
    format_aliases = {"JPG": "JPEG", "TIF": "TIFF"}
    # Formats that are saved without a ``quality`` argument.
    no_quality_formats = {"ICO", "GIF", "BMP", "TIFF"}
    # Modes that are written to JPEG as they are.
    jpeg_ready_modes = ("RGB", "L", "CMYK", "1", "YCbCr")

    try:
        img = Image.open(io.BytesIO(img_bytes))
        output = io.BytesIO()

        fmt = target_format.upper()
        fmt = format_aliases.get(fmt, fmt)

        if fmt == "JPEG" and img.mode not in jpeg_ready_modes:
            # JPEG has no alpha channel: flatten transparency onto white,
            # and bring any other mode to plain RGB.
            has_alpha = img.mode in ("RGBA", "LA", "PA") or "transparency" in img.info
            if has_alpha:
                rgba = img.convert("RGBA")
                flattened = Image.new('RGB', rgba.size, (255, 255, 255))
                flattened.paste(rgba, mask=rgba.split()[-1])
                img = flattened
            else:
                img = img.convert("RGB")

        if fmt in no_quality_formats:
            img.save(output, format=fmt)
        else:
            img.save(output, format=fmt, quality=95)

        return output.getvalue()

    except Exception as e:
        print(f"Image conversion error: {e}")
        return img_bytes
|
|
def call_hf_api(url, data, max_retries=3):
    """POST *data* to a Hugging Face endpoint, retrying transient failures.

    A 503 response or a network-level error is retried with exponential
    backoff (1s, 2s, 4s, ...) for up to ``max_retries`` attempts. Any other
    non-200 status gives up immediately.

    Args:
        url: Endpoint URL.
        data: Request body, passed to ``requests.post`` as ``data``.
        max_retries: Maximum number of attempts.

    Returns:
        bytes | None: The response body on HTTP 200, otherwise ``None``
        (also when ``HF_TOKEN`` is not set).
    """
    if not HF_TOKEN:
        return None

    headers = {"Authorization": f"Bearer {HF_TOKEN}"}

    for attempt in range(max_retries):
        is_last_attempt = attempt >= max_retries - 1

        try:
            response = requests.post(url, headers=headers, data=data, timeout=60)
        except requests.RequestException as e:
            print(f"HF API request error: {e}")
            if is_last_attempt:
                return None
            time.sleep(2 ** attempt)
            continue

        if response.status_code == 200:
            return response.content

        # Only 503 is retried, and there is no point in backing off after
        # the final attempt.
        if response.status_code != 503 or is_last_attempt:
            return None
        time.sleep(2 ** attempt)

    return None
|
|
| |
|
|
@app.get("/")
async def root():
    """Return the service name, version, status and HF token state."""
    token_state = "missing"
    if HF_TOKEN:
        token_state = "configured"

    info = {
        "name": "CONVERTLY API",
        "version": "2.0.0",
        "status": "operational",
    }
    info["hf_token"] = token_state
    return info
|
|
@app.get("/api/health")
async def health_check():
    """Return a health payload with the HF token state and token length."""
    token_present = bool(HF_TOKEN)
    return {
        "status": "healthy",
        "hf_token": "configured" if token_present else "missing",
        # HF_TOKEN is always a string, so an unset token has length 0.
        "hf_token_length": len(HF_TOKEN),
    }
|
|
def _attachment_response(payload, media_type, extension):
    """Wrap *payload* in a download response named ``converted.<extension>``."""
    return Response(
        content=payload,
        media_type=media_type,
        headers={
            "Content-Disposition": f"attachment; filename=converted.{extension}",
            "Content-Length": str(len(payload)),
        },
    )


@app.post("/api/convert")
async def convert_file(
    file: UploadFile = File(...),
    target_format: str = Form("pdf")
):
    """Convert an uploaded document to ``target_format``.

    The source format is taken from the upload's file extension. Supported
    (source, target) pairs are listed in ``format_map``. When no converter
    applies, the upload is returned unchanged as
    ``application/octet-stream`` (best-effort pass-through).

    Args:
        file: The uploaded file.
        target_format: Desired output extension; case, surrounding
            whitespace and a leading dot are ignored.

    Returns:
        A download response with the converted bytes, or a JSON error with
        status 500 if something unexpected fails.
    """
    import re
    from fastapi.concurrency import run_in_threadpool

    try:
        content = await file.read()
        filename = file.filename or "file"
        ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
        target = target_format.strip().lower().lstrip('.')

        # (source ext, target ext) -> (MIME type, output ext, converter)
        # NOTE(review): the legacy 'doc'/'xls'/'ppt' sources are routed to the
        # same converters as their OOXML counterparts -- confirm the
        # underlying libraries can actually read them.
        format_map = {
            ('docx', 'pdf'): ('application/pdf', 'pdf', convert_docx_to_pdf),
            ('doc', 'pdf'): ('application/pdf', 'pdf', convert_docx_to_pdf),
            ('pdf', 'docx'): ('application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'docx', convert_pdf_to_docx),
            ('pdf', 'xlsx'): ('application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', 'xlsx', convert_pdf_to_excel),
            ('pdf', 'pptx'): ('application/vnd.openxmlformats-officedocument.presentationml.presentation', 'pptx', convert_pdf_to_ppt),
            ('pdf', 'ppt'): ('application/vnd.ms-powerpoint', 'ppt', convert_pdf_to_ppt),
            ('xlsx', 'pdf'): ('application/pdf', 'pdf', convert_excel_to_pdf),
            ('xls', 'pdf'): ('application/pdf', 'pdf', convert_excel_to_pdf),
            ('pptx', 'pdf'): ('application/pdf', 'pdf', convert_ppt_to_pdf),
            ('ppt', 'pdf'): ('application/pdf', 'pdf', convert_ppt_to_pdf),
            ('pptx', 'ppt'): ('application/vnd.ms-powerpoint', 'ppt', convert_pptx_to_ppt),
            ('ppt', 'pptx'): ('application/vnd.openxmlformats-officedocument.presentationml.presentation', 'pptx', convert_ppt_to_pptx),
        }

        entry = format_map.get((ext, target))
        if entry is not None:
            mime_type, out_ext, converter_func = entry
            try:
                # Converters are synchronous; run them off the event loop so
                # one large file does not stall every other request.
                converted_content = await run_in_threadpool(converter_func, content)
                return _attachment_response(converted_content, mime_type, out_ext)
            except Exception as e:
                print(f"Conversion error: {e}")

        # Last-resort text rendering, only when a PDF was requested and the
        # reverse pair (pdf -> ext) is a known conversion.
        if target == 'pdf' and (target, ext) in format_map:
            try:
                text_content = content.decode('utf-8', errors='ignore')
                converted_content = await run_in_threadpool(create_clean_pdf, text_content)
                return _attachment_response(converted_content, 'application/pdf', 'pdf')
            except Exception as e:
                print(f"Text fallback error: {e}")

        # Pass-through. The target is client-supplied, so keep only
        # characters that are safe inside the Content-Disposition header.
        safe_target = re.sub(r'[^a-z0-9]', '', target) or 'bin'
        return _attachment_response(content, 'application/octet-stream', safe_target)

    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
|
|
@app.post("/api/convert-image")
async def convert_image(
    file: UploadFile = File(...),
    target_format: str = Form("png")
):
    """Convert an uploaded image to ``target_format`` and return it as a download.

    Args:
        file: The uploaded image.
        target_format: Desired image format (e.g. "png", "jpg", "webp").

    Returns:
        A download response named ``converted.<ext>`` with an ``image/*``
        media type, or a JSON error with status 500 on unexpected failure.
    """
    import re
    from fastapi.concurrency import run_in_threadpool

    # Extensions whose MIME subtype differs from the extension itself.
    mime_subtypes = {"jpg": "jpeg", "tif": "tiff", "ico": "x-icon"}

    try:
        content = await file.read()
        # Image re-encoding is synchronous; keep it off the event loop.
        converted = await run_in_threadpool(convert_image_to_other, content, target_format)

        # The format name is client-supplied; keep only characters that are
        # safe inside response headers.
        extension = re.sub(r'[^a-z0-9]', '', target_format.lower()) or 'bin'
        subtype = mime_subtypes.get(extension, extension)

        return Response(
            content=converted,
            media_type=f"image/{subtype}",
            headers={
                "Content-Disposition": f"attachment; filename=converted.{extension}",
                "Content-Length": str(len(converted))
            }
        )
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
|
|
@app.post("/api/extract-text")
async def extract_text(file: UploadFile = File(...)):
    """Send an uploaded image to the TrOCR model and return the response text.

    Returns:
        JSON ``{"text": ..., "success": True}`` on success; status 401 when
        ``HF_TOKEN`` is not configured; status 500 when the upstream call
        fails or an unexpected error occurs.
    """
    from fastapi.concurrency import run_in_threadpool

    if not HF_TOKEN:
        return JSONResponse({"error": "HF_TOKEN not configured"}, status_code=401)

    try:
        image_bytes = await file.read()
        url = "https://api-inference.huggingface.co/models/microsoft/trocr-large-printed"
        # call_hf_api uses blocking HTTP and sleeps between retries; run it in
        # a worker thread so the event loop keeps serving other requests.
        result = await run_in_threadpool(call_hf_api, url, image_bytes)

        if not result:
            return JSONResponse({"text": "OCR failed", "error": "OCR failed"}, status_code=500)

        text = result.decode('utf-8', errors='ignore')
        return JSONResponse({"text": text, "success": True})
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
|
|
@app.post("/api/caption-image")
async def caption_image(file: UploadFile = File(...)):
    """Send an uploaded image to the BLIP captioning model and return the response.

    Returns:
        JSON ``{"caption": ..., "success": True}`` on success; status 401
        when ``HF_TOKEN`` is not configured; status 500 when the upstream
        call fails or an unexpected error occurs.
    """
    from fastapi.concurrency import run_in_threadpool

    if not HF_TOKEN:
        return JSONResponse({"error": "HF_TOKEN not configured"}, status_code=401)

    try:
        image_bytes = await file.read()
        url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"
        # call_hf_api uses blocking HTTP and sleeps between retries; run it in
        # a worker thread so the event loop keeps serving other requests.
        result = await run_in_threadpool(call_hf_api, url, image_bytes)

        if not result:
            return JSONResponse({"caption": "Caption generation failed", "error": "Failed"}, status_code=500)

        caption = result.decode('utf-8', errors='ignore')
        return JSONResponse({"caption": caption, "success": True})
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
|
|
@app.post("/api/summarize")
async def summarize_text(file: UploadFile = File(...)):
    """Summarize an uploaded text file with the BART CNN model.

    The upload is decoded as UTF-8 (undecodable bytes are dropped). Text
    shorter than 50 characters is returned as-is; longer text is cut to its
    first 2000 characters before being sent upstream.

    Returns:
        JSON ``{"summary": ..., "success": True}`` on success; status 401
        when ``HF_TOKEN`` is not configured; status 500 (with the first 200
        characters of the text as ``summary``) when the upstream call fails,
        or status 500 with an ``error`` on unexpected failure.
    """
    from fastapi.concurrency import run_in_threadpool

    if not HF_TOKEN:
        return JSONResponse({"error": "HF_TOKEN not configured"}, status_code=401)

    try:
        content = await file.read()
        text = content.decode('utf-8', errors='ignore')

        if len(text) < 50:
            return JSONResponse({"summary": text, "message": "Text is too short"})

        # Cap the request size sent to the model.
        text = text[:2000]

        url = "https://api-inference.huggingface.co/models/facebook/bart-large-cnn"
        # call_hf_api uses blocking HTTP and sleeps between retries; run it in
        # a worker thread so the event loop keeps serving other requests.
        result = await run_in_threadpool(call_hf_api, url, text.encode())

        if not result:
            return JSONResponse({"summary": text[:200] + "...", "error": "Failed"}, status_code=500)

        summary = result.decode('utf-8', errors='ignore')
        return JSONResponse({"summary": summary, "success": True})
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
|
|
@app.post("/api/vectorize-logo")
async def vectorize_logo(file: UploadFile = File(...)):
    """Return a generated SVG sized like the uploaded image.

    The SVG is a fixed template (gradient circle, rotated square, the word
    CONVERTLY and a caption line) whose dimensions come from the upload. The
    caption defaults to "CONVERTLY Logo"; when ``HF_TOKEN`` is set, the first
    50 characters of the captioning model's response are used instead.

    Returns:
        JSON ``{"svg": ..., "success": True}``, or a JSON error with status
        500 (for example when the upload is not a readable image).
    """
    from html import escape
    from fastapi.concurrency import run_in_threadpool

    try:
        image_bytes = await file.read()
        img = Image.open(io.BytesIO(image_bytes))
        width, height = img.size

        caption = "CONVERTLY Logo"
        if HF_TOKEN:
            try:
                url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"
                # call_hf_api is blocking; keep it off the event loop.
                result = await run_in_threadpool(call_hf_api, url, image_bytes)
                if result:
                    caption = result.decode('utf-8', errors='ignore')[:50]
            except Exception as e:
                # Captioning is optional: keep the default caption.
                print(f"Caption lookup error: {e}")

        # The caption comes from an external service; escape it so it cannot
        # break or inject markup into the SVG document.
        safe_caption = escape(caption)
        radius = min(width, height) // 3

        svg = "\n".join([
            f'<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 {width} {height}">',
            f'<rect width="{width}" height="{height}" fill="#ffffff"/>',
            '<defs>',
            '<linearGradient id="g" x1="0%" y1="0%" x2="100%" y2="100%">',
            '<stop offset="0%" stop-color="#6366f1"/>',
            '<stop offset="100%" stop-color="#8b5cf6"/>',
            '</linearGradient>',
            '</defs>',
            f'<g transform="translate({width//2}, {height//2})">',
            f'<circle cx="0" cy="0" r="{radius}" fill="url(#g)" opacity="0.9"/>',
            '<rect x="-30" y="-30" width="60" height="60" fill="#ffffff" opacity="0.3" transform="rotate(45)"/>',
            '</g>',
            f'<text x="{width//2}" y="{height - 30}" text-anchor="middle" font-family="Arial" font-size="16" fill="#6366f1" font-weight="bold">CONVERTLY</text>',
            f'<text x="{width//2}" y="{height - 12}" text-anchor="middle" font-family="Arial" font-size="10" fill="#94a3b8">{safe_caption}</text>',
            '</svg>',
        ])

        return JSONResponse({"svg": svg, "success": True})
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
|
|
# Direct execution only: start uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)