ragline / main.py
AlanRex's picture
Update main.py
2ce0fd5 verified
raw
history blame
20.7 kB
import json, os, glob, pathlib, time, re
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException, status
from google import genai
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import MessageEvent, TextMessage, TextSendMessage
import PyPDF2
import logging
from datetime import datetime, timedelta
from google.oauth2.credentials import Credentials
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# ============== 設定日誌 ==============
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ============== Google Calendar 設定 ==============
SCOPES = ['https://www.googleapis.com/auth/calendar']
def get_calendar_service():
"""建立 Google Calendar 服務"""
try:
# 方法 1: 使用 Service Account(推薦用於伺服器)
service_account_file = os.getenv('GOOGLE_SERVICE_ACCOUNT_JSON')
if service_account_file:
# 從環境變數讀取 JSON 內容
service_account_info = json.loads(service_account_file)
credentials = service_account.Credentials.from_service_account_info(
service_account_info, scopes=SCOPES)
else:
logger.warning("未設定 Google Calendar 認證,預約將只儲存在本地")
return None
service = build('calendar', 'v3', credentials=credentials)
logger.info("Google Calendar 服務已建立")
return service
except Exception as e:
logger.error(f"建立 Google Calendar 服務失敗: {str(e)}")
return None
# 初始化 Calendar 服務
calendar_service = get_calendar_service()
CALENDAR_ID = os.getenv('GOOGLE_CALENDAR_ID', 'primary') # 使用主要日曆或指定 ID
def create_calendar_event(booking_data, booking_id):
"""在 Google Calendar 建立事件"""
if not calendar_service:
logger.warning("Calendar 服務未啟用,跳過建立事件")
return None
try:
# 解析日期和時間
date_str = booking_data['date']
time_str = booking_data['time']
# 嘗試解析時間範圍(例如:14:00-16:00 或 下午2點到4點)
time_patterns = [
r'(\d{1,2}):(\d{2})\s*[-~到至]\s*(\d{1,2}):(\d{2})', # 14:00-16:00
r'(\d{1,2})\s*點\s*[-~到至]\s*(\d{1,2})\s*點', # 2點到4點
]
start_time = "09:00"
end_time = "10:00"
for pattern in time_patterns:
match = re.search(pattern, time_str)
if match:
groups = match.groups()
if len(groups) == 4:
start_time = f"{int(groups[0]):02d}:{int(groups[1]):02d}"
end_time = f"{int(groups[2]):02d}:{int(groups[3]):02d}"
elif len(groups) == 2:
start_time = f"{int(groups[0]):02d}:00"
end_time = f"{int(groups[1]):02d}:00"
break
# 建立 ISO 格式的時間
start_datetime = f"{date_str}T{start_time}:00"
end_datetime = f"{date_str}T{end_time}:00"
# 建立事件
event = {
'summary': f'🎹 琴房預約 - {booking_data["room"]}',
'description': (
f'預約編號:{booking_id}\n'
f'琴房:{booking_data["room"]}\n'
f'人數:{booking_data["people"]}人\n'
f'狀態:待確認'
),
'start': {
'dateTime': start_datetime,
'timeZone': 'Asia/Taipei',
},
'end': {
'dateTime': end_datetime,
'timeZone': 'Asia/Taipei',
},
'colorId': '9', # 藍色
'reminders': {
'useDefault': False,
'overrides': [
{'method': 'popup', 'minutes': 60}, # 1小時前提醒
{'method': 'popup', 'minutes': 10}, # 10分鐘前提醒
],
},
}
# 插入事件
created_event = calendar_service.events().insert(
calendarId=CALENDAR_ID,
body=event
).execute()
logger.info(f"Google Calendar 事件已建立: {created_event.get('id')}")
return created_event.get('htmlLink')
except HttpError as error:
logger.error(f"Google Calendar API 錯誤: {error}")
return None
except Exception as e:
logger.error(f"建立 Calendar 事件失敗: {str(e)}")
return None
def check_time_slot_available(date_str, time_str, room):
"""檢查時段是否可用"""
if not calendar_service:
return True, None
try:
# 解析日期
date_obj = datetime.strptime(date_str, "%Y-%m-%d")
# 查詢當天的所有事件
time_min = date_obj.isoformat() + 'Z'
time_max = (date_obj + timedelta(days=1)).isoformat() + 'Z'
events_result = calendar_service.events().list(
calendarId=CALENDAR_ID,
timeMin=time_min,
timeMax=time_max,
singleEvents=True,
orderBy='startTime'
).execute()
events = events_result.get('items', [])
# 檢查是否有衝突
conflicts = []
for event in events:
event_summary = event.get('summary', '')
if room in event_summary or '任意' in event_summary:
start = event['start'].get('dateTime', event['start'].get('date'))
conflicts.append(f"{event_summary} ({start})")
if conflicts:
return False, f"該時段已有預約:\n" + "\n".join(conflicts)
return True, None
except Exception as e:
logger.error(f"檢查時段可用性失敗: {str(e)}")
return True, None # 如果檢查失敗,允許預約
# ============== API 金鑰檢查 ==============
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if GOOGLE_API_KEY:
client = genai.Client(api_key=GOOGLE_API_KEY)
ai_enabled = True
logger.info("Gemini AI 已啟用")
else:
ai_enabled = False
logger.warning("未設定 GOOGLE_API_KEY,AI 問答功能已停用")
# ============== 讀取 PDF 內容 ==============
files = glob.glob('docs/*.pdf')
pdf_content = ''
if not files:
logger.warning("警告:docs/ 資料夾中找不到 PDF 檔案")
else:
logger.info(f"找到 {len(files)} 個 PDF 檔案")
for filename in files:
try:
with open(filename, 'rb') as pdf_file:
pdf_reader = PyPDF2.PdfReader(pdf_file)
for i in range(len(pdf_reader.pages)):
page = pdf_reader.pages[i]
pdf_content += page.extract_text()
logger.info(f"成功讀取: {filename}")
except Exception as e:
logger.error(f"讀取 {filename} 失敗: {str(e)}")
MAX_CONTENT_LENGTH = 15000
if len(pdf_content) > MAX_CONTENT_LENGTH:
pdf_content = pdf_content[:MAX_CONTENT_LENGTH]
logger.info(f"PDF 內容總長度: {len(pdf_content)} 字元")
# ============== LINE Bot 設定 ==============
line_bot_api = LineBotApi(os.getenv("CHANNEL_ACCESS_TOKEN"))
line_handler = WebhookHandler(os.getenv("CHANNEL_SECRET"))
working_status = True
# ============== 請求速率控制 ==============
last_request_time = {}
REQUEST_COOLDOWN = 2
# ============== 預約系統 ==============
user_booking_state = {}
class BookingState:
IDLE = "idle"
ASKING_DATE = "asking_date"
ASKING_TIME = "asking_time"
ASKING_PEOPLE = "asking_people"
ASKING_ROOM = "asking_room"
CONFIRMING = "confirming"
AVAILABLE_ROOMS = ["A琴房", "B琴房", "C琴房", "D琴房", "任意"]
def init_user_booking(user_id):
user_booking_state[user_id] = {
"state": BookingState.IDLE,
"date": None,
"time": None,
"people": None,
"room": None,
"last_update": time.time()
}
def get_user_booking(user_id):
if user_id not in user_booking_state:
init_user_booking(user_id)
return user_booking_state[user_id]
def reset_booking(user_id):
init_user_booking(user_id)
def is_booking_keyword(text):
keywords = ["預約", "預定", "訂位", "訂房", "訂琴房", "借琴房", "租琴房", "我要預約", "想預約"]
return any(keyword in text for keyword in keywords)
def parse_date(date_str):
today = datetime.now()
if "今天" in date_str or "今日" in date_str:
return today.strftime("%Y-%m-%d"), None
elif "明天" in date_str or "明日" in date_str:
return (today + timedelta(days=1)).strftime("%Y-%m-%d"), None
elif "後天" in date_str:
return (today + timedelta(days=2)).strftime("%Y-%m-%d"), None
patterns = [
(r'(\d{4})[/-](\d{1,2})[/-](\d{1,2})', '%Y-%m-%d'),
(r'(\d{1,2})[/-](\d{1,2})', '%m-%d'),
]
for pattern, fmt in patterns:
match = re.search(pattern, date_str)
if match:
try:
if fmt == '%m-%d':
date = datetime.strptime(f"{today.year}-{match.group(0)}", f'%Y-{fmt}')
else:
date = datetime.strptime(match.group(0), fmt)
if date.date() < today.date():
return None, "❌ 日期不能是過去的時間,請重新輸入。"
return date.strftime("%Y-%m-%d"), None
except ValueError:
pass
return None, "❌ 日期格式不正確,請重新輸入。\n例如:明天、12/26、2025-12-30"
def validate_time(time_str):
time_keywords = ["點", "時", ":", "-", "到", "至", "~"]
if any(kw in time_str for kw in time_keywords):
return True, None
return False, "❌ 時段格式不正確。\n例如:09:00-11:00、下午2點到4點"
def parse_people(people_str):
numbers = re.findall(r'\d+', people_str)
if numbers:
count = int(numbers[0])
if 1 <= count <= 10:
return count, None
else:
return None, "❌ 人數必須在 1-10 人之間。"
return None, "❌ 請輸入有效的人數。\n例如:2人、3、5位"
def validate_room(room_str):
room_normalized = room_str.strip().upper()
for room in AVAILABLE_ROOMS:
if room.upper() in room_normalized or room_normalized in room.upper():
return room, None
return None, f"❌ 請選擇有效的琴房:{', '.join(AVAILABLE_ROOMS)}"
def save_booking_to_file(user_id, booking_data):
bookings_dir = pathlib.Path("bookings")
bookings_dir.mkdir(exist_ok=True)
booking_file = bookings_dir / "bookings.json"
if booking_file.exists():
with open(booking_file, 'r', encoding='utf-8') as f:
try:
bookings = json.load(f)
except json.JSONDecodeError:
bookings = []
else:
bookings = []
booking_id = f"BK{int(time.time())}"
booking_record = {
"booking_id": booking_id,
"user_id": user_id,
"date": booking_data["date"],
"time": booking_data["time"],
"people": booking_data["people"],
"room": booking_data["room"],
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"status": "pending",
"calendar_link": None
}
# 建立 Google Calendar 事件
calendar_link = create_calendar_event(booking_data, booking_id)
if calendar_link:
booking_record["calendar_link"] = calendar_link
logger.info(f"預約已同步至 Google Calendar: {booking_id}")
bookings.append(booking_record)
with open(booking_file, 'w', encoding='utf-8') as f:
json.dump(bookings, f, ensure_ascii=False, indent=2)
logger.info(f"預約已儲存: {booking_id}")
return booking_id, calendar_link
def handle_booking_flow(user_id, user_message):
booking = get_user_booking(user_id)
current_state = booking["state"]
if time.time() - booking["last_update"] > 600:
reset_booking(user_id)
booking = get_user_booking(user_id)
current_state = booking["state"]
booking["last_update"] = time.time()
if user_message in ["取消", "取消預約", "重來", "重新開始"]:
reset_booking(user_id)
return "✅ 已取消預約流程。\n\n如需重新預約,請輸入「預約」。"
if current_state == BookingState.IDLE:
if is_booking_keyword(user_message):
booking["state"] = BookingState.ASKING_DATE
return (
"🎹 歡迎使用琴房預約系統!\n\n"
"📅 請問您想要預約哪一天呢?\n"
"例如:明天、12/26、2025/12/30\n\n"
"💡 隨時輸入「取消」可取消預約"
)
else:
return None
elif current_state == BookingState.ASKING_DATE:
parsed_date, error = parse_date(user_message)
if error:
return error
booking["date"] = parsed_date
booking["state"] = BookingState.ASKING_TIME
return (
f"✅ 預約日期:{parsed_date}\n\n"
f"⏰ 請問您想要預約哪個時段呢?\n"
f"例如:09:00-11:00、下午2點到4點"
)
elif current_state == BookingState.ASKING_TIME:
is_valid, error = validate_time(user_message)
if not is_valid:
return error
booking["time"] = user_message
booking["state"] = BookingState.ASKING_PEOPLE
return (
f"✅ 預約時段:{user_message}\n\n"
f"👥 請問有幾位使用呢?\n"
f"例如:1人、2人"
)
elif current_state == BookingState.ASKING_PEOPLE:
people_count, error = parse_people(user_message)
if error:
return error
booking["people"] = people_count
booking["state"] = BookingState.ASKING_ROOM
return (
f"✅ 使用人數:{people_count}人\n\n"
f"🎹 請問您想使用哪個琴房呢?\n"
f"可選擇:{', '.join(AVAILABLE_ROOMS)}"
)
elif current_state == BookingState.ASKING_ROOM:
room, error = validate_room(user_message)
if error:
return error
# 檢查時段是否可用
available, conflict_msg = check_time_slot_available(
booking["date"],
booking["time"],
room
)
if not available:
return f"⚠️ {conflict_msg}\n\n請重新選擇琴房或輸入「取消」重新預約。"
booking["room"] = room
booking["state"] = BookingState.CONFIRMING
summary = (
f"📋 請確認您的預約資訊:\n"
f"{'='*25}\n"
f"📅 日期:{booking['date']}\n"
f"⏰ 時段:{booking['time']}\n"
f"👥 人數:{booking['people']}人\n"
f"🎹 琴房:{booking['room']}\n"
f"{'='*25}\n\n"
f"✅ 請輸入「確認」送出預約\n"
f"❌ 輸入「取消」重新填寫"
)
return summary
elif current_state == BookingState.CONFIRMING:
if user_message in ["確認", "確定", "送出", "ok", "OK"]:
try:
booking_id, calendar_link = save_booking_to_file(user_id, booking)
summary = (
f"🎉 預約成功!\n"
f"{'='*25}\n"
f"📅 日期:{booking['date']}\n"
f"⏰ 時段:{booking['time']}\n"
f"👥 人數:{booking['people']}人\n"
f"🎹 琴房:{booking['room']}\n"
f"📝 預約編號:{booking_id}\n"
)
if calendar_link:
summary += f"📆 已加入 Google Calendar\n"
summary += (
f"{'='*25}\n\n"
f"✅ 我們已收到您的預約!\n"
f"服務專員將盡快與您聯繫確認。"
)
reset_booking(user_id)
return summary
except Exception as e:
logger.error(f"儲存預約失敗: {str(e)}")
reset_booking(user_id)
return "❌ 預約儲存失敗,請稍後再試。"
else:
return "請輸入「確認」來完成預約,或輸入「取消」重新開始。"
return None
# ============== FastAPI 應用 ==============
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
def root():
return {
"title": "Line Bot - 琴房預約系統",
"status": "running",
"version": "3.0",
"features": {
"booking": True,
"google_calendar": calendar_service is not None,
"ai_qa": ai_enabled,
"pdf_loaded": len(pdf_content) > 0
}
}
@app.get("/bookings")
def get_bookings():
bookings_file = pathlib.Path("bookings/bookings.json")
if bookings_file.exists():
with open(bookings_file, 'r', encoding='utf-8') as f:
bookings = json.load(f)
return {"total": len(bookings), "bookings": bookings}
return {"total": 0, "bookings": []}
@app.post("/webhook")
async def webhook(
request: Request,
background_tasks: BackgroundTasks,
x_line_signature=Header(None),
):
body = await request.body()
try:
background_tasks.add_task(
line_handler.handle, body.decode("utf-8"), x_line_signature
)
except InvalidSignatureError:
raise HTTPException(status_code=400, detail="Invalid signature")
return "ok"
@line_handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
user_message = event.message.text.strip()
user_id = event.source.user_id
if user_message == "再見":
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text="👋 再見!期待下次為您服務!")
)
return
# 速率限制
current_time = time.time()
if user_id in last_request_time:
elapsed = current_time - last_request_time[user_id]
if elapsed < REQUEST_COOLDOWN:
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text=f"⏰ 請稍等 {REQUEST_COOLDOWN - int(elapsed)} 秒")
)
return
last_request_time[user_id] = current_time
# 處理預約流程
booking_response = handle_booking_flow(user_id, user_message)
if booking_response:
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text=booking_response)
)
return
# AI 問答
if ai_enabled:
try:
if pdf_content:
full_prompt = f"""參考以下資料回答:
{pdf_content}
問題:{user_message}
請簡潔回答。"""
else:
full_prompt = user_message
response = client.models.generate_content(
model="gemini-1.5-flash",
contents=full_prompt
)
out = response.text if response and response.text else "抱歉,我無法回答這個問題。"
except Exception as e:
logger.error(f"AI 錯誤: {str(e)}")
out = "系統忙碌中,請稍後再試。"
else:
out = "AI 問答功能未啟用。如需預約請輸入「預約」。"
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text=out)
)
if __name__ == "__main__":
import uvicorn
logger.info("="*50)
logger.info("啟動琴房預約系統 with Google Calendar")
logger.info(f"Google Calendar: {'已啟用' if calendar_service else '未啟用'}")
logger.info(f"AI 問答: {'已啟用' if ai_enabled else '未啟用'}")
logger.info("="*50)
uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)