PersianOCR / temp_uploads /Persian_ocr.py
Really-amin's picture
Rename Persian_ocr.py to temp_uploads/Persian_ocr.py
0e99ba3 verified
"""
FastAPI Webhook Integration for Persian OCR System
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl
import requests
import json
import asyncio
from typing import List, Dict, Optional, Any
import uvicorn
from datetime import datetime
import aiohttp
import logging
from pathlib import Path
import shutil
import aiofiles
import numpy as np
import cv2
from io import BytesIO
# Import our OCR system
from app import PersianOCRSystem
# تنظیمات لاگینگ
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class WebhookRequest(BaseModel):
"""مدل درخواست وب‌هوک"""
url: HttpUrl
headers: Optional[Dict[str, str]] = None
secret: Optional[str] = None
class ProcessingResult(BaseModel):
"""مدل نتیجه پردازش"""
job_id: str
status: str
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
processed_at: datetime
class WebhookConfig:
"""تنظیمات وب‌هوک"""
def __init__(self):
self.webhooks: Dict[str, WebhookRequest] = {}
self.results: Dict[str, ProcessingResult] = {}
self.max_retries = 3
self.retry_delay = 5 # seconds
class OCRWebhookAPI:
"""API وب‌هوک برای سیستم OCR"""
def __init__(self):
self.app = FastAPI(
title="Persian OCR Webhook API",
description="API for Persian OCR system with webhook support",
version="1.0.0"
)
# تنظیم CORS
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# راه‌اندازی سیستم OCR و تنظیمات وب‌هوک
self.ocr_system = PersianOCRSystem()
self.webhook_config = WebhookConfig()
# ایجاد مسیر موقت
self.temp_dir = Path("temp_uploads")
self.temp_dir.mkdir(exist_ok=True)
# تعریف مسیرها
self.setup_routes()
def setup_routes(self):
"""تنظیم مسیرهای API"""
@self.app.post("/webhook/register")
async def register_webhook(webhook: WebhookRequest):
"""ثبت یک وب‌هوک جدید"""
try:
webhook_id = str(len(self.webhook_config.webhooks) + 1)
self.webhook_config.webhooks[webhook_id] = webhook
return {"webhook_id": webhook_id, "status": "registered"}
except Exception as e:
logger.error(f"خطا در ثبت وب‌هوک: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@self.app.post("/process")
async def process_image(
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
webhook_id: Optional[str] = Form(None)
):
"""پردازش تصویر و ارسال نتایج از طریق وب‌هوک"""
try:
# ذخیره فایل
job_id = f"job_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
file_path = self.temp_dir / f"{job_id}_{file.filename}"
async with aiofiles.open(file_path, 'wb') as out_file:
content = await file.read()
await out_file.write(content)
# اضافه کردن تسک پردازش به پس‌زمینه
background_tasks.add_task(
self.process_and_notify,
file_path,
job_id,
webhook_id
)
return {
"job_id": job_id,
"status": "processing",
"message": "Image processing started"
}
except Exception as e:
logger.error(f"خطا در شروع پردازش: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/status/{job_id}")
async def get_status(job_id: str):
"""دریافت وضعیت پردازش"""
if job_id in self.webhook_config.results:
return self.webhook_config.results[job_id]
raise HTTPException(status_code=404, detail="Job not found")
@self.app.delete("/webhook/{webhook_id}")
async def delete_webhook(webhook_id: str):
"""حذف وب‌هوک"""
if webhook_id in self.webhook_config.webhooks:
del self.webhook_config.webhooks[webhook_id]
return {"status": "deleted"}
raise HTTPException(status_code=404, detail="Webhook not found")
async def process_and_notify(
self,
file_path: Path,
job_id: str,
webhook_id: Optional[str]
):
"""پردازش تصویر و ارسال نتایج"""
try:
# خواندن و پردازش تصویر
image = cv2.imread(str(file_path))
if image is None:
raise ValueError("Failed to load image")
# پردازش OCR
results = await self.ocr_system.recognize_text(image)
# ذخیره نتیجه
processing_result = ProcessingResult(
job_id=job_id,
status="completed",
result=results,
processed_at=datetime.now()
)
self.webhook_config.results[job_id] = processing_result
# ارسال نتایج از طریق وب‌هوک
if webhook_id and webhook_id in self.webhook_config.webhooks:
await self.send_webhook_notification(
webhook_id,
job_id,
processing_result
)
except Exception as e:
logger.error(f"خطا در پردازش تصویر: {str(e)}")
self.webhook_config.results[job_id] = ProcessingResult(
job_id=job_id,
status="failed",
error=str(e),
processed_at=datetime.now()
)
# ارسال خطا از طریق وب‌هوک
if webhook_id and webhook_id in self.webhook_config.webhooks:
await self.send_webhook_notification(
webhook_id,
job_id,
self.webhook_config.results[job_id]
)
finally:
# پاکسازی فایل موقت
try:
file_path.unlink()
except Exception as e:
logger.warning(f"خطا در حذف فایل موقت: {str(e)}")
async def send_webhook_notification(
self,
webhook_id: str,
job_id: str,
result: ProcessingResult
):
"""ارسال نتایج به وب‌هوک"""
webhook = self.webhook_config.webhooks[webhook_id]
for attempt in range(self.webhook_config.max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.post(
str(webhook.url),
json={
"job_id": job_id,
"result": result.dict()
},
headers=webhook.headers
) as response:
if response.status == 200:
logger.info(f"Webhook notification sent successfully for job {job_id}")
return
logger.warning(
f"Webhook attempt {attempt + 1} failed for job {job_id}"
)
await asyncio.sleep(self.webhook_config.retry_delay)
except Exception as e:
logger.error(f"Error sending webhook notification: {str(e)}")
if attempt == self.webhook_config.max_retries - 1:
logger.error(f"All webhook attempts failed for job {job_id}")
def run(self, host: str = "0.0.0.0", port: int = 8000):
"""اجرای سرور"""
uvicorn.run(self.app, host=host, port=port)
def create_api():
"""ایجاد نمونه API"""
api = OCRWebhookAPI()
return api.app
if __name__ == "__main__":
api = OCRWebhookAPI()
api.run()