| # # app.py | |
| # app.py | |
| import uvicorn | |
| import numpy as np | |
| import cv2 | |
| import boto3 | |
| import os | |
| import json | |
| import time | |
| import requests | |
| from datetime import datetime | |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Header | |
| from rapidocr_onnxruntime import RapidOCR | |
| from openai import OpenAI | |
| from pymongo import MongoClient | |
| from pymongo.errors import PyMongoError | |
| from botocore.exceptions import BotoCoreError, ClientError | |
| # ---------------- ENV CONFIG ---------------- | |
| DO_KEY_ID = os.getenv("DO_SPACES_KEY_ID") | |
| DO_SECRET_KEY = os.getenv("DO_SPACES_SECRET_KEY") | |
| DO_REGION = os.getenv("DO_SPACES_REGION", "blr1") | |
| DO_ENDPOINT = os.getenv("DO_SPACES_ENDPOINT") | |
| DO_BUCKET = os.getenv("DO_SPACES_BUCKET", "milestone") | |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
| FOLDER = "OCR_Images" | |
| CATEGORY_API_URL = "https://logicgoinfotechspaces-expensecategorizenotes.hf.space" | |
| if not OPENAI_API_KEY: | |
| raise RuntimeError("OPENAI_API_KEY missing!") | |
| # ---------------- OPENAI ---------------- | |
| client = OpenAI(api_key=OPENAI_API_KEY) | |
| # ---------------- S3 ---------------- | |
| s3 = boto3.client( | |
| "s3", | |
| region_name=DO_REGION, | |
| endpoint_url=DO_ENDPOINT, | |
| aws_access_key_id=DO_KEY_ID, | |
| aws_secret_access_key=DO_SECRET_KEY, | |
| ) | |
| # ---------------- MONGODB ---------------- | |
| MONGO_URI = os.getenv("MONGO_URI") | |
| mongo_client = MongoClient(MONGO_URI) | |
| mongo_db = mongo_client["expense"] | |
| api_logs_col = mongo_db["api_logs"] | |
| # ---------------- APP ---------------- | |
| app = FastAPI() | |
| ocr_engine = RapidOCR() | |
| # ---------------- HELPERS ---------------- | |
| def ist_now(): | |
| return datetime.now().strftime("%d-%m-%Y %H:%M:%S:IST") | |
| def log_api_event( | |
| *, | |
| status: str, | |
| response_time: float, | |
| user_id: str | None, | |
| error_message: str | None = None | |
| ): | |
| payload = { | |
| "name": "Receipt Scanner", | |
| "status": status, | |
| "date": ist_now(), | |
| "response_time": round(response_time, 3), | |
| } | |
| if user_id: | |
| payload["user_id"] = user_id | |
| if error_message: | |
| payload["error_message"] = error_message | |
| try: | |
| api_logs_col.insert_one(payload) | |
| except Exception: | |
| pass # never break API because of logging failure | |
| # ---------------- ROUTES ---------------- | |
| async def health(): | |
| health_report = { | |
| "service": "Receipt Scanner API", | |
| "status": "healthy", | |
| "checks": {} | |
| } | |
| # ---------------- MongoDB ---------------- | |
| try: | |
| mongo_client.admin.command("ping") | |
| health_report["checks"]["mongodb"] = "ok" | |
| except PyMongoError as e: | |
| health_report["checks"]["mongodb"] = f"fail: {str(e)}" | |
| health_report["status"] = "degraded" | |
| # ---------------- OpenAI ---------------- | |
| try: | |
| # very light call, does not consume tokens | |
| client.models.list() | |
| health_report["checks"]["openai"] = "ok" | |
| except Exception as e: | |
| health_report["checks"]["openai"] = f"fail: {str(e)}" | |
| health_report["status"] = "degraded" | |
| # ---------------- DO Spaces / S3 ---------------- | |
| try: | |
| s3.head_bucket(Bucket=DO_BUCKET) | |
| health_report["checks"]["object_storage"] = "ok" | |
| except (BotoCoreError, ClientError) as e: | |
| health_report["checks"]["object_storage"] = f"fail: {str(e)}" | |
| health_report["status"] = "degraded" | |
| # ---------------- OCR Engine ---------------- | |
| try: | |
| if ocr_engine is None: | |
| raise RuntimeError("OCR engine not initialized") | |
| health_report["checks"]["ocr"] = "ok" | |
| except Exception as e: | |
| health_report["checks"]["ocr"] = f"fail: {str(e)}" | |
| health_report["status"] = "degraded" | |
| # ---------------- Overall ---------------- | |
| health_report["timestamp"] = datetime.utcnow().isoformat() | |
| return health_report | |
| async def upload_image(file: UploadFile = File(...)): | |
| try: | |
| file_bytes = await file.read() | |
| image_key = f"{FOLDER}/{file.filename}" | |
| s3.put_object( | |
| Bucket=DO_BUCKET, | |
| Key=image_key, | |
| Body=file_bytes, | |
| ContentType=file.content_type, | |
| ACL="private" | |
| ) | |
| return { | |
| "status": "success", | |
| "message": "Uploaded successfully", | |
| "image_id": image_key, | |
| "local_path": "/mnt/data/image.png" | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def generate( | |
| image_id: str, | |
| user_id: str | None = Header(default=None, alias="user_id") | |
| ): | |
| start_time = time.time() | |
| try: | |
| # -------- DOWNLOAD IMAGE -------- | |
| try: | |
| obj = s3.get_object(Bucket=DO_BUCKET, Key=image_id) | |
| raw_bytes = obj["Body"].read() | |
| except Exception: | |
| local_path = "/mnt/data/image.png" | |
| if os.path.exists(local_path): | |
| with open(local_path, "rb") as f: | |
| raw_bytes = f.read() | |
| else: | |
| raise HTTPException(status_code=404, detail="Image not found") | |
| img_array = np.frombuffer(raw_bytes, np.uint8) | |
| img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) | |
| if img is None: | |
| raise HTTPException(status_code=400, detail="Unable to decode image") | |
| # -------- OCR -------- | |
| result, _ = ocr_engine(img) | |
| if not result: | |
| raise RuntimeError("OCR returned empty result") | |
| full_text = "\n".join([text for _, text, _ in result]) | |
| confidences = [conf for _, _, conf in result if isinstance(conf, (int, float))] | |
| avg_confidence = sum(confidences) / len(confidences) if confidences else 0 | |
| if avg_confidence < 0.70: | |
| response_time = time.time() - start_time | |
| log_api_event( | |
| status="fail", | |
| response_time=response_time, | |
| user_id=user_id, | |
| error_message="Low OCR confidence" | |
| ) | |
| return { | |
| "status": "fail", | |
| "message": "Upload image with more clarity or enter manually.", | |
| "image_id": image_id, | |
| "raw_text": full_text, | |
| "confidence": round(avg_confidence, 3), | |
| } | |
| # -------- GPT SCHEMA -------- | |
| schema = { | |
| "name": "extract_expense_details", | |
| "schema": { | |
| "type": "object", | |
| "properties": { | |
| "total_amount": {"type": "number"}, | |
| "date": {"type": "string"}, | |
| "time": {"type": "string"}, | |
| "payment_type": { | |
| "type": "string", | |
| "enum": ["cash", "card", "upi", "unknown"] | |
| }, | |
| "notes": {"type": "string"} | |
| }, | |
| "required": ["total_amount"] | |
| } | |
| } | |
| prompt = f""" | |
| Extract expense details from OCR text below: | |
| \"\"\" | |
| {full_text} | |
| \"\"\" | |
| Rules: | |
| - Do not guess missing values → use "unknown" | |
| - Notes format: | |
| "Spent <total_amount> on <merchant_name> on <date>." | |
| """ | |
| response = client.chat.completions.create( | |
| model="gpt-4o-mini", | |
| response_format={"type": "json_schema", "json_schema": schema}, | |
| messages=[ | |
| {"role": "system", "content": "You are an expert in receipt parsing."}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| temperature=0.1 | |
| ) | |
| parsed = json.loads(response.choices[0].message.content) | |
| parsed.setdefault("date", "unknown") | |
| parsed.setdefault("time", "unknown") | |
| parsed.setdefault("payment_type", "cash") | |
| parsed.setdefault("notes", "unknown") | |
| # Set payment_type to "cash" if it's "unknown" | |
| if parsed.get("payment_type") == "unknown": | |
| parsed["payment_type"] = "cash" | |
| # -------- CATEGORY API -------- | |
| try: | |
| cat_response = requests.post( | |
| f"{CATEGORY_API_URL}/api/v1/categorize", | |
| json={ | |
| "notes": parsed["notes"], | |
| "user_id": user_id | |
| }, | |
| timeout=10 | |
| ) | |
| if cat_response.status_code == 200: | |
| cat_data = cat_response.json() | |
| if cat_data.get("status") == "success" and cat_data.get("data"): | |
| data = cat_data["data"] | |
| parsed["headcategory_id"] = data.get("headcategory_id") | |
| parsed["headcategory_title"] = data.get("headcategory_title") | |
| parsed["category_id"] = data.get("category_id") | |
| parsed["category_title"] = data.get("category_title") | |
| else: | |
| parsed["headcategory_id"] = None | |
| parsed["headcategory_title"] = None | |
| parsed["category_id"] = None | |
| parsed["category_title"] = None | |
| else: | |
| parsed["headcategory_id"] = None | |
| parsed["headcategory_title"] = None | |
| parsed["category_id"] = None | |
| parsed["category_title"] = None | |
| except Exception: | |
| parsed["headcategory_id"] = None | |
| parsed["headcategory_title"] = None | |
| parsed["category_id"] = None | |
| parsed["category_title"] = None | |
| response_time = time.time() - start_time | |
| log_api_event( | |
| status="success", | |
| response_time=response_time, | |
| user_id=user_id | |
| ) | |
| return { | |
| "status": "success", | |
| "message": "Receipt processed and logged in DB", | |
| "image_id": image_id, | |
| "confidence": round(avg_confidence, 3), | |
| "raw_text": full_text, | |
| "parsed": parsed, | |
| } | |
| except Exception as e: | |
| response_time = time.time() - start_time | |
| log_api_event( | |
| status="fail", | |
| response_time=response_time, | |
| user_id=user_id, | |
| error_message=str(e) | |
| ) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def ping(): | |
| return {"status": "alive"} | |
| if __name__ == "__main__": | |
| uvicorn.run("app:app", host="0.0.0.0", port=7860) | |
| # import uvicorn | |
| # import numpy as np | |
| # import cv2 | |
| # import boto3 | |
| # import os | |
| # import json | |
| # import time | |
| # import requests | |
| # from datetime import datetime | |
| # from fastapi import FastAPI, UploadFile, File, HTTPException, Header | |
| # from rapidocr_onnxruntime import RapidOCR | |
| # from openai import OpenAI | |
| # from pymongo import MongoClient | |
| # from pymongo.errors import PyMongoError | |
| # from botocore.exceptions import BotoCoreError, ClientError | |
| # # ---------------- ENV CONFIG ---------------- | |
| # DO_KEY_ID = os.getenv("DO_SPACES_KEY_ID") | |
| # DO_SECRET_KEY = os.getenv("DO_SPACES_SECRET_KEY") | |
| # DO_REGION = os.getenv("DO_SPACES_REGION", "blr1") | |
| # DO_ENDPOINT = os.getenv("DO_SPACES_ENDPOINT") | |
| # DO_BUCKET = os.getenv("DO_SPACES_BUCKET", "milestone") | |
| # OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
| # FOLDER = "OCR_Images" | |
| # CATEGORY_API_URL = os.getenv("CATEGORY_API_URL") | |
| # NOTES_CATEGORIZER_URL = os.getenv("NOTES_CATEGORIZER_URL") | |
| # if not OPENAI_API_KEY: | |
| # raise RuntimeError("OPENAI_API_KEY missing!") | |
| # # ---------------- OPENAI ---------------- | |
| # client = OpenAI(api_key=OPENAI_API_KEY) | |
| # # ---------------- S3 ---------------- | |
| # s3 = boto3.client( | |
| # "s3", | |
| # region_name=DO_REGION, | |
| # endpoint_url=DO_ENDPOINT, | |
| # aws_access_key_id=DO_KEY_ID, | |
| # aws_secret_access_key=DO_SECRET_KEY, | |
| # ) | |
| # # ---------------- MONGODB ---------------- | |
| # MONGO_URI = os.getenv("MONGO_URI") | |
| # mongo_client = MongoClient(MONGO_URI) | |
| # mongo_db = mongo_client["expense"] | |
| # api_logs_col = mongo_db["api_logs"] | |
| # # ---------------- APP ---------------- | |
| # app = FastAPI() | |
| # ocr_engine = RapidOCR() | |
| # # ---------------- HELPERS ---------------- | |
| # def ist_now(): | |
| # return datetime.now().strftime("%d-%m-%Y %H:%M:%S:IST") | |
| # def log_api_event( | |
| # *, | |
| # status: str, | |
| # response_time: float, | |
| # user_id: str | None, | |
| # error_message: str | None = None | |
| # ): | |
| # payload = { | |
| # "name": "Receipt Scanner", | |
| # "status": status, | |
| # "date": ist_now(), | |
| # "response_time": round(response_time, 3), | |
| # } | |
| # if user_id: | |
| # payload["user_id"] = user_id | |
| # if error_message: | |
| # payload["error_message"] = error_message | |
| # try: | |
| # api_logs_col.insert_one(payload) | |
| # except Exception: | |
| # pass # never break API because of logging failure | |
| # # ---------------- ROUTES ---------------- | |
| # @app.get("/health") | |
| # async def health(): | |
| # health_report = { | |
| # "service": "Receipt Scanner API", | |
| # "status": "healthy", | |
| # "checks": {} | |
| # } | |
| # # ---------------- MongoDB ---------------- | |
| # try: | |
| # mongo_client.admin.command("ping") | |
| # health_report["checks"]["mongodb"] = "ok" | |
| # except PyMongoError as e: | |
| # health_report["checks"]["mongodb"] = f"fail: {str(e)}" | |
| # health_report["status"] = "degraded" | |
| # # ---------------- OpenAI ---------------- | |
| # try: | |
| # # very light call, does not consume tokens | |
| # client.models.list() | |
| # health_report["checks"]["openai"] = "ok" | |
| # except Exception as e: | |
| # health_report["checks"]["openai"] = f"fail: {str(e)}" | |
| # health_report["status"] = "degraded" | |
| # # ---------------- DO Spaces / S3 ---------------- | |
| # try: | |
| # s3.head_bucket(Bucket=DO_BUCKET) | |
| # health_report["checks"]["object_storage"] = "ok" | |
| # except (BotoCoreError, ClientError) as e: | |
| # health_report["checks"]["object_storage"] = f"fail: {str(e)}" | |
| # health_report["status"] = "degraded" | |
| # # ---------------- OCR Engine ---------------- | |
| # try: | |
| # if ocr_engine is None: | |
| # raise RuntimeError("OCR engine not initialized") | |
| # health_report["checks"]["ocr"] = "ok" | |
| # except Exception as e: | |
| # health_report["checks"]["ocr"] = f"fail: {str(e)}" | |
| # health_report["status"] = "degraded" | |
| # # ---------------- Overall ---------------- | |
| # health_report["timestamp"] = datetime.utcnow().isoformat() | |
| # return health_report | |
| # @app.post("/upload") | |
| # async def upload_image(file: UploadFile = File(...)): | |
| # try: | |
| # file_bytes = await file.read() | |
| # image_key = f"{FOLDER}/{file.filename}" | |
| # s3.put_object( | |
| # Bucket=DO_BUCKET, | |
| # Key=image_key, | |
| # Body=file_bytes, | |
| # ContentType=file.content_type, | |
| # ACL="private" | |
| # ) | |
| # return { | |
| # "status": "success", | |
| # "message": "Uploaded successfully", | |
| # "image_id": image_key, | |
| # "local_path": "/mnt/data/image.png" | |
| # } | |
| # except Exception as e: | |
| # raise HTTPException(status_code=500, detail=str(e)) | |
| # @app.post("/generate/{image_id:path}") | |
| # async def generate( | |
| # image_id: str, | |
| # user_id: str | None = Header(default=None, alias="user_id") | |
| # ): | |
| # start_time = time.time() | |
| # try: | |
| # # -------- DOWNLOAD IMAGE -------- | |
| # try: | |
| # obj = s3.get_object(Bucket=DO_BUCKET, Key=image_id) | |
| # raw_bytes = obj["Body"].read() | |
| # except Exception: | |
| # local_path = "/mnt/data/image.png" | |
| # if os.path.exists(local_path): | |
| # with open(local_path, "rb") as f: | |
| # raw_bytes = f.read() | |
| # else: | |
| # raise HTTPException(status_code=404, detail="Image not found") | |
| # img_array = np.frombuffer(raw_bytes, np.uint8) | |
| # img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) | |
| # if img is None: | |
| # raise HTTPException(status_code=400, detail="Unable to decode image") | |
| # # -------- OCR -------- | |
| # result, _ = ocr_engine(img) | |
| # if not result: | |
| # raise RuntimeError("OCR returned empty result") | |
| # full_text = "\n".join([text for _, text, _ in result]) | |
| # confidences = [conf for _, _, conf in result if isinstance(conf, (int, float))] | |
| # avg_confidence = sum(confidences) / len(confidences) if confidences else 0 | |
| # if avg_confidence < 0.70: | |
| # response_time = time.time() - start_time | |
| # log_api_event( | |
| # status="fail", | |
| # response_time=response_time, | |
| # user_id=user_id, | |
| # error_message="Low OCR confidence" | |
| # ) | |
| # return { | |
| # "status": "fail", | |
| # "message": "Upload image with more clarity or enter manually.", | |
| # "image_id": image_id, | |
| # "raw_text": full_text, | |
| # "confidence": round(avg_confidence, 3), | |
| # } | |
| # # -------- GPT SCHEMA -------- | |
| # schema = { | |
| # "name": "extract_expense_details", | |
| # "schema": { | |
| # "type": "object", | |
| # "properties": { | |
| # "total_amount": {"type": "number"}, | |
| # "label": {"type": "string"}, | |
| # "date": {"type": "string"}, | |
| # "time": {"type": "string"}, | |
| # "payment_type": { | |
| # "type": "string", | |
| # "enum": ["cash", "card", "upi", "unknown"] | |
| # }, | |
| # "notes": {"type": "string"} | |
| # }, | |
| # "required": ["total_amount", "label"] | |
| # } | |
| # } | |
| # prompt = f""" | |
| # Extract expense details from OCR text below: | |
| # \"\"\" | |
| # {full_text} | |
| # \"\"\" | |
| # Rules: | |
| # - Do not guess missing values → use "unknown" | |
| # - Notes format: | |
| # "Spent <total_amount> on <label> on <date>." | |
| # """ | |
| # response = client.chat.completions.create( | |
| # model="gpt-4o-mini", | |
| # response_format={"type": "json_schema", "json_schema": schema}, | |
| # messages=[ | |
| # {"role": "system", "content": "You are an expert in receipt parsing."}, | |
| # {"role": "user", "content": prompt} | |
| # ], | |
| # temperature=0.1 | |
| # ) | |
| # parsed = json.loads(response.choices[0].message.content) | |
| # parsed.setdefault("date", "unknown") | |
| # parsed.setdefault("time", "unknown") | |
| # parsed.setdefault("payment_type", "unknown") | |
| # parsed.setdefault("notes", "unknown") | |
| # # -------- CATEGORY API -------- | |
| # try: | |
| # cat_response = requests.post( | |
| # NOTES_CATEGORIZER_URL, | |
| # json={"notes": parsed["notes"]}, | |
| # timeout=10 | |
| # ) | |
| # if cat_response.status_code == 200: | |
| # cat_data = cat_response.json() | |
| # parsed["category"] = cat_data.get("subcategory", "unknown") | |
| # parsed["category_title"] = cat_data.get("title") | |
| # else: | |
| # parsed["category"] = "unknown" | |
| # parsed["category_title"] = None | |
| # except Exception: | |
| # parsed["category"] = "unknown" | |
| # parsed["category_title"] = None | |
| # response_time = time.time() - start_time | |
| # log_api_event( | |
| # status="success", | |
| # response_time=response_time, | |
| # user_id=user_id | |
| # ) | |
| # return { | |
| # "status": "success", | |
| # "message": "Receipt processed and logged in DB", | |
| # "image_id": image_id, | |
| # "confidence": round(avg_confidence, 3), | |
| # "raw_text": full_text, | |
| # "parsed": parsed, | |
| # } | |
| # except Exception as e: | |
| # response_time = time.time() - start_time | |
| # log_api_event( | |
| # status="fail", | |
| # response_time=response_time, | |
| # user_id=user_id, | |
| # error_message=str(e) | |
| # ) | |
| # raise HTTPException(status_code=500, detail=str(e)) | |
| # @app.get("/ping") | |
| # def ping(): | |
| # return {"status": "alive"} | |
| # if __name__ == "__main__": | |
| # uvicorn.run("app:app", host="0.0.0.0", port=7860) | |