flaskSample / app /app.py
medical-kiban's picture
no message
f1cd3d2
import hmac
import base64
import hashlib
import os
from dotenv import load_dotenv
import requests
from typing import Optional
from fastapi import FastAPI, Request, Header, HTTPException
import datetime
import json
import pytz
from google.oauth2 import service_account
from googleapiclient.discovery import build
from google.auth.transport.requests import Request as GoogleRequest
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
import google.generativeai as genai
from vertexai.generative_models import GenerativeModel
# .env ํŒŒ์ผ์—์„œ ํ™˜๊ฒฝ ๋ณ€์ˆ˜๋ฅผ ๋ถˆ๋Ÿฌ์˜ต๋‹ˆ๋‹ค.
load_dotenv()
app = FastAPI()
# โš ๏ธ Chatwork ์›นํ›… ์„ค์ • ํŽ˜์ด์ง€์—์„œ ๋ฐ›์€ '์›นํ›… ํ† ํฐ'
CHATWORK_WEBHOOK_TOKEN = os.getenv("CHATWORK_WEBHOOK_TOKEN")
# โš ๏ธ 1๋‹จ๊ณ„์—์„œ ๋ฐœ๊ธ‰๋ฐ›์€ 'API ํ† ํฐ'
CHATWORK_API_TOKEN = os.getenv("CHATWORK_API_TOKEN")
base_path = os.path.dirname(__file__)
credentials_path = os.path.join(base_path, 'new_credential.json')
token_path = os.path.join(base_path, 'new_token.json')
if not os.path.exists(credentials_path):
print(f"'{credentials_path}' ํŒŒ์ผ์ด ์—†์–ด ์ƒˆ๋กœ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.")
credentials_str = os.getenv('GOOGLE_CREDENTIALS_JSON')
if credentials_str:
with open(credentials_path, 'w', encoding='utf-8') as f:
f.write(credentials_str)
else:
print("๊ฒฝ๊ณ : .env ํŒŒ์ผ์— 'GOOGLE_CREDENTIALS_JSON' ๋ณ€์ˆ˜๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.")
if not os.path.exists(token_path):
print(f"'{token_path}' ํŒŒ์ผ์ด ์—†์–ด ์ƒˆ๋กœ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.")
token_str = os.getenv('GOOGLE_TOKEN_JSON')
if token_str:
with open(token_path, 'w', encoding='utf-8') as f:
f.write(token_str)
CREDENTIALS_FILENAME = credentials_path
TOKEN_FILENAME = token_path
# Google Calendar API ์Šค์ฝ”ํ”„ (์ฝ๊ธฐ ์ „์šฉ)
SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']
# ๊ธฐ์ค€ ์‹œ๊ฐ„๋Œ€ (GAS์˜ Session.getScriptTimeZone()์— ํ•ด๋‹น)
# ํ•„์š”์— ๋”ฐ๋ผ 'Asia/Tokyo', 'America/New_York' ๋“ฑ์œผ๋กœ ๋ณ€๊ฒฝํ•˜์„ธ์š”.
TIMEZONE = 'Asia/Tokyo'
rooms = [
{"capacity": 6, "name": "1F2", "email": os.getenv("GOOGLE_CALENDAR_1F2")},
{"capacity": 6, "name": "1F3", "email": os.getenv("GOOGLE_CALENDAR_1F3")},
{"capacity": 12, "name": "3F1", "email": os.getenv("GOOGLE_CALENDAR_3F1")},
{"capacity": 4, "name": "3F2", "email": os.getenv("GOOGLE_CALENDAR_3F2")},
{"capacity": 4, "name": "3F3", "email": os.getenv("GOOGLE_CALENDAR_3F3")},
{"capacity": 4, "name": "3F5", "email": os.getenv("GOOGLE_CALENDAR_3F5")},
{"capacity": 4, "name": "3F6", "email": os.getenv("GOOGLE_CALENDAR_3F6")},
{"capacity": 12, "name": "6F1", "email": os.getenv("GOOGLE_CALENDAR_6F1")},
{"capacity": 4, "name": "6F2", "email": os.getenv("GOOGLE_CALENDAR_6F2")},
{"capacity": 4, "name": "6F3", "email": os.getenv("GOOGLE_CALENDAR_6F3")},
{"capacity": 2, "name": "6F5", "email": os.getenv("GOOGLE_CALENDAR_6F5")},
{"capacity": 2, "name": "6F6", "email": os.getenv("GOOGLE_CALENDAR_6F6")},
{"capacity": 2, "name": "6F7", "email": os.getenv("GOOGLE_CALENDAR_6F7")},
{"capacity": 4, "name": "7F2", "email": os.getenv("GOOGLE_CALENDAR_7F2")},
{"capacity": 4, "name": "7F3", "email": os.getenv("GOOGLE_CALENDAR_7F3")},
{"capacity": 4, "name": "7F5", "email": os.getenv("GOOGLE_CALENDAR_7F5") },
{"capacity": 4, "name": "7F6", "email": os.getenv("GOOGLE_CALENDAR_7F6") }
]
# def get_credentials():
# creds = None
# if os.path.exists(TOKEN_FILENAME):
# creds = Credentials.from_authorized_user_file(TOKEN_FILENAME, SCOPES)
# # ํฌ๋ฆฌ๋ด์…œ์ด ์—†๊ฑฐ๋‚˜ ์œ ํšจํ•˜์ง€ ์•Š์œผ๋ฉด ์‚ฌ์šฉ์ž ๋กœ๊ทธ์ธ ํ”Œ๋กœ์šฐ๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.
# if not creds or not creds.valid:
# if creds and creds.expired and creds.refresh_token:
# creds.refresh(GoogleRequest())
# else:
# flow = InstalledAppFlow.from_client_secrets_file(
# CREDENTIALS_FILENAME, SCOPES)
# creds = flow.run_local_server(port=0)
# # ๋‹ค์Œ ์‹คํ–‰์„ ์œ„ํ•ด ํ† ํฐ์„ ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.
# with open(TOKEN_FILENAME, 'w') as token:
# token.write(creds.to_json())
# return creds
def get_oauth_credentials():
"""
OAuth 2.0 ์‚ฌ์šฉ์ž ์ธ์ฆ์„ ์ฒ˜๋ฆฌํ•˜๊ณ  ์œ ํšจํ•œ Credentials ๊ฐ์ฒด๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.
- token.json์ด ์žˆ์œผ๋ฉด ์‚ฌ์šฉํ•˜๊ณ , ์—†๊ฑฐ๋‚˜ ๋งŒ๋ฃŒ๋˜์—ˆ์œผ๋ฉด ์ƒˆ๋กœ ์ธ์ฆํ•ฉ๋‹ˆ๋‹ค.
"""
creds = None
print(f"log1")
# token.json ํŒŒ์ผ์ด ์ด๋ฏธ ์กด์žฌํ•˜๋ฉด, ์ €์žฅ๋œ ์ธ์ฆ ์ •๋ณด๋ฅผ ๋ถˆ๋Ÿฌ์˜ต๋‹ˆ๋‹ค.
if os.path.exists(TOKEN_FILENAME):
print(f"log2")
creds = Credentials.from_authorized_user_file(TOKEN_FILENAME, SCOPES)
print(f"log3")
# ์ธ์ฆ ์ •๋ณด๊ฐ€ ์—†๊ฑฐ๋‚˜ ์œ ํšจํ•˜์ง€ ์•Š์œผ๋ฉด, ์‚ฌ์šฉ์ž์—๊ฒŒ ๋กœ๊ทธ์ธ์„ ์š”์ฒญํ•ฉ๋‹ˆ๋‹ค.
if not creds or not creds.valid:
print(f"log4")
if creds and creds.expired and creds.refresh_token:
print(f"log5")
# ํ† ํฐ์ด ๋งŒ๋ฃŒ๋˜์—ˆ์œผ๋ฉด, ๋ฆฌํ”„๋ ˆ์‹œ ํ† ํฐ์„ ์‚ฌ์šฉํ•ด ๊ฐฑ์‹ ํ•ฉ๋‹ˆ๋‹ค.
creds.refresh(GoogleRequest())
else:
print(f"log6")
# ํ† ํฐ์ด ์—†์œผ๋ฉด, credentials.json์„ ์ด์šฉํ•ด ์ƒˆ๋กœ์šด ์ธ์ฆ ์ ˆ์ฐจ๋ฅผ ์‹œ์ž‘ํ•ฉ๋‹ˆ๋‹ค.
flow = InstalledAppFlow.from_client_secrets_file(
CREDENTIALS_FILENAME, SCOPES)
print(f"log7")
# ์„œ๋ฒ„ ํ™˜๊ฒฝ์„ ์œ„ํ•ด ๋กœ์ปฌ ์„œ๋ฒ„ ๋Œ€์‹  ์ฝ˜์†” ๊ธฐ๋ฐ˜ ์ธ์ฆ์„ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.
creds = flow.run_console()
# ๋‹ค์Œ ์‹คํ–‰์„ ์œ„ํ•ด ์ƒˆ๋กœ ๋ฐœ๊ธ‰๋ฐ›๊ฑฐ๋‚˜ ๊ฐฑ์‹ ๋œ ์ธ์ฆ ์ •๋ณด๋ฅผ token.json ํŒŒ์ผ์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.
print(f"log8")
with open(TOKEN_FILENAME, 'w') as token:
print(f"log9")
token.write(creds.to_json())
print(f"log10")
return creds
def check_room_free_slots_over_1h(date_str=None):
# ํšŒ์˜์‹ค ์ •๋ณด
output_lines = []
# --- ์ธ์ฆ ๋ฐ ์„œ๋น„์Šค ๋นŒ๋“œ ---
try:
creds = get_oauth_credentials()
service = build('calendar', 'v3', credentials=creds)
except Exception as e:
print(f"่ช่จผใ‚จใƒฉใƒผ: {e}")
return
# --- ์กฐํšŒ ์‹œ๊ฐ„ ๋ฒ”์œ„ ์„ค์ • ---
# --- ์กฐํšŒ ์‹œ๊ฐ„ ๋ฒ”์œ„ ์„ค์ • ---
tz = pytz.timezone(TIMEZONE)
now = datetime.datetime.now(tz)
if date_str:
target_date = datetime.datetime.strptime(date_str, "%Y-%m-%d").date()
start_time = tz.localize(datetime.datetime.combine(target_date, datetime.time(10, 0)))
end_time = tz.localize(datetime.datetime.combine(target_date, datetime.time(19, 0)))
output_lines.append(f"็…งไผšๅฏพ่ฑก: {start_time.strftime('%Y/%m/%d')}")
else:
today = now.date()
start_hour = max(10, now.hour)
start_time = tz.localize(datetime.datetime.combine(today, datetime.time(start_hour, 0)))
end_time = tz.localize(datetime.datetime.combine(today, datetime.time(19, 0)))
output_lines.append(f"็…งไผšๅฏพ่ฑก: {start_time.strftime('%Y/%m/%d')}")
if start_time >= end_time:
output_lines.append("็…งไผšๅฏ่ƒฝใชๆ™‚้–“ใŒใ‚ใ‚Šใพใ›ใ‚“")
return "\n".join(output_lines)
# --- FreeBusy API๋กœ ๋ชจ๋“  ํšŒ์˜์‹ค์˜ ๋ฐ”์œ ์‹œ๊ฐ„ ํ•œ ๋ฒˆ์— ์กฐํšŒ ---
body = {
"timeMin": start_time.isoformat(),
"timeMax": end_time.isoformat(),
"timeZone": TIMEZONE,
"items": [{"id": room["email"]} for room in rooms]
}
print(f"freebusy 1")
try:
freebusy_result = service.freebusy().query(body=body).execute()
calendars_busy_info = freebusy_result.get('calendars', {})
print(f"freebusy 2")
except Exception as e:
print(f"freebusy 3")
return f"FreeBusy API ๅคฑๆ•—: {e}"
output_lines.append("-" * 30)
print(f"freebusy 4")
# --- ๊ฐ ํšŒ์˜์‹ค๋ณ„๋กœ ๋นˆ ์‹œ๊ฐ„ ๊ณ„์‚ฐ ๋ฐ ์ถœ๋ ฅ ---
for room in rooms:
room_email = room['email']
busy_info = calendars_busy_info.get(room_email, {})
if busy_info.get('errors'):
reason = busy_info['errors'][0].get('reason', 'unknown')
output_lines.append(f"[{room['name']}] ใ‚ซใƒฌใƒณใƒ€ใƒผ็…งไผšๅคฑๆ•—: {reason}")
continue
busy_slots = busy_info.get('busy', [])
cursor = start_time
free_slots = []
for busy in busy_slots:
busy_start = datetime.datetime.fromisoformat(busy['start'])
busy_end = datetime.datetime.fromisoformat(busy['end'])
if busy_start > cursor:
free_slots.append({'start': cursor, 'end': busy_start})
if busy_end > cursor:
cursor = busy_end
if cursor < end_time:
free_slots.append({'start': cursor, 'end': end_time})
long_free_slots = [
slot for slot in free_slots
if (slot['end'] - slot['start']) >= datetime.timedelta(hours=1)
]
if long_free_slots:
output_lines.append(f"[{room['name']}] (ๅฎšๅ“ก {room['capacity']}ๅ)")
for slot in long_free_slots:
output_lines.append(f" {slot['start'].strftime('%H:%M')} ~ {slot['end'].strftime('%H:%M')}")
return "\n".join(output_lines)
def search_calendar(calendar_data , order: str) -> str:
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
model = genai.GenerativeModel("gemini-1.5-pro")
message = f"""ไปฅไธ‹ใŒ็ฉบใๅฎคใฎ็ฉบใ„ใฆใ„ใ‚‹ๆ™‚้–“ๆƒ…ๅ ฑใงใ™ใ€‚\n {calendar_data} ใ“ใฎใƒ‡ใƒผใ‚ฟใ‚’่ฆ‹ใฆ"""
prompt = "{message}{order}".format(message=message, order=order)
token_info = model.count_tokens(prompt)
print(f"Prompt Token Count: {token_info.total_tokens}")
response = model.generate_content(prompt)
return response.text if response.text else "็”Ÿๆˆๅคฑๆ•—"
def calendar_main(input_string: str):
try:
parts = input_string.split()
if len(parts) < 3 or parts[0].lower() != 'ไผš่ญฐๅฎค':
print("\n โ—โ—โ—โ—โ—โ—โ— ไผš่ญฐๅฎคใ˜ใ‚ƒใชใ„ใฎใงRETURN \n")
return
# ๋‚ ์งœ ํŒŒ์‹ฑ ๋ฐ ํฌ๋งทํŒ…
date_input = parts[1]
formatted_date= ""
if date_input == "ใ™ใ":
formatted_date = ""
elif len(date_input) != 8 or not date_input.isdigit():
print("ๆ—ฅไป˜ใฎใƒ•ใ‚ฉใƒผใƒžใƒƒใƒˆใŒๆญฃใ—ใใ‚ใ‚Šใพใ›ใ‚“ YYYYMMDD ใซใ—ใฆใใ ใ•ใ„ใ€‚")
return
else:
formatted_date = f"{date_input[:4]}-{date_input[4:6]}-{date_input[6:]}"
# ์ธ์›์ˆ˜ ํŒŒ์‹ฑ
capacity_input = parts[2]
if not capacity_input.isdigit():
print("ไบบๆ•ฐใฏๆ•ฐๅญ—ใฎใฟๅ…ฅๅŠ›ใ—ใฆใใ ใ•ใ„ใ€‚")
return
min_capacity = int(capacity_input)
# 4๋ฒˆ์งธ ์ธ์ž๊ฐ€ ์žˆ์œผ๋ฉด ์ทจ๋“
optional_arg = ""
if len(parts) > 3:
optional_arg = " ".join(parts[3:])
print(f"่ฟฝๅŠ ใ‚ชใƒ—ใ‚ทใƒงใƒณ: '{optional_arg}'")
except Exception as e:
print(f"ใ‚จใƒฉใƒผ็™บ็”Ÿ: {e}")
return
free_rooms = check_room_free_slots_over_1h(formatted_date)
print(f"\n โ—โ—โ—โ—โ—โ—โ— response {free_rooms} \n")
if free_rooms:
response = search_calendar(free_rooms, f"{formatted_date}ใซ{min_capacity}ๅใŒๅ…ฅใ‚Œใ‚‹ไผš่ญฐๅฎคใ‚’ๆ•™ใˆใฆใใ ใ•ใ„{optional_arg}ใ€‚ๅ›ž็ญ”ใฏใ‚ทใƒณใƒ—ใƒซใซ[ไผš่ญฐๅฎคๅ] ๅฎšๅ“ก ็ฉบใๆ™‚้–“ใฎใƒ•ใ‚ฉใƒผใƒžใƒƒใƒˆใงใ—ใฆใใ ใ•ใ„")
return response
else:
return
@app.get("/")
async def home():
return "Hello, FastAPI! ์ด๊ฑด ๋ฃจํŠธ ๊ฒฝ๋กœ์ž…๋‹ˆ๋‹ค."
@app.post("/")
async def handle_chatwork_webhook(
request: Request,
# ๐Ÿ‘‡ str | None ๋Œ€์‹  Optional[str]์„ ์‚ฌ์šฉํ•˜๋„๋ก ์ˆ˜์ •ํ•ฉ๋‹ˆ๋‹ค.
signature: Optional[str] = Header(None, alias="X-ChatWorkWebhookSignature")
):
"""
Chatwork ์›นํ›…์„ ์ˆ˜์‹ ํ•˜๊ณ  ์„œ๋ช…์„ ๊ฒ€์ฆํ•œ ๋’ค ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค.
"""
if not signature:
raise HTTPException(status_code=400, detail="Signature header is missing.")
# ์š”์ฒญ์˜ ์›๋ณธ ๋ณธ๋ฌธ(raw body)์„ ๊ฐ€์ ธ์˜ต๋‹ˆ๋‹ค.
raw_body = await request.body()
# --- ์„œ๋ช… ๊ฒ€์ฆ ๋กœ์ง ---
# 1. ์›นํ›… ํ† ํฐ์„ base64๋กœ ๋””์ฝ”๋”ฉํ•ฉ๋‹ˆ๋‹ค.
secret_key = base64.b64decode(CHATWORK_WEBHOOK_TOKEN)
# 2. HMAC-SHA256 ํ•ด์‹œ๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.
digest = hmac.new(secret_key, raw_body, hashlib.sha256).digest()
# 3. ๊ณ„์‚ฐ๋œ ํ•ด์‹œ๋ฅผ base64๋กœ ์ธ์ฝ”๋”ฉํ•ฉ๋‹ˆ๋‹ค.
computed_signature = base64.b64encode(digest).decode()
# 4. ์ˆ˜์‹ ํ•œ ์„œ๋ช…๊ณผ ๊ณ„์‚ฐ๋œ ์„œ๋ช…์ด ์ผ์น˜ํ•˜๋Š”์ง€ ํ™•์ธํ•ฉ๋‹ˆ๋‹ค.
if not hmac.compare_digest(computed_signature, signature):
raise HTTPException(status_code=401, detail="Invalid signature.")
# ์„œ๋ช…์ด ์œ ํšจํ•˜๋ฉด, ๋ฐ์ดํ„ฐ๋ฅผ JSON์œผ๋กœ ํŒŒ์‹ฑํ•˜์—ฌ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค.
data = await request.json()
# ๋กœ๊ทธ์— ์ˆ˜์‹ ๋œ ๋ฐ์ดํ„ฐ ์ถœ๋ ฅ (์„œ๋ฒ„ ๋กœ๊ทธ์—์„œ ํ™•์ธ ๊ฐ€๋Šฅ)
print("โœ… Webhook received and verified:")
print(data)
# --- ๐Ÿ‘‡ ๋‹ต์žฅ ๋ฉ”์‹œ์ง€ ์ „์†ก ๋กœ์ง ์ถ”๊ฐ€ ---
try:
# ์›นํ›… ํŽ˜์ด๋กœ๋“œ์—์„œ ํ•„์š”ํ•œ ์ •๋ณด ์ถ”์ถœ
event = data.get("webhook_event", {})
room_id = event.get("room_id")
from_account_id = event.get("account_id")
message_id = event.get("message_id")
message_body = event.get("body")
calendar_data = ""
if message_body:
# strip()์„ ์‚ฌ์šฉํ•˜์—ฌ ์•ž๋’ค ๊ณต๋ฐฑ ๋ฐ ์ค„๋ฐ”๊ฟˆ ๋ฌธ์ž๋ฅผ ์ œ๊ฑฐํ•˜๊ณ  ์ถœ๋ ฅํ•ฉ๋‹ˆ๋‹ค.
cleaned_message = message_body.strip()
if cleaned_message[0] != 'ไผš':
print("ไผš่ญฐๅฎคใฎๆคœ็ดขใ‚ณใƒžใƒณใƒ‰ใงใฏใ‚ใ‚Šใพใ›ใ‚“ใ€‚")
print(cleaned_message[0])
return
if cleaned_message:
calendar_data = calendar_main(cleaned_message)
if not calendar_data:
return
if room_id and from_account_id and message_id:
# Chatwork API ์—”๋“œํฌ์ธํŠธ
url = f"https://api.chatwork.com/v2/rooms/{room_id}/messages"
# API ์š”์ฒญ ํ—ค๋”
headers = {"X-ChatWorkToken": CHATWORK_API_TOKEN}
# ๋ณด๋‚ผ ๋ฉ”์‹œ์ง€ ๋ณธ๋ฌธ (์ˆ˜์‹  ํ™•์ธ ๋‹ต์žฅ)
# [rp] ํƒœ๊ทธ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ํŠน์ • ๋ฉ”์‹œ์ง€์— ๋Œ€ํ•œ ๋‹ต์žฅ ํ˜•์‹์œผ๋กœ ๋ณด๋‚ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
reply_body = (
f"[rp aid={from_account_id} to={room_id}-{message_id}]"
f"\nโœ…{calendar_data}"
)
payload = {"body": reply_body}
# Chatwork API๋กœ ๋ฉ”์‹œ์ง€ ์ „์†ก ์š”์ฒญ
response = requests.post(url, headers=headers, data=payload)
response.raise_for_status() # ์š”์ฒญ์ด ์‹คํŒจํ•˜๋ฉด ์˜ˆ์™ธ ๋ฐœ์ƒ
print(f"โœ… Replied to room {room_id} successfully.")
except Exception as e:
print(f"โŒ Failed to send reply message: {e}")
# ๋‹ต์žฅ ์ „์†ก์— ์‹คํŒจํ•˜๋”๋ผ๋„ ์›นํ›… ์ˆ˜์‹  ์ž์ฒด๋Š” ์„ฑ๊ณตํ–ˆ์œผ๋ฏ€๋กœ ์˜ค๋ฅ˜๋ฅผ ๋ฐ˜ํ™˜ํ•˜์ง€ ์•Š์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
# Chatwork์— ์„ฑ๊ณต์ ์œผ๋กœ ์ˆ˜์‹ ํ–ˆ์Œ์„ ์•Œ๋ฆผ
return {"status": "success"}
get_oauth_credentials()