ohytic6's picture
Update app.py
63249a5 verified
import gradio as gr
import openai
import speech_recognition as sr
import os
import firebase_admin
from firebase_admin import credentials, firestore
import json
import pandas as pd
import plotly.express as px
from datetime import datetime
import re
from transformers import AutoModel, AutoTokenizer, AutoModelForCausalLM, pipeline
from peft import PeftModel, PeftConfig
import faiss
import torch
# 正確設定您的 OpenAI API 金鑰
#openai.api_key = os.getenv('OPENAI_API_KEY')
# 初始化 Firebase
# 從環境變數中讀取 JSON 字符串
service_account_info = json.loads(os.getenv("FIREBASE_SERVICE_ACCOUNT_KEY"))
# 使用 JSON 字符串初始化 Firebase
cred = credentials.Certificate(service_account_info)
firebase_admin.initialize_app(cred)
db = firestore.client()
# 加載 LLaMA2 來處理請假請求
MODEL_NAME = "meta-llama/Llama-2-7b-chat-hf"
TOKEN = os.getenv("HUGGINGFACE_TOKEN") # 從環境變數讀取 Token
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_auth_token=TOKEN)
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, use_auth_token=TOKEN, torch_dtype="auto", device_map="auto")
text_generator = pipeline("text-generation", model=model, tokenizer=tokenizer)
# 加載嵌入模型,用於 FAISS 檢索
embedding_model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
embedding_tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
# 初始化 FAISS,並索引靜態請假規則
def initialize_rag():
index = faiss.IndexFlatL2(384) # all-MiniLM-L6-v2 的向量維度是 384
policies = [
"病假最多 7 天",
"員工每月有 5 天事假",
"超過 3 天病假需要醫療證明"
]
for policy in policies:
encoded_input = embedding_tokenizer(policy, return_tensors="pt", padding=True, truncation=True)
with torch.no_grad():
embedding = embedding_model(**encoded_input).last_hidden_state.mean(dim=1).numpy()
index.add(embedding)
return index, policies
rag_index, policies = initialize_rag()
# FAISS 檢索請假規則
def search_leave_policy(query):
encoded_input = embedding_tokenizer(query, return_tensors="pt", padding=True, truncation=True)
with torch.no_grad():
query_embedding = embedding_model(**encoded_input).last_hidden_state.mean(dim=1).numpy()
D, I = rag_index.search(query_embedding, k=3) # 取前三條最接近的規則
retrieved_docs = [policies[i] for i in I[0] if i >= 0] # 過濾掉無效結果
return retrieved_docs
# 初始化全局變量來保存請假資訊
leave_info = {
"姓名": "",
"員工編號": "",
"請假類別": "",
"開始日期": "",
"天數": ""
}
# 預設開始日期為今天
default_start_date = datetime.now().strftime("%Y-%m-%d")
# 查詢特定員工的請假資訊
def get_employee_attendance(name, employee_id):
attendance_records = db.collection("employees").where("name", "==", name).where("employee_id", "==", employee_id).stream()
return [record.to_dict() for record in attendance_records]
# 檢查請假額度
def check_leave_quota(name, employee_id, requested_days):
records = get_employee_attendance(name, employee_id)
taken_leave_days = sum(1 for record in records if record["work_status"] == "未上班已請假")
absent_days = sum(1 for record in records if record["work_status"] == "未上班未請假")
leave_quota = 7
absent_quota = 5
if taken_leave_days + requested_days > leave_quota:
return "請假額度已超過規定,請洽人資"
elif absent_days > absent_quota:
return "未上班未請假額度已超過規定,請洽人資"
else:
return "請假額度正常"
# 提取登入資訊
def extract_login_information(text):
match = re.search(r"(.+?)(\d+)$", text.strip())
if match:
leave_info["姓名"] = match.group(1).strip()
leave_info["員工編號"] = match.group(2).strip()
employee_records = get_employee_attendance(leave_info["姓名"], leave_info["員工編號"])
if not employee_records:
reset_login_info()
return "員工資訊錯誤,不予通過。", "登入失敗"
return f"請假人: {leave_info['姓名']}\n員工編號: {leave_info['員工編號']}", "登入資訊已完成。"
else:
reset_login_info()
return "無法識別姓名和員工編號,請確認格式或重新輸入。", ""
# 生成 TARS 風格的回應
def generate_tars_response(input_text):
prompt = f"你是電影《Interstellar》中的 TARS,一位具有幽默、冷靜且專業的人資助理。\n{input_text}"
return text_generator(prompt, max_length=100, do_sample=True)[0]["generated_text"]
def extract_leave_status_information(text):
# 第一步:提取清晰的資訊
retrieved_docs = search_leave_policy(text)
input_text = f"使用者請求: {text}\n相關規則: {retrieved_docs}\n請提取請假資訊。"
response = text_generator(input_text, max_length=100, do_sample=True)[0]["generated_text"]
# 第二步:手動提取資料
leave_types = ["婚假", "喪假", "病假", "公假", "事假", "產假", "不確定假別"]
leave_type_detected = "請假種類未定,將由人工審查"
# 檢查請假類別是否精確匹配
for leave_type in leave_types:
if leave_type in response:
leave_type_detected = leave_type
break
start_date_match = re.search(r"開始日期: (\d{4}-\d{2}-\d{2})", response)
duration_match = re.search(r"天數: (\d+)", response)
leave_info["請假類別"] = leave_type_detected
leave_info["開始日期"] = start_date_match.group(1) if start_date_match else default_start_date
leave_info["天數"] = duration_match.group(1) if duration_match else "未指定"
return f"請假類別: {leave_info['請假類別']}\n開始日期: {leave_info['開始日期']}\n天數: {leave_info['天數']}", "請假狀態已完成。"
# 生成 TARS 風格回應
def generate_tars_response(input_text):
prompt = f"你是電影《Interstellar》中的 TARS,一位具有幽默、冷靜且專業的人資助理。\n{input_text}"
return text_generator(prompt, max_length=100, do_sample=True)[0]["generated_text"]
def fetch_pending_requests():
pending_records = db.collection("pending_requests").stream()
return [record.to_dict() for record in pending_records]
def display_pending_requests():
records = fetch_pending_requests()
if not records:
# 返回空的 DataFrame,而不是文字訊息
return pd.DataFrame(columns=["name", "employee_id", "leave_type", "start_date", "duration", "request_date", "status"])
return pd.DataFrame(records)
# 重置登入資訊
def reset_login_info():
leave_info["姓名"] = ""
leave_info["員工編號"] = ""
# 重置請假狀態資訊
def reset_leave_status_info():
leave_info["請假類別"] = ""
leave_info["開始日期"] = ""
leave_info["天數"] = ""
# 處理登入和請假狀態的函數
def transcribe_and_process_login(audio, text_input):
global login_completed
try:
if audio is not None:
recognizer = sr.Recognizer()
with sr.AudioFile(audio) as source:
audio_data = recognizer.record(source)
text = recognizer.recognize_google(audio_data, language="zh-TW")
elif text_input:
text = text_input
else:
return "請輸入或錄製登入資訊", ""
user_text, assistant_response = extract_login_information(text)
login_completed = (assistant_response == "登入資訊已完成。")
return user_text, assistant_response
except Exception as e:
return f"語音識別失敗: {str(e)}", ""
def transcribe_and_process_leave_status(audio, text_input):
global leave_completed
try:
if audio is not None:
recognizer = sr.Recognizer()
with sr.AudioFile(audio) as source:
audio_data = recognizer.record(source)
text = recognizer.recognize_google(audio_data, language="zh-TW")
elif text_input:
text = text_input
else:
return "請輸入或錄製請假狀態資訊", ""
user_text, assistant_response = extract_leave_status_information(text)
leave_completed = (assistant_response == "請假狀態已完成。")
return user_text, assistant_response
except Exception as e:
return f"語音識別失敗: {str(e)}", ""
# 全局變數
login_completed = False
leave_completed = False
# 修改 check_leave_completion 函數
def check_leave_completion():
if leave_info["請假類別"] != "不確定假別" and leave_info["開始日期"] and leave_info["天數"] != "未指定":
requested_days = int(leave_info["天數"])
quota_status = check_leave_quota(leave_info["姓名"], leave_info["員工編號"], requested_days)
if quota_status == "請假額度正常":
return "請假流程完成!"
else:
return quota_status
return "請假有誤或未完成,請洽人資"
# Firebase 操作整合至管理者頁面
def fetch_attendance_data():
attendance_records = db.collection("employees").stream()
return pd.DataFrame([record.to_dict() for record in attendance_records])
def generate_leave_trend_plot():
attendance_df = fetch_attendance_data()
leave_counts = attendance_df[attendance_df['work_status'] == "未上班已請假"].groupby('name').size().reset_index(name='請假天數')
fig = px.bar(leave_counts, x='name', y='請假天數', title="每位員工的請假天數")
fig.update_layout(xaxis_title="姓名", yaxis_title="請假天數")
return fig
def generate_leave_type_trend():
attendance_df = fetch_attendance_data()
daily_leave_trend = attendance_df[attendance_df['work_status'] == "未上班已請假"].groupby('date').size().reset_index(name='請假數')
fig = px.line(daily_leave_trend, x='date', y='請假數', title="每日請假趨勢")
fig.update_layout(xaxis_title="日期", yaxis_title="請假數")
return fig
def admin_view():
leave_trend_plot = generate_leave_trend_plot()
leave_type_trend = generate_leave_type_trend()
return leave_trend_plot, leave_type_trend
# 設置 Gradio 介面
with gr.Blocks() as demo:
with gr.Tab("請假申請系統"):
with gr.Row(elem_id="title-container"):
gr.Image("advantech_logo.png", show_label=False, elem_id="company-logo", scale=0.15)
gr.Markdown("## TARS 彥安科技請假小助手")
with gr.Row():
with gr.Column():
gr.Markdown("### 登入資訊")
login_audio = gr.Audio(type="filepath", label="錄製登入資訊(姓名、員工編號)")
login_text_input = gr.Textbox(label="直接輸入登入資訊", placeholder="輸入姓名及員工編號")
login_user_text = gr.Textbox(label="登入狀況", interactive=False)
login_assistant_response = gr.Textbox(label="登入回報", interactive=False)
login_submit_btn = gr.Button("提交登入資訊")
with gr.Column():
gr.Markdown("### 請假狀態")
leave_audio = gr.Audio(type="filepath", label="錄製請假狀態")
leave_text_input = gr.Textbox(label="直接輸入請假資訊", placeholder="描述請假類別等")
leave_user_text = gr.Textbox(label="請假狀況", interactive=False)
leave_assistant_response = gr.Textbox(label="請假回報", interactive=False)
leave_submit_btn = gr.Button("提交請假狀態")
# 登入和請假提交按鈕
login_submit_btn.click(
transcribe_and_process_login,
inputs=[login_audio, login_text_input],
outputs=[login_user_text, login_assistant_response]
)
leave_submit_btn.click(
transcribe_and_process_leave_status,
inputs=[leave_audio, leave_text_input],
outputs=[leave_user_text, leave_assistant_response]
)
# 檢查請假流程的完成情況
gr.Button("檢查請假流程").click(
check_leave_completion,
inputs=None,
outputs=gr.Textbox(label="最終回報", interactive=False)
)
# 新增管理者頁面
with gr.Tab("管理者頁面"):
gr.Markdown("## 假期數據分析 - 管理者專用頁面")
# 顯示待批准的請假申請
pending_requests_display = gr.Dataframe(label="待批准請假申請")
show_pending_requests_btn = gr.Button("顯示待批准請假申請")
show_pending_requests_btn.click(display_pending_requests, outputs=pending_requests_display)
# 圖表生成
leave_trend_plot_output = gr.Plot(label="每位員工的請假天數趨勢圖")
leave_type_trend_output = gr.Plot(label="每日請假趨勢")
generate_plots_btn = gr.Button("生成假期數據趨勢圖")
generate_plots_btn.click(admin_view, outputs=[leave_trend_plot_output, leave_type_trend_output])
demo.launch()