"""Fetch Bitbucket Cloud pull-request details and post messages to Slack.

Configuration is read from environment variables (optionally loaded from a
``.env`` file): WORKSPACE_ID, BITBUCKET_USERNAME, BITBUCKET_APP_PASSWORD,
SLACK_WEBHOOK_URL, SLACK_BOT_TOKEN and SLACK_CHANNEL.
"""

import argparse
import json
import os
from typing import Any, Dict, List, Optional, Tuple

import requests
from dotenv import load_dotenv

# Load variables from a local .env file (if any) before reading configuration.
load_dotenv()

WORKSPACE_ID = os.getenv("WORKSPACE_ID")
BITBUCKET_USERNAME = os.getenv("BITBUCKET_USERNAME")
BITBUCKET_APP_PASSWORD = os.getenv("BITBUCKET_APP_PASSWORD")
DEFAULT_BASE_URL = "https://api.bitbucket.org/2.0"

SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL")
SLACK_BOT_TOKEN = os.getenv("SLACK_BOT_TOKEN")
SLACK_CHANNEL = os.getenv("SLACK_CHANNEL")


def create_session(username: Optional[str] = None, app_password: Optional[str] = None) -> requests.Session:
    """Create an authenticated session for Bitbucket Cloud using Basic Auth.

    Args:
        username: Bitbucket username; falls back to the BITBUCKET_USERNAME
            environment variable when empty.
        app_password: Bitbucket App Password; falls back to the
            BITBUCKET_APP_PASSWORD environment variable when empty.

    Returns:
        A ``requests.Session`` with Basic Auth set and an
        ``Accept: application/json`` default header.

    Raises:
        ValueError: If either credential cannot be resolved.
    """
    resolved_username = username or os.getenv("BITBUCKET_USERNAME")
    resolved_app_password = app_password or os.getenv("BITBUCKET_APP_PASSWORD")
    if not resolved_username or not resolved_app_password:
        raise ValueError(
            "Missing credentials. Provide --username and --app-password or set env vars "
            "BITBUCKET_USERNAME and BITBUCKET_APP_PASSWORD."
        )

    session = requests.Session()
    session.auth = (resolved_username, resolved_app_password)
    session.headers.update({"Accept": "application/json"})
    return session


def _request_json(
    session: requests.Session,
    method: str,
    url: str,
    params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Send a request with a 30 s timeout and return the decoded JSON body.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
    """
    response = session.request(method, url, params=params, timeout=30)
    response.raise_for_status()
    return response.json()


def _paginate_all(
    session: requests.Session,
    url: str,
    params: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
    """Retrieve all pages for Bitbucket v2.0 endpoints that return 'values' and 'next'.

    Args:
        session: Authenticated session used for every page request.
        url: URL of the first page.
        params: Query parameters for the first page; ``pagelen`` defaults
            to 100 when not supplied.

    Returns:
        The concatenation of the ``values`` lists of every page.
    """
    aggregated_values: List[Dict[str, Any]] = []
    next_url: Optional[str] = url
    query_params: Dict[str, Any] = dict(params or {})
    if "pagelen" not in query_params:
        query_params["pagelen"] = 100

    while next_url:
        # Query parameters are sent only with the first request; follow-up
        # pages are requested using the 'next' URL exactly as returned.
        data = _request_json(
            session,
            "GET",
            next_url,
            params=query_params if next_url == url else None,
        )
        aggregated_values.extend(data.get("values", []))
        next_url = data.get("next")
    return aggregated_values


def get_pull_request_overview(
    session: requests.Session,
    workspace: str,
    repo_slug: str,
    pr_id: int,
    base_url: str = DEFAULT_BASE_URL,
) -> Dict[str, Any]:
    """Return the JSON object of a single pull request."""
    url = f"{base_url}/repositories/{workspace}/{repo_slug}/pullrequests/{pr_id}"
    return _request_json(session, "GET", url)


def get_pull_request_commits(
    session: requests.Session,
    workspace: str,
    repo_slug: str,
    pr_id: int,
    base_url: str = DEFAULT_BASE_URL,
) -> List[Dict[str, Any]]:
    """Return every entry of the pull request's ``commits`` endpoint."""
    url = f"{base_url}/repositories/{workspace}/{repo_slug}/pullrequests/{pr_id}/commits"
    return _paginate_all(session, url)


def get_pull_request_comments(
    session: requests.Session,
    workspace: str,
    repo_slug: str,
    pr_id: int,
    base_url: str = DEFAULT_BASE_URL,
) -> List[Dict[str, Any]]:
    """Return every entry of the pull request's ``comments`` endpoint."""
    url = f"{base_url}/repositories/{workspace}/{repo_slug}/pullrequests/{pr_id}/comments"
    return _paginate_all(session, url)


def get_pull_request_activity(
    session: requests.Session,
    workspace: str,
    repo_slug: str,
    pr_id: int,
    base_url: str = DEFAULT_BASE_URL,
) -> List[Dict[str, Any]]:
    """Return every entry of the pull request's ``activity`` endpoint."""
    url = f"{base_url}/repositories/{workspace}/{repo_slug}/pullrequests/{pr_id}/activity"
    return _paginate_all(session, url)


def get_pull_request_diff(
    session: requests.Session,
    workspace: str,
    repo_slug: str,
    pr_id: int,
    base_url: str = DEFAULT_BASE_URL,
) -> str:
    """Return unified diff text for the PR.

    The request overrides the session's JSON ``Accept`` header with
    ``text/plain`` and uses a 60 s timeout.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
    """
    url = f"{base_url}/repositories/{workspace}/{repo_slug}/pullrequests/{pr_id}/diff"
    headers = {"Accept": "text/plain"}
    response = session.get(url, headers=headers, timeout=60)
    response.raise_for_status()
    return response.text


def get_diff(session, diff_url):
    """Fetch ``diff_url`` with the given session and return the response text.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
    """
    response = session.request('GET', diff_url, timeout=30)
    response.raise_for_status()
    return response.text


def save_md_report(filename, content):
    """Write ``content`` to ``filename`` as UTF-8 and print a confirmation."""
    with open(filename, "w", encoding="utf-8") as f:
        f.write(content)
    print(f"Saved {filename} file")


def _send_via_webhook(message_text: str, blocks: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
    """Send a message through the Slack Incoming Webhook in SLACK_WEBHOOK_URL.

    Returns:
        A dict with ``ok``, ``method``, ``status_code`` and the raw
        ``response`` text.

    Raises:
        ValueError: If SLACK_WEBHOOK_URL is not set, or the webhook answers
            with a non-2xx status or a body other than "ok"/empty.
    """
    if not SLACK_WEBHOOK_URL:
        raise ValueError("Missing SLACK_WEBHOOK_URL for webhook message")

    payload: Dict[str, Any] = {"text": message_text}
    if blocks is not None:
        payload["blocks"] = blocks

    response = requests.post(SLACK_WEBHOOK_URL, json=payload, timeout=15)
    if 200 <= response.status_code < 300 and response.text.strip().lower() in ("ok", ""):
        return {
            "ok": True,
            "method": "webhook",
            "status_code": response.status_code,
            "response": response.text,
        }
    raise ValueError(f"Slack webhook failed (status {response.status_code}): {response.text}")


def _send_via_api(
    message_text: str,
    *,
    channel: Optional[str] = None,
    blocks: Optional[List[Dict[str, Any]]] = None,
    thread_ts: Optional[str] = None,
) -> Dict[str, Any]:
    """Send a message through the Slack Web API (chat.postMessage).

    Args:
        message_text: Value of the ``text`` field.
        channel: Target channel; falls back to SLACK_CHANNEL.
        blocks: Optional ``blocks`` payload, sent only when not ``None``.
        thread_ts: When given, sent as ``thread_ts`` so the message is a
            thread reply.

    Returns:
        A dict with ``ok``, ``method``, ``status_code`` and the decoded
        JSON ``response``.

    Raises:
        ValueError: If the token or channel is missing, the HTTP request
            fails, or Slack answers with ``ok`` false.
    """
    if not SLACK_BOT_TOKEN or not (channel or SLACK_CHANNEL):
        raise ValueError("Missing SLACK_BOT_TOKEN or channel for Web API message")

    api_url = "https://slack.com/api/chat.postMessage"
    headers = {
        "Authorization": f"Bearer {SLACK_BOT_TOKEN}",
        "Content-Type": "application/json",
    }
    payload: Dict[str, Any] = {
        "channel": channel or SLACK_CHANNEL,
        "text": message_text,
    }
    if blocks is not None:
        payload["blocks"] = blocks
    if thread_ts is not None:
        payload["thread_ts"] = thread_ts

    response = requests.post(api_url, headers=headers, json=payload, timeout=15)
    try:
        response.raise_for_status()
    except requests.RequestException as exc:
        raise ValueError(f"Slack API request failed: {exc}") from exc

    data = response.json()
    # An HTTP 2xx response can still carry an application-level failure,
    # reported through the 'ok' flag of the JSON body.
    if not data.get("ok", False):
        raise ValueError(f"Slack API responded with error: {data.get('error', 'unknown_error')}")
    return {
        "ok": True,
        "method": "web_api",
        "status_code": response.status_code,
        "response": data,
    }


def send_slack_message(
    message_text: str,
    *,
    channel: Optional[str] = None,
    blocks: Optional[List[Dict[str, Any]]] = None,
    use_webhook: bool = False,
) -> Dict[str, Any]:
    """Send a new (non-threaded) message to Slack.

    Uses the Incoming Webhook when ``use_webhook`` is true (``channel`` is
    then ignored), otherwise the Web API.
    """
    if use_webhook:
        return _send_via_webhook(message_text, blocks)
    return _send_via_api(message_text, channel=channel, blocks=blocks)


def reply_slack_thread(
    thread_ts: str,
    message_text: str,
    *,
    channel: Optional[str] = None,
    blocks: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Reply to a message inside a thread identified by ``thread_ts``.

    Always goes through the Web API helper, which is the only sender here
    that accepts ``thread_ts``.
    """
    return _send_via_api(message_text, channel=channel, blocks=blocks, thread_ts=thread_ts)


def list_slack_users(*, bot_token: Optional[str] = None, include_bots: bool = True) -> List[Dict[str, Any]]:
    """Return the Slack users reported by the Web API ``users.list`` (paginated).

    Args:
        bot_token: Token to use; falls back to SLACK_BOT_TOKEN.
        include_bots: Pass ``False`` to drop members whose ``is_bot`` flag
            is true.

    Raises:
        ValueError: If no token is available or Slack answers with ``ok``
            false.
        requests.HTTPError: If a page request returns a 4xx/5xx status.
    """
    token = bot_token or SLACK_BOT_TOKEN
    if not token:
        raise ValueError("Missing Slack bot token. Provide bot_token or set SLACK_BOT_TOKEN.")

    users: List[Dict[str, Any]] = []
    cursor: Optional[str] = None
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/x-www-form-urlencoded; charset=utf-8",
    }

    while True:
        params: Dict[str, Any] = {"limit": 200}
        if cursor:
            params["cursor"] = cursor

        resp = requests.get("https://slack.com/api/users.list", headers=headers, params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        if not data.get("ok", False):
            raise ValueError(f"Slack API responded with error: {data.get('error', 'unknown_error')}")

        page_members = data.get("members", [])
        if not include_bots:
            page_members = [m for m in page_members if not m.get("is_bot", False)]
        users.extend(page_members)

        # An empty or missing next_cursor ends the pagination loop.
        cursor = (data.get("response_metadata") or {}).get("next_cursor")
        if not cursor:
            break
    return users


def get_slack_usergroups(bot_token: Optional[str] = None):
    """Return the Slack user groups (subteams) from ``usergroups.list``.

    Args:
        bot_token: Slack bot token; falls back to SLACK_BOT_TOKEN.

    Returns:
        list[dict]: One ``{id, handle, name, description}`` dict per group.

    Raises:
        ValueError: If no token is available, the HTTP status is not 200,
            or Slack answers with ``ok`` false.
    """
    token = bot_token or SLACK_BOT_TOKEN
    if not token:
        raise ValueError("Missing Slack bot token. Provide bot_token or set SLACK_BOT_TOKEN.")

    url = "https://slack.com/api/usergroups.list"
    headers = {"Authorization": f"Bearer {token}"}
    # A timeout keeps an unresponsive endpoint from blocking forever.
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code != 200:
        raise ValueError(f"HTTP Error: {response.status_code}")

    data = response.json()
    if not data.get("ok"):
        raise ValueError(f"Slack API error: {data.get('error')}")

    return [
        {
            "id": group.get("id"),  # subteam ID
            "handle": group.get("handle"),  # e.g. dev, qa
            "name": group.get("name"),  # full name
            "description": group.get("description"),
        }
        for group in data.get("usergroups", [])
    ]


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser used by ``main``."""
    parser = argparse.ArgumentParser(description="Fetch full details of a Bitbucket Cloud pull request.")
    parser.add_argument(
        "--workspace",
        default=WORKSPACE_ID,
        help="Bitbucket workspace ID or slug (or set WORKSPACE_ID)",
    )
    parser.add_argument("--repo", required=True, help="Repository slug")
    parser.add_argument("--pr", required=True, type=int, help="Pull request ID")
    parser.add_argument("--username", help="Bitbucket username (or set BITBUCKET_USERNAME)")
    parser.add_argument("--app-password", help="Bitbucket App Password (or set BITBUCKET_APP_PASSWORD)")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Base API URL. Default is Bitbucket Cloud v2.0")
    return parser


def _render_pr_report(repo: str, pr_id: int, data: Dict[str, Any]) -> str:
    """Render the Markdown summary of a pull-request JSON object."""
    title = data["title"]
    description = data.get("description") or ""
    source_branch = data["source"]["branch"]["name"]
    destination_branch = data["destination"]["branch"]["name"]
    author = data["author"]["display_name"]
    created_on = data["created_on"]
    pr_link = data["links"]["html"]["href"]
    changelog = (data.get("summary") or {}).get("raw") or ""

    reviewers = data.get("reviewers", [])
    if reviewers:
        # Fall back to the display name so a reviewer without a nickname is
        # not rendered as "@None".
        reviewer_mentions = " ".join(
            f"@{r.get('nickname') or r.get('display_name')}" for r in reviewers
        )
    else:
        reviewer_mentions = "None"

    sections = [f"# [{repo}] PR #{pr_id}: {title}"]
    if description:
        sections.append(f"*{description}*")
    sections.extend(
        [
            "**🌿 Branch Information:**\n"
            f"- **Source Branch:** {source_branch} → **Target Branch:** {destination_branch}",
            f"**👤 Người tạo:** {author}",
            f"**📅 Thời gian tạo:** {created_on}",
            f"**👥 Reviewers:** {reviewer_mentions}",
            f"**🔗 Link Pull Request:** [PR #{pr_id}: {title}]({pr_link})",
            "---",
            "## 📝 Changelogs:",
        ]
    )
    if changelog:
        sections.append(changelog)
    # Blank lines between sections keep each field in its own paragraph.
    return "\n\n".join(sections) + "\n"


def main() -> None:
    """Fetch one pull request and write ``PR.md`` and ``DIFF.md``.

    The pull request is selected with ``--repo`` and ``--pr``; the workspace
    and credentials come from the command line or the environment.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()
    if not args.workspace:
        parser.error("--workspace is required when WORKSPACE_ID is not set")

    session = create_session(args.username, args.app_password)
    data = get_pull_request_overview(
        session=session,
        workspace=args.workspace,
        repo_slug=args.repo,
        pr_id=args.pr,
        base_url=args.base_url,
    )

    save_md_report("PR.md", _render_pr_report(args.repo, args.pr, data))

    diff_url = data["links"]["diff"]["href"]
    save_md_report("DIFF.md", get_diff(session, diff_url))


if __name__ == "__main__":
    main()