# app.py
"""Receipt Scanner API.

FastAPI service that:

* stores uploaded receipt images in an S3-compatible bucket (DO Spaces),
* runs RapidOCR on a stored image,
* asks an OpenAI chat model to extract expense fields from the OCR text,
* enriches the result through an external categorisation HTTP API, and
* writes one log document per ``/generate`` call to MongoDB.
"""
import json
import logging
import os
import time
from datetime import datetime, timedelta, timezone

import boto3
import cv2
import numpy as np
import requests
import uvicorn
from botocore.exceptions import BotoCoreError, ClientError
from fastapi import FastAPI, File, Header, HTTPException, UploadFile
from openai import OpenAI
from pymongo import MongoClient
from pymongo.errors import PyMongoError
from rapidocr_onnxruntime import RapidOCR

logger = logging.getLogger(__name__)

# ---------------- ENV CONFIG ----------------
DO_KEY_ID = os.getenv("DO_SPACES_KEY_ID")
DO_SECRET_KEY = os.getenv("DO_SPACES_SECRET_KEY")
DO_REGION = os.getenv("DO_SPACES_REGION", "blr1")
DO_ENDPOINT = os.getenv("DO_SPACES_ENDPOINT")
DO_BUCKET = os.getenv("DO_SPACES_BUCKET", "milestone")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
FOLDER = "OCR_Images"
CATEGORY_API_URL = "https://logicgoinfotechspaces-expensecategorizenotes.hf.space"

# Fixed path used as a fallback image source by /generate and echoed by /upload.
LOCAL_FALLBACK_PATH = "/mnt/data/image.png"
# Average OCR confidence below this value is reported as a failed scan.
MIN_OCR_CONFIDENCE = 0.70
# Fixed UTC+05:30 offset so "IST" timestamps do not depend on the server's TZ.
IST = timezone(timedelta(hours=5, minutes=30))
# Fields copied from the categorisation API response into the parsed result.
CATEGORY_FIELDS = (
    "headcategory_id",
    "headcategory_title",
    "category_id",
    "category_title",
)

if not OPENAI_API_KEY:
    raise RuntimeError("OPENAI_API_KEY missing!")

# ---------------- OPENAI ----------------
client = OpenAI(api_key=OPENAI_API_KEY)

# ---------------- S3 ----------------
s3 = boto3.client(
    "s3",
    region_name=DO_REGION,
    endpoint_url=DO_ENDPOINT,
    aws_access_key_id=DO_KEY_ID,
    aws_secret_access_key=DO_SECRET_KEY,
)

# ---------------- MONGODB ----------------
MONGO_URI = os.getenv("MONGO_URI")
mongo_client = MongoClient(MONGO_URI)
mongo_db = mongo_client["expense"]
api_logs_col = mongo_db["api_logs"]

# ---------------- APP ----------------
app = FastAPI()
ocr_engine = RapidOCR()


# ---------------- HELPERS ----------------
def ist_now():
    """Return the current time in IST formatted as ``DD-MM-YYYY HH:MM:SS:IST``.

    The time is computed with an explicit +05:30 offset; the previous
    ``datetime.now()`` produced server-local time while still labelling it IST.
    """
    return datetime.now(IST).strftime("%d-%m-%Y %H:%M:%S:IST")


def log_api_event(
    *,
    status: str,
    response_time: float,
    user_id: str | None,
    error_message: str | None = None
):
    """Insert one API log document into the ``api_logs`` collection.

    Args:
        status: Outcome label stored with the entry (callers pass
            ``"success"`` or ``"fail"``).
        response_time: Elapsed seconds; stored rounded to 3 decimals.
        user_id: Stored only when truthy.
        error_message: Stored only when truthy.

    Logging is best-effort: any failure to insert is reported through the
    module logger and never propagated to the caller.
    """
    payload = {
        "name": "Receipt Scanner",
        "status": status,
        "date": ist_now(),
        "response_time": round(response_time, 3),
    }
    if user_id:
        payload["user_id"] = user_id
    if error_message:
        payload["error_message"] = error_message

    try:
        api_logs_col.insert_one(payload)
    except Exception:
        # Never break the API because of a logging failure, but leave a trace.
        logger.warning("Failed to write API log entry", exc_info=True)


def _load_image_bytes(image_id: str) -> bytes:
    """Fetch the raw image bytes for *image_id*.

    Tries the object-storage bucket first; on any failure falls back to the
    fixed local file ``LOCAL_FALLBACK_PATH`` if it exists.

    Raises:
        HTTPException: 404 when neither source yields the image.
    """
    try:
        obj = s3.get_object(Bucket=DO_BUCKET, Key=image_id)
        return obj["Body"].read()
    except Exception:
        logger.warning(
            "Could not fetch %r from object storage; trying local fallback",
            image_id,
            exc_info=True,
        )

    if not os.path.exists(LOCAL_FALLBACK_PATH):
        raise HTTPException(status_code=404, detail="Image not found")
    with open(LOCAL_FALLBACK_PATH, "rb") as f:
        return f.read()


def _extract_expense(full_text: str) -> dict:
    """Ask the chat model to turn OCR text into structured expense fields.

    Returns the decoded JSON object with ``date``, ``time`` and ``notes``
    defaulted to ``"unknown"`` and ``payment_type`` defaulted to ``"cash"``
    (a model answer of ``"unknown"`` is also mapped to ``"cash"``).
    """
    schema = {
        "name": "extract_expense_details",
        "schema": {
            "type": "object",
            "properties": {
                "total_amount": {"type": "number"},
                "date": {"type": "string"},
                "time": {"type": "string"},
                "payment_type": {
                    "type": "string",
                    "enum": ["cash", "card", "upi", "unknown"],
                },
                "notes": {"type": "string"},
            },
            "required": ["total_amount"],
        },
    }

    # NOTE(review): the "Notes format" template below reads as if placeholders
    # were lost ("Spent on on ."); kept as found - confirm the intended text.
    prompt = f"""
    Extract expense details from OCR text below:

    \"\"\"
    {full_text}
    \"\"\"

    Rules:
    - Do not guess missing values → use "unknown"
    - Notes format:
      "Spent on on ."
    """

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        response_format={"type": "json_schema", "json_schema": schema},
        messages=[
            {"role": "system", "content": "You are an expert in receipt parsing."},
            {"role": "user", "content": prompt},
        ],
        temperature=0.1,
    )

    parsed = json.loads(response.choices[0].message.content)
    parsed.setdefault("date", "unknown")
    parsed.setdefault("time", "unknown")
    parsed.setdefault("notes", "unknown")
    # A missing or "unknown" payment type is reported as "cash".
    if parsed.get("payment_type", "unknown") == "unknown":
        parsed["payment_type"] = "cash"
    return parsed


def _fetch_category(notes, user_id) -> dict:
    """Look up category fields for *notes* via the categorisation API.

    Returns a dict with every key in ``CATEGORY_FIELDS``. All values are
    ``None`` unless the API answers HTTP 200 with ``status == "success"`` and
    a non-empty ``data`` object; request or decoding errors are logged and
    also yield the all-``None`` result.
    """
    try:
        cat_response = requests.post(
            f"{CATEGORY_API_URL}/api/v1/categorize",
            json={"notes": notes, "user_id": user_id},
            timeout=10,
        )
        if cat_response.status_code == 200:
            cat_data = cat_response.json()
            if cat_data.get("status") == "success" and cat_data.get("data"):
                data = cat_data["data"]
                return {field: data.get(field) for field in CATEGORY_FIELDS}
    except Exception:
        # Categorisation is optional enrichment; the receipt result is still useful.
        logger.warning("Category API call failed", exc_info=True)
    return dict.fromkeys(CATEGORY_FIELDS)


# ---------------- ROUTES ----------------
@app.get("/health")
async def health():
    """Report the status of each dependency.

    Returns a dict with ``service``, overall ``status`` (``"healthy"`` or
    ``"degraded"``), a ``checks`` mapping (``mongodb``, ``openai``,
    ``object_storage``, ``ocr``) and a UTC ``timestamp``.
    """
    health_report = {
        "service": "Receipt Scanner API",
        "status": "healthy",
        "checks": {},
    }
    checks = health_report["checks"]

    # ---------------- MongoDB ----------------
    try:
        mongo_client.admin.command("ping")
        checks["mongodb"] = "ok"
    except PyMongoError as e:
        checks["mongodb"] = f"fail: {str(e)}"
        health_report["status"] = "degraded"

    # ---------------- OpenAI ----------------
    try:
        # very light call, does not consume tokens
        client.models.list()
        checks["openai"] = "ok"
    except Exception as e:
        checks["openai"] = f"fail: {str(e)}"
        health_report["status"] = "degraded"

    # ---------------- DO Spaces / S3 ----------------
    try:
        s3.head_bucket(Bucket=DO_BUCKET)
        checks["object_storage"] = "ok"
    except (BotoCoreError, ClientError) as e:
        checks["object_storage"] = f"fail: {str(e)}"
        health_report["status"] = "degraded"

    # ---------------- OCR Engine ----------------
    if ocr_engine is None:
        checks["ocr"] = "fail: OCR engine not initialized"
        health_report["status"] = "degraded"
    else:
        checks["ocr"] = "ok"

    # ---------------- Overall ----------------
    health_report["timestamp"] = datetime.utcnow().isoformat()
    return health_report


@app.post("/upload")
async def upload_image(file: UploadFile = File(...)):
    """Store the uploaded file in the bucket under ``FOLDER/<filename>``.

    Returns a dict whose ``image_id`` is the object key to pass to
    ``/generate``. ``local_path`` is a fixed string; this handler does not
    write anything to that path.

    Raises:
        HTTPException: 400 when the upload carries no filename; 500 when
            reading or storing the file fails.
    """
    if not file.filename:
        # Without a name the key would become "OCR_Images/None".
        raise HTTPException(status_code=400, detail="Uploaded file has no filename")

    try:
        file_bytes = await file.read()
        image_key = f"{FOLDER}/{file.filename}"
        s3.put_object(
            Bucket=DO_BUCKET,
            Key=image_key,
            Body=file_bytes,
            # put_object rejects a None ContentType, so fall back to a generic one.
            ContentType=file.content_type or "application/octet-stream",
            ACL="private",
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {
        "status": "success",
        "message": "Uploaded successfully",
        "image_id": image_key,
        "local_path": LOCAL_FALLBACK_PATH,
    }


@app.post("/generate/{image_id:path}")
async def generate(
    image_id: str,
    user_id: str | None = Header(default=None, alias="user_id")
):
    """Run OCR and expense extraction for a stored image.

    Args:
        image_id: Object key of the image (as returned by ``/upload``).
        user_id: Optional ``user_id`` request header; forwarded to the
            categorisation API and stored in the API log.

    Returns:
        On success: ``status="success"`` with ``confidence``, ``raw_text`` and
        the ``parsed`` expense fields. When the average OCR confidence is
        below ``MIN_OCR_CONFIDENCE``: ``status="fail"`` with the raw text.

    Raises:
        HTTPException: 404 when the image cannot be found, 400 when it cannot
            be decoded, 500 for any other failure. Every outcome is logged
            through ``log_api_event``.

    NOTE(review): the storage, OCR, OpenAI and categorisation calls below are
    synchronous calls made from an ``async def`` handler.
    """
    start_time = time.time()

    try:
        # -------- DOWNLOAD IMAGE --------
        raw_bytes = _load_image_bytes(image_id)

        img_array = np.frombuffer(raw_bytes, np.uint8)
        # An empty buffer cannot be a valid image; answer 400 without calling imdecode on it.
        img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) if img_array.size else None
        if img is None:
            raise HTTPException(status_code=400, detail="Unable to decode image")

        # -------- OCR --------
        result, _ = ocr_engine(img)
        if not result:
            raise RuntimeError("OCR returned empty result")

        full_text = "\n".join(text for _, text, _ in result)
        confidences = [
            conf for _, _, conf in result if isinstance(conf, (int, float))
        ]
        avg_confidence = sum(confidences) / len(confidences) if confidences else 0

        if avg_confidence < MIN_OCR_CONFIDENCE:
            log_api_event(
                status="fail",
                response_time=time.time() - start_time,
                user_id=user_id,
                error_message="Low OCR confidence",
            )
            return {
                "status": "fail",
                "message": "Upload image with more clarity or enter manually.",
                "image_id": image_id,
                "raw_text": full_text,
                "confidence": round(avg_confidence, 3),
            }

        # -------- GPT EXTRACTION --------
        parsed = _extract_expense(full_text)

        # -------- CATEGORY API --------
        parsed.update(_fetch_category(parsed["notes"], user_id))

        log_api_event(
            status="success",
            response_time=time.time() - start_time,
            user_id=user_id,
        )
        return {
            "status": "success",
            "message": "Receipt processed and logged in DB",
            "image_id": image_id,
            "confidence": round(avg_confidence, 3),
            "raw_text": full_text,
            "parsed": parsed,
        }

    except HTTPException as e:
        # Keep the intended status code (404 / 400) instead of masking it as 500.
        log_api_event(
            status="fail",
            response_time=time.time() - start_time,
            user_id=user_id,
            error_message=str(e.detail),
        )
        raise
    except Exception as e:
        log_api_event(
            status="fail",
            response_time=time.time() - start_time,
            user_id=user_id,
            error_message=str(e),
        )
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/ping")
def ping():
    """Liveness probe; always returns ``{"status": "alive"}``."""
    return {"status": "alive"}


if __name__ == "__main__":
    uvicorn.run("app:app", host="0.0.0.0", port=7860)


# ---------------------------------------------------------------------------
# Commented-out copy of this module, kept as found.
# ---------------------------------------------------------------------------
# import uvicorn
# import numpy as np
# import cv2
# import boto3
# import os
# import json
# import time
# import requests
# from datetime import datetime
# from fastapi import FastAPI, UploadFile, File, HTTPException, Header
# from rapidocr_onnxruntime import RapidOCR
# from openai import OpenAI
# from pymongo import MongoClient
# from pymongo.errors import PyMongoError
# from botocore.exceptions import BotoCoreError, ClientError
# # ---------------- ENV CONFIG ----------------
# DO_KEY_ID = os.getenv("DO_SPACES_KEY_ID")
# DO_SECRET_KEY = os.getenv("DO_SPACES_SECRET_KEY")
# DO_REGION = os.getenv("DO_SPACES_REGION", "blr1")
# DO_ENDPOINT = os.getenv("DO_SPACES_ENDPOINT")
# DO_BUCKET = os.getenv("DO_SPACES_BUCKET", "milestone")
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# FOLDER = "OCR_Images"
# CATEGORY_API_URL = os.getenv("CATEGORY_API_URL")
# NOTES_CATEGORIZER_URL = os.getenv("NOTES_CATEGORIZER_URL")
# if not OPENAI_API_KEY:
#     raise RuntimeError("OPENAI_API_KEY missing!")
# # ---------------- OPENAI ----------------
# client = OpenAI(api_key=OPENAI_API_KEY)
# # ---------------- S3 ----------------
# s3 = boto3.client(
#     "s3",
#     region_name=DO_REGION,
#     endpoint_url=DO_ENDPOINT,
#     aws_access_key_id=DO_KEY_ID,
#     aws_secret_access_key=DO_SECRET_KEY,
# )
# # ---------------- MONGODB ----------------
# MONGO_URI = os.getenv("MONGO_URI")
# mongo_client = MongoClient(MONGO_URI)
# mongo_db = mongo_client["expense"]
# api_logs_col = mongo_db["api_logs"]
# # ---------------- APP ----------------
# app = FastAPI()
# ocr_engine = RapidOCR()
# # ---------------- HELPERS ----------------
# def ist_now():
#     return datetime.now().strftime("%d-%m-%Y %H:%M:%S:IST")
# def log_api_event(
#     *,
#     status: str,
#     response_time:
#     float,
#     user_id: str | None,
#     error_message: str | None = None
# ):
#     payload = {
#         "name": "Receipt Scanner",
#         "status": status,
#         "date": ist_now(),
#         "response_time": round(response_time, 3),
#     }
#     if user_id:
#         payload["user_id"] = user_id
#     if error_message:
#         payload["error_message"] = error_message
#     try:
#         api_logs_col.insert_one(payload)
#     except Exception:
#         pass  # never break API because of logging failure
# # ---------------- ROUTES ----------------
# @app.get("/health")
# async def health():
#     health_report = {
#         "service": "Receipt Scanner API",
#         "status": "healthy",
#         "checks": {}
#     }
#     # ---------------- MongoDB ----------------
#     try:
#         mongo_client.admin.command("ping")
#         health_report["checks"]["mongodb"] = "ok"
#     except PyMongoError as e:
#         health_report["checks"]["mongodb"] = f"fail: {str(e)}"
#         health_report["status"] = "degraded"
#     # ---------------- OpenAI ----------------
#     try:
#         # very light call, does not consume tokens
#         client.models.list()
#         health_report["checks"]["openai"] = "ok"
#     except Exception as e:
#         health_report["checks"]["openai"] = f"fail: {str(e)}"
#         health_report["status"] = "degraded"
#     # ---------------- DO Spaces / S3 ----------------
#     try:
#         s3.head_bucket(Bucket=DO_BUCKET)
#         health_report["checks"]["object_storage"] = "ok"
#     except (BotoCoreError, ClientError) as e:
#         health_report["checks"]["object_storage"] = f"fail: {str(e)}"
#         health_report["status"] = "degraded"
#     # ---------------- OCR Engine ----------------
#     try:
#         if ocr_engine is None:
#             raise RuntimeError("OCR engine not initialized")
#         health_report["checks"]["ocr"] = "ok"
#     except Exception as e:
#         health_report["checks"]["ocr"] = f"fail: {str(e)}"
#         health_report["status"] = "degraded"
#     # ---------------- Overall ----------------
#     health_report["timestamp"] = datetime.utcnow().isoformat()
#     return health_report
# @app.post("/upload")
# async def upload_image(file: UploadFile = File(...)):
#     try:
#         file_bytes = await file.read()
#         image_key = f"{FOLDER}/{file.filename}"
#         s3.put_object(
#             Bucket=DO_BUCKET,
#             Key=image_key,
#             Body=file_bytes,
#             ContentType=file.content_type,
#             ACL="private"
#         )
#         return {
#             "status": "success",
#             "message": "Uploaded successfully",
#             "image_id": image_key,
#             "local_path": "/mnt/data/image.png"
#         }
#     except Exception as e:
#         raise HTTPException(status_code=500, detail=str(e))
# @app.post("/generate/{image_id:path}")
# async def generate(
#     image_id: str,
#     user_id: str | None = Header(default=None, alias="user_id")
# ):
#     start_time = time.time()
#     try:
#         # -------- DOWNLOAD IMAGE --------
#         try:
#             obj = s3.get_object(Bucket=DO_BUCKET, Key=image_id)
#             raw_bytes = obj["Body"].read()
#         except Exception:
#             local_path = "/mnt/data/image.png"
#             if os.path.exists(local_path):
#                 with open(local_path, "rb") as f:
#                     raw_bytes = f.read()
#             else:
#                 raise HTTPException(status_code=404, detail="Image not found")
#         img_array = np.frombuffer(raw_bytes, np.uint8)
#         img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
#         if img is None:
#             raise HTTPException(status_code=400, detail="Unable to decode image")
#         # -------- OCR --------
#         result, _ = ocr_engine(img)
#         if not result:
#             raise RuntimeError("OCR returned empty result")
#         full_text = "\n".join([text for _, text, _ in result])
#         confidences = [conf for _, _, conf in result if isinstance(conf, (int, float))]
#         avg_confidence = sum(confidences) / len(confidences) if confidences else 0
#         if avg_confidence < 0.70:
#             response_time = time.time() - start_time
#             log_api_event(
#                 status="fail",
#                 response_time=response_time,
#                 user_id=user_id,
#                 error_message="Low OCR confidence"
#             )
#             return {
#                 "status": "fail",
#                 "message": "Upload image with more clarity or enter manually.",
#                 "image_id": image_id,
#                 "raw_text": full_text,
#                 "confidence": round(avg_confidence, 3),
#             }
#         # -------- GPT SCHEMA --------
#         schema = {
#             "name": "extract_expense_details",
#             "schema": {
#                 "type": "object",
#                 "properties": {
#                     "total_amount": {"type": "number"},
#                     "label": {"type": "string"},
#                     "date": {"type": "string"},
#                     "time": {"type": "string"},
#                     "payment_type": {
#                         "type": "string",
#                         "enum": ["cash", "card", "upi", "unknown"]
#                     },
#                     "notes": {"type": "string"}
#                 },
#                 "required": ["total_amount", "label"]
#             }
#         }
#         prompt = f"""
#         Extract expense details from OCR text below:
#         \"\"\"
#         {full_text}
#         \"\"\"
#         Rules:
#         - Do not guess missing values → use "unknown"
#         - Notes format:
#           "Spent on