pr-webhook / tool.py
hari-huynh
Add code to send a message to a specific thread in Slack
04417e3
import os
import json
import argparse
from typing import Dict, Any, List, Optional, Tuple
from dotenv import load_dotenv
import requests
# Set up environment variables.
# load_dotenv() (python-dotenv) is called first so that anything it loads is
# already present in the process environment when the os.getenv() reads below
# run. Each os.getenv() yields None when the variable is not set.
load_dotenv()
# Bitbucket workspace identifier, passed as the `workspace` argument in main().
WORKSPACE_ID = os.getenv("WORKSPACE_ID")
# Bitbucket Basic Auth credentials (username + App Password).
BITBUCKET_USERNAME = os.getenv("BITBUCKET_USERNAME")
BITBUCKET_APP_PASSWORD = os.getenv("BITBUCKET_APP_PASSWORD")
# Default root of the Bitbucket REST API used by the get_pull_request_* helpers.
DEFAULT_BASE_URL = "https://api.bitbucket.org/2.0"
# Slack Incoming Webhook URL, read by _send_via_webhook().
SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL")
# Slack bot token, sent as a Bearer token by the Slack Web API helpers.
SLACK_BOT_TOKEN = os.getenv("SLACK_BOT_TOKEN")
# Fallback Slack channel for _send_via_api() when no channel argument is given.
SLACK_CHANNEL = os.getenv("SLACK_CHANNEL")
def create_session(username: Optional[str] = None, app_password: Optional[str] = None) -> requests.Session:
    """
    Build a requests session authenticated against Bitbucket Cloud.

    Basic Auth (username + App Password) is used. Any argument that is missing
    or empty is looked up in the BITBUCKET_USERNAME / BITBUCKET_APP_PASSWORD
    environment variables instead.

    Returns:
        A ``requests.Session`` with ``auth`` set and an
        ``Accept: application/json`` default header.

    Raises:
        ValueError: if either credential is still missing after the fallback.
    """
    user = username if username else os.getenv("BITBUCKET_USERNAME")
    secret = app_password if app_password else os.getenv("BITBUCKET_APP_PASSWORD")

    if not (user and secret):
        raise ValueError(
            "Missing credentials. Provide --username and --app-password or set env vars "
            "BITBUCKET_USERNAME and BITBUCKET_APP_PASSWORD."
        )

    bitbucket_session = requests.Session()
    bitbucket_session.headers.update({"Accept": "application/json"})
    bitbucket_session.auth = (user, secret)
    return bitbucket_session
def _request_json(session: requests.Session, method: str, url: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Issue one HTTP request through ``session`` and return the decoded JSON body.

    The request uses a 30-second timeout. ``raise_for_status()`` is called
    before decoding, so an HTTP error status surfaces as an exception instead
    of being parsed.
    """
    reply = session.request(method, url, params=params, timeout=30)
    reply.raise_for_status()
    decoded = reply.json()
    return decoded
def _paginate_all(session: requests.Session, url: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
    """
    Walk a paginated endpoint and return the concatenated ``values`` of all pages.

    Each response is expected to be a JSON object that may carry a ``values``
    list and a ``next`` URL; iteration stops when ``next`` is absent or empty.
    ``pagelen`` defaults to 100 unless the caller supplied one in ``params``.
    """
    # Work on a copy so the caller's dict is never mutated.
    initial_params: Dict[str, Any] = {**(params or {})}
    initial_params.setdefault("pagelen", 100)

    collected: List[Dict[str, Any]] = []
    page_url: Optional[str] = url
    while page_url:
        # Query parameters are only sent when fetching the starting URL; any
        # other URL comes from a 'next' link and is requested as returned.
        page_params = initial_params if page_url == url else None
        page = _request_json(session, "GET", page_url, params=page_params)
        collected += page.get("values", [])
        page_url = page.get("next")
    return collected
def get_pull_request_overview(session: requests.Session, workspace: str, repo_slug: str, pr_id: int, base_url: str = DEFAULT_BASE_URL) -> Dict[str, Any]:
    """Return the decoded JSON object served at the pull request's own endpoint."""
    pr_path = f"repositories/{workspace}/{repo_slug}/pullrequests/{pr_id}"
    return _request_json(session, "GET", f"{base_url}/{pr_path}")
def get_pull_request_commits(session: requests.Session, workspace: str, repo_slug: str, pr_id: int, base_url: str = DEFAULT_BASE_URL) -> List[Dict[str, Any]]:
    """Return every entry from the pull request's ``/commits`` endpoint, following pagination."""
    pr_path = f"repositories/{workspace}/{repo_slug}/pullrequests/{pr_id}"
    commits_endpoint = f"{base_url}/{pr_path}/commits"
    return _paginate_all(session, commits_endpoint)
def get_pull_request_comments(session: requests.Session, workspace: str, repo_slug: str, pr_id: int, base_url: str = DEFAULT_BASE_URL) -> List[Dict[str, Any]]:
    """Return every entry from the pull request's ``/comments`` endpoint, following pagination."""
    pr_path = f"repositories/{workspace}/{repo_slug}/pullrequests/{pr_id}"
    comments_endpoint = f"{base_url}/{pr_path}/comments"
    return _paginate_all(session, comments_endpoint)
def get_pull_request_activity(session: requests.Session, workspace: str, repo_slug: str, pr_id: int, base_url: str = DEFAULT_BASE_URL) -> List[Dict[str, Any]]:
    """Return every entry from the pull request's ``/activity`` endpoint, following pagination."""
    pr_path = f"repositories/{workspace}/{repo_slug}/pullrequests/{pr_id}"
    activity_endpoint = f"{base_url}/{pr_path}/activity"
    return _paginate_all(session, activity_endpoint)
def get_pull_request_diff(session: requests.Session, workspace: str, repo_slug: str, pr_id: int, base_url: str = DEFAULT_BASE_URL) -> str:
    """
    Fetch the pull request's ``/diff`` endpoint and return the body as text.

    The request overrides the Accept header with ``text/plain`` and uses a
    60-second timeout. HTTP error statuses are raised via ``raise_for_status()``.
    """
    pr_path = f"repositories/{workspace}/{repo_slug}/pullrequests/{pr_id}"
    reply = session.get(
        f"{base_url}/{pr_path}/diff",
        headers={"Accept": "text/plain"},
        timeout=60,
    )
    reply.raise_for_status()
    return reply.text
def get_diff(session, diff_url):
    """
    GET ``diff_url`` through ``session`` (30-second timeout) and return the
    response body as text. HTTP error statuses are raised via
    ``raise_for_status()``.
    """
    reply = session.request("GET", diff_url, timeout=30)
    reply.raise_for_status()
    diff_text = reply.text
    return diff_text
def save_md_report(filename, content):
    """
    Write ``content`` to ``filename`` as UTF-8 text, replacing any existing
    file, then print a confirmation line to stdout.
    """
    with open(filename, mode="w", encoding="utf-8") as report_file:
        report_file.write(content)

    confirmation = f"Saved {filename} file"
    print(confirmation)
def _send_via_webhook(message_text: str, blocks: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
    """
    Send a message through the Slack Incoming Webhook at SLACK_WEBHOOK_URL.

    ``blocks`` is added to the JSON payload only when it is not None. The call
    counts as successful when the status is 2xx and the response body, once
    stripped and lower-cased, is ``"ok"`` or empty.

    Raises:
        ValueError: if SLACK_WEBHOOK_URL is not configured, or the webhook
            response does not meet the success condition above.
    """
    if not SLACK_WEBHOOK_URL:
        raise ValueError("Missing SLACK_WEBHOOK_URL for webhook message")

    body: Dict[str, Any] = {"text": message_text}
    if blocks is not None:
        body["blocks"] = blocks

    reply = requests.post(SLACK_WEBHOOK_URL, json=body, timeout=15)
    accepted = 200 <= reply.status_code < 300 and reply.text.strip().lower() in ("ok", "")
    if not accepted:
        raise ValueError(f"Slack webhook failed (status {reply.status_code}): {reply.text}")

    return {
        "ok": True,
        "method": "webhook",
        "status_code": reply.status_code,
        "response": reply.text,
    }
def _send_via_api(
    message_text: str,
    *,
    channel: Optional[str] = None,
    blocks: Optional[List[Dict[str, Any]]] = None,
    thread_ts: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Send a message through the Slack Web API (chat.postMessage).

    Args:
        message_text: value of the payload's ``text`` field.
        channel: target channel; falls back to SLACK_CHANNEL when empty.
        blocks: added to the payload only when not None.
        thread_ts: added to the payload only when not None.

    Returns:
        A dict with ``ok``, ``method`` ("web_api"), ``status_code`` and the
        decoded JSON ``response``.

    Raises:
        ValueError: if the bot token or channel is missing, the HTTP request
            fails, or the decoded response does not have a truthy ``ok``.
    """
    target_channel = channel or SLACK_CHANNEL
    if not (SLACK_BOT_TOKEN and target_channel):
        raise ValueError("Missing SLACK_BOT_TOKEN or channel for Web API message")

    body: Dict[str, Any] = {"channel": target_channel, "text": message_text}
    if blocks is not None:
        body["blocks"] = blocks
    if thread_ts is not None:
        body["thread_ts"] = thread_ts

    reply = requests.post(
        "https://slack.com/api/chat.postMessage",
        headers={
            "Authorization": f"Bearer {SLACK_BOT_TOKEN}",
            "Content-Type": "application/json",
        },
        json=body,
        timeout=15,
    )
    try:
        reply.raise_for_status()
    except Exception as exc:
        raise ValueError(f"Slack API request failed: {exc}") from exc

    result = reply.json()
    if not result.get("ok", False):
        raise ValueError(f"Slack API responded with error: {result.get('error', 'unknown_error')}")

    return {
        "ok": True,
        "method": "web_api",
        "status_code": reply.status_code,
        "response": result,
    }
def send_slack_message(
    message_text: str,
    *,
    channel: Optional[str] = None,
    blocks: Optional[List[Dict[str, Any]]] = None,
    use_webhook: bool = False,
) -> Dict[str, Any]:
    """
    Post a new (non-threaded) message to Slack.

    With ``use_webhook=True`` the message goes through the Incoming Webhook
    and ``channel`` is not used; otherwise it goes through the Web API.
    """
    if not use_webhook:
        return _send_via_api(message_text, channel=channel, blocks=blocks)
    return _send_via_webhook(message_text, blocks)
def reply_slack_thread(
    thread_ts: str,
    message_text: str,
    *,
    channel: Optional[str] = None,
    blocks: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """
    Reply to a message inside its thread (requires ``thread_ts``).

    Always sent through the Web API, since the webhook path does not take a
    thread timestamp.
    """
    thread_reply = _send_via_api(
        message_text,
        thread_ts=thread_ts,
        blocks=blocks,
        channel=channel,
    )
    return thread_reply
def list_slack_users(*, bot_token: Optional[str] = None, include_bots: bool = True) -> List[Dict[str, Any]]:
    """
    Return Slack users from the Web API ``users.list`` method, following
    cursor pagination until no ``next_cursor`` is returned.

    - bot_token: used when given; otherwise SLACK_BOT_TOKEN is used.
    - include_bots: pass False to drop members whose ``is_bot`` is truthy.

    Raises ValueError when no token is available or the API response does not
    have a truthy ``ok``.
    """
    token = bot_token if bot_token else SLACK_BOT_TOKEN
    if not token:
        raise ValueError("Missing Slack bot token. Provide bot_token or set SLACK_BOT_TOKEN.")

    request_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/x-www-form-urlencoded; charset=utf-8",
    }
    collected: List[Dict[str, Any]] = []
    next_cursor: Optional[str] = None

    while True:
        query: Dict[str, Any] = {"limit": 200}
        if next_cursor:
            query["cursor"] = next_cursor

        reply = requests.get(
            "https://slack.com/api/users.list",
            headers=request_headers,
            params=query,
            timeout=30,
        )
        reply.raise_for_status()
        page = reply.json()
        if not page.get("ok", False):
            raise ValueError(f"Slack API responded with error: {page.get('error', 'unknown_error')}")

        collected.extend(
            member
            for member in page.get("members", [])
            if include_bots or not member.get("is_bot", False)
        )

        # response_metadata may be missing or null, hence the `or {}` guard.
        next_cursor = (page.get("response_metadata") or {}).get("next_cursor")
        if not next_cursor:
            return collected
def get_slack_usergroups(bot_token: str) -> List[Dict[str, Any]]:
    """
    Fetch the list of Slack User Groups (subteams) via ``usergroups.list``.

    Args:
        bot_token (str): Slack bot token (starts with ``xoxb-...``), sent as a
            Bearer token.

    Returns:
        list[dict]: one dict per user group with the keys ``id``, ``handle``,
        ``name`` and ``description`` (each None when absent in the response).

    Raises:
        Exception: if the HTTP status is not 200, or the decoded response does
            not have a truthy ``ok``.
    """
    url = "https://slack.com/api/usergroups.list"
    headers = {"Authorization": f"Bearer {bot_token}"}

    # An explicit timeout is required: without one, requests can wait on an
    # unresponsive server indefinitely. 30s matches list_slack_users().
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code != 200:
        raise Exception(f"HTTP Error: {response.status_code}")

    data = response.json()
    if not data.get("ok"):
        raise Exception(f"Slack API error: {data.get('error')}")

    return [
        {
            "id": group.get("id"),  # subteam ID
            "handle": group.get("handle"),  # e.g. dev, qa
            "name": group.get("name"),  # full name
            "description": group.get("description"),  # description
        }
        for group in data.get("usergroups", [])
    ]
def main(argv: Optional[List[str]] = None) -> None:
    """
    Fetch one Bitbucket Cloud pull request and write two files in the current
    directory: ``PR.md`` (a Markdown summary) and ``DIFF.md`` (the raw diff).

    Args:
        argv: command-line arguments to parse. None (the default) lets
            argparse read ``sys.argv[1:]``.

    Command-line options:
        --repo / --pr are required. --workspace, --username, --app-password
        and --base-url default to WORKSPACE_ID, BITBUCKET_USERNAME,
        BITBUCKET_APP_PASSWORD and DEFAULT_BASE_URL respectively.

    Raises:
        SystemExit: from argparse on invalid arguments or a missing workspace.
        ValueError: from create_session() when credentials are missing.
    """
    parser = argparse.ArgumentParser(description="Fetch full details of a Bitbucket Cloud pull request.")
    parser.add_argument("--workspace", default=WORKSPACE_ID, help="Bitbucket workspace ID or slug (or set WORKSPACE_ID)")
    parser.add_argument("--repo", required=True, help="Repository slug")
    parser.add_argument("--pr", required=True, type=int, help="Pull request ID")
    parser.add_argument("--username", default=BITBUCKET_USERNAME, help="Bitbucket username (or set BITBUCKET_USERNAME)")
    parser.add_argument("--app-password", default=BITBUCKET_APP_PASSWORD, help="Bitbucket App Password (or set BITBUCKET_APP_PASSWORD)")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Base API URL. Default is Bitbucket Cloud v2.0")
    args = parser.parse_args(argv)

    # Without a workspace the request URL would contain the literal "None".
    if not args.workspace:
        parser.error("Missing workspace. Provide --workspace or set WORKSPACE_ID.")

    session = create_session(args.username, args.app_password)
    data = get_pull_request_overview(
        session=session,
        workspace=args.workspace,
        repo_slug=args.repo,
        pr_id=args.pr,
        base_url=args.base_url,
    )

    # Extract the fields needed for the report.
    title = data["title"]
    description = data["description"]
    source_branch = data["source"]["branch"]["name"]
    destination_branch = data["destination"]["branch"]["name"]
    author = data["author"]["display_name"]
    created_on = data["created_on"]
    reviewers = data.get("reviewers", [])
    reviewer_mentions = " ".join([f"@{r.get('nickname')}" for r in reviewers]) if reviewers else "None"
    pr_link = data["links"]["html"]["href"]
    diff_url = data["links"]["diff"]["href"]

    # The creation time and the link text come from the API response rather
    # than fixed sample values. The "Changelogs" section is left empty here.
    pr_report = f"""
# [{args.repo}] PR #{args.pr}: {title}
*{description}*
**🌿 Branch Information:**
- **Source Branch:** {source_branch} → **Target Branch:** {destination_branch}
**👤 Người tạo:** {author}
**📅 Thời gian tạo:** {created_on}
**👥 Reviewers:**
{reviewer_mentions}
**🔗 Link Pull Request:** [PR #{args.pr}: {title}]({pr_link})
---
## 📝 Changelogs:
"""
    save_md_report("PR.md", pr_report)

    diff_text = get_diff(session, diff_url)
    save_md_report("DIFF.md", diff_text)
if __name__ == "__main__":
    # Smoke test: post a message through the Slack Web API.
    # send_slack_message() accepts only message_text, channel, blocks and
    # use_webhook; the bot token and webhook URL are read from the
    # module-level SLACK_BOT_TOKEN / SLACK_WEBHOOK_URL, so they must not be
    # passed as keyword arguments (doing so raises TypeError).
    send_slack_message(
        message_text="Hello",
        channel="pull-request",
    )