| import gradio as gr |
| import openai |
| import speech_recognition as sr |
| import os |
| import firebase_admin |
| from firebase_admin import credentials, firestore |
| import json |
| import pandas as pd |
| import plotly.express as px |
| from datetime import datetime |
| import re |
| from transformers import AutoModel, AutoTokenizer, AutoModelForCausalLM, pipeline |
| from peft import PeftModel, PeftConfig |
| import faiss |
| import torch |
|
|
| |
| |
|
|
| |
|
|
| |
| service_account_info = json.loads(os.getenv("FIREBASE_SERVICE_ACCOUNT_KEY")) |
|
|
| |
| cred = credentials.Certificate(service_account_info) |
| firebase_admin.initialize_app(cred) |
| db = firestore.client() |
|
|
| |
| MODEL_NAME = "meta-llama/Llama-2-7b-chat-hf" |
| TOKEN = os.getenv("HUGGINGFACE_TOKEN") |
|
|
| tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_auth_token=TOKEN) |
| model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, use_auth_token=TOKEN, torch_dtype="auto", device_map="auto") |
| text_generator = pipeline("text-generation", model=model, tokenizer=tokenizer) |
|
|
| |
| embedding_model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2") |
| embedding_tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2") |
|
|
| |
| def initialize_rag(): |
| index = faiss.IndexFlatL2(384) |
| policies = [ |
| "病假最多 7 天", |
| "員工每月有 5 天事假", |
| "超過 3 天病假需要醫療證明" |
| ] |
|
|
| for policy in policies: |
| encoded_input = embedding_tokenizer(policy, return_tensors="pt", padding=True, truncation=True) |
| with torch.no_grad(): |
| embedding = embedding_model(**encoded_input).last_hidden_state.mean(dim=1).numpy() |
| index.add(embedding) |
|
|
| return index, policies |
|
|
| rag_index, policies = initialize_rag() |
|
|
| |
| def search_leave_policy(query): |
| encoded_input = embedding_tokenizer(query, return_tensors="pt", padding=True, truncation=True) |
| with torch.no_grad(): |
| query_embedding = embedding_model(**encoded_input).last_hidden_state.mean(dim=1).numpy() |
|
|
| D, I = rag_index.search(query_embedding, k=3) |
| retrieved_docs = [policies[i] for i in I[0] if i >= 0] |
| return retrieved_docs |
|
|
| |
| leave_info = { |
| "姓名": "", |
| "員工編號": "", |
| "請假類別": "", |
| "開始日期": "", |
| "天數": "" |
| } |
|
|
| |
| default_start_date = datetime.now().strftime("%Y-%m-%d") |
|
|
| |
| def get_employee_attendance(name, employee_id): |
| attendance_records = db.collection("employees").where("name", "==", name).where("employee_id", "==", employee_id).stream() |
| return [record.to_dict() for record in attendance_records] |
|
|
| |
| def check_leave_quota(name, employee_id, requested_days): |
| records = get_employee_attendance(name, employee_id) |
| taken_leave_days = sum(1 for record in records if record["work_status"] == "未上班已請假") |
| absent_days = sum(1 for record in records if record["work_status"] == "未上班未請假") |
|
|
| leave_quota = 7 |
| absent_quota = 5 |
| |
| if taken_leave_days + requested_days > leave_quota: |
| return "請假額度已超過規定,請洽人資" |
| elif absent_days > absent_quota: |
| return "未上班未請假額度已超過規定,請洽人資" |
| else: |
| return "請假額度正常" |
|
|
| |
| def extract_login_information(text): |
| match = re.search(r"(.+?)(\d+)$", text.strip()) |
| if match: |
| leave_info["姓名"] = match.group(1).strip() |
| leave_info["員工編號"] = match.group(2).strip() |
| |
| employee_records = get_employee_attendance(leave_info["姓名"], leave_info["員工編號"]) |
| if not employee_records: |
| reset_login_info() |
| return "員工資訊錯誤,不予通過。", "登入失敗" |
| |
| return f"請假人: {leave_info['姓名']}\n員工編號: {leave_info['員工編號']}", "登入資訊已完成。" |
| else: |
| reset_login_info() |
| return "無法識別姓名和員工編號,請確認格式或重新輸入。", "" |
|
|
| |
| def generate_tars_response(input_text): |
| prompt = f"你是電影《Interstellar》中的 TARS,一位具有幽默、冷靜且專業的人資助理。\n{input_text}" |
| return text_generator(prompt, max_length=100, do_sample=True)[0]["generated_text"] |
|
|
| def extract_leave_status_information(text): |
| |
| retrieved_docs = search_leave_policy(text) |
|
|
| input_text = f"使用者請求: {text}\n相關規則: {retrieved_docs}\n請提取請假資訊。" |
| response = text_generator(input_text, max_length=100, do_sample=True)[0]["generated_text"] |
| |
| |
| leave_types = ["婚假", "喪假", "病假", "公假", "事假", "產假", "不確定假別"] |
| leave_type_detected = "請假種類未定,將由人工審查" |
| |
| |
| for leave_type in leave_types: |
| if leave_type in response: |
| leave_type_detected = leave_type |
| break |
|
|
| start_date_match = re.search(r"開始日期: (\d{4}-\d{2}-\d{2})", response) |
| duration_match = re.search(r"天數: (\d+)", response) |
| |
| leave_info["請假類別"] = leave_type_detected |
| leave_info["開始日期"] = start_date_match.group(1) if start_date_match else default_start_date |
| leave_info["天數"] = duration_match.group(1) if duration_match else "未指定" |
|
|
| return f"請假類別: {leave_info['請假類別']}\n開始日期: {leave_info['開始日期']}\n天數: {leave_info['天數']}", "請假狀態已完成。" |
|
|
| |
| def generate_tars_response(input_text): |
| prompt = f"你是電影《Interstellar》中的 TARS,一位具有幽默、冷靜且專業的人資助理。\n{input_text}" |
| return text_generator(prompt, max_length=100, do_sample=True)[0]["generated_text"] |
|
|
| def fetch_pending_requests(): |
|
|
| pending_records = db.collection("pending_requests").stream() |
| return [record.to_dict() for record in pending_records] |
|
|
| |
| def display_pending_requests(): |
|
|
| records = fetch_pending_requests() |
| if not records: |
| |
| return pd.DataFrame(columns=["name", "employee_id", "leave_type", "start_date", "duration", "request_date", "status"]) |
| return pd.DataFrame(records) |
|
|
|
|
| |
| def reset_login_info(): |
|
|
| leave_info["姓名"] = "" |
| leave_info["員工編號"] = "" |
|
|
| |
|
|
| def reset_leave_status_info(): |
|
|
| leave_info["請假類別"] = "" |
| leave_info["開始日期"] = "" |
| leave_info["天數"] = "" |
|
|
| |
|
|
| def transcribe_and_process_login(audio, text_input): |
|
|
| global login_completed |
|
|
| try: |
| if audio is not None: |
| recognizer = sr.Recognizer() |
| with sr.AudioFile(audio) as source: |
| audio_data = recognizer.record(source) |
| text = recognizer.recognize_google(audio_data, language="zh-TW") |
| elif text_input: |
| text = text_input |
| else: |
| return "請輸入或錄製登入資訊", "" |
|
|
| user_text, assistant_response = extract_login_information(text) |
| login_completed = (assistant_response == "登入資訊已完成。") |
| return user_text, assistant_response |
|
|
| except Exception as e: |
| return f"語音識別失敗: {str(e)}", "" |
|
|
| def transcribe_and_process_leave_status(audio, text_input): |
| global leave_completed |
|
|
| try: |
| if audio is not None: |
| recognizer = sr.Recognizer() |
| with sr.AudioFile(audio) as source: |
| audio_data = recognizer.record(source) |
| text = recognizer.recognize_google(audio_data, language="zh-TW") |
| elif text_input: |
| text = text_input |
| else: |
| return "請輸入或錄製請假狀態資訊", "" |
|
|
| user_text, assistant_response = extract_leave_status_information(text) |
| leave_completed = (assistant_response == "請假狀態已完成。") |
| return user_text, assistant_response |
|
|
| except Exception as e: |
|
|
| return f"語音識別失敗: {str(e)}", "" |
|
|
| |
| login_completed = False |
| leave_completed = False |
|
|
|
|
| |
| def check_leave_completion(): |
| if leave_info["請假類別"] != "不確定假別" and leave_info["開始日期"] and leave_info["天數"] != "未指定": |
| requested_days = int(leave_info["天數"]) |
| quota_status = check_leave_quota(leave_info["姓名"], leave_info["員工編號"], requested_days) |
| if quota_status == "請假額度正常": |
| return "請假流程完成!" |
| else: |
| return quota_status |
| return "請假有誤或未完成,請洽人資" |
|
|
| |
| |
| def fetch_attendance_data(): |
| attendance_records = db.collection("employees").stream() |
| return pd.DataFrame([record.to_dict() for record in attendance_records]) |
|
|
| def generate_leave_trend_plot(): |
| attendance_df = fetch_attendance_data() |
| leave_counts = attendance_df[attendance_df['work_status'] == "未上班已請假"].groupby('name').size().reset_index(name='請假天數') |
| fig = px.bar(leave_counts, x='name', y='請假天數', title="每位員工的請假天數") |
| fig.update_layout(xaxis_title="姓名", yaxis_title="請假天數") |
| return fig |
|
|
| def generate_leave_type_trend(): |
| attendance_df = fetch_attendance_data() |
| daily_leave_trend = attendance_df[attendance_df['work_status'] == "未上班已請假"].groupby('date').size().reset_index(name='請假數') |
| fig = px.line(daily_leave_trend, x='date', y='請假數', title="每日請假趨勢") |
| fig.update_layout(xaxis_title="日期", yaxis_title="請假數") |
| return fig |
|
|
| def admin_view(): |
| leave_trend_plot = generate_leave_trend_plot() |
| leave_type_trend = generate_leave_type_trend() |
| return leave_trend_plot, leave_type_trend |
| |
| |
| with gr.Blocks() as demo: |
|
|
| with gr.Tab("請假申請系統"): |
| with gr.Row(elem_id="title-container"): |
| gr.Image("advantech_logo.png", show_label=False, elem_id="company-logo", scale=0.15) |
| gr.Markdown("## TARS 彥安科技請假小助手") |
| |
| with gr.Row(): |
| with gr.Column(): |
| gr.Markdown("### 登入資訊") |
| login_audio = gr.Audio(type="filepath", label="錄製登入資訊(姓名、員工編號)") |
| login_text_input = gr.Textbox(label="直接輸入登入資訊", placeholder="輸入姓名及員工編號") |
| login_user_text = gr.Textbox(label="登入狀況", interactive=False) |
| login_assistant_response = gr.Textbox(label="登入回報", interactive=False) |
| login_submit_btn = gr.Button("提交登入資訊") |
| |
| with gr.Column(): |
| gr.Markdown("### 請假狀態") |
| leave_audio = gr.Audio(type="filepath", label="錄製請假狀態") |
| leave_text_input = gr.Textbox(label="直接輸入請假資訊", placeholder="描述請假類別等") |
| leave_user_text = gr.Textbox(label="請假狀況", interactive=False) |
| leave_assistant_response = gr.Textbox(label="請假回報", interactive=False) |
| leave_submit_btn = gr.Button("提交請假狀態") |
| |
| |
| |
| login_submit_btn.click( |
| transcribe_and_process_login, |
| inputs=[login_audio, login_text_input], |
| outputs=[login_user_text, login_assistant_response] |
| ) |
| |
| leave_submit_btn.click( |
| transcribe_and_process_leave_status, |
| inputs=[leave_audio, leave_text_input], |
| outputs=[leave_user_text, leave_assistant_response] |
| ) |
| |
| |
| gr.Button("檢查請假流程").click( |
| check_leave_completion, |
| inputs=None, |
| outputs=gr.Textbox(label="最終回報", interactive=False) |
| ) |
| |
| |
| with gr.Tab("管理者頁面"): |
| gr.Markdown("## 假期數據分析 - 管理者專用頁面") |
|
|
| |
| pending_requests_display = gr.Dataframe(label="待批准請假申請") |
| show_pending_requests_btn = gr.Button("顯示待批准請假申請") |
| show_pending_requests_btn.click(display_pending_requests, outputs=pending_requests_display) |
|
|
| |
| leave_trend_plot_output = gr.Plot(label="每位員工的請假天數趨勢圖") |
| leave_type_trend_output = gr.Plot(label="每日請假趨勢") |
| generate_plots_btn = gr.Button("生成假期數據趨勢圖") |
| generate_plots_btn.click(admin_view, outputs=[leave_trend_plot_output, leave_type_trend_output]) |
|
|
| demo.launch() |
|
|