from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, status, Query from fastapi.responses import FileResponse from pydantic import BaseModel, EmailStr, Field from typing import Optional import cv2 import numpy as np import tensorflow as tf import pickle import matplotlib.pyplot as plt import matplotlib.font_manager as fm import os import io import sys import tempfile import requests from PIL import Image import uvicorn import shutil from pathlib import Path import py_text_scan from sqlalchemy import create_engine, Column, Integer, String, Boolean, Text, DateTime # --- FIX: Added the missing import below --- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, Session from passlib.context import CryptContext import datetime # --- Database Setup (SQLite) --- DATABASE_URL = "sqlite:///./test.db" engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) # This line will now work correctly Base = declarative_base() # --- Database Models --- class UserModel(Base): __tablename__ = "users" id = Column(Integer, primary_key=True, index=True) username = Column(String, unique=True, index=True) email = Column(String, unique=True, index=True) hashed_password = Column(String) is_active = Column(Boolean, default=True) is_admin = Column(Boolean, default=False) class FeedbackModel(Base): __tablename__ = "feedback" id = Column(Integer, primary_key=True, index=True) username = Column(String) comment = Column(Text) created_at = Column(DateTime, default=datetime.datetime.utcnow) Base.metadata.create_all(bind=engine) class OCRResponse(BaseModel): sakshi_output: str word_count: int prediction_label: str app = FastAPI( title="Dynamic Hindi OCR API", description="API for Hindi OCR with selectable models from the frontend.", version="1.1.0" ) # --- Model download and setup remains the same --- MODEL_URL = "https://huggingface.co/sameernotes/hindi-ocr/resolve/main/hindi_ocr_model.keras" ENCODER_URL = "https://huggingface.co/sameernotes/hindi-ocr/resolve/main/label_encoder.pkl" FONT_URL = "https://huggingface.co/sameernotes/hindi-ocr/resolve/main/NotoSansDevanagari-Regular.ttf" MODEL_PATH = "hindi_ocr_model.keras" ENCODER_PATH = "label_encoder.pkl" FONT_PATH = "NotoSansDevanagari-Regular.ttf" model = None label_encoder = None session_files = {} def download_file(url, dest): if not os.path.exists(dest): print(f"Downloading {dest}...") response = requests.get(url, stream=True) response.raise_for_status() with open(dest, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) print(f"Downloaded {dest}") @app.on_event("startup") async def startup_event(): global model, label_encoder download_file(MODEL_URL, MODEL_PATH) download_file(ENCODER_URL, ENCODER_PATH) download_file(FONT_URL, FONT_PATH) if os.path.exists(FONT_PATH): fm.fontManager.addfont(FONT_PATH) plt.rcParams['font.family'] = 'Noto Sans Devanagari' model = tf.keras.models.load_model(MODEL_PATH) if os.path.exists(MODEL_PATH) else None if os.path.exists(ENCODER_PATH): with open(ENCODER_PATH, 'rb') as f: label_encoder = pickle.load(f) # --- Image processing functions --- def detect_words(image): _, binary = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) kernel = np.ones((3,3), np.uint8) dilated = cv2.dilate(binary, kernel, iterations=2) contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) word_img = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) word_count = 0 for contour in contours: x, y, w, h = cv2.boundingRect(contour) if w > 10 and h > 10: cv2.rectangle(word_img, (x, y), (x+w, y+h), (0, 255, 0), 2) word_count += 1 return word_img, word_count def run_py_text_scan(image_path): buffer = io.StringIO() old_stdout = sys.stdout sys.stdout = buffer try: py_text_scan.generate(image_path) finally: sys.stdout = old_stdout return buffer.getvalue() def process_image(image_array, use_keras: bool, use_py_text_scan: bool): img = cv2.cvtColor(image_array, cv2.COLOR_RGB2GRAY) word_detected_img, word_count = detect_words(img) word_detection_path = tempfile.NamedTemporaryFile(delete=False, suffix=".png").name cv2.imwrite(word_detection_path, word_detected_img) session_files['word_detection'] = word_detection_path # --- Conditional Keras Model Prediction --- pred_label = "Keras model disabled by user" if use_keras: try: img_resized = cv2.resize(img, (128, 32)) img_norm = img_resized / 255.0 img_input = img_norm[np.newaxis, ..., np.newaxis] if model is not None and label_encoder is not None: pred = model.predict(img_input) pred_label_idx = np.argmax(pred) pred_label = label_encoder.inverse_transform([pred_label_idx])[0] else: pred_label = "Keras model not loaded on server" except Exception as e: pred_label = f"Keras Error: {str(e)}" # --- Conditional py_text_scan Execution --- sakshi_output = "py_text_scan disabled by user" if use_py_text_scan: with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp_file: cv2.imwrite(tmp_file.name, img) sakshi_output = run_py_text_scan(tmp_file.name) os.unlink(tmp_file.name) return { "sakshi_output": sakshi_output, "word_count": word_count, "prediction_label": pred_label } # --- API Endpoints --- @app.post("/process/", response_model=OCRResponse) async def process( file: UploadFile = File(...), use_keras: bool = Query(True, description="Enable/disable the Keras model"), use_py_text_scan: bool = Query(True, description="Enable/disable the py_text_scan library") ): if not file.content_type.startswith("image/"): raise HTTPException(status_code=400, detail="File must be an image") # Clear previous session files for key, filepath in session_files.items(): if os.path.exists(filepath): try: os.unlink(filepath) except: pass session_files.clear() # Process the new image temp_file_path = "" try: # Save uploaded file temporarily with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file: shutil.copyfileobj(file.file, temp_file) temp_file_path = temp_file.name image = Image.open(temp_file_path) image_array = np.array(image) # Call the processing function with the flags result = process_image(image_array, use_keras, use_py_text_scan) return OCRResponse( sakshi_output=result["sakshi_output"], word_count=result["word_count"], prediction_label=result["prediction_label"] ) except Exception as e: raise HTTPException(status_code=500, detail=f"Error processing image: {str(e)}") finally: # Clean up the temporary file if os.path.exists(temp_file_path): os.unlink(temp_file_path) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)