File size: 3,932 Bytes
cf064e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import base64
import re
import sqlite3
from datetime import datetime, date, time
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Optional, Dict
from dataclasses import dataclass

import pytesseract
from PIL import Image


@dataclass
class Settings:
    """Runtime configuration bundle passed to the helpers in this module.

    The fields are plain values with no defaults, so every one of them must
    be supplied when constructing an instance. Field order is part of the
    generated ``__init__`` signature and must not be rearranged.
    """

    # --- Telegram (presumably bot credentials and the one permitted user;
    # --- confirm against the bot entry point) ---
    telegram_bot_token: str
    authorized_user_id: int

    # --- Gemini (presumably API access for an LLM backend — TODO confirm) ---
    gemini_api_key: str
    gemini_model: str

    # --- Storage locations ---
    pms_blueprint_path: Path
    database_path: Path  # SQLite file holding the ``bookings`` table
    slip_storage_dir: Path

    # --- Web app (presumably serving parameters — TODO confirm) ---
    webapp_base_url: str
    webapp_static_dir: Path
    webapp_host: str
    webapp_port: int

    # --- OCR ---
    ocr_language: str  # forwarded to Tesseract as the ``lang`` argument
    tesseract_cmd: Optional[str]  # optional Tesseract executable override

    # --- Verification ---
    # Largest absolute difference between the expected and the extracted
    # amount that still counts as a match.
    amount_tolerance: Decimal


@dataclass
class SlipExtractionResult:
    """Outcome of running OCR over a payment slip.

    Every parsed field is optional: ``None`` means the value could not be
    recognised in the OCR text.
    """

    # Complete text returned by the OCR engine, unmodified.
    raw_text: str
    # Monetary amount found in the text, if any.
    amount: Optional[Decimal]
    # Calendar date found in the text, if any.
    payment_date: Optional[date]
    # Time of day found in the text, if any.
    payment_time: Optional[time]


def save_base64_image(b64: str, dest: Path) -> None:
    """Decode a base64-encoded image and write the bytes to ``dest``.

    Args:
        b64: Base64 text. A leading data-URL header
            (``data:image/png;base64,``) and embedded whitespace or line
            breaks are tolerated and removed before decoding.
        dest: Target file. Missing parent directories are created.

    Raises:
        RuntimeError: If the payload is empty or not valid base64, or if the
            file cannot be written. The original error is attached as the
            cause.
    """
    try:
        payload = b64
        # b64decode also accepts bytes; keep that working for old callers.
        if isinstance(payload, (bytes, bytearray)):
            payload = bytes(payload).decode("ascii")

        # Only the part after the first comma of a data URL is base64.
        if payload.lstrip().startswith("data:"):
            payload = payload.partition(",")[2]

        # Line-wrapped base64 is legal; strip whitespace so that strict
        # validation below rejects only genuinely foreign characters.
        payload = "".join(payload.split())
        if not payload:
            raise ValueError("empty image payload")

        # validate=True: without it, invalid characters are silently dropped
        # and a corrupt file would be written instead of reporting an error.
        data = base64.b64decode(payload, validate=True)

        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_bytes(data)
    except Exception as exc:
        raise RuntimeError(f"ไม่สามารถบันทึกรูปภาพได้: {exc}") from exc


# Amount with exactly two decimals, optionally grouped in thousands by commas
# or single spaces. The lookbehind/lookahead stop pieces of dotted dates
# ("12.05" in "12.05.2024"), times ("30 500.00" after "14:") and the tail of
# longer digit runs from being mistaken for money.
_SLIP_AMOUNT_RE = re.compile(
    r'(?<![\d.:/])(\d{1,3}(?:[, ]\d{3})*\.\d{2}|\d+\.\d{2})(?!\.?\d)'
)

# Numeric dates, either year-first (2024-05-12) or day-first (12/05/2024),
# separated by "/", "-" or ".".
_SLIP_DATE_RE = re.compile(
    r'(\d{4}[/\-.]\d{1,2}[/\-.]\d{1,2}|\d{1,2}[/\-.]\d{1,2}[/\-.]\d{2,4})'
)
# Formats are applied after separators have been normalised to "/".
_SLIP_DATE_FORMATS = ("%d/%m/%Y", "%Y/%m/%d")

# HH:MM with an optional AM/PM marker. The trailing \b keeps the marker from
# matching the beginning of an ordinary word such as "Amount".
_SLIP_TIME_RE = re.compile(r'(\d{1,2}:\d{2})(?:\s?(AM|PM)\b)?', re.IGNORECASE)


def _parse_slip_amount(text: str) -> Optional[Decimal]:
    """Return the first amount-looking number in ``text``, or None."""
    for found in _SLIP_AMOUNT_RE.finditer(text):
        digits = found.group(1).replace(',', '').replace(' ', '')
        try:
            return Decimal(digits)
        except InvalidOperation:
            continue
    return None


def _parse_slip_date(text: str) -> Optional[date]:
    """Return the first numeric date in ``text`` that parses, or None."""
    # NOTE(review): years are taken at face value. If slips print
    # Buddhist-era years (e.g. 2567) the caller receives that year
    # unchanged — confirm whether a conversion is wanted.
    for found in _SLIP_DATE_RE.finditer(text):
        token = re.sub(r'[-.]', '/', found.group(1))
        for fmt in _SLIP_DATE_FORMATS:
            try:
                return datetime.strptime(token, fmt).date()
            except ValueError:
                continue
    return None


def _parse_slip_time(text: str) -> Optional[time]:
    """Return the first valid clock time in ``text``, or None."""
    for found in _SLIP_TIME_RE.finditer(text):
        clock, meridiem = found.group(1), found.group(2)
        attempts = []
        if meridiem:
            attempts.append((f"{clock} {meridiem}", "%I:%M %p"))
        # Also the fallback when a 12-hour parse fails (e.g. "14:30 PM").
        attempts.append((clock, "%H:%M"))
        for value, fmt in attempts:
            try:
                return datetime.strptime(value, fmt).time()
            except ValueError:
                continue
    return None


def extract_slip_information(image_path: Path, settings: Settings) -> SlipExtractionResult:
    """Run Tesseract OCR on a payment slip and pull out the payment details.

    Parsing is best-effort: a field that cannot be recognised in the OCR
    text is returned as ``None`` rather than raising.

    Args:
        image_path: Location of the slip image.
        settings: Supplies ``ocr_language`` (passed to Tesseract as ``lang``)
            and, when set, ``tesseract_cmd`` (path of the Tesseract binary).

    Returns:
        A ``SlipExtractionResult`` carrying the raw OCR text plus the first
        amount, date and time found in it.

    Raises:
        RuntimeError: If the image cannot be opened. Errors raised by
            pytesseract itself are not caught here.
    """
    # Honour the configured binary location; pytesseract reads this
    # module-level attribute on every invocation.
    if settings.tesseract_cmd:
        pytesseract.pytesseract.tesseract_cmd = settings.tesseract_cmd

    try:
        img = Image.open(image_path)
    except Exception as exc:
        raise RuntimeError(f"เปิดไฟล์ภาพไม่ได้: {exc}") from exc

    # Close the underlying file handle as soon as OCR has finished.
    with img:
        raw_text = pytesseract.image_to_string(img, lang=settings.ocr_language)

    return SlipExtractionResult(
        raw_text=raw_text,
        amount=_parse_slip_amount(raw_text),
        payment_date=_parse_slip_date(raw_text),
        payment_time=_parse_slip_time(raw_text),
    )


def verify_booking_amount(
    settings: Settings,
    booking_id: str,
    extracted_amount: Optional[Decimal],
) -> Dict[str, str]:
    """Compare an amount read from a slip with what the booking owes.

    Args:
        settings: Supplies ``database_path`` (SQLite file containing the
            ``bookings`` table) and ``amount_tolerance``.
        booking_id: Value matched against ``bookings.id``.
        extracted_amount: Amount taken from the slip, or ``None`` when none
            was recognised.

    Returns:
        A dict whose ``"status"`` is one of:

        * ``"booking_not_found"`` – no row with that id;
        * ``"amount_missing"`` – the booking exists but no amount was given;
        * ``"verified"`` – the absolute difference is within the tolerance;
        * ``"amount_mismatch"`` – the difference exceeds the tolerance.

        The last two also carry ``"expected_amount"`` as a string.
    """
    # sqlite3's connection context manager only commits or rolls back; it
    # does not close the connection, so close it explicitly.
    conn = sqlite3.connect(settings.database_path)
    try:
        row = conn.execute(
            "SELECT total_due FROM bookings WHERE id = ?", (booking_id,)
        ).fetchone()
    finally:
        conn.close()

    if row is None:
        return {"status": "booking_not_found"}

    # Go through str() so a REAL column value is not converted via its
    # binary floating-point representation.
    expected = Decimal(str(row[0]))

    if extracted_amount is None:
        return {"status": "amount_missing"}

    within_tolerance = abs(expected - extracted_amount) <= settings.amount_tolerance
    status = "verified" if within_tolerance else "amount_mismatch"
    return {"status": status, "expected_amount": str(expected)}


def chunk_for_telegram(text: str, limit: int = 4096) -> list[str]:
    """Split ``text`` into consecutive pieces of at most ``limit`` characters.

    Text that already fits is returned as a single-element list (an empty
    string yields ``[""]``). Pieces are cut at fixed offsets, so joining them
    reproduces ``text`` exactly.

    Args:
        text: Message to split.
        limit: Maximum length of each piece; must be at least 1.

    Returns:
        The pieces, in order.

    Raises:
        ValueError: If ``limit`` is smaller than 1.
    """
    # A non-positive limit would either make range() fail with an unrelated
    # message (0) or silently return no chunks at all (negative), losing the
    # whole message.
    if limit < 1:
        raise ValueError(f"limit must be a positive integer, got {limit!r}")

    if len(text) <= limit:
        return [text]
    return [text[start:start + limit] for start in range(0, len(text), limit)]