| import gradio as gr |
| from graph_manager import user_input_handler |
| from nodes import session_form_data, get_form_info, set_form_info |
| import ast |
| import uuid |
| import logging |
| import sqlite3 |
| import pandas as pd |
| import os |
# Configure the root logger at INFO level when this module is imported.
logging.basicConfig(level=logging.INFO)

# Logger named after this module; used by the functions in this file.
logger = logging.getLogger(__name__)
|
|
def generate_session_id():
    """Create a fresh random session identifier.

    Returns:
        A ``(session_id, label)`` tuple: the UUID4 rendered as a string, and
        the ``"Session ID: <uuid>"`` text built from it.
    """
    new_id = str(uuid.uuid4())
    label = f"Session ID: {new_id}"
    return new_id, label
|
|
# SQLite database file: <parent of this file's directory>/database/hello_earth_data_2.db
DB_PATH = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    'database',
    'hello_earth_data_2.db',
)
|
|
def connect_db():
    """Open and return a new SQLite connection to the file at ``DB_PATH``.

    The caller owns the returned connection.
    """
    connection = sqlite3.connect(DB_PATH)
    return connection
|
|
def fetch_df(query, params=None):
    """Run a SQL query and return the result set as a pandas DataFrame.

    Args:
        query: SQL text, using ``?`` placeholders for any parameters.
        params: Optional sequence of values bound to the placeholders;
            ``None`` (or any falsy value) means "no parameters".

    Returns:
        A ``pandas.DataFrame`` holding the rows produced by ``query``.
    """
    conn = connect_db()
    try:
        # A sqlite3 connection used as a context manager only commits or
        # rolls back the transaction; it does NOT close the connection.
        with conn:
            return pd.read_sql_query(query, conn, params=params or ())
    finally:
        # Close explicitly so each call does not leak a connection.
        conn.close()
|
|
def get_deliverable_data():
    """Return every row of the ``Deliverable`` table as a DataFrame."""
    query = "SELECT * FROM Deliverable"
    return fetch_df(query)
|
|
def get_deliverable_titles():
    """Return the dropdown choices for deliverables.

    The list starts with the placeholder string ``'None'`` followed by every
    non-null ``title`` value from the ``Deliverable`` table, in table order.
    """
    titles = get_deliverable_data()['title'].dropna().tolist()
    return ['None', *titles]
|
|
def save_expense_to_db(form_info_dict):
    """Insert one row into the ``Expense`` table from the form values.

    The deliverable title chosen in the form is translated to its
    ``deliverable_id`` via the ``Deliverable`` table. The new row gets a
    freshly generated UUID as ``expense_id`` and the status ``"approved"``.

    Args:
        form_info_dict: Mapping with the keys "Associated Deliverable",
            "Seller Name", "Seller Address", "Seller Phone Number",
            "Buyer Name", "Buyer Address", "Transaction Date",
            "Total Payment Amount" and "Expense Description".

    Returns:
        ``True`` when the row was inserted; ``False`` when the selected
        deliverable title has no matching ``deliverable_id`` (nothing is
        written in that case).
    """
    deliverable_df = get_deliverable_data()
    title_to_id = dict(zip(deliverable_df['title'], deliverable_df['deliverable_id']))

    associated_title = form_info_dict.get("Associated Deliverable", "None")
    associated_id = title_to_id.get(associated_title)
    if associated_id is None:
        logger.warning(f"No matching deliverable ID found for title: {associated_title}")
        return False

    # Store the date as an ISO "YYYY-MM-DD" string; a missing date stays NULL.
    raw_date = form_info_dict.get("Transaction Date")
    transaction_date = pd.to_datetime(raw_date).date().isoformat() if raw_date else None

    expense_id = str(uuid.uuid4())
    row = (
        expense_id,
        associated_id,
        form_info_dict.get("Seller Name"),
        form_info_dict.get("Seller Address"),
        form_info_dict.get("Seller Phone Number"),
        form_info_dict.get("Buyer Name"),
        form_info_dict.get("Buyer Address"),
        transaction_date,
        form_info_dict.get("Total Payment Amount"),
        form_info_dict.get("Expense Description"),
        "approved",
    )

    insert_query = """
        INSERT INTO Expense (
            expense_id, associated_deliverable_id, seller_name, seller_address, seller_phone_number,
            buyer_name, buyer_address, transaction_date, total_payment_amount, expense_description, status
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    conn = connect_db()
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does not close the connection -- hence the explicit ``finally``.
        with conn:
            conn.execute(insert_query, row)
    finally:
        conn.close()

    logger.info(f"Expense saved: {expense_id}")
    return True
|
|
def is_form_complete(form_info_dict):
    """Return a Gradio update toggling ``interactive`` by form completeness.

    A field counts as filled when its value is truthy and is not the string
    ``"None"``. The update carries ``interactive=True`` only when every
    required field is filled, otherwise ``interactive=False``.
    """
    required_keys = (
        "Associated Deliverable", "Seller Name", "Seller Address", "Seller Phone Number",
        "Buyer Name", "Buyer Address", "Transaction Date", "Total Payment Amount", "Expense Description",
    )

    def _filled(key):
        value = form_info_dict.get(key)
        return bool(value) and value != "None"

    everything_filled = all(_filled(key) for key in required_keys)
    return gr.update(interactive=everything_filled)
|
|
def update_field_input(key):
    """Build a change handler that writes one form field for a session.

    Args:
        key: Name of the form field the returned handler updates.

    Returns:
        A function ``inner(session_id, value)`` that stores ``value`` under
        ``key`` via ``set_form_info`` and returns the result of
        ``get_form_info(session_id)``.
    """
    def inner(session_id, value):
        set_form_info(session_id, key, value)
        refreshed = get_form_info(session_id)
        return refreshed

    return inner
|
|
def update_form_information_handler(form_information):
    """Translate the stored form dict into values for the nine form widgets.

    Args:
        form_information: Mapping of form field name to stored value. The
            string ``"None"`` is treated as "no value".

    Returns:
        A 9-tuple, in this order: deliverable title, seller name, seller
        address, seller phone number, buyer name, buyer address, transaction
        date, total payment amount, expense description.

        * The deliverable title falls back to ``"None"`` when it is not one
          of the titles returned by ``get_deliverable_titles()``.
        * The total payment amount is a ``float``, or ``None`` when it is
          missing, empty, ``"None"`` or not parseable as a number.
        * Every other field is passed through, with ``"None"`` mapped to
          ``None``.
    """
    def clean(val):
        return None if val == "None" else val

    valid_titles = get_deliverable_titles()
    target_deliverable = form_information.get("Associated Deliverable")
    if target_deliverable not in valid_titles:
        target_deliverable = "None"

    seller_name_input = clean(form_information.get("Seller Name"))
    seller_address_input = clean(form_information.get("Seller Address"))
    seller_phone_number_input = clean(form_information.get("Seller Phone Number"))
    buyer_name_input = clean(form_information.get("Buyer Name"))
    buyer_address_input = clean(form_information.get("Buyer Address"))
    transaction_date_input = clean(form_information.get("Transaction Date"))
    expense_description_input = clean(form_information.get("Expense Description"))

    # The amount feeds a numeric widget, so the "None" sentinel and empty
    # strings must become a real None rather than being passed on as text.
    total_payment_amount_input = clean(form_information.get("Total Payment Amount"))
    if total_payment_amount_input is None or total_payment_amount_input == "":
        total_payment_amount_input = None
    else:
        try:
            total_payment_amount_input = float(total_payment_amount_input)
        except (TypeError, ValueError):
            # A malformed amount should blank the field, not abort the
            # refresh of every other widget.
            logger.warning(
                "Ignoring non-numeric Total Payment Amount: %r",
                total_payment_amount_input,
            )
            total_payment_amount_input = None

    return (
        target_deliverable, seller_name_input, seller_address_input, seller_phone_number_input,
        buyer_name_input, buyer_address_input, transaction_date_input,
        total_payment_amount_input, expense_description_input,
    )
|
|
def handle_submit(session_id):
    """Validate the session's form and persist it as an expense.

    Args:
        session_id: Identifier whose form data is read with
            ``get_form_info``.

    Returns:
        A Gradio update for the feedback message, always visible:

        * a "please fill in every field" error when any required field is
          falsy or the string ``"None"``;
        * a "could not save" error when ``save_expense_to_db`` returns a
          falsy result or raises a database / date-parsing error;
        * the success message otherwise.
    """
    form_data = get_form_info(session_id)
    required_fields = [
        "Associated Deliverable", "Seller Name", "Seller Address", "Seller Phone Number",
        "Buyer Name", "Buyer Address", "Transaction Date", "Total Payment Amount", "Expense Description"
    ]
    for field in required_fields:
        value = form_data.get(field)
        if not value or value == "None":
            return gr.update(value="❌ กรุณากรอกข้อมูลในทุกช่องให้ครบถ้วน", visible=True)

    try:
        saved = save_expense_to_db(form_data)
    except (sqlite3.Error, ValueError):
        # sqlite3.Error: the insert failed; ValueError: the transaction date
        # could not be parsed. Log the traceback and report failure in the UI.
        logger.exception("Failed to save expense for session %s", session_id)
        saved = False

    # save_expense_to_db returns False when the chosen deliverable has no
    # matching ID, so success must only be reported on a truthy result.
    if not saved:
        return gr.update(
            value="❌ ไม่สามารถบันทึกค่าใช้จ่ายได้ กรุณาตรวจสอบข้อมูลแล้วลองใหม่อีกครั้ง",
            visible=True,
        )
    return gr.update(value="✅ บันทึกค่าใช้จ่ายเรียบร้อยแล้ว", visible=True)
|
|
# Initial chatbot history: a single assistant greeting in "messages" format
# (a list of {"role", "content"} dicts).
_chat_prefill = [
    {
        "role": "assistant",
        "content": "สวัสดีค่ะ ดิฉันชื่อ เอมมี่ ค่ะ ยินดีช่วยคุณเรื่องการนำส่งรายการค่าใช้จ่ายค่ะ คุณสามารถเริ่มต้นด้วยการอัพโหลดรูปภาพของบิล 1 รูปค่ะ",
    },
]
|
|
def chat_handler(message, history, session_id):
    """Route one multimodal chat message and return the reply plus form data.

    Args:
        message: Dict with a ``"text"`` string and a ``"files"`` list. When
            files are present, only the first one is processed and the text
            is ignored.
        history: Chat history passed in by the chat interface; unused here.
        session_id: Identifier of the session the message belongs to.

    Returns:
        A ``(response, form_information)`` tuple: the reply to show in the
        chat, and the result of ``get_form_info(session_id)`` read after the
        message was handled.
    """
    logger.info(f"Received message: {message}")

    files = message.get("files") or []
    text = message.get("text") or ""

    if files:
        response = user_input_handler(str(files[0]), session_id)
    elif text:
        response = user_input_handler(text, session_id)
    else:
        # Neither text nor a file was sent: reply with a prompt instead of
        # leaving ``response`` unbound.
        response = "กรุณาพิมพ์ข้อความหรืออัพโหลดรูปภาพของบิลค่ะ"

    # Only the form data is returned; no widget values are computed here.
    form_information = get_form_info(session_id)
    return response, form_information
|
|
def create_expense_register_page():
    """Build the expense-registration page as a ``gr.Blocks`` app.

    The page has a session-ID label, a "start over" button and two tabs:

    * a chat tab backed by ``chat_handler``, which also outputs the
      session's form data into the ``form_information`` state;
    * a form tab with one widget per expense field plus a save button.

    Wiring:

    * editing a widget writes that field into the session's form data and
      refreshes ``form_information``;
    * a change of ``form_information`` pushes the values from
      ``update_form_information_handler`` back into the widgets;
    * the save button calls ``handle_submit`` and shows its feedback;
    * the "start over" button creates a new session with blank form data
      and resets the chat, the widgets and the feedback message;
    * page load assigns a fresh session ID.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks() as page:
        session_id = gr.State("")
        form_information = gr.State({})

        with gr.Row():
            session_id_display = gr.Markdown("Session ID: Loading...")

        # NOTE(review): button placed below the session-ID row; confirm the
        # intended layout.
        new_btn = gr.Button("เริ่มใหม่", variant="primary")

        with gr.Tabs():
            with gr.Tab("แชท"):
                chat_interface = gr.ChatInterface(
                    fn=chat_handler,
                    type="messages",
                    chatbot=gr.Chatbot(value=_chat_prefill, type="messages", label="กล่องข้อความ"),
                    multimodal=True,
                    submit_btn="ส่งข้อความ",
                    autofocus=True,
                    examples=None,
                    additional_inputs=[session_id],
                    additional_outputs=[form_information],
                )

            with gr.Tab("ฟอร์ม"):
                # Widgets start blank. ``session_id`` is a gr.State component
                # at build time, not a session-id string, so it cannot be
                # used here to look up per-session form data.
                gr.Markdown("**การส่งมอบงาน**")
                deliverable_dropdown = gr.Dropdown(
                    choices=get_deliverable_titles(), filterable=False, container=False
                )
                gr.Markdown("**ชื่อผู้ขาย**")
                seller_name_input = gr.Textbox(value=None, label="", container=False)
                gr.Markdown("**ที่อยู่ผู้ขาย**")
                seller_address_input = gr.Textbox(value=None, label="", container=False)
                gr.Markdown("**เบอร์โทรศัพท์ผู้ขาย**")
                seller_phone_number_input = gr.Textbox(value=None, label="", container=False)
                gr.Markdown("**ชื่อผู้ซื้อ**")
                buyer_name_input = gr.Textbox(value=None, label="", container=False)
                gr.Markdown("**ที่อยู่ผู้ซื้อ**")
                buyer_address_input = gr.Textbox(value=None, label="", container=False)
                gr.Markdown("**วันที่ทำรายการ**")
                transaction_date_input = gr.DateTime(include_time=False, type="datetime", show_label=False)
                gr.Markdown("**ยอดชำระเงินทั้งหมด**")
                with gr.Row():
                    total_payment_amount_input = gr.Number(value=None, label="", container=False)
                    gr.Markdown("บาท")
                gr.Markdown("**คำอธิบายค่าใช้จ่าย**")
                expense_description_input = gr.Textbox(value=None, label="", container=False, lines=4)

                feedback_message = gr.Markdown("", visible=False)
                submit_btn = gr.Button("บันทึกค่าใช้จ่าย", variant="primary")

        # (form key, widget) pairs. The order matters: it must match the tuple
        # returned by update_form_information_handler and the blank values
        # returned by reset_chat below.
        field_bindings = [
            ("Associated Deliverable", deliverable_dropdown),
            ("Seller Name", seller_name_input),
            ("Seller Address", seller_address_input),
            ("Seller Phone Number", seller_phone_number_input),
            ("Buyer Name", buyer_name_input),
            ("Buyer Address", buyer_address_input),
            ("Transaction Date", transaction_date_input),
            ("Total Payment Amount", total_payment_amount_input),
            ("Expense Description", expense_description_input),
        ]
        form_fields = [component for _, component in field_bindings]

        # Widget -> stored form data: each edit updates one key and refreshes
        # the form_information state.
        for key, component in field_bindings:
            component.change(
                update_field_input(key),
                inputs=[session_id, component],
                outputs=[form_information],
                queue=False,
            )

        # Stored form data -> widgets.
        form_information.change(
            update_form_information_handler,
            inputs=[form_information],
            outputs=form_fields,
            queue=False,
        )
        submit_btn.click(fn=handle_submit, inputs=[session_id], outputs=[feedback_message])

        def reset_chat():
            """Start a new session with blank form data and a reset UI."""
            new_session_id, new_session_id_display = generate_session_id()
            logger.info(f"Chat reset with new session ID: {new_session_id}")

            session_form_data[new_session_id] = {
                "Associated Deliverable": "None",
                "Seller Name": "",
                "Seller Address": "",
                "Seller Phone Number": "",
                "Buyer Name": "",
                "Buyer Address": "",
                "Transaction Date": None,
                "Total Payment Amount": None,
                "Expense Description": "",
            }

            return (
                new_session_id,
                new_session_id_display,
                _chat_prefill,
                session_form_data[new_session_id],
                # Blank widget values, in field_bindings order.
                "None",
                "",
                "",
                "",
                "",
                "",
                None,
                None,
                "",
                gr.update(visible=False, value=""),
            )

        new_btn.click(
            fn=reset_chat,
            inputs=[],
            outputs=[
                session_id,
                session_id_display,
                chat_interface.chatbot_value,
                form_information,
                *form_fields,
                feedback_message,
            ],
        )

        page.load(generate_session_id, outputs=[session_id, session_id_display])
    return page
|
|
if __name__ == "__main__":
    # Build the page and serve it with Gradio's default launch settings.
    create_expense_register_page().launch()