import hmac import base64 import hashlib import os from dotenv import load_dotenv import requests from typing import Optional from fastapi import FastAPI, Request, Header, HTTPException import datetime import json import pytz from google.oauth2 import service_account from googleapiclient.discovery import build from google.auth.transport.requests import Request as GoogleRequest from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow import google.generativeai as genai from vertexai.generative_models import GenerativeModel # .env 파일에서 환경 변수를 불러옵니다. load_dotenv() app = FastAPI() # ⚠️ Chatwork 웹훅 설정 페이지에서 받은 '웹훅 토큰' CHATWORK_WEBHOOK_TOKEN = os.getenv("CHATWORK_WEBHOOK_TOKEN") # ⚠️ 1단계에서 발급받은 'API 토큰' CHATWORK_API_TOKEN = os.getenv("CHATWORK_API_TOKEN") base_path = os.path.dirname(__file__) credentials_path = os.path.join(base_path, 'new_credential.json') token_path = os.path.join(base_path, 'new_token.json') if not os.path.exists(credentials_path): print(f"'{credentials_path}' 파일이 없어 새로 생성합니다.") credentials_str = os.getenv('GOOGLE_CREDENTIALS_JSON') if credentials_str: with open(credentials_path, 'w', encoding='utf-8') as f: f.write(credentials_str) else: print("경고: .env 파일에 'GOOGLE_CREDENTIALS_JSON' 변수가 없습니다.") if not os.path.exists(token_path): print(f"'{token_path}' 파일이 없어 새로 생성합니다.") token_str = os.getenv('GOOGLE_TOKEN_JSON') if token_str: with open(token_path, 'w', encoding='utf-8') as f: f.write(token_str) CREDENTIALS_FILENAME = credentials_path TOKEN_FILENAME = token_path # Google Calendar API 스코프 (읽기 전용) SCOPES = ['https://www.googleapis.com/auth/calendar.readonly'] # 기준 시간대 (GAS의 Session.getScriptTimeZone()에 해당) # 필요에 따라 'Asia/Tokyo', 'America/New_York' 등으로 변경하세요. TIMEZONE = 'Asia/Tokyo' rooms = [ {"capacity": 6, "name": "1F2", "email": os.getenv("GOOGLE_CALENDAR_1F2")}, {"capacity": 6, "name": "1F3", "email": os.getenv("GOOGLE_CALENDAR_1F3")}, {"capacity": 12, "name": "3F1", "email": os.getenv("GOOGLE_CALENDAR_3F1")}, {"capacity": 4, "name": "3F2", "email": os.getenv("GOOGLE_CALENDAR_3F2")}, {"capacity": 4, "name": "3F3", "email": os.getenv("GOOGLE_CALENDAR_3F3")}, {"capacity": 4, "name": "3F5", "email": os.getenv("GOOGLE_CALENDAR_3F5")}, {"capacity": 4, "name": "3F6", "email": os.getenv("GOOGLE_CALENDAR_3F6")}, {"capacity": 12, "name": "6F1", "email": os.getenv("GOOGLE_CALENDAR_6F1")}, {"capacity": 4, "name": "6F2", "email": os.getenv("GOOGLE_CALENDAR_6F2")}, {"capacity": 4, "name": "6F3", "email": os.getenv("GOOGLE_CALENDAR_6F3")}, {"capacity": 2, "name": "6F5", "email": os.getenv("GOOGLE_CALENDAR_6F5")}, {"capacity": 2, "name": "6F6", "email": os.getenv("GOOGLE_CALENDAR_6F6")}, {"capacity": 2, "name": "6F7", "email": os.getenv("GOOGLE_CALENDAR_6F7")}, {"capacity": 4, "name": "7F2", "email": os.getenv("GOOGLE_CALENDAR_7F2")}, {"capacity": 4, "name": "7F3", "email": os.getenv("GOOGLE_CALENDAR_7F3")}, {"capacity": 4, "name": "7F5", "email": os.getenv("GOOGLE_CALENDAR_7F5") }, {"capacity": 4, "name": "7F6", "email": os.getenv("GOOGLE_CALENDAR_7F6") } ] # def get_credentials(): # creds = None # if os.path.exists(TOKEN_FILENAME): # creds = Credentials.from_authorized_user_file(TOKEN_FILENAME, SCOPES) # # 크리덴셜이 없거나 유효하지 않으면 사용자 로그인 플로우를 실행합니다. # if not creds or not creds.valid: # if creds and creds.expired and creds.refresh_token: # creds.refresh(GoogleRequest()) # else: # flow = InstalledAppFlow.from_client_secrets_file( # CREDENTIALS_FILENAME, SCOPES) # creds = flow.run_local_server(port=0) # # 다음 실행을 위해 토큰을 저장합니다. # with open(TOKEN_FILENAME, 'w') as token: # token.write(creds.to_json()) # return creds def get_oauth_credentials(): """ OAuth 2.0 사용자 인증을 처리하고 유효한 Credentials 객체를 반환합니다. - token.json이 있으면 사용하고, 없거나 만료되었으면 새로 인증합니다. """ creds = None print(f"log1") # token.json 파일이 이미 존재하면, 저장된 인증 정보를 불러옵니다. if os.path.exists(TOKEN_FILENAME): print(f"log2") creds = Credentials.from_authorized_user_file(TOKEN_FILENAME, SCOPES) print(f"log3") # 인증 정보가 없거나 유효하지 않으면, 사용자에게 로그인을 요청합니다. if not creds or not creds.valid: print(f"log4") if creds and creds.expired and creds.refresh_token: print(f"log5") # 토큰이 만료되었으면, 리프레시 토큰을 사용해 갱신합니다. creds.refresh(GoogleRequest()) else: print(f"log6") # 토큰이 없으면, credentials.json을 이용해 새로운 인증 절차를 시작합니다. flow = InstalledAppFlow.from_client_secrets_file( CREDENTIALS_FILENAME, SCOPES) print(f"log7") # 서버 환경을 위해 로컬 서버 대신 콘솔 기반 인증을 실행합니다. creds = flow.run_console() # 다음 실행을 위해 새로 발급받거나 갱신된 인증 정보를 token.json 파일에 저장합니다. print(f"log8") with open(TOKEN_FILENAME, 'w') as token: print(f"log9") token.write(creds.to_json()) print(f"log10") return creds def check_room_free_slots_over_1h(date_str=None): # 회의실 정보 output_lines = [] # --- 인증 및 서비스 빌드 --- try: creds = get_oauth_credentials() service = build('calendar', 'v3', credentials=creds) except Exception as e: print(f"認証エラー: {e}") return # --- 조회 시간 범위 설정 --- # --- 조회 시간 범위 설정 --- tz = pytz.timezone(TIMEZONE) now = datetime.datetime.now(tz) if date_str: target_date = datetime.datetime.strptime(date_str, "%Y-%m-%d").date() start_time = tz.localize(datetime.datetime.combine(target_date, datetime.time(10, 0))) end_time = tz.localize(datetime.datetime.combine(target_date, datetime.time(19, 0))) output_lines.append(f"照会対象: {start_time.strftime('%Y/%m/%d')}") else: today = now.date() start_hour = max(10, now.hour) start_time = tz.localize(datetime.datetime.combine(today, datetime.time(start_hour, 0))) end_time = tz.localize(datetime.datetime.combine(today, datetime.time(19, 0))) output_lines.append(f"照会対象: {start_time.strftime('%Y/%m/%d')}") if start_time >= end_time: output_lines.append("照会可能な時間がありません") return "\n".join(output_lines) # --- FreeBusy API로 모든 회의실의 바쁜 시간 한 번에 조회 --- body = { "timeMin": start_time.isoformat(), "timeMax": end_time.isoformat(), "timeZone": TIMEZONE, "items": [{"id": room["email"]} for room in rooms] } print(f"freebusy 1") try: freebusy_result = service.freebusy().query(body=body).execute() calendars_busy_info = freebusy_result.get('calendars', {}) print(f"freebusy 2") except Exception as e: print(f"freebusy 3") return f"FreeBusy API 失敗: {e}" output_lines.append("-" * 30) print(f"freebusy 4") # --- 각 회의실별로 빈 시간 계산 및 출력 --- for room in rooms: room_email = room['email'] busy_info = calendars_busy_info.get(room_email, {}) if busy_info.get('errors'): reason = busy_info['errors'][0].get('reason', 'unknown') output_lines.append(f"[{room['name']}] カレンダー照会失敗: {reason}") continue busy_slots = busy_info.get('busy', []) cursor = start_time free_slots = [] for busy in busy_slots: busy_start = datetime.datetime.fromisoformat(busy['start']) busy_end = datetime.datetime.fromisoformat(busy['end']) if busy_start > cursor: free_slots.append({'start': cursor, 'end': busy_start}) if busy_end > cursor: cursor = busy_end if cursor < end_time: free_slots.append({'start': cursor, 'end': end_time}) long_free_slots = [ slot for slot in free_slots if (slot['end'] - slot['start']) >= datetime.timedelta(hours=1) ] if long_free_slots: output_lines.append(f"[{room['name']}] (定員 {room['capacity']}名)") for slot in long_free_slots: output_lines.append(f" {slot['start'].strftime('%H:%M')} ~ {slot['end'].strftime('%H:%M')}") return "\n".join(output_lines) def search_calendar(calendar_data , order: str) -> str: genai.configure(api_key=os.getenv("GEMINI_API_KEY")) model = genai.GenerativeModel("gemini-1.5-pro") message = f"""以下が空き室の空いている時間情報です。\n {calendar_data} このデータを見て""" prompt = "{message}{order}".format(message=message, order=order) token_info = model.count_tokens(prompt) print(f"Prompt Token Count: {token_info.total_tokens}") response = model.generate_content(prompt) return response.text if response.text else "生成失敗" def calendar_main(input_string: str): try: parts = input_string.split() if len(parts) < 3 or parts[0].lower() != '会議室': print("\n ●●●●●●● 会議室じゃないのでRETURN \n") return # 날짜 파싱 및 포맷팅 date_input = parts[1] formatted_date= "" if date_input == "すぐ": formatted_date = "" elif len(date_input) != 8 or not date_input.isdigit(): print("日付のフォーマットが正しくありません YYYYMMDD にしてください。") return else: formatted_date = f"{date_input[:4]}-{date_input[4:6]}-{date_input[6:]}" # 인원수 파싱 capacity_input = parts[2] if not capacity_input.isdigit(): print("人数は数字のみ入力してください。") return min_capacity = int(capacity_input) # 4번째 인자가 있으면 취득 optional_arg = "" if len(parts) > 3: optional_arg = " ".join(parts[3:]) print(f"追加オプション: '{optional_arg}'") except Exception as e: print(f"エラー発生: {e}") return free_rooms = check_room_free_slots_over_1h(formatted_date) print(f"\n ●●●●●●● response {free_rooms} \n") if free_rooms: response = search_calendar(free_rooms, f"{formatted_date}に{min_capacity}名が入れる会議室を教えてください{optional_arg}。回答はシンプルに[会議室名] 定員 空き時間のフォーマットでしてください") return response else: return @app.get("/") async def home(): return "Hello, FastAPI! 이건 루트 경로입니다." @app.post("/") async def handle_chatwork_webhook( request: Request, # 👇 str | None 대신 Optional[str]을 사용하도록 수정합니다. signature: Optional[str] = Header(None, alias="X-ChatWorkWebhookSignature") ): """ Chatwork 웹훅을 수신하고 서명을 검증한 뒤 데이터를 처리합니다. """ if not signature: raise HTTPException(status_code=400, detail="Signature header is missing.") # 요청의 원본 본문(raw body)을 가져옵니다. raw_body = await request.body() # --- 서명 검증 로직 --- # 1. 웹훅 토큰을 base64로 디코딩합니다. secret_key = base64.b64decode(CHATWORK_WEBHOOK_TOKEN) # 2. HMAC-SHA256 해시를 생성합니다. digest = hmac.new(secret_key, raw_body, hashlib.sha256).digest() # 3. 계산된 해시를 base64로 인코딩합니다. computed_signature = base64.b64encode(digest).decode() # 4. 수신한 서명과 계산된 서명이 일치하는지 확인합니다. if not hmac.compare_digest(computed_signature, signature): raise HTTPException(status_code=401, detail="Invalid signature.") # 서명이 유효하면, 데이터를 JSON으로 파싱하여 처리합니다. data = await request.json() # 로그에 수신된 데이터 출력 (서버 로그에서 확인 가능) print("✅ Webhook received and verified:") print(data) # --- 👇 답장 메시지 전송 로직 추가 --- try: # 웹훅 페이로드에서 필요한 정보 추출 event = data.get("webhook_event", {}) room_id = event.get("room_id") from_account_id = event.get("account_id") message_id = event.get("message_id") message_body = event.get("body") calendar_data = "" if message_body: # strip()을 사용하여 앞뒤 공백 및 줄바꿈 문자를 제거하고 출력합니다. cleaned_message = message_body.strip() if cleaned_message[0] != '会': print("会議室の検索コマンドではありません。") print(cleaned_message[0]) return if cleaned_message: calendar_data = calendar_main(cleaned_message) if not calendar_data: return if room_id and from_account_id and message_id: # Chatwork API 엔드포인트 url = f"https://api.chatwork.com/v2/rooms/{room_id}/messages" # API 요청 헤더 headers = {"X-ChatWorkToken": CHATWORK_API_TOKEN} # 보낼 메시지 본문 (수신 확인 답장) # [rp] 태그를 사용하면 특정 메시지에 대한 답장 형식으로 보낼 수 있습니다. reply_body = ( f"[rp aid={from_account_id} to={room_id}-{message_id}]" f"\n✅{calendar_data}" ) payload = {"body": reply_body} # Chatwork API로 메시지 전송 요청 response = requests.post(url, headers=headers, data=payload) response.raise_for_status() # 요청이 실패하면 예외 발생 print(f"✅ Replied to room {room_id} successfully.") except Exception as e: print(f"❌ Failed to send reply message: {e}") # 답장 전송에 실패하더라도 웹훅 수신 자체는 성공했으므로 오류를 반환하지 않을 수 있습니다. # Chatwork에 성공적으로 수신했음을 알림 return {"status": "success"} get_oauth_credentials()