| import json, os, glob, pathlib, time, re |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException, status |
| from google import genai |
| from linebot import LineBotApi, WebhookHandler |
| from linebot.exceptions import InvalidSignatureError |
| from linebot.models import MessageEvent, TextMessage, TextSendMessage |
| import PyPDF2 |
| import logging |
| from datetime import datetime, timedelta |
| from google.oauth2.credentials import Credentials |
| from google.oauth2 import service_account |
| from googleapiclient.discovery import build |
| from googleapiclient.errors import HttpError |
|
|
| |
# Root logging configuration: emit INFO and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)

# OAuth scopes requested when building the Google Calendar credentials.
SCOPES = [
    'https://www.googleapis.com/auth/calendar',
]
|
|
def get_calendar_service():
    """Build the Google Calendar API client from service-account credentials.

    The service-account key is read from the ``GOOGLE_SERVICE_ACCOUNT_JSON``
    environment variable, which must hold the JSON document itself (it is
    passed to ``json.loads``), not a file path.

    Returns:
        The client returned by ``build('calendar', 'v3', ...)``, or ``None``
        when the variable is unset/empty or when any step raises.
    """
    try:
        raw_key = os.getenv('GOOGLE_SERVICE_ACCOUNT_JSON')

        # Guard clause: without credentials, Calendar sync is simply disabled.
        if not raw_key:
            logger.warning("未設定 Google Calendar 認證,預約將只儲存在本地")
            return None

        key_info = json.loads(raw_key)
        creds = service_account.Credentials.from_service_account_info(
            key_info, scopes=SCOPES)

        calendar_client = build('calendar', 'v3', credentials=creds)
        logger.info("Google Calendar 服務已建立")
        return calendar_client
    except Exception as e:
        # Best effort: any failure (bad JSON, bad key, discovery error) is
        # logged and reported to the caller as "no service".
        logger.error(f"建立 Google Calendar 服務失敗: {str(e)}")
        return None
|
|
| |
# Calendar the bot writes to; defaults to 'primary' when the variable is unset.
CALENDAR_ID = os.environ.get('GOOGLE_CALENDAR_ID', 'primary')
# Calendar client created once at import time; None means sync is disabled.
calendar_service = get_calendar_service()
|
|
# Grammar for a clock range such as "09:00-11:00", "9點到11點" or
# "下午2點到4點".  Capture groups: (marker, hour, minute) for the start,
# then the same three for the end.
_MERIDIEM = r'(上午|早上|中午|下午|晚上)?\s*'
_CLOCK = r'(\d{1,2})(?:[::](\d{2})|\s*[點時])'
_TIME_RANGE_RE = re.compile(
    _MERIDIEM + _CLOCK + r'\s*[-~到至]\s*' + _MERIDIEM + _CLOCK
)
# Markers that shift a 12-hour clock value into the afternoon/evening.
_PM_MARKERS = ('中午', '下午', '晚上')
# Fixed UTC offset used for queries; events are created with the
# 'Asia/Taipei' zone, which corresponds to this offset.
_TAIPEI_OFFSET = '+08:00'


def parse_time_range(time_str):
    """Extract a start/end clock pair from free-form booking text.

    Recognises ``HH:MM`` and ``H點`` / ``H時`` clocks separated by one of
    ``- ~ 到 至``, optionally prefixed by 上午/早上/中午/下午/晚上.  An end
    time without its own marker inherits the start's marker, so
    "下午2點到4點" means 14:00-16:00.

    Args:
        time_str: Text typed by the user.

    Returns:
        ``("HH:MM", "HH:MM")`` in 24-hour form, or ``None`` when no range
        is recognised.

    Raises:
        ValueError: If a clock value is impossible (hour > 23, minute > 59)
            or the end is not after the start.
    """
    match = _TIME_RANGE_RE.search(time_str or '')
    if match is None:
        return None

    start_mark, start_h, start_m, end_mark, end_h, end_m = match.groups()
    clocks = []
    for mark, hour_txt, minute_txt in (
        (start_mark, start_h, start_m),
        (end_mark or start_mark, end_h, end_m),
    ):
        hour = int(hour_txt)
        minute = int(minute_txt) if minute_txt else 0
        if mark in _PM_MARKERS and hour < 12:
            hour += 12
        if hour > 23 or minute > 59:
            raise ValueError(f"invalid clock value in {time_str!r}")
        clocks.append(f"{hour:02d}:{minute:02d}")

    start, end = clocks
    # Zero-padded HH:MM strings order the same way as the times they denote.
    if end <= start:
        raise ValueError(f"end is not after start in {time_str!r}")
    return start, end


def create_calendar_event(booking_data, booking_id):
    """Create a Google Calendar event for a booking.

    Args:
        booking_data: Mapping with ``date`` (``YYYY-MM-DD``), ``time``
            (free-form range text), ``room`` and ``people``.
        booking_id: Identifier written into the event description.

    Returns:
        The created event's ``htmlLink``, or ``None`` when the Calendar
        service is disabled or the event could not be created (the error
        is logged, never raised).
    """
    if not calendar_service:
        logger.warning("Calendar 服務未啟用,跳過建立事件")
        return None

    try:
        date_str = booking_data['date']
        time_str = booking_data['time']

        time_range = parse_time_range(time_str)
        if time_range is None:
            # Unrecognised text keeps the historical default slot.
            logger.warning(f"無法解析時段「{time_str}」,使用預設 09:00-10:00")
            time_range = ("09:00", "10:00")
        start_time, end_time = time_range

        event = {
            'summary': f'🎹 琴房預約 - {booking_data["room"]}',
            'description': (
                f'預約編號:{booking_id}\n'
                f'琴房:{booking_data["room"]}\n'
                f'人數:{booking_data["people"]}人\n'
                f'狀態:待確認'
            ),
            'start': {
                'dateTime': f"{date_str}T{start_time}:00",
                'timeZone': 'Asia/Taipei',
            },
            'end': {
                'dateTime': f"{date_str}T{end_time}:00",
                'timeZone': 'Asia/Taipei',
            },
            'colorId': '9',
            'reminders': {
                'useDefault': False,
                'overrides': [
                    {'method': 'popup', 'minutes': 60},
                    {'method': 'popup', 'minutes': 10},
                ],
            },
        }

        created_event = calendar_service.events().insert(
            calendarId=CALENDAR_ID,
            body=event,
        ).execute()

        logger.info(f"Google Calendar 事件已建立: {created_event.get('id')}")
        return created_event.get('htmlLink')

    except HttpError as error:
        logger.error(f"Google Calendar API 錯誤: {error}")
        return None
    except Exception as e:
        # Includes missing keys in booking_data and invalid time ranges.
        logger.error(f"建立 Calendar 事件失敗: {str(e)}")
        return None


def _event_overlaps(event, req_start, req_end):
    """Return True when *event* overlaps the requested interval.

    Events whose times cannot be compared (all-day events carrying only a
    ``date``, malformed or offset-less timestamps) count as overlapping so
    the check stays conservative.
    """
    try:
        ev_start = datetime.fromisoformat(
            event['start']['dateTime'].replace('Z', '+00:00'))
        ev_end = datetime.fromisoformat(
            event['end']['dateTime'].replace('Z', '+00:00'))
        return ev_start < req_end and req_start < ev_end
    except (KeyError, AttributeError, TypeError, ValueError):
        return True


def check_time_slot_available(date_str, time_str, room):
    """Check whether a room is free for the requested date and time range.

    Events on the requested Asia/Taipei calendar day are fetched and those
    whose summary mentions *room* (or '任意') are treated as bookings.  When
    *time_str* can be parsed only overlapping events conflict; otherwise
    every matching event on that day conflicts.

    Args:
        date_str: Day in ``YYYY-MM-DD`` form.
        time_str: Free-form time-range text.
        room: Room name to look for in event summaries.

    Returns:
        ``(True, None)`` when free, when the Calendar service is disabled,
        or when the check itself fails (fail-open, error logged);
        ``(False, message)`` listing the conflicting events otherwise.
    """
    if not calendar_service:
        return True, None

    try:
        day = datetime.strptime(date_str, "%Y-%m-%d")
        next_day = day + timedelta(days=1)

        # Query the local (UTC+8) day rather than the UTC day, so early
        # morning events are not missed and next-day events not included.
        time_min = f"{day:%Y-%m-%d}T00:00:00{_TAIPEI_OFFSET}"
        time_max = f"{next_day:%Y-%m-%d}T00:00:00{_TAIPEI_OFFSET}"

        events_result = calendar_service.events().list(
            calendarId=CALENDAR_ID,
            timeMin=time_min,
            timeMax=time_max,
            singleEvents=True,
            orderBy='startTime',
        ).execute()
        events = events_result.get('items', [])

        # Requested interval as aware datetimes; None => whole-day check.
        requested = None
        try:
            time_range = parse_time_range(time_str)
        except ValueError:
            time_range = None
        if time_range is not None:
            requested = tuple(
                datetime.fromisoformat(
                    f"{day:%Y-%m-%d}T{clock}:00{_TAIPEI_OFFSET}")
                for clock in time_range
            )

        conflicts = []
        for event in events:
            event_summary = event.get('summary', '')
            if room not in event_summary and '任意' not in event_summary:
                continue
            if requested is not None and not _event_overlaps(event, *requested):
                continue
            start = event['start'].get('dateTime', event['start'].get('date'))
            conflicts.append(f"{event_summary} ({start})")

        if conflicts:
            return False, "該時段已有預約:\n" + "\n".join(conflicts)

        return True, None

    except Exception as e:
        # Deliberately fail-open: a Calendar outage must not block bookings.
        logger.error(f"檢查時段可用性失敗: {str(e)}")
        return True, None
|
|
| |
# Gemini is optional: AI Q&A is enabled only when an API key is configured.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    ai_enabled = False
    logger.warning("未設定 GOOGLE_API_KEY,AI 問答功能已停用")
else:
    # `client` is only defined on this branch; callers must check ai_enabled.
    client = genai.Client(api_key=GOOGLE_API_KEY)
    ai_enabled = True
    logger.info("Gemini AI 已啟用")
|
|
| |
# Upper bound on the reference text kept for AI prompts (in characters).
MAX_CONTENT_LENGTH = 15000

# Concatenate the extracted text of every PDF under docs/ at import time.
files = glob.glob('docs/*.pdf')
pdf_content = ''

if files:
    logger.info(f"找到 {len(files)} 個 PDF 檔案")
    for filename in files:
        try:
            with open(filename, 'rb') as pdf_file:
                for page in PyPDF2.PdfReader(pdf_file).pages:
                    pdf_content += page.extract_text()
            logger.info(f"成功讀取: {filename}")
        except Exception as e:
            # A broken file is logged and skipped; pages already read stay.
            logger.error(f"讀取 {filename} 失敗: {str(e)}")
else:
    logger.warning("警告:docs/ 資料夾中找不到 PDF 檔案")

# Slicing is a no-op when the text is already within the limit.
pdf_content = pdf_content[:MAX_CONTENT_LENGTH]

logger.info(f"PDF 內容總長度: {len(pdf_content)} 字元")
|
|
| |
# LINE Messaging API client and webhook handler, configured from the environment.
line_bot_api = LineBotApi(os.environ.get("CHANNEL_ACCESS_TOKEN"))
line_handler = WebhookHandler(os.environ.get("CHANNEL_SECRET"))
# NOTE(review): not read anywhere in the code shown here.
working_status = True

# Per-user rate limiting: user_id -> time.time() of the last accepted message.
last_request_time = dict()
# Minimum number of seconds between two accepted messages from one user.
REQUEST_COOLDOWN = 2

# In-memory booking conversations: user_id -> state dict (see init_user_booking).
user_booking_state = dict()
|
|
class BookingState:
    """String constants naming each step of the booking conversation."""

    # No booking in progress.
    IDLE = "idle"
    # Waiting for the user to supply, in order: date, time, head count, room.
    ASKING_DATE = "asking_date"
    ASKING_TIME = "asking_time"
    ASKING_PEOPLE = "asking_people"
    ASKING_ROOM = "asking_room"
    # All fields collected; waiting for the user's final confirmation.
    CONFIRMING = "confirming"
|
|
# Room names a user may choose from; the last entry means "any room".
AVAILABLE_ROOMS = [
    "A琴房",
    "B琴房",
    "C琴房",
    "D琴房",
    "任意",
]
|
|
def init_user_booking(user_id):
    """Create (or overwrite) the booking record for *user_id*.

    The record starts in ``BookingState.IDLE`` with every booking field set
    to ``None`` and ``last_update`` set to the current time.
    """
    record = {"state": BookingState.IDLE}
    # Fields filled in step by step during the conversation.
    record.update(dict.fromkeys(("date", "time", "people", "room")))
    record["last_update"] = time.time()
    user_booking_state[user_id] = record
|
|
def get_user_booking(user_id):
    """Return the booking record for *user_id*, creating it on first use."""
    try:
        return user_booking_state[user_id]
    except KeyError:
        init_user_booking(user_id)
        return user_booking_state[user_id]
|
|
def reset_booking(user_id):
    """Discard any in-progress booking for *user_id* and start from idle."""
    # Re-initialising overwrites the stored record with a fresh one.
    init_user_booking(user_id)
|
|
def is_booking_keyword(text):
    """Return True when *text* contains any phrase that starts a booking."""
    triggers = (
        "預約", "預定", "訂位", "訂房", "訂琴房",
        "借琴房", "租琴房", "我要預約", "想預約",
    )
    for phrase in triggers:
        if phrase in text:
            return True
    return False
|
|
def parse_date(date_str):
    """Turn user text into a ``YYYY-MM-DD`` booking date.

    Accepted inputs are the relative words 今天/今日, 明天/明日, 大後天 and
    後天, a full date (``2025-12-30`` or ``2025/12/30``) or a month/day pair
    (``12/26`` or ``12-26``), which is taken to be in the current year.

    Args:
        date_str: Text typed by the user.

    Returns:
        ``(date, None)`` on success, or ``(None, error_message)`` when the
        text is not a valid date or the date lies before today.
    """
    format_error = "❌ 日期格式不正確,請重新輸入。\n例如:明天、12/26、2025-12-30"
    today = datetime.now()

    # "大後天" must be tested before "後天", which is a substring of it.
    relative_words = (
        ("今天", 0), ("今日", 0),
        ("明天", 1), ("明日", 1),
        ("大後天", 3),
        ("後天", 2),
    )
    for word, offset in relative_words:
        if word in date_str:
            return (today + timedelta(days=offset)).strftime("%Y-%m-%d"), None

    # Build the date from the captured numbers so "/" and "-" both work.
    full = re.search(r'(\d{4})[/-](\d{1,2})[/-](\d{1,2})', date_str)
    if full:
        year, month, day = (int(part) for part in full.groups())
    else:
        short = re.search(r'(?<!\d)(\d{1,2})[/-](\d{1,2})(?!\d)', date_str)
        if short is None:
            return None, format_error
        year = today.year
        month, day = (int(part) for part in short.groups())

    try:
        date = datetime(year, month, day)
    except ValueError:
        # Impossible calendar date such as 13/40 or 2/30.
        return None, format_error

    if date.date() < today.date():
        return None, "❌ 日期不能是過去的時間,請重新輸入。"

    return date.strftime("%Y-%m-%d"), None
|
|
def validate_time(time_str):
    """Loosely check that *time_str* looks like a time range.

    The text passes when it contains at least one time-like marker; the
    actual clock values are not inspected here.

    Returns:
        ``(True, None)`` when accepted, else ``(False, error_message)``.
    """
    for marker in ("點", "時", ":", "-", "到", "至", "~"):
        if marker in time_str:
            return True, None
    return False, "❌ 時段格式不正確。\n例如:09:00-11:00、下午2點到4點"
|
|
def parse_people(people_str):
    """Read the head count from user text.

    The first run of digits is used; it must lie between 1 and 10.

    Returns:
        ``(count, None)`` on success, else ``(None, error_message)``.
    """
    first_number = re.search(r'\d+', people_str)
    if first_number is None:
        return None, "❌ 請輸入有效的人數。\n例如:2人、3、5位"

    count = int(first_number.group())
    if count < 1 or count > 10:
        return None, "❌ 人數必須在 1-10 人之間。"
    return count, None
|
|
def validate_room(room_str, rooms=None):
    """Resolve user text to exactly one known room name.

    Matching is case-insensitive.  A room matches when its name appears in
    the text ("我要B琴房") or the text is a fragment of its name ("a").
    The text must identify exactly one room: ambiguous fragments such as
    "琴房" and empty text are rejected instead of silently picking the
    first room.

    Args:
        room_str: Text typed by the user.
        rooms: Room names to choose from; defaults to ``AVAILABLE_ROOMS``.

    Returns:
        ``(room, None)`` on success, else ``(None, error_message)``.
    """
    candidates = AVAILABLE_ROOMS if rooms is None else rooms
    wanted = room_str.strip().upper()

    matches = []
    if wanted:
        for room in candidates:
            name = room.upper()
            if name == wanted:
                # An exact name always wins, even if it is also a fragment
                # of some other room's name.
                return room, None
            if name in wanted or wanted in name:
                matches.append(room)

    if len(matches) == 1:
        return matches[0], None

    return None, f"❌ 請選擇有效的琴房:{', '.join(candidates)}"
|
|
def _load_bookings(booking_file):
    """Return the list stored in *booking_file*, or ``[]`` if unusable.

    An unreadable (invalid JSON) or non-list file is moved aside to a
    ``bookings.corrupt-<timestamp>.json`` sibling so the next save does not
    silently destroy it.
    """
    if not booking_file.exists():
        return []

    try:
        with open(booking_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except json.JSONDecodeError:
        data = None

    if isinstance(data, list):
        return data

    backup = booking_file.with_name(f"bookings.corrupt-{int(time.time())}.json")
    booking_file.replace(backup)
    logger.warning(f"預約檔案內容無效,已備份至 {backup}")
    return []


def save_booking_to_file(user_id, booking_data):
    """Persist a booking to ``bookings/bookings.json`` and sync it to Calendar.

    Args:
        user_id: LINE user id of the person booking.
        booking_data: Mapping with ``date``, ``time``, ``people`` and ``room``.

    Returns:
        ``(booking_id, calendar_link)``; ``calendar_link`` is ``None`` when
        no Calendar event was created.

    Raises:
        OSError: If the bookings directory or file cannot be written.
        KeyError: If *booking_data* lacks one of the required fields.
    """
    # NOTE(review): concurrent calls can still race on read-modify-write of
    # the JSON file; no locking is done here.
    bookings_dir = pathlib.Path("bookings")
    bookings_dir.mkdir(parents=True, exist_ok=True)
    booking_file = bookings_dir / "bookings.json"

    bookings = _load_bookings(booking_file)

    # The timestamp has one-second resolution, so disambiguate when an id
    # generated in the same second already exists.
    taken = {b.get("booking_id") for b in bookings if isinstance(b, dict)}
    base_id = f"BK{int(time.time())}"
    booking_id = base_id
    suffix = 1
    while booking_id in taken:
        suffix += 1
        booking_id = f"{base_id}-{suffix}"

    booking_record = {
        "booking_id": booking_id,
        "user_id": user_id,
        "date": booking_data["date"],
        "time": booking_data["time"],
        "people": booking_data["people"],
        "room": booking_data["room"],
        "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "status": "pending",
        "calendar_link": None,
    }

    calendar_link = create_calendar_event(booking_data, booking_id)
    if calendar_link:
        booking_record["calendar_link"] = calendar_link
        logger.info(f"預約已同步至 Google Calendar: {booking_id}")

    bookings.append(booking_record)

    # Write to a sibling file and swap it in, so a crash mid-write cannot
    # leave a truncated bookings.json behind.
    tmp_file = booking_file.with_name(booking_file.name + ".tmp")
    with open(tmp_file, 'w', encoding='utf-8') as f:
        json.dump(bookings, f, ensure_ascii=False, indent=2)
    os.replace(tmp_file, booking_file)

    logger.info(f"預約已儲存: {booking_id}")
    return booking_id, calendar_link
|
|
def handle_booking_flow(user_id, user_message):
    """Advance the booking conversation for one incoming message.

    The conversation walks through date -> time -> head count -> room ->
    confirmation.  A conversation untouched for more than 600 seconds is
    reset before the message is processed, and a cancel word resets it at
    any step.

    Args:
        user_id: LINE user id owning the conversation.
        user_message: The (already stripped) text the user sent.

    Returns:
        The reply text to send, or ``None`` when the message is not part of
        a booking conversation and should be handled elsewhere.
    """
    session = get_user_booking(user_id)

    # Expire conversations that have been idle for more than ten minutes.
    if time.time() - session["last_update"] > 600:
        reset_booking(user_id)
        session = get_user_booking(user_id)

    state = session["state"]
    session["last_update"] = time.time()
    divider = '=' * 25

    if user_message in ("取消", "取消預約", "重來", "重新開始"):
        reset_booking(user_id)
        return "✅ 已取消預約流程。\n\n如需重新預約,請輸入「預約」。"

    if state == BookingState.IDLE:
        if not is_booking_keyword(user_message):
            return None
        session["state"] = BookingState.ASKING_DATE
        return (
            "🎹 歡迎使用琴房預約系統!\n\n"
            "📅 請問您想要預約哪一天呢?\n"
            "例如:明天、12/26、2025/12/30\n\n"
            "💡 隨時輸入「取消」可取消預約"
        )

    if state == BookingState.ASKING_DATE:
        chosen_date, problem = parse_date(user_message)
        if problem:
            return problem
        session["date"] = chosen_date
        session["state"] = BookingState.ASKING_TIME
        return (
            f"✅ 預約日期:{chosen_date}\n\n"
            "⏰ 請問您想要預約哪個時段呢?\n"
            "例如:09:00-11:00、下午2點到4點"
        )

    if state == BookingState.ASKING_TIME:
        accepted, problem = validate_time(user_message)
        if not accepted:
            return problem
        # The raw text is stored; it is interpreted later when needed.
        session["time"] = user_message
        session["state"] = BookingState.ASKING_PEOPLE
        return (
            f"✅ 預約時段:{user_message}\n\n"
            "👥 請問有幾位使用呢?\n"
            "例如:1人、2人"
        )

    if state == BookingState.ASKING_PEOPLE:
        head_count, problem = parse_people(user_message)
        if problem:
            return problem
        session["people"] = head_count
        session["state"] = BookingState.ASKING_ROOM
        return (
            f"✅ 使用人數:{head_count}人\n\n"
            "🎹 請問您想使用哪個琴房呢?\n"
            f"可選擇:{', '.join(AVAILABLE_ROOMS)}"
        )

    if state == BookingState.ASKING_ROOM:
        chosen_room, problem = validate_room(user_message)
        if problem:
            return problem

        is_free, conflict_msg = check_time_slot_available(
            session["date"], session["time"], chosen_room
        )
        if not is_free:
            # Stay in this state so the user can pick another room.
            return f"⚠️ {conflict_msg}\n\n請重新選擇琴房或輸入「取消」重新預約。"

        session["room"] = chosen_room
        session["state"] = BookingState.CONFIRMING
        return (
            "📋 請確認您的預約資訊:\n"
            f"{divider}\n"
            f"📅 日期:{session['date']}\n"
            f"⏰ 時段:{session['time']}\n"
            f"👥 人數:{session['people']}人\n"
            f"🎹 琴房:{session['room']}\n"
            f"{divider}\n\n"
            "✅ 請輸入「確認」送出預約\n"
            "❌ 輸入「取消」重新填寫"
        )

    if state == BookingState.CONFIRMING:
        if user_message not in ("確認", "確定", "送出", "ok", "OK"):
            return "請輸入「確認」來完成預約,或輸入「取消」重新開始。"

        try:
            booking_id, calendar_link = save_booking_to_file(user_id, session)

            receipt = (
                "🎉 預約成功!\n"
                f"{divider}\n"
                f"📅 日期:{session['date']}\n"
                f"⏰ 時段:{session['time']}\n"
                f"👥 人數:{session['people']}人\n"
                f"🎹 琴房:{session['room']}\n"
                f"📝 預約編號:{booking_id}\n"
            )
            if calendar_link:
                receipt += "📆 已加入 Google Calendar\n"
            receipt += (
                f"{divider}\n\n"
                "✅ 我們已收到您的預約!\n"
                "服務專員將盡快與您聯繫確認。"
            )

            reset_booking(user_id)
            return receipt
        except Exception as e:
            # Saving failed: drop the conversation and ask the user to retry.
            logger.error(f"儲存預約失敗: {str(e)}")
            reset_booking(user_id)
            return "❌ 預約儲存失敗,請稍後再試。"

    # Unknown state value: let the caller treat the message as non-booking.
    return None
|
|
| |
# The ASGI application served by uvicorn.
app = FastAPI()

# NOTE(review): wildcard origins/methods/headers are combined with
# allow_credentials=True here; confirm this permissive policy is intended.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
|
|
@app.get("/")
def root():
    """Report service metadata and which optional features are active."""
    features = {
        "booking": True,
        # True only when the Calendar client was built at start-up.
        "google_calendar": calendar_service is not None,
        "ai_qa": ai_enabled,
        # True when at least some PDF text was extracted.
        "pdf_loaded": bool(pdf_content),
    }
    return {
        "title": "Line Bot - 琴房預約系統",
        "status": "running",
        "version": "3.0",
        "features": features,
    }
|
|
@app.get("/bookings")
def get_bookings():
    """Return every stored booking together with the total count.

    A missing, unreadable (invalid JSON) or non-list bookings file yields an
    empty result instead of an internal server error.

    Returns:
        ``{"total": <int>, "bookings": <list>}``.
    """
    # NOTE(review): this endpoint has no authentication and the records
    # include LINE user ids; confirm it is not exposed publicly.
    bookings_file = pathlib.Path("bookings/bookings.json")
    try:
        with open(bookings_file, 'r', encoding='utf-8') as f:
            bookings = json.load(f)
    except FileNotFoundError:
        bookings = []
    except json.JSONDecodeError as e:
        logger.error(f"讀取預約檔案失敗: {str(e)}")
        bookings = []

    if not isinstance(bookings, list):
        bookings = []

    return {"total": len(bookings), "bookings": bookings}
|
|
@app.post("/webhook")
async def webhook(
    request: Request,
    background_tasks: BackgroundTasks,
    x_line_signature=Header(None),
):
    """Receive a LINE webhook call and process it in the background.

    The signature is verified before the work is scheduled.  Scheduling
    with ``add_task`` does not run the handler, so a signature error raised
    inside the handler could never be turned into an HTTP 400 here.

    Returns:
        The string ``"ok"`` once the event has been queued.

    Raises:
        HTTPException: 400 when the body is not valid UTF-8 or the
            ``X-Line-Signature`` header is missing or does not match.
    """
    raw_body = await request.body()
    try:
        body_text = raw_body.decode("utf-8")
    except UnicodeDecodeError:
        raise HTTPException(status_code=400, detail="Invalid request body")

    # Same validator the handler uses internally when parsing the payload.
    validator = line_handler.parser.signature_validator
    if not x_line_signature or not validator.validate(body_text, x_line_signature):
        raise HTTPException(status_code=400, detail="Invalid signature")

    background_tasks.add_task(line_handler.handle, body_text, x_line_signature)
    return "ok"
|
|
# Cap applied to outgoing text, measured in UTF-16 code units.
# NOTE(review): based on LINE's documented 5000-character limit for text
# messages - confirm against the current Messaging API reference.
_LINE_TEXT_MAX_UNITS = 5000


def _fit_line_text(text):
    """Trim *text* so it does not exceed the LINE text-message size cap."""
    units = text.encode("utf-16-le", "surrogatepass")
    if len(units) <= 2 * _LINE_TEXT_MAX_UNITS:
        return text
    # Decoding with "ignore" drops a surrogate pair cut in half by the slice.
    return units[:2 * _LINE_TEXT_MAX_UNITS].decode("utf-16-le", "ignore")


def _reply_text(event, text):
    """Send *text* as the reply to *event*."""
    line_bot_api.reply_message(
        event.reply_token,
        TextSendMessage(text=_fit_line_text(text))
    )


def _generate_ai_reply(user_message):
    """Answer *user_message* with Gemini, using the PDF text as context."""
    if not ai_enabled:
        return "AI 問答功能未啟用。如需預約請輸入「預約」。"

    try:
        if pdf_content:
            full_prompt = (
                "參考以下資料回答:\n\n"
                f"{pdf_content}\n\n"
                f"問題:{user_message}\n\n"
                "請簡潔回答。"
            )
        else:
            full_prompt = user_message

        response = client.models.generate_content(
            model="gemini-1.5-flash",
            contents=full_prompt
        )
        if response and response.text:
            return response.text
        return "抱歉,我無法回答這個問題。"
    except Exception as e:
        # Any model/API failure becomes a generic "busy" reply.
        logger.error(f"AI 錯誤: {str(e)}")
        return "系統忙碌中,請稍後再試。"


@line_handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
    """Handle one incoming LINE text message.

    Order of handling: the farewell word, the per-user cooldown, the
    booking conversation, and finally the AI question-answering fallback.
    Exactly one reply is sent per message.
    """
    user_message = event.message.text.strip()
    user_id = event.source.user_id

    if user_message == "再見":
        _reply_text(event, "👋 再見!期待下次為您服務!")
        return

    # Per-user cooldown: reject messages arriving too soon after the last
    # accepted one (rejected messages do not restart the timer).
    current_time = time.time()
    if user_id in last_request_time:
        elapsed = current_time - last_request_time[user_id]
        if elapsed < REQUEST_COOLDOWN:
            _reply_text(event, f"⏰ 請稍等 {REQUEST_COOLDOWN - int(elapsed)} 秒")
            return

    last_request_time[user_id] = current_time

    # The booking flow answers first; None means "not a booking message".
    booking_response = handle_booking_flow(user_id, user_message)
    if booking_response:
        _reply_text(event, booking_response)
        return

    _reply_text(event, _generate_ai_reply(user_message))
|
|
if __name__ == "__main__":
    import uvicorn

    # Start-up banner summarising which optional integrations are active.
    banner = "=" * 50
    calendar_state = '已啟用' if calendar_service else '未啟用'
    ai_state = '已啟用' if ai_enabled else '未啟用'
    for line in (
        banner,
        "啟動琴房預約系統 with Google Calendar",
        f"Google Calendar: {calendar_state}",
        f"AI 問答: {ai_state}",
        banner,
    ):
        logger.info(line)

    # "main:app" is the import string uvicorn uses to locate the app.
    uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)