Spaces:
Sleeping
Sleeping
| """ | |
| Document Conversion API for Hugging Face Spaces | |
| Handles ALL PDF operations: Word↔PDF, Image→Text (OCR), PDF Merge/Split | |
| Self-hosted, FREE forever with unlimited usage! | |
| """ | |
| from flask import Flask, request, send_file, jsonify | |
| from flask_cors import CORS | |
| import subprocess | |
| import os | |
| import tempfile | |
| import uuid | |
| from pathlib import Path | |
| import logging | |
| from PyPDF2 import PdfReader, PdfWriter, PdfMerger | |
| import pytesseract | |
| from PIL import Image | |
| from pdf2docx import Converter | |
| from io import BytesIO | |
| import zipfile | |
# Root logging is configured at INFO; this module logs through `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Flask application instance (started by app.run in the __main__ guard).
app = Flask(__name__)

# Cross-origin requests are accepted from any origin.
# In production, replace * with your Vercel domain.
CORS(app, origins=["*"])
# Ensure LibreOffice is available
def check_libreoffice():
    """Report whether a working ``libreoffice`` executable can be run.

    Runs ``libreoffice --version`` with a 5 second timeout. On success the
    reported version string is logged at INFO level.

    Returns:
        bool: True when the command ran and exited with status 0. False when
        it could not be started, timed out, raised any other error, or
        exited with a non-zero status.
    """
    try:
        result = subprocess.run(
            ['libreoffice', '--version'],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except Exception as e:
        # Deliberately broad: this is a best-effort probe and must never
        # raise into the caller (missing binary, timeout, decode error...).
        logger.error(f"LibreOffice not found: {e}")
        return False

    # subprocess.run does not raise on a non-zero exit unless check=True,
    # so a broken install has to be detected explicitly.
    if result.returncode != 0:
        logger.error(
            f"LibreOffice check failed with exit code {result.returncode}: "
            f"{result.stderr.strip()}"
        )
        return False

    logger.info(f"LibreOffice version: {result.stdout.strip()}")
    return True
# Ensure Tesseract is available
def check_tesseract():
    """Report whether a working ``tesseract`` executable can be run.

    Runs ``tesseract --version`` with a 5 second timeout. On success the
    captured standard output is logged at INFO level.

    Returns:
        bool: True when the command ran and exited with status 0. False when
        it could not be started, timed out, raised any other error, or
        exited with a non-zero status.
    """
    try:
        result = subprocess.run(
            ['tesseract', '--version'],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except Exception as e:
        # Deliberately broad: this is a best-effort probe and must never
        # raise into the caller (missing binary, timeout, decode error...).
        logger.error(f"Tesseract not found: {e}")
        return False

    # subprocess.run does not raise on a non-zero exit unless check=True,
    # so a broken install has to be detected explicitly.
    if result.returncode != 0:
        logger.error(
            f"Tesseract check failed with exit code {result.returncode}: "
            f"{result.stderr.strip()}"
        )
        return False

    logger.info(f"Tesseract version: {result.stdout.strip()}")
    return True
# NOTE(review): no @app.route decorator is visible on this handler in this
# view of the file -- confirm it is registered with the Flask app.
def root():
    """Describe the API: backend availability, operations and endpoints.

    Probes LibreOffice and Tesseract on every call and reports each as
    available or not found; PyPDF2 is always reported as available.

    Returns:
        tuple: ``(info_dict, 200)``.
    """
    available, missing = "Available ✅", "Not Found ❌"

    # Probed in this order: LibreOffice first, then Tesseract.
    backend = {
        'LibreOffice': available if check_libreoffice() else missing,
        'Tesseract OCR': available if check_tesseract() else missing,
        'PyPDF2': 'Available ✅',
    }
    operations = {
        'word-to-pdf': 'Convert Word/DOCX to PDF',
        'pdf-to-word': 'Convert PDF to Word/DOCX (OCR)',
        'image-to-text': 'Extract text from images (OCR)',
        'pdf-split': 'Split PDF into separate pages',
        'pdf-merge': 'Merge multiple PDFs',
    }
    endpoints = {
        'POST /convert': 'Word to PDF conversion',
        'POST /pdf-to-word': 'PDF to Word conversion',
        'POST /image-to-text': 'Image OCR to text',
        'POST /pdf-split': 'Split PDF pages',
        'POST /pdf-merge': 'Merge multiple PDFs',
        'GET /health': 'Health check',
    }
    info = {
        'name': 'Document Conversion API',
        'version': '2.0.0',
        'backend': backend,
        'platform': 'Hugging Face Spaces',
        'features': '100% FREE forever, Unlimited usage, No API keys needed',
        'operations': operations,
        'endpoints': endpoints,
    }
    return info, 200
# NOTE(review): no @app.route decorator is visible on this handler in this
# view of the file -- confirm it is registered with the Flask app.
def health_check():
    """Report service health based on the LibreOffice and Tesseract probes.

    Returns:
        tuple: ``(payload, status_code)``. When both probes succeed the
        status is ``'healthy'`` with code 200; otherwise ``'degraded'``
        with code 503. ``pypdf2`` is always reported as True.
    """
    libreoffice_ok = check_libreoffice()
    tesseract_ok = check_tesseract()

    if libreoffice_ok and tesseract_ok:
        status, message, code = 'healthy', 'All services running', 200
    else:
        status, message, code = 'degraded', 'Some services unavailable', 503

    payload = {
        'status': status,
        'services': {
            'libreoffice': libreoffice_ok,
            'tesseract': tesseract_ok,
            'pypdf2': True,
        },
        'message': message,
    }
    return payload, code
# NOTE(review): no @app.route decorator is visible on this handler in this
# view of the file -- confirm it is registered with the Flask app.
def word_to_pdf():
    """Convert an uploaded document to PDF using headless LibreOffice.

    Reads the upload from the ``file`` field of the current request. The
    accepted extensions are ``.docx``, ``.doc``, ``.odt``, ``.rtf`` and
    ``.txt`` (case-insensitive).

    Returns:
        On success, a ``send_file`` response carrying the PDF as an
        attachment named ``converted.pdf``. Otherwise a
        ``(json_response, status)`` tuple: 400 for a missing upload, empty
        filename or unsupported extension; 500 when LibreOffice fails or
        produces no output; 504 when the conversion exceeds 30 seconds.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    file = request.files['file']
    # `not` also covers a filename of None, not just the empty string.
    if not file.filename:
        return jsonify({'error': 'Empty filename'}), 400

    # Get file extension
    file_ext = Path(file.filename).suffix.lower()
    supported_exts = ['.docx', '.doc', '.odt', '.rtf', '.txt']
    if file_ext not in supported_exts:
        return jsonify({
            'error': f'Unsupported file format: {file_ext}',
            'supported': supported_exts
        }), 400

    # Create unique temporary directory
    temp_dir = tempfile.mkdtemp()
    try:
        input_stem = f"input_{uuid.uuid4()}"
        input_filename = f"{input_stem}{file_ext}"
        input_path = os.path.join(temp_dir, input_filename)
        file.save(input_path)
        logger.info(f"Converting {input_filename} to PDF...")

        # Convert using LibreOffice
        cmd = [
            'libreoffice',
            '--headless',
            '--convert-to', 'pdf',
            '--outdir', temp_dir,
            input_path
        ]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=30,
            cwd=temp_dir
        )
        if result.returncode != 0:
            logger.error(f"LibreOffice error: {result.stderr}")
            return jsonify({
                'error': 'Conversion failed',
                'details': result.stderr
            }), 500

        # LibreOffice names the output after the input stem.
        output_filename = f"{input_stem}.pdf"
        output_path = os.path.join(temp_dir, output_filename)
        if not os.path.exists(output_path):
            return jsonify({
                'error': 'PDF file not created',
                'details': 'LibreOffice did not produce output file'
            }), 500

        # Load the PDF into memory: the temp directory is removed in the
        # `finally` below, before the response body is sent, so the
        # response must not depend on the file still existing on disk.
        with open(output_path, 'rb') as pdf_file:
            pdf_bytes = pdf_file.read()
        logger.info(
            f"Conversion successful! Output: {output_filename} "
            f"({len(pdf_bytes)} bytes)"
        )
        return send_file(
            BytesIO(pdf_bytes),
            mimetype='application/pdf',
            as_attachment=True,
            download_name='converted.pdf'
        )
    except subprocess.TimeoutExpired:
        logger.error("Conversion timeout")
        return jsonify({'error': 'Conversion timeout (>30s)'}), 504
    except Exception as e:
        logger.error(f"Conversion error: {str(e)}")
        return jsonify({
            'error': 'Conversion failed',
            'details': str(e)
        }), 500
    finally:
        # Best-effort cleanup: a failure here must not mask the response.
        try:
            import shutil
            shutil.rmtree(temp_dir)
        except Exception as e:
            logger.warning(f"Cleanup warning: {e}")
# NOTE(review): no @app.route decorator is visible on this handler in this
# view of the file -- confirm it is registered with the Flask app.
def pdf_to_word():
    """Convert an uploaded PDF to DOCX using pdf2docx.

    Reads the upload from the ``file`` field of the current request, saves
    it to a temporary directory and runs ``Converter.convert`` on it.

    Returns:
        On success, a ``send_file`` response carrying the DOCX as an
        attachment named ``converted.docx``. Otherwise a
        ``(json_response, status)`` tuple: 400 for a missing upload or
        empty filename; 500 when conversion raises or yields no file.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    file = request.files['file']
    # `not` also covers a filename of None, not just the empty string.
    if not file.filename:
        return jsonify({'error': 'Empty filename'}), 400

    temp_dir = tempfile.mkdtemp()
    try:
        input_path = os.path.join(temp_dir, 'input.pdf')
        output_path = os.path.join(temp_dir, 'output.docx')
        file.save(input_path)
        logger.info("Converting PDF to DOCX...")

        # Convert PDF to DOCX; always release the converter, even when
        # convert() raises, so the input file handle is not leaked.
        cv = Converter(input_path)
        try:
            cv.convert(output_path)
        finally:
            cv.close()

        if not os.path.exists(output_path):
            return jsonify({'error': 'DOCX file not created'}), 500

        # Load the result into memory: the temp directory is removed in the
        # `finally` below, before the response body is sent.
        with open(output_path, 'rb') as docx_file:
            docx_bytes = docx_file.read()
        logger.info(
            f"PDF to Word conversion successful! ({len(docx_bytes)} bytes)"
        )
        return send_file(
            BytesIO(docx_bytes),
            mimetype='application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            as_attachment=True,
            download_name='converted.docx'
        )
    except Exception as e:
        logger.error(f"PDF to Word error: {str(e)}")
        return jsonify({
            'error': 'Conversion failed',
            'details': str(e)
        }), 500
    finally:
        # Best-effort cleanup: a failure here must not mask the response.
        try:
            import shutil
            shutil.rmtree(temp_dir)
        except Exception as e:
            logger.warning(f"Cleanup warning: {e}")
def _ocr_candidates(image):
    """Run every OCR strategy on an RGB PIL image; return those that worked.

    Each entry is ``(strategy_name, text, stripped_length)``. A strategy
    that raises is logged as a warning and skipped.
    """
    candidates = []

    # Strategy 1: Original image (best for colored text, graphics)
    try:
        text = pytesseract.image_to_string(
            image, config=r'--oem 3 --psm 3', lang='eng'
        )
        candidates.append(('original', text, len(text.strip())))
    except Exception as e:
        logger.warning(f"Strategy 1 failed: {e}")

    # Strategies 2 and 3 both start from the same grayscale image, so it is
    # computed once. OpenCV/NumPy are needed only from here on; if they
    # cannot be imported or the conversion fails, the result of strategy 1
    # is still usable.
    try:
        import numpy as np
        import cv2
        gray = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2GRAY)
    except Exception as e:
        logger.warning(f"Grayscale preprocessing failed: {e}")
        return candidates

    # Strategy 2: Grayscale (good for normal documents)
    try:
        text = pytesseract.image_to_string(
            Image.fromarray(gray), config=r'--oem 3 --psm 6', lang='eng'
        )
        candidates.append(('grayscale', text, len(text.strip())))
    except Exception as e:
        logger.warning(f"Strategy 2 failed: {e}")

    # Strategy 3: High contrast (for faded text)
    try:
        # alpha scales contrast, beta shifts brightness.
        contrast = cv2.convertScaleAbs(gray, alpha=1.5, beta=0)
        text = pytesseract.image_to_string(
            Image.fromarray(contrast), config=r'--oem 3 --psm 3', lang='eng'
        )
        candidates.append(('contrast', text, len(text.strip())))
    except Exception as e:
        logger.warning(f"Strategy 3 failed: {e}")

    return candidates


def _clean_ocr_text(raw_text):
    """Drop blank and noisy lines from OCR output.

    A line is kept when more than 40% of its characters are alphanumeric
    or one of `` .,!?-$%()``. If the filtered text is shorter than 20
    characters and the stripped raw text is longer, the stripped raw text
    is returned instead.
    """
    allowed_punctuation = ' .,!?-$%()'
    kept = []
    for line in raw_text.split('\n'):
        line = line.strip()
        if not line:
            continue
        readable = sum(c.isalnum() or c in allowed_punctuation for c in line)
        if readable / len(line) > 0.4:
            kept.append(line)
    text = '\n'.join(kept)

    # Filtering can throw away nearly everything on short inputs; fall back
    # to the unfiltered text in that case.
    stripped_raw = raw_text.strip()
    if len(text) < 20 and len(stripped_raw) > len(text):
        text = stripped_raw
    return text


# NOTE(review): no @app.route decorator is visible on this handler in this
# view of the file -- confirm it is registered with the Flask app.
def image_to_text():
    """Extract text from an uploaded image with Tesseract OCR.

    Reads the upload from the ``file`` field of the current request,
    converts it to RGB if needed, runs several OCR strategies and keeps
    the one that produced the most non-whitespace-trimmed text (the first
    such strategy on a tie), then filters out noisy lines.

    Returns:
        On success, a ``send_file`` response carrying a UTF-8 text file
        named ``extracted-text.txt``. Otherwise a
        ``(json_response, status)`` tuple: 400 for a missing upload or
        empty filename; 500 when the image cannot be processed or every
        OCR strategy fails.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    file = request.files['file']
    # `not` also covers a filename of None, not just the empty string.
    if not file.filename:
        return jsonify({'error': 'Empty filename'}), 400

    try:
        # Read image
        image = Image.open(file.stream)
        logger.info(f"Extracting text from image ({image.size})...")
        # Convert to RGB if necessary
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Try multiple OCR strategies and pick the best result
        results = _ocr_candidates(image)
        if not results:
            raise RuntimeError("All OCR strategies failed")

        # More text usually means better recognition; max() keeps the
        # earliest strategy when lengths tie.
        best_strategy, raw_text, _ = max(results, key=lambda r: r[2])
        logger.info(f"Best strategy: {best_strategy} with {len(raw_text)} characters")

        text = _clean_ocr_text(raw_text)
        logger.info(f"OCR successful! Extracted {len(text)} characters")

        # Return as downloadable text file
        text_content = f"Extracted Text from {file.filename}\n\n{text}"
        buffer = BytesIO(text_content.encode('utf-8'))
        return send_file(
            buffer,
            mimetype='text/plain',
            as_attachment=True,
            download_name='extracted-text.txt'
        )
    except Exception as e:
        logger.error(f"OCR error: {str(e)}")
        return jsonify({
            'error': 'Text extraction failed',
            'details': str(e)
        }), 500
# NOTE(review): no @app.route decorator is visible on this handler in this
# view of the file -- confirm it is registered with the Flask app.
def pdf_split():
    """Split an uploaded PDF into one PDF per page, returned as a ZIP.

    Reads the upload from the ``file`` field of the current request. Each
    page is written to its own single-page PDF named ``page_<n>.pdf``
    (1-based) inside the archive.

    Returns:
        On success, a ``send_file`` response carrying the archive as an
        attachment named ``split_pages.zip``. Otherwise a
        ``(json_response, status)`` tuple: 400 for a missing upload or
        empty filename; 500 when reading or splitting the PDF raises.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    file = request.files['file']
    # `not` also covers a filename of None, not just the empty string.
    if not file.filename:
        return jsonify({'error': 'Empty filename'}), 400

    try:
        # Read PDF
        pdf_reader = PdfReader(file.stream)
        total_pages = len(pdf_reader.pages)
        logger.info(f"Splitting PDF ({total_pages} pages)...")

        # Build the archive in memory: nothing needs to touch the disk, and
        # the response cannot end up pointing at an already-deleted file.
        zip_buffer = BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w') as zipf:
            for page_number, page in enumerate(pdf_reader.pages, start=1):
                # Create new PDF for each page
                pdf_writer = PdfWriter()
                pdf_writer.add_page(page)
                page_buffer = BytesIO()
                pdf_writer.write(page_buffer)
                zipf.writestr(f'page_{page_number}.pdf', page_buffer.getvalue())
        # The ZIP is complete once the `with` block closes it; rewind so
        # send_file streams from the start.
        zip_buffer.seek(0)

        logger.info(f"Split successful! Created {total_pages} page files")
        return send_file(
            zip_buffer,
            mimetype='application/zip',
            as_attachment=True,
            download_name='split_pages.zip'
        )
    except Exception as e:
        logger.error(f"PDF split error: {str(e)}")
        return jsonify({
            'error': 'PDF split failed',
            'details': str(e)
        }), 500
# NOTE(review): no @app.route decorator is visible on this handler in this
# view of the file -- confirm it is registered with the Flask app.
def pdf_merge():
    """Merge the uploaded PDFs, in upload order, into a single PDF.

    Reads the uploads from the ``files`` field of the current request.
    Uploads whose filename does not end in ``.pdf`` (case-insensitive) are
    skipped; at least two PDFs must remain after that.

    Returns:
        On success, a ``send_file`` response carrying the merged document
        as an attachment named ``merged.pdf``. Otherwise a
        ``(json_response, status)`` tuple: 400 when the field is missing
        or fewer than two PDFs were supplied; 500 when merging raises.
    """
    if 'files' not in request.files:
        return jsonify({'error': 'No files provided'}), 400
    files = request.files.getlist('files')

    # Filter BEFORE counting: otherwise a request with one PDF and one
    # non-PDF passes the check and "merges" a single document.
    pdf_files = [
        f for f in files
        if f.filename and f.filename.lower().endswith('.pdf')
    ]
    if len(pdf_files) < 2:
        return jsonify({'error': 'At least 2 PDF files required'}), 400

    try:
        logger.info(f"Merging {len(pdf_files)} PDF files...")

        # Merge PDFs into an in-memory buffer; the merger is closed even
        # when one of the inputs cannot be read.
        merged = BytesIO()
        merger = PdfMerger()
        try:
            for pdf in pdf_files:
                merger.append(pdf.stream)
            merger.write(merged)
        finally:
            merger.close()
        file_size = merged.getbuffer().nbytes
        merged.seek(0)

        logger.info(f"Merge successful! Output: {file_size} bytes")
        return send_file(
            merged,
            mimetype='application/pdf',
            as_attachment=True,
            download_name='merged.pdf'
        )
    except Exception as e:
        logger.error(f"PDF merge error: {str(e)}")
        return jsonify({
            'error': 'PDF merge failed',
            'details': str(e)
        }), 500
if __name__ == '__main__':
    logger.info("🚀 Starting Document Conversion API...")

    # Check dependencies on startup: each probe is run once, in order, and
    # its outcome is logged as ready (info) or missing (warning).
    startup_probes = (
        (check_libreoffice, "✅ LibreOffice is ready!", "⚠️ LibreOffice not found"),
        (check_tesseract, "✅ Tesseract OCR is ready!", "⚠️ Tesseract not found"),
    )
    for probe, ready_message, missing_message in startup_probes:
        if probe():
            logger.info(ready_message)
        else:
            logger.warning(missing_message)

    # Run Flask app on all interfaces; PORT overrides the default 7860.
    listen_port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=listen_port, debug=False)