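# NOTE: the next four lines are web-page residue captured along with the file;
# they are kept as comments so the module still parses.
# WalletSyncOCR / app.py
# LogicGoInfotechSpaces's picture
# Update app.py
# 80ddd9c verified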
# # app.py
# app.py
import uvicorn
import numpy as np
import cv2
import boto3
import os
import json
import time
import requests
from datetime import datetime
from fastapi import FastAPI, UploadFile, File, HTTPException, Header
from rapidocr_onnxruntime import RapidOCR
from openai import OpenAI
from pymongo import MongoClient
from pymongo.errors import PyMongoError
from botocore.exceptions import BotoCoreError, ClientError
# ---------------- ENV CONFIG ----------------
# Object-storage credentials and location, all read from the environment.
# Only the region ("blr1") and the bucket ("milestone") have fallbacks here.
DO_KEY_ID = os.getenv("DO_SPACES_KEY_ID")
DO_SECRET_KEY = os.getenv("DO_SPACES_SECRET_KEY")
DO_REGION = os.getenv("DO_SPACES_REGION", "blr1")
DO_ENDPOINT = os.getenv("DO_SPACES_ENDPOINT")
DO_BUCKET = os.getenv("DO_SPACES_BUCKET", "milestone")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Key prefix under which uploaded receipt images are stored in the bucket.
FOLDER = "OCR_Images"
# Base URL of the categorization service; "/api/v1/categorize" is appended at call time.
CATEGORY_API_URL = "https://logicgoinfotechspaces-expensecategorizenotes.hf.space"
# Fail fast at import time: the OpenAI key is the only setting validated here.
if not OPENAI_API_KEY:
    raise RuntimeError("OPENAI_API_KEY missing!")
# ---------------- OPENAI ----------------
client = OpenAI(api_key=OPENAI_API_KEY)
# ---------------- S3 ----------------
# S3-compatible client pointed at the endpoint given by DO_SPACES_ENDPOINT.
s3 = boto3.client(
    "s3",
    region_name=DO_REGION,
    endpoint_url=DO_ENDPOINT,
    aws_access_key_id=DO_KEY_ID,
    aws_secret_access_key=DO_SECRET_KEY,
)
# ---------------- MONGODB ----------------
# NOTE(review): MONGO_URI is not validated, unlike OPENAI_API_KEY above —
# confirm what MongoClient does when it is unset.
MONGO_URI = os.getenv("MONGO_URI")
mongo_client = MongoClient(MONGO_URI)
mongo_db = mongo_client["expense"]
# Collection that log_api_event() writes one document to per request.
api_logs_col = mongo_db["api_logs"]
# ---------------- APP ----------------
app = FastAPI()
# Single OCR engine instance created at import time and shared by all requests.
ocr_engine = RapidOCR()
# ---------------- HELPERS ----------------
def ist_now():
    """Return the current India Standard Time as ``DD-MM-YYYY HH:MM:SS:IST``.

    The timestamp is computed with an explicit UTC+05:30 offset, so the
    ``IST`` suffix is correct regardless of the host's local timezone.
    (Formatting the naive local time, as before, mislabels the value on any
    server that is not itself running in IST.)

    Returns:
        str: the formatted timestamp, always ending in ``:IST``.
    """
    # Local import: the module header only brings in `datetime` itself.
    from datetime import timedelta, timezone

    india_standard_time = timezone(timedelta(hours=5, minutes=30))
    return datetime.now(india_standard_time).strftime("%d-%m-%Y %H:%M:%S:IST")
def log_api_event(
    *,
    status: str,
    response_time: float,
    user_id: str | None,
    error_message: str | None = None
):
    """Write one request-log document to the ``api_logs`` collection.

    Args:
        status: outcome label stored under ``"status"``.
        response_time: elapsed seconds; stored rounded to 3 decimals.
        user_id: stored under ``"user_id"`` only when truthy.
        error_message: stored under ``"error_message"`` only when truthy.

    Any exception raised by the insert is swallowed on purpose so that a
    logging failure can never fail the API call being logged.
    """
    record = {
        "name": "Receipt Scanner",
        "status": status,
        "date": ist_now(),
        "response_time": round(response_time, 3),
    }
    # Optional fields are appended in a fixed order and skipped when falsy.
    optional_fields = (("user_id", user_id), ("error_message", error_message))
    record.update({key: value for key, value in optional_fields if value})
    try:
        api_logs_col.insert_one(record)
    except Exception:
        pass  # never break API because of logging failure
# ---------------- ROUTES ----------------
@app.get("/health")
async def health():
    """Probe every backing dependency and return a per-check report.

    Checks run in this order: MongoDB ping, OpenAI model listing, bucket
    HEAD on object storage, and a presence check on the OCR engine. Each
    check records ``"ok"`` or ``"fail: <error>"``; a single failure turns the
    overall status from ``"healthy"`` into ``"degraded"``.

    Returns:
        dict with ``service``, ``status``, ``checks`` and a UTC ``timestamp``.
    """
    checks = {}
    failed = []

    def _probe(label, action, expected_errors):
        # Run one check; only the listed exception types count as a failure,
        # anything else propagates to the caller.
        try:
            action()
            checks[label] = "ok"
        except expected_errors as exc:
            checks[label] = f"fail: {str(exc)}"
            failed.append(label)

    def _require_ocr_engine():
        if ocr_engine is None:
            raise RuntimeError("OCR engine not initialized")

    # ---------------- MongoDB ----------------
    _probe("mongodb", lambda: mongo_client.admin.command("ping"), PyMongoError)
    # ---------------- OpenAI ----------------
    # very light call, does not consume tokens
    _probe("openai", lambda: client.models.list(), Exception)
    # ---------------- DO Spaces / S3 ----------------
    _probe(
        "object_storage",
        lambda: s3.head_bucket(Bucket=DO_BUCKET),
        (BotoCoreError, ClientError),
    )
    # ---------------- OCR Engine ----------------
    _probe("ocr", _require_ocr_engine, Exception)

    # ---------------- Overall ----------------
    return {
        "service": "Receipt Scanner API",
        "status": "degraded" if failed else "healthy",
        "checks": checks,
        "timestamp": datetime.utcnow().isoformat(),
    }
@app.post("/upload")
async def upload_image(file: UploadFile = File(...)):
    """Store an uploaded receipt image in the bucket under ``FOLDER``.

    The object key is ``"<FOLDER>/<filename>"`` where ``<filename>`` is the
    final path component of the client-supplied name. Directory parts are
    dropped (both ``/`` and ``\\`` separators) so a crafted name cannot place
    the object outside ``FOLDER``.

    Args:
        file: the multipart upload.

    Returns:
        dict with ``status``, ``message``, the ``image_id`` (object key) to
        pass to ``/generate``, and ``local_path``.

    Raises:
        HTTPException: 400 when the upload carries no usable filename;
            500 (with the error text) for any storage failure.
    """
    try:
        # A missing/blank filename used to yield keys like "OCR_Images/None".
        client_name = (file.filename or "").replace("\\", "/")
        safe_name = client_name.rsplit("/", 1)[-1]
        if not safe_name.strip() or safe_name in {".", ".."}:
            raise HTTPException(
                status_code=400,
                detail="Uploaded file has no usable filename",
            )

        file_bytes = await file.read()
        image_key = f"{FOLDER}/{safe_name}"
        s3.put_object(
            Bucket=DO_BUCKET,
            Key=image_key,
            Body=file_bytes,
            # content_type may be absent; fall back to a generic binary type.
            ContentType=file.content_type or "application/octet-stream",
            ACL="private"
        )
        return {
            "status": "success",
            "message": "Uploaded successfully",
            "image_id": image_key,
            # Kept for response compatibility; this handler writes nothing
            # to that path.
            "local_path": "/mnt/data/image.png"
        }
    except HTTPException:
        # Preserve deliberate client errors instead of rewrapping them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# Average OCR confidence below this value is reported back as a soft failure.
_MIN_OCR_CONFIDENCE = 0.70

# Category fields copied from the categorization service into the result.
_CATEGORY_KEYS = (
    "headcategory_id",
    "headcategory_title",
    "category_id",
    "category_title",
)


def _fetch_image_bytes(image_id):
    """Return the raw bytes for ``image_id`` from the bucket, or raise 404."""
    try:
        obj = s3.get_object(Bucket=DO_BUCKET, Key=image_id)
        return obj["Body"].read()
    except Exception:
        # NOTE(review): on any storage error this falls back to one fixed
        # local file, regardless of image_id — confirm this is still wanted.
        local_path = "/mnt/data/image.png"
        if not os.path.exists(local_path):
            raise HTTPException(status_code=404, detail="Image not found")
        with open(local_path, "rb") as f:
            return f.read()


def _extract_expense_fields(full_text):
    """Ask the chat model to turn OCR text into a structured expense dict."""
    schema = {
        "name": "extract_expense_details",
        "schema": {
            "type": "object",
            "properties": {
                "total_amount": {"type": "number"},
                "date": {"type": "string"},
                "time": {"type": "string"},
                "payment_type": {
                    "type": "string",
                    "enum": ["cash", "card", "upi", "unknown"]
                },
                "notes": {"type": "string"}
            },
            "required": ["total_amount"]
        }
    }
    prompt = (
        "\n"
        "Extract expense details from OCR text below:\n"
        '"""\n'
        f"{full_text}\n"
        '"""\n'
        "Rules:\n"
        '- Do not guess missing values → use "unknown"\n'
        "- Notes format:\n"
        '"Spent <total_amount> on <merchant_name> on <date>."\n'
    )
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        response_format={"type": "json_schema", "json_schema": schema},
        messages=[
            {"role": "system", "content": "You are an expert in receipt parsing."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.1
    )
    parsed = json.loads(response.choices[0].message.content)
    parsed.setdefault("date", "unknown")
    parsed.setdefault("time", "unknown")
    parsed.setdefault("payment_type", "cash")
    parsed.setdefault("notes", "unknown")
    # Set payment_type to "cash" if it's "unknown"
    if parsed.get("payment_type") == "unknown":
        parsed["payment_type"] = "cash"
    return parsed


def _attach_category(parsed, user_id):
    """Add the category fields to ``parsed``; all ``None`` on any failure."""
    for key in _CATEGORY_KEYS:
        parsed[key] = None
    try:
        cat_response = requests.post(
            f"{CATEGORY_API_URL}/api/v1/categorize",
            json={
                "notes": parsed["notes"],
                "user_id": user_id
            },
            timeout=10
        )
        if cat_response.status_code != 200:
            return
        cat_data = cat_response.json()
        if cat_data.get("status") == "success" and cat_data.get("data"):
            data = cat_data["data"]
            for key in _CATEGORY_KEYS:
                parsed[key] = data.get(key)
    except Exception:
        # Categorization is best-effort: undo any partial assignment.
        for key in _CATEGORY_KEYS:
            parsed[key] = None


@app.post("/generate/{image_id:path}")
async def generate(
    image_id: str,
    user_id: str | None = Header(default=None, alias="user_id")
):
    """Run OCR on a stored image and return structured expense details.

    Steps: load the image bytes, decode them, run the OCR engine, and stop
    early with a ``"fail"`` payload when the average confidence is below
    0.70. Otherwise extract fields with the chat model, attach category
    fields, and log the outcome through ``log_api_event``.

    Args:
        image_id: object key of the image (as returned by ``/upload``).
        user_id: optional ``user_id`` request header, forwarded to the
            categorization service and stored in the request log.

    Returns:
        dict with ``status``, ``message``, ``image_id``, ``confidence`` and
        ``raw_text``; on success also ``parsed``.

    Raises:
        HTTPException: 404 when the image cannot be found, 400 when it cannot
            be decoded, 500 for any other failure. The 404/400 responses are
            passed through unchanged rather than being rewrapped as 500.
    """
    start_time = time.time()
    try:
        # -------- DOWNLOAD IMAGE --------
        raw_bytes = _fetch_image_bytes(image_id)
        img_array = np.frombuffer(raw_bytes, np.uint8)
        # An empty buffer makes cv2.imdecode raise; treat it as undecodable.
        img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) if img_array.size else None
        if img is None:
            raise HTTPException(status_code=400, detail="Unable to decode image")

        # -------- OCR --------
        result, _ = ocr_engine(img)
        if not result:
            raise RuntimeError("OCR returned empty result")
        full_text = "\n".join(text for _, text, _ in result)
        confidences = [conf for _, _, conf in result if isinstance(conf, (int, float))]
        avg_confidence = sum(confidences) / len(confidences) if confidences else 0

        if avg_confidence < _MIN_OCR_CONFIDENCE:
            log_api_event(
                status="fail",
                response_time=time.time() - start_time,
                user_id=user_id,
                error_message="Low OCR confidence"
            )
            return {
                "status": "fail",
                "message": "Upload image with more clarity or enter manually.",
                "image_id": image_id,
                "raw_text": full_text,
                "confidence": round(avg_confidence, 3),
            }

        # -------- GPT EXTRACTION + CATEGORY API --------
        parsed = _extract_expense_fields(full_text)
        _attach_category(parsed, user_id)

        log_api_event(
            status="success",
            response_time=time.time() - start_time,
            user_id=user_id
        )
        return {
            "status": "success",
            "message": "Receipt processed and logged in DB",
            "image_id": image_id,
            "confidence": round(avg_confidence, 3),
            "raw_text": full_text,
            "parsed": parsed,
        }
    except HTTPException as http_error:
        # Deliberate 4xx responses must reach the client with their own
        # status code; still record the failure.
        log_api_event(
            status="fail",
            response_time=time.time() - start_time,
            user_id=user_id,
            error_message=str(http_error.detail)
        )
        raise
    except Exception as e:
        log_api_event(
            status="fail",
            response_time=time.time() - start_time,
            user_id=user_id,
            error_message=str(e)
        )
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/ping")
def ping():
    """Liveness endpoint: always answers with the same constant payload."""
    alive_payload = {"status": "alive"}
    return alive_payload
# Script entry point: when run directly, serve the "app" object from module
# "app" with uvicorn, listening on all interfaces on port 7860.
if __name__ == "__main__":
    uvicorn.run("app:app", host="0.0.0.0", port=7860)
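# NOTE: everything from here to the end of the file is commented-out code that
# largely duplicates the module above (it appears to be an earlier revision);
# none of it is executed.
# import uvicorn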
# import numpy as np
# import cv2
# import boto3
# import os
# import json
# import time
# import requests
# from datetime import datetime
# from fastapi import FastAPI, UploadFile, File, HTTPException, Header
# from rapidocr_onnxruntime import RapidOCR
# from openai import OpenAI
# from pymongo import MongoClient
# from pymongo.errors import PyMongoError
# from botocore.exceptions import BotoCoreError, ClientError
# # ---------------- ENV CONFIG ----------------
# DO_KEY_ID = os.getenv("DO_SPACES_KEY_ID")
# DO_SECRET_KEY = os.getenv("DO_SPACES_SECRET_KEY")
# DO_REGION = os.getenv("DO_SPACES_REGION", "blr1")
# DO_ENDPOINT = os.getenv("DO_SPACES_ENDPOINT")
# DO_BUCKET = os.getenv("DO_SPACES_BUCKET", "milestone")
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# FOLDER = "OCR_Images"
# CATEGORY_API_URL = os.getenv("CATEGORY_API_URL")
# NOTES_CATEGORIZER_URL = os.getenv("NOTES_CATEGORIZER_URL")
# if not OPENAI_API_KEY:
# raise RuntimeError("OPENAI_API_KEY missing!")
# # ---------------- OPENAI ----------------
# client = OpenAI(api_key=OPENAI_API_KEY)
# # ---------------- S3 ----------------
# s3 = boto3.client(
# "s3",
# region_name=DO_REGION,
# endpoint_url=DO_ENDPOINT,
# aws_access_key_id=DO_KEY_ID,
# aws_secret_access_key=DO_SECRET_KEY,
# )
# # ---------------- MONGODB ----------------
# MONGO_URI = os.getenv("MONGO_URI")
# mongo_client = MongoClient(MONGO_URI)
# mongo_db = mongo_client["expense"]
# api_logs_col = mongo_db["api_logs"]
# # ---------------- APP ----------------
# app = FastAPI()
# ocr_engine = RapidOCR()
# # ---------------- HELPERS ----------------
# def ist_now():
# return datetime.now().strftime("%d-%m-%Y %H:%M:%S:IST")
# def log_api_event(
# *,
# status: str,
# response_time: float,
# user_id: str | None,
# error_message: str | None = None
# ):
# payload = {
# "name": "Receipt Scanner",
# "status": status,
# "date": ist_now(),
# "response_time": round(response_time, 3),
# }
# if user_id:
# payload["user_id"] = user_id
# if error_message:
# payload["error_message"] = error_message
# try:
# api_logs_col.insert_one(payload)
# except Exception:
# pass # never break API because of logging failure
# # ---------------- ROUTES ----------------
# @app.get("/health")
# async def health():
# health_report = {
# "service": "Receipt Scanner API",
# "status": "healthy",
# "checks": {}
# }
# # ---------------- MongoDB ----------------
# try:
# mongo_client.admin.command("ping")
# health_report["checks"]["mongodb"] = "ok"
# except PyMongoError as e:
# health_report["checks"]["mongodb"] = f"fail: {str(e)}"
# health_report["status"] = "degraded"
# # ---------------- OpenAI ----------------
# try:
# # very light call, does not consume tokens
# client.models.list()
# health_report["checks"]["openai"] = "ok"
# except Exception as e:
# health_report["checks"]["openai"] = f"fail: {str(e)}"
# health_report["status"] = "degraded"
# # ---------------- DO Spaces / S3 ----------------
# try:
# s3.head_bucket(Bucket=DO_BUCKET)
# health_report["checks"]["object_storage"] = "ok"
# except (BotoCoreError, ClientError) as e:
# health_report["checks"]["object_storage"] = f"fail: {str(e)}"
# health_report["status"] = "degraded"
# # ---------------- OCR Engine ----------------
# try:
# if ocr_engine is None:
# raise RuntimeError("OCR engine not initialized")
# health_report["checks"]["ocr"] = "ok"
# except Exception as e:
# health_report["checks"]["ocr"] = f"fail: {str(e)}"
# health_report["status"] = "degraded"
# # ---------------- Overall ----------------
# health_report["timestamp"] = datetime.utcnow().isoformat()
# return health_report
# @app.post("/upload")
# async def upload_image(file: UploadFile = File(...)):
# try:
# file_bytes = await file.read()
# image_key = f"{FOLDER}/{file.filename}"
# s3.put_object(
# Bucket=DO_BUCKET,
# Key=image_key,
# Body=file_bytes,
# ContentType=file.content_type,
# ACL="private"
# )
# return {
# "status": "success",
# "message": "Uploaded successfully",
# "image_id": image_key,
# "local_path": "/mnt/data/image.png"
# }
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @app.post("/generate/{image_id:path}")
# async def generate(
# image_id: str,
# user_id: str | None = Header(default=None, alias="user_id")
# ):
# start_time = time.time()
# try:
# # -------- DOWNLOAD IMAGE --------
# try:
# obj = s3.get_object(Bucket=DO_BUCKET, Key=image_id)
# raw_bytes = obj["Body"].read()
# except Exception:
# local_path = "/mnt/data/image.png"
# if os.path.exists(local_path):
# with open(local_path, "rb") as f:
# raw_bytes = f.read()
# else:
# raise HTTPException(status_code=404, detail="Image not found")
# img_array = np.frombuffer(raw_bytes, np.uint8)
# img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
# if img is None:
# raise HTTPException(status_code=400, detail="Unable to decode image")
# # -------- OCR --------
# result, _ = ocr_engine(img)
# if not result:
# raise RuntimeError("OCR returned empty result")
# full_text = "\n".join([text for _, text, _ in result])
# confidences = [conf for _, _, conf in result if isinstance(conf, (int, float))]
# avg_confidence = sum(confidences) / len(confidences) if confidences else 0
# if avg_confidence < 0.70:
# response_time = time.time() - start_time
# log_api_event(
# status="fail",
# response_time=response_time,
# user_id=user_id,
# error_message="Low OCR confidence"
# )
# return {
# "status": "fail",
# "message": "Upload image with more clarity or enter manually.",
# "image_id": image_id,
# "raw_text": full_text,
# "confidence": round(avg_confidence, 3),
# }
# # -------- GPT SCHEMA --------
# schema = {
# "name": "extract_expense_details",
# "schema": {
# "type": "object",
# "properties": {
# "total_amount": {"type": "number"},
# "label": {"type": "string"},
# "date": {"type": "string"},
# "time": {"type": "string"},
# "payment_type": {
# "type": "string",
# "enum": ["cash", "card", "upi", "unknown"]
# },
# "notes": {"type": "string"}
# },
# "required": ["total_amount", "label"]
# }
# }
# prompt = f"""
# Extract expense details from OCR text below:
# \"\"\"
# {full_text}
# \"\"\"
# Rules:
# - Do not guess missing values → use "unknown"
# - Notes format:
# "Spent <total_amount> on <label> on <date>."
# """
# response = client.chat.completions.create(
# model="gpt-4o-mini",
# response_format={"type": "json_schema", "json_schema": schema},
# messages=[
# {"role": "system", "content": "You are an expert in receipt parsing."},
# {"role": "user", "content": prompt}
# ],
# temperature=0.1
# )
# parsed = json.loads(response.choices[0].message.content)
# parsed.setdefault("date", "unknown")
# parsed.setdefault("time", "unknown")
# parsed.setdefault("payment_type", "unknown")
# parsed.setdefault("notes", "unknown")
# # -------- CATEGORY API --------
# try:
# cat_response = requests.post(
# NOTES_CATEGORIZER_URL,
# json={"notes": parsed["notes"]},
# timeout=10
# )
# if cat_response.status_code == 200:
# cat_data = cat_response.json()
# parsed["category"] = cat_data.get("subcategory", "unknown")
# parsed["category_title"] = cat_data.get("title")
# else:
# parsed["category"] = "unknown"
# parsed["category_title"] = None
# except Exception:
# parsed["category"] = "unknown"
# parsed["category_title"] = None
# response_time = time.time() - start_time
# log_api_event(
# status="success",
# response_time=response_time,
# user_id=user_id
# )
# return {
# "status": "success",
# "message": "Receipt processed and logged in DB",
# "image_id": image_id,
# "confidence": round(avg_confidence, 3),
# "raw_text": full_text,
# "parsed": parsed,
# }
# except Exception as e:
# response_time = time.time() - start_time
# log_api_event(
# status="fail",
# response_time=response_time,
# user_id=user_id,
# error_message=str(e)
# )
# raise HTTPException(status_code=500, detail=str(e))
# @app.get("/ping")
# def ping():
# return {"status": "alive"}
# if __name__ == "__main__":
# uvicorn.run("app:app", host="0.0.0.0", port=7860)