Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, File, UploadFile, HTTPException, Request | |
| from fastapi.responses import JSONResponse, HTMLResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import fitz # PyMuPDF | |
| from transformers import AutoModel, AutoTokenizer | |
| from PIL import Image | |
| import numpy as np | |
| import os | |
| import base64 | |
| import io | |
| import uuid | |
| import tempfile | |
| import time | |
| import shutil | |
| from pathlib import Path | |
| import json | |
| from starlette.requests import Request | |
| import uvicorn | |
| from bs4 import BeautifulSoup | |
# Application instance configured below (middleware, static mount,
# templates) and served by uvicorn when this module is run directly.
app = FastAPI()

# Add CORS middleware: accept requests from any origin, method and header.
# Credentials stay disabled on purpose: combined with a wildcard origin,
# allow_credentials=True lets any website send credentialed requests, and the
# FastAPI documentation requires explicit origins whenever credentials are
# enabled. List the trusted origins explicitly before turning it back on.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for simplicity
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Load the 'ucaslcl/GOT-OCR2_0' tokenizer and model once, at import time.
# NOTE: trust_remote_code=True executes Python code shipped with the model
# repository, and device_map='cuda' / .cuda() place the model on a CUDA device.
_GOT_MODEL_ID = 'ucaslcl/GOT-OCR2_0'
tokenizer = AutoTokenizer.from_pretrained(_GOT_MODEL_ID, trust_remote_code=True)
model = AutoModel.from_pretrained(
    _GOT_MODEL_ID,
    trust_remote_code=True,
    device_map='cuda',
    use_safetensors=True,
)
model = model.eval()  # switch to evaluation mode before serving requests
model = model.cuda()
# Working directories, relative to the process working directory:
# UPLOAD_FOLDER holds the copied PDFs and per-page PNGs, RESULTS_FOLDER holds
# the rendered HTML files.
UPLOAD_FOLDER = "./uploads"
RESULTS_FOLDER = "./results"

# Ensure directories exist. exist_ok=True replaces the exists()-then-create
# sequence, which can fail if another process creates the folder in between.
for folder in [UPLOAD_FOLDER, RESULTS_FOLDER]:
    os.makedirs(folder, exist_ok=True)
def image_to_base64(image):
    """Serialize *image* as PNG and return the bytes as a base64 string.

    ``image`` must provide ``save(file_obj, format=...)`` (e.g. a PIL image).
    """
    with io.BytesIO() as png_buffer:
        image.save(png_buffer, format="PNG")
        png_bytes = png_buffer.getvalue()
    encoded = base64.b64encode(png_bytes)
    return encoded.decode()
def pdf_to_images(pdf_path):
    """Render every page of a PDF to a PIL image.

    Args:
        pdf_path: Path of the PDF file to open with PyMuPDF.

    Returns:
        A list with one RGB ``PIL.Image`` per page, in page order.
    """
    images = []
    # The context manager closes the document (and its file handle) even when
    # rendering fails, so the caller can delete the PDF afterwards.
    with fitz.open(pdf_path) as pdf_document:
        for page in pdf_document:
            # With the default RGB colorspace, alpha=False yields 3 bytes per
            # pixel, which is what Image.frombytes expects for mode "RGB".
            pix = page.get_pixmap(alpha=False)
            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
            images.append(img)
    return images
def _remove_if_exists(path):
    """Delete *path*, ignoring the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def run_GOT(pdf_file):
    """Run the GOT OCR model on every page of a PDF.

    The PDF is copied into UPLOAD_FOLDER under a fresh UUID, each page is
    rendered to a PNG, passed to ``model.chat_crop`` and the rendered HTML is
    embedded as a base64 data URI. All intermediate files are removed before
    returning, whether processing succeeds or fails.

    Args:
        pdf_file: Path of the PDF to process.

    Returns:
        On success, ``(json_text, results)`` where ``results`` is a list of
        dicts with the keys ``page_number``, ``text``, ``html`` and
        ``download_link``, and ``json_text`` is that list dumped as JSON.
        If processing a page fails, ``("Error: <message>", None)``.

    Raises:
        Exception: Whatever copying the PDF or rendering its pages raises;
            those errors are not converted into the error tuple.
    """
    unique_id = str(uuid.uuid4())
    pdf_path = os.path.join(UPLOAD_FOLDER, f"{unique_id}.pdf")
    results = []
    try:
        # Copy and render inside the try so the PDF copy is removed even when
        # the file is unreadable or not a valid PDF.
        shutil.copy(pdf_file, pdf_path)
        images = pdf_to_images(pdf_path)
        for page_number, image in enumerate(images, start=1):
            image_path = os.path.join(UPLOAD_FOLDER, f"{unique_id}_page_{page_number}.png")
            result_path = os.path.join(RESULTS_FOLDER, f"{unique_id}_page_{page_number}.html")
            try:
                image.save(image_path)
                res = model.chat_crop(tokenizer, image_path, ocr_type='format', render=True, save_render_file=result_path)
                # Read the rendered HTML content. The encoding is explicit so
                # the text does not depend on the platform's default locale.
                with open(result_path, 'r', encoding='utf-8') as f:
                    html_content = f.read()
                # Encode the HTML content to base64
                encoded_html = base64.b64encode(html_content.encode('utf-8')).decode('utf-8')
                iframe_src = f"data:text/html;base64,{encoded_html}"
                iframe = f'<iframe src="{iframe_src}" width="100%" height="600px"></iframe>'
                download_link = f'<a href="data:text/html;base64,{encoded_html}" download="result_{unique_id}_page_{page_number}.html">Download Full Result</a>'
                results.append({
                    "page_number": page_number,
                    "text": res,  # Directly use the output from model.chat_crop
                    "html": iframe,
                    "download_link": download_link
                })
            except Exception as e:
                return f"Error: {str(e)}", None
            finally:
                # Per-page files are removed on success and on failure, so a
                # failing page cannot leave its PNG/HTML behind.
                _remove_if_exists(image_path)
                _remove_if_exists(result_path)
    finally:
        _remove_if_exists(pdf_path)
    return json.dumps(results, indent=4), results
def cleanup_old_files(max_age_seconds=3600, folders=None):
    """Delete regular files older than *max_age_seconds* from the work folders.

    Args:
        max_age_seconds: Files whose modification time is more than this many
            seconds in the past are removed. Defaults to one hour.
        folders: Iterable of directory paths to sweep. Defaults to
            ``UPLOAD_FOLDER`` and ``RESULTS_FOLDER``.

    Subdirectories are left alone, and files that disappear while the sweep
    is running are skipped instead of aborting it.
    """
    if folders is None:
        folders = (UPLOAD_FOLDER, RESULTS_FOLDER)
    current_time = time.time()
    for folder in folders:
        for file_path in Path(folder).glob('*'):
            try:
                # unlink() cannot remove directories, so only files qualify.
                if not file_path.is_file():
                    continue
                if current_time - file_path.stat().st_mtime > max_age_seconds:
                    file_path.unlink()
            except FileNotFoundError:
                # Removed by someone else between listing and deletion.
                continue
# Sweep stale working files once, when the module is imported.
cleanup_old_files()

# Mount static files: the "static" directory is served under /static.
_static_files = StaticFiles(directory="static")
app.mount("/static", _static_files, name="static")

# Set up Jinja2 templates, loaded from the "templates" directory.
templates = Jinja2Templates(directory="templates")
# NOTE(review): no route decorator (e.g. @app.get(...)) is visible on this
# handler in the reviewed source; confirm that it is registered with `app`.
async def read_root(request: Request):
    """Render the ``index.html`` template for the incoming request."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
# NOTE(review): no route decorator (e.g. @app.post(...)) is visible on this
# handler in the reviewed source; confirm that it is registered with `app`.
async def upload_file(file: UploadFile = File(...)):
    """Save an uploaded PDF to a temporary directory and OCR it with run_GOT.

    Args:
        file: The uploaded PDF.

    Returns:
        The per-page result list produced by ``run_GOT``.

    Raises:
        HTTPException: Status 500 when ``run_GOT`` reports a failure, with
            its error message as the detail.
    """
    # The client-supplied filename is deliberately not used in the path: it
    # may be missing, contain "../" segments, or be absolute (os.path.join
    # would then discard the temporary directory). run_GOT copies the file
    # under its own UUID name, so the original name is not needed.
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_pdf_path = os.path.join(temp_dir, "upload.pdf")
        with open(temp_pdf_path, "wb") as buffer:
            buffer.write(await file.read())
        # The with-block removes the directory even if run_GOT raises.
        json_output, results = run_GOT(temp_pdf_path)
    if results is None:
        # run_GOT signals failure as ("Error: ...", None); report it instead
        # of answering 200 with a null body.
        raise HTTPException(status_code=500, detail=json_output)
    return results
# Script entry point: when run directly, serve the app with uvicorn on all
# network interfaces, port 8000.
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )