| import os |
| import json |
| import httpx |
| import time |
| from dotenv import load_dotenv |
| from supabase import create_client, Client |
|
|
| load_dotenv() |
|
|
| |
| from store import token_store |
|
|
| |
| |
# Supabase credentials are read from the environment (populated by load_dotenv()).
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")

# Module-wide Supabase client; stays None when either credential is missing so
# that callers can detect the unconfigured state with a simple truthiness test.
supabase: Client | None = None
if SUPABASE_URL and SUPABASE_KEY:
    supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
    print("Supabase client initialized.")
else:
    print("Error: SUPABASE_URL or SUPABASE_KEY not found in environment variables.")
|
|
async def get_app_secret_from_db(app_id: str) -> str | None:
    """Look up the Feishu app secret for ``app_id`` in Supabase.

    Queries the ``feishu_bot_config`` table for the single row whose
    ``bot_id`` equals ``app_id`` and reads the ``app_secret`` entry from that
    row's ``platform_specific`` column.

    Args:
        app_id: The Feishu App ID; matched against the ``bot_id`` column.

    Returns:
        The stored ``app_secret`` value, or None when the Supabase client is
        not initialized, the row or secret is missing, or the query fails.
        Failures are printed rather than raised.
    """
    if not supabase:
        print("Supabase client not initialized, cannot fetch app_secret.")
        return None

    try:
        # NOTE: this call is not awaited, so the query runs synchronously
        # inside the coroutine.
        response = (
            supabase.table('feishu_bot_config')
            .select('platform_specific')
            .eq('bot_id', app_id)
            .single()
            .execute()
        )

        row = response.data
        platform_specific = row.get('platform_specific') if row else None
        # Only a dict can hold the secret; a null or otherwise-typed column
        # value is treated as "not found" instead of raising on the `in` test.
        if isinstance(platform_specific, dict) and 'app_secret' in platform_specific:
            return platform_specific['app_secret']

        print(f"No app_secret found in Supabase for app_id: {app_id}.")
        return None
    except Exception as e:
        # Best-effort lookup: any client/query error is reported and mapped to None.
        print(f"Error fetching app_secret from Supabase for app_id {app_id}: {e}")
        return None
|
|
# Cached tokens are treated as expired this many seconds before their real
# expiry so a token is never handed out right before it stops working.
_TOKEN_EXPIRY_MARGIN_SECONDS = 60


async def get_valid_tenant_access_token(app_id: str) -> str | None:
    """Return a usable tenant access token for ``app_id``.

    A token cached in ``token_store`` is reused while it is still inside its
    lifetime (minus a safety margin). Otherwise the app secret is fetched via
    ``get_app_secret_from_db``, a new token is requested from the Feishu
    ``tenant_access_token/internal`` endpoint, and the result is cached.

    Cache entries are dicts with the keys ``'token'``, ``'created_time'``
    (``time.time()`` at creation) and ``'expires_in'`` (lifetime in seconds).

    Args:
        app_id: The Feishu App ID.

    Returns:
        A valid tenant access token, or None if one could not be obtained.
        Failures are printed rather than raised.
    """
    stored_token_info = token_store.get(app_id)
    if stored_token_info:
        token = stored_token_info.get('token')
        created_time = stored_token_info.get('created_time')
        expires_in = stored_token_info.get('expires_in')
        if expires_in is None:
            # Entries written before the key name was corrected stored the
            # lifetime under 'expire'; accept those too.
            expires_in = stored_token_info.get('expire')

        if token and created_time is not None and expires_in is not None:
            if time.time() < created_time + expires_in - _TOKEN_EXPIRY_MARGIN_SECONDS:
                print(f"Using cached tenant access token for app_id: {app_id}")
                return token
            print(f"Cached tenant access token for app_id: {app_id} has expired.")
        else:
            print(f"Incomplete token info found in store for app_id: {app_id}.")

    print(f"\n\nGenerating new tenant access token for app_id: {app_id}")

    app_secret = await get_app_secret_from_db(app_id)
    if not app_secret:
        print(f"Could not retrieve valid app_secret for app_id: {app_id}")
        return None

    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    headers = {
        "Content-Type": "application/json",
    }
    payload = {
        "app_id": app_id,
        "app_secret": app_secret,
    }

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(url, headers=headers, json=payload)
            response.raise_for_status()
            data = response.json()

            # A zero 'code' in the response body signals success.
            if data.get("code") != 0:
                print(f"Error getting tenant access token from API: {data.get('msg')}")
                return None

            new_token = data.get("tenant_access_token")
            expire = data.get("expire")
            if not new_token or expire is None:
                print("Error generating new tenant access token: Missing token or expiry info in response.")
                return None

            # Store the lifetime under 'expires_in' -- the same key the cache
            # lookup above reads -- so the cached token is actually reused.
            token_store[app_id] = {
                'token': new_token,
                'created_time': time.time(),
                'expires_in': expire,
            }
            return new_token
        except httpx.HTTPStatusError as e:
            print(f"HTTP error occurred while generating token: {e}")
            return None
        except httpx.RequestError as e:
            print(f"An error occurred while requesting {e.request.url!r} to generate token: {e}")
            return None
        except Exception as e:
            print(f"An unexpected error occurred while generating token: {e}")
            return None
|
|
async def send_feishu_reply(msg_id: str, tenant_access_token: str, content: dict, msg_type: str):
    """Post a reply to an existing Feishu message.

    The request goes to the ``/reply`` endpoint of the message identified by
    ``msg_id``. HTTP and transport errors raised while sending are printed,
    not propagated; the function returns nothing.

    Args:
        msg_id: ID of the message being replied to.
        tenant_access_token: Token sent as the Bearer credential.
        content: Reply content; JSON-encoded to a string before sending.
        msg_type: Message type of the reply (e.g., 'text').
    """
    reply_endpoint = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg_id}/reply"
    request_headers = {
        "Authorization": f"Bearer {tenant_access_token}",
        "Content-Type": "application/json",
    }
    # 'content' is sent as a JSON string nested inside the JSON request body.
    request_body = {"content": json.dumps(content), "msg_type": msg_type}

    async with httpx.AsyncClient() as http:
        try:
            reply = await http.post(reply_endpoint, headers=request_headers, json=request_body)
            reply.raise_for_status()
        except httpx.HTTPStatusError as status_err:
            print(f"HTTP error sending reply: {status_err}")
        except httpx.RequestError as request_err:
            print(f"An error occurred while requesting {request_err.request.url!r} to send reply: {request_err}")
        except Exception as unexpected_err:
            print(f"An unexpected error occurred while sending reply: {unexpected_err}")
|
|