# RenAI / app.py
# Arsh124's picture
# Added support for HF-Model
# 9a88738
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from typing import Optional, Any, Dict, Union
import shutil
import os
import json
from loguru import logger
from pathlib import Path
import tempfile
import numpy as np
from datetime import datetime
import base64
from io import BytesIO
from PIL import Image
from main import RenAITranscription
# FastAPI application instance; the route decorators in this file register on it.
app = FastAPI(title="RenAI Transcription API", version="1.0.0")
# CORS middleware is currently disabled. Uncomment the block below to
# register CORSMiddleware with the permissive settings shown.
# app.add_middleware(
#     CORSMiddleware,
#     allow_origins=["*"],
#     allow_credentials=True,
#     allow_methods=["*"],
#     allow_headers=["*"],
# )
# Upload file extensions accepted by the file-upload endpoint. Entries are
# lowercase with a leading dot because they are compared against
# Path(...).suffix.lower().
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".webp"}
# Maximum accepted upload size, in bytes.
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB
def numpy_to_base64(array: np.ndarray, format: str = 'PNG', quality: int = 85) -> Optional[str]:
    """
    Convert a numpy array (image) to a base64 encoded data URI for web display.

    Args:
        array: Numpy array representing the image; it must be accepted by
            ``PIL.Image.fromarray``.
        format: Output image format, matched case-insensitively. 'JPEG' (or
            its common alias 'JPG') produces a JPEG; any other value
            produces a PNG.
        quality: JPEG quality (1-100), only used if format is JPEG.

    Returns:
        Data URI string that can be directly used in an HTML <img> src
        attribute, or None if the conversion fails (the error is logged).
    """
    try:
        # Convert numpy array to PIL Image
        img = Image.fromarray(array)

        # Save to bytes buffer
        buffer = BytesIO()
        if format.upper() in ('JPEG', 'JPG'):
            # JPEG has no alpha channel, so transparent images are flattened
            # onto a white background first. Palette images are expanded to
            # RGBA so any palette transparency is honoured by the same path.
            if img.mode == 'P':
                img = img.convert('RGBA')
            if img.mode in ('RGBA', 'LA'):
                background = Image.new('RGB', img.size, (255, 255, 255))
                # The last band of RGBA / LA is the alpha channel.
                background.paste(img, mask=img.split()[-1])
                img = background
            img.save(buffer, format='JPEG', quality=quality, optimize=True)
            mime_type = 'image/jpeg'
        else:
            img.save(buffer, format='PNG', optimize=True)
            mime_type = 'image/png'

        # Encode to base64
        img_str = base64.b64encode(buffer.getvalue()).decode('utf-8')
        return f"data:{mime_type};base64,{img_str}"
    except Exception as e:
        # Deliberate best-effort: callers treat None as "no image available".
        logger.error(f"Error converting numpy array to base64: {e}")
        return None
def format_transcription_result(result: Dict, include_images: bool = False, image_format: str = 'PNG') -> Dict[str, Any]:
    """
    Build the structured response payload from a raw transcription result.

    Args:
        result: Mapping of line ID to a dict that may hold 'image' and 'transcription'
        include_images: When true, attach each line's image as a base64 data URI
        image_format: Encoding used for attached images ('PNG' or 'JPEG')

    Returns:
        Dict with the per-line entries ('lines'), the joined text
        ('full_text') and the number of lines ('total_lines')
    """
    lines_out = {}
    text_rows = []

    for key, entry in result.items():
        text = entry.get('transcription', '')
        record = {'line_id': key, 'transcription': text}

        # Only numpy arrays are encoded; anything else is silently skipped,
        # as is an image whose encoding failed.
        if include_images and 'image' in entry:
            pixels = entry['image']
            encoded = None
            if isinstance(pixels, np.ndarray):
                encoded = numpy_to_base64(pixels, format=image_format)
            if encoded:
                record['image'] = encoded

        lines_out[key] = record
        text_rows.append(f"{key}: {text}")

    payload = {}
    payload['lines'] = lines_out
    payload['full_text'] = '\n'.join(text_rows)
    payload['total_lines'] = len(result)
    return payload
@app.get("/")
def home():
    # Landing route: returns a greeting, the API version and a short
    # directory of the routes this service exposes.
    endpoint_directory = {
        "transcribe": "/renai-transcribe (POST)",
        "transcribe_base64": "/renai-transcribe-base64 (POST)",
        "health": "/health (GET)",
    }
    greeting = {"message": "Hello, RenAI!", "version": "1.0.0"}
    greeting["endpoints"] = endpoint_directory
    return greeting
@app.post("/renai-transcribe")
async def transcription_endpoint(
    image: UploadFile = File(..., description="Image file to transcribe"),
    userToken: Optional[str] = Form(None, description="User authentication token"),
    post_processing_enabled: bool = Form(False, description="Enable post-processing"),
    unet_enabled: bool = Form(False, description="Enable UNet processing"),
    include_images: bool = Form(True, description="Include base64 encoded line images in response"),
    image_format: str = Form("JPEG", description="Image format for line images: PNG or JPEG")
):
    """
    Upload an image file and get transcription results.
    - **image**: Image file (JPG, PNG, BMP, TIFF, WebP)
    - **userToken**: Optional user authentication token
    - **post_processing_enabled**: Enable/disable post-processing
    - **unet_enabled**: Enable/disable UNet processing
    - **include_images**: Include base64 encoded images of each line (web-ready format)
    - **image_format**: Format for line images: 'PNG' (higher quality, larger) or 'JPEG' (smaller, faster)
    """
    # Responses:
    #   200 - JSON with "success", "filename", "transcription" and "metadata".
    #   400 - missing filename, unsupported extension, or file too large.
    #   500 - any failure while storing or transcribing the image; the detail
    #         carries the exception message and type name.
    start_time = datetime.now()
    # NOTE(review): this writes the raw user token to the log; consider masking it.
    logger.info(f"Transcription request received for file: {image.filename} by userToken: {userToken if userToken else 'Anonymous'}")

    # Validate file type
    if not image.filename:
        raise HTTPException(status_code=400, detail="No file provided")
    file_extension = Path(image.filename).suffix.lower()
    if file_extension not in ALLOWED_EXTENSIONS:
        # Sorted so the message is stable (set iteration order is not).
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Allowed types: {', '.join(sorted(ALLOWED_EXTENSIONS))}"
        )

    # Check file size as reported by the upload, when it is available
    size_error_detail = f"File too large. Maximum size: {MAX_FILE_SIZE // (1024*1024)}MB"
    if image.size and image.size > MAX_FILE_SIZE:
        raise HTTPException(status_code=400, detail=size_error_detail)

    temp_file_path = None
    try:
        # Create temporary file. The path is recorded before copying so that
        # a failed copy still gets cleaned up in the finally block.
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) as temp_file:
            temp_file_path = temp_file.name
            shutil.copyfileobj(image.file, temp_file)

        # image.size can be unset, which would skip the check above, so the
        # limit is enforced again on the bytes actually written.
        if os.path.getsize(temp_file_path) > MAX_FILE_SIZE:
            raise HTTPException(status_code=400, detail=size_error_detail)

        logger.info(f"Processing image: {temp_file_path}")

        # Call transcription function
        result = RenAITranscription(
            image=temp_file_path,
            post_processing_enabled=post_processing_enabled,
            unet_enabled=unet_enabled
        )
        logger.info(f"Transcription completed. Result type: {type(result)}, Lines: {len(result)}")

        # Format the result
        formatted_result = format_transcription_result(result, include_images=include_images, image_format=image_format)

        processing_time = (datetime.now() - start_time).total_seconds()
        logger.info(f"Request completed in {processing_time:.2f}s")

        response_data = {
            "success": True,
            "filename": image.filename,
            "transcription": formatted_result,
            "metadata": {
                "processing_time_seconds": round(processing_time, 2),
                "timestamp": datetime.now().isoformat(),
                "total_lines": formatted_result['total_lines'],
                "parameters": {
                    "post_processing_enabled": post_processing_enabled,
                    "unet_enabled": unet_enabled,
                    "include_images": include_images,
                    # Reported the same way as the base64 endpoint does.
                    "image_format": image_format if include_images else None,
                    "userToken": userToken if userToken else "Anonymous"
                }
            }
        }
        return JSONResponse(content=response_data)
    except HTTPException:
        # Client errors raised above must not be rewrapped as 500s.
        raise
    except Exception as e:
        logger.error(f"Transcription failed: {e}")
        raise HTTPException(
            status_code=500,
            detail={
                "error": str(e),
                "type": type(e).__name__
            }
        ) from e
    finally:
        # Clean up the temporary file on success and failure alike.
        if temp_file_path and os.path.exists(temp_file_path):
            try:
                os.unlink(temp_file_path)
            except OSError as cleanup_error:
                # Best-effort: a leftover temp file must not fail the request.
                logger.warning(f"Could not remove temporary file {temp_file_path}: {cleanup_error}")
@app.post("/renai-transcribe-base64")
async def transcription_base64_endpoint(
    image_data: str = Form(..., description="Base64 encoded image data"),
    userToken: Optional[str] = Form(None, description="User authentication token"),
    post_processing_enabled: bool = Form(False, description="Enable post-processing"),
    unet_enabled: bool = Form(False, description="Enable UNet processing"),
    include_images: bool = Form(False, description="Include base64 encoded line images in response"),
    image_format: str = Form("JPEG", description="Image format for line images: PNG or JPEG")
):
    """
    Alternative endpoint that accepts base64 encoded image data.
    """
    # Responses:
    #   200 - JSON with "success", "transcription" and "metadata".
    #   400 - payload is not valid base64, is not a readable image, or is
    #         larger than MAX_FILE_SIZE once decoded.
    #   500 - any other failure while storing or transcribing the image; the
    #         detail carries the exception message and type name.
    start_time = datetime.now()
    # NOTE(review): this writes the raw user token to the log; consider masking it.
    logger.info(f"Base64 transcription request received by userToken: {userToken if userToken else 'Anonymous'}")

    temp_file_path = None
    try:
        # Remove data URL prefix if present
        if "," in image_data:
            image_data = image_data.split(",", 1)[1]

        # Decode base64 image. binascii.Error is a ValueError subclass, so
        # this covers malformed padding; it is the client's mistake -> 400.
        try:
            image_bytes = base64.b64decode(image_data)
        except ValueError as e:
            logger.warning(f"Rejected base64 payload: {e}")
            raise HTTPException(status_code=400, detail="Invalid base64 image data") from e

        # Apply the same upload limit as the file-upload endpoint.
        if len(image_bytes) > MAX_FILE_SIZE:
            raise HTTPException(
                status_code=400,
                detail=f"File too large. Maximum size: {MAX_FILE_SIZE // (1024*1024)}MB"
            )

        # Open and fully decode now so unreadable or truncated image data is
        # reported as a client error rather than surfacing later as a 500.
        try:
            image_pil = Image.open(BytesIO(image_bytes))
            image_pil.load()
        except OSError as e:
            logger.warning(f"Rejected undecodable image payload: {e}")
            raise HTTPException(status_code=400, detail="Could not decode image data") from e

        # Create temporary file. The path is recorded first so a failed save
        # is still cleaned up, and the image is written through the open
        # handle because reopening the file by name fails on Windows.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
            temp_file_path = temp_file.name
            image_pil.save(temp_file, format="PNG")

        logger.info(f"Processing base64 image: {temp_file_path}")

        # Call transcription function
        result = RenAITranscription(
            image=temp_file_path,
            post_processing_enabled=post_processing_enabled,
            unet_enabled=unet_enabled
        )

        # Format the result
        formatted_result = format_transcription_result(result, include_images=include_images, image_format=image_format)

        processing_time = (datetime.now() - start_time).total_seconds()
        logger.info(f"Base64 request completed in {processing_time:.2f}s")

        response_data = {
            "success": True,
            "transcription": formatted_result,
            "metadata": {
                "processing_time_seconds": round(processing_time, 2),
                "timestamp": datetime.now().isoformat(),
                "total_lines": formatted_result['total_lines'],
                "parameters": {
                    "post_processing_enabled": post_processing_enabled,
                    "unet_enabled": unet_enabled,
                    "include_images": include_images,
                    "image_format": image_format if include_images else None,
                    "userToken": userToken if userToken else "Anonymous"
                }
            }
        }
        return JSONResponse(content=response_data)
    except HTTPException:
        # Client errors raised above must not be rewrapped as 500s.
        raise
    except Exception as e:
        logger.error(f"Base64 transcription failed: {e}")
        raise HTTPException(
            status_code=500,
            detail={
                "error": str(e),
                "type": type(e).__name__
            }
        ) from e
    finally:
        # Clean up the temporary file on success and failure alike.
        if temp_file_path and os.path.exists(temp_file_path):
            try:
                os.unlink(temp_file_path)
            except OSError as cleanup_error:
                # Best-effort: a leftover temp file must not fail the request.
                logger.warning(f"Could not remove temporary file {temp_file_path}: {cleanup_error}")
@app.get("/health")
def health_check():
    # Liveness probe: reports a fixed "healthy" status with the current
    # server time. Any unexpected error is logged and turned into a 500.
    try:
        checked_at = datetime.now().isoformat()
        report = dict(
            status="healthy",
            service="RenAI Transcription API",
            timestamp=checked_at,
        )
        return report
    except Exception as exc:
        logger.error(f"Health check failed: {exc}")
        raise HTTPException(status_code=500, detail="Service unhealthy")