"""LINE bot for piano-room booking with optional Google Calendar and Gemini Q&A."""

import glob
import json
import logging
import os
import pathlib
import re
import time
from datetime import datetime, timedelta

import PyPDF2
import google.generativeai as genai
from fastapi import BackgroundTasks, FastAPI, Header, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import MessageEvent, TextMessage, TextSendMessage

# ============== Logging ==============
# Configure logging BEFORE anything logs.  A module-level logging.warning()
# call on an unconfigured root logger installs a default WARNING-level
# handler, after which basicConfig(level=INFO) silently does nothing.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The Google Calendar client libraries are optional; booking still works
# without them, only the calendar integration is disabled.
try:
    from google.oauth2 import service_account
    from googleapiclient.discovery import build
    from googleapiclient.errors import HttpError
    CALENDAR_AVAILABLE = True
except ImportError:
    CALENDAR_AVAILABLE = False
    logging.warning("Google Calendar 套件未安裝")

# ============== Google Calendar ==============
SCOPES = ['https://www.googleapis.com/auth/calendar']
calendar_service = None

if CALENDAR_AVAILABLE:
    def get_calendar_service():
        """Build a Calendar v3 client from GOOGLE_SERVICE_ACCOUNT_JSON.

        Returns:
            The service object, or None when the environment variable is
            missing, is not valid JSON, or the client cannot be built.
        """
        raw_credentials = os.getenv('GOOGLE_SERVICE_ACCOUNT_JSON')
        if not raw_credentials:
            logger.warning("⚠️ 未設定 GOOGLE_SERVICE_ACCOUNT_JSON")
            return None
        try:
            service_account_info = json.loads(raw_credentials)
            credentials = service_account.Credentials.from_service_account_info(
                service_account_info, scopes=SCOPES)
            service = build('calendar', 'v3', credentials=credentials)
        except json.JSONDecodeError as e:
            logger.error(f"❌ JSON 格式錯誤: {str(e)}")
            return None
        except Exception as e:
            # Startup boundary: never let a calendar problem stop the bot.
            logger.error(f"❌ 建立 Calendar 服務失敗: {str(e)}")
            return None
        logger.info("✅ Google Calendar 服務已建立")
        return service

    calendar_service = get_calendar_service()

CALENDAR_ID = os.getenv('GOOGLE_CALENDAR_ID', 'primary')


def create_calendar_event(booking_data, booking_id):
    """Insert a calendar event for a booking.

    Args:
        booking_data: Mapping with 'date' (YYYY-MM-DD), 'room', 'people' and
            'time'; 'start_time' / 'end_time' (HH:MM) are optional.
        booking_id: Identifier written into the event description.

    Returns:
        The event's htmlLink, or None when the calendar service is not
        available or the insert fails (the failure is logged, not raised).
    """
    if not calendar_service:
        return None

    try:
        date_str = booking_data['date']
        # `or` (not a .get default) so that a key present with value None
        # still falls back instead of producing "…TNone:00".
        start_time = booking_data.get('start_time') or '09:00'
        end_time = booking_data.get('end_time') or '10:00'

        event = {
            'summary': f'🎹 琴房預約 - {booking_data["room"]}',
            'description': (
                f'預約編號:{booking_id}\n'
                f'琴房:{booking_data["room"]}\n'
                f'人數:{booking_data["people"]}人\n'
                f'時段:{booking_data["time"]}\n'
                f'狀態:待確認'
            ),
            'start': {
                'dateTime': f"{date_str}T{start_time}:00",
                'timeZone': 'Asia/Taipei',
            },
            'end': {
                'dateTime': f"{date_str}T{end_time}:00",
                'timeZone': 'Asia/Taipei',
            },
            'colorId': '9',
            'reminders': {
                'useDefault': False,
                'overrides': [
                    {'method': 'popup', 'minutes': 60},
                    {'method': 'popup', 'minutes': 10},
                ],
            },
        }

        created_event = calendar_service.events().insert(
            calendarId=CALENDAR_ID,
            body=event,
        ).execute()
        logger.info(f"✅ Google Calendar 事件已建立: {created_event.get('id')}")
        return created_event.get('htmlLink')
    except Exception as e:
        # Best effort: a calendar failure must not fail the booking itself.
        logger.error(f"❌ 建立 Calendar 事件失敗: {str(e)}")
        return None


# ============== Gemini AI ==============
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
ai_enabled = False

if GOOGLE_API_KEY:
    try:
        genai.configure(api_key=GOOGLE_API_KEY)
        ai_enabled = True
        logger.info("✅ Gemini AI 已啟用")
    except Exception as e:
        logger.error(f"❌ Gemini AI 初始化失敗: {str(e)}")
else:
    logger.warning("⚠️ 未設定 GOOGLE_API_KEY")

# ============== PDF reference material ==============
files = glob.glob('docs/*.pdf')
pdf_content = ''

if files:
    logger.info(f"找到 {len(files)} 個 PDF 檔案")
    for filename in files:
        try:
            with open(filename, 'rb') as pdf_file:
                pdf_reader = PyPDF2.PdfReader(pdf_file)
                for page in pdf_reader.pages:
                    # Guard against a page yielding no text (None/empty).
                    pdf_content += page.extract_text() or ''
            logger.info(f"✅ 成功讀取: {filename}")
        except Exception as e:
            logger.error(f"❌ 讀取 {filename} 失敗: {str(e)}")

# Cap the reference text that is later embedded into the AI prompt.
MAX_CONTENT_LENGTH = 15000
if len(pdf_content) > MAX_CONTENT_LENGTH:
    pdf_content = pdf_content[:MAX_CONTENT_LENGTH]

# ============== LINE Bot ==============
line_bot_api = LineBotApi(os.getenv("CHANNEL_ACCESS_TOKEN"))
line_handler = WebhookHandler(os.getenv("CHANNEL_SECRET"))
# ============== Booking system ==============
# In-memory conversation state per user id; lost on restart.
user_booking_state = {}
# Timestamp (time.time()) of each user's last accepted message.
last_request_time = {}
REQUEST_COOLDOWN = 2


class BookingState:
    """Names of the steps in the booking conversation."""

    IDLE = "idle"
    ASKING_DATE = "asking_date"
    ASKING_TIME = "asking_time"
    ASKING_PEOPLE = "asking_people"
    ASKING_ROOM = "asking_room"
    CONFIRMING = "confirming"


AVAILABLE_ROOMS = ["A琴房", "B琴房", "C琴房", "D琴房", "任意"]


def init_user_booking(user_id):
    """Create (or overwrite) a blank booking record for *user_id*."""
    user_booking_state[user_id] = {
        "state": BookingState.IDLE,
        "date": None,
        "time": None,
        "start_time": None,
        "end_time": None,
        "people": None,
        "room": None,
        "last_update": time.time(),
    }


def get_user_booking(user_id):
    """Return the user's booking record, creating a blank one if needed."""
    if user_id not in user_booking_state:
        init_user_booking(user_id)
    return user_booking_state[user_id]


def reset_booking(user_id):
    """Discard the user's progress and return them to the idle state."""
    init_user_booking(user_id)


def is_booking_keyword(text):
    """Return True when *text* contains any phrase that starts a booking."""
    keywords = ["預約", "預定", "訂位", "訂房", "訂琴房", "借琴房", "租琴房", "我要預約", "想預約"]
    return any(keyword in text for keyword in keywords)


# Explicit dates: "YYYY-MM-DD" / "YYYY/MM/DD", then "MM-DD" / "MM/DD".
_DATE_PATTERNS = (
    re.compile(r'(?P<year>\d{4})[/-](?P<month>\d{1,2})[/-](?P<day>\d{1,2})'),
    re.compile(r'(?P<month>\d{1,2})[/-](?P<day>\d{1,2})'),
)


def parse_date(date_str):
    """Parse a user-supplied booking date.

    Accepts the relative words 今天/今日, 明天/明日, 後天 and explicit dates
    written with '-' or '/' (year optional; the current year is assumed).

    Returns:
        (date, None) with date formatted YYYY-MM-DD on success, or
        (None, error_message) when the text is unparseable or in the past.
    """
    today = datetime.now()

    relative_days = (("今天", 0), ("今日", 0), ("明天", 1), ("明日", 1), ("後天", 2))
    for word, offset in relative_days:
        if word in date_str:
            return (today + timedelta(days=offset)).strftime("%Y-%m-%d"), None

    for pattern in _DATE_PATTERNS:
        match = pattern.search(date_str)
        if not match:
            continue
        fields = match.groupdict()
        try:
            # Build from the captured numbers so both separators work; the
            # previous strptime() call only understood '-'.
            date = datetime(
                int(fields.get('year') or today.year),
                int(fields['month']),
                int(fields['day']),
            )
        except ValueError:
            # An impossible date (e.g. month 13) is a format error.  Do not
            # fall through to a looser pattern that would match a fragment.
            break
        if date.date() < today.date():
            return None, "❌ 日期不能是過去的時間"
        return date.strftime("%Y-%m-%d"), None

    return None, "❌ 日期格式不正確\n例如:明天、2025-12-26"


# Full-width punctuation that IMEs commonly produce is mapped to ASCII and
# ordinary / ideographic spaces are removed before matching.
_TIME_NORMALIZE = str.maketrans({
    "\uff1a": ":",
    "\uff5e": "~",
    "\uff0d": "-",
    "\u3000": None,
    " ": None,
})

# Tried in order; the first pattern that matches wins.
_TIME_PATTERNS = (
    # 09:00-11:00, 9:00-11:00
    ('hh:mm-hh:mm', re.compile(r'(\d{1,2}):(\d{2})[-~到至](\d{1,2}):(\d{2})')),
    # 0900-1100
    ('hhmm-hhmm', re.compile(r'(\d{4})[-~到至](\d{4})')),
    # 9點到11點, 下午2點到4點, 9點半到11點
    ('中文', re.compile(
        r'(上午|下午)?(\d{1,2})點(半)?[-~到至](上午|下午)?(\d{1,2})點(半)?')),
    # 9:00到11
    ('h:mm-h', re.compile(r'(\d{1,2}):(\d{2})[-~到至](\d{1,2})')),
)


def _time_error(message):
    """Build the failure tuple; the message fills both error slots."""
    return None, None, None, message, message


def _match_time_range(text):
    """Return (start_h, start_m, end_h, end_m) from *text*, or None."""
    for kind, pattern in _TIME_PATTERNS:
        match = pattern.search(text)
        if not match:
            continue
        groups = match.groups()
        if kind == 'hh:mm-hh:mm':
            return int(groups[0]), int(groups[1]), int(groups[2]), int(groups[3])
        if kind == 'hhmm-hhmm':
            start, end = groups
            return int(start[:2]), int(start[2:]), int(end[:2]), int(end[2:])
        if kind == 'h:mm-h':
            return int(groups[0]), int(groups[1]), int(groups[2]), 0

        # Chinese form: AM/PM and "half" are read per side from the match
        # itself, so any of the separators works and a 下午 on one side no
        # longer shifts the other side.
        start_period, start_hour, start_half, end_period, end_hour, end_half = groups
        start_hour = int(start_hour)
        end_hour = int(end_hour)
        # "下午2點到4點": an unmarked end inherits the start's period.
        end_period = end_period or start_period
        if start_period == '下午' and start_hour < 12:
            start_hour += 12
        if end_period == '下午' and end_hour < 12:
            end_hour += 12
        return start_hour, 30 if start_half else 0, end_hour, 30 if end_half else 0
    return None


def parse_time_range(time_str):
    """Parse and validate a time range, returning it in canonical form.

    Returns:
        A 5-tuple (start_time, end_time, display_time, error, err_msg).
        On success the first three are "HH:MM", "HH:MM" and "HH:MM-HH:MM"
        and both error slots are None.  On failure the first three are None
        and BOTH error slots carry the user-facing message, so callers that
        test either slot detect the failure.
    """
    parsed = _match_time_range(time_str.translate(_TIME_NORMALIZE))
    if parsed is None:
        return _time_error(
            "❌ 時段格式不正確\n請使用以下格式:\n• 09:00-11:00\n• 0900-1100\n• 9點到11點\n• 下午2點到4點"
        )

    start_hour, start_minute, end_hour, end_minute = parsed
    if not (0 <= start_hour <= 23 and 0 <= end_hour <= 23):
        return _time_error("❌ 小時必須在 0-23 之間")
    if not (0 <= start_minute <= 59 and 0 <= end_minute <= 59):
        return _time_error("❌ 分鐘必須在 0-59 之間")

    # Compare as minutes since midnight.
    start_total = start_hour * 60 + start_minute
    end_total = end_hour * 60 + end_minute
    if start_total >= end_total:
        return _time_error("❌ 結束時間必須晚於開始時間")
    if end_total - start_total > 8 * 60:
        return _time_error("❌ 預約時段不能超過 8 小時")

    start_time = f"{start_hour:02d}:{start_minute:02d}"
    end_time = f"{end_hour:02d}:{end_minute:02d}"
    return start_time, end_time, f"{start_time}-{end_time}", None, None


def validate_time(time_str):
    """Validate a time range (kept for backward compatibility).

    Returns:
        (True, None) when valid, otherwise (False, error_message).
    """
    *_, err_msg = parse_time_range(time_str)
    if err_msg:
        return False, err_msg
    return True, None


def parse_people(people_str):
    """Extract the head count (first number in the text, 1-10 allowed).

    Returns:
        (count, None) on success, otherwise (None, error_message).
    """
    numbers = re.findall(r'\d+', people_str)
    if not numbers:
        return None, "❌ 請輸入有效的人數\n例如:2人、3"
    count = int(numbers[0])
    if 1 <= count <= 10:
        return count, None
    return None, "❌ 人數必須在 1-10 人之間"


def validate_room(room_str):
    """Resolve user input to one of AVAILABLE_ROOMS.

    A room matches when its full name appears in the input, or when the
    input is an unambiguous leading abbreviation of exactly one room
    (e.g. "a" -> "A琴房").  Empty or ambiguous input such as "琴房" is
    rejected instead of silently selecting the first room.

    Returns:
        (room, None) on success, otherwise (None, error_message).
    """
    query = room_str.strip().upper()
    if query:
        for room in AVAILABLE_ROOMS:
            if room.upper() in query:
                return room, None
        candidates = [room for room in AVAILABLE_ROOMS if room.upper().startswith(query)]
        if len(candidates) == 1:
            return candidates[0], None
    return None, f"❌ 請選擇:{', '.join(AVAILABLE_ROOMS)}"


def _load_bookings(booking_file):
    """Return the stored booking list, or [] if missing or not a JSON list."""
    if not booking_file.exists():
        return []
    try:
        with open(booking_file, 'r', encoding='utf-8') as f:
            bookings = json.load(f)
    except ValueError as e:
        # Covers malformed JSON and undecodable bytes; I/O errors propagate.
        logger.warning("⚠️ 預約檔內容無法解析,將重新建立: %s", e)
        return []
    return bookings if isinstance(bookings, list) else []


def save_booking_to_file(user_id, booking_data):
    """Append a booking to bookings/bookings.json and create a calendar event.

    Args:
        user_id: Id of the user who made the booking.
        booking_data: Mapping with 'date', 'time', 'people', 'room' and
            optionally 'start_time' / 'end_time'.

    Returns:
        (booking_id, calendar_link); calendar_link is None when no calendar
        event was created.

    Raises:
        OSError: when the bookings file cannot be read or written.
    """
    bookings_dir = pathlib.Path("bookings")
    bookings_dir.mkdir(exist_ok=True)
    booking_file = bookings_dir / "bookings.json"

    bookings = _load_bookings(booking_file)

    booking_id = f"BK{int(time.time())}"
    calendar_link = create_calendar_event(booking_data, booking_id)

    bookings.append({
        "booking_id": booking_id,
        "user_id": user_id,
        "date": booking_data["date"],
        "time": booking_data["time"],
        "start_time": booking_data.get("start_time"),
        "end_time": booking_data.get("end_time"),
        "people": booking_data["people"],
        "room": booking_data["room"],
        "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "status": "pending",
        "calendar_link": calendar_link or None,
    })

    # Write to a sibling temp file and swap it in, so a crash mid-write
    # cannot leave a truncated bookings.json behind.
    temp_file = booking_file.with_name(booking_file.name + ".tmp")
    with open(temp_file, 'w', encoding='utf-8') as f:
        json.dump(bookings, f, ensure_ascii=False, indent=2)
    temp_file.replace(booking_file)

    logger.info(f"✅ 預約已儲存: {booking_id}")
    return booking_id, calendar_link
def handle_booking_flow(user_id, user_message):
    """Advance the booking conversation for one incoming message.

    Args:
        user_id: Id of the sender; keys the per-user booking state.
        user_message: The (already stripped) text the user sent.

    Returns:
        The reply text, or None when the user is idle and the message is
        not a booking request (the caller then handles it some other way).
    """
    booking = get_user_booking(user_id)

    # A conversation left untouched for more than 10 minutes starts over.
    if time.time() - booking["last_update"] > 600:
        reset_booking(user_id)
        booking = get_user_booking(user_id)

    current_state = booking["state"]
    booking["last_update"] = time.time()

    if user_message in ["取消", "取消預約", "重來"]:
        reset_booking(user_id)
        return "✅ 已取消預約\n輸入「預約」可重新開始"

    if current_state == BookingState.IDLE:
        if is_booking_keyword(user_message):
            booking["state"] = BookingState.ASKING_DATE
            return "🎹 琴房預約系統\n\n📅 請問預約日期?\n例如:明天、2025-12-26"
        return None

    if current_state == BookingState.ASKING_DATE:
        parsed_date, error = parse_date(user_message)
        if error:
            return error
        booking["date"] = parsed_date
        booking["state"] = BookingState.ASKING_TIME
        return f"✅ 日期:{parsed_date}\n\n⏰ 請問時段?\n例如:09:00-11:00"

    if current_state == BookingState.ASKING_TIME:
        start_time, end_time, display_time, error, err_msg = parse_time_range(user_message)
        # The failure message may arrive in either error slot; checking only
        # the fourth one let invalid times through as "None".
        if error or err_msg:
            return err_msg or error
        booking["time"] = display_time
        booking["start_time"] = start_time
        booking["end_time"] = end_time
        booking["state"] = BookingState.ASKING_PEOPLE
        return f"✅ 時段:{display_time}\n\n👥 請問人數?\n例如:2人"

    if current_state == BookingState.ASKING_PEOPLE:
        people_count, error = parse_people(user_message)
        if error:
            return error
        booking["people"] = people_count
        booking["state"] = BookingState.ASKING_ROOM
        return f"✅ 人數:{people_count}人\n\n🎹 請選擇琴房:\n{', '.join(AVAILABLE_ROOMS)}"

    if current_state == BookingState.ASKING_ROOM:
        room, error = validate_room(user_message)
        if error:
            return error
        booking["room"] = room
        booking["state"] = BookingState.CONFIRMING
        return (
            f"📋 確認預約資訊:\n"
            f"{'='*20}\n"
            f"📅 {booking['date']}\n"
            f"⏰ {booking['time']}\n"
            f"👥 {booking['people']}人\n"
            f"🎹 {booking['room']}\n"
            f"{'='*20}\n\n"
            f"輸入「確認」送出"
        )

    if current_state == BookingState.CONFIRMING:
        # Case-insensitive so "Ok" / "oK" are accepted like "ok" / "OK".
        if user_message.lower() not in ("確認", "確定", "ok"):
            return "請輸入「確認」完成預約"
        try:
            booking_id, calendar_link = save_booking_to_file(user_id, booking)
        except Exception as e:
            # Conversation boundary: report failure to the user, keep serving.
            logger.error(f"❌ 儲存失敗: {str(e)}")
            reset_booking(user_id)
            return "❌ 預約失敗,請稍後再試"

        result = (
            f"🎉 預約成功!\n"
            f"{'='*20}\n"
            f"📅 {booking['date']}\n"
            f"⏰ {booking['time']}\n"
            f"👥 {booking['people']}人\n"
            f"🎹 {booking['room']}\n"
            f"📝 {booking_id}\n"
        )
        if calendar_link:
            result += f"📆 已加入 Google Calendar\n"
        result += f"{'='*20}\n✅ 已收到預約!"
        reset_booking(user_id)
        return result

    return None


# ============== FastAPI application ==============
app = FastAPI()

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; confirm whether any browser client actually needs credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

_BOOKINGS_JSON = pathlib.Path("bookings/bookings.json")


def _read_bookings():
    """Return the stored booking list ([] when the file does not exist).

    Raises:
        OSError: when the file cannot be read.
        ValueError: when the file is not valid JSON or is not a JSON list.
    """
    if not _BOOKINGS_JSON.exists():
        return []
    with open(_BOOKINGS_JSON, 'r', encoding='utf-8') as f:
        data = json.load(f)
    if not isinstance(data, list):
        raise ValueError("bookings file does not contain a list")
    return data


@app.get("/", response_class=JSONResponse)
def root():
    """Report service status and which optional features are active."""
    return {
        "title": "琴房預約系統",
        "status": "running",
        "version": "3.1",
        "features": {
            "booking": True,
            "google_calendar": calendar_service is not None,
            "ai_qa": ai_enabled,
            "pdf_loaded": len(pdf_content) > 0,
        },
        "endpoints": [
            "/admin - 管理介面",
            "/bookings - 查看預約",
            "/bookings/latest - 最新預約",
        ],
    }


@app.get("/bookings", response_class=JSONResponse)
def get_bookings():
    """Return every stored booking together with the total count."""
    try:
        bookings = _read_bookings()
    except (OSError, ValueError) as e:
        logger.error("❌ 讀取預約檔失敗: %s", e)
        return {"total": 0, "bookings": [], "error": "讀取失敗"}
    return {"total": len(bookings), "bookings": bookings}


@app.get("/bookings/latest", response_class=JSONResponse)
def get_latest_booking():
    """Return the most recently stored booking, or a placeholder message."""
    try:
        bookings = _read_bookings()
    except (OSError, ValueError) as e:
        logger.error("❌ 讀取預約檔失敗: %s", e)
        bookings = []
    if bookings:
        return bookings[-1]
    return {"message": "無預約資料"}

@app.get("/admin", response_class=HTMLResponse)
def admin_panel():
    """Render the admin page: summary counters and a table of bookings.

    NOTE(review): the page's original markup/styling was not recoverable
    from this copy of the file; this is a minimal HTML reconstruction that
    keeps every label and value of the original page.
    """
    # Local import: keeps this fix self-contained without touching the
    # file-level import block.
    from html import escape

    bookings_file = pathlib.Path("bookings/bookings.json")
    bookings = []
    if bookings_file.exists():
        try:
            with open(bookings_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            # Show an empty page rather than failing the request.
            logger.error("❌ 讀取預約檔失敗: %s", e)
        else:
            if isinstance(loaded, list):
                bookings = [b for b in loaded if isinstance(b, dict)]

    def cell(value):
        """HTML-escape a stored value; missing values render as empty."""
        return escape('' if value is None else str(value))

    pending_count = sum(1 for b in bookings if b.get('status') == 'pending')

    parts = [
        '<!DOCTYPE html>',
        '<html lang="zh-Hant">',
        '<head><meta charset="utf-8"><title>琴房預約管理</title></head>',
        '<body>',
        '<h1>🎹 琴房預約管理</h1>',
        '<div class="stats">',
        f'<div class="stat"><div>總預約數</div><div>{len(bookings)}</div></div>',
        f'<div class="stat"><div>待確認</div><div>{pending_count}</div></div>',
        '<div class="stat"><div>Google Calendar</div>'
        f"<div>{'✓' if calendar_service else '✗'}</div></div>",
        '</div>',
        '<table>',
        '<tr><th>編號</th><th>日期</th><th>時段</th><th>人數</th>'
        '<th>琴房</th><th>建立時間</th><th>日曆</th></tr>',
    ]

    # Newest bookings first.
    for booking in reversed(bookings):
        calendar_link = booking.get('calendar_link')
        if calendar_link:
            calendar_cell = (
                f'<a href="{escape(str(calendar_link), quote=True)}" '
                f'target="_blank" rel="noopener">📆</a>'
            )
        else:
            calendar_cell = '❌'
        parts.append(
            '<tr>'
            f"<td>{cell(booking.get('booking_id'))}</td>"
            f"<td>{cell(booking.get('date'))}</td>"
            f"<td>{cell(booking.get('time'))}</td>"
            f"<td>{cell(booking.get('people'))}人</td>"
            f"<td>{cell(booking.get('room'))}</td>"
            f"<td>{cell(booking.get('created_at'))}</td>"
            f"<td>{calendar_cell}</td>"
            '</tr>'
        )

    parts.extend(['</table>', '</body>', '</html>'])
    return "\n".join(parts)


@app.post("/webhook")
async def webhook(request: Request, background_tasks: BackgroundTasks, x_line_signature=Header(None)):
    """Receive a LINE webhook call and process its events in the background.

    The signature is verified here, before the work is queued: the handler
    itself runs after the response has been sent, so an
    InvalidSignatureError raised there could never become an HTTP 400.

    Raises:
        HTTPException: 400 when the body is not UTF-8 or the signature is
            missing or invalid.
    """
    raw_body = await request.body()
    try:
        body = raw_body.decode("utf-8")
    except UnicodeDecodeError:
        raise HTTPException(status_code=400, detail="Invalid body")

    validator = line_handler.parser.signature_validator
    if not x_line_signature or not validator.validate(body, x_line_signature):
        raise HTTPException(status_code=400, detail="Invalid signature")

    background_tasks.add_task(line_handler.handle, body, x_line_signature)
    return "ok"


@line_handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
    """Reply to one text message: farewell, booking flow, or AI answer."""
    user_message = event.message.text.strip()
    user_id = event.source.user_id

    if user_message == "再見":
        line_bot_api.reply_message(event.reply_token, TextSendMessage(text="👋 再見!"))
        return

    # Rate limit: a message arriving within REQUEST_COOLDOWN seconds of the
    # user's previous accepted message is dropped without a reply.
    current_time = time.time()
    previous_time = last_request_time.get(user_id)
    if previous_time is not None and current_time - previous_time < REQUEST_COOLDOWN:
        return
    last_request_time[user_id] = current_time

    # The booking conversation takes priority over free-form questions.
    booking_response = handle_booking_flow(user_id, user_message)
    if booking_response:
        line_bot_api.reply_message(event.reply_token, TextSendMessage(text=booking_response))
        return

    if ai_enabled:
        try:
            if pdf_content:
                prompt = f"參考資料:{pdf_content}\n\n問題:{user_message}\n\n簡潔回答。"
            else:
                prompt = user_message
            model = genai.GenerativeModel('gemini-2.5-flash')
            response = model.generate_content(prompt)
            out = response.text if response and response.text else "無法回答"
        except Exception as e:
            # Any AI failure degrades to a polite "busy" reply.
            logger.error(f"AI錯誤: {str(e)}")
            out = "系統忙碌中"
    else:
        out = "請輸入「預約」開始預約琴房"

    line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))


if __name__ == "__main__":
    import uvicorn

    logger.info("="*50)
    logger.info("🎹 琴房預約系統啟動")
    logger.info(f"Google Calendar: {'✅' if calendar_service else '❌'}")
    logger.info(f"AI 問答: {'✅' if ai_enabled else '❌'}")
    logger.info(f"PDF: {'✅' if pdf_content else '❌'}")
    logger.info("="*50)
    uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)