# slack_url_bot / main.py
# chore: replace newspaper3k with newspaper4k to fix syntax warnings (commit ea984aa)
import os
import re
import asyncio
import logging
from datetime import datetime
from typing import List, Dict, Optional
from dataclasses import dataclass
from contextlib import asynccontextmanager
import httpx
from fastapi import FastAPI, Request, HTTPException
from slack_bolt import App
from slack_bolt.adapter.fastapi import SlackRequestHandler
from newspaper import Article
import uvicorn
from dotenv import load_dotenv
# Load environment variables from a local .env file (no-op when the file is absent).
load_dotenv()

# Configure process-wide logging once, before any module logs.
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger(__name__)
# Configuration
@dataclass
class Config:
    """Runtime configuration sourced from environment variables.

    NOTE(review): the os.getenv() defaults are evaluated once, when this class
    body is executed at import time — environment changes made afterwards are
    not picked up by later Config() instances.
    """
    slack_bot_token: Optional[str] = os.getenv('SLACK_BOT_TOKEN')
    slack_signing_secret: Optional[str] = os.getenv('SLACK_SIGNING_SECRET')
    azure_openai_endpoint: Optional[str] = os.getenv('AZURE_OPENAI_ENDPOINT')
    azure_openai_api_key: Optional[str] = os.getenv('AZURE_OPENAI_API_KEY')
    # Deployment name and API version fall back to safe defaults.
    azure_openai_deployment_name: str = os.getenv('AZURE_OPENAI_DEPLOYMENT_NAME', 'gpt-4')
    azure_openai_api_version: str = os.getenv('AZURE_OPENAI_API_VERSION', '2025-01-01')
    # Extracted article text longer than this (characters) is truncated before prompting.
    max_content_length: int = 10000
    # Per-URL processing budget in seconds — appears unused in this file; verify before removing.
    processing_timeout: int = 30
config = Config()

# Fail-soft startup validation: log loudly but keep the process alive so the
# problem is visible in hosted logs. `config` and `missing` are read by code
# further down, so those names must not change.
required_vars = {
    'SLACK_BOT_TOKEN': config.slack_bot_token,
    'SLACK_SIGNING_SECRET': config.slack_signing_secret,
    'AZURE_OPENAI_ENDPOINT': config.azure_openai_endpoint,
    'AZURE_OPENAI_API_KEY': config.azure_openai_api_key,
}
missing = [name for name, value in required_vars.items() if not value]
if missing:
    logger.error(
        f"Missing required environment variables: {', '.join(missing)}. "
        "Please set these in Hugging Face Space Secrets."
    )
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler.

    Startup: validate required environment variables and abort app start if
    any are missing. Shutdown: close the module-level processor's shared HTTP
    client.

    Raises:
        RuntimeError: if a required environment variable is unset (callers
            catching Exception still work; bare Exception was too broad).
    """
    logger.info("Starting Slack URL Summarizer Bot")
    required_vars = [
        'SLACK_BOT_TOKEN',
        'SLACK_SIGNING_SECRET',
        'AZURE_OPENAI_ENDPOINT',
        'AZURE_OPENAI_API_KEY'
    ]
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        message = f"Missing required environment variables: {', '.join(missing_vars)}"
        logger.error(message)
        raise RuntimeError(message)
    logger.info("Bot started successfully")
    try:
        yield
    finally:
        # try/finally guarantees the client is closed even when the app body
        # raises into the context manager (the original skipped cleanup then).
        logger.info("Shutting down Slack URL Summarizer Bot")
        if hasattr(processor, 'http_client'):
            await processor.http_client.aclose()
# Initialize Slack app
# SECURITY NOTE(review): request signature verification is disabled below, so
# anyone who discovers the endpoint can inject forged events. Re-enable before
# any production use.
try:
    slack_app = App(
        token=config.slack_bot_token,
        signing_secret=config.slack_signing_secret,
        process_before_response=True,
        # Temporarily disable signature verification for testing
        request_verification_enabled=False
    )
except Exception as e:
    logger.error(f"Failed to initialize Slack App: {str(e)}")
    if missing:
        logger.error(f"CRITICAL: The following environment variables are MISSING: {', '.join(missing)}")
    # We still need a slack_app object for handler, but it will be broken
    # NOTE(review): with slack_app = None, SlackRequestHandler(None) and the
    # @slack_app.event decorators below will raise — the process still dies,
    # just after these log lines have made the cause visible.
    slack_app = None
# Initialize FastAPI with lifespan
api = FastAPI(title="Slack URL Summarizer Bot", lifespan=lifespan)
handler = SlackRequestHandler(slack_app)
class URLProcessor:
    """Fetches a web page, extracts its main article text, and produces a
    Traditional-Chinese summary via Azure OpenAI."""

    def __init__(self, config: Config):
        self.config = config
        # Shared async HTTP client; released via __aexit__ (or the lifespan hook
        # for the module-level instance).
        self.http_client = httpx.AsyncClient(
            timeout=httpx.Timeout(30.0),
            follow_redirects=True
        )

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.http_client.aclose()

    def extract_urls(self, text: str) -> List[str]:
        """Extract all http(s) URLs from message text."""
        pattern = r'https?://[^\s<>"{\[\]|\\^`]+'
        urls = re.findall(pattern, text)
        logger.info(f"Extracted {len(urls)} URLs from message")
        return urls

    def _fallback_from_html(self, url: str, html: str) -> Optional[Dict]:
        """Crude tag-stripping fallback used when article extraction yields
        too little text. Returns a content dict, or None when the stripped
        page has no usable text (<= 100 characters)."""
        from html import unescape
        # Strip tags, decode entities, collapse whitespace.
        text = re.sub(r'<[^>]+>', '', html)
        text = unescape(text)
        text = re.sub(r'\s+', ' ', text).strip()
        if len(text) <= 100:
            return None
        # Keep the first 3000 characters as content.
        text = text[:3000] + "..." if len(text) > 3000 else text
        return {
            # Derive a rough title from the last URL path segment.
            'title': url.split('/')[-1].replace('-', ' ').title(),
            'text': text,
            'authors': [],
            'publish_date': None,
            'url': url
        }

    async def extract_content(self, url: str) -> Dict:
        """Extract main content from a URL.

        Strategy: fetch with httpx using a browser User-Agent and parse with
        newspaper4k; on failure, let newspaper4k download the page itself; if
        extraction yields too little text, fall back to crude tag stripping;
        as a last resort return a stub telling the user the site could not be
        scraped. Never raises.
        """
        # Browser-like User-Agent to reduce the chance of being blocked.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        try:
            logger.info(f"Extracting content from: {url}")
            # Raw page HTML when the direct fetch succeeds (replaces the
            # original `'response' in locals()` inspection hack).
            html_text = None
            try:
                response = await self.http_client.get(url, headers=headers)
                response.raise_for_status()
                html_text = response.text
                # Parse the already-fetched HTML with newspaper4k.
                article = Article(url)
                article.set_html(html_text)
                article.parse()
            except Exception as e:
                # Direct fetch failed (blocked, TLS, timeout) — let
                # newspaper4k download the page with its own machinery.
                logger.warning(f"Direct HTTP request failed, trying newspaper4k: {str(e)}")
                article = Article(url)
                article.config.browser_user_agent = headers['User-Agent']
                article.download()
                article.parse()

            if not article.text or len(article.text.strip()) < 50:
                # Extraction produced (almost) nothing; try the crude
                # tag-stripping fallback on the raw HTML, if we have it.
                if html_text:
                    fallback = self._fallback_from_html(url, html_text)
                    if fallback is not None:
                        logger.info(f"Successfully extracted content using fallback method from {url}")
                        return fallback
                raise Exception("Insufficient content extracted")

            # Truncate overly long articles so the prompt stays within limits.
            text = article.text
            if len(text) > self.config.max_content_length:
                text = text[:self.config.max_content_length] + "..."
            result = {
                'title': article.title or "No title available",
                'text': text,
                'authors': article.authors,
                'publish_date': article.publish_date,
                'url': url
            }
            logger.info(f"Successfully extracted content from {url}")
            return result
        except Exception as e:
            logger.error(f"Error extracting content from {url}: {str(e)}")
            # Last resort: hand the AI a stub describing the failure so the
            # user still gets a reply.
            fallback_result = {
                'title': f"無法完全提取內容的網頁: {url}",
                'text': f"由於網站限制,無法提取完整內容。網址: {url}. 請嘗試直接訪問該網站查看內容。",
                'authors': [],
                'publish_date': None,
                'url': url
            }
            logger.info(f"Using fallback content for {url}")
            return fallback_result

    async def _chat_completion(self, system_prompt: str, user_prompt: str, max_tokens: int) -> tuple:
        """POST one chat-completion request to Azure OpenAI.

        Shared by both summarization paths (the original duplicated this
        request block). Returns (text, token_stats); raises on HTTP or
        response-shape errors.
        """
        url = f"{self.config.azure_openai_endpoint}/openai/deployments/{self.config.azure_openai_deployment_name}/chat/completions?api-version={self.config.azure_openai_api_version}"
        headers = {
            "Content-Type": "application/json",
            "api-key": self.config.azure_openai_api_key,
        }
        body = {
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            "temperature": 0.3,
            "max_tokens": max_tokens
        }
        response = await self.http_client.post(url, headers=headers, json=body)
        response.raise_for_status()
        result = response.json()
        text = result["choices"][0]["message"]["content"].strip()
        usage_info = result.get("usage", {})
        token_stats = {
            "prompt_tokens": usage_info.get("prompt_tokens", 0),
            "completion_tokens": usage_info.get("completion_tokens", 0),
            "total_tokens": usage_info.get("total_tokens", 0)
        }
        return text, token_stats

    async def summarize_and_translate(self, content: Dict) -> tuple:
        """Summarize content into Traditional Chinese via Azure OpenAI.

        Always returns a (summary, token_stats) tuple. The original returned
        a bare string on the normal path but a tuple on the fallback/error
        paths, forcing callers to type-check the result; the caller's
        isinstance(result, tuple) branch accepts this fix unchanged.
        Never raises — API failures yield an apologetic summary with zeroed
        token stats.
        """
        try:
            logger.info(f"Summarizing content for: {content['url']}")
            if "無法完全提取內容" in content['title']:
                # Stub content from extract_content(): ask the model for a
                # friendly notice instead of a summary.
                prompt = f"""這個網址因為網站限制無法完全提取內容:{content['url']}
請用繁體中文回覆一個友善的訊息,說明:
1. 由於網站的保護機制,無法自動提取該網頁的完整內容
2. 建議用戶直接點擊連結查看完整內容
3. 如果是知名網站,可以簡單說明該網站的性質(如新聞、技術等)
請保持簡潔友善的語調。"""
                summary, token_stats = await self._chat_completion(
                    "你是一個友善的助手,會提供實用的建議。", prompt, 300)
                logger.info(f"Generated fallback response for: {content['url']}")
            else:
                # Normal path: summarize and translate the extracted article.
                prompt = f"""請將以下文章摘要成 3-5 句重點,並翻譯為繁體中文。請確保摘要簡潔明瞭且包含最重要的資訊:
標題:{content['title']}
內容:{content['text']}
請用繁體中文回覆摘要。"""
                summary, token_stats = await self._chat_completion(
                    "你是一個專業的技術文章摘要與翻譯專家,精通各種技術領域,能夠準確保留技術術語、專有名詞、數據細節,並將內容翻譯成自然流暢的繁體中文。你特別擅長處理科技、醫療、商業和學術文章,能夠識別並保留重要的技術細節。",
                    prompt, 800)
                logger.info(f"Successfully generated summary for: {content['url']}")
            logger.info(f"Token usage - Prompt: {token_stats['prompt_tokens']}, Completion: {token_stats['completion_tokens']}, Total: {token_stats['total_tokens']}")
            return summary, token_stats
        except Exception as e:
            logger.error(f"Error in summarization: {str(e)}")
            # Keep the (summary, token_stats) tuple shape even on error.
            error_summary = f"抱歉,AI 處理時發生錯誤。錯誤訊息:{str(e)}"
            error_token_stats = {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0
            }
            return error_summary, error_token_stats

    def format_response(self, url: str, title: str, summary: str, token_stats: dict = None) -> str:
        """Format the Slack reply for a successfully summarized URL.

        token_stats, when given, appends a usage line (prompt/completion/total).
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        response = f"""🔗 原始網址: {url}
📰 標題: {title}
{summary}
---
⏰ 處理時間: {timestamp}"""
        if token_stats:
            response += f"""
📊 Token 使用量: 輸入 {token_stats['prompt_tokens']} + 輸出 {token_stats['completion_tokens']} = 總計 {token_stats['total_tokens']} tokens"""
        return response

    def format_error_response(self, url: str, error_message: str) -> str:
        """Format the Slack reply for a URL that failed processing."""
        return f"""❌ 處理失敗: {url}
🔍 錯誤原因: {error_message}
💡 建議: 請檢查網址是否正確或稍後再試"""
# Global processor instance and deduplication cache
processor = URLProcessor(config)  # shared instance; its client is closed in lifespan shutdown
processing_cache = set()  # de-dup keys for URLs currently being processed
async def process_url_async(url: str, channel: str, say):
    """Full pipeline for one URL: extract -> summarize -> post to Slack.

    Runs inside its own event loop (the Slack handlers spawn one thread plus
    asyncio.run() per URL), so it creates a fresh URLProcessor and disposes of
    it via the async context manager. Errors are reported back to the channel
    rather than raised.
    """
    # Dedup key: the same URL in the same channel within the same minute is
    # processed only once.
    process_id = f"{url}:{channel}:{int(datetime.now().timestamp())//60}"
    if process_id in processing_cache:
        logger.info(f"URL {url} is already being processed, skipping duplicate")
        return
    processing_cache.add(process_id)
    try:
        logger.info(f"Starting to process URL: {url}")
        async with URLProcessor(config) as proc:
            # Step 1: extract content
            logger.info(f"Step 1: Extracting content from {url}")
            content = await proc.extract_content(url)
            logger.info(f"Content extracted successfully. Title: {content.get('title', 'N/A')}")
            # Step 2: summarize and translate
            logger.info(f"Step 2: Summarizing and translating content for {url}")
            try:
                result = await proc.summarize_and_translate(content)
                # Accept either (summary, token_stats) or a bare summary string.
                if isinstance(result, tuple):
                    summary, token_stats = result
                else:
                    summary = result
                    token_stats = None
                logger.info(f"Summary generated successfully for {url}")
            except Exception as e:
                logger.error(f"Error in summarization, trying fallback: {str(e)}")
                # AI call failed — fall back to a basic reply.
                summary = f"抱歉,由於技術問題無法生成摘要。請直接查看原始網址:{url}"
                token_stats = None
            # Step 3: format and send response (say is synchronous)
            logger.info(f"Step 3: Formatting and sending response for {url}")
            response = proc.format_response(url, content['title'], summary, token_stats)
            say(channel=channel, text=response)
            logger.info(f"Successfully processed and sent response for: {url}")
    except Exception as e:
        logger.error(f"Error processing URL {url}: {str(e)}", exc_info=True)
        error_message = processor.format_error_response(url, str(e))
        say(channel=channel, text=error_message)
    finally:
        # Drop the dedup key after a short grace period. threading.Timer
        # replaces the original hand-rolled thread + time.sleep(5); daemon=True
        # so a pending timer never delays interpreter shutdown.
        import threading
        cleanup = threading.Timer(5.0, processing_cache.discard, args=(process_id,))
        cleanup.daemon = True
        cleanup.start()
# Slack event handlers
@slack_app.event("message")
def handle_message(event, say, ack):
    """Handle incoming Slack messages: find URLs and summarize each one."""
    ack()  # acknowledge receipt of the event immediately
    try:
        logger.info(f"Received message event: {event}")
        # Guard clauses: ignore anything we must not process.
        if event.get('bot_id'):
            logger.info("Skipping bot message")
            return
        if event.get('type') == 'app_mention':
            # Mentions are routed to handle_app_mention instead.
            logger.info("Skipping app_mention in message handler")
            return
        if 'text' not in event:
            logger.info("Skipping message without text")
            return
        message_text = event.get('text', '')
        channel = event.get('channel')
        user = event.get('user')
        # Hard-coded bot user id — messages mentioning the bot are left to the
        # app_mention handler to avoid double processing.
        if '<@U094J502LLC>' in message_text:
            logger.info("Skipping mention message in message handler (will be handled by app_mention)")
            return
        logger.info(f"Processing message from user {user} in channel {channel}: {message_text}")
        urls = processor.extract_urls(message_text)
        if not urls:
            logger.info("No URLs found in message")
            return
        logger.info(f"Found {len(urls)} URLs: {urls}")
        # Acknowledge up front when several URLs were posted at once.
        if len(urls) > 1:
            say(
                channel=channel,
                text=f"🔄 正在處理 {len(urls)} 個網址,請稍候..."
            )
        # One worker thread per URL, each running its own event loop.
        import threading
        for target_url in urls:
            logger.info(f"Creating thread for URL: {target_url}")
            worker = threading.Thread(
                target=lambda u=target_url: asyncio.run(process_url_async(u, channel, say))
            )
            worker.start()
    except Exception as e:
        logger.error(f"Error in message handler: {str(e)}", exc_info=True)
        say(
            channel=event.get('channel'),
            text="❌ 處理訊息時發生錯誤,請稍後再試"
        )
@slack_app.event("app_mention")
def handle_app_mention(event, say, ack):
    """Handle @-mentions: summarize any URLs present, otherwise greet the user."""
    ack()  # acknowledge receipt of the event immediately
    logger.info(f"Received app mention: {event}")
    mention_text = event.get('text', '')
    found_urls = processor.extract_urls(mention_text)
    if not found_urls:
        # No URL in the mention — reply with a short usage hint.
        say(
            channel=event["channel"],
            text="👋 你好!我是網址摘要機器人。只要在頻道中貼上網址,我就會自動為你生成繁體中文摘要!"
        )
        return
    logger.info(f"App mention contains URLs: {found_urls}")
    # Confirm receipt before the slow processing starts.
    say(
        channel=event['channel'],
        text=f"🔄 收到!正在處理 {len(found_urls)} 個網址..."
    )
    # One worker thread per URL, each running its own event loop.
    import threading
    for target_url in found_urls:
        logger.info(f"Creating thread for app mention URL: {target_url}")
        threading.Thread(
            target=lambda u=target_url: asyncio.run(process_url_async(u, event['channel'], say))
        ).start()
# FastAPI routes
@api.get("/")
async def root():
    """Basic liveness probe."""
    payload = {"status": "healthy", "service": "Slack URL Summarizer Bot"}
    return payload
@api.get("/health")
async def health_check():
    """Detailed health check: reports which integrations have credentials set."""
    slack_ready = bool(config.slack_bot_token)
    azure_ready = bool(config.azure_openai_endpoint)
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "config": {
            "slack_configured": slack_ready,
            "azure_openai_configured": azure_ready,
        }
    }
@api.get("/slack/events")
async def slack_events_get():
    """Informational GET on the events endpoint (Slack itself always POSTs)."""
    info = {"message": "Slack events endpoint is ready", "methods": ["POST"]}
    return info
@api.post("/slack/events")
async def slack_events(request: Request):
    """Handle Slack events: answer the url_verification challenge directly,
    forward everything else to the Bolt request handler.

    Raises:
        HTTPException: 400 on malformed JSON, 500 on any other failure.
    """
    # Import BEFORE the try block: in the original, `import json` lived inside
    # `try`, so if an earlier statement raised, evaluating
    # `except json.JSONDecodeError` would itself raise NameError and mask the
    # real error.
    import json
    try:
        body = await request.body()
        data = json.loads(body)
        # Slack sends a one-time url_verification challenge when the endpoint
        # is registered; echo it back without involving Bolt.
        if data.get("type") == "url_verification":
            challenge = data.get("challenge")
            logger.info(f"Received URL verification challenge: {challenge}")
            return {"challenge": challenge}
        # Handle regular Slack events via the Bolt adapter.
        logger.info(f"Received Slack event: {data.get('type')}")
        return await handler.handle(request)
    except json.JSONDecodeError:
        logger.error("Invalid JSON in Slack request")
        raise HTTPException(status_code=400, detail="Invalid JSON")
    except Exception as e:
        logger.error(f"Error handling Slack event: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal server error")
# Error handling middleware
@api.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Last-resort exception handler.

    Must RETURN a response: raising HTTPException inside a Starlette/FastAPI
    exception handler (as the original did) triggers a secondary error while
    handling the first one instead of producing a clean 500 reply.
    """
    from fastapi.responses import JSONResponse
    logger.error(f"Unhandled exception: {str(exc)}", exc_info=True)
    return JSONResponse(status_code=500, content={"detail": "Internal server error"})
if __name__ == "__main__":
    # Launch the ASGI server; hosting injects PORT (default 7860), and
    # auto-reload is enabled only for local development.
    port = int(os.getenv("PORT", 7860))
    dev_mode = os.getenv("ENVIRONMENT") == "development"
    uvicorn.run(
        "main:api",
        host="0.0.0.0",
        port=port,
        log_level="info",
        reload=dev_mode,
    )