import os
import re
import asyncio
import logging
from datetime import datetime
from typing import List, Dict, Optional
from dataclasses import dataclass
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI, Request, HTTPException
from slack_bolt import App
from slack_bolt.adapter.fastapi import SlackRequestHandler
from newspaper import Article
import uvicorn
from dotenv import load_dotenv

# Pull environment variables from a local .env file, if one exists.
load_dotenv()

# Configure logging for the whole application.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class Config:
    """Application settings, resolved from the environment at import time."""
    slack_bot_token: Optional[str] = os.getenv('SLACK_BOT_TOKEN')
    slack_signing_secret: Optional[str] = os.getenv('SLACK_SIGNING_SECRET')
    azure_openai_endpoint: Optional[str] = os.getenv('AZURE_OPENAI_ENDPOINT')
    azure_openai_api_key: Optional[str] = os.getenv('AZURE_OPENAI_API_KEY')
    azure_openai_deployment_name: str = os.getenv('AZURE_OPENAI_DEPLOYMENT_NAME', 'gpt-4')
    azure_openai_api_version: str = os.getenv('AZURE_OPENAI_API_VERSION', '2025-01-01')
    max_content_length: int = 10000  # max characters of article text sent to the model
    processing_timeout: int = 30


config = Config()

# Early validation: report missing required secrets (without failing yet).
required_vars = {
    'SLACK_BOT_TOKEN': config.slack_bot_token,
    'SLACK_SIGNING_SECRET': config.slack_signing_secret,
    'AZURE_OPENAI_ENDPOINT': config.azure_openai_endpoint,
    'AZURE_OPENAI_API_KEY': config.azure_openai_api_key,
}
missing = [name for name, value in required_vars.items() if not value]
if missing:
    error_msg = f"Missing required environment variables: {', '.join(missing)}. Please set these in Hugging Face Space Secrets."
if missing:
    # Don't raise here yet so the failure stays visible in the logs;
    # lifespan() will raise at startup if variables are still missing.
    logger.error(error_msg)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler.

    Validates the required environment variables on startup and closes the
    module-level processor's HTTP client on shutdown.
    """
    # Startup
    logger.info("Starting Slack URL Summarizer Bot")
    required_vars = [
        'SLACK_BOT_TOKEN',
        'SLACK_SIGNING_SECRET',
        'AZURE_OPENAI_ENDPOINT',
        'AZURE_OPENAI_API_KEY'
    ]
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        logger.error(f"Missing required environment variables: {', '.join(missing_vars)}")
        raise Exception(f"Missing required environment variables: {', '.join(missing_vars)}")
    logger.info("Bot started successfully")
    yield
    # Shutdown: release pooled HTTP connections.
    logger.info("Shutting down Slack URL Summarizer Bot")
    if hasattr(processor, 'http_client'):
        await processor.http_client.aclose()


# Initialize Slack app
try:
    slack_app = App(
        token=config.slack_bot_token,
        signing_secret=config.slack_signing_secret,
        process_before_response=True,
        # Temporarily disable signature verification for testing.
        request_verification_enabled=False
    )
except Exception as e:
    logger.error(f"Failed to initialize Slack App: {str(e)}")
    if missing:
        logger.error(f"CRITICAL: The following environment variables are MISSING: {', '.join(missing)}")
    # We still need a slack_app object for the handler, but it will be broken.
    slack_app = None

# Initialize FastAPI with lifespan
api = FastAPI(title="Slack URL Summarizer Bot", lifespan=lifespan)
handler = SlackRequestHandler(slack_app)


class URLProcessor:
    """Core URL processing: content extraction, AI summarization, formatting."""

    def __init__(self, config: Config):
        self.config = config
        self.http_client = httpx.AsyncClient(
            timeout=httpx.Timeout(30.0),
            follow_redirects=True
        )

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.http_client.aclose()

    def extract_urls(self, text: str) -> List[str]:
        """Extract all URLs from message text."""
        pattern = r'https?://[^\s<>"{\[\]|\\^`]+'
        urls = re.findall(pattern, text)
        logger.info(f"Extracted {len(urls)} URLs from message")
        return urls

    async def extract_content(self, url: str) -> Dict:
        """Extract the main content from a URL.

        Tries httpx + newspaper parsing first, then newspaper's own
        downloader, then a crude HTML-stripping fallback. Always returns a
        dict with 'title', 'text', 'authors', 'publish_date' and 'url' —
        on total failure the dict carries a placeholder title/text so the
        AI stage can still respond.
        """
        try:
            logger.info(f"Extracting content from: {url}")
            # Use a browser-like User-Agent to avoid being blocked.
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }

            # First try fetching the page directly with httpx.
            try:
                response = await self.http_client.get(url, headers=headers)
                response.raise_for_status()
                # Parse the fetched HTML with newspaper4k.
                article = Article(url)
                article.set_html(response.text)
                article.parse()
            except Exception as e:
                # If httpx fails, fall back to newspaper4k's own downloader.
                logger.warning(f"Direct HTTP request failed, trying newspaper4k: {str(e)}")
                article = Article(url)
                article.config.browser_user_agent = headers['User-Agent']
                article.download()
                article.parse()

            # Validate the extracted content.
            if not article.text or len(article.text.strip()) < 50:
                # Extraction yielded too little text; try a crude HTML strip
                # of the raw response body if we fetched one.
                if 'response' in locals() and response.text:
                    from html import unescape
                    # Remove HTML tags and collapse whitespace.
                    text = re.sub(r'<[^>]+>', '', response.text)
                    text = unescape(text)
                    text = re.sub(r'\s+', ' ', text).strip()
                    if len(text) > 100:
                        # Keep at most the first 3000 characters as the content.
                        text = text[:3000] + "..." if len(text) > 3000 else text
                        result = {
                            'title': url.split('/')[-1].replace('-', ' ').title(),
                            'text': text,
                            'authors': [],
                            'publish_date': None,
                            'url': url
                        }
                        logger.info(f"Successfully extracted content using fallback method from {url}")
                        return result
                raise Exception("Insufficient content extracted")

            # Truncate overly long content before sending it to the model.
            text = article.text
            if len(text) > self.config.max_content_length:
                text = text[:self.config.max_content_length] + "..."

            result = {
                'title': article.title or "No title available",
                'text': text,
                'authors': article.authors,
                'publish_date': article.publish_date,
                'url': url
            }
            logger.info(f"Successfully extracted content from {url}")
            return result

        except Exception as e:
            logger.error(f"Error extracting content from {url}: {str(e)}")
            # Last resort: return basic info so the AI stage can still reply.
            fallback_result = {
                'title': f"無法完全提取內容的網頁: {url}",
                'text': f"由於網站限制,無法提取完整內容。網址: {url}. 請嘗試直接訪問該網站查看內容。",
                'authors': [],
                'publish_date': None,
                'url': url
            }
            logger.info(f"Using fallback content for {url}")
            return fallback_result

    async def _chat_completion(self, system_message: str, user_prompt: str, max_tokens: int):
        """Call the Azure OpenAI chat-completions REST API (shared helper).

        Returns a (text, token_stats) tuple, where token_stats carries
        'prompt_tokens', 'completion_tokens' and 'total_tokens' (0 when the
        API response has no usage section).

        Raises httpx.HTTPStatusError on non-2xx responses.
        """
        url = f"{self.config.azure_openai_endpoint}/openai/deployments/{self.config.azure_openai_deployment_name}/chat/completions?api-version={self.config.azure_openai_api_version}"
        headers = {
            "Content-Type": "application/json",
            "api-key": self.config.azure_openai_api_key,
        }
        body = {
            "messages": [
                {"role": "system", "content": system_message},
                {"role": "user", "content": user_prompt}
            ],
            "temperature": 0.3,
            "max_tokens": max_tokens
        }
        response = await self.http_client.post(url, headers=headers, json=body)
        response.raise_for_status()
        result = response.json()
        text = result["choices"][0]["message"]["content"].strip()
        # Pull token usage information out of the API response.
        usage_info = result.get("usage", {})
        token_stats = {
            "prompt_tokens": usage_info.get("prompt_tokens", 0),
            "completion_tokens": usage_info.get("completion_tokens", 0),
            "total_tokens": usage_info.get("total_tokens", 0)
        }
        return text, token_stats

    async def summarize_and_translate(self, content: Dict) -> tuple:
        """Summarize content and translate it to Traditional Chinese via Azure OpenAI.

        Always returns a (summary, token_stats) tuple. FIX: the normal
        (non-fallback) path previously returned a bare string, silently
        dropping token usage; the caller already accepts tuples, so this is
        backward compatible.
        """
        try:
            logger.info(f"Summarizing content for: {content['url']}")

            # Fallback content (extraction failed) gets a short friendly reply.
            if "無法完全提取內容" in content['title']:
                prompt = f"""這個網址因為網站限制無法完全提取內容:{content['url']}
請用繁體中文回覆一個友善的訊息,說明:
1. 由於網站的保護機制,無法自動提取該網頁的完整內容
2. 建議用戶直接點擊連結查看完整內容
3. 如果是知名網站,可以簡單說明該網站的性質(如新聞、技術等)
請保持簡潔友善的語調。"""
                summary, token_stats = await self._chat_completion(
                    "你是一個友善的助手,會提供實用的建議。",
                    prompt,
                    max_tokens=300
                )
                logger.info(f"Generated fallback response for: {content['url']}")
                logger.info(f"Token usage - Prompt: {token_stats['prompt_tokens']}, Completion: {token_stats['completion_tokens']}, Total: {token_stats['total_tokens']}")
                return summary, token_stats

            # Normal summarization path.
            prompt = f"""請將以下文章摘要成 3-5 句重點,並翻譯為繁體中文。請確保摘要簡潔明瞭且包含最重要的資訊:
標題:{content['title']}
內容:{content['text']}
請用繁體中文回覆摘要。"""
            summary, token_stats = await self._chat_completion(
                "你是一個專業的技術文章摘要與翻譯專家,精通各種技術領域,能夠準確保留技術術語、專有名詞、數據細節,並將內容翻譯成自然流暢的繁體中文。你特別擅長處理科技、醫療、商業和學術文章,能夠識別並保留重要的技術細節。",
                prompt,
                max_tokens=800
            )
            logger.info(f"Successfully generated summary for: {content['url']}")
            return summary, token_stats

        except Exception as e:
            logger.error(f"Error in summarization: {str(e)}")
            # Keep the (summary, token_stats) tuple shape on errors as well.
            error_summary = f"抱歉,AI 處理時發生錯誤。錯誤訊息:{str(e)}"
            error_token_stats = {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0
            }
            return error_summary, error_token_stats

    def format_response(self, url: str, title: str, summary: str, token_stats: dict = None) -> str:
        """Format the Slack response message, optionally with token usage stats."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        response = f"""🔗 原始網址: {url}
📰 標題: {title}
{summary}
---
⏰ 處理時間: {timestamp}"""
        # Append token usage statistics when available.
        if token_stats:
            response += f"""
📊 Token 使用量: 輸入 {token_stats['prompt_tokens']} + 輸出 {token_stats['completion_tokens']} = 總計 {token_stats['total_tokens']} tokens"""
        return response

    def format_error_response(self, url: str, error_message: str) -> str:
        """Format an error message for Slack."""
        return f"""❌ 處理失敗: {url}
🔍 錯誤原因: {error_message}
💡 建議: 請檢查網址是否正確或稍後再試"""
# Global processor instance and deduplication cache
processor = URLProcessor(config)
processing_cache = set()  # tracks in-flight URL/channel pairs for de-duplication


async def process_url_async(url: str, channel: str, say):
    """Asynchronous URL processing pipeline: extract -> summarize -> post.

    Deduplicates concurrent requests for the same URL/channel within the
    same minute via the module-level `processing_cache` set.
    """
    # Unique processing ID; the //60 bucket resets the key every minute.
    process_id = f"{url}:{channel}:{int(datetime.now().timestamp())//60}"

    # Skip if this URL is already being processed.
    if process_id in processing_cache:
        logger.info(f"URL {url} is already being processed, skipping duplicate")
        return

    processing_cache.add(process_id)

    try:
        logger.info(f"Starting to process URL: {url}")
        async with URLProcessor(config) as proc:
            # Step 1: Extract content
            logger.info(f"Step 1: Extracting content from {url}")
            content = await proc.extract_content(url)
            logger.info(f"Content extracted successfully. Title: {content.get('title', 'N/A')}")

            # Step 2: Summarize and translate
            logger.info(f"Step 2: Summarizing and translating content for {url}")
            try:
                result = await proc.summarize_and_translate(content)
                # The result may be a (summary, token_stats) tuple or a bare string.
                if isinstance(result, tuple):
                    summary, token_stats = result
                else:
                    summary = result
                    token_stats = None
                logger.info(f"Summary generated successfully for {url}")
            except Exception as e:
                logger.error(f"Error in summarization, trying fallback: {str(e)}")
                # If AI processing failed, provide a basic reply.
                summary = f"抱歉,由於技術問題無法生成摘要。請直接查看原始網址:{url}"
                token_stats = None

            # Step 3: Format and send response
            logger.info(f"Step 3: Formatting and sending response for {url}")
            response = proc.format_response(url, content['title'], summary, token_stats)

            # Send to Slack (say is a synchronous callable supplied by Bolt).
            say(channel=channel, text=response)
            logger.info(f"Successfully processed and sent response for: {url}")

    except Exception as e:
        logger.error(f"Error processing URL {url}: {str(e)}", exc_info=True)
        error_message = processor.format_error_response(url, str(e))
        say(channel=channel, text=error_message)
    finally:
        # Evict from the dedup cache after a 5-second delay.
        # FIX: mark the cleanup thread as a daemon so a sleeping cleanup
        # thread can never block interpreter shutdown.
        import threading

        def remove_from_cache():
            import time
            time.sleep(5)
            processing_cache.discard(process_id)

        threading.Thread(target=remove_from_cache, daemon=True).start()


# Slack event handlers
@slack_app.event("message")
def handle_message(event, say, ack):
    """Handle incoming Slack messages: extract URLs and process each one."""
    ack()  # Acknowledge the event immediately.
    try:
        logger.info(f"Received message event: {event}")

        # Skip bot messages
        if event.get('bot_id'):
            logger.info("Skipping bot message")
            return

        # Skip app_mention events (those are handled by handle_app_mention)
        if event.get('type') == 'app_mention':
            logger.info("Skipping app_mention in message handler")
            return

        # Skip messages without text
        if 'text' not in event:
            logger.info("Skipping message without text")
            return

        message_text = event.get('text', '')
        channel = event.get('channel')
        user = event.get('user')

        # Skip messages that mention the bot, to avoid double processing.
        # FIX: the bot user ID is now configurable via SLACK_BOT_USER_ID
        # (defaulting to the previously hard-coded value).
        bot_user_id = os.getenv('SLACK_BOT_USER_ID', 'U094J502LLC')
        if f'<@{bot_user_id}>' in message_text:
            logger.info("Skipping mention message in message handler (will be handled by app_mention)")
            return

        logger.info(f"Processing message from user {user} in channel {channel}: {message_text}")

        # Extract URLs from message
        urls = processor.extract_urls(message_text)
        if not urls:
            logger.info("No URLs found in message")
            return

        logger.info(f"Found {len(urls)} URLs: {urls}")

        # Send initial acknowledgment for multiple URLs
        if len(urls) > 1:
            say(
                channel=channel,
                text=f"🔄 正在處理 {len(urls)} 個網址,請稍候..."
            )

        # Process each URL asynchronously, one thread/event-loop per URL.
        import threading
        for url in urls:
            logger.info(f"Creating thread for URL: {url}")
            thread = threading.Thread(
                target=lambda u=url: asyncio.run(process_url_async(u, channel, say))
            )
            thread.start()

    except Exception as e:
        logger.error(f"Error in message handler: {str(e)}", exc_info=True)
        say(
            channel=event.get('channel'),
            text="❌ 處理訊息時發生錯誤,請稍後再試"
        )


@slack_app.event("app_mention")
def handle_app_mention(event, say, ack):
    """Handle app mentions: process any URLs in the text, otherwise greet."""
    ack()  # Acknowledge the event.
    logger.info(f"Received app mention: {event}")

    # Check whether the mention contains any URL.
    message_text = event.get('text', '')
    urls = processor.extract_urls(message_text)

    if urls:
        logger.info(f"App mention contains URLs: {urls}")
        # Send initial acknowledgment
        say(
            channel=event['channel'],
            text=f"🔄 收到!正在處理 {len(urls)} 個網址..."
        )
        # Process URLs in background threads.
        import threading
        for url in urls:
            logger.info(f"Creating thread for app mention URL: {url}")
            thread = threading.Thread(
                target=lambda u=url: asyncio.run(process_url_async(u, event['channel'], say))
            )
            thread.start()
    else:
        # No URL: reply with a welcome message.
        say(
            channel=event["channel"],
            text="👋 你好!我是網址摘要機器人。只要在頻道中貼上網址,我就會自動為你生成繁體中文摘要!"
        )


# FastAPI routes
@api.get("/")
async def root():
    """Health check endpoint."""
    return {"status": "healthy", "service": "Slack URL Summarizer Bot"}


@api.get("/health")
async def health_check():
    """Detailed health check."""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "config": {
            "slack_configured": bool(config.slack_bot_token),
            "azure_openai_configured": bool(config.azure_openai_endpoint),
        }
    }


@api.get("/slack/events")
async def slack_events_get():
    """Handle GET requests to the Slack events endpoint."""
    return {"message": "Slack events endpoint is ready", "methods": ["POST"]}


@api.post("/slack/events")
async def slack_events(request: Request):
    """Handle Slack events (URL verification challenge + regular events)."""
    # FIX: import json before the try block so that the
    # `except json.JSONDecodeError` clause can never raise NameError when an
    # earlier statement in the try fails.
    import json
    try:
        # Get the request body
        body = await request.body()
        data = json.loads(body)

        # Handle Slack's URL verification challenge.
        if data.get("type") == "url_verification":
            challenge = data.get("challenge")
            logger.info(f"Received URL verification challenge: {challenge}")
            return {"challenge": challenge}

        # Delegate regular Slack events to the Bolt adapter.
        logger.info(f"Received Slack event: {data.get('type')}")
        return await handler.handle(request)

    except json.JSONDecodeError:
        logger.error("Invalid JSON in Slack request")
        raise HTTPException(status_code=400, detail="Invalid JSON")
    except Exception as e:
        logger.error(f"Error handling Slack event: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal server error")


# Error handling middleware
@api.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Global last-resort exception handler.

    FIX: an exception handler must RETURN a response; raising HTTPException
    from inside the handler is itself an unhandled error, producing a raw
    ASGI failure instead of a clean JSON 500.
    """
    from fastapi.responses import JSONResponse
    logger.error(f"Unhandled exception: {str(exc)}")
    return JSONResponse(status_code=500, content={"detail": "Internal server error"})


if __name__ == "__main__":
    # Run the FastAPI application
    uvicorn.run(
        "main:api",
        host="0.0.0.0",
        port=int(os.getenv("PORT", 7860)),
        log_level="info",
        reload=os.getenv("ENVIRONMENT") == "development"
    )