Spaces:
No application file
No application file
File size: 3,932 Bytes
cf064e5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
import base64
import re
import sqlite3
from datetime import datetime, date, time
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Optional, Dict
from dataclasses import dataclass
import pytesseract
from PIL import Image
@dataclass
class Settings:
    """Configuration values shared by the functions in this module.

    Only three fields are read by code visible in this file:
    ``ocr_language`` (passed to Tesseract as ``lang``), ``database_path``
    (opened with ``sqlite3.connect``) and ``amount_tolerance`` (largest
    absolute difference still treated as a matching amount). The remaining
    fields are presumably consumed elsewhere in the project.
    """

    # Telegram bot credentials / access control (presumably; not used here).
    telegram_bot_token: str
    authorized_user_id: int

    # Gemini API configuration (not used in this file).
    gemini_api_key: str
    gemini_model: str

    # Filesystem locations.
    pms_blueprint_path: Path
    database_path: Path  # SQLite file queried by verify_booking_amount
    slip_storage_dir: Path

    # Web app configuration (not used in this file).
    webapp_base_url: str
    webapp_static_dir: Path
    webapp_host: str
    webapp_port: int

    # OCR configuration.
    ocr_language: str  # forwarded to pytesseract as ``lang``
    tesseract_cmd: Optional[str]  # NOTE(review): never applied in this file

    # Maximum |expected - extracted| that still counts as verified.
    amount_tolerance: Decimal
@dataclass
class SlipExtractionResult:
    """Outcome of running OCR over a payment slip image.

    ``raw_text`` always holds the full OCR output; each of the other fields
    is ``None`` when the corresponding value could not be parsed from it.
    """

    raw_text: str  # unmodified text returned by the OCR engine
    amount: Optional[Decimal]  # parsed monetary amount, if any
    payment_date: Optional[date]  # parsed calendar date, if any
    payment_time: Optional[time]  # parsed time of day, if any
def save_base64_image(b64: str, dest: Path) -> None:
    """Decode a base64-encoded image and write the bytes to ``dest``.

    Args:
        b64: Base64 text. A leading ``data:<mime>;base64,`` prefix is
            stripped and embedded whitespace/newlines are ignored.
        dest: Target file path. Missing parent directories are created.

    Raises:
        RuntimeError: If the payload is not valid base64 or the file cannot
            be written. The underlying error is attached as ``__cause__``.
    """
    try:
        payload = b64
        if isinstance(payload, str):
            # Browsers/web apps commonly send a data URI; only the part after
            # the first comma is base64.
            if payload.startswith("data:") and "," in payload:
                payload = payload.split(",", 1)[1]
            # Line-wrapped base64 is legitimate; drop whitespace so strict
            # validation below does not reject it.
            payload = "".join(payload.split())
        # validate=True: without it, invalid characters are silently
        # discarded and garbage input produces an empty/corrupt file.
        data = base64.b64decode(payload, validate=True)
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_bytes(data)
    except Exception as exc:
        # Broad catch kept on purpose: callers rely on RuntimeError being the
        # single failure type of this function.
        raise RuntimeError(f"ไม่สามารถบันทึกรูปภาพได้: {exc}") from exc
# Amount with exactly two decimals, optionally grouped by "," or a plain space.
# The guards keep the match from starting inside a longer number and from
# picking up pieces of dotted dates such as "15.01.2024".
_AMOUNT_RE = re.compile(
    r"(?<![\d.,])((?:\d{1,3}(?:[, ]\d{3})+|\d+)\.\d{2})(?!\d|\.\d)"
)

# Day-first or year-first dates with a four-digit year.
_DATE_RE = re.compile(
    r"(?<!\d)(\d{4}[/\-.]\d{1,2}[/\-.]\d{1,2}|\d{1,2}[/\-.]\d{1,2}[/\-.]\d{4})(?!\d)"
)
_DATE_FORMATS = (
    "%d/%m/%Y", "%d-%m-%Y", "%d.%m.%Y",
    "%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d",
)

# H:MM / HH:MM with an optional AM/PM suffix.
_TIME_RE = re.compile(r"(?<![\d:])(\d{1,2}):(\d{2})(?!\d)(?:\s?(AM|PM))?", re.IGNORECASE)


def _parse_amount(text: str) -> Optional[Decimal]:
    """Return the first well-formed monetary amount in ``text``, or None."""
    for match in _AMOUNT_RE.finditer(text):
        digits = match.group(1).replace(",", "").replace(" ", "")
        try:
            return Decimal(digits)
        except InvalidOperation:
            continue
    return None


def _parse_date(text: str) -> Optional[date]:
    """Return the first parseable calendar date in ``text``, or None."""
    # NOTE(review): two-digit years are deliberately not accepted; the century
    # (and calendar era) cannot be determined from the slip text alone.
    for match in _DATE_RE.finditer(text):
        token = match.group(1)
        for fmt in _DATE_FORMATS:
            try:
                return datetime.strptime(token, fmt).date()
            except ValueError:
                continue
    return None


def _parse_time(text: str) -> Optional[time]:
    """Return the first valid time of day in ``text``, or None."""
    for match in _TIME_RE.finditer(text):
        hour, minute = int(match.group(1)), int(match.group(2))
        meridiem = match.group(3)
        # Convert 12-hour clock values by hand instead of strptime("%p"),
        # whose AM/PM spelling depends on the process locale. An hour outside
        # 1-12 carrying a suffix (e.g. "13:45 PM") is read as 24-hour time.
        if meridiem and 1 <= hour <= 12:
            hour = hour % 12 + (12 if meridiem.upper() == "PM" else 0)
        try:
            return time(hour, minute)
        except ValueError:
            continue
    return None


def extract_slip_information(image_path: Path, settings: Settings) -> SlipExtractionResult:
    """Run Tesseract OCR on a payment slip and parse amount, date and time.

    Args:
        image_path: Path of the slip image to read.
        settings: Configuration; only ``ocr_language`` is read here and is
            passed to Tesseract as the ``lang`` argument.

    Returns:
        A ``SlipExtractionResult`` holding the raw OCR text plus the first
        amount, date and time that could be parsed from it. Each parsed field
        is ``None`` when nothing suitable was found.

    Raises:
        RuntimeError: If the image file cannot be opened. Errors raised by
            the OCR call itself are propagated unchanged.
    """
    try:
        img = Image.open(image_path)
    except Exception as exc:
        raise RuntimeError(f"เปิดไฟล์ภาพไม่ได้: {exc}") from exc

    # NOTE(review): settings.tesseract_cmd is not applied here — confirm the
    # Tesseract binary path is configured elsewhere.
    with img:  # release the underlying file handle once OCR has finished
        raw_text = pytesseract.image_to_string(img, lang=settings.ocr_language)

    return SlipExtractionResult(
        raw_text=raw_text,
        amount=_parse_amount(raw_text),
        payment_date=_parse_date(raw_text),
        payment_time=_parse_time(raw_text),
    )
def verify_booking_amount(
    settings: Settings,
    booking_id: str,
    extracted_amount: Optional[Decimal],
) -> Dict[str, str]:
    """Compare an amount extracted from a slip with the booking's amount due.

    Args:
        settings: Configuration; ``database_path`` locates the SQLite file
            and ``amount_tolerance`` is the largest absolute difference that
            still counts as a match.
        booking_id: Value matched against ``bookings.id``.
        extracted_amount: Amount read from the slip, or ``None`` if none was
            found.

    Returns:
        A dict whose ``status`` is one of ``booking_not_found``,
        ``amount_missing``, ``verified`` or ``amount_mismatch``. The last two
        also carry ``expected_amount`` (the stored ``total_due`` as a string).

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
        decimal.InvalidOperation: If the stored ``total_due`` is not numeric
            (including NULL).
    """
    # sqlite3's connection context manager only commits/rolls back; it does
    # not close the connection, so close it explicitly to avoid leaking a
    # handle on every call.
    conn = sqlite3.connect(settings.database_path)
    try:
        row = conn.execute(
            "SELECT total_due FROM bookings WHERE id = ?", (booking_id,)
        ).fetchone()
    finally:
        conn.close()

    if not row:
        return {"status": "booking_not_found"}

    # Go through str() so a REAL column value does not bring binary float
    # noise into the Decimal comparison.
    expected = Decimal(str(row[0]))

    if extracted_amount is None:
        return {"status": "amount_missing"}

    if abs(expected - extracted_amount) <= settings.amount_tolerance:
        return {"status": "verified", "expected_amount": str(expected)}
    return {"status": "amount_mismatch", "expected_amount": str(expected)}
def chunk_for_telegram(text: str, limit: int = 4096) -> list[str]:
    """Split ``text`` into consecutive pieces of at most ``limit`` characters.

    Args:
        text: The message to split.
        limit: Maximum length of each piece; defaults to 4096.

    Returns:
        ``[text]`` when it already fits (including the empty string),
        otherwise fixed-width slices whose concatenation equals ``text``.

    Raises:
        ValueError: If ``limit`` is smaller than 1.
    """
    # A non-positive limit used to either fail with an opaque range() error
    # (limit == 0) or silently return [] and drop the text (limit < 0).
    if limit < 1:
        raise ValueError("limit must be a positive integer")
    if len(text) <= limit:
        return [text]
    return [text[start:start + limit] for start in range(0, len(text), limit)]
|